// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/Journal.h"
#include "include/rados/librados.hpp"
#include "common/errno.h"
#include "common/Timer.h"
#include "common/WorkQueue.h"
#include "cls/journal/cls_journal_types.h"
#include "journal/Journaler.h"
#include "journal/Policy.h"
#include "journal/ReplayEntry.h"
#include "journal/Settings.h"
#include "journal/Utils.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/io/ImageRequestWQ.h"
#include "librbd/io/ObjectDispatchSpec.h"
#include "librbd/io/ObjectDispatcher.h"
#include "librbd/journal/CreateRequest.h"
#include "librbd/journal/DemoteRequest.h"
#include "librbd/journal/ObjectDispatch.h"
#include "librbd/journal/OpenRequest.h"
#include "librbd/journal/RemoveRequest.h"
#include "librbd/journal/ResetRequest.h"
#include "librbd/journal/Replay.h"
#include "librbd/journal/PromoteRequest.h"

#include <boost/scope_exit.hpp>
#include <utility>

#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
#define dout_prefix *_dout << "librbd::Journal: "

namespace librbd {

using util::create_async_context_callback;
using util::create_context_callback;
using journal::util::C_DecodeTag;
using journal::util::C_DecodeTags;

namespace {

// TODO: once journaler is 100% async, remove separate threads and
// reuse ImageCtx's thread pool
class ThreadPoolSingleton : public ThreadPool {
public:
  explicit ThreadPoolSingleton(CephContext *cct)
    : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
    start();
  }
  ~ThreadPoolSingleton() override {
    stop();
  }
};

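// Context that determines whether the local image owns the most recent
// journal tag (i.e. is the primary image). The temporary Journaler is
// deleted from a work queue callback so its destruction never runs on
// the completion thread.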
template <typename I>
struct C_IsTagOwner : public Context {
  librados::IoCtx &io_ctx;
  std::string image_id;
  bool *is_tag_owner;
  ContextWQ *op_work_queue;
  Context *on_finish;

  CephContext *cct = nullptr;
  Journaler *journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
               bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
    : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
      op_work_queue(op_work_queue), on_finish(on_finish),
      cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
      journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
                              {})) {
  }

  void finish(int r) override {
    ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
                   << dendl;
    if (r < 0) {
      lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
                 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
    } else {
      *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
    }

    Journaler *journaler = this->journaler;
    Context *on_finish = this->on_finish;
    FunctionContext *ctx = new FunctionContext(
      [journaler, on_finish](int r) {
        on_finish->complete(r);
        delete journaler;
      });
    op_work_queue->queue(ctx, r);
  }
};

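// Context that retrieves the mirror uuid recorded in an image's most
// recent journal tag (i.e. the current tag owner).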
struct C_GetTagOwner : public Context {
  std::string *mirror_uuid;
  Context *on_finish;

  Journaler journaler;
  cls::journal::Client client;
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
                std::string *mirror_uuid, Context *on_finish)
    : mirror_uuid(mirror_uuid), on_finish(on_finish),
      journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
  }

  void finish(int r) override {
    if (r >= 0) {
      *mirror_uuid = tag_data.mirror_uuid;
    }
    on_finish->complete(r);
  }
};

template <typename J>
struct GetTagsRequest {
  CephContext *cct;
  J *journaler;
  cls::journal::Client *client;
  journal::ImageClientMeta *client_meta;
  uint64_t *tag_tid;
  journal::TagData *tag_data;
  Context *on_finish;

  Mutex lock;

  GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
                 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
                 journal::TagData *tag_data, Context *on_finish)
    : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
      tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish), lock("lock") {
  }

  /**
   * @verbatim
   *
   * <start>
   *    |
   *    v
   * GET_CLIENT * * * * * * * * * * * *
   *    |                             *
   *    v                             *
   * GET_TAGS * * * * * * * * * * * * * (error)
   *    |                             *
   *    v                             *
   * <finish> * * * * * * * * * * * * *
   *
   * @endverbatim
   */

  void send() {
    send_get_client();
  }

  void send_get_client() {
    ldout(cct, 20) << __func__ << dendl;

    FunctionContext *ctx = new FunctionContext(
      [this](int r) {
        handle_get_client(r);
      });
    journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
  }

  void handle_get_client(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    if (r < 0) {
      complete(r);
      return;
    }

    librbd::journal::ClientData client_data;
    auto bl_it = client->data.cbegin();
    try {
      decode(client_data, bl_it);
    } catch (const buffer::error &err) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to decode client data" << dendl;
      complete(-EBADMSG);
      return;
    }

    journal::ImageClientMeta *image_client_meta =
      boost::get<journal::ImageClientMeta>(&client_data.client_meta);
    if (image_client_meta == nullptr) {
      lderr(cct) << this << " OpenJournalerRequest::" << __func__ << ": "
                 << "failed to get client meta" << dendl;
      complete(-EINVAL);
      return;
    }
    *client_meta = *image_client_meta;

    send_get_tags();
  }

  void send_get_tags() {
    ldout(cct, 20) << __func__ << dendl;

    FunctionContext *ctx = new FunctionContext(
      [this](int r) {
        handle_get_tags(r);
      });
    C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
                                              ctx);
    journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
  }

  void handle_get_tags(int r) {
    ldout(cct, 20) << __func__ << ": r=" << r << dendl;

    complete(r);
  }

  void complete(int r) {
    on_finish->complete(r);
    delete this;
  }
};

template <typename J>
void get_tags(CephContext *cct, J *journaler,
              cls::journal::Client *client,
              journal::ImageClientMeta *client_meta,
              uint64_t *tag_tid, journal::TagData *tag_data,
              Context *on_finish) {
  ldout(cct, 20) << __func__ << dendl;

  GetTagsRequest<J> *req =
    new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
                          tag_data, on_finish);
  req->send();
}

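// Synchronously allocates a new tag in the given tag class, encoding the
// mirror uuid and predecessor into the tag's data blob.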
template <typename J>
int allocate_journaler_tag(CephContext *cct, J *journaler,
                           uint64_t tag_class,
                           const journal::TagPredecessor &predecessor,
                           const std::string &mirror_uuid,
                           cls::journal::Tag *new_tag) {
  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_SaferCond allocate_tag_ctx;
  journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);

  int r = allocate_tag_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to allocate tag: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}

} // anonymous namespace

// client id for local image
template <typename I>
const std::string Journal<I>::IMAGE_CLIENT_ID("");

// mirror uuid to use for local images
template <typename I>
const std::string Journal<I>::LOCAL_MIRROR_UUID("");

// mirror uuid to use for orphaned (demoted) images
template <typename I>
const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");

template <typename I>
std::ostream &operator<<(std::ostream &os,
                         const typename Journal<I>::State &state) {
  switch (state) {
  case Journal<I>::STATE_UNINITIALIZED:
    os << "Uninitialized";
    break;
  case Journal<I>::STATE_INITIALIZING:
    os << "Initializing";
    break;
  case Journal<I>::STATE_REPLAYING:
    os << "Replaying";
    break;
  case Journal<I>::STATE_FLUSHING_RESTART:
    os << "FlushingRestart";
    break;
  case Journal<I>::STATE_RESTARTING_REPLAY:
    os << "RestartingReplay";
    break;
  case Journal<I>::STATE_FLUSHING_REPLAY:
    os << "FlushingReplay";
    break;
  case Journal<I>::STATE_READY:
    os << "Ready";
    break;
  case Journal<I>::STATE_STOPPING:
    os << "Stopping";
    break;
  case Journal<I>::STATE_CLOSING:
    os << "Closing";
    break;
  case Journal<I>::STATE_CLOSED:
    os << "Closed";
    break;
  default:
    os << "Unknown (" << static_cast<uint32_t>(state) << ")";
    break;
  }
  return os;
}

template <typename I>
Journal<I>::Journal(I &image_ctx)
  : m_image_ctx(image_ctx), m_journaler(NULL),
    m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
    m_error_result(0), m_replay_handler(this), m_close_pending(false),
    m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
    m_blocking_writes(false), m_journal_replay(NULL),
    m_metadata_listener(this) {

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;

  auto thread_pool_singleton =
    &cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
      "librbd::journal::thread_pool", false, cct);
  m_work_queue = new ContextWQ("librbd::journal::work_queue",
                               cct->_conf.get_val<uint64_t>("rbd_op_thread_timeout"),
                               thread_pool_singleton);
  ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
}

template <typename I>
Journal<I>::~Journal() {
  if (m_work_queue != nullptr) {
    m_work_queue->drain();
    delete m_work_queue;
  }

  ceph_assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
  ceph_assert(m_journaler == NULL);
  ceph_assert(m_journal_replay == NULL);
  ceph_assert(m_wait_for_state_contexts.empty());
}

template <typename I>
bool Journal<I>::is_journal_supported(I &image_ctx) {
  ceph_assert(image_ctx.snap_lock.is_locked());
  return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
          !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}

template <typename I>
int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
                       uint8_t order, uint8_t splay_width,
                       const std::string &object_pool) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ThreadPool *thread_pool;
  ContextWQ *op_work_queue;
  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);

  C_SaferCond cond;
  journal::TagData tag_data(LOCAL_MIRROR_UUID);
  journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
    io_ctx, image_id, order, splay_width, object_pool,
    cls::journal::Tag::TAG_CLASS_NEW, tag_data, IMAGE_CLIENT_ID,
    op_work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ThreadPool *thread_pool;
  ContextWQ *op_work_queue;
  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);

  C_SaferCond cond;
  journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
    io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
  CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;

  ThreadPool *thread_pool;
  ContextWQ *op_work_queue;
  ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);

  C_SaferCond cond;
  auto req = journal::ResetRequest<I>::create(io_ctx, image_id, IMAGE_CLIENT_ID,
                                              Journal<>::LOCAL_MIRROR_UUID,
                                              op_work_queue, &cond);
  req->send();

  return cond.wait();
}

template <typename I>
void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
                              Context *on_finish) {
  Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
                           image_ctx->op_work_queue, on_finish);
}

template <typename I>
void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
                              bool *is_tag_owner, ContextWQ *op_work_queue,
                              Context *on_finish) {
  CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
    io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
  get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
           &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
           &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
}

template <typename I>
void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
                               std::string *mirror_uuid,
                               ContextWQ *op_work_queue, Context *on_finish) {
  CephContext *cct = static_cast<CephContext *>(io_ctx.cct());
  ldout(cct, 20) << __func__ << dendl;

  auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
  get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
           &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
}

template <typename I>
int Journal<I>::request_resync(I *image_ctx) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});

  Mutex lock("lock");
  journal::ImageClientMeta client_meta;
  uint64_t tag_tid;
  journal::TagData tag_data;

  C_SaferCond open_ctx;
  auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
                                                  &client_meta, &tag_tid,
                                                  &tag_data, &open_ctx);
  open_req->send();

  BOOST_SCOPE_EXIT_ALL(&journaler) {
    journaler.shut_down();
  };

  int r = open_ctx.wait();
  if (r < 0) {
    return r;
  }

  client_meta.resync_requested = true;

  journal::ClientData client_data(client_meta);
  bufferlist client_data_bl;
  encode(client_data, client_data_bl);

  C_SaferCond update_client_ctx;
  journaler.update_client(client_data_bl, &update_client_ctx);

  r = update_client_ctx.wait();
  if (r < 0) {
    lderr(cct) << __func__ << ": "
               << "failed to update client: " << cpp_strerror(r) << dendl;
    return r;
  }
  return 0;
}

template <typename I>
void Journal<I>::promote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
                                                        on_finish);
  promote_req->send();
}

template <typename I>
void Journal<I>::demote(I *image_ctx, Context *on_finish) {
  CephContext *cct = image_ctx->cct;
  ldout(cct, 20) << __func__ << dendl;

  auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
  req->send();
}

template <typename I>
bool Journal<I>::is_journal_ready() const {
  Mutex::Locker locker(m_lock);
  return (m_state == STATE_READY);
}

template <typename I>
bool Journal<I>::is_journal_replaying() const {
  Mutex::Locker locker(m_lock);
  return is_journal_replaying(m_lock);
}

template <typename I>
bool Journal<I>::is_journal_replaying(const Mutex &) const {
  ceph_assert(m_lock.is_locked());
  return (m_state == STATE_REPLAYING ||
          m_state == STATE_FLUSHING_REPLAY ||
          m_state == STATE_FLUSHING_RESTART ||
          m_state == STATE_RESTARTING_REPLAY);
}

template <typename I>
bool Journal<I>::is_journal_appending() const {
  ceph_assert(m_image_ctx.snap_lock.is_locked());
  Mutex::Locker locker(m_lock);
  return (m_state == STATE_READY &&
          !m_image_ctx.get_journal_policy()->append_disabled());
}

template <typename I>
void Journal<I>::wait_for_journal_ready(Context *on_ready) {
  on_ready = create_async_context_callback(m_image_ctx, on_ready);

  Mutex::Locker locker(m_lock);
  if (m_state == STATE_READY) {
    on_ready->complete(m_error_result);
  } else {
    wait_for_steady_state(on_ready);
  }
}

template <typename I>
void Journal<I>::open(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  // inject our handler into the object dispatcher chain
  m_image_ctx.io_object_dispatcher->register_object_dispatch(
    journal::ObjectDispatch<I>::create(&m_image_ctx, this));

  Mutex::Locker locker(m_lock);
  ceph_assert(m_state == STATE_UNINITIALIZED);
  wait_for_steady_state(on_finish);
  create_journaler();
}

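// Closing first waits out any in-flight listener notification, notifies
// the remaining listeners, stops recording if the journal is currently
// appending, and then waits for the state machine to reach CLOSED.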
template <typename I>
void Journal<I>::close(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  on_finish = new FunctionContext([this, on_finish](int r) {
      // remove our handler from object dispatcher chain - preserve error
      auto ctx = new FunctionContext([on_finish, r](int _) {
          on_finish->complete(r);
        });
      m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
        io::OBJECT_DISPATCH_LAYER_JOURNAL, ctx);
    });
  on_finish = create_async_context_callback(m_image_ctx, on_finish);

  Mutex::Locker locker(m_lock);
  while (m_listener_notify) {
    m_listener_cond.Wait(m_lock);
  }

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  m_lock.Unlock();
  for (auto listener : listeners) {
    listener->handle_close();
  }

  m_lock.Lock();
  m_listener_notify = false;
  m_listener_cond.Signal();

  ceph_assert(m_state != STATE_UNINITIALIZED);
  if (m_state == STATE_CLOSED) {
    on_finish->complete(m_error_result);
    return;
  }

  if (m_state == STATE_READY) {
    stop_recording();
  }

  m_close_pending = true;
  wait_for_steady_state(on_finish);
}

template <typename I>
bool Journal<I>::is_tag_owner() const {
  Mutex::Locker locker(m_lock);
  return is_tag_owner(m_lock);
}

template <typename I>
bool Journal<I>::is_tag_owner(const Mutex &) const {
  ceph_assert(m_lock.is_locked());
  return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
}

template <typename I>
uint64_t Journal<I>::get_tag_tid() const {
  Mutex::Locker locker(m_lock);
  return m_tag_tid;
}

template <typename I>
journal::TagData Journal<I>::get_tag_data() const {
  Mutex::Locker locker(m_lock);
  return m_tag_data;
}

template <typename I>
void Journal<I>::allocate_local_tag(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  journal::TagPredecessor predecessor;
  predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_journaler != nullptr && is_tag_owner(m_lock));

    cls::journal::Client client;
    int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
    if (r < 0) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
      m_image_ctx.op_work_queue->queue(on_finish, r);
      return;
    }

    // since we are primary, populate the predecessor with our known commit
    // position
    ceph_assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
    if (!client.commit_position.object_positions.empty()) {
      auto position = client.commit_position.object_positions.front();
      predecessor.commit_valid = true;
      predecessor.tag_tid = position.tag_tid;
      predecessor.entry_tid = position.entry_tid;
    }
  }

  allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
}

template <typename I>
void Journal<I>::allocate_tag(const std::string &mirror_uuid,
                              const journal::TagPredecessor &predecessor,
                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
                 << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_journaler != nullptr);

  journal::TagData tag_data;
  tag_data.mirror_uuid = mirror_uuid;
  tag_data.predecessor = predecessor;

  bufferlist tag_bl;
  encode(tag_data, tag_bl);

  C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
                                                &m_tag_data, on_finish);
  m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
                            decode_tag_ctx);
}

template <typename I>
void Journal<I>::flush_commit_position(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_journaler != nullptr);
  m_journaler->flush_commit_position(on_finish);
}

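// Records a write as one or more AIO_WRITE journal events. Writes larger
// than the journaler's maximum append size are split across multiple
// journal entries that are tracked, and later committed, under a single
// event tid.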
template <typename I>
uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
                                        const bufferlist &bl,
                                        bool flush_entry) {
  ceph_assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
  uint64_t max_write_data_size =
    m_max_append_size - journal::AioWriteEvent::get_fixed_size();

  // ensure that the write event fits within the journal entry
  Bufferlists bufferlists;
  uint64_t bytes_remaining = length;
  uint64_t event_offset = 0;
  do {
    uint64_t event_length = std::min(bytes_remaining, max_write_data_size);

    bufferlist event_bl;
    event_bl.substr_of(bl, event_offset, event_length);
    journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
                                                           event_length,
                                                           event_bl),
                                    ceph_clock_now());

    bufferlists.emplace_back();
    encode(event_entry, bufferlists.back());

    event_offset += event_length;
    bytes_remaining -= event_length;
  } while (bytes_remaining > 0);

  return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, offset,
                          length, flush_entry, 0);
}

template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
                                     uint64_t offset, size_t length,
                                     bool flush_entry, int filter_ret_val) {
  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);
  return append_io_events(event_entry.get_event_type(), {bl}, offset, length,
                          flush_entry, filter_ret_val);
}

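// Common append path: assigns a new event tid, appends one journal future
// per bufferlist, and registers the event so commit_io_event() can mark
// it committed once the IO has been applied to the backing image.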
template <typename I>
uint64_t Journal<I>::append_io_events(journal::EventType event_type,
                                      const Bufferlists &bufferlists,
                                      uint64_t offset, size_t length,
                                      bool flush_entry, int filter_ret_val) {
  ceph_assert(!bufferlists.empty());

  uint64_t tid;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_state == STATE_READY);

    tid = ++m_event_tid;
    ceph_assert(tid != 0);
  }

  Futures futures;
  for (auto &bl : bufferlists) {
    ceph_assert(bl.length() <= m_max_append_size);
    futures.push_back(m_journaler->append(m_tag_tid, bl));
  }

  {
    Mutex::Locker event_locker(m_event_lock);
    m_events[tid] = Event(futures, offset, length, filter_ret_val);
  }

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "event=" << event_type << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "flush=" << flush_entry << ", tid=" << tid << dendl;

  Context *on_safe = create_async_context_callback(
    m_image_ctx, new C_IOEventSafe(this, tid));
  if (flush_entry) {
    futures.back().flush(on_safe);
  } else {
    futures.back().wait(on_safe);
  }

  return tid;
}

template <typename I>
void Journal<I>::commit_io_event(uint64_t tid, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                    "r=" << r << dendl;

  Mutex::Locker event_locker(m_event_lock);
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }
  complete_event(it, r);
}

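// Partial commit: subtracts the acknowledged extent from the event's
// pending extents and only completes the event once every extent has
// been committed to the image.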
template <typename I>
void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
                                        uint64_t length, int r) {
  ceph_assert(length > 0);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "offset=" << offset << ", "
                 << "length=" << length << ", "
                 << "r=" << r << dendl;

  Mutex::Locker event_locker(m_event_lock);
  typename Events::iterator it = m_events.find(tid);
  if (it == m_events.end()) {
    return;
  }

  Event &event = it->second;
  if (event.ret_val == 0 && r < 0) {
    event.ret_val = r;
  }

  ExtentInterval extent;
  extent.insert(offset, length);

  ExtentInterval intersect;
  intersect.intersection_of(extent, event.pending_extents);

  event.pending_extents.subtract(intersect);
  if (!event.pending_extents.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "pending extents: " << event.pending_extents << dendl;
    return;
  }
  complete_event(it, event.ret_val);
}

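// Op events are appended when an operation starts, but their futures are
// retained in m_op_futures and only marked committed after the paired
// OpFinishEvent is safe, so a replay always observes a consistent
// start/finish pair.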
template <typename I>
void Journal<I>::append_op_event(uint64_t op_tid,
                                 journal::EventEntry &&event_entry,
                                 Context *on_safe) {
  ceph_assert(m_image_ctx.owner_lock.is_locked());

  bufferlist bl;
  event_entry.timestamp = ceph_clock_now();
  encode(event_entry, bl);

  Future future;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_state == STATE_READY);

    future = m_journaler->append(m_tag_tid, bl);

    // delay committing op event to ensure consistent replay
    ceph_assert(m_op_futures.count(op_tid) == 0);
    m_op_futures[op_tid] = future;
  }

  on_safe = create_async_context_callback(m_image_ctx, on_safe);
  on_safe = new FunctionContext([this, on_safe](int r) {
      // ensure all committed IO before this op is committed
      m_journaler->flush_commit_position(on_safe);
    });
  future.flush(on_safe);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": "
                 << "op_tid=" << op_tid << ", "
                 << "event=" << event_entry.get_event_type() << dendl;
}

template <typename I>
void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
                 << "r=" << r << dendl;

  journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
                                  ceph_clock_now());

  bufferlist bl;
  encode(event_entry, bl);

  Future op_start_future;
  Future op_finish_future;
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_state == STATE_READY);

    // ready to commit op event
    auto it = m_op_futures.find(op_tid);
    ceph_assert(it != m_op_futures.end());
    op_start_future = it->second;
    m_op_futures.erase(it);

    op_finish_future = m_journaler->append(m_tag_tid, bl);
  }

  op_finish_future.flush(create_async_context_callback(
    m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
                                   op_finish_future, on_safe)));
}

template <typename I>
void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;

  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_journal_replay != nullptr);
    m_journal_replay->replay_op_ready(op_tid, on_resume);
  }
}

template <typename I>
void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  Future future;
  {
    Mutex::Locker event_locker(m_event_lock);
    future = wait_event(m_lock, tid, on_safe);
  }

  if (future.is_valid()) {
    future.flush(nullptr);
  }
}

template <typename I>
void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
                 << "on_safe=" << on_safe << dendl;

  Mutex::Locker event_locker(m_event_lock);
  wait_event(m_lock, tid, on_safe);
}

template <typename I>
typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
                                                   Context *on_safe) {
  ceph_assert(m_event_lock.is_locked());
  CephContext *cct = m_image_ctx.cct;

  typename Events::iterator it = m_events.find(tid);
  ceph_assert(it != m_events.end());

  Event &event = it->second;
  if (event.safe) {
    // journal entry already safe
    ldout(cct, 20) << this << " " << __func__ << ": "
                   << "journal entry already safe" << dendl;
    m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
    return Future();
  }

  event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
                                                                 on_safe));
  return event.futures.back();
}

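// Hands the journal over to an externally driven journal::Replay
// instance (e.g. rbd-mirror replaying events from a remote peer); local
// appending is stopped first so that all in-flight events are flushed
// before the external replay begins.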
template <typename I>
void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
                                       Context *on_start) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  on_start = util::create_async_context_callback(m_image_ctx, on_start);
  on_start = new FunctionContext(
    [this, journal_replay, on_start](int r) {
      handle_start_external_replay(r, journal_replay, on_start);
    });

  // safely flush all in-flight events before starting external replay
  m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
                                                               on_start));
}

template <typename I>
void Journal<I>::handle_start_external_replay(int r,
                                              journal::Replay<I> **journal_replay,
                                              Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_state == STATE_READY);
  ceph_assert(m_journal_replay == nullptr);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to stop recording: " << cpp_strerror(r) << dendl;
    *journal_replay = nullptr;

    // get back to a sane state
    start_append();
    on_finish->complete(r);
    return;
  }

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  *journal_replay = m_journal_replay;
  on_finish->complete(0);
}

template <typename I>
void Journal<I>::stop_external_replay() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_journal_replay != nullptr);
  ceph_assert(m_state == STATE_REPLAYING);

  delete m_journal_replay;
  m_journal_replay = nullptr;

  if (m_close_pending) {
    destroy_journaler(0);
    return;
  }

  start_append();
}

template <typename I>
void Journal<I>::create_journaler() {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(m_lock.is_locked());
  ceph_assert(m_state == STATE_UNINITIALIZED ||
              m_state == STATE_RESTARTING_REPLAY);
  ceph_assert(m_journaler == NULL);

  transition_state(STATE_INITIALIZING, 0);
  ::journal::Settings settings;
  settings.commit_interval =
    m_image_ctx.config.template get_val<double>("rbd_journal_commit_age");
  settings.max_payload_bytes =
    m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_max_payload_bytes");
  settings.max_concurrent_object_sets =
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_max_concurrent_object_sets");
  // TODO: a configurable filter to exclude certain peers from being
  // disconnected.
  settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};

  m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
                              m_image_ctx.md_ctx, m_image_ctx.id,
                              IMAGE_CLIENT_ID, settings);
  m_journaler->add_listener(&m_metadata_listener);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_open>(this));
  auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
                                                  &m_lock, &m_client_meta,
                                                  &m_tag_tid, &m_tag_data, ctx);
  open_req->send();
}

template <typename I>
void Journal<I>::destroy_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(m_lock.is_locked());

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_CLOSING, r);

  Context *ctx = create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
  ctx = new FunctionContext(
    [this, ctx](int r) {
      Mutex::Locker locker(m_lock);
      m_journaler->shut_down(ctx);
    });
  m_async_journal_op_tracker.wait(m_image_ctx, ctx);
}

template <typename I>
void Journal<I>::recreate_journaler(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  ceph_assert(m_lock.is_locked());
  ceph_assert(m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_journaler->remove_listener(&m_metadata_listener);

  transition_state(STATE_RESTARTING_REPLAY, r);
  m_journaler->shut_down(create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
}

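// An event is removed only once it is both safe in the journal and
// committed to the image. A failed image update leaves the event's
// futures uncommitted so the event will be replayed on the next journal
// open.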
template <typename I>
void Journal<I>::complete_event(typename Events::iterator it, int r) {
  ceph_assert(m_event_lock.is_locked());
  ceph_assert(m_state == STATE_READY);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
                 << "r=" << r << dendl;

  Event &event = it->second;
  if (r < 0 && r == event.filter_ret_val) {
    // ignore allowed error codes
    r = 0;
  }
  if (r < 0) {
    // event recorded to journal but failed to update disk, we cannot
    // commit this IO event. this event must be replayed.
    ceph_assert(event.safe);
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO to disk, replay required: "
               << cpp_strerror(r) << dendl;
  }

  event.committed_io = true;
  if (event.safe) {
    if (r >= 0) {
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }
    m_events.erase(it);
  }
}

template <typename I>
void Journal<I>::start_append() {
  ceph_assert(m_lock.is_locked());
  m_journaler->start_append(
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_flush_interval"),
    m_image_ctx.config.template get_val<Option::size_t>("rbd_journal_object_flush_bytes"),
    m_image_ctx.config.template get_val<double>("rbd_journal_object_flush_age"),
    m_image_ctx.config.template get_val<uint64_t>("rbd_journal_object_max_in_flight_appends"));
  transition_state(STATE_READY, 0);
}

template <typename I>
void Journal<I>::handle_open(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_state == STATE_INITIALIZING);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to initialize journal: " << cpp_strerror(r)
               << dendl;
    destroy_journaler(r);
    return;
  }

  m_tag_class = m_client_meta.tag_class;
  m_max_append_size = m_journaler->get_max_append_size();
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "tag_class=" << m_tag_class << ", "
                 << "max_append_size=" << m_max_append_size << dendl;

  transition_state(STATE_REPLAYING, 0);
  m_journal_replay = journal::Replay<I>::create(m_image_ctx);
  m_journaler->start_replay(&m_replay_handler);
}

template <typename I>
void Journal<I>::handle_replay_ready() {
  CephContext *cct = m_image_ctx.cct;
  ReplayEntry replay_entry;
  {
    Mutex::Locker locker(m_lock);
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << dendl;
    if (!m_journaler->try_pop_front(&replay_entry)) {
      return;
    }

    // only one entry should be in-flight at a time
    ceph_assert(!m_processing_entry);
    m_processing_entry = true;
  }

  bufferlist data = replay_entry.get_data();
  auto it = data.cbegin();

  journal::EventEntry event_entry;
  int r = m_journal_replay->decode(&it, &event_entry);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode journal event entry" << dendl;
    handle_replay_process_safe(replay_entry, r);
    return;
  }

  Context *on_ready = create_context_callback<
    Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
  Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
  m_journal_replay->process(event_entry, on_ready, on_commit);
}

template <typename I>
void Journal<I>::handle_replay_complete(int r) {
  CephContext *cct = m_image_ctx.cct;

  bool cancel_ops = false;
  {
    Mutex::Locker locker(m_lock);
    if (m_state != STATE_REPLAYING) {
      return;
    }

    ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
    if (r < 0) {
      cancel_ops = true;
      transition_state(STATE_FLUSHING_RESTART, r);
    } else {
      // state might change back to FLUSHING_RESTART on flush error
      transition_state(STATE_FLUSHING_REPLAY, 0);
    }
  }

  Context *ctx = new FunctionContext([this, cct](int r) {
      ldout(cct, 20) << this << " handle_replay_complete: "
                     << "handle shut down replay" << dendl;

      State state;
      {
        Mutex::Locker locker(m_lock);
        ceph_assert(m_state == STATE_FLUSHING_RESTART ||
                    m_state == STATE_FLUSHING_REPLAY);
        state = m_state;
      }

      if (state == STATE_FLUSHING_RESTART) {
        handle_flushing_restart(0);
      } else {
        handle_flushing_replay();
      }
    });
  ctx = new FunctionContext([this, ctx](int r) {
      // ensure the commit position is flushed to disk
      m_journaler->flush_commit_position(ctx);
    });
  ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
      ldout(cct, 20) << this << " handle_replay_complete: "
                     << "shut down replay" << dendl;
      m_journal_replay->shut_down(cancel_ops, ctx);
    });
  m_journaler->stop_replay(ctx);
}

template <typename I>
void Journal<I>::handle_replay_process_ready(int r) {
  // journal::Replay is ready for more events -- attempt to pop another
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  {
    Mutex::Locker locker(m_lock);
    ceph_assert(m_processing_entry);
    m_processing_entry = false;
  }
  handle_replay_ready();
}

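// Invoked once a replayed entry has been flushed to the image. On
// failure the replay is aborted and restarted from the last committed
// position; on success the entry is marked committed so the commit
// position can advance.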
template <typename I>
void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
  CephContext *cct = m_image_ctx.cct;

  m_lock.Lock();
  ceph_assert(m_state == STATE_REPLAYING ||
              m_state == STATE_FLUSHING_RESTART ||
              m_state == STATE_FLUSHING_REPLAY);

  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
  if (r < 0) {
    if (r != -ECANCELED) {
      lderr(cct) << this << " " << __func__ << ": "
                 << "failed to commit journal event to disk: "
                 << cpp_strerror(r) << dendl;
    }

    if (m_state == STATE_REPLAYING) {
      // abort the replay if we have an error
      transition_state(STATE_FLUSHING_RESTART, r);
      m_lock.Unlock();

      // stop replay, shut down, and restart
      Context* ctx = create_context_callback<
        Journal<I>, &Journal<I>::handle_flushing_restart>(this);
      ctx = new FunctionContext([this, ctx](int r) {
          // ensure the commit position is flushed to disk
          m_journaler->flush_commit_position(ctx);
        });
      ctx = new FunctionContext([this, cct, ctx](int r) {
          ldout(cct, 20) << this << " handle_replay_process_safe: "
                         << "shut down replay" << dendl;
          {
            Mutex::Locker locker(m_lock);
            ceph_assert(m_state == STATE_FLUSHING_RESTART);
          }

          m_journal_replay->shut_down(true, ctx);
        });
      m_journaler->stop_replay(ctx);
      return;
    } else if (m_state == STATE_FLUSHING_REPLAY) {
      // end-of-replay flush in-progress -- we need to restart replay
      transition_state(STATE_FLUSHING_RESTART, r);
      m_lock.Unlock();
      return;
    }
  } else {
    // only commit the entry if written successfully
    m_journaler->committed(replay_entry);
  }
  m_lock.Unlock();
}

template <typename I>
void Journal<I>::handle_flushing_restart(int r) {
  Mutex::Locker locker(m_lock);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(r == 0);
  ceph_assert(m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(r);
    return;
  }

  recreate_journaler(r);
}

template <typename I>
void Journal<I>::handle_flushing_replay() {
  Mutex::Locker locker(m_lock);

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(m_state == STATE_FLUSHING_REPLAY ||
              m_state == STATE_FLUSHING_RESTART);
  if (m_close_pending) {
    destroy_journaler(0);
    return;
  } else if (m_state == STATE_FLUSHING_RESTART) {
    // failed to replay one-or-more events -- restart
    recreate_journaler(0);
    return;
  }

  delete m_journal_replay;
  m_journal_replay = NULL;

  m_error_result = 0;
  start_append();
}

template <typename I>
void Journal<I>::handle_recording_stopped(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  Mutex::Locker locker(m_lock);
  ceph_assert(m_state == STATE_STOPPING);

  destroy_journaler(r);
}

template <typename I>
void Journal<I>::handle_journal_destroyed(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "error detected while closing journal: " << cpp_strerror(r)
               << dendl;
  }

  Mutex::Locker locker(m_lock);
  delete m_journaler;
  m_journaler = nullptr;

  ceph_assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
  if (m_state == STATE_RESTARTING_REPLAY) {
    create_journaler();
    return;
  }

  transition_state(STATE_CLOSED, r);
}

template <typename I>
void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit IO event: " << cpp_strerror(r) << dendl;
  }

  Contexts on_safe_contexts;
  {
    Mutex::Locker event_locker(m_event_lock);
    typename Events::iterator it = m_events.find(tid);
    ceph_assert(it != m_events.end());

    Event &event = it->second;
    on_safe_contexts.swap(event.on_safe_contexts);

    if (r < 0 || event.committed_io) {
      // failed journal write so IO won't be sent -- or IO extent was
      // overwritten by future IO operations so this was a no-op IO event
      event.ret_val = r;
      for (auto &future : event.futures) {
        m_journaler->committed(future);
      }
    }

    if (event.committed_io) {
      m_events.erase(it);
    } else {
      event.safe = true;
    }
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "completing tid=" << tid << dendl;

  // alert the cache about the journal event status
  for (Contexts::iterator it = on_safe_contexts.begin();
       it != on_safe_contexts.end(); ++it) {
    (*it)->complete(r);
  }
}

template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
                                      const Future &op_start_future,
                                      const Future &op_finish_future,
                                      Context *on_safe) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                 << "tid=" << tid << dendl;

  // journal will be flushed before closing
  ceph_assert(m_state == STATE_READY || m_state == STATE_STOPPING);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to commit op event: " << cpp_strerror(r) << dendl;
  }

  m_journaler->committed(op_start_future);
  m_journaler->committed(op_finish_future);

  // reduce the replay window after committing an op event
  m_journaler->flush_commit_position(on_safe);
}

template <typename I>
void Journal<I>::stop_recording() {
  ceph_assert(m_lock.is_locked());
  ceph_assert(m_journaler != NULL);

  ceph_assert(m_state == STATE_READY);
  transition_state(STATE_STOPPING, 0);

  m_journaler->stop_append(util::create_async_context_callback(
    m_image_ctx, create_context_callback<
      Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
}

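// Transitions the state machine, latching the first error observed and
// completing any waiters once a steady state (READY or CLOSED) is
// reached.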
template <typename I>
void Journal<I>::transition_state(State state, int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
  ceph_assert(m_lock.is_locked());
  m_state = state;

  if (m_error_result == 0 && r < 0) {
    m_error_result = r;
  }

  if (is_steady_state()) {
    Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
    for (auto ctx : wait_for_state_contexts) {
      ctx->complete(m_error_result);
    }
  }
}

template <typename I>
bool Journal<I>::is_steady_state() const {
  ceph_assert(m_lock.is_locked());
  switch (m_state) {
  case STATE_READY:
  case STATE_CLOSED:
    return true;
  case STATE_UNINITIALIZED:
  case STATE_INITIALIZING:
  case STATE_REPLAYING:
  case STATE_FLUSHING_RESTART:
  case STATE_RESTARTING_REPLAY:
  case STATE_FLUSHING_REPLAY:
  case STATE_STOPPING:
  case STATE_CLOSING:
    break;
  }
  return false;
}

template <typename I>
void Journal<I>::wait_for_steady_state(Context *on_state) {
  ceph_assert(m_lock.is_locked());
  ceph_assert(!is_steady_state());

  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
                 << dendl;
  m_wait_for_state_contexts.push_back(on_state);
}

template <typename I>
int Journal<I>::is_resync_requested(bool *do_resync) {
  Mutex::Locker l(m_lock);
  return check_resync_requested(do_resync);
}

template <typename I>
int Journal<I>::check_resync_requested(bool *do_resync) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << this << " " << __func__ << dendl;

  ceph_assert(m_lock.is_locked());
  ceph_assert(do_resync != nullptr);

  cls::journal::Client client;
  int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to retrieve client: " << cpp_strerror(r) << dendl;
    return r;
  }

  librbd::journal::ClientData client_data;
  auto bl_it = client.data.cbegin();
  try {
    decode(client_data, bl_it);
  } catch (const buffer::error &err) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to decode client data: " << err << dendl;
    return -EINVAL;
  }

  journal::ImageClientMeta *image_client_meta =
    boost::get<journal::ImageClientMeta>(&client_data.client_meta);
  if (image_client_meta == nullptr) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to access image client meta struct" << dendl;
    return -EINVAL;
  }

  *do_resync = image_client_meta->resync_requested;

  return 0;
}

struct C_RefreshTags : public Context {
  util::AsyncOpTracker &async_op_tracker;
  Context *on_finish = nullptr;

  Mutex lock;
  uint64_t tag_tid = 0;
  journal::TagData tag_data;

  explicit C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
    : async_op_tracker(async_op_tracker),
      lock("librbd::Journal::C_RefreshTags::lock") {
    async_op_tracker.start_op();
  }
  ~C_RefreshTags() override {
    async_op_tracker.finish_op();
  }

  void finish(int r) override {
    on_finish->complete(r);
  }
};

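// The journal metadata was updated remotely (e.g. a new tag allocated or
// a resync requested): refresh the cached tag state and notify listeners
// of promotion or resync. Stale refreshes are discarded via the refresh
// sequence number.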
template <typename I>
void Journal<I>::handle_metadata_updated() {
  CephContext *cct = m_image_ctx.cct;
  Mutex::Locker locker(m_lock);

  if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (is_tag_owner(m_lock)) {
    ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
    return;
  } else if (m_listeners.empty()) {
    ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
    return;
  }

  uint64_t refresh_sequence = ++m_refresh_sequence;
  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << dendl;

  // pull the most recent tags from the journal, decode, and
  // update the internal tag state
  C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
  refresh_ctx->on_finish = new FunctionContext(
    [this, refresh_sequence, refresh_ctx](int r) {
      handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
                              refresh_ctx->tag_data, r);
    });
  C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
    cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
    &refresh_ctx->tag_data, refresh_ctx);
  m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
                        &decode_tags_ctx->tags, decode_tags_ctx);
}

template <typename I>
void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
                                         uint64_t tag_tid,
                                         journal::TagData tag_data, int r) {
  CephContext *cct = m_image_ctx.cct;
  Mutex::Locker locker(m_lock);

  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
               << cpp_strerror(r) << dendl;
    return;
  } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
    return;
  } else if (refresh_sequence != m_refresh_sequence) {
    // another, more up-to-date refresh is in-flight
    return;
  }

  ldout(cct, 20) << this << " " << __func__ << ": "
                 << "refresh_sequence=" << refresh_sequence << ", "
                 << "tag_tid=" << tag_tid << ", "
                 << "tag_data=" << tag_data << dendl;
  while (m_listener_notify) {
    m_listener_cond.Wait(m_lock);
  }

  bool was_tag_owner = is_tag_owner(m_lock);
  if (m_tag_tid < tag_tid) {
    m_tag_tid = tag_tid;
    m_tag_data = tag_data;
  }
  bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));

  bool resync_requested = false;
  r = check_resync_requested(&resync_requested);
  if (r < 0) {
    lderr(cct) << this << " " << __func__ << ": "
               << "failed to check if a resync was requested" << dendl;
    return;
  }

  Listeners listeners(m_listeners);
  m_listener_notify = true;
  m_lock.Unlock();

  if (promoted_to_primary) {
    for (auto listener : listeners) {
      listener->handle_promoted();
    }
  } else if (resync_requested) {
    for (auto listener : listeners) {
      listener->handle_resync();
    }
  }

  m_lock.Lock();
  m_listener_notify = false;
  m_listener_cond.Signal();
}

template <typename I>
void Journal<I>::add_listener(journal::Listener *listener) {
  Mutex::Locker locker(m_lock);
  m_listeners.insert(listener);
}

template <typename I>
void Journal<I>::remove_listener(journal::Listener *listener) {
  Mutex::Locker locker(m_lock);
  while (m_listener_notify) {
    m_listener_cond.Wait(m_lock);
  }
  m_listeners.erase(listener);
}

} // namespace librbd

#ifndef TEST_F
template class librbd::Journal<librbd::ImageCtx>;
#endif