]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
7c673cae | 3 | |
9f95a23c | 4 | #include "include/Context.h" |
7c673cae | 5 | #include "common/ceph_json.h" |
7c673cae | 6 | #include "rgw_coroutine.h" |
7c673cae FG |
7 | |
8 | // re-include our assert to clobber the system one; fix dout: | |
11fdf7f2 | 9 | #include "include/ceph_assert.h" |
7c673cae | 10 | |
31f18b77 FG |
11 | #include <boost/asio/yield.hpp> |
12 | ||
7c673cae | 13 | #define dout_subsys ceph_subsys_rgw |
11fdf7f2 | 14 | #define dout_context g_ceph_context |
7c673cae | 15 | |
20effc67 | 16 | using namespace std; |
7c673cae FG |
17 | |
18 | class RGWCompletionManager::WaitContext : public Context { | |
19 | RGWCompletionManager *manager; | |
20 | void *opaque; | |
21 | public: | |
22 | WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {} | |
23 | void finish(int r) override { | |
24 | manager->_wakeup(opaque); | |
25 | } | |
26 | }; | |
27 | ||
9f95a23c | 28 | RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), |
7c673cae FG |
29 | timer(cct, lock) |
30 | { | |
31 | timer.init(); | |
32 | } | |
33 | ||
34 | RGWCompletionManager::~RGWCompletionManager() | |
35 | { | |
9f95a23c | 36 | std::lock_guard l{lock}; |
7c673cae FG |
37 | timer.cancel_all_events(); |
38 | timer.shutdown(); | |
39 | } | |
40 | ||
11fdf7f2 | 41 | void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info) |
7c673cae | 42 | { |
9f95a23c | 43 | std::lock_guard l{lock}; |
11fdf7f2 | 44 | _complete(cn, io_id, user_info); |
7c673cae FG |
45 | } |
46 | ||
47 | void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn) | |
48 | { | |
9f95a23c | 49 | std::lock_guard l{lock}; |
7c673cae FG |
50 | if (cn) { |
51 | cns.insert(cn); | |
52 | } | |
53 | } | |
54 | ||
55 | void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn) | |
56 | { | |
9f95a23c | 57 | std::lock_guard l{lock}; |
7c673cae FG |
58 | if (cn) { |
59 | cns.erase(cn); | |
60 | } | |
61 | } | |
62 | ||
11fdf7f2 | 63 | void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info) |
7c673cae FG |
64 | { |
65 | if (cn) { | |
66 | cns.erase(cn); | |
67 | } | |
11fdf7f2 TL |
68 | |
69 | if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) { | |
70 | /* already have completion for this io_id, don't allow multiple completions for it */ | |
71 | return; | |
72 | } | |
73 | complete_reqs.push_back(io_completion{io_id, user_info}); | |
9f95a23c | 74 | cond.notify_all(); |
7c673cae FG |
75 | } |
76 | ||
11fdf7f2 | 77 | int RGWCompletionManager::get_next(io_completion *io) |
7c673cae | 78 | { |
9f95a23c | 79 | std::unique_lock l{lock}; |
7c673cae | 80 | while (complete_reqs.empty()) { |
7c673cae FG |
81 | if (going_down) { |
82 | return -ECANCELED; | |
83 | } | |
9f95a23c | 84 | cond.wait(l); |
7c673cae | 85 | } |
11fdf7f2 TL |
86 | *io = complete_reqs.front(); |
87 | complete_reqs_set.erase(io->io_id); | |
7c673cae FG |
88 | complete_reqs.pop_front(); |
89 | return 0; | |
90 | } | |
91 | ||
11fdf7f2 | 92 | bool RGWCompletionManager::try_get_next(io_completion *io) |
7c673cae | 93 | { |
9f95a23c | 94 | std::lock_guard l{lock}; |
7c673cae FG |
95 | if (complete_reqs.empty()) { |
96 | return false; | |
97 | } | |
11fdf7f2 TL |
98 | *io = complete_reqs.front(); |
99 | complete_reqs_set.erase(io->io_id); | |
7c673cae FG |
100 | complete_reqs.pop_front(); |
101 | return true; | |
102 | } | |
103 | ||
104 | void RGWCompletionManager::go_down() | |
105 | { | |
9f95a23c | 106 | std::lock_guard l{lock}; |
7c673cae FG |
107 | for (auto cn : cns) { |
108 | cn->unregister(); | |
109 | } | |
110 | going_down = true; | |
9f95a23c | 111 | cond.notify_all(); |
7c673cae FG |
112 | } |
113 | ||
114 | void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info) | |
115 | { | |
9f95a23c | 116 | std::lock_guard l{lock}; |
11fdf7f2 | 117 | ceph_assert(waiters.find(opaque) == waiters.end()); |
7c673cae FG |
118 | waiters[opaque] = user_info; |
119 | timer.add_event_after(interval, new WaitContext(this, opaque)); | |
120 | } | |
121 | ||
122 | void RGWCompletionManager::wakeup(void *opaque) | |
123 | { | |
9f95a23c | 124 | std::lock_guard l{lock}; |
7c673cae FG |
125 | _wakeup(opaque); |
126 | } | |
127 | ||
128 | void RGWCompletionManager::_wakeup(void *opaque) | |
129 | { | |
130 | map<void *, void *>::iterator iter = waiters.find(opaque); | |
131 | if (iter != waiters.end()) { | |
132 | void *user_id = iter->second; | |
133 | waiters.erase(iter); | |
11fdf7f2 | 134 | _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id); |
7c673cae FG |
135 | } |
136 | } | |
137 | ||
138 | RGWCoroutine::~RGWCoroutine() { | |
139 | for (auto stack : spawned.entries) { | |
140 | stack->put(); | |
141 | } | |
142 | } | |
143 | ||
11fdf7f2 TL |
144 | void RGWCoroutine::init_new_io(RGWIOProvider *io_provider) |
145 | { | |
33c7a0ef | 146 | ceph_assert(stack); // if there's no stack, io_provider won't be uninitialized |
11fdf7f2 TL |
147 | stack->init_new_io(io_provider); |
148 | } | |
149 | ||
7c673cae | 150 | void RGWCoroutine::set_io_blocked(bool flag) { |
33c7a0ef TL |
151 | if (stack) { |
152 | stack->set_io_blocked(flag); | |
153 | } | |
7c673cae FG |
154 | } |
155 | ||
156 | void RGWCoroutine::set_sleeping(bool flag) { | |
33c7a0ef TL |
157 | if (stack) { |
158 | stack->set_sleeping(flag); | |
159 | } | |
7c673cae FG |
160 | } |
161 | ||
11fdf7f2 TL |
162 | int RGWCoroutine::io_block(int ret, int64_t io_id) { |
163 | return io_block(ret, rgw_io_id{io_id, -1}); | |
164 | } | |
165 | ||
166 | int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) { | |
33c7a0ef TL |
167 | if (!stack) { |
168 | return 0; | |
169 | } | |
11fdf7f2 TL |
170 | if (stack->consume_io_finish(io_id)) { |
171 | return 0; | |
172 | } | |
7c673cae | 173 | set_io_blocked(true); |
11fdf7f2 | 174 | stack->set_io_blocked_id(io_id); |
7c673cae FG |
175 | return ret; |
176 | } | |
177 | ||
11fdf7f2 | 178 | void RGWCoroutine::io_complete(const rgw_io_id& io_id) { |
33c7a0ef TL |
179 | if (stack) { |
180 | stack->io_complete(io_id); | |
181 | } | |
11fdf7f2 TL |
182 | } |
183 | ||
7c673cae FG |
184 | void RGWCoroutine::StatusItem::dump(Formatter *f) const { |
185 | ::encode_json("timestamp", timestamp, f); | |
186 | ::encode_json("status", status, f); | |
187 | } | |
188 | ||
189 | stringstream& RGWCoroutine::Status::set_status() | |
190 | { | |
9f95a23c | 191 | std::unique_lock l{lock}; |
7c673cae FG |
192 | string s = status.str(); |
193 | status.str(string()); | |
194 | if (!timestamp.is_zero()) { | |
195 | history.push_back(StatusItem(timestamp, s)); | |
196 | } | |
197 | if (history.size() > (size_t)max_history) { | |
198 | history.pop_front(); | |
199 | } | |
200 | timestamp = ceph_clock_now(); | |
201 | ||
202 | return status; | |
203 | } | |
204 | ||
20effc67 TL |
205 | RGWCoroutinesManager::~RGWCoroutinesManager() { |
206 | stop(); | |
207 | completion_mgr->put(); | |
208 | if (cr_registry) { | |
209 | cr_registry->remove(this); | |
210 | } | |
211 | } | |
212 | ||
11fdf7f2 TL |
213 | int64_t RGWCoroutinesManager::get_next_io_id() |
214 | { | |
215 | return (int64_t)++max_io_id; | |
216 | } | |
217 | ||
f67539c2 TL |
218 | uint64_t RGWCoroutinesManager::get_next_stack_id() { |
219 | return (uint64_t)++max_stack_id; | |
220 | } | |
221 | ||
7c673cae FG |
222 | RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), |
223 | done_flag(false), error_flag(false), blocked_flag(false), | |
224 | sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false), | |
225 | retcode(0), run_count(0), | |
226 | env(NULL), parent(NULL) | |
227 | { | |
f67539c2 | 228 | id = ops_mgr->get_next_stack_id(); |
7c673cae FG |
229 | if (start) { |
230 | ops.push_back(start); | |
231 | } | |
232 | pos = ops.begin(); | |
233 | } | |
234 | ||
235 | RGWCoroutinesStack::~RGWCoroutinesStack() | |
236 | { | |
237 | for (auto op : ops) { | |
238 | op->put(); | |
239 | } | |
240 | ||
241 | for (auto stack : spawned.entries) { | |
242 | stack->put(); | |
243 | } | |
f67539c2 TL |
244 | |
245 | if (preallocated_stack) { | |
246 | preallocated_stack->put(); | |
247 | } | |
7c673cae FG |
248 | } |
249 | ||
b3b6e05e | 250 | int RGWCoroutinesStack::operate(const DoutPrefixProvider *dpp, RGWCoroutinesEnv *_env) |
7c673cae FG |
251 | { |
252 | env = _env; | |
253 | RGWCoroutine *op = *pos; | |
254 | op->stack = this; | |
b3b6e05e TL |
255 | ldpp_dout(dpp, 20) << *op << ": operate()" << dendl; |
256 | int r = op->operate_wrapper(dpp); | |
7c673cae | 257 | if (r < 0) { |
b3b6e05e | 258 | ldpp_dout(dpp, 20) << *op << ": operate() returned r=" << r << dendl; |
7c673cae FG |
259 | } |
260 | ||
261 | error_flag = op->is_error(); | |
262 | ||
263 | if (op->is_done()) { | |
264 | int op_retcode = r; | |
265 | r = unwind(op_retcode); | |
266 | op->put(); | |
267 | done_flag = (pos == ops.end()); | |
11fdf7f2 | 268 | blocked_flag &= !done_flag; |
7c673cae FG |
269 | if (done_flag) { |
270 | retcode = op_retcode; | |
271 | } | |
272 | return r; | |
273 | } | |
274 | ||
275 | /* should r ever be negative at this point? */ | |
11fdf7f2 | 276 | ceph_assert(r >= 0); |
7c673cae FG |
277 | |
278 | return 0; | |
279 | } | |
280 | ||
281 | string RGWCoroutinesStack::error_str() | |
282 | { | |
283 | if (pos != ops.end()) { | |
284 | return (*pos)->error_str(); | |
285 | } | |
286 | return string(); | |
287 | } | |
288 | ||
289 | void RGWCoroutinesStack::call(RGWCoroutine *next_op) { | |
290 | if (!next_op) { | |
291 | return; | |
292 | } | |
293 | ops.push_back(next_op); | |
294 | if (pos != ops.end()) { | |
295 | ++pos; | |
296 | } else { | |
297 | pos = ops.begin(); | |
298 | } | |
299 | } | |
300 | ||
11fdf7f2 TL |
301 | void RGWCoroutinesStack::schedule() |
302 | { | |
303 | env->manager->schedule(env, this); | |
304 | } | |
305 | ||
306 | void RGWCoroutinesStack::_schedule() | |
307 | { | |
308 | env->manager->_schedule(env, this); | |
309 | } | |
310 | ||
7c673cae FG |
311 | RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait) |
312 | { | |
313 | if (!op) { | |
314 | return NULL; | |
315 | } | |
316 | ||
317 | rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned); | |
318 | ||
f67539c2 TL |
319 | RGWCoroutinesStack *stack = preallocated_stack; |
320 | if (!stack) { | |
321 | stack = env->manager->allocate_stack(); | |
322 | } | |
323 | preallocated_stack = nullptr; | |
324 | ||
7c673cae FG |
325 | s->add_pending(stack); |
326 | stack->parent = this; | |
327 | ||
328 | stack->get(); /* we'll need to collect the stack */ | |
329 | stack->call(op); | |
330 | ||
331 | env->manager->schedule(env, stack); | |
332 | ||
333 | if (wait) { | |
334 | set_blocked_by(stack); | |
335 | } | |
336 | ||
337 | return stack; | |
338 | } | |
339 | ||
340 | RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) | |
341 | { | |
342 | return spawn(NULL, op, wait); | |
343 | } | |
344 | ||
f67539c2 TL |
345 | RGWCoroutinesStack *RGWCoroutinesStack::prealloc_stack() |
346 | { | |
347 | if (!preallocated_stack) { | |
348 | preallocated_stack = env->manager->allocate_stack(); | |
349 | } | |
350 | return preallocated_stack; | |
351 | } | |
352 | ||
7c673cae FG |
353 | int RGWCoroutinesStack::wait(const utime_t& interval) |
354 | { | |
355 | RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); | |
356 | completion_mgr->wait_interval((void *)this, interval, (void *)this); | |
357 | set_io_blocked(true); | |
358 | set_interval_wait(true); | |
359 | return 0; | |
360 | } | |
361 | ||
362 | void RGWCoroutinesStack::wakeup() | |
363 | { | |
364 | RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); | |
365 | completion_mgr->wakeup((void *)this); | |
366 | } | |
367 | ||
11fdf7f2 TL |
368 | void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id) |
369 | { | |
370 | RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); | |
371 | completion_mgr->complete(nullptr, io_id, (void *)this); | |
372 | } | |
373 | ||
7c673cae FG |
374 | int RGWCoroutinesStack::unwind(int retcode) |
375 | { | |
376 | rgw_spawned_stacks *src_spawned = &(*pos)->spawned; | |
377 | ||
378 | if (pos == ops.begin()) { | |
11fdf7f2 | 379 | ldout(cct, 15) << "stack " << (void *)this << " end" << dendl; |
7c673cae FG |
380 | spawned.inherit(src_spawned); |
381 | ops.clear(); | |
382 | pos = ops.end(); | |
383 | return retcode; | |
384 | } | |
385 | ||
386 | --pos; | |
387 | ops.pop_back(); | |
388 | RGWCoroutine *op = *pos; | |
389 | op->set_retcode(retcode); | |
390 | op->spawned.inherit(src_spawned); | |
391 | return 0; | |
392 | } | |
393 | ||
11fdf7f2 TL |
394 | void RGWCoroutinesStack::cancel() |
395 | { | |
396 | while (!ops.empty()) { | |
397 | RGWCoroutine *op = *pos; | |
398 | unwind(-ECANCELED); | |
399 | op->put(); | |
400 | } | |
401 | put(); | |
402 | } | |
7c673cae | 403 | |
f67539c2 | 404 | bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ |
7c673cae | 405 | { |
11fdf7f2 | 406 | bool need_retry = false; |
7c673cae FG |
407 | rgw_spawned_stacks *s = (op ? &op->spawned : &spawned); |
408 | *ret = 0; | |
409 | vector<RGWCoroutinesStack *> new_list; | |
410 | ||
411 | for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) { | |
412 | RGWCoroutinesStack *stack = *iter; | |
413 | if (stack == skip_stack || !stack->is_done()) { | |
414 | new_list.push_back(stack); | |
415 | if (!stack->is_done()) { | |
416 | ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl; | |
417 | } else if (stack == skip_stack) { | |
418 | ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl; | |
419 | } | |
420 | continue; | |
421 | } | |
f67539c2 TL |
422 | if (stack_id) { |
423 | *stack_id = stack->get_id(); | |
424 | } | |
7c673cae FG |
425 | int r = stack->get_ret_status(); |
426 | stack->put(); | |
427 | if (r < 0) { | |
428 | *ret = r; | |
429 | ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl; | |
430 | new_list.insert(new_list.end(), ++iter, s->entries.end()); | |
11fdf7f2 | 431 | need_retry = (iter != s->entries.end()); |
7c673cae FG |
432 | break; |
433 | } | |
434 | ||
435 | ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl; | |
436 | } | |
437 | ||
438 | s->entries.swap(new_list); | |
11fdf7f2 | 439 | return need_retry; |
7c673cae FG |
440 | } |
441 | ||
442 | bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */ | |
443 | { | |
444 | rgw_spawned_stacks *s = (op ? &op->spawned : &spawned); | |
445 | *ret = 0; | |
446 | ||
447 | if (collected_stack) { | |
448 | *collected_stack = NULL; | |
449 | } | |
450 | ||
451 | for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) { | |
452 | RGWCoroutinesStack *stack = *iter; | |
453 | if (!stack->is_done()) { | |
454 | continue; | |
455 | } | |
456 | int r = stack->get_ret_status(); | |
457 | if (r < 0) { | |
458 | *ret = r; | |
459 | } | |
460 | ||
461 | if (collected_stack) { | |
462 | *collected_stack = stack; | |
463 | } | |
464 | stack->put(); | |
465 | ||
466 | s->entries.erase(iter); | |
467 | return true; | |
468 | } | |
469 | ||
470 | return false; | |
471 | } | |
472 | ||
f67539c2 | 473 | bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ |
7c673cae | 474 | { |
f67539c2 | 475 | return collect(NULL, ret, skip_stack, stack_id); |
7c673cae FG |
476 | } |
477 | ||
478 | static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) | |
479 | { | |
11fdf7f2 | 480 | (static_cast<RGWAioCompletionNotifier *>(arg))->cb(); |
7c673cae FG |
481 | } |
482 | ||
11fdf7f2 TL |
483 | RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr), |
484 | io_id(_io_id), | |
9f95a23c TL |
485 | user_data(_user_data), registered(true) { |
486 | c = librados::Rados::aio_create_completion(this, _aio_completion_notifier_cb); | |
7c673cae FG |
487 | } |
488 | ||
489 | RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier() | |
490 | { | |
491 | return ops_mgr->create_completion_notifier(this); | |
492 | } | |
493 | ||
494 | RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr() | |
495 | { | |
496 | return ops_mgr->get_completion_mgr(); | |
497 | } | |
498 | ||
499 | bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s) | |
500 | { | |
501 | if (blocking_stacks.empty()) { | |
502 | return false; | |
503 | } | |
504 | ||
505 | set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin(); | |
506 | *s = *iter; | |
507 | blocking_stacks.erase(iter); | |
508 | (*s)->blocked_by_stack.erase(this); | |
509 | ||
510 | return true; | |
511 | } | |
512 | ||
513 | void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) | |
514 | { | |
515 | if (!op) { | |
516 | return; | |
517 | } | |
518 | string err = op->error_str(); | |
519 | if (err.empty()) { | |
520 | return; | |
521 | } | |
522 | lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; | |
523 | } | |
524 | ||
525 | void RGWCoroutinesStack::dump(Formatter *f) const { | |
526 | stringstream ss; | |
527 | ss << (void *)this; | |
528 | ::encode_json("stack", ss.str(), f); | |
529 | ::encode_json("run_count", run_count, f); | |
530 | f->open_array_section("ops"); | |
531 | for (auto& i : ops) { | |
532 | encode_json("op", *i, f); | |
533 | } | |
534 | f->close_section(); | |
535 | } | |
536 | ||
11fdf7f2 | 537 | void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider) |
7c673cae | 538 | { |
11fdf7f2 TL |
539 | io_provider->set_io_user_info((void *)this); |
540 | io_provider->assign_io(env->manager->get_io_id_provider()); | |
541 | } | |
542 | ||
543 | bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id) | |
544 | { | |
545 | if (!can_io_unblock(io_id)) { | |
546 | auto p = io_finish_ids.emplace(io_id.id, io_id); | |
547 | auto& iter = p.first; | |
548 | bool inserted = p.second; | |
549 | if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */ | |
550 | iter->second.channels |= io_id.channels; | |
551 | } | |
552 | return false; | |
553 | } | |
554 | ||
555 | return true; | |
556 | } | |
557 | ||
558 | bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id) | |
559 | { | |
560 | auto iter = io_finish_ids.find(io_id.id); | |
561 | if (iter == io_finish_ids.end()) { | |
562 | return false; | |
563 | } | |
564 | int finish_mask = iter->second.channels; | |
565 | bool found = (finish_mask & io_id.channels) != 0; | |
566 | ||
567 | finish_mask &= ~(finish_mask & io_id.channels); | |
568 | ||
569 | if (finish_mask == 0) { | |
570 | io_finish_ids.erase(iter); | |
571 | } | |
572 | return found; | |
573 | } | |
574 | ||
575 | ||
576 | void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, | |
577 | RGWCompletionManager::io_completion& io, int *blocked_count) | |
578 | { | |
9f95a23c | 579 | ceph_assert(ceph_mutex_is_wlocked(lock)); |
11fdf7f2 TL |
580 | RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info); |
581 | if (context_stacks.find(stack) == context_stacks.end()) { | |
582 | return; | |
583 | } | |
584 | if (!stack->try_io_unblock(io.io_id)) { | |
585 | return; | |
586 | } | |
587 | if (stack->is_io_blocked()) { | |
588 | --(*blocked_count); | |
589 | stack->set_io_blocked(false); | |
590 | } | |
7c673cae FG |
591 | stack->set_interval_wait(false); |
592 | if (!stack->is_done()) { | |
11fdf7f2 TL |
593 | if (!stack->is_scheduled) { |
594 | scheduled_stacks.push_back(stack); | |
595 | stack->set_is_scheduled(true); | |
596 | } | |
7c673cae | 597 | } else { |
7c673cae FG |
598 | context_stacks.erase(stack); |
599 | stack->put(); | |
600 | } | |
601 | } | |
602 | ||
603 | void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack) | |
604 | { | |
9f95a23c | 605 | std::unique_lock wl{lock}; |
11fdf7f2 TL |
606 | _schedule(env, stack); |
607 | } | |
608 | ||
609 | void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack) | |
610 | { | |
9f95a23c | 611 | ceph_assert(ceph_mutex_is_wlocked(lock)); |
11fdf7f2 TL |
612 | if (!stack->is_scheduled) { |
613 | env->scheduled_stacks->push_back(stack); | |
614 | stack->set_is_scheduled(true); | |
615 | } | |
7c673cae FG |
616 | set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context]; |
617 | context_stacks.insert(stack); | |
618 | } | |
619 | ||
11fdf7f2 TL |
620 | void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag) |
621 | { | |
622 | cr->set_sleeping(flag); | |
623 | } | |
624 | ||
625 | void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id) | |
626 | { | |
627 | cr->io_complete(io_id); | |
628 | } | |
629 | ||
b3b6e05e | 630 | int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, list<RGWCoroutinesStack *>& stacks) |
7c673cae FG |
631 | { |
632 | int ret = 0; | |
633 | int blocked_count = 0; | |
634 | int interval_wait_count = 0; | |
635 | bool canceled = false; // set on going_down | |
636 | RGWCoroutinesEnv env; | |
11fdf7f2 | 637 | bool op_not_blocked; |
7c673cae FG |
638 | |
639 | uint64_t run_context = ++run_context_count; | |
640 | ||
9f95a23c | 641 | lock.lock(); |
7c673cae FG |
642 | set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context]; |
643 | list<RGWCoroutinesStack *> scheduled_stacks; | |
644 | for (auto& st : stacks) { | |
645 | context_stacks.insert(st); | |
646 | scheduled_stacks.push_back(st); | |
11fdf7f2 | 647 | st->set_is_scheduled(true); |
7c673cae | 648 | } |
7c673cae FG |
649 | env.run_context = run_context; |
650 | env.manager = this; | |
651 | env.scheduled_stacks = &scheduled_stacks; | |
652 | ||
653 | for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) { | |
11fdf7f2 | 654 | RGWCompletionManager::io_completion io; |
7c673cae | 655 | RGWCoroutinesStack *stack = *iter; |
11fdf7f2 TL |
656 | ++iter; |
657 | scheduled_stacks.pop_front(); | |
658 | ||
659 | if (context_stacks.find(stack) == context_stacks.end()) { | |
660 | /* stack was probably schedule more than once due to IO, but was since complete */ | |
661 | goto next; | |
662 | } | |
7c673cae FG |
663 | env.stack = stack; |
664 | ||
11fdf7f2 TL |
665 | lock.unlock(); |
666 | ||
b3b6e05e | 667 | ret = stack->operate(dpp, &env); |
11fdf7f2 | 668 | |
9f95a23c | 669 | lock.lock(); |
11fdf7f2 | 670 | |
7c673cae FG |
671 | stack->set_is_scheduled(false); |
672 | if (ret < 0) { | |
b3b6e05e | 673 | ldpp_dout(dpp, 20) << "stack->operate() returned ret=" << ret << dendl; |
7c673cae FG |
674 | } |
675 | ||
676 | if (stack->is_error()) { | |
677 | report_error(stack); | |
678 | } | |
679 | ||
11fdf7f2 | 680 | op_not_blocked = false; |
7c673cae FG |
681 | |
682 | if (stack->is_io_blocked()) { | |
683 | ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl; | |
684 | if (stack->is_interval_waiting()) { | |
685 | interval_wait_count++; | |
686 | } | |
687 | blocked_count++; | |
688 | } else if (stack->is_blocked()) { | |
689 | /* do nothing, we'll re-add the stack when the blocking stack is done, | |
690 | * or when we're awaken | |
691 | */ | |
692 | ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack() | |
693 | << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl; | |
694 | } else if (stack->is_done()) { | |
695 | ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl; | |
696 | RGWCoroutinesStack *s; | |
697 | while (stack->unblock_stack(&s)) { | |
698 | if (!s->is_blocked_by_stack() && !s->is_done()) { | |
699 | if (s->is_io_blocked()) { | |
700 | if (stack->is_interval_waiting()) { | |
701 | interval_wait_count++; | |
702 | } | |
703 | blocked_count++; | |
704 | } else { | |
11fdf7f2 | 705 | s->_schedule(); |
7c673cae FG |
706 | } |
707 | } | |
708 | } | |
709 | if (stack->parent && stack->parent->waiting_for_child()) { | |
710 | stack->parent->set_wait_for_child(false); | |
11fdf7f2 | 711 | stack->parent->_schedule(); |
7c673cae FG |
712 | } |
713 | context_stacks.erase(stack); | |
714 | stack->put(); | |
715 | stack = NULL; | |
716 | } else { | |
717 | op_not_blocked = true; | |
718 | stack->run_count++; | |
11fdf7f2 | 719 | stack->_schedule(); |
7c673cae FG |
720 | } |
721 | ||
722 | if (!op_not_blocked && stack) { | |
723 | stack->run_count = 0; | |
724 | } | |
725 | ||
11fdf7f2 TL |
726 | while (completion_mgr->try_get_next(&io)) { |
727 | handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count); | |
7c673cae FG |
728 | } |
729 | ||
730 | /* | |
731 | * only account blocked operations that are not in interval_wait, these are stacks that | |
732 | * were put on a wait without any real IO operations. While we mark these as io_blocked, | |
733 | * these aren't really waiting for IOs | |
734 | */ | |
735 | while (blocked_count - interval_wait_count >= ops_window) { | |
11fdf7f2 TL |
736 | lock.unlock(); |
737 | ret = completion_mgr->get_next(&io); | |
9f95a23c | 738 | lock.lock(); |
7c673cae | 739 | if (ret < 0) { |
28e407b8 | 740 | ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl; |
7c673cae | 741 | } |
11fdf7f2 | 742 | handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count); |
7c673cae FG |
743 | } |
744 | ||
11fdf7f2 | 745 | next: |
7c673cae | 746 | while (scheduled_stacks.empty() && blocked_count > 0) { |
11fdf7f2 TL |
747 | lock.unlock(); |
748 | ret = completion_mgr->get_next(&io); | |
9f95a23c | 749 | lock.lock(); |
7c673cae | 750 | if (ret < 0) { |
28e407b8 | 751 | ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl; |
7c673cae FG |
752 | } |
753 | if (going_down) { | |
754 | ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl; | |
755 | ret = -ECANCELED; | |
756 | canceled = true; | |
757 | break; | |
758 | } | |
11fdf7f2 | 759 | handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count); |
7c673cae FG |
760 | iter = scheduled_stacks.begin(); |
761 | } | |
762 | if (canceled) { | |
763 | break; | |
764 | } | |
765 | ||
766 | if (iter == scheduled_stacks.end()) { | |
767 | iter = scheduled_stacks.begin(); | |
768 | } | |
769 | } | |
770 | ||
7c673cae FG |
771 | if (!context_stacks.empty() && !going_down) { |
772 | JSONFormatter formatter(true); | |
773 | formatter.open_array_section("context_stacks"); | |
774 | for (auto& s : context_stacks) { | |
775 | ::encode_json("entry", *s, &formatter); | |
776 | } | |
777 | formatter.close_section(); | |
778 | lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n"; | |
779 | formatter.flush(*_dout); | |
780 | *_dout << dendl; | |
11fdf7f2 | 781 | ceph_assert(context_stacks.empty() || going_down); // assert on deadlock |
7c673cae FG |
782 | } |
783 | ||
784 | for (auto stack : context_stacks) { | |
785 | ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl; | |
11fdf7f2 | 786 | stack->cancel(); |
7c673cae FG |
787 | } |
788 | run_contexts.erase(run_context); | |
789 | lock.unlock(); | |
790 | ||
791 | return ret; | |
792 | } | |
793 | ||
b3b6e05e | 794 | int RGWCoroutinesManager::run(const DoutPrefixProvider *dpp, RGWCoroutine *op) |
7c673cae FG |
795 | { |
796 | if (!op) { | |
797 | return 0; | |
798 | } | |
799 | list<RGWCoroutinesStack *> stacks; | |
800 | RGWCoroutinesStack *stack = allocate_stack(); | |
801 | op->get(); | |
802 | stack->call(op); | |
803 | ||
11fdf7f2 | 804 | stacks.push_back(stack); |
7c673cae | 805 | |
b3b6e05e | 806 | int r = run(dpp, stacks); |
7c673cae | 807 | if (r < 0) { |
b3b6e05e | 808 | ldpp_dout(dpp, 20) << "run(stacks) returned r=" << r << dendl; |
7c673cae FG |
809 | } else { |
810 | r = op->get_ret_status(); | |
811 | } | |
812 | op->put(); | |
813 | ||
814 | return r; | |
815 | } | |
816 | ||
817 | RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack) | |
818 | { | |
11fdf7f2 TL |
819 | rgw_io_id io_id{get_next_io_id(), -1}; |
820 | RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack); | |
7c673cae FG |
821 | completion_mgr->register_completion_notifier(cn); |
822 | return cn; | |
823 | } | |
824 | ||
825 | void RGWCoroutinesManager::dump(Formatter *f) const { | |
9f95a23c | 826 | std::shared_lock rl{lock}; |
7c673cae FG |
827 | |
828 | f->open_array_section("run_contexts"); | |
829 | for (auto& i : run_contexts) { | |
830 | f->open_object_section("context"); | |
831 | ::encode_json("id", i.first, f); | |
832 | f->open_array_section("entries"); | |
833 | for (auto& s : i.second) { | |
834 | ::encode_json("entry", *s, f); | |
835 | } | |
836 | f->close_section(); | |
837 | f->close_section(); | |
838 | } | |
839 | f->close_section(); | |
840 | } | |
841 | ||
842 | RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() { | |
843 | return new RGWCoroutinesStack(cct, this); | |
844 | } | |
845 | ||
846 | string RGWCoroutinesManager::get_id() | |
847 | { | |
848 | if (!id.empty()) { | |
849 | return id; | |
850 | } | |
851 | stringstream ss; | |
852 | ss << (void *)this; | |
853 | return ss.str(); | |
854 | } | |
855 | ||
856 | void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr) | |
857 | { | |
9f95a23c | 858 | std::unique_lock wl{lock}; |
7c673cae FG |
859 | if (managers.find(mgr) == managers.end()) { |
860 | managers.insert(mgr); | |
861 | get(); | |
862 | } | |
863 | } | |
864 | ||
865 | void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr) | |
866 | { | |
9f95a23c | 867 | std::unique_lock wl{lock}; |
7c673cae FG |
868 | if (managers.find(mgr) != managers.end()) { |
869 | managers.erase(mgr); | |
870 | put(); | |
871 | } | |
872 | } | |
873 | ||
874 | RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry() | |
875 | { | |
876 | AdminSocket *admin_socket = cct->get_admin_socket(); | |
877 | if (!admin_command.empty()) { | |
9f95a23c | 878 | admin_socket->unregister_commands(this); |
7c673cae FG |
879 | } |
880 | } | |
881 | ||
882 | int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command) | |
883 | { | |
884 | AdminSocket *admin_socket = cct->get_admin_socket(); | |
885 | if (!admin_command.empty()) { | |
9f95a23c | 886 | admin_socket->unregister_commands(this); |
7c673cae FG |
887 | } |
888 | admin_command = command; | |
9f95a23c | 889 | int r = admin_socket->register_command(admin_command, this, |
7c673cae FG |
890 | "dump current coroutines stack state"); |
891 | if (r < 0) { | |
892 | lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl; | |
893 | return r; | |
894 | } | |
895 | return 0; | |
896 | } | |
897 | ||
9f95a23c TL |
898 | int RGWCoroutinesManagerRegistry::call(std::string_view command, |
899 | const cmdmap_t& cmdmap, | |
900 | Formatter *f, | |
901 | std::ostream& ss, | |
902 | bufferlist& out) { | |
903 | std::shared_lock rl{lock}; | |
904 | ::encode_json("cr_managers", *this, f); | |
905 | return 0; | |
7c673cae FG |
906 | } |
907 | ||
908 | void RGWCoroutinesManagerRegistry::dump(Formatter *f) const { | |
909 | f->open_array_section("coroutine_managers"); | |
910 | for (auto m : managers) { | |
911 | ::encode_json("entry", *m, f); | |
912 | } | |
913 | f->close_section(); | |
914 | } | |
915 | ||
916 | void RGWCoroutine::call(RGWCoroutine *op) | |
917 | { | |
eafe8130 TL |
918 | if (op) { |
919 | stack->call(op); | |
920 | } else { | |
921 | // the call()er expects this to set a retcode | |
922 | retcode = 0; | |
923 | } | |
7c673cae FG |
924 | } |
925 | ||
926 | RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait) | |
927 | { | |
928 | return stack->spawn(this, op, wait); | |
929 | } | |
930 | ||
f67539c2 TL |
931 | RGWCoroutinesStack *RGWCoroutine::prealloc_stack() |
932 | { | |
933 | return stack->prealloc_stack(); | |
934 | } | |
935 | ||
936 | uint64_t RGWCoroutine::prealloc_stack_id() | |
7c673cae | 937 | { |
f67539c2 TL |
938 | return prealloc_stack()->get_id(); |
939 | } | |
940 | ||
941 | bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack, uint64_t *stack_id) /* returns true if needs to be called again */ | |
942 | { | |
943 | return stack->collect(this, ret, skip_stack, stack_id); | |
7c673cae FG |
944 | } |
945 | ||
946 | bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */ | |
947 | { | |
948 | return stack->collect_next(this, ret, collected_stack); | |
949 | } | |
950 | ||
951 | int RGWCoroutine::wait(const utime_t& interval) | |
952 | { | |
953 | return stack->wait(interval); | |
954 | } | |
955 | ||
956 | void RGWCoroutine::wait_for_child() | |
957 | { | |
958 | /* should only wait for child if there is a child that is not done yet, and no complete children */ | |
959 | if (spawned.entries.empty()) { | |
960 | return; | |
961 | } | |
962 | for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) { | |
963 | if ((*iter)->is_done()) { | |
964 | return; | |
965 | } | |
966 | } | |
967 | stack->set_wait_for_child(true); | |
968 | } | |
969 | ||
970 | string RGWCoroutine::to_str() const | |
971 | { | |
972 | return typeid(*this).name(); | |
973 | } | |
974 | ||
975 | ostream& operator<<(ostream& out, const RGWCoroutine& cr) | |
976 | { | |
977 | out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name(); | |
978 | return out; | |
979 | } | |
980 | ||
f67539c2 TL |
981 | bool RGWCoroutine::drain_children(int num_cr_left, |
982 | RGWCoroutinesStack *skip_stack, | |
983 | std::optional<std::function<void(uint64_t stack_id, int ret)> > cb) | |
7c673cae FG |
984 | { |
985 | bool done = false; | |
11fdf7f2 | 986 | ceph_assert(num_cr_left >= 0); |
7c673cae FG |
987 | if (num_cr_left == 0 && skip_stack) { |
988 | num_cr_left = 1; | |
989 | } | |
f67539c2 | 990 | reenter(&drain_status.cr) { |
7c673cae FG |
991 | while (num_spawned() > (size_t)num_cr_left) { |
992 | yield wait_for_child(); | |
993 | int ret; | |
f67539c2 TL |
994 | uint64_t stack_id; |
995 | bool again = false; | |
996 | do { | |
997 | again = collect(&ret, skip_stack, &stack_id); | |
998 | if (ret < 0) { | |
999 | ldout(cct, 10) << "collect() returned ret=" << ret << dendl; | |
1000 | /* we should have reported this error */ | |
1001 | log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; | |
1002 | } | |
1003 | if (cb) { | |
1004 | (*cb)(stack_id, ret); | |
1005 | } | |
1006 | } while (again); | |
1007 | } | |
1008 | done = true; | |
1009 | } | |
1010 | return done; | |
1011 | } | |
1012 | ||
1013 | bool RGWCoroutine::drain_children(int num_cr_left, | |
1014 | std::optional<std::function<int(uint64_t stack_id, int ret)> > cb) | |
1015 | { | |
1016 | bool done = false; | |
1017 | ceph_assert(num_cr_left >= 0); | |
1018 | ||
1019 | reenter(&drain_status.cr) { | |
1020 | while (num_spawned() > (size_t)num_cr_left) { | |
1021 | yield wait_for_child(); | |
1022 | int ret; | |
1023 | uint64_t stack_id; | |
1024 | bool again = false; | |
1025 | do { | |
1026 | again = collect(&ret, nullptr, &stack_id); | |
7c673cae FG |
1027 | if (ret < 0) { |
1028 | ldout(cct, 10) << "collect() returned ret=" << ret << dendl; | |
1029 | /* we should have reported this error */ | |
1030 | log_error() << "ERROR: collect() returned error (ret=" << ret << ")"; | |
1031 | } | |
f67539c2 TL |
1032 | if (cb && !drain_status.should_exit) { |
1033 | int r = (*cb)(stack_id, ret); | |
1034 | if (r < 0) { | |
1035 | drain_status.ret = r; | |
1036 | drain_status.should_exit = true; | |
1037 | num_cr_left = 0; /* need to drain all */ | |
1038 | } | |
1039 | } | |
1040 | } while (again); | |
7c673cae FG |
1041 | } |
1042 | done = true; | |
1043 | } | |
1044 | return done; | |
1045 | } | |
1046 | ||
1047 | void RGWCoroutine::wakeup() | |
1048 | { | |
1049 | stack->wakeup(); | |
1050 | } | |
1051 | ||
11fdf7f2 TL |
1052 | RGWCoroutinesEnv *RGWCoroutine::get_env() const |
1053 | { | |
1054 | return stack->get_env(); | |
1055 | } | |
1056 | ||
7c673cae FG |
1057 | void RGWCoroutine::dump(Formatter *f) const { |
1058 | if (!description.str().empty()) { | |
1059 | encode_json("description", description.str(), f); | |
1060 | } | |
1061 | encode_json("type", to_str(), f); | |
1062 | if (!spawned.entries.empty()) { | |
1063 | f->open_array_section("spawned"); | |
1064 | for (auto& i : spawned.entries) { | |
1065 | char buf[32]; | |
1066 | snprintf(buf, sizeof(buf), "%p", (void *)i); | |
1067 | encode_json("stack", string(buf), f); | |
1068 | } | |
1069 | f->close_section(); | |
1070 | } | |
1071 | if (!status.history.empty()) { | |
1072 | encode_json("history", status.history, f); | |
1073 | } | |
1074 | ||
1075 | if (!status.status.str().empty()) { | |
1076 | f->open_object_section("status"); | |
1077 | encode_json("status", status.status.str(), f); | |
1078 | encode_json("timestamp", status.timestamp, f); | |
1079 | f->close_section(); | |
1080 | } | |
1081 | } | |
1082 | ||
1083 | RGWSimpleCoroutine::~RGWSimpleCoroutine() | |
1084 | { | |
1085 | if (!called_cleanup) { | |
1086 | request_cleanup(); | |
1087 | } | |
1088 | } | |
1089 | ||
1090 | void RGWSimpleCoroutine::call_cleanup() | |
1091 | { | |
1092 | called_cleanup = true; | |
1093 | request_cleanup(); | |
1094 | } | |
1095 | ||
b3b6e05e | 1096 | int RGWSimpleCoroutine::operate(const DoutPrefixProvider *dpp) |
7c673cae FG |
1097 | { |
1098 | int ret = 0; | |
1099 | reenter(this) { | |
1100 | yield return state_init(); | |
b3b6e05e | 1101 | yield return state_send_request(dpp); |
7c673cae FG |
1102 | yield return state_request_complete(); |
1103 | yield return state_all_complete(); | |
1104 | drain_all(); | |
1105 | call_cleanup(); | |
1106 | return set_state(RGWCoroutine_Done, ret); | |
1107 | } | |
1108 | return 0; | |
1109 | } | |
1110 | ||
1111 | int RGWSimpleCoroutine::state_init() | |
1112 | { | |
1113 | int ret = init(); | |
1114 | if (ret < 0) { | |
1115 | call_cleanup(); | |
1116 | return set_state(RGWCoroutine_Error, ret); | |
1117 | } | |
1118 | return 0; | |
1119 | } | |
1120 | ||
b3b6e05e | 1121 | int RGWSimpleCoroutine::state_send_request(const DoutPrefixProvider *dpp) |
7c673cae | 1122 | { |
b3b6e05e | 1123 | int ret = send_request(dpp); |
7c673cae FG |
1124 | if (ret < 0) { |
1125 | call_cleanup(); | |
1126 | return set_state(RGWCoroutine_Error, ret); | |
1127 | } | |
1128 | return io_block(0); | |
1129 | } | |
1130 | ||
1131 | int RGWSimpleCoroutine::state_request_complete() | |
1132 | { | |
1133 | int ret = request_complete(); | |
1134 | if (ret < 0) { | |
1135 | call_cleanup(); | |
1136 | return set_state(RGWCoroutine_Error, ret); | |
1137 | } | |
1138 | return 0; | |
1139 | } | |
1140 | ||
1141 | int RGWSimpleCoroutine::state_all_complete() | |
1142 | { | |
1143 | int ret = finish(); | |
1144 | if (ret < 0) { | |
1145 | call_cleanup(); | |
1146 | return set_state(RGWCoroutine_Error, ret); | |
1147 | } | |
1148 | return 0; | |
1149 | } | |
1150 | ||
1151 |