1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
9 #pragma push_macro("_ASSERT_H")
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
16 #pragma pop_macro("_ASSERT_H")
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
25 #include "rgw_common.h"
26 #include "rgw_http_client_types.h"
28 #include <boost/asio/coroutine.hpp>
// Default async-ops window: RGWCoroutinesManager's constructor initializes its
// ops_window member from this value.
32 #define RGW_ASYNC_OPS_MGR_WINDOW 100
// Forward declarations: these classes are referenced by pointer in the
// declarations below before their own definitions appear later in the header.
34 class RGWCoroutinesStack
;
35 class RGWCoroutinesManager
;
36 class RGWAioCompletionNotifier
;
// RGWCompletionManager: reference-counted (RefCountedObject) collector of I/O
// completions for the coroutine framework.
// - complete() is what a notifier calls to report a finished I/O
//   (see RGWAioCompletionNotifier); get_next()/try_get_next() hand them out.
// - wait_interval()/wakeup() manage timed waits keyed by an opaque pointer.
// - register_/unregister_completion_notifier() track notifiers in `cns`.
// Bodies are defined out of line; notes marked "presumably" are inferred from
// names and should be confirmed against the implementation.
38 class RGWCompletionManager
: public RefCountedObject
{
// RGWCoroutinesManager is given access to the private members.
39 friend class RGWCoroutinesManager
;
// One completion record, filled in by get_next()/try_get_next().
// NOTE(review): its fields are not visible in this view.
43 struct io_completion
{
// Pending completions in arrival order, plus the set of their io ids
// (presumably for de-duplication / fast lookup -- confirm).
47 std::list
<io_completion
> complete_reqs
;
48 std::set
<rgw_io_id
> complete_reqs_set
;
// Notifiers are held through boost::intrusive_ptr, so each entry of `cns`
// holds a reference on its notifier.
49 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
50 std::set
<NotifierRef
> cns
;
// Mutex and condition variable for the manager's state; `cond` is presumably
// what get_next() waits on for new completions -- confirm.
52 ceph::mutex lock
= ceph::make_mutex("RGWCompletionManager::lock");
53 ceph::condition_variable cond
;
// Shutdown flag, presumably set by go_down() (which RGWCoroutinesManager calls
// when stopping). Atomic, so it may be read without holding `lock`.
57 std::atomic
<bool> going_down
= { false };
// Timed-wait bookkeeping keyed by the `opaque` pointer given to
// wait_interval()/wakeup(); presumably maps opaque -> user_info.
59 std::map
<void *, void *> waiters
;
// Underscore variants of wakeup()/complete(); by the usual convention the
// caller already holds `lock` -- confirm in the .cc.
64 void _wakeup(void *opaque
);
65 void _complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
67 explicit RGWCompletionManager(CephContext
*_cct
);
68 ~RGWCompletionManager() override
;
// Report that the I/O identified by io_id finished. cn is the reporting
// notifier and user_info the opaque value it carries.
70 void complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
// Fetch the next completion into *io. get_next() returns an int status and
// try_get_next() a bool -- presumably blocking vs. non-blocking; confirm.
71 int get_next(io_completion
*io
);
72 bool try_get_next(io_completion
*io
);
77 * wait for interval length to complete user_info
// Arrange for user_info to be completed after `interval`, keyed by opaque;
// wakeup(opaque) presumably ends such a wait early.
79 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
80 void wakeup(void *opaque
);
// Presumably add/remove cn in `cns`. Called from
// RGWCoroutinesManager::create_completion_notifier() and from the notifier's
// destructor respectively.
82 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
83 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
86 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
87 class RGWAioCompletionNotifier
: public RefCountedObject
{
// The wrapped librados completion, returned by completion().
88 librados::AioCompletion
*c
;
// Manager this notifier reports to. Calls into it below are bracketed by
// completion_mgr->get() / completion_mgr->put().
89 RGWCompletionManager
*completion_mgr
;
// Per-notifier mutex (presumably guarding the `registered` flag read below).
92 ceph::mutex lock
= ceph::make_mutex("RGWAioCompletionNotifier");
// _io_id and _user_data are what this notifier later passes to
// RGWCompletionManager::complete().
96 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, const rgw_io_id
& _io_id
, void *_user_data
);
// Destructor: snapshot `registered`; if the notifier was registered,
// unregister it from completion_mgr and drop the manager reference.
// NOTE(review): in this view completion_mgr->get() runs unconditionally while
// the matching put() only runs when need_unregister is true. The lines between
// them are not visible here -- confirm that get() is itself conditional on
// `registered`, otherwise a manager reference is leaked.
97 ~RGWAioCompletionNotifier() override
{
100 bool need_unregister
= registered
;
102 completion_mgr
->get();
106 if (need_unregister
) {
107 completion_mgr
->unregister_completion_notifier(this);
108 completion_mgr
->put();
// Accessor for the wrapped librados completion.
112 librados::AioCompletion
*completion() {
117 std::lock_guard l
{lock
};
// Hold a reference on completion_mgr for the duration of complete() so the
// manager cannot be released while this notifier reports to it.
131 completion_mgr
->get();
134 completion_mgr
->complete(this, io_id
, user_data
);
135 completion_mgr
->put();
140 // completion notifier with opaque payload (ie a reference-counted pointer)
// Stores a payload of type T as a member of the notifier, so the payload's
// lifetime is tied to the notifier's.
141 template <typename T
>
142 class RGWAioCompletionNotifierWith
: public RGWAioCompletionNotifier
{
// The payload is moved into the `value` member. Inside the mem-initializer,
// the `value` passed to std::move(...) names the constructor parameter, which
// shadows the member there.
145 RGWAioCompletionNotifierWith(RGWCompletionManager
*mgr
,
146 const rgw_io_id
& io_id
, void *user_data
,
148 : RGWAioCompletionNotifier(mgr
, io_id
, user_data
), value(std::move(value
))
// Execution environment handed to coroutine stacks: the run-context id, the
// manager, the list of stacks scheduled to run, and the current stack.
152 struct RGWCoroutinesEnv
{
153 uint64_t run_context
;
154 RGWCoroutinesManager
*manager
;
155 std::list
<RGWCoroutinesStack
*> *scheduled_stacks
;
156 RGWCoroutinesStack
*stack
;
// Default state: context 0 and all pointers null.
158 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
// Coroutine states. The negative values are terminal: RGWCoroutine::is_done()
// is true for both Done and Error, and is_error() only for Error.
161 enum RGWCoroutineState
{
162 RGWCoroutine_Error
= -2,
163 RGWCoroutine_Done
= -1,
164 RGWCoroutine_Run
= 0,
// Child stacks spawned by a coroutine or stack (both RGWCoroutine and
// RGWCoroutinesStack keep one as their `spawned` member).
167 struct rgw_spawned_stacks
{
168 std::vector
<RGWCoroutinesStack
*> entries
;
170 rgw_spawned_stacks() {}
// Append one spawned stack.
172 void add_pending(RGWCoroutinesStack
*s
) {
173 entries
.push_back(s
);
// Walk every entry of *source (presumably adopting each one; the loop body is
// not visible in this view), then leave source empty.
176 void inherit(rgw_spawned_stacks
*source
) {
177 for (auto* entry
: source
->entries
) {
180 source
->entries
.clear();
// Base class for RGW coroutines. It is reference-counted (RefCountedObject)
// and derives from boost::asio::coroutine, so operate() implementations use
// the stackless reenter/yield mechanism (the yield-based helper macros further
// down rely on this). Each coroutine runs on an RGWCoroutinesStack (`stack`).
186 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
187 friend class RGWCoroutinesStack
;
// One status-history entry: a timestamp plus the status text.
193 StatusItem(utime_t
& t
, const std::string
& s
) : timestamp(t
), status(s
) {}
195 void dump(Formatter
*f
) const;
// Used to initialize Status::max_history.
198 #define MAX_COROUTINE_HISTORY 10
// Status bookkeeping: a shared lock, the current status text, and a history
// deque whose intended bound is max_history.
202 ceph::shared_mutex lock
=
203 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
207 std::stringstream status
;
209 explicit Status(CephContext
*_cct
) : cct(_cct
), max_history(MAX_COROUTINE_HISTORY
) {}
211 std::deque
<StatusItem
> history
;
213 std::stringstream
& set_status();
// Free-form description of the coroutine (see set_description()).
216 std::stringstream description
;
// NOTE(review): these members appear to belong to the drain_status helper used
// by the drain_* macros (drain_status.init(), drain_status.should_exit); its
// enclosing declaration is not visible here. Assigning a default-constructed
// boost::asio::coroutine resets `cr` to its initial state.
222 boost::asio::coroutine cr
;
223 bool should_exit
{false};
227 cr
= boost::asio::coroutine();
// The stack this coroutine is running on.
235 RGWCoroutinesStack
*stack
;
// Child stacks spawned by this coroutine.
239 rgw_spawned_stacks spawned
;
// Accumulated error text; written via log_error(), read via error_str().
241 std::stringstream error_stream
;
// Presumably records state `s` (an RGWCoroutineState) and return code `ret`;
// the body is not fully visible here.
243 int set_state(int s
, int ret
= 0) {
// Mark this coroutine as failed with `ret`.
248 int set_cr_error(int ret
) {
249 return set_state(RGWCoroutine_Error
, ret
);
252 return set_state(RGWCoroutine_Done
, 0);
254 void set_io_blocked(bool flag
);
// Discard the current description text.
256 void reset_description() {
257 description
.str(std::string());
260 std::stringstream
& set_description() {
// Status setters delegate to the Status helper.
263 std::stringstream
& set_status() {
264 return status
.set_status();
// NOTE(review): the local reference `status` below shadows the `status` member
// within this function.
267 std::stringstream
& set_status(const std::string
& s
) {
268 std::stringstream
& status
= set_status();
// Virtual wrapper around operate(); body not visible here.
273 virtual int operate_wrapper(const DoutPrefixProvider
*dpp
) {
// Starts in RGWCoroutine_Run with retcode 0 and no stack assigned.
// NOTE(review): this single-argument constructor is not `explicit`.
277 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
278 virtual ~RGWCoroutine() override
;
// The coroutine body, implemented by subclasses.
280 virtual int operate(const DoutPrefixProvider
*dpp
) = 0;
// Terminal-state queries: Done or Error / Error only.
282 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
283 bool is_error() { return (state
== RGWCoroutine_Error
); }
// Stream for appending error text; error_str() returns everything written.
285 std::stringstream
& log_error() { return error_stream
; }
286 std::string
error_str() {
287 return error_stream
.str();
290 void set_retcode(int r
) {
294 int get_ret_status() {
298 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
299 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
300 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
= nullptr); /* returns true if needs to be called again */
301 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
303 RGWCoroutinesStack
*prealloc_stack(); /* prepare a stack that will be used in the next spawn operation */
304 uint64_t prealloc_stack_id(); /* prepare a stack that will be used in the next spawn operation, return its id */
// Timed wait of `interval`.
306 int wait(const utime_t
& interval
);
// NOTE(review): the drain_all*/drain_with_cb macros call
// yield_until_true(drain_children(...)), which keeps yielding while the result
// is false. In practice, then, `true` from drain_children() means draining is
// finished -- the "returns true if needed to be called again" wording below
// appears inverted. Confirm against the implementation.
307 bool drain_children(int num_cr_left
,
308 RGWCoroutinesStack
*skip_stack
= nullptr,
309 std::optional
<std::function
<void(uint64_t stack_id
, int ret
)> > cb
= std::nullopt
); /* returns true if needed to be called again,
310 cb will be called on completion of every
312 bool drain_children(int num_cr_left
,
313 std::optional
<std::function
<int(uint64_t stack_id
, int ret
)> > cb
); /* returns true if needed to be called again,
314 cb will be called on every completion, can filter errors.
315 A negative return value from cb means that current cr
318 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
// Number of entries currently in `spawned`.
320 size_t num_spawned() {
321 return spawned
.entries
.size();
324 void wait_for_child();
326 virtual std::string
to_str() const;
328 RGWCoroutinesStack
*get_stack() const {
332 RGWCoroutinesEnv
*get_env() const;
334 void dump(Formatter
*f
) const;
336 void init_new_io(RGWIOProvider
*io_provider
); /* only links the default io id */
// Block on I/O. The one-argument form uses io id -1; compare
// RGWCoroutinesStack::can_io_unblock(), which treats a negative id as matching
// any completion.
338 int io_block(int ret
= 0) {
339 return io_block(ret
, -1);
341 int io_block(int ret
, int64_t io_id
);
342 int io_block(int ret
, const rgw_io_id
& io_id
);
// No-argument io_complete (header not visible) forwards a default-constructed
// rgw_io_id.
344 io_complete(rgw_io_id
{});
346 void io_complete(const rgw_io_id
& io_id
);
// Stream insertion for RGWCoroutine; declared here, defined out of line.
349 std::ostream
& operator<<(std::ostream
& out
, const RGWCoroutine
& cr
);
// Coroutine helper macros for use inside an RGWCoroutine::operate() body:
// - yield_until_true(x): repeatedly evaluates x into _yield_ret and yields,
//   until x returns true; _yield_ret is then reset to false.
// - drain_all(), drain_all_but(n), drain_all_but_stack(stack),
//   drain_all_but_stack_cb(stack, cb): call drain_status.init(), then
//   yield_until_true(drain_children(...)).
// - drain_with_cb(n, cb): same pattern; afterwards, if drain_status.should_exit
//   is set, returns set_cr_error(drain_status.ret) from the enclosing function.
// - yield_spawn_window(cr, n, cb): uses drain_with_cb(n, cb), which is
//   guaranteed to yield (remaining lines not visible here).
// NOTE(review): drain_all()/drain_all_but()/drain_all_but_stack*() expand to
// two statements with no do { ... } while (0) wrapper, so an unbraced
// `if (cond) drain_all();` would only guard drain_status.init().
351 #define yield_until_true(x) \
354 yield _yield_ret = x; \
355 } while (!_yield_ret); \
356 _yield_ret = false; \
359 #define drain_all() \
360 drain_status.init(); \
361 yield_until_true(drain_children(0))
363 #define drain_all_but(n) \
364 drain_status.init(); \
365 yield_until_true(drain_children(n))
367 #define drain_all_but_stack(stack) \
368 drain_status.init(); \
369 yield_until_true(drain_children(1, stack))
371 #define drain_all_but_stack_cb(stack, cb) \
372 drain_status.init(); \
373 yield_until_true(drain_children(1, stack, cb))
375 #define drain_with_cb(n, cb) \
376 drain_status.init(); \
377 yield_until_true(drain_children(n, cb)); \
378 if (drain_status.should_exit) { \
379 return set_cr_error(drain_status.ret); \
382 #define drain_all_cb(cb) \
385 #define yield_spawn_window(cr, n, cb) \
388 drain_with_cb(n, cb); /* this is guaranteed to yield */ \
394 class RGWConsumerCR
: public RGWCoroutine
{
// Coroutine that consumes a queue of T items (`product`). Producers add items
// with receive(); has_product() reports whether any item is queued.
// NOTE(review): the template header line and several member bodies are not
// visible in this view.
395 std::list
<T
> product
;
398 explicit RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
401 return !product
.empty();
// Presumably suspends the consumer until has_product() becomes true.
404 void wait_for_product() {
405 if (!has_product()) {
// Presumably the consume path: when the queue is non-empty, copies the front
// item into *p.
411 if (product
.empty()) {
414 *p
= product
.front();
// Enqueue one item or a whole list; `wakeup` (default true) presumably wakes
// the consumer afterwards.
419 void receive(const T
& p
, bool wakeup
= true);
420 void receive(std::list
<T
>& l
, bool wakeup
= true);
// A stack of coroutines executed together. `ops` holds the coroutines with
// `pos` as the current position. The stack also tracks:
// - blocking relationships with other stacks;
// - the I/O it waits on;
// - interval-wait, sleep and child-wait state;
// - the child stacks it spawned.
423 class RGWCoroutinesStack
: public RefCountedObject
{
424 friend class RGWCoroutine
;
425 friend class RGWCoroutinesManager
;
431 RGWCoroutinesManager
*ops_mgr
;
// Coroutines on this stack and the current position among them.
433 std::list
<RGWCoroutine
*> ops
;
434 std::list
<RGWCoroutine
*>::iterator pos
;
// Stacks spawned from this one.
436 rgw_spawned_stacks spawned
;
// Stack prepared for the next spawn (see prealloc_stack()); null if none.
438 RGWCoroutinesStack
*preallocated_stack
{nullptr};
// Blocking relationships, maintained in both directions by set_blocked_by():
// the stacks this one waits on, and the stacks waiting on this one.
440 std::set
<RGWCoroutinesStack
*> blocked_by_stack
;
441 std::set
<RGWCoroutinesStack
*> blocking_stacks
;
// Presumably completions recorded for later consume_io_finish(), plus the io
// id this stack is currently blocked on (set by set_io_blocked_id()).
443 std::map
<int64_t, rgw_io_id
> io_finish_ids
;
444 rgw_io_id io_blocked_id
;
450 bool interval_wait_flag
;
454 bool is_waiting_for_child
;
// Environment of the current run and the parent stack.
461 RGWCoroutinesEnv
*env
;
462 RGWCoroutinesStack
*parent
;
// Internal spawn/collect variants that also take the requesting coroutine.
464 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
465 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
); /* returns true if needs to be called again */
466 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
468 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
469 ~RGWCoroutinesStack() override
;
471 int64_t get_id() const {
// Run this stack's coroutines within env.
475 int operate(const DoutPrefixProvider
*dpp
, RGWCoroutinesEnv
*env
);
// True while this stack waits on at least one other stack.
483 bool is_blocked_by_stack() {
484 return !blocked_by_stack
.empty();
486 void set_io_blocked(bool flag
) {
489 void set_io_blocked_id(const rgw_io_id
& io_id
) {
490 io_blocked_id
= io_id
;
// Blocked on I/O only while the blocked flag is set and the stack is not done.
492 bool is_io_blocked() {
493 return blocked_flag
&& !done_flag
;
// A completion for io_id may unblock this stack if no specific id is awaited
// (negative id) or the awaited id intersects io_id.
495 bool can_io_unblock(const rgw_io_id
& io_id
) {
496 return ((io_blocked_id
.id
< 0) ||
497 io_blocked_id
.intersects(io_id
));
499 bool try_io_unblock(const rgw_io_id
& io_id
);
500 bool consume_io_finish(const rgw_io_id
& io_id
);
501 void set_interval_wait(bool flag
) {
502 interval_wait_flag
= flag
;
504 bool is_interval_waiting() {
505 return interval_wait_flag
;
// `wakeup` is true only for a transition from sleeping to awake. Assuming
// sleep_flag is a bool (declaration not visible), `&` here gives the same
// result as `&&`.
507 void set_sleeping(bool flag
) {
508 bool wakeup
= sleep_flag
& !flag
;
517 void set_is_scheduled(bool flag
) {
// A stack counts as blocked if it waits on another stack, is sleeping, waits
// on I/O, or waits for a child (enclosing function header not visible).
522 return is_blocked_by_stack() || is_sleeping() ||
523 is_io_blocked() || waiting_for_child() ;
529 int get_ret_status() {
533 std::string
error_str();
// Presumably: call() runs next_op on this stack, and spawn() runs it on
// another stack (mirroring RGWCoroutine::call/spawn).
535 void call(RGWCoroutine
*next_op
);
536 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
537 RGWCoroutinesStack
*prealloc_stack();
538 int unwind(int retcode
);
540 int wait(const utime_t
& interval
);
// No-argument io_complete (header not visible) forwards a default-constructed
// rgw_io_id.
543 io_complete(rgw_io_id
{});
545 void io_complete(const rgw_io_id
& io_id
);
547 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
); /* returns true if needs to be called again */
// Notifier factories; the templated form attaches a payload value.
551 RGWAioCompletionNotifier
*create_completion_notifier();
552 template <typename T
>
553 RGWAioCompletionNotifier
*create_completion_notifier(T value
);
554 RGWCompletionManager
*get_completion_mgr();
// Record that this stack is blocked by s, and that s blocks this stack.
556 void set_blocked_by(RGWCoroutinesStack
*s
) {
557 blocked_by_stack
.insert(s
);
558 s
->blocking_stacks
.insert(this);
561 void set_wait_for_child(bool flag
) {
562 is_waiting_for_child
= flag
;
565 bool waiting_for_child() {
566 return is_waiting_for_child
;
569 bool unblock_stack(RGWCoroutinesStack
**s
);
571 RGWCoroutinesEnv
*get_env() const { return env
; }
573 void dump(Formatter
*f
) const;
575 void init_new_io(RGWIOProvider
*io_provider
);
// Out-of-line definitions of RGWConsumerCR<T>::receive().
// List overload: std::list::splice moves every element of l to the end of
// `product`, leaving l empty (no element copies).
579 void RGWConsumerCR
<T
>::receive(std::list
<T
>& l
, bool wakeup
)
581 product
.splice(product
.end(), l
);
// Single-item overload: appends a copy of p to `product`.
589 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
591 product
.push_back(p
);
// Registry of RGWCoroutinesManager instances. It is also an AdminSocketHook,
// so an admin-socket command can reach it (see hook_to_admin_command()/call()).
597 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
// Registered managers (presumably guarded by `lock`).
// NOTE(review): the lock's name string says "RGWCoroutinesRegistry" rather
// than the class name.
600 std::set
<RGWCoroutinesManager
*> managers
;
601 ceph::shared_mutex lock
=
602 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
// Presumably the command name registered by hook_to_admin_command().
604 std::string admin_command
;
607 explicit RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
) {}
608 virtual ~RGWCoroutinesManagerRegistry() override
;
// add() is called from RGWCoroutinesManager's constructor; remove() is
// presumably its counterpart on destruction.
610 void add(RGWCoroutinesManager
*mgr
);
611 void remove(RGWCoroutinesManager
*mgr
);
// Register `command` with the admin socket; returns an int status.
613 int hook_to_admin_command(const std::string
& command
);
// AdminSocketHook callback for the registered command.
614 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
617 bufferlist
& out
) override
;
619 void dump(Formatter
*f
) const;
// Drives coroutine stacks: run() executes a list of stacks or a single
// coroutine, schedule() queues stacks, and I/O completions are delivered
// through completion_mgr.
622 class RGWCoroutinesManager
{
// Set exactly once by the stop path below (compare_exchange_strong).
624 std::atomic
<bool> going_down
= { false };
// Run-context counter and the stacks belonging to each run context.
626 std::atomic
<int64_t> run_context_count
= { 0 };
627 std::map
<uint64_t, std::set
<RGWCoroutinesStack
*> > run_contexts
;
// Counters presumably backing get_next_io_id() / get_next_stack_id().
629 std::atomic
<int64_t> max_io_id
= { 0 };
630 std::atomic
<uint64_t> max_stack_id
= { 0 };
632 mutable ceph::shared_mutex lock
=
633 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
635 RGWIOIDProvider io_id_provider
;
637 void handle_unblocked_stack(std::set
<RGWCoroutinesStack
*>& context_stacks
, std::list
<RGWCoroutinesStack
*>& scheduled_stacks
,
638 RGWCompletionManager::io_completion
& io
, int *waiting_count
);
// Allocated with `new` in the constructor. Its release is not visible here
// (presumably in the out-of-line destructor).
640 RGWCompletionManager
*completion_mgr
;
641 RGWCoroutinesManagerRegistry
*cr_registry
;
647 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
// Creates the completion manager and registers this manager with cr_registry.
// NOTE(review): cr_registry->add(this) is dereferenced without a visible null
// check, and the preceding line is not shown here -- confirm whether
// _cr_registry may be null.
649 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
),
650 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
651 completion_mgr
= new RGWCompletionManager(cct
);
653 cr_registry
->add(this);
656 virtual ~RGWCoroutinesManager();
658 int run(const DoutPrefixProvider
*dpp
, std::list
<RGWCoroutinesStack
*>& ops
);
659 int run(const DoutPrefixProvider
*dpp
, RGWCoroutine
*op
);
// Stop path (enclosing function header not visible). The compare-exchange
// from false to true succeeds only once, so completion_mgr->go_down() runs on
// the first call only.
661 bool expected
= false;
662 if (going_down
.compare_exchange_strong(expected
, true)) {
663 completion_mgr
->go_down();
667 virtual void report_error(RGWCoroutinesStack
*op
);
// Notifier factories for a stack; the templated form attaches a payload value.
669 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
670 template <typename T
>
671 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
, T value
);
672 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
// schedule()/_schedule(): the underscore form is presumably the lock-held
// variant -- confirm.
674 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
675 void _schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
676 RGWCoroutinesStack
*allocate_stack();
678 int64_t get_next_io_id();
679 uint64_t get_next_stack_id();
681 void set_sleeping(RGWCoroutine
*cr
, bool flag
);
682 void io_complete(RGWCoroutine
*cr
, const rgw_io_id
& io_id
);
684 virtual std::string
get_id();
685 void dump(Formatter
*f
) const;
687 RGWIOIDProvider
& get_io_id_provider() {
688 return io_id_provider
;
// Create a notifier carrying `value` on behalf of `stack`:
// - allocate a fresh io id via get_next_io_id() (second component -1);
// - pass the stack pointer as the notifier's user data;
// - register the notifier with completion_mgr.
// The return statement is not visible in this view.
// NOTE(review): `(void *)stack` is a C-style cast; static_cast<void *> would
// express the same conversion.
692 template <typename T
>
693 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
, T value
)
695 rgw_io_id io_id
{get_next_io_id(), -1};
696 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifierWith
<T
>(completion_mgr
, io_id
, (void *)stack
, std::move(value
));
697 completion_mgr
->register_completion_notifier(cn
);
// Stack-level convenience: forwards to the manager (ops_mgr), passing this
// stack and moving the payload along.
701 template <typename T
>
702 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier(T value
)
704 return ops_mgr
->create_completion_notifier(this, std::move(value
));
// Coroutine that appears to model a single request/response exchange.
// - Subclasses must implement send_request() and request_complete().
// - init() and finish() default to returning 0; request_cleanup() defaults to
//   doing nothing.
// - operate() is presumably built from the state_* steps declared below --
//   confirm in the .cc.
707 class RGWSimpleCoroutine
: public RGWCoroutine
{
710 int operate(const DoutPrefixProvider
*dpp
) override
;
713 int state_send_request(const DoutPrefixProvider
*dpp
);
714 int state_request_complete();
715 int state_all_complete();
// called_cleanup starts false (presumably set once request_cleanup() has run).
// NOTE(review): this single-argument constructor is not `explicit`.
720 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
721 ~RGWSimpleCoroutine() override
;
723 virtual int init() { return 0; }
724 virtual int send_request(const DoutPrefixProvider
*dpp
) = 0;
725 virtual int request_complete() = 0;
726 virtual int finish() { return 0; }
727 virtual void request_cleanup() {}