]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/boost/libs/asio/example/cpp14/executors/fork_join.cpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / libs / asio / example / cpp14 / executors / fork_join.cpp
index 2d9b4f7bd2b9ce603e2773751b06e237588c8567..24a338e8726f7253e8896ab2e9a6f20472a01680 100644 (file)
@@ -1,5 +1,6 @@
-#include <boost/asio/ts/executor.hpp>
-#include <boost/asio/thread_pool.hpp>
+#include <boost/asio/execution.hpp>
+#include <boost/asio/static_thread_pool.hpp>
+#include <algorithm>
 #include <condition_variable>
 #include <memory>
 #include <mutex>
@@ -7,14 +8,13 @@
 #include <thread>
 #include <numeric>
 
-using boost::asio::dispatch;
-using boost::asio::execution_context;
-using boost::asio::thread_pool;
+using boost::asio::static_thread_pool;
+namespace execution = boost::asio::execution;
 
 // A fixed-size thread pool used to implement fork/join semantics. Functions
 // are scheduled using a simple FIFO queue. Implementing work stealing, or
 // using a queue based on atomic operations, are left as tasks for the reader.
-class fork_join_pool : public execution_context
+class fork_join_pool
 {
 public:
   // The constructor starts a thread pool with the specified number of threads.
@@ -22,7 +22,7 @@ public:
   // Additional threads may temporarily be added to the pool if they join a
   // fork_executor.
   explicit fork_join_pool(
-      std::size_t thread_count = std::thread::hardware_concurrency() * 2)
+      std::size_t thread_count = std::max(std::thread::hardware_concurrency(), 1u) * 2)
     : use_count_(1),
       threads_(thread_count)
   {
@@ -32,7 +32,9 @@ public:
       // it is time to shut down, i.e. the use count is zero.
       for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_)
       {
-        dispatch(threads_, [&]
+        execution::execute(
+            threads_.executor(),
+            [this]
             {
               std::unique_lock<std::mutex> lock(mutex_);
               while (use_count_ > 0)
@@ -44,7 +46,7 @@ public:
     catch (...)
     {
       stop_threads();
-      threads_.join();
+      threads_.wait();
       throw;
     }
   }
@@ -53,7 +55,7 @@ public:
   ~fork_join_pool()
   {
     stop_threads();
-    threads_.join();
+    threads_.wait();
   }
 
 private:
@@ -117,7 +119,7 @@ private:
 
   // Dispatch a function, executing it immediately if the queue is already
   // loaded. Otherwise adds the function to the queue and wakes a thread.
-  void do_dispatch(std::shared_ptr<function_base> p,
+  void do_execute(std::shared_ptr<function_base> p,
       const std::shared_ptr<std::size_t>& work_count)
   {
     std::unique_lock<std::mutex> lock(mutex_);
@@ -135,16 +137,6 @@ private:
     }
   }
 
-  // Add a function to the queue and wake a thread.
-  void do_post(std::shared_ptr<function_base> p,
-      const std::shared_ptr<std::size_t>& work_count)
-  {
-    std::lock_guard<std::mutex> lock(mutex_);
-    queue_.push(p);
-    do_work_started(work_count);
-    condition_.notify_one();
-  }
-
   // Ask all threads to shut down.
   void stop_threads()
   {
@@ -158,7 +150,7 @@ private:
   std::queue<std::shared_ptr<function_base>> queue_;
   std::size_t use_count_;
   std::size_t thread_count_;
-  thread_pool threads_;
+  static_thread_pool threads_;
 };
 
 // A class that satisfies the Executor requirements. Every function or piece of
@@ -172,45 +164,16 @@ public:
   {
   }
 
-  fork_join_pool& context() const noexcept
+  fork_join_pool& query(execution::context_t) const noexcept
   {
     return context_;
   }
 
-  void on_work_started() const noexcept
-  {
-    std::lock_guard<std::mutex> lock(context_.mutex_);
-    context_.do_work_started(work_count_);
-  }
-
-  void on_work_finished() const noexcept
-  {
-    std::lock_guard<std::mutex> lock(context_.mutex_);
-    context_.do_work_finished(work_count_);
-  }
-
-  template <class Func, class Alloc>
-  void dispatch(Func&& f, const Alloc& a) const
-  {
-    auto p(std::allocate_shared<function<Func>>(
-          typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
-          std::move(f), work_count_));
-    context_.do_dispatch(p, work_count_);
-  }
-
-  template <class Func, class Alloc>
-  void post(Func f, const Alloc& a) const
-  {
-    auto p(std::allocate_shared<function<Func>>(
-          typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
-          std::move(f), work_count_));
-    context_.do_post(p, work_count_);
-  }
-
-  template <class Func, class Alloc>
-  void defer(Func&& f, const Alloc& a) const
+  template <class Func>
+  void execute(Func f) const
   {
-    post(std::forward<Func>(f), a);
+    auto p(std::make_shared<function<Func>>(std::move(f), work_count_));
+    context_.do_execute(p, work_count_);
   }
 
   friend bool operator==(const fork_executor& a,
@@ -289,8 +252,8 @@ void fork_join_sort(Iterator begin, Iterator end)
     {
       fork_executor fork(pool);
       join_guard join(fork);
-      dispatch(fork, [=]{ fork_join_sort(begin, begin + n / 2); });
-      dispatch(fork, [=]{ fork_join_sort(begin + n / 2, end); });
+      execution::execute(fork, [=]{ fork_join_sort(begin, begin + n / 2); });
+      execution::execute(fork, [=]{ fork_join_sort(begin + n / 2, end); });
     }
     std::inplace_merge(begin, begin + n / 2, end);
   }