]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/include/seastar/core/execution_stage.hh
import 15.2.0 Octopus source
[ceph.git] / ceph / src / seastar / include / seastar / core / execution_stage.hh
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2017 ScyllaDB Ltd.
20 */
21
22#pragma once
23
24#include <seastar/core/future.hh>
25#include <seastar/core/chunked_fifo.hh>
26#include <seastar/core/function_traits.hh>
27#include <seastar/core/sstring.hh>
28#include <seastar/core/metrics.hh>
29#include <seastar/core/scheduling.hh>
30#include <seastar/util/reference_wrapper.hh>
31#include <seastar/util/gcc6-concepts.hh>
32#include <seastar/util/noncopyable_function.hh>
33#include <seastar/util/tuple_utils.hh>
34#include <seastar/util/defer.hh>
35#include <seastar/util/std-compat.hh>
36#include <fmt/format.h>
9f95a23c 37#include <fmt/ostream.h>
11fdf7f2
TL
38#include <vector>
39#include <boost/range/irange.hpp>
40#include <boost/range/adaptor/transformed.hpp>
41
42namespace seastar {
43
44/// \defgroup execution-stages Execution Stages
45///
46/// \brief
47/// Execution stages provide an infrastructure for processing function calls in
48/// batches in order to improve instruction cache locality.
49///
50/// When the application logic becomes more and more complex and the length
51/// of the data processing pipeline grows it may happen that the most
52/// significant bottleneck are instruction cache misses. The solution for that
53/// problem may be processing similar operations in batches so that instruction
54/// cache locality is improved at the cost of potentially higher latencies and
55/// worse data cache locality.
56///
57/// Execution stages allow batching calls to the specified function object.
58/// Every time concrete_execution_stage::operator()() is used the function call
59/// is added to the queue and a future is returned. Once the number of queued
60/// calls reaches certain threshold the stage is flushed and a task is which
61/// would execute these function calls is scheduled. Execution stages are also
62/// flushed when the reactor polls for events.
63///
64/// When calling a function that is wrapped inside execution stage it is
65/// important to remember that the actual function call will happen at some
66/// later time and it has to be guaranteed the objects passed by lvalue
67/// reference are still alive. In order to avoid accidental passing of a
68/// temporary object by lvalue reference the interface of execution stages
69/// accepts only lvalue references wrapped in reference_wrapper. It is safe to
70/// pass rvalue references, they are decayed and the objects are moved. See
71/// concrete_execution_stage::operator()() for more details.
72
73/// \addtogroup execution-stages
74/// @{
75
76/// \cond internal
77namespace internal {
78
79// Execution wraps lreferences in reference_wrapper so that the caller is forced
80// to use seastar::ref(). Then when the function is actually called the
81// reference is unwrapped. However, we need to distinguish between functions
82// which argument is lvalue reference and functions that take
83// reference_wrapper<> as an argument and not unwrap the latter. To solve this
84// issue reference_wrapper_for_es type is used for wrappings done automatically
85// by execution stage.
86template<typename T>
87struct reference_wrapper_for_es : reference_wrapper<T> {
88 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
89 : reference_wrapper<T>(std::move(rw)) {}
90};
91
92template<typename T>
93struct wrap_for_es {
94 using type = T;
95};
96
97template<typename T>
98struct wrap_for_es<T&> {
99 using type = reference_wrapper_for_es<T>;
100};
101
102template<typename T>
103struct wrap_for_es<T&&> {
104 using type = T;
105};
106
107template<typename T>
108decltype(auto) unwrap_for_es(T&& object) {
109 return std::forward<T>(object);
110}
111
112template<typename T>
113std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
114 return std::reference_wrapper<T>(ref.get());
115}
116
117}
118/// \endcond
119
120/// Base execution stage class
121class execution_stage {
122public:
123 struct stats {
124 uint64_t tasks_scheduled = 0;
125 uint64_t tasks_preempted = 0;
126 uint64_t function_calls_enqueued = 0;
127 uint64_t function_calls_executed = 0;
128 };
129protected:
130 bool _empty = true;
131 bool _flush_scheduled = false;
132 scheduling_group _sg;
133 stats _stats;
134 sstring _name;
135 metrics::metric_group _metric_group;
136protected:
137 virtual void do_flush() noexcept = 0;
138public:
139 explicit execution_stage(const sstring& name, scheduling_group sg = {});
140 virtual ~execution_stage();
141
142 execution_stage(const execution_stage&) = delete;
143
144 /// Move constructor
145 ///
146 /// \warning It is illegal to move execution_stage after any operation has
147 /// been pushed to it. The only reason why the move constructor is not
148 /// deleted is the fact that C++14 does not guarantee return value
149 /// optimisation which is required by make_execution_stage().
150 execution_stage(execution_stage&&);
151
152 /// Returns execution stage name
153 const sstring& name() const noexcept { return _name; }
154
155 /// Returns execution stage usage statistics
156 const stats& get_stats() const noexcept { return _stats; }
157
158 /// Flushes execution stage
159 ///
160 /// Ensures that a task which would execute all queued operations is
161 /// scheduled. Does not schedule a new task if there is one already pending
162 /// or the queue is empty.
163 ///
164 /// \return true if a new task has been scheduled
165 bool flush() noexcept;
166
167 /// Checks whether there are pending operations.
168 ///
169 /// \return true if there is at least one queued operation
170 bool poll() const noexcept {
171 return !_empty;
172 }
173};
174
175/// \cond internal
176namespace internal {
177
178class execution_stage_manager {
179 std::vector<execution_stage*> _execution_stages;
180 std::unordered_map<sstring, execution_stage*> _stages_by_name;
181private:
182 execution_stage_manager() = default;
183 execution_stage_manager(const execution_stage_manager&) = delete;
184 execution_stage_manager(execution_stage_manager&&) = delete;
185public:
186 void register_execution_stage(execution_stage& stage);
187 void unregister_execution_stage(execution_stage& stage) noexcept;
188 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
189 execution_stage* get_stage(const sstring& name);
190 bool flush() noexcept;
191 bool poll() const noexcept;
192public:
193 static execution_stage_manager& get() noexcept;
194};
195
196}
197/// \endcond
198
199/// \brief Concrete execution stage class
200///
201/// \note The recommended way of creating execution stages is to use
202/// make_execution_stage().
203///
204/// \tparam ReturnType return type of the function object
205/// \tparam Args argument pack containing arguments to the function object, needs
206/// to have move constructor that doesn't throw
207template<typename ReturnType, typename... Args>
208GCC6_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
209class concrete_execution_stage final : public execution_stage {
210 using args_tuple = std::tuple<Args...>;
211 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
212 "Function arguments need to be nothrow move constructible");
213
214 static constexpr size_t flush_threshold = 128;
9f95a23c 215 static constexpr size_t max_queue_length = 1024;
11fdf7f2
TL
216
217 using return_type = futurize_t<ReturnType>;
218 using promise_type = typename return_type::promise_type;
219 using input_type = typename tuple_map_types<internal::wrap_for_es, args_tuple>::type;
220
221 struct work_item {
222 input_type _in;
223 promise_type _ready;
224
225 work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
226
227 work_item(work_item&& other) = delete;
228 work_item(const work_item&) = delete;
229 work_item(work_item&) = delete;
230 };
231 chunked_fifo<work_item, flush_threshold> _queue;
232
233 noncopyable_function<ReturnType (Args...)> _function;
234private:
235 auto unwrap(input_type&& in) {
236 return tuple_map(std::move(in), [] (auto&& obj) {
237 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
238 });
239 }
240
241 virtual void do_flush() noexcept override {
242 while (!_queue.empty()) {
243 auto& wi = _queue.front();
9f95a23c
TL
244 auto wi_in = std::move(wi._in);
245 auto wi_ready = std::move(wi._ready);
11fdf7f2 246 _queue.pop_front();
9f95a23c 247 futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
11fdf7f2
TL
248 _stats.function_calls_executed++;
249
250 if (need_preempt()) {
251 _stats.tasks_preempted++;
252 break;
253 }
254 }
255 _empty = _queue.empty();
256 }
257public:
258 explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
259 : execution_stage(name, sg)
260 , _function(std::move(f))
261 {
262 _queue.reserve(flush_threshold);
263 }
264 explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
265 : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
266 }
267
268 /// Enqueues a call to the stage's function
269 ///
270 /// Adds a function call to the queue. Objects passed by value are moved,
271 /// rvalue references are decayed and the objects are moved, lvalue
272 /// references need to be explicitly wrapped using seastar::ref().
273 ///
274 /// Usage example:
275 /// ```
276 /// void do_something(int&, int, std::vector<int>&&);
277 /// thread_local auto stage = seastar::make_execution_stage("execution-stage", do_something);
278 ///
279 /// int global_value;
280 ///
281 /// future<> func(std::vector<int> vec) {
282 /// //return stage(global_value, 42, std::move(vec)); // fail: use seastar::ref to pass references
283 /// return stage(seastar::ref(global_value), 42, std::move(vec)); // ok
284 /// }
285 /// ```
286 ///
287 /// \param args arguments passed to the stage's function
288 /// \return future containing the result of the call to the stage's function
289 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
9f95a23c
TL
290 if (_queue.size() >= max_queue_length) {
291 do_flush();
292 }
11fdf7f2
TL
293 _queue.emplace_back(std::move(args)...);
294 _empty = false;
295 _stats.function_calls_enqueued++;
296 auto f = _queue.back()._ready.get_future();
297 flush();
298 return f;
299 }
300};
301
302
303/// \brief Concrete execution stage class, with support for automatic \ref scheduling_group inheritance
304///
305/// A variation of \ref concrete_execution_stage that inherits the \ref scheduling_group
306/// from the caller. Each call (of `operator()`) can be in its own scheduling group.
307///
308/// \tparam ReturnType return type of the function object
309/// \tparam Args argument pack containing arguments to the function object, needs
310/// to have move constructor that doesn't throw
311template<typename ReturnType, typename... Args>
312GCC6_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
313class inheriting_concrete_execution_stage final {
314 using return_type = futurize_t<ReturnType>;
315 using args_tuple = std::tuple<Args...>;
316 using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
317
318 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
319 "Function arguments need to be nothrow move constructible");
320
321 sstring _name;
322 noncopyable_function<ReturnType (Args...)> _function;
323 std::vector<compat::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
324private:
325 per_group_stage_type make_stage_for_group(scheduling_group sg) {
326 // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
327 // that selects the noncopyable_function copy constructor. Use a lambda instead.
328 auto wrapped_function = [&_function = _function] (Args... args) {
329 return _function(std::forward<Args>(args)...);
330 };
331 auto name = fmt::format("{}.{}", _name, sg.name());
332 return per_group_stage_type(name, sg, wrapped_function);
333 }
334public:
335 /// Construct an inheriting concrete execution stage.
336 ///
337 /// \param name A name for the execution stage; must be unique
338 /// \param f Function to be called in response to operator(). The function
339 /// call will be deferred and batched with similar calls to increase
340 /// instruction cache hit rate.
341 inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
342 : _name(std::move(name)),_function(std::move(f)) {
343 }
344
345 /// Enqueues a call to the stage's function
346 ///
347 /// Adds a function call to the queue. Objects passed by value are moved,
348 /// rvalue references are decayed and the objects are moved, lvalue
349 /// references need to be explicitly wrapped using seastar::ref().
350 ///
351 /// The caller's \ref scheduling_group will be preserved across the call.
352 ///
353 /// Usage example:
354 /// ```
355 /// void do_something(int);
356 /// thread_local auto stage = seastar::inheriting_concrete_execution_stage<int>("execution-stage", do_something);
357 ///
358 /// future<> func(int x) {
359 /// return stage(x);
360 /// }
361 /// ```
362 ///
363 /// \param args arguments passed to the stage's function
364 /// \return future containing the result of the call to the stage's function
365 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
366 auto sg = current_scheduling_group();
367 auto sg_id = internal::scheduling_group_index(sg);
368 auto& slot = _stage_for_group[sg_id];
369 if (!slot) {
370 slot.emplace(make_stage_for_group(sg));
371 }
372 return (*slot)(std::move(args)...);
373 }
374};
375
376
377/// \cond internal
378namespace internal {
379
380template <typename Ret, typename ArgsTuple>
381struct concrete_execution_stage_helper;
382
383template <typename Ret, typename... Args>
384struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
385 using type = concrete_execution_stage<Ret, Args...>;
386};
387
388}
389/// \endcond
390
391/// Creates a new execution stage
392///
393/// Wraps given function object in a concrete_execution_stage. All arguments
394/// of the function object are required to have move constructors that do not
395/// throw. Function object may return a future or an immediate object or void.
396///
397/// Moving execution stages is discouraged and illegal after first function
398/// call is enqueued.
399///
400/// Usage example:
401/// ```
402/// double do_something(int);
403/// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
404///
405/// future<double> func1(int val) {
406/// return stage1(val);
407/// }
408///
409/// future<double> do_some_io(int);
410/// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
411///
412/// future<double> func2(int val) {
413/// return stage2(val);
414/// }
415/// ```
416///
417/// \param name unique name of the execution stage
418/// \param sg scheduling group to run under
419/// \param fn function to be executed by the stage
420/// \return concrete_execution_stage
421///
422template<typename Function>
423auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
424 using traits = function_traits<Function>;
425 using ret_type = typename traits::return_type;
426 using args_as_tuple = typename traits::args_as_tuple;
427 using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
428 return concrete_execution_stage(name, sg, std::forward<Function>(fn));
429}
430
431/// Creates a new execution stage (variant taking \ref scheduling_group)
432///
433/// Wraps given function object in a concrete_execution_stage. All arguments
434/// of the function object are required to have move constructors that do not
435/// throw. Function object may return a future or an immediate object or void.
436///
437/// Moving execution stages is discouraged and illegal after first function
438/// call is enqueued.
439///
440/// Usage example:
441/// ```
442/// double do_something(int);
443/// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
444///
445/// future<double> func1(int val) {
446/// return stage1(val);
447/// }
448///
449/// future<double> do_some_io(int);
450/// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
451///
452/// future<double> func2(int val) {
453/// return stage2(val);
454/// }
455/// ```
456///
457/// \param name unique name of the execution stage (variant not taking \ref scheduling_group)
458/// \param fn function to be executed by the stage
459/// \return concrete_execution_stage
460///
461template<typename Function>
462auto make_execution_stage(const sstring& name, Function&& fn) {
463 return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
464}
465
466/// Creates a new execution stage from a member function
467///
468/// Wraps a pointer to member function in a concrete_execution_stage. When
469/// a function call is pushed to the stage the first argument should be a
470/// pointer to the object the function is a member of.
471///
472/// Usage example:
473/// ```
474/// struct foo {
475/// void do_something(int);
476/// };
477///
478/// thread_local auto stage = seastar::make_execution_stage("execution-stage", &foo::do_something);
479///
480/// future<> func(foo& obj, int val) {
481/// return stage(&obj, val);
482/// }
483/// ```
484///
485/// \see make_execution_stage(const sstring&, Function&&)
486/// \param name unique name of the execution stage
487/// \param fn member function to be executed by the stage
488/// \return concrete_execution_stage
489template<typename Ret, typename Object, typename... Args>
490concrete_execution_stage<Ret, Object*, Args...>
491make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
492 return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
493}
494
495template<typename Ret, typename Object, typename... Args>
496concrete_execution_stage<Ret, const Object*, Args...>
497make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
498 return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
499}
500
501template<typename Ret, typename Object, typename... Args>
502concrete_execution_stage<Ret, Object*, Args...>
503make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
504 return make_execution_stage(name, scheduling_group(), fn);
505}
506
507template<typename Ret, typename Object, typename... Args>
508concrete_execution_stage<Ret, const Object*, Args...>
509make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
510 return make_execution_stage(name, scheduling_group(), fn);
511}
512
513/// @}
514
515}