1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDLog.h"
17#include "MDCache.h"
18#include "LogEvent.h"
19#include "MDSContext.h"
20
21#include "osdc/Journaler.h"
22#include "mds/JournalPointer.h"
23
24#include "common/entity_name.h"
25#include "common/perf_counters.h"
26#include "common/Cond.h"
27
28#include "events/ESubtreeMap.h"
29
30#include "common/config.h"
31#include "common/errno.h"
32#include "include/assert.h"
33
34#define dout_context g_ceph_context
35#define dout_subsys ceph_subsys_mds
36#undef dout_prefix
37#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
38
39// cons/des
40MDLog::~MDLog()
41{
42 if (journaler) { delete journaler; journaler = 0; }
43 if (logger) {
44 g_ceph_context->get_perfcounters_collection()->remove(logger);
45 delete logger;
46 logger = 0;
47 }
48}
49
50
51void MDLog::create_logger()
52{
53 PerfCountersBuilder plb(g_ceph_context, "mds_log", l_mdl_first, l_mdl_last);
54
55 plb.add_u64_counter(l_mdl_evadd, "evadd",
 56 "Events submitted", "subm", PerfCountersBuilder::PRIO_INTERESTING);
57 plb.add_u64_counter(l_mdl_evex, "evex", "Total expired events");
58 plb.add_u64_counter(l_mdl_evtrm, "evtrm", "Trimmed events");
59 plb.add_u64(l_mdl_ev, "ev",
 60 "Events", "evts", PerfCountersBuilder::PRIO_INTERESTING);
61 plb.add_u64(l_mdl_evexg, "evexg", "Expiring events");
62 plb.add_u64(l_mdl_evexd, "evexd", "Current expired events");
63
64 plb.add_u64_counter(l_mdl_segadd, "segadd", "Segments added");
65 plb.add_u64_counter(l_mdl_segex, "segex", "Total expired segments");
66 plb.add_u64_counter(l_mdl_segtrm, "segtrm", "Trimmed segments");
67 plb.add_u64(l_mdl_seg, "seg",
 68 "Segments", "segs", PerfCountersBuilder::PRIO_INTERESTING);
69 plb.add_u64(l_mdl_segexg, "segexg", "Expiring segments");
70 plb.add_u64(l_mdl_segexd, "segexd", "Current expired segments");
71
 72 plb.add_u64(l_mdl_expos, "expos", "Journaler expire position");
73 plb.add_u64(l_mdl_wrpos, "wrpos", "Journaler write position");
74 plb.add_u64(l_mdl_rdpos, "rdpos", "Journaler read position");
75 plb.add_time_avg(l_mdl_jlat, "jlat", "Journaler flush latency");
76
77 plb.add_u64_counter(l_mdl_replayed, "replayed", "Events replayed");
78
79 // logger
80 logger = plb.create_perf_counters();
81 g_ceph_context->get_perfcounters_collection()->add(logger);
82}
83
84void MDLog::set_write_iohint(unsigned iohint_flags)
85{
86 journaler->set_write_iohint(iohint_flags);
87}
88
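// Handle asynchronous journal write errors reported by the Journaler:
// a blacklist (fence) triggers a respawn, anything else marks this rank
// damaged so an administrator can intervene.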
89class C_MDL_WriteError : public MDSIOContextBase {
90 protected:
91 MDLog *mdlog;
92 MDSRank *get_mds() override {return mdlog->mds;}
93
94 void finish(int r) override {
95 MDSRank *mds = get_mds();
96 // assume journal is reliable, so don't choose action based on
97 // g_conf->mds_action_on_write_error.
98 if (r == -EBLACKLISTED) {
99 derr << "we have been blacklisted (fenced), respawning..." << dendl;
100 mds->respawn();
101 } else {
102 derr << "unhandled error " << cpp_strerror(r) << ", shutting down..." << dendl;
103 // Although it's possible that this could be something transient,
104 // it's severe and scary, so disable this rank until an administrator
105 // intervenes.
106 mds->clog->error() << "Unhandled journal write error on MDS rank " <<
107 mds->get_nodeid() << ": " << cpp_strerror(r) << ", shutting down.";
108 mds->damaged();
109 ceph_abort(); // damaged should never return
110 }
111 }
112
113 public:
114 explicit C_MDL_WriteError(MDLog *m) : mdlog(m) {}
115};
116
117
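// Persist the journal head (write/expire pointers) to RADOS. The optional
// completion is wrapped in a C_IO_Wrapper so it is not completed directly
// inside the Journaler's own lock.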
118void MDLog::write_head(MDSInternalContextBase *c)
119{
120 Context *fin = NULL;
121 if (c != NULL) {
122 fin = new C_IO_Wrapper(mds, c);
123 }
124 journaler->write_head(fin);
125}
126
127uint64_t MDLog::get_read_pos() const
128{
129 return journaler->get_read_pos();
130}
131
132uint64_t MDLog::get_write_pos() const
133{
134 return journaler->get_write_pos();
135}
136
137uint64_t MDLog::get_safe_pos() const
138{
139 return journaler->get_write_safe_pos();
140}
141
142
143
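// Create a brand-new, empty journal for this rank: instantiate the
// Journaler, asynchronously write its header, and persist a JournalPointer
// object pointing at the new journal inode.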
144void MDLog::create(MDSInternalContextBase *c)
145{
146 dout(5) << "create empty log" << dendl;
147
148 C_GatherBuilder gather(g_ceph_context);
149 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
 150 // XXX but should that maybe be handled inside Journaler?
151 gather.set_finisher(new C_IO_Wrapper(mds, c));
152
153 // The inode of the default Journaler we will create
154 ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
155
156 // Instantiate Journaler and start async write to RADOS
157 assert(journaler == NULL);
158 journaler = new Journaler("mdlog", ino, mds->mdsmap->get_metadata_pool(),
159 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger,
160 l_mdl_jlat, mds->finisher);
161 assert(journaler->is_readonly());
162 journaler->set_write_error_handler(new C_MDL_WriteError(this));
163 journaler->set_writeable();
164 journaler->create(&mds->mdcache->default_log_layout, g_conf->mds_journal_format);
165 journaler->write_head(gather.new_sub());
166
167 // Async write JournalPointer to RADOS
168 JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
169 jp.front = ino;
170 jp.back = 0;
171 jp.save(mds->objecter, gather.new_sub());
172
173 gather.activate();
174
175 logger->set(l_mdl_expos, journaler->get_expire_pos());
176 logger->set(l_mdl_wrpos, journaler->get_write_pos());
177
178 submit_thread.create("md_submit");
179}
180
181void MDLog::open(MDSInternalContextBase *c)
182{
183 dout(5) << "open discovering log bounds" << dendl;
184
185 assert(!recovery_thread.is_started());
186 recovery_thread.set_completion(c);
187 recovery_thread.create("md_recov_open");
188
189 submit_thread.create("md_submit");
190 // either append() or replay() will follow.
191}
192
193/**
194 * Final part of reopen() procedure, after recovery_thread
195 * has done its thing we call append()
196 */
197class C_ReopenComplete : public MDSInternalContext {
198 MDLog *mdlog;
199 MDSInternalContextBase *on_complete;
200public:
201 C_ReopenComplete(MDLog *mdlog_, MDSInternalContextBase *on_complete_) : MDSInternalContext(mdlog_->mds), mdlog(mdlog_), on_complete(on_complete_) {}
202 void finish(int r) override {
203 mdlog->append();
204 on_complete->complete(r);
205 }
206};
207
208/**
209 * Given that open() has been called in the past, go through the journal
210 * recovery procedure again, potentially reformatting the journal if it
211 * was in an old format.
212 */
213void MDLog::reopen(MDSInternalContextBase *c)
214{
215 dout(5) << "reopen" << dendl;
216
217 // Because we will call append() at the completion of this, check that we have already
218 // read the whole journal.
219 assert(journaler != NULL);
220 assert(journaler->get_read_pos() == journaler->get_write_pos());
221
222 delete journaler;
223 journaler = NULL;
224
225 // recovery_thread was started at some point in the past. Although
 226 // it has called its completion if we made it back here, it might
227 // still not have been cleaned up: join it.
228 recovery_thread.join();
229
230 recovery_thread.set_completion(new C_ReopenComplete(this, c));
231 recovery_thread.create("md_recov_reopen");
232}
233
234void MDLog::append()
235{
236 dout(5) << "append positioning at end and marking writeable" << dendl;
237 journaler->set_read_pos(journaler->get_write_pos());
238 journaler->set_expire_pos(journaler->get_write_pos());
239
240 journaler->set_writeable();
241
242 logger->set(l_mdl_expos, journaler->get_write_pos());
243}
244
245
246
247// -------------------------------------------------
248
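// Register a newly started event as the single in-flight event, bump the
// event sequence number, and record the event sequence and last segment
// (subtree map) sequence in its EMetaBlob, if any.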
249void MDLog::_start_entry(LogEvent *e)
250{
251 assert(submit_mutex.is_locked_by_me());
252
253 assert(cur_event == NULL);
254 cur_event = e;
255
256 event_seq++;
257
258 EMetaBlob *metablob = e->get_metablob();
259 if (metablob) {
260 metablob->event_seq = event_seq;
261 metablob->last_subtree_map = get_last_segment_seq();
262 }
263}
264
265void MDLog::cancel_entry(LogEvent *le)
266{
267 assert(le == cur_event);
268 cur_event = NULL;
269 delete le;
270}
271
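// Attach a finished event to the newest segment, queue it for the submit
// thread, and decide whether a new segment (ESubtreeMap) should be started.
//
// Illustrative call sequence from elsewhere in the MDS (a sketch, not a
// verbatim call site; the public start_entry()/submit_entry() wrappers that
// take submit_mutex are assumed to be declared in MDLog.h):
//
//   mdlog->start_entry(le);        // make le the in-flight event
//   // ... fill in le's EMetaBlob ...
//   mdlog->submit_entry(le, fin);  // hand it to _submit_thread
//   mdlog->flush();                // optionally force the Journaler flush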
272void MDLog::_submit_entry(LogEvent *le, MDSLogContextBase *c)
273{
274 assert(submit_mutex.is_locked_by_me());
275 assert(!mds->is_any_replay());
276 assert(!capped);
277
278 assert(le == cur_event);
279 cur_event = NULL;
280
281 // let the event register itself in the segment
282 assert(!segments.empty());
283 LogSegment *ls = segments.rbegin()->second;
284 ls->num_events++;
285
286 le->_segment = ls;
287 le->update_segment();
288 le->set_stamp(ceph_clock_now());
289
290 mdsmap_up_features = mds->mdsmap->get_up_features();
291 pending_events[ls->seq].push_back(PendingEvent(le, c));
292 num_events++;
293
294 if (logger) {
295 logger->inc(l_mdl_evadd);
296 logger->set(l_mdl_ev, num_events);
297 }
298
299 unflushed++;
300
301 uint64_t period = journaler->get_layout_period();
302 // start a new segment?
303 if (le->get_type() == EVENT_SUBTREEMAP ||
304 (le->get_type() == EVENT_IMPORTFINISH && mds->is_resolve())) {
305 // avoid infinite loop when ESubtreeMap is very large.
 306 // do not insert an ESubtreeMap among the EImportFinish events that finish
 307 // disambiguated imports, because the ESubtreeMap reflects the subtree
 308 // state only after all EImportFinish events are replayed.
309 } else if (ls->end/period != ls->offset/period ||
310 ls->num_events >= g_conf->mds_log_events_per_segment) {
311 dout(10) << "submit_entry also starting new segment: last = "
312 << ls->seq << "/" << ls->offset << ", event seq = " << event_seq << dendl;
313 _start_new_segment();
314 } else if (g_conf->mds_debug_subtrees &&
315 le->get_type() != EVENT_SUBTREEMAP_TEST) {
316 // debug: journal this every time to catch subtree replay bugs.
317 // use a different event id so it doesn't get interpreted as a
318 // LogSegment boundary on replay.
319 LogEvent *sle = mds->mdcache->create_subtree_map();
320 sle->set_type(EVENT_SUBTREEMAP_TEST);
321 _submit_entry(sle, NULL);
322 }
323}
324
325/**
326 * Invoked on the flush after each entry submitted
327 */
328class C_MDL_Flushed : public MDSLogContextBase {
329protected:
330 MDLog *mdlog;
331 MDSRank *get_mds() override {return mdlog->mds;}
332 MDSInternalContextBase *wrapped;
333
334 void finish(int r) override {
335 if (wrapped)
336 wrapped->complete(r);
337 }
338
339public:
340 C_MDL_Flushed(MDLog *m, MDSInternalContextBase *w)
341 : mdlog(m), wrapped(w) {}
342 C_MDL_Flushed(MDLog *m, uint64_t wp) : mdlog(m), wrapped(NULL) {
343 set_write_pos(wp);
344 }
345};
346
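// Submit thread body: drain pending_events in order, encode each event and
// append it to the Journaler, register a C_MDL_Flushed (or the caller's
// context) to fire once that position is safe, and flush when requested.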
347void MDLog::_submit_thread()
348{
349 dout(10) << "_submit_thread start" << dendl;
350
351 submit_mutex.Lock();
352
353 while (!mds->is_daemon_stopping()) {
354 if (g_conf->mds_log_pause) {
355 submit_cond.Wait(submit_mutex);
356 continue;
357 }
358
359 map<uint64_t,list<PendingEvent> >::iterator it = pending_events.begin();
360 if (it == pending_events.end()) {
361 submit_cond.Wait(submit_mutex);
362 continue;
363 }
364
365 if (it->second.empty()) {
366 pending_events.erase(it);
367 continue;
368 }
369
370 int64_t features = mdsmap_up_features;
371 PendingEvent data = it->second.front();
372 it->second.pop_front();
373
374 submit_mutex.Unlock();
375
376 if (data.le) {
377 LogEvent *le = data.le;
378 LogSegment *ls = le->_segment;
379 // encode it, with event type
380 bufferlist bl;
381 le->encode_with_header(bl, features);
382
383 uint64_t write_pos = journaler->get_write_pos();
384
385 le->set_start_off(write_pos);
386 if (le->get_type() == EVENT_SUBTREEMAP)
387 ls->offset = write_pos;
388
389 dout(5) << "_submit_thread " << write_pos << "~" << bl.length()
390 << " : " << *le << dendl;
391
392 // journal it.
393 const uint64_t new_write_pos = journaler->append_entry(bl); // bl is destroyed.
394 ls->end = new_write_pos;
395
396 MDSLogContextBase *fin;
397 if (data.fin) {
398 fin = dynamic_cast<MDSLogContextBase*>(data.fin);
399 assert(fin);
400 fin->set_write_pos(new_write_pos);
401 } else {
402 fin = new C_MDL_Flushed(this, new_write_pos);
403 }
404
405 journaler->wait_for_flush(fin);
406
407 if (data.flush)
408 journaler->flush();
409
410 if (logger)
411 logger->set(l_mdl_wrpos, ls->end);
412
413 delete le;
414 } else {
415 if (data.fin) {
416 MDSInternalContextBase* fin =
417 dynamic_cast<MDSInternalContextBase*>(data.fin);
418 assert(fin);
419 C_MDL_Flushed *fin2 = new C_MDL_Flushed(this, fin);
420 fin2->set_write_pos(journaler->get_write_pos());
421 journaler->wait_for_flush(fin2);
422 }
423 if (data.flush)
424 journaler->flush();
425 }
426
427 submit_mutex.Lock();
428 if (data.flush)
429 unflushed = 0;
430 else if (data.le)
431 unflushed++;
432 }
433
434 submit_mutex.Unlock();
435}
436
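// Complete c once everything submitted so far is safely on disk: either
// piggyback on the newest pending batch or, if nothing is pending, wait
// directly on the Journaler flush.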
437void MDLog::wait_for_safe(MDSInternalContextBase *c)
438{
439 submit_mutex.Lock();
440
441 bool no_pending = true;
442 if (!pending_events.empty()) {
443 pending_events.rbegin()->second.push_back(PendingEvent(NULL, c));
444 no_pending = false;
445 submit_cond.Signal();
446 }
447
448 submit_mutex.Unlock();
449
450 if (no_pending && c)
451 journaler->wait_for_flush(new C_IO_Wrapper(mds, c));
452}
453
454void MDLog::flush()
455{
456 submit_mutex.Lock();
457
458 bool do_flush = unflushed > 0;
459 unflushed = 0;
460 if (!pending_events.empty()) {
461 pending_events.rbegin()->second.push_back(PendingEvent(NULL, NULL, true));
462 do_flush = false;
463 submit_cond.Signal();
464 }
465
466 submit_mutex.Unlock();
467
468 if (do_flush)
469 journaler->flush();
470}
471
472void MDLog::kick_submitter()
473{
474 Mutex::Locker l(submit_mutex);
475 submit_cond.Signal();
476}
477
478void MDLog::cap()
479{
480 dout(5) << "cap" << dendl;
481 capped = true;
482}
483
484void MDLog::shutdown()
485{
486 assert(mds->mds_lock.is_locked_by_me());
487
488 dout(5) << "shutdown" << dendl;
489 if (submit_thread.is_started()) {
490 assert(mds->is_daemon_stopping());
491
492 if (submit_thread.am_self()) {
493 // Called suicide from the thread: trust it to do no work after
494 // returning from suicide, and subsequently respect mds->is_daemon_stopping()
495 // and fall out of its loop.
496 } else {
497 mds->mds_lock.Unlock();
498 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
499 // picking it up will do anything with it.
500
501 submit_mutex.Lock();
502 submit_cond.Signal();
503 submit_mutex.Unlock();
504
505 mds->mds_lock.Lock();
506
507 submit_thread.join();
508 }
509 }
510
511 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
512 // so we need to shutdown the journaler first.
513 if (journaler) {
514 journaler->shutdown();
515 }
516
517 if (replay_thread.is_started() && !replay_thread.am_self()) {
518 mds->mds_lock.Unlock();
519 replay_thread.join();
520 mds->mds_lock.Lock();
521 }
522
523 if (recovery_thread.is_started() && !recovery_thread.am_self()) {
524 mds->mds_lock.Unlock();
525 recovery_thread.join();
526 mds->mds_lock.Lock();
527 }
528}
529
530
531// -----------------------------
532// segments
533
534void MDLog::_start_new_segment()
535{
536 _prepare_new_segment();
537 _journal_segment_subtree_map(NULL);
538}
539
540void MDLog::_prepare_new_segment()
541{
542 assert(submit_mutex.is_locked_by_me());
543
544 uint64_t seq = event_seq + 1;
545 dout(7) << __func__ << " seq " << seq << dendl;
546
547 segments[seq] = new LogSegment(seq);
548
549 logger->inc(l_mdl_segadd);
550 logger->set(l_mdl_seg, segments.size());
551
552 // Adjust to next stray dir
553 dout(10) << "Advancing to next stray directory on mds " << mds->get_nodeid()
554 << dendl;
555 mds->mdcache->advance_stray();
556}
557
558void MDLog::_journal_segment_subtree_map(MDSInternalContextBase *onsync)
559{
560 assert(submit_mutex.is_locked_by_me());
561
562 dout(7) << __func__ << dendl;
563 ESubtreeMap *sle = mds->mdcache->create_subtree_map();
564 sle->event_seq = get_last_segment_seq();
565
566 _submit_entry(sle, new C_MDL_Flushed(this, onsync));
567}
568
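// Trim the journal back towards the configured limits (mds_log_max_segments
// and mds_log_max_events, or the explicit event limit m if m >= 0) by
// starting expiry on the oldest segments, then discard fully expired ones.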
569void MDLog::trim(int m)
570{
571 unsigned max_segments = g_conf->mds_log_max_segments;
572 int max_events = g_conf->mds_log_max_events;
573 if (m >= 0)
574 max_events = m;
575
576 if (mds->mdcache->is_readonly()) {
577 dout(10) << "trim, ignoring read-only FS" << dendl;
578 return;
579 }
580
581 // Clamp max_events to not be smaller than events per segment
582 if (max_events > 0 && max_events <= g_conf->mds_log_events_per_segment) {
583 max_events = g_conf->mds_log_events_per_segment + 1;
584 }
585
586 submit_mutex.Lock();
587
588 // trim!
589 dout(10) << "trim "
590 << segments.size() << " / " << max_segments << " segments, "
591 << num_events << " / " << max_events << " events"
592 << ", " << expiring_segments.size() << " (" << expiring_events << ") expiring"
593 << ", " << expired_segments.size() << " (" << expired_events << ") expired"
594 << dendl;
595
596 if (segments.empty()) {
597 submit_mutex.Unlock();
598 return;
599 }
600
601 // hack: only trim for a few seconds at a time
602 utime_t stop = ceph_clock_now();
603 stop += 2.0;
604
605 map<uint64_t,LogSegment*>::iterator p = segments.begin();
606 while (p != segments.end() &&
607 ((max_events >= 0 &&
608 num_events - expiring_events - expired_events > max_events) ||
609 (segments.size() - expiring_segments.size() - expired_segments.size() > max_segments))) {
610
611 if (stop < ceph_clock_now())
612 break;
613
614 int num_expiring_segments = (int)expiring_segments.size();
615 if (num_expiring_segments >= g_conf->mds_log_max_expiring)
616 break;
617
618 int op_prio = CEPH_MSG_PRIO_LOW +
619 (CEPH_MSG_PRIO_HIGH - CEPH_MSG_PRIO_LOW) *
620 num_expiring_segments / g_conf->mds_log_max_expiring;
621
622 // look at first segment
623 LogSegment *ls = p->second;
624 assert(ls);
625 ++p;
626
627 if (pending_events.count(ls->seq) ||
628 ls->end > safe_pos) {
629 dout(5) << "trim segment " << ls->seq << "/" << ls->offset << ", not fully flushed yet, safe "
630 << journaler->get_write_safe_pos() << " < end " << ls->end << dendl;
631 break;
632 }
633 if (expiring_segments.count(ls)) {
634 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
635 << ", " << ls->num_events << " events" << dendl;
636 } else if (expired_segments.count(ls)) {
637 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
638 << ", " << ls->num_events << " events" << dendl;
639 } else {
640 assert(expiring_segments.count(ls) == 0);
641 expiring_segments.insert(ls);
642 expiring_events += ls->num_events;
643 submit_mutex.Unlock();
644
645 uint64_t last_seq = ls->seq;
646 try_expire(ls, op_prio);
647
648 submit_mutex.Lock();
649 p = segments.lower_bound(last_seq + 1);
650 }
651 }
652
653 // discard expired segments and unlock submit_mutex
654 _trim_expired_segments();
655}
656
657class C_MaybeExpiredSegment : public MDSInternalContext {
658 MDLog *mdlog;
659 LogSegment *ls;
660 int op_prio;
661 public:
662 C_MaybeExpiredSegment(MDLog *mdl, LogSegment *s, int p) :
663 MDSInternalContext(mdl->mds), mdlog(mdl), ls(s), op_prio(p) {}
664 void finish(int res) override {
665 if (res < 0)
666 mdlog->mds->handle_write_error(res);
667 mdlog->_maybe_expired(ls, op_prio);
668 }
669};
670
671/**
672 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
673 * segment.
674 */
675int MDLog::trim_all()
676{
677 submit_mutex.Lock();
678
679 dout(10) << __func__ << ": "
680 << segments.size()
681 << "/" << expiring_segments.size()
682 << "/" << expired_segments.size() << dendl;
683
684 uint64_t last_seq = 0;
685 if (!segments.empty())
686 last_seq = get_last_segment_seq();
687
688 map<uint64_t,LogSegment*>::iterator p = segments.begin();
689 while (p != segments.end() &&
690 p->first < last_seq && p->second->end <= safe_pos) {
691 LogSegment *ls = p->second;
692 ++p;
693
694 // Caller should have flushed journaler before calling this
695 if (pending_events.count(ls->seq)) {
696 dout(5) << __func__ << ": segment " << ls->seq << " has pending events" << dendl;
697 submit_mutex.Unlock();
698 return -EAGAIN;
699 }
700
701 if (expiring_segments.count(ls)) {
702 dout(5) << "trim already expiring segment " << ls->seq << "/" << ls->offset
703 << ", " << ls->num_events << " events" << dendl;
704 } else if (expired_segments.count(ls)) {
705 dout(5) << "trim already expired segment " << ls->seq << "/" << ls->offset
706 << ", " << ls->num_events << " events" << dendl;
707 } else {
708 assert(expiring_segments.count(ls) == 0);
709 expiring_segments.insert(ls);
710 expiring_events += ls->num_events;
711 submit_mutex.Unlock();
712
713 uint64_t next_seq = ls->seq + 1;
714 try_expire(ls, CEPH_MSG_PRIO_DEFAULT);
715
716 submit_mutex.Lock();
717 p = segments.lower_bound(next_seq);
718 }
719 }
720
721 _trim_expired_segments();
722
723 return 0;
724}
725
726
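// Start expiring a segment: try_to_expire() gathers any outstanding work the
// segment still depends on; if there is none, the segment is expired
// immediately.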
727void MDLog::try_expire(LogSegment *ls, int op_prio)
728{
729 MDSGatherBuilder gather_bld(g_ceph_context);
730 ls->try_to_expire(mds, gather_bld, op_prio);
731
732 if (gather_bld.has_subs()) {
733 dout(5) << "try_expire expiring segment " << ls->seq << "/" << ls->offset << dendl;
734 gather_bld.set_finisher(new C_MaybeExpiredSegment(this, ls, op_prio));
735 gather_bld.activate();
736 } else {
737 dout(10) << "try_expire expired segment " << ls->seq << "/" << ls->offset << dendl;
738 submit_mutex.Lock();
739 assert(expiring_segments.count(ls));
740 expiring_segments.erase(ls);
741 expiring_events -= ls->num_events;
742 _expired(ls);
743 submit_mutex.Unlock();
744 }
745
746 logger->set(l_mdl_segexg, expiring_segments.size());
747 logger->set(l_mdl_evexg, expiring_events);
748}
749
750void MDLog::_maybe_expired(LogSegment *ls, int op_prio)
751{
752 if (mds->mdcache->is_readonly()) {
753 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl;
754 return;
755 }
756
757 dout(10) << "_maybe_expired segment " << ls->seq << "/" << ls->offset
758 << ", " << ls->num_events << " events" << dendl;
759 try_expire(ls, op_prio);
760}
761
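// Drop fully expired segments from the front of the in-memory list, advance
// the Journaler expire position past them, and write the journal head if
// anything was trimmed. Called with submit_mutex held; unlocks it.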
762void MDLog::_trim_expired_segments()
763{
764 assert(submit_mutex.is_locked_by_me());
765
766 // trim expired segments?
767 bool trimmed = false;
768 while (!segments.empty()) {
769 LogSegment *ls = segments.begin()->second;
770 if (!expired_segments.count(ls)) {
771 dout(10) << "_trim_expired_segments waiting for " << ls->seq << "/" << ls->offset
772 << " to expire" << dendl;
773 break;
774 }
775
776 dout(10) << "_trim_expired_segments trimming expired "
777 << ls->seq << "/0x" << std::hex << ls->offset << std::dec << dendl;
778 expired_events -= ls->num_events;
779 expired_segments.erase(ls);
780 num_events -= ls->num_events;
781
782 // this was the oldest segment, adjust expire pos
783 if (journaler->get_expire_pos() < ls->end) {
784 journaler->set_expire_pos(ls->end);
785 logger->set(l_mdl_expos, ls->end);
786 } else {
787 logger->set(l_mdl_expos, ls->offset);
788 }
789
790 logger->inc(l_mdl_segtrm);
791 logger->inc(l_mdl_evtrm, ls->num_events);
792
793 segments.erase(ls->seq);
794 delete ls;
795 trimmed = true;
796 }
797
798 submit_mutex.Unlock();
799
800 if (trimmed)
801 journaler->write_head(0);
802}
803
804void MDLog::trim_expired_segments()
805{
806 submit_mutex.Lock();
807 _trim_expired_segments();
808}
809
810void MDLog::_expired(LogSegment *ls)
811{
812 assert(submit_mutex.is_locked_by_me());
813
814 dout(5) << "_expired segment " << ls->seq << "/" << ls->offset
815 << ", " << ls->num_events << " events" << dendl;
816
817 if (!capped && ls == peek_current_segment()) {
818 dout(5) << "_expired not expiring " << ls->seq << "/" << ls->offset
819 << ", last one and !capped" << dendl;
820 } else {
821 // expired.
822 expired_segments.insert(ls);
823 expired_events += ls->num_events;
824
825 // Trigger all waiters
826 for (std::list<MDSInternalContextBase*>::iterator i = ls->expiry_waiters.begin();
827 i != ls->expiry_waiters.end(); ++i) {
828 (*i)->complete(0);
829 }
830 ls->expiry_waiters.clear();
831
832 logger->inc(l_mdl_evex, ls->num_events);
833 logger->inc(l_mdl_segex);
834 }
835
836 logger->set(l_mdl_ev, num_events);
837 logger->set(l_mdl_evexd, expired_events);
838 logger->set(l_mdl_seg, segments.size());
839 logger->set(l_mdl_segexd, expired_segments.size());
840}
841
842
843
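// Start journal replay: if the journal is empty complete c right away,
// otherwise queue c on waitfor_replay and launch the replay thread.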
844void MDLog::replay(MDSInternalContextBase *c)
845{
846 assert(journaler->is_active());
847 assert(journaler->is_readonly());
848
849 // empty?
850 if (journaler->get_read_pos() == journaler->get_write_pos()) {
851 dout(10) << "replay - journal empty, done." << dendl;
 852 mds->mdcache->trim();
853 if (c) {
854 c->complete(0);
855 }
856 return;
857 }
858
859 // add waiter
860 if (c)
861 waitfor_replay.push_back(c);
862
863 // go!
864 dout(10) << "replay start, from " << journaler->get_read_pos()
865 << " to " << journaler->get_write_pos() << dendl;
866
867 assert(num_events == 0 || already_replayed);
868 if (already_replayed) {
869 // Ensure previous instance of ReplayThread is joined before
870 // we create another one
871 replay_thread.join();
872 }
873 already_replayed = true;
874
875 replay_thread.create("md_log_replay");
876}
877
878
879/**
880 * Resolve the JournalPointer object to a journal file, and
881 * instantiate a Journaler object. This may re-write the journal
882 * if the journal in RADOS appears to be in an old format.
883 *
884 * This is a separate thread because of the way it is initialized from inside
885 * the mds lock, which is also the global objecter lock -- rather than split
 886 * it up into hard-to-read async operations linked up by contexts, the whole sequence simply runs synchronously in this thread.
887 *
888 * When this function completes, the `journaler` attribute will be set to
889 * a Journaler instance using the latest available serialization format.
890 */
891void MDLog::_recovery_thread(MDSInternalContextBase *completion)
892{
893 assert(journaler == NULL);
894 if (g_conf->mds_journal_format > JOURNAL_FORMAT_MAX) {
895 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
896 << JOURNAL_FORMAT_MAX << dendl;
897
898 // Oh dear, something unreadable in the store for this rank: require
899 // operator intervention.
900 mds->damaged();
901 ceph_abort(); // damaged should not return
902 }
903
904 // First, read the pointer object.
905 // If the pointer object is not present, then create it with
906 // front = default ino and back = null
907 JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
 908 const int read_result = jp.load(mds->objecter);
909 if (read_result == -ENOENT) {
910 inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
911 jp.front = default_log_ino;
912 int write_result = jp.save(mds->objecter);
913 // Nothing graceful we can do for this
914 assert(write_result >= 0);
915 } else if (read_result == -EBLACKLISTED) {
916 derr << "Blacklisted during JournalPointer read! Respawning..." << dendl;
917 mds->respawn();
918 ceph_abort(); // Should be unreachable because respawn calls execv
919 } else if (read_result != 0) {
920 mds->clog->error() << "failed to read JournalPointer: " << read_result
921 << " (" << cpp_strerror(read_result) << ")";
922 mds->damaged_unlocked();
923 ceph_abort(); // Should be unreachable because damaged() calls respawn()
924 }
925
926 // If the back pointer is non-null, that means that a journal
927 // rewrite failed part way through. Erase the back journal
928 // to clean up.
929 if (jp.back) {
930 if (mds->is_standby_replay()) {
931 dout(1) << "Journal " << jp.front << " is being rewritten, "
932 << "cannot replay in standby until an active MDS completes rewrite" << dendl;
933 Mutex::Locker l(mds->mds_lock);
934 if (mds->is_daemon_stopping()) {
935 return;
936 }
937 completion->complete(-EAGAIN);
938 return;
939 }
940 dout(1) << "Erasing journal " << jp.back << dendl;
941 C_SaferCond erase_waiter;
942 Journaler back("mdlog", jp.back, mds->mdsmap->get_metadata_pool(),
943 CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat,
944 mds->finisher);
945
946 // Read all about this journal (header + extents)
947 C_SaferCond recover_wait;
948 back.recover(&recover_wait);
949 int recovery_result = recover_wait.wait();
950 if (recovery_result == -EBLACKLISTED) {
951 derr << "Blacklisted during journal recovery! Respawning..." << dendl;
952 mds->respawn();
953 ceph_abort(); // Should be unreachable because respawn calls execv
954 } else if (recovery_result != 0) {
955 // Journaler.recover succeeds if no journal objects are present: an error
956 // means something worse like a corrupt header, which we can't handle here.
957 mds->clog->error() << "Error recovering journal " << jp.front << ": "
958 << cpp_strerror(recovery_result);
959 mds->damaged_unlocked();
960 assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
961 }
962
963 // We could read journal, so we can erase it.
964 back.erase(&erase_waiter);
965 int erase_result = erase_waiter.wait();
966
967 // If we are successful, or find no data, we can update the JournalPointer to
968 // reflect that the back journal is gone.
969 if (erase_result != 0 && erase_result != -ENOENT) {
970 derr << "Failed to erase journal " << jp.back << ": " << cpp_strerror(erase_result) << dendl;
971 } else {
972 dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
973 jp.back = 0;
974 int write_result = jp.save(mds->objecter);
975 // Nothing graceful we can do for this
976 assert(write_result >= 0);
977 }
978 }
979
980 /* Read the header from the front journal */
981 Journaler *front_journal = new Journaler("mdlog", jp.front,
982 mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
983 logger, l_mdl_jlat, mds->finisher);
984
985 // Assign to ::journaler so that we can be aborted by ::shutdown while
986 // waiting for journaler recovery
987 {
988 Mutex::Locker l(mds->mds_lock);
989 journaler = front_journal;
990 }
991
992 C_SaferCond recover_wait;
993 front_journal->recover(&recover_wait);
994 dout(4) << "Waiting for journal " << jp.front << " to recover..." << dendl;
995 int recovery_result = recover_wait.wait();
996 dout(4) << "Journal " << jp.front << " recovered." << dendl;
997
998 if (recovery_result == -EBLACKLISTED) {
999 derr << "Blacklisted during journal recovery! Respawning..." << dendl;
1000 mds->respawn();
1001 ceph_abort(); // Should be unreachable because respawn calls execv
1002 } else if (recovery_result != 0) {
1003 mds->clog->error() << "Error recovering journal " << jp.front << ": "
1004 << cpp_strerror(recovery_result);
1005 mds->damaged_unlocked();
1006 assert(recovery_result == 0); // Unreachable because damaged() calls respawn()
1007 }
1008
1009 /* Check whether the front journal format is acceptable or needs re-write */
1010 if (front_journal->get_stream_format() > JOURNAL_FORMAT_MAX) {
1011 dout(0) << "Journal " << jp.front << " is in unknown format " << front_journal->get_stream_format()
1012 << ", does this MDS daemon require upgrade?" << dendl;
1013 {
1014 Mutex::Locker l(mds->mds_lock);
1015 if (mds->is_daemon_stopping()) {
1016 journaler = NULL;
1017 delete front_journal;
1018 return;
1019 }
1020 completion->complete(-EINVAL);
1021 }
1022 } else if (mds->is_standby_replay() || front_journal->get_stream_format() >= g_conf->mds_journal_format) {
 1023 /* The journal is of the configured format, or we are in standby replay and will
1024 * tolerate replaying old journals until we have to go active. Use front_journal as
1025 * our journaler attribute and complete */
1026 dout(4) << "Recovered journal " << jp.front << " in format " << front_journal->get_stream_format() << dendl;
1027 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1028 {
1029 Mutex::Locker l(mds->mds_lock);
1030 if (mds->is_daemon_stopping()) {
1031 return;
1032 }
1033 completion->complete(0);
1034 }
1035 } else {
1036 /* Hand off to reformat routine, which will ultimately set the
1037 * completion when it has done its thing */
1038 dout(1) << "Journal " << jp.front << " has old format "
1039 << front_journal->get_stream_format() << ", it will now be updated" << dendl;
1040 _reformat_journal(jp, front_journal, completion);
1041 }
1042}
1043
1044/**
1045 * Blocking rewrite of the journal to a new file, followed by
1046 * swap of journal pointer to point to the new one.
1047 *
1048 * We write the new journal to the 'back' journal from the JournalPointer,
1049 * swapping pointers to make that one the front journal only when we have
1050 * safely completed.
1051 */
1052void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journal, MDSInternalContextBase *completion)
1053{
1054 assert(!jp_in.is_null());
1055 assert(completion != NULL);
1056 assert(old_journal != NULL);
1057
1058 JournalPointer jp = jp_in;
1059
1060 /* Set JournalPointer.back to the location we will write the new journal */
1061 inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
1062 inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
1063 jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
1064 int write_result = jp.save(mds->objecter);
1065 assert(write_result == 0);
1066
1067 /* Create the new Journaler file */
1068 Journaler *new_journal = new Journaler("mdlog", jp.back,
1069 mds->mdsmap->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter, logger, l_mdl_jlat, mds->finisher);
1070 dout(4) << "Writing new journal header " << jp.back << dendl;
1071 file_layout_t new_layout = old_journal->get_layout();
1072 new_journal->set_writeable();
1073 new_journal->create(&new_layout, g_conf->mds_journal_format);
1074
1075 /* Write the new journal header to RADOS */
1076 C_SaferCond write_head_wait;
1077 new_journal->write_head(&write_head_wait);
1078 write_head_wait.wait();
1079
1080 // Read in the old journal, and whenever we have readable events,
1081 // write them to the new journal.
1082 int r = 0;
1083
1084 // In old format journals before event_seq was introduced, the serialized
1085 // offset of a SubtreeMap message in the log is used as the unique ID for
1086 // a log segment. Because we change serialization, this will end up changing
1087 // for us, so we have to explicitly update the fields that point back to that
1088 // log segment.
1089 std::map<log_segment_seq_t, log_segment_seq_t> segment_pos_rewrite;
1090
 1091 // The logic in here, borrowed from replay_thread, expects mds_lock to be held,
1092 // e.g. between checking readable and doing wait_for_readable so that journaler
1093 // state doesn't change in between.
1094 uint32_t events_transcribed = 0;
1095 while (1) {
1096 while (!old_journal->is_readable() &&
1097 old_journal->get_read_pos() < old_journal->get_write_pos() &&
1098 !old_journal->get_error()) {
1099
1100 // Issue a journal prefetch
1101 C_SaferCond readable_waiter;
1102 old_journal->wait_for_readable(&readable_waiter);
1103
1104 // Wait for a journal prefetch to complete
1105 readable_waiter.wait();
1106 }
1107 if (old_journal->get_error()) {
1108 r = old_journal->get_error();
1109 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1110 break;
1111 }
1112
1113 if (!old_journal->is_readable() &&
1114 old_journal->get_read_pos() == old_journal->get_write_pos())
1115 break;
1116
1117 // Read one serialized LogEvent
1118 assert(old_journal->is_readable());
1119 bufferlist bl;
1120 uint64_t le_pos = old_journal->get_read_pos();
1121 bool r = old_journal->try_read_entry(bl);
1122 if (!r && old_journal->get_error())
1123 continue;
1124 assert(r);
1125
1126 // Update segment_pos_rewrite
1127 LogEvent *le = LogEvent::decode(bl);
1128 if (le) {
1129 bool modified = false;
1130
1131 if (le->get_type() == EVENT_SUBTREEMAP ||
1132 le->get_type() == EVENT_RESETJOURNAL) {
1133 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1134 if (sle == NULL || sle->event_seq == 0) {
1135 // A non-explicit event seq: the effective sequence number
 1136 // of this segment is its position in the old journal and
1137 // the new effective sequence number will be its position
1138 // in the new journal.
1139 segment_pos_rewrite[le_pos] = new_journal->get_write_pos();
1140 dout(20) << __func__ << " discovered segment seq mapping "
1141 << le_pos << " -> " << new_journal->get_write_pos() << dendl;
1142 }
1143 } else {
1144 event_seq++;
1145 }
1146
1147 // Rewrite segment references if necessary
1148 EMetaBlob *blob = le->get_metablob();
1149 if (blob) {
1150 modified = blob->rewrite_truncate_finish(mds, segment_pos_rewrite);
1151 }
1152
1153 // Zero-out expire_pos in subtreemap because offsets have changed
1154 // (expire_pos is just an optimization so it's safe to eliminate it)
1155 if (le->get_type() == EVENT_SUBTREEMAP
1156 || le->get_type() == EVENT_SUBTREEMAP_TEST) {
1157 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1158 assert(sle != NULL);
1159 dout(20) << __func__ << " zeroing expire_pos in subtreemap event at "
1160 << le_pos << " seq=" << sle->event_seq << dendl;
1161 sle->expire_pos = 0;
1162 modified = true;
1163 }
1164
1165 if (modified) {
1166 bl.clear();
1167 le->encode_with_header(bl, mds->mdsmap->get_up_features());
1168 }
1169
1170 delete le;
1171 } else {
1172 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1173 // not validate the contents, so pass it through.
1174 dout(1) << __func__ << " transcribing un-decodable LogEvent at old position "
1175 << old_journal->get_read_pos() << ", new position " << new_journal->get_write_pos()
1176 << dendl;
1177 }
1178
1179 // Write (buffered, synchronous) one serialized LogEvent
1180 events_transcribed += 1;
1181 new_journal->append_entry(bl);
1182 }
1183
1184 dout(1) << "Transcribed " << events_transcribed << " events, flushing new journal" << dendl;
1185 C_SaferCond flush_waiter;
1186 new_journal->flush(&flush_waiter);
1187 flush_waiter.wait();
1188
 1189 // If we failed to rewrite the journal, leave the partly-written journal
 1190 // as garbage to be cleaned up on the next startup.
1191 assert(r == 0);
1192
1193 /* Now that the new journal is safe, we can flip the pointers */
1194 inodeno_t const tmp = jp.front;
1195 jp.front = jp.back;
1196 jp.back = tmp;
1197 write_result = jp.save(mds->objecter);
1198 assert(write_result == 0);
1199
1200 /* Delete the old journal to free space */
1201 dout(1) << "New journal flushed, erasing old journal" << dendl;
1202 C_SaferCond erase_waiter;
1203 old_journal->erase(&erase_waiter);
1204 int erase_result = erase_waiter.wait();
1205 assert(erase_result == 0);
1206 {
1207 Mutex::Locker l(mds->mds_lock);
1208 if (mds->is_daemon_stopping()) {
1209 delete new_journal;
1210 return;
1211 }
1212 assert(journaler == old_journal);
1213 journaler = NULL;
1214 delete old_journal;
1215 }
1216
1217 /* Update the pointer to reflect we're back in clean single journal state. */
1218 jp.back = 0;
1219 write_result = jp.save(mds->objecter);
1220 assert(write_result == 0);
1221
1222 /* Reset the Journaler object to its default state */
1223 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl;
1224 {
1225 Mutex::Locker l(mds->mds_lock);
1226 if (mds->is_daemon_stopping()) {
1227 delete new_journal;
1228 return;
1229 }
1230 journaler = new_journal;
1231 journaler->set_readonly();
1232 journaler->set_write_error_handler(new C_MDL_WriteError(this));
1233 }
1234
1235 /* Trigger completion */
1236 {
1237 Mutex::Locker l(mds->mds_lock);
1238 if (mds->is_daemon_stopping()) {
1239 return;
1240 }
1241 completion->complete(0);
1242 }
1243}
1244
1245
1246// i am a separate thread
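// Read events one at a time from the Journaler, creating a LogSegment for
// each subtree map encountered and replaying every event into the cache,
// until the read position catches up with the write position.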
1247void MDLog::_replay_thread()
1248{
1249 dout(10) << "_replay_thread start" << dendl;
1250
1251 // loop
1252 int r = 0;
1253 while (1) {
1254 // wait for read?
1255 while (!journaler->is_readable() &&
1256 journaler->get_read_pos() < journaler->get_write_pos() &&
1257 !journaler->get_error()) {
1258 C_SaferCond readable_waiter;
1259 journaler->wait_for_readable(&readable_waiter);
1260 r = readable_waiter.wait();
1261 }
1262 if (journaler->get_error()) {
1263 r = journaler->get_error();
1264 dout(0) << "_replay journaler got error " << r << ", aborting" << dendl;
1265 if (r == -ENOENT) {
1266 if (mds->is_standby_replay()) {
1267 // journal has been trimmed by somebody else
1268 r = -EAGAIN;
1269 } else {
1270 mds->clog->error() << "missing journal object";
1271 mds->damaged_unlocked();
1272 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1273 }
1274 } else if (r == -EINVAL) {
1275 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1276 // this should only happen if you're following somebody else
1277 if(journaler->is_readonly()) {
1278 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1279 r = -EAGAIN;
1280 } else {
1281 mds->clog->error() << "invalid journaler offsets";
1282 mds->damaged_unlocked();
1283 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1284 }
1285 } else {
1286 /* re-read head and check it
1287 * Given that replay happens in a separate thread and
1288 * the MDS is going to either shut down or restart when
1289 * we return this error, doing it synchronously is fine
1290 * -- as long as we drop the main mds lock--. */
1291 C_SaferCond reread_fin;
1292 journaler->reread_head(&reread_fin);
1293 int err = reread_fin.wait();
1294 if (err) {
1295 if (err == -ENOENT && mds->is_standby_replay()) {
1296 r = -EAGAIN;
1297 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1298 << dendl;
1299 break;
1300 } else {
1301 dout(0) << "got error while reading head: " << cpp_strerror(err)
1302 << dendl;
1303
1304 mds->clog->error() << "error reading journal header";
1305 mds->damaged_unlocked();
1306 ceph_abort(); // Should be unreachable because damaged() calls
1307 // respawn()
1308 }
1309 }
1310 standby_trim_segments();
1311 if (journaler->get_read_pos() < journaler->get_expire_pos()) {
1312 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl;
1313 r = -EAGAIN;
1314 }
1315 }
1316 }
1317 break;
1318 }
1319
1320 if (!journaler->is_readable() &&
1321 journaler->get_read_pos() == journaler->get_write_pos())
1322 break;
1323
1324 assert(journaler->is_readable() || mds->is_daemon_stopping());
1325
1326 // read it
1327 uint64_t pos = journaler->get_read_pos();
1328 bufferlist bl;
1329 bool r = journaler->try_read_entry(bl);
1330 if (!r && journaler->get_error())
1331 continue;
1332 assert(r);
1333
1334 // unpack event
1335 LogEvent *le = LogEvent::decode(bl);
1336 if (!le) {
1337 dout(0) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1338 << " -- unable to decode event" << dendl;
1339 dout(0) << "dump of unknown or corrupt event:\n";
1340 bl.hexdump(*_dout);
1341 *_dout << dendl;
1342
1343 mds->clog->error() << "corrupt journal event at " << pos << "~"
1344 << bl.length() << " / "
1345 << journaler->get_write_pos();
1346 if (g_conf->mds_log_skip_corrupt_events) {
1347 continue;
1348 } else {
1349 mds->damaged_unlocked();
1350 ceph_abort(); // Should be unreachable because damaged() calls
1351 // respawn()
1352 }
1353
1354 }
1355 le->set_start_off(pos);
1356
1357 // new segment?
1358 if (le->get_type() == EVENT_SUBTREEMAP ||
1359 le->get_type() == EVENT_RESETJOURNAL) {
1360 ESubtreeMap *sle = dynamic_cast<ESubtreeMap*>(le);
1361 if (sle && sle->event_seq > 0)
1362 event_seq = sle->event_seq;
1363 else
1364 event_seq = pos;
1365 segments[event_seq] = new LogSegment(event_seq, pos);
1366 logger->set(l_mdl_seg, segments.size());
1367 } else {
1368 event_seq++;
1369 }
1370
1371 // have we seen an import map yet?
1372 if (segments.empty()) {
1373 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1374 << " " << le->get_stamp() << " -- waiting for subtree_map. (skipping " << *le << ")" << dendl;
1375 } else {
1376 dout(10) << "_replay " << pos << "~" << bl.length() << " / " << journaler->get_write_pos()
1377 << " " << le->get_stamp() << ": " << *le << dendl;
1378 le->_segment = get_current_segment(); // replay may need this
1379 le->_segment->num_events++;
1380 le->_segment->end = journaler->get_read_pos();
1381 num_events++;
1382
1383 {
1384 Mutex::Locker l(mds->mds_lock);
1385 if (mds->is_daemon_stopping()) {
1386 return;
1387 }
1388 logger->inc(l_mdl_replayed);
1389 le->replay(mds);
1390 }
1391 }
1392 delete le;
1393
1394 logger->set(l_mdl_rdpos, pos);
1395 }
1396
1397 // done!
1398 if (r == 0) {
1399 assert(journaler->get_read_pos() == journaler->get_write_pos());
1400 dout(10) << "_replay - complete, " << num_events
1401 << " events" << dendl;
1402
1403 logger->set(l_mdl_expos, journaler->get_expire_pos());
1404 }
1405
1406 safe_pos = journaler->get_write_safe_pos();
1407
1408 dout(10) << "_replay_thread kicking waiters" << dendl;
1409 {
1410 Mutex::Locker l(mds->mds_lock);
1411 if (mds->is_daemon_stopping()) {
1412 return;
1413 }
1414 finish_contexts(g_ceph_context, waitfor_replay, r);
1415 }
1416
1417 dout(10) << "_replay_thread finish" << dendl;
1418}
1419
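// In standby replay, drop segments that the active MDS has already expired
// (those wholly below the journal expire_pos), always keeping at least the
// newest segment, and trim the cache if anything was removed.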
1420void MDLog::standby_trim_segments()
1421{
1422 dout(10) << "standby_trim_segments" << dendl;
1423 uint64_t expire_pos = journaler->get_expire_pos();
1424 dout(10) << " expire_pos=" << expire_pos << dendl;
1425 bool removed_segment = false;
1426 while (have_any_segments()) {
1427 LogSegment *seg = get_oldest_segment();
1428 dout(10) << " segment seq=" << seg->seq << " " << seg->offset <<
1429 "~" << seg->end - seg->offset << dendl;
1430
1431 if (seg->end > expire_pos) {
1432 dout(10) << " won't remove, not expired!" << dendl;
1433 break;
1434 }
1435
1436 if (segments.size() == 1) {
1437 dout(10) << " won't remove, last segment!" << dendl;
1438 break;
1439 }
1440
1441 dout(10) << " removing segment" << dendl;
1442 mds->mdcache->standby_trim_segment(seg);
1443 remove_oldest_segment();
1444 removed_segment = true;
1445 }
1446
1447 if (removed_segment) {
1448 dout(20) << " calling mdcache->trim!" << dendl;
1449 mds->mdcache->trim(-1);
1450 } else {
1451 dout(20) << " removed no segments!" << dendl;
1452 }
1453}
1454
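// Summarize replay progress (journal read/write/expire positions plus event
// and segment counts) into the given Formatter.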
1455void MDLog::dump_replay_status(Formatter *f) const
1456{
1457 f->open_object_section("replay_status");
1458 f->dump_unsigned("journal_read_pos", journaler ? journaler->get_read_pos() : 0);
1459 f->dump_unsigned("journal_write_pos", journaler ? journaler->get_write_pos() : 0);
1460 f->dump_unsigned("journal_expire_pos", journaler ? journaler->get_expire_pos() : 0);
1461 f->dump_unsigned("num_events", get_num_events());
1462 f->dump_unsigned("num_segments", get_num_segments());
1463 f->close_section();
1464}