2 * Copyright (c) 2016-present, Yann Collet, Facebook, Inc.
5 * This source code is licensed under both the BSD-style license (found in the
6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
7 * in the COPYING file in the root directory of this source tree).
8 * You may select, at your option, one of the above-listed licenses.
13 #include "threading.h"
19 #define ASSERT_TRUE(p) \
25 #define ASSERT_FALSE(p) ASSERT_TRUE(!(p))
26 #define ASSERT_EQ(lhs, rhs) ASSERT_TRUE((lhs) == (rhs))
29 ZSTD_pthread_mutex_t mutex
;
34 static void fn(void *opaque
)
36 struct data
*data
= (struct data
*)opaque
;
37 ZSTD_pthread_mutex_lock(&data
->mutex
);
38 data
->data
[data
->i
] = (unsigned)(data
->i
);
40 ZSTD_pthread_mutex_unlock(&data
->mutex
);
43 static int testOrder(size_t numThreads
, size_t queueSize
)
46 POOL_ctx
* const ctx
= POOL_create(numThreads
, queueSize
);
49 (void)ZSTD_pthread_mutex_init(&data
.mutex
, NULL
);
51 for (i
= 0; i
< 16; ++i
) {
52 POOL_add(ctx
, &fn
, &data
);
56 ASSERT_EQ(16, data
.i
);
58 for (i
= 0; i
< data
.i
; ++i
) {
59 ASSERT_EQ(i
, data
.data
[i
]);
62 ZSTD_pthread_mutex_destroy(&data
.mutex
);
67 /* --- test deadlocks --- */
69 static void waitFn(void *opaque
) {
74 /* Tests for deadlock */
75 static int testWait(size_t numThreads
, size_t queueSize
) {
77 POOL_ctx
* const ctx
= POOL_create(numThreads
, queueSize
);
80 for (i
= 0; i
< 16; ++i
) {
81 POOL_add(ctx
, &waitFn
, &data
);
89 /* --- test POOL_resize() --- */
92 ZSTD_pthread_mutex_t mut
;
95 ZSTD_pthread_cond_t cond
;
98 static void waitLongFn(void *opaque
) {
99 poolTest_t
* const test
= (poolTest_t
*) opaque
;
101 ZSTD_pthread_mutex_lock(&test
->mut
);
102 test
->val
= test
->val
+ 1;
103 if (test
->val
== test
->max
)
104 ZSTD_pthread_cond_signal(&test
->cond
);
105 ZSTD_pthread_mutex_unlock(&test
->mut
);
108 static int testThreadReduction_internal(POOL_ctx
* ctx
, poolTest_t test
)
110 int const nbWaits
= 16;
111 UTIL_time_t startTime
;
112 U64 time4threads
, time2threads
;
117 startTime
= UTIL_getTime();
119 for (i
=0; i
<nbWaits
; i
++)
120 POOL_add(ctx
, &waitLongFn
, &test
);
122 ZSTD_pthread_mutex_lock(&test
.mut
);
123 ZSTD_pthread_cond_wait(&test
.cond
, &test
.mut
);
124 ASSERT_EQ(test
.val
, nbWaits
);
125 ZSTD_pthread_mutex_unlock(&test
.mut
);
126 time4threads
= UTIL_clockSpanNano(startTime
);
128 ASSERT_EQ( POOL_resize(ctx
, 2/*nbThreads*/) , 0 );
130 startTime
= UTIL_getTime();
132 for (i
=0; i
<nbWaits
; i
++)
133 POOL_add(ctx
, &waitLongFn
, &test
);
135 ZSTD_pthread_mutex_lock(&test
.mut
);
136 ZSTD_pthread_cond_wait(&test
.cond
, &test
.mut
);
137 ASSERT_EQ(test
.val
, nbWaits
);
138 ZSTD_pthread_mutex_unlock(&test
.mut
);
139 time2threads
= UTIL_clockSpanNano(startTime
);
141 if (time4threads
>= time2threads
) return 1; /* check 4 threads were effectively faster than 2 */
145 static int testThreadReduction(void) {
148 POOL_ctx
* const ctx
= POOL_create(4 /*nbThreads*/, 2 /*queueSize*/);
152 memset(&test
, 0, sizeof(test
));
153 ASSERT_FALSE( ZSTD_pthread_mutex_init(&test
.mut
, NULL
) );
154 ASSERT_FALSE( ZSTD_pthread_cond_init(&test
.cond
, NULL
) );
156 result
= testThreadReduction_internal(ctx
, test
);
158 ZSTD_pthread_mutex_destroy(&test
.mut
);
159 ZSTD_pthread_cond_destroy(&test
.cond
);
166 /* --- test abrupt ending --- */
169 ZSTD_pthread_mutex_t mut
;
173 static void waitIncFn(void *opaque
) {
174 abruptEndCanary_t
* test
= (abruptEndCanary_t
*) opaque
;
176 ZSTD_pthread_mutex_lock(&test
->mut
);
177 test
->val
= test
->val
+ 1;
178 ZSTD_pthread_mutex_unlock(&test
->mut
);
181 static int testAbruptEnding_internal(abruptEndCanary_t test
)
183 int const nbWaits
= 16;
185 POOL_ctx
* const ctx
= POOL_create(3 /*numThreads*/, nbWaits
/*queueSize*/);
190 for (i
=0; i
<nbWaits
; i
++)
191 POOL_add(ctx
, &waitIncFn
, &test
); /* all jobs pushed into queue */
193 ASSERT_EQ( POOL_resize(ctx
, 1 /*numThreads*/) , 0 ); /* downsize numThreads, to try to break end condition */
195 POOL_free(ctx
); /* must finish all jobs in queue before giving back control */
196 ASSERT_EQ(test
.val
, nbWaits
);
200 static int testAbruptEnding(void) {
202 abruptEndCanary_t test
;
204 memset(&test
, 0, sizeof(test
));
205 ASSERT_FALSE( ZSTD_pthread_mutex_init(&test
.mut
, NULL
) );
207 result
= testAbruptEnding_internal(test
);
209 ZSTD_pthread_mutex_destroy(&test
.mut
);
215 /* --- test launcher --- */
217 int main(int argc
, const char **argv
) {
222 if (POOL_create(0, 1)) { /* should not be possible */
223 printf("FAIL: should not create POOL with 0 threads\n");
227 for (numThreads
= 1; numThreads
<= 4; ++numThreads
) {
229 for (queueSize
= 0; queueSize
<= 2; ++queueSize
) {
230 printf("queueSize==%u, numThreads=%u \n",
231 (unsigned)queueSize
, (unsigned)numThreads
);
232 if (testOrder(numThreads
, queueSize
)) {
233 printf("FAIL: testOrder\n");
236 printf("SUCCESS: testOrder\n");
237 if (testWait(numThreads
, queueSize
)) {
238 printf("FAIL: testWait\n");
241 printf("SUCCESS: testWait\n");
245 if (testThreadReduction()) {
246 printf("FAIL: thread reduction not effective \n");
249 printf("SUCCESS: thread reduction effective (slower execution) \n");
252 if (testAbruptEnding()) {
253 printf("FAIL: jobs in queue not completed on early end \n");
256 printf("SUCCESS: all jobs in queue completed on early end \n");
259 printf("PASS: all POOL tests\n");