import ceph quincy 17.2.6
diff --git a/ceph/src/librbd/Journal.cc b/ceph/src/librbd/Journal.cc
index 66849c461fad21b87eff0092d5822aae68075001..8ddce2e8f7d4f4b476d5886d2135c38007b988f1 100644
--- a/ceph/src/librbd/Journal.cc
+++ b/ceph/src/librbd/Journal.cc
@@ -14,9 +14,9 @@
 #include "journal/Settings.h"
 #include "journal/Utils.h"
 #include "librbd/ImageCtx.h"
-#include "librbd/io/ImageRequestWQ.h"
+#include "librbd/asio/ContextWQ.h"
 #include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
 #include "librbd/journal/CreateRequest.h"
 #include "librbd/journal/DemoteRequest.h"
 #include "librbd/journal/ObjectDispatch.h"
@@ -42,15 +42,24 @@ using journal::util::C_DecodeTags;
 
 namespace {
 
-// TODO: once journaler is 100% async, remove separate threads and
-// reuse ImageCtx's thread pool
+// TODO: once journaler is 100% async and converted to ASIO, remove separate
+// threads and reuse librbd's AsioEngine
 class ThreadPoolSingleton : public ThreadPool {
 public:
+  ContextWQ *work_queue;
+
   explicit ThreadPoolSingleton(CephContext *cct)
-    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
+      work_queue(new ContextWQ("librbd::journal::work_queue",
+                               ceph::make_timespan(
+                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+                               this)) {
     start();
   }
   ~ThreadPoolSingleton() override {
+    work_queue->drain();
+    delete work_queue;
+
     stop();
   }
 };
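
The hunk above moves ownership of the journal work queue into the per-CephContext ThreadPoolSingleton: the queue is created alongside the pool and drained and deleted in the singleton's destructor, so individual Journal instances only borrow it (the later ~Journal() hunk drops its delete for the same reason). A minimal, self-contained sketch of that ownership split, using stand-in types rather than librbd's real ThreadPool/ContextWQ:

    // Ownership sketch: the singleton owns the queue; journals only borrow it.
    #include <cassert>
    #include <iostream>

    struct WorkQueue {                             // stand-in for ContextWQ
      void drain() { std::cout << "drain pending contexts\n"; }
    };

    struct ThreadPoolSingletonSketch {
      WorkQueue *work_queue = new WorkQueue();     // created with the pool (cf. ctor above)
      ~ThreadPoolSingletonSketch() {               // torn down with the pool (cf. dtor above)
        work_queue->drain();
        delete work_queue;
      }
    };

    struct JournalSketch {
      WorkQueue *m_work_queue = nullptr;           // borrowed, never owned
      explicit JournalSketch(ThreadPoolSingletonSketch &pool)
        : m_work_queue(pool.work_queue) {}
      ~JournalSketch() {
        if (m_work_queue != nullptr) {
          m_work_queue->drain();                   // mirrors ~Journal(): drain only, no delete
        }
      }
    };

    int main() {
      ThreadPoolSingletonSketch pool;              // in librbd: one per CephContext
      {
        JournalSketch j1(pool);
        JournalSketch j2(pool);                    // journals come and go
      }
      assert(pool.work_queue != nullptr);          // the queue outlives each journal
    }
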
@@ -60,7 +69,7 @@ struct C_IsTagOwner : public Context {
   librados::IoCtx &io_ctx;
   std::string image_id;
   bool *is_tag_owner;
-  ContextWQ *op_work_queue;
+  asio::ContextWQ *op_work_queue;
   Context *on_finish;
 
   CephContext *cct = nullptr;
@@ -71,7 +80,8 @@ struct C_IsTagOwner : public Context {
   journal::TagData tag_data;
 
   C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
-               bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
+               bool *is_tag_owner, asio::ContextWQ *op_work_queue,
+               Context *on_finish)
     : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
       op_work_queue(op_work_queue), on_finish(on_finish),
       cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
@@ -324,6 +334,24 @@ std::ostream &operator<<(std::ostream &os,
   return os;
 }
 
+
+template <typename I>
+void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
+  auto ctx = new LambdaContext([this](int r) {
+    journal->handle_metadata_updated();
+  });
+  journal->m_work_queue->queue(ctx, 0);
+}
+
+
+template <typename I>
+void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
+  auto thread_pool_singleton =
+    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+      "librbd::journal::thread_pool", false, cct);
+  *work_queue = thread_pool_singleton->work_queue;
+}
+
 template <typename I>
 Journal<I>::Journal(I &image_ctx)
   : RefCountedObject(image_ctx.cct),
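
get_work_queue() wraps the lookup_or_create_singleton_object() call so the constructor and the static create()/remove()/reset() helpers below all resolve the same shared queue, and the new MetadataListener::handle_update() uses that queue to defer handle_metadata_updated() through a LambdaContext rather than invoking it inline from the notification callback. A simplified, self-contained illustration of that defer-to-work-queue idiom (Context, LambdaContext and the queue are stand-ins, not the librbd types):

    // Defer-to-work-queue idiom used by MetadataListener::handle_update() above.
    #include <functional>
    #include <iostream>
    #include <queue>
    #include <utility>

    struct Context {
      virtual ~Context() = default;
      virtual void finish(int r) = 0;
      void complete(int r) { finish(r); delete this; }   // one-shot, self-deleting
    };

    struct LambdaContext : Context {
      std::function<void(int)> fn;
      explicit LambdaContext(std::function<void(int)> f) : fn(std::move(f)) {}
      void finish(int r) override { fn(r); }
    };

    struct WorkQueueSketch {
      std::queue<std::pair<Context*, int>> q;
      void queue(Context *ctx, int r) { q.push({ctx, r}); }   // runs later, not inline
      void drain() {
        while (!q.empty()) {
          auto [ctx, r] = q.front();
          q.pop();
          ctx->complete(r);
        }
      }
    };

    struct JournalSketch {
      WorkQueueSketch *m_work_queue;
      void handle_metadata_updated() { std::cout << "metadata refreshed\n"; }
      // Mirrors handle_update(): wrap the member call and queue it.
      void on_metadata_update_notification() {
        auto ctx = new LambdaContext([this](int) { handle_metadata_updated(); });
        m_work_queue->queue(ctx, 0);
      }
    };

    int main() {
      WorkQueueSketch wq;
      JournalSketch j{&wq};
      j.on_metadata_update_notification();   // nothing runs yet
      wq.drain();                            // callback fires here
    }
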
@@ -337,12 +365,7 @@ Journal<I>::Journal(I &image_ctx)
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
 
-  auto thread_pool_singleton =
-    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
-      "librbd::journal::thread_pool", false, cct);
-  m_work_queue = new ContextWQ("librbd::journal::work_queue",
-                               cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
-                               thread_pool_singleton);
+  get_work_queue(cct, &m_work_queue);
   ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
 }
 
@@ -350,7 +373,6 @@ template <typename I>
 Journal<I>::~Journal() {
   if (m_work_queue != nullptr) {
     m_work_queue->drain();
-    delete m_work_queue;
   }
 
   std::lock_guard locker{m_lock};
@@ -374,15 +396,14 @@ int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ThreadPool *thread_pool;
-  ContextWQ *op_work_queue;
-  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
   journal::TagData tag_data(LOCAL_MIRROR_UUID);
   journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
     io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
-    tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
+    tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
   req->send();
 
   return cond.wait();
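
create(), and likewise remove() and reset() below, now follow the same shape: resolve the shared work queue, fire an asynchronous request that completes a C_SaferCond, and block on cond.wait() so the static helper can return a plain error code. A stand-alone sketch of that async-request-behind-a-synchronous-API bridge, with the request reduced to a background callback and a stand-in for C_SaferCond:

    // Synchronous wrapper over an async request, as in Journal<I>::create() above.
    #include <condition_variable>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class C_SaferCondSketch {                // stand-in for ceph's C_SaferCond
      std::mutex m;
      std::condition_variable cv;
      bool done = false;
      int rval = 0;
    public:
      void complete(int r) {                 // called by the request when it finishes
        {
          std::lock_guard<std::mutex> l(m);
          rval = r;
          done = true;
        }
        cv.notify_all();
      }
      int wait() {                           // called by the synchronous wrapper
        std::unique_lock<std::mutex> l(m);
        cv.wait(l, [this] { return done; });
        return rval;
      }
    };

    int create_sync() {                      // shape of Journal<I>::create()
      C_SaferCondSketch cond;
      std::thread request([&cond] { cond.complete(0); });  // stands in for req->send()
      int r = cond.wait();                   // block until the "request" completes
      request.join();
      return r;
    }

    int main() {
      std::cout << "create returned " << create_sync() << "\n";
    }
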
@@ -393,13 +414,12 @@ int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ThreadPool *thread_pool;
-  ContextWQ *op_work_queue;
-  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
   journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
-    io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
+    io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
   req->send();
 
   return cond.wait();
@@ -410,14 +430,13 @@ int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
   CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
 
-  ThreadPool *thread_pool;
-  ContextWQ *op_work_queue;
-  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+  ContextWQ *work_queue;
+  get_work_queue(cct, &work_queue);
 
   C_SaferCond cond;
   auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
                                               Journal<>::LOCAL_MIRROR_UUID,
-                                              op_work_queue, &cond);
+                                              work_queue, &cond);
   req->send();
 
   return cond.wait();
@@ -432,7 +451,8 @@ void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
 
 template <typename I>
 void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
-                              bool *is_tag_owner, ContextWQ *op_work_queue,
+                              bool *is_tag_owner,
+                              asio::ContextWQ *op_work_queue,
                               Context *on_finish) {
   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
   ldout(cct, 20) << __func__ << dendl;
@@ -447,7 +467,8 @@ void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
 template <typename I>
 void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
                                std::string *mirror_uuid,
-                               ContextWQ *op_work_queue, Context *on_finish) {
+                               asio::ContextWQ *op_work_queue,
+                               Context *on_finish) {
   CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
   ldout(cct, 20) << __func__ << dendl;
 
@@ -572,7 +593,7 @@ void Journal<I>::open(Context *on_finish) {
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
 
   // inject our handler into the object dispatcher chain
-  m_image_ctx.io_object_dispatcher->register_object_dispatch(
+  m_image_ctx.io_object_dispatcher->register_dispatch(
     journal::ObjectDispatch<I>::create(&m_image_ctx, this));
 
   std::lock_guard locker{m_lock};
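
open() registers the journal as a layer in the image's object dispatcher chain (now through register_dispatch() on the ObjectDispatcherInterface), and close() later tears that layer down with shut_down_dispatch(). The toy dispatcher below illustrates the layered-dispatch idea only; the class and method names are illustrative and do not reproduce librbd's actual io dispatcher API:

    // Toy layered dispatcher: each registered layer may intercept a request
    // before it reaches the final (object-store) layer.
    #include <iostream>
    #include <map>
    #include <memory>
    #include <string>

    enum Layer { LAYER_JOURNAL = 1, LAYER_CORE = 2 };   // lower value runs earlier

    struct DispatchLayer {
      virtual ~DispatchLayer() = default;
      virtual bool write(const std::string &oid) = 0;   // true = absorbed here
    };

    struct JournalLayer : DispatchLayer {
      bool write(const std::string &oid) override {
        std::cout << "journal: append write event for " << oid << "\n";
        return false;                                   // pass through to the next layer
      }
    };

    struct CoreLayer : DispatchLayer {
      bool write(const std::string &oid) override {
        std::cout << "core: write object " << oid << "\n";
        return true;
      }
    };

    struct Dispatcher {
      std::map<Layer, std::unique_ptr<DispatchLayer>> layers;
      void register_dispatch(Layer l, std::unique_ptr<DispatchLayer> d) {
        layers[l] = std::move(d);
      }
      void shut_down_dispatch(Layer l) { layers.erase(l); }   // close() path
      void write(const std::string &oid) {
        for (auto &[level, layer] : layers) {           // walk layers in order
          if (layer->write(oid)) {
            break;
          }
        }
      }
    };

    int main() {
      Dispatcher d;
      d.register_dispatch(LAYER_CORE, std::make_unique<CoreLayer>());
      d.write("rbd_data.1");                            // no journaling yet
      d.register_dispatch(LAYER_JOURNAL, std::make_unique<JournalLayer>());  // open()
      d.write("rbd_data.1");                            // journaled, then written
      d.shut_down_dispatch(LAYER_JOURNAL);              // close()
      d.write("rbd_data.1");
    }
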
@@ -593,7 +614,7 @@ void Journal<I>::close(Context *on_finish) {
       auto ctx = new LambdaContext([on_finish, r](int _) {
           on_finish->complete(r);
         });
-      m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
+      m_image_ctx.io_object_dispatcher->shut_down_dispatch(
         io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
     });
   on_finish = create_async_context_callback(m_image_ctx, on_finish);
@@ -771,6 +792,49 @@ uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                           length, flush_entry, 0);
 }
 
+template <typename I>
+uint64_t Journal<I>::append_compare_and_write_event(uint64_t offset,
+                                                    size_t length,
+                                                    const bufferlist &cmp_bl,
+                                                    const bufferlist &write_bl,
+                                                    bool flush_entry) {
+  ceph_assert(
+    m_max_append_size > journal::AioCompareAndWriteEvent::get_fixed_size());
+  uint64_t max_compare_and_write_data_size =
+    m_max_append_size - journal::AioCompareAndWriteEvent::get_fixed_size();
+  // we need double the size because we store cmp and write buffers
+  max_compare_and_write_data_size /= 2;
+
+  // ensure that the compare and write event fits within the journal entry
+  Bufferlists bufferlists;
+  uint64_t bytes_remaining = length;
+  uint64_t event_offset = 0;
+  do {
+    uint64_t event_length = std::min(bytes_remaining,
+                                     max_compare_and_write_data_size);
+
+    bufferlist event_cmp_bl;
+    event_cmp_bl.substr_of(cmp_bl, event_offset, event_length);
+    bufferlist event_write_bl;
+    event_write_bl.substr_of(write_bl, event_offset, event_length);
+    journal::EventEntry event_entry(
+      journal::AioCompareAndWriteEvent(offset + event_offset,
+                                       event_length,
+                                       event_cmp_bl,
+                                       event_write_bl),
+      ceph_clock_now());
+
+    bufferlists.emplace_back();
+    encode(event_entry, bufferlists.back());
+
+    event_offset += event_length;
+    bytes_remaining -= event_length;
+  } while (bytes_remaining > 0);
+
+  return append_io_events(journal::EVENT_TYPE_AIO_COMPARE_AND_WRITE,
+                          bufferlists, offset, length, flush_entry, -EILSEQ);
+}
+
 template <typename I>
 uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
                                      uint64_t offset, size_t length,
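
The new append_compare_and_write_event() budgets each journal entry for both the compare and the write payload, so the usable data size per event is (m_max_append_size - fixed event size) / 2, larger requests are split across several entries at that granularity, and -EILSEQ (the compare-mismatch errno) is passed down to append_io_events where plain writes pass 0. A worked example of the split arithmetic; the sizes are invented for illustration and are not librbd defaults:

    // Worked example of the splitting arithmetic above; only the formula
    // (budget = (max_append - fixed) / 2) mirrors the code, the numbers are made up.
    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
      const uint64_t max_append_size = 4096;   // hypothetical m_max_append_size
      const uint64_t fixed_overhead  = 96;     // hypothetical fixed event size
      uint64_t budget = (max_append_size - fixed_overhead) / 2;   // 2000 bytes of data
                                                                  // per event, once for
                                                                  // cmp_bl, once for write_bl
      uint64_t length = 4500;                  // compare-and-write request length
      uint64_t offset = 0, remaining = length, events = 0;
      do {
        uint64_t event_length = std::min(remaining, budget);
        std::cout << "event " << events++ << ": offset " << offset
                  << " length " << event_length << "\n";
        offset += event_length;
        remaining -= event_length;
      } while (remaining > 0);
      // 4500 bytes -> events of 2000, 2000 and 500 bytes; each entry also carries an
      // equally sized cmp buffer, so every entry stays within max_append_size.
    }
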
@@ -1099,7 +1163,7 @@ void Journal<I>::create_journaler() {
     m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
   // TODO: a configurable filter to exclude certain peers from being
   // disconnected.
-  settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
+  settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
 
   m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
                              m_image_ctx.md_ctx, m_image_ctx.id,
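
The rename to ignored_laggy_clients keeps the local image client exempt from the laggy-client disconnect logic; the TODO imagines making that exemption list configurable for mirror peers as well. A small sketch of what such a knob could look like; the option value, the parse_csv() helper and the peer names are hypothetical, not existing rbd configuration:

    // Hypothetical sketch of a configurable laggy-client exemption list.
    #include <set>
    #include <sstream>
    #include <string>

    std::set<std::string> parse_csv(const std::string &csv) {   // hypothetical helper
      std::set<std::string> out;
      std::stringstream ss(csv);
      for (std::string item; std::getline(ss, item, ','); ) {
        if (!item.empty()) {
          out.insert(item);
        }
      }
      return out;
    }

    int main() {
      const std::string image_client_id = "local-image-client";   // placeholder id
      std::set<std::string> ignored_laggy_clients = {image_client_id};
      for (const auto &peer : parse_csv("mirror-peer-a,mirror-peer-b")) {
        ignored_laggy_clients.insert(peer);   // hypothetical per-peer exemptions
      }
      return ignored_laggy_clients.size() == 3 ? 0 : 1;
    }
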
@@ -1579,7 +1643,9 @@ void Journal<I>::transition_state(State state, int r) {
   }
 
   if (is_steady_state()) {
-    Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+    auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+    m_wait_for_state_contexts.clear();
+
     for (auto ctx : wait_for_state_contexts) {
       ctx->complete(m_error_result);
     }
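
The reworked transition_state() drains the waiter list into a local variable and then clears the member explicitly, rather than relying on the moved-from container's unspecified state, before completing each waiting context. A self-contained illustration of that move-then-clear pattern:

    // Move-then-clear: complete a drained copy of the waiter list while the
    // member is left in a known-empty state.
    #include <cassert>
    #include <functional>
    #include <list>

    int main() {
      std::list<std::function<void()>> waiters;   // stands in for m_wait_for_state_contexts
      int completed = 0;
      waiters.push_back([&] { ++completed; });
      waiters.push_back([&] { ++completed; });

      auto to_complete(std::move(waiters));  // drain the member into a local list
      waiters.clear();                       // make emptiness explicit instead of a
                                             // property of the moved-from state
      for (auto &fn : to_complete) {
        fn();                                // completions could re-register on 'waiters'
                                             // without disturbing 'to_complete'
      }
      assert(completed == 2);
      assert(waiters.empty());
    }
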
@@ -1645,7 +1711,7 @@ int Journal<I>::check_resync_requested(bool *do_resync) {
     decode(client_data, bl_it);
   } catch (const buffer::error &err) {
     lderr(cct) << this << " " << __func__ << ": "
-               << "failed to decode client data: " << err << dendl;
+               << "failed to decode client data: " << err.what() << dendl;
     return -EINVAL;
   }