]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | #ifndef CEPH_RGW_COROUTINE_H |
2 | #define CEPH_RGW_COROUTINE_H | |
3 | ||
4 | #ifdef _ASSERT_H | |
5 | #define NEED_ASSERT_H | |
6 | #pragma push_macro("_ASSERT_H") | |
7 | #endif | |
8 | ||
7c673cae FG |
9 | #include <boost/intrusive_ptr.hpp> |
10 | ||
11 | #ifdef NEED_ASSERT_H | |
12 | #pragma pop_macro("_ASSERT_H") | |
13 | #endif | |
14 | ||
15 | #include "include/utime.h" | |
16 | #include "common/RefCountedObj.h" | |
17 | #include "common/debug.h" | |
18 | #include "common/Timer.h" | |
19 | #include "common/admin_socket.h" | |
20 | ||
21 | #include "rgw_common.h" | |
31f18b77 | 22 | #include <boost/asio/coroutine.hpp> |
7c673cae FG |
23 | |
24 | #include <atomic> | |
25 | ||
// Default value for RGWCoroutinesManager::ops_window (assigned in its
// constructor).
#define RGW_ASYNC_OPS_MGR_WINDOW 100

// Forward declarations for classes that reference each other below.
class RGWCoroutinesStack;
class RGWCoroutinesManager;
class RGWAioCompletionNotifier;
31 | ||
/*
 * RGWCompletionManager: reference-counted hub through which asynchronous
 * completions reach the coroutine machinery. RGWAioCompletionNotifier::cb()
 * delivers a completion via complete(); consumers retrieve the opaque
 * user_info pointers with get_next() / try_get_next().
 * NOTE(review): method bodies are out of line; descriptions below that are
 * marked "presumably" are inferred from names -- confirm in the .cc file.
 */
class RGWCompletionManager : public RefCountedObject {
  CephContext *cct;
  // presumably: user_info pointers of finished requests not yet handed out
  // by get_next()/try_get_next()
  list<void *> complete_reqs;
  using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
  // notifiers registered through register_completion_notifier()
  set<NotifierRef> cns;

  // NOTE(review): members are initialized in declaration order; do not
  // reorder lock/cond/timer without checking the out-of-line constructor.
  Mutex lock;
  Cond cond;

  SafeTimer timer;

  // set by go_down(); atomic, presumably so it can be read without `lock`
  std::atomic<bool> going_down = { false };

  // presumably maps a waiter's opaque key to the user_info to complete
  // (see wait_interval()/wakeup()) -- TODO confirm
  map<void *, void *> waiters;

  class WaitContext;

protected:
  // underscore variants; presumably expect `lock` to be held by the caller
  void _wakeup(void *opaque);
  void _complete(RGWAioCompletionNotifier *cn, void *user_info);
public:
  RGWCompletionManager(CephContext *_cct);
  ~RGWCompletionManager() override;

  // invoked from RGWAioCompletionNotifier::cb() when its aio finishes
  void complete(RGWAioCompletionNotifier *cn, void *user_info);
  // fetch the next completed user_info (int result presumably an error code)
  int get_next(void **user_info);
  // non-blocking variant; presumably returns false when nothing is ready
  bool try_get_next(void **user_info);

  // RGWCoroutinesManager::stop() calls this at most once
  void go_down();

  /*
   * wait for interval length to complete user_info
   */
  void wait_interval(void *opaque, const utime_t& interval, void *user_info);
  void wakeup(void *opaque);

  void register_completion_notifier(RGWAioCompletionNotifier *cn);
  void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
};
71 | ||
72 | /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ | |
73 | class RGWAioCompletionNotifier : public RefCountedObject { | |
74 | librados::AioCompletion *c; | |
75 | RGWCompletionManager *completion_mgr; | |
76 | void *user_data; | |
77 | Mutex lock; | |
78 | bool registered; | |
79 | ||
80 | public: | |
81 | RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data); | |
82 | ~RGWAioCompletionNotifier() override { | |
83 | c->release(); | |
84 | lock.Lock(); | |
85 | bool need_unregister = registered; | |
86 | if (registered) { | |
87 | completion_mgr->get(); | |
88 | } | |
89 | registered = false; | |
90 | lock.Unlock(); | |
91 | if (need_unregister) { | |
92 | completion_mgr->unregister_completion_notifier(this); | |
93 | completion_mgr->put(); | |
94 | } | |
95 | } | |
96 | ||
97 | librados::AioCompletion *completion() { | |
98 | return c; | |
99 | } | |
100 | ||
101 | void unregister() { | |
102 | Mutex::Locker l(lock); | |
103 | if (!registered) { | |
104 | return; | |
105 | } | |
106 | registered = false; | |
107 | } | |
108 | ||
109 | void cb() { | |
110 | lock.Lock(); | |
111 | if (!registered) { | |
112 | lock.Unlock(); | |
113 | put(); | |
114 | return; | |
115 | } | |
116 | completion_mgr->get(); | |
117 | registered = false; | |
118 | lock.Unlock(); | |
119 | completion_mgr->complete(this, user_data); | |
120 | completion_mgr->put(); | |
121 | put(); | |
122 | } | |
123 | }; | |
124 | ||
125 | struct RGWCoroutinesEnv { | |
126 | uint64_t run_context; | |
127 | RGWCoroutinesManager *manager; | |
128 | list<RGWCoroutinesStack *> *scheduled_stacks; | |
129 | RGWCoroutinesStack *stack; | |
130 | ||
131 | RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {} | |
132 | }; | |
133 | ||
// Values stored in RGWCoroutine::state. Both negative values count as
// "finished" (see RGWCoroutine::is_done()).
enum RGWCoroutineState {
  RGWCoroutine_Error = -2, // finished with an error (set_cr_error())
  RGWCoroutine_Done = -1,  // finished successfully (set_cr_done())
  RGWCoroutine_Run = 0,    // still running; the constructor's initial state
};
139 | ||
140 | struct rgw_spawned_stacks { | |
141 | vector<RGWCoroutinesStack *> entries; | |
142 | ||
143 | rgw_spawned_stacks() {} | |
144 | ||
145 | void add_pending(RGWCoroutinesStack *s) { | |
146 | entries.push_back(s); | |
147 | } | |
148 | ||
149 | void inherit(rgw_spawned_stacks *source) { | |
150 | for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin(); | |
151 | iter != source->entries.end(); ++iter) { | |
152 | add_pending(*iter); | |
153 | } | |
154 | source->entries.clear(); | |
155 | } | |
156 | }; | |
157 | ||
158 | ||
159 | ||
160 | class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { | |
161 | friend class RGWCoroutinesStack; | |
162 | ||
163 | struct StatusItem { | |
164 | utime_t timestamp; | |
165 | string status; | |
166 | ||
167 | StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {} | |
168 | ||
169 | void dump(Formatter *f) const; | |
170 | }; | |
171 | ||
172 | #define MAX_COROUTINE_HISTORY 10 | |
173 | ||
174 | struct Status { | |
175 | CephContext *cct; | |
176 | RWLock lock; | |
177 | int max_history; | |
178 | ||
179 | utime_t timestamp; | |
180 | stringstream status; | |
181 | ||
182 | Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {} | |
183 | ||
184 | deque<StatusItem> history; | |
185 | ||
186 | stringstream& set_status(); | |
187 | } status; | |
188 | ||
189 | stringstream description; | |
190 | ||
191 | protected: | |
192 | bool _yield_ret; | |
193 | boost::asio::coroutine drain_cr; | |
194 | ||
195 | CephContext *cct; | |
196 | ||
197 | RGWCoroutinesStack *stack; | |
198 | int retcode; | |
199 | int state; | |
200 | ||
201 | rgw_spawned_stacks spawned; | |
202 | ||
203 | stringstream error_stream; | |
204 | ||
205 | int set_state(int s, int ret = 0) { | |
206 | state = s; | |
207 | return ret; | |
208 | } | |
209 | int set_cr_error(int ret) { | |
210 | state = RGWCoroutine_Error; | |
211 | return ret; | |
212 | } | |
213 | int set_cr_done() { | |
214 | state = RGWCoroutine_Done; | |
215 | return 0; | |
216 | } | |
217 | void set_io_blocked(bool flag); | |
218 | int io_block(int ret = 0); | |
219 | ||
220 | void reset_description() { | |
221 | description.str(string()); | |
222 | } | |
223 | ||
224 | stringstream& set_description() { | |
225 | return description; | |
226 | } | |
227 | stringstream& set_status() { | |
228 | return status.set_status(); | |
229 | } | |
230 | ||
231 | stringstream& set_status(const string& s) { | |
232 | stringstream& status = set_status(); | |
233 | status << s; | |
234 | return status; | |
235 | } | |
236 | ||
237 | public: | |
238 | RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} | |
239 | ~RGWCoroutine() override; | |
240 | ||
241 | virtual int operate() = 0; | |
242 | ||
243 | bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); } | |
244 | bool is_error() { return (state == RGWCoroutine_Error); } | |
245 | ||
246 | stringstream& log_error() { return error_stream; } | |
247 | string error_str() { | |
248 | return error_stream.str(); | |
249 | } | |
250 | ||
251 | void set_retcode(int r) { | |
252 | retcode = r; | |
253 | } | |
254 | ||
255 | int get_ret_status() { | |
256 | return retcode; | |
257 | } | |
258 | ||
259 | void call(RGWCoroutine *op); /* call at the same stack we're in */ | |
260 | RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ | |
261 | bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ | |
262 | bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ | |
263 | ||
264 | int wait(const utime_t& interval); | |
265 | bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */ | |
266 | void wakeup(); | |
267 | void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ | |
268 | ||
269 | size_t num_spawned() { | |
270 | return spawned.entries.size(); | |
271 | } | |
272 | ||
273 | void wait_for_child(); | |
274 | ||
275 | virtual string to_str() const; | |
276 | ||
277 | RGWCoroutinesStack *get_stack() const { | |
278 | return stack; | |
279 | } | |
280 | ||
281 | void dump(Formatter *f) const; | |
282 | }; | |
283 | ||
// Stream insertion for RGWCoroutine (defined out of line). Presumably
// prints the coroutine's identity / to_str() -- confirm in the .cc file.
ostream& operator<<(ostream& out, const RGWCoroutine& cr);
285 | ||
/*
 * Coroutine helper macros for use inside RGWCoroutine::operate(). They use
 * the boost::asio::coroutine `yield` pseudo-keyword and the protected
 * members _yield_ret / drain_cr.
 *
 * yield_until_true(x): evaluates `x` into _yield_ret and yields; on each
 * resumption it repeats while the stored value is false, then clears
 * _yield_ret.
 *
 * drain_all*(): reset drain_cr, then yield until drain_children() returns
 * true. Each is wrapped in do { } while (0) so it behaves as one statement
 * (e.g. under an unbraced `if`); otherwise only the drain_cr reset would
 * be conditional while the yield loop always ran.
 */
#define yield_until_true(x) \
do { \
  do { \
    yield _yield_ret = (x); \
  } while (!_yield_ret); \
  _yield_ret = false; \
} while (0)

#define drain_all() \
do { \
  drain_cr = boost::asio::coroutine(); \
  yield_until_true(drain_children(0)); \
} while (0)

#define drain_all_but(n) \
do { \
  drain_cr = boost::asio::coroutine(); \
  yield_until_true(drain_children(n)); \
} while (0)

#define drain_all_but_stack(stack) \
do { \
  drain_cr = boost::asio::coroutine(); \
  yield_until_true(drain_children(1, stack)); \
} while (0)
305 | ||
306 | template <class T> | |
307 | class RGWConsumerCR : public RGWCoroutine { | |
308 | list<T> product; | |
309 | ||
310 | public: | |
311 | RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} | |
312 | ||
313 | bool has_product() { | |
314 | return !product.empty(); | |
315 | } | |
316 | ||
317 | void wait_for_product() { | |
318 | if (!has_product()) { | |
319 | set_sleeping(true); | |
320 | } | |
321 | } | |
322 | ||
323 | bool consume(T *p) { | |
324 | if (product.empty()) { | |
325 | return false; | |
326 | } | |
327 | *p = product.front(); | |
328 | product.pop_front(); | |
329 | return true; | |
330 | } | |
331 | ||
332 | void receive(const T& p, bool wakeup = true); | |
333 | void receive(list<T>& l, bool wakeup = true); | |
334 | }; | |
335 | ||
336 | class RGWCoroutinesStack : public RefCountedObject { | |
337 | friend class RGWCoroutine; | |
338 | friend class RGWCoroutinesManager; | |
339 | ||
340 | CephContext *cct; | |
341 | ||
342 | RGWCoroutinesManager *ops_mgr; | |
343 | ||
344 | list<RGWCoroutine *> ops; | |
345 | list<RGWCoroutine *>::iterator pos; | |
346 | ||
347 | rgw_spawned_stacks spawned; | |
348 | ||
349 | set<RGWCoroutinesStack *> blocked_by_stack; | |
350 | set<RGWCoroutinesStack *> blocking_stacks; | |
351 | ||
352 | bool done_flag; | |
353 | bool error_flag; | |
354 | bool blocked_flag; | |
355 | bool sleep_flag; | |
356 | bool interval_wait_flag; | |
357 | ||
358 | bool is_scheduled; | |
359 | ||
360 | bool is_waiting_for_child; | |
361 | ||
362 | int retcode; | |
363 | ||
364 | uint64_t run_count; | |
365 | ||
366 | protected: | |
367 | RGWCoroutinesEnv *env; | |
368 | RGWCoroutinesStack *parent; | |
369 | ||
370 | RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); | |
371 | bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ | |
372 | bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ | |
373 | public: | |
374 | RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); | |
375 | ~RGWCoroutinesStack() override; | |
376 | ||
377 | int operate(RGWCoroutinesEnv *env); | |
378 | ||
379 | bool is_done() { | |
380 | return done_flag; | |
381 | } | |
382 | bool is_error() { | |
383 | return error_flag; | |
384 | } | |
385 | bool is_blocked_by_stack() { | |
386 | return !blocked_by_stack.empty(); | |
387 | } | |
388 | void set_io_blocked(bool flag) { | |
389 | blocked_flag = flag; | |
390 | } | |
391 | bool is_io_blocked() { | |
392 | return blocked_flag; | |
393 | } | |
394 | void set_interval_wait(bool flag) { | |
395 | interval_wait_flag = flag; | |
396 | } | |
397 | bool is_interval_waiting() { | |
398 | return interval_wait_flag; | |
399 | } | |
400 | void set_sleeping(bool flag) { | |
401 | bool wakeup = sleep_flag & !flag; | |
402 | sleep_flag = flag; | |
403 | if (wakeup) { | |
404 | schedule(); | |
405 | } | |
406 | } | |
407 | bool is_sleeping() { | |
408 | return sleep_flag; | |
409 | } | |
410 | void set_is_scheduled(bool flag) { | |
411 | is_scheduled = flag; | |
412 | } | |
413 | ||
414 | bool is_blocked() { | |
415 | return is_blocked_by_stack() || is_sleeping() || | |
416 | is_io_blocked() || waiting_for_child() ; | |
417 | } | |
418 | ||
419 | void schedule(list<RGWCoroutinesStack *> *stacks = NULL) { | |
420 | if (!stacks) { | |
421 | stacks = env->scheduled_stacks; | |
422 | } | |
423 | if (!is_scheduled) { | |
424 | stacks->push_back(this); | |
425 | is_scheduled = true; | |
426 | } | |
427 | } | |
428 | ||
429 | int get_ret_status() { | |
430 | return retcode; | |
431 | } | |
432 | ||
433 | string error_str(); | |
434 | ||
435 | void call(RGWCoroutine *next_op); | |
436 | RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait); | |
437 | int unwind(int retcode); | |
438 | ||
439 | int wait(const utime_t& interval); | |
440 | void wakeup(); | |
441 | ||
442 | bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */ | |
443 | ||
444 | RGWAioCompletionNotifier *create_completion_notifier(); | |
445 | RGWCompletionManager *get_completion_mgr(); | |
446 | ||
447 | void set_blocked_by(RGWCoroutinesStack *s) { | |
448 | blocked_by_stack.insert(s); | |
449 | s->blocking_stacks.insert(this); | |
450 | } | |
451 | ||
452 | void set_wait_for_child(bool flag) { | |
453 | is_waiting_for_child = flag; | |
454 | } | |
455 | ||
456 | bool waiting_for_child() { | |
457 | return is_waiting_for_child; | |
458 | } | |
459 | ||
460 | bool unblock_stack(RGWCoroutinesStack **s); | |
461 | ||
462 | RGWCoroutinesEnv *get_env() { return env; } | |
463 | ||
464 | void dump(Formatter *f) const; | |
465 | }; | |
466 | ||
467 | template <class T> | |
468 | void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup) | |
469 | { | |
470 | product.splice(product.end(), l); | |
471 | if (wakeup) { | |
472 | set_sleeping(false); | |
473 | } | |
474 | } | |
475 | ||
476 | ||
477 | template <class T> | |
478 | void RGWConsumerCR<T>::receive(const T& p, bool wakeup) | |
479 | { | |
480 | product.push_back(p); | |
481 | if (wakeup) { | |
482 | set_sleeping(false); | |
483 | } | |
484 | } | |
485 | ||
/*
 * Registry of live RGWCoroutinesManager instances: a manager constructed
 * with a registry add()s itself in its constructor and remove()s itself in
 * its destructor. The registry is also an AdminSocketHook, so the
 * registered managers can be reached through an admin-socket command
 * (see hook_to_admin_command() / call()).
 */
class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
  CephContext *cct;

  set<RGWCoroutinesManager *> managers;
  RWLock lock;  // presumably guards `managers` -- confirm in the .cc

  string admin_command;  // presumably the command passed to hook_to_admin_command()

public:
  RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
  ~RGWCoroutinesManagerRegistry() override;

  void add(RGWCoroutinesManager *mgr);
  void remove(RGWCoroutinesManager *mgr);

  int hook_to_admin_command(const string& command);
  // AdminSocketHook entry point; signature must match the base class
  bool call(std::string command, cmdmap_t& cmdmap, std::string format,
	    bufferlist& out) override;

  void dump(Formatter *f) const;
};
507 | ||
508 | class RGWCoroutinesManager { | |
509 | CephContext *cct; | |
510 | std::atomic<bool> going_down = { false }; | |
511 | ||
512 | std::atomic<int64_t> run_context_count = { 0 }; | |
513 | map<uint64_t, set<RGWCoroutinesStack *> > run_contexts; | |
514 | ||
515 | RWLock lock; | |
516 | ||
517 | void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count); | |
518 | protected: | |
519 | RGWCompletionManager *completion_mgr; | |
520 | RGWCoroutinesManagerRegistry *cr_registry; | |
521 | ||
522 | int ops_window; | |
523 | ||
524 | string id; | |
525 | ||
526 | void put_completion_notifier(RGWAioCompletionNotifier *cn); | |
527 | public: | |
528 | RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"), | |
529 | cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { | |
530 | completion_mgr = new RGWCompletionManager(cct); | |
531 | if (cr_registry) { | |
532 | cr_registry->add(this); | |
533 | } | |
534 | } | |
535 | virtual ~RGWCoroutinesManager() { | |
536 | stop(); | |
537 | completion_mgr->put(); | |
538 | if (cr_registry) { | |
539 | cr_registry->remove(this); | |
540 | } | |
541 | } | |
542 | ||
543 | int run(list<RGWCoroutinesStack *>& ops); | |
544 | int run(RGWCoroutine *op); | |
545 | void stop() { | |
546 | bool expected = false; | |
547 | if (going_down.compare_exchange_strong(expected, true)) { | |
548 | completion_mgr->go_down(); | |
549 | } | |
550 | } | |
551 | ||
552 | virtual void report_error(RGWCoroutinesStack *op); | |
553 | ||
554 | RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); | |
555 | RGWCompletionManager *get_completion_mgr() { return completion_mgr; } | |
556 | ||
557 | void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); | |
558 | RGWCoroutinesStack *allocate_stack(); | |
559 | ||
560 | virtual string get_id(); | |
561 | void dump(Formatter *f) const; | |
562 | }; | |
563 | ||
564 | class RGWSimpleCoroutine : public RGWCoroutine { | |
565 | bool called_cleanup; | |
566 | ||
567 | int operate() override; | |
568 | ||
569 | int state_init(); | |
570 | int state_send_request(); | |
571 | int state_request_complete(); | |
572 | int state_all_complete(); | |
573 | ||
574 | void call_cleanup(); | |
575 | ||
576 | public: | |
577 | RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {} | |
578 | ~RGWSimpleCoroutine() override; | |
579 | ||
580 | virtual int init() { return 0; } | |
581 | virtual int send_request() = 0; | |
582 | virtual int request_complete() = 0; | |
583 | virtual int finish() { return 0; } | |
584 | virtual void request_cleanup() {} | |
585 | }; | |
586 | ||
587 | #endif |