2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
20 #include <thrift/thrift-config.h>
22 #include <thrift/concurrency/ThreadManager.h>
23 #include <thrift/concurrency/Exception.h>
24 #include <thrift/concurrency/Monitor.h>
34 namespace concurrency
{
36 using std::shared_ptr
;
37 using std::unique_ptr
;
38 using std::dynamic_pointer_cast
;
43 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
48 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
53 class ThreadManager::Impl
: public ThreadManager
{
60 pendingTaskCountMax_(0),
62 state_(ThreadManager::UNINITIALIZED
),
65 workerMonitor_(&mutex_
) {}
67 ~Impl() override
{ stop(); }
69 void start() override
;
72 ThreadManager::STATE
state() const override
{ return state_
; }
74 shared_ptr
<ThreadFactory
> threadFactory() const override
{
76 return threadFactory_
;
79 void threadFactory(shared_ptr
<ThreadFactory
> value
) override
{
81 if (threadFactory_
&& threadFactory_
->isDetached() != value
->isDetached()) {
82 throw InvalidArgumentException();
84 threadFactory_
= value
;
87 void addWorker(size_t value
) override
;
89 void removeWorker(size_t value
) override
;
91 size_t idleWorkerCount() const override
{ return idleCount_
; }
93 size_t workerCount() const override
{
98 size_t pendingTaskCount() const override
{
100 return tasks_
.size();
103 size_t totalTaskCount() const override
{
105 return tasks_
.size() + workerCount_
- idleCount_
;
108 size_t pendingTaskCountMax() const override
{
110 return pendingTaskCountMax_
;
113 size_t expiredTaskCount() const override
{
115 return expiredCount_
;
118 void pendingTaskCountMax(const size_t value
) {
120 pendingTaskCountMax_
= value
;
123 void add(shared_ptr
<Runnable
> value
, int64_t timeout
, int64_t expiration
) override
;
125 void remove(shared_ptr
<Runnable
> task
) override
;
127 shared_ptr
<Runnable
> removeNextPending() override
;
129 void removeExpiredTasks() override
{
130 removeExpired(false);
133 void setExpireCallback(ExpireCallback expireCallback
) override
;
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
140 void removeExpired(bool justOne
);
143 * \returns whether it is acceptable to block, depending on the current thread id
145 bool canSleep() const;
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
152 void removeWorkersUnderLock(size_t value
);
155 size_t workerMaxCount_
;
157 size_t pendingTaskCountMax_
;
158 size_t expiredCount_
;
159 ExpireCallback expireCallback_
;
161 ThreadManager::STATE state_
;
162 shared_ptr
<ThreadFactory
> threadFactory_
;
164 friend class ThreadManager::Task
;
165 typedef std::deque
<shared_ptr
<Task
> > TaskQueue
;
170 Monitor workerMonitor_
; // used to synchronize changes in worker count
172 friend class ThreadManager::Worker
;
173 std::set
<shared_ptr
<Thread
> > workers_
;
174 std::set
<shared_ptr
<Thread
> > deadWorkers_
;
175 std::map
<const Thread::id_t
, shared_ptr
<Thread
> > idMap_
;
178 class ThreadManager::Task
: public Runnable
{
181 enum STATE
{ WAITING
, EXECUTING
, TIMEDOUT
, COMPLETE
};
183 Task(shared_ptr
<Runnable
> runnable
, uint64_t expiration
= 0ULL)
184 : runnable_(runnable
),
186 if (expiration
!= 0ULL) {
187 expireTime_
.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration
)));
191 ~Task() override
= default;
193 void run() override
{
194 if (state_
== EXECUTING
) {
200 shared_ptr
<Runnable
> getRunnable() { return runnable_
; }
202 const unique_ptr
<std::chrono::steady_clock::time_point
> & getExpireTime() const { return expireTime_
; }
205 shared_ptr
<Runnable
> runnable_
;
206 friend class ThreadManager::Worker
;
208 unique_ptr
<std::chrono::steady_clock::time_point
> expireTime_
;
211 class ThreadManager::Worker
: public Runnable
{
212 enum STATE
{ UNINITIALIZED
, STARTING
, STARTED
, STOPPING
, STOPPED
};
215 Worker(ThreadManager::Impl
* manager
) : manager_(manager
), state_(UNINITIALIZED
) {}
217 ~Worker() override
= default;
220 bool isActive() const {
221 return (manager_
->workerCount_
<= manager_
->workerMaxCount_
)
222 || (manager_
->state_
== JOINING
&& !manager_
->tasks_
.empty());
229 * As long as worker thread is running, pull tasks off the task queue and
232 void run() override
{
233 Guard
g(manager_
->mutex_
);
236 * This method has three parts; one is to check for and account for
237 * admitting a task which happens under a lock. Then the lock is released
238 * and the task itself is executed. Finally we do some accounting
239 * under lock again when the task completes.
247 * Increment worker semaphore and notify manager if worker count reached
250 bool active
= manager_
->workerCount_
< manager_
->workerMaxCount_
;
252 if (++manager_
->workerCount_
== manager_
->workerMaxCount_
) {
253 manager_
->workerMonitor_
.notify();
259 * While holding manager monitor block for non-empty task queue (Also
260 * check that the thread hasn't been requested to stop). Once the queue
261 * is non-empty, dequeue a task, release monitor, and execute. If the
262 * worker max count has been decremented such that we exceed it, mark
263 * ourself inactive, decrement the worker count and notify the manager
264 * (technically we're notifying the next blocked thread but eventually
265 * the manager will see it.
269 while (active
&& manager_
->tasks_
.empty()) {
270 manager_
->idleCount_
++;
271 manager_
->monitor_
.wait();
273 manager_
->idleCount_
--;
276 shared_ptr
<ThreadManager::Task
> task
;
279 if (!manager_
->tasks_
.empty()) {
280 task
= manager_
->tasks_
.front();
281 manager_
->tasks_
.pop_front();
282 if (task
->state_
== ThreadManager::Task::WAITING
) {
283 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284 // then the execution loop needs to be changed below.
286 (task
->getExpireTime() && *(task
->getExpireTime()) < std::chrono::steady_clock::now()) ?
287 ThreadManager::Task::TIMEDOUT
:
288 ThreadManager::Task::EXECUTING
;
292 /* If we have a pending task max and we just dropped below it, wakeup any
293 thread that might be blocked on add. */
294 if (manager_
->pendingTaskCountMax_
!= 0
295 && manager_
->tasks_
.size() <= manager_
->pendingTaskCountMax_
- 1) {
296 manager_
->maxMonitor_
.notify();
301 * Execution - not holding a lock
304 if (task
->state_
== ThreadManager::Task::EXECUTING
) {
306 // Release the lock so we can run the task without blocking the thread manager
307 manager_
->mutex_
.unlock();
311 } catch (const std::exception
& e
) {
312 GlobalOutput
.printf("[ERROR] task->run() raised an exception: %s", e
.what());
314 GlobalOutput
.printf("[ERROR] task->run() raised an unknown exception");
317 // Re-acquire the lock to proceed in the thread manager
318 manager_
->mutex_
.lock();
320 } else if (manager_
->expireCallback_
) {
321 // The only other state the task could have been in is TIMEDOUT (see above)
322 manager_
->expireCallback_(task
->getRunnable());
323 manager_
->expiredCount_
++;
329 * Final accounting for the worker thread that is done working
331 manager_
->deadWorkers_
.insert(this->thread());
332 if (--manager_
->workerCount_
== manager_
->workerMaxCount_
) {
333 manager_
->workerMonitor_
.notify();
338 ThreadManager::Impl
* manager_
;
339 friend class ThreadManager::Impl
;
343 void ThreadManager::Impl::addWorker(size_t value
) {
344 std::set
<shared_ptr
<Thread
> > newThreads
;
345 for (size_t ix
= 0; ix
< value
; ix
++) {
346 shared_ptr
<ThreadManager::Worker
> worker
347 = std::make_shared
<ThreadManager::Worker
>(this);
348 newThreads
.insert(threadFactory_
->newThread(worker
));
352 workerMaxCount_
+= value
;
353 workers_
.insert(newThreads
.begin(), newThreads
.end());
355 for (const auto & newThread
: newThreads
) {
356 shared_ptr
<ThreadManager::Worker
> worker
357 = dynamic_pointer_cast
<ThreadManager::Worker
, Runnable
>(newThread
->runnable());
358 worker
->state_
= ThreadManager::Worker::STARTING
;
360 idMap_
.insert(std::pair
<const Thread::id_t
, shared_ptr
<Thread
> >(newThread
->getId(), newThread
));
363 while (workerCount_
!= workerMaxCount_
) {
364 workerMonitor_
.wait();
368 void ThreadManager::Impl::start() {
370 if (state_
== ThreadManager::STOPPED
) {
374 if (state_
== ThreadManager::UNINITIALIZED
) {
375 if (!threadFactory_
) {
376 throw InvalidArgumentException();
378 state_
= ThreadManager::STARTED
;
379 monitor_
.notifyAll();
382 while (state_
== STARTING
) {
387 void ThreadManager::Impl::stop() {
391 if (state_
!= ThreadManager::STOPPING
&& state_
!= ThreadManager::JOINING
392 && state_
!= ThreadManager::STOPPED
) {
394 state_
= ThreadManager::JOINING
;
398 removeWorkersUnderLock(workerCount_
);
401 state_
= ThreadManager::STOPPED
;
404 void ThreadManager::Impl::removeWorker(size_t value
) {
406 removeWorkersUnderLock(value
);
409 void ThreadManager::Impl::removeWorkersUnderLock(size_t value
) {
410 if (value
> workerMaxCount_
) {
411 throw InvalidArgumentException();
414 workerMaxCount_
-= value
;
416 if (idleCount_
> value
) {
417 // There are more idle workers than we need to remove,
418 // so notify enough of them so they can terminate.
419 for (size_t ix
= 0; ix
< value
; ix
++) {
423 // There are as many or less idle workers than we need to remove,
424 // so just notify them all so they can terminate.
425 monitor_
.notifyAll();
428 while (workerCount_
!= workerMaxCount_
) {
429 workerMonitor_
.wait();
432 for (const auto & deadWorker
: deadWorkers_
) {
434 // when used with a joinable thread factory, we join the threads as we remove them
435 if (!threadFactory_
->isDetached()) {
439 idMap_
.erase(deadWorker
->getId());
440 workers_
.erase(deadWorker
);
443 deadWorkers_
.clear();
446 bool ThreadManager::Impl::canSleep() const {
447 const Thread::id_t id
= threadFactory_
->getCurrentThreadId();
448 return idMap_
.find(id
) == idMap_
.end();
451 void ThreadManager::Impl::add(shared_ptr
<Runnable
> value
, int64_t timeout
, int64_t expiration
) {
452 Guard
g(mutex_
, timeout
);
455 throw TimedOutException();
458 if (state_
!= ThreadManager::STARTED
) {
459 throw IllegalStateException(
460 "ThreadManager::Impl::add ThreadManager "
464 // if we're at a limit, remove an expired task to see if the limit clears
465 if (pendingTaskCountMax_
> 0 && (tasks_
.size() >= pendingTaskCountMax_
)) {
469 if (pendingTaskCountMax_
> 0 && (tasks_
.size() >= pendingTaskCountMax_
)) {
470 if (canSleep() && timeout
>= 0) {
471 while (pendingTaskCountMax_
> 0 && tasks_
.size() >= pendingTaskCountMax_
) {
472 // This is thread safe because the mutex is shared between monitors.
473 maxMonitor_
.wait(timeout
);
476 throw TooManyPendingTasksException();
480 tasks_
.push_back(std::make_shared
<ThreadManager::Task
>(value
, expiration
));
482 // If idle thread is available notify it, otherwise all worker threads are
483 // running and will get around to this task in time.
484 if (idleCount_
> 0) {
489 void ThreadManager::Impl::remove(shared_ptr
<Runnable
> task
) {
491 if (state_
!= ThreadManager::STARTED
) {
492 throw IllegalStateException(
493 "ThreadManager::Impl::remove ThreadManager not "
497 for (auto it
= tasks_
.begin(); it
!= tasks_
.end(); ++it
)
499 if ((*it
)->getRunnable() == task
)
507 std::shared_ptr
<Runnable
> ThreadManager::Impl::removeNextPending() {
509 if (state_
!= ThreadManager::STARTED
) {
510 throw IllegalStateException(
511 "ThreadManager::Impl::removeNextPending "
512 "ThreadManager not started");
515 if (tasks_
.empty()) {
516 return std::shared_ptr
<Runnable
>();
519 shared_ptr
<ThreadManager::Task
> task
= tasks_
.front();
522 return task
->getRunnable();
525 void ThreadManager::Impl::removeExpired(bool justOne
) {
526 // this is always called under a lock
527 if (tasks_
.empty()) {
530 auto now
= std::chrono::steady_clock::now();
532 for (auto it
= tasks_
.begin(); it
!= tasks_
.end(); )
534 if ((*it
)->getExpireTime() && *((*it
)->getExpireTime()) < now
) {
535 if (expireCallback_
) {
536 expireCallback_((*it
)->getRunnable());
538 it
= tasks_
.erase(it
);
551 void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback
) {
553 expireCallback_
= expireCallback
;
556 class SimpleThreadManager
: public ThreadManager::Impl
{
559 SimpleThreadManager(size_t workerCount
= 4, size_t pendingTaskCountMax
= 0)
560 : workerCount_(workerCount
), pendingTaskCountMax_(pendingTaskCountMax
) {}
562 void start() override
{
563 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_
);
564 ThreadManager::Impl::start();
565 addWorker(workerCount_
);
569 const size_t workerCount_
;
570 const size_t pendingTaskCountMax_
;
573 shared_ptr
<ThreadManager
> ThreadManager::newThreadManager() {
574 return shared_ptr
<ThreadManager
>(new ThreadManager::Impl());
577 shared_ptr
<ThreadManager
> ThreadManager::newSimpleThreadManager(size_t count
,
578 size_t pendingTaskCountMax
) {
579 return shared_ptr
<ThreadManager
>(new SimpleThreadManager(count
, pendingTaskCountMax
));
583 } // apache::thrift::concurrency