1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Journal.h"
5 #include "include/rados/librados.hpp"
6 #include "common/AsyncOpTracker.h"
7 #include "common/errno.h"
8 #include "common/Timer.h"
9 #include "common/WorkQueue.h"
10 #include "cls/journal/cls_journal_types.h"
11 #include "journal/Journaler.h"
12 #include "journal/Policy.h"
13 #include "journal/ReplayEntry.h"
14 #include "journal/Settings.h"
15 #include "journal/Utils.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/asio/ContextWQ.h"
18 #include "librbd/io/ObjectDispatchSpec.h"
19 #include "librbd/io/ObjectDispatcherInterface.h"
20 #include "librbd/journal/CreateRequest.h"
21 #include "librbd/journal/DemoteRequest.h"
22 #include "librbd/journal/ObjectDispatch.h"
23 #include "librbd/journal/OpenRequest.h"
24 #include "librbd/journal/RemoveRequest.h"
25 #include "librbd/journal/ResetRequest.h"
26 #include "librbd/journal/Replay.h"
27 #include "librbd/journal/PromoteRequest.h"
28
29 #include <boost/scope_exit.hpp>
30 #include <utility>
31
32 #define dout_subsys ceph_subsys_rbd
33 #undef dout_prefix
34 #define dout_prefix *_dout << "librbd::Journal: "
35
36 namespace librbd {
37
38 using util::create_async_context_callback;
39 using util::create_context_callback;
40 using journal::util::C_DecodeTag;
41 using journal::util::C_DecodeTags;
42
43 namespace {
44
45 // TODO: once journaler is 100% async and converted to ASIO, remove separate
46 // threads and reuse librbd's AsioEngine
47 class ThreadPoolSingleton : public ThreadPool {
48 public:
49 ContextWQ *work_queue;
50
51 explicit ThreadPoolSingleton(CephContext *cct)
52 : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1),
53 work_queue(new ContextWQ("librbd::journal::work_queue",
54 ceph::make_timespan(
55 cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout")),
56 this)) {
57 start();
58 }
59 ~ThreadPoolSingleton() override {
60 work_queue->drain();
61 delete work_queue;
62
63 stop();
64 }
65 };
66
67 template <typename I>
68 struct C_IsTagOwner : public Context {
69 librados::IoCtx &io_ctx;
70 std::string image_id;
71 bool *is_tag_owner;
72 asio::ContextWQ *op_work_queue;
73 Context *on_finish;
74
75 CephContext *cct = nullptr;
76 Journaler *journaler;
77 cls::journal::Client client;
78 journal::ImageClientMeta client_meta;
79 uint64_t tag_tid = 0;
80 journal::TagData tag_data;
81
82 C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
83 bool *is_tag_owner, asio::ContextWQ *op_work_queue,
84 Context *on_finish)
85 : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
86 op_work_queue(op_work_queue), on_finish(on_finish),
87 cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
88 journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
89 {}, nullptr)) {
90 }
91
92 void finish(int r) override {
93 ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
94 << dendl;
95 if (r < 0) {
96 lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
97 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
98 } else {
99 *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
100 }
101
102 Journaler *journaler = this->journaler;
103 Context *on_finish = this->on_finish;
104 auto ctx = new LambdaContext(
105 [journaler, on_finish](int r) {
106 on_finish->complete(r);
107 delete journaler;
108 });
109 op_work_queue->queue(ctx, r);
110 }
111 };
112
113 struct C_GetTagOwner : public Context {
114 std::string *mirror_uuid;
115 Context *on_finish;
116
117 Journaler journaler;
118 cls::journal::Client client;
119 journal::ImageClientMeta client_meta;
120 uint64_t tag_tid = 0;
121 journal::TagData tag_data;
122
123 C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
124 std::string *mirror_uuid, Context *on_finish)
125 : mirror_uuid(mirror_uuid), on_finish(on_finish),
126 journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}, nullptr) {
127 }
128
129 virtual void finish(int r) {
130 if (r >= 0) {
131 *mirror_uuid = tag_data.mirror_uuid;
132 }
133 on_finish->complete(r);
134 }
135 };
136
137 template <typename J>
138 struct GetTagsRequest {
139 CephContext *cct;
140 J *journaler;
141 cls::journal::Client *client;
142 journal::ImageClientMeta *client_meta;
143 uint64_t *tag_tid;
144 journal::TagData *tag_data;
145 Context *on_finish;
146
147 ceph::mutex lock = ceph::make_mutex("lock");
148
149 GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
150 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
151 journal::TagData *tag_data, Context *on_finish)
152 : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
153 tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish) {
154 }
155
156 /**
157 * @verbatim
158 *
159 * <start>
160 * |
161 * v
162 * GET_CLIENT * * * * * * * * * * * *
163 * | *
164 * v *
165 * GET_TAGS * * * * * * * * * * * * * (error)
166 * | *
167 * v *
168 * <finish> * * * * * * * * * * * * *
169 *
170 * @endverbatim
171 */
172
173 void send() {
174 send_get_client();
175 }
176
177 void send_get_client() {
178 ldout(cct, 20) << __func__ << dendl;
179
180 auto ctx = new LambdaContext(
181 [this](int r) {
182 handle_get_client(r);
183 });
184 journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
185 }
186
187 void handle_get_client(int r) {
188 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
189
190 if (r < 0) {
191 complete(r);
192 return;
193 }
194
195 librbd::journal::ClientData client_data;
196 auto bl_it = client->data.cbegin();
197 try {
198 decode(client_data, bl_it);
199 } catch (const buffer::error &err) {
200 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
201 << "failed to decode client data" << dendl;
202 complete(-EBADMSG);
203 return;
204 }
205
206 journal::ImageClientMeta *image_client_meta =
207 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
208 if (image_client_meta == nullptr) {
209 lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
210 << "failed to get client meta" << dendl;
211 complete(-EINVAL);
212 return;
213 }
214 *client_meta = *image_client_meta;
215
216 send_get_tags();
217 }
218
219 void send_get_tags() {
220 ldout(cct, 20) << __func__ << dendl;
221
222 auto ctx = new LambdaContext(
223 [this](int r) {
224 handle_get_tags(r);
225 });
226 C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
227 ctx);
228 journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
229 }
230
231 void handle_get_tags(int r) {
232 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
233
234 complete(r);
235 }
236
237 void complete(int r) {
238 on_finish->complete(r);
239 delete this;
240 }
241 };
242
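// Convenience wrapper around GetTagsRequest: the request fetches the image
// client registration, decodes its ImageClientMeta, retrieves and decodes the
// most recent tag for that tag class, and deletes itself on completion.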
243 template <typename J>
244 void get_tags(CephContext *cct, J *journaler,
245 cls::journal::Client *client,
246 journal::ImageClientMeta *client_meta,
247 uint64_t *tag_tid, journal::TagData *tag_data,
248 Context *on_finish) {
249 ldout(cct, 20) << __func__ << dendl;
250
251 GetTagsRequest<J> *req =
252 new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
253 tag_data, on_finish);
254 req->send();
255 }
256
257 template <typename J>
258 int allocate_journaler_tag(CephContext *cct, J *journaler,
259 uint64_t tag_class,
260 const journal::TagPredecessor &predecessor,
261 const std::string &mirror_uuid,
262 cls::journal::Tag *new_tag) {
263 journal::TagData tag_data;
264 tag_data.mirror_uuid = mirror_uuid;
265 tag_data.predecessor = predecessor;
266
267 bufferlist tag_bl;
268 encode(tag_data, tag_bl);
269
270 C_SaferCond allocate_tag_ctx;
271 journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
272
273 int r = allocate_tag_ctx.wait();
274 if (r < 0) {
275 lderr(cct) << __func__ << ": "
276 << "failed to allocate tag: " << cpp_strerror(r) << dendl;
277 return r;
278 }
279 return 0;
280 }
281
282 } // anonymous namespace
283
284 // client id for local image
285 template <typename I>
286 const std::string Journal<I>::IMAGE_CLIENT_ID("");
287
288 // mirror uuid to use for local images
289 template <typename I>
290 const std::string Journal<I>::LOCAL_MIRROR_UUID("");
291
292 // mirror uuid to use for orphaned (demoted) images
293 template <typename I>
294 const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
295
296 template <typename I>
297 std::ostream &operator<<(std::ostream &os,
298 const typename Journal<I>::State &state) {
299 switch (state) {
300 case Journal<I>::STATE_UNINITIALIZED:
301 os << "Uninitialized";
302 break;
303 case Journal<I>::STATE_INITIALIZING:
304 os << "Initializing";
305 break;
306 case Journal<I>::STATE_REPLAYING:
307 os << "Replaying";
308 break;
309 case Journal<I>::STATE_FLUSHING_RESTART:
310 os << "FlushingRestart";
311 break;
312 case Journal<I>::STATE_RESTARTING_REPLAY:
313 os << "RestartingReplay";
314 break;
315 case Journal<I>::STATE_FLUSHING_REPLAY:
316 os << "FlushingReplay";
317 break;
318 case Journal<I>::STATE_READY:
319 os << "Ready";
320 break;
321 case Journal<I>::STATE_STOPPING:
322 os << "Stopping";
323 break;
324 case Journal<I>::STATE_CLOSING:
325 os << "Closing";
326 break;
327 case Journal<I>::STATE_CLOSED:
328 os << "Closed";
329 break;
330 default:
331 os << "Unknown (" << static_cast<uint32_t>(state) << ")";
332 break;
333 }
334 return os;
335 }
336
337
338 template <typename I>
339 void Journal<I>::MetadataListener::handle_update(::journal::JournalMetadata *) {
340 auto ctx = new LambdaContext([this](int r) {
341 journal->handle_metadata_updated();
342 });
343 journal->m_work_queue->queue(ctx, 0);
344 }
345
346
347 template <typename I>
348 void Journal<I>::get_work_queue(CephContext *cct, ContextWQ **work_queue) {
349 auto thread_pool_singleton =
350 &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
351 "librbd::journal::thread_pool", false, cct);
352 *work_queue = thread_pool_singleton->work_queue;
353 }
354
355 template <typename I>
356 Journal<I>::Journal(I &image_ctx)
357 : RefCountedObject(image_ctx.cct),
358 m_image_ctx(image_ctx), m_journaler(NULL),
359 m_state(STATE_UNINITIALIZED),
360 m_error_result(0), m_replay_handler(this), m_close_pending(false),
361 m_event_tid(0),
362 m_blocking_writes(false), m_journal_replay(NULL),
363 m_metadata_listener(this) {
364
365 CephContext *cct = m_image_ctx.cct;
366 ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
367
368 get_work_queue(cct, &m_work_queue);
369 ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
370 }
371
372 template <typename I>
373 Journal<I>::~Journal() {
374 if (m_work_queue != nullptr) {
375 m_work_queue->drain();
376 }
377
378 std::lock_guard locker{m_lock};
379 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
380 ceph_assert(m_journaler == NULL);
381 ceph_assert(m_journal_replay == NULL);
382 ceph_assert(m_wait_for_state_contexts.empty());
383 }
384
385 template <typename I>
386 bool Journal<I>::is_journal_supported(I &image_ctx) {
387 ceph_assert(ceph_mutex_is_locked(image_ctx.image_lock));
388 return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
389 !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
390 }
391
392 template <typename I>
393 int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
394 uint8_t order, uint8_t splay_width,
395 const std::string &object_pool) {
396 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
397 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
398
399 ContextWQ *work_queue;
400 get_work_queue(cct, &work_queue);
401
402 C_SaferCond cond;
403 journal::TagData tag_data(LOCAL_MIRROR_UUID);
404 journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
405 io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
406 tag_data, IMAGE_CLIENT_ID, work_queue, &cond);
407 req->send();
408
409 return cond.wait();
410 }
411
412 template <typename I>
413 int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
414 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
415 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
416
417 ContextWQ *work_queue;
418 get_work_queue(cct, &work_queue);
419
420 C_SaferCond cond;
421 journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
422 io_ctx, image_id, IMAGE_CLIENT_ID, work_queue, &cond);
423 req->send();
424
425 return cond.wait();
426 }
427
428 template <typename I>
429 int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
430 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
431 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
432
433 ContextWQ *work_queue;
434 get_work_queue(cct, &work_queue);
435
436 C_SaferCond cond;
437 auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
438 Journal<>::LOCAL_MIRROR_UUID,
439 work_queue, &cond);
440 req->send();
441
442 return cond.wait();
443 }
444
445 template <typename I>
446 void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
447 Context *on_finish) {
448 Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
449 image_ctx->op_work_queue, on_finish);
450 }
451
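// An image is the tag owner (i.e. primary) when the most recent tag's
// mirror_uuid equals LOCAL_MIRROR_UUID. C_IsTagOwner performs the tag lookup
// and disposes of its temporary Journaler on the supplied work queue.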
452 template <typename I>
453 void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
454 bool *is_tag_owner,
455 asio::ContextWQ *op_work_queue,
456 Context *on_finish) {
457 CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
458 ldout(cct, 20) << __func__ << dendl;
459
460 C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
461 io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
462 get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
463 &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
464 &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
465 }
466
467 template <typename I>
468 void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
469 std::string *mirror_uuid,
470 asio::ContextWQ *op_work_queue,
471 Context *on_finish) {
472 CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
473 ldout(cct, 20) << __func__ << dendl;
474
475 auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
476 get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
477 &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
478 }
479
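// Synchronously opens the journal client, sets resync_requested in the
// ImageClientMeta, and pushes the updated client metadata so that a peer
// (e.g. an rbd-mirror daemon) can observe the request. The temporary
// Journaler is shut down on scope exit.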
480 template <typename I>
481 int Journal<I>::request_resync(I *image_ctx) {
482 CephContext *cct = image_ctx->cct;
483 ldout(cct, 20) << __func__ << dendl;
484
485 Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {},
486 nullptr);
487
488 ceph::mutex lock = ceph::make_mutex("lock");
489 journal::ImageClientMeta client_meta;
490 uint64_t tag_tid;
491 journal::TagData tag_data;
492
493 C_SaferCond open_ctx;
494 auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
495 &client_meta, &tag_tid,
496 &tag_data, &open_ctx);
497 open_req->send();
498
499 BOOST_SCOPE_EXIT_ALL(&journaler) {
500 journaler.shut_down();
501 };
502
503 int r = open_ctx.wait();
504 if (r < 0) {
505 return r;
506 }
507
508 client_meta.resync_requested = true;
509
510 journal::ClientData client_data(client_meta);
511 bufferlist client_data_bl;
512 encode(client_data, client_data_bl);
513
514 C_SaferCond update_client_ctx;
515 journaler.update_client(client_data_bl, &update_client_ctx);
516
517 r = update_client_ctx.wait();
518 if (r < 0) {
519 lderr(cct) << __func__ << ": "
520 << "failed to update client: " << cpp_strerror(r) << dendl;
521 return r;
522 }
523 return 0;
524 }
525
526 template <typename I>
527 void Journal<I>::promote(I *image_ctx, Context *on_finish) {
528 CephContext *cct = image_ctx->cct;
529 ldout(cct, 20) << __func__ << dendl;
530
531 auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
532 on_finish);
533 promote_req->send();
534 }
535
536 template <typename I>
537 void Journal<I>::demote(I *image_ctx, Context *on_finish) {
538 CephContext *cct = image_ctx->cct;
539 ldout(cct, 20) << __func__ << dendl;
540
541 auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
542 req->send();
543 }
544
545 template <typename I>
546 bool Journal<I>::is_journal_ready() const {
547 std::lock_guard locker{m_lock};
548 return (m_state == STATE_READY);
549 }
550
551 template <typename I>
552 bool Journal<I>::is_journal_replaying() const {
553 std::lock_guard locker{m_lock};
554 return is_journal_replaying(m_lock);
555 }
556
557 template <typename I>
558 bool Journal<I>::is_journal_replaying(const ceph::mutex &) const {
559 ceph_assert(ceph_mutex_is_locked(m_lock));
560 return (m_state == STATE_REPLAYING ||
561 m_state == STATE_FLUSHING_REPLAY ||
562 m_state == STATE_FLUSHING_RESTART ||
563 m_state == STATE_RESTARTING_REPLAY);
564 }
565
566 template <typename I>
567 bool Journal<I>::is_journal_appending() const {
568 ceph_assert(ceph_mutex_is_locked(m_image_ctx.image_lock));
569 std::lock_guard locker{m_lock};
570 return (m_state == STATE_READY &&
571 !m_image_ctx.get_journal_policy()->append_disabled());
572 }
573
574 template <typename I>
575 void Journal<I>::wait_for_journal_ready(Context *on_ready) {
576 on_ready = create_async_context_callback(m_image_ctx, on_ready);
577
578 std::lock_guard locker{m_lock};
579 if (m_state == STATE_READY) {
580 on_ready->complete(m_error_result);
581 } else {
582 wait_for_steady_state(on_ready);
583 }
584 }
585
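// Registers the journal object dispatch layer, creates the Journaler, and
// replays any events not yet committed to the image. on_finish completes once
// the journal settles into a steady state: READY on success, CLOSED on error.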
586 template <typename I>
587 void Journal<I>::open(Context *on_finish) {
588 CephContext *cct = m_image_ctx.cct;
589 ldout(cct, 20) << this << " " << __func__ << dendl;
590
591 on_finish = create_context_callback<Context>(on_finish, this);
592
593 on_finish = create_async_context_callback(m_image_ctx, on_finish);
594
595 // inject our handler into the object dispatcher chain
596 m_image_ctx.io_object_dispatcher->register_dispatch(
597 journal::ObjectDispatch<I>::create(&m_image_ctx, this));
598
599 std::lock_guard locker{m_lock};
600 ceph_assert(m_state == STATE_UNINITIALIZED);
601 wait_for_steady_state(on_finish);
602 create_journaler();
603 }
604
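// Notifies registered listeners, stops recording if the journal is READY, and
// completes on_finish once the CLOSED steady state is reached. The journal
// object dispatch layer is shut down last, preserving the original completion
// code.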
605 template <typename I>
606 void Journal<I>::close(Context *on_finish) {
607 CephContext *cct = m_image_ctx.cct;
608 ldout(cct, 20) << this << " " << __func__ << dendl;
609
610 on_finish = create_context_callback<Context>(on_finish, this);
611
612 on_finish = new LambdaContext([this, on_finish](int r) {
613 // remove our handler from object dispatcher chain - preserve error
614 auto ctx = new LambdaContext([on_finish, r](int _) {
615 on_finish->complete(r);
616 });
617 m_image_ctx.io_object_dispatcher->shut_down_dispatch(
618 io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
619 });
620 on_finish = create_async_context_callback(m_image_ctx, on_finish);
621
622 std::unique_lock locker{m_lock};
623 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
624
625 Listeners listeners(m_listeners);
626 m_listener_notify = true;
627 locker.unlock();
628 for (auto listener : listeners) {
629 listener->handle_close();
630 }
631
632 locker.lock();
633 m_listener_notify = false;
634 m_listener_cond.notify_all();
635
636 ceph_assert(m_state != STATE_UNINITIALIZED);
637 if (m_state == STATE_CLOSED) {
638 on_finish->complete(m_error_result);
639 return;
640 }
641
642 if (m_state == STATE_READY) {
643 stop_recording();
644 }
645
646 m_close_pending = true;
647 wait_for_steady_state(on_finish);
648 }
649
650 template <typename I>
651 bool Journal<I>::is_tag_owner() const {
652 std::lock_guard locker{m_lock};
653 return is_tag_owner(m_lock);
654 }
655
656 template <typename I>
657 bool Journal<I>::is_tag_owner(const ceph::mutex &) const {
658 ceph_assert(ceph_mutex_is_locked(m_lock));
659 return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
660 }
661
662 template <typename I>
663 uint64_t Journal<I>::get_tag_tid() const {
664 std::lock_guard locker{m_lock};
665 return m_tag_tid;
666 }
667
668 template <typename I>
669 journal::TagData Journal<I>::get_tag_data() const {
670 std::lock_guard locker{m_lock};
671 return m_tag_data;
672 }
673
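// Allocates a new tag owned by the local (primary) image. The predecessor is
// populated from the client's cached commit position so consumers can tell
// where entries under the previous tag ended.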
674 template <typename I>
675 void Journal<I>::allocate_local_tag(Context *on_finish) {
676 CephContext *cct = m_image_ctx.cct;
677 ldout(cct, 20) << this << " " << __func__ << dendl;
678
679 journal::TagPredecessor predecessor;
680 predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
681 {
682 std::lock_guard locker{m_lock};
683 ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));
684
685 cls::journal::Client client;
686 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
687 if (r < 0) {
688 lderr(cct) << this << " " << __func__ << ": "
689 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
690 m_image_ctx.op_work_queue->queue(on_finish, r);
691 return;
692 }
693
694 // since we are primary, populate the predecessor with our known commit
695 // position
696 ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
697 if (!client.commit_position.object_positions.empty()) {
698 auto position = client.commit_position.object_positions.front();
699 predecessor.commit_valid = true;
700 predecessor.tag_tid = position.tag_tid;
701 predecessor.entry_tid = position.entry_tid;
702 }
703 }
704
705 allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
706 }
707
708 template <typename I>
709 void Journal<I>::allocate_tag(const std::string &mirror_uuid,
710 const journal::TagPredecessor &predecessor,
711 Context *on_finish) {
712 CephContext *cct = m_image_ctx.cct;
713 ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
714 << dendl;
715
716 std::lock_guard locker{m_lock};
717 ceph_assert(m_journaler != nullptr);
718
719 journal::TagData tag_data;
720 tag_data.mirror_uuid = mirror_uuid;
721 tag_data.predecessor = predecessor;
722
723 bufferlist tag_bl;
724 encode(tag_data, tag_bl);
725
726 C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
727 &m_tag_data, on_finish);
728 m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
729 decode_tag_ctx);
730 }
731
732 template <typename I>
733 void Journal<I>::flush_commit_position(Context *on_finish) {
734 CephContext *cct = m_image_ctx.cct;
735 ldout(cct, 20) << this << " " << __func__ << dendl;
736
737 std::lock_guard locker{m_lock};
738 ceph_assert(m_journaler != nullptr);
739 m_journaler->flush_commit_position(on_finish);
740 }
741
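// When rbd_journal_object_writethrough_until_flush is enabled, journal appends
// are not batched until the first user-initiated flush; the first flush
// observed here switches the journaler to the configured flush
// interval/bytes/age batching options.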
742 template <typename I>
743 void Journal<I>::user_flushed() {
744 if (m_state == STATE_READY && !m_user_flushed.exchange(true) &&
745 m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
746 std::lock_guard locker{m_lock};
747 if (m_state == STATE_READY) {
748 CephContext *cct = m_image_ctx.cct;
749 ldout(cct, 5) << this << " " << __func__ << dendl;
750
751 ceph_assert(m_journaler != nullptr);
752 m_journaler->set_append_batch_options(
753 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
754 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
755 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
756 } else {
757 m_user_flushed = false;
758 }
759 }
760 }
761
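// A write larger than the maximum journal entry payload is split into multiple
// AioWriteEvent entries, each carrying at most max_append_size minus the fixed
// event overhead; the resulting bufferlists are journaled as a single IO
// event.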
762 template <typename I>
763 uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
764 const bufferlist &bl,
765 bool flush_entry) {
766 ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
767 uint64_t max_write_data_size =
768 m_max_append_size - journal::AioWriteEvent::get_fixed_size();
769
770 // ensure that the write event fits within the journal entry
771 Bufferlists bufferlists;
772 uint64_t bytes_remaining = length;
773 uint64_t event_offset = 0;
774 do {
775 uint64_t event_length = std::min(bytes_remaining, max_write_data_size);
776
777 bufferlist event_bl;
778 event_bl.substr_of(bl, event_offset, event_length);
779 journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
780 event_length,
781 event_bl),
782 ceph_clock_now());
783
784 bufferlists.emplace_back();
785 encode(event_entry, bufferlists.back());
786
787 event_offset += event_length;
788 bytes_remaining -= event_length;
789 } while (bytes_remaining > 0);
790
791 return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
792 length, flush_entry, 0);
793 }
794
795 template <typename I>
796 uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
797 uint64_t offset, size_t length,
798 bool flush_entry, int filter_ret_val) {
799 bufferlist bl;
800 event_entry.timestamp = ceph_clock_now();
801 encode(event_entry, bl);
802 return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
803 flush_entry, filter_ret_val);
804 }
805
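// Appends each bufferlist as a separate journal future under a single event
// tid, records the event in m_events, and arms C_IOEventSafe on the final
// future: flushed immediately when flush_entry is set, otherwise left to the
// journaler's normal flush cadence.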
806 template <typename I>
807 uint64_t Journal<I>::append_io_events(journal::EventType event_type,
808 const Bufferlists &bufferlists,
809 uint64_t offset, size_t length,
810 bool flush_entry, int filter_ret_val) {
811 ceph_assert(!bufferlists.empty());
812
813 uint64_t tid;
814 {
815 std::lock_guard locker{m_lock};
816 ceph_assert(m_state == STATE_READY);
817
818 tid = ++m_event_tid;
819 ceph_assert(tid != 0);
820 }
821
822 Futures futures;
823 for (auto &bl : bufferlists) {
824 ceph_assert(bl.length() <= m_max_append_size);
825 futures.push_back(m_journaler->append(m_tag_tid, bl));
826 }
827
828 {
829 std::lock_guard event_locker{m_event_lock};
830 m_events[tid] = Event(futures, offset, length, filter_ret_val);
831 }
832
833 CephContext *cct = m_image_ctx.cct;
834 ldout(cct, 20) << this << " " << __func__ << ": "
835 << "event=" << event_type << ", "
836 << "offset=" << offset << ", "
837 << "length=" << length << ", "
838 << "flush=" << flush_entry << ", tid=" << tid << dendl;
839
840 Context *on_safe = create_async_context_callback(
841 m_image_ctx, new C_IOEventSafe(this, tid));
842 if (flush_entry) {
843 futures.back().flush(on_safe);
844 } else {
845 futures.back().wait(on_safe);
846 }
847
848 return tid;
849 }
850
851 template <typename I>
852 void Journal<I>::commit_io_event(uint64_t tid, int r) {
853 CephContext *cct = m_image_ctx.cct;
854 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
855 "r=" << r << dendl;
856
857 std::lock_guard event_locker{m_event_lock};
858 typename Events::iterator it = m_events.find(tid);
859 if (it == m_events.end()) {
860 return;
861 }
862 complete_event(it, r);
863 }
864
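// Commits only part of an IO event: the completed [offset, offset + length)
// range is subtracted from the event's pending extents and the event is
// completed only once no pending extents remain.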
865 template <typename I>
866 void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
867 uint64_t length, int r) {
868 ceph_assert(length > 0);
869
870 CephContext *cct = m_image_ctx.cct;
871 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
872 << "offset=" << offset << ", "
873 << "length=" << length << ", "
874 << "r=" << r << dendl;
875
876 std::lock_guard event_locker{m_event_lock};
877 typename Events::iterator it = m_events.find(tid);
878 if (it == m_events.end()) {
879 return;
880 }
881
882 Event &event = it->second;
883 if (event.ret_val == 0 && r < 0) {
884 event.ret_val = r;
885 }
886
887 ExtentInterval extent;
888 extent.insert(offset, length);
889
890 ExtentInterval intersect;
891 intersect.intersection_of(extent, event.pending_extents);
892
893 event.pending_extents.subtract(intersect);
894 if (!event.pending_extents.empty()) {
895 ldout(cct, 20) << this << " " << __func__ << ": "
896 << "pending extents: " << event.pending_extents << dendl;
897 return;
898 }
899 complete_event(it, event.ret_val);
900 }
901
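// Op events are journaled in two halves: append_op_event() records the start
// and stashes its future in m_op_futures, while commit_op_event() later
// appends the matching OpFinishEvent. Both futures are committed together in
// handle_op_event_safe() so that replay always observes a consistent
// start/finish pair.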
902 template <typename I>
903 void Journal<I>::append_op_event(uint64_t op_tid,
904 journal::EventEntry &&event_entry,
905 Context *on_safe) {
906 ceph_assert(ceph_mutex_is_locked(m_image_ctx.owner_lock));
907
908 bufferlist bl;
909 event_entry.timestamp = ceph_clock_now();
910 encode(event_entry, bl);
911
912 Future future;
913 {
914 std::lock_guard locker{m_lock};
915 ceph_assert(m_state == STATE_READY);
916
917 future = m_journaler->append(m_tag_tid, bl);
918
919 // delay committing op event to ensure consistent replay
920 ceph_assert(m_op_futures.count(op_tid) == 0);
921 m_op_futures[op_tid] = future;
922 }
923
924 on_safe = create_async_context_callback(m_image_ctx, on_safe);
925 on_safe = new LambdaContext([this, on_safe](int r) {
926 // ensure all committed IO before this op is committed
927 m_journaler->flush_commit_position(on_safe);
928 });
929 future.flush(on_safe);
930
931 CephContext *cct = m_image_ctx.cct;
932 ldout(cct, 10) << this << " " << __func__ << ": "
933 << "op_tid=" << op_tid << ", "
934 << "event=" << event_entry.get_event_type() << dendl;
935 }
936
937 template <typename I>
938 void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
939 CephContext *cct = m_image_ctx.cct;
940 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
941 << "r=" << r << dendl;
942
943 journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
944 ceph_clock_now());
945
946 bufferlist bl;
947 encode(event_entry, bl);
948
949 Future op_start_future;
950 Future op_finish_future;
951 {
952 std::lock_guard locker{m_lock};
953 ceph_assert(m_state == STATE_READY);
954
955 // ready to commit op event
956 auto it = m_op_futures.find(op_tid);
957 ceph_assert(it != m_op_futures.end());
958 op_start_future = it->second;
959 m_op_futures.erase(it);
960
961 op_finish_future = m_journaler->append(m_tag_tid, bl);
962 }
963
964 op_finish_future.flush(create_async_context_callback(
965 m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
966 op_finish_future, on_safe)));
967 }
968
969 template <typename I>
970 void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
971 CephContext *cct = m_image_ctx.cct;
972 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
973
974 {
975 std::lock_guard locker{m_lock};
976 ceph_assert(m_journal_replay != nullptr);
977 m_journal_replay->replay_op_ready(op_tid, on_resume);
978 }
979 }
980
981 template <typename I>
982 void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
983 CephContext *cct = m_image_ctx.cct;
984 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
985 << "on_safe=" << on_safe << dendl;
986
987 on_safe = create_context_callback<Context>(on_safe, this);
988
989 Future future;
990 {
991 std::lock_guard event_locker{m_event_lock};
992 future = wait_event(m_lock, tid, on_safe);
993 }
994
995 if (future.is_valid()) {
996 future.flush(nullptr);
997 }
998 }
999
1000 template <typename I>
1001 void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
1002 CephContext *cct = m_image_ctx.cct;
1003 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
1004 << "on_safe=" << on_safe << dendl;
1005
1006 on_safe = create_context_callback<Context>(on_safe, this);
1007
1008 std::lock_guard event_locker{m_event_lock};
1009 wait_event(m_lock, tid, on_safe);
1010 }
1011
1012 template <typename I>
1013 typename Journal<I>::Future Journal<I>::wait_event(ceph::mutex &lock, uint64_t tid,
1014 Context *on_safe) {
1015 ceph_assert(ceph_mutex_is_locked(m_event_lock));
1016 CephContext *cct = m_image_ctx.cct;
1017
1018 typename Events::iterator it = m_events.find(tid);
1019 ceph_assert(it != m_events.end());
1020
1021 Event &event = it->second;
1022 if (event.safe) {
1023 // journal entry already safe
1024 ldout(cct, 20) << this << " " << __func__ << ": "
1025 << "journal entry already safe" << dendl;
1026 m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
1027 return Future();
1028 }
1029
1030 event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
1031 on_safe));
1032 return event.futures.back();
1033 }
1034
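// External replay (e.g. driven by rbd-mirror) hands control of journal replay
// to the caller: local appends are stopped and flushed first, then a
// journal::Replay instance is returned while the journal remains in
// STATE_REPLAYING until stop_external_replay() is invoked.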
1035 template <typename I>
1036 void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
1037 Context *on_start) {
1038 CephContext *cct = m_image_ctx.cct;
1039 ldout(cct, 20) << this << " " << __func__ << dendl;
1040
1041 std::lock_guard locker{m_lock};
1042 ceph_assert(m_state == STATE_READY);
1043 ceph_assert(m_journal_replay == nullptr);
1044
1045 on_start = util::create_async_context_callback(m_image_ctx, on_start);
1046 on_start = new LambdaContext(
1047 [this, journal_replay, on_start](int r) {
1048 handle_start_external_replay(r, journal_replay, on_start);
1049 });
1050
1051 // safely flush all in-flight events before starting external replay
1052 m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
1053 on_start));
1054 }
1055
1056 template <typename I>
1057 void Journal<I>::handle_start_external_replay(int r,
1058 journal::Replay<I> **journal_replay,
1059 Context *on_finish) {
1060 CephContext *cct = m_image_ctx.cct;
1061 ldout(cct, 20) << this << " " << __func__ << dendl;
1062
1063 std::lock_guard locker{m_lock};
1064 ceph_assert(m_state == STATE_READY);
1065 ceph_assert(m_journal_replay == nullptr);
1066
1067 if (r < 0) {
1068 lderr(cct) << this << " " << __func__ << ": "
1069 << "failed to stop recording: " << cpp_strerror(r) << dendl;
1070 *journal_replay = nullptr;
1071
1072 // get back to a sane-state
1073 start_append();
1074 on_finish->complete(r);
1075 return;
1076 }
1077
1078 transition_state(STATE_REPLAYING, 0);
1079 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1080 *journal_replay = m_journal_replay;
1081 on_finish->complete(0);
1082 }
1083
1084 template <typename I>
1085 void Journal<I>::stop_external_replay() {
1086 CephContext *cct = m_image_ctx.cct;
1087 ldout(cct, 20) << this << " " << __func__ << dendl;
1088
1089 std::lock_guard locker{m_lock};
1090 ceph_assert(m_journal_replay != nullptr);
1091 ceph_assert(m_state == STATE_REPLAYING);
1092
1093 delete m_journal_replay;
1094 m_journal_replay = nullptr;
1095
1096 if (m_close_pending) {
1097 destroy_journaler(0);
1098 return;
1099 }
1100
1101 start_append();
1102 }
1103
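// Constructs a new Journaler using the rbd_journal_* commit/payload settings,
// registers the metadata listener, and issues a journal::OpenRequest;
// handle_open() then transitions the journal into replay.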
1104 template <typename I>
1105 void Journal<I>::create_journaler() {
1106 CephContext *cct = m_image_ctx.cct;
1107 ldout(cct, 20) << this << " " << __func__ << dendl;
1108
1109 ceph_assert(ceph_mutex_is_locked(m_lock));
1110 ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
1111 ceph_assert(m_journaler == NULL);
1112
1113 transition_state(STATE_INITIALIZING, 0);
1114 ::journal::Settings settings;
1115 settings.commit_interval =
1116 m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
1117 settings.max_payload_bytes =
1118 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
1119 settings.max_concurrent_object_sets =
1120 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
1121 // TODO: a configurable filter to exclude certain peers from being
1122 // disconnected.
1123 settings.ignored_laggy_clients = {IMAGE_CLIENT_ID};
1124
1125 m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
1126 m_image_ctx.md_ctx, m_image_ctx.id,
1127 IMAGE_CLIENT_ID, settings, nullptr);
1128 m_journaler->add_listener(&m_metadata_listener);
1129
1130 Context *ctx = create_async_context_callback(
1131 m_image_ctx, create_context_callback<
1132 Journal<I>, &Journal<I>::handle_open>(this));
1133 auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
1134 &m_lock, &m_client_meta,
1135 &m_tag_tid, &m_tag_data, ctx);
1136 open_req->send();
1137 }
1138
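// Shuts the journaler down once outstanding async journal ops have drained:
// replay state is discarded, the metadata listener is removed, and the journal
// moves to CLOSING until handle_journal_destroyed() completes the transition
// to CLOSED.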
1139 template <typename I>
1140 void Journal<I>::destroy_journaler(int r) {
1141 CephContext *cct = m_image_ctx.cct;
1142 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1143
1144 ceph_assert(ceph_mutex_is_locked(m_lock));
1145
1146 delete m_journal_replay;
1147 m_journal_replay = NULL;
1148
1149 m_journaler->remove_listener(&m_metadata_listener);
1150
1151 transition_state(STATE_CLOSING, r);
1152
1153 Context *ctx = create_async_context_callback(
1154 m_image_ctx, create_context_callback<
1155 Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
1156 ctx = new LambdaContext(
1157 [this, ctx](int r) {
1158 std::lock_guard locker{m_lock};
1159 m_journaler->shut_down(ctx);
1160 });
1161 ctx = create_async_context_callback(m_image_ctx, ctx);
1162 m_async_journal_op_tracker.wait_for_ops(ctx);
1163 }
1164
1165 template <typename I>
1166 void Journal<I>::recreate_journaler(int r) {
1167 CephContext *cct = m_image_ctx.cct;
1168 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1169
1170 ceph_assert(ceph_mutex_is_locked(m_lock));
1171 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1172 m_state == STATE_FLUSHING_REPLAY);
1173
1174 delete m_journal_replay;
1175 m_journal_replay = NULL;
1176
1177 m_journaler->remove_listener(&m_metadata_listener);
1178
1179 transition_state(STATE_RESTARTING_REPLAY, r);
1180 m_journaler->shut_down(create_async_context_callback(
1181 m_image_ctx, create_context_callback<
1182 Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
1183 }
1184
1185 template <typename I>
1186 void Journal<I>::complete_event(typename Events::iterator it, int r) {
1187 ceph_assert(ceph_mutex_is_locked(m_event_lock));
1188 ceph_assert(m_state == STATE_READY);
1189
1190 CephContext *cct = m_image_ctx.cct;
1191 ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
1192 << "r=" << r << dendl;
1193
1194 Event &event = it->second;
1195 if (r < 0 && r == event.filter_ret_val) {
1196 // ignore allowed error codes
1197 r = 0;
1198 }
1199 if (r < 0) {
1200     // the event was recorded in the journal but the disk update failed;
1201     // we cannot commit this IO event, so it must be replayed.
1202 ceph_assert(event.safe);
1203 lderr(cct) << this << " " << __func__ << ": "
1204 << "failed to commit IO to disk, replay required: "
1205 << cpp_strerror(r) << dendl;
1206 }
1207
1208 event.committed_io = true;
1209 if (event.safe) {
1210 if (r >= 0) {
1211 for (auto &future : event.futures) {
1212 m_journaler->committed(future);
1213 }
1214 }
1215 m_events.erase(it);
1216 }
1217 }
1218
1219 template <typename I>
1220 void Journal<I>::start_append() {
1221 ceph_assert(ceph_mutex_is_locked(m_lock));
1222
1223 m_journaler->start_append(
1224 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
1225 if (!m_image_ctx.config.template get_val<bool>("rbd_journal_object_writethrough_until_flush")) {
1226 m_journaler->set_append_batch_options(
1227 m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
1228 m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
1229 m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"));
1230 }
1231
1232 transition_state(STATE_READY, 0);
1233 }
1234
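// Completion of journal::OpenRequest: on success the tag class and maximum
// append size are cached and local replay begins; on failure the journaler is
// torn down via destroy_journaler().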
1235 template <typename I>
1236 void Journal<I>::handle_open(int r) {
1237 CephContext *cct = m_image_ctx.cct;
1238 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1239
1240 std::lock_guard locker{m_lock};
1241 ceph_assert(m_state == STATE_INITIALIZING);
1242
1243 if (r < 0) {
1244 lderr(cct) << this << " " << __func__ << ": "
1245 << "failed to initialize journal: " << cpp_strerror(r)
1246 << dendl;
1247 destroy_journaler(r);
1248 return;
1249 }
1250
1251 m_tag_class = m_client_meta.tag_class;
1252 m_max_append_size = m_journaler->get_max_append_size();
1253 ldout(cct, 20) << this << " " << __func__ << ": "
1254 << "tag_class=" << m_tag_class << ", "
1255 << "max_append_size=" << m_max_append_size << dendl;
1256
1257 transition_state(STATE_REPLAYING, 0);
1258 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1259 m_journaler->start_replay(&m_replay_handler);
1260 }
1261
1262 template <typename I>
1263 void Journal<I>::handle_replay_ready() {
1264 CephContext *cct = m_image_ctx.cct;
1265 ReplayEntry replay_entry;
1266 {
1267 std::lock_guard locker{m_lock};
1268 if (m_state != STATE_REPLAYING) {
1269 return;
1270 }
1271
1272 ldout(cct, 20) << this << " " << __func__ << dendl;
1273 if (!m_journaler->try_pop_front(&replay_entry)) {
1274 return;
1275 }
1276
1277 // only one entry should be in-flight at a time
1278 ceph_assert(!m_processing_entry);
1279 m_processing_entry = true;
1280 }
1281
1282 m_async_journal_op_tracker.start_op();
1283
1284 bufferlist data = replay_entry.get_data();
1285 auto it = data.cbegin();
1286
1287 journal::EventEntry event_entry;
1288 int r = m_journal_replay->decode(&it, &event_entry);
1289 if (r < 0) {
1290 lderr(cct) << this << " " << __func__ << ": "
1291 << "failed to decode journal event entry" << dendl;
1292 handle_replay_process_safe(replay_entry, r);
1293 return;
1294 }
1295
1296 Context *on_ready = create_context_callback<
1297 Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
1298 Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
1299 m_journal_replay->process(event_entry, on_ready, on_commit);
1300 }
1301
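// Local replay finished (or failed). The shutdown chain below is built
// bottom-up from nested LambdaContexts and runs in reverse order of
// construction: stop_replay -> shut down journal::Replay (cancelling
// in-flight ops on error) -> wait for async journal ops -> flush the commit
// position -> handle_flushing_restart() / handle_flushing_replay().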
1302 template <typename I>
1303 void Journal<I>::handle_replay_complete(int r) {
1304 CephContext *cct = m_image_ctx.cct;
1305
1306 bool cancel_ops = false;
1307 {
1308 std::lock_guard locker{m_lock};
1309 if (m_state != STATE_REPLAYING) {
1310 return;
1311 }
1312
1313 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1314 if (r < 0) {
1315 cancel_ops = true;
1316 transition_state(STATE_FLUSHING_RESTART, r);
1317 } else {
1318 // state might change back to FLUSHING_RESTART on flush error
1319 transition_state(STATE_FLUSHING_REPLAY, 0);
1320 }
1321 }
1322
1323 Context *ctx = new LambdaContext([this, cct](int r) {
1324 ldout(cct, 20) << this << " handle_replay_complete: "
1325 << "handle shut down replay" << dendl;
1326
1327 State state;
1328 {
1329 std::lock_guard locker{m_lock};
1330 ceph_assert(m_state == STATE_FLUSHING_RESTART ||
1331 m_state == STATE_FLUSHING_REPLAY);
1332 state = m_state;
1333 }
1334
1335 if (state == STATE_FLUSHING_RESTART) {
1336 handle_flushing_restart(0);
1337 } else {
1338 handle_flushing_replay();
1339 }
1340 });
1341 ctx = new LambdaContext([this, ctx](int r) {
1342 // ensure the commit position is flushed to disk
1343 m_journaler->flush_commit_position(ctx);
1344 });
1345 ctx = create_async_context_callback(m_image_ctx, ctx);
1346 ctx = new LambdaContext([this, ctx](int r) {
1347 m_async_journal_op_tracker.wait_for_ops(ctx);
1348 });
1349 ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
1350 ldout(cct, 20) << this << " handle_replay_complete: "
1351 << "shut down replay" << dendl;
1352 m_journal_replay->shut_down(cancel_ops, ctx);
1353 });
1354
1355 m_journaler->stop_replay(ctx);
1356 }
1357
1358 template <typename I>
1359 void Journal<I>::handle_replay_process_ready(int r) {
1360 // journal::Replay is ready for more events -- attempt to pop another
1361 CephContext *cct = m_image_ctx.cct;
1362 ldout(cct, 20) << this << " " << __func__ << dendl;
1363
1364 ceph_assert(r == 0);
1365 {
1366 std::lock_guard locker{m_lock};
1367 ceph_assert(m_processing_entry);
1368 m_processing_entry = false;
1369 }
1370 handle_replay_ready();
1371 }
1372
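// Invoked once a replayed entry has been applied to the image: on success the
// entry is marked committed. A failure during active replay flips the state to
// FLUSHING_RESTART and stops/shuts down replay so the journaler can be
// recreated; -ECANCELED is expected once an earlier error already aborted the
// replay and is therefore not logged as an error.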
1373 template <typename I>
1374 void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
1375 CephContext *cct = m_image_ctx.cct;
1376
1377 std::unique_lock locker{m_lock};
1378 ceph_assert(m_state == STATE_REPLAYING ||
1379 m_state == STATE_FLUSHING_RESTART ||
1380 m_state == STATE_FLUSHING_REPLAY);
1381
1382 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1383 if (r < 0) {
1384 if (r != -ECANCELED) {
1385 lderr(cct) << this << " " << __func__ << ": "
1386 << "failed to commit journal event to disk: "
1387 << cpp_strerror(r) << dendl;
1388 }
1389
1390 if (m_state == STATE_REPLAYING) {
1391 // abort the replay if we have an error
1392 transition_state(STATE_FLUSHING_RESTART, r);
1393 locker.unlock();
1394
1395 // stop replay, shut down, and restart
1396 Context* ctx = create_context_callback<
1397 Journal<I>, &Journal<I>::handle_flushing_restart>(this);
1398 ctx = new LambdaContext([this, ctx](int r) {
1399 // ensure the commit position is flushed to disk
1400 m_journaler->flush_commit_position(ctx);
1401 });
1402 ctx = new LambdaContext([this, cct, ctx](int r) {
1403 ldout(cct, 20) << this << " handle_replay_process_safe: "
1404 << "shut down replay" << dendl;
1405 {
1406 std::lock_guard locker{m_lock};
1407 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1408 }
1409
1410 m_journal_replay->shut_down(true, ctx);
1411 });
1412 m_journaler->stop_replay(ctx);
1413 m_async_journal_op_tracker.finish_op();
1414 return;
1415 } else if (m_state == STATE_FLUSHING_REPLAY) {
1416 // end-of-replay flush in-progress -- we need to restart replay
1417 transition_state(STATE_FLUSHING_RESTART, r);
1418 locker.unlock();
1419 m_async_journal_op_tracker.finish_op();
1420 return;
1421 }
1422 } else {
1423 // only commit the entry if written successfully
1424 m_journaler->committed(replay_entry);
1425 }
1426 locker.unlock();
1427 m_async_journal_op_tracker.finish_op();
1428 }
1429
1430 template <typename I>
1431 void Journal<I>::handle_flushing_restart(int r) {
1432 std::lock_guard locker{m_lock};
1433
1434 CephContext *cct = m_image_ctx.cct;
1435 ldout(cct, 20) << this << " " << __func__ << dendl;
1436
1437 ceph_assert(r == 0);
1438 ceph_assert(m_state == STATE_FLUSHING_RESTART);
1439 if (m_close_pending) {
1440 destroy_journaler(r);
1441 return;
1442 }
1443
1444 recreate_journaler(r);
1445 }
1446
1447 template <typename I>
1448 void Journal<I>::handle_flushing_replay() {
1449 std::lock_guard locker{m_lock};
1450
1451 CephContext *cct = m_image_ctx.cct;
1452 ldout(cct, 20) << this << " " << __func__ << dendl;
1453
1454 ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
1455 m_state == STATE_FLUSHING_RESTART);
1456 if (m_close_pending) {
1457 destroy_journaler(0);
1458 return;
1459 } else if (m_state == STATE_FLUSHING_RESTART) {
1460     // failed to replay one or more events -- restart
1461 recreate_journaler(0);
1462 return;
1463 }
1464
1465 delete m_journal_replay;
1466 m_journal_replay = NULL;
1467
1468 m_error_result = 0;
1469 start_append();
1470 }
1471
1472 template <typename I>
1473 void Journal<I>::handle_recording_stopped(int r) {
1474 CephContext *cct = m_image_ctx.cct;
1475 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1476
1477 std::lock_guard locker{m_lock};
1478 ceph_assert(m_state == STATE_STOPPING);
1479
1480 destroy_journaler(r);
1481 }
1482
1483 template <typename I>
1484 void Journal<I>::handle_journal_destroyed(int r) {
1485 CephContext *cct = m_image_ctx.cct;
1486 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1487
1488 if (r < 0) {
1489     lderr(cct) << this << " " << __func__ << ": "
1490                << "error detected while closing journal: " << cpp_strerror(r)
1491 << dendl;
1492 }
1493
1494 std::lock_guard locker{m_lock};
1495 delete m_journaler;
1496 m_journaler = nullptr;
1497
1498 ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
1499 if (m_state == STATE_RESTARTING_REPLAY) {
1500 create_journaler();
1501 return;
1502 }
1503
1504 transition_state(STATE_CLOSED, r);
1505 }
1506
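// Called once the journal entries backing an IO event are safe on disk. If the
// journal write failed, or the image IO already completed, the event's futures
// are committed; the event is erased when the IO has completed, otherwise it
// is marked safe and left for commit_io_event(). Any waiters registered via
// wait_event()/flush_event() are then completed with the result.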
1507 template <typename I>
1508 void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
1509 CephContext *cct = m_image_ctx.cct;
1510 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1511 << "tid=" << tid << dendl;
1512
1513 // journal will be flushed before closing
1514 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1515 if (r < 0) {
1516 lderr(cct) << this << " " << __func__ << ": "
1517 << "failed to commit IO event: " << cpp_strerror(r) << dendl;
1518 }
1519
1520 Contexts on_safe_contexts;
1521 {
1522 std::lock_guard event_locker{m_event_lock};
1523 typename Events::iterator it = m_events.find(tid);
1524 ceph_assert(it != m_events.end());
1525
1526 Event &event = it->second;
1527 on_safe_contexts.swap(event.on_safe_contexts);
1528
1529 if (r < 0 || event.committed_io) {
1530 // failed journal write so IO won't be sent -- or IO extent was
1531 // overwritten by future IO operations so this was a no-op IO event
1532 event.ret_val = r;
1533 for (auto &future : event.futures) {
1534 m_journaler->committed(future);
1535 }
1536 }
1537
1538 if (event.committed_io) {
1539 m_events.erase(it);
1540 } else {
1541 event.safe = true;
1542 }
1543 }
1544
1545 ldout(cct, 20) << this << " " << __func__ << ": "
1546 << "completing tid=" << tid << dendl;
1547
1548 // alert the cache about the journal event status
1549 for (Contexts::iterator it = on_safe_contexts.begin();
1550 it != on_safe_contexts.end(); ++it) {
1551 (*it)->complete(r);
1552 }
1553 }
1554
1555 template <typename I>
1556 void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
1557 const Future &op_start_future,
1558 const Future &op_finish_future,
1559 Context *on_safe) {
1560 CephContext *cct = m_image_ctx.cct;
1561 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1562 << "tid=" << tid << dendl;
1563
1564 // journal will be flushed before closing
1565 ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1566 if (r < 0) {
1567 lderr(cct) << this << " " << __func__ << ": "
1568 << "failed to commit op event: " << cpp_strerror(r) << dendl;
1569 }
1570
1571 m_journaler->committed(op_start_future);
1572 m_journaler->committed(op_finish_future);
1573
1574 // reduce the replay window after committing an op event
1575 m_journaler->flush_commit_position(on_safe);
1576 }
1577
1578 template <typename I>
1579 void Journal<I>::stop_recording() {
1580 ceph_assert(ceph_mutex_is_locked(m_lock));
1581 ceph_assert(m_journaler != NULL);
1582
1583 ceph_assert(m_state == STATE_READY);
1584 transition_state(STATE_STOPPING, 0);
1585
1586 m_journaler->stop_append(util::create_async_context_callback(
1587 m_image_ctx, create_context_callback<
1588 Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
1589 }
1590
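// STATE_READY and STATE_CLOSED are the only steady states. transition_state()
// latches the first error seen into m_error_result and, upon reaching a steady
// state, completes every context queued by wait_for_steady_state() with that
// result.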
1591 template <typename I>
1592 void Journal<I>::transition_state(State state, int r) {
1593 CephContext *cct = m_image_ctx.cct;
1594 ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
1595 ceph_assert(ceph_mutex_is_locked(m_lock));
1596 m_state = state;
1597
1598 if (m_error_result == 0 && r < 0) {
1599 m_error_result = r;
1600 }
1601
1602 if (is_steady_state()) {
1603 auto wait_for_state_contexts(std::move(m_wait_for_state_contexts));
1604 m_wait_for_state_contexts.clear();
1605
1606 for (auto ctx : wait_for_state_contexts) {
1607 ctx->complete(m_error_result);
1608 }
1609 }
1610 }
1611
1612 template <typename I>
1613 bool Journal<I>::is_steady_state() const {
1614 ceph_assert(ceph_mutex_is_locked(m_lock));
1615 switch (m_state) {
1616 case STATE_READY:
1617 case STATE_CLOSED:
1618 return true;
1619 case STATE_UNINITIALIZED:
1620 case STATE_INITIALIZING:
1621 case STATE_REPLAYING:
1622 case STATE_FLUSHING_RESTART:
1623 case STATE_RESTARTING_REPLAY:
1624 case STATE_FLUSHING_REPLAY:
1625 case STATE_STOPPING:
1626 case STATE_CLOSING:
1627 break;
1628 }
1629 return false;
1630 }
1631
1632 template <typename I>
1633 void Journal<I>::wait_for_steady_state(Context *on_state) {
1634 ceph_assert(ceph_mutex_is_locked(m_lock));
1635 ceph_assert(!is_steady_state());
1636
1637 CephContext *cct = m_image_ctx.cct;
1638 ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
1639 << dendl;
1640 m_wait_for_state_contexts.push_back(on_state);
1641 }
1642
1643 template <typename I>
1644 int Journal<I>::is_resync_requested(bool *do_resync) {
1645 std::lock_guard l{m_lock};
1646 return check_resync_requested(do_resync);
1647 }
1648
1649 template <typename I>
1650 int Journal<I>::check_resync_requested(bool *do_resync) {
1651 CephContext *cct = m_image_ctx.cct;
1652 ldout(cct, 20) << this << " " << __func__ << dendl;
1653
1654 ceph_assert(ceph_mutex_is_locked(m_lock));
1655 ceph_assert(do_resync != nullptr);
1656
1657 cls::journal::Client client;
1658 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
1659 if (r < 0) {
1660 lderr(cct) << this << " " << __func__ << ": "
1661 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1662 return r;
1663 }
1664
1665 librbd::journal::ClientData client_data;
1666 auto bl_it = client.data.cbegin();
1667 try {
1668 decode(client_data, bl_it);
1669 } catch (const buffer::error &err) {
1670 lderr(cct) << this << " " << __func__ << ": "
1671 << "failed to decode client data: " << err.what() << dendl;
1672 return -EINVAL;
1673 }
1674
1675 journal::ImageClientMeta *image_client_meta =
1676 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
1677 if (image_client_meta == nullptr) {
1678 lderr(cct) << this << " " << __func__ << ": "
1679 << "failed to access image client meta struct" << dendl;
1680 return -EINVAL;
1681 }
1682
1683 *do_resync = image_client_meta->resync_requested;
1684
1685 return 0;
1686 }
1687
1688 struct C_RefreshTags : public Context {
1689 AsyncOpTracker &async_op_tracker;
1690 Context *on_finish = nullptr;
1691
1692 ceph::mutex lock =
1693 ceph::make_mutex("librbd::Journal::C_RefreshTags::lock");
1694 uint64_t tag_tid = 0;
1695 journal::TagData tag_data;
1696
1697 explicit C_RefreshTags(AsyncOpTracker &async_op_tracker)
1698 : async_op_tracker(async_op_tracker) {
1699 async_op_tracker.start_op();
1700 }
1701 ~C_RefreshTags() override {
1702 async_op_tracker.finish_op();
1703 }
1704
1705 void finish(int r) override {
1706 on_finish->complete(r);
1707 }
1708 };
1709
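// Journal metadata updates only matter for non-primary images with registered
// listeners: the most recent tags are re-fetched via C_RefreshTags and
// handle_refresh_metadata() then notifies listeners of a promotion or of a
// peer-requested resync.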
1710 template <typename I>
1711 void Journal<I>::handle_metadata_updated() {
1712 CephContext *cct = m_image_ctx.cct;
1713 std::lock_guard locker{m_lock};
1714
1715 if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1716 return;
1717 } else if (is_tag_owner(m_lock)) {
1718 ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
1719 return;
1720 } else if (m_listeners.empty()) {
1721 ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
1722 return;
1723 }
1724
1725 uint64_t refresh_sequence = ++m_refresh_sequence;
1726 ldout(cct, 20) << this << " " << __func__ << ": "
1727 << "refresh_sequence=" << refresh_sequence << dendl;
1728
1729 // pull the most recent tags from the journal, decode, and
1730 // update the internal tag state
1731 C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
1732 refresh_ctx->on_finish = new LambdaContext(
1733 [this, refresh_sequence, refresh_ctx](int r) {
1734 handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
1735 refresh_ctx->tag_data, r);
1736 });
1737 C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
1738 cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
1739 &refresh_ctx->tag_data, refresh_ctx);
1740 m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
1741 &decode_tags_ctx->tags, decode_tags_ctx);
1742 }
1743
1744 template <typename I>
1745 void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
1746 uint64_t tag_tid,
1747 journal::TagData tag_data, int r) {
1748 CephContext *cct = m_image_ctx.cct;
1749 std::unique_lock locker{m_lock};
1750
1751 if (r < 0) {
1752 lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
1753 << cpp_strerror(r) << dendl;
1754 return;
1755 } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1756 return;
1757 } else if (refresh_sequence != m_refresh_sequence) {
1758 // another, more up-to-date refresh is in-flight
1759 return;
1760 }
1761
1762 ldout(cct, 20) << this << " " << __func__ << ": "
1763 << "refresh_sequence=" << refresh_sequence << ", "
1764 << "tag_tid=" << tag_tid << ", "
1765 << "tag_data=" << tag_data << dendl;
1766 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1767
1768 bool was_tag_owner = is_tag_owner(m_lock);
1769 if (m_tag_tid < tag_tid) {
1770 m_tag_tid = tag_tid;
1771 m_tag_data = tag_data;
1772 }
1773 bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
1774
1775 bool resync_requested = false;
1776 r = check_resync_requested(&resync_requested);
1777 if (r < 0) {
1778 lderr(cct) << this << " " << __func__ << ": "
1779 << "failed to check if a resync was requested" << dendl;
1780 return;
1781 }
1782
1783 Listeners listeners(m_listeners);
1784 m_listener_notify = true;
1785 locker.unlock();
1786
1787 if (promoted_to_primary) {
1788 for (auto listener : listeners) {
1789 listener->handle_promoted();
1790 }
1791 } else if (resync_requested) {
1792 for (auto listener : listeners) {
1793 listener->handle_resync();
1794 }
1795 }
1796
1797 locker.lock();
1798 m_listener_notify = false;
1799 m_listener_cond.notify_all();
1800 }
1801
1802 template <typename I>
1803 void Journal<I>::add_listener(journal::Listener *listener) {
1804 std::lock_guard locker{m_lock};
1805 m_listeners.insert(listener);
1806 }
1807
1808 template <typename I>
1809 void Journal<I>::remove_listener(journal::Listener *listener) {
1810 std::unique_lock locker{m_lock};
1811 m_listener_cond.wait(locker, [this] { return !m_listener_notify; });
1812 m_listeners.erase(listener);
1813 }
1814
1815 } // namespace librbd
1816
1817 #ifndef TEST_F
1818 template class librbd::Journal<librbd::ImageCtx>;
1819 #endif