max_purge_ops(0),
drain_initial(0),
draining(false),
- delayed_flush(nullptr)
+ delayed_flush(nullptr),
+ recovered(false)
{
assert(cct != nullptr);
assert(on_error != nullptr);
Mutex::Locker l(lock);
- journaler.recover(new FunctionContext([this, completion](int r){
+ if (completion)
+ waiting_for_recovery.push_back(completion);
+
+ journaler.recover(new FunctionContext([this](int r){
if (r == -ENOENT) {
dout(1) << "Purge Queue not found, assuming this is an upgrade and "
"creating it." << dendl;
- create(completion);
+ create(NULL);
} else if (r == 0) {
Mutex::Locker l(lock);
dout(4) << "open complete" << dendl;
if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
dout(4) << "recovering write_pos" << dendl;
journaler.set_read_pos(journaler.last_committed.write_pos);
- _recover(completion);
+ _recover();
return;
}
journaler.set_writeable();
- completion->complete(0);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
} else {
derr << "Error " << r << " loading Journaler" << dendl;
on_error->complete(r);
}));
}
+void PurgeQueue::wait_for_recovery(Context* c)
+{
+ Mutex::Locker l(lock);
+ if (recovered)
+ c->complete(0);
+ else
+ waiting_for_recovery.push_back(c);
+}
-void PurgeQueue::_recover(Context *completion)
+void PurgeQueue::_recover()
{
assert(lock.is_locked_by_me());
if (!journaler.is_readable() &&
!journaler.get_error() &&
journaler.get_read_pos() < journaler.get_write_pos()) {
- journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+ journaler.wait_for_readable(new FunctionContext([this](int r) {
Mutex::Locker l(lock);
- _recover(completion);
+ _recover();
}));
return;
}
// restore original read_pos
journaler.set_read_pos(journaler.last_committed.expire_pos);
journaler.set_writeable();
- completion->complete(0);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
return;
}
dout(4) << "creating" << dendl;
Mutex::Locker l(lock);
+ if (fin)
+ waiting_for_recovery.push_back(fin);
+
file_layout_t layout = file_layout_t::get_default();
layout.pool_id = metadata_pool;
journaler.set_writeable();
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
- journaler.write_head(fin);
+ journaler.write_head(new FunctionContext([this](int r) {
+ Mutex::Locker l(lock);
+ recovered = true;
+ finish_contexts(g_ceph_context, waiting_for_recovery);
+ }));
}
/**