1 #ifndef CEPH_RGW_COROUTINE_H
2 #define CEPH_RGW_COROUTINE_H
6 #pragma push_macro("_ASSERT_H")
9 #include <boost/asio.hpp>
10 #include <boost/intrusive_ptr.hpp>
13 #pragma pop_macro("_ASSERT_H")
16 #include "include/utime.h"
17 #include "common/RefCountedObj.h"
18 #include "common/debug.h"
19 #include "common/Timer.h"
20 #include "common/admin_socket.h"
22 #include "rgw_common.h"
23 #include <boost/asio/coroutine.hpp>
/* Value the RGWCoroutinesManager constructor uses to initialize ops_window. */
27 #define RGW_ASYNC_OPS_MGR_WINDOW 100
29 class RGWCoroutinesStack
;
30 class RGWCoroutinesManager
;
31 class RGWAioCompletionNotifier
;
33 class RGWCompletionManager
: public RefCountedObject
{
35 list
<void *> complete_reqs
;
36 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
44 std::atomic
<bool> going_down
= { false };
46 map
<void *, void *> waiters
;
51 void _wakeup(void *opaque
);
52 void _complete(RGWAioCompletionNotifier
*cn
, void *user_info
);
54 RGWCompletionManager(CephContext
*_cct
);
55 ~RGWCompletionManager() override
;
57 void complete(RGWAioCompletionNotifier
*cn
, void *user_info
);
58 int get_next(void **user_info
);
59 bool try_get_next(void **user_info
);
64 * wait for the length of interval, then complete user_info
66 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
67 void wakeup(void *opaque
);
69 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
70 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
73 /* a single-use librados aio completion notifier that hooks into the RGWCompletionManager */
74 class RGWAioCompletionNotifier
: public RefCountedObject
{
75 librados::AioCompletion
*c
;
76 RGWCompletionManager
*completion_mgr
;
82 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, void *_user_data
);
83 ~RGWAioCompletionNotifier() override
{
86 bool need_unregister
= registered
;
88 completion_mgr
->get();
92 if (need_unregister
) {
93 completion_mgr
->unregister_completion_notifier(this);
94 completion_mgr
->put();
98 librados::AioCompletion
*completion() {
103 Mutex::Locker
l(lock
);
117 completion_mgr
->get();
120 completion_mgr
->complete(this, user_data
);
121 completion_mgr
->put();
126 struct RGWCoroutinesEnv
{
127 uint64_t run_context
;
128 RGWCoroutinesManager
*manager
;
129 list
<RGWCoroutinesStack
*> *scheduled_stacks
;
130 RGWCoroutinesStack
*stack
;
/* Default constructor: run_context starts at 0 and the manager,
 * scheduled_stacks and stack pointers all start out NULL. */
132 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
135 enum RGWCoroutineState
{
136 RGWCoroutine_Error
= -2,
137 RGWCoroutine_Done
= -1,
138 RGWCoroutine_Run
= 0,
141 struct rgw_spawned_stacks
{
142 vector
<RGWCoroutinesStack
*> entries
;
144 rgw_spawned_stacks() {}
146 void add_pending(RGWCoroutinesStack
*s
) {
147 entries
.push_back(s
);
150 void inherit(rgw_spawned_stacks
*source
) {
151 for (vector
<RGWCoroutinesStack
*>::iterator iter
= source
->entries
.begin();
152 iter
!= source
->entries
.end(); ++iter
) {
155 source
->entries
.clear();
161 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
162 friend class RGWCoroutinesStack
;
168 StatusItem(utime_t
& t
, const string
& s
) : timestamp(t
), status(s
) {}
170 void dump(Formatter
*f
) const;
173 #define MAX_COROUTINE_HISTORY 10
183 Status(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY
) {}
185 deque
<StatusItem
> history
;
187 stringstream
& set_status();
190 stringstream description
;
194 boost::asio::coroutine drain_cr
;
198 RGWCoroutinesStack
*stack
;
202 rgw_spawned_stacks spawned
;
204 stringstream error_stream
;
206 int set_state(int s
, int ret
= 0) {
210 int set_cr_error(int ret
) {
211 state
= RGWCoroutine_Error
;
215 state
= RGWCoroutine_Done
;
218 void set_io_blocked(bool flag
);
219 int io_block(int ret
= 0);
221 void reset_description() {
222 description
.str(string());
225 stringstream
& set_description() {
228 stringstream
& set_status() {
229 return status
.set_status();
232 stringstream
& set_status(const string
& s
) {
233 stringstream
& status
= set_status();
239 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
240 ~RGWCoroutine() override
;
242 virtual int operate() = 0;
/* Returns true when state is RGWCoroutine_Done or RGWCoroutine_Error,
 * i.e. an errored coroutine also counts as done. */
244 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
/* Returns true only when state is RGWCoroutine_Error. */
245 bool is_error() { return (state
== RGWCoroutine_Error
); }
/* Returns this coroutine's error_stream by reference; text streamed into the
 * result is appended to error_stream. */
247 stringstream
& log_error() { return error_stream
; }
249 return error_stream
.str();
252 void set_retcode(int r
) {
256 int get_ret_status() {
260 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
261 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
262 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
263 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
265 int wait(const utime_t
& interval
);
266 bool drain_children(int num_cr_left
, RGWCoroutinesStack
*skip_stack
= NULL
); /* returns true if needed to be called again */
268 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
270 size_t num_spawned() {
271 return spawned
.entries
.size();
274 void wait_for_child();
276 virtual string
to_str() const;
278 RGWCoroutinesStack
*get_stack() const {
282 void dump(Formatter
*f
) const;
285 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
);
287 #define yield_until_true(x) \
290 yield _yield_ret = x; \
291 } while (!_yield_ret); \
292 _yield_ret = false; \
/* Resets drain_cr to a fresh boost::asio::coroutine and then expands
 * yield_until_true(drain_children(0)).
 * NOTE(review): yield_until_true repeats while its argument is false, but the
 * comment on the drain_children declaration says it "returns true if needed
 * to be called again" -- confirm which meaning the return value has. */
295 #define drain_all() \
296 drain_cr = boost::asio::coroutine(); \
297 yield_until_true(drain_children(0))
/* Resets drain_cr to a fresh boost::asio::coroutine and then expands
 * yield_until_true(drain_children(n)); n is passed as drain_children's
 * num_cr_left argument. */
299 #define drain_all_but(n) \
300 drain_cr = boost::asio::coroutine(); \
301 yield_until_true(drain_children(n))
/* Resets drain_cr to a fresh boost::asio::coroutine and then expands
 * yield_until_true(drain_children(1, stack)); stack is passed as
 * drain_children's skip_stack argument and num_cr_left is fixed at 1. */
303 #define drain_all_but_stack(stack) \
304 drain_cr = boost::asio::coroutine(); \
305 yield_until_true(drain_children(1, stack))
308 class RGWConsumerCR
: public RGWCoroutine
{
312 RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
315 return !product
.empty();
318 void wait_for_product() {
319 if (!has_product()) {
325 if (product
.empty()) {
328 *p
= product
.front();
333 void receive(const T
& p
, bool wakeup
= true);
334 void receive(list
<T
>& l
, bool wakeup
= true);
337 class RGWCoroutinesStack
: public RefCountedObject
{
338 friend class RGWCoroutine
;
339 friend class RGWCoroutinesManager
;
343 RGWCoroutinesManager
*ops_mgr
;
345 list
<RGWCoroutine
*> ops
;
346 list
<RGWCoroutine
*>::iterator pos
;
348 rgw_spawned_stacks spawned
;
350 set
<RGWCoroutinesStack
*> blocked_by_stack
;
351 set
<RGWCoroutinesStack
*> blocking_stacks
;
357 bool interval_wait_flag
;
361 bool is_waiting_for_child
;
368 RGWCoroutinesEnv
*env
;
369 RGWCoroutinesStack
*parent
;
371 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
372 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
373 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
375 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
376 ~RGWCoroutinesStack() override
;
378 int operate(RGWCoroutinesEnv
*env
);
386 bool is_blocked_by_stack() {
387 return !blocked_by_stack
.empty();
389 void set_io_blocked(bool flag
) {
392 bool is_io_blocked() {
395 void set_interval_wait(bool flag
) {
396 interval_wait_flag
= flag
;
398 bool is_interval_waiting() {
399 return interval_wait_flag
;
401 void set_sleeping(bool flag
) {
402 bool wakeup
= sleep_flag
& !flag
;
411 void set_is_scheduled(bool flag
) {
416 return is_blocked_by_stack() || is_sleeping() ||
417 is_io_blocked() || waiting_for_child() ;
420 void schedule(list
<RGWCoroutinesStack
*> *stacks
= NULL
) {
422 stacks
= env
->scheduled_stacks
;
425 stacks
->push_back(this);
430 int get_ret_status() {
436 void call(RGWCoroutine
*next_op
);
437 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
438 int unwind(int retcode
);
440 int wait(const utime_t
& interval
);
443 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
445 RGWAioCompletionNotifier
*create_completion_notifier();
446 RGWCompletionManager
*get_completion_mgr();
448 void set_blocked_by(RGWCoroutinesStack
*s
) {
449 blocked_by_stack
.insert(s
);
450 s
->blocking_stacks
.insert(this);
453 void set_wait_for_child(bool flag
) {
454 is_waiting_for_child
= flag
;
457 bool waiting_for_child() {
458 return is_waiting_for_child
;
461 bool unblock_stack(RGWCoroutinesStack
**s
);
/* Returns the env pointer stored in this stack, unchanged. */
463 RGWCoroutinesEnv
*get_env() { return env
; }
465 void dump(Formatter
*f
) const;
469 void RGWConsumerCR
<T
>::receive(list
<T
>& l
, bool wakeup
)
471 product
.splice(product
.end(), l
);
479 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
481 product
.push_back(p
);
487 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
490 set
<RGWCoroutinesManager
*> managers
;
493 string admin_command
;
496 RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutinesRegistry::lock") {}
497 ~RGWCoroutinesManagerRegistry() override
;
499 void add(RGWCoroutinesManager
*mgr
);
500 void remove(RGWCoroutinesManager
*mgr
);
502 int hook_to_admin_command(const string
& command
);
503 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
504 bufferlist
& out
) override
;
506 void dump(Formatter
*f
) const;
509 class RGWCoroutinesManager
{
511 std::atomic
<bool> going_down
= { false };
513 std::atomic
<int64_t> run_context_count
= { 0 };
514 map
<uint64_t, set
<RGWCoroutinesStack
*> > run_contexts
;
518 void handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
, RGWCoroutinesStack
*stack
, int *waiting_count
);
520 RGWCompletionManager
*completion_mgr
;
521 RGWCoroutinesManagerRegistry
*cr_registry
;
527 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
529 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
), lock("RGWCoroutinesManager::lock"),
530 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
531 completion_mgr
= new RGWCompletionManager(cct
);
533 cr_registry
->add(this);
536 virtual ~RGWCoroutinesManager() {
538 completion_mgr
->put();
540 cr_registry
->remove(this);
544 int run(list
<RGWCoroutinesStack
*>& ops
);
545 int run(RGWCoroutine
*op
);
547 bool expected
= false;
548 if (going_down
.compare_exchange_strong(expected
, true)) {
549 completion_mgr
->go_down();
553 virtual void report_error(RGWCoroutinesStack
*op
);
555 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
/* Returns the manager's completion_mgr pointer as-is; this accessor does not
 * call get() on it, so no extra reference is taken here. */
556 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
558 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
559 RGWCoroutinesStack
*allocate_stack();
561 virtual string
get_id();
562 void dump(Formatter
*f
) const;
565 class RGWSimpleCoroutine
: public RGWCoroutine
{
568 int operate() override
;
571 int state_send_request();
572 int state_request_complete();
573 int state_all_complete();
578 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
579 ~RGWSimpleCoroutine() override
;
/* Overridable hook; the default implementation returns 0. */
581 virtual int init() { return 0; }
/* Pure virtual: concrete subclasses must implement send_request(). */
582 virtual int send_request() = 0;
/* Pure virtual: concrete subclasses must implement request_complete(). */
583 virtual int request_complete() = 0;
/* Overridable hook; the default implementation returns 0. */
584 virtual int finish() { return 0; }
/* Overridable hook; the default implementation does nothing. */
585 virtual void request_cleanup() {}