1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "include/Context.h"
5 #include "common/ceph_json.h"
6 #include "rgw_coroutine.h"
8 // re-include our assert to clobber the system one; fix dout:
9 #include "include/ceph_assert.h"
11 #include <boost/asio/yield.hpp>
13 #define dout_subsys ceph_subsys_rgw
14 #define dout_context g_ceph_context
18 class RGWCompletionManager::WaitContext
: public Context
{
19 RGWCompletionManager
*manager
;
22 WaitContext(RGWCompletionManager
*_cm
, void *_opaque
) : manager(_cm
), opaque(_opaque
) {}
23 void finish(int r
) override
{
24 manager
->_wakeup(opaque
);
28 RGWCompletionManager::RGWCompletionManager(CephContext
*_cct
) : cct(_cct
),
34 RGWCompletionManager::~RGWCompletionManager()
36 std::lock_guard l
{lock
};
37 timer
.cancel_all_events();
41 void RGWCompletionManager::complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
)
43 std::lock_guard l
{lock
};
44 _complete(cn
, io_id
, user_info
);
47 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier
*cn
)
49 std::lock_guard l
{lock
};
55 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier
*cn
)
57 std::lock_guard l
{lock
};
63 void RGWCompletionManager::_complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
)
69 if (complete_reqs_set
.find(io_id
) != complete_reqs_set
.end()) {
70 /* already have completion for this io_id, don't allow multiple completions for it */
73 complete_reqs
.push_back(io_completion
{io_id
, user_info
});
77 int RGWCompletionManager::get_next(io_completion
*io
)
79 std::unique_lock l
{lock
};
80 while (complete_reqs
.empty()) {
86 *io
= complete_reqs
.front();
87 complete_reqs_set
.erase(io
->io_id
);
88 complete_reqs
.pop_front();
92 bool RGWCompletionManager::try_get_next(io_completion
*io
)
94 std::lock_guard l
{lock
};
95 if (complete_reqs
.empty()) {
98 *io
= complete_reqs
.front();
99 complete_reqs_set
.erase(io
->io_id
);
100 complete_reqs
.pop_front();
104 void RGWCompletionManager::go_down()
106 std::lock_guard l
{lock
};
107 for (auto cn
: cns
) {
114 void RGWCompletionManager::wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
)
116 std::lock_guard l
{lock
};
117 ceph_assert(waiters
.find(opaque
) == waiters
.end());
118 waiters
[opaque
] = user_info
;
119 timer
.add_event_after(interval
, new WaitContext(this, opaque
));
122 void RGWCompletionManager::wakeup(void *opaque
)
124 std::lock_guard l
{lock
};
128 void RGWCompletionManager::_wakeup(void *opaque
)
130 map
<void *, void *>::iterator iter
= waiters
.find(opaque
);
131 if (iter
!= waiters
.end()) {
132 void *user_id
= iter
->second
;
134 _complete(NULL
, rgw_io_id
{0, -1} /* no IO id */, user_id
);
138 RGWCoroutine::~RGWCoroutine() {
139 for (auto stack
: spawned
.entries
) {
144 void RGWCoroutine::init_new_io(RGWIOProvider
*io_provider
)
146 ceph_assert(stack
); // if there's no stack, io_provider won't be uninitialized
147 stack
->init_new_io(io_provider
);
150 void RGWCoroutine::set_io_blocked(bool flag
) {
152 stack
->set_io_blocked(flag
);
156 void RGWCoroutine::set_sleeping(bool flag
) {
158 stack
->set_sleeping(flag
);
162 int RGWCoroutine::io_block(int ret
, int64_t io_id
) {
163 return io_block(ret
, rgw_io_id
{io_id
, -1});
166 int RGWCoroutine::io_block(int ret
, const rgw_io_id
& io_id
) {
170 if (stack
->consume_io_finish(io_id
)) {
173 set_io_blocked(true);
174 stack
->set_io_blocked_id(io_id
);
178 void RGWCoroutine::io_complete(const rgw_io_id
& io_id
) {
180 stack
->io_complete(io_id
);
184 void RGWCoroutine::StatusItem::dump(Formatter
*f
) const {
185 ::encode_json("timestamp", timestamp
, f
);
186 ::encode_json("status", status
, f
);
189 stringstream
& RGWCoroutine::Status::set_status()
191 std::unique_lock l
{lock
};
192 string s
= status
.str();
193 status
.str(string());
194 if (!timestamp
.is_zero()) {
195 history
.push_back(StatusItem(timestamp
, s
));
197 if (history
.size() > (size_t)max_history
) {
200 timestamp
= ceph_clock_now();
205 RGWCoroutinesManager::~RGWCoroutinesManager() {
207 completion_mgr
->put();
209 cr_registry
->remove(this);
213 int64_t RGWCoroutinesManager::get_next_io_id()
215 return (int64_t)++max_io_id
;
218 uint64_t RGWCoroutinesManager::get_next_stack_id() {
219 return (uint64_t)++max_stack_id
;
222 RGWCoroutinesStack::RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
) : cct(_cct
), ops_mgr(_ops_mgr
),
223 done_flag(false), error_flag(false), blocked_flag(false),
224 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
225 retcode(0), run_count(0),
226 env(NULL
), parent(NULL
)
228 id
= ops_mgr
->get_next_stack_id();
230 ops
.push_back(start
);
235 RGWCoroutinesStack::~RGWCoroutinesStack()
237 for (auto op
: ops
) {
241 for (auto stack
: spawned
.entries
) {
245 if (preallocated_stack
) {
246 preallocated_stack
->put();
250 int RGWCoroutinesStack::operate(const DoutPrefixProvider
*dpp
, RGWCoroutinesEnv
*_env
)
253 RGWCoroutine
*op
= *pos
;
255 ldpp_dout(dpp
, 20) << *op
<< ": operate()" << dendl
;
256 int r
= op
->operate_wrapper(dpp
);
258 ldpp_dout(dpp
, 20) << *op
<< ": operate() returned r=" << r
<< dendl
;
261 error_flag
= op
->is_error();
265 r
= unwind(op_retcode
);
267 done_flag
= (pos
== ops
.end());
268 blocked_flag
&= !done_flag
;
270 retcode
= op_retcode
;
275 /* should r ever be negative at this point? */
281 string
RGWCoroutinesStack::error_str()
283 if (pos
!= ops
.end()) {
284 return (*pos
)->error_str();
289 void RGWCoroutinesStack::call(RGWCoroutine
*next_op
) {
293 ops
.push_back(next_op
);
294 if (pos
!= ops
.end()) {
301 void RGWCoroutinesStack::schedule()
303 env
->manager
->schedule(env
, this);
306 void RGWCoroutinesStack::_schedule()
308 env
->manager
->_schedule(env
, this);
311 RGWCoroutinesStack
*RGWCoroutinesStack::spawn(RGWCoroutine
*source_op
, RGWCoroutine
*op
, bool wait
)
317 rgw_spawned_stacks
*s
= (source_op
? &source_op
->spawned
: &spawned
);
319 RGWCoroutinesStack
*stack
= preallocated_stack
;
321 stack
= env
->manager
->allocate_stack();
323 preallocated_stack
= nullptr;
325 s
->add_pending(stack
);
326 stack
->parent
= this;
328 stack
->get(); /* we'll need to collect the stack */
331 env
->manager
->schedule(env
, stack
);
334 set_blocked_by(stack
);
340 RGWCoroutinesStack
*RGWCoroutinesStack::spawn(RGWCoroutine
*op
, bool wait
)
342 return spawn(NULL
, op
, wait
);
345 RGWCoroutinesStack
*RGWCoroutinesStack::prealloc_stack()
347 if (!preallocated_stack
) {
348 preallocated_stack
= env
->manager
->allocate_stack();
350 return preallocated_stack
;
353 int RGWCoroutinesStack::wait(const utime_t
& interval
)
355 RGWCompletionManager
*completion_mgr
= env
->manager
->get_completion_mgr();
356 completion_mgr
->wait_interval((void *)this, interval
, (void *)this);
357 set_io_blocked(true);
358 set_interval_wait(true);
362 void RGWCoroutinesStack::wakeup()
364 RGWCompletionManager
*completion_mgr
= env
->manager
->get_completion_mgr();
365 completion_mgr
->wakeup((void *)this);
368 void RGWCoroutinesStack::io_complete(const rgw_io_id
& io_id
)
370 RGWCompletionManager
*completion_mgr
= env
->manager
->get_completion_mgr();
371 completion_mgr
->complete(nullptr, io_id
, (void *)this);
374 int RGWCoroutinesStack::unwind(int retcode
)
376 rgw_spawned_stacks
*src_spawned
= &(*pos
)->spawned
;
378 if (pos
== ops
.begin()) {
379 ldout(cct
, 15) << "stack " << (void *)this << " end" << dendl
;
380 spawned
.inherit(src_spawned
);
388 RGWCoroutine
*op
= *pos
;
389 op
->set_retcode(retcode
);
390 op
->spawned
.inherit(src_spawned
);
394 void RGWCoroutinesStack::cancel()
396 while (!ops
.empty()) {
397 RGWCoroutine
*op
= *pos
;
404 bool RGWCoroutinesStack::collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
) /* returns true if needs to be called again */
406 bool need_retry
= false;
407 rgw_spawned_stacks
*s
= (op
? &op
->spawned
: &spawned
);
409 vector
<RGWCoroutinesStack
*> new_list
;
411 for (vector
<RGWCoroutinesStack
*>::iterator iter
= s
->entries
.begin(); iter
!= s
->entries
.end(); ++iter
) {
412 RGWCoroutinesStack
*stack
= *iter
;
413 if (stack
== skip_stack
|| !stack
->is_done()) {
414 new_list
.push_back(stack
);
415 if (!stack
->is_done()) {
416 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " is still running" << dendl
;
417 } else if (stack
== skip_stack
) {
418 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " explicitly skipping stack" << dendl
;
423 *stack_id
= stack
->get_id();
425 int r
= stack
->get_ret_status();
429 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " encountered error (r=" << r
<< "), skipping next stacks" << dendl
;
430 new_list
.insert(new_list
.end(), ++iter
, s
->entries
.end());
431 need_retry
= (iter
!= s
->entries
.end());
435 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " is complete" << dendl
;
438 s
->entries
.swap(new_list
);
442 bool RGWCoroutinesStack::collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
) /* returns true if found a stack to collect */
444 rgw_spawned_stacks
*s
= (op
? &op
->spawned
: &spawned
);
447 if (collected_stack
) {
448 *collected_stack
= NULL
;
451 for (vector
<RGWCoroutinesStack
*>::iterator iter
= s
->entries
.begin(); iter
!= s
->entries
.end(); ++iter
) {
452 RGWCoroutinesStack
*stack
= *iter
;
453 if (!stack
->is_done()) {
456 int r
= stack
->get_ret_status();
461 if (collected_stack
) {
462 *collected_stack
= stack
;
466 s
->entries
.erase(iter
);
473 bool RGWCoroutinesStack::collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
) /* returns true if needs to be called again */
475 return collect(NULL
, ret
, skip_stack
, stack_id
);
478 static void _aio_completion_notifier_cb(librados::completion_t cb
, void *arg
)
480 (static_cast<RGWAioCompletionNotifier
*>(arg
))->cb();
483 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, const rgw_io_id
& _io_id
, void *_user_data
) : completion_mgr(_mgr
),
485 user_data(_user_data
), registered(true) {
486 c
= librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb
);
489 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier()
491 return ops_mgr
->create_completion_notifier(this);
494 RGWCompletionManager
*RGWCoroutinesStack::get_completion_mgr()
496 return ops_mgr
->get_completion_mgr();
499 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack
**s
)
501 if (blocking_stacks
.empty()) {
505 set
<RGWCoroutinesStack
*>::iterator iter
= blocking_stacks
.begin();
507 blocking_stacks
.erase(iter
);
508 (*s
)->blocked_by_stack
.erase(this);
513 void RGWCoroutinesManager::report_error(RGWCoroutinesStack
*op
)
518 string err
= op
->error_str();
522 lderr(cct
) << "ERROR: failed operation: " << op
->error_str() << dendl
;
525 void RGWCoroutinesStack::dump(Formatter
*f
) const {
528 ::encode_json("stack", ss
.str(), f
);
529 ::encode_json("run_count", run_count
, f
);
530 f
->open_array_section("ops");
531 for (auto& i
: ops
) {
532 encode_json("op", *i
, f
);
537 void RGWCoroutinesStack::init_new_io(RGWIOProvider
*io_provider
)
539 io_provider
->set_io_user_info((void *)this);
540 io_provider
->assign_io(env
->manager
->get_io_id_provider());
543 bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id
& io_id
)
545 if (!can_io_unblock(io_id
)) {
546 auto p
= io_finish_ids
.emplace(io_id
.id
, io_id
);
547 auto& iter
= p
.first
;
548 bool inserted
= p
.second
;
549 if (!inserted
) { /* could not insert, entry already existed, add channel to completion mask */
550 iter
->second
.channels
|= io_id
.channels
;
558 bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id
& io_id
)
560 auto iter
= io_finish_ids
.find(io_id
.id
);
561 if (iter
== io_finish_ids
.end()) {
564 int finish_mask
= iter
->second
.channels
;
565 bool found
= (finish_mask
& io_id
.channels
) != 0;
567 finish_mask
&= ~(finish_mask
& io_id
.channels
);
569 if (finish_mask
== 0) {
570 io_finish_ids
.erase(iter
);
576 void RGWCoroutinesManager::handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
,
577 RGWCompletionManager::io_completion
& io
, int *blocked_count
)
579 ceph_assert(ceph_mutex_is_wlocked(lock
));
580 RGWCoroutinesStack
*stack
= static_cast<RGWCoroutinesStack
*>(io
.user_info
);
581 if (context_stacks
.find(stack
) == context_stacks
.end()) {
584 if (!stack
->try_io_unblock(io
.io_id
)) {
587 if (stack
->is_io_blocked()) {
589 stack
->set_io_blocked(false);
591 stack
->set_interval_wait(false);
592 if (!stack
->is_done()) {
593 if (!stack
->is_scheduled
) {
594 scheduled_stacks
.push_back(stack
);
595 stack
->set_is_scheduled(true);
598 context_stacks
.erase(stack
);
603 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
)
605 std::unique_lock wl
{lock
};
606 _schedule(env
, stack
);
609 void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
)
611 ceph_assert(ceph_mutex_is_wlocked(lock
));
612 if (!stack
->is_scheduled
) {
613 env
->scheduled_stacks
->push_back(stack
);
614 stack
->set_is_scheduled(true);
616 set
<RGWCoroutinesStack
*>& context_stacks
= run_contexts
[env
->run_context
];
617 context_stacks
.insert(stack
);
620 void RGWCoroutinesManager::set_sleeping(RGWCoroutine
*cr
, bool flag
)
622 cr
->set_sleeping(flag
);
625 void RGWCoroutinesManager::io_complete(RGWCoroutine
*cr
, const rgw_io_id
& io_id
)
627 cr
->io_complete(io_id
);
630 int RGWCoroutinesManager::run(const DoutPrefixProvider
*dpp
, list
<RGWCoroutinesStack
*>& stacks
)
633 int blocked_count
= 0;
634 int interval_wait_count
= 0;
635 bool canceled
= false; // set on going_down
636 RGWCoroutinesEnv env
;
639 uint64_t run_context
= ++run_context_count
;
642 set
<RGWCoroutinesStack
*>& context_stacks
= run_contexts
[run_context
];
643 list
<RGWCoroutinesStack
*> scheduled_stacks
;
644 for (auto& st
: stacks
) {
645 context_stacks
.insert(st
);
646 scheduled_stacks
.push_back(st
);
647 st
->set_is_scheduled(true);
649 env
.run_context
= run_context
;
651 env
.scheduled_stacks
= &scheduled_stacks
;
653 for (list
<RGWCoroutinesStack
*>::iterator iter
= scheduled_stacks
.begin(); iter
!= scheduled_stacks
.end() && !going_down
;) {
654 RGWCompletionManager::io_completion io
;
655 RGWCoroutinesStack
*stack
= *iter
;
657 scheduled_stacks
.pop_front();
659 if (context_stacks
.find(stack
) == context_stacks
.end()) {
660 /* stack was probably schedule more than once due to IO, but was since complete */
667 ret
= stack
->operate(dpp
, &env
);
671 stack
->set_is_scheduled(false);
673 ldpp_dout(dpp
, 20) << "stack->operate() returned ret=" << ret
<< dendl
;
676 if (stack
->is_error()) {
680 op_not_blocked
= false;
682 if (stack
->is_io_blocked()) {
683 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is io blocked" << dendl
;
684 if (stack
->is_interval_waiting()) {
685 interval_wait_count
++;
688 } else if (stack
->is_blocked()) {
689 /* do nothing, we'll re-add the stack when the blocking stack is done,
690 * or when we're awaken
692 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is_blocked_by_stack()=" << stack
->is_blocked_by_stack()
693 << " is_sleeping=" << stack
->is_sleeping() << " waiting_for_child()=" << stack
->waiting_for_child() << dendl
;
694 } else if (stack
->is_done()) {
695 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is done" << dendl
;
696 RGWCoroutinesStack
*s
;
697 while (stack
->unblock_stack(&s
)) {
698 if (!s
->is_blocked_by_stack() && !s
->is_done()) {
699 if (s
->is_io_blocked()) {
700 if (stack
->is_interval_waiting()) {
701 interval_wait_count
++;
709 if (stack
->parent
&& stack
->parent
->waiting_for_child()) {
710 stack
->parent
->set_wait_for_child(false);
711 stack
->parent
->_schedule();
713 context_stacks
.erase(stack
);
717 op_not_blocked
= true;
722 if (!op_not_blocked
&& stack
) {
723 stack
->run_count
= 0;
726 while (completion_mgr
->try_get_next(&io
)) {
727 handle_unblocked_stack(context_stacks
, scheduled_stacks
, io
, &blocked_count
);
731 * only account blocked operations that are not in interval_wait, these are stacks that
732 * were put on a wait without any real IO operations. While we mark these as io_blocked,
733 * these aren't really waiting for IOs
735 while (blocked_count
- interval_wait_count
>= ops_window
) {
737 ret
= completion_mgr
->get_next(&io
);
740 ldout(cct
, 5) << "completion_mgr.get_next() returned ret=" << ret
<< dendl
;
742 handle_unblocked_stack(context_stacks
, scheduled_stacks
, io
, &blocked_count
);
746 while (scheduled_stacks
.empty() && blocked_count
> 0) {
748 ret
= completion_mgr
->get_next(&io
);
751 ldout(cct
, 5) << "completion_mgr.get_next() returned ret=" << ret
<< dendl
;
754 ldout(cct
, 5) << __func__
<< "(): was stopped, exiting" << dendl
;
759 handle_unblocked_stack(context_stacks
, scheduled_stacks
, io
, &blocked_count
);
760 iter
= scheduled_stacks
.begin();
766 if (iter
== scheduled_stacks
.end()) {
767 iter
= scheduled_stacks
.begin();
771 if (!context_stacks
.empty() && !going_down
) {
772 JSONFormatter
formatter(true);
773 formatter
.open_array_section("context_stacks");
774 for (auto& s
: context_stacks
) {
775 ::encode_json("entry", *s
, &formatter
);
777 formatter
.close_section();
778 lderr(cct
) << __func__
<< "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
779 formatter
.flush(*_dout
);
781 ceph_assert(context_stacks
.empty() || going_down
); // assert on deadlock
784 for (auto stack
: context_stacks
) {
785 ldout(cct
, 20) << "clearing stack on run() exit: stack=" << (void *)stack
<< " nref=" << stack
->get_nref() << dendl
;
788 run_contexts
.erase(run_context
);
794 int RGWCoroutinesManager::run(const DoutPrefixProvider
*dpp
, RGWCoroutine
*op
)
799 list
<RGWCoroutinesStack
*> stacks
;
800 RGWCoroutinesStack
*stack
= allocate_stack();
804 stacks
.push_back(stack
);
806 int r
= run(dpp
, stacks
);
808 ldpp_dout(dpp
, 20) << "run(stacks) returned r=" << r
<< dendl
;
810 r
= op
->get_ret_status();
817 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
)
819 rgw_io_id io_id
{get_next_io_id(), -1};
820 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifier(completion_mgr
, io_id
, (void *)stack
);
821 completion_mgr
->register_completion_notifier(cn
);
825 void RGWCoroutinesManager::dump(Formatter
*f
) const {
826 std::shared_lock rl
{lock
};
828 f
->open_array_section("run_contexts");
829 for (auto& i
: run_contexts
) {
830 f
->open_object_section("context");
831 ::encode_json("id", i
.first
, f
);
832 f
->open_array_section("entries");
833 for (auto& s
: i
.second
) {
834 ::encode_json("entry", *s
, f
);
842 RGWCoroutinesStack
*RGWCoroutinesManager::allocate_stack() {
843 return new RGWCoroutinesStack(cct
, this);
846 string
RGWCoroutinesManager::get_id()
856 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager
*mgr
)
858 std::unique_lock wl
{lock
};
859 if (managers
.find(mgr
) == managers
.end()) {
860 managers
.insert(mgr
);
865 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager
*mgr
)
867 std::unique_lock wl
{lock
};
868 if (managers
.find(mgr
) != managers
.end()) {
874 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
876 AdminSocket
*admin_socket
= cct
->get_admin_socket();
877 if (!admin_command
.empty()) {
878 admin_socket
->unregister_commands(this);
882 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string
& command
)
884 AdminSocket
*admin_socket
= cct
->get_admin_socket();
885 if (!admin_command
.empty()) {
886 admin_socket
->unregister_commands(this);
888 admin_command
= command
;
889 int r
= admin_socket
->register_command(admin_command
, this,
890 "dump current coroutines stack state");
892 lderr(cct
) << "ERROR: fail to register admin socket command (r=" << r
<< ")" << dendl
;
898 int RGWCoroutinesManagerRegistry::call(std::string_view command
,
899 const cmdmap_t
& cmdmap
,
903 std::shared_lock rl
{lock
};
904 ::encode_json("cr_managers", *this, f
);
908 void RGWCoroutinesManagerRegistry::dump(Formatter
*f
) const {
909 f
->open_array_section("coroutine_managers");
910 for (auto m
: managers
) {
911 ::encode_json("entry", *m
, f
);
916 void RGWCoroutine::call(RGWCoroutine
*op
)
921 // the call()er expects this to set a retcode
926 RGWCoroutinesStack
*RGWCoroutine::spawn(RGWCoroutine
*op
, bool wait
)
928 return stack
->spawn(this, op
, wait
);
931 RGWCoroutinesStack
*RGWCoroutine::prealloc_stack()
933 return stack
->prealloc_stack();
936 uint64_t RGWCoroutine::prealloc_stack_id()
938 return prealloc_stack()->get_id();
941 bool RGWCoroutine::collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
) /* returns true if needs to be called again */
943 return stack
->collect(this, ret
, skip_stack
, stack_id
);
946 bool RGWCoroutine::collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
) /* returns true if found a stack to collect */
948 return stack
->collect_next(this, ret
, collected_stack
);
951 int RGWCoroutine::wait(const utime_t
& interval
)
953 return stack
->wait(interval
);
956 void RGWCoroutine::wait_for_child()
958 /* should only wait for child if there is a child that is not done yet, and no complete children */
959 if (spawned
.entries
.empty()) {
962 for (vector
<RGWCoroutinesStack
*>::iterator iter
= spawned
.entries
.begin(); iter
!= spawned
.entries
.end(); ++iter
) {
963 if ((*iter
)->is_done()) {
967 stack
->set_wait_for_child(true);
970 string
RGWCoroutine::to_str() const
972 return typeid(*this).name();
975 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
)
977 out
<< "cr:s=" << (void *)cr
.get_stack() << ":op=" << (void *)&cr
<< ":" << typeid(cr
).name();
981 bool RGWCoroutine::drain_children(int num_cr_left
,
982 RGWCoroutinesStack
*skip_stack
,
983 std::optional
<std::function
<void(uint64_t stack_id
, int ret
)> > cb
)
986 ceph_assert(num_cr_left
>= 0);
987 if (num_cr_left
== 0 && skip_stack
) {
990 reenter(&drain_status
.cr
) {
991 while (num_spawned() > (size_t)num_cr_left
) {
992 yield
wait_for_child();
997 again
= collect(&ret
, skip_stack
, &stack_id
);
999 ldout(cct
, 10) << "collect() returned ret=" << ret
<< dendl
;
1000 /* we should have reported this error */
1001 log_error() << "ERROR: collect() returned error (ret=" << ret
<< ")";
1004 (*cb
)(stack_id
, ret
);
1013 bool RGWCoroutine::drain_children(int num_cr_left
,
1014 std::optional
<std::function
<int(uint64_t stack_id
, int ret
)> > cb
)
1017 ceph_assert(num_cr_left
>= 0);
1019 reenter(&drain_status
.cr
) {
1020 while (num_spawned() > (size_t)num_cr_left
) {
1021 yield
wait_for_child();
1026 again
= collect(&ret
, nullptr, &stack_id
);
1028 ldout(cct
, 10) << "collect() returned ret=" << ret
<< dendl
;
1029 /* we should have reported this error */
1030 log_error() << "ERROR: collect() returned error (ret=" << ret
<< ")";
1032 if (cb
&& !drain_status
.should_exit
) {
1033 int r
= (*cb
)(stack_id
, ret
);
1035 drain_status
.ret
= r
;
1036 drain_status
.should_exit
= true;
1037 num_cr_left
= 0; /* need to drain all */
1047 void RGWCoroutine::wakeup()
1052 RGWCoroutinesEnv
*RGWCoroutine::get_env() const
1054 return stack
->get_env();
1057 void RGWCoroutine::dump(Formatter
*f
) const {
1058 if (!description
.str().empty()) {
1059 encode_json("description", description
.str(), f
);
1061 encode_json("type", to_str(), f
);
1062 if (!spawned
.entries
.empty()) {
1063 f
->open_array_section("spawned");
1064 for (auto& i
: spawned
.entries
) {
1066 snprintf(buf
, sizeof(buf
), "%p", (void *)i
);
1067 encode_json("stack", string(buf
), f
);
1071 if (!status
.history
.empty()) {
1072 encode_json("history", status
.history
, f
);
1075 if (!status
.status
.str().empty()) {
1076 f
->open_object_section("status");
1077 encode_json("status", status
.status
.str(), f
);
1078 encode_json("timestamp", status
.timestamp
, f
);
1083 RGWSimpleCoroutine::~RGWSimpleCoroutine()
1085 if (!called_cleanup
) {
1090 void RGWSimpleCoroutine::call_cleanup()
1092 called_cleanup
= true;
1096 int RGWSimpleCoroutine::operate(const DoutPrefixProvider
*dpp
)
1100 yield
return state_init();
1101 yield
return state_send_request(dpp
);
1102 yield
return state_request_complete();
1103 yield
return state_all_complete();
1106 return set_state(RGWCoroutine_Done
, ret
);
1111 int RGWSimpleCoroutine::state_init()
1116 return set_state(RGWCoroutine_Error
, ret
);
1121 int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider
*dpp
)
1123 int ret
= send_request(dpp
);
1126 return set_state(RGWCoroutine_Error
, ret
);
1131 int RGWSimpleCoroutine::state_request_complete()
1133 int ret
= request_complete();
1136 return set_state(RGWCoroutine_Error
, ret
);
1141 int RGWSimpleCoroutine::state_all_complete()
1146 return set_state(RGWCoroutine_Error
, ret
);