From 33bd2c9094e7f90a62cb59cdf5cf670ac58d5308 Mon Sep 17 00:00:00 2001
From: Dirk Engling
Date: Thu, 18 Apr 2024 14:54:34 +0200
Subject: Add support for zstd

---
 ot_fullscrape.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 158 insertions(+), 4 deletions(-)

(limited to 'ot_fullscrape.c')

Reviewer note: amended relative to 33bd2c9. fullscrape_make_zstd now releases
its output buffer and compression context when the tracker shuts down, queues
only the bytes zstd actually produced (outbuf.pos instead of outbuf.size), and
always sets up inbuf before the final ZSTD_e_end call. The blob ids on the
index line below still refer to the original commit.

diff --git a/ot_fullscrape.c b/ot_fullscrape.c
index aed2ad9..6fd6d1c 100644
--- a/ot_fullscrape.c
+++ b/ot_fullscrape.c
@@ -14,6 +14,10 @@
 #ifdef WANT_COMPRESSION_GZIP
 #include <zlib.h>
 #endif
+#ifdef WANT_COMPRESSION_ZSTD
+#include <zstd.h>
+#endif
+
 
 /* Libowfat */
 #include "byte.h"
@@ -40,6 +44,9 @@ static void fullscrape_make(int taskid, ot_tasktype mode);
 #ifdef WANT_COMPRESSION_GZIP
 static void fullscrape_make_gzip(int taskid, ot_tasktype mode);
 #endif
+#ifdef WANT_COMPRESSION_ZSTD
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode);
+#endif
 
 /* Converter function from memory to human readable hex strings
    XXX - Duplicated from ot_stats. Needs fix. */
@@ -64,6 +71,11 @@ static void *fullscrape_worker(void *args) {
   while (g_opentracker_running) {
     ot_tasktype tasktype = TASK_FULLSCRAPE;
     ot_taskid taskid = mutex_workqueue_poptask(&tasktype);
+#ifdef WANT_COMPRESSION_ZSTD
+    if (tasktype & TASK_FLAG_ZSTD)
+      fullscrape_make_zstd(taskid, tasktype);
+    else
+#endif
 #ifdef WANT_COMPRESSION_GZIP
     if (tasktype & TASK_FLAG_GZIP)
       fullscrape_make_gzip(taskid, tasktype);
@@ -205,7 +217,6 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
   struct iovec iovector = {NULL, 0};
   int zres;
   z_stream strm;
-  fprintf(stderr, "GZIP path\n");
   /* Setup return vector... */
   iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
   if (!iovector.iov_base)
@@ -267,8 +278,10 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
     mutex_bucket_unlock(bucket, 0);
 
     /* Parent thread died? */
-    if (!g_opentracker_running)
+    if (!g_opentracker_running) {
+      deflateEnd(&strm);
       return;
+    }
   }
 
   if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
@@ -282,7 +295,8 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
   iovector.iov_len = (char *)strm.next_out - (char *)iovector.iov_base;
   if (mutex_workqueue_pushchunked(taskid, &iovector)) {
     free(iovector.iov_base);
-    return mutex_bucket_unlock(bucket, 0);
+    deflateEnd(&strm);
+    return;
   }
 
   /* Check if there's a last batch of data in the zlib buffer */
@@ -293,7 +307,7 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
     if (!iovector.iov_base) {
       fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
       deflateEnd(&strm);
-      return mutex_bucket_unlock(bucket, 0);
+      return;
     }
     strm.next_out = iovector.iov_base;
     strm.avail_out = OT_SCRAPE_CHUNK_SIZE;
@@ -311,5 +325,145 @@ static void fullscrape_make_gzip(int taskid, ot_tasktype mode) {
 /* WANT_COMPRESSION_GZIP */
 #endif
 
+#ifdef WANT_COMPRESSION_ZSTD
+
+/* Compresses the scrape for taskid with zstd and hands the output to the work
+   queue in buffers of up to OT_SCRAPE_CHUNK_SIZE bytes. The compression
+   context and any buffer not handed over are released on every exit path. */
+static void fullscrape_make_zstd(int taskid, ot_tasktype mode) {
+  int bucket;
+  char *r;
+  struct iovec iovector = {NULL, 0};
+  ZSTD_CCtx *zstream = ZSTD_createCCtx();
+  ZSTD_inBuffer inbuf;
+  ZSTD_outBuffer outbuf;
+  size_t more_bytes;
+
+  if (!zstream)
+    return;
+
+  /* Setup return vector... */
+  iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+  if (!iovector.iov_base) {
+    ZSTD_freeCCtx(zstream);
+    return;
+  }
+
+  /* Working with a compression level 6 is half as fast as level 3, but
+     seems to be the last reasonable bump that's worth extra cpu */
+  ZSTD_CCtx_setParameter(zstream, ZSTD_c_compressionLevel, 6);
+
+  outbuf.dst = iovector.iov_base;
+  outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+  outbuf.pos = 0;
+
+  if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+    inbuf.src = (const void *)"d5:filesd";
+    inbuf.size = strlen("d5:filesd");
+    inbuf.pos = 0;
+    ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+  }
+
+  /* For each bucket... */
+  for (bucket = 0; bucket < OT_BUCKET_COUNT; ++bucket) {
+    /* Get exclusive access to that bucket */
+    ot_vector *torrents_list = mutex_bucket_lock(bucket);
+    ot_torrent *torrents = (ot_torrent *)(torrents_list->data);
+    size_t i;
+
+    /* For each torrent in this bucket.. */
+    for (i = 0; i < torrents_list->size; ++i) {
+      char compress_buffer[OT_SCRAPE_MAXENTRYLEN];
+      r = fullscrape_write_one(mode, compress_buffer, torrents + i, &torrents[i].hash);
+      inbuf.src = compress_buffer;
+      inbuf.size = r - compress_buffer;
+      inbuf.pos = 0;
+      ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+
+      /* Check if there still is enough buffer left */
+      while (outbuf.pos + OT_SCRAPE_MAXENTRYLEN > outbuf.size) {
+        /* Hand over only the bytes zstd actually produced */
+        iovector.iov_len = outbuf.pos;
+
+        if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+          free(iovector.iov_base);
+          ZSTD_freeCCtx(zstream);
+          return mutex_bucket_unlock(bucket, 0);
+        }
+        /* Allocate a fresh output buffer */
+        iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+        if (!iovector.iov_base) {
+          fprintf(stderr, "Out of memory trying to claim ouput buffer\n");
+          ZSTD_freeCCtx(zstream);
+          return mutex_bucket_unlock(bucket, 0);
+        }
+
+        outbuf.dst = iovector.iov_base;
+        outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+        outbuf.pos = 0;
+
+        ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_continue);
+      }
+    }
+
+    /* All torrents done: release lock on current bucket */
+    mutex_bucket_unlock(bucket, 0);
+
+    /* Parent thread died? */
+    if (!g_opentracker_running) {
+      free(iovector.iov_base);
+      ZSTD_freeCCtx(zstream);
+      return;
+    }
+  }
+
+  /* Finish the frame. Only full scrapes carry the bencoded trailer; all other
+     modes end the stream without feeding any further input */
+  if ((mode & TASK_TASK_MASK) == TASK_FULLSCRAPE) {
+    inbuf.src = (const void *)"ee";
+    inbuf.size = strlen("ee");
+  } else {
+    inbuf.src = (const void *)"";
+    inbuf.size = 0;
+  }
+  inbuf.pos = 0;
+
+  more_bytes = ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+  iovector.iov_len = outbuf.pos;
+  if (mutex_workqueue_pushchunked(taskid, &iovector)) {
+    free(iovector.iov_base);
+    ZSTD_freeCCtx(zstream);
+    return;
+  }
+
+  /* Check if there's a last batch of data in the zstd buffer */
+  if (more_bytes) {
+    /* Allocate a fresh output buffer */
+    iovector.iov_base = malloc(OT_SCRAPE_CHUNK_SIZE);
+
+    if (!iovector.iov_base) {
+      fprintf(stderr, "Problem with iovec_fix_increase_or_free\n");
+      ZSTD_freeCCtx(zstream);
+      return;
+    }
+
+    outbuf.dst = iovector.iov_base;
+    outbuf.size = OT_SCRAPE_CHUNK_SIZE;
+    outbuf.pos = 0;
+
+    ZSTD_compressStream2(zstream, &outbuf, &inbuf, ZSTD_e_end);
+
+    /* Only pass the new buffer if there actually was some data left in the buffer */
+    iovector.iov_len = outbuf.pos;
+    if (!iovector.iov_len || mutex_workqueue_pushchunked(taskid, &iovector))
+      free(iovector.iov_base);
+  }
+
+  ZSTD_freeCCtx(zstream);
+}
+/* WANT_COMPRESSION_ZSTD */
+#endif
+
 /* WANT_FULLSCRAPE */
 #endif
-- 
cgit v1.2.3