]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_coroutine.cc
update sources to 12.2.7
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
CommitLineData
7c673cae
FG
1
2#include "common/ceph_json.h"
7c673cae 3#include "rgw_coroutine.h"
7c673cae
FG
4
5// re-include our assert to clobber the system one; fix dout:
6#include "include/assert.h"
7
31f18b77
FG
8#include <boost/asio/yield.hpp>
9
7c673cae
FG
10#define dout_subsys ceph_subsys_rgw
11
12
13class RGWCompletionManager::WaitContext : public Context {
14 RGWCompletionManager *manager;
15 void *opaque;
16public:
17 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
18 void finish(int r) override {
19 manager->_wakeup(opaque);
20 }
21};
22
23RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
24 timer(cct, lock)
25{
26 timer.init();
27}
28
29RGWCompletionManager::~RGWCompletionManager()
30{
31 Mutex::Locker l(lock);
32 timer.cancel_all_events();
33 timer.shutdown();
34}
35
36void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info)
37{
38 Mutex::Locker l(lock);
39 _complete(cn, user_info);
40}
41
42void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
43{
44 Mutex::Locker l(lock);
45 if (cn) {
46 cns.insert(cn);
47 }
48}
49
50void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
51{
52 Mutex::Locker l(lock);
53 if (cn) {
54 cns.erase(cn);
55 }
56}
57
58void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
59{
60 if (cn) {
61 cns.erase(cn);
62 }
63 complete_reqs.push_back(user_info);
64 cond.Signal();
65}
66
67int RGWCompletionManager::get_next(void **user_info)
68{
69 Mutex::Locker l(lock);
70 while (complete_reqs.empty()) {
71 cond.Wait(lock);
72 if (going_down) {
73 return -ECANCELED;
74 }
75 }
76 *user_info = complete_reqs.front();
77 complete_reqs.pop_front();
78 return 0;
79}
80
81bool RGWCompletionManager::try_get_next(void **user_info)
82{
83 Mutex::Locker l(lock);
84 if (complete_reqs.empty()) {
85 return false;
86 }
87 *user_info = complete_reqs.front();
88 complete_reqs.pop_front();
89 return true;
90}
91
92void RGWCompletionManager::go_down()
93{
94 Mutex::Locker l(lock);
95 for (auto cn : cns) {
96 cn->unregister();
97 }
98 going_down = true;
99 cond.Signal();
100}
101
102void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
103{
104 Mutex::Locker l(lock);
105 assert(waiters.find(opaque) == waiters.end());
106 waiters[opaque] = user_info;
107 timer.add_event_after(interval, new WaitContext(this, opaque));
108}
109
110void RGWCompletionManager::wakeup(void *opaque)
111{
112 Mutex::Locker l(lock);
113 _wakeup(opaque);
114}
115
116void RGWCompletionManager::_wakeup(void *opaque)
117{
118 map<void *, void *>::iterator iter = waiters.find(opaque);
119 if (iter != waiters.end()) {
120 void *user_id = iter->second;
121 waiters.erase(iter);
122 _complete(NULL, user_id);
123 }
124}
125
126RGWCoroutine::~RGWCoroutine() {
127 for (auto stack : spawned.entries) {
128 stack->put();
129 }
130}
131
132void RGWCoroutine::set_io_blocked(bool flag) {
133 stack->set_io_blocked(flag);
134}
135
136void RGWCoroutine::set_sleeping(bool flag) {
137 stack->set_sleeping(flag);
138}
139
140int RGWCoroutine::io_block(int ret) {
141 set_io_blocked(true);
142 return ret;
143}
144
145void RGWCoroutine::StatusItem::dump(Formatter *f) const {
146 ::encode_json("timestamp", timestamp, f);
147 ::encode_json("status", status, f);
148}
149
150stringstream& RGWCoroutine::Status::set_status()
151{
152 RWLock::WLocker l(lock);
153 string s = status.str();
154 status.str(string());
155 if (!timestamp.is_zero()) {
156 history.push_back(StatusItem(timestamp, s));
157 }
158 if (history.size() > (size_t)max_history) {
159 history.pop_front();
160 }
161 timestamp = ceph_clock_now();
162
163 return status;
164}
165
166RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
167 done_flag(false), error_flag(false), blocked_flag(false),
168 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
169 retcode(0), run_count(0),
170 env(NULL), parent(NULL)
171{
172 if (start) {
173 ops.push_back(start);
174 }
175 pos = ops.begin();
176}
177
178RGWCoroutinesStack::~RGWCoroutinesStack()
179{
180 for (auto op : ops) {
181 op->put();
182 }
183
184 for (auto stack : spawned.entries) {
185 stack->put();
186 }
187}
188
189int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
190{
191 env = _env;
192 RGWCoroutine *op = *pos;
193 op->stack = this;
194 ldout(cct, 20) << *op << ": operate()" << dendl;
195 int r = op->operate();
196 if (r < 0) {
197 ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
198 }
199
200 error_flag = op->is_error();
201
202 if (op->is_done()) {
203 int op_retcode = r;
204 r = unwind(op_retcode);
205 op->put();
206 done_flag = (pos == ops.end());
207 if (done_flag) {
208 retcode = op_retcode;
209 }
210 return r;
211 }
212
213 /* should r ever be negative at this point? */
214 assert(r >= 0);
215
216 return 0;
217}
218
219string RGWCoroutinesStack::error_str()
220{
221 if (pos != ops.end()) {
222 return (*pos)->error_str();
223 }
224 return string();
225}
226
227void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
228 if (!next_op) {
229 return;
230 }
231 ops.push_back(next_op);
232 if (pos != ops.end()) {
233 ++pos;
234 } else {
235 pos = ops.begin();
236 }
237}
238
239RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
240{
241 if (!op) {
242 return NULL;
243 }
244
245 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
246
247 RGWCoroutinesStack *stack = env->manager->allocate_stack();
248 s->add_pending(stack);
249 stack->parent = this;
250
251 stack->get(); /* we'll need to collect the stack */
252 stack->call(op);
253
254 env->manager->schedule(env, stack);
255
256 if (wait) {
257 set_blocked_by(stack);
258 }
259
260 return stack;
261}
262
263RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
264{
265 return spawn(NULL, op, wait);
266}
267
268int RGWCoroutinesStack::wait(const utime_t& interval)
269{
270 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
271 completion_mgr->wait_interval((void *)this, interval, (void *)this);
272 set_io_blocked(true);
273 set_interval_wait(true);
274 return 0;
275}
276
277void RGWCoroutinesStack::wakeup()
278{
279 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
280 completion_mgr->wakeup((void *)this);
281}
282
283int RGWCoroutinesStack::unwind(int retcode)
284{
285 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
286
287 if (pos == ops.begin()) {
288 spawned.inherit(src_spawned);
289 ops.clear();
290 pos = ops.end();
291 return retcode;
292 }
293
294 --pos;
295 ops.pop_back();
296 RGWCoroutine *op = *pos;
297 op->set_retcode(retcode);
298 op->spawned.inherit(src_spawned);
299 return 0;
300}
301
302
303bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
304{
305 bool done = true;
306 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
307 *ret = 0;
308 vector<RGWCoroutinesStack *> new_list;
309
310 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
311 RGWCoroutinesStack *stack = *iter;
312 if (stack == skip_stack || !stack->is_done()) {
313 new_list.push_back(stack);
314 if (!stack->is_done()) {
315 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
316 } else if (stack == skip_stack) {
317 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
318 }
319 continue;
320 }
321 int r = stack->get_ret_status();
322 stack->put();
323 if (r < 0) {
324 *ret = r;
325 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
326 new_list.insert(new_list.end(), ++iter, s->entries.end());
327 done &= (iter != s->entries.end());
328 break;
329 }
330
331 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
332 }
333
334 s->entries.swap(new_list);
335 return (!done);
336}
337
338bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
339{
340 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
341 *ret = 0;
342
343 if (collected_stack) {
344 *collected_stack = NULL;
345 }
346
347 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
348 RGWCoroutinesStack *stack = *iter;
349 if (!stack->is_done()) {
350 continue;
351 }
352 int r = stack->get_ret_status();
353 if (r < 0) {
354 *ret = r;
355 }
356
357 if (collected_stack) {
358 *collected_stack = stack;
359 }
360 stack->put();
361
362 s->entries.erase(iter);
363 return true;
364 }
365
366 return false;
367}
368
369bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
370{
371 return collect(NULL, ret, skip_stack);
372}
373
374static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
375{
376 ((RGWAioCompletionNotifier *)arg)->cb();
377}
378
379RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
380 user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
381 c = librados::Rados::aio_create_completion((void *)this, NULL,
382 _aio_completion_notifier_cb);
383}
384
385RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
386{
387 return ops_mgr->create_completion_notifier(this);
388}
389
390RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
391{
392 return ops_mgr->get_completion_mgr();
393}
394
395bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
396{
397 if (blocking_stacks.empty()) {
398 return false;
399 }
400
401 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
402 *s = *iter;
403 blocking_stacks.erase(iter);
404 (*s)->blocked_by_stack.erase(this);
405
406 return true;
407}
408
409void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
410{
411 if (!op) {
412 return;
413 }
414 string err = op->error_str();
415 if (err.empty()) {
416 return;
417 }
418 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
419}
420
421void RGWCoroutinesStack::dump(Formatter *f) const {
422 stringstream ss;
423 ss << (void *)this;
424 ::encode_json("stack", ss.str(), f);
425 ::encode_json("run_count", run_count, f);
426 f->open_array_section("ops");
427 for (auto& i : ops) {
428 encode_json("op", *i, f);
429 }
430 f->close_section();
431}
432
433void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count)
434{
435 RWLock::WLocker wl(lock);
436 --(*blocked_count);
437 stack->set_io_blocked(false);
438 stack->set_interval_wait(false);
439 if (!stack->is_done()) {
440 scheduled_stacks.push_back(stack);
441 } else {
442 RWLock::WLocker wl(lock);
443 context_stacks.erase(stack);
444 stack->put();
445 }
446}
447
448void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
449{
450 assert(lock.is_wlocked());
451 env->scheduled_stacks->push_back(stack);
452 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
453 context_stacks.insert(stack);
454}
455
456int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
457{
458 int ret = 0;
459 int blocked_count = 0;
460 int interval_wait_count = 0;
461 bool canceled = false; // set on going_down
462 RGWCoroutinesEnv env;
463
464 uint64_t run_context = ++run_context_count;
465
466 lock.get_write();
467 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
468 list<RGWCoroutinesStack *> scheduled_stacks;
469 for (auto& st : stacks) {
470 context_stacks.insert(st);
471 scheduled_stacks.push_back(st);
472 }
473 lock.unlock();
474
475 env.run_context = run_context;
476 env.manager = this;
477 env.scheduled_stacks = &scheduled_stacks;
478
479 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
480 lock.get_write();
481
482 RGWCoroutinesStack *stack = *iter;
483 env.stack = stack;
484
485 ret = stack->operate(&env);
486 stack->set_is_scheduled(false);
487 if (ret < 0) {
488 ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
489 }
490
491 if (stack->is_error()) {
492 report_error(stack);
493 }
494
495 bool op_not_blocked = false;
496
497 if (stack->is_io_blocked()) {
498 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
499 if (stack->is_interval_waiting()) {
500 interval_wait_count++;
501 }
502 blocked_count++;
503 } else if (stack->is_blocked()) {
504 /* do nothing, we'll re-add the stack when the blocking stack is done,
505 * or when we're awaken
506 */
507 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
508 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
509 } else if (stack->is_done()) {
510 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
511 RGWCoroutinesStack *s;
512 while (stack->unblock_stack(&s)) {
513 if (!s->is_blocked_by_stack() && !s->is_done()) {
514 if (s->is_io_blocked()) {
515 if (stack->is_interval_waiting()) {
516 interval_wait_count++;
517 }
518 blocked_count++;
519 } else {
520 s->schedule();
521 }
522 }
523 }
524 if (stack->parent && stack->parent->waiting_for_child()) {
525 stack->parent->set_wait_for_child(false);
526 stack->parent->schedule();
527 }
528 context_stacks.erase(stack);
529 stack->put();
530 stack = NULL;
531 } else {
532 op_not_blocked = true;
533 stack->run_count++;
534 stack->schedule();
535 }
536
537 if (!op_not_blocked && stack) {
538 stack->run_count = 0;
539 }
540
541 lock.unlock();
542
543 RGWCoroutinesStack *blocked_stack;
544 while (completion_mgr->try_get_next((void **)&blocked_stack)) {
545 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
546 }
547
548 /*
549 * only account blocked operations that are not in interval_wait, these are stacks that
550 * were put on a wait without any real IO operations. While we mark these as io_blocked,
551 * these aren't really waiting for IOs
552 */
553 while (blocked_count - interval_wait_count >= ops_window) {
554 ret = completion_mgr->get_next((void **)&blocked_stack);
555 if (ret < 0) {
28e407b8 556 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae
FG
557 }
558 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
559 }
560
561 ++iter;
562 scheduled_stacks.pop_front();
563
564
565 while (scheduled_stacks.empty() && blocked_count > 0) {
566 ret = completion_mgr->get_next((void **)&blocked_stack);
567 if (ret < 0) {
28e407b8 568 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae
FG
569 }
570 if (going_down) {
571 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
572 ret = -ECANCELED;
573 canceled = true;
574 break;
575 }
576 handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
577 iter = scheduled_stacks.begin();
578 }
579 if (canceled) {
580 break;
581 }
582
583 if (iter == scheduled_stacks.end()) {
584 iter = scheduled_stacks.begin();
585 }
586 }
587
588 lock.get_write();
589 if (!context_stacks.empty() && !going_down) {
590 JSONFormatter formatter(true);
591 formatter.open_array_section("context_stacks");
592 for (auto& s : context_stacks) {
593 ::encode_json("entry", *s, &formatter);
594 }
595 formatter.close_section();
596 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
597 formatter.flush(*_dout);
598 *_dout << dendl;
599 assert(context_stacks.empty() || going_down); // assert on deadlock
600 }
601
602 for (auto stack : context_stacks) {
603 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
604 stack->put();
605 }
606 run_contexts.erase(run_context);
607 lock.unlock();
608
609 return ret;
610}
611
612int RGWCoroutinesManager::run(RGWCoroutine *op)
613{
614 if (!op) {
615 return 0;
616 }
617 list<RGWCoroutinesStack *> stacks;
618 RGWCoroutinesStack *stack = allocate_stack();
619 op->get();
620 stack->call(op);
621
622 stack->schedule(&stacks);
623
624 int r = run(stacks);
625 if (r < 0) {
626 ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
627 } else {
628 r = op->get_ret_status();
629 }
630 op->put();
631
632 return r;
633}
634
635RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
636{
637 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, (void *)stack);
638 completion_mgr->register_completion_notifier(cn);
639 return cn;
640}
641
642void RGWCoroutinesManager::dump(Formatter *f) const {
643 RWLock::RLocker rl(lock);
644
645 f->open_array_section("run_contexts");
646 for (auto& i : run_contexts) {
647 f->open_object_section("context");
648 ::encode_json("id", i.first, f);
649 f->open_array_section("entries");
650 for (auto& s : i.second) {
651 ::encode_json("entry", *s, f);
652 }
653 f->close_section();
654 f->close_section();
655 }
656 f->close_section();
657}
658
659RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
660 return new RGWCoroutinesStack(cct, this);
661}
662
663string RGWCoroutinesManager::get_id()
664{
665 if (!id.empty()) {
666 return id;
667 }
668 stringstream ss;
669 ss << (void *)this;
670 return ss.str();
671}
672
673void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
674{
675 RWLock::WLocker wl(lock);
676 if (managers.find(mgr) == managers.end()) {
677 managers.insert(mgr);
678 get();
679 }
680}
681
682void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
683{
684 RWLock::WLocker wl(lock);
685 if (managers.find(mgr) != managers.end()) {
686 managers.erase(mgr);
687 put();
688 }
689}
690
691RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
692{
693 AdminSocket *admin_socket = cct->get_admin_socket();
694 if (!admin_command.empty()) {
695 admin_socket->unregister_command(admin_command);
696 }
697}
698
699int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
700{
701 AdminSocket *admin_socket = cct->get_admin_socket();
702 if (!admin_command.empty()) {
703 admin_socket->unregister_command(admin_command);
704 }
705 admin_command = command;
706 int r = admin_socket->register_command(admin_command, admin_command, this,
707 "dump current coroutines stack state");
708 if (r < 0) {
709 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
710 return r;
711 }
712 return 0;
713}
714
715bool RGWCoroutinesManagerRegistry::call(std::string command, cmdmap_t& cmdmap, std::string format,
716 bufferlist& out) {
717 RWLock::RLocker rl(lock);
718 stringstream ss;
719 JSONFormatter f;
720 ::encode_json("cr_managers", *this, &f);
721 f.flush(ss);
722 out.append(ss);
723 return true;
724}
725
726void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
727 f->open_array_section("coroutine_managers");
728 for (auto m : managers) {
729 ::encode_json("entry", *m, f);
730 }
731 f->close_section();
732}
733
734void RGWCoroutine::call(RGWCoroutine *op)
735{
736 stack->call(op);
737}
738
739RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
740{
741 return stack->spawn(this, op, wait);
742}
743
744bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
745{
746 return stack->collect(this, ret, skip_stack);
747}
748
749bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
750{
751 return stack->collect_next(this, ret, collected_stack);
752}
753
754int RGWCoroutine::wait(const utime_t& interval)
755{
756 return stack->wait(interval);
757}
758
759void RGWCoroutine::wait_for_child()
760{
761 /* should only wait for child if there is a child that is not done yet, and no complete children */
762 if (spawned.entries.empty()) {
763 return;
764 }
765 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
766 if ((*iter)->is_done()) {
767 return;
768 }
769 }
770 stack->set_wait_for_child(true);
771}
772
773string RGWCoroutine::to_str() const
774{
775 return typeid(*this).name();
776}
777
778ostream& operator<<(ostream& out, const RGWCoroutine& cr)
779{
780 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
781 return out;
782}
783
784bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
785{
786 bool done = false;
787 assert(num_cr_left >= 0);
788 if (num_cr_left == 0 && skip_stack) {
789 num_cr_left = 1;
790 }
791 reenter(&drain_cr) {
792 while (num_spawned() > (size_t)num_cr_left) {
793 yield wait_for_child();
794 int ret;
795 while (collect(&ret, skip_stack)) {
796 if (ret < 0) {
797 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
798 /* we should have reported this error */
799 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
800 }
801 }
802 }
803 done = true;
804 }
805 return done;
806}
807
808void RGWCoroutine::wakeup()
809{
810 stack->wakeup();
811}
812
813void RGWCoroutine::dump(Formatter *f) const {
814 if (!description.str().empty()) {
815 encode_json("description", description.str(), f);
816 }
817 encode_json("type", to_str(), f);
818 if (!spawned.entries.empty()) {
819 f->open_array_section("spawned");
820 for (auto& i : spawned.entries) {
821 char buf[32];
822 snprintf(buf, sizeof(buf), "%p", (void *)i);
823 encode_json("stack", string(buf), f);
824 }
825 f->close_section();
826 }
827 if (!status.history.empty()) {
828 encode_json("history", status.history, f);
829 }
830
831 if (!status.status.str().empty()) {
832 f->open_object_section("status");
833 encode_json("status", status.status.str(), f);
834 encode_json("timestamp", status.timestamp, f);
835 f->close_section();
836 }
837}
838
839RGWSimpleCoroutine::~RGWSimpleCoroutine()
840{
841 if (!called_cleanup) {
842 request_cleanup();
843 }
844}
845
846void RGWSimpleCoroutine::call_cleanup()
847{
848 called_cleanup = true;
849 request_cleanup();
850}
851
852int RGWSimpleCoroutine::operate()
853{
854 int ret = 0;
855 reenter(this) {
856 yield return state_init();
857 yield return state_send_request();
858 yield return state_request_complete();
859 yield return state_all_complete();
860 drain_all();
861 call_cleanup();
862 return set_state(RGWCoroutine_Done, ret);
863 }
864 return 0;
865}
866
867int RGWSimpleCoroutine::state_init()
868{
869 int ret = init();
870 if (ret < 0) {
871 call_cleanup();
872 return set_state(RGWCoroutine_Error, ret);
873 }
874 return 0;
875}
876
877int RGWSimpleCoroutine::state_send_request()
878{
879 int ret = send_request();
880 if (ret < 0) {
881 call_cleanup();
882 return set_state(RGWCoroutine_Error, ret);
883 }
884 return io_block(0);
885}
886
887int RGWSimpleCoroutine::state_request_complete()
888{
889 int ret = request_complete();
890 if (ret < 0) {
891 call_cleanup();
892 return set_state(RGWCoroutine_Error, ret);
893 }
894 return 0;
895}
896
897int RGWSimpleCoroutine::state_all_complete()
898{
899 int ret = finish();
900 if (ret < 0) {
901 call_cleanup();
902 return set_state(RGWCoroutine_Error, ret);
903 }
904 return 0;
905}
906
907