ceph.git / ceph / src / mds / MDLog.cc (import ceph pacific 16.2.5)
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDLog.h"
17 #include "MDCache.h"
18 #include "LogEvent.h"
19 #include "MDSContext.h"
20
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
23
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
27
28 #include "events/ESubtreeMap.h"
29
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/ceph_assert.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
36 #undef dout_prefix
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39 // cons/des
40 MDLog::~MDLog()
41 {
42 if (journaler) { delete journaler; journaler = 0; }
43 if (logger) {
44 g_ceph_context->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 logger = 0;
47 }
48 }
49
50
51 void MDLog::create_logger()
52 {
53 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
55 plb.add_u64_counter(l_mdl_evadd, "evadd", "Events submitted", "subm",
56 PerfCountersBuilder::PRIO_INTERESTING);
57 plb.add_u64(l_mdl_ev, "ev", "Events", "evts",
58 PerfCountersBuilder::PRIO_INTERESTING);
59 plb.add_u64(l_mdl_seg, "seg", "Segments", "segs",
60 PerfCountersBuilder::PRIO_INTERESTING);
61
62 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
63 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
64 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
65 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
66 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
67 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed",
68 "repl", PerfCountersBuilder::PRIO_INTERESTING);
69 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
70 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
71 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
72 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
73 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
74 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
75
76 plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
77 plb.add_u64(l_mdl_expos, "expos", "Journaler expire position");
78 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
79 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
80
81 // logger
82 logger = plb.create_perf_counters();
83 g_ceph_context->get_perfcounters_collection()->add(logger);
84 }
85
86 void MDLog::set_write_iohint(unsigned iohint_flags)
87 {
88 journaler->set_write_iohint(iohint_flags);
89 }
90
91 class C_MDL_WriteError : public MDSIOContextBase {
92 protected:
93 MDLog *mdlog;
94 MDSRank *get_mds() override {return mdlog->mds;}
95
96 void finish(int r) override {
97 MDSRank *mds = get_mds();
98 // assume journal is reliable, so don't choose action based on
99 // g_conf()->mds_action_on_write_error.
100 if (r == -CEPHFS_EBLOCKLISTED) {
101 derr << "we have been blocklisted (fenced), respawning..." << dendl;
102 mds->respawn();
103 } else {
104 derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
105 // Although it's possible that this could be something transient,
106 // it's severe and scary, so disable this rank until an administrator
107 // intervenes.
108 mds->clog->error() << "Unhandled journal write error on MDS rank " <<
109 mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
110 mds->damaged();
111 ceph_abort(); // damaged should never return
112 }
113 }
114
115 public:
116 explicit C_MDL_WriteError(MDLog *m) :
117 MDSIOContextBase(false), mdlog(m) {}
118 void print(ostream& out) const override {
119 out << "mdlog_write_error";
120 }
121 };
122
123
124 void MDLog::write_head(MDSContext *c)
125 {
126 Context *fin = NULL;
127 if (c != NULL) {
128 fin = new C_IO_Wrapper(mds, c);
129 }
130 journaler->write_head(fin);
131 }
132
133 uint64_t MDLog::get_read_pos() const
134 {
135 return journaler->get_read_pos();
136 }
137
138 uint64_t MDLog::get_write_pos() const
139 {
140 return journaler->get_write_pos();
141 }
142
143 uint64_t MDLog::get_safe_pos() const
144 {
145 return journaler->get_write_safe_pos();
146 }
147
148
149
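// Create a brand-new, empty journal for this rank: instantiate the
// Journaler, asynchronously write its header and the JournalPointer
// (front = this rank's log inode, back = 0) under a single gather, start
// the submit thread, and complete the caller once both writes are safe.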
150 void MDLog::create(MDSContext *c)
151 {
152 dout(5) << "create empty log" << dendl;
153
154 C_GatherBuilder gather(g_ceph_context);
155 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
156 // XXX but maybe that should be handled inside Journaler?
157 gather.set_finisher(new C_IO_Wrapper(mds, c));
158
159 // The inode of the default Journaler we will create
160 ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
161
162 // Instantiate Journaler and start async write to RADOS
163 ceph_assert(journaler == NULL);
164 journaler = new Journaler("mdlog", ino, mds->get_metadata_pool(),
165 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
166 l_mdl_jlat, mds->finisher);
167 ceph_assert(journaler->is_readonly());
168 journaler->set_write_error_handler(new C_MDL_WriteError(this));
169 journaler->set_writeable();
170 journaler->create(&mds->mdcache->default_log_layout, g_conf()->mds_journal_format);
171 journaler->write_head(gather.new_sub());
172
173 // Async write JournalPointer to RADOS
174 JournalPointer jp(mds->get_nodeid(), mds->get_metadata_pool());
175 jp.front = ino;
176 jp.back = 0;
177 jp.save(mds->objecter, gather.new_sub());
178
179 gather.activate();
180
181 logger->set(l_mdl_expos, journaler->get_expire_pos());
182 logger->set(l_mdl_wrpos, journaler->get_write_pos());
183
184 submit_thread.create("md_submit");
185 }
186
187 void MDLog::open(MDSContext *c)
188 {
189 dout(5) << "open discovering log bounds" << dendl;
190
191 ceph_assert(!recovery_thread.is_started());
192 recovery_thread.set_completion(c);
193 recovery_thread.create("md_recov_open");
194
195 submit_thread.create("md_submit");
196 // either append() or replay() will follow.
197 }
198
199 /**
200 * Final part of the reopen() procedure: after recovery_thread
201 * has done its thing, we call append().
202 */
203 class C_ReopenComplete : public MDSInternalContext {
204 MDLog *mdlog;
205 MDSContext *on_complete;
206 public:
207 C_ReopenComplete(MDLog *mdlog_, MDSContext *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
208 void finish(int r) override {
209 mdlog->append();
210 on_complete->complete(r);
211 }
212 };
213
214 /**
215 * Given that open() has been called in the past, go through the journal
216 * recovery procedure again, potentially reformatting the journal if it
217 * was in an old format.
218 */
219 void MDLog::reopen(MDSContext *c)
220 {
221 dout(5) << "reopen" << dendl;
222
223 // Because we will call append() at the completion of this, check that we have already
224 // read the whole journal.
225 ceph_assert(journaler != NULL);
226 ceph_assert(journaler->get_read_pos() == journaler->get_write_pos());
227
228 delete journaler;
229 journaler = NULL;
230
231 // recovery_thread was started at some point in the past. Although
232 // it has called its completion if we made it back here, it might
233 // still not have been cleaned up: join it.
234 recovery_thread.join();
235
236 recovery_thread.set_completion(new C_ReopenComplete(this, c));
237 recovery_thread.create("md_recov_reopen");
238 }
239
240 void MDLog::append()
241 {
242 dout(5) << "append positioning at end and marking writeable" << dendl;
243 journaler->set_read_pos(journaler->get_write_pos());
244 journaler->set_expire_pos(journaler->get_write_pos());
245
246 journaler->set_writeable();
247
248 logger->set(l_mdl_expos, journaler->get_write_pos());
249 }
250
251
252
253 // -------------------------------------------------
254
255 void MDLog::_start_entry(LogEvent *e)
256 {
257 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
258
259 ceph_assert(cur_event == NULL);
260 cur_event = e;
261
262 event_seq++;
263
264 EMetaBlob *metablob = e->get_metablob();
265 if (metablob) {
266 metablob->event_seq = event_seq;
267 metablob->last_subtree_map = get_last_segment_seq();
268 }
269 }
270
271 void MDLog::cancel_entry(LogEvent *le)
272 {
273 ceph_assert(le == cur_event);
274 cur_event = NULL;
275 delete le;
276 }
277
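// Queue a fully-built event on the newest segment. The event is only
// recorded in pending_events here; encoding and the actual journaler
// append happen later in _submit_thread(). Depending on the event type
// and the current segment's size, this may also start a new segment.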
278 void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
279 {
280 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
281 ceph_assert(!mds->is_any_replay());
282 ceph_assert(!capped);
283
284 ceph_assert(le == cur_event);
285 cur_event = NULL;
286
287 // let the event register itself in the segment
288 ceph_assert(!segments.empty());
289 LogSegment *ls = segments.rbegin()->second;
290 ls->num_events++;
291
292 le->_segment = ls;
293 le->update_segment();
294 le->set_stamp(ceph_clock_now());
295
296 mdsmap_up_features = mds->mdsmap->get_up_features();
297 pending_events[ls->seq].push_back(PendingEvent(le, c));
298 num_events++;
299
300 if (logger) {
301 logger->inc(l_mdl_evadd);
302 logger->set(l_mdl_ev, num_events);
303 }
304
305 unflushed++;
306
307 uint64_t period = journaler->get_layout_period();
308 // start a new segment?
309 if (le->get_type() == EVENT_SUBTREEMAP ||
310 (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
311 // avoid an infinite loop when the ESubtreeMap is very large.
312 // do not insert an ESubtreeMap among the EImportFinish events that
313 // finish disambiguated imports, because the ESubtreeMap reflects the
314 // subtree state only after all EImportFinish events have been replayed.
315 } else if (ls->end/period != ls->offset/period ||
316 ls->num_events >= g_conf()->mds_log_events_per_segment) {
317 dout(10) << "submit_entry also starting new segment: last = "
318 << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
319 _start_new_segment();
320 } else if (g_conf()->mds_debug_subtrees &&
321 le->get_type() != EVENT_SUBTREEMAP_TEST) {
322 // debug: journal this every time to catch subtree replay bugs.
323 // use a different event id so it doesn't get interpreted as a
324 // LogSegment boundary on replay.
325 LogEvent *sle = mds->mdcache->create_subtree_map();
326 sle->set_type(EVENT_SUBTREEMAP_TEST);
327 _submit_entry(sle, NULL);
328 }
329 }
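// A worked example of the rollover check above, assuming, for illustration
// only, a 4 MiB layout period and mds_log_events_per_segment = 1024: a
// segment that started at offset 0x300000 (3 MiB) and now ends at 0x500000
// (5 MiB) gives 0x500000/0x400000 = 1 != 0x300000/0x400000 = 0, so the next
// submission starts a new segment; the same happens once the segment has
// accumulated 1024 events.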
330
331 /**
332 * Invoked on the flush after each entry submitted
333 */
334 class C_MDL_Flushed : public MDSLogContextBase {
335 protected:
336 MDLog *mdlog;
337 MDSRank *get_mds() override {return mdlog->mds;}
338 MDSContext *wrapped;
339
340 void finish(int r) override {
341 if (wrapped)
342 wrapped->complete(r);
343 }
344
345 public:
346 C_MDL_Flushed(MDLog *m, MDSContext *w)
347 : mdlog(m), wrapped(w) {}
348 C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
349 set_write_pos(wp);
350 }
351 };
352
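// Consumer side of the submission queue. Runs in its own thread, draining
// pending_events in segment-seq order: each event is encoded, appended to
// the Journaler, and given a flush waiter (the caller-supplied context or
// a plain C_MDL_Flushed) that fires once the entry is safe. submit_mutex
// is dropped around the append so submitters are not serialized behind the
// encode/append work.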
353 void MDLog::_submit_thread()
354 {
355 dout(10) << "_submit_thread start" << dendl;
356
357 std::unique_lock locker{submit_mutex};
358
359 while (!mds->is_daemon_stopping()) {
360 if (g_conf()->mds_log_pause) {
361 submit_cond.wait(locker);
362 continue;
363 }
364
365 map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
366 if (it == pending_events.end()) {
367 submit_cond.wait(locker);
368 continue;
369 }
370
371 if (it->second.empty()) {
372 pending_events.erase(it);
373 continue;
374 }
375
376 int64_t features = mdsmap_up_features;
377 PendingEvent data = it->second.front();
378 it->second.pop_front();
379
380 locker.unlock();
381
382 if (data.le) {
383 LogEvent *le = data.le;
384 LogSegment *ls = le->_segment;
385 // encode it, with event type
386 bufferlist bl;
387 le->encode_with_header(bl, features);
388
389 uint64_t write_pos = journaler->get_write_pos();
390
391 le->set_start_off(write_pos);
392 if (le->get_type() == EVENT_SUBTREEMAP)
393 ls->offset = write_pos;
394
395 dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
396 << " : " << *le << dendl;
397
398 // journal it.
399 const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
400 ls->end = new_write_pos;
401
402 MDSLogContextBase *fin;
403 if (data.fin) {
404 fin = dynamic_cast<MDSLogContextBase*>(data.fin);
405 ceph_assert(fin);
406 fin->set_write_pos(new_write_pos);
407 } else {
408 fin = new C_MDL_Flushed(this, new_write_pos);
409 }
410
411 journaler->wait_for_flush(fin);
412
413 if (data.flush)
414 journaler->flush();
415
416 if (logger)
417 logger->set(l_mdl_wrpos, ls->end);
418
419 delete le;
420 } else {
421 if (data.fin) {
422 MDSContext* fin =
423 dynamic_cast<MDSContext*>(data.fin);
424 ceph_assert(fin);
425 C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
426 fin2->set_write_pos(journaler->get_write_pos());
427 journaler->wait_for_flush(fin2);
428 }
429 if (data.flush)
430 journaler->flush();
431 }
432
433 locker.lock();
434 if (data.flush)
435 unflushed = 0;
436 else if (data.le)
437 unflushed++;
438 }
439 }
440
441 void MDLog::wait_for_safe(MDSContext *c)
442 {
443 submit_mutex.lock();
444
445 bool no_pending = true;
446 if (!pending_events.empty()) {
447 pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
448 no_pending = false;
449 submit_cond.notify_all();
450 }
451
452 submit_mutex.unlock();
453
454 if (no_pending && c)
455 journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
456 }
457
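// Flush everything submitted so far. If events are still queued for the
// submit thread, the flush request is appended to the queue (so it is
// ordered after those events) instead of calling Journaler::flush()
// directly.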
458 void MDLog::flush()
459 {
460 submit_mutex.lock();
461
462 bool do_flush = unflushed > 0;
463 unflushed = 0;
464 if (!pending_events.empty()) {
465 pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
466 do_flush = false;
467 submit_cond.notify_all();
468 }
469
470 submit_mutex.unlock();
471
472 if (do_flush)
473 journaler->flush();
474 }
475
476 void MDLog::kick_submitter()
477 {
478 std::lock_guard l(submit_mutex);
479 submit_cond.notify_all();
480 }
481
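// Cap the log: once set, _submit_entry() refuses new events (see the
// assert there) and the final segment becomes eligible for expiry and
// trimming even though it is the current one.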
482 void MDLog::cap()
483 {
484 dout(5) << "cap" << dendl;
485 capped = true;
486 }
487
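// Tear everything down for daemon shutdown. Must be called with mds_lock
// held; the lock is temporarily dropped while joining the submit, replay
// and recovery threads, and the journaler is shut down first so a replay
// thread blocked in Journaler::wait_for_readable can get out.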
488 void MDLog::shutdown()
489 {
490 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
491
492 dout(5) << "shutdown" << dendl;
493 if (submit_thread.is_started()) {
494 ceph_assert(mds->is_daemon_stopping());
495
496 if (submit_thread.am_self()) {
497 // shutdown() was called from within the submit thread itself: trust it to
498 // do no work after we return, to respect mds->is_daemon_stopping(), and to
499 // fall out of its loop.
500 } else {
501 mds->mds_lock.unlock();
502 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
503 // picking it up will do anything with it.
504
505 submit_mutex.lock();
506 submit_cond.notify_all();
507 submit_mutex.unlock();
508
509 mds->mds_lock.lock();
510
511 submit_thread.join();
512 }
513 }
514
515 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
516 // so we need to shutdown the journaler first.
517 if (journaler) {
518 journaler->shutdown();
519 }
520
521 if (replay_thread.is_started() && !replay_thread.am_self()) {
522 mds->mds_lock.unlock();
523 replay_thread.join();
524 mds->mds_lock.lock();
525 }
526
527 if (recovery_thread.is_started() && !recovery_thread.am_self()) {
528 mds->mds_lock.unlock();
529 recovery_thread.join();
530 mds->mds_lock.lock();
531 }
532 }
533
534
535 // -----------------------------
536 // segments
537
538 void MDLog::_start_new_segment()
539 {
540 _prepare_new_segment();
541 _journal_segment_subtree_map(NULL);
542 }
543
544 void MDLog::_prepare_new_segment()
545 {
546 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
547
548 uint64_t seq = event_seq + 1;
549 dout(7) << __func__ << " seq " << seq << dendl;
550
551 segments[seq] = new LogSegment(seq);
552
553 logger->inc(l_mdl_segadd);
554 logger->set(l_mdl_seg, segments.size());
555
556 // Adjust to next stray dir
557 mds->mdcache->advance_stray();
558 }
559
560 void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
561 {
562 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
563
564 dout(7) << __func__ << dendl;
565 ESubtreeMap *sle = mds->mdcache->create_subtree_map();
566 sle->event_seq = get_last_segment_seq();
567
568 _submit_entry(sle, new C_MDL_Flushed(this, onsync));
569 }
570
571 class C_OFT_Committed : public MDSInternalContext {
572 MDLog *mdlog;
573 uint64_t seq;
574 public:
575 C_OFT_Committed(MDLog *l, uint64_t s) :
576 MDSInternalContext(l->mds), mdlog(l), seq(s) {}
577 void finish(int ret) override {
578 mdlog->trim_expired_segments();
579 }
580 };
581
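// Expire old segments until we are back under the configured limits
// (mds_log_max_segments and, optionally, a maximum event count). The work
// is budgeted to roughly two seconds per call, and the expiry op priority
// scales with how far behind we are. A worked example, assuming the usual
// priority constants (CEPH_MSG_PRIO_LOW = 64, CEPH_MSG_PRIO_HIGH = 196)
// and max_segments = 128: with 64 segments already expiring,
// op_prio = 64 + (196 - 64) * 64 / 128 = 130, capped at 196.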
582 void MDLog::trim(int m)
583 {
584 unsigned max_segments = g_conf()->mds_log_max_segments;
585 int max_events = g_conf()->mds_log_max_events;
586 if (m >= 0)
587 max_events = m;
588
589 if (mds->mdcache->is_readonly()) {
590 dout(10) << "trim, ignoring read-only FS" << dendl;
591 return;
592 }
593
594 // Clamp max_events so it is always greater than mds_log_events_per_segment
595 if (max_events > 0 && max_events <= g_conf()->mds_log_events_per_segment) {
596 max_events = g_conf()->mds_log_events_per_segment + 1;
597 }
598
599 submit_mutex.lock();
600
601 // trim!
602 dout(10) << "trim "
603 << segments.size() << " / " << max_segments << " segments, "
604 << num_events << " / " << max_events << " events"
605 << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
606 << ", " << expired_segments.size() << " (" << expired_events << ") expired"
607 << dendl;
608
609 if (segments.empty()) {
610 submit_mutex.unlock();
611 return;
612 }
613
614 // hack: only trim for a few seconds at a time
615 utime_t stop = ceph_clock_now();
616 stop += 2.0;
617
618 int op_prio = CEPH_MSG_PRIO_LOW +
619 (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
620 expiring_segments.size() / max_segments;
621 if (op_prio > CEPH_MSG_PRIO_HIGH)
622 op_prio = CEPH_MSG_PRIO_HIGH;
623
624 unsigned new_expiring_segments = 0;
625
626 unsigned max_expiring_segments = 0;
627 if (pre_segments_size > 0){
628 max_expiring_segments = max_segments/2;
629 assert(segments.size() >= pre_segments_size);
630 max_expiring_segments = std::max<unsigned>(max_expiring_segments,segments.size() - pre_segments_size);
631 }
632
633 map<uint64_t,LogSegment*>::iterator p = segments.begin();
634 while (p != segments.end()) {
635 if (stop < ceph_clock_now())
636 break;
637
638 unsigned num_remaining_segments = (segments.size() - expired_segments.size() - expiring_segments.size());
639 if ((num_remaining_segments <= max_segments) &&
640 (max_events < 0 || num_events - expiring_events - expired_events <= max_events))
641 break;
642
643 // Do not trim too many segments at once during peak workload. If the MDS keeps creating N segments each tick,
644 // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'.
645 if (new_expiring_segments * 2 > num_remaining_segments)
646 break;
647
648 if (max_expiring_segments > 0 &&
649 expiring_segments.size() >= max_expiring_segments)
650 break;
651
652 // look at first segment
653 LogSegment *ls = p->second;
654 ceph_assert(ls);
655 ++p;
656
657 if (pending_events.count(ls->seq) ||
658 ls->end > safe_pos) {
659 dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
660 << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
661 break;
662 }
663
664 if (expiring_segments.count(ls)) {
665 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
666 << ", " << ls->num_events << " events" << dendl;
667 } else if (expired_segments.count(ls)) {
668 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
669 << ", " << ls->num_events << " events" << dendl;
670 } else {
671 ceph_assert(expiring_segments.count(ls) == 0);
672 new_expiring_segments++;
673 expiring_segments.insert(ls);
674 expiring_events += ls->num_events;
675 submit_mutex.unlock();
676
677 uint64_t last_seq = ls->seq;
678 try_expire(ls, op_prio);
679
680 submit_mutex.lock();
681 p = segments.lower_bound(last_seq + 1);
682 }
683 }
684
685 if (!capped &&
686 !mds->mdcache->open_file_table.is_any_committing()) {
687 uint64_t last_seq = get_last_segment_seq();
688 if (mds->mdcache->open_file_table.is_any_dirty() ||
689 last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
690 submit_mutex.unlock();
691 mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
692 last_seq, CEPH_MSG_PRIO_HIGH);
693 submit_mutex.lock();
694 }
695 }
696
697 // discard expired segments and unlock submit_mutex
698 _trim_expired_segments();
699 }
700
701 class C_MaybeExpiredSegment : public MDSInternalContext {
702 MDLog *mdlog;
703 LogSegment *ls;
704 int op_prio;
705 public:
706 C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
707 MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
708 void finish(int res) override {
709 if (res < 0)
710 mdlog->mds->handle_write_error(res);
711 mdlog->_maybe_expired(ls, op_prio);
712 }
713 };
714
715 /**
716 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
717 * segment.
718 */
719 int MDLog::trim_all()
720 {
721 submit_mutex.lock();
722
723 dout(10) << __func__ << ": "
724 << segments.size()
725 << "/" << expiring_segments.size()
726 << "/" << expired_segments.size() << dendl;
727
728 uint64_t last_seq = 0;
729 if (!segments.empty()) {
730 last_seq = get_last_segment_seq();
731 if (!capped &&
732 !mds->mdcache->open_file_table.is_any_committing() &&
733 last_seq > mds->mdcache->open_file_table.get_committing_log_seq()) {
734 submit_mutex.unlock();
735 mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
736 last_seq, CEPH_MSG_PRIO_DEFAULT);
737 submit_mutex.lock();
738 }
739 }
740
741 map<uint64_t,LogSegment*>::iterator p = segments.begin();
742 while (p != segments.end() &&
743 p->first < last_seq &&
744 p->second->end < safe_pos) { // next segment should have been started
745 LogSegment *ls = p->second;
746 ++p;
747
748 // Caller should have flushed journaler before calling this
749 if (pending_events.count(ls->seq)) {
750 dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
751 submit_mutex.unlock();
752 return -CEPHFS_EAGAIN;
753 }
754
755 if (expiring_segments.count(ls)) {
756 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
757 << ", " << ls->num_events << " events" << dendl;
758 } else if (expired_segments.count(ls)) {
759 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
760 << ", " << ls->num_events << " events" << dendl;
761 } else {
762 ceph_assert(expiring_segments.count(ls) == 0);
763 expiring_segments.insert(ls);
764 expiring_events += ls->num_events;
765 submit_mutex.unlock();
766
767 uint64_t next_seq = ls->seq + 1;
768 try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
769
770 submit_mutex.lock();
771 p = segments.lower_bound(next_seq);
772 }
773 }
774
775 _trim_expired_segments();
776
777 return 0;
778 }
779
780
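// Ask a segment to expire. LogSegment::try_to_expire() gathers whatever
// work still pins the segment; if anything is outstanding we re-arm via
// C_MaybeExpiredSegment and try again when that work completes, otherwise
// the segment is marked expired right away.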
781 void MDLog::try_expire(LogSegment *ls, int op_prio)
782 {
783 MDSGatherBuilder gather_bld(g_ceph_context);
784 ls->try_to_expire(mds, gather_bld, op_prio);
785
786 if (gather_bld.has_subs()) {
787 dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
788 gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
789 gather_bld.activate();
790 } else {
791 dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
792 submit_mutex.lock();
793 ceph_assert(expiring_segments.count(ls));
794 expiring_segments.erase(ls);
795 expiring_events -= ls->num_events;
796 _expired(ls);
797 submit_mutex.unlock();
798 }
799
800 logger->set(l_mdl_segexg, expiring_segments.size());
801 logger->set(l_mdl_evexg, expiring_events);
802 }
803
804 void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
805 {
806 if (mds->mdcache->is_readonly()) {
807 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl;
808 return;
809 }
810
811 dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
812 << ", " << ls->num_events << " events" << dendl;
813 try_expire(ls, op_prio);
814 }
815
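// Drop fully-expired segments from the front of the log, stopping at any
// segment the open file table has not yet committed past (unless the log
// is capped); advance the journaler expire_pos, and rewrite the journal
// head if anything was trimmed. Expects submit_mutex held; unlocks it on
// return.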
816 void MDLog::_trim_expired_segments()
817 {
818 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
819
820 uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
821
822 // trim expired segments?
823 bool trimmed = false;
824 while (!segments.empty()) {
825 LogSegment *ls = segments.begin()->second;
826 if (!expired_segments.count(ls)) {
827 dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
828 << " to expire" << dendl;
829 break;
830 }
831
832 if (!capped && ls->seq >= oft_committed_seq) {
833 dout(10) << "_trim_expired_segments open file table committedseq " << oft_committed_seq
834 << " <= " << ls->seq << "/" << ls->offset << dendl;
835 break;
836 }
837
838 dout(10) << "_trim_expired_segments trimming expired "
839 << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
840 expired_events -= ls->num_events;
841 expired_segments.erase(ls);
842 if (pre_segments_size > 0)
843 pre_segments_size--;
844 num_events -= ls->num_events;
845
846 // this was the oldest segment, adjust expire pos
847 if (journaler->get_expire_pos() < ls->end) {
848 journaler->set_expire_pos(ls->end);
849 logger->set(l_mdl_expos, ls->end);
850 } else {
851 logger->set(l_mdl_expos, ls->offset);
852 }
853
854 logger->inc(l_mdl_segtrm);
855 logger->inc(l_mdl_evtrm, ls->num_events);
856
857 segments.erase(ls->seq);
858 delete ls;
859 trimmed = true;
860 }
861
862 submit_mutex.unlock();
863
864 if (trimmed)
865 journaler->write_head(0);
866 }
867
868 void MDLog::trim_expired_segments()
869 {
870 submit_mutex.lock();
871 _trim_expired_segments();
872 }
873
874 void MDLog::_expired(LogSegment *ls)
875 {
876 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
877
878 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
879 << ", " << ls->num_events << " events" << dendl;
880
881 if (!capped && ls == peek_current_segment()) {
882 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
883 << ", last one and !capped" << dendl;
884 } else {
885 // expired.
886 expired_segments.insert(ls);
887 expired_events += ls->num_events;
888
889 // Trigger all waiters
890 finish_contexts(g_ceph_context, ls->expiry_waiters);
891
892 logger->inc(l_mdl_evex, ls->num_events);
893 logger->inc(l_mdl_segex);
894 }
895
896 logger->set(l_mdl_ev, num_events);
897 logger->set(l_mdl_evexd, expired_events);
898 logger->set(l_mdl_seg, segments.size());
899 logger->set(l_mdl_segexd, expired_segments.size());
900 }
901
902
903
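// Kick off journal replay. An empty journal completes the waiter straight
// away; otherwise the waiter is queued on waitfor_replay and the replay
// thread is (re)started to apply events from read_pos up to write_pos.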
904 void MDLog::replay(MDSContext *c)
905 {
906 ceph_assert(journaler->is_active());
907 ceph_assert(journaler->is_readonly());
908
909 // empty?
910 if (journaler->get_read_pos() == journaler->get_write_pos()) {
911 dout(10) << "replay - journal empty, done." << dendl;
912 mds->mdcache->trim();
913 if (mds->is_standby_replay())
914 mds->update_mlogger();
915 if (c) {
916 c->complete(0);
917 }
918 return;
919 }
920
921 // add waiter
922 if (c)
923 waitfor_replay.push_back(c);
924
925 // go!
926 dout(10) << "replay start, from " << journaler->get_read_pos()
927 << " to " << journaler->get_write_pos() << dendl;
928
929 ceph_assert(num_events == 0 || already_replayed);
930 if (already_replayed) {
931 // Ensure previous instance of ReplayThread is joined before
932 // we create another one
933 replay_thread.join();
934 }
935 already_replayed = true;
936
937 replay_thread.create("md_log_replay");
938 }
939
940
941 /**
942 * Resolve the JournalPointer object to a journal file, and
943 * instantiate a Journaler object. This may re-write the journal
944 * if the journal in RADOS appears to be in an old format.
945 *
946 * This runs in a separate thread because of the way it is initialized from
947 * inside the mds lock (which is also the global objecter lock): rather than
948 * splitting it up into hard-to-read async operations linked by contexts, we block here.
949 *
950 * When this function completes, the `journaler` attribute will be set to
951 * a Journaler instance using the latest available serialization format.
952 */
953 void MDLog::_recovery_thread(MDSContext *completion)
954 {
955 ceph_assert(journaler == NULL);
956 if (g_conf()->mds_journal_format > JOURNAL_FORMAT_MAX) {
957 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
958 << JOURNAL_FORMAT_MAX << dendl;
959
960 // Oh dear, the configured journal format is one we do not understand:
961 // require operator intervention.
962 mds->damaged_unlocked();
963 ceph_abort(); // damaged should not return
964 }
965
966 // First, read the pointer object.
967 // If the pointer object is not present, then create it with
968 // front = default ino and back = null
969 JournalPointer jp(mds->get_nodeid(), mds->get_metadata_pool());
970 const int read_result = jp.load(mds->objecter);
971 if (read_result == -CEPHFS_ENOENT) {
972 inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
973 jp.front = default_log_ino;
974 int write_result = jp.save(mds->objecter);
975 // Nothing graceful we can do for this
976 ceph_assert(write_result >= 0);
977 } else if (read_result == -CEPHFS_EBLOCKLISTED) {
978 derr << "Blocklisted during JournalPointer read! Respawning..." << dendl;
979 mds->respawn();
980 ceph_abort(); // Should be unreachable because respawn calls execv
981 } else if (read_result != 0) {
982 mds->clog->error() << "failed to read JournalPointer: " << read_result
983 << " (" << cpp_strerror(read_result) << ")";
984 mds->damaged_unlocked();
985 ceph_abort(); // Should be unreachable because damaged() calls respawn()
986 }
987
988 // If the back pointer is non-null, that means that a journal
989 // rewrite failed part way through. Erase the back journal
990 // to clean up.
991 if (jp.back) {
992 if (mds->is_standby_replay()) {
993 dout(1) << "Journal " << jp.front << " is being rewritten, "
994 << "cannot replay in standby until an active MDS completes rewrite" << dendl;
995 std::lock_guard l(mds->mds_lock);
996 if (mds->is_daemon_stopping()) {
997 return;
998 }
999 completion->complete(-CEPHFS_EAGAIN);
1000 return;
1001 }
1002 dout(1) << "Erasing journal " << jp.back << dendl;
1003 C_SaferCond erase_waiter;
1004 Journaler back("mdlog", jp.back, mds->get_metadata_pool(),
1005 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
1006 mds->finisher);
1007
1008 // Read all about this journal (header + extents)
1009 C_SaferCond recover_wait;
1010 back.recover(&recover_wait);
1011 int recovery_result = recover_wait.wait();
1012 if (recovery_result == -CEPHFS_EBLOCKLISTED) {
1013 derr << "Blocklisted during journal recovery! Respawning..." << dendl;
1014 mds->respawn();
1015 ceph_abort(); // Should be unreachable because respawn calls execv
1016 } else if (recovery_result != 0) {
1017 // Journaler.recover succeeds if no journal objects are present: an error
1018 // means something worse like a corrupt header, which we can't handle here.
1019 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1020 << cpp_strerror(recovery_result);
1021 mds->damaged_unlocked();
1022 ceph_assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1023 }
1024
1025 // We could read the journal, so we can erase it.
1026 back.erase(&erase_waiter);
1027 int erase_result = erase_waiter.wait();
1028
1029 // If we are successful, or find no data, we can update the JournalPointer to
1030 // reflect that the back journal is gone.
1031 if (erase_result != 0 && erase_result != -CEPHFS_ENOENT) {
1032 derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
1033 } else {
1034 dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
1035 jp.back = 0;
1036 int write_result = jp.save(mds->objecter);
1037 // Nothing graceful we can do for this
1038 ceph_assert(write_result >= 0);
1039 }
1040 }
1041
1042 /* Read the header from the front journal */
1043 Journaler *front_journal = new Journaler("mdlog", jp.front,
1044 mds->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
1045 logger, l_mdl_jlat, mds->finisher);
1046
1047 // Assign to ::journaler so that we can be aborted by ::shutdown while
1048 // waiting for journaler recovery
1049 {
1050 std::lock_guard l(mds->mds_lock);
1051 journaler = front_journal;
1052 }
1053
1054 C_SaferCond recover_wait;
1055 front_journal->recover(&recover_wait);
1056 dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
1057 int recovery_result = recover_wait.wait();
1058 dout(4) << "Journal " << jp.front << " recovered." << dendl;
1059
1060 if (recovery_result == -CEPHFS_EBLOCKLISTED) {
1061 derr << "Blocklisted during journal recovery! Respawning..." << dendl;
1062 mds->respawn();
1063 ceph_abort(); // Should be unreachable because respawn calls execv
1064 } else if (recovery_result != 0) {
1065 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1066 << cpp_strerror(recovery_result);
1067 mds->damaged_unlocked();
1068 ceph_assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1069 }
1070
1071 /* Check whether the front journal format is acceptable or needs re-write */
1072 if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
1073 dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
1074 << ", does this MDS daemon require upgrade?" << dendl;
1075 {
1076 std::lock_guard l(mds->mds_lock);
1077 if (mds->is_daemon_stopping()) {
1078 journaler = NULL;
1079 delete front_journal;
1080 return;
1081 }
1082 completion->complete(-CEPHFS_EINVAL);
1083 }
1084 } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf()->mds_journal_format) {
1085 /* The journal is in the configured format, or we are in standby replay and will
1086 * tolerate replaying old journals until we have to go active. Use front_journal as
1087 * our journaler attribute and complete */
1088 dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
1089 {
1090 std::lock_guard l(mds->mds_lock);
1091 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1092 if (mds->is_daemon_stopping()) {
1093 return;
1094 }
1095 completion->complete(0);
1096 }
1097 } else {
1098 /* Hand off to reformat routine, which will ultimately set the
1099 * completion when it has done its thing */
1100 dout(1) << "Journal " << jp.front << " has old format "
1101 << front_journal->get_stream_format() << ", it will now be updated" << dendl;
1102 _reformat_journal(jp, front_journal, completion);
1103 }
1104 }
1105
1106 /**
1107 * Blocking rewrite of the journal to a new file, followed by
1108 * swap of journal pointer to point to the new one.
1109 *
1110 * We write the new journal to the 'back' journal from the JournalPointer,
1111 * swapping pointers to make that one the front journal only when we have
1112 * safely completed.
1113 */
1114 void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSContext *completion)
1115 {
1116 ceph_assert(!jp_in.is_null());
1117 ceph_assert(completion != NULL);
1118 ceph_assert(old_journal != NULL);
1119
1120 JournalPointer jp = jp_in;
1121
1122 /* Set JournalPointer.back to the location we will write the new journal */
1123 inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1124 inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1125 jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1126 int write_result = jp.save(mds->objecter);
1127 ceph_assert(write_result == 0);
1128
1129 /* Create the new Journaler file */
1130 Journaler *new_journal = new Journaler("mdlog", jp.back,
1131 mds->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1132 dout(4) << "Writing new journal header " << jp.back << dendl;
1133 file_layout_t new_layout = old_journal->get_layout();
1134 new_journal->set_writeable();
1135 new_journal->create(&new_layout, g_conf()->mds_journal_format);
1136
1137 /* Write the new journal header to RADOS */
1138 C_SaferCond write_head_wait;
1139 new_journal->write_head(&write_head_wait);
1140 write_head_wait.wait();
1141
1142 // Read in the old journal, and whenever we have readable events,
1143 // write them to the new journal.
1144 int r = 0;
1145
1146 // In old format journals before event_seq was introduced, the serialized
1147 // offset of a SubtreeMap message in the log is used as the unique ID for
1148 // a log segment. Because we change serialization, this will end up changing
1149 // for us, so we have to explicitly update the fields that point back to that
1150 // log segment.
1151 std::map<LogSegment::seq_t, LogSegment::seq_t> segment_pos_rewrite;
1152
1153 // The logic in here, borrowed from replay_thread, expects mds_lock to be held,
1154 // e.g. between checking readable and doing wait_for_readable, so that journaler
1155 // state doesn't change in between.
1156 uint32_t events_transcribed = 0;
1157 while (1) {
1158 while (!old_journal->is_readable() &&
1159 old_journal->get_read_pos() < old_journal->get_write_pos() &&
1160 !old_journal->get_error()) {
1161
1162 // Issue a journal prefetch
1163 C_SaferCond readable_waiter;
1164 old_journal->wait_for_readable(&readable_waiter);
1165
1166 // Wait for a journal prefetch to complete
1167 readable_waiter.wait();
1168 }
1169 if (old_journal->get_error()) {
1170 r = old_journal->get_error();
1171 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1172 break;
1173 }
1174
1175 if (!old_journal->is_readable() &&
1176 old_journal->get_read_pos() == old_journal->get_write_pos())
1177 break;
1178
1179 // Read one serialized LogEvent
1180 ceph_assert(old_journal->is_readable());
1181 bufferlist bl;
1182 uint64_t le_pos = old_journal->get_read_pos();
1183 bool r = old_journal->try_read_entry(bl);
1184 if (!r && old_journal->get_error())
1185 continue;
1186 ceph_assert(r);
1187
1188 // Update segment_pos_rewrite
1189 auto le = LogEvent::decode_event(bl.cbegin());
1190 if (le) {
1191 bool modified = false;
1192
1193 if (le->get_type() == EVENT_SUBTREEMAP ||
1194 le->get_type() == EVENT_RESETJOURNAL) {
1195 auto sle = dynamic_cast<ESubtreeMap*>(le.get());
1196 if (sle == NULL || sle->event_seq == 0) {
1197 // A non-explicit event seq: the effective sequence number
1198 // of this segment is its position in the old journal and
1199 // the new effective sequence number will be its position
1200 // in the new journal.
1201 segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1202 dout(20) << __func__ << " discovered segment seq mapping "
1203 << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1204 }
1205 } else {
1206 event_seq++;
1207 }
1208
1209 // Rewrite segment references if necessary
1210 EMetaBlob *blob = le->get_metablob();
1211 if (blob) {
1212 modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1213 }
1214
1215 // Zero-out expire_pos in subtreemap because offsets have changed
1216 // (expire_pos is just an optimization so it's safe to eliminate it)
1217 if (le->get_type() == EVENT_SUBTREEMAP
1218 || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1219 auto& sle = dynamic_cast<ESubtreeMap&>(*le);
1220 dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1221 << le_pos << " seq=" << sle.event_seq << dendl;
1222 sle.expire_pos = 0;
1223 modified = true;
1224 }
1225
1226 if (modified) {
1227 bl.clear();
1228 le->encode_with_header(bl, mds->mdsmap->get_up_features());
1229 }
1230 } else {
1231 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1232 // not validate the contents, so pass it through.
1233 dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1234 << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1235 << dendl;
1236 }
1237
1238 // Write (buffered, synchronous) one serialized LogEvent
1239 events_transcribed += 1;
1240 new_journal->append_entry(bl);
1241 }
1242
1243 dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1244 C_SaferCond flush_waiter;
1245 new_journal->flush(&flush_waiter);
1246 flush_waiter.wait();
1247
1248 // If we failed to rewrite the journal, leave the partly-written journal
1249 // as garbage to be cleaned up on the next startup.
1250 ceph_assert(r == 0);
1251
1252 /* Now that the new journal is safe, we can flip the pointers */
1253 inodeno_t const tmp = jp.front;
1254 jp.front = jp.back;
1255 jp.back = tmp;
1256 write_result = jp.save(mds->objecter);
1257 ceph_assert(write_result == 0);
1258
1259 /* Delete the old journal to free space */
1260 dout(1) << "New journal flushed, erasing old journal" << dendl;
1261 C_SaferCond erase_waiter;
1262 old_journal->erase(&erase_waiter);
1263 int erase_result = erase_waiter.wait();
1264 ceph_assert(erase_result == 0);
1265 {
1266 std::lock_guard l(mds->mds_lock);
1267 if (mds->is_daemon_stopping()) {
1268 delete new_journal;
1269 return;
1270 }
1271 ceph_assert(journaler == old_journal);
1272 journaler = NULL;
1273 delete old_journal;
1274
1275 /* Update the pointer to reflect we're back in clean single journal state. */
1276 jp.back = 0;
1277 write_result = jp.save(mds->objecter);
1278 ceph_assert(write_result == 0);
1279
1280 /* Reset the Journaler object to its default state */
1281 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1282 if (mds->is_daemon_stopping()) {
1283 delete new_journal;
1284 return;
1285 }
1286 journaler = new_journal;
1287 journaler->set_readonly();
1288 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1289
1290 /* Trigger completion */
1291 if (mds->is_daemon_stopping()) {
1292 return;
1293 }
1294 completion->complete(0);
1295 }
1296 }
1297
1298
1299 // I am a separate thread.
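// Summary of the loop below: read serialized LogEvents from read_pos up
// to write_pos, decode each one, open a new LogSegment whenever an
// ESubtreeMap or EResetJournal is seen, and replay every event that falls
// inside a known segment under mds_lock. Most errors become CEPHFS_EAGAIN
// for standby-replay followers, and damage/respawn otherwise.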
1300 void MDLog::_replay_thread()
1301 {
1302 dout(10) << "_replay_thread start" << dendl;
1303
1304 // loop
1305 int r = 0;
1306 while (1) {
1307 // wait for read?
1308 while (!journaler->is_readable() &&
1309 journaler->get_read_pos() < journaler->get_write_pos() &&
1310 !journaler->get_error()) {
1311 C_SaferCond readable_waiter;
1312 journaler->wait_for_readable(&readable_waiter);
1313 r = readable_waiter.wait();
1314 }
1315 if (journaler->get_error()) {
1316 r = journaler->get_error();
1317 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1318 if (r == -CEPHFS_ENOENT) {
1319 if (mds->is_standby_replay()) {
1320 // journal has been trimmed by somebody else
1321 r = -CEPHFS_EAGAIN;
1322 } else {
1323 mds->clog->error() << "missing journal object";
1324 mds->damaged_unlocked();
1325 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1326 }
1327 } else if (r == -CEPHFS_EINVAL) {
1328 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1329 // this should only happen if you're following somebody else
1330 if(journaler->is_readonly()) {
1331 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl;
1332 r = -CEPHFS_EAGAIN;
1333 } else {
1334 mds->clog->error() << "invalid journaler offsets";
1335 mds->damaged_unlocked();
1336 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1337 }
1338 } else {
1339 /* re-read head and check it
1340 * Given that replay happens in a separate thread and
1341 * the MDS is going to either shut down or restart when
1342 * we return this error, doing it synchronously is fine
1343 * -- as long as we drop the main mds lock--. */
1344 C_SaferCond reread_fin;
1345 journaler->reread_head(&reread_fin);
1346 int err = reread_fin.wait();
1347 if (err) {
1348 if (err == -CEPHFS_ENOENT && mds->is_standby_replay()) {
1349 r = -CEPHFS_EAGAIN;
1350 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1351 << dendl;
1352 break;
1353 } else {
1354 dout(0) << "got error while reading head: " << cpp_strerror(err)
1355 << dendl;
1356
1357 mds->clog->error() << "error reading journal header";
1358 mds->damaged_unlocked();
1359 ceph_abort(); // Should be unreachable because damaged() calls
1360 // respawn()
1361 }
1362 }
1363 standby_trim_segments();
1364 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1365 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl;
1366 r = -CEPHFS_EAGAIN;
1367 }
1368 }
1369 }
1370 break;
1371 }
1372
1373 if (!journaler->is_readable() &&
1374 journaler->get_read_pos() == journaler->get_write_pos())
1375 break;
1376
1377 ceph_assert(journaler->is_readable() || mds->is_daemon_stopping());
1378
1379 // read it
1380 uint64_t pos = journaler->get_read_pos();
1381 bufferlist bl;
1382 bool r = journaler->try_read_entry(bl);
1383 if (!r && journaler->get_error())
1384 continue;
1385 ceph_assert(r);
1386
1387 // unpack event
1388 auto le = LogEvent::decode_event(bl.cbegin());
1389 if (!le) {
1390 dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1391 << " -- unable to decode event" << dendl;
1392 dout(0) << "dump of unknown or corrupt event:\n";
1393 bl.hexdump(*_dout);
1394 *_dout << dendl;
1395
1396 mds->clog->error() << "corrupt journal event at " << pos << "~"
1397 << bl.length() << " / "
1398 << journaler->get_write_pos();
1399 if (g_conf()->mds_log_skip_corrupt_events) {
1400 continue;
1401 } else {
1402 mds->damaged_unlocked();
1403 ceph_abort(); // Should be unreachable because damaged() calls
1404 // respawn()
1405 }
1406
1407 }
1408 le->set_start_off(pos);
1409
1410 // new segment?
1411 if (le->get_type() == EVENT_SUBTREEMAP ||
1412 le->get_type() == EVENT_RESETJOURNAL) {
1413 auto sle = dynamic_cast<ESubtreeMap*>(le.get());
1414 if (sle && sle->event_seq > 0)
1415 event_seq = sle->event_seq;
1416 else
1417 event_seq = pos;
1418 segments[event_seq] = new LogSegment(event_seq, pos);
1419 logger->set(l_mdl_seg, segments.size());
1420 } else {
1421 event_seq++;
1422 }
1423
1424 // have we seen an import map yet?
1425 if (segments.empty()) {
1426 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1427 << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
1428 } else {
1429 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1430 << " " << le->get_stamp() << ": " << *le << dendl;
1431 le->_segment = get_current_segment(); // replay may need this
1432 le->_segment->num_events++;
1433 le->_segment->end = journaler->get_read_pos();
1434 num_events++;
1435
1436 {
1437 std::lock_guard l(mds->mds_lock);
1438 if (mds->is_daemon_stopping()) {
1439 return;
1440 }
1441 logger->inc(l_mdl_replayed);
1442 le->replay(mds);
1443 }
1444 }
1445
1446 logger->set(l_mdl_rdpos, pos);
1447 }
1448
1449 // done!
1450 if (r == 0) {
1451 ceph_assert(journaler->get_read_pos() == journaler->get_write_pos());
1452 dout(10) << "_replay - complete, " << num_events
1453 << " events" << dendl;
1454
1455 logger->set(l_mdl_expos, journaler->get_expire_pos());
1456 }
1457
1458 safe_pos = journaler->get_write_safe_pos();
1459
1460 dout(10) << "_replay_thread kicking waiters" << dendl;
1461 {
1462 std::lock_guard l(mds->mds_lock);
1463 if (mds->is_daemon_stopping()) {
1464 return;
1465 }
1466 pre_segments_size = segments.size(); // number of segments present when replay finished
1467 finish_contexts(g_ceph_context, waitfor_replay, r);
1468 }
1469
1470 dout(10) << "_replay_thread finish" << dendl;
1471 }
1472
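// In standby replay the active MDS trims the journal underneath us, so
// drop in-memory segments that end at or before the re-read expire_pos,
// always keeping at least one (the newest) segment.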
1473 void MDLog::standby_trim_segments()
1474 {
1475 dout(10) << "standby_trim_segments" << dendl;
1476 uint64_t expire_pos = journaler->get_expire_pos();
1477 dout(10) << " expire_pos=" << expire_pos << dendl;
1478
1479 mds->mdcache->open_file_table.trim_destroyed_inos(expire_pos);
1480
1481 bool removed_segment = false;
1482 while (have_any_segments()) {
1483 LogSegment *seg = get_oldest_segment();
1484 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1485 "~" << seg->end - seg->offset << dendl;
1486
1487 if (seg->end > expire_pos) {
1488 dout(10) << " won't remove, not expired!" << dendl;
1489 break;
1490 }
1491
1492 if (segments.size() == 1) {
1493 dout(10) << " won't remove, last segment!" << dendl;
1494 break;
1495 }
1496
1497 dout(10) << " removing segment" << dendl;
1498 mds->mdcache->standby_trim_segment(seg);
1499 remove_oldest_segment();
1500 removed_segment = true;
1501 }
1502
1503 if (removed_segment) {
1504 dout(20) << " calling mdcache->trim!" << dendl;
1505 mds->mdcache->trim();
1506 } else {
1507 dout(20) << " removed no segments!" << dendl;
1508 }
1509 }
1510
1511 void MDLog::dump_replay_status(Formatter *f) const
1512 {
1513 f->open_object_section("replay_status");
1514 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1515 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1516 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1517 f->dump_unsigned("num_events", get_num_events());
1518 f->dump_unsigned("num_segments", get_num_segments());
1519 f->close_section();
1520 }
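// Roughly the shape of the output this produces while a rank is in
// replay (field values below are illustrative only):
//
//   "replay_status": {
//       "journal_read_pos": 4194304,
//       "journal_write_pos": 8388608,
//       "journal_expire_pos": 4194304,
//       "num_events": 123,
//       "num_segments": 2
//   }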