/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
/*
 * Copyright (C) 2018 ScyllaDB Ltd.
 */
22 #include <seastar/core/execution_stage.hh>
23 #include <seastar/core/print.hh>
24 #include <seastar/core/make_task.hh>
// Adds `stage` to the manager's registries: (stage.name(), &stage) is
// emplaced into _stages_by_name and &stage is appended to _execution_stages.
// Throws std::invalid_argument with the message
// "Execution stage {} already exists." (formatted with stage.name()).
// NOTE(review): the embedded line numbers jump (31 -> 33 -> 36 -> 38 -> 43),
// so the statements surrounding the throw and the erase are not visible in
// this listing. Presumably the throw is guarded by the emplace result held in
// `ret`, and the erase is a rollback for a failed push_back -- confirm
// against the complete file.
30 void execution_stage_manager::register_execution_stage(execution_stage
& stage
) {
31 auto ret
= _stages_by_name
.emplace(stage
.name(), &stage
);
33 throw std::invalid_argument(format("Execution stage {} already exists.", stage
.name()));
36 _execution_stages
.push_back(&stage
);
38 _stages_by_name
.erase(stage
.name());
43 void execution_stage_manager::unregister_execution_stage(execution_stage
& stage
) noexcept
{
44 auto it
= std::find(_execution_stages
.begin(), _execution_stages
.end(), &stage
);
45 if (it
== _execution_stages
.end()) {
46 return; // was changed by update_execution_stage_registration
48 _execution_stages
.erase(it
);
49 _stages_by_name
.erase(stage
.name());
// Re-points an existing registration from `old_es` to `new_es`.
// Visible in this listing: `old_es` is searched for in _execution_stages, and
// the _stages_by_name entry found under new_es.name() is overwritten with
// &new_es. The result of that find() is dereferenced without being compared
// to end(), so an entry under new_es.name() has to exist when this is called.
// NOTE(review): embedded line 54 is missing from this listing; presumably it
// uses `it` to replace the element of _execution_stages -- confirm against
// the complete file.
52 void execution_stage_manager::update_execution_stage_registration(execution_stage
& old_es
, execution_stage
& new_es
) noexcept
{
53 auto it
= std::find(_execution_stages
.begin(), _execution_stages
.end(), &old_es
);
55 _stages_by_name
.find(new_es
.name())->second
= &new_es
;
58 execution_stage
* execution_stage_manager::get_stage(const sstring
& name
) {
59 return _stages_by_name
[name
];
// Calls flush() on every stage in _execution_stages and ORs each result into
// `did_work`, which starts out false.
// NOTE(review): the lines that follow the loop body (embedded 66-68) are not
// visible in this listing; presumably `did_work` is the return value --
// confirm against the complete file.
62 bool execution_stage_manager::flush() noexcept
{
63 bool did_work
= false;
64 for (auto&& stage
: _execution_stages
) {
65 did_work
|= stage
->flush();
// Walks every stage in _execution_stages.
// NOTE(review): the loop body and the return statements (embedded lines
// 72-77) are missing from this listing, so what is checked per stage and
// which value is returned cannot be determined from here -- consult the
// complete file.
70 bool execution_stage_manager::poll() const noexcept
{
71 for (auto&& stage
: _execution_stages
) {
// Accessor for the manager object. `instance` is a function-local
// `static thread_local` variable, so each thread has its own manager.
// NOTE(review): the return statement (embedded line 81) is not visible in
// this listing; presumably it returns `instance` -- confirm.
79 execution_stage_manager
& execution_stage_manager::get() noexcept
{
80 static thread_local execution_stage_manager instance
;
// Destructor: asks the manager returned by execution_stage_manager::get() to
// unregister this stage.
86 execution_stage::~execution_stage()
88 internal::execution_stage_manager::get().unregister_execution_stage(*this);
// Move constructor. _stats is copied from `other`, while _name and
// _metric_group are moved out of it. The manager is then told to update the
// registration from `other` to *this.
// NOTE(review): embedded line 92 (the first member initializer) is missing
// from this listing -- check the complete file for the full initializer list.
91 execution_stage::execution_stage(execution_stage
&& other
)
93 , _stats(other
._stats
)
94 , _name(std::move(other
._name
))
95 , _metric_group(std::move(other
._metric_group
))
97 internal::execution_stage_manager::get().update_execution_stage_registration(other
, *this);
// Constructs a stage named `name`. Steps visible in this listing:
//   1. *this is registered with the execution_stage_manager.
//   2. `undo` is created via defer(); its callback unregisters *this again.
//   3. _metric_group is assigned an "execution_stages" metric group holding
//      four derive metrics -- tasks_scheduled, tasks_preempted,
//      function_calls_enqueued and function_calls_executed -- each labelled
//      with execution_stage=<name>.
// Every metric callback captures `name` by value and the manager by
// reference, then looks the stage up with get_stage(name) on each read
// instead of capturing `this` (presumably so the callbacks remain valid after
// the stage object is moved -- confirm). The get_stage() result is
// dereferenced without a null check.
// NOTE(review): the member-initializer list (embedded lines 101-103), the
// closers of each lambda, and the tail of the constructor (130-133) are
// missing from this listing; `sg` is not referenced in the visible lines, and
// whether `undo` is cancelled on success cannot be seen here -- confirm
// against the complete file.
100 execution_stage::execution_stage(const sstring
& name
, scheduling_group sg
)
104 internal::execution_stage_manager::get().register_execution_stage(*this);
105 auto undo
= defer([&] { internal::execution_stage_manager::get().unregister_execution_stage(*this); });
106 _metric_group
= metrics::metric_group("execution_stages", {
107 metrics::make_derive("tasks_scheduled",
108 metrics::description("Counts tasks scheduled by execution stages"),
109 { metrics::label_instance("execution_stage", name
), },
110 [name
, &esm
= internal::execution_stage_manager::get()] {
111 return esm
.get_stage(name
)->get_stats().tasks_scheduled
;
113 metrics::make_derive("tasks_preempted",
114 metrics::description("Counts tasks which were preempted before execution all queued operations"),
115 { metrics::label_instance("execution_stage", name
), },
116 [name
, &esm
= internal::execution_stage_manager::get()] {
117 return esm
.get_stage(name
)->get_stats().tasks_preempted
;
119 metrics::make_derive("function_calls_enqueued",
120 metrics::description("Counts function calls added to execution stages queues"),
121 { metrics::label_instance("execution_stage", name
), },
122 [name
, &esm
= internal::execution_stage_manager::get()] {
123 return esm
.get_stage(name
)->get_stats().function_calls_enqueued
;
125 metrics::make_derive("function_calls_executed",
126 metrics::description("Counts function calls executed by execution stages"),
127 { metrics::label_instance("execution_stage", name
), },
128 [name
, &esm
= internal::execution_stage_manager::get()] {
129 return esm
.get_stage(name
)->get_stats().function_calls_executed
;
// Schedules a flush task for this stage. Visible in this listing: a check of
// `_empty || _flush_scheduled`, an increment of _stats.tasks_scheduled, a
// task created with make_task() for scheduling group _sg and passed to
// schedule(), and _flush_scheduled being set to true afterwards. The task's
// lambda captures `this` and resets _flush_scheduled to false.
// NOTE(review): the body of the `if` (embedded lines 137-138), the first
// statement of the lambda (141) and everything after embedded line 144 are
// not visible; presumably the `if` is an early return for "nothing queued or
// a flush already pending" -- confirm against the complete file.
135 bool execution_stage::flush() noexcept
{
136 if (_empty
|| _flush_scheduled
) {
139 _stats
.tasks_scheduled
++;
140 schedule(make_task(_sg
, [this] {
142 _flush_scheduled
= false;
144 _flush_scheduled
= true;