]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.h
0d0b48bddc8395b2a80a90aeedd0fe00d9dab9c6
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
6
7 #ifdef _ASSERT_H
8 #define NEED_ASSERT_H
9 #pragma push_macro("_ASSERT_H")
10 #endif
11
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
14
15 #ifdef NEED_ASSERT_H
16 #pragma pop_macro("_ASSERT_H")
17 #endif
18
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
24 #include "common/RWLock.h"
25
26 #include "rgw_common.h"
27 #include "rgw_http_client_types.h"
28
29 #include <boost/asio/coroutine.hpp>
30
31 #include <atomic>
32
33 #define RGW_ASYNC_OPS_MGR_WINDOW 100
34
35 class RGWCoroutinesStack;
36 class RGWCoroutinesManager;
37 class RGWAioCompletionNotifier;
38
39 class RGWCompletionManager : public RefCountedObject {
40 friend class RGWCoroutinesManager;
41
42 CephContext *cct;
43
44 struct io_completion {
45 rgw_io_id io_id;
46 void *user_info;
47 };
48 list<io_completion> complete_reqs;
49 set<rgw_io_id> complete_reqs_set;
50 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
51 set<NotifierRef> cns;
52
53 ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
54 ceph::condition_variable cond;
55
56 SafeTimer timer;
57
58 std::atomic<bool> going_down = { false };
59
60 map<void *, void *> waiters;
61
62 class WaitContext;
63
64 protected:
65 void _wakeup(void *opaque);
66 void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
67 public:
68 explicit RGWCompletionManager(CephContext *_cct);
69 ~RGWCompletionManager() override;
70
71 void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
72 int get_next(io_completion *io);
73 bool try_get_next(io_completion *io);
74
75 void go_down();
76
77 /*
78 * wait for interval length to complete user_info
79 */
80 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
81 void wakeup(void *opaque);
82
83 void register_completion_notifier(RGWAioCompletionNotifier *cn);
84 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
85 };
86
87 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
88 class RGWAioCompletionNotifier : public RefCountedObject {
89 librados::AioCompletion *c;
90 RGWCompletionManager *completion_mgr;
91 rgw_io_id io_id;
92 void *user_data;
93 ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier");
94 bool registered;
95
96 public:
97 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
98 ~RGWAioCompletionNotifier() override {
99 c->release();
100 lock.lock();
101 bool need_unregister = registered;
102 if (registered) {
103 completion_mgr->get();
104 }
105 registered = false;
106 lock.unlock();
107 if (need_unregister) {
108 completion_mgr->unregister_completion_notifier(this);
109 completion_mgr->put();
110 }
111 }
112
113 librados::AioCompletion *completion() {
114 return c;
115 }
116
117 void unregister() {
118 std::lock_guard l{lock};
119 if (!registered) {
120 return;
121 }
122 registered = false;
123 }
124
125 void cb() {
126 lock.lock();
127 if (!registered) {
128 lock.unlock();
129 put();
130 return;
131 }
132 completion_mgr->get();
133 registered = false;
134 lock.unlock();
135 completion_mgr->complete(this, io_id, user_data);
136 completion_mgr->put();
137 put();
138 }
139 };
140
141 // completion notifier with opaque payload (ie a reference-counted pointer)
142 template <typename T>
143 class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier {
144 T value;
145 public:
146 RGWAioCompletionNotifierWith(RGWCompletionManager *mgr,
147 const rgw_io_id& io_id, void *user_data,
148 T value)
149 : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value))
150 {}
151 };
152
/*
 * Plain bundle of pointers describing the context a stack runs in.
 * A default-constructed env has run_context 0 and every pointer null.
 */
struct RGWCoroutinesEnv {
  uint64_t run_context{0};
  RGWCoroutinesManager *manager{nullptr};
  list<RGWCoroutinesStack *> *scheduled_stacks{nullptr};
  RGWCoroutinesStack *stack{nullptr};

  RGWCoroutinesEnv() = default;
};
161
/*
 * Coroutine state values. Run is zero; the two negative values are the
 * ones RGWCoroutine::is_done() treats as finished.
 */
enum RGWCoroutineState {
  RGWCoroutine_Run   =  0,
  RGWCoroutine_Done  = -1,
  RGWCoroutine_Error = -2,
};
167
/* ordered collection of stack pointers; does not own them */
struct rgw_spawned_stacks {
  vector<RGWCoroutinesStack *> entries;

  rgw_spawned_stacks() {}

  /* append s to the end of entries */
  void add_pending(RGWCoroutinesStack *s) {
    entries.push_back(s);
  }

  /*
   * Move every entry of *source to the end of this collection, preserving
   * order, and leave *source empty.
   *
   * Inheriting from oneself is a no-op: appending to the vector that is
   * being iterated would invalidate the iterators.
   */
  void inherit(rgw_spawned_stacks *source) {
    if (source == this) {
      return;
    }
    entries.insert(entries.end(),
                   source->entries.begin(), source->entries.end());
    source->entries.clear();
  }
};
185
186
187
188 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
189 friend class RGWCoroutinesStack;
190
191 struct StatusItem {
192 utime_t timestamp;
193 string status;
194
195 StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
196
197 void dump(Formatter *f) const;
198 };
199
200 #define MAX_COROUTINE_HISTORY 10
201
202 struct Status {
203 CephContext *cct;
204 ceph::shared_mutex lock =
205 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
206 int max_history;
207
208 utime_t timestamp;
209 stringstream status;
210
211 explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
212
213 deque<StatusItem> history;
214
215 stringstream& set_status();
216 } status;
217
218 stringstream description;
219
220 protected:
221 bool _yield_ret;
222
223 struct {
224 boost::asio::coroutine cr;
225 bool should_exit{false};
226 int ret{0};
227
228 void init() {
229 cr = boost::asio::coroutine();
230 should_exit = false;
231 ret = 0;
232 }
233 } drain_status;
234
235 CephContext *cct;
236
237 RGWCoroutinesStack *stack;
238 int retcode;
239 int state;
240
241 rgw_spawned_stacks spawned;
242
243 stringstream error_stream;
244
245 int set_state(int s, int ret = 0) {
246 retcode = ret;
247 state = s;
248 return ret;
249 }
250 int set_cr_error(int ret) {
251 return set_state(RGWCoroutine_Error, ret);
252 }
253 int set_cr_done() {
254 return set_state(RGWCoroutine_Done, 0);
255 }
256 void set_io_blocked(bool flag);
257
258 void reset_description() {
259 description.str(string());
260 }
261
262 stringstream& set_description() {
263 return description;
264 }
265 stringstream& set_status() {
266 return status.set_status();
267 }
268
269 stringstream& set_status(const string& s) {
270 stringstream& status = set_status();
271 status << s;
272 return status;
273 }
274
275 virtual int operate_wrapper() {
276 return operate();
277 }
278 public:
279 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
280 ~RGWCoroutine() override;
281
282 virtual int operate() = 0;
283
284 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
285 bool is_error() { return (state == RGWCoroutine_Error); }
286
287 stringstream& log_error() { return error_stream; }
288 string error_str() {
289 return error_stream.str();
290 }
291
292 void set_retcode(int r) {
293 retcode = r;
294 }
295
296 int get_ret_status() {
297 return retcode;
298 }
299
300 void call(RGWCoroutine *op); /* call at the same stack we're in */
301 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
302 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
303 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
304
305 RGWCoroutinesStack *prealloc_stack(); /* prepare a stack that will be used in the next spawn operation */
306 uint64_t prealloc_stack_id(); /* prepare a stack that will be used in the next spawn operation, return its id */
307
308 int wait(const utime_t& interval);
309 bool drain_children(int num_cr_left,
310 RGWCoroutinesStack *skip_stack = nullptr,
311 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
312 cb will be called on completion of every
313 completion. */
314 bool drain_children(int num_cr_left,
315 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
316 cb will be called on every completion, can filter errors.
317 A negative return value from cb means that current cr
318 will need to exit */
319 void wakeup();
320 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
321
322 size_t num_spawned() {
323 return spawned.entries.size();
324 }
325
326 void wait_for_child();
327
328 virtual string to_str() const;
329
330 RGWCoroutinesStack *get_stack() const {
331 return stack;
332 }
333
334 RGWCoroutinesEnv *get_env() const;
335
336 void dump(Formatter *f) const;
337
338 void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
339
340 int io_block(int ret = 0) {
341 return io_block(ret, -1);
342 }
343 int io_block(int ret, int64_t io_id);
344 int io_block(int ret, const rgw_io_id& io_id);
345 void io_complete() {
346 io_complete(rgw_io_id{});
347 }
348 void io_complete(const rgw_io_id& io_id);
349 };
350
351 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
352
/*
 * Evaluate x under `yield`, storing its value in _yield_ret, and repeat
 * until that value is true; _yield_ret is reset to false afterwards.
 * Expands to a single do/while(0) statement, so the caller supplies the
 * trailing semicolon.
 * NOTE(review): `yield` is not defined in this header — presumably the
 * Boost.Asio stackless coroutine macro; confirm at the include site.
 */
#define yield_until_true(x) \
do { \
  do { \
    yield _yield_ret = x; \
  } while (!_yield_ret); \
  _yield_ret = false; \
} while (0)
360
/*
 * drain_*() helpers: reset drain_status, then keep yielding until
 * drain_children() reports that nothing is left to wait for.
 *
 * Each macro expands to exactly one statement, so it stays whole when used
 * as the body of an unbraced if/for/while.
 */
#define drain_all() \
  do { \
    drain_status.init(); \
    yield_until_true(drain_children(0)); \
  } while (0)

#define drain_all_but(n) \
  do { \
    drain_status.init(); \
    yield_until_true(drain_children(n)); \
  } while (0)

#define drain_all_but_stack(stack) \
  do { \
    drain_status.init(); \
    yield_until_true(drain_children(1, stack)); \
  } while (0)

#define drain_all_but_stack_cb(stack, cb) \
  do { \
    drain_status.init(); \
    yield_until_true(drain_children(1, stack, cb)); \
  } while (0)

/*
 * Same as above, but returns set_cr_error(drain_status.ret) from the
 * enclosing function when drain_status.should_exit is set after draining.
 *
 * A plain brace block is used here rather than do/while(0): the previous
 * expansion ended in '}', so call sites without a trailing ';' compiled,
 * and they must keep compiling.
 */
#define drain_with_cb(n, cb) \
  { \
    drain_status.init(); \
    yield_until_true(drain_children(n, cb)); \
    if (drain_status.should_exit) { \
      return set_cr_error(drain_status.ret); \
    } \
  }

#define drain_all_cb(cb) \
  drain_with_cb(0, cb)
386
/*
 * Spawn cr via spawn(cr, false), then run drain_with_cb(n, cb).
 * Because it expands drain_with_cb(), this macro can return from the
 * enclosing function.
 */
#define yield_spawn_window(cr, n, cb) \
  do { \
    spawn(cr, false); \
    drain_with_cb(n, cb); /* this is guaranteed to yield */ \
  } while (0)
392
393
394
395 template <class T>
396 class RGWConsumerCR : public RGWCoroutine {
397 list<T> product;
398
399 public:
400 explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
401
402 bool has_product() {
403 return !product.empty();
404 }
405
406 void wait_for_product() {
407 if (!has_product()) {
408 set_sleeping(true);
409 }
410 }
411
412 bool consume(T *p) {
413 if (product.empty()) {
414 return false;
415 }
416 *p = product.front();
417 product.pop_front();
418 return true;
419 }
420
421 void receive(const T& p, bool wakeup = true);
422 void receive(list<T>& l, bool wakeup = true);
423 };
424
425 class RGWCoroutinesStack : public RefCountedObject {
426 friend class RGWCoroutine;
427 friend class RGWCoroutinesManager;
428
429 CephContext *cct;
430
431 int64_t id{-1};
432
433 RGWCoroutinesManager *ops_mgr;
434
435 list<RGWCoroutine *> ops;
436 list<RGWCoroutine *>::iterator pos;
437
438 rgw_spawned_stacks spawned;
439
440 RGWCoroutinesStack *preallocated_stack{nullptr};
441
442 set<RGWCoroutinesStack *> blocked_by_stack;
443 set<RGWCoroutinesStack *> blocking_stacks;
444
445 map<int64_t, rgw_io_id> io_finish_ids;
446 rgw_io_id io_blocked_id;
447
448 bool done_flag;
449 bool error_flag;
450 bool blocked_flag;
451 bool sleep_flag;
452 bool interval_wait_flag;
453
454 bool is_scheduled;
455
456 bool is_waiting_for_child;
457
458 int retcode;
459
460 uint64_t run_count;
461
462 protected:
463 RGWCoroutinesEnv *env;
464 RGWCoroutinesStack *parent;
465
466 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
467 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
468 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
469 public:
470 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
471 ~RGWCoroutinesStack() override;
472
473 int64_t get_id() const {
474 return id;
475 }
476
477 int operate(RGWCoroutinesEnv *env);
478
479 bool is_done() {
480 return done_flag;
481 }
482 bool is_error() {
483 return error_flag;
484 }
485 bool is_blocked_by_stack() {
486 return !blocked_by_stack.empty();
487 }
488 void set_io_blocked(bool flag) {
489 blocked_flag = flag;
490 }
491 void set_io_blocked_id(const rgw_io_id& io_id) {
492 io_blocked_id = io_id;
493 }
494 bool is_io_blocked() {
495 return blocked_flag && !done_flag;
496 }
497 bool can_io_unblock(const rgw_io_id& io_id) {
498 return ((io_blocked_id.id < 0) ||
499 io_blocked_id.intersects(io_id));
500 }
501 bool try_io_unblock(const rgw_io_id& io_id);
502 bool consume_io_finish(const rgw_io_id& io_id);
503 void set_interval_wait(bool flag) {
504 interval_wait_flag = flag;
505 }
506 bool is_interval_waiting() {
507 return interval_wait_flag;
508 }
509 void set_sleeping(bool flag) {
510 bool wakeup = sleep_flag & !flag;
511 sleep_flag = flag;
512 if (wakeup) {
513 schedule();
514 }
515 }
516 bool is_sleeping() {
517 return sleep_flag;
518 }
519 void set_is_scheduled(bool flag) {
520 is_scheduled = flag;
521 }
522
523 bool is_blocked() {
524 return is_blocked_by_stack() || is_sleeping() ||
525 is_io_blocked() || waiting_for_child() ;
526 }
527
528 void schedule();
529 void _schedule();
530
531 int get_ret_status() {
532 return retcode;
533 }
534
535 string error_str();
536
537 void call(RGWCoroutine *next_op);
538 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
539 RGWCoroutinesStack *prealloc_stack();
540 int unwind(int retcode);
541
542 int wait(const utime_t& interval);
543 void wakeup();
544 void io_complete() {
545 io_complete(rgw_io_id{});
546 }
547 void io_complete(const rgw_io_id& io_id);
548
549 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
550
551 void cancel();
552
553 RGWAioCompletionNotifier *create_completion_notifier();
554 template <typename T>
555 RGWAioCompletionNotifier *create_completion_notifier(T value);
556 RGWCompletionManager *get_completion_mgr();
557
558 void set_blocked_by(RGWCoroutinesStack *s) {
559 blocked_by_stack.insert(s);
560 s->blocking_stacks.insert(this);
561 }
562
563 void set_wait_for_child(bool flag) {
564 is_waiting_for_child = flag;
565 }
566
567 bool waiting_for_child() {
568 return is_waiting_for_child;
569 }
570
571 bool unblock_stack(RGWCoroutinesStack **s);
572
573 RGWCoroutinesEnv *get_env() const { return env; }
574
575 void dump(Formatter *f) const;
576
577 void init_new_io(RGWIOProvider *io_provider);
578 };
579
580 template <class T>
581 void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
582 {
583 product.splice(product.end(), l);
584 if (wakeup) {
585 set_sleeping(false);
586 }
587 }
588
589
590 template <class T>
591 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
592 {
593 product.push_back(p);
594 if (wakeup) {
595 set_sleeping(false);
596 }
597 }
598
599 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
600 CephContext *cct;
601
602 set<RGWCoroutinesManager *> managers;
603 ceph::shared_mutex lock =
604 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
605
606 string admin_command;
607
608 public:
609 explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
610 ~RGWCoroutinesManagerRegistry() override;
611
612 void add(RGWCoroutinesManager *mgr);
613 void remove(RGWCoroutinesManager *mgr);
614
615 int hook_to_admin_command(const string& command);
616 int call(std::string_view command, const cmdmap_t& cmdmap,
617 Formatter *f,
618 std::ostream& ss,
619 bufferlist& out) override;
620
621 void dump(Formatter *f) const;
622 };
623
624 class RGWCoroutinesManager {
625 CephContext *cct;
626 std::atomic<bool> going_down = { false };
627
628 std::atomic<int64_t> run_context_count = { 0 };
629 map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
630
631 std::atomic<int64_t> max_io_id = { 0 };
632 std::atomic<uint64_t> max_stack_id = { 0 };
633
634 mutable ceph::shared_mutex lock =
635 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
636
637 RGWIOIDProvider io_id_provider;
638
639 void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
640 RGWCompletionManager::io_completion& io, int *waiting_count);
641 protected:
642 RGWCompletionManager *completion_mgr;
643 RGWCoroutinesManagerRegistry *cr_registry;
644
645 int ops_window;
646
647 string id;
648
649 void put_completion_notifier(RGWAioCompletionNotifier *cn);
650 public:
651 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct),
652 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
653 completion_mgr = new RGWCompletionManager(cct);
654 if (cr_registry) {
655 cr_registry->add(this);
656 }
657 }
658 virtual ~RGWCoroutinesManager() {
659 stop();
660 completion_mgr->put();
661 if (cr_registry) {
662 cr_registry->remove(this);
663 }
664 }
665
666 int run(list<RGWCoroutinesStack *>& ops);
667 int run(RGWCoroutine *op);
668 void stop() {
669 bool expected = false;
670 if (going_down.compare_exchange_strong(expected, true)) {
671 completion_mgr->go_down();
672 }
673 }
674
675 virtual void report_error(RGWCoroutinesStack *op);
676
677 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
678 template <typename T>
679 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value);
680 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
681
682 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
683 void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
684 RGWCoroutinesStack *allocate_stack();
685
686 int64_t get_next_io_id();
687 uint64_t get_next_stack_id();
688
689 void set_sleeping(RGWCoroutine *cr, bool flag);
690 void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
691
692 virtual string get_id();
693 void dump(Formatter *f) const;
694
695 RGWIOIDProvider& get_io_id_provider() {
696 return io_id_provider;
697 }
698 };
699
700 template <typename T>
701 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value)
702 {
703 rgw_io_id io_id{get_next_io_id(), -1};
704 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value));
705 completion_mgr->register_completion_notifier(cn);
706 return cn;
707 }
708
709 template <typename T>
710 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value)
711 {
712 return ops_mgr->create_completion_notifier(this, std::move(value));
713 }
714
715 class RGWSimpleCoroutine : public RGWCoroutine {
716 bool called_cleanup;
717
718 int operate() override;
719
720 int state_init();
721 int state_send_request();
722 int state_request_complete();
723 int state_all_complete();
724
725 void call_cleanup();
726
727 public:
728 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
729 ~RGWSimpleCoroutine() override;
730
731 virtual int init() { return 0; }
732 virtual int send_request() = 0;
733 virtual int request_complete() = 0;
734 virtual int finish() { return 0; }
735 virtual void request_cleanup() {}
736 };
737
738 #endif