1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #ifndef CEPH_RGW_COROUTINE_H
5 #define CEPH_RGW_COROUTINE_H
9 #pragma push_macro("_ASSERT_H")
12 #include <boost/asio.hpp>
13 #include <boost/intrusive_ptr.hpp>
16 #pragma pop_macro("_ASSERT_H")
19 #include "include/utime.h"
20 #include "common/RefCountedObj.h"
21 #include "common/debug.h"
22 #include "common/Timer.h"
23 #include "common/admin_socket.h"
24 #include "common/RWLock.h"
26 #include "rgw_common.h"
27 #include "rgw_http_client_types.h"
29 #include <boost/asio/coroutine.hpp>
// Value the RGWCoroutinesManager constructor uses to initialise its
// ops_window member.
33 #define RGW_ASYNC_OPS_MGR_WINDOW 100
// Forward declarations: the classes below reference each other by pointer
// before their full definitions appear.
35 class RGWCoroutinesStack
;
36 class RGWCoroutinesManager
;
37 class RGWAioCompletionNotifier
;
// Reference-counted (RefCountedObject) manager of I/O completions.
// Completions are reported through complete(); get_next()/try_get_next()
// take an io_completion out-pointer (presumably filled with the next
// finished request -- confirm against the implementation file).
// NOTE(review): only part of the class body is visible, so access
// specifiers and some members are not shown.
39 class RGWCompletionManager
: public RefCountedObject
{
// RGWCoroutinesManager may touch the non-public members of this class.
40 friend class RGWCoroutinesManager
;
// Record describing one completion (its fields are not visible here).
44 struct io_completion
{
// Completion records held by the manager.
48 list
<io_completion
> complete_reqs
;
// Set of rgw_io_id values; presumably mirrors the ids currently queued in
// complete_reqs -- TODO confirm.
49 set
<rgw_io_id
> complete_reqs_set
;
// Intrusive smart pointer to a completion notifier.
50 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
// Mutex and condition variable of the manager (usage is in the
// implementation file).
53 ceph::mutex lock
= ceph::make_mutex("RGWCompletionManager::lock");
54 ceph::condition_variable cond
;
// Atomic shutdown flag, initially false.
58 std::atomic
<bool> going_down
= { false };
// Opaque-pointer to opaque-pointer map of waiters; presumably keyed by the
// `opaque` argument of wait_interval()/wakeup() -- TODO confirm.
60 map
<void *, void *> waiters
;
// Underscore-prefixed helpers; presumably the variants of wakeup() and
// complete() that expect `lock` to be held already -- TODO confirm.
65 void _wakeup(void *opaque
);
66 void _complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
// Construct a manager bound to the given CephContext.
68 explicit RGWCompletionManager(CephContext
*_cct
);
69 ~RGWCompletionManager() override
;
// Report that the I/O identified by io_id finished; cn is the reporting
// notifier and user_info an opaque pointer passed along with it.
71 void complete(RGWAioCompletionNotifier
*cn
, const rgw_io_id
& io_id
, void *user_info
);
// Fetch a completion into *io. NOTE(review): the meaning of the int/bool
// results is defined in the implementation file.
72 int get_next(io_completion
*io
);
73 bool try_get_next(io_completion
*io
);
78 * wait for interval length to complete user_info
80 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
81 void wakeup(void *opaque
);
// Add / remove a notifier from the manager's bookkeeping.
83 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
84 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
87 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
// Reference-counted; owns a librados::AioCompletion pointer and reports to
// the RGWCompletionManager it was created for.
88 class RGWAioCompletionNotifier
: public RefCountedObject
{
// The librados completion exposed through completion().
89 librados::AioCompletion
*c
;
// Manager that receives the completion report.
90 RGWCompletionManager
*completion_mgr
;
// Mutex of this notifier (see the lock_guard use further down).
93 ceph::mutex lock
= ceph::make_mutex("RGWAioCompletionNotifier");
// _mgr: manager to report to; _io_id: id reported on completion;
// _user_data: opaque pointer forwarded to the manager's complete().
97 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, const rgw_io_id
& _io_id
, void *_user_data
);
// Destructor: captures the current `registered` state and, when it was set,
// unregisters this notifier from the manager. A get()/put() pair on the
// manager brackets the unregister call.
// NOTE(review): some statements of the body are not visible here.
98 ~RGWAioCompletionNotifier() override
{
101 bool need_unregister
= registered
;
103 completion_mgr
->get();
107 if (need_unregister
) {
108 completion_mgr
->unregister_completion_notifier(this);
109 completion_mgr
->put();
// Accessor for the librados completion object.
113 librados::AioCompletion
*completion() {
// Scoped guard on `lock` (the enclosing function header is not visible).
118 std::lock_guard l
{lock
};
// Completion path: hold a reference on the manager while forwarding this
// notifier's io_id and user_data to RGWCompletionManager::complete().
132 completion_mgr
->get();
135 completion_mgr
->complete(this, io_id
, user_data
);
136 completion_mgr
->put();
141 // completion notifier with opaque payload (ie a reference-counted pointer)
// T is the payload type; the payload is stored in the `value` member.
142 template <typename T
>
143 class RGWAioCompletionNotifierWith
: public RGWAioCompletionNotifier
{
// Forwards mgr, io_id and user_data to the base-class constructor and moves
// the payload argument into the `value` member.
146 RGWAioCompletionNotifierWith(RGWCompletionManager
*mgr
,
147 const rgw_io_id
& io_id
, void *user_data
,
149 : RGWAioCompletionNotifier(mgr
, io_id
, user_data
), value(std::move(value
))
// Plain bundle of the context a coroutines stack runs in: a run-context id,
// the owning manager, a pointer to a list of scheduled stacks and a stack
// pointer. RGWCoroutinesStack::operate() receives a pointer to one of these.
153 struct RGWCoroutinesEnv
{
154 uint64_t run_context
;
155 RGWCoroutinesManager
*manager
;
156 list
<RGWCoroutinesStack
*> *scheduled_stacks
;
157 RGWCoroutinesStack
*stack
;
// Default state: run_context 0 and every pointer NULL.
159 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
// States stored in RGWCoroutine::state. RGWCoroutine::is_done() treats both
// Done and Error as finished; is_error() checks for Error only. A new
// RGWCoroutine starts in RGWCoroutine_Run.
162 enum RGWCoroutineState
{
163 RGWCoroutine_Error
= -2,
164 RGWCoroutine_Done
= -1,
165 RGWCoroutine_Run
= 0,
// Collection of stack pointers kept by both RGWCoroutine and
// RGWCoroutinesStack in their `spawned` member.
168 struct rgw_spawned_stacks
{
169 vector
<RGWCoroutinesStack
*> entries
;
171 rgw_spawned_stacks() {}
// Append s to entries.
173 void add_pending(RGWCoroutinesStack
*s
) {
174 entries
.push_back(s
);
// Walk every entry of `source` and then empty source->entries.
// NOTE(review): the loop body is not visible here; presumably each entry is
// added to this object -- confirm.
177 void inherit(rgw_spawned_stacks
*source
) {
178 for (vector
<RGWCoroutinesStack
*>::iterator iter
= source
->entries
.begin();
179 iter
!= source
->entries
.end(); ++iter
) {
182 source
->entries
.clear();
// Base class of all RGW coroutines. Reference-counted and built on the
// boost::asio::coroutine stackless-coroutine helper; subclasses implement
// operate(). NOTE(review): nested type headers, access specifiers and some
// method headers are missing from the visible text.
188 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
// RGWCoroutinesStack may access the non-public members.
189 friend class RGWCoroutinesStack
;
// StatusItem: a (timestamp, status string) pair.
195 StatusItem(utime_t
& t
, const string
& s
) : timestamp(t
), status(s
) {}
197 void dump(Formatter
*f
) const;
// Value used to initialise Status::max_history.
200 #define MAX_COROUTINE_HISTORY 10
// Shared mutex belonging to the Status helper.
204 ceph::shared_mutex lock
=
205 ceph::make_shared_mutex("RGWCoroutine::Status::lock");
211 explicit Status(CephContext
*_cct
) : cct(_cct
), max_history(MAX_COROUTINE_HISTORY
) {}
// Status items kept as history.
213 deque
<StatusItem
> history
;
// Returns a stream reference; presumably the caller writes the new status
// text into it -- TODO confirm.
215 stringstream
& set_status();
// Free-form description stream (see reset_description()/set_description()).
218 stringstream description
;
// Separate coroutine state used by the drain_* macros, which reset it before
// calling drain_children().
222 boost::asio::coroutine drain_cr
;
// Stack this coroutine belongs to; NULL until assigned (see constructor).
226 RGWCoroutinesStack
*stack
;
// Stacks spawned by this coroutine (counted by num_spawned()).
230 rgw_spawned_stacks spawned
;
// Error text sink; exposed through log_error().
232 stringstream error_stream
;
// Set the coroutine state, with an optional return code (default 0).
234 int set_state(int s
, int ret
= 0) {
// Switch to RGWCoroutine_Error carrying `ret`.
239 int set_cr_error(int ret
) {
240 return set_state(RGWCoroutine_Error
, ret
);
// Switch to RGWCoroutine_Done with return code 0.
243 return set_state(RGWCoroutine_Done
, 0);
245 void set_io_blocked(bool flag
);
// Clear the description stream's contents.
247 void reset_description() {
248 description
.str(string());
251 stringstream
& set_description() {
// Delegates to the Status helper's set_status().
254 stringstream
& set_status() {
255 return status
.set_status();
// Overload taking the status text directly; it obtains the stream from
// set_status() (the rest of the body is not visible).
258 stringstream
& set_status(const string
& s
) {
259 stringstream
& status
= set_status();
// Virtual wrapper around operate(); subclasses may override it.
264 virtual int operate_wrapper() {
// Initial state: RGWCoroutine_Run, retcode 0, no stack, _yield_ret false.
268 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
269 ~RGWCoroutine() override
;
// The coroutine body; must be provided by every concrete subclass.
271 virtual int operate() = 0;
// Finished means either Done or Error.
273 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
274 bool is_error() { return (state
== RGWCoroutine_Error
); }
// Stream for composing an error message; its text is read back via
// error_stream.str() below.
276 stringstream
& log_error() { return error_stream
; }
278 return error_stream
.str();
281 void set_retcode(int r
) {
285 int get_ret_status() {
289 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
290 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
291 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
292 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
294 int wait(const utime_t
& interval
);
295 bool drain_children(int num_cr_left
, RGWCoroutinesStack
*skip_stack
= NULL
); /* returns true if needed to be called again */
297 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
// Number of entries currently recorded in `spawned`.
299 size_t num_spawned() {
300 return spawned
.entries
.size();
303 void wait_for_child();
// String form of the coroutine; overridable.
305 virtual string
to_str() const;
307 RGWCoroutinesStack
*get_stack() const {
311 RGWCoroutinesEnv
*get_env() const;
313 void dump(Formatter
*f
) const;
315 void init_new_io(RGWIOProvider
*io_provider
); /* only links the default io id */
// Convenience overload: forwards to io_block(ret, -1).
317 int io_block(int ret
= 0) {
318 return io_block(ret
, -1);
320 int io_block(int ret
, int64_t io_id
);
321 int io_block(int ret
, const rgw_io_id
& io_id
);
// Argument-less form: calls io_complete() with a default-constructed
// rgw_io_id.
323 io_complete(rgw_io_id
{});
325 void io_complete(const rgw_io_id
& io_id
);
// Stream-insertion operator for RGWCoroutine (declaration only; defined
// elsewhere).
328 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
);
// Helper macros for use inside a coroutine body.
// yield_until_true(x): repeatedly yields with `_yield_ret = x` until
// _yield_ret becomes true, then resets _yield_ret to false.
// drain_all(): resets drain_cr to a fresh boost::asio::coroutine and yields
// until drain_children(0) returns true.
330 #define yield_until_true(x) \
333 yield _yield_ret = x; \
334 } while (!_yield_ret); \
335 _yield_ret = false; \
338 #define drain_all() \
339 drain_cr = boost::asio::coroutine(); \
340 yield_until_true(drain_children(0))
// drain_all_but(n): like drain_all(), but passes n as num_cr_left.
342 #define drain_all_but(n) \
343 drain_cr = boost::asio::coroutine(); \
344 yield_until_true(drain_children(n))
// drain_all_but_stack(stack): calls drain_children(1, stack), i.e. with
// num_cr_left 1 and `stack` as skip_stack.
346 #define drain_all_but_stack(stack) \
347 drain_cr = boost::asio::coroutine(); \
348 yield_until_true(drain_children(1, stack))
// Coroutine that keeps a `product` container of T items delivered through
// receive(). NOTE(review): the template header and several method headers
// are not visible in this text.
351 class RGWConsumerCR
: public RGWCoroutine
{
355 explicit RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
// True when at least one item is in `product`.
358 return !product
.empty();
// Acts only when no product is available (body not visible; presumably puts
// the coroutine to sleep -- TODO confirm).
361 void wait_for_product() {
362 if (!has_product()) {
// Consume path: handles the empty case first, otherwise copies the front
// item of `product` into *p.
368 if (product
.empty()) {
371 *p
= product
.front();
// Deliver one item, or a whole list of items; `wakeup` defaults to true.
376 void receive(const T
& p
, bool wakeup
= true);
377 void receive(list
<T
>& l
, bool wakeup
= true);
// Reference-counted stack of RGWCoroutine pointers (`ops`) with a current
// position (`pos`), plus the flags and ids describing why the stack is
// blocked. NOTE(review): access specifiers and some members/method headers
// are missing from the visible text.
380 class RGWCoroutinesStack
: public RefCountedObject
{
381 friend class RGWCoroutine
;
382 friend class RGWCoroutinesManager
;
// Manager that owns this stack; used by create_completion_notifier().
386 RGWCoroutinesManager
*ops_mgr
;
// The coroutines on this stack and an iterator into that list.
388 list
<RGWCoroutine
*> ops
;
389 list
<RGWCoroutine
*>::iterator pos
;
391 rgw_spawned_stacks spawned
;
// Dependency bookkeeping: set_blocked_by(s) inserts s into blocked_by_stack
// and inserts this stack into s->blocking_stacks.
393 set
<RGWCoroutinesStack
*> blocked_by_stack
;
394 set
<RGWCoroutinesStack
*> blocking_stacks
;
// Map from int64_t to rgw_io_id (presumably finished io ids keyed by id --
// TODO confirm) and the io id this stack is blocked on.
396 map
<int64_t, rgw_io_id
> io_finish_ids
;
397 rgw_io_id io_blocked_id
;
403 bool interval_wait_flag
;
407 bool is_waiting_for_child
;
414 RGWCoroutinesEnv
*env
;
415 RGWCoroutinesStack
*parent
;
// Internal variants that also receive the originating coroutine.
417 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
418 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
419 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
// `start` is optional (defaults to NULL).
421 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
422 ~RGWCoroutinesStack() override
;
424 int operate(RGWCoroutinesEnv
*env
);
// True while at least one other stack is recorded as blocking this one.
432 bool is_blocked_by_stack() {
433 return !blocked_by_stack
.empty();
435 void set_io_blocked(bool flag
) {
438 void set_io_blocked_id(const rgw_io_id
& io_id
) {
439 io_blocked_id
= io_id
;
// Blocked on I/O only while blocked_flag is set and done_flag is not.
441 bool is_io_blocked() {
442 return blocked_flag
&& !done_flag
;
// A negative io_blocked_id.id matches any io_id; otherwise the two ids must
// intersect.
444 bool can_io_unblock(const rgw_io_id
& io_id
) {
445 return ((io_blocked_id
.id
< 0) ||
446 io_blocked_id
.intersects(io_id
));
448 bool try_io_unblock(const rgw_io_id
& io_id
);
449 bool consume_io_finish(const rgw_io_id
& io_id
);
450 void set_interval_wait(bool flag
) {
451 interval_wait_flag
= flag
;
453 bool is_interval_waiting() {
454 return interval_wait_flag
;
// `wakeup` is true only when the stack was sleeping (sleep_flag) and the new
// flag is false; note the bitwise & applied to two bools.
456 void set_sleeping(bool flag
) {
457 bool wakeup
= sleep_flag
& !flag
;
466 void set_is_scheduled(bool flag
) {
// Blocked when any of: blocked by another stack, sleeping, blocked on I/O,
// or waiting for a child.
471 return is_blocked_by_stack() || is_sleeping() ||
472 is_io_blocked() || waiting_for_child() ;
478 int get_ret_status() {
484 void call(RGWCoroutine
*next_op
);
485 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
486 int unwind(int retcode
);
488 int wait(const utime_t
& interval
);
// Argument-less form: calls io_complete() with a default-constructed
// rgw_io_id.
491 io_complete(rgw_io_id
{});
493 void io_complete(const rgw_io_id
& io_id
);
495 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
// Notifier factories; the templated overload attaches a payload of type T.
499 RGWAioCompletionNotifier
*create_completion_notifier();
500 template <typename T
>
501 RGWAioCompletionNotifier
*create_completion_notifier(T value
);
502 RGWCompletionManager
*get_completion_mgr();
// Record the dependency in both directions: s blocks this stack.
504 void set_blocked_by(RGWCoroutinesStack
*s
) {
505 blocked_by_stack
.insert(s
);
506 s
->blocking_stacks
.insert(this);
509 void set_wait_for_child(bool flag
) {
510 is_waiting_for_child
= flag
;
513 bool waiting_for_child() {
514 return is_waiting_for_child
;
517 bool unblock_stack(RGWCoroutinesStack
**s
);
519 RGWCoroutinesEnv
*get_env() const { return env
; }
521 void dump(Formatter
*f
) const;
523 void init_new_io(RGWIOProvider
*io_provider
);
// List overload: splices every element of l onto the end of `product`
// (std::list::splice moves the nodes, leaving l empty).
// NOTE(review): the handling of `wakeup` is not visible here.
527 void RGWConsumerCR
<T
>::receive(list
<T
>& l
, bool wakeup
)
529 product
.splice(product
.end(), l
);
// Single-item overload: appends a copy of p to `product`.
// NOTE(review): the handling of `wakeup` is not visible here.
537 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
539 product
.push_back(p
);
// Reference-counted registry of RGWCoroutinesManager pointers that is also
// an AdminSocketHook. RGWCoroutinesManager calls add()/remove() on it from
// its constructor and destructor.
545 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
// Managers recorded in the registry, and the registry's shared mutex.
548 set
<RGWCoroutinesManager
*> managers
;
549 ceph::shared_mutex lock
=
550 ceph::make_shared_mutex("RGWCoroutinesRegistry::lock");
// Admin command string; presumably the one passed to hook_to_admin_command()
// -- TODO confirm.
552 string admin_command
;
555 explicit RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
) {}
556 ~RGWCoroutinesManagerRegistry() override
;
558 void add(RGWCoroutinesManager
*mgr
);
559 void remove(RGWCoroutinesManager
*mgr
);
561 int hook_to_admin_command(const string
& command
);
// AdminSocketHook override; `out` is the bufferlist output parameter.
562 int call(std::string_view command
, const cmdmap_t
& cmdmap
,
565 bufferlist
& out
) override
;
567 void dump(Formatter
*f
) const;
// Runs coroutine stacks. Creates its own RGWCompletionManager in the
// constructor and registers itself with the registry it was given.
// NOTE(review): access specifiers and some members/method headers are
// missing from the visible text.
570 class RGWCoroutinesManager
{
// Atomic shutdown flag, initially false (flipped once, see below).
572 std::atomic
<bool> going_down
= { false };
// Atomic counter and per-run-context sets of stacks.
574 std::atomic
<int64_t> run_context_count
= { 0 };
575 map
<uint64_t, set
<RGWCoroutinesStack
*> > run_contexts
;
// Atomic io id counter, initially 0 (presumably the source of
// get_next_io_id() -- TODO confirm).
577 std::atomic
<int64_t> max_io_id
= { 0 };
// Declared mutable so const members can take it.
579 mutable ceph::shared_mutex lock
=
580 ceph::make_shared_mutex("RGWCoroutinesManager::lock");
582 RGWIOIDProvider io_id_provider
;
584 void handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
,
585 RGWCompletionManager::io_completion
& io
, int *waiting_count
);
// Completion manager created in the constructor and released with put() in
// the destructor; registry this manager adds itself to.
587 RGWCompletionManager
*completion_mgr
;
588 RGWCoroutinesManagerRegistry
*cr_registry
;
594 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
// Stores cct and the registry, sets ops_window to RGW_ASYNC_OPS_MGR_WINDOW,
// allocates the completion manager and adds this manager to the registry.
596 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
),
597 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
598 completion_mgr
= new RGWCompletionManager(cct
);
600 cr_registry
->add(this);
// Drops the reference on the completion manager and removes this manager
// from the registry.
603 virtual ~RGWCoroutinesManager() {
605 completion_mgr
->put();
607 cr_registry
->remove(this);
// Run a set of stacks, or a single coroutine.
611 int run(list
<RGWCoroutinesStack
*>& ops
);
612 int run(RGWCoroutine
*op
);
// Shutdown path: atomically flips going_down from false to true; only the
// caller that wins the exchange invokes completion_mgr->go_down(), so it is
// called at most once.
614 bool expected
= false;
615 if (going_down
.compare_exchange_strong(expected
, true)) {
616 completion_mgr
->go_down();
// Overridable error reporting hook.
620 virtual void report_error(RGWCoroutinesStack
*op
);
// Notifier factories; the templated overload attaches a payload of type T.
622 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
623 template <typename T
>
624 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
, T value
);
625 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
627 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
628 void _schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
629 RGWCoroutinesStack
*allocate_stack();
631 int64_t get_next_io_id();
633 void set_sleeping(RGWCoroutine
*cr
, bool flag
);
634 void io_complete(RGWCoroutine
*cr
, const rgw_io_id
& io_id
);
// Overridable identifier of this manager.
636 virtual string
get_id();
637 void dump(Formatter
*f
) const;
639 RGWIOIDProvider
& get_io_id_provider() {
640 return io_id_provider
;
// Builds a notifier carrying a payload of type T: a fresh rgw_io_id is made
// from get_next_io_id() with -1 as the second field, the notifier is
// allocated with `stack` as its opaque user-data pointer and `value` moved
// in, and it is registered with the completion manager.
// NOTE(review): the return statement is not visible here.
644 template <typename T
>
645 RGWAioCompletionNotifier
*RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack
*stack
, T value
)
647 rgw_io_id io_id
{get_next_io_id(), -1};
648 RGWAioCompletionNotifier
*cn
= new RGWAioCompletionNotifierWith
<T
>(completion_mgr
, io_id
, (void *)stack
, std::move(value
));
649 completion_mgr
->register_completion_notifier(cn
);
// Delegates to the owning manager's create_completion_notifier(), passing
// this stack and moving the payload along.
653 template <typename T
>
654 RGWAioCompletionNotifier
*RGWCoroutinesStack::create_completion_notifier(T value
)
656 return ops_mgr
->create_completion_notifier(this, std::move(value
));
// RGWCoroutine specialisation that overrides operate() and exposes a set of
// virtual hooks: init(), send_request(), request_complete(), finish() and
// request_cleanup(). send_request() and request_complete() are pure virtual;
// the others have default implementations (return 0 / do nothing).
// NOTE(review): the order in which operate() invokes the hooks is defined in
// the implementation file.
659 class RGWSimpleCoroutine
: public RGWCoroutine
{
662 int operate() override
;
// Internal state handlers (defined elsewhere).
665 int state_send_request();
666 int state_request_complete();
667 int state_all_complete();
// called_cleanup starts out false.
672 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
673 ~RGWSimpleCoroutine() override
;
675 virtual int init() { return 0; }
676 virtual int send_request() = 0;
677 virtual int request_complete() = 0;
678 virtual int finish() { return 0; }
679 virtual void request_cleanup() {}