// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#ifdef WITH_SEASTAR
// for ObjectStore.h
struct ThreadPool {
  struct TPHandle {
  };
};

#else

#include <atomic>
#include <list>
#include <set>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
#include "common/Thread.h"
#include "include/Context.h"

class CephContext;

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex _lock;
  ceph::condition_variable _cond;
  bool _stop;
  int _pause;
  int _draining;
  ceph::condition_variable _wait_cond;

public:
  class TPHandle {
    friend class ThreadPool;
    CephContext *cct;
    heartbeat_handle_d *hb;
    ceph::coarse_mono_clock::rep grace;
    ceph::coarse_mono_clock::rep suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      heartbeat_handle_d *hb,
      time_t grace,
      time_t suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout();
    void suspend_tp_timeout();
  };
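
  /* Work queues receive a TPHandle in their _process() callbacks. A
   * long-running callback should call reset_tp_timeout() periodically so
   * the heartbeat watchdog does not conclude the worker thread is stuck.
   * A minimal sketch (Item and do_one_step() are illustrative, not part
   * of Ceph):
   *
   *   void _process(Item *item, TPHandle &handle) override {
   *     while (!item->done()) {
   *       do_one_step(item);          // hypothetical helper
   *       handle.reset_tp_timeout();  // still alive; push the deadline out
   *     }
   *   }
   */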
private:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    std::string name;
    time_t timeout_interval, suicide_interval;
    WorkQueue_(std::string n, time_t ti, time_t sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
  };

  // track thread pool size changes
  unsigned _num_threads;
  std::string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const ConfigProxy& conf,
			  const std::set <std::string> &changed) override;

public:
  /** @brief Work queue that processes several submitted items at once.
   * The queue will automatically add itself to the thread pool on construction
   * and remove itself on destruction. */
  template<class T>
  class BatchWorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    virtual bool _enqueue(T *) = 0;
    virtual void _dequeue(T *) = 0;
    virtual void _dequeue(std::list<T*> *) = 0;
    virtual void _process_finish(const std::list<T*> &) {}

    // virtual methods from WorkQueue_ below
    void *_void_dequeue() override {
      std::list<T*> *out(new std::list<T*>);
      _dequeue(out);
      if (!out->empty()) {
	return (void *)out;
      } else {
	delete out;
	return 0;
      }
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(*((std::list<T*>*)p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(*(std::list<T*>*)p);
      delete (std::list<T*> *)p;
    }

  protected:
    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;

  public:
    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~BatchWorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    void wake() {
      pool->wake();
    }
    void _wake() {
      pool->_wake();
    }
    void drain() {
      pool->drain(this);
    }

  };

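  /* Example: a minimal BatchWorkQueue subclass backed by a std::list.
   * A sketch only; MyItem, MyBatchWQ, and handle_item() are illustrative,
   * not part of Ceph:
   *
   *   struct MyBatchWQ : public ThreadPool::BatchWorkQueue<MyItem> {
   *     std::list<MyItem*> q;
   *     MyBatchWQ(ThreadPool *tp)
   *       : ThreadPool::BatchWorkQueue<MyItem>("MyBatchWQ", 60, 0, tp) {}
   *     bool _enqueue(MyItem *i) override { q.push_back(i); return true; }
   *     void _dequeue(MyItem *i) override { q.remove(i); }
   *     void _dequeue(std::list<MyItem*> *out) override { out->swap(q); }
   *     bool _empty() override { return q.empty(); }
   *     void _clear() override { q.clear(); }
   *     void _process(const std::list<MyItem*> &items,
   *                   ThreadPool::TPHandle &handle) override {
   *       for (auto *i : items) {
   *         handle_item(i);            // hypothetical helper
   *         handle.reset_tp_timeout();
   *       }
   *     }
   *   };
   */
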
  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small
   * objects (a few bytes). The queue will automatically add itself to the
   * thread pool on construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
    std::list<U> to_process;
    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
	std::lock_guard l(_lock);
	if (_empty())
	  return 0;
	U u = _dequeue();
	to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.lock();
      ceph_assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.unlock();

      _process(u, handle);

      _lock.lock();
      to_finish.push_back(u);
      _lock.unlock();
    }

    void _void_process_finish(void *) override {
      _lock.lock();
      ceph_assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };

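  /* A concrete subclass supplies the storage plus the _enqueue/_dequeue
   * primitives; see GenContextWQ near the end of this header for an
   * in-tree example. Submitting work is then just (illustrative names):
   *
   *   MyValWQ wq("MyValWQ", 60, 0, &tp);
   *   wq.queue(42);   // the item is copied into the queue by value
   */
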
  /** @brief Templated by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type
   * submitted as pointers. This is useful when the work items are large or
   * include dynamically allocated memory. The queue will automatically add
   * itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };

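  /* Example: a minimal by-pointer queue backed by a std::list. A sketch
   * only; MyItem, MyWQ, and handle_item() are illustrative:
   *
   *   struct MyWQ : public ThreadPool::WorkQueue<MyItem> {
   *     std::list<MyItem*> q;
   *     MyWQ(ThreadPool *tp)
   *       : ThreadPool::WorkQueue<MyItem>("MyWQ", 60, 0, tp) {}
   *     bool _enqueue(MyItem *i) override { q.push_back(i); return true; }
   *     void _dequeue(MyItem *i) override { q.remove(i); }
   *     MyItem *_dequeue() override {
   *       if (q.empty()) return nullptr;   // nullptr -> nothing to process
   *       MyItem *i = q.front(); q.pop_front(); return i;
   *     }
   *     bool _empty() override { return q.empty(); }
   *     void _clear() override { q.clear(); }
   *     void _process(MyItem *i, ThreadPool::TPHandle &) override {
   *       handle_item(i);   // hypothetical helper
   *       delete i;         // by-pointer queues define their own ownership
   *     }
   *   };
   */
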
  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      ceph_assert(m_processing == 0);
    }
    void drain() {
      {
	// if this queue is empty and not processing, don't wait for other
	// queues to finish processing
	std::lock_guard l(m_pool->_lock);
	if (m_processing == 0 && m_items.empty()) {
	  return;
	}
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      std::lock_guard locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
	return NULL;
      }
      return m_items.front();
    }
    void requeue_front(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void requeue_back(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_back(item);
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
      m_pool->_cond.notify_one();
    }
    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
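
  /* Unlike the queues above, PointerWQ does not register itself on
   * construction: concrete subclasses must call register_work_queue()
   * once fully constructed (see ContextWQ below for an in-tree example).
   * A sketch, with illustrative names:
   *
   *   struct MyPtrWQ : public ThreadPool::PointerWQ<MyItem> {
   *     MyPtrWQ(ThreadPool *tp)
   *       : ThreadPool::PointerWQ<MyItem>("MyPtrWQ", 60, 0, tp) {
   *       register_work_queue();
   *     }
   *     void process(MyItem *item) override { ... }
   *   };
   */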
private:
  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  std::set<WorkThread*> _threads;
  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    std::lock_guard l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    ceph_assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.unlock();
  }

  /// wait for a kick on this thread pool
  void wait(ceph::condition_variable &c) {
    std::unique_lock l(_lock, std::adopt_lock);
    c.wait(l);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
    _cond.notify_all();
  }
  void _wait() {
    std::unique_lock l(_lock, std::adopt_lock);
    _cond.wait(l);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);
};

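/* Typical lifecycle, as a sketch (the queue type and names are
 * illustrative; the ThreadPool calls are from this header):
 *
 *   ThreadPool tp(cct, "my_tp", "tp_thread", 4);
 *   MyWQ wq(&tp);           // WorkQueue_-derived queues register themselves
 *   tp.start();             // spawn worker threads
 *   wq.queue(new MyItem);   // a worker picks the item up
 *   wq.drain();             // wait until nothing is queued or in flight
 *   tp.stop();              // join workers; clears leftover items by default
 */
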
class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
	GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    ceph_assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
		ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};

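/* GenContextWQ completes GenContext callbacks on the pool's worker
 * threads. A minimal sketch (the C_SayHi subclass is illustrative):
 *
 *   struct C_SayHi : public GenContext<ThreadPool::TPHandle&> {
 *     void finish(ThreadPool::TPHandle &handle) override {
 *       // runs on a worker thread; handle can reset the heartbeat timeout
 *     }
 *   };
 *
 *   GenContextWQ wq("gen_wq", 60, &tp);
 *   wq.queue(new C_SayHi);   // complete() deletes the context when done
 */
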
class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle&> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      std::lock_guard locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    std::lock_guard locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      std::lock_guard locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
	m_context_results.find(ctx);
      if (it != m_context_results.end()) {
	result = it->second;
	m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
};

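/* Example: completing plain Contexts on a thread pool. A sketch; C_LogDone
 * stands in for any Context subclass:
 *
 *   ThreadPool tp(cct, "ctx_tp", "tp_thread", 2);
 *   ContextWQ ctx_wq("ctx_wq", 60, &tp);
 *   tp.start();
 *   ctx_wq.queue(new C_LogDone, -EIO);  // finish(-EIO) runs on a worker
 *   ctx_wq.drain();
 *   tp.stop();
 */
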
class ShardedThreadPool {

  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex shardedpool_lock;
  ceph::condition_variable shardedpool_cond;
  ceph::condition_variable wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    time_t timeout_interval, suicide_interval;
    BaseShardedWQ(time_t ti, time_t sti) : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual void stop_return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ : public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T&&) = 0;
    virtual void _enqueue_front(T&&) = 0;


  public:
    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T&& item) {
      _enqueue(std::move(item));
    }
    void queue_front(T&& item) {
      _enqueue_front(std::move(item));
    }
    void drain() {
      sharded_pool->drain();
    }

  };

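  /* A sharded queue routes each item to one of several shards so worker
   * threads contend on per-shard locks instead of one global pool lock.
   * A sketch with illustrative names (shard selection, MyItem, and hash()
   * are assumptions; the remaining BaseShardedWQ pure virtuals are elided):
   *
   *   struct MyShardedWQ : public ShardedThreadPool::ShardedWQ<MyItem> {
   *     struct Shard { ceph::mutex lock; std::deque<MyItem> q; ... };
   *     std::vector<Shard*> shards;   // typically one per worker thread
   *     void _enqueue(MyItem &&item) override {
   *       Shard &s = *shards[hash(item) % shards.size()];
   *       std::lock_guard l(s.lock);
   *       s.q.push_back(std::move(item));
   *     }
   *     void _process(uint32_t thread_index, heartbeat_handle_d *hb) override {
   *       // pop from shards[thread_index % shards.size()] and do the work
   *     }
   *     ...
   *   };
   */
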
private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index)
      : pool(p), thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }



public:

  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);

  ~ShardedThreadPool() {}

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};

#endif

#endif