1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
19 #include "MDSContext.h"
21 #include "osdc/Journaler.h"
22 #include "mds/JournalPointer.h"
24 #include "common/entity_name.h"
25 #include "common/perf_counters.h"
26 #include "common/Cond.h"
28 #include "events/ESubtreeMap.h"
30 #include "common/config.h"
31 #include "common/errno.h"
32 #include "include/ceph_assert.h"
34 #define dout_context g_ceph_context
35 #define dout_subsys ceph_subsys_mds
37 #define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".log "
44 if (journaler
) { delete journaler
; journaler
= 0; }
46 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
53 void MDLog::create_logger()
55 PerfCountersBuilder
plb(g_ceph_context
, "mds_log", l_mdl_first
, l_mdl_last
);
57 plb
.add_u64_counter(l_mdl_evadd
, "evadd", "Events submitted", "subm",
58 PerfCountersBuilder::PRIO_INTERESTING
);
59 plb
.add_u64(l_mdl_ev
, "ev", "Events", "evts",
60 PerfCountersBuilder::PRIO_INTERESTING
);
61 plb
.add_u64(l_mdl_seg
, "seg", "Segments", "segs",
62 PerfCountersBuilder::PRIO_INTERESTING
);
64 plb
.set_prio_default(PerfCountersBuilder::PRIO_USEFUL
);
65 plb
.add_u64(l_mdl_evexg
, "evexg", "Expiring events");
66 plb
.add_u64(l_mdl_evexd
, "evexd", "Current expired events");
67 plb
.add_u64(l_mdl_segexg
, "segexg", "Expiring segments");
68 plb
.add_u64(l_mdl_segexd
, "segexd", "Current expired segments");
69 plb
.add_u64_counter(l_mdl_replayed
, "replayed", "Events replayed",
70 "repl", PerfCountersBuilder::PRIO_INTERESTING
);
71 plb
.add_time_avg(l_mdl_jlat
, "jlat", "Journaler flush latency");
72 plb
.add_u64_counter(l_mdl_evex
, "evex", "Total expired events");
73 plb
.add_u64_counter(l_mdl_evtrm
, "evtrm", "Trimmed events");
74 plb
.add_u64_counter(l_mdl_segadd
, "segadd", "Segments added");
75 plb
.add_u64_counter(l_mdl_segex
, "segex", "Total expired segments");
76 plb
.add_u64_counter(l_mdl_segtrm
, "segtrm", "Trimmed segments");
78 plb
.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY
);
79 plb
.add_u64(l_mdl_expos
, "expos", "Journaler xpire position");
80 plb
.add_u64(l_mdl_wrpos
, "wrpos", "Journaler write position");
81 plb
.add_u64(l_mdl_rdpos
, "rdpos", "Journaler read position");
84 logger
= plb
.create_perf_counters();
85 g_ceph_context
->get_perfcounters_collection()->add(logger
);
88 void MDLog::set_write_iohint(unsigned iohint_flags
)
90 journaler
->set_write_iohint(iohint_flags
);
93 class C_MDL_WriteError
: public MDSIOContextBase
{
96 MDSRank
*get_mds() override
{return mdlog
->mds
;}
98 void finish(int r
) override
{
99 MDSRank
*mds
= get_mds();
100 // assume journal is reliable, so don't choose action based on
101 // g_conf()->mds_action_on_write_error.
102 if (r
== -CEPHFS_EBLOCKLISTED
) {
103 derr
<< "we have been blocklisted (fenced), respawning..." << dendl
;
106 derr
<< "unhandled error " << cpp_strerror(r
) << ", shutting down..." << dendl
;
107 // Although it's possible that this could be something transient,
108 // it's severe and scary, so disable this rank until an administrator
110 mds
->clog
->error() << "Unhandled journal write error on MDS rank " <<
111 mds
->get_nodeid() << ": " << cpp_strerror(r
) << ", shutting down.";
113 ceph_abort(); // damaged should never return
118 explicit C_MDL_WriteError(MDLog
*m
) :
119 MDSIOContextBase(false), mdlog(m
) {}
120 void print(ostream
& out
) const override
{
121 out
<< "mdlog_write_error";
126 void MDLog::write_head(MDSContext
*c
)
130 fin
= new C_IO_Wrapper(mds
, c
);
132 journaler
->write_head(fin
);
135 uint64_t MDLog::get_read_pos() const
137 return journaler
->get_read_pos();
140 uint64_t MDLog::get_write_pos() const
142 return journaler
->get_write_pos();
145 uint64_t MDLog::get_safe_pos() const
147 return journaler
->get_write_safe_pos();
152 void MDLog::create(MDSContext
*c
)
154 dout(5) << "create empty log" << dendl
;
156 C_GatherBuilder
gather(g_ceph_context
);
157 // This requires an OnFinisher wrapper because Journaler will call back the completion for write_head inside its own lock
158 // XXX but maybe that should be handled inside Journaler?
159 gather
.set_finisher(new C_IO_Wrapper(mds
, c
));
161 // The inode of the default Journaler we will create
162 ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
164 // Instantiate Journaler and start async write to RADOS
165 ceph_assert(journaler
== NULL
);
166 journaler
= new Journaler("mdlog", ino
, mds
->get_metadata_pool(),
167 CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
,
168 l_mdl_jlat
, mds
->finisher
);
169 ceph_assert(journaler
->is_readonly());
170 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
171 journaler
->set_writeable();
172 journaler
->create(&mds
->mdcache
->default_log_layout
, g_conf()->mds_journal_format
);
173 journaler
->write_head(gather
.new_sub());
175 // Async write JournalPointer to RADOS
176 JournalPointer
jp(mds
->get_nodeid(), mds
->get_metadata_pool());
179 jp
.save(mds
->objecter
, gather
.new_sub());
183 logger
->set(l_mdl_expos
, journaler
->get_expire_pos());
184 logger
->set(l_mdl_wrpos
, journaler
->get_write_pos());
186 submit_thread
.create("md_submit");
189 void MDLog::open(MDSContext
*c
)
191 dout(5) << "open discovering log bounds" << dendl
;
193 ceph_assert(!recovery_thread
.is_started());
194 recovery_thread
.set_completion(c
);
195 recovery_thread
.create("md_recov_open");
197 submit_thread
.create("md_submit");
198 // either append() or replay() will follow.
202 * Final part of reopen() procedure, after recovery_thread
203 * has done its thing we call append()
205 class C_ReopenComplete
: public MDSInternalContext
{
207 MDSContext
*on_complete
;
209 C_ReopenComplete(MDLog
*mdlog_
, MDSContext
*on_complete_
) : MDSInternalContext(mdlog_
->mds
), mdlog(mdlog_
), on_complete(on_complete_
) {}
210 void finish(int r
) override
{
212 on_complete
->complete(r
);
217 * Given that open() has been called in the past, go through the journal
218 * recovery procedure again, potentially reformatting the journal if it
219 * was in an old format.
221 void MDLog::reopen(MDSContext
*c
)
223 dout(5) << "reopen" << dendl
;
225 // Because we will call append() at the completion of this, check that we have already
226 // read the whole journal.
227 ceph_assert(journaler
!= NULL
);
228 ceph_assert(journaler
->get_read_pos() == journaler
->get_write_pos());
233 // recovery_thread was started at some point in the past. Although
234 // it has called its completion if we made it back here, it might
235 // still not have been cleaned up: join it.
236 recovery_thread
.join();
238 recovery_thread
.set_completion(new C_ReopenComplete(this, c
));
239 recovery_thread
.create("md_recov_reopen");
244 dout(5) << "append positioning at end and marking writeable" << dendl
;
245 journaler
->set_read_pos(journaler
->get_write_pos());
246 journaler
->set_expire_pos(journaler
->get_write_pos());
248 journaler
->set_writeable();
250 logger
->set(l_mdl_expos
, journaler
->get_write_pos());
255 // -------------------------------------------------
257 void MDLog::_start_entry(LogEvent
*e
)
259 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
261 ceph_assert(cur_event
== NULL
);
266 EMetaBlob
*metablob
= e
->get_metablob();
268 metablob
->event_seq
= event_seq
;
269 metablob
->last_subtree_map
= get_last_segment_seq();
273 void MDLog::cancel_entry(LogEvent
*le
)
275 ceph_assert(le
== cur_event
);
280 void MDLog::_submit_entry(LogEvent
*le
, MDSLogContextBase
*c
)
282 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
283 ceph_assert(!mds
->is_any_replay());
284 ceph_assert(!mds_is_shutting_down
);
286 ceph_assert(le
== cur_event
);
289 // let the event register itself in the segment
290 ceph_assert(!segments
.empty());
291 LogSegment
*ls
= segments
.rbegin()->second
;
295 le
->update_segment();
296 le
->set_stamp(ceph_clock_now());
298 mdsmap_up_features
= mds
->mdsmap
->get_up_features();
299 pending_events
[ls
->seq
].push_back(PendingEvent(le
, c
));
303 logger
->inc(l_mdl_evadd
);
304 logger
->set(l_mdl_ev
, num_events
);
309 uint64_t period
= journaler
->get_layout_period();
310 // start a new segment?
311 if (le
->get_type() == EVENT_SUBTREEMAP
||
312 (le
->get_type() == EVENT_IMPORTFINISH
&& mds
->is_resolve())) {
313 // avoid infinite loop when ESubtreeMap is very large.
314 // do not insert ESubtreeMap among EImportFinish events that finish
315 // disambiguate imports. Because the ESubtreeMap reflects the subtree
316 // state when all EImportFinish events are replayed.
317 } else if (ls
->end
/period
!= ls
->offset
/period
||
318 ls
->num_events
>= g_conf()->mds_log_events_per_segment
) {
319 dout(10) << "submit_entry also starting new segment: last = "
320 << ls
->seq
<< "/" << ls
->offset
<< ", event seq = " << event_seq
<< dendl
;
321 _start_new_segment();
322 } else if (g_conf()->mds_debug_subtrees
&&
323 le
->get_type() != EVENT_SUBTREEMAP_TEST
) {
324 // debug: journal this every time to catch subtree replay bugs.
325 // use a different event id so it doesn't get interpreted as a
326 // LogSegment boundary on replay.
327 LogEvent
*sle
= mds
->mdcache
->create_subtree_map();
328 sle
->set_type(EVENT_SUBTREEMAP_TEST
);
329 _submit_entry(sle
, NULL
);
334 * Invoked on the flush after each entry submitted
336 class C_MDL_Flushed
: public MDSLogContextBase
{
339 MDSRank
*get_mds() override
{return mdlog
->mds
;}
342 void finish(int r
) override
{
344 wrapped
->complete(r
);
348 C_MDL_Flushed(MDLog
*m
, MDSContext
*w
)
349 : mdlog(m
), wrapped(w
) {}
350 C_MDL_Flushed(MDLog
*m
, uint64_t wp
) : mdlog(m
), wrapped(NULL
) {
355 void MDLog::_submit_thread()
357 dout(10) << "_submit_thread start" << dendl
;
359 std::unique_lock locker
{submit_mutex
};
361 while (!mds
->is_daemon_stopping()) {
362 if (g_conf()->mds_log_pause
) {
363 submit_cond
.wait(locker
);
367 map
<uint64_t,list
<PendingEvent
> >::iterator it
= pending_events
.begin();
368 if (it
== pending_events
.end()) {
369 submit_cond
.wait(locker
);
373 if (it
->second
.empty()) {
374 pending_events
.erase(it
);
378 int64_t features
= mdsmap_up_features
;
379 PendingEvent data
= it
->second
.front();
380 it
->second
.pop_front();
385 LogEvent
*le
= data
.le
;
386 LogSegment
*ls
= le
->_segment
;
387 // encode it, with event type
389 le
->encode_with_header(bl
, features
);
391 uint64_t write_pos
= journaler
->get_write_pos();
393 le
->set_start_off(write_pos
);
394 if (le
->get_type() == EVENT_SUBTREEMAP
)
395 ls
->offset
= write_pos
;
397 dout(5) << "_submit_thread " << write_pos
<< "~" << bl
.length()
398 << " : " << *le
<< dendl
;
401 const uint64_t new_write_pos
= journaler
->append_entry(bl
); // bl is destroyed.
402 ls
->end
= new_write_pos
;
404 MDSLogContextBase
*fin
;
406 fin
= dynamic_cast<MDSLogContextBase
*>(data
.fin
);
408 fin
->set_write_pos(new_write_pos
);
410 fin
= new C_MDL_Flushed(this, new_write_pos
);
413 journaler
->wait_for_flush(fin
);
419 logger
->set(l_mdl_wrpos
, ls
->end
);
425 dynamic_cast<MDSContext
*>(data
.fin
);
427 C_MDL_Flushed
*fin2
= new C_MDL_Flushed(this, fin
);
428 fin2
->set_write_pos(journaler
->get_write_pos());
429 journaler
->wait_for_flush(fin2
);
443 void MDLog::wait_for_safe(MDSContext
*c
)
447 bool no_pending
= true;
448 if (!pending_events
.empty()) {
449 pending_events
.rbegin()->second
.push_back(PendingEvent(NULL
, c
));
451 submit_cond
.notify_all();
454 submit_mutex
.unlock();
457 journaler
->wait_for_flush(new C_IO_Wrapper(mds
, c
));
464 bool do_flush
= unflushed
> 0;
466 if (!pending_events
.empty()) {
467 pending_events
.rbegin()->second
.push_back(PendingEvent(NULL
, NULL
, true));
469 submit_cond
.notify_all();
472 submit_mutex
.unlock();
478 void MDLog::kick_submitter()
480 std::lock_guard
l(submit_mutex
);
481 submit_cond
.notify_all();
486 dout(5) << "mark mds is shutting down" << dendl
;
487 mds_is_shutting_down
= true;
490 void MDLog::shutdown()
492 ceph_assert(ceph_mutex_is_locked_by_me(mds
->mds_lock
));
494 dout(5) << "shutdown" << dendl
;
495 if (submit_thread
.is_started()) {
496 ceph_assert(mds
->is_daemon_stopping());
498 if (submit_thread
.am_self()) {
499 // Called suicide from the thread: trust it to do no work after
500 // returning from suicide, and subsequently respect mds->is_daemon_stopping()
501 // and fall out of its loop.
503 mds
->mds_lock
.unlock();
504 // Because MDS::stopping is true, it's safe to drop mds_lock: nobody else
505 // picking it up will do anything with it.
508 submit_cond
.notify_all();
509 submit_mutex
.unlock();
511 mds
->mds_lock
.lock();
513 submit_thread
.join();
517 // Replay thread can be stuck inside e.g. Journaler::wait_for_readable,
518 // so we need to shutdown the journaler first.
520 journaler
->shutdown();
523 if (replay_thread
.is_started() && !replay_thread
.am_self()) {
524 mds
->mds_lock
.unlock();
525 replay_thread
.join();
526 mds
->mds_lock
.lock();
529 if (recovery_thread
.is_started() && !recovery_thread
.am_self()) {
530 mds
->mds_lock
.unlock();
531 recovery_thread
.join();
532 mds
->mds_lock
.lock();
537 // -----------------------------
540 void MDLog::_start_new_segment()
542 _prepare_new_segment();
543 _journal_segment_subtree_map(NULL
);
546 void MDLog::_prepare_new_segment()
548 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
550 uint64_t seq
= event_seq
+ 1;
551 dout(7) << __func__
<< " seq " << seq
<< dendl
;
553 segments
[seq
] = new LogSegment(seq
);
555 logger
->inc(l_mdl_segadd
);
556 logger
->set(l_mdl_seg
, segments
.size());
558 // Adjust to next stray dir
559 mds
->mdcache
->advance_stray();
562 void MDLog::_journal_segment_subtree_map(MDSContext
*onsync
)
564 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
566 dout(7) << __func__
<< dendl
;
567 ESubtreeMap
*sle
= mds
->mdcache
->create_subtree_map();
568 sle
->event_seq
= get_last_segment_seq();
570 _submit_entry(sle
, new C_MDL_Flushed(this, onsync
));
573 class C_OFT_Committed
: public MDSInternalContext
{
577 C_OFT_Committed(MDLog
*l
, uint64_t s
) :
578 MDSInternalContext(l
->mds
), mdlog(l
), seq(s
) {}
579 void finish(int ret
) override
{
580 mdlog
->trim_expired_segments();
584 void MDLog::try_to_commit_open_file_table(uint64_t last_seq
)
586 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
588 if (mds_is_shutting_down
) // shutting down the MDS
591 if (mds
->mdcache
->open_file_table
.is_any_committing())
594 // there may be dirty items even when there is no new log event
595 if (mds
->mdcache
->open_file_table
.is_any_dirty() ||
596 last_seq
> mds
->mdcache
->open_file_table
.get_committed_log_seq()) {
597 submit_mutex
.unlock();
598 mds
->mdcache
->open_file_table
.commit(new C_OFT_Committed(this, last_seq
),
599 last_seq
, CEPH_MSG_PRIO_HIGH
);
604 void MDLog::trim(int m
)
606 unsigned max_segments
= g_conf()->mds_log_max_segments
;
607 int max_events
= g_conf()->mds_log_max_events
;
611 if (mds
->mdcache
->is_readonly()) {
612 dout(10) << "trim, ignoring read-only FS" << dendl
;
616 // Clamp max_events to not be smaller than events per segment
617 if (max_events
> 0 && max_events
<= g_conf()->mds_log_events_per_segment
) {
618 max_events
= g_conf()->mds_log_events_per_segment
+ 1;
625 << segments
.size() << " / " << max_segments
<< " segments, "
626 << num_events
<< " / " << max_events
<< " events"
627 << ", " << expiring_segments
.size() << " (" << expiring_events
<< ") expiring"
628 << ", " << expired_segments
.size() << " (" << expired_events
<< ") expired"
631 if (segments
.empty()) {
632 submit_mutex
.unlock();
636 // hack: only trim for a few seconds at a time
637 utime_t stop
= ceph_clock_now();
640 int op_prio
= CEPH_MSG_PRIO_LOW
+
641 (CEPH_MSG_PRIO_HIGH
- CEPH_MSG_PRIO_LOW
) *
642 expiring_segments
.size() / max_segments
;
643 if (op_prio
> CEPH_MSG_PRIO_HIGH
)
644 op_prio
= CEPH_MSG_PRIO_HIGH
;
646 unsigned new_expiring_segments
= 0;
648 unsigned max_expiring_segments
= 0;
649 if (pre_segments_size
> 0){
650 max_expiring_segments
= max_segments
/2;
651 ceph_assert(segments
.size() >= pre_segments_size
);
652 max_expiring_segments
= std::max
<unsigned>(max_expiring_segments
,segments
.size() - pre_segments_size
);
655 map
<uint64_t,LogSegment
*>::iterator p
= segments
.begin();
656 while (p
!= segments
.end()) {
657 if (stop
< ceph_clock_now())
660 unsigned num_remaining_segments
= (segments
.size() - expired_segments
.size() - expiring_segments
.size());
661 if ((num_remaining_segments
<= max_segments
) &&
662 (max_events
< 0 || num_events
- expiring_events
- expired_events
<= max_events
))
665 // Do not trim too many segments at once for peak workload. If mds keeps creating N segments each tick,
666 // the upper bound of 'num_remaining_segments - max_segments' is '2 * N'
667 if (new_expiring_segments
* 2 > num_remaining_segments
)
670 if (max_expiring_segments
> 0 &&
671 expiring_segments
.size() >= max_expiring_segments
)
674 // look at first segment
675 LogSegment
*ls
= p
->second
;
679 if (pending_events
.count(ls
->seq
) ||
680 ls
->end
> safe_pos
) {
681 dout(5) << "trim segment " << ls
->seq
<< "/" << ls
->offset
<< ", not fully flushed yet, safe "
682 << journaler
->get_write_safe_pos() << " < end " << ls
->end
<< dendl
;
686 if (expiring_segments
.count(ls
)) {
687 dout(5) << "trim already expiring segment " << ls
->seq
<< "/" << ls
->offset
688 << ", " << ls
->num_events
<< " events" << dendl
;
689 } else if (expired_segments
.count(ls
)) {
690 dout(5) << "trim already expired segment " << ls
->seq
<< "/" << ls
->offset
691 << ", " << ls
->num_events
<< " events" << dendl
;
693 ceph_assert(expiring_segments
.count(ls
) == 0);
694 new_expiring_segments
++;
695 expiring_segments
.insert(ls
);
696 expiring_events
+= ls
->num_events
;
697 submit_mutex
.unlock();
699 uint64_t last_seq
= ls
->seq
;
700 try_expire(ls
, op_prio
);
703 p
= segments
.lower_bound(last_seq
+ 1);
707 try_to_commit_open_file_table(get_last_segment_seq());
709 // discard expired segments and unlock submit_mutex
710 _trim_expired_segments();
713 class C_MaybeExpiredSegment
: public MDSInternalContext
{
718 C_MaybeExpiredSegment(MDLog
*mdl
, LogSegment
*s
, int p
) :
719 MDSInternalContext(mdl
->mds
), mdlog(mdl
), ls(s
), op_prio(p
) {}
720 void finish(int res
) override
{
722 mdlog
->mds
->handle_write_error(res
);
723 mdlog
->_maybe_expired(ls
, op_prio
);
728 * Like MDLog::trim, but instead of trimming to max_segments, trim all but the latest
731 int MDLog::trim_all()
735 dout(10) << __func__
<< ": "
737 << "/" << expiring_segments
.size()
738 << "/" << expired_segments
.size() << dendl
;
740 uint64_t last_seq
= 0;
741 if (!segments
.empty()) {
742 last_seq
= get_last_segment_seq();
743 try_to_commit_open_file_table(last_seq
);
746 map
<uint64_t,LogSegment
*>::iterator p
= segments
.begin();
747 while (p
!= segments
.end() &&
748 p
->first
< last_seq
&&
749 p
->second
->end
< safe_pos
) { // next segment should have been started
750 LogSegment
*ls
= p
->second
;
753 // Caller should have flushed journaler before calling this
754 if (pending_events
.count(ls
->seq
)) {
755 dout(5) << __func__
<< ": segment " << ls
->seq
<< " has pending events" << dendl
;
756 submit_mutex
.unlock();
757 return -CEPHFS_EAGAIN
;
760 if (expiring_segments
.count(ls
)) {
761 dout(5) << "trim already expiring segment " << ls
->seq
<< "/" << ls
->offset
762 << ", " << ls
->num_events
<< " events" << dendl
;
763 } else if (expired_segments
.count(ls
)) {
764 dout(5) << "trim already expired segment " << ls
->seq
<< "/" << ls
->offset
765 << ", " << ls
->num_events
<< " events" << dendl
;
767 ceph_assert(expiring_segments
.count(ls
) == 0);
768 expiring_segments
.insert(ls
);
769 expiring_events
+= ls
->num_events
;
770 submit_mutex
.unlock();
772 uint64_t next_seq
= ls
->seq
+ 1;
773 try_expire(ls
, CEPH_MSG_PRIO_DEFAULT
);
776 p
= segments
.lower_bound(next_seq
);
780 _trim_expired_segments();
786 void MDLog::try_expire(LogSegment
*ls
, int op_prio
)
788 MDSGatherBuilder
gather_bld(g_ceph_context
);
789 ls
->try_to_expire(mds
, gather_bld
, op_prio
);
791 if (gather_bld
.has_subs()) {
792 dout(5) << "try_expire expiring segment " << ls
->seq
<< "/" << ls
->offset
<< dendl
;
793 gather_bld
.set_finisher(new C_MaybeExpiredSegment(this, ls
, op_prio
));
794 gather_bld
.activate();
796 dout(10) << "try_expire expired segment " << ls
->seq
<< "/" << ls
->offset
<< dendl
;
798 ceph_assert(expiring_segments
.count(ls
));
799 expiring_segments
.erase(ls
);
800 expiring_events
-= ls
->num_events
;
802 submit_mutex
.unlock();
805 logger
->set(l_mdl_segexg
, expiring_segments
.size());
806 logger
->set(l_mdl_evexg
, expiring_events
);
809 void MDLog::_maybe_expired(LogSegment
*ls
, int op_prio
)
811 if (mds
->mdcache
->is_readonly()) {
812 dout(10) << "_maybe_expired, ignoring read-only FS" << dendl
;
816 dout(10) << "_maybe_expired segment " << ls
->seq
<< "/" << ls
->offset
817 << ", " << ls
->num_events
<< " events" << dendl
;
818 try_expire(ls
, op_prio
);
821 void MDLog::_trim_expired_segments()
823 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
825 uint64_t oft_committed_seq
= mds
->mdcache
->open_file_table
.get_committed_log_seq();
827 // trim expired segments?
828 bool trimmed
= false;
829 while (!segments
.empty()) {
830 LogSegment
*ls
= segments
.begin()->second
;
831 if (!expired_segments
.count(ls
)) {
832 dout(10) << "_trim_expired_segments waiting for " << ls
->seq
<< "/" << ls
->offset
833 << " to expire" << dendl
;
837 if (!mds_is_shutting_down
&& ls
->seq
>= oft_committed_seq
) {
838 dout(10) << "_trim_expired_segments open file table committedseq " << oft_committed_seq
839 << " <= " << ls
->seq
<< "/" << ls
->offset
<< dendl
;
843 dout(10) << "_trim_expired_segments trimming expired "
844 << ls
->seq
<< "/0x" << std::hex
<< ls
->offset
<< std::dec
<< dendl
;
845 expired_events
-= ls
->num_events
;
846 expired_segments
.erase(ls
);
847 if (pre_segments_size
> 0)
849 num_events
-= ls
->num_events
;
851 // this was the oldest segment, adjust expire pos
852 if (journaler
->get_expire_pos() < ls
->end
) {
853 journaler
->set_expire_pos(ls
->end
);
854 logger
->set(l_mdl_expos
, ls
->end
);
856 logger
->set(l_mdl_expos
, ls
->offset
);
859 logger
->inc(l_mdl_segtrm
);
860 logger
->inc(l_mdl_evtrm
, ls
->num_events
);
862 segments
.erase(ls
->seq
);
867 submit_mutex
.unlock();
870 journaler
->write_head(0);
873 void MDLog::trim_expired_segments()
876 _trim_expired_segments();
879 void MDLog::_expired(LogSegment
*ls
)
881 ceph_assert(ceph_mutex_is_locked_by_me(submit_mutex
));
883 dout(5) << "_expired segment " << ls
->seq
<< "/" << ls
->offset
884 << ", " << ls
->num_events
<< " events" << dendl
;
886 if (!mds_is_shutting_down
&& ls
== peek_current_segment()) {
887 dout(5) << "_expired not expiring " << ls
->seq
<< "/" << ls
->offset
888 << ", last one and !mds_is_shutting_down" << dendl
;
891 expired_segments
.insert(ls
);
892 expired_events
+= ls
->num_events
;
894 // Trigger all waiters
895 finish_contexts(g_ceph_context
, ls
->expiry_waiters
);
897 logger
->inc(l_mdl_evex
, ls
->num_events
);
898 logger
->inc(l_mdl_segex
);
901 logger
->set(l_mdl_ev
, num_events
);
902 logger
->set(l_mdl_evexd
, expired_events
);
903 logger
->set(l_mdl_seg
, segments
.size());
904 logger
->set(l_mdl_segexd
, expired_segments
.size());
909 void MDLog::replay(MDSContext
*c
)
911 ceph_assert(journaler
->is_active());
912 ceph_assert(journaler
->is_readonly());
915 if (journaler
->get_read_pos() == journaler
->get_write_pos()) {
916 dout(10) << "replay - journal empty, done." << dendl
;
917 mds
->mdcache
->trim();
918 if (mds
->is_standby_replay())
919 mds
->update_mlogger();
928 waitfor_replay
.push_back(c
);
931 dout(10) << "replay start, from " << journaler
->get_read_pos()
932 << " to " << journaler
->get_write_pos() << dendl
;
934 ceph_assert(num_events
== 0 || already_replayed
);
935 if (already_replayed
) {
936 // Ensure previous instance of ReplayThread is joined before
937 // we create another one
938 replay_thread
.join();
940 already_replayed
= true;
942 replay_thread
.create("md_log_replay");
947 * Resolve the JournalPointer object to a journal file, and
948 * instantiate a Journaler object. This may re-write the journal
949 * if the journal in RADOS appears to be in an old format.
951 * This is a separate thread because of the way it is initialized from inside
952 * the mds lock, which is also the global objecter lock -- rather than split
953 * it up into hard-to-read async operations linked up by contexts,
955 * When this function completes, the `journaler` attribute will be set to
956 * a Journaler instance using the latest available serialization format.
958 void MDLog::_recovery_thread(MDSContext
*completion
)
960 ceph_assert(journaler
== NULL
);
961 if (g_conf()->mds_journal_format
> JOURNAL_FORMAT_MAX
) {
962 dout(0) << "Configuration value for mds_journal_format is out of bounds, max is "
963 << JOURNAL_FORMAT_MAX
<< dendl
;
965 // Oh dear, something unreadable in the store for this rank: require
966 // operator intervention.
967 mds
->damaged_unlocked();
968 ceph_abort(); // damaged should not return
971 // First, read the pointer object.
972 // If the pointer object is not present, then create it with
973 // front = default ino and back = null
974 JournalPointer
jp(mds
->get_nodeid(), mds
->get_metadata_pool());
975 const int read_result
= jp
.load(mds
->objecter
);
976 if (read_result
== -CEPHFS_ENOENT
) {
977 inodeno_t
const default_log_ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
978 jp
.front
= default_log_ino
;
979 int write_result
= jp
.save(mds
->objecter
);
980 if (write_result
< 0) {
981 std::lock_guard
l(mds
->mds_lock
);
982 if (mds
->is_daemon_stopping()) {
986 ceph_abort(); // damaged should never return
988 } else if (read_result
== -CEPHFS_EBLOCKLISTED
) {
989 derr
<< "Blocklisted during JournalPointer read! Respawning..." << dendl
;
991 ceph_abort(); // Should be unreachable because respawn calls execv
992 } else if (read_result
!= 0) {
993 mds
->clog
->error() << "failed to read JournalPointer: " << read_result
994 << " (" << cpp_strerror(read_result
) << ")";
995 mds
->damaged_unlocked();
996 ceph_abort(); // Should be unreachable because damaged() calls respawn()
999 // If the back pointer is non-null, that means that a journal
1000 // rewrite failed part way through. Erase the back journal
1003 if (mds
->is_standby_replay()) {
1004 dout(1) << "Journal " << jp
.front
<< " is being rewritten, "
1005 << "cannot replay in standby until an active MDS completes rewrite" << dendl
;
1006 std::lock_guard
l(mds
->mds_lock
);
1007 if (mds
->is_daemon_stopping()) {
1010 completion
->complete(-CEPHFS_EAGAIN
);
1013 dout(1) << "Erasing journal " << jp
.back
<< dendl
;
1014 C_SaferCond erase_waiter
;
1015 Journaler
back("mdlog", jp
.back
, mds
->get_metadata_pool(),
1016 CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
, l_mdl_jlat
,
1019 // Read all about this journal (header + extents)
1020 C_SaferCond recover_wait
;
1021 back
.recover(&recover_wait
);
1022 int recovery_result
= recover_wait
.wait();
1023 if (recovery_result
== -CEPHFS_EBLOCKLISTED
) {
1024 derr
<< "Blocklisted during journal recovery! Respawning..." << dendl
;
1026 ceph_abort(); // Should be unreachable because respawn calls execv
1027 } else if (recovery_result
!= 0) {
1028 // Journaler.recover succeeds if no journal objects are present: an error
1029 // means something worse like a corrupt header, which we can't handle here.
1030 mds
->clog
->error() << "Error recovering journal " << jp
.front
<< ": "
1031 << cpp_strerror(recovery_result
);
1032 mds
->damaged_unlocked();
1033 ceph_assert(recovery_result
== 0); // Unreachable because damaged() calls respawn()
1036 // We could read journal, so we can erase it.
1037 back
.erase(&erase_waiter
);
1038 int erase_result
= erase_waiter
.wait();
1040 // If we are successful, or find no data, we can update the JournalPointer to
1041 // reflect that the back journal is gone.
1042 if (erase_result
!= 0 && erase_result
!= -CEPHFS_ENOENT
) {
1043 derr
<< "Failed to erase journal " << jp
.back
<< ": " << cpp_strerror(erase_result
) << dendl
;
1045 dout(1) << "Successfully erased journal, updating journal pointer" << dendl
;
1047 int write_result
= jp
.save(mds
->objecter
);
1048 // Nothing graceful we can do for this
1049 ceph_assert(write_result
>= 0);
1053 /* Read the header from the front journal */
1054 Journaler
*front_journal
= new Journaler("mdlog", jp
.front
,
1055 mds
->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC
, mds
->objecter
,
1056 logger
, l_mdl_jlat
, mds
->finisher
);
1058 // Assign to ::journaler so that we can be aborted by ::shutdown while
1059 // waiting for journaler recovery
1061 std::lock_guard
l(mds
->mds_lock
);
1062 journaler
= front_journal
;
1065 C_SaferCond recover_wait
;
1066 front_journal
->recover(&recover_wait
);
1067 dout(4) << "Waiting for journal " << jp
.front
<< " to recover..." << dendl
;
1068 int recovery_result
= recover_wait
.wait();
1069 if (recovery_result
== -CEPHFS_EBLOCKLISTED
) {
1070 derr
<< "Blocklisted during journal recovery! Respawning..." << dendl
;
1072 ceph_abort(); // Should be unreachable because respawn calls execv
1073 } else if (recovery_result
!= 0) {
1074 mds
->clog
->error() << "Error recovering journal " << jp
.front
<< ": "
1075 << cpp_strerror(recovery_result
);
1076 mds
->damaged_unlocked();
1077 ceph_assert(recovery_result
== 0); // Unreachable because damaged() calls respawn()
1079 dout(4) << "Journal " << jp
.front
<< " recovered." << dendl
;
1081 /* Check whether the front journal format is acceptable or needs re-write */
1082 if (front_journal
->get_stream_format() > JOURNAL_FORMAT_MAX
) {
1083 dout(0) << "Journal " << jp
.front
<< " is in unknown format " << front_journal
->get_stream_format()
1084 << ", does this MDS daemon require upgrade?" << dendl
;
1086 std::lock_guard
l(mds
->mds_lock
);
1087 if (mds
->is_daemon_stopping()) {
1089 delete front_journal
;
1092 completion
->complete(-CEPHFS_EINVAL
);
1094 } else if (mds
->is_standby_replay() || front_journal
->get_stream_format() >= g_conf()->mds_journal_format
) {
1095 /* The journal is of configured format, or we are in standbyreplay and will
1096 * tolerate replaying old journals until we have to go active. Use front_journal as
1097 * our journaler attribute and complete */
1098 dout(4) << "Recovered journal " << jp
.front
<< " in format " << front_journal
->get_stream_format() << dendl
;
1100 std::lock_guard
l(mds
->mds_lock
);
1101 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
1102 if (mds
->is_daemon_stopping()) {
1105 completion
->complete(0);
1108 /* Hand off to reformat routine, which will ultimately set the
1109 * completion when it has done its thing */
1110 dout(1) << "Journal " << jp
.front
<< " has old format "
1111 << front_journal
->get_stream_format() << ", it will now be updated" << dendl
;
1112 _reformat_journal(jp
, front_journal
, completion
);
1117 * Blocking rewrite of the journal to a new file, followed by
1118 * swap of journal pointer to point to the new one.
1120 * We write the new journal to the 'back' journal from the JournalPointer,
1121 * swapping pointers to make that one the front journal only when we have
// Rewrite old_journal into a freshly created Journaler located at the
// JournalPointer's 'back' inode, using the configured mds_journal_format,
// then erase the old journal, install the new one as `journaler` and call
// completion->complete(0).
//
// jp_in       - pointer describing the current front journal; must not be
//               null (asserted). A local copy is modified and saved.
// old_journal - journal to read events from; must not be NULL (asserted).
// completion  - context completed with 0 at the end; must not be NULL
//               (asserted).
//
// Failures of the pointer saves, of the transcription (r) and of the erase
// are all handled with ceph_assert rather than reported to `completion`.
1124 void MDLog::_reformat_journal(JournalPointer
const &jp_in
, Journaler
*old_journal
, MDSContext
*completion
)
// Preconditions on the arguments.
1126 ceph_assert(!jp_in
.is_null());
1127 ceph_assert(completion
!= NULL
);
1128 ceph_assert(old_journal
!= NULL
);
// Work on a private copy so the caller's (const) pointer is untouched.
1130 JournalPointer jp
= jp_in
;
1132 /* Set JournalPointer.back to the location we will write the new journal */
1133 inodeno_t primary_ino
= MDS_INO_LOG_OFFSET
+ mds
->get_nodeid();
1134 inodeno_t secondary_ino
= MDS_INO_LOG_BACKUP_OFFSET
+ mds
->get_nodeid();
// The new journal goes to whichever of the two inodes is not the front.
1135 jp
.back
= (jp
.front
== primary_ino
? secondary_ino
: primary_ino
);
// Persist the updated pointer before writing anything; failure is fatal.
1136 int write_result
= jp
.save(mds
->objecter
);
1137 ceph_assert(write_result
== 0);
1139 /* Create the new Journaler file */
1140 Journaler
*new_journal
= new Journaler("mdlog", jp
.back
,
1141 mds
->get_metadata_pool(), CEPH_FS_ONDISK_MAGIC
, mds
->objecter
, logger
, l_mdl_jlat
, mds
->finisher
);
1142 dout(4) << "Writing new journal header " << jp
.back
<< dendl
;
// The new journal reuses the old journal's layout.
1143 file_layout_t new_layout
= old_journal
->get_layout();
1144 new_journal
->set_writeable();
1145 new_journal
->create(&new_layout
, g_conf()->mds_journal_format
);
1147 /* Write the new journal header to RADOS */
1148 C_SaferCond write_head_wait
;
1149 new_journal
->write_head(&write_head_wait
);
// NOTE(review): the value returned by this wait is discarded, unlike
// erase_waiter.wait() below whose result is asserted.
1150 write_head_wait
.wait();
1152 // Read in the old journal, and whenever we have readable events,
1153 // write them to the new journal.
1156 // In old format journals before event_seq was introduced, the serialized
1157 // offset of a SubtreeMap message in the log is used as the unique ID for
1158 // a log segment. Because we change serialization, this will end up changing
1159 // for us, so we have to explicitly update the fields that point back to that
// Maps a segment's position in the old journal to its position in the new one.
1161 std::map
<LogSegment::seq_t
, LogSegment::seq_t
> segment_pos_rewrite
;
1163 // The logic in here borrowed from replay_thread expects mds_lock to be held,
1164 // e.g. between checking readable and doing wait_for_readable so that journaler
1165 // state doesn't change in between.
1166 uint32_t events_transcribed
= 0;
1168 old_journal
->check_isreadable();
// A journaler error is recorded in r and logged.
1169 if (old_journal
->get_error()) {
1170 r
= old_journal
->get_error();
1171 dout(0) << "_replay journaler got error " << r
<< ", aborting" << dendl
;
// Nothing readable and read position has caught up with write position.
1175 if (!old_journal
->is_readable() &&
1176 old_journal
->get_read_pos() == old_journal
->get_write_pos())
1179 // Read one serialized LogEvent
1180 ceph_assert(old_journal
->is_readable());
// Remember where this entry started in the old journal.
1182 uint64_t le_pos
= old_journal
->get_read_pos();
// NOTE(review): `bool r` is declared here while `r` is also assigned an
// error code above; presumably this shadows an outer `r` whose declaration
// is not visible here -- confirm.
1183 bool r
= old_journal
->try_read_entry(bl
);
1184 if (!r
&& old_journal
->get_error())
1188 // Update segment_pos_rewrite
1189 auto le
= LogEvent::decode_event(bl
.cbegin());
1191 bool modified
= false;
1193 if (le
->get_type() == EVENT_SUBTREEMAP
||
1194 le
->get_type() == EVENT_RESETJOURNAL
) {
// Pointer-form dynamic_cast: sle is null when the event is not an
// ESubtreeMap; the test below treats that like event_seq == 0.
1195 auto sle
= dynamic_cast<ESubtreeMap
*>(le
.get());
1196 if (sle
== NULL
|| sle
->event_seq
== 0) {
1197 // A non-explicit event seq: the effective sequence number
1198 // of this segment is it's position in the old journal and
1199 // the new effective sequence number will be its position
1200 // in the new journal.
1201 segment_pos_rewrite
[le_pos
] = new_journal
->get_write_pos();
1202 dout(20) << __func__
<< " discovered segment seq mapping "
1203 << le_pos
<< " -> " << new_journal
->get_write_pos() << dendl
;
1209 // Rewrite segment references if necessary
1210 EMetaBlob
*blob
= le
->get_metablob();
1212 modified
= blob
->rewrite_truncate_finish(mds
, segment_pos_rewrite
);
1215 // Zero-out expire_pos in subtreemap because offsets have changed
1216 // (expire_pos is just an optimization so it's safe to eliminate it)
1217 if (le
->get_type() == EVENT_SUBTREEMAP
1218 || le
->get_type() == EVENT_SUBTREEMAP_TEST
) {
// Reference-form dynamic_cast: throws std::bad_cast instead of yielding
// null if the event is not an ESubtreeMap.
1219 auto& sle
= dynamic_cast<ESubtreeMap
&>(*le
);
1220 dout(20) << __func__
<< " zeroing expire_pos in subtreemap event at "
1221 << le_pos
<< " seq=" << sle
.event_seq
<< dendl
;
// Re-serialize the event using the MDS map's up features.
1228 le
->encode_with_header(bl
, mds
->mdsmap
->get_up_features());
1231 // Failure from LogEvent::decode, our job is to change the journal wrapper,
1232 // not validate the contents, so pass it through.
1233 dout(1) << __func__
<< " transcribing un-decodable LogEvent at old position "
1234 << old_journal
->get_read_pos() << ", new position " << new_journal
->get_write_pos()
1238 // Write (buffered, synchronous) one serialized LogEvent
1239 events_transcribed
+= 1;
1240 new_journal
->append_entry(bl
);
1243 dout(1) << "Transcribed " << events_transcribed
<< " events, flushing new journal" << dendl
;
1244 C_SaferCond flush_waiter
;
1245 new_journal
->flush(&flush_waiter
);
// NOTE(review): the flush result is not captured or checked here.
1246 flush_waiter
.wait();
1248 // If failed to rewrite journal, leave the part written journal
1249 // as garbage to be cleaned up next startup.
1250 ceph_assert(r
== 0);
1252 /* Now that the new journal is safe, we can flip the pointers */
// Keep the old front value in tmp while front/back are exchanged.
1253 inodeno_t
const tmp
= jp
.front
;
1256 write_result
= jp
.save(mds
->objecter
);
1257 ceph_assert(write_result
== 0);
1259 /* Delete the old journal to free space */
1260 dout(1) << "New journal flushed, erasing old journal" << dendl
;
1261 C_SaferCond erase_waiter
;
1262 old_journal
->erase(&erase_waiter
);
// A failed erase is fatal.
1263 int erase_result
= erase_waiter
.wait();
1264 ceph_assert(erase_result
== 0);
// mds_lock is taken before checking for shutdown and touching `journaler`.
1266 std::lock_guard
l(mds
->mds_lock
);
1267 if (mds
->is_daemon_stopping()) {
// The member journaler is expected to still be the journal handed in.
1271 ceph_assert(journaler
== old_journal
);
1275 /* Update the pointer to reflect we're back in clean single journal state. */
1277 write_result
= jp
.save(mds
->objecter
);
1278 ceph_assert(write_result
== 0);
1280 /* Reset the Journaler object to its default state */
1281 dout(1) << "Journal rewrite complete, continuing with normal startup" << dendl
;
1282 if (mds
->is_daemon_stopping()) {
// Install the new journal as the active journaler, switch it to read-only
// and attach the write error handler.
1286 journaler
= new_journal
;
1287 journaler
->set_readonly();
1288 journaler
->set_write_error_handler(new C_MDL_WriteError(this));
1290 /* Trigger completion */
1291 if (mds
->is_daemon_stopping()) {
1294 completion
->complete(0);
1299 // i am a separate thread
// Body of the replay thread: reads entries from `journaler`, decodes each one
// into a LogEvent, creates LogSegment records for subtree-map / reset-journal
// events, keeps the perf counters up to date and finally finishes the
// waitfor_replay contexts with the result code r.
//
// Journaler errors are handled by kind (-CEPHFS_ENOENT, -CEPHFS_EINVAL);
// outside the standby-replay / read-only cases visible below, the MDS is
// marked damaged via damaged_unlocked(), which per the inline comments is
// expected to respawn rather than return.
1300 void MDLog::_replay_thread()
1302 dout(10) << "_replay_thread start" << dendl
;
1308 journaler
->check_isreadable();
// Pick up any error the journaler has recorded.
1309 if (journaler
->get_error()) {
1310 r
= journaler
->get_error();
1311 dout(0) << "_replay journaler got error " << r
<< ", aborting" << dendl
;
// Missing journal object: handled separately when in standby replay.
1312 if (r
== -CEPHFS_ENOENT
) {
1313 if (mds
->is_standby_replay()) {
1314 // journal has been trimmed by somebody else
1317 mds
->clog
->error() << "missing journal object";
1318 mds
->damaged_unlocked();
1319 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1321 } else if (r
== -CEPHFS_EINVAL
) {
// Read position behind the expire position.
1322 if (journaler
->get_read_pos() < journaler
->get_expire_pos()) {
1323 // this should only happen if you're following somebody else
1324 if(journaler
->is_readonly()) {
1325 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl
;
1328 mds
->clog
->error() << "invalid journaler offsets";
1329 mds
->damaged_unlocked();
1330 ceph_abort(); // Should be unreachable because damaged() calls respawn()
1333 /* re-read head and check it
1334 * Given that replay happens in a separate thread and
1335 * the MDS is going to either shut down or restart when
1336 * we return this error, doing it synchronously is fine
1337 * -- as long as we drop the main mds lock--. */
1338 C_SaferCond reread_fin
;
1339 journaler
->reread_head(&reread_fin
);
// Block until the header re-read finishes; err is its result.
1340 int err
= reread_fin
.wait();
1342 if (err
== -CEPHFS_ENOENT
&& mds
->is_standby_replay()) {
1344 dout(1) << "Journal header went away while in standby replay, journal rewritten?"
1348 dout(0) << "got error while reading head: " << cpp_strerror(err
)
1351 mds
->clog
->error() << "error reading journal header";
1352 mds
->damaged_unlocked();
1353 ceph_abort(); // Should be unreachable because damaged() calls
// Drop segments up to the expire position, then re-check the read position
// against the expire position.
1357 standby_trim_segments();
1358 if (journaler
->get_read_pos() < journaler
->get_expire_pos()) {
1359 dout(0) << "expire_pos is higher than read_pos, returning CEPHFS_EAGAIN" << dendl
;
// Nothing readable and the read position has reached the write position.
1367 if (!journaler
->is_readable() &&
1368 journaler
->get_read_pos() == journaler
->get_write_pos())
// An unreadable journaler is only tolerated here while the daemon is stopping.
1371 ceph_assert(journaler
->is_readable() || mds
->is_daemon_stopping());
// Position of the entry about to be read; used below as the event's start
// offset and as the offset of any segment it opens.
1374 uint64_t pos
= journaler
->get_read_pos();
1376 bool r
= journaler
->try_read_entry(bl
);
1377 if (!r
&& journaler
->get_error())
1382 auto le
= LogEvent::decode_event(bl
.cbegin());
// Undecodable entry: log it and report it to the cluster log.
1384 dout(0) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1385 << " -- unable to decode event" << dendl
;
1386 dout(0) << "dump of unknown or corrupt event:\n";
1390 mds
->clog
->error() << "corrupt journal event at " << pos
<< "~"
1391 << bl
.length() << " / "
1392 << journaler
->get_write_pos();
// The mds_log_skip_corrupt_events option selects an alternative to marking
// the MDS damaged.
1393 if (g_conf()->mds_log_skip_corrupt_events
) {
1396 mds
->damaged_unlocked();
1397 ceph_abort(); // Should be unreachable because damaged() calls
1402 le
->set_start_off(pos
);
1405 if (le
->get_type() == EVENT_SUBTREEMAP
||
1406 le
->get_type() == EVENT_RESETJOURNAL
) {
// Pointer-form dynamic_cast: sle is null when the event is not an
// ESubtreeMap, in which case event_seq is not taken from the event.
1407 auto sle
= dynamic_cast<ESubtreeMap
*>(le
.get());
1408 if (sle
&& sle
->event_seq
> 0)
1409 event_seq
= sle
->event_seq
;
// Open a new segment keyed by event_seq, starting at this entry's position.
// NOTE(review): if an entry for event_seq already existed, this assignment
// would presumably overwrite it -- confirm that cannot happen.
1412 segments
[event_seq
] = new LogSegment(event_seq
, pos
);
1413 logger
->set(l_mdl_seg
, segments
.size());
1418 // have we seen an import map yet?
1419 if (segments
.empty()) {
1420 dout(10) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1421 << " " << le
->get_stamp() << " -- waiting for subtree_map. (skipping " << *le
<< ")" << dendl
;
1423 dout(10) << "_replay " << pos
<< "~" << bl
.length() << " / " << journaler
->get_write_pos()
1424 << " " << le
->get_stamp() << ": " << *le
<< dendl
;
// Attach the event to the current segment and extend that segment's end to
// the journaler's read position after this entry.
1425 le
->_segment
= get_current_segment(); // replay may need this
1426 le
->_segment
->num_events
++;
1427 le
->_segment
->end
= journaler
->get_read_pos();
1429 logger
->set(l_mdl_ev
, num_events
);
// mds_lock is taken before checking for shutdown.
1432 std::lock_guard
l(mds
->mds_lock
);
1433 if (mds
->is_daemon_stopping()) {
1436 logger
->inc(l_mdl_replayed
);
// Publish the journal positions to the perf counters.
1441 logger
->set(l_mdl_rdpos
, pos
);
1442 logger
->set(l_mdl_expos
, journaler
->get_expire_pos());
1443 logger
->set(l_mdl_wrpos
, journaler
->get_write_pos());
// The whole journal must have been consumed when this point is reached.
1448 ceph_assert(journaler
->get_read_pos() == journaler
->get_write_pos());
1449 dout(10) << "_replay - complete, " << num_events
1450 << " events" << dendl
;
1452 logger
->set(l_mdl_expos
, journaler
->get_expire_pos());
1455 safe_pos
= journaler
->get_write_safe_pos();
1457 dout(10) << "_replay_thread kicking waiters" << dendl
;
1459 std::lock_guard
l(mds
->mds_lock
);
1460 if (mds
->is_daemon_stopping()) {
1463 pre_segments_size
= segments
.size(); // get num of logs when replay is finished
// Wake everything queued on waitfor_replay, passing the result code r.
1464 finish_contexts(g_ceph_context
, waitfor_replay
, r
);
1467 dout(10) << "_replay_thread finish" << dendl
;
1470 void MDLog::standby_trim_segments()
1472 dout(10) << "standby_trim_segments" << dendl
;
1473 uint64_t expire_pos
= journaler
->get_expire_pos();
1474 dout(10) << " expire_pos=" << expire_pos
<< dendl
;
1476 mds
->mdcache
->open_file_table
.trim_destroyed_inos(expire_pos
);
1478 bool removed_segment
= false;
1479 while (have_any_segments()) {
1480 LogSegment
*seg
= get_oldest_segment();
1481 dout(10) << " segment seq=" << seg
->seq
<< " " << seg
->offset
<<
1482 "~" << seg
->end
- seg
->offset
<< dendl
;
1484 if (seg
->end
> expire_pos
) {
1485 dout(10) << " won't remove, not expired!" << dendl
;
1489 if (segments
.size() == 1) {
1490 dout(10) << " won't remove, last segment!" << dendl
;
1494 dout(10) << " removing segment" << dendl
;
1495 mds
->mdcache
->standby_trim_segment(seg
);
1496 remove_oldest_segment();
1497 if (pre_segments_size
> 0) {
1498 --pre_segments_size
;
1500 removed_segment
= true;
1503 if (removed_segment
) {
1504 dout(20) << " calling mdcache->trim!" << dendl
;
1505 mds
->mdcache
->trim();
1507 dout(20) << " removed no segments!" << dendl
;
1511 void MDLog::dump_replay_status(Formatter
*f
) const
1513 f
->open_object_section("replay_status");
1514 f
->dump_unsigned("journal_read_pos", journaler
? journaler
->get_read_pos() : 0);
1515 f
->dump_unsigned("journal_write_pos", journaler
? journaler
->get_write_pos() : 0);
1516 f
->dump_unsigned("journal_expire_pos", journaler
? journaler
->get_expire_pos() : 0);
1517 f
->dump_unsigned("num_events", get_num_events());
1518 f
->dump_unsigned("num_segments", get_num_segments());