1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
8 #pragma push_macro("_ASSERT_H")
11 #include <boost/asio.hpp>
12 #include <boost/intrusive_ptr.hpp>
15 #pragma pop_macro("_ASSERT_H")
18 #include "include/utime.h"
19 #include "common/RefCountedObj.h"
20 #include "common/debug.h"
21 #include "common/Timer.h"
22 #include "common/admin_socket.h"
24 #include "rgw_common.h"
25 #include "rgw_http_client_types.h"
27 #include <boost/asio/coroutine.hpp>
31 #define RGW_ASYNC_OPS_MGR_WINDOW 100
33 class RGWCoroutinesStack
;
34 class RGWCoroutinesManager
;
35 class RGWAioCompletionNotifier
;
37 class RGWCompletionManager
: public RefCountedObject
{
38 friend class RGWCoroutinesManager
;
42 struct io_completion
{
46 std::list
<io_completion
> complete_reqs
;
47 std::set
<rgw_io_id
> complete_reqs_set
;
48 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
49 std::set
<NotifierRef
> cns
;
51 ceph::mutex lock
= ceph::make_mutex("RGWCompletionManager::lock");
52 ceph::condition_variable cond
;
56 std::atomic
<bool> going_down
= { false };
58 std::map
<void *, void *> waiters
;
63 void _wakeup(void *opaque
);
64 void _complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
66 explicit RGWCompletionManager(CephContext
*_cct
);
67 virtual ~RGWCompletionManager() override
;
69 void complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
70 int get_next(io_completion
*io
);
71 bool try_get_next(io_completion
*io
);
76 * wait for interval length to complete user_info
78 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
79 void wakeup(void *opaque
);
81 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
82 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
85 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
86 class RGWAioCompletionNotifier
: public RefCountedObject
{
87 librados::AioCompletion
*c
;
88 RGWCompletionManager
*completion_mgr
;
91 ceph::mutex lock
= ceph::make_mutex("RGWAioCompletionNotifier");
95 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, const rgw_io_id
& _io_id
, void *_user_data
);
96 virtual ~RGWAioCompletionNotifier() override
{
99 bool need_unregister
= registered
;
101 completion_mgr
->get();
105 if (need_unregister
) {
106 completion_mgr
->unregister_completion_notifier(this);
107 completion_mgr
->put();
111 librados::AioCompletion
*completion() {
116 std::lock_guard l
{lock
};
130 completion_mgr
->get();
133 completion_mgr
->complete(this, io_id
, user_data
);
134 completion_mgr
->put();
139 // completion notifier with opaque payload (ie a reference-counted pointer)
140 template <typename T
>
141 class RGWAioCompletionNotifierWith
: public RGWAioCompletionNotifier
{
144 RGWAioCompletionNotifierWith(RGWCompletionManager
*mgr
,
145 const rgw_io_id
& io_id
, void *user_data
,
147 : RGWAioCompletionNotifier(mgr
, io_id
, user_data
), value(std::move(value
))
151 struct RGWCoroutinesEnv
{
152 uint64_t run_context
;
153 RGWCoroutinesManager
*manager
;
154 std::list
<RGWCoroutinesStack
*> *scheduled_stacks
;
155 RGWCoroutinesStack
*stack
;
157 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
160 enum RGWCoroutineState
{
161 RGWCoroutine_Error
= -2,
162 RGWCoroutine_Done
= -1,
163 RGWCoroutine_Run
= 0,
166 struct rgw_spawned_stacks
{
167 std::vector
<RGWCoroutinesStack
*> entries
;
169 rgw_spawned_stacks() {}
171 void add_pending(RGWCoroutinesStack
*s
) {
172 entries
.push_back(s
);
175 void inherit(rgw_spawned_stacks
*source
) {
176 for (auto* entry
: source
->entries
) {
179 source
->entries
.clear();
185 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
186 friend class RGWCoroutinesStack
;
192 StatusItem(utime_t
& t
, const std::string
& s
) : timestamp(t
), status(s
) {}
194 void dump(Formatter
*f
) const;
197 #define MAX_COROUTINE_HISTORY 10
201 ceph::shared_mutex lock
=
202 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
206 std::stringstream status
;
208 explicit Status(CephContext
*_cct
) : cct(_cct
), max_history(MAX_COROUTINE_HISTORY
) {}
210 std::deque
<StatusItem
> history
;
212 std::stringstream
& set_status();
215 std::stringstream description
;
221 boost::asio::coroutine cr
;
222 bool should_exit
{false};
226 cr
= boost::asio::coroutine();
234 RGWCoroutinesStack
*stack
;
238 rgw_spawned_stacks spawned
;
240 std::stringstream error_stream
;
242 int set_state(int s
, int ret
= 0) {
247 int set_cr_error(int ret
) {
248 return set_state(RGWCoroutine_Error
, ret
);
251 return set_state(RGWCoroutine_Done
, 0);
253 void set_io_blocked(bool flag
);
255 void reset_description() {
256 description
.str(std::string());
259 std::stringstream
& set_description() {
262 std::stringstream
& set_status() {
263 return status
.set_status();
266 std::stringstream
& set_status(const std::string
& s
) {
267 std::stringstream
& status
= set_status();
272 virtual int operate_wrapper(const DoutPrefixProvider
*dpp
) {
276 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
277 virtual ~RGWCoroutine() override
;
279 virtual int operate(const DoutPrefixProvider
*dpp
) = 0;
281 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
282 bool is_error() { return (state
== RGWCoroutine_Error
); }
284 std::stringstream
& log_error() { return error_stream
; }
285 std::string
error_str() {
286 return error_stream
.str();
289 void set_retcode(int r
) {
293 int get_ret_status() {
297 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
298 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
299 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
= nullptr); /* returns true if needs to be called again */
300 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
302 int wait(const utime_t
& interval
);
303 bool drain_children(int num_cr_left
,
304 RGWCoroutinesStack
*skip_stack
= nullptr,
305 std::optional
<std::function
<void(uint64_t stack_id
, int ret
)> > cb
= std::nullopt
); /* returns true if needed to be called again,
306 cb will be called on completion of every
308 bool drain_children(int num_cr_left
,
309 std::optional
<std::function
<int(uint64_t stack_id
, int ret
)> > cb
); /* returns true if needed to be called again,
310 cb will be called on every completion, can filter errors.
311 A negative return value from cb means that current cr
314 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
316 size_t num_spawned() {
317 return spawned
.entries
.size();
320 void wait_for_child();
322 virtual std::string
to_str() const;
324 RGWCoroutinesStack
*get_stack() const {
328 RGWCoroutinesEnv
*get_env() const;
330 void dump(Formatter
*f
) const;
332 void init_new_io(RGWIOProvider
*io_provider
); /* only links the default io id */
334 int io_block(int ret
= 0) {
335 return io_block(ret
, -1);
337 int io_block(int ret
, int64_t io_id
);
338 int io_block(int ret
, const rgw_io_id
& io_id
);
340 io_complete(rgw_io_id
{});
342 void io_complete(const rgw_io_id
& io_id
);
345 std::ostream
& operator<<(std::ostream
& out
, const RGWCoroutine
& cr
);
347 #define yield_until_true(x) \
350 yield _yield_ret = x; \
351 } while (!_yield_ret); \
352 _yield_ret = false; \
355 #define drain_all() \
356 drain_status.init(); \
357 yield_until_true(drain_children(0))
359 #define drain_all_but(n) \
360 drain_status.init(); \
361 yield_until_true(drain_children(n))
363 #define drain_all_but_stack(stack) \
364 drain_status.init(); \
365 yield_until_true(drain_children(1, stack))
367 #define drain_all_but_stack_cb(stack, cb) \
368 drain_status.init(); \
369 yield_until_true(drain_children(1, stack, cb))
371 #define drain_with_cb(n, cb) \
372 drain_status.init(); \
373 yield_until_true(drain_children(n, cb)); \
374 if (drain_status.should_exit) { \
375 return set_cr_error(drain_status.ret); \
378 #define drain_all_cb(cb) \
381 #define yield_spawn_window(cr, n, cb) \
384 drain_with_cb(n, cb); /* this is guaranteed to yield */ \
390 class RGWConsumerCR
: public RGWCoroutine
{
391 std::list
<T
> product
;
394 explicit RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
397 return !product
.empty();
400 void wait_for_product() {
401 if (!has_product()) {
407 if (product
.empty()) {
410 *p
= product
.front();
415 void receive(const T
& p
, bool wakeup
= true);
416 void receive(std::list
<T
>& l
, bool wakeup
= true);
419 class RGWCoroutinesStack
: public RefCountedObject
{
420 friend class RGWCoroutine
;
421 friend class RGWCoroutinesManager
;
427 RGWCoroutinesManager
*ops_mgr
;
429 std::list
<RGWCoroutine
*> ops
;
430 std::list
<RGWCoroutine
*>::iterator pos
;
432 rgw_spawned_stacks spawned
;
434 std::set
<RGWCoroutinesStack
*> blocked_by_stack
;
435 std::set
<RGWCoroutinesStack
*> blocking_stacks
;
437 std::map
<int64_t, rgw_io_id
> io_finish_ids
;
438 rgw_io_id io_blocked_id
;
444 bool interval_wait_flag
;
448 bool is_waiting_for_child
;
455 RGWCoroutinesEnv
*env
;
456 RGWCoroutinesStack
*parent
;
458 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
459 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
); /* returns true if needs to be called again */
460 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
462 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
463 virtual ~RGWCoroutinesStack() override
;
465 int64_t get_id() const {
469 int operate(const DoutPrefixProvider
*dpp
, RGWCoroutinesEnv
*env
);
477 bool is_blocked_by_stack() {
478 return !blocked_by_stack
.empty();
480 void set_io_blocked(bool flag
) {
483 void set_io_blocked_id(const rgw_io_id
& io_id
) {
484 io_blocked_id
= io_id
;
486 bool is_io_blocked() {
487 return blocked_flag
&& !done_flag
;
489 bool can_io_unblock(const rgw_io_id
& io_id
) {
490 return ((io_blocked_id
.id
< 0) ||
491 io_blocked_id
.intersects(io_id
));
493 bool try_io_unblock(const rgw_io_id
& io_id
);
494 bool consume_io_finish(const rgw_io_id
& io_id
);
495 void set_interval_wait(bool flag
) {
496 interval_wait_flag
= flag
;
498 bool is_interval_waiting() {
499 return interval_wait_flag
;
501 void set_sleeping(bool flag
) {
502 bool wakeup
= sleep_flag
& !flag
;
511 void set_is_scheduled(bool flag
) {
516 return is_blocked_by_stack() || is_sleeping() ||
517 is_io_blocked() || waiting_for_child() ;
523 int get_ret_status() {
527 std::string
error_str();
529 void call(RGWCoroutine
*next_op
);
530 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
531 int unwind(int retcode
);
533 int wait(const utime_t
& interval
);
536 io_complete(rgw_io_id
{});
538 void io_complete(const rgw_io_id
& io_id
);
540 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
, uint64_t *stack_id
); /* returns true if needs to be called again */
544 RGWAioCompletionNotifier
*create_completion_notifier();
545 template <typename T
>
546 RGWAioCompletionNotifier
*create_completion_notifier(T value
);
547 RGWCompletionManager
*get_completion_mgr();
549 void set_blocked_by(RGWCoroutinesStack
*s
) {
550 blocked_by_stack
.insert(s
);
551 s
->blocking_stacks
.insert(this);
554 void set_wait_for_child(bool flag
) {
555 is_waiting_for_child
= flag
;
558 bool waiting_for_child() {
559 return is_waiting_for_child
;
562 bool unblock_stack(RGWCoroutinesStack
**s
);
564 RGWCoroutinesEnv
*get_env() const { return env
; }
566 void dump(Formatter
*f
) const;
568 void init_new_io(RGWIOProvider
*io_provider
);
572 void RGWConsumerCR
<T
>::receive(std::list
<T
>& l
, bool wakeup
)
574 product
.splice(product
.end(), l
);
582 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
584 product
.push_back(p
);
590 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
593 std::set
<RGWCoroutinesManager
*> managers
;
594 ceph::shared_mutex lock
=
595 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
597 std::string admin_command
;
600 explicit RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
) {}
601 virtual ~RGWCoroutinesManagerRegistry() override
;
603 void add(RGWCoroutinesManager
*mgr
);
604 void remove(RGWCoroutinesManager
*mgr
);
606 int hook_to_admin_command(const std::string
& command
);
607 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
611 bufferlist
& out
) override
;
613 void dump(Formatter
*f
) const;
616 class RGWCoroutinesManager
{
618 std::atomic
<bool> going_down
= { false };
620 std::atomic
<int64_t> run_context_count
= { 0 };
621 std::map
<uint64_t, std::set
<RGWCoroutinesStack
*> > run_contexts
;
623 std::atomic
<int64_t> max_io_id
= { 0 };
624 std::atomic
<uint64_t> max_stack_id
= { 0 };
626 mutable ceph::shared_mutex lock
=
627 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
629 RGWIOIDProvider io_id_provider
;
631 void handle_unblocked_stack(std::set
<RGWCoroutinesStack
*>& context_stacks
, std::list
<RGWCoroutinesStack
*>& scheduled_stacks
,
632 RGWCompletionManager::io_completion
& io
, int *waiting_count
, int *interval_wait_count
);
634 RGWCompletionManager
*completion_mgr
;
635 RGWCoroutinesManagerRegistry
*cr_registry
;
641 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
643 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
),
644 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
645 completion_mgr
= new RGWCompletionManager(cct
);
647 cr_registry
->add(this);
650 virtual ~RGWCoroutinesManager();
652 int run(const DoutPrefixProvider
*dpp
, std::list
<RGWCoroutinesStack
*>& ops
);
653 int run(const DoutPrefixProvider
*dpp
, RGWCoroutine
*op
);
655 bool expected
= false;
656 if (going_down
.compare_exchange_strong(expected
, true)) {
657 completion_mgr
->go_down();
661 virtual void report_error(RGWCoroutinesStack
*op
);
663 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
664 template <typename T
>
665 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
, T value
);
666 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
668 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
669 void _schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
670 RGWCoroutinesStack
*allocate_stack();
672 int64_t get_next_io_id();
673 uint64_t get_next_stack_id();
675 void set_sleeping(RGWCoroutine
*cr
, bool flag
);
676 void io_complete(RGWCoroutine
*cr
, const rgw_io_id
& io_id
);
678 virtual std::string
get_id();
679 void dump(Formatter
*f
) const;
681 RGWIOIDProvider
& get_io_id_provider() {
682 return io_id_provider
;
686 template <typename T
>
687 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
, T value
)
689 rgw_io_id io_id
{get_next_io_id(), -1};
690 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifierWith
<T
>(completion_mgr
, io_id
, (void *)stack
, std::move(value
));
691 completion_mgr
->register_completion_notifier(cn
);
695 template <typename T
>
696 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier(T value
)
698 return ops_mgr
->create_completion_notifier(this, std::move(value
));
701 class RGWSimpleCoroutine
: public RGWCoroutine
{
704 int operate(const DoutPrefixProvider
*dpp
) override
;
707 int state_send_request(const DoutPrefixProvider
*dpp
);
708 int state_request_complete();
709 int state_all_complete();
714 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
715 virtual ~RGWSimpleCoroutine() override
;
717 virtual int init() { return 0; }
718 virtual int send_request(const DoutPrefixProvider
*dpp
) = 0;
719 virtual int request_complete() = 0;
720 virtual int finish() { return 0; }
721 virtual void request_cleanup() {}