1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
21 #include "include/unordered_map.h"
23 #include "include/Context.h"
24 #include "include/xlist.h"
25 #include "include/elist.h"
26 #include "include/interval_set.h"
28 #include "mds/MDSAuthCaps.h"
29 #include "common/perf_counters.h"
35 #include "Capability.h"
36 #include "msg/Message.h"
40 l_mdssm_session_count
,
42 l_mdssm_session_remove
,
50 class Session
: public RefCountedObject
{
55 <deleted> <-- closed <------------+
58 killing <-- opening <----+ |
61 stale <--> open --> closing ---+
63 + additional dimension of 'importing' (with counter)
68 STATE_OPENING
= 1, // journaling open
70 STATE_CLOSING
= 3, // journaling close
75 const char *get_state_name(int s
) const {
77 case STATE_CLOSED
: return "closed";
78 case STATE_OPENING
: return "opening";
79 case STATE_OPEN
: return "open";
80 case STATE_CLOSING
: return "closing";
81 case STATE_STALE
: return "stale";
82 case STATE_KILLING
: return "killing";
83 default: return "???";
91 friend class SessionMap
;
93 // Human (friendly) name is soft state generated from client metadata
94 void _update_human_name();
95 std::string human_name
;
97 // Versions in which this session was projected: used to verify
98 // that appropriate mark_dirty calls follow.
99 std::deque
<version_t
> projected
;
// Record pv as the newest entry of the `projected` queue. Asserts that pv
// differs from the most recently recorded entry (if there is one).
105 void push_pv(version_t pv
)
107 assert(projected
.empty() || projected
.back() != pv
);
108 projected
.push_back(pv
);
// Remove the oldest entry of the `projected` queue. Asserts that the queue
// is non-empty and that its front entry equals v.
111 void pop_pv(version_t v
)
113 assert(!projected
.empty());
114 assert(projected
.front() == v
);
115 projected
.pop_front();
118 int get_state() const { return state
; }
119 void set_state(int new_state
)
121 if (state
!= new_state
) {
126 void decode(bufferlist::iterator
&p
);
127 void set_client_metadata(std::map
<std::string
, std::string
> const &meta
);
128 std::string
get_human_name() const {return human_name
;}
130 // Ephemeral state for tracking progress of capability recalls
131 utime_t recalled_at
; // When was I asked to SESSION_RECALL?
132 utime_t last_recall_sent
;
133 uint32_t recall_count
; // How many caps was I asked to SESSION_RECALL?
134 uint32_t recall_release_count
; // How many caps have I actually revoked?
136 session_info_t info
; ///< durable bits
138 MDSAuthCaps auth_caps
;
140 ConnectionRef connection
;
141 xlist
<Session
*>::item item_session_list
;
143 list
<Message
*> preopen_out_queue
; ///< messages for client, queued before they connect
145 elist
<MDRequestImpl
*> requests
;
146 size_t get_request_count();
148 interval_set
<inodeno_t
> pending_prealloc_inos
; // journaling prealloc, will be added to prealloc_inos
150 void notify_cap_release(size_t n_caps
);
151 void notify_recall_sent(const size_t new_limit
);
152 void clear_recalled_at();
154 inodeno_t
next_ino() const {
155 if (info
.prealloc_inos
.empty())
157 return info
.prealloc_inos
.range_start();
159 inodeno_t
take_ino(inodeno_t ino
= 0) {
160 assert(!info
.prealloc_inos
.empty());
163 if (info
.prealloc_inos
.contains(ino
))
164 info
.prealloc_inos
.erase(ino
);
169 ino
= info
.prealloc_inos
.range_start();
170 info
.prealloc_inos
.erase(ino
);
172 info
.used_inos
.insert(ino
, 1);
// Sum of the sizes of info.prealloc_inos and pending_prealloc_inos.
175 int get_num_projected_prealloc_inos() const {
176 return info
.prealloc_inos
.size() + pending_prealloc_inos
.size();
// Client id of this session, as reported by info.get_client().
179 client_t
get_client() const {
180 return info
.get_client();
183 const char *get_state_name() const { return get_state_name(state
); }
184 uint64_t get_state_seq() const { return state_seq
; }
185 bool is_closed() const { return state
== STATE_CLOSED
; }
186 bool is_opening() const { return state
== STATE_OPENING
; }
187 bool is_open() const { return state
== STATE_OPEN
; }
188 bool is_closing() const { return state
== STATE_CLOSING
; }
189 bool is_stale() const { return state
== STATE_STALE
; }
190 bool is_killing() const { return state
== STATE_KILLING
; }
192 void inc_importing() {
195 void dec_importing() {
196 assert(importing_count
> 0);
199 bool is_importing() const { return importing_count
> 0; }
203 version_t cap_push_seq
; // cap push seq #
204 map
<version_t
, list
<MDSInternalContextBase
*> > waitfor_flush
; // flush session messages
207 xlist
<Capability
*> caps
; // inodes with caps; front=most recently used
208 xlist
<ClientLease
*> leases
; // metadata leases to clients
209 utime_t last_cap_renew
;
212 version_t
inc_push_seq() { return ++cap_push_seq
; }
213 version_t
get_push_seq() const { return cap_push_seq
; }
// Queue context c in waitfor_flush under the current cap push sequence
// number, and return that sequence number.
215 version_t
wait_for_flush(MDSInternalContextBase
* c
) {
216 waitfor_flush
[get_push_seq()].push_back(c
);
217 return get_push_seq();
219 void finish_flush(version_t seq
, list
<MDSInternalContextBase
*>& ls
) {
220 while (!waitfor_flush
.empty()) {
221 if (waitfor_flush
.begin()->first
> seq
)
223 ls
.splice(ls
.end(), waitfor_flush
.begin()->second
);
224 waitfor_flush
.erase(waitfor_flush
.begin());
// Append cap's item_session_caps link to the back of this session's caps list.
228 void add_cap(Capability
*cap
) {
229 caps
.push_back(&cap
->item_session_caps
);
// Append r's item_session_lease link to the back of this session's leases list.
231 void touch_lease(ClientLease
*r
) {
232 leases
.push_back(&r
->item_session_lease
);
238 // -- completed requests --
240 // Has completed_requests been modified since the last time we
241 // wrote this session out?
242 bool completed_requests_dirty
;
244 unsigned num_trim_flushes_warnings
;
245 unsigned num_trim_requests_warnings
;
// Store `created` under tid t in info.completed_requests (overwriting any
// existing entry for t) and set the completed_requests_dirty flag.
247 void add_completed_request(ceph_tid_t t
, inodeno_t created
) {
248 info
.completed_requests
[t
] = created
;
249 completed_requests_dirty
= true;
251 bool trim_completed_requests(ceph_tid_t mintid
) {
253 bool erased_any
= false;
254 while (!info
.completed_requests
.empty() &&
255 (mintid
== 0 || info
.completed_requests
.begin()->first
< mintid
)) {
256 info
.completed_requests
.erase(info
.completed_requests
.begin());
261 completed_requests_dirty
= true;
265 bool have_completed_request(ceph_tid_t tid
, inodeno_t
*pcreated
) const {
266 map
<ceph_tid_t
,inodeno_t
>::const_iterator p
= info
.completed_requests
.find(tid
);
267 if (p
== info
.completed_requests
.end())
270 *pcreated
= p
->second
;
// Insert tid into info.completed_flushes.
274 void add_completed_flush(ceph_tid_t tid
) {
275 info
.completed_flushes
.insert(tid
);
277 bool trim_completed_flushes(ceph_tid_t mintid
) {
278 bool erased_any
= false;
279 while (!info
.completed_flushes
.empty() &&
280 (mintid
== 0 || *info
.completed_flushes
.begin() < mintid
)) {
281 info
.completed_flushes
.erase(info
.completed_flushes
.begin());
285 completed_requests_dirty
= true;
// True if tid is present in info.completed_flushes.
289 bool have_completed_flush(ceph_tid_t tid
) const {
290 return info
.completed_flushes
.count(tid
);
293 unsigned get_num_completed_flushes() const { return info
.completed_flushes
.size(); }
294 unsigned get_num_trim_flushes_warnings() const {
295 return num_trim_flushes_warnings
;
297 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings
; }
298 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings
= 0; }
300 unsigned get_num_completed_requests() const { return info
.completed_requests
.size(); }
301 unsigned get_num_trim_requests_warnings() const {
302 return num_trim_requests_warnings
;
304 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings
; }
305 void reset_num_trim_requests_warnings() { num_trim_requests_warnings
= 0; }
307 bool has_dirty_completed_requests() const
309 return completed_requests_dirty
;
312 void clear_dirty_completed_requests()
314 completed_requests_dirty
= false;
317 int check_access(CInode
*in
, unsigned mask
, int caller_uid
, int caller_gid
,
318 const vector
<uint64_t> *gid_list
, int new_uid
, int new_gid
);
322 state(STATE_CLOSED
), state_seq(0), importing_count(0),
323 recall_count(0), recall_release_count(0),
324 auth_caps(g_ceph_context
),
325 connection(NULL
), item_session_list(this),
326 requests(0), // member_offset passed to front() manually
329 completed_requests_dirty(false),
330 num_trim_flushes_warnings(0),
331 num_trim_requests_warnings(0) { }
332 ~Session() override
{
333 assert(!item_session_list
.is_on_list());
334 while (!preopen_out_queue
.empty()) {
335 preopen_out_queue
.front()->put();
336 preopen_out_queue
.pop_front();
341 pending_prealloc_inos
.clear();
345 last_cap_renew
= utime_t();
353 // First is whether to filter, second is filter value
354 std::pair
<bool, bool> reconnecting
;
357 std::map
<std::string
, std::string
> metadata
;
358 std::string auth_name
;
363 : reconnecting(false, false), id(0)
367 const Session
&session
,
368 std::function
<bool(client_t
)> is_reconnecting
) const;
369 int parse(const std::vector
<std::string
> &args
, std::stringstream
*ss
);
370 void set_reconnecting(bool v
)
372 reconnecting
.first
= true;
373 reconnecting
.second
= v
;
384 * Encapsulate the serialized state associated with SessionMap. Allows
385 * encode/decode outside of live MDS instance.
387 class SessionMapStore
{
390 ceph::unordered_map
<entity_name_t
, Session
*> session_map
;
391 PerfCounters
*logger
;
395 version_t
get_version() const {return version
;}
397 virtual void encode_header(bufferlist
*header_bl
);
398 virtual void decode_header(bufferlist
&header_bl
);
399 virtual void decode_values(std::map
<std::string
, bufferlist
> &session_vals
);
400 virtual void decode_legacy(bufferlist::iterator
& blp
);
401 void dump(Formatter
*f
) const;
403 void set_rank(mds_rank_t r
)
408 Session
* get_or_add_session(const entity_inst_t
& i
) {
410 auto session_map_entry
= session_map
.find(i
.name
);
411 if (session_map_entry
!= session_map
.end()) {
412 s
= session_map_entry
->second
;
414 s
= session_map
[i
.name
] = new Session
;
416 s
->last_cap_renew
= ceph_clock_now();
418 logger
->set(l_mdssm_session_count
, session_map
.size());
419 logger
->inc(l_mdssm_session_add
);
426 static void generate_test_instances(list
<SessionMapStore
*>& ls
);
433 SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE
) {}
434 virtual ~SessionMapStore() {};
437 class SessionMap
: public SessionMapStore
{
442 version_t projected
, committing
, committed
;
444 map
<int,xlist
<Session
*>* > by_state
;
445 uint64_t set_state(Session
*session
, int state
);
446 map
<version_t
, list
<MDSInternalContextBase
*> > commit_waiters
;
448 explicit SessionMap(MDSRank
*m
) : mds(m
),
449 projected(0), committing(0), committed(0),
453 ~SessionMap() override
455 for (auto p
: by_state
)
459 g_ceph_context
->get_perfcounters_collection()->remove(logger
);
465 void register_perfcounters();
467 void set_version(const version_t v
)
469 version
= projected
= v
;
472 void set_projected(const version_t v
)
477 version_t
get_projected() const
482 version_t
get_committed() const
487 version_t
get_committing() const
493 void decode_legacy(bufferlist::iterator
& blp
) override
;
494 bool empty() const { return session_map
.empty(); }
495 const ceph::unordered_map
<entity_name_t
, Session
*> &get_sessions() const
500 bool is_any_state(int state
) const {
501 map
<int,xlist
<Session
*>* >::const_iterator p
= by_state
.find(state
);
502 if (p
== by_state
.end() || p
->second
->empty())
507 bool have_unclosed_sessions() const {
509 is_any_state(Session::STATE_OPENING
) ||
510 is_any_state(Session::STATE_OPEN
) ||
511 is_any_state(Session::STATE_CLOSING
) ||
512 is_any_state(Session::STATE_STALE
) ||
513 is_any_state(Session::STATE_KILLING
);
// True if session_map contains an entry keyed by w.
515 bool have_session(entity_name_t w
) const {
516 return session_map
.count(w
);
// Look up the session keyed by w in session_map.
// Returns the stored Session pointer, or nullptr when no entry exists.
518 Session
* get_session(entity_name_t w
) {
519 auto session_map_entry
= session_map
.find(w
);
520 return (session_map_entry
!= session_map
.end() ?
521 session_map_entry
-> second
: nullptr);
523 const Session
* get_session(entity_name_t w
) const {
524 ceph::unordered_map
<entity_name_t
, Session
*>::const_iterator p
= session_map
.find(w
);
525 if (p
== session_map
.end()) {
532 void add_session(Session
*s
);
533 void remove_session(Session
*s
);
534 void touch_session(Session
*session
);
536 Session
*get_oldest_session(int state
) {
537 auto by_state_entry
= by_state
.find(state
);
538 if (by_state_entry
== by_state
.end() || by_state_entry
->second
->empty())
540 return by_state_entry
->second
->front();
545 void get_client_set(set
<client_t
>& s
) {
546 for (ceph::unordered_map
<entity_name_t
,Session
*>::iterator p
= session_map
.begin();
547 p
!= session_map
.end();
549 if (p
->second
->info
.inst
.name
.is_client())
550 s
.insert(p
->second
->info
.inst
.name
.num());
552 void get_client_session_set(set
<Session
*>& s
) const {
553 for (ceph::unordered_map
<entity_name_t
,Session
*>::const_iterator p
= session_map
.begin();
554 p
!= session_map
.end();
556 if (p
->second
->info
.inst
.name
.is_client())
560 void open_sessions(map
<client_t
,entity_inst_t
>& client_map
) {
561 for (map
<client_t
,entity_inst_t
>::iterator p
= client_map
.begin();
562 p
!= client_map
.end();
564 Session
*s
= get_or_add_session(p
->second
);
565 set_state(s
, Session::STATE_OPEN
);
// Return a mutable reference to info.inst of the session keyed by w.
// Asserts that such a session is present in session_map.
571 entity_inst_t
& get_inst(entity_name_t w
) {
572 assert(session_map
.count(w
));
573 return session_map
[w
]->info
.inst
;
// Increment and return the cap push sequence number of the given client's
// session.
// NOTE(review): get_session() yields nullptr for an unknown name and the
// result is dereferenced here without a check -- callers presumably
// guarantee the session exists; confirm.
575 version_t
inc_push_seq(client_t client
) {
576 return get_session(entity_name_t::CLIENT(client
.v
))->inc_push_seq();
// Return the cap push sequence number of the given client's session.
// NOTE(review): get_session() yields nullptr for an unknown name and the
// result is dereferenced here without a check -- callers presumably
// guarantee the session exists; confirm.
578 version_t
get_push_seq(client_t client
) {
579 return get_session(entity_name_t::CLIENT(client
.v
))->get_push_seq();
// True if a session exists for rid.name and that session reports rid.tid
// as a completed request. The session-level out-parameter is passed as NULL.
581 bool have_completed_request(metareqid_t rid
) {
582 Session
*session
= get_session(rid
.name
);
583 return session
&& session
->have_completed_request(rid
.tid
, NULL
);
585 void trim_completed_requests(entity_name_t c
, ceph_tid_t tid
) {
586 Session
*session
= get_session(c
);
588 session
->trim_completed_requests(tid
);
592 void wipe_ino_prealloc();
594 // -- loading, saving --
596 list
<MDSInternalContextBase
*> waiting_for_load
;
598 object_t
get_object_name() const;
600 void load(MDSInternalContextBase
*onload
);
606 bufferlist
&header_bl
,
607 std::map
<std::string
, bufferlist
> &session_vals
,
608 bool more_session_vals
);
611 void _load_legacy_finish(int r
, bufferlist
&bl
);
613 void save(MDSInternalContextBase
*onsave
, version_t needv
=0);
614 void _save_finish(version_t v
);
617 std::set
<entity_name_t
> dirty_sessions
;
618 std::set
<entity_name_t
> null_sessions
;
620 void _mark_dirty(Session
*session
);
624 * Advance the version, and mark this session
625 * as dirty within the new version.
627 * Dirty means journalled but needing writeback
628 * to the backing store. Must have called
629 * mark_projected previously for this session.
631 void mark_dirty(Session
*session
);
634 * Advance the projected version, and mark this
635 * session as projected within the new version
637 * Projected means the session is updated in memory
638 * but we're waiting for the journal write of the update
639 * to finish. Must subsequently call mark_dirty
640 * for sessions in the same global order as calls
643 version_t
mark_projected(Session
*session
);
646 * During replay, advance versions to account
647 * for a session modification, and mark the
650 void replay_dirty_session(Session
*session
);
653 * During replay, if a session no longer present
654 * would have consumed a version, advance `version`
655 * and `projected` to account for that.
657 void replay_advance_version();
660 * For these session IDs, if a session exists with this ID, and it has
661 * dirty completed_requests, then persist it immediately
662 * (ahead of usual project/dirty versioned writes
665 void save_if_dirty(const std::set
<entity_name_t
> &tgt_sessions
,
666 MDSGatherBuilder
*gather_bld
);
669 std::ostream
& operator<<(std::ostream
&out
, const Session
&s
);