// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
20 #include "include/unordered_map.h"
22 #include "include/Context.h"
23 #include "include/xlist.h"
24 #include "include/elist.h"
25 #include "include/interval_set.h"
27 #include "mds/MDSAuthCaps.h"
28 #include "common/perf_counters.h"
29 #include "common/DecayCounter.h"
32 #include "Capability.h"
33 #include "MDSContext.h"
34 #include "msg/Message.h"
40 l_mdssm_session_count
,
42 l_mdssm_session_remove
,
44 l_mdssm_session_stale
,
47 l_mdssm_avg_session_uptime
,
57 class Session
: public RefCountedObject
{
/*
 * Session state machine (NOTE: the vertical-arrow rows of the original
 * diagram were lost in extraction; only the state rows survive):
 *
 *   <deleted> <-- closed <------------+
 *    killing <-- opening <----+       |
 *      stale <--> open --> closing ---+
 *
 * + additional dimension of 'importing' (with counter)
 */
74 using clock
= ceph::coarse_mono_clock
;
75 using time
= ceph::coarse_mono_time
;
79 STATE_OPENING
= 1, // journaling open
81 STATE_CLOSING
= 3, // journaling close
87 Session(ConnectionRef con
) :
88 item_session_list(this),
89 requests(member_offset(MDRequestImpl
, item_session_request
)),
90 recall_caps(g_conf().get_val
<double>("mds_recall_warning_decay_rate")),
91 release_caps(g_conf().get_val
<double>("mds_recall_warning_decay_rate")),
92 recall_caps_throttle(g_conf().get_val
<double>("mds_recall_max_decay_rate")),
93 recall_caps_throttle2o(0.5),
94 session_cache_liveness(g_conf().get_val
<double>("mds_session_cache_liveness_decay_rate")),
95 cap_acquisition(g_conf().get_val
<double>("mds_session_cap_acquisition_decay_rate")),
96 birth_time(clock::now())
98 set_connection(std::move(con
));
100 ~Session() override
{
101 ceph_assert(!item_session_list
.is_on_list());
102 preopen_out_queue
.clear();
105 static std::string_view
get_state_name(int s
) {
107 case STATE_CLOSED
: return "closed";
108 case STATE_OPENING
: return "opening";
109 case STATE_OPEN
: return "open";
110 case STATE_CLOSING
: return "closing";
111 case STATE_STALE
: return "stale";
112 case STATE_KILLING
: return "killing";
113 default: return "???";
117 void dump(ceph::Formatter
*f
, bool cap_dump
=false) const;
118 void push_pv(version_t pv
)
120 ceph_assert(projected
.empty() || projected
.back() != pv
);
121 projected
.push_back(pv
);
124 void pop_pv(version_t v
)
126 ceph_assert(!projected
.empty());
127 ceph_assert(projected
.front() == v
);
128 projected
.pop_front();
131 int get_state() const { return state
; }
132 void set_state(int new_state
)
134 if (state
!= new_state
) {
140 void set_reconnecting(bool s
) { reconnecting
= s
; }
142 void decode(ceph::buffer::list::const_iterator
&p
);
144 void set_client_metadata(T
&& meta
)
146 info
.client_metadata
= std::forward
<T
>(meta
);
147 _update_human_name();
150 const std::string
& get_human_name() const {return human_name
;}
152 size_t get_request_count() const;
154 void notify_cap_release(size_t n_caps
);
155 uint64_t notify_recall_sent(size_t new_limit
);
156 auto get_recall_caps_throttle() const {
157 return recall_caps_throttle
.get();
159 auto get_recall_caps_throttle2o() const {
160 return recall_caps_throttle2o
.get();
162 auto get_recall_caps() const {
163 return recall_caps
.get();
165 auto get_release_caps() const {
166 return release_caps
.get();
168 auto get_session_cache_liveness() const {
169 return session_cache_liveness
.get();
171 auto get_cap_acquisition() const {
172 return cap_acquisition
.get();
175 inodeno_t
take_ino(inodeno_t ino
= 0) {
177 if (!info
.prealloc_inos
.contains(ino
))
179 if (delegated_inos
.contains(ino
)) {
180 delegated_inos
.erase(ino
);
181 } else if (free_prealloc_inos
.contains(ino
)) {
182 free_prealloc_inos
.erase(ino
);
186 } else if (!free_prealloc_inos
.empty()) {
187 ino
= free_prealloc_inos
.range_start();
188 free_prealloc_inos
.erase(ino
);
193 void delegate_inos(int want
, interval_set
<inodeno_t
>& inos
) {
194 want
-= (int)delegated_inos
.size();
198 for (auto it
= free_prealloc_inos
.begin(); it
!= free_prealloc_inos
.end(); ) {
199 if (want
< (int)it
.get_len()) {
200 inos
.insert(it
.get_start(), (inodeno_t
)want
);
201 delegated_inos
.insert(it
.get_start(), (inodeno_t
)want
);
202 free_prealloc_inos
.erase(it
.get_start(), (inodeno_t
)want
);
205 want
-= (int)it
.get_len();
206 inos
.insert(it
.get_start(), it
.get_len());
207 delegated_inos
.insert(it
.get_start(), it
.get_len());
208 free_prealloc_inos
.erase(it
++);
214 // sans any delegated ones
215 int get_num_prealloc_inos() const {
216 return free_prealloc_inos
.size();
219 int get_num_projected_prealloc_inos() const {
220 return get_num_prealloc_inos() + pending_prealloc_inos
.size();
223 client_t
get_client() const {
224 return info
.get_client();
227 std::string_view
get_state_name() const { return get_state_name(state
); }
228 uint64_t get_state_seq() const { return state_seq
; }
229 bool is_closed() const { return state
== STATE_CLOSED
; }
230 bool is_opening() const { return state
== STATE_OPENING
; }
231 bool is_open() const { return state
== STATE_OPEN
; }
232 bool is_closing() const { return state
== STATE_CLOSING
; }
233 bool is_stale() const { return state
== STATE_STALE
; }
234 bool is_killing() const { return state
== STATE_KILLING
; }
236 void inc_importing() {
239 void dec_importing() {
240 ceph_assert(importing_count
> 0);
243 bool is_importing() const { return importing_count
> 0; }
245 void set_load_avg_decay_rate(double rate
) {
246 ceph_assert(is_open() || is_stale());
247 load_avg
= DecayCounter(rate
);
249 uint64_t get_load_avg() const {
250 return (uint64_t)load_avg
.get();
256 double get_session_uptime() const {
257 std::chrono::duration
<double> uptime
= clock::now() - birth_time
;
258 return uptime
.count();
261 time
get_birth_time() const {
265 void inc_cap_gen() { ++cap_gen
; }
266 uint32_t get_cap_gen() const { return cap_gen
; }
268 version_t
inc_push_seq() { return ++cap_push_seq
; }
269 version_t
get_push_seq() const { return cap_push_seq
; }
271 version_t
wait_for_flush(MDSContext
* c
) {
272 waitfor_flush
[get_push_seq()].push_back(c
);
273 return get_push_seq();
275 void finish_flush(version_t seq
, MDSContext::vec
& ls
) {
276 while (!waitfor_flush
.empty()) {
277 auto it
= waitfor_flush
.begin();
280 auto& v
= it
->second
;
281 ls
.insert(ls
.end(), v
.begin(), v
.end());
282 waitfor_flush
.erase(it
);
286 void touch_readdir_cap(uint32_t count
) {
287 cap_acquisition
.hit(count
);
290 void touch_cap(Capability
*cap
) {
291 session_cache_liveness
.hit(1.0);
292 caps
.push_front(&cap
->item_session_caps
);
295 void touch_cap_bottom(Capability
*cap
) {
296 session_cache_liveness
.hit(1.0);
297 caps
.push_back(&cap
->item_session_caps
);
300 void touch_lease(ClientLease
*r
) {
301 session_cache_liveness
.hit(1.0);
302 leases
.push_back(&r
->item_session_lease
);
305 bool is_any_flush_waiter() {
306 return !waitfor_flush
.empty();
309 void add_completed_request(ceph_tid_t t
, inodeno_t created
) {
310 info
.completed_requests
[t
] = created
;
311 completed_requests_dirty
= true;
313 bool trim_completed_requests(ceph_tid_t mintid
) {
315 bool erased_any
= false;
316 while (!info
.completed_requests
.empty() &&
317 (mintid
== 0 || info
.completed_requests
.begin()->first
< mintid
)) {
318 info
.completed_requests
.erase(info
.completed_requests
.begin());
323 completed_requests_dirty
= true;
327 bool have_completed_request(ceph_tid_t tid
, inodeno_t
*pcreated
) const {
328 auto p
= info
.completed_requests
.find(tid
);
329 if (p
== info
.completed_requests
.end())
332 *pcreated
= p
->second
;
336 void add_completed_flush(ceph_tid_t tid
) {
337 info
.completed_flushes
.insert(tid
);
339 bool trim_completed_flushes(ceph_tid_t mintid
) {
340 bool erased_any
= false;
341 while (!info
.completed_flushes
.empty() &&
342 (mintid
== 0 || *info
.completed_flushes
.begin() < mintid
)) {
343 info
.completed_flushes
.erase(info
.completed_flushes
.begin());
347 completed_requests_dirty
= true;
351 bool have_completed_flush(ceph_tid_t tid
) const {
352 return info
.completed_flushes
.count(tid
);
355 uint64_t get_num_caps() const {
359 unsigned get_num_completed_flushes() const { return info
.completed_flushes
.size(); }
360 unsigned get_num_trim_flushes_warnings() const {
361 return num_trim_flushes_warnings
;
363 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings
; }
364 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings
= 0; }
366 unsigned get_num_completed_requests() const { return info
.completed_requests
.size(); }
367 unsigned get_num_trim_requests_warnings() const {
368 return num_trim_requests_warnings
;
370 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings
; }
371 void reset_num_trim_requests_warnings() { num_trim_requests_warnings
= 0; }
373 bool has_dirty_completed_requests() const
375 return completed_requests_dirty
;
378 void clear_dirty_completed_requests()
380 completed_requests_dirty
= false;
383 int check_access(CInode
*in
, unsigned mask
, int caller_uid
, int caller_gid
,
384 const std::vector
<uint64_t> *gid_list
, int new_uid
, int new_gid
);
386 bool fs_name_capable(std::string_view fs_name
, unsigned mask
) const {
387 return auth_caps
.fs_name_capable(fs_name
, mask
);
390 void set_connection(ConnectionRef con
) {
391 connection
= std::move(con
);
392 auto& c
= connection
;
394 info
.auth_name
= c
->get_peer_entity_name();
395 info
.inst
.addr
= c
->get_peer_socket_addr();
396 info
.inst
.name
= entity_name_t(c
->get_peer_type(), c
->get_peer_global_id());
399 const ConnectionRef
& get_connection() const {
404 pending_prealloc_inos
.clear();
405 free_prealloc_inos
.clear();
406 delegated_inos
.clear();
410 last_cap_renew
= clock::zero();
413 Session
*reclaiming_from
= nullptr;
414 session_info_t info
; ///< durable bits
415 MDSAuthCaps auth_caps
;
417 xlist
<Session
*>::item item_session_list
;
419 std::list
<ceph::ref_t
<Message
>> preopen_out_queue
; ///< messages for client, queued before they connect
/* This is mutable to allow get_request_count to be const. elist does not
 * support const iterators yet.
 */
424 mutable elist
<MDRequestImpl
*> requests
;
426 interval_set
<inodeno_t
> pending_prealloc_inos
; // journaling prealloc, will be added to prealloc_inos
427 interval_set
<inodeno_t
> free_prealloc_inos
; //
428 interval_set
<inodeno_t
> delegated_inos
; // hand these out to client
430 xlist
<Capability
*> caps
; // inodes with caps; front=most recently used
431 xlist
<ClientLease
*> leases
; // metadata leases to clients
432 time last_cap_renew
= clock::zero();
433 time last_seen
= clock::zero();
436 uint32_t lease_seq
= 0;
439 ConnectionRef connection
;
442 friend class SessionMap
;
444 // Human (friendly) name is soft state generated from client metadata
445 void _update_human_name();
447 int state
= STATE_CLOSED
;
448 bool reconnecting
= false;
449 uint64_t state_seq
= 0;
450 int importing_count
= 0;
452 std::string human_name
;
454 // Versions in this session was projected: used to verify
455 // that appropriate mark_dirty calls follow.
456 std::deque
<version_t
> projected
;
458 // request load average for this session
459 DecayCounter load_avg
;
461 // Ephemeral state for tracking progress of capability recalls
462 // caps being recalled recently by this session; used for Beacon warnings
463 DecayCounter recall_caps
; // caps that have been released
464 DecayCounter release_caps
;
465 // throttle on caps recalled
466 DecayCounter recall_caps_throttle
;
467 // second order throttle that prevents recalling too quickly
468 DecayCounter recall_caps_throttle2o
;
469 // New limit in SESSION_RECALL
470 uint32_t recall_limit
= 0;
472 // session caps liveness
473 DecayCounter session_cache_liveness
;
475 // cap acquisition via readdir
476 DecayCounter cap_acquisition
;
478 // session start time -- used to track average session time
479 // note that this is initialized in the constructor rather
480 // than at the time of adding a session to the sessionmap
481 // as journal replay of sessionmap will not call add_session().
485 uint32_t cap_gen
= 0;
486 version_t cap_push_seq
= 0; // cap push seq #
487 std::map
<version_t
, MDSContext::vec
> waitfor_flush
; // flush session messages
489 // Has completed_requests been modified since the last time we
490 // wrote this session out?
491 bool completed_requests_dirty
= false;
493 unsigned num_trim_flushes_warnings
= 0;
494 unsigned num_trim_requests_warnings
= 0;
// Default filter: initialize the 'reconnecting' criterion pair to
// (false, false). Per the member's documentation below, .first says
// whether to filter on reconnecting at all and .second is the value to
// match — so a default-constructed filter has that criterion disabled.
500 SessionFilter() : reconnecting(false, false) {}
503 const Session
&session
,
504 std::function
<bool(client_t
)> is_reconnecting
) const;
505 int parse(const std::vector
<std::string
> &args
, std::ostream
*ss
);
506 void set_reconnecting(bool v
)
508 reconnecting
.first
= true;
509 reconnecting
.second
= v
;
512 std::map
<std::string
, std::string
> metadata
;
513 std::string auth_name
;
517 // First is whether to filter, second is filter value
518 std::pair
<bool, bool> reconnecting
;
/**
 * Encapsulate the serialized state associated with SessionMap. Allows
 * encode/decode outside of live MDS instance.
 */
531 class SessionMapStore
{
533 using clock
= Session::clock
;
534 using time
= Session::time
;
// Seed the aggregate request-load DecayCounter with the configured decay
// rate (member 'decay_rate' below, read from the
// "mds_request_load_average_decay_rate" config option).
536 SessionMapStore(): total_load_avg(decay_rate
) {}
// Virtual destructor: SessionMapStore is a polymorphic base (SessionMap
// derives from it below), so deletion through a base pointer must be safe.
537 virtual ~SessionMapStore() {};
539 version_t
get_version() const {return version
;}
541 virtual void encode_header(ceph::buffer::list
*header_bl
);
542 virtual void decode_header(ceph::buffer::list
&header_bl
);
543 virtual void decode_values(std::map
<std::string
, ceph::buffer::list
> &session_vals
);
544 virtual void decode_legacy(ceph::buffer::list::const_iterator
& blp
);
545 void dump(ceph::Formatter
*f
) const;
547 void set_rank(mds_rank_t r
)
552 Session
* get_or_add_session(const entity_inst_t
& i
) {
554 auto session_map_entry
= session_map
.find(i
.name
);
555 if (session_map_entry
!= session_map
.end()) {
556 s
= session_map_entry
->second
;
558 s
= session_map
[i
.name
] = new Session(ConnectionRef());
560 s
->last_cap_renew
= Session::clock::now();
562 logger
->set(l_mdssm_session_count
, session_map
.size());
563 logger
->inc(l_mdssm_session_add
);
570 static void generate_test_instances(std::list
<SessionMapStore
*>& ls
);
577 mds_rank_t rank
= MDS_RANK_NONE
;
580 version_t version
= 0;
581 ceph::unordered_map
<entity_name_t
, Session
*> session_map
;
582 PerfCounters
*logger
=nullptr;
584 // total request load avg
585 double decay_rate
= g_conf().get_val
<double>("mds_request_load_average_decay_rate");
586 DecayCounter total_load_avg
;
589 class SessionMap
: public SessionMapStore
{
591 SessionMap() = delete;
// Bind this SessionMap to its owning MDSRank (stored in member 'mds';
// the member's declaration is not visible in this chunk — presumably
// declared near the other private state). 'explicit' prevents implicit
// conversion from a bare MDSRank*.
592 explicit SessionMap(MDSRank
*m
) : mds(m
) {}
594 ~SessionMap() override
596 for (auto p
: by_state
)
600 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
606 uint64_t set_state(Session
*session
, int state
);
607 void update_average_session_age();
609 void register_perfcounters();
611 void set_version(const version_t v
)
613 version
= projected
= v
;
616 void set_projected(const version_t v
)
621 version_t
get_projected() const
626 version_t
get_committed() const
631 version_t
get_committing() const
637 void decode_legacy(ceph::buffer::list::const_iterator
& blp
) override
;
638 bool empty() const { return session_map
.empty(); }
639 const auto& get_sessions() const {
643 bool is_any_state(int state
) const {
644 auto it
= by_state
.find(state
);
645 if (it
== by_state
.end() || it
->second
->empty())
650 bool have_unclosed_sessions() const {
652 is_any_state(Session::STATE_OPENING
) ||
653 is_any_state(Session::STATE_OPEN
) ||
654 is_any_state(Session::STATE_CLOSING
) ||
655 is_any_state(Session::STATE_STALE
) ||
656 is_any_state(Session::STATE_KILLING
);
658 bool have_session(entity_name_t w
) const {
659 return session_map
.count(w
);
661 Session
* get_session(entity_name_t w
) {
662 auto session_map_entry
= session_map
.find(w
);
663 return (session_map_entry
!= session_map
.end() ?
664 session_map_entry
-> second
: nullptr);
666 const Session
* get_session(entity_name_t w
) const {
667 ceph::unordered_map
<entity_name_t
, Session
*>::const_iterator p
= session_map
.find(w
);
668 if (p
== session_map
.end()) {
675 void add_session(Session
*s
);
676 void remove_session(Session
*s
);
677 void touch_session(Session
*session
);
679 Session
*get_oldest_session(int state
) {
680 auto by_state_entry
= by_state
.find(state
);
681 if (by_state_entry
== by_state
.end() || by_state_entry
->second
->empty())
683 return by_state_entry
->second
->front();
689 void get_client_sessions(F
&& f
) const {
690 for (const auto& p
: session_map
) {
691 auto& session
= p
.second
;
692 if (session
->info
.inst
.name
.is_client())
697 void get_client_session_set(C
& c
) const {
698 auto f
= [&c
](auto& s
) {
701 get_client_sessions(f
);
705 entity_inst_t
& get_inst(entity_name_t w
) {
706 ceph_assert(session_map
.count(w
));
707 return session_map
[w
]->info
.inst
;
709 version_t
get_push_seq(client_t client
) {
710 return get_session(entity_name_t::CLIENT(client
.v
))->get_push_seq();
712 bool have_completed_request(metareqid_t rid
) {
713 Session
*session
= get_session(rid
.name
);
714 return session
&& session
->have_completed_request(rid
.tid
, NULL
);
716 void trim_completed_requests(entity_name_t c
, ceph_tid_t tid
) {
717 Session
*session
= get_session(c
);
718 ceph_assert(session
);
719 session
->trim_completed_requests(tid
);
723 void wipe_ino_prealloc();
725 object_t
get_object_name() const;
727 void load(MDSContext
*onload
);
733 ceph::buffer::list
&header_bl
,
734 std::map
<std::string
, ceph::buffer::list
> &session_vals
,
735 bool more_session_vals
);
738 void _load_legacy_finish(int r
, ceph::buffer::list
&bl
);
740 void save(MDSContext
*onsave
, version_t needv
=0);
741 void _save_finish(version_t v
);
/**
 * Advance the version, and mark this session
 * as dirty within the new version.
 *
 * Dirty means journalled but needing writeback
 * to the backing store. Must have called
 * mark_projected previously for this session.
 */
751 void mark_dirty(Session
*session
, bool may_save
=true);
/**
 * Advance the projected version, and mark this
 * session as projected within the new version
 *
 * Projected means the session is updated in memory
 * but we're waiting for the journal write of the update
 * to finish. Must subsequently call mark_dirty
 * for sessions in the same global order as calls
 * to mark_projected.
 */
763 version_t
mark_projected(Session
*session
);
/**
 * During replay, advance versions to account
 * for a session modification, and mark the
 * session dirty.
 */
770 void replay_dirty_session(Session
*session
);
/**
 * During replay, if a session no longer present
 * would have consumed a version, advance `version`
 * and `projected` to account for that.
 */
777 void replay_advance_version();
/**
 * During replay, open sessions, advance versions and
 * mark these sessions as dirty.
 */
783 void replay_open_sessions(version_t event_cmapv
,
784 std::map
<client_t
,entity_inst_t
>& client_map
,
785 std::map
<client_t
,client_metadata_t
>& client_metadata_map
);
/**
 * For these session IDs, if a session exists with this ID, and it has
 * dirty completed_requests, then persist it immediately
 * (ahead of usual project/dirty versioned writes
 * of the map).
 */
793 void save_if_dirty(const std::set
<entity_name_t
> &tgt_sessions
,
794 MDSGatherBuilder
*gather_bld
);
796 void hit_session(Session
*session
);
797 void handle_conf_change(const std::set
<std::string
> &changed
);
800 std::map
<int,xlist
<Session
*>*> by_state
;
801 std::map
<version_t
, MDSContext::vec
> commit_waiters
;
803 // -- loading, saving --
805 MDSContext::vec waiting_for_load
;
808 void _mark_dirty(Session
*session
, bool may_save
);
810 version_t projected
= 0, committing
= 0, committed
= 0;
811 std::set
<entity_name_t
> dirty_sessions
;
812 std::set
<entity_name_t
> null_sessions
;
813 bool loaded_legacy
= false;
816 uint64_t get_session_count_in_state(int state
) {
817 return !is_any_state(state
) ? 0 : by_state
[state
]->size();
820 void update_average_birth_time(const Session
&s
, bool added
=true) {
821 uint32_t sessions
= session_map
.size();
822 time birth_time
= s
.get_birth_time();
825 avg_birth_time
= added
? birth_time
: clock::zero();
830 avg_birth_time
= clock::time_point(
831 ((avg_birth_time
- clock::zero()) / sessions
) * (sessions
- 1) +
832 (birth_time
- clock::zero()) / sessions
);
834 avg_birth_time
= clock::time_point(
835 ((avg_birth_time
- clock::zero()) / (sessions
- 1)) * sessions
-
836 (birth_time
- clock::zero()) / (sessions
- 1));
840 time avg_birth_time
= clock::zero();
843 std::ostream
& operator<<(std::ostream
&out
, const Session
&s
);