1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "SessionMap.h"
19 #include "osdc/Filer.h"
20 #include "common/Finisher.h"
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "common/DecayCounter.h"
25 #include "include/ceph_assert.h"
26 #include "include/stringify.h"
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
31 #define dout_prefix *_dout << "mds." << rank << ".sessionmap "
36 class SessionMapIOContext
: public MDSIOContextBase
39 SessionMap
*sessionmap
;
40 MDSRank
*get_mds() override
{return sessionmap
->mds
;}
42 explicit SessionMapIOContext(SessionMap
*sessionmap_
) : sessionmap(sessionmap_
) {
43 ceph_assert(sessionmap
!= NULL
);
48 SessionMap::SessionMap(MDSRank
*m
)
50 mds_session_metadata_threshold(g_conf().get_val
<Option::size_t>("mds_session_metadata_threshold")) {
53 void SessionMap::register_perfcounters()
55 PerfCountersBuilder
plb(g_ceph_context
, "mds_sessions",
56 l_mdssm_first
, l_mdssm_last
);
58 plb
.add_u64(l_mdssm_session_count
, "session_count",
59 "Session count", "sess", PerfCountersBuilder::PRIO_INTERESTING
);
61 plb
.set_prio_default(PerfCountersBuilder::PRIO_USEFUL
);
62 plb
.add_u64_counter(l_mdssm_session_add
, "session_add",
64 plb
.add_u64_counter(l_mdssm_session_remove
, "session_remove",
66 plb
.add_u64(l_mdssm_session_open
, "sessions_open",
67 "Sessions currently open");
68 plb
.add_u64(l_mdssm_session_stale
, "sessions_stale",
69 "Sessions currently stale");
70 plb
.add_u64(l_mdssm_total_load
, "total_load", "Total Load");
71 plb
.add_u64(l_mdssm_avg_load
, "average_load", "Average Load");
72 plb
.add_u64(l_mdssm_avg_session_uptime
, "avg_session_uptime",
73 "Average session uptime");
74 plb
.add_u64(l_mdssm_metadata_threshold_sessions_evicted
, "mdthresh_evicted",
75 "Sessions evicted on reaching metadata threshold");
77 logger
= plb
.create_perf_counters();
78 g_ceph_context
->get_perfcounters_collection()->add(logger
);
81 void SessionMap::dump()
83 dout(10) << "dump" << dendl
;
84 for (ceph::unordered_map
<entity_name_t
,Session
*>::iterator p
= session_map
.begin();
85 p
!= session_map
.end();
87 dout(10) << p
->first
<< " " << p
->second
88 << " state " << p
->second
->get_state_name()
89 << " completed " << p
->second
->info
.completed_requests
90 << " free_prealloc_inos " << p
->second
->free_prealloc_inos
91 << " delegated_inos " << p
->second
->delegated_inos
100 object_t
SessionMap::get_object_name() const
103 snprintf(s
, sizeof(s
), "mds%d_sessionmap", int(mds
->get_nodeid()));
108 class C_IO_SM_Load
: public SessionMapIOContext
{
110 const bool first
; //< Am I the initial (header) load?
111 int header_r
; //< Return value from OMAP header read
112 int values_r
; //< Return value from OMAP value read
113 bufferlist header_bl
;
114 std::map
<std::string
, bufferlist
> session_vals
;
115 bool more_session_vals
= false;
117 C_IO_SM_Load(SessionMap
*cm
, const bool f
)
118 : SessionMapIOContext(cm
), first(f
), header_r(0), values_r(0) {}
120 void finish(int r
) override
{
121 sessionmap
->_load_finish(r
, header_r
, values_r
, first
, header_bl
, session_vals
,
124 void print(ostream
& out
) const override
{
125 out
<< "session_load";
132 * Decode OMAP header. Call this once when loading.
134 void SessionMapStore::decode_header(
135 bufferlist
&header_bl
)
137 auto q
= header_bl
.cbegin();
143 void SessionMapStore::encode_header(
144 bufferlist
*header_bl
)
146 ENCODE_START(1, 1, *header_bl
);
147 encode(version
, *header_bl
);
148 ENCODE_FINISH(*header_bl
);
152 * Decode and insert some serialized OMAP values. Call this
153 * repeatedly to insert batched loads.
155 void SessionMapStore::decode_values(std::map
<std::string
, bufferlist
> &session_vals
)
157 for (std::map
<std::string
, bufferlist
>::iterator i
= session_vals
.begin();
158 i
!= session_vals
.end(); ++i
) {
162 bool parsed
= inst
.name
.parse(i
->first
);
164 derr
<< "Corrupt entity name '" << i
->first
<< "' in sessionmap" << dendl
;
165 throw buffer::malformed_input("Corrupt entity name in sessionmap");
168 Session
*s
= get_or_add_session(inst
);
169 if (s
->is_closed()) {
170 s
->set_state(Session::STATE_OPEN
);
171 s
->set_load_avg_decay_rate(decay_rate
);
173 auto q
= i
->second
.cbegin();
179 * An OMAP read finished.
181 void SessionMap::_load_finish(
186 bufferlist
&header_bl
,
187 std::map
<std::string
, bufferlist
> &session_vals
,
188 bool more_session_vals
)
190 if (operation_r
< 0) {
191 derr
<< "_load_finish got " << cpp_strerror(operation_r
) << dendl
;
192 mds
->clog
->error() << "error reading sessionmap '" << get_object_name()
193 << "' " << operation_r
<< " ("
194 << cpp_strerror(operation_r
) << ")";
196 ceph_abort(); // Should be unreachable because damaged() calls respawn()
202 derr
<< __func__
<< ": header error: " << cpp_strerror(header_r
) << dendl
;
203 mds
->clog
->error() << "error reading sessionmap header "
204 << header_r
<< " (" << cpp_strerror(header_r
) << ")";
206 ceph_abort(); // Should be unreachable because damaged() calls respawn()
209 if(header_bl
.length() == 0) {
210 dout(4) << __func__
<< ": header missing, loading legacy..." << dendl
;
216 decode_header(header_bl
);
217 } catch (buffer::error
&e
) {
218 mds
->clog
->error() << "corrupt sessionmap header: " << e
.what();
220 ceph_abort(); // Should be unreachable because damaged() calls respawn()
222 dout(10) << __func__
<< " loaded version " << version
<< dendl
;
226 derr
<< __func__
<< ": error reading values: "
227 << cpp_strerror(values_r
) << dendl
;
228 mds
->clog
->error() << "error reading sessionmap values: "
229 << values_r
<< " (" << cpp_strerror(values_r
) << ")";
231 ceph_abort(); // Should be unreachable because damaged() calls respawn()
234 // Decode session_vals
236 decode_values(session_vals
);
237 } catch (buffer::error
&e
) {
238 mds
->clog
->error() << "corrupt sessionmap values: " << e
.what();
240 ceph_abort(); // Should be unreachable because damaged() calls respawn()
243 if (more_session_vals
) {
244 // Issue another read if we're not at the end of the omap
245 const std::string last_key
= session_vals
.rbegin()->first
;
246 dout(10) << __func__
<< ": continue omap load from '"
247 << last_key
<< "'" << dendl
;
248 object_t oid
= get_object_name();
249 object_locator_t
oloc(mds
->get_metadata_pool());
250 C_IO_SM_Load
*c
= new C_IO_SM_Load(this, false);
252 op
.omap_get_vals(last_key
, "", g_conf()->mds_sessionmap_keys_per_op
,
253 &c
->session_vals
, &c
->more_session_vals
, &c
->values_r
);
254 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, NULL
, 0,
255 new C_OnFinisher(c
, mds
->finisher
));
257 // I/O is complete. Update `by_state`
258 dout(10) << __func__
<< ": omap load complete" << dendl
;
259 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
260 i
!= session_map
.end(); ++i
) {
261 Session
*s
= i
->second
;
262 auto by_state_entry
= by_state
.find(s
->get_state());
263 if (by_state_entry
== by_state
.end())
264 by_state_entry
= by_state
.emplace(s
->get_state(),
265 new xlist
<Session
*>).first
;
266 by_state_entry
->second
->push_back(&s
->item_session_list
);
269 // Population is complete. Trigger load waiters.
270 dout(10) << __func__
<< ": v " << version
271 << ", " << session_map
.size() << " sessions" << dendl
;
272 projected
= committing
= committed
= version
;
274 finish_contexts(g_ceph_context
, waiting_for_load
);
279 * Populate session state from OMAP records in this
280 * rank's sessionmap object.
282 void SessionMap::load(MDSContext
*onload
)
284 dout(10) << "load" << dendl
;
287 waiting_for_load
.push_back(onload
);
289 C_IO_SM_Load
*c
= new C_IO_SM_Load(this, true);
290 object_t oid
= get_object_name();
291 object_locator_t
oloc(mds
->get_metadata_pool());
294 op
.omap_get_header(&c
->header_bl
, &c
->header_r
);
295 op
.omap_get_vals("", "", g_conf()->mds_sessionmap_keys_per_op
,
296 &c
->session_vals
, &c
->more_session_vals
, &c
->values_r
);
298 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, NULL
, 0, new C_OnFinisher(c
, mds
->finisher
));
302 class C_IO_SM_LoadLegacy
: public SessionMapIOContext
{
305 explicit C_IO_SM_LoadLegacy(SessionMap
*cm
) : SessionMapIOContext(cm
) {}
306 void finish(int r
) override
{
307 sessionmap
->_load_legacy_finish(r
, bl
);
309 void print(ostream
& out
) const override
{
310 out
<< "session_load_legacy";
317 * Load legacy (object data blob) SessionMap format, assuming
318 * that waiting_for_load has already been populated with
319 * the relevant completion. This is the fallback if we do not
320 * find an OMAP header when attempting to load normally.
322 void SessionMap::load_legacy()
324 dout(10) << __func__
<< dendl
;
326 C_IO_SM_LoadLegacy
*c
= new C_IO_SM_LoadLegacy(this);
327 object_t oid
= get_object_name();
328 object_locator_t
oloc(mds
->get_metadata_pool());
330 mds
->objecter
->read_full(oid
, oloc
, CEPH_NOSNAP
, &c
->bl
, 0,
331 new C_OnFinisher(c
, mds
->finisher
));
334 void SessionMap::_load_legacy_finish(int r
, bufferlist
&bl
)
336 auto blp
= bl
.cbegin();
338 derr
<< "_load_finish got " << cpp_strerror(r
) << dendl
;
339 ceph_abort_msg("failed to load sessionmap");
342 decode_legacy(blp
); // note: this sets last_cap_renew = now()
343 dout(10) << "_load_finish v " << version
344 << ", " << session_map
.size() << " sessions, "
345 << bl
.length() << " bytes"
347 projected
= committing
= committed
= version
;
350 // Mark all sessions dirty, so that on next save() we will write
351 // a complete OMAP version of the data loaded from the legacy format
352 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
353 i
!= session_map
.end(); ++i
) {
354 // Don't use mark_dirty because on this occasion we want to ignore the
355 // keys_per_op limit and do one big write (upgrade must be atomic)
356 dirty_sessions
.insert(i
->first
);
358 loaded_legacy
= true;
360 finish_contexts(g_ceph_context
, waiting_for_load
);
368 class C_IO_SM_Save
: public SessionMapIOContext
{
371 C_IO_SM_Save(SessionMap
*cm
, version_t v
) : SessionMapIOContext(cm
), version(v
) {}
372 void finish(int r
) override
{
374 get_mds()->handle_write_error(r
);
376 sessionmap
->_save_finish(version
);
379 void print(ostream
& out
) const override
{
380 out
<< "session_save";
385 bool SessionMap::validate_and_encode_session(MDSRank
*mds
, Session
*session
, bufferlist
& bl
) {
386 session
->info
.encode(bl
, mds
->mdsmap
->get_up_features());
387 return bl
.length() < mds_session_metadata_threshold
;
390 void SessionMap::save(MDSContext
*onsave
, version_t needv
)
392 dout(10) << __func__
<< ": needv " << needv
<< ", v " << version
<< dendl
;
394 if (needv
&& committing
>= needv
) {
395 ceph_assert(committing
> committed
);
396 commit_waiters
[committing
].push_back(onsave
);
400 commit_waiters
[version
].push_back(onsave
);
402 committing
= version
;
404 object_t oid
= get_object_name();
405 object_locator_t
oloc(mds
->get_metadata_pool());
409 /* Compose OSD OMAP transaction for full write */
410 bufferlist header_bl
;
411 encode_header(&header_bl
);
412 op
.omap_set_header(header_bl
);
414 /* If we loaded a legacy sessionmap, then erase the old data. If
415 * an old-versioned MDS tries to read it, it'll fail out safely
416 * with an end_of_buffer exception */
418 dout(4) << __func__
<< " erasing legacy sessionmap" << dendl
;
420 loaded_legacy
= false; // only need to truncate once.
423 dout(20) << " updating keys:" << dendl
;
424 map
<string
, bufferlist
> to_set
;
425 std::set
<entity_name_t
> to_blocklist
;
426 for(std::set
<entity_name_t
>::iterator i
= dirty_sessions
.begin();
427 i
!= dirty_sessions
.end(); ++i
) {
428 const entity_name_t name
= *i
;
429 Session
*session
= session_map
[name
];
431 if (session
->is_open() ||
432 session
->is_closing() ||
433 session
->is_stale() ||
434 session
->is_killing()) {
435 dout(20) << " " << name
<< dendl
;
439 if (!validate_and_encode_session(mds
, session
, bl
)) {
440 derr
<< __func__
<< ": session (" << name
<< ") exceeds"
441 << " sesion metadata threshold - blocklisting" << dendl
;
442 to_blocklist
.emplace(name
);
447 CachedStackStringStream css
;
451 to_set
[std::string(css
->strv())] = bl
;
453 session
->clear_dirty_completed_requests();
455 dout(20) << " " << name
<< " (ignoring)" << dendl
;
458 if (!to_set
.empty()) {
462 dout(20) << " removing keys:" << dendl
;
463 set
<string
> to_remove
;
464 for(std::set
<entity_name_t
>::const_iterator i
= null_sessions
.begin();
465 i
!= null_sessions
.end(); ++i
) {
466 dout(20) << " " << *i
<< dendl
;
467 CachedStackStringStream css
;
469 to_remove
.insert(css
->str());
471 if (!to_remove
.empty()) {
472 op
.omap_rm_keys(to_remove
);
475 dirty_sessions
.clear();
476 null_sessions
.clear();
478 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
479 ceph::real_clock::now(),
481 new C_OnFinisher(new C_IO_SM_Save(this, version
),
483 apply_blocklist(to_blocklist
);
484 logger
->inc(l_mdssm_metadata_threshold_sessions_evicted
, to_blocklist
.size());
487 void SessionMap::_save_finish(version_t v
)
489 dout(10) << "_save_finish v" << v
<< dendl
;
492 finish_contexts(g_ceph_context
, commit_waiters
[v
]);
493 commit_waiters
.erase(v
);
498 * Deserialize sessions, and update by_state index
500 void SessionMap::decode_legacy(bufferlist::const_iterator
&p
)
502 // Populate `sessions`
503 SessionMapStore::decode_legacy(p
);
506 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
507 i
!= session_map
.end(); ++i
) {
508 Session
*s
= i
->second
;
509 auto by_state_entry
= by_state
.find(s
->get_state());
510 if (by_state_entry
== by_state
.end())
511 by_state_entry
= by_state
.emplace(s
->get_state(),
512 new xlist
<Session
*>).first
;
513 by_state_entry
->second
->push_back(&s
->item_session_list
);
517 uint64_t SessionMap::set_state(Session
*session
, int s
) {
518 if (session
->state
!= s
) {
519 session
->set_state(s
);
520 auto by_state_entry
= by_state
.find(s
);
521 if (by_state_entry
== by_state
.end())
522 by_state_entry
= by_state
.emplace(s
, new xlist
<Session
*>).first
;
523 by_state_entry
->second
->push_back(&session
->item_session_list
);
525 if (session
->is_open() || session
->is_stale()) {
526 session
->set_load_avg_decay_rate(decay_rate
);
529 // refresh number of sessions for states which have perf
530 // couters associated
531 logger
->set(l_mdssm_session_open
,
532 get_session_count_in_state(Session::STATE_OPEN
));
533 logger
->set(l_mdssm_session_stale
,
534 get_session_count_in_state(Session::STATE_STALE
));
537 return session
->get_state_seq();
540 void SessionMapStore::decode_legacy(bufferlist::const_iterator
& p
)
542 auto now
= clock::now();
545 if (pre
== (uint64_t)-1) {
546 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p
);
547 ceph_assert(struct_v
>= 2);
553 decode(inst
.name
, p
);
554 Session
*s
= get_or_add_session(inst
);
555 if (s
->is_closed()) {
556 s
->set_state(Session::STATE_OPEN
);
557 s
->set_load_avg_decay_rate(decay_rate
);
564 // --- old format ----
567 // this is a meaningless upper bound. can be ignored.
571 while (n
-- && !p
.end()) {
573 Session
*s
= new Session(ConnectionRef());
576 auto& name
= s
->info
.inst
.name
;
577 auto it
= session_map
.find(name
);
578 if (it
!= session_map
.end()) {
579 // eager client connected too fast! aie.
580 dout(10) << " already had session for " << name
<< ", recovering" << dendl
;
589 s
->set_state(Session::STATE_OPEN
);
590 s
->set_load_avg_decay_rate(decay_rate
);
591 s
->last_cap_renew
= now
;
596 void Session::dump(Formatter
*f
, bool cap_dump
) const
598 f
->dump_int("id", info
.inst
.name
.num());
599 f
->dump_object("entity", info
.inst
);
600 f
->dump_string("state", get_state_name());
601 f
->dump_int("num_leases", leases
.size());
602 f
->dump_int("num_caps", caps
.size());
604 f
->open_array_section("caps");
605 for (const auto& cap
: caps
) {
606 f
->dump_object("cap", *cap
);
610 if (is_open() || is_stale()) {
611 f
->dump_unsigned("request_load_avg", get_load_avg());
613 f
->dump_float("uptime", get_session_uptime());
614 f
->dump_unsigned("requests_in_flight", get_request_count());
615 f
->dump_unsigned("num_completed_requests", get_num_completed_requests());
616 f
->dump_unsigned("num_completed_flushes", get_num_completed_flushes());
617 f
->dump_bool("reconnecting", reconnecting
);
618 f
->dump_object("recall_caps", recall_caps
);
619 f
->dump_object("release_caps", release_caps
);
620 f
->dump_object("recall_caps_throttle", recall_caps_throttle
);
621 f
->dump_object("recall_caps_throttle2o", recall_caps_throttle2o
);
622 f
->dump_object("session_cache_liveness", session_cache_liveness
);
623 f
->dump_object("cap_acquisition", cap_acquisition
);
625 f
->open_array_section("delegated_inos");
626 for (const auto& [start
, len
] : delegated_inos
) {
627 f
->open_object_section("ino_range");
628 f
->dump_stream("start") << start
;
629 f
->dump_unsigned("length", len
);
637 void SessionMapStore::dump(Formatter
*f
) const
639 f
->open_array_section("sessions");
640 for (const auto& p
: session_map
) {
641 f
->dump_object("session", *p
.second
);
643 f
->close_section(); // Sessions
646 void SessionMapStore::generate_test_instances(std::list
<SessionMapStore
*>& ls
)
648 // pretty boring for now
649 ls
.push_back(new SessionMapStore());
652 void SessionMap::wipe()
654 dout(1) << "wipe start" << dendl
;
656 while (!session_map
.empty()) {
657 Session
*s
= session_map
.begin()->second
;
660 version
= ++projected
;
661 dout(1) << "wipe result" << dendl
;
663 dout(1) << "wipe done" << dendl
;
666 void SessionMap::wipe_ino_prealloc()
668 for (ceph::unordered_map
<entity_name_t
,Session
*>::iterator p
= session_map
.begin();
669 p
!= session_map
.end();
671 p
->second
->pending_prealloc_inos
.clear();
672 p
->second
->free_prealloc_inos
.clear();
673 p
->second
->delegated_inos
.clear();
674 p
->second
->info
.prealloc_inos
.clear();
676 projected
= ++version
;
679 void SessionMap::add_session(Session
*s
)
681 dout(10) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
<< dendl
;
683 ceph_assert(session_map
.count(s
->info
.inst
.name
) == 0);
684 session_map
[s
->info
.inst
.name
] = s
;
685 auto by_state_entry
= by_state
.find(s
->state
);
686 if (by_state_entry
== by_state
.end())
687 by_state_entry
= by_state
.emplace(s
->state
, new xlist
<Session
*>).first
;
688 by_state_entry
->second
->push_back(&s
->item_session_list
);
691 update_average_birth_time(*s
);
693 logger
->set(l_mdssm_session_count
, session_map
.size());
694 logger
->inc(l_mdssm_session_add
);
697 void SessionMap::remove_session(Session
*s
)
699 dout(10) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
<< dendl
;
701 update_average_birth_time(*s
, false);
703 s
->trim_completed_requests(0);
704 s
->item_session_list
.remove_myself();
705 session_map
.erase(s
->info
.inst
.name
);
706 dirty_sessions
.erase(s
->info
.inst
.name
);
707 null_sessions
.insert(s
->info
.inst
.name
);
710 logger
->set(l_mdssm_session_count
, session_map
.size());
711 logger
->inc(l_mdssm_session_remove
);
714 void SessionMap::touch_session(Session
*session
)
716 dout(10) << __func__
<< " s=" << session
<< " name=" << session
->info
.inst
.name
<< dendl
;
718 // Move to the back of the session list for this state (should
719 // already be on a list courtesy of add_session and set_state)
720 ceph_assert(session
->item_session_list
.is_on_list());
721 auto by_state_entry
= by_state
.find(session
->state
);
722 if (by_state_entry
== by_state
.end())
723 by_state_entry
= by_state
.emplace(session
->state
,
724 new xlist
<Session
*>).first
;
725 by_state_entry
->second
->push_back(&session
->item_session_list
);
727 session
->last_cap_renew
= clock::now();
730 void SessionMap::_mark_dirty(Session
*s
, bool may_save
)
732 if (dirty_sessions
.count(s
->info
.inst
.name
))
736 dirty_sessions
.size() >= g_conf()->mds_sessionmap_keys_per_op
) {
737 // Pre-empt the usual save() call from journal segment trim, in
738 // order to avoid building up an oversized OMAP update operation
739 // from too many sessions modified at once
740 save(new C_MDSInternalNoop
, version
);
743 null_sessions
.erase(s
->info
.inst
.name
);
744 dirty_sessions
.insert(s
->info
.inst
.name
);
747 void SessionMap::mark_dirty(Session
*s
, bool may_save
)
749 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
750 << " v=" << version
<< dendl
;
752 _mark_dirty(s
, may_save
);
757 void SessionMap::replay_dirty_session(Session
*s
)
759 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
760 << " v=" << version
<< dendl
;
762 _mark_dirty(s
, false);
764 replay_advance_version();
767 void SessionMap::replay_advance_version()
773 void SessionMap::replay_open_sessions(version_t event_cmapv
,
774 map
<client_t
,entity_inst_t
>& client_map
,
775 map
<client_t
,client_metadata_t
>& client_metadata_map
)
777 unsigned already_saved
;
779 if (version
+ client_map
.size() < event_cmapv
)
782 // Server::finish_force_open_sessions() marks sessions dirty one by one.
783 // Marking a session dirty may flush all existing dirty sessions. So it's
784 // possible that some sessions are already saved in sessionmap.
785 already_saved
= client_map
.size() - (event_cmapv
- version
);
786 for (const auto& p
: client_map
) {
787 Session
*s
= get_or_add_session(p
.second
);
788 auto q
= client_metadata_map
.find(p
.first
);
789 if (q
!= client_metadata_map
.end())
790 s
->info
.client_metadata
.merge(q
->second
);
792 if (already_saved
> 0) {
800 set_state(s
, Session::STATE_OPEN
);
801 replay_dirty_session(s
);
806 mds
->clog
->error() << "error replaying open sessions(" << client_map
.size()
807 << ") sessionmap v " << event_cmapv
<< " table " << version
;
808 ceph_assert(g_conf()->mds_wipe_sessions
);
809 mds
->sessionmap
.wipe();
810 mds
->sessionmap
.set_version(event_cmapv
);
813 version_t
SessionMap::mark_projected(Session
*s
)
815 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
816 << " pv=" << projected
<< " -> " << projected
+ 1 << dendl
;
818 s
->push_pv(projected
);
823 class C_IO_SM_Save_One
: public SessionMapIOContext
{
826 C_IO_SM_Save_One(SessionMap
*cm
, MDSContext
*on_safe_
)
827 : SessionMapIOContext(cm
), on_safe(on_safe_
) {}
828 void finish(int r
) override
{
830 get_mds()->handle_write_error(r
);
832 on_safe
->complete(r
);
835 void print(ostream
& out
) const override
{
836 out
<< "session_save_one";
842 void SessionMap::save_if_dirty(const std::set
<entity_name_t
> &tgt_sessions
,
843 MDSGatherBuilder
*gather_bld
)
845 ceph_assert(gather_bld
!= NULL
);
847 std::set
<entity_name_t
> to_blocklist
;
848 std::map
<entity_name_t
, bufferlist
> write_sessions
;
850 // Decide which sessions require a write
851 for (std::set
<entity_name_t
>::iterator i
= tgt_sessions
.begin();
852 i
!= tgt_sessions
.end(); ++i
) {
853 const entity_name_t
&session_id
= *i
;
855 if (session_map
.count(session_id
) == 0) {
856 // Session isn't around any more, never mind.
860 Session
*session
= session_map
[session_id
];
861 if (!session
->has_dirty_completed_requests()) {
862 // Session hasn't had completed_requests
863 // modified since last write, no need to
868 if (dirty_sessions
.count(session_id
) > 0) {
869 // Session is already dirtied, will be written, no
870 // need to pre-empt that.
876 if (!validate_and_encode_session(mds
, session
, bl
)) {
877 derr
<< __func__
<< ": session (" << session_id
<< ") exceeds"
878 << " sesion metadata threshold - blocklisting" << dendl
;
879 to_blocklist
.emplace(session_id
);
883 // Okay, passed all our checks, now we write
884 // this session out. The version we write
885 // into the OMAP may now be higher-versioned
886 // than the version in the header, but that's
887 // okay because it's never a problem to have
888 // an overly-fresh copy of a session.
889 write_sessions
.emplace(session_id
, std::move(bl
));
890 session
->clear_dirty_completed_requests();
893 dout(4) << __func__
<< ": writing " << write_sessions
.size() << dendl
;
895 // Batch writes into mds_sessionmap_keys_per_op
896 const uint32_t kpo
= g_conf()->mds_sessionmap_keys_per_op
;
897 map
<string
, bufferlist
> to_set
;
900 for (auto &[session_id
, bl
] : write_sessions
) {
902 CachedStackStringStream css
;
906 to_set
[css
->str()] = std::move(bl
);
908 // Complete this write transaction?
909 if (i
== write_sessions
.size() - 1
910 || i
% kpo
== kpo
- 1) {
913 to_set
.clear(); // clear to start a new transaction
916 object_t oid
= get_object_name();
917 object_locator_t
oloc(mds
->get_metadata_pool());
918 MDSContext
*on_safe
= gather_bld
->new_sub();
919 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
920 ceph::real_clock::now(), 0,
922 new C_IO_SM_Save_One(this, on_safe
),
928 apply_blocklist(to_blocklist
);
929 logger
->inc(l_mdssm_metadata_threshold_sessions_evicted
, to_blocklist
.size());
936 #define dout_prefix *_dout << "Session "
939 * Calculate the length of the `requests` member list,
940 * because elist does not have a size() method.
944 size_t Session::get_request_count() const
947 for (auto p
= requests
.begin(); !p
.end(); ++p
)
953 * Called in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
954 * with n_caps equal to the number of caps that were released
955 * in the message. Used to update state about how many caps a
956 * client has released since it was last instructed to RECALL_STATE.
958 void Session::notify_cap_release(size_t n_caps
)
960 recall_caps
.hit(-(double)n_caps
);
961 release_caps
.hit(n_caps
);
965 * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
966 * message is sent to the client. Update our recall-related state
967 * in order to generate health metrics if the session doesn't see
968 * a commensurate number of calls to ::notify_cap_release
970 uint64_t Session::notify_recall_sent(size_t new_limit
)
972 const auto num_caps
= caps
.size();
973 ceph_assert(new_limit
< num_caps
); // Behaviour of Server::recall_client_state
974 const auto count
= num_caps
-new_limit
;
976 if (recall_limit
!= new_limit
) {
979 new_change
= 0; /* no change! */
982 /* Always hit the session counter as a RECALL message is still sent to the
983 * client and we do not want the MDS to burn its global counter tokens on a
984 * session that is not releasing caps (i.e. allow the session counter to
985 * throttle future RECALL messages).
987 recall_caps_throttle
.hit(count
);
988 recall_caps_throttle2o
.hit(count
);
989 recall_caps
.hit(count
);
994 * Use client metadata to generate a somewhat-friendlier
995 * name for the client than its session ID.
997 * This is *not* guaranteed to be unique, and any machine
998 * consumers of session-related output should always use
999 * the session ID as a primary identifier and use this only
1000 * as a presentation hint.
1002 void Session::_update_human_name()
1004 auto info_client_metadata_entry
= info
.client_metadata
.find("hostname");
1005 if (info_client_metadata_entry
!= info
.client_metadata
.end()) {
1006 // Happy path, refer to clients by hostname
1007 human_name
= info_client_metadata_entry
->second
;
1008 if (!info
.auth_name
.has_default_id()) {
1009 // When a non-default entity ID is set by the user, assume they
1010 // would like to see it in references to the client, if it's
1011 // reasonable short. Limit the length because we don't want
1012 // to put e.g. uuid-generated names into a "human readable"
1014 const int arbitrarily_short
= 16;
1015 if (info
.auth_name
.get_id().size() < arbitrarily_short
) {
1016 human_name
+= std::string(":") + info
.auth_name
.get_id();
1020 // Fallback, refer to clients by ID e.g. client.4567
1021 human_name
= stringify(info
.inst
.name
.num());
1025 void Session::decode(bufferlist::const_iterator
&p
)
1029 free_prealloc_inos
= info
.prealloc_inos
;
1031 _update_human_name();
1034 int Session::check_access(CInode
*in
, unsigned mask
,
1035 int caller_uid
, int caller_gid
,
1036 const vector
<uint64_t> *caller_gid_list
,
1037 int new_uid
, int new_gid
)
1040 CInode
*diri
= NULL
;
1042 diri
= in
->get_projected_parent_dn()->get_dir()->get_inode();
1043 if (diri
&& diri
->is_stray()){
1044 path
= in
->get_projected_inode()->stray_prior_path
;
1045 dout(20) << __func__
<< " stray_prior_path " << path
<< dendl
;
1047 in
->make_path_string(path
, true);
1048 dout(20) << __func__
<< " path " << path
<< dendl
;
1051 path
= path
.substr(1); // drop leading /
1053 const auto& inode
= in
->get_inode();
1055 inode
->has_layout() &&
1056 inode
->layout
.pool_ns
.length() &&
1057 !connection
->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2
)) {
1058 dout(10) << __func__
<< " client doesn't support FS_FILE_LAYOUT_V2" << dendl
;
1062 if (!auth_caps
.is_capable(path
, inode
->uid
, inode
->gid
, inode
->mode
,
1063 caller_uid
, caller_gid
, caller_gid_list
, mask
,
1066 return -CEPHFS_EACCES
;
1071 // track total and per session load
1072 void SessionMap::hit_session(Session
*session
) {
1073 uint64_t sessions
= get_session_count_in_state(Session::STATE_OPEN
) +
1074 get_session_count_in_state(Session::STATE_STALE
) +
1075 get_session_count_in_state(Session::STATE_CLOSING
);
1076 ceph_assert(sessions
!= 0);
1078 double total_load
= total_load_avg
.hit();
1079 double avg_load
= total_load
/ sessions
;
1081 logger
->set(l_mdssm_total_load
, (uint64_t)total_load
);
1082 logger
->set(l_mdssm_avg_load
, (uint64_t)avg_load
);
1084 session
->hit_session();
1087 void SessionMap::handle_conf_change(const std::set
<std::string
>& changed
)
1089 auto apply_to_open_sessions
= [this](auto f
) {
1090 if (auto it
= by_state
.find(Session::STATE_OPEN
); it
!= by_state
.end()) {
1091 for (const auto &session
: *(it
->second
)) {
1095 if (auto it
= by_state
.find(Session::STATE_STALE
); it
!= by_state
.end()) {
1096 for (const auto &session
: *(it
->second
)) {
1102 if (changed
.count("mds_request_load_average_decay_rate")) {
1103 auto d
= g_conf().get_val
<double>("mds_request_load_average_decay_rate");
1106 total_load_avg
= DecayCounter(d
);
1108 auto mut
= [d
](auto s
) {
1109 s
->set_load_avg_decay_rate(d
);
1111 apply_to_open_sessions(mut
);
1113 if (changed
.count("mds_recall_max_decay_rate")) {
1114 auto d
= g_conf().get_val
<double>("mds_recall_max_decay_rate");
1115 auto mut
= [d
](auto s
) {
1116 s
->recall_caps_throttle
= DecayCounter(d
);
1118 apply_to_open_sessions(mut
);
1120 if (changed
.count("mds_recall_warning_decay_rate")) {
1121 auto d
= g_conf().get_val
<double>("mds_recall_warning_decay_rate");
1122 auto mut
= [d
](auto s
) {
1123 s
->recall_caps
= DecayCounter(d
);
1124 s
->release_caps
= DecayCounter(d
);
1126 apply_to_open_sessions(mut
);
1128 if (changed
.count("mds_session_cache_liveness_decay_rate")) {
1129 auto d
= g_conf().get_val
<double>("mds_session_cache_liveness_decay_rate");
1130 auto mut
= [d
](auto s
) {
1131 s
->session_cache_liveness
= DecayCounter(d
);
1132 s
->session_cache_liveness
.hit(s
->caps
.size()); /* so the MDS doesn't immediately start trimming a new session */
1134 apply_to_open_sessions(mut
);
1136 if (changed
.count("mds_session_cap_acquisition_decay_rate")) {
1137 auto d
= g_conf().get_val
<double>("mds_session_cap_acquisition_decay_rate");
1138 auto mut
= [d
](auto s
) {
1139 s
->cap_acquisition
= DecayCounter(d
);
1141 apply_to_open_sessions(mut
);
1144 if (changed
.count("mds_session_metadata_threshold")) {
1145 mds_session_metadata_threshold
= g_conf().get_val
<Option::size_t>("mds_session_metadata_threshold");
1149 void SessionMap::update_average_session_age() {
1150 if (!session_map
.size()) {
1154 double avg_uptime
= std::chrono::duration
<double>(clock::now()-avg_birth_time
).count();
1155 logger
->set(l_mdssm_avg_session_uptime
, (uint64_t)avg_uptime
);
1158 void SessionMap::apply_blocklist(const std::set
<entity_name_t
>& victims
) {
1159 if (victims
.empty()) {
1163 C_GatherBuilder
gather(g_ceph_context
, new C_MDSInternalNoop
);
1164 for (auto &victim
: victims
) {
1165 CachedStackStringStream css
;
1166 mds
->evict_client(victim
.num(), false, g_conf()->mds_session_blocklist_on_evict
, *css
,
1172 int SessionFilter::parse(
1173 const std::vector
<std::string
> &args
,
1176 ceph_assert(ss
!= NULL
);
1178 for (const auto &s
: args
) {
1179 dout(20) << __func__
<< " parsing filter '" << s
<< "'" << dendl
;
1181 auto eq
= s
.find("=");
1182 if (eq
== std::string::npos
|| eq
== s
.size()) {
1183 // allow this to be a bare id for compatibility with pre-octopus asok
1186 id
= strict_strtoll(s
.c_str(), 10, &err
);
1188 *ss
<< "Invalid filter '" << s
<< "'";
1189 return -CEPHFS_EINVAL
;
1194 // Keys that start with this are to be taken as referring
1195 // to freeform client metadata fields.
1196 const std::string
metadata_prefix("client_metadata.");
1198 auto k
= s
.substr(0, eq
);
1199 auto v
= s
.substr(eq
+ 1);
1201 dout(20) << __func__
<< " parsed k='" << k
<< "', v='" << v
<< "'" << dendl
;
1203 if (k
.compare(0, metadata_prefix
.size(), metadata_prefix
) == 0
1204 && k
.size() > metadata_prefix
.size()) {
1205 // Filter on arbitrary metadata key (no fixed schema for this,
1206 // so anything after the dot is a valid field to filter on)
1207 auto metadata_key
= k
.substr(metadata_prefix
.size());
1208 metadata
.insert(std::make_pair(metadata_key
, v
));
1209 } else if (k
== "auth_name") {
1210 // Filter on client entity name
1212 } else if (k
== "state") {
1214 } else if (k
== "id") {
1216 id
= strict_strtoll(v
.c_str(), 10, &err
);
1219 return -CEPHFS_EINVAL
;
1221 } else if (k
== "reconnecting") {
1224 * Strict boolean parser. Allow true/false/0/1.
1225 * Anything else is -CEPHFS_EINVAL.
1227 auto is_true
= [](std::string_view bstr
, bool *out
) -> bool
1229 ceph_assert(out
!= nullptr);
1231 if (bstr
== "true" || bstr
== "1") {
1234 } else if (bstr
== "false" || bstr
== "0") {
1238 return -CEPHFS_EINVAL
;
1243 int r
= is_true(v
, &bval
);
1245 set_reconnecting(bval
);
1247 *ss
<< "Invalid boolean value '" << v
<< "'";
1248 return -CEPHFS_EINVAL
;
1251 *ss
<< "Invalid filter key '" << k
<< "'";
1252 return -CEPHFS_EINVAL
;
1259 bool SessionFilter::match(
1260 const Session
&session
,
1261 std::function
<bool(client_t
)> is_reconnecting
) const
1263 for (const auto &m
: metadata
) {
1264 const auto &k
= m
.first
;
1265 const auto &v
= m
.second
;
1266 auto it
= session
.info
.client_metadata
.find(k
);
1267 if (it
== session
.info
.client_metadata
.end()) {
1270 if (it
->second
!= v
) {
1275 if (!auth_name
.empty() && auth_name
!= session
.info
.auth_name
.get_id()) {
1279 if (!state
.empty() && state
!= session
.get_state_name()) {
1283 if (id
!= 0 && id
!= session
.info
.inst
.name
.num()) {
1287 if (reconnecting
.first
) {
1288 const bool am_reconnecting
= is_reconnecting(session
.info
.inst
.name
.num());
1289 if (reconnecting
.second
!= am_reconnecting
) {
1297 std::ostream
& operator<<(std::ostream
&out
, const Session
&s
)
1299 if (s
.get_human_name() == stringify(s
.get_client())) {
1300 out
<< s
.get_human_name();
1302 out
<< s
.get_human_name() << " (" << std::dec
<< s
.get_client() << ")";