]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | /* |
f67539c2 | 2 | * Copyright (c) 2016-2020, Yann Collet, Facebook, Inc. |
11fdf7f2 TL |
3 | * All rights reserved. |
4 | * | |
5 | * This source code is licensed under both the BSD-style license (found in the | |
6 | * LICENSE file in the root directory of this source tree) and the GPLv2 (found | |
7 | * in the COPYING file in the root directory of this source tree). | |
8 | * You may select, at your option, one of the above-listed licenses. | |
9 | */ | |
10 | ||
11 | ||
12 | /* ====== Dependencies ======= */ | |
9f95a23c TL |
13 | #include <stddef.h> /* size_t */ |
14 | #include "debug.h" /* assert */ | |
15 | #include "zstd_internal.h" /* ZSTD_malloc, ZSTD_free */ | |
11fdf7f2 TL |
16 | #include "pool.h" |
17 | ||
18 | /* ====== Compiler specifics ====== */ | |
19 | #if defined(_MSC_VER) | |
20 | # pragma warning(disable : 4204) /* disable: C4204: non-constant aggregate initializer */ | |
21 | #endif | |
22 | ||
23 | ||
24 | #ifdef ZSTD_MULTITHREAD | |
25 | ||
26 | #include "threading.h" /* pthread adaptation */ | |
27 | ||
/* A job is a function and an opaque argument.
 * Jobs are stored by value in the circular queue of POOL_ctx. */
typedef struct POOL_job_s {
    POOL_function function;   /* callback run by a worker thread */
    void *opaque;             /* argument passed unchanged to `function` */
} POOL_job;

struct POOL_ctx_s {
    ZSTD_customMem customMem;   /* allocator used for ctx, queue and threads */
    /* Keep track of the threads */
    ZSTD_pthread_t* threads;    /* thread handles; entries [0, threadCapacity) are valid */
    size_t threadCapacity;      /* number of threads actually created */
    size_t threadLimit;         /* max number of concurrently busy workers (<= threadCapacity) */

    /* The queue is a circular buffer */
    POOL_job *queue;
    size_t queueHead;           /* index of the next job to pop */
    size_t queueTail;           /* index of the next free slot to push into */
    size_t queueSize;           /* slot count = requested queueSize + 1 (one slot stays unused) */

    /* The number of threads working on jobs */
    size_t numThreadsBusy;
    /* Indicates if the queue is empty */
    int queueEmpty;

    /* The mutex protects the queue */
    ZSTD_pthread_mutex_t queueMutex;
    /* Condition variable for pushers to wait on when the queue is full */
    ZSTD_pthread_cond_t queuePushCond;
    /* Condition variables for poppers to wait on when the queue is empty */
    ZSTD_pthread_cond_t queuePopCond;
    /* Indicates if the queue is shutting down */
    int shutdown;
};
61 | ||
62 | /* POOL_thread() : | |
9f95a23c TL |
63 | * Work thread for the thread pool. |
64 | * Waits for jobs and executes them. | |
65 | * @returns : NULL on failure else non-null. | |
66 | */ | |
11fdf7f2 TL |
67 | static void* POOL_thread(void* opaque) { |
68 | POOL_ctx* const ctx = (POOL_ctx*)opaque; | |
69 | if (!ctx) { return NULL; } | |
70 | for (;;) { | |
71 | /* Lock the mutex and wait for a non-empty queue or until shutdown */ | |
72 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); | |
73 | ||
9f95a23c TL |
74 | while ( ctx->queueEmpty |
75 | || (ctx->numThreadsBusy >= ctx->threadLimit) ) { | |
76 | if (ctx->shutdown) { | |
77 | /* even if !queueEmpty, (possible if numThreadsBusy >= threadLimit), | |
78 | * a few threads will be shutdown while !queueEmpty, | |
79 | * but enough threads will remain active to finish the queue */ | |
80 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
81 | return opaque; | |
82 | } | |
11fdf7f2 TL |
83 | ZSTD_pthread_cond_wait(&ctx->queuePopCond, &ctx->queueMutex); |
84 | } | |
11fdf7f2 TL |
85 | /* Pop a job off the queue */ |
86 | { POOL_job const job = ctx->queue[ctx->queueHead]; | |
87 | ctx->queueHead = (ctx->queueHead + 1) % ctx->queueSize; | |
88 | ctx->numThreadsBusy++; | |
89 | ctx->queueEmpty = ctx->queueHead == ctx->queueTail; | |
90 | /* Unlock the mutex, signal a pusher, and run the job */ | |
11fdf7f2 | 91 | ZSTD_pthread_cond_signal(&ctx->queuePushCond); |
9f95a23c | 92 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); |
11fdf7f2 TL |
93 | |
94 | job.function(job.opaque); | |
95 | ||
96 | /* If the intended queue size was 0, signal after finishing job */ | |
9f95a23c TL |
97 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); |
98 | ctx->numThreadsBusy--; | |
11fdf7f2 | 99 | if (ctx->queueSize == 1) { |
11fdf7f2 | 100 | ZSTD_pthread_cond_signal(&ctx->queuePushCond); |
9f95a23c TL |
101 | } |
102 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
103 | } | |
11fdf7f2 | 104 | } /* for (;;) */ |
9f95a23c | 105 | assert(0); /* Unreachable */ |
11fdf7f2 TL |
106 | } |
107 | ||
108 | POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { | |
109 | return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); | |
110 | } | |
111 | ||
9f95a23c TL |
112 | POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, |
113 | ZSTD_customMem customMem) { | |
11fdf7f2 | 114 | POOL_ctx* ctx; |
9f95a23c | 115 | /* Check parameters */ |
11fdf7f2 TL |
116 | if (!numThreads) { return NULL; } |
117 | /* Allocate the context and zero initialize */ | |
118 | ctx = (POOL_ctx*)ZSTD_calloc(sizeof(POOL_ctx), customMem); | |
119 | if (!ctx) { return NULL; } | |
120 | /* Initialize the job queue. | |
9f95a23c TL |
121 | * It needs one extra space since one space is wasted to differentiate |
122 | * empty and full queues. | |
11fdf7f2 TL |
123 | */ |
124 | ctx->queueSize = queueSize + 1; | |
9f95a23c | 125 | ctx->queue = (POOL_job*)ZSTD_malloc(ctx->queueSize * sizeof(POOL_job), customMem); |
11fdf7f2 TL |
126 | ctx->queueHead = 0; |
127 | ctx->queueTail = 0; | |
128 | ctx->numThreadsBusy = 0; | |
129 | ctx->queueEmpty = 1; | |
f67539c2 TL |
130 | { |
131 | int error = 0; | |
132 | error |= ZSTD_pthread_mutex_init(&ctx->queueMutex, NULL); | |
133 | error |= ZSTD_pthread_cond_init(&ctx->queuePushCond, NULL); | |
134 | error |= ZSTD_pthread_cond_init(&ctx->queuePopCond, NULL); | |
135 | if (error) { POOL_free(ctx); return NULL; } | |
136 | } | |
11fdf7f2 TL |
137 | ctx->shutdown = 0; |
138 | /* Allocate space for the thread handles */ | |
139 | ctx->threads = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), customMem); | |
9f95a23c | 140 | ctx->threadCapacity = 0; |
11fdf7f2 TL |
141 | ctx->customMem = customMem; |
142 | /* Check for errors */ | |
143 | if (!ctx->threads || !ctx->queue) { POOL_free(ctx); return NULL; } | |
144 | /* Initialize the threads */ | |
145 | { size_t i; | |
146 | for (i = 0; i < numThreads; ++i) { | |
147 | if (ZSTD_pthread_create(&ctx->threads[i], NULL, &POOL_thread, ctx)) { | |
9f95a23c | 148 | ctx->threadCapacity = i; |
11fdf7f2 TL |
149 | POOL_free(ctx); |
150 | return NULL; | |
151 | } } | |
9f95a23c TL |
152 | ctx->threadCapacity = numThreads; |
153 | ctx->threadLimit = numThreads; | |
11fdf7f2 TL |
154 | } |
155 | return ctx; | |
156 | } | |
157 | ||
158 | /*! POOL_join() : | |
159 | Shutdown the queue, wake any sleeping threads, and join all of the threads. | |
160 | */ | |
161 | static void POOL_join(POOL_ctx* ctx) { | |
162 | /* Shut down the queue */ | |
163 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); | |
164 | ctx->shutdown = 1; | |
165 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
166 | /* Wake up sleeping threads */ | |
167 | ZSTD_pthread_cond_broadcast(&ctx->queuePushCond); | |
168 | ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); | |
169 | /* Join all of the threads */ | |
170 | { size_t i; | |
9f95a23c TL |
171 | for (i = 0; i < ctx->threadCapacity; ++i) { |
172 | ZSTD_pthread_join(ctx->threads[i], NULL); /* note : could fail */ | |
11fdf7f2 TL |
173 | } } |
174 | } | |
175 | ||
176 | void POOL_free(POOL_ctx *ctx) { | |
177 | if (!ctx) { return; } | |
178 | POOL_join(ctx); | |
179 | ZSTD_pthread_mutex_destroy(&ctx->queueMutex); | |
180 | ZSTD_pthread_cond_destroy(&ctx->queuePushCond); | |
181 | ZSTD_pthread_cond_destroy(&ctx->queuePopCond); | |
182 | ZSTD_free(ctx->queue, ctx->customMem); | |
183 | ZSTD_free(ctx->threads, ctx->customMem); | |
184 | ZSTD_free(ctx, ctx->customMem); | |
185 | } | |
186 | ||
9f95a23c TL |
187 | |
188 | ||
11fdf7f2 TL |
189 | size_t POOL_sizeof(POOL_ctx *ctx) { |
190 | if (ctx==NULL) return 0; /* supports sizeof NULL */ | |
191 | return sizeof(*ctx) | |
192 | + ctx->queueSize * sizeof(POOL_job) | |
9f95a23c TL |
193 | + ctx->threadCapacity * sizeof(ZSTD_pthread_t); |
194 | } | |
195 | ||
196 | ||
197 | /* @return : 0 on success, 1 on error */ | |
198 | static int POOL_resize_internal(POOL_ctx* ctx, size_t numThreads) | |
199 | { | |
200 | if (numThreads <= ctx->threadCapacity) { | |
201 | if (!numThreads) return 1; | |
202 | ctx->threadLimit = numThreads; | |
203 | return 0; | |
204 | } | |
205 | /* numThreads > threadCapacity */ | |
206 | { ZSTD_pthread_t* const threadPool = (ZSTD_pthread_t*)ZSTD_malloc(numThreads * sizeof(ZSTD_pthread_t), ctx->customMem); | |
207 | if (!threadPool) return 1; | |
208 | /* replace existing thread pool */ | |
209 | memcpy(threadPool, ctx->threads, ctx->threadCapacity * sizeof(*threadPool)); | |
210 | ZSTD_free(ctx->threads, ctx->customMem); | |
211 | ctx->threads = threadPool; | |
212 | /* Initialize additional threads */ | |
213 | { size_t threadId; | |
214 | for (threadId = ctx->threadCapacity; threadId < numThreads; ++threadId) { | |
215 | if (ZSTD_pthread_create(&threadPool[threadId], NULL, &POOL_thread, ctx)) { | |
216 | ctx->threadCapacity = threadId; | |
217 | return 1; | |
218 | } } | |
219 | } } | |
220 | /* successfully expanded */ | |
221 | ctx->threadCapacity = numThreads; | |
222 | ctx->threadLimit = numThreads; | |
223 | return 0; | |
224 | } | |
225 | ||
226 | /* @return : 0 on success, 1 on error */ | |
227 | int POOL_resize(POOL_ctx* ctx, size_t numThreads) | |
228 | { | |
229 | int result; | |
230 | if (ctx==NULL) return 1; | |
231 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); | |
232 | result = POOL_resize_internal(ctx, numThreads); | |
233 | ZSTD_pthread_cond_broadcast(&ctx->queuePopCond); | |
234 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
235 | return result; | |
11fdf7f2 TL |
236 | } |
237 | ||
238 | /** | |
239 | * Returns 1 if the queue is full and 0 otherwise. | |
240 | * | |
9f95a23c TL |
241 | * When queueSize is 1 (pool was created with an intended queueSize of 0), |
242 | * then a queue is empty if there is a thread free _and_ no job is waiting. | |
11fdf7f2 TL |
243 | */ |
244 | static int isQueueFull(POOL_ctx const* ctx) { | |
245 | if (ctx->queueSize > 1) { | |
246 | return ctx->queueHead == ((ctx->queueTail + 1) % ctx->queueSize); | |
247 | } else { | |
9f95a23c | 248 | return (ctx->numThreadsBusy == ctx->threadLimit) || |
11fdf7f2 TL |
249 | !ctx->queueEmpty; |
250 | } | |
251 | } | |
252 | ||
11fdf7f2 | 253 | |
9f95a23c TL |
254 | static void POOL_add_internal(POOL_ctx* ctx, POOL_function function, void *opaque) |
255 | { | |
256 | POOL_job const job = {function, opaque}; | |
257 | assert(ctx != NULL); | |
258 | if (ctx->shutdown) return; | |
259 | ||
260 | ctx->queueEmpty = 0; | |
261 | ctx->queue[ctx->queueTail] = job; | |
262 | ctx->queueTail = (ctx->queueTail + 1) % ctx->queueSize; | |
263 | ZSTD_pthread_cond_signal(&ctx->queuePopCond); | |
264 | } | |
265 | ||
266 | void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) | |
267 | { | |
268 | assert(ctx != NULL); | |
11fdf7f2 | 269 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); |
9f95a23c TL |
270 | /* Wait until there is space in the queue for the new job */ |
271 | while (isQueueFull(ctx) && (!ctx->shutdown)) { | |
272 | ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); | |
273 | } | |
274 | POOL_add_internal(ctx, function, opaque); | |
275 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
276 | } | |
11fdf7f2 | 277 | |
9f95a23c TL |
278 | |
279 | int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) | |
280 | { | |
281 | assert(ctx != NULL); | |
282 | ZSTD_pthread_mutex_lock(&ctx->queueMutex); | |
283 | if (isQueueFull(ctx)) { | |
284 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); | |
285 | return 0; | |
11fdf7f2 | 286 | } |
9f95a23c | 287 | POOL_add_internal(ctx, function, opaque); |
11fdf7f2 | 288 | ZSTD_pthread_mutex_unlock(&ctx->queueMutex); |
9f95a23c | 289 | return 1; |
11fdf7f2 TL |
290 | } |
291 | ||
9f95a23c | 292 | |
11fdf7f2 | 293 | #else /* ZSTD_MULTITHREAD not defined */ |
9f95a23c TL |
294 | |
295 | /* ========================== */ | |
11fdf7f2 | 296 | /* No multi-threading support */ |
9f95a23c TL |
297 | /* ========================== */ |
298 | ||
11fdf7f2 | 299 | |
/* We don't need any data, but if it is empty, malloc() might return NULL.
 * (A struct with no members is also not valid ISO C.) */
struct POOL_ctx_s {
    int dummy;
};
/* Single placeholder context returned by every POOL_create*() call in this
 * single-threaded build; it is never allocated nor freed. */
static POOL_ctx g_ctx;
305 | ||
306 | POOL_ctx* POOL_create(size_t numThreads, size_t queueSize) { | |
307 | return POOL_create_advanced(numThreads, queueSize, ZSTD_defaultCMem); | |
308 | } | |
309 | ||
310 | POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, ZSTD_customMem customMem) { | |
311 | (void)numThreads; | |
312 | (void)queueSize; | |
313 | (void)customMem; | |
314 | return &g_ctx; | |
315 | } | |
316 | ||
317 | void POOL_free(POOL_ctx* ctx) { | |
318 | assert(!ctx || ctx == &g_ctx); | |
319 | (void)ctx; | |
320 | } | |
321 | ||
9f95a23c TL |
322 | int POOL_resize(POOL_ctx* ctx, size_t numThreads) { |
323 | (void)ctx; (void)numThreads; | |
324 | return 0; | |
325 | } | |
326 | ||
327 | void POOL_add(POOL_ctx* ctx, POOL_function function, void* opaque) { | |
328 | (void)ctx; | |
329 | function(opaque); | |
330 | } | |
331 | ||
332 | int POOL_tryAdd(POOL_ctx* ctx, POOL_function function, void* opaque) { | |
11fdf7f2 TL |
333 | (void)ctx; |
334 | function(opaque); | |
9f95a23c | 335 | return 1; |
11fdf7f2 TL |
336 | } |
337 | ||
338 | size_t POOL_sizeof(POOL_ctx* ctx) { | |
339 | if (ctx==NULL) return 0; /* supports sizeof NULL */ | |
340 | assert(ctx == &g_ctx); | |
341 | return sizeof(*ctx); | |
342 | } | |
343 | ||
344 | #endif /* ZSTD_MULTITHREAD */ |