]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
7c673cae FG |
3 | |
4 | #include "common/ceph_json.h" | |
7c673cae | 5 | #include "rgw_coroutine.h" |
7c673cae FG |
6 | |
7 | // re-include our assert to clobber the system one; fix dout: | |
11fdf7f2 | 8 | #include "include/ceph_assert.h" |
7c673cae | 9 | |
31f18b77 FG |
10 | #include <boost/asio/yield.hpp> |
11 | ||
7c673cae | 12 | #define dout_subsys ceph_subsys_rgw |
11fdf7f2 | 13 | #define dout_context g_ceph_context |
7c673cae FG |
14 | |
15 | ||
// Timer callback context: when the timer fires, wake up the waiter that
// registered 'opaque' with the completion manager (see wait_interval()).
class RGWCompletionManager::WaitContext : public Context {
  RGWCompletionManager *manager;
  void *opaque;  // key identifying the waiter in manager->waiters
public:
  WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
  void finish(int r) override {
    manager->_wakeup(opaque);
  }
};
25 | ||
// The timer shares this manager's lock so timer callbacks run with it held.
RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
                                            timer(cct, lock)
{
  timer.init();
}
31 | ||
RGWCompletionManager::~RGWCompletionManager()
{
  // Lock is required by SafeTimer's cancel/shutdown protocol.
  Mutex::Locker l(lock);
  timer.cancel_all_events();
  timer.shutdown();
}
38 | ||
// Public entry: take the lock and queue a completion for io_id.
void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
  Mutex::Locker l(lock);
  _complete(cn, io_id, user_info);
}
44 | ||
// Track a notifier so go_down() can unregister it; null cn is a no-op.
void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn)
{
  Mutex::Locker l(lock);
  if (cn) {
    cns.insert(cn);
  }
}
52 | ||
// Stop tracking a notifier; null cn is a no-op.
void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn)
{
  Mutex::Locker l(lock);
  if (cn) {
    cns.erase(cn);
  }
}
60 | ||
// Queue a completion and wake one waiter. Caller must hold 'lock'.
void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, const rgw_io_id& io_id, void *user_info)
{
  if (cn) {
    cns.erase(cn);
  }

  // NOTE(review): no insert into complete_reqs_set is visible in this file
  // (get_next/try_get_next only erase), so this dedup guard appears never to
  // fire — confirm whether the insert lives elsewhere or is intentionally
  // absent before relying on the "no multiple completions" comment below.
  if (complete_reqs_set.find(io_id) != complete_reqs_set.end()) {
    /* already have completion for this io_id, don't allow multiple completions for it */
    return;
  }
  complete_reqs.push_back(io_completion{io_id, user_info});
  cond.Signal();
}
74 | ||
// Block until a completion is available (or shutdown). Returns 0 and fills
// *io on success, -ECANCELED once going_down is set.
int RGWCompletionManager::get_next(io_completion *io)
{
  Mutex::Locker l(lock);
  while (complete_reqs.empty()) {
    if (going_down) {
      return -ECANCELED;
    }
    cond.Wait(lock);
  }
  *io = complete_reqs.front();
  complete_reqs_set.erase(io->io_id);
  complete_reqs.pop_front();
  return 0;
}
89 | ||
// Non-blocking variant of get_next(): false when no completion is queued.
bool RGWCompletionManager::try_get_next(io_completion *io)
{
  Mutex::Locker l(lock);
  if (complete_reqs.empty()) {
    return false;
  }
  *io = complete_reqs.front();
  complete_reqs_set.erase(io->io_id);
  complete_reqs.pop_front();
  return true;
}
101 | ||
// Begin shutdown: detach all registered notifiers and release get_next()
// waiters via going_down.
void RGWCompletionManager::go_down()
{
  Mutex::Locker l(lock);
  for (auto cn : cns) {
    cn->unregister();
  }
  going_down = true;
  // NOTE(review): Signal() wakes a single waiter; if more than one thread can
  // block in get_next(), a broadcast may be required — confirm caller model.
  cond.Signal();
}
111 | ||
// Arm a timer that will deliver user_info as a completion after 'interval'.
// 'opaque' must be unique among outstanding waiters (asserted).
void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
{
  Mutex::Locker l(lock);
  ceph_assert(waiters.find(opaque) == waiters.end());
  waiters[opaque] = user_info;
  timer.add_event_after(interval, new WaitContext(this, opaque));
}
119 | ||
// Public entry: wake the waiter registered under 'opaque' (if any).
void RGWCompletionManager::wakeup(void *opaque)
{
  Mutex::Locker l(lock);
  _wakeup(opaque);
}
125 | ||
// Caller must hold 'lock'. Converts a pending wait into a completion with a
// sentinel io_id {0, -1} (no real IO behind it).
void RGWCompletionManager::_wakeup(void *opaque)
{
  map<void *, void *>::iterator iter = waiters.find(opaque);
  if (iter != waiters.end()) {
    void *user_id = iter->second;
    waiters.erase(iter);
    _complete(NULL, rgw_io_id{0, -1} /* no IO id */, user_id);
  }
}
135 | ||
// Drop the references taken on spawned child stacks (see spawn()).
RGWCoroutine::~RGWCoroutine() {
  for (auto stack : spawned.entries) {
    stack->put();
  }
}
141 | ||
11fdf7f2 TL |
// Delegate IO registration to the owning stack.
void RGWCoroutine::init_new_io(RGWIOProvider *io_provider)
{
  stack->init_new_io(io_provider);
}
146 | ||
7c673cae FG |
// Forward the io-blocked flag to the owning stack.
void RGWCoroutine::set_io_blocked(bool flag) {
  stack->set_io_blocked(flag);
}
150 | ||
// Forward the sleeping flag to the owning stack.
void RGWCoroutine::set_sleeping(bool flag) {
  stack->set_sleeping(flag);
}
154 | ||
11fdf7f2 TL |
// Convenience overload: block on an io id with the wildcard channel mask (-1).
int RGWCoroutine::io_block(int ret, int64_t io_id) {
  return io_block(ret, rgw_io_id{io_id, -1});
}
158 | ||
// Mark the stack io-blocked on io_id and return 'ret' — unless that IO has
// already finished (recorded via try_io_unblock), in which case consume the
// finish record and return 0 so the caller proceeds immediately.
int RGWCoroutine::io_block(int ret, const rgw_io_id& io_id) {
  if (stack->consume_io_finish(io_id)) {
    return 0;
  }
  set_io_blocked(true);
  stack->set_io_blocked_id(io_id);
  return ret;
}
167 | ||
11fdf7f2 TL |
// Forward an IO-completion notification to the owning stack.
void RGWCoroutine::io_complete(const rgw_io_id& io_id) {
  stack->io_complete(io_id);
}
171 | ||
7c673cae FG |
// JSON-encode one status-history entry.
void RGWCoroutine::StatusItem::dump(Formatter *f) const {
  ::encode_json("timestamp", timestamp, f);
  ::encode_json("status", status, f);
}
176 | ||
// Rotate the current status string into the bounded history (max_history
// entries), stamp the new entry's start time, and return the (cleared)
// stream for the caller to write the new status into.
stringstream& RGWCoroutine::Status::set_status()
{
  RWLock::WLocker l(lock);
  string s = status.str();
  status.str(string());  // clear the stream for the next status
  if (!timestamp.is_zero()) {
    // only archive statuses that were actually stamped
    history.push_back(StatusItem(timestamp, s));
  }
  if (history.size() > (size_t)max_history) {
    history.pop_front();
  }
  timestamp = ceph_clock_now();

  return status;
}
192 | ||
11fdf7f2 TL |
// Hand out a fresh, monotonically increasing IO id.
int64_t RGWCoroutinesManager::get_next_io_id()
{
  return (int64_t)++max_io_id;
}
197 | ||
7c673cae FG |
// A stack owns a sequence of ops; 'pos' points at the currently running op
// (begin() when a start op is given, end() for an empty stack).
RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
                                                         done_flag(false), error_flag(false), blocked_flag(false),
                                                         sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
                                                         retcode(0), run_count(0),
                                                         env(NULL), parent(NULL)
{
  if (start) {
    ops.push_back(start);
  }
  pos = ops.begin();
}
209 | ||
// Release references on any remaining ops and spawned child stacks.
RGWCoroutinesStack::~RGWCoroutinesStack()
{
  for (auto op : ops) {
    op->put();
  }

  for (auto stack : spawned.entries) {
    stack->put();
  }
}
220 | ||
// Run one step of the op at the top of the stack. If that op finished,
// unwind to its caller; the stack is done once all ops have unwound.
// Returns the (possibly negative) result propagated from the op/unwind.
int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
{
  env = _env;
  RGWCoroutine *op = *pos;
  op->stack = this;
  ldout(cct, 20) << *op << ": operate()" << dendl;
  int r = op->operate_wrapper();
  if (r < 0) {
    ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
  }

  error_flag = op->is_error();

  if (op->is_done()) {
    int op_retcode = r;
    r = unwind(op_retcode);  // pops the op; may finish the whole stack
    op->put();
    done_flag = (pos == ops.end());
    blocked_flag &= !done_flag;  // a finished stack can no longer be blocked
    if (done_flag) {
      retcode = op_retcode;
    }
    return r;
  }

  /* should r ever be negative at this point? */
  ceph_assert(r >= 0);

  return 0;
}
251 | ||
252 | string RGWCoroutinesStack::error_str() | |
253 | { | |
254 | if (pos != ops.end()) { | |
255 | return (*pos)->error_str(); | |
256 | } | |
257 | return string(); | |
258 | } | |
259 | ||
// Push a nested op onto the stack and advance 'pos' to it; null is a no-op.
void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
  if (!next_op) {
    return;
  }
  ops.push_back(next_op);
  if (pos != ops.end()) {
    ++pos;
  } else {
    pos = ops.begin();  // stack was empty: the new op becomes the current one
  }
}
271 | ||
11fdf7f2 TL |
// Schedule this stack via the manager (takes the manager's lock).
void RGWCoroutinesStack::schedule()
{
  env->manager->schedule(env, this);
}
276 | ||
// Lock-free variant: caller must already hold the manager's write lock.
void RGWCoroutinesStack::_schedule()
{
  env->manager->_schedule(env, this);
}
281 | ||
7c673cae FG |
// Spawn 'op' on a fresh child stack, record it under source_op's (or this
// stack's) spawned list, schedule it, and optionally block on it.
// Returns the new stack, or NULL when op is null.
RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
{
  if (!op) {
    return NULL;
  }

  rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);

  RGWCoroutinesStack *stack = env->manager->allocate_stack();
  s->add_pending(stack);
  stack->parent = this;

  stack->get(); /* we'll need to collect the stack */
  stack->call(op);

  env->manager->schedule(env, stack);

  if (wait) {
    set_blocked_by(stack);
  }

  return stack;
}
305 | ||
// Spawn with no explicit source op: record the child on this stack itself.
RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait)
{
  return spawn(NULL, op, wait);
}
310 | ||
// Park this stack for 'interval' using the completion manager's timer.
// Marked io_blocked + interval_wait so the scheduler accounts it correctly.
int RGWCoroutinesStack::wait(const utime_t& interval)
{
  RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
  completion_mgr->wait_interval((void *)this, interval, (void *)this);
  set_io_blocked(true);
  set_interval_wait(true);
  return 0;
}
319 | ||
// Cancel a pending wait() early by waking this stack's waiter entry.
void RGWCoroutinesStack::wakeup()
{
  RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
  completion_mgr->wakeup((void *)this);
}
325 | ||
11fdf7f2 TL |
// Report an IO completion for this stack directly (no notifier object).
void RGWCoroutinesStack::io_complete(const rgw_io_id& io_id)
{
  RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
  completion_mgr->complete(nullptr, io_id, (void *)this);
}
331 | ||
7c673cae FG |
// Pop the current op. If it was the bottom of the stack, the stack is
// finished: its spawned children are inherited by the stack itself and
// 'retcode' is propagated to the caller. Otherwise the parent op resumes
// with 'retcode' as its child's result and inherits the children; returns 0.
int RGWCoroutinesStack::unwind(int retcode)
{
  rgw_spawned_stacks *src_spawned = &(*pos)->spawned;

  if (pos == ops.begin()) {
    ldout(cct, 15) << "stack " << (void *)this << " end" << dendl;
    spawned.inherit(src_spawned);
    ops.clear();
    pos = ops.end();
    return retcode;
  }

  --pos;
  ops.pop_back();
  RGWCoroutine *op = *pos;
  op->set_retcode(retcode);
  op->spawned.inherit(src_spawned);
  return 0;
}
351 | ||
11fdf7f2 TL |
// Force-unwind every op with -ECANCELED, releasing each op's reference,
// then drop the stack's own reference (may destroy 'this').
void RGWCoroutinesStack::cancel()
{
  while (!ops.empty()) {
    RGWCoroutine *op = *pos;
    unwind(-ECANCELED);
    op->put();
  }
  put(); /* discard stack */
}
7c673cae FG |
361 | |
// Reap finished spawned stacks (from 'op' if given, else from this stack),
// releasing each reaped stack's reference. Stops at the first error: *ret is
// set to that error, the remaining (unexamined) stacks are kept, and true is
// returned iff entries remain so the caller must call again.
bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
  bool need_retry = false;
  rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
  *ret = 0;
  vector<RGWCoroutinesStack *> new_list;  // survivors: running or skipped stacks

  for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
    RGWCoroutinesStack *stack = *iter;
    if (stack == skip_stack || !stack->is_done()) {
      new_list.push_back(stack);
      if (!stack->is_done()) {
        ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
      } else if (stack == skip_stack) {
        ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
      }
      continue;
    }
    int r = stack->get_ret_status();
    stack->put();
    if (r < 0) {
      *ret = r;
      ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
      // keep everything after the failed stack for a later collect() pass
      new_list.insert(new_list.end(), ++iter, s->entries.end());
      need_retry = (iter != s->entries.end());
      break;
    }

    ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
  }

  s->entries.swap(new_list);
  return need_retry;
}
396 | ||
// Reap at most one finished spawned stack. On success: *ret carries its
// (negative) status if it failed, *collected_stack (if non-null) points at
// the reaped stack, its reference is released, and true is returned.
bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
{
  rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
  *ret = 0;

  if (collected_stack) {
    *collected_stack = NULL;
  }

  for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
    RGWCoroutinesStack *stack = *iter;
    if (!stack->is_done()) {
      continue;
    }
    int r = stack->get_ret_status();
    if (r < 0) {
      *ret = r;
    }

    if (collected_stack) {
      *collected_stack = stack;
    }
    stack->put();

    s->entries.erase(iter);
    return true;
  }

  return false;
}
427 | ||
// Collect this stack's own spawned children (no specific op).
bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
  return collect(NULL, ret, skip_stack);
}
432 | ||
// librados AIO callback trampoline: forward to the notifier object.
static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg)
{
  (static_cast<RGWAioCompletionNotifier *>(arg))->cb();
}
437 | ||
11fdf7f2 TL |
// Create the librados completion whose callback routes back through
// _aio_completion_notifier_cb into this notifier.
RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, const rgw_io_id& _io_id, void *_user_data) : completion_mgr(_mgr),
                                                         io_id(_io_id),
                                                         user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
  c = librados::Rados::aio_create_completion((void *)this, NULL,
                                             _aio_completion_notifier_cb);
}
444 | ||
// Delegate notifier creation (and registration) to the manager.
RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
{
  return ops_mgr->create_completion_notifier(this);
}
449 | ||
// Expose the manager's completion manager.
RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
{
  return ops_mgr->get_completion_mgr();
}
454 | ||
// Detach one stack that was blocked on this one, returning it in *s.
// Returns false when no blocked stacks remain.
bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
{
  if (blocking_stacks.empty()) {
    return false;
  }

  set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
  *s = *iter;
  blocking_stacks.erase(iter);
  (*s)->blocked_by_stack.erase(this);  // keep the reverse link consistent

  return true;
}
468 | ||
469 | void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) | |
470 | { | |
471 | if (!op) { | |
472 | return; | |
473 | } | |
474 | string err = op->error_str(); | |
475 | if (err.empty()) { | |
476 | return; | |
477 | } | |
478 | lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; | |
479 | } | |
480 | ||
// JSON-encode this stack: its address, run_count, and each op on it.
void RGWCoroutinesStack::dump(Formatter *f) const {
  stringstream ss;
  ss << (void *)this;
  ::encode_json("stack", ss.str(), f);
  ::encode_json("run_count", run_count, f);
  f->open_array_section("ops");
  for (auto& i : ops) {
    encode_json("op", *i, f);
  }
  f->close_section();
}
492 | ||
// Bind an IO provider to this stack: completions will carry this stack as
// user_info, and the provider gets an io id from the manager's id provider.
void RGWCoroutinesStack::init_new_io(RGWIOProvider *io_provider)
{
  io_provider->set_io_user_info((void *)this);
  io_provider->assign_io(env->manager->get_io_id_provider());
}
498 | ||
// True if io_id matches what this stack is blocked on. Otherwise record the
// finished io (merging channel bits if an entry already exists) so a later
// io_block() on it can be satisfied immediately, and return false.
bool RGWCoroutinesStack::try_io_unblock(const rgw_io_id& io_id)
{
  if (!can_io_unblock(io_id)) {
    auto p = io_finish_ids.emplace(io_id.id, io_id);
    auto& iter = p.first;
    bool inserted = p.second;
    if (!inserted) { /* could not insert, entry already existed, add channel to completion mask */
      iter->second.channels |= io_id.channels;
    }
    return false;
  }

  return true;
}
513 | ||
// Consume a previously recorded IO finish for io_id: returns true if any of
// io_id's channels had finished, clearing those bits (and the whole entry
// once no channels remain).
bool RGWCoroutinesStack::consume_io_finish(const rgw_io_id& io_id)
{
  auto iter = io_finish_ids.find(io_id.id);
  if (iter == io_finish_ids.end()) {
    return false;
  }
  int finish_mask = iter->second.channels;
  bool found = (finish_mask & io_id.channels) != 0;

  finish_mask &= ~(finish_mask & io_id.channels);  // clear the consumed channels

  if (finish_mask == 0) {
    io_finish_ids.erase(iter);
  }
  return found;
}
530 | ||
531 | ||
// Process one IO completion under the manager's write lock: if it targets a
// live stack in this run context and actually unblocks it, clear the blocked
// accounting and either reschedule the stack or reap it if already done.
void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks,
                                                  RGWCompletionManager::io_completion& io, int *blocked_count)
{
  ceph_assert(lock.is_wlocked());
  RGWCoroutinesStack *stack = static_cast<RGWCoroutinesStack *>(io.user_info);
  if (context_stacks.find(stack) == context_stacks.end()) {
    return;  // stack already completed and was removed from this context
  }
  if (!stack->try_io_unblock(io.io_id)) {
    return;  // not what the stack is waiting for; recorded for later
  }
  if (stack->is_io_blocked()) {
    --(*blocked_count);
    stack->set_io_blocked(false);
  }
  stack->set_interval_wait(false);
  if (!stack->is_done()) {
    if (!stack->is_scheduled) {
      scheduled_stacks.push_back(stack);
      stack->set_is_scheduled(true);
    }
  } else {
    context_stacks.erase(stack);
    stack->put();
  }
}
558 | ||
// Public entry: take the write lock and schedule the stack.
void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
  RWLock::WLocker wl(lock);
  _schedule(env, stack);
}
564 | ||
// Caller must hold the write lock (asserted). Queues the stack for its run
// context unless already queued, and (re-)registers it in context_stacks.
void RGWCoroutinesManager::_schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
  ceph_assert(lock.is_wlocked());
  if (!stack->is_scheduled) {
    env->scheduled_stacks->push_back(stack);
    stack->set_is_scheduled(true);
  }
  set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
  context_stacks.insert(stack);
}
575 | ||
11fdf7f2 TL |
// Forward the sleeping flag through the coroutine.
void RGWCoroutinesManager::set_sleeping(RGWCoroutine *cr, bool flag)
{
  cr->set_sleeping(flag);
}
580 | ||
// Forward an IO completion through the coroutine.
void RGWCoroutinesManager::io_complete(RGWCoroutine *cr, const rgw_io_id& io_id)
{
  cr->io_complete(io_id);
}
585 | ||
7c673cae FG |
// Main scheduler loop: run the given stacks to completion in a fresh run
// context. Repeatedly pops a scheduled stack, steps it (with the manager
// lock dropped around operate()), re-queues or reaps it based on its state,
// and drains IO completions — blocking on completions when all remaining
// stacks are io-blocked. Returns the last operate()/get_next() result, or
// -ECANCELED if the manager is stopped mid-run.
int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
{
  int ret = 0;
  int blocked_count = 0;        // stacks currently io-blocked
  int interval_wait_count = 0;  // subset of blocked that are only timer-waits
  bool canceled = false; // set on going_down
  RGWCoroutinesEnv env;
  bool op_not_blocked;

  uint64_t run_context = ++run_context_count;

  lock.get_write();
  set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
  list<RGWCoroutinesStack *> scheduled_stacks;
  for (auto& st : stacks) {
    context_stacks.insert(st);
    scheduled_stacks.push_back(st);
    st->set_is_scheduled(true);
  }
  env.run_context = run_context;
  env.manager = this;
  env.scheduled_stacks = &scheduled_stacks;

  for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
    RGWCompletionManager::io_completion io;
    RGWCoroutinesStack *stack = *iter;
    ++iter;
    scheduled_stacks.pop_front();

    if (context_stacks.find(stack) == context_stacks.end()) {
      /* stack was probably scheduled more than once due to IO, but has since completed */
      goto next;
    }
    env.stack = stack;

    // drop the lock while the coroutine body runs
    lock.unlock();

    ret = stack->operate(&env);

    lock.get_write();

    stack->set_is_scheduled(false);
    if (ret < 0) {
      ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
    }

    if (stack->is_error()) {
      report_error(stack);
    }

    op_not_blocked = false;

    if (stack->is_io_blocked()) {
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
      if (stack->is_interval_waiting()) {
        interval_wait_count++;
      }
      blocked_count++;
    } else if (stack->is_blocked()) {
      /* do nothing, we'll re-add the stack when the blocking stack is done,
       * or when we're awaken
       */
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
        << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
    } else if (stack->is_done()) {
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
      RGWCoroutinesStack *s;
      // release every stack that was waiting on this one
      while (stack->unblock_stack(&s)) {
        if (!s->is_blocked_by_stack() && !s->is_done()) {
          if (s->is_io_blocked()) {
            if (stack->is_interval_waiting()) {
              interval_wait_count++;
            }
            blocked_count++;
          } else {
            s->_schedule();
          }
        }
      }
      if (stack->parent && stack->parent->waiting_for_child()) {
        stack->parent->set_wait_for_child(false);
        stack->parent->_schedule();
      }
      context_stacks.erase(stack);
      stack->put();
      stack = NULL;
    } else {
      op_not_blocked = true;
      stack->run_count++;
      stack->_schedule();
    }

    if (!op_not_blocked && stack) {
      stack->run_count = 0;  // reset the consecutive-run counter once blocked
    }

    // drain any completions that arrived while we ran the coroutine
    while (completion_mgr->try_get_next(&io)) {
      handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
    }

    /*
     * only account blocked operations that are not in interval_wait, these are stacks that
     * were put on a wait without any real IO operations. While we mark these as io_blocked,
     * these aren't really waiting for IOs
     */
    while (blocked_count - interval_wait_count >= ops_window) {
      lock.unlock();
      ret = completion_mgr->get_next(&io);
      lock.get_write();
      if (ret < 0) {
        ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
      }
      handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
    }

next:
    // nothing runnable: block on completions until a stack is unblocked
    while (scheduled_stacks.empty() && blocked_count > 0) {
      lock.unlock();
      ret = completion_mgr->get_next(&io);
      lock.get_write();
      if (ret < 0) {
        ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
      }
      if (going_down) {
        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
        ret = -ECANCELED;
        canceled = true;
        break;
      }
      handle_unblocked_stack(context_stacks, scheduled_stacks, io, &blocked_count);
      iter = scheduled_stacks.begin();
    }
    if (canceled) {
      break;
    }

    if (iter == scheduled_stacks.end()) {
      iter = scheduled_stacks.begin();
    }
  }

  // leftover stacks without a shutdown means the run deadlocked: dump and assert
  if (!context_stacks.empty() && !going_down) {
    JSONFormatter formatter(true);
    formatter.open_array_section("context_stacks");
    for (auto& s : context_stacks) {
      ::encode_json("entry", *s, &formatter);
    }
    formatter.close_section();
    lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
    formatter.flush(*_dout);
    *_dout << dendl;
    ceph_assert(context_stacks.empty() || going_down); // assert on deadlock
  }

  for (auto stack : context_stacks) {
    ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
    stack->cancel();
  }
  run_contexts.erase(run_context);
  lock.unlock();

  return ret;
}
749 | ||
// Convenience wrapper: run a single coroutine to completion on its own
// stack. Returns the run() error, or the op's own return status on success.
int RGWCoroutinesManager::run(RGWCoroutine *op)
{
  if (!op) {
    return 0;
  }
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack *stack = allocate_stack();
  op->get();  // keep op alive so we can read its status after run()
  stack->call(op);

  stacks.push_back(stack);

  int r = run(stacks);
  if (r < 0) {
    ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
  } else {
    r = op->get_ret_status();
  }
  op->put();

  return r;
}
772 | ||
// Allocate a notifier tied to a fresh io id (wildcard channels) with the
// stack as user_info, and register it for shutdown handling.
RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
{
  rgw_io_id io_id{get_next_io_id(), -1};
  RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, io_id, (void *)stack);
  completion_mgr->register_completion_notifier(cn);
  return cn;
}
780 | ||
// JSON-encode all run contexts and their stacks (read lock held).
void RGWCoroutinesManager::dump(Formatter *f) const {
  RWLock::RLocker rl(lock);

  f->open_array_section("run_contexts");
  for (auto& i : run_contexts) {
    f->open_object_section("context");
    ::encode_json("id", i.first, f);
    f->open_array_section("entries");
    for (auto& s : i.second) {
      ::encode_json("entry", *s, f);
    }
    f->close_section();
    f->close_section();
  }
  f->close_section();
}
797 | ||
// Factory for stacks bound to this manager (no start op).
RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
  return new RGWCoroutinesStack(cct, this);
}
801 | ||
802 | string RGWCoroutinesManager::get_id() | |
803 | { | |
804 | if (!id.empty()) { | |
805 | return id; | |
806 | } | |
807 | stringstream ss; | |
808 | ss << (void *)this; | |
809 | return ss.str(); | |
810 | } | |
811 | ||
// Register a manager; the registry takes a reference for each new entry.
void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr)
{
  RWLock::WLocker wl(lock);
  if (managers.find(mgr) == managers.end()) {
    managers.insert(mgr);
    get();
  }
}
820 | ||
// Unregister a manager, dropping the reference taken by add().
void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr)
{
  RWLock::WLocker wl(lock);
  if (managers.find(mgr) != managers.end()) {
    managers.erase(mgr);
    put();
  }
}
829 | ||
// Remove our admin-socket command hook, if one was registered.
RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
{
  AdminSocket *admin_socket = cct->get_admin_socket();
  if (!admin_command.empty()) {
    admin_socket->unregister_command(admin_command);
  }
}
837 | ||
// (Re)register this registry under 'command' on the admin socket,
// unregistering any previous hook first. Returns 0 or the register error.
int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
{
  AdminSocket *admin_socket = cct->get_admin_socket();
  if (!admin_command.empty()) {
    admin_socket->unregister_command(admin_command);
  }
  admin_command = command;
  int r = admin_socket->register_command(admin_command, admin_command, this,
                                         "dump current coroutines stack state");
  if (r < 0) {
    lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
    return r;
  }
  return 0;
}
853 | ||
11fdf7f2 TL |
854 | bool RGWCoroutinesManagerRegistry::call(std::string_view command, |
855 | const cmdmap_t& cmdmap, | |
856 | std::string_view format, | |
857 | bufferlist& out) { | |
7c673cae FG |
858 | RWLock::RLocker rl(lock); |
859 | stringstream ss; | |
860 | JSONFormatter f; | |
861 | ::encode_json("cr_managers", *this, &f); | |
862 | f.flush(ss); | |
863 | out.append(ss); | |
864 | return true; | |
865 | } | |
866 | ||
// JSON-encode every registered manager.
void RGWCoroutinesManagerRegistry::dump(Formatter *f) const {
  f->open_array_section("coroutine_managers");
  for (auto m : managers) {
    ::encode_json("entry", *m, f);
  }
  f->close_section();
}
874 | ||
// Push a nested op onto this coroutine's stack.
void RGWCoroutine::call(RGWCoroutine *op)
{
  stack->call(op);
}
879 | ||
// Spawn 'op' on a new child stack, recorded against this coroutine.
RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
{
  return stack->spawn(this, op, wait);
}
884 | ||
// Reap this coroutine's finished children; see RGWCoroutinesStack::collect.
bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
  return stack->collect(this, ret, skip_stack);
}
889 | ||
// Reap at most one finished child; see RGWCoroutinesStack::collect_next.
bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
{
  return stack->collect_next(this, ret, collected_stack);
}
894 | ||
// Park this coroutine's stack for 'interval'.
int RGWCoroutine::wait(const utime_t& interval)
{
  return stack->wait(interval);
}
899 | ||
900 | void RGWCoroutine::wait_for_child() | |
901 | { | |
902 | /* should only wait for child if there is a child that is not done yet, and no complete children */ | |
903 | if (spawned.entries.empty()) { | |
904 | return; | |
905 | } | |
906 | for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) { | |
907 | if ((*iter)->is_done()) { | |
908 | return; | |
909 | } | |
910 | } | |
911 | stack->set_wait_for_child(true); | |
912 | } | |
913 | ||
// Default human-readable name: the dynamic type's mangled name.
string RGWCoroutine::to_str() const
{
  return typeid(*this).name();
}
918 | ||
// Stream format: "cr:s=<stack>:op=<addr>:<type>" for log correlation.
ostream& operator<<(ostream& out, const RGWCoroutine& cr)
{
  out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
  return out;
}
924 | ||
// Resumable (boost reenter/yield via drain_cr) helper: repeatedly wait for
// and collect children until at most num_cr_left remain (skip_stack, when
// given, is never collected and bumps the floor from 0 to 1).
// Returns true once drained; false means "yielded, call again".
bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
{
  bool done = false;
  ceph_assert(num_cr_left >= 0);
  if (num_cr_left == 0 && skip_stack) {
    num_cr_left = 1;  // the skipped stack will never be collected
  }
  reenter(&drain_cr) {
    while (num_spawned() > (size_t)num_cr_left) {
      yield wait_for_child();
      int ret;
      while (collect(&ret, skip_stack)) {
        if (ret < 0) {
          ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
          /* we should have reported this error */
          log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
        }
      }
    }
    done = true;
  }
  return done;
}
948 | ||
// Wake this coroutine's stack out of a wait().
void RGWCoroutine::wakeup()
{
  stack->wakeup();
}
953 | ||
11fdf7f2 TL |
// Expose the environment of the owning stack.
RGWCoroutinesEnv *RGWCoroutine::get_env() const
{
  return stack->get_env();
}
958 | ||
7c673cae FG |
// JSON-encode this coroutine: optional description, type name, spawned
// stack addresses, status history, and the current status (if any).
void RGWCoroutine::dump(Formatter *f) const {
  if (!description.str().empty()) {
    encode_json("description", description.str(), f);
  }
  encode_json("type", to_str(), f);
  if (!spawned.entries.empty()) {
    f->open_array_section("spawned");
    for (auto& i : spawned.entries) {
      char buf[32];
      snprintf(buf, sizeof(buf), "%p", (void *)i);  // stack address as string
      encode_json("stack", string(buf), f);
    }
    f->close_section();
  }
  if (!status.history.empty()) {
    encode_json("history", status.history, f);
  }

  if (!status.status.str().empty()) {
    f->open_object_section("status");
    encode_json("status", status.status.str(), f);
    encode_json("timestamp", status.timestamp, f);
    f->close_section();
  }
}
984 | ||
// Ensure request_cleanup() runs exactly once even if operate() never
// reached call_cleanup() (e.g. the coroutine was canceled).
RGWSimpleCoroutine::~RGWSimpleCoroutine()
{
  if (!called_cleanup) {
    request_cleanup();
  }
}
991 | ||
// Run request_cleanup() and remember it ran (guards the destructor).
void RGWSimpleCoroutine::call_cleanup()
{
  called_cleanup = true;
  request_cleanup();
}
997 | ||
// Fixed four-phase state machine (boost reenter/yield): init -> send ->
// request complete -> all complete, then drain children and clean up.
int RGWSimpleCoroutine::operate()
{
  int ret = 0;
  reenter(this) {
    yield return state_init();
    yield return state_send_request();
    yield return state_request_complete();
    yield return state_all_complete();
    drain_all();
    call_cleanup();
    return set_state(RGWCoroutine_Done, ret);
  }
  return 0;
}
1012 | ||
// Phase 1: init(); on failure clean up and enter the error state.
int RGWSimpleCoroutine::state_init()
{
  int ret = init();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return 0;
}
1022 | ||
// Phase 2: send_request(); on success block (io_block) until the request's
// IO completes; on failure clean up and enter the error state.
int RGWSimpleCoroutine::state_send_request()
{
  int ret = send_request();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return io_block(0);
}
1032 | ||
// Phase 3: request_complete(); on failure clean up and enter the error state.
int RGWSimpleCoroutine::state_request_complete()
{
  int ret = request_complete();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return 0;
}
1042 | ||
// Phase 4: finish(); on failure clean up and enter the error state.
int RGWSimpleCoroutine::state_all_complete()
{
  int ret = finish();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return 0;
}
1052 | ||
1053 |