1 #ifndef CEPH_RGW_COROUTINE_H
2 #define CEPH_RGW_COROUTINE_H
6 #pragma push_macro("_ASSERT_H")
9 #include <boost/intrusive_ptr.hpp>
12 #pragma pop_macro("_ASSERT_H")
15 #include "include/utime.h"
16 #include "common/RefCountedObj.h"
17 #include "common/debug.h"
18 #include "common/Timer.h"
19 #include "common/admin_socket.h"
21 #include "rgw_common.h"
22 #include <boost/asio/coroutine.hpp>
26 #define RGW_ASYNC_OPS_MGR_WINDOW 100
28 class RGWCoroutinesStack
;
29 class RGWCoroutinesManager
;
30 class RGWAioCompletionNotifier
;
32 class RGWCompletionManager
: public RefCountedObject
{
34 list
<void *> complete_reqs
;
35 using NotifierRef
= boost::intrusive_ptr
<RGWAioCompletionNotifier
>;
43 std::atomic
<bool> going_down
= { false };
45 map
<void *, void *> waiters
;
50 void _wakeup(void *opaque
);
51 void _complete(RGWAioCompletionNotifier
*cn
, void *user_info
);
53 RGWCompletionManager(CephContext
*_cct
);
54 ~RGWCompletionManager() override
;
56 void complete(RGWAioCompletionNotifier
*cn
, void *user_info
);
57 int get_next(void **user_info
);
58 bool try_get_next(void **user_info
);
63 * wait for interval length to complete user_info
65 void wait_interval(void *opaque
, const utime_t
& interval
, void *user_info
);
66 void wakeup(void *opaque
);
68 void register_completion_notifier(RGWAioCompletionNotifier
*cn
);
69 void unregister_completion_notifier(RGWAioCompletionNotifier
*cn
);
72 /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */
73 class RGWAioCompletionNotifier
: public RefCountedObject
{
74 librados::AioCompletion
*c
;
75 RGWCompletionManager
*completion_mgr
;
81 RGWAioCompletionNotifier(RGWCompletionManager
*_mgr
, void *_user_data
);
82 ~RGWAioCompletionNotifier() override
{
85 bool need_unregister
= registered
;
87 completion_mgr
->get();
91 if (need_unregister
) {
92 completion_mgr
->unregister_completion_notifier(this);
93 completion_mgr
->put();
97 librados::AioCompletion
*completion() {
102 Mutex::Locker
l(lock
);
116 completion_mgr
->get();
119 completion_mgr
->complete(this, user_data
);
120 completion_mgr
->put();
125 struct RGWCoroutinesEnv
{
126 uint64_t run_context
;
127 RGWCoroutinesManager
*manager
;
128 list
<RGWCoroutinesStack
*> *scheduled_stacks
;
129 RGWCoroutinesStack
*stack
;
131 RGWCoroutinesEnv() : run_context(0), manager(NULL
), scheduled_stacks(NULL
), stack(NULL
) {}
134 enum RGWCoroutineState
{
135 RGWCoroutine_Error
= -2,
136 RGWCoroutine_Done
= -1,
137 RGWCoroutine_Run
= 0,
140 struct rgw_spawned_stacks
{
141 vector
<RGWCoroutinesStack
*> entries
;
143 rgw_spawned_stacks() {}
145 void add_pending(RGWCoroutinesStack
*s
) {
146 entries
.push_back(s
);
149 void inherit(rgw_spawned_stacks
*source
) {
150 for (vector
<RGWCoroutinesStack
*>::iterator iter
= source
->entries
.begin();
151 iter
!= source
->entries
.end(); ++iter
) {
154 source
->entries
.clear();
160 class RGWCoroutine
: public RefCountedObject
, public boost::asio::coroutine
{
161 friend class RGWCoroutinesStack
;
167 StatusItem(utime_t
& t
, const string
& s
) : timestamp(t
), status(s
) {}
169 void dump(Formatter
*f
) const;
172 #define MAX_COROUTINE_HISTORY 10
182 Status(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutine::Status::lock"), max_history(MAX_COROUTINE_HISTORY
) {}
184 deque
<StatusItem
> history
;
186 stringstream
& set_status();
189 stringstream description
;
193 boost::asio::coroutine drain_cr
;
197 RGWCoroutinesStack
*stack
;
201 rgw_spawned_stacks spawned
;
203 stringstream error_stream
;
205 int set_state(int s
, int ret
= 0) {
209 int set_cr_error(int ret
) {
210 state
= RGWCoroutine_Error
;
214 state
= RGWCoroutine_Done
;
217 void set_io_blocked(bool flag
);
218 int io_block(int ret
= 0);
220 void reset_description() {
221 description
.str(string());
224 stringstream
& set_description() {
227 stringstream
& set_status() {
228 return status
.set_status();
231 stringstream
& set_status(const string
& s
) {
232 stringstream
& status
= set_status();
238 RGWCoroutine(CephContext
*_cct
) : status(_cct
), _yield_ret(false), cct(_cct
), stack(NULL
), retcode(0), state(RGWCoroutine_Run
) {}
239 ~RGWCoroutine() override
;
241 virtual int operate() = 0;
243 bool is_done() { return (state
== RGWCoroutine_Done
|| state
== RGWCoroutine_Error
); }
244 bool is_error() { return (state
== RGWCoroutine_Error
); }
246 stringstream
& log_error() { return error_stream
; }
248 return error_stream
.str();
251 void set_retcode(int r
) {
255 int get_ret_status() {
259 void call(RGWCoroutine
*op
); /* call at the same stack we're in */
260 RGWCoroutinesStack
*spawn(RGWCoroutine
*op
, bool wait
); /* execute on a different stack */
261 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
262 bool collect_next(int *ret
, RGWCoroutinesStack
**collected_stack
= NULL
); /* returns true if found a stack to collect */
264 int wait(const utime_t
& interval
);
265 bool drain_children(int num_cr_left
, RGWCoroutinesStack
*skip_stack
= NULL
); /* returns true if needed to be called again */
267 void set_sleeping(bool flag
); /* put in sleep, or wakeup from sleep */
269 size_t num_spawned() {
270 return spawned
.entries
.size();
273 void wait_for_child();
275 virtual string
to_str() const;
277 RGWCoroutinesStack
*get_stack() const {
281 void dump(Formatter
*f
) const;
284 ostream
& operator<<(ostream
& out
, const RGWCoroutine
& cr
);
286 #define yield_until_true(x) \
289 yield _yield_ret = x; \
290 } while (!_yield_ret); \
291 _yield_ret = false; \
294 #define drain_all() \
295 drain_cr = boost::asio::coroutine(); \
296 yield_until_true(drain_children(0))
298 #define drain_all_but(n) \
299 drain_cr = boost::asio::coroutine(); \
300 yield_until_true(drain_children(n))
302 #define drain_all_but_stack(stack) \
303 drain_cr = boost::asio::coroutine(); \
304 yield_until_true(drain_children(1, stack))
307 class RGWConsumerCR
: public RGWCoroutine
{
311 RGWConsumerCR(CephContext
*_cct
) : RGWCoroutine(_cct
) {}
314 return !product
.empty();
317 void wait_for_product() {
318 if (!has_product()) {
324 if (product
.empty()) {
327 *p
= product
.front();
332 void receive(const T
& p
, bool wakeup
= true);
333 void receive(list
<T
>& l
, bool wakeup
= true);
336 class RGWCoroutinesStack
: public RefCountedObject
{
337 friend class RGWCoroutine
;
338 friend class RGWCoroutinesManager
;
342 RGWCoroutinesManager
*ops_mgr
;
344 list
<RGWCoroutine
*> ops
;
345 list
<RGWCoroutine
*>::iterator pos
;
347 rgw_spawned_stacks spawned
;
349 set
<RGWCoroutinesStack
*> blocked_by_stack
;
350 set
<RGWCoroutinesStack
*> blocking_stacks
;
356 bool interval_wait_flag
;
360 bool is_waiting_for_child
;
367 RGWCoroutinesEnv
*env
;
368 RGWCoroutinesStack
*parent
;
370 RGWCoroutinesStack
*spawn(RGWCoroutine
*source_op
, RGWCoroutine
*next_op
, bool wait
);
371 bool collect(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
372 bool collect_next(RGWCoroutine
*op
, int *ret
, RGWCoroutinesStack
**collected_stack
); /* returns true if found a stack to collect */
374 RGWCoroutinesStack(CephContext
*_cct
, RGWCoroutinesManager
*_ops_mgr
, RGWCoroutine
*start
= NULL
);
375 ~RGWCoroutinesStack() override
;
377 int operate(RGWCoroutinesEnv
*env
);
385 bool is_blocked_by_stack() {
386 return !blocked_by_stack
.empty();
388 void set_io_blocked(bool flag
) {
391 bool is_io_blocked() {
394 void set_interval_wait(bool flag
) {
395 interval_wait_flag
= flag
;
397 bool is_interval_waiting() {
398 return interval_wait_flag
;
400 void set_sleeping(bool flag
) {
401 bool wakeup
= sleep_flag
& !flag
;
410 void set_is_scheduled(bool flag
) {
415 return is_blocked_by_stack() || is_sleeping() ||
416 is_io_blocked() || waiting_for_child() ;
419 void schedule(list
<RGWCoroutinesStack
*> *stacks
= NULL
) {
421 stacks
= env
->scheduled_stacks
;
424 stacks
->push_back(this);
429 int get_ret_status() {
435 void call(RGWCoroutine
*next_op
);
436 RGWCoroutinesStack
*spawn(RGWCoroutine
*next_op
, bool wait
);
437 int unwind(int retcode
);
439 int wait(const utime_t
& interval
);
442 bool collect(int *ret
, RGWCoroutinesStack
*skip_stack
); /* returns true if needs to be called again */
444 RGWAioCompletionNotifier
*create_completion_notifier();
445 RGWCompletionManager
*get_completion_mgr();
447 void set_blocked_by(RGWCoroutinesStack
*s
) {
448 blocked_by_stack
.insert(s
);
449 s
->blocking_stacks
.insert(this);
452 void set_wait_for_child(bool flag
) {
453 is_waiting_for_child
= flag
;
456 bool waiting_for_child() {
457 return is_waiting_for_child
;
460 bool unblock_stack(RGWCoroutinesStack
**s
);
462 RGWCoroutinesEnv
*get_env() { return env
; }
464 void dump(Formatter
*f
) const;
468 void RGWConsumerCR
<T
>::receive(list
<T
>& l
, bool wakeup
)
470 product
.splice(product
.end(), l
);
478 void RGWConsumerCR
<T
>::receive(const T
& p
, bool wakeup
)
480 product
.push_back(p
);
486 class RGWCoroutinesManagerRegistry
: public RefCountedObject
, public AdminSocketHook
{
489 set
<RGWCoroutinesManager
*> managers
;
492 string admin_command
;
495 RGWCoroutinesManagerRegistry(CephContext
*_cct
) : cct(_cct
), lock("RGWCoroutinesRegistry::lock") {}
496 ~RGWCoroutinesManagerRegistry() override
;
498 void add(RGWCoroutinesManager
*mgr
);
499 void remove(RGWCoroutinesManager
*mgr
);
501 int hook_to_admin_command(const string
& command
);
502 bool call(std::string command
, cmdmap_t
& cmdmap
, std::string format
,
503 bufferlist
& out
) override
;
505 void dump(Formatter
*f
) const;
508 class RGWCoroutinesManager
{
510 std::atomic
<bool> going_down
= { false };
512 std::atomic
<int64_t> run_context_count
= { 0 };
513 map
<uint64_t, set
<RGWCoroutinesStack
*> > run_contexts
;
517 void handle_unblocked_stack(set
<RGWCoroutinesStack
*>& context_stacks
, list
<RGWCoroutinesStack
*>& scheduled_stacks
, RGWCoroutinesStack
*stack
, int *waiting_count
);
519 RGWCompletionManager
*completion_mgr
;
520 RGWCoroutinesManagerRegistry
*cr_registry
;
526 void put_completion_notifier(RGWAioCompletionNotifier
*cn
);
528 RGWCoroutinesManager(CephContext
*_cct
, RGWCoroutinesManagerRegistry
*_cr_registry
) : cct(_cct
), lock("RGWCoroutinesManager::lock"),
529 cr_registry(_cr_registry
), ops_window(RGW_ASYNC_OPS_MGR_WINDOW
) {
530 completion_mgr
= new RGWCompletionManager(cct
);
532 cr_registry
->add(this);
535 virtual ~RGWCoroutinesManager() {
537 completion_mgr
->put();
539 cr_registry
->remove(this);
543 int run(list
<RGWCoroutinesStack
*>& ops
);
544 int run(RGWCoroutine
*op
);
546 bool expected
= false;
547 if (going_down
.compare_exchange_strong(expected
, true)) {
548 completion_mgr
->go_down();
552 virtual void report_error(RGWCoroutinesStack
*op
);
554 RGWAioCompletionNotifier
*create_completion_notifier(RGWCoroutinesStack
*stack
);
555 RGWCompletionManager
*get_completion_mgr() { return completion_mgr
; }
557 void schedule(RGWCoroutinesEnv
*env
, RGWCoroutinesStack
*stack
);
558 RGWCoroutinesStack
*allocate_stack();
560 virtual string
get_id();
561 void dump(Formatter
*f
) const;
564 class RGWSimpleCoroutine
: public RGWCoroutine
{
567 int operate() override
;
570 int state_send_request();
571 int state_request_complete();
572 int state_all_complete();
577 RGWSimpleCoroutine(CephContext
*_cct
) : RGWCoroutine(_cct
), called_cleanup(false) {}
578 ~RGWSimpleCoroutine() override
;
580 virtual int init() { return 0; }
581 virtual int send_request() = 0;
582 virtual int request_complete() = 0;
583 virtual int finish() { return 0; }
584 virtual void request_cleanup() {}