} else {
complete_cb(rbd_comp, complete_arg);
complete_event_socket();
+ notify_callbacks_complete();
}
} else {
complete_event_socket();
+ notify_callbacks_complete();
}
- state = AIO_STATE_COMPLETE;
- {
- std::unique_lock<std::mutex> locker(lock);
- cond.notify_all();
- }
-
- // note: possible for image to be closed after op marked finished
- if (async_op.started()) {
- async_op.finish_op();
- }
tracepoint(librbd, aio_complete_exit);
}
void AioCompletion::complete_external_callback() {
// ensure librbd external users never experience concurrent callbacks
// from multiple librbd-internal threads.
- ictx->external_callback_completions.push(this);
-
- while (true) {
- if (ictx->external_callback_in_progress.exchange(true)) {
- // another thread is concurrently invoking external callbacks
- break;
- }
-
- AioCompletion* aio_comp;
- while (ictx->external_callback_completions.pop(aio_comp)) {
- aio_comp->complete_cb(aio_comp->rbd_comp, aio_comp->complete_arg);
- aio_comp->complete_event_socket();
- }
-
- ictx->external_callback_in_progress.store(false);
- if (ictx->external_callback_completions.empty()) {
- // queue still empty implies we didn't have a race between the last failed
- // pop and resetting the in-progress state
- break;
- }
- }
+ get();
+ ictx->op_work_queue->queue(new LambdaContext([this](int r) {
+ complete_cb(rbd_comp, complete_arg);
+ complete_event_socket();
+ notify_callbacks_complete();
+ put();
+ }));
}
void AioCompletion::complete_event_socket() {
}
}
+void AioCompletion::notify_callbacks_complete() {
+ state = AIO_STATE_COMPLETE;
+
+ {
+ std::unique_lock<std::mutex> locker(lock);
+ cond.notify_all();
+ }
+
+ // note: possible for image to be closed after op marked finished
+ if (async_op.started()) {
+ async_op.finish_op();
+ }
+}
+
} // namespace io
} // namespace librbd