]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.h
bump version to 18.2.2-pve1
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #ifdef _ASSERT_H
7 #define NEED_ASSERT_H
8 #pragma push_macro("_ASSERT_H")
9 #endif
10
11 #include <boost/asio.hpp>
12 #include <boost/intrusive_ptr.hpp>
13
14 #ifdef NEED_ASSERT_H
15 #pragma pop_macro("_ASSERT_H")
16 #endif
17
18 #include "include/utime.h"
19 #include "common/RefCountedObj.h"
20 #include "common/debug.h"
21 #include "common/Timer.h"
22 #include "common/admin_socket.h"
23
24 #include "rgw_common.h"
25 #include "rgw_http_client_types.h"
26
27 #include <boost/asio/coroutine.hpp>
28
29 #include <atomic>
30
/* initial value of RGWCoroutinesManager::ops_window (set in its constructor) */
#define RGW_ASYNC_OPS_MGR_WINDOW 100

/* forward declarations: these types are referenced before they are defined below */
class RGWCoroutinesStack;
class RGWCoroutinesManager;
class RGWAioCompletionNotifier;
36
37 class RGWCompletionManager : public RefCountedObject {
38 friend class RGWCoroutinesManager;
39
40 CephContext *cct;
41
42 struct io_completion {
43 rgw_io_id io_id;
44 void *user_info;
45 };
46 std::list<io_completion> complete_reqs;
47 std::set<rgw_io_id> complete_reqs_set;
48 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
49 std::set<NotifierRef> cns;
50
51 ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock");
52 ceph::condition_variable cond;
53
54 SafeTimer timer;
55
56 std::atomic<bool> going_down = { false };
57
58 std::map<void *, void *> waiters;
59
60 class WaitContext;
61
62 protected:
63 void _wakeup(void *opaque);
64 void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
65 public:
66 explicit RGWCompletionManager(CephContext *_cct);
67 virtual ~RGWCompletionManager() override;
68
69 void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
70 int get_next(io_completion *io);
71 bool try_get_next(io_completion *io);
72
73 void go_down();
74
75 /*
76 * wait for interval length to complete user_info
77 */
78 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
79 void wakeup(void *opaque);
80
81 void register_completion_notifier(RGWAioCompletionNotifier *cn);
82 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
83 };
84
85 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
86 class RGWAioCompletionNotifier : public RefCountedObject {
87 librados::AioCompletion *c;
88 RGWCompletionManager *completion_mgr;
89 rgw_io_id io_id;
90 void *user_data;
91 ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier");
92 bool registered;
93
94 public:
95 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
96 virtual ~RGWAioCompletionNotifier() override {
97 c->release();
98 lock.lock();
99 bool need_unregister = registered;
100 if (registered) {
101 completion_mgr->get();
102 }
103 registered = false;
104 lock.unlock();
105 if (need_unregister) {
106 completion_mgr->unregister_completion_notifier(this);
107 completion_mgr->put();
108 }
109 }
110
111 librados::AioCompletion *completion() {
112 return c;
113 }
114
115 void unregister() {
116 std::lock_guard l{lock};
117 if (!registered) {
118 return;
119 }
120 registered = false;
121 }
122
123 void cb() {
124 lock.lock();
125 if (!registered) {
126 lock.unlock();
127 put();
128 return;
129 }
130 completion_mgr->get();
131 registered = false;
132 lock.unlock();
133 completion_mgr->complete(this, io_id, user_data);
134 completion_mgr->put();
135 put();
136 }
137 };
138
139 // completion notifier with opaque payload (ie a reference-counted pointer)
140 template <typename T>
141 class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier {
142 T value;
143 public:
144 RGWAioCompletionNotifierWith(RGWCompletionManager *mgr,
145 const rgw_io_id& io_id, void *user_data,
146 T value)
147 : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value))
148 {}
149 };
150
151 struct RGWCoroutinesEnv {
152 uint64_t run_context;
153 RGWCoroutinesManager *manager;
154 std::list<RGWCoroutinesStack *> *scheduled_stacks;
155 RGWCoroutinesStack *stack;
156
157 RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
158 };
159
/*
 * Values stored in RGWCoroutine::state. A coroutine starts in
 * RGWCoroutine_Run; RGWCoroutine_Done and RGWCoroutine_Error both count as
 * "done", and only RGWCoroutine_Error counts as an error.
 */
enum RGWCoroutineState {
  RGWCoroutine_Run = 0,    /* initial state */
  RGWCoroutine_Done = -1,  /* finished */
  RGWCoroutine_Error = -2, /* finished with an error */
};
165
166 struct rgw_spawned_stacks {
167 std::vector<RGWCoroutinesStack *> entries;
168
169 rgw_spawned_stacks() {}
170
171 void add_pending(RGWCoroutinesStack *s) {
172 entries.push_back(s);
173 }
174
175 void inherit(rgw_spawned_stacks *source) {
176 for (auto* entry : source->entries) {
177 add_pending(entry);
178 }
179 source->entries.clear();
180 }
181 };
182
183
184
185 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
186 friend class RGWCoroutinesStack;
187
188 struct StatusItem {
189 utime_t timestamp;
190 std::string status;
191
192 StatusItem(utime_t& t, const std::string& s) : timestamp(t), status(s) {}
193
194 void dump(Formatter *f) const;
195 };
196
197 #define MAX_COROUTINE_HISTORY 10
198
199 struct Status {
200 CephContext *cct;
201 ceph::shared_mutex lock =
202 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
203 int max_history;
204
205 utime_t timestamp;
206 std::stringstream status;
207
208 explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {}
209
210 std::deque<StatusItem> history;
211
212 std::stringstream& set_status();
213 } status;
214
215 std::stringstream description;
216
217 protected:
218 bool _yield_ret;
219
220 struct {
221 boost::asio::coroutine cr;
222 bool should_exit{false};
223 int ret{0};
224
225 void init() {
226 cr = boost::asio::coroutine();
227 should_exit = false;
228 ret = 0;
229 }
230 } drain_status;
231
232 CephContext *cct;
233
234 RGWCoroutinesStack *stack;
235 int retcode;
236 int state;
237
238 rgw_spawned_stacks spawned;
239
240 std::stringstream error_stream;
241
242 int set_state(int s, int ret = 0) {
243 retcode = ret;
244 state = s;
245 return ret;
246 }
247 int set_cr_error(int ret) {
248 return set_state(RGWCoroutine_Error, ret);
249 }
250 int set_cr_done() {
251 return set_state(RGWCoroutine_Done, 0);
252 }
253 void set_io_blocked(bool flag);
254
255 void reset_description() {
256 description.str(std::string());
257 }
258
259 std::stringstream& set_description() {
260 return description;
261 }
262 std::stringstream& set_status() {
263 return status.set_status();
264 }
265
266 std::stringstream& set_status(const std::string& s) {
267 std::stringstream& status = set_status();
268 status << s;
269 return status;
270 }
271
272 virtual int operate_wrapper(const DoutPrefixProvider *dpp) {
273 return operate(dpp);
274 }
275 public:
276 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
277 virtual ~RGWCoroutine() override;
278
279 virtual int operate(const DoutPrefixProvider *dpp) = 0;
280
281 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
282 bool is_error() { return (state == RGWCoroutine_Error); }
283
284 std::stringstream& log_error() { return error_stream; }
285 std::string error_str() {
286 return error_stream.str();
287 }
288
289 void set_retcode(int r) {
290 retcode = r;
291 }
292
293 int get_ret_status() {
294 return retcode;
295 }
296
297 void call(RGWCoroutine *op); /* call at the same stack we're in */
298 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
299 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */
300 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
301
302 int wait(const utime_t& interval);
303 bool drain_children(int num_cr_left,
304 RGWCoroutinesStack *skip_stack = nullptr,
305 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again,
306 cb will be called on completion of every
307 completion. */
308 bool drain_children(int num_cr_left,
309 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again,
310 cb will be called on every completion, can filter errors.
311 A negative return value from cb means that current cr
312 will need to exit */
313 void wakeup();
314 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
315
316 size_t num_spawned() {
317 return spawned.entries.size();
318 }
319
320 void wait_for_child();
321
322 virtual std::string to_str() const;
323
324 RGWCoroutinesStack *get_stack() const {
325 return stack;
326 }
327
328 RGWCoroutinesEnv *get_env() const;
329
330 void dump(Formatter *f) const;
331
332 void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
333
334 int io_block(int ret = 0) {
335 return io_block(ret, -1);
336 }
337 int io_block(int ret, int64_t io_id);
338 int io_block(int ret, const rgw_io_id& io_id);
339 void io_complete() {
340 io_complete(rgw_io_id{});
341 }
342 void io_complete(const rgw_io_id& io_id);
343 };
344
/* stream insertion for a coroutine; defined outside this header
 * (presumably prints cr.to_str() -- confirm against the implementation) */
std::ostream& operator<<(std::ostream& out, const RGWCoroutine& cr);
346
/*
 * Helper macros for use inside a coroutine body (they rely on the `yield`
 * macro and on the _yield_ret / drain_status members of RGWCoroutine).
 *
 * Every macro expands to exactly one statement, so it can safely be used as
 * the body of an unbraced `if` or loop.
 */

/* yield the value of x over and over until it evaluates to true */
#define yield_until_true(x)        \
  do {                             \
    do {                           \
      yield _yield_ret = (x);      \
    } while (!_yield_ret);         \
    _yield_ret = false;            \
  } while (0)

/* drain all spawned children */
#define drain_all()                          \
  do {                                       \
    drain_status.init();                     \
    yield_until_true(drain_children(0));     \
  } while (0)

/* drain spawned children until only n are left */
#define drain_all_but(n)                     \
  do {                                       \
    drain_status.init();                     \
    yield_until_true(drain_children(n));     \
  } while (0)

/* drain all spawned children, skipping the given stack */
#define drain_all_but_stack(stack)                  \
  do {                                              \
    drain_status.init();                            \
    yield_until_true(drain_children(1, stack));     \
  } while (0)

/* like drain_all_but_stack(), calling cb on every completion */
#define drain_all_but_stack_cb(stack, cb)               \
  do {                                                  \
    drain_status.init();                                \
    yield_until_true(drain_children(1, stack, cb));     \
  } while (0)

/*
 * drain spawned children until only n are left, calling cb on every
 * completion; returns from the enclosing function with an error if the
 * drain asked to exit.
 *
 * This one is a braced block rather than do/while(0) on purpose: it never
 * required a trailing semicolon, and existing callers may rely on that.
 */
#define drain_with_cb(n, cb)                       \
  {                                                \
    drain_status.init();                           \
    yield_until_true(drain_children(n, cb));       \
    if (drain_status.should_exit) {                \
      return set_cr_error(drain_status.ret);       \
    }                                              \
  }

/* drain all spawned children, calling cb on every completion */
#define drain_all_cb(cb) \
  drain_with_cb(0, cb)

/* spawn cr, then drain until at most n children are left */
#define yield_spawn_window(cr, n, cb)                          \
  do {                                                         \
    spawn(cr, false);                                          \
    drain_with_cb(n, cb); /* this is guaranteed to yield */    \
  } while (0)
386
387
388
389 template <class T>
390 class RGWConsumerCR : public RGWCoroutine {
391 std::list<T> product;
392
393 public:
394 explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
395
396 bool has_product() {
397 return !product.empty();
398 }
399
400 void wait_for_product() {
401 if (!has_product()) {
402 set_sleeping(true);
403 }
404 }
405
406 bool consume(T *p) {
407 if (product.empty()) {
408 return false;
409 }
410 *p = product.front();
411 product.pop_front();
412 return true;
413 }
414
415 void receive(const T& p, bool wakeup = true);
416 void receive(std::list<T>& l, bool wakeup = true);
417 };
418
419 class RGWCoroutinesStack : public RefCountedObject {
420 friend class RGWCoroutine;
421 friend class RGWCoroutinesManager;
422
423 CephContext *cct;
424
425 int64_t id{-1};
426
427 RGWCoroutinesManager *ops_mgr;
428
429 std::list<RGWCoroutine *> ops;
430 std::list<RGWCoroutine *>::iterator pos;
431
432 rgw_spawned_stacks spawned;
433
434 std::set<RGWCoroutinesStack *> blocked_by_stack;
435 std::set<RGWCoroutinesStack *> blocking_stacks;
436
437 std::map<int64_t, rgw_io_id> io_finish_ids;
438 rgw_io_id io_blocked_id;
439
440 bool done_flag;
441 bool error_flag;
442 bool blocked_flag;
443 bool sleep_flag;
444 bool interval_wait_flag;
445
446 bool is_scheduled;
447
448 bool is_waiting_for_child;
449
450 int retcode;
451
452 uint64_t run_count;
453
454 protected:
455 RGWCoroutinesEnv *env;
456 RGWCoroutinesStack *parent;
457
458 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
459 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
460 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
461 public:
462 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
463 virtual ~RGWCoroutinesStack() override;
464
465 int64_t get_id() const {
466 return id;
467 }
468
469 int operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *env);
470
471 bool is_done() {
472 return done_flag;
473 }
474 bool is_error() {
475 return error_flag;
476 }
477 bool is_blocked_by_stack() {
478 return !blocked_by_stack.empty();
479 }
480 void set_io_blocked(bool flag) {
481 blocked_flag = flag;
482 }
483 void set_io_blocked_id(const rgw_io_id& io_id) {
484 io_blocked_id = io_id;
485 }
486 bool is_io_blocked() {
487 return blocked_flag && !done_flag;
488 }
489 bool can_io_unblock(const rgw_io_id& io_id) {
490 return ((io_blocked_id.id < 0) ||
491 io_blocked_id.intersects(io_id));
492 }
493 bool try_io_unblock(const rgw_io_id& io_id);
494 bool consume_io_finish(const rgw_io_id& io_id);
495 void set_interval_wait(bool flag) {
496 interval_wait_flag = flag;
497 }
498 bool is_interval_waiting() {
499 return interval_wait_flag;
500 }
501 void set_sleeping(bool flag) {
502 bool wakeup = sleep_flag & !flag;
503 sleep_flag = flag;
504 if (wakeup) {
505 schedule();
506 }
507 }
508 bool is_sleeping() {
509 return sleep_flag;
510 }
511 void set_is_scheduled(bool flag) {
512 is_scheduled = flag;
513 }
514
515 bool is_blocked() {
516 return is_blocked_by_stack() || is_sleeping() ||
517 is_io_blocked() || waiting_for_child() ;
518 }
519
520 void schedule();
521 void _schedule();
522
523 int get_ret_status() {
524 return retcode;
525 }
526
527 std::string error_str();
528
529 void call(RGWCoroutine *next_op);
530 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
531 int unwind(int retcode);
532
533 int wait(const utime_t& interval);
534 void wakeup();
535 void io_complete() {
536 io_complete(rgw_io_id{});
537 }
538 void io_complete(const rgw_io_id& io_id);
539
540 bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */
541
542 void cancel();
543
544 RGWAioCompletionNotifier *create_completion_notifier();
545 template <typename T>
546 RGWAioCompletionNotifier *create_completion_notifier(T value);
547 RGWCompletionManager *get_completion_mgr();
548
549 void set_blocked_by(RGWCoroutinesStack *s) {
550 blocked_by_stack.insert(s);
551 s->blocking_stacks.insert(this);
552 }
553
554 void set_wait_for_child(bool flag) {
555 is_waiting_for_child = flag;
556 }
557
558 bool waiting_for_child() {
559 return is_waiting_for_child;
560 }
561
562 bool unblock_stack(RGWCoroutinesStack **s);
563
564 RGWCoroutinesEnv *get_env() const { return env; }
565
566 void dump(Formatter *f) const;
567
568 void init_new_io(RGWIOProvider *io_provider);
569 };
570
571 template <class T>
572 void RGWConsumerCR<T>::receive(std::list<T>& l, bool wakeup)
573 {
574 product.splice(product.end(), l);
575 if (wakeup) {
576 set_sleeping(false);
577 }
578 }
579
580
581 template <class T>
582 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
583 {
584 product.push_back(p);
585 if (wakeup) {
586 set_sleeping(false);
587 }
588 }
589
590 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
591 CephContext *cct;
592
593 std::set<RGWCoroutinesManager *> managers;
594 ceph::shared_mutex lock =
595 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
596
597 std::string admin_command;
598
599 public:
600 explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {}
601 virtual ~RGWCoroutinesManagerRegistry() override;
602
603 void add(RGWCoroutinesManager *mgr);
604 void remove(RGWCoroutinesManager *mgr);
605
606 int hook_to_admin_command(const std::string& command);
607 int call(std::string_view command, const cmdmap_t& cmdmap,
608 const bufferlist&,
609 Formatter *f,
610 std::ostream& ss,
611 bufferlist& out) override;
612
613 void dump(Formatter *f) const;
614 };
615
616 class RGWCoroutinesManager {
617 CephContext *cct;
618 std::atomic<bool> going_down = { false };
619
620 std::atomic<int64_t> run_context_count = { 0 };
621 std::map<uint64_t, std::set<RGWCoroutinesStack *> > run_contexts;
622
623 std::atomic<int64_t> max_io_id = { 0 };
624 std::atomic<uint64_t> max_stack_id = { 0 };
625
626 mutable ceph::shared_mutex lock =
627 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
628
629 RGWIOIDProvider io_id_provider;
630
631 void handle_unblocked_stack(std::set<RGWCoroutinesStack *>& context_stacks, std::list<RGWCoroutinesStack *>& scheduled_stacks,
632 RGWCompletionManager::io_completion& io, int *waiting_count, int *interval_wait_count);
633 protected:
634 RGWCompletionManager *completion_mgr;
635 RGWCoroutinesManagerRegistry *cr_registry;
636
637 int ops_window;
638
639 std::string id;
640
641 void put_completion_notifier(RGWAioCompletionNotifier *cn);
642 public:
643 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct),
644 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
645 completion_mgr = new RGWCompletionManager(cct);
646 if (cr_registry) {
647 cr_registry->add(this);
648 }
649 }
650 virtual ~RGWCoroutinesManager();
651
652 int run(const DoutPrefixProvider *dpp, std::list<RGWCoroutinesStack *>& ops);
653 int run(const DoutPrefixProvider *dpp, RGWCoroutine *op);
654 void stop() {
655 bool expected = false;
656 if (going_down.compare_exchange_strong(expected, true)) {
657 completion_mgr->go_down();
658 }
659 }
660
661 virtual void report_error(RGWCoroutinesStack *op);
662
663 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
664 template <typename T>
665 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value);
666 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
667
668 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
669 void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
670 RGWCoroutinesStack *allocate_stack();
671
672 int64_t get_next_io_id();
673 uint64_t get_next_stack_id();
674
675 void set_sleeping(RGWCoroutine *cr, bool flag);
676 void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
677
678 virtual std::string get_id();
679 void dump(Formatter *f) const;
680
681 RGWIOIDProvider& get_io_id_provider() {
682 return io_id_provider;
683 }
684 };
685
686 template <typename T>
687 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value)
688 {
689 rgw_io_id io_id{get_next_io_id(), -1};
690 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value));
691 completion_mgr->register_completion_notifier(cn);
692 return cn;
693 }
694
695 template <typename T>
696 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value)
697 {
698 return ops_mgr->create_completion_notifier(this, std::move(value));
699 }
700
701 class RGWSimpleCoroutine : public RGWCoroutine {
702 bool called_cleanup;
703
704 int operate(const DoutPrefixProvider *dpp) override;
705
706 int state_init();
707 int state_send_request(const DoutPrefixProvider *dpp);
708 int state_request_complete();
709 int state_all_complete();
710
711 void call_cleanup();
712
713 public:
714 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
715 virtual ~RGWSimpleCoroutine() override;
716
717 virtual int init() { return 0; }
718 virtual int send_request(const DoutPrefixProvider *dpp) = 0;
719 virtual int request_complete() = 0;
720 virtual int finish() { return 0; }
721 virtual void request_cleanup() {}
722 };