]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | |
2 | #include "common/ceph_json.h" | |
7c673cae | 3 | #include "rgw_coroutine.h" |
7c673cae FG |
4 | |
5 | // re-include our assert to clobber the system one; fix dout: | |
6 | #include "include/assert.h" | |
7 | ||
31f18b77 FG |
8 | #include <boost/asio/yield.hpp> |
9 | ||
7c673cae FG |
10 | #define dout_subsys ceph_subsys_rgw |
11 | ||
12 | ||
// Timer callback used by wait_interval(): when the SafeTimer fires it
// wakes up whichever waiter registered `opaque` with the manager.
class RGWCompletionManager::WaitContext : public Context {
  RGWCompletionManager *manager;
  void *opaque;  // key into manager->waiters, registered by wait_interval()
public:
  WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {}
  void finish(int r) override {
    // NOTE: SafeTimer invokes finish() with the manager's lock already
    // held (the timer was constructed with that lock), so calling the
    // unlocked _wakeup() here is safe.
    manager->_wakeup(opaque);
  }
};
22 | ||
// The SafeTimer shares our mutex: timer events run under `lock`, which is
// what lets WaitContext::finish() call _wakeup() without re-locking.
RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"),
                                            timer(cct, lock)
{
  timer.init();
}
28 | ||
RGWCompletionManager::~RGWCompletionManager()
{
  // SafeTimer was built with this lock, so it must be held while
  // cancelling/shutting down; pending WaitContexts are dropped here.
  Mutex::Locker l(lock);
  timer.cancel_all_events();
  timer.shutdown();
}
35 | ||
36 | void RGWCompletionManager::complete(RGWAioCompletionNotifier *cn, void *user_info) | |
37 | { | |
38 | Mutex::Locker l(lock); | |
39 | _complete(cn, user_info); | |
40 | } | |
41 | ||
42 | void RGWCompletionManager::register_completion_notifier(RGWAioCompletionNotifier *cn) | |
43 | { | |
44 | Mutex::Locker l(lock); | |
45 | if (cn) { | |
46 | cns.insert(cn); | |
47 | } | |
48 | } | |
49 | ||
50 | void RGWCompletionManager::unregister_completion_notifier(RGWAioCompletionNotifier *cn) | |
51 | { | |
52 | Mutex::Locker l(lock); | |
53 | if (cn) { | |
54 | cns.erase(cn); | |
55 | } | |
56 | } | |
57 | ||
// Unlocked core of complete(); caller must hold `lock`.
// Pushes `user_info` onto the completed queue and wakes a waiter in
// get_next(). A null `cn` is allowed (used by _wakeup()).
void RGWCompletionManager::_complete(RGWAioCompletionNotifier *cn, void *user_info)
{
  if (cn) {
    cns.erase(cn);
  }
  complete_reqs.push_back(user_info);
  cond.Signal();
}
66 | ||
67 | int RGWCompletionManager::get_next(void **user_info) | |
68 | { | |
69 | Mutex::Locker l(lock); | |
70 | while (complete_reqs.empty()) { | |
71 | cond.Wait(lock); | |
72 | if (going_down) { | |
73 | return -ECANCELED; | |
74 | } | |
75 | } | |
76 | *user_info = complete_reqs.front(); | |
77 | complete_reqs.pop_front(); | |
78 | return 0; | |
79 | } | |
80 | ||
81 | bool RGWCompletionManager::try_get_next(void **user_info) | |
82 | { | |
83 | Mutex::Locker l(lock); | |
84 | if (complete_reqs.empty()) { | |
85 | return false; | |
86 | } | |
87 | *user_info = complete_reqs.front(); | |
88 | complete_reqs.pop_front(); | |
89 | return true; | |
90 | } | |
91 | ||
// Begin shutdown: detach all registered notifiers, mark the manager as
// going down and wake waiters blocked in get_next() so they can observe
// the flag and bail out with -ECANCELED.
void RGWCompletionManager::go_down()
{
  Mutex::Locker l(lock);
  for (auto cn : cns) {
    cn->unregister();
  }
  going_down = true;
  cond.Signal();
}
101 | ||
// Arm a timer that will complete `user_info` after `interval`.
// `opaque` identifies the waiter; wakeup(opaque) can fire it early.
// A given opaque key may only have one outstanding wait (asserted).
void RGWCompletionManager::wait_interval(void *opaque, const utime_t& interval, void *user_info)
{
  Mutex::Locker l(lock);
  assert(waiters.find(opaque) == waiters.end());
  waiters[opaque] = user_info;
  // WaitContext runs under this same lock when the timer fires.
  timer.add_event_after(interval, new WaitContext(this, opaque));
}
109 | ||
110 | void RGWCompletionManager::wakeup(void *opaque) | |
111 | { | |
112 | Mutex::Locker l(lock); | |
113 | _wakeup(opaque); | |
114 | } | |
115 | ||
116 | void RGWCompletionManager::_wakeup(void *opaque) | |
117 | { | |
118 | map<void *, void *>::iterator iter = waiters.find(opaque); | |
119 | if (iter != waiters.end()) { | |
120 | void *user_id = iter->second; | |
121 | waiters.erase(iter); | |
122 | _complete(NULL, user_id); | |
123 | } | |
124 | } | |
125 | ||
// Drop the references this coroutine still holds on stacks it spawned
// but never collected.
RGWCoroutine::~RGWCoroutine() {
  for (auto stack : spawned.entries) {
    stack->put();
  }
}
131 | ||
// Flag delegates: a coroutine's blocked/sleeping state lives on its
// owning stack, which is what the manager's scheduler inspects.
void RGWCoroutine::set_io_blocked(bool flag) {
  stack->set_io_blocked(flag);
}

void RGWCoroutine::set_sleeping(bool flag) {
  stack->set_sleeping(flag);
}

// Mark the stack io-blocked and pass `ret` through, so operate() can
// `return io_block(0);` after issuing an async request.
int RGWCoroutine::io_block(int ret) {
  set_io_blocked(true);
  return ret;
}
144 | ||
// JSON-encode one status history entry (timestamp + status text).
void RGWCoroutine::StatusItem::dump(Formatter *f) const {
  ::encode_json("timestamp", timestamp, f);
  ::encode_json("status", status, f);
}
149 | ||
// Start a new status message: archive the current one into the bounded
// history ring and return the (now empty) stream for the caller to
// write into.
stringstream& RGWCoroutine::Status::set_status()
{
  RWLock::WLocker l(lock);
  string s = status.str();
  status.str(string());  // reset the stream for the new message
  // a zero timestamp means no status was ever set; don't archive that
  if (!timestamp.is_zero()) {
    history.push_back(StatusItem(timestamp, s));
  }
  if (history.size() > (size_t)max_history) {
    history.pop_front();  // keep at most max_history entries
  }
  timestamp = ceph_clock_now();

  return status;
}
165 | ||
// A stack holds a call chain of coroutines; `pos` points at the op that
// operate() will run next. `start` (may be null) seeds the chain.
RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
                                                                                         done_flag(false), error_flag(false), blocked_flag(false),
                                                                                         sleep_flag(false), interval_wait_flag(false), is_scheduled(false), is_waiting_for_child(false),
                                                                                         retcode(0), run_count(0),
                                                                                         env(NULL), parent(NULL)
{
  if (start) {
    ops.push_back(start);
  }
  pos = ops.begin();  // == ops.end() when start is null
}
177 | ||
// Release references on any coroutines still in the call chain and on
// spawned child stacks that were never collected.
RGWCoroutinesStack::~RGWCoroutinesStack()
{
  for (auto op : ops) {
    op->put();
  }

  for (auto stack : spawned.entries) {
    stack->put();
  }
}
188 | ||
// Run one step of the coroutine at the top of the call chain.
// If that op finished, unwind() pops back to its caller; when the whole
// chain is exhausted the stack is marked done and the final op's return
// code is recorded in `retcode`.
int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env)
{
  env = _env;
  RGWCoroutine *op = *pos;
  op->stack = this;
  ldout(cct, 20) << *op << ": operate()" << dendl;
  int r = op->operate();
  if (r < 0) {
    ldout(cct, 20) << *op << ": operate() returned r=" << r << dendl;
  }

  error_flag = op->is_error();

  if (op->is_done()) {
    int op_retcode = r;
    r = unwind(op_retcode);  // pops `op` off the chain; r != 0 only at the bottom
    op->put();               // drop the chain's reference on the finished op
    done_flag = (pos == ops.end());
    if (done_flag) {
      retcode = op_retcode;  // whole stack finished; expose final status
    }
    return r;
  }

  /* should r ever be negative at this point? */
  assert(r >= 0);

  return 0;
}
218 | ||
219 | string RGWCoroutinesStack::error_str() | |
220 | { | |
221 | if (pos != ops.end()) { | |
222 | return (*pos)->error_str(); | |
223 | } | |
224 | return string(); | |
225 | } | |
226 | ||
// Push `next_op` onto the call chain and advance `pos` so it runs next.
// `ops` is a list, so push_back never invalidates `pos`.
void RGWCoroutinesStack::call(RGWCoroutine *next_op) {
  if (!next_op) {
    return;
  }
  ops.push_back(next_op);
  if (pos != ops.end()) {
    ++pos;  // move from the caller to the newly pushed callee
  } else {
    pos = ops.begin();  // chain was empty; start at the new op
  }
}
238 | ||
// Create a new stack running `op` and schedule it. The spawn is
// recorded on `source_op` (the spawning coroutine) when given,
// otherwise on this stack itself, so collect() can reap it later.
// When `wait` is set, this stack blocks until the child completes.
RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *source_op, RGWCoroutine *op, bool wait)
{
  if (!op) {
    return NULL;
  }

  rgw_spawned_stacks *s = (source_op ? &source_op->spawned : &spawned);

  RGWCoroutinesStack *stack = env->manager->allocate_stack();
  s->add_pending(stack);
  stack->parent = this;

  stack->get(); /* we'll need to collect the stack */
  stack->call(op);

  env->manager->schedule(env, stack);

  if (wait) {
    set_blocked_by(stack);
  }

  return stack;
}
262 | ||
263 | RGWCoroutinesStack *RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) | |
264 | { | |
265 | return spawn(NULL, op, wait); | |
266 | } | |
267 | ||
// Park this stack for `interval`: register a timed wait keyed on the
// stack pointer and mark it io-blocked + interval-waiting so the
// scheduler doesn't count it against the real-IO window.
int RGWCoroutinesStack::wait(const utime_t& interval)
{
  RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr();
  completion_mgr->wait_interval((void *)this, interval, (void *)this);
  set_io_blocked(true);
  set_interval_wait(true);
  return 0;
}
276 | ||
277 | void RGWCoroutinesStack::wakeup() | |
278 | { | |
279 | RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); | |
280 | completion_mgr->wakeup((void *)this); | |
281 | } | |
282 | ||
// Pop the finished top-of-chain coroutine. Its spawned children are
// inherited by the caller (or by the stack when the chain empties) so
// they can still be collected. Returns `retcode` only when the whole
// chain is gone; otherwise propagates it to the caller via set_retcode.
int RGWCoroutinesStack::unwind(int retcode)
{
  rgw_spawned_stacks *src_spawned = &(*pos)->spawned;

  if (pos == ops.begin()) {
    // bottom of the chain: the stack itself takes over the children
    spawned.inherit(src_spawned);
    ops.clear();
    pos = ops.end();
    return retcode;
  }

  --pos;           // back to the caller...
  ops.pop_back();  // ...and drop the finished callee
  RGWCoroutine *op = *pos;
  op->set_retcode(retcode);
  op->spawned.inherit(src_spawned);
  return 0;
}
301 | ||
302 | ||
// Reap completed spawned stacks (of `op` when given, else of this
// stack). Running stacks and `skip_stack` are kept. On the first
// negative child status, *ret is set, the remaining (unexamined)
// stacks are preserved and scanning stops. Returns true if there is
// still something left to collect (i.e. needs to be called again).
bool RGWCoroutinesStack::collect(RGWCoroutine *op, int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
  bool done = true;
  rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
  *ret = 0;
  vector<RGWCoroutinesStack *> new_list;  // survivors: still-running or skipped

  for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
    RGWCoroutinesStack *stack = *iter;
    if (stack == skip_stack || !stack->is_done()) {
      new_list.push_back(stack);
      if (!stack->is_done()) {
        ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is still running" << dendl;
      } else if (stack == skip_stack) {
        ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " explicitly skipping stack" << dendl;
      }
      continue;
    }
    int r = stack->get_ret_status();
    stack->put();  // release the reference taken at spawn()
    if (r < 0) {
      *ret = r;
      ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " encountered error (r=" << r << "), skipping next stacks" << dendl;
      // keep everything after the failed stack for a later collect() pass
      new_list.insert(new_list.end(), ++iter, s->entries.end());
      done &= (iter != s->entries.end());
      break;
    }

    ldout(cct, 20) << "collect(): s=" << (void *)this << " stack=" << (void *)stack << " is complete" << dendl;
  }

  s->entries.swap(new_list);
  return (!done);
}
337 | ||
// Reap at most one completed spawned stack. *ret receives its status
// when negative; *collected_stack (optional) receives the reaped stack
// pointer (already put(), so for identification only). Returns true if
// a stack was collected.
bool RGWCoroutinesStack::collect_next(RGWCoroutine *op, int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
{
  rgw_spawned_stacks *s = (op ? &op->spawned : &spawned);
  *ret = 0;

  if (collected_stack) {
    *collected_stack = NULL;
  }

  for (vector<RGWCoroutinesStack *>::iterator iter = s->entries.begin(); iter != s->entries.end(); ++iter) {
    RGWCoroutinesStack *stack = *iter;
    if (!stack->is_done()) {
      continue;  // still running; leave it for a later call
    }
    int r = stack->get_ret_status();
    if (r < 0) {
      *ret = r;
    }

    if (collected_stack) {
      *collected_stack = stack;
    }
    stack->put();  // release the reference taken at spawn()

    s->entries.erase(iter);
    return true;
  }

  return false;
}
368 | ||
369 | bool RGWCoroutinesStack::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */ | |
370 | { | |
371 | return collect(NULL, ret, skip_stack); | |
372 | } | |
373 | ||
374 | static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg) | |
375 | { | |
376 | ((RGWAioCompletionNotifier *)arg)->cb(); | |
377 | } | |
378 | ||
// Bridges a librados AIO completion to the completion manager: when the
// AIO finishes, _aio_completion_notifier_cb() fires with `this` and
// hands `user_data` to the manager.
RGWAioCompletionNotifier::RGWAioCompletionNotifier(RGWCompletionManager *_mgr, void *_user_data) : completion_mgr(_mgr),
                                         user_data(_user_data), lock("RGWAioCompletionNotifier"), registered(true) {
  c = librados::Rados::aio_create_completion((void *)this, NULL,
                                             _aio_completion_notifier_cb);
}
384 | ||
// Create a notifier whose completion will re-schedule this stack.
RGWAioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
{
  return ops_mgr->create_completion_notifier(this);
}

// The completion manager shared by all stacks of this manager.
RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
{
  return ops_mgr->get_completion_mgr();
}
394 | ||
395 | bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s) | |
396 | { | |
397 | if (blocking_stacks.empty()) { | |
398 | return false; | |
399 | } | |
400 | ||
401 | set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin(); | |
402 | *s = *iter; | |
403 | blocking_stacks.erase(iter); | |
404 | (*s)->blocked_by_stack.erase(this); | |
405 | ||
406 | return true; | |
407 | } | |
408 | ||
409 | void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) | |
410 | { | |
411 | if (!op) { | |
412 | return; | |
413 | } | |
414 | string err = op->error_str(); | |
415 | if (err.empty()) { | |
416 | return; | |
417 | } | |
418 | lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; | |
419 | } | |
420 | ||
// JSON-encode the stack for admin-socket inspection: its address (as a
// stable identifier), run counter, and the coroutine call chain.
void RGWCoroutinesStack::dump(Formatter *f) const {
  stringstream ss;
  ss << (void *)this;
  ::encode_json("stack", ss.str(), f);
  ::encode_json("run_count", run_count, f);
  f->open_array_section("ops");
  for (auto& i : ops) {
    encode_json("op", *i, f);
  }
  f->close_section();
}
432 | ||
433 | void RGWCoroutinesManager::handle_unblocked_stack(set<RGWCoroutinesStack *>& context_stacks, list<RGWCoroutinesStack *>& scheduled_stacks, RGWCoroutinesStack *stack, int *blocked_count) | |
434 | { | |
435 | RWLock::WLocker wl(lock); | |
436 | --(*blocked_count); | |
437 | stack->set_io_blocked(false); | |
438 | stack->set_interval_wait(false); | |
439 | if (!stack->is_done()) { | |
440 | scheduled_stacks.push_back(stack); | |
441 | } else { | |
442 | RWLock::WLocker wl(lock); | |
443 | context_stacks.erase(stack); | |
444 | stack->put(); | |
445 | } | |
446 | } | |
447 | ||
// Queue `stack` on the env's run queue and register it with the run
// context so run() can track/clean it up. Caller must hold the write
// lock (asserted).
void RGWCoroutinesManager::schedule(RGWCoroutinesEnv *env, RGWCoroutinesStack *stack)
{
  assert(lock.is_wlocked());
  env->scheduled_stacks->push_back(stack);
  set<RGWCoroutinesStack *>& context_stacks = run_contexts[env->run_context];
  context_stacks.insert(stack);
}
455 | ||
// Main scheduler loop: drive the given stacks (and anything they spawn)
// to completion. Returns 0, the last error from operate()/get_next(),
// or -ECANCELED if stopped via going_down.
//
// Outline per iteration:
//  - run one step of the current stack under the write lock;
//  - classify it: io-blocked (counted, interval-waits tracked apart),
//    blocked-on-child (re-added when the child finishes), done (unblock
//    dependents, notify parent, retire), or runnable (re-schedule);
//  - drain already-completed IOs non-blockingly;
//  - if too many real IOs are outstanding (ops_window), block on the
//    completion manager;
//  - if the run queue is empty but IOs are pending, block for the next
//    completion (exiting on going_down).
// Any stacks still registered at exit without going_down indicate a
// deadlock and are dumped, then asserted on.
int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
{
  int ret = 0;
  int blocked_count = 0;
  int interval_wait_count = 0;
  bool canceled = false; // set on going_down
  RGWCoroutinesEnv env;

  uint64_t run_context = ++run_context_count;

  lock.get_write();
  set<RGWCoroutinesStack *>& context_stacks = run_contexts[run_context];
  list<RGWCoroutinesStack *> scheduled_stacks;
  for (auto& st : stacks) {
    context_stacks.insert(st);
    scheduled_stacks.push_back(st);
  }
  lock.unlock();

  env.run_context = run_context;
  env.manager = this;
  env.scheduled_stacks = &scheduled_stacks;

  for (list<RGWCoroutinesStack *>::iterator iter = scheduled_stacks.begin(); iter != scheduled_stacks.end() && !going_down;) {
    lock.get_write();

    RGWCoroutinesStack *stack = *iter;
    env.stack = stack;

    ret = stack->operate(&env);
    stack->set_is_scheduled(false);
    if (ret < 0) {
      ldout(cct, 20) << "stack->operate() returned ret=" << ret << dendl;
    }

    if (stack->is_error()) {
      report_error(stack);
    }

    bool op_not_blocked = false;

    if (stack->is_io_blocked()) {
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl;
      if (stack->is_interval_waiting()) {
        interval_wait_count++;
      }
      blocked_count++;
    } else if (stack->is_blocked()) {
      /* do nothing, we'll re-add the stack when the blocking stack is done,
       * or when we're awaken
       */
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack()
                     << " is_sleeping=" << stack->is_sleeping() << " waiting_for_child()=" << stack->waiting_for_child() << dendl;
    } else if (stack->is_done()) {
      ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl;
      RGWCoroutinesStack *s;
      // wake everything that was blocked on this stack
      while (stack->unblock_stack(&s)) {
        if (!s->is_blocked_by_stack() && !s->is_done()) {
          if (s->is_io_blocked()) {
            if (stack->is_interval_waiting()) {
              interval_wait_count++;
            }
            blocked_count++;
          } else {
            s->schedule();
          }
        }
      }
      if (stack->parent && stack->parent->waiting_for_child()) {
        stack->parent->set_wait_for_child(false);
        stack->parent->schedule();
      }
      context_stacks.erase(stack);
      stack->put();
      stack = NULL;
    } else {
      op_not_blocked = true;
      stack->run_count++;
      stack->schedule();
    }

    if (!op_not_blocked && stack) {
      stack->run_count = 0;  // reset the consecutive-run counter on any block
    }

    lock.unlock();

    // non-blocking drain of completed IOs
    RGWCoroutinesStack *blocked_stack;
    while (completion_mgr->try_get_next((void **)&blocked_stack)) {
      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
    }

    /*
     * only account blocked operations that are not in interval_wait, these are stacks that
     * were put on a wait without any real IO operations. While we mark these as io_blocked,
     * these aren't really waiting for IOs
     */
    while (blocked_count - interval_wait_count >= ops_window) {
      ret = completion_mgr->get_next((void **)&blocked_stack);
      if (ret < 0) {
        ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
      }
      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
    }

    ++iter;
    scheduled_stacks.pop_front();  // drop the stack we just ran from the queue

    // nothing runnable but IOs pending: block for the next completion
    while (scheduled_stacks.empty() && blocked_count > 0) {
      ret = completion_mgr->get_next((void **)&blocked_stack);
      if (ret < 0) {
        ldout(cct, 5) << "completion_mgr.get_next() returned ret=" << ret << dendl;
      }
      if (going_down) {
        ldout(cct, 5) << __func__ << "(): was stopped, exiting" << dendl;
        ret = -ECANCELED;
        canceled = true;
        break;
      }
      handle_unblocked_stack(context_stacks, scheduled_stacks, blocked_stack, &blocked_count);
      iter = scheduled_stacks.begin();
    }
    if (canceled) {
      break;
    }

    if (iter == scheduled_stacks.end()) {
      iter = scheduled_stacks.begin();  // wrap around the run queue
    }
  }

  lock.get_write();
  // leftover stacks without a shutdown means circular waiting: deadlock
  if (!context_stacks.empty() && !going_down) {
    JSONFormatter formatter(true);
    formatter.open_array_section("context_stacks");
    for (auto& s : context_stacks) {
      ::encode_json("entry", *s, &formatter);
    }
    formatter.close_section();
    lderr(cct) << __func__ << "(): ERROR: deadlock detected, dumping remaining coroutines:\n";
    formatter.flush(*_dout);
    *_dout << dendl;
    assert(context_stacks.empty() || going_down); // assert on deadlock
  }

  for (auto stack : context_stacks) {
    ldout(cct, 20) << "clearing stack on run() exit: stack=" << (void *)stack << " nref=" << stack->get_nref() << dendl;
    stack->put();
  }
  run_contexts.erase(run_context);
  lock.unlock();

  return ret;
}
611 | ||
// Convenience wrapper: run a single coroutine to completion on a fresh
// stack. Returns the scheduler error if it failed, otherwise the
// coroutine's own return status. Null `op` is a no-op returning 0.
int RGWCoroutinesManager::run(RGWCoroutine *op)
{
  if (!op) {
    return 0;
  }
  list<RGWCoroutinesStack *> stacks;
  RGWCoroutinesStack *stack = allocate_stack();
  op->get();  // keep op alive so we can read its status after run()
  stack->call(op);

  stack->schedule(&stacks);

  int r = run(stacks);
  if (r < 0) {
    ldout(cct, 20) << "run(stacks) returned r=" << r << dendl;
  } else {
    r = op->get_ret_status();
  }
  op->put();

  return r;
}
634 | ||
// Build an AIO notifier that will hand `stack` back to the completion
// manager when the IO finishes, and register it for shutdown cleanup.
RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
{
  RGWAioCompletionNotifier *cn = new RGWAioCompletionNotifier(completion_mgr, (void *)stack);
  completion_mgr->register_completion_notifier(cn);
  return cn;
}
641 | ||
// JSON-encode all active run contexts and their stacks (admin socket).
void RGWCoroutinesManager::dump(Formatter *f) const {
  RWLock::RLocker rl(lock);

  f->open_array_section("run_contexts");
  for (auto& i : run_contexts) {
    f->open_object_section("context");
    ::encode_json("id", i.first, f);
    f->open_array_section("entries");
    for (auto& s : i.second) {
      ::encode_json("entry", *s, f);
    }
    f->close_section();  // entries
    f->close_section();  // context
  }
  f->close_section();
}
658 | ||
// Factory for a new, empty stack bound to this manager (refcounted;
// caller owns the initial reference).
RGWCoroutinesStack *RGWCoroutinesManager::allocate_stack() {
  return new RGWCoroutinesStack(cct, this);
}
662 | ||
663 | string RGWCoroutinesManager::get_id() | |
664 | { | |
665 | if (!id.empty()) { | |
666 | return id; | |
667 | } | |
668 | stringstream ss; | |
669 | ss << (void *)this; | |
670 | return ss.str(); | |
671 | } | |
672 | ||
673 | void RGWCoroutinesManagerRegistry::add(RGWCoroutinesManager *mgr) | |
674 | { | |
675 | RWLock::WLocker wl(lock); | |
676 | if (managers.find(mgr) == managers.end()) { | |
677 | managers.insert(mgr); | |
678 | get(); | |
679 | } | |
680 | } | |
681 | ||
682 | void RGWCoroutinesManagerRegistry::remove(RGWCoroutinesManager *mgr) | |
683 | { | |
684 | RWLock::WLocker wl(lock); | |
685 | if (managers.find(mgr) != managers.end()) { | |
686 | managers.erase(mgr); | |
687 | put(); | |
688 | } | |
689 | } | |
690 | ||
// Tear down: remove our admin-socket hook, if one was registered.
RGWCoroutinesManagerRegistry::~RGWCoroutinesManagerRegistry()
{
  AdminSocket *admin_socket = cct->get_admin_socket();
  if (!admin_command.empty()) {
    admin_socket->unregister_command(admin_command);
  }
}
698 | ||
// (Re)register this registry under `command` on the admin socket so
// operators can dump coroutine state. Replaces any previous hook.
// Returns 0 or the register_command() error.
int RGWCoroutinesManagerRegistry::hook_to_admin_command(const string& command)
{
  AdminSocket *admin_socket = cct->get_admin_socket();
  if (!admin_command.empty()) {
    admin_socket->unregister_command(admin_command);
  }
  admin_command = command;
  int r = admin_socket->register_command(admin_command, admin_command, this,
                                         "dump current coroutines stack state");
  if (r < 0) {
    lderr(cct) << "ERROR: fail to register admin socket command (r=" << r << ")" << dendl;
    return r;
  }
  return 0;
}
714 | ||
// Admin-socket callback: dump all registered managers as JSON into
// `out`. The `format` argument is ignored; output is always JSON.
bool RGWCoroutinesManagerRegistry::call(std::string command, cmdmap_t& cmdmap, std::string format,
                                        bufferlist& out) {
  RWLock::RLocker rl(lock);
  stringstream ss;
  JSONFormatter f;
  ::encode_json("cr_managers", *this, &f);
  f.flush(ss);
  out.append(ss);
  return true;
}
725 | ||
726 | void RGWCoroutinesManagerRegistry::dump(Formatter *f) const { | |
727 | f->open_array_section("coroutine_managers"); | |
728 | for (auto m : managers) { | |
729 | ::encode_json("entry", *m, f); | |
730 | } | |
731 | f->close_section(); | |
732 | } | |
733 | ||
// Thin delegates: a coroutine's call/spawn/collect/wait operations are
// implemented on its owning stack; these forward with `this` as the
// source op where the stack needs it.
void RGWCoroutine::call(RGWCoroutine *op)
{
  stack->call(op);
}

RGWCoroutinesStack *RGWCoroutine::spawn(RGWCoroutine *op, bool wait)
{
  return stack->spawn(this, op, wait);
}

bool RGWCoroutine::collect(int *ret, RGWCoroutinesStack *skip_stack) /* returns true if needs to be called again */
{
  return stack->collect(this, ret, skip_stack);
}

bool RGWCoroutine::collect_next(int *ret, RGWCoroutinesStack **collected_stack) /* returns true if found a stack to collect */
{
  return stack->collect_next(this, ret, collected_stack);
}

int RGWCoroutine::wait(const utime_t& interval)
{
  return stack->wait(interval);
}
758 | ||
759 | void RGWCoroutine::wait_for_child() | |
760 | { | |
761 | /* should only wait for child if there is a child that is not done yet, and no complete children */ | |
762 | if (spawned.entries.empty()) { | |
763 | return; | |
764 | } | |
765 | for (vector<RGWCoroutinesStack *>::iterator iter = spawned.entries.begin(); iter != spawned.entries.end(); ++iter) { | |
766 | if ((*iter)->is_done()) { | |
767 | return; | |
768 | } | |
769 | } | |
770 | stack->set_wait_for_child(true); | |
771 | } | |
772 | ||
// Default printable name: the (mangled) dynamic type name; subclasses
// may override with something friendlier.
string RGWCoroutine::to_str() const
{
  return typeid(*this).name();
}

// Log form: owning stack address, op address, and dynamic type name.
ostream& operator<<(ostream& out, const RGWCoroutine& cr)
{
  out << "cr:s=" << (void *)cr.get_stack() << ":op=" << (void *)&cr << ":" << typeid(cr).name();
  return out;
}
783 | ||
// Resumable helper (boost.asio stackless coroutine on drain_cr):
// repeatedly wait for and collect spawned children until at most
// `num_cr_left` remain (plus skip_stack, which is never collected).
// Returns true once draining is complete; callers re-invoke it across
// yields until then. Child errors are logged but not propagated.
bool RGWCoroutine::drain_children(int num_cr_left, RGWCoroutinesStack *skip_stack)
{
  bool done = false;
  assert(num_cr_left >= 0);
  if (num_cr_left == 0 && skip_stack) {
    num_cr_left = 1;  // the skipped stack itself may remain
  }
  reenter(&drain_cr) {
    while (num_spawned() > (size_t)num_cr_left) {
      yield wait_for_child();
      int ret;
      while (collect(&ret, skip_stack)) {
        if (ret < 0) {
          ldout(cct, 10) << "collect() returned ret=" << ret << dendl;
          /* we should have reported this error */
          log_error() << "ERROR: collect() returned error (ret=" << ret << ")";
        }
      }
    }
    done = true;
  }
  return done;
}
807 | ||
// Fire the owning stack's pending interval-wait early (no-op if none).
void RGWCoroutine::wakeup()
{
  stack->wakeup();
}
812 | ||
// JSON-encode this coroutine for admin inspection: optional
// description, type name, addresses of uncollected spawned stacks,
// archived status history, and the current status line. Empty sections
// are omitted.
void RGWCoroutine::dump(Formatter *f) const {
  if (!description.str().empty()) {
    encode_json("description", description.str(), f);
  }
  encode_json("type", to_str(), f);
  if (!spawned.entries.empty()) {
    f->open_array_section("spawned");
    for (auto& i : spawned.entries) {
      char buf[32];
      snprintf(buf, sizeof(buf), "%p", (void *)i);  // stack address as id
      encode_json("stack", string(buf), f);
    }
    f->close_section();
  }
  if (!status.history.empty()) {
    encode_json("history", status.history, f);
  }

  if (!status.status.str().empty()) {
    f->open_object_section("status");
    encode_json("status", status.status.str(), f);
    encode_json("timestamp", status.timestamp, f);
    f->close_section();
  }
}
838 | ||
// Safety net: if the state machine never reached call_cleanup() (e.g.
// destroyed mid-flight), run the cleanup hook from the destructor.
RGWSimpleCoroutine::~RGWSimpleCoroutine()
{
  if (!called_cleanup) {
    request_cleanup();
  }
}

// Run the subclass cleanup hook and remember it happened so the
// destructor doesn't run it twice.
void RGWSimpleCoroutine::call_cleanup()
{
  called_cleanup = true;
  request_cleanup();
}
851 | ||
// Fixed four-phase state machine (boost.asio stackless coroutine):
// init -> send_request (io-blocks) -> request_complete -> all_complete,
// then drain spawned children, clean up, and finish. Each state_*
// helper transitions to the error state itself on failure.
int RGWSimpleCoroutine::operate()
{
  int ret = 0;
  reenter(this) {
    yield return state_init();
    yield return state_send_request();
    yield return state_request_complete();
    yield return state_all_complete();
    drain_all();
    call_cleanup();
    return set_state(RGWCoroutine_Done, ret);
  }
  return 0;
}
866 | ||
867 | int RGWSimpleCoroutine::state_init() | |
868 | { | |
869 | int ret = init(); | |
870 | if (ret < 0) { | |
871 | call_cleanup(); | |
872 | return set_state(RGWCoroutine_Error, ret); | |
873 | } | |
874 | return 0; | |
875 | } | |
876 | ||
// Phase 2: issue the async request via the subclass hook, then mark
// the coroutine io-blocked until the completion arrives. On failure,
// clean up and move to the error state.
int RGWSimpleCoroutine::state_send_request()
{
  int ret = send_request();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return io_block(0);
}
886 | ||
887 | int RGWSimpleCoroutine::state_request_complete() | |
888 | { | |
889 | int ret = request_complete(); | |
890 | if (ret < 0) { | |
891 | call_cleanup(); | |
892 | return set_state(RGWCoroutine_Error, ret); | |
893 | } | |
894 | return 0; | |
895 | } | |
896 | ||
// Phase 4: run the subclass finish() hook; on failure clean up and
// move to the error state.
int RGWSimpleCoroutine::state_all_complete()
{
  int ret = finish();
  if (ret < 0) {
    call_cleanup();
    return set_state(RGWCoroutine_Error, ret);
  }
  return 0;
}
906 | ||
907 |