]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | /* |
2 | * Licensed to the Apache Software Foundation (ASF) under one | |
3 | * or more contributor license agreements. See the NOTICE file | |
4 | * distributed with this work for additional information | |
5 | * regarding copyright ownership. The ASF licenses this file | |
6 | * to you under the Apache License, Version 2.0 (the | |
7 | * "License"); you may not use this file except in compliance | |
8 | * with the License. You may obtain a copy of the License at | |
9 | * | |
10 | * http://www.apache.org/licenses/LICENSE-2.0 | |
11 | * | |
12 | * Unless required by applicable law or agreed to in writing, | |
13 | * software distributed under the License is distributed on an | |
14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
15 | * KIND, either express or implied. See the License for the | |
16 | * specific language governing permissions and limitations | |
17 | * under the License. | |
18 | */ | |
19 | ||
#include <thrift/thrift-config.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/concurrency/ThreadFactory.h>
#include <thrift/concurrency/Monitor.h>

#include <assert.h>
#include <stdint.h>
#include <chrono>
#include <deque>
#include <iostream>
#include <set>
#include <thread>
30 | ||
31 | namespace apache { | |
32 | namespace thrift { | |
33 | namespace concurrency { | |
34 | namespace test { | |
35 | ||
36 | using namespace apache::thrift::concurrency; | |
37 | ||
38 | static std::deque<std::shared_ptr<Runnable> > m_expired; | |
39 | static void expiredNotifier(std::shared_ptr<Runnable> runnable) | |
40 | { | |
41 | m_expired.push_back(runnable); | |
42 | } | |
43 | ||
/**
 * Block the calling thread for at least millisec milliseconds.
 *
 * The previous implementation performed a timed wait on a function-local
 * Monitor that nothing else could ever notify, i.e. it was only a sleep. A
 * monitor wait may presumably return early on a spurious wakeup, and the
 * handler for unexpected exceptions relied on assert(0), which does nothing
 * when NDEBUG is defined. std::this_thread::sleep_for has neither problem.
 *
 * NOTE(review): Monitor::wait appears to treat a timeout of 0 as "wait
 * forever" - confirm against Monitor.h. Here a zero or negative duration
 * simply returns immediately.
 *
 * @param millisec time to sleep, in milliseconds
 */
static void sleep_(int64_t millisec) {
  if (millisec <= 0) {
    return;
  }
  std::this_thread::sleep_for(std::chrono::milliseconds(millisec));
}
56 | ||
57 | class ThreadManagerTests { | |
58 | ||
59 | public: | |
60 | class Task : public Runnable { | |
61 | ||
62 | public: | |
63 | Task(Monitor& monitor, size_t& count, int64_t timeout) | |
64 | : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {} | |
65 | ||
66 | void run() override { | |
67 | ||
68 | _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
69 | ||
70 | sleep_(_timeout); | |
71 | ||
72 | _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
73 | ||
74 | _done = true; | |
75 | ||
76 | { | |
77 | Synchronized s(_monitor); | |
78 | ||
79 | // std::cout << "Thread " << _count << " completed " << std::endl; | |
80 | ||
81 | _count--; | |
82 | if (_count % 10000 == 0) { | |
83 | _monitor.notify(); | |
84 | } | |
85 | } | |
86 | } | |
87 | ||
88 | Monitor& _monitor; | |
89 | size_t& _count; | |
90 | int64_t _timeout; | |
91 | int64_t _startTime; | |
92 | int64_t _endTime; | |
93 | bool _done; | |
94 | Monitor _sleep; | |
95 | }; | |
96 | ||
97 | /** | |
98 | * Dispatch count tasks, each of which blocks for timeout milliseconds then | |
99 | * completes. Verify that all tasks completed and that thread manager cleans | |
100 | * up properly on delete. | |
101 | */ | |
102 | bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) { | |
103 | ||
104 | Monitor monitor; | |
105 | ||
106 | size_t activeCount = count; | |
107 | ||
108 | shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount); | |
109 | ||
110 | shared_ptr<ThreadFactory> threadFactory | |
111 | = shared_ptr<ThreadFactory>(new ThreadFactory(false)); | |
112 | ||
113 | threadManager->threadFactory(threadFactory); | |
114 | ||
115 | threadManager->start(); | |
116 | ||
117 | std::set<shared_ptr<ThreadManagerTests::Task> > tasks; | |
118 | ||
119 | for (size_t ix = 0; ix < count; ix++) { | |
120 | ||
121 | tasks.insert(shared_ptr<ThreadManagerTests::Task>( | |
122 | new ThreadManagerTests::Task(monitor, activeCount, timeout))); | |
123 | } | |
124 | ||
125 | int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
126 | ||
127 | for (auto ix = tasks.begin(); | |
128 | ix != tasks.end(); | |
129 | ix++) { | |
130 | ||
131 | threadManager->add(*ix); | |
132 | } | |
133 | ||
134 | std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl; | |
135 | ||
136 | { | |
137 | Synchronized s(monitor); | |
138 | ||
139 | while (activeCount > 0) { | |
140 | std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl; | |
141 | monitor.wait(); | |
142 | } | |
143 | } | |
144 | ||
145 | int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
146 | ||
147 | int64_t firstTime = 9223372036854775807LL; | |
148 | int64_t lastTime = 0; | |
149 | ||
150 | double averageTime = 0; | |
151 | int64_t minTime = 9223372036854775807LL; | |
152 | int64_t maxTime = 0; | |
153 | ||
154 | for (auto ix = tasks.begin(); | |
155 | ix != tasks.end(); | |
156 | ix++) { | |
157 | ||
158 | shared_ptr<ThreadManagerTests::Task> task = *ix; | |
159 | ||
160 | int64_t delta = task->_endTime - task->_startTime; | |
161 | ||
162 | assert(delta > 0); | |
163 | ||
164 | if (task->_startTime < firstTime) { | |
165 | firstTime = task->_startTime; | |
166 | } | |
167 | ||
168 | if (task->_endTime > lastTime) { | |
169 | lastTime = task->_endTime; | |
170 | } | |
171 | ||
172 | if (delta < minTime) { | |
173 | minTime = delta; | |
174 | } | |
175 | ||
176 | if (delta > maxTime) { | |
177 | maxTime = delta; | |
178 | } | |
179 | ||
180 | averageTime += delta; | |
181 | } | |
182 | ||
183 | averageTime /= count; | |
184 | ||
185 | std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime | |
186 | << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime | |
187 | << "ms" << std::endl; | |
188 | ||
189 | bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount; | |
190 | ||
191 | std::cout << "\t\t\t" << (success ? "Success" : "Failure") | |
192 | << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00 | |
193 | << "ms" << std::endl; | |
194 | ||
195 | return success; | |
196 | } | |
197 | ||
198 | class BlockTask : public Runnable { | |
199 | ||
200 | public: | |
201 | BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count) | |
202 | : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {} | |
203 | ||
204 | void run() override { | |
205 | { | |
206 | Synchronized s(_entryMonitor); | |
207 | _entered = true; | |
208 | _entryMonitor.notify(); | |
209 | } | |
210 | ||
211 | { | |
212 | Synchronized s(_blockMonitor); | |
213 | while (_blocked) { | |
214 | _blockMonitor.wait(); | |
215 | } | |
216 | } | |
217 | ||
218 | { | |
219 | Synchronized s(_doneMonitor); | |
220 | if (--_count == 0) { | |
221 | _doneMonitor.notify(); | |
222 | } | |
223 | } | |
224 | } | |
225 | ||
226 | Monitor& _entryMonitor; | |
227 | bool _entered; | |
228 | Monitor& _blockMonitor; | |
229 | bool& _blocked; | |
230 | Monitor& _doneMonitor; | |
231 | size_t& _count; | |
232 | }; | |
233 | ||
234 | /** | |
235 | * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the | |
236 | * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */ | |
237 | ||
238 | bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) { | |
239 | (void)timeout; | |
240 | bool success = false; | |
241 | ||
242 | try { | |
243 | ||
244 | Monitor entryMonitor; // not used by this test | |
245 | Monitor blockMonitor; | |
246 | bool blocked[] = {true, true, true}; | |
247 | Monitor doneMonitor; | |
248 | ||
249 | size_t pendingTaskMaxCount = workerCount; | |
250 | ||
251 | size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1}; | |
252 | ||
253 | shared_ptr<ThreadManager> threadManager | |
254 | = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount); | |
255 | ||
256 | shared_ptr<ThreadFactory> threadFactory | |
257 | = shared_ptr<ThreadFactory>(new ThreadFactory()); | |
258 | ||
259 | threadManager->threadFactory(threadFactory); | |
260 | ||
261 | threadManager->start(); | |
262 | ||
263 | std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks; | |
264 | tasks.reserve(workerCount + pendingTaskMaxCount); | |
265 | ||
266 | for (size_t ix = 0; ix < workerCount; ix++) { | |
267 | ||
268 | tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>( | |
269 | new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0]))); | |
270 | } | |
271 | ||
272 | for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) { | |
273 | ||
274 | tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>( | |
275 | new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1]))); | |
276 | } | |
277 | ||
278 | for (auto ix = tasks.begin(); | |
279 | ix != tasks.end(); | |
280 | ix++) { | |
281 | threadManager->add(*ix); | |
282 | } | |
283 | ||
284 | if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) { | |
285 | throw TException("Unexpected pending task count"); | |
286 | } | |
287 | ||
288 | shared_ptr<ThreadManagerTests::BlockTask> extraTask( | |
289 | new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2])); | |
290 | ||
291 | try { | |
292 | threadManager->add(extraTask, 1); | |
293 | throw TException("Unexpected success adding task in excess of pending task count"); | |
294 | } catch (TooManyPendingTasksException&) { | |
295 | throw TException("Should have timed out adding task in excess of pending task count"); | |
296 | } catch (TimedOutException&) { | |
297 | // Expected result | |
298 | } | |
299 | ||
300 | try { | |
301 | threadManager->add(extraTask, -1); | |
302 | throw TException("Unexpected success adding task in excess of pending task count"); | |
303 | } catch (TimedOutException&) { | |
304 | throw TException("Unexpected timeout adding task in excess of pending task count"); | |
305 | } catch (TooManyPendingTasksException&) { | |
306 | // Expected result | |
307 | } | |
308 | ||
309 | std::cout << "\t\t\t" | |
310 | << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; | |
311 | ||
312 | { | |
313 | Synchronized s(blockMonitor); | |
314 | blocked[0] = false; | |
315 | blockMonitor.notifyAll(); | |
316 | } | |
317 | ||
318 | { | |
319 | Synchronized s(doneMonitor); | |
320 | while (activeCounts[0] != 0) { | |
321 | doneMonitor.wait(); | |
322 | } | |
323 | } | |
324 | ||
325 | std::cout << "\t\t\t" | |
326 | << "Pending tasks " << threadManager->pendingTaskCount() << std::endl; | |
327 | ||
328 | try { | |
329 | threadManager->add(extraTask, 1); | |
330 | } catch (TimedOutException&) { | |
331 | std::cout << "\t\t\t" | |
332 | << "add timed out unexpectedly" << std::endl; | |
333 | throw TException("Unexpected timeout adding task"); | |
334 | ||
335 | } catch (TooManyPendingTasksException&) { | |
336 | std::cout << "\t\t\t" | |
337 | << "add encountered too many pending exepctions" << std::endl; | |
338 | throw TException("Unexpected timeout adding task"); | |
339 | } | |
340 | ||
341 | // Wake up tasks that were pending before and wait for them to complete | |
342 | ||
343 | { | |
344 | Synchronized s(blockMonitor); | |
345 | blocked[1] = false; | |
346 | blockMonitor.notifyAll(); | |
347 | } | |
348 | ||
349 | { | |
350 | Synchronized s(doneMonitor); | |
351 | while (activeCounts[1] != 0) { | |
352 | doneMonitor.wait(); | |
353 | } | |
354 | } | |
355 | ||
356 | // Wake up the extra task and wait for it to complete | |
357 | ||
358 | { | |
359 | Synchronized s(blockMonitor); | |
360 | blocked[2] = false; | |
361 | blockMonitor.notifyAll(); | |
362 | } | |
363 | ||
364 | { | |
365 | Synchronized s(doneMonitor); | |
366 | while (activeCounts[2] != 0) { | |
367 | doneMonitor.wait(); | |
368 | } | |
369 | } | |
370 | ||
371 | threadManager->stop(); | |
372 | ||
373 | if (!(success = (threadManager->totalTaskCount() == 0))) { | |
374 | throw TException("Unexpected total task count"); | |
375 | } | |
376 | ||
377 | } catch (TException& e) { | |
378 | std::cout << "ERROR: " << e.what() << std::endl; | |
379 | } | |
380 | ||
381 | std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl; | |
382 | return success; | |
383 | } | |
384 | ||
385 | ||
386 | bool apiTest() { | |
387 | ||
388 | // prove currentTime has milliseconds granularity since many other things depend on it | |
389 | int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
390 | sleep_(100); | |
391 | int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count(); | |
392 | if (b - a < 50 || b - a > 150) { | |
393 | std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl; | |
394 | return false; | |
395 | } | |
396 | ||
397 | return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory())); | |
398 | ||
399 | } | |
400 | ||
401 | bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory) | |
402 | { | |
403 | shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1); | |
404 | threadManager->threadFactory(threadFactory); | |
405 | ||
406 | std::cout << "\t\t\t\tstarting.. " << std::endl; | |
407 | ||
408 | threadManager->start(); | |
409 | threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this)); | |
410 | ||
411 | #define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } } | |
412 | ||
413 | EXPECT(threadManager->workerCount(), 1); | |
414 | EXPECT(threadManager->idleWorkerCount(), 1); | |
415 | EXPECT(threadManager->pendingTaskCount(), 0); | |
416 | ||
417 | std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl; | |
418 | ||
419 | threadManager->addWorker(); | |
420 | ||
421 | EXPECT(threadManager->workerCount(), 2); | |
422 | EXPECT(threadManager->idleWorkerCount(), 2); | |
423 | EXPECT(threadManager->pendingTaskCount(), 0); | |
424 | ||
425 | std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl; | |
426 | ||
427 | threadManager->removeWorker(); | |
428 | ||
429 | EXPECT(threadManager->workerCount(), 1); | |
430 | EXPECT(threadManager->idleWorkerCount(), 1); | |
431 | EXPECT(threadManager->pendingTaskCount(), 0); | |
432 | ||
433 | std::cout << "\t\t\t\tremove 1st worker.. " << std::endl; | |
434 | ||
435 | threadManager->removeWorker(); | |
436 | ||
437 | EXPECT(threadManager->workerCount(), 0); | |
438 | EXPECT(threadManager->idleWorkerCount(), 0); | |
439 | EXPECT(threadManager->pendingTaskCount(), 0); | |
440 | ||
441 | std::cout << "\t\t\t\tadd blocking task.. " << std::endl; | |
442 | ||
443 | // We're going to throw a blocking task into the mix | |
444 | Monitor entryMonitor; // signaled when task is running | |
445 | Monitor blockMonitor; // to be signaled to unblock the task | |
446 | bool blocked(true); // set to false before notifying | |
447 | Monitor doneMonitor; // signaled when count reaches zero | |
448 | size_t activeCount = 1; | |
449 | shared_ptr<ThreadManagerTests::BlockTask> blockingTask( | |
450 | new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount)); | |
451 | threadManager->add(blockingTask); | |
452 | ||
453 | EXPECT(threadManager->workerCount(), 0); | |
454 | EXPECT(threadManager->idleWorkerCount(), 0); | |
455 | EXPECT(threadManager->pendingTaskCount(), 1); | |
456 | ||
457 | std::cout << "\t\t\t\tadd other task.. " << std::endl; | |
458 | ||
459 | shared_ptr<ThreadManagerTests::Task> otherTask( | |
460 | new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); | |
461 | ||
462 | threadManager->add(otherTask); | |
463 | ||
464 | EXPECT(threadManager->workerCount(), 0); | |
465 | EXPECT(threadManager->idleWorkerCount(), 0); | |
466 | EXPECT(threadManager->pendingTaskCount(), 2); | |
467 | ||
468 | std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl; | |
469 | ||
470 | threadManager->remove(blockingTask); | |
471 | ||
472 | EXPECT(threadManager->workerCount(), 0); | |
473 | EXPECT(threadManager->idleWorkerCount(), 0); | |
474 | EXPECT(threadManager->pendingTaskCount(), 1); | |
475 | ||
476 | std::cout << "\t\t\t\tremove next pending task.." << std::endl; | |
477 | ||
478 | shared_ptr<Runnable> nextTask = threadManager->removeNextPending(); | |
479 | if (nextTask != otherTask) { | |
480 | std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl; | |
481 | return false; | |
482 | } | |
483 | ||
484 | EXPECT(threadManager->workerCount(), 0); | |
485 | EXPECT(threadManager->idleWorkerCount(), 0); | |
486 | EXPECT(threadManager->pendingTaskCount(), 0); | |
487 | ||
488 | std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl; | |
489 | ||
490 | nextTask = threadManager->removeNextPending(); | |
491 | if (nextTask) { | |
492 | std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl; | |
493 | return false; | |
494 | } | |
495 | ||
496 | std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl; | |
497 | ||
498 | shared_ptr<ThreadManagerTests::Task> expiredTask( | |
499 | new ThreadManagerTests::Task(doneMonitor, activeCount, 0)); | |
500 | ||
501 | threadManager->add(expiredTask, 0, 1); | |
502 | threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped | |
503 | threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both | |
504 | ||
505 | sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond | |
506 | ||
507 | EXPECT(threadManager->workerCount(), 0); | |
508 | EXPECT(threadManager->idleWorkerCount(), 0); | |
509 | EXPECT(threadManager->pendingTaskCount(), 3); | |
510 | EXPECT(threadManager->expiredTaskCount(), 0); | |
511 | ||
512 | std::cout << "\t\t\t\tremove expired tasks.." << std::endl; | |
513 | ||
514 | if (!m_expired.empty()) { | |
515 | std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl; | |
516 | return false; | |
517 | } | |
518 | ||
519 | threadManager->removeExpiredTasks(); | |
520 | ||
521 | if (m_expired.size() != 2) { | |
522 | std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; | |
523 | return false; | |
524 | } | |
525 | ||
526 | if (m_expired.front() != expiredTask) { | |
527 | std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl; | |
528 | return false; | |
529 | } | |
530 | m_expired.pop_front(); | |
531 | ||
532 | if (m_expired.front() != expiredTask) { | |
533 | std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl; | |
534 | return false; | |
535 | } | |
536 | ||
537 | m_expired.clear(); | |
538 | ||
539 | threadManager->remove(blockingTask); | |
540 | ||
541 | EXPECT(threadManager->workerCount(), 0); | |
542 | EXPECT(threadManager->idleWorkerCount(), 0); | |
543 | EXPECT(threadManager->pendingTaskCount(), 0); | |
544 | EXPECT(threadManager->expiredTaskCount(), 2); | |
545 | ||
546 | std::cout << "\t\t\t\tadd expired task (again).." << std::endl; | |
547 | ||
548 | threadManager->add(expiredTask, 0, 1); // expires in 1ms | |
549 | sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms | |
550 | ||
551 | std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl; | |
552 | ||
553 | threadManager->addWorker(); | |
554 | sleep_(100); // make sure it has time to spin up and expire the task | |
555 | ||
556 | if (m_expired.empty()) { | |
557 | std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl; | |
558 | return false; | |
559 | } | |
560 | ||
561 | if (m_expired.front() != expiredTask) { | |
562 | std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl; | |
563 | return false; | |
564 | } | |
565 | ||
566 | m_expired.clear(); | |
567 | ||
568 | EXPECT(threadManager->workerCount(), 1); | |
569 | EXPECT(threadManager->idleWorkerCount(), 1); | |
570 | EXPECT(threadManager->pendingTaskCount(), 0); | |
571 | EXPECT(threadManager->expiredTaskCount(), 3); | |
572 | ||
573 | std::cout << "\t\t\t\ttry to remove too many workers" << std::endl; | |
574 | try { | |
575 | threadManager->removeWorker(2); | |
576 | std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl; | |
577 | return false; | |
578 | } catch (const InvalidArgumentException&) { | |
579 | /* expected */ | |
580 | } | |
581 | ||
582 | std::cout << "\t\t\t\tremove worker.. " << std::endl; | |
583 | ||
584 | threadManager->removeWorker(); | |
585 | ||
586 | EXPECT(threadManager->workerCount(), 0); | |
587 | EXPECT(threadManager->idleWorkerCount(), 0); | |
588 | EXPECT(threadManager->pendingTaskCount(), 0); | |
589 | EXPECT(threadManager->expiredTaskCount(), 3); | |
590 | ||
591 | std::cout << "\t\t\t\tadd blocking task.. " << std::endl; | |
592 | ||
593 | threadManager->add(blockingTask); | |
594 | ||
595 | EXPECT(threadManager->workerCount(), 0); | |
596 | EXPECT(threadManager->idleWorkerCount(), 0); | |
597 | EXPECT(threadManager->pendingTaskCount(), 1); | |
598 | ||
599 | std::cout << "\t\t\t\tadd worker.. " << std::endl; | |
600 | ||
601 | threadManager->addWorker(); | |
602 | { | |
603 | Synchronized s(entryMonitor); | |
604 | while (!blockingTask->_entered) { | |
605 | entryMonitor.wait(); | |
606 | } | |
607 | } | |
608 | ||
609 | EXPECT(threadManager->workerCount(), 1); | |
610 | EXPECT(threadManager->idleWorkerCount(), 0); | |
611 | EXPECT(threadManager->pendingTaskCount(), 0); | |
612 | ||
613 | std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl; | |
614 | ||
615 | { | |
616 | Synchronized s(blockMonitor); | |
617 | blocked = false; | |
618 | blockMonitor.notifyAll(); | |
619 | } | |
620 | threadManager->removeWorker(); | |
621 | ||
622 | EXPECT(threadManager->workerCount(), 0); | |
623 | EXPECT(threadManager->idleWorkerCount(), 0); | |
624 | EXPECT(threadManager->pendingTaskCount(), 0); | |
625 | ||
626 | std::cout << "\t\t\t\tcleanup.. " << std::endl; | |
627 | ||
628 | blockingTask.reset(); | |
629 | threadManager.reset(); | |
630 | return true; | |
631 | } | |
632 | }; | |
633 | ||
634 | } | |
635 | } | |
636 | } | |
637 | } // apache::thrift::concurrency | |
638 | ||
639 | using namespace apache::thrift::concurrency::test; |