]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_coroutine.cc
import ceph quincy 17.2.1
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
7c673cae 3
9f95a23c 4#include "include/Context.h"
7c673cae 5#include "common/ceph_json.h"
7c673cae 6#include "rgw_coroutine.h"
7c673cae
FG
7
8// re-include our assert to clobber the system one; fix dout:
11fdf7f2 9#include "include/ceph_assert.h"
7c673cae 10
31f18b77
FG
11#include <boost/asio/yield.hpp>
12
7c673cae 13#define dout_subsys ceph_subsys_rgw
11fdf7f2 14#define dout_context g_ceph_context
7c673cae 15
20effc67 16using namespace std;
7c673cae
FG
17
18class RGWCompletionManager::WaitContext : public Context {
19 RGWCompletionManager *manager;
20 void *opaque;
21public:
22 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
23 void finish(int r) override {
24 manager->_wakeup(opaque);
25 }
26};
27
9f95a23c 28RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct),
7c673cae
FG
29 timer(cct, lock)
30{
31 timer.init();
32}
33
34RGWCompletionManager::~RGWCompletionManager()
35{
9f95a23c 36 std::lock_guard l{lock};
7c673cae
FG
37 timer.cancel_all_events();
38 timer.shutdown();
39}
40
11fdf7f2 41void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
7c673cae 42{
9f95a23c 43 std::lock_guard l{lock};
11fdf7f2 44 _complete(cn, io_id, user_info);
7c673cae
FG
45}
46
47void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
48{
9f95a23c 49 std::lock_guard l{lock};
7c673cae
FG
50 if (cn) {
51 cns.insert(cn);
52 }
53}
54
55void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
56{
9f95a23c 57 std::lock_guard l{lock};
7c673cae
FG
58 if (cn) {
59 cns.erase(cn);
60 }
61}
62
11fdf7f2 63void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
7c673cae
FG
64{
65 if (cn) {
66 cns.erase(cn);
67 }
11fdf7f2
TL
68
69 if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
70 /* already have completion for this io_id, don't allow multiple completions for it */
71 return;
72 }
73 complete_reqs.push_back(io_completion{io_id, user_info});
9f95a23c 74 cond.notify_all();
7c673cae
FG
75}
76
11fdf7f2 77int RGWCompletionManager::get_next(io_completion *io)
7c673cae 78{
9f95a23c 79 std::unique_lock l{lock};
7c673cae 80 while (complete_reqs.empty()) {
7c673cae
FG
81 if (going_down) {
82 return -ECANCELED;
83 }
9f95a23c 84 cond.wait(l);
7c673cae 85 }
11fdf7f2
TL
86 *io = complete_reqs.front();
87 complete_reqs_set.erase(io->io_id);
7c673cae
FG
88 complete_reqs.pop_front();
89 return 0;
90}
91
11fdf7f2 92bool RGWCompletionManager::try_get_next(io_completion *io)
7c673cae 93{
9f95a23c 94 std::lock_guard l{lock};
7c673cae
FG
95 if (complete_reqs.empty()) {
96 return false;
97 }
11fdf7f2
TL
98 *io = complete_reqs.front();
99 complete_reqs_set.erase(io->io_id);
7c673cae
FG
100 complete_reqs.pop_front();
101 return true;
102}
103
104void RGWCompletionManager::go_down()
105{
9f95a23c 106 std::lock_guard l{lock};
7c673cae
FG
107 for (auto cn : cns) {
108 cn->unregister();
109 }
110 going_down = true;
9f95a23c 111 cond.notify_all();
7c673cae
FG
112}
113
114void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
115{
9f95a23c 116 std::lock_guard l{lock};
11fdf7f2 117 ceph_assert(waiters.find(opaque) == waiters.end());
7c673cae
FG
118 waiters[opaque] = user_info;
119 timer.add_event_after(interval, new WaitContext(this, opaque));
120}
121
122void RGWCompletionManager::wakeup(void *opaque)
123{
9f95a23c 124 std::lock_guard l{lock};
7c673cae
FG
125 _wakeup(opaque);
126}
127
128void RGWCompletionManager::_wakeup(void *opaque)
129{
130 map<void *, void *>::iterator iter = waiters.find(opaque);
131 if (iter != waiters.end()) {
132 void *user_id = iter->second;
133 waiters.erase(iter);
11fdf7f2 134 _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
7c673cae
FG
135 }
136}
137
138RGWCoroutine::~RGWCoroutine() {
139 for (auto stack : spawned.entries) {
140 stack->put();
141 }
142}
143
11fdf7f2
TL
144void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
145{
33c7a0ef 146 ceph_assert(stack); // if there's no stack, io_provider won't be uninitialized
11fdf7f2
TL
147 stack->init_new_io(io_provider);
148}
149
7c673cae 150void RGWCoroutine::set_io_blocked(bool flag) {
33c7a0ef
TL
151 if (stack) {
152 stack->set_io_blocked(flag);
153 }
7c673cae
FG
154}
155
156void RGWCoroutine::set_sleeping(bool flag) {
33c7a0ef
TL
157 if (stack) {
158 stack->set_sleeping(flag);
159 }
7c673cae
FG
160}
161
11fdf7f2
TL
162int RGWCoroutine::io_block(int ret, int64_t io_id) {
163 return io_block(ret, rgw_io_id{io_id, -1});
164}
165
166int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
33c7a0ef
TL
167 if (!stack) {
168 return 0;
169 }
11fdf7f2
TL
170 if (stack->consume_io_finish(io_id)) {
171 return 0;
172 }
7c673cae 173 set_io_blocked(true);
11fdf7f2 174 stack->set_io_blocked_id(io_id);
7c673cae
FG
175 return ret;
176}
177
11fdf7f2 178void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
33c7a0ef
TL
179 if (stack) {
180 stack->io_complete(io_id);
181 }
11fdf7f2
TL
182}
183
7c673cae
FG
184void RGWCoroutine::StatusItem::dump(Formatter *f) const {
185 ::encode_json("timestamp", timestamp, f);
186 ::encode_json("status", status, f);
187}
188
189stringstream& RGWCoroutine::Status::set_status()
190{
9f95a23c 191 std::unique_lock l{lock};
7c673cae
FG
192 string s = status.str();
193 status.str(string());
194 if (!timestamp.is_zero()) {
195 history.push_back(StatusItem(timestamp, s));
196 }
197 if (history.size() > (size_t)max_history) {
198 history.pop_front();
199 }
200 timestamp = ceph_clock_now();
201
202 return status;
203}
204
20effc67
TL
205RGWCoroutinesManager::~RGWCoroutinesManager() {
206 stop();
207 completion_mgr->put();
208 if (cr_registry) {
209 cr_registry->remove(this);
210 }
211}
212
11fdf7f2
TL
213int64_t RGWCoroutinesManager::get_next_io_id()
214{
215 return (int64_t)++max_io_id;
216}
217
f67539c2
TL
218uint64_t RGWCoroutinesManager::get_next_stack_id() {
219 return (uint64_t)++max_stack_id;
220}
221
7c673cae
FG
222RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
223 done_flag(false), error_flag(false), blocked_flag(false),
224 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
225 retcode(0), run_count(0),
226 env(NULL), parent(NULL)
227{
f67539c2 228 id = ops_mgr->get_next_stack_id();
7c673cae
FG
229 if (start) {
230 ops.push_back(start);
231 }
232 pos = ops.begin();
233}
234
235RGWCoroutinesStack::~RGWCoroutinesStack()
236{
237 for (auto op : ops) {
238 op->put();
239 }
240
241 for (auto stack : spawned.entries) {
242 stack->put();
243 }
f67539c2
TL
244
245 if (preallocated_stack) {
246 preallocated_stack->put();
247 }
7c673cae
FG
248}
249
b3b6e05e 250int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env)
7c673cae
FG
251{
252 env = _env;
253 RGWCoroutine *op = *pos;
254 op->stack = this;
b3b6e05e
TL
255 ldpp_dout(dpp, 20) << *op << ": operate()" << dendl;
256 int r = op->operate_wrapper(dpp);
7c673cae 257 if (r < 0) {
b3b6e05e 258 ldpp_dout(dpp, 20) << *op << ": operate() returned r=" << r << dendl;
7c673cae
FG
259 }
260
261 error_flag = op->is_error();
262
263 if (op->is_done()) {
264 int op_retcode = r;
265 r = unwind(op_retcode);
266 op->put();
267 done_flag = (pos == ops.end());
11fdf7f2 268 blocked_flag &= !done_flag;
7c673cae
FG
269 if (done_flag) {
270 retcode = op_retcode;
271 }
272 return r;
273 }
274
275 /* should r ever be negative at this point? */
11fdf7f2 276 ceph_assert(r >= 0);
7c673cae
FG
277
278 return 0;
279}
280
281string RGWCoroutinesStack::error_str()
282{
283 if (pos != ops.end()) {
284 return (*pos)->error_str();
285 }
286 return string();
287}
288
289void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
290 if (!next_op) {
291 return;
292 }
293 ops.push_back(next_op);
294 if (pos != ops.end()) {
295 ++pos;
296 } else {
297 pos = ops.begin();
298 }
299}
300
11fdf7f2
TL
301void RGWCoroutinesStack::schedule()
302{
303 env->manager->schedule(env, this);
304}
305
306void RGWCoroutinesStack::_schedule()
307{
308 env->manager->_schedule(env, this);
309}
310
7c673cae
FG
311RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
312{
313 if (!op) {
314 return NULL;
315 }
316
317 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
318
f67539c2
TL
319 RGWCoroutinesStack *stack = preallocated_stack;
320 if (!stack) {
321 stack = env->manager->allocate_stack();
322 }
323 preallocated_stack = nullptr;
324
7c673cae
FG
325 s->add_pending(stack);
326 stack->parent = this;
327
328 stack->get(); /* we'll need to collect the stack */
329 stack->call(op);
330
331 env->manager->schedule(env, stack);
332
333 if (wait) {
334 set_blocked_by(stack);
335 }
336
337 return stack;
338}
339
340RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
341{
342 return spawn(NULL, op, wait);
343}
344
f67539c2
TL
345RGWCoroutinesStack *RGWCoroutinesStack::prealloc_stack()
346{
347 if (!preallocated_stack) {
348 preallocated_stack = env->manager->allocate_stack();
349 }
350 return preallocated_stack;
351}
352
7c673cae
FG
353int RGWCoroutinesStack::wait(const utime_t& interval)
354{
355 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
356 completion_mgr->wait_interval((void *)this, interval, (void *)this);
357 set_io_blocked(true);
358 set_interval_wait(true);
359 return 0;
360}
361
362void RGWCoroutinesStack::wakeup()
363{
364 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
365 completion_mgr->wakeup((void *)this);
366}
367
11fdf7f2
TL
368void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
369{
370 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
371 completion_mgr->complete(nullptr, io_id, (void *)this);
372}
373
7c673cae
FG
374int RGWCoroutinesStack::unwind(int retcode)
375{
376 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
377
378 if (pos == ops.begin()) {
11fdf7f2 379 ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
7c673cae
FG
380 spawned.inherit(src_spawned);
381 ops.clear();
382 pos = ops.end();
383 return retcode;
384 }
385
386 --pos;
387 ops.pop_back();
388 RGWCoroutine *op = *pos;
389 op->set_retcode(retcode);
390 op->spawned.inherit(src_spawned);
391 return 0;
392}
393
11fdf7f2
TL
394void RGWCoroutinesStack::cancel()
395{
396 while (!ops.empty()) {
397 RGWCoroutine *op = *pos;
398 unwind(-ECANCELED);
399 op->put();
400 }
401 put();
402}
7c673cae 403
f67539c2 404bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
7c673cae 405{
11fdf7f2 406 bool need_retry = false;
7c673cae
FG
407 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
408 *ret = 0;
409 vector<RGWCoroutinesStack *> new_list;
410
411 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
412 RGWCoroutinesStack *stack = *iter;
413 if (stack == skip_stack || !stack->is_done()) {
414 new_list.push_back(stack);
415 if (!stack->is_done()) {
416 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
417 } else if (stack == skip_stack) {
418 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
419 }
420 continue;
421 }
f67539c2
TL
422 if (stack_id) {
423 *stack_id = stack->get_id();
424 }
7c673cae
FG
425 int r = stack->get_ret_status();
426 stack->put();
427 if (r < 0) {
428 *ret = r;
429 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
430 new_list.insert(new_list.end(), ++iter, s->entries.end());
11fdf7f2 431 need_retry = (iter != s->entries.end());
7c673cae
FG
432 break;
433 }
434
435 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
436 }
437
438 s->entries.swap(new_list);
11fdf7f2 439 return need_retry;
7c673cae
FG
440}
441
442bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
443{
444 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
445 *ret = 0;
446
447 if (collected_stack) {
448 *collected_stack = NULL;
449 }
450
451 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
452 RGWCoroutinesStack *stack = *iter;
453 if (!stack->is_done()) {
454 continue;
455 }
456 int r = stack->get_ret_status();
457 if (r < 0) {
458 *ret = r;
459 }
460
461 if (collected_stack) {
462 *collected_stack = stack;
463 }
464 stack->put();
465
466 s->entries.erase(iter);
467 return true;
468 }
469
470 return false;
471}
472
f67539c2 473bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
7c673cae 474{
f67539c2 475 return collect(NULL, ret, skip_stack, stack_id);
7c673cae
FG
476}
477
478static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
479{
11fdf7f2 480 (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
7c673cae
FG
481}
482
11fdf7f2
TL
483RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
484 io_id(_io_id),
9f95a23c
TL
485 user_data(_user_data), registered(true) {
486 c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb);
7c673cae
FG
487}
488
489RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
490{
491 return ops_mgr->create_completion_notifier(this);
492}
493
494RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
495{
496 return ops_mgr->get_completion_mgr();
497}
498
499bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
500{
501 if (blocking_stacks.empty()) {
502 return false;
503 }
504
505 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
506 *s = *iter;
507 blocking_stacks.erase(iter);
508 (*s)->blocked_by_stack.erase(this);
509
510 return true;
511}
512
513void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
514{
515 if (!op) {
516 return;
517 }
518 string err = op->error_str();
519 if (err.empty()) {
520 return;
521 }
522 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
523}
524
525void RGWCoroutinesStack::dump(Formatter *f) const {
526 stringstream ss;
527 ss << (void *)this;
528 ::encode_json("stack", ss.str(), f);
529 ::encode_json("run_count", run_count, f);
530 f->open_array_section("ops");
531 for (auto& i : ops) {
532 encode_json("op", *i, f);
533 }
534 f->close_section();
535}
536
11fdf7f2 537void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
7c673cae 538{
11fdf7f2
TL
539 io_provider->set_io_user_info((void *)this);
540 io_provider->assign_io(env->manager->get_io_id_provider());
541}
542
543bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
544{
545 if (!can_io_unblock(io_id)) {
546 auto p = io_finish_ids.emplace(io_id.id, io_id);
547 auto& iter = p.first;
548 bool inserted = p.second;
549 if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
550 iter->second.channels |= io_id.channels;
551 }
552 return false;
553 }
554
555 return true;
556}
557
558bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
559{
560 auto iter = io_finish_ids.find(io_id.id);
561 if (iter == io_finish_ids.end()) {
562 return false;
563 }
564 int finish_mask = iter->second.channels;
565 bool found = (finish_mask & io_id.channels) != 0;
566
567 finish_mask &= ~(finish_mask & io_id.channels);
568
569 if (finish_mask == 0) {
570 io_finish_ids.erase(iter);
571 }
572 return found;
573}
574
575
576void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
577 RGWCompletionManager::io_completion& io, int *blocked_count)
578{
9f95a23c 579 ceph_assert(ceph_mutex_is_wlocked(lock));
11fdf7f2
TL
580 RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
581 if (context_stacks.find(stack) == context_stacks.end()) {
582 return;
583 }
584 if (!stack->try_io_unblock(io.io_id)) {
585 return;
586 }
587 if (stack->is_io_blocked()) {
588 --(*blocked_count);
589 stack->set_io_blocked(false);
590 }
7c673cae
FG
591 stack->set_interval_wait(false);
592 if (!stack->is_done()) {
11fdf7f2
TL
593 if (!stack->is_scheduled) {
594 scheduled_stacks.push_back(stack);
595 stack->set_is_scheduled(true);
596 }
7c673cae 597 } else {
7c673cae
FG
598 context_stacks.erase(stack);
599 stack->put();
600 }
601}
602
603void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
604{
9f95a23c 605 std::unique_lock wl{lock};
11fdf7f2
TL
606 _schedule(env, stack);
607}
608
609void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
610{
9f95a23c 611 ceph_assert(ceph_mutex_is_wlocked(lock));
11fdf7f2
TL
612 if (!stack->is_scheduled) {
613 env->scheduled_stacks->push_back(stack);
614 stack->set_is_scheduled(true);
615 }
7c673cae
FG
616 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
617 context_stacks.insert(stack);
618}
619
11fdf7f2
TL
620void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
621{
622 cr->set_sleeping(flag);
623}
624
625void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
626{
627 cr->io_complete(io_id);
628}
629
b3b6e05e 630int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, list<RGWCoroutinesStack *>& stacks)
7c673cae
FG
631{
632 int ret = 0;
633 int blocked_count = 0;
634 int interval_wait_count = 0;
635 bool canceled = false; // set on going_down
636 RGWCoroutinesEnv env;
11fdf7f2 637 bool op_not_blocked;
7c673cae
FG
638
639 uint64_t run_context = ++run_context_count;
640
9f95a23c 641 lock.lock();
7c673cae
FG
642 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
643 list<RGWCoroutinesStack *> scheduled_stacks;
644 for (auto& st : stacks) {
645 context_stacks.insert(st);
646 scheduled_stacks.push_back(st);
11fdf7f2 647 st->set_is_scheduled(true);
7c673cae 648 }
7c673cae
FG
649 env.run_context = run_context;
650 env.manager = this;
651 env.scheduled_stacks = &scheduled_stacks;
652
653 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
11fdf7f2 654 RGWCompletionManager::io_completion io;
7c673cae 655 RGWCoroutinesStack *stack = *iter;
11fdf7f2
TL
656 ++iter;
657 scheduled_stacks.pop_front();
658
659 if (context_stacks.find(stack) == context_stacks.end()) {
660 /* stack was probably schedule more than once due to IO, but was since complete */
661 goto next;
662 }
7c673cae
FG
663 env.stack = stack;
664
11fdf7f2
TL
665 lock.unlock();
666
b3b6e05e 667 ret = stack->operate(dpp, &env);
11fdf7f2 668
9f95a23c 669 lock.lock();
11fdf7f2 670
7c673cae
FG
671 stack->set_is_scheduled(false);
672 if (ret < 0) {
b3b6e05e 673 ldpp_dout(dpp, 20) << "stack->operate() returned ret=" << ret << dendl;
7c673cae
FG
674 }
675
676 if (stack->is_error()) {
677 report_error(stack);
678 }
679
11fdf7f2 680 op_not_blocked = false;
7c673cae
FG
681
682 if (stack->is_io_blocked()) {
683 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
684 if (stack->is_interval_waiting()) {
685 interval_wait_count++;
686 }
687 blocked_count++;
688 } else if (stack->is_blocked()) {
689 /* do nothing, we'll re-add the stack when the blocking stack is done,
690 * or when we're awaken
691 */
692 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
693 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
694 } else if (stack->is_done()) {
695 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
696 RGWCoroutinesStack *s;
697 while (stack->unblock_stack(&s)) {
698 if (!s->is_blocked_by_stack() && !s->is_done()) {
699 if (s->is_io_blocked()) {
700 if (stack->is_interval_waiting()) {
701 interval_wait_count++;
702 }
703 blocked_count++;
704 } else {
11fdf7f2 705 s->_schedule();
7c673cae
FG
706 }
707 }
708 }
709 if (stack->parent && stack->parent->waiting_for_child()) {
710 stack->parent->set_wait_for_child(false);
11fdf7f2 711 stack->parent->_schedule();
7c673cae
FG
712 }
713 context_stacks.erase(stack);
714 stack->put();
715 stack = NULL;
716 } else {
717 op_not_blocked = true;
718 stack->run_count++;
11fdf7f2 719 stack->_schedule();
7c673cae
FG
720 }
721
722 if (!op_not_blocked && stack) {
723 stack->run_count = 0;
724 }
725
11fdf7f2
TL
726 while (completion_mgr->try_get_next(&io)) {
727 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
728 }
729
730 /*
731 * only account blocked operations that are not in interval_wait, these are stacks that
732 * were put on a wait without any real IO operations. While we mark these as io_blocked,
733 * these aren't really waiting for IOs
734 */
735 while (blocked_count - interval_wait_count >= ops_window) {
11fdf7f2
TL
736 lock.unlock();
737 ret = completion_mgr->get_next(&io);
9f95a23c 738 lock.lock();
7c673cae 739 if (ret < 0) {
28e407b8 740 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae 741 }
11fdf7f2 742 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
743 }
744
11fdf7f2 745next:
7c673cae 746 while (scheduled_stacks.empty() && blocked_count > 0) {
11fdf7f2
TL
747 lock.unlock();
748 ret = completion_mgr->get_next(&io);
9f95a23c 749 lock.lock();
7c673cae 750 if (ret < 0) {
28e407b8 751 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae
FG
752 }
753 if (going_down) {
754 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
755 ret = -ECANCELED;
756 canceled = true;
757 break;
758 }
11fdf7f2 759 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
760 iter = scheduled_stacks.begin();
761 }
762 if (canceled) {
763 break;
764 }
765
766 if (iter == scheduled_stacks.end()) {
767 iter = scheduled_stacks.begin();
768 }
769 }
770
7c673cae
FG
771 if (!context_stacks.empty() && !going_down) {
772 JSONFormatter formatter(true);
773 formatter.open_array_section("context_stacks");
774 for (auto& s : context_stacks) {
775 ::encode_json("entry", *s, &formatter);
776 }
777 formatter.close_section();
778 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
779 formatter.flush(*_dout);
780 *_dout << dendl;
11fdf7f2 781 ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
7c673cae
FG
782 }
783
784 for (auto stack : context_stacks) {
785 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
11fdf7f2 786 stack->cancel();
7c673cae
FG
787 }
788 run_contexts.erase(run_context);
789 lock.unlock();
790
791 return ret;
792}
793
b3b6e05e 794int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, RGWCoroutine *op)
7c673cae
FG
795{
796 if (!op) {
797 return 0;
798 }
799 list<RGWCoroutinesStack *> stacks;
800 RGWCoroutinesStack *stack = allocate_stack();
801 op->get();
802 stack->call(op);
803
11fdf7f2 804 stacks.push_back(stack);
7c673cae 805
b3b6e05e 806 int r = run(dpp, stacks);
7c673cae 807 if (r < 0) {
b3b6e05e 808 ldpp_dout(dpp, 20) << "run(stacks) returned r=" << r << dendl;
7c673cae
FG
809 } else {
810 r = op->get_ret_status();
811 }
812 op->put();
813
814 return r;
815}
816
817RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
818{
11fdf7f2
TL
819 rgw_io_id io_id{get_next_io_id(), -1};
820 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
7c673cae
FG
821 completion_mgr->register_completion_notifier(cn);
822 return cn;
823}
824
825void RGWCoroutinesManager::dump(Formatter *f) const {
9f95a23c 826 std::shared_lock rl{lock};
7c673cae
FG
827
828 f->open_array_section("run_contexts");
829 for (auto& i : run_contexts) {
830 f->open_object_section("context");
831 ::encode_json("id", i.first, f);
832 f->open_array_section("entries");
833 for (auto& s : i.second) {
834 ::encode_json("entry", *s, f);
835 }
836 f->close_section();
837 f->close_section();
838 }
839 f->close_section();
840}
841
842RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
843 return new RGWCoroutinesStack(cct, this);
844}
845
846string RGWCoroutinesManager::get_id()
847{
848 if (!id.empty()) {
849 return id;
850 }
851 stringstream ss;
852 ss << (void *)this;
853 return ss.str();
854}
855
856void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
857{
9f95a23c 858 std::unique_lock wl{lock};
7c673cae
FG
859 if (managers.find(mgr) == managers.end()) {
860 managers.insert(mgr);
861 get();
862 }
863}
864
865void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
866{
9f95a23c 867 std::unique_lock wl{lock};
7c673cae
FG
868 if (managers.find(mgr) != managers.end()) {
869 managers.erase(mgr);
870 put();
871 }
872}
873
874RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
875{
876 AdminSocket *admin_socket = cct->get_admin_socket();
877 if (!admin_command.empty()) {
9f95a23c 878 admin_socket->unregister_commands(this);
7c673cae
FG
879 }
880}
881
882int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
883{
884 AdminSocket *admin_socket = cct->get_admin_socket();
885 if (!admin_command.empty()) {
9f95a23c 886 admin_socket->unregister_commands(this);
7c673cae
FG
887 }
888 admin_command = command;
9f95a23c 889 int r = admin_socket->register_command(admin_command, this,
7c673cae
FG
890 "dump current coroutines stack state");
891 if (r < 0) {
892 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
893 return r;
894 }
895 return 0;
896}
897
9f95a23c
TL
898int RGWCoroutinesManagerRegistry::call(std::string_view command,
899 const cmdmap_t& cmdmap,
900 Formatter *f,
901 std::ostream& ss,
902 bufferlist& out) {
903 std::shared_lock rl{lock};
904 ::encode_json("cr_managers", *this, f);
905 return 0;
7c673cae
FG
906}
907
908void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
909 f->open_array_section("coroutine_managers");
910 for (auto m : managers) {
911 ::encode_json("entry", *m, f);
912 }
913 f->close_section();
914}
915
916void RGWCoroutine::call(RGWCoroutine *op)
917{
eafe8130
TL
918 if (op) {
919 stack->call(op);
920 } else {
921 // the call()er expects this to set a retcode
922 retcode = 0;
923 }
7c673cae
FG
924}
925
926RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
927{
928 return stack->spawn(this, op, wait);
929}
930
f67539c2
TL
931RGWCoroutinesStack *RGWCoroutine::prealloc_stack()
932{
933 return stack->prealloc_stack();
934}
935
936uint64_t RGWCoroutine::prealloc_stack_id()
7c673cae 937{
f67539c2
TL
938 return prealloc_stack()->get_id();
939}
940
941bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */
942{
943 return stack->collect(this, ret, skip_stack, stack_id);
7c673cae
FG
944}
945
946bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
947{
948 return stack->collect_next(this, ret, collected_stack);
949}
950
951int RGWCoroutine::wait(const utime_t& interval)
952{
953 return stack->wait(interval);
954}
955
956void RGWCoroutine::wait_for_child()
957{
958 /* should only wait for child if there is a child that is not done yet, and no complete children */
959 if (spawned.entries.empty()) {
960 return;
961 }
962 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
963 if ((*iter)->is_done()) {
964 return;
965 }
966 }
967 stack->set_wait_for_child(true);
968}
969
970string RGWCoroutine::to_str() const
971{
972 return typeid(*this).name();
973}
974
975ostream& operator<<(ostream& out, const RGWCoroutine& cr)
976{
977 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
978 return out;
979}
980
f67539c2
TL
981bool RGWCoroutine::drain_children(int num_cr_left,
982 RGWCoroutinesStack *skip_stack,
983 std::optional<std::function<void(uint64_t stack_id, int ret)> > cb)
7c673cae
FG
984{
985 bool done = false;
11fdf7f2 986 ceph_assert(num_cr_left >= 0);
7c673cae
FG
987 if (num_cr_left == 0 && skip_stack) {
988 num_cr_left = 1;
989 }
f67539c2 990 reenter(&drain_status.cr) {
7c673cae
FG
991 while (num_spawned() > (size_t)num_cr_left) {
992 yield wait_for_child();
993 int ret;
f67539c2
TL
994 uint64_t stack_id;
995 bool again = false;
996 do {
997 again = collect(&ret, skip_stack, &stack_id);
998 if (ret < 0) {
999 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
1000 /* we should have reported this error */
1001 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
1002 }
1003 if (cb) {
1004 (*cb)(stack_id, ret);
1005 }
1006 } while (again);
1007 }
1008 done = true;
1009 }
1010 return done;
1011}
1012
1013bool RGWCoroutine::drain_children(int num_cr_left,
1014 std::optional<std::function<int(uint64_t stack_id, int ret)> > cb)
1015{
1016 bool done = false;
1017 ceph_assert(num_cr_left >= 0);
1018
1019 reenter(&drain_status.cr) {
1020 while (num_spawned() > (size_t)num_cr_left) {
1021 yield wait_for_child();
1022 int ret;
1023 uint64_t stack_id;
1024 bool again = false;
1025 do {
1026 again = collect(&ret, nullptr, &stack_id);
7c673cae
FG
1027 if (ret < 0) {
1028 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
1029 /* we should have reported this error */
1030 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
1031 }
f67539c2
TL
1032 if (cb && !drain_status.should_exit) {
1033 int r = (*cb)(stack_id, ret);
1034 if (r < 0) {
1035 drain_status.ret = r;
1036 drain_status.should_exit = true;
1037 num_cr_left = 0; /* need to drain all */
1038 }
1039 }
1040 } while (again);
7c673cae
FG
1041 }
1042 done = true;
1043 }
1044 return done;
1045}
1046
1047void RGWCoroutine::wakeup()
1048{
1049 stack->wakeup();
1050}
1051
11fdf7f2
TL
1052RGWCoroutinesEnv *RGWCoroutine::get_env() const
1053{
1054 return stack->get_env();
1055}
1056
7c673cae
FG
1057void RGWCoroutine::dump(Formatter *f) const {
1058 if (!description.str().empty()) {
1059 encode_json("description", description.str(), f);
1060 }
1061 encode_json("type", to_str(), f);
1062 if (!spawned.entries.empty()) {
1063 f->open_array_section("spawned");
1064 for (auto& i : spawned.entries) {
1065 char buf[32];
1066 snprintf(buf, sizeof(buf), "%p", (void *)i);
1067 encode_json("stack", string(buf), f);
1068 }
1069 f->close_section();
1070 }
1071 if (!status.history.empty()) {
1072 encode_json("history", status.history, f);
1073 }
1074
1075 if (!status.status.str().empty()) {
1076 f->open_object_section("status");
1077 encode_json("status", status.status.str(), f);
1078 encode_json("timestamp", status.timestamp, f);
1079 f->close_section();
1080 }
1081}
1082
1083RGWSimpleCoroutine::~RGWSimpleCoroutine()
1084{
1085 if (!called_cleanup) {
1086 request_cleanup();
1087 }
1088}
1089
1090void RGWSimpleCoroutine::call_cleanup()
1091{
1092 called_cleanup = true;
1093 request_cleanup();
1094}
1095
b3b6e05e 1096int RGWSimpleCoroutine::operate(const DoutPrefixProvider *dpp)
7c673cae
FG
1097{
1098 int ret = 0;
1099 reenter(this) {
1100 yield return state_init();
b3b6e05e 1101 yield return state_send_request(dpp);
7c673cae
FG
1102 yield return state_request_complete();
1103 yield return state_all_complete();
1104 drain_all();
1105 call_cleanup();
1106 return set_state(RGWCoroutine_Done, ret);
1107 }
1108 return 0;
1109}
1110
1111int RGWSimpleCoroutine::state_init()
1112{
1113 int ret = init();
1114 if (ret < 0) {
1115 call_cleanup();
1116 return set_state(RGWCoroutine_Error, ret);
1117 }
1118 return 0;
1119}
1120
b3b6e05e 1121int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider *dpp)
7c673cae 1122{
b3b6e05e 1123 int ret = send_request(dpp);
7c673cae
FG
1124 if (ret < 0) {
1125 call_cleanup();
1126 return set_state(RGWCoroutine_Error, ret);
1127 }
1128 return io_block(0);
1129}
1130
1131int RGWSimpleCoroutine::state_request_complete()
1132{
1133 int ret = request_complete();
1134 if (ret < 0) {
1135 call_cleanup();
1136 return set_state(RGWCoroutine_Error, ret);
1137 }
1138 return 0;
1139}
1140
1141int RGWSimpleCoroutine::state_all_complete()
1142{
1143 int ret = finish();
1144 if (ret < 0) {
1145 call_cleanup();
1146 return set_state(RGWCoroutine_Error, ret);
1147 }
1148 return 0;
1149}
1150
1151