2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/thrift-config.h>
21 #include <thrift/concurrency/ThreadManager.h>
22 #include <thrift/concurrency/ThreadFactory.h>
23 #include <thrift/concurrency/Monitor.h>
33 namespace concurrency
{
// Bring the apache::thrift::concurrency names into scope for the code below.
36 using namespace apache::thrift::concurrency
;
// File-scope queue of runnables. expiredNotifier() appends to it, and
// apiTestWithThreadFactory() checks its size/front and pops entries from it.
38 static std::deque
<std::shared_ptr
<Runnable
> > m_expired
;
// Appends the given runnable to the back of m_expired.
// apiTestWithThreadFactory() passes this function to setExpireCallback() on
// its thread manager.
39 static void expiredNotifier(std::shared_ptr
<Runnable
> runnable
)
41 m_expired
.push_back(runnable
);
// Calls wait(millisec) on the monitor _sleep while a Synchronized guard on that
// monitor is in scope; a TimedOutException thrown by the wait is caught here.
// Callers in this file pass 50 and 100.
// NOTE(review): the declaration of _sleep and the handler body are not part of
// the visible lines; presumably the timeout is the normal way out -- confirm.
44 static void sleep_(int64_t millisec
) {
46 Synchronized
s(_sleep
);
// millisec is forwarded unchanged to the monitor's wait().
49 _sleep
.wait(millisec
);
50 } catch (TimedOutException
&) {
// Holds the helper task types (referred to elsewhere in this file as
// ThreadManagerTests::Task and ThreadManagerTests::BlockTask).
57 class ThreadManagerTests
{
// Runnable created by loadTest() and apiTestWithThreadFactory(). The visible
// statements stamp _startTime and _endTime from std::chrono::steady_clock
// (converted to milliseconds) and then take a Synchronized guard on _monitor.
// NOTE(review): the run() signature, the member declarations and the
// statements between these fragments are not part of the visible lines.
60 class Task
: public Runnable
{
// Takes the caller's monitor and counter by reference plus a timeout value;
// both timestamps start at 0 and _done starts as false.
63 Task(Monitor
& monitor
, size_t& count
, int64_t timeout
)
64 : _monitor(monitor
), _count(count
), _timeout(timeout
), _startTime(0), _endTime(0), _done(false) {}
// First timestamp: steady-clock time since epoch, in milliseconds.
68 _startTime
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
// Second timestamp, same clock and unit; loadTest() computes
// _endTime - _startTime for every task.
72 _endTime
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
// Guard on the monitor that was passed to the constructor.
77 Synchronized
s(_monitor
);
79 // std::cout << "Thread " << _count << " completed " << std::endl;
// Taken only when the counter is a multiple of 10000; the body of this branch
// is not part of the visible lines.
82 if (_count
% 10000 == 0) {
98 * Dispatch count tasks, each of which blocks for timeout milliseconds then
99 * completes. Verify that all tasks completed and that thread manager cleans
100 * up properly on delete.
// Load test. Creates `count` Task objects that share one monitor and one
// counter, hands all of them to a simple thread manager with `workerCount`
// workers, loops while the counter is above zero, then prints per-task timing
// statistics and a Success/Failure line.
//
// count       - number of tasks to create (default 100).
// timeout     - value given to every Task (default 100); the report below
//               labels the derived figures "ms", so presumably milliseconds.
// workerCount - worker count passed to newSimpleThreadManager (default 4).
//
// NOTE(review): the declarations of `monitor` and `maxTime`, the loop bodies
// and the return statement are not part of the visible lines.
102 bool loadTest(size_t count
= 100, int64_t timeout
= 100LL, size_t workerCount
= 4) {
// Starts at count; every Task receives it by reference, and the loop further
// down runs while it is still above zero.
106 size_t activeCount
= count
;
108 shared_ptr
<ThreadManager
> threadManager
= ThreadManager::newSimpleThreadManager(workerCount
);
// NOTE(review): the meaning of the `false` constructor argument is not shown
// in this file -- check the ThreadFactory declaration.
110 shared_ptr
<ThreadFactory
> threadFactory
111 = shared_ptr
<ThreadFactory
>(new ThreadFactory(false));
113 threadManager
->threadFactory(threadFactory
);
115 threadManager
->start();
117 std::set
<shared_ptr
<ThreadManagerTests::Task
> > tasks
;
// Build all tasks first; they are added to the manager in a second loop.
119 for (size_t ix
= 0; ix
< count
; ix
++) {
121 tasks
.insert(shared_ptr
<ThreadManagerTests::Task
>(
122 new ThreadManagerTests::Task(monitor
, activeCount
, timeout
)));
// Start of the measured interval (steady clock, milliseconds).
125 int64_t time00
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
127 for (auto ix
= tasks
.begin();
131 threadManager
->add(*ix
);
134 std::cout
<< "\t\t\t\tloaded " << count
<< " tasks to execute" << std::endl
;
// Guard on the monitor shared with the tasks while activeCount is polled.
137 Synchronized
s(monitor
);
139 while (activeCount
> 0) {
140 std::cout
<< "\t\t\t\tactiveCount = " << activeCount
<< std::endl
;
// End of the measured interval.
145 int64_t time01
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
// Running minima start at the largest int64_t value so that the first
// comparison below always replaces them.
147 int64_t firstTime
= 9223372036854775807LL;
148 int64_t lastTime
= 0;
150 double averageTime
= 0;
151 int64_t minTime
= 9223372036854775807LL;
// Single pass over the tasks: earliest start, latest end, and the
// min / max / sum of each task's (_endTime - _startTime).
154 for (auto ix
= tasks
.begin();
158 shared_ptr
<ThreadManagerTests::Task
> task
= *ix
;
160 int64_t delta
= task
->_endTime
- task
->_startTime
;
164 if (task
->_startTime
< firstTime
) {
165 firstTime
= task
->_startTime
;
168 if (task
->_endTime
> lastTime
) {
169 lastTime
= task
->_endTime
;
172 if (delta
< minTime
) {
176 if (delta
> maxTime
) {
180 averageTime
+= delta
;
// Sum -> mean. averageTime is a double, so this is floating-point division.
183 averageTime
/= count
;
185 std::cout
<< "\t\t\tfirst start: " << firstTime
<< " Last end: " << lastTime
186 << " min: " << minTime
<< "ms max: " << maxTime
<< "ms average: " << averageTime
187 << "ms" << std::endl
;
// Pass criterion: elapsed time is at least count * timeout / workerCount
// (integer arithmetic on int64_t).
189 bool success
= (time01
- time00
) >= ((int64_t)count
* timeout
) / (int64_t)workerCount
;
191 std::cout
<< "\t\t\t" << (success
? "Success" : "Failure")
192 << "! expected time: " << ((int64_t)count
* timeout
) / (int64_t)workerCount
<< "ms elapsed time: " << time01
- time00
193 << "ms" << std::endl
;
// Runnable that works with three monitors supplied by its creator. In the
// visible part of run() it notifies the entry monitor, waits on the block
// monitor, and finally notifies the done monitor, each under a Synchronized
// guard on the monitor in question.
// NOTE(review): the statements between those steps (how _entered, _blocked and
// _count are updated) are not part of the visible lines.
198 class BlockTask
: public Runnable
{
// All monitors, the blocked flag and the counter are taken by reference;
// _entered starts as false.
201 BlockTask(Monitor
& entryMonitor
, Monitor
& blockMonitor
, bool& blocked
, Monitor
& doneMonitor
, size_t& count
)
202 : _entryMonitor(entryMonitor
), _entered(false), _blockMonitor(blockMonitor
), _blocked(blocked
), _doneMonitor(doneMonitor
), _count(count
) {}
204 void run() override
{
// Step 1: signal the entry monitor (apiTestWithThreadFactory() loops on
// blockingTask->_entered under this same monitor).
206 Synchronized
s(_entryMonitor
);
208 _entryMonitor
.notify();
// Step 2: wait on the block monitor; the tests release it with notifyAll().
212 Synchronized
s(_blockMonitor
);
214 _blockMonitor
.wait();
// Step 3: signal the done monitor.
219 Synchronized
s(_doneMonitor
);
221 _doneMonitor
.notify();
// Monitors are held as references, so they must outlive the task.
226 Monitor
& _entryMonitor
;
228 Monitor
& _blockMonitor
;
230 Monitor
& _doneMonitor
;
235 * Block test. Create pendingTaskMaxCount tasks. Verify that we block adding the
236 * (pendingTaskMaxCount + 1)th task. Verify that we unblock when a task completes */
// Builds a thread manager with `workerCount` workers and a pending-task limit
// equal to workerCount, fills it with workerCount + pendingTaskMaxCount
// BlockTasks, and then checks what add() does with one extra task:
//   - add(extraTask, 1)  is expected to end in TimedOutException;
//   - add(extraTask, -1) is expected to end in TooManyPendingTasksException.
// The blocked tasks are then released group by group and the extra task is
// added again, this time expecting no exception. Any TException is printed as
// "ERROR: ..." and a Success/Failure line follows.
//
// NOTE(review): `timeout` is not referenced in the visible lines, and the
// declaration of doneMonitor, the `try` keywords, the wait calls inside the
// loops and the return statement are not part of this listing.
238 bool blockTest(int64_t timeout
= 100LL, size_t workerCount
= 2) {
240 bool success
= false;
244 Monitor entryMonitor
; // not used by this test
245 Monitor blockMonitor
;
// One flag per task group: [0] first batch, [1] second batch, [2] extra task.
246 bool blocked
[] = {true, true, true};
249 size_t pendingTaskMaxCount
= workerCount
;
// Counters per task group, indexed like blocked[] above.
251 size_t activeCounts
[] = {workerCount
, pendingTaskMaxCount
, 1};
253 shared_ptr
<ThreadManager
> threadManager
254 = ThreadManager::newSimpleThreadManager(workerCount
, pendingTaskMaxCount
);
256 shared_ptr
<ThreadFactory
> threadFactory
257 = shared_ptr
<ThreadFactory
>(new ThreadFactory());
259 threadManager
->threadFactory(threadFactory
);
261 threadManager
->start();
263 std::vector
<shared_ptr
<ThreadManagerTests::BlockTask
> > tasks
;
264 tasks
.reserve(workerCount
+ pendingTaskMaxCount
);
// First batch: workerCount tasks tied to blocked[0] / activeCounts[0].
266 for (size_t ix
= 0; ix
< workerCount
; ix
++) {
268 tasks
.push_back(shared_ptr
<ThreadManagerTests::BlockTask
>(
269 new ThreadManagerTests::BlockTask(entryMonitor
, blockMonitor
, blocked
[0], doneMonitor
, activeCounts
[0])));
// Second batch: pendingTaskMaxCount tasks tied to blocked[1] / activeCounts[1].
272 for (size_t ix
= 0; ix
< pendingTaskMaxCount
; ix
++) {
274 tasks
.push_back(shared_ptr
<ThreadManagerTests::BlockTask
>(
275 new ThreadManagerTests::BlockTask(entryMonitor
, blockMonitor
, blocked
[1], doneMonitor
, activeCounts
[1])));
278 for (auto ix
= tasks
.begin();
281 threadManager
->add(*ix
);
// Every task added so far must be accounted for by the manager.
284 if (!(success
= (threadManager
->totalTaskCount() == pendingTaskMaxCount
+ workerCount
))) {
285 throw TException("Unexpected pending task count");
// The one task beyond the limit, tied to blocked[2] / activeCounts[2].
288 shared_ptr
<ThreadManagerTests::BlockTask
> extraTask(
289 new ThreadManagerTests::BlockTask(entryMonitor
, blockMonitor
, blocked
[2], doneMonitor
, activeCounts
[2]));
// Attempt 1: second argument 1. Reaching the throw after add() means the add
// succeeded, which is a failure here; only TimedOutException is acceptable.
292 threadManager
->add(extraTask
, 1);
293 throw TException("Unexpected success adding task in excess of pending task count");
294 } catch (TooManyPendingTasksException
&) {
295 throw TException("Should have timed out adding task in excess of pending task count");
296 } catch (TimedOutException
&) {
// Attempt 2: second argument -1. Here TooManyPendingTasksException is the
// acceptable outcome and a timeout is a failure.
301 threadManager
->add(extraTask
, -1);
302 throw TException("Unexpected success adding task in excess of pending task count");
303 } catch (TimedOutException
&) {
304 throw TException("Unexpected timeout adding task in excess of pending task count");
305 } catch (TooManyPendingTasksException
&) {
309 std::cout
<< "\t\t\t"
310 << "Pending tasks " << threadManager
->pendingTaskCount() << std::endl
;
// Release tasks waiting on blockMonitor, then loop until the first group's
// counter reaches zero.
313 Synchronized
s(blockMonitor
);
315 blockMonitor
.notifyAll();
319 Synchronized
s(doneMonitor
);
320 while (activeCounts
[0] != 0) {
325 std::cout
<< "\t\t\t"
326 << "Pending tasks " << threadManager
->pendingTaskCount() << std::endl
;
// Attempt 3: the same add as attempt 1, now expected to succeed.
329 threadManager
->add(extraTask
, 1);
330 } catch (TimedOutException
&) {
331 std::cout
<< "\t\t\t"
332 << "add timed out unexpectedly" << std::endl
;
333 throw TException("Unexpected timeout adding task");
// NOTE(review): this handler is for TooManyPendingTasksException, yet the
// TException text below mentions a timeout -- looks copy-pasted from the
// handler above; confirm the intended message. The printed string also
// misspells "exceptions".
335 } catch (TooManyPendingTasksException
&) {
336 std::cout
<< "\t\t\t"
337 << "add encountered too many pending exepctions" << std::endl
;
338 throw TException("Unexpected timeout adding task");
341 // Wake up tasks that were pending before and wait for them to complete
344 Synchronized
s(blockMonitor
);
346 blockMonitor
.notifyAll();
350 Synchronized
s(doneMonitor
);
351 while (activeCounts
[1] != 0) {
356 // Wake up the extra task and wait for it to complete
359 Synchronized
s(blockMonitor
);
361 blockMonitor
.notifyAll();
365 Synchronized
s(doneMonitor
);
366 while (activeCounts
[2] != 0) {
371 threadManager
->stop();
// After stop() the manager must report no tasks at all.
373 if (!(success
= (threadManager
->totalTaskCount() == 0))) {
374 throw TException("Unexpected total task count");
// Every failure path above throws TException; it is reported here.
377 } catch (TException
& e
) {
378 std::cout
<< "ERROR: " << e
.what() << std::endl
;
381 std::cout
<< "\t\t\t" << (success
? "Success" : "Failure") << std::endl
;
// Fragment of a test whose signature line is not part of this listing.
// It takes two steady-clock readings in milliseconds, complains on std::cerr
// when their difference is outside the 50..150 range (the message states that
// a 100ms gap is expected), and then hands over to apiTestWithThreadFactory()
// with a default-constructed ThreadFactory.
// NOTE(review): the statement that produces the gap between the two readings
// and the failure path after the diagnostic are not visible here.
388 // prove currentTime has milliseconds granularity since many other things depend on it
389 int64_t a
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
391 int64_t b
= std::chrono::duration_cast
<std::chrono::milliseconds
>(std::chrono::steady_clock::now().time_since_epoch()).count();
// Accept 50ms of slack on either side of the expected 100ms.
392 if (b
- a
< 50 || b
- a
> 150) {
393 std::cerr
<< "\t\t\texpected 100ms gap, found " << (b
-a
) << "ms gap instead." << std::endl
;
397 return apiTestWithThreadFactory(shared_ptr
<ThreadFactory
>(new ThreadFactory()));
// Walks a single-worker thread manager through its bookkeeping API and checks
// workerCount(), idleWorkerCount(), pendingTaskCount() and expiredTaskCount()
// after every step with the EXPECT macro defined below. Steps, in order:
// add/remove workers, queue tasks while there are no workers, remove tasks
// explicitly, expire tasks (via removeExpiredTasks() and via a worker), ask
// for too many workers to be removed, and finally run a blocking task on a
// fresh worker.
//
// threadFactory - factory installed on the manager before start().
// Returns false from any failing EXPECT.
// NOTE(review): the failure paths after the std::cerr diagnostics and the
// final return are not part of the visible lines.
401 bool apiTestWithThreadFactory(shared_ptr
<ThreadFactory
> threadFactory
)
403 shared_ptr
<ThreadManager
> threadManager
= ThreadManager::newSimpleThreadManager(1);
404 threadManager
->threadFactory(threadFactory
);
406 std::cout
<< "\t\t\t\tstarting.. " << std::endl
;
408 threadManager
->start();
// Expired runnables are reported to expiredNotifier(), which appends them to
// the file-scope m_expired queue checked further down.
409 threadManager
->setExpireCallback(expiredNotifier
); // std::bind(&ThreadManagerTests::expiredNotifier, this));
// EXPECT(FUNC, COUNT): evaluates FUNC once into a size_t; when it differs from
// COUNT, prints both to std::cerr and returns false from this function.
411 #define EXPECT(FUNC, COUNT) { size_t c = FUNC; if (c != COUNT) { std::cerr << "expected " #FUNC" to be " #COUNT ", but was " << c << std::endl; return false; } }
// Freshly started manager: the single worker exists and is idle.
413 EXPECT(threadManager
->workerCount(), 1);
414 EXPECT(threadManager
->idleWorkerCount(), 1);
415 EXPECT(threadManager
->pendingTaskCount(), 0);
417 std::cout
<< "\t\t\t\tadd 2nd worker.. " << std::endl
;
419 threadManager
->addWorker();
421 EXPECT(threadManager
->workerCount(), 2);
422 EXPECT(threadManager
->idleWorkerCount(), 2);
423 EXPECT(threadManager
->pendingTaskCount(), 0);
425 std::cout
<< "\t\t\t\tremove 2nd worker.. " << std::endl
;
427 threadManager
->removeWorker();
429 EXPECT(threadManager
->workerCount(), 1);
430 EXPECT(threadManager
->idleWorkerCount(), 1);
431 EXPECT(threadManager
->pendingTaskCount(), 0);
433 std::cout
<< "\t\t\t\tremove 1st worker.. " << std::endl
;
435 threadManager
->removeWorker();
// From here on the manager has no workers, so the tasks added next are
// expected to stay pending.
437 EXPECT(threadManager
->workerCount(), 0);
438 EXPECT(threadManager
->idleWorkerCount(), 0);
439 EXPECT(threadManager
->pendingTaskCount(), 0);
441 std::cout
<< "\t\t\t\tadd blocking task.. " << std::endl
;
443 // We're going to throw a blocking task into the mix
444 Monitor entryMonitor
; // signaled when task is running
445 Monitor blockMonitor
; // to be signaled to unblock the task
446 bool blocked(true); // set to false before notifying
447 Monitor doneMonitor
; // signaled when count reaches zero
448 size_t activeCount
= 1;
449 shared_ptr
<ThreadManagerTests::BlockTask
> blockingTask(
450 new ThreadManagerTests::BlockTask(entryMonitor
, blockMonitor
, blocked
, doneMonitor
, activeCount
));
451 threadManager
->add(blockingTask
);
453 EXPECT(threadManager
->workerCount(), 0);
454 EXPECT(threadManager
->idleWorkerCount(), 0);
455 EXPECT(threadManager
->pendingTaskCount(), 1);
457 std::cout
<< "\t\t\t\tadd other task.. " << std::endl
;
459 shared_ptr
<ThreadManagerTests::Task
> otherTask(
460 new ThreadManagerTests::Task(doneMonitor
, activeCount
, 0));
462 threadManager
->add(otherTask
);
464 EXPECT(threadManager
->workerCount(), 0);
465 EXPECT(threadManager
->idleWorkerCount(), 0);
466 EXPECT(threadManager
->pendingTaskCount(), 2);
468 std::cout
<< "\t\t\t\tremove blocking task specifically.. " << std::endl
;
// remove() takes out one named task; otherTask must remain pending.
470 threadManager
->remove(blockingTask
);
472 EXPECT(threadManager
->workerCount(), 0);
473 EXPECT(threadManager
->idleWorkerCount(), 0);
474 EXPECT(threadManager
->pendingTaskCount(), 1);
476 std::cout
<< "\t\t\t\tremove next pending task.." << std::endl
;
// The only task left is otherTask, so that is what must come back.
478 shared_ptr
<Runnable
> nextTask
= threadManager
->removeNextPending();
479 if (nextTask
!= otherTask
) {
480 std::cerr
<< "\t\t\t\t\texpected removeNextPending to return otherTask" << std::endl
;
484 EXPECT(threadManager
->workerCount(), 0);
485 EXPECT(threadManager
->idleWorkerCount(), 0);
486 EXPECT(threadManager
->pendingTaskCount(), 0);
488 std::cout
<< "\t\t\t\tremove next pending task (none left).." << std::endl
;
// With nothing pending, the diagnostic below says an empty Runnable pointer is
// expected (the condition guarding it is not visible in this listing).
490 nextTask
= threadManager
->removeNextPending();
492 std::cerr
<< "\t\t\t\t\texpected removeNextPending to return an empty Runnable" << std::endl
;
496 std::cout
<< "\t\t\t\tadd 2 expired tasks and 1 not.." << std::endl
;
498 shared_ptr
<ThreadManagerTests::Task
> expiredTask(
499 new ThreadManagerTests::Task(doneMonitor
, activeCount
, 0));
// The same expiredTask object is queued twice, with blockingTask in between.
501 threadManager
->add(expiredTask
, 0, 1);
502 threadManager
->add(blockingTask
); // add one that hasn't expired to make sure it gets skipped
503 threadManager
->add(expiredTask
, 0, 1); // add a second expired to ensure removeExpiredTasks removes both
505 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1 millisecond
// Until removeExpiredTasks() runs, all three entries still count as pending
// and none as expired.
507 EXPECT(threadManager
->workerCount(), 0);
508 EXPECT(threadManager
->idleWorkerCount(), 0);
509 EXPECT(threadManager
->pendingTaskCount(), 3);
510 EXPECT(threadManager
->expiredTaskCount(), 0);
512 std::cout
<< "\t\t\t\tremove expired tasks.." << std::endl
;
// The expire callback must not have fired yet.
514 if (!m_expired
.empty()) {
515 std::cerr
<< "\t\t\t\t\texpected m_expired to be empty" << std::endl
;
519 threadManager
->removeExpiredTasks();
// Both queued copies of expiredTask must have been reported to the callback.
521 if (m_expired
.size() != 2) {
522 std::cerr
<< "\t\t\t\t\texpected m_expired to be set" << std::endl
;
526 if (m_expired
.front() != expiredTask
) {
527 std::cerr
<< "\t\t\t\t\texpected m_expired[0] to be the expired task" << std::endl
;
530 m_expired
.pop_front();
532 if (m_expired
.front() != expiredTask
) {
533 std::cerr
<< "\t\t\t\t\texpected m_expired[1] to be the expired task" << std::endl
;
// Drop the non-expired task so the pending queue is empty again.
539 threadManager
->remove(blockingTask
);
541 EXPECT(threadManager
->workerCount(), 0);
542 EXPECT(threadManager
->idleWorkerCount(), 0);
543 EXPECT(threadManager
->pendingTaskCount(), 0);
544 EXPECT(threadManager
->expiredTaskCount(), 2);
546 std::cout
<< "\t\t\t\tadd expired task (again).." << std::endl
;
548 threadManager
->add(expiredTask
, 0, 1); // expires in 1ms
549 sleep_(50); // make sure enough time elapses for it to expire - the shortest expiration time is 1ms
551 std::cout
<< "\t\t\t\tadd worker to consume expired task.." << std::endl
;
// This time no removeExpiredTasks() call is made; the new worker is what is
// expected to trigger the expire callback.
553 threadManager
->addWorker();
554 sleep_(100); // make sure it has time to spin up and expire the task
556 if (m_expired
.empty()) {
557 std::cerr
<< "\t\t\t\t\texpected m_expired to be set" << std::endl
;
561 if (m_expired
.front() != expiredTask
) {
562 std::cerr
<< "\t\t\t\t\texpected m_expired to be the expired task" << std::endl
;
// The worker is idle again and the expired total has grown from 2 to 3.
568 EXPECT(threadManager
->workerCount(), 1);
569 EXPECT(threadManager
->idleWorkerCount(), 1);
570 EXPECT(threadManager
->pendingTaskCount(), 0);
571 EXPECT(threadManager
->expiredTaskCount(), 3);
573 std::cout
<< "\t\t\t\ttry to remove too many workers" << std::endl
;
// Only one worker exists, so asking for two to be removed must throw;
// reaching the std::cerr line means it did not.
575 threadManager
->removeWorker(2);
576 std::cerr
<< "\t\t\t\t\texpected InvalidArgumentException" << std::endl
;
578 } catch (const InvalidArgumentException
&) {
582 std::cout
<< "\t\t\t\tremove worker.. " << std::endl
;
584 threadManager
->removeWorker();
586 EXPECT(threadManager
->workerCount(), 0);
587 EXPECT(threadManager
->idleWorkerCount(), 0);
588 EXPECT(threadManager
->pendingTaskCount(), 0);
589 EXPECT(threadManager
->expiredTaskCount(), 3);
591 std::cout
<< "\t\t\t\tadd blocking task.. " << std::endl
;
593 threadManager
->add(blockingTask
);
595 EXPECT(threadManager
->workerCount(), 0);
596 EXPECT(threadManager
->idleWorkerCount(), 0);
597 EXPECT(threadManager
->pendingTaskCount(), 1);
599 std::cout
<< "\t\t\t\tadd worker.. " << std::endl
;
601 threadManager
->addWorker();
// Loop under the entry monitor until the blocking task reports it has entered.
603 Synchronized
s(entryMonitor
);
604 while (!blockingTask
->_entered
) {
// The worker is now busy with the blocking task: one worker, none idle,
// nothing pending.
609 EXPECT(threadManager
->workerCount(), 1);
610 EXPECT(threadManager
->idleWorkerCount(), 0);
611 EXPECT(threadManager
->pendingTaskCount(), 0);
613 std::cout
<< "\t\t\t\tunblock task and remove worker.. " << std::endl
;
// Release the task waiting on blockMonitor, then retire the worker.
616 Synchronized
s(blockMonitor
);
618 blockMonitor
.notifyAll();
620 threadManager
->removeWorker();
622 EXPECT(threadManager
->workerCount(), 0);
623 EXPECT(threadManager
->idleWorkerCount(), 0);
624 EXPECT(threadManager
->pendingTaskCount(), 0);
626 std::cout
<< "\t\t\t\tcleanup.. " << std::endl
;
// Drop this function's references to the task and the manager.
628 blockingTask
.reset();
629 threadManager
.reset();
637 } // apache::thrift::concurrency
639 using namespace apache::thrift::concurrency::test
;