-#include <boost/asio/ts/executor.hpp>
-#include <boost/asio/thread_pool.hpp>
+#include <boost/asio/execution.hpp>
+#include <boost/asio/static_thread_pool.hpp>
+#include <algorithm>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
#include <numeric>
-using boost::asio::dispatch;
-using boost::asio::execution_context;
-using boost::asio::thread_pool;
+using boost::asio::static_thread_pool;
+namespace execution = boost::asio::execution;
// A fixed-size thread pool used to implement fork/join semantics. Functions
// are scheduled using a simple FIFO queue. Implementing work stealing, or
// using a queue based on atomic operations, are left as tasks for the reader.
-class fork_join_pool : public execution_context
+class fork_join_pool
{
public:
// The constructor starts a thread pool with the specified number of threads.
// Additional threads may temporarily be added to the pool if they join a
// fork_executor.
explicit fork_join_pool(
- std::size_t thread_count = std::thread::hardware_concurrency() * 2)
+ std::size_t thread_count = std::max(std::thread::hardware_concurrency(), 1u) * 2)
: use_count_(1),
threads_(thread_count)
{
// it is time to shut down, i.e. the use count is zero.
for (thread_count_ = 0; thread_count_ < thread_count; ++thread_count_)
{
- dispatch(threads_, [&]
+ execution::execute(
+ threads_.executor(),
+ [this]
{
std::unique_lock<std::mutex> lock(mutex_);
while (use_count_ > 0)
catch (...)
{
stop_threads();
- threads_.join();
+ threads_.wait();
throw;
}
}
~fork_join_pool()
{
stop_threads();
- threads_.join();
+ threads_.wait();
}
private:
// Dispatch a function, executing it immediately if the queue is already
// loaded. Otherwise adds the function to the queue and wakes a thread.
- void do_dispatch(std::shared_ptr<function_base> p,
+ void do_execute(std::shared_ptr<function_base> p,
const std::shared_ptr<std::size_t>& work_count)
{
std::unique_lock<std::mutex> lock(mutex_);
}
}
- // Add a function to the queue and wake a thread.
- void do_post(std::shared_ptr<function_base> p,
- const std::shared_ptr<std::size_t>& work_count)
- {
- std::lock_guard<std::mutex> lock(mutex_);
- queue_.push(p);
- do_work_started(work_count);
- condition_.notify_one();
- }
-
// Ask all threads to shut down.
void stop_threads()
{
std::queue<std::shared_ptr<function_base>> queue_;
std::size_t use_count_;
std::size_t thread_count_;
- thread_pool threads_;
+ static_thread_pool threads_;
};
// A class that satisfies the Executor requirements. Every function or piece of
{
}
- fork_join_pool& context() const noexcept
+ fork_join_pool& query(execution::context_t) const noexcept
{
return context_;
}
- void on_work_started() const noexcept
- {
- std::lock_guard<std::mutex> lock(context_.mutex_);
- context_.do_work_started(work_count_);
- }
-
- void on_work_finished() const noexcept
- {
- std::lock_guard<std::mutex> lock(context_.mutex_);
- context_.do_work_finished(work_count_);
- }
-
- template <class Func, class Alloc>
- void dispatch(Func&& f, const Alloc& a) const
- {
- auto p(std::allocate_shared<function<Func>>(
- typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
- std::move(f), work_count_));
- context_.do_dispatch(p, work_count_);
- }
-
- template <class Func, class Alloc>
- void post(Func f, const Alloc& a) const
- {
- auto p(std::allocate_shared<function<Func>>(
- typename std::allocator_traits<Alloc>::template rebind_alloc<char>(a),
- std::move(f), work_count_));
- context_.do_post(p, work_count_);
- }
-
- template <class Func, class Alloc>
- void defer(Func&& f, const Alloc& a) const
+ template <class Func>
+ void execute(Func f) const
{
- post(std::forward<Func>(f), a);
+ auto p(std::make_shared<function<Func>>(std::move(f), work_count_));
+ context_.do_execute(p, work_count_);
}
friend bool operator==(const fork_executor& a,
{
fork_executor fork(pool);
join_guard join(fork);
- dispatch(fork, [=]{ fork_join_sort(begin, begin + n / 2); });
- dispatch(fork, [=]{ fork_join_sort(begin + n / 2, end); });
+ execution::execute(fork, [=]{ fork_join_sort(begin, begin + n / 2); });
+ execution::execute(fork, [=]{ fork_join_sort(begin + n / 2, end); });
}
std::inplace_merge(begin, begin + n / 2, end);
}