]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/PurgeQueue.cc
update sources to 12.2.2
[ceph.git] / ceph / src / mds / PurgeQueue.cc
index f520240da3dae16647c4a808b9d2bce720bfb68f..49e48b04cc6d12700a4ff97ab149177a8a2f5c2b 100644 (file)
@@ -79,7 +79,8 @@ PurgeQueue::PurgeQueue(
     max_purge_ops(0),
     drain_initial(0),
     draining(false),
-    delayed_flush(nullptr)
+    delayed_flush(nullptr),
+    recovered(false)
 {
   assert(cct != nullptr);
   assert(on_error != nullptr);
@@ -147,11 +148,14 @@ void PurgeQueue::open(Context *completion)
 
   Mutex::Locker l(lock);
 
-  journaler.recover(new FunctionContext([this, completion](int r){
+  if (completion)
+    waiting_for_recovery.push_back(completion);
+
+  journaler.recover(new FunctionContext([this](int r){
     if (r == -ENOENT) {
       dout(1) << "Purge Queue not found, assuming this is an upgrade and "
                  "creating it." << dendl;
-      create(completion);
+      create(NULL);
     } else if (r == 0) {
       Mutex::Locker l(lock);
       dout(4) << "open complete" << dendl;
@@ -162,12 +166,13 @@ void PurgeQueue::open(Context *completion)
       if (journaler.last_committed.write_pos < journaler.get_write_pos()) {
        dout(4) << "recovering write_pos" << dendl;
        journaler.set_read_pos(journaler.last_committed.write_pos);
-       _recover(completion);
+       _recover();
        return;
       }
 
       journaler.set_writeable();
-      completion->complete(0);
+      recovered = true;
+      finish_contexts(g_ceph_context, waiting_for_recovery);
     } else {
       derr << "Error " << r << " loading Journaler" << dendl;
       on_error->complete(r);
@@ -175,8 +180,16 @@ void PurgeQueue::open(Context *completion)
   }));
 }
 
+void PurgeQueue::wait_for_recovery(Context* c)
+{
+  Mutex::Locker l(lock);
+  if (recovered)
+    c->complete(0);
+  else
+    waiting_for_recovery.push_back(c);
+}
 
-void PurgeQueue::_recover(Context *completion)
+void PurgeQueue::_recover()
 {
   assert(lock.is_locked_by_me());
 
@@ -185,9 +198,9 @@ void PurgeQueue::_recover(Context *completion)
     if (!journaler.is_readable() &&
        !journaler.get_error() &&
        journaler.get_read_pos() < journaler.get_write_pos()) {
-      journaler.wait_for_readable(new FunctionContext([this, completion](int r) {
+      journaler.wait_for_readable(new FunctionContext([this](int r) {
         Mutex::Locker l(lock);
-       _recover(completion);
+       _recover();
       }));
       return;
     }
@@ -204,7 +217,8 @@ void PurgeQueue::_recover(Context *completion)
       // restore original read_pos
       journaler.set_read_pos(journaler.last_committed.expire_pos);
       journaler.set_writeable();
-      completion->complete(0);
+      recovered = true;
+      finish_contexts(g_ceph_context, waiting_for_recovery);
       return;
     }
 
@@ -219,11 +233,18 @@ void PurgeQueue::create(Context *fin)
   dout(4) << "creating" << dendl;
   Mutex::Locker l(lock);
 
+  if (fin)
+    waiting_for_recovery.push_back(fin);
+
   file_layout_t layout = file_layout_t::get_default();
   layout.pool_id = metadata_pool;
   journaler.set_writeable();
   journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
-  journaler.write_head(fin);
+  journaler.write_head(new FunctionContext([this](int r) {
+    Mutex::Locker l(lock);
+    recovered = true;
+    finish_contexts(g_ceph_context, waiting_for_recovery);
+  }));
 }
 
 /**