]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_coroutine.h
update sources to v12.2.3
[ceph.git] / ceph / src / rgw / rgw_coroutine.h
CommitLineData
7c673cae
FG
1#ifndef CEPH_RGW_COROUTINE_H
2#define CEPH_RGW_COROUTINE_H
3
4#ifdef _ASSERT_H
5#define NEED_ASSERT_H
6#pragma push_macro("_ASSERT_H")
7#endif
8
7c673cae
FG
9#include <boost/intrusive_ptr.hpp>
10
11#ifdef NEED_ASSERT_H
12#pragma pop_macro("_ASSERT_H")
13#endif
14
15#include "include/utime.h"
16#include "common/RefCountedObj.h"
17#include "common/debug.h"
18#include "common/Timer.h"
19#include "common/admin_socket.h"
20
21#include "rgw_common.h"
31f18b77 22#include <boost/asio/coroutine.hpp>
7c673cae
FG
23
24#include <atomic>
25
26#define RGW_ASYNC_OPS_MGR_WINDOW 100
27
28class RGWCoroutinesStack;
29class RGWCoroutinesManager;
30class RGWAioCompletionNotifier;
31
32class RGWCompletionManager : public RefCountedObject {
33 CephContext *cct;
34 list<void *> complete_reqs;
35 using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>;
36 set<NotifierRef> cns;
37
38 Mutex lock;
39 Cond cond;
40
41 SafeTimer timer;
42
43 std::atomic<bool> going_down = { false };
44
45 map<void *, void *> waiters;
46
47 class WaitContext;
48
49protected:
50 void _wakeup(void *opaque);
51 void _complete(RGWAioCompletionNotifier *cn, void *user_info);
52public:
53 RGWCompletionManager(CephContext *_cct);
54 ~RGWCompletionManager() override;
55
56 void complete(RGWAioCompletionNotifier *cn, void *user_info);
57 int get_next(void **user_info);
58 bool try_get_next(void **user_info);
59
60 void go_down();
61
62 /*
63 * wait for interval length to complete user_info
64 */
65 void wait_interval(void *opaque, const utime_t& interval, void *user_info);
66 void wakeup(void *opaque);
67
68 void register_completion_notifier(RGWAioCompletionNotifier *cn);
69 void unregister_completion_notifier(RGWAioCompletionNotifier *cn);
70};
71
72/* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
73class RGWAioCompletionNotifier : public RefCountedObject {
74 librados::AioCompletion *c;
75 RGWCompletionManager *completion_mgr;
76 void *user_data;
77 Mutex lock;
78 bool registered;
79
80public:
81 RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data);
82 ~RGWAioCompletionNotifier() override {
83 c->release();
84 lock.Lock();
85 bool need_unregister = registered;
86 if (registered) {
87 completion_mgr->get();
88 }
89 registered = false;
90 lock.Unlock();
91 if (need_unregister) {
92 completion_mgr->unregister_completion_notifier(this);
93 completion_mgr->put();
94 }
95 }
96
97 librados::AioCompletion *completion() {
98 return c;
99 }
100
101 void unregister() {
102 Mutex::Locker l(lock);
103 if (!registered) {
104 return;
105 }
106 registered = false;
107 }
108
109 void cb() {
110 lock.Lock();
111 if (!registered) {
112 lock.Unlock();
113 put();
114 return;
115 }
116 completion_mgr->get();
117 registered = false;
118 lock.Unlock();
119 completion_mgr->complete(this, user_data);
120 completion_mgr->put();
121 put();
122 }
123};
124
125struct RGWCoroutinesEnv {
126 uint64_t run_context;
127 RGWCoroutinesManager *manager;
128 list<RGWCoroutinesStack *> *scheduled_stacks;
129 RGWCoroutinesStack *stack;
130
131 RGWCoroutinesEnv() : run_context(0), manager(NULL), scheduled_stacks(NULL), stack(NULL) {}
132};
133
134enum RGWCoroutineState {
135 RGWCoroutine_Error = -2,
136 RGWCoroutine_Done = -1,
137 RGWCoroutine_Run = 0,
138};
139
140struct rgw_spawned_stacks {
141 vector<RGWCoroutinesStack *> entries;
142
143 rgw_spawned_stacks() {}
144
145 void add_pending(RGWCoroutinesStack *s) {
146 entries.push_back(s);
147 }
148
149 void inherit(rgw_spawned_stacks *source) {
150 for (vector<RGWCoroutinesStack *>::iterator iter = source->entries.begin();
151 iter != source->entries.end(); ++iter) {
152 add_pending(*iter);
153 }
154 source->entries.clear();
155 }
156};
157
158
159
160class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine {
161 friend class RGWCoroutinesStack;
162
163 struct StatusItem {
164 utime_t timestamp;
165 string status;
166
167 StatusItem(utime_t& t, const string& s) : timestamp(t), status(s) {}
168
169 void dump(Formatter *f) const;
170 };
171
172#define MAX_COROUTINE_HISTORY 10
173
174 struct Status {
175 CephContext *cct;
176 RWLock lock;
177 int max_history;
178
179 utime_t timestamp;
180 stringstream status;
181
182 Status(CephContext *_cct) : cct(_cct), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY) {}
183
184 deque<StatusItem> history;
185
186 stringstream& set_status();
187 } status;
188
189 stringstream description;
190
191protected:
192 bool _yield_ret;
193 boost::asio::coroutine drain_cr;
194
195 CephContext *cct;
196
197 RGWCoroutinesStack *stack;
198 int retcode;
199 int state;
200
201 rgw_spawned_stacks spawned;
202
203 stringstream error_stream;
204
205 int set_state(int s, int ret = 0) {
206 state = s;
207 return ret;
208 }
209 int set_cr_error(int ret) {
210 state = RGWCoroutine_Error;
211 return ret;
212 }
213 int set_cr_done() {
214 state = RGWCoroutine_Done;
215 return 0;
216 }
217 void set_io_blocked(bool flag);
218 int io_block(int ret = 0);
219
220 void reset_description() {
221 description.str(string());
222 }
223
224 stringstream& set_description() {
225 return description;
226 }
227 stringstream& set_status() {
228 return status.set_status();
229 }
230
231 stringstream& set_status(const string& s) {
232 stringstream& status = set_status();
233 status << s;
234 return status;
235 }
236
237public:
238 RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {}
239 ~RGWCoroutine() override;
240
241 virtual int operate() = 0;
242
243 bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); }
244 bool is_error() { return (state == RGWCoroutine_Error); }
245
246 stringstream& log_error() { return error_stream; }
247 string error_str() {
248 return error_stream.str();
249 }
250
251 void set_retcode(int r) {
252 retcode = r;
253 }
254
255 int get_ret_status() {
256 return retcode;
257 }
258
259 void call(RGWCoroutine *op); /* call at the same stack we're in */
260 RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */
261 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
262 bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */
263
264 int wait(const utime_t& interval);
265 bool drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack = NULL); /* returns true if needed to be called again */
266 void wakeup();
267 void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */
268
269 size_t num_spawned() {
270 return spawned.entries.size();
271 }
272
273 void wait_for_child();
274
275 virtual string to_str() const;
276
277 RGWCoroutinesStack *get_stack() const {
278 return stack;
279 }
280
281 void dump(Formatter *f) const;
282};
283
284ostream& operator<<(ostream& out, const RGWCoroutine& cr);
285
286#define yield_until_true(x) \
287do { \
288 do { \
289 yield _yield_ret = x; \
290 } while (!_yield_ret); \
291 _yield_ret = false; \
292} while (0)
293
294#define drain_all() \
295 drain_cr = boost::asio::coroutine(); \
296 yield_until_true(drain_children(0))
297
298#define drain_all_but(n) \
299 drain_cr = boost::asio::coroutine(); \
300 yield_until_true(drain_children(n))
301
302#define drain_all_but_stack(stack) \
303 drain_cr = boost::asio::coroutine(); \
304 yield_until_true(drain_children(1, stack))
305
306template <class T>
307class RGWConsumerCR : public RGWCoroutine {
308 list<T> product;
309
310public:
311 RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {}
312
313 bool has_product() {
314 return !product.empty();
315 }
316
317 void wait_for_product() {
318 if (!has_product()) {
319 set_sleeping(true);
320 }
321 }
322
323 bool consume(T *p) {
324 if (product.empty()) {
325 return false;
326 }
327 *p = product.front();
328 product.pop_front();
329 return true;
330 }
331
332 void receive(const T& p, bool wakeup = true);
333 void receive(list<T>& l, bool wakeup = true);
334};
335
336class RGWCoroutinesStack : public RefCountedObject {
337 friend class RGWCoroutine;
338 friend class RGWCoroutinesManager;
339
340 CephContext *cct;
341
342 RGWCoroutinesManager *ops_mgr;
343
344 list<RGWCoroutine *> ops;
345 list<RGWCoroutine *>::iterator pos;
346
347 rgw_spawned_stacks spawned;
348
349 set<RGWCoroutinesStack *> blocked_by_stack;
350 set<RGWCoroutinesStack *> blocking_stacks;
351
352 bool done_flag;
353 bool error_flag;
354 bool blocked_flag;
355 bool sleep_flag;
356 bool interval_wait_flag;
357
358 bool is_scheduled;
359
360 bool is_waiting_for_child;
361
362 int retcode;
363
364 uint64_t run_count;
365
366protected:
367 RGWCoroutinesEnv *env;
368 RGWCoroutinesStack *parent;
369
370 RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait);
371 bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
372 bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */
373public:
374 RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
375 ~RGWCoroutinesStack() override;
376
377 int operate(RGWCoroutinesEnv *env);
378
379 bool is_done() {
380 return done_flag;
381 }
382 bool is_error() {
383 return error_flag;
384 }
385 bool is_blocked_by_stack() {
386 return !blocked_by_stack.empty();
387 }
388 void set_io_blocked(bool flag) {
389 blocked_flag = flag;
390 }
391 bool is_io_blocked() {
392 return blocked_flag;
393 }
394 void set_interval_wait(bool flag) {
395 interval_wait_flag = flag;
396 }
397 bool is_interval_waiting() {
398 return interval_wait_flag;
399 }
400 void set_sleeping(bool flag) {
401 bool wakeup = sleep_flag & !flag;
402 sleep_flag = flag;
403 if (wakeup) {
404 schedule();
405 }
406 }
407 bool is_sleeping() {
408 return sleep_flag;
409 }
410 void set_is_scheduled(bool flag) {
411 is_scheduled = flag;
412 }
413
414 bool is_blocked() {
415 return is_blocked_by_stack() || is_sleeping() ||
416 is_io_blocked() || waiting_for_child() ;
417 }
418
419 void schedule(list<RGWCoroutinesStack *> *stacks = NULL) {
420 if (!stacks) {
421 stacks = env->scheduled_stacks;
422 }
423 if (!is_scheduled) {
424 stacks->push_back(this);
425 is_scheduled = true;
426 }
427 }
428
429 int get_ret_status() {
430 return retcode;
431 }
432
433 string error_str();
434
435 void call(RGWCoroutine *next_op);
436 RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait);
437 int unwind(int retcode);
438
439 int wait(const utime_t& interval);
440 void wakeup();
441
442 bool collect(int *ret, RGWCoroutinesStack *skip_stack); /* returns true if needs to be called again */
443
444 RGWAioCompletionNotifier *create_completion_notifier();
445 RGWCompletionManager *get_completion_mgr();
446
447 void set_blocked_by(RGWCoroutinesStack *s) {
448 blocked_by_stack.insert(s);
449 s->blocking_stacks.insert(this);
450 }
451
452 void set_wait_for_child(bool flag) {
453 is_waiting_for_child = flag;
454 }
455
456 bool waiting_for_child() {
457 return is_waiting_for_child;
458 }
459
460 bool unblock_stack(RGWCoroutinesStack **s);
461
462 RGWCoroutinesEnv *get_env() { return env; }
463
464 void dump(Formatter *f) const;
465};
466
467template <class T>
468void RGWConsumerCR<T>::receive(list<T>& l, bool wakeup)
469{
470 product.splice(product.end(), l);
471 if (wakeup) {
472 set_sleeping(false);
473 }
474}
475
476
477template <class T>
478void RGWConsumerCR<T>::receive(const T& p, bool wakeup)
479{
480 product.push_back(p);
481 if (wakeup) {
482 set_sleeping(false);
483 }
484}
485
486class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook {
487 CephContext *cct;
488
489 set<RGWCoroutinesManager *> managers;
490 RWLock lock;
491
492 string admin_command;
493
494public:
495 RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct), lock("RGWCoroutinesRegistry::lock") {}
496 ~RGWCoroutinesManagerRegistry() override;
497
498 void add(RGWCoroutinesManager *mgr);
499 void remove(RGWCoroutinesManager *mgr);
500
501 int hook_to_admin_command(const string& command);
502 bool call(std::string command, cmdmap_t& cmdmap, std::string format,
503 bufferlist& out) override;
504
505 void dump(Formatter *f) const;
506};
507
508class RGWCoroutinesManager {
509 CephContext *cct;
510 std::atomic<bool> going_down = { false };
511
512 std::atomic<int64_t> run_context_count = { 0 };
513 map<uint64_t, set<RGWCoroutinesStack *> > run_contexts;
514
515 RWLock lock;
516
517 void handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *waiting_count);
518protected:
519 RGWCompletionManager *completion_mgr;
520 RGWCoroutinesManagerRegistry *cr_registry;
521
522 int ops_window;
523
524 string id;
525
526 void put_completion_notifier(RGWAioCompletionNotifier *cn);
527public:
528 RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), lock("RGWCoroutinesManager::lock"),
529 cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {
530 completion_mgr = new RGWCompletionManager(cct);
531 if (cr_registry) {
532 cr_registry->add(this);
533 }
534 }
535 virtual ~RGWCoroutinesManager() {
536 stop();
537 completion_mgr->put();
538 if (cr_registry) {
539 cr_registry->remove(this);
540 }
541 }
542
543 int run(list<RGWCoroutinesStack *>& ops);
544 int run(RGWCoroutine *op);
545 void stop() {
546 bool expected = false;
547 if (going_down.compare_exchange_strong(expected, true)) {
548 completion_mgr->go_down();
549 }
550 }
551
552 virtual void report_error(RGWCoroutinesStack *op);
553
554 RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
555 RGWCompletionManager *get_completion_mgr() { return completion_mgr; }
556
557 void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack);
558 RGWCoroutinesStack *allocate_stack();
559
560 virtual string get_id();
561 void dump(Formatter *f) const;
562};
563
564class RGWSimpleCoroutine : public RGWCoroutine {
565 bool called_cleanup;
566
567 int operate() override;
568
569 int state_init();
570 int state_send_request();
571 int state_request_complete();
572 int state_all_complete();
573
574 void call_cleanup();
575
576public:
577 RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {}
578 ~RGWSimpleCoroutine() override;
579
580 virtual int init() { return 0; }
581 virtual int send_request() = 0;
582 virtual int request_complete() = 0;
583 virtual int finish() { return 0; }
584 virtual void request_cleanup() {}
585};
586
587#endif