]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.h
update sources to v12.1.0
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
1 #ifndef CEPH_RGW_COROUTINE_H
2 #define CEPH_RGW_COROUTINE_H
3
4 #ifdef _ASSERT_H
5 #define NEED_ASSERT_H
6 #pragma push_macro("_ASSERT_H")
7 #endif
8
9 #include <boost/asio.hpp>
10 #include <boost/intrusive_ptr.hpp>
11
12 #ifdef NEED_ASSERT_H
13 #pragma pop_macro("_ASSERT_H")
14 #endif
15
16 #include "include/utime.h"
17 #include "common/RefCountedObj.h"
18 #include "common/debug.h"
19 #include "common/Timer.h"
20 #include "common/admin_socket.h"
21
22 #include "rgw_common.h"
23 #include <boost/asio/coroutine.hpp>
24
25 #include <atomic>
26
/* Initial value of RGWCoroutinesManager::ops_window (see its constructor). */
#define RGW_ASYNC_OPS_MGR_WINDOW 100

/* Forward declarations for types that refer to each other further down. */
class RGWCoroutinesStack;
class RGWCoroutinesManager;
class RGWAioCompletionNotifier;
32
33 class RGWCompletionManager : public RefCountedObject {
34 CephContext *cct;
35 list<void *> complete_reqs;
36 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
37 set<NotifierRef> cns;
38
39 Mutex lock;
40 Cond cond;
41
42 SafeTimer timer;
43
44 std::atomic<bool> going_down = { false };
45
46 map<void *, void *> waiters;
47
48 class WaitContext;
49
50 protected:
51 void _wakeup(void *opaque);
52 void _complete(RGWAioCompletionNotifier *cn, void *user_info);
53 public:
54 RGWCompletionManager(CephContext *_cct);
55 ~RGWCompletionManager() override;
56
57 void complete(RGWAioCompletionNotifier *cn, void *user_info);
58 int get_next(void **user_info);
59 bool try_get_next(void **user_info);
60
61 void go_down();
62
63 /*
64 * wait for interval length to complete user_info
65 */
66 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
67 void wakeup(void *opaque);
68
69 void register_completion_notifier(RGWAioCompletionNotifier *cn);
70 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
71 };
72
73 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
74 class RGWAioCompletionNotifier : public RefCountedObject {
75 librados::AioCompletion *c;
76 RGWCompletionManager *completion_mgr;
77 void *user_data;
78 Mutex lock;
79 bool registered;
80
81 public:
82 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
83 ~RGWAioCompletionNotifier() override {
84 c->release();
85 lock.Lock();
86 bool need_unregister = registered;
87 if (registered) {
88 completion_mgr->get();
89 }
90 registered = false;
91 lock.Unlock();
92 if (need_unregister) {
93 completion_mgr->unregister_completion_notifier(this);
94 completion_mgr->put();
95 }
96 }
97
98 librados::AioCompletion *completion() {
99 return c;
100 }
101
102 void unregister() {
103 Mutex::Locker l(lock);
104 if (!registered) {
105 return;
106 }
107 registered = false;
108 }
109
110 void cb() {
111 lock.Lock();
112 if (!registered) {
113 lock.Unlock();
114 put();
115 return;
116 }
117 completion_mgr->get();
118 registered = false;
119 lock.Unlock();
120 completion_mgr->complete(this, user_data);
121 completion_mgr->put();
122 put();
123 }
124 };
125
126 struct RGWCoroutinesEnv {
127 uint64_t run_context;
128 RGWCoroutinesManager *manager;
129 list<RGWCoroutinesStack *> *scheduled_stacks;
130 RGWCoroutinesStack *stack;
131
132 RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
133 };
134
/* Values stored in RGWCoroutine::state. */
enum RGWCoroutineState {
  /* finished with an error; is_done() and is_error() both report true */
  RGWCoroutine_Error = -2,
  /* finished; is_done() reports true */
  RGWCoroutine_Done = -1,
  /* state a freshly constructed coroutine starts in */
  RGWCoroutine_Run = 0,
};
140
141 struct rgw_spawned_stacks {
142 vector<RGWCoroutinesStack *> entries;
143
144 rgw_spawned_stacks() {}
145
146 void add_pending(RGWCoroutinesStack *s) {
147 entries.push_back(s);
148 }
149
150 void inherit(rgw_spawned_stacks *source) {
151 for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
152 iter != source->entries.end(); ++iter) {
153 add_pending(*iter);
154 }
155 source->entries.clear();
156 }
157 };
158
159
160
161 class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
162 friend class RGWCoroutinesStack;
163
164 struct StatusItem {
165 utime_t timestamp;
166 string status;
167
168 StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
169
170 void dump(Formatter *f) const;
171 };
172
173 #define MAX_COROUTINE_HISTORY 10
174
175 struct Status {
176 CephContext *cct;
177 RWLock lock;
178 int max_history;
179
180 utime_t timestamp;
181 stringstream status;
182
183 Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
184
185 deque<StatusItem> history;
186
187 stringstream& set_status();
188 } status;
189
190 stringstream description;
191
192 protected:
193 bool _yield_ret;
194 boost::asio::coroutine drain_cr;
195
196 CephContext *cct;
197
198 RGWCoroutinesStack *stack;
199 int retcode;
200 int state;
201
202 rgw_spawned_stacks spawned;
203
204 stringstream error_stream;
205
206 int set_state(int s, int ret = 0) {
207 state = s;
208 return ret;
209 }
210 int set_cr_error(int ret) {
211 state = RGWCoroutine_Error;
212 return ret;
213 }
214 int set_cr_done() {
215 state = RGWCoroutine_Done;
216 return 0;
217 }
218 void set_io_blocked(bool flag);
219 int io_block(int ret = 0);
220
221 void reset_description() {
222 description.str(string());
223 }
224
225 stringstream& set_description() {
226 return description;
227 }
228 stringstream& set_status() {
229 return status.set_status();
230 }
231
232 stringstream& set_status(const string& s) {
233 stringstream& status = set_status();
234 status << s;
235 return status;
236 }
237
238 public:
239 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
240 ~RGWCoroutine() override;
241
242 virtual int operate() = 0;
243
244 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
245 bool is_error() { return (state == RGWCoroutine_Error); }
246
247 stringstream& log_error() { return error_stream; }
248 string error_str() {
249 return error_stream.str();
250 }
251
252 void set_retcode(int r) {
253 retcode = r;
254 }
255
256 int get_ret_status() {
257 return retcode;
258 }
259
260 void call(RGWCoroutine *op); /* call at the same stack we're in */
261 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
262 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
263 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
264
265 int wait(const utime_t& interval);
266 bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
267 void wakeup();
268 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
269
270 size_t num_spawned() {
271 return spawned.entries.size();
272 }
273
274 void wait_for_child();
275
276 virtual string to_str() const;
277
278 RGWCoroutinesStack *get_stack() const {
279 return stack;
280 }
281
282 void dump(Formatter *f) const;
283 };
284
285 ostream& operator<<(ostream& out, const RGWCoroutine& cr);
286
/*
 * Helpers for use inside an RGWCoroutine's operate() body; they rely on the
 * _yield_ret and drain_cr members and on the `yield` coroutine keyword macro.
 *
 * Each macro expands to a single do { } while (0) statement so that it is
 * safe as the sole body of an if/else or loop.
 */

/* yield on `x` over and over until it evaluates to true */
#define yield_until_true(x)       \
do {                              \
  do {                            \
    yield _yield_ret = (x);       \
  } while (!_yield_ret);          \
  _yield_ret = false;             \
} while (0)

/* reset drain_cr, then yield until drain_children(0) stops asking for more */
#define drain_all()                         \
do {                                        \
  drain_cr = boost::asio::coroutine();      \
  yield_until_true(drain_children(0));      \
} while (0)

/* as drain_all(), passing `n` as num_cr_left */
#define drain_all_but(n)                    \
do {                                        \
  drain_cr = boost::asio::coroutine();      \
  yield_until_true(drain_children(n));      \
} while (0)

/* as drain_all(), passing 1 as num_cr_left and `stack` as skip_stack */
#define drain_all_but_stack(stack)                  \
do {                                                \
  drain_cr = boost::asio::coroutine();              \
  yield_until_true(drain_children(1, stack));       \
} while (0)
306
307 template <class T>
308 class RGWConsumerCR : public RGWCoroutine {
309 list<T> product;
310
311 public:
312 RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
313
314 bool has_product() {
315 return !product.empty();
316 }
317
318 void wait_for_product() {
319 if (!has_product()) {
320 set_sleeping(true);
321 }
322 }
323
324 bool consume(T *p) {
325 if (product.empty()) {
326 return false;
327 }
328 *p = product.front();
329 product.pop_front();
330 return true;
331 }
332
333 void receive(const T& p, bool wakeup = true);
334 void receive(list<T>& l, bool wakeup = true);
335 };
336
337 class RGWCoroutinesStack : public RefCountedObject {
338 friend class RGWCoroutine;
339 friend class RGWCoroutinesManager;
340
341 CephContext *cct;
342
343 RGWCoroutinesManager *ops_mgr;
344
345 list<RGWCoroutine *> ops;
346 list<RGWCoroutine *>::iterator pos;
347
348 rgw_spawned_stacks spawned;
349
350 set<RGWCoroutinesStack *> blocked_by_stack;
351 set<RGWCoroutinesStack *> blocking_stacks;
352
353 bool done_flag;
354 bool error_flag;
355 bool blocked_flag;
356 bool sleep_flag;
357 bool interval_wait_flag;
358
359 bool is_scheduled;
360
361 bool is_waiting_for_child;
362
363 int retcode;
364
365 uint64_t run_count;
366
367 protected:
368 RGWCoroutinesEnv *env;
369 RGWCoroutinesStack *parent;
370
371 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
372 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
373 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
374 public:
375 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
376 ~RGWCoroutinesStack() override;
377
378 int operate(RGWCoroutinesEnv *env);
379
380 bool is_done() {
381 return done_flag;
382 }
383 bool is_error() {
384 return error_flag;
385 }
386 bool is_blocked_by_stack() {
387 return !blocked_by_stack.empty();
388 }
389 void set_io_blocked(bool flag) {
390 blocked_flag = flag;
391 }
392 bool is_io_blocked() {
393 return blocked_flag;
394 }
395 void set_interval_wait(bool flag) {
396 interval_wait_flag = flag;
397 }
398 bool is_interval_waiting() {
399 return interval_wait_flag;
400 }
401 void set_sleeping(bool flag) {
402 bool wakeup = sleep_flag & !flag;
403 sleep_flag = flag;
404 if (wakeup) {
405 schedule();
406 }
407 }
408 bool is_sleeping() {
409 return sleep_flag;
410 }
411 void set_is_scheduled(bool flag) {
412 is_scheduled = flag;
413 }
414
415 bool is_blocked() {
416 return is_blocked_by_stack() || is_sleeping() ||
417 is_io_blocked() || waiting_for_child() ;
418 }
419
420 void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
421 if (!stacks) {
422 stacks = env->scheduled_stacks;
423 }
424 if (!is_scheduled) {
425 stacks->push_back(this);
426 is_scheduled = true;
427 }
428 }
429
430 int get_ret_status() {
431 return retcode;
432 }
433
434 string error_str();
435
436 void call(RGWCoroutine *next_op);
437 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
438 int unwind(int retcode);
439
440 int wait(const utime_t& interval);
441 void wakeup();
442
443 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
444
445 RGWAioCompletionNotifier *create_completion_notifier();
446 RGWCompletionManager *get_completion_mgr();
447
448 void set_blocked_by(RGWCoroutinesStack *s) {
449 blocked_by_stack.insert(s);
450 s->blocking_stacks.insert(this);
451 }
452
453 void set_wait_for_child(bool flag) {
454 is_waiting_for_child = flag;
455 }
456
457 bool waiting_for_child() {
458 return is_waiting_for_child;
459 }
460
461 bool unblock_stack(RGWCoroutinesStack **s);
462
463 RGWCoroutinesEnv *get_env() { return env; }
464
465 void dump(Formatter *f) const;
466 };
467
468 template <class T>
469 void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
470 {
471 product.splice(product.end(), l);
472 if (wakeup) {
473 set_sleeping(false);
474 }
475 }
476
477
478 template <class T>
479 void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
480 {
481 product.push_back(p);
482 if (wakeup) {
483 set_sleeping(false);
484 }
485 }
486
487 class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
488 CephContext *cct;
489
490 set<RGWCoroutinesManager *> managers;
491 RWLock lock;
492
493 string admin_command;
494
495 public:
496 RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
497 ~RGWCoroutinesManagerRegistry() override;
498
499 void add(RGWCoroutinesManager *mgr);
500 void remove(RGWCoroutinesManager *mgr);
501
502 int hook_to_admin_command(const string& command);
503 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
504 bufferlist& out) override;
505
506 void dump(Formatter *f) const;
507 };
508
509 class RGWCoroutinesManager {
510 CephContext *cct;
511 std::atomic<bool> going_down = { false };
512
513 std::atomic<int64_t> run_context_count = { 0 };
514 map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
515
516 RWLock lock;
517
518 void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
519 protected:
520 RGWCompletionManager *completion_mgr;
521 RGWCoroutinesManagerRegistry *cr_registry;
522
523 int ops_window;
524
525 string id;
526
527 void put_completion_notifier(RGWAioCompletionNotifier *cn);
528 public:
529 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
530 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
531 completion_mgr = new RGWCompletionManager(cct);
532 if (cr_registry) {
533 cr_registry->add(this);
534 }
535 }
536 virtual ~RGWCoroutinesManager() {
537 stop();
538 completion_mgr->put();
539 if (cr_registry) {
540 cr_registry->remove(this);
541 }
542 }
543
544 int run(list<RGWCoroutinesStack *>& ops);
545 int run(RGWCoroutine *op);
546 void stop() {
547 bool expected = false;
548 if (going_down.compare_exchange_strong(expected, true)) {
549 completion_mgr->go_down();
550 }
551 }
552
553 virtual void report_error(RGWCoroutinesStack *op);
554
555 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
556 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
557
558 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
559 RGWCoroutinesStack *allocate_stack();
560
561 virtual string get_id();
562 void dump(Formatter *f) const;
563 };
564
565 class RGWSimpleCoroutine : public RGWCoroutine {
566 bool called_cleanup;
567
568 int operate() override;
569
570 int state_init();
571 int state_send_request();
572 int state_request_complete();
573 int state_all_complete();
574
575 void call_cleanup();
576
577 public:
578 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
579 ~RGWSimpleCoroutine() override;
580
581 virtual int init() { return 0; }
582 virtual int send_request() = 0;
583 virtual int request_complete() = 0;
584 virtual int finish() { return 0; }
585 virtual void request_cleanup() {}
586 };
587
588 #endif