1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
19 #include "MDSContext.h"
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
28 #include "events/ESubtreeMap.h"
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/assert.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
42 if (journaler
) { delete journaler
; journaler
= 0; }
44 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
51 void MDLog::create_logger()
53 PerfCountersBuilder
plb(g_ceph_context
, "mds_log", l_mdl_first
, l_mdl_last
);
55 plb
.add_u64_counter(l_mdl_evadd
, "evadd",
56 "Events submitted", "subm");
57 plb
.add_u64_counter(l_mdl_evex
, "evex", "Total expired events");
58 plb
.add_u64_counter(l_mdl_evtrm
, "evtrm", "Trimmed events");
59 plb
.add_u64(l_mdl_ev
, "ev",
61 plb
.add_u64(l_mdl_evexg
, "evexg", "Expiring events");
62 plb
.add_u64(l_mdl_evexd
, "evexd", "Current expired events");
64 plb
.add_u64_counter(l_mdl_segadd
, "segadd", "Segments added");
65 plb
.add_u64_counter(l_mdl_segex
, "segex", "Total expired segments");
66 plb
.add_u64_counter(l_mdl_segtrm
, "segtrm", "Trimmed segments");
67 plb
.add_u64(l_mdl_seg
, "seg",
69 plb
.add_u64(l_mdl_segexg
, "segexg", "Expiring segments");
70 plb
.add_u64(l_mdl_segexd
, "segexd", "Current expired segments");
72 plb
.add_u64(l_mdl_expos
, "expos", "Journaler xpire position");
73 plb
.add_u64(l_mdl_wrpos
, "wrpos", "Journaler write position");
74 plb
.add_u64(l_mdl_rdpos
, "rdpos", "Journaler read position");
75 plb
.add_time_avg(l_mdl_jlat
, "jlat", "Journaler flush latency");
77 plb
.add_u64_counter(l_mdl_replayed
, "replayed", "Events replayed");
80 logger
= plb
.create_perf_counters();
81 g_ceph_context
->get_perfcounters_collection()->add(logger
);
84 void MDLog::set_write_iohint(unsigned iohint_flags
)
86 journaler
->set_write_iohint(iohint_flags
);
89 class C_MDL_WriteError
: public MDSIOContextBase
{
92 MDSRank
*get_mds() override
{return mdlog
->mds
;}
94 void finish(int r
) override
{
95 MDSRank
*mds
= get_mds();
96 // assume journal is reliable, so don't choose action based on
97 // g_conf->mds_action_on_write_error.
98 if (r
== -EBLACKLISTED
) {
99 derr
<< "we have been blacklisted (fenced), respawning..." << dendl
;
102 derr
<< "unhandled error " << cpp_strerror(r
) << ", shutting down..." << dendl
;
103 // Although it's possible that this could be something transient,
104 // it's severe and scary, so disable this rank until an administrator
106 mds
->clog
->error() << "Unhandled journal write error on MDS rank " <<
107 mds
->get_nodeid() << ": " << cpp_strerror(r
) << ", shutting down.";
109 ceph_abort(); // damaged should never return
114 explicit C_MDL_WriteError(MDLog
*m
) : mdlog(m
) {}
118 void MDLog::write_head(MDSInternalContextBase
*c
)
122 fin
= new C_IO_Wrapper(mds
, c
);
124 journaler
->write_head(fin
);
127 uint64_t MDLog::get_read_pos() const
129 return journaler
->get_read_pos();
132 uint64_t MDLog::get_write_pos() const
134 return journaler
->get_write_pos();
137 uint64_t MDLog::get_safe_pos() const
139 return journaler
->get_write_safe_pos();
144 void MDLog::create(MDSInternalContextBase
*c
)
146 dout(5) << "create empty log" << dendl
;
148 C_GatherBuilder
gather(g_ceph_context
);
149 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
150 // XXX but should maybe that be handled inside Journaler?
151 gather
.set_finisher(new C_IO_Wrapper(mds
, c
));
153 // The inode of the default Journaler we will create
154 ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
156 // Instantiate Journaler and start async write to RADOS
157 assert(journaler
== NULL
);
158 journaler
= new Journaler("mdlog", ino
, mds
->mdsmap
->get_metadata_pool(),
159 CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
,
160 l_mdl_jlat
, mds
->finisher
);
161 assert(journaler
->is_readonly());
162 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
163 journaler
->set_writeable();
164 journaler
->create(&mds
->mdcache
->default_log_layout
, g_conf
->mds_journal_format
);
165 journaler
->write_head(gather
.new_sub());
167 // Async write JournalPointer to RADOS
168 JournalPointer
jp(mds
->get_nodeid(), mds
->mdsmap
->get_metadata_pool());
171 jp
.save(mds
->objecter
, gather
.new_sub());
175 logger
->set(l_mdl_expos
, journaler
->get_expire_pos());
176 logger
->set(l_mdl_wrpos
, journaler
->get_write_pos());
178 submit_thread
.create("md_submit");
181 void MDLog::open(MDSInternalContextBase
*c
)
183 dout(5) << "open discovering log bounds" << dendl
;
185 assert(!recovery_thread
.is_started());
186 recovery_thread
.set_completion(c
);
187 recovery_thread
.create("md_recov_open");
189 submit_thread
.create("md_submit");
190 // either append() or replay() will follow.
194 * Final part of reopen() procedure, after recovery_thread
195 * has done its thing we call append()
197 class C_ReopenComplete
: public MDSInternalContext
{
199 MDSInternalContextBase
*on_complete
;
201 C_ReopenComplete(MDLog
*mdlog_
, MDSInternalContextBase
*on_complete_
) : MDSInternalContext(mdlog_
->mds
), mdlog(mdlog_
), on_complete(on_complete_
) {}
202 void finish(int r
) override
{
204 on_complete
->complete(r
);
209 * Given that open() has been called in the past, go through the journal
210 * recovery procedure again, potentially reformatting the journal if it
211 * was in an old format.
213 void MDLog::reopen(MDSInternalContextBase
*c
)
215 dout(5) << "reopen" << dendl
;
217 // Because we will call append() at the completion of this, check that we have already
218 // read the whole journal.
219 assert(journaler
!= NULL
);
220 assert(journaler
->get_read_pos() == journaler
->get_write_pos());
225 // recovery_thread was started at some point in the past. Although
226 // it has called it's completion if we made it back here, it might
227 // still not have been cleaned up: join it.
228 recovery_thread
.join();
230 recovery_thread
.set_completion(new C_ReopenComplete(this, c
));
231 recovery_thread
.create("md_recov_reopen");
236 dout(5) << "append positioning at end and marking writeable" << dendl
;
237 journaler
->set_read_pos(journaler
->get_write_pos());
238 journaler
->set_expire_pos(journaler
->get_write_pos());
240 journaler
->set_writeable();
242 logger
->set(l_mdl_expos
, journaler
->get_write_pos());
247 // -------------------------------------------------
249 void MDLog::_start_entry(LogEvent
*e
)
251 assert(submit_mutex
.is_locked_by_me());
253 assert(cur_event
== NULL
);
258 EMetaBlob
*metablob
= e
->get_metablob();
260 metablob
->event_seq
= event_seq
;
261 metablob
->last_subtree_map
= get_last_segment_seq();
265 void MDLog::cancel_entry(LogEvent
*le
)
267 assert(le
== cur_event
);
272 void MDLog::_submit_entry(LogEvent
*le
, MDSLogContextBase
*c
)
274 assert(submit_mutex
.is_locked_by_me());
275 assert(!mds
->is_any_replay());
278 assert(le
== cur_event
);
281 // let the event register itself in the segment
282 assert(!segments
.empty());
283 LogSegment
*ls
= segments
.rbegin()->second
;
287 le
->update_segment();
288 le
->set_stamp(ceph_clock_now());
290 mdsmap_up_features
= mds
->mdsmap
->get_up_features();
291 pending_events
[ls
->seq
].push_back(PendingEvent(le
, c
));
295 logger
->inc(l_mdl_evadd
);
296 logger
->set(l_mdl_ev
, num_events
);
301 uint64_t period
= journaler
->get_layout_period();
302 // start a new segment?
303 if (le
->get_type() == EVENT_SUBTREEMAP
||
304 (le
->get_type() == EVENT_IMPORTFINISH
&& mds
->is_resolve())) {
305 // avoid infinite loop when ESubtreeMap is very large.
306 // do not insert ESubtreeMap among EImportFinish events that finish
307 // disambiguate imports. Because the ESubtreeMap reflects the subtree
308 // state when all EImportFinish events are replayed.
309 } else if (ls
->end
/period
!= ls
->offset
/period
||
310 ls
->num_events
>= g_conf
->mds_log_events_per_segment
) {
311 dout(10) << "submit_entry also starting new segment: last = "
312 << ls
->seq
<< "/" << ls
->offset
<< ", event seq = " << event_seq
<< dendl
;
313 _start_new_segment();
314 } else if (g_conf
->mds_debug_subtrees
&&
315 le
->get_type() != EVENT_SUBTREEMAP_TEST
) {
316 // debug: journal this every time to catch subtree replay bugs.
317 // use a different event id so it doesn't get interpreted as a
318 // LogSegment boundary on replay.
319 LogEvent
*sle
= mds
->mdcache
->create_subtree_map();
320 sle
->set_type(EVENT_SUBTREEMAP_TEST
);
321 _submit_entry(sle
, NULL
);
326 * Invoked on the flush after each entry submitted
328 class C_MDL_Flushed
: public MDSLogContextBase
{
331 MDSRank
*get_mds() override
{return mdlog
->mds
;}
332 MDSInternalContextBase
*wrapped
;
334 void finish(int r
) override
{
336 wrapped
->complete(r
);
340 C_MDL_Flushed(MDLog
*m
, MDSInternalContextBase
*w
)
341 : mdlog(m
), wrapped(w
) {}
342 C_MDL_Flushed(MDLog
*m
, uint64_t wp
) : mdlog(m
), wrapped(NULL
) {
347 void MDLog::_submit_thread()
349 dout(10) << "_submit_thread start" << dendl
;
353 while (!mds
->is_daemon_stopping()) {
354 if (g_conf
->mds_log_pause
) {
355 submit_cond
.Wait(submit_mutex
);
359 map
<uint64_t,list
<PendingEvent
> >::iterator it
= pending_events
.begin();
360 if (it
== pending_events
.end()) {
361 submit_cond
.Wait(submit_mutex
);
365 if (it
->second
.empty()) {
366 pending_events
.erase(it
);
370 int64_t features
= mdsmap_up_features
;
371 PendingEvent data
= it
->second
.front();
372 it
->second
.pop_front();
374 submit_mutex
.Unlock();
377 LogEvent
*le
= data
.le
;
378 LogSegment
*ls
= le
->_segment
;
379 // encode it, with event type
381 le
->encode_with_header(bl
, features
);
383 uint64_t write_pos
= journaler
->get_write_pos();
385 le
->set_start_off(write_pos
);
386 if (le
->get_type() == EVENT_SUBTREEMAP
)
387 ls
->offset
= write_pos
;
389 dout(5) << "_submit_thread " << write_pos
<< "~" << bl
.length()
390 << " : " << *le
<< dendl
;
393 const uint64_t new_write_pos
= journaler
->append_entry(bl
); // bl is destroyed.
394 ls
->end
= new_write_pos
;
396 MDSLogContextBase
*fin
;
398 fin
= dynamic_cast<MDSLogContextBase
*>(data
.fin
);
400 fin
->set_write_pos(new_write_pos
);
402 fin
= new C_MDL_Flushed(this, new_write_pos
);
405 journaler
->wait_for_flush(fin
);
411 logger
->set(l_mdl_wrpos
, ls
->end
);
416 MDSInternalContextBase
* fin
=
417 dynamic_cast<MDSInternalContextBase
*>(data
.fin
);
419 C_MDL_Flushed
*fin2
= new C_MDL_Flushed(this, fin
);
420 fin2
->set_write_pos(journaler
->get_write_pos());
421 journaler
->wait_for_flush(fin2
);
434 submit_mutex
.Unlock();
437 void MDLog::wait_for_safe(MDSInternalContextBase
*c
)
441 bool no_pending
= true;
442 if (!pending_events
.empty()) {
443 pending_events
.rbegin()->second
.push_back(PendingEvent(NULL
, c
));
445 submit_cond
.Signal();
448 submit_mutex
.Unlock();
451 journaler
->wait_for_flush(new C_IO_Wrapper(mds
, c
));
458 bool do_flush
= unflushed
> 0;
460 if (!pending_events
.empty()) {
461 pending_events
.rbegin()->second
.push_back(PendingEvent(NULL
, NULL
, true));
463 submit_cond
.Signal();
466 submit_mutex
.Unlock();
472 void MDLog::kick_submitter()
474 Mutex::Locker
l(submit_mutex
);
475 submit_cond
.Signal();
480 dout(5) << "cap" << dendl
;
484 void MDLog::shutdown()
486 assert(mds
->mds_lock
.is_locked_by_me());
488 dout(5) << "shutdown" << dendl
;
489 if (submit_thread
.is_started()) {
490 assert(mds
->is_daemon_stopping());
492 if (submit_thread
.am_self()) {
493 // Called suicide from the thread: trust it to do no work after
494 // returning from suicide, and subsequently respect mds->is_daemon_stopping()
495 // and fall out of its loop.
497 mds
->mds_lock
.Unlock();
498 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
499 // picking it up will do anything with it.
502 submit_cond
.Signal();
503 submit_mutex
.Unlock();
505 mds
->mds_lock
.Lock();
507 submit_thread
.join();
511 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
512 // so we need to shutdown the journaler first.
514 journaler
->shutdown();
517 if (replay_thread
.is_started() && !replay_thread
.am_self()) {
518 mds
->mds_lock
.Unlock();
519 replay_thread
.join();
520 mds
->mds_lock
.Lock();
523 if (recovery_thread
.is_started() && !recovery_thread
.am_self()) {
524 mds
->mds_lock
.Unlock();
525 recovery_thread
.join();
526 mds
->mds_lock
.Lock();
531 // -----------------------------
534 void MDLog::_start_new_segment()
536 _prepare_new_segment();
537 _journal_segment_subtree_map(NULL
);
540 void MDLog::_prepare_new_segment()
542 assert(submit_mutex
.is_locked_by_me());
544 uint64_t seq
= event_seq
+ 1;
545 dout(7) << __func__
<< " seq " << seq
<< dendl
;
547 segments
[seq
] = new LogSegment(seq
);
549 logger
->inc(l_mdl_segadd
);
550 logger
->set(l_mdl_seg
, segments
.size());
552 // Adjust to next stray dir
553 dout(10) << "Advancing to next stray directory on mds " << mds
->get_nodeid()
555 mds
->mdcache
->advance_stray();
558 void MDLog::_journal_segment_subtree_map(MDSInternalContextBase
*onsync
)
560 assert(submit_mutex
.is_locked_by_me());
562 dout(7) << __func__
<< dendl
;
563 ESubtreeMap
*sle
= mds
->mdcache
->create_subtree_map();
564 sle
->event_seq
= get_last_segment_seq();
566 _submit_entry(sle
, new C_MDL_Flushed(this, onsync
));
569 void MDLog::trim(int m
)
571 unsigned max_segments
= g_conf
->mds_log_max_segments
;
572 int max_events
= g_conf
->mds_log_max_events
;
576 if (mds
->mdcache
->is_readonly()) {
577 dout(10) << "trim, ignoring read-only FS" << dendl
;
581 // Clamp max_events to not be smaller than events per segment
582 if (max_events
> 0 && max_events
<= g_conf
->mds_log_events_per_segment
) {
583 max_events
= g_conf
->mds_log_events_per_segment
+ 1;
590 << segments
.size() << " / " << max_segments
<< " segments, "
591 << num_events
<< " / " << max_events
<< " events"
592 << ", " << expiring_segments
.size() << " (" << expiring_events
<< ") expiring"
593 << ", " << expired_segments
.size() << " (" << expired_events
<< ") expired"
596 if (segments
.empty()) {
597 submit_mutex
.Unlock();
601 // hack: only trim for a few seconds at a time
602 utime_t stop
= ceph_clock_now();
605 map
<uint64_t,LogSegment
*>::iterator p
= segments
.begin();
606 while (p
!= segments
.end() &&
608 num_events
- expiring_events
- expired_events
> max_events
) ||
609 (segments
.size() - expiring_segments
.size() - expired_segments
.size() > max_segments
))) {
611 if (stop
< ceph_clock_now())
614 int num_expiring_segments
= (int)expiring_segments
.size();
615 if (num_expiring_segments
>= g_conf
->mds_log_max_expiring
)
618 int op_prio
= CEPH_MSG_PRIO_LOW
+
619 (CEPH_MSG_PRIO_HIGH
- CEPH_MSG_PRIO_LOW
) *
620 num_expiring_segments
/ g_conf
->mds_log_max_expiring
;
622 // look at first segment
623 LogSegment
*ls
= p
->second
;
627 if (pending_events
.count(ls
->seq
) ||
628 ls
->end
> safe_pos
) {
629 dout(5) << "trim segment " << ls
->seq
<< "/" << ls
->offset
<< ", not fully flushed yet, safe "
630 << journaler
->get_write_safe_pos() << " < end " << ls
->end
<< dendl
;
633 if (expiring_segments
.count(ls
)) {
634 dout(5) << "trim already expiring segment " << ls
->seq
<< "/" << ls
->offset
635 << ", " << ls
->num_events
<< " events" << dendl
;
636 } else if (expired_segments
.count(ls
)) {
637 dout(5) << "trim already expired segment " << ls
->seq
<< "/" << ls
->offset
638 << ", " << ls
->num_events
<< " events" << dendl
;
640 assert(expiring_segments
.count(ls
) == 0);
641 expiring_segments
.insert(ls
);
642 expiring_events
+= ls
->num_events
;
643 submit_mutex
.Unlock();
645 uint64_t last_seq
= ls
->seq
;
646 try_expire(ls
, op_prio
);
649 p
= segments
.lower_bound(last_seq
+ 1);
653 // discard expired segments and unlock submit_mutex
654 _trim_expired_segments();
657 class C_MaybeExpiredSegment
: public MDSInternalContext
{
662 C_MaybeExpiredSegment(MDLog
*mdl
, LogSegment
*s
, int p
) :
663 MDSInternalContext(mdl
->mds
), mdlog(mdl
), ls(s
), op_prio(p
) {}
664 void finish(int res
) override
{
666 mdlog
->mds
->handle_write_error(res
);
667 mdlog
->_maybe_expired(ls
, op_prio
);
672 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
675 int MDLog::trim_all()
679 dout(10) << __func__
<< ": "
681 << "/" << expiring_segments
.size()
682 << "/" << expired_segments
.size() << dendl
;
684 uint64_t last_seq
= 0;
685 if (!segments
.empty())
686 last_seq
= get_last_segment_seq();
688 map
<uint64_t,LogSegment
*>::iterator p
= segments
.begin();
689 while (p
!= segments
.end() &&
690 p
->first
< last_seq
&& p
->second
->end
<= safe_pos
) {
691 LogSegment
*ls
= p
->second
;
694 // Caller should have flushed journaler before calling this
695 if (pending_events
.count(ls
->seq
)) {
696 dout(5) << __func__
<< ": segment " << ls
->seq
<< " has pending events" << dendl
;
697 submit_mutex
.Unlock();
701 if (expiring_segments
.count(ls
)) {
702 dout(5) << "trim already expiring segment " << ls
->seq
<< "/" << ls
->offset
703 << ", " << ls
->num_events
<< " events" << dendl
;
704 } else if (expired_segments
.count(ls
)) {
705 dout(5) << "trim already expired segment " << ls
->seq
<< "/" << ls
->offset
706 << ", " << ls
->num_events
<< " events" << dendl
;
708 assert(expiring_segments
.count(ls
) == 0);
709 expiring_segments
.insert(ls
);
710 expiring_events
+= ls
->num_events
;
711 submit_mutex
.Unlock();
713 uint64_t next_seq
= ls
->seq
+ 1;
714 try_expire(ls
, CEPH_MSG_PRIO_DEFAULT
);
717 p
= segments
.lower_bound(next_seq
);
721 _trim_expired_segments();
727 void MDLog::try_expire(LogSegment
*ls
, int op_prio
)
729 MDSGatherBuilder
gather_bld(g_ceph_context
);
730 ls
->try_to_expire(mds
, gather_bld
, op_prio
);
732 if (gather_bld
.has_subs()) {
733 dout(5) << "try_expire expiring segment " << ls
->seq
<< "/" << ls
->offset
<< dendl
;
734 gather_bld
.set_finisher(new C_MaybeExpiredSegment(this, ls
, op_prio
));
735 gather_bld
.activate();
737 dout(10) << "try_expire expired segment " << ls
->seq
<< "/" << ls
->offset
<< dendl
;
739 assert(expiring_segments
.count(ls
));
740 expiring_segments
.erase(ls
);
741 expiring_events
-= ls
->num_events
;
743 submit_mutex
.Unlock();
746 logger
->set(l_mdl_segexg
, expiring_segments
.size());
747 logger
->set(l_mdl_evexg
, expiring_events
);
750 void MDLog::_maybe_expired(LogSegment
*ls
, int op_prio
)
752 if (mds
->mdcache
->is_readonly()) {
753 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl
;
757 dout(10) << "_maybe_expired segment " << ls
->seq
<< "/" << ls
->offset
758 << ", " << ls
->num_events
<< " events" << dendl
;
759 try_expire(ls
, op_prio
);
762 void MDLog::_trim_expired_segments()
764 assert(submit_mutex
.is_locked_by_me());
766 // trim expired segments?
767 bool trimmed
= false;
768 while (!segments
.empty()) {
769 LogSegment
*ls
= segments
.begin()->second
;
770 if (!expired_segments
.count(ls
)) {
771 dout(10) << "_trim_expired_segments waiting for " << ls
->seq
<< "/" << ls
->offset
772 << " to expire" << dendl
;
776 dout(10) << "_trim_expired_segments trimming expired "
777 << ls
->seq
<< "/0x" << std::hex
<< ls
->offset
<< std::dec
<< dendl
;
778 expired_events
-= ls
->num_events
;
779 expired_segments
.erase(ls
);
780 num_events
-= ls
->num_events
;
782 // this was the oldest segment, adjust expire pos
783 if (journaler
->get_expire_pos() < ls
->end
) {
784 journaler
->set_expire_pos(ls
->end
);
787 logger
->set(l_mdl_expos
, ls
->offset
);
788 logger
->inc(l_mdl_segtrm
);
789 logger
->inc(l_mdl_evtrm
, ls
->num_events
);
791 segments
.erase(ls
->seq
);
796 submit_mutex
.Unlock();
799 journaler
->write_head(0);
802 void MDLog::trim_expired_segments()
805 _trim_expired_segments();
808 void MDLog::_expired(LogSegment
*ls
)
810 assert(submit_mutex
.is_locked_by_me());
812 dout(5) << "_expired segment " << ls
->seq
<< "/" << ls
->offset
813 << ", " << ls
->num_events
<< " events" << dendl
;
815 if (!capped
&& ls
== peek_current_segment()) {
816 dout(5) << "_expired not expiring " << ls
->seq
<< "/" << ls
->offset
817 << ", last one and !capped" << dendl
;
820 expired_segments
.insert(ls
);
821 expired_events
+= ls
->num_events
;
823 // Trigger all waiters
824 for (std::list
<MDSInternalContextBase
*>::iterator i
= ls
->expiry_waiters
.begin();
825 i
!= ls
->expiry_waiters
.end(); ++i
) {
828 ls
->expiry_waiters
.clear();
830 logger
->inc(l_mdl_evex
, ls
->num_events
);
831 logger
->inc(l_mdl_segex
);
834 logger
->set(l_mdl_ev
, num_events
);
835 logger
->set(l_mdl_evexd
, expired_events
);
836 logger
->set(l_mdl_seg
, segments
.size());
837 logger
->set(l_mdl_segexd
, expired_segments
.size());
842 void MDLog::replay(MDSInternalContextBase
*c
)
844 assert(journaler
->is_active());
845 assert(journaler
->is_readonly());
848 if (journaler
->get_read_pos() == journaler
->get_write_pos()) {
849 dout(10) << "replay - journal empty, done." << dendl
;
850 mds
->mdcache
->trim(-1);
859 waitfor_replay
.push_back(c
);
862 dout(10) << "replay start, from " << journaler
->get_read_pos()
863 << " to " << journaler
->get_write_pos() << dendl
;
865 assert(num_events
== 0 || already_replayed
);
866 if (already_replayed
) {
867 // Ensure previous instance of ReplayThread is joined before
868 // we create another one
869 replay_thread
.join();
871 already_replayed
= true;
873 replay_thread
.create("md_log_replay");
878 * Resolve the JournalPointer object to a journal file, and
879 * instantiate a Journaler object. This may re-write the journal
880 * if the journal in RADOS appears to be in an old format.
882 * This is a separate thread because of the way it is initialized from inside
883 * the mds lock, which is also the global objecter lock -- rather than split
884 * it up into hard-to-read async operations linked up by contexts,
886 * When this function completes, the `journaler` attribute will be set to
887 * a Journaler instance using the latest available serialization format.
889 void MDLog::_recovery_thread(MDSInternalContextBase
*completion
)
891 assert(journaler
== NULL
);
892 if (g_conf
->mds_journal_format
> JOURNAL_FORMAT_MAX
) {
893 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
894 << JOURNAL_FORMAT_MAX
<< dendl
;
896 // Oh dear, something unreadable in the store for this rank: require
897 // operator intervention.
899 ceph_abort(); // damaged should not return
902 // First, read the pointer object.
903 // If the pointer object is not present, then create it with
904 // front = default ino and back = null
905 JournalPointer
jp(mds
->get_nodeid(), mds
->mdsmap
->get_metadata_pool());
906 int const read_result
= jp
.load(mds
->objecter
);
907 if (read_result
== -ENOENT
) {
908 inodeno_t
const default_log_ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
909 jp
.front
= default_log_ino
;
910 int write_result
= jp
.save(mds
->objecter
);
911 // Nothing graceful we can do for this
912 assert(write_result
>= 0);
913 } else if (read_result
== -EBLACKLISTED
) {
914 derr
<< "Blacklisted during JournalPointer read! Respawning..." << dendl
;
916 ceph_abort(); // Should be unreachable because respawn calls execv
917 } else if (read_result
!= 0) {
918 mds
->clog
->error() << "failed to read JournalPointer: " << read_result
919 << " (" << cpp_strerror(read_result
) << ")";
920 mds
->damaged_unlocked();
921 ceph_abort(); // Should be unreachable because damaged() calls respawn()
924 // If the back pointer is non-null, that means that a journal
925 // rewrite failed part way through. Erase the back journal
928 if (mds
->is_standby_replay()) {
929 dout(1) << "Journal " << jp
.front
<< " is being rewritten, "
930 << "cannot replay in standby until an active MDS completes rewrite" << dendl
;
931 Mutex::Locker
l(mds
->mds_lock
);
932 if (mds
->is_daemon_stopping()) {
935 completion
->complete(-EAGAIN
);
938 dout(1) << "Erasing journal " << jp
.back
<< dendl
;
939 C_SaferCond erase_waiter
;
940 Journaler
back("mdlog", jp
.back
, mds
->mdsmap
->get_metadata_pool(),
941 CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
, l_mdl_jlat
,
944 // Read all about this journal (header + extents)
945 C_SaferCond recover_wait
;
946 back
.recover(&recover_wait
);
947 int recovery_result
= recover_wait
.wait();
948 if (recovery_result
== -EBLACKLISTED
) {
949 derr
<< "Blacklisted during journal recovery! Respawning..." << dendl
;
951 ceph_abort(); // Should be unreachable because respawn calls execv
952 } else if (recovery_result
!= 0) {
953 // Journaler.recover succeeds if no journal objects are present: an error
954 // means something worse like a corrupt header, which we can't handle here.
955 mds
->clog
->error() << "Error recovering journal " << jp
.front
<< ": "
956 << cpp_strerror(recovery_result
);
957 mds
->damaged_unlocked();
958 assert(recovery_result
== 0); // Unreachable because damaged() calls respawn()
961 // We could read journal, so we can erase it.
962 back
.erase(&erase_waiter
);
963 int erase_result
= erase_waiter
.wait();
965 // If we are successful, or find no data, we can update the JournalPointer to
966 // reflect that the back journal is gone.
967 if (erase_result
!= 0 && erase_result
!= -ENOENT
) {
968 derr
<< "Failed to erase journal " << jp
.back
<< ": " << cpp_strerror(erase_result
) << dendl
;
970 dout(1) << "Successfully erased journal, updating journal pointer" << dendl
;
972 int write_result
= jp
.save(mds
->objecter
);
973 // Nothing graceful we can do for this
974 assert(write_result
>= 0);
978 /* Read the header from the front journal */
979 Journaler
*front_journal
= new Journaler("mdlog", jp
.front
,
980 mds
->mdsmap
->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC
, mds
->objecter
,
981 logger
, l_mdl_jlat
, mds
->finisher
);
983 // Assign to ::journaler so that we can be aborted by ::shutdown while
984 // waiting for journaler recovery
986 Mutex::Locker
l(mds
->mds_lock
);
987 journaler
= front_journal
;
990 C_SaferCond recover_wait
;
991 front_journal
->recover(&recover_wait
);
992 dout(4) << "Waiting for journal " << jp
.front
<< " to recover..." << dendl
;
993 int recovery_result
= recover_wait
.wait();
994 dout(4) << "Journal " << jp
.front
<< " recovered." << dendl
;
996 if (recovery_result
== -EBLACKLISTED
) {
997 derr
<< "Blacklisted during journal recovery! Respawning..." << dendl
;
999 ceph_abort(); // Should be unreachable because respawn calls execv
1000 } else if (recovery_result
!= 0) {
1001 mds
->clog
->error() << "Error recovering journal " << jp
.front
<< ": "
1002 << cpp_strerror(recovery_result
);
1003 mds
->damaged_unlocked();
1004 assert(recovery_result
== 0); // Unreachable because damaged() calls respawn()
1007 /* Check whether the front journal format is acceptable or needs re-write */
1008 if (front_journal
->get_stream_format() > JOURNAL_FORMAT_MAX
) {
1009 dout(0) << "Journal " << jp
.front
<< " is in unknown format " << front_journal
->get_stream_format()
1010 << ", does this MDS daemon require upgrade?" << dendl
;
1012 Mutex::Locker
l(mds
->mds_lock
);
1013 if (mds
->is_daemon_stopping()) {
1015 delete front_journal
;
1018 completion
->complete(-EINVAL
);
1020 } else if (mds
->is_standby_replay() || front_journal
->get_stream_format() >= g_conf
->mds_journal_format
) {
1021 /* The journal is of configured format, or we are in standbyreplay and will
1022 * tolerate replaying old journals until we have to go active. Use front_journal as
1023 * our journaler attribute and complete */
1024 dout(4) << "Recovered journal " << jp
.front
<< " in format " << front_journal
->get_stream_format() << dendl
;
1025 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
1027 Mutex::Locker
l(mds
->mds_lock
);
1028 if (mds
->is_daemon_stopping()) {
1031 completion
->complete(0);
1034 /* Hand off to reformat routine, which will ultimately set the
1035 * completion when it has done its thing */
1036 dout(1) << "Journal " << jp
.front
<< " has old format "
1037 << front_journal
->get_stream_format() << ", it will now be updated" << dendl
;
1038 _reformat_journal(jp
, front_journal
, completion
);
1043 * Blocking rewrite of the journal to a new file, followed by
1044 * swap of journal pointer to point to the new one.
1046 * We write the new journal to the 'back' journal from the JournalPointer,
1047 * swapping pointers to make that one the front journal only when we have
1050 void MDLog::_reformat_journal(JournalPointer
const &jp_in
, Journaler
*old_journal
, MDSInternalContextBase
*completion
)
1052 assert(!jp_in
.is_null());
1053 assert(completion
!= NULL
);
1054 assert(old_journal
!= NULL
);
1056 JournalPointer jp
= jp_in
;
1058 /* Set JournalPointer.back to the location we will write the new journal */
1059 inodeno_t primary_ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
1060 inodeno_t secondary_ino
= MDS_INO_LOG_BACKUP_OFFSET
+ mds
->get_nodeid();
1061 jp
.back
= (jp
.front
== primary_ino
? secondary_ino
: primary_ino
);
1062 int write_result
= jp
.save(mds
->objecter
);
1063 assert(write_result
== 0);
1065 /* Create the new Journaler file */
1066 Journaler
*new_journal
= new Journaler("mdlog", jp
.back
,
1067 mds
->mdsmap
->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
, l_mdl_jlat
, mds
->finisher
);
1068 dout(4) << "Writing new journal header " << jp
.back
<< dendl
;
1069 file_layout_t new_layout
= old_journal
->get_layout();
1070 new_journal
->set_writeable();
1071 new_journal
->create(&new_layout
, g_conf
->mds_journal_format
);
1073 /* Write the new journal header to RADOS */
1074 C_SaferCond write_head_wait
;
1075 new_journal
->write_head(&write_head_wait
);
1076 write_head_wait
.wait();
1078 // Read in the old journal, and whenever we have readable events,
1079 // write them to the new journal.
1082 // In old format journals before event_seq was introduced, the serialized
1083 // offset of a SubtreeMap message in the log is used as the unique ID for
1084 // a log segment. Because we change serialization, this will end up changing
1085 // for us, so we have to explicitly update the fields that point back to that
1087 std::map
<log_segment_seq_t
, log_segment_seq_t
> segment_pos_rewrite
;
1089 // The logic in here borrowed from replay_thread expects mds_lock to be held,
1090 // e.g. between checking readable and doing wait_for_readable so that journaler
1091 // state doesn't change in between.
1092 uint32_t events_transcribed
= 0;
1094 while (!old_journal
->is_readable() &&
1095 old_journal
->get_read_pos() < old_journal
->get_write_pos() &&
1096 !old_journal
->get_error()) {
1098 // Issue a journal prefetch
1099 C_SaferCond readable_waiter
;
1100 old_journal
->wait_for_readable(&readable_waiter
);
1102 // Wait for a journal prefetch to complete
1103 readable_waiter
.wait();
1105 if (old_journal
->get_error()) {
1106 r
= old_journal
->get_error();
1107 dout(0) << "_replay journaler got error " << r
<< ", aborting" << dendl
;
1111 if (!old_journal
->is_readable() &&
1112 old_journal
->get_read_pos() == old_journal
->get_write_pos())
1115 // Read one serialized LogEvent
1116 assert(old_journal
->is_readable());
1118 uint64_t le_pos
= old_journal
->get_read_pos();
1119 bool r
= old_journal
->try_read_entry(bl
);
1120 if (!r
&& old_journal
->get_error())
1124 // Update segment_pos_rewrite
1125 LogEvent
*le
= LogEvent::decode(bl
);
1127 bool modified
= false;
1129 if (le
->get_type() == EVENT_SUBTREEMAP
||
1130 le
->get_type() == EVENT_RESETJOURNAL
) {
1131 ESubtreeMap
*sle
= dynamic_cast<ESubtreeMap
*>(le
);
1132 if (sle
== NULL
|| sle
->event_seq
== 0) {
1133 // A non-explicit event seq: the effective sequence number
1134 // of this segment is it's position in the old journal and
1135 // the new effective sequence number will be its position
1136 // in the new journal.
1137 segment_pos_rewrite
[le_pos
] = new_journal
->get_write_pos();
1138 dout(20) << __func__
<< " discovered segment seq mapping "
1139 << le_pos
<< " -> " << new_journal
->get_write_pos() << dendl
;
1145 // Rewrite segment references if necessary
1146 EMetaBlob
*blob
= le
->get_metablob();
1148 modified
= blob
->rewrite_truncate_finish(mds
, segment_pos_rewrite
);
1151 // Zero-out expire_pos in subtreemap because offsets have changed
1152 // (expire_pos is just an optimization so it's safe to eliminate it)
1153 if (le
->get_type() == EVENT_SUBTREEMAP
1154 || le
->get_type() == EVENT_SUBTREEMAP_TEST
) {
1155 ESubtreeMap
*sle
= dynamic_cast<ESubtreeMap
*>(le
);
1156 assert(sle
!= NULL
);
1157 dout(20) << __func__
<< " zeroing expire_pos in subtreemap event at "
1158 << le_pos
<< " seq=" << sle
->event_seq
<< dendl
;
1159 sle
->expire_pos
= 0;
1165 le
->encode_with_header(bl
, mds
->mdsmap
->get_up_features());
1170 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1171 // not validate the contents, so pass it through.
1172 dout(1) << __func__
<< " transcribing un-decodable LogEvent at old position "
1173 << old_journal
->get_read_pos() << ", new position " << new_journal
->get_write_pos()
1177 // Write (buffered, synchronous) one serialized LogEvent
1178 events_transcribed
+= 1;
1179 new_journal
->append_entry(bl
);
1182 dout(1) << "Transcribed " << events_transcribed
<< " events, flushing new journal" << dendl
;
1183 C_SaferCond flush_waiter
;
1184 new_journal
->flush(&flush_waiter
);
1185 flush_waiter
.wait();
1187 // If failed to rewrite journal, leave the part written journal
1188 // as garbage to be cleaned up next startup.
1191 /* Now that the new journal is safe, we can flip the pointers */
1192 inodeno_t
const tmp
= jp
.front
;
1195 write_result
= jp
.save(mds
->objecter
);
1196 assert(write_result
== 0);
1198 /* Delete the old journal to free space */
1199 dout(1) << "New journal flushed, erasing old journal" << dendl
;
1200 C_SaferCond erase_waiter
;
1201 old_journal
->erase(&erase_waiter
);
1202 int erase_result
= erase_waiter
.wait();
1203 assert(erase_result
== 0);
1205 Mutex::Locker
l(mds
->mds_lock
);
1206 if (mds
->is_daemon_stopping()) {
1210 assert(journaler
== old_journal
);
1215 /* Update the pointer to reflect we're back in clean single journal state. */
1217 write_result
= jp
.save(mds
->objecter
);
1218 assert(write_result
== 0);
1220 /* Reset the Journaler object to its default state */
1221 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl
;
1223 Mutex::Locker
l(mds
->mds_lock
);
1224 if (mds
->is_daemon_stopping()) {
1228 journaler
= new_journal
;
1229 journaler
->set_readonly();
1230 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
1233 /* Trigger completion */
1235 Mutex::Locker
l(mds
->mds_lock
);
1236 if (mds
->is_daemon_stopping()) {
1239 completion
->complete(0);
1244 // i am a separate thread
// Journal replay thread: reads serialized LogEvents from the journaler,
// decodes and replays them, and rebuilds the in-memory segment map as
// subtree-map events are encountered. Runs without the mds_lock except
// where explicitly taken below.
// NOTE(review): this extract is garbled -- gaps in the embedded original
// line numbers show that loop/brace structure and several early-exit
// paths are elided; comments below describe only what is visible.
1245 void MDLog::_replay_thread()
1247 dout(10) << "_replay_thread start" << dendl
;
// Block until the journaler has a readable entry, we catch up to the
// write position, or an error is reported.
1253 while (!journaler
->is_readable() &&
1254 journaler
->get_read_pos() < journaler
->get_write_pos() &&
1255 !journaler
->get_error()) {
1256 C_SaferCond readable_waiter
;
1257 journaler
->wait_for_readable(&readable_waiter
);
1258 r
= readable_waiter
.wait();
// Error handling: journaler-reported errors abort replay; some cases are
// survivable in standby-replay, otherwise the rank is marked damaged.
1260 if (journaler
->get_error()) {
1261 r
= journaler
->get_error();
1262 dout(0) << "_replay journaler got error " << r
<< ", aborting" << dendl
;
1264 if (mds
->is_standby_replay()) {
1265 // journal has been trimmed by somebody else
1268 mds
->clog
->error() << "missing journal object";
1269 mds
->damaged_unlocked();
1270 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1272 } else if (r
== -EINVAL
) {
1273 if (journaler
->get_read_pos() < journaler
->get_expire_pos()) {
1274 // this should only happen if you're following somebody else
1275 if(journaler
->is_readonly()) {
1276 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl
;
1279 mds
->clog
->error() << "invalid journaler offsets";
1280 mds
->damaged_unlocked();
1281 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1284 /* re-read head and check it
1285 * Given that replay happens in a separate thread and
1286 * the MDS is going to either shut down or restart when
1287 * we return this error, doing it synchronously is fine
1288 * -- as long as we drop the main mds lock--. */
1289 C_SaferCond reread_fin
;
1290 journaler
->reread_head(&reread_fin
);
1291 int err
= reread_fin
.wait();
// A vanished header is tolerated in standby-replay (the active MDS may
// have rewritten the journal); any other head-read error is fatal.
1293 if (err
== -ENOENT
&& mds
->is_standby_replay()) {
1295 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1299 dout(0) << "got error while reading head: " << cpp_strerror(err
)
1302 mds
->clog
->error() << "error reading journal header";
1303 mds
->damaged_unlocked();
1304 ceph_abort(); // Should be unreachable because damaged() calls
// Standby-replay housekeeping: drop segments the active MDS has expired.
1308 standby_trim_segments();
1309 if (journaler
->get_read_pos() < journaler
->get_expire_pos()) {
1310 dout(0) << "expire_pos is higher than read_pos, returning EAGAIN" << dendl
;
1318 if (!journaler
->is_readable() &&
1319 journaler
->get_read_pos() == journaler
->get_write_pos())
1322 assert(journaler
->is_readable() || mds
->is_daemon_stopping());
// Read one entry; note the inner 'bool r' shadows the outer error code.
1325 uint64_t pos
= journaler
->get_read_pos();
1327 bool r
= journaler
->try_read_entry(bl
);
1328 if (!r
&& journaler
->get_error())
// Decode; on failure either skip the event (if configured) or mark the
// rank damaged.
1333 LogEvent
*le
= LogEvent::decode(bl
);
1335 dout(0) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1336 << " -- unable to decode event" << dendl
;
1337 dout(0) << "dump of unknown or corrupt event:\n";
1341 mds
->clog
->error() << "corrupt journal event at " << pos
<< "~"
1342 << bl
.length() << " / "
1343 << journaler
->get_write_pos();
1344 if (g_conf
->mds_log_skip_corrupt_events
) {
1347 mds
->damaged_unlocked();
1348 ceph_abort(); // Should be unreachable because damaged() calls
1353 le
->set_start_off(pos
);
// Subtree-map (or journal-reset) events start a new LogSegment; pick up
// an explicit event_seq from the ESubtreeMap when present.
1356 if (le
->get_type() == EVENT_SUBTREEMAP
||
1357 le
->get_type() == EVENT_RESETJOURNAL
) {
1358 ESubtreeMap
*sle
= dynamic_cast<ESubtreeMap
*>(le
);
1359 if (sle
&& sle
->event_seq
> 0)
1360 event_seq
= sle
->event_seq
;
1363 segments
[event_seq
] = new LogSegment(event_seq
, pos
);
1364 logger
->set(l_mdl_seg
, segments
.size());
1369 // have we seen an import map yet?
1370 if (segments
.empty()) {
// No segment yet: events before the first subtree_map cannot be replayed
// and are skipped.
1371 dout(10) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1372 << " " << le
->get_stamp() << " -- waiting for subtree_map. (skipping " << *le
<< ")" << dendl
;
1374 dout(10) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1375 << " " << le
->get_stamp() << ": " << *le
<< dendl
;
// Attach the event to the current segment and advance its bookkeeping
// before replaying it.
1376 le
->_segment
= get_current_segment(); // replay may need this
1377 le
->_segment
->num_events
++;
1378 le
->_segment
->end
= journaler
->get_read_pos();
// Replay proper happens under the mds_lock (elided here); stop early if
// the daemon is shutting down.
1382 Mutex::Locker
l(mds
->mds_lock
);
1383 if (mds
->is_daemon_stopping()) {
1386 logger
->inc(l_mdl_replayed
);
1392 logger
->set(l_mdl_rdpos
, pos
);
// Loop done: read position must have reached the write position.
1397 assert(journaler
->get_read_pos() == journaler
->get_write_pos());
1398 dout(10) << "_replay - complete, " << num_events
1399 << " events" << dendl
;
1401 logger
->set(l_mdl_expos
, journaler
->get_expire_pos());
1404 safe_pos
= journaler
->get_write_safe_pos();
// Wake everyone waiting for replay to finish, under the mds_lock.
1406 dout(10) << "_replay_thread kicking waiters" << dendl
;
1408 Mutex::Locker
l(mds
->mds_lock
);
1409 if (mds
->is_daemon_stopping()) {
1412 finish_contexts(g_ceph_context
, waitfor_replay
, r
);
1415 dout(10) << "_replay_thread finish" << dendl
;
1418 void MDLog::standby_trim_segments()
1420 dout(10) << "standby_trim_segments" << dendl
;
1421 uint64_t expire_pos
= journaler
->get_expire_pos();
1422 dout(10) << " expire_pos=" << expire_pos
<< dendl
;
1423 bool removed_segment
= false;
1424 while (have_any_segments()) {
1425 LogSegment
*seg
= get_oldest_segment();
1426 dout(10) << " segment seq=" << seg
->seq
<< " " << seg
->offset
<<
1427 "~" << seg
->end
- seg
->offset
<< dendl
;
1429 if (seg
->end
> expire_pos
) {
1430 dout(10) << " won't remove, not expired!" << dendl
;
1434 if (segments
.size() == 1) {
1435 dout(10) << " won't remove, last segment!" << dendl
;
1439 dout(10) << " removing segment" << dendl
;
1440 mds
->mdcache
->standby_trim_segment(seg
);
1441 remove_oldest_segment();
1442 removed_segment
= true;
1445 if (removed_segment
) {
1446 dout(20) << " calling mdcache->trim!" << dendl
;
1447 mds
->mdcache
->trim(-1);
1449 dout(20) << " removed no segments!" << dendl
;
// Dump replay progress (journal positions and segment/event counts) into
// the given Formatter. Position fields report 0 when no journaler exists
// yet. NOTE(review): the function continues past this excerpt (its
// section close / closing brace are not visible here).
1453 void MDLog::dump_replay_status(Formatter
*f
) const
1455 f
->open_object_section("replay_status");
1456 f
->dump_unsigned("journal_read_pos", journaler
? journaler
->get_read_pos() : 0);
1457 f
->dump_unsigned("journal_write_pos", journaler
? journaler
->get_write_pos() : 0);
1458 f
->dump_unsigned("journal_expire_pos", journaler
? journaler
->get_expire_pos() : 0);
1459 f
->dump_unsigned("num_events", get_num_events());
1460 f
->dump_unsigned("num_segments", get_num_segments());