1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDLog.h"
17 #include "MDCache.h"
18 #include "LogEvent.h"
19 #include "MDSContext.h"
20
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
23
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
27
28 #include "events/ESubtreeMap.h"
29
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/ceph_assert.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
36 #undef dout_prefix
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39 using namespace std;
40
41 // cons/des
42 MDLog::~MDLog()
43 {
44 if (journaler) { delete journaler; journaler = 0; }
45 if (logger) {
46 g_ceph_context->get_perfcounters_collection()->remove(logger);
47 delete logger;
48 logger = 0;
49 }
50 }
51
52
53 void MDLog::create_logger()
54 {
55 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
56
57 plb.add_u64_counter(l_mdl_evadd, "evadd", "Events submitted", "subm",
58 PerfCountersBuilder::PRIO_INTERESTING);
59 plb.add_u64(l_mdl_ev, "ev", "Events", "evts",
60 PerfCountersBuilder::PRIO_INTERESTING);
61 plb.add_u64(l_mdl_seg, "seg", "Segments", "segs",
62 PerfCountersBuilder::PRIO_INTERESTING);
63
64 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
65 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
66 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
67 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
68 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
69 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed",
70 "repl", PerfCountersBuilder::PRIO_INTERESTING);
71 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
72 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
73 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
74 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
75 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
76 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
77
78 plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
  79   plb.add_u64(l_mdl_expos, "expos", "Journaler expire position");
80 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
81 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
82
83 // logger
84 logger = plb.create_perf_counters();
85 g_ceph_context->get_perfcounters_collection()->add(logger);
86 }
87
88 void MDLog::set_write_iohint(unsigned iohint_flags)
89 {
90 journaler->set_write_iohint(iohint_flags);
91 }
92
93 class C_MDL_WriteError : public MDSIOContextBase {
94 protected:
95 MDLog *mdlog;
96 MDSRank *get_mds() override {return mdlog->mds;}
97
98 void finish(int r) override {
99 MDSRank *mds = get_mds();
100 // assume journal is reliable, so don't choose action based on
101 // g_conf()->mds_action_on_write_error.
102 if (r == -CEPHFS_EBLOCKLISTED) {
103 derr << "we have been blocklisted (fenced), respawning..." << dendl;
104 mds->respawn();
105 } else {
106 derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
107 // Although it's possible that this could be something transient,
108 // it's severe and scary, so disable this rank until an administrator
109 // intervenes.
110 mds->clog->error() << "Unhandled journal write error on MDS rank " <<
111 mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
112 mds->damaged();
113 ceph_abort(); // damaged should never return
114 }
115 }
116
117 public:
118 explicit C_MDL_WriteError(MDLog *m) :
119 MDSIOContextBase(false), mdlog(m) {}
120 void print(ostream& out) const override {
121 out << "mdlog_write_error";
122 }
123 };
124
125
126 void MDLog::write_head(MDSContext *c)
127 {
128 Context *fin = NULL;
129 if (c != NULL) {
130 fin = new C_IO_Wrapper(mds, c);
131 }
132 journaler->write_head(fin);
133 }
134
135 uint64_t MDLog::get_read_pos() const
136 {
137 return journaler->get_read_pos();
138 }
139
140 uint64_t MDLog::get_write_pos() const
141 {
142 return journaler->get_write_pos();
143 }
144
145 uint64_t MDLog::get_safe_pos() const
146 {
147 return journaler->get_write_safe_pos();
148 }
149
150
151
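// Create a fresh, empty journal for this rank: instantiate the Journaler,
// write its header, persist a JournalPointer pointing at it, and start the
// submit thread.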
152 void MDLog::create(MDSContext *c)
153 {
154 dout(5) << "create empty log" << dendl;
155
156 C_GatherBuilder gather(g_ceph_context);
157 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
 158   // XXX but maybe that should be handled inside Journaler?
159 gather.set_finisher(new C_IO_Wrapper(mds, c));
160
161 // The inode of the default Journaler we will create
162 ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
163
164 // Instantiate Journaler and start async write to RADOS
165 ceph_assert(journaler == NULL);
166 journaler = new Journaler("mdlog", ino, mds->get_metadata_pool(),
167 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
168 l_mdl_jlat, mds->finisher);
169 ceph_assert(journaler->is_readonly());
170 journaler->set_write_error_handler(new C_MDL_WriteError(this));
171 journaler->set_writeable();
172 journaler->create(&mds->mdcache->default_log_layout, g_conf()->mds_journal_format);
173 journaler->write_head(gather.new_sub());
174
175 // Async write JournalPointer to RADOS
176 JournalPointer jp(mds->get_nodeid(), mds->get_metadata_pool());
177 jp.front = ino;
178 jp.back = 0;
179 jp.save(mds->objecter, gather.new_sub());
180
181 gather.activate();
182
183 logger->set(l_mdl_expos, journaler->get_expire_pos());
184 logger->set(l_mdl_wrpos, journaler->get_write_pos());
185
186 submit_thread.create("md_submit");
187 }
188
189 void MDLog::open(MDSContext *c)
190 {
191 dout(5) << "open discovering log bounds" << dendl;
192
193 ceph_assert(!recovery_thread.is_started());
194 recovery_thread.set_completion(c);
195 recovery_thread.create("md_recov_open");
196
197 submit_thread.create("md_submit");
198 // either append() or replay() will follow.
199 }
200
201 /**
202 * Final part of reopen() procedure, after recovery_thread
203 * has done its thing we call append()
204 */
205 class C_ReopenComplete : public MDSInternalContext {
206 MDLog *mdlog;
207 MDSContext *on_complete;
208 public:
209 C_ReopenComplete(MDLog *mdlog_, MDSContext *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
210 void finish(int r) override {
211 mdlog->append();
212 on_complete->complete(r);
213 }
214 };
215
216 /**
217 * Given that open() has been called in the past, go through the journal
218 * recovery procedure again, potentially reformatting the journal if it
219 * was in an old format.
220 */
221 void MDLog::reopen(MDSContext *c)
222 {
223 dout(5) << "reopen" << dendl;
224
225 // Because we will call append() at the completion of this, check that we have already
226 // read the whole journal.
227 ceph_assert(journaler != NULL);
228 ceph_assert(journaler->get_read_pos() == journaler->get_write_pos());
229
230 delete journaler;
231 journaler = NULL;
232
233 // recovery_thread was started at some point in the past. Although
 234   // it has called its completion if we made it back here, it might
235 // still not have been cleaned up: join it.
236 recovery_thread.join();
237
238 recovery_thread.set_completion(new C_ReopenComplete(this, c));
239 recovery_thread.create("md_recov_reopen");
240 }
241
242 void MDLog::append()
243 {
244 dout(5) << "append positioning at end and marking writeable" << dendl;
245 journaler->set_read_pos(journaler->get_write_pos());
246 journaler->set_expire_pos(journaler->get_write_pos());
247
248 journaler->set_writeable();
249
250 logger->set(l_mdl_expos, journaler->get_write_pos());
251 }
252
253
254
255 // -------------------------------------------------
256
257 void MDLog::_start_entry(LogEvent *e)
258 {
259 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
260
261 ceph_assert(cur_event == NULL);
262 cur_event = e;
263
264 event_seq++;
265
266 EMetaBlob *metablob = e->get_metablob();
267 if (metablob) {
268 metablob->event_seq = event_seq;
269 metablob->last_subtree_map = get_last_segment_seq();
270 }
271 }
272
273 void MDLog::cancel_entry(LogEvent *le)
274 {
275 ceph_assert(le == cur_event);
276 cur_event = NULL;
277 delete le;
278 }
279
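// Queue a fully-built event on the newest segment for the submit thread to
// encode and journal. May also start a new segment when the current one
// crosses a layout period boundary or reaches mds_log_events_per_segment
// events, or journal an extra EVENT_SUBTREEMAP_TEST when mds_debug_subtrees
// is set.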
280 void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
281 {
282 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
283 ceph_assert(!mds->is_any_replay());
284 ceph_assert(!mds_is_shutting_down);
285
286 ceph_assert(le == cur_event);
287 cur_event = NULL;
288
289 // let the event register itself in the segment
290 ceph_assert(!segments.empty());
291 LogSegment *ls = segments.rbegin()->second;
292 ls->num_events++;
293
294 le->_segment = ls;
295 le->update_segment();
296 le->set_stamp(ceph_clock_now());
297
298 mdsmap_up_features = mds->mdsmap->get_up_features();
299 pending_events[ls->seq].push_back(PendingEvent(le, c));
300 num_events++;
301
302 if (logger) {
303 logger->inc(l_mdl_evadd);
304 logger->set(l_mdl_ev, num_events);
305 }
306
307 unflushed++;
308
309 uint64_t period = journaler->get_layout_period();
310 // start a new segment?
311 if (le->get_type() == EVENT_SUBTREEMAP ||
312 (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
 313     // avoid an infinite loop when the ESubtreeMap is very large.
 314     // Do not insert an ESubtreeMap among the EImportFinish events that
 315     // finish disambiguating imports, because the ESubtreeMap reflects the
 316     // subtree state only once all EImportFinish events have been replayed.
317 } else if (ls->end/period != ls->offset/period ||
318 ls->num_events >= g_conf()->mds_log_events_per_segment) {
319 dout(10) << "submit_entry also starting new segment: last = "
320 << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
321 _start_new_segment();
322 } else if (g_conf()->mds_debug_subtrees &&
323 le->get_type() != EVENT_SUBTREEMAP_TEST) {
324 // debug: journal this every time to catch subtree replay bugs.
325 // use a different event id so it doesn't get interpreted as a
326 // LogSegment boundary on replay.
327 LogEvent *sle = mds->mdcache->create_subtree_map();
328 sle->set_type(EVENT_SUBTREEMAP_TEST);
329 _submit_entry(sle, NULL);
330 }
331 }
332
333 /**
334 * Invoked on the flush after each entry submitted
335 */
336 class C_MDL_Flushed : public MDSLogContextBase {
337 protected:
338 MDLog *mdlog;
339 MDSRank *get_mds() override {return mdlog->mds;}
340 MDSContext *wrapped;
341
342 void finish(int r) override {
343 if (wrapped)
344 wrapped->complete(r);
345 }
346
347 public:
348 C_MDL_Flushed(MDLog *m, MDSContext *w)
349 : mdlog(m), wrapped(w) {}
350 C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
351 set_write_pos(wp);
352 }
353 };
354
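// Submit thread body: drain pending_events in order, encode each LogEvent
// with the recorded MDSMap up-features, append it to the Journaler, and
// register a flush waiter at the resulting write position. Entries queued
// without an event (from wait_for_safe()/flush()) only install a flush
// waiter and/or trigger a journaler flush.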
355 void MDLog::_submit_thread()
356 {
357 dout(10) << "_submit_thread start" << dendl;
358
359 std::unique_lock locker{submit_mutex};
360
361 while (!mds->is_daemon_stopping()) {
362 if (g_conf()->mds_log_pause) {
363 submit_cond.wait(locker);
364 continue;
365 }
366
367 map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
368 if (it == pending_events.end()) {
369 submit_cond.wait(locker);
370 continue;
371 }
372
373 if (it->second.empty()) {
374 pending_events.erase(it);
375 continue;
376 }
377
378 int64_t features = mdsmap_up_features;
379 PendingEvent data = it->second.front();
380 it->second.pop_front();
381
382 locker.unlock();
383
384 if (data.le) {
385 LogEvent *le = data.le;
386 LogSegment *ls = le->_segment;
387 // encode it, with event type
388 bufferlist bl;
389 le->encode_with_header(bl, features);
390
391 uint64_t write_pos = journaler->get_write_pos();
392
393 le->set_start_off(write_pos);
394 if (le->get_type() == EVENT_SUBTREEMAP)
395 ls->offset = write_pos;
396
397 dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
398 << " : " << *le << dendl;
399
400 // journal it.
401 const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
402 ls->end = new_write_pos;
403
404 MDSLogContextBase *fin;
405 if (data.fin) {
406 fin = dynamic_cast<MDSLogContextBase*>(data.fin);
407 ceph_assert(fin);
408 fin->set_write_pos(new_write_pos);
409 } else {
410 fin = new C_MDL_Flushed(this, new_write_pos);
411 }
412
413 journaler->wait_for_flush(fin);
414
415 if (data.flush)
416 journaler->flush();
417
418 if (logger)
419 logger->set(l_mdl_wrpos, ls->end);
420
421 delete le;
422 } else {
423 if (data.fin) {
424 MDSContext* fin =
425 dynamic_cast<MDSContext*>(data.fin);
426 ceph_assert(fin);
427 C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
428 fin2->set_write_pos(journaler->get_write_pos());
429 journaler->wait_for_flush(fin2);
430 }
431 if (data.flush)
432 journaler->flush();
433 }
434
435 locker.lock();
436 if (data.flush)
437 unflushed = 0;
438 else if (data.le)
439 unflushed++;
440 }
441 }
442
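// Call back 'c' once everything submitted so far is safely on disk: either
// queue it behind the pending events for the submit thread or, if nothing is
// pending, wait directly on the journaler flush.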
443 void MDLog::wait_for_safe(MDSContext *c)
444 {
445 submit_mutex.lock();
446
447 bool no_pending = true;
448 if (!pending_events.empty()) {
449 pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
450 no_pending = false;
451 submit_cond.notify_all();
452 }
453
454 submit_mutex.unlock();
455
456 if (no_pending && c)
457 journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
458 }
459
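// Flush any buffered journal entries: if events are still queued for the
// submit thread, mark the tail of the queue for flushing and wake it up;
// otherwise flush the journaler directly.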
460 void MDLog::flush()
461 {
462 submit_mutex.lock();
463
464 bool do_flush = unflushed > 0;
465 unflushed = 0;
466 if (!pending_events.empty()) {
467 pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
468 do_flush = false;
469 submit_cond.notify_all();
470 }
471
472 submit_mutex.unlock();
473
474 if (do_flush)
475 journaler->flush();
476 }
477
478 void MDLog::kick_submitter()
479 {
480 std::lock_guard l(submit_mutex);
481 submit_cond.notify_all();
482 }
483
484 void MDLog::cap()
485 {
486 dout(5) << "mark mds is shutting down" << dendl;
487 mds_is_shutting_down = true;
488 }
489
490 void MDLog::shutdown()
491 {
492 ceph_assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
493
494 dout(5) << "shutdown" << dendl;
495 if (submit_thread.is_started()) {
496 ceph_assert(mds->is_daemon_stopping());
497
498 if (submit_thread.am_self()) {
 499       // suicide() was called from this thread: trust it to do no work after
 500       // returning from suicide, and to subsequently respect mds->is_daemon_stopping()
 501       // and fall out of its loop.
502 } else {
503 mds->mds_lock.unlock();
504 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
505 // picking it up will do anything with it.
506
507 submit_mutex.lock();
508 submit_cond.notify_all();
509 submit_mutex.unlock();
510
511 mds->mds_lock.lock();
512
513 submit_thread.join();
514 }
515 }
516
517 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
518 // so we need to shutdown the journaler first.
519 if (journaler) {
520 journaler->shutdown();
521 }
522
523 if (replay_thread.is_started() && !replay_thread.am_self()) {
524 mds->mds_lock.unlock();
525 replay_thread.join();
526 mds->mds_lock.lock();
527 }
528
529 if (recovery_thread.is_started() && !recovery_thread.am_self()) {
530 mds->mds_lock.unlock();
531 recovery_thread.join();
532 mds->mds_lock.lock();
533 }
534 }
535
536
537 // -----------------------------
538 // segments
539
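// Start a new log segment: allocate a LogSegment keyed by the next event
// sequence number and journal an ESubtreeMap to anchor it.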
540 void MDLog::_start_new_segment()
541 {
542 _prepare_new_segment();
543 _journal_segment_subtree_map(NULL);
544 }
545
546 void MDLog::_prepare_new_segment()
547 {
548 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
549
550 uint64_t seq = event_seq + 1;
551 dout(7) << __func__ << " seq " << seq << dendl;
552
553 segments[seq] = new LogSegment(seq);
554
555 logger->inc(l_mdl_segadd);
556 logger->set(l_mdl_seg, segments.size());
557
558 // Adjust to next stray dir
559 mds->mdcache->advance_stray();
560 }
561
562 void MDLog::_journal_segment_subtree_map(MDSContext *onsync)
563 {
564 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
565
566 dout(7) << __func__ << dendl;
567 ESubtreeMap *sle = mds->mdcache->create_subtree_map();
568 sle->event_seq = get_last_segment_seq();
569
570 _submit_entry(sle, new C_MDL_Flushed(this, onsync));
571 }
572
573 class C_OFT_Committed : public MDSInternalContext {
574 MDLog *mdlog;
575 uint64_t seq;
576 public:
577 C_OFT_Committed(MDLog *l, uint64_t s) :
578 MDSInternalContext(l->mds), mdlog(l), seq(s) {}
579 void finish(int ret) override {
580 mdlog->trim_expired_segments();
581 }
582 };
583
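// Kick off an OpenFileTable commit up to last_seq (dropping submit_mutex
// across the call); _trim_expired_segments() will not trim past the open
// file table's committed sequence number.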
584 void MDLog::try_to_commit_open_file_table(uint64_t last_seq)
585 {
586 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
587
588 if (mds_is_shutting_down) // shutting down the MDS
589 return;
590
591 if (mds->mdcache->open_file_table.is_any_committing())
592 return;
593
 594   // when there are dirty items, there may not be any new log events
595 if (mds->mdcache->open_file_table.is_any_dirty() ||
596 last_seq > mds->mdcache->open_file_table.get_committed_log_seq()) {
597 submit_mutex.unlock();
598 mds->mdcache->open_file_table.commit(new C_OFT_Committed(this, last_seq),
599 last_seq, CEPH_MSG_PRIO_HIGH);
600 submit_mutex.lock();
601 }
602 }
603
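// Trim the log towards the configured limits: walk from the oldest segment,
// marking candidates as expiring via try_expire() (bounded by a ~2 second
// time budget and by how many segments may expire at once), then drop fully
// expired segments and advance the expire position.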
604 void MDLog::trim(int m)
605 {
606 unsigned max_segments = g_conf()->mds_log_max_segments;
607 int max_events = g_conf()->mds_log_max_events;
608 if (m >= 0)
609 max_events = m;
610
611 if (mds->mdcache->is_readonly()) {
612 dout(10) << "trim, ignoring read-only FS" << dendl;
613 return;
614 }
615
616 // Clamp max_events to not be smaller than events per segment
617 if (max_events > 0 && max_events <= g_conf()->mds_log_events_per_segment) {
618 max_events = g_conf()->mds_log_events_per_segment + 1;
619 }
620
621 submit_mutex.lock();
622
623 // trim!
624 dout(10) << "trim "
625 << segments.size() << " / " << max_segments << " segments, "
626 << num_events << " / " << max_events << " events"
627 << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
628 << ", " << expired_segments.size() << " (" << expired_events << ") expired"
629 << dendl;
630
631 if (segments.empty()) {
632 submit_mutex.unlock();
633 return;
634 }
635
636 // hack: only trim for a few seconds at a time
637 utime_t stop = ceph_clock_now();
638 stop += 2.0;
639
640 int op_prio = CEPH_MSG_PRIO_LOW +
641 (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
642 expiring_segments.size() / max_segments;
643 if (op_prio > CEPH_MSG_PRIO_HIGH)
644 op_prio = CEPH_MSG_PRIO_HIGH;
645
646 unsigned new_expiring_segments = 0;
647
648 unsigned max_expiring_segments = 0;
649 if (pre_segments_size > 0){
650 max_expiring_segments = max_segments/2;
651 ceph_assert(segments.size() >= pre_segments_size);
652 max_expiring_segments = std::max<unsigned>(max_expiring_segments,segments.size() - pre_segments_size);
653 }
654
655 map<uint64_t,LogSegment*>::iterator p = segments.begin();
656 while (p != segments.end()) {
657 if (stop < ceph_clock_now())
658 break;
659
660 unsigned num_remaining_segments = (segments.size() - expired_segments.size() - expiring_segments.size());
661 if ((num_remaining_segments <= max_segments) &&
662 (max_events < 0 || num_events - expiring_events - expired_events <= max_events))
663 break;
664
 665     // Do not trim too many segments at once during peak workload. If the mds keeps creating N segments each tick,
 666     // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'.
667 if (new_expiring_segments * 2 > num_remaining_segments)
668 break;
669
670 if (max_expiring_segments > 0 &&
671 expiring_segments.size() >= max_expiring_segments)
672 break;
673
674 // look at first segment
675 LogSegment *ls = p->second;
676 ceph_assert(ls);
677 ++p;
678
679 if (pending_events.count(ls->seq) ||
680 ls->end > safe_pos) {
681 dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
682 << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
683 break;
684 }
685
686 if (expiring_segments.count(ls)) {
687 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
688 << ", " << ls->num_events << " events" << dendl;
689 } else if (expired_segments.count(ls)) {
690 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
691 << ", " << ls->num_events << " events" << dendl;
692 } else {
693 ceph_assert(expiring_segments.count(ls) == 0);
694 new_expiring_segments++;
695 expiring_segments.insert(ls);
696 expiring_events += ls->num_events;
697 submit_mutex.unlock();
698
699 uint64_t last_seq = ls->seq;
700 try_expire(ls, op_prio);
701
702 submit_mutex.lock();
703 p = segments.lower_bound(last_seq + 1);
704 }
705 }
706
707 try_to_commit_open_file_table(get_last_segment_seq());
708
709 // discard expired segments and unlock submit_mutex
710 _trim_expired_segments();
711 }
712
713 class C_MaybeExpiredSegment : public MDSInternalContext {
714 MDLog *mdlog;
715 LogSegment *ls;
716 int op_prio;
717 public:
718 C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
719 MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
720 void finish(int res) override {
721 if (res < 0)
722 mdlog->mds->handle_write_error(res);
723 mdlog->_maybe_expired(ls, op_prio);
724 }
725 };
726
727 /**
728 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
729 * segment.
730 */
731 int MDLog::trim_all()
732 {
733 submit_mutex.lock();
734
735 dout(10) << __func__ << ": "
736 << segments.size()
737 << "/" << expiring_segments.size()
738 << "/" << expired_segments.size() << dendl;
739
740 uint64_t last_seq = 0;
741 if (!segments.empty()) {
742 last_seq = get_last_segment_seq();
743 try_to_commit_open_file_table(last_seq);
744 }
745
746 map<uint64_t,LogSegment*>::iterator p = segments.begin();
747 while (p != segments.end() &&
748 p->first < last_seq &&
749 p->second->end < safe_pos) { // next segment should have been started
750 LogSegment *ls = p->second;
751 ++p;
752
753 // Caller should have flushed journaler before calling this
754 if (pending_events.count(ls->seq)) {
755 dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
756 submit_mutex.unlock();
757 return -CEPHFS_EAGAIN;
758 }
759
760 if (expiring_segments.count(ls)) {
761 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
762 << ", " << ls->num_events << " events" << dendl;
763 } else if (expired_segments.count(ls)) {
764 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
765 << ", " << ls->num_events << " events" << dendl;
766 } else {
767 ceph_assert(expiring_segments.count(ls) == 0);
768 expiring_segments.insert(ls);
769 expiring_events += ls->num_events;
770 submit_mutex.unlock();
771
772 uint64_t next_seq = ls->seq + 1;
773 try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
774
775 submit_mutex.lock();
776 p = segments.lower_bound(next_seq);
777 }
778 }
779
780 _trim_expired_segments();
781
782 return 0;
783 }
784
785
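// Ask a segment to expire: gather any outstanding work from
// LogSegment::try_to_expire(); if there is some, retry via
// C_MaybeExpiredSegment when it completes, otherwise mark the segment
// expired immediately.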
786 void MDLog::try_expire(LogSegment *ls, int op_prio)
787 {
788 MDSGatherBuilder gather_bld(g_ceph_context);
789 ls->try_to_expire(mds, gather_bld, op_prio);
790
791 if (gather_bld.has_subs()) {
792 dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
793 gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
794 gather_bld.activate();
795 } else {
796 dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
797 submit_mutex.lock();
798 ceph_assert(expiring_segments.count(ls));
799 expiring_segments.erase(ls);
800 expiring_events -= ls->num_events;
801 _expired(ls);
802 submit_mutex.unlock();
803 }
804
805 logger->set(l_mdl_segexg, expiring_segments.size());
806 logger->set(l_mdl_evexg, expiring_events);
807 }
808
809 void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
810 {
811 if (mds->mdcache->is_readonly()) {
812 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl;
813 return;
814 }
815
816 dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
817 << ", " << ls->num_events << " events" << dendl;
818 try_expire(ls, op_prio);
819 }
820
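// Called with submit_mutex held; drops it before returning. Remove expired
// segments from the front of the map, advance the journaler's expire_pos
// past each, and rewrite the journal head if anything was trimmed.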
821 void MDLog::_trim_expired_segments()
822 {
823 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
824
825 uint64_t oft_committed_seq = mds->mdcache->open_file_table.get_committed_log_seq();
826
827 // trim expired segments?
828 bool trimmed = false;
829 while (!segments.empty()) {
830 LogSegment *ls = segments.begin()->second;
831 if (!expired_segments.count(ls)) {
832 dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
833 << " to expire" << dendl;
834 break;
835 }
836
837 if (!mds_is_shutting_down && ls->seq >= oft_committed_seq) {
838 dout(10) << "_trim_expired_segments open file table committedseq " << oft_committed_seq
839 << " <= " << ls->seq << "/" << ls->offset << dendl;
840 break;
841 }
842
843 dout(10) << "_trim_expired_segments trimming expired "
844 << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
845 expired_events -= ls->num_events;
846 expired_segments.erase(ls);
847 if (pre_segments_size > 0)
848 pre_segments_size--;
849 num_events -= ls->num_events;
850
851 // this was the oldest segment, adjust expire pos
852 if (journaler->get_expire_pos() < ls->end) {
853 journaler->set_expire_pos(ls->end);
854 logger->set(l_mdl_expos, ls->end);
855 } else {
856 logger->set(l_mdl_expos, ls->offset);
857 }
858
859 logger->inc(l_mdl_segtrm);
860 logger->inc(l_mdl_evtrm, ls->num_events);
861
862 segments.erase(ls->seq);
863 delete ls;
864 trimmed = true;
865 }
866
867 submit_mutex.unlock();
868
869 if (trimmed)
870 journaler->write_head(0);
871 }
872
873 void MDLog::trim_expired_segments()
874 {
875 submit_mutex.lock();
876 _trim_expired_segments();
877 }
878
879 void MDLog::_expired(LogSegment *ls)
880 {
881 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex));
882
883 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
884 << ", " << ls->num_events << " events" << dendl;
885
886 if (!mds_is_shutting_down && ls == peek_current_segment()) {
887 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
888 << ", last one and !mds_is_shutting_down" << dendl;
889 } else {
890 // expired.
891 expired_segments.insert(ls);
892 expired_events += ls->num_events;
893
894 // Trigger all waiters
895 finish_contexts(g_ceph_context, ls->expiry_waiters);
896
897 logger->inc(l_mdl_evex, ls->num_events);
898 logger->inc(l_mdl_segex);
899 }
900
901 logger->set(l_mdl_ev, num_events);
902 logger->set(l_mdl_evexd, expired_events);
903 logger->set(l_mdl_seg, segments.size());
904 logger->set(l_mdl_segexd, expired_segments.size());
905 }
906
907
908
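// Start journal replay: complete immediately if the journal is empty,
// otherwise register the waiter and spawn (or re-spawn) the replay thread
// to read events from read_pos up to write_pos.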
909 void MDLog::replay(MDSContext *c)
910 {
911 ceph_assert(journaler->is_active());
912 ceph_assert(journaler->is_readonly());
913
914 // empty?
915 if (journaler->get_read_pos() == journaler->get_write_pos()) {
916 dout(10) << "replay - journal empty, done." << dendl;
917 mds->mdcache->trim();
918 if (mds->is_standby_replay())
919 mds->update_mlogger();
920 if (c) {
921 c->complete(0);
922 }
923 return;
924 }
925
926 // add waiter
927 if (c)
928 waitfor_replay.push_back(c);
929
930 // go!
931 dout(10) << "replay start, from " << journaler->get_read_pos()
932 << " to " << journaler->get_write_pos() << dendl;
933
934 ceph_assert(num_events == 0 || already_replayed);
935 if (already_replayed) {
936 // Ensure previous instance of ReplayThread is joined before
937 // we create another one
938 replay_thread.join();
939 }
940 already_replayed = true;
941
942 replay_thread.create("md_log_replay");
943 }
944
945
946 /**
947 * Resolve the JournalPointer object to a journal file, and
948 * instantiate a Journaler object. This may re-write the journal
949 * if the journal in RADOS appears to be in an old format.
950 *
 951  * This is a separate thread because of the way it is initialized from inside
 952  * the mds lock, which is also the global objecter lock; rather than split
 953  * it up into hard-to-read async operations linked up by contexts, we block here.
954 *
955 * When this function completes, the `journaler` attribute will be set to
956 * a Journaler instance using the latest available serialization format.
957 */
958 void MDLog::_recovery_thread(MDSContext *completion)
959 {
960 ceph_assert(journaler == NULL);
961 if (g_conf()->mds_journal_format > JOURNAL_FORMAT_MAX) {
962 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
963 << JOURNAL_FORMAT_MAX << dendl;
964
965 // Oh dear, something unreadable in the store for this rank: require
966 // operator intervention.
967 mds->damaged_unlocked();
968 ceph_abort(); // damaged should not return
969 }
970
971 // First, read the pointer object.
972 // If the pointer object is not present, then create it with
973 // front = default ino and back = null
974 JournalPointer jp(mds->get_nodeid(), mds->get_metadata_pool());
975 const int read_result = jp.load(mds->objecter);
976 if (read_result == -CEPHFS_ENOENT) {
977 inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
978 jp.front = default_log_ino;
979 int write_result = jp.save(mds->objecter);
980 // Nothing graceful we can do for this
981 ceph_assert(write_result >= 0);
982 } else if (read_result == -CEPHFS_EBLOCKLISTED) {
983 derr << "Blocklisted during JournalPointer read! Respawning..." << dendl;
984 mds->respawn();
985 ceph_abort(); // Should be unreachable because respawn calls execv
986 } else if (read_result != 0) {
987 mds->clog->error() << "failed to read JournalPointer: " << read_result
988 << " (" << cpp_strerror(read_result) << ")";
989 mds->damaged_unlocked();
990 ceph_abort(); // Should be unreachable because damaged() calls respawn()
991 }
992
993 // If the back pointer is non-null, that means that a journal
994 // rewrite failed part way through. Erase the back journal
995 // to clean up.
996 if (jp.back) {
997 if (mds->is_standby_replay()) {
998 dout(1) << "Journal " << jp.front << " is being rewritten, "
999 << "cannot replay in standby until an active MDS completes rewrite" << dendl;
1000 std::lock_guard l(mds->mds_lock);
1001 if (mds->is_daemon_stopping()) {
1002 return;
1003 }
1004 completion->complete(-CEPHFS_EAGAIN);
1005 return;
1006 }
1007 dout(1) << "Erasing journal " << jp.back << dendl;
1008 C_SaferCond erase_waiter;
1009 Journaler back("mdlog", jp.back, mds->get_metadata_pool(),
1010 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
1011 mds->finisher);
1012
1013 // Read all about this journal (header + extents)
1014 C_SaferCond recover_wait;
1015 back.recover(&recover_wait);
1016 int recovery_result = recover_wait.wait();
1017 if (recovery_result == -CEPHFS_EBLOCKLISTED) {
1018 derr << "Blocklisted during journal recovery! Respawning..." << dendl;
1019 mds->respawn();
1020 ceph_abort(); // Should be unreachable because respawn calls execv
1021 } else if (recovery_result != 0) {
1022 // Journaler.recover succeeds if no journal objects are present: an error
1023 // means something worse like a corrupt header, which we can't handle here.
1024 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1025 << cpp_strerror(recovery_result);
1026 mds->damaged_unlocked();
1027 ceph_assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1028 }
1029
1030     // We could read the journal, so we can erase it.
1031 back.erase(&erase_waiter);
1032 int erase_result = erase_waiter.wait();
1033
1034 // If we are successful, or find no data, we can update the JournalPointer to
1035 // reflect that the back journal is gone.
1036 if (erase_result != 0 && erase_result != -CEPHFS_ENOENT) {
1037 derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
1038 } else {
1039 dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
1040 jp.back = 0;
1041 int write_result = jp.save(mds->objecter);
1042 // Nothing graceful we can do for this
1043 ceph_assert(write_result >= 0);
1044 }
1045 }
1046
1047 /* Read the header from the front journal */
1048 Journaler *front_journal = new Journaler("mdlog", jp.front,
1049 mds->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
1050 logger, l_mdl_jlat, mds->finisher);
1051
1052 // Assign to ::journaler so that we can be aborted by ::shutdown while
1053 // waiting for journaler recovery
1054 {
1055 std::lock_guard l(mds->mds_lock);
1056 journaler = front_journal;
1057 }
1058
1059 C_SaferCond recover_wait;
1060 front_journal->recover(&recover_wait);
1061 dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
1062 int recovery_result = recover_wait.wait();
1063 if (recovery_result == -CEPHFS_EBLOCKLISTED) {
1064 derr << "Blocklisted during journal recovery! Respawning..." << dendl;
1065 mds->respawn();
1066 ceph_abort(); // Should be unreachable because respawn calls execv
1067 } else if (recovery_result != 0) {
1068 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1069 << cpp_strerror(recovery_result);
1070 mds->damaged_unlocked();
1071 ceph_assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1072 }
1073 dout(4) << "Journal " << jp.front << " recovered." << dendl;
1074
1075 /* Check whether the front journal format is acceptable or needs re-write */
1076 if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
1077 dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
1078 << ", does this MDS daemon require upgrade?" << dendl;
1079 {
1080 std::lock_guard l(mds->mds_lock);
1081 if (mds->is_daemon_stopping()) {
1082 journaler = NULL;
1083 delete front_journal;
1084 return;
1085 }
1086 completion->complete(-CEPHFS_EINVAL);
1087 }
1088 } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf()->mds_journal_format) {
1089 /* The journal is of configured format, or we are in standbyreplay and will
1090 * tolerate replaying old journals until we have to go active. Use front_journal as
1091 * our journaler attribute and complete */
1092 dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
1093 {
1094 std::lock_guard l(mds->mds_lock);
1095 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1096 if (mds->is_daemon_stopping()) {
1097 return;
1098 }
1099 completion->complete(0);
1100 }
1101 } else {
1102 /* Hand off to reformat routine, which will ultimately set the
1103 * completion when it has done its thing */
1104 dout(1) << "Journal " << jp.front << " has old format "
1105 << front_journal->get_stream_format() << ", it will now be updated" << dendl;
1106 _reformat_journal(jp, front_journal, completion);
1107 }
1108 }
1109
1110 /**
1111 * Blocking rewrite of the journal to a new file, followed by
1112 * swap of journal pointer to point to the new one.
1113 *
1114 * We write the new journal to the 'back' journal from the JournalPointer,
1115 * swapping pointers to make that one the front journal only when we have
1116 * safely completed.
1117 */
1118 void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSContext *completion)
1119 {
1120 ceph_assert(!jp_in.is_null());
1121 ceph_assert(completion != NULL);
1122 ceph_assert(old_journal != NULL);
1123
1124 JournalPointer jp = jp_in;
1125
1126 /* Set JournalPointer.back to the location we will write the new journal */
1127 inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1128 inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1129 jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1130 int write_result = jp.save(mds->objecter);
1131 ceph_assert(write_result == 0);
1132
1133 /* Create the new Journaler file */
1134 Journaler *new_journal = new Journaler("mdlog", jp.back,
1135 mds->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1136 dout(4) << "Writing new journal header " << jp.back << dendl;
1137 file_layout_t new_layout = old_journal->get_layout();
1138 new_journal->set_writeable();
1139 new_journal->create(&new_layout, g_conf()->mds_journal_format);
1140
1141 /* Write the new journal header to RADOS */
1142 C_SaferCond write_head_wait;
1143 new_journal->write_head(&write_head_wait);
1144 write_head_wait.wait();
1145
1146 // Read in the old journal, and whenever we have readable events,
1147 // write them to the new journal.
1148 int r = 0;
1149
1150 // In old format journals before event_seq was introduced, the serialized
1151 // offset of a SubtreeMap message in the log is used as the unique ID for
1152 // a log segment. Because we change serialization, this will end up changing
1153 // for us, so we have to explicitly update the fields that point back to that
1154 // log segment.
1155 std::map<LogSegment::seq_t, LogSegment::seq_t> segment_pos_rewrite;
1156
1157   // The logic in here, borrowed from replay_thread, expects mds_lock to be held,
1158 // e.g. between checking readable and doing wait_for_readable so that journaler
1159 // state doesn't change in between.
1160 uint32_t events_transcribed = 0;
1161 while (1) {
1162 while (!old_journal->is_readable() &&
1163 old_journal->get_read_pos() < old_journal->get_write_pos() &&
1164 !old_journal->get_error()) {
1165
1166 // Issue a journal prefetch
1167 C_SaferCond readable_waiter;
1168 old_journal->wait_for_readable(&readable_waiter);
1169
1170 // Wait for a journal prefetch to complete
1171 readable_waiter.wait();
1172 }
1173 if (old_journal->get_error()) {
1174 r = old_journal->get_error();
1175 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1176 break;
1177 }
1178
1179 if (!old_journal->is_readable() &&
1180 old_journal->get_read_pos() == old_journal->get_write_pos())
1181 break;
1182
1183 // Read one serialized LogEvent
1184 ceph_assert(old_journal->is_readable());
1185 bufferlist bl;
1186 uint64_t le_pos = old_journal->get_read_pos();
1187 bool r = old_journal->try_read_entry(bl);
1188 if (!r && old_journal->get_error())
1189 continue;
1190 ceph_assert(r);
1191
1192 // Update segment_pos_rewrite
1193 auto le = LogEvent::decode_event(bl.cbegin());
1194 if (le) {
1195 bool modified = false;
1196
1197 if (le->get_type() == EVENT_SUBTREEMAP ||
1198 le->get_type() == EVENT_RESETJOURNAL) {
1199 auto sle = dynamic_cast<ESubtreeMap*>(le.get());
1200 if (sle == NULL || sle->event_seq == 0) {
1201 // A non-explicit event seq: the effective sequence number
1202 // of this segment is it's position in the old journal and
1203 // the new effective sequence number will be its position
1204 // in the new journal.
1205 segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1206 dout(20) << __func__ << " discovered segment seq mapping "
1207 << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1208 }
1209 } else {
1210 event_seq++;
1211 }
1212
1213 // Rewrite segment references if necessary
1214 EMetaBlob *blob = le->get_metablob();
1215 if (blob) {
1216 modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1217 }
1218
1219 // Zero-out expire_pos in subtreemap because offsets have changed
1220 // (expire_pos is just an optimization so it's safe to eliminate it)
1221 if (le->get_type() == EVENT_SUBTREEMAP
1222 || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1223 auto& sle = dynamic_cast<ESubtreeMap&>(*le);
1224 dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1225 << le_pos << " seq=" << sle.event_seq << dendl;
1226 sle.expire_pos = 0;
1227 modified = true;
1228 }
1229
1230 if (modified) {
1231 bl.clear();
1232 le->encode_with_header(bl, mds->mdsmap->get_up_features());
1233 }
1234 } else {
1235       // Failure from LogEvent::decode: our job is to change the journal wrapper,
1236       // not to validate the contents, so pass it through.
1237 dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1238 << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1239 << dendl;
1240 }
1241
1242 // Write (buffered, synchronous) one serialized LogEvent
1243 events_transcribed += 1;
1244 new_journal->append_entry(bl);
1245 }
1246
1247 dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1248 C_SaferCond flush_waiter;
1249 new_journal->flush(&flush_waiter);
1250 flush_waiter.wait();
1251
1252   // If we failed to rewrite the journal, leave the partly-written journal
1253   // as garbage to be cleaned up on the next startup.
1254 ceph_assert(r == 0);
1255
1256 /* Now that the new journal is safe, we can flip the pointers */
1257 inodeno_t const tmp = jp.front;
1258 jp.front = jp.back;
1259 jp.back = tmp;
1260 write_result = jp.save(mds->objecter);
1261 ceph_assert(write_result == 0);
1262
1263 /* Delete the old journal to free space */
1264 dout(1) << "New journal flushed, erasing old journal" << dendl;
1265 C_SaferCond erase_waiter;
1266 old_journal->erase(&erase_waiter);
1267 int erase_result = erase_waiter.wait();
1268 ceph_assert(erase_result == 0);
1269 {
1270 std::lock_guard l(mds->mds_lock);
1271 if (mds->is_daemon_stopping()) {
1272 delete new_journal;
1273 return;
1274 }
1275 ceph_assert(journaler == old_journal);
1276 journaler = NULL;
1277 delete old_journal;
1278
1279 /* Update the pointer to reflect we're back in clean single journal state. */
1280 jp.back = 0;
1281 write_result = jp.save(mds->objecter);
1282 ceph_assert(write_result == 0);
1283
1284 /* Reset the Journaler object to its default state */
1285 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1286 if (mds->is_daemon_stopping()) {
1287 delete new_journal;
1288 return;
1289 }
1290 journaler = new_journal;
1291 journaler->set_readonly();
1292 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1293
1294 /* Trigger completion */
1295 if (mds->is_daemon_stopping()) {
1296 return;
1297 }
1298 completion->complete(0);
1299 }
1300 }
1301
1302
1303 // i am a separate thread
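// Replay thread body: read entries up to the write position, decode each
// LogEvent, open a new LogSegment whenever an ESubtreeMap (or EResetJournal)
// is seen, and replay the event under mds_lock. Trimmed-away or invalid
// journals are turned into CEPHFS_EAGAIN for standby replay.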
1304 void MDLog::_replay_thread()
1305 {
1306 dout(10) << "_replay_thread start" << dendl;
1307
1308 // loop
1309 int r = 0;
1310 while (1) {
1311 // wait for read?
1312 while (!journaler->is_readable() &&
1313 journaler->get_read_pos() < journaler->get_write_pos() &&
1314 !journaler->get_error()) {
1315 C_SaferCond readable_waiter;
1316 journaler->wait_for_readable(&readable_waiter);
1317 r = readable_waiter.wait();
1318 }
1319 if (journaler->get_error()) {
1320 r = journaler->get_error();
1321 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1322 if (r == -CEPHFS_ENOENT) {
1323 if (mds->is_standby_replay()) {
1324 // journal has been trimmed by somebody else
1325 r = -CEPHFS_EAGAIN;
1326 } else {
1327 mds->clog->error() << "missing journal object";
1328 mds->damaged_unlocked();
1329 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1330 }
1331 } else if (r == -CEPHFS_EINVAL) {
1332 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1333 // this should only happen if you're following somebody else
1334 if(journaler->is_readonly()) {
1335 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl;
1336 r = -CEPHFS_EAGAIN;
1337 } else {
1338 mds->clog->error() << "invalid journaler offsets";
1339 mds->damaged_unlocked();
1340 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1341 }
1342 } else {
1343 /* re-read head and check it
1344 * Given that replay happens in a separate thread and
1345 * the MDS is going to either shut down or restart when
1346 * we return this error, doing it synchronously is fine
1347            * -- as long as we drop the main mds lock --. */
1348 C_SaferCond reread_fin;
1349 journaler->reread_head(&reread_fin);
1350 int err = reread_fin.wait();
1351 if (err) {
1352 if (err == -CEPHFS_ENOENT && mds->is_standby_replay()) {
1353 r = -CEPHFS_EAGAIN;
1354 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1355 << dendl;
1356 break;
1357 } else {
1358 dout(0) << "got error while reading head: " << cpp_strerror(err)
1359 << dendl;
1360
1361 mds->clog->error() << "error reading journal header";
1362 mds->damaged_unlocked();
1363 ceph_abort(); // Should be unreachable because damaged() calls
1364 // respawn()
1365 }
1366 }
1367 standby_trim_segments();
1368 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1369 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl;
1370 r = -CEPHFS_EAGAIN;
1371 }
1372 }
1373 }
1374 break;
1375 }
1376
1377 if (!journaler->is_readable() &&
1378 journaler->get_read_pos() == journaler->get_write_pos())
1379 break;
1380
1381 ceph_assert(journaler->is_readable() || mds->is_daemon_stopping());
1382
1383 // read it
1384 uint64_t pos = journaler->get_read_pos();
1385 bufferlist bl;
1386 bool r = journaler->try_read_entry(bl);
1387 if (!r && journaler->get_error())
1388 continue;
1389 ceph_assert(r);
1390
1391 // unpack event
1392 auto le = LogEvent::decode_event(bl.cbegin());
1393 if (!le) {
1394 dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1395 << " -- unable to decode event" << dendl;
1396 dout(0) << "dump of unknown or corrupt event:\n";
1397 bl.hexdump(*_dout);
1398 *_dout << dendl;
1399
1400 mds->clog->error() << "corrupt journal event at " << pos << "~"
1401 << bl.length() << " / "
1402 << journaler->get_write_pos();
1403 if (g_conf()->mds_log_skip_corrupt_events) {
1404 continue;
1405 } else {
1406 mds->damaged_unlocked();
1407 ceph_abort(); // Should be unreachable because damaged() calls
1408 // respawn()
1409 }
1410
1411 }
1412 le->set_start_off(pos);
1413
1414 // new segment?
1415 if (le->get_type() == EVENT_SUBTREEMAP ||
1416 le->get_type() == EVENT_RESETJOURNAL) {
1417 auto sle = dynamic_cast<ESubtreeMap*>(le.get());
1418 if (sle && sle->event_seq > 0)
1419 event_seq = sle->event_seq;
1420 else
1421 event_seq = pos;
1422 segments[event_seq] = new LogSegment(event_seq, pos);
1423 logger->set(l_mdl_seg, segments.size());
1424 } else {
1425 event_seq++;
1426 }
1427
1428 // have we seen an import map yet?
1429 if (segments.empty()) {
1430 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1431 << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
1432 } else {
1433 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1434 << " " << le->get_stamp() << ": " << *le << dendl;
1435 le->_segment = get_current_segment(); // replay may need this
1436 le->_segment->num_events++;
1437 le->_segment->end = journaler->get_read_pos();
1438 num_events++;
1439
1440 {
1441 std::lock_guard l(mds->mds_lock);
1442 if (mds->is_daemon_stopping()) {
1443 return;
1444 }
1445 logger->inc(l_mdl_replayed);
1446 le->replay(mds);
1447 }
1448 }
1449
1450 logger->set(l_mdl_rdpos, pos);
1451 }
1452
1453 // done!
1454 if (r == 0) {
1455 ceph_assert(journaler->get_read_pos() == journaler->get_write_pos());
1456 dout(10) << "_replay - complete, " << num_events
1457 << " events" << dendl;
1458
1459 logger->set(l_mdl_expos, journaler->get_expire_pos());
1460 }
1461
1462 safe_pos = journaler->get_write_safe_pos();
1463
1464 dout(10) << "_replay_thread kicking waiters" << dendl;
1465 {
1466 std::lock_guard l(mds->mds_lock);
1467 if (mds->is_daemon_stopping()) {
1468 return;
1469 }
1470 pre_segments_size = segments.size(); // get num of logs when replay is finished
1471 finish_contexts(g_ceph_context, waitfor_replay, r);
1472 }
1473
1474 dout(10) << "_replay_thread finish" << dendl;
1475 }
1476
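// In standby replay, drop whole segments that end at or before the
// journaler's expire_pos (always keeping the last segment) and trim the
// corresponding cache state.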
1477 void MDLog::standby_trim_segments()
1478 {
1479 dout(10) << "standby_trim_segments" << dendl;
1480 uint64_t expire_pos = journaler->get_expire_pos();
1481 dout(10) << " expire_pos=" << expire_pos << dendl;
1482
1483 mds->mdcache->open_file_table.trim_destroyed_inos(expire_pos);
1484
1485 bool removed_segment = false;
1486 while (have_any_segments()) {
1487 LogSegment *seg = get_oldest_segment();
1488 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1489 "~" << seg->end - seg->offset << dendl;
1490
1491 if (seg->end > expire_pos) {
1492 dout(10) << " won't remove, not expired!" << dendl;
1493 break;
1494 }
1495
1496 if (segments.size() == 1) {
1497 dout(10) << " won't remove, last segment!" << dendl;
1498 break;
1499 }
1500
1501 dout(10) << " removing segment" << dendl;
1502 mds->mdcache->standby_trim_segment(seg);
1503 remove_oldest_segment();
1504 removed_segment = true;
1505 }
1506
1507 if (removed_segment) {
1508 dout(20) << " calling mdcache->trim!" << dendl;
1509 mds->mdcache->trim();
1510 } else {
1511 dout(20) << " removed no segments!" << dendl;
1512 }
1513 }
1514
1515 void MDLog::dump_replay_status(Formatter *f) const
1516 {
1517 f->open_object_section("replay_status");
1518 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1519 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1520 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1521 f->dump_unsigned("num_events", get_num_events());
1522 f->dump_unsigned("num_segments", get_num_segments());
1523 f->close_section();
1524 }