]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.cc
import ceph octopus 15.2.17
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/Context.h"
5 #include "common/ceph_json.h"
6 #include "rgw_coroutine.h"
7
8 // re-include our assert to clobber the system one; fix dout:
9 #include "include/ceph_assert.h"
10
11 #include <boost/asio/yield.hpp>
12
13 #define dout_subsys ceph_subsys_rgw
14 #define dout_context g_ceph_context
15
16
17 class RGWCompletionManager::WaitContext : public Context {
18 RGWCompletionManager *manager;
19 void *opaque;
20 public:
21 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
22 void finish(int r) override {
23 manager->_wakeup(opaque);
24 }
25 };
26
27 RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
28 timer(cct, lock)
29 {
30 timer.init();
31 }
32
33 RGWCompletionManager::~RGWCompletionManager()
34 {
35 std::lock_guard l{lock};
36 timer.cancel_all_events();
37 timer.shutdown();
38 }
39
40 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
41 {
42 std::lock_guard l{lock};
43 _complete(cn, io_id, user_info);
44 }
45
46 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
47 {
48 std::lock_guard l{lock};
49 if (cn) {
50 cns.insert(cn);
51 }
52 }
53
54 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
55 {
56 std::lock_guard l{lock};
57 if (cn) {
58 cns.erase(cn);
59 }
60 }
61
62 void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
63 {
64 if (cn) {
65 cns.erase(cn);
66 }
67
68 if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
69 /* already have completion for this io_id, don't allow multiple completions for it */
70 return;
71 }
72 complete_reqs.push_back(io_completion{io_id, user_info});
73 cond.notify_all();
74 }
75
76 int RGWCompletionManager::get_next(io_completion *io)
77 {
78 std::unique_lock l{lock};
79 while (complete_reqs.empty()) {
80 if (going_down) {
81 return -ECANCELED;
82 }
83 cond.wait(l);
84 }
85 *io = complete_reqs.front();
86 complete_reqs_set.erase(io->io_id);
87 complete_reqs.pop_front();
88 return 0;
89 }
90
91 bool RGWCompletionManager::try_get_next(io_completion *io)
92 {
93 std::lock_guard l{lock};
94 if (complete_reqs.empty()) {
95 return false;
96 }
97 *io = complete_reqs.front();
98 complete_reqs_set.erase(io->io_id);
99 complete_reqs.pop_front();
100 return true;
101 }
102
103 void RGWCompletionManager::go_down()
104 {
105 std::lock_guard l{lock};
106 for (auto cn : cns) {
107 cn->unregister();
108 }
109 going_down = true;
110 cond.notify_all();
111 }
112
113 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
114 {
115 std::lock_guard l{lock};
116 ceph_assert(waiters.find(opaque) == waiters.end());
117 waiters[opaque] = user_info;
118 timer.add_event_after(interval, new WaitContext(this, opaque));
119 }
120
121 void RGWCompletionManager::wakeup(void *opaque)
122 {
123 std::lock_guard l{lock};
124 _wakeup(opaque);
125 }
126
127 void RGWCompletionManager::_wakeup(void *opaque)
128 {
129 map<void *, void *>::iterator iter = waiters.find(opaque);
130 if (iter != waiters.end()) {
131 void *user_id = iter->second;
132 waiters.erase(iter);
133 _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
134 }
135 }
136
137 RGWCoroutine::~RGWCoroutine() {
138 for (auto stack : spawned.entries) {
139 stack->put();
140 }
141 }
142
143 void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
144 {
145 ceph_assert(stack); // if there's no stack, io_provider won't be uninitialized
146 stack->init_new_io(io_provider);
147 }
148
149 void RGWCoroutine::set_io_blocked(bool flag) {
150 if (stack) {
151 stack->set_io_blocked(flag);
152 }
153 }
154
155 void RGWCoroutine::set_sleeping(bool flag) {
156 if (stack) {
157 stack->set_sleeping(flag);
158 }
159 }
160
161 int RGWCoroutine::io_block(int ret, int64_t io_id) {
162 return io_block(ret, rgw_io_id{io_id, -1});
163 }
164
165 int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
166 if (!stack) {
167 return 0;
168 }
169 if (stack->consume_io_finish(io_id)) {
170 return 0;
171 }
172 set_io_blocked(true);
173 stack->set_io_blocked_id(io_id);
174 return ret;
175 }
176
177 void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
178 if (stack) {
179 stack->io_complete(io_id);
180 }
181 }
182
183 void RGWCoroutine::StatusItem::dump(Formatter *f) const {
184 ::encode_json("timestamp", timestamp, f);
185 ::encode_json("status", status, f);
186 }
187
188 stringstream& RGWCoroutine::Status::set_status()
189 {
190 std::unique_lock l{lock};
191 string s = status.str();
192 status.str(string());
193 if (!timestamp.is_zero()) {
194 history.push_back(StatusItem(timestamp, s));
195 }
196 if (history.size() > (size_t)max_history) {
197 history.pop_front();
198 }
199 timestamp = ceph_clock_now();
200
201 return status;
202 }
203
204 int64_t RGWCoroutinesManager::get_next_io_id()
205 {
206 return (int64_t)++max_io_id;
207 }
208
209 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
210 done_flag(false), error_flag(false), blocked_flag(false),
211 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
212 retcode(0), run_count(0),
213 env(NULL), parent(NULL)
214 {
215 if (start) {
216 ops.push_back(start);
217 }
218 pos = ops.begin();
219 }
220
221 RGWCoroutinesStack::~RGWCoroutinesStack()
222 {
223 for (auto op : ops) {
224 op->put();
225 }
226
227 for (auto stack : spawned.entries) {
228 stack->put();
229 }
230 }
231
232 int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
233 {
234 env = _env;
235 RGWCoroutine *op = *pos;
236 op->stack = this;
237 ldout(cct, 20) << *op << ": operate()" << dendl;
238 int r = op->operate_wrapper();
239 if (r < 0) {
240 ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
241 }
242
243 error_flag = op->is_error();
244
245 if (op->is_done()) {
246 int op_retcode = r;
247 r = unwind(op_retcode);
248 op->put();
249 done_flag = (pos == ops.end());
250 blocked_flag &= !done_flag;
251 if (done_flag) {
252 retcode = op_retcode;
253 }
254 return r;
255 }
256
257 /* should r ever be negative at this point? */
258 ceph_assert(r >= 0);
259
260 return 0;
261 }
262
263 string RGWCoroutinesStack::error_str()
264 {
265 if (pos != ops.end()) {
266 return (*pos)->error_str();
267 }
268 return string();
269 }
270
271 void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
272 if (!next_op) {
273 return;
274 }
275 ops.push_back(next_op);
276 if (pos != ops.end()) {
277 ++pos;
278 } else {
279 pos = ops.begin();
280 }
281 }
282
283 void RGWCoroutinesStack::schedule()
284 {
285 env->manager->schedule(env, this);
286 }
287
288 void RGWCoroutinesStack::_schedule()
289 {
290 env->manager->_schedule(env, this);
291 }
292
293 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
294 {
295 if (!op) {
296 return NULL;
297 }
298
299 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
300
301 RGWCoroutinesStack *stack = env->manager->allocate_stack();
302 s->add_pending(stack);
303 stack->parent = this;
304
305 stack->get(); /* we'll need to collect the stack */
306 stack->call(op);
307
308 env->manager->schedule(env, stack);
309
310 if (wait) {
311 set_blocked_by(stack);
312 }
313
314 return stack;
315 }
316
317 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
318 {
319 return spawn(NULL, op, wait);
320 }
321
322 int RGWCoroutinesStack::wait(const utime_t& interval)
323 {
324 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
325 completion_mgr->wait_interval((void *)this, interval, (void *)this);
326 set_io_blocked(true);
327 set_interval_wait(true);
328 return 0;
329 }
330
331 void RGWCoroutinesStack::wakeup()
332 {
333 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
334 completion_mgr->wakeup((void *)this);
335 }
336
337 void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
338 {
339 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
340 completion_mgr->complete(nullptr, io_id, (void *)this);
341 }
342
343 int RGWCoroutinesStack::unwind(int retcode)
344 {
345 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
346
347 if (pos == ops.begin()) {
348 ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
349 spawned.inherit(src_spawned);
350 ops.clear();
351 pos = ops.end();
352 return retcode;
353 }
354
355 --pos;
356 ops.pop_back();
357 RGWCoroutine *op = *pos;
358 op->set_retcode(retcode);
359 op->spawned.inherit(src_spawned);
360 return 0;
361 }
362
363 void RGWCoroutinesStack::cancel()
364 {
365 while (!ops.empty()) {
366 RGWCoroutine *op = *pos;
367 unwind(-ECANCELED);
368 op->put();
369 }
370 put();
371 }
372
373 bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
374 {
375 bool need_retry = false;
376 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
377 *ret = 0;
378 vector<RGWCoroutinesStack *> new_list;
379
380 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
381 RGWCoroutinesStack *stack = *iter;
382 if (stack == skip_stack || !stack->is_done()) {
383 new_list.push_back(stack);
384 if (!stack->is_done()) {
385 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
386 } else if (stack == skip_stack) {
387 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
388 }
389 continue;
390 }
391 int r = stack->get_ret_status();
392 stack->put();
393 if (r < 0) {
394 *ret = r;
395 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
396 new_list.insert(new_list.end(), ++iter, s->entries.end());
397 need_retry = (iter != s->entries.end());
398 break;
399 }
400
401 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
402 }
403
404 s->entries.swap(new_list);
405 return need_retry;
406 }
407
408 bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
409 {
410 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
411 *ret = 0;
412
413 if (collected_stack) {
414 *collected_stack = NULL;
415 }
416
417 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
418 RGWCoroutinesStack *stack = *iter;
419 if (!stack->is_done()) {
420 continue;
421 }
422 int r = stack->get_ret_status();
423 if (r < 0) {
424 *ret = r;
425 }
426
427 if (collected_stack) {
428 *collected_stack = stack;
429 }
430 stack->put();
431
432 s->entries.erase(iter);
433 return true;
434 }
435
436 return false;
437 }
438
439 bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
440 {
441 return collect(NULL, ret, skip_stack);
442 }
443
444 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
445 {
446 (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
447 }
448
449 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
450 io_id(_io_id),
451 user_data(_user_data), registered(true) {
452 c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
453 }
454
455 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
456 {
457 return ops_mgr->create_completion_notifier(this);
458 }
459
460 RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
461 {
462 return ops_mgr->get_completion_mgr();
463 }
464
465 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
466 {
467 if (blocking_stacks.empty()) {
468 return false;
469 }
470
471 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
472 *s = *iter;
473 blocking_stacks.erase(iter);
474 (*s)->blocked_by_stack.erase(this);
475
476 return true;
477 }
478
479 void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
480 {
481 if (!op) {
482 return;
483 }
484 string err = op->error_str();
485 if (err.empty()) {
486 return;
487 }
488 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
489 }
490
491 void RGWCoroutinesStack::dump(Formatter *f) const {
492 stringstream ss;
493 ss << (void *)this;
494 ::encode_json("stack", ss.str(), f);
495 ::encode_json("run_count", run_count, f);
496 f->open_array_section("ops");
497 for (auto& i : ops) {
498 encode_json("op", *i, f);
499 }
500 f->close_section();
501 }
502
503 void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
504 {
505 io_provider->set_io_user_info((void *)this);
506 io_provider->assign_io(env->manager->get_io_id_provider());
507 }
508
509 bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
510 {
511 if (!can_io_unblock(io_id)) {
512 auto p = io_finish_ids.emplace(io_id.id, io_id);
513 auto& iter = p.first;
514 bool inserted = p.second;
515 if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
516 iter->second.channels |= io_id.channels;
517 }
518 return false;
519 }
520
521 return true;
522 }
523
524 bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
525 {
526 auto iter = io_finish_ids.find(io_id.id);
527 if (iter == io_finish_ids.end()) {
528 return false;
529 }
530 int finish_mask = iter->second.channels;
531 bool found = (finish_mask & io_id.channels) != 0;
532
533 finish_mask &= ~(finish_mask & io_id.channels);
534
535 if (finish_mask == 0) {
536 io_finish_ids.erase(iter);
537 }
538 return found;
539 }
540
541
542 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
543 RGWCompletionManager::io_completion& io, int *blocked_count)
544 {
545 ceph_assert(ceph_mutex_is_wlocked(lock));
546 RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
547 if (context_stacks.find(stack) == context_stacks.end()) {
548 return;
549 }
550 if (!stack->try_io_unblock(io.io_id)) {
551 return;
552 }
553 if (stack->is_io_blocked()) {
554 --(*blocked_count);
555 stack->set_io_blocked(false);
556 }
557 stack->set_interval_wait(false);
558 if (!stack->is_done()) {
559 if (!stack->is_scheduled) {
560 scheduled_stacks.push_back(stack);
561 stack->set_is_scheduled(true);
562 }
563 } else {
564 context_stacks.erase(stack);
565 stack->put();
566 }
567 }
568
569 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
570 {
571 std::unique_lock wl{lock};
572 _schedule(env, stack);
573 }
574
575 void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
576 {
577 ceph_assert(ceph_mutex_is_wlocked(lock));
578 if (!stack->is_scheduled) {
579 env->scheduled_stacks->push_back(stack);
580 stack->set_is_scheduled(true);
581 }
582 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
583 context_stacks.insert(stack);
584 }
585
586 void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
587 {
588 cr->set_sleeping(flag);
589 }
590
591 void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
592 {
593 cr->io_complete(io_id);
594 }
595
596 int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
597 {
598 int ret = 0;
599 int blocked_count = 0;
600 int interval_wait_count = 0;
601 bool canceled = false; // set on going_down
602 RGWCoroutinesEnv env;
603 bool op_not_blocked;
604
605 uint64_t run_context = ++run_context_count;
606
607 lock.lock();
608 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
609 list<RGWCoroutinesStack *> scheduled_stacks;
610 for (auto& st : stacks) {
611 context_stacks.insert(st);
612 scheduled_stacks.push_back(st);
613 st->set_is_scheduled(true);
614 }
615 env.run_context = run_context;
616 env.manager = this;
617 env.scheduled_stacks = &scheduled_stacks;
618
619 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
620 RGWCompletionManager::io_completion io;
621 RGWCoroutinesStack *stack = *iter;
622 ++iter;
623 scheduled_stacks.pop_front();
624
625 if (context_stacks.find(stack) == context_stacks.end()) {
626 /* stack was probably schedule more than once due to IO, but was since complete */
627 goto next;
628 }
629 env.stack = stack;
630
631 lock.unlock();
632
633 ret = stack->operate(&env);
634
635 lock.lock();
636
637 stack->set_is_scheduled(false);
638 if (ret < 0) {
639 ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
640 }
641
642 if (stack->is_error()) {
643 report_error(stack);
644 }
645
646 op_not_blocked = false;
647
648 if (stack->is_io_blocked()) {
649 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
650 if (stack->is_interval_waiting()) {
651 interval_wait_count++;
652 }
653 blocked_count++;
654 } else if (stack->is_blocked()) {
655 /* do nothing, we'll re-add the stack when the blocking stack is done,
656 * or when we're awaken
657 */
658 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
659 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
660 } else if (stack->is_done()) {
661 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
662 RGWCoroutinesStack *s;
663 while (stack->unblock_stack(&s)) {
664 if (!s->is_blocked_by_stack() && !s->is_done()) {
665 if (s->is_io_blocked()) {
666 if (stack->is_interval_waiting()) {
667 interval_wait_count++;
668 }
669 blocked_count++;
670 } else {
671 s->_schedule();
672 }
673 }
674 }
675 if (stack->parent && stack->parent->waiting_for_child()) {
676 stack->parent->set_wait_for_child(false);
677 stack->parent->_schedule();
678 }
679 context_stacks.erase(stack);
680 stack->put();
681 stack = NULL;
682 } else {
683 op_not_blocked = true;
684 stack->run_count++;
685 stack->_schedule();
686 }
687
688 if (!op_not_blocked && stack) {
689 stack->run_count = 0;
690 }
691
692 while (completion_mgr->try_get_next(&io)) {
693 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
694 }
695
696 /*
697 * only account blocked operations that are not in interval_wait, these are stacks that
698 * were put on a wait without any real IO operations. While we mark these as io_blocked,
699 * these aren't really waiting for IOs
700 */
701 while (blocked_count - interval_wait_count >= ops_window) {
702 lock.unlock();
703 ret = completion_mgr->get_next(&io);
704 lock.lock();
705 if (ret < 0) {
706 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
707 }
708 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
709 }
710
711 next:
712 while (scheduled_stacks.empty() && blocked_count > 0) {
713 lock.unlock();
714 ret = completion_mgr->get_next(&io);
715 lock.lock();
716 if (ret < 0) {
717 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
718 }
719 if (going_down) {
720 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
721 ret = -ECANCELED;
722 canceled = true;
723 break;
724 }
725 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
726 iter = scheduled_stacks.begin();
727 }
728 if (canceled) {
729 break;
730 }
731
732 if (iter == scheduled_stacks.end()) {
733 iter = scheduled_stacks.begin();
734 }
735 }
736
737 if (!context_stacks.empty() && !going_down) {
738 JSONFormatter formatter(true);
739 formatter.open_array_section("context_stacks");
740 for (auto& s : context_stacks) {
741 ::encode_json("entry", *s, &formatter);
742 }
743 formatter.close_section();
744 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
745 formatter.flush(*_dout);
746 *_dout << dendl;
747 ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
748 }
749
750 for (auto stack : context_stacks) {
751 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
752 stack->cancel();
753 }
754 run_contexts.erase(run_context);
755 lock.unlock();
756
757 return ret;
758 }
759
760 int RGWCoroutinesManager::run(RGWCoroutine *op)
761 {
762 if (!op) {
763 return 0;
764 }
765 list<RGWCoroutinesStack *> stacks;
766 RGWCoroutinesStack *stack = allocate_stack();
767 op->get();
768 stack->call(op);
769
770 stacks.push_back(stack);
771
772 int r = run(stacks);
773 if (r < 0) {
774 ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
775 } else {
776 r = op->get_ret_status();
777 }
778 op->put();
779
780 return r;
781 }
782
783 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
784 {
785 rgw_io_id io_id{get_next_io_id(), -1};
786 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
787 completion_mgr->register_completion_notifier(cn);
788 return cn;
789 }
790
791 void RGWCoroutinesManager::dump(Formatter *f) const {
792 std::shared_lock rl{lock};
793
794 f->open_array_section("run_contexts");
795 for (auto& i : run_contexts) {
796 f->open_object_section("context");
797 ::encode_json("id", i.first, f);
798 f->open_array_section("entries");
799 for (auto& s : i.second) {
800 ::encode_json("entry", *s, f);
801 }
802 f->close_section();
803 f->close_section();
804 }
805 f->close_section();
806 }
807
808 RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
809 return new RGWCoroutinesStack(cct, this);
810 }
811
812 string RGWCoroutinesManager::get_id()
813 {
814 if (!id.empty()) {
815 return id;
816 }
817 stringstream ss;
818 ss << (void *)this;
819 return ss.str();
820 }
821
822 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
823 {
824 std::unique_lock wl{lock};
825 if (managers.find(mgr) == managers.end()) {
826 managers.insert(mgr);
827 get();
828 }
829 }
830
831 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
832 {
833 std::unique_lock wl{lock};
834 if (managers.find(mgr) != managers.end()) {
835 managers.erase(mgr);
836 put();
837 }
838 }
839
840 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
841 {
842 AdminSocket *admin_socket = cct->get_admin_socket();
843 if (!admin_command.empty()) {
844 admin_socket->unregister_commands(this);
845 }
846 }
847
848 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
849 {
850 AdminSocket *admin_socket = cct->get_admin_socket();
851 if (!admin_command.empty()) {
852 admin_socket->unregister_commands(this);
853 }
854 admin_command = command;
855 int r = admin_socket->register_command(admin_command, this,
856 "dump current coroutines stack state");
857 if (r < 0) {
858 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
859 return r;
860 }
861 return 0;
862 }
863
864 int RGWCoroutinesManagerRegistry::call(std::string_view command,
865 const cmdmap_t& cmdmap,
866 Formatter *f,
867 std::ostream& ss,
868 bufferlist& out) {
869 std::shared_lock rl{lock};
870 ::encode_json("cr_managers", *this, f);
871 return 0;
872 }
873
874 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
875 f->open_array_section("coroutine_managers");
876 for (auto m : managers) {
877 ::encode_json("entry", *m, f);
878 }
879 f->close_section();
880 }
881
882 void RGWCoroutine::call(RGWCoroutine *op)
883 {
884 if (op) {
885 stack->call(op);
886 } else {
887 // the call()er expects this to set a retcode
888 retcode = 0;
889 }
890 }
891
892 RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
893 {
894 return stack->spawn(this, op, wait);
895 }
896
897 bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
898 {
899 return stack->collect(this, ret, skip_stack);
900 }
901
902 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
903 {
904 return stack->collect_next(this, ret, collected_stack);
905 }
906
907 int RGWCoroutine::wait(const utime_t& interval)
908 {
909 return stack->wait(interval);
910 }
911
912 void RGWCoroutine::wait_for_child()
913 {
914 /* should only wait for child if there is a child that is not done yet, and no complete children */
915 if (spawned.entries.empty()) {
916 return;
917 }
918 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
919 if ((*iter)->is_done()) {
920 return;
921 }
922 }
923 stack->set_wait_for_child(true);
924 }
925
926 string RGWCoroutine::to_str() const
927 {
928 return typeid(*this).name();
929 }
930
931 ostream& operator<<(ostream& out, const RGWCoroutine& cr)
932 {
933 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
934 return out;
935 }
936
937 bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
938 {
939 bool done = false;
940 ceph_assert(num_cr_left >= 0);
941 if (num_cr_left == 0 && skip_stack) {
942 num_cr_left = 1;
943 }
944 reenter(&drain_cr) {
945 while (num_spawned() > (size_t)num_cr_left) {
946 yield wait_for_child();
947 int ret;
948 while (collect(&ret, skip_stack)) {
949 if (ret < 0) {
950 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
951 /* we should have reported this error */
952 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
953 }
954 }
955 }
956 done = true;
957 }
958 return done;
959 }
960
961 void RGWCoroutine::wakeup()
962 {
963 stack->wakeup();
964 }
965
966 RGWCoroutinesEnv *RGWCoroutine::get_env() const
967 {
968 return stack->get_env();
969 }
970
971 void RGWCoroutine::dump(Formatter *f) const {
972 if (!description.str().empty()) {
973 encode_json("description", description.str(), f);
974 }
975 encode_json("type", to_str(), f);
976 if (!spawned.entries.empty()) {
977 f->open_array_section("spawned");
978 for (auto& i : spawned.entries) {
979 char buf[32];
980 snprintf(buf, sizeof(buf), "%p", (void *)i);
981 encode_json("stack", string(buf), f);
982 }
983 f->close_section();
984 }
985 if (!status.history.empty()) {
986 encode_json("history", status.history, f);
987 }
988
989 if (!status.status.str().empty()) {
990 f->open_object_section("status");
991 encode_json("status", status.status.str(), f);
992 encode_json("timestamp", status.timestamp, f);
993 f->close_section();
994 }
995 }
996
997 RGWSimpleCoroutine::~RGWSimpleCoroutine()
998 {
999 if (!called_cleanup) {
1000 request_cleanup();
1001 }
1002 }
1003
1004 void RGWSimpleCoroutine::call_cleanup()
1005 {
1006 called_cleanup = true;
1007 request_cleanup();
1008 }
1009
1010 int RGWSimpleCoroutine::operate()
1011 {
1012 int ret = 0;
1013 reenter(this) {
1014 yield return state_init();
1015 yield return state_send_request();
1016 yield return state_request_complete();
1017 yield return state_all_complete();
1018 drain_all();
1019 call_cleanup();
1020 return set_state(RGWCoroutine_Done, ret);
1021 }
1022 return 0;
1023 }
1024
1025 int RGWSimpleCoroutine::state_init()
1026 {
1027 int ret = init();
1028 if (ret < 0) {
1029 call_cleanup();
1030 return set_state(RGWCoroutine_Error, ret);
1031 }
1032 return 0;
1033 }
1034
1035 int RGWSimpleCoroutine::state_send_request()
1036 {
1037 int ret = send_request();
1038 if (ret < 0) {
1039 call_cleanup();
1040 return set_state(RGWCoroutine_Error, ret);
1041 }
1042 return io_block(0);
1043 }
1044
1045 int RGWSimpleCoroutine::state_request_complete()
1046 {
1047 int ret = request_complete();
1048 if (ret < 0) {
1049 call_cleanup();
1050 return set_state(RGWCoroutine_Error, ret);
1051 }
1052 return 0;
1053 }
1054
1055 int RGWSimpleCoroutine::state_all_complete()
1056 {
1057 int ret = finish();
1058 if (ret < 0) {
1059 call_cleanup();
1060 return set_state(RGWCoroutine_Error, ret);
1061 }
1062 return 0;
1063 }
1064
1065