diff options
| author | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-18 14:54:34 +0200 |
|---|---|---|
| committer | Dirk Engling <erdgeist@erdgeist.org> | 2024-04-18 14:54:34 +0200 |
| commit | 33bd2c9094e7f90a62cb59cdf5cf670ac58d5308 (patch) | |
| tree | 9b5825b65a655953a605c3cc7525eb9456e686e6 | |
| parent | 160ba08074827f0ddecaf611dd9f9b15593be9c1 (diff) | |
Add support for zstd
| -rw-r--r-- | Makefile | 5 | ||||
| -rw-r--r-- | ot_fullscrape.c | 150 | ||||
| -rw-r--r-- | ot_http.c | 35 | ||||
| -rw-r--r-- | ot_http.h | 5 | ||||
| -rw-r--r-- | ot_mutex.h | 3 | ||||
| -rw-r--r-- | ot_stats.h | 1 |
6 files changed, 183 insertions, 16 deletions
| @@ -27,6 +27,11 @@ STRIP?=strip | |||
| 27 | #FEATURES+=-DWANT_IP_FROM_QUERY_STRING | 27 | #FEATURES+=-DWANT_IP_FROM_QUERY_STRING |
| 28 | FEATURES+=-DWANT_COMPRESSION_GZIP | 28 | FEATURES+=-DWANT_COMPRESSION_GZIP |
| 29 | FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS | 29 | FEATURES+=-DWANT_COMPRESSION_GZIP_ALWAYS |
| 30 | |||
| 31 | #FEATURES+=-DWANT_COMPRESSION_ZSTD | ||
| 32 | #FEATURES+=-DWANT_COMPRESSION_ZSTD_ALWAYS | ||
| 33 | #LDFLAGS+=-lzstd | ||
| 34 | |||
| 30 | #FEATURES+=-DWANT_LOG_NETWORKS | 35 | #FEATURES+=-DWANT_LOG_NETWORKS |
| 31 | #FEATURES+=-DWANT_RESTRICT_STATS | 36 | #FEATURES+=-DWANT_RESTRICT_STATS |
| 32 | #FEATURES+=-DWANT_IP_FROM_PROXY | 37 | #FEATURES+=-DWANT_IP_FROM_PROXY |
diff --git a/ot_fullscrape.c b/ot_fullscrape.c index aed2ad9..6fd6d1c 100644 --- a/ot_fullscrape.c +++ b/ot_fullscrape.c | |||
| @@ -14,6 +14,10 @@ | |||
| 14 | #ifdef WANT_COMPRESSION_GZIP | 14 | #ifdef WANT_COMPRESSION_GZIP |
| 15 | #include <zlib.h> | 15 | #include <zlib.h> |
| 16 | #endif | 16 | #endif |
| 17 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 18 | #include <zstd.h> | ||
| 19 | #endif | ||
| 20 | |||
| 17 | 21 | ||
| 18 | /* Libowfat */ | 22 | /* Libowfat */ |
| 19 | #include "byte.h" | 23 | #include "byte.h" |
| @@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode); | |||
| 40 | #ifdef WANT_COMPRESSION_GZIP | 44 | #ifdef WANT_COMPRESSION_GZIP |
| 41 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); | 45 | static void fullscrape_make_gzip(int taskid, ot_tasktype mode); |
| 42 | #endif | 46 | #endif |
| 47 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 48 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode); | ||
| 49 | #endif | ||
| 43 | 50 | ||
| 44 | /* Converter function from memory to human readable hex strings | 51 | /* Converter function from memory to human readable hex strings |
| 45 | XXX - Duplicated from ot_stats. Needs fix. */ | 52 | XXX - Duplicated from ot_stats. Needs fix. */ |
| @@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) { | |||
| 64 | while (g_opentracker_running) { | 71 | while (g_opentracker_running) { |
| 65 | ot_tasktype tasktype = TASK_FULLSCRAPE; | 72 | ot_tasktype tasktype = TASK_FULLSCRAPE; |
| 66 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); | 73 | ot_taskid taskid = mutex_workqueue_poptask(&tasktype); |
| 74 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 75 | if (tasktype & TASK_FLAG_ZSTD) | ||
| 76 | fullscrape_make_zstd(taskid, tasktype); | ||
| 77 | else | ||
| 78 | #endif | ||
| 67 | #ifdef WANT_COMPRESSION_GZIP | 79 | #ifdef WANT_COMPRESSION_GZIP |
| 68 | if (tasktype & TASK_FLAG_GZIP) | 80 | if (tasktype & TASK_FLAG_GZIP) |
| 69 | fullscrape_make_gzip(taskid, tasktype); | 81 | fullscrape_make_gzip(taskid, tasktype); |
| @@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 205 | struct iovec iovector = {NULL, 0}; | 217 | struct iovec iovector = {NULL, 0}; |
| 206 | int zres; | 218 | int zres; |
| 207 | z_stream strm; | 219 | z_stream strm; |
| 208 | fprintf(stderr, "GZIP path\n"); | ||
| 209 | /* Setup return vector... */ | 220 | /* Setup return vector... */ |
| 210 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | 221 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); |
| 211 | if (!iovector.iov_base) | 222 | if (!iovector.iov_base) |
| @@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 267 | mutex_bucket_unlock(bucket, 0); | 278 | mutex_bucket_unlock(bucket, 0); |
| 268 | 279 | ||
| 269 | /* Parent thread died? */ | 280 | /* Parent thread died? */ |
| 270 | if (!g_opentracker_running) | 281 | if (!g_opentracker_running) { |
| 282 | deflateEnd(&strm); | ||
| 271 | return; | 283 | return; |
| 284 | } | ||
| 272 | } | 285 | } |
| 273 | 286 | ||
| 274 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | 287 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { |
| @@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 282 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; | 295 | iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base; |
| 283 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | 296 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { |
| 284 | free(iovector.iov_base); | 297 | free(iovector.iov_base); |
| 285 | return mutex_bucket_unlock(bucket, 0); | 298 | deflateEnd(&strm); |
| 299 | return; | ||
| 286 | } | 300 | } |
| 287 | 301 | ||
| 288 | /* Check if there's a last batch of data in the zlib buffer */ | 302 | /* Check if there's a last batch of data in the zlib buffer */ |
| @@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 293 | if (!iovector.iov_base) { | 307 | if (!iovector.iov_base) { |
| 294 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | 308 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); |
| 295 | deflateEnd(&strm); | 309 | deflateEnd(&strm); |
| 296 | return mutex_bucket_unlock(bucket, 0); | 310 | return; |
| 297 | } | 311 | } |
| 298 | strm.next_out = iovector.iov_base; | 312 | strm.next_out = iovector.iov_base; |
| 299 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; | 313 | strm.avail_out = OT_SCRAPE_CHUNK_SIZE; |
| @@ -311,5 +325,133 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) { | |||
| 311 | /* WANT_COMPRESSION_GZIP */ | 325 | /* WANT_COMPRESSION_GZIP */ |
| 312 | #endif | 326 | #endif |
| 313 | 327 | ||
| 328 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 329 | |||
| 330 | static void fullscrape_make_zstd(int taskid, ot_tasktype mode) { /* Walks all OT_BUCKET_COUNT buckets, serialises every torrent with fullscrape_write_one, feeds the result through ZSTD_compressStream2 and hands the compressed output over in chunks via mutex_workqueue_pushchunked(taskid, ...). Returns nothing; on failure it simply returns early. */ | ||
| 331 | int bucket; | ||
| 332 | char *r; | ||
| 333 | struct iovec iovector = {NULL, 0}; | ||
| 334 | ZSTD_CCtx *zstream = ZSTD_createCCtx(); | ||
| 335 | ZSTD_inBuffer inbuf; | ||
| 336 | ZSTD_outBuffer outbuf; | ||
| 337 | size_t more_bytes; | ||
| 338 | |||
| 339 | if (!zstream) | ||
| 340 | return; | ||
| 341 | |||
| 342 | /* Setup return vector: the first output chunk of OT_SCRAPE_CHUNK_SIZE bytes. The compression context is released again if this allocation fails. */ | ||
| 343 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 344 | if (!iovector.iov_base) { | ||
| 345 | ZSTD_freeCCtx(zstream); | ||
| 346 | return; | ||
| 347 | } | ||
| 348 | |||
| 349 | /* Working with a compression level 6 is half as fast as level 3, but | ||
| 350 | seems to be the last reasonable bump that's worth extra cpu */ | ||
| 351 | ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6); | ||
| 352 | |||
| 353 | outbuf.dst = iovector.iov_base; | ||
| 354 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 355 | outbuf.pos = 0; | ||
| 356 | |||
| 357 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
| 358 | inbuf.src = (const void *)"d5:filesd"; | ||
| 359 | inbuf.size = strlen("d5:filesd"); | ||
| 360 | inbuf.pos = 0; | ||
| 361 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 362 | } | ||
| 363 | |||
| 364 | /* For each bucket: lock it, compress all of its torrents, unlock it again */ | ||
| 365 | for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) { | ||
| 366 | /* Get exclusive access to that bucket */ | ||
| 367 | ot_vector *torrents_list = mutex_bucket_lock(bucket); | ||
| 368 | ot_torrent *torrents = (ot_torrent *)(torrents_list->data); | ||
| 369 | size_t i; | ||
| 370 | |||
| 371 | /* For each torrent in this bucket: write one entry into a stack buffer and feed exactly the written bytes (r - compress_buffer) to the compressor */ | ||
| 372 | for (i = 0; i < torrents_list->size; ++i) { | ||
| 373 | char compress_buffer[OT_SCRAPE_MAXENTRYLEN]; | ||
| 374 | r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash); | ||
| 375 | inbuf.src = compress_buffer; | ||
| 376 | inbuf.size = r - compress_buffer; | ||
| 377 | inbuf.pos = 0; | ||
| 378 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 379 | |||
| 380 | /* Check if there still is enough buffer left: while fewer than OT_SCRAPE_MAXENTRYLEN bytes remain in the output chunk, push it and continue in a new one. NOTE(review): iov_len is set to outbuf.size rather than outbuf.pos -- confirm the chunk is always completely filled at this point */ | ||
| 381 | while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) { | ||
| 382 | iovector.iov_len = outbuf.size; | ||
| 383 | |||
| 384 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
| 385 | free(iovector.iov_base); | ||
| 386 | ZSTD_freeCCtx(zstream); | ||
| 387 | return mutex_bucket_unlock(bucket, 0); | ||
| 388 | } | ||
| 389 | /* Allocate a fresh output buffer. NOTE(review): the previous buffer is not freed here; presumably a successful mutex_workqueue_pushchunked takes ownership of it -- confirm */ | ||
| 390 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 391 | if (!iovector.iov_base) { | ||
| 392 | fprintf(stderr, "Out of memory trying to claim ouput buffer\n"); | ||
| 393 | ZSTD_freeCCtx(zstream); | ||
| 394 | return mutex_bucket_unlock(bucket, 0); | ||
| 395 | } | ||
| 396 | |||
| 397 | outbuf.dst = iovector.iov_base; | ||
| 398 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 399 | outbuf.pos = 0; | ||
| 400 | |||
| 401 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue); | ||
| 402 | } | ||
| 403 | } | ||
| 404 | |||
| 405 | /* All torrents done: release lock on current bucket */ | ||
| 406 | mutex_bucket_unlock(bucket, 0); | ||
| 407 | |||
| 408 | /* Parent thread died? NOTE(review): this return releases neither zstream nor iovector.iov_base, while the failure paths above and below call ZSTD_freeCCtx -- confirm whether that is acceptable at shutdown */ | ||
| 409 | if (!g_opentracker_running) | ||
| 410 | return; | ||
| 411 | } | ||
| 412 | |||
| 413 | if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) { | ||
| 414 | inbuf.src = (const void *)"ee"; | ||
| 415 | inbuf.size = strlen("ee"); | ||
| 416 | inbuf.pos = 0; | ||
| 417 | } | ||
| 418 | |||
| 419 | more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); /* NOTE(review): for modes other than TASK_FULLSCRAPE inbuf is not reset before this call and still holds whatever the last loop iteration left (nothing at all if no torrent was visited) -- confirm that is intended */ | ||
| 420 | |||
| 421 | iovector.iov_len = outbuf.pos; | ||
| 422 | if (mutex_workqueue_pushchunked(taskid, &iovector)) { | ||
| 423 | free(iovector.iov_base); | ||
| 424 | ZSTD_freeCCtx(zstream); | ||
| 425 | return; | ||
| 426 | } | ||
| 427 | |||
| 428 | /* Check if there's a last batch of data in the zstd buffer, i.e. the ZSTD_e_end call above returned non-zero */ | ||
| 429 | if (more_bytes) { | ||
| 430 | /* Allocate a fresh output buffer for the remaining compressed bytes */ | ||
| 431 | iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE); | ||
| 432 | |||
| 433 | if (!iovector.iov_base) { | ||
| 434 | fprintf(stderr, "Problem with iovec_fix_increase_or_free\n"); | ||
| 435 | ZSTD_freeCCtx(zstream); | ||
| 436 | return; | ||
| 437 | } | ||
| 438 | |||
| 439 | outbuf.dst = iovector.iov_base; | ||
| 440 | outbuf.size = OT_SCRAPE_CHUNK_SIZE; | ||
| 441 | outbuf.pos = 0; | ||
| 442 | |||
| 443 | ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end); | ||
| 444 | |||
| 445 | /* Only pass the new buffer if there actually was some data left in the buffer; it is freed here if it stayed empty or if the push fails */ | ||
| 446 | iovector.iov_len = outbuf.pos; | ||
| 447 | if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector)) | ||
| 448 | free(iovector.iov_base); | ||
| 449 | } | ||
| 450 | |||
| 451 | ZSTD_freeCCtx(zstream); | ||
| 452 | } | ||
| 453 | /* WANT_COMPRESSION_ZSTD */ | ||
| 454 | #endif | ||
| 455 | |||
| 314 | /* WANT_FULLSCRAPE */ | 456 | /* WANT_FULLSCRAPE */ |
| 315 | #endif | 457 | #endif |
| @@ -159,7 +159,9 @@ ssize_t http_sendiovecdata(const int64 sock, struct ot_workstruct *ws, int iovec | |||
| 159 | 159 | ||
| 160 | if (iovec_entries) { | 160 | if (iovec_entries) { |
| 161 | 161 | ||
| 162 | if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) | 162 | if (cookie->flag & STRUCT_HTTP_FLAG_ZSTD) |
| 163 | encoding = "Content-Encoding: zstd\r\n"; | ||
| 164 | else if (cookie->flag & STRUCT_HTTP_FLAG_GZIP) | ||
| 163 | encoding = "Content-Encoding: gzip\r\n"; | 165 | encoding = "Content-Encoding: gzip\r\n"; |
| 164 | else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) | 166 | else if (cookie->flag & STRUCT_HTTP_FLAG_BZIP2) |
| 165 | encoding = "Content-Encoding: bzip2\r\n"; | 167 | encoding = "Content-Encoding: bzip2\r\n"; |
| @@ -369,19 +371,34 @@ static ssize_t http_handle_fullscrape(const int64 sock, struct ot_workstruct *ws | |||
| 369 | } | 371 | } |
| 370 | #endif | 372 | #endif |
| 371 | 373 | ||
| 372 | #ifdef WANT_COMPRESSION_GZIP | 374 | |
| 375 | #if defined(WANT_COMPRESSION_GZIP) || defined(WANT_COMPRESSION_ZSTD) | ||
| 373 | ws->request[ws->request_size - 1] = 0; | 376 | ws->request[ws->request_size - 1] = 0; |
| 374 | #ifndef WANT_COMPRESSION_GZIP_ALWAYS | 377 | #ifdef WANT_COMPRESSION_GZIP |
| 375 | if (strstr(ws->request, "gzip")) { | 378 | if (strstr(ws->request, "gzip")) { |
| 376 | #endif | ||
| 377 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; | 379 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; |
| 378 | format = TASK_FLAG_GZIP; | 380 | format |= TASK_FLAG_GZIP; |
| 379 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST_GZIP, 0, (uintptr_t)cookie->ip); | 381 | } |
| 380 | #ifndef WANT_COMPRESSION_GZIP_ALWAYS | ||
| 381 | } else | ||
| 382 | #endif | 382 | #endif |
| 383 | #ifdef WANT_COMPRESSION_ZSTD | ||
| 384 | if (strstr(ws->request, "zstd")) { | ||
| 385 | cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; | ||
| 386 | format |= TASK_FLAG_ZSTD; | ||
| 387 | } | ||
| 388 | #endif | ||
| 389 | |||
| 390 | #if defined(WANT_COMPRESSION_ZSTD) && defined(WANT_COMPRESSION_ZSTD_ALWAYS) | ||
| 391 | cookie->flag |= STRUCT_HTTP_FLAG_ZSTD; | ||
| 392 | format |= TASK_FLAG_ZSTD; | ||
| 383 | #endif | 393 | #endif |
| 384 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); | 394 | |
| 395 | #if defined(WANT_COMPRESSION_GZIP) && defined(WANT_COMPRESSION_GZIP_ALWAYS) | ||
| 396 | cookie->flag |= STRUCT_HTTP_FLAG_GZIP; | ||
| 397 | format |= TASK_FLAG_GZIP; | ||
| 398 | #endif | ||
| 399 | #endif | ||
| 400 | |||
| 401 | stats_issue_event(EVENT_FULLSCRAPE_REQUEST, 0, (uintptr_t)cookie->ip); | ||
| 385 | 402 | ||
| 386 | #ifdef _DEBUG_HTTPERROR | 403 | #ifdef _DEBUG_HTTPERROR |
| 387 | fprintf(stderr, "%s", ws->debugbuf); | 404 | fprintf(stderr, "%s", ws->debugbuf); |
| @@ -10,8 +10,9 @@ typedef enum { | |||
| 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, | 10 | STRUCT_HTTP_FLAG_WAITINGFORTASK = 1, |
| 11 | STRUCT_HTTP_FLAG_GZIP = 2, | 11 | STRUCT_HTTP_FLAG_GZIP = 2, |
| 12 | STRUCT_HTTP_FLAG_BZIP2 = 4, | 12 | STRUCT_HTTP_FLAG_BZIP2 = 4, |
| 13 | STRUCT_HTTP_FLAG_CHUNKED = 8, | 13 | STRUCT_HTTP_FLAG_ZSTD = 8, |
| 14 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 16 | 14 | STRUCT_HTTP_FLAG_CHUNKED = 16, |
| 15 | STRUCT_HTTP_FLAG_CHUNKED_IN_TRANSFER = 32 | ||
| 15 | } STRUCT_HTTP_FLAG; | 16 | } STRUCT_HTTP_FLAG; |
| 16 | 17 | ||
| 17 | struct http_data { | 18 | struct http_data { |
| @@ -59,7 +59,8 @@ typedef enum { | |||
| 59 | 59 | ||
| 60 | TASK_FLAG_GZIP = 0x1000, | 60 | TASK_FLAG_GZIP = 0x1000, |
| 61 | TASK_FLAG_BZIP2 = 0x2000, | 61 | TASK_FLAG_BZIP2 = 0x2000, |
| 62 | TASK_FLAG_CHUNKED = 0x4000, | 62 | TASK_FLAG_ZSTD = 0x4000, |
| 63 | TASK_FLAG_CHUNKED = 0x8000, | ||
| 63 | 64 | ||
| 64 | TASK_TASK_MASK = 0x0fff, | 65 | TASK_TASK_MASK = 0x0fff, |
| 65 | TASK_CLASS_MASK = 0x0f00, | 66 | TASK_CLASS_MASK = 0x0f00, |
| @@ -19,6 +19,7 @@ typedef enum { | |||
| 19 | EVENT_SCRAPE, | 19 | EVENT_SCRAPE, |
| 20 | EVENT_FULLSCRAPE_REQUEST, | 20 | EVENT_FULLSCRAPE_REQUEST, |
| 21 | EVENT_FULLSCRAPE_REQUEST_GZIP, | 21 | EVENT_FULLSCRAPE_REQUEST_GZIP, |
| 22 | EVENT_FULLSCRAPE_REQUEST_ZSTD, | ||
| 22 | EVENT_FULLSCRAPE, /* TCP only */ | 23 | EVENT_FULLSCRAPE, /* TCP only */ |
| 23 | EVENT_FAILED, | 24 | EVENT_FAILED, |
| 24 | EVENT_BUCKET_LOCKED, | 25 | EVENT_BUCKET_LOCKED, |
