]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/MDLog.cc
update sources to 12.2.10
[ceph.git] / ceph / src / mds / MDLog.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDLog.h"
17 #include "MDCache.h"
18 #include "LogEvent.h"
19 #include "MDSContext.h"
20
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
23
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
27
28 #include "events/ESubtreeMap.h"
29
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/assert.h"
33
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
36 #undef dout_prefix
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39 // cons/des
40 MDLog::~MDLog()
41 {
42 if (journaler) { delete journaler; journaler = 0; }
43 if (logger) {
44 g_ceph_context->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 logger = 0;
47 }
48 }
49
50
51 void MDLog::create_logger()
52 {
53 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
55 plb.add_u64_counter(l_mdl_evadd, "evadd", "Events submitted", "subm",
56 PerfCountersBuilder::PRIO_INTERESTING);
57 plb.add_u64(l_mdl_ev, "ev", "Events", "evts",
58 PerfCountersBuilder::PRIO_INTERESTING);
59 plb.add_u64(l_mdl_seg, "seg", "Segments", "segs",
60 PerfCountersBuilder::PRIO_INTERESTING);
61
62 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
63 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
64 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
65 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
66 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
67 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed");
68 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
69 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
70 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
71 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
72 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
73 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
74
75 plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
76 plb.add_u64(l_mdl_expos, "expos", "Journaler xpire position");
77 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
78 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
79
80 // logger
81 logger = plb.create_perf_counters();
82 g_ceph_context->get_perfcounters_collection()->add(logger);
83 }
84
// Forward advisory IO hint flags to the underlying Journaler, which
// applies them to its journal writes.
void MDLog::set_write_iohint(unsigned iohint_flags)
{
  journaler->set_write_iohint(iohint_flags);
}
89
/**
 * Completion invoked by the Journaler when a journal write fails.
 *
 * A write error is always fatal for this rank: if we were blacklisted
 * (fenced) we respawn; for any other error we mark the rank damaged so
 * an administrator must intervene.
 */
class C_MDL_WriteError : public MDSIOContextBase {
  protected:
  MDLog *mdlog;
  MDSRank *get_mds() override {return mdlog->mds;}

  void finish(int r) override {
    MDSRank *mds = get_mds();
    // assume journal is reliable, so don't choose action based on
    // g_conf->mds_action_on_write_error.
    if (r == -EBLACKLISTED) {
      derr << "we have been blacklisted (fenced), respawning..." << dendl;
      mds->respawn();
    } else {
      derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
      // Although it's possible that this could be something transient,
      // it's severe and scary, so disable this rank until an administrator
      // intervenes.
      mds->clog->error() << "Unhandled journal write error on MDS rank " <<
        mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
      mds->damaged();
      ceph_abort();  // damaged should never return
    }
  }

  public:
  // 'false' disables the MDSIOContextBase behavior we don't want here
  // (see MDSIOContextBase ctor) — TODO confirm against MDSContext.h.
  explicit C_MDL_WriteError(MDLog *m) :
    MDSIOContextBase(false), mdlog(m) {}
  void print(ostream& out) const override {
    out << "mdlog_write_error";
  }
};
121
122
123 void MDLog::write_head(MDSInternalContextBase *c)
124 {
125 Context *fin = NULL;
126 if (c != NULL) {
127 fin = new C_IO_Wrapper(mds, c);
128 }
129 journaler->write_head(fin);
130 }
131
// Current journal read position, as tracked by the Journaler.
uint64_t MDLog::get_read_pos() const
{
  return journaler->get_read_pos();
}
136
// Current journal write position (end of appended, not necessarily
// flushed, data).
uint64_t MDLog::get_write_pos() const
{
  return journaler->get_write_pos();
}
141
// Position up to which journal writes are known safe (durably flushed).
uint64_t MDLog::get_safe_pos() const
{
  return journaler->get_write_safe_pos();
}
146
147
148
// Create a brand-new empty journal for this rank: instantiate the
// Journaler, asynchronously write its header and the JournalPointer to
// RADOS, and start the submit thread.  `c` completes once both async
// writes have finished.
void MDLog::create(MDSInternalContextBase *c)
{
  dout(5) << "create empty log" << dendl;

  C_GatherBuilder gather(g_ceph_context);
  // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
  // XXX but should maybe that be handled inside Journaler?
  gather.set_finisher(new C_IO_Wrapper(mds, c));

  // The inode of the default Journaler we will create
  ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();

  // Instantiate Journaler and start async write to RADOS
  assert(journaler == NULL);
  journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
                            CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
                            l_mdl_jlat, mds->finisher);
  assert(journaler->is_readonly());
  journaler->set_write_error_handler(new C_MDL_WriteError(this));
  journaler->set_writeable();
  journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
  journaler->write_head(gather.new_sub());

  // Async write JournalPointer to RADOS, pointing at the new journal
  // (front = our log inode, back = none).
  JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
  jp.front = ino;
  jp.back = 0;
  jp.save(mds->objecter, gather.new_sub());

  gather.activate();

  // Seed the position perf counters from the fresh journal.
  logger->set(l_mdl_expos, journaler->get_expire_pos());
  logger->set(l_mdl_wrpos, journaler->get_write_pos());

  submit_thread.create("md_submit");
}
185
// Open an existing journal: kick off the recovery thread to discover
// the journal bounds (and possibly reformat it), and start the submit
// thread.  `c` is completed by the recovery thread when it finishes.
void MDLog::open(MDSInternalContextBase *c)
{
  dout(5) << "open discovering log bounds" << dendl;

  assert(!recovery_thread.is_started());
  recovery_thread.set_completion(c);
  recovery_thread.create("md_recov_open");

  submit_thread.create("md_submit");
  // either append() or replay() will follow.
}
197
198 /**
199 * Final part of reopen() procedure, after recovery_thread
200 * has done its thing we call append()
201 */
/**
 * Final part of reopen() procedure: after recovery_thread has done its
 * thing we call append() to position at the journal end and mark it
 * writeable, then complete the caller's context.
 */
class C_ReopenComplete : public MDSInternalContext {
  MDLog *mdlog;
  MDSInternalContextBase *on_complete;
public:
  C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
  void finish(int r) override {
    mdlog->append();
    on_complete->complete(r);
  }
};
212
213 /**
214 * Given that open() has been called in the past, go through the journal
215 * recovery procedure again, potentially reformatting the journal if it
216 * was in an old format.
217 */
/**
 * Given that open() has been called in the past, go through the journal
 * recovery procedure again, potentially reformatting the journal if it
 * was in an old format.  The existing Journaler is discarded and a new
 * recovery pass is started; C_ReopenComplete then calls append() and
 * completes `c`.
 */
void MDLog::reopen(MDSInternalContextBase *c)
{
  dout(5) << "reopen" << dendl;

  // Because we will call append() at the completion of this, check that we have already
  // read the whole journal.
  assert(journaler != NULL);
  assert(journaler->get_read_pos() == journaler->get_write_pos());

  delete journaler;
  journaler = NULL;

  // recovery_thread was started at some point in the past.  Although
  // it has called its completion if we made it back here, it might
  // still not have been cleaned up: join it.
  recovery_thread.join();

  recovery_thread.set_completion(new C_ReopenComplete(this, c));
  recovery_thread.create("md_recov_reopen");
}
238
239 void MDLog::append()
240 {
241 dout(5) << "append positioning at end and marking writeable" << dendl;
242 journaler->set_read_pos(journaler->get_write_pos());
243 journaler->set_expire_pos(journaler->get_write_pos());
244
245 journaler->set_writeable();
246
247 logger->set(l_mdl_expos, journaler->get_write_pos());
248 }
249
250
251
252 // -------------------------------------------------
253
254 void MDLog::_start_entry(LogEvent *e)
255 {
256 assert(submit_mutex.is_locked_by_me());
257
258 assert(cur_event == NULL);
259 cur_event = e;
260
261 event_seq++;
262
263 EMetaBlob *metablob = e->get_metablob();
264 if (metablob) {
265 metablob->event_seq = event_seq;
266 metablob->last_subtree_map = get_last_segment_seq();
267 }
268 }
269
270 void MDLog::cancel_entry(LogEvent *le)
271 {
272 assert(le == cur_event);
273 cur_event = NULL;
274 delete le;
275 }
276
// Queue the currently open event for the submit thread.  Caller must
// hold submit_mutex.  The event is attached to the newest segment and
// appended to pending_events; the submit thread encodes and journals
// it asynchronously.  May also start a new segment (journaling an
// ESubtreeMap) when the current one is full.
void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
{
  assert(submit_mutex.is_locked_by_me());
  assert(!mds->is_any_replay());
  assert(!capped);

  assert(le == cur_event);
  cur_event = NULL;

  // let the event register itself in the segment
  assert(!segments.empty());
  LogSegment *ls = segments.rbegin()->second;
  ls->num_events++;

  le->_segment = ls;
  le->update_segment();
  le->set_stamp(ceph_clock_now());

  // Snapshot the up-features for the submit thread's encoding pass.
  mdsmap_up_features = mds->mdsmap->get_up_features();
  pending_events[ls->seq].push_back(PendingEvent(le, c));
  num_events++;

  if (logger) {
    logger->inc(l_mdl_evadd);
    logger->set(l_mdl_ev, num_events);
  }

  unflushed++;

  uint64_t period = journaler->get_layout_period();
  // start a new segment?
  if (le->get_type() == EVENT_SUBTREEMAP ||
      (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
    // avoid infinite loop when ESubtreeMap is very large.
    // do not insert ESubtreeMap among EImportFinish events that finish
    // disambiguate imports. Because the ESubtreeMap reflects the subtree
    // state when all EImportFinish events are replayed.
  } else if (ls->end/period != ls->offset/period ||
	     ls->num_events >= g_conf->mds_log_events_per_segment) {
    // segment crossed an object boundary or holds enough events: rotate
    dout(10) << "submit_entry also starting new segment: last = "
	     << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
    _start_new_segment();
  } else if (g_conf->mds_debug_subtrees &&
	     le->get_type() != EVENT_SUBTREEMAP_TEST) {
    // debug: journal this every time to catch subtree replay bugs.
    // use a different event id so it doesn't get interpreted as a
    // LogSegment boundary on replay.
    // (recursion is bounded: the nested event is EVENT_SUBTREEMAP_TEST,
    // which is excluded by the condition above)
    LogEvent *sle = mds->mdcache->create_subtree_map();
    sle->set_type(EVENT_SUBTREEMAP_TEST);
    _submit_entry(sle, NULL);
  }
}
329
330 /**
331 * Invoked on the flush after each entry submitted
332 */
333 class C_MDL_Flushed : public MDSLogContextBase {
334 protected:
335 MDLog *mdlog;
336 MDSRank *get_mds() override {return mdlog->mds;}
337 MDSInternalContextBase *wrapped;
338
339 void finish(int r) override {
340 if (wrapped)
341 wrapped->complete(r);
342 }
343
344 public:
345 C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w)
346 : mdlog(m), wrapped(w) {}
347 C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
348 set_write_pos(wp);
349 }
350 };
351
// Submit thread main loop: drains pending_events, encoding each event
// and appending it to the Journaler.  submit_mutex is held while
// manipulating the queue and dropped around the (potentially slow)
// encode/append work.  Exits when the daemon is stopping.
void MDLog::_submit_thread()
{
  dout(10) << "_submit_thread start" << dendl;

  submit_mutex.Lock();

  while (!mds->is_daemon_stopping()) {
    // mds_log_pause lets tests freeze journal submission
    if (g_conf->mds_log_pause) {
      submit_cond.Wait(submit_mutex);
      continue;
    }

    map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
    if (it == pending_events.end()) {
      submit_cond.Wait(submit_mutex);
      continue;
    }

    if (it->second.empty()) {
      pending_events.erase(it);
      continue;
    }

    // Snapshot features and pop one item while still under the lock.
    int64_t features = mdsmap_up_features;
    PendingEvent data = it->second.front();
    it->second.pop_front();

    submit_mutex.Unlock();

    if (data.le) {
      LogEvent *le = data.le;
      LogSegment *ls = le->_segment;
      // encode it, with event type
      bufferlist bl;
      le->encode_with_header(bl, features);

      uint64_t write_pos = journaler->get_write_pos();

      le->set_start_off(write_pos);
      // a subtree map marks the start of its segment
      if (le->get_type() == EVENT_SUBTREEMAP)
	ls->offset = write_pos;

      dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
	      << " : " << *le << dendl;

      // journal it.
      const uint64_t new_write_pos = journaler->append_entry(bl);  // bl is destroyed.
      ls->end = new_write_pos;

      // Attach (or create) a flush completion carrying the new safe pos.
      MDSLogContextBase *fin;
      if (data.fin) {
	fin = dynamic_cast<MDSLogContextBase*>(data.fin);
	assert(fin);
	fin->set_write_pos(new_write_pos);
      } else {
	fin = new C_MDL_Flushed(this, new_write_pos);
      }

      journaler->wait_for_flush(fin);

      if (data.flush)
	journaler->flush();

      if (logger)
	logger->set(l_mdl_wrpos, ls->end);

      delete le;
    } else {
      // No event: this entry is a bare waiter and/or flush request.
      if (data.fin) {
	MDSInternalContextBase* fin =
		dynamic_cast<MDSInternalContextBase*>(data.fin);
	assert(fin);
	C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
	fin2->set_write_pos(journaler->get_write_pos());
	journaler->wait_for_flush(fin2);
      }
      if (data.flush)
	journaler->flush();
    }

    // Re-take the lock to update the unflushed counter for this item.
    submit_mutex.Lock();
    if (data.flush)
      unflushed = 0;
    else if (data.le)
      unflushed++;
  }

  submit_mutex.Unlock();
}
441
442 void MDLog::wait_for_safe(MDSInternalContextBase *c)
443 {
444 submit_mutex.Lock();
445
446 bool no_pending = true;
447 if (!pending_events.empty()) {
448 pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
449 no_pending = false;
450 submit_cond.Signal();
451 }
452
453 submit_mutex.Unlock();
454
455 if (no_pending && c)
456 journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
457 }
458
459 void MDLog::flush()
460 {
461 submit_mutex.Lock();
462
463 bool do_flush = unflushed > 0;
464 unflushed = 0;
465 if (!pending_events.empty()) {
466 pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
467 do_flush = false;
468 submit_cond.Signal();
469 }
470
471 submit_mutex.Unlock();
472
473 if (do_flush)
474 journaler->flush();
475 }
476
// Wake the submit thread (e.g. after toggling mds_log_pause).
void MDLog::kick_submitter()
{
  Mutex::Locker l(submit_mutex);
  submit_cond.Signal();
}
482
// Cap the log: no further events may be submitted (_submit_entry
// asserts !capped), and _expired may then expire the final segment.
void MDLog::cap()
{
  dout(5) << "cap" << dendl;
  capped = true;
}
488
// Stop all MDLog threads.  Caller holds mds_lock; because the daemon is
// stopping, it is safe to temporarily drop mds_lock while joining the
// threads (nobody else who takes it will do meaningful work).
void MDLog::shutdown()
{
  assert(mds->mds_lock.is_locked_by_me());

  dout(5) << "shutdown" << dendl;
  if (submit_thread.is_started()) {
    assert(mds->is_daemon_stopping());

    if (submit_thread.am_self()) {
      // Called suicide from the thread: trust it to do no work after
      // returning from suicide, and subsequently respect mds->is_daemon_stopping()
      // and fall out of its loop.
    } else {
      mds->mds_lock.Unlock();
      // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
      // picking it up will do anything with it.

      // Wake the submit thread so it notices is_daemon_stopping() and exits.
      submit_mutex.Lock();
      submit_cond.Signal();
      submit_mutex.Unlock();

      mds->mds_lock.Lock();

      submit_thread.join();
    }
  }

  // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
  // so we need to shutdown the journaler first.
  if (journaler) {
    journaler->shutdown();
  }

  if (replay_thread.is_started() && !replay_thread.am_self()) {
    mds->mds_lock.Unlock();
    replay_thread.join();
    mds->mds_lock.Lock();
  }

  if (recovery_thread.is_started() && !recovery_thread.am_self()) {
    mds->mds_lock.Unlock();
    recovery_thread.join();
    mds->mds_lock.Lock();
  }
}
534
535
536 // -----------------------------
537 // segments
538
// Open a fresh log segment and journal the ESubtreeMap that marks its
// start.  Caller must hold submit_mutex (asserted in the callees).
void MDLog::_start_new_segment()
{
  _prepare_new_segment();
  _journal_segment_subtree_map(NULL);
}
544
545 void MDLog::_prepare_new_segment()
546 {
547 assert(submit_mutex.is_locked_by_me());
548
549 uint64_t seq = event_seq + 1;
550 dout(7) << __func__ << " seq " << seq << dendl;
551
552 segments[seq] = new LogSegment(seq);
553
554 logger->inc(l_mdl_segadd);
555 logger->set(l_mdl_seg, segments.size());
556
557 // Adjust to next stray dir
558 dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid()
559 << dendl;
560 mds->mdcache->advance_stray();
561 }
562
// Journal an ESubtreeMap event to open the current segment.  Caller
// must hold submit_mutex.  `onsync` (may be NULL) is completed once the
// event is safely flushed, via the C_MDL_Flushed wrapper.
void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
{
  assert(submit_mutex.is_locked_by_me());

  dout(7) << __func__ << dendl;
  ESubtreeMap *sle = mds->mdcache->create_subtree_map();
  // The subtree map is tied to the segment it opens.
  sle->event_seq = get_last_segment_seq();

  _submit_entry(sle, new C_MDL_Flushed(this, onsync));
}
573
// Expire old log segments until we are inside the configured
// segment/event limits.  `m` >= 0 overrides mds_log_max_events.
// Takes submit_mutex; note that _trim_expired_segments() at the end
// releases it on the way out.
void MDLog::trim(int m)
{
  unsigned max_segments = g_conf->mds_log_max_segments;
  int max_events = g_conf->mds_log_max_events;
  if (m >= 0)
    max_events = m;

  if (mds->mdcache->is_readonly()) {
    dout(10) << "trim, ignoring read-only FS" <<  dendl;
    return;
  }

  // Clamp max_events to not be smaller than events per segment
  if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) {
    max_events = g_conf->mds_log_events_per_segment + 1;
  }

  submit_mutex.Lock();

  // trim!
  dout(10) << "trim "
	   << segments.size() << " / " << max_segments << " segments, "
	   << num_events << " / " << max_events << " events"
	   << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
	   << ", " << expired_segments.size() << " (" << expired_events << ") expired"
	   << dendl;

  if (segments.empty()) {
    submit_mutex.Unlock();
    return;
  }

  // hack: only trim for a few seconds at a time
  utime_t stop = ceph_clock_now();
  stop += 2.0;

  // Scale expiry op priority with the expiring backlog, capped at HIGH.
  int op_prio = CEPH_MSG_PRIO_LOW +
		(CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
		expiring_segments.size() / max_segments;
  if (op_prio > CEPH_MSG_PRIO_HIGH)
    op_prio = CEPH_MSG_PRIO_HIGH;

  unsigned new_expiring_segments = 0;

  map<uint64_t,LogSegment*>::iterator p = segments.begin();
  while (p != segments.end()) {
    // time budget exhausted?
    if (stop < ceph_clock_now())
      break;

    // already under both limits?
    unsigned num_remaining_segments = (segments.size() - expired_segments.size() - expiring_segments.size());
    if ((num_remaining_segments <= max_segments) &&
	(max_events < 0 || num_events - expiring_events - expired_events <= max_events))
      break;

    // Do not trim too many segments at once for peak workload. If mds keeps creating N segments each tick,
    // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'
    if (new_expiring_segments * 2 > num_remaining_segments)
      break;

    // look at first segment
    LogSegment *ls = p->second;
    assert(ls);
    ++p;

    // Cannot expire a segment whose events are not yet safely flushed.
    if (pending_events.count(ls->seq) ||
	ls->end > safe_pos) {
      dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
	      << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
      break;
    }
    if (expiring_segments.count(ls)) {
      dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else if (expired_segments.count(ls)) {
      dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else {
      assert(expiring_segments.count(ls) == 0);
      new_expiring_segments++;
      expiring_segments.insert(ls);
      expiring_events += ls->num_events;
      // try_expire may block/do IO; drop the lock while we call it.
      submit_mutex.Unlock();

      uint64_t last_seq = ls->seq;
      try_expire(ls, op_prio);

      submit_mutex.Lock();
      // segments may have changed while unlocked; re-seek the iterator
      p = segments.lower_bound(last_seq + 1);
    }
  }

  // discard expired segments and unlock submit_mutex
  _trim_expired_segments();
}
668
// Completion for a segment's try_to_expire gather: reports any error to
// the MDS, then re-checks the segment via _maybe_expired (which retries
// expiry until try_to_expire produces no more work).
class C_MaybeExpiredSegment : public MDSInternalContext {
  MDLog *mdlog;
  LogSegment *ls;
  int op_prio;
  public:
  C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
    MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
  void finish(int res) override {
    if (res < 0)
      mdlog->mds->handle_write_error(res);
    mdlog->_maybe_expired(ls, op_prio);
  }
};
682
683 /**
684 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
685 * segment.
686 */
/**
 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
 * segment.  Returns -EAGAIN if a segment still has pending (unflushed)
 * events — the caller should have flushed the journaler first —
 * otherwise 0.  Takes submit_mutex; _trim_expired_segments() releases
 * it before returning.
 */
int MDLog::trim_all()
{
  submit_mutex.Lock();

  dout(10) << __func__ << ": "
	   << segments.size()
	   << "/" << expiring_segments.size()
	   << "/" << expired_segments.size() << dendl;

  uint64_t last_seq = 0;
  if (!segments.empty())
    last_seq = get_last_segment_seq();

  map<uint64_t,LogSegment*>::iterator p = segments.begin();
  // Stop before the newest segment, and only touch fully-safe segments.
  while (p != segments.end() &&
	 p->first < last_seq && p->second->end <= safe_pos) {
    LogSegment *ls = p->second;
    ++p;

    // Caller should have flushed journaler before calling this
    if (pending_events.count(ls->seq)) {
      dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
      submit_mutex.Unlock();
      return -EAGAIN;
    }

    if (expiring_segments.count(ls)) {
      dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else if (expired_segments.count(ls)) {
      dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
	      << ", " << ls->num_events << " events" << dendl;
    } else {
      assert(expiring_segments.count(ls) == 0);
      expiring_segments.insert(ls);
      expiring_events += ls->num_events;
      // try_expire may block/do IO; drop the lock around the call.
      submit_mutex.Unlock();

      uint64_t next_seq = ls->seq + 1;
      try_expire(ls, CEPH_MSG_PRIO_DEFAULT);

      submit_mutex.Lock();
      // segments may have changed while unlocked; re-seek the iterator
      p = segments.lower_bound(next_seq);
    }
  }

  _trim_expired_segments();

  return 0;
}
737
738
// Ask a segment to expire itself.  If try_to_expire queued async work,
// a C_MaybeExpiredSegment re-checks when it completes; otherwise the
// segment is already expired and is moved from expiring to expired
// under submit_mutex.  Called WITHOUT submit_mutex held.
void MDLog::try_expire(LogSegment *ls, int op_prio)
{
  MDSGatherBuilder gather_bld(g_ceph_context);
  ls->try_to_expire(mds, gather_bld, op_prio);

  if (gather_bld.has_subs()) {
    dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
    gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
    gather_bld.activate();
  } else {
    dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
    submit_mutex.Lock();
    assert(expiring_segments.count(ls));
    expiring_segments.erase(ls);
    expiring_events -= ls->num_events;
    _expired(ls);
    submit_mutex.Unlock();
  }

  logger->set(l_mdl_segexg, expiring_segments.size());
  logger->set(l_mdl_evexg, expiring_events);
}
761
// Re-check a segment after its expiry gather completed: unless the FS
// has gone read-only, retry try_expire (which either queues more work
// or marks the segment expired).
void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
{
  if (mds->mdcache->is_readonly()) {
    dout(10) << "_maybe_expired, ignoring read-only FS" <<  dendl;
    return;
  }

  dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
	   << ", " << ls->num_events << " events" << dendl;
  try_expire(ls, op_prio);
}
773
// Drop fully-expired segments from the front of the log, advancing the
// journaler expire position.  NOTE: caller must hold submit_mutex, and
// this function RELEASES it before (possibly) writing the journal head.
void MDLog::_trim_expired_segments()
{
  assert(submit_mutex.is_locked_by_me());

  // trim expired segments?
  bool trimmed = false;
  while (!segments.empty()) {
    LogSegment *ls = segments.begin()->second;
    // segments expire strictly in order; stop at the first unexpired one
    if (!expired_segments.count(ls)) {
      dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
	       << " to expire" << dendl;
      break;
    }

    dout(10) << "_trim_expired_segments trimming expired "
	     << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
    expired_events -= ls->num_events;
    expired_segments.erase(ls);
    num_events -= ls->num_events;

    // this was the oldest segment, adjust expire pos
    if (journaler->get_expire_pos() < ls->end) {
      journaler->set_expire_pos(ls->end);
      logger->set(l_mdl_expos, ls->end);
    } else {
      logger->set(l_mdl_expos, ls->offset);
    }

    logger->inc(l_mdl_segtrm);
    logger->inc(l_mdl_evtrm, ls->num_events);

    segments.erase(ls->seq);
    delete ls;
    trimmed = true;
  }

  submit_mutex.Unlock();

  // persist the advanced expire position
  if (trimmed)
    journaler->write_head(0);
}
815
// Public wrapper: takes submit_mutex, which _trim_expired_segments()
// releases before returning.
void MDLog::trim_expired_segments()
{
  submit_mutex.Lock();
  _trim_expired_segments();
}
821
822 void MDLog::_expired(LogSegment *ls)
823 {
824 assert(submit_mutex.is_locked_by_me());
825
826 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
827 << ", " << ls->num_events << " events" << dendl;
828
829 if (!capped && ls == peek_current_segment()) {
830 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
831 << ", last one and !capped" << dendl;
832 } else {
833 // expired.
834 expired_segments.insert(ls);
835 expired_events += ls->num_events;
836
837 // Trigger all waiters
838 for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
839 i != ls->expiry_waiters.end(); ++i) {
840 (*i)->complete(0);
841 }
842 ls->expiry_waiters.clear();
843
844 logger->inc(l_mdl_evex, ls->num_events);
845 logger->inc(l_mdl_segex);
846 }
847
848 logger->set(l_mdl_ev, num_events);
849 logger->set(l_mdl_evexd, expired_events);
850 logger->set(l_mdl_seg, segments.size());
851 logger->set(l_mdl_segexd, expired_segments.size());
852 }
853
854
855
// Start (or restart) journal replay on the replay thread.  If the
// journal is empty, `c` completes immediately; otherwise it is queued
// on waitfor_replay and completed when replay finishes.
void MDLog::replay(MDSInternalContextBase *c)
{
  assert(journaler->is_active());
  assert(journaler->is_readonly());

  // empty?
  if (journaler->get_read_pos() == journaler->get_write_pos()) {
    dout(10) << "replay - journal empty, done." << dendl;
    mds->mdcache->trim();
    if (c) {
      c->complete(0);
    }
    return;
  }

  // add waiter
  if (c)
    waitfor_replay.push_back(c);

  // go!
  dout(10) << "replay start, from " << journaler->get_read_pos()
	   << " to " << journaler->get_write_pos() << dendl;

  // events can only already exist if this is a re-replay
  assert(num_events == 0 || already_replayed);
  if (already_replayed) {
    // Ensure previous instance of ReplayThread is joined before
    // we create another one
    replay_thread.join();
  }
  already_replayed = true;

  replay_thread.create("md_log_replay");
}
889
890
891 /**
892 * Resolve the JournalPointer object to a journal file, and
893 * instantiate a Journaler object. This may re-write the journal
894 * if the journal in RADOS appears to be in an old format.
895 *
896 * This is a separate thread because of the way it is initialized from inside
897 * the mds lock, which is also the global objecter lock -- rather than split
898 * it up into hard-to-read async operations linked up by contexts,
899 *
900 * When this function completes, the `journaler` attribute will be set to
901 * a Journaler instance using the latest available serialization format.
902 */
/**
 * Resolve the JournalPointer object to a journal file, and
 * instantiate a Journaler object.  This may re-write the journal
 * if the journal in RADOS appears to be in an old format.
 *
 * Runs on its own thread (blocking waits throughout); on blacklist
 * errors it respawns the MDS, on unreadable data it marks the rank
 * damaged.  `completion` is called with 0 on success, -EINVAL for an
 * unknown journal format, or -EAGAIN in standby-replay while an
 * active MDS is mid-rewrite.
 *
 * When this function completes, the `journaler` attribute will be set to
 * a Journaler instance using the latest available serialization format.
 */
void MDLog::_recovery_thread(MDSInternalContextBase *completion)
{
  assert(journaler == NULL);
  if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
      dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
              << JOURNAL_FORMAT_MAX << dendl;

      // Oh dear, something unreadable in the store for this rank: require
      // operator intervention.
      mds->damaged();
      ceph_abort();  // damaged should not return
  }

  // First, read the pointer object.
  // If the pointer object is not present, then create it with
  // front = default ino and back = null
  JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
  const int read_result = jp.load(mds->objecter);
  if (read_result == -ENOENT) {
    inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
    jp.front = default_log_ino;
    int write_result = jp.save(mds->objecter);
    // Nothing graceful we can do for this
    assert(write_result >= 0);
  } else if (read_result == -EBLACKLISTED) {
    derr << "Blacklisted during JournalPointer read!  Respawning..." << dendl;
    mds->respawn();
    ceph_abort(); // Should be unreachable because respawn calls execv
  } else if (read_result != 0) {
    mds->clog->error() << "failed to read JournalPointer: " << read_result
                       << " (" << cpp_strerror(read_result) << ")";
    mds->damaged_unlocked();
    ceph_abort();  // Should be unreachable because damaged() calls respawn()
  }

  // If the back pointer is non-null, that means that a journal
  // rewrite failed part way through.  Erase the back journal
  // to clean up.
  if (jp.back) {
    if (mds->is_standby_replay()) {
      dout(1) << "Journal " << jp.front << " is being rewritten, "
	      << "cannot replay in standby until an active MDS completes rewrite" << dendl;
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
	return;
      }
      completion->complete(-EAGAIN);
      return;
    }
    dout(1) << "Erasing journal " << jp.back << dendl;
    C_SaferCond erase_waiter;
    Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(),
		   CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
		   mds->finisher);

    // Read all about this journal (header + extents)
    C_SaferCond recover_wait;
    back.recover(&recover_wait);
    int recovery_result = recover_wait.wait();
    if (recovery_result == -EBLACKLISTED) {
      derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
      mds->respawn();
      ceph_abort(); // Should be unreachable because respawn calls execv
    } else if (recovery_result != 0) {
      // Journaler.recover succeeds if no journal objects are present: an error
      // means something worse like a corrupt header, which we can't handle here.
      mds->clog->error() << "Error recovering journal " << jp.front << ": "
        << cpp_strerror(recovery_result);
      mds->damaged_unlocked();
      assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
    }

    // We could read journal, so we can erase it.
    back.erase(&erase_waiter);
    int erase_result = erase_waiter.wait();

    // If we are successful, or find no data, we can update the JournalPointer to
    // reflect that the back journal is gone.
    if (erase_result != 0 && erase_result != -ENOENT) {
      derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
    } else {
      dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
      jp.back = 0;
      int write_result = jp.save(mds->objecter);
      // Nothing graceful we can do for this
      assert(write_result >= 0);
    }
  }

  /* Read the header from the front journal */
  Journaler *front_journal = new Journaler("mdlog", jp.front,
      mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
      logger, l_mdl_jlat, mds->finisher);

  // Assign to ::journaler so that we can be aborted by ::shutdown while
  // waiting for journaler recovery
  {
    Mutex::Locker l(mds->mds_lock);
    journaler = front_journal;
  }

  C_SaferCond recover_wait;
  front_journal->recover(&recover_wait);
  dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
  int recovery_result = recover_wait.wait();
  dout(4) << "Journal " << jp.front << " recovered." << dendl;

  if (recovery_result == -EBLACKLISTED) {
    derr << "Blacklisted during journal recovery!  Respawning..." << dendl;
    mds->respawn();
    ceph_abort(); // Should be unreachable because respawn calls execv
  } else if (recovery_result != 0) {
    mds->clog->error() << "Error recovering journal " << jp.front << ": "
      << cpp_strerror(recovery_result);
    mds->damaged_unlocked();
    assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
  }

  /* Check whether the front journal format is acceptable or needs re-write */
  if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
    dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
            << ", does this MDS daemon require upgrade?" << dendl;
    {
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
        journaler = NULL;
        delete front_journal;
        return;
      }
      completion->complete(-EINVAL);
    }
  } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
    /* The journal is of configured format, or we are in standbyreplay and will
     * tolerate replaying old journals until we have to go active. Use front_journal as
     * our journaler attribute and complete */
    dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
    journaler->set_write_error_handler(new C_MDL_WriteError(this));
    {
      Mutex::Locker l(mds->mds_lock);
      if (mds->is_daemon_stopping()) {
        return;
      }
      completion->complete(0);
    }
  } else {
    /* Hand off to reformat routine, which will ultimately set the
     * completion when it has done its thing */
    dout(1) << "Journal " << jp.front << " has old format "
      << front_journal->get_stream_format() << ", it will now be updated" << dendl;
    _reformat_journal(jp, front_journal, completion);
  }
}
1055
1056 /**
1057 * Blocking rewrite of the journal to a new file, followed by
1058 * swap of journal pointer to point to the new one.
1059 *
1060 * We write the new journal to the 'back' journal from the JournalPointer,
1061 * swapping pointers to make that one the front journal only when we have
1062 * safely completed.
1063 */
1064 void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
1065 {
1066 assert(!jp_in.is_null());
1067 assert(completion != NULL);
1068 assert(old_journal != NULL);
1069
1070 JournalPointer jp = jp_in;
1071
1072 /* Set JournalPointer.back to the location we will write the new journal */
1073 inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1074 inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1075 jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1076 int write_result = jp.save(mds->objecter);
1077 assert(write_result == 0);
1078
1079 /* Create the new Journaler file */
1080 Journaler *new_journal = new Journaler("mdlog", jp.back,
1081 mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1082 dout(4) << "Writing new journal header " << jp.back << dendl;
1083 file_layout_t new_layout = old_journal->get_layout();
1084 new_journal->set_writeable();
1085 new_journal->create(&new_layout, g_conf->mds_journal_format);
1086
1087 /* Write the new journal header to RADOS */
1088 C_SaferCond write_head_wait;
1089 new_journal->write_head(&write_head_wait);
1090 write_head_wait.wait();
1091
1092 // Read in the old journal, and whenever we have readable events,
1093 // write them to the new journal.
1094 int r = 0;
1095
1096 // In old format journals before event_seq was introduced, the serialized
1097 // offset of a SubtreeMap message in the log is used as the unique ID for
1098 // a log segment. Because we change serialization, this will end up changing
1099 // for us, so we have to explicitly update the fields that point back to that
1100 // log segment.
1101 std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;
1102
1103 // The logic in here borrowed from replay_thread expects mds_lock to be held,
1104 // e.g. between checking readable and doing wait_for_readable so that journaler
1105 // state doesn't change in between.
1106 uint32_t events_transcribed = 0;
1107 while (1) {
1108 while (!old_journal->is_readable() &&
1109 old_journal->get_read_pos() < old_journal->get_write_pos() &&
1110 !old_journal->get_error()) {
1111
1112 // Issue a journal prefetch
1113 C_SaferCond readable_waiter;
1114 old_journal->wait_for_readable(&readable_waiter);
1115
1116 // Wait for a journal prefetch to complete
1117 readable_waiter.wait();
1118 }
1119 if (old_journal->get_error()) {
1120 r = old_journal->get_error();
1121 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1122 break;
1123 }
1124
1125 if (!old_journal->is_readable() &&
1126 old_journal->get_read_pos() == old_journal->get_write_pos())
1127 break;
1128
1129 // Read one serialized LogEvent
1130 assert(old_journal->is_readable());
1131 bufferlist bl;
1132 uint64_t le_pos = old_journal->get_read_pos();
1133 bool r = old_journal->try_read_entry(bl);
1134 if (!r && old_journal->get_error())
1135 continue;
1136 assert(r);
1137
1138 // Update segment_pos_rewrite
1139 LogEvent *le = LogEvent::decode(bl);
1140 if (le) {
1141 bool modified = false;
1142
1143 if (le->get_type() == EVENT_SUBTREEMAP ||
1144 le->get_type() == EVENT_RESETJOURNAL) {
1145 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1146 if (sle == NULL || sle->event_seq == 0) {
1147 // A non-explicit event seq: the effective sequence number
1148 // of this segment is it's position in the old journal and
1149 // the new effective sequence number will be its position
1150 // in the new journal.
1151 segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1152 dout(20) << __func__ << " discovered segment seq mapping "
1153 << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1154 }
1155 } else {
1156 event_seq++;
1157 }
1158
1159 // Rewrite segment references if necessary
1160 EMetaBlob *blob = le->get_metablob();
1161 if (blob) {
1162 modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1163 }
1164
1165 // Zero-out expire_pos in subtreemap because offsets have changed
1166 // (expire_pos is just an optimization so it's safe to eliminate it)
1167 if (le->get_type() == EVENT_SUBTREEMAP
1168 || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1169 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1170 assert(sle != NULL);
1171 dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1172 << le_pos << " seq=" << sle->event_seq << dendl;
1173 sle->expire_pos = 0;
1174 modified = true;
1175 }
1176
1177 if (modified) {
1178 bl.clear();
1179 le->encode_with_header(bl, mds->mdsmap->get_up_features());
1180 }
1181
1182 delete le;
1183 } else {
1184 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1185 // not validate the contents, so pass it through.
1186 dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1187 << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1188 << dendl;
1189 }
1190
1191 // Write (buffered, synchronous) one serialized LogEvent
1192 events_transcribed += 1;
1193 new_journal->append_entry(bl);
1194 }
1195
1196 dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1197 C_SaferCond flush_waiter;
1198 new_journal->flush(&flush_waiter);
1199 flush_waiter.wait();
1200
1201 // If failed to rewrite journal, leave the part written journal
1202 // as garbage to be cleaned up next startup.
1203 assert(r == 0);
1204
1205 /* Now that the new journal is safe, we can flip the pointers */
1206 inodeno_t const tmp = jp.front;
1207 jp.front = jp.back;
1208 jp.back = tmp;
1209 write_result = jp.save(mds->objecter);
1210 assert(write_result == 0);
1211
1212 /* Delete the old journal to free space */
1213 dout(1) << "New journal flushed, erasing old journal" << dendl;
1214 C_SaferCond erase_waiter;
1215 old_journal->erase(&erase_waiter);
1216 int erase_result = erase_waiter.wait();
1217 assert(erase_result == 0);
1218 {
1219 Mutex::Locker l(mds->mds_lock);
1220 if (mds->is_daemon_stopping()) {
1221 delete new_journal;
1222 return;
1223 }
1224 assert(journaler == old_journal);
1225 journaler = NULL;
1226 delete old_journal;
1227 }
1228
1229 /* Update the pointer to reflect we're back in clean single journal state. */
1230 jp.back = 0;
1231 write_result = jp.save(mds->objecter);
1232 assert(write_result == 0);
1233
1234 /* Reset the Journaler object to its default state */
1235 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1236 {
1237 Mutex::Locker l(mds->mds_lock);
1238 if (mds->is_daemon_stopping()) {
1239 delete new_journal;
1240 return;
1241 }
1242 journaler = new_journal;
1243 journaler->set_readonly();
1244 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1245 }
1246
1247 /* Trigger completion */
1248 {
1249 Mutex::Locker l(mds->mds_lock);
1250 if (mds->is_daemon_stopping()) {
1251 return;
1252 }
1253 completion->complete(0);
1254 }
1255 }
1256
1257
1258 // i am a separate thread
1259 void MDLog::_replay_thread()
1260 {
1261 dout(10) << "_replay_thread start" << dendl;
1262
1263 // loop
1264 int r = 0;
1265 while (1) {
1266 // wait for read?
1267 while (!journaler->is_readable() &&
1268 journaler->get_read_pos() < journaler->get_write_pos() &&
1269 !journaler->get_error()) {
1270 C_SaferCond readable_waiter;
1271 journaler->wait_for_readable(&readable_waiter);
1272 r = readable_waiter.wait();
1273 }
1274 if (journaler->get_error()) {
1275 r = journaler->get_error();
1276 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1277 if (r == -ENOENT) {
1278 if (mds->is_standby_replay()) {
1279 // journal has been trimmed by somebody else
1280 r = -EAGAIN;
1281 } else {
1282 mds->clog->error() << "missing journal object";
1283 mds->damaged_unlocked();
1284 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1285 }
1286 } else if (r == -EINVAL) {
1287 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1288 // this should only happen if you're following somebody else
1289 if(journaler->is_readonly()) {
1290 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1291 r = -EAGAIN;
1292 } else {
1293 mds->clog->error() << "invalid journaler offsets";
1294 mds->damaged_unlocked();
1295 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1296 }
1297 } else {
1298 /* re-read head and check it
1299 * Given that replay happens in a separate thread and
1300 * the MDS is going to either shut down or restart when
1301 * we return this error, doing it synchronously is fine
1302 * -- as long as we drop the main mds lock--. */
1303 C_SaferCond reread_fin;
1304 journaler->reread_head(&reread_fin);
1305 int err = reread_fin.wait();
1306 if (err) {
1307 if (err == -ENOENT && mds->is_standby_replay()) {
1308 r = -EAGAIN;
1309 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1310 << dendl;
1311 break;
1312 } else {
1313 dout(0) << "got error while reading head: " << cpp_strerror(err)
1314 << dendl;
1315
1316 mds->clog->error() << "error reading journal header";
1317 mds->damaged_unlocked();
1318 ceph_abort(); // Should be unreachable because damaged() calls
1319 // respawn()
1320 }
1321 }
1322 standby_trim_segments();
1323 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1324 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1325 r = -EAGAIN;
1326 }
1327 }
1328 }
1329 break;
1330 }
1331
1332 if (!journaler->is_readable() &&
1333 journaler->get_read_pos() == journaler->get_write_pos())
1334 break;
1335
1336 assert(journaler->is_readable() || mds->is_daemon_stopping());
1337
1338 // read it
1339 uint64_t pos = journaler->get_read_pos();
1340 bufferlist bl;
1341 bool r = journaler->try_read_entry(bl);
1342 if (!r && journaler->get_error())
1343 continue;
1344 assert(r);
1345
1346 // unpack event
1347 LogEvent *le = LogEvent::decode(bl);
1348 if (!le) {
1349 dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1350 << " -- unable to decode event" << dendl;
1351 dout(0) << "dump of unknown or corrupt event:\n";
1352 bl.hexdump(*_dout);
1353 *_dout << dendl;
1354
1355 mds->clog->error() << "corrupt journal event at " << pos << "~"
1356 << bl.length() << " / "
1357 << journaler->get_write_pos();
1358 if (g_conf->mds_log_skip_corrupt_events) {
1359 continue;
1360 } else {
1361 mds->damaged_unlocked();
1362 ceph_abort(); // Should be unreachable because damaged() calls
1363 // respawn()
1364 }
1365
1366 }
1367 le->set_start_off(pos);
1368
1369 // new segment?
1370 if (le->get_type() == EVENT_SUBTREEMAP ||
1371 le->get_type() == EVENT_RESETJOURNAL) {
1372 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1373 if (sle && sle->event_seq > 0)
1374 event_seq = sle->event_seq;
1375 else
1376 event_seq = pos;
1377 segments[event_seq] = new LogSegment(event_seq, pos);
1378 logger->set(l_mdl_seg, segments.size());
1379 } else {
1380 event_seq++;
1381 }
1382
1383 // have we seen an import map yet?
1384 if (segments.empty()) {
1385 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1386 << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
1387 } else {
1388 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1389 << " " << le->get_stamp() << ": " << *le << dendl;
1390 le->_segment = get_current_segment(); // replay may need this
1391 le->_segment->num_events++;
1392 le->_segment->end = journaler->get_read_pos();
1393 num_events++;
1394
1395 {
1396 Mutex::Locker l(mds->mds_lock);
1397 if (mds->is_daemon_stopping()) {
1398 return;
1399 }
1400 logger->inc(l_mdl_replayed);
1401 le->replay(mds);
1402 }
1403 }
1404 delete le;
1405
1406 logger->set(l_mdl_rdpos, pos);
1407 }
1408
1409 // done!
1410 if (r == 0) {
1411 assert(journaler->get_read_pos() == journaler->get_write_pos());
1412 dout(10) << "_replay - complete, " << num_events
1413 << " events" << dendl;
1414
1415 logger->set(l_mdl_expos, journaler->get_expire_pos());
1416 }
1417
1418 safe_pos = journaler->get_write_safe_pos();
1419
1420 dout(10) << "_replay_thread kicking waiters" << dendl;
1421 {
1422 Mutex::Locker l(mds->mds_lock);
1423 if (mds->is_daemon_stopping()) {
1424 return;
1425 }
1426 finish_contexts(g_ceph_context, waitfor_replay, r);
1427 }
1428
1429 dout(10) << "_replay_thread finish" << dendl;
1430 }
1431
1432 void MDLog::standby_trim_segments()
1433 {
1434 dout(10) << "standby_trim_segments" << dendl;
1435 uint64_t expire_pos = journaler->get_expire_pos();
1436 dout(10) << " expire_pos=" << expire_pos << dendl;
1437 bool removed_segment = false;
1438 while (have_any_segments()) {
1439 LogSegment *seg = get_oldest_segment();
1440 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1441 "~" << seg->end - seg->offset << dendl;
1442
1443 if (seg->end > expire_pos) {
1444 dout(10) << " won't remove, not expired!" << dendl;
1445 break;
1446 }
1447
1448 if (segments.size() == 1) {
1449 dout(10) << " won't remove, last segment!" << dendl;
1450 break;
1451 }
1452
1453 dout(10) << " removing segment" << dendl;
1454 mds->mdcache->standby_trim_segment(seg);
1455 remove_oldest_segment();
1456 removed_segment = true;
1457 }
1458
1459 if (removed_segment) {
1460 dout(20) << " calling mdcache->trim!" << dendl;
1461 mds->mdcache->trim();
1462 } else {
1463 dout(20) << " removed no segments!" << dendl;
1464 }
1465 }
1466
1467 void MDLog::dump_replay_status(Formatter *f) const
1468 {
1469 f->open_object_section("replay_status");
1470 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1471 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1472 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1473 f->dump_unsigned("num_events", get_num_events());
1474 f->dump_unsigned("num_segments", get_num_segments());
1475 f->close_section();
1476 }