1 #include <boost/asio/associated_executor.hpp>
2 #include <boost/asio/bind_executor.hpp>
3 #include <boost/asio/execution_context.hpp>
4 #include <boost/asio/post.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <boost/asio/use_future.hpp>
7 #include <condition_variable>
16 using boost::asio::execution_context
;
17 using boost::asio::executor_binder
;
18 using boost::asio::get_associated_executor
;
19 using boost::asio::post
;
20 using boost::asio::system_executor
;
21 using boost::asio::use_future
;
22 using boost::asio::use_service
;
23 namespace execution
= boost::asio::execution
;
25 // An executor that launches a new thread for each function submitted to it.
26 // This class satisfies the executor requirements.
30 // Service to track all threads started through a thread_executor.
31 class thread_bag
: public execution_context::service
34 typedef thread_bag key_type
;
36 explicit thread_bag(execution_context
& ctx
)
37 : execution_context::service(ctx
)
41 void add_thread(std::thread
&& t
)
43 std::unique_lock
<std::mutex
> lock(mutex_
);
44 threads_
.push_back(std::move(t
));
48 virtual void shutdown()
50 for (auto& t
: threads_
)
55 std::vector
<std::thread
> threads_
;
59 execution_context
& query(execution::context_t
) const
61 return boost::asio::query(system_executor(), execution::context
);
64 execution::blocking_t
query(execution::blocking_t
) const
66 return execution::blocking
.never
;
69 thread_executor
require(execution::blocking_t::never_t
) const
75 void execute(Func f
) const
77 thread_bag
& bag
= use_service
<thread_bag
>(query(execution::context
));
78 bag
.add_thread(std::thread(std::move(f
)));
81 friend bool operator==(const thread_executor
&,
82 const thread_executor
&) noexcept
87 friend bool operator!=(const thread_executor
&,
88 const thread_executor
&) noexcept
94 // Base class for all thread-safe queue implementations.
97 template <class> friend class queue_front
;
98 template <class> friend class queue_back
;
100 std::condition_variable condition_
;
104 // Underlying implementation of a thread-safe queue, shared between the
105 // queue_front and queue_back classes.
107 class queue_impl
: public queue_impl_base
109 template <class> friend class queue_front
;
110 template <class> friend class queue_back
;
111 std::queue
<T
> queue_
;
114 // The front end of a queue between consecutive pipeline stages.
119 typedef T value_type
;
121 explicit queue_front(std::shared_ptr
<queue_impl
<T
>> impl
)
128 std::unique_lock
<std::mutex
> lock(impl_
->mutex_
);
129 impl_
->queue_
.push(std::move(t
));
130 impl_
->condition_
.notify_one();
135 std::unique_lock
<std::mutex
> lock(impl_
->mutex_
);
137 impl_
->condition_
.notify_one();
141 std::shared_ptr
<queue_impl
<T
>> impl_
;
144 // The back end of a queue between consecutive pipeline stages.
149 typedef T value_type
;
151 explicit queue_back(std::shared_ptr
<queue_impl
<T
>> impl
)
158 std::unique_lock
<std::mutex
> lock(impl_
->mutex_
);
159 while (impl_
->queue_
.empty() && !impl_
->stop_
)
160 impl_
->condition_
.wait(lock
);
161 if (!impl_
->queue_
.empty())
163 t
= impl_
->queue_
.front();
171 std::shared_ptr
<queue_impl
<T
>> impl_
;
174 // Launch the last stage in a pipeline.
175 template <class T
, class F
>
176 std::future
<void> pipeline(queue_back
<T
> in
, F f
)
178 // Get the function's associated executor, defaulting to thread_executor.
179 auto ex
= get_associated_executor(f
, thread_executor());
181 // Run the function, and as we're the last stage return a future so that the
182 // caller can wait for the pipeline to finish.
183 return post(ex
, use_future([in
, f
]() mutable { f(in
); }));
186 // Launch an intermediate stage in a pipeline.
187 template <class T
, class F
, class... Tail
>
188 std::future
<void> pipeline(queue_back
<T
> in
, F f
, Tail
... t
)
190 // Determine the output queue type.
191 typedef typename executor_binder
<F
, thread_executor
>::second_argument_type::value_type output_value_type
;
193 // Create the output queue and its implementation.
194 auto out_impl
= std::make_shared
<queue_impl
<output_value_type
>>();
195 queue_front
<output_value_type
> out(out_impl
);
196 queue_back
<output_value_type
> next_in(out_impl
);
198 // Get the function's associated executor, defaulting to thread_executor.
199 auto ex
= get_associated_executor(f
, thread_executor());
202 post(ex
, [in
, out
, f
]() mutable
208 // Launch the rest of the pipeline.
209 return pipeline(next_in
, std::move(t
)...);
212 // Launch the first stage in a pipeline.
213 template <class F
, class... Tail
>
214 std::future
<void> pipeline(F f
, Tail
... t
)
216 // Determine the output queue type.
217 typedef typename executor_binder
<F
, thread_executor
>::argument_type::value_type output_value_type
;
219 // Create the output queue and its implementation.
220 auto out_impl
= std::make_shared
<queue_impl
<output_value_type
>>();
221 queue_front
<output_value_type
> out(out_impl
);
222 queue_back
<output_value_type
> next_in(out_impl
);
224 // Get the function's associated executor, defaulting to thread_executor.
225 auto ex
= get_associated_executor(f
, thread_executor());
228 post(ex
, [out
, f
]() mutable
234 // Launch the rest of the pipeline.
235 return pipeline(next_in
, std::move(t
)...);
238 //------------------------------------------------------------------------------
240 #include <boost/asio/thread_pool.hpp>
244 using boost::asio::bind_executor
;
245 using boost::asio::thread_pool
;
247 void reader(queue_front
<std::string
> out
)
250 while (std::getline(std::cin
, line
))
254 void filter(queue_back
<std::string
> in
, queue_front
<std::string
> out
)
258 if (line
.length() > 5)
262 void upper(queue_back
<std::string
> in
, queue_front
<std::string
> out
)
267 std::string new_line
;
269 new_line
.push_back(std::toupper(c
));
274 void writer(queue_back
<std::string
> in
)
276 std::size_t count
= 0;
279 std::cout
<< count
++ << ": " << line
<< std::endl
;
286 auto f
= pipeline(reader
, filter
, bind_executor(pool
, upper
), writer
);