2 * Copyright (c) 2017-present, Facebook, Inc.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
10 #include <stdio.h> /* fprintf */
11 #include <stdlib.h> /* malloc, free */
12 #include <pthread.h> /* pthread functions */
13 #include <string.h> /* memset */
14 #include "zstd_internal.h"
16 #include "timefn.h" /* UTIL_time_t, UTIL_getTime, UTIL_getSpanTimeMicro */
18 #define DISPLAY(...) fprintf(stderr, __VA_ARGS__)
19 #define PRINT(...) fprintf(stdout, __VA_ARGS__)
20 #define DEBUG(l, ...) { if (g_displayLevel>=l) { DISPLAY(__VA_ARGS__); } }
21 #define FILE_CHUNK_SIZE 4 << 20
22 #define MAX_NUM_JOBS 2
23 #define stdinmark "/*stdin*\\"
24 #define stdoutmark "/*stdout*\\"
26 #define DEFAULT_DISPLAY_LEVEL 1
27 #define DEFAULT_COMPRESSION_LEVEL 6
28 #define MAX_COMPRESSION_LEVEL_CHANGE 2
29 #define CONVERGENCE_LOWER_BOUND 5
30 #define CLEVEL_DECREASE_COOLDOWN 5
31 #define CHANGE_BY_TWO_THRESHOLD 0.1
32 #define CHANGE_BY_ONE_THRESHOLD 0.65
35 static int g_displayLevel
= DEFAULT_DISPLAY_LEVEL
;
37 static int g_displayLevel
= DEBUG_MODE
;
40 static unsigned g_compressionLevel
= DEFAULT_COMPRESSION_LEVEL
;
41 static UTIL_time_t g_startTime
;
42 static size_t g_streamedSize
= 0;
43 static unsigned g_useProgressBar
= 1;
44 static unsigned g_forceCompressionLevel
= 0;
45 static unsigned g_minCLevel
= 1;
46 static unsigned g_maxCLevel
;
63 unsigned lastJobPlusOne
;
64 size_t compressedSize
;
69 pthread_mutex_t pMutex
;
79 unsigned compressionLevel
;
85 * JobIDs for the next jobs to be created, compressed, and written
88 unsigned jobCompressedID
;
90 unsigned allJobsCompleted
;
93 * counter for how many jobs in a row the compression level has not changed
94 * if the counter becomes >= CONVERGENCE_LOWER_BOUND, the next time the
95 * compression level tries to change (by non-zero amount) resets the counter
96 * to 1 and does not apply the change
98 unsigned convergenceCounter
;
101 * cooldown counter in order to prevent rapid successive decreases in compression level
102 * whenever compression level is decreased, cooldown is set to CLEVEL_DECREASE_COOLDOWN
103 * whenever adaptCompressionLevel() is called and cooldown != 0, it is decremented
104 * as long as cooldown != 0, the compression level cannot be decreased
110 * Range from 0.0 to 1.0
111 * if the value is not 1.0, then this implies that thread X waited on thread Y to finish
112 * and thread Y was XWaitYCompletion finished at the time of the wait (i.e. compressWaitWriteCompletion=0.5
113 * implies that the compression thread waited on the write thread and it was only 50% finished writing a job)
115 double createWaitCompressionCompletion
;
116 double compressWaitCreateCompletion
;
117 double compressWaitWriteCompletion
;
118 double writeWaitCompressionCompletion
;
122 * Range from 0.0 to 1.0
123 * Jobs are divided into mini-chunks in order to measure completion
124 * these values are updated each time a thread finishes its operation on the
125 * mini-chunk (i.e. finishes writing out, compressing, etc. this mini-chunk).
127 double compressionCompletion
;
128 double writeCompletion
;
129 double createCompletion
;
131 mutex_t jobCompressed_mutex
;
132 cond_t jobCompressed_cond
;
133 mutex_t jobReady_mutex
;
134 cond_t jobReady_cond
;
135 mutex_t allJobsCompleted_mutex
;
136 cond_t allJobsCompleted_cond
;
137 mutex_t jobWrite_mutex
;
138 cond_t jobWrite_cond
;
139 mutex_t compressionCompletion_mutex
;
140 mutex_t createCompletion_mutex
;
141 mutex_t writeCompletion_mutex
;
142 mutex_t compressionLevel_mutex
;
145 jobDescription
* jobs
;
157 outputThreadArg
* otArg
;
160 static void freeCompressionJobs(adaptCCtx
* ctx
)
163 for (u
=0; u
<ctx
->numJobs
; u
++) {
164 jobDescription job
= ctx
->jobs
[u
];
170 static int destroyMutex(mutex_t
* mutex
)
172 if (mutex
->noError
) {
173 int const ret
= pthread_mutex_destroy(&mutex
->pMutex
);
179 static int destroyCond(cond_t
* cond
)
182 int const ret
= pthread_cond_destroy(&cond
->pCond
);
188 static int freeCCtx(adaptCCtx
* ctx
)
193 error
|= destroyMutex(&ctx
->jobCompressed_mutex
);
194 error
|= destroyCond(&ctx
->jobCompressed_cond
);
195 error
|= destroyMutex(&ctx
->jobReady_mutex
);
196 error
|= destroyCond(&ctx
->jobReady_cond
);
197 error
|= destroyMutex(&ctx
->allJobsCompleted_mutex
);
198 error
|= destroyCond(&ctx
->allJobsCompleted_cond
);
199 error
|= destroyMutex(&ctx
->jobWrite_mutex
);
200 error
|= destroyCond(&ctx
->jobWrite_cond
);
201 error
|= destroyMutex(&ctx
->compressionCompletion_mutex
);
202 error
|= destroyMutex(&ctx
->createCompletion_mutex
);
203 error
|= destroyMutex(&ctx
->writeCompletion_mutex
);
204 error
|= destroyMutex(&ctx
->compressionLevel_mutex
);
205 error
|= ZSTD_isError(ZSTD_freeCCtx(ctx
->cctx
));
206 free(ctx
->input
.buffer
.start
);
208 freeCompressionJobs(ctx
);
216 static int initMutex(mutex_t
* mutex
)
218 int const ret
= pthread_mutex_init(&mutex
->pMutex
, NULL
);
219 mutex
->noError
= !ret
;
223 static int initCond(cond_t
* cond
)
225 int const ret
= pthread_cond_init(&cond
->pCond
, NULL
);
226 cond
->noError
= !ret
;
230 static int initCCtx(adaptCCtx
* ctx
, unsigned numJobs
)
232 ctx
->compressionLevel
= g_compressionLevel
;
234 int pthreadError
= 0;
235 pthreadError
|= initMutex(&ctx
->jobCompressed_mutex
);
236 pthreadError
|= initCond(&ctx
->jobCompressed_cond
);
237 pthreadError
|= initMutex(&ctx
->jobReady_mutex
);
238 pthreadError
|= initCond(&ctx
->jobReady_cond
);
239 pthreadError
|= initMutex(&ctx
->allJobsCompleted_mutex
);
240 pthreadError
|= initCond(&ctx
->allJobsCompleted_cond
);
241 pthreadError
|= initMutex(&ctx
->jobWrite_mutex
);
242 pthreadError
|= initCond(&ctx
->jobWrite_cond
);
243 pthreadError
|= initMutex(&ctx
->compressionCompletion_mutex
);
244 pthreadError
|= initMutex(&ctx
->createCompletion_mutex
);
245 pthreadError
|= initMutex(&ctx
->writeCompletion_mutex
);
246 pthreadError
|= initMutex(&ctx
->compressionLevel_mutex
);
247 if (pthreadError
) return pthreadError
;
249 ctx
->numJobs
= numJobs
;
251 ctx
->jobCompressedID
= 0;
253 ctx
->lastDictSize
= 0;
256 ctx
->createWaitCompressionCompletion
= 1;
257 ctx
->compressWaitCreateCompletion
= 1;
258 ctx
->compressWaitWriteCompletion
= 1;
259 ctx
->writeWaitCompressionCompletion
= 1;
260 ctx
->createCompletion
= 1;
261 ctx
->writeCompletion
= 1;
262 ctx
->compressionCompletion
= 1;
263 ctx
->convergenceCounter
= 0;
266 ctx
->jobs
= calloc(1, numJobs
*sizeof(jobDescription
));
269 DISPLAY("Error: could not allocate space for jobs during context creation\n");
273 /* initializing jobs */
276 for (jobNum
=0; jobNum
<numJobs
; jobNum
++) {
277 jobDescription
* job
= &ctx
->jobs
[jobNum
];
278 job
->src
.start
= malloc(2 * FILE_CHUNK_SIZE
);
279 job
->dst
.start
= malloc(ZSTD_compressBound(FILE_CHUNK_SIZE
));
280 job
->lastJobPlusOne
= 0;
281 if (!job
->src
.start
|| !job
->dst
.start
) {
282 DISPLAY("Could not allocate buffers for jobs\n");
285 job
->src
.capacity
= FILE_CHUNK_SIZE
;
286 job
->dst
.capacity
= ZSTD_compressBound(FILE_CHUNK_SIZE
);
291 ctx
->threadError
= 0;
292 ctx
->allJobsCompleted
= 0;
294 ctx
->cctx
= ZSTD_createCCtx();
296 DISPLAY("Error: could not allocate ZSTD_CCtx\n");
300 ctx
->input
.filled
= 0;
301 ctx
->input
.buffer
.capacity
= 2 * FILE_CHUNK_SIZE
;
303 ctx
->input
.buffer
.start
= malloc(ctx
->input
.buffer
.capacity
);
304 if (!ctx
->input
.buffer
.start
) {
305 DISPLAY("Error: could not allocate input buffer\n");
311 static adaptCCtx
* createCCtx(unsigned numJobs
)
314 adaptCCtx
* const ctx
= calloc(1, sizeof(adaptCCtx
));
316 DISPLAY("Error: could not allocate space for context\n");
320 int const error
= initCCtx(ctx
, numJobs
);
329 static void signalErrorToThreads(adaptCCtx
* ctx
)
331 ctx
->threadError
= 1;
332 pthread_mutex_lock(&ctx
->jobReady_mutex
.pMutex
);
333 pthread_cond_signal(&ctx
->jobReady_cond
.pCond
);
334 pthread_mutex_unlock(&ctx
->jobReady_mutex
.pMutex
);
336 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
337 pthread_cond_broadcast(&ctx
->jobCompressed_cond
.pCond
);
338 pthread_mutex_unlock(&ctx
->jobReady_mutex
.pMutex
);
340 pthread_mutex_lock(&ctx
->jobWrite_mutex
.pMutex
);
341 pthread_cond_signal(&ctx
->jobWrite_cond
.pCond
);
342 pthread_mutex_unlock(&ctx
->jobWrite_mutex
.pMutex
);
344 pthread_mutex_lock(&ctx
->allJobsCompleted_mutex
.pMutex
);
345 pthread_cond_signal(&ctx
->allJobsCompleted_cond
.pCond
);
346 pthread_mutex_unlock(&ctx
->allJobsCompleted_mutex
.pMutex
);
349 static void waitUntilAllJobsCompleted(adaptCCtx
* ctx
)
352 pthread_mutex_lock(&ctx
->allJobsCompleted_mutex
.pMutex
);
353 while (ctx
->allJobsCompleted
== 0 && !ctx
->threadError
) {
354 pthread_cond_wait(&ctx
->allJobsCompleted_cond
.pCond
, &ctx
->allJobsCompleted_mutex
.pMutex
);
356 pthread_mutex_unlock(&ctx
->allJobsCompleted_mutex
.pMutex
);
359 /* map completion percentages to values for changing compression level */
360 static unsigned convertCompletionToChange(double completion
)
362 if (completion
< CHANGE_BY_TWO_THRESHOLD
) {
365 else if (completion
< CHANGE_BY_ONE_THRESHOLD
) {
374 * Compression level is changed depending on which part of the compression process is lagging
375 * Currently, three theads exist for job creation, compression, and file writing respectively.
376 * adaptCompressionLevel() increments or decrements compression level based on which of the threads is lagging
377 * job creation or file writing lag => increased compression level
378 * compression thread lag => decreased compression level
379 * detecting which thread is lagging is done by keeping track of how many calls each thread makes to pthread_cond_wait
381 static void adaptCompressionLevel(adaptCCtx
* ctx
)
383 double createWaitCompressionCompletion
;
384 double compressWaitCreateCompletion
;
385 double compressWaitWriteCompletion
;
386 double writeWaitCompressionCompletion
;
387 double const threshold
= 0.00001;
388 unsigned prevCompressionLevel
;
390 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
391 prevCompressionLevel
= ctx
->compressionLevel
;
392 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
395 if (g_forceCompressionLevel
) {
396 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
397 ctx
->compressionLevel
= g_compressionLevel
;
398 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
403 DEBUG(2, "adapting compression level %u\n", prevCompressionLevel
);
405 /* read and reset completion measurements */
406 pthread_mutex_lock(&ctx
->compressionCompletion_mutex
.pMutex
);
407 DEBUG(2, "createWaitCompressionCompletion %f\n", ctx
->createWaitCompressionCompletion
);
408 DEBUG(2, "writeWaitCompressionCompletion %f\n", ctx
->writeWaitCompressionCompletion
);
409 createWaitCompressionCompletion
= ctx
->createWaitCompressionCompletion
;
410 writeWaitCompressionCompletion
= ctx
->writeWaitCompressionCompletion
;
411 pthread_mutex_unlock(&ctx
->compressionCompletion_mutex
.pMutex
);
413 pthread_mutex_lock(&ctx
->writeCompletion_mutex
.pMutex
);
414 DEBUG(2, "compressWaitWriteCompletion %f\n", ctx
->compressWaitWriteCompletion
);
415 compressWaitWriteCompletion
= ctx
->compressWaitWriteCompletion
;
416 pthread_mutex_unlock(&ctx
->writeCompletion_mutex
.pMutex
);
418 pthread_mutex_lock(&ctx
->createCompletion_mutex
.pMutex
);
419 DEBUG(2, "compressWaitCreateCompletion %f\n", ctx
->compressWaitCreateCompletion
);
420 compressWaitCreateCompletion
= ctx
->compressWaitCreateCompletion
;
421 pthread_mutex_unlock(&ctx
->createCompletion_mutex
.pMutex
);
422 DEBUG(2, "convergence counter: %u\n", ctx
->convergenceCounter
);
424 assert(g_minCLevel
<= prevCompressionLevel
&& g_maxCLevel
>= prevCompressionLevel
);
426 /* adaptation logic */
427 if (ctx
->cooldown
) ctx
->cooldown
--;
429 if ((1-createWaitCompressionCompletion
> threshold
|| 1-writeWaitCompressionCompletion
> threshold
) && ctx
->cooldown
== 0) {
430 /* create or write waiting on compression */
431 /* use whichever one waited less because it was slower */
432 double const completion
= MAX(createWaitCompressionCompletion
, writeWaitCompressionCompletion
);
433 unsigned const change
= convertCompletionToChange(completion
);
434 unsigned const boundChange
= MIN(change
, prevCompressionLevel
- g_minCLevel
);
435 if (ctx
->convergenceCounter
>= CONVERGENCE_LOWER_BOUND
&& boundChange
!= 0) {
436 /* reset convergence counter, might have been a spike */
437 ctx
->convergenceCounter
= 0;
438 DEBUG(2, "convergence counter reset, no change applied\n");
440 else if (boundChange
!= 0) {
441 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
442 ctx
->compressionLevel
-= boundChange
;
443 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
444 ctx
->cooldown
= CLEVEL_DECREASE_COOLDOWN
;
445 ctx
->convergenceCounter
= 1;
447 DEBUG(2, "create or write threads waiting on compression, tried to decrease compression level by %u\n\n", boundChange
);
450 else if (1-compressWaitWriteCompletion
> threshold
|| 1-compressWaitCreateCompletion
> threshold
) {
451 /* compress waiting on write */
452 double const completion
= MIN(compressWaitWriteCompletion
, compressWaitCreateCompletion
);
453 unsigned const change
= convertCompletionToChange(completion
);
454 unsigned const boundChange
= MIN(change
, g_maxCLevel
- prevCompressionLevel
);
455 if (ctx
->convergenceCounter
>= CONVERGENCE_LOWER_BOUND
&& boundChange
!= 0) {
456 /* reset convergence counter, might have been a spike */
457 ctx
->convergenceCounter
= 0;
458 DEBUG(2, "convergence counter reset, no change applied\n");
460 else if (boundChange
!= 0) {
461 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
462 ctx
->compressionLevel
+= boundChange
;
463 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
465 ctx
->convergenceCounter
= 1;
467 DEBUG(2, "compress waiting on write or create, tried to increase compression level by %u\n\n", boundChange
);
472 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
473 if (ctx
->compressionLevel
== prevCompressionLevel
) {
474 ctx
->convergenceCounter
++;
476 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
479 static size_t getUseableDictSize(unsigned compressionLevel
)
481 ZSTD_parameters
const params
= ZSTD_getParams(compressionLevel
, 0, 0);
482 unsigned const overlapLog
= compressionLevel
>= (unsigned)ZSTD_maxCLevel() ? 0 : 3;
483 size_t const overlapSize
= 1 << (params
.cParams
.windowLog
- overlapLog
);
487 static void* compressionThread(void* arg
)
489 adaptCCtx
* const ctx
= (adaptCCtx
*)arg
;
490 unsigned currJob
= 0;
492 unsigned const currJobIndex
= currJob
% ctx
->numJobs
;
493 jobDescription
* const job
= &ctx
->jobs
[currJobIndex
];
494 DEBUG(2, "starting compression for job %u\n", currJob
);
497 /* check if compression thread will have to wait */
498 unsigned willWaitForCreate
= 0;
499 unsigned willWaitForWrite
= 0;
501 pthread_mutex_lock(&ctx
->jobReady_mutex
.pMutex
);
502 if (currJob
+ 1 > ctx
->jobReadyID
) willWaitForCreate
= 1;
503 pthread_mutex_unlock(&ctx
->jobReady_mutex
.pMutex
);
505 pthread_mutex_lock(&ctx
->jobWrite_mutex
.pMutex
);
506 if (currJob
- ctx
->jobWriteID
>= ctx
->numJobs
) willWaitForWrite
= 1;
507 pthread_mutex_unlock(&ctx
->jobWrite_mutex
.pMutex
);
510 pthread_mutex_lock(&ctx
->createCompletion_mutex
.pMutex
);
511 if (willWaitForCreate
) {
512 DEBUG(2, "compression will wait for create on job %u\n", currJob
);
513 ctx
->compressWaitCreateCompletion
= ctx
->createCompletion
;
514 DEBUG(2, "create completion %f\n", ctx
->compressWaitCreateCompletion
);
518 ctx
->compressWaitCreateCompletion
= 1;
520 pthread_mutex_unlock(&ctx
->createCompletion_mutex
.pMutex
);
522 pthread_mutex_lock(&ctx
->writeCompletion_mutex
.pMutex
);
523 if (willWaitForWrite
) {
524 DEBUG(2, "compression will wait for write on job %u\n", currJob
);
525 ctx
->compressWaitWriteCompletion
= ctx
->writeCompletion
;
526 DEBUG(2, "write completion %f\n", ctx
->compressWaitWriteCompletion
);
529 ctx
->compressWaitWriteCompletion
= 1;
531 pthread_mutex_unlock(&ctx
->writeCompletion_mutex
.pMutex
);
535 /* wait until job is ready */
536 pthread_mutex_lock(&ctx
->jobReady_mutex
.pMutex
);
537 while (currJob
+ 1 > ctx
->jobReadyID
&& !ctx
->threadError
) {
538 pthread_cond_wait(&ctx
->jobReady_cond
.pCond
, &ctx
->jobReady_mutex
.pMutex
);
540 pthread_mutex_unlock(&ctx
->jobReady_mutex
.pMutex
);
542 /* wait until job previously in this space is written */
543 pthread_mutex_lock(&ctx
->jobWrite_mutex
.pMutex
);
544 while (currJob
- ctx
->jobWriteID
>= ctx
->numJobs
&& !ctx
->threadError
) {
545 pthread_cond_wait(&ctx
->jobWrite_cond
.pCond
, &ctx
->jobWrite_mutex
.pMutex
);
547 pthread_mutex_unlock(&ctx
->jobWrite_mutex
.pMutex
);
548 /* reset compression completion */
549 pthread_mutex_lock(&ctx
->compressionCompletion_mutex
.pMutex
);
550 ctx
->compressionCompletion
= 0;
551 pthread_mutex_unlock(&ctx
->compressionCompletion_mutex
.pMutex
);
553 /* adapt compression level */
554 if (currJob
) adaptCompressionLevel(ctx
);
556 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
557 DEBUG(2, "job %u compressed with level %u\n", currJob
, ctx
->compressionLevel
);
558 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
560 /* compress the data */
562 size_t const compressionBlockSize
= ZSTD_BLOCKSIZE_MAX
; /* 128 KB */
564 unsigned blockNum
= 0;
565 size_t remaining
= job
->src
.size
;
569 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
570 cLevel
= ctx
->compressionLevel
;
571 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
573 /* reset compressed size */
574 job
->compressedSize
= 0;
575 DEBUG(2, "calling ZSTD_compressBegin()\n");
576 /* begin compression */
578 size_t const useDictSize
= MIN(getUseableDictSize(cLevel
), job
->dictSize
);
579 ZSTD_parameters params
= ZSTD_getParams(cLevel
, 0, useDictSize
);
580 params
.cParams
.windowLog
= 23;
582 size_t const initError
= ZSTD_compressBegin_advanced(ctx
->cctx
, job
->src
.start
+ job
->dictSize
- useDictSize
, useDictSize
, params
, 0);
583 size_t const windowSizeError
= ZSTD_CCtx_setParameter(ctx
->cctx
, ZSTD_c_forceMaxWindow
, 1);
584 if (ZSTD_isError(initError
) || ZSTD_isError(windowSizeError
)) {
585 DISPLAY("Error: something went wrong while starting compression\n");
586 signalErrorToThreads(ctx
);
591 DEBUG(2, "finished with ZSTD_compressBegin()\n");
594 size_t const actualBlockSize
= MIN(remaining
, compressionBlockSize
);
596 /* continue compression */
597 if (currJob
!= 0 || blockNum
!= 0) { /* not first block of first job flush/overwrite the frame header */
598 size_t const hSize
= ZSTD_compressContinue(ctx
->cctx
, job
->dst
.start
+ dstPos
, job
->dst
.capacity
- dstPos
, job
->src
.start
+ job
->dictSize
+ srcPos
, 0);
599 if (ZSTD_isError(hSize
)) {
600 DISPLAY("Error: something went wrong while continuing compression\n");
601 job
->compressedSize
= hSize
;
602 signalErrorToThreads(ctx
);
605 ZSTD_invalidateRepCodes(ctx
->cctx
);
608 size_t const ret
= (job
->lastJobPlusOne
== currJob
+ 1 && remaining
== actualBlockSize
) ?
609 ZSTD_compressEnd (ctx
->cctx
, job
->dst
.start
+ dstPos
, job
->dst
.capacity
- dstPos
, job
->src
.start
+ job
->dictSize
+ srcPos
, actualBlockSize
) :
610 ZSTD_compressContinue(ctx
->cctx
, job
->dst
.start
+ dstPos
, job
->dst
.capacity
- dstPos
, job
->src
.start
+ job
->dictSize
+ srcPos
, actualBlockSize
);
611 if (ZSTD_isError(ret
)) {
612 DISPLAY("Error: something went wrong during compression: %s\n", ZSTD_getErrorName(ret
));
613 signalErrorToThreads(ctx
);
616 job
->compressedSize
+= ret
;
617 remaining
-= actualBlockSize
;
618 srcPos
+= actualBlockSize
;
622 /* update completion */
623 pthread_mutex_lock(&ctx
->compressionCompletion_mutex
.pMutex
);
624 ctx
->compressionCompletion
= 1 - (double)remaining
/job
->src
.size
;
625 pthread_mutex_unlock(&ctx
->compressionCompletion_mutex
.pMutex
);
627 } while (remaining
!= 0);
628 job
->dst
.size
= job
->compressedSize
;
630 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
631 ctx
->jobCompressedID
++;
632 pthread_cond_broadcast(&ctx
->jobCompressed_cond
.pCond
);
633 pthread_mutex_unlock(&ctx
->jobCompressed_mutex
.pMutex
);
634 if (job
->lastJobPlusOne
== currJob
+ 1 || ctx
->threadError
) {
635 /* finished compressing all jobs */
638 DEBUG(2, "finished compressing job %u\n", currJob
);
644 static void displayProgress(unsigned cLevel
, unsigned last
)
646 UTIL_time_t currTime
= UTIL_getTime();
647 if (!g_useProgressBar
) return;
648 { double const timeElapsed
= (double)(UTIL_getSpanTimeMicro(g_startTime
, currTime
) / 1000.0);
649 double const sizeMB
= (double)g_streamedSize
/ (1 << 20);
650 double const avgCompRate
= sizeMB
* 1000 / timeElapsed
;
651 fprintf(stderr
, "\r| Comp. Level: %2u | Time Elapsed: %7.2f s | Data Size: %7.1f MB | Avg Comp. Rate: %6.2f MB/s |", cLevel
, timeElapsed
/1000.0, sizeMB
, avgCompRate
);
653 fprintf(stderr
, "\n");
659 static void* outputThread(void* arg
)
661 outputThreadArg
* const otArg
= (outputThreadArg
*)arg
;
662 adaptCCtx
* const ctx
= otArg
->ctx
;
663 FILE* const dstFile
= otArg
->dstFile
;
665 unsigned currJob
= 0;
667 unsigned const currJobIndex
= currJob
% ctx
->numJobs
;
668 jobDescription
* const job
= &ctx
->jobs
[currJobIndex
];
669 unsigned willWaitForCompress
= 0;
670 DEBUG(2, "starting write for job %u\n", currJob
);
672 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
673 if (currJob
+ 1 > ctx
->jobCompressedID
) willWaitForCompress
= 1;
674 pthread_mutex_unlock(&ctx
->jobCompressed_mutex
.pMutex
);
677 pthread_mutex_lock(&ctx
->compressionCompletion_mutex
.pMutex
);
678 if (willWaitForCompress
) {
679 /* write thread is waiting on compression thread */
680 ctx
->writeWaitCompressionCompletion
= ctx
->compressionCompletion
;
681 DEBUG(2, "writer thread waiting for nextJob: %u, writeWaitCompressionCompletion %f\n", currJob
, ctx
->writeWaitCompressionCompletion
);
684 ctx
->writeWaitCompressionCompletion
= 1;
686 pthread_mutex_unlock(&ctx
->compressionCompletion_mutex
.pMutex
);
688 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
689 while (currJob
+ 1 > ctx
->jobCompressedID
&& !ctx
->threadError
) {
690 pthread_cond_wait(&ctx
->jobCompressed_cond
.pCond
, &ctx
->jobCompressed_mutex
.pMutex
);
692 pthread_mutex_unlock(&ctx
->jobCompressed_mutex
.pMutex
);
694 /* reset write completion */
695 pthread_mutex_lock(&ctx
->writeCompletion_mutex
.pMutex
);
696 ctx
->writeCompletion
= 0;
697 pthread_mutex_unlock(&ctx
->writeCompletion_mutex
.pMutex
);
700 size_t const compressedSize
= job
->compressedSize
;
701 size_t remaining
= compressedSize
;
702 if (ZSTD_isError(compressedSize
)) {
703 DISPLAY("Error: an error occurred during compression\n");
704 signalErrorToThreads(ctx
);
708 size_t const blockSize
= MAX(compressedSize
>> 7, 1 << 10);
711 size_t const writeSize
= MIN(remaining
, blockSize
);
712 size_t const ret
= fwrite(job
->dst
.start
+ pos
, 1, writeSize
, dstFile
);
713 if (ret
!= writeSize
) break;
717 /* update completion variable for writing */
718 pthread_mutex_lock(&ctx
->writeCompletion_mutex
.pMutex
);
719 ctx
->writeCompletion
= 1 - (double)remaining
/compressedSize
;
720 pthread_mutex_unlock(&ctx
->writeCompletion_mutex
.pMutex
);
722 if (remaining
== 0) break;
724 if (pos
!= compressedSize
) {
725 DISPLAY("Error: an error occurred during file write operation\n");
726 signalErrorToThreads(ctx
);
733 pthread_mutex_lock(&ctx
->compressionLevel_mutex
.pMutex
);
734 cLevel
= ctx
->compressionLevel
;
735 pthread_mutex_unlock(&ctx
->compressionLevel_mutex
.pMutex
);
736 displayProgress(cLevel
, job
->lastJobPlusOne
== currJob
+ 1);
738 pthread_mutex_lock(&ctx
->jobWrite_mutex
.pMutex
);
740 pthread_cond_signal(&ctx
->jobWrite_cond
.pCond
);
741 pthread_mutex_unlock(&ctx
->jobWrite_mutex
.pMutex
);
743 if (job
->lastJobPlusOne
== currJob
+ 1 || ctx
->threadError
) {
744 /* finished with all jobs */
745 pthread_mutex_lock(&ctx
->allJobsCompleted_mutex
.pMutex
);
746 ctx
->allJobsCompleted
= 1;
747 pthread_cond_signal(&ctx
->allJobsCompleted_cond
.pCond
);
748 pthread_mutex_unlock(&ctx
->allJobsCompleted_mutex
.pMutex
);
751 DEBUG(2, "finished writing job %u\n", currJob
);
758 static int createCompressionJob(adaptCCtx
* ctx
, size_t srcSize
, int last
)
760 unsigned const nextJob
= ctx
->nextJobID
;
761 unsigned const nextJobIndex
= nextJob
% ctx
->numJobs
;
762 jobDescription
* const job
= &ctx
->jobs
[nextJobIndex
];
765 job
->src
.size
= srcSize
;
766 job
->jobID
= nextJob
;
767 if (last
) job
->lastJobPlusOne
= nextJob
+ 1;
770 void* const copy
= job
->src
.start
;
771 job
->src
.start
= ctx
->input
.buffer
.start
;
772 ctx
->input
.buffer
.start
= copy
;
774 job
->dictSize
= ctx
->lastDictSize
;
777 /* if not on the last job, reuse data as dictionary in next job */
779 size_t const oldDictSize
= ctx
->lastDictSize
;
780 memcpy(ctx
->input
.buffer
.start
, job
->src
.start
+ oldDictSize
, srcSize
);
781 ctx
->lastDictSize
= srcSize
;
782 ctx
->input
.filled
= srcSize
;
785 /* signal job ready */
786 pthread_mutex_lock(&ctx
->jobReady_mutex
.pMutex
);
788 pthread_cond_signal(&ctx
->jobReady_cond
.pCond
);
789 pthread_mutex_unlock(&ctx
->jobReady_mutex
.pMutex
);
794 static int performCompression(adaptCCtx
* ctx
, FILE* const srcFile
, outputThreadArg
* otArg
)
796 /* early error check to exit */
797 if (!ctx
|| !srcFile
|| !otArg
) {
801 /* create output thread */
804 if (pthread_create(&out
, NULL
, &outputThread
, otArg
)) {
805 DISPLAY("Error: could not create output thread\n");
806 signalErrorToThreads(ctx
);
809 else if (pthread_detach(out
)) {
810 DISPLAY("Error: could not detach output thread\n");
811 signalErrorToThreads(ctx
);
816 /* create compression thread */
818 pthread_t compression
;
819 if (pthread_create(&compression
, NULL
, &compressionThread
, ctx
)) {
820 DISPLAY("Error: could not create compression thread\n");
821 signalErrorToThreads(ctx
);
824 else if (pthread_detach(compression
)) {
825 DISPLAY("Error: could not detach compression thread\n");
826 signalErrorToThreads(ctx
);
831 unsigned currJob
= 0;
835 size_t const readBlockSize
= 1 << 15;
836 size_t remaining
= FILE_CHUNK_SIZE
;
837 unsigned const nextJob
= ctx
->nextJobID
;
838 unsigned willWaitForCompress
= 0;
839 DEBUG(2, "starting creation of job %u\n", currJob
);
841 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
842 if (nextJob
- ctx
->jobCompressedID
>= ctx
->numJobs
) willWaitForCompress
= 1;
843 pthread_mutex_unlock(&ctx
->jobCompressed_mutex
.pMutex
);
845 pthread_mutex_lock(&ctx
->compressionCompletion_mutex
.pMutex
);
846 if (willWaitForCompress
) {
847 /* creation thread is waiting, take measurement of completion */
848 ctx
->createWaitCompressionCompletion
= ctx
->compressionCompletion
;
849 DEBUG(2, "create thread waiting for nextJob: %u, createWaitCompressionCompletion %f\n", nextJob
, ctx
->createWaitCompressionCompletion
);
852 ctx
->createWaitCompressionCompletion
= 1;
854 pthread_mutex_unlock(&ctx
->compressionCompletion_mutex
.pMutex
);
856 /* wait until the job has been compressed */
857 pthread_mutex_lock(&ctx
->jobCompressed_mutex
.pMutex
);
858 while (nextJob
- ctx
->jobCompressedID
>= ctx
->numJobs
&& !ctx
->threadError
) {
859 pthread_cond_wait(&ctx
->jobCompressed_cond
.pCond
, &ctx
->jobCompressed_mutex
.pMutex
);
861 pthread_mutex_unlock(&ctx
->jobCompressed_mutex
.pMutex
);
863 /* reset create completion */
864 pthread_mutex_lock(&ctx
->createCompletion_mutex
.pMutex
);
865 ctx
->createCompletion
= 0;
866 pthread_mutex_unlock(&ctx
->createCompletion_mutex
.pMutex
);
868 while (remaining
!= 0 && !feof(srcFile
)) {
869 size_t const ret
= fread(ctx
->input
.buffer
.start
+ ctx
->input
.filled
+ pos
, 1, readBlockSize
, srcFile
);
870 if (ret
!= readBlockSize
&& !feof(srcFile
)) {
871 /* error could not read correct number of bytes */
872 DISPLAY("Error: problem occurred during read from src file\n");
873 signalErrorToThreads(ctx
);
878 pthread_mutex_lock(&ctx
->createCompletion_mutex
.pMutex
);
879 ctx
->createCompletion
= 1 - (double)remaining
/((size_t)FILE_CHUNK_SIZE
);
880 pthread_mutex_unlock(&ctx
->createCompletion_mutex
.pMutex
);
882 if (remaining
!= 0 && !feof(srcFile
)) {
883 DISPLAY("Error: problem occurred during read from src file\n");
884 signalErrorToThreads(ctx
);
887 g_streamedSize
+= pos
;
888 /* reading was fine, now create the compression job */
890 int const last
= feof(srcFile
);
891 int const error
= createCompressionJob(ctx
, pos
, last
);
893 signalErrorToThreads(ctx
);
897 DEBUG(2, "finished creating job %u\n", currJob
);
904 /* success -- created all jobs */
908 static fcResources
createFileCompressionResources(const char* const srcFilename
, const char* const dstFilenameOrNull
)
911 unsigned const stdinUsed
= !strcmp(srcFilename
, stdinmark
);
912 FILE* const srcFile
= stdinUsed
? stdin
: fopen(srcFilename
, "rb");
913 const char* const outFilenameIntermediate
= (stdinUsed
&& !dstFilenameOrNull
) ? stdoutmark
: dstFilenameOrNull
;
914 const char* outFilename
= outFilenameIntermediate
;
915 char fileAndSuffix
[MAX_PATH
];
916 size_t const numJobs
= MAX_NUM_JOBS
;
918 memset(&fcr
, 0, sizeof(fcr
));
920 if (!outFilenameIntermediate
) {
921 if (snprintf(fileAndSuffix
, MAX_PATH
, "%s.zst", srcFilename
) + 1 > MAX_PATH
) {
922 DISPLAY("Error: output filename is too long\n");
925 outFilename
= fileAndSuffix
;
929 unsigned const stdoutUsed
= !strcmp(outFilename
, stdoutmark
);
930 FILE* const dstFile
= stdoutUsed
? stdout
: fopen(outFilename
, "wb");
931 fcr
.otArg
= malloc(sizeof(outputThreadArg
));
933 DISPLAY("Error: could not allocate space for output thread argument\n");
936 fcr
.otArg
->dstFile
= dstFile
;
938 /* checking for errors */
939 if (!fcr
.otArg
->dstFile
|| !srcFile
) {
940 DISPLAY("Error: some file(s) could not be opened\n");
944 /* creating context */
945 fcr
.ctx
= createCCtx(numJobs
);
946 fcr
.otArg
->ctx
= fcr
.ctx
;
947 fcr
.srcFile
= srcFile
;
951 static int freeFileCompressionResources(fcResources
* fcr
)
954 waitUntilAllJobsCompleted(fcr
->ctx
);
955 ret
|= (fcr
->srcFile
!= NULL
) ? fclose(fcr
->srcFile
) : 0;
956 ret
|= (fcr
->ctx
!= NULL
) ? freeCCtx(fcr
->ctx
) : 0;
958 ret
|= (fcr
->otArg
->dstFile
!= stdout
) ? fclose(fcr
->otArg
->dstFile
) : 0;
960 /* no need to freeCCtx() on otArg->ctx because it should be the same context */
965 static int compressFilename(const char* const srcFilename
, const char* const dstFilenameOrNull
)
968 fcResources fcr
= createFileCompressionResources(srcFilename
, dstFilenameOrNull
);
970 ret
|= performCompression(fcr
.ctx
, fcr
.srcFile
, fcr
.otArg
);
971 ret
|= freeFileCompressionResources(&fcr
);
975 static int compressFilenames(const char** filenameTable
, unsigned numFiles
, unsigned forceStdout
)
979 for (fileNum
=0; fileNum
<numFiles
; fileNum
++) {
980 const char* filename
= filenameTable
[fileNum
];
982 ret
|= compressFilename(filename
, NULL
);
985 ret
|= compressFilename(filename
, stdoutmark
);
/*! readU32FromChar() :
    @return : unsigned integer value read from input in `char` format
              allows and interprets K, KB, KiB, M, MB and MiB suffix.
              Will also modify `*stringPtr`, advancing it to position where it stopped reading.
    Note : function result can overflow if digit string > MAX_UINT */
static unsigned readU32FromChar(const char** stringPtr)
{
    const char* p = *stringPtr;
    unsigned value = 0;
    /* accumulate decimal digits */
    while (*p >= '0' && *p <= '9') {
        value = value * 10 + (unsigned)(*p - '0');
        p++;
    }
    /* optional K/M multiplier, with optional "i" and/or "B" trailer */
    if (*p == 'K' || *p == 'M') {
        value <<= 10;                 /* K => x1024 */
        if (*p == 'M') value <<= 10;  /* M => x1024 again */
        p++;
        if (*p == 'i') p++;
        if (*p == 'B') p++;
    }
    *stringPtr = p;
    return value;
}
1012 static void help(const char* progPath
)
1015 PRINT(" %s [options] [file(s)]\n", progPath
);
1017 PRINT("Options:\n");
1018 PRINT(" -oFILE : specify the output file name\n");
1019 PRINT(" -i# : provide initial compression level -- default %d, must be in the range [L, U] where L and U are bound values (see below for defaults)\n", DEFAULT_COMPRESSION_LEVEL
);
1020 PRINT(" -h : display help/information\n");
1021 PRINT(" -f : force the compression level to stay constant\n");
1022 PRINT(" -c : force write to stdout\n");
1023 PRINT(" -p : hide progress bar\n");
1024 PRINT(" -q : quiet mode -- do not show progress bar or other information\n");
1025 PRINT(" -l# : provide lower bound for compression level -- default 1\n");
1026 PRINT(" -u# : provide upper bound for compression level -- default %u\n", ZSTD_maxCLevel());
1028 /* return 0 if successful, else return error */
1029 int main(int argCount
, const char* argv
[])
1031 const char* outFilename
= NULL
;
1032 const char** filenameTable
= (const char**)malloc(argCount
*sizeof(const char*));
1033 unsigned filenameIdx
= 0;
1034 unsigned forceStdout
= 0;
1035 unsigned providedInitialCLevel
= 0;
1038 filenameTable
[0] = stdinmark
;
1039 g_maxCLevel
= ZSTD_maxCLevel();
1041 if (filenameTable
== NULL
) {
1042 DISPLAY("Error: could not allocate sapce for filename table.\n");
1046 for (argNum
=1; argNum
<argCount
; argNum
++) {
1047 const char* argument
= argv
[argNum
];
1049 /* output filename designated with "-o" */
1050 if (argument
[0]=='-' && strlen(argument
) > 1) {
1051 switch (argument
[1]) {
1054 outFilename
= argument
;
1058 g_compressionLevel
= readU32FromChar(&argument
);
1059 providedInitialCLevel
= 1;
1065 g_useProgressBar
= 0;
1069 outFilename
= stdoutmark
;
1072 g_forceCompressionLevel
= 1;
1075 g_useProgressBar
= 0;
1080 g_minCLevel
= readU32FromChar(&argument
);
1084 g_maxCLevel
= readU32FromChar(&argument
);
1087 DISPLAY("Error: invalid argument provided\n");
1094 /* regular files to be compressed */
1095 filenameTable
[filenameIdx
++] = argument
;
1098 /* check initial, max, and min compression levels */
1100 unsigned const minMaxInconsistent
= g_minCLevel
> g_maxCLevel
;
1101 unsigned const initialNotInRange
= g_minCLevel
> g_compressionLevel
|| g_maxCLevel
< g_compressionLevel
;
1102 if (minMaxInconsistent
|| (initialNotInRange
&& providedInitialCLevel
)) {
1103 DISPLAY("Error: provided compression level parameters are invalid\n");
1107 else if (initialNotInRange
) {
1108 g_compressionLevel
= g_minCLevel
;
1112 /* error checking with number of files */
1113 if (filenameIdx
> 1 && (outFilename
!= NULL
&& strcmp(outFilename
, stdoutmark
))) {
1114 DISPLAY("Error: multiple input files provided, cannot use specified output file\n");
1119 /* compress files */
1120 if (filenameIdx
<= 1) {
1121 ret
|= compressFilename(filenameTable
[0], outFilename
);
1124 ret
|= compressFilenames(filenameTable
, filenameIdx
, forceStdout
);
1127 free(filenameTable
);