git.proxmox.com Git - ceph.git/blob - ceph/src/mds/SessionMap.h
update sources to 12.2.7
[ceph.git] / ceph / src / mds / SessionMap.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_MDS_SESSIONMAP_H
16 #define CEPH_MDS_SESSIONMAP_H
17
18 #include <set>
19 using std::set;
20
21 #include "include/unordered_map.h"
22
23 #include "include/Context.h"
24 #include "include/xlist.h"
25 #include "include/elist.h"
26 #include "include/interval_set.h"
27 #include "mdstypes.h"
28 #include "mds/MDSAuthCaps.h"
29 #include "common/perf_counters.h"
30
31 class CInode;
32 struct MDRequestImpl;
33
34 #include "CInode.h"
35 #include "Capability.h"
36 #include "msg/Message.h"
37
// Indices handed to the PerfCounters logger of the session map.
// l_mdssm_first / l_mdssm_last bracket the range; the values in between are
// consecutive.
enum {
  l_mdssm_first          = 5500,
  l_mdssm_session_count  = l_mdssm_first + 1,  // set to session_map.size()
  l_mdssm_session_add    = l_mdssm_first + 2,  // bumped when a session is added
  l_mdssm_session_remove = l_mdssm_first + 3,  // presumably bumped on removal; not visible in this header
  l_mdssm_last           = l_mdssm_first + 4,
};
45
46 /*
47 * session
48 */
49
50 class Session : public RefCountedObject {
51 // -- state etc --
52 public:
53 /*
54
55 <deleted> <-- closed <------------+
56 ^ | |
57 | v |
58 killing <-- opening <----+ |
59 ^ | | |
60 | v | |
61 stale <--> open --> closing ---+
62
63 + additional dimension of 'importing' (with counter)
64
65 */
66 enum {
67 STATE_CLOSED = 0,
68 STATE_OPENING = 1, // journaling open
69 STATE_OPEN = 2,
70 STATE_CLOSING = 3, // journaling close
71 STATE_STALE = 4,
72 STATE_KILLING = 5
73 };
74
75 const char *get_state_name(int s) const {
76 switch (s) {
77 case STATE_CLOSED: return "closed";
78 case STATE_OPENING: return "opening";
79 case STATE_OPEN: return "open";
80 case STATE_CLOSING: return "closing";
81 case STATE_STALE: return "stale";
82 case STATE_KILLING: return "killing";
83 default: return "???";
84 }
85 }
86
87 private:
88 int state;
89 uint64_t state_seq;
90 int importing_count;
91 friend class SessionMap;
92
93 // Human (friendly) name is soft state generated from client metadata
94 void _update_human_name();
95 std::string human_name;
96
97 // Versions in this session was projected: used to verify
98 // that appropriate mark_dirty calls follow.
99 std::deque<version_t> projected;
100
101
102
103 public:
104
105 void push_pv(version_t pv)
106 {
107 assert(projected.empty() || projected.back() != pv);
108 projected.push_back(pv);
109 }
110
111 void pop_pv(version_t v)
112 {
113 assert(!projected.empty());
114 assert(projected.front() == v);
115 projected.pop_front();
116 }
117
118 int get_state() const { return state; }
119 void set_state(int new_state)
120 {
121 if (state != new_state) {
122 state = new_state;
123 state_seq++;
124 }
125 }
126 void decode(bufferlist::iterator &p);
127 void set_client_metadata(std::map<std::string, std::string> const &meta);
128 std::string get_human_name() const {return human_name;}
129
130 // Ephemeral state for tracking progress of capability recalls
131 utime_t recalled_at; // When was I asked to SESSION_RECALL?
132 utime_t last_recall_sent;
133 uint32_t recall_count; // How many caps was I asked to SESSION_RECALL?
134 uint32_t recall_release_count; // How many caps have I actually revoked?
135
136 session_info_t info; ///< durable bits
137
138 MDSAuthCaps auth_caps;
139
140 ConnectionRef connection;
141 xlist<Session*>::item item_session_list;
142
143 list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
144
145 elist<MDRequestImpl*> requests;
146 size_t get_request_count();
147
148 interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
149
150 void notify_cap_release(size_t n_caps);
151 void notify_recall_sent(const size_t new_limit);
152 void clear_recalled_at();
153
154 inodeno_t next_ino() const {
155 if (info.prealloc_inos.empty())
156 return 0;
157 return info.prealloc_inos.range_start();
158 }
159 inodeno_t take_ino(inodeno_t ino = 0) {
160 assert(!info.prealloc_inos.empty());
161
162 if (ino) {
163 if (info.prealloc_inos.contains(ino))
164 info.prealloc_inos.erase(ino);
165 else
166 ino = 0;
167 }
168 if (!ino) {
169 ino = info.prealloc_inos.range_start();
170 info.prealloc_inos.erase(ino);
171 }
172 info.used_inos.insert(ino, 1);
173 return ino;
174 }
175 int get_num_projected_prealloc_inos() const {
176 return info.prealloc_inos.size() + pending_prealloc_inos.size();
177 }
178
179 client_t get_client() const {
180 return info.get_client();
181 }
182
183 const char *get_state_name() const { return get_state_name(state); }
184 uint64_t get_state_seq() const { return state_seq; }
185 bool is_closed() const { return state == STATE_CLOSED; }
186 bool is_opening() const { return state == STATE_OPENING; }
187 bool is_open() const { return state == STATE_OPEN; }
188 bool is_closing() const { return state == STATE_CLOSING; }
189 bool is_stale() const { return state == STATE_STALE; }
190 bool is_killing() const { return state == STATE_KILLING; }
191
192 void inc_importing() {
193 ++importing_count;
194 }
195 void dec_importing() {
196 assert(importing_count > 0);
197 --importing_count;
198 }
199 bool is_importing() const { return importing_count > 0; }
200
201 // -- caps --
202 private:
203 version_t cap_push_seq; // cap push seq #
204 map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
205
206 public:
207 xlist<Capability*> caps; // inodes with caps; front=most recently used
208 xlist<ClientLease*> leases; // metadata leases to clients
209 utime_t last_cap_renew;
210
211 public:
212 version_t inc_push_seq() { return ++cap_push_seq; }
213 version_t get_push_seq() const { return cap_push_seq; }
214
215 version_t wait_for_flush(MDSInternalContextBase* c) {
216 waitfor_flush[get_push_seq()].push_back(c);
217 return get_push_seq();
218 }
219 void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
220 while (!waitfor_flush.empty()) {
221 if (waitfor_flush.begin()->first > seq)
222 break;
223 ls.splice(ls.end(), waitfor_flush.begin()->second);
224 waitfor_flush.erase(waitfor_flush.begin());
225 }
226 }
227
228 void add_cap(Capability *cap) {
229 caps.push_back(&cap->item_session_caps);
230 }
231 void touch_lease(ClientLease *r) {
232 leases.push_back(&r->item_session_lease);
233 }
234
235 // -- leases --
236 uint32_t lease_seq;
237
238 // -- completed requests --
239 private:
240 // Has completed_requests been modified since the last time we
241 // wrote this session out?
242 bool completed_requests_dirty;
243
244 unsigned num_trim_flushes_warnings;
245 unsigned num_trim_requests_warnings;
246 public:
247 void add_completed_request(ceph_tid_t t, inodeno_t created) {
248 info.completed_requests[t] = created;
249 completed_requests_dirty = true;
250 }
251 bool trim_completed_requests(ceph_tid_t mintid) {
252 // trim
253 bool erased_any = false;
254 while (!info.completed_requests.empty() &&
255 (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
256 info.completed_requests.erase(info.completed_requests.begin());
257 erased_any = true;
258 }
259
260 if (erased_any) {
261 completed_requests_dirty = true;
262 }
263 return erased_any;
264 }
265 bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
266 map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
267 if (p == info.completed_requests.end())
268 return false;
269 if (pcreated)
270 *pcreated = p->second;
271 return true;
272 }
273
274 void add_completed_flush(ceph_tid_t tid) {
275 info.completed_flushes.insert(tid);
276 }
277 bool trim_completed_flushes(ceph_tid_t mintid) {
278 bool erased_any = false;
279 while (!info.completed_flushes.empty() &&
280 (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
281 info.completed_flushes.erase(info.completed_flushes.begin());
282 erased_any = true;
283 }
284 if (erased_any) {
285 completed_requests_dirty = true;
286 }
287 return erased_any;
288 }
289 bool have_completed_flush(ceph_tid_t tid) const {
290 return info.completed_flushes.count(tid);
291 }
292
293 unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
294 unsigned get_num_trim_flushes_warnings() const {
295 return num_trim_flushes_warnings;
296 }
297 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
298 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
299
300 unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
301 unsigned get_num_trim_requests_warnings() const {
302 return num_trim_requests_warnings;
303 }
304 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
305 void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
306
307 bool has_dirty_completed_requests() const
308 {
309 return completed_requests_dirty;
310 }
311
312 void clear_dirty_completed_requests()
313 {
314 completed_requests_dirty = false;
315 }
316
317 int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
318 const vector<uint64_t> *gid_list, int new_uid, int new_gid);
319
320
321 Session() :
322 state(STATE_CLOSED), state_seq(0), importing_count(0),
323 recall_count(0), recall_release_count(0),
324 auth_caps(g_ceph_context),
325 connection(NULL), item_session_list(this),
326 requests(0), // member_offset passed to front() manually
327 cap_push_seq(0),
328 lease_seq(0),
329 completed_requests_dirty(false),
330 num_trim_flushes_warnings(0),
331 num_trim_requests_warnings(0) { }
332 ~Session() override {
333 if (state == STATE_CLOSED) {
334 item_session_list.remove_myself();
335 } else {
336 assert(!item_session_list.is_on_list());
337 }
338 while (!preopen_out_queue.empty()) {
339 preopen_out_queue.front()->put();
340 preopen_out_queue.pop_front();
341 }
342 }
343
344 void clear() {
345 pending_prealloc_inos.clear();
346 info.clear_meta();
347
348 cap_push_seq = 0;
349 last_cap_renew = utime_t();
350
351 }
352 };
353
354 class SessionFilter
355 {
356 protected:
357 // First is whether to filter, second is filter value
358 std::pair<bool, bool> reconnecting;
359
360 public:
361 std::map<std::string, std::string> metadata;
362 std::string auth_name;
363 std::string state;
364 int64_t id;
365
366 SessionFilter()
367 : reconnecting(false, false), id(0)
368 {}
369
370 bool match(
371 const Session &session,
372 std::function<bool(client_t)> is_reconnecting) const;
373 int parse(const std::vector<std::string> &args, std::stringstream *ss);
374 void set_reconnecting(bool v)
375 {
376 reconnecting.first = true;
377 reconnecting.second = v;
378 }
379 };
380
381 /*
382 * session map
383 */
384
385 class MDSRank;
386
387 /**
388 * Encapsulate the serialized state associated with SessionMap. Allows
389 * encode/decode outside of live MDS instance.
390 */
391 class SessionMapStore {
392 protected:
393 version_t version;
394 ceph::unordered_map<entity_name_t, Session*> session_map;
395 PerfCounters *logger;
396 public:
397 mds_rank_t rank;
398
399 version_t get_version() const {return version;}
400
401 virtual void encode_header(bufferlist *header_bl);
402 virtual void decode_header(bufferlist &header_bl);
403 virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
404 virtual void decode_legacy(bufferlist::iterator& blp);
405 void dump(Formatter *f) const;
406
407 void set_rank(mds_rank_t r)
408 {
409 rank = r;
410 }
411
412 Session* get_or_add_session(const entity_inst_t& i) {
413 Session *s;
414 auto session_map_entry = session_map.find(i.name);
415 if (session_map_entry != session_map.end()) {
416 s = session_map_entry->second;
417 } else {
418 s = session_map[i.name] = new Session;
419 s->info.inst = i;
420 s->last_cap_renew = ceph_clock_now();
421 if (logger) {
422 logger->set(l_mdssm_session_count, session_map.size());
423 logger->inc(l_mdssm_session_add);
424 }
425 }
426
427 return s;
428 }
429
430 static void generate_test_instances(list<SessionMapStore*>& ls);
431
432 void reset_state()
433 {
434 session_map.clear();
435 }
436
437 SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE) {}
438 virtual ~SessionMapStore() {};
439 };
440
441 class SessionMap : public SessionMapStore {
442 public:
443 MDSRank *mds;
444
445 protected:
446 version_t projected, committing, committed;
447 public:
448 map<int,xlist<Session*>* > by_state;
449 uint64_t set_state(Session *session, int state);
450 map<version_t, list<MDSInternalContextBase*> > commit_waiters;
451
452 explicit SessionMap(MDSRank *m) : mds(m),
453 projected(0), committing(0), committed(0),
454 loaded_legacy(false)
455 { }
456
457 ~SessionMap() override
458 {
459 for (auto p : by_state)
460 delete p.second;
461
462 if (logger) {
463 g_ceph_context->get_perfcounters_collection()->remove(logger);
464 }
465
466 delete logger;
467 }
468
469 void register_perfcounters();
470
471 void set_version(const version_t v)
472 {
473 version = projected = v;
474 }
475
476 void set_projected(const version_t v)
477 {
478 projected = v;
479 }
480
481 version_t get_projected() const
482 {
483 return projected;
484 }
485
486 version_t get_committed() const
487 {
488 return committed;
489 }
490
491 version_t get_committing() const
492 {
493 return committing;
494 }
495
496 // sessions
497 void decode_legacy(bufferlist::iterator& blp) override;
498 bool empty() const { return session_map.empty(); }
499 const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
500 {
501 return session_map;
502 }
503
504 bool is_any_state(int state) const {
505 map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
506 if (p == by_state.end() || p->second->empty())
507 return false;
508 return true;
509 }
510
511 bool have_unclosed_sessions() const {
512 return
513 is_any_state(Session::STATE_OPENING) ||
514 is_any_state(Session::STATE_OPEN) ||
515 is_any_state(Session::STATE_CLOSING) ||
516 is_any_state(Session::STATE_STALE) ||
517 is_any_state(Session::STATE_KILLING);
518 }
519 bool have_session(entity_name_t w) const {
520 return session_map.count(w);
521 }
522 Session* get_session(entity_name_t w) {
523 auto session_map_entry = session_map.find(w);
524 return (session_map_entry != session_map.end() ?
525 session_map_entry-> second : nullptr);
526 }
527 const Session* get_session(entity_name_t w) const {
528 ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
529 if (p == session_map.end()) {
530 return NULL;
531 } else {
532 return p->second;
533 }
534 }
535
536 void add_session(Session *s);
537 void remove_session(Session *s);
538 void touch_session(Session *session);
539
540 Session *get_oldest_session(int state) {
541 auto by_state_entry = by_state.find(state);
542 if (by_state_entry == by_state.end() || by_state_entry->second->empty())
543 return 0;
544 return by_state_entry->second->front();
545 }
546
547 void dump();
548
549 void get_client_session_set(set<Session*>& s) const {
550 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
551 p != session_map.end();
552 ++p)
553 if (p->second->info.inst.name.is_client())
554 s.insert(p->second);
555 }
556
557 void replay_open_sessions(map<client_t,entity_inst_t>& client_map) {
558 for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
559 p != client_map.end();
560 ++p) {
561 Session *s = get_or_add_session(p->second);
562 set_state(s, Session::STATE_OPEN);
563 replay_dirty_session(s);
564 }
565 }
566
567 // helpers
568 entity_inst_t& get_inst(entity_name_t w) {
569 assert(session_map.count(w));
570 return session_map[w]->info.inst;
571 }
572 version_t inc_push_seq(client_t client) {
573 return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
574 }
575 version_t get_push_seq(client_t client) {
576 return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
577 }
578 bool have_completed_request(metareqid_t rid) {
579 Session *session = get_session(rid.name);
580 return session && session->have_completed_request(rid.tid, NULL);
581 }
582 void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
583 Session *session = get_session(c);
584 assert(session);
585 session->trim_completed_requests(tid);
586 }
587
588 void wipe();
589 void wipe_ino_prealloc();
590
591 // -- loading, saving --
592 inodeno_t ino;
593 list<MDSInternalContextBase*> waiting_for_load;
594
595 object_t get_object_name() const;
596
597 void load(MDSInternalContextBase *onload);
598 void _load_finish(
599 int operation_r,
600 int header_r,
601 int values_r,
602 bool first,
603 bufferlist &header_bl,
604 std::map<std::string, bufferlist> &session_vals,
605 bool more_session_vals);
606
607 void load_legacy();
608 void _load_legacy_finish(int r, bufferlist &bl);
609
610 void save(MDSInternalContextBase *onsave, version_t needv=0);
611 void _save_finish(version_t v);
612
613 protected:
614 std::set<entity_name_t> dirty_sessions;
615 std::set<entity_name_t> null_sessions;
616 bool loaded_legacy;
617 void _mark_dirty(Session *session);
618 public:
619
620 /**
621 * Advance the version, and mark this session
622 * as dirty within the new version.
623 *
624 * Dirty means journalled but needing writeback
625 * to the backing store. Must have called
626 * mark_projected previously for this session.
627 */
628 void mark_dirty(Session *session);
629
630 /**
631 * Advance the projected version, and mark this
632 * session as projected within the new version
633 *
634 * Projected means the session is updated in memory
635 * but we're waiting for the journal write of the update
636 * to finish. Must subsequently call mark_dirty
637 * for sessions in the same global order as calls
638 * to mark_projected.
639 */
640 version_t mark_projected(Session *session);
641
642 /**
643 * During replay, advance versions to account
644 * for a session modification, and mark the
645 * session dirty.
646 */
647 void replay_dirty_session(Session *session);
648
649 /**
650 * During replay, if a session no longer present
651 * would have consumed a version, advance `version`
652 * and `projected` to account for that.
653 */
654 void replay_advance_version();
655
656 /**
657 * For these session IDs, if a session exists with this ID, and it has
658 * dirty completed_requests, then persist it immediately
659 * (ahead of usual project/dirty versioned writes
660 * of the map).
661 */
662 void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
663 MDSGatherBuilder *gather_bld);
664 };
665
// Stream-insertion operator for Session; only declared in this header, the
// definition (and thus the exact output format) lives elsewhere.
666 std::ostream& operator<<(std::ostream &out, const Session &s);
667
668
669 #endif