1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
9 #pragma push_macro("_ASSERT_H")
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
16 #pragma pop_macro("_ASSERT_H")
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
25 #include "rgw_common.h"
26 #include <boost/asio/coroutine.hpp>
// Value the RGWCoroutinesManager constructor uses to initialise its
// ops_window member. What the window bounds is not visible in this header.
30 #define RGW_ASYNC_OPS_MGR_WINDOW 100
32 class RGWCoroutinesStack
;
33 class RGWCoroutinesManager
;
34 class RGWAioCompletionNotifier
;
36 class RGWCompletionManager
: public RefCountedObject
{
37 friend class RGWCoroutinesManager
;
41 struct io_completion
{
45 list
<io_completion
> complete_reqs
;
46 set
<rgw_io_id
> complete_reqs_set
;
47 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
55 std::atomic
<bool> going_down
= { false };
57 map
<void *, void *> waiters
;
// Underscore-prefixed counterparts of wakeup() and complete(); they take the
// same arguments as the public versions.
// NOTE(review): presumably these expect the caller to already hold the
// manager's lock — confirm against the implementation file.
62 void _wakeup(void *opaque
);
63 void _complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
// Construct a completion manager for the given CephContext. `explicit`
// prevents implicit conversion from a bare CephContext pointer.
65 explicit RGWCompletionManager(CephContext
*_cct
);
// Overrides the base-class destructor; defined out of line.
66 ~RGWCompletionManager() override
;
// Report completion of io_id on behalf of notifier cn. user_info is the
// opaque pointer attached to the request; RGWAioCompletionNotifier passes
// its own user_data here.
68 void complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
// Fetch the next completed request into *io.
// NOTE(review): presumably blocks until one is available and returns
// 0 / negative error code — confirm against the implementation.
69 int get_next(io_completion
*io
);
// Variant of get_next() that reports its outcome as a bool.
// NOTE(review): presumably returns false without waiting when nothing is
// queued — confirm against the implementation.
70 bool try_get_next(io_completion
*io
);
/* wait for interval length to complete user_info */
// opaque:    caller-chosen pointer identifying the waiter (wakeup() takes
//            the same kind of pointer)
// interval:  length of the wait
// user_info: opaque pointer carried along with the wait
77 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
// NOTE(review): presumably ends early the wait registered under `opaque`
// by wait_interval() — confirm against the implementation.
78 void wakeup(void *opaque
);
// Track a notifier with this manager. RGWCoroutinesManager's
// create_completion_notifier() calls this on every notifier it allocates.
80 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
// Stop tracking a notifier. ~RGWAioCompletionNotifier() calls this when the
// notifier was still marked as registered.
81 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
84 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
85 class RGWAioCompletionNotifier
: public RefCountedObject
{
86 librados::AioCompletion
*c
;
87 RGWCompletionManager
*completion_mgr
;
94 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, const rgw_io_id
& _io_id
, void *_user_data
);
95 ~RGWAioCompletionNotifier() override
{
98 bool need_unregister
= registered
;
100 completion_mgr
->get();
104 if (need_unregister
) {
105 completion_mgr
->unregister_completion_notifier(this);
106 completion_mgr
->put();
110 librados::AioCompletion
*completion() {
115 Mutex::Locker
l(lock
);
129 completion_mgr
->get();
132 completion_mgr
->complete(this, io_id
, user_data
);
133 completion_mgr
->put();
138 // completion notifier with opaque payload (ie a reference-counted pointer)
139 template <typename T
>
140 class RGWAioCompletionNotifierWith
: public RGWAioCompletionNotifier
{
143 RGWAioCompletionNotifierWith(RGWCompletionManager
*mgr
,
144 const rgw_io_id
& io_id
, void *user_data
,
146 : RGWAioCompletionNotifier(mgr
, io_id
, user_data
), value(std::move(value
))
150 struct RGWCoroutinesEnv
{
151 uint64_t run_context
;
152 RGWCoroutinesManager
*manager
;
153 list
<RGWCoroutinesStack
*> *scheduled_stacks
;
154 RGWCoroutinesStack
*stack
;
156 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
/*
 * State of an RGWCoroutine. A coroutine is constructed in RGWCoroutine_Run;
 * RGWCoroutine::is_done() treats both negative values as finished and
 * is_error() singles out RGWCoroutine_Error.
 */
enum RGWCoroutineState {
  RGWCoroutine_Error = -2, /* finished, with an error */
  RGWCoroutine_Done  = -1, /* finished */
  RGWCoroutine_Run   =  0, /* initial state */
};
165 struct rgw_spawned_stacks
{
166 vector
<RGWCoroutinesStack
*> entries
;
168 rgw_spawned_stacks() {}
170 void add_pending(RGWCoroutinesStack
*s
) {
171 entries
.push_back(s
);
174 void inherit(rgw_spawned_stacks
*source
) {
175 for (vector
<RGWCoroutinesStack
*>::iterator iter
= source
->entries
.begin();
176 iter
!= source
->entries
.end(); ++iter
) {
179 source
->entries
.clear();
185 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
186 friend class RGWCoroutinesStack
;
192 StatusItem(utime_t
& t
, const string
& s
) : timestamp(t
), status(s
) {}
194 void dump(Formatter
*f
) const;
// Value the Status constructor assigns to its max_history member.
// NOTE(review): presumably caps the length of the `history` deque — confirm
// against the implementation of set_status().
197 #define MAX_COROUTINE_HISTORY 10
207 explicit Status(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY
) {}
209 deque
<StatusItem
> history
;
211 stringstream
& set_status();
214 stringstream description
;
218 boost::asio::coroutine drain_cr
;
222 RGWCoroutinesStack
*stack
;
226 rgw_spawned_stacks spawned
;
228 stringstream error_stream
;
230 int set_state(int s
, int ret
= 0) {
235 int set_cr_error(int ret
) {
236 return set_state(RGWCoroutine_Error
, ret
);
239 return set_state(RGWCoroutine_Done
, 0);
241 void set_io_blocked(bool flag
);
243 void reset_description() {
244 description
.str(string());
247 stringstream
& set_description() {
250 stringstream
& set_status() {
251 return status
.set_status();
254 stringstream
& set_status(const string
& s
) {
255 stringstream
& status
= set_status();
260 virtual int operate_wrapper() {
264 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
265 ~RGWCoroutine() override
;
267 virtual int operate() = 0;
269 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
270 bool is_error() { return (state
== RGWCoroutine_Error
); }
272 stringstream
& log_error() { return error_stream
; }
274 return error_stream
.str();
277 void set_retcode(int r
) {
281 int get_ret_status() {
// Scheduling primitives available to a coroutine body. Their definitions
// live outside this header.
285 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
286 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
287 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
288 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
// interval: length of the wait.
290 int wait(const utime_t
& interval
);
// NOTE(review): the drain_all()/drain_all_but()/drain_all_but_stack() macros
// wrap this call in yield_until_true(), which keeps yielding while the result
// is false. That is the opposite of what the trailing comment says — confirm
// which one is right against the implementation.
291 bool drain_children(int num_cr_left
, RGWCoroutinesStack
*skip_stack
= NULL
); /* returns true if needed to be called again */
293 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
295 size_t num_spawned() {
296 return spawned
.entries
.size();
299 void wait_for_child();
301 virtual string
to_str() const;
303 RGWCoroutinesStack
*get_stack() const {
307 RGWCoroutinesEnv
*get_env() const;
309 void dump(Formatter
*f
) const;
311 void init_new_io(RGWIOProvider
*io_provider
); /* only links the default io id */
313 int io_block(int ret
= 0) {
314 return io_block(ret
, -1);
// Two-argument overloads of io_block(); the single-argument inline form
// calls io_block(ret, -1).
// ret:   value associated with the block request.
//        NOTE(review): presumably handed back as the return value — confirm.
// io_id: which IO to block on, as a raw 64-bit id or as an rgw_io_id.
316 int io_block(int ret
, int64_t io_id
);
317 int io_block(int ret
, const rgw_io_id
& io_id
);
319 io_complete(rgw_io_id
{});
321 void io_complete(const rgw_io_id
& io_id
);
324 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
);
326 #define yield_until_true(x) \
329 yield _yield_ret = x; \
330 } while (!_yield_ret); \
331 _yield_ret = false; \
// Reset drain_cr to a fresh boost::asio::coroutine, then keep yielding until
// drain_children(0) evaluates to true.
// Expands to TWO statements: do not use it as the sole body of an unbraced
// if/else/loop.
334 #define drain_all() \
335 drain_cr = boost::asio::coroutine(); \
336 yield_until_true(drain_children(0))
// Same as drain_all(), but passes n as drain_children()'s num_cr_left
// argument instead of 0.
// Expands to TWO statements: do not use it as the sole body of an unbraced
// if/else/loop.
338 #define drain_all_but(n) \
339 drain_cr = boost::asio::coroutine(); \
340 yield_until_true(drain_children(n))
// Same as drain_all(), but calls drain_children(1, stack): num_cr_left is 1
// and `stack` is passed as the skip_stack argument.
// Expands to TWO statements: do not use it as the sole body of an unbraced
// if/else/loop.
342 #define drain_all_but_stack(stack) \
343 drain_cr = boost::asio::coroutine(); \
344 yield_until_true(drain_children(1, stack))
347 class RGWConsumerCR
: public RGWCoroutine
{
351 explicit RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
354 return !product
.empty();
357 void wait_for_product() {
358 if (!has_product()) {
364 if (product
.empty()) {
367 *p
= product
.front();
// Hand one product to the consumer; the out-of-line definition appends p to
// `product` with push_back().
// NOTE(review): `wakeup` presumably controls whether the consumer is woken
// afterwards — that part of the definition is not visible here.
372 void receive(const T
& p
, bool wakeup
= true);
// Hand a whole list to the consumer; the out-of-line definition splices l
// onto the end of `product`, which leaves l empty.
373 void receive(list
<T
>& l
, bool wakeup
= true);
376 class RGWCoroutinesStack
: public RefCountedObject
{
377 friend class RGWCoroutine
;
378 friend class RGWCoroutinesManager
;
382 RGWCoroutinesManager
*ops_mgr
;
384 list
<RGWCoroutine
*> ops
;
385 list
<RGWCoroutine
*>::iterator pos
;
387 rgw_spawned_stacks spawned
;
389 set
<RGWCoroutinesStack
*> blocked_by_stack
;
390 set
<RGWCoroutinesStack
*> blocking_stacks
;
392 map
<int64_t, rgw_io_id
> io_finish_ids
;
393 rgw_io_id io_blocked_id
;
399 bool interval_wait_flag
;
403 bool is_waiting_for_child
;
410 RGWCoroutinesEnv
*env
;
411 RGWCoroutinesStack
*parent
;
// Stack-level counterparts of RGWCoroutine::spawn()/collect()/collect_next()
// that additionally take the coroutine the request is made for
// (source_op / op).
413 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
414 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
415 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
// _ops_mgr: the manager this stack belongs to.
// start:    optional first coroutine; defaults to NULL.
//           NOTE(review): presumably pushed onto the stack when non-null —
//           confirm against the implementation.
417 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
// Overrides the base-class destructor; defined out of line.
418 ~RGWCoroutinesStack() override
;
// Run the stack under the given environment. The meaning of the int result
// is not visible in this header.
420 int operate(RGWCoroutinesEnv
*env
);
428 bool is_blocked_by_stack() {
429 return !blocked_by_stack
.empty();
431 void set_io_blocked(bool flag
) {
434 void set_io_blocked_id(const rgw_io_id
& io_id
) {
435 io_blocked_id
= io_id
;
437 bool is_io_blocked() {
438 return blocked_flag
&& !done_flag
;
440 bool can_io_unblock(const rgw_io_id
& io_id
) {
441 return ((io_blocked_id
.id
< 0) ||
442 io_blocked_id
.intersects(io_id
));
// IO-completion bookkeeping for the given io_id; both are defined out of line.
// NOTE(review): presumably try_io_unblock() reports whether the stack could
// be unblocked by io_id (see can_io_unblock()), and consume_io_finish()
// reports whether a recorded finish for io_id was found in io_finish_ids —
// confirm against the implementation.
444 bool try_io_unblock(const rgw_io_id
& io_id
);
445 bool consume_io_finish(const rgw_io_id
& io_id
);
446 void set_interval_wait(bool flag
) {
447 interval_wait_flag
= flag
;
449 bool is_interval_waiting() {
450 return interval_wait_flag
;
452 void set_sleeping(bool flag
) {
453 bool wakeup
= sleep_flag
& !flag
;
462 void set_is_scheduled(bool flag
) {
467 return is_blocked_by_stack() || is_sleeping() ||
468 is_io_blocked() || waiting_for_child() ;
474 int get_ret_status() {
// Stack-level scheduling entry points, parallel to the RGWCoroutine methods
// of the same names; defined out of line.
480 void call(RGWCoroutine
*next_op
);
481 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
// NOTE(review): presumably pops the current coroutine and propagates retcode
// to its caller — confirm against the implementation.
482 int unwind(int retcode
);
// interval: length of the wait.
484 int wait(const utime_t
& interval
);
487 io_complete(rgw_io_id
{});
489 void io_complete(const rgw_io_id
& io_id
);
491 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
// Create a completion notifier for this stack.
495 RGWAioCompletionNotifier
*create_completion_notifier();
// As above, with an extra payload. The out-of-line definition forwards
// (this, std::move(value)) to ops_mgr->create_completion_notifier().
496 template <typename T
>
497 RGWAioCompletionNotifier
*create_completion_notifier(T value
);
// Accessor for the completion manager; defined out of line.
498 RGWCompletionManager
*get_completion_mgr();
500 void set_blocked_by(RGWCoroutinesStack
*s
) {
501 blocked_by_stack
.insert(s
);
502 s
->blocking_stacks
.insert(this);
505 void set_wait_for_child(bool flag
) {
506 is_waiting_for_child
= flag
;
509 bool waiting_for_child() {
510 return is_waiting_for_child
;
513 bool unblock_stack(RGWCoroutinesStack
**s
);
515 RGWCoroutinesEnv
*get_env() const { return env
; }
517 void dump(Formatter
*f
) const;
519 void init_new_io(RGWIOProvider
*io_provider
);
523 void RGWConsumerCR
<T
>::receive(list
<T
>& l
, bool wakeup
)
525 product
.splice(product
.end(), l
);
533 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
535 product
.push_back(p
);
541 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
544 set
<RGWCoroutinesManager
*> managers
;
547 string admin_command
;
550 explicit RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutinesRegistry::lock") {}
551 ~RGWCoroutinesManagerRegistry() override
;
553 void add(RGWCoroutinesManager
*mgr
);
554 void remove(RGWCoroutinesManager
*mgr
);
556 int hook_to_admin_command(const string
& command
);
557 bool call(std::string_view command
, const cmdmap_t
& cmdmap
,
558 std::string_view format
, bufferlist
& out
) override
;
560 void dump(Formatter
*f
) const;
563 class RGWCoroutinesManager
{
565 std::atomic
<bool> going_down
= { false };
567 std::atomic
<int64_t> run_context_count
= { 0 };
568 map
<uint64_t, set
<RGWCoroutinesStack
*> > run_contexts
;
570 std::atomic
<int64_t> max_io_id
= { 0 };
574 RGWIOIDProvider io_id_provider
;
576 void handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
,
577 RGWCompletionManager::io_completion
& io
, int *waiting_count
);
579 RGWCompletionManager
*completion_mgr
;
580 RGWCoroutinesManagerRegistry
*cr_registry
;
586 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
588 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
), lock("RGWCoroutinesManager::lock"),
589 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
590 completion_mgr
= new RGWCompletionManager(cct
);
592 cr_registry
->add(this);
595 virtual ~RGWCoroutinesManager() {
597 completion_mgr
->put();
599 cr_registry
->remove(this);
603 int run(list
<RGWCoroutinesStack
*>& ops
);
604 int run(RGWCoroutine
*op
);
606 bool expected
= false;
607 if (going_down
.compare_exchange_strong(expected
, true)) {
608 completion_mgr
->go_down();
612 virtual void report_error(RGWCoroutinesStack
*op
);
614 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
615 template <typename T
>
616 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
, T value
);
617 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
// Queue `stack` to run under `env`.
// NOTE(review): _schedule() is presumably the variant that expects the lock
// to be held already — confirm against the implementation.
619 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
620 void _schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
// Create a new coroutine stack. Ownership of the result is not visible in
// this header.
621 RGWCoroutinesStack
*allocate_stack();
// Produce the next IO id; the templated create_completion_notifier() uses
// it as the first field of a new rgw_io_id.
623 int64_t get_next_io_id();
// Manager-level entry points acting on behalf of coroutine cr.
625 void set_sleeping(RGWCoroutine
*cr
, bool flag
);
626 void io_complete(RGWCoroutine
*cr
, const rgw_io_id
& io_id
);
// Overridable identifier for this manager.
628 virtual string
get_id();
// Write this manager's state to the formatter.
629 void dump(Formatter
*f
) const;
631 RGWIOIDProvider
& get_io_id_provider() {
632 return io_id_provider
;
636 template <typename T
>
637 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
, T value
)
639 rgw_io_id io_id
{get_next_io_id(), -1};
640 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifierWith
<T
>(completion_mgr
, io_id
, (void *)stack
, std::move(value
));
641 completion_mgr
->register_completion_notifier(cn
);
645 template <typename T
>
646 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier(T value
)
648 return ops_mgr
->create_completion_notifier(this, std::move(value
));
651 class RGWSimpleCoroutine
: public RGWCoroutine
{
654 int operate() override
;
657 int state_send_request();
658 int state_request_complete();
659 int state_all_complete();
664 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
665 ~RGWSimpleCoroutine() override
;
667 virtual int init() { return 0; }
668 virtual int send_request() = 0;
669 virtual int request_complete() = 0;
670 virtual int finish() { return 0; }
671 virtual void request_cleanup() {}