friend class ThreadPool;
CephContext *cct;
ceph::heartbeat_handle_d *hb;
- ceph::coarse_mono_clock::rep grace;
- ceph::coarse_mono_clock::rep suicide_grace;
+ ceph::timespan grace;
+ ceph::timespan suicide_grace;
public:
TPHandle(
CephContext *cct,
ceph::heartbeat_handle_d *hb,
- time_t grace,
- time_t suicide_grace)
+ ceph::timespan grace,
+ ceph::timespan suicide_grace)
: cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
void reset_tp_timeout() override final;
void suspend_tp_timeout() override final;
/// Basic interface to a work queue used by the worker threads.
struct WorkQueue_ {
std::string name;
- time_t timeout_interval, suicide_interval;
- WorkQueue_(std::string n, time_t ti, time_t sti)
+ ceph::timespan timeout_interval;
+ ceph::timespan suicide_interval;
+ WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
: name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
{ }
virtual ~WorkQueue_() {}
const std::set <std::string> &changed) override;
public:
- /** @brief Work queue that processes several submitted items at once.
- * The queue will automatically add itself to the thread pool on construction
- * and remove itself on destruction. */
- template<class T>
- class BatchWorkQueue : public WorkQueue_ {
- ThreadPool *pool;
-
- virtual bool _enqueue(T *) = 0;
- virtual void _dequeue(T *) = 0;
- virtual void _dequeue(std::list<T*> *) = 0;
- virtual void _process_finish(const std::list<T*> &) {}
-
- // virtual methods from WorkQueue_ below
- void *_void_dequeue() override {
- std::list<T*> *out(new std::list<T*>);
- _dequeue(out);
- if (!out->empty()) {
- return (void *)out;
- } else {
- delete out;
- return 0;
- }
- }
- void _void_process(void *p, TPHandle &handle) override {
- _process(*((std::list<T*>*)p), handle);
- }
- void _void_process_finish(void *p) override {
- _process_finish(*(std::list<T*>*)p);
- delete (std::list<T*> *)p;
- }
-
- protected:
- virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
-
- public:
- BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
- : WorkQueue_(std::move(n), ti, sti), pool(p) {
- pool->add_work_queue(this);
- }
- ~BatchWorkQueue() override {
- pool->remove_work_queue(this);
- }
-
- bool queue(T *item) {
- pool->_lock.lock();
- bool r = _enqueue(item);
- pool->_cond.notify_one();
- pool->_lock.unlock();
- return r;
- }
- void dequeue(T *item) {
- pool->_lock.lock();
- _dequeue(item);
- pool->_lock.unlock();
- }
- void clear() {
- pool->_lock.lock();
- _clear();
- pool->_lock.unlock();
- }
-
- void lock() {
- pool->lock();
- }
- void unlock() {
- pool->unlock();
- }
- void wake() {
- pool->wake();
- }
- void _wake() {
- pool->_wake();
- }
- void drain() {
- pool->drain(this);
- }
-
- };
-
/** @brief Templated by-value work queue.
* Skeleton implementation of a queue that processes items submitted by value.
* This is useful if the items are single primitive values or very small objects
void _clear() override {}
public:
- WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+ WorkQueueVal(std::string n,
+ ceph::timespan ti,
+ ceph::timespan sti,
+ ThreadPool *p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
virtual void _process(T *t, TPHandle &) = 0;
public:
- WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
+ WorkQueue(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), pool(p) {
pool->add_work_queue(this);
}
return _empty();
}
protected:
- PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
+ PointerWQ(std::string n,
+ ceph::timespan ti, ceph::timespan sti,
+ ThreadPool* p)
: WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
}
void register_work_queue() {
}
void requeue_front(T *item) {
std::lock_guard pool_locker(m_pool->_lock);
- requeue_front(pool_locker, item);
- }
- void requeue_front(const std::lock_guard<ceph::mutex>&, T *item) {
_void_process_finish(nullptr);
m_items.push_front(item);
}
void requeue_back(T *item) {
std::lock_guard pool_locker(m_pool->_lock);
- requeue_back(pool_locker, item);
- }
- void requeue_back(const std::lock_guard<ceph::mutex>&, T *item) {
_void_process_finish(nullptr);
m_items.push_back(item);
}
void signal() {
std::lock_guard pool_locker(m_pool->_lock);
- signal(pool_locker);
- }
- void signal(const std::lock_guard<ceph::mutex>&) {
m_pool->_cond.notify_one();
}
ceph::mutex &get_pool_lock() {
public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
public:
- GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+ GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
: ThreadPool::WorkQueueVal<
GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
/// @see Finisher
class ContextWQ : public ThreadPool::PointerWQ<Context> {
public:
- ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
- : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
+ ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
this->register_work_queue();
}
class BaseShardedWQ {
public:
- time_t timeout_interval, suicide_interval;
- BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+ ceph::timespan timeout_interval, suicide_interval;
+ BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+ :timeout_interval(ti), suicide_interval(sti) {}
virtual ~BaseShardedWQ() {}
virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
public:
- ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti),
- sharded_pool(tp) {
+ ShardedWQ(ceph::timespan ti,
+ ceph::timespan sti, ShardedThreadPool* tp)
+ : BaseShardedWQ(ti, sti), sharded_pool(tp) {
tp->set_wq(this);
}
~ShardedWQ() override {}