/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2017 ScyllaDB Ltd.
 */
24 #include <seastar/core/future.hh>
25 #include <seastar/core/chunked_fifo.hh>
26 #include <seastar/core/function_traits.hh>
27 #include <seastar/core/sstring.hh>
28 #include <seastar/core/metrics.hh>
29 #include <seastar/core/scheduling.hh>
30 #include <seastar/util/reference_wrapper.hh>
31 #include <seastar/util/concepts.hh>
32 #include <seastar/util/noncopyable_function.hh>
33 #include <seastar/util/tuple_utils.hh>
34 #include <seastar/util/defer.hh>
35 #include <seastar/util/std-compat.hh>
36 #include <fmt/format.h>
37 #include <fmt/ostream.h>
39 #include <boost/range/irange.hpp>
40 #include <boost/range/adaptor/transformed.hpp>
41 #include <boost/container/static_vector.hpp>
45 /// \defgroup execution-stages Execution Stages
48 /// Execution stages provide an infrastructure for processing function calls in
49 /// batches in order to improve instruction cache locality.
51 /// When the application logic becomes more and more complex and the length
52 /// of the data processing pipeline grows it may happen that the most
53 /// significant bottleneck are instruction cache misses. The solution for that
54 /// problem may be processing similar operations in batches so that instruction
55 /// cache locality is improved at the cost of potentially higher latencies and
56 /// worse data cache locality.
58 /// Execution stages allow batching calls to the specified function object.
59 /// Every time concrete_execution_stage::operator()() is used the function call
60 /// is added to the queue and a future is returned. Once the number of queued
61 /// calls reaches certain threshold the stage is flushed and a task is which
62 /// would execute these function calls is scheduled. Execution stages are also
63 /// flushed when the reactor polls for events.
65 /// When calling a function that is wrapped inside execution stage it is
66 /// important to remember that the actual function call will happen at some
67 /// later time and it has to be guaranteed the objects passed by lvalue
68 /// reference are still alive. In order to avoid accidental passing of a
69 /// temporary object by lvalue reference the interface of execution stages
70 /// accepts only lvalue references wrapped in reference_wrapper. It is safe to
71 /// pass rvalue references, they are decayed and the objects are moved. See
72 /// concrete_execution_stage::operator()() for more details.
74 /// \addtogroup execution-stages
// Execution stages wrap lvalue references in reference_wrapper so that the caller is forced
81 // to use seastar::ref(). Then when the function is actually called the
82 // reference is unwrapped. However, we need to distinguish between functions
83 // which argument is lvalue reference and functions that take
84 // reference_wrapper<> as an argument and not unwrap the latter. To solve this
85 // issue reference_wrapper_for_es type is used for wrappings done automatically
86 // by execution stage.
88 struct reference_wrapper_for_es : reference_wrapper<T> {
89 reference_wrapper_for_es(reference_wrapper <T> rw) noexcept
90 : reference_wrapper<T>(std::move(rw)) {}
99 struct wrap_for_es<T&> {
100 using type = reference_wrapper_for_es<T>;
104 struct wrap_for_es<T&&> {
109 decltype(auto) unwrap_for_es(T&& object) {
110 return std::forward<T>(object);
114 std::reference_wrapper<T> unwrap_for_es(reference_wrapper_for_es<T> ref) {
115 return std::reference_wrapper<T>(ref.get());
121 /// Base execution stage class
122 class execution_stage {
125 uint64_t tasks_scheduled = 0;
126 uint64_t tasks_preempted = 0;
127 uint64_t function_calls_enqueued = 0;
128 uint64_t function_calls_executed = 0;
132 bool _flush_scheduled = false;
133 scheduling_group _sg;
136 metrics::metric_group _metric_group;
138 virtual void do_flush() noexcept = 0;
140 explicit execution_stage(const sstring& name, scheduling_group sg = {});
141 virtual ~execution_stage();
143 execution_stage(const execution_stage&) = delete;
147 /// \warning It is illegal to move execution_stage after any operation has
148 /// been pushed to it. The only reason why the move constructor is not
149 /// deleted is the fact that C++14 does not guarantee return value
150 /// optimisation which is required by make_execution_stage().
151 execution_stage(execution_stage&&);
153 /// Returns execution stage name
154 const sstring& name() const noexcept { return _name; }
156 /// Returns execution stage usage statistics
157 const stats& get_stats() const noexcept { return _stats; }
159 /// Flushes execution stage
161 /// Ensures that a task which would execute all queued operations is
162 /// scheduled. Does not schedule a new task if there is one already pending
163 /// or the queue is empty.
165 /// \return true if a new task has been scheduled
166 bool flush() noexcept;
168 /// Checks whether there are pending operations.
170 /// \return true if there is at least one queued operation
171 bool poll() const noexcept {
179 class execution_stage_manager {
180 std::vector<execution_stage*> _execution_stages;
181 std::unordered_map<sstring, execution_stage*> _stages_by_name;
183 execution_stage_manager() = default;
184 execution_stage_manager(const execution_stage_manager&) = delete;
185 execution_stage_manager(execution_stage_manager&&) = delete;
187 void register_execution_stage(execution_stage& stage);
188 void unregister_execution_stage(execution_stage& stage) noexcept;
189 void update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept;
190 execution_stage* get_stage(const sstring& name);
191 bool flush() noexcept;
192 bool poll() const noexcept;
194 static execution_stage_manager& get() noexcept;
200 /// \brief Concrete execution stage class
202 /// \note The recommended way of creating execution stages is to use
203 /// make_execution_stage().
205 /// \tparam ReturnType return type of the function object
206 /// \tparam Args argument pack containing arguments to the function object, needs
207 /// to have move constructor that doesn't throw
208 template<typename ReturnType, typename... Args>
209 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
210 class concrete_execution_stage final : public execution_stage {
211 using args_tuple = std::tuple<Args...>;
212 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
213 "Function arguments need to be nothrow move constructible");
215 static constexpr size_t flush_threshold = 128;
216 static constexpr size_t max_queue_length = 1024;
218 using return_type = futurize_t<ReturnType>;
219 using promise_type = typename return_type::promise_type;
220 using input_type = typename tuple_map_types<internal::wrap_for_es, args_tuple>::type;
226 work_item(typename internal::wrap_for_es<Args>::type... args) : _in(std::move(args)...) { }
228 work_item(work_item&& other) = delete;
229 work_item(const work_item&) = delete;
230 work_item(work_item&) = delete;
232 chunked_fifo<work_item, flush_threshold> _queue;
234 noncopyable_function<ReturnType (Args...)> _function;
236 auto unwrap(input_type&& in) {
237 return tuple_map(std::move(in), [] (auto&& obj) {
238 return internal::unwrap_for_es(std::forward<decltype(obj)>(obj));
242 virtual void do_flush() noexcept override {
243 while (!_queue.empty()) {
244 auto& wi = _queue.front();
245 auto wi_in = std::move(wi._in);
246 auto wi_ready = std::move(wi._ready);
248 futurize<ReturnType>::apply(_function, unwrap(std::move(wi_in))).forward_to(std::move(wi_ready));
249 _stats.function_calls_executed++;
251 if (need_preempt()) {
252 _stats.tasks_preempted++;
256 _empty = _queue.empty();
259 explicit concrete_execution_stage(const sstring& name, scheduling_group sg, noncopyable_function<ReturnType (Args...)> f)
260 : execution_stage(name, sg)
261 , _function(std::move(f))
263 _queue.reserve(flush_threshold);
265 explicit concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
266 : concrete_execution_stage(name, scheduling_group(), std::move(f)) {
269 /// Enqueues a call to the stage's function
271 /// Adds a function call to the queue. Objects passed by value are moved,
272 /// rvalue references are decayed and the objects are moved, lvalue
273 /// references need to be explicitly wrapped using seastar::ref().
277 /// void do_something(int&, int, std::vector<int>&&);
278 /// thread_local auto stage = seastar::make_execution_stage("execution-stage", do_something);
280 /// int global_value;
282 /// future<> func(std::vector<int> vec) {
283 /// //return stage(global_value, 42, std::move(vec)); // fail: use seastar::ref to pass references
284 /// return stage(seastar::ref(global_value), 42, std::move(vec)); // ok
288 /// \param args arguments passed to the stage's function
289 /// \return future containing the result of the call to the stage's function
290 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
291 if (_queue.size() >= max_queue_length) {
294 _queue.emplace_back(std::move(args)...);
296 _stats.function_calls_enqueued++;
297 auto f = _queue.back()._ready.get_future();
303 /// \brief Base class for execution stages with support for automatic \ref scheduling_group inheritance
304 class inheriting_execution_stage {
306 struct per_scheduling_group_stats {
308 execution_stage::stats stats;
310 using stats = boost::container::static_vector<per_scheduling_group_stats, max_scheduling_groups()>;
313 /// \brief Concrete execution stage class, with support for automatic \ref scheduling_group inheritance
315 /// A variation of \ref concrete_execution_stage that inherits the \ref scheduling_group
316 /// from the caller. Each call (of `operator()`) can be in its own scheduling group.
318 /// \tparam ReturnType return type of the function object
319 /// \tparam Args argument pack containing arguments to the function object, needs
320 /// to have move constructor that doesn't throw
321 template<typename ReturnType, typename... Args>
322 SEASTAR_CONCEPT(requires std::is_nothrow_move_constructible<std::tuple<Args...>>::value)
323 class inheriting_concrete_execution_stage final : public inheriting_execution_stage {
324 using return_type = futurize_t<ReturnType>;
325 using args_tuple = std::tuple<Args...>;
326 using per_group_stage_type = concrete_execution_stage<ReturnType, Args...>;
328 static_assert(std::is_nothrow_move_constructible<args_tuple>::value,
329 "Function arguments need to be nothrow move constructible");
332 noncopyable_function<ReturnType (Args...)> _function;
333 std::vector<std::optional<per_group_stage_type>> _stage_for_group{max_scheduling_groups()};
335 per_group_stage_type make_stage_for_group(scheduling_group sg) {
336 // We can't use std::ref(function), because reference_wrapper decays to noncopyable_function& and
337 // that selects the noncopyable_function copy constructor. Use a lambda instead.
338 auto wrapped_function = [&_function = _function] (Args... args) {
339 return _function(std::forward<Args>(args)...);
341 auto name = fmt::format("{}.{}", _name, sg.name());
342 return per_group_stage_type(name, sg, wrapped_function);
345 /// Construct an inheriting concrete execution stage.
347 /// \param name A name for the execution stage; must be unique
348 /// \param f Function to be called in response to operator(). The function
349 /// call will be deferred and batched with similar calls to increase
350 /// instruction cache hit rate.
351 inheriting_concrete_execution_stage(const sstring& name, noncopyable_function<ReturnType (Args...)> f)
352 : _name(std::move(name)),_function(std::move(f)) {
355 /// Enqueues a call to the stage's function
357 /// Adds a function call to the queue. Objects passed by value are moved,
358 /// rvalue references are decayed and the objects are moved, lvalue
359 /// references need to be explicitly wrapped using seastar::ref().
361 /// The caller's \ref scheduling_group will be preserved across the call.
365 /// void do_something(int);
366 /// thread_local auto stage = seastar::inheriting_concrete_execution_stage<int>("execution-stage", do_something);
368 /// future<> func(int x) {
373 /// \param args arguments passed to the stage's function
374 /// \return future containing the result of the call to the stage's function
375 return_type operator()(typename internal::wrap_for_es<Args>::type... args) {
376 auto sg = current_scheduling_group();
377 auto sg_id = internal::scheduling_group_index(sg);
378 auto& slot = _stage_for_group[sg_id];
380 slot.emplace(make_stage_for_group(sg));
382 return (*slot)(std::move(args)...);
385 /// Returns summary of individual execution stage usage statistics
387 /// \returns a vector of the stats of the individual per-scheduling group
388 /// executation stages. Each element in the vector is a pair composed of
389 /// the scheduling group and the stats for the respective execution
390 /// stage. Scheduling groups that have had no respective calls enqueued
392 inheriting_execution_stage::stats get_stats() const noexcept {
393 inheriting_execution_stage::stats summary;
394 for (unsigned sg_id = 0; sg_id != _stage_for_group.size(); ++sg_id) {
395 auto sg = internal::scheduling_group_from_index(sg_id);
396 if (_stage_for_group[sg_id]) {
397 summary.push_back({sg, _stage_for_group[sg_id]->get_stats()});
408 template <typename Ret, typename ArgsTuple>
409 struct concrete_execution_stage_helper;
411 template <typename Ret, typename... Args>
412 struct concrete_execution_stage_helper<Ret, std::tuple<Args...>> {
413 using type = concrete_execution_stage<Ret, Args...>;
419 /// Creates a new execution stage
421 /// Wraps given function object in a concrete_execution_stage. All arguments
422 /// of the function object are required to have move constructors that do not
423 /// throw. Function object may return a future or an immediate object or void.
425 /// Moving execution stages is discouraged and illegal after first function
426 /// call is enqueued.
430 /// double do_something(int);
431 /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
433 /// future<double> func1(int val) {
434 /// return stage1(val);
437 /// future<double> do_some_io(int);
438 /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
440 /// future<double> func2(int val) {
441 /// return stage2(val);
445 /// \param name unique name of the execution stage
446 /// \param sg scheduling group to run under
447 /// \param fn function to be executed by the stage
448 /// \return concrete_execution_stage
450 template<typename Function>
451 auto make_execution_stage(const sstring& name, scheduling_group sg, Function&& fn) {
452 using traits = function_traits<Function>;
453 using ret_type = typename traits::return_type;
454 using args_as_tuple = typename traits::args_as_tuple;
455 using concrete_execution_stage = typename internal::concrete_execution_stage_helper<ret_type, args_as_tuple>::type;
456 return concrete_execution_stage(name, sg, std::forward<Function>(fn));
459 /// Creates a new execution stage (variant taking \ref scheduling_group)
461 /// Wraps given function object in a concrete_execution_stage. All arguments
462 /// of the function object are required to have move constructors that do not
463 /// throw. Function object may return a future or an immediate object or void.
465 /// Moving execution stages is discouraged and illegal after first function
466 /// call is enqueued.
470 /// double do_something(int);
471 /// thread_local auto stage1 = seastar::make_execution_stage("execution-stage1", do_something);
473 /// future<double> func1(int val) {
474 /// return stage1(val);
477 /// future<double> do_some_io(int);
478 /// thread_local auto stage2 = seastar::make_execution_stage("execution-stage2", do_some_io);
480 /// future<double> func2(int val) {
481 /// return stage2(val);
485 /// \param name unique name of the execution stage (variant not taking \ref scheduling_group)
486 /// \param fn function to be executed by the stage
487 /// \return concrete_execution_stage
489 template<typename Function>
490 auto make_execution_stage(const sstring& name, Function&& fn) {
491 return make_execution_stage(name, scheduling_group(), std::forward<Function>(fn));
494 /// Creates a new execution stage from a member function
496 /// Wraps a pointer to member function in a concrete_execution_stage. When
497 /// a function call is pushed to the stage the first argument should be a
498 /// pointer to the object the function is a member of.
503 /// void do_something(int);
506 /// thread_local auto stage = seastar::make_execution_stage("execution-stage", &foo::do_something);
508 /// future<> func(foo& obj, int val) {
509 /// return stage(&obj, val);
513 /// \see make_execution_stage(const sstring&, Function&&)
514 /// \param name unique name of the execution stage
515 /// \param fn member function to be executed by the stage
516 /// \return concrete_execution_stage
517 template<typename Ret, typename Object, typename... Args>
518 concrete_execution_stage<Ret, Object*, Args...>
519 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...)) {
520 return concrete_execution_stage<Ret, Object*, Args...>(name, sg, std::mem_fn(fn));
523 template<typename Ret, typename Object, typename... Args>
524 concrete_execution_stage<Ret, const Object*, Args...>
525 make_execution_stage(const sstring& name, scheduling_group sg, Ret (Object::*fn)(Args...) const) {
526 return concrete_execution_stage<Ret, const Object*, Args...>(name, sg, std::mem_fn(fn));
529 template<typename Ret, typename Object, typename... Args>
530 concrete_execution_stage<Ret, Object*, Args...>
531 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...)) {
532 return make_execution_stage(name, scheduling_group(), fn);
535 template<typename Ret, typename Object, typename... Args>
536 concrete_execution_stage<Ret, const Object*, Args...>
537 make_execution_stage(const sstring& name, Ret (Object::*fn)(Args...) const) {
538 return make_execution_stage(name, scheduling_group(), fn);