]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/src/core/execution_stage.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / seastar / src / core / execution_stage.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2018 ScyllaDB Ltd.
20 */
21
22 #include <seastar/core/execution_stage.hh>
23 #include <seastar/core/print.hh>
24 #include <seastar/core/make_task.hh>
25
26 namespace seastar {
27
28 namespace internal {
29
30 void execution_stage_manager::register_execution_stage(execution_stage& stage) {
31 auto ret = _stages_by_name.emplace(stage.name(), &stage);
32 if (!ret.second) {
33 throw std::invalid_argument(format("Execution stage {} already exists.", stage.name()));
34 }
35 try {
36 _execution_stages.push_back(&stage);
37 } catch (...) {
38 _stages_by_name.erase(stage.name());
39 throw;
40 }
41 }
42
43 void execution_stage_manager::unregister_execution_stage(execution_stage& stage) noexcept {
44 auto it = std::find(_execution_stages.begin(), _execution_stages.end(), &stage);
45 if (it == _execution_stages.end()) {
46 return; // was changed by update_execution_stage_registration
47 }
48 _execution_stages.erase(it);
49 _stages_by_name.erase(stage.name());
50 }
51
52 void execution_stage_manager::update_execution_stage_registration(execution_stage& old_es, execution_stage& new_es) noexcept {
53 auto it = std::find(_execution_stages.begin(), _execution_stages.end(), &old_es);
54 *it = &new_es;
55 _stages_by_name.find(new_es.name())->second = &new_es;
56 }
57
58 execution_stage* execution_stage_manager::get_stage(const sstring& name) {
59 return _stages_by_name[name];
60 }
61
62 bool execution_stage_manager::flush() noexcept {
63 bool did_work = false;
64 for (auto&& stage : _execution_stages) {
65 did_work |= stage->flush();
66 }
67 return did_work;
68 }
69
70 bool execution_stage_manager::poll() const noexcept {
71 for (auto&& stage : _execution_stages) {
72 if (stage->poll()) {
73 return true;
74 }
75 }
76 return false;
77 }
78
79 execution_stage_manager& execution_stage_manager::get() noexcept {
80 static thread_local execution_stage_manager instance;
81 return instance;
82 }
83
84 }
85
86 execution_stage::~execution_stage()
87 {
88 internal::execution_stage_manager::get().unregister_execution_stage(*this);
89 }
90
91 execution_stage::execution_stage(execution_stage&& other)
92 : _sg(other._sg)
93 , _stats(other._stats)
94 , _name(std::move(other._name))
95 , _metric_group(std::move(other._metric_group))
96 {
97 internal::execution_stage_manager::get().update_execution_stage_registration(other, *this);
98 }
99
100 execution_stage::execution_stage(const sstring& name, scheduling_group sg)
101 : _sg(sg)
102 , _name(name)
103 {
104 internal::execution_stage_manager::get().register_execution_stage(*this);
105 auto undo = defer([&] { internal::execution_stage_manager::get().unregister_execution_stage(*this); });
106 _metric_group = metrics::metric_group("execution_stages", {
107 metrics::make_derive("tasks_scheduled",
108 metrics::description("Counts tasks scheduled by execution stages"),
109 { metrics::label_instance("execution_stage", name), },
110 [name, &esm = internal::execution_stage_manager::get()] {
111 return esm.get_stage(name)->get_stats().tasks_scheduled;
112 }),
113 metrics::make_derive("tasks_preempted",
114 metrics::description("Counts tasks which were preempted before execution all queued operations"),
115 { metrics::label_instance("execution_stage", name), },
116 [name, &esm = internal::execution_stage_manager::get()] {
117 return esm.get_stage(name)->get_stats().tasks_preempted;
118 }),
119 metrics::make_derive("function_calls_enqueued",
120 metrics::description("Counts function calls added to execution stages queues"),
121 { metrics::label_instance("execution_stage", name), },
122 [name, &esm = internal::execution_stage_manager::get()] {
123 return esm.get_stage(name)->get_stats().function_calls_enqueued;
124 }),
125 metrics::make_derive("function_calls_executed",
126 metrics::description("Counts function calls executed by execution stages"),
127 { metrics::label_instance("execution_stage", name), },
128 [name, &esm = internal::execution_stage_manager::get()] {
129 return esm.get_stage(name)->get_stats().function_calls_executed;
130 }),
131 });
132 undo.cancel();
133 }
134
135 bool execution_stage::flush() noexcept {
136 if (_empty || _flush_scheduled) {
137 return false;
138 }
139 _stats.tasks_scheduled++;
140 schedule(make_task(_sg, [this] {
141 do_flush();
142 _flush_scheduled = false;
143 }));
144 _flush_scheduled = true;
145 return true;
146 };
147
148 }