]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/cpp/test/concurrency/ThreadManagerTests.h
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / test / concurrency / ThreadManagerTests.h
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <thrift/thrift-config.h>
21#include <thrift/concurrency/ThreadManager.h>
22#include <thrift/concurrency/ThreadFactory.h>
23#include <thrift/concurrency/Monitor.h>
24
25#include <assert.h>
26#include <deque>
27#include <set>
28#include <iostream>
29#include <stdint.h>
30
31namespace apache {
32namespace thrift {
33namespace concurrency {
34namespace test {
35
36using namespace apache::thrift::concurrency;
37
38static std::deque<std::shared_ptr<Runnable> > m_expired;
39static void expiredNotifier(std::shared_ptr<Runnable> runnable)
40{
41 m_expired.push_back(runnable);
42}
43
44static void sleep_(int64_t millisec) {
45 Monitor _sleep;
46 Synchronized s(_sleep);
47
48 try {
49 _sleep.wait(millisec);
50 } catch (TimedOutException&) {
51 ;
52 } catch (...) {
53 assert(0);
54 }
55}
56
57class ThreadManagerTests {
58
59public:
60 class Task : public Runnable {
61
62 public:
63 Task(Monitor& monitor, size_t& count, int64_t timeout)
64 : _monitor(monitor), _count(count), _timeout(timeout), _startTime(0), _endTime(0), _done(false) {}
65
66 void run() override {
67
68 _startTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
69
70 sleep_(_timeout);
71
72 _endTime = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
73
74 _done = true;
75
76 {
77 Synchronized s(_monitor);
78
79 // std::cout << "Thread " << _count << " completed " << std::endl;
80
81 _count--;
82 if (_count % 10000 == 0) {
83 _monitor.notify();
84 }
85 }
86 }
87
88 Monitor& _monitor;
89 size_t& _count;
90 int64_t _timeout;
91 int64_t _startTime;
92 int64_t _endTime;
93 bool _done;
94 Monitor _sleep;
95 };
96
97 /**
98 * Dispatch count tasks, each of which blocks for timeout milliseconds then
99 * completes. Verify that all tasks completed and that thread manager cleans
100 * up properly on delete.
101 */
102 bool loadTest(size_t count = 100, int64_t timeout = 100LL, size_t workerCount = 4) {
103
104 Monitor monitor;
105
106 size_t activeCount = count;
107
108 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
109
110 shared_ptr<ThreadFactory> threadFactory
111 = shared_ptr<ThreadFactory>(new ThreadFactory(false));
112
113 threadManager->threadFactory(threadFactory);
114
115 threadManager->start();
116
117 std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
118
119 for (size_t ix = 0; ix < count; ix++) {
120
121 tasks.insert(shared_ptr<ThreadManagerTests::Task>(
122 new ThreadManagerTests::Task(monitor, activeCount, timeout)));
123 }
124
125 int64_t time00 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
126
127 for (auto ix = tasks.begin();
128 ix != tasks.end();
129 ix++) {
130
131 threadManager->add(*ix);
132 }
133
134 std::cout << "\t\t\t\tloaded " << count << " tasks to execute" << std::endl;
135
136 {
137 Synchronized s(monitor);
138
139 while (activeCount > 0) {
140 std::cout << "\t\t\t\tactiveCount = " << activeCount << std::endl;
141 monitor.wait();
142 }
143 }
144
145 int64_t time01 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
146
147 int64_t firstTime = 9223372036854775807LL;
148 int64_t lastTime = 0;
149
150 double averageTime = 0;
151 int64_t minTime = 9223372036854775807LL;
152 int64_t maxTime = 0;
153
154 for (auto ix = tasks.begin();
155 ix != tasks.end();
156 ix++) {
157
158 shared_ptr<ThreadManagerTests::Task> task = *ix;
159
160 int64_t delta = task->_endTime - task->_startTime;
161
162 assert(delta > 0);
163
164 if (task->_startTime < firstTime) {
165 firstTime = task->_startTime;
166 }
167
168 if (task->_endTime > lastTime) {
169 lastTime = task->_endTime;
170 }
171
172 if (delta < minTime) {
173 minTime = delta;
174 }
175
176 if (delta > maxTime) {
177 maxTime = delta;
178 }
179
180 averageTime += delta;
181 }
182
183 averageTime /= count;
184
185 std::cout << "\t\t\tfirst start: " << firstTime << " Last end: " << lastTime
186 << " min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime
187 << "ms" << std::endl;
188
189 bool success = (time01 - time00) >= ((int64_t)count * timeout) / (int64_t)workerCount;
190
191 std::cout << "\t\t\t" << (success ? "Success" : "Failure")
192 << "! expected time: " << ((int64_t)count * timeout) / (int64_t)workerCount << "ms elapsed time: " << time01 - time00
193 << "ms" << std::endl;
194
195 return success;
196 }
197
198 class BlockTask : public Runnable {
199
200 public:
201 BlockTask(Monitor& entryMonitor, Monitor& blockMonitor, bool& blocked, Monitor& doneMonitor, size_t& count)
202 : _entryMonitor(entryMonitor), _entered(false), _blockMonitor(blockMonitor), _blocked(blocked), _doneMonitor(doneMonitor), _count(count) {}
203
204 void run() override {
205 {
206 Synchronized s(_entryMonitor);
207 _entered = true;
208 _entryMonitor.notify();
209 }
210
211 {
212 Synchronized s(_blockMonitor);
213 while (_blocked) {
214 _blockMonitor.wait();
215 }
216 }
217
218 {
219 Synchronized s(_doneMonitor);
220 if (--_count == 0) {
221 _doneMonitor.notify();
222 }
223 }
224 }
225
226 Monitor& _entryMonitor;
227 bool _entered;
228 Monitor& _blockMonitor;
229 bool& _blocked;
230 Monitor& _doneMonitor;
231 size_t& _count;
232 };
233
234 /**
235 * Block test. Create pendingTaskCountMax tasks. Verify that we block adding the
236 * pendingTaskCountMax + 1th task. Verify that we unblock when a task completes */
237
238 bool blockTest(int64_t timeout = 100LL, size_t workerCount = 2) {
239 (void)timeout;
240 bool success = false;
241
242 try {
243
244 Monitor entryMonitor; // not used by this test
245 Monitor blockMonitor;
246 bool blocked[] = {true, true, true};
247 Monitor doneMonitor;
248
249 size_t pendingTaskMaxCount = workerCount;
250
251 size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
252
253 shared_ptr<ThreadManager> threadManager
254 = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
255
256 shared_ptr<ThreadFactory> threadFactory
257 = shared_ptr<ThreadFactory>(new ThreadFactory());
258
259 threadManager->threadFactory(threadFactory);
260
261 threadManager->start();
262
263 std::vector<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
264 tasks.reserve(workerCount + pendingTaskMaxCount);
265
266 for (size_t ix = 0; ix < workerCount; ix++) {
267
268 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
269 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[0], doneMonitor, activeCounts[0])));
270 }
271
272 for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
273
274 tasks.push_back(shared_ptr<ThreadManagerTests::BlockTask>(
275 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[1], doneMonitor, activeCounts[1])));
276 }
277
278 for (auto ix = tasks.begin();
279 ix != tasks.end();
280 ix++) {
281 threadManager->add(*ix);
282 }
283
284 if (!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
285 throw TException("Unexpected pending task count");
286 }
287
288 shared_ptr<ThreadManagerTests::BlockTask> extraTask(
289 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked[2], doneMonitor, activeCounts[2]));
290
291 try {
292 threadManager->add(extraTask, 1);
293 throw TException("Unexpected success adding task in excess of pending task count");
294 } catch (TooManyPendingTasksException&) {
295 throw TException("Should have timed out adding task in excess of pending task count");
296 } catch (TimedOutException&) {
297 // Expected result
298 }
299
300 try {
301 threadManager->add(extraTask, -1);
302 throw TException("Unexpected success adding task in excess of pending task count");
303 } catch (TimedOutException&) {
304 throw TException("Unexpected timeout adding task in excess of pending task count");
305 } catch (TooManyPendingTasksException&) {
306 // Expected result
307 }
308
309 std::cout << "\t\t\t"
310 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
311
312 {
313 Synchronized s(blockMonitor);
314 blocked[0] = false;
315 blockMonitor.notifyAll();
316 }
317
318 {
319 Synchronized s(doneMonitor);
320 while (activeCounts[0] != 0) {
321 doneMonitor.wait();
322 }
323 }
324
325 std::cout << "\t\t\t"
326 << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
327
328 try {
329 threadManager->add(extraTask, 1);
330 } catch (TimedOutException&) {
331 std::cout << "\t\t\t"
332 << "add timed out unexpectedly" << std::endl;
333 throw TException("Unexpected timeout adding task");
334
335 } catch (TooManyPendingTasksException&) {
336 std::cout << "\t\t\t"
337 << "add encountered too many pending exepctions" << std::endl;
338 throw TException("Unexpected timeout adding task");
339 }
340
341 // Wake up tasks that were pending before and wait for them to complete
342
343 {
344 Synchronized s(blockMonitor);
345 blocked[1] = false;
346 blockMonitor.notifyAll();
347 }
348
349 {
350 Synchronized s(doneMonitor);
351 while (activeCounts[1] != 0) {
352 doneMonitor.wait();
353 }
354 }
355
356 // Wake up the extra task and wait for it to complete
357
358 {
359 Synchronized s(blockMonitor);
360 blocked[2] = false;
361 blockMonitor.notifyAll();
362 }
363
364 {
365 Synchronized s(doneMonitor);
366 while (activeCounts[2] != 0) {
367 doneMonitor.wait();
368 }
369 }
370
371 threadManager->stop();
372
373 if (!(success = (threadManager->totalTaskCount() == 0))) {
374 throw TException("Unexpected total task count");
375 }
376
377 } catch (TException& e) {
378 std::cout << "ERROR: " << e.what() << std::endl;
379 }
380
381 std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
382 return success;
383 }
384
385
386 bool apiTest() {
387
388 // prove currentTime has milliseconds granularity since many other things depend on it
389 int64_t a = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
390 sleep_(100);
391 int64_t b = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count();
392 if (b - a < 50 || b - a > 150) {
393 std::cerr << "\t\t\texpected 100ms gap, found " << (b-a) << "ms gap instead." << std::endl;
394 return false;
395 }
396
397 return apiTestWithThreadFactory(shared_ptr<ThreadFactory>(new ThreadFactory()));
398
399 }
400
401 bool apiTestWithThreadFactory(shared_ptr<ThreadFactory> threadFactory)
402 {
403 shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(1);
404 threadManager->threadFactory(threadFactory);
405
406 std::cout << "\t\t\t\tstarting.. " << std::endl;
407
408 threadManager->start();
409 threadManager->setExpireCallback(expiredNotifier); // std::bind(&ThreadManagerTests::expiredNotifier, this));
410
411#define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
412
413 EXPECT(threadManager->workerCount(), 1);
414 EXPECT(threadManager->idleWorkerCount(), 1);
415 EXPECT(threadManager->pendingTaskCount(), 0);
416
417 std::cout << "\t\t\t\tadd 2nd worker.. " << std::endl;
418
419 threadManager->addWorker();
420
421 EXPECT(threadManager->workerCount(), 2);
422 EXPECT(threadManager->idleWorkerCount(), 2);
423 EXPECT(threadManager->pendingTaskCount(), 0);
424
425 std::cout << "\t\t\t\tremove 2nd worker.. " << std::endl;
426
427 threadManager->removeWorker();
428
429 EXPECT(threadManager->workerCount(), 1);
430 EXPECT(threadManager->idleWorkerCount(), 1);
431 EXPECT(threadManager->pendingTaskCount(), 0);
432
433 std::cout << "\t\t\t\tremove 1st worker.. " << std::endl;
434
435 threadManager->removeWorker();
436
437 EXPECT(threadManager->workerCount(), 0);
438 EXPECT(threadManager->idleWorkerCount(), 0);
439 EXPECT(threadManager->pendingTaskCount(), 0);
440
441 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
442
443 // We're going to throw a blocking task into the mix
444 Monitor entryMonitor; // signaled when task is running
445 Monitor blockMonitor; // to be signaled to unblock the task
446 bool blocked(true); // set to false before notifying
447 Monitor doneMonitor; // signaled when count reaches zero
448 size_t activeCount = 1;
449 shared_ptr<ThreadManagerTests::BlockTask> blockingTask(
450 new ThreadManagerTests::BlockTask(entryMonitor, blockMonitor, blocked, doneMonitor, activeCount));
451 threadManager->add(blockingTask);
452
453 EXPECT(threadManager->workerCount(), 0);
454 EXPECT(threadManager->idleWorkerCount(), 0);
455 EXPECT(threadManager->pendingTaskCount(), 1);
456
457 std::cout << "\t\t\t\tadd other task.. " << std::endl;
458
459 shared_ptr<ThreadManagerTests::Task> otherTask(
460 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
461
462 threadManager->add(otherTask);
463
464 EXPECT(threadManager->workerCount(), 0);
465 EXPECT(threadManager->idleWorkerCount(), 0);
466 EXPECT(threadManager->pendingTaskCount(), 2);
467
468 std::cout << "\t\t\t\tremove blocking task specifically.. " << std::endl;
469
470 threadManager->remove(blockingTask);
471
472 EXPECT(threadManager->workerCount(), 0);
473 EXPECT(threadManager->idleWorkerCount(), 0);
474 EXPECT(threadManager->pendingTaskCount(), 1);
475
476 std::cout << "\t\t\t\tremove next pending task.." << std::endl;
477
478 shared_ptr<Runnable> nextTask = threadManager->removeNextPending();
479 if (nextTask != otherTask) {
480 std::cerr << "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl;
481 return false;
482 }
483
484 EXPECT(threadManager->workerCount(), 0);
485 EXPECT(threadManager->idleWorkerCount(), 0);
486 EXPECT(threadManager->pendingTaskCount(), 0);
487
488 std::cout << "\t\t\t\tremove next pending task (none left).." << std::endl;
489
490 nextTask = threadManager->removeNextPending();
491 if (nextTask) {
492 std::cerr << "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl;
493 return false;
494 }
495
496 std::cout << "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl;
497
498 shared_ptr<ThreadManagerTests::Task> expiredTask(
499 new ThreadManagerTests::Task(doneMonitor, activeCount, 0));
500
501 threadManager->add(expiredTask, 0, 1);
502 threadManager->add(blockingTask); // add one that hasn't expired to make sure it gets skipped
503 threadManager->add(expiredTask, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
504
505 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
506
507 EXPECT(threadManager->workerCount(), 0);
508 EXPECT(threadManager->idleWorkerCount(), 0);
509 EXPECT(threadManager->pendingTaskCount(), 3);
510 EXPECT(threadManager->expiredTaskCount(), 0);
511
512 std::cout << "\t\t\t\tremove expired tasks.." << std::endl;
513
514 if (!m_expired.empty()) {
515 std::cerr << "\t\t\t\t\texpected m_expired to be empty" << std::endl;
516 return false;
517 }
518
519 threadManager->removeExpiredTasks();
520
521 if (m_expired.size() != 2) {
522 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
523 return false;
524 }
525
526 if (m_expired.front() != expiredTask) {
527 std::cerr << "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl;
528 return false;
529 }
530 m_expired.pop_front();
531
532 if (m_expired.front() != expiredTask) {
533 std::cerr << "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl;
534 return false;
535 }
536
537 m_expired.clear();
538
539 threadManager->remove(blockingTask);
540
541 EXPECT(threadManager->workerCount(), 0);
542 EXPECT(threadManager->idleWorkerCount(), 0);
543 EXPECT(threadManager->pendingTaskCount(), 0);
544 EXPECT(threadManager->expiredTaskCount(), 2);
545
546 std::cout << "\t\t\t\tadd expired task (again).." << std::endl;
547
548 threadManager->add(expiredTask, 0, 1); // expires in 1ms
549 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
550
551 std::cout << "\t\t\t\tadd worker to consume expired task.." << std::endl;
552
553 threadManager->addWorker();
554 sleep_(100); // make sure it has time to spin up and expire the task
555
556 if (m_expired.empty()) {
557 std::cerr << "\t\t\t\t\texpected m_expired to be set" << std::endl;
558 return false;
559 }
560
561 if (m_expired.front() != expiredTask) {
562 std::cerr << "\t\t\t\t\texpected m_expired to be the expired task" << std::endl;
563 return false;
564 }
565
566 m_expired.clear();
567
568 EXPECT(threadManager->workerCount(), 1);
569 EXPECT(threadManager->idleWorkerCount(), 1);
570 EXPECT(threadManager->pendingTaskCount(), 0);
571 EXPECT(threadManager->expiredTaskCount(), 3);
572
573 std::cout << "\t\t\t\ttry to remove too many workers" << std::endl;
574 try {
575 threadManager->removeWorker(2);
576 std::cerr << "\t\t\t\t\texpected InvalidArgumentException" << std::endl;
577 return false;
578 } catch (const InvalidArgumentException&) {
579 /* expected */
580 }
581
582 std::cout << "\t\t\t\tremove worker.. " << std::endl;
583
584 threadManager->removeWorker();
585
586 EXPECT(threadManager->workerCount(), 0);
587 EXPECT(threadManager->idleWorkerCount(), 0);
588 EXPECT(threadManager->pendingTaskCount(), 0);
589 EXPECT(threadManager->expiredTaskCount(), 3);
590
591 std::cout << "\t\t\t\tadd blocking task.. " << std::endl;
592
593 threadManager->add(blockingTask);
594
595 EXPECT(threadManager->workerCount(), 0);
596 EXPECT(threadManager->idleWorkerCount(), 0);
597 EXPECT(threadManager->pendingTaskCount(), 1);
598
599 std::cout << "\t\t\t\tadd worker.. " << std::endl;
600
601 threadManager->addWorker();
602 {
603 Synchronized s(entryMonitor);
604 while (!blockingTask->_entered) {
605 entryMonitor.wait();
606 }
607 }
608
609 EXPECT(threadManager->workerCount(), 1);
610 EXPECT(threadManager->idleWorkerCount(), 0);
611 EXPECT(threadManager->pendingTaskCount(), 0);
612
613 std::cout << "\t\t\t\tunblock task and remove worker.. " << std::endl;
614
615 {
616 Synchronized s(blockMonitor);
617 blocked = false;
618 blockMonitor.notifyAll();
619 }
620 threadManager->removeWorker();
621
622 EXPECT(threadManager->workerCount(), 0);
623 EXPECT(threadManager->idleWorkerCount(), 0);
624 EXPECT(threadManager->pendingTaskCount(), 0);
625
626 std::cout << "\t\t\t\tcleanup.. " << std::endl;
627
628 blockingTask.reset();
629 threadManager.reset();
630 return true;
631 }
632};
633
634}
635}
636}
637} // apache::thrift::concurrency
638
639using namespace apache::thrift::concurrency::test;