]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.cc
96f2ddabd07687a977f4555260f723ee62eecd80
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/Context.h"
5 #include "common/ceph_json.h"
6 #include "rgw_coroutine.h"
7
8 // re-include our assert to clobber the system one; fix dout:
9 #include "include/ceph_assert.h"
10
11 #include <boost/asio/yield.hpp>
12
13 #define dout_subsys ceph_subsys_rgw
14 #define dout_context g_ceph_context
15
16 using namespace std;
17
18 class RGWCompletionManager::WaitContext : public Context {
19 RGWCompletionManager *manager;
20 void *opaque;
21 public:
22 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
23 void finish(int r) override {
24 manager->_wakeup(opaque);
25 }
26 };
27
28 RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
29 timer(cct, lock)
30 {
31 timer.init();
32 }
33
34 RGWCompletionManager::~RGWCompletionManager()
35 {
36 std::lock_guard l{lock};
37 timer.cancel_all_events();
38 timer.shutdown();
39 }
40
41 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
42 {
43 std::lock_guard l{lock};
44 _complete(cn, io_id, user_info);
45 }
46
47 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
48 {
49 std::lock_guard l{lock};
50 if (cn) {
51 cns.insert(cn);
52 }
53 }
54
55 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
56 {
57 std::lock_guard l{lock};
58 if (cn) {
59 cns.erase(cn);
60 }
61 }
62
63 void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
64 {
65 if (cn) {
66 cns.erase(cn);
67 }
68
69 if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
70 /* already have completion for this io_id, don't allow multiple completions for it */
71 return;
72 }
73 complete_reqs.push_back(io_completion{io_id, user_info});
74 cond.notify_all();
75 }
76
77 int RGWCompletionManager::get_next(io_completion *io)
78 {
79 std::unique_lock l{lock};
80 while (complete_reqs.empty()) {
81 if (going_down) {
82 return -ECANCELED;
83 }
84 cond.wait(l);
85 }
86 *io = complete_reqs.front();
87 complete_reqs_set.erase(io->io_id);
88 complete_reqs.pop_front();
89 return 0;
90 }
91
92 bool RGWCompletionManager::try_get_next(io_completion *io)
93 {
94 std::lock_guard l{lock};
95 if (complete_reqs.empty()) {
96 return false;
97 }
98 *io = complete_reqs.front();
99 complete_reqs_set.erase(io->io_id);
100 complete_reqs.pop_front();
101 return true;
102 }
103
104 void RGWCompletionManager::go_down()
105 {
106 std::lock_guard l{lock};
107 for (auto cn : cns) {
108 cn->unregister();
109 }
110 going_down = true;
111 cond.notify_all();
112 }
113
114 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
115 {
116 std::lock_guard l{lock};
117 ceph_assert(waiters.find(opaque) == waiters.end());
118 waiters[opaque] = user_info;
119 timer.add_event_after(interval, new WaitContext(this, opaque));
120 }
121
122 void RGWCompletionManager::wakeup(void *opaque)
123 {
124 std::lock_guard l{lock};
125 _wakeup(opaque);
126 }
127
128 void RGWCompletionManager::_wakeup(void *opaque)
129 {
130 map<void *, void *>::iterator iter = waiters.find(opaque);
131 if (iter != waiters.end()) {
132 void *user_id = iter->second;
133 waiters.erase(iter);
134 _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
135 }
136 }
137
138 RGWCoroutine::~RGWCoroutine() {
139 for (auto stack : spawned.entries) {
140 stack->put();
141 }
142 }
143
144 void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
145 {
146 stack->init_new_io(io_provider);
147 }
148
149 void RGWCoroutine::set_io_blocked(bool flag) {
150 stack->set_io_blocked(flag);
151 }
152
153 void RGWCoroutine::set_sleeping(bool flag) {
154 stack->set_sleeping(flag);
155 }
156
157 int RGWCoroutine::io_block(int ret, int64_t io_id) {
158 return io_block(ret, rgw_io_id{io_id, -1});
159 }
160
161 int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
162 if (stack->consume_io_finish(io_id)) {
163 return 0;
164 }
165 set_io_blocked(true);
166 stack->set_io_blocked_id(io_id);
167 return ret;
168 }
169
170 void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
171 stack->io_complete(io_id);
172 }
173
174 void RGWCoroutine::StatusItem::dump(Formatter *f) const {
175 ::encode_json("timestamp", timestamp, f);
176 ::encode_json("status", status, f);
177 }
178
179 stringstream& RGWCoroutine::Status::set_status()
180 {
181 std::unique_lock l{lock};
182 string s = status.str();
183 status.str(string());
184 if (!timestamp.is_zero()) {
185 history.push_back(StatusItem(timestamp, s));
186 }
187 if (history.size() > (size_t)max_history) {
188 history.pop_front();
189 }
190 timestamp = ceph_clock_now();
191
192 return status;
193 }
194
195 RGWCoroutinesManager::~RGWCoroutinesManager() {
196 stop();
197 completion_mgr->put();
198 if (cr_registry) {
199 cr_registry->remove(this);
200 }
201 }
202
203 int64_t RGWCoroutinesManager::get_next_io_id()
204 {
205 return (int64_t)++max_io_id;
206 }
207
208 uint64_t RGWCoroutinesManager::get_next_stack_id() {
209 return (uint64_t)++max_stack_id;
210 }
211
212 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
213 done_flag(false), error_flag(false), blocked_flag(false),
214 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
215 retcode(0), run_count(0),
216 env(NULL), parent(NULL)
217 {
218 id = ops_mgr->get_next_stack_id();
219 if (start) {
220 ops.push_back(start);
221 }
222 pos = ops.begin();
223 }
224
225 RGWCoroutinesStack::~RGWCoroutinesStack()
226 {
227 for (auto op : ops) {
228 op->put();
229 }
230
231 for (auto stack : spawned.entries) {
232 stack->put();
233 }
234
235 if (preallocated_stack) {
236 preallocated_stack->put();
237 }
238 }
239
240 int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env)
241 {
242 env = _env;
243 RGWCoroutine *op = *pos;
244 op->stack = this;
245 ldpp_dout(dpp, 20) << *op << ": operate()" << dendl;
246 int r = op->operate_wrapper(dpp);
247 if (r < 0) {
248 ldpp_dout(dpp, 20) << *op << ": operate() returned r=" << r << dendl;
249 }
250
251 error_flag = op->is_error();
252
253 if (op->is_done()) {
254 int op_retcode = r;
255 r = unwind(op_retcode);
256 op->put();
257 done_flag = (pos == ops.end());
258 blocked_flag &= !done_flag;
259 if (done_flag) {
260 retcode = op_retcode;
261 }
262 return r;
263 }
264
265 /* should r ever be negative at this point? */
266 ceph_assert(r >= 0);
267
268 return 0;
269 }
270
271 string RGWCoroutinesStack::error_str()
272 {
273 if (pos != ops.end()) {
274 return (*pos)->error_str();
275 }
276 return string();
277 }
278
279 void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
280 if (!next_op) {
281 return;
282 }
283 ops.push_back(next_op);
284 if (pos != ops.end()) {
285 ++pos;
286 } else {
287 pos = ops.begin();
288 }
289 }
290
291 void RGWCoroutinesStack::schedule()
292 {
293 env->manager->schedule(env, this);
294 }
295
296 void RGWCoroutinesStack::_schedule()
297 {
298 env->manager->_schedule(env, this);
299 }
300
301 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
302 {
303 if (!op) {
304 return NULL;
305 }
306
307 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
308
309 RGWCoroutinesStack *stack = preallocated_stack;
310 if (!stack) {
311 stack = env->manager->allocate_stack();
312 }
313 preallocated_stack = nullptr;
314
315 s->add_pending(stack);
316 stack->parent = this;
317
318 stack->get(); /* we'll need to collect the stack */
319 stack->call(op);
320
321 env->manager->schedule(env, stack);
322
323 if (wait) {
324 set_blocked_by(stack);
325 }
326
327 return stack;
328 }
329
330 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
331 {
332 return spawn(NULL, op, wait);
333 }
334
335 RGWCoroutinesStack *RGWCoroutinesStack::prealloc_stack()
336 {
337 if (!preallocated_stack) {
338 preallocated_stack = env->manager->allocate_stack();
339 }
340 return preallocated_stack;
341 }
342
343 int RGWCoroutinesStack::wait(const utime_t& interval)
344 {
345 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
346 completion_mgr->wait_interval((void *)this, interval, (void *)this);
347 set_io_blocked(true);
348 set_interval_wait(true);
349 return 0;
350 }
351
352 void RGWCoroutinesStack::wakeup()
353 {
354 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
355 completion_mgr->wakeup((void *)this);
356 }
357
358 void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
359 {
360 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
361 completion_mgr->complete(nullptr, io_id, (void *)this);
362 }
363
364 int RGWCoroutinesStack::unwind(int retcode)
365 {
366 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
367
368 if (pos == ops.begin()) {
369 ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
370 spawned.inherit(src_spawned);
371 ops.clear();
372 pos = ops.end();
373 return retcode;
374 }
375
376 --pos;
377 ops.pop_back();
378 RGWCoroutine *op = *pos;
379 op->set_retcode(retcode);
380 op->spawned.inherit(src_spawned);
381 return 0;
382 }
383
384 void RGWCoroutinesStack::cancel()
385 {
386 while (!ops.empty()) {
387 RGWCoroutine *op = *pos;
388 unwind(-ECANCELED);
389 op->put();
390 }
391 put();
392 }
393
394 bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
395 {
396 bool need_retry = false;
397 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
398 *ret = 0;
399 vector<RGWCoroutinesStack *> new_list;
400
401 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
402 RGWCoroutinesStack *stack = *iter;
403 if (stack == skip_stack || !stack->is_done()) {
404 new_list.push_back(stack);
405 if (!stack->is_done()) {
406 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
407 } else if (stack == skip_stack) {
408 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
409 }
410 continue;
411 }
412 if (stack_id) {
413 *stack_id = stack->get_id();
414 }
415 int r = stack->get_ret_status();
416 stack->put();
417 if (r < 0) {
418 *ret = r;
419 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
420 new_list.insert(new_list.end(), ++iter, s->entries.end());
421 need_retry = (iter != s->entries.end());
422 break;
423 }
424
425 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
426 }
427
428 s->entries.swap(new_list);
429 return need_retry;
430 }
431
432 bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
433 {
434 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
435 *ret = 0;
436
437 if (collected_stack) {
438 *collected_stack = NULL;
439 }
440
441 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
442 RGWCoroutinesStack *stack = *iter;
443 if (!stack->is_done()) {
444 continue;
445 }
446 int r = stack->get_ret_status();
447 if (r < 0) {
448 *ret = r;
449 }
450
451 if (collected_stack) {
452 *collected_stack = stack;
453 }
454 stack->put();
455
456 s->entries.erase(iter);
457 return true;
458 }
459
460 return false;
461 }
462
463 bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
464 {
465 return collect(NULL, ret, skip_stack, stack_id);
466 }
467
468 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
469 {
470 (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
471 }
472
473 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
474 io_id(_io_id),
475 user_data(_user_data), registered(true) {
476 c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
477 }
478
479 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
480 {
481 return ops_mgr->create_completion_notifier(this);
482 }
483
484 RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
485 {
486 return ops_mgr->get_completion_mgr();
487 }
488
489 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
490 {
491 if (blocking_stacks.empty()) {
492 return false;
493 }
494
495 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
496 *s = *iter;
497 blocking_stacks.erase(iter);
498 (*s)->blocked_by_stack.erase(this);
499
500 return true;
501 }
502
503 void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
504 {
505 if (!op) {
506 return;
507 }
508 string err = op->error_str();
509 if (err.empty()) {
510 return;
511 }
512 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
513 }
514
515 void RGWCoroutinesStack::dump(Formatter *f) const {
516 stringstream ss;
517 ss << (void *)this;
518 ::encode_json("stack", ss.str(), f);
519 ::encode_json("run_count", run_count, f);
520 f->open_array_section("ops");
521 for (auto& i : ops) {
522 encode_json("op", *i, f);
523 }
524 f->close_section();
525 }
526
527 void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
528 {
529 io_provider->set_io_user_info((void *)this);
530 io_provider->assign_io(env->manager->get_io_id_provider());
531 }
532
533 bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
534 {
535 if (!can_io_unblock(io_id)) {
536 auto p = io_finish_ids.emplace(io_id.id, io_id);
537 auto& iter = p.first;
538 bool inserted = p.second;
539 if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
540 iter->second.channels |= io_id.channels;
541 }
542 return false;
543 }
544
545 return true;
546 }
547
548 bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
549 {
550 auto iter = io_finish_ids.find(io_id.id);
551 if (iter == io_finish_ids.end()) {
552 return false;
553 }
554 int finish_mask = iter->second.channels;
555 bool found = (finish_mask & io_id.channels) != 0;
556
557 finish_mask &= ~(finish_mask & io_id.channels);
558
559 if (finish_mask == 0) {
560 io_finish_ids.erase(iter);
561 }
562 return found;
563 }
564
565
566 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
567 RGWCompletionManager::io_completion& io, int *blocked_count)
568 {
569 ceph_assert(ceph_mutex_is_wlocked(lock));
570 RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
571 if (context_stacks.find(stack) == context_stacks.end()) {
572 return;
573 }
574 if (!stack->try_io_unblock(io.io_id)) {
575 return;
576 }
577 if (stack->is_io_blocked()) {
578 --(*blocked_count);
579 stack->set_io_blocked(false);
580 }
581 stack->set_interval_wait(false);
582 if (!stack->is_done()) {
583 if (!stack->is_scheduled) {
584 scheduled_stacks.push_back(stack);
585 stack->set_is_scheduled(true);
586 }
587 } else {
588 context_stacks.erase(stack);
589 stack->put();
590 }
591 }
592
593 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
594 {
595 std::unique_lock wl{lock};
596 _schedule(env, stack);
597 }
598
599 void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
600 {
601 ceph_assert(ceph_mutex_is_wlocked(lock));
602 if (!stack->is_scheduled) {
603 env->scheduled_stacks->push_back(stack);
604 stack->set_is_scheduled(true);
605 }
606 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
607 context_stacks.insert(stack);
608 }
609
610 void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
611 {
612 cr->set_sleeping(flag);
613 }
614
615 void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
616 {
617 cr->io_complete(io_id);
618 }
619
620 int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, list<RGWCoroutinesStack *>& stacks)
621 {
622 int ret = 0;
623 int blocked_count = 0;
624 int interval_wait_count = 0;
625 bool canceled = false; // set on going_down
626 RGWCoroutinesEnv env;
627 bool op_not_blocked;
628
629 uint64_t run_context = ++run_context_count;
630
631 lock.lock();
632 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
633 list<RGWCoroutinesStack *> scheduled_stacks;
634 for (auto& st : stacks) {
635 context_stacks.insert(st);
636 scheduled_stacks.push_back(st);
637 st->set_is_scheduled(true);
638 }
639 env.run_context = run_context;
640 env.manager = this;
641 env.scheduled_stacks = &scheduled_stacks;
642
643 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
644 RGWCompletionManager::io_completion io;
645 RGWCoroutinesStack *stack = *iter;
646 ++iter;
647 scheduled_stacks.pop_front();
648
649 if (context_stacks.find(stack) == context_stacks.end()) {
650 /* stack was probably schedule more than once due to IO, but was since complete */
651 goto next;
652 }
653 env.stack = stack;
654
655 lock.unlock();
656
657 ret = stack->operate(dpp, &env);
658
659 lock.lock();
660
661 stack->set_is_scheduled(false);
662 if (ret < 0) {
663 ldpp_dout(dpp, 20) << "stack->operate() returned ret=" << ret << dendl;
664 }
665
666 if (stack->is_error()) {
667 report_error(stack);
668 }
669
670 op_not_blocked = false;
671
672 if (stack->is_io_blocked()) {
673 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
674 if (stack->is_interval_waiting()) {
675 interval_wait_count++;
676 }
677 blocked_count++;
678 } else if (stack->is_blocked()) {
679 /* do nothing, we'll re-add the stack when the blocking stack is done,
680 * or when we're awaken
681 */
682 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
683 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
684 } else if (stack->is_done()) {
685 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
686 RGWCoroutinesStack *s;
687 while (stack->unblock_stack(&s)) {
688 if (!s->is_blocked_by_stack() && !s->is_done()) {
689 if (s->is_io_blocked()) {
690 if (stack->is_interval_waiting()) {
691 interval_wait_count++;
692 }
693 blocked_count++;
694 } else {
695 s->_schedule();
696 }
697 }
698 }
699 if (stack->parent && stack->parent->waiting_for_child()) {
700 stack->parent->set_wait_for_child(false);
701 stack->parent->_schedule();
702 }
703 context_stacks.erase(stack);
704 stack->put();
705 stack = NULL;
706 } else {
707 op_not_blocked = true;
708 stack->run_count++;
709 stack->_schedule();
710 }
711
712 if (!op_not_blocked && stack) {
713 stack->run_count = 0;
714 }
715
716 while (completion_mgr->try_get_next(&io)) {
717 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
718 }
719
720 /*
721 * only account blocked operations that are not in interval_wait, these are stacks that
722 * were put on a wait without any real IO operations. While we mark these as io_blocked,
723 * these aren't really waiting for IOs
724 */
725 while (blocked_count - interval_wait_count >= ops_window) {
726 lock.unlock();
727 ret = completion_mgr->get_next(&io);
728 lock.lock();
729 if (ret < 0) {
730 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
731 }
732 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
733 }
734
735 next:
736 while (scheduled_stacks.empty() && blocked_count > 0) {
737 lock.unlock();
738 ret = completion_mgr->get_next(&io);
739 lock.lock();
740 if (ret < 0) {
741 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
742 }
743 if (going_down) {
744 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
745 ret = -ECANCELED;
746 canceled = true;
747 break;
748 }
749 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
750 iter = scheduled_stacks.begin();
751 }
752 if (canceled) {
753 break;
754 }
755
756 if (iter == scheduled_stacks.end()) {
757 iter = scheduled_stacks.begin();
758 }
759 }
760
761 if (!context_stacks.empty() && !going_down) {
762 JSONFormatter formatter(true);
763 formatter.open_array_section("context_stacks");
764 for (auto& s : context_stacks) {
765 ::encode_json("entry", *s, &formatter);
766 }
767 formatter.close_section();
768 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
769 formatter.flush(*_dout);
770 *_dout << dendl;
771 ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
772 }
773
774 for (auto stack : context_stacks) {
775 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
776 stack->cancel();
777 }
778 run_contexts.erase(run_context);
779 lock.unlock();
780
781 return ret;
782 }
783
784 int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, RGWCoroutine *op)
785 {
786 if (!op) {
787 return 0;
788 }
789 list<RGWCoroutinesStack *> stacks;
790 RGWCoroutinesStack *stack = allocate_stack();
791 op->get();
792 stack->call(op);
793
794 stacks.push_back(stack);
795
796 int r = run(dpp, stacks);
797 if (r < 0) {
798 ldpp_dout(dpp, 20) << "run(stacks) returned r=" << r << dendl;
799 } else {
800 r = op->get_ret_status();
801 }
802 op->put();
803
804 return r;
805 }
806
807 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
808 {
809 rgw_io_id io_id{get_next_io_id(), -1};
810 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
811 completion_mgr->register_completion_notifier(cn);
812 return cn;
813 }
814
815 void RGWCoroutinesManager::dump(Formatter *f) const {
816 std::shared_lock rl{lock};
817
818 f->open_array_section("run_contexts");
819 for (auto& i : run_contexts) {
820 f->open_object_section("context");
821 ::encode_json("id", i.first, f);
822 f->open_array_section("entries");
823 for (auto& s : i.second) {
824 ::encode_json("entry", *s, f);
825 }
826 f->close_section();
827 f->close_section();
828 }
829 f->close_section();
830 }
831
832 RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
833 return new RGWCoroutinesStack(cct, this);
834 }
835
836 string RGWCoroutinesManager::get_id()
837 {
838 if (!id.empty()) {
839 return id;
840 }
841 stringstream ss;
842 ss << (void *)this;
843 return ss.str();
844 }
845
846 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
847 {
848 std::unique_lock wl{lock};
849 if (managers.find(mgr) == managers.end()) {
850 managers.insert(mgr);
851 get();
852 }
853 }
854
855 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
856 {
857 std::unique_lock wl{lock};
858 if (managers.find(mgr) != managers.end()) {
859 managers.erase(mgr);
860 put();
861 }
862 }
863
864 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
865 {
866 AdminSocket *admin_socket = cct->get_admin_socket();
867 if (!admin_command.empty()) {
868 admin_socket->unregister_commands(this);
869 }
870 }
871
872 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
873 {
874 AdminSocket *admin_socket = cct->get_admin_socket();
875 if (!admin_command.empty()) {
876 admin_socket->unregister_commands(this);
877 }
878 admin_command = command;
879 int r = admin_socket->register_command(admin_command, this,
880 "dump current coroutines stack state");
881 if (r < 0) {
882 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
883 return r;
884 }
885 return 0;
886 }
887
888 int RGWCoroutinesManagerRegistry::call(std::string_view command,
889 const cmdmap_t& cmdmap,
890 Formatter *f,
891 std::ostream& ss,
892 bufferlist& out) {
893 std::shared_lock rl{lock};
894 ::encode_json("cr_managers", *this, f);
895 return 0;
896 }
897
898 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
899 f->open_array_section("coroutine_managers");
900 for (auto m : managers) {
901 ::encode_json("entry", *m, f);
902 }
903 f->close_section();
904 }
905
906 void RGWCoroutine::call(RGWCoroutine *op)
907 {
908 if (op) {
909 stack->call(op);
910 } else {
911 // the call()er expects this to set a retcode
912 retcode = 0;
913 }
914 }
915
916 RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
917 {
918 return stack->spawn(this, op, wait);
919 }
920
921 RGWCoroutinesStack *RGWCoroutine::prealloc_stack()
922 {
923 return stack->prealloc_stack();
924 }
925
926 uint64_t RGWCoroutine::prealloc_stack_id()
927 {
928 return prealloc_stack()->get_id();
929 }
930
931 bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
932 {
933 return stack->collect(this, ret, skip_stack, stack_id);
934 }
935
936 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
937 {
938 return stack->collect_next(this, ret, collected_stack);
939 }
940
941 int RGWCoroutine::wait(const utime_t& interval)
942 {
943 return stack->wait(interval);
944 }
945
946 void RGWCoroutine::wait_for_child()
947 {
948 /* should only wait for child if there is a child that is not done yet, and no complete children */
949 if (spawned.entries.empty()) {
950 return;
951 }
952 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
953 if ((*iter)->is_done()) {
954 return;
955 }
956 }
957 stack->set_wait_for_child(true);
958 }
959
960 string RGWCoroutine::to_str() const
961 {
962 return typeid(*this).name();
963 }
964
965 ostream& operator<<(ostream& out, const RGWCoroutine& cr)
966 {
967 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
968 return out;
969 }
970
971 bool RGWCoroutine::drain_children(int num_cr_left,
972 RGWCoroutinesStack *skip_stack,
973 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
974 {
975 bool done = false;
976 ceph_assert(num_cr_left >= 0);
977 if (num_cr_left == 0 && skip_stack) {
978 num_cr_left = 1;
979 }
980 reenter(&drain_status.cr) {
981 while (num_spawned() > (size_t)num_cr_left) {
982 yield wait_for_child();
983 int ret;
984 uint64_t stack_id;
985 bool again = false;
986 do {
987 again = collect(&ret, skip_stack, &stack_id);
988 if (ret < 0) {
989 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
990 /* we should have reported this error */
991 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
992 }
993 if (cb) {
994 (*cb)(stack_id, ret);
995 }
996 } while (again);
997 }
998 done = true;
999 }
1000 return done;
1001 }
1002
1003 bool RGWCoroutine::drain_children(int num_cr_left,
1004 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
1005 {
1006 bool done = false;
1007 ceph_assert(num_cr_left >= 0);
1008
1009 reenter(&drain_status.cr) {
1010 while (num_spawned() > (size_t)num_cr_left) {
1011 yield wait_for_child();
1012 int ret;
1013 uint64_t stack_id;
1014 bool again = false;
1015 do {
1016 again = collect(&ret, nullptr, &stack_id);
1017 if (ret < 0) {
1018 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
1019 /* we should have reported this error */
1020 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
1021 }
1022 if (cb && !drain_status.should_exit) {
1023 int r = (*cb)(stack_id, ret);
1024 if (r < 0) {
1025 drain_status.ret = r;
1026 drain_status.should_exit = true;
1027 num_cr_left = 0; /* need to drain all */
1028 }
1029 }
1030 } while (again);
1031 }
1032 done = true;
1033 }
1034 return done;
1035 }
1036
1037 void RGWCoroutine::wakeup()
1038 {
1039 stack->wakeup();
1040 }
1041
1042 RGWCoroutinesEnv *RGWCoroutine::get_env() const
1043 {
1044 return stack->get_env();
1045 }
1046
1047 void RGWCoroutine::dump(Formatter *f) const {
1048 if (!description.str().empty()) {
1049 encode_json("description", description.str(), f);
1050 }
1051 encode_json("type", to_str(), f);
1052 if (!spawned.entries.empty()) {
1053 f->open_array_section("spawned");
1054 for (auto& i : spawned.entries) {
1055 char buf[32];
1056 snprintf(buf, sizeof(buf), "%p", (void *)i);
1057 encode_json("stack", string(buf), f);
1058 }
1059 f->close_section();
1060 }
1061 if (!status.history.empty()) {
1062 encode_json("history", status.history, f);
1063 }
1064
1065 if (!status.status.str().empty()) {
1066 f->open_object_section("status");
1067 encode_json("status", status.status.str(), f);
1068 encode_json("timestamp", status.timestamp, f);
1069 f->close_section();
1070 }
1071 }
1072
1073 RGWSimpleCoroutine::~RGWSimpleCoroutine()
1074 {
1075 if (!called_cleanup) {
1076 request_cleanup();
1077 }
1078 }
1079
1080 void RGWSimpleCoroutine::call_cleanup()
1081 {
1082 called_cleanup = true;
1083 request_cleanup();
1084 }
1085
/*
 * Drive a simple coroutine through its fixed sequence of steps:
 *   state_init -> state_send_request -> state_request_complete -> state_all_complete
 * Each `yield return` returns that step's value to the caller; the next
 * operate() call resumes at the following step (boost::asio stackless
 * coroutine semantics via reenter/yield).
 * Each state_*() helper calls call_cleanup() and sets RGWCoroutine_Error when
 * its step fails; presumably operate() is then not resumed -- confirm.
 * After the last step: drain_all() (a macro defined elsewhere; presumably
 * waits for spawned children), call_cleanup(), and finish as
 * RGWCoroutine_Done with status 0.
 */
int RGWSimpleCoroutine::operate(const DoutPrefixProvider *dpp)
{
  int ret = 0;
  reenter(this) {
    yield return state_init();
    yield return state_send_request(dpp);
    yield return state_request_complete();
    yield return state_all_complete();
    drain_all();
    call_cleanup();
    return set_state(RGWCoroutine_Done, ret);
  }
  return 0;
}
1100
1101 int RGWSimpleCoroutine::state_init()
1102 {
1103 int ret = init();
1104 if (ret < 0) {
1105 call_cleanup();
1106 return set_state(RGWCoroutine_Error, ret);
1107 }
1108 return 0;
1109 }
1110
1111 int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider *dpp)
1112 {
1113 int ret = send_request(dpp);
1114 if (ret < 0) {
1115 call_cleanup();
1116 return set_state(RGWCoroutine_Error, ret);
1117 }
1118 return io_block(0);
1119 }
1120
1121 int RGWSimpleCoroutine::state_request_complete()
1122 {
1123 int ret = request_complete();
1124 if (ret < 0) {
1125 call_cleanup();
1126 return set_state(RGWCoroutine_Error, ret);
1127 }
1128 return 0;
1129 }
1130
1131 int RGWSimpleCoroutine::state_all_complete()
1132 {
1133 int ret = finish();
1134 if (ret < 0) {
1135 call_cleanup();
1136 return set_state(RGWCoroutine_Error, ret);
1137 }
1138 return 0;
1139 }
1140
1141