// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_WORKQUEUE_H
#define CEPH_WORKQUEUE_H

#if defined(WITH_SEASTAR) && !defined(WITH_ALIEN)
// for ObjectStore.h
struct ThreadPool {
  struct TPHandle {
  };
};

#else

#include <atomic>
#include <list>
#include <set>
#include <string>
#include <vector>

#include "common/ceph_mutex.h"
#include "include/unordered_map.h"
#include "common/config_obs.h"
#include "common/HeartbeatMap.h"
#include "common/Thread.h"
#include "include/common_fwd.h"
#include "include/Context.h"
#include "common/HBHandle.h"

/// Pool of threads that share work submitted to multiple work queues.
class ThreadPool : public md_config_obs_t {
protected:
  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex _lock;
  ceph::condition_variable _cond;
  bool _stop;
  int _pause;
  int _draining;
  ceph::condition_variable _wait_cond;

public:
  class TPHandle : public HBHandle {
    friend class ThreadPool;
    CephContext *cct;
    ceph::heartbeat_handle_d *hb;
    ceph::timespan grace;
    ceph::timespan suicide_grace;
  public:
    TPHandle(
      CephContext *cct,
      ceph::heartbeat_handle_d *hb,
      ceph::timespan grace,
      ceph::timespan suicide_grace)
      : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
    void reset_tp_timeout() override final;
    void suspend_tp_timeout() override final;
  };
protected:

  /// Basic interface to a work queue used by the worker threads.
  struct WorkQueue_ {
    std::string name;
    ceph::timespan timeout_interval;
    ceph::timespan suicide_interval;
    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
      : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
    { }
    virtual ~WorkQueue_() {}
    /// Remove all work items from the queue.
    virtual void _clear() = 0;
    /// Check whether there is anything to do.
    virtual bool _empty() = 0;
    /// Get the next work item to process.
    virtual void *_void_dequeue() = 0;
    /** @brief Process the work item.
     * This function will be called several times in parallel
     * and must therefore be thread-safe. */
    virtual void _void_process(void *item, TPHandle &handle) = 0;
    /** @brief Synchronously finish processing a work item.
     * This function is called after _void_process with the global thread pool lock held,
     * so at most one copy will execute simultaneously for a given thread pool.
     * It can be used for non-thread-safe finalization. */
    virtual void _void_process_finish(void *) = 0;
    void set_timeout(time_t ti) {
      timeout_interval = ceph::make_timespan(ti);
    }
    void set_suicide_timeout(time_t sti) {
      suicide_interval = ceph::make_timespan(sti);
    }
  };
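
  /* Simplified sketch of how a worker thread drives this interface (the
   * real loop lives in WorkQueue.cc; pause/drain handling is omitted
   * here):
   *
   *   void *item = wq->_void_dequeue();    // called with the pool lock held
   *   if (item) {
   *     // lock dropped while processing; may run in parallel across threads
   *     wq->_void_process(item, handle);
   *     // lock re-taken; at most one finisher runs at a time per pool
   *     wq->_void_process_finish(item);
   *   }
   */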

  // track thread pool size changes
  unsigned _num_threads;
  std::string _thread_num_option;
  const char **_conf_keys;

  const char **get_tracked_conf_keys() const override {
    return _conf_keys;
  }
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set <std::string> &changed) override;

public:
  /** @brief Templated by-value work queue.
   * Skeleton implementation of a queue that processes items submitted by value.
   * This is useful if the items are single primitive values or very small objects
   * (a few bytes). The queue will automatically add itself to the thread pool on
   * construction and remove itself on destruction. */
  template<typename T, typename U = T>
  class WorkQueueVal : public WorkQueue_ {
    ceph::mutex _lock = ceph::make_mutex("WorkQueueVal::_lock");
    ThreadPool *pool;
    std::list<U> to_process;
    std::list<U> to_finish;
    virtual void _enqueue(T) = 0;
    virtual void _enqueue_front(T) = 0;
    bool _empty() override = 0;
    virtual U _dequeue() = 0;
    virtual void _process_finish(U) {}

    void *_void_dequeue() override {
      {
        std::lock_guard l(_lock);
        if (_empty())
          return 0;
        U u = _dequeue();
        to_process.push_back(u);
      }
      return ((void*)1); // Not used
    }
    void _void_process(void *, TPHandle &handle) override {
      _lock.lock();
      ceph_assert(!to_process.empty());
      U u = to_process.front();
      to_process.pop_front();
      _lock.unlock();

      _process(u, handle);

      _lock.lock();
      to_finish.push_back(u);
      _lock.unlock();
    }

    void _void_process_finish(void *) override {
      _lock.lock();
      ceph_assert(!to_finish.empty());
      U u = to_finish.front();
      to_finish.pop_front();
      _lock.unlock();

      _process_finish(u);
    }

    void _clear() override {}

  public:
    WorkQueueVal(std::string n,
                 ceph::timespan ti,
                 ceph::timespan sti,
                 ThreadPool *p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueueVal() override {
      pool->remove_work_queue(this);
    }
    void queue(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue(item);
      pool->_cond.notify_one();
    }
    void queue_front(T item) {
      std::lock_guard l(pool->_lock);
      _enqueue_front(item);
      pool->_cond.notify_one();
    }
    void drain() {
      pool->drain(this);
    }
  protected:
    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    virtual void _process(U u, TPHandle &) = 0;
  };
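
  /* Usage sketch (hypothetical subclass, not part of Ceph): a queue of
   * small integer jobs.  The backing container is supplied by the
   * subclass; WorkQueueVal only manages dispatch.  GenContextWQ, later in
   * this header, is a real example of this pattern.
   *
   *   struct IntWQ : public ThreadPool::WorkQueueVal<int> {
   *     std::list<int> jobs;
   *     IntWQ(ThreadPool *tp)
   *       : WorkQueueVal("IntWQ", ceph::make_timespan(60),
   *                      ceph::make_timespan(600), tp) {}
   *     void _enqueue(int i) override { jobs.push_back(i); }
   *     void _enqueue_front(int i) override { jobs.push_front(i); }
   *     bool _empty() override { return jobs.empty(); }
   *     int _dequeue() override {
   *       int i = jobs.front(); jobs.pop_front(); return i;
   *     }
   *     void _process(int i, TPHandle &) override { handle_job(i); }
   *   };
   */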

  /** @brief Template by-pointer work queue.
   * Skeleton implementation of a queue that processes items of a given type submitted as pointers.
   * This is useful when the work items are large or include dynamically allocated memory. The queue
   * will automatically add itself to the thread pool on construction and remove itself on
   * destruction. */
  template<class T>
  class WorkQueue : public WorkQueue_ {
    ThreadPool *pool;

    /// Add a work item to the queue.
    virtual bool _enqueue(T *) = 0;
    /// Dequeue a previously submitted work item.
    virtual void _dequeue(T *) = 0;
    /// Dequeue a work item and return the original submitted pointer.
    virtual T *_dequeue() = 0;
    virtual void _process_finish(T *) {}

    // implementation of virtual methods from WorkQueue_
    void *_void_dequeue() override {
      return (void *)_dequeue();
    }
    void _void_process(void *p, TPHandle &handle) override {
      _process(static_cast<T *>(p), handle);
    }
    void _void_process_finish(void *p) override {
      _process_finish(static_cast<T *>(p));
    }

  protected:
    /// Process a work item. Called from the worker threads.
    virtual void _process(T *t, TPHandle &) = 0;

  public:
    WorkQueue(std::string n,
              ceph::timespan ti, ceph::timespan sti,
              ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), pool(p) {
      pool->add_work_queue(this);
    }
    ~WorkQueue() override {
      pool->remove_work_queue(this);
    }

    bool queue(T *item) {
      pool->_lock.lock();
      bool r = _enqueue(item);
      pool->_cond.notify_one();
      pool->_lock.unlock();
      return r;
    }
    void dequeue(T *item) {
      pool->_lock.lock();
      _dequeue(item);
      pool->_lock.unlock();
    }
    void clear() {
      pool->_lock.lock();
      _clear();
      pool->_lock.unlock();
    }

    void lock() {
      pool->lock();
    }
    void unlock() {
      pool->unlock();
    }
    /// wake up the thread pool (without lock held)
    void wake() {
      pool->wake();
    }
    /// wake up the thread pool (with lock already held)
    void _wake() {
      pool->_wake();
    }
    void _wait() {
      pool->_wait();
    }
    void drain() {
      pool->drain(this);
    }

  };
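
  /* Usage sketch (hypothetical subclass): items are heap-allocated Job
   * objects owned by the caller; the queue stores only pointers.
   *
   *   struct JobWQ : public ThreadPool::WorkQueue<Job> {
   *     std::list<Job*> jobs;
   *     JobWQ(ThreadPool *tp)
   *       : WorkQueue<Job>("JobWQ", ceph::make_timespan(60),
   *                        ceph::make_timespan(600), tp) {}
   *     bool _enqueue(Job *j) override { jobs.push_back(j); return true; }
   *     void _dequeue(Job *j) override { jobs.remove(j); }
   *     Job *_dequeue() override {
   *       if (jobs.empty()) return nullptr;
   *       Job *j = jobs.front(); jobs.pop_front(); return j;
   *     }
   *     bool _empty() override { return jobs.empty(); }
   *     void _clear() override { jobs.clear(); }
   *     void _process(Job *j, TPHandle &handle) override { j->run(handle); }
   *   };
   */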

  template<typename T>
  class PointerWQ : public WorkQueue_ {
  public:
    ~PointerWQ() override {
      m_pool->remove_work_queue(this);
      ceph_assert(m_processing == 0);
    }
    void drain() {
      {
        // if this queue is empty and not processing, don't wait for other
        // queues to finish processing
        std::lock_guard l(m_pool->_lock);
        if (m_processing == 0 && m_items.empty()) {
          return;
        }
      }
      m_pool->drain(this);
    }
    void queue(T *item) {
      std::lock_guard l(m_pool->_lock);
      m_items.push_back(item);
      m_pool->_cond.notify_one();
    }
    bool empty() {
      std::lock_guard l(m_pool->_lock);
      return _empty();
    }
  protected:
    PointerWQ(std::string n,
              ceph::timespan ti, ceph::timespan sti,
              ThreadPool* p)
      : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
    }
    void register_work_queue() {
      m_pool->add_work_queue(this);
    }
    void _clear() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      m_items.clear();
    }
    bool _empty() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      return m_items.empty();
    }
    void *_void_dequeue() override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }

      ++m_processing;
      T *item = m_items.front();
      m_items.pop_front();
      return item;
    }
    void _void_process(void *item, ThreadPool::TPHandle &handle) override {
      process(reinterpret_cast<T *>(item));
    }
    void _void_process_finish(void *item) override {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      ceph_assert(m_processing > 0);
      --m_processing;
    }

    virtual void process(T *item) = 0;
    void process_finish() {
      std::lock_guard locker(m_pool->_lock);
      _void_process_finish(nullptr);
    }

    T *front() {
      ceph_assert(ceph_mutex_is_locked(m_pool->_lock));
      if (m_items.empty()) {
        return NULL;
      }
      return m_items.front();
    }
    void requeue_front(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_front(item);
    }
    void requeue_back(T *item) {
      std::lock_guard pool_locker(m_pool->_lock);
      _void_process_finish(nullptr);
      m_items.push_back(item);
    }
    void signal() {
      std::lock_guard pool_locker(m_pool->_lock);
      m_pool->_cond.notify_one();
    }
    ceph::mutex &get_pool_lock() {
      return m_pool->_lock;
    }
  private:
    ThreadPool *m_pool;
    std::list<T *> m_items;
    uint32_t m_processing;
  };
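
  /* Usage sketch (hypothetical subclass).  Note that, unlike WorkQueue and
   * WorkQueueVal, PointerWQ does not add itself to the pool in its
   * constructor; the subclass must call register_work_queue(), as
   * ContextWQ later in this header does.
   *
   *   struct EventWQ : public ThreadPool::PointerWQ<Event> {
   *     EventWQ(ThreadPool *tp)
   *       : PointerWQ<Event>("EventWQ", ceph::make_timespan(60),
   *                          ceph::timespan::zero(), tp) {
   *       register_work_queue();
   *     }
   *     void process(Event *e) override { e->fire(); }
   *   };
   */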
protected:
  std::vector<WorkQueue_*> work_queues;
  int next_work_queue = 0;


  // threads
  struct WorkThread : public Thread {
    ThreadPool *pool;
    // cppcheck-suppress noExplicitConstructor
    WorkThread(ThreadPool *p) : pool(p) {}
    void *entry() override {
      pool->worker(this);
      return 0;
    }
  };

  std::set<WorkThread*> _threads;
  std::list<WorkThread*> _old_threads;  ///< need to be joined
  int processing;

  void start_threads();
  void join_old_threads();
  virtual void worker(WorkThread *wt);

public:
  ThreadPool(CephContext *cct_, std::string nm, std::string tn, int n, const char *option = NULL);
  ~ThreadPool() override;

  /// return number of threads currently running
  int get_num_threads() {
    std::lock_guard l(_lock);
    return _num_threads;
  }

  /// assign a work queue to this thread pool
  void add_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    work_queues.push_back(wq);
  }
  /// remove a work queue from this thread pool
  void remove_work_queue(WorkQueue_* wq) {
    std::lock_guard l(_lock);
    unsigned i = 0;
    while (work_queues[i] != wq)
      i++;
    for (i++; i < work_queues.size(); i++)
      work_queues[i-1] = work_queues[i];
    ceph_assert(i == work_queues.size());
    work_queues.resize(i-1);
  }

  /// take thread pool lock
  void lock() {
    _lock.lock();
  }
  /// release thread pool lock
  void unlock() {
    _lock.unlock();
  }

  /// wait for a kick on this thread pool
  void wait(ceph::condition_variable &c) {
    std::unique_lock l(_lock, std::adopt_lock);
    c.wait(l);
  }

  /// wake up a waiter (with lock already held)
  void _wake() {
    _cond.notify_all();
  }
  /// wake up a waiter (without lock held)
  void wake() {
    std::lock_guard l(_lock);
    _cond.notify_all();
  }
  void _wait() {
    std::unique_lock l(_lock, std::adopt_lock);
    _cond.wait(l);
  }

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop(bool clear_after=true);
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /** @brief Wait until work completes.
   * If the parameter is NULL, blocks until all threads are idle.
   * If it is not NULL, blocks until the given work queue does not have
   * any items left to process. */
  void drain(WorkQueue_* wq = 0);
};
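
/* Typical lifecycle, sketched (illustrative only; JobWQ is the
 * hypothetical subclass sketched above and the pool name, thread name and
 * thread count are placeholders):
 *
 *   ThreadPool tp(cct, "ThreadPool::example", "tp_example", 4);
 *   JobWQ wq(&tp);     // registers itself with the pool on construction
 *   tp.start();        // spawn the worker threads
 *   wq.queue(job);     // submit work; a worker thread picks it up
 *   wq.drain();        // wait until this queue has no work in flight
 *   tp.stop();         // stop the workers (clears remaining items by default)
 */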

class GenContextWQ :
  public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
  std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::WorkQueueVal<
        GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}

  void _enqueue(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_back(c);
  }
  void _enqueue_front(GenContext<ThreadPool::TPHandle&> *c) override {
    _queue.push_front(c);
  }
  bool _empty() override {
    return _queue.empty();
  }
  GenContext<ThreadPool::TPHandle&> *_dequeue() override {
    ceph_assert(!_queue.empty());
    GenContext<ThreadPool::TPHandle&> *c = _queue.front();
    _queue.pop_front();
    return c;
  }
  void _process(GenContext<ThreadPool::TPHandle&> *c,
                ThreadPool::TPHandle &tp) override {
    c->complete(tp);
  }
};

class C_QueueInWQ : public Context {
  GenContextWQ *wq;
  GenContext<ThreadPool::TPHandle&> *c;
public:
  C_QueueInWQ(GenContextWQ *wq, GenContext<ThreadPool::TPHandle &> *c)
    : wq(wq), c(c) {}
  void finish(int) override {
    wq->queue(c);
  }
};

/// Work queue that asynchronously completes contexts (executes callbacks).
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
    this->register_work_queue();
  }

  void queue(Context *ctx, int result = 0) {
    if (result != 0) {
      std::lock_guard locker(m_lock);
      m_context_results[ctx] = result;
    }
    ThreadPool::PointerWQ<Context>::queue(ctx);
  }
protected:
  void _clear() override {
    ThreadPool::PointerWQ<Context>::_clear();

    std::lock_guard locker(m_lock);
    m_context_results.clear();
  }

  void process(Context *ctx) override {
    int result = 0;
    {
      std::lock_guard locker(m_lock);
      ceph::unordered_map<Context *, int>::iterator it =
        m_context_results.find(ctx);
      if (it != m_context_results.end()) {
        result = it->second;
        m_context_results.erase(it);
      }
    }
    ctx->complete(result);
  }
private:
  ceph::mutex m_lock = ceph::make_mutex("ContextWQ::m_lock");
  ceph::unordered_map<Context*, int> m_context_results;
};
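
/* Usage sketch (illustrative; C_MyCallback is a hypothetical Context
 * subclass): complete callbacks on a pool thread instead of the
 * submitter's thread.  A non-zero result is remembered and passed to the
 * context's complete() when it is processed.
 *
 *   ContextWQ cwq("context_wq", ceph::make_timespan(60), &tp);
 *   cwq.queue(new C_MyCallback(args), -EAGAIN);
 */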

class ShardedThreadPool {

  CephContext *cct;
  std::string name;
  std::string thread_name;
  std::string lockname;
  ceph::mutex shardedpool_lock;
  ceph::condition_variable shardedpool_cond;
  ceph::condition_variable wait_cond;
  uint32_t num_threads;

  std::atomic<bool> stop_threads = { false };
  std::atomic<bool> pause_threads = { false };
  std::atomic<bool> drain_threads = { false };

  uint32_t num_paused;
  uint32_t num_drained;

public:

  class BaseShardedWQ {

  public:
    ceph::timespan timeout_interval, suicide_interval;
    BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
      : timeout_interval(ti), suicide_interval(sti) {}
    virtual ~BaseShardedWQ() {}

    virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb) = 0;
    virtual void return_waiting_threads() = 0;
    virtual void stop_return_waiting_threads() = 0;
    virtual bool is_shard_empty(uint32_t thread_index) = 0;
  };

  template <typename T>
  class ShardedWQ: public BaseShardedWQ {

    ShardedThreadPool* sharded_pool;

  protected:
    virtual void _enqueue(T&&) = 0;
    virtual void _enqueue_front(T&&) = 0;


  public:
    ShardedWQ(ceph::timespan ti,
              ceph::timespan sti, ShardedThreadPool* tp)
      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
      tp->set_wq(this);
    }
    ~ShardedWQ() override {}

    void queue(T&& item) {
      _enqueue(std::move(item));
    }
    void queue_front(T&& item) {
      _enqueue_front(std::move(item));
    }
    void drain() {
      sharded_pool->drain();
    }

  };

private:

  BaseShardedWQ* wq;
  // threads
  struct WorkThreadSharded : public Thread {
    ShardedThreadPool *pool;
    uint32_t thread_index;
    WorkThreadSharded(ShardedThreadPool *p, uint32_t pthread_index): pool(p),
      thread_index(pthread_index) {}
    void *entry() override {
      pool->shardedthreadpool_worker(thread_index);
      return 0;
    }
  };

  std::vector<WorkThreadSharded*> threads_shardedpool;
  void start_threads();
  void shardedthreadpool_worker(uint32_t thread_index);
  void set_wq(BaseShardedWQ* swq) {
    wq = swq;
  }



public:

  ShardedThreadPool(CephContext *cct_, std::string nm, std::string tn, uint32_t pnum_threads);

  ~ShardedThreadPool(){};

  /// start thread pool thread
  void start();
  /// stop thread pool thread
  void stop();
  /// pause thread pool (if it is not already paused)
  void pause();
  /// pause initiation of new work
  void pause_new();
  /// resume work in thread pool. must match each pause() call 1:1 to resume.
  void unpause();
  /// wait for all work to complete
  void drain();

};
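
/* Usage sketch (illustrative): a ShardedThreadPool drives exactly one
 * ShardedWQ, which the ShardedWQ constructor registers via set_wq().
 * MyShardedWQ is a hypothetical subclass that owns the per-shard
 * containers and implements both the enqueue hooks and the BaseShardedWQ
 * scheduling hooks.
 *
 *   ShardedThreadPool stp(cct, "ShardedThreadPool::example", "tp_shard", 8);
 *   MyShardedWQ wq(ceph::make_timespan(30), ceph::make_timespan(300), &stp);
 *   stp.start();
 *   wq.queue(std::move(item));   // routed to a shard by the subclass
 *   wq.drain();                  // wait for all shards to empty
 *   stp.stop();
 */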

#endif

#endif