]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
6
7 #ifdef _ASSERT_H
8 #define NEED_ASSERT_H
9 #pragma push_macro("_ASSERT_H")
10 #endif
11
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
14
15 #ifdef NEED_ASSERT_H
16 #pragma pop_macro("_ASSERT_H")
17 #endif
18
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
24
25 #include "rgw_common.h"
26 #include "rgw_http_client_types.h"
27
28 #include <boost/asio/coroutine.hpp>
29
30 #include <atomic>
31
32 #define RGW_ASYNC_OPS_MGR_WINDOW 100
33
34 class RGWCoroutinesStack;
35 class RGWCoroutinesManager;
36 class RGWAioCompletionNotifier;
37
38 class RGWCompletionManager : public RefCountedObject {
39 friend class RGWCoroutinesManager;
40
41 CephContext *cct;
42
43 struct io_completion {
44 rgw_io_id io_id;
45 void *user_info;
46 };
47 std::list<io_completion> complete_reqs;
48 std::set<rgw_io_id> complete_reqs_set;
49 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
50 std::set<NotifierRef> cns;
51
52 ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
53 ceph::condition_variable cond;
54
55 SafeTimer timer;
56
57 std::atomic<bool> going_down = { false };
58
59 std::map<void *, void *> waiters;
60
61 class WaitContext;
62
63 protected:
64 void _wakeup(void *opaque);
65 void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
66 public:
67 explicit RGWCompletionManager(CephContext *_cct);
68 ~RGWCompletionManager() override;
69
70 void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
71 int get_next(io_completion *io);
72 bool try_get_next(io_completion *io);
73
74 void go_down();
75
76 /*
77 * wait for interval length to complete user_info
78 */
79 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
80 void wakeup(void *opaque);
81
82 void register_completion_notifier(RGWAioCompletionNotifier *cn);
83 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
84 };
85
86 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
87 class RGWAioCompletionNotifier : public RefCountedObject {
88 librados::AioCompletion *c;
89 RGWCompletionManager *completion_mgr;
90 rgw_io_id io_id;
91 void *user_data;
92 ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier");
93 bool registered;
94
95 public:
96 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
97 ~RGWAioCompletionNotifier() override {
98 c->release();
99 lock.lock();
100 bool need_unregister = registered;
101 if (registered) {
102 completion_mgr->get();
103 }
104 registered = false;
105 lock.unlock();
106 if (need_unregister) {
107 completion_mgr->unregister_completion_notifier(this);
108 completion_mgr->put();
109 }
110 }
111
112 librados::AioCompletion *completion() {
113 return c;
114 }
115
116 void unregister() {
117 std::lock_guard l{lock};
118 if (!registered) {
119 return;
120 }
121 registered = false;
122 }
123
124 void cb() {
125 lock.lock();
126 if (!registered) {
127 lock.unlock();
128 put();
129 return;
130 }
131 completion_mgr->get();
132 registered = false;
133 lock.unlock();
134 completion_mgr->complete(this, io_id, user_data);
135 completion_mgr->put();
136 put();
137 }
138 };
139
140 // completion notifier with opaque payload (ie a reference-counted pointer)
141 template <typename T>
142 class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier {
143 T value;
144 public:
145 RGWAioCompletionNotifierWith(RGWCompletionManager *mgr,
146 const rgw_io_id& io_id, void *user_data,
147 T value)
148 : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value))
149 {}
150 };
151
152 struct RGWCoroutinesEnv {
153 uint64_t run_context;
154 RGWCoroutinesManager *manager;
155 std::list<RGWCoroutinesStack *> *scheduled_stacks;
156 RGWCoroutinesStack *stack;
157
158 RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
159 };
160
/*
 * Values stored in RGWCoroutine::state. RGWCoroutine::is_done() treats both
 * negative values as finished; is_error() checks for RGWCoroutine_Error only.
 */
enum RGWCoroutineState {
  RGWCoroutine_Error = -2,
  RGWCoroutine_Done  = -1,
  RGWCoroutine_Run   =  0,
};
166
167 struct rgw_spawned_stacks {
168 std::vector<RGWCoroutinesStack *> entries;
169
170 rgw_spawned_stacks() {}
171
172 void add_pending(RGWCoroutinesStack *s) {
173 entries.push_back(s);
174 }
175
176 void inherit(rgw_spawned_stacks *source) {
177 for (auto* entry : source->entries) {
178 add_pending(entry);
179 }
180 source->entries.clear();
181 }
182 };
183
184
185
186 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
187 friend class RGWCoroutinesStack;
188
189 struct StatusItem {
190 utime_t timestamp;
191 std::string status;
192
193 StatusItem(utime_t& t, const std::string& s) : timestamp(t), status(s) {}
194
195 void dump(Formatter *f) const;
196 };
197
198 #define MAX_COROUTINE_HISTORY 10
199
200 struct Status {
201 CephContext *cct;
202 ceph::shared_mutex lock =
203 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
204 int max_history;
205
206 utime_t timestamp;
207 std::stringstream status;
208
209 explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
210
211 std::deque<StatusItem> history;
212
213 std::stringstream& set_status();
214 } status;
215
216 std::stringstream description;
217
218 protected:
219 bool _yield_ret;
220
221 struct {
222 boost::asio::coroutine cr;
223 bool should_exit{false};
224 int ret{0};
225
226 void init() {
227 cr = boost::asio::coroutine();
228 should_exit = false;
229 ret = 0;
230 }
231 } drain_status;
232
233 CephContext *cct;
234
235 RGWCoroutinesStack *stack;
236 int retcode;
237 int state;
238
239 rgw_spawned_stacks spawned;
240
241 std::stringstream error_stream;
242
243 int set_state(int s, int ret = 0) {
244 retcode = ret;
245 state = s;
246 return ret;
247 }
248 int set_cr_error(int ret) {
249 return set_state(RGWCoroutine_Error, ret);
250 }
251 int set_cr_done() {
252 return set_state(RGWCoroutine_Done, 0);
253 }
254 void set_io_blocked(bool flag);
255
256 void reset_description() {
257 description.str(std::string());
258 }
259
260 std::stringstream& set_description() {
261 return description;
262 }
263 std::stringstream& set_status() {
264 return status.set_status();
265 }
266
267 std::stringstream& set_status(const std::string& s) {
268 std::stringstream& status = set_status();
269 status << s;
270 return status;
271 }
272
273 virtual int operate_wrapper(const DoutPrefixProvider *dpp) {
274 return operate(dpp);
275 }
276 public:
277 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
278 virtual ~RGWCoroutine() override;
279
280 virtual int operate(const DoutPrefixProvider *dpp) = 0;
281
282 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
283 bool is_error() { return (state == RGWCoroutine_Error); }
284
285 std::stringstream& log_error() { return error_stream; }
286 std::string error_str() {
287 return error_stream.str();
288 }
289
290 void set_retcode(int r) {
291 retcode = r;
292 }
293
294 int get_ret_status() {
295 return retcode;
296 }
297
298 void call(RGWCoroutine *op); /* call at the same stack we're in */
299 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
300 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
301 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
302
303 RGWCoroutinesStack *prealloc_stack(); /* prepare a stack that will be used in the next spawn operation */
304 uint64_t prealloc_stack_id(); /* prepare a stack that will be used in the next spawn operation, return its id */
305
306 int wait(const utime_t& interval);
307 bool drain_children(int num_cr_left,
308 RGWCoroutinesStack *skip_stack = nullptr,
309 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
310 cb will be called on completion of every
311 completion. */
312 bool drain_children(int num_cr_left,
313 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
314 cb will be called on every completion, can filter errors.
315 A negative return value from cb means that current cr
316 will need to exit */
317 void wakeup();
318 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
319
320 size_t num_spawned() {
321 return spawned.entries.size();
322 }
323
324 void wait_for_child();
325
326 virtual std::string to_str() const;
327
328 RGWCoroutinesStack *get_stack() const {
329 return stack;
330 }
331
332 RGWCoroutinesEnv *get_env() const;
333
334 void dump(Formatter *f) const;
335
336 void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
337
338 int io_block(int ret = 0) {
339 return io_block(ret, -1);
340 }
341 int io_block(int ret, int64_t io_id);
342 int io_block(int ret, const rgw_io_id& io_id);
343 void io_complete() {
344 io_complete(rgw_io_id{});
345 }
346 void io_complete(const rgw_io_id& io_id);
347 };
348
349 std::ostream& operator<<(std::ostream& out, const RGWCoroutine& cr);
350
/*
 * Helper macros for coroutine bodies. They expand to code that uses
 * `yield`, `_yield_ret`, `drain_status`, `drain_children()`, `spawn()` and
 * `set_cr_error()`, so they only make sense where those names are in scope
 * (RGWCoroutine declares all of them except `yield`).
 *
 * Every macro expands to exactly one statement (do { ... } while (0)) and
 * must be followed by a semicolon. This keeps them safe in unbraced
 * if/else/loop bodies: previously only the leading drain_status.init()
 * would have been governed by the condition.
 */

/* keep yielding the result of x until it evaluates to true */
#define yield_until_true(x)        \
do {                               \
  do {                             \
    yield _yield_ret = (x);        \
  } while (!_yield_ret);           \
  _yield_ret = false;              \
} while (0)

/* drain until drain_children(0) reports completion */
#define drain_all()                              \
do {                                             \
  drain_status.init();                           \
  yield_until_true(drain_children(0));           \
} while (0)

/* same as drain_all(), but passes n to drain_children() */
#define drain_all_but(n)                         \
do {                                             \
  drain_status.init();                           \
  yield_until_true(drain_children(n));           \
} while (0)

/* drain with num_cr_left = 1 and `stack` passed as the stack to skip */
#define drain_all_but_stack(stack)               \
do {                                             \
  drain_status.init();                           \
  yield_until_true(drain_children(1, stack));    \
} while (0)

/* like drain_all_but_stack(), additionally passing cb to drain_children() */
#define drain_all_but_stack_cb(stack, cb)            \
do {                                                 \
  drain_status.init();                               \
  yield_until_true(drain_children(1, stack, cb));    \
} while (0)

/*
 * Drain using the two-argument drain_children(n, cb) overload. If
 * drain_status.should_exit is set afterwards, returns from the enclosing
 * function with set_cr_error(drain_status.ret).
 */
#define drain_with_cb(n, cb)                     \
do {                                             \
  drain_status.init();                           \
  yield_until_true(drain_children(n, cb));       \
  if (drain_status.should_exit) {                \
    return set_cr_error(drain_status.ret);       \
  }                                              \
} while (0)

#define drain_all_cb(cb) \
  drain_with_cb(0, cb)

/* spawn cr without waiting, then drain with (n, cb) */
#define yield_spawn_window(cr, n, cb)                       \
do {                                                        \
  spawn(cr, false);                                         \
  drain_with_cb(n, cb); /* this is guaranteed to yield */   \
} while (0)
390
391
392
393 template <class T>
394 class RGWConsumerCR : public RGWCoroutine {
395 std::list<T> product;
396
397 public:
398 explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
399
400 bool has_product() {
401 return !product.empty();
402 }
403
404 void wait_for_product() {
405 if (!has_product()) {
406 set_sleeping(true);
407 }
408 }
409
410 bool consume(T *p) {
411 if (product.empty()) {
412 return false;
413 }
414 *p = product.front();
415 product.pop_front();
416 return true;
417 }
418
419 void receive(const T& p, bool wakeup = true);
420 void receive(std::list<T>& l, bool wakeup = true);
421 };
422
423 class RGWCoroutinesStack : public RefCountedObject {
424 friend class RGWCoroutine;
425 friend class RGWCoroutinesManager;
426
427 CephContext *cct;
428
429 int64_t id{-1};
430
431 RGWCoroutinesManager *ops_mgr;
432
433 std::list<RGWCoroutine *> ops;
434 std::list<RGWCoroutine *>::iterator pos;
435
436 rgw_spawned_stacks spawned;
437
438 RGWCoroutinesStack *preallocated_stack{nullptr};
439
440 std::set<RGWCoroutinesStack *> blocked_by_stack;
441 std::set<RGWCoroutinesStack *> blocking_stacks;
442
443 std::map<int64_t, rgw_io_id> io_finish_ids;
444 rgw_io_id io_blocked_id;
445
446 bool done_flag;
447 bool error_flag;
448 bool blocked_flag;
449 bool sleep_flag;
450 bool interval_wait_flag;
451
452 bool is_scheduled;
453
454 bool is_waiting_for_child;
455
456 int retcode;
457
458 uint64_t run_count;
459
460 protected:
461 RGWCoroutinesEnv *env;
462 RGWCoroutinesStack *parent;
463
464 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
465 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
466 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
467 public:
468 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
469 ~RGWCoroutinesStack() override;
470
471 int64_t get_id() const {
472 return id;
473 }
474
475 int operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *env);
476
477 bool is_done() {
478 return done_flag;
479 }
480 bool is_error() {
481 return error_flag;
482 }
483 bool is_blocked_by_stack() {
484 return !blocked_by_stack.empty();
485 }
486 void set_io_blocked(bool flag) {
487 blocked_flag = flag;
488 }
489 void set_io_blocked_id(const rgw_io_id& io_id) {
490 io_blocked_id = io_id;
491 }
492 bool is_io_blocked() {
493 return blocked_flag && !done_flag;
494 }
495 bool can_io_unblock(const rgw_io_id& io_id) {
496 return ((io_blocked_id.id < 0) ||
497 io_blocked_id.intersects(io_id));
498 }
499 bool try_io_unblock(const rgw_io_id& io_id);
500 bool consume_io_finish(const rgw_io_id& io_id);
501 void set_interval_wait(bool flag) {
502 interval_wait_flag = flag;
503 }
504 bool is_interval_waiting() {
505 return interval_wait_flag;
506 }
507 void set_sleeping(bool flag) {
508 bool wakeup = sleep_flag & !flag;
509 sleep_flag = flag;
510 if (wakeup) {
511 schedule();
512 }
513 }
514 bool is_sleeping() {
515 return sleep_flag;
516 }
517 void set_is_scheduled(bool flag) {
518 is_scheduled = flag;
519 }
520
521 bool is_blocked() {
522 return is_blocked_by_stack() || is_sleeping() ||
523 is_io_blocked() || waiting_for_child() ;
524 }
525
526 void schedule();
527 void _schedule();
528
529 int get_ret_status() {
530 return retcode;
531 }
532
533 std::string error_str();
534
535 void call(RGWCoroutine *next_op);
536 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
537 RGWCoroutinesStack *prealloc_stack();
538 int unwind(int retcode);
539
540 int wait(const utime_t& interval);
541 void wakeup();
542 void io_complete() {
543 io_complete(rgw_io_id{});
544 }
545 void io_complete(const rgw_io_id& io_id);
546
547 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
548
549 void cancel();
550
551 RGWAioCompletionNotifier *create_completion_notifier();
552 template <typename T>
553 RGWAioCompletionNotifier *create_completion_notifier(T value);
554 RGWCompletionManager *get_completion_mgr();
555
556 void set_blocked_by(RGWCoroutinesStack *s) {
557 blocked_by_stack.insert(s);
558 s->blocking_stacks.insert(this);
559 }
560
561 void set_wait_for_child(bool flag) {
562 is_waiting_for_child = flag;
563 }
564
565 bool waiting_for_child() {
566 return is_waiting_for_child;
567 }
568
569 bool unblock_stack(RGWCoroutinesStack **s);
570
571 RGWCoroutinesEnv *get_env() const { return env; }
572
573 void dump(Formatter *f) const;
574
575 void init_new_io(RGWIOProvider *io_provider);
576 };
577
578 template <class T>
579 void RGWConsumerCR<T>::receive(std::list<T>& l, bool wakeup)
580 {
581 product.splice(product.end(), l);
582 if (wakeup) {
583 set_sleeping(false);
584 }
585 }
586
587
588 template <class T>
589 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
590 {
591 product.push_back(p);
592 if (wakeup) {
593 set_sleeping(false);
594 }
595 }
596
597 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
598 CephContext *cct;
599
600 std::set<RGWCoroutinesManager *> managers;
601 ceph::shared_mutex lock =
602 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
603
604 std::string admin_command;
605
606 public:
607 explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
608 virtual ~RGWCoroutinesManagerRegistry() override;
609
610 void add(RGWCoroutinesManager *mgr);
611 void remove(RGWCoroutinesManager *mgr);
612
613 int hook_to_admin_command(const std::string& command);
614 int call(std::string_view command, const cmdmap_t& cmdmap,
615 Formatter *f,
616 std::ostream& ss,
617 bufferlist& out) override;
618
619 void dump(Formatter *f) const;
620 };
621
622 class RGWCoroutinesManager {
623 CephContext *cct;
624 std::atomic<bool> going_down = { false };
625
626 std::atomic<int64_t> run_context_count = { 0 };
627 std::map<uint64_t, std::set<RGWCoroutinesStack *> > run_contexts;
628
629 std::atomic<int64_t> max_io_id = { 0 };
630 std::atomic<uint64_t> max_stack_id = { 0 };
631
632 mutable ceph::shared_mutex lock =
633 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
634
635 RGWIOIDProvider io_id_provider;
636
637 void handle_unblocked_stack(std::set<RGWCoroutinesStack *>& context_stacks, std::list<RGWCoroutinesStack *>& scheduled_stacks,
638 RGWCompletionManager::io_completion& io, int *waiting_count);
639 protected:
640 RGWCompletionManager *completion_mgr;
641 RGWCoroutinesManagerRegistry *cr_registry;
642
643 int ops_window;
644
645 std::string id;
646
647 void put_completion_notifier(RGWAioCompletionNotifier *cn);
648 public:
649 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct),
650 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
651 completion_mgr = new RGWCompletionManager(cct);
652 if (cr_registry) {
653 cr_registry->add(this);
654 }
655 }
656 virtual ~RGWCoroutinesManager();
657
658 int run(const DoutPrefixProvider *dpp, std::list<RGWCoroutinesStack *>& ops);
659 int run(const DoutPrefixProvider *dpp, RGWCoroutine *op);
660 void stop() {
661 bool expected = false;
662 if (going_down.compare_exchange_strong(expected, true)) {
663 completion_mgr->go_down();
664 }
665 }
666
667 virtual void report_error(RGWCoroutinesStack *op);
668
669 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
670 template <typename T>
671 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value);
672 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
673
674 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
675 void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
676 RGWCoroutinesStack *allocate_stack();
677
678 int64_t get_next_io_id();
679 uint64_t get_next_stack_id();
680
681 void set_sleeping(RGWCoroutine *cr, bool flag);
682 void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
683
684 virtual std::string get_id();
685 void dump(Formatter *f) const;
686
687 RGWIOIDProvider& get_io_id_provider() {
688 return io_id_provider;
689 }
690 };
691
692 template <typename T>
693 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value)
694 {
695 rgw_io_id io_id{get_next_io_id(), -1};
696 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value));
697 completion_mgr->register_completion_notifier(cn);
698 return cn;
699 }
700
701 template <typename T>
702 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value)
703 {
704 return ops_mgr->create_completion_notifier(this, std::move(value));
705 }
706
707 class RGWSimpleCoroutine : public RGWCoroutine {
708 bool called_cleanup;
709
710 int operate(const DoutPrefixProvider *dpp) override;
711
712 int state_init();
713 int state_send_request(const DoutPrefixProvider *dpp);
714 int state_request_complete();
715 int state_all_complete();
716
717 void call_cleanup();
718
719 public:
720 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
721 ~RGWSimpleCoroutine() override;
722
723 virtual int init() { return 0; }
724 virtual int send_request(const DoutPrefixProvider *dpp) = 0;
725 virtual int request_complete() = 0;
726 virtual int finish() { return 0; }
727 virtual void request_cleanup() {}
728 };
729
730 #endif