]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/common/WorkQueue.h
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / common / WorkQueue.h
index f25286b288c677efaa192e68456a9a5ac94b362f..8e9ee405d06454611ad9ce6e7cc1ec6c3324938d 100644 (file)
@@ -59,14 +59,14 @@ public:
     friend class ThreadPool;
     CephContext *cct;
     ceph::heartbeat_handle_d *hb;
-    ceph::coarse_mono_clock::rep grace;
-    ceph::coarse_mono_clock::rep suicide_grace;
+    ceph::timespan grace;
+    ceph::timespan suicide_grace;
   public:
     TPHandle(
       CephContext *cct,
       ceph::heartbeat_handle_d *hb,
-      time_t grace,
-      time_t suicide_grace)
+      ceph::timespan grace,
+      ceph::timespan suicide_grace)
       : cct(cct), hb(hb), grace(grace), suicide_grace(suicide_grace) {}
     void reset_tp_timeout() override final;
     void suspend_tp_timeout() override final;
@@ -76,8 +76,9 @@ protected:
   /// Basic interface to a work queue used by the worker threads.
   struct WorkQueue_ {
     std::string name;
-    time_t timeout_interval, suicide_interval;
-    WorkQueue_(std::string n, time_t ti, time_t sti)
+    ceph::timespan timeout_interval;
+    ceph::timespan suicide_interval;
+    WorkQueue_(std::string n, ceph::timespan ti, ceph::timespan sti)
       : name(std::move(n)), timeout_interval(ti), suicide_interval(sti)
     { }
     virtual ~WorkQueue_() {}
@@ -110,85 +111,6 @@ protected:
                          const std::set <std::string> &changed) override;
 
 public:
-  /** @brief Work queue that processes several submitted items at once.
-   * The queue will automatically add itself to the thread pool on construction
-   * and remove itself on destruction. */
-  template<class T>
-  class BatchWorkQueue : public WorkQueue_ {
-    ThreadPool *pool;
-
-    virtual bool _enqueue(T *) = 0;
-    virtual void _dequeue(T *) = 0;
-    virtual void _dequeue(std::list<T*> *) = 0;
-    virtual void _process_finish(const std::list<T*> &) {}
-
-    // virtual methods from WorkQueue_ below
-    void *_void_dequeue() override {
-      std::list<T*> *out(new std::list<T*>);
-      _dequeue(out);
-      if (!out->empty()) {
-       return (void *)out;
-      } else {
-       delete out;
-       return 0;
-      }
-    }
-    void _void_process(void *p, TPHandle &handle) override {
-      _process(*((std::list<T*>*)p), handle);
-    }
-    void _void_process_finish(void *p) override {
-      _process_finish(*(std::list<T*>*)p);
-      delete (std::list<T*> *)p;
-    }
-
-  protected:
-    virtual void _process(const std::list<T*> &items, TPHandle &handle) = 0;
-
-  public:
-    BatchWorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
-      : WorkQueue_(std::move(n), ti, sti), pool(p) {
-      pool->add_work_queue(this);
-    }
-    ~BatchWorkQueue() override {
-      pool->remove_work_queue(this);
-    }
-
-    bool queue(T *item) {
-      pool->_lock.lock();
-      bool r = _enqueue(item);
-      pool->_cond.notify_one();
-      pool->_lock.unlock();
-      return r;
-    }
-    void dequeue(T *item) {
-      pool->_lock.lock();
-      _dequeue(item);
-      pool->_lock.unlock();
-    }
-    void clear() {
-      pool->_lock.lock();
-      _clear();
-      pool->_lock.unlock();
-    }
-
-    void lock() {
-      pool->lock();
-    }
-    void unlock() {
-      pool->unlock();
-    }
-    void wake() {
-      pool->wake();
-    }
-    void _wake() {
-      pool->_wake();
-    }
-    void drain() {
-      pool->drain(this);
-    }
-
-  };
-
   /** @brief Templated by-value work queue.
    * Skeleton implementation of a queue that processes items submitted by value.
    * This is useful if the items are single primitive values or very small objects
@@ -243,7 +165,10 @@ public:
     void _clear() override {}
 
   public:
-    WorkQueueVal(std::string n, time_t ti, time_t sti, ThreadPool *p)
+    WorkQueueVal(std::string n,
+                ceph::timespan ti,
+                ceph::timespan sti,
+                ThreadPool *p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -306,7 +231,9 @@ public:
     virtual void _process(T *t, TPHandle &) = 0;
 
   public:
-    WorkQueue(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    WorkQueue(std::string n,
+             ceph::timespan ti, ceph::timespan sti,
+             ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), pool(p) {
       pool->add_work_queue(this);
     }
@@ -383,7 +310,9 @@ public:
       return _empty();
     }
   protected:
-    PointerWQ(std::string n, time_t ti, time_t sti, ThreadPool* p)
+    PointerWQ(std::string n,
+             ceph::timespan ti, ceph::timespan sti,
+             ThreadPool* p)
       : WorkQueue_(std::move(n), ti, sti), m_pool(p), m_processing(0) {
     }
     void register_work_queue() {
@@ -432,25 +361,16 @@ public:
     }
     void requeue_front(T *item) {
       std::lock_guard pool_locker(m_pool->_lock);
-      requeue_front(pool_locker, item);
-    }
-    void requeue_front(const std::lock_guard<ceph::mutex>&, T *item) {
       _void_process_finish(nullptr);
       m_items.push_front(item);
     }
     void requeue_back(T *item) {
       std::lock_guard pool_locker(m_pool->_lock);
-      requeue_back(pool_locker, item);
-    }
-    void requeue_back(const std::lock_guard<ceph::mutex>&, T *item) {
       _void_process_finish(nullptr);
       m_items.push_back(item);
     }
     void signal() {
       std::lock_guard pool_locker(m_pool->_lock);
-      signal(pool_locker);
-    }
-    void signal(const std::lock_guard<ceph::mutex>&) {
       m_pool->_cond.notify_one();
     }
     ceph::mutex &get_pool_lock() {
@@ -562,7 +482,7 @@ class GenContextWQ :
   public ThreadPool::WorkQueueVal<GenContext<ThreadPool::TPHandle&>*> {
   std::list<GenContext<ThreadPool::TPHandle&>*> _queue;
 public:
-  GenContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
+  GenContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
     : ThreadPool::WorkQueueVal<
       GenContext<ThreadPool::TPHandle&>*>(name, ti, ti*10, tp) {}
   
@@ -602,8 +522,8 @@ public:
 /// @see Finisher
 class ContextWQ : public ThreadPool::PointerWQ<Context> {
 public:
-  ContextWQ(const std::string &name, time_t ti, ThreadPool *tp)
-    : ThreadPool::PointerWQ<Context>(name, ti, 0, tp) {
+  ContextWQ(const std::string &name, ceph::timespan ti, ThreadPool *tp)
+    : ThreadPool::PointerWQ<Context>(name, ti, ceph::timespan::zero(), tp) {
     this->register_work_queue();
   }
 
@@ -663,8 +583,9 @@ public:
   class BaseShardedWQ {
   
   public:
-    time_t timeout_interval, suicide_interval;
-    BaseShardedWQ(time_t ti, time_t sti):timeout_interval(ti), suicide_interval(sti) {}
+    ceph::timespan timeout_interval, suicide_interval;
+    BaseShardedWQ(ceph::timespan ti, ceph::timespan sti)
+      :timeout_interval(ti), suicide_interval(sti) {}
     virtual ~BaseShardedWQ() {}
 
     virtual void _process(uint32_t thread_index, ceph::heartbeat_handle_d *hb ) = 0;
@@ -684,8 +605,9 @@ public:
 
 
   public:
-    ShardedWQ(time_t ti, time_t sti, ShardedThreadPool* tp): BaseShardedWQ(ti, sti), 
-                                                                 sharded_pool(tp) {
+    ShardedWQ(ceph::timespan ti,
+             ceph::timespan sti, ShardedThreadPool* tp)
+      : BaseShardedWQ(ti, sti), sharded_pool(tp) {
       tp->set_wq(this);
     }
     ~ShardedWQ() override {}