]> git.proxmox.com Git - ceph.git/blame - ceph/src/jaegertracing/thrift/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
buildsys: switch source download to quincy
[ceph.git] / ceph / src / jaegertracing / thrift / lib / cpp / src / thrift / concurrency / ThreadManager.cpp
CommitLineData
f67539c2
TL
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20#include <thrift/thrift-config.h>
21
22#include <thrift/concurrency/ThreadManager.h>
23#include <thrift/concurrency/Exception.h>
24#include <thrift/concurrency/Monitor.h>
25
26#include <memory>
27
28#include <stdexcept>
29#include <deque>
30#include <set>
31
32namespace apache {
33namespace thrift {
34namespace concurrency {
35
36using std::shared_ptr;
37using std::unique_ptr;
38using std::dynamic_pointer_cast;
39
40/**
41 * ThreadManager class
42 *
43 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
48 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
50 *
51 * @version $Id:$
52 */
53class ThreadManager::Impl : public ThreadManager {
54
55public:
56 Impl()
57 : workerCount_(0),
58 workerMaxCount_(0),
59 idleCount_(0),
60 pendingTaskCountMax_(0),
61 expiredCount_(0),
62 state_(ThreadManager::UNINITIALIZED),
63 monitor_(&mutex_),
64 maxMonitor_(&mutex_),
65 workerMonitor_(&mutex_) {}
66
67 ~Impl() override { stop(); }
68
69 void start() override;
70 void stop() override;
71
72 ThreadManager::STATE state() const override { return state_; }
73
74 shared_ptr<ThreadFactory> threadFactory() const override {
75 Guard g(mutex_);
76 return threadFactory_;
77 }
78
79 void threadFactory(shared_ptr<ThreadFactory> value) override {
80 Guard g(mutex_);
81 if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82 throw InvalidArgumentException();
83 }
84 threadFactory_ = value;
85 }
86
87 void addWorker(size_t value) override;
88
89 void removeWorker(size_t value) override;
90
91 size_t idleWorkerCount() const override { return idleCount_; }
92
93 size_t workerCount() const override {
94 Guard g(mutex_);
95 return workerCount_;
96 }
97
98 size_t pendingTaskCount() const override {
99 Guard g(mutex_);
100 return tasks_.size();
101 }
102
103 size_t totalTaskCount() const override {
104 Guard g(mutex_);
105 return tasks_.size() + workerCount_ - idleCount_;
106 }
107
108 size_t pendingTaskCountMax() const override {
109 Guard g(mutex_);
110 return pendingTaskCountMax_;
111 }
112
113 size_t expiredTaskCount() const override {
114 Guard g(mutex_);
115 return expiredCount_;
116 }
117
118 void pendingTaskCountMax(const size_t value) {
119 Guard g(mutex_);
120 pendingTaskCountMax_ = value;
121 }
122
123 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
124
125 void remove(shared_ptr<Runnable> task) override;
126
127 shared_ptr<Runnable> removeNextPending() override;
128
129 void removeExpiredTasks() override {
130 removeExpired(false);
131 }
132
133 void setExpireCallback(ExpireCallback expireCallback) override;
134
135private:
136 /**
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
139 */
140 void removeExpired(bool justOne);
141
142 /**
143 * \returns whether it is acceptable to block, depending on the current thread id
144 */
145 bool canSleep() const;
146
147 /**
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
151 */
152 void removeWorkersUnderLock(size_t value);
153
154 size_t workerCount_;
155 size_t workerMaxCount_;
156 size_t idleCount_;
157 size_t pendingTaskCountMax_;
158 size_t expiredCount_;
159 ExpireCallback expireCallback_;
160
161 ThreadManager::STATE state_;
162 shared_ptr<ThreadFactory> threadFactory_;
163
164 friend class ThreadManager::Task;
165 typedef std::deque<shared_ptr<Task> > TaskQueue;
166 TaskQueue tasks_;
167 Mutex mutex_;
168 Monitor monitor_;
169 Monitor maxMonitor_;
170 Monitor workerMonitor_; // used to synchronize changes in worker count
171
172 friend class ThreadManager::Worker;
173 std::set<shared_ptr<Thread> > workers_;
174 std::set<shared_ptr<Thread> > deadWorkers_;
175 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
176};
177
178class ThreadManager::Task : public Runnable {
179
180public:
181 enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
182
183 Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
184 : runnable_(runnable),
185 state_(WAITING) {
186 if (expiration != 0ULL) {
187 expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
188 }
189 }
190
191 ~Task() override = default;
192
193 void run() override {
194 if (state_ == EXECUTING) {
195 runnable_->run();
196 state_ = COMPLETE;
197 }
198 }
199
200 shared_ptr<Runnable> getRunnable() { return runnable_; }
201
202 const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
203
204private:
205 shared_ptr<Runnable> runnable_;
206 friend class ThreadManager::Worker;
207 STATE state_;
208 unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
209};
210
211class ThreadManager::Worker : public Runnable {
212 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
213
214public:
215 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
216
217 ~Worker() override = default;
218
219private:
220 bool isActive() const {
221 return (manager_->workerCount_ <= manager_->workerMaxCount_)
222 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
223 }
224
225public:
226 /**
227 * Worker entry point
228 *
229 * As long as worker thread is running, pull tasks off the task queue and
230 * execute.
231 */
232 void run() override {
233 Guard g(manager_->mutex_);
234
235 /**
236 * This method has three parts; one is to check for and account for
237 * admitting a task which happens under a lock. Then the lock is released
238 * and the task itself is executed. Finally we do some accounting
239 * under lock again when the task completes.
240 */
241
242 /**
243 * Admitting
244 */
245
246 /**
247 * Increment worker semaphore and notify manager if worker count reached
248 * desired max
249 */
250 bool active = manager_->workerCount_ < manager_->workerMaxCount_;
251 if (active) {
252 if (++manager_->workerCount_ == manager_->workerMaxCount_) {
253 manager_->workerMonitor_.notify();
254 }
255 }
256
257 while (active) {
258 /**
259 * While holding manager monitor block for non-empty task queue (Also
260 * check that the thread hasn't been requested to stop). Once the queue
261 * is non-empty, dequeue a task, release monitor, and execute. If the
262 * worker max count has been decremented such that we exceed it, mark
263 * ourself inactive, decrement the worker count and notify the manager
264 * (technically we're notifying the next blocked thread but eventually
265 * the manager will see it.
266 */
267 active = isActive();
268
269 while (active && manager_->tasks_.empty()) {
270 manager_->idleCount_++;
271 manager_->monitor_.wait();
272 active = isActive();
273 manager_->idleCount_--;
274 }
275
276 shared_ptr<ThreadManager::Task> task;
277
278 if (active) {
279 if (!manager_->tasks_.empty()) {
280 task = manager_->tasks_.front();
281 manager_->tasks_.pop_front();
282 if (task->state_ == ThreadManager::Task::WAITING) {
283 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284 // then the execution loop needs to be changed below.
285 task->state_ =
286 (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
287 ThreadManager::Task::TIMEDOUT :
288 ThreadManager::Task::EXECUTING;
289 }
290 }
291
292 /* If we have a pending task max and we just dropped below it, wakeup any
293 thread that might be blocked on add. */
294 if (manager_->pendingTaskCountMax_ != 0
295 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
296 manager_->maxMonitor_.notify();
297 }
298 }
299
300 /**
301 * Execution - not holding a lock
302 */
303 if (task) {
304 if (task->state_ == ThreadManager::Task::EXECUTING) {
305
306 // Release the lock so we can run the task without blocking the thread manager
307 manager_->mutex_.unlock();
308
309 try {
310 task->run();
311 } catch (const std::exception& e) {
312 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
313 } catch (...) {
314 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
315 }
316
317 // Re-acquire the lock to proceed in the thread manager
318 manager_->mutex_.lock();
319
320 } else if (manager_->expireCallback_) {
321 // The only other state the task could have been in is TIMEDOUT (see above)
322 manager_->expireCallback_(task->getRunnable());
323 manager_->expiredCount_++;
324 }
325 }
326 }
327
328 /**
329 * Final accounting for the worker thread that is done working
330 */
331 manager_->deadWorkers_.insert(this->thread());
332 if (--manager_->workerCount_ == manager_->workerMaxCount_) {
333 manager_->workerMonitor_.notify();
334 }
335 }
336
337private:
338 ThreadManager::Impl* manager_;
339 friend class ThreadManager::Impl;
340 STATE state_;
341};
342
343void ThreadManager::Impl::addWorker(size_t value) {
344 std::set<shared_ptr<Thread> > newThreads;
345 for (size_t ix = 0; ix < value; ix++) {
346 shared_ptr<ThreadManager::Worker> worker
347 = std::make_shared<ThreadManager::Worker>(this);
348 newThreads.insert(threadFactory_->newThread(worker));
349 }
350
351 Guard g(mutex_);
352 workerMaxCount_ += value;
353 workers_.insert(newThreads.begin(), newThreads.end());
354
355 for (const auto & newThread : newThreads) {
356 shared_ptr<ThreadManager::Worker> worker
357 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
358 worker->state_ = ThreadManager::Worker::STARTING;
359 newThread->start();
360 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
361 }
362
363 while (workerCount_ != workerMaxCount_) {
364 workerMonitor_.wait();
365 }
366}
367
368void ThreadManager::Impl::start() {
369 Guard g(mutex_);
370 if (state_ == ThreadManager::STOPPED) {
371 return;
372 }
373
374 if (state_ == ThreadManager::UNINITIALIZED) {
375 if (!threadFactory_) {
376 throw InvalidArgumentException();
377 }
378 state_ = ThreadManager::STARTED;
379 monitor_.notifyAll();
380 }
381
382 while (state_ == STARTING) {
383 monitor_.wait();
384 }
385}
386
387void ThreadManager::Impl::stop() {
388 Guard g(mutex_);
389 bool doStop = false;
390
391 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
392 && state_ != ThreadManager::STOPPED) {
393 doStop = true;
394 state_ = ThreadManager::JOINING;
395 }
396
397 if (doStop) {
398 removeWorkersUnderLock(workerCount_);
399 }
400
401 state_ = ThreadManager::STOPPED;
402}
403
404void ThreadManager::Impl::removeWorker(size_t value) {
405 Guard g(mutex_);
406 removeWorkersUnderLock(value);
407}
408
409void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
410 if (value > workerMaxCount_) {
411 throw InvalidArgumentException();
412 }
413
414 workerMaxCount_ -= value;
415
416 if (idleCount_ > value) {
417 // There are more idle workers than we need to remove,
418 // so notify enough of them so they can terminate.
419 for (size_t ix = 0; ix < value; ix++) {
420 monitor_.notify();
421 }
422 } else {
423 // There are as many or less idle workers than we need to remove,
424 // so just notify them all so they can terminate.
425 monitor_.notifyAll();
426 }
427
428 while (workerCount_ != workerMaxCount_) {
429 workerMonitor_.wait();
430 }
431
432 for (const auto & deadWorker : deadWorkers_) {
433
434 // when used with a joinable thread factory, we join the threads as we remove them
435 if (!threadFactory_->isDetached()) {
436 deadWorker->join();
437 }
438
439 idMap_.erase(deadWorker->getId());
440 workers_.erase(deadWorker);
441 }
442
443 deadWorkers_.clear();
444}
445
446bool ThreadManager::Impl::canSleep() const {
447 const Thread::id_t id = threadFactory_->getCurrentThreadId();
448 return idMap_.find(id) == idMap_.end();
449}
450
451void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
452 Guard g(mutex_, timeout);
453
454 if (!g) {
455 throw TimedOutException();
456 }
457
458 if (state_ != ThreadManager::STARTED) {
459 throw IllegalStateException(
460 "ThreadManager::Impl::add ThreadManager "
461 "not started");
462 }
463
464 // if we're at a limit, remove an expired task to see if the limit clears
465 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
466 removeExpired(true);
467 }
468
469 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
470 if (canSleep() && timeout >= 0) {
471 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
472 // This is thread safe because the mutex is shared between monitors.
473 maxMonitor_.wait(timeout);
474 }
475 } else {
476 throw TooManyPendingTasksException();
477 }
478 }
479
480 tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
481
482 // If idle thread is available notify it, otherwise all worker threads are
483 // running and will get around to this task in time.
484 if (idleCount_ > 0) {
485 monitor_.notify();
486 }
487}
488
489void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
490 Guard g(mutex_);
491 if (state_ != ThreadManager::STARTED) {
492 throw IllegalStateException(
493 "ThreadManager::Impl::remove ThreadManager not "
494 "started");
495 }
496
497 for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
498 {
499 if ((*it)->getRunnable() == task)
500 {
501 tasks_.erase(it);
502 return;
503 }
504 }
505}
506
507std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
508 Guard g(mutex_);
509 if (state_ != ThreadManager::STARTED) {
510 throw IllegalStateException(
511 "ThreadManager::Impl::removeNextPending "
512 "ThreadManager not started");
513 }
514
515 if (tasks_.empty()) {
516 return std::shared_ptr<Runnable>();
517 }
518
519 shared_ptr<ThreadManager::Task> task = tasks_.front();
520 tasks_.pop_front();
521
522 return task->getRunnable();
523}
524
525void ThreadManager::Impl::removeExpired(bool justOne) {
526 // this is always called under a lock
527 if (tasks_.empty()) {
528 return;
529 }
530 auto now = std::chrono::steady_clock::now();
531
532 for (auto it = tasks_.begin(); it != tasks_.end(); )
533 {
534 if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
535 if (expireCallback_) {
536 expireCallback_((*it)->getRunnable());
537 }
538 it = tasks_.erase(it);
539 ++expiredCount_;
540 if (justOne) {
541 return;
542 }
543 }
544 else
545 {
546 ++it;
547 }
548 }
549}
550
551void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
552 Guard g(mutex_);
553 expireCallback_ = expireCallback;
554}
555
556class SimpleThreadManager : public ThreadManager::Impl {
557
558public:
559 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
560 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
561
562 void start() override {
563 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
564 ThreadManager::Impl::start();
565 addWorker(workerCount_);
566 }
567
568private:
569 const size_t workerCount_;
570 const size_t pendingTaskCountMax_;
571};
572
573shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
574 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
575}
576
577shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
578 size_t pendingTaskCountMax) {
579 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
580}
581}
582}
583} // apache::thrift::concurrency