]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
1e59de90 | 4 | #pragma once |
7c673cae FG |
5 | |
6 | #ifdef _ASSERT_H | |
7 | #define NEED_ASSERT_H | |
8 | #pragma push_macro("_ASSERT_H") | |
9 | #endif | |
10 | ||
11fdf7f2 | 11 | #include <boost/asio.hpp> |
7c673cae FG |
12 | #include <boost/intrusive_ptr.hpp> |
13 | ||
14 | #ifdef NEED_ASSERT_H | |
15 | #pragma pop_macro("_ASSERT_H") | |
16 | #endif | |
17 | ||
18 | #include "include/utime.h" | |
19 | #include "common/RefCountedObj.h" | |
20 | #include "common/debug.h" | |
21 | #include "common/Timer.h" | |
22 | #include "common/admin_socket.h" | |
23 | ||
24 | #include "rgw_common.h" | |
9f95a23c TL |
25 | #include "rgw_http_client_types.h" |
26 | ||
31f18b77 | 27 | #include <boost/asio/coroutine.hpp> |
7c673cae FG |
28 | |
29 | #include <atomic> | |
30 | ||
31 | #define RGW_ASYNC_OPS_MGR_WINDOW 100 | |
32 | ||
33 | class RGWCoroutinesStack; | |
34 | class RGWCoroutinesManager; | |
35 | class RGWAioCompletionNotifier; | |
36 | ||
37 | class RGWCompletionManager : public RefCountedObject { | |
11fdf7f2 TL |
38 | friend class RGWCoroutinesManager; |
39 | ||
7c673cae | 40 | CephContext *cct; |
11fdf7f2 TL |
41 | |
42 | struct io_completion { | |
43 | rgw_io_id io_id; | |
44 | void *user_info; | |
45 | }; | |
20effc67 TL |
46 | std::list<io_completion> complete_reqs; |
47 | std::set<rgw_io_id> complete_reqs_set; | |
7c673cae | 48 | using NotifierRef = boost::intrusive_ptr<RGWAioCompletionNotifier>; |
20effc67 | 49 | std::set<NotifierRef> cns; |
7c673cae | 50 | |
9f95a23c TL |
51 | ceph::mutex lock = ceph::make_mutex("RGWCompletionManager::lock"); |
52 | ceph::condition_variable cond; | |
7c673cae FG |
53 | |
54 | SafeTimer timer; | |
55 | ||
56 | std::atomic<bool> going_down = { false }; | |
57 | ||
20effc67 | 58 | std::map<void *, void *> waiters; |
7c673cae FG |
59 | |
60 | class WaitContext; | |
61 | ||
62 | protected: | |
63 | void _wakeup(void *opaque); | |
11fdf7f2 | 64 | void _complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info); |
7c673cae | 65 | public: |
11fdf7f2 | 66 | explicit RGWCompletionManager(CephContext *_cct); |
1e59de90 | 67 | virtual ~RGWCompletionManager() override; |
7c673cae | 68 | |
11fdf7f2 TL |
69 | void complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info); |
70 | int get_next(io_completion *io); | |
71 | bool try_get_next(io_completion *io); | |
7c673cae FG |
72 | |
73 | void go_down(); | |
74 | ||
75 | /* | |
76 | * wait for interval length to complete user_info | |
77 | */ | |
78 | void wait_interval(void *opaque, const utime_t& interval, void *user_info); | |
79 | void wakeup(void *opaque); | |
80 | ||
81 | void register_completion_notifier(RGWAioCompletionNotifier *cn); | |
82 | void unregister_completion_notifier(RGWAioCompletionNotifier *cn); | |
83 | }; | |
84 | ||
85 | /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ | |
86 | class RGWAioCompletionNotifier : public RefCountedObject { | |
87 | librados::AioCompletion *c; | |
88 | RGWCompletionManager *completion_mgr; | |
11fdf7f2 | 89 | rgw_io_id io_id; |
7c673cae | 90 | void *user_data; |
9f95a23c | 91 | ceph::mutex lock = ceph::make_mutex("RGWAioCompletionNotifier"); |
7c673cae FG |
92 | bool registered; |
93 | ||
94 | public: | |
11fdf7f2 | 95 | RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data); |
1e59de90 | 96 | virtual ~RGWAioCompletionNotifier() override { |
7c673cae | 97 | c->release(); |
9f95a23c | 98 | lock.lock(); |
7c673cae FG |
99 | bool need_unregister = registered; |
100 | if (registered) { | |
101 | completion_mgr->get(); | |
102 | } | |
103 | registered = false; | |
9f95a23c | 104 | lock.unlock(); |
7c673cae FG |
105 | if (need_unregister) { |
106 | completion_mgr->unregister_completion_notifier(this); | |
107 | completion_mgr->put(); | |
108 | } | |
109 | } | |
110 | ||
111 | librados::AioCompletion *completion() { | |
112 | return c; | |
113 | } | |
114 | ||
115 | void unregister() { | |
9f95a23c | 116 | std::lock_guard l{lock}; |
7c673cae FG |
117 | if (!registered) { |
118 | return; | |
119 | } | |
120 | registered = false; | |
121 | } | |
122 | ||
123 | void cb() { | |
9f95a23c | 124 | lock.lock(); |
7c673cae | 125 | if (!registered) { |
9f95a23c | 126 | lock.unlock(); |
7c673cae FG |
127 | put(); |
128 | return; | |
129 | } | |
130 | completion_mgr->get(); | |
131 | registered = false; | |
9f95a23c | 132 | lock.unlock(); |
11fdf7f2 | 133 | completion_mgr->complete(this, io_id, user_data); |
7c673cae FG |
134 | completion_mgr->put(); |
135 | put(); | |
136 | } | |
137 | }; | |
138 | ||
11fdf7f2 TL |
139 | // completion notifier with opaque payload (ie a reference-counted pointer) |
140 | template <typename T> | |
141 | class RGWAioCompletionNotifierWith : public RGWAioCompletionNotifier { | |
142 | T value; | |
143 | public: | |
144 | RGWAioCompletionNotifierWith(RGWCompletionManager *mgr, | |
145 | const rgw_io_id& io_id, void *user_data, | |
146 | T value) | |
147 | : RGWAioCompletionNotifier(mgr, io_id, user_data), value(std::move(value)) | |
148 | {} | |
149 | }; | |
150 | ||
7c673cae FG |
/*
 * Plain bundle of pointers describing the environment a stack runs in.
 * Everything starts out zero/null.
 */
struct RGWCoroutinesEnv {
  uint64_t run_context{0};
  RGWCoroutinesManager *manager{nullptr};
  std::list<RGWCoroutinesStack *> *scheduled_stacks{nullptr};
  RGWCoroutinesStack *stack{nullptr};

  // kept user-provided so the struct does not become an aggregate
  RGWCoroutinesEnv() {}
};
159 | ||
/*
 * Coroutine state values stored in RGWCoroutine::state.
 * Error and Done both count as "done" for RGWCoroutine::is_done().
 */
enum RGWCoroutineState {
  RGWCoroutine_Error = -2, /* set by set_cr_error() */
  RGWCoroutine_Done  = -1, /* set by set_cr_done() */
  RGWCoroutine_Run   = 0,  /* state a coroutine is constructed with */
};
165 | ||
/*
 * Ordered list of stacks that were spawned and are still tracked.
 * Holds raw pointers only; nothing here takes or drops references.
 */
struct rgw_spawned_stacks {
  std::vector<RGWCoroutinesStack *> entries;

  rgw_spawned_stacks() {}

  /* append a stack to the end of the list */
  void add_pending(RGWCoroutinesStack *s) {
    entries.push_back(s);
  }

  /*
   * Take over every entry of `source` (appended in order) and leave
   * `source` empty.
   */
  void inherit(rgw_spawned_stacks *source) {
    auto& incoming = source->entries;
    entries.insert(entries.end(), incoming.begin(), incoming.end());
    incoming.clear();
  }
};
182 | ||
183 | ||
184 | ||
185 | class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { | |
186 | friend class RGWCoroutinesStack; | |
187 | ||
188 | struct StatusItem { | |
189 | utime_t timestamp; | |
20effc67 | 190 | std::string status; |
7c673cae | 191 | |
20effc67 | 192 | StatusItem(utime_t& t, const std::string& s) : timestamp(t), status(s) {} |
7c673cae FG |
193 | |
194 | void dump(Formatter *f) const; | |
195 | }; | |
196 | ||
197 | #define MAX_COROUTINE_HISTORY 10 | |
198 | ||
199 | struct Status { | |
200 | CephContext *cct; | |
9f95a23c TL |
201 | ceph::shared_mutex lock = |
202 | ceph::make_shared_mutex("RGWCoroutine::Status::lock"); | |
7c673cae FG |
203 | int max_history; |
204 | ||
205 | utime_t timestamp; | |
20effc67 | 206 | std::stringstream status; |
7c673cae | 207 | |
9f95a23c | 208 | explicit Status(CephContext *_cct) : cct(_cct), max_history(MAX_COROUTINE_HISTORY) {} |
7c673cae | 209 | |
20effc67 | 210 | std::deque<StatusItem> history; |
7c673cae | 211 | |
20effc67 | 212 | std::stringstream& set_status(); |
7c673cae FG |
213 | } status; |
214 | ||
20effc67 | 215 | std::stringstream description; |
7c673cae FG |
216 | |
217 | protected: | |
218 | bool _yield_ret; | |
f67539c2 TL |
219 | |
220 | struct { | |
221 | boost::asio::coroutine cr; | |
222 | bool should_exit{false}; | |
223 | int ret{0}; | |
224 | ||
225 | void init() { | |
226 | cr = boost::asio::coroutine(); | |
227 | should_exit = false; | |
228 | ret = 0; | |
229 | } | |
230 | } drain_status; | |
7c673cae FG |
231 | |
232 | CephContext *cct; | |
233 | ||
234 | RGWCoroutinesStack *stack; | |
235 | int retcode; | |
236 | int state; | |
237 | ||
238 | rgw_spawned_stacks spawned; | |
239 | ||
20effc67 | 240 | std::stringstream error_stream; |
7c673cae FG |
241 | |
242 | int set_state(int s, int ret = 0) { | |
11fdf7f2 | 243 | retcode = ret; |
7c673cae FG |
244 | state = s; |
245 | return ret; | |
246 | } | |
247 | int set_cr_error(int ret) { | |
11fdf7f2 | 248 | return set_state(RGWCoroutine_Error, ret); |
7c673cae FG |
249 | } |
250 | int set_cr_done() { | |
11fdf7f2 | 251 | return set_state(RGWCoroutine_Done, 0); |
7c673cae FG |
252 | } |
253 | void set_io_blocked(bool flag); | |
7c673cae FG |
254 | |
255 | void reset_description() { | |
20effc67 | 256 | description.str(std::string()); |
7c673cae FG |
257 | } |
258 | ||
20effc67 | 259 | std::stringstream& set_description() { |
7c673cae FG |
260 | return description; |
261 | } | |
20effc67 | 262 | std::stringstream& set_status() { |
7c673cae FG |
263 | return status.set_status(); |
264 | } | |
265 | ||
20effc67 TL |
266 | std::stringstream& set_status(const std::string& s) { |
267 | std::stringstream& status = set_status(); | |
7c673cae FG |
268 | status << s; |
269 | return status; | |
270 | } | |
271 | ||
b3b6e05e TL |
272 | virtual int operate_wrapper(const DoutPrefixProvider *dpp) { |
273 | return operate(dpp); | |
11fdf7f2 | 274 | } |
7c673cae FG |
275 | public: |
276 | RGWCoroutine(CephContext *_cct) : status(_cct), _yield_ret(false), cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} | |
20effc67 | 277 | virtual ~RGWCoroutine() override; |
7c673cae | 278 | |
b3b6e05e | 279 | virtual int operate(const DoutPrefixProvider *dpp) = 0; |
7c673cae FG |
280 | |
281 | bool is_done() { return (state == RGWCoroutine_Done || state == RGWCoroutine_Error); } | |
282 | bool is_error() { return (state == RGWCoroutine_Error); } | |
283 | ||
20effc67 TL |
284 | std::stringstream& log_error() { return error_stream; } |
285 | std::string error_str() { | |
7c673cae FG |
286 | return error_stream.str(); |
287 | } | |
288 | ||
289 | void set_retcode(int r) { | |
290 | retcode = r; | |
291 | } | |
292 | ||
293 | int get_ret_status() { | |
294 | return retcode; | |
295 | } | |
296 | ||
297 | void call(RGWCoroutine *op); /* call at the same stack we're in */ | |
298 | RGWCoroutinesStack *spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ | |
f67539c2 | 299 | bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id = nullptr); /* returns true if needs to be called again */ |
7c673cae FG |
300 | bool collect_next(int *ret, RGWCoroutinesStack **collected_stack = NULL); /* returns true if found a stack to collect */ |
301 | ||
302 | int wait(const utime_t& interval); | |
f67539c2 TL |
303 | bool drain_children(int num_cr_left, |
304 | RGWCoroutinesStack *skip_stack = nullptr, | |
305 | std::optional<std::function<void(uint64_t stack_id, int ret)> > cb = std::nullopt); /* returns true if needed to be called again, | |
306 | cb will be called on completion of every | |
307 | completion. */ | |
308 | bool drain_children(int num_cr_left, | |
309 | std::optional<std::function<int(uint64_t stack_id, int ret)> > cb); /* returns true if needed to be called again, | |
310 | cb will be called on every completion, can filter errors. | |
311 | A negative return value from cb means that current cr | |
312 | will need to exit */ | |
7c673cae FG |
313 | void wakeup(); |
314 | void set_sleeping(bool flag); /* put in sleep, or wakeup from sleep */ | |
315 | ||
316 | size_t num_spawned() { | |
317 | return spawned.entries.size(); | |
318 | } | |
319 | ||
320 | void wait_for_child(); | |
321 | ||
20effc67 | 322 | virtual std::string to_str() const; |
7c673cae FG |
323 | |
324 | RGWCoroutinesStack *get_stack() const { | |
325 | return stack; | |
326 | } | |
327 | ||
11fdf7f2 TL |
328 | RGWCoroutinesEnv *get_env() const; |
329 | ||
7c673cae | 330 | void dump(Formatter *f) const; |
11fdf7f2 TL |
331 | |
332 | void init_new_io(RGWIOProvider *io_provider); /* only links the default io id */ | |
333 | ||
334 | int io_block(int ret = 0) { | |
335 | return io_block(ret, -1); | |
336 | } | |
337 | int io_block(int ret, int64_t io_id); | |
338 | int io_block(int ret, const rgw_io_id& io_id); | |
339 | void io_complete() { | |
340 | io_complete(rgw_io_id{}); | |
341 | } | |
342 | void io_complete(const rgw_io_id& io_id); | |
7c673cae FG |
343 | }; |
344 | ||
20effc67 | 345 | std::ostream& operator<<(std::ostream& out, const RGWCoroutine& cr); |
7c673cae FG |
346 | |
/*
 * Coroutine helper macros. They are meant to be used inside an RGWCoroutine
 * member function: they rely on the `yield` pseudo-keyword and on the
 * `_yield_ret`, `drain_status`, drain_children(), spawn() and set_cr_error()
 * members.
 *
 * Every macro expands to exactly one `do { ... } while (0)` statement, so it
 * is safe as the body of an unbraced if/else/for/while. Invoke each one with
 * a trailing semicolon.
 */

/* yield repeatedly until expression x evaluates to true */
#define yield_until_true(x)                     \
do {                                            \
  do {                                          \
    yield _yield_ret = (x);                     \
  } while (!_yield_ret);                        \
  _yield_ret = false;                           \
} while (0)

/* reset drain_status, then yield until drain_children(0) returns true */
#define drain_all()                             \
do {                                            \
  drain_status.init();                          \
  yield_until_true(drain_children(0));          \
} while (0)

/* as drain_all(), passing n as num_cr_left */
#define drain_all_but(n)                        \
do {                                            \
  drain_status.init();                          \
  yield_until_true(drain_children(n));          \
} while (0)

/* as drain_all(), passing num_cr_left = 1 and `stack` as skip_stack */
#define drain_all_but_stack(stack)              \
do {                                            \
  drain_status.init();                          \
  yield_until_true(drain_children(1, stack));   \
} while (0)

/* as drain_all_but_stack(), also passing the completion callback cb */
#define drain_all_but_stack_cb(stack, cb)           \
do {                                                \
  drain_status.init();                              \
  yield_until_true(drain_children(1, stack, cb));   \
} while (0)

/*
 * drain with num_cr_left = n and callback cb; if drain_status.should_exit is
 * set afterwards, return from the enclosing function via
 * set_cr_error(drain_status.ret)
 */
#define drain_with_cb(n, cb)                    \
do {                                            \
  drain_status.init();                          \
  yield_until_true(drain_children(n, cb));      \
  if (drain_status.should_exit) {               \
    return set_cr_error(drain_status.ret);      \
  }                                             \
} while (0)

#define drain_all_cb(cb) \
  drain_with_cb(0, cb)

/* spawn cr without waiting, then drain with num_cr_left = n */
#define yield_spawn_window(cr, n, cb)                         \
do {                                                          \
  spawn(cr, false);                                           \
  drain_with_cb(n, cb); /* this is guaranteed to yield */     \
} while (0)
386 | ||
387 | ||
388 | ||
7c673cae FG |
389 | template <class T> |
390 | class RGWConsumerCR : public RGWCoroutine { | |
20effc67 | 391 | std::list<T> product; |
7c673cae FG |
392 | |
393 | public: | |
11fdf7f2 | 394 | explicit RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} |
7c673cae FG |
395 | |
396 | bool has_product() { | |
397 | return !product.empty(); | |
398 | } | |
399 | ||
400 | void wait_for_product() { | |
401 | if (!has_product()) { | |
402 | set_sleeping(true); | |
403 | } | |
404 | } | |
405 | ||
406 | bool consume(T *p) { | |
407 | if (product.empty()) { | |
408 | return false; | |
409 | } | |
410 | *p = product.front(); | |
411 | product.pop_front(); | |
412 | return true; | |
413 | } | |
414 | ||
415 | void receive(const T& p, bool wakeup = true); | |
20effc67 | 416 | void receive(std::list<T>& l, bool wakeup = true); |
7c673cae FG |
417 | }; |
418 | ||
419 | class RGWCoroutinesStack : public RefCountedObject { | |
420 | friend class RGWCoroutine; | |
421 | friend class RGWCoroutinesManager; | |
422 | ||
423 | CephContext *cct; | |
424 | ||
f67539c2 TL |
425 | int64_t id{-1}; |
426 | ||
7c673cae FG |
427 | RGWCoroutinesManager *ops_mgr; |
428 | ||
20effc67 TL |
429 | std::list<RGWCoroutine *> ops; |
430 | std::list<RGWCoroutine *>::iterator pos; | |
7c673cae FG |
431 | |
432 | rgw_spawned_stacks spawned; | |
433 | ||
20effc67 TL |
434 | std::set<RGWCoroutinesStack *> blocked_by_stack; |
435 | std::set<RGWCoroutinesStack *> blocking_stacks; | |
7c673cae | 436 | |
20effc67 | 437 | std::map<int64_t, rgw_io_id> io_finish_ids; |
11fdf7f2 TL |
438 | rgw_io_id io_blocked_id; |
439 | ||
7c673cae FG |
440 | bool done_flag; |
441 | bool error_flag; | |
442 | bool blocked_flag; | |
443 | bool sleep_flag; | |
444 | bool interval_wait_flag; | |
445 | ||
446 | bool is_scheduled; | |
447 | ||
448 | bool is_waiting_for_child; | |
449 | ||
450 | int retcode; | |
451 | ||
452 | uint64_t run_count; | |
453 | ||
454 | protected: | |
455 | RGWCoroutinesEnv *env; | |
456 | RGWCoroutinesStack *parent; | |
457 | ||
458 | RGWCoroutinesStack *spawn(RGWCoroutine *source_op, RGWCoroutine *next_op, bool wait); | |
f67539c2 | 459 | bool collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */ |
7c673cae FG |
460 | bool collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack); /* returns true if found a stack to collect */ |
461 | public: | |
462 | RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); | |
1e59de90 | 463 | virtual ~RGWCoroutinesStack() override; |
7c673cae | 464 | |
f67539c2 TL |
465 | int64_t get_id() const { |
466 | return id; | |
467 | } | |
468 | ||
b3b6e05e | 469 | int operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *env); |
7c673cae FG |
470 | |
471 | bool is_done() { | |
472 | return done_flag; | |
473 | } | |
474 | bool is_error() { | |
475 | return error_flag; | |
476 | } | |
477 | bool is_blocked_by_stack() { | |
478 | return !blocked_by_stack.empty(); | |
479 | } | |
480 | void set_io_blocked(bool flag) { | |
481 | blocked_flag = flag; | |
482 | } | |
11fdf7f2 TL |
483 | void set_io_blocked_id(const rgw_io_id& io_id) { |
484 | io_blocked_id = io_id; | |
485 | } | |
7c673cae | 486 | bool is_io_blocked() { |
11fdf7f2 | 487 | return blocked_flag && !done_flag; |
7c673cae | 488 | } |
11fdf7f2 TL |
489 | bool can_io_unblock(const rgw_io_id& io_id) { |
490 | return ((io_blocked_id.id < 0) || | |
491 | io_blocked_id.intersects(io_id)); | |
492 | } | |
493 | bool try_io_unblock(const rgw_io_id& io_id); | |
494 | bool consume_io_finish(const rgw_io_id& io_id); | |
7c673cae FG |
495 | void set_interval_wait(bool flag) { |
496 | interval_wait_flag = flag; | |
497 | } | |
498 | bool is_interval_waiting() { | |
499 | return interval_wait_flag; | |
500 | } | |
501 | void set_sleeping(bool flag) { | |
502 | bool wakeup = sleep_flag & !flag; | |
503 | sleep_flag = flag; | |
504 | if (wakeup) { | |
505 | schedule(); | |
506 | } | |
507 | } | |
508 | bool is_sleeping() { | |
509 | return sleep_flag; | |
510 | } | |
511 | void set_is_scheduled(bool flag) { | |
512 | is_scheduled = flag; | |
513 | } | |
514 | ||
515 | bool is_blocked() { | |
516 | return is_blocked_by_stack() || is_sleeping() || | |
517 | is_io_blocked() || waiting_for_child() ; | |
518 | } | |
519 | ||
11fdf7f2 TL |
520 | void schedule(); |
521 | void _schedule(); | |
7c673cae FG |
522 | |
523 | int get_ret_status() { | |
524 | return retcode; | |
525 | } | |
526 | ||
20effc67 | 527 | std::string error_str(); |
7c673cae FG |
528 | |
529 | void call(RGWCoroutine *next_op); | |
530 | RGWCoroutinesStack *spawn(RGWCoroutine *next_op, bool wait); | |
531 | int unwind(int retcode); | |
532 | ||
533 | int wait(const utime_t& interval); | |
534 | void wakeup(); | |
11fdf7f2 TL |
535 | void io_complete() { |
536 | io_complete(rgw_io_id{}); | |
537 | } | |
538 | void io_complete(const rgw_io_id& io_id); | |
7c673cae | 539 | |
f67539c2 | 540 | bool collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id); /* returns true if needs to be called again */ |
7c673cae | 541 | |
11fdf7f2 TL |
542 | void cancel(); |
543 | ||
7c673cae | 544 | RGWAioCompletionNotifier *create_completion_notifier(); |
11fdf7f2 TL |
545 | template <typename T> |
546 | RGWAioCompletionNotifier *create_completion_notifier(T value); | |
7c673cae FG |
547 | RGWCompletionManager *get_completion_mgr(); |
548 | ||
549 | void set_blocked_by(RGWCoroutinesStack *s) { | |
550 | blocked_by_stack.insert(s); | |
551 | s->blocking_stacks.insert(this); | |
552 | } | |
553 | ||
554 | void set_wait_for_child(bool flag) { | |
555 | is_waiting_for_child = flag; | |
556 | } | |
557 | ||
558 | bool waiting_for_child() { | |
559 | return is_waiting_for_child; | |
560 | } | |
561 | ||
562 | bool unblock_stack(RGWCoroutinesStack **s); | |
563 | ||
11fdf7f2 | 564 | RGWCoroutinesEnv *get_env() const { return env; } |
7c673cae FG |
565 | |
566 | void dump(Formatter *f) const; | |
11fdf7f2 TL |
567 | |
568 | void init_new_io(RGWIOProvider *io_provider); | |
7c673cae FG |
569 | }; |
570 | ||
571 | template <class T> | |
20effc67 | 572 | void RGWConsumerCR<T>::receive(std::list<T>& l, bool wakeup) |
7c673cae FG |
573 | { |
574 | product.splice(product.end(), l); | |
575 | if (wakeup) { | |
576 | set_sleeping(false); | |
577 | } | |
578 | } | |
579 | ||
580 | ||
581 | template <class T> | |
582 | void RGWConsumerCR<T>::receive(const T& p, bool wakeup) | |
583 | { | |
584 | product.push_back(p); | |
585 | if (wakeup) { | |
586 | set_sleeping(false); | |
587 | } | |
588 | } | |
589 | ||
590 | class RGWCoroutinesManagerRegistry : public RefCountedObject, public AdminSocketHook { | |
591 | CephContext *cct; | |
592 | ||
20effc67 | 593 | std::set<RGWCoroutinesManager *> managers; |
9f95a23c TL |
594 | ceph::shared_mutex lock = |
595 | ceph::make_shared_mutex("RGWCoroutinesRegistry::lock"); | |
7c673cae | 596 | |
20effc67 | 597 | std::string admin_command; |
7c673cae FG |
598 | |
599 | public: | |
9f95a23c | 600 | explicit RGWCoroutinesManagerRegistry(CephContext *_cct) : cct(_cct) {} |
20effc67 | 601 | virtual ~RGWCoroutinesManagerRegistry() override; |
7c673cae FG |
602 | |
603 | void add(RGWCoroutinesManager *mgr); | |
604 | void remove(RGWCoroutinesManager *mgr); | |
605 | ||
20effc67 | 606 | int hook_to_admin_command(const std::string& command); |
9f95a23c | 607 | int call(std::string_view command, const cmdmap_t& cmdmap, |
39ae355f | 608 | const bufferlist&, |
9f95a23c TL |
609 | Formatter *f, |
610 | std::ostream& ss, | |
611 | bufferlist& out) override; | |
11fdf7f2 | 612 | |
7c673cae FG |
613 | void dump(Formatter *f) const; |
614 | }; | |
615 | ||
616 | class RGWCoroutinesManager { | |
617 | CephContext *cct; | |
618 | std::atomic<bool> going_down = { false }; | |
619 | ||
620 | std::atomic<int64_t> run_context_count = { 0 }; | |
20effc67 | 621 | std::map<uint64_t, std::set<RGWCoroutinesStack *> > run_contexts; |
7c673cae | 622 | |
11fdf7f2 | 623 | std::atomic<int64_t> max_io_id = { 0 }; |
f67539c2 | 624 | std::atomic<uint64_t> max_stack_id = { 0 }; |
11fdf7f2 | 625 | |
9f95a23c TL |
626 | mutable ceph::shared_mutex lock = |
627 | ceph::make_shared_mutex("RGWCoroutinesManager::lock"); | |
7c673cae | 628 | |
11fdf7f2 TL |
629 | RGWIOIDProvider io_id_provider; |
630 | ||
20effc67 | 631 | void handle_unblocked_stack(std::set<RGWCoroutinesStack *>& context_stacks, std::list<RGWCoroutinesStack *>& scheduled_stacks, |
1e59de90 | 632 | RGWCompletionManager::io_completion& io, int *waiting_count, int *interval_wait_count); |
7c673cae FG |
633 | protected: |
634 | RGWCompletionManager *completion_mgr; | |
635 | RGWCoroutinesManagerRegistry *cr_registry; | |
636 | ||
637 | int ops_window; | |
638 | ||
20effc67 | 639 | std::string id; |
7c673cae FG |
640 | |
641 | void put_completion_notifier(RGWAioCompletionNotifier *cn); | |
642 | public: | |
9f95a23c | 643 | RGWCoroutinesManager(CephContext *_cct, RGWCoroutinesManagerRegistry *_cr_registry) : cct(_cct), |
7c673cae FG |
644 | cr_registry(_cr_registry), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) { |
645 | completion_mgr = new RGWCompletionManager(cct); | |
646 | if (cr_registry) { | |
647 | cr_registry->add(this); | |
648 | } | |
649 | } | |
20effc67 | 650 | virtual ~RGWCoroutinesManager(); |
7c673cae | 651 | |
20effc67 | 652 | int run(const DoutPrefixProvider *dpp, std::list<RGWCoroutinesStack *>& ops); |
b3b6e05e | 653 | int run(const DoutPrefixProvider *dpp, RGWCoroutine *op); |
7c673cae FG |
654 | void stop() { |
655 | bool expected = false; | |
656 | if (going_down.compare_exchange_strong(expected, true)) { | |
657 | completion_mgr->go_down(); | |
658 | } | |
659 | } | |
660 | ||
661 | virtual void report_error(RGWCoroutinesStack *op); | |
662 | ||
663 | RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack); | |
11fdf7f2 TL |
664 | template <typename T> |
665 | RGWAioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack, T value); | |
7c673cae FG |
666 | RGWCompletionManager *get_completion_mgr() { return completion_mgr; } |
667 | ||
668 | void schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); | |
11fdf7f2 | 669 | void _schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack); |
7c673cae FG |
670 | RGWCoroutinesStack *allocate_stack(); |
671 | ||
11fdf7f2 | 672 | int64_t get_next_io_id(); |
f67539c2 | 673 | uint64_t get_next_stack_id(); |
11fdf7f2 TL |
674 | |
675 | void set_sleeping(RGWCoroutine *cr, bool flag); | |
676 | void io_complete(RGWCoroutine *cr, const rgw_io_id& io_id); | |
677 | ||
20effc67 | 678 | virtual std::string get_id(); |
7c673cae | 679 | void dump(Formatter *f) const; |
11fdf7f2 TL |
680 | |
681 | RGWIOIDProvider& get_io_id_provider() { | |
682 | return io_id_provider; | |
683 | } | |
7c673cae FG |
684 | }; |
685 | ||
11fdf7f2 TL |
686 | template <typename T> |
687 | RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack, T value) | |
688 | { | |
689 | rgw_io_id io_id{get_next_io_id(), -1}; | |
690 | RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifierWith<T>(completion_mgr, io_id, (void *)stack, std::move(value)); | |
691 | completion_mgr->register_completion_notifier(cn); | |
692 | return cn; | |
693 | } | |
694 | ||
695 | template <typename T> | |
696 | RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier(T value) | |
697 | { | |
698 | return ops_mgr->create_completion_notifier(this, std::move(value)); | |
699 | } | |
700 | ||
7c673cae FG |
701 | class RGWSimpleCoroutine : public RGWCoroutine { |
702 | bool called_cleanup; | |
703 | ||
b3b6e05e | 704 | int operate(const DoutPrefixProvider *dpp) override; |
7c673cae FG |
705 | |
706 | int state_init(); | |
b3b6e05e | 707 | int state_send_request(const DoutPrefixProvider *dpp); |
7c673cae FG |
708 | int state_request_complete(); |
709 | int state_all_complete(); | |
710 | ||
711 | void call_cleanup(); | |
712 | ||
713 | public: | |
714 | RGWSimpleCoroutine(CephContext *_cct) : RGWCoroutine(_cct), called_cleanup(false) {} | |
1e59de90 | 715 | virtual ~RGWSimpleCoroutine() override; |
7c673cae FG |
716 | |
717 | virtual int init() { return 0; } | |
b3b6e05e | 718 | virtual int send_request(const DoutPrefixProvider *dpp) = 0; |
7c673cae FG |
719 | virtual int request_complete() = 0; |
720 | virtual int finish() { return 0; } | |
721 | virtual void request_cleanup() {} | |
722 | }; |