// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2015 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */

#include <boost/utility/string_view.hpp>

#include "common/debug.h"
#include "common/errno.h"

#include "messages/MClientRequestForward.h"
#include "messages/MMDSLoadTargets.h"
#include "messages/MMDSMap.h"
#include "messages/MMDSTableRequest.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"

#include "MDSDaemon.h"
#include "SnapClient.h"
#include "SnapServer.h"
#include "MDBalancer.h"
#include "mon/MonClient.h"
#include "common/HeartbeatMap.h"
#include "ScrubStack.h"

#define dout_context g_ceph_context
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix *_dout << "mds." << whoami << '.' << incarnation << ' '
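/*
 * C_Flush_Journal drives a full journal flush on behalf of callers such as
 * C_Drop_Cache::flush_journal() below: seal the current log segment, wait
 * for the journal to be safe, expire and trim all older segments, then
 * rewrite the journal head before completing on_finish with the result.
 */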
class C_Flush_Journal : public MDSInternalContext {
public:
  C_Flush_Journal(MDCache *mdcache, MDLog *mdlog, MDSRank *mds,
                  std::ostream *ss, Context *on_finish)
    : MDSInternalContext(mds),
      mdcache(mdcache), mdlog(mdlog), ss(ss), on_finish(on_finish),
      whoami(mds->whoami), incarnation(mds->incarnation) {
  }

  void send() {
    assert(mds->mds_lock.is_locked());

    dout(20) << __func__ << dendl;

    if (mdcache->is_readonly()) {
      dout(5) << __func__ << ": read-only FS" << dendl;
      complete(-EROFS);
      return;
    }

    if (!mds->is_active()) {
      dout(5) << __func__ << ": MDS not active, no-op" << dendl;
      complete(0);
      return;
    }

    flush_mdlog();
  }

private:
  void flush_mdlog() {
    dout(20) << __func__ << dendl;

    // I need to seal off the current segment, and then mark all
    // previous segments for expiry
    mdlog->start_new_segment();

    Context *ctx = new FunctionContext([this](int r) {
        handle_flush_mdlog(r);
      });

    // Flush initially so that all the segments older than our new one
    // will be eligible for expiry
    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
  }

  void handle_flush_mdlog(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      complete(r);
      return;
    }

    clear_mdlog();
  }

  void clear_mdlog() {
    dout(20) << __func__ << dendl;

    Context *ctx = new FunctionContext([this](int r) {
        handle_clear_mdlog(r);
      });

    // Because we may not be the last wait_for_safe context on MDLog,
    // and subsequent contexts might wake up in the middle of our
    // later trim_all and interfere with expiry (by e.g. marking
    // dirs/dentries dirty on previous log segments), we run a second
    // wait_for_safe here. See #10368
    mdlog->wait_for_safe(new MDSInternalContextWrapper(mds, ctx));
  }

  void handle_clear_mdlog(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while flushing journal";
      complete(r);
      return;
    }

    trim_mdlog();
  }

  void trim_mdlog() {
    // Put all the old log segments into expiring or expired state
    dout(5) << __func__ << ": beginning segment expiry" << dendl;

    int ret = mdlog->trim_all();
    if (ret != 0) {
      *ss << "Error " << ret << " (" << cpp_strerror(ret) << ") while trimming log";
      complete(ret);
      return;
    }

    expire_segments();
  }

  void expire_segments() {
    dout(20) << __func__ << dendl;

    // Attach contexts to wait for all expiring segments to expire
    MDSGatherBuilder *expiry_gather = new MDSGatherBuilder(g_ceph_context);

    const auto &expiring_segments = mdlog->get_expiring_segments();
    for (auto p : expiring_segments) {
      p->wait_for_expiry(expiry_gather->new_sub());
    }
    dout(5) << __func__ << ": waiting for " << expiry_gather->num_subs_created()
            << " segments to expire" << dendl;

    if (!expiry_gather->has_subs()) {
      trim_segments();
      delete expiry_gather;
      return;
    }

    Context *ctx = new FunctionContext([this](int r) {
        handle_expire_segments(r);
      });
    expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
    expiry_gather->activate();
  }

  void handle_expire_segments(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    ceph_assert(r == 0); // MDLog is not allowed to raise errors via wait_for_expiry
    trim_segments();
  }

  void trim_segments() {
    dout(20) << __func__ << dendl;

    Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
          Mutex::Locker locker(mds->mds_lock);
          trim_expired_segments();
        }), mds->finisher);
    ctx->complete(0);
  }

  void trim_expired_segments() {
    dout(5) << __func__ << ": expiry complete, expire_pos/trim_pos is now "
            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
            << mdlog->get_journaler()->get_trimmed_pos() << dendl;

    // Now everyone I'm interested in is expired
    mdlog->trim_expired_segments();

    dout(5) << __func__ << ": trim complete, expire_pos/trim_pos is now "
            << std::hex << mdlog->get_journaler()->get_expire_pos() << "/"
            << mdlog->get_journaler()->get_trimmed_pos() << dendl;

    write_journal_head();
  }

  void write_journal_head() {
    dout(20) << __func__ << dendl;

    Context *ctx = new FunctionContext([this](int r) {
        Mutex::Locker locker(mds->mds_lock);
        handle_write_head(r);
      });
    // Flush the journal header so that readers will start from after
    // the flushed region
    mdlog->get_journaler()->write_head(ctx);
  }

  void handle_write_head(int r) {
    if (r != 0) {
      *ss << "Error " << r << " (" << cpp_strerror(r) << ") while writing header";
    } else {
      dout(5) << __func__ << ": write_head complete, all done!" << dendl;
    }

    complete(r);
  }

  void finish(int r) override {
    dout(20) << __func__ << ": r=" << r << dendl;
    on_finish->complete(r);
  }

  MDCache *mdcache;
  MDLog *mdlog;
  std::ostream *ss;
  Context *on_finish;

  // so that the dout prefix macro can see these
  mds_rank_t whoami;
  int incarnation;
};
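/*
 * C_Drop_Cache drives the "cache drop" sequence for a rank: ask the Server
 * to recall client state (bounded by recall_timeout), trim the MDCache,
 * flush the journal via C_Flush_Journal, and dump a cache status report
 * into the supplied Formatter before completing on_finish.
 */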
class C_Drop_Cache : public MDSInternalContext {
public:
  C_Drop_Cache(Server *server, MDCache *mdcache, MDLog *mdlog,
               MDSRank *mds, uint64_t recall_timeout,
               Formatter *f, Context *on_finish)
    : MDSInternalContext(mds),
      server(server), mdcache(mdcache), mdlog(mdlog),
      recall_timeout(recall_timeout), f(f), on_finish(on_finish),
      whoami(mds->whoami), incarnation(mds->incarnation) {
  }

  void send() {
    // not really a hard requirement here, but let's ensure this in
    // case we change the logic here.
    assert(mds->mds_lock.is_locked());

    dout(20) << __func__ << dendl;
    recall_client_state();
  }

private:
  // context which completes itself (with -ETIMEDOUT) after a specified
  // timeout or when explicitly completed, whichever comes first. Note
  // that the context does not destroy itself after completion -- it
  // needs to be explicitly freed.
  class C_ContextTimeout : public MDSInternalContext {
  public:
    C_ContextTimeout(MDSRank *mds, uint64_t timeout, Context *on_finish)
      : MDSInternalContext(mds),
        timeout(timeout),
        lock("mds::context::timeout", false, true),
        on_finish(on_finish) {
    }
    ~C_ContextTimeout() {
      ceph_assert(timer_task == nullptr);
    }

    void start_timer() {
      timer_task = new FunctionContext([this](int _) {
          timer_task = nullptr;
          complete(-ETIMEDOUT);
        });
      mds->timer.add_event_after(timeout, timer_task);
    }

    void finish(int r) override {
      Context *ctx = nullptr;
      {
        Mutex::Locker locker(lock);
        std::swap(on_finish, ctx);
      }
      if (ctx != nullptr) {
        ctx->complete(r);
      }
    }
    void complete(int r) override {
      if (timer_task != nullptr) {
        mds->timer.cancel_event(timer_task);
      }

      finish(r);
    }

    uint64_t timeout;
    Mutex lock;
    Context *on_finish = nullptr;
    Context *timer_task = nullptr;
  };
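  // Illustrative sketch only (the real call sites are in recall_client_state()
  // further down): a C_ContextTimeout is armed alongside a gather so that
  // whichever fires first -- the timer or the gather finisher -- completes
  // on_finish exactly once, and the other completion becomes a no-op:
  //
  //   C_ContextTimeout *timeout_ctx = new C_ContextTimeout(
  //       mds, recall_timeout, new FunctionContext([](int r) { /* ... */ }));
  //   timeout_ctx->start_timer();   // schedules complete(-ETIMEDOUT)
  //   gather->set_finisher(new MDSInternalContextWrapper(mds, timeout_ctx));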
  void recall_client_state() {
    dout(20) << __func__ << dendl;

    f->open_object_section("result");

    MDSGatherBuilder *gather = new MDSGatherBuilder(g_ceph_context);
    server->recall_client_state(1.0, true, gather);
    if (!gather->has_subs()) {
      handle_recall_client_state(0);
      delete gather;
      return;
    }

    C_ContextTimeout *ctx = new C_ContextTimeout(
      mds, recall_timeout, new FunctionContext([this](int r) {
          handle_recall_client_state(r);
        }));
    ctx->start_timer();
    gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
    gather->activate();
  }

  void handle_recall_client_state(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    // client recall section
    f->open_object_section("client_recall");
    f->dump_int("return_code", r);
    f->dump_string("message", cpp_strerror(r));
    f->close_section();

    // we can still continue after recall timeout
    trim_cache();
  }

  void trim_cache() {
    dout(20) << __func__ << dendl;

    if (!mdcache->trim(UINT64_MAX)) {
      cmd_err(f, "failed to trim cache");
      complete(-EINVAL);
      return;
    }

    flush_journal();
  }

  void flush_journal() {
    dout(20) << __func__ << dendl;

    Context *ctx = new FunctionContext([this](int r) {
        handle_flush_journal(r);
      });

    C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, mds, &ss, ctx);
    flush_journal->send();
  }

  void handle_flush_journal(int r) {
    dout(20) << __func__ << ": r=" << r << dendl;

    if (r != 0) {
      cmd_err(f, ss.str());
      complete(r);
      return;
    }

    // journal flush section
    f->open_object_section("flush_journal");
    f->dump_int("return_code", r);
    f->dump_string("message", ss.str());
    f->close_section();

    cache_status();
  }

  void cache_status() {
    dout(20) << __func__ << dendl;

    // cache status section
    mdcache->cache_status(f);

    complete(0);
  }

  void finish(int r) override {
    dout(20) << __func__ << ": r=" << r << dendl;

    on_finish->complete(r);
  }

  Server *server;
  MDCache *mdcache;
  MDLog *mdlog;
  uint64_t recall_timeout;
  Formatter *f;
  Context *on_finish;

  std::stringstream ss;

  // so that the dout prefix macro can see these
  mds_rank_t whoami;
  int incarnation;

  void cmd_err(Formatter *f, boost::string_view err) {
    f->open_object_section("result");
    f->dump_string("error", err);
    f->close_section();
  }
};
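// MDSRank ties together the per-rank subsystems (MDCache, MDLog, Server,
// Locker, MDBalancer, the tables, the PurgeQueue, ...), all serialized under
// mds_lock, and drives this rank through the MDSMap state machine.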
MDSRank::MDSRank(
    mds_rank_t whoami_,
    Mutex &mds_lock_,
    LogChannelRef &clog_,
    SafeTimer &timer_,
    Beacon &beacon_,
    MDSMap *&mdsmap_,
    Messenger *msgr,
    MonClient *monc_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  :
    whoami(whoami_), incarnation(0),
    mds_lock(mds_lock_), cct(msgr->cct), clog(clog_), timer(timer_),
    mdsmap(mdsmap_),
    objecter(new Objecter(g_ceph_context, msgr, monc_, nullptr, 0, 0)),
    server(NULL), mdcache(NULL), locker(NULL), mdlog(NULL),
    balancer(NULL), scrubstack(NULL),
    damage_table(whoami_),
    inotable(NULL), snapserver(NULL), snapclient(NULL),
    sessionmap(this), logger(NULL), mlogger(NULL),
    op_tracker(g_ceph_context, g_conf->mds_enable_op_tracker,
               g_conf->osd_num_op_tracker_shard),
    last_state(MDSMap::STATE_BOOT),
    state(MDSMap::STATE_BOOT),
    cluster_degraded(false), stopping(false),
    purge_queue(g_ceph_context, whoami_,
        mdsmap_->get_metadata_pool(), objecter,
        new FunctionContext(
            [this](int r) {
              // Purge Queue operates inside mds_lock when we're calling into
              // it, and outside when in background, so must handle both cases.
              if (mds_lock.is_locked_by_me()) {
                handle_write_error(r);
              } else {
                Mutex::Locker l(mds_lock);
                handle_write_error(r);
              }
            })),
    progress_thread(this), dispatch_depth(0),
    hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
    mds_slow_req_count(0),
    last_client_mdsmap_bcast(0),
    messenger(msgr), monc(monc_),
    respawn_hook(respawn_hook_),
    suicide_hook(suicide_hook_),
    standby_replaying(false),
    starttime(mono_clock::now())
{
  hb = g_ceph_context->get_heartbeat_map()->add_worker("MDSRank", pthread_self());

  purge_queue.update_op_limit(*mdsmap);

  objecter->unset_honor_osdmap_full();

  finisher = new Finisher(cct);

  mdcache = new MDCache(this, purge_queue);
  mdlog = new MDLog(this);
  balancer = new MDBalancer(this, messenger, monc);

  scrubstack = new ScrubStack(mdcache, finisher);

  inotable = new InoTable(this);
  snapserver = new SnapServer(this, monc);
  snapclient = new SnapClient(this);

  server = new Server(this);
  locker = new Locker(this, mdcache);

  op_tracker.set_complaint_and_threshold(cct->_conf->mds_op_complaint_time,
                                         cct->_conf->mds_op_log_threshold);
  op_tracker.set_history_size_and_duration(cct->_conf->mds_op_history_size,
                                           cct->_conf->mds_op_history_duration);
}

MDSRank::~MDSRank()
{
  if (hb) {
    g_ceph_context->get_heartbeat_map()->remove_worker(hb);
  }

  if (scrubstack) { delete scrubstack; scrubstack = NULL; }
  if (mdcache) { delete mdcache; mdcache = NULL; }
  if (mdlog) { delete mdlog; mdlog = NULL; }
  if (balancer) { delete balancer; balancer = NULL; }
  if (inotable) { delete inotable; inotable = NULL; }
  if (snapserver) { delete snapserver; snapserver = NULL; }
  if (snapclient) { delete snapclient; snapclient = NULL; }
  if (mdsmap) { delete mdsmap; mdsmap = 0; }

  if (server) { delete server; server = 0; }
  if (locker) { delete locker; locker = 0; }

  if (logger) {
    g_ceph_context->get_perfcounters_collection()->remove(logger);
  }
  if (mlogger) {
    g_ceph_context->get_perfcounters_collection()->remove(mlogger);
  }
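// One-time startup for the rank: wire the Objecter into the messenger, expose
// the OSDMap, and start the progress thread that drains deferred work.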
void MDSRankDispatcher::init()
{
  messenger->add_dispatcher_head(objecter);

  // Expose the OSDMap (already populated during MDS::init) to anyone
  // who is interested in it.
  handle_osd_map();

  progress_thread.create("mds_rank_progr");
}
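// Recompute this rank's export-target set from the per-rank decay counters
// and notify the monitors whenever the set shrinks or no longer matches the
// targets recorded in the MDSMap.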
void MDSRank::update_targets(utime_t now)
{
  // get the mdsmap's idea of my export_targets
  const set<mds_rank_t>& map_targets = mdsmap->get_mds_info(get_nodeid()).export_targets;

  dout(20) << "updating export targets, currently " << map_targets.size() << " ranks are targets" << dendl;

  bool send = false;
  set<mds_rank_t> new_map_targets;

  auto it = export_targets.begin();
  while (it != export_targets.end()) {
    mds_rank_t rank = it->first;
    double val = it->second.get(now);
    dout(20) << "export target mds." << rank << " value is " << val << " @ " << now << dendl;

    if (val <= 0.01) {
      dout(15) << "export target mds." << rank << " is no longer an export target" << dendl;
      export_targets.erase(it++);
      send = true;
      continue;
    }
    if (!map_targets.count(rank)) {
      dout(15) << "export target mds." << rank << " not in map's export_targets" << dendl;
      send = true;
    }
    new_map_targets.insert(rank);
    it++;
  }
  if (new_map_targets.size() < map_targets.size()) {
    dout(15) << "export target map holds stale targets, sending update" << dendl;
    send = true;
  }

  if (send) {
    dout(15) << "updating export_targets, now " << new_map_targets.size() << " ranks are targets" << dendl;
    MMDSLoadTargets* m = new MMDSLoadTargets(mds_gid_t(monc->get_global_id()), new_map_targets);
    monc->send_mon_message(m);
  }
}
void MDSRank::hit_export_target(utime_t now, mds_rank_t rank, double amount)
{
  double rate = g_conf->mds_bal_target_decay;
  if (amount < 0.0) {
    amount = 100.0/g_conf->mds_bal_target_decay; /* a good default for "i am trying to keep this export_target active" */
  }

  auto em = export_targets.emplace(std::piecewise_construct, std::forward_as_tuple(rank), std::forward_as_tuple(now, DecayRate(rate)));
  if (em.second) {
    dout(15) << "hit export target (new) " << amount << " @ " << now << dendl;
  } else {
    dout(15) << "hit export target " << amount << " @ " << now << dendl;
  }
  em.first->second.hit(now, amount);
}
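// Periodic upkeep for the rank: trim the cache and log, check in-flight ops,
// prod idle or misbehaving sessions, refresh export targets, and report
// health back through the beacon.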
void MDSRankDispatcher::tick()
{
  if (beacon.is_laggy()) {
    dout(1) << "skipping upkeep work because connection to Monitors appears laggy" << dendl;
    return;
  }

  check_ops_in_flight();

  // Wake up thread in case we used to be laggy and have waiting_for_nolaggy
  // messages to progress.
  progress_thread.signal();

  // make sure mds log flushes, trims periodically
  mdlog->flush();

  // update average session uptime
  sessionmap.update_average_session_age();

  if (is_active() || is_stopping()) {
    mdcache->trim_client_leases();
    mdcache->check_memory_usage();
    mdlog->trim();  // NOT during recovery!
  }

  if (logger) {
    logger->set(l_mds_subtrees, mdcache->num_subtrees());
  }

  if (is_clientreplay() || is_active() || is_stopping()) {
    server->find_idle_sessions();
    server->evict_cap_revoke_non_responders();
    locker->tick();
  }

  if (is_reconnect())
    server->reconnect_tick();

  if (is_active()) {
    balancer->tick();
    mdcache->find_stale_fragment_freeze();
    mdcache->migrator->find_stale_export_freeze();
    if (snapserver)
      snapserver->check_osd_map(false);
  }

  if (is_active() || is_stopping()) {
    update_targets(ceph_clock_now());
  }

  if (is_stopping()) {
    if (mdcache->shutdown_pass()) {
      uint64_t pq_progress = 0;
      uint64_t pq_total = 0;
      size_t pq_in_flight = 0;
      if (!purge_queue.drain(&pq_progress, &pq_total, &pq_in_flight)) {
        dout(7) << "shutdown_pass=true, but still waiting for purge queue"
                << dendl;
        // This takes unbounded time, so we must indicate progress
        // to the administrator: we do it in a slightly imperfect way
        // by sending periodic (tick frequency) clog messages while
        // in this state.
        clog->info() << "MDS rank " << whoami << " waiting for purge queue ("
                     << std::dec << pq_progress << "/" << pq_total << " " << pq_in_flight
                     << " files purging" << ")";
      } else {
        dout(7) << "shutdown_pass=true, finished w/ shutdown, moving to "
                   "down:stopped" << dendl;
        stopping_done();
      }
    } else {
      dout(7) << "shutdown_pass=false" << dendl;
    }
  }

  // Expose ourselves to Beacon to update health indicators
  beacon.notify_health(this);
}
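// Tear down the rank's subsystems in dependency order (journal before the
// finisher, messenger after releasing mds_lock) once stopping is set.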
void MDSRankDispatcher::shutdown()
{
  // It should never be possible for shutdown to get called twice, because
  // anyone picking up mds_lock checks if stopping is true and drops
  // out if it is.
  assert(stopping == false);
  stopping = true;

  dout(1) << __func__ << ": shutting down rank " << whoami << dendl;

  // MDLog has to shut down before the finisher, because some of its
  // threads block on IOs that require finisher to complete.
  mdlog->shutdown();

  purge_queue.shutdown();

  finisher->stop(); // no flushing

  if (objecter->initialized)
    objecter->shutdown();

  op_tracker.on_shutdown();

  progress_thread.shutdown();

  // release mds_lock for finisher/messenger threads (e.g.
  // MDSDaemon::ms_handle_reset called from Messenger).
  mds_lock.Unlock();

  // shut down messenger
  messenger->shutdown();

  mds_lock.Lock();

  // Workaround unclean shutdown: HeartbeatMap will assert if
  // worker is not removed (as we do in ~MDS), but ~MDS is not
  // always called after suicide.
  g_ceph_context->get_heartbeat_map()->remove_worker(hb);
}
/**
 * Helper for simple callbacks that call a void fn with no args.
 */
class C_MDS_VoidFn : public MDSInternalContext
{
  typedef void (MDSRank::*fn_ptr)();
protected:
  fn_ptr fn;
public:
  C_MDS_VoidFn(MDSRank *mds_, fn_ptr fn_)
    : MDSInternalContext(mds_), fn(fn_)
  {
    assert(mds_);
    assert(fn_);
  }

  void finish(int r) override
  {
    (mds->*fn)();
  }
};

int64_t MDSRank::get_metadata_pool()
{
  return mdsmap->get_metadata_pool();
}

MDSTableClient *MDSRank::get_table_client(int t)
{
  switch (t) {
  case TABLE_ANCHOR: return NULL;
  case TABLE_SNAP: return snapclient;
  default: ceph_abort();
  }
}

MDSTableServer *MDSRank::get_table_server(int t)
{
  switch (t) {
  case TABLE_ANCHOR: return NULL;
  case TABLE_SNAP: return snapserver;
  default: ceph_abort();
  }
}
void MDSRank::suicide()
{
  suicide_hook->complete(0);
}

void MDSRank::respawn()
{
  respawn_hook->complete(0);
}

void MDSRank::damaged()
{
  assert(whoami != MDS_RANK_NONE);
  assert(mds_lock.is_locked_by_me());

  beacon.set_want_state(mdsmap, MDSMap::STATE_DAMAGED);
  monc->flush_log();  // Flush any clog error from before we were called
  beacon.notify_health(this);  // Include latest status in our swan song
  beacon.send_and_wait(g_conf->mds_mon_shutdown_timeout);

  // It's okay if we timed out and the mon didn't get our beacon, because
  // another daemon (or ourselves after respawn) will eventually take the
  // rank and report DAMAGED again when it hits same problem we did.

  respawn();  // Respawn into standby in case mon has other work for us
}

void MDSRank::damaged_unlocked()
{
  Mutex::Locker l(mds_lock);
  damaged();
}

void MDSRank::handle_write_error(int err)
{
  if (err == -EBLACKLISTED) {
    derr << "we have been blacklisted (fenced), respawning..." << dendl;
    respawn();
    return;
  }

  if (g_conf->mds_action_on_write_error >= 2) {
    derr << "unhandled write error " << cpp_strerror(err) << ", suicide..." << dendl;
    respawn();
  } else if (g_conf->mds_action_on_write_error == 1) {
    derr << "unhandled write error " << cpp_strerror(err) << ", force readonly..." << dendl;
    mdcache->force_readonly();
  } else {
    derr << "unhandled write error " << cpp_strerror(err) << ", ignore..." << dendl;
  }
}
void *MDSRank::ProgressThread::entry()
{
  Mutex::Locker l(mds->mds_lock);
  while (true) {
    while (!mds->stopping &&
           mds->finished_queue.empty() &&
           (mds->waiting_for_nolaggy.empty() || mds->beacon.is_laggy())) {
      cond.Wait(mds->mds_lock);
    }

    if (mds->stopping) {
      break;
    }

    mds->_advance_queues();
  }

  return NULL;
}

void MDSRank::ProgressThread::shutdown()
{
  assert(mds->mds_lock.is_locked_by_me());
  assert(mds->stopping);

  if (am_self()) {
    // Stopping is set, we will fall out of our main loop naturally
  } else {
    // Kick the thread to notice mds->stopping, and join it
    cond.Signal();
    mds->mds_lock.Unlock();
    if (is_started())
      join();
    mds->mds_lock.Lock();
  }
}
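// Entry point for all messages delivered to this rank: stamp the client's
// session as recently seen, then hand off to _dispatch() with dispatch-depth
// tracking so nested dispatches do not recurse into queue draining.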
bool MDSRankDispatcher::ms_dispatch(Message *m)
{
  if (m->get_source().is_client()) {
    Session *session = static_cast<Session*>(m->get_connection()->get_priv());
    if (session)
      session->last_seen = Session::clock::now();
  }

  inc_dispatch_depth();
  bool ret = _dispatch(m, true);
  dec_dispatch_depth();
  return ret;
}
/* If this function returns true, it recognizes the message and has taken the
 * reference. If it returns false, it has done neither. */
bool MDSRank::_dispatch(Message *m, bool new_msg)
{
  if (is_stale_message(m)) {
    m->put();
    return true;
  }

  if (beacon.is_laggy()) {
    dout(5) << " laggy, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else if (new_msg && !waiting_for_nolaggy.empty()) {
    dout(5) << " there are deferred messages, deferring " << *m << dendl;
    waiting_for_nolaggy.push_back(m);
  } else {
    if (!handle_deferrable_message(m)) {
      dout(0) << "unrecognized message " << *m << dendl;
      return false;
    }
  }

  if (dispatch_depth > 1)
    return true;

  // finish any triggered contexts
  _advance_queues();

  if (beacon.is_laggy()) {
    // We've gone laggy during dispatch, don't do any
    // more housekeeping
    return true;
  }

  // done with all client replayed requests?
  if (is_clientreplay() &&
      mdcache->is_open() &&
      replay_queue.empty() &&
      beacon.get_want_state() == MDSMap::STATE_CLIENTREPLAY) {
    int num_requests = mdcache->get_num_client_requests();
    dout(10) << " still have " << num_requests << " active replay requests" << dendl;
    if (num_requests == 0)
      clientreplay_done();
  }

  // hack: thrash exports
  static utime_t start;
  utime_t now = ceph_clock_now();
  if (start == utime_t())
    start = now;
  /*double el = now - start;
  if (el > 30.0 &&
      el < 60.0)*/
  for (int i=0; i<g_conf->mds_thrash_exports; i++) {
    set<mds_rank_t> s;
    if (!is_active()) break;
    mdsmap->get_mds_set(s, MDSMap::STATE_ACTIVE);
    if (s.size() < 2 || CInode::count() < 10)
      break;  // need peers for this to work.
    if (mdcache->migrator->get_num_exporting() > g_conf->mds_thrash_exports * 5 ||
        mdcache->migrator->get_export_queue_size() > g_conf->mds_thrash_exports * 10)
      break;

    dout(7) << "mds thrashing exports pass " << (i+1) << "/" << g_conf->mds_thrash_exports << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    list<CDir*> ls;
    in->get_dirfrags(ls);
    if (!ls.empty()) {  // must be an open dir.
      list<CDir*>::iterator p = ls.begin();
      int n = rand() % ls.size();
      while (n--)
        ++p;
      CDir *dir = *p;
      if (!dir->get_parent_dir()) continue;    // must be linked.
      if (!dir->is_auth()) continue;           // must be auth.

      mds_rank_t dest;
      do {
        int k = rand() % s.size();
        set<mds_rank_t>::iterator p = s.begin();
        while (k--)
          ++p;
        dest = *p;
      } while (dest == whoami);
      mdcache->migrator->export_dir_nicely(dir,dest);
    }
  }
  // hack: thrash fragments
  for (int i=0; i<g_conf->mds_thrash_fragments; i++) {
    if (!is_active()) break;
    if (mdcache->get_num_fragmenting_dirs() > 5 * g_conf->mds_thrash_fragments) break;
    dout(7) << "mds thrashing fragments pass " << (i+1) << "/" << g_conf->mds_thrash_fragments << dendl;

    // pick a random dir inode
    CInode *in = mdcache->hack_pick_random_inode();

    list<CDir*> ls;
    in->get_dirfrags(ls);
    if (ls.empty()) continue;                // must be an open dir.
    CDir *dir = ls.front();
    if (!dir->get_parent_dir()) continue;    // must be linked.
    if (!dir->is_auth()) continue;           // must be auth.
    frag_t fg = dir->get_frag();
    if (mdsmap->allows_dirfrags()) {
      if ((fg == frag_t() || (rand() % (1 << fg.bits()) == 0))) {
        mdcache->split_dir(dir, 1);
      } else {
        balancer->queue_merge(dir);
      }
    }
  }

  // hack: force hash root?
  /*
  if (false &&
      mdcache->get_root() &&
      mdcache->get_root()->dir &&
      !(mdcache->get_root()->dir->is_hashed() ||
        mdcache->get_root()->dir->is_hashing())) {
    dout(0) << "hashing root" << dendl;
    mdcache->migrator->hash_dir(mdcache->get_root()->dir);
  }
  */

  update_mlogger();
  return true;
}
void MDSRank::update_mlogger()
{
  if (mlogger) {
    mlogger->set(l_mdm_ino, CInode::count());
    mlogger->set(l_mdm_dir, CDir::count());
    mlogger->set(l_mdm_dn, CDentry::count());
    mlogger->set(l_mdm_cap, Capability::count());
    mlogger->set(l_mdm_inoa, CInode::increments());
    mlogger->set(l_mdm_inos, CInode::decrements());
    mlogger->set(l_mdm_dira, CDir::increments());
    mlogger->set(l_mdm_dirs, CDir::decrements());
    mlogger->set(l_mdm_dna, CDentry::increments());
    mlogger->set(l_mdm_dns, CDentry::decrements());
    mlogger->set(l_mdm_capa, Capability::increments());
    mlogger->set(l_mdm_caps, Capability::decrements());
    mlogger->set(l_mdm_buf, buffer::get_total_alloc());
  }
}
/*
 * lower priority messages we defer if we seem laggy
 */
bool MDSRank::handle_deferrable_message(Message *m)
{
  int port = m->get_type() & 0xff00;

  switch (port) {
  case MDS_PORT_CACHE:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->dispatch(m);
    break;

  case MDS_PORT_MIGRATOR:
    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
    mdcache->migrator->dispatch(m);
    break;

  default:
    switch (m->get_type()) {
      // SERVER
    case CEPH_MSG_CLIENT_SESSION:
    case CEPH_MSG_CLIENT_RECONNECT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      // fall-thru
    case CEPH_MSG_CLIENT_REQUEST:
      server->dispatch(m);
      break;
    case MSG_MDS_SLAVE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      server->dispatch(m);
      break;

    case MSG_MDS_HEARTBEAT:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      balancer->proc_message(m);
      break;

    case MSG_MDS_TABLE_REQUEST:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      {
        MMDSTableRequest *req = static_cast<MMDSTableRequest*>(m);
        if (req->op < 0) {
          MDSTableClient *client = get_table_client(req->table);
          client->handle_request(req);
        } else {
          MDSTableServer *server = get_table_server(req->table);
          server->handle_request(req);
        }
      }
      break;

    case MSG_MDS_INODEFILECAPS:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MDS);
      locker->dispatch(m);
      break;

    case CEPH_MSG_CLIENT_CAPS:
    case CEPH_MSG_CLIENT_CAPRELEASE:
    case CEPH_MSG_CLIENT_LEASE:
      ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_CLIENT);
      locker->dispatch(m);
      break;

    default:
      return false;
    }
  }

  return true;
}
/*
 * Advance finished_queue and waiting_for_nolaggy.
 *
 * Usually drain both queues, but may not drain waiting_for_nolaggy
 * if beacon is currently laggy.
 */
void MDSRank::_advance_queues()
{
  assert(mds_lock.is_locked_by_me());

  while (!finished_queue.empty()) {
    dout(7) << "mds has " << finished_queue.size() << " queued contexts" << dendl;
    dout(10) << finished_queue << dendl;
    list<MDSInternalContextBase*> ls;
    ls.swap(finished_queue);
    while (!ls.empty()) {
      dout(10) << " finish " << ls.front() << dendl;
      ls.front()->complete(0);
      ls.pop_front();

      heartbeat_reset();
    }
  }

  while (!waiting_for_nolaggy.empty()) {
    // stop if we're laggy now!
    if (beacon.is_laggy())
      break;

    Message *old = waiting_for_nolaggy.front();
    waiting_for_nolaggy.pop_front();

    if (is_stale_message(old)) {
      old->put();
    } else {
      dout(7) << " processing laggy deferred " << *old << dendl;
      if (!handle_deferrable_message(old)) {
        dout(0) << "unrecognized message " << *old << dendl;
        old->put();
      }
    }

    heartbeat_reset();
  }
}
/**
 * Call this when you take mds_lock, or periodically if you're going to
 * hold the lock for a long time (e.g. iterating over clients/inodes)
 */
void MDSRank::heartbeat_reset()
{
  // Any thread might jump into mds_lock and call us immediately
  // after a call to suicide() completes, in which case MDSRank::hb
  // has been freed and we are a no-op.
  if (!hb) {
    assert(stopping);
    return;
  }

  // NB not enabling suicide grace, because the mon takes care of killing us
  // (by blacklisting us) when we fail to send beacons, and it's simpler to
  // only have one way of dying.
  auto grace = g_conf->get_val<double>("mds_heartbeat_grace");
  g_ceph_context->get_heartbeat_map()->reset_timeout(hb, grace, 0);
}
bool MDSRank::is_stale_message(Message *m) const
{
  // from bad mds?
  if (m->get_source().is_mds()) {
    mds_rank_t from = mds_rank_t(m->get_source().num());
    if (!mdsmap->have_inst(from) ||
        mdsmap->get_inst(from) != m->get_source_inst() ||
        mdsmap->is_down(from)) {
      // bogus mds?
      if (m->get_type() == CEPH_MSG_MDS_MAP) {
        dout(5) << "got " << *m << " from old/bad/imposter mds " << m->get_source()
                << ", but it's an mdsmap, looking at it" << dendl;
      } else if (m->get_type() == MSG_MDS_CACHEEXPIRE &&
                 mdsmap->get_inst(from) == m->get_source_inst()) {
        dout(5) << "got " << *m << " from down mds " << m->get_source()
                << ", but it's a cache_expire, looking at it" << dendl;
      } else {
        dout(5) << "got " << *m << " from down/old/bad/imposter mds " << m->get_source()
                << ", dropping" << dendl;
        return true;
      }
    }
  }
  return false;
}
Session *MDSRank::get_session(Message *m)
{
  Session *session = static_cast<Session *>(m->get_connection()->get_priv());
  if (session) {
    session->put(); // do not carry ref
    dout(20) << "get_session have " << session << " " << session->info.inst
             << " state " << session->get_state_name() << dendl;
    // Check if we've imported an open session since (new sessions start closed)
    if (session->is_closed()) {
      Session *imported_session = sessionmap.get_session(session->info.inst.name);
      if (imported_session && imported_session != session) {
        dout(10) << __func__ << " replacing connection bootstrap session " << session << " with imported session " << imported_session << dendl;
        imported_session->info.auth_name = session->info.auth_name;
        //assert(session->info.auth_name == imported_session->info.auth_name);
        assert(session->info.inst == imported_session->info.inst);
        imported_session->connection = session->connection;
        // send out any queued messages
        while (!session->preopen_out_queue.empty()) {
          imported_session->connection->send_message(session->preopen_out_queue.front());
          session->preopen_out_queue.pop_front();
        }
        imported_session->auth_caps = session->auth_caps;
        assert(session->get_nref() == 1);
        imported_session->connection->set_priv(imported_session->get());
        imported_session->last_seen = session->last_seen;
        session = imported_session;
      }
    }
  } else {
    dout(20) << "get_session dne for " << m->get_source_inst() << dendl;
  }
  return session;
}
void MDSRank::send_message(Message *m, Connection *c)
{
  assert(c);
  c->send_message(m);
}

void MDSRank::send_message_mds(Message *m, mds_rank_t mds)
{
  if (!mdsmap->is_up(mds)) {
    dout(10) << "send_message_mds mds." << mds << " not up, dropping " << *m << dendl;
    m->put();
    return;
  }

  // send mdsmap first?
  if (mds != whoami && peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
                            mdsmap->get_inst(mds));
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
  }

  // send message
  messenger->send_message(m, mdsmap->get_inst(mds));
}
void MDSRank::forward_message_mds(Message *m, mds_rank_t mds)
{
  assert(mds != whoami);

  // client request?
  if (m->get_type() == CEPH_MSG_CLIENT_REQUEST &&
      (static_cast<MClientRequest*>(m))->get_source().is_client()) {
    MClientRequest *creq = static_cast<MClientRequest*>(m);
    creq->inc_num_fwd();    // inc forward counter

    /*
     * don't actually forward if non-idempotent!
     * client has to do it.  although the MDS will ignore duplicate requests,
     * the affected metadata may migrate, in which case the new authority
     * won't have the metareq_id in the completed request map.
     */
    // NEW: always make the client resend!
    bool client_must_resend = true;  //!creq->can_forward();

    // tell the client where it should go
    messenger->send_message(new MClientRequestForward(creq->get_tid(), mds, creq->get_num_fwd(),
                                                      client_must_resend),
                            creq->get_source_inst());

    if (client_must_resend) {
      m->put();
      return;
    }
  }

  // these are the only types of messages we should be 'forwarding'; they
  // explicitly encode their source mds, which gets clobbered when we resend
  // them here.
  assert(m->get_type() == MSG_MDS_DIRUPDATE ||
         m->get_type() == MSG_MDS_EXPORTDIRDISCOVER);

  // send mdsmap first?
  if (peer_mdsmap_epoch[mds] < mdsmap->get_epoch()) {
    messenger->send_message(new MMDSMap(monc->get_fsid(), mdsmap),
                            mdsmap->get_inst(mds));
    peer_mdsmap_epoch[mds] = mdsmap->get_epoch();
  }

  messenger->send_message(m, mdsmap->get_inst(mds));
}
void MDSRank::send_message_client_counted(Message *m, client_t client)
{
  Session *session = sessionmap.get_session(entity_name_t::CLIENT(client.v));
  if (session) {
    send_message_client_counted(m, session);
  } else {
    dout(10) << "send_message_client_counted no session for client." << client << " " << *m << dendl;
  }
}

void MDSRank::send_message_client_counted(Message *m, Connection *connection)
{
  Session *session = static_cast<Session *>(connection->get_priv());
  if (session) {
    session->put();  // do not carry ref
    send_message_client_counted(m, session);
  } else {
    dout(10) << "send_message_client_counted has no session for " << m->get_source_inst() << dendl;
    // another Connection took over the Session
  }
}

void MDSRank::send_message_client_counted(Message *m, Session *session)
{
  version_t seq = session->inc_push_seq();
  dout(10) << "send_message_client_counted " << session->info.inst.name << " seq "
           << seq << " " << *m << dendl;
  if (session->connection) {
    session->connection->send_message(m);
  } else {
    session->preopen_out_queue.push_back(m);
  }
}

void MDSRank::send_message_client(Message *m, Session *session)
{
  dout(10) << "send_message_client " << session->info.inst << " " << *m << dendl;
  if (session->connection) {
    session->connection->send_message(m);
  } else {
    session->preopen_out_queue.push_back(m);
  }
}
/**
 * This is used whenever a RADOS operation has been cancelled
 * or a RADOS client has been blacklisted, to cause the MDS and
 * any clients to wait for this OSD epoch before using any new caps.
 *
 * See doc/cephfs/eviction
 */
void MDSRank::set_osd_epoch_barrier(epoch_t e)
{
  dout(4) << __func__ << ": epoch=" << e << dendl;
  osd_epoch_barrier = e;
}

void MDSRank::retry_dispatch(Message *m)
{
  inc_dispatch_depth();
  _dispatch(m, false);
  dec_dispatch_depth();
}

double MDSRank::get_dispatch_queue_max_age(utime_t now) const
{
  return messenger->get_dispatch_queue_max_age(now);
}

bool MDSRank::is_daemon_stopping() const
{
  return stopping;
}

void MDSRank::request_state(MDSMap::DaemonState s)
{
  dout(3) << "request_state " << ceph_mds_state_name(s) << dendl;
  beacon.set_want_state(mdsmap, s);
}
class C_MDS_BootStart : public MDSInternalContext {
  MDSRank::BootStep nextstep;
public:
  C_MDS_BootStart(MDSRank *m, MDSRank::BootStep n)
    : MDSInternalContext(m), nextstep(n) {}
  void finish(int r) override {
    mds->boot_start(nextstep, r);
  }
};
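// boot_start() walks the rank through its startup steps
// (MDS_BOOT_INITIAL -> OPEN_ROOT -> PREPARE_LOG -> REPLAY_DONE), chaining
// each step through a C_MDS_BootStart gather so a failure in any step can be
// turned into damage handling, a respawn, or read-only operation.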
void MDSRank::boot_start(BootStep step, int r)
{
  // Handle errors from previous step
  if (r < 0) {
    if (is_standby_replay() && (r == -EAGAIN)) {
      dout(0) << "boot_start encountered an error EAGAIN"
              << ", respawning since we fell behind journal" << dendl;
      respawn();
    } else if (r == -EINVAL || r == -ENOENT) {
      // Invalid or absent data, indicates damaged on-disk structures
      clog->error() << "Error loading MDS rank " << whoami << ": "
                    << cpp_strerror(r);
      damaged();
      assert(r == 0);  // Unreachable, damaged() calls respawn()
    } else if (r == -EROFS) {
      dout(0) << "boot error forcing transition to read-only; MDS will try to continue" << dendl;
    } else {
      // Completely unexpected error, give up and die
      dout(0) << "boot_start encountered an error, failing" << dendl;
      respawn();
      return;
    }
  }

  assert(is_starting() || is_any_replay());

  switch (step) {
    case MDS_BOOT_INITIAL:
      {
        mdcache->init_layouts();

        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_OPEN_ROOT));
        dout(2) << "boot_start " << step << ": opening inotable" << dendl;
        inotable->set_rank(whoami);
        inotable->load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
        sessionmap.set_rank(whoami);
        sessionmap.load(gather.new_sub());

        dout(2) << "boot_start " << step << ": opening mds log" << dendl;
        mdlog->open(gather.new_sub());

        if (is_starting()) {
          dout(2) << "boot_start " << step << ": opening purge queue" << dendl;
          purge_queue.open(new C_IO_Wrapper(this, gather.new_sub()));
        } else if (!standby_replaying) {
          dout(2) << "boot_start " << step << ": opening purge queue (async)" << dendl;
          purge_queue.open(NULL);
        }

        if (mdsmap->get_tableserver() == whoami) {
          dout(2) << "boot_start " << step << ": opening snap table" << dendl;
          snapserver->set_rank(whoami);
          snapserver->load(gather.new_sub());
        }

        gather.activate();
      }
      break;
    case MDS_BOOT_OPEN_ROOT:
      {
        dout(2) << "boot_start " << step << ": loading/discovering base inodes" << dendl;

        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_PREPARE_LOG));

        if (is_starting()) {
          // load mydir frag for the first log segment (creating subtree map)
          mdcache->open_mydir_frag(gather.new_sub());
        } else {
          mdcache->open_mydir_inode(gather.new_sub());
        }

        if (whoami == mdsmap->get_root()) {  // load root inode off disk if we are auth
          mdcache->open_root_inode(gather.new_sub());
        } else if (is_any_replay()) {
          // replay.  make up fake root inode to start with
          mdcache->create_root_inode();
        }
        gather.activate();
      }
      break;
    case MDS_BOOT_PREPARE_LOG:
      if (is_any_replay()) {
        dout(2) << "boot_start " << step << ": replaying mds log" << dendl;
        MDSGatherBuilder gather(g_ceph_context,
            new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));

        if (!standby_replaying) {
          dout(2) << "boot_start " << step << ": waiting for purge queue recovered" << dendl;
          purge_queue.wait_for_recovery(new C_IO_Wrapper(this, gather.new_sub()));
        }

        mdlog->replay(gather.new_sub());
        gather.activate();
      } else {
        dout(2) << "boot_start " << step << ": positioning at end of old mds log" << dendl;
        mdlog->append();
        starting_done();
      }
      break;
    case MDS_BOOT_REPLAY_DONE:
      assert(is_any_replay());

      // Sessiontable and inotable should be in sync after replay, validate
      // that they are consistent.
      validate_sessions();

      replay_done();
      break;
  }
}
void MDSRank::validate_sessions()
{
  assert(mds_lock.is_locked_by_me());
  bool valid = true;

  // Identify any sessions which have state inconsistent with other,
  // after they have been loaded from rados during startup.
  // Mitigate bugs like: http://tracker.ceph.com/issues/16842
  const auto &sessions = sessionmap.get_sessions();
  for (const auto &i : sessions) {
    Session *session = i.second;
    interval_set<inodeno_t> badones;
    if (inotable->intersects_free(session->info.prealloc_inos, &badones)) {
      clog->error() << "client " << *session
                    << " loaded with preallocated inodes that are inconsistent with inotable";
      valid = false;
    }
  }

  if (!valid) {
    damaged();
    assert(valid);
  }
}
void MDSRank::starting_done()
{
  dout(3) << "starting_done" << dendl;
  assert(is_starting());
  request_state(MDSMap::STATE_ACTIVE);

  mdlog->start_new_segment();
}

void MDSRank::calc_recovery_set()
{
  // initialize gather sets
  set<mds_rank_t> rs;
  mdsmap->get_recovery_mds_set(rs);
  rs.erase(whoami);
  mdcache->set_recovery_set(rs);

  dout(1) << " recovery set is " << rs << dendl;
}
void MDSRank::replay_start()
{
  dout(1) << "replay_start" << dendl;

  if (is_standby_replay())
    standby_replaying = true;

  calc_recovery_set();

  // Check if we need to wait for a newer OSD map before starting
  Context *fin = new C_IO_Wrapper(this, new C_MDS_BootStart(this, MDS_BOOT_INITIAL));
  bool const ready = objecter->wait_for_map(
      mdsmap->get_last_failure_osd_epoch(),
      fin);

  if (ready) {
    delete fin; // not needed
    boot_start();
  } else {
    dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
            << " (which blacklists prior instance)" << dendl;
  }
}
class MDSRank::C_MDS_StandbyReplayRestartFinish : public MDSIOContext {
  uint64_t old_read_pos;
public:
  C_MDS_StandbyReplayRestartFinish(MDSRank *mds_, uint64_t old_read_pos_) :
    MDSIOContext(mds_), old_read_pos(old_read_pos_) {}
  void finish(int r) override {
    mds->_standby_replay_restart_finish(r, old_read_pos);
  }
  void print(ostream& out) const override {
    out << "standby_replay_restart";
  }
};

void MDSRank::_standby_replay_restart_finish(int r, uint64_t old_read_pos)
{
  if (old_read_pos < mdlog->get_journaler()->get_trimmed_pos()) {
    dout(0) << "standby MDS fell behind active MDS journal's expire_pos, restarting" << dendl;
    respawn(); /* we're too far back, and this is easier than
                  trying to reset everything in the cache, etc */
  } else {
    mdlog->standby_trim_segments();
    boot_start(MDS_BOOT_PREPARE_LOG, r);
  }
}
class MDSRank::C_MDS_StandbyReplayRestart : public MDSInternalContext {
public:
  explicit C_MDS_StandbyReplayRestart(MDSRank *m) : MDSInternalContext(m) {}
  void finish(int r) override {
    assert(!r);
    mds->standby_replay_restart();
  }
};

void MDSRank::standby_replay_restart()
{
  if (standby_replaying) {
    /* Go around for another pass of replaying in standby */
    dout(5) << "Restarting replay as standby-replay" << dendl;
    mdlog->get_journaler()->reread_head_and_probe(
      new C_MDS_StandbyReplayRestartFinish(
        this,
        mdlog->get_journaler()->get_read_pos()));
  } else {
    /* We are transitioning out of standby: wait for OSD map update
       before making final pass */
    dout(1) << "standby_replay_restart (final takeover pass)" << dendl;
    Context *fin = new C_IO_Wrapper(this, new C_MDS_StandbyReplayRestart(this));
    bool ready = objecter->wait_for_map(mdsmap->get_last_failure_osd_epoch(), fin);
    if (ready) {
      delete fin; // not needed
      mdlog->get_journaler()->reread_head_and_probe(
        new C_MDS_StandbyReplayRestartFinish(
          this,
          mdlog->get_journaler()->get_read_pos()));

      dout(1) << " opening purge queue (async)" << dendl;
      purge_queue.open(NULL);
    } else {
      dout(1) << " waiting for osdmap " << mdsmap->get_last_failure_osd_epoch()
              << " (which blacklists prior instance)" << dendl;
    }
  }
}
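// Called when journal replay finishes: either schedule another standby-replay
// pass, make the journal writeable, or move the rank on to reconnect/resolve
// depending on whether any other ranks are in the map.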
void MDSRank::replay_done()
{
  if (!standby_replaying) {
    dout(1) << "Finished replaying journal" << dendl;
  } else {
    dout(5) << "Finished replaying journal as standby-replay" << dendl;
  }

  if (is_standby_replay()) {
    // The replay was done in standby state, and we are still in that state
    assert(standby_replaying);
    dout(10) << "setting replay timer" << dendl;
    timer.add_event_after(g_conf->mds_replay_interval,
                          new C_MDS_StandbyReplayRestart(this));
    return;
  } else if (standby_replaying) {
    // The replay was done in standby state, we have now _left_ that state
    dout(10) << " last replay pass was as a standby; making final pass" << dendl;
    standby_replaying = false;
    standby_replay_restart();
    return;
  }

  // Replay is complete, journal read should be up to date
  assert(mdlog->get_journaler()->get_read_pos() == mdlog->get_journaler()->get_write_pos());
  assert(!is_standby_replay());

  // Reformat and come back here
  if (mdlog->get_journaler()->get_stream_format() < g_conf->mds_journal_format) {
    dout(4) << "reformatting journal on standby-replay->replay transition" << dendl;
    mdlog->reopen(new C_MDS_BootStart(this, MDS_BOOT_REPLAY_DONE));
    return;
  }

  dout(1) << "making mds journal writeable" << dendl;
  mdlog->get_journaler()->set_writeable();
  mdlog->get_journaler()->trim_tail();

  if (g_conf->mds_wipe_sessions) {
    dout(1) << "wiping out client sessions" << dendl;
    sessionmap.wipe();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf->mds_wipe_ino_prealloc) {
    dout(1) << "wiping out ino prealloc from sessions" << dendl;
    sessionmap.wipe_ino_prealloc();
    sessionmap.save(new C_MDSInternalNoop);
  }
  if (g_conf->mds_skip_ino) {
    inodeno_t i = g_conf->mds_skip_ino;
    dout(1) << "skipping " << i << " inodes" << dendl;
    inotable->skip_inos(i);
    inotable->save(new C_MDSInternalNoop);
  }

  if (mdsmap->get_num_in_mds() == 1 &&
      mdsmap->get_num_failed_mds() == 0) { // just me!
    dout(2) << "i am alone, moving to state reconnect" << dendl;
    request_state(MDSMap::STATE_RECONNECT);
  } else {
    dout(2) << "i am not alone, moving to state resolve" << dendl;
    request_state(MDSMap::STATE_RESOLVE);
  }
}
void MDSRank::reopen_log()
{
  dout(1) << "reopen_log" << dendl;
  mdcache->rollback_uncommitted_fragments();
}

void MDSRank::resolve_start()
{
  dout(1) << "resolve_start" << dendl;

  mdcache->resolve_start(new C_MDS_VoidFn(this, &MDSRank::resolve_done));
  finish_contexts(g_ceph_context, waiting_for_resolve);
}

void MDSRank::resolve_done()
{
  dout(1) << "resolve_done" << dendl;
  request_state(MDSMap::STATE_RECONNECT);
}
void MDSRank::reconnect_start()
{
  dout(1) << "reconnect_start" << dendl;

  if (last_state == MDSMap::STATE_REPLAY) {
    reopen_log();
  }

  // Drop any blacklisted clients from the SessionMap before going
  // into reconnect, so that we don't wait for them.
  objecter->enable_blacklist_events();
  std::set<entity_addr_t> blacklist;
  epoch_t epoch = 0;
  objecter->with_osdmap([this, &blacklist, &epoch](const OSDMap& o) {
      o.get_blacklist(&blacklist);
      epoch = o.get_epoch();
  });
  auto killed = server->apply_blacklist(blacklist);
  dout(4) << "reconnect_start: killed " << killed << " blacklisted sessions ("
          << blacklist.size() << " blacklist entries, "
          << sessionmap.get_sessions().size() << ")" << dendl;
  if (killed) {
    set_osd_epoch_barrier(epoch);
  }

  server->reconnect_clients(new C_MDS_VoidFn(this, &MDSRank::reconnect_done));
  finish_contexts(g_ceph_context, waiting_for_reconnect);
}
void MDSRank::reconnect_done()
{
  dout(1) << "reconnect_done" << dendl;
  request_state(MDSMap::STATE_REJOIN);    // move to rejoin state
}
void MDSRank::rejoin_joint_start()
{
  dout(1) << "rejoin_joint_start" << dendl;
  mdcache->rejoin_send_rejoins();
}
void MDSRank::rejoin_start()
{
  dout(1) << "rejoin_start" << dendl;
  mdcache->rejoin_start(new C_MDS_VoidFn(this, &MDSRank::rejoin_done));
}
void MDSRank::rejoin_done()
{
  dout(1) << "rejoin_done" << dendl;
  mdcache->show_subtrees();
  mdcache->show_cache();

  // funny case: is our cache empty?  no subtrees?
  if (!mdcache->is_subtrees()) {
    if (whoami == 0) {
      // The root should always have a subtree!
      clog->error() << "No subtrees found for root MDS rank!";
      damaged();
      assert(mdcache->is_subtrees());
    } else {
      dout(1) << " empty cache, no subtrees, leaving cluster" << dendl;
      request_state(MDSMap::STATE_STOPPED);
    }
    return;
  }

  if (replay_queue.empty())
    request_state(MDSMap::STATE_ACTIVE);
  else
    request_state(MDSMap::STATE_CLIENTREPLAY);
}

void MDSRank::clientreplay_start()
{
  dout(1) << "clientreplay_start" << dendl;
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();
  queue_one_replay();
}
bool MDSRank::queue_one_replay()
{
  if (replay_queue.empty()) {
    mdlog->wait_for_safe(new C_MDS_VoidFn(this, &MDSRank::clientreplay_done));
    return false;
  }
  queue_waiter(replay_queue.front());
  replay_queue.pop_front();
  return true;
}

void MDSRank::clientreplay_done()
{
  dout(1) << "clientreplay_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
}
void MDSRank::active_start()
{
  dout(1) << "active_start" << dendl;

  if (last_state == MDSMap::STATE_CREATING ||
      last_state == MDSMap::STATE_STARTING) {
    mdcache->open_root();
  }

  mdcache->clean_open_file_lists();
  mdcache->export_remaining_imported_caps();
  finish_contexts(g_ceph_context, waiting_for_replay);  // kick waiters
  mdcache->start_files_to_recover();

  mdcache->reissue_all_caps();
  mdcache->activate_stray_manager();

  finish_contexts(g_ceph_context, waiting_for_active);  // kick waiters
}
void MDSRank::recovery_done(int oldstate)
{
  dout(1) << "recovery_done -- successful recovery!" << dendl;
  assert(is_clientreplay() || is_active());

  // kick snaptable (resent AGREEs)
  if (mdsmap->get_tableserver() == whoami) {
    set<mds_rank_t> active;
    mdsmap->get_mds_set_lower_bound(active, MDSMap::STATE_CLIENTREPLAY);
    snapserver->finish_recovery(active);
  }

  if (oldstate == MDSMap::STATE_CREATING)
    return;

  mdcache->start_recovered_truncates();
  mdcache->do_file_recover();

  // tell connected clients
  //bcast_mds_map();     // not anymore, they get this from the monitor

  mdcache->populate_mydir();
}
void MDSRank::creating_done()
{
  dout(1) << "creating_done" << dendl;
  request_state(MDSMap::STATE_ACTIVE);
}
void MDSRank::boot_create()
{
  dout(3) << "boot_create" << dendl;

  MDSGatherBuilder fin(g_ceph_context, new C_MDS_VoidFn(this, &MDSRank::creating_done));

  mdcache->init_layouts();

  snapserver->set_rank(whoami);
  inotable->set_rank(whoami);
  sessionmap.set_rank(whoami);

  // start with a fresh journal
  dout(10) << "boot_create creating fresh journal" << dendl;
  mdlog->create(fin.new_sub());

  // open new journal segment, but do not journal subtree map (yet)
  mdlog->prepare_new_segment();

  if (whoami == mdsmap->get_root()) {
    dout(3) << "boot_create creating fresh hierarchy" << dendl;
    mdcache->create_empty_hierarchy(fin.get());
  }

  dout(3) << "boot_create creating mydir hierarchy" << dendl;
  mdcache->create_mydir_hierarchy(fin.get());

  // fixme: fake out inotable (reset, pretend loaded)
  dout(10) << "boot_create creating fresh inotable table" << dendl;
  inotable->reset();
  inotable->save(fin.new_sub());

  // write empty sessionmap
  sessionmap.save(fin.new_sub());

  // Create empty purge queue
  purge_queue.create(new C_IO_Wrapper(this, fin.new_sub()));

  // initialize tables
  if (mdsmap->get_tableserver() == whoami) {
    dout(10) << "boot_create creating fresh snaptable" << dendl;
    snapserver->reset();
    snapserver->save(fin.new_sub());
  }

  assert(g_conf->mds_kill_create_at != 1);

  // ok now journal it
  mdlog->journal_segment_subtree_map(fin.new_sub());
  fin.activate();

  // Usually we do this during reconnect, but creation skips that.
  objecter->enable_blacklist_events();
}
void MDSRank::stopping_start()
{
  dout(2) << "stopping_start" << dendl;

  if (mdsmap->get_num_in_mds() == 1 && !sessionmap.empty()) {
    // we're the only mds up!
    dout(0) << "we are the last MDS, and have mounted clients: we cannot flush our journal.  suicide!" << dendl;
    suicide();
  }

  mdcache->shutdown_start();
}

void MDSRank::stopping_done()
{
  dout(2) << "stopping_done" << dendl;

  // tell monitor we shut down cleanly.
  request_state(MDSMap::STATE_STOPPED);
}
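// Process a new MDSMap while we hold a rank: validate the state transition,
// notice failed/restarted/recovered peers, kick the *_start() handler for our
// new state, and wake any waiters on the new map epoch.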
2006 void MDSRankDispatcher::handle_mds_map(
2010 // I am only to be passed MDSMaps in which I hold a rank
2011 assert(whoami
!= MDS_RANK_NONE
);
2013 MDSMap::DaemonState oldstate
= state
;
2014 mds_gid_t mds_gid
= mds_gid_t(monc
->get_global_id());
2015 state
= mdsmap
->get_state_gid(mds_gid
);
2016 if (state
!= oldstate
) {
2017 last_state
= oldstate
;
2018 incarnation
= mdsmap
->get_inc_gid(mds_gid
);
2021 version_t epoch
= m
->get_epoch();
2023 // note source's map version
2024 if (m
->get_source().is_mds() &&
2025 peer_mdsmap_epoch
[mds_rank_t(m
->get_source().num())] < epoch
) {
2026 dout(15) << " peer " << m
->get_source()
2027 << " has mdsmap epoch >= " << epoch
2029 peer_mdsmap_epoch
[mds_rank_t(m
->get_source().num())] = epoch
;
2032 // Validate state transitions while I hold a rank
2033 if (!MDSMap::state_transition_valid(oldstate
, state
)) {
2034 derr
<< "Invalid state transition " << ceph_mds_state_name(oldstate
)
2035 << "->" << ceph_mds_state_name(state
) << dendl
;
2039 if (oldstate
!= state
) {
2040 // update messenger.
2041 if (state
== MDSMap::STATE_STANDBY_REPLAY
) {
2042 dout(1) << "handle_mds_map i am now mds." << mds_gid
<< "." << incarnation
2043 << " replaying mds." << whoami
<< "." << incarnation
<< dendl
;
2044 messenger
->set_myname(entity_name_t::MDS(mds_gid
));
2046 dout(1) << "handle_mds_map i am now mds." << whoami
<< "." << incarnation
<< dendl
;
2047 messenger
->set_myname(entity_name_t::MDS(whoami
));
2051 // tell objecter my incarnation
2052 if (objecter
->get_client_incarnation() != incarnation
)
2053 objecter
->set_client_incarnation(incarnation
);
2056 if (g_conf
->mds_dump_cache_on_map
)
2057 mdcache
->dump_cache();
2059 cluster_degraded
= mdsmap
->is_degraded();
2061 // mdsmap and oldmap can be discontinuous. failover might happen in the missing mdsmap.
2062 // the 'restart' set tracks ranks that have restarted since the old mdsmap
2063 set
<mds_rank_t
> restart
;
2064 // replaying mds does not communicate with other ranks
2065 if (state
>= MDSMap::STATE_RESOLVE
) {
2066 // did someone fail?
2068 set
<mds_rank_t
> olddown
, down
;
2069 oldmap
->get_down_mds_set(&olddown
);
2070 mdsmap
->get_down_mds_set(&down
);
2071 for (const auto& r
: down
) {
2072 if (oldmap
->have_inst(r
) && olddown
.count(r
) == 0) {
2073 messenger
->mark_down(oldmap
->get_inst(r
).addr
);
2074 handle_mds_failure(r
);
2078 // did someone fail?
2079 // did their addr/inst change?
2081 mdsmap
->get_up_mds_set(up
);
2082 for (const auto& r
: up
) {
2083 auto& info
= mdsmap
->get_info(r
);
2084 if (oldmap
->have_inst(r
)) {
2085 auto& oldinfo
= oldmap
->get_info(r
);
2086 if (info
.inc
!= oldinfo
.inc
) {
2087 messenger
->mark_down(oldinfo
.addr
);
2088 if (info
.state
== MDSMap::STATE_REPLAY
||
2089 info
.state
== MDSMap::STATE_RESOLVE
) {
2091 handle_mds_failure(r
);
2093 assert(info
.state
== MDSMap::STATE_STARTING
||
2094 info
.state
== MDSMap::STATE_ACTIVE
);
2095 // -> stopped (missing) -> starting -> active
2097 mdcache
->migrator
->handle_mds_failure_or_stop(r
);
2101 if (info
.state
== MDSMap::STATE_REPLAY
||
2102 info
.state
== MDSMap::STATE_RESOLVE
) {
2103 // -> starting/creating (missing) -> active (missing) -> replay -> resolve
2105 handle_mds_failure(r
);
2107 assert(info
.state
== MDSMap::STATE_CREATING
||
2108 info
.state
== MDSMap::STATE_STARTING
||
2109 info
.state
== MDSMap::STATE_ACTIVE
);
2116 if (oldstate
!= state
) {
2117 dout(1) << "handle_mds_map state change "
2118 << ceph_mds_state_name(oldstate
) << " --> "
2119 << ceph_mds_state_name(state
) << dendl
;
2120 beacon
.set_want_state(mdsmap
, state
);
2122 if (oldstate
== MDSMap::STATE_STANDBY_REPLAY
) {
2123 dout(10) << "Monitor activated us! Deactivating replay loop" << dendl
;
2124 assert (state
== MDSMap::STATE_REPLAY
);
2126 // did i just recover?
2127 if ((is_active() || is_clientreplay()) &&
2128 (oldstate
== MDSMap::STATE_CREATING
||
2129 oldstate
== MDSMap::STATE_REJOIN
||
2130 oldstate
== MDSMap::STATE_RECONNECT
))
2131 recovery_done(oldstate
);
2135 } else if (is_any_replay()) {
2137 } else if (is_resolve()) {
2139 } else if (is_reconnect()) {
2141 } else if (is_rejoin()) {
2143 } else if (is_clientreplay()) {
2144 clientreplay_start();
2145 } else if (is_creating()) {
2147 } else if (is_starting()) {
2149 } else if (is_stopping()) {
2150 assert(oldstate
== MDSMap::STATE_ACTIVE
);
2157 // is someone else newly resolving?
2158 if (state
>= MDSMap::STATE_RESOLVE
) {
2159 if ((!oldmap
->is_resolving() || !restart
.empty()) && mdsmap
->is_resolving()) {
2160 set
<mds_rank_t
> resolve
;
2161 mdsmap
->get_mds_set(resolve
, MDSMap::STATE_RESOLVE
);
2162 dout(10) << " resolve set is " << resolve
<< dendl
;
2163 calc_recovery_set();
2164 mdcache
->send_resolves();
2169 // is everybody finally rejoining?
2170 if (state
>= MDSMap::STATE_REJOIN
) {
2172 if (!oldmap
->is_rejoining() && mdsmap
->is_rejoining())
2173 rejoin_joint_start();
2176 if (g_conf
->mds_dump_cache_after_rejoin
&&
2177 oldmap
->is_rejoining() && !mdsmap
->is_rejoining())
2178 mdcache
->dump_cache(); // for DEBUG only
2180 if (oldstate
>= MDSMap::STATE_REJOIN
||
2181 oldstate
== MDSMap::STATE_STARTING
) {
2182 // ACTIVE|CLIENTREPLAY|REJOIN => we can discover from them.
2183 set
<mds_rank_t
> olddis
, dis
;
2184 oldmap
->get_mds_set_lower_bound(olddis
, MDSMap::STATE_REJOIN
);
2185 mdsmap
->get_mds_set_lower_bound(dis
, MDSMap::STATE_REJOIN
);
2186 for (const auto& r
: dis
) {
2189 if (!olddis
.count(r
) || restart
.count(r
)) { // newly so?
2190 mdcache
->kick_discovers(r
);
2191 mdcache
->kick_open_ino_peers(r
);
2197 if (oldmap
->is_degraded() && !cluster_degraded
&& state
>= MDSMap::STATE_ACTIVE
) {
2198 dout(1) << "cluster recovered." << dendl
;
2199 auto it
= waiting_for_active_peer
.find(MDS_RANK_NONE
);
2200 if (it
!= waiting_for_active_peer
.end()) {
2201 queue_waiters(it
->second
);
2202 waiting_for_active_peer
.erase(it
);
2206 // did someone go active?
2207 if (state
>= MDSMap::STATE_CLIENTREPLAY
&&
2208 oldstate
>= MDSMap::STATE_CLIENTREPLAY
) {
2209 set
<mds_rank_t
> oldactive
, active
;
2210 oldmap
->get_mds_set_lower_bound(oldactive
, MDSMap::STATE_CLIENTREPLAY
);
2211 mdsmap
->get_mds_set_lower_bound(active
, MDSMap::STATE_CLIENTREPLAY
);
2212 for (const auto& r
: active
) {
2215 if (!oldactive
.count(r
) || restart
.count(r
)) // newly so?
2216 handle_mds_recovery(r
);
  if (state >= MDSMap::STATE_CLIENTREPLAY) {
    set<mds_rank_t> oldstopped, stopped;
    oldmap->get_stopped_mds_set(oldstopped);
    mdsmap->get_stopped_mds_set(stopped);
    for (const auto& r : stopped)
      if (oldstopped.count(r) == 0) // newly so?
        mdcache->migrator->handle_mds_failure_or_stop(r);
  map<epoch_t,list<MDSInternalContextBase*> >::iterator p = waiting_for_mdsmap.begin();
  while (p != waiting_for_mdsmap.end() && p->first <= mdsmap->get_epoch()) {
    list<MDSInternalContextBase*> ls;
    waiting_for_mdsmap.erase(p++);
  // Before going active, set OSD epoch barrier to latest (so that
  // we don't risk handing out caps to clients with old OSD maps that
  // might not include barriers from the previous incarnation of this MDS)
  set_osd_epoch_barrier(objecter->with_osdmap(
      std::mem_fn(&OSDMap::get_epoch)));
  MDSMap::mds_info_t info = mdsmap->get_info(whoami);
  for (map<mds_gid_t,MDSMap::mds_info_t>::const_iterator p = mdsmap->get_mds_info().begin();
       p != mdsmap->get_mds_info().end();
    if (p->second.state == MDSMap::STATE_STANDBY_REPLAY &&
        (p->second.standby_for_rank == whoami ||
         (info.name.length() && p->second.standby_for_name == info.name))) {
      mdlog->set_write_iohint(0);

  mdlog->set_write_iohint(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
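  // The io-hint split above presumably exists because a standby-replay daemon
  // tails this rank's journal: leave the default hint (0) so OSDs keep the
  // journal objects cached for that reader, and use FADVISE_DONTNEED when no
  // one is expected to read them back soon.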
  if (oldmap->get_max_mds() != mdsmap->get_max_mds()) {
    purge_queue.update_op_limit(*mdsmap);
void MDSRank::handle_mds_recovery(mds_rank_t who)
{
  dout(5) << "handle_mds_recovery mds." << who << dendl;

  mdcache->handle_mds_recovery(who);

  if (mdsmap->get_tableserver() == whoami) {
    snapserver->handle_mds_recovery(who);

  queue_waiters(waiting_for_active_peer[who]);
  waiting_for_active_peer.erase(who);
void MDSRank::handle_mds_failure(mds_rank_t who)
{
  if (who == whoami) {
    dout(5) << "handle_mds_failure for myself; not doing anything" << dendl;

  dout(5) << "handle_mds_failure mds." << who << dendl;

  mdcache->handle_mds_failure(who);

  snapclient->handle_mds_failure(who);
bool MDSRankDispatcher::handle_asok_command(
    std::string command, cmdmap_t& cmdmap, Formatter *f,
  if (command == "dump_ops_in_flight" ||
    if (!op_tracker.dump_ops_in_flight(f)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_blocked_ops") {
    if (!op_tracker.dump_ops_in_flight(f, true)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_historic_ops") {
    if (!op_tracker.dump_historic_ops(f)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
  } else if (command == "dump_historic_ops_by_duration") {
    if (!op_tracker.dump_historic_ops(f, true)) {
      ss << "op_tracker tracking is not enabled now, so no ops are tracked currently, even those that are stuck. \
Please enable \"mds_enable_op_tracker\", and the tracker will start to track new ops received afterwards.";
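  // These op_tracker dump commands are admin-socket commands; with standard
  // tooling they are typically invoked as e.g.
  //   ceph daemon mds.<name> dump_ops_in_flight
  // (the admin-socket registration itself lives in the MDS daemon code).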
  } else if (command == "osdmap barrier") {
    int64_t target_epoch = 0;
    bool got_val = cmd_getval(g_ceph_context, cmdmap, "target_epoch", target_epoch);
      ss << "no target epoch given";

    set_osd_epoch_barrier(target_epoch);

    bool already_got = objecter->wait_for_map(target_epoch, &cond);
      dout(4) << __func__ << ": waiting for OSD epoch " << target_epoch << dendl;
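  // Rough intent of the barrier: clients are not handed new capabilities until
  // they have an OSD map at least as new as target_epoch, and wait_for_map
  // above lets this handler block until the local objecter has caught up to
  // that epoch as well.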
  } else if (command == "session ls") {
    Mutex::Locker l(mds_lock);

    dump_sessions(SessionFilter(), f);
  } else if (command == "session evict") {
    std::string client_id;
    const bool got_arg = cmd_getval(g_ceph_context, cmdmap, "client_id", client_id);
      ss << "Invalid client_id specified";

    std::stringstream dss;
    bool evicted = evict_client(strtol(client_id.c_str(), 0, 10), true,
                                g_conf->mds_session_blacklist_on_evict, dss);
      dout(15) << dss.str() << dendl;
  } else if (command == "scrub_path") {
    vector<string> scrubop_vec;
    cmd_getval(g_ceph_context, cmdmap, "scrubops", scrubop_vec);
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    command_scrub_path(f, path, scrubop_vec);
  } else if (command == "tag path") {
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    cmd_getval(g_ceph_context, cmdmap, "tag", tag);
    command_tag_path(f, path, tag);
  } else if (command == "flush_path") {
    cmd_getval(g_ceph_context, cmdmap, "path", path);
    command_flush_path(f, path);
  } else if (command == "flush journal") {
    command_flush_journal(f);
  } else if (command == "get subtrees") {
    command_get_subtrees(f);
  } else if (command == "export dir") {
    if (!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
      ss << "malformed path";
    if (!cmd_getval(g_ceph_context, cmdmap, "rank", rank)) {
      ss << "malformed rank";
    command_export_dir(f, path, (mds_rank_t)rank);
  } else if (command == "dump cache") {
    Mutex::Locker l(mds_lock);
    if (!cmd_getval(g_ceph_context, cmdmap, "path", path)) {
      r = mdcache->dump_cache(f);
      r = mdcache->dump_cache(path);
      ss << "Failed to dump cache: " << cpp_strerror(r);
  } else if (command == "cache status") {
    Mutex::Locker l(mds_lock);
    mdcache->cache_status(f);
  } else if (command == "cache drop") {
    if (!cmd_getval(g_ceph_context, cmdmap, "timeout", timeout)) {
    command_cache_drop((uint64_t)timeout, f, &cond);
    int r = cond.wait();
  } else if (command == "dump tree") {
    cmd_getval(g_ceph_context, cmdmap, "root", root);
    if (!cmd_getval(g_ceph_context, cmdmap, "depth", depth))
    Mutex::Locker l(mds_lock);
    int r = mdcache->dump_cache(root, depth, f);
      ss << "Failed to dump tree: " << cpp_strerror(r);
  } else if (command == "dump loads") {
    Mutex::Locker l(mds_lock);
    int r = balancer->dump_loads(f);
      ss << "Failed to dump loads: " << cpp_strerror(r);
  } else if (command == "force_readonly") {
    Mutex::Locker l(mds_lock);
    mdcache->force_readonly();
  } else if (command == "dirfrag split") {
    command_dirfrag_split(cmdmap, ss);
  } else if (command == "dirfrag merge") {
    command_dirfrag_merge(cmdmap, ss);
  } else if (command == "dirfrag ls") {
    command_dirfrag_ls(cmdmap, ss, f);
class C_MDS_Send_Command_Reply : public MDSInternalContext
{
  C_MDS_Send_Command_Reply(MDSRank *_mds, MCommand *_m) :
    MDSInternalContext(_mds), m(_m) { m->get(); }

  void send(int r, boost::string_view ss) {
    std::stringstream ds;

  void send(int r, boost::string_view ss, std::stringstream &ds) {
    MDSDaemon::send_command_reply(m, mds, r, bl, ss);

  void finish(int r) override {
/*
 * This function drops the mds_lock, so don't do anything with
 * MDSRank after calling it (we could have gone into shutdown): just
 * send your result back to the calling client and finish.
 */
void MDSRankDispatcher::evict_clients(const SessionFilter &filter, MCommand *m)
{
  C_MDS_Send_Command_Reply *reply = new C_MDS_Send_Command_Reply(this, m);

  if (is_any_replay()) {
    reply->send(-EAGAIN, "MDS is replaying log");

  std::list<Session*> victims;
  const auto sessions = sessionmap.get_sessions();
  for (const auto p : sessions) {
    if (!p.first.is_client()) {

    Session *s = p.second;

    if (filter.match(*s, std::bind(&Server::waiting_for_reconnect, server,
                                   std::placeholders::_1))) {
      victims.push_back(s);

  dout(20) << __func__ << " matched " << victims.size() << " sessions" << dendl;

  if (victims.empty()) {

  C_GatherBuilder gather(g_ceph_context, reply);
  for (const auto s : victims) {
    std::stringstream ss;
    evict_client(s->info.inst.name.num(), false,
                 g_conf->mds_session_blacklist_on_evict, ss, gather.new_sub());
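  // The reply context is owned by the gather, so the command reply is sent
  // only once every per-session evict_client() sub-context has completed;
  // the caller sees a single acknowledgement for the whole batch.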
void MDSRankDispatcher::dump_sessions(const SessionFilter &filter, Formatter *f) const
{
  // Dump sessions, decorated with recovery/replay status
  f->open_array_section("sessions");
  const ceph::unordered_map<entity_name_t, Session*> session_map = sessionmap.get_sessions();
  for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
       p != session_map.end();
    if (!p->first.is_client()) {

    Session *s = p->second;

    if (!filter.match(*s, std::bind(&Server::waiting_for_reconnect, server,
                                    std::placeholders::_1))) {

    f->open_object_section("session");
    f->dump_int("id", p->first.num());

    f->dump_int("num_leases", s->leases.size());
    f->dump_int("num_caps", s->caps.size());

    f->dump_string("state", s->get_state_name());
    if (s->is_open() || s->is_stale()) {
      f->dump_unsigned("request_load_avg", s->get_load_avg());

    f->dump_float("uptime", s->get_session_uptime());
    f->dump_int("replay_requests", is_clientreplay() ? s->get_request_count() : 0);
    f->dump_unsigned("completed_requests", s->get_num_completed_requests());
    f->dump_bool("reconnecting", server->waiting_for_reconnect(p->first.num()));
    f->dump_stream("inst") << s->info.inst;
    f->open_object_section("client_metadata");
    for (map<string, string>::const_iterator i = s->info.client_metadata.begin();
         i != s->info.client_metadata.end(); ++i) {
      f->dump_string(i->first.c_str(), i->second);

    f->close_section(); // client_metadata
    f->close_section(); // session

  f->close_section(); // sessions
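// For reference, each "session" object emitted above looks roughly like this
// (field names come from the dump_* calls; values are illustrative):
//   { "id": 4123, "num_leases": 0, "num_caps": 10, "state": "open",
//     "request_load_avg": 0, "uptime": 12.3, "replay_requests": 0,
//     "completed_requests": 1, "reconnecting": false,
//     "inst": "client.4123 10.0.0.1:0/1234567",
//     "client_metadata": { "hostname": "host1" } }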
void MDSRank::command_scrub_path(Formatter *f, boost::string_view path, vector<string>& scrubop_vec)
{
  bool recursive = false;
  bool repair = false;
  for (vector<string>::iterator i = scrubop_vec.begin(); i != scrubop_vec.end(); ++i) {
    else if (*i == "recursive")
    else if (*i == "repair")

  Mutex::Locker l(mds_lock);
  mdcache->enqueue_scrub(path, "", force, recursive, repair, f, &scond);

  // scrub_dentry() finishers will dump the data for us; we're done!
void MDSRank::command_tag_path(Formatter *f,
    boost::string_view path, boost::string_view tag)
{
  Mutex::Locker l(mds_lock);
  mdcache->enqueue_scrub(path, tag, true, true, false, f, &scond);
void MDSRank::command_flush_path(Formatter *f, boost::string_view path)
{
  Mutex::Locker l(mds_lock);
  mdcache->flush_dentry(path, &scond);

  int r = scond.wait();
  f->open_object_section("results");
  f->dump_int("return_code", r);
  f->close_section(); // results
// synchronous wrapper around "journal flush" asynchronous context
void MDSRank::command_flush_journal(Formatter *f) {
  ceph_assert(f != NULL);

  std::stringstream ss;

  Mutex::Locker locker(mds_lock);
  C_Flush_Journal *flush_journal = new C_Flush_Journal(mdcache, mdlog, this, &ss, &cond);
  flush_journal->send();

  int r = cond.wait();

  f->open_object_section("result");
  f->dump_string("message", ss.str());
  f->dump_int("return_code", r);
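// Called from the "flush journal" admin-socket branch above: it kicks off the
// asynchronous C_Flush_Journal context and then blocks on the C_SaferCond, so
// the admin-socket caller gets a single synchronous result carrying the
// message and return code collected while the flush ran.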
void MDSRank::command_get_subtrees(Formatter *f)
{
  Mutex::Locker l(mds_lock);

  std::list<CDir*> subtrees;
  mdcache->list_subtrees(subtrees);

  f->open_array_section("subtrees");
  for (std::list<CDir*>::iterator i = subtrees.begin(); i != subtrees.end(); ++i) {
    const CDir *dir = *i;

    f->open_object_section("subtree");

    f->dump_bool("is_auth", dir->is_auth());
    f->dump_int("auth_first", dir->get_dir_auth().first);
    f->dump_int("auth_second", dir->get_dir_auth().second);
    f->dump_int("export_pin", dir->inode->get_export_pin());
    f->open_object_section("dir");
void MDSRank::command_export_dir(Formatter *f,
    boost::string_view path,
  int r = _command_export_dir(path, target);
  f->open_object_section("results");
  f->dump_int("return_code", r);
  f->close_section(); // results
int MDSRank::_command_export_dir(
    boost::string_view path,
  Mutex::Locker l(mds_lock);

  if (target == whoami || !mdsmap->is_up(target) || !mdsmap->is_in(target)) {
    derr << "bad MDS target " << target << dendl;

  CInode *in = mdcache->cache_traverse(fp);
    derr << "Bad path '" << path << "'" << dendl;

  CDir *dir = in->get_dirfrag(frag_t());
  if (!dir || !(dir->is_auth())) {
    derr << "bad export_dir path dirfrag frag_t() or dir not auth" << dendl;

  mdcache->migrator->export_dir(dir, target);
CDir *MDSRank::_command_dirfrag_get(
    const cmdmap_t &cmdmap,
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
    ss << "missing path argument";

  std::string frag_str;
  if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
    ss << "missing frag argument";

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
    // TODO really we should load something in if it's not in cache,
    // but the infrastructure is harder, and we might still be unable
    // to act on it if someone else is auth.
    ss << "directory '" << path << "' inode not in cache";

  if (!fg.parse(frag_str.c_str())) {
    ss << "frag " << frag_str << " failed to parse";

  CDir *dir = in->get_dirfrag(fg);
    ss << "frag 0x" << std::hex << in->ino() << "/" << fg << " not in cache ("
          "use `dirfrag ls` to see if it should exist)";

  if (!dir->is_auth()) {
    ss << "frag " << dir->dirfrag() << " not auth (auth = "
       << dir->authority() << ")";
bool MDSRank::command_dirfrag_split(
  Mutex::Locker l(mds_lock);
  if (!mdsmap->allows_dirfrags()) {
    ss << "dirfrags are disallowed by the mds map!";

  if (!cmd_getval(g_ceph_context, cmdmap, "bits", by)) {
    ss << "missing bits argument";

    ss << "must split by >0 bits";

  CDir *dir = _command_dirfrag_get(cmdmap, ss);

  mdcache->split_dir(dir, by);
bool MDSRank::command_dirfrag_merge(
  Mutex::Locker l(mds_lock);
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
    ss << "missing path argument";

  std::string frag_str;
  if (!cmd_getval(g_ceph_context, cmdmap, "frag", frag_str)) {
    ss << "missing frag argument";

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
    ss << "directory '" << path << "' inode not in cache";

  if (!fg.parse(frag_str.c_str())) {
    ss << "frag " << frag_str << " failed to parse";

  mdcache->merge_dir(in, fg);
bool MDSRank::command_dirfrag_ls(
  Mutex::Locker l(mds_lock);
  bool got = cmd_getval(g_ceph_context, cmdmap, "path", path);
    ss << "missing path argument";

  CInode *in = mdcache->cache_traverse(filepath(path.c_str()));
    ss << "directory inode not in cache";

  f->open_array_section("frags");
  std::list<frag_t> frags;
  // NB using get_leaves_under instead of get_dirfrags to give
  // you the list of what dirfrags may exist, not which are in cache
  in->dirfragtree.get_leaves_under(frag_t(), frags);
  for (std::list<frag_t>::iterator i = frags.begin();
       i != frags.end(); ++i) {
    f->open_object_section("frag");
    f->dump_int("value", i->value());
    f->dump_int("bits", i->bits());
    std::ostringstream frag_str;
    frag_str << std::hex << i->value() << "/" << std::dec << i->bits();
    f->dump_string("str", frag_str.str());
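    // The "str" form is simply "<value in hex>/<bits in decimal>"; an
    // unfragmented directory, for example, reports the single frag "0/0".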
void MDSRank::dump_status(Formatter *f) const
{
  if (state == MDSMap::STATE_REPLAY ||
      state == MDSMap::STATE_STANDBY_REPLAY) {
    mdlog->dump_replay_status(f);
  } else if (state == MDSMap::STATE_RESOLVE) {
    mdcache->dump_resolve_status(f);
  } else if (state == MDSMap::STATE_RECONNECT) {
    server->dump_reconnect_status(f);
  } else if (state == MDSMap::STATE_REJOIN) {
    mdcache->dump_rejoin_status(f);
  } else if (state == MDSMap::STATE_CLIENTREPLAY) {
    dump_clientreplay_status(f);

  f->dump_float("rank_uptime", get_uptime().count());
void MDSRank::dump_clientreplay_status(Formatter *f) const
{
  f->open_object_section("clientreplay_status");
  f->dump_unsigned("clientreplay_queue", replay_queue.size());
  f->dump_unsigned("active_replay", mdcache->get_num_client_requests());
void MDSRankDispatcher::update_log_config()
{
  map<string,string> log_to_monitors;
  map<string,string> log_to_syslog;
  map<string,string> log_channel;
  map<string,string> log_prio;
  map<string,string> log_to_graylog;
  map<string,string> log_to_graylog_host;
  map<string,string> log_to_graylog_port;

  if (parse_log_client_options(g_ceph_context, log_to_monitors, log_to_syslog,
                               log_channel, log_prio, log_to_graylog,
                               log_to_graylog_host, log_to_graylog_port,
    clog->update_config(log_to_monitors, log_to_syslog,
                        log_channel, log_prio, log_to_graylog,
                        log_to_graylog_host, log_to_graylog_port,
  dout(10) << __func__ << " log_to_monitors " << log_to_monitors << dendl;
void MDSRank::create_logger()
{
  dout(10) << "create_logger" << dendl;

  PerfCountersBuilder mds_plb(g_ceph_context, "mds", l_mds_first, l_mds_last);

  // super useful (high prio) perf stats
  mds_plb.add_u64_counter(l_mds_request, "request", "Requests", "req",
                          PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_time_avg(l_mds_reply_latency, "reply_latency", "Reply latency", "rlat",
                       PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_u64(l_mds_inodes, "inodes", "Inodes", "inos",
                  PerfCountersBuilder::PRIO_CRITICAL);
  mds_plb.add_u64_counter(l_mds_forward, "forward", "Forwarding request", "fwd",
                          PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64(l_mds_caps, "caps", "Capabilities", "caps",
                  PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64_counter(l_mds_exported_inodes, "exported_inodes", "Exported inodes",
                          "exi", PerfCountersBuilder::PRIO_INTERESTING);
  mds_plb.add_u64_counter(l_mds_imported_inodes, "imported_inodes", "Imported inodes",
                          "imi", PerfCountersBuilder::PRIO_INTERESTING);

  // useful dir/inode/subtree stats
  mds_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  mds_plb.add_u64_counter(l_mds_dir_fetch, "dir_fetch", "Directory fetch");
  mds_plb.add_u64_counter(l_mds_dir_commit, "dir_commit", "Directory commit");
  mds_plb.add_u64_counter(l_mds_dir_split, "dir_split", "Directory split");
  mds_plb.add_u64_counter(l_mds_dir_merge, "dir_merge", "Directory merge");
  mds_plb.add_u64(l_mds_inode_max, "inode_max", "Max inodes, cache size");
  mds_plb.add_u64(l_mds_inodes_pinned, "inodes_pinned", "Inodes pinned");
  mds_plb.add_u64(l_mds_inodes_expired, "inodes_expired", "Inodes expired");
  mds_plb.add_u64(l_mds_inodes_with_caps, "inodes_with_caps",
                  "Inodes with capabilities");
  mds_plb.add_u64(l_mds_subtrees, "subtrees", "Subtrees");
  mds_plb.add_u64(l_mds_load_cent, "load_cent", "Load per cent");

  mds_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
  mds_plb.add_u64_counter(l_mds_reply, "reply", "Replies");
  mds_plb.add_u64(l_mds_inodes_top, "inodes_top", "Inodes on top");
  mds_plb.add_u64(l_mds_inodes_bottom, "inodes_bottom", "Inodes on bottom");
    l_mds_inodes_pin_tail, "inodes_pin_tail", "Inodes on pin tail");
  mds_plb.add_u64_counter(l_mds_traverse, "traverse", "Traverses");
  mds_plb.add_u64_counter(l_mds_traverse_hit, "traverse_hit", "Traverse hits");
  mds_plb.add_u64_counter(l_mds_traverse_forward, "traverse_forward",
                          "Traverse forwards");
  mds_plb.add_u64_counter(l_mds_traverse_discover, "traverse_discover",
                          "Traverse directory discovers");
  mds_plb.add_u64_counter(l_mds_traverse_dir_fetch, "traverse_dir_fetch",
                          "Traverse incomplete directory content fetchings");
  mds_plb.add_u64_counter(l_mds_traverse_remote_ino, "traverse_remote_ino",
                          "Traverse remote dentries");
  mds_plb.add_u64_counter(l_mds_traverse_lock, "traverse_lock",
  mds_plb.add_u64(l_mds_dispatch_queue_len, "q", "Dispatch queue length");
  mds_plb.add_u64_counter(l_mds_exported, "exported", "Exports");
  mds_plb.add_u64_counter(l_mds_imported, "imported", "Imports");

  logger = mds_plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(logger);
  PerfCountersBuilder mdm_plb(g_ceph_context, "mds_mem", l_mdm_first, l_mdm_last);
  mdm_plb.add_u64(l_mdm_ino, "ino", "Inodes", "ino",
                  PerfCountersBuilder::PRIO_INTERESTING);
  mdm_plb.add_u64(l_mdm_dn, "dn", "Dentries", "dn",
                  PerfCountersBuilder::PRIO_INTERESTING);

  mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
  mdm_plb.add_u64_counter(l_mdm_inoa, "ino+", "Inodes opened");
  mdm_plb.add_u64_counter(l_mdm_inos, "ino-", "Inodes closed");
  mdm_plb.add_u64(l_mdm_dir, "dir", "Directories");
  mdm_plb.add_u64_counter(l_mdm_dira, "dir+", "Directories opened");
  mdm_plb.add_u64_counter(l_mdm_dirs, "dir-", "Directories closed");
  mdm_plb.add_u64_counter(l_mdm_dna, "dn+", "Dentries opened");
  mdm_plb.add_u64_counter(l_mdm_dns, "dn-", "Dentries closed");
  mdm_plb.add_u64(l_mdm_cap, "cap", "Capabilities");
  mdm_plb.add_u64_counter(l_mdm_capa, "cap+", "Capabilities added");
  mdm_plb.add_u64_counter(l_mdm_caps, "cap-", "Capabilities removed");
  mdm_plb.add_u64(l_mdm_heap, "heap", "Heap size");
  mdm_plb.add_u64(l_mdm_buf, "buf", "Buffer size");

  mdm_plb.set_prio_default(PerfCountersBuilder::PRIO_DEBUGONLY);
  mdm_plb.add_u64(l_mdm_rss, "rss", "RSS");

  mlogger = mdm_plb.create_perf_counters();
  g_ceph_context->get_perfcounters_collection()->add(mlogger);

  mdlog->create_logger();
  server->create_logger();
  purge_queue.create_logger();
  sessionmap.register_perfcounters();
  mdcache->register_perfcounters();
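// Once registered, these counters are reported under the "mds" and "mds_mem"
// sections of the daemon's perf dump, e.g. via
//   ceph daemon mds.<name> perf dump
// (assuming the usual admin-socket tooling is available).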
void MDSRank::check_ops_in_flight()
{
  vector<string> warnings;
  if (op_tracker.check_ops_in_flight(warnings, &slow)) {
    for (vector<string>::iterator i = warnings.begin();
         i != warnings.end();

  // set mds slow request count
  mds_slow_req_count = slow;
void MDSRankDispatcher::handle_osd_map()
{
  if (is_active() && snapserver) {
    snapserver->check_osd_map(true);

  server->handle_osd_map();

  purge_queue.update_op_limit(*mdsmap);

  std::set<entity_addr_t> newly_blacklisted;
  objecter->consume_blacklist_events(&newly_blacklisted);
  auto epoch = objecter->with_osdmap([](const OSDMap &o){return o.get_epoch();});
  dout(4) << "handle_osd_map epoch " << epoch << ", "
          << newly_blacklisted.size() << " new blacklist entries" << dendl;
  auto victims = server->apply_blacklist(newly_blacklisted);
  set_osd_epoch_barrier(epoch);

  // By default the objecter only requests OSDMap updates on use; we would
  // like to always receive the latest maps in order to apply policy based
  // on the FULL flag.
  objecter->maybe_request_map();
bool MDSRank::evict_client(int64_t session_id,
    bool wait, bool blacklist, std::stringstream& err_ss,
  assert(mds_lock.is_locked_by_me());

  // Mutually exclusive args
  assert(!(wait && on_killed != nullptr));

  if (is_any_replay()) {
    err_ss << "MDS is replaying log";

  Session *session = sessionmap.get_session(
      entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
    err_ss << "session " << session_id << " not in sessionmap!";

  dout(4) << "Preparing blacklist command... (wait=" << wait << ")" << dendl;
  ss << "{\"prefix\":\"osd blacklist\", \"blacklistop\":\"add\",";
  ss << "\"addr\":\"";
  ss << session->info.inst.addr;

  std::string tmp = ss.str();
  std::vector<std::string> cmd = {tmp};
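  // The assembled mon command is a JSON blob along the lines of
  //   {"prefix":"osd blacklist", "blacklistop":"add", "addr":"10.0.0.1:0/123456"}
  // where the address shown is illustrative; the real one comes from
  // session->info.inst.addr above.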
  auto kill_client_session = [this, session_id, wait, on_killed](){
    assert(mds_lock.is_locked_by_me());
    Session *session = sessionmap.get_session(
        entity_name_t(CEPH_ENTITY_TYPE_CLIENT, session_id));
    if (on_killed || !wait) {
      server->kill_session(session, on_killed);
      C_SaferCond on_safe;
      server->kill_session(session, &on_safe);

      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blacklist" << dendl;

      // Even though it wasn't us that removed it, kick our completion
      // as the session has been removed.
      on_killed->complete(0);
  auto apply_blacklist = [this, cmd](std::function<void ()> fn){
    assert(mds_lock.is_locked_by_me());

    Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
      objecter->wait_for_latest_osdmap(
        new FunctionContext([this, fn](int r) {
          Mutex::Locker l(mds_lock);
          auto epoch = objecter->with_osdmap([](const OSDMap &o){
            return o.get_epoch();
          set_osd_epoch_barrier(epoch);

    dout(4) << "Sending mon blacklist command: " << cmd[0] << dendl;
    monc->start_mon_command(cmd, {}, nullptr, nullptr, on_blacklist_done);
    C_SaferCond inline_ctx;
    apply_blacklist([&inline_ctx](){inline_ctx.complete(0);});

    // We dropped mds_lock, so check that session still exists
    session = sessionmap.get_session(entity_name_t(CEPH_ENTITY_TYPE_CLIENT,
      dout(1) << "session " << session_id << " was removed while we waited "
                 "for blacklist" << dendl;

    kill_client_session();
    apply_blacklist(kill_client_session);
    kill_client_session();
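  // Taken together: with blacklisting enabled, eviction first asks the
  // monitors to blacklist the client's address, raises the OSD epoch barrier
  // once the new map is visible, and only then tears down the MDS-side
  // session; with blacklisting disabled it simply kills the session.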
void MDSRank::bcast_mds_map()
{
  dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;

  // share the map with mounted clients
  set<Session*> clients;
  sessionmap.get_client_session_set(clients);
  for (set<Session*>::const_iterator p = clients.begin();
    (*p)->connection->send_message(new MMDSMap(monc->get_fsid(), mdsmap));
  last_client_mdsmap_bcast = mdsmap->get_epoch();
MDSRankDispatcher::MDSRankDispatcher(
    LogChannelRef &clog_,
    Context *respawn_hook_,
    Context *suicide_hook_)
  : MDSRank(whoami_, mds_lock_, clog_, timer_, beacon_, mdsmap_,
            msgr, monc_, respawn_hook_, suicide_hook_)
bool MDSRankDispatcher::handle_command(
    const cmdmap_t &cmdmap,
    std::stringstream *ds,
    std::stringstream *ss,
    Context **run_later,
  assert(r != nullptr);
  assert(ds != nullptr);
  assert(ss != nullptr);

  cmd_getval(g_ceph_context, cmdmap, "prefix", prefix);

  if (prefix == "session ls" || prefix == "client ls") {
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);

    JSONFormatter f(true);
    dump_sessions(filter, &f);
  } else if (prefix == "session evict" || prefix == "client evict") {
    std::vector<std::string> filter_args;
    cmd_getval(g_ceph_context, cmdmap, "filters", filter_args);

    SessionFilter filter;
    *r = filter.parse(filter_args, ss);

    evict_clients(filter, m);

    *need_reply = false;
  } else if (prefix == "damage ls") {
    JSONFormatter f(true);
    damage_table.dump(&f);
  } else if (prefix == "damage rm") {
    damage_entry_id_t id = 0;
    bool got = cmd_getval(g_ceph_context, cmdmap, "damage_id", (int64_t&)id);

    damage_table.erase(id);
== "cache drop") {
3255 if (!cmd_getval(g_ceph_context
, cmdmap
, "timeout", timeout
)) {
3259 JSONFormatter
*f
= new JSONFormatter(true);
3260 C_MDS_Send_Command_Reply
*reply
= new C_MDS_Send_Command_Reply(this, m
);
3261 Context
*on_finish
= new FunctionContext([this, f
, reply
](int r
) {
3262 cache_drop_send_reply(f
, reply
, r
);
3267 *need_reply
= false;
3268 *run_later
= new C_OnFinisher(
3269 new FunctionContext([this, timeout
, f
, on_finish
](int _
) {
3270 command_cache_drop((uint64_t)timeout
, f
, on_finish
);
void MDSRank::cache_drop_send_reply(Formatter *f, C_MDS_Send_Command_Reply *reply, int r) {
  dout(20) << __func__ << ": r=" << r << dendl;

  std::stringstream ds;
  std::stringstream ss;

  reply->send(r, ss.str(), ds);
void MDSRank::command_cache_drop(uint64_t timeout, Formatter *f, Context *on_finish) {
  dout(20) << __func__ << dendl;

  Mutex::Locker locker(mds_lock);
  C_Drop_Cache *request = new C_Drop_Cache(server, mdcache, mdlog, this,
                                           timeout, f, on_finish);
epoch_t MDSRank::get_osd_epoch() const
{
  return objecter->with_osdmap(std::mem_fn(&OSDMap::get_epoch));