]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #ifndef CEPH_CONTEXT_H | |
17 | #define CEPH_CONTEXT_H | |
18 | ||
#include "common/dout.h"

#include <boost/function.hpp>
#include <list>
#include <memory>
#include <set>
#include <type_traits>
#include <utility>
#include <vector>

#include "include/assert.h"
#include "include/memory.h"
28 | ||
29 | #define mydout(cct, v) lgeneric_subdout(cct, context, v) | |
30 | ||
/*
 * GenContext - abstract callback class
 *
 * One-shot callback carrying an argument of type T.  Subclasses implement
 * finish(); users call complete(), which runs finish() and then destroys
 * the object with `delete this`.  An instance must therefore be allocated
 * with new and must not be touched once complete() has returned.
 */
template <typename T>
class GenContext {
  // Not copyable: an instance deletes itself in complete().
  GenContext(const GenContext& other) = delete;
  const GenContext& operator=(const GenContext& other) = delete;

 protected:
  // The actual callback body, supplied by the subclass.
  virtual void finish(T t) = 0;

 public:
  GenContext() {}
  virtual ~GenContext() {}  // deleted through the base pointer in complete()

  // Forward the argument to finish(), then self-destruct.
  template <typename C>
  void complete(C &&t) {
    finish(std::forward<C>(t));
    delete this;
  }
};

// unique_ptr alias for GenContext<T>.  Because complete() deletes the
// object itself, release() the pointer before calling complete() on it.
template <typename T>
using GenContextURef = std::unique_ptr<GenContext<T> >;
55 | ||
/*
 * Context - abstract callback class
 *
 * One-shot callback receiving an int result code.  Subclasses implement
 * finish(); users call complete(), whose default implementation runs
 * finish() and then destroys the object with `delete this`.  An instance
 * must therefore be allocated with new and must not be touched once the
 * default complete() has returned.  complete() is virtual, so a subclass
 * may replace that behaviour.
 */
class Context {
  // Not copyable: an instance deletes itself in complete().
  Context(const Context& other) = delete;
  const Context& operator=(const Context& other) = delete;

 protected:
  // The actual callback body, supplied by the subclass.
  virtual void finish(int r) = 0;

 public:
  Context() {}
  virtual ~Context() {}  // deleted through the base pointer in complete()
  virtual void complete(int r) {
    finish(r);
    delete this;
  }
};
74 | ||
75 | /** | |
76 | * Simple context holding a single object | |
77 | */ | |
78 | template<class T> | |
79 | class ContainerContext : public Context { | |
80 | T obj; | |
81 | public: | |
82 | ContainerContext(T &obj) : obj(obj) {} | |
83 | void finish(int r) override {} | |
84 | }; | |
85 | template <typename T> | |
86 | ContainerContext<T> *make_container_context(T &&t) { | |
87 | return new ContainerContext<T>(std::forward<T>(t)); | |
88 | } | |
89 | ||
90 | template <class T> | |
91 | struct Wrapper : public Context { | |
92 | Context *to_run; | |
93 | T val; | |
94 | Wrapper(Context *to_run, T val) : to_run(to_run), val(val) {} | |
95 | void finish(int r) override { | |
96 | if (to_run) | |
97 | to_run->complete(r); | |
98 | } | |
99 | }; | |
100 | struct RunOnDelete { | |
101 | Context *to_run; | |
102 | RunOnDelete(Context *to_run) : to_run(to_run) {} | |
103 | ~RunOnDelete() { | |
104 | if (to_run) | |
105 | to_run->complete(0); | |
106 | } | |
107 | }; | |
108 | typedef ceph::shared_ptr<RunOnDelete> RunOnDeleteRef; | |
109 | ||
110 | template <typename T> | |
111 | struct LambdaContext : public Context { | |
112 | T t; | |
113 | LambdaContext(T &&t) : t(std::forward<T>(t)) {} | |
114 | void finish(int) override { | |
115 | t(); | |
116 | } | |
117 | }; | |
118 | template <typename T> | |
119 | LambdaContext<T> *make_lambda_context(T &&t) { | |
120 | return new LambdaContext<T>(std::move(t)); | |
121 | } | |
122 | ||
123 | template <typename F, typename T> | |
124 | struct LambdaGenContext : GenContext<T> { | |
125 | F f; | |
126 | LambdaGenContext(F &&f) : f(std::forward<F>(f)) {} | |
127 | void finish(T t) override { | |
128 | f(std::forward<T>(t)); | |
129 | } | |
130 | }; | |
131 | template <typename T, typename F> | |
132 | GenContextURef<T> make_gen_lambda_context(F &&f) { | |
133 | return GenContextURef<T>(new LambdaGenContext<F, T>(std::move(f))); | |
134 | } | |
135 | ||
136 | /* | |
137 | * finish and destroy a list of Contexts | |
138 | */ | |
139 | template<class A> | |
140 | inline void finish_contexts(CephContext *cct, std::list<A*>& finished, | |
141 | int result = 0) | |
142 | { | |
143 | if (finished.empty()) | |
144 | return; | |
145 | ||
146 | list<A*> ls; | |
147 | ls.swap(finished); // swap out of place to avoid weird loops | |
148 | ||
149 | if (cct) | |
150 | mydout(cct, 10) << ls.size() << " contexts to finish with " << result << dendl; | |
151 | typename std::list<A*>::iterator it; | |
152 | for (it = ls.begin(); it != ls.end(); it++) { | |
153 | A *c = *it; | |
154 | if (cct) | |
155 | mydout(cct,10) << "---- " << c << dendl; | |
156 | c->complete(result); | |
157 | } | |
158 | } | |
159 | ||
160 | inline void finish_contexts(CephContext *cct, std::vector<Context*>& finished, | |
161 | int result = 0) | |
162 | { | |
163 | if (finished.empty()) | |
164 | return; | |
165 | ||
166 | vector<Context*> ls; | |
167 | ls.swap(finished); // swap out of place to avoid weird loops | |
168 | ||
169 | if (cct) | |
170 | mydout(cct,10) << ls.size() << " contexts to finish with " << result << dendl; | |
171 | for (std::vector<Context*>::iterator it = ls.begin(); | |
172 | it != ls.end(); | |
173 | it++) { | |
174 | Context *c = *it; | |
175 | if (cct) | |
176 | mydout(cct,10) << "---- " << c << dendl; | |
177 | c->complete(result); | |
178 | } | |
179 | } | |
180 | ||
181 | class C_NoopContext : public Context { | |
182 | public: | |
183 | void finish(int r) override { } | |
184 | }; | |
185 | ||
186 | ||
187 | struct C_Lock : public Context { | |
188 | Mutex *lock; | |
189 | Context *fin; | |
190 | C_Lock(Mutex *l, Context *c) : lock(l), fin(c) {} | |
191 | ~C_Lock() override { | |
192 | delete fin; | |
193 | } | |
194 | void finish(int r) override { | |
195 | if (fin) { | |
196 | lock->Lock(); | |
197 | fin->complete(r); | |
198 | fin = NULL; | |
199 | lock->Unlock(); | |
200 | } | |
201 | } | |
202 | }; | |
203 | ||
204 | /* | |
205 | * C_Contexts - set of Contexts | |
206 | * | |
207 | * ContextType must be an ancestor class of ContextInstanceType, or the same class. | |
208 | * ContextInstanceType must be default-constructable. | |
209 | */ | |
210 | template <class ContextType, class ContextInstanceType> | |
211 | class C_ContextsBase : public ContextInstanceType { | |
212 | public: | |
213 | CephContext *cct; | |
214 | std::list<ContextType*> contexts; | |
215 | ||
216 | C_ContextsBase(CephContext *cct_) | |
217 | : cct(cct_) | |
218 | { | |
219 | } | |
224ce89b WB |
220 | ~C_ContextsBase() override { |
221 | for (auto c : contexts) { | |
222 | delete c; | |
223 | } | |
224 | } | |
7c673cae FG |
225 | void add(ContextType* c) { |
226 | contexts.push_back(c); | |
227 | } | |
228 | void take(std::list<ContextType*>& ls) { | |
229 | contexts.splice(contexts.end(), ls); | |
230 | } | |
231 | void complete(int r) override { | |
232 | // Neuter any ContextInstanceType custom complete(), because although | |
233 | // I want to look like it, I don't actually want to run its code. | |
234 | Context::complete(r); | |
235 | } | |
236 | void finish(int r) override { | |
237 | finish_contexts(cct, contexts, r); | |
238 | } | |
239 | bool empty() { return contexts.empty(); } | |
240 | ||
241 | static ContextType *list_to_context(list<ContextType *> &cs) { | |
242 | if (cs.size() == 0) { | |
243 | return 0; | |
244 | } else if (cs.size() == 1) { | |
245 | ContextType *c = cs.front(); | |
246 | cs.clear(); | |
247 | return c; | |
248 | } else { | |
249 | C_ContextsBase<ContextType, ContextInstanceType> *c(new C_ContextsBase<ContextType, ContextInstanceType>(0)); | |
250 | c->take(cs); | |
251 | return c; | |
252 | } | |
253 | } | |
254 | }; | |
255 | ||
256 | typedef C_ContextsBase<Context, Context> C_Contexts; | |
257 | ||
258 | /* | |
259 | * C_Gather | |
260 | * | |
261 | * ContextType must be an ancestor class of ContextInstanceType, or the same class. | |
262 | * ContextInstanceType must be default-constructable. | |
263 | * | |
264 | * NOTE: only the first negative result returned by a sub is passed to the finisher; errors from later subs are dropped. | |
265 | */ | |
266 | template <class ContextType, class ContextInstanceType> | |
267 | class C_GatherBase : public ContextType { | |
268 | private: | |
269 | CephContext *cct; | |
270 | int result; | |
271 | ContextType *onfinish; | |
272 | #ifdef DEBUG_GATHER | |
273 | std::set<ContextType*> waitfor; | |
274 | #endif | |
275 | int sub_created_count; | |
276 | int sub_existing_count; | |
277 | mutable Mutex lock; | |
278 | bool activated; | |
279 | ||
280 | void sub_finish(ContextType* sub, int r) { | |
281 | lock.Lock(); | |
282 | #ifdef DEBUG_GATHER | |
283 | assert(waitfor.count(sub)); | |
284 | waitfor.erase(sub); | |
285 | #endif | |
286 | --sub_existing_count; | |
287 | mydout(cct,10) << "C_GatherBase " << this << ".sub_finish(r=" << r << ") " << sub | |
288 | #ifdef DEBUG_GATHER | |
289 | << " (remaining " << waitfor << ")" | |
290 | #endif | |
291 | << dendl; | |
292 | if (r < 0 && result == 0) | |
293 | result = r; | |
294 | if ((activated == false) || (sub_existing_count != 0)) { | |
295 | lock.Unlock(); | |
296 | return; | |
297 | } | |
298 | lock.Unlock(); | |
299 | delete_me(); | |
300 | } | |
301 | ||
302 | void delete_me() { | |
303 | if (onfinish) { | |
304 | onfinish->complete(result); | |
305 | onfinish = 0; | |
306 | } | |
307 | delete this; | |
308 | } | |
309 | ||
310 | class C_GatherSub : public ContextInstanceType { | |
311 | C_GatherBase *gather; | |
312 | public: | |
313 | C_GatherSub(C_GatherBase *g) : gather(g) {} | |
314 | void complete(int r) override { | |
315 | // Cancel any customized complete() functionality | |
316 | // from the Context subclass we're templated for, | |
317 | // we only want to hit that in onfinish, not at each | |
318 | // sub finish. e.g. MDSInternalContext. | |
319 | Context::complete(r); | |
320 | } | |
321 | void finish(int r) override { | |
322 | gather->sub_finish(this, r); | |
323 | gather = 0; | |
324 | } | |
325 | ~C_GatherSub() override { | |
326 | if (gather) | |
327 | gather->sub_finish(this, 0); | |
328 | } | |
329 | }; | |
330 | ||
331 | public: | |
332 | C_GatherBase(CephContext *cct_, ContextType *onfinish_) | |
333 | : cct(cct_), result(0), onfinish(onfinish_), | |
334 | sub_created_count(0), sub_existing_count(0), | |
335 | lock("C_GatherBase::lock", true, false), //disable lockdep | |
336 | activated(false) | |
337 | { | |
338 | mydout(cct,10) << "C_GatherBase " << this << ".new" << dendl; | |
339 | } | |
340 | ~C_GatherBase() override { | |
341 | mydout(cct,10) << "C_GatherBase " << this << ".delete" << dendl; | |
342 | } | |
343 | void set_finisher(ContextType *onfinish_) { | |
344 | Mutex::Locker l(lock); | |
345 | assert(!onfinish); | |
346 | onfinish = onfinish_; | |
347 | } | |
348 | void activate() { | |
349 | lock.Lock(); | |
350 | assert(activated == false); | |
351 | activated = true; | |
352 | if (sub_existing_count != 0) { | |
353 | lock.Unlock(); | |
354 | return; | |
355 | } | |
356 | lock.Unlock(); | |
357 | delete_me(); | |
358 | } | |
359 | ContextType *new_sub() { | |
360 | Mutex::Locker l(lock); | |
361 | assert(activated == false); | |
362 | sub_created_count++; | |
363 | sub_existing_count++; | |
364 | ContextType *s = new C_GatherSub(this); | |
365 | #ifdef DEBUG_GATHER | |
366 | waitfor.insert(s); | |
367 | #endif | |
368 | mydout(cct,10) << "C_GatherBase " << this << ".new_sub is " << sub_created_count << " " << s << dendl; | |
369 | return s; | |
370 | } | |
371 | void finish(int r) override { | |
372 | ceph_abort(); // nobody should ever call me. | |
373 | } | |
374 | ||
375 | inline int get_sub_existing_count() const { | |
376 | Mutex::Locker l(lock); | |
377 | return sub_existing_count; | |
378 | } | |
379 | ||
380 | inline int get_sub_created_count() const { | |
381 | Mutex::Locker l(lock); | |
382 | return sub_created_count; | |
383 | } | |
384 | }; | |
385 | ||
386 | /* | |
387 | * The C_GatherBuilder remembers each C_Context created by | |
388 | * C_GatherBuilder.new_sub() in a C_Gather. When a C_Context created | |
389 | * by new_sub() is complete(), C_Gather forgets about it. When | |
390 | * C_GatherBuilder notices that there are no C_Context left in | |
391 | * C_Gather, it calls complete() on the C_Context provided as the | |
392 | * second argument of the constructor (finisher). | |
393 | * | |
394 | * How to use C_GatherBuilder: | |
395 | * | |
396 | * 1. Create a C_GatherBuilder on the stack | |
397 | * 2. Call gather_bld.new_sub() as many times as you want to create new subs | |
398 | * It is safe to call this 0 times, or 100, or anything in between. | |
399 | * 3. If you didn't supply a finisher in the C_GatherBuilder constructor, | |
400 | * set one with gather_bld.set_finisher(my_finisher) | |
401 | * 4. Call gather_bld.activate() | |
402 | * | |
403 | * Example: | |
404 | * | |
405 | * C_SaferCond all_done; | |
406 | * C_GatherBuilder gb(g_ceph_context, &all_done); | |
407 | * j.submit_entry(1, first, 0, gb.new_sub()); // add a C_Context to C_Gather | |
408 | * j.submit_entry(2, first, 0, gb.new_sub()); // add a C_Context to C_Gather | |
409 | * gb.activate(); // consume C_Context as soon as they complete() | |
410 | * all_done.wait(); // all_done is complete() after all new_sub() are complete() | |
411 | * | |
412 | * The finisher may be called at any point after step 4, including immediately | |
413 | * from the activate() function. | |
414 | * The finisher will never be called before activate(). | |
415 | * | |
416 | * Note: Currently, subs must be manually freed by the caller (for some reason.) | |
417 | */ | |
418 | template <class ContextType, class GatherType> | |
419 | class C_GatherBuilderBase | |
420 | { | |
421 | public: | |
422 | C_GatherBuilderBase(CephContext *cct_) | |
423 | : cct(cct_), c_gather(NULL), finisher(NULL), activated(false) | |
424 | { | |
425 | } | |
426 | C_GatherBuilderBase(CephContext *cct_, ContextType *finisher_) | |
427 | : cct(cct_), c_gather(NULL), finisher(finisher_), activated(false) | |
428 | { | |
429 | } | |
430 | ~C_GatherBuilderBase() { | |
431 | if (c_gather) { | |
432 | assert(activated); // Don't forget to activate your C_Gather! | |
433 | } | |
434 | else { | |
435 | delete finisher; | |
436 | } | |
437 | } | |
438 | ContextType *new_sub() { | |
439 | if (!c_gather) { | |
440 | c_gather = new GatherType(cct, finisher); | |
441 | } | |
442 | return c_gather->new_sub(); | |
443 | } | |
444 | void activate() { | |
445 | if (!c_gather) | |
446 | return; | |
447 | assert(finisher != NULL); | |
448 | activated = true; | |
449 | c_gather->activate(); | |
450 | } | |
451 | void set_finisher(ContextType *finisher_) { | |
452 | finisher = finisher_; | |
453 | if (c_gather) | |
454 | c_gather->set_finisher(finisher); | |
455 | } | |
456 | GatherType *get() const { | |
457 | return c_gather; | |
458 | } | |
459 | bool has_subs() const { | |
460 | return (c_gather != NULL); | |
461 | } | |
462 | int num_subs_created() { | |
463 | assert(!activated); | |
464 | if (c_gather == NULL) | |
465 | return 0; | |
466 | return c_gather->get_sub_created_count(); | |
467 | } | |
468 | int num_subs_remaining() { | |
469 | assert(!activated); | |
470 | if (c_gather == NULL) | |
471 | return 0; | |
472 | return c_gather->get_sub_existing_count(); | |
473 | } | |
474 | ||
475 | private: | |
476 | CephContext *cct; | |
477 | GatherType *c_gather; | |
478 | ContextType *finisher; | |
479 | bool activated; | |
480 | }; | |
481 | ||
482 | typedef C_GatherBase<Context, Context> C_Gather; | |
483 | typedef C_GatherBuilderBase<Context, C_Gather > C_GatherBuilder; | |
484 | ||
485 | class FunctionContext : public Context { | |
486 | public: | |
487 | FunctionContext(boost::function<void(int)> &&callback) | |
488 | : m_callback(std::move(callback)) | |
489 | { | |
490 | } | |
491 | ||
492 | void finish(int r) override { | |
493 | m_callback(r); | |
494 | } | |
495 | private: | |
496 | boost::function<void(int)> m_callback; | |
497 | }; | |
498 | ||
499 | #undef mydout | |
500 | ||
501 | #endif |