1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "librbd/Journal.h"
5 #include "include/rados/librados.hpp"
6 #include "common/errno.h"
7 #include "common/Timer.h"
8 #include "common/WorkQueue.h"
9 #include "cls/journal/cls_journal_types.h"
10 #include "journal/Journaler.h"
11 #include "journal/Policy.h"
12 #include "journal/ReplayEntry.h"
13 #include "journal/Settings.h"
14 #include "journal/Utils.h"
15 #include "librbd/ExclusiveLock.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/io/ImageRequestWQ.h"
18 #include "librbd/io/ObjectRequest.h"
19 #include "librbd/journal/CreateRequest.h"
20 #include "librbd/journal/DemoteRequest.h"
21 #include "librbd/journal/OpenRequest.h"
22 #include "librbd/journal/RemoveRequest.h"
23 #include "librbd/journal/Replay.h"
24 #include "librbd/journal/PromoteRequest.h"
25
26 #include <boost/scope_exit.hpp>
27 #include <utility>
28
29 #define dout_subsys ceph_subsys_rbd
30 #undef dout_prefix
31 #define dout_prefix *_dout << "librbd::Journal: "
32
33 namespace librbd {
34
35 using util::create_async_context_callback;
36 using util::create_context_callback;
37 using journal::util::C_DecodeTag;
38 using journal::util::C_DecodeTags;
39
40 namespace {
41
42 // TODO: once journaler is 100% async, remove separate threads and
43 // reuse ImageCtx's thread pool
44 class ThreadPoolSingleton : public ThreadPool {
45 public:
46 explicit ThreadPoolSingleton(CephContext *cct)
47 : ThreadPool(cct, "librbd::Journal", "tp_librbd_journ", 1) {
48 start();
49 }
50 ~ThreadPoolSingleton() override {
51 stop();
52 }
53 };
54
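// Determines whether the local image owns the most recent journal tag by
// comparing the tag's mirror uuid against LOCAL_MIRROR_UUID. Completion is
// queued on the op work queue so the temporary Journaler can be deleted
// outside of its own callback context.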
55 template <typename I>
56 struct C_IsTagOwner : public Context {
57 librados::IoCtx &io_ctx;
58 std::string image_id;
59 bool *is_tag_owner;
60 ContextWQ *op_work_queue;
61 Context *on_finish;
62
63 CephContext *cct = nullptr;
64 Journaler *journaler;
65 cls::journal::Client client;
66 journal::ImageClientMeta client_meta;
67 uint64_t tag_tid;
68 journal::TagData tag_data;
69
70 C_IsTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
71 bool *is_tag_owner, ContextWQ *op_work_queue, Context *on_finish)
72 : io_ctx(io_ctx), image_id(image_id), is_tag_owner(is_tag_owner),
73 op_work_queue(op_work_queue), on_finish(on_finish),
74 cct(reinterpret_cast<CephContext*>(io_ctx.cct())),
75 journaler(new Journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID,
76 {})) {
77 }
78
79 void finish(int r) override {
80 ldout(cct, 20) << this << " C_IsTagOwner::" << __func__ << ": r=" << r
81 << dendl;
82 if (r < 0) {
83 lderr(cct) << this << " C_IsTagOwner::" << __func__ << ": "
84 << "failed to get tag owner: " << cpp_strerror(r) << dendl;
85 } else {
86 *is_tag_owner = (tag_data.mirror_uuid == Journal<>::LOCAL_MIRROR_UUID);
87 }
88
89 Journaler *journaler = this->journaler;
90 Context *on_finish = this->on_finish;
91 FunctionContext *ctx = new FunctionContext(
92 [journaler, on_finish](int r) {
93 on_finish->complete(r);
94 delete journaler;
95 });
96 op_work_queue->queue(ctx, r);
97 }
98 };
99
100 struct C_GetTagOwner : public Context {
101 std::string *mirror_uuid;
102 Context *on_finish;
103
104 Journaler journaler;
105 cls::journal::Client client;
106 journal::ImageClientMeta client_meta;
107 uint64_t tag_tid;
108 journal::TagData tag_data;
109
110 C_GetTagOwner(librados::IoCtx &io_ctx, const std::string &image_id,
111 std::string *mirror_uuid, Context *on_finish)
112 : mirror_uuid(mirror_uuid), on_finish(on_finish),
113 journaler(io_ctx, image_id, Journal<>::IMAGE_CLIENT_ID, {}) {
114 }
115
116   void finish(int r) override {
117 if (r >= 0) {
118 *mirror_uuid = tag_data.mirror_uuid;
119 }
120 on_finish->complete(r);
121 }
122 };
123
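// Small state machine that fetches the registered image client, decodes its
// client metadata, and then retrieves the most recent tag for the client's
// tag class (see the diagram below).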
124 template <typename J>
125 struct GetTagsRequest {
126 CephContext *cct;
127 J *journaler;
128 cls::journal::Client *client;
129 journal::ImageClientMeta *client_meta;
130 uint64_t *tag_tid;
131 journal::TagData *tag_data;
132 Context *on_finish;
133
134 Mutex lock;
135
136 GetTagsRequest(CephContext *cct, J *journaler, cls::journal::Client *client,
137 journal::ImageClientMeta *client_meta, uint64_t *tag_tid,
138 journal::TagData *tag_data, Context *on_finish)
139 : cct(cct), journaler(journaler), client(client), client_meta(client_meta),
140 tag_tid(tag_tid), tag_data(tag_data), on_finish(on_finish), lock("lock") {
141 }
142
143 /**
144 * @verbatim
145 *
146 * <start>
147 * |
148 * v
149 * GET_CLIENT * * * * * * * * * * * *
150 * | *
151 * v *
152 * GET_TAGS * * * * * * * * * * * * * (error)
153 * | *
154 * v *
155 * <finish> * * * * * * * * * * * * *
156 *
157 * @endverbatim
158 */
159
160 void send() {
161 send_get_client();
162 }
163
164 void send_get_client() {
165 ldout(cct, 20) << __func__ << dendl;
166
167 FunctionContext *ctx = new FunctionContext(
168 [this](int r) {
169 handle_get_client(r);
170 });
171 journaler->get_client(Journal<ImageCtx>::IMAGE_CLIENT_ID, client, ctx);
172 }
173
174 void handle_get_client(int r) {
175 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
176
177 if (r < 0) {
178 complete(r);
179 return;
180 }
181
182 librbd::journal::ClientData client_data;
183 bufferlist::iterator bl_it = client->data.begin();
184 try {
185 ::decode(client_data, bl_it);
186 } catch (const buffer::error &err) {
187       lderr(cct) << this << " GetTagsRequest::" << __func__ << ": "
188 << "failed to decode client data" << dendl;
189 complete(-EBADMSG);
190 return;
191 }
192
193 journal::ImageClientMeta *image_client_meta =
194 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
195 if (image_client_meta == nullptr) {
196       lderr(cct) << this << " GetTagsRequest::" << __func__ << ": "
197 << "failed to get client meta" << dendl;
198 complete(-EINVAL);
199 return;
200 }
201 *client_meta = *image_client_meta;
202
203 send_get_tags();
204 }
205
206 void send_get_tags() {
207 ldout(cct, 20) << __func__ << dendl;
208
209 FunctionContext *ctx = new FunctionContext(
210 [this](int r) {
211 handle_get_tags(r);
212 });
213 C_DecodeTags *tags_ctx = new C_DecodeTags(cct, &lock, tag_tid, tag_data,
214 ctx);
215 journaler->get_tags(client_meta->tag_class, &tags_ctx->tags, tags_ctx);
216 }
217
218 void handle_get_tags(int r) {
219 ldout(cct, 20) << __func__ << ": r=" << r << dendl;
220
221 complete(r);
222 }
223
224 void complete(int r) {
225 on_finish->complete(r);
226 delete this;
227 }
228 };
229
230 template <typename J>
231 void get_tags(CephContext *cct, J *journaler,
232 cls::journal::Client *client,
233 journal::ImageClientMeta *client_meta,
234 uint64_t *tag_tid, journal::TagData *tag_data,
235 Context *on_finish) {
236 ldout(cct, 20) << __func__ << dendl;
237
238 GetTagsRequest<J> *req =
239 new GetTagsRequest<J>(cct, journaler, client, client_meta, tag_tid,
240 tag_data, on_finish);
241 req->send();
242 }
243
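// Synchronously allocates a new journal tag in the given tag class using the
// provided mirror uuid and predecessor commit position.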
244 template <typename J>
245 int allocate_journaler_tag(CephContext *cct, J *journaler,
246 uint64_t tag_class,
247 const journal::TagPredecessor &predecessor,
248 const std::string &mirror_uuid,
249 cls::journal::Tag *new_tag) {
250 journal::TagData tag_data;
251 tag_data.mirror_uuid = mirror_uuid;
252 tag_data.predecessor = predecessor;
253
254 bufferlist tag_bl;
255 ::encode(tag_data, tag_bl);
256
257 C_SaferCond allocate_tag_ctx;
258 journaler->allocate_tag(tag_class, tag_bl, new_tag, &allocate_tag_ctx);
259
260 int r = allocate_tag_ctx.wait();
261 if (r < 0) {
262 lderr(cct) << __func__ << ": "
263 << "failed to allocate tag: " << cpp_strerror(r) << dendl;
264 return r;
265 }
266 return 0;
267 }
268
269 } // anonymous namespace
270
271 // client id for local image
272 template <typename I>
273 const std::string Journal<I>::IMAGE_CLIENT_ID("");
274
275 // mirror uuid to use for local images
276 template <typename I>
277 const std::string Journal<I>::LOCAL_MIRROR_UUID("");
278
279 // mirror uuid to use for orphaned (demoted) images
280 template <typename I>
281 const std::string Journal<I>::ORPHAN_MIRROR_UUID("<orphan>");
282
283 template <typename I>
284 std::ostream &operator<<(std::ostream &os,
285 const typename Journal<I>::State &state) {
286 switch (state) {
287 case Journal<I>::STATE_UNINITIALIZED:
288 os << "Uninitialized";
289 break;
290 case Journal<I>::STATE_INITIALIZING:
291 os << "Initializing";
292 break;
293 case Journal<I>::STATE_REPLAYING:
294 os << "Replaying";
295 break;
296 case Journal<I>::STATE_FLUSHING_RESTART:
297 os << "FlushingRestart";
298 break;
299 case Journal<I>::STATE_RESTARTING_REPLAY:
300 os << "RestartingReplay";
301 break;
302 case Journal<I>::STATE_FLUSHING_REPLAY:
303 os << "FlushingReplay";
304 break;
305 case Journal<I>::STATE_READY:
306 os << "Ready";
307 break;
308 case Journal<I>::STATE_STOPPING:
309 os << "Stopping";
310 break;
311 case Journal<I>::STATE_CLOSING:
312 os << "Closing";
313 break;
314 case Journal<I>::STATE_CLOSED:
315 os << "Closed";
316 break;
317 default:
318 os << "Unknown (" << static_cast<uint32_t>(state) << ")";
319 break;
320 }
321 return os;
322 }
323
324 template <typename I>
325 Journal<I>::Journal(I &image_ctx)
326 : m_image_ctx(image_ctx), m_journaler(NULL),
327 m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
328 m_error_result(0), m_replay_handler(this), m_close_pending(false),
329 m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
330 m_blocking_writes(false), m_journal_replay(NULL),
331 m_metadata_listener(this) {
332
333 CephContext *cct = m_image_ctx.cct;
334 ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
335
336 ThreadPoolSingleton *thread_pool_singleton;
337 cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
338 thread_pool_singleton, "librbd::journal::thread_pool");
339 m_work_queue = new ContextWQ("librbd::journal::work_queue",
340 cct->_conf->get_val<int64_t>("rbd_op_thread_timeout"),
341 thread_pool_singleton);
342 ImageCtx::get_timer_instance(cct, &m_timer, &m_timer_lock);
343 }
344
345 template <typename I>
346 Journal<I>::~Journal() {
347 if (m_work_queue != nullptr) {
348 m_work_queue->drain();
349 delete m_work_queue;
350 }
351
352 assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
353 assert(m_journaler == NULL);
354 assert(m_journal_replay == NULL);
355 assert(m_wait_for_state_contexts.empty());
356 }
357
358 template <typename I>
359 bool Journal<I>::is_journal_supported(I &image_ctx) {
360 assert(image_ctx.snap_lock.is_locked());
361 return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
362 !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
363 }
364
365 template <typename I>
366 int Journal<I>::create(librados::IoCtx &io_ctx, const std::string &image_id,
367 uint8_t order, uint8_t splay_width,
368 const std::string &object_pool) {
369 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
370 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
371
372 ThreadPool *thread_pool;
373 ContextWQ *op_work_queue;
374 ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
375
376 C_SaferCond cond;
377 journal::TagData tag_data(LOCAL_MIRROR_UUID);
378 journal::CreateRequest<I> *req = journal::CreateRequest<I>::create(
379 io_ctx, image_id, order, splay_width, object_pool, cls::journal::Tag::TAG_CLASS_NEW,
380 tag_data, IMAGE_CLIENT_ID, op_work_queue, &cond);
381 req->send();
382
383 return cond.wait();
384 }
385
386 template <typename I>
387 int Journal<I>::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
388 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
389 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
390
391 ThreadPool *thread_pool;
392 ContextWQ *op_work_queue;
393 ImageCtx::get_thread_pool_instance(cct, &thread_pool, &op_work_queue);
394
395 C_SaferCond cond;
396 journal::RemoveRequest<I> *req = journal::RemoveRequest<I>::create(
397 io_ctx, image_id, IMAGE_CLIENT_ID, op_work_queue, &cond);
398 req->send();
399
400 return cond.wait();
401 }
402
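// Recreates the journal in place: the existing journal's metadata (order,
// splay width, data pool) is captured, the journal objects are removed, and
// a new journal is created with the same parameters.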
403 template <typename I>
404 int Journal<I>::reset(librados::IoCtx &io_ctx, const std::string &image_id) {
405 CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
406 ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
407
408 Journaler journaler(io_ctx, image_id, IMAGE_CLIENT_ID, {});
409
410 C_SaferCond cond;
411 journaler.init(&cond);
412 BOOST_SCOPE_EXIT_ALL(&journaler) {
413 journaler.shut_down();
414 };
415
416 int r = cond.wait();
417 if (r == -ENOENT) {
418 return 0;
419 } else if (r < 0) {
420 lderr(cct) << __func__ << ": "
421 << "failed to initialize journal: " << cpp_strerror(r) << dendl;
422 return r;
423 }
424
425 uint8_t order, splay_width;
426 int64_t pool_id;
427 journaler.get_metadata(&order, &splay_width, &pool_id);
428
429 std::string pool_name;
430 if (pool_id != -1) {
431 librados::Rados rados(io_ctx);
432 r = rados.pool_reverse_lookup(pool_id, &pool_name);
433 if (r < 0) {
434 lderr(cct) << __func__ << ": "
435 << "failed to lookup data pool: " << cpp_strerror(r) << dendl;
436 return r;
437 }
438 }
439
440 C_SaferCond ctx1;
441 journaler.remove(true, &ctx1);
442 r = ctx1.wait();
443 if (r < 0) {
444 lderr(cct) << __func__ << ": "
445 << "failed to reset journal: " << cpp_strerror(r) << dendl;
446 return r;
447 }
448
449 r = create(io_ctx, image_id, order, splay_width, pool_name);
450 if (r < 0) {
451 lderr(cct) << __func__ << ": "
452 << "failed to create journal: " << cpp_strerror(r) << dendl;
453 return r;
454 }
455 return 0;
456 }
457
458 template <typename I>
459 void Journal<I>::is_tag_owner(I *image_ctx, bool *owner,
460 Context *on_finish) {
461 Journal<I>::is_tag_owner(image_ctx->md_ctx, image_ctx->id, owner,
462 image_ctx->op_work_queue, on_finish);
463 }
464
465 template <typename I>
466 void Journal<I>::is_tag_owner(librados::IoCtx& io_ctx, std::string& image_id,
467 bool *is_tag_owner, ContextWQ *op_work_queue,
468 Context *on_finish) {
469 CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
470 ldout(cct, 20) << __func__ << dendl;
471
472 C_IsTagOwner<I> *is_tag_owner_ctx = new C_IsTagOwner<I>(
473 io_ctx, image_id, is_tag_owner, op_work_queue, on_finish);
474 get_tags(cct, is_tag_owner_ctx->journaler, &is_tag_owner_ctx->client,
475 &is_tag_owner_ctx->client_meta, &is_tag_owner_ctx->tag_tid,
476 &is_tag_owner_ctx->tag_data, is_tag_owner_ctx);
477 }
478
479 template <typename I>
480 void Journal<I>::get_tag_owner(IoCtx& io_ctx, std::string& image_id,
481 std::string *mirror_uuid,
482 ContextWQ *op_work_queue, Context *on_finish) {
483   CephContext *cct = reinterpret_cast<CephContext*>(io_ctx.cct());
484 ldout(cct, 20) << __func__ << dendl;
485
486 auto ctx = new C_GetTagOwner(io_ctx, image_id, mirror_uuid, on_finish);
487 get_tags(cct, &ctx->journaler, &ctx->client, &ctx->client_meta, &ctx->tag_tid,
488 &ctx->tag_data, create_async_context_callback(op_work_queue, ctx));
489 }
490
491 template <typename I>
492 int Journal<I>::request_resync(I *image_ctx) {
493 CephContext *cct = image_ctx->cct;
494 ldout(cct, 20) << __func__ << dendl;
495
496 Journaler journaler(image_ctx->md_ctx, image_ctx->id, IMAGE_CLIENT_ID, {});
497
498 Mutex lock("lock");
499 journal::ImageClientMeta client_meta;
500 uint64_t tag_tid;
501 journal::TagData tag_data;
502
503 C_SaferCond open_ctx;
504 auto open_req = journal::OpenRequest<I>::create(image_ctx, &journaler, &lock,
505 &client_meta, &tag_tid,
506 &tag_data, &open_ctx);
507 open_req->send();
508
509 BOOST_SCOPE_EXIT_ALL(&journaler) {
510 journaler.shut_down();
511 };
512
513 int r = open_ctx.wait();
514 if (r < 0) {
515 return r;
516 }
517
518 client_meta.resync_requested = true;
519
520 journal::ClientData client_data(client_meta);
521 bufferlist client_data_bl;
522 ::encode(client_data, client_data_bl);
523
524 C_SaferCond update_client_ctx;
525 journaler.update_client(client_data_bl, &update_client_ctx);
526
527 r = update_client_ctx.wait();
528 if (r < 0) {
529 lderr(cct) << __func__ << ": "
530 << "failed to update client: " << cpp_strerror(r) << dendl;
531 return r;
532 }
533 return 0;
534 }
535
536 template <typename I>
537 void Journal<I>::promote(I *image_ctx, Context *on_finish) {
538 CephContext *cct = image_ctx->cct;
539 ldout(cct, 20) << __func__ << dendl;
540
541 auto promote_req = journal::PromoteRequest<I>::create(image_ctx, false,
542 on_finish);
543 promote_req->send();
544 }
545
546 template <typename I>
547 void Journal<I>::demote(I *image_ctx, Context *on_finish) {
548 CephContext *cct = image_ctx->cct;
549 ldout(cct, 20) << __func__ << dendl;
550
551 auto req = journal::DemoteRequest<I>::create(*image_ctx, on_finish);
552 req->send();
553 }
554
555 template <typename I>
556 bool Journal<I>::is_journal_ready() const {
557 Mutex::Locker locker(m_lock);
558 return (m_state == STATE_READY);
559 }
560
561 template <typename I>
562 bool Journal<I>::is_journal_replaying() const {
563 Mutex::Locker locker(m_lock);
564 return is_journal_replaying(m_lock);
565 }
566
567 template <typename I>
568 bool Journal<I>::is_journal_replaying(const Mutex &) const {
569 assert(m_lock.is_locked());
570 return (m_state == STATE_REPLAYING ||
571 m_state == STATE_FLUSHING_REPLAY ||
572 m_state == STATE_FLUSHING_RESTART ||
573 m_state == STATE_RESTARTING_REPLAY);
574 }
575
576 template <typename I>
577 bool Journal<I>::is_journal_appending() const {
578 assert(m_image_ctx.snap_lock.is_locked());
579 Mutex::Locker locker(m_lock);
580 return (m_state == STATE_READY &&
581 !m_image_ctx.get_journal_policy()->append_disabled());
582 }
583
584 template <typename I>
585 void Journal<I>::wait_for_journal_ready(Context *on_ready) {
586 on_ready = create_async_context_callback(m_image_ctx, on_ready);
587
588 Mutex::Locker locker(m_lock);
589 if (m_state == STATE_READY) {
590 on_ready->complete(m_error_result);
591 } else {
592 wait_for_steady_state(on_ready);
593 }
594 }
595
596 template <typename I>
597 void Journal<I>::open(Context *on_finish) {
598 CephContext *cct = m_image_ctx.cct;
599 ldout(cct, 20) << this << " " << __func__ << dendl;
600
601 on_finish = create_async_context_callback(m_image_ctx, on_finish);
602
603 Mutex::Locker locker(m_lock);
604 assert(m_state == STATE_UNINITIALIZED);
605 wait_for_steady_state(on_finish);
606 create_journaler();
607 }
608
609 template <typename I>
610 void Journal<I>::close(Context *on_finish) {
611 CephContext *cct = m_image_ctx.cct;
612 ldout(cct, 20) << this << " " << __func__ << dendl;
613
614 on_finish = create_async_context_callback(m_image_ctx, on_finish);
615
616 Mutex::Locker locker(m_lock);
617 while (m_listener_notify) {
618 m_listener_cond.Wait(m_lock);
619 }
620
621 Listeners listeners(m_listeners);
622 m_listener_notify = true;
623 m_lock.Unlock();
624 for (auto listener : listeners) {
625 listener->handle_close();
626 }
627
628 m_lock.Lock();
629 m_listener_notify = false;
630 m_listener_cond.Signal();
631
632 assert(m_state != STATE_UNINITIALIZED);
633 if (m_state == STATE_CLOSED) {
634 on_finish->complete(m_error_result);
635 return;
636 }
637
638 if (m_state == STATE_READY) {
639 stop_recording();
640 }
641
642 m_close_pending = true;
643 wait_for_steady_state(on_finish);
644 }
645
646 template <typename I>
647 bool Journal<I>::is_tag_owner() const {
648 Mutex::Locker locker(m_lock);
649 return is_tag_owner(m_lock);
650 }
651
652 template <typename I>
653 bool Journal<I>::is_tag_owner(const Mutex &) const {
654 assert(m_lock.is_locked());
655 return (m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
656 }
657
658 template <typename I>
659 uint64_t Journal<I>::get_tag_tid() const {
660 Mutex::Locker locker(m_lock);
661 return m_tag_tid;
662 }
663
664 template <typename I>
665 journal::TagData Journal<I>::get_tag_data() const {
666 Mutex::Locker locker(m_lock);
667 return m_tag_data;
668 }
669
670 template <typename I>
671 void Journal<I>::allocate_local_tag(Context *on_finish) {
672 CephContext *cct = m_image_ctx.cct;
673 ldout(cct, 20) << this << " " << __func__ << dendl;
674
675 journal::TagPredecessor predecessor;
676 predecessor.mirror_uuid = LOCAL_MIRROR_UUID;
677 {
678 Mutex::Locker locker(m_lock);
679 assert(m_journaler != nullptr && is_tag_owner(m_lock));
680
681 cls::journal::Client client;
682 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
683 if (r < 0) {
684 lderr(cct) << this << " " << __func__ << ": "
685 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
686 m_image_ctx.op_work_queue->queue(on_finish, r);
687 return;
688 }
689
690 // since we are primary, populate the predecessor with our known commit
691 // position
692 assert(m_tag_data.mirror_uuid == LOCAL_MIRROR_UUID);
693 if (!client.commit_position.object_positions.empty()) {
694 auto position = client.commit_position.object_positions.front();
695 predecessor.commit_valid = true;
696 predecessor.tag_tid = position.tag_tid;
697 predecessor.entry_tid = position.entry_tid;
698 }
699 }
700
701 allocate_tag(LOCAL_MIRROR_UUID, predecessor, on_finish);
702 }
703
704 template <typename I>
705 void Journal<I>::allocate_tag(const std::string &mirror_uuid,
706 const journal::TagPredecessor &predecessor,
707 Context *on_finish) {
708 CephContext *cct = m_image_ctx.cct;
709 ldout(cct, 20) << this << " " << __func__ << ": mirror_uuid=" << mirror_uuid
710 << dendl;
711
712 Mutex::Locker locker(m_lock);
713 assert(m_journaler != nullptr);
714
715 journal::TagData tag_data;
716 tag_data.mirror_uuid = mirror_uuid;
717 tag_data.predecessor = predecessor;
718
719 bufferlist tag_bl;
720 ::encode(tag_data, tag_bl);
721
722 C_DecodeTag *decode_tag_ctx = new C_DecodeTag(cct, &m_lock, &m_tag_tid,
723 &m_tag_data, on_finish);
724 m_journaler->allocate_tag(m_tag_class, tag_bl, &decode_tag_ctx->tag,
725 decode_tag_ctx);
726 }
727
728 template <typename I>
729 void Journal<I>::flush_commit_position(Context *on_finish) {
730 CephContext *cct = m_image_ctx.cct;
731 ldout(cct, 20) << this << " " << __func__ << dendl;
732
733 Mutex::Locker locker(m_lock);
734 assert(m_journaler != nullptr);
735 m_journaler->flush_commit_position(on_finish);
736 }
737
738 template <typename I>
739 uint64_t Journal<I>::append_write_event(uint64_t offset, size_t length,
740 const bufferlist &bl,
741 const IOObjectRequests &requests,
742 bool flush_entry) {
743 assert(m_max_append_size > journal::AioWriteEvent::get_fixed_size());
744 uint64_t max_write_data_size =
745 m_max_append_size - journal::AioWriteEvent::get_fixed_size();
746
747 // ensure that the write event fits within the journal entry
748 Bufferlists bufferlists;
749 uint64_t bytes_remaining = length;
750 uint64_t event_offset = 0;
751 do {
752 uint64_t event_length = MIN(bytes_remaining, max_write_data_size);
753
754 bufferlist event_bl;
755 event_bl.substr_of(bl, event_offset, event_length);
756 journal::EventEntry event_entry(journal::AioWriteEvent(offset + event_offset,
757 event_length,
758 event_bl),
759 ceph_clock_now());
760
761 bufferlists.emplace_back();
762 ::encode(event_entry, bufferlists.back());
763
764 event_offset += event_length;
765 bytes_remaining -= event_length;
766 } while (bytes_remaining > 0);
767
768 return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
769 offset, length, flush_entry, 0);
770 }
771
772 template <typename I>
773 uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
774 const IOObjectRequests &requests,
775 uint64_t offset, size_t length,
776 bool flush_entry, int filter_ret_val) {
777 bufferlist bl;
778 event_entry.timestamp = ceph_clock_now();
779 ::encode(event_entry, bl);
780 return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
781 length, flush_entry, filter_ret_val);
782 }
783
784 template <typename I>
785 uint64_t Journal<I>::append_io_events(journal::EventType event_type,
786 const Bufferlists &bufferlists,
787 const IOObjectRequests &requests,
788 uint64_t offset, size_t length,
789 bool flush_entry, int filter_ret_val) {
790 assert(!bufferlists.empty());
791
792 uint64_t tid;
793 {
794 Mutex::Locker locker(m_lock);
795 assert(m_state == STATE_READY);
796
797 tid = ++m_event_tid;
798 assert(tid != 0);
799 }
800
801 Futures futures;
802 for (auto &bl : bufferlists) {
803 assert(bl.length() <= m_max_append_size);
804 futures.push_back(m_journaler->append(m_tag_tid, bl));
805 }
806
807 {
808 Mutex::Locker event_locker(m_event_lock);
809 m_events[tid] = Event(futures, requests, offset, length, filter_ret_val);
810 }
811
812 CephContext *cct = m_image_ctx.cct;
813 ldout(cct, 20) << this << " " << __func__ << ": "
814 << "event=" << event_type << ", "
815 << "new_reqs=" << requests.size() << ", "
816 << "offset=" << offset << ", "
817 << "length=" << length << ", "
818 << "flush=" << flush_entry << ", tid=" << tid << dendl;
819
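  // futures are appended in order, so flushing or waiting on the final future
  // covers the entire event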
820 Context *on_safe = create_async_context_callback(
821 m_image_ctx, new C_IOEventSafe(this, tid));
822 if (flush_entry) {
823 futures.back().flush(on_safe);
824 } else {
825 futures.back().wait(on_safe);
826 }
827
828 return tid;
829 }
830
831 template <typename I>
832 void Journal<I>::commit_io_event(uint64_t tid, int r) {
833 CephContext *cct = m_image_ctx.cct;
834 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
835 "r=" << r << dendl;
836
837 Mutex::Locker event_locker(m_event_lock);
838 typename Events::iterator it = m_events.find(tid);
839 if (it == m_events.end()) {
840 return;
841 }
842 complete_event(it, r);
843 }
844
845 template <typename I>
846 void Journal<I>::commit_io_event_extent(uint64_t tid, uint64_t offset,
847 uint64_t length, int r) {
848 assert(length > 0);
849
850 CephContext *cct = m_image_ctx.cct;
851 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
852 << "offset=" << offset << ", "
853 << "length=" << length << ", "
854 << "r=" << r << dendl;
855
856 Mutex::Locker event_locker(m_event_lock);
857 typename Events::iterator it = m_events.find(tid);
858 if (it == m_events.end()) {
859 return;
860 }
861
862 Event &event = it->second;
863 if (event.ret_val == 0 && r < 0) {
864 event.ret_val = r;
865 }
866
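  // subtract the newly committed extent from the event's pending extents; the
  // event is only completed once all of its extents have been committed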
867 ExtentInterval extent;
868 extent.insert(offset, length);
869
870 ExtentInterval intersect;
871 intersect.intersection_of(extent, event.pending_extents);
872
873 event.pending_extents.subtract(intersect);
874 if (!event.pending_extents.empty()) {
875 ldout(cct, 20) << this << " " << __func__ << ": "
876 << "pending extents: " << event.pending_extents << dendl;
877 return;
878 }
879 complete_event(it, event.ret_val);
880 }
881
882 template <typename I>
883 void Journal<I>::append_op_event(uint64_t op_tid,
884 journal::EventEntry &&event_entry,
885 Context *on_safe) {
886 assert(m_image_ctx.owner_lock.is_locked());
887
888 bufferlist bl;
889 event_entry.timestamp = ceph_clock_now();
890 ::encode(event_entry, bl);
891
892 Future future;
893 {
894 Mutex::Locker locker(m_lock);
895 assert(m_state == STATE_READY);
896
897 future = m_journaler->append(m_tag_tid, bl);
898
899 // delay committing op event to ensure consistent replay
900 assert(m_op_futures.count(op_tid) == 0);
901 m_op_futures[op_tid] = future;
902 }
903
904 on_safe = create_async_context_callback(m_image_ctx, on_safe);
905 on_safe = new FunctionContext([this, on_safe](int r) {
906 // ensure all committed IO before this op is committed
907 m_journaler->flush_commit_position(on_safe);
908 });
909 future.flush(on_safe);
910
911 CephContext *cct = m_image_ctx.cct;
912 ldout(cct, 10) << this << " " << __func__ << ": "
913 << "op_tid=" << op_tid << ", "
914 << "event=" << event_entry.get_event_type() << dendl;
915 }
916
917 template <typename I>
918 void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
919 CephContext *cct = m_image_ctx.cct;
920 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
921 << "r=" << r << dendl;
922
923 journal::EventEntry event_entry((journal::OpFinishEvent(op_tid, r)),
924 ceph_clock_now());
925
926 bufferlist bl;
927 ::encode(event_entry, bl);
928
929 Future op_start_future;
930 Future op_finish_future;
931 {
932 Mutex::Locker locker(m_lock);
933 assert(m_state == STATE_READY);
934
935 // ready to commit op event
936 auto it = m_op_futures.find(op_tid);
937 assert(it != m_op_futures.end());
938 op_start_future = it->second;
939 m_op_futures.erase(it);
940
941 op_finish_future = m_journaler->append(m_tag_tid, bl);
942 }
943
944 op_finish_future.flush(create_async_context_callback(
945 m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
946 op_finish_future, on_safe)));
947 }
948
949 template <typename I>
950 void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {
951 CephContext *cct = m_image_ctx.cct;
952 ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
953
954 {
955 Mutex::Locker locker(m_lock);
956 assert(m_journal_replay != nullptr);
957 m_journal_replay->replay_op_ready(op_tid, on_resume);
958 }
959 }
960
961 template <typename I>
962 void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
963 CephContext *cct = m_image_ctx.cct;
964 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
965 << "on_safe=" << on_safe << dendl;
966
967 Future future;
968 {
969 Mutex::Locker event_locker(m_event_lock);
970 future = wait_event(m_lock, tid, on_safe);
971 }
972
973 if (future.is_valid()) {
974 future.flush(nullptr);
975 }
976 }
977
978 template <typename I>
979 void Journal<I>::wait_event(uint64_t tid, Context *on_safe) {
980 CephContext *cct = m_image_ctx.cct;
981 ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
982 << "on_safe=" << on_safe << dendl;
983
984 Mutex::Locker event_locker(m_event_lock);
985 wait_event(m_lock, tid, on_safe);
986 }
987
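// Returns the future associated with the event's final journal entry, or an
// invalid future if the entry is already safe (in which case on_safe is
// queued immediately with the event's result).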
988 template <typename I>
989 typename Journal<I>::Future Journal<I>::wait_event(Mutex &lock, uint64_t tid,
990 Context *on_safe) {
991 assert(m_event_lock.is_locked());
992 CephContext *cct = m_image_ctx.cct;
993
994 typename Events::iterator it = m_events.find(tid);
995 assert(it != m_events.end());
996
997 Event &event = it->second;
998 if (event.safe) {
999 // journal entry already safe
1000 ldout(cct, 20) << this << " " << __func__ << ": "
1001 << "journal entry already safe" << dendl;
1002 m_image_ctx.op_work_queue->queue(on_safe, event.ret_val);
1003 return Future();
1004 }
1005
1006 event.on_safe_contexts.push_back(create_async_context_callback(m_image_ctx,
1007 on_safe));
1008 return event.futures.back();
1009 }
1010
1011 template <typename I>
1012 void Journal<I>::start_external_replay(journal::Replay<I> **journal_replay,
1013 Context *on_start) {
1014 CephContext *cct = m_image_ctx.cct;
1015 ldout(cct, 20) << this << " " << __func__ << dendl;
1016
1017 Mutex::Locker locker(m_lock);
1018 assert(m_state == STATE_READY);
1019 assert(m_journal_replay == nullptr);
1020
1021 on_start = util::create_async_context_callback(m_image_ctx, on_start);
1022 on_start = new FunctionContext(
1023 [this, journal_replay, on_start](int r) {
1024 handle_start_external_replay(r, journal_replay, on_start);
1025 });
1026
1027 // safely flush all in-flight events before starting external replay
1028 m_journaler->stop_append(util::create_async_context_callback(m_image_ctx,
1029 on_start));
1030 }
1031
1032 template <typename I>
1033 void Journal<I>::handle_start_external_replay(int r,
1034 journal::Replay<I> **journal_replay,
1035 Context *on_finish) {
1036 CephContext *cct = m_image_ctx.cct;
1037 ldout(cct, 20) << this << " " << __func__ << dendl;
1038
1039 Mutex::Locker locker(m_lock);
1040 assert(m_state == STATE_READY);
1041 assert(m_journal_replay == nullptr);
1042
1043 if (r < 0) {
1044 lderr(cct) << this << " " << __func__ << ": "
1045 << "failed to stop recording: " << cpp_strerror(r) << dendl;
1046 *journal_replay = nullptr;
1047
1048     // get back to a sane state
1049 start_append();
1050 on_finish->complete(r);
1051 return;
1052 }
1053
1054 transition_state(STATE_REPLAYING, 0);
1055 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1056 *journal_replay = m_journal_replay;
1057 on_finish->complete(0);
1058 }
1059
1060 template <typename I>
1061 void Journal<I>::stop_external_replay() {
1062 CephContext *cct = m_image_ctx.cct;
1063 ldout(cct, 20) << this << " " << __func__ << dendl;
1064
1065 Mutex::Locker locker(m_lock);
1066 assert(m_journal_replay != nullptr);
1067 assert(m_state == STATE_REPLAYING);
1068
1069 delete m_journal_replay;
1070 m_journal_replay = nullptr;
1071
1072 if (m_close_pending) {
1073 destroy_journaler(0);
1074 return;
1075 }
1076
1077 start_append();
1078 }
1079
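// (Re)opens the journal: constructs a Journaler using the image's journal
// settings and issues an OpenRequest to load the client metadata and most
// recent tag before replay is started in handle_open().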
1080 template <typename I>
1081 void Journal<I>::create_journaler() {
1082 CephContext *cct = m_image_ctx.cct;
1083 ldout(cct, 20) << this << " " << __func__ << dendl;
1084
1085 assert(m_lock.is_locked());
1086 assert(m_state == STATE_UNINITIALIZED || m_state == STATE_RESTARTING_REPLAY);
1087 assert(m_journaler == NULL);
1088
1089 transition_state(STATE_INITIALIZING, 0);
1090 ::journal::Settings settings;
1091 settings.commit_interval = m_image_ctx.journal_commit_age;
1092 settings.max_payload_bytes = m_image_ctx.journal_max_payload_bytes;
1093 settings.max_concurrent_object_sets =
1094 m_image_ctx.journal_max_concurrent_object_sets;
1095 // TODO: a configurable filter to exclude certain peers from being
1096 // disconnected.
1097 settings.whitelisted_laggy_clients = {IMAGE_CLIENT_ID};
1098
1099 m_journaler = new Journaler(m_work_queue, m_timer, m_timer_lock,
1100 m_image_ctx.md_ctx, m_image_ctx.id,
1101 IMAGE_CLIENT_ID, settings);
1102 m_journaler->add_listener(&m_metadata_listener);
1103
1104 Context *ctx = create_async_context_callback(
1105 m_image_ctx, create_context_callback<
1106 Journal<I>, &Journal<I>::handle_open>(this));
1107 auto open_req = journal::OpenRequest<I>::create(&m_image_ctx, m_journaler,
1108 &m_lock, &m_client_meta,
1109 &m_tag_tid, &m_tag_data, ctx);
1110 open_req->send();
1111 }
1112
1113 template <typename I>
1114 void Journal<I>::destroy_journaler(int r) {
1115 CephContext *cct = m_image_ctx.cct;
1116 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1117
1118 assert(m_lock.is_locked());
1119
1120 delete m_journal_replay;
1121 m_journal_replay = NULL;
1122
1123 m_journaler->remove_listener(&m_metadata_listener);
1124
1125 transition_state(STATE_CLOSING, r);
1126
1127 Context *ctx = create_async_context_callback(
1128 m_image_ctx, create_context_callback<
1129 Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
1130 ctx = new FunctionContext(
1131 [this, ctx](int r) {
1132 Mutex::Locker locker(m_lock);
1133 m_journaler->shut_down(ctx);
1134 });
1135 m_async_journal_op_tracker.wait(m_image_ctx, ctx);
1136 }
1137
1138 template <typename I>
1139 void Journal<I>::recreate_journaler(int r) {
1140 CephContext *cct = m_image_ctx.cct;
1141 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1142
1143 assert(m_lock.is_locked());
1144 assert(m_state == STATE_FLUSHING_RESTART ||
1145 m_state == STATE_FLUSHING_REPLAY);
1146
1147 delete m_journal_replay;
1148 m_journal_replay = NULL;
1149
1150 m_journaler->remove_listener(&m_metadata_listener);
1151
1152 transition_state(STATE_RESTARTING_REPLAY, r);
1153 m_journaler->shut_down(create_async_context_callback(
1154 m_image_ctx, create_context_callback<
1155 Journal<I>, &Journal<I>::handle_journal_destroyed>(this)));
1156 }
1157
1158 template <typename I>
1159 void Journal<I>::complete_event(typename Events::iterator it, int r) {
1160 assert(m_event_lock.is_locked());
1161 assert(m_state == STATE_READY);
1162
1163 CephContext *cct = m_image_ctx.cct;
1164 ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
1165 << "r=" << r << dendl;
1166
1167 Event &event = it->second;
1168 if (r < 0 && r == event.filter_ret_val) {
1169 // ignore allowed error codes
1170 r = 0;
1171 }
1172 if (r < 0) {
1173 // event recorded to journal but failed to update disk, we cannot
1174 // commit this IO event. this event must be replayed.
1175 assert(event.safe);
1176 lderr(cct) << this << " " << __func__ << ": "
1177 << "failed to commit IO to disk, replay required: "
1178 << cpp_strerror(r) << dendl;
1179 }
1180
1181 event.committed_io = true;
1182 if (event.safe) {
1183 if (r >= 0) {
1184 for (auto &future : event.futures) {
1185 m_journaler->committed(future);
1186 }
1187 }
1188 m_events.erase(it);
1189 }
1190 }
1191
1192 template <typename I>
1193 void Journal<I>::start_append() {
1194 assert(m_lock.is_locked());
1195 m_journaler->start_append(m_image_ctx.journal_object_flush_interval,
1196 m_image_ctx.journal_object_flush_bytes,
1197 m_image_ctx.journal_object_flush_age);
1198 transition_state(STATE_READY, 0);
1199 }
1200
1201 template <typename I>
1202 void Journal<I>::handle_open(int r) {
1203 CephContext *cct = m_image_ctx.cct;
1204 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1205
1206 Mutex::Locker locker(m_lock);
1207 assert(m_state == STATE_INITIALIZING);
1208
1209 if (r < 0) {
1210 lderr(cct) << this << " " << __func__ << ": "
1211 << "failed to initialize journal: " << cpp_strerror(r)
1212 << dendl;
1213 destroy_journaler(r);
1214 return;
1215 }
1216
1217 m_tag_class = m_client_meta.tag_class;
1218 m_max_append_size = m_journaler->get_max_append_size();
1219 ldout(cct, 20) << this << " " << __func__ << ": "
1220 << "tag_class=" << m_tag_class << ", "
1221 << "max_append_size=" << m_max_append_size << dendl;
1222
1223 transition_state(STATE_REPLAYING, 0);
1224 m_journal_replay = journal::Replay<I>::create(m_image_ctx);
1225 m_journaler->start_replay(&m_replay_handler);
1226 }
1227
1228 template <typename I>
1229 void Journal<I>::handle_replay_ready() {
1230 CephContext *cct = m_image_ctx.cct;
1231 ReplayEntry replay_entry;
1232 {
1233 Mutex::Locker locker(m_lock);
1234 if (m_state != STATE_REPLAYING) {
1235 return;
1236 }
1237
1238 ldout(cct, 20) << this << " " << __func__ << dendl;
1239 if (!m_journaler->try_pop_front(&replay_entry)) {
1240 return;
1241 }
1242
1243 // only one entry should be in-flight at a time
1244 assert(!m_processing_entry);
1245 m_processing_entry = true;
1246 }
1247
1248 bufferlist data = replay_entry.get_data();
1249 bufferlist::iterator it = data.begin();
1250
1251 journal::EventEntry event_entry;
1252 int r = m_journal_replay->decode(&it, &event_entry);
1253 if (r < 0) {
1254 lderr(cct) << this << " " << __func__ << ": "
1255 << "failed to decode journal event entry" << dendl;
1256 handle_replay_process_safe(replay_entry, r);
1257 return;
1258 }
1259
1260 Context *on_ready = create_context_callback<
1261 Journal<I>, &Journal<I>::handle_replay_process_ready>(this);
1262 Context *on_commit = new C_ReplayProcessSafe(this, std::move(replay_entry));
1263 m_journal_replay->process(event_entry, on_ready, on_commit);
1264 }
1265
1266 template <typename I>
1267 void Journal<I>::handle_replay_complete(int r) {
1268 CephContext *cct = m_image_ctx.cct;
1269
1270 bool cancel_ops = false;
1271 {
1272 Mutex::Locker locker(m_lock);
1273 if (m_state != STATE_REPLAYING) {
1274 return;
1275 }
1276
1277 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1278 if (r < 0) {
1279 cancel_ops = true;
1280 transition_state(STATE_FLUSHING_RESTART, r);
1281 } else {
1282 // state might change back to FLUSHING_RESTART on flush error
1283 transition_state(STATE_FLUSHING_REPLAY, 0);
1284 }
1285 }
1286
1287 Context *ctx = new FunctionContext([this, cct](int r) {
1288 ldout(cct, 20) << this << " handle_replay_complete: "
1289 << "handle shut down replay" << dendl;
1290
1291 State state;
1292 {
1293 Mutex::Locker locker(m_lock);
1294 assert(m_state == STATE_FLUSHING_RESTART ||
1295 m_state == STATE_FLUSHING_REPLAY);
1296 state = m_state;
1297 }
1298
1299 if (state == STATE_FLUSHING_RESTART) {
1300 handle_flushing_restart(0);
1301 } else {
1302 handle_flushing_replay();
1303 }
1304 });
1305 ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
1306 ldout(cct, 20) << this << " handle_replay_complete: "
1307 << "shut down replay" << dendl;
1308 m_journal_replay->shut_down(cancel_ops, ctx);
1309 });
1310 m_journaler->stop_replay(ctx);
1311 }
1312
1313 template <typename I>
1314 void Journal<I>::handle_replay_process_ready(int r) {
1315 // journal::Replay is ready for more events -- attempt to pop another
1316 CephContext *cct = m_image_ctx.cct;
1317 ldout(cct, 20) << this << " " << __func__ << dendl;
1318
1319 assert(r == 0);
1320 {
1321 Mutex::Locker locker(m_lock);
1322 assert(m_processing_entry);
1323 m_processing_entry = false;
1324 }
1325 handle_replay_ready();
1326 }
1327
1328 template <typename I>
1329 void Journal<I>::handle_replay_process_safe(ReplayEntry replay_entry, int r) {
1330 CephContext *cct = m_image_ctx.cct;
1331
1332 m_lock.Lock();
1333 assert(m_state == STATE_REPLAYING ||
1334 m_state == STATE_FLUSHING_RESTART ||
1335 m_state == STATE_FLUSHING_REPLAY);
1336
1337 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1338 if (r < 0) {
1339 if (r != -ECANCELED) {
1340 lderr(cct) << this << " " << __func__ << ": "
1341 << "failed to commit journal event to disk: "
1342 << cpp_strerror(r) << dendl;
1343 }
1344
1345 if (m_state == STATE_REPLAYING) {
1346 // abort the replay if we have an error
1347 transition_state(STATE_FLUSHING_RESTART, r);
1348 m_lock.Unlock();
1349
1350 // stop replay, shut down, and restart
1351 Context *ctx = new FunctionContext([this, cct](int r) {
1352 ldout(cct, 20) << this << " handle_replay_process_safe: "
1353 << "shut down replay" << dendl;
1354 {
1355 Mutex::Locker locker(m_lock);
1356 assert(m_state == STATE_FLUSHING_RESTART);
1357 }
1358
1359 m_journal_replay->shut_down(true, create_context_callback<
1360 Journal<I>, &Journal<I>::handle_flushing_restart>(this));
1361 });
1362 m_journaler->stop_replay(ctx);
1363 return;
1364 } else if (m_state == STATE_FLUSHING_REPLAY) {
1365 // end-of-replay flush in-progress -- we need to restart replay
1366 transition_state(STATE_FLUSHING_RESTART, r);
1367 m_lock.Unlock();
1368 return;
1369 }
1370 } else {
1371 // only commit the entry if written successfully
1372 m_journaler->committed(replay_entry);
1373 }
1374 m_lock.Unlock();
1375 }
1376
1377 template <typename I>
1378 void Journal<I>::handle_flushing_restart(int r) {
1379 Mutex::Locker locker(m_lock);
1380
1381 CephContext *cct = m_image_ctx.cct;
1382 ldout(cct, 20) << this << " " << __func__ << dendl;
1383
1384 assert(r == 0);
1385 assert(m_state == STATE_FLUSHING_RESTART);
1386 if (m_close_pending) {
1387 destroy_journaler(r);
1388 return;
1389 }
1390
1391 recreate_journaler(r);
1392 }
1393
1394 template <typename I>
1395 void Journal<I>::handle_flushing_replay() {
1396 Mutex::Locker locker(m_lock);
1397
1398 CephContext *cct = m_image_ctx.cct;
1399 ldout(cct, 20) << this << " " << __func__ << dendl;
1400
1401 assert(m_state == STATE_FLUSHING_REPLAY || m_state == STATE_FLUSHING_RESTART);
1402 if (m_close_pending) {
1403 destroy_journaler(0);
1404 return;
1405 } else if (m_state == STATE_FLUSHING_RESTART) {
1406 // failed to replay one-or-more events -- restart
1407 recreate_journaler(0);
1408 return;
1409 }
1410
1411 delete m_journal_replay;
1412 m_journal_replay = NULL;
1413
1414 m_error_result = 0;
1415 start_append();
1416 }
1417
1418 template <typename I>
1419 void Journal<I>::handle_recording_stopped(int r) {
1420 CephContext *cct = m_image_ctx.cct;
1421 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1422
1423 Mutex::Locker locker(m_lock);
1424 assert(m_state == STATE_STOPPING);
1425
1426 destroy_journaler(r);
1427 }
1428
1429 template <typename I>
1430 void Journal<I>::handle_journal_destroyed(int r) {
1431 CephContext *cct = m_image_ctx.cct;
1432 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
1433
1434 if (r < 0) {
1435     lderr(cct) << this << " " << __func__ << ": "
1436                << "error detected while closing journal: " << cpp_strerror(r)
1437 << dendl;
1438 }
1439
1440 Mutex::Locker locker(m_lock);
1441 delete m_journaler;
1442 m_journaler = nullptr;
1443
1444 assert(m_state == STATE_CLOSING || m_state == STATE_RESTARTING_REPLAY);
1445 if (m_state == STATE_RESTARTING_REPLAY) {
1446 create_journaler();
1447 return;
1448 }
1449
1450 transition_state(STATE_CLOSED, r);
1451 }
1452
1453 template <typename I>
1454 void Journal<I>::handle_io_event_safe(int r, uint64_t tid) {
1455 CephContext *cct = m_image_ctx.cct;
1456 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1457 << "tid=" << tid << dendl;
1458
1459 // journal will be flushed before closing
1460 assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1461 if (r < 0) {
1462 lderr(cct) << this << " " << __func__ << ": "
1463 << "failed to commit IO event: " << cpp_strerror(r) << dendl;
1464 }
1465
1466 IOObjectRequests aio_object_requests;
1467 Contexts on_safe_contexts;
1468 {
1469 Mutex::Locker event_locker(m_event_lock);
1470 typename Events::iterator it = m_events.find(tid);
1471 assert(it != m_events.end());
1472
1473 Event &event = it->second;
1474 aio_object_requests.swap(event.aio_object_requests);
1475 on_safe_contexts.swap(event.on_safe_contexts);
1476
1477 if (r < 0 || event.committed_io) {
1478 // failed journal write so IO won't be sent -- or IO extent was
1479 // overwritten by future IO operations so this was a no-op IO event
1480 event.ret_val = r;
1481 for (auto &future : event.futures) {
1482 m_journaler->committed(future);
1483 }
1484 }
1485
1486 if (event.committed_io) {
1487 m_events.erase(it);
1488 } else {
1489 event.safe = true;
1490 }
1491 }
1492
1493 ldout(cct, 20) << this << " " << __func__ << ": "
1494 << "completing tid=" << tid << dendl;
1495 for (IOObjectRequests::iterator it = aio_object_requests.begin();
1496 it != aio_object_requests.end(); ++it) {
1497 if (r < 0) {
1498 // don't send aio requests if the journal fails -- bubble error up
1499 (*it)->fail(r);
1500 } else {
1501 // send any waiting aio requests now that journal entry is safe
1502 (*it)->send();
1503 }
1504 }
1505
1506 // alert the cache about the journal event status
1507 for (Contexts::iterator it = on_safe_contexts.begin();
1508 it != on_safe_contexts.end(); ++it) {
1509 (*it)->complete(r);
1510 }
1511 }
1512
1513 template <typename I>
1514 void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
1515 const Future &op_start_future,
1516 const Future &op_finish_future,
1517 Context *on_safe) {
1518 CephContext *cct = m_image_ctx.cct;
1519 ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
1520 << "tid=" << tid << dendl;
1521
1522 // journal will be flushed before closing
1523 assert(m_state == STATE_READY || m_state == STATE_STOPPING);
1524 if (r < 0) {
1525 lderr(cct) << this << " " << __func__ << ": "
1526 << "failed to commit op event: " << cpp_strerror(r) << dendl;
1527 }
1528
1529 m_journaler->committed(op_start_future);
1530 m_journaler->committed(op_finish_future);
1531
1532 // reduce the replay window after committing an op event
1533 m_journaler->flush_commit_position(on_safe);
1534 }
1535
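// Quiesces journal appends and transitions to STATE_STOPPING;
// handle_recording_stopped() tears down the Journaler once stop_append
// completes.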
1536 template <typename I>
1537 void Journal<I>::stop_recording() {
1538 assert(m_lock.is_locked());
1539 assert(m_journaler != NULL);
1540
1541 assert(m_state == STATE_READY);
1542 transition_state(STATE_STOPPING, 0);
1543
1544 m_journaler->stop_append(util::create_async_context_callback(
1545 m_image_ctx, create_context_callback<
1546 Journal<I>, &Journal<I>::handle_recording_stopped>(this)));
1547 }
1548
1549 template <typename I>
1550 void Journal<I>::transition_state(State state, int r) {
1551 CephContext *cct = m_image_ctx.cct;
1552 ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
1553 assert(m_lock.is_locked());
1554 m_state = state;
1555
1556 if (m_error_result == 0 && r < 0) {
1557 m_error_result = r;
1558 }
1559
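  // wake up anyone waiting for the journal to reach a steady state
  // (READY or CLOSED)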
1560 if (is_steady_state()) {
1561 Contexts wait_for_state_contexts(std::move(m_wait_for_state_contexts));
1562 for (auto ctx : wait_for_state_contexts) {
1563 ctx->complete(m_error_result);
1564 }
1565 }
1566 }
1567
1568 template <typename I>
1569 bool Journal<I>::is_steady_state() const {
1570 assert(m_lock.is_locked());
1571 switch (m_state) {
1572 case STATE_READY:
1573 case STATE_CLOSED:
1574 return true;
1575 case STATE_UNINITIALIZED:
1576 case STATE_INITIALIZING:
1577 case STATE_REPLAYING:
1578 case STATE_FLUSHING_RESTART:
1579 case STATE_RESTARTING_REPLAY:
1580 case STATE_FLUSHING_REPLAY:
1581 case STATE_STOPPING:
1582 case STATE_CLOSING:
1583 break;
1584 }
1585 return false;
1586 }
1587
1588 template <typename I>
1589 void Journal<I>::wait_for_steady_state(Context *on_state) {
1590 assert(m_lock.is_locked());
1591 assert(!is_steady_state());
1592
1593 CephContext *cct = m_image_ctx.cct;
1594 ldout(cct, 20) << this << " " << __func__ << ": on_state=" << on_state
1595 << dendl;
1596 m_wait_for_state_contexts.push_back(on_state);
1597 }
1598
1599 template <typename I>
1600 int Journal<I>::is_resync_requested(bool *do_resync) {
1601 Mutex::Locker l(m_lock);
1602 return check_resync_requested(do_resync);
1603 }
1604
1605 template <typename I>
1606 int Journal<I>::check_resync_requested(bool *do_resync) {
1607 CephContext *cct = m_image_ctx.cct;
1608 ldout(cct, 20) << this << " " << __func__ << dendl;
1609
1610 assert(m_lock.is_locked());
1611 assert(do_resync != nullptr);
1612
1613 cls::journal::Client client;
1614 int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
1615 if (r < 0) {
1616 lderr(cct) << this << " " << __func__ << ": "
1617 << "failed to retrieve client: " << cpp_strerror(r) << dendl;
1618 return r;
1619 }
1620
1621 librbd::journal::ClientData client_data;
1622 bufferlist::iterator bl_it = client.data.begin();
1623 try {
1624 ::decode(client_data, bl_it);
1625 } catch (const buffer::error &err) {
1626 lderr(cct) << this << " " << __func__ << ": "
1627 << "failed to decode client data: " << err << dendl;
1628 return -EINVAL;
1629 }
1630
1631 journal::ImageClientMeta *image_client_meta =
1632 boost::get<journal::ImageClientMeta>(&client_data.client_meta);
1633 if (image_client_meta == nullptr) {
1634 lderr(cct) << this << " " << __func__ << ": "
1635 << "failed to access image client meta struct" << dendl;
1636 return -EINVAL;
1637 }
1638
1639 *do_resync = image_client_meta->resync_requested;
1640
1641 return 0;
1642 }
1643
1644 struct C_RefreshTags : public Context {
1645 util::AsyncOpTracker &async_op_tracker;
1646 Context *on_finish = nullptr;
1647
1648 Mutex lock;
1649 uint64_t tag_tid;
1650 journal::TagData tag_data;
1651
1652 C_RefreshTags(util::AsyncOpTracker &async_op_tracker)
1653 : async_op_tracker(async_op_tracker),
1654 lock("librbd::Journal::C_RefreshTags::lock") {
1655 async_op_tracker.start_op();
1656 }
1657 ~C_RefreshTags() override {
1658 async_op_tracker.finish_op();
1659 }
1660
1661 void finish(int r) override {
1662 on_finish->complete(r);
1663 }
1664 };
1665
1666 template <typename I>
1667 void Journal<I>::handle_metadata_updated() {
1668 CephContext *cct = m_image_ctx.cct;
1669 Mutex::Locker locker(m_lock);
1670
1671 if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1672 return;
1673 } else if (is_tag_owner(m_lock)) {
1674 ldout(cct, 20) << this << " " << __func__ << ": primary image" << dendl;
1675 return;
1676 } else if (m_listeners.empty()) {
1677 ldout(cct, 20) << this << " " << __func__ << ": no listeners" << dendl;
1678 return;
1679 }
1680
1681 uint64_t refresh_sequence = ++m_refresh_sequence;
1682 ldout(cct, 20) << this << " " << __func__ << ": "
1683 << "refresh_sequence=" << refresh_sequence << dendl;
1684
1685 // pull the most recent tags from the journal, decode, and
1686 // update the internal tag state
1687 C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
1688 refresh_ctx->on_finish = new FunctionContext(
1689 [this, refresh_sequence, refresh_ctx](int r) {
1690 handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
1691 refresh_ctx->tag_data, r);
1692 });
1693 C_DecodeTags *decode_tags_ctx = new C_DecodeTags(
1694 cct, &refresh_ctx->lock, &refresh_ctx->tag_tid,
1695 &refresh_ctx->tag_data, refresh_ctx);
1696 m_journaler->get_tags(m_tag_tid == 0 ? 0 : m_tag_tid - 1, m_tag_class,
1697 &decode_tags_ctx->tags, decode_tags_ctx);
1698 }
1699
1700 template <typename I>
1701 void Journal<I>::handle_refresh_metadata(uint64_t refresh_sequence,
1702 uint64_t tag_tid,
1703 journal::TagData tag_data, int r) {
1704 CephContext *cct = m_image_ctx.cct;
1705 Mutex::Locker locker(m_lock);
1706
1707 if (r < 0) {
1708 lderr(cct) << this << " " << __func__ << ": failed to refresh metadata: "
1709 << cpp_strerror(r) << dendl;
1710 return;
1711 } else if (m_state != STATE_READY && !is_journal_replaying(m_lock)) {
1712 return;
1713 } else if (refresh_sequence != m_refresh_sequence) {
1714 // another, more up-to-date refresh is in-flight
1715 return;
1716 }
1717
1718 ldout(cct, 20) << this << " " << __func__ << ": "
1719 << "refresh_sequence=" << refresh_sequence << ", "
1720 << "tag_tid=" << tag_tid << ", "
1721 << "tag_data=" << tag_data << dendl;
1722 while (m_listener_notify) {
1723 m_listener_cond.Wait(m_lock);
1724 }
1725
1726 bool was_tag_owner = is_tag_owner(m_lock);
1727 if (m_tag_tid < tag_tid) {
1728 m_tag_tid = tag_tid;
1729 m_tag_data = tag_data;
1730 }
1731 bool promoted_to_primary = (!was_tag_owner && is_tag_owner(m_lock));
1732
1733 bool resync_requested = false;
1734 r = check_resync_requested(&resync_requested);
1735 if (r < 0) {
1736 lderr(cct) << this << " " << __func__ << ": "
1737 << "failed to check if a resync was requested" << dendl;
1738 return;
1739 }
1740
1741 Listeners listeners(m_listeners);
1742 m_listener_notify = true;
1743 m_lock.Unlock();
1744
1745 if (promoted_to_primary) {
1746 for (auto listener : listeners) {
1747 listener->handle_promoted();
1748 }
1749 } else if (resync_requested) {
1750 for (auto listener : listeners) {
1751 listener->handle_resync();
1752 }
1753 }
1754
1755 m_lock.Lock();
1756 m_listener_notify = false;
1757 m_listener_cond.Signal();
1758 }
1759
1760 template <typename I>
1761 void Journal<I>::add_listener(journal::Listener *listener) {
1762 Mutex::Locker locker(m_lock);
1763 m_listeners.insert(listener);
1764 }
1765
1766 template <typename I>
1767 void Journal<I>::remove_listener(journal::Listener *listener) {
1768 Mutex::Locker locker(m_lock);
1769 while (m_listener_notify) {
1770 m_listener_cond.Wait(m_lock);
1771 }
1772 m_listeners.erase(listener);
1773 }
1774
1775 } // namespace librbd
1776
1777 #ifndef TEST_F
1778 template class librbd::Journal<librbd::ImageCtx>;
1779 #endif