#include "journal/Settings.h"
#include "journal/Utils.h"
#include "librbd/ImageCtx.h"
-#include "librbd/io/ImageRequestWQ.h"
+#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
-#include "librbd/io/ObjectDispatcher.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
#include "librbd/journal/ObjectDispatch.h"
namespace {
-// TODO: once journaler is 100% async, remove separate threads and
-// reuse ImageCtx's thread pool
+// TODO: once journaler is 100% async and converted to ASIO, remove separate
+// threads and reuse librbd's AsioEngine
class ThreadPoolSingleton : public ThreadPool {
public:
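+  // shared by all journals on this CephContext; owned by the singleton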
+ ContextWQ *work_queue;
+
explicit ThreadPoolSingleton(CephContext *cct)
- : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+ : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
+ work_queue(new ContextWQ("librbd::journal::work_queue",
+ ceph::make_timespan(
+ cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+ this)) {
start();
}
~ThreadPoolSingleton() override {
+ work_queue->drain();
+ delete work_queue;
+
stop();
}
};
librados::IoCtx &io_ctx;
std::string image_id;
bool *is_tag_owner;
- ContextWQ *op_work_queue;
+ asio::ContextWQ *op_work_queue;
Context *on_finish;
CephContext *cct = nullptr;
journal::TagData tag_data;
C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
- bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
+ bool *is_tag_owner, asio::ContextWQ *op_work_queue,
+ Context *on_finish)
: io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
op_work_queue(op_work_queue), on_finish(on_finish),
cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
return os;
}
+
+template <typename I>
+void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
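+  // defer the notification to the journal's work queue so the handler runs
+  // outside the journaler's callback context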
+ auto ctx = new LambdaContext([this](int r) {
+ journal->handle_metadata_updated();
+ });
+ journal->m_work_queue->queue(ctx, 0);
+}
+
+
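+// fetch the shared work queue from the per-CephContext journal thread pool
+// singleton, creating the singleton on first use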
+template <typename I>
+void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
+ auto thread_pool_singleton =
+ &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+ "librbd::journal::thread_pool", false, cct);
+ *work_queue = thread_pool_singleton->work_queue;
+}
+
template <typename I>
Journal<I>::Journal(I &image_ctx)
: RefCountedObject(image_ctx.cct),
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
- auto thread_pool_singleton =
- &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
- "librbd::journal::thread_pool", false, cct);
- m_work_queue = new ContextWQ("librbd::journal::work_queue",
- cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
- thread_pool_singleton);
+ get_work_queue(cct, &m_work_queue);
ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
}
Journal<I>::~Journal() {
if (m_work_queue != nullptr) {
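+    // the work queue is owned by the ThreadPoolSingleton, so only drain it here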
m_work_queue->drain();
- delete m_work_queue;
}
std::lock_guard locker{m_lock};
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ThreadPool *thread_pool;
- ContextWQ *op_work_queue;
- ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
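+  // the journaler still requires a ContextWQ, so use the journal's dedicated
+  // work queue (librbd's thread pool is being replaced by AsioEngine)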
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
journal::TagData tag_data(LOCAL_MIRROR_UUID);
journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
- tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
+ tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
req->send();
return cond.wait();
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ThreadPool *thread_pool;
- ContextWQ *op_work_queue;
- ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
- io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
+ io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
req->send();
return cond.wait();
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ThreadPool *thread_pool;
- ContextWQ *op_work_queue;
- ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
Journal<>::LOCAL_MIRROR_UUID,
- op_work_queue, &cond);
+ work_queue, &cond);
req->send();
return cond.wait();
template <typename I>
void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
- bool *is_tag_owner, ContextWQ *op_work_queue,
+ bool *is_tag_owner,
+ asio::ContextWQ *op_work_queue,
Context *on_finish) {
CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
ldout(cct, 20) << __func__ << dendl;
template <typename I>
void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
std::string *mirror_uuid,
- ContextWQ *op_work_queue, Context *on_finish) {
+ asio::ContextWQ *op_work_queue,
+ Context *on_finish) {
CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
ldout(cct, 20) << __func__ << dendl;
on_finish = create_async_context_callback(m_image_ctx, on_finish);
// inject our handler into the object dispatcher chain
- m_image_ctx.io_object_dispatcher->register_object_dispatch(
+ m_image_ctx.io_object_dispatcher->register_dispatch(
journal::ObjectDispatch<I>::create(&m_image_ctx, this));
std::lock_guard locker{m_lock};
auto ctx = new LambdaContext([on_finish, r](int _) {
on_finish->complete(r);
});
- m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
+ m_image_ctx.io_object_dispatcher->shut_down_dispatch(
io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
});
on_finish = create_async_context_callback(m_image_ctx, on_finish);
length, flush_entry, 0);
}
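+// journal a compare-and-write request as one or more AioCompareAndWriteEvent
+// entries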
+template <typename I>
+uint64_t Journal<I>::append_compare_and_write_event(uint64_t offset,
+ size_t length,
+ const bufferlist &cmp_bl,
+ const bufferlist &write_bl,
+ bool flush_entry) {
+ ceph_assert(
+ m_max_append_size > journal::AioCompareAndWriteEvent::get_fixed_size());
+ uint64_t max_compare_and_write_data_size =
+ m_max_append_size - journal::AioCompareAndWriteEvent::get_fixed_size();
+  // halve the limit: each event carries both the compare and write buffers
+ max_compare_and_write_data_size /= 2;
+
+  // split the compare and write payload across multiple events if needed
+ Bufferlists bufferlists;
+ uint64_t bytes_remaining = length;
+ uint64_t event_offset = 0;
+ do {
+ uint64_t event_length = std::min(bytes_remaining,
+ max_compare_and_write_data_size);
+
+ bufferlist event_cmp_bl;
+ event_cmp_bl.substr_of(cmp_bl, event_offset, event_length);
+ bufferlist event_write_bl;
+ event_write_bl.substr_of(write_bl, event_offset, event_length);
+ journal::EventEntry event_entry(
+ journal::AioCompareAndWriteEvent(offset + event_offset,
+ event_length,
+ event_cmp_bl,
+ event_write_bl),
+ ceph_clock_now());
+
+ bufferlists.emplace_back();
+ encode(event_entry, bufferlists.back());
+
+ event_offset += event_length;
+ bytes_remaining -= event_length;
+ } while (bytes_remaining > 0);
+
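+  // a compare mismatch (-EILSEQ) is an expected result and should not be
+  // treated as a failure when committing the journal event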
+ return append_io_events(journal::EVENT_TYPE_AIO_COMPARE_AND_WRITE,
+ bufferlists, offset, length, flush_entry, -EILSEQ);
+}
+
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
uint64_t offset, size_t length,
m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
// TODO: a configurable filter to exclude certain peers from being
// disconnected.
- settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
+ settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
m_image_ctx.md_ctx, m_image_ctx.id,
}
if (is_steady_state()) {
- Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
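+    // the moved-from list is in an unspecified state, so clear it explicitly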
+ auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+ m_wait_for_state_contexts.clear();
+
for (auto ctx : wait_for_state_contexts) {
ctx->complete(m_error_result);
}
decode(client_data, bl_it);
} catch (const buffer::error &err) {
lderr(cct) << this << " " << __func__ << ": "
- << "failed to decode client data: " << err << dendl;
+ << "failed to decode client data: " << err.what() << dendl;
return -EINVAL;
}