2 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 * You may select, at your option, one of the above-listed licenses.
12 /* ====== Dependencies ======= */
13 #include <stddef.h> /* size_t */
14 #include <stdlib.h> /* malloc, calloc, free */
17 /* ====== Compiler specifics ====== */
19 # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */
23 #ifdef ZSTD_MULTITHREAD
25 #include "threading.h" /* pthread adaptation */
27 /* A job is a function and an opaque argument */
28 typedef struct POOL_job_s
{
29 POOL_function function
;
34 ZSTD_customMem customMem
;
35 /* Keep track of the threads */
36 ZSTD_pthread_t
*threads
;
39 /* The queue is a circular buffer */
45 /* The number of threads working on jobs */
46 size_t numThreadsBusy
;
47 /* Indicates if the queue is empty */
50 /* The mutex protects the queue */
51 ZSTD_pthread_mutex_t queueMutex
;
52 /* Condition variable for pushers to wait on when the queue is full */
53 ZSTD_pthread_cond_t queuePushCond
;
54 /* Condition variables for poppers to wait on when the queue is empty */
55 ZSTD_pthread_cond_t queuePopCond
;
56 /* Indicates if the queue is shutting down */
61 Work thread for the thread pool.
62 Waits for jobs and executes them.
63 @returns : NULL on failure else non-null.
65 static void* POOL_thread(void* opaque
) {
66 POOL_ctx
* const ctx
= (POOL_ctx
*)opaque
;
67 if (!ctx
) { return NULL
; }
69 /* Lock the mutex and wait for a non-empty queue or until shutdown */
70 ZSTD_pthread_mutex_lock(&ctx
->queueMutex
);
72 while (ctx
->queueEmpty
&& !ctx
->shutdown
) {
73 ZSTD_pthread_cond_wait(&ctx
->queuePopCond
, &ctx
->queueMutex
);
75 /* empty => shutting down: so stop */
76 if (ctx
->queueEmpty
) {
77 ZSTD_pthread_mutex_unlock(&ctx
->queueMutex
);
80 /* Pop a job off the queue */
81 { POOL_job
const job
= ctx
->queue
[ctx
->queueHead
];
82 ctx
->queueHead
= (ctx
->queueHead
+ 1) % ctx
->queueSize
;
83 ctx
->numThreadsBusy
++;
84 ctx
->queueEmpty
= ctx
->queueHead
== ctx
->queueTail
;
85 /* Unlock the mutex, signal a pusher, and run the job */
86 ZSTD_pthread_mutex_unlock(&ctx
->queueMutex
);
87 ZSTD_pthread_cond_signal(&ctx
->queuePushCond
);
89 job
.function(job
.opaque
);
91 /* If the intended queue size was 0, signal after finishing job */
92 if (ctx
->queueSize
== 1) {
93 ZSTD_pthread_mutex_lock(&ctx
->queueMutex
);
94 ctx
->numThreadsBusy
--;
95 ZSTD_pthread_mutex_unlock(&ctx
->queueMutex
);
96 ZSTD_pthread_cond_signal(&ctx
->queuePushCond
);
102 POOL_ctx
* POOL_create(size_t numThreads
, size_t queueSize
) {
103 return POOL_create_advanced(numThreads
, queueSize
, ZSTD_defaultCMem
);
106 POOL_ctx
* POOL_create_advanced(size_t numThreads
, size_t queueSize
, ZSTD_customMem customMem
) {
108 /* Check the parameters */
109 if (!numThreads
) { return NULL
; }
110 /* Allocate the context and zero initialize */
111 ctx
= (POOL_ctx
*)ZSTD_calloc(sizeof(POOL_ctx
), customMem
);
112 if (!ctx
) { return NULL
; }
113 /* Initialize the job queue.
114 * It needs one extra space since one space is wasted to differentiate empty
117 ctx
->queueSize
= queueSize
+ 1;
118 ctx
->queue
= (POOL_job
*) malloc(ctx
->queueSize
* sizeof(POOL_job
));
121 ctx
->numThreadsBusy
= 0;
123 (void)ZSTD_pthread_mutex_init(&ctx
->queueMutex
, NULL
);
124 (void)ZSTD_pthread_cond_init(&ctx
->queuePushCond
, NULL
);
125 (void)ZSTD_pthread_cond_init(&ctx
->queuePopCond
, NULL
);
127 /* Allocate space for the thread handles */
128 ctx
->threads
= (ZSTD_pthread_t
*)ZSTD_malloc(numThreads
* sizeof(ZSTD_pthread_t
), customMem
);
130 ctx
->customMem
= customMem
;
131 /* Check for errors */
132 if (!ctx
->threads
|| !ctx
->queue
) { POOL_free(ctx
); return NULL
; }
133 /* Initialize the threads */
135 for (i
= 0; i
< numThreads
; ++i
) {
136 if (ZSTD_pthread_create(&ctx
->threads
[i
], NULL
, &POOL_thread
, ctx
)) {
141 ctx
->numThreads
= numThreads
;
147 Shutdown the queue, wake any sleeping threads, and join all of the threads.
149 static void POOL_join(POOL_ctx
* ctx
) {
150 /* Shut down the queue */
151 ZSTD_pthread_mutex_lock(&ctx
->queueMutex
);
153 ZSTD_pthread_mutex_unlock(&ctx
->queueMutex
);
154 /* Wake up sleeping threads */
155 ZSTD_pthread_cond_broadcast(&ctx
->queuePushCond
);
156 ZSTD_pthread_cond_broadcast(&ctx
->queuePopCond
);
157 /* Join all of the threads */
159 for (i
= 0; i
< ctx
->numThreads
; ++i
) {
160 ZSTD_pthread_join(ctx
->threads
[i
], NULL
);
164 void POOL_free(POOL_ctx
*ctx
) {
165 if (!ctx
) { return; }
167 ZSTD_pthread_mutex_destroy(&ctx
->queueMutex
);
168 ZSTD_pthread_cond_destroy(&ctx
->queuePushCond
);
169 ZSTD_pthread_cond_destroy(&ctx
->queuePopCond
);
170 ZSTD_free(ctx
->queue
, ctx
->customMem
);
171 ZSTD_free(ctx
->threads
, ctx
->customMem
);
172 ZSTD_free(ctx
, ctx
->customMem
);
175 size_t POOL_sizeof(POOL_ctx
*ctx
) {
176 if (ctx
==NULL
) return 0; /* supports sizeof NULL */
178 + ctx
->queueSize
* sizeof(POOL_job
)
179 + ctx
->numThreads
* sizeof(ZSTD_pthread_t
);
183 * Returns 1 if the queue is full and 0 otherwise.
185 * If the queueSize is 1 (the pool was created with an intended queueSize of 0),
186 * then a queue is empty if there is a thread free and no job is waiting.
188 static int isQueueFull(POOL_ctx
const* ctx
) {
189 if (ctx
->queueSize
> 1) {
190 return ctx
->queueHead
== ((ctx
->queueTail
+ 1) % ctx
->queueSize
);
192 return ctx
->numThreadsBusy
== ctx
->numThreads
||
197 void POOL_add(void* ctxVoid
, POOL_function function
, void *opaque
) {
198 POOL_ctx
* const ctx
= (POOL_ctx
*)ctxVoid
;
199 if (!ctx
) { return; }
201 ZSTD_pthread_mutex_lock(&ctx
->queueMutex
);
202 { POOL_job
const job
= {function
, opaque
};
204 /* Wait until there is space in the queue for the new job */
205 while (isQueueFull(ctx
) && !ctx
->shutdown
) {
206 ZSTD_pthread_cond_wait(&ctx
->queuePushCond
, &ctx
->queueMutex
);
208 /* The queue is still going => there is space */
209 if (!ctx
->shutdown
) {
211 ctx
->queue
[ctx
->queueTail
] = job
;
212 ctx
->queueTail
= (ctx
->queueTail
+ 1) % ctx
->queueSize
;
215 ZSTD_pthread_mutex_unlock(&ctx
->queueMutex
);
216 ZSTD_pthread_cond_signal(&ctx
->queuePopCond
);
219 #else /* ZSTD_MULTITHREAD not defined */
220 /* No multi-threading support */
222 /* We don't need any data, but if it is empty malloc() might return NULL. */
226 static POOL_ctx g_ctx
;
228 POOL_ctx
* POOL_create(size_t numThreads
, size_t queueSize
) {
229 return POOL_create_advanced(numThreads
, queueSize
, ZSTD_defaultCMem
);
232 POOL_ctx
* POOL_create_advanced(size_t numThreads
, size_t queueSize
, ZSTD_customMem customMem
) {
239 void POOL_free(POOL_ctx
* ctx
) {
240 assert(!ctx
|| ctx
== &g_ctx
);
244 void POOL_add(void* ctx
, POOL_function function
, void* opaque
) {
249 size_t POOL_sizeof(POOL_ctx
* ctx
) {
250 if (ctx
==NULL
) return 0; /* supports sizeof NULL */
251 assert(ctx
== &g_ctx
);
255 #endif /* ZSTD_MULTITHREAD */