]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_FINISHER_H | |
16 | #define CEPH_FINISHER_H | |
17 | ||
11fdf7f2 | 18 | #include "include/Context.h" |
9f95a23c | 19 | #include "include/common_fwd.h" |
11fdf7f2 TL |
20 | #include "common/Thread.h" |
21 | #include "common/ceph_mutex.h" | |
7c673cae | 22 | #include "common/perf_counters.h" |
11fdf7f2 | 23 | #include "common/Cond.h" |
7c673cae | 24 | |
7c673cae FG |
25 | |
/// Performance counter IDs registered by named Finisher instances.
enum {
  l_finisher_first = 997082,   ///< Lower bound handed to PerfCountersBuilder (not a counter itself).
  l_finisher_queue_len,        ///< Counter registered as "queue_len".
  l_finisher_complete_lat,     ///< Time average registered as "complete_latency".
  l_finisher_last              ///< Upper bound handed to PerfCountersBuilder (not a counter itself).
};
33 | ||
34 | /** @brief Asynchronous cleanup class. | |
35 | * Finisher asynchronously completes Contexts, which are simple classes | |
36 | * representing callbacks, in a dedicated worker thread. Enqueuing | |
37 | * contexts to complete is thread-safe. | |
38 | */ | |
39 | class Finisher { | |
40 | CephContext *cct; | |
11fdf7f2 TL |
41 | ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running. |
42 | ceph::condition_variable finisher_cond; ///< Signaled when there is something to process. | |
43 | ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. | |
7c673cae FG |
44 | bool finisher_stop; ///< Set when the finisher should stop. |
45 | bool finisher_running; ///< True when the finisher is currently executing contexts. | |
46 | bool finisher_empty_wait; ///< True mean someone wait finisher empty. | |
11fdf7f2 | 47 | |
7c673cae | 48 | /// Queue for contexts for which complete(0) will be called. |
9f95a23c TL |
49 | std::vector<std::pair<Context*,int>> finisher_queue; |
50 | std::vector<std::pair<Context*,int>> in_progress_queue; | |
7c673cae | 51 | |
9f95a23c | 52 | std::string thread_name; |
7c673cae | 53 | |
7c673cae FG |
54 | /// Performance counter for the finisher's queue length. |
55 | /// Only active for named finishers. | |
56 | PerfCounters *logger; | |
9f95a23c | 57 | |
7c673cae FG |
58 | void *finisher_thread_entry(); |
59 | ||
60 | struct FinisherThread : public Thread { | |
9f95a23c | 61 | Finisher *fin; |
7c673cae | 62 | explicit FinisherThread(Finisher *f) : fin(f) {} |
11fdf7f2 | 63 | void* entry() override { return fin->finisher_thread_entry(); } |
7c673cae FG |
64 | } finisher_thread; |
65 | ||
66 | public: | |
67 | /// Add a context to complete, optionally specifying a parameter for the complete function. | |
68 | void queue(Context *c, int r = 0) { | |
11fdf7f2 | 69 | std::unique_lock ul(finisher_lock); |
9f95a23c TL |
70 | bool was_empty = finisher_queue.empty(); |
71 | finisher_queue.push_back(std::make_pair(c, r)); | |
72 | if (was_empty) { | |
73 | finisher_cond.notify_one(); | |
7c673cae | 74 | } |
7c673cae FG |
75 | if (logger) |
76 | logger->inc(l_finisher_queue_len); | |
7c673cae | 77 | } |
11fdf7f2 | 78 | |
9f95a23c | 79 | void queue(std::list<Context*>& ls) { |
11fdf7f2 TL |
80 | { |
81 | std::unique_lock ul(finisher_lock); | |
82 | if (finisher_queue.empty()) { | |
83 | finisher_cond.notify_all(); | |
84 | } | |
85 | for (auto i : ls) { | |
9f95a23c | 86 | finisher_queue.push_back(std::make_pair(i, 0)); |
11fdf7f2 TL |
87 | } |
88 | if (logger) | |
89 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 90 | } |
7c673cae FG |
91 | ls.clear(); |
92 | } | |
9f95a23c | 93 | void queue(std::deque<Context*>& ls) { |
11fdf7f2 TL |
94 | { |
95 | std::unique_lock ul(finisher_lock); | |
96 | if (finisher_queue.empty()) { | |
97 | finisher_cond.notify_all(); | |
98 | } | |
99 | for (auto i : ls) { | |
9f95a23c | 100 | finisher_queue.push_back(std::make_pair(i, 0)); |
11fdf7f2 TL |
101 | } |
102 | if (logger) | |
103 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 104 | } |
7c673cae FG |
105 | ls.clear(); |
106 | } | |
9f95a23c | 107 | void queue(std::vector<Context*>& ls) { |
11fdf7f2 TL |
108 | { |
109 | std::unique_lock ul(finisher_lock); | |
110 | if (finisher_queue.empty()) { | |
111 | finisher_cond.notify_all(); | |
112 | } | |
113 | for (auto i : ls) { | |
9f95a23c | 114 | finisher_queue.push_back(std::make_pair(i, 0)); |
11fdf7f2 TL |
115 | } |
116 | if (logger) | |
117 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 118 | } |
7c673cae FG |
119 | ls.clear(); |
120 | } | |
121 | ||
122 | /// Start the worker thread. | |
123 | void start(); | |
124 | ||
125 | /** @brief Stop the worker thread. | |
126 | * | |
127 | * Does not wait until all outstanding contexts are completed. | |
128 | * To ensure that everything finishes, you should first shut down | |
129 | * all sources that can add contexts to this finisher and call | |
130 | * wait_for_empty() before calling stop(). */ | |
131 | void stop(); | |
132 | ||
133 | /** @brief Blocks until the finisher has nothing left to process. | |
134 | * This function will also return when a concurrent call to stop() | |
135 | * finishes, but this class should never be used in this way. */ | |
136 | void wait_for_empty(); | |
137 | ||
138 | /// Construct an anonymous Finisher. | |
139 | /// Anonymous finishers do not log their queue length. | |
140 | explicit Finisher(CephContext *cct_) : | |
11fdf7f2 | 141 | cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")), |
7c673cae FG |
142 | finisher_stop(false), finisher_running(false), finisher_empty_wait(false), |
143 | thread_name("fn_anonymous"), logger(0), | |
144 | finisher_thread(this) {} | |
145 | ||
146 | /// Construct a named Finisher that logs its queue length. | |
9f95a23c | 147 | Finisher(CephContext *cct_, std::string name, std::string tn) : |
11fdf7f2 | 148 | cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)), |
7c673cae FG |
149 | finisher_stop(false), finisher_running(false), finisher_empty_wait(false), |
150 | thread_name(tn), logger(0), | |
151 | finisher_thread(this) { | |
9f95a23c | 152 | PerfCountersBuilder b(cct, std::string("finisher-") + name, |
7c673cae FG |
153 | l_finisher_first, l_finisher_last); |
154 | b.add_u64(l_finisher_queue_len, "queue_len"); | |
155 | b.add_time_avg(l_finisher_complete_lat, "complete_latency"); | |
156 | logger = b.create_perf_counters(); | |
157 | cct->get_perfcounters_collection()->add(logger); | |
158 | logger->set(l_finisher_queue_len, 0); | |
159 | logger->set(l_finisher_complete_lat, 0); | |
160 | } | |
161 | ||
162 | ~Finisher() { | |
163 | if (logger && cct) { | |
164 | cct->get_perfcounters_collection()->remove(logger); | |
165 | delete logger; | |
166 | } | |
167 | } | |
168 | }; | |
169 | ||
170 | /// Context that is completed asynchronously on the supplied finisher. | |
171 | class C_OnFinisher : public Context { | |
172 | Context *con; | |
173 | Finisher *fin; | |
174 | public: | |
175 | C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) { | |
11fdf7f2 TL |
176 | ceph_assert(fin != NULL); |
177 | ceph_assert(con != NULL); | |
7c673cae FG |
178 | } |
179 | ||
180 | ~C_OnFinisher() override { | |
181 | if (con != nullptr) { | |
182 | delete con; | |
183 | con = nullptr; | |
184 | } | |
185 | } | |
186 | ||
187 | void finish(int r) override { | |
188 | fin->queue(con, r); | |
189 | con = nullptr; | |
190 | } | |
191 | }; | |
192 | ||
11fdf7f2 | 193 | class ContextQueue { |
9f95a23c | 194 | std::list<Context *> q; |
11fdf7f2 TL |
195 | std::mutex q_mutex; |
196 | ceph::mutex& mutex; | |
197 | ceph::condition_variable& cond; | |
9f95a23c | 198 | std::atomic_bool q_empty = true; |
11fdf7f2 TL |
199 | public: |
200 | ContextQueue(ceph::mutex& mut, | |
201 | ceph::condition_variable& con) | |
202 | : mutex(mut), cond(con) {} | |
203 | ||
9f95a23c TL |
204 | void queue(std::list<Context *>& ls) { |
205 | bool was_empty = false; | |
11fdf7f2 TL |
206 | { |
207 | std::scoped_lock l(q_mutex); | |
208 | if (q.empty()) { | |
209 | q.swap(ls); | |
9f95a23c | 210 | was_empty = true; |
11fdf7f2 TL |
211 | } else { |
212 | q.insert(q.end(), ls.begin(), ls.end()); | |
213 | } | |
9f95a23c | 214 | q_empty = q.empty(); |
11fdf7f2 TL |
215 | } |
216 | ||
9f95a23c | 217 | if (was_empty) { |
11fdf7f2 TL |
218 | std::scoped_lock l{mutex}; |
219 | cond.notify_all(); | |
220 | } | |
221 | ||
222 | ls.clear(); | |
223 | } | |
224 | ||
9f95a23c | 225 | void move_to(std::list<Context *>& ls) { |
11fdf7f2 TL |
226 | ls.clear(); |
227 | std::scoped_lock l(q_mutex); | |
228 | if (!q.empty()) { | |
229 | q.swap(ls); | |
230 | } | |
9f95a23c | 231 | q_empty = true; |
11fdf7f2 TL |
232 | } |
233 | ||
234 | bool empty() { | |
9f95a23c | 235 | return q_empty; |
11fdf7f2 TL |
236 | } |
237 | }; | |
238 | ||
7c673cae | 239 | #endif |