/*
 * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc.
 * All rights reserved.
 *
 * This source code is licensed under both the BSD-style license (found in the
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
 * in the COPYING file in the root directory of this source tree).
 * You may select, at your option, one of the above-listed licenses.
 */

/* ======   Compiler specifics   ====== */
#if defined(_MSC_VER)
#  pragma warning(disable : 4204)   /* disable: C4204: non-constant aggregate initializer */
#endif

/* ======   Constants   ====== */
#define ZSTDMT_OVERLAPLOG_DEFAULT 0

/* ======   Dependencies   ====== */
#include <string.h>      /* memcpy, memset */
#include <limits.h>      /* INT_MAX, UINT_MAX */
#include "../common/mem.h"         /* MEM_STATIC */
#include "../common/pool.h"        /* threadpool */
#include "../common/threading.h"   /* mutex */
#include "zstd_compress_internal.h"  /* MIN, ERROR, ZSTD_*, ZSTD_highbit32 */
#include "zstd_ldm.h"
#include "zstdmt_compress.h"

/* Guards code to support resizing the SeqPool.
 * We will want to resize the SeqPool to save memory in the future.
 * Until then, comment the code out since it is unused.
 */
#define ZSTD_RESIZE_SEQPOOL 0

/* ======   Debug   ====== */
#if defined(DEBUGLEVEL) && (DEBUGLEVEL>=2) \
    && !defined(_MSC_VER) \
    && !defined(__MINGW32__)

#  include <stdio.h>
#  include <unistd.h>
#  include <sys/times.h>

#  define DEBUG_PRINTHEX(l,p,n) {            \
    unsigned debug_u;                        \
    for (debug_u=0; debug_u<(n); debug_u++)  \
        RAWLOG(l, "%02X ", ((const unsigned char*)(p))[debug_u]); \
    RAWLOG(l, " \n");                        \
}

static unsigned long long GetCurrentClockTimeMicroseconds(void)
{
   static clock_t _ticksPerSecond = 0;
   if (_ticksPerSecond <= 0) _ticksPerSecond = sysconf(_SC_CLK_TCK);

   {   struct tms junk; clock_t newTicks = (clock_t) times(&junk);
       return ((((unsigned long long)newTicks)*(1000000))/_ticksPerSecond);
}  }

#define MUTEX_WAIT_TIME_DLEVEL 6
#define ZSTD_PTHREAD_MUTEX_LOCK(mutex) {          \
    if (DEBUGLEVEL >= MUTEX_WAIT_TIME_DLEVEL) {   \
        unsigned long long const beforeTime = GetCurrentClockTimeMicroseconds(); \
        ZSTD_pthread_mutex_lock(mutex);           \
        {   unsigned long long const afterTime = GetCurrentClockTimeMicroseconds(); \
            unsigned long long const elapsedTime = (afterTime-beforeTime); \
            if (elapsedTime > 1000) {   /* or whatever threshold you prefer; 1 millisecond here */ \
                DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, "Thread took %llu microseconds to acquire mutex %s \n", \
                    elapsedTime, #mutex);         \
        }   }                                     \
    } else {                                      \
        ZSTD_pthread_mutex_lock(mutex);           \
    }                                             \
}
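
/* Note (editorial) : the timed lock above is only compiled in debug builds
 * (DEBUGLEVEL >= 2, non-MSVC). To actually see slow acquisitions, build with
 * e.g. -DDEBUGLEVEL=6 so the DEBUGLOG(MUTEX_WAIT_TIME_DLEVEL, ...) line fires
 * whenever acquiring a mutex takes longer than the 1 ms threshold. */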

#else

#  define ZSTD_PTHREAD_MUTEX_LOCK(m) ZSTD_pthread_mutex_lock(m)
#  define DEBUG_PRINTHEX(l,p,n) {}

#endif

/* =====   Buffer Pool   ===== */
/* a single Buffer Pool can be invoked from multiple threads in parallel */

typedef struct buffer_s {
    void* start;
    size_t capacity;
} buffer_t;

static const buffer_t g_nullBuffer = { NULL, 0 };

typedef struct ZSTDMT_bufferPool_s {
    ZSTD_pthread_mutex_t poolMutex;
    size_t bufferSize;
    unsigned totalBuffers;
    unsigned nbBuffers;
    ZSTD_customMem cMem;
    buffer_t bTable[1];   /* variable size */
} ZSTDMT_bufferPool;

static ZSTDMT_bufferPool* ZSTDMT_createBufferPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
    unsigned const maxNbBuffers = 2*nbWorkers + 3;
    ZSTDMT_bufferPool* const bufPool = (ZSTDMT_bufferPool*)ZSTD_calloc(
        sizeof(ZSTDMT_bufferPool) + (maxNbBuffers-1) * sizeof(buffer_t), cMem);
    if (bufPool==NULL) return NULL;
    if (ZSTD_pthread_mutex_init(&bufPool->poolMutex, NULL)) {
        ZSTD_free(bufPool, cMem);
        return NULL;
    }
    bufPool->bufferSize = 64 KB;
    bufPool->totalBuffers = maxNbBuffers;
    bufPool->nbBuffers = 0;
    bufPool->cMem = cMem;
    return bufPool;
}
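
/* Sizing note (editorial) : maxNbBuffers = 2*nbWorkers + 3 appears to reserve
 * roughly one input and one output buffer per worker, plus a small slack for
 * buffers still held by the caller while flushing. For example, nbWorkers==4
 * gives 11 buffer slots; each slot starts empty and is filled on demand. */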

static void ZSTDMT_freeBufferPool(ZSTDMT_bufferPool* bufPool)
{
    unsigned u;
    DEBUGLOG(3, "ZSTDMT_freeBufferPool (address:%08X)", (U32)(size_t)bufPool);
    if (!bufPool) return;   /* compatibility with free on NULL */
    for (u=0; u<bufPool->totalBuffers; u++) {
        DEBUGLOG(4, "free buffer %2u (address:%08X)", u, (U32)(size_t)bufPool->bTable[u].start);
        ZSTD_free(bufPool->bTable[u].start, bufPool->cMem);
    }
    ZSTD_pthread_mutex_destroy(&bufPool->poolMutex);
    ZSTD_free(bufPool, bufPool->cMem);
}

/* only works at initialization, not during compression */
static size_t ZSTDMT_sizeof_bufferPool(ZSTDMT_bufferPool* bufPool)
{
    size_t const poolSize = sizeof(*bufPool)
                          + (bufPool->totalBuffers - 1) * sizeof(buffer_t);
    unsigned u;
    size_t totalBufferSize = 0;
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    for (u=0; u<bufPool->totalBuffers; u++)
        totalBufferSize += bufPool->bTable[u].capacity;
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);

    return poolSize + totalBufferSize;
}

/* ZSTDMT_setBufferSize() :
 * all future buffers provided by this buffer pool will have _at least_ this size
 * note : it's better for all buffers to have the same size,
 * as they become freely interchangeable, reducing malloc/free usage and memory fragmentation */
static void ZSTDMT_setBufferSize(ZSTDMT_bufferPool* const bufPool, size_t const bSize)
{
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    DEBUGLOG(4, "ZSTDMT_setBufferSize: bSize = %u", (U32)bSize);
    bufPool->bufferSize = bSize;
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
}

static ZSTDMT_bufferPool* ZSTDMT_expandBufferPool(ZSTDMT_bufferPool* srcBufPool, U32 nbWorkers)
{
    unsigned const maxNbBuffers = 2*nbWorkers + 3;
    if (srcBufPool==NULL) return NULL;
    if (srcBufPool->totalBuffers >= maxNbBuffers)  /* good enough */
        return srcBufPool;
    /* need a larger buffer pool */
    {   ZSTD_customMem const cMem = srcBufPool->cMem;
        size_t const bSize = srcBufPool->bufferSize;   /* forward parameters */
        ZSTDMT_bufferPool* newBufPool;
        ZSTDMT_freeBufferPool(srcBufPool);
        newBufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
        if (newBufPool==NULL) return newBufPool;
        ZSTDMT_setBufferSize(newBufPool, bSize);
        return newBufPool;
    }
}

/** ZSTDMT_getBuffer() :
 *  assumption : bufPool must be valid
 * @return : a buffer, with start pointer and size
 *  note: allocation may fail, in this case, start==NULL and size==0 */
static buffer_t ZSTDMT_getBuffer(ZSTDMT_bufferPool* bufPool)
{
    size_t const bSize = bufPool->bufferSize;
    DEBUGLOG(5, "ZSTDMT_getBuffer: bSize = %u", (U32)bufPool->bufferSize);
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers) {   /* try to use an existing buffer */
        buffer_t const buf = bufPool->bTable[--(bufPool->nbBuffers)];
        size_t const availBufferSize = buf.capacity;
        bufPool->bTable[bufPool->nbBuffers] = g_nullBuffer;
        if ((availBufferSize >= bSize) & ((availBufferSize>>3) <= bSize)) {
            /* large enough, but not too much */
            DEBUGLOG(5, "ZSTDMT_getBuffer: provide buffer %u of size %u",
                        bufPool->nbBuffers, (U32)buf.capacity);
            ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
            return buf;
        }
        /* size conditions not respected : scratch this buffer, create new one */
        DEBUGLOG(5, "ZSTDMT_getBuffer: existing buffer does not meet size conditions => freeing");
        ZSTD_free(buf.start, bufPool->cMem);
    }
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
    /* create new buffer */
    DEBUGLOG(5, "ZSTDMT_getBuffer: create a new buffer");
    {   buffer_t buffer;
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer.start = start;   /* note : start can be NULL if malloc fails ! */
        buffer.capacity = (start==NULL) ? 0 : bSize;
        if (start==NULL) {
            DEBUGLOG(5, "ZSTDMT_getBuffer: buffer allocation failure !!");
        } else {
            DEBUGLOG(5, "ZSTDMT_getBuffer: created buffer of size %u", (U32)bSize);
        }
        return buffer;
    }
}
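
/* Reuse condition (editorial) : a pooled buffer is handed out only when
 * bSize <= availBufferSize and availBufferSize is at most roughly 8*bSize,
 * i.e. it is large enough but not grossly oversized. For example, with
 * bufferSize = 1 MB, a stored 2 MB buffer is reused, while a 16 MB one is
 * freed and re-allocated, to avoid hoarding memory after a large job. */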

#if ZSTD_RESIZE_SEQPOOL
/** ZSTDMT_resizeBuffer() :
 *  assumption : bufPool must be valid
 * @return : a buffer that is at least the buffer pool buffer size.
 *           If a reallocation happens, the data in the input buffer is copied.
 */
static buffer_t ZSTDMT_resizeBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buffer)
{
    size_t const bSize = bufPool->bufferSize;
    if (buffer.capacity < bSize) {
        void* const start = ZSTD_malloc(bSize, bufPool->cMem);
        buffer_t newBuffer;
        newBuffer.start = start;
        newBuffer.capacity = start == NULL ? 0 : bSize;
        if (start != NULL) {
            assert(newBuffer.capacity >= buffer.capacity);
            memcpy(newBuffer.start, buffer.start, buffer.capacity);
            DEBUGLOG(5, "ZSTDMT_resizeBuffer: created buffer of size %u", (U32)bSize);
            return newBuffer;
        }
        DEBUGLOG(5, "ZSTDMT_resizeBuffer: buffer allocation failure !!");
    }
    return buffer;
}
#endif

/* store buffer for later re-use, up to pool capacity */
static void ZSTDMT_releaseBuffer(ZSTDMT_bufferPool* bufPool, buffer_t buf)
{
    DEBUGLOG(5, "ZSTDMT_releaseBuffer");
    if (buf.start == NULL) return;   /* compatible with release on NULL */
    ZSTD_pthread_mutex_lock(&bufPool->poolMutex);
    if (bufPool->nbBuffers < bufPool->totalBuffers) {
        bufPool->bTable[bufPool->nbBuffers++] = buf;   /* stored for later use */
        DEBUGLOG(5, "ZSTDMT_releaseBuffer: stored buffer of size %u in slot %u",
                    (U32)buf.capacity, (U32)(bufPool->nbBuffers-1));
        ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
        return;
    }
    ZSTD_pthread_mutex_unlock(&bufPool->poolMutex);
    /* Reached bufferPool capacity (should not happen) */
    DEBUGLOG(5, "ZSTDMT_releaseBuffer: pool capacity reached => freeing ");
    ZSTD_free(buf.start, bufPool->cMem);
}
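
/* Usage sketch (editorial, not part of the library) : a buffer borrowed from
 * the pool must be returned through ZSTDMT_releaseBuffer(), never freed
 * directly, so it can be recycled :
 *
 *     buffer_t const b = ZSTDMT_getBuffer(bufPool);   // may be {NULL,0} on OOM
 *     if (b.start != NULL) {
 *         // ... use up to b.capacity bytes at b.start ...
 *         ZSTDMT_releaseBuffer(bufPool, b);   // stored for reuse, or freed if pool is full
 *     }
 */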

/* =====   Seq Pool Wrapper   ====== */

static rawSeqStore_t kNullRawSeqStore = {NULL, 0, 0, 0};

typedef ZSTDMT_bufferPool ZSTDMT_seqPool;

static size_t ZSTDMT_sizeof_seqPool(ZSTDMT_seqPool* seqPool)
{
    return ZSTDMT_sizeof_bufferPool(seqPool);
}

static rawSeqStore_t bufferToSeq(buffer_t buffer)
{
    rawSeqStore_t seq = {NULL, 0, 0, 0};
    seq.seq = (rawSeq*)buffer.start;
    seq.capacity = buffer.capacity / sizeof(rawSeq);
    return seq;
}

static buffer_t seqToBuffer(rawSeqStore_t seq)
{
    buffer_t buffer;
    buffer.start = seq.seq;
    buffer.capacity = seq.capacity * sizeof(rawSeq);
    return buffer;
}

static rawSeqStore_t ZSTDMT_getSeq(ZSTDMT_seqPool* seqPool)
{
    if (seqPool->bufferSize == 0) {
        return kNullRawSeqStore;
    }
    return bufferToSeq(ZSTDMT_getBuffer(seqPool));
}

#if ZSTD_RESIZE_SEQPOOL
static rawSeqStore_t ZSTDMT_resizeSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
    return bufferToSeq(ZSTDMT_resizeBuffer(seqPool, seqToBuffer(seq)));
}
#endif

static void ZSTDMT_releaseSeq(ZSTDMT_seqPool* seqPool, rawSeqStore_t seq)
{
    ZSTDMT_releaseBuffer(seqPool, seqToBuffer(seq));
}

static void ZSTDMT_setNbSeq(ZSTDMT_seqPool* const seqPool, size_t const nbSeq)
{
    ZSTDMT_setBufferSize(seqPool, nbSeq * sizeof(rawSeq));
}

static ZSTDMT_seqPool* ZSTDMT_createSeqPool(unsigned nbWorkers, ZSTD_customMem cMem)
{
    ZSTDMT_seqPool* const seqPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
    if (seqPool == NULL) return NULL;
    ZSTDMT_setNbSeq(seqPool, 0);
    return seqPool;
}

static void ZSTDMT_freeSeqPool(ZSTDMT_seqPool* seqPool)
{
    ZSTDMT_freeBufferPool(seqPool);
}

static ZSTDMT_seqPool* ZSTDMT_expandSeqPool(ZSTDMT_seqPool* pool, U32 nbWorkers)
{
    return ZSTDMT_expandBufferPool(pool, nbWorkers);
}
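
/* Mapping note (editorial) : a seqPool is just a bufferPool whose "size" is
 * measured in rawSeq units instead of bytes. bufferToSeq() and seqToBuffer()
 * convert between the two views; round-tripping preserves the allocation :
 *
 *     buffer_t const b = ZSTDMT_getBuffer(seqPool);
 *     rawSeqStore_t const s = bufferToSeq(b);   // s.capacity == b.capacity / sizeof(rawSeq)
 *     assert(seqToBuffer(s).start == b.start);  // same underlying memory
 */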

/* =====   CCtx Pool   ===== */
/* a single CCtx Pool can be invoked from multiple threads in parallel */

typedef struct {
    ZSTD_pthread_mutex_t poolMutex;
    int totalCCtx;
    int availCCtx;
    ZSTD_customMem cMem;
    ZSTD_CCtx* cctx[1];   /* variable size */
} ZSTDMT_CCtxPool;

/* note : all CCtx borrowed from the pool should be released back to the pool _before_ freeing the pool */
static void ZSTDMT_freeCCtxPool(ZSTDMT_CCtxPool* pool)
{
    int cid;
    for (cid=0; cid<pool->totalCCtx; cid++)
        ZSTD_freeCCtx(pool->cctx[cid]);   /* note : compatible with free on NULL */
    ZSTD_pthread_mutex_destroy(&pool->poolMutex);
    ZSTD_free(pool, pool->cMem);
}

/* ZSTDMT_createCCtxPool() :
 * implies nbWorkers >= 1 , checked by caller ZSTDMT_createCCtx() */
static ZSTDMT_CCtxPool* ZSTDMT_createCCtxPool(int nbWorkers,
                                              ZSTD_customMem cMem)
{
    ZSTDMT_CCtxPool* const cctxPool = (ZSTDMT_CCtxPool*) ZSTD_calloc(
        sizeof(ZSTDMT_CCtxPool) + (nbWorkers-1)*sizeof(ZSTD_CCtx*), cMem);
    assert(nbWorkers > 0);
    if (!cctxPool) return NULL;
    if (ZSTD_pthread_mutex_init(&cctxPool->poolMutex, NULL)) {
        ZSTD_free(cctxPool, cMem);
        return NULL;
    }
    cctxPool->cMem = cMem;
    cctxPool->totalCCtx = nbWorkers;
    cctxPool->availCCtx = 1;   /* at least one cctx for single-thread mode */
    cctxPool->cctx[0] = ZSTD_createCCtx_advanced(cMem);
    if (!cctxPool->cctx[0]) { ZSTDMT_freeCCtxPool(cctxPool); return NULL; }
    DEBUGLOG(3, "cctxPool created, with %u workers", nbWorkers);
    return cctxPool;
}

static ZSTDMT_CCtxPool* ZSTDMT_expandCCtxPool(ZSTDMT_CCtxPool* srcPool,
                                              int nbWorkers)
{
    if (srcPool==NULL) return NULL;
    if (nbWorkers <= srcPool->totalCCtx) return srcPool;   /* good enough */
    /* need a larger cctx pool */
    {   ZSTD_customMem const cMem = srcPool->cMem;
        ZSTDMT_freeCCtxPool(srcPool);
        return ZSTDMT_createCCtxPool(nbWorkers, cMem);
    }
}

/* only works during initialization phase, not during compression */
static size_t ZSTDMT_sizeof_CCtxPool(ZSTDMT_CCtxPool* cctxPool)
{
    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
    {   unsigned const nbWorkers = cctxPool->totalCCtx;
        size_t const poolSize = sizeof(*cctxPool)
                              + (nbWorkers-1) * sizeof(ZSTD_CCtx*);
        unsigned u;
        size_t totalCCtxSize = 0;
        for (u=0; u<nbWorkers; u++) {
            totalCCtxSize += ZSTD_sizeof_CCtx(cctxPool->cctx[u]);
        }
        ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
        assert(nbWorkers > 0);
        return poolSize + totalCCtxSize;
    }
}

static ZSTD_CCtx* ZSTDMT_getCCtx(ZSTDMT_CCtxPool* cctxPool)
{
    DEBUGLOG(5, "ZSTDMT_getCCtx");
    ZSTD_pthread_mutex_lock(&cctxPool->poolMutex);
    if (cctxPool->availCCtx) {
        cctxPool->availCCtx--;
        {   ZSTD_CCtx* const cctx = cctxPool->cctx[cctxPool->availCCtx];
            ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
            return cctx;
    }   }
    ZSTD_pthread_mutex_unlock(&cctxPool->poolMutex);
    DEBUGLOG(5, "create one more CCtx");
    return ZSTD_createCCtx_advanced(cctxPool->cMem);   /* note : can be NULL, when creation fails ! */
}

static void ZSTDMT_releaseCCtx(ZSTDMT_CCtxPool* pool, ZSTD_CCtx* cctx)
{
    if (cctx==NULL) return;   /* compatibility with release on NULL */
    ZSTD_pthread_mutex_lock(&pool->poolMutex);
    if (pool->availCCtx < pool->totalCCtx)
        pool->cctx[pool->availCCtx++] = cctx;
    else {
        /* pool overflow : should not happen, since totalCCtx==nbWorkers */
        DEBUGLOG(4, "CCtx pool overflow : free cctx");
        ZSTD_freeCCtx(cctx);
    }
    ZSTD_pthread_mutex_unlock(&pool->poolMutex);
}
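
/* Usage sketch (editorial, not part of the library) : workers borrow a CCtx
 * and must hand it back, mirroring the buffer pool discipline :
 *
 *     ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(cctxPool);   // may be NULL on OOM
 *     if (cctx != NULL) {
 *         // ... compress with cctx ...
 *         ZSTDMT_releaseCCtx(cctxPool, cctx);   // recycled for the next job
 *     }
 */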

/* ====   Serial State   ==== */

typedef struct {
    void const* start;
    size_t size;
} range_t;

typedef struct {
    /* All variables in the struct are protected by mutex. */
    ZSTD_pthread_mutex_t mutex;
    ZSTD_pthread_cond_t cond;
    ZSTD_CCtx_params params;
    ldmState_t ldmState;
    XXH64_state_t xxhState;
    unsigned nextJobID;
    /* Protects ldmWindow.
     * Must be acquired after the main mutex when acquiring both.
     */
    ZSTD_pthread_mutex_t ldmWindowMutex;
    ZSTD_pthread_cond_t ldmWindowCond;   /* Signaled when ldmWindow is updated */
    ZSTD_window_t ldmWindow;             /* A thread-safe copy of ldmState.window */
} serialState_t;

static int
ZSTDMT_serialState_reset(serialState_t* serialState,
                         ZSTDMT_seqPool* seqPool,
                         ZSTD_CCtx_params params,
                         size_t jobSize,
                         const void* dict, size_t const dictSize,
                         ZSTD_dictContentType_e dictContentType)
{
    /* Adjust parameters */
    if (params.ldmParams.enableLdm) {
        DEBUGLOG(4, "LDM window size = %u KB", (1U << params.cParams.windowLog) >> 10);
        ZSTD_ldm_adjustParameters(&params.ldmParams, &params.cParams);
        assert(params.ldmParams.hashLog >= params.ldmParams.bucketSizeLog);
        assert(params.ldmParams.hashRateLog < 32);
        serialState->ldmState.hashPower =
                ZSTD_rollingHash_primePower(params.ldmParams.minMatchLength);
    } else {
        memset(&params.ldmParams, 0, sizeof(params.ldmParams));
    }
    serialState->nextJobID = 0;
    if (params.fParams.checksumFlag)
        XXH64_reset(&serialState->xxhState, 0);
    if (params.ldmParams.enableLdm) {
        ZSTD_customMem cMem = params.customMem;
        unsigned const hashLog = params.ldmParams.hashLog;
        size_t const hashSize = ((size_t)1 << hashLog) * sizeof(ldmEntry_t);
        unsigned const bucketLog =
            params.ldmParams.hashLog - params.ldmParams.bucketSizeLog;
        size_t const bucketSize = (size_t)1 << bucketLog;
        unsigned const prevBucketLog =
            serialState->params.ldmParams.hashLog -
            serialState->params.ldmParams.bucketSizeLog;
        /* Size the seq pool tables */
        ZSTDMT_setNbSeq(seqPool, ZSTD_ldm_getMaxNbSeq(params.ldmParams, jobSize));
        /* Reset the window */
        ZSTD_window_init(&serialState->ldmState.window);
        /* Resize tables and output space if necessary. */
        if (serialState->ldmState.hashTable == NULL || serialState->params.ldmParams.hashLog < hashLog) {
            ZSTD_free(serialState->ldmState.hashTable, cMem);
            serialState->ldmState.hashTable = (ldmEntry_t*)ZSTD_malloc(hashSize, cMem);
        }
        if (serialState->ldmState.bucketOffsets == NULL || prevBucketLog < bucketLog) {
            ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
            serialState->ldmState.bucketOffsets = (BYTE*)ZSTD_malloc(bucketSize, cMem);
        }
        if (!serialState->ldmState.hashTable || !serialState->ldmState.bucketOffsets)
            return 1;
        /* Zero the tables */
        memset(serialState->ldmState.hashTable, 0, hashSize);
        memset(serialState->ldmState.bucketOffsets, 0, bucketSize);

        /* Update window state and fill hash table with dict */
        serialState->ldmState.loadedDictEnd = 0;
        if (dictSize > 0) {
            if (dictContentType == ZSTD_dct_rawContent) {
                BYTE const* const dictEnd = (const BYTE*)dict + dictSize;
                ZSTD_window_update(&serialState->ldmState.window, dict, dictSize);
                ZSTD_ldm_fillHashTable(&serialState->ldmState, (const BYTE*)dict, dictEnd, &params.ldmParams);
                serialState->ldmState.loadedDictEnd = params.forceWindow ? 0 : (U32)(dictEnd - serialState->ldmState.window.base);
            } else {
                /* don't even load anything */
            }
        }

        /* Initialize serialState's copy of ldmWindow. */
        serialState->ldmWindow = serialState->ldmState.window;
    }

    serialState->params = params;
    serialState->params.jobSize = (U32)jobSize;
    return 0;
}

static int ZSTDMT_serialState_init(serialState_t* serialState)
{
    int initError = 0;
    memset(serialState, 0, sizeof(*serialState));
    initError |= ZSTD_pthread_mutex_init(&serialState->mutex, NULL);
    initError |= ZSTD_pthread_cond_init(&serialState->cond, NULL);
    initError |= ZSTD_pthread_mutex_init(&serialState->ldmWindowMutex, NULL);
    initError |= ZSTD_pthread_cond_init(&serialState->ldmWindowCond, NULL);
    return initError;
}

static void ZSTDMT_serialState_free(serialState_t* serialState)
{
    ZSTD_customMem cMem = serialState->params.customMem;
    ZSTD_pthread_mutex_destroy(&serialState->mutex);
    ZSTD_pthread_cond_destroy(&serialState->cond);
    ZSTD_pthread_mutex_destroy(&serialState->ldmWindowMutex);
    ZSTD_pthread_cond_destroy(&serialState->ldmWindowCond);
    ZSTD_free(serialState->ldmState.hashTable, cMem);
    ZSTD_free(serialState->ldmState.bucketOffsets, cMem);
}

static void ZSTDMT_serialState_update(serialState_t* serialState,
                                      ZSTD_CCtx* jobCCtx, rawSeqStore_t seqStore,
                                      range_t src, unsigned jobID)
{
    /* Wait for our turn */
    ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
    while (serialState->nextJobID < jobID) {
        DEBUGLOG(5, "wait for serialState->cond");
        ZSTD_pthread_cond_wait(&serialState->cond, &serialState->mutex);
    }
    /* A future job may error and skip our job */
    if (serialState->nextJobID == jobID) {
        /* It is now our turn, do any processing necessary */
        if (serialState->params.ldmParams.enableLdm) {
            size_t error;
            assert(seqStore.seq != NULL && seqStore.pos == 0 &&
                   seqStore.size == 0 && seqStore.capacity > 0);
            assert(src.size <= serialState->params.jobSize);
            ZSTD_window_update(&serialState->ldmState.window, src.start, src.size);
            error = ZSTD_ldm_generateSequences(
                &serialState->ldmState, &seqStore,
                &serialState->params.ldmParams, src.start, src.size);
            /* We provide a large enough buffer to never fail. */
            assert(!ZSTD_isError(error)); (void)error;
            /* Update ldmWindow to match the ldmState.window and signal the main
             * thread if it is waiting for a buffer.
             */
            ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
            serialState->ldmWindow = serialState->ldmState.window;
            ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
            ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
        }
        if (serialState->params.fParams.checksumFlag && src.size > 0)
            XXH64_update(&serialState->xxhState, src.start, src.size);
    }
    /* Now it is the next job's turn */
    serialState->nextJobID++;
    ZSTD_pthread_cond_broadcast(&serialState->cond);
    ZSTD_pthread_mutex_unlock(&serialState->mutex);

    if (seqStore.size > 0) {
        size_t const err = ZSTD_referenceExternalSequences(
            jobCCtx, seqStore.seq, seqStore.size);
        assert(serialState->params.ldmParams.enableLdm);
        assert(!ZSTD_isError(err));
        (void)err;
    }
}

static void ZSTDMT_serialState_ensureFinished(serialState_t* serialState,
                                              unsigned jobID, size_t cSize)
{
    ZSTD_PTHREAD_MUTEX_LOCK(&serialState->mutex);
    if (serialState->nextJobID <= jobID) {
        assert(ZSTD_isError(cSize)); (void)cSize;
        DEBUGLOG(5, "Skipping past job %u because of error", jobID);
        serialState->nextJobID = jobID + 1;
        ZSTD_pthread_cond_broadcast(&serialState->cond);

        ZSTD_PTHREAD_MUTEX_LOCK(&serialState->ldmWindowMutex);
        ZSTD_window_clear(&serialState->ldmWindow);
        ZSTD_pthread_cond_signal(&serialState->ldmWindowCond);
        ZSTD_pthread_mutex_unlock(&serialState->ldmWindowMutex);
    }
    ZSTD_pthread_mutex_unlock(&serialState->mutex);
}
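
/* Protocol note (editorial) : workers call ZSTDMT_serialState_update() in
 * strict jobID order, so the serial section (LDM sequence generation + frame
 * checksum) sees the input as one continuous stream. Every job also calls
 * ZSTDMT_serialState_ensureFinished() on its way out; if the job errored
 * before taking its serial turn, this advances nextJobID past it so later
 * jobs are not blocked forever on serialState->cond. */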

/* ------------------------------------------ */
/* =====          Worker thread          ===== */
/* ------------------------------------------ */

static const range_t kNullRange = { NULL, 0 };

typedef struct {
    size_t   consumed;                   /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx */
    size_t   cSize;                      /* SHARED - set0 by mtctx, then modified by worker AND read by mtctx, then set0 by mtctx */
    ZSTD_pthread_mutex_t job_mutex;      /* Thread-safe - used by mtctx and worker */
    ZSTD_pthread_cond_t job_cond;        /* Thread-safe - used by mtctx and worker */
    ZSTDMT_CCtxPool* cctxPool;           /* Thread-safe - used by mtctx and (all) workers */
    ZSTDMT_bufferPool* bufPool;          /* Thread-safe - used by mtctx and (all) workers */
    ZSTDMT_seqPool* seqPool;             /* Thread-safe - used by mtctx and (all) workers */
    serialState_t* serial;               /* Thread-safe - used by mtctx and (all) workers */
    buffer_t dstBuff;                    /* set by worker (or mtctx), then read by worker & mtctx, then modified by mtctx => no barrier */
    range_t prefix;                      /* set by mtctx, then read by worker & mtctx => no barrier */
    range_t src;                         /* set by mtctx, then read by worker & mtctx => no barrier */
    unsigned jobID;                      /* set by mtctx, then read by worker => no barrier */
    unsigned firstJob;                   /* set by mtctx, then read by worker => no barrier */
    unsigned lastJob;                    /* set by mtctx, then read by worker => no barrier */
    ZSTD_CCtx_params params;             /* set by mtctx, then read by worker => no barrier */
    const ZSTD_CDict* cdict;             /* set by mtctx, then read by worker => no barrier */
    unsigned long long fullFrameSize;    /* set by mtctx, then read by worker => no barrier */
    size_t   dstFlushed;                 /* used only by mtctx */
    unsigned frameChecksumNeeded;        /* used only by mtctx */
} ZSTDMT_jobDescription;

#define JOB_ERROR(e) {                          \
    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);   \
    job->cSize = e;                             \
    ZSTD_pthread_mutex_unlock(&job->job_mutex); \
    goto _endJob;                               \
}
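
/* Note (editorial) : JOB_ERROR(e) publishes the error code through job->cSize
 * under job_mutex, then jumps to the _endJob cleanup label below; the owner
 * of the job later detects failure by testing job->cSize with ZSTD_isError(). */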

/* ZSTDMT_compressionJob() is a POOL_function type */
static void ZSTDMT_compressionJob(void* jobDescription)
{
    ZSTDMT_jobDescription* const job = (ZSTDMT_jobDescription*)jobDescription;
    ZSTD_CCtx_params jobParams = job->params;   /* do not modify job->params ! copy it, modify the copy */
    ZSTD_CCtx* const cctx = ZSTDMT_getCCtx(job->cctxPool);
    rawSeqStore_t rawSeqStore = ZSTDMT_getSeq(job->seqPool);
    buffer_t dstBuff = job->dstBuff;
    size_t lastCBlockSize = 0;

    /* resources */
    if (cctx==NULL) JOB_ERROR(ERROR(memory_allocation));
    if (dstBuff.start == NULL) {   /* streaming job : doesn't provide a dstBuffer */
        dstBuff = ZSTDMT_getBuffer(job->bufPool);
        if (dstBuff.start==NULL) JOB_ERROR(ERROR(memory_allocation));
        job->dstBuff = dstBuff;   /* this value can be read in ZSTDMT_flush, when it copies the whole job */
    }
    if (jobParams.ldmParams.enableLdm && rawSeqStore.seq == NULL)
        JOB_ERROR(ERROR(memory_allocation));

    /* Don't compute the checksum for chunks, since we compute it externally,
     * but write it in the header.
     */
    if (job->jobID != 0) jobParams.fParams.checksumFlag = 0;
    /* Don't run LDM for the chunks, since we handle it externally */
    jobParams.ldmParams.enableLdm = 0;

    /* init */
    if (job->cdict) {
        size_t const initError = ZSTD_compressBegin_advanced_internal(cctx, NULL, 0, ZSTD_dct_auto, ZSTD_dtlm_fast, job->cdict, &jobParams, job->fullFrameSize);
        assert(job->firstJob);   /* only allowed for first job */
        if (ZSTD_isError(initError)) JOB_ERROR(initError);
    } else {   /* srcStart points at reloaded section */
        U64 const pledgedSrcSize = job->firstJob ? job->fullFrameSize : job->src.size;
        {   size_t const forceWindowError = ZSTD_CCtxParams_setParameter(&jobParams, ZSTD_c_forceMaxWindow, !job->firstJob);
            if (ZSTD_isError(forceWindowError)) JOB_ERROR(forceWindowError);
        }
        {   size_t const initError = ZSTD_compressBegin_advanced_internal(cctx,
                        job->prefix.start, job->prefix.size, ZSTD_dct_rawContent, /* load dictionary in "content-only" mode (no header analysis) */
                        ZSTD_dtlm_fast,
                        NULL, /*cdict*/
                        &jobParams, pledgedSrcSize);
            if (ZSTD_isError(initError)) JOB_ERROR(initError);
    }   }

    /* Perform serial step as early as possible, but after CCtx initialization */
    ZSTDMT_serialState_update(job->serial, cctx, rawSeqStore, job->src, job->jobID);

    if (!job->firstJob) {   /* flush and overwrite frame header when it's not first job */
        size_t const hSize = ZSTD_compressContinue(cctx, dstBuff.start, dstBuff.capacity, job->src.start, 0);
        if (ZSTD_isError(hSize)) JOB_ERROR(hSize);
        DEBUGLOG(5, "ZSTDMT_compressionJob: flush and overwrite %u bytes of frame header (not first job)", (U32)hSize);
        ZSTD_invalidateRepCodes(cctx);
    }

    /* compress */
    {   size_t const chunkSize = 4*ZSTD_BLOCKSIZE_MAX;
        int const nbChunks = (int)((job->src.size + (chunkSize-1)) / chunkSize);
        const BYTE* ip = (const BYTE*) job->src.start;
        BYTE* const ostart = (BYTE*)dstBuff.start;
        BYTE* op = ostart;
        BYTE* oend = op + dstBuff.capacity;
        int chunkNb;
        if (sizeof(size_t) > sizeof(int)) assert(job->src.size < ((size_t)INT_MAX) * chunkSize);   /* check overflow */
        DEBUGLOG(5, "ZSTDMT_compressionJob: compress %u bytes in %i blocks", (U32)job->src.size, nbChunks);
        assert(job->cSize == 0);
        for (chunkNb = 1; chunkNb < nbChunks; chunkNb++) {
            size_t const cSize = ZSTD_compressContinue(cctx, op, oend-op, ip, chunkSize);
            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
            ip += chunkSize;
            op += cSize; assert(op < oend);
            /* stats */
            ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
            job->cSize += cSize;
            job->consumed = chunkSize * chunkNb;
            DEBUGLOG(5, "ZSTDMT_compressionJob: compress new block : cSize==%u bytes (total: %u)",
                        (U32)cSize, (U32)job->cSize);
            ZSTD_pthread_cond_signal(&job->job_cond);   /* warns some more data is ready to be flushed */
            ZSTD_pthread_mutex_unlock(&job->job_mutex);
        }
        /* last block */
        assert(chunkSize > 0);
        assert((chunkSize & (chunkSize - 1)) == 0);   /* chunkSize must be power of 2 for mask==(chunkSize-1) to work */
        if ((nbChunks > 0) | job->lastJob /*must output a "last block" flag*/ ) {
            size_t const lastBlockSize1 = job->src.size & (chunkSize-1);
            size_t const lastBlockSize = ((lastBlockSize1==0) & (job->src.size>=chunkSize)) ? chunkSize : lastBlockSize1;
            size_t const cSize = (job->lastJob) ?
                 ZSTD_compressEnd     (cctx, op, oend-op, ip, lastBlockSize) :
                 ZSTD_compressContinue(cctx, op, oend-op, ip, lastBlockSize);
            if (ZSTD_isError(cSize)) JOB_ERROR(cSize);
            lastCBlockSize = cSize;
    }   }

_endJob:
    ZSTDMT_serialState_ensureFinished(job->serial, job->jobID, job->cSize);
    if (job->prefix.size > 0)
        DEBUGLOG(5, "Finished with prefix: %zx", (size_t)job->prefix.start);
    DEBUGLOG(5, "Finished with source: %zx", (size_t)job->src.start);
    /* release resources */
    ZSTDMT_releaseSeq(job->seqPool, rawSeqStore);
    ZSTDMT_releaseCCtx(job->cctxPool, cctx);
    /* report */
    ZSTD_PTHREAD_MUTEX_LOCK(&job->job_mutex);
    if (ZSTD_isError(job->cSize)) assert(lastCBlockSize == 0);
    job->cSize += lastCBlockSize;
    job->consumed = job->src.size;   /* when job->consumed == job->src.size , compression job is presumed completed */
    ZSTD_pthread_cond_signal(&job->job_cond);
    ZSTD_pthread_mutex_unlock(&job->job_mutex);
}

/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

typedef struct {
  range_t prefix;         /* read-only non-owned prefix buffer */
  buffer_t buffer;
  size_t filled;
} inBuff_t;

typedef struct {
  BYTE* buffer;     /* The round input buffer. All jobs get references
                     * to pieces of the buffer. ZSTDMT_tryGetInputRange()
                     * handles handing out job input buffers, and makes
                     * sure it doesn't overlap with any pieces still in use.
                     */
  size_t capacity;  /* The capacity of buffer. */
  size_t pos;       /* The position of the current inBuff in the round
                     * buffer. Updated past the end of the inBuff once
                     * the inBuff is sent to the worker thread.
                     * pos <= capacity.
                     */
} roundBuff_t;

static const roundBuff_t kNullRoundBuff = {NULL, 0, 0};

#define RSYNC_LENGTH 32

typedef struct {
  U64 hash;
  U64 hitMask;
  U64 primePower;
} rsyncState_t;

struct ZSTDMT_CCtx_s {
    POOL_ctx* factory;
    ZSTDMT_jobDescription* jobs;
    ZSTDMT_bufferPool* bufPool;
    ZSTDMT_CCtxPool* cctxPool;
    ZSTDMT_seqPool* seqPool;
    ZSTD_CCtx_params params;
    size_t targetSectionSize;
    size_t targetPrefixSize;
    int jobReady;        /* 1 => one job is already prepared, but pool has shortage of workers. Don't create a new job. */
    inBuff_t inBuff;
    roundBuff_t roundBuff;
    serialState_t serial;
    rsyncState_t rsync;
    unsigned singleBlockingThread;
    unsigned jobIDMask;
    unsigned doneJobID;
    unsigned nextJobID;
    unsigned frameEnded;
    unsigned allJobsCompleted;
    unsigned long long frameContentSize;
    unsigned long long consumed;
    unsigned long long produced;
    ZSTD_customMem cMem;
    ZSTD_CDict* cdictLocal;
    const ZSTD_CDict* cdict;
};

static void ZSTDMT_freeJobsTable(ZSTDMT_jobDescription* jobTable, U32 nbJobs, ZSTD_customMem cMem)
{
    U32 jobNb;
    if (jobTable == NULL) return;
    for (jobNb=0; jobNb<nbJobs; jobNb++) {
        ZSTD_pthread_mutex_destroy(&jobTable[jobNb].job_mutex);
        ZSTD_pthread_cond_destroy(&jobTable[jobNb].job_cond);
    }
    ZSTD_free(jobTable, cMem);
}

/* ZSTDMT_allocJobsTable()
 * allocate and init a job table.
 * update *nbJobsPtr to next power of 2 value, as size of table */
static ZSTDMT_jobDescription* ZSTDMT_createJobsTable(U32* nbJobsPtr, ZSTD_customMem cMem)
{
    U32 const nbJobsLog2 = ZSTD_highbit32(*nbJobsPtr) + 1;
    U32 const nbJobs = 1 << nbJobsLog2;
    U32 jobNb;
    ZSTDMT_jobDescription* const jobTable = (ZSTDMT_jobDescription*)
                ZSTD_calloc(nbJobs * sizeof(ZSTDMT_jobDescription), cMem);
    int initError = 0;
    if (jobTable==NULL) return NULL;
    *nbJobsPtr = nbJobs;
    for (jobNb=0; jobNb<nbJobs; jobNb++) {
        initError |= ZSTD_pthread_mutex_init(&jobTable[jobNb].job_mutex, NULL);
        initError |= ZSTD_pthread_cond_init(&jobTable[jobNb].job_cond, NULL);
    }
    if (initError != 0) {
        ZSTDMT_freeJobsTable(jobTable, nbJobs, cMem);
        return NULL;
    }
    return jobTable;
}
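
/* Sizing note (editorial) : the table is rounded up to the next power of 2 so
 * job slots can be addressed with a cheap mask instead of a modulo. For
 * example, *nbJobsPtr==6 gives nbJobs==8, and callers then index the ring of
 * job descriptions with jobID & (nbJobs-1), i.e. the jobIDMask used below. */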

static size_t ZSTDMT_expandJobsTable (ZSTDMT_CCtx* mtctx, U32 nbWorkers) {
    U32 nbJobs = nbWorkers + 2;
    if (nbJobs > mtctx->jobIDMask+1) {   /* need more job capacity */
        ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
        mtctx->jobIDMask = 0;
        mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, mtctx->cMem);
        if (mtctx->jobs==NULL) return ERROR(memory_allocation);
        assert((nbJobs != 0) && ((nbJobs & (nbJobs - 1)) == 0));   /* ensure nbJobs is a power of 2 */
        mtctx->jobIDMask = nbJobs - 1;
    }
    return 0;
}

/* ZSTDMT_CCtxParam_setNbWorkers():
 * Internal use only */
size_t ZSTDMT_CCtxParam_setNbWorkers(ZSTD_CCtx_params* params, unsigned nbWorkers)
{
    return ZSTD_CCtxParams_setParameter(params, ZSTD_c_nbWorkers, (int)nbWorkers);
}

MEM_STATIC ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced_internal(unsigned nbWorkers, ZSTD_customMem cMem)
{
    ZSTDMT_CCtx* mtctx;
    U32 nbJobs = nbWorkers + 2;
    int initError;
    DEBUGLOG(3, "ZSTDMT_createCCtx_advanced (nbWorkers = %u)", nbWorkers);

    if (nbWorkers < 1) return NULL;
    nbWorkers = MIN(nbWorkers , ZSTDMT_NBWORKERS_MAX);
    if ((cMem.customAlloc!=NULL) ^ (cMem.customFree!=NULL))
        /* invalid custom allocator */
        return NULL;

    mtctx = (ZSTDMT_CCtx*) ZSTD_calloc(sizeof(ZSTDMT_CCtx), cMem);
    if (!mtctx) return NULL;
    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
    mtctx->cMem = cMem;
    mtctx->allJobsCompleted = 1;
    mtctx->factory = POOL_create_advanced(nbWorkers, 0, cMem);
    mtctx->jobs = ZSTDMT_createJobsTable(&nbJobs, cMem);
    assert(nbJobs > 0); assert((nbJobs & (nbJobs - 1)) == 0);   /* ensure nbJobs is a power of 2 */
    mtctx->jobIDMask = nbJobs - 1;
    mtctx->bufPool = ZSTDMT_createBufferPool(nbWorkers, cMem);
    mtctx->cctxPool = ZSTDMT_createCCtxPool(nbWorkers, cMem);
    mtctx->seqPool = ZSTDMT_createSeqPool(nbWorkers, cMem);
    initError = ZSTDMT_serialState_init(&mtctx->serial);
    mtctx->roundBuff = kNullRoundBuff;
    if (!mtctx->factory | !mtctx->jobs | !mtctx->bufPool | !mtctx->cctxPool | !mtctx->seqPool | initError) {
        ZSTDMT_freeCCtx(mtctx);
        return NULL;
    }
    DEBUGLOG(3, "mt_cctx created, for %u threads", nbWorkers);
    return mtctx;
}

ZSTDMT_CCtx* ZSTDMT_createCCtx_advanced(unsigned nbWorkers, ZSTD_customMem cMem)
{
#ifdef ZSTD_MULTITHREAD
    return ZSTDMT_createCCtx_advanced_internal(nbWorkers, cMem);
#else
    (void)nbWorkers;
    (void)cMem;
    return NULL;
#endif
}

ZSTDMT_CCtx* ZSTDMT_createCCtx(unsigned nbWorkers)
{
    return ZSTDMT_createCCtx_advanced(nbWorkers, ZSTD_defaultCMem);
}

/* ZSTDMT_releaseAllJobResources() :
 * note : ensure all workers are killed first ! */
static void ZSTDMT_releaseAllJobResources(ZSTDMT_CCtx* mtctx)
{
    unsigned jobID;
    DEBUGLOG(3, "ZSTDMT_releaseAllJobResources");
    for (jobID=0; jobID <= mtctx->jobIDMask; jobID++) {
        /* Copy the mutex/cond out */
        ZSTD_pthread_mutex_t const mutex = mtctx->jobs[jobID].job_mutex;
        ZSTD_pthread_cond_t const cond = mtctx->jobs[jobID].job_cond;

        DEBUGLOG(4, "job%02u: release dst address %08X", jobID, (U32)(size_t)mtctx->jobs[jobID].dstBuff.start);
        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);

        /* Clear the job description, but keep the mutex/cond */
        memset(&mtctx->jobs[jobID], 0, sizeof(mtctx->jobs[jobID]));
        mtctx->jobs[jobID].job_mutex = mutex;
        mtctx->jobs[jobID].job_cond = cond;
    }
    mtctx->inBuff.buffer = g_nullBuffer;
    mtctx->inBuff.filled = 0;
    mtctx->allJobsCompleted = 1;
}

static void ZSTDMT_waitForAllJobsCompleted(ZSTDMT_CCtx* mtctx)
{
    DEBUGLOG(4, "ZSTDMT_waitForAllJobsCompleted");
    while (mtctx->doneJobID < mtctx->nextJobID) {
        unsigned const jobID = mtctx->doneJobID & mtctx->jobIDMask;
        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
        while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
            DEBUGLOG(4, "waiting for jobCompleted signal from job %u", mtctx->doneJobID);   /* we want to block when waiting for data to flush */
            ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
        }
        ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
        mtctx->doneJobID++;
    }
}

size_t ZSTDMT_freeCCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx==NULL) return 0;   /* compatible with free on NULL */
    POOL_free(mtctx->factory);   /* stop and free worker threads */
    ZSTDMT_releaseAllJobResources(mtctx);   /* release job resources into pools first */
    ZSTDMT_freeJobsTable(mtctx->jobs, mtctx->jobIDMask+1, mtctx->cMem);
    ZSTDMT_freeBufferPool(mtctx->bufPool);
    ZSTDMT_freeCCtxPool(mtctx->cctxPool);
    ZSTDMT_freeSeqPool(mtctx->seqPool);
    ZSTDMT_serialState_free(&mtctx->serial);
    ZSTD_freeCDict(mtctx->cdictLocal);
    if (mtctx->roundBuff.buffer)
        ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem);
    ZSTD_free(mtctx, mtctx->cMem);
    return 0;
}

size_t ZSTDMT_sizeof_CCtx(ZSTDMT_CCtx* mtctx)
{
    if (mtctx == NULL) return 0;   /* supports sizeof NULL */
    return sizeof(*mtctx)
            + POOL_sizeof(mtctx->factory)
            + ZSTDMT_sizeof_bufferPool(mtctx->bufPool)
            + (mtctx->jobIDMask+1) * sizeof(ZSTDMT_jobDescription)
            + ZSTDMT_sizeof_CCtxPool(mtctx->cctxPool)
            + ZSTDMT_sizeof_seqPool(mtctx->seqPool)
            + ZSTD_sizeof_CDict(mtctx->cdictLocal)
            + mtctx->roundBuff.capacity;
}

/* Internal only */
size_t
ZSTDMT_CCtxParam_setMTCtxParameter(ZSTD_CCtx_params* params,
                                   ZSTDMT_parameter parameter,
                                   int value)
{
    DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter");
    switch(parameter)
    {
    case ZSTDMT_p_jobSize :
        DEBUGLOG(4, "ZSTDMT_CCtxParam_setMTCtxParameter : set jobSize to %i", value);
        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_jobSize, value);
    case ZSTDMT_p_overlapLog :
        DEBUGLOG(4, "ZSTDMT_p_overlapLog : %i", value);
        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_overlapLog, value);
    case ZSTDMT_p_rsyncable :
        DEBUGLOG(4, "ZSTD_p_rsyncable : %i", value);
        return ZSTD_CCtxParams_setParameter(params, ZSTD_c_rsyncable, value);
    default :
        return ERROR(parameter_unsupported);
    }
}

size_t ZSTDMT_setMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int value)
{
    DEBUGLOG(4, "ZSTDMT_setMTCtxParameter");
    return ZSTDMT_CCtxParam_setMTCtxParameter(&mtctx->params, parameter, value);
}

size_t ZSTDMT_getMTCtxParameter(ZSTDMT_CCtx* mtctx, ZSTDMT_parameter parameter, int* value)
{
    switch (parameter) {
    case ZSTDMT_p_jobSize:
        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_jobSize, value);
    case ZSTDMT_p_overlapLog:
        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_overlapLog, value);
    case ZSTDMT_p_rsyncable:
        return ZSTD_CCtxParams_getParameter(&mtctx->params, ZSTD_c_rsyncable, value);
    default:
        return ERROR(parameter_unsupported);
    }
}

/* Sets parameters relevant to the compression job,
 * initializing others to default values. */
static ZSTD_CCtx_params ZSTDMT_initJobCCtxParams(const ZSTD_CCtx_params* params)
{
    ZSTD_CCtx_params jobParams = *params;
    /* Clear parameters related to multithreading */
    jobParams.forceWindow = 0;
    jobParams.nbWorkers = 0;
    jobParams.jobSize = 0;
    jobParams.overlapLog = 0;
    jobParams.rsyncable = 0;
    memset(&jobParams.ldmParams, 0, sizeof(ldmParams_t));
    memset(&jobParams.customMem, 0, sizeof(ZSTD_customMem));
    return jobParams;
}

/* ZSTDMT_resize() :
 * @return : error code if fails, 0 on success */
static size_t ZSTDMT_resize(ZSTDMT_CCtx* mtctx, unsigned nbWorkers)
{
    if (POOL_resize(mtctx->factory, nbWorkers)) return ERROR(memory_allocation);
    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbWorkers) , "");
    mtctx->bufPool = ZSTDMT_expandBufferPool(mtctx->bufPool, nbWorkers);
    if (mtctx->bufPool == NULL) return ERROR(memory_allocation);
    mtctx->cctxPool = ZSTDMT_expandCCtxPool(mtctx->cctxPool, nbWorkers);
    if (mtctx->cctxPool == NULL) return ERROR(memory_allocation);
    mtctx->seqPool = ZSTDMT_expandSeqPool(mtctx->seqPool, nbWorkers);
    if (mtctx->seqPool == NULL) return ERROR(memory_allocation);
    ZSTDMT_CCtxParam_setNbWorkers(&mtctx->params, nbWorkers);
    return 0;
}

/*! ZSTDMT_updateCParams_whileCompressing() :
 *  Updates a selected set of compression parameters, remaining compatible with currently active frame.
 *  New parameters will be applied to next compression job. */
void ZSTDMT_updateCParams_whileCompressing(ZSTDMT_CCtx* mtctx, const ZSTD_CCtx_params* cctxParams)
{
    U32 const saved_wlog = mtctx->params.cParams.windowLog;   /* Do not modify windowLog while compressing */
    int const compressionLevel = cctxParams->compressionLevel;
    DEBUGLOG(5, "ZSTDMT_updateCParams_whileCompressing (level:%i)",
                compressionLevel);
    mtctx->params.compressionLevel = compressionLevel;
    {   ZSTD_compressionParameters cParams = ZSTD_getCParamsFromCCtxParams(cctxParams, ZSTD_CONTENTSIZE_UNKNOWN, 0);
        cParams.windowLog = saved_wlog;
        mtctx->params.cParams = cParams;
    }
}

/* ZSTDMT_getFrameProgression():
 * tells how much data has been consumed (input) and produced (output) for current frame.
 * able to count progression inside worker threads.
 * Note : mutex will be acquired during statistics collection inside workers. */
ZSTD_frameProgression ZSTDMT_getFrameProgression(ZSTDMT_CCtx* mtctx)
{
    ZSTD_frameProgression fps;
    DEBUGLOG(5, "ZSTDMT_getFrameProgression");
    fps.ingested = mtctx->consumed + mtctx->inBuff.filled;
    fps.consumed = mtctx->consumed;
    fps.produced = fps.flushed = mtctx->produced;
    fps.currentJobID = mtctx->nextJobID;
    fps.nbActiveWorkers = 0;
    {   unsigned jobNb;
        unsigned lastJobNb = mtctx->nextJobID + mtctx->jobReady; assert(mtctx->jobReady <= 1);
        DEBUGLOG(6, "ZSTDMT_getFrameProgression: jobs: from %u to <%u (jobReady:%u)",
                    mtctx->doneJobID, lastJobNb, mtctx->jobReady);
        for (jobNb = mtctx->doneJobID ; jobNb < lastJobNb ; jobNb++) {
            unsigned const wJobID = jobNb & mtctx->jobIDMask;
            ZSTDMT_jobDescription* jobPtr = &mtctx->jobs[wJobID];
            ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
            {   size_t const cResult = jobPtr->cSize;
                size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
                size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
                assert(flushed <= produced);
                fps.ingested += jobPtr->src.size;
                fps.consumed += jobPtr->consumed;
                fps.produced += produced;
                fps.flushed  += flushed;
                fps.nbActiveWorkers += (jobPtr->consumed < jobPtr->src.size);
            }
            ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
        }
    }
    return fps;
}

size_t ZSTDMT_toFlushNow(ZSTDMT_CCtx* mtctx)
{
    size_t toFlush;
    unsigned const jobID = mtctx->doneJobID;
    assert(jobID <= mtctx->nextJobID);
    if (jobID == mtctx->nextJobID) return 0;   /* no active job => nothing to flush */

    /* look into oldest non-fully-flushed job */
    {   unsigned const wJobID = jobID & mtctx->jobIDMask;
        ZSTDMT_jobDescription* const jobPtr = &mtctx->jobs[wJobID];
        ZSTD_pthread_mutex_lock(&jobPtr->job_mutex);
        {   size_t const cResult = jobPtr->cSize;
            size_t const produced = ZSTD_isError(cResult) ? 0 : cResult;
            size_t const flushed = ZSTD_isError(cResult) ? 0 : jobPtr->dstFlushed;
            assert(flushed <= produced);
            assert(jobPtr->consumed <= jobPtr->src.size);
            toFlush = produced - flushed;
            /* if toFlush==0, nothing is available to flush.
             * However, jobID is expected to still be active:
             * if jobID was already completed and fully flushed,
             * ZSTDMT_flushProduced() should have already moved onto next job.
             * Therefore, some input has not yet been consumed. */
            if (toFlush==0) {
                assert(jobPtr->consumed < jobPtr->src.size);
            }
        }
        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
    }

    return toFlush;
}

/* ------------------------------------------ */
/* =====   Multi-threaded compression   ===== */
/* ------------------------------------------ */

static unsigned ZSTDMT_computeTargetJobLog(const ZSTD_CCtx_params* params)
{
    unsigned jobLog;
    if (params->ldmParams.enableLdm) {
        /* In Long Range Mode, the windowLog is typically oversized.
         * In which case, it's preferable to determine the jobSize
         * based on chainLog instead. */
        jobLog = MAX(21, params->cParams.chainLog + 4);
    } else {
        jobLog = MAX(20, params->cParams.windowLog + 2);
    }
    return MIN(jobLog, (unsigned)ZSTDMT_JOBLOG_MAX);
}
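
/* Example (editorial) : with windowLog==23 and LDM off, the target job size is
 * 1 << MAX(20, 23+2) == 32 MB, capped by ZSTDMT_JOBLOG_MAX. Larger jobs
 * amortize per-job overhead; smaller ones improve load balancing. */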

static int ZSTDMT_overlapLog_default(ZSTD_strategy strat)
{
    switch(strat)
    {
        case ZSTD_btultra2:
            return 9;
        case ZSTD_btultra:
        case ZSTD_btopt:
            return 8;
        case ZSTD_btlazy2:
        case ZSTD_lazy2:
            return 7;
        case ZSTD_lazy:
        case ZSTD_greedy:
        case ZSTD_dfast:
        case ZSTD_fast:
        default:;
    }
    return 6;
}

static int ZSTDMT_overlapLog(int ovlog, ZSTD_strategy strat)
{
    assert(0 <= ovlog && ovlog <= 9);
    if (ovlog == 0) return ZSTDMT_overlapLog_default(strat);
    return ovlog;
}
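
/* Example (editorial) : overlapLog==6 means each job reloads
 * windowSize >> (9-6) == windowSize/8 bytes of previous input as its prefix;
 * with windowLog==24 that is a 2 MB overlap (ovLog == 24-3 == 21 below). */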

static size_t ZSTDMT_computeOverlapSize(const ZSTD_CCtx_params* params)
{
    int const overlapRLog = 9 - ZSTDMT_overlapLog(params->overlapLog, params->cParams.strategy);
    int ovLog = (overlapRLog >= 8) ? 0 : (params->cParams.windowLog - overlapRLog);
    assert(0 <= overlapRLog && overlapRLog <= 8);
    if (params->ldmParams.enableLdm) {
        /* In Long Range Mode, the windowLog is typically oversized.
         * In which case, it's preferable to determine the jobSize
         * based on chainLog instead.
         * Then, ovLog becomes a fraction of the jobSize, rather than windowSize */
        ovLog = MIN(params->cParams.windowLog, ZSTDMT_computeTargetJobLog(params) - 2)
              - overlapRLog;
    }
    assert(0 <= ovLog && ovLog <= ZSTD_WINDOWLOG_MAX);
    DEBUGLOG(4, "overlapLog : %i", params->overlapLog);
    DEBUGLOG(4, "overlap size : %i", 1 << ovLog);
    return (ovLog==0) ? 0 : (size_t)1 << ovLog;
}

static unsigned
ZSTDMT_computeNbJobs(const ZSTD_CCtx_params* params, size_t srcSize, unsigned nbWorkers)
{
    assert(nbWorkers>0);
    {   size_t const jobSizeTarget = (size_t)1 << ZSTDMT_computeTargetJobLog(params);
        size_t const jobMaxSize = jobSizeTarget << 2;
        size_t const passSizeMax = jobMaxSize * nbWorkers;
        unsigned const multiplier = (unsigned)(srcSize / passSizeMax) + 1;
        unsigned const nbJobsLarge = multiplier * nbWorkers;
        unsigned const nbJobsMax = (unsigned)(srcSize / jobSizeTarget) + 1;
        unsigned const nbJobsSmall = MIN(nbJobsMax, nbWorkers);
        return (multiplier>1) ? nbJobsLarge : nbJobsSmall;
}   }
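
/* Example (editorial) : srcSize==1 GB, jobSizeTarget==32 MB, nbWorkers==4 :
 * jobMaxSize==128 MB, passSizeMax==512 MB, multiplier==3, so nbJobs==12
 * (the nbJobsLarge path); for a small source, nbJobsSmall instead caps the
 * job count at nbWorkers so every worker gets at most one job. */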

/* ZSTDMT_compress_advanced_internal() :
 * This is a blocking function : it will only give back control to caller after finishing its compression job.
 */
static size_t
ZSTDMT_compress_advanced_internal(
                ZSTDMT_CCtx* mtctx,
                void* dst, size_t dstCapacity,
          const void* src, size_t srcSize,
          const ZSTD_CDict* cdict,
                ZSTD_CCtx_params params)
{
    ZSTD_CCtx_params const jobParams = ZSTDMT_initJobCCtxParams(&params);
    size_t const overlapSize = ZSTDMT_computeOverlapSize(&params);
    unsigned const nbJobs = ZSTDMT_computeNbJobs(&params, srcSize, params.nbWorkers);
    size_t const proposedJobSize = (srcSize + (nbJobs-1)) / nbJobs;
    size_t const avgJobSize = (((proposedJobSize-1) & 0x1FFFF) < 0x7FFF) ? proposedJobSize + 0xFFFF : proposedJobSize;   /* avoid too small last block */
    const char* const srcStart = (const char*)src;
    size_t remainingSrcSize = srcSize;
    unsigned const compressWithinDst = (dstCapacity >= ZSTD_compressBound(srcSize)) ? nbJobs : (unsigned)(dstCapacity / ZSTD_compressBound(avgJobSize));   /* presumes avgJobSize >= 256 KB, which should be the case */
    size_t frameStartPos = 0, dstBufferPos = 0;
    assert(jobParams.nbWorkers == 0);
    assert(mtctx->cctxPool->totalCCtx == params.nbWorkers);

    params.jobSize = (U32)avgJobSize;
    DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: nbJobs=%2u (rawSize=%u bytes; fixedSize=%u) ",
                nbJobs, (U32)proposedJobSize, (U32)avgJobSize);

    if ((nbJobs==1) | (params.nbWorkers<=1)) {   /* fallback to single-thread mode : this is a blocking invocation anyway */
        ZSTD_CCtx* const cctx = mtctx->cctxPool->cctx[0];
        DEBUGLOG(4, "ZSTDMT_compress_advanced_internal: fallback to single-thread mode");
        if (cdict) return ZSTD_compress_usingCDict_advanced(cctx, dst, dstCapacity, src, srcSize, cdict, jobParams.fParams);
        return ZSTD_compress_advanced_internal(cctx, dst, dstCapacity, src, srcSize, NULL, 0, &jobParams);
    }

    assert(avgJobSize >= 256 KB);   /* condition for ZSTD_compressBound(A) + ZSTD_compressBound(B) <= ZSTD_compressBound(A+B), required to compress directly into Dst (no additional buffer) */
    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(avgJobSize) );
    /* LDM doesn't even try to load the dictionary in single-ingestion mode */
    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, avgJobSize, NULL, 0, ZSTD_dct_auto))
        return ERROR(memory_allocation);

    FORWARD_IF_ERROR( ZSTDMT_expandJobsTable(mtctx, nbJobs) , "");   /* only expands if necessary */

    {   unsigned u;
        for (u=0; u<nbJobs; u++) {
            size_t const jobSize = MIN(remainingSrcSize, avgJobSize);
            size_t const dstBufferCapacity = ZSTD_compressBound(jobSize);
            buffer_t const dstAsBuffer = { (char*)dst + dstBufferPos, dstBufferCapacity };
            buffer_t const dstBuffer = u < compressWithinDst ? dstAsBuffer : g_nullBuffer;
            size_t dictSize = u ? overlapSize : 0;

            mtctx->jobs[u].prefix.start = srcStart + frameStartPos - dictSize;
            mtctx->jobs[u].prefix.size = dictSize;
            mtctx->jobs[u].src.start = srcStart + frameStartPos;
            mtctx->jobs[u].src.size = jobSize; assert(jobSize > 0);   /* avoid job.src.size == 0 */
            mtctx->jobs[u].consumed = 0;
            mtctx->jobs[u].cSize = 0;
            mtctx->jobs[u].cdict = (u==0) ? cdict : NULL;
            mtctx->jobs[u].fullFrameSize = srcSize;
            mtctx->jobs[u].params = jobParams;
            /* do not calculate checksum within sections, but write it in header for first section */
            mtctx->jobs[u].dstBuff = dstBuffer;
            mtctx->jobs[u].cctxPool = mtctx->cctxPool;
            mtctx->jobs[u].bufPool = mtctx->bufPool;
            mtctx->jobs[u].seqPool = mtctx->seqPool;
            mtctx->jobs[u].serial = &mtctx->serial;
            mtctx->jobs[u].jobID = u;
            mtctx->jobs[u].firstJob = (u==0);
            mtctx->jobs[u].lastJob = (u==nbJobs-1);

            DEBUGLOG(5, "ZSTDMT_compress_advanced_internal: posting job %u  (%u bytes)", u, (U32)jobSize);
            DEBUG_PRINTHEX(6, mtctx->jobs[u].prefix.start, 12);
            POOL_add(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[u]);

            frameStartPos += jobSize;
            dstBufferPos += dstBufferCapacity;
            remainingSrcSize -= jobSize;
    }   }

    /* collect result */
    {   size_t error = 0, dstPos = 0;
        unsigned jobID;
        for (jobID=0; jobID<nbJobs; jobID++) {
            DEBUGLOG(5, "waiting for job %u ", jobID);
            ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[jobID].job_mutex);
            while (mtctx->jobs[jobID].consumed < mtctx->jobs[jobID].src.size) {
                DEBUGLOG(5, "waiting for jobCompleted signal from job %u", jobID);
                ZSTD_pthread_cond_wait(&mtctx->jobs[jobID].job_cond, &mtctx->jobs[jobID].job_mutex);
            }
            ZSTD_pthread_mutex_unlock(&mtctx->jobs[jobID].job_mutex);
            DEBUGLOG(5, "ready to write job %u ", jobID);

            {   size_t const cSize = mtctx->jobs[jobID].cSize;
                if (ZSTD_isError(cSize)) error = cSize;
                if ((!error) && (dstPos + cSize > dstCapacity)) error = ERROR(dstSize_tooSmall);
                if (jobID) {   /* note : job 0 is written directly at dst, which is correct position */
                    if (!error)
                        memmove((char*)dst + dstPos, mtctx->jobs[jobID].dstBuff.start, cSize);   /* may overlap when job compressed within dst */
                    if (jobID >= compressWithinDst) {   /* job compressed into its own buffer, which must be released */
                        DEBUGLOG(5, "releasing buffer %u>=%u", jobID, compressWithinDst);
                        ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[jobID].dstBuff);
                }   }
                mtctx->jobs[jobID].dstBuff = g_nullBuffer;
                mtctx->jobs[jobID].cSize = 0;
                dstPos += cSize;
            }
        }   /* for (jobID=0; jobID<nbJobs; jobID++) */

        DEBUGLOG(4, "checksumFlag : %u ", params.fParams.checksumFlag);
        if (params.fParams.checksumFlag) {
            U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState);
            if (dstPos + 4 > dstCapacity) {
                error = ERROR(dstSize_tooSmall);
            } else {
                DEBUGLOG(4, "writing checksum : %08X \n", checksum);
                MEM_writeLE32((char*)dst + dstPos, checksum);
                dstPos += 4;
        }   }

        if (!error) DEBUGLOG(4, "compressed size : %u  ", (U32)dstPos);
        return error ? error : dstPos;
    }
}

size_t ZSTDMT_compress_advanced(ZSTDMT_CCtx* mtctx,
                                void* dst, size_t dstCapacity,
                          const void* src, size_t srcSize,
                          const ZSTD_CDict* cdict,
                                ZSTD_parameters params,
                                int overlapLog)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    assert(ZSTD_OVERLAPLOG_MIN <= overlapLog && overlapLog <= ZSTD_OVERLAPLOG_MAX);
    cctxParams.overlapLog = overlapLog;
    return ZSTDMT_compress_advanced_internal(mtctx,
                                             dst, dstCapacity,
                                             src, srcSize,
                                             cdict, cctxParams);
}

size_t ZSTDMT_compressCCtx(ZSTDMT_CCtx* mtctx,
                           void* dst, size_t dstCapacity,
                     const void* src, size_t srcSize,
                           int compressionLevel)
{
    ZSTD_parameters params = ZSTD_getParams(compressionLevel, srcSize, 0);
    int const overlapLog = ZSTDMT_overlapLog_default(params.cParams.strategy);
    params.fParams.contentSizeFlag = 1;
    return ZSTDMT_compress_advanced(mtctx, dst, dstCapacity, src, srcSize, NULL, params, overlapLog);
}
1411 /* ====================================== */
1412 /* ======= Streaming API ======= */
1413 /* ====================================== */
1415 size_t ZSTDMT_initCStream_internal(
1417 const void* dict
, size_t dictSize
, ZSTD_dictContentType_e dictContentType
,
1418 const ZSTD_CDict
* cdict
, ZSTD_CCtx_params params
,
1419 unsigned long long pledgedSrcSize
)
1421 DEBUGLOG(4, "ZSTDMT_initCStream_internal (pledgedSrcSize=%u, nbWorkers=%u, cctxPool=%u)",
1422 (U32
)pledgedSrcSize
, params
.nbWorkers
, mtctx
->cctxPool
->totalCCtx
);
1424 /* params supposed partially fully validated at this point */
1425 assert(!ZSTD_isError(ZSTD_checkCParams(params
.cParams
)));
1426 assert(!((dict
) && (cdict
))); /* either dict or cdict, not both */
1429 if (params
.nbWorkers
!= mtctx
->params
.nbWorkers
)
1430 FORWARD_IF_ERROR( ZSTDMT_resize(mtctx
, params
.nbWorkers
) , "");
1432 if (params
.jobSize
!= 0 && params
.jobSize
< ZSTDMT_JOBSIZE_MIN
) params
.jobSize
= ZSTDMT_JOBSIZE_MIN
;
1433 if (params
.jobSize
> (size_t)ZSTDMT_JOBSIZE_MAX
) params
.jobSize
= (size_t)ZSTDMT_JOBSIZE_MAX
;
1435 mtctx
->singleBlockingThread
= (pledgedSrcSize
<= ZSTDMT_JOBSIZE_MIN
); /* do not trigger multi-threading when srcSize is too small */
1436 if (mtctx
->singleBlockingThread
) {
1437 ZSTD_CCtx_params
const singleThreadParams
= ZSTDMT_initJobCCtxParams(¶ms
);
1438 DEBUGLOG(5, "ZSTDMT_initCStream_internal: switch to single blocking thread mode");
1439 assert(singleThreadParams
.nbWorkers
== 0);
1440 return ZSTD_initCStream_internal(mtctx
->cctxPool
->cctx
[0],
1441 dict
, dictSize
, cdict
,
1442 &singleThreadParams
, pledgedSrcSize
);
1445 DEBUGLOG(4, "ZSTDMT_initCStream_internal: %u workers", params
.nbWorkers
);
1447 if (mtctx
->allJobsCompleted
== 0) { /* previous compression not correctly finished */
1448 ZSTDMT_waitForAllJobsCompleted(mtctx
);
1449 ZSTDMT_releaseAllJobResources(mtctx
);
1450 mtctx
->allJobsCompleted
= 1;
1453 mtctx
->params
= params
;
1454 mtctx
->frameContentSize
= pledgedSrcSize
;
1456 ZSTD_freeCDict(mtctx
->cdictLocal
);
1457 mtctx
->cdictLocal
= ZSTD_createCDict_advanced(dict
, dictSize
,
1458 ZSTD_dlm_byCopy
, dictContentType
, /* note : a loadPrefix becomes an internal CDict */
1459 params
.cParams
, mtctx
->cMem
);
1460 mtctx
->cdict
= mtctx
->cdictLocal
;
1461 if (mtctx
->cdictLocal
== NULL
) return ERROR(memory_allocation
);
1463 ZSTD_freeCDict(mtctx
->cdictLocal
);
1464 mtctx
->cdictLocal
= NULL
;
1465 mtctx
->cdict
= cdict
;
1468 mtctx
->targetPrefixSize
= ZSTDMT_computeOverlapSize(¶ms
);
1469 DEBUGLOG(4, "overlapLog=%i => %u KB", params
.overlapLog
, (U32
)(mtctx
->targetPrefixSize
>>10));
1470 mtctx
->targetSectionSize
= params
.jobSize
;
1471 if (mtctx
->targetSectionSize
== 0) {
1472 mtctx
->targetSectionSize
= 1ULL << ZSTDMT_computeTargetJobLog(¶ms
);
1474 assert(mtctx
->targetSectionSize
<= (size_t)ZSTDMT_JOBSIZE_MAX
);
1476 if (params
.rsyncable
) {
1477 /* Aim for the targetsectionSize as the average job size. */
1478 U32
const jobSizeMB
= (U32
)(mtctx
->targetSectionSize
>> 20);
1479 U32
const rsyncBits
= ZSTD_highbit32(jobSizeMB
) + 20;
1480 assert(jobSizeMB
>= 1);
1481 DEBUGLOG(4, "rsyncLog = %u", rsyncBits
);
1482 mtctx
->rsync
.hash
= 0;
1483 mtctx
->rsync
.hitMask
= (1ULL << rsyncBits
) - 1;
1484 mtctx
->rsync
.primePower
= ZSTD_rollingHash_primePower(RSYNC_LENGTH
);
    if (mtctx->targetSectionSize < mtctx->targetPrefixSize) mtctx->targetSectionSize = mtctx->targetPrefixSize;  /* job size must be >= overlap size */
    DEBUGLOG(4, "Job Size : %u KB (note : set to %u)", (U32)(mtctx->targetSectionSize>>10), (U32)params.jobSize);
    DEBUGLOG(4, "inBuff Size : %u KB", (U32)(mtctx->targetSectionSize>>10));
    ZSTDMT_setBufferSize(mtctx->bufPool, ZSTD_compressBound(mtctx->targetSectionSize));
    {
        /* If ldm is enabled we need windowSize space. */
        size_t const windowSize = mtctx->params.ldmParams.enableLdm ? (1U << mtctx->params.cParams.windowLog) : 0;
        /* Two buffers of slack, plus extra space for the overlap.
         * This is the minimum slack that LDM works with. One extra because
         * flush might waste up to targetSectionSize-1 bytes. Another extra
         * for the overlap (if > 0), then one to fill which doesn't overlap
         * with the LDM window.
         */
        size_t const nbSlackBuffers = 2 + (mtctx->targetPrefixSize > 0);
        size_t const slackSize = mtctx->targetSectionSize * nbSlackBuffers;
        /* Compute the total size, and always have enough slack */
        size_t const nbWorkers = MAX(mtctx->params.nbWorkers, 1);
        size_t const sectionsSize = mtctx->targetSectionSize * nbWorkers;
        size_t const capacity = MAX(windowSize, sectionsSize) + slackSize;
        if (mtctx->roundBuff.capacity < capacity) {
            if (mtctx->roundBuff.buffer)
                ZSTD_free(mtctx->roundBuff.buffer, mtctx->cMem);
            mtctx->roundBuff.buffer = (BYTE*)ZSTD_malloc(capacity, mtctx->cMem);
            if (mtctx->roundBuff.buffer == NULL) {
                mtctx->roundBuff.capacity = 0;
                return ERROR(memory_allocation);
            }
            mtctx->roundBuff.capacity = capacity;
        }
    }
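
    /* Worked example of the capacity formula (hypothetical numbers, for
     * illustration only) : nbWorkers = 4, targetSectionSize = 2 MB,
     * overlap > 0, LDM disabled :
     *   windowSize     = 0
     *   nbSlackBuffers = 2 + 1 = 3
     *   slackSize      = 2 MB * 3 = 6 MB
     *   sectionsSize   = 2 MB * 4 = 8 MB
     *   capacity       = MAX(0, 8 MB) + 6 MB = 14 MB of round buffer. */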
    DEBUGLOG(4, "roundBuff capacity : %u KB", (U32)(mtctx->roundBuff.capacity>>10));
    mtctx->roundBuff.pos = 0;
    mtctx->inBuff.buffer = g_nullBuffer;
    mtctx->inBuff.filled = 0;
    mtctx->inBuff.prefix = kNullRange;
    mtctx->doneJobID = 0;
    mtctx->nextJobID = 0;
    mtctx->frameEnded = 0;
    mtctx->allJobsCompleted = 0;
    mtctx->consumed = 0;
    mtctx->produced = 0;
    if (ZSTDMT_serialState_reset(&mtctx->serial, mtctx->seqPool, params, mtctx->targetSectionSize,
                                 dict, dictSize, dictContentType))
        return ERROR(memory_allocation);
    return 0;
}

size_t ZSTDMT_initCStream_advanced(ZSTDMT_CCtx* mtctx,
                                   const void* dict, size_t dictSize,
                                   ZSTD_parameters params,
                                   unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;  /* retrieve sticky params */
    DEBUGLOG(4, "ZSTDMT_initCStream_advanced (pledgedSrcSize=%u)", (U32)pledgedSrcSize);
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(mtctx, dict, dictSize, ZSTD_dct_auto, NULL,
                                       cctxParams, pledgedSrcSize);
}

size_t ZSTDMT_initCStream_usingCDict(ZSTDMT_CCtx* mtctx,
                                     const ZSTD_CDict* cdict,
                                     ZSTD_frameParameters fParams,
                                     unsigned long long pledgedSrcSize)
{
    ZSTD_CCtx_params cctxParams = mtctx->params;  /* retrieve sticky params */
    if (cdict==NULL) return ERROR(dictionary_wrong);   /* method incompatible with NULL cdict */
    cctxParams.cParams = ZSTD_getCParamsFromCDict(cdict);
    cctxParams.fParams = fParams;
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0 /*dictSize*/, ZSTD_dct_auto, cdict,
                                       cctxParams, pledgedSrcSize);
}

/* ZSTDMT_resetCStream() :
 * pledgedSrcSize can be zero == unknown (for the time being)
 * prefer using ZSTD_CONTENTSIZE_UNKNOWN,
 * as `0` might mean "empty" in the future */
size_t ZSTDMT_resetCStream(ZSTDMT_CCtx* mtctx, unsigned long long pledgedSrcSize)
{
    if (!pledgedSrcSize) pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN;
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, mtctx->params,
                                       pledgedSrcSize);
}

size_t ZSTDMT_initCStream(ZSTDMT_CCtx* mtctx, int compressionLevel) {
    ZSTD_parameters const params = ZSTD_getParams(compressionLevel, ZSTD_CONTENTSIZE_UNKNOWN, 0);
    ZSTD_CCtx_params cctxParams = mtctx->params;  /* retrieve sticky params */
    DEBUGLOG(4, "ZSTDMT_initCStream (cLevel=%i)", compressionLevel);
    cctxParams.cParams = params.cParams;
    cctxParams.fParams = params.fParams;
    return ZSTDMT_initCStream_internal(mtctx, NULL, 0, ZSTD_dct_auto, NULL, cctxParams, ZSTD_CONTENTSIZE_UNKNOWN);
}
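
/* Illustrative (non-compiled) usage sketch of the streaming entry points
 * above. The worker count, compression level, and the assumption that
 * dstCapacity >= ZSTD_compressBound(srcSize) are all hypothetical choices
 * made for the example; kept under `#if 0` so it never affects the build.
 */
#if 0
static size_t example_streamingRoundTrip(void* dst, size_t dstCapacity,
                                         const void* src, size_t srcSize)
{
    ZSTDMT_CCtx* const mtctx = ZSTDMT_createCCtx(4 /* nbWorkers : arbitrary */);
    ZSTD_inBuffer in;
    ZSTD_outBuffer out;
    size_t remaining;
    if (mtctx == NULL) return ERROR(memory_allocation);
    in.src = src;   in.size = srcSize;      in.pos = 0;
    out.dst = dst;  out.size = dstCapacity; out.pos = 0;
    {   size_t const initErr = ZSTDMT_initCStream(mtctx, 3 /* compression level : arbitrary */);
        if (ZSTD_isError(initErr)) { ZSTDMT_freeCCtx(mtctx); return initErr; }
    }
    while (in.pos < in.size) {   /* feed input; jobs are created as sections fill */
        size_t const hint = ZSTDMT_compressStream(mtctx, &out, &in);
        if (ZSTD_isError(hint)) { ZSTDMT_freeCCtx(mtctx); return hint; }
    }
    do {   /* drain : ZSTDMT_endStream() reports how much is still buffered */
        remaining = ZSTDMT_endStream(mtctx, &out);
        if (ZSTD_isError(remaining)) { ZSTDMT_freeCCtx(mtctx); return remaining; }
    } while (remaining != 0);
    ZSTDMT_freeCCtx(mtctx);
    return out.pos;   /* compressed size */
}
#endif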

/* ZSTDMT_writeLastEmptyBlock()
 * Write a single empty block with an end-of-frame mark to finish a frame.
 * Job must be created from streaming variant.
 * This function is always successful if expected conditions are fulfilled.
 */
static void ZSTDMT_writeLastEmptyBlock(ZSTDMT_jobDescription* job)
{
    assert(job->lastJob == 1);
    assert(job->src.size == 0);   /* last job is empty -> will be simplified into a last empty block */
    assert(job->firstJob == 0);   /* cannot be first job, as it also needs to create frame header */
    assert(job->dstBuff.start == NULL);   /* invoked from streaming variant only (otherwise, dstBuff might be user's output) */
    job->dstBuff = ZSTDMT_getBuffer(job->bufPool);
    if (job->dstBuff.start == NULL) {
        job->cSize = ERROR(memory_allocation);
        return;
    }
    assert(job->dstBuff.capacity >= ZSTD_blockHeaderSize);   /* no buffer should ever be that small */
    job->src = kNullRange;
    job->cSize = ZSTD_writeLastEmptyBlock(job->dstBuff.start, job->dstBuff.capacity);
    assert(!ZSTD_isError(job->cSize));
    assert(job->consumed == 0);
}

static size_t ZSTDMT_createCompressionJob(ZSTDMT_CCtx* mtctx, size_t srcSize, ZSTD_EndDirective endOp)
{
    unsigned const jobID = mtctx->nextJobID & mtctx->jobIDMask;
    int const endFrame = (endOp == ZSTD_e_end);

    if (mtctx->nextJobID > mtctx->doneJobID + mtctx->jobIDMask) {
        DEBUGLOG(5, "ZSTDMT_createCompressionJob: will not create new job : table is full");
        assert((mtctx->nextJobID & mtctx->jobIDMask) == (mtctx->doneJobID & mtctx->jobIDMask));
        return 0;
    }

    if (!mtctx->jobReady) {
        BYTE const* src = (BYTE const*)mtctx->inBuff.buffer.start;
        DEBUGLOG(5, "ZSTDMT_createCompressionJob: preparing job %u to compress %u bytes with %u preload ",
                    mtctx->nextJobID, (U32)srcSize, (U32)mtctx->inBuff.prefix.size);
        mtctx->jobs[jobID].src.start = src;
        mtctx->jobs[jobID].src.size = srcSize;
        assert(mtctx->inBuff.filled >= srcSize);
        mtctx->jobs[jobID].prefix = mtctx->inBuff.prefix;
        mtctx->jobs[jobID].consumed = 0;
        mtctx->jobs[jobID].cSize = 0;
        mtctx->jobs[jobID].params = mtctx->params;
        mtctx->jobs[jobID].cdict = mtctx->nextJobID==0 ? mtctx->cdict : NULL;
        mtctx->jobs[jobID].fullFrameSize = mtctx->frameContentSize;
        mtctx->jobs[jobID].dstBuff = g_nullBuffer;
        mtctx->jobs[jobID].cctxPool = mtctx->cctxPool;
        mtctx->jobs[jobID].bufPool = mtctx->bufPool;
        mtctx->jobs[jobID].seqPool = mtctx->seqPool;
        mtctx->jobs[jobID].serial = &mtctx->serial;
        mtctx->jobs[jobID].jobID = mtctx->nextJobID;
        mtctx->jobs[jobID].firstJob = (mtctx->nextJobID==0);
        mtctx->jobs[jobID].lastJob = endFrame;
        mtctx->jobs[jobID].frameChecksumNeeded = mtctx->params.fParams.checksumFlag && endFrame && (mtctx->nextJobID>0);
        mtctx->jobs[jobID].dstFlushed = 0;

        /* Update the round buffer pos and clear the input buffer to be reset */
        mtctx->roundBuff.pos += srcSize;
        mtctx->inBuff.buffer = g_nullBuffer;
        mtctx->inBuff.filled = 0;
        /* Set the prefix */
        if (!endFrame) {
            size_t const newPrefixSize = MIN(srcSize, mtctx->targetPrefixSize);
            mtctx->inBuff.prefix.start = src + srcSize - newPrefixSize;
            mtctx->inBuff.prefix.size = newPrefixSize;
        } else {   /* endFrame==1 => no need for another input buffer */
            mtctx->inBuff.prefix = kNullRange;
            mtctx->frameEnded = endFrame;
            if (mtctx->nextJobID == 0) {
                /* single job exception : checksum is already calculated directly within worker thread */
                mtctx->params.fParams.checksumFlag = 0;
        }   }

        if ( (srcSize == 0)
          && (mtctx->nextJobID>0)/*single job must also write frame header*/ ) {
            DEBUGLOG(5, "ZSTDMT_createCompressionJob: creating a last empty block to end frame");
            assert(endOp == ZSTD_e_end);  /* only possible case : need to end the frame with an empty last block */
            ZSTDMT_writeLastEmptyBlock(mtctx->jobs + jobID);
            mtctx->nextJobID++;
            return 0;
        }
    }

    DEBUGLOG(5, "ZSTDMT_createCompressionJob: posting job %u : %u bytes  (end:%u, jobNb == %u (mod:%u))",
                mtctx->nextJobID,
                (U32)mtctx->jobs[jobID].src.size,
                mtctx->jobs[jobID].lastJob,
                mtctx->nextJobID,
                jobID);
    if (POOL_tryAdd(mtctx->factory, ZSTDMT_compressionJob, &mtctx->jobs[jobID])) {
        mtctx->nextJobID++;
        mtctx->jobReady = 0;
    } else {
        DEBUGLOG(5, "ZSTDMT_createCompressionJob: no worker available for job %u", mtctx->nextJobID);
        mtctx->jobReady = 1;
    }

    return 0;
}
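
/* Note on job slot indexing (illustrative numbers) : the jobs table is sized
 * to a power of 2 elsewhere in this file, and jobIDMask is that size minus 1.
 * With, say, 8 slots, job 9 maps to slot 9 & 7 == 1 : slots are reused
 * round-robin once the previous occupant (on the doneJobID side) has been
 * fully flushed, which is what the table-is-full check above guarantees. */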

/*! ZSTDMT_flushProduced() :
 *  flush whatever data has been produced but not yet flushed in current job.
 *  move to next job if current one is fully flushed.
 * `output` : `pos` will be updated with amount of data flushed.
 * `blockToFlush` : if >0, the function will block and wait if there is no data available to flush.
 * @return : amount of data remaining within internal buffer, 0 if no more, 1 if unknown but > 0, or an error code */
static size_t ZSTDMT_flushProduced(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, unsigned blockToFlush, ZSTD_EndDirective end)
{
    unsigned const wJobID = mtctx->doneJobID & mtctx->jobIDMask;
    DEBUGLOG(5, "ZSTDMT_flushProduced (blocking:%u , job %u <= %u)",
                blockToFlush, mtctx->doneJobID, mtctx->nextJobID);
    assert(output->size >= output->pos);

    ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
    if (  blockToFlush
      && (mtctx->doneJobID < mtctx->nextJobID) ) {
        assert(mtctx->jobs[wJobID].dstFlushed <= mtctx->jobs[wJobID].cSize);
        while (mtctx->jobs[wJobID].dstFlushed == mtctx->jobs[wJobID].cSize) {  /* nothing to flush */
            if (mtctx->jobs[wJobID].consumed == mtctx->jobs[wJobID].src.size) {
                DEBUGLOG(5, "job %u is completely consumed (%u == %u) => don't wait for cond, there will be none",
                            mtctx->doneJobID, (U32)mtctx->jobs[wJobID].consumed, (U32)mtctx->jobs[wJobID].src.size);
                break;
            }
            DEBUGLOG(5, "waiting for something to flush from job %u (currently flushed: %u bytes)",
                        mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
            ZSTD_pthread_cond_wait(&mtctx->jobs[wJobID].job_cond, &mtctx->jobs[wJobID].job_mutex);  /* block when nothing to flush but some to come */
    }   }

    /* try to flush something */
    {   size_t cSize = mtctx->jobs[wJobID].cSize;                  /* shared */
        size_t const srcConsumed = mtctx->jobs[wJobID].consumed;   /* shared */
        size_t const srcSize = mtctx->jobs[wJobID].src.size;       /* read-only, could be done after mutex lock, but no-declaration-after-statement */
        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);
        if (ZSTD_isError(cSize)) {
            DEBUGLOG(5, "ZSTDMT_flushProduced: job %u : compression error detected : %s",
                        mtctx->doneJobID, ZSTD_getErrorName(cSize));
            ZSTDMT_waitForAllJobsCompleted(mtctx);
            ZSTDMT_releaseAllJobResources(mtctx);
            return cSize;
        }
        /* add frame checksum if necessary (can only happen once) */
        assert(srcConsumed <= srcSize);
        if ( (srcConsumed == srcSize)   /* job completed -> worker no longer active */
          && mtctx->jobs[wJobID].frameChecksumNeeded ) {
            U32 const checksum = (U32)XXH64_digest(&mtctx->serial.xxhState);
            DEBUGLOG(4, "ZSTDMT_flushProduced: writing checksum : %08X \n", checksum);
            MEM_writeLE32((char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].cSize, checksum);
            cSize += 4;
            mtctx->jobs[wJobID].cSize += 4;  /* can write this shared value, as worker is no longer active */
            mtctx->jobs[wJobID].frameChecksumNeeded = 0;
        }

        if (cSize > 0) {   /* compression is ongoing or completed */
            size_t const toFlush = MIN(cSize - mtctx->jobs[wJobID].dstFlushed, output->size - output->pos);
            DEBUGLOG(5, "ZSTDMT_flushProduced: Flushing %u bytes from job %u (completion:%u/%u, generated:%u)",
                        (U32)toFlush, mtctx->doneJobID, (U32)srcConsumed, (U32)srcSize, (U32)cSize);
            assert(mtctx->doneJobID < mtctx->nextJobID);
            assert(cSize >= mtctx->jobs[wJobID].dstFlushed);
            assert(mtctx->jobs[wJobID].dstBuff.start != NULL);
            if (toFlush > 0) {
                memcpy((char*)output->dst + output->pos,
                       (const char*)mtctx->jobs[wJobID].dstBuff.start + mtctx->jobs[wJobID].dstFlushed,
                       toFlush);
            }
            output->pos += toFlush;
            mtctx->jobs[wJobID].dstFlushed += toFlush;  /* can write : this value is only used by mtctx */

            if ( (srcConsumed == srcSize)    /* job is completed */
              && (mtctx->jobs[wJobID].dstFlushed == cSize) ) {  /* output buffer fully flushed => free this job position */
                DEBUGLOG(5, "Job %u completed (%u bytes), moving to next one",
                            mtctx->doneJobID, (U32)mtctx->jobs[wJobID].dstFlushed);
                ZSTDMT_releaseBuffer(mtctx->bufPool, mtctx->jobs[wJobID].dstBuff);
                DEBUGLOG(5, "dstBuffer released");
                mtctx->jobs[wJobID].dstBuff = g_nullBuffer;
                mtctx->jobs[wJobID].cSize = 0;   /* ensure this job slot is considered "not started" in future check */
                mtctx->consumed += srcSize;
                mtctx->produced += cSize;
                mtctx->doneJobID++;
        }   }

        /* return value : how many bytes left in buffer ; fake it to 1 when unknown but >0 */
        if (cSize > mtctx->jobs[wJobID].dstFlushed) return (cSize - mtctx->jobs[wJobID].dstFlushed);
        if (srcSize > srcConsumed) return 1;   /* current job not completely compressed */
    }
    if (mtctx->doneJobID < mtctx->nextJobID) return 1;   /* some more jobs ongoing */
    if (mtctx->jobReady) return 1;   /* one job is ready to push, just not yet in the list */
    if (mtctx->inBuff.filled > 0) return 1;   /* input is not empty, and still needs to be converted into a job */
    mtctx->allJobsCompleted = mtctx->frameEnded;   /* all jobs are entirely flushed => if this one is last one, frame is completed */
    if (end == ZSTD_e_end) return !mtctx->frameEnded;  /* for ZSTD_e_end, question becomes : is frame completed ? instead of : are internal buffers fully flushed ? */
    return 0;   /* internal buffers fully flushed */
}
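
/* Example of the return contract above (illustrative numbers) : a job has
 * generated cSize = 100 KB so far, 40 KB were already flushed, and the
 * caller's output buffer absorbs 30 KB on this call. dstFlushed becomes
 * 70 KB and the function returns 100 - 70 = 30 KB still waiting internally.
 * If nothing is flushable yet but the job is still compressing, it returns 1
 * ("unknown but > 0"). */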

/**
 * Returns the range of data used by the earliest job that is not yet complete.
 * If the data of the first job is broken up into two segments, we cover both
 * sections.
 */
static range_t ZSTDMT_getInputDataInUse(ZSTDMT_CCtx* mtctx)
{
    unsigned const firstJobID = mtctx->doneJobID;
    unsigned const lastJobID = mtctx->nextJobID;
    unsigned jobID;

    for (jobID = firstJobID; jobID < lastJobID; ++jobID) {
        unsigned const wJobID = jobID & mtctx->jobIDMask;
        size_t consumed;

        ZSTD_PTHREAD_MUTEX_LOCK(&mtctx->jobs[wJobID].job_mutex);
        consumed = mtctx->jobs[wJobID].consumed;
        ZSTD_pthread_mutex_unlock(&mtctx->jobs[wJobID].job_mutex);

        if (consumed < mtctx->jobs[wJobID].src.size) {
            range_t range = mtctx->jobs[wJobID].prefix;
            if (range.size == 0) {
                /* Empty prefix */
                range = mtctx->jobs[wJobID].src;
            }
            /* Job source in multiple segments not supported yet */
            assert(range.start <= mtctx->jobs[wJobID].src.start);
            return range;
        }
    }
    return kNullRange;
}

/**
 * Returns non-zero iff buffer and range overlap.
 */
static int ZSTDMT_isOverlapped(buffer_t buffer, range_t range)
{
    BYTE const* const bufferStart = (BYTE const*)buffer.start;
    BYTE const* const bufferEnd = bufferStart + buffer.capacity;
    BYTE const* const rangeStart = (BYTE const*)range.start;
    BYTE const* const rangeEnd = range.size != 0 ? rangeStart + range.size : rangeStart;

    if (rangeStart == NULL || bufferStart == NULL)
        return 0;
    /* Empty ranges cannot overlap */
    if (bufferStart == bufferEnd || rangeStart == rangeEnd)
        return 0;

    return bufferStart < rangeEnd && rangeStart < bufferEnd;
}
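
/* Worked example of the half-open interval test above (addresses are
 * illustrative) : buffer = [0x1000, 0x2000) and range = [0x1800, 0x2800)
 * overlap, since 0x1000 < 0x2800 and 0x1800 < 0x2000. Intervals that merely
 * touch, e.g. [0x1000, 0x2000) and [0x2000, 0x3000), do not overlap. */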

static int ZSTDMT_doesOverlapWindow(buffer_t buffer, ZSTD_window_t window)
{
    range_t extDict;
    range_t prefix;

    DEBUGLOG(5, "ZSTDMT_doesOverlapWindow");
    extDict.start = window.dictBase + window.lowLimit;
    extDict.size = window.dictLimit - window.lowLimit;

    prefix.start = window.base + window.dictLimit;
    prefix.size = window.nextSrc - (window.base + window.dictLimit);
    DEBUGLOG(5, "extDict [0x%zx, 0x%zx)",
                (size_t)extDict.start,
                (size_t)extDict.start + extDict.size);
    DEBUGLOG(5, "prefix [0x%zx, 0x%zx)",
                (size_t)prefix.start,
                (size_t)prefix.start + prefix.size);

    return ZSTDMT_isOverlapped(buffer, extDict)
        || ZSTDMT_isOverlapped(buffer, prefix);
}

static void ZSTDMT_waitForLdmComplete(ZSTDMT_CCtx* mtctx, buffer_t buffer)
{
    if (mtctx->params.ldmParams.enableLdm) {
        ZSTD_pthread_mutex_t* mutex = &mtctx->serial.ldmWindowMutex;
        DEBUGLOG(5, "ZSTDMT_waitForLdmComplete");
        DEBUGLOG(5, "source  [0x%zx, 0x%zx)",
                    (size_t)buffer.start,
                    (size_t)buffer.start + buffer.capacity);
        ZSTD_PTHREAD_MUTEX_LOCK(mutex);
        while (ZSTDMT_doesOverlapWindow(buffer, mtctx->serial.ldmWindow)) {
            DEBUGLOG(5, "Waiting for LDM to finish...");
            ZSTD_pthread_cond_wait(&mtctx->serial.ldmWindowCond, mutex);
        }
        DEBUGLOG(6, "Done waiting for LDM to finish");
        ZSTD_pthread_mutex_unlock(mutex);
    }
}

/**
 * Attempts to set the inBuff to the next section to fill.
 * If any part of the new section is still in use we give up.
 * Returns non-zero if the buffer is filled.
 */
static int ZSTDMT_tryGetInputRange(ZSTDMT_CCtx* mtctx)
{
    range_t const inUse = ZSTDMT_getInputDataInUse(mtctx);
    size_t const spaceLeft = mtctx->roundBuff.capacity - mtctx->roundBuff.pos;
    size_t const target = mtctx->targetSectionSize;
    buffer_t buffer;

    DEBUGLOG(5, "ZSTDMT_tryGetInputRange");
    assert(mtctx->inBuff.buffer.start == NULL);
    assert(mtctx->roundBuff.capacity >= target);

    if (spaceLeft < target) {
        /* ZSTD_invalidateRepCodes() doesn't work for extDict variants.
         * Simply copy the prefix to the beginning in that case.
         */
        BYTE* const start = (BYTE*)mtctx->roundBuff.buffer;
        size_t const prefixSize = mtctx->inBuff.prefix.size;

        buffer.start = start;
        buffer.capacity = prefixSize;
        if (ZSTDMT_isOverlapped(buffer, inUse)) {
            DEBUGLOG(5, "Waiting for buffer...");
            return 0;
        }
        ZSTDMT_waitForLdmComplete(mtctx, buffer);
        memmove(start, mtctx->inBuff.prefix.start, prefixSize);
        mtctx->inBuff.prefix.start = start;
        mtctx->roundBuff.pos = prefixSize;
    }
    buffer.start = mtctx->roundBuff.buffer + mtctx->roundBuff.pos;
    buffer.capacity = target;

    if (ZSTDMT_isOverlapped(buffer, inUse)) {
        DEBUGLOG(5, "Waiting for buffer...");
        return 0;
    }
    assert(!ZSTDMT_isOverlapped(buffer, mtctx->inBuff.prefix));

    ZSTDMT_waitForLdmComplete(mtctx, buffer);

    DEBUGLOG(5, "Using prefix range [%zx, %zx)",
                (size_t)mtctx->inBuff.prefix.start,
                (size_t)mtctx->inBuff.prefix.start + mtctx->inBuff.prefix.size);
    DEBUGLOG(5, "Using source range [%zx, %zx)",
                (size_t)buffer.start,
                (size_t)buffer.start + buffer.capacity);

    mtctx->inBuff.buffer = buffer;
    mtctx->inBuff.filled = 0;
    assert(mtctx->roundBuff.pos + buffer.capacity <= mtctx->roundBuff.capacity);
    return 1;
}
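
/* Illustration of the wrap-around above (hypothetical sizes) : with a 14 MB
 * round buffer, pos = 13 MB and target = 2 MB, the next section does not fit
 * at the end. The current prefix (say 1 MB) is copied back to offset 0, pos
 * is reset to 1 MB, and the new section [1 MB, 3 MB) is handed out, provided
 * no ongoing job still reads from that region. */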

typedef struct {
  size_t toLoad;  /* The number of bytes to load from the input. */
  int flush;      /* Boolean declaring if we must flush because we found a synchronization point. */
} syncPoint_t;

/**
 * Searches through the input for a synchronization point. If one is found, we
 * will instruct the caller to flush, and return the number of bytes to load.
 * Otherwise, we will load as many bytes as possible and instruct the caller
 * to continue as normal.
 */
static syncPoint_t
findSynchronizationPoint(ZSTDMT_CCtx const* mtctx, ZSTD_inBuffer const input)
{
    BYTE const* const istart = (BYTE const*)input.src + input.pos;
    U64 const primePower = mtctx->rsync.primePower;
    U64 const hitMask = mtctx->rsync.hitMask;

    syncPoint_t syncPoint;
    U64 hash;
    BYTE const* prev;
    size_t pos;

    syncPoint.toLoad = MIN(input.size - input.pos, mtctx->targetSectionSize - mtctx->inBuff.filled);
    syncPoint.flush = 0;
    if (!mtctx->params.rsyncable)
        /* Rsync is disabled. */
        return syncPoint;
    if (mtctx->inBuff.filled + syncPoint.toLoad < RSYNC_LENGTH)
        /* Not enough to compute the hash.
         * We will miss any synchronization points in this RSYNC_LENGTH byte
         * window. However, since it depends only on the internal buffers, if
         * the state is already synchronized, we will remain synchronized.
         * Additionally, the probability that we miss a synchronization point
         * is low: RSYNC_LENGTH / targetSectionSize.
         */
        return syncPoint;
    /* Initialize the loop variables. */
    if (mtctx->inBuff.filled >= RSYNC_LENGTH) {
        /* We have enough bytes buffered to initialize the hash.
         * Start scanning at the beginning of the input.
         */
        pos = 0;
        prev = (BYTE const*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled - RSYNC_LENGTH;
        hash = ZSTD_rollingHash_compute(prev, RSYNC_LENGTH);
    } else {
        /* We don't have enough bytes buffered to initialize the hash, but
         * we know we have at least RSYNC_LENGTH bytes total.
         * Start scanning after the first RSYNC_LENGTH bytes less the bytes
         * already buffered.
         */
        pos = RSYNC_LENGTH - mtctx->inBuff.filled;
        prev = (BYTE const*)mtctx->inBuff.buffer.start - pos;
        hash = ZSTD_rollingHash_compute(mtctx->inBuff.buffer.start, mtctx->inBuff.filled);
        hash = ZSTD_rollingHash_append(hash, istart, pos);
    }
    /* Starting with the hash of the previous RSYNC_LENGTH bytes, roll
     * through the input. If we hit a synchronization point, then cut the
     * job off, and tell the compressor to flush the job. Otherwise, load
     * all the bytes and continue as normal.
     * If we go too long without a synchronization point (targetSectionSize)
     * then a block will be emitted anyways, but this is okay, since if we
     * are already synchronized we will remain synchronized.
     */
    for (; pos < syncPoint.toLoad; ++pos) {
        BYTE const toRemove = pos < RSYNC_LENGTH ? prev[pos] : istart[pos - RSYNC_LENGTH];
        /* if (pos >= RSYNC_LENGTH) assert(ZSTD_rollingHash_compute(istart + pos - RSYNC_LENGTH, RSYNC_LENGTH) == hash); */
        hash = ZSTD_rollingHash_rotate(hash, toRemove, istart[pos], primePower);
        if ((hash & hitMask) == hitMask) {
            syncPoint.toLoad = pos + 1;
            syncPoint.flush = 1;
            break;
        }
    }
    return syncPoint;
}
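
/* Example of the hit condition above (illustrative numbers) : with
 * rsyncBits = 22, hitMask has its low 22 bits set. Assuming the rolling hash
 * behaves uniformly, any given position satisfies
 * (hash & hitMask) == hitMask with probability 2^-22, so synchronization
 * points land on content-defined boundaries roughly every 4 MB on average,
 * independently of how the caller chunks the input. */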

size_t ZSTDMT_nextInputSizeHint(const ZSTDMT_CCtx* mtctx)
{
    size_t hintInSize = mtctx->targetSectionSize - mtctx->inBuff.filled;
    if (hintInSize==0) hintInSize = mtctx->targetSectionSize;
    return hintInSize;
}

/** ZSTDMT_compressStream_generic() :
 *  internal use only - exposed to be invoked from zstd_compress.c
 *  assumption : output and input are valid (pos <= size)
 * @return : minimum amount of data remaining to flush, 0 if none */
size_t ZSTDMT_compressStream_generic(ZSTDMT_CCtx* mtctx,
                                     ZSTD_outBuffer* output,
                                     ZSTD_inBuffer* input,
                                     ZSTD_EndDirective endOp)
{
    unsigned forwardInputProgress = 0;
    DEBUGLOG(5, "ZSTDMT_compressStream_generic (endOp=%u, srcSize=%u)",
                (U32)endOp, (U32)(input->size - input->pos));
    assert(output->pos <= output->size);
    assert(input->pos  <= input->size);

    if (mtctx->singleBlockingThread) {  /* delegate to single-thread (synchronous) */
        return ZSTD_compressStream2(mtctx->cctxPool->cctx[0], output, input, endOp);
    }

    if ((mtctx->frameEnded) && (endOp==ZSTD_e_continue)) {
        /* current frame being ended. Only flush/end are allowed */
        return ERROR(stage_wrong);
    }

    /* single-pass shortcut (note : synchronous-mode) */
    if ( (!mtctx->params.rsyncable)   /* rsyncable mode is disabled */
      && (mtctx->nextJobID == 0)      /* just started */
      && (mtctx->inBuff.filled == 0)  /* nothing buffered */
      && (!mtctx->jobReady)           /* no job already created */
      && (endOp == ZSTD_e_end)        /* end order */
      && (output->size - output->pos >= ZSTD_compressBound(input->size - input->pos)) ) { /* enough space in dst */
        size_t const cSize = ZSTDMT_compress_advanced_internal(mtctx,
                (char*)output->dst + output->pos, output->size - output->pos,
                (const char*)input->src + input->pos, input->size - input->pos,
                mtctx->cdict, mtctx->params);
        if (ZSTD_isError(cSize)) return cSize;
        input->pos = input->size;
        output->pos += cSize;
        mtctx->allJobsCompleted = 1;
        mtctx->frameEnded = 1;
        return 0;
    }

    /* fill input buffer */
    if ( (!mtctx->jobReady)
      && (input->size > input->pos) ) {   /* support NULL input */
        if (mtctx->inBuff.buffer.start == NULL) {
            assert(mtctx->inBuff.filled == 0);  /* Can't fill an empty buffer */
            if (!ZSTDMT_tryGetInputRange(mtctx)) {
                /* It is only possible for this operation to fail if there are
                 * still compression jobs ongoing.
                 */
                DEBUGLOG(5, "ZSTDMT_tryGetInputRange failed");
                assert(mtctx->doneJobID != mtctx->nextJobID);
            } else
                DEBUGLOG(5, "ZSTDMT_tryGetInputRange completed successfully : mtctx->inBuff.buffer.start = %p", mtctx->inBuff.buffer.start);
        }
        if (mtctx->inBuff.buffer.start != NULL) {
            syncPoint_t const syncPoint = findSynchronizationPoint(mtctx, *input);
            if (syncPoint.flush && endOp == ZSTD_e_continue) {
                endOp = ZSTD_e_flush;
            }
            assert(mtctx->inBuff.buffer.capacity >= mtctx->targetSectionSize);
            DEBUGLOG(5, "ZSTDMT_compressStream_generic: adding %u bytes on top of %u to buffer of size %u",
                        (U32)syncPoint.toLoad, (U32)mtctx->inBuff.filled, (U32)mtctx->targetSectionSize);
            memcpy((char*)mtctx->inBuff.buffer.start + mtctx->inBuff.filled, (const char*)input->src + input->pos, syncPoint.toLoad);
            input->pos += syncPoint.toLoad;
            mtctx->inBuff.filled += syncPoint.toLoad;
            forwardInputProgress = syncPoint.toLoad>0;
        }
        if ((input->pos < input->size) && (endOp == ZSTD_e_end))
            endOp = ZSTD_e_flush;   /* can't end now : not all input consumed */
    }

    if ( (mtctx->jobReady)
      || (mtctx->inBuff.filled >= mtctx->targetSectionSize)  /* filled enough : let's compress */
      || ((endOp != ZSTD_e_continue) && (mtctx->inBuff.filled > 0))  /* something to flush : let's go */
      || ((endOp == ZSTD_e_end) && (!mtctx->frameEnded)) ) {   /* must finish the frame with a zero-size block */
        size_t const jobSize = mtctx->inBuff.filled;
        assert(mtctx->inBuff.filled <= mtctx->targetSectionSize);
        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, jobSize, endOp) , "");
    }

    /* check for potential compressed data ready to be flushed */
    {   size_t const remainingToFlush = ZSTDMT_flushProduced(mtctx, output, !forwardInputProgress, endOp);  /* block if there was no forward input progress */
        if (input->pos < input->size) return MAX(remainingToFlush, 1);  /* input not consumed : do not end flush yet */
        DEBUGLOG(5, "end of ZSTDMT_compressStream_generic: remainingToFlush = %u", (U32)remainingToFlush);
        return remainingToFlush;
    }
}

size_t ZSTDMT_compressStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
{
    FORWARD_IF_ERROR( ZSTDMT_compressStream_generic(mtctx, output, input, ZSTD_e_continue) , "");

    /* recommended next input size : fill current input buffer */
    return mtctx->targetSectionSize - mtctx->inBuff.filled;   /* note : could be zero, when input buffer is fully filled and there is no more availability to create new job */
}

static size_t ZSTDMT_flushStream_internal(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output, ZSTD_EndDirective endFrame)
{
    size_t const srcSize = mtctx->inBuff.filled;
    DEBUGLOG(5, "ZSTDMT_flushStream_internal");

    if ( mtctx->jobReady     /* one job ready for a worker to pick up */
      || (srcSize > 0)       /* still some data within input buffer */
      || ((endFrame==ZSTD_e_end) && !mtctx->frameEnded)) {  /* need a last 0-size block to end frame */
        DEBUGLOG(5, "ZSTDMT_flushStream_internal : create a new job (%u bytes, end:%u)",
                    (U32)srcSize, (U32)endFrame);
        FORWARD_IF_ERROR( ZSTDMT_createCompressionJob(mtctx, srcSize, endFrame) , "");
    }

    /* check if there is any data available to flush */
    return ZSTDMT_flushProduced(mtctx, output, 1 /* blockToFlush */, endFrame);
}

size_t ZSTDMT_flushStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
{
    DEBUGLOG(5, "ZSTDMT_flushStream");
    if (mtctx->singleBlockingThread)
        return ZSTD_flushStream(mtctx->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_flush);
}

size_t ZSTDMT_endStream(ZSTDMT_CCtx* mtctx, ZSTD_outBuffer* output)
{
    DEBUGLOG(4, "ZSTDMT_endStream");
    if (mtctx->singleBlockingThread)
        return ZSTD_endStream(mtctx->cctxPool->cctx[0], output);
    return ZSTDMT_flushStream_internal(mtctx, output, ZSTD_e_end);
}
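
/* Example of the drain contract (illustrative numbers) : suppose roughly
 * 300 KB of compressed data remain internally and the caller provides a
 * 128 KB output buffer per call. The first two calls to ZSTDMT_endStream()
 * each fill the buffer and return a non-zero "still remaining" value
 * (~172 KB, then ~44 KB for the current job); a later call writes the tail
 * plus the frame epilogue and returns 0. Callers must therefore loop until
 * 0 is returned, treating any ZSTD_isError() value as fatal. */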