]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_coroutine.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_coroutine.cc
CommitLineData
11fdf7f2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
7c673cae
FG
3
4#include "common/ceph_json.h"
7c673cae 5#include "rgw_coroutine.h"
7c673cae
FG
6
7// re-include our assert to clobber the system one; fix dout:
11fdf7f2 8#include "include/ceph_assert.h"
7c673cae 9
31f18b77
FG
10#include <boost/asio/yield.hpp>
11
7c673cae 12#define dout_subsys ceph_subsys_rgw
11fdf7f2 13#define dout_context g_ceph_context
7c673cae
FG
14
15
16class RGWCompletionManager::WaitContext : public Context {
17 RGWCompletionManager *manager;
18 void *opaque;
19public:
20 WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
21 void finish(int r) override {
22 manager->_wakeup(opaque);
23 }
24};
25
26RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
27 timer(cct, lock)
28{
29 timer.init();
30}
31
32RGWCompletionManager::~RGWCompletionManager()
33{
34 Mutex::Locker l(lock);
35 timer.cancel_all_events();
36 timer.shutdown();
37}
38
11fdf7f2 39void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
7c673cae
FG
40{
41 Mutex::Locker l(lock);
11fdf7f2 42 _complete(cn, io_id, user_info);
7c673cae
FG
43}
44
45void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
46{
47 Mutex::Locker l(lock);
48 if (cn) {
49 cns.insert(cn);
50 }
51}
52
53void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
54{
55 Mutex::Locker l(lock);
56 if (cn) {
57 cns.erase(cn);
58 }
59}
60
11fdf7f2 61void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
7c673cae
FG
62{
63 if (cn) {
64 cns.erase(cn);
65 }
11fdf7f2
TL
66
67 if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
68 /* already have completion for this io_id, don't allow multiple completions for it */
69 return;
70 }
71 complete_reqs.push_back(io_completion{io_id, user_info});
7c673cae
FG
72 cond.Signal();
73}
74
11fdf7f2 75int RGWCompletionManager::get_next(io_completion *io)
7c673cae
FG
76{
77 Mutex::Locker l(lock);
78 while (complete_reqs.empty()) {
7c673cae
FG
79 if (going_down) {
80 return -ECANCELED;
81 }
11fdf7f2 82 cond.Wait(lock);
7c673cae 83 }
11fdf7f2
TL
84 *io = complete_reqs.front();
85 complete_reqs_set.erase(io->io_id);
7c673cae
FG
86 complete_reqs.pop_front();
87 return 0;
88}
89
11fdf7f2 90bool RGWCompletionManager::try_get_next(io_completion *io)
7c673cae
FG
91{
92 Mutex::Locker l(lock);
93 if (complete_reqs.empty()) {
94 return false;
95 }
11fdf7f2
TL
96 *io = complete_reqs.front();
97 complete_reqs_set.erase(io->io_id);
7c673cae
FG
98 complete_reqs.pop_front();
99 return true;
100}
101
102void RGWCompletionManager::go_down()
103{
104 Mutex::Locker l(lock);
105 for (auto cn : cns) {
106 cn->unregister();
107 }
108 going_down = true;
109 cond.Signal();
110}
111
112void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
113{
114 Mutex::Locker l(lock);
11fdf7f2 115 ceph_assert(waiters.find(opaque) == waiters.end());
7c673cae
FG
116 waiters[opaque] = user_info;
117 timer.add_event_after(interval, new WaitContext(this, opaque));
118}
119
120void RGWCompletionManager::wakeup(void *opaque)
121{
122 Mutex::Locker l(lock);
123 _wakeup(opaque);
124}
125
126void RGWCompletionManager::_wakeup(void *opaque)
127{
128 map<void *, void *>::iterator iter = waiters.find(opaque);
129 if (iter != waiters.end()) {
130 void *user_id = iter->second;
131 waiters.erase(iter);
11fdf7f2 132 _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
7c673cae
FG
133 }
134}
135
136RGWCoroutine::~RGWCoroutine() {
137 for (auto stack : spawned.entries) {
138 stack->put();
139 }
140}
141
11fdf7f2
TL
142void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
143{
144 stack->init_new_io(io_provider);
145}
146
7c673cae
FG
147void RGWCoroutine::set_io_blocked(bool flag) {
148 stack->set_io_blocked(flag);
149}
150
151void RGWCoroutine::set_sleeping(bool flag) {
152 stack->set_sleeping(flag);
153}
154
11fdf7f2
TL
155int RGWCoroutine::io_block(int ret, int64_t io_id) {
156 return io_block(ret, rgw_io_id{io_id, -1});
157}
158
159int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
160 if (stack->consume_io_finish(io_id)) {
161 return 0;
162 }
7c673cae 163 set_io_blocked(true);
11fdf7f2 164 stack->set_io_blocked_id(io_id);
7c673cae
FG
165 return ret;
166}
167
11fdf7f2
TL
168void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
169 stack->io_complete(io_id);
170}
171
7c673cae
FG
172void RGWCoroutine::StatusItem::dump(Formatter *f) const {
173 ::encode_json("timestamp", timestamp, f);
174 ::encode_json("status", status, f);
175}
176
177stringstream& RGWCoroutine::Status::set_status()
178{
179 RWLock::WLocker l(lock);
180 string s = status.str();
181 status.str(string());
182 if (!timestamp.is_zero()) {
183 history.push_back(StatusItem(timestamp, s));
184 }
185 if (history.size() > (size_t)max_history) {
186 history.pop_front();
187 }
188 timestamp = ceph_clock_now();
189
190 return status;
191}
192
11fdf7f2
TL
193int64_t RGWCoroutinesManager::get_next_io_id()
194{
195 return (int64_t)++max_io_id;
196}
197
7c673cae
FG
198RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
199 done_flag(false), error_flag(false), blocked_flag(false),
200 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
201 retcode(0), run_count(0),
202 env(NULL), parent(NULL)
203{
204 if (start) {
205 ops.push_back(start);
206 }
207 pos = ops.begin();
208}
209
210RGWCoroutinesStack::~RGWCoroutinesStack()
211{
212 for (auto op : ops) {
213 op->put();
214 }
215
216 for (auto stack : spawned.entries) {
217 stack->put();
218 }
219}
220
221int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
222{
223 env = _env;
224 RGWCoroutine *op = *pos;
225 op->stack = this;
226 ldout(cct, 20) << *op << ": operate()" << dendl;
11fdf7f2 227 int r = op->operate_wrapper();
7c673cae
FG
228 if (r < 0) {
229 ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
230 }
231
232 error_flag = op->is_error();
233
234 if (op->is_done()) {
235 int op_retcode = r;
236 r = unwind(op_retcode);
237 op->put();
238 done_flag = (pos == ops.end());
11fdf7f2 239 blocked_flag &= !done_flag;
7c673cae
FG
240 if (done_flag) {
241 retcode = op_retcode;
242 }
243 return r;
244 }
245
246 /* should r ever be negative at this point? */
11fdf7f2 247 ceph_assert(r >= 0);
7c673cae
FG
248
249 return 0;
250}
251
252string RGWCoroutinesStack::error_str()
253{
254 if (pos != ops.end()) {
255 return (*pos)->error_str();
256 }
257 return string();
258}
259
260void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
261 if (!next_op) {
262 return;
263 }
264 ops.push_back(next_op);
265 if (pos != ops.end()) {
266 ++pos;
267 } else {
268 pos = ops.begin();
269 }
270}
271
11fdf7f2
TL
272void RGWCoroutinesStack::schedule()
273{
274 env->manager->schedule(env, this);
275}
276
277void RGWCoroutinesStack::_schedule()
278{
279 env->manager->_schedule(env, this);
280}
281
7c673cae
FG
282RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
283{
284 if (!op) {
285 return NULL;
286 }
287
288 rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);
289
290 RGWCoroutinesStack *stack = env->manager->allocate_stack();
291 s->add_pending(stack);
292 stack->parent = this;
293
294 stack->get(); /* we'll need to collect the stack */
295 stack->call(op);
296
297 env->manager->schedule(env, stack);
298
299 if (wait) {
300 set_blocked_by(stack);
301 }
302
303 return stack;
304}
305
306RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
307{
308 return spawn(NULL, op, wait);
309}
310
311int RGWCoroutinesStack::wait(const utime_t& interval)
312{
313 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
314 completion_mgr->wait_interval((void *)this, interval, (void *)this);
315 set_io_blocked(true);
316 set_interval_wait(true);
317 return 0;
318}
319
320void RGWCoroutinesStack::wakeup()
321{
322 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
323 completion_mgr->wakeup((void *)this);
324}
325
11fdf7f2
TL
326void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
327{
328 RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
329 completion_mgr->complete(nullptr, io_id, (void *)this);
330}
331
7c673cae
FG
332int RGWCoroutinesStack::unwind(int retcode)
333{
334 rgw_spawned_stacks *src_spawned = &(*pos)->spawned;
335
336 if (pos == ops.begin()) {
11fdf7f2 337 ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
7c673cae
FG
338 spawned.inherit(src_spawned);
339 ops.clear();
340 pos = ops.end();
341 return retcode;
342 }
343
344 --pos;
345 ops.pop_back();
346 RGWCoroutine *op = *pos;
347 op->set_retcode(retcode);
348 op->spawned.inherit(src_spawned);
349 return 0;
350}
351
11fdf7f2
TL
352void RGWCoroutinesStack::cancel()
353{
354 while (!ops.empty()) {
355 RGWCoroutine *op = *pos;
356 unwind(-ECANCELED);
357 op->put();
358 }
359 put();
360}
7c673cae
FG
361
362bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
363{
11fdf7f2 364 bool need_retry = false;
7c673cae
FG
365 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
366 *ret = 0;
367 vector<RGWCoroutinesStack *> new_list;
368
369 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
370 RGWCoroutinesStack *stack = *iter;
371 if (stack == skip_stack || !stack->is_done()) {
372 new_list.push_back(stack);
373 if (!stack->is_done()) {
374 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
375 } else if (stack == skip_stack) {
376 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
377 }
378 continue;
379 }
380 int r = stack->get_ret_status();
381 stack->put();
382 if (r < 0) {
383 *ret = r;
384 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
385 new_list.insert(new_list.end(), ++iter, s->entries.end());
11fdf7f2 386 need_retry = (iter != s->entries.end());
7c673cae
FG
387 break;
388 }
389
390 ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
391 }
392
393 s->entries.swap(new_list);
11fdf7f2 394 return need_retry;
7c673cae
FG
395}
396
397bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
398{
399 rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
400 *ret = 0;
401
402 if (collected_stack) {
403 *collected_stack = NULL;
404 }
405
406 for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
407 RGWCoroutinesStack *stack = *iter;
408 if (!stack->is_done()) {
409 continue;
410 }
411 int r = stack->get_ret_status();
412 if (r < 0) {
413 *ret = r;
414 }
415
416 if (collected_stack) {
417 *collected_stack = stack;
418 }
419 stack->put();
420
421 s->entries.erase(iter);
422 return true;
423 }
424
425 return false;
426}
427
428bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
429{
430 return collect(NULL, ret, skip_stack);
431}
432
433static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
434{
11fdf7f2 435 (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
7c673cae
FG
436}
437
11fdf7f2
TL
438RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
439 io_id(_io_id),
7c673cae
FG
440 user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
441 c = librados::Rados::aio_create_completion((void *)this, NULL,
442 _aio_completion_notifier_cb);
443}
444
445RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
446{
447 return ops_mgr->create_completion_notifier(this);
448}
449
450RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
451{
452 return ops_mgr->get_completion_mgr();
453}
454
455bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
456{
457 if (blocking_stacks.empty()) {
458 return false;
459 }
460
461 set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
462 *s = *iter;
463 blocking_stacks.erase(iter);
464 (*s)->blocked_by_stack.erase(this);
465
466 return true;
467}
468
469void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
470{
471 if (!op) {
472 return;
473 }
474 string err = op->error_str();
475 if (err.empty()) {
476 return;
477 }
478 lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
479}
480
481void RGWCoroutinesStack::dump(Formatter *f) const {
482 stringstream ss;
483 ss << (void *)this;
484 ::encode_json("stack", ss.str(), f);
485 ::encode_json("run_count", run_count, f);
486 f->open_array_section("ops");
487 for (auto& i : ops) {
488 encode_json("op", *i, f);
489 }
490 f->close_section();
491}
492
11fdf7f2 493void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
7c673cae 494{
11fdf7f2
TL
495 io_provider->set_io_user_info((void *)this);
496 io_provider->assign_io(env->manager->get_io_id_provider());
497}
498
499bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
500{
501 if (!can_io_unblock(io_id)) {
502 auto p = io_finish_ids.emplace(io_id.id, io_id);
503 auto& iter = p.first;
504 bool inserted = p.second;
505 if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
506 iter->second.channels |= io_id.channels;
507 }
508 return false;
509 }
510
511 return true;
512}
513
514bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
515{
516 auto iter = io_finish_ids.find(io_id.id);
517 if (iter == io_finish_ids.end()) {
518 return false;
519 }
520 int finish_mask = iter->second.channels;
521 bool found = (finish_mask & io_id.channels) != 0;
522
523 finish_mask &= ~(finish_mask & io_id.channels);
524
525 if (finish_mask == 0) {
526 io_finish_ids.erase(iter);
527 }
528 return found;
529}
530
531
532void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
533 RGWCompletionManager::io_completion& io, int *blocked_count)
534{
535 ceph_assert(lock.is_wlocked());
536 RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
537 if (context_stacks.find(stack) == context_stacks.end()) {
538 return;
539 }
540 if (!stack->try_io_unblock(io.io_id)) {
541 return;
542 }
543 if (stack->is_io_blocked()) {
544 --(*blocked_count);
545 stack->set_io_blocked(false);
546 }
7c673cae
FG
547 stack->set_interval_wait(false);
548 if (!stack->is_done()) {
11fdf7f2
TL
549 if (!stack->is_scheduled) {
550 scheduled_stacks.push_back(stack);
551 stack->set_is_scheduled(true);
552 }
7c673cae 553 } else {
7c673cae
FG
554 context_stacks.erase(stack);
555 stack->put();
556 }
557}
558
559void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
560{
11fdf7f2
TL
561 RWLock::WLocker wl(lock);
562 _schedule(env, stack);
563}
564
565void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
566{
567 ceph_assert(lock.is_wlocked());
568 if (!stack->is_scheduled) {
569 env->scheduled_stacks->push_back(stack);
570 stack->set_is_scheduled(true);
571 }
7c673cae
FG
572 set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
573 context_stacks.insert(stack);
574}
575
11fdf7f2
TL
576void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
577{
578 cr->set_sleeping(flag);
579}
580
581void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
582{
583 cr->io_complete(io_id);
584}
585
7c673cae
FG
586int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
587{
588 int ret = 0;
589 int blocked_count = 0;
590 int interval_wait_count = 0;
591 bool canceled = false; // set on going_down
592 RGWCoroutinesEnv env;
11fdf7f2 593 bool op_not_blocked;
7c673cae
FG
594
595 uint64_t run_context = ++run_context_count;
596
597 lock.get_write();
598 set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
599 list<RGWCoroutinesStack *> scheduled_stacks;
600 for (auto& st : stacks) {
601 context_stacks.insert(st);
602 scheduled_stacks.push_back(st);
11fdf7f2 603 st->set_is_scheduled(true);
7c673cae 604 }
7c673cae
FG
605 env.run_context = run_context;
606 env.manager = this;
607 env.scheduled_stacks = &scheduled_stacks;
608
609 for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
11fdf7f2 610 RGWCompletionManager::io_completion io;
7c673cae 611 RGWCoroutinesStack *stack = *iter;
11fdf7f2
TL
612 ++iter;
613 scheduled_stacks.pop_front();
614
615 if (context_stacks.find(stack) == context_stacks.end()) {
616 /* stack was probably schedule more than once due to IO, but was since complete */
617 goto next;
618 }
7c673cae
FG
619 env.stack = stack;
620
11fdf7f2
TL
621 lock.unlock();
622
7c673cae 623 ret = stack->operate(&env);
11fdf7f2
TL
624
625 lock.get_write();
626
7c673cae
FG
627 stack->set_is_scheduled(false);
628 if (ret < 0) {
629 ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
630 }
631
632 if (stack->is_error()) {
633 report_error(stack);
634 }
635
11fdf7f2 636 op_not_blocked = false;
7c673cae
FG
637
638 if (stack->is_io_blocked()) {
639 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
640 if (stack->is_interval_waiting()) {
641 interval_wait_count++;
642 }
643 blocked_count++;
644 } else if (stack->is_blocked()) {
645 /* do nothing, we'll re-add the stack when the blocking stack is done,
646 * or when we're awaken
647 */
648 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
649 << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
650 } else if (stack->is_done()) {
651 ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
652 RGWCoroutinesStack *s;
653 while (stack->unblock_stack(&s)) {
654 if (!s->is_blocked_by_stack() && !s->is_done()) {
655 if (s->is_io_blocked()) {
656 if (stack->is_interval_waiting()) {
657 interval_wait_count++;
658 }
659 blocked_count++;
660 } else {
11fdf7f2 661 s->_schedule();
7c673cae
FG
662 }
663 }
664 }
665 if (stack->parent && stack->parent->waiting_for_child()) {
666 stack->parent->set_wait_for_child(false);
11fdf7f2 667 stack->parent->_schedule();
7c673cae
FG
668 }
669 context_stacks.erase(stack);
670 stack->put();
671 stack = NULL;
672 } else {
673 op_not_blocked = true;
674 stack->run_count++;
11fdf7f2 675 stack->_schedule();
7c673cae
FG
676 }
677
678 if (!op_not_blocked && stack) {
679 stack->run_count = 0;
680 }
681
11fdf7f2
TL
682 while (completion_mgr->try_get_next(&io)) {
683 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
684 }
685
686 /*
687 * only account blocked operations that are not in interval_wait, these are stacks that
688 * were put on a wait without any real IO operations. While we mark these as io_blocked,
689 * these aren't really waiting for IOs
690 */
691 while (blocked_count - interval_wait_count >= ops_window) {
11fdf7f2
TL
692 lock.unlock();
693 ret = completion_mgr->get_next(&io);
694 lock.get_write();
7c673cae 695 if (ret < 0) {
28e407b8 696 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae 697 }
11fdf7f2 698 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
699 }
700
11fdf7f2 701next:
7c673cae 702 while (scheduled_stacks.empty() && blocked_count > 0) {
11fdf7f2
TL
703 lock.unlock();
704 ret = completion_mgr->get_next(&io);
705 lock.get_write();
7c673cae 706 if (ret < 0) {
28e407b8 707 ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
7c673cae
FG
708 }
709 if (going_down) {
710 ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
711 ret = -ECANCELED;
712 canceled = true;
713 break;
714 }
11fdf7f2 715 handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
7c673cae
FG
716 iter = scheduled_stacks.begin();
717 }
718 if (canceled) {
719 break;
720 }
721
722 if (iter == scheduled_stacks.end()) {
723 iter = scheduled_stacks.begin();
724 }
725 }
726
7c673cae
FG
727 if (!context_stacks.empty() && !going_down) {
728 JSONFormatter formatter(true);
729 formatter.open_array_section("context_stacks");
730 for (auto& s : context_stacks) {
731 ::encode_json("entry", *s, &formatter);
732 }
733 formatter.close_section();
734 lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
735 formatter.flush(*_dout);
736 *_dout << dendl;
11fdf7f2 737 ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
7c673cae
FG
738 }
739
740 for (auto stack : context_stacks) {
741 ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
11fdf7f2 742 stack->cancel();
7c673cae
FG
743 }
744 run_contexts.erase(run_context);
745 lock.unlock();
746
747 return ret;
748}
749
750int RGWCoroutinesManager::run(RGWCoroutine *op)
751{
752 if (!op) {
753 return 0;
754 }
755 list<RGWCoroutinesStack *> stacks;
756 RGWCoroutinesStack *stack = allocate_stack();
757 op->get();
758 stack->call(op);
759
11fdf7f2 760 stacks.push_back(stack);
7c673cae
FG
761
762 int r = run(stacks);
763 if (r < 0) {
764 ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
765 } else {
766 r = op->get_ret_status();
767 }
768 op->put();
769
770 return r;
771}
772
773RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
774{
11fdf7f2
TL
775 rgw_io_id io_id{get_next_io_id(), -1};
776 RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
7c673cae
FG
777 completion_mgr->register_completion_notifier(cn);
778 return cn;
779}
780
781void RGWCoroutinesManager::dump(Formatter *f) const {
782 RWLock::RLocker rl(lock);
783
784 f->open_array_section("run_contexts");
785 for (auto& i : run_contexts) {
786 f->open_object_section("context");
787 ::encode_json("id", i.first, f);
788 f->open_array_section("entries");
789 for (auto& s : i.second) {
790 ::encode_json("entry", *s, f);
791 }
792 f->close_section();
793 f->close_section();
794 }
795 f->close_section();
796}
797
798RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
799 return new RGWCoroutinesStack(cct, this);
800}
801
802string RGWCoroutinesManager::get_id()
803{
804 if (!id.empty()) {
805 return id;
806 }
807 stringstream ss;
808 ss << (void *)this;
809 return ss.str();
810}
811
812void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
813{
814 RWLock::WLocker wl(lock);
815 if (managers.find(mgr) == managers.end()) {
816 managers.insert(mgr);
817 get();
818 }
819}
820
821void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
822{
823 RWLock::WLocker wl(lock);
824 if (managers.find(mgr) != managers.end()) {
825 managers.erase(mgr);
826 put();
827 }
828}
829
830RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
831{
832 AdminSocket *admin_socket = cct->get_admin_socket();
833 if (!admin_command.empty()) {
834 admin_socket->unregister_command(admin_command);
835 }
836}
837
838int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
839{
840 AdminSocket *admin_socket = cct->get_admin_socket();
841 if (!admin_command.empty()) {
842 admin_socket->unregister_command(admin_command);
843 }
844 admin_command = command;
845 int r = admin_socket->register_command(admin_command, admin_command, this,
846 "dump current coroutines stack state");
847 if (r < 0) {
848 lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
849 return r;
850 }
851 return 0;
852}
853
11fdf7f2
TL
854bool RGWCoroutinesManagerRegistry::call(std::string_view command,
855 const cmdmap_t& cmdmap,
856 std::string_view format,
857 bufferlist& out) {
7c673cae
FG
858 RWLock::RLocker rl(lock);
859 stringstream ss;
860 JSONFormatter f;
861 ::encode_json("cr_managers", *this, &f);
862 f.flush(ss);
863 out.append(ss);
864 return true;
865}
866
867void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
868 f->open_array_section("coroutine_managers");
869 for (auto m : managers) {
870 ::encode_json("entry", *m, f);
871 }
872 f->close_section();
873}
874
875void RGWCoroutine::call(RGWCoroutine *op)
876{
877 stack->call(op);
878}
879
880RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
881{
882 return stack->spawn(this, op, wait);
883}
884
885bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
886{
887 return stack->collect(this, ret, skip_stack);
888}
889
890bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
891{
892 return stack->collect_next(this, ret, collected_stack);
893}
894
895int RGWCoroutine::wait(const utime_t& interval)
896{
897 return stack->wait(interval);
898}
899
900void RGWCoroutine::wait_for_child()
901{
902 /* should only wait for child if there is a child that is not done yet, and no complete children */
903 if (spawned.entries.empty()) {
904 return;
905 }
906 for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) {
907 if ((*iter)->is_done()) {
908 return;
909 }
910 }
911 stack->set_wait_for_child(true);
912}
913
914string RGWCoroutine::to_str() const
915{
916 return typeid(*this).name();
917}
918
919ostream& operator<<(ostream& out, const RGWCoroutine& cr)
920{
921 out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
922 return out;
923}
924
925bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
926{
927 bool done = false;
11fdf7f2 928 ceph_assert(num_cr_left >= 0);
7c673cae
FG
929 if (num_cr_left == 0 && skip_stack) {
930 num_cr_left = 1;
931 }
932 reenter(&drain_cr) {
933 while (num_spawned() > (size_t)num_cr_left) {
934 yield wait_for_child();
935 int ret;
936 while (collect(&ret, skip_stack)) {
937 if (ret < 0) {
938 ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
939 /* we should have reported this error */
940 log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
941 }
942 }
943 }
944 done = true;
945 }
946 return done;
947}
948
949void RGWCoroutine::wakeup()
950{
951 stack->wakeup();
952}
953
11fdf7f2
TL
954RGWCoroutinesEnv *RGWCoroutine::get_env() const
955{
956 return stack->get_env();
957}
958
7c673cae
FG
959void RGWCoroutine::dump(Formatter *f) const {
960 if (!description.str().empty()) {
961 encode_json("description", description.str(), f);
962 }
963 encode_json("type", to_str(), f);
964 if (!spawned.entries.empty()) {
965 f->open_array_section("spawned");
966 for (auto& i : spawned.entries) {
967 char buf[32];
968 snprintf(buf, sizeof(buf), "%p", (void *)i);
969 encode_json("stack", string(buf), f);
970 }
971 f->close_section();
972 }
973 if (!status.history.empty()) {
974 encode_json("history", status.history, f);
975 }
976
977 if (!status.status.str().empty()) {
978 f->open_object_section("status");
979 encode_json("status", status.status.str(), f);
980 encode_json("timestamp", status.timestamp, f);
981 f->close_section();
982 }
983}
984
985RGWSimpleCoroutine::~RGWSimpleCoroutine()
986{
987 if (!called_cleanup) {
988 request_cleanup();
989 }
990}
991
992void RGWSimpleCoroutine::call_cleanup()
993{
994 called_cleanup = true;
995 request_cleanup();
996}
997
998int RGWSimpleCoroutine::operate()
999{
1000 int ret = 0;
1001 reenter(this) {
1002 yield return state_init();
1003 yield return state_send_request();
1004 yield return state_request_complete();
1005 yield return state_all_complete();
1006 drain_all();
1007 call_cleanup();
1008 return set_state(RGWCoroutine_Done, ret);
1009 }
1010 return 0;
1011}
1012
1013int RGWSimpleCoroutine::state_init()
1014{
1015 int ret = init();
1016 if (ret < 0) {
1017 call_cleanup();
1018 return set_state(RGWCoroutine_Error, ret);
1019 }
1020 return 0;
1021}
1022
1023int RGWSimpleCoroutine::state_send_request()
1024{
1025 int ret = send_request();
1026 if (ret < 0) {
1027 call_cleanup();
1028 return set_state(RGWCoroutine_Error, ret);
1029 }
1030 return io_block(0);
1031}
1032
1033int RGWSimpleCoroutine::state_request_complete()
1034{
1035 int ret = request_complete();
1036 if (ret < 0) {
1037 call_cleanup();
1038 return set_state(RGWCoroutine_Error, ret);
1039 }
1040 return 0;
1041}
1042
1043int RGWSimpleCoroutine::state_all_complete()
1044{
1045 int ret = finish();
1046 if (ret < 0) {
1047 call_cleanup();
1048 return set_state(RGWCoroutine_Error, ret);
1049 }
1050 return 0;
1051}
1052
1053