1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
21 #include "include/unordered_map.h"
23 #include "include/Context.h"
24 #include "include/xlist.h"
25 #include "include/elist.h"
26 #include "include/interval_set.h"
28 #include "mds/MDSAuthCaps.h"
29 #include "common/perf_counters.h"
30 #include "common/DecayCounter.h"
36 #include "Capability.h"
37 #include "msg/Message.h"
// Perf-counter keys of the l_mdssm_* family. l_mdssm_session_count is passed
// to logger->set() in get_or_add_session() further down.
// NOTE(review): the enclosing enum header and several enumerators are not
// visible in this listing.
41 l_mdssm_session_count
,
43 l_mdssm_session_remove
,
45 l_mdssm_session_stale
,
48 l_mdssm_avg_session_uptime
,
// A session record; inherits RefCountedObject. The ASCII sketch right after
// the class header shows the transitions between the STATE_* values, with
// 'importing' tracked separately by a counter.
56 class Session
: public RefCountedObject
{
61 <deleted> <-- closed <------------+
64 killing <-- opening <----+ |
67 stale <--> open --> closing ---+
69 + additional dimension of 'importing' (with counter)
// All session timestamps use the coarse monotonic clock.
73 using clock
= ceph::coarse_mono_clock
;
74 using time
= ceph::coarse_mono_time
;
// State constants. Only OPENING (1) and CLOSING (3) are visible here; the
// remaining STATE_* values referenced elsewhere are missing from this listing.
79 STATE_OPENING
= 1, // journaling open
81 STATE_CLOSING
= 3, // journaling close
// Translate a STATE_* value into its lowercase name. Any value without a
// matching case yields "???".
86 const char *get_state_name(int s
) const {
88 case STATE_CLOSED
: return "closed";
89 case STATE_OPENING
: return "opening";
90 case STATE_OPEN
: return "open";
91 case STATE_CLOSING
: return "closing";
92 case STATE_STALE
: return "stale";
93 case STATE_KILLING
: return "killing";
94 default: return "???";
// Current state; a freshly constructed session starts out CLOSED.
99 int state
= STATE_CLOSED
;
// Sequence counter exposed via get_state_seq().
// NOTE(review): the place where it is advanced is not visible in this listing.
100 uint64_t state_seq
= 0;
// Number of in-progress imports; is_importing() is true while it is > 0.
101 int importing_count
= 0;
// SessionMap is granted access to the private members.
102 friend class SessionMap
;
104 // Human (friendly) name is soft state generated from client metadata
// _update_human_name() is invoked by set_client_metadata() after the metadata
// has been replaced; get_human_name() returns the cached string.
105 void _update_human_name();
106 std::string human_name
;
108 // Versions in which this session was projected: used to verify
109 // that appropriate mark_dirty calls follow.
// Filled at the back by push_pv() and drained from the front by pop_pv().
110 std::deque
<version_t
> projected
;
// Decay counters below are declared mutable so that the const getters can
// call get() on them.
112 // request load average for this session
113 mutable DecayCounter load_avg
;
// Decay rate used with load_avg; its half-life is set by
// set_load_avg_decay_rate().
114 DecayRate load_avg_rate
;
116 // Ephemeral state for tracking progress of capability recalls
117 // caps being recalled recently by this session; used for Beacon warnings
118 mutable DecayCounter recall_caps
;
119 // caps that have been released
120 mutable DecayCounter release_caps
;
121 // throttle on caps recalled
122 mutable DecayCounter recall_caps_throttle
;
123 // New limit in SESSION_RECALL
124 uint32_t recall_limit
= 0;
126 // session start time -- used to track average session time
127 // note that this is initialized in the constructor rather
128 // than at the time of adding a session to the sessionmap
129 // as journal replay of sessionmap will not call add_session().
// Append a newly projected version to `projected`.
// Asserts that pv differs from the most recently pushed version (if any).
134 void push_pv(version_t pv
)
136 assert(projected
.empty() || projected
.back() != pv
);
137 projected
.push_back(pv
);
// Retire the oldest projected version.
// Asserts that the queue is non-empty and that its front equals v, i.e. that
// versions are retired in the order they were pushed.
140 void pop_pv(version_t v
)
142 assert(!projected
.empty());
143 assert(projected
.front() == v
);
144 projected
.pop_front();
// Read the current STATE_* value.
147 int get_state() const { return state
; }
// Change the state; the visible guard only acts when new_state differs from
// the current value.
// NOTE(review): the statements inside the guard are missing from this listing.
148 void set_state(int new_state
)
150 if (state
!= new_state
) {
// Decode from a bufferlist iterator; declared here, defined out of line.
155 void decode(bufferlist::iterator
&p
);
// Replace info.client_metadata with `meta` (perfectly forwarded) and then
// regenerate the cached human-readable name.
// NOTE(review): T is a template parameter; its declaration line is not
// visible in this listing.
157 void set_client_metadata(T
&& meta
)
159 info
.client_metadata
= std::forward
<T
>(meta
);
160 _update_human_name();
// Returns a copy of the cached human-readable name.
162 std::string
get_human_name() const {return human_name
;}
// Public data members of the session.
164 session_info_t info
; ///< durable bits
// Constructed from g_ceph_context in the Session constructor.
166 MDSAuthCaps auth_caps
;
// Set from the constructor argument.
168 ConnectionRef connection
;
// List hook for this session; the destructor removes it from its list when
// the session is CLOSED and asserts that it is no longer on any list.
169 xlist
<Session
*>::item item_session_list
;
171 list
<Message
*> preopen_out_queue
; ///< messages for client, queued before they connect
// Constructed with a member offset of 0 (see the constructor's comment).
173 elist
<MDRequestImpl
*> requests
;
174 size_t get_request_count();
176 interval_set
<inodeno_t
> pending_prealloc_inos
; // journaling prealloc, will be added to prealloc_inos
// Recall bookkeeping. The two notify_* functions are only declared here.
178 void notify_cap_release(size_t n_caps
);
179 uint64_t notify_recall_sent(size_t new_limit
);
// The getters below read the corresponding decay counter as of
// ceph_clock_now().
180 double get_recall_caps_throttle() const {
181 return recall_caps_throttle
.get(ceph_clock_now());
183 double get_recall_caps() const {
184 return recall_caps
.get(ceph_clock_now());
186 double get_release_caps() const {
187 return release_caps
.get(ceph_clock_now());
// Peek at the first preallocated inode number (range_start of
// info.prealloc_inos) without removing it.
// NOTE(review): the statement executed when the set is empty is missing from
// this listing.
190 inodeno_t
next_ino() const {
191 if (info
.prealloc_inos
.empty())
193 return info
.prealloc_inos
.range_start();
// Consume one inode number from info.prealloc_inos and record it in
// info.used_inos. Asserts that the preallocated set is non-empty.
// Visible logic: when `ino` is contained in the set it is erased from it;
// another path picks range_start() and erases that instead.
// NOTE(review): the lines that select between the two paths, and the return
// statement, are missing from this listing.
195 inodeno_t
take_ino(inodeno_t ino
= 0) {
196 assert(!info
.prealloc_inos
.empty());
199 if (info
.prealloc_inos
.contains(ino
))
200 info
.prealloc_inos
.erase(ino
);
205 ino
= info
.prealloc_inos
.range_start();
206 info
.prealloc_inos
.erase(ino
);
208 info
.used_inos
.insert(ino
, 1);
// Preallocated inodes already in info.prealloc_inos plus those still pending.
// The sum of the two size() values is returned as int.
211 int get_num_projected_prealloc_inos() const {
212 return info
.prealloc_inos
.size() + pending_prealloc_inos
.size();
// Client id of this session, delegated to info.get_client().
215 client_t
get_client() const {
216 return info
.get_client();
// Name of the current state (forwards to the one-argument overload).
219 const char *get_state_name() const { return get_state_name(state
); }
220 uint64_t get_state_seq() const { return state_seq
; }
// One predicate per STATE_* value; each is a plain equality test on `state`.
221 bool is_closed() const { return state
== STATE_CLOSED
; }
222 bool is_opening() const { return state
== STATE_OPENING
; }
223 bool is_open() const { return state
== STATE_OPEN
; }
224 bool is_closing() const { return state
== STATE_CLOSING
; }
225 bool is_stale() const { return state
== STATE_STALE
; }
226 bool is_killing() const { return state
== STATE_KILLING
; }
// Import counter helpers operating on importing_count.
// NOTE(review): the increment/decrement statements themselves are missing
// from this listing; only the underflow assert in dec_importing() is visible.
228 void inc_importing() {
231 void dec_importing() {
232 assert(importing_count
> 0);
// True while at least one import is outstanding.
235 bool is_importing() const { return importing_count
> 0; }
// Set the half-life of the per-session load average.
// Asserts that the session is open or stale.
237 void set_load_avg_decay_rate(double rate
) {
238 assert(is_open() || is_stale());
239 load_avg_rate
.set_halflife(rate
);
// Current load average as of ceph_clock_now(), truncated to uint64_t.
241 uint64_t get_load_avg() const {
242 return (uint64_t)load_avg
.get(ceph_clock_now(), load_avg_rate
);
// Registers one hit on load_avg at the current time.
// NOTE(review): the signature of the function owning this statement is
// missing from this listing.
245 load_avg
.hit(ceph_clock_now(), load_avg_rate
);
// Time elapsed since birth_time, as a double count of seconds
// (chrono::duration<double> uses the default one-second period).
248 double get_session_uptime() const {
249 chrono::duration
<double> uptime
= clock::now() - birth_time
;
250 return uptime
.count();
// NOTE(review): the body is missing from this listing; presumably it returns
// birth_time.
253 time
get_birth_time() const {
// Capability-related state.
// Generation counter, advanced by inc_cap_gen().
259 uint32_t cap_gen
= 0;
260 version_t cap_push_seq
= 0; // cap push seq #
// Waiters keyed by push sequence; filled by wait_for_flush() and drained by
// finish_flush().
261 map
<version_t
, list
<MDSInternalContextBase
*> > waitfor_flush
; // flush session messages
264 xlist
<Capability
*> caps
; // inodes with caps; front=most recently used
265 xlist
<ClientLease
*> leases
; // metadata leases to clients
// Both timestamps start at time::min().
266 time last_cap_renew
= time::min();
267 time last_seen
= time::min();
// Advance / read the capability generation.
269 void inc_cap_gen() { ++cap_gen
; }
270 uint32_t get_cap_gen() const { return cap_gen
; }
// Pre-increment the push sequence and return the new value.
272 version_t
inc_push_seq() { return ++cap_push_seq
; }
// Current push sequence, unchanged.
273 version_t
get_push_seq() const { return cap_push_seq
; }
// Queue context `c` under the current push sequence and return that sequence.
275 version_t
wait_for_flush(MDSInternalContextBase
* c
) {
276 waitfor_flush
[get_push_seq()].push_back(c
);
277 return get_push_seq();
// Hand flush waiters over to `ls`. Works from the smallest key of
// waitfor_flush upward: each entry's waiter list is spliced onto the end of
// `ls` and the entry is erased.
// NOTE(review): the statement taken when the front key is greater than `seq`
// is missing from this listing; presumably it leaves the loop.
279 void finish_flush(version_t seq
, list
<MDSInternalContextBase
*>& ls
) {
280 while (!waitfor_flush
.empty()) {
281 if (waitfor_flush
.begin()->first
> seq
)
283 ls
.splice(ls
.end(), waitfor_flush
.begin()->second
);
284 waitfor_flush
.erase(waitfor_flush
.begin());
// Put the capability at the front of `caps` (front = most recently used).
288 void touch_cap(Capability
*cap
) {
289 caps
.push_front(&cap
->item_session_caps
);
// Put the capability at the back of `caps` instead.
291 void touch_cap_bottom(Capability
*cap
) {
292 caps
.push_back(&cap
->item_session_caps
);
// Put the lease at the back of `leases`.
294 void touch_lease(ClientLease
*r
) {
295 leases
.push_back(&r
->item_session_lease
);
299 uint32_t lease_seq
= 0;
301 // -- completed requests --
303 // Has completed_requests been modified since the last time we
304 // wrote this session out?
// Also set by trim_completed_flushes(), not only by the request helpers.
305 bool completed_requests_dirty
= false;
// Warning counters, driven by the inc_/reset_/get_num_trim_*_warnings helpers.
307 unsigned num_trim_flushes_warnings
= 0;
308 unsigned num_trim_requests_warnings
= 0;
// Record that request `t` completed, remembering the inode it created, and
// mark the completed-request state dirty.
310 void add_completed_request(ceph_tid_t t
, inodeno_t created
) {
311 info
.completed_requests
[t
] = created
;
312 completed_requests_dirty
= true;
// Drop completed requests from the front of info.completed_requests while
// their tid is below `mintid`; mintid == 0 removes every entry.
// NOTE(review): the lines that update erased_any, guard the dirty-flag
// assignment and return are missing from this listing.
314 bool trim_completed_requests(ceph_tid_t mintid
) {
316 bool erased_any
= false;
317 while (!info
.completed_requests
.empty() &&
318 (mintid
== 0 || info
.completed_requests
.begin()->first
< mintid
)) {
319 info
.completed_requests
.erase(info
.completed_requests
.begin());
324 completed_requests_dirty
= true;
// Look up `tid` in info.completed_requests; on a hit the created inode is
// written through `pcreated`.
// NOTE(review): the miss branch, any null check on pcreated and the return
// statements are missing from this listing. SessionMap::have_completed_request
// passes NULL for pcreated, so a guard is presumably present; confirm.
328 bool have_completed_request(ceph_tid_t tid
, inodeno_t
*pcreated
) const {
329 map
<ceph_tid_t
,inodeno_t
>::const_iterator p
= info
.completed_requests
.find(tid
);
330 if (p
== info
.completed_requests
.end())
333 *pcreated
= p
->second
;
// Record `tid` in info.completed_flushes.
337 void add_completed_flush(ceph_tid_t tid
) {
338 info
.completed_flushes
.insert(tid
);
// Drop entries from the front of info.completed_flushes while they are below
// `mintid`; mintid == 0 removes every entry. The visible code sets
// completed_requests_dirty (the same flag the request helpers use).
// NOTE(review): the lines that update erased_any, guard the dirty-flag
// assignment and return are missing from this listing.
340 bool trim_completed_flushes(ceph_tid_t mintid
) {
341 bool erased_any
= false;
342 while (!info
.completed_flushes
.empty() &&
343 (mintid
== 0 || *info
.completed_flushes
.begin() < mintid
)) {
344 info
.completed_flushes
.erase(info
.completed_flushes
.begin());
348 completed_requests_dirty
= true;
// True when `tid` is present in info.completed_flushes (count() converted to
// bool).
352 bool have_completed_flush(ceph_tid_t tid
) const {
353 return info
.completed_flushes
.count(tid
);
// Size of info.completed_flushes, and accessors for the flush-trim warning
// counter.
356 unsigned get_num_completed_flushes() const { return info
.completed_flushes
.size(); }
357 unsigned get_num_trim_flushes_warnings() const {
358 return num_trim_flushes_warnings
;
360 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings
; }
361 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings
= 0; }
// Size of info.completed_requests, and accessors for the request-trim warning
// counter.
363 unsigned get_num_completed_requests() const { return info
.completed_requests
.size(); }
364 unsigned get_num_trim_requests_warnings() const {
365 return num_trim_requests_warnings
;
367 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings
; }
368 void reset_num_trim_requests_warnings() { num_trim_requests_warnings
= 0; }
// Query the completed_requests_dirty flag.
370 bool has_dirty_completed_requests() const
372 return completed_requests_dirty
;
// Reset the completed_requests_dirty flag to false.
375 void clear_dirty_completed_requests()
377 completed_requests_dirty
= false;
// Access check for an inode; only declared here, defined out of line.
// NOTE(review): presumably evaluated against auth_caps and returns 0 or a
// negative errno — confirm against the definition.
380 int check_access(CInode
*in
, unsigned mask
, int caller_uid
, int caller_gid
,
381 const vector
<uint64_t> *gid_list
, int new_uid
, int new_gid
);
// Construct a session for connection `con`.
// - recall_caps and release_caps take their decay rate from
//   "mds_recall_warning_decay_rate"; recall_caps_throttle takes its rate from
//   "mds_recall_max_decay_rate".
// - birth_time is stamped with clock::now() at construction.
// - `con` is moved into `connection` in the constructor body.
384 Session(ConnectionRef con
) :
385 recall_caps(ceph_clock_now(), g_conf
->get_val
<double>("mds_recall_warning_decay_rate")),
386 release_caps(ceph_clock_now(), g_conf
->get_val
<double>("mds_recall_warning_decay_rate")),
387 recall_caps_throttle(ceph_clock_now(), g_conf
->get_val
<double>("mds_recall_max_decay_rate")),
388 birth_time(clock::now()),
389 auth_caps(g_ceph_context
),
390 item_session_list(this),
391 requests(0) // member_offset passed to front() manually
393 connection
= std::move(con
);
// Destructor. A CLOSED session unlinks itself from its session list; in all
// cases the hook must be off-list afterwards (asserted). Every message still
// sitting in preopen_out_queue has put() called on it before being popped.
395 ~Session() override
{
396 if (state
== STATE_CLOSED
) {
397 item_session_list
.remove_myself();
399 assert(!item_session_list
.is_on_list());
401 while (!preopen_out_queue
.empty()) {
402 preopen_out_queue
.front()->put();
403 preopen_out_queue
.pop_front();
// Fragment of a reset routine: empties pending_prealloc_inos and rewinds
// last_cap_renew to time::min().
// NOTE(review): the function signature and the other statements of its body
// are missing from this listing.
408 pending_prealloc_inos
.clear();
412 last_cap_renew
= time::min();
// Members of a session-filter type.
// NOTE(review): the class header is missing from this listing.
419 // First is whether to filter, second is filter value
420 std::pair
<bool, bool> reconnecting
;
// Key/value pairs and an auth name held by the filter.
// NOTE(review): presumably matched against a session's client metadata and
// auth identity — confirm against the match() definition.
423 std::map
<std::string
, std::string
> metadata
;
424 std::string auth_name
;
// Constructor initialiser list: no reconnecting filter by default, id 0.
// NOTE(review): the constructor signature line is missing from this listing.
429 : reconnecting(false, false), id(0)
// Tail of a const member declaration taking a session and an
// `is_reconnecting` callback that maps a client_t to bool.
// NOTE(review): the function name and return type are missing from this
// listing.
433 const Session
&session
,
434 std::function
<bool(client_t
)> is_reconnecting
) const;
// Parse filter arguments; `ss` is a stringstream out-parameter.
// NOTE(review): presumably `ss` receives error text and the int is a status
// code — confirm against the definition.
435 int parse(const std::vector
<std::string
> &args
, std::stringstream
*ss
);
// Enable the reconnecting filter (first = true) and store `v` as the value to
// filter on (second).
436 void set_reconnecting(bool v
)
438 reconnecting
.first
= true;
439 reconnecting
.second
= v
;
// Base class of SessionMap holding the serialisable state. The two text lines
// that follow are the original class description.
450 * Encapsulate the serialized state associated with SessionMap. Allows
451 * encode/decode outside of live MDS instance.
453 class SessionMapStore
{
// Reuse Session's clock and time-point types.
455 using clock
= Session::clock
;
456 using time
= Session::time
;
// Sessions indexed by entity name. Entries are raw pointers created with
// `new` in get_or_add_session().
460 ceph::unordered_map
<entity_name_t
, Session
*> session_map
;
// Perf-counter sink; initialised to nullptr by the constructor.
461 PerfCounters
*logger
;
463 // total request load avg
465 DecayCounter total_load_avg
;
// Initialised from decay_rate in the constructor.
466 DecayRate total_load_avg_rate
;
// Current value of `version`.
471 version_t
get_version() const {return version
;}
// Serialisation interface; all declared here and defined out of line.
// The first four are virtual; SessionMap overrides decode_legacy().
473 virtual void encode_header(bufferlist
*header_bl
);
474 virtual void decode_header(bufferlist
&header_bl
);
475 virtual void decode_values(std::map
<std::string
, bufferlist
> &session_vals
);
476 virtual void decode_legacy(bufferlist::iterator
& blp
);
// Dump the store through a Formatter.
477 void dump(Formatter
*f
) const;
// NOTE(review): the body is missing from this listing; presumably it stores
// `r` in the `rank` member that the constructor initialises to MDS_RANK_NONE.
479 void set_rank(mds_rank_t r
)
// Return the session registered under i.name, creating one (with an empty
// ConnectionRef) when none exists yet.
// Visible side effects: last_cap_renew is stamped with the current time, the
// session-count gauge is set to the map size and the session-add counter is
// incremented.
// NOTE(review): the declaration of `s`, the else/branch structure (so which
// of the side effects apply only to new sessions) and the return statement
// are missing from this listing.
484 Session
* get_or_add_session(const entity_inst_t
& i
) {
486 auto session_map_entry
= session_map
.find(i
.name
);
487 if (session_map_entry
!= session_map
.end()) {
488 s
= session_map_entry
->second
;
490 s
= session_map
[i
.name
] = new Session(ConnectionRef());
492 s
->last_cap_renew
= Session::clock::now();
494 logger
->set(l_mdssm_session_count
, session_map
.size());
495 logger
->inc(l_mdssm_session_add
);
// Appends SessionMapStore instances to `ls` for tests; defined out of line.
502 static void generate_test_instances(list
<SessionMapStore
*>& ls
);
// Constructor initialiser list: version 0, no logger, decay rate read from
// "mds_request_load_average_decay_rate", rank MDS_RANK_NONE.
// NOTE(review): the constructor signature line is missing from this listing.
510 : version(0), logger(nullptr),
511 decay_rate(g_conf
->get_val
<double>("mds_request_load_average_decay_rate")),
512 total_load_avg_rate(decay_rate
), rank(MDS_RANK_NONE
) {
// Virtual destructor with an empty body; SessionMap overrides it.
514 virtual ~SessionMapStore() {};
// SessionMap extends SessionMapStore with versioning (projected / committing
// / committed), per-state session lists and commit waiters.
517 class SessionMap
: public SessionMapStore
{
// Version counters, all starting at 0.
522 version_t projected
= 0, committing
= 0, committed
= 0;
// One xlist of sessions per STATE_* value; the lists are held by pointer.
524 map
<int,xlist
<Session
*>* > by_state
;
525 uint64_t set_state(Session
*session
, int state
);
// Waiters keyed by version.
526 map
<version_t
, list
<MDSInternalContextBase
*> > commit_waiters
;
527 void update_average_session_age();
// No default construction: a SessionMap must be given its MDSRank, which is
// stored in `mds`. The constructor is explicit.
529 SessionMap() = delete;
530 explicit SessionMap(MDSRank
*m
) : mds(m
) {}
// Destructor: walks by_state and removes `logger` from the context's
// perf-counter collection.
// NOTE(review): the loop body and any guard around the logger removal are
// missing from this listing. `auto p` copies each map entry by value;
// `const auto&` would avoid the copy.
532 ~SessionMap() override
534 for (auto p
: by_state
)
538 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
// Declared here, defined out of line.
544 void register_perfcounters();
// Set both `version` and `projected` to v.
546 void set_version(const version_t v
)
548 version
= projected
= v
;
// NOTE(review): the bodies of the following four accessors are missing from
// this listing; by name they set/read projected, committed and committing.
551 void set_projected(const version_t v
)
556 version_t
get_projected() const
561 version_t
get_committed() const
566 version_t
get_committing() const
// Override of SessionMapStore::decode_legacy; defined out of line.
572 void decode_legacy(bufferlist::iterator
& blp
) override
;
// True when no sessions are registered.
573 bool empty() const { return session_map
.empty(); }
// Read-only access to the name -> session map.
// NOTE(review): the body is missing from this listing.
574 const ceph::unordered_map
<entity_name_t
, Session
*> &get_sessions() const
// Is there at least one session in `state`? The visible test treats both
// "no list for this state" and "list is empty" as the negative case.
// NOTE(review): the return statements are missing from this listing.
579 bool is_any_state(int state
) const {
580 map
<int,xlist
<Session
*>* >::const_iterator p
= by_state
.find(state
);
581 if (p
== by_state
.end() || p
->second
->empty())
// True when any session is in a state other than CLOSED: opening, open,
// closing, stale or killing.
// NOTE(review): the `return` line is missing from this listing.
586 bool have_unclosed_sessions() const {
588 is_any_state(Session::STATE_OPENING
) ||
589 is_any_state(Session::STATE_OPEN
) ||
590 is_any_state(Session::STATE_CLOSING
) ||
591 is_any_state(Session::STATE_STALE
) ||
592 is_any_state(Session::STATE_KILLING
);
// True when a session is registered under `w` (count() converted to bool).
594 bool have_session(entity_name_t w
) const {
595 return session_map
.count(w
);
// Look up the session registered under `w`; returns nullptr when there is
// none. Callers must check the result before dereferencing.
597 Session
* get_session(entity_name_t w
) {
598 auto session_map_entry
= session_map
.find(w
);
599 return (session_map_entry
!= session_map
.end() ?
600 session_map_entry
-> second
: nullptr);
// Const overload of get_session().
// NOTE(review): both return statements are missing from this listing;
// presumably mirrors the non-const overload (null on a miss).
602 const Session
* get_session(entity_name_t w
) const {
603 ceph::unordered_map
<entity_name_t
, Session
*>::const_iterator p
= session_map
.find(w
);
604 if (p
== session_map
.end()) {
// Session registration helpers; declared here, defined out of line.
611 void add_session(Session
*s
);
612 void remove_session(Session
*s
);
613 void touch_session(Session
*session
);
// Return the session at the front of the list for `state`.
// NOTE(review): the value returned when the state has no list or the list is
// empty is missing from this listing.
615 Session
*get_oldest_session(int state
) {
616 auto by_state_entry
= by_state
.find(state
);
617 if (by_state_entry
== by_state
.end() || by_state_entry
->second
->empty())
619 return by_state_entry
->second
->front();
// Visit sessions whose entity name is a client.
// NOTE(review): F is a template parameter whose declaration is missing from
// this listing, as is the statement under the is_client() test (presumably
// the call to f).
625 void get_client_sessions(F
&& f
) const {
626 for (const auto& p
: session_map
) {
627 auto& session
= p
.second
;
628 if (session
->info
.inst
.name
.is_client())
// Collect client sessions into container `c` by running get_client_sessions()
// with a lambda that captures `c` by reference.
// NOTE(review): C is a template parameter whose declaration is missing from
// this listing, as is the lambda body.
633 void get_client_session_set(C
& c
) const {
634 auto f
= [&c
](Session
* s
) {
637 get_client_sessions(f
);
// For every entry of client_map: get or create its session, force it into
// STATE_OPEN and pass it to replay_dirty_session().
// NOTE(review): the iterator increment line is missing from this listing.
640 void replay_open_sessions(map
<client_t
,entity_inst_t
>& client_map
) {
641 for (map
<client_t
,entity_inst_t
>::iterator p
= client_map
.begin();
642 p
!= client_map
.end();
644 Session
*s
= get_or_add_session(p
->second
);
645 set_state(s
, Session::STATE_OPEN
);
646 replay_dirty_session(s
);
// Mutable reference to the entity instance of the session registered under
// `w`. The caller must ensure the session exists.
// NOTE(review): existence is enforced only by the assert. If asserts are
// compiled out, operator[] on a missing key would presumably insert a null
// entry that is then dereferenced; a single find() would avoid both the
// double lookup and that hazard.
651 entity_inst_t
& get_inst(entity_name_t w
) {
652 assert(session_map
.count(w
));
653 return session_map
[w
]->info
.inst
;
// Per-client wrappers around Session::inc_push_seq()/get_push_seq().
// NOTE(review): get_session() returns nullptr for an unknown client and the
// result is dereferenced without a check, so the client must have a session.
655 version_t
inc_push_seq(client_t client
) {
656 return get_session(entity_name_t::CLIENT(client
.v
))->inc_push_seq();
658 version_t
get_push_seq(client_t client
) {
659 return get_session(entity_name_t::CLIENT(client
.v
))->get_push_seq();
// True when the session named by rid.name exists and has rid.tid among its
// completed requests. A missing session yields false. The created-inode
// out-parameter is not requested (NULL is passed).
661 bool have_completed_request(metareqid_t rid
) {
662 Session
*session
= get_session(rid
.name
);
663 return session
&& session
->have_completed_request(rid
.tid
, NULL
);
// Forward a completed-request trim to the session registered under `c`.
// NOTE(review): the line between the lookup and the call is missing from this
// listing; it presumably guards against a null session.
665 void trim_completed_requests(entity_name_t c
, ceph_tid_t tid
) {
666 Session
*session
= get_session(c
);
668 session
->trim_completed_requests(tid
);
// Declared here, defined out of line.
672 void wipe_ino_prealloc();
674 // -- loading, saving --
// Contexts queued until loading completes.
// NOTE(review): inferred from the member name; the use site is not visible.
676 list
<MDSInternalContextBase
*> waiting_for_load
;
678 object_t
get_object_name() const;
// Start loading; `onload` is the completion context.
680 void load(MDSInternalContextBase
*onload
);
// Tail of a load-completion declaration taking the header buffer, the decoded
// session values and a flag indicating that more values remain.
// NOTE(review): the function name and leading parameters are missing from
// this listing.
686 bufferlist
&header_bl
,
687 std::map
<std::string
, bufferlist
> &session_vals
,
688 bool more_session_vals
);
691 void _load_legacy_finish(int r
, bufferlist
&bl
);
// Start saving; `needv` defaults to 0.
693 void save(MDSInternalContextBase
*onsave
, version_t needv
=0);
694 void _save_finish(version_t v
);
// Bookkeeping sets keyed by entity name.
// NOTE(review): presumably sessions awaiting writeback and sessions whose
// stored entry should be removed, respectively — confirm against save().
697 std::set
<entity_name_t
> dirty_sessions
;
698 std::set
<entity_name_t
> null_sessions
;
// Starts false.
// NOTE(review): presumably set once the legacy format has been loaded.
699 bool loaded_legacy
= false;
700 void _mark_dirty(Session
*session
);
// mark_dirty(): declared here, defined out of line. The text lines that
// follow are the original description; the caller must already have called
// mark_projected() for the same session.
704 * Advance the version, and mark this session
705 * as dirty within the new version.
707 * Dirty means journalled but needing writeback
708 * to the backing store. Must have called
709 * mark_projected previously for this session.
711 void mark_dirty(Session
*session
);
// mark_projected(): declared here, defined out of line; returns a version_t.
// The text lines that follow are the original description (its final line is
// missing from this listing): mark_dirty() must later be called for the same
// sessions in the same order.
714 * Advance the projected version, and mark this
715 * session as projected within the new version
717 * Projected means the session is updated in memory
718 * but we're waiting for the journal write of the update
719 * to finish. Must subsequently call mark_dirty
720 * for sessions in the same global order as calls
723 version_t
mark_projected(Session
*session
);
// replay_dirty_session(): declared here, defined out of line; called from
// replay_open_sessions() for each session. The description text that follows
// is truncated in this listing.
726 * During replay, advance versions to account
727 * for a session modification, and mark the
730 void replay_dirty_session(Session
*session
);
// replay_advance_version(): declared here, defined out of line. The text
// lines that follow are the original description.
733 * During replay, if a session no longer present
734 * would have consumed a version, advance `version`
735 * and `projected` to account for that.
737 void replay_advance_version();
// save_if_dirty(): declared here, defined out of line. Takes the set of
// target session names and a gather builder. The text lines that follow are
// the original description (its closing line is missing from this listing).
740 * For these session IDs, if a session exists with this ID, and it has
741 * dirty completed_requests, then persist it immediately
742 * (ahead of usual project/dirty versioned writes
745 void save_if_dirty(const std::set
<entity_name_t
> &tgt_sessions
,
746 MDSGatherBuilder
*gather_bld
);
// Running average of session birth times, maintained by
// update_average_birth_time(); starts at time::min().
749 time avg_birth_time
= time::min();
// Number of sessions in `state`; 0 when the state has no list or the list is
// empty. The is_any_state() test guarantees the key exists before by_state[]
// is indexed, so operator[] does not insert here.
751 uint64_t get_session_count_in_state(int state
) {
752 return !is_any_state(state
) ? 0 : by_state
[state
]->size();
// Fold the birth time of session `s` into (added == true) or out of
// (added == false) the running average avg_birth_time. All arithmetic is done
// on offsets from time::min(), with `sessions` being the current map size:
//   - one visible assignment sets the average directly to the session's birth
//     time (added) or resets it to time::min() (removed);
//   - the add formula is avg/sessions*(sessions-1) + birth/sessions;
//   - the remove formula is avg/(sessions-1)*sessions - birth/(sessions-1).
// NOTE(review): the conditions that choose between these three assignments
// are missing from this listing. The formulas divide by `sessions` and
// `sessions - 1`, so the missing guards must exclude counts that would make
// those divisors zero — confirm.
755 void update_average_birth_time(const Session
&s
, bool added
=true) {
756 uint32_t sessions
= session_map
.size();
757 time birth_time
= s
.get_birth_time();
760 avg_birth_time
= added
? birth_time
: time::min();
765 avg_birth_time
= clock::time_point(
766 ((avg_birth_time
- time::min()) / sessions
) * (sessions
- 1) +
767 (birth_time
- time::min()) / sessions
);
769 avg_birth_time
= clock::time_point(
770 ((avg_birth_time
- time::min()) / (sessions
- 1)) * sessions
-
771 (birth_time
- time::min()) / (sessions
- 1));
// Declared here, defined out of line.
// NOTE(review): presumably records request load for `session`.
776 void hit_session(Session
*session
);
// Configuration-change hook: receives the config object and the set of option
// names that changed. Defined out of line.
777 void handle_conf_change(const struct md_config_t
*conf
,
778 const std::set
<std::string
> &changed
);
// Stream insertion for Session; declared here, defined out of line.
781 std::ostream
& operator<<(std::ostream
&out
, const Session
&s
);