/*
 * Copyright (c) 2017-present, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 */

#include <stdio.h>      /* fprintf */
#include <stdlib.h>     /* malloc, free */
#include <pthread.h>    /* pthread functions */
#include <string.h>     /* memset */
#include "zstd_internal.h"
#include "util.h"
#include "timefn.h"     /* UTIL_time_t, UTIL_getTime, UTIL_getSpanTimeMicro */

#define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
#define PRINT(...) fprintf(stdout, __VA_ARGS__)
#define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
#define FILE_CHUNK_SIZE 4 << 20
#define MAX_NUM_JOBS 2
#define stdinmark "/*stdin*\\"
#define stdoutmark "/*stdout*\\"
#define MAX_PATH 256
#define DEFAULT_DISPLAY_LEVEL 1
#define DEFAULT_COMPRESSION_LEVEL 6
#define MAX_COMPRESSION_LEVEL_CHANGE 2
#define CONVERGENCE_LOWER_BOUND 5
#define CLEVEL_DECREASE_COOLDOWN 5
#define CHANGE_BY_TWO_THRESHOLD 0.1
#define CHANGE_BY_ONE_THRESHOLD 0.65
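
/*
 * Pipeline sizing in brief (derived from the constants above): input is read in
 * FILE_CHUNK_SIZE (4 MiB) chunks, and MAX_NUM_JOBS (2) job slots are reused as a
 * ring buffer, so at most two chunks are in flight at once between the creation,
 * compression, and writer threads.
 */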

#ifndef DEBUG_MODE
static int g_displayLevel = DEFAULT_DISPLAY_LEVEL;
#else
static int g_displayLevel = DEBUG_MODE;
#endif

static unsigned g_compressionLevel = DEFAULT_COMPRESSION_LEVEL;
static UTIL_time_t g_startTime;
static size_t g_streamedSize = 0;
static unsigned g_useProgressBar = 1;
static unsigned g_forceCompressionLevel = 0;
static unsigned g_minCLevel = 1;
static unsigned g_maxCLevel;

typedef struct {
    void* start;
    size_t size;
    size_t capacity;
} buffer_t;

typedef struct {
    size_t filled;
    buffer_t buffer;
} inBuff_t;

typedef struct {
    buffer_t src;
    buffer_t dst;
    unsigned jobID;
    unsigned lastJobPlusOne;
    size_t compressedSize;
    size_t dictSize;
} jobDescription;

typedef struct {
    pthread_mutex_t pMutex;
    int noError;
} mutex_t;

typedef struct {
    pthread_cond_t pCond;
    int noError;
} cond_t;

typedef struct {
    unsigned compressionLevel;
    unsigned numJobs;
    unsigned nextJobID;
    unsigned threadError;

    /*
     * JobIDs for the next jobs to be created, compressed, and written
     */
    unsigned jobReadyID;
    unsigned jobCompressedID;
    unsigned jobWriteID;
    unsigned allJobsCompleted;

    /*
     * counter for how many jobs in a row the compression level has not changed
     * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
     * compression level tries to change (by a non-zero amount), the counter is
     * reset to 0 and the change is not applied
     */
    unsigned convergenceCounter;

    /*
     * cooldown counter in order to prevent rapid successive decreases in compression level
     * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
     * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
     * as long as cooldown != 0, the compression level cannot be decreased
     */
    unsigned cooldown;

    /*
     * XWaitYCompletion
     * Range from 0.0 to 1.0
     * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
     * and thread Y was only XWaitYCompletion done at the time of the wait (i.e. compressWaitWriteCompletion=0.5
     * implies that the compression thread waited on the write thread, which was only 50% finished writing its job)
     */
    double createWaitCompressionCompletion;
    double compressWaitCreateCompletion;
    double compressWaitWriteCompletion;
    double writeWaitCompressionCompletion;

    /*
     * Completion values
     * Range from 0.0 to 1.0
     * Jobs are divided into mini-chunks in order to measure completion
     * these values are updated each time a thread finishes its operation on the
     * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
     */
    double compressionCompletion;
    double writeCompletion;
    double createCompletion;

    mutex_t jobCompressed_mutex;
    cond_t jobCompressed_cond;
    mutex_t jobReady_mutex;
    cond_t jobReady_cond;
    mutex_t allJobsCompleted_mutex;
    cond_t allJobsCompleted_cond;
    mutex_t jobWrite_mutex;
    cond_t jobWrite_cond;
    mutex_t compressionCompletion_mutex;
    mutex_t createCompletion_mutex;
    mutex_t writeCompletion_mutex;
    mutex_t compressionLevel_mutex;
    size_t lastDictSize;
    inBuff_t input;
    jobDescription* jobs;
    ZSTD_CCtx* cctx;
} adaptCCtx;

typedef struct {
    adaptCCtx* ctx;
    FILE* dstFile;
} outputThreadArg;

typedef struct {
    FILE* srcFile;
    adaptCCtx* ctx;
    outputThreadArg* otArg;
} fcResources;
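
/*
 * Thread layout (summary of the code below): performCompression() reads chunks and
 * creates jobs, compressionThread() compresses them, and outputThread() writes the
 * results. The monotonically increasing counters jobReadyID, jobCompressedID and
 * jobWriteID record how far each stage has progressed through the shared job ring.
 */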

static void freeCompressionJobs(adaptCCtx* ctx)
{
    unsigned u;
    for (u=0; u<ctx->numJobs; u++) {
        jobDescription job = ctx->jobs[u];
        free(job.dst.start);
        free(job.src.start);
    }
}

static int destroyMutex(mutex_t* mutex)
{
    if (mutex->noError) {
        int const ret = pthread_mutex_destroy(&mutex->pMutex);
        return ret;
    }
    return 0;
}

static int destroyCond(cond_t* cond)
{
    if (cond->noError) {
        int const ret = pthread_cond_destroy(&cond->pCond);
        return ret;
    }
    return 0;
}

static int freeCCtx(adaptCCtx* ctx)
{
    if (!ctx) return 0;
    {
        int error = 0;
        error |= destroyMutex(&ctx->jobCompressed_mutex);
        error |= destroyCond(&ctx->jobCompressed_cond);
        error |= destroyMutex(&ctx->jobReady_mutex);
        error |= destroyCond(&ctx->jobReady_cond);
        error |= destroyMutex(&ctx->allJobsCompleted_mutex);
        error |= destroyCond(&ctx->allJobsCompleted_cond);
        error |= destroyMutex(&ctx->jobWrite_mutex);
        error |= destroyCond(&ctx->jobWrite_cond);
        error |= destroyMutex(&ctx->compressionCompletion_mutex);
        error |= destroyMutex(&ctx->createCompletion_mutex);
        error |= destroyMutex(&ctx->writeCompletion_mutex);
        error |= destroyMutex(&ctx->compressionLevel_mutex);
        error |= ZSTD_isError(ZSTD_freeCCtx(ctx->cctx));
        free(ctx->input.buffer.start);
        if (ctx->jobs) {
            freeCompressionJobs(ctx);
            free(ctx->jobs);
        }
        free(ctx);
        return error;
    }
}

static int initMutex(mutex_t* mutex)
{
    int const ret = pthread_mutex_init(&mutex->pMutex, NULL);
    mutex->noError = !ret;
    return ret;
}

static int initCond(cond_t* cond)
{
    int const ret = pthread_cond_init(&cond->pCond, NULL);
    cond->noError = !ret;
    return ret;
}

static int initCCtx(adaptCCtx* ctx, unsigned numJobs)
{
    ctx->compressionLevel = g_compressionLevel;
    {
        int pthreadError = 0;
        pthreadError |= initMutex(&ctx->jobCompressed_mutex);
        pthreadError |= initCond(&ctx->jobCompressed_cond);
        pthreadError |= initMutex(&ctx->jobReady_mutex);
        pthreadError |= initCond(&ctx->jobReady_cond);
        pthreadError |= initMutex(&ctx->allJobsCompleted_mutex);
        pthreadError |= initCond(&ctx->allJobsCompleted_cond);
        pthreadError |= initMutex(&ctx->jobWrite_mutex);
        pthreadError |= initCond(&ctx->jobWrite_cond);
        pthreadError |= initMutex(&ctx->compressionCompletion_mutex);
        pthreadError |= initMutex(&ctx->createCompletion_mutex);
        pthreadError |= initMutex(&ctx->writeCompletion_mutex);
        pthreadError |= initMutex(&ctx->compressionLevel_mutex);
        if (pthreadError) return pthreadError;
    }
    ctx->numJobs = numJobs;
    ctx->jobReadyID = 0;
    ctx->jobCompressedID = 0;
    ctx->jobWriteID = 0;
    ctx->lastDictSize = 0;


    ctx->createWaitCompressionCompletion = 1;
    ctx->compressWaitCreateCompletion = 1;
    ctx->compressWaitWriteCompletion = 1;
    ctx->writeWaitCompressionCompletion = 1;
    ctx->createCompletion = 1;
    ctx->writeCompletion = 1;
    ctx->compressionCompletion = 1;
    ctx->convergenceCounter = 0;
    ctx->cooldown = 0;

    ctx->jobs = calloc(1, numJobs*sizeof(jobDescription));

    if (!ctx->jobs) {
        DISPLAY("Error: could not allocate space for jobs during context creation\n");
        return 1;
    }

    /* initializing jobs */
    {
        unsigned jobNum;
        for (jobNum=0; jobNum<numJobs; jobNum++) {
            jobDescription* job = &ctx->jobs[jobNum];
            job->src.start = malloc(2 * FILE_CHUNK_SIZE);
            job->dst.start = malloc(ZSTD_compressBound(FILE_CHUNK_SIZE));
            job->lastJobPlusOne = 0;
            if (!job->src.start || !job->dst.start) {
                DISPLAY("Could not allocate buffers for jobs\n");
                return 1;
            }
            job->src.capacity = FILE_CHUNK_SIZE;
            job->dst.capacity = ZSTD_compressBound(FILE_CHUNK_SIZE);
        }
    }

    ctx->nextJobID = 0;
    ctx->threadError = 0;
    ctx->allJobsCompleted = 0;

    ctx->cctx = ZSTD_createCCtx();
    if (!ctx->cctx) {
        DISPLAY("Error: could not allocate ZSTD_CCtx\n");
        return 1;
    }

    ctx->input.filled = 0;
    ctx->input.buffer.capacity = 2 * FILE_CHUNK_SIZE;

    ctx->input.buffer.start = malloc(ctx->input.buffer.capacity);
    if (!ctx->input.buffer.start) {
        DISPLAY("Error: could not allocate input buffer\n");
        return 1;
    }
    return 0;
}

static adaptCCtx* createCCtx(unsigned numJobs)
{

    adaptCCtx* const ctx = calloc(1, sizeof(adaptCCtx));
    if (ctx == NULL) {
        DISPLAY("Error: could not allocate space for context\n");
        return NULL;
    }
    {
        int const error = initCCtx(ctx, numJobs);
        if (error) {
            freeCCtx(ctx);
            return NULL;
        }
        return ctx;
    }
}

static void signalErrorToThreads(adaptCCtx* ctx)
{
    ctx->threadError = 1;
    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
    pthread_cond_signal(&ctx->jobReady_cond.pCond);
    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

    pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
    pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
    pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

    pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
    pthread_cond_signal(&ctx->jobWrite_cond.pCond);
    pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);

    pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
    pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}

static void waitUntilAllJobsCompleted(adaptCCtx* ctx)
{
    if (!ctx) return;
    pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
    while (ctx->allJobsCompleted == 0 && !ctx->threadError) {
        pthread_cond_wait(&ctx->allJobsCompleted_cond.pCond, &ctx->allJobsCompleted_mutex.pMutex);
    }
    pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
}

/* map completion percentages to values for changing compression level */
static unsigned convertCompletionToChange(double completion)
{
    if (completion < CHANGE_BY_TWO_THRESHOLD) {
        return 2;
    }
    else if (completion < CHANGE_BY_ONE_THRESHOLD) {
        return 1;
    }
    else {
        return 0;
    }
}
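
/*
 * For example: a completion of 0.05 (the lagging thread was only 5% done when the
 * waiter checked) maps to a change of 2 levels, 0.40 maps to 1 level, and anything
 * at or above CHANGE_BY_ONE_THRESHOLD (0.65) maps to no change.
 */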

/*
 * Compression level is changed depending on which part of the compression process is lagging
 * Currently, three threads exist for job creation, compression, and file writing respectively.
 * adaptCompressionLevel() increments or decrements the compression level based on which of the threads is lagging
 * job creation or file writing lag => increased compression level
 * compression thread lag => decreased compression level
 * detecting which thread is lagging is done by recording, whenever a thread is about to wait on another,
 * how far along the other thread was at that moment (the *Completion fields above)
 */
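/*
 * Worked example (values are illustrative): if the write thread had to wait while
 * compression was only 50% done, writeWaitCompressionCompletion is 0.5; with no
 * cooldown pending, convertCompletionToChange(0.5) asks for a 1-level decrease,
 * capped so the level never drops below g_minCLevel, and further decreases are
 * then blocked for CLEVEL_DECREASE_COOLDOWN jobs.
 */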
static void adaptCompressionLevel(adaptCCtx* ctx)
{
    double createWaitCompressionCompletion;
    double compressWaitCreateCompletion;
    double compressWaitWriteCompletion;
    double writeWaitCompressionCompletion;
    double const threshold = 0.00001;
    unsigned prevCompressionLevel;

    pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
    prevCompressionLevel = ctx->compressionLevel;
    pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);


    if (g_forceCompressionLevel) {
        pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
        ctx->compressionLevel = g_compressionLevel;
        pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
        return;
    }


    DEBUG(2, "adapting compression level %u\n", prevCompressionLevel);

    /* read and reset completion measurements */
    pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
    DEBUG(2, "createWaitCompressionCompletion %f\n", ctx->createWaitCompressionCompletion);
    DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx->writeWaitCompressionCompletion);
    createWaitCompressionCompletion = ctx->createWaitCompressionCompletion;
    writeWaitCompressionCompletion = ctx->writeWaitCompressionCompletion;
    pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);

    pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
    DEBUG(2, "compressWaitWriteCompletion %f\n", ctx->compressWaitWriteCompletion);
    compressWaitWriteCompletion = ctx->compressWaitWriteCompletion;
    pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);

    pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
    DEBUG(2, "compressWaitCreateCompletion %f\n", ctx->compressWaitCreateCompletion);
    compressWaitCreateCompletion = ctx->compressWaitCreateCompletion;
    pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
    DEBUG(2, "convergence counter: %u\n", ctx->convergenceCounter);

    assert(g_minCLevel <= prevCompressionLevel && g_maxCLevel >= prevCompressionLevel);

    /* adaptation logic */
    if (ctx->cooldown) ctx->cooldown--;

    if ((1-createWaitCompressionCompletion > threshold || 1-writeWaitCompressionCompletion > threshold) && ctx->cooldown == 0) {
        /* create or write waiting on compression */
        /* use whichever one waited less because it was slower */
        double const completion = MAX(createWaitCompressionCompletion, writeWaitCompressionCompletion);
        unsigned const change = convertCompletionToChange(completion);
        unsigned const boundChange = MIN(change, prevCompressionLevel - g_minCLevel);
        if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
            /* reset convergence counter, might have been a spike */
            ctx->convergenceCounter = 0;
            DEBUG(2, "convergence counter reset, no change applied\n");
        }
        else if (boundChange != 0) {
            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
            ctx->compressionLevel -= boundChange;
            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
            ctx->cooldown = CLEVEL_DECREASE_COOLDOWN;
            ctx->convergenceCounter = 1;

            DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange);
        }
    }
    else if (1-compressWaitWriteCompletion > threshold || 1-compressWaitCreateCompletion > threshold) {
        /* compression waiting on write or create */
        double const completion = MIN(compressWaitWriteCompletion, compressWaitCreateCompletion);
        unsigned const change = convertCompletionToChange(completion);
        unsigned const boundChange = MIN(change, g_maxCLevel - prevCompressionLevel);
        if (ctx->convergenceCounter >= CONVERGENCE_LOWER_BOUND && boundChange != 0) {
            /* reset convergence counter, might have been a spike */
            ctx->convergenceCounter = 0;
            DEBUG(2, "convergence counter reset, no change applied\n");
        }
        else if (boundChange != 0) {
            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
            ctx->compressionLevel += boundChange;
            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
            ctx->cooldown = 0;
            ctx->convergenceCounter = 1;

            DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange);
        }

    }

    pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
    if (ctx->compressionLevel == prevCompressionLevel) {
        ctx->convergenceCounter++;
    }
    pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
}

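/*
 * getUseableDictSize() : how much of the previous chunk can serve as a dictionary for
 * the next job. The usable size is 1 << (windowLog - overlapLog), with overlapLog 3
 * (or 0 at the maximum level); for instance a windowLog of 20 would give
 * 1 << 17 = 128 KiB (numbers shown only to illustrate the formula).
 */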
static size_t getUseableDictSize(unsigned compressionLevel)
{
    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, 0, 0);
    unsigned const overlapLog = compressionLevel >= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
    size_t const overlapSize = 1 << (params.cParams.windowLog - overlapLog);
    return overlapSize;
}

static void* compressionThread(void* arg)
{
    adaptCCtx* const ctx = (adaptCCtx*)arg;
    unsigned currJob = 0;
    for ( ; ; ) {
        unsigned const currJobIndex = currJob % ctx->numJobs;
        jobDescription* const job = &ctx->jobs[currJobIndex];
        DEBUG(2, "starting compression for job %u\n", currJob);

        {
            /* check if compression thread will have to wait */
            unsigned willWaitForCreate = 0;
            unsigned willWaitForWrite = 0;

            pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
            if (currJob + 1 > ctx->jobReadyID) willWaitForCreate = 1;
            pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

            pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
            if (currJob - ctx->jobWriteID >= ctx->numJobs) willWaitForWrite = 1;
            pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);


            pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
            if (willWaitForCreate) {
                DEBUG(2, "compression will wait for create on job %u\n", currJob);
                ctx->compressWaitCreateCompletion = ctx->createCompletion;
                DEBUG(2, "create completion %f\n", ctx->compressWaitCreateCompletion);

            }
            else {
                ctx->compressWaitCreateCompletion = 1;
            }
            pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);

            pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
            if (willWaitForWrite) {
                DEBUG(2, "compression will wait for write on job %u\n", currJob);
                ctx->compressWaitWriteCompletion = ctx->writeCompletion;
                DEBUG(2, "write completion %f\n", ctx->compressWaitWriteCompletion);
            }
            else {
                ctx->compressWaitWriteCompletion = 1;
            }
            pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);

        }

        /* wait until job is ready */
        pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
        while (currJob + 1 > ctx->jobReadyID && !ctx->threadError) {
            pthread_cond_wait(&ctx->jobReady_cond.pCond, &ctx->jobReady_mutex.pMutex);
        }
        pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

        /* wait until job previously in this space is written */
        pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
        while (currJob - ctx->jobWriteID >= ctx->numJobs && !ctx->threadError) {
            pthread_cond_wait(&ctx->jobWrite_cond.pCond, &ctx->jobWrite_mutex.pMutex);
        }
        pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);
        /* reset compression completion */
        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
        ctx->compressionCompletion = 0;
        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);

        /* adapt compression level */
        if (currJob) adaptCompressionLevel(ctx);

        pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
        DEBUG(2, "job %u compressed with level %u\n", currJob, ctx->compressionLevel);
        pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);

        /* compress the data */
        {
            size_t const compressionBlockSize = ZSTD_BLOCKSIZE_MAX; /* 128 KB */
            unsigned cLevel;
            unsigned blockNum = 0;
            size_t remaining = job->src.size;
            size_t srcPos = 0;
            size_t dstPos = 0;

            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
            cLevel = ctx->compressionLevel;
            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);

            /* reset compressed size */
            job->compressedSize = 0;
            DEBUG(2, "calling ZSTD_compressBegin()\n");
            /* begin compression */
            {
                size_t const useDictSize = MIN(getUseableDictSize(cLevel), job->dictSize);
                ZSTD_parameters params = ZSTD_getParams(cLevel, 0, useDictSize);
                params.cParams.windowLog = 23;
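                /* The window log is pinned here, and ZSTD_c_forceMaxWindow is set below, so that
                   every job uses the same window size regardless of its compression level and
                   back-references stay within that window; successive jobs then remain
                   compatible inside the single output frame. */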
                {
                    size_t const initError = ZSTD_compressBegin_advanced(ctx->cctx, job->src.start + job->dictSize - useDictSize, useDictSize, params, 0);
                    size_t const windowSizeError = ZSTD_CCtx_setParameter(ctx->cctx, ZSTD_c_forceMaxWindow, 1);
                    if (ZSTD_isError(initError) || ZSTD_isError(windowSizeError)) {
                        DISPLAY("Error: something went wrong while starting compression\n");
                        signalErrorToThreads(ctx);
                        return arg;
                    }
                }
            }
            DEBUG(2, "finished with ZSTD_compressBegin()\n");

            do {
                size_t const actualBlockSize = MIN(remaining, compressionBlockSize);

                /* continue compression */
                if (currJob != 0 || blockNum != 0) { /* not the first block of the first job: flush/overwrite the frame header */
                    size_t const hSize = ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, 0);
                    if (ZSTD_isError(hSize)) {
                        DISPLAY("Error: something went wrong while continuing compression\n");
                        job->compressedSize = hSize;
                        signalErrorToThreads(ctx);
                        return arg;
                    }
                    ZSTD_invalidateRepCodes(ctx->cctx);
                }
                {
                    size_t const ret = (job->lastJobPlusOne == currJob + 1 && remaining == actualBlockSize) ?
                                            ZSTD_compressEnd     (ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize) :
                                            ZSTD_compressContinue(ctx->cctx, job->dst.start + dstPos, job->dst.capacity - dstPos, job->src.start + job->dictSize + srcPos, actualBlockSize);
                    if (ZSTD_isError(ret)) {
                        DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret));
                        signalErrorToThreads(ctx);
                        return arg;
                    }
                    job->compressedSize += ret;
                    remaining -= actualBlockSize;
                    srcPos += actualBlockSize;
                    dstPos += ret;
                    blockNum++;

                    /* update completion */
                    pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
                    ctx->compressionCompletion = 1 - (double)remaining/job->src.size;
                    pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);
                }
            } while (remaining != 0);
            job->dst.size = job->compressedSize;
        }
        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
        ctx->jobCompressedID++;
        pthread_cond_broadcast(&ctx->jobCompressed_cond.pCond);
        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);
        if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
            /* finished compressing all jobs */
            break;
        }
        DEBUG(2, "finished compressing job %u\n", currJob);
        currJob++;
    }
    return arg;
}

static void displayProgress(unsigned cLevel, unsigned last)
{
    UTIL_time_t currTime = UTIL_getTime();
    if (!g_useProgressBar) return;
    {   double const timeElapsed = (double)(UTIL_getSpanTimeMicro(g_startTime, currTime) / 1000.0);
        double const sizeMB = (double)g_streamedSize / (1 << 20);
        double const avgCompRate = sizeMB * 1000 / timeElapsed;
        fprintf(stderr, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel, timeElapsed/1000.0, sizeMB, avgCompRate);
        if (last) {
            fprintf(stderr, "\n");
        } else {
            fflush(stderr);
    }   }
}

static void* outputThread(void* arg)
{
    outputThreadArg* const otArg = (outputThreadArg*)arg;
    adaptCCtx* const ctx = otArg->ctx;
    FILE* const dstFile = otArg->dstFile;

    unsigned currJob = 0;
    for ( ; ; ) {
        unsigned const currJobIndex = currJob % ctx->numJobs;
        jobDescription* const job = &ctx->jobs[currJobIndex];
        unsigned willWaitForCompress = 0;
        DEBUG(2, "starting write for job %u\n", currJob);

        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
        if (currJob + 1 > ctx->jobCompressedID) willWaitForCompress = 1;
        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);


        pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
        if (willWaitForCompress) {
            /* write thread is waiting on compression thread */
            ctx->writeWaitCompressionCompletion = ctx->compressionCompletion;
            DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob, ctx->writeWaitCompressionCompletion);
        }
        else {
            ctx->writeWaitCompressionCompletion = 1;
        }
        pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);

        pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
        while (currJob + 1 > ctx->jobCompressedID && !ctx->threadError) {
            pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
        }
        pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

        /* reset write completion */
        pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
        ctx->writeCompletion = 0;
        pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);

        {
            size_t const compressedSize = job->compressedSize;
            size_t remaining = compressedSize;
            if (ZSTD_isError(compressedSize)) {
                DISPLAY("Error: an error occurred during compression\n");
                signalErrorToThreads(ctx);
                return arg;
            }
            {
                size_t const blockSize = MAX(compressedSize >> 7, 1 << 10);
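                /* write in small slices (1/128th of the job, at least 1 KiB) so that
                   writeCompletion can be refreshed between fwrite() calls */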
                size_t pos = 0;
                for ( ; ; ) {
                    size_t const writeSize = MIN(remaining, blockSize);
                    size_t const ret = fwrite(job->dst.start + pos, 1, writeSize, dstFile);
                    if (ret != writeSize) break;
                    pos += ret;
                    remaining -= ret;

                    /* update completion variable for writing */
                    pthread_mutex_lock(&ctx->writeCompletion_mutex.pMutex);
                    ctx->writeCompletion = 1 - (double)remaining/compressedSize;
                    pthread_mutex_unlock(&ctx->writeCompletion_mutex.pMutex);

                    if (remaining == 0) break;
                }
                if (pos != compressedSize) {
                    DISPLAY("Error: an error occurred during file write operation\n");
                    signalErrorToThreads(ctx);
                    return arg;
                }
            }
        }
        {
            unsigned cLevel;
            pthread_mutex_lock(&ctx->compressionLevel_mutex.pMutex);
            cLevel = ctx->compressionLevel;
            pthread_mutex_unlock(&ctx->compressionLevel_mutex.pMutex);
            displayProgress(cLevel, job->lastJobPlusOne == currJob + 1);
        }
        pthread_mutex_lock(&ctx->jobWrite_mutex.pMutex);
        ctx->jobWriteID++;
        pthread_cond_signal(&ctx->jobWrite_cond.pCond);
        pthread_mutex_unlock(&ctx->jobWrite_mutex.pMutex);

        if (job->lastJobPlusOne == currJob + 1 || ctx->threadError) {
            /* finished with all jobs */
            pthread_mutex_lock(&ctx->allJobsCompleted_mutex.pMutex);
            ctx->allJobsCompleted = 1;
            pthread_cond_signal(&ctx->allJobsCompleted_cond.pCond);
            pthread_mutex_unlock(&ctx->allJobsCompleted_mutex.pMutex);
            break;
        }
        DEBUG(2, "finished writing job %u\n", currJob);
        currJob++;

    }
    return arg;
}

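/*
 * createCompressionJob() : hands the freshly read chunk over to the compression thread.
 * The shared input buffer and the job's src buffer are swapped rather than copied; unless
 * this is the last job, the chunk is then copied to the head of the new input buffer so
 * that it can serve as the next job's dictionary.
 */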
static int createCompressionJob(adaptCCtx* ctx, size_t srcSize, int last)
{
    unsigned const nextJob = ctx->nextJobID;
    unsigned const nextJobIndex = nextJob % ctx->numJobs;
    jobDescription* const job = &ctx->jobs[nextJobIndex];


    job->src.size = srcSize;
    job->jobID = nextJob;
    if (last) job->lastJobPlusOne = nextJob + 1;
    {
        /* swap buffer */
        void* const copy = job->src.start;
        job->src.start = ctx->input.buffer.start;
        ctx->input.buffer.start = copy;
    }
    job->dictSize = ctx->lastDictSize;

    ctx->nextJobID++;
    /* if not on the last job, reuse data as dictionary in next job */
    if (!last) {
        size_t const oldDictSize = ctx->lastDictSize;
        memcpy(ctx->input.buffer.start, job->src.start + oldDictSize, srcSize);
        ctx->lastDictSize = srcSize;
        ctx->input.filled = srcSize;
    }

    /* signal job ready */
    pthread_mutex_lock(&ctx->jobReady_mutex.pMutex);
    ctx->jobReadyID++;
    pthread_cond_signal(&ctx->jobReady_cond.pCond);
    pthread_mutex_unlock(&ctx->jobReady_mutex.pMutex);

    return 0;
}

static int performCompression(adaptCCtx* ctx, FILE* const srcFile, outputThreadArg* otArg)
{
    /* early error check to exit */
    if (!ctx || !srcFile || !otArg) {
        return 1;
    }

    /* create output thread */
    {
        pthread_t out;
        if (pthread_create(&out, NULL, &outputThread, otArg)) {
            DISPLAY("Error: could not create output thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
        else if (pthread_detach(out)) {
            DISPLAY("Error: could not detach output thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
    }

    /* create compression thread */
    {
        pthread_t compression;
        if (pthread_create(&compression, NULL, &compressionThread, ctx)) {
            DISPLAY("Error: could not create compression thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
        else if (pthread_detach(compression)) {
            DISPLAY("Error: could not detach compression thread\n");
            signalErrorToThreads(ctx);
            return 1;
        }
    }
    {
        unsigned currJob = 0;
        /* creating jobs */
        for ( ; ; ) {
            size_t pos = 0;
            size_t const readBlockSize = 1 << 15;
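            /* read in 32 KiB slices so createCompletion can be updated as the chunk fills */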
            size_t remaining = FILE_CHUNK_SIZE;
            unsigned const nextJob = ctx->nextJobID;
            unsigned willWaitForCompress = 0;
            DEBUG(2, "starting creation of job %u\n", currJob);

            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
            if (nextJob - ctx->jobCompressedID >= ctx->numJobs) willWaitForCompress = 1;
            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

            pthread_mutex_lock(&ctx->compressionCompletion_mutex.pMutex);
            if (willWaitForCompress) {
                /* creation thread is waiting, take measurement of completion */
                ctx->createWaitCompressionCompletion = ctx->compressionCompletion;
                DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob, ctx->createWaitCompressionCompletion);
            }
            else {
                ctx->createWaitCompressionCompletion = 1;
            }
            pthread_mutex_unlock(&ctx->compressionCompletion_mutex.pMutex);

            /* wait until the job has been compressed */
            pthread_mutex_lock(&ctx->jobCompressed_mutex.pMutex);
            while (nextJob - ctx->jobCompressedID >= ctx->numJobs && !ctx->threadError) {
                pthread_cond_wait(&ctx->jobCompressed_cond.pCond, &ctx->jobCompressed_mutex.pMutex);
            }
            pthread_mutex_unlock(&ctx->jobCompressed_mutex.pMutex);

            /* reset create completion */
            pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
            ctx->createCompletion = 0;
            pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);

            while (remaining != 0 && !feof(srcFile)) {
                size_t const ret = fread(ctx->input.buffer.start + ctx->input.filled + pos, 1, readBlockSize, srcFile);
                if (ret != readBlockSize && !feof(srcFile)) {
                    /* error could not read correct number of bytes */
                    DISPLAY("Error: problem occurred during read from src file\n");
                    signalErrorToThreads(ctx);
                    return 1;
                }
                pos += ret;
                remaining -= ret;
                pthread_mutex_lock(&ctx->createCompletion_mutex.pMutex);
                ctx->createCompletion = 1 - (double)remaining/((size_t)FILE_CHUNK_SIZE);
                pthread_mutex_unlock(&ctx->createCompletion_mutex.pMutex);
            }
            if (remaining != 0 && !feof(srcFile)) {
                DISPLAY("Error: problem occurred during read from src file\n");
                signalErrorToThreads(ctx);
                return 1;
            }
            g_streamedSize += pos;
            /* reading was fine, now create the compression job */
            {
                int const last = feof(srcFile);
                int const error = createCompressionJob(ctx, pos, last);
                if (error != 0) {
                    signalErrorToThreads(ctx);
                    return error;
                }
            }
            DEBUG(2, "finished creating job %u\n", currJob);
            currJob++;
            if (feof(srcFile)) {
                break;
            }
        }
    }
    /* success -- created all jobs */
    return 0;
}

static fcResources createFileCompressionResources(const char* const srcFilename, const char* const dstFilenameOrNull)
{
    fcResources fcr;
    unsigned const stdinUsed = !strcmp(srcFilename, stdinmark);
    FILE* const srcFile = stdinUsed ? stdin : fopen(srcFilename, "rb");
    const char* const outFilenameIntermediate = (stdinUsed && !dstFilenameOrNull) ? stdoutmark : dstFilenameOrNull;
    const char* outFilename = outFilenameIntermediate;
    char fileAndSuffix[MAX_PATH];
    size_t const numJobs = MAX_NUM_JOBS;

    memset(&fcr, 0, sizeof(fcr));

    if (!outFilenameIntermediate) {
        if (snprintf(fileAndSuffix, MAX_PATH, "%s.zst", srcFilename) + 1 > MAX_PATH) {
            DISPLAY("Error: output filename is too long\n");
            return fcr;
        }
        outFilename = fileAndSuffix;
    }

    {
        unsigned const stdoutUsed = !strcmp(outFilename, stdoutmark);
        FILE* const dstFile = stdoutUsed ? stdout : fopen(outFilename, "wb");
        fcr.otArg = malloc(sizeof(outputThreadArg));
        if (!fcr.otArg) {
            DISPLAY("Error: could not allocate space for output thread argument\n");
            return fcr;
        }
        fcr.otArg->dstFile = dstFile;
    }
    /* checking for errors */
    if (!fcr.otArg->dstFile || !srcFile) {
        DISPLAY("Error: some file(s) could not be opened\n");
        return fcr;
    }

    /* creating context */
    fcr.ctx = createCCtx(numJobs);
    fcr.otArg->ctx = fcr.ctx;
    fcr.srcFile = srcFile;
    return fcr;
}

static int freeFileCompressionResources(fcResources* fcr)
{
    int ret = 0;
    waitUntilAllJobsCompleted(fcr->ctx);
    ret |= (fcr->srcFile != NULL) ? fclose(fcr->srcFile) : 0;
    ret |= (fcr->ctx != NULL) ? freeCCtx(fcr->ctx) : 0;
    if (fcr->otArg) {
        ret |= (fcr->otArg->dstFile != stdout) ? fclose(fcr->otArg->dstFile) : 0;
        free(fcr->otArg);
        /* no need to freeCCtx() on otArg->ctx because it should be the same context */
    }
    return ret;
}

static int compressFilename(const char* const srcFilename, const char* const dstFilenameOrNull)
{
    int ret = 0;
    fcResources fcr = createFileCompressionResources(srcFilename, dstFilenameOrNull);
    g_streamedSize = 0;
    g_startTime = UTIL_getTime();   /* progress display measures elapsed time from here */
    ret |= performCompression(fcr.ctx, fcr.srcFile, fcr.otArg);
    ret |= freeFileCompressionResources(&fcr);
    return ret;
}

static int compressFilenames(const char** filenameTable, unsigned numFiles, unsigned forceStdout)
{
    int ret = 0;
    unsigned fileNum;
    for (fileNum=0; fileNum<numFiles; fileNum++) {
        const char* filename = filenameTable[fileNum];
        if (!forceStdout) {
            ret |= compressFilename(filename, NULL);
        }
        else {
            ret |= compressFilename(filename, stdoutmark);
        }

    }
    return ret;
}

/*! readU32FromChar() :
    @return : unsigned integer value read from input in `char` format
    allows and interprets K, KB, KiB, M, MB and MiB suffix.
    Will also modify `*stringPtr`, advancing it to position where it stopped reading.
    Note : function result can overflow if digit string > MAX_UINT */
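/* e.g. passing "16K" (an illustrative value) yields 16384 and leaves *stringPtr just past the suffix */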
static unsigned readU32FromChar(const char** stringPtr)
{
    unsigned result = 0;
    while ((**stringPtr >='0') && (**stringPtr <='9'))
        result *= 10, result += **stringPtr - '0', (*stringPtr)++ ;
    if ((**stringPtr=='K') || (**stringPtr=='M')) {
        result <<= 10;
        if (**stringPtr=='M') result <<= 10;
        (*stringPtr)++ ;
        if (**stringPtr=='i') (*stringPtr)++;
        if (**stringPtr=='B') (*stringPtr)++;
    }
    return result;
}

static void help(const char* progPath)
{
    PRINT("Usage:\n");
    PRINT("  %s [options] [file(s)]\n", progPath);
    PRINT("\n");
    PRINT("Options:\n");
    PRINT("  -oFILE : specify the output file name\n");
    PRINT("  -i#    : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL);
    PRINT("  -h     : display help/information\n");
    PRINT("  -f     : force the compression level to stay constant\n");
    PRINT("  -c     : force write to stdout\n");
    PRINT("  -p     : hide progress bar\n");
    PRINT("  -q     : quiet mode -- do not show progress bar or other information\n");
    PRINT("  -l#    : provide lower bound for compression level -- default 1\n");
    PRINT("  -u#    : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
}
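
/*
 * Example invocation (file names are illustrative):
 *   ./adapt big.log -obig.log.zst -i5 -l3 -u12
 * compresses big.log starting at level 5, letting the level drift between 3 and 12.
 */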
/* return 0 if successful, else return error */
int main(int argCount, const char* argv[])
{
    const char* outFilename = NULL;
    const char** filenameTable = (const char**)malloc(argCount*sizeof(const char*));
    unsigned filenameIdx = 0;
    unsigned forceStdout = 0;
    unsigned providedInitialCLevel = 0;
    int ret = 0;
    int argNum;

    if (filenameTable == NULL) {
        DISPLAY("Error: could not allocate space for filename table.\n");
        return 1;
    }
    filenameTable[0] = stdinmark;
    g_maxCLevel = ZSTD_maxCLevel();

    for (argNum=1; argNum<argCount; argNum++) {
        const char* argument = argv[argNum];

        /* output filename designated with "-o" */
        if (argument[0]=='-' && strlen(argument) > 1) {
            switch (argument[1]) {
                case 'o':
                    argument += 2;
                    outFilename = argument;
                    break;
                case 'i':
                    argument += 2;
                    g_compressionLevel = readU32FromChar(&argument);
                    providedInitialCLevel = 1;
                    break;
                case 'h':
                    help(argv[0]);
                    goto _main_exit;
                case 'p':
                    g_useProgressBar = 0;
                    break;
                case 'c':
                    forceStdout = 1;
                    outFilename = stdoutmark;
                    break;
                case 'f':
                    g_forceCompressionLevel = 1;
                    break;
                case 'q':
                    g_useProgressBar = 0;
                    g_displayLevel = 0;
                    break;
                case 'l':
                    argument += 2;
                    g_minCLevel = readU32FromChar(&argument);
                    break;
                case 'u':
                    argument += 2;
                    g_maxCLevel = readU32FromChar(&argument);
                    break;
                default:
                    DISPLAY("Error: invalid argument provided\n");
                    ret = 1;
                    goto _main_exit;
            }
            continue;
        }

        /* regular files to be compressed */
        filenameTable[filenameIdx++] = argument;
    }

    /* check initial, max, and min compression levels */
    {
        unsigned const minMaxInconsistent = g_minCLevel > g_maxCLevel;
        unsigned const initialNotInRange = g_minCLevel > g_compressionLevel || g_maxCLevel < g_compressionLevel;
        if (minMaxInconsistent || (initialNotInRange && providedInitialCLevel)) {
            DISPLAY("Error: provided compression level parameters are invalid\n");
            ret = 1;
            goto _main_exit;
        }
        else if (initialNotInRange) {
            g_compressionLevel = g_minCLevel;
        }
    }

    /* error checking with number of files */
    if (filenameIdx > 1 && (outFilename != NULL && strcmp(outFilename, stdoutmark))) {
        DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
        ret = 1;
        goto _main_exit;
    }

    /* compress files */
    if (filenameIdx <= 1) {
        ret |= compressFilename(filenameTable[0], outFilename);
    }
    else {
        ret |= compressFilenames(filenameTable, filenameIdx, forceStdout);
    }
_main_exit:
    free(filenameTable);
    return ret;
}