1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "librbd/Journal.h"
5#include "include/rados/librados.hpp"
6#include "common/AsyncOpTracker.h"
7#include "common/errno.h"
8#include "common/Timer.h"
9#include "common/WorkQueue.h"
10#include "cls/journal/cls_journal_types.h"
11#include "journal/Journaler.h"
12#include "journal/Policy.h"
13#include "journal/ReplayEntry.h"
14#include "journal/Settings.h"
15#include "journal/Utils.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/asio/ContextWQ.h"
18#include "librbd/io/ObjectDispatchSpec.h"
19#include "librbd/io/ObjectDispatcherInterface.h"
20#include "librbd/journal/CreateRequest.h"
21#include "librbd/journal/DemoteRequest.h"
22#include "librbd/journal/ObjectDispatch.h"
23#include "librbd/journal/OpenRequest.h"
24#include "librbd/journal/RemoveRequest.h"
25#include "librbd/journal/ResetRequest.h"
26#include "librbd/journal/Replay.h"
27#include "librbd/journal/PromoteRequest.h"
28
29#include <boost/scope_exit.hpp>
30#include <utility>
31
32#define dout_subsys ceph_subsys_rbd
33#undef dout_prefix
34#define dout_prefix *_dout << "librbd::Journal: "
35
36namespace librbd {
37
38using util::create_async_context_callback;
39using util::create_context_callback;
40using journal::util::C_DecodeTag;
41using journal::util::C_DecodeTags;
42using io::Extents;
43
44namespace {
45
46// TODO: once journaler is 100% async and converted to ASIO, remove separate
47// threads and reuse librbd's AsioEngine
48class ThreadPoolSingleton : public ThreadPool {
49public:
50 ContextWQ *work_queue;
51
52 explicit ThreadPoolSingleton(CephContext *cct)
53 : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
54 work_queue(new ContextWQ("librbd::journal::work_queue",
55 ceph::make_timespan(
56 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
57 this)) {
58 start();
59 }
60 ~ThreadPoolSingleton() override {
61 work_queue->drain();
62 delete work_queue;
63
64 stop();
65 }
66};
67
68template <typename I>
69struct C_IsTagOwner : public Context {
70 librados::IoCtx &io_ctx;
71 std::string image_id;
72 bool *is_tag_owner;
73 asio::ContextWQ *op_work_queue;
74 Context *on_finish;
75
76 CephContext *cct = nullptr;
77 Journaler *journaler;
78 cls::journal::Client client;
79 journal::ImageClientMeta client_meta;
80 uint64_t tag_tid = 0;
81 journal::TagData tag_data;
82
83 C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
84 bool *is_tag_owner, asio::ContextWQ *op_work_queue,
85 Context *on_finish)
86 : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
87 op_work_queue(op_work_queue), on_finish(on_finish),
88 cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
89 journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
90 {}, nullptr)) {
91 }
92
93 void finish(int r) override {
94 ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
95 << dendl;
96 if (r < 0) {
97 lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
98 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
99 } else {
100 *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
101 }
102
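    // queue the completion on the work queue so the Journaler is destroyed
    // outside of its own callback context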
103 Journaler *journaler = this->journaler;
104 Context *on_finish = this->on_finish;
105 auto ctx = new LambdaContext(
106 [journaler, on_finish](int r) {
107 on_finish->complete(r);
108 delete journaler;
109 });
110 op_work_queue->queue(ctx, r);
111 }
112};
113
114struct C_GetTagOwner : public Context {
115 std::string *mirror_uuid;
116 Context *on_finish;
117
118 Journaler journaler;
119 cls::journal::Client client;
120 journal::ImageClientMeta client_meta;
121 uint64_t tag_tid = 0;
122 journal::TagData tag_data;
123
124 C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
125 std::string *mirror_uuid, Context *on_finish)
126 : mirror_uuid(mirror_uuid), on_finish(on_finish),
127 journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
128 }
129
 130 void finish(int r) override {
131 if (r >= 0) {
132 *mirror_uuid = tag_data.mirror_uuid;
133 }
134 on_finish->complete(r);
135 }
136};
137
138template <typename J>
139struct GetTagsRequest {
140 CephContext *cct;
141 J *journaler;
142 cls::journal::Client *client;
143 journal::ImageClientMeta *client_meta;
144 uint64_t *tag_tid;
145 journal::TagData *tag_data;
146 Context *on_finish;
147
148 ceph::mutex lock = ceph::make_mutex("lock");
149
150 GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
151 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
152 journal::TagData *tag_data, Context *on_finish)
153 : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
154 tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
155 }
156
157 /**
158 * @verbatim
159 *
160 * <start>
161 * |
162 * v
163 * GET_CLIENT * * * * * * * * * * * *
164 * | *
165 * v *
166 * GET_TAGS * * * * * * * * * * * * * (error)
167 * | *
168 * v *
169 * <finish> * * * * * * * * * * * * *
170 *
171 * @endverbatim
172 */
173
174 void send() {
175 send_get_client();
176 }
177
178 void send_get_client() {
179 ldout(cct, 20) << __func__ << dendl;
180
181 auto ctx = new LambdaContext(
182 [this](int r) {
183 handle_get_client(r);
184 });
185 journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
186 }
187
188 void handle_get_client(int r) {
189 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
190
191 if (r < 0) {
192 complete(r);
193 return;
194 }
195
196 librbd::journal::ClientData client_data;
197 auto bl_it = client->data.cbegin();
198 try {
199 decode(client_data, bl_it);
200 } catch (const buffer::error &err) {
201 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
202 << "failed to decode client data" << dendl;
203 complete(-EBADMSG);
204 return;
205 }
206
207 journal::ImageClientMeta *image_client_meta =
208 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
209 if (image_client_meta == nullptr) {
210 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
211 << "failed to get client meta" << dendl;
212 complete(-EINVAL);
213 return;
214 }
215 *client_meta = *image_client_meta;
216
217 send_get_tags();
218 }
219
220 void send_get_tags() {
221 ldout(cct, 20) << __func__ << dendl;
222
223 auto ctx = new LambdaContext(
224 [this](int r) {
225 handle_get_tags(r);
226 });
227 C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
228 ctx);
229 journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
230 }
231
232 void handle_get_tags(int r) {
233 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
234
235 complete(r);
236 }
237
238 void complete(int r) {
239 on_finish->complete(r);
240 delete this;
241 }
242};
243
244template <typename J>
245void get_tags(CephContext *cct, J *journaler,
246 cls::journal::Client *client,
247 journal::ImageClientMeta *client_meta,
248 uint64_t *tag_tid, journal::TagData *tag_data,
249 Context *on_finish) {
250 ldout(cct, 20) << __func__ << dendl;
251
252 GetTagsRequest<J> *req =
253 new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
254 tag_data, on_finish);
255 req->send();
256}
257
258template <typename J>
259int allocate_journaler_tag(CephContext *cct, J *journaler,
260 uint64_t tag_class,
261 const journal::TagPredecessor &predecessor,
262 const std::string &mirror_uuid,
263 cls::journal::Tag *new_tag) {
264 journal::TagData tag_data;
265 tag_data.mirror_uuid = mirror_uuid;
266 tag_data.predecessor = predecessor;
267
268 bufferlist tag_bl;
269 encode(tag_data, tag_bl);
270
271 C_SaferCond allocate_tag_ctx;
272 journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
273
274 int r = allocate_tag_ctx.wait();
275 if (r < 0) {
276 lderr(cct) << __func__ << ": "
277 << "failed to allocate tag: " << cpp_strerror(r) << dendl;
278 return r;
279 }
280 return 0;
281}
282
283} // anonymous namespace
284
285// client id for local image
286template <typename I>
287const std::string Journal<I>::IMAGE_CLIENT_ID("");
288
289// mirror uuid to use for local images
290template <typename I>
291const std::string Journal<I>::LOCAL_MIRROR_UUID("");
292
293// mirror uuid to use for orphaned (demoted) images
294template <typename I>
295const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
296
297template <typename I>
298std::ostream &operator<<(std::ostream &os,
299 const typename Journal<I>::State &state) {
300 switch (state) {
301 case Journal<I>::STATE_UNINITIALIZED:
302 os << "Uninitialized";
303 break;
304 case Journal<I>::STATE_INITIALIZING:
305 os << "Initializing";
306 break;
307 case Journal<I>::STATE_REPLAYING:
308 os << "Replaying";
309 break;
310 case Journal<I>::STATE_FLUSHING_RESTART:
311 os << "FlushingRestart";
312 break;
313 case Journal<I>::STATE_RESTARTING_REPLAY:
314 os << "RestartingReplay";
315 break;
316 case Journal<I>::STATE_FLUSHING_REPLAY:
317 os << "FlushingReplay";
318 break;
319 case Journal<I>::STATE_READY:
320 os << "Ready";
321 break;
322 case Journal<I>::STATE_STOPPING:
323 os << "Stopping";
324 break;
325 case Journal<I>::STATE_CLOSING:
326 os << "Closing";
327 break;
328 case Journal<I>::STATE_CLOSED:
329 os << "Closed";
330 break;
331 default:
332 os << "Unknown (" << static_cast<uint32_t>(state) << ")";
333 break;
334 }
335 return os;
336}
337
338
339template <typename I>
340void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
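  // defer processing to the librbd journal work queue so the journaler's
  // callback thread is not blocked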
341 auto ctx = new LambdaContext([this](int r) {
342 journal->handle_metadata_updated();
343 });
344 journal->m_work_queue->queue(ctx, 0);
345}
346
347
348template <typename I>
349void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
350 auto thread_pool_singleton =
351 &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
352 "librbd::journal::thread_pool", false, cct);
353 *work_queue = thread_pool_singleton->work_queue;
354}
355
356template <typename I>
357Journal<I>::Journal(I &image_ctx)
358 : RefCountedObject(image_ctx.cct),
359 m_image_ctx(image_ctx), m_journaler(NULL),
360 m_state(STATE_UNINITIALIZED),
361 m_error_result(0), m_replay_handler(this), m_close_pending(false),
362 m_event_tid(0),
363 m_blocking_writes(false), m_journal_replay(NULL),
364 m_metadata_listener(this) {
365
366 CephContext *cct = m_image_ctx.cct;
367 ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
368
369 get_work_queue(cct, &m_work_queue);
370 ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
371}
372
373template <typename I>
374Journal<I>::~Journal() {
375 if (m_work_queue != nullptr) {
376 m_work_queue->drain();
377 }
378
379 std::lock_guard locker{m_lock};
380 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
381 ceph_assert(m_journaler == NULL);
382 ceph_assert(m_journal_replay == NULL);
383 ceph_assert(m_wait_for_state_contexts.empty());
384}
385
386template <typename I>
387bool Journal<I>::is_journal_supported(I &image_ctx) {
388 ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
389 return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
390 !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
391}
392
393template <typename I>
394int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
395 uint8_t order, uint8_t splay_width,
396 const std::string &object_pool) {
397 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
398 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
399
400 ContextWQ *work_queue;
401 get_work_queue(cct, &work_queue);
402
403 C_SaferCond cond;
404 journal::TagData tag_data(LOCAL_MIRROR_UUID);
405 journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
406 io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
407 tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
408 req->send();
409
410 return cond.wait();
411}
412
413template <typename I>
414int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
415 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
416 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
417
418 ContextWQ *work_queue;
419 get_work_queue(cct, &work_queue);
420
421 C_SaferCond cond;
422 journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
423 io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
424 req->send();
425
426 return cond.wait();
427}
428
429template <typename I>
430int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
431 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
432 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
433
434 ContextWQ *work_queue;
435 get_work_queue(cct, &work_queue);
436
437 C_SaferCond cond;
438 auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
439 Journal<>::LOCAL_MIRROR_UUID,
440 work_queue, &cond);
441 req->send();
442
443 return cond.wait();
444}
445
446template <typename I>
447void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
448 Context *on_finish) {
449 Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
450 image_ctx->op_work_queue, on_finish);
451}
452
453template <typename I>
454void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
455 bool *is_tag_owner,
456 asio::ContextWQ *op_work_queue,
457 Context *on_finish) {
458 CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
459 ldout(cct, 20) << __func__ << dendl;
460
461 C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
462 io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
463 get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
464 &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
465 &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
466}
467
468template <typename I>
469void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
470 std::string *mirror_uuid,
471 asio::ContextWQ *op_work_queue,
472 Context *on_finish) {
473 CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
474 ldout(cct, 20) << __func__ << dendl;
475
476 auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
477 get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
478 &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
479}
480
481template <typename I>
482int Journal<I>::request_resync(I *image_ctx) {
483 CephContext *cct = image_ctx->cct;
484 ldout(cct, 20) << __func__ << dendl;
485
486 Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
487 nullptr);
488
489 ceph::mutex lock = ceph::make_mutex("lock");
490 journal::ImageClientMeta client_meta;
491 uint64_t tag_tid;
492 journal::TagData tag_data;
493
494 C_SaferCond open_ctx;
495 auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
496 &client_meta, &tag_tid,
497 &tag_data, &open_ctx);
498 open_req->send();
499
500 BOOST_SCOPE_EXIT_ALL(&journaler) {
501 journaler.shut_down();
502 };
503
504 int r = open_ctx.wait();
505 if (r < 0) {
506 return r;
507 }
508
509 client_meta.resync_requested = true;
510
511 journal::ClientData client_data(client_meta);
512 bufferlist client_data_bl;
513 encode(client_data, client_data_bl);
514
515 C_SaferCond update_client_ctx;
516 journaler.update_client(client_data_bl, &update_client_ctx);
517
518 r = update_client_ctx.wait();
519 if (r < 0) {
520 lderr(cct) << __func__ << ": "
521 << "failed to update client: " << cpp_strerror(r) << dendl;
522 return r;
523 }
524 return 0;
525}
526
527template <typename I>
528void Journal<I>::promote(I *image_ctx, Context *on_finish) {
529 CephContext *cct = image_ctx->cct;
530 ldout(cct, 20) << __func__ << dendl;
531
532 auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
533 on_finish);
534 promote_req->send();
535}
536
537template <typename I>
538void Journal<I>::demote(I *image_ctx, Context *on_finish) {
539 CephContext *cct = image_ctx->cct;
540 ldout(cct, 20) << __func__ << dendl;
541
542 auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
543 req->send();
544}
545
546template <typename I>
547bool Journal<I>::is_journal_ready() const {
548 std::lock_guard locker{m_lock};
549 return (m_state == STATE_READY);
550}
551
552template <typename I>
553bool Journal<I>::is_journal_replaying() const {
554 std::lock_guard locker{m_lock};
555 return is_journal_replaying(m_lock);
556}
557
558template <typename I>
559bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
560 ceph_assert(ceph_mutex_is_locked(m_lock));
561 return (m_state == STATE_REPLAYING ||
562 m_state == STATE_FLUSHING_REPLAY ||
563 m_state == STATE_FLUSHING_RESTART ||
564 m_state == STATE_RESTARTING_REPLAY);
565}
566
567template <typename I>
568bool Journal<I>::is_journal_appending() const {
569 ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
570 std::lock_guard locker{m_lock};
571 return (m_state == STATE_READY &&
572 !m_image_ctx.get_journal_policy()->append_disabled());
573}
574
575template <typename I>
576void Journal<I>::wait_for_journal_ready(Context *on_ready) {
577 on_ready = create_async_context_callback(m_image_ctx, on_ready);
578
579 std::lock_guard locker{m_lock};
580 if (m_state == STATE_READY) {
581 on_ready->complete(m_error_result);
582 } else {
583 wait_for_steady_state(on_ready);
584 }
585}
586
587template <typename I>
588void Journal<I>::open(Context *on_finish) {
589 CephContext *cct = m_image_ctx.cct;
590 ldout(cct, 20) << this << " " << __func__ << dendl;
591
592 on_finish = create_context_callback<Context>(on_finish, this);
593
594 on_finish = create_async_context_callback(m_image_ctx, on_finish);
595
596 // inject our handler into the object dispatcher chain
597 m_image_ctx.io_object_dispatcher->register_dispatch(
598 journal::ObjectDispatch<I>::create(&m_image_ctx, this));
599
600 std::lock_guard locker{m_lock};
601 ceph_assert(m_state == STATE_UNINITIALIZED);
602 wait_for_steady_state(on_finish);
603 create_journaler();
604}
605
606template <typename I>
607void Journal<I>::close(Context *on_finish) {
608 CephContext *cct = m_image_ctx.cct;
609 ldout(cct, 20) << this << " " << __func__ << dendl;
610
611 on_finish = create_context_callback<Context>(on_finish, this);
612
613 on_finish = new LambdaContext([this, on_finish](int r) {
614 // remove our handler from object dispatcher chain - preserve error
615 auto ctx = new LambdaContext([on_finish, r](int _) {
616 on_finish->complete(r);
617 });
618 m_image_ctx.io_object_dispatcher->shut_down_dispatch(
619 io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
620 });
621 on_finish = create_async_context_callback(m_image_ctx, on_finish);
622
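  // notify listeners of the close outside of m_lock; m_listener_notify
  // serializes these notifications with add_listener/remove_listener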
623 std::unique_lock locker{m_lock};
624 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
625
626 Listeners listeners(m_listeners);
627 m_listener_notify = true;
628 locker.unlock();
629 for (auto listener : listeners) {
630 listener->handle_close();
631 }
632
633 locker.lock();
634 m_listener_notify = false;
635 m_listener_cond.notify_all();
636
637 ceph_assert(m_state != STATE_UNINITIALIZED);
638 if (m_state == STATE_CLOSED) {
639 on_finish->complete(m_error_result);
640 return;
641 }
642
643 if (m_state == STATE_READY) {
644 stop_recording();
645 }
646
647 m_close_pending = true;
648 wait_for_steady_state(on_finish);
649}
650
651template <typename I>
652bool Journal<I>::is_tag_owner() const {
653 std::lock_guard locker{m_lock};
654 return is_tag_owner(m_lock);
655}
656
657template <typename I>
658bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
659 ceph_assert(ceph_mutex_is_locked(m_lock));
660 return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
661}
662
663template <typename I>
664uint64_t Journal<I>::get_tag_tid() const {
665 std::lock_guard locker{m_lock};
666 return m_tag_tid;
667}
668
669template <typename I>
670journal::TagData Journal<I>::get_tag_data() const {
671 std::lock_guard locker{m_lock};
672 return m_tag_data;
673}
674
675template <typename I>
676void Journal<I>::allocate_local_tag(Context *on_finish) {
677 CephContext *cct = m_image_ctx.cct;
678 ldout(cct, 20) << this << " " << __func__ << dendl;
679
680 journal::TagPredecessor predecessor;
681 predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
682 {
683 std::lock_guard locker{m_lock};
684 ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));
685
686 cls::journal::Client client;
687 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
688 if (r < 0) {
689 lderr(cct) << this << " " << __func__ << ": "
690 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
691 m_image_ctx.op_work_queue->queue(on_finish, r);
692 return;
693 }
694
695 // since we are primary, populate the predecessor with our known commit
696 // position
697 ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
698 if (!client.commit_position.object_positions.empty()) {
699 auto position = client.commit_position.object_positions.front();
700 predecessor.commit_valid = true;
701 predecessor.tag_tid = position.tag_tid;
702 predecessor.entry_tid = position.entry_tid;
703 }
704 }
705
706 allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
707}
708
709template <typename I>
710void Journal<I>::allocate_tag(const std::string &mirror_uuid,
711 const journal::TagPredecessor &predecessor,
712 Context *on_finish) {
713 CephContext *cct = m_image_ctx.cct;
714 ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
715 << dendl;
716
717 std::lock_guard locker{m_lock};
718 ceph_assert(m_journaler != nullptr);
719
720 journal::TagData tag_data;
721 tag_data.mirror_uuid = mirror_uuid;
722 tag_data.predecessor = predecessor;
723
724 bufferlist tag_bl;
725 encode(tag_data, tag_bl);
726
727 C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
728 &m_tag_data, on_finish);
729 m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
730 decode_tag_ctx);
731}
732
733template <typename I>
734void Journal<I>::flush_commit_position(Context *on_finish) {
735 CephContext *cct = m_image_ctx.cct;
736 ldout(cct, 20) << this << " " << __func__ << dendl;
737
738 std::lock_guard locker{m_lock};
739 ceph_assert(m_journaler != nullptr);
740 m_journaler->flush_commit_position(on_finish);
741}
742
743template <typename I>
744void Journal<I>::user_flushed() {
745 if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
746 m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
747 std::lock_guard locker{m_lock};
748 if (m_state == STATE_READY) {
749 CephContext *cct = m_image_ctx.cct;
750 ldout(cct, 5) << this << " " << __func__ << dendl;
751
752 ceph_assert(m_journaler != nullptr);
753 m_journaler->set_append_batch_options(
754 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
755 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
756 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
757 } else {
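      // state changed while acquiring the lock -- reset the flag so a
      // later flush can retry switching to batched appends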
758 m_user_flushed = false;
759 }
760 }
761}
762
763template <typename I>
764void Journal<I>::add_write_event_entries(uint64_t offset, size_t length,
765 const bufferlist &bl,
766 uint64_t buffer_offset,
767 Bufferlists *bufferlists) {
768 ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
769 const uint64_t max_write_data_size =
770 m_max_append_size - journal::AioWriteEvent::get_fixed_size();
771
772 // ensure that the write event fits within the journal entry
773 uint64_t bytes_remaining = length;
774 uint64_t event_offset = 0;
775 do {
776 uint64_t event_length = std::min(bytes_remaining, max_write_data_size);
777
778 bufferlist event_bl;
779 event_bl.substr_of(bl, buffer_offset + event_offset, event_length);
780 journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
781 event_length,
782 event_bl),
783 ceph_clock_now());
784
785 bufferlists->emplace_back();
786 encode(event_entry, bufferlists->back());
787
788 event_offset += event_length;
789 bytes_remaining -= event_length;
790 } while (bytes_remaining > 0);
791}
792
793template <typename I>
794uint64_t Journal<I>::append_write_event(const Extents &image_extents,
795 const bufferlist &bl,
796 bool flush_entry) {
797 Bufferlists bufferlists;
798 uint64_t buffer_offset = 0;
799 for (auto &extent : image_extents) {
800 add_write_event_entries(extent.first, extent.second, bl, buffer_offset,
801 &bufferlists);
802
803 buffer_offset += extent.second;
804 }
805
806 return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists,
807 image_extents, flush_entry, 0);
808}
809
810template <typename I>
811uint64_t Journal<I>::append_write_same_event(const Extents &image_extents,
812 const bufferlist &bl,
813 bool flush_entry) {
814 Bufferlists bufferlists;
815 for (auto &extent : image_extents) {
816 journal::EventEntry event_entry(
817 journal::AioWriteSameEvent(extent.first, extent.second, bl),
818 ceph_clock_now());
819
820 bufferlists.emplace_back();
821 encode(event_entry, bufferlists.back());
822 }
823
824 return append_io_events(journal::EVENT_TYPE_AIO_WRITESAME, bufferlists,
825 image_extents, flush_entry, 0);
826}
827
828template <typename I>
829uint64_t Journal<I>::append_discard_event(const Extents &image_extents,
830 uint32_t discard_granularity_bytes,
831 bool flush_entry) {
832 Bufferlists bufferlists;
833 for (auto &extent : image_extents) {
834 journal::EventEntry event_entry(
835 journal::AioDiscardEvent(extent.first, extent.second,
836 discard_granularity_bytes),
837 ceph_clock_now());
838
839 bufferlists.emplace_back();
840 encode(event_entry, bufferlists.back());
841 }
842
843 return append_io_events(journal::EVENT_TYPE_AIO_DISCARD, bufferlists,
844 image_extents, flush_entry, 0);
845}
846
847template <typename I>
848uint64_t Journal<I>::append_compare_and_write_event(uint64_t offset,
849 size_t length,
850 const bufferlist &cmp_bl,
851 const bufferlist &write_bl,
852 bool flush_entry) {
853 ceph_assert(
854 m_max_append_size > journal::AioCompareAndWriteEvent::get_fixed_size());
855 uint64_t max_compare_and_write_data_size =
856 m_max_append_size - journal::AioCompareAndWriteEvent::get_fixed_size();
857 // we need double the size because we store cmp and write buffers
858 max_compare_and_write_data_size /= 2;
859
860 // ensure that the compare and write event fits within the journal entry
861 Bufferlists bufferlists;
862 uint64_t bytes_remaining = length;
863 uint64_t event_offset = 0;
864 do {
865 uint64_t event_length = std::min(bytes_remaining,
866 max_compare_and_write_data_size);
867
868 bufferlist event_cmp_bl;
869 event_cmp_bl.substr_of(cmp_bl, event_offset, event_length);
870 bufferlist event_write_bl;
871 event_write_bl.substr_of(write_bl, event_offset, event_length);
872 journal::EventEntry event_entry(
873 journal::AioCompareAndWriteEvent(offset + event_offset,
874 event_length,
875 event_cmp_bl,
876 event_write_bl),
877 ceph_clock_now());
878
879 bufferlists.emplace_back();
880 encode(event_entry, bufferlists.back());
881
882 event_offset += event_length;
883 bytes_remaining -= event_length;
884 } while (bytes_remaining > 0);
885
886 return append_io_events(journal::EVENT_TYPE_AIO_COMPARE_AND_WRITE,
887 bufferlists, {{offset, length}}, flush_entry,
888 -EILSEQ);
889}
890
891template <typename I>
892uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
893 uint64_t offset, size_t length,
894 bool flush_entry, int filter_ret_val) {
895 bufferlist bl;
896 event_entry.timestamp = ceph_clock_now();
897 encode(event_entry, bl);
898 return append_io_events(event_entry.get_event_type(), {bl},
899 {{offset, length}}, flush_entry, filter_ret_val);
900}
901
902template <typename I>
903uint64_t Journal<I>::append_io_events(journal::EventType event_type,
904 const Bufferlists &bufferlists,
905 const Extents &image_extents,
906 bool flush_entry, int filter_ret_val) {
907 ceph_assert(!bufferlists.empty());
908
909 uint64_t tid;
910 {
911 std::lock_guard locker{m_lock};
912 ceph_assert(m_state == STATE_READY);
913
914 tid = ++m_event_tid;
915 ceph_assert(tid != 0);
916 }
917
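  // append one journal entry per encoded bufferlist; the resulting futures
  // are tracked together as a single Event keyed by tid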
918 Futures futures;
919 for (auto &bl : bufferlists) {
920 ceph_assert(bl.length() <= m_max_append_size);
921 futures.push_back(m_journaler->append(m_tag_tid, bl));
922 }
923
924 {
925 std::lock_guard event_locker{m_event_lock};
926 m_events[tid] = Event(futures, image_extents, filter_ret_val);
927 }
928
929 CephContext *cct = m_image_ctx.cct;
930 ldout(cct, 20) << this << " " << __func__ << ": "
931 << "event=" << event_type << ", "
932 << "image_extents=" << image_extents << ", "
933 << "flush=" << flush_entry << ", tid=" << tid << dendl;
934
935 Context *on_safe = create_async_context_callback(
936 m_image_ctx, new C_IOEventSafe(this, tid));
937 if (flush_entry) {
938 futures.back().flush(on_safe);
939 } else {
940 futures.back().wait(on_safe);
941 }
942
943 return tid;
944}
945
946template <typename I>
947void Journal<I>::commit_io_event(uint64_t tid, int r) {
948 CephContext *cct = m_image_ctx.cct;
949 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
950 "r=" << r << dendl;
951
952 std::lock_guard event_locker{m_event_lock};
953 typename Events::iterator it = m_events.find(tid);
954 if (it == m_events.end()) {
955 return;
956 }
957 complete_event(it, r);
958}
959
960template <typename I>
961void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
962 uint64_t length, int r) {
963 ceph_assert(length > 0);
964
965 CephContext *cct = m_image_ctx.cct;
966 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
967 << "offset=" << offset << ", "
968 << "length=" << length << ", "
969 << "r=" << r << dendl;
970
971 std::lock_guard event_locker{m_event_lock};
972 typename Events::iterator it = m_events.find(tid);
973 if (it == m_events.end()) {
974 return;
975 }
976
977 Event &event = it->second;
978 if (event.ret_val == 0 && r < 0) {
979 event.ret_val = r;
980 }
981
982 ExtentInterval extent;
983 extent.insert(offset, length);
984
985 ExtentInterval intersect;
986 intersect.intersection_of(extent, event.pending_extents);
987
988 event.pending_extents.subtract(intersect);
989 if (!event.pending_extents.empty()) {
990 ldout(cct, 20) << this << " " << __func__ << ": "
991 << "pending extents: " << event.pending_extents << dendl;
992 return;
993 }
994 complete_event(it, event.ret_val);
995}
996
997template <typename I>
998void Journal<I>::append_op_event(uint64_t op_tid,
999 journal::EventEntry &&event_entry,
1000 Context *on_safe) {
1001 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
1002
1003 bufferlist bl;
1004 event_entry.timestamp = ceph_clock_now();
1005 encode(event_entry, bl);
1006
1007 Future future;
1008 {
1009 std::lock_guard locker{m_lock};
1010 ceph_assert(m_state == STATE_READY);
1011
1012 future = m_journaler->append(m_tag_tid, bl);
1013
1014 // delay committing op event to ensure consistent replay
1015 ceph_assert(m_op_futures.count(op_tid) == 0);
1016 m_op_futures[op_tid] = future;
1017 }
1018
1019 on_safe = create_async_context_callback(m_image_ctx, on_safe);
1020 on_safe = new LambdaContext([this, on_safe](int r) {
1021 // ensure all committed IO before this op is committed
1022 m_journaler->flush_commit_position(on_safe);
1023 });
1024 future.flush(on_safe);
1025
1026 CephContext *cct = m_image_ctx.cct;
1027 ldout(cct, 10) << this << " " << __func__ << ": "
1028 << "op_tid=" << op_tid << ", "
1029 << "event=" << event_entry.get_event_type() << dendl;
1030}
1031
1032template <typename I>
1033void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
1034 CephContext *cct = m_image_ctx.cct;
1035 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
1036 << "r=" << r << dendl;
1037
1038 journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
1039 ceph_clock_now());
1040
1041 bufferlist bl;
1042 encode(event_entry, bl);
1043
1044 Future op_start_future;
1045 Future op_finish_future;
1046 {
1047 std::lock_guard locker{m_lock};
1048 ceph_assert(m_state == STATE_READY);
1049
1050 // ready to commit op event
1051 auto it = m_op_futures.find(op_tid);
1052 ceph_assert(it != m_op_futures.end());
1053 op_start_future = it->second;
1054 m_op_futures.erase(it);
1055
1056 op_finish_future = m_journaler->append(m_tag_tid, bl);
1057 }
1058
1059 op_finish_future.flush(create_async_context_callback(
1060 m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
1061 op_finish_future, on_safe)));
1062}
1063
1064template <typename I>
1065void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
1066 CephContext *cct = m_image_ctx.cct;
1067 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
1068
1069 {
1070 std::lock_guard locker{m_lock};
1071 ceph_assert(m_journal_replay != nullptr);
1072 m_journal_replay->replay_op_ready(op_tid, on_resume);
1073 }
1074}
1075
1076template <typename I>
1077void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
1078 CephContext *cct = m_image_ctx.cct;
1079 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
1080 << "on_safe=" << on_safe << dendl;
1081
1082 on_safe = create_context_callback<Context>(on_safe, this);
1083
1084 Future future;
1085 {
1086 std::lock_guard event_locker{m_event_lock};
1087 future = wait_event(m_lock, tid, on_safe);
1088 }
1089
1090 if (future.is_valid()) {
1091 future.flush(nullptr);
1092 }
1093}
1094
1095template <typename I>
1096void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
1097 CephContext *cct = m_image_ctx.cct;
1098 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
1099 << "on_safe=" << on_safe << dendl;
1100
1101 on_safe = create_context_callback<Context>(on_safe, this);
1102
1103 std::lock_guard event_locker{m_event_lock};
1104 wait_event(m_lock, tid, on_safe);
1105}
1106
1107template <typename I>
1108typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
1109 Context *on_safe) {
1110 ceph_assert(ceph_mutex_is_locked(m_event_lock));
1111 CephContext *cct = m_image_ctx.cct;
1112
1113 typename Events::iterator it = m_events.find(tid);
1114 ceph_assert(it != m_events.end());
1115
1116 Event &event = it->second;
1117 if (event.safe) {
1118 // journal entry already safe
1119 ldout(cct, 20) << this << " " << __func__ << ": "
1120 << "journal entry already safe" << dendl;
1121 m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
1122 return Future();
1123 }
1124
1125 event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
1126 on_safe));
1127 return event.futures.back();
1128}
1129
1130template <typename I>
1131void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
1132 Context *on_start) {
1133 CephContext *cct = m_image_ctx.cct;
1134 ldout(cct, 20) << this << " " << __func__ << dendl;
1135
1136 std::lock_guard locker{m_lock};
1137 ceph_assert(m_state == STATE_READY);
1138 ceph_assert(m_journal_replay == nullptr);
1139
1140 on_start = util::create_async_context_callback(m_image_ctx, on_start);
1141 on_start = new LambdaContext(
1142 [this, journal_replay, on_start](int r) {
1143 handle_start_external_replay(r, journal_replay, on_start);
1144 });
1145
1146 // safely flush all in-flight events before starting external replay
1147 m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
1148 on_start));
1149}
1150
1151template <typename I>
1152void Journal<I>::handle_start_external_replay(int r,
1153 journal::Replay<I> **journal_replay,
1154 Context *on_finish) {
1155 CephContext *cct = m_image_ctx.cct;
1156 ldout(cct, 20) << this << " " << __func__ << dendl;
1157
1158 std::lock_guard locker{m_lock};
1159 ceph_assert(m_state == STATE_READY);
1160 ceph_assert(m_journal_replay == nullptr);
1161
1162 if (r < 0) {
1163 lderr(cct) << this << " " << __func__ << ": "
1164 << "failed to stop recording: " << cpp_strerror(r) << dendl;
1165 *journal_replay = nullptr;
1166
 1167 // get back to a sane state
1168 start_append();
1169 on_finish->complete(r);
1170 return;
1171 }
1172
1173 transition_state(STATE_REPLAYING, 0);
1174 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1175 *journal_replay = m_journal_replay;
1176 on_finish->complete(0);
1177}
1178
1179template <typename I>
1180void Journal<I>::stop_external_replay() {
1181 CephContext *cct = m_image_ctx.cct;
1182 ldout(cct, 20) << this << " " << __func__ << dendl;
1183
1184 std::lock_guard locker{m_lock};
1185 ceph_assert(m_journal_replay != nullptr);
1186 ceph_assert(m_state == STATE_REPLAYING);
1187
1188 delete m_journal_replay;
1189 m_journal_replay = nullptr;
1190
1191 if (m_close_pending) {
1192 destroy_journaler(0);
1193 return;
1194 }
1195
1196 start_append();
1197}
1198
1199template <typename I>
1200void Journal<I>::create_journaler() {
1201 CephContext *cct = m_image_ctx.cct;
1202 ldout(cct, 20) << this << " " << __func__ << dendl;
1203
1204 ceph_assert(ceph_mutex_is_locked(m_lock));
1205 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
1206 ceph_assert(m_journaler == NULL);
1207
1208 transition_state(STATE_INITIALIZING, 0);
1209 ::journal::Settings settings;
1210 settings.commit_interval =
1211 m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
1212 settings.max_payload_bytes =
1213 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
1214 settings.max_concurrent_object_sets =
1215 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
1216 // TODO: a configurable filter to exclude certain peers from being
1217 // disconnected.
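  // never disconnect the local image client for lagging behind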
1218 settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
1219
1220 m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
1221 m_image_ctx.md_ctx, m_image_ctx.id,
1222 IMAGE_CLIENT_ID, settings, nullptr);
1223 m_journaler->add_listener(&m_metadata_listener);
1224
1225 Context *ctx = create_async_context_callback(
1226 m_image_ctx, create_context_callback<
1227 Journal<I>, &Journal<I>::handle_open>(this));
1228 auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
1229 &m_lock, &m_client_meta,
1230 &m_tag_tid, &m_tag_data, ctx);
1231 open_req->send();
1232}
1233
1234template <typename I>
1235void Journal<I>::destroy_journaler(int r) {
1236 CephContext *cct = m_image_ctx.cct;
1237 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1238
1239 ceph_assert(ceph_mutex_is_locked(m_lock));
1240
1241 delete m_journal_replay;
1242 m_journal_replay = NULL;
1243
1244 m_journaler->remove_listener(&m_metadata_listener);
1245
1246 transition_state(STATE_CLOSING, r);
1247
1248 Context *ctx = create_async_context_callback(
1249 m_image_ctx, create_context_callback<
1250 Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
1251 ctx = new LambdaContext(
1252 [this, ctx](int r) {
1253 std::lock_guard locker{m_lock};
1254 m_journaler->shut_down(ctx);
1255 });
1256 ctx = create_async_context_callback(m_image_ctx, ctx);
1257 m_async_journal_op_tracker.wait_for_ops(ctx);
1258}
1259
1260template <typename I>
1261void Journal<I>::recreate_journaler(int r) {
1262 CephContext *cct = m_image_ctx.cct;
1263 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1264
1265 ceph_assert(ceph_mutex_is_locked(m_lock));
1266 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1267 m_state == STATE_FLUSHING_REPLAY);
1268
1269 delete m_journal_replay;
1270 m_journal_replay = NULL;
1271
1272 m_journaler->remove_listener(&m_metadata_listener);
1273
1274 transition_state(STATE_RESTARTING_REPLAY, r);
1275 m_journaler->shut_down(create_async_context_callback(
1276 m_image_ctx, create_context_callback<
1277 Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
1278}
1279
1280template <typename I>
1281void Journal<I>::complete_event(typename Events::iterator it, int r) {
1282 ceph_assert(ceph_mutex_is_locked(m_event_lock));
1283 ceph_assert(m_state == STATE_READY);
1284
1285 CephContext *cct = m_image_ctx.cct;
1286 ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
1287 << "r=" << r << dendl;
1288
1289 Event &event = it->second;
1290 if (r < 0 && r == event.filter_ret_val) {
1291 // ignore allowed error codes
1292 r = 0;
1293 }
1294 if (r < 0) {
1295 // event recorded to journal but failed to update disk, we cannot
1296 // commit this IO event. this event must be replayed.
1297 ceph_assert(event.safe);
1298 lderr(cct) << this << " " << __func__ << ": "
1299 << "failed to commit IO to disk, replay required: "
1300 << cpp_strerror(r) << dendl;
1301 }
1302
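  // the disk update is complete; the journal futures are only committed
  // (and the event erased) once the journal entry is also safe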
1303 event.committed_io = true;
1304 if (event.safe) {
1305 if (r >= 0) {
1306 for (auto &future : event.futures) {
1307 m_journaler->committed(future);
1308 }
1309 }
1310 m_events.erase(it);
1311 }
1312}
1313
1314template <typename I>
1315void Journal<I>::start_append() {
1316 ceph_assert(ceph_mutex_is_locked(m_lock));
1317
1318 m_journaler->start_append(
1319 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
1320 if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
1321 m_journaler->set_append_batch_options(
1322 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
1323 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
1324 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
1325 }
1326
1327 transition_state(STATE_READY, 0);
1328}
1329
1330template <typename I>
1331void Journal<I>::handle_open(int r) {
1332 CephContext *cct = m_image_ctx.cct;
1333 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1334
1335 std::lock_guard locker{m_lock};
1336 ceph_assert(m_state == STATE_INITIALIZING);
1337
1338 if (r < 0) {
1339 lderr(cct) << this << " " << __func__ << ": "
1340 << "failed to initialize journal: " << cpp_strerror(r)
1341 << dendl;
1342 destroy_journaler(r);
1343 return;
1344 }
1345
1346 m_tag_class = m_client_meta.tag_class;
1347 m_max_append_size = m_journaler->get_max_append_size();
1348 ldout(cct, 20) << this << " " << __func__ << ": "
1349 << "tag_class=" << m_tag_class << ", "
1350 << "max_append_size=" << m_max_append_size << dendl;
1351
1352 transition_state(STATE_REPLAYING, 0);
1353 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1354 m_journaler->start_replay(&m_replay_handler);
1355}
1356
1357template <typename I>
1358void Journal<I>::handle_replay_ready() {
1359 CephContext *cct = m_image_ctx.cct;
1360 ReplayEntry replay_entry;
1361 {
1362 std::lock_guard locker{m_lock};
1363 if (m_state != STATE_REPLAYING) {
1364 return;
1365 }
1366
1367 ldout(cct, 20) << this << " " << __func__ << dendl;
1368 if (!m_journaler->try_pop_front(&replay_entry)) {
1369 return;
1370 }
1371
1372 // only one entry should be in-flight at a time
1373 ceph_assert(!m_processing_entry);
1374 m_processing_entry = true;
1375 }
1376
1377 m_async_journal_op_tracker.start_op();
1378
1379 bufferlist data = replay_entry.get_data();
1380 auto it = data.cbegin();
1381
1382 journal::EventEntry event_entry;
1383 int r = m_journal_replay->decode(&it, &event_entry);
1384 if (r < 0) {
1385 lderr(cct) << this << " " << __func__ << ": "
1386 << "failed to decode journal event entry" << dendl;
1387 handle_replay_process_safe(replay_entry, r);
1388 return;
1389 }
1390
1391 Context *on_ready = create_context_callback<
1392 Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
1393 Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
1394 m_journal_replay->process(event_entry, on_ready, on_commit);
1395}
1396
1397template <typename I>
1398void Journal<I>::handle_replay_complete(int r) {
1399 CephContext *cct = m_image_ctx.cct;
1400
1401 bool cancel_ops = false;
1402 {
1403 std::lock_guard locker{m_lock};
1404 if (m_state != STATE_REPLAYING) {
1405 return;
1406 }
1407
1408 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1409 if (r < 0) {
1410 cancel_ops = true;
1411 transition_state(STATE_FLUSHING_RESTART, r);
1412 } else {
1413 // state might change back to FLUSHING_RESTART on flush error
1414 transition_state(STATE_FLUSHING_REPLAY, 0);
1415 }
1416 }
1417
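  // the continuations below execute in reverse construction order:
  // stop_replay -> shut down journal::Replay -> wait for in-flight ops ->
  // flush the commit position -> handle_flushing_restart/replay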
1418 Context *ctx = new LambdaContext([this, cct](int r) {
1419 ldout(cct, 20) << this << " handle_replay_complete: "
1420 << "handle shut down replay" << dendl;
1421
1422 State state;
1423 {
1424 std::lock_guard locker{m_lock};
1425 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1426 m_state == STATE_FLUSHING_REPLAY);
1427 state = m_state;
1428 }
1429
1430 if (state == STATE_FLUSHING_RESTART) {
1431 handle_flushing_restart(0);
1432 } else {
1433 handle_flushing_replay();
1434 }
1435 });
1436 ctx = new LambdaContext([this, ctx](int r) {
1437 // ensure the commit position is flushed to disk
1438 m_journaler->flush_commit_position(ctx);
1439 });
1440 ctx = create_async_context_callback(m_image_ctx, ctx);
1441 ctx = new LambdaContext([this, ctx](int r) {
1442 m_async_journal_op_tracker.wait_for_ops(ctx);
1443 });
1444 ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
1445 ldout(cct, 20) << this << " handle_replay_complete: "
1446 << "shut down replay" << dendl;
1447 m_journal_replay->shut_down(cancel_ops, ctx);
1448 });
1449
1450 m_journaler->stop_replay(ctx);
1451}
1452
1453template <typename I>
1454void Journal<I>::handle_replay_process_ready(int r) {
1455 // journal::Replay is ready for more events -- attempt to pop another
1456 CephContext *cct = m_image_ctx.cct;
1457 ldout(cct, 20) << this << " " << __func__ << dendl;
1458
1459 ceph_assert(r == 0);
1460 {
1461 std::lock_guard locker{m_lock};
1462 ceph_assert(m_processing_entry);
1463 m_processing_entry = false;
1464 }
1465 handle_replay_ready();
1466}
1467
1468template <typename I>
1469void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
1470 CephContext *cct = m_image_ctx.cct;
1471
1472 std::unique_lock locker{m_lock};
1473 ceph_assert(m_state == STATE_REPLAYING ||
1474 m_state == STATE_FLUSHING_RESTART ||
1475 m_state == STATE_FLUSHING_REPLAY);
1476
1477 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1478 if (r < 0) {
1479 if (r != -ECANCELED) {
1480 lderr(cct) << this << " " << __func__ << ": "
1481 << "failed to commit journal event to disk: "
1482 << cpp_strerror(r) << dendl;
1483 }
1484
1485 if (m_state == STATE_REPLAYING) {
1486 // abort the replay if we have an error
1487 transition_state(STATE_FLUSHING_RESTART, r);
1488 locker.unlock();
1489
1490 // stop replay, shut down, and restart
1491 Context* ctx = create_context_callback<
1492 Journal<I>, &Journal<I>::handle_flushing_restart>(this);
1493 ctx = new LambdaContext([this, ctx](int r) {
1494 // ensure the commit position is flushed to disk
1495 m_journaler->flush_commit_position(ctx);
1496 });
1497 ctx = new LambdaContext([this, cct, ctx](int r) {
1498 ldout(cct, 20) << this << " handle_replay_process_safe: "
1499 << "shut down replay" << dendl;
1500 {
1501 std::lock_guard locker{m_lock};
1502 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1503 }
1504
1505 m_journal_replay->shut_down(true, ctx);
1506 });
1507 m_journaler->stop_replay(ctx);
1508 m_async_journal_op_tracker.finish_op();
1509 return;
1510 } else if (m_state == STATE_FLUSHING_REPLAY) {
1511 // end-of-replay flush in-progress -- we need to restart replay
1512 transition_state(STATE_FLUSHING_RESTART, r);
1513 locker.unlock();
1514 m_async_journal_op_tracker.finish_op();
1515 return;
1516 }
1517 } else {
1518 // only commit the entry if written successfully
1519 m_journaler->committed(replay_entry);
1520 }
1521 locker.unlock();
1522 m_async_journal_op_tracker.finish_op();
1523}
1524
1525template <typename I>
1526void Journal<I>::handle_flushing_restart(int r) {
1527 std::lock_guard locker{m_lock};
1528
1529 CephContext *cct = m_image_ctx.cct;
1530 ldout(cct, 20) << this << " " << __func__ << dendl;
1531
1532 ceph_assert(r == 0);
1533 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1534 if (m_close_pending) {
1535 destroy_journaler(r);
1536 return;
1537 }
1538
1539 recreate_journaler(r);
1540}
1541
1542template <typename I>
1543void Journal<I>::handle_flushing_replay() {
1544 std::lock_guard locker{m_lock};
1545
1546 CephContext *cct = m_image_ctx.cct;
1547 ldout(cct, 20) << this << " " << __func__ << dendl;
1548
1549 ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
1550 m_state == STATE_FLUSHING_RESTART);
1551 if (m_close_pending) {
1552 destroy_journaler(0);
1553 return;
1554 } else if (m_state == STATE_FLUSHING_RESTART) {
1555 // failed to replay one-or-more events -- restart
1556 recreate_journaler(0);
1557 return;
1558 }
1559
1560 delete m_journal_replay;
1561 m_journal_replay = NULL;
1562
1563 m_error_result = 0;
1564 start_append();
1565}
1566
1567template <typename I>
1568void Journal<I>::handle_recording_stopped(int r) {
1569 CephContext *cct = m_image_ctx.cct;
1570 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1571
1572 std::lock_guard locker{m_lock};
1573 ceph_assert(m_state == STATE_STOPPING);
1574
1575 destroy_journaler(r);
1576}
1577
1578template <typename I>
1579void Journal<I>::handle_journal_destroyed(int r) {
1580 CephContext *cct = m_image_ctx.cct;
1581 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1582
1583 if (r < 0) {
1584 lderr(cct) << this << " " << __func__
1585 << "error detected while closing journal: " << cpp_strerror(r)
1586 << dendl;
1587 }
1588
1589 std::lock_guard locker{m_lock};
1590 delete m_journaler;
1591 m_journaler = nullptr;
1592
1593 ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
1594 if (m_state == STATE_RESTARTING_REPLAY) {
1595 create_journaler();
1596 return;
1597 }
1598
1599 transition_state(STATE_CLOSED, r);
1600}
1601
1602template <typename I>
1603void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
1604 CephContext *cct = m_image_ctx.cct;
1605 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1606 << "tid=" << tid << dendl;
1607
1608 // journal will be flushed before closing
1609 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1610 if (r < 0) {
1611 lderr(cct) << this << " " << __func__ << ": "
1612 << "failed to commit IO event: " << cpp_strerror(r) << dendl;
1613 }
1614
1615 Contexts on_safe_contexts;
1616 {
1617 std::lock_guard event_locker{m_event_lock};
1618 typename Events::iterator it = m_events.find(tid);
1619 ceph_assert(it != m_events.end());
1620
1621 Event &event = it->second;
1622 on_safe_contexts.swap(event.on_safe_contexts);
1623
1624 if (r < 0 || event.committed_io) {
1625 // failed journal write so IO won't be sent -- or IO extent was
1626 // overwritten by future IO operations so this was a no-op IO event
1627 event.ret_val = r;
1628 for (auto &future : event.futures) {
1629 m_journaler->committed(future);
1630 }
1631 }
1632
1633 if (event.committed_io) {
1634 m_events.erase(it);
1635 } else {
1636 event.safe = true;
1637 }
1638 }
1639
1640 ldout(cct, 20) << this << " " << __func__ << ": "
1641 << "completing tid=" << tid << dendl;
1642
1643 // alert the cache about the journal event status
1644 for (Contexts::iterator it = on_safe_contexts.begin();
1645 it != on_safe_contexts.end(); ++it) {
1646 (*it)->complete(r);
1647 }
1648}
1649
1650template <typename I>
1651void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
1652 const Future &op_start_future,
1653 const Future &op_finish_future,
1654 Context *on_safe) {
1655 CephContext *cct = m_image_ctx.cct;
1656 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1657 << "tid=" << tid << dendl;
1658
1659 // journal will be flushed before closing
1660 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1661 if (r < 0) {
1662 lderr(cct) << this << " " << __func__ << ": "
1663 << "failed to commit op event: " << cpp_strerror(r) << dendl;
1664 }
1665
1666 m_journaler->committed(op_start_future);
1667 m_journaler->committed(op_finish_future);
1668
1669 // reduce the replay window after committing an op event
1670 m_journaler->flush_commit_position(on_safe);
1671}
1672
1673template <typename I>
1674void Journal<I>::stop_recording() {
1675 ceph_assert(ceph_mutex_is_locked(m_lock));
1676 ceph_assert(m_journaler != NULL);
1677
1678 ceph_assert(m_state == STATE_READY);
1679 transition_state(STATE_STOPPING, 0);
1680
1681 m_journaler->stop_append(util::create_async_context_callback(
1682 m_image_ctx, create_context_callback<
1683 Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
1684}
1685
1686template <typename I>
1687void Journal<I>::transition_state(State state, int r) {
1688 CephContext *cct = m_image_ctx.cct;
1689 ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
1690 ceph_assert(ceph_mutex_is_locked(m_lock));
1691 m_state = state;
1692
1693 if (m_error_result == 0 && r < 0) {
1694 m_error_result = r;
1695 }
1696
1697 if (is_steady_state()) {
1698 auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
1699 m_wait_for_state_contexts.clear();
1700
1701 for (auto ctx : wait_for_state_contexts) {
1702 ctx->complete(m_error_result);
1703 }
1704 }
1705}
1706
1707template <typename I>
1708bool Journal<I>::is_steady_state() const {
1709 ceph_assert(ceph_mutex_is_locked(m_lock));
1710 switch (m_state) {
1711 case STATE_READY:
1712 case STATE_CLOSED:
1713 return true;
1714 case STATE_UNINITIALIZED:
1715 case STATE_INITIALIZING:
1716 case STATE_REPLAYING:
1717 case STATE_FLUSHING_RESTART:
1718 case STATE_RESTARTING_REPLAY:
1719 case STATE_FLUSHING_REPLAY:
1720 case STATE_STOPPING:
1721 case STATE_CLOSING:
1722 break;
1723 }
1724 return false;
1725}
1726
1727template <typename I>
1728void Journal<I>::wait_for_steady_state(Context *on_state) {
1729 ceph_assert(ceph_mutex_is_locked(m_lock));
1730 ceph_assert(!is_steady_state());
1731
1732 CephContext *cct = m_image_ctx.cct;
1733 ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
1734 << dendl;
1735 m_wait_for_state_contexts.push_back(on_state);
1736}
1737
1738template <typename I>
1739int Journal<I>::is_resync_requested(bool *do_resync) {
1740 std::lock_guard l{m_lock};
1741 return check_resync_requested(do_resync);
1742}
1743
1744template <typename I>
1745int Journal<I>::check_resync_requested(bool *do_resync) {
1746 CephContext *cct = m_image_ctx.cct;
1747 ldout(cct, 20) << this << " " << __func__ << dendl;
1748
1749 ceph_assert(ceph_mutex_is_locked(m_lock));
1750 ceph_assert(do_resync != nullptr);
1751
1752 cls::journal::Client client;
1753 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
1754 if (r < 0) {
1755 lderr(cct) << this << " " << __func__ << ": "
1756 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1757 return r;
1758 }
1759
1760 librbd::journal::ClientData client_data;
1761 auto bl_it = client.data.cbegin();
1762 try {
1763 decode(client_data, bl_it);
1764 } catch (const buffer::error &err) {
1765 lderr(cct) << this << " " << __func__ << ": "
1766 << "failed to decode client data: " << err.what() << dendl;
1767 return -EINVAL;
1768 }
1769
1770 journal::ImageClientMeta *image_client_meta =
1771 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
1772 if (image_client_meta == nullptr) {
1773 lderr(cct) << this << " " << __func__ << ": "
1774 << "failed to access image client meta struct" << dendl;
1775 return -EINVAL;
1776 }
1777
1778 *do_resync = image_client_meta->resync_requested;
1779
1780 return 0;
1781}
1782
1783struct C_RefreshTags : public Context {
1784 AsyncOpTracker &async_op_tracker;
1785 Context *on_finish = nullptr;
1786
1787 ceph::mutex lock =
1788 ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
1789 uint64_t tag_tid = 0;
1790 journal::TagData tag_data;
1791
1792 explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
1793 : async_op_tracker(async_op_tracker) {
1794 async_op_tracker.start_op();
1795 }
1796 ~C_RefreshTags() override {
1797 async_op_tracker.finish_op();
1798 }
1799
1800 void finish(int r) override {
1801 on_finish->complete(r);
1802 }
1803};
1804
1805template <typename I>
1806void Journal<I>::handle_metadata_updated() {
1807 CephContext *cct = m_image_ctx.cct;
1808 std::lock_guard locker{m_lock};
1809
1810 if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1811 return;
1812 } else if (is_tag_owner(m_lock)) {
1813 ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
1814 return;
1815 } else if (m_listeners.empty()) {
1816 ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
1817 return;
1818 }
1819
1820 uint64_t refresh_sequence = ++m_refresh_sequence;
1821 ldout(cct, 20) << this << " " << __func__ << ": "
1822 << "refresh_sequence=" << refresh_sequence << dendl;
1823
1824 // pull the most recent tags from the journal, decode, and
1825 // update the internal tag state
1826 C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
1827 refresh_ctx->on_finish = new LambdaContext(
1828 [this, refresh_sequence, refresh_ctx](int r) {
1829 handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
1830 refresh_ctx->tag_data, r);
1831 });
1832 C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
1833 cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
1834 &refresh_ctx->tag_data, refresh_ctx);
1835 m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
1836 &decode_tags_ctx->tags, decode_tags_ctx);
1837}
1838
1839template <typename I>
1840void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
1841 uint64_t tag_tid,
1842 journal::TagData tag_data, int r) {
1843 CephContext *cct = m_image_ctx.cct;
1844 std::unique_lock locker{m_lock};
1845
1846 if (r < 0) {
1847 lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
1848 << cpp_strerror(r) << dendl;
1849 return;
1850 } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1851 return;
1852 } else if (refresh_sequence != m_refresh_sequence) {
1853 // another, more up-to-date refresh is in-flight
1854 return;
1855 }
1856
1857 ldout(cct, 20) << this << " " << __func__ << ": "
1858 << "refresh_sequence=" << refresh_sequence << ", "
1859 << "tag_tid=" << tag_tid << ", "
1860 << "tag_data=" << tag_data << dendl;
1861 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1862
1863 bool was_tag_owner = is_tag_owner(m_lock);
1864 if (m_tag_tid < tag_tid) {
1865 m_tag_tid = tag_tid;
1866 m_tag_data = tag_data;
1867 }
1868 bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
1869
1870 bool resync_requested = false;
1871 r = check_resync_requested(&resync_requested);
1872 if (r < 0) {
1873 lderr(cct) << this << " " << __func__ << ": "
1874 << "failed to check if a resync was requested" << dendl;
1875 return;
1876 }
1877
1878 Listeners listeners(m_listeners);
1879 m_listener_notify = true;
1880 locker.unlock();
1881
1882 if (promoted_to_primary) {
1883 for (auto listener : listeners) {
1884 listener->handle_promoted();
1885 }
1886 } else if (resync_requested) {
1887 for (auto listener : listeners) {
1888 listener->handle_resync();
1889 }
1890 }
1891
1892 locker.lock();
1893 m_listener_notify = false;
1894 m_listener_cond.notify_all();
1895}
1896
1897template <typename I>
1898void Journal<I>::add_listener(journal::Listener *listener) {
1899 std::lock_guard locker{m_lock};
1900 m_listeners.insert(listener);
1901}
1902
1903template <typename I>
1904void Journal<I>::remove_listener(journal::Listener *listener) {
1905 std::unique_lock locker{m_lock};
1906 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1907 m_listeners.erase(listener);
1908}
1909
1910} // namespace librbd
1911
1912#ifndef TEST_F
1913template class librbd::Journal<librbd::ImageCtx>;
1914#endif