]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/SessionMap.h
217b4ba1511ef78aa312086dc8ddb4e441f315af
[ceph.git] / ceph / src / mds / SessionMap.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
17
18 #include <set>
19 using std::set;
20
21 #include "include/unordered_map.h"
22
23 #include "include/Context.h"
24 #include "include/xlist.h"
25 #include "include/elist.h"
26 #include "include/interval_set.h"
27 #include "mdstypes.h"
28 #include "mds/MDSAuthCaps.h"
29 #include "common/perf_counters.h"
30
31 class CInode;
32 struct MDRequestImpl;
33 class DecayCounter;
34
35 #include "CInode.h"
36 #include "Capability.h"
37 #include "msg/Message.h"
38
// Indices used with the session map's PerfCounters logger.
// Values are consecutive, starting at l_mdssm_first (5500).
// NOTE(review): l_mdssm_first / l_mdssm_last look like range sentinels for
// the counter registration -- confirm in register_perfcounters().
enum {
  l_mdssm_first = 5500,

  l_mdssm_session_count,      // set to session_map.size() when a session is added
  l_mdssm_session_add,        // incremented each time a session is added
  l_mdssm_session_remove,
  l_mdssm_session_open,
  l_mdssm_session_stale,
  l_mdssm_total_load,
  l_mdssm_avg_load,
  l_mdssm_avg_session_uptime,

  l_mdssm_last,
};
51
52 /*
53 * session
54 */
55
56 class Session : public RefCountedObject {
57 // -- state etc --
58 public:
59 /*
60
61 <deleted> <-- closed <------------+
62 ^ | |
63 | v |
64 killing <-- opening <----+ |
65 ^ | | |
66 | v | |
67 stale <--> open --> closing ---+
68
69 + additional dimension of 'importing' (with counter)
70
71 */
72
73 using clock = ceph::coarse_mono_clock;
74 using time = ceph::coarse_mono_time;
75
76
77 enum {
78 STATE_CLOSED = 0,
79 STATE_OPENING = 1, // journaling open
80 STATE_OPEN = 2,
81 STATE_CLOSING = 3, // journaling close
82 STATE_STALE = 4,
83 STATE_KILLING = 5
84 };
85
86 const char *get_state_name(int s) const {
87 switch (s) {
88 case STATE_CLOSED: return "closed";
89 case STATE_OPENING: return "opening";
90 case STATE_OPEN: return "open";
91 case STATE_CLOSING: return "closing";
92 case STATE_STALE: return "stale";
93 case STATE_KILLING: return "killing";
94 default: return "???";
95 }
96 }
97
98 private:
99 int state;
100 uint64_t state_seq;
101 int importing_count;
102 friend class SessionMap;
103
104 // Human (friendly) name is soft state generated from client metadata
105 void _update_human_name();
106 std::string human_name;
107
108 // Versions in this session was projected: used to verify
109 // that appropriate mark_dirty calls follow.
110 std::deque<version_t> projected;
111
112 // request load average for this session
113 mutable DecayCounter load_avg;
114 DecayRate load_avg_rate;
115
116 // session start time -- used to track average session time
117 // note that this is initialized in the constructor rather
118 // than at the time of adding a session to the sessionmap
119 // as journal replay of sessionmap will not call add_session().
120 time birth_time;
121
122 public:
123
124 void push_pv(version_t pv)
125 {
126 assert(projected.empty() || projected.back() != pv);
127 projected.push_back(pv);
128 }
129
130 void pop_pv(version_t v)
131 {
132 assert(!projected.empty());
133 assert(projected.front() == v);
134 projected.pop_front();
135 }
136
137 int get_state() const { return state; }
138 void set_state(int new_state)
139 {
140 if (state != new_state) {
141 state = new_state;
142 state_seq++;
143 }
144 }
145 void decode(bufferlist::iterator &p);
146 void set_client_metadata(std::map<std::string, std::string> const &meta);
147 std::string get_human_name() const {return human_name;}
148
149 // Ephemeral state for tracking progress of capability recalls
150 time recalled_at = time::min(); // When was I asked to SESSION_RECALL?
151 time last_recall_sent = time::min();
152 uint32_t recall_count; // How many caps was I asked to SESSION_RECALL?
153 uint32_t recall_release_count; // How many caps have I actually revoked?
154
155 session_info_t info; ///< durable bits
156
157 MDSAuthCaps auth_caps;
158
159 ConnectionRef connection;
160 xlist<Session*>::item item_session_list;
161
162 list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
163
164 elist<MDRequestImpl*> requests;
165 size_t get_request_count();
166
167 interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
168
169 void notify_cap_release(size_t n_caps);
170 void notify_recall_sent(const size_t new_limit);
171 void clear_recalled_at();
172
173 inodeno_t next_ino() const {
174 if (info.prealloc_inos.empty())
175 return 0;
176 return info.prealloc_inos.range_start();
177 }
178 inodeno_t take_ino(inodeno_t ino = 0) {
179 assert(!info.prealloc_inos.empty());
180
181 if (ino) {
182 if (info.prealloc_inos.contains(ino))
183 info.prealloc_inos.erase(ino);
184 else
185 ino = 0;
186 }
187 if (!ino) {
188 ino = info.prealloc_inos.range_start();
189 info.prealloc_inos.erase(ino);
190 }
191 info.used_inos.insert(ino, 1);
192 return ino;
193 }
194 int get_num_projected_prealloc_inos() const {
195 return info.prealloc_inos.size() + pending_prealloc_inos.size();
196 }
197
198 client_t get_client() const {
199 return info.get_client();
200 }
201
202 const char *get_state_name() const { return get_state_name(state); }
203 uint64_t get_state_seq() const { return state_seq; }
204 bool is_closed() const { return state == STATE_CLOSED; }
205 bool is_opening() const { return state == STATE_OPENING; }
206 bool is_open() const { return state == STATE_OPEN; }
207 bool is_closing() const { return state == STATE_CLOSING; }
208 bool is_stale() const { return state == STATE_STALE; }
209 bool is_killing() const { return state == STATE_KILLING; }
210
211 void inc_importing() {
212 ++importing_count;
213 }
214 void dec_importing() {
215 assert(importing_count > 0);
216 --importing_count;
217 }
218 bool is_importing() const { return importing_count > 0; }
219
220 void set_load_avg_decay_rate(double rate) {
221 assert(is_open() || is_stale());
222 load_avg_rate.set_halflife(rate);
223 }
224 uint64_t get_load_avg() const {
225 return (uint64_t)load_avg.get(ceph_clock_now(), load_avg_rate);
226 }
227 void hit_session() {
228 load_avg.hit(ceph_clock_now(), load_avg_rate);
229 }
230
231 double get_session_uptime() const {
232 chrono::duration<double> uptime = clock::now() - birth_time;
233 return uptime.count();
234 }
235
236 time get_birth_time() const {
237 return birth_time;
238 }
239
240 // -- caps --
241 private:
242 version_t cap_push_seq; // cap push seq #
243 map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
244
245 public:
246 xlist<Capability*> caps; // inodes with caps; front=most recently used
247 xlist<ClientLease*> leases; // metadata leases to clients
248 time last_cap_renew = time::min();
249
250 public:
251 version_t inc_push_seq() { return ++cap_push_seq; }
252 version_t get_push_seq() const { return cap_push_seq; }
253
254 version_t wait_for_flush(MDSInternalContextBase* c) {
255 waitfor_flush[get_push_seq()].push_back(c);
256 return get_push_seq();
257 }
258 void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
259 while (!waitfor_flush.empty()) {
260 if (waitfor_flush.begin()->first > seq)
261 break;
262 ls.splice(ls.end(), waitfor_flush.begin()->second);
263 waitfor_flush.erase(waitfor_flush.begin());
264 }
265 }
266
267 void add_cap(Capability *cap) {
268 caps.push_back(&cap->item_session_caps);
269 }
270 void touch_lease(ClientLease *r) {
271 leases.push_back(&r->item_session_lease);
272 }
273
274 // -- leases --
275 uint32_t lease_seq;
276
277 // -- completed requests --
278 private:
279 // Has completed_requests been modified since the last time we
280 // wrote this session out?
281 bool completed_requests_dirty;
282
283 unsigned num_trim_flushes_warnings;
284 unsigned num_trim_requests_warnings;
285 public:
286 void add_completed_request(ceph_tid_t t, inodeno_t created) {
287 info.completed_requests[t] = created;
288 completed_requests_dirty = true;
289 }
290 bool trim_completed_requests(ceph_tid_t mintid) {
291 // trim
292 bool erased_any = false;
293 while (!info.completed_requests.empty() &&
294 (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
295 info.completed_requests.erase(info.completed_requests.begin());
296 erased_any = true;
297 }
298
299 if (erased_any) {
300 completed_requests_dirty = true;
301 }
302 return erased_any;
303 }
304 bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
305 map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
306 if (p == info.completed_requests.end())
307 return false;
308 if (pcreated)
309 *pcreated = p->second;
310 return true;
311 }
312
313 void add_completed_flush(ceph_tid_t tid) {
314 info.completed_flushes.insert(tid);
315 }
316 bool trim_completed_flushes(ceph_tid_t mintid) {
317 bool erased_any = false;
318 while (!info.completed_flushes.empty() &&
319 (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
320 info.completed_flushes.erase(info.completed_flushes.begin());
321 erased_any = true;
322 }
323 if (erased_any) {
324 completed_requests_dirty = true;
325 }
326 return erased_any;
327 }
328 bool have_completed_flush(ceph_tid_t tid) const {
329 return info.completed_flushes.count(tid);
330 }
331
332 unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
333 unsigned get_num_trim_flushes_warnings() const {
334 return num_trim_flushes_warnings;
335 }
336 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
337 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
338
339 unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
340 unsigned get_num_trim_requests_warnings() const {
341 return num_trim_requests_warnings;
342 }
343 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
344 void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
345
346 bool has_dirty_completed_requests() const
347 {
348 return completed_requests_dirty;
349 }
350
351 void clear_dirty_completed_requests()
352 {
353 completed_requests_dirty = false;
354 }
355
356 int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
357 const vector<uint64_t> *gid_list, int new_uid, int new_gid);
358
359
360 Session() :
361 state(STATE_CLOSED), state_seq(0), importing_count(0),
362 birth_time(clock::now()), recall_count(0),
363 recall_release_count(0), auth_caps(g_ceph_context),
364 connection(NULL), item_session_list(this),
365 requests(0), // member_offset passed to front() manually
366 cap_push_seq(0),
367 lease_seq(0),
368 completed_requests_dirty(false),
369 num_trim_flushes_warnings(0),
370 num_trim_requests_warnings(0) { }
371 ~Session() override {
372 if (state == STATE_CLOSED) {
373 item_session_list.remove_myself();
374 } else {
375 assert(!item_session_list.is_on_list());
376 }
377 while (!preopen_out_queue.empty()) {
378 preopen_out_queue.front()->put();
379 preopen_out_queue.pop_front();
380 }
381 }
382
383 void clear() {
384 pending_prealloc_inos.clear();
385 info.clear_meta();
386
387 cap_push_seq = 0;
388 last_cap_renew = time::min();
389 }
390 };
391
392 class SessionFilter
393 {
394 protected:
395 // First is whether to filter, second is filter value
396 std::pair<bool, bool> reconnecting;
397
398 public:
399 std::map<std::string, std::string> metadata;
400 std::string auth_name;
401 std::string state;
402 int64_t id;
403
404 SessionFilter()
405 : reconnecting(false, false), id(0)
406 {}
407
408 bool match(
409 const Session &session,
410 std::function<bool(client_t)> is_reconnecting) const;
411 int parse(const std::vector<std::string> &args, std::stringstream *ss);
412 void set_reconnecting(bool v)
413 {
414 reconnecting.first = true;
415 reconnecting.second = v;
416 }
417 };
418
419 /*
420 * session map
421 */
422
423 class MDSRank;
424
425 /**
426 * Encapsulate the serialized state associated with SessionMap. Allows
427 * encode/decode outside of live MDS instance.
428 */
429 class SessionMapStore {
430 public:
431 using clock = Session::clock;
432 using time = Session::time;
433
434 protected:
435 version_t version;
436 ceph::unordered_map<entity_name_t, Session*> session_map;
437 PerfCounters *logger;
438
439 // total request load avg
440 double decay_rate;
441 DecayCounter total_load_avg;
442 DecayRate total_load_avg_rate;
443
444 public:
445 mds_rank_t rank;
446
447 version_t get_version() const {return version;}
448
449 virtual void encode_header(bufferlist *header_bl);
450 virtual void decode_header(bufferlist &header_bl);
451 virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
452 virtual void decode_legacy(bufferlist::iterator& blp);
453 void dump(Formatter *f) const;
454
455 void set_rank(mds_rank_t r)
456 {
457 rank = r;
458 }
459
460 Session* get_or_add_session(const entity_inst_t& i) {
461 Session *s;
462 auto session_map_entry = session_map.find(i.name);
463 if (session_map_entry != session_map.end()) {
464 s = session_map_entry->second;
465 } else {
466 s = session_map[i.name] = new Session;
467 s->info.inst = i;
468 s->last_cap_renew = Session::clock::now();
469 if (logger) {
470 logger->set(l_mdssm_session_count, session_map.size());
471 logger->inc(l_mdssm_session_add);
472 }
473 }
474
475 return s;
476 }
477
478 static void generate_test_instances(list<SessionMapStore*>& ls);
479
480 void reset_state()
481 {
482 session_map.clear();
483 }
484
485 SessionMapStore()
486 : version(0), logger(nullptr),
487 decay_rate(g_conf->get_val<double>("mds_request_load_average_decay_rate")),
488 total_load_avg_rate(decay_rate), rank(MDS_RANK_NONE) {
489 }
490 virtual ~SessionMapStore() {};
491 };
492
493 class SessionMap : public SessionMapStore {
494 public:
495 MDSRank *mds;
496
497 protected:
498 version_t projected, committing, committed;
499 public:
500 map<int,xlist<Session*>* > by_state;
501 uint64_t set_state(Session *session, int state);
502 map<version_t, list<MDSInternalContextBase*> > commit_waiters;
503 void update_average_session_age();
504
505 explicit SessionMap(MDSRank *m) : mds(m),
506 projected(0), committing(0), committed(0),
507 loaded_legacy(false)
508 { }
509
510 ~SessionMap() override
511 {
512 for (auto p : by_state)
513 delete p.second;
514
515 if (logger) {
516 g_ceph_context->get_perfcounters_collection()->remove(logger);
517 }
518
519 delete logger;
520 }
521
522 void register_perfcounters();
523
524 void set_version(const version_t v)
525 {
526 version = projected = v;
527 }
528
529 void set_projected(const version_t v)
530 {
531 projected = v;
532 }
533
534 version_t get_projected() const
535 {
536 return projected;
537 }
538
539 version_t get_committed() const
540 {
541 return committed;
542 }
543
544 version_t get_committing() const
545 {
546 return committing;
547 }
548
549 // sessions
550 void decode_legacy(bufferlist::iterator& blp) override;
551 bool empty() const { return session_map.empty(); }
552 const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
553 {
554 return session_map;
555 }
556
557 bool is_any_state(int state) const {
558 map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
559 if (p == by_state.end() || p->second->empty())
560 return false;
561 return true;
562 }
563
564 bool have_unclosed_sessions() const {
565 return
566 is_any_state(Session::STATE_OPENING) ||
567 is_any_state(Session::STATE_OPEN) ||
568 is_any_state(Session::STATE_CLOSING) ||
569 is_any_state(Session::STATE_STALE) ||
570 is_any_state(Session::STATE_KILLING);
571 }
572 bool have_session(entity_name_t w) const {
573 return session_map.count(w);
574 }
575 Session* get_session(entity_name_t w) {
576 auto session_map_entry = session_map.find(w);
577 return (session_map_entry != session_map.end() ?
578 session_map_entry-> second : nullptr);
579 }
580 const Session* get_session(entity_name_t w) const {
581 ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
582 if (p == session_map.end()) {
583 return NULL;
584 } else {
585 return p->second;
586 }
587 }
588
589 void add_session(Session *s);
590 void remove_session(Session *s);
591 void touch_session(Session *session);
592
593 Session *get_oldest_session(int state) {
594 auto by_state_entry = by_state.find(state);
595 if (by_state_entry == by_state.end() || by_state_entry->second->empty())
596 return 0;
597 return by_state_entry->second->front();
598 }
599
600 void dump();
601
602 void get_client_session_set(set<Session*>& s) const {
603 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
604 p != session_map.end();
605 ++p)
606 if (p->second->info.inst.name.is_client())
607 s.insert(p->second);
608 }
609
610 void replay_open_sessions(map<client_t,entity_inst_t>& client_map) {
611 for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
612 p != client_map.end();
613 ++p) {
614 Session *s = get_or_add_session(p->second);
615 set_state(s, Session::STATE_OPEN);
616 replay_dirty_session(s);
617 }
618 }
619
620 // helpers
621 entity_inst_t& get_inst(entity_name_t w) {
622 assert(session_map.count(w));
623 return session_map[w]->info.inst;
624 }
625 version_t inc_push_seq(client_t client) {
626 return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
627 }
628 version_t get_push_seq(client_t client) {
629 return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
630 }
631 bool have_completed_request(metareqid_t rid) {
632 Session *session = get_session(rid.name);
633 return session && session->have_completed_request(rid.tid, NULL);
634 }
635 void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
636 Session *session = get_session(c);
637 assert(session);
638 session->trim_completed_requests(tid);
639 }
640
641 void wipe();
642 void wipe_ino_prealloc();
643
644 // -- loading, saving --
645 inodeno_t ino;
646 list<MDSInternalContextBase*> waiting_for_load;
647
648 object_t get_object_name() const;
649
650 void load(MDSInternalContextBase *onload);
651 void _load_finish(
652 int operation_r,
653 int header_r,
654 int values_r,
655 bool first,
656 bufferlist &header_bl,
657 std::map<std::string, bufferlist> &session_vals,
658 bool more_session_vals);
659
660 void load_legacy();
661 void _load_legacy_finish(int r, bufferlist &bl);
662
663 void save(MDSInternalContextBase *onsave, version_t needv=0);
664 void _save_finish(version_t v);
665
666 protected:
667 std::set<entity_name_t> dirty_sessions;
668 std::set<entity_name_t> null_sessions;
669 bool loaded_legacy;
670 void _mark_dirty(Session *session);
671 public:
672
673 /**
674 * Advance the version, and mark this session
675 * as dirty within the new version.
676 *
677 * Dirty means journalled but needing writeback
678 * to the backing store. Must have called
679 * mark_projected previously for this session.
680 */
681 void mark_dirty(Session *session);
682
683 /**
684 * Advance the projected version, and mark this
685 * session as projected within the new version
686 *
687 * Projected means the session is updated in memory
688 * but we're waiting for the journal write of the update
689 * to finish. Must subsequently call mark_dirty
690 * for sessions in the same global order as calls
691 * to mark_projected.
692 */
693 version_t mark_projected(Session *session);
694
695 /**
696 * During replay, advance versions to account
697 * for a session modification, and mark the
698 * session dirty.
699 */
700 void replay_dirty_session(Session *session);
701
702 /**
703 * During replay, if a session no longer present
704 * would have consumed a version, advance `version`
705 * and `projected` to account for that.
706 */
707 void replay_advance_version();
708
709 /**
710 * For these session IDs, if a session exists with this ID, and it has
711 * dirty completed_requests, then persist it immediately
712 * (ahead of usual project/dirty versioned writes
713 * of the map).
714 */
715 void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
716 MDSGatherBuilder *gather_bld);
717
718 private:
719 time avg_birth_time = time::min();
720
721 uint64_t get_session_count_in_state(int state) {
722 return !is_any_state(state) ? 0 : by_state[state]->size();
723 }
724
725 void update_average_birth_time(const Session &s, bool added=true) {
726 uint32_t sessions = session_map.size();
727 time birth_time = s.get_birth_time();
728
729 if (sessions == 1) {
730 avg_birth_time = added ? birth_time : time::min();
731 return;
732 }
733
734 if (added) {
735 avg_birth_time = clock::time_point(
736 ((avg_birth_time - time::min()) / sessions) * (sessions - 1) +
737 (birth_time - time::min()) / sessions);
738 } else {
739 avg_birth_time = clock::time_point(
740 ((avg_birth_time - time::min()) / (sessions - 1)) * sessions -
741 (birth_time - time::min()) / (sessions - 1));
742 }
743 }
744
745 public:
746 void hit_session(Session *session);
747 void handle_conf_change(const struct md_config_t *conf,
748 const std::set <std::string> &changed);
749 };
750
751 std::ostream& operator<<(std::ostream &out, const Session &s);
752
753
754 #endif