2 #include "common/ceph_json.h"
3 #include "rgw_coroutine.h"
5 // re-include our assert to clobber the system one; fix dout:
6 #include "include/assert.h"
8 #include <boost/asio/yield.hpp>
10 #define dout_subsys ceph_subsys_rgw
13 class RGWCompletionManager::WaitContext
: public Context
{
14 RGWCompletionManager
*manager
;
17 WaitContext(RGWCompletionManager
*_cm
, void *_opaque
) : manager(_cm
), opaque(_opaque
) {}
18 void finish(int r
) override
{
19 manager
->_wakeup(opaque
);
23 RGWCompletionManager::RGWCompletionManager(CephContext
*_cct
) : cct(_cct
), lock("RGWCompletionManager::lock"),
29 RGWCompletionManager::~RGWCompletionManager()
31 Mutex::Locker
l(lock
);
32 timer
.cancel_all_events();
36 void RGWCompletionManager::complete(RGWAioCompletionNotifier
*cn
, void *user_info
)
38 Mutex::Locker
l(lock
);
39 _complete(cn
, user_info
);
42 void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier
*cn
)
44 Mutex::Locker
l(lock
);
50 void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier
*cn
)
52 Mutex::Locker
l(lock
);
58 void RGWCompletionManager::_complete(RGWAioCompletionNotifier
*cn
, void *user_info
)
63 complete_reqs
.push_back(user_info
);
67 int RGWCompletionManager::get_next(void **user_info
)
69 Mutex::Locker
l(lock
);
70 while (complete_reqs
.empty()) {
76 *user_info
= complete_reqs
.front();
77 complete_reqs
.pop_front();
81 bool RGWCompletionManager::try_get_next(void **user_info
)
83 Mutex::Locker
l(lock
);
84 if (complete_reqs
.empty()) {
87 *user_info
= complete_reqs
.front();
88 complete_reqs
.pop_front();
92 void RGWCompletionManager::go_down()
94 Mutex::Locker
l(lock
);
102 void RGWCompletionManager::wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
)
104 Mutex::Locker
l(lock
);
105 assert(waiters
.find(opaque
) == waiters
.end());
106 waiters
[opaque
] = user_info
;
107 timer
.add_event_after(interval
, new WaitContext(this, opaque
));
110 void RGWCompletionManager::wakeup(void *opaque
)
112 Mutex::Locker
l(lock
);
116 void RGWCompletionManager::_wakeup(void *opaque
)
118 map
<void *, void *>::iterator iter
= waiters
.find(opaque
);
119 if (iter
!= waiters
.end()) {
120 void *user_id
= iter
->second
;
122 _complete(NULL
, user_id
);
126 RGWCoroutine::~RGWCoroutine() {
127 for (auto stack
: spawned
.entries
) {
132 void RGWCoroutine::set_io_blocked(bool flag
) {
133 stack
->set_io_blocked(flag
);
136 void RGWCoroutine::set_sleeping(bool flag
) {
137 stack
->set_sleeping(flag
);
140 int RGWCoroutine::io_block(int ret
) {
141 set_io_blocked(true);
145 void RGWCoroutine::StatusItem::dump(Formatter
*f
) const {
146 ::encode_json("timestamp", timestamp
, f
);
147 ::encode_json("status", status
, f
);
150 stringstream
& RGWCoroutine::Status::set_status()
152 RWLock::WLocker
l(lock
);
153 string s
= status
.str();
154 status
.str(string());
155 if (!timestamp
.is_zero()) {
156 history
.push_back(StatusItem(timestamp
, s
));
158 if (history
.size() > (size_t)max_history
) {
161 timestamp
= ceph_clock_now();
166 RGWCoroutinesStack::RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
) : cct(_cct
), ops_mgr(_ops_mgr
),
167 done_flag(false), error_flag(false), blocked_flag(false),
168 sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
169 retcode(0), run_count(0),
170 env(NULL
), parent(NULL
)
173 ops
.push_back(start
);
178 RGWCoroutinesStack::~RGWCoroutinesStack()
180 for (auto op
: ops
) {
184 for (auto stack
: spawned
.entries
) {
189 int RGWCoroutinesStack::operate(RGWCoroutinesEnv
*_env
)
192 RGWCoroutine
*op
= *pos
;
194 ldout(cct
, 20) << *op
<< ": operate()" << dendl
;
195 int r
= op
->operate();
197 ldout(cct
, 20) << *op
<< ": operate() returned r=" << r
<< dendl
;
200 error_flag
= op
->is_error();
204 r
= unwind(op_retcode
);
206 done_flag
= (pos
== ops
.end());
208 retcode
= op_retcode
;
213 /* should r ever be negative at this point? */
219 string
RGWCoroutinesStack::error_str()
221 if (pos
!= ops
.end()) {
222 return (*pos
)->error_str();
227 void RGWCoroutinesStack::call(RGWCoroutine
*next_op
) {
231 ops
.push_back(next_op
);
232 if (pos
!= ops
.end()) {
239 RGWCoroutinesStack
*RGWCoroutinesStack::spawn(RGWCoroutine
*source_op
, RGWCoroutine
*op
, bool wait
)
245 rgw_spawned_stacks
*s
= (source_op
? &source_op
->spawned
: &spawned
);
247 RGWCoroutinesStack
*stack
= env
->manager
->allocate_stack();
248 s
->add_pending(stack
);
249 stack
->parent
= this;
251 stack
->get(); /* we'll need to collect the stack */
254 env
->manager
->schedule(env
, stack
);
257 set_blocked_by(stack
);
263 RGWCoroutinesStack
*RGWCoroutinesStack::spawn(RGWCoroutine
*op
, bool wait
)
265 return spawn(NULL
, op
, wait
);
268 int RGWCoroutinesStack::wait(const utime_t
& interval
)
270 RGWCompletionManager
*completion_mgr
= env
->manager
->get_completion_mgr();
271 completion_mgr
->wait_interval((void *)this, interval
, (void *)this);
272 set_io_blocked(true);
273 set_interval_wait(true);
277 void RGWCoroutinesStack::wakeup()
279 RGWCompletionManager
*completion_mgr
= env
->manager
->get_completion_mgr();
280 completion_mgr
->wakeup((void *)this);
283 int RGWCoroutinesStack::unwind(int retcode
)
285 rgw_spawned_stacks
*src_spawned
= &(*pos
)->spawned
;
287 if (pos
== ops
.begin()) {
288 spawned
.inherit(src_spawned
);
296 RGWCoroutine
*op
= *pos
;
297 op
->set_retcode(retcode
);
298 op
->spawned
.inherit(src_spawned
);
303 bool RGWCoroutinesStack::collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
) /* returns true if needs to be called again */
306 rgw_spawned_stacks
*s
= (op
? &op
->spawned
: &spawned
);
308 vector
<RGWCoroutinesStack
*> new_list
;
310 for (vector
<RGWCoroutinesStack
*>::iterator iter
= s
->entries
.begin(); iter
!= s
->entries
.end(); ++iter
) {
311 RGWCoroutinesStack
*stack
= *iter
;
312 if (stack
== skip_stack
|| !stack
->is_done()) {
313 new_list
.push_back(stack
);
314 if (!stack
->is_done()) {
315 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " is still running" << dendl
;
316 } else if (stack
== skip_stack
) {
317 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " explicitly skipping stack" << dendl
;
321 int r
= stack
->get_ret_status();
325 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " encountered error (r=" << r
<< "), skipping next stacks" << dendl
;
326 new_list
.insert(new_list
.end(), ++iter
, s
->entries
.end());
327 done
&= (iter
!= s
->entries
.end());
331 ldout(cct
, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack
<< " is complete" << dendl
;
334 s
->entries
.swap(new_list
);
338 bool RGWCoroutinesStack::collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
) /* returns true if found a stack to collect */
340 rgw_spawned_stacks
*s
= (op
? &op
->spawned
: &spawned
);
343 if (collected_stack
) {
344 *collected_stack
= NULL
;
347 for (vector
<RGWCoroutinesStack
*>::iterator iter
= s
->entries
.begin(); iter
!= s
->entries
.end(); ++iter
) {
348 RGWCoroutinesStack
*stack
= *iter
;
349 if (!stack
->is_done()) {
352 int r
= stack
->get_ret_status();
357 if (collected_stack
) {
358 *collected_stack
= stack
;
362 s
->entries
.erase(iter
);
369 bool RGWCoroutinesStack::collect(int *ret
, RGWCoroutinesStack
*skip_stack
) /* returns true if needs to be called again */
371 return collect(NULL
, ret
, skip_stack
);
374 static void _aio_completion_notifier_cb(librados::completion_t cb
, void *arg
)
376 ((RGWAioCompletionNotifier
*)arg
)->cb();
379 RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, void *_user_data
) : completion_mgr(_mgr
),
380 user_data(_user_data
), lock("RGWAioCompletionNotifier"), registered(true) {
381 c
= librados::Rados::aio_create_completion((void *)this, NULL
,
382 _aio_completion_notifier_cb
);
385 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier()
387 return ops_mgr
->create_completion_notifier(this);
390 RGWCompletionManager
*RGWCoroutinesStack::get_completion_mgr()
392 return ops_mgr
->get_completion_mgr();
395 bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack
**s
)
397 if (blocking_stacks
.empty()) {
401 set
<RGWCoroutinesStack
*>::iterator iter
= blocking_stacks
.begin();
403 blocking_stacks
.erase(iter
);
404 (*s
)->blocked_by_stack
.erase(this);
409 void RGWCoroutinesManager::report_error(RGWCoroutinesStack
*op
)
414 string err
= op
->error_str();
418 lderr(cct
) << "ERROR: failed operation: " << op
->error_str() << dendl
;
421 void RGWCoroutinesStack::dump(Formatter
*f
) const {
424 ::encode_json("stack", ss
.str(), f
);
425 ::encode_json("run_count", run_count
, f
);
426 f
->open_array_section("ops");
427 for (auto& i
: ops
) {
428 encode_json("op", *i
, f
);
433 void RGWCoroutinesManager::handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
, RGWCoroutinesStack
*stack
, int *blocked_count
)
435 RWLock::WLocker
wl(lock
);
437 stack
->set_io_blocked(false);
438 stack
->set_interval_wait(false);
439 if (!stack
->is_done()) {
440 scheduled_stacks
.push_back(stack
);
442 RWLock::WLocker
wl(lock
);
443 context_stacks
.erase(stack
);
448 void RGWCoroutinesManager::schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
)
450 assert(lock
.is_wlocked());
451 env
->scheduled_stacks
->push_back(stack
);
452 set
<RGWCoroutinesStack
*>& context_stacks
= run_contexts
[env
->run_context
];
453 context_stacks
.insert(stack
);
456 int RGWCoroutinesManager::run(list
<RGWCoroutinesStack
*>& stacks
)
459 int blocked_count
= 0;
460 int interval_wait_count
= 0;
461 bool canceled
= false; // set on going_down
462 RGWCoroutinesEnv env
;
464 uint64_t run_context
= ++run_context_count
;
467 set
<RGWCoroutinesStack
*>& context_stacks
= run_contexts
[run_context
];
468 list
<RGWCoroutinesStack
*> scheduled_stacks
;
469 for (auto& st
: stacks
) {
470 context_stacks
.insert(st
);
471 scheduled_stacks
.push_back(st
);
475 env
.run_context
= run_context
;
477 env
.scheduled_stacks
= &scheduled_stacks
;
479 for (list
<RGWCoroutinesStack
*>::iterator iter
= scheduled_stacks
.begin(); iter
!= scheduled_stacks
.end() && !going_down
;) {
482 RGWCoroutinesStack
*stack
= *iter
;
485 ret
= stack
->operate(&env
);
486 stack
->set_is_scheduled(false);
488 ldout(cct
, 20) << "stack->operate() returned ret=" << ret
<< dendl
;
491 if (stack
->is_error()) {
495 bool op_not_blocked
= false;
497 if (stack
->is_io_blocked()) {
498 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is io blocked" << dendl
;
499 if (stack
->is_interval_waiting()) {
500 interval_wait_count
++;
503 } else if (stack
->is_blocked()) {
504 /* do nothing, we'll re-add the stack when the blocking stack is done,
505 * or when we're awaken
507 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is_blocked_by_stack()=" << stack
->is_blocked_by_stack()
508 << " is_sleeping=" << stack
->is_sleeping() << " waiting_for_child()=" << stack
->waiting_for_child() << dendl
;
509 } else if (stack
->is_done()) {
510 ldout(cct
, 20) << __func__
<< ":" << " stack=" << (void *)stack
<< " is done" << dendl
;
511 RGWCoroutinesStack
*s
;
512 while (stack
->unblock_stack(&s
)) {
513 if (!s
->is_blocked_by_stack() && !s
->is_done()) {
514 if (s
->is_io_blocked()) {
515 if (stack
->is_interval_waiting()) {
516 interval_wait_count
++;
524 if (stack
->parent
&& stack
->parent
->waiting_for_child()) {
525 stack
->parent
->set_wait_for_child(false);
526 stack
->parent
->schedule();
528 context_stacks
.erase(stack
);
532 op_not_blocked
= true;
537 if (!op_not_blocked
&& stack
) {
538 stack
->run_count
= 0;
543 RGWCoroutinesStack
*blocked_stack
;
544 while (completion_mgr
->try_get_next((void **)&blocked_stack
)) {
545 handle_unblocked_stack(context_stacks
, scheduled_stacks
, blocked_stack
, &blocked_count
);
549 * only account blocked operations that are not in interval_wait, these are stacks that
550 * were put on a wait without any real IO operations. While we mark these as io_blocked,
551 * these aren't really waiting for IOs
553 while (blocked_count
- interval_wait_count
>= ops_window
) {
554 ret
= completion_mgr
->get_next((void **)&blocked_stack
);
556 ldout(cct
, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret
<< dendl
;
558 handle_unblocked_stack(context_stacks
, scheduled_stacks
, blocked_stack
, &blocked_count
);
562 scheduled_stacks
.pop_front();
565 while (scheduled_stacks
.empty() && blocked_count
> 0) {
566 ret
= completion_mgr
->get_next((void **)&blocked_stack
);
568 ldout(cct
, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret
<< dendl
;
571 ldout(cct
, 5) << __func__
<< "(): was stopped, exiting" << dendl
;
576 handle_unblocked_stack(context_stacks
, scheduled_stacks
, blocked_stack
, &blocked_count
);
577 iter
= scheduled_stacks
.begin();
583 if (iter
== scheduled_stacks
.end()) {
584 iter
= scheduled_stacks
.begin();
589 if (!context_stacks
.empty() && !going_down
) {
590 JSONFormatter
formatter(true);
591 formatter
.open_array_section("context_stacks");
592 for (auto& s
: context_stacks
) {
593 ::encode_json("entry", *s
, &formatter
);
595 formatter
.close_section();
596 lderr(cct
) << __func__
<< "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
597 formatter
.flush(*_dout
);
599 assert(context_stacks
.empty() || going_down
); // assert on deadlock
602 for (auto stack
: context_stacks
) {
603 ldout(cct
, 20) << "clearing stack on run() exit: stack=" << (void *)stack
<< " nref=" << stack
->get_nref() << dendl
;
606 run_contexts
.erase(run_context
);
612 int RGWCoroutinesManager::run(RGWCoroutine
*op
)
617 list
<RGWCoroutinesStack
*> stacks
;
618 RGWCoroutinesStack
*stack
= allocate_stack();
622 stack
->schedule(&stacks
);
626 ldout(cct
, 20) << "run(stacks) returned r=" << r
<< dendl
;
628 r
= op
->get_ret_status();
635 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
)
637 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifier(completion_mgr
, (void *)stack
);
638 completion_mgr
->register_completion_notifier(cn
);
642 void RGWCoroutinesManager::dump(Formatter
*f
) const {
643 RWLock::RLocker
rl(lock
);
645 f
->open_array_section("run_contexts");
646 for (auto& i
: run_contexts
) {
647 f
->open_object_section("context");
648 ::encode_json("id", i
.first
, f
);
649 f
->open_array_section("entries");
650 for (auto& s
: i
.second
) {
651 ::encode_json("entry", *s
, f
);
659 RGWCoroutinesStack
*RGWCoroutinesManager::allocate_stack() {
660 return new RGWCoroutinesStack(cct
, this);
663 string
RGWCoroutinesManager::get_id()
673 void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager
*mgr
)
675 RWLock::WLocker
wl(lock
);
676 if (managers
.find(mgr
) == managers
.end()) {
677 managers
.insert(mgr
);
682 void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager
*mgr
)
684 RWLock::WLocker
wl(lock
);
685 if (managers
.find(mgr
) != managers
.end()) {
691 RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
693 AdminSocket
*admin_socket
= cct
->get_admin_socket();
694 if (!admin_command
.empty()) {
695 admin_socket
->unregister_command(admin_command
);
699 int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string
& command
)
701 AdminSocket
*admin_socket
= cct
->get_admin_socket();
702 if (!admin_command
.empty()) {
703 admin_socket
->unregister_command(admin_command
);
705 admin_command
= command
;
706 int r
= admin_socket
->register_command(admin_command
, admin_command
, this,
707 "dump current coroutines stack state");
709 lderr(cct
) << "ERROR: fail to register admin socket command (r=" << r
<< ")" << dendl
;
715 bool RGWCoroutinesManagerRegistry::call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
717 RWLock::RLocker
rl(lock
);
720 ::encode_json("cr_managers", *this, &f
);
726 void RGWCoroutinesManagerRegistry::dump(Formatter
*f
) const {
727 f
->open_array_section("coroutine_managers");
728 for (auto m
: managers
) {
729 ::encode_json("entry", *m
, f
);
734 void RGWCoroutine::call(RGWCoroutine
*op
)
739 RGWCoroutinesStack
*RGWCoroutine::spawn(RGWCoroutine
*op
, bool wait
)
741 return stack
->spawn(this, op
, wait
);
744 bool RGWCoroutine::collect(int *ret
, RGWCoroutinesStack
*skip_stack
) /* returns true if needs to be called again */
746 return stack
->collect(this, ret
, skip_stack
);
749 bool RGWCoroutine::collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
) /* returns true if found a stack to collect */
751 return stack
->collect_next(this, ret
, collected_stack
);
754 int RGWCoroutine::wait(const utime_t
& interval
)
756 return stack
->wait(interval
);
759 void RGWCoroutine::wait_for_child()
761 /* should only wait for child if there is a child that is not done yet, and no complete children */
762 if (spawned
.entries
.empty()) {
765 for (vector
<RGWCoroutinesStack
*>::iterator iter
= spawned
.entries
.begin(); iter
!= spawned
.entries
.end(); ++iter
) {
766 if ((*iter
)->is_done()) {
770 stack
->set_wait_for_child(true);
773 string
RGWCoroutine::to_str() const
775 return typeid(*this).name();
778 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
)
780 out
<< "cr:s=" << (void *)cr
.get_stack() << ":op=" << (void *)&cr
<< ":" << typeid(cr
).name();
784 bool RGWCoroutine::drain_children(int num_cr_left
, RGWCoroutinesStack
*skip_stack
)
787 assert(num_cr_left
>= 0);
788 if (num_cr_left
== 0 && skip_stack
) {
792 while (num_spawned() > (size_t)num_cr_left
) {
793 yield
wait_for_child();
795 while (collect(&ret
, skip_stack
)) {
797 ldout(cct
, 10) << "collect() returned ret=" << ret
<< dendl
;
798 /* we should have reported this error */
799 log_error() << "ERROR: collect() returned error (ret=" << ret
<< ")";
808 void RGWCoroutine::wakeup()
813 void RGWCoroutine::dump(Formatter
*f
) const {
814 if (!description
.str().empty()) {
815 encode_json("description", description
.str(), f
);
817 encode_json("type", to_str(), f
);
818 if (!spawned
.entries
.empty()) {
819 f
->open_array_section("spawned");
820 for (auto& i
: spawned
.entries
) {
822 snprintf(buf
, sizeof(buf
), "%p", (void *)i
);
823 encode_json("stack", string(buf
), f
);
827 if (!status
.history
.empty()) {
828 encode_json("history", status
.history
, f
);
831 if (!status
.status
.str().empty()) {
832 f
->open_object_section("status");
833 encode_json("status", status
.status
.str(), f
);
834 encode_json("timestamp", status
.timestamp
, f
);
839 RGWSimpleCoroutine::~RGWSimpleCoroutine()
841 if (!called_cleanup
) {
846 void RGWSimpleCoroutine::call_cleanup()
848 called_cleanup
= true;
852 int RGWSimpleCoroutine::operate()
856 yield
return state_init();
857 yield
return state_send_request();
858 yield
return state_request_complete();
859 yield
return state_all_complete();
862 return set_state(RGWCoroutine_Done
, ret
);
867 int RGWSimpleCoroutine::state_init()
872 return set_state(RGWCoroutine_Error
, ret
);
877 int RGWSimpleCoroutine::state_send_request()
879 int ret
= send_request();
882 return set_state(RGWCoroutine_Error
, ret
);
887 int RGWSimpleCoroutine::state_request_complete()
889 int ret
= request_complete();
892 return set_state(RGWCoroutine_Error
, ret
);
897 int RGWSimpleCoroutine::state_all_complete()
902 return set_state(RGWCoroutine_Error
, ret
);