]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_coroutine.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include "include/Context.h"
5 #include "common/ceph_json.h"
6 #include "rgw_coroutine.h"
7
8 // re-include our assert to clobber the system one; fix dout:
9 #include "include/ceph_assert.h"
10
11 #include <boost/asio/yield.hpp>
12
13 #define dout_subsys ceph_subsys_rgw
14 #define dout_context g_ceph_context
15
16
17 class RGWCompletionManager::WaitContext : public Context {
18 RGWCompletionManager *manager;
19 void *opaque;
20 public:
21 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
22 void finish(int r) override {
23 manager->_wakeup(opaque);
24 }
25 };
26
27 RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
28 timer(cct, lock)
29 {
30 timer.init();
31 }
32
33 RGWCompletionManager::~RGWCompletionManager()
34 {
35 std::lock_guard l{lock};
36 timer.cancel_all_events();
37 timer.shutdown();
38 }
39
40 void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
41 {
42 std::lock_guard l{lock};
43 _complete(cn, io_id, user_info);
44 }
45
46 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
47 {
48 std::lock_guard l{lock};
49 if (cn) {
50 cns.insert(cn);
51 }
52 }
53
54 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
55 {
56 std::lock_guard l{lock};
57 if (cn) {
58 cns.erase(cn);
59 }
60 }
61
62 void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
63 {
64 if (cn) {
65 cns.erase(cn);
66 }
67
68 if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
69 /* already have completion for this io_id, don't allow multiple completions for it */
70 return;
71 }
72 complete_reqs.push_back(io_completion{io_id, user_info});
73 cond.notify_all();
74 }
75
76 int RGWCompletionManager::get_next(io_completion *io)
77 {
78 std::unique_lock l{lock};
79 while (complete_reqs.empty()) {
80 if (going_down) {
81 return -ECANCELED;
82 }
83 cond.wait(l);
84 }
85 *io = complete_reqs.front();
86 complete_reqs_set.erase(io->io_id);
87 complete_reqs.pop_front();
88 return 0;
89 }
90
91 bool RGWCompletionManager::try_get_next(io_completion *io)
92 {
93 std::lock_guard l{lock};
94 if (complete_reqs.empty()) {
95 return false;
96 }
97 *io = complete_reqs.front();
98 complete_reqs_set.erase(io->io_id);
99 complete_reqs.pop_front();
100 return true;
101 }
102
103 void RGWCompletionManager::go_down()
104 {
105 std::lock_guard l{lock};
106 for (auto cn : cns) {
107 cn->unregister();
108 }
109 going_down = true;
110 cond.notify_all();
111 }
112
113 void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
114 {
115 std::lock_guard l{lock};
116 ceph_assert(waiters.find(opaque) == waiters.end());
117 waiters[opaque] = user_info;
118 timer.add_event_after(interval, new WaitContext(this, opaque));
119 }
120
121 void RGWCompletionManager::wakeup(void *opaque)
122 {
123 std::lock_guard l{lock};
124 _wakeup(opaque);
125 }
126
127 void RGWCompletionManager::_wakeup(void *opaque)
128 {
129 map<void *, void *>::iterator iter = waiters.find(opaque);
130 if (iter != waiters.end()) {
131 void *user_id = iter->second;
132 waiters.erase(iter);
133 _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
134 }
135 }
136
137 RGWCoroutine::~RGWCoroutine() {
138 for (auto stack : spawned.entries) {
139 stack->put();
140 }
141 }
142
143 void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
144 {
145 stack->init_new_io(io_provider);
146 }
147
148 void RGWCoroutine::set_io_blocked(bool flag) {
149 stack->set_io_blocked(flag);
150 }
151
152 void RGWCoroutine::set_sleeping(bool flag) {
153 stack->set_sleeping(flag);
154 }
155
156 int RGWCoroutine::io_block(int ret, int64_t io_id) {
157 return io_block(ret, rgw_io_id{io_id, -1});
158 }
159
160 int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
161 if (stack->consume_io_finish(io_id)) {
162 return 0;
163 }
164 set_io_blocked(true);
165 stack->set_io_blocked_id(io_id);
166 return ret;
167 }
168
169 void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
170 stack->io_complete(io_id);
171 }
172
173 void RGWCoroutine::StatusItem::dump(Formatter *f) const {
174 ::encode_json("timestamp", timestamp, f);
175 ::encode_json("status", status, f);
176 }
177
178 stringstream& RGWCoroutine::Status::set_status()
179 {
180 std::unique_lock l{lock};
181 string s = status.str();
182 status.str(string());
183 if (!timestamp.is_zero()) {
184 history.push_back(StatusItem(timestamp, s));
185 }
186 if (history.size() > (size_t)max_history) {
187 history.pop_front();
188 }
189 timestamp = ceph_clock_now();
190
191 return status;
192 }
193
194 int64_t RGWCoroutinesManager::get_next_io_id()
195 {
196 return (int64_t)++max_io_id;
197 }
198
199 uint64_t RGWCoroutinesManager::get_next_stack_id() {
200 return (uint64_t)++max_stack_id;
201 }
202
203 RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
204 done_flag(false), error_flag(false), blocked_flag(false),
205 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
206 retcode(0), run_count(0),
207 env(NULL), parent(NULL)
208 {
209 id = ops_mgr->get_next_stack_id();
210 if (start) {
211 ops.push_back(start);
212 }
213 pos = ops.begin();
214 }
215
216 RGWCoroutinesStack::~RGWCoroutinesStack()
217 {
218 for (auto op : ops) {
219 op->put();
220 }
221
222 for (auto stack : spawned.entries) {
223 stack->put();
224 }
225
226 if (preallocated_stack) {
227 preallocated_stack->put();
228 }
229 }
230
231 int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env)
232 {
233 env = _env;
234 RGWCoroutine *op = *pos;
235 op->stack = this;
236 ldpp_dout(dpp, 20) << *op << ": operate()" << dendl;
237 int r = op->operate_wrapper(dpp);
238 if (r < 0) {
239 ldpp_dout(dpp, 20) << *op << ": operate() returned r=" << r << dendl;
240 }
241
242 error_flag = op->is_error();
243
244 if (op->is_done()) {
245 int op_retcode = r;
246 r = unwind(op_retcode);
247 op->put();
248 done_flag = (pos == ops.end());
249 blocked_flag &= !done_flag;
250 if (done_flag) {
251 retcode = op_retcode;
252 }
253 return r;
254 }
255
256 /* should r ever be negative at this point? */
257 ceph_assert(r >= 0);
258
259 return 0;
260 }
261
262 string RGWCoroutinesStack::error_str()
263 {
264 if (pos != ops.end()) {
265 return (*pos)->error_str();
266 }
267 return string();
268 }
269
270 void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
271 if (!next_op) {
272 return;
273 }
274 ops.push_back(next_op);
275 if (pos != ops.end()) {
276 ++pos;
277 } else {
278 pos = ops.begin();
279 }
280 }
281
282 void RGWCoroutinesStack::schedule()
283 {
284 env->manager->schedule(env, this);
285 }
286
287 void RGWCoroutinesStack::_schedule()
288 {
289 env->manager->_schedule(env, this);
290 }
291
292 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
293 {
294 if (!op) {
295 return NULL;
296 }
297
298 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
299
300 RGWCoroutinesStack *stack = preallocated_stack;
301 if (!stack) {
302 stack = env->manager->allocate_stack();
303 }
304 preallocated_stack = nullptr;
305
306 s->add_pending(stack);
307 stack->parent = this;
308
309 stack->get(); /* we'll need to collect the stack */
310 stack->call(op);
311
312 env->manager->schedule(env, stack);
313
314 if (wait) {
315 set_blocked_by(stack);
316 }
317
318 return stack;
319 }
320
321 RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
322 {
323 return spawn(NULL, op, wait);
324 }
325
326 RGWCoroutinesStack *RGWCoroutinesStack::prealloc_stack()
327 {
328 if (!preallocated_stack) {
329 preallocated_stack = env->manager->allocate_stack();
330 }
331 return preallocated_stack;
332 }
333
334 int RGWCoroutinesStack::wait(const utime_t& interval)
335 {
336 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
337 completion_mgr->wait_interval((void *)this, interval, (void *)this);
338 set_io_blocked(true);
339 set_interval_wait(true);
340 return 0;
341 }
342
343 void RGWCoroutinesStack::wakeup()
344 {
345 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
346 completion_mgr->wakeup((void *)this);
347 }
348
349 void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
350 {
351 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
352 completion_mgr->complete(nullptr, io_id, (void *)this);
353 }
354
355 int RGWCoroutinesStack::unwind(int retcode)
356 {
357 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
358
359 if (pos == ops.begin()) {
360 ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
361 spawned.inherit(src_spawned);
362 ops.clear();
363 pos = ops.end();
364 return retcode;
365 }
366
367 --pos;
368 ops.pop_back();
369 RGWCoroutine *op = *pos;
370 op->set_retcode(retcode);
371 op->spawned.inherit(src_spawned);
372 return 0;
373 }
374
375 void RGWCoroutinesStack::cancel()
376 {
377 while (!ops.empty()) {
378 RGWCoroutine *op = *pos;
379 unwind(-ECANCELED);
380 op->put();
381 }
382 put();
383 }
384
385 bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
386 {
387 bool need_retry = false;
388 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
389 *ret = 0;
390 vector<RGWCoroutinesStack *> new_list;
391
392 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
393 RGWCoroutinesStack *stack = *iter;
394 if (stack == skip_stack || !stack->is_done()) {
395 new_list.push_back(stack);
396 if (!stack->is_done()) {
397 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
398 } else if (stack == skip_stack) {
399 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
400 }
401 continue;
402 }
403 if (stack_id) {
404 *stack_id = stack->get_id();
405 }
406 int r = stack->get_ret_status();
407 stack->put();
408 if (r < 0) {
409 *ret = r;
410 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
411 new_list.insert(new_list.end(), ++iter, s->entries.end());
412 need_retry = (iter != s->entries.end());
413 break;
414 }
415
416 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
417 }
418
419 s->entries.swap(new_list);
420 return need_retry;
421 }
422
423 bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
424 {
425 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
426 *ret = 0;
427
428 if (collected_stack) {
429 *collected_stack = NULL;
430 }
431
432 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
433 RGWCoroutinesStack *stack = *iter;
434 if (!stack->is_done()) {
435 continue;
436 }
437 int r = stack->get_ret_status();
438 if (r < 0) {
439 *ret = r;
440 }
441
442 if (collected_stack) {
443 *collected_stack = stack;
444 }
445 stack->put();
446
447 s->entries.erase(iter);
448 return true;
449 }
450
451 return false;
452 }
453
454 bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
455 {
456 return collect(NULL, ret, skip_stack, stack_id);
457 }
458
459 static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
460 {
461 (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
462 }
463
464 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
465 io_id(_io_id),
466 user_data(_user_data), registered(true) {
467 c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
468 }
469
470 RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
471 {
472 return ops_mgr->create_completion_notifier(this);
473 }
474
475 RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
476 {
477 return ops_mgr->get_completion_mgr();
478 }
479
480 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
481 {
482 if (blocking_stacks.empty()) {
483 return false;
484 }
485
486 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
487 *s = *iter;
488 blocking_stacks.erase(iter);
489 (*s)->blocked_by_stack.erase(this);
490
491 return true;
492 }
493
494 void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
495 {
496 if (!op) {
497 return;
498 }
499 string err = op->error_str();
500 if (err.empty()) {
501 return;
502 }
503 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
504 }
505
506 void RGWCoroutinesStack::dump(Formatter *f) const {
507 stringstream ss;
508 ss << (void *)this;
509 ::encode_json("stack", ss.str(), f);
510 ::encode_json("run_count", run_count, f);
511 f->open_array_section("ops");
512 for (auto& i : ops) {
513 encode_json("op", *i, f);
514 }
515 f->close_section();
516 }
517
518 void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
519 {
520 io_provider->set_io_user_info((void *)this);
521 io_provider->assign_io(env->manager->get_io_id_provider());
522 }
523
524 bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
525 {
526 if (!can_io_unblock(io_id)) {
527 auto p = io_finish_ids.emplace(io_id.id, io_id);
528 auto& iter = p.first;
529 bool inserted = p.second;
530 if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
531 iter->second.channels |= io_id.channels;
532 }
533 return false;
534 }
535
536 return true;
537 }
538
539 bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
540 {
541 auto iter = io_finish_ids.find(io_id.id);
542 if (iter == io_finish_ids.end()) {
543 return false;
544 }
545 int finish_mask = iter->second.channels;
546 bool found = (finish_mask & io_id.channels) != 0;
547
548 finish_mask &= ~(finish_mask & io_id.channels);
549
550 if (finish_mask == 0) {
551 io_finish_ids.erase(iter);
552 }
553 return found;
554 }
555
556
557 void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
558 RGWCompletionManager::io_completion& io, int *blocked_count)
559 {
560 ceph_assert(ceph_mutex_is_wlocked(lock));
561 RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
562 if (context_stacks.find(stack) == context_stacks.end()) {
563 return;
564 }
565 if (!stack->try_io_unblock(io.io_id)) {
566 return;
567 }
568 if (stack->is_io_blocked()) {
569 --(*blocked_count);
570 stack->set_io_blocked(false);
571 }
572 stack->set_interval_wait(false);
573 if (!stack->is_done()) {
574 if (!stack->is_scheduled) {
575 scheduled_stacks.push_back(stack);
576 stack->set_is_scheduled(true);
577 }
578 } else {
579 context_stacks.erase(stack);
580 stack->put();
581 }
582 }
583
584 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
585 {
586 std::unique_lock wl{lock};
587 _schedule(env, stack);
588 }
589
590 void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
591 {
592 ceph_assert(ceph_mutex_is_wlocked(lock));
593 if (!stack->is_scheduled) {
594 env->scheduled_stacks->push_back(stack);
595 stack->set_is_scheduled(true);
596 }
597 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
598 context_stacks.insert(stack);
599 }
600
601 void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
602 {
603 cr->set_sleeping(flag);
604 }
605
606 void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
607 {
608 cr->io_complete(io_id);
609 }
610
611 int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, list<RGWCoroutinesStack *>& stacks)
612 {
613 int ret = 0;
614 int blocked_count = 0;
615 int interval_wait_count = 0;
616 bool canceled = false; // set on going_down
617 RGWCoroutinesEnv env;
618 bool op_not_blocked;
619
620 uint64_t run_context = ++run_context_count;
621
622 lock.lock();
623 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
624 list<RGWCoroutinesStack *> scheduled_stacks;
625 for (auto& st : stacks) {
626 context_stacks.insert(st);
627 scheduled_stacks.push_back(st);
628 st->set_is_scheduled(true);
629 }
630 env.run_context = run_context;
631 env.manager = this;
632 env.scheduled_stacks = &scheduled_stacks;
633
634 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
635 RGWCompletionManager::io_completion io;
636 RGWCoroutinesStack *stack = *iter;
637 ++iter;
638 scheduled_stacks.pop_front();
639
640 if (context_stacks.find(stack) == context_stacks.end()) {
641 /* stack was probably schedule more than once due to IO, but was since complete */
642 goto next;
643 }
644 env.stack = stack;
645
646 lock.unlock();
647
648 ret = stack->operate(dpp, &env);
649
650 lock.lock();
651
652 stack->set_is_scheduled(false);
653 if (ret < 0) {
654 ldpp_dout(dpp, 20) << "stack->operate() returned ret=" << ret << dendl;
655 }
656
657 if (stack->is_error()) {
658 report_error(stack);
659 }
660
661 op_not_blocked = false;
662
663 if (stack->is_io_blocked()) {
664 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
665 if (stack->is_interval_waiting()) {
666 interval_wait_count++;
667 }
668 blocked_count++;
669 } else if (stack->is_blocked()) {
670 /* do nothing, we'll re-add the stack when the blocking stack is done,
671 * or when we're awaken
672 */
673 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
674 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
675 } else if (stack->is_done()) {
676 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
677 RGWCoroutinesStack *s;
678 while (stack->unblock_stack(&s)) {
679 if (!s->is_blocked_by_stack() && !s->is_done()) {
680 if (s->is_io_blocked()) {
681 if (stack->is_interval_waiting()) {
682 interval_wait_count++;
683 }
684 blocked_count++;
685 } else {
686 s->_schedule();
687 }
688 }
689 }
690 if (stack->parent && stack->parent->waiting_for_child()) {
691 stack->parent->set_wait_for_child(false);
692 stack->parent->_schedule();
693 }
694 context_stacks.erase(stack);
695 stack->put();
696 stack = NULL;
697 } else {
698 op_not_blocked = true;
699 stack->run_count++;
700 stack->_schedule();
701 }
702
703 if (!op_not_blocked && stack) {
704 stack->run_count = 0;
705 }
706
707 while (completion_mgr->try_get_next(&io)) {
708 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
709 }
710
711 /*
712 * only account blocked operations that are not in interval_wait, these are stacks that
713 * were put on a wait without any real IO operations. While we mark these as io_blocked,
714 * these aren't really waiting for IOs
715 */
716 while (blocked_count - interval_wait_count >= ops_window) {
717 lock.unlock();
718 ret = completion_mgr->get_next(&io);
719 lock.lock();
720 if (ret < 0) {
721 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
722 }
723 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
724 }
725
726 next:
727 while (scheduled_stacks.empty() && blocked_count > 0) {
728 lock.unlock();
729 ret = completion_mgr->get_next(&io);
730 lock.lock();
731 if (ret < 0) {
732 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
733 }
734 if (going_down) {
735 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
736 ret = -ECANCELED;
737 canceled = true;
738 break;
739 }
740 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
741 iter = scheduled_stacks.begin();
742 }
743 if (canceled) {
744 break;
745 }
746
747 if (iter == scheduled_stacks.end()) {
748 iter = scheduled_stacks.begin();
749 }
750 }
751
752 if (!context_stacks.empty() && !going_down) {
753 JSONFormatter formatter(true);
754 formatter.open_array_section("context_stacks");
755 for (auto& s : context_stacks) {
756 ::encode_json("entry", *s, &formatter);
757 }
758 formatter.close_section();
759 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
760 formatter.flush(*_dout);
761 *_dout << dendl;
762 ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
763 }
764
765 for (auto stack : context_stacks) {
766 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
767 stack->cancel();
768 }
769 run_contexts.erase(run_context);
770 lock.unlock();
771
772 return ret;
773 }
774
775 int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, RGWCoroutine *op)
776 {
777 if (!op) {
778 return 0;
779 }
780 list<RGWCoroutinesStack *> stacks;
781 RGWCoroutinesStack *stack = allocate_stack();
782 op->get();
783 stack->call(op);
784
785 stacks.push_back(stack);
786
787 int r = run(dpp, stacks);
788 if (r < 0) {
789 ldpp_dout(dpp, 20) << "run(stacks) returned r=" << r << dendl;
790 } else {
791 r = op->get_ret_status();
792 }
793 op->put();
794
795 return r;
796 }
797
798 RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
799 {
800 rgw_io_id io_id{get_next_io_id(), -1};
801 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
802 completion_mgr->register_completion_notifier(cn);
803 return cn;
804 }
805
806 void RGWCoroutinesManager::dump(Formatter *f) const {
807 std::shared_lock rl{lock};
808
809 f->open_array_section("run_contexts");
810 for (auto& i : run_contexts) {
811 f->open_object_section("context");
812 ::encode_json("id", i.first, f);
813 f->open_array_section("entries");
814 for (auto& s : i.second) {
815 ::encode_json("entry", *s, f);
816 }
817 f->close_section();
818 f->close_section();
819 }
820 f->close_section();
821 }
822
823 RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
824 return new RGWCoroutinesStack(cct, this);
825 }
826
827 string RGWCoroutinesManager::get_id()
828 {
829 if (!id.empty()) {
830 return id;
831 }
832 stringstream ss;
833 ss << (void *)this;
834 return ss.str();
835 }
836
837 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
838 {
839 std::unique_lock wl{lock};
840 if (managers.find(mgr) == managers.end()) {
841 managers.insert(mgr);
842 get();
843 }
844 }
845
846 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
847 {
848 std::unique_lock wl{lock};
849 if (managers.find(mgr) != managers.end()) {
850 managers.erase(mgr);
851 put();
852 }
853 }
854
855 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
856 {
857 AdminSocket *admin_socket = cct->get_admin_socket();
858 if (!admin_command.empty()) {
859 admin_socket->unregister_commands(this);
860 }
861 }
862
863 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
864 {
865 AdminSocket *admin_socket = cct->get_admin_socket();
866 if (!admin_command.empty()) {
867 admin_socket->unregister_commands(this);
868 }
869 admin_command = command;
870 int r = admin_socket->register_command(admin_command, this,
871 "dump current coroutines stack state");
872 if (r < 0) {
873 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
874 return r;
875 }
876 return 0;
877 }
878
879 int RGWCoroutinesManagerRegistry::call(std::string_view command,
880 const cmdmap_t& cmdmap,
881 Formatter *f,
882 std::ostream& ss,
883 bufferlist& out) {
884 std::shared_lock rl{lock};
885 ::encode_json("cr_managers", *this, f);
886 return 0;
887 }
888
889 void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
890 f->open_array_section("coroutine_managers");
891 for (auto m : managers) {
892 ::encode_json("entry", *m, f);
893 }
894 f->close_section();
895 }
896
897 void RGWCoroutine::call(RGWCoroutine *op)
898 {
899 if (op) {
900 stack->call(op);
901 } else {
902 // the call()er expects this to set a retcode
903 retcode = 0;
904 }
905 }
906
907 RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
908 {
909 return stack->spawn(this, op, wait);
910 }
911
912 RGWCoroutinesStack *RGWCoroutine::prealloc_stack()
913 {
914 return stack->prealloc_stack();
915 }
916
917 uint64_t RGWCoroutine::prealloc_stack_id()
918 {
919 return prealloc_stack()->get_id();
920 }
921
922 bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
923 {
924 return stack->collect(this, ret, skip_stack, stack_id);
925 }
926
927 bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
928 {
929 return stack->collect_next(this, ret, collected_stack);
930 }
931
932 int RGWCoroutine::wait(const utime_t& interval)
933 {
934 return stack->wait(interval);
935 }
936
937 void RGWCoroutine::wait_for_child()
938 {
939 /* should only wait for child if there is a child that is not done yet, and no complete children */
940 if (spawned.entries.empty()) {
941 return;
942 }
943 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
944 if ((*iter)->is_done()) {
945 return;
946 }
947 }
948 stack->set_wait_for_child(true);
949 }
950
951 string RGWCoroutine::to_str() const
952 {
953 return typeid(*this).name();
954 }
955
956 ostream& operator<<(ostream& out, const RGWCoroutine& cr)
957 {
958 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
959 return out;
960 }
961
962 bool RGWCoroutine::drain_children(int num_cr_left,
963 RGWCoroutinesStack *skip_stack,
964 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
965 {
966 bool done = false;
967 ceph_assert(num_cr_left >= 0);
968 if (num_cr_left == 0 && skip_stack) {
969 num_cr_left = 1;
970 }
971 reenter(&drain_status.cr) {
972 while (num_spawned() > (size_t)num_cr_left) {
973 yield wait_for_child();
974 int ret;
975 uint64_t stack_id;
976 bool again = false;
977 do {
978 again = collect(&ret, skip_stack, &stack_id);
979 if (ret < 0) {
980 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
981 /* we should have reported this error */
982 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
983 }
984 if (cb) {
985 (*cb)(stack_id, ret);
986 }
987 } while (again);
988 }
989 done = true;
990 }
991 return done;
992 }
993
994 bool RGWCoroutine::drain_children(int num_cr_left,
995 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
996 {
997 bool done = false;
998 ceph_assert(num_cr_left >= 0);
999
1000 reenter(&drain_status.cr) {
1001 while (num_spawned() > (size_t)num_cr_left) {
1002 yield wait_for_child();
1003 int ret;
1004 uint64_t stack_id;
1005 bool again = false;
1006 do {
1007 again = collect(&ret, nullptr, &stack_id);
1008 if (ret < 0) {
1009 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
1010 /* we should have reported this error */
1011 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
1012 }
1013 if (cb && !drain_status.should_exit) {
1014 int r = (*cb)(stack_id, ret);
1015 if (r < 0) {
1016 drain_status.ret = r;
1017 drain_status.should_exit = true;
1018 num_cr_left = 0; /* need to drain all */
1019 }
1020 }
1021 } while (again);
1022 }
1023 done = true;
1024 }
1025 return done;
1026 }
1027
1028 void RGWCoroutine::wakeup()
1029 {
1030 stack->wakeup();
1031 }
1032
1033 RGWCoroutinesEnv *RGWCoroutine::get_env() const
1034 {
1035 return stack->get_env();
1036 }
1037
1038 void RGWCoroutine::dump(Formatter *f) const {
1039 if (!description.str().empty()) {
1040 encode_json("description", description.str(), f);
1041 }
1042 encode_json("type", to_str(), f);
1043 if (!spawned.entries.empty()) {
1044 f->open_array_section("spawned");
1045 for (auto& i : spawned.entries) {
1046 char buf[32];
1047 snprintf(buf, sizeof(buf), "%p", (void *)i);
1048 encode_json("stack", string(buf), f);
1049 }
1050 f->close_section();
1051 }
1052 if (!status.history.empty()) {
1053 encode_json("history", status.history, f);
1054 }
1055
1056 if (!status.status.str().empty()) {
1057 f->open_object_section("status");
1058 encode_json("status", status.status.str(), f);
1059 encode_json("timestamp", status.timestamp, f);
1060 f->close_section();
1061 }
1062 }
1063
1064 RGWSimpleCoroutine::~RGWSimpleCoroutine()
1065 {
1066 if (!called_cleanup) {
1067 request_cleanup();
1068 }
1069 }
1070
1071 void RGWSimpleCoroutine::call_cleanup()
1072 {
1073 called_cleanup = true;
1074 request_cleanup();
1075 }
1076
1077 int RGWSimpleCoroutine::operate(const DoutPrefixProvider *dpp)
1078 {
1079 int ret = 0;
1080 reenter(this) {
1081 yield return state_init();
1082 yield return state_send_request(dpp);
1083 yield return state_request_complete();
1084 yield return state_all_complete();
1085 drain_all();
1086 call_cleanup();
1087 return set_state(RGWCoroutine_Done, ret);
1088 }
1089 return 0;
1090 }
1091
1092 int RGWSimpleCoroutine::state_init()
1093 {
1094 int ret = init();
1095 if (ret < 0) {
1096 call_cleanup();
1097 return set_state(RGWCoroutine_Error, ret);
1098 }
1099 return 0;
1100 }
1101
1102 int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider *dpp)
1103 {
1104 int ret = send_request(dpp);
1105 if (ret < 0) {
1106 call_cleanup();
1107 return set_state(RGWCoroutine_Error, ret);
1108 }
1109 return io_block(0);
1110 }
1111
1112 int RGWSimpleCoroutine::state_request_complete()
1113 {
1114 int ret = request_complete();
1115 if (ret < 0) {
1116 call_cleanup();
1117 return set_state(RGWCoroutine_Error, ret);
1118 }
1119 return 0;
1120 }
1121
1122 int RGWSimpleCoroutine::state_all_complete()
1123 {
1124 int ret = finish();
1125 if (ret < 0) {
1126 call_cleanup();
1127 return set_state(RGWCoroutine_Error, ret);
1128 }
1129 return 0;
1130 }
1131
1132