]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | #ifndef CEPH_FINISHER_H | |
16 | #define CEPH_FINISHER_H | |
17 | ||
11fdf7f2 TL |
18 | #include "include/Context.h" |
19 | #include "common/Thread.h" | |
20 | #include "common/ceph_mutex.h" | |
7c673cae | 21 | #include "common/perf_counters.h" |
11fdf7f2 | 22 | #include "common/Cond.h" |
7c673cae FG |
23 | |
24 | class CephContext; | |
25 | ||
/// Performance counter indices used by named Finisher instances.
/// l_finisher_first and l_finisher_last are the bounds handed to
/// PerfCountersBuilder; the entries between them are the real counters.
enum {
  l_finisher_first = 997082,  ///< lower bound of the index range
  l_finisher_queue_len,       ///< registered under the name "queue_len"
  l_finisher_complete_lat,    ///< registered under the name "complete_latency"
  l_finisher_last,            ///< upper bound of the index range
};
33 | ||
34 | /** @brief Asynchronous cleanup class. | |
35 | * Finisher asynchronously completes Contexts, which are simple classes | |
36 | * representing callbacks, in a dedicated worker thread. Enqueuing | |
37 | * contexts to complete is thread-safe. | |
38 | */ | |
39 | class Finisher { | |
40 | CephContext *cct; | |
11fdf7f2 TL |
41 | ceph::mutex finisher_lock; ///< Protects access to queues and finisher_running. |
42 | ceph::condition_variable finisher_cond; ///< Signaled when there is something to process. | |
43 | ceph::condition_variable finisher_empty_cond; ///< Signaled when the finisher has nothing more to process. | |
7c673cae FG |
44 | bool finisher_stop; ///< Set when the finisher should stop. |
45 | bool finisher_running; ///< True when the finisher is currently executing contexts. | |
46 | bool finisher_empty_wait; ///< True mean someone wait finisher empty. | |
11fdf7f2 | 47 | |
7c673cae | 48 | /// Queue for contexts for which complete(0) will be called. |
11fdf7f2 | 49 | vector<pair<Context*,int>> finisher_queue; |
7c673cae FG |
50 | |
51 | string thread_name; | |
52 | ||
7c673cae FG |
53 | /// Performance counter for the finisher's queue length. |
54 | /// Only active for named finishers. | |
55 | PerfCounters *logger; | |
56 | ||
57 | void *finisher_thread_entry(); | |
58 | ||
59 | struct FinisherThread : public Thread { | |
60 | Finisher *fin; | |
61 | explicit FinisherThread(Finisher *f) : fin(f) {} | |
11fdf7f2 | 62 | void* entry() override { return fin->finisher_thread_entry(); } |
7c673cae FG |
63 | } finisher_thread; |
64 | ||
65 | public: | |
66 | /// Add a context to complete, optionally specifying a parameter for the complete function. | |
67 | void queue(Context *c, int r = 0) { | |
11fdf7f2 | 68 | std::unique_lock ul(finisher_lock); |
7c673cae | 69 | if (finisher_queue.empty()) { |
11fdf7f2 | 70 | finisher_cond.notify_all(); |
7c673cae | 71 | } |
11fdf7f2 | 72 | finisher_queue.push_back(make_pair(c, r)); |
7c673cae FG |
73 | if (logger) |
74 | logger->inc(l_finisher_queue_len); | |
7c673cae | 75 | } |
11fdf7f2 TL |
76 | |
77 | void queue(list<Context*>& ls) { | |
78 | { | |
79 | std::unique_lock ul(finisher_lock); | |
80 | if (finisher_queue.empty()) { | |
81 | finisher_cond.notify_all(); | |
82 | } | |
83 | for (auto i : ls) { | |
84 | finisher_queue.push_back(make_pair(i, 0)); | |
85 | } | |
86 | if (logger) | |
87 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 88 | } |
7c673cae FG |
89 | ls.clear(); |
90 | } | |
91 | void queue(deque<Context*>& ls) { | |
11fdf7f2 TL |
92 | { |
93 | std::unique_lock ul(finisher_lock); | |
94 | if (finisher_queue.empty()) { | |
95 | finisher_cond.notify_all(); | |
96 | } | |
97 | for (auto i : ls) { | |
98 | finisher_queue.push_back(make_pair(i, 0)); | |
99 | } | |
100 | if (logger) | |
101 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 102 | } |
7c673cae FG |
103 | ls.clear(); |
104 | } | |
11fdf7f2 TL |
105 | void queue(vector<Context*>& ls) { |
106 | { | |
107 | std::unique_lock ul(finisher_lock); | |
108 | if (finisher_queue.empty()) { | |
109 | finisher_cond.notify_all(); | |
110 | } | |
111 | for (auto i : ls) { | |
112 | finisher_queue.push_back(make_pair(i, 0)); | |
113 | } | |
114 | if (logger) | |
115 | logger->inc(l_finisher_queue_len, ls.size()); | |
7c673cae | 116 | } |
7c673cae FG |
117 | ls.clear(); |
118 | } | |
119 | ||
120 | /// Start the worker thread. | |
121 | void start(); | |
122 | ||
123 | /** @brief Stop the worker thread. | |
124 | * | |
125 | * Does not wait until all outstanding contexts are completed. | |
126 | * To ensure that everything finishes, you should first shut down | |
127 | * all sources that can add contexts to this finisher and call | |
128 | * wait_for_empty() before calling stop(). */ | |
129 | void stop(); | |
130 | ||
131 | /** @brief Blocks until the finisher has nothing left to process. | |
132 | * This function will also return when a concurrent call to stop() | |
133 | * finishes, but this class should never be used in this way. */ | |
134 | void wait_for_empty(); | |
135 | ||
136 | /// Construct an anonymous Finisher. | |
137 | /// Anonymous finishers do not log their queue length. | |
138 | explicit Finisher(CephContext *cct_) : | |
11fdf7f2 | 139 | cct(cct_), finisher_lock(ceph::make_mutex("Finisher::finisher_lock")), |
7c673cae FG |
140 | finisher_stop(false), finisher_running(false), finisher_empty_wait(false), |
141 | thread_name("fn_anonymous"), logger(0), | |
142 | finisher_thread(this) {} | |
143 | ||
144 | /// Construct a named Finisher that logs its queue length. | |
145 | Finisher(CephContext *cct_, string name, string tn) : | |
11fdf7f2 | 146 | cct(cct_), finisher_lock(ceph::make_mutex("Finisher::" + name)), |
7c673cae FG |
147 | finisher_stop(false), finisher_running(false), finisher_empty_wait(false), |
148 | thread_name(tn), logger(0), | |
149 | finisher_thread(this) { | |
150 | PerfCountersBuilder b(cct, string("finisher-") + name, | |
151 | l_finisher_first, l_finisher_last); | |
152 | b.add_u64(l_finisher_queue_len, "queue_len"); | |
153 | b.add_time_avg(l_finisher_complete_lat, "complete_latency"); | |
154 | logger = b.create_perf_counters(); | |
155 | cct->get_perfcounters_collection()->add(logger); | |
156 | logger->set(l_finisher_queue_len, 0); | |
157 | logger->set(l_finisher_complete_lat, 0); | |
158 | } | |
159 | ||
160 | ~Finisher() { | |
161 | if (logger && cct) { | |
162 | cct->get_perfcounters_collection()->remove(logger); | |
163 | delete logger; | |
164 | } | |
165 | } | |
166 | }; | |
167 | ||
168 | /// Context that is completed asynchronously on the supplied finisher. | |
169 | class C_OnFinisher : public Context { | |
170 | Context *con; | |
171 | Finisher *fin; | |
172 | public: | |
173 | C_OnFinisher(Context *c, Finisher *f) : con(c), fin(f) { | |
11fdf7f2 TL |
174 | ceph_assert(fin != NULL); |
175 | ceph_assert(con != NULL); | |
7c673cae FG |
176 | } |
177 | ||
178 | ~C_OnFinisher() override { | |
179 | if (con != nullptr) { | |
180 | delete con; | |
181 | con = nullptr; | |
182 | } | |
183 | } | |
184 | ||
185 | void finish(int r) override { | |
186 | fin->queue(con, r); | |
187 | con = nullptr; | |
188 | } | |
189 | }; | |
190 | ||
11fdf7f2 TL |
191 | class ContextQueue { |
192 | list<Context *> q; | |
193 | std::mutex q_mutex; | |
194 | ceph::mutex& mutex; | |
195 | ceph::condition_variable& cond; | |
196 | public: | |
197 | ContextQueue(ceph::mutex& mut, | |
198 | ceph::condition_variable& con) | |
199 | : mutex(mut), cond(con) {} | |
200 | ||
201 | void queue(list<Context *>& ls) { | |
202 | bool empty = false; | |
203 | { | |
204 | std::scoped_lock l(q_mutex); | |
205 | if (q.empty()) { | |
206 | q.swap(ls); | |
207 | empty = true; | |
208 | } else { | |
209 | q.insert(q.end(), ls.begin(), ls.end()); | |
210 | } | |
211 | } | |
212 | ||
213 | if (empty) { | |
214 | std::scoped_lock l{mutex}; | |
215 | cond.notify_all(); | |
216 | } | |
217 | ||
218 | ls.clear(); | |
219 | } | |
220 | ||
221 | void swap(list<Context *>& ls) { | |
222 | ls.clear(); | |
223 | std::scoped_lock l(q_mutex); | |
224 | if (!q.empty()) { | |
225 | q.swap(ls); | |
226 | } | |
227 | } | |
228 | ||
229 | bool empty() { | |
230 | std::scoped_lock l(q_mutex); | |
231 | return q.empty(); | |
232 | } | |
233 | }; | |
234 | ||
7c673cae | 235 | #endif |