]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/include/seastar/core/execution_stage.hh
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / include / seastar / core / execution_stage.hh
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2017 ScyllaDB Ltd.
20 */
21
22 #pragma once
23
#include <seastar/core/future.hh>
#include <seastar/core/chunked_fifo.hh>
#include <seastar/core/function_traits.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/metrics.hh>
#include <seastar/core/scheduling.hh>
#include <seastar/util/reference_wrapper.hh>
#include <seastar/util/concepts.hh>
#include <seastar/util/noncopyable_function.hh>
#include <seastar/util/tuple_utils.hh>
#include <seastar/util/defer.hh>
#include <seastar/util/std-compat.hh>
#include <fmt/format.h>
#include <fmt/ostream.h>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <tuple>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/range/irange.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/container/static_vector.hpp>
42
43 namespace seastar {
44
45 /// \defgroup execution-stages Execution Stages
46 ///
47 /// \brief
48 /// Execution stages provide an infrastructure for processing function calls in
49 /// batches in order to improve instruction cache locality.
50 ///
51 /// When the application logic becomes more and more complex and the length
52 /// of the data processing pipeline grows it may happen that the most
53 /// significant bottleneck are instruction cache misses. The solution for that
54 /// problem may be processing similar operations in batches so that instruction
55 /// cache locality is improved at the cost of potentially higher latencies and
56 /// worse data cache locality.
57 ///
58 /// Execution stages allow batching calls to the specified function object.
59 /// Every time concrete_execution_stage::operator()() is used the function call
60 /// is added to the queue and a future is returned. Once the number of queued
/// calls reaches a certain threshold, the stage is flushed and a task which
/// executes these function calls is scheduled. Execution stages are also
63 /// flushed when the reactor polls for events.
64 ///
65 /// When calling a function that is wrapped inside execution stage it is
66 /// important to remember that the actual function call will happen at some
/// later time and it must be guaranteed that the objects passed by lvalue
68 /// reference are still alive. In order to avoid accidental passing of a
69 /// temporary object by lvalue reference the interface of execution stages
70 /// accepts only lvalue references wrapped in reference_wrapper. It is safe to
71 /// pass rvalue references, they are decayed and the objects are moved. See
72 /// concrete_execution_stage::operator()() for more details.
73
74 /// \addtogroup execution-stages
75 /// @{
76
77 /// \cond internal
78 namespace internal {
79
80 // Execution wraps lreferences in reference_wrapper so that the caller is forced
81 // to use seastar::ref(). Then when the function is actually called the
82 // reference is unwrapped. However, we need to distinguish between functions
83 // which argument is lvalue reference and functions that take
84 // reference_wrapper<> as an argument and not unwrap the latter. To solve this
85 // issue reference_wrapper_for_es type is used for wrappings done automatically
86 // by execution stage.
87 template<typename T>
88 struct reference_wrapper_for_es : reference_wrapper<T> {
89 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
90 : reference_wrapper<T>(std::move(rw)) {}
91 };
92
93 template<typename T>
94 struct wrap_for_es {
95 using type = T;
96 };
97
98 template<typename T>
99 struct wrap_for_es<T&> {
100 using type = reference_wrapper_for_es<T>;
101 };
102
103 template<typename T>
104 struct wrap_for_es<T&&> {
105 using type = T;
106 };
107
108 template<typename T>
109 decltype(auto) unwrap_for_es(T&& object) {
110 return std::forward<T>(object);
111 }
112
113 template<typename T>
114 std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
115 return std::reference_wrapper<T>(ref.get());
116 }
117
118 }
119 /// \endcond
120
/// Base execution stage class
///
/// Holds the state shared by every concrete execution stage: a "queue is empty"
/// flag, the scheduling group, usage statistics, the stage name and a metric
/// group. Derived classes own the actual queue of pending calls and implement
/// do_flush() to execute them.
class execution_stage {
public:
    /// Usage counters of a single execution stage.
    struct stats {
        // NOTE(review): not incremented in this header; presumably maintained
        // by the out-of-line flush() when it schedules a task — confirm.
        uint64_t tasks_scheduled = 0;
        // Incremented by derived do_flush() when it stops early because
        // need_preempt() returned true.
        uint64_t tasks_preempted = 0;
        // Incremented once per call queued by a derived operator().
        uint64_t function_calls_enqueued = 0;
        // Incremented once per queued call actually executed by do_flush().
        uint64_t function_calls_executed = 0;
    };
protected:
    // Cleared by derived classes when a call is queued; recomputed by
    // do_flush() from the queue state. Read by poll().
    bool _empty = true;
    // Presumably tracks whether a flush task is already pending — it is only
    // used by the out-of-line flush(), not visible here; confirm.
    bool _flush_scheduled = false;
    scheduling_group _sg;
    stats _stats;
    sstring _name;
    metrics::metric_group _metric_group;
protected:
    /// Executes queued calls. Implemented by derived classes; must not throw.
    virtual void do_flush() noexcept = 0;
public:
    /// \param name name of the stage
    /// \param sg scheduling group the stage is associated with (a
    ///        default-constructed scheduling_group if omitted)
    explicit execution_stage(const sstring& name, scheduling_group sg = {});
    virtual ~execution_stage();

    execution_stage(const execution_stage&) = delete;

    /// Move constructor
    ///
    /// \warning It is illegal to move execution_stage after any operation has
    /// been pushed to it. The move constructor was originally kept because
    /// C++14 does not guarantee the return value optimisation needed by
    /// make_execution_stage(). It is still used when
    /// inheriting_concrete_execution_stage emplaces a freshly created
    /// per-group stage into a std::optional, which moves from a temporary.
    execution_stage(execution_stage&&);

    /// Returns execution stage name
    const sstring& name() const noexcept { return _name; }

    /// Returns execution stage usage statistics
    const stats& get_stats() const noexcept { return _stats; }

    /// Flushes execution stage
    ///
    /// Ensures that a task which would execute all queued operations is
    /// scheduled. Does not schedule a new task if there is one already pending
    /// or the queue is empty.
    ///
    /// \return true if a new task has been scheduled
    bool flush() noexcept;

    /// Checks whether there are pending operations.
    ///
    /// \return true if there is at least one queued operation
    bool poll() const noexcept {
        return !_empty;
    }
};
175
176 /// \cond internal
177 namespace internal {
178
179 class execution_stage_manager {
180 std::vector<execution_stage*> _execution_stages;
181 std::unordered_map<sstring, execution_stage*> _stages_by_name;
182 private:
183 execution_stage_manager() = default;
184 execution_stage_manager(const execution_stage_manager&) = delete;
185 execution_stage_manager(execution_stage_manager&&) = delete;
186 public:
187 void register_execution_stage(execution_stage& stage);
188 void unregister_execution_stage(execution_stage& stage) noexcept;
189 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
190 execution_stage* get_stage(const sstring& name);
191 bool flush() noexcept;
192 bool poll() const noexcept;
193 public:
194 static execution_stage_manager& get() noexcept;
195 };
196
197 }
198 /// \endcond
199
200 /// \brief Concrete execution stage class
201 ///
202 /// \note The recommended way of creating execution stages is to use
203 /// make_execution_stage().
204 ///
205 /// \tparam ReturnType return type of the function object
206 /// \tparam Args argument pack containing arguments to the function object, needs
207 /// to have move constructor that doesn't throw
208 template<typename ReturnType, typename... Args>
209 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
210 class concrete_execution_stage final : public execution_stage {
211 using args_tuple = std::tuple<Args...>;
212 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
213 "Function arguments need to be nothrow move constructible");
214
215 static constexpr size_t flush_threshold = 128;
216 static constexpr size_t max_queue_length = 1024;
217
218 using return_type = futurize_t<ReturnType>;
219 using promise_type = typename return_type::promise_type;
220 using input_type = typename tuple_map_types<internal::wrap_for_es, args_tuple>::type;
221
222 struct work_item {
223 input_type _in;
224 promise_type _ready;
225
226 work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
227
228 work_item(work_item&& other) = delete;
229 work_item(const work_item&) = delete;
230 work_item(work_item&) = delete;
231 };
232 chunked_fifo<work_item, flush_threshold> _queue;
233
234 noncopyable_function<ReturnType (Args...)> _function;
235 private:
236 auto unwrap(input_type&& in) {
237 return tuple_map(std::move(in), [] (auto&& obj) {
238 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
239 });
240 }
241
242 virtual void do_flush() noexcept override {
243 while (!_queue.empty()) {
244 auto& wi = _queue.front();
245 auto wi_in = std::move(wi._in);
246 auto wi_ready = std::move(wi._ready);
247 _queue.pop_front();
248 futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
249 _stats.function_calls_executed++;
250
251 if (need_preempt()) {
252 _stats.tasks_preempted++;
253 break;
254 }
255 }
256 _empty = _queue.empty();
257 }
258 public:
259 explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
260 : execution_stage(name, sg)
261 , _function(std::move(f))
262 {
263 _queue.reserve(flush_threshold);
264 }
265 explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
266 : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
267 }
268
269 /// Enqueues a call to the stage's function
270 ///
271 /// Adds a function call to the queue. Objects passed by value are moved,
272 /// rvalue references are decayed and the objects are moved, lvalue
273 /// references need to be explicitly wrapped using seastar::ref().
274 ///
275 /// Usage example:
276 /// ```
277 /// void do_something(int&, int, std::vector<int>&&);
278 /// thread_local auto stage = seastar::make_execution_stage("execution-stage", do_something);
279 ///
280 /// int global_value;
281 ///
282 /// future<> func(std::vector<int> vec) {
283 /// //return stage(global_value, 42, std::move(vec)); // fail: use seastar::ref to pass references
284 /// return stage(seastar::ref(global_value), 42, std::move(vec)); // ok
285 /// }
286 /// ```
287 ///
288 /// \param args arguments passed to the stage's function
289 /// \return future containing the result of the call to the stage's function
290 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
291 if (_queue.size() >= max_queue_length) {
292 do_flush();
293 }
294 _queue.emplace_back(std::move(args)...);
295 _empty = false;
296 _stats.function_calls_enqueued++;
297 auto f = _queue.back()._ready.get_future();
298 flush();
299 return f;
300 }
301 };
302
303 /// \brief Base class for execution stages with support for automatic \ref scheduling_group inheritance
304 class inheriting_execution_stage {
305 public:
306 struct per_scheduling_group_stats {
307 scheduling_group sg;
308 execution_stage::stats stats;
309 };
310 using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
311 };
312
313 /// \brief Concrete execution stage class, with support for automatic \ref scheduling_group inheritance
314 ///
315 /// A variation of \ref concrete_execution_stage that inherits the \ref scheduling_group
316 /// from the caller. Each call (of `operator()`) can be in its own scheduling group.
317 ///
318 /// \tparam ReturnType return type of the function object
319 /// \tparam Args argument pack containing arguments to the function object, needs
320 /// to have move constructor that doesn't throw
321 template<typename ReturnType, typename... Args>
322 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
323 class inheriting_concrete_execution_stage final : public inheriting_execution_stage {
324 using return_type = futurize_t<ReturnType>;
325 using args_tuple = std::tuple<Args...>;
326 using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
327
328 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
329 "Function arguments need to be nothrow move constructible");
330
331 sstring _name;
332 noncopyable_function<ReturnType (Args...)> _function;
333 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
334 private:
335 per_group_stage_type make_stage_for_group(scheduling_group sg) {
336 // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
337 // that selects the noncopyable_function copy constructor. Use a lambda instead.
338 auto wrapped_function = [&_function = _function] (Args... args) {
339 return _function(std::forward<Args>(args)...);
340 };
341 auto name = fmt::format("{}.{}", _name, sg.name());
342 return per_group_stage_type(name, sg, wrapped_function);
343 }
344 public:
345 /// Construct an inheriting concrete execution stage.
346 ///
347 /// \param name A name for the execution stage; must be unique
348 /// \param f Function to be called in response to operator(). The function
349 /// call will be deferred and batched with similar calls to increase
350 /// instruction cache hit rate.
351 inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
352 : _name(std::move(name)),_function(std::move(f)) {
353 }
354
355 /// Enqueues a call to the stage's function
356 ///
357 /// Adds a function call to the queue. Objects passed by value are moved,
358 /// rvalue references are decayed and the objects are moved, lvalue
359 /// references need to be explicitly wrapped using seastar::ref().
360 ///
361 /// The caller's \ref scheduling_group will be preserved across the call.
362 ///
363 /// Usage example:
364 /// ```
365 /// void do_something(int);
366 /// thread_local auto stage = seastar::inheriting_concrete_execution_stage<int>("execution-stage", do_something);
367 ///
368 /// future<> func(int x) {
369 /// return stage(x);
370 /// }
371 /// ```
372 ///
373 /// \param args arguments passed to the stage's function
374 /// \return future containing the result of the call to the stage's function
375 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
376 auto sg = current_scheduling_group();
377 auto sg_id = internal::scheduling_group_index(sg);
378 auto& slot = _stage_for_group[sg_id];
379 if (!slot) {
380 slot.emplace(make_stage_for_group(sg));
381 }
382 return (*slot)(std::move(args)...);
383 }
384
385 /// Returns summary of individual execution stage usage statistics
386 ///
387 /// \returns a vector of the stats of the individual per-scheduling group
388 /// executation stages. Each element in the vector is a pair composed of
389 /// the scheduling group and the stats for the respective execution
390 /// stage. Scheduling groups that have had no respective calls enqueued
391 /// yet are omitted.
392 inheriting_execution_stage::stats get_stats() const noexcept {
393 inheriting_execution_stage::stats summary;
394 for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
395 auto sg = internal::scheduling_group_from_index(sg_id);
396 if (_stage_for_group[sg_id]) {
397 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
398 }
399 }
400 return summary;
401 }
402 };
403
404
405 /// \cond internal
406 namespace internal {
407
408 template <typename Ret, typename ArgsTuple>
409 struct concrete_execution_stage_helper;
410
411 template <typename Ret, typename... Args>
412 struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
413 using type = concrete_execution_stage<Ret, Args...>;
414 };
415
416 }
417 /// \endcond
418
419 /// Creates a new execution stage
420 ///
421 /// Wraps given function object in a concrete_execution_stage. All arguments
422 /// of the function object are required to have move constructors that do not
423 /// throw. Function object may return a future or an immediate object or void.
424 ///
425 /// Moving execution stages is discouraged and illegal after first function
426 /// call is enqueued.
427 ///
428 /// Usage example:
429 /// ```
430 /// double do_something(int);
431 /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
432 ///
433 /// future<double> func1(int val) {
434 /// return stage1(val);
435 /// }
436 ///
437 /// future<double> do_some_io(int);
438 /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
439 ///
440 /// future<double> func2(int val) {
441 /// return stage2(val);
442 /// }
443 /// ```
444 ///
445 /// \param name unique name of the execution stage
446 /// \param sg scheduling group to run under
447 /// \param fn function to be executed by the stage
448 /// \return concrete_execution_stage
449 ///
450 template<typename Function>
451 auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
452 using traits = function_traits<Function>;
453 using ret_type = typename traits::return_type;
454 using args_as_tuple = typename traits::args_as_tuple;
455 using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
456 return concrete_execution_stage(name, sg, std::forward<Function>(fn));
457 }
458
459 /// Creates a new execution stage (variant taking \ref scheduling_group)
460 ///
461 /// Wraps given function object in a concrete_execution_stage. All arguments
462 /// of the function object are required to have move constructors that do not
463 /// throw. Function object may return a future or an immediate object or void.
464 ///
465 /// Moving execution stages is discouraged and illegal after first function
466 /// call is enqueued.
467 ///
468 /// Usage example:
469 /// ```
470 /// double do_something(int);
471 /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
472 ///
473 /// future<double> func1(int val) {
474 /// return stage1(val);
475 /// }
476 ///
477 /// future<double> do_some_io(int);
478 /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
479 ///
480 /// future<double> func2(int val) {
481 /// return stage2(val);
482 /// }
483 /// ```
484 ///
485 /// \param name unique name of the execution stage (variant not taking \ref scheduling_group)
486 /// \param fn function to be executed by the stage
487 /// \return concrete_execution_stage
488 ///
489 template<typename Function>
490 auto make_execution_stage(const sstring& name, Function&& fn) {
491 return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
492 }
493
494 /// Creates a new execution stage from a member function
495 ///
496 /// Wraps a pointer to member function in a concrete_execution_stage. When
497 /// a function call is pushed to the stage the first argument should be a
498 /// pointer to the object the function is a member of.
499 ///
500 /// Usage example:
501 /// ```
502 /// struct foo {
503 /// void do_something(int);
504 /// };
505 ///
506 /// thread_local auto stage = seastar::make_execution_stage("execution-stage", &foo::do_something);
507 ///
508 /// future<> func(foo& obj, int val) {
509 /// return stage(&obj, val);
510 /// }
511 /// ```
512 ///
513 /// \see make_execution_stage(const sstring&, Function&&)
514 /// \param name unique name of the execution stage
515 /// \param fn member function to be executed by the stage
516 /// \return concrete_execution_stage
517 template<typename Ret, typename Object, typename... Args>
518 concrete_execution_stage<Ret, Object*, Args...>
519 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
520 return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
521 }
522
523 template<typename Ret, typename Object, typename... Args>
524 concrete_execution_stage<Ret, const Object*, Args...>
525 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
526 return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
527 }
528
529 template<typename Ret, typename Object, typename... Args>
530 concrete_execution_stage<Ret, Object*, Args...>
531 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
532 return make_execution_stage(name, scheduling_group(), fn);
533 }
534
535 template<typename Ret, typename Object, typename... Args>
536 concrete_execution_stage<Ret, const Object*, Args...>
537 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
538 return make_execution_stage(name, scheduling_group(), fn);
539 }
540
541 /// @}
542
543 }