import quincy beta 17.1.0
diff --git a/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc b/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc
index 1352884932e8b5ee925264a6940617968689ce2f..b11d947d34fec8781b83aae08a46d93cf949157b 100644
--- a/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc
+++ b/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc
@@ -31,6 +31,7 @@ namespace librbd {
 namespace cache {
 namespace pwl {
 
+using namespace std;
 using namespace librbd::cache::pwl;
 
 typedef AbstractWriteLog<ImageCtx>::Extent Extent;
@@ -43,6 +44,9 @@ AbstractWriteLog<I>::AbstractWriteLog(
     plugin::Api<I>& plugin_api)
   : m_builder(builder),
     m_write_log_guard(image_ctx.cct),
+    m_flush_guard(image_ctx.cct),
+    m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
+      "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
     m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
       "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
     m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
@@ -408,7 +412,7 @@ void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_s
     ceph_assert(kv.first == m_current_sync_gen+1);
     init_flush_new_sync_point(later);
     ceph_assert(kv.first == m_current_sync_gen);
-    sync_point_entries[kv.first] = m_current_sync_point->log_entry;;
+    sync_point_entries[kv.first] = m_current_sync_point->log_entry;
   }
 
   /*
@@ -515,7 +519,7 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
     ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
                   << ", While there's no cache in the image metatata." << dendl;
     if (remove(m_log_pool_name.c_str()) != 0) {
-      lderr(cct) << "Failed to remove the pool file " << m_log_pool_name
+      lderr(cct) << "failed to remove the pool file " << m_log_pool_name
                  << dendl;
       on_finish->complete(-errno);
       return;
@@ -524,7 +528,8 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
     }
   } else if ((m_cache_state->present) &&
              (access(m_log_pool_name.c_str(), F_OK) != 0)) {
-    ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl;
+    lderr(cct) << "can't find the existed pool file: " << m_log_pool_name
+               << ". error: " << cpp_strerror(-errno) << dendl;
     on_finish->complete(-errno);
     return;
   }
@@ -685,9 +690,9 @@ void AbstractWriteLog<I>::read(Extents&& image_extents,
   C_ReadRequest *read_ctx = m_builder->create_read_request(
       cct, now, m_perfcounter, bl, on_finish);
   ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
-                 << "image_extents=" << image_extents << ", "
-                 << "bl=" << bl << ", "
-                 << "on_finish=" << on_finish << dendl;
+                 << "image_extents=" << image_extents
+                 << ", bl=" << bl
+                 << "on_finish=" << on_finish << dendl;
 
   ceph_assert(m_initialized);
   bl->clear();
@@ -804,8 +809,8 @@ void AbstractWriteLog<I>::read(Extents&& image_extents,
     }
   }
 
-  ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents << ", "
-                 << "miss_bl=" << read_ctx->miss_bl << dendl;
+  ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
+                 << "miss_bl=" << read_ctx->miss_bl << dendl;
 
   complete_read(log_entries_to_read, bls_to_read, ctx);
 }
@@ -824,11 +829,9 @@ void AbstractWriteLog<I>::write(Extents &&image_extents,
 
   ceph_assert(m_initialized);
 
-  /* Split images because PMDK's space management is not perfect, there are
-   * fragment problems. The larger the block size difference of the block,
-   * the easier the fragmentation problem will occur, resulting in the
-   * remaining space can not be allocated in large size. We plan to manage
-   * pmem space and allocation by ourselves in the future.
+  /* Split image extents larger than 1M. This isn't strictly necessary but
+   * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
+   * We plan to manage pmem space and allocation by ourselves in the future.
    */
   Extents split_image_extents;
   uint64_t max_extent_size = get_max_extent();
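The split the comment describes is mechanical; as a rough sketch (the loop below is illustrative and simplified, assuming Extents is a vector of (offset, length) pairs as elsewhere in librbd, not the exact code that follows in this function):

```cpp
// Hedged sketch: split each (offset, length) image extent into chunks
// of at most max_extent_size bytes (1M per the comment above).
Extents split_image_extents;
uint64_t max_extent_size = get_max_extent();
for (auto &extent : image_extents) {
  uint64_t offset = extent.first;
  uint64_t remain = extent.second;
  while (remain > 0) {
    uint64_t len = std::min(remain, max_extent_size);
    split_image_extents.emplace_back(offset, len);
    offset += len;
    remain -= len;
  }
}
```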
@@ -1245,8 +1248,8 @@ void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_
       std::advance(last_in_batch, ops_to_append);
       ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
       ops_remain = true; /* Always check again before leaving */
-      ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
-                                 << m_ops_to_append.size() << " remain" << dendl;
+      ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
+                                 << m_ops_to_append.size() << dendl;
     } else if (isRWL) {
       ops_remain = false;
       if (appending) {
@@ -1473,7 +1476,6 @@ bool AbstractWriteLog<I>::check_allocation(
   {
     std::lock_guard locker(m_lock);
     if (m_free_lanes < num_lanes) {
-      req->set_io_waited_for_lanes(true);
       ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
                                  <<  num_lanes
                                  << ", have " << m_free_lanes << ") "
@@ -1482,7 +1484,6 @@ bool AbstractWriteLog<I>::check_allocation(
       /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
     }
     if (m_free_log_entries < num_log_entries) {
-      req->set_io_waited_for_entries(true);
       ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
                                  << num_log_entries
                                  << ", have " << m_free_log_entries << ") "
@@ -1492,13 +1493,10 @@ bool AbstractWriteLog<I>::check_allocation(
     }
     /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
     if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
-      if (!req->has_io_waited_for_buffers()) {
-        req->set_io_waited_for_buffers(true);
-        ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap="
-                                  << m_bytes_allocated_cap
-                                  << ", allocated=" << m_bytes_allocated
-                                  << ") in write [" << *req << "]" << dendl;
-      }
+      ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
+                                 << m_bytes_allocated_cap
+                                 << ", allocated=" << m_bytes_allocated
+                                 << ") in write [" << *req << "]" << dendl;
       alloc_succeeds = false;
       no_space = true; /* Entries must be retired */
     }
@@ -1521,9 +1519,6 @@ bool AbstractWriteLog<I>::check_allocation(
       m_bytes_allocated += bytes_allocated;
       m_bytes_cached += bytes_cached;
       m_bytes_dirty += bytes_dirtied;
-      if (req->has_io_waited_for_buffers()) {
-        req->set_io_waited_for_buffers(false);
-      }
     } else {
       alloc_succeeds = false;
     }
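Net effect of the checks above, with the per-request wait flags removed by this patch: lanes only throttle, while entry and buffer shortfalls additionally mark the cache as out of space. A condensed, illustrative restatement (field names follow the diff; the standalone shape is hypothetical):

```cpp
// Condensed control flow of the checks in check_allocation() after
// this patch; each shortfall fails the allocation, but only entry and
// buffer shortfalls set no_space (so retiring entries can help).
bool alloc_succeeds = true;
bool no_space = false;
{
  std::lock_guard locker(m_lock);
  if (m_free_lanes < num_lanes) {
    alloc_succeeds = false;          // throttling only; retried later
  }
  if (m_free_log_entries < num_log_entries) {
    alloc_succeeds = false;
    no_space = true;                 // entries must be retired
  }
  if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
    alloc_succeeds = false;
    no_space = true;                 // buffers must be retired
  }
}
```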
@@ -1626,21 +1621,34 @@ bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_e
 }
 
 template <typename I>
-Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
-                                                      bool invalidating) {
-  CephContext *cct = m_image_ctx.cct;
+void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
+                                                    GuardedRequestFunctionContext *guarded_ctx) {
+  ldout(m_image_ctx.cct, 20) << dendl;
 
-  ldout(cct, 20) << "" << dendl;
-  ceph_assert(m_entry_reader_lock.is_locked());
-  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
-  if (!m_flush_ops_in_flight ||
-      (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
-    m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
+  BlockExtent extent;
+  if (log_entry->is_sync_point()) {
+    extent = block_extent(whole_volume_extent());
+  } else {
+    extent = log_entry->ram_entry.block_extent();
   }
-  m_flush_ops_in_flight += 1;
-  m_flush_ops_will_send += 1;
-  /* For write same this is the bytes affected by the flush op, not the bytes transferred */
-  m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
+
+  auto req = GuardedRequest(extent, guarded_ctx, false);
+  BlockGuardCell *cell = nullptr;
+
+  {
+    std::lock_guard locker(m_flush_guard_lock);
+    m_flush_guard.detain(req.block_extent, &req, &cell);
+  }
+  if (cell) {
+    req.guard_ctx->cell = cell;
+    m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+  }
+}
+
+template <typename I>
+Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
+                                                      bool invalidating) {
+  ldout(m_image_ctx.cct, 20) << "" << dendl;
 
   /* Flush write completion action */
   utime_t writeback_start_time = ceph_clock_now();
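For orientation, this is roughly how the new detain_flush_guard_request() above is driven: the flush continuation is wrapped in a GuardedRequestFunctionContext, whose cell is stashed on the log entry and released in the completion lambda added below. A hedged sketch of such a call site (illustrative; the actual caller lives elsewhere in this file):

```cpp
// Hypothetical call site: detain the entry's extent, then build and
// kick off the flush once no overlapping flush holds the guard.
GuardedRequestFunctionContext *guarded_ctx =
  new GuardedRequestFunctionContext(
    [this, log_entry](GuardedRequestFunctionContext &guard_ctx) {
      log_entry->m_cell = guard_ctx.cell;  // released on writeback completion
      Context *ctx = construct_flush_entry(log_entry, false);
      ctx->complete(0);
    });
detain_flush_guard_request(log_entry, guarded_ctx);
```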
@@ -1671,7 +1679,24 @@ Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEn
     });
   /* Flush through lower cache before completing */
   ctx = new LambdaContext(
-    [this, ctx](int r) {
+    [this, ctx, log_entry](int r) {
+      {
+        WriteLogGuard::BlockOperations block_reqs;
+        BlockGuardCell *detained_cell = nullptr;
+
+        std::lock_guard locker{m_flush_guard_lock};
+        m_flush_guard.release(log_entry->m_cell, &block_reqs);
+
+        for (auto &req : block_reqs) {
+          m_flush_guard.detain(req.block_extent, &req, &detained_cell);
+          if (detained_cell) {
+            req.guard_ctx->cell = detained_cell;
+            m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
+          }
+        }
+      }
+
       if (r < 0) {
         lderr(m_image_ctx.cct) << "failed to flush log entry"
                                << cpp_strerror(r) << dendl;
@@ -1711,10 +1736,6 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
         break;
       }
 
-      if (m_flush_ops_will_send) {
-       ldout(cct, 20) << "Previous flush-ops is still not sent" << dendl;
-       break;
-      }
       auto candidate = m_dirty_log_entries.front();
       bool flushable = can_flush_entry(candidate);
       if (flushable) {
@@ -1723,6 +1744,17 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
         if (!has_write_entry)
           has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
+
+        // Account for this flush op here, so the candidate is tracked
+        // as in flight from the moment it is dequeued.
+        {
+          if (!m_flush_ops_in_flight ||
+              (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
+            m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
+          }
+          m_flush_ops_in_flight += 1;
+          /* For write same this is the bytes affected by the flush op, not the bytes transferred */
+          m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
+        }
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
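The accounting block moved into this loop keeps three counters coherent; in isolation it amounts to the following (a simplified, hypothetical helper, not a function in this file):

```cpp
// Illustrative summary of the in-flight flush accounting above:
// remember the lowest sync generation among in-flight flush ops and
// the bytes they cover (for write-same: bytes affected, not transferred).
void account_flush_in_flight(uint64_t sync_gen, uint64_t write_bytes) {
  if (m_flush_ops_in_flight == 0 || sync_gen < m_lowest_flushing_sync_gen) {
    m_lowest_flushing_sync_gen = sync_gen;
  }
  m_flush_ops_in_flight += 1;
  m_flush_bytes_in_flight += write_bytes;
}
```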
@@ -2016,8 +2048,8 @@ void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
                                       << invalidate << ")" << dendl;
             if (m_log_entries.size()) {
               ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
-                                        << m_log_entries.size() << ", "
-                                        << "front()=" << *m_log_entries.front()
+                                        << m_log_entries.size()
+                                        << "front()=" << *m_log_entries.front()
                                         << dendl;
             }
             if (invalidate) {
@@ -2088,13 +2120,14 @@ bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_
 
   ldout(cct, 20) << dendl;
   ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
+  ceph_assert(log_entry);
   return log_entry->can_retire();
 }
 
 template <typename I>
 void AbstractWriteLog<I>::check_image_cache_state_clean() {
   ceph_assert(m_deferred_ios.empty());
-  ceph_assert(m_ops_to_append.empty());;
+  ceph_assert(m_ops_to_append.empty());
   ceph_assert(m_async_flush_ops == 0);
   ceph_assert(m_async_append_ops == 0);
   ceph_assert(m_dirty_log_entries.empty());