// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
// for ObjectStore.h
struct ThreadPool {
  struct TPHandle {
  };
};

#else

#include <atomic>
#include <list>
#include <set>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
#include "common/Thread.h"
#include "include/common_fwd.h"
#include "include/Context.h"
#include "common/HBHandle.h"


/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
protected:
  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex _lock;
  ceph::condition_variable _cond;
  bool _stop;
  int _pause;
  int _draining;
  ceph::condition_variable _wait_cond;

public:
  class TPHandle : public HBHandle {
    friend class ThreadPool;
    CephContext *cct;
    ceph::heartbeat_handle_d *hb;
    ceph::timespan grace;
    ceph::timespan suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      ceph::heartbeat_handle_d *hb,
      ceph::timespan grace,
      ceph::timespan suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout() override final;
    void suspend_tp_timeout() override final;
  };
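
  // Illustrative sketch (not part of the original header): a long-running
  // _process() implementation can ping the handle periodically so the
  // heartbeat watchdog does not consider the worker thread stuck.  The
  // surrounding work-queue class and item type are hypothetical.
  //
  //   void MyWQ::_process(Item *item, ThreadPool::TPHandle &handle) {
  //     for (auto &chunk : item->chunks) {
  //       do_expensive_work(chunk);
  //       handle.reset_tp_timeout();  // restart the grace/suicide timers
  //     }
  //   }
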
protected:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    std::string name;
    ceph::timespan timeout_interval;
    ceph::timespan suicide_interval;
    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };
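
  // Illustrative sketch (not part of the original header) of how the worker
  // threads drive this interface; the actual loop lives in
  // ThreadPool::worker() in WorkQueue.cc.  Roughly:
  //
  //   // with the pool lock held:
  //   void *item = wq->_void_dequeue();
  //   if (item) {
  //     // lock released while the item is processed in parallel
  //     wq->_void_process(item, handle);
  //     // lock re-acquired for the serialized finish step
  //     wq->_void_process_finish(item);
  //   }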

  // track thread pool size changes
  unsigned _num_threads;
  std::string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const ConfigProxy& conf,
			  const std::set <std::string> &changed) override;

public:
  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
    std::list<U> to_process;
    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
	std::lock_guard l(_lock);
	if (_empty())
	  return 0;
	U u = _dequeue();
	to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.lock();
      ceph_assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.unlock();

      _process(u, handle);

      _lock.lock();
      to_finish.push_back(u);
      _lock.unlock();
    }

    void _void_process_finish(void *) override {
      _lock.lock();
      ceph_assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(std::string n,
		 ceph::timespan ti,
		 ceph::timespan sti,
		 ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
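
  // Illustrative sketch (not part of the original header): a minimal
  // WorkQueueVal subclass that processes small integer job ids submitted
  // by value.  The class and member names are hypothetical;
  // ceph::make_timespan() is assumed to be available from common/ceph_time.h.
  //
  //   class JobIdWQ : public ThreadPool::WorkQueueVal<int> {
  //     std::list<int> jobs;
  //     void _enqueue(int id) override { jobs.push_back(id); }
  //     void _enqueue_front(int id) override { jobs.push_front(id); }
  //     bool _empty() override { return jobs.empty(); }
  //     int _dequeue() override {
  //       int id = jobs.front();
  //       jobs.pop_front();
  //       return id;
  //     }
  //     void _process(int id, ThreadPool::TPHandle &handle) override {
  //       // do the actual work for job `id`; runs in a worker thread
  //     }
  //   public:
  //     JobIdWQ(ThreadPool *tp)
  //       : ThreadPool::WorkQueueVal<int>("JobIdWQ",
  //                                       ceph::make_timespan(60),   // timeout
  //                                       ceph::make_timespan(600),  // suicide
  //                                       tp) {}
  //   };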

  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(std::string n,
	      ceph::timespan ti, ceph::timespan sti,
	      ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
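
  // Illustrative sketch (not part of the original header): a minimal
  // WorkQueue<T> subclass backed by a std::list of pointers.  The item and
  // class names are hypothetical; only the overrides required by the
  // interface above are shown, and ceph::make_timespan() is assumed.
  //
  //   struct Job { int id; };
  //
  //   class JobWQ : public ThreadPool::WorkQueue<Job> {
  //     std::list<Job*> jobs;
  //   public:
  //     JobWQ(ThreadPool *tp)
  //       : ThreadPool::WorkQueue<Job>("JobWQ", ceph::make_timespan(60),
  //                                    ceph::make_timespan(600), tp) {}
  //   private:
  //     bool _enqueue(Job *j) override { jobs.push_back(j); return true; }
  //     void _dequeue(Job *j) override { jobs.remove(j); }
  //     Job *_dequeue() override {
  //       if (jobs.empty())
  //         return nullptr;              // nullptr means "nothing to do"
  //       Job *j = jobs.front();
  //       jobs.pop_front();
  //       return j;
  //     }
  //     bool _empty() override { return jobs.empty(); }
  //     void _clear() override { jobs.clear(); }
  //     void _process(Job *j, ThreadPool::TPHandle &) override {
  //       // do the work for *j; runs in a worker thread
  //     }
  //   };
  //
  //   // submit work:  job_wq.queue(new Job{1});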

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      ceph_assert(m_processing == 0);
    }
    void drain() {
      {
	// if this queue is empty and not processing, don't wait for other
	// queues to finish processing
	std::lock_guard l(m_pool->_lock);
	if (m_processing == 0 && m_items.empty()) {
	  return;
	}
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(std::string n,
	      ceph::timespan ti, ceph::timespan sti,
	      ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      std::lock_guard locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }
      return m_items.front();
    }
    void requeue_front(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void requeue_back(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_back(item);
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
      m_pool->_cond.notify_one();
    }
    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
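
  // Illustrative sketch (not part of the original header): unlike WorkQueue,
  // PointerWQ keeps its own item list, so a subclass only implements
  // process() and must call register_work_queue() after construction.  The
  // names below are hypothetical; ceph::make_timespan() is assumed.
  //
  //   struct Request { int id; };
  //
  //   class RequestWQ : public ThreadPool::PointerWQ<Request> {
  //   public:
  //     RequestWQ(ThreadPool *tp)
  //       : ThreadPool::PointerWQ<Request>("RequestWQ",
  //                                        ceph::make_timespan(60),
  //                                        ceph::make_timespan(0), tp) {
  //       register_work_queue();   // PointerWQ does not auto-register
  //     }
  //   protected:
  //     void process(Request *req) override {
  //       // handle *req; runs in a worker thread
  //       delete req;
  //     }
  //   };
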
protected:
  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  std::set<WorkThread*> _threads;
  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  virtual void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    std::lock_guard l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    ceph_assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.unlock();
  }

  /// wait for a kick on this thread pool
  void wait(ceph::condition_variable &c) {
    std::unique_lock l(_lock, std::adopt_lock);
    c.wait(l);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
    _cond.notify_all();
  }
  void _wait() {
    std::unique_lock l(_lock, std::adopt_lock);
    _cond.wait(l);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool; must match each pause() call 1:1 to resume
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);
};
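
// Illustrative sketch (not part of the original header): a typical ThreadPool
// lifecycle.  `cct` and the JobWQ class from the sketch above are assumed,
// and the optional config option name argument is omitted.
//
//   ThreadPool tp(cct, "my_tp", "tp_worker", /* n = */ 4);
//   JobWQ wq(&tp);          // work queue registers itself with the pool
//   tp.start();             // spawn worker threads
//   wq.queue(new Job{1});   // submit work; a worker will pick it up
//   wq.drain();             // block until the queue has no items left
//   tp.stop();              // join worker threads (clears queues by default)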

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
	GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    ceph_assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
		ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};
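
// Illustrative sketch (not part of the original header): queueing a callback
// on a GenContextWQ.  The GenContext subclass shown here is hypothetical;
// GenContext<T>::finish(T) from include/Context.h is assumed to be the
// completion hook, and ceph::make_timespan() is assumed.
//
//   struct C_SayHello : public GenContext<ThreadPool::TPHandle&> {
//     void finish(ThreadPool::TPHandle &handle) override {
//       // runs in a ThreadPool worker thread; `handle` can be used to
//       // reset the heartbeat timeout for long-running callbacks
//     }
//   };
//
//   GenContextWQ gen_wq("gen_wq", ceph::make_timespan(60), &tp);
//   gen_wq.queue(new C_SayHello);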

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      std::lock_guard locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    std::lock_guard locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      std::lock_guard locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
	m_context_results.find(ctx);
      if (it != m_context_results.end()) {
	result = it->second;
	m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
};
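
// Illustrative sketch (not part of the original header): completing a plain
// Context through a ContextWQ.  The Context subclass is hypothetical;
// Context::finish(int) from include/Context.h and ceph::make_timespan() are
// assumed.
//
//   struct C_NotifyDone : public Context {
//     void finish(int r) override {
//       // invoked in a worker thread with the result passed to queue()
//     }
//   };
//
//   ContextWQ ctx_wq("ctx_wq", ceph::make_timespan(60), &tp);
//   ctx_wq.queue(new C_NotifyDone, -1);  // non-zero result is recorded
//   ctx_wq.drain();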

class ShardedThreadPool {

  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex shardedpool_lock;
  ceph::condition_variable shardedpool_cond;
  ceph::condition_variable wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    ceph::timespan timeout_interval, suicide_interval;
    BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
      : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual void stop_return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T&&) = 0;
    virtual void _enqueue_front(T&&) = 0;


  public:
    ShardedWQ(ceph::timespan ti,
	      ceph::timespan sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T&& item) {
      _enqueue(std::move(item));
    }
    void queue_front(T&& item) {
      _enqueue_front(std::move(item));
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
      thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }



public:

  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool; must match each pause() call 1:1 to resume
  void unpause();
  /// wait for all work to complete
  void drain();

};
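
// Illustrative sketch (not part of the original header): a ShardedWQ skeleton
// that spreads items over one queue per worker thread.  All names are
// hypothetical, and the blocking/wakeup logic real implementations need
// (see OSD::ShardedOpWQ) is reduced to comments.
//
//   class ShardedItemWQ : public ShardedThreadPool::ShardedWQ<int> {
//     std::vector<std::deque<int>> shards;   // one shard per worker thread
//   public:
//     ShardedItemWQ(uint32_t n, ceph::timespan ti, ceph::timespan sti,
//                   ShardedThreadPool *tp)
//       : ShardedThreadPool::ShardedWQ<int>(ti, sti, tp), shards(n) {}
//     void _enqueue(int &&item) override {
//       shards[item % shards.size()].push_back(item);
//     }
//     void _enqueue_front(int &&item) override {
//       shards[item % shards.size()].push_front(item);
//     }
//     void _process(uint32_t thread_index,
//                   ceph::heartbeat_handle_d *hb) override {
//       // pop and handle items from shards[thread_index]; block on a
//       // per-shard condition variable while the shard is empty
//     }
//     void return_waiting_threads() override { /* wake blocked workers */ }
//     void stop_return_waiting_threads() override { /* resume blocking */ }
//     bool is_shard_empty(uint32_t thread_index) override {
//       return shards[thread_index].empty();
//     }
//   };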

#endif

#endif