#include "librbd/Journal.h"
#include "include/rados/librados.hpp"
+#include "common/AsyncOpTracker.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "journal/ReplayEntry.h"
#include "journal/Settings.h"
#include "journal/Utils.h"
-#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
-#include "librbd/io/ImageRequestWQ.h"
-#include "librbd/io/ObjectRequest.h"
+#include "librbd/asio/ContextWQ.h"
+#include "librbd/io/ObjectDispatchSpec.h"
+#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
+#include "librbd/journal/ObjectDispatch.h"
#include "librbd/journal/OpenRequest.h"
#include "librbd/journal/RemoveRequest.h"
+#include "librbd/journal/ResetRequest.h"
#include "librbd/journal/Replay.h"
#include "librbd/journal/PromoteRequest.h"
namespace {
-// TODO: once journaler is 100% async, remove separate threads and
-// reuse ImageCtx's thread pool
+// TODO: once journaler is 100% async and converted to ASIO, remove separate
+// threads and reuse librbd's AsioEngine
class ThreadPoolSingleton : public ThreadPool {
public:
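+  // work queue shared by all journals created against this CephContext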
+ ContextWQ *work_queue;
+
explicit ThreadPoolSingleton(CephContext *cct)
- : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
+ : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
+ work_queue(new ContextWQ("librbd::journal::work_queue",
+ ceph::make_timespan(
+ cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
+ this)) {
start();
}
~ThreadPoolSingleton() override {
+ work_queue->drain();
+ delete work_queue;
+
stop();
}
};
librados::IoCtx &io_ctx;
std::string image_id;
bool *is_tag_owner;
- ContextWQ *op_work_queue;
+ asio::ContextWQ *op_work_queue;
Context *on_finish;
CephContext *cct = nullptr;
Journaler *journaler;
cls::journal::Client client;
journal::ImageClientMeta client_meta;
- uint64_t tag_tid;
+ uint64_t tag_tid = 0;
journal::TagData tag_data;
C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
- bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
+ bool *is_tag_owner, asio::ContextWQ *op_work_queue,
+ Context *on_finish)
: io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
op_work_queue(op_work_queue), on_finish(on_finish),
cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
- {})) {
+ {}, nullptr)) {
}
void finish(int r) override {
Journaler *journaler = this->journaler;
Context *on_finish = this->on_finish;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[journaler, on_finish](int r) {
on_finish->complete(r);
delete journaler;
Journaler journaler;
cls::journal::Client client;
journal::ImageClientMeta client_meta;
- uint64_t tag_tid;
+ uint64_t tag_tid = 0;
journal::TagData tag_data;
C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
std::string *mirror_uuid, Context *on_finish)
: mirror_uuid(mirror_uuid), on_finish(on_finish),
- journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
+ journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
}
virtual void finish(int r) {
journal::TagData *tag_data;
Context *on_finish;
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("lock");
GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
journal::TagData *tag_data, Context *on_finish)
: cct(cct), journaler(journaler), client(client), client_meta(client_meta),
- tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish), lock("lock") {
+ tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
}
/**
void send_get_client() {
ldout(cct, 20) << __func__ << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
handle_get_client(r);
});
}
librbd::journal::ClientData client_data;
- bufferlist::iterator bl_it = client->data.begin();
+ auto bl_it = client->data.cbegin();
try {
- ::decode(client_data, bl_it);
+ decode(client_data, bl_it);
} catch (const buffer::error &err) {
lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
<< "failed to decode client data" << dendl;
void send_get_tags() {
ldout(cct, 20) << __func__ << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
handle_get_tags(r);
});
tag_data.predecessor = predecessor;
bufferlist tag_bl;
- ::encode(tag_data, tag_bl);
+ encode(tag_data, tag_bl);
C_SaferCond allocate_tag_ctx;
journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
return os;
}
+
+template <typename I>
+void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
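+  // defer to the work queue since handle_metadata_updated acquires m_lock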
+ auto ctx = new LambdaContext([this](int r) {
+ journal->handle_metadata_updated();
+ });
+ journal->m_work_queue->queue(ctx, 0);
+}
+
+template <typename I>
+void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
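+  // lazily create the per-CephContext journal thread pool singleton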
+ auto thread_pool_singleton =
+ &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+ "librbd::journal::thread_pool", false, cct);
+ *work_queue = thread_pool_singleton->work_queue;
+}
+
template <typename I>
Journal<I>::Journal(I &image_ctx)
- : m_image_ctx(image_ctx), m_journaler(NULL),
- m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
+ : RefCountedObject(image_ctx.cct),
+ m_image_ctx(image_ctx), m_journaler(NULL),
+ m_state(STATE_UNINITIALIZED),
m_error_result(0), m_replay_handler(this), m_close_pending(false),
- m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
+ m_event_tid(0),
m_blocking_writes(false), m_journal_replay(NULL),
m_metadata_listener(this) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
- ThreadPoolSingleton *thread_pool_singleton;
- cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
- thread_pool_singleton, "librbd::journal::thread_pool");
- m_work_queue = new ContextWQ("librbd::journal::work_queue",
- cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
- thread_pool_singleton);
+ get_work_queue(cct, &m_work_queue);
ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
}
Journal<I>::~Journal() {
if (m_work_queue != nullptr) {
m_work_queue->drain();
- delete m_work_queue;
}
- assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
- assert(m_journaler == NULL);
- assert(m_journal_replay == NULL);
- assert(m_wait_for_state_contexts.empty());
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
+ ceph_assert(m_journaler == NULL);
+ ceph_assert(m_journal_replay == NULL);
+ ceph_assert(m_wait_for_state_contexts.empty());
}
template <typename I>
bool Journal<I>::is_journal_supported(I &image_ctx) {
- assert(image_ctx.snap_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
!image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ThreadPool *thread_pool;
- ContextWQ *op_work_queue;
- ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
journal::TagData tag_data(LOCAL_MIRROR_UUID);
journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
- tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
+ tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
req->send();
return cond.wait();
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- ThreadPool *thread_pool;
- ContextWQ *op_work_queue;
- ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
- io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
+ io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
req->send();
return cond.wait();
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
- Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, {});
+ ContextWQ *work_queue;
+ get_work_queue(cct, &work_queue);
C_SaferCond cond;
- journaler.init(&cond);
- BOOST_SCOPE_EXIT_ALL(&journaler) {
- journaler.shut_down();
- };
-
- int r = cond.wait();
- if (r == -ENOENT) {
- return 0;
- } else if (r < 0) {
- lderr(cct) << __func__ << ": "
- << "failed to initialize journal: " << cpp_strerror(r) << dendl;
- return r;
- }
-
- uint8_t order, splay_width;
- int64_t pool_id;
- journaler.get_metadata(&order, &splay_width, &pool_id);
-
- std::string pool_name;
- if (pool_id != -1) {
- librados::Rados rados(io_ctx);
- r = rados.pool_reverse_lookup(pool_id, &pool_name);
- if (r < 0) {
- lderr(cct) << __func__ << ": "
- << "failed to lookup data pool: " << cpp_strerror(r) << dendl;
- return r;
- }
- }
-
- C_SaferCond ctx1;
- journaler.remove(true, &ctx1);
- r = ctx1.wait();
- if (r < 0) {
- lderr(cct) << __func__ << ": "
- << "failed to reset journal: " << cpp_strerror(r) << dendl;
- return r;
- }
+ auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
+ Journal<>::LOCAL_MIRROR_UUID,
+ work_queue, &cond);
+ req->send();
- r = create(io_ctx, image_id, order, splay_width, pool_name);
- if (r < 0) {
- lderr(cct) << __func__ << ": "
- << "failed to create journal: " << cpp_strerror(r) << dendl;
- return r;
- }
- return 0;
+ return cond.wait();
}
template <typename I>
template <typename I>
void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
- bool *is_tag_owner, ContextWQ *op_work_queue,
+ bool *is_tag_owner,
+ asio::ContextWQ *op_work_queue,
Context *on_finish) {
CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
ldout(cct, 20) << __func__ << dendl;
template <typename I>
void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
std::string *mirror_uuid,
- ContextWQ *op_work_queue, Context *on_finish) {
- CephContext *cct = (CephContext *)io_ctx.cct();
+ asio::ContextWQ *op_work_queue,
+ Context *on_finish) {
+ CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
ldout(cct, 20) << __func__ << dendl;
auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
CephContext *cct = image_ctx->cct;
ldout(cct, 20) << __func__ << dendl;
- Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});
+ Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
+ nullptr);
- Mutex lock("lock");
+ ceph::mutex lock = ceph::make_mutex("lock");
journal::ImageClientMeta client_meta;
uint64_t tag_tid;
journal::TagData tag_data;
journal::ClientData client_data(client_meta);
bufferlist client_data_bl;
- ::encode(client_data, client_data_bl);
+ encode(client_data, client_data_bl);
C_SaferCond update_client_ctx;
journaler.update_client(client_data_bl, &update_client_ctx);
template <typename I>
bool Journal<I>::is_journal_ready() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return (m_state == STATE_READY);
}
template <typename I>
bool Journal<I>::is_journal_replaying() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return is_journal_replaying(m_lock);
}
template <typename I>
-bool Journal<I>::is_journal_replaying(const Mutex &) const {
- assert(m_lock.is_locked());
+bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
return (m_state == STATE_REPLAYING ||
m_state == STATE_FLUSHING_REPLAY ||
m_state == STATE_FLUSHING_RESTART ||
template <typename I>
bool Journal<I>::is_journal_appending() const {
- assert(m_image_ctx.snap_lock.is_locked());
- Mutex::Locker locker(m_lock);
+ ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
+ std::lock_guard locker{m_lock};
return (m_state == STATE_READY &&
!m_image_ctx.get_journal_policy()->append_disabled());
}
void Journal<I>::wait_for_journal_ready(Context *on_ready) {
on_ready = create_async_context_callback(m_image_ctx, on_ready);
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state == STATE_READY) {
on_ready->complete(m_error_result);
} else {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
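+  // hold a reference to the journal until open completes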
+ on_finish = create_context_callback<Context>(on_finish, this);
+
on_finish = create_async_context_callback(m_image_ctx, on_finish);
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_UNINITIALIZED);
+ // inject our handler into the object dispatcher chain
+ m_image_ctx.io_object_dispatcher->register_dispatch(
+ journal::ObjectDispatch<I>::create(&m_image_ctx, this));
+
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_UNINITIALIZED);
wait_for_steady_state(on_finish);
create_journaler();
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
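+  // hold a reference to the journal until close completes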
+ on_finish = create_context_callback<Context>(on_finish, this);
+
+ on_finish = new LambdaContext([this, on_finish](int r) {
+    // remove our handler from the object dispatcher chain, preserving
+    // the error code
+ auto ctx = new LambdaContext([on_finish, r](int _) {
+ on_finish->complete(r);
+ });
+ m_image_ctx.io_object_dispatcher->shut_down_dispatch(
+ io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
+ });
on_finish = create_async_context_callback(m_image_ctx, on_finish);
- Mutex::Locker locker(m_lock);
- while (m_listener_notify) {
- m_listener_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
Listeners listeners(m_listeners);
m_listener_notify = true;
- m_lock.Unlock();
+ locker.unlock();
for (auto listener : listeners) {
listener->handle_close();
}
- m_lock.Lock();
+ locker.lock();
m_listener_notify = false;
- m_listener_cond.Signal();
+ m_listener_cond.notify_all();
- assert(m_state != STATE_UNINITIALIZED);
+ ceph_assert(m_state != STATE_UNINITIALIZED);
if (m_state == STATE_CLOSED) {
on_finish->complete(m_error_result);
return;
template <typename I>
bool Journal<I>::is_tag_owner() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return is_tag_owner(m_lock);
}
template <typename I>
-bool Journal<I>::is_tag_owner(const Mutex &) const {
- assert(m_lock.is_locked());
+bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
+ ceph_assert(ceph_mutex_is_locked(m_lock));
return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}
template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_tag_tid;
}
template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
return m_tag_data;
}
journal::TagPredecessor predecessor;
predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
{
- Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr && is_tag_owner(m_lock));
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));
cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
// since we are primary, populate the predecessor with our known commit
// position
- assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
+ ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
if (!client.commit_position.object_positions.empty()) {
auto position = client.commit_position.object_positions.front();
predecessor.commit_valid = true;
ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
<< dendl;
- Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_journaler != nullptr);
journal::TagData tag_data;
tag_data.mirror_uuid = mirror_uuid;
tag_data.predecessor = predecessor;
bufferlist tag_bl;
- ::encode(tag_data, tag_bl);
+ encode(tag_data, tag_bl);
C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
&m_tag_data, on_finish);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_journaler != nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_journaler != nullptr);
m_journaler->flush_commit_position(on_finish);
}
+template <typename I>
+void Journal<I>::user_flushed() {
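+  // switch from writethrough to batched appends after the first
+  // user-requested flush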
+ if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
+ m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
+ std::lock_guard locker{m_lock};
+ if (m_state == STATE_READY) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 5) << this << " " << __func__ << dendl;
+
+ ceph_assert(m_journaler != nullptr);
+ m_journaler->set_append_batch_options(
+ m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
+ m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
+ m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+ } else {
+ m_user_flushed = false;
+ }
+ }
+}
+
template <typename I>
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
const bufferlist &bl,
- const IOObjectRequests &requests,
bool flush_entry) {
- assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
+ ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
uint64_t max_write_data_size =
m_max_append_size - journal::AioWriteEvent::get_fixed_size();
uint64_t bytes_remaining = length;
uint64_t event_offset = 0;
do {
- uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
+ uint64_t event_length = std::min(bytes_remaining, max_write_data_size);
bufferlist event_bl;
event_bl.substr_of(bl, event_offset, event_length);
ceph_clock_now());
bufferlists.emplace_back();
- ::encode(event_entry, bufferlists.back());
+ encode(event_entry, bufferlists.back());
event_offset += event_length;
bytes_remaining -= event_length;
} while (bytes_remaining > 0);
- return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
- offset, length, flush_entry);
+ return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
+ length, flush_entry, 0);
}
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
- const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
bufferlist bl;
event_entry.timestamp = ceph_clock_now();
- ::encode(event_entry, bl);
- return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
- length, flush_entry);
+ encode(event_entry, bl);
+ return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
+ flush_entry, filter_ret_val);
}
template <typename I>
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
- const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
- assert(!bufferlists.empty());
+ bool flush_entry, int filter_ret_val) {
+ ceph_assert(!bufferlists.empty());
uint64_t tid;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_READY);
tid = ++m_event_tid;
- assert(tid != 0);
+ ceph_assert(tid != 0);
}
Futures futures;
for (auto &bl : bufferlists) {
- assert(bl.length() <= m_max_append_size);
+ ceph_assert(bl.length() <= m_max_append_size);
futures.push_back(m_journaler->append(m_tag_tid, bl));
}
{
- Mutex::Locker event_locker(m_event_lock);
- m_events[tid] = Event(futures, requests, offset, length);
+ std::lock_guard event_locker{m_event_lock};
+ m_events[tid] = Event(futures, offset, length, filter_ret_val);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "event=" << event_type << ", "
- << "new_reqs=" << requests.size() << ", "
<< "offset=" << offset << ", "
<< "length=" << length << ", "
<< "flush=" << flush_entry << ", tid=" << tid << dendl;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
"r=" << r << dendl;
- Mutex::Locker event_locker(m_event_lock);
+ std::lock_guard event_locker{m_event_lock};
typename Events::iterator it = m_events.find(tid);
if (it == m_events.end()) {
return;
template <typename I>
void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
uint64_t length, int r) {
- assert(length > 0);
+ ceph_assert(length > 0);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
<< "length=" << length << ", "
<< "r=" << r << dendl;
- Mutex::Locker event_locker(m_event_lock);
+ std::lock_guard event_locker{m_event_lock};
typename Events::iterator it = m_events.find(tid);
if (it == m_events.end()) {
return;
void Journal<I>::append_op_event(uint64_t op_tid,
journal::EventEntry &&event_entry,
Context *on_safe) {
- assert(m_image_ctx.owner_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
bufferlist bl;
event_entry.timestamp = ceph_clock_now();
- ::encode(event_entry, bl);
+ encode(event_entry, bl);
Future future;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_READY);
future = m_journaler->append(m_tag_tid, bl);
// delay committing op event to ensure consistent replay
- assert(m_op_futures.count(op_tid) == 0);
+ ceph_assert(m_op_futures.count(op_tid) == 0);
m_op_futures[op_tid] = future;
}
on_safe = create_async_context_callback(m_image_ctx, on_safe);
- on_safe = new FunctionContext([this, on_safe](int r) {
+ on_safe = new LambdaContext([this, on_safe](int r) {
    // flush the commit position so IO committed before this op is persisted
m_journaler->flush_commit_position(on_safe);
});
ceph_clock_now());
bufferlist bl;
- ::encode(event_entry, bl);
+ encode(event_entry, bl);
Future op_start_future;
Future op_finish_future;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_READY);
// ready to commit op event
auto it = m_op_futures.find(op_tid);
- assert(it != m_op_futures.end());
+ ceph_assert(it != m_op_futures.end());
op_start_future = it->second;
m_op_futures.erase(it);
ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
{
- Mutex::Locker locker(m_lock);
- assert(m_journal_replay != nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_journal_replay != nullptr);
m_journal_replay->replay_op_ready(op_tid, on_resume);
}
}
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
<< "on_safe=" << on_safe << dendl;
+ on_safe = create_context_callback<Context>(on_safe, this);
+
Future future;
{
- Mutex::Locker event_locker(m_event_lock);
+ std::lock_guard event_locker{m_event_lock};
future = wait_event(m_lock, tid, on_safe);
}
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
<< "on_safe=" << on_safe << dendl;
- Mutex::Locker event_locker(m_event_lock);
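+  // hold a journal reference until the event is reported safe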
+ on_safe = create_context_callback<Context>(on_safe, this);
+
+ std::lock_guard event_locker{m_event_lock};
wait_event(m_lock, tid, on_safe);
}
template <typename I>
-typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
+typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
Context *on_safe) {
- assert(m_event_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_event_lock));
CephContext *cct = m_image_ctx.cct;
typename Events::iterator it = m_events.find(tid);
- assert(it != m_events.end());
+ ceph_assert(it != m_events.end());
Event &event = it->second;
if (event.safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
- assert(m_journal_replay == nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_READY);
+ ceph_assert(m_journal_replay == nullptr);
on_start = util::create_async_context_callback(m_image_ctx, on_start);
- on_start = new FunctionContext(
+ on_start = new LambdaContext(
[this, journal_replay, on_start](int r) {
handle_start_external_replay(r, journal_replay, on_start);
});
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_READY);
- assert(m_journal_replay == nullptr);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_READY);
+ ceph_assert(m_journal_replay == nullptr);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_journal_replay != nullptr);
- assert(m_state == STATE_REPLAYING);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_journal_replay != nullptr);
+ ceph_assert(m_state == STATE_REPLAYING);
delete m_journal_replay;
m_journal_replay = nullptr;
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(m_lock.is_locked());
- assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
- assert(m_journaler == NULL);
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+  ceph_assert(m_state == STATE_UNINITIALIZED ||
+              m_state == STATE_RESTARTING_REPLAY);
+ ceph_assert(m_journaler == NULL);
transition_state(STATE_INITIALIZING, 0);
::journal::Settings settings;
- settings.commit_interval = m_image_ctx.journal_commit_age;
- settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
+ settings.commit_interval =
+ m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
+ settings.max_payload_bytes =
+ m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
settings.max_concurrent_object_sets =
- m_image_ctx.journal_max_concurrent_object_sets;
+ m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
// TODO: a configurable filter to exclude certain peers from being
// disconnected.
- settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
+ settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
m_image_ctx.md_ctx, m_image_ctx.id,
- IMAGE_CLIENT_ID, settings);
+ IMAGE_CLIENT_ID, settings, nullptr);
m_journaler->add_listener(&m_metadata_listener);
Context *ctx = create_async_context_callback(
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
delete m_journal_replay;
m_journal_replay = NULL;
Context *ctx = create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, ctx](int r) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_journaler->shut_down(ctx);
});
- m_async_journal_op_tracker.wait(m_image_ctx, ctx);
+ ctx = create_async_context_callback(m_image_ctx, ctx);
+ m_async_journal_op_tracker.wait_for_ops(ctx);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- assert(m_lock.is_locked());
- assert(m_state == STATE_FLUSHING_RESTART ||
- m_state == STATE_FLUSHING_REPLAY);
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
delete m_journal_replay;
m_journal_replay = NULL;
template <typename I>
void Journal<I>::complete_event(typename Events::iterator it, int r) {
- assert(m_event_lock.is_locked());
- assert(m_state == STATE_READY);
+ ceph_assert(ceph_mutex_is_locked(m_event_lock));
+ ceph_assert(m_state == STATE_READY);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
<< "r=" << r << dendl;
Event &event = it->second;
+ if (r < 0 && r == event.filter_ret_val) {
+ // ignore allowed error codes
+ r = 0;
+ }
if (r < 0) {
    // the event was recorded in the journal but the disk update failed;
    // we cannot commit this IO event, so it must be replayed
- assert(event.safe);
+ ceph_assert(event.safe);
lderr(cct) << this << " " << __func__ << ": "
<< "failed to commit IO to disk, replay required: "
<< cpp_strerror(r) << dendl;
template <typename I>
void Journal<I>::start_append() {
- assert(m_lock.is_locked());
- m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
- m_image_ctx.journal_object_flush_bytes,
- m_image_ctx.journal_object_flush_age);
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+
+ m_journaler->start_append(
+ m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
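+  // enable append batching up front unless configured to remain in
+  // writethrough mode until the first user flush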
+ if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
+ m_journaler->set_append_batch_options(
+ m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
+ m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
+ m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
+ }
+
transition_state(STATE_READY, 0);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_INITIALIZING);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_INITIALIZING);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
CephContext *cct = m_image_ctx.cct;
ReplayEntry replay_entry;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state != STATE_REPLAYING) {
return;
}
}
// only one entry should be in-flight at a time
- assert(!m_processing_entry);
+ ceph_assert(!m_processing_entry);
m_processing_entry = true;
}
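+  // track this in-flight entry so journal shut down waits for it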
+ m_async_journal_op_tracker.start_op();
+
bufferlist data = replay_entry.get_data();
- bufferlist::iterator it = data.begin();
+ auto it = data.cbegin();
journal::EventEntry event_entry;
int r = m_journal_replay->decode(&it, &event_entry);
bool cancel_ops = false;
{
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state != STATE_REPLAYING) {
return;
}
}
}
- Context *ctx = new FunctionContext([this, cct](int r) {
+ Context *ctx = new LambdaContext([this, cct](int r) {
ldout(cct, 20) << this << " handle_replay_complete: "
<< "handle shut down replay" << dendl;
State state;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_FLUSHING_RESTART ||
- m_state == STATE_FLUSHING_REPLAY);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
state = m_state;
}
handle_flushing_replay();
}
});
- ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
+ // ensure the commit position is flushed to disk
+ m_journaler->flush_commit_position(ctx);
+ });
+ ctx = create_async_context_callback(m_image_ctx, ctx);
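+  // wait for in-flight replay entries before flushing the commit position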
+ ctx = new LambdaContext([this, ctx](int r) {
+ m_async_journal_op_tracker.wait_for_ops(ctx);
+ });
+ ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_complete: "
<< "shut down replay" << dendl;
m_journal_replay->shut_down(cancel_ops, ctx);
});
+
m_journaler->stop_replay(ctx);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(r == 0);
+ ceph_assert(r == 0);
{
- Mutex::Locker locker(m_lock);
- assert(m_processing_entry);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_processing_entry);
m_processing_entry = false;
}
handle_replay_ready();
void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
CephContext *cct = m_image_ctx.cct;
- m_lock.Lock();
- assert(m_state == STATE_REPLAYING ||
- m_state == STATE_FLUSHING_RESTART ||
- m_state == STATE_FLUSHING_REPLAY);
+ std::unique_lock locker{m_lock};
+ ceph_assert(m_state == STATE_REPLAYING ||
+ m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
if (r < 0) {
if (m_state == STATE_REPLAYING) {
// abort the replay if we have an error
transition_state(STATE_FLUSHING_RESTART, r);
- m_lock.Unlock();
+ locker.unlock();
// stop replay, shut down, and restart
- Context *ctx = new FunctionContext([this, cct](int r) {
+ Context* ctx = create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this);
+ ctx = new LambdaContext([this, ctx](int r) {
+ // ensure the commit position is flushed to disk
+ m_journaler->flush_commit_position(ctx);
+ });
+ ctx = new LambdaContext([this, cct, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_process_safe: "
<< "shut down replay" << dendl;
{
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_FLUSHING_RESTART);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_FLUSHING_RESTART);
}
- m_journal_replay->shut_down(true, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ m_journal_replay->shut_down(true, ctx);
});
m_journaler->stop_replay(ctx);
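+      // balance the start_op() taken when this entry began processing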
+ m_async_journal_op_tracker.finish_op();
return;
} else if (m_state == STATE_FLUSHING_REPLAY) {
// end-of-replay flush in-progress -- we need to restart replay
transition_state(STATE_FLUSHING_RESTART, r);
- m_lock.Unlock();
+ locker.unlock();
+ m_async_journal_op_tracker.finish_op();
return;
}
} else {
// only commit the entry if written successfully
m_journaler->committed(replay_entry);
}
- m_lock.Unlock();
+ locker.unlock();
+ m_async_journal_op_tracker.finish_op();
}
template <typename I>
void Journal<I>::handle_flushing_restart(int r) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(r == 0);
- assert(m_state == STATE_FLUSHING_RESTART);
+ ceph_assert(r == 0);
+ ceph_assert(m_state == STATE_FLUSHING_RESTART);
if (m_close_pending) {
destroy_journaler(r);
return;
template <typename I>
void Journal<I>::handle_flushing_replay() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
+ ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
+ m_state == STATE_FLUSHING_RESTART);
if (m_close_pending) {
destroy_journaler(0);
return;
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
- Mutex::Locker locker(m_lock);
- assert(m_state == STATE_STOPPING);
+ std::lock_guard locker{m_lock};
+ ceph_assert(m_state == STATE_STOPPING);
destroy_journaler(r);
}
<< dendl;
}
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
delete m_journaler;
m_journaler = nullptr;
- assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
+ ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
if (m_state == STATE_RESTARTING_REPLAY) {
create_journaler();
return;
<< "tid=" << tid << dendl;
// journal will be flushed before closing
- assert(m_state == STATE_READY || m_state == STATE_STOPPING);
+ ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
<< "failed to commit IO event: " << cpp_strerror(r) << dendl;
}
- IOObjectRequests aio_object_requests;
Contexts on_safe_contexts;
{
- Mutex::Locker event_locker(m_event_lock);
+ std::lock_guard event_locker{m_event_lock};
typename Events::iterator it = m_events.find(tid);
- assert(it != m_events.end());
+ ceph_assert(it != m_events.end());
Event &event = it->second;
- aio_object_requests.swap(event.aio_object_requests);
on_safe_contexts.swap(event.on_safe_contexts);
if (r < 0 || event.committed_io) {
ldout(cct, 20) << this << " " << __func__ << ": "
<< "completing tid=" << tid << dendl;
- for (IOObjectRequests::iterator it = aio_object_requests.begin();
- it != aio_object_requests.end(); ++it) {
- if (r < 0) {
- // don't send aio requests if the journal fails -- bubble error up
- (*it)->complete(r);
- } else {
- // send any waiting aio requests now that journal entry is safe
- (*it)->send();
- }
- }
// alert the cache about the journal event status
for (Contexts::iterator it = on_safe_contexts.begin();
<< "tid=" << tid << dendl;
// journal will be flushed before closing
- assert(m_state == STATE_READY || m_state == STATE_STOPPING);
+ ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": "
<< "failed to commit op event: " << cpp_strerror(r) << dendl;
template <typename I>
void Journal<I>::stop_recording() {
- assert(m_lock.is_locked());
- assert(m_journaler != NULL);
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(m_journaler != NULL);
- assert(m_state == STATE_READY);
+ ceph_assert(m_state == STATE_READY);
transition_state(STATE_STOPPING, 0);
m_journaler->stop_append(util::create_async_context_callback(
void Journal<I>::transition_state(State state, int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
- assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
m_state = state;
if (m_error_result == 0 && r < 0) {
}
if (is_steady_state()) {
- Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+ auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
+ m_wait_for_state_contexts.clear();
+
for (auto ctx : wait_for_state_contexts) {
ctx->complete(m_error_result);
}
template <typename I>
bool Journal<I>::is_steady_state() const {
- assert(m_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
switch (m_state) {
case STATE_READY:
case STATE_CLOSED:
template <typename I>
void Journal<I>::wait_for_steady_state(Context *on_state) {
- assert(m_lock.is_locked());
- assert(!is_steady_state());
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(!is_steady_state());
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
template <typename I>
int Journal<I>::is_resync_requested(bool *do_resync) {
- Mutex::Locker l(m_lock);
+ std::lock_guard l{m_lock};
return check_resync_requested(do_resync);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- assert(m_lock.is_locked());
- assert(do_resync != nullptr);
+ ceph_assert(ceph_mutex_is_locked(m_lock));
+ ceph_assert(do_resync != nullptr);
cls::journal::Client client;
int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
}
librbd::journal::ClientData client_data;
- bufferlist::iterator bl_it = client.data.begin();
+ auto bl_it = client.data.cbegin();
try {
- ::decode(client_data, bl_it);
+ decode(client_data, bl_it);
} catch (const buffer::error &err) {
lderr(cct) << this << " " << __func__ << ": "
- << "failed to decode client data: " << err << dendl;
+ << "failed to decode client data: " << err.what() << dendl;
return -EINVAL;
}
}
struct C_RefreshTags : public Context {
- util::AsyncOpTracker &async_op_tracker;
+ AsyncOpTracker &async_op_tracker;
Context *on_finish = nullptr;
- Mutex lock;
- uint64_t tag_tid;
+ ceph::mutex lock =
+ ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
+ uint64_t tag_tid = 0;
journal::TagData tag_data;
- C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
- : async_op_tracker(async_op_tracker),
- lock("librbd::Journal::C_RefreshTags::lock") {
+ explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
+ : async_op_tracker(async_op_tracker) {
async_op_tracker.start_op();
}
~C_RefreshTags() override {
template <typename I>
void Journal<I>::handle_metadata_updated() {
CephContext *cct = m_image_ctx.cct;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
return;
// pull the most recent tags from the journal, decode, and
// update the internal tag state
C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
- refresh_ctx->on_finish = new FunctionContext(
+ refresh_ctx->on_finish = new LambdaContext(
[this, refresh_sequence, refresh_ctx](int r) {
handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
refresh_ctx->tag_data, r);
uint64_t tag_tid,
journal::TagData tag_data, int r) {
CephContext *cct = m_image_ctx.cct;
- Mutex::Locker locker(m_lock);
+ std::unique_lock locker{m_lock};
if (r < 0) {
lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
<< "refresh_sequence=" << refresh_sequence << ", "
<< "tag_tid=" << tag_tid << ", "
<< "tag_data=" << tag_data << dendl;
- while (m_listener_notify) {
- m_listener_cond.Wait(m_lock);
- }
+ m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
bool was_tag_owner = is_tag_owner(m_lock);
if (m_tag_tid < tag_tid) {
Listeners listeners(m_listeners);
m_listener_notify = true;
- m_lock.Unlock();
+ locker.unlock();
if (promoted_to_primary) {
for (auto listener : listeners) {
}
}
- m_lock.Lock();
+ locker.lock();
m_listener_notify = false;
- m_listener_cond.Signal();
+ m_listener_cond.notify_all();
}
template <typename I>
void Journal<I>::add_listener(journal::Listener *listener) {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
m_listeners.insert(listener);
}
template <typename I>
void Journal<I>::remove_listener(journal::Listener *listener) {
- Mutex::Locker locker(m_lock);
- while (m_listener_notify) {
- m_listener_cond.Wait(m_lock);
- }
+ std::unique_lock locker{m_lock};
+ m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
m_listeners.erase(listener);
}