]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2017 ScyllaDB Ltd. | |
20 | */ | |
21 | ||
22 | #pragma once | |
23 | ||
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/chunked_fifo.hh> | |
26 | #include <seastar/core/function_traits.hh> | |
27 | #include <seastar/core/sstring.hh> | |
28 | #include <seastar/core/metrics.hh> | |
29 | #include <seastar/core/scheduling.hh> | |
30 | #include <seastar/util/reference_wrapper.hh> | |
31 | #include <seastar/util/gcc6-concepts.hh> | |
32 | #include <seastar/util/noncopyable_function.hh> | |
33 | #include <seastar/util/tuple_utils.hh> | |
34 | #include <seastar/util/defer.hh> | |
35 | #include <seastar/util/std-compat.hh> | |
36 | #include <fmt/format.h> | |
9f95a23c | 37 | #include <fmt/ostream.h> |
11fdf7f2 TL |
38 | #include <vector> |
39 | #include <boost/range/irange.hpp> | |
40 | #include <boost/range/adaptor/transformed.hpp> | |
41 | ||
42 | namespace seastar { | |
43 | ||
44 | /// \defgroup execution-stages Execution Stages | |
45 | /// | |
46 | /// \brief | |
47 | /// Execution stages provide an infrastructure for processing function calls in | |
48 | /// batches in order to improve instruction cache locality. | |
49 | /// | |
50 | /// When the application logic becomes more and more complex and the length | |
51 | /// of the data processing pipeline grows it may happen that the most | |
52 | /// significant bottleneck is instruction cache misses. The solution for that | |
53 | /// problem may be processing similar operations in batches so that instruction | |
54 | /// cache locality is improved at the cost of potentially higher latencies and | |
55 | /// worse data cache locality. | |
56 | /// | |
57 | /// Execution stages allow batching calls to the specified function object. | |
58 | /// Every time concrete_execution_stage::operator()() is used the function call | |
59 | /// is added to the queue and a future is returned. Once the number of queued | |
60 | /// calls reaches a certain threshold the stage is flushed and a task which | |
61 | /// would execute these function calls is scheduled. Execution stages are also | |
62 | /// flushed when the reactor polls for events. | |
63 | /// | |
64 | /// When calling a function that is wrapped inside execution stage it is | |
65 | /// important to remember that the actual function call will happen at some | |
66 | /// later time and it has to be guaranteed the objects passed by lvalue | |
67 | /// reference are still alive. In order to avoid accidental passing of a | |
68 | /// temporary object by lvalue reference the interface of execution stages | |
69 | /// accepts only lvalue references wrapped in reference_wrapper. It is safe to | |
70 | /// pass rvalue references, they are decayed and the objects are moved. See | |
71 | /// concrete_execution_stage::operator()() for more details. | |
72 | ||
73 | /// \addtogroup execution-stages | |
74 | /// @{ | |
75 | ||
76 | /// \cond internal | |
77 | namespace internal { | |
78 | ||
79 | // Execution stage wraps lvalue references in reference_wrapper so that the | |
80 | // caller is forced to use seastar::ref(). Then, when the function is actually | |
81 | // called, the reference is unwrapped. However, we need to distinguish between | |
82 | // functions whose argument is an lvalue reference and functions that take | |
83 | // reference_wrapper<> as an argument, and must not unwrap the latter. To solve | |
84 | // this issue the reference_wrapper_for_es type is used for wrappings done | |
85 | // automatically by the execution stage. | |
86 | template<typename T> | |
87 | struct reference_wrapper_for_es : reference_wrapper<T> { | |
88 | reference_wrapper_for_es(reference_wrapper <T> rw) noexcept | |
89 | : reference_wrapper<T>(std::move(rw)) {} | |
90 | }; | |
91 | ||
/// Maps a function parameter type to the type used to carry the argument.
///
/// Primary template: any type that is not a reference is carried unchanged.
template<typename Param>
struct wrap_for_es {
    typedef Param type;
};
96 | ||
97 | template<typename T> | |
98 | struct wrap_for_es<T&> { | |
99 | using type = reference_wrapper_for_es<T>; | |
100 | }; | |
101 | ||
102 | template<typename T> | |
103 | struct wrap_for_es<T&&> { | |
104 | using type = T; | |
105 | }; | |
106 | ||
/// Generic case: the argument was not wrapped by the execution stage, so it
/// is passed through with its value category preserved.
///
/// \param object argument to pass through
/// \return \c object, perfectly forwarded
template<typename Arg>
auto unwrap_for_es(Arg&& object) -> decltype(std::forward<Arg>(object)) {
    return std::forward<Arg>(object);
}
111 | ||
112 | template<typename T> | |
113 | std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) { | |
114 | return std::reference_wrapper<T>(ref.get()); | |
115 | } | |
116 | ||
117 | } | |
118 | /// \endcond | |
119 | ||
120 | /// Base execution stage class | |
121 | class execution_stage { | |
122 | public: | |
123 | struct stats { | |
124 | uint64_t tasks_scheduled = 0; | |
125 | uint64_t tasks_preempted = 0; | |
126 | uint64_t function_calls_enqueued = 0; | |
127 | uint64_t function_calls_executed = 0; | |
128 | }; | |
129 | protected: | |
130 | bool _empty = true; | |
131 | bool _flush_scheduled = false; | |
132 | scheduling_group _sg; | |
133 | stats _stats; | |
134 | sstring _name; | |
135 | metrics::metric_group _metric_group; | |
136 | protected: | |
137 | virtual void do_flush() noexcept = 0; | |
138 | public: | |
139 | explicit execution_stage(const sstring& name, scheduling_group sg = {}); | |
140 | virtual ~execution_stage(); | |
141 | ||
142 | execution_stage(const execution_stage&) = delete; | |
143 | ||
144 | /// Move constructor | |
145 | /// | |
146 | /// \warning It is illegal to move execution_stage after any operation has | |
147 | /// been pushed to it. The only reason why the move constructor is not | |
148 | /// deleted is the fact that C++14 does not guarantee return value | |
149 | /// optimisation which is required by make_execution_stage(). | |
150 | execution_stage(execution_stage&&); | |
151 | ||
152 | /// Returns execution stage name | |
153 | const sstring& name() const noexcept { return _name; } | |
154 | ||
155 | /// Returns execution stage usage statistics | |
156 | const stats& get_stats() const noexcept { return _stats; } | |
157 | ||
158 | /// Flushes execution stage | |
159 | /// | |
160 | /// Ensures that a task which would execute all queued operations is | |
161 | /// scheduled. Does not schedule a new task if there is one already pending | |
162 | /// or the queue is empty. | |
163 | /// | |
164 | /// \return true if a new task has been scheduled | |
165 | bool flush() noexcept; | |
166 | ||
167 | /// Checks whether there are pending operations. | |
168 | /// | |
169 | /// \return true if there is at least one queued operation | |
170 | bool poll() const noexcept { | |
171 | return !_empty; | |
172 | } | |
173 | }; | |
174 | ||
175 | /// \cond internal | |
176 | namespace internal { | |
177 | ||
178 | class execution_stage_manager { | |
179 | std::vector<execution_stage*> _execution_stages; | |
180 | std::unordered_map<sstring, execution_stage*> _stages_by_name; | |
181 | private: | |
182 | execution_stage_manager() = default; | |
183 | execution_stage_manager(const execution_stage_manager&) = delete; | |
184 | execution_stage_manager(execution_stage_manager&&) = delete; | |
185 | public: | |
186 | void register_execution_stage(execution_stage& stage); | |
187 | void unregister_execution_stage(execution_stage& stage) noexcept; | |
188 | void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept; | |
189 | execution_stage* get_stage(const sstring& name); | |
190 | bool flush() noexcept; | |
191 | bool poll() const noexcept; | |
192 | public: | |
193 | static execution_stage_manager& get() noexcept; | |
194 | }; | |
195 | ||
196 | } | |
197 | /// \endcond | |
198 | ||
199 | /// \brief Concrete execution stage class | |
200 | /// | |
201 | /// \note The recommended way of creating execution stages is to use | |
202 | /// make_execution_stage(). | |
203 | /// | |
204 | /// \tparam ReturnType return type of the function object | |
205 | /// \tparam Args argument pack containing arguments to the function object, needs | |
206 | /// to have move constructor that doesn't throw | |
207 | template<typename ReturnType, typename... Args> | |
208 | GCC6_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value) | |
209 | class concrete_execution_stage final : public execution_stage { | |
210 | using args_tuple = std::tuple<Args...>; | |
211 | static_assert(std::is_nothrow_move_constructible<args_tuple>::value, | |
212 | "Function arguments need to be nothrow move constructible"); | |
213 | ||
214 | static constexpr size_t flush_threshold = 128; | |
9f95a23c | 215 | static constexpr size_t max_queue_length = 1024; |
11fdf7f2 TL |
216 | |
217 | using return_type = futurize_t<ReturnType>; | |
218 | using promise_type = typename return_type::promise_type; | |
219 | using input_type = typename tuple_map_types<internal::wrap_for_es, args_tuple>::type; | |
220 | ||
221 | struct work_item { | |
222 | input_type _in; | |
223 | promise_type _ready; | |
224 | ||
225 | work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { } | |
226 | ||
227 | work_item(work_item&& other) = delete; | |
228 | work_item(const work_item&) = delete; | |
229 | work_item(work_item&) = delete; | |
230 | }; | |
231 | chunked_fifo<work_item, flush_threshold> _queue; | |
232 | ||
233 | noncopyable_function<ReturnType (Args...)> _function; | |
234 | private: | |
235 | auto unwrap(input_type&& in) { | |
236 | return tuple_map(std::move(in), [] (auto&& obj) { | |
237 | return internal::unwrap_for_es(std::forward<decltype(obj)>(obj)); | |
238 | }); | |
239 | } | |
240 | ||
241 | virtual void do_flush() noexcept override { | |
242 | while (!_queue.empty()) { | |
243 | auto& wi = _queue.front(); | |
9f95a23c TL |
244 | auto wi_in = std::move(wi._in); |
245 | auto wi_ready = std::move(wi._ready); | |
11fdf7f2 | 246 | _queue.pop_front(); |
9f95a23c | 247 | futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready)); |
11fdf7f2 TL |
248 | _stats.function_calls_executed++; |
249 | ||
250 | if (need_preempt()) { | |
251 | _stats.tasks_preempted++; | |
252 | break; | |
253 | } | |
254 | } | |
255 | _empty = _queue.empty(); | |
256 | } | |
257 | public: | |
258 | explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f) | |
259 | : execution_stage(name, sg) | |
260 | , _function(std::move(f)) | |
261 | { | |
262 | _queue.reserve(flush_threshold); | |
263 | } | |
264 | explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f) | |
265 | : concrete_execution_stage(name, scheduling_group(), std::move(f)) { | |
266 | } | |
267 | ||
268 | /// Enqueues a call to the stage's function | |
269 | /// | |
270 | /// Adds a function call to the queue. Objects passed by value are moved, | |
271 | /// rvalue references are decayed and the objects are moved, lvalue | |
272 | /// references need to be explicitly wrapped using seastar::ref(). | |
273 | /// | |
274 | /// Usage example: | |
275 | /// ``` | |
276 | /// void do_something(int&, int, std::vector<int>&&); | |
277 | /// thread_local auto stage = seastar::make_execution_stage("execution-stage", do_something); | |
278 | /// | |
279 | /// int global_value; | |
280 | /// | |
281 | /// future<> func(std::vector<int> vec) { | |
282 | /// //return stage(global_value, 42, std::move(vec)); // fail: use seastar::ref to pass references | |
283 | /// return stage(seastar::ref(global_value), 42, std::move(vec)); // ok | |
284 | /// } | |
285 | /// ``` | |
286 | /// | |
287 | /// \param args arguments passed to the stage's function | |
288 | /// \return future containing the result of the call to the stage's function | |
289 | return_type operator()(typename internal::wrap_for_es<Args>::type... args) { | |
9f95a23c TL |
290 | if (_queue.size() >= max_queue_length) { |
291 | do_flush(); | |
292 | } | |
11fdf7f2 TL |
293 | _queue.emplace_back(std::move(args)...); |
294 | _empty = false; | |
295 | _stats.function_calls_enqueued++; | |
296 | auto f = _queue.back()._ready.get_future(); | |
297 | flush(); | |
298 | return f; | |
299 | } | |
300 | }; | |
301 | ||
302 | ||
303 | /// \brief Concrete execution stage class, with support for automatic \ref scheduling_group inheritance | |
304 | /// | |
305 | /// A variation of \ref concrete_execution_stage that inherits the \ref scheduling_group | |
306 | /// from the caller. Each call (of `operator()`) can be in its own scheduling group. | |
307 | /// | |
308 | /// \tparam ReturnType return type of the function object | |
309 | /// \tparam Args argument pack containing arguments to the function object, needs | |
310 | /// to have move constructor that doesn't throw | |
311 | template<typename ReturnType, typename... Args> | |
312 | GCC6_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value) | |
313 | class inheriting_concrete_execution_stage final { | |
314 | using return_type = futurize_t<ReturnType>; | |
315 | using args_tuple = std::tuple<Args...>; | |
316 | using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>; | |
317 | ||
318 | static_assert(std::is_nothrow_move_constructible<args_tuple>::value, | |
319 | "Function arguments need to be nothrow move constructible"); | |
320 | ||
321 | sstring _name; | |
322 | noncopyable_function<ReturnType (Args...)> _function; | |
323 | std::vector<compat::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()}; | |
324 | private: | |
325 | per_group_stage_type make_stage_for_group(scheduling_group sg) { | |
326 | // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and | |
327 | // that selects the noncopyable_function copy constructor. Use a lambda instead. | |
328 | auto wrapped_function = [&_function = _function] (Args... args) { | |
329 | return _function(std::forward<Args>(args)...); | |
330 | }; | |
331 | auto name = fmt::format("{}.{}", _name, sg.name()); | |
332 | return per_group_stage_type(name, sg, wrapped_function); | |
333 | } | |
334 | public: | |
335 | /// Construct an inheriting concrete execution stage. | |
336 | /// | |
337 | /// \param name A name for the execution stage; must be unique | |
338 | /// \param f Function to be called in response to operator(). The function | |
339 | /// call will be deferred and batched with similar calls to increase | |
340 | /// instruction cache hit rate. | |
341 | inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f) | |
342 | : _name(std::move(name)),_function(std::move(f)) { | |
343 | } | |
344 | ||
345 | /// Enqueues a call to the stage's function | |
346 | /// | |
347 | /// Adds a function call to the queue. Objects passed by value are moved, | |
348 | /// rvalue references are decayed and the objects are moved, lvalue | |
349 | /// references need to be explicitly wrapped using seastar::ref(). | |
350 | /// | |
351 | /// The caller's \ref scheduling_group will be preserved across the call. | |
352 | /// | |
353 | /// Usage example: | |
354 | /// ``` | |
355 | /// void do_something(int); | |
356 | /// thread_local auto stage = seastar::inheriting_concrete_execution_stage<int>("execution-stage", do_something); | |
357 | /// | |
358 | /// future<> func(int x) { | |
359 | /// return stage(x); | |
360 | /// } | |
361 | /// ``` | |
362 | /// | |
363 | /// \param args arguments passed to the stage's function | |
364 | /// \return future containing the result of the call to the stage's function | |
365 | return_type operator()(typename internal::wrap_for_es<Args>::type... args) { | |
366 | auto sg = current_scheduling_group(); | |
367 | auto sg_id = internal::scheduling_group_index(sg); | |
368 | auto& slot = _stage_for_group[sg_id]; | |
369 | if (!slot) { | |
370 | slot.emplace(make_stage_for_group(sg)); | |
371 | } | |
372 | return (*slot)(std::move(args)...); | |
373 | } | |
374 | }; | |
375 | ||
376 | ||
377 | /// \cond internal | |
378 | namespace internal { | |
379 | ||
380 | template <typename Ret, typename ArgsTuple> | |
381 | struct concrete_execution_stage_helper; | |
382 | ||
383 | template <typename Ret, typename... Args> | |
384 | struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> { | |
385 | using type = concrete_execution_stage<Ret, Args...>; | |
386 | }; | |
387 | ||
388 | } | |
389 | /// \endcond | |
390 | ||
391 | /// Creates a new execution stage | |
392 | /// | |
393 | /// Wraps given function object in a concrete_execution_stage. All arguments | |
394 | /// of the function object are required to have move constructors that do not | |
395 | /// throw. Function object may return a future or an immediate object or void. | |
396 | /// | |
397 | /// Moving execution stages is discouraged and illegal after first function | |
398 | /// call is enqueued. | |
399 | /// | |
400 | /// Usage example: | |
401 | /// ``` | |
402 | /// double do_something(int); | |
403 | /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something); | |
404 | /// | |
405 | /// future<double> func1(int val) { | |
406 | /// return stage1(val); | |
407 | /// } | |
408 | /// | |
409 | /// future<double> do_some_io(int); | |
410 | /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io); | |
411 | /// | |
412 | /// future<double> func2(int val) { | |
413 | /// return stage2(val); | |
414 | /// } | |
415 | /// ``` | |
416 | /// | |
417 | /// \param name unique name of the execution stage | |
418 | /// \param sg scheduling group to run under | |
419 | /// \param fn function to be executed by the stage | |
420 | /// \return concrete_execution_stage | |
421 | /// | |
422 | template<typename Function> | |
423 | auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) { | |
424 | using traits = function_traits<Function>; | |
425 | using ret_type = typename traits::return_type; | |
426 | using args_as_tuple = typename traits::args_as_tuple; | |
427 | using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type; | |
428 | return concrete_execution_stage(name, sg, std::forward<Function>(fn)); | |
429 | } | |
430 | ||
431 | /// Creates a new execution stage (variant not taking \ref scheduling_group) | |
432 | /// | |
433 | /// Wraps given function object in a concrete_execution_stage. All arguments | |
434 | /// of the function object are required to have move constructors that do not | |
435 | /// throw. Function object may return a future or an immediate object or void. | |
436 | /// | |
437 | /// Moving execution stages is discouraged and illegal after first function | |
438 | /// call is enqueued. | |
439 | /// | |
440 | /// Usage example: | |
441 | /// ``` | |
442 | /// double do_something(int); | |
443 | /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something); | |
444 | /// | |
445 | /// future<double> func1(int val) { | |
446 | /// return stage1(val); | |
447 | /// } | |
448 | /// | |
449 | /// future<double> do_some_io(int); | |
450 | /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io); | |
451 | /// | |
452 | /// future<double> func2(int val) { | |
453 | /// return stage2(val); | |
454 | /// } | |
455 | /// ``` | |
456 | /// | |
457 | /// \param name unique name of the execution stage | |
458 | /// \param fn function to be executed by the stage | |
459 | /// \return concrete_execution_stage | |
460 | /// | |
461 | template<typename Function> | |
462 | auto make_execution_stage(const sstring& name, Function&& fn) { | |
463 | return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn)); | |
464 | } | |
465 | ||
466 | /// Creates a new execution stage from a member function | |
467 | /// | |
468 | /// Wraps a pointer to member function in a concrete_execution_stage. When | |
469 | /// a function call is pushed to the stage the first argument should be a | |
470 | /// pointer to the object the function is a member of. | |
471 | /// | |
472 | /// Usage example: | |
473 | /// ``` | |
474 | /// struct foo { | |
475 | /// void do_something(int); | |
476 | /// }; | |
477 | /// | |
478 | /// thread_local auto stage = seastar::make_execution_stage("execution-stage", &foo::do_something); | |
479 | /// | |
480 | /// future<> func(foo& obj, int val) { | |
481 | /// return stage(&obj, val); | |
482 | /// } | |
483 | /// ``` | |
484 | /// | |
485 | /// \see make_execution_stage(const sstring&, Function&&) | |
486 | /// \param name unique name of the execution stage | |
487 | /// \param fn member function to be executed by the stage | |
488 | /// \return concrete_execution_stage | |
489 | template<typename Ret, typename Object, typename... Args> | |
490 | concrete_execution_stage<Ret, Object*, Args...> | |
491 | make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) { | |
492 | return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn)); | |
493 | } | |
494 | ||
495 | template<typename Ret, typename Object, typename... Args> | |
496 | concrete_execution_stage<Ret, const Object*, Args...> | |
497 | make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) { | |
498 | return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn)); | |
499 | } | |
500 | ||
501 | template<typename Ret, typename Object, typename... Args> | |
502 | concrete_execution_stage<Ret, Object*, Args...> | |
503 | make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) { | |
504 | return make_execution_stage(name, scheduling_group(), fn); | |
505 | } | |
506 | ||
507 | template<typename Ret, typename Object, typename... Args> | |
508 | concrete_execution_stage<Ret, const Object*, Args...> | |
509 | make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) { | |
510 | return make_execution_stage(name, scheduling_group(), fn); | |
511 | } | |
512 | ||
513 | /// @} | |
514 | ||
515 | } |