]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/librbd/cache/pwl/ssd/WriteLog.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / librbd / cache / pwl / ssd / WriteLog.cc
index c2042a239c0e9d6008ea6b89b3e3b23f427dd820..a3b183411f3fbe635b260cb18a372c1cbbd1461e 100644 (file)
@@ -30,6 +30,7 @@ namespace cache {
 namespace pwl {
 namespace ssd {
 
+using namespace std;
 using namespace librbd::cache::pwl;
 
 static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
@@ -73,7 +74,7 @@ void WriteLog<I>::collect_read_extents(
   // Make a bl for this hit extent. This will add references to the
   // write_entry->cache_bl */
   ldout(m_image_ctx.cct, 5) << dendl;
-  auto write_entry = static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
+  auto write_entry = std::static_pointer_cast<WriteLogEntry>(map_entry.log_entry);
   buffer::list hit_bl;
   write_entry->copy_cache_bl(&hit_bl);
   bool writesame = write_entry->is_writesame_entry();
@@ -208,8 +209,8 @@ bool WriteLog<I>::initialize_pool(Context *on_finish,
     ::IOContext ioctx(cct, nullptr);
     r = bdev->read(0, MIN_WRITE_ALLOC_SSD_SIZE, &bl, &ioctx, false);
     if (r < 0) {
-      lderr(cct) << "Read ssd cache superblock failed " << dendl;
-      goto error_handle;
+      lderr(cct) << "read ssd cache superblock failed " << dendl;
+      goto err_close_bdev;
     }
     auto p = bl.cbegin();
     decode(superblock, p);
@@ -221,17 +222,17 @@ bool WriteLog<I>::initialize_pool(Context *on_finish,
                   << dendl;
     ceph_assert(is_valid_pool_root(pool_root));
     if (pool_root.layout_version != SSD_LAYOUT_VERSION) {
-      lderr(cct) << "Pool layout version is "
+      lderr(cct) << "pool layout version is "
                  << pool_root.layout_version
                  << " expected " << SSD_LAYOUT_VERSION
                  << dendl;
-      goto error_handle;
+      goto err_close_bdev;
     }
     if (pool_root.block_size != MIN_WRITE_ALLOC_SSD_SIZE) {
-      lderr(cct) << "Pool block size is " << pool_root.block_size
+      lderr(cct) << "pool block size is " << pool_root.block_size
                  << " expected " << MIN_WRITE_ALLOC_SSD_SIZE
                  << dendl;
-      goto error_handle;
+      goto err_close_bdev;
     }
 
     this->m_log_pool_size = pool_root.pool_size;
@@ -248,7 +249,7 @@ bool WriteLog<I>::initialize_pool(Context *on_finish,
   }
   return true;
 
-error_handle:
+err_close_bdev:
   bdev->close();
   delete bdev;
   on_finish->complete(-EINVAL);
@@ -382,10 +383,11 @@ void WriteLog<I>::enlist_op_appender() {
   this->m_async_append_ops++;
   this->m_async_op_tracker.start_op();
   Context *append_ctx = new LambdaContext([this](int r) {
-      append_scheduled_ops();
-      });
+    append_scheduled_ops();
+  });
   this->m_work_queue.queue(append_ctx);
 }
+
 /*
  * Takes custody of ops. They'll all get their log entries appended,
  * and have their on_write_persist contexts completed once they and
@@ -444,8 +446,8 @@ void WriteLog<I>::append_scheduled_ops(void) {
   GenericLogOperations ops;
   ldout(m_image_ctx.cct, 20) << dendl;
 
-  bool ops_remain = false; //no-op variable for SSD
-  bool appending = false; //no-op variable for SSD
+  bool ops_remain = false;  // unused, no-op variable for SSD
+  bool appending = false;   // unused, no-op variable for SSD
   this->append_scheduled(ops, ops_remain, appending);
 
   if (ops.size()) {
@@ -493,7 +495,7 @@ void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
   });
   uint64_t *new_first_free_entry = new(uint64_t);
   Context *append_ctx = new LambdaContext(
-      [this, new_first_free_entry, ops, ctx](int r) {
+    [this, new_first_free_entry, ops, ctx](int r) {
       std::shared_ptr<WriteLogPoolRoot> new_root;
       {
         ldout(m_image_ctx.cct, 20) << "Finished appending at "
@@ -550,82 +552,87 @@ void WriteLog<I>::construct_flush_entries(pwl::GenericLogEntries entries_to_flus
 
   if (invalidating || !has_write_entry) {
     for (auto &log_entry : entries_to_flush) {
-      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
-
-      if (!invalidating) {
-        ctx = new LambdaContext(
-        [this, log_entry, ctx](int r) {
-          m_image_ctx.op_work_queue->queue(new LambdaContext(
-           [this, log_entry, ctx](int r) {
-             ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                         << " " << *log_entry << dendl;
-             log_entry->writeback(this->m_image_writeback, ctx);
-             this->m_flush_ops_will_send -= 1;
-           }), 0);
-       });
-      }
-      post_unlock.add(ctx);
+      GuardedRequestFunctionContext *guarded_ctx =
+        new GuardedRequestFunctionContext([this, log_entry, invalidating]
+          (GuardedRequestFunctionContext &guard_ctx) {
+            log_entry->m_cell = guard_ctx.cell;
+            Context *ctx = this->construct_flush_entry(log_entry, invalidating);
+
+            if (!invalidating) {
+              ctx = new LambdaContext([this, log_entry, ctx](int r) {
+                m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, ctx](int r) {
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                               << " " << *log_entry << dendl;
+                   log_entry->writeback(this->m_image_writeback, ctx);
+                 }), 0);
+             });
+            }
+            ctx->complete(0);
+        });
+      this->detain_flush_guard_request(log_entry, guarded_ctx);
     }
   } else {
     int count = entries_to_flush.size();
-    std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries;
+    std::vector<std::shared_ptr<GenericWriteLogEntry>> write_entries;
     std::vector<bufferlist *> read_bls;
-    std::vector<Context *> contexts;
 
-    log_entries.reserve(count);
+    write_entries.reserve(count);
     read_bls.reserve(count);
-    contexts.reserve(count);
 
     for (auto &log_entry : entries_to_flush) {
-      // log_entry already removed from m_dirty_log_entries and
-      // in construct_flush_entry() it will inc(m_flush_ops_in_flight).
-      // We call this func here to make ops can track.
-      Context *ctx = this->construct_flush_entry(log_entry, invalidating);
       if (log_entry->is_write_entry()) {
        bufferlist *bl = new bufferlist;
        auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
        write_entry->inc_bl_refs();
-       log_entries.push_back(write_entry);
+       write_entries.push_back(write_entry);
        read_bls.push_back(bl);
       }
-      contexts.push_back(ctx);
     }
 
     Context *ctx = new LambdaContext(
-      [this, entries_to_flush, read_bls, contexts](int r) {
-        int i = 0, j = 0;
+      [this, entries_to_flush, read_bls](int r) {
+        int i = 0;
+       GuardedRequestFunctionContext *guarded_ctx = nullptr;
 
        for (auto &log_entry : entries_to_flush) {
-          Context *ctx = contexts[j++];
-
          if (log_entry->is_write_entry()) {
            bufferlist captured_entry_bl;
-
            captured_entry_bl.claim_append(*read_bls[i]);
            delete read_bls[i++];
 
-           m_image_ctx.op_work_queue->queue(new LambdaContext(
-             [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
-               auto captured_entry_bl = std::move(entry_bl);
-               ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                          << " " << *log_entry << dendl;
-               log_entry->writeback_bl(this->m_image_writeback, ctx,
-                                       std::move(captured_entry_bl));
-               this->m_flush_ops_will_send -= 1;
-             }), 0);
+           guarded_ctx = new GuardedRequestFunctionContext([this, log_entry, captured_entry_bl]
+              (GuardedRequestFunctionContext &guard_ctx) {
+                log_entry->m_cell = guard_ctx.cell;
+                Context *ctx = this->construct_flush_entry(log_entry, false);
+
+               m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, entry_bl=std::move(captured_entry_bl), ctx](int r) {
+                   auto captured_entry_bl = std::move(entry_bl);
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                              << " " << *log_entry << dendl;
+                   log_entry->writeback_bl(this->m_image_writeback, ctx,
+                                            std::move(captured_entry_bl));
+                 }), 0);
+             });
          } else {
-             m_image_ctx.op_work_queue->queue(new LambdaContext(
-               [this, log_entry, ctx](int r) {
-                 ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
-                                             << " " << *log_entry << dendl;
-                 log_entry->writeback(this->m_image_writeback, ctx);
-                 this->m_flush_ops_will_send -= 1;
-               }), 0);
+           guarded_ctx = new GuardedRequestFunctionContext([this, log_entry]
+              (GuardedRequestFunctionContext &guard_ctx) {
+                log_entry->m_cell = guard_ctx.cell;
+                Context *ctx = this->construct_flush_entry(log_entry, false);
+               m_image_ctx.op_work_queue->queue(new LambdaContext(
+                 [this, log_entry, ctx](int r) {
+                   ldout(m_image_ctx.cct, 15) << "flushing:" << log_entry
+                                               << " " << *log_entry << dendl;
+                   log_entry->writeback(this->m_image_writeback, ctx);
+                 }), 0);
+            });
          }
+          this->detain_flush_guard_request(log_entry, guarded_ctx);
        }
       });
 
-    aio_read_data_blocks(log_entries, read_bls, ctx);
+    aio_read_data_blocks(write_entries, read_bls, ctx);
   }
 }
 
@@ -808,12 +815,12 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
           this->m_bytes_cached -= cached_bytes;
 
           ldout(m_image_ctx.cct, 20)
-            << "Finished root update: " << "initial_first_valid_entry="
-            << initial_first_valid_entry << ", " << "m_first_valid_entry="
-            << m_first_valid_entry << "," << "release space = "
-            << allocated_bytes << "," << "m_bytes_allocated="
-            << m_bytes_allocated << "," << "release cached space="
-            << cached_bytes << "," << "m_bytes_cached="
+            << "Finished root update: initial_first_valid_entry="
+            << initial_first_valid_entry << ", m_first_valid_entry="
+            << m_first_valid_entry << ", release space = "
+            << allocated_bytes << ", m_bytes_allocated="
+            << m_bytes_allocated << ", release cached space="
+            << cached_bytes << ", m_bytes_cached="
             << this->m_bytes_cached << dendl;
 
           this->m_alloc_failed_since_retire = false;
@@ -1019,7 +1026,7 @@ void WriteLog<I>::update_root_scheduled_ops() {
   });
   Context *append_ctx = new LambdaContext([this, ctx](int r) {
     ldout(m_image_ctx.cct, 15) << "Finish the update of pool root." << dendl;
-    bool need_finisher = false;;
+    bool need_finisher = false;
     assert(r == 0);
     {
       std::lock_guard locker(m_lock);
@@ -1088,7 +1095,7 @@ void WriteLog<I>::aio_read_data_blocks(
         bls[i]->append(valid_data_bl);
         write_entry->dec_bl_refs();
       }
-     ctx->complete(r);
+      ctx->complete(r);
     });
 
   CephContext *cct = m_image_ctx.cct;