ceph/src/librbd/Journal.cc
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/Journal.h"
5#include "include/rados/librados.hpp"
6#include "common/AsyncOpTracker.h"
7#include "common/errno.h"
8#include "common/Timer.h"
9#include "common/WorkQueue.h"
10#include "cls/journal/cls_journal_types.h"
11#include "journal/Journaler.h"
12#include "journal/Policy.h"
13#include "journal/ReplayEntry.h"
14#include "journal/Settings.h"
15#include "journal/Utils.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/io/ImageRequestWQ.h"
18#include "librbd/io/ObjectDispatchSpec.h"
19#include "librbd/io/ObjectDispatcher.h"
20#include "librbd/journal/CreateRequest.h"
21#include "librbd/journal/DemoteRequest.h"
22#include "librbd/journal/ObjectDispatch.h"
23#include "librbd/journal/OpenRequest.h"
24#include "librbd/journal/RemoveRequest.h"
25#include "librbd/journal/ResetRequest.h"
26#include "librbd/journal/Replay.h"
27#include "librbd/journal/PromoteRequest.h"
28
29#include <boost/scope_exit.hpp>
30#include <utility>
31
32#define dout_subsys ceph_subsys_rbd
33#undef dout_prefix
34#define dout_prefix *_dout << "librbd::Journal: "
35
36namespace librbd {
37
38using util::create_async_context_callback;
39using util::create_context_callback;
40using journal::util::C_DecodeTag;
41using journal::util::C_DecodeTags;
42
43namespace {
44
45// TODO: once journaler is 100% async, remove separate threads and
46// reuse ImageCtx's thread pool
47class ThreadPoolSingleton : public ThreadPool {
48public:
49 explicit ThreadPoolSingleton(CephContext *cct)
50 : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
51 start();
52 }
53 ~ThreadPoolSingleton() override {
54 stop();
55 }
56};
57
58template <typename I>
59struct C_IsTagOwner : public Context {
60 librados::IoCtx &io_ctx;
61 std::string image_id;
62 bool *is_tag_owner;
63 ContextWQ *op_work_queue;
64 Context *on_finish;
65
66 CephContext *cct = nullptr;
67 Journaler *journaler;
68 cls::journal::Client client;
69 journal::ImageClientMeta client_meta;
70 uint64_t tag_tid = 0;
71 journal::TagData tag_data;
72
73 C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
74 bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
75 : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
76 op_work_queue(op_work_queue), on_finish(on_finish),
77 cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
78 journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
79 {}, nullptr)) {
80 }
81
82 void finish(int r) override {
83 ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
84 << dendl;
85 if (r < 0) {
86 lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
87 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
88 } else {
89 *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
90 }
91
92 Journaler *journaler = this->journaler;
93 Context *on_finish = this->on_finish;
94 auto ctx = new LambdaContext(
95 [journaler, on_finish](int r) {
96 on_finish->complete(r);
97 delete journaler;
98 });
99 op_work_queue->queue(ctx, r);
100 }
101};
102
103struct C_GetTagOwner : public Context {
104 std::string *mirror_uuid;
105 Context *on_finish;
106
107 Journaler journaler;
108 cls::journal::Client client;
109 journal::ImageClientMeta client_meta;
110 uint64_t tag_tid = 0;
111 journal::TagData tag_data;
112
113 C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
114 std::string *mirror_uuid, Context *on_finish)
115 : mirror_uuid(mirror_uuid), on_finish(on_finish),
116 journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
117 }
118
119 virtual void finish(int r) {
120 if (r >= 0) {
121 *mirror_uuid = tag_data.mirror_uuid;
122 }
123 on_finish->complete(r);
124 }
125};
126
127template <typename J>
128struct GetTagsRequest {
129 CephContext *cct;
130 J *journaler;
131 cls::journal::Client *client;
132 journal::ImageClientMeta *client_meta;
133 uint64_t *tag_tid;
134 journal::TagData *tag_data;
135 Context *on_finish;
136
137 ceph::mutex lock = ceph::make_mutex("lock");
138
139 GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
140 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
141 journal::TagData *tag_data, Context *on_finish)
142 : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
143 tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
144 }
145
146 /**
147 * @verbatim
148 *
149 * <start>
150 * |
151 * v
152 * GET_CLIENT * * * * * * * * * * * *
153 * | *
154 * v *
155 * GET_TAGS * * * * * * * * * * * * * (error)
156 * | *
157 * v *
158 * <finish> * * * * * * * * * * * * *
159 *
160 * @endverbatim
161 */
162
163 void send() {
164 send_get_client();
165 }
166
167 void send_get_client() {
168 ldout(cct, 20) << __func__ << dendl;
169
170 auto ctx = new LambdaContext(
171 [this](int r) {
172 handle_get_client(r);
173 });
174 journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
175 }
176
177 void handle_get_client(int r) {
178 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
179
180 if (r < 0) {
181 complete(r);
182 return;
183 }
184
185 librbd::journal::ClientData client_data;
186 auto bl_it = client->data.cbegin();
187 try {
188 decode(client_data, bl_it);
189 } catch (const buffer::error &err) {
190 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
191 << "failed to decode client data" << dendl;
192 complete(-EBADMSG);
193 return;
194 }
195
196 journal::ImageClientMeta *image_client_meta =
197 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
198 if (image_client_meta == nullptr) {
199 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
200 << "failed to get client meta" << dendl;
201 complete(-EINVAL);
202 return;
203 }
204 *client_meta = *image_client_meta;
205
206 send_get_tags();
207 }
208
209 void send_get_tags() {
210 ldout(cct, 20) << __func__ << dendl;
211
212 auto ctx = new LambdaContext(
213 [this](int r) {
214 handle_get_tags(r);
215 });
216 C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
217 ctx);
218 journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
219 }
220
221 void handle_get_tags(int r) {
222 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
223
224 complete(r);
225 }
226
227 void complete(int r) {
228 on_finish->complete(r);
229 delete this;
230 }
231};
232
233template <typename J>
234void get_tags(CephContext *cct, J *journaler,
235 cls::journal::Client *client,
236 journal::ImageClientMeta *client_meta,
237 uint64_t *tag_tid, journal::TagData *tag_data,
238 Context *on_finish) {
239 ldout(cct, 20) << __func__ << dendl;
240
241 GetTagsRequest<J> *req =
242 new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
243 tag_data, on_finish);
244 req->send();
245}
246
247template <typename J>
248int allocate_journaler_tag(CephContext *cct, J *journaler,
249 uint64_t tag_class,
250 const journal::TagPredecessor &predecessor,
251 const std::string &mirror_uuid,
252 cls::journal::Tag *new_tag) {
253 journal::TagData tag_data;
254 tag_data.mirror_uuid = mirror_uuid;
255 tag_data.predecessor = predecessor;
256
257 bufferlist tag_bl;
258 encode(tag_data, tag_bl);
259
260 C_SaferCond allocate_tag_ctx;
261 journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
262
263 int r = allocate_tag_ctx.wait();
264 if (r < 0) {
265 lderr(cct) << __func__ << ": "
266 << "failed to allocate tag: " << cpp_strerror(r) << dendl;
267 return r;
268 }
269 return 0;
270}
271
272} // anonymous namespace
273
274// client id for local image
275template <typename I>
276const std::string Journal<I>::IMAGE_CLIENT_ID("");
277
278// mirror uuid to use for local images
279template <typename I>
280const std::string Journal<I>::LOCAL_MIRROR_UUID("");
281
282// mirror uuid to use for orphaned (demoted) images
283template <typename I>
284const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
285
286template <typename I>
287std::ostream &operator<<(std::ostream &os,
288 const typename Journal<I>::State &state) {
289 switch (state) {
290 case Journal<I>::STATE_UNINITIALIZED:
291 os << "Uninitialized";
292 break;
293 case Journal<I>::STATE_INITIALIZING:
294 os << "Initializing";
295 break;
296 case Journal<I>::STATE_REPLAYING:
297 os << "Replaying";
298 break;
299 case Journal<I>::STATE_FLUSHING_RESTART:
300 os << "FlushingRestart";
301 break;
302 case Journal<I>::STATE_RESTARTING_REPLAY:
303 os << "RestartingReplay";
304 break;
305 case Journal<I>::STATE_FLUSHING_REPLAY:
306 os << "FlushingReplay";
307 break;
308 case Journal<I>::STATE_READY:
309 os << "Ready";
310 break;
311 case Journal<I>::STATE_STOPPING:
312 os << "Stopping";
313 break;
314 case Journal<I>::STATE_CLOSING:
315 os << "Closing";
316 break;
317 case Journal<I>::STATE_CLOSED:
318 os << "Closed";
319 break;
320 default:
321 os << "Unknown (" << static_cast<uint32_t>(state) << ")";
322 break;
323 }
324 return os;
325}
326
327template <typename I>
328Journal<I>::Journal(I &image_ctx)
329 : RefCountedObject(image_ctx.cct),
330 m_image_ctx(image_ctx), m_journaler(NULL),
331 m_state(STATE_UNINITIALIZED),
332 m_error_result(0), m_replay_handler(this), m_close_pending(false),
333 m_event_tid(0),
334 m_blocking_writes(false), m_journal_replay(NULL),
335 m_metadata_listener(this) {
336
337 CephContext *cct = m_image_ctx.cct;
338 ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
339
340 auto thread_pool_singleton =
341 &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
342 "librbd::journal::thread_pool", false, cct);
343 m_work_queue = new ContextWQ("librbd::journal::work_queue",
344 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
345 thread_pool_singleton);
346 ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
347}
348
349template <typename I>
350Journal<I>::~Journal() {
351 if (m_work_queue != nullptr) {
352 m_work_queue->drain();
353 delete m_work_queue;
354 }
355
356 std::lock_guard locker{m_lock};
357 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
358 ceph_assert(m_journaler == NULL);
359 ceph_assert(m_journal_replay == NULL);
360 ceph_assert(m_wait_for_state_contexts.empty());
361}
362
363template <typename I>
364bool Journal<I>::is_journal_supported(I &image_ctx) {
365 ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
366 return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
367 !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
368}
369
370template <typename I>
371int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
372 uint8_t order, uint8_t splay_width,
373 const std::string &object_pool) {
374 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
375 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
376
377 ThreadPool *thread_pool;
378 ContextWQ *op_work_queue;
379 ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
380
381 C_SaferCond cond;
382 journal::TagData tag_data(LOCAL_MIRROR_UUID);
383 journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
384 io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
385 tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
386 req->send();
387
388 return cond.wait();
389}
390
391template <typename I>
392int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
393 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
394 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
395
396 ThreadPool *thread_pool;
397 ContextWQ *op_work_queue;
398 ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
399
400 C_SaferCond cond;
401 journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
402 io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
403 req->send();
404
405 return cond.wait();
406}
407
408template <typename I>
409int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
410 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
411 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
412
413 ThreadPool *thread_pool;
414 ContextWQ *op_work_queue;
415 ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
416
417 C_SaferCond cond;
418 auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
419 Journal<>::LOCAL_MIRROR_UUID,
420 op_work_queue, &cond);
421 req->send();
422
423 return cond.wait();
424}
425
426template <typename I>
427void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
428 Context *on_finish) {
429 Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
430 image_ctx->op_work_queue, on_finish);
431}
432
433template <typename I>
434void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
435 bool *is_tag_owner, ContextWQ *op_work_queue,
436 Context *on_finish) {
437 CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
438 ldout(cct, 20) << __func__ << dendl;
439
440 C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
441 io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
442 get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
443 &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
444 &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
445}
446
447template <typename I>
448void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
449 std::string *mirror_uuid,
450 ContextWQ *op_work_queue, Context *on_finish) {
451 CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
452 ldout(cct, 20) << __func__ << dendl;
453
454 auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
455 get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
456 &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
457}
458
459template <typename I>
460int Journal<I>::request_resync(I *image_ctx) {
461 CephContext *cct = image_ctx->cct;
462 ldout(cct, 20) << __func__ << dendl;
463
464 Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
465 nullptr);
466
467 ceph::mutex lock = ceph::make_mutex("lock");
468 journal::ImageClientMeta client_meta;
469 uint64_t tag_tid;
470 journal::TagData tag_data;
471
472 C_SaferCond open_ctx;
473 auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
474 &client_meta, &tag_tid,
475 &tag_data, &open_ctx);
476 open_req->send();
477
478 BOOST_SCOPE_EXIT_ALL(&journaler) {
479 journaler.shut_down();
480 };
481
482 int r = open_ctx.wait();
483 if (r < 0) {
484 return r;
485 }
486
487 client_meta.resync_requested = true;
488
489 journal::ClientData client_data(client_meta);
490 bufferlist client_data_bl;
491 encode(client_data, client_data_bl);
492
493 C_SaferCond update_client_ctx;
494 journaler.update_client(client_data_bl, &update_client_ctx);
495
496 r = update_client_ctx.wait();
497 if (r < 0) {
498 lderr(cct) << __func__ << ": "
499 << "failed to update client: " << cpp_strerror(r) << dendl;
500 return r;
501 }
502 return 0;
503}
504
505template <typename I>
506void Journal<I>::promote(I *image_ctx, Context *on_finish) {
507 CephContext *cct = image_ctx->cct;
508 ldout(cct, 20) << __func__ << dendl;
509
510 auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
511 on_finish);
512 promote_req->send();
513}
514
515template <typename I>
516void Journal<I>::demote(I *image_ctx, Context *on_finish) {
517 CephContext *cct = image_ctx->cct;
518 ldout(cct, 20) << __func__ << dendl;
519
520 auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
521 req->send();
522}
523
524template <typename I>
525bool Journal<I>::is_journal_ready() const {
526 std::lock_guard locker{m_lock};
527 return (m_state == STATE_READY);
528}
529
530template <typename I>
531bool Journal<I>::is_journal_replaying() const {
532 std::lock_guard locker{m_lock};
533 return is_journal_replaying(m_lock);
534}
535
536template <typename I>
537bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
538 ceph_assert(ceph_mutex_is_locked(m_lock));
539 return (m_state == STATE_REPLAYING ||
540 m_state == STATE_FLUSHING_REPLAY ||
541 m_state == STATE_FLUSHING_RESTART ||
542 m_state == STATE_RESTARTING_REPLAY);
543}
544
545template <typename I>
546bool Journal<I>::is_journal_appending() const {
547 ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
548 std::lock_guard locker{m_lock};
549 return (m_state == STATE_READY &&
550 !m_image_ctx.get_journal_policy()->append_disabled());
551}
552
553template <typename I>
554void Journal<I>::wait_for_journal_ready(Context *on_ready) {
555 on_ready = create_async_context_callback(m_image_ctx, on_ready);
556
557 std::lock_guard locker{m_lock};
558 if (m_state == STATE_READY) {
559 on_ready->complete(m_error_result);
560 } else {
561 wait_for_steady_state(on_ready);
562 }
563}
564
565template <typename I>
566void Journal<I>::open(Context *on_finish) {
567 CephContext *cct = m_image_ctx.cct;
568 ldout(cct, 20) << this << " " << __func__ << dendl;
569
570 on_finish = create_context_callback<Context>(on_finish, this);
571
572 on_finish = create_async_context_callback(m_image_ctx, on_finish);
573
574 // inject our handler into the object dispatcher chain
575 m_image_ctx.io_object_dispatcher->register_object_dispatch(
576 journal::ObjectDispatch<I>::create(&m_image_ctx, this));
577
578 std::lock_guard locker{m_lock};
579 ceph_assert(m_state == STATE_UNINITIALIZED);
580 wait_for_steady_state(on_finish);
581 create_journaler();
582}
583
584template <typename I>
585void Journal<I>::close(Context *on_finish) {
586 CephContext *cct = m_image_ctx.cct;
587 ldout(cct, 20) << this << " " << __func__ << dendl;
588
589 on_finish = create_context_callback<Context>(on_finish, this);
590
591 on_finish = new LambdaContext([this, on_finish](int r) {
592 // remove our handler from object dispatcher chain - preserve error
593 auto ctx = new LambdaContext([on_finish, r](int _) {
594 on_finish->complete(r);
595 });
596 m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
597 io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
598 });
599 on_finish = create_async_context_callback(m_image_ctx, on_finish);
600
601 std::unique_lock locker{m_lock};
602 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
603
604 Listeners listeners(m_listeners);
605 m_listener_notify = true;
606 locker.unlock();
607 for (auto listener : listeners) {
608 listener->handle_close();
609 }
610
611 locker.lock();
612 m_listener_notify = false;
613 m_listener_cond.notify_all();
614
615 ceph_assert(m_state != STATE_UNINITIALIZED);
616 if (m_state == STATE_CLOSED) {
617 on_finish->complete(m_error_result);
618 return;
619 }
620
621 if (m_state == STATE_READY) {
622 stop_recording();
623 }
624
625 m_close_pending = true;
626 wait_for_steady_state(on_finish);
627}
628
629template <typename I>
630bool Journal<I>::is_tag_owner() const {
631 std::lock_guard locker{m_lock};
632 return is_tag_owner(m_lock);
633}
634
635template <typename I>
636bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
637 ceph_assert(ceph_mutex_is_locked(m_lock));
638 return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
639}
640
641template <typename I>
642uint64_t Journal<I>::get_tag_tid() const {
643 std::lock_guard locker{m_lock};
644 return m_tag_tid;
645}
646
647template <typename I>
648journal::TagData Journal<I>::get_tag_data() const {
649 std::lock_guard locker{m_lock};
650 return m_tag_data;
651}
652
653template <typename I>
654void Journal<I>::allocate_local_tag(Context *on_finish) {
655 CephContext *cct = m_image_ctx.cct;
656 ldout(cct, 20) << this << " " << __func__ << dendl;
657
658 journal::TagPredecessor predecessor;
659 predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
660 {
661 std::lock_guard locker{m_lock};
662 ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));
663
664 cls::journal::Client client;
665 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
666 if (r < 0) {
667 lderr(cct) << this << " " << __func__ << ": "
668 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
669 m_image_ctx.op_work_queue->queue(on_finish, r);
670 return;
671 }
672
673 // since we are primary, populate the predecessor with our known commit
674 // position
675 ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
676 if (!client.commit_position.object_positions.empty()) {
677 auto position = client.commit_position.object_positions.front();
678 predecessor.commit_valid = true;
679 predecessor.tag_tid = position.tag_tid;
680 predecessor.entry_tid = position.entry_tid;
681 }
682 }
683
684 allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
685}
686
687template <typename I>
688void Journal<I>::allocate_tag(const std::string &mirror_uuid,
689 const journal::TagPredecessor &predecessor,
690 Context *on_finish) {
691 CephContext *cct = m_image_ctx.cct;
692 ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
693 << dendl;
694
695 std::lock_guard locker{m_lock};
696 ceph_assert(m_journaler != nullptr);
697
698 journal::TagData tag_data;
699 tag_data.mirror_uuid = mirror_uuid;
700 tag_data.predecessor = predecessor;
701
702 bufferlist tag_bl;
703 encode(tag_data, tag_bl);
704
705 C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
706 &m_tag_data, on_finish);
707 m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
708 decode_tag_ctx);
709}
710
711template <typename I>
712void Journal<I>::flush_commit_position(Context *on_finish) {
713 CephContext *cct = m_image_ctx.cct;
714 ldout(cct, 20) << this << " " << __func__ << dendl;
715
716 std::lock_guard locker{m_lock};
717 ceph_assert(m_journaler != nullptr);
718 m_journaler->flush_commit_position(on_finish);
719}
720
721template <typename I>
722void Journal<I>::user_flushed() {
723 if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
724 m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
725 std::lock_guard locker{m_lock};
726 if (m_state == STATE_READY) {
727 CephContext *cct = m_image_ctx.cct;
728 ldout(cct, 5) << this << " " << __func__ << dendl;
729
730 ceph_assert(m_journaler != nullptr);
731 m_journaler->set_append_batch_options(
732 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
733 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
734 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
735 } else {
736 m_user_flushed = false;
737 }
738 }
739}
740
741template <typename I>
742uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
743 const bufferlist &bl,
744 bool flush_entry) {
745 ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
746 uint64_t max_write_data_size =
747 m_max_append_size - journal::AioWriteEvent::get_fixed_size();
748
749 // ensure that the write event fits within the journal entry
750 Bufferlists bufferlists;
751 uint64_t bytes_remaining = length;
752 uint64_t event_offset = 0;
753 do {
754 uint64_t event_length = std::min(bytes_remaining, max_write_data_size);
755
756 bufferlist event_bl;
757 event_bl.substr_of(bl, event_offset, event_length);
758 journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
759 event_length,
760 event_bl),
761 ceph_clock_now());
762
763 bufferlists.emplace_back();
764 encode(event_entry, bufferlists.back());
765
766 event_offset += event_length;
767 bytes_remaining -= event_length;
768 } while (bytes_remaining > 0);
769
770 return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
771 length, flush_entry, 0);
772}
773
774template <typename I>
775uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
776 uint64_t offset, size_t length,
777 bool flush_entry, int filter_ret_val) {
778 bufferlist bl;
779 event_entry.timestamp = ceph_clock_now();
780 encode(event_entry, bl);
781 return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
782 flush_entry, filter_ret_val);
783}
784
785template <typename I>
786uint64_t Journal<I>::append_io_events(journal::EventType event_type,
787 const Bufferlists &bufferlists,
788 uint64_t offset, size_t length,
789 bool flush_entry, int filter_ret_val) {
790 ceph_assert(!bufferlists.empty());
791
792 uint64_t tid;
793 {
794 std::lock_guard locker{m_lock};
795 ceph_assert(m_state == STATE_READY);
796
797 tid = ++m_event_tid;
798 ceph_assert(tid != 0);
799 }
800
801 Futures futures;
802 for (auto &bl : bufferlists) {
803 ceph_assert(bl.length() <= m_max_append_size);
804 futures.push_back(m_journaler->append(m_tag_tid, bl));
805 }
806
807 {
808 std::lock_guard event_locker{m_event_lock};
809 m_events[tid] = Event(futures, offset, length, filter_ret_val);
810 }
811
812 CephContext *cct = m_image_ctx.cct;
813 ldout(cct, 20) << this << " " << __func__ << ": "
814 << "event=" << event_type << ", "
815 << "offset=" << offset << ", "
816 << "length=" << length << ", "
817 << "flush=" << flush_entry << ", tid=" << tid << dendl;
818
819 Context *on_safe = create_async_context_callback(
820 m_image_ctx, new C_IOEventSafe(this, tid));
821 if (flush_entry) {
822 futures.back().flush(on_safe);
823 } else {
824 futures.back().wait(on_safe);
825 }
826
827 return tid;
828}
829
830template <typename I>
831void Journal<I>::commit_io_event(uint64_t tid, int r) {
832 CephContext *cct = m_image_ctx.cct;
833 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
834 "r=" << r << dendl;
835
836 std::lock_guard event_locker{m_event_lock};
837 typename Events::iterator it = m_events.find(tid);
838 if (it == m_events.end()) {
839 return;
840 }
841 complete_event(it, r);
842}
843
844template <typename I>
845void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
846 uint64_t length, int r) {
847 ceph_assert(length > 0);
848
849 CephContext *cct = m_image_ctx.cct;
850 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
851 << "offset=" << offset << ", "
852 << "length=" << length << ", "
853 << "r=" << r << dendl;
854
855 std::lock_guard event_locker{m_event_lock};
856 typename Events::iterator it = m_events.find(tid);
857 if (it == m_events.end()) {
858 return;
859 }
860
861 Event &event = it->second;
862 if (event.ret_val == 0 && r < 0) {
863 event.ret_val = r;
864 }
865
866 ExtentInterval extent;
867 extent.insert(offset, length);
868
869 ExtentInterval intersect;
870 intersect.intersection_of(extent, event.pending_extents);
871
872 event.pending_extents.subtract(intersect);
873 if (!event.pending_extents.empty()) {
874 ldout(cct, 20) << this << " " << __func__ << ": "
875 << "pending extents: " << event.pending_extents << dendl;
876 return;
877 }
878 complete_event(it, event.ret_val);
879}
880
881template <typename I>
882void Journal<I>::append_op_event(uint64_t op_tid,
883 journal::EventEntry &&event_entry,
884 Context *on_safe) {
885 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
886
887 bufferlist bl;
888 event_entry.timestamp = ceph_clock_now();
889 encode(event_entry, bl);
890
891 Future future;
892 {
893 std::lock_guard locker{m_lock};
894 ceph_assert(m_state == STATE_READY);
895
896 future = m_journaler->append(m_tag_tid, bl);
897
898 // delay committing op event to ensure consistent replay
899 ceph_assert(m_op_futures.count(op_tid) == 0);
900 m_op_futures[op_tid] = future;
901 }
902
903 on_safe = create_async_context_callback(m_image_ctx, on_safe);
904 on_safe = new LambdaContext([this, on_safe](int r) {
905 // ensure all committed IO before this op is committed
906 m_journaler->flush_commit_position(on_safe);
907 });
908 future.flush(on_safe);
909
910 CephContext *cct = m_image_ctx.cct;
911 ldout(cct, 10) << this << " " << __func__ << ": "
912 << "op_tid=" << op_tid << ", "
913 << "event=" << event_entry.get_event_type() << dendl;
914}
915
916template <typename I>
917void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
918 CephContext *cct = m_image_ctx.cct;
919 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
920 << "r=" << r << dendl;
921
922 journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
923 ceph_clock_now());
924
925 bufferlist bl;
926 encode(event_entry, bl);
927
928 Future op_start_future;
929 Future op_finish_future;
930 {
931 std::lock_guard locker{m_lock};
932 ceph_assert(m_state == STATE_READY);
933
934 // ready to commit op event
935 auto it = m_op_futures.find(op_tid);
936 ceph_assert(it != m_op_futures.end());
937 op_start_future = it->second;
938 m_op_futures.erase(it);
939
940 op_finish_future = m_journaler->append(m_tag_tid, bl);
941 }
942
943 op_finish_future.flush(create_async_context_callback(
944 m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
945 op_finish_future, on_safe)));
946}
947
948template <typename I>
949void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
950 CephContext *cct = m_image_ctx.cct;
951 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
952
953 {
954 std::lock_guard locker{m_lock};
955 ceph_assert(m_journal_replay != nullptr);
956 m_journal_replay->replay_op_ready(op_tid, on_resume);
957 }
958}
959
960template <typename I>
961void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
962 CephContext *cct = m_image_ctx.cct;
963 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
964 << "on_safe=" << on_safe << dendl;
965
966 on_safe = create_context_callback<Context>(on_safe, this);
967
968 Future future;
969 {
970 std::lock_guard event_locker{m_event_lock};
971 future = wait_event(m_lock, tid, on_safe);
972 }
973
974 if (future.is_valid()) {
975 future.flush(nullptr);
976 }
977}
978
979template <typename I>
980void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
981 CephContext *cct = m_image_ctx.cct;
982 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
983 << "on_safe=" << on_safe << dendl;
984
985 on_safe = create_context_callback<Context>(on_safe, this);
986
987 std::lock_guard event_locker{m_event_lock};
988 wait_event(m_lock, tid, on_safe);
989}
990
991template <typename I>
9f95a23c 992typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
7c673cae 993 Context *on_safe) {
9f95a23c 994 ceph_assert(ceph_mutex_is_locked(m_event_lock));
7c673cae
FG
995 CephContext *cct = m_image_ctx.cct;
996
997 typename Events::iterator it = m_events.find(tid);
998 ceph_assert(it != m_events.end());
999
1000 Event &event = it->second;
1001 if (event.safe) {
1002 // journal entry already safe
1003 ldout(cct, 20) << this << " " << __func__ << ": "
1004 << "journal entry already safe" << dendl;
1005 m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
1006 return Future();
1007 }
1008
1009 event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
1010 on_safe));
1011 return event.futures.back();
1012}
1013
1014template <typename I>
1015void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
1016 Context *on_start) {
1017 CephContext *cct = m_image_ctx.cct;
1018 ldout(cct, 20) << this << " " << __func__ << dendl;
1019
1020 std::lock_guard locker{m_lock};
1021 ceph_assert(m_state == STATE_READY);
1022 ceph_assert(m_journal_replay == nullptr);
1023
1024 on_start = util::create_async_context_callback(m_image_ctx, on_start);
1025 on_start = new LambdaContext(
1026 [this, journal_replay, on_start](int r) {
1027 handle_start_external_replay(r, journal_replay, on_start);
1028 });
1029
1030 // safely flush all in-flight events before starting external replay
1031 m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
1032 on_start));
1033}
1034
1035template <typename I>
1036void Journal<I>::handle_start_external_replay(int r,
1037 journal::Replay<I> **journal_replay,
1038 Context *on_finish) {
1039 CephContext *cct = m_image_ctx.cct;
1040 ldout(cct, 20) << this << " " << __func__ << dendl;
1041
1042 std::lock_guard locker{m_lock};
1043 ceph_assert(m_state == STATE_READY);
1044 ceph_assert(m_journal_replay == nullptr);
1045
1046 if (r < 0) {
1047 lderr(cct) << this << " " << __func__ << ": "
1048 << "failed to stop recording: " << cpp_strerror(r) << dendl;
1049 *journal_replay = nullptr;
1050
1051 // get back to a sane-state
1052 start_append();
1053 on_finish->complete(r);
1054 return;
1055 }
1056
1057 transition_state(STATE_REPLAYING, 0);
1058 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1059 *journal_replay = m_journal_replay;
1060 on_finish->complete(0);
1061}
1062
1063template <typename I>
1064void Journal<I>::stop_external_replay() {
1065 CephContext *cct = m_image_ctx.cct;
1066 ldout(cct, 20) << this << " " << __func__ << dendl;
1067
1068 std::lock_guard locker{m_lock};
1069 ceph_assert(m_journal_replay != nullptr);
1070 ceph_assert(m_state == STATE_REPLAYING);
1071
1072 delete m_journal_replay;
1073 m_journal_replay = nullptr;
1074
1075 if (m_close_pending) {
1076 destroy_journaler(0);
1077 return;
1078 }
1079
1080 start_append();
1081}
1082
1083template <typename I>
1084void Journal<I>::create_journaler() {
1085 CephContext *cct = m_image_ctx.cct;
1086 ldout(cct, 20) << this << " " << __func__ << dendl;
1087
1088 ceph_assert(ceph_mutex_is_locked(m_lock));
1089 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
1090 ceph_assert(m_journaler == NULL);
1091
1092 transition_state(STATE_INITIALIZING, 0);
1093 ::journal::Settings settings;
1094 settings.commit_interval =
1095 m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
1096 settings.max_payload_bytes =
1097 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
1098 settings.max_concurrent_object_sets =
1099 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
1100 // TODO: a configurable filter to exclude certain peers from being
1101 // disconnected.
1102 settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
1103
1104 m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
1105 m_image_ctx.md_ctx, m_image_ctx.id,
1106 IMAGE_CLIENT_ID, settings, nullptr);
1107 m_journaler->add_listener(&m_metadata_listener);
1108
1109 Context *ctx = create_async_context_callback(
1110 m_image_ctx, create_context_callback<
1111 Journal<I>, &Journal<I>::handle_open>(this));
1112 auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
1113 &m_lock, &m_client_meta,
1114 &m_tag_tid, &m_tag_data, ctx);
1115 open_req->send();
1116}
1117
1118template <typename I>
1119void Journal<I>::destroy_journaler(int r) {
1120 CephContext *cct = m_image_ctx.cct;
1121 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1122
1123 ceph_assert(ceph_mutex_is_locked(m_lock));
1124
1125 delete m_journal_replay;
1126 m_journal_replay = NULL;
1127
1128 m_journaler->remove_listener(&m_metadata_listener);
1129
1130 transition_state(STATE_CLOSING, r);
1131
1132 Context *ctx = create_async_context_callback(
1133 m_image_ctx, create_context_callback<
1134 Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
1135 ctx = new LambdaContext(
1136 [this, ctx](int r) {
1137 std::lock_guard locker{m_lock};
1138 m_journaler->shut_down(ctx);
1139 });
1140 ctx = create_async_context_callback(m_image_ctx, ctx);
1141 m_async_journal_op_tracker.wait_for_ops(ctx);
1142}
1143
1144template <typename I>
1145void Journal<I>::recreate_journaler(int r) {
1146 CephContext *cct = m_image_ctx.cct;
1147 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1148
1149 ceph_assert(ceph_mutex_is_locked(m_lock));
1150 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1151 m_state == STATE_FLUSHING_REPLAY);
1152
1153 delete m_journal_replay;
1154 m_journal_replay = NULL;
1155
1156 m_journaler->remove_listener(&m_metadata_listener);
1157
1158 transition_state(STATE_RESTARTING_REPLAY, r);
1159 m_journaler->shut_down(create_async_context_callback(
1160 m_image_ctx, create_context_callback<
1161 Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
1162}
1163
1164template <typename I>
1165void Journal<I>::complete_event(typename Events::iterator it, int r) {
1166 ceph_assert(ceph_mutex_is_locked(m_event_lock));
1167 ceph_assert(m_state == STATE_READY);
1168
1169 CephContext *cct = m_image_ctx.cct;
1170 ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
1171 << "r=" << r << dendl;
1172
1173 Event &event = it->second;
1174 if (r < 0 && r == event.filter_ret_val) {
1175 // ignore allowed error codes
1176 r = 0;
1177 }
1178 if (r < 0) {
1179 // event recorded to journal but failed to update disk, we cannot
1180 // commit this IO event. this event must be replayed.
1181 ceph_assert(event.safe);
1182 lderr(cct) << this << " " << __func__ << ": "
1183 << "failed to commit IO to disk, replay required: "
1184 << cpp_strerror(r) << dendl;
1185 }
1186
1187 event.committed_io = true;
1188 if (event.safe) {
1189 if (r >= 0) {
1190 for (auto &future : event.futures) {
1191 m_journaler->committed(future);
1192 }
1193 }
1194 m_events.erase(it);
1195 }
1196}
1197
1198template <typename I>
1199void Journal<I>::start_append() {
1200 ceph_assert(ceph_mutex_is_locked(m_lock));
1201
1202 m_journaler->start_append(
1203 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
1204 if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
1205 m_journaler->set_append_batch_options(
1206 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
1207 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
1208 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
1209 }
1210
1211 transition_state(STATE_READY, 0);
1212}
1213
1214template <typename I>
1215void Journal<I>::handle_open(int r) {
1216 CephContext *cct = m_image_ctx.cct;
1217 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1218
1219 std::lock_guard locker{m_lock};
1220 ceph_assert(m_state == STATE_INITIALIZING);
1221
1222 if (r < 0) {
1223 lderr(cct) << this << " " << __func__ << ": "
1224 << "failed to initialize journal: " << cpp_strerror(r)
1225 << dendl;
1226 destroy_journaler(r);
1227 return;
1228 }
1229
1230 m_tag_class = m_client_meta.tag_class;
1231 m_max_append_size = m_journaler->get_max_append_size();
1232 ldout(cct, 20) << this << " " << __func__ << ": "
1233 << "tag_class=" << m_tag_class << ", "
1234 << "max_append_size=" << m_max_append_size << dendl;
1235
1236 transition_state(STATE_REPLAYING, 0);
1237 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1238 m_journaler->start_replay(&m_replay_handler);
1239}
1240
1241template <typename I>
1242void Journal<I>::handle_replay_ready() {
1243 CephContext *cct = m_image_ctx.cct;
1244 ReplayEntry replay_entry;
1245 {
1246 std::lock_guard locker{m_lock};
1247 if (m_state != STATE_REPLAYING) {
1248 return;
1249 }
1250
1251 ldout(cct, 20) << this << " " << __func__ << dendl;
1252 if (!m_journaler->try_pop_front(&replay_entry)) {
1253 return;
1254 }
1255
1256 // only one entry should be in-flight at a time
1257 ceph_assert(!m_processing_entry);
1258 m_processing_entry = true;
1259 }
1260
1261 m_async_journal_op_tracker.start_op();
1262
1263 bufferlist data = replay_entry.get_data();
1264 auto it = data.cbegin();
1265
1266 journal::EventEntry event_entry;
1267 int r = m_journal_replay->decode(&it, &event_entry);
1268 if (r < 0) {
1269 lderr(cct) << this << " " << __func__ << ": "
1270 << "failed to decode journal event entry" << dendl;
1271 handle_replay_process_safe(replay_entry, r);
1272 return;
1273 }
1274
1275 Context *on_ready = create_context_callback<
1276 Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
1277 Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
1278 m_journal_replay->process(event_entry, on_ready, on_commit);
1279}
1280
1281template <typename I>
1282void Journal<I>::handle_replay_complete(int r) {
1283 CephContext *cct = m_image_ctx.cct;
1284
1285 bool cancel_ops = false;
1286 {
1287 std::lock_guard locker{m_lock};
1288 if (m_state != STATE_REPLAYING) {
1289 return;
1290 }
1291
1292 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1293 if (r < 0) {
1294 cancel_ops = true;
1295 transition_state(STATE_FLUSHING_RESTART, r);
1296 } else {
1297 // state might change back to FLUSHING_RESTART on flush error
1298 transition_state(STATE_FLUSHING_REPLAY, 0);
1299 }
1300 }
1301
1302 Context *ctx = new LambdaContext([this, cct](int r) {
1303 ldout(cct, 20) << this << " handle_replay_complete: "
1304 << "handle shut down replay" << dendl;
1305
1306 State state;
1307 {
1308 std::lock_guard locker{m_lock};
1309 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1310 m_state == STATE_FLUSHING_REPLAY);
1311 state = m_state;
1312 }
1313
1314 if (state == STATE_FLUSHING_RESTART) {
1315 handle_flushing_restart(0);
1316 } else {
1317 handle_flushing_replay();
1318 }
1319 });
1320 ctx = new LambdaContext([this, ctx](int r) {
1321 // ensure the commit position is flushed to disk
1322 m_journaler->flush_commit_position(ctx);
1323 });
1324 ctx = create_async_context_callback(m_image_ctx, ctx);
1325 ctx = new LambdaContext([this, ctx](int r) {
1326 m_async_journal_op_tracker.wait_for_ops(ctx);
1327 });
1328 ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
1329 ldout(cct, 20) << this << " handle_replay_complete: "
1330 << "shut down replay" << dendl;
1331 m_journal_replay->shut_down(cancel_ops, ctx);
1332 });
1333
1334 m_journaler->stop_replay(ctx);
1335}
1336
1337template <typename I>
1338void Journal<I>::handle_replay_process_ready(int r) {
1339 // journal::Replay is ready for more events -- attempt to pop another
1340 CephContext *cct = m_image_ctx.cct;
1341 ldout(cct, 20) << this << " " << __func__ << dendl;
1342
1343 ceph_assert(r == 0);
1344 {
1345 std::lock_guard locker{m_lock};
1346 ceph_assert(m_processing_entry);
1347 m_processing_entry = false;
1348 }
1349 handle_replay_ready();
1350}
1351
1352template <typename I>
1353void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
1354 CephContext *cct = m_image_ctx.cct;
1355
1356 std::unique_lock locker{m_lock};
1357 ceph_assert(m_state == STATE_REPLAYING ||
1358 m_state == STATE_FLUSHING_RESTART ||
1359 m_state == STATE_FLUSHING_REPLAY);
1360
1361 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1362 if (r < 0) {
1363 if (r != -ECANCELED) {
1364 lderr(cct) << this << " " << __func__ << ": "
1365 << "failed to commit journal event to disk: "
1366 << cpp_strerror(r) << dendl;
1367 }
1368
1369 if (m_state == STATE_REPLAYING) {
1370 // abort the replay if we have an error
1371 transition_state(STATE_FLUSHING_RESTART, r);
1372 locker.unlock();
1373
1374 // stop replay, shut down, and restart
1375 Context* ctx = create_context_callback<
1376 Journal<I>, &Journal<I>::handle_flushing_restart>(this);
1377 ctx = new LambdaContext([this, ctx](int r) {
1378 // ensure the commit position is flushed to disk
1379 m_journaler->flush_commit_position(ctx);
1380 });
1381 ctx = new LambdaContext([this, cct, ctx](int r) {
1382 ldout(cct, 20) << this << " handle_replay_process_safe: "
1383 << "shut down replay" << dendl;
1384 {
1385 std::lock_guard locker{m_lock};
1386 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1387 }
1388
1389 m_journal_replay->shut_down(true, ctx);
1390 });
1391 m_journaler->stop_replay(ctx);
1392 m_async_journal_op_tracker.finish_op();
1393 return;
1394 } else if (m_state == STATE_FLUSHING_REPLAY) {
1395 // end-of-replay flush in-progress -- we need to restart replay
1396 transition_state(STATE_FLUSHING_RESTART, r);
1397 locker.unlock();
1398 m_async_journal_op_tracker.finish_op();
1399 return;
1400 }
1401 } else {
1402 // only commit the entry if written successfully
1403 m_journaler->committed(replay_entry);
1404 }
1405 locker.unlock();
1406 m_async_journal_op_tracker.finish_op();
1407}
1408
1409template <typename I>
1410void Journal<I>::handle_flushing_restart(int r) {
1411 std::lock_guard locker{m_lock};
1412
1413 CephContext *cct = m_image_ctx.cct;
1414 ldout(cct, 20) << this << " " << __func__ << dendl;
1415
1416 ceph_assert(r == 0);
1417 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1418 if (m_close_pending) {
1419 destroy_journaler(r);
1420 return;
1421 }
1422
1423 recreate_journaler(r);
1424}
1425
1426template <typename I>
1427void Journal<I>::handle_flushing_replay() {
1428 std::lock_guard locker{m_lock};
1429
1430 CephContext *cct = m_image_ctx.cct;
1431 ldout(cct, 20) << this << " " << __func__ << dendl;
1432
1433 ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
1434 m_state == STATE_FLUSHING_RESTART);
1435 if (m_close_pending) {
1436 destroy_journaler(0);
1437 return;
1438 } else if (m_state == STATE_FLUSHING_RESTART) {
1439 // failed to replay one-or-more events -- restart
1440 recreate_journaler(0);
1441 return;
1442 }
1443
1444 delete m_journal_replay;
1445 m_journal_replay = NULL;
1446
1447 m_error_result = 0;
1448 start_append();
1449}
1450
1451template <typename I>
1452void Journal<I>::handle_recording_stopped(int r) {
1453 CephContext *cct = m_image_ctx.cct;
1454 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1455
1456 std::lock_guard locker{m_lock};
1457 ceph_assert(m_state == STATE_STOPPING);
1458
1459 destroy_journaler(r);
1460}
1461
1462template <typename I>
1463void Journal<I>::handle_journal_destroyed(int r) {
1464 CephContext *cct = m_image_ctx.cct;
1465 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1466
1467 if (r < 0) {
1468 lderr(cct) << this << " " << __func__ << ": "
1469 << "error detected while closing journal: " << cpp_strerror(r)
1470 << dendl;
1471 }
1472
1473 std::lock_guard locker{m_lock};
1474 delete m_journaler;
1475 m_journaler = nullptr;
1476
1477 ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
1478 if (m_state == STATE_RESTARTING_REPLAY) {
1479 create_journaler();
1480 return;
1481 }
1482
1483 transition_state(STATE_CLOSED, r);
1484}
1485
1486template <typename I>
1487void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
1488 CephContext *cct = m_image_ctx.cct;
1489 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1490 << "tid=" << tid << dendl;
1491
1492 // journal will be flushed before closing
1493 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1494 if (r < 0) {
1495 lderr(cct) << this << " " << __func__ << ": "
1496 << "failed to commit IO event: " << cpp_strerror(r) << dendl;
1497 }
1498
1499 Contexts on_safe_contexts;
1500 {
1501 std::lock_guard event_locker{m_event_lock};
1502 typename Events::iterator it = m_events.find(tid);
1503 ceph_assert(it != m_events.end());
1504
1505 Event &event = it->second;
1506 on_safe_contexts.swap(event.on_safe_contexts);
1507
1508 if (r < 0 || event.committed_io) {
1509 // failed journal write so IO won't be sent -- or IO extent was
1510 // overwritten by future IO operations so this was a no-op IO event
1511 event.ret_val = r;
1512 for (auto &future : event.futures) {
1513 m_journaler->committed(future);
1514 }
1515 }
1516
1517 if (event.committed_io) {
1518 m_events.erase(it);
1519 } else {
1520 event.safe = true;
1521 }
1522 }
1523
1524 ldout(cct, 20) << this << " " << __func__ << ": "
1525 << "completing tid=" << tid << dendl;
1526
1527 // alert the cache about the journal event status
1528 for (Contexts::iterator it = on_safe_contexts.begin();
1529 it != on_safe_contexts.end(); ++it) {
1530 (*it)->complete(r);
1531 }
1532}
1533
1534template <typename I>
1535void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
1536 const Future &op_start_future,
1537 const Future &op_finish_future,
1538 Context *on_safe) {
1539 CephContext *cct = m_image_ctx.cct;
1540 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1541 << "tid=" << tid << dendl;
1542
1543 // journal will be flushed before closing
1544 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1545 if (r < 0) {
1546 lderr(cct) << this << " " << __func__ << ": "
1547 << "failed to commit op event: " << cpp_strerror(r) << dendl;
1548 }
1549
1550 m_journaler->committed(op_start_future);
1551 m_journaler->committed(op_finish_future);
1552
1553 // reduce the replay window after committing an op event
1554 m_journaler->flush_commit_position(on_safe);
1555}
1556
1557template <typename I>
1558void Journal<I>::stop_recording() {
1559 ceph_assert(ceph_mutex_is_locked(m_lock));
1560 ceph_assert(m_journaler != NULL);
1561
1562 ceph_assert(m_state == STATE_READY);
1563 transition_state(STATE_STOPPING, 0);
1564
1565 m_journaler->stop_append(util::create_async_context_callback(
1566 m_image_ctx, create_context_callback<
1567 Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
1568}
1569
1570template <typename I>
1571void Journal<I>::transition_state(State state, int r) {
1572 CephContext *cct = m_image_ctx.cct;
1573 ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
1574 ceph_assert(ceph_mutex_is_locked(m_lock));
1575 m_state = state;
1576
1577 if (m_error_result == 0 && r < 0) {
1578 m_error_result = r;
1579 }
1580
1581 if (is_steady_state()) {
1582 Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
1583 for (auto ctx : wait_for_state_contexts) {
1584 ctx->complete(m_error_result);
1585 }
1586 }
1587}
1588
1589template <typename I>
1590bool Journal<I>::is_steady_state() const {
1591 ceph_assert(ceph_mutex_is_locked(m_lock));
1592 switch (m_state) {
1593 case STATE_READY:
1594 case STATE_CLOSED:
1595 return true;
1596 case STATE_UNINITIALIZED:
1597 case STATE_INITIALIZING:
1598 case STATE_REPLAYING:
1599 case STATE_FLUSHING_RESTART:
1600 case STATE_RESTARTING_REPLAY:
1601 case STATE_FLUSHING_REPLAY:
1602 case STATE_STOPPING:
1603 case STATE_CLOSING:
1604 break;
1605 }
1606 return false;
1607}
1608
1609template <typename I>
1610void Journal<I>::wait_for_steady_state(Context *on_state) {
1611 ceph_assert(ceph_mutex_is_locked(m_lock));
1612 ceph_assert(!is_steady_state());
1613
1614 CephContext *cct = m_image_ctx.cct;
1615 ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
1616 << dendl;
1617 m_wait_for_state_contexts.push_back(on_state);
1618}
1619
1620template <typename I>
1621int Journal<I>::is_resync_requested(bool *do_resync) {
1622 std::lock_guard l{m_lock};
1623 return check_resync_requested(do_resync);
1624}
1625
1626template <typename I>
1627int Journal<I>::check_resync_requested(bool *do_resync) {
1628 CephContext *cct = m_image_ctx.cct;
1629 ldout(cct, 20) << this << " " << __func__ << dendl;
1630
1631 ceph_assert(ceph_mutex_is_locked(m_lock));
1632 ceph_assert(do_resync != nullptr);
1633
1634 cls::journal::Client client;
1635 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
1636 if (r < 0) {
1637 lderr(cct) << this << " " << __func__ << ": "
1638 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1639 return r;
1640 }
1641
1642 librbd::journal::ClientData client_data;
1643 auto bl_it = client.data.cbegin();
1644 try {
1645 decode(client_data, bl_it);
1646 } catch (const buffer::error &err) {
1647 lderr(cct) << this << " " << __func__ << ": "
1648 << "failed to decode client data: " << err << dendl;
1649 return -EINVAL;
1650 }
1651
1652 journal::ImageClientMeta *image_client_meta =
1653 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
1654 if (image_client_meta == nullptr) {
1655 lderr(cct) << this << " " << __func__ << ": "
1656 << "failed to access image client meta struct" << dendl;
1657 return -EINVAL;
1658 }
1659
1660 *do_resync = image_client_meta->resync_requested;
1661
1662 return 0;
1663}
1664
1665struct C_RefreshTags : public Context {
1666 AsyncOpTracker &async_op_tracker;
1667 Context *on_finish = nullptr;
1668
1669 ceph::mutex lock =
1670 ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
1671 uint64_t tag_tid = 0;
1672 journal::TagData tag_data;
1673
1674 explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
1675 : async_op_tracker(async_op_tracker) {
1676 async_op_tracker.start_op();
1677 }
1678 ~C_RefreshTags() override {
1679 async_op_tracker.finish_op();
1680 }
1681
1682 void finish(int r) override {
1683 on_finish->complete(r);
1684 }
1685};
1686
1687template <typename I>
1688void Journal<I>::handle_metadata_updated() {
1689 CephContext *cct = m_image_ctx.cct;
1690 std::lock_guard locker{m_lock};
1691
1692 if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1693 return;
1694 } else if (is_tag_owner(m_lock)) {
1695 ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
1696 return;
1697 } else if (m_listeners.empty()) {
1698 ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
1699 return;
1700 }
1701
1702 uint64_t refresh_sequence = ++m_refresh_sequence;
1703 ldout(cct, 20) << this << " " << __func__ << ": "
1704 << "refresh_sequence=" << refresh_sequence << dendl;
1705
1706 // pull the most recent tags from the journal, decode, and
1707 // update the internal tag state
1708 C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
1709 refresh_ctx->on_finish = new LambdaContext(
1710 [this, refresh_sequence, refresh_ctx](int r) {
1711 handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
1712 refresh_ctx->tag_data, r);
1713 });
1714 C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
1715 cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
1716 &refresh_ctx->tag_data, refresh_ctx);
1717 m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
1718 &decode_tags_ctx->tags, decode_tags_ctx);
1719}
1720
1721template <typename I>
1722void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
1723 uint64_t tag_tid,
1724 journal::TagData tag_data, int r) {
1725 CephContext *cct = m_image_ctx.cct;
1726 std::unique_lock locker{m_lock};
1727
1728 if (r < 0) {
1729 lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
1730 << cpp_strerror(r) << dendl;
1731 return;
1732 } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1733 return;
1734 } else if (refresh_sequence != m_refresh_sequence) {
1735 // another, more up-to-date refresh is in-flight
1736 return;
1737 }
1738
1739 ldout(cct, 20) << this << " " << __func__ << ": "
1740 << "refresh_sequence=" << refresh_sequence << ", "
1741 << "tag_tid=" << tag_tid << ", "
1742 << "tag_data=" << tag_data << dendl;
1743 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1744
1745 bool was_tag_owner = is_tag_owner(m_lock);
1746 if (m_tag_tid < tag_tid) {
1747 m_tag_tid = tag_tid;
1748 m_tag_data = tag_data;
1749 }
1750 bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
1751
1752 bool resync_requested = false;
1753 r = check_resync_requested(&resync_requested);
1754 if (r < 0) {
1755 lderr(cct) << this << " " << __func__ << ": "
1756 << "failed to check if a resync was requested" << dendl;
1757 return;
1758 }
1759
1760 Listeners listeners(m_listeners);
1761 m_listener_notify = true;
1762 locker.unlock();
1763
1764 if (promoted_to_primary) {
1765 for (auto listener : listeners) {
1766 listener->handle_promoted();
1767 }
1768 } else if (resync_requested) {
1769 for (auto listener : listeners) {
1770 listener->handle_resync();
1771 }
1772 }
1773
1774 locker.lock();
1775 m_listener_notify = false;
1776 m_listener_cond.notify_all();
1777}
1778
1779template <typename I>
1780void Journal<I>::add_listener(journal::Listener *listener) {
1781 std::lock_guard locker{m_lock};
1782 m_listeners.insert(listener);
1783}
1784
1785template <typename I>
1786void Journal<I>::remove_listener(journal::Listener *listener) {
1787 std::unique_lock locker{m_lock};
1788 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1789 m_listeners.erase(listener);
1790}
1791
1792} // namespace librbd
1793
1794#ifndef TEST_F
1795template class librbd::Journal<librbd::ImageCtx>;
1796#endif