]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/libs/asio/example/cpp11/executors/pipeline.cpp
import quincy beta 17.1.0
[ceph.git] / ceph / src / boost / libs / asio / example / cpp11 / executors / pipeline.cpp
1 #include <boost/asio/associated_executor.hpp>
2 #include <boost/asio/bind_executor.hpp>
3 #include <boost/asio/execution_context.hpp>
4 #include <boost/asio/post.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <boost/asio/use_future.hpp>
7 #include <condition_variable>
8 #include <future>
9 #include <memory>
10 #include <mutex>
11 #include <queue>
12 #include <thread>
13 #include <vector>
14 #include <cctype>
15
16 using boost::asio::execution_context;
17 using boost::asio::executor_binder;
18 using boost::asio::get_associated_executor;
19 using boost::asio::post;
20 using boost::asio::system_executor;
21 using boost::asio::use_future;
22 using boost::asio::use_service;
23 namespace execution = boost::asio::execution;
24
25 // An executor that launches a new thread for each function submitted to it.
26 // This class satisfies the executor requirements.
27 class thread_executor
28 {
29 private:
30 // Service to track all threads started through a thread_executor.
31 class thread_bag : public execution_context::service
32 {
33 public:
34 typedef thread_bag key_type;
35
36 explicit thread_bag(execution_context& ctx)
37 : execution_context::service(ctx)
38 {
39 }
40
41 void add_thread(std::thread&& t)
42 {
43 std::unique_lock<std::mutex> lock(mutex_);
44 threads_.push_back(std::move(t));
45 }
46
47 private:
48 virtual void shutdown()
49 {
50 for (auto& t : threads_)
51 t.join();
52 }
53
54 std::mutex mutex_;
55 std::vector<std::thread> threads_;
56 };
57
58 public:
59 execution_context& query(execution::context_t) const
60 {
61 return boost::asio::query(system_executor(), execution::context);
62 }
63
64 execution::blocking_t query(execution::blocking_t) const
65 {
66 return execution::blocking.never;
67 }
68
69 thread_executor require(execution::blocking_t::never_t) const
70 {
71 return *this;
72 }
73
74 template <class Func>
75 void execute(Func f) const
76 {
77 thread_bag& bag = use_service<thread_bag>(query(execution::context));
78 bag.add_thread(std::thread(std::move(f)));
79 }
80
81 friend bool operator==(const thread_executor&,
82 const thread_executor&) noexcept
83 {
84 return true;
85 }
86
87 friend bool operator!=(const thread_executor&,
88 const thread_executor&) noexcept
89 {
90 return false;
91 }
92 };
93
// State common to every thread-safe queue, independent of the element type:
// the mutex guarding the queue, the condition variable consumers block on,
// and the flag recording that the producer has finished.
class queue_impl_base
{
  template <class> friend class queue_front;
  template <class> friend class queue_back;
  std::mutex mutex_;
  std::condition_variable condition_;
  bool stop_ = false;
};

// Element storage of a thread-safe queue. One instance is shared, through a
// std::shared_ptr, between the queue_front and queue_back handles.
template <class T>
class queue_impl : public queue_impl_base
{
  template <class> friend class queue_front;
  template <class> friend class queue_back;
  std::queue<T> queue_;
};

// The front (producer) end of a queue between consecutive pipeline stages.
template <class T>
class queue_front
{
public:
  typedef T value_type;

  explicit queue_front(std::shared_ptr<queue_impl<T>> impl)
    : impl_(std::move(impl))
  {
  }

  // Append a value to the queue and wake one waiting consumer.
  void push(T t)
  {
    std::lock_guard<std::mutex> lock(impl_->mutex_);
    impl_->queue_.push(std::move(t));
    impl_->condition_.notify_one();
  }

  // Mark the queue as finished. Values already queued can still be popped;
  // once the queue drains, pop() returns false.
  void stop()
  {
    std::lock_guard<std::mutex> lock(impl_->mutex_);
    impl_->stop_ = true;
    // Every blocked consumer has to observe the flag; waking only one would
    // leave any others waiting forever.
    impl_->condition_.notify_all();
  }

private:
  std::shared_ptr<queue_impl<T>> impl_;
};

// The back (consumer) end of a queue between consecutive pipeline stages.
template <class T>
class queue_back
{
public:
  typedef T value_type;

  explicit queue_back(std::shared_ptr<queue_impl<T>> impl)
    : impl_(std::move(impl))
  {
  }

  // Block until a value is available or the queue has been stopped.
  // Returns true and moves the oldest value into t when one is available.
  // Returns false, leaving t untouched, when the queue is stopped and empty.
  bool pop(T& t)
  {
    std::unique_lock<std::mutex> lock(impl_->mutex_);
    while (impl_->queue_.empty() && !impl_->stop_)
      impl_->condition_.wait(lock);
    if (impl_->queue_.empty())
      return false;
    // The element is discarded immediately afterwards, so move it out rather
    // than copy it; this also allows move-only element types.
    t = std::move(impl_->queue_.front());
    impl_->queue_.pop();
    return true;
  }

private:
  std::shared_ptr<queue_impl<T>> impl_;
};
173
174 // Launch the last stage in a pipeline.
175 template <class T, class F>
176 std::future<void> pipeline(queue_back<T> in, F f)
177 {
178 // Get the function's associated executor, defaulting to thread_executor.
179 auto ex = get_associated_executor(f, thread_executor());
180
181 // Run the function, and as we're the last stage return a future so that the
182 // caller can wait for the pipeline to finish.
183 return post(ex, use_future([in, f]() mutable { f(in); }));
184 }
185
186 // Launch an intermediate stage in a pipeline.
187 template <class T, class F, class... Tail>
188 std::future<void> pipeline(queue_back<T> in, F f, Tail... t)
189 {
190 // Determine the output queue type.
191 typedef typename executor_binder<F, thread_executor>::second_argument_type::value_type output_value_type;
192
193 // Create the output queue and its implementation.
194 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
195 queue_front<output_value_type> out(out_impl);
196 queue_back<output_value_type> next_in(out_impl);
197
198 // Get the function's associated executor, defaulting to thread_executor.
199 auto ex = get_associated_executor(f, thread_executor());
200
201 // Run the function.
202 post(ex, [in, out, f]() mutable
203 {
204 f(in, out);
205 out.stop();
206 });
207
208 // Launch the rest of the pipeline.
209 return pipeline(next_in, std::move(t)...);
210 }
211
212 // Launch the first stage in a pipeline.
213 template <class F, class... Tail>
214 std::future<void> pipeline(F f, Tail... t)
215 {
216 // Determine the output queue type.
217 typedef typename executor_binder<F, thread_executor>::argument_type::value_type output_value_type;
218
219 // Create the output queue and its implementation.
220 auto out_impl = std::make_shared<queue_impl<output_value_type>>();
221 queue_front<output_value_type> out(out_impl);
222 queue_back<output_value_type> next_in(out_impl);
223
224 // Get the function's associated executor, defaulting to thread_executor.
225 auto ex = get_associated_executor(f, thread_executor());
226
227 // Run the function.
228 post(ex, [out, f]() mutable
229 {
230 f(out);
231 out.stop();
232 });
233
234 // Launch the rest of the pipeline.
235 return pipeline(next_in, std::move(t)...);
236 }
237
238 //------------------------------------------------------------------------------
239
240 #include <boost/asio/thread_pool.hpp>
241 #include <iostream>
242 #include <string>
243
244 using boost::asio::bind_executor;
245 using boost::asio::thread_pool;
246
247 void reader(queue_front<std::string> out)
248 {
249 std::string line;
250 while (std::getline(std::cin, line))
251 out.push(line);
252 }
253
254 void filter(queue_back<std::string> in, queue_front<std::string> out)
255 {
256 std::string line;
257 while (in.pop(line))
258 if (line.length() > 5)
259 out.push(line);
260 }
261
262 void upper(queue_back<std::string> in, queue_front<std::string> out)
263 {
264 std::string line;
265 while (in.pop(line))
266 {
267 std::string new_line;
268 for (char c : line)
269 new_line.push_back(std::toupper(c));
270 out.push(new_line);
271 }
272 }
273
274 void writer(queue_back<std::string> in)
275 {
276 std::size_t count = 0;
277 std::string line;
278 while (in.pop(line))
279 std::cout << count++ << ": " << line << std::endl;
280 }
281
282 int main()
283 {
284 thread_pool pool(1);
285
286 auto f = pipeline(reader, filter, bind_executor(pool, upper), writer);
287 f.wait();
288 }