git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.h
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
6
7 #ifdef _ASSERT_H
8 #define NEED_ASSERT_H
9 #pragma push_macro("_ASSERT_H")
10 #endif
11
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
14
15 #ifdef NEED_ASSERT_H
16 #pragma pop_macro("_ASSERT_H")
17 #endif
18
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
24
25 #include "rgw_common.h"
26 #include <boost/asio/coroutine.hpp>
27
28 #include <atomic>
29
30 #define RGW_ASYNC_OPS_MGR_WINDOW 100
31
// Forward declarations: these types are referred to (by pointer or as
// friends) before their full definitions appear further down.
class RGWAioCompletionNotifier;
class RGWCoroutinesManager;
class RGWCoroutinesStack;
35
36 class RGWCompletionManager : public RefCountedObject {
37 friend class RGWCoroutinesManager;
38
39 CephContext *cct;
40
41 struct io_completion {
42 rgw_io_id io_id;
43 void *user_info;
44 };
45 list<io_completion> complete_reqs;
46 set<rgw_io_id> complete_reqs_set;
47 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
48 set<NotifierRef> cns;
49
50 Mutex lock;
51 Cond cond;
52
53 SafeTimer timer;
54
55 std::atomic<bool> going_down = { false };
56
57 map<void *, void *> waiters;
58
59 class WaitContext;
60
61 protected:
62 void _wakeup(void *opaque);
63 void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
64 public:
65 explicit RGWCompletionManager(CephContext *_cct);
66 ~RGWCompletionManager() override;
67
68 void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info);
69 int get_next(io_completion *io);
70 bool try_get_next(io_completion *io);
71
72 void go_down();
73
74 /*
75 * wait for interval length to complete user_info
76 */
77 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
78 void wakeup(void *opaque);
79
80 void register_completion_notifier(RGWAioCompletionNotifier *cn);
81 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
82 };
83
84 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
85 class RGWAioCompletionNotifier : public RefCountedObject {
86 librados::AioCompletion *c;
87 RGWCompletionManager *completion_mgr;
88 rgw_io_id io_id;
89 void *user_data;
90 Mutex lock;
91 bool registered;
92
93 public:
94 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data);
95 ~RGWAioCompletionNotifier() override {
96 c->release();
97 lock.Lock();
98 bool need_unregister = registered;
99 if (registered) {
100 completion_mgr->get();
101 }
102 registered = false;
103 lock.Unlock();
104 if (need_unregister) {
105 completion_mgr->unregister_completion_notifier(this);
106 completion_mgr->put();
107 }
108 }
109
110 librados::AioCompletion *completion() {
111 return c;
112 }
113
114 void unregister() {
115 Mutex::Locker l(lock);
116 if (!registered) {
117 return;
118 }
119 registered = false;
120 }
121
122 void cb() {
123 lock.Lock();
124 if (!registered) {
125 lock.Unlock();
126 put();
127 return;
128 }
129 completion_mgr->get();
130 registered = false;
131 lock.Unlock();
132 completion_mgr->complete(this, io_id, user_data);
133 completion_mgr->put();
134 put();
135 }
136 };
137
138 // completion notifier with opaque payload (ie a reference-counted pointer)
139 template <typename T>
140 class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier {
141 T value;
142 public:
143 RGWAioCompletionNotifierWith(RGWCompletionManager *mgr,
144 const rgw_io_id& io_id, void *user_data,
145 T value)
146 : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value))
147 {}
148 };
149
150 struct RGWCoroutinesEnv {
151 uint64_t run_context;
152 RGWCoroutinesManager *manager;
153 list<RGWCoroutinesStack *> *scheduled_stacks;
154 RGWCoroutinesStack *stack;
155
156 RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
157 };
158
// Coroutine state codes: zero while running, negative once finished
// (either successfully or with an error).
enum RGWCoroutineState {
  RGWCoroutine_Run   =  0,
  RGWCoroutine_Done  = -1,
  RGWCoroutine_Error = -2,
};
164
165 struct rgw_spawned_stacks {
166 vector<RGWCoroutinesStack *> entries;
167
168 rgw_spawned_stacks() {}
169
170 void add_pending(RGWCoroutinesStack *s) {
171 entries.push_back(s);
172 }
173
174 void inherit(rgw_spawned_stacks *source) {
175 for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
176 iter != source->entries.end(); ++iter) {
177 add_pending(*iter);
178 }
179 source->entries.clear();
180 }
181 };
182
183
184
185 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
186 friend class RGWCoroutinesStack;
187
188 struct StatusItem {
189 utime_t timestamp;
190 string status;
191
192 StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
193
194 void dump(Formatter *f) const;
195 };
196
197 #define MAX_COROUTINE_HISTORY 10
198
199 struct Status {
200 CephContext *cct;
201 RWLock lock;
202 int max_history;
203
204 utime_t timestamp;
205 stringstream status;
206
207 explicit Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
208
209 deque<StatusItem> history;
210
211 stringstream& set_status();
212 } status;
213
214 stringstream description;
215
216 protected:
217 bool _yield_ret;
218 boost::asio::coroutine drain_cr;
219
220 CephContext *cct;
221
222 RGWCoroutinesStack *stack;
223 int retcode;
224 int state;
225
226 rgw_spawned_stacks spawned;
227
228 stringstream error_stream;
229
230 int set_state(int s, int ret = 0) {
231 retcode = ret;
232 state = s;
233 return ret;
234 }
235 int set_cr_error(int ret) {
236 return set_state(RGWCoroutine_Error, ret);
237 }
238 int set_cr_done() {
239 return set_state(RGWCoroutine_Done, 0);
240 }
241 void set_io_blocked(bool flag);
242
243 void reset_description() {
244 description.str(string());
245 }
246
247 stringstream& set_description() {
248 return description;
249 }
250 stringstream& set_status() {
251 return status.set_status();
252 }
253
254 stringstream& set_status(const string& s) {
255 stringstream& status = set_status();
256 status << s;
257 return status;
258 }
259
260 virtual int operate_wrapper() {
261 return operate();
262 }
263 public:
264 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
265 ~RGWCoroutine() override;
266
267 virtual int operate() = 0;
268
269 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
270 bool is_error() { return (state == RGWCoroutine_Error); }
271
272 stringstream& log_error() { return error_stream; }
273 string error_str() {
274 return error_stream.str();
275 }
276
277 void set_retcode(int r) {
278 retcode = r;
279 }
280
281 int get_ret_status() {
282 return retcode;
283 }
284
285 void call(RGWCoroutine *op); /* call at the same stack we're in */
286 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
287 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
288 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
289
290 int wait(const utime_t& interval);
291 bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
292 void wakeup();
293 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
294
295 size_t num_spawned() {
296 return spawned.entries.size();
297 }
298
299 void wait_for_child();
300
301 virtual string to_str() const;
302
303 RGWCoroutinesStack *get_stack() const {
304 return stack;
305 }
306
307 RGWCoroutinesEnv *get_env() const;
308
309 void dump(Formatter *f) const;
310
311 void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */
312
313 int io_block(int ret = 0) {
314 return io_block(ret, -1);
315 }
316 int io_block(int ret, int64_t io_id);
317 int io_block(int ret, const rgw_io_id& io_id);
318 void io_complete() {
319 io_complete(rgw_io_id{});
320 }
321 void io_complete(const rgw_io_id& io_id);
322 };
323
324 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
325
/*
 * Evaluates `x` under `yield` over and over until it evaluates to true, then
 * resets _yield_ret to false.
 *
 * Requires `yield` and a bool-like `_yield_ret` to be usable at the point of
 * expansion. The argument is parenthesized so that an argument expanding to
 * a comma expression is assigned as a whole rather than just its first
 * operand.
 */
#define yield_until_true(x)       \
  do {                            \
    do {                          \
      yield _yield_ret = (x);     \
    } while (!_yield_ret);        \
    _yield_ret = false;           \
  } while (0)
333
/*
 * Drain helpers: reset drain_cr to a fresh boost::asio::coroutine and then
 * keep yielding until drain_children(...) returns true.
 *
 *   drain_all()                - drain_children(0)
 *   drain_all_but(n)           - drain_children(n)
 *   drain_all_but_stack(stack) - drain_children(1, stack)
 *
 * Each macro is wrapped in do { } while (0) so that it expands to exactly
 * one statement; without the wrapper `if (c) drain_all();` would guard only
 * the reset and an `else` branch would not compile.
 */
#define drain_all()                            \
  do {                                         \
    drain_cr = boost::asio::coroutine();       \
    yield_until_true(drain_children(0));       \
  } while (0)

#define drain_all_but(n)                       \
  do {                                         \
    drain_cr = boost::asio::coroutine();       \
    yield_until_true(drain_children(n));       \
  } while (0)

#define drain_all_but_stack(stack)             \
  do {                                         \
    drain_cr = boost::asio::coroutine();       \
    yield_until_true(drain_children(1, stack)); \
  } while (0)
345
346 template <class T>
347 class RGWConsumerCR : public RGWCoroutine {
348 list<T> product;
349
350 public:
351 explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
352
353 bool has_product() {
354 return !product.empty();
355 }
356
357 void wait_for_product() {
358 if (!has_product()) {
359 set_sleeping(true);
360 }
361 }
362
363 bool consume(T *p) {
364 if (product.empty()) {
365 return false;
366 }
367 *p = product.front();
368 product.pop_front();
369 return true;
370 }
371
372 void receive(const T& p, bool wakeup = true);
373 void receive(list<T>& l, bool wakeup = true);
374 };
375
376 class RGWCoroutinesStack : public RefCountedObject {
377 friend class RGWCoroutine;
378 friend class RGWCoroutinesManager;
379
380 CephContext *cct;
381
382 RGWCoroutinesManager *ops_mgr;
383
384 list<RGWCoroutine *> ops;
385 list<RGWCoroutine *>::iterator pos;
386
387 rgw_spawned_stacks spawned;
388
389 set<RGWCoroutinesStack *> blocked_by_stack;
390 set<RGWCoroutinesStack *> blocking_stacks;
391
392 map<int64_t, rgw_io_id> io_finish_ids;
393 rgw_io_id io_blocked_id;
394
395 bool done_flag;
396 bool error_flag;
397 bool blocked_flag;
398 bool sleep_flag;
399 bool interval_wait_flag;
400
401 bool is_scheduled;
402
403 bool is_waiting_for_child;
404
405 int retcode;
406
407 uint64_t run_count;
408
409 protected:
410 RGWCoroutinesEnv *env;
411 RGWCoroutinesStack *parent;
412
413 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
414 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
415 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
416 public:
417 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
418 ~RGWCoroutinesStack() override;
419
420 int operate(RGWCoroutinesEnv *env);
421
422 bool is_done() {
423 return done_flag;
424 }
425 bool is_error() {
426 return error_flag;
427 }
428 bool is_blocked_by_stack() {
429 return !blocked_by_stack.empty();
430 }
431 void set_io_blocked(bool flag) {
432 blocked_flag = flag;
433 }
434 void set_io_blocked_id(const rgw_io_id& io_id) {
435 io_blocked_id = io_id;
436 }
437 bool is_io_blocked() {
438 return blocked_flag && !done_flag;
439 }
440 bool can_io_unblock(const rgw_io_id& io_id) {
441 return ((io_blocked_id.id < 0) ||
442 io_blocked_id.intersects(io_id));
443 }
444 bool try_io_unblock(const rgw_io_id& io_id);
445 bool consume_io_finish(const rgw_io_id& io_id);
446 void set_interval_wait(bool flag) {
447 interval_wait_flag = flag;
448 }
449 bool is_interval_waiting() {
450 return interval_wait_flag;
451 }
452 void set_sleeping(bool flag) {
453 bool wakeup = sleep_flag & !flag;
454 sleep_flag = flag;
455 if (wakeup) {
456 schedule();
457 }
458 }
459 bool is_sleeping() {
460 return sleep_flag;
461 }
462 void set_is_scheduled(bool flag) {
463 is_scheduled = flag;
464 }
465
466 bool is_blocked() {
467 return is_blocked_by_stack() || is_sleeping() ||
468 is_io_blocked() || waiting_for_child() ;
469 }
470
471 void schedule();
472 void _schedule();
473
474 int get_ret_status() {
475 return retcode;
476 }
477
478 string error_str();
479
480 void call(RGWCoroutine *next_op);
481 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
482 int unwind(int retcode);
483
484 int wait(const utime_t& interval);
485 void wakeup();
486 void io_complete() {
487 io_complete(rgw_io_id{});
488 }
489 void io_complete(const rgw_io_id& io_id);
490
491 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
492
493 void cancel();
494
495 RGWAioCompletionNotifier *create_completion_notifier();
496 template <typename T>
497 RGWAioCompletionNotifier *create_completion_notifier(T value);
498 RGWCompletionManager *get_completion_mgr();
499
500 void set_blocked_by(RGWCoroutinesStack *s) {
501 blocked_by_stack.insert(s);
502 s->blocking_stacks.insert(this);
503 }
504
505 void set_wait_for_child(bool flag) {
506 is_waiting_for_child = flag;
507 }
508
509 bool waiting_for_child() {
510 return is_waiting_for_child;
511 }
512
513 bool unblock_stack(RGWCoroutinesStack **s);
514
515 RGWCoroutinesEnv *get_env() const { return env; }
516
517 void dump(Formatter *f) const;
518
519 void init_new_io(RGWIOProvider *io_provider);
520 };
521
522 template <class T>
523 void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
524 {
525 product.splice(product.end(), l);
526 if (wakeup) {
527 set_sleeping(false);
528 }
529 }
530
531
532 template <class T>
533 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
534 {
535 product.push_back(p);
536 if (wakeup) {
537 set_sleeping(false);
538 }
539 }
540
541 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
542 CephContext *cct;
543
544 set<RGWCoroutinesManager *> managers;
545 RWLock lock;
546
547 string admin_command;
548
549 public:
550 explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
551 ~RGWCoroutinesManagerRegistry() override;
552
553 void add(RGWCoroutinesManager *mgr);
554 void remove(RGWCoroutinesManager *mgr);
555
556 int hook_to_admin_command(const string& command);
557 bool call(std::string_view command, const cmdmap_t& cmdmap,
558 std::string_view format, bufferlist& out) override;
559
560 void dump(Formatter *f) const;
561 };
562
563 class RGWCoroutinesManager {
564 CephContext *cct;
565 std::atomic<bool> going_down = { false };
566
567 std::atomic<int64_t> run_context_count = { 0 };
568 map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
569
570 std::atomic<int64_t> max_io_id = { 0 };
571
572 RWLock lock;
573
574 RGWIOIDProvider io_id_provider;
575
576 void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
577 RGWCompletionManager::io_completion& io, int *waiting_count);
578 protected:
579 RGWCompletionManager *completion_mgr;
580 RGWCoroutinesManagerRegistry *cr_registry;
581
582 int ops_window;
583
584 string id;
585
586 void put_completion_notifier(RGWAioCompletionNotifier *cn);
587 public:
588 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
589 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
590 completion_mgr = new RGWCompletionManager(cct);
591 if (cr_registry) {
592 cr_registry->add(this);
593 }
594 }
595 virtual ~RGWCoroutinesManager() {
596 stop();
597 completion_mgr->put();
598 if (cr_registry) {
599 cr_registry->remove(this);
600 }
601 }
602
603 int run(list<RGWCoroutinesStack *>& ops);
604 int run(RGWCoroutine *op);
605 void stop() {
606 bool expected = false;
607 if (going_down.compare_exchange_strong(expected, true)) {
608 completion_mgr->go_down();
609 }
610 }
611
612 virtual void report_error(RGWCoroutinesStack *op);
613
614 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
615 template <typename T>
616 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value);
617 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
618
619 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
620 void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
621 RGWCoroutinesStack *allocate_stack();
622
623 int64_t get_next_io_id();
624
625 void set_sleeping(RGWCoroutine *cr, bool flag);
626 void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id);
627
628 virtual string get_id();
629 void dump(Formatter *f) const;
630
631 RGWIOIDProvider& get_io_id_provider() {
632 return io_id_provider;
633 }
634 };
635
636 template <typename T>
637 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value)
638 {
639 rgw_io_id io_id{get_next_io_id(), -1};
640 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value));
641 completion_mgr->register_completion_notifier(cn);
642 return cn;
643 }
644
645 template <typename T>
646 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value)
647 {
648 return ops_mgr->create_completion_notifier(this, std::move(value));
649 }
650
651 class RGWSimpleCoroutine : public RGWCoroutine {
652 bool called_cleanup;
653
654 int operate() override;
655
656 int state_init();
657 int state_send_request();
658 int state_request_complete();
659 int state_all_complete();
660
661 void call_cleanup();
662
663 public:
664 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
665 ~RGWSimpleCoroutine() override;
666
667 virtual int init() { return 0; }
668 virtual int send_request() = 0;
669 virtual int request_complete() = 0;
670 virtual int finish() { return 0; }
671 virtual void request_cleanup() {}
672 };
673
674 #endif