]> git.proxmox.com Git - ceph.git/blob - ceph/src/include/Context.h
bf992d0a2bf9bcfcfbaaedc2b14eae2411a1c93c
[ceph.git] / ceph / src / include / Context.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16 #ifndef CEPH_CONTEXT_H
17 #define CEPH_CONTEXT_H
18
19 #include "common/dout.h"
20
21 #include <boost/function.hpp>
22 #include <list>
23 #include <set>
24 #include <memory>
25
26 #include "include/assert.h"
27 #include "include/memory.h"
28
29 #define mydout(cct, v) lgeneric_subdout(cct, context, v)
30
31 /*
32 * GenContext - abstract callback class
33 */
34 template <typename T>
35 class GenContext {
36 GenContext(const GenContext& other);
37 const GenContext& operator=(const GenContext& other);
38
39 protected:
40 virtual void finish(T t) = 0;
41
42 public:
43 GenContext() {}
44 virtual ~GenContext() {} // we want a virtual destructor!!!
45
46 template <typename C>
47 void complete(C &&t) {
48 finish(std::forward<C>(t));
49 delete this;
50 }
51 };
52
53 template <typename T>
54 using GenContextURef = std::unique_ptr<GenContext<T> >;
55
56 /*
57 * Context - abstract callback class
58 */
59 class Context {
60 Context(const Context& other);
61 const Context& operator=(const Context& other);
62
63 protected:
64 virtual void finish(int r) = 0;
65
66 public:
67 Context() {}
68 virtual ~Context() {} // we want a virtual destructor!!!
69 virtual void complete(int r) {
70 finish(r);
71 delete this;
72 }
73 };
74
75 /**
76 * Simple context holding a single object
77 */
78 template<class T>
79 class ContainerContext : public Context {
80 T obj;
81 public:
82 ContainerContext(T &obj) : obj(obj) {}
83 void finish(int r) override {}
84 };
85 template <typename T>
86 ContainerContext<T> *make_container_context(T &&t) {
87 return new ContainerContext<T>(std::forward<T>(t));
88 }
89
90 template <class T>
91 struct Wrapper : public Context {
92 Context *to_run;
93 T val;
94 Wrapper(Context *to_run, T val) : to_run(to_run), val(val) {}
95 void finish(int r) override {
96 if (to_run)
97 to_run->complete(r);
98 }
99 };
100 struct RunOnDelete {
101 Context *to_run;
102 RunOnDelete(Context *to_run) : to_run(to_run) {}
103 ~RunOnDelete() {
104 if (to_run)
105 to_run->complete(0);
106 }
107 };
108 typedef ceph::shared_ptr<RunOnDelete> RunOnDeleteRef;
109
110 template <typename T>
111 struct LambdaContext : public Context {
112 T t;
113 LambdaContext(T &&t) : t(std::forward<T>(t)) {}
114 void finish(int) override {
115 t();
116 }
117 };
118 template <typename T>
119 LambdaContext<T> *make_lambda_context(T &&t) {
120 return new LambdaContext<T>(std::move(t));
121 }
122
123 template <typename F, typename T>
124 struct LambdaGenContext : GenContext<T> {
125 F f;
126 LambdaGenContext(F &&f) : f(std::forward<F>(f)) {}
127 void finish(T t) override {
128 f(std::forward<T>(t));
129 }
130 };
131 template <typename T, typename F>
132 GenContextURef<T> make_gen_lambda_context(F &&f) {
133 return GenContextURef<T>(new LambdaGenContext<F, T>(std::move(f)));
134 }
135
136 /*
137 * finish and destroy a list of Contexts
138 */
139 template<class A>
140 inline void finish_contexts(CephContext *cct, std::list<A*>& finished,
141 int result = 0)
142 {
143 if (finished.empty())
144 return;
145
146 list<A*> ls;
147 ls.swap(finished); // swap out of place to avoid weird loops
148
149 if (cct)
150 mydout(cct, 10) << ls.size() << " contexts to finish with " << result << dendl;
151 typename std::list<A*>::iterator it;
152 for (it = ls.begin(); it != ls.end(); it++) {
153 A *c = *it;
154 if (cct)
155 mydout(cct,10) << "---- " << c << dendl;
156 c->complete(result);
157 }
158 }
159
160 inline void finish_contexts(CephContext *cct, std::vector<Context*>& finished,
161 int result = 0)
162 {
163 if (finished.empty())
164 return;
165
166 vector<Context*> ls;
167 ls.swap(finished); // swap out of place to avoid weird loops
168
169 if (cct)
170 mydout(cct,10) << ls.size() << " contexts to finish with " << result << dendl;
171 for (std::vector<Context*>::iterator it = ls.begin();
172 it != ls.end();
173 it++) {
174 Context *c = *it;
175 if (cct)
176 mydout(cct,10) << "---- " << c << dendl;
177 c->complete(result);
178 }
179 }
180
181 class C_NoopContext : public Context {
182 public:
183 void finish(int r) override { }
184 };
185
186
187 struct C_Lock : public Context {
188 Mutex *lock;
189 Context *fin;
190 C_Lock(Mutex *l, Context *c) : lock(l), fin(c) {}
191 ~C_Lock() override {
192 delete fin;
193 }
194 void finish(int r) override {
195 if (fin) {
196 lock->Lock();
197 fin->complete(r);
198 fin = NULL;
199 lock->Unlock();
200 }
201 }
202 };
203
204 /*
205 * C_Contexts - set of Contexts
206 *
207 * ContextType must be an ancestor class of ContextInstanceType, or the same class.
208 * ContextInstanceType must be default-constructable.
209 */
210 template <class ContextType, class ContextInstanceType>
211 class C_ContextsBase : public ContextInstanceType {
212 public:
213 CephContext *cct;
214 std::list<ContextType*> contexts;
215
216 C_ContextsBase(CephContext *cct_)
217 : cct(cct_)
218 {
219 }
220
221 void add(ContextType* c) {
222 contexts.push_back(c);
223 }
224 void take(std::list<ContextType*>& ls) {
225 contexts.splice(contexts.end(), ls);
226 }
227 void complete(int r) override {
228 // Neuter any ContextInstanceType custom complete(), because although
229 // I want to look like it, I don't actually want to run its code.
230 Context::complete(r);
231 }
232 void finish(int r) override {
233 finish_contexts(cct, contexts, r);
234 }
235 bool empty() { return contexts.empty(); }
236
237 static ContextType *list_to_context(list<ContextType *> &cs) {
238 if (cs.size() == 0) {
239 return 0;
240 } else if (cs.size() == 1) {
241 ContextType *c = cs.front();
242 cs.clear();
243 return c;
244 } else {
245 C_ContextsBase<ContextType, ContextInstanceType> *c(new C_ContextsBase<ContextType, ContextInstanceType>(0));
246 c->take(cs);
247 return c;
248 }
249 }
250 };
251
252 typedef C_ContextsBase<Context, Context> C_Contexts;
253
254 /*
255 * C_Gather
256 *
257 * ContextType must be an ancestor class of ContextInstanceType, or the same class.
258 * ContextInstanceType must be default-constructable.
259 *
260 * BUG:? only reports error from last sub to have an error return
261 */
262 template <class ContextType, class ContextInstanceType>
263 class C_GatherBase : public ContextType {
264 private:
265 CephContext *cct;
266 int result;
267 ContextType *onfinish;
268 #ifdef DEBUG_GATHER
269 std::set<ContextType*> waitfor;
270 #endif
271 int sub_created_count;
272 int sub_existing_count;
273 mutable Mutex lock;
274 bool activated;
275
276 void sub_finish(ContextType* sub, int r) {
277 lock.Lock();
278 #ifdef DEBUG_GATHER
279 assert(waitfor.count(sub));
280 waitfor.erase(sub);
281 #endif
282 --sub_existing_count;
283 mydout(cct,10) << "C_GatherBase " << this << ".sub_finish(r=" << r << ") " << sub
284 #ifdef DEBUG_GATHER
285 << " (remaining " << waitfor << ")"
286 #endif
287 << dendl;
288 if (r < 0 && result == 0)
289 result = r;
290 if ((activated == false) || (sub_existing_count != 0)) {
291 lock.Unlock();
292 return;
293 }
294 lock.Unlock();
295 delete_me();
296 }
297
298 void delete_me() {
299 if (onfinish) {
300 onfinish->complete(result);
301 onfinish = 0;
302 }
303 delete this;
304 }
305
306 class C_GatherSub : public ContextInstanceType {
307 C_GatherBase *gather;
308 public:
309 C_GatherSub(C_GatherBase *g) : gather(g) {}
310 void complete(int r) override {
311 // Cancel any customized complete() functionality
312 // from the Context subclass we're templated for,
313 // we only want to hit that in onfinish, not at each
314 // sub finish. e.g. MDSInternalContext.
315 Context::complete(r);
316 }
317 void finish(int r) override {
318 gather->sub_finish(this, r);
319 gather = 0;
320 }
321 ~C_GatherSub() override {
322 if (gather)
323 gather->sub_finish(this, 0);
324 }
325 };
326
327 public:
328 C_GatherBase(CephContext *cct_, ContextType *onfinish_)
329 : cct(cct_), result(0), onfinish(onfinish_),
330 sub_created_count(0), sub_existing_count(0),
331 lock("C_GatherBase::lock", true, false), //disable lockdep
332 activated(false)
333 {
334 mydout(cct,10) << "C_GatherBase " << this << ".new" << dendl;
335 }
336 ~C_GatherBase() override {
337 mydout(cct,10) << "C_GatherBase " << this << ".delete" << dendl;
338 }
339 void set_finisher(ContextType *onfinish_) {
340 Mutex::Locker l(lock);
341 assert(!onfinish);
342 onfinish = onfinish_;
343 }
344 void activate() {
345 lock.Lock();
346 assert(activated == false);
347 activated = true;
348 if (sub_existing_count != 0) {
349 lock.Unlock();
350 return;
351 }
352 lock.Unlock();
353 delete_me();
354 }
355 ContextType *new_sub() {
356 Mutex::Locker l(lock);
357 assert(activated == false);
358 sub_created_count++;
359 sub_existing_count++;
360 ContextType *s = new C_GatherSub(this);
361 #ifdef DEBUG_GATHER
362 waitfor.insert(s);
363 #endif
364 mydout(cct,10) << "C_GatherBase " << this << ".new_sub is " << sub_created_count << " " << s << dendl;
365 return s;
366 }
367 void finish(int r) override {
368 ceph_abort(); // nobody should ever call me.
369 }
370
371 inline int get_sub_existing_count() const {
372 Mutex::Locker l(lock);
373 return sub_existing_count;
374 }
375
376 inline int get_sub_created_count() const {
377 Mutex::Locker l(lock);
378 return sub_created_count;
379 }
380 };
381
382 /*
383 * The C_GatherBuilder remembers each C_Context created by
384 * C_GatherBuilder.new_sub() in a C_Gather. When a C_Context created
385 * by new_sub() is complete(), C_Gather forgets about it. When
386 * C_GatherBuilder notices that there are no C_Context left in
387 * C_Gather, it calls complete() on the C_Context provided as the
388 * second argument of the constructor (finisher).
389 *
390 * How to use C_GatherBuilder:
391 *
392 * 1. Create a C_GatherBuilder on the stack
393 * 2. Call gather_bld.new_sub() as many times as you want to create new subs
394 * It is safe to call this 0 times, or 100, or anything in between.
395 * 3. If you didn't supply a finisher in the C_GatherBuilder constructor,
396 * set one with gather_bld.set_finisher(my_finisher)
397 * 4. Call gather_bld.activate()
398 *
399 * Example:
400 *
401 * C_SaferCond all_done;
402 * C_GatherBuilder gb(g_ceph_context, all_done);
403 * j.submit_entry(1, first, 0, gb.new_sub()); // add a C_Context to C_Gather
404 * j.submit_entry(2, first, 0, gb.new_sub()); // add a C_Context to C_Gather
405 * gb.activate(); // consume C_Context as soon as they complete()
406 * all_done.wait(); // all_done is complete() after all new_sub() are complete()
407 *
408 * The finisher may be called at any point after step 4, including immediately
409 * from the activate() function.
410 * The finisher will never be called before activate().
411 *
412 * Note: Currently, subs must be manually freed by the caller (for some reason.)
413 */
414 template <class ContextType, class GatherType>
415 class C_GatherBuilderBase
416 {
417 public:
418 C_GatherBuilderBase(CephContext *cct_)
419 : cct(cct_), c_gather(NULL), finisher(NULL), activated(false)
420 {
421 }
422 C_GatherBuilderBase(CephContext *cct_, ContextType *finisher_)
423 : cct(cct_), c_gather(NULL), finisher(finisher_), activated(false)
424 {
425 }
426 ~C_GatherBuilderBase() {
427 if (c_gather) {
428 assert(activated); // Don't forget to activate your C_Gather!
429 }
430 else {
431 delete finisher;
432 }
433 }
434 ContextType *new_sub() {
435 if (!c_gather) {
436 c_gather = new GatherType(cct, finisher);
437 }
438 return c_gather->new_sub();
439 }
440 void activate() {
441 if (!c_gather)
442 return;
443 assert(finisher != NULL);
444 activated = true;
445 c_gather->activate();
446 }
447 void set_finisher(ContextType *finisher_) {
448 finisher = finisher_;
449 if (c_gather)
450 c_gather->set_finisher(finisher);
451 }
452 GatherType *get() const {
453 return c_gather;
454 }
455 bool has_subs() const {
456 return (c_gather != NULL);
457 }
458 int num_subs_created() {
459 assert(!activated);
460 if (c_gather == NULL)
461 return 0;
462 return c_gather->get_sub_created_count();
463 }
464 int num_subs_remaining() {
465 assert(!activated);
466 if (c_gather == NULL)
467 return 0;
468 return c_gather->get_sub_existing_count();
469 }
470
471 private:
472 CephContext *cct;
473 GatherType *c_gather;
474 ContextType *finisher;
475 bool activated;
476 };
477
478 typedef C_GatherBase<Context, Context> C_Gather;
479 typedef C_GatherBuilderBase<Context, C_Gather > C_GatherBuilder;
480
481 class FunctionContext : public Context {
482 public:
483 FunctionContext(boost::function<void(int)> &&callback)
484 : m_callback(std::move(callback))
485 {
486 }
487
488 void finish(int r) override {
489 m_callback(r);
490 }
491 private:
492 boost::function<void(int)> m_callback;
493 };
494
495 #undef mydout
496
497 #endif