// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#ifndef CEPH_MDS_SESSIONMAP_H
#define CEPH_MDS_SESSIONMAP_H

#include <set>
using std::set;

#include "include/unordered_map.h"

#include "include/Context.h"
#include "include/xlist.h"
#include "include/elist.h"
#include "include/interval_set.h"
#include "mdstypes.h"
#include "mds/MDSAuthCaps.h"
#include "common/perf_counters.h"
#include "common/DecayCounter.h"

class CInode;
struct MDRequestImpl;

#include "CInode.h"
#include "Capability.h"
#include "msg/Message.h"

enum {
  l_mdssm_first = 5500,
  l_mdssm_session_count,
  l_mdssm_session_add,
  l_mdssm_session_remove,
  l_mdssm_session_open,
  l_mdssm_session_stale,
  l_mdssm_total_load,
  l_mdssm_avg_load,
  l_mdssm_avg_session_uptime,
  l_mdssm_last,
};

/*
 * session
 */

class Session : public RefCountedObject {
  // -- state etc --
public:
  /*

     <deleted> <-- closed <------------+
          ^         |                  |
          |         v                  |
       killing <-- opening <----+      |
          ^         |           |      |
          |         v           |      |
        stale <--> open --> closing ---+

     + additional dimension of 'importing' (with counter)

  */

  using clock = ceph::coarse_mono_clock;
  using time = ceph::coarse_mono_time;


  enum {
    STATE_CLOSED = 0,
    STATE_OPENING = 1,   // journaling open
    STATE_OPEN = 2,
    STATE_CLOSING = 3,   // journaling close
    STATE_STALE = 4,
    STATE_KILLING = 5
  };

  const char *get_state_name(int s) const {
    switch (s) {
    case STATE_CLOSED: return "closed";
    case STATE_OPENING: return "opening";
    case STATE_OPEN: return "open";
    case STATE_CLOSING: return "closing";
    case STATE_STALE: return "stale";
    case STATE_KILLING: return "killing";
    default: return "???";
    }
  }

private:
  int state = STATE_CLOSED;
  uint64_t state_seq = 0;
  int importing_count = 0;
  friend class SessionMap;

  // Human (friendly) name is soft state generated from client metadata
  void _update_human_name();
  std::string human_name;

  // Versions at which this session was projected: used to verify
  // that appropriate mark_dirty calls follow.
  std::deque<version_t> projected;

  // request load average for this session
  mutable DecayCounter load_avg;
  DecayRate load_avg_rate;

  // Ephemeral state for tracking progress of capability recalls
  // caps being recalled recently by this session; used for Beacon warnings
  mutable DecayCounter recall_caps;
  // caps that have been released
  mutable DecayCounter release_caps;
  // throttle on caps recalled
  mutable DecayCounter recall_caps_throttle;
  // New limit in SESSION_RECALL
  uint32_t recall_limit = 0;

  // session start time -- used to track average session time
  // note that this is initialized in the constructor rather
  // than at the time of adding a session to the sessionmap
  // as journal replay of sessionmap will not call add_session().
  time birth_time;

public:

  void push_pv(version_t pv)
  {
    assert(projected.empty() || projected.back() != pv);
    projected.push_back(pv);
  }

  void pop_pv(version_t v)
  {
    assert(!projected.empty());
    assert(projected.front() == v);
    projected.pop_front();
  }

  int get_state() const { return state; }
  void set_state(int new_state)
  {
    if (state != new_state) {
      state = new_state;
      state_seq++;
    }
  }
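
  // Illustrative sketch (not part of the original header): every effective
  // state change bumps state_seq, so callers can detect that the session
  // moved on while they were waiting. `s` is an assumed Session pointer.
  //
  //   uint64_t seq = s->get_state_seq();
  //   s->set_state(Session::STATE_STALE);   // state changed -> seq bumped
  //   assert(s->get_state_seq() == seq + 1);
  //   s->set_state(Session::STATE_STALE);   // no-op -> seq unchanged
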
  void decode(bufferlist::iterator &p);
  template<typename T>
  void set_client_metadata(T&& meta)
  {
    info.client_metadata = std::forward<T>(meta);
    _update_human_name();
  }
  std::string get_human_name() const {return human_name;}

  session_info_t info;  ///< durable bits

  MDSAuthCaps auth_caps;

  ConnectionRef connection;
  xlist<Session*>::item item_session_list;

  list<Message*> preopen_out_queue;  ///< messages for client, queued before they connect

  elist<MDRequestImpl*> requests;
  size_t get_request_count();

  interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos

  void notify_cap_release(size_t n_caps);
  uint64_t notify_recall_sent(size_t new_limit);
  double get_recall_caps_throttle() const {
    return recall_caps_throttle.get(ceph_clock_now());
  }
  double get_recall_caps() const {
    return recall_caps.get(ceph_clock_now());
  }
  double get_release_caps() const {
    return release_caps.get(ceph_clock_now());
  }
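
  // Illustrative sketch (not in the original header): the MDS feeds these
  // decay counters when it asks the client to drop caps and when the client
  // actually releases them, then compares the two to warn about clients that
  // are not keeping up. `released` and `recall_warning_threshold` are assumed
  // names, not part of this class.
  //
  //   session->notify_recall_sent(new_limit);   // bumps recall_caps & throttle
  //   session->notify_cap_release(released);    // bumps release_caps
  //   if (session->get_recall_caps() - session->get_release_caps() >
  //       recall_warning_threshold)
  //     ;  // e.g. raise a health warning for this session
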

  inodeno_t next_ino() const {
    if (info.prealloc_inos.empty())
      return 0;
    return info.prealloc_inos.range_start();
  }
  inodeno_t take_ino(inodeno_t ino = 0) {
    assert(!info.prealloc_inos.empty());

    if (ino) {
      if (info.prealloc_inos.contains(ino))
        info.prealloc_inos.erase(ino);
      else
        ino = 0;
    }
    if (!ino) {
      ino = info.prealloc_inos.range_start();
      info.prealloc_inos.erase(ino);
    }
    info.used_inos.insert(ino, 1);
    return ino;
  }
  int get_num_projected_prealloc_inos() const {
    return info.prealloc_inos.size() + pending_prealloc_inos.size();
  }
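
  // Illustrative sketch (not in the original header): allocating an inode
  // number for a new file from this session's preallocated range. Passing a
  // specific ino (e.g. one seen during replay) takes that number if it is
  // still preallocated, otherwise the front of the range is used.
  //
  //   if (session->next_ino()) {                // something is preallocated
  //     inodeno_t ino = session->take_ino();    // consume front of the range
  //     ...                                     // use it for the new inode
  //   }
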

  client_t get_client() const {
    return info.get_client();
  }

  const char *get_state_name() const { return get_state_name(state); }
  uint64_t get_state_seq() const { return state_seq; }
  bool is_closed() const { return state == STATE_CLOSED; }
  bool is_opening() const { return state == STATE_OPENING; }
  bool is_open() const { return state == STATE_OPEN; }
  bool is_closing() const { return state == STATE_CLOSING; }
  bool is_stale() const { return state == STATE_STALE; }
  bool is_killing() const { return state == STATE_KILLING; }

  void inc_importing() {
    ++importing_count;
  }
  void dec_importing() {
    assert(importing_count > 0);
    --importing_count;
  }
  bool is_importing() const { return importing_count > 0; }

  void set_load_avg_decay_rate(double rate) {
    assert(is_open() || is_stale());
    load_avg_rate.set_halflife(rate);
  }
  uint64_t get_load_avg() const {
    return (uint64_t)load_avg.get(ceph_clock_now(), load_avg_rate);
  }
  void hit_session() {
    load_avg.hit(ceph_clock_now(), load_avg_rate);
  }
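
  // Illustrative sketch (not in the original header): each client request
  // handled for this session "hits" the decaying load counter, and the
  // balancer/perf code later reads the decayed average back.
  //
  //   session->hit_session();                     // on every request
  //   uint64_t load = session->get_load_avg();    // decayed request load
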

  double get_session_uptime() const {
    chrono::duration<double> uptime = clock::now() - birth_time;
    return uptime.count();
  }

  time get_birth_time() const {
    return birth_time;
  }

  // -- caps --
private:
  uint32_t cap_gen = 0;
  version_t cap_push_seq = 0;  // cap push seq #
  map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages

public:
  xlist<Capability*> caps;     // inodes with caps; front=most recently used
  xlist<ClientLease*> leases;  // metadata leases to clients
  time last_cap_renew = time::min();
  time last_seen = time::min();

  void inc_cap_gen() { ++cap_gen; }
  uint32_t get_cap_gen() const { return cap_gen; }

  version_t inc_push_seq() { return ++cap_push_seq; }
  version_t get_push_seq() const { return cap_push_seq; }

  version_t wait_for_flush(MDSInternalContextBase* c) {
    waitfor_flush[get_push_seq()].push_back(c);
    return get_push_seq();
  }
  void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
    while (!waitfor_flush.empty()) {
      if (waitfor_flush.begin()->first > seq)
        break;
      ls.splice(ls.end(), waitfor_flush.begin()->second);
      waitfor_flush.erase(waitfor_flush.begin());
    }
  }
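
  // Illustrative sketch (not in the original header): a caller that needs to
  // know when caps pushed up to the current seq have been flushed registers a
  // waiter, and the flush path later collects every waiter at or below the
  // flushed seq. `onflush` and `finished` are assumed names.
  //
  //   version_t seq = session->wait_for_flush(onflush);  // queue at current push seq
  //   ...
  //   list<MDSInternalContextBase*> finished;
  //   session->finish_flush(seq, finished);              // gathers waiters <= seq
  //   finish_contexts(g_ceph_context, finished, 0);      // assumed completion helper
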

  void touch_cap(Capability *cap) {
    caps.push_front(&cap->item_session_caps);
  }
  void touch_cap_bottom(Capability *cap) {
    caps.push_back(&cap->item_session_caps);
  }
  void touch_lease(ClientLease *r) {
    leases.push_back(&r->item_session_lease);
  }

  // -- leases --
  uint32_t lease_seq = 0;

  // -- completed requests --
private:
  // Has completed_requests been modified since the last time we
  // wrote this session out?
  bool completed_requests_dirty = false;

  unsigned num_trim_flushes_warnings = 0;
  unsigned num_trim_requests_warnings = 0;
public:
  void add_completed_request(ceph_tid_t t, inodeno_t created) {
    info.completed_requests[t] = created;
    completed_requests_dirty = true;
  }
  bool trim_completed_requests(ceph_tid_t mintid) {
    // trim
    bool erased_any = false;
    while (!info.completed_requests.empty() &&
           (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
      info.completed_requests.erase(info.completed_requests.begin());
      erased_any = true;
    }

    if (erased_any) {
      completed_requests_dirty = true;
    }
    return erased_any;
  }
  bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
    map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
    if (p == info.completed_requests.end())
      return false;
    if (pcreated)
      *pcreated = p->second;
    return true;
  }
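
  // Illustrative sketch (not in the original header): completed_requests lets
  // the MDS answer a retried client request without re-executing it; for a
  // replayed create, the previously created ino is handed back. `req_tid` and
  // `reply_with_ino` are assumed names.
  //
  //   inodeno_t created;
  //   if (session->have_completed_request(req_tid, &created)) {
  //     reply_with_ino(created);                        // duplicate, reuse result
  //   } else {
  //     ...                                             // execute the request
  //     session->add_completed_request(req_tid, created);
  //   }
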

  void add_completed_flush(ceph_tid_t tid) {
    info.completed_flushes.insert(tid);
  }
  bool trim_completed_flushes(ceph_tid_t mintid) {
    bool erased_any = false;
    while (!info.completed_flushes.empty() &&
           (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
      info.completed_flushes.erase(info.completed_flushes.begin());
      erased_any = true;
    }
    if (erased_any) {
      completed_requests_dirty = true;
    }
    return erased_any;
  }
  bool have_completed_flush(ceph_tid_t tid) const {
    return info.completed_flushes.count(tid);
  }

  unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
  unsigned get_num_trim_flushes_warnings() const {
    return num_trim_flushes_warnings;
  }
  void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
  void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }

  unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
  unsigned get_num_trim_requests_warnings() const {
    return num_trim_requests_warnings;
  }
  void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
  void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }

  bool has_dirty_completed_requests() const
  {
    return completed_requests_dirty;
  }

  void clear_dirty_completed_requests()
  {
    completed_requests_dirty = false;
  }

  int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
                   const vector<uint64_t> *gid_list, int new_uid, int new_gid);

  Session() = delete;
  Session(ConnectionRef con) :
    recall_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
    release_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
    recall_caps_throttle(ceph_clock_now(), g_conf->get_val<double>("mds_recall_max_decay_rate")),
    birth_time(clock::now()),
    auth_caps(g_ceph_context),
    item_session_list(this),
    requests(0)  // member_offset passed to front() manually
  {
    connection = std::move(con);
  }
  ~Session() override {
    if (state == STATE_CLOSED) {
      item_session_list.remove_myself();
    } else {
      assert(!item_session_list.is_on_list());
    }
    while (!preopen_out_queue.empty()) {
      preopen_out_queue.front()->put();
      preopen_out_queue.pop_front();
    }
  }

  void clear() {
    pending_prealloc_inos.clear();
    info.clear_meta();

    cap_push_seq = 0;
    last_cap_renew = time::min();
  }
};

class SessionFilter
{
protected:
  // First is whether to filter, second is filter value
  std::pair<bool, bool> reconnecting;

public:
  std::map<std::string, std::string> metadata;
  std::string auth_name;
  std::string state;
  int64_t id;

  SessionFilter()
    : reconnecting(false, false), id(0)
  {}

  bool match(
      const Session &session,
      std::function<bool(client_t)> is_reconnecting) const;
  int parse(const std::vector<std::string> &args, std::stringstream *ss);
  void set_reconnecting(bool v)
  {
    reconnecting.first = true;
    reconnecting.second = v;
  }
};
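
// Illustrative sketch (not in the original header): a SessionFilter is built
// from admin-command arguments and then applied to candidate sessions.
// `args`, `ss`, `sessionmap` and `is_reconnecting` are assumed names supplied
// by the caller.
//
//   SessionFilter filter;
//   std::stringstream ss;
//   if (filter.parse(args, &ss) < 0)
//     ...                                    // report the parse error from ss
//   for (const auto &p : sessionmap.get_sessions()) {
//     if (filter.match(*p.second, is_reconnecting))
//       ...                                  // session passes the filter
//   }
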

/*
 * session map
 */

class MDSRank;

/**
 * Encapsulates the serialized state associated with SessionMap. Allows
 * encode/decode outside of a live MDS instance.
 */
class SessionMapStore {
public:
  using clock = Session::clock;
  using time = Session::time;

protected:
  version_t version;
  ceph::unordered_map<entity_name_t, Session*> session_map;
  PerfCounters *logger;

  // total request load avg
  double decay_rate;
  DecayCounter total_load_avg;
  DecayRate total_load_avg_rate;

public:
  mds_rank_t rank;

  version_t get_version() const {return version;}

  virtual void encode_header(bufferlist *header_bl);
  virtual void decode_header(bufferlist &header_bl);
  virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
  virtual void decode_legacy(bufferlist::iterator& blp);
  void dump(Formatter *f) const;

  void set_rank(mds_rank_t r)
  {
    rank = r;
  }

  Session* get_or_add_session(const entity_inst_t& i) {
    Session *s;
    auto session_map_entry = session_map.find(i.name);
    if (session_map_entry != session_map.end()) {
      s = session_map_entry->second;
    } else {
      s = session_map[i.name] = new Session(ConnectionRef());
      s->info.inst = i;
      s->last_cap_renew = Session::clock::now();
      if (logger) {
        logger->set(l_mdssm_session_count, session_map.size());
        logger->inc(l_mdssm_session_add);
      }
    }

    return s;
  }

  static void generate_test_instances(list<SessionMapStore*>& ls);

  void reset_state()
  {
    session_map.clear();
  }

  SessionMapStore()
    : version(0), logger(nullptr),
      decay_rate(g_conf->get_val<double>("mds_request_load_average_decay_rate")),
      total_load_avg_rate(decay_rate), rank(MDS_RANK_NONE) {
  }
  virtual ~SessionMapStore() {};
};

class SessionMap : public SessionMapStore {
public:
  MDSRank *mds;

protected:
  version_t projected = 0, committing = 0, committed = 0;
public:
  map<int,xlist<Session*>* > by_state;
  uint64_t set_state(Session *session, int state);
  map<version_t, list<MDSInternalContextBase*> > commit_waiters;
  void update_average_session_age();

  SessionMap() = delete;
  explicit SessionMap(MDSRank *m) : mds(m) {}

  ~SessionMap() override
  {
    for (auto p : by_state)
      delete p.second;

    if (logger) {
      g_ceph_context->get_perfcounters_collection()->remove(logger);
    }

    delete logger;
  }

  void register_perfcounters();

  void set_version(const version_t v)
  {
    version = projected = v;
  }

  void set_projected(const version_t v)
  {
    projected = v;
  }

  version_t get_projected() const
  {
    return projected;
  }

  version_t get_committed() const
  {
    return committed;
  }

  version_t get_committing() const
  {
    return committing;
  }

  // sessions
  void decode_legacy(bufferlist::iterator& blp) override;
  bool empty() const { return session_map.empty(); }
  const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
  {
    return session_map;
  }

  bool is_any_state(int state) const {
    map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
    if (p == by_state.end() || p->second->empty())
      return false;
    return true;
  }

  bool have_unclosed_sessions() const {
    return
      is_any_state(Session::STATE_OPENING) ||
      is_any_state(Session::STATE_OPEN) ||
      is_any_state(Session::STATE_CLOSING) ||
      is_any_state(Session::STATE_STALE) ||
      is_any_state(Session::STATE_KILLING);
  }
  bool have_session(entity_name_t w) const {
    return session_map.count(w);
  }
  Session* get_session(entity_name_t w) {
    auto session_map_entry = session_map.find(w);
    return (session_map_entry != session_map.end() ?
            session_map_entry->second : nullptr);
  }
  const Session* get_session(entity_name_t w) const {
    ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
    if (p == session_map.end()) {
      return NULL;
    } else {
      return p->second;
    }
  }

  void add_session(Session *s);
  void remove_session(Session *s);
  void touch_session(Session *session);

  Session *get_oldest_session(int state) {
    auto by_state_entry = by_state.find(state);
    if (by_state_entry == by_state.end() || by_state_entry->second->empty())
      return 0;
    return by_state_entry->second->front();
  }

  void dump();

  template<typename F>
  void get_client_sessions(F&& f) const {
    for (const auto& p : session_map) {
      auto& session = p.second;
      if (session->info.inst.name.is_client())
        f(session);
    }
  }
  template<typename C>
  void get_client_session_set(C& c) const {
    auto f = [&c](Session* s) {
      c.insert(s);
    };
    get_client_sessions(f);
  }
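
  // Illustrative sketch (not in the original header): collecting all client
  // sessions into a set, or visiting them with an arbitrary callable.
  //
  //   std::set<Session*> clients;
  //   sessionmap.get_client_session_set(clients);
  //
  //   sessionmap.get_client_sessions([](Session *s) {
  //     // inspect s->info, s->get_state_name(), ...
  //   });
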

  void replay_open_sessions(map<client_t,entity_inst_t>& client_map) {
    for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
         p != client_map.end();
         ++p) {
      Session *s = get_or_add_session(p->second);
      set_state(s, Session::STATE_OPEN);
      replay_dirty_session(s);
    }
  }

  // helpers
  entity_inst_t& get_inst(entity_name_t w) {
    assert(session_map.count(w));
    return session_map[w]->info.inst;
  }
  version_t inc_push_seq(client_t client) {
    return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
  }
  version_t get_push_seq(client_t client) {
    return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
  }
  bool have_completed_request(metareqid_t rid) {
    Session *session = get_session(rid.name);
    return session && session->have_completed_request(rid.tid, NULL);
  }
  void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
    Session *session = get_session(c);
    assert(session);
    session->trim_completed_requests(tid);
  }

  void wipe();
  void wipe_ino_prealloc();

  // -- loading, saving --
  inodeno_t ino;
  list<MDSInternalContextBase*> waiting_for_load;

  object_t get_object_name() const;

  void load(MDSInternalContextBase *onload);
  void _load_finish(
      int operation_r,
      int header_r,
      int values_r,
      bool first,
      bufferlist &header_bl,
      std::map<std::string, bufferlist> &session_vals,
      bool more_session_vals);

  void load_legacy();
  void _load_legacy_finish(int r, bufferlist &bl);

  void save(MDSInternalContextBase *onsave, version_t needv=0);
  void _save_finish(version_t v);

protected:
  std::set<entity_name_t> dirty_sessions;
  std::set<entity_name_t> null_sessions;
  bool loaded_legacy = false;
  void _mark_dirty(Session *session);
public:

  /**
   * Advance the version, and mark this session
   * as dirty within the new version.
   *
   * Dirty means journalled but needing writeback
   * to the backing store. Must have called
   * mark_projected previously for this session.
   */
  void mark_dirty(Session *session);

  /**
   * Advance the projected version, and mark this
   * session as projected within the new version
   *
   * Projected means the session is updated in memory
   * but we're waiting for the journal write of the update
   * to finish. Must subsequently call mark_dirty
   * for sessions in the same global order as calls
   * to mark_projected.
   */
  version_t mark_projected(Session *session);
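
  // Illustrative sketch (not in the original header) of the update cycle
  // described above: project the change, journal it, then mark the session
  // dirty once the journal write completes. `le` and the callback plumbing
  // are assumed names for journaling machinery outside this class.
  //
  //   version_t pv = sessionmap.mark_projected(session);  // in-memory update
  //   le->sessionmapv = pv;                               // record in the log event (assumed field)
  //   ...                                                 // submit the journal entry
  //   // in the log-commit completion:
  //   sessionmap.mark_dirty(session);                     // now safe to write back
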

  /**
   * During replay, advance versions to account
   * for a session modification, and mark the
   * session dirty.
   */
  void replay_dirty_session(Session *session);

  /**
   * During replay, if a session no longer present
   * would have consumed a version, advance `version`
   * and `projected` to account for that.
   */
  void replay_advance_version();

  /**
   * For these session IDs, if a session exists with this ID, and it has
   * dirty completed_requests, then persist it immediately
   * (ahead of usual project/dirty versioned writes
   *  of the map).
   */
  void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
                     MDSGatherBuilder *gather_bld);

private:
  time avg_birth_time = time::min();

  uint64_t get_session_count_in_state(int state) {
    return !is_any_state(state) ? 0 : by_state[state]->size();
  }

  void update_average_birth_time(const Session &s, bool added=true) {
    uint32_t sessions = session_map.size();
    time birth_time = s.get_birth_time();

    if (sessions == 1) {
      avg_birth_time = added ? birth_time : time::min();
      return;
    }

    if (added) {
      avg_birth_time = clock::time_point(
        ((avg_birth_time - time::min()) / sessions) * (sessions - 1) +
        (birth_time - time::min()) / sessions);
    } else {
      avg_birth_time = clock::time_point(
        ((avg_birth_time - time::min()) / (sessions - 1)) * sessions -
        (birth_time - time::min()) / (sessions - 1));
    }
  }
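
  // Note (not in the original header): these are the standard incremental-mean
  // updates, computed as offsets from the epoch time::min(). With n =
  // session_map.size() at the time of the call and b the session's birth time,
  // adding updates the mean m as
  //   m_new = m_old * (n - 1) / n + b / n
  // and removing updates it as
  //   m_new = m_old * n / (n - 1) - b / (n - 1)
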

public:
  void hit_session(Session *session);
  void handle_conf_change(const struct md_config_t *conf,
                          const std::set <std::string> &changed);
};

std::ostream& operator<<(std::ostream &out, const Session &s);


#endif