import ceph 16.2.7
index 76e8e478e2bf7487907ba7e680b871133a1f9f2a..1352884932e8b5ee925264a6940617968689ce2f 100644
--- a/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc
+++ b/ceph/src/librbd/cache/pwl/AbstractWriteLog.cc
@@ -52,7 +52,7 @@ AbstractWriteLog<I>::AbstractWriteLog(
         "tp_pwl", 4, ""),
     m_cache_state(cache_state),
     m_image_ctx(image_ctx),
-    m_log_pool_config_size(DEFAULT_POOL_SIZE),
+    m_log_pool_size(DEFAULT_POOL_SIZE),
     m_image_writeback(image_writeback),
     m_plugin_api(plugin_api),
     m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
@@ -158,13 +158,13 @@ void AbstractWriteLog<I>::perf_start(std::string name) {
     "Histogram of syncpoint's logentry numbers vs bytes number");
 
   plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
+  plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
   plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
   plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
   plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
   plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
   plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
   plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
-  plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
 
   plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
   plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
@@ -258,7 +258,8 @@ void AbstractWriteLog<I>::perf_start(std::string name) {
  plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
   plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
 
-  plb.add_u64_counter(l_librbd_pwl_flush, "flush", "Flush (flush RWL)");
+  plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
+  plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "Write back to OSD latency");
   plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
   plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");
 
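The counters declared through plb.add_*() follow Ceph's PerfCountersBuilder lifecycle: declare every index between a first/last enum pair, build the PerfCounters object once, register it with the CephContext, then bump it on the hot path with inc()/tinc() — as the m_perfcounter-> calls later in this diff do. A minimal sketch of that lifecycle; the enum values below are illustrative placeholders, not counters defined by this file:

    // Sketch of the PerfCounters lifecycle behind these add_* calls.
    // l_example_first/l_example/l_example_lat/l_example_last are hypothetical.
    enum { l_example_first = 29000, l_example, l_example_lat, l_example_last };

    PerfCountersBuilder plb(cct, "librbd-pwl-example", l_example_first, l_example_last);
    plb.add_u64_counter(l_example, "example", "Example events");
    plb.add_time_avg(l_example_lat, "example_lat", "Example latency");
    PerfCounters *counters = plb.create_perf_counters();
    cct->get_perfcounters_collection()->add(counters);

    utime_t start_time = ceph_clock_now();
    // ... timed work ...
    counters->inc(l_example, 1);                                   // count one event
    counters->tinc(l_example_lat, ceph_clock_now() - start_time);  // add one latency sample
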
@@ -310,16 +311,17 @@ void AbstractWriteLog<I>::log_perf() {
 template <typename I>
 void AbstractWriteLog<I>::periodic_stats() {
   std::lock_guard locker(m_lock);
-  ldout(m_image_ctx.cct, 1) << "STATS: "
-                            << "m_free_log_entries=" << m_free_log_entries << ", "
-                            << "m_log_entries=" << m_log_entries.size() << ", "
-                            << "m_dirty_log_entries=" << m_dirty_log_entries.size() << ", "
-                            << "m_bytes_allocated=" << m_bytes_allocated << ", "
-                            << "m_bytes_cached=" << m_bytes_cached << ", "
-                            << "m_bytes_dirty=" << m_bytes_dirty << ", "
-                            << "bytes available=" << m_bytes_allocated_cap - m_bytes_allocated << ", "
-                            << "m_current_sync_gen=" << m_current_sync_gen << ", "
-                            << "m_flushed_sync_gen=" << m_flushed_sync_gen << ", "
+  ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size()
+                            << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
+                            << ", m_free_log_entries=" << m_free_log_entries
+                            << ", m_bytes_allocated=" << m_bytes_allocated
+                            << ", m_bytes_cached=" << m_bytes_cached
+                            << ", m_bytes_dirty=" << m_bytes_dirty
+                            << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
+                            << ", m_first_valid_entry=" << m_first_valid_entry
+                            << ", m_first_free_entry=" << m_first_free_entry
+                            << ", m_current_sync_gen=" << m_current_sync_gen
+                            << ", m_flushed_sync_gen=" << m_flushed_sync_gen
                             << dendl;
 }
 
@@ -341,7 +343,7 @@ template <typename I>
 void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
     WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
     std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
-    int entry_index) {
+    uint64_t entry_index) {
     bool writer = cache_entry->is_writer();
     if (cache_entry->is_sync_point()) {
       ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
@@ -390,7 +392,7 @@ void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_e
 template <typename I>
 void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
     std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
-    DeferredContexts &later, uint32_t alloc_size ) {
+    DeferredContexts &later) {
   /* Create missing sync points. These must not be appended until the
    * entry reload is complete and the write map is up to
    * date. Currently this is handled by the deferred contexts object
@@ -441,15 +443,9 @@ void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_s
             gen_write_entry->set_flushed(true);
             sync_point_entry->writes_flushed++;
           }
-          if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
-            /* This entry is a basic write */
-            uint64_t bytes_allocated = alloc_size;
-            if (gen_write_entry->ram_entry.write_bytes > bytes_allocated) {
-              bytes_allocated = gen_write_entry->ram_entry.write_bytes;
-            }
-            m_bytes_allocated += bytes_allocated;
-            m_bytes_cached += gen_write_entry->ram_entry.write_bytes;
-          }
+
+          /* calc m_bytes_allocated & m_bytes_cached */
+          inc_allocated_cached_bytes(log_entry);
         }
       }
     } else {
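
The nine deleted lines move into inc_allocated_cached_bytes(), a per-backend hook, because the pmem- and SSD-backed write logs account allocation at different granularities. A sketch of what such a helper can look like, reconstructed from the deleted logic above; the alloc_unit parameter is an assumption standing in for the backend's minimum write allocation size, and the real implementations live in the backend subclasses:

    // Sketch only: the deleted inline accounting, factored into a helper.
    // alloc_unit is a hypothetical stand-in for the backend's allocation
    // granularity; the actual helpers are backend-specific.
    void inc_allocated_cached_bytes_sketch(
        const std::shared_ptr<GenericLogEntry> &log_entry,
        uint64_t alloc_unit) {
      if (log_entry->write_bytes() == log_entry->bytes_dirty()) {
        /* This entry is a basic write: charge at least one allocation unit. */
        m_bytes_allocated += std::max(alloc_unit, log_entry->write_bytes());
        m_bytes_cached += log_entry->write_bytes();
      }
    }
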
@@ -509,7 +505,10 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
   ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
 
   m_log_pool_name = m_cache_state->path;
-  m_log_pool_config_size = max(m_cache_state->size, MIN_POOL_SIZE);
+  m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
+  m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
+  ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
+                << " (adjusted from " << m_cache_state->size << ")" << dendl;
 
   if ((!m_cache_state->present) &&
       (access(m_log_pool_name.c_str(), F_OK) == 0)) {
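
p2align() (a helper in Ceph's include/intarith.h) rounds a value down to a multiple of a power-of-two alignment, so the configured cache size is now clipped to whole POOL_SIZE_ALIGN units and the adjusted size is logged. The semantics, shown with an assumed 1 MiB alignment (POOL_SIZE_ALIGN's actual value is defined elsewhere in the pwl code):

    // p2align(x, align) == (x & -align) for a power-of-two align: round down.
    uint64_t align  = uint64_t(1) << 20;           // 1 MiB, assumed for illustration
    uint64_t size   = (uint64_t(5) << 20) + 4096;  // requested: 5 MiB + 4 KiB
    uint64_t fitted = size & -align;               // 5 MiB, what p2align(size, align) yields
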
@@ -527,9 +526,13 @@ void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later)
              (access(m_log_pool_name.c_str(), F_OK) != 0)) {
     ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl;
     on_finish->complete(-errno);
+    return;
   }
 
-  initialize_pool(on_finish, later);
+  bool succeeded = initialize_pool(on_finish, later);
+  if (!succeeded) {
+    return;
+  }
 
   ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
                << " log entries, " << m_free_log_entries << " of which are free."
@@ -575,7 +578,10 @@ template <typename I>
 void AbstractWriteLog<I>::init(Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << dendl;
-  perf_start(m_image_ctx.id);
+  auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
+      std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
+      std::string("-") + m_image_ctx.name;
+  perf_start(pname);
 
   ceph_assert(!m_initialized);
 
@@ -617,7 +623,7 @@ void AbstractWriteLog<I>::shut_down(Context *on_finish) {
       }
       {
         std::lock_guard locker(m_lock);
-        ceph_assert(m_dirty_log_entries.size() == 0);
+        check_image_cache_state_clean();
         m_wake_up_enabled = false;
         m_cache_state->clean = true;
         m_log_entries.clear();
@@ -630,6 +636,14 @@ void AbstractWriteLog<I>::shut_down(Context *on_finish) {
       }
       update_image_cache_state(next_ctx);
     });
+  ctx = new LambdaContext(
+    [this, ctx](int r) {
+      Context *next_ctx = override_ctx(r, ctx);
+      ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
+      // Wait for in progress IOs to complete
+      next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
+      m_async_op_tracker.wait_for_ops(next_ctx);
+    });
   ctx = new LambdaContext(
     [this, ctx](int r) {
       Context *next_ctx = override_ctx(r, ctx);
@@ -646,14 +660,6 @@ void AbstractWriteLog<I>::shut_down(Context *on_finish) {
       }
       flush_dirty_entries(next_ctx);
     });
-  ctx = new LambdaContext(
-    [this, ctx](int r) {
-      Context *next_ctx = override_ctx(r, ctx);
-      ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
-      // Wait for in progress IOs to complete
-      next_ctx = util::create_async_context_callback(m_image_ctx, next_ctx);
-      m_async_op_tracker.wait_for_ops(next_ctx);
-    });
   ctx = new LambdaContext(
     [this, ctx](int r) {
       ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
@@ -687,7 +693,7 @@ void AbstractWriteLog<I>::read(Extents&& image_extents,
   bl->clear();
   m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
 
-  std::vector<WriteLogCacheEntry*> log_entries_to_read;
+  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
   std::vector<bufferlist*> bls_to_read;
 
   m_async_op_tracker.start_op();
@@ -1252,19 +1258,19 @@ void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_
 }
 
 template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops)
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
 {
   GenericLogOperations to_append(ops.begin(), ops.end());
 
-  schedule_append_ops(to_append);
+  schedule_append_ops(to_append, req);
 }
 
 template <typename I>
-void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op)
+void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
 {
   GenericLogOperations to_append { op };
 
-  schedule_append_ops(to_append);
+  schedule_append_ops(to_append, req);
 }
 
 /*
@@ -1298,16 +1304,16 @@ void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
     }
     op->complete(result);
     m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
-                        op->log_append_time - op->dispatch_time);
+                        op->log_append_start_time - op->dispatch_time);
     m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
     m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
                         utime_t(now - op->dispatch_time).to_nsec(),
                         log_entry->ram_entry.write_bytes);
-    utime_t app_lat = op->log_append_comp_time - op->log_append_time;
+    utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
     m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
     m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
                       log_entry->ram_entry.write_bytes);
-    m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_time);
+    m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
   }
   // New entries may be flushable
   {
@@ -1433,8 +1439,9 @@ void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
       std::lock_guard locker(m_lock);
       dispatch_here = m_deferred_ios.empty();
      // Only a flush req has total_bytes set to the max uint64
-      if ((req->image_extents_summary.total_bytes ==
-          std::numeric_limits<uint64_t>::max())) {
+      if (req->image_extents_summary.total_bytes ==
+          std::numeric_limits<uint64_t>::max() &&
+          static_cast<C_FlushRequestT *>(req)->internal == true) {
         dispatch_here = true;
       }
     }
@@ -1457,10 +1464,10 @@ void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
 }
 
 template <typename I>
-bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
-      uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
-      uint64_t &num_lanes, uint64_t &num_log_entries,
-      uint64_t &num_unpublished_reserves, uint64_t bytes_allocated_cap){
+bool AbstractWriteLog<I>::check_allocation(
+    C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
+    uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
+    uint32_t num_unpublished_reserves) {
   bool alloc_succeeds = true;
   bool no_space = false;
   {
@@ -1484,11 +1491,11 @@ bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
       no_space = true; /* Entries must be retired */
     }
     /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
-    if (m_bytes_allocated + bytes_allocated > bytes_allocated_cap) {
+    if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
       if (!req->has_io_waited_for_buffers()) {
         req->set_io_waited_for_buffers(true);
-        ldout(m_image_ctx.cct, 1) << "Waiting for allocation cap (cap="
-                                  << bytes_allocated_cap
+        ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap="
+                                  << m_bytes_allocated_cap
                                   << ", allocated=" << m_bytes_allocated
                                   << ") in write [" << *req << "]" << dendl;
       }
@@ -1506,7 +1513,8 @@ bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
     /* We need one free log entry per extent (each is a separate entry), and
      * one free "lane" for remote replication. */
     if ((m_free_lanes >= num_lanes) &&
-        (m_free_log_entries >= num_log_entries)) {
+        (m_free_log_entries >= num_log_entries) &&
+        (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
       m_free_lanes -= num_lanes;
       m_free_log_entries -= num_log_entries;
       m_unpublished_reserves += num_unpublished_reserves;
@@ -1516,7 +1524,6 @@ bool AbstractWriteLog<I>::check_allocation(C_BlockIORequestT *req,
       if (req->has_io_waited_for_buffers()) {
         req->set_io_waited_for_buffers(false);
       }
-
     } else {
       alloc_succeeds = false;
     }
@@ -1631,12 +1638,17 @@ Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEn
     m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
   }
   m_flush_ops_in_flight += 1;
+  m_flush_ops_will_send += 1;
   /* For write same this is the bytes affected by the flush op, not the bytes transferred */
   m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
 
   /* Flush write completion action */
+  utime_t writeback_start_time = ceph_clock_now();
   Context *ctx = new LambdaContext(
-    [this, log_entry, invalidating](int r) {
+    [this, log_entry, writeback_start_time, invalidating](int r) {
+      utime_t writeback_comp_time = ceph_clock_now();
+      m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
+                          writeback_comp_time - writeback_start_time);
       {
         std::lock_guard locker(m_lock);
         if (r < 0) {
@@ -1676,13 +1688,16 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
   CephContext *cct = m_image_ctx.cct;
   bool all_clean = false;
   int flushed = 0;
+  bool has_write_entry = false;
 
   ldout(cct, 20) << "Look for dirty entries" << dendl;
   {
     DeferredContexts post_unlock;
+    GenericLogEntries entries_to_flush;
+
     std::shared_lock entry_reader_locker(m_entry_reader_lock);
+    std::lock_guard locker(m_lock);
     while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
-      std::lock_guard locker(m_lock);
       if (m_shutting_down) {
         ldout(cct, 5) << "Flush during shutdown supressed" << dendl;
         /* Do flush complete only when all flush ops are finished */
@@ -1695,17 +1710,26 @@ void AbstractWriteLog<I>::process_writeback_dirty_entries() {
         all_clean = !m_flush_ops_in_flight;
         break;
       }
+
+      if (m_flush_ops_will_send) {
+        ldout(cct, 20) << "Previous flush ops have not all been sent yet" << dendl;
+        break;
+      }
       auto candidate = m_dirty_log_entries.front();
       bool flushable = can_flush_entry(candidate);
       if (flushable) {
-        post_unlock.add(construct_flush_entry_ctx(candidate));
+        entries_to_flush.push_back(candidate);
         flushed++;
+        if (!has_write_entry)
+          has_write_entry = candidate->is_write_entry();
         m_dirty_log_entries.pop_front();
       } else {
         ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
         break;
       }
     }
+
+    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
   }
 
   if (all_clean) {
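
This pass now takes m_lock once for the whole scan instead of once per entry, batches every flushable candidate into entries_to_flush, and hands the batch to a single construct_flush_entries() call; m_flush_ops_will_send guards against starting a new scan while a previous batch is still being issued. The completions accumulated in post_unlock run only when the DeferredContexts object is destroyed, and since it is declared before both lock guards it is destroyed after them, i.e. with no locks held. A simplified stand-in for that idiom (an assumption, not the real pwl::DeferredContexts):

    // Simplified sketch of the DeferredContexts idiom (assumption; not the
    // real class): callbacks queued while a lock is held run at scope exit,
    // after the lock guards declared below it have been released.
    #include <functional>
    #include <vector>

    struct DeferredSketch {
      std::vector<std::function<void()>> fns;
      void add(std::function<void()> fn) { fns.push_back(std::move(fn)); }
      ~DeferredSketch() { for (auto &fn : fns) fn(); }
    };
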
@@ -1739,11 +1763,11 @@ bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLog
     }
     m_async_op_tracker.start_op();
     m_work_queue.queue(new LambdaContext(
-      [this, log_entry](int r) {
+      [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
         bool handled_by_next;
         {
           std::lock_guard locker(m_lock);
-          handled_by_next = handle_flushed_sync_point(log_entry->next_sync_point_entry);
+          handled_by_next = handle_flushed_sync_point(std::move(next));
         }
         if (!handled_by_next) {
           persist_last_flushed_sync_gen();
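
Capturing next_sync_point_entry by move means the queued lambda owns only the successor sync point; the already-handled log_entry is no longer kept alive until the work-queue item runs. A standalone illustration of the difference, using std types only:

    // Sketch (std types only): move just the successor into the closure so
    // the completed node itself can be released before the callback runs.
    #include <functional>
    #include <memory>

    struct Node { std::shared_ptr<Node> next; };

    std::function<void(int)> make_cb(std::shared_ptr<Node> node) {
      // Capturing [node] would pin the whole node until the callback fires;
      // capturing [next = std::move(node->next)] pins only the successor.
      return [next = std::move(node->next)](int r) { (void)next; /* use next */ };
    }
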
@@ -1961,7 +1985,7 @@ void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
     if (invalidate) {
       m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
     } else {
-      m_perfcounter->inc(l_librbd_pwl_flush, 1);
+      m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
     }
   }
 
@@ -2067,6 +2091,20 @@ bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_
   return log_entry->can_retire();
 }
 
+template <typename I>
+void AbstractWriteLog<I>::check_image_cache_state_clean() {
+  ceph_assert(m_deferred_ios.empty());
+  ceph_assert(m_ops_to_append.empty());
+  ceph_assert(m_async_flush_ops == 0);
+  ceph_assert(m_async_append_ops == 0);
+  ceph_assert(m_dirty_log_entries.empty());
+  ceph_assert(m_ops_to_flush.empty());
+  ceph_assert(m_flush_ops_in_flight == 0);
+  ceph_assert(m_flush_bytes_in_flight == 0);
+  ceph_assert(m_bytes_dirty == 0);
+  ceph_assert(m_work_queue.empty());
+}
+
 } // namespace pwl
 } // namespace cache
 } // namespace librbd