// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/Journal.h"
#include "include/rados/librados.hpp"
#include "common/AsyncOpTracker.h"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/Policy.h"
#include "journal/ReplayEntry.h"
#include "journal/Settings.h"
#include "journal/Utils.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcherInterface.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
#include "librbd/journal/ObjectDispatch.h"
#include "librbd/journal/OpenRequest.h"
#include "librbd/journal/RemoveRequest.h"
#include "librbd/journal/ResetRequest.h"
#include "librbd/journal/Replay.h"
#include "librbd/journal/PromoteRequest.h"

#include <boost/scope_exit.hpp>
#include <utility>

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::Journal: "

namespace librbd {

using util::create_async_context_callback;
using util::create_context_callback;
using journal::util::C_DecodeTag;
using journal::util::C_DecodeTags;

namespace {

// TODO: once journaler is 100% async and converted to ASIO, remove separate
// threads and reuse librbd's AsioEngine
class ThreadPoolSingleton : public ThreadPool {
public:
  ContextWQ *work_queue;

  explicit ThreadPoolSingleton(CephContext *cct)
    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
      work_queue(new ContextWQ("librbd::journal::work_queue",
                               ceph::make_timespan(
                                 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
                               this)) {
    start();
  }
  ~ThreadPoolSingleton() override {
    work_queue->drain();
    delete work_queue;

    stop();
  }
};
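
// note: all Journal<> instances created against the same CephContext share
// this single-threaded pool/work queue; it is created lazily via
// lookup_or_create_singleton_object() in Journal<I>::get_work_queue() below.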

template <typename I>
struct C_IsTagOwner : public Context {
  librados::IoCtx &io_ctx;
  std::string image_id;
  bool *is_tag_owner;
  asio::ContextWQ *op_work_queue;
  Context *on_finish;

  CephContext *cct = nullptr;
  Journaler *journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
               bool *is_tag_owner, asio::ContextWQ *op_work_queue,
               Context *on_finish)
    : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
      op_work_queue(op_work_queue), on_finish(on_finish),
      cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
      journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
                              {}, nullptr)) {
  }

  void finish(int r) override {
    ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
                   << dendl;
    if (r < 0) {
      lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
                 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
    } else {
      *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
    }

    Journaler *journaler = this->journaler;
    Context *on_finish = this->on_finish;
    auto ctx = new LambdaContext(
      [journaler, on_finish](int r) {
        on_finish->complete(r);
        delete journaler;
      });
    op_work_queue->queue(ctx, r);
  }
};

struct C_GetTagOwner : public Context {
  std::string *mirror_uuid;
  Context *on_finish;

  Journaler journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
                std::string *mirror_uuid, Context *on_finish)
    : mirror_uuid(mirror_uuid), on_finish(on_finish),
      journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
  }

  virtual void finish(int r) {
    if (r >= 0) {
      *mirror_uuid = tag_data.mirror_uuid;
    }
    on_finish->complete(r);
  }
};

template <typename J>
struct GetTagsRequest {
  CephContext *cct;
  J *journaler;
  cls::journal::Client *client;
  journal::ImageClientMeta *client_meta;
  uint64_t *tag_tid;
  journal::TagData *tag_data;
  Context *on_finish;

  ceph::mutex lock = ceph::make_mutex("lock");

  GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
                 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
                 journal::TagData *tag_data, Context *on_finish)
    : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
      tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
  }

  /**
   * @verbatim
   *
   * <start>
   *    |
   *    v
   * GET_CLIENT * * * * * * * * * * * *
   *    |                             *
   *    v                             *
   * GET_TAGS * * * * * * * * * * * * * (error)
   *    |                             *
   *    v                             *
   * <finish> * * * * * * * * * * * * *
   *
   * @endverbatim
   */

  void send() {
    send_get_client();
  }

  void send_get_client() {
    ldout(cct, 20) << __func__ << dendl;

    auto ctx = new LambdaContext(
      [this](int r) {
        handle_get_client(r);
      });
    journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
  }

  void handle_get_client(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    if (r < 0) {
      complete(r);
      return;
    }

    librbd::journal::ClientData client_data;
    auto bl_it = client->data.cbegin();
    try {
      decode(client_data, bl_it);
    } catch (const buffer::error &err) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to decode client data" << dendl;
      complete(-EBADMSG);
      return;
    }

    journal::ImageClientMeta *image_client_meta =
      boost::get<journal::ImageClientMeta>(&client_data.client_meta);
    if (image_client_meta == nullptr) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to get client meta" << dendl;
      complete(-EINVAL);
      return;
    }
    *client_meta = *image_client_meta;

    send_get_tags();
  }

  void send_get_tags() {
    ldout(cct, 20) << __func__ << dendl;

    auto ctx = new LambdaContext(
      [this](int r) {
        handle_get_tags(r);
      });
    C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
                                              ctx);
    journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
  }

  void handle_get_tags(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    complete(r);
  }

  void complete(int r) {
    on_finish->complete(r);
    delete this;
  }
};

template <typename J>
void get_tags(CephContext *cct, J *journaler,
              cls::journal::Client *client,
              journal::ImageClientMeta *client_meta,
              uint64_t *tag_tid, journal::TagData *tag_data,
              Context *on_finish) {
  ldout(cct, 20) << __func__ << dendl;

  GetTagsRequest<J> *req =
    new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
                          tag_data, on_finish);
  req->send();
}

template <typename J>
int allocate_journaler_tag(CephContext *cct, J *journaler,
                           uint64_t tag_class,
                           const journal::TagPredecessor &predecessor,
                           const std::string &mirror_uuid,
                           cls::journal::Tag *new_tag) {
  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_SaferCond allocate_tag_ctx;
  journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);

  int r = allocate_tag_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to allocate tag: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}
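
// note: allocate_journaler_tag() blocks the calling thread on a C_SaferCond
// until the tag allocation round-trips to the journal metadata object, so it
// should only be invoked from a context that is allowed to wait.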

} // anonymous namespace

// client id for local image
template <typename I>
const std::string Journal<I>::IMAGE_CLIENT_ID("");

// mirror uuid to use for local images
template <typename I>
const std::string Journal<I>::LOCAL_MIRROR_UUID("");

// mirror uuid to use for orphaned (demoted) images
template <typename I>
const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");

template <typename I>
std::ostream &operator<<(std::ostream &os,
                         const typename Journal<I>::State &state) {
  switch (state) {
  case Journal<I>::STATE_UNINITIALIZED:
    os << "Uninitialized";
    break;
  case Journal<I>::STATE_INITIALIZING:
    os << "Initializing";
    break;
  case Journal<I>::STATE_REPLAYING:
    os << "Replaying";
    break;
  case Journal<I>::STATE_FLUSHING_RESTART:
    os << "FlushingRestart";
    break;
  case Journal<I>::STATE_RESTARTING_REPLAY:
    os << "RestartingReplay";
    break;
  case Journal<I>::STATE_FLUSHING_REPLAY:
    os << "FlushingReplay";
    break;
  case Journal<I>::STATE_READY:
    os << "Ready";
    break;
  case Journal<I>::STATE_STOPPING:
    os << "Stopping";
    break;
  case Journal<I>::STATE_CLOSING:
    os << "Closing";
    break;
  case Journal<I>::STATE_CLOSED:
    os << "Closed";
    break;
  default:
    os << "Unknown (" << static_cast<uint32_t>(state) << ")";
    break;
  }
  return os;
}


template <typename I>
void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
  auto ctx = new LambdaContext([this](int r) {
    journal->handle_metadata_updated();
  });
  journal->m_work_queue->queue(ctx, 0);
}


template <typename I>
void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
  auto thread_pool_singleton =
    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
      "librbd::journal::thread_pool", false, cct);
  *work_queue = thread_pool_singleton->work_queue;
}

template <typename I>
Journal<I>::Journal(I &image_ctx)
  : RefCountedObject(image_ctx.cct),
    m_image_ctx(image_ctx), m_journaler(NULL),
    m_state(STATE_UNINITIALIZED),
    m_error_result(0), m_replay_handler(this), m_close_pending(false),
    m_event_tid(0),
    m_blocking_writes(false), m_journal_replay(NULL),
    m_metadata_listener(this) {

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;

  get_work_queue(cct, &m_work_queue);
  ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
}

template <typename I>
Journal<I>::~Journal() {
  if (m_work_queue != nullptr) {
    m_work_queue->drain();
  }

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
  ceph_assert(m_journaler == NULL);
  ceph_assert(m_journal_replay == NULL);
  ceph_assert(m_wait_for_state_contexts.empty());
}

template <typename I>
bool Journal<I>::is_journal_supported(I &image_ctx) {
  ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
  return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
          !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}

template <typename I>
int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
                       uint8_t order, uint8_t splay_width,
                       const std::string &object_pool) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  journal::TagData tag_data(LOCAL_MIRROR_UUID);
  journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
    io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
    tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
    io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ContextWQ *work_queue;
  get_work_queue(cct, &work_queue);

  C_SaferCond cond;
  auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
                                              Journal<>::LOCAL_MIRROR_UUID,
                                              work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
                              Context *on_finish) {
  Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
                           image_ctx->op_work_queue, on_finish);
}

template <typename I>
void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
                              bool *is_tag_owner,
                              asio::ContextWQ *op_work_queue,
                              Context *on_finish) {
  CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
    io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
  get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
           &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
           &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
}

template <typename I>
void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
                               std::string *mirror_uuid,
                               asio::ContextWQ *op_work_queue,
                               Context *on_finish) {
  CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
  get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
           &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
}

template <typename I>
int Journal<I>::request_resync(I *image_ctx) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
                      nullptr);

  ceph::mutex lock = ceph::make_mutex("lock");
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid;
  journal::TagData tag_data;

  C_SaferCond open_ctx;
  auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
                                                  &client_meta, &tag_tid,
                                                  &tag_data, &open_ctx);
  open_req->send();

  BOOST_SCOPE_EXIT_ALL(&journaler) {
    journaler.shut_down();
  };

  int r = open_ctx.wait();
  if (r < 0) {
    return r;
  }

  client_meta.resync_requested = true;

  journal::ClientData client_data(client_meta);
  bufferlist client_data_bl;
  encode(client_data, client_data_bl);

  C_SaferCond update_client_ctx;
  journaler.update_client(client_data_bl, &update_client_ctx);

  r = update_client_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to update client: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}

template <typename I>
void Journal<I>::promote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
                                                        on_finish);
  promote_req->send();
}

template <typename I>
void Journal<I>::demote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
  req->send();
}

template <typename I>
bool Journal<I>::is_journal_ready() const {
  std::lock_guard locker{m_lock};
  return (m_state == STATE_READY);
}

template <typename I>
bool Journal<I>::is_journal_replaying() const {
  std::lock_guard locker{m_lock};
  return is_journal_replaying(m_lock);
}

template <typename I>
bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return (m_state == STATE_REPLAYING ||
          m_state == STATE_FLUSHING_REPLAY ||
          m_state == STATE_FLUSHING_RESTART ||
          m_state == STATE_RESTARTING_REPLAY);
}

template <typename I>
bool Journal<I>::is_journal_appending() const {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
  std::lock_guard locker{m_lock};
  return (m_state == STATE_READY &&
          !m_image_ctx.get_journal_policy()->append_disabled());
}

template <typename I>
void Journal<I>::wait_for_journal_ready(Context *on_ready) {
  on_ready = create_async_context_callback(m_image_ctx, on_ready);

  std::lock_guard locker{m_lock};
  if (m_state == STATE_READY) {
    on_ready->complete(m_error_result);
  } else {
    wait_for_steady_state(on_ready);
  }
}

template <typename I>
void Journal<I>::open(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = create_context_callback<Context>(on_finish, this);

  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  // inject our handler into the object dispatcher chain
  m_image_ctx.io_object_dispatcher->register_dispatch(
    journal::ObjectDispatch<I>::create(&m_image_ctx, this));

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_UNINITIALIZED);
  wait_for_steady_state(on_finish);
  create_journaler();
}

template <typename I>
void Journal<I>::close(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = create_context_callback<Context>(on_finish, this);

  on_finish = new LambdaContext([this, on_finish](int r) {
    // remove our handler from object dispatcher chain - preserve error
    auto ctx = new LambdaContext([on_finish, r](int _) {
      on_finish->complete(r);
    });
    m_image_ctx.io_object_dispatcher->shut_down_dispatch(
      io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
  });
  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  std::unique_lock locker{m_lock};
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  locker.unlock();
  for (auto listener : listeners) {
    listener->handle_close();
  }

  locker.lock();
  m_listener_notify = false;
  m_listener_cond.notify_all();

  ceph_assert(m_state != STATE_UNINITIALIZED);
  if (m_state == STATE_CLOSED) {
    on_finish->complete(m_error_result);
    return;
  }

  if (m_state == STATE_READY) {
    stop_recording();
  }

  m_close_pending = true;
  wait_for_steady_state(on_finish);
}
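
// note: close() and handle_refresh_metadata() both flip m_listener_notify
// while delivering listener callbacks outside of m_lock; remove_listener()
// waits on m_listener_cond, so a listener is never erased while it is still
// being notified.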

template <typename I>
bool Journal<I>::is_tag_owner() const {
  std::lock_guard locker{m_lock};
  return is_tag_owner(m_lock);
}

template <typename I>
bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}

template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
  std::lock_guard locker{m_lock};
  return m_tag_tid;
}

template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
  std::lock_guard locker{m_lock};
  return m_tag_data;
}

template <typename I>
void Journal<I>::allocate_local_tag(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  journal::TagPredecessor predecessor;
  predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));

    cls::journal::Client client;
    int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
    if (r < 0) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
      m_image_ctx.op_work_queue->queue(on_finish, r);
      return;
    }

    // since we are primary, populate the predecessor with our known commit
    // position
    ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
    if (!client.commit_position.object_positions.empty()) {
      auto position = client.commit_position.object_positions.front();
      predecessor.commit_valid = true;
      predecessor.tag_tid = position.tag_tid;
      predecessor.entry_tid = position.entry_tid;
    }
  }

  allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
}

template <typename I>
void Journal<I>::allocate_tag(const std::string &mirror_uuid,
                              const journal::TagPredecessor &predecessor,
                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
                 << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journaler != nullptr);

  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
                                                &m_tag_data, on_finish);
  m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
                            decode_tag_ctx);
}

template <typename I>
void Journal<I>::flush_commit_position(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journaler != nullptr);
  m_journaler->flush_commit_position(on_finish);
}

template <typename I>
void Journal<I>::user_flushed() {
  if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
      m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
    std::lock_guard locker{m_lock};
    if (m_state == STATE_READY) {
      CephContext *cct = m_image_ctx.cct;
      ldout(cct, 5) << this << " " << __func__ << dendl;

      ceph_assert(m_journaler != nullptr);
      m_journaler->set_append_batch_options(
        m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
        m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
        m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
    } else {
      m_user_flushed = false;
    }
  }
}

template <typename I>
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                                        const bufferlist &bl,
                                        bool flush_entry) {
  ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
  uint64_t max_write_data_size =
    m_max_append_size - journal::AioWriteEvent::get_fixed_size();

  // ensure that the write event fits within the journal entry
  Bufferlists bufferlists;
  uint64_t bytes_remaining = length;
  uint64_t event_offset = 0;
  do {
    uint64_t event_length = std::min(bytes_remaining, max_write_data_size);

    bufferlist event_bl;
    event_bl.substr_of(bl, event_offset, event_length);
    journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
                                                           event_length,
                                                           event_bl),
                                    ceph_clock_now());

    bufferlists.emplace_back();
    encode(event_entry, bufferlists.back());

    event_offset += event_length;
    bytes_remaining -= event_length;
  } while (bytes_remaining > 0);

  return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
                          length, flush_entry, 0);
}
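
// illustration (hypothetical sizes): with a max_write_data_size of 4 MiB, a
// 10 MiB write at offset 0 is encoded as three AioWriteEvent entries covering
// [0, 4 MiB), [4, 8 MiB) and [8, 10 MiB); all three bufferlists are handed to
// append_io_events() and share the single event tid it returns.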

template <typename I>
uint64_t Journal<I>::append_compare_and_write_event(uint64_t offset,
                                                    size_t length,
                                                    const bufferlist &cmp_bl,
                                                    const bufferlist &write_bl,
                                                    bool flush_entry) {
  ceph_assert(
    m_max_append_size > journal::AioCompareAndWriteEvent::get_fixed_size());
  uint64_t max_compare_and_write_data_size =
    m_max_append_size - journal::AioCompareAndWriteEvent::get_fixed_size();
  // we need double the size because we store cmp and write buffers
  max_compare_and_write_data_size /= 2;

  // ensure that the compare and write event fits within the journal entry
  Bufferlists bufferlists;
  uint64_t bytes_remaining = length;
  uint64_t event_offset = 0;
  do {
    uint64_t event_length = std::min(bytes_remaining,
                                     max_compare_and_write_data_size);

    bufferlist event_cmp_bl;
    event_cmp_bl.substr_of(cmp_bl, event_offset, event_length);
    bufferlist event_write_bl;
    event_write_bl.substr_of(write_bl, event_offset, event_length);
    journal::EventEntry event_entry(
      journal::AioCompareAndWriteEvent(offset + event_offset,
                                       event_length,
                                       event_cmp_bl,
                                       event_write_bl),
      ceph_clock_now());

    bufferlists.emplace_back();
    encode(event_entry, bufferlists.back());

    event_offset += event_length;
    bytes_remaining -= event_length;
  } while (bytes_remaining > 0);

  return append_io_events(journal::EVENT_TYPE_AIO_COMPARE_AND_WRITE,
                          bufferlists, offset, length, flush_entry, -EILSEQ);
}

template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
                                     uint64_t offset, size_t length,
                                     bool flush_entry, int filter_ret_val) {
  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);
  return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
                          flush_entry, filter_ret_val);
}

template <typename I>
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
                                      const Bufferlists &bufferlists,
                                      uint64_t offset, size_t length,
                                      bool flush_entry, int filter_ret_val) {
  ceph_assert(!bufferlists.empty());

  uint64_t tid;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    tid = ++m_event_tid;
    ceph_assert(tid != 0);
  }

  Futures futures;
  for (auto &bl : bufferlists) {
    ceph_assert(bl.length() <= m_max_append_size);
    futures.push_back(m_journaler->append(m_tag_tid, bl));
  }

  {
    std::lock_guard event_locker{m_event_lock};
    m_events[tid] = Event(futures, offset, length, filter_ret_val);
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "event=" << event_type << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "flush=" << flush_entry << ", tid=" << tid << dendl;

  Context *on_safe = create_async_context_callback(
    m_image_ctx, new C_IOEventSafe(this, tid));
  if (flush_entry) {
    futures.back().flush(on_safe);
  } else {
    futures.back().wait(on_safe);
  }

  return tid;
}
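
// note: journaler futures become safe in append order, so attaching the
// C_IOEventSafe callback to futures.back() is sufficient -- once the last
// future of the event is safe, every earlier future of that event is too.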

template <typename I>
void Journal<I>::commit_io_event(uint64_t tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 "r=" << r << dendl;

  std::lock_guard event_locker{m_event_lock};
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }
  complete_event(it, r);
}

template <typename I>
void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
                                        uint64_t length, int r) {
  ceph_assert(length > 0);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "r=" << r << dendl;

  std::lock_guard event_locker{m_event_lock};
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }

  Event &event = it->second;
  if (event.ret_val == 0 && r < 0) {
    event.ret_val = r;
  }

  ExtentInterval extent;
  extent.insert(offset, length);

  ExtentInterval intersect;
  intersect.intersection_of(extent, event.pending_extents);

  event.pending_extents.subtract(intersect);
  if (!event.pending_extents.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "pending extents: " << event.pending_extents << dendl;
    return;
  }
  complete_event(it, event.ret_val);
}

template <typename I>
void Journal<I>::append_op_event(uint64_t op_tid,
                                 journal::EventEntry &&event_entry,
                                 Context *on_safe) {
  ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));

  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);

  Future future;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    future = m_journaler->append(m_tag_tid, bl);

    // delay committing op event to ensure consistent replay
    ceph_assert(m_op_futures.count(op_tid) == 0);
    m_op_futures[op_tid] = future;
  }

  on_safe = create_async_context_callback(m_image_ctx, on_safe);
  on_safe = new LambdaContext([this, on_safe](int r) {
    // ensure all committed IO before this op is committed
    m_journaler->flush_commit_position(on_safe);
  });
  future.flush(on_safe);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": "
                 << "op_tid=" << op_tid << ", "
                 << "event=" << event_entry.get_event_type() << dendl;
}
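
// note: the op-start future is parked in m_op_futures here and is not marked
// committed until commit_op_event()/handle_op_event_safe() has also recorded
// the matching OpFinishEvent, keeping the start event inside the replay
// window for as long as the op is in flight.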

template <typename I>
void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
                                  ceph_clock_now());

  bufferlist bl;
  encode(event_entry, bl);

  Future op_start_future;
  Future op_finish_future;
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_state == STATE_READY);

    // ready to commit op event
    auto it = m_op_futures.find(op_tid);
    ceph_assert(it != m_op_futures.end());
    op_start_future = it->second;
    m_op_futures.erase(it);

    op_finish_future = m_journaler->append(m_tag_tid, bl);
  }

  op_finish_future.flush(create_async_context_callback(
    m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
                                   op_finish_future, on_safe)));
}

template <typename I>
void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;

  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_journal_replay != nullptr);
    m_journal_replay->replay_op_ready(op_tid, on_resume);
  }
}

template <typename I>
void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  on_safe = create_context_callback<Context>(on_safe, this);

  Future future;
  {
    std::lock_guard event_locker{m_event_lock};
    future = wait_event(m_lock, tid, on_safe);
  }

  if (future.is_valid()) {
    future.flush(nullptr);
  }
}

template <typename I>
void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  on_safe = create_context_callback<Context>(on_safe, this);

  std::lock_guard event_locker{m_event_lock};
  wait_event(m_lock, tid, on_safe);
}

template <typename I>
typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
                                                   Context *on_safe) {
  ceph_assert(ceph_mutex_is_locked(m_event_lock));
  CephContext *cct = m_image_ctx.cct;

  typename Events::iterator it = m_events.find(tid);
  ceph_assert(it != m_events.end());

  Event &event = it->second;
  if (event.safe) {
    // journal entry already safe
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "journal entry already safe" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
    return Future();
  }

  event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
                                                                  on_safe));
  return event.futures.back();
}

template <typename I>
void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
                                       Context *on_start) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  on_start = util::create_async_context_callback(m_image_ctx, on_start);
  on_start = new LambdaContext(
    [this, journal_replay, on_start](int r) {
      handle_start_external_replay(r, journal_replay, on_start);
    });

  // safely flush all in-flight events before starting external replay
  m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
                                                                on_start));
}

template <typename I>
void Journal<I>::handle_start_external_replay(int r,
                                              journal::Replay<I> **journal_replay,
                                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to stop recording: " << cpp_strerror(r) << dendl;
    *journal_replay = nullptr;

    // get back to a sane-state
    start_append();
    on_finish->complete(r);
    return;
  }

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  *journal_replay = m_journal_replay;
  on_finish->complete(0);
}

template <typename I>
void Journal<I>::stop_external_replay() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_journal_replay != nullptr);
  ceph_assert(m_state == STATE_REPLAYING);

  delete m_journal_replay;
  m_journal_replay = nullptr;

  if (m_close_pending) {
    destroy_journaler(0);
    return;
  }

  start_append();
}

template <typename I>
void Journal<I>::create_journaler() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
  ceph_assert(m_journaler == NULL);

  transition_state(STATE_INITIALIZING, 0);
  ::journal::Settings settings;
  settings.commit_interval =
    m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
  settings.max_payload_bytes =
    m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
  settings.max_concurrent_object_sets =
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
  // TODO: a configurable filter to exclude certain peers from being
  // disconnected.
  settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};

  m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
                              m_image_ctx.md_ctx, m_image_ctx.id,
                              IMAGE_CLIENT_ID, settings, nullptr);
  m_journaler->add_listener(&m_metadata_listener);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_open>(this));
  auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
                                                  &m_lock, &m_client_meta,
                                                  &m_tag_tid, &m_tag_data, ctx);
  open_req->send();
}

template <typename I>
void Journal<I>::destroy_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_CLOSING, r);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
  ctx = new LambdaContext(
    [this, ctx](int r) {
      std::lock_guard locker{m_lock};
      m_journaler->shut_down(ctx);
    });
  ctx = create_async_context_callback(m_image_ctx, ctx);
  m_async_journal_op_tracker.wait_for_ops(ctx);
}

template <typename I>
void Journal<I>::recreate_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_RESTARTING_REPLAY, r);
  m_journaler->shut_down(create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
}

template <typename I>
void Journal<I>::complete_event(typename Events::iterator it, int r) {
  ceph_assert(ceph_mutex_is_locked(m_event_lock));
  ceph_assert(m_state == STATE_READY);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
                 << "r=" << r << dendl;

  Event &event = it->second;
  if (r < 0 && r == event.filter_ret_val) {
    // ignore allowed error codes
    r = 0;
  }
  if (r < 0) {
    // event recorded to journal but failed to update disk, we cannot
    // commit this IO event. this event must be replayed.
    ceph_assert(event.safe);
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO to disk, replay required: "
               << cpp_strerror(r) << dendl;
  }

  event.committed_io = true;
  if (event.safe) {
    if (r >= 0) {
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }
    m_events.erase(it);
  }
}

template <typename I>
void Journal<I>::start_append() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  m_journaler->start_append(
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
  if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
    m_journaler->set_append_batch_options(
      m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
      m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
      m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
  }

  transition_state(STATE_READY, 0);
}
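
// note: when rbd_journal_object_writethrough_until_flush is enabled the batch
// options above are deliberately not applied here; user_flushed() applies them
// once the first user-initiated flush shows the application issues flushes.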

template <typename I>
void Journal<I>::handle_open(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_INITIALIZING);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to initialize journal: " << cpp_strerror(r)
               << dendl;
    destroy_journaler(r);
    return;
  }

  m_tag_class = m_client_meta.tag_class;
  m_max_append_size = m_journaler->get_max_append_size();
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "tag_class=" << m_tag_class << ", "
                 << "max_append_size=" << m_max_append_size << dendl;

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  m_journaler->start_replay(&m_replay_handler);
}

template <typename I>
void Journal<I>::handle_replay_ready() {
  CephContext *cct = m_image_ctx.cct;
  ReplayEntry replay_entry;
  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << dendl;
    if (!m_journaler->try_pop_front(&replay_entry)) {
      return;
    }

    // only one entry should be in-flight at a time
    ceph_assert(!m_processing_entry);
    m_processing_entry = true;
  }

  m_async_journal_op_tracker.start_op();

  bufferlist data = replay_entry.get_data();
  auto it = data.cbegin();

  journal::EventEntry event_entry;
  int r = m_journal_replay->decode(&it, &event_entry);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode journal event entry" << dendl;
    handle_replay_process_safe(replay_entry, r);
    return;
  }

  Context *on_ready = create_context_callback<
    Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
  Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
  m_journal_replay->process(event_entry, on_ready, on_commit);
}

template <typename I>
void Journal<I>::handle_replay_complete(int r) {
  CephContext *cct = m_image_ctx.cct;

  bool cancel_ops = false;
  {
    std::lock_guard locker{m_lock};
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
    if (r < 0) {
      cancel_ops = true;
      transition_state(STATE_FLUSHING_RESTART, r);
    } else {
      // state might change back to FLUSHING_RESTART on flush error
      transition_state(STATE_FLUSHING_REPLAY, 0);
    }
  }

  Context *ctx = new LambdaContext([this, cct](int r) {
    ldout(cct, 20) << this << " handle_replay_complete: "
                   << "handle shut down replay" << dendl;

    State state;
    {
      std::lock_guard locker{m_lock};
      ceph_assert(m_state == STATE_FLUSHING_RESTART ||
                  m_state == STATE_FLUSHING_REPLAY);
      state = m_state;
    }

    if (state == STATE_FLUSHING_RESTART) {
      handle_flushing_restart(0);
    } else {
      handle_flushing_replay();
    }
  });
  ctx = new LambdaContext([this, ctx](int r) {
    // ensure the commit position is flushed to disk
    m_journaler->flush_commit_position(ctx);
  });
  ctx = create_async_context_callback(m_image_ctx, ctx);
  ctx = new LambdaContext([this, ctx](int r) {
    m_async_journal_op_tracker.wait_for_ops(ctx);
  });
  ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
    ldout(cct, 20) << this << " handle_replay_complete: "
                   << "shut down replay" << dendl;
    m_journal_replay->shut_down(cancel_ops, ctx);
  });

  m_journaler->stop_replay(ctx);
}
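
// note: the LambdaContext chain above is built innermost-first, so at runtime
// the order is: stop_replay() -> shut down journal::Replay -> wait for
// in-flight async ops -> flush the commit position -> dispatch to either
// handle_flushing_restart() or handle_flushing_replay().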

template <typename I>
void Journal<I>::handle_replay_process_ready(int r) {
  // journal::Replay is ready for more events -- attempt to pop another
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  {
    std::lock_guard locker{m_lock};
    ceph_assert(m_processing_entry);
    m_processing_entry = false;
  }
  handle_replay_ready();
}

template <typename I>
void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
  CephContext *cct = m_image_ctx.cct;

  std::unique_lock locker{m_lock};
  ceph_assert(m_state == STATE_REPLAYING ||
              m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
  if (r < 0) {
    if (r != -ECANCELED) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to commit journal event to disk: "
                 << cpp_strerror(r) << dendl;
    }

    if (m_state == STATE_REPLAYING) {
      // abort the replay if we have an error
      transition_state(STATE_FLUSHING_RESTART, r);
      locker.unlock();

      // stop replay, shut down, and restart
      Context* ctx = create_context_callback<
        Journal<I>, &Journal<I>::handle_flushing_restart>(this);
      ctx = new LambdaContext([this, ctx](int r) {
        // ensure the commit position is flushed to disk
        m_journaler->flush_commit_position(ctx);
      });
      ctx = new LambdaContext([this, cct, ctx](int r) {
        ldout(cct, 20) << this << " handle_replay_process_safe: "
                       << "shut down replay" << dendl;
        {
          std::lock_guard locker{m_lock};
          ceph_assert(m_state == STATE_FLUSHING_RESTART);
        }

        m_journal_replay->shut_down(true, ctx);
      });
      m_journaler->stop_replay(ctx);
      m_async_journal_op_tracker.finish_op();
      return;
    } else if (m_state == STATE_FLUSHING_REPLAY) {
      // end-of-replay flush in-progress -- we need to restart replay
      transition_state(STATE_FLUSHING_RESTART, r);
      locker.unlock();
      m_async_journal_op_tracker.finish_op();
      return;
    }
  } else {
    // only commit the entry if written successfully
    m_journaler->committed(replay_entry);
  }
  locker.unlock();
  m_async_journal_op_tracker.finish_op();
}

template <typename I>
void Journal<I>::handle_flushing_restart(int r) {
  std::lock_guard locker{m_lock};

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  ceph_assert(m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(r);
    return;
  }

  recreate_journaler(r);
}

template <typename I>
void Journal<I>::handle_flushing_replay() {
  std::lock_guard locker{m_lock};

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
              m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(0);
    return;
  } else if (m_state == STATE_FLUSHING_RESTART) {
    // failed to replay one-or-more events -- restart
    recreate_journaler(0);
    return;
  }

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_error_result = 0;
  start_append();
}

template <typename I>
void Journal<I>::handle_recording_stopped(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  std::lock_guard locker{m_lock};
  ceph_assert(m_state == STATE_STOPPING);

  destroy_journaler(r);
}

template <typename I>
void Journal<I>::handle_journal_destroyed(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "error detected while closing journal: " << cpp_strerror(r)
               << dendl;
  }

  std::lock_guard locker{m_lock};
  delete m_journaler;
  m_journaler = nullptr;

  ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
  if (m_state == STATE_RESTARTING_REPLAY) {
    create_journaler();
    return;
  }

  transition_state(STATE_CLOSED, r);
}

template <typename I>
void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO event: " << cpp_strerror(r) << dendl;
  }

  Contexts on_safe_contexts;
  {
    std::lock_guard event_locker{m_event_lock};
    typename Events::iterator it = m_events.find(tid);
    ceph_assert(it != m_events.end());

    Event &event = it->second;
    on_safe_contexts.swap(event.on_safe_contexts);

    if (r < 0 || event.committed_io) {
      // failed journal write so IO won't be sent -- or IO extent was
      // overwritten by future IO operations so this was a no-op IO event
      event.ret_val = r;
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }

    if (event.committed_io) {
      m_events.erase(it);
    } else {
      event.safe = true;
    }
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "completing tid=" << tid << dendl;

  // alert the cache about the journal event status
  for (Contexts::iterator it = on_safe_contexts.begin();
       it != on_safe_contexts.end(); ++it) {
    (*it)->complete(r);
  }
}

template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
                                      const Future &op_start_future,
                                      const Future &op_finish_future,
                                      Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit op event: " << cpp_strerror(r) << dendl;
  }

  m_journaler->committed(op_start_future);
  m_journaler->committed(op_finish_future);

  // reduce the replay window after committing an op event
  m_journaler->flush_commit_position(on_safe);
}

template <typename I>
void Journal<I>::stop_recording() {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(m_journaler != NULL);

  ceph_assert(m_state == STATE_READY);
  transition_state(STATE_STOPPING, 0);

  m_journaler->stop_append(util::create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
}

template <typename I>
void Journal<I>::transition_state(State state, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));
  m_state = state;

  if (m_error_result == 0 && r < 0) {
    m_error_result = r;
  }

  if (is_steady_state()) {
    auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
    m_wait_for_state_contexts.clear();

    for (auto ctx : wait_for_state_contexts) {
      ctx->complete(m_error_result);
    }
  }
}

template <typename I>
bool Journal<I>::is_steady_state() const {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  switch (m_state) {
  case STATE_READY:
  case STATE_CLOSED:
    return true;
  case STATE_UNINITIALIZED:
  case STATE_INITIALIZING:
  case STATE_REPLAYING:
  case STATE_FLUSHING_RESTART:
  case STATE_RESTARTING_REPLAY:
  case STATE_FLUSHING_REPLAY:
  case STATE_STOPPING:
  case STATE_CLOSING:
    break;
  }
  return false;
}

template <typename I>
void Journal<I>::wait_for_steady_state(Context *on_state) {
  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(!is_steady_state());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
                 << dendl;
  m_wait_for_state_contexts.push_back(on_state);
}

template <typename I>
int Journal<I>::is_resync_requested(bool *do_resync) {
  std::lock_guard l{m_lock};
  return check_resync_requested(do_resync);
}

template <typename I>
int Journal<I>::check_resync_requested(bool *do_resync) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(ceph_mutex_is_locked(m_lock));
  ceph_assert(do_resync != nullptr);

  cls::journal::Client client;
  int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return r;
  }

  librbd::journal::ClientData client_data;
  auto bl_it = client.data.cbegin();
  try {
    decode(client_data, bl_it);
  } catch (const buffer::error &err) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode client data: " << err.what() << dendl;
    return -EINVAL;
  }

  journal::ImageClientMeta *image_client_meta =
    boost::get<journal::ImageClientMeta>(&client_data.client_meta);
  if (image_client_meta == nullptr) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to access image client meta struct" << dendl;
    return -EINVAL;
  }

  *do_resync = image_client_meta->resync_requested;

  return 0;
}

struct C_RefreshTags : public Context {
  AsyncOpTracker &async_op_tracker;
  Context *on_finish = nullptr;

  ceph::mutex lock =
    ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
    : async_op_tracker(async_op_tracker) {
    async_op_tracker.start_op();
  }
  ~C_RefreshTags() override {
    async_op_tracker.finish_op();
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};

template <typename I>
void Journal<I>::handle_metadata_updated() {
  CephContext *cct = m_image_ctx.cct;
  std::lock_guard locker{m_lock};

  if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (is_tag_owner(m_lock)) {
    ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
    return;
  } else if (m_listeners.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
    return;
  }

  uint64_t refresh_sequence = ++m_refresh_sequence;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << dendl;

  // pull the most recent tags from the journal, decode, and
  // update the internal tag state
  C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
  refresh_ctx->on_finish = new LambdaContext(
    [this, refresh_sequence, refresh_ctx](int r) {
      handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
                              refresh_ctx->tag_data, r);
    });
  C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
    cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
    &refresh_ctx->tag_data, refresh_ctx);
  m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
                        &decode_tags_ctx->tags, decode_tags_ctx);
}
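
// note: m_refresh_sequence orders concurrent metadata refreshes; a completion
// whose sequence no longer matches is dropped in handle_refresh_metadata(),
// so older tag data can never overwrite the result of a newer refresh.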

template <typename I>
void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
                                         uint64_t tag_tid,
                                         journal::TagData tag_data, int r) {
  CephContext *cct = m_image_ctx.cct;
  std::unique_lock locker{m_lock};

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
               << cpp_strerror(r) << dendl;
    return;
  } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (refresh_sequence != m_refresh_sequence) {
    // another, more up-to-date refresh is in-flight
    return;
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << ", "
                 << "tag_tid=" << tag_tid << ", "
                 << "tag_data=" << tag_data << dendl;
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });

  bool was_tag_owner = is_tag_owner(m_lock);
  if (m_tag_tid < tag_tid) {
    m_tag_tid = tag_tid;
    m_tag_data = tag_data;
  }
  bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));

  bool resync_requested = false;
  r = check_resync_requested(&resync_requested);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to check if a resync was requested" << dendl;
    return;
  }

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  locker.unlock();

  if (promoted_to_primary) {
    for (auto listener : listeners) {
      listener->handle_promoted();
    }
  } else if (resync_requested) {
    for (auto listener : listeners) {
      listener->handle_resync();
    }
  }

  locker.lock();
  m_listener_notify = false;
  m_listener_cond.notify_all();
}

template <typename I>
void Journal<I>::add_listener(journal::Listener *listener) {
  std::lock_guard locker{m_lock};
  m_listeners.insert(listener);
}

template <typename I>
void Journal<I>::remove_listener(journal::Listener *listener) {
  std::unique_lock locker{m_lock};
  m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
  m_listeners.erase(listener);
}

} // namespace librbd

#ifndef TEST_F
template class librbd::Journal<librbd::ImageCtx>;
#endif