]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/SessionMap.h
import ceph nautilus 14.2.2
[ceph.git] / ceph / src / mds / SessionMap.h
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#ifndef CEPH_MDS_SESSIONMAP_H
16#define CEPH_MDS_SESSIONMAP_H
17
18#include <set>
19using std::set;
20
21#include "include/unordered_map.h"
22
23#include "include/Context.h"
24#include "include/xlist.h"
25#include "include/elist.h"
26#include "include/interval_set.h"
27#include "mdstypes.h"
28#include "mds/MDSAuthCaps.h"
29#include "common/perf_counters.h"
a8e16298 30#include "common/DecayCounter.h"
7c673cae
FG
31
32class CInode;
33struct MDRequestImpl;
34
35#include "CInode.h"
36#include "Capability.h"
11fdf7f2 37#include "MDSContext.h"
7c673cae
FG
38#include "msg/Message.h"
39
40enum {
41 l_mdssm_first = 5500,
42 l_mdssm_session_count,
43 l_mdssm_session_add,
44 l_mdssm_session_remove,
91327a77
AA
45 l_mdssm_session_open,
46 l_mdssm_session_stale,
47 l_mdssm_total_load,
48 l_mdssm_avg_load,
49 l_mdssm_avg_session_uptime,
7c673cae
FG
50 l_mdssm_last,
51};
52
53/*
54 * session
55 */
56
57class Session : public RefCountedObject {
58 // -- state etc --
59public:
60 /*
61
62 <deleted> <-- closed <------------+
63 ^ | |
64 | v |
65 killing <-- opening <----+ |
66 ^ | | |
67 | v | |
68 stale <--> open --> closing ---+
69
70 + additional dimension of 'importing' (with counter)
71
72 */
91327a77
AA
73
74 using clock = ceph::coarse_mono_clock;
75 using time = ceph::coarse_mono_time;
76
77
7c673cae
FG
78 enum {
79 STATE_CLOSED = 0,
80 STATE_OPENING = 1, // journaling open
81 STATE_OPEN = 2,
82 STATE_CLOSING = 3, // journaling close
83 STATE_STALE = 4,
84 STATE_KILLING = 5
85 };
86
11fdf7f2 87 static std::string_view get_state_name(int s) {
7c673cae
FG
88 switch (s) {
89 case STATE_CLOSED: return "closed";
90 case STATE_OPENING: return "opening";
91 case STATE_OPEN: return "open";
92 case STATE_CLOSING: return "closing";
93 case STATE_STALE: return "stale";
94 case STATE_KILLING: return "killing";
95 default: return "???";
96 }
97 }
98
99private:
a8e16298
TL
100 int state = STATE_CLOSED;
101 uint64_t state_seq = 0;
102 int importing_count = 0;
7c673cae
FG
103 friend class SessionMap;
104
105 // Human (friendly) name is soft state generated from client metadata
106 void _update_human_name();
107 std::string human_name;
108
109 // Versions in this session was projected: used to verify
110 // that appropriate mark_dirty calls follow.
111 std::deque<version_t> projected;
112
91327a77 113 // request load average for this session
11fdf7f2 114 DecayCounter load_avg;
7c673cae 115
a8e16298
TL
116 // Ephemeral state for tracking progress of capability recalls
117 // caps being recalled recently by this session; used for Beacon warnings
11fdf7f2 118 DecayCounter recall_caps;
a8e16298 119 // caps that have been released
11fdf7f2 120 DecayCounter release_caps;
a8e16298 121 // throttle on caps recalled
11fdf7f2
TL
122 DecayCounter recall_caps_throttle;
123 // second order throttle that prevents recalling too quickly
124 DecayCounter recall_caps_throttle2o;
a8e16298
TL
125 // New limit in SESSION_RECALL
126 uint32_t recall_limit = 0;
127
91327a77
AA
128 // session start time -- used to track average session time
129 // note that this is initialized in the constructor rather
130 // than at the time of adding a session to the sessionmap
131 // as journal replay of sessionmap will not call add_session().
132 time birth_time;
7c673cae
FG
133
134public:
11fdf7f2 135 Session *reclaiming_from = nullptr;
7c673cae
FG
136
137 void push_pv(version_t pv)
138 {
11fdf7f2 139 ceph_assert(projected.empty() || projected.back() != pv);
7c673cae
FG
140 projected.push_back(pv);
141 }
142
143 void pop_pv(version_t v)
144 {
11fdf7f2
TL
145 ceph_assert(!projected.empty());
146 ceph_assert(projected.front() == v);
7c673cae
FG
147 projected.pop_front();
148 }
149
150 int get_state() const { return state; }
151 void set_state(int new_state)
152 {
153 if (state != new_state) {
154 state = new_state;
155 state_seq++;
156 }
157 }
11fdf7f2 158 void decode(bufferlist::const_iterator &p);
a8e16298
TL
159 template<typename T>
160 void set_client_metadata(T&& meta)
161 {
162 info.client_metadata = std::forward<T>(meta);
163 _update_human_name();
164 }
11fdf7f2
TL
165
166 const std::string& get_human_name() const {return human_name;}
7c673cae 167
7c673cae
FG
168 session_info_t info; ///< durable bits
169
170 MDSAuthCaps auth_caps;
171
11fdf7f2 172protected:
7c673cae 173 ConnectionRef connection;
11fdf7f2
TL
174public:
175 entity_addr_t socket_addr;
7c673cae
FG
176 xlist<Session*>::item item_session_list;
177
11fdf7f2 178 list<Message::ref> preopen_out_queue; ///< messages for client, queued before they connect
7c673cae
FG
179
180 elist<MDRequestImpl*> requests;
181 size_t get_request_count();
182
183 interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
184
185 void notify_cap_release(size_t n_caps);
a8e16298 186 uint64_t notify_recall_sent(size_t new_limit);
11fdf7f2
TL
187 auto get_recall_caps_throttle() const {
188 return recall_caps_throttle.get();
a8e16298 189 }
11fdf7f2
TL
190 auto get_recall_caps_throttle2o() const {
191 return recall_caps_throttle2o.get();
a8e16298 192 }
11fdf7f2
TL
193 auto get_recall_caps() const {
194 return recall_caps.get();
195 }
196 auto get_release_caps() const {
197 return release_caps.get();
a8e16298 198 }
7c673cae
FG
199
200 inodeno_t next_ino() const {
201 if (info.prealloc_inos.empty())
202 return 0;
203 return info.prealloc_inos.range_start();
204 }
205 inodeno_t take_ino(inodeno_t ino = 0) {
11fdf7f2 206 ceph_assert(!info.prealloc_inos.empty());
7c673cae
FG
207
208 if (ino) {
209 if (info.prealloc_inos.contains(ino))
210 info.prealloc_inos.erase(ino);
211 else
212 ino = 0;
213 }
214 if (!ino) {
215 ino = info.prealloc_inos.range_start();
216 info.prealloc_inos.erase(ino);
217 }
218 info.used_inos.insert(ino, 1);
219 return ino;
220 }
221 int get_num_projected_prealloc_inos() const {
222 return info.prealloc_inos.size() + pending_prealloc_inos.size();
223 }
224
225 client_t get_client() const {
226 return info.get_client();
227 }
228
11fdf7f2 229 std::string_view get_state_name() const { return get_state_name(state); }
7c673cae
FG
230 uint64_t get_state_seq() const { return state_seq; }
231 bool is_closed() const { return state == STATE_CLOSED; }
232 bool is_opening() const { return state == STATE_OPENING; }
233 bool is_open() const { return state == STATE_OPEN; }
234 bool is_closing() const { return state == STATE_CLOSING; }
235 bool is_stale() const { return state == STATE_STALE; }
236 bool is_killing() const { return state == STATE_KILLING; }
237
238 void inc_importing() {
239 ++importing_count;
240 }
241 void dec_importing() {
11fdf7f2 242 ceph_assert(importing_count > 0);
7c673cae
FG
243 --importing_count;
244 }
245 bool is_importing() const { return importing_count > 0; }
246
91327a77 247 void set_load_avg_decay_rate(double rate) {
11fdf7f2
TL
248 ceph_assert(is_open() || is_stale());
249 load_avg = DecayCounter(rate);
91327a77
AA
250 }
251 uint64_t get_load_avg() const {
11fdf7f2 252 return (uint64_t)load_avg.get();
91327a77
AA
253 }
254 void hit_session() {
11fdf7f2 255 load_avg.adjust();
91327a77
AA
256 }
257
258 double get_session_uptime() const {
259 chrono::duration<double> uptime = clock::now() - birth_time;
260 return uptime.count();
261 }
262
263 time get_birth_time() const {
264 return birth_time;
265 }
266
7c673cae
FG
267 // -- caps --
268private:
a8e16298
TL
269 uint32_t cap_gen = 0;
270 version_t cap_push_seq = 0; // cap push seq #
11fdf7f2 271 map<version_t, MDSContext::vec > waitfor_flush; // flush session messages
7c673cae
FG
272
273public:
274 xlist<Capability*> caps; // inodes with caps; front=most recently used
275 xlist<ClientLease*> leases; // metadata leases to clients
11fdf7f2
TL
276 time last_cap_renew = clock::zero();
277 time last_seen = clock::zero();
7c673cae 278
a8e16298
TL
279 void inc_cap_gen() { ++cap_gen; }
280 uint32_t get_cap_gen() const { return cap_gen; }
281
7c673cae
FG
282 version_t inc_push_seq() { return ++cap_push_seq; }
283 version_t get_push_seq() const { return cap_push_seq; }
284
11fdf7f2 285 version_t wait_for_flush(MDSContext* c) {
7c673cae
FG
286 waitfor_flush[get_push_seq()].push_back(c);
287 return get_push_seq();
288 }
11fdf7f2 289 void finish_flush(version_t seq, MDSContext::vec& ls) {
7c673cae 290 while (!waitfor_flush.empty()) {
11fdf7f2
TL
291 auto it = waitfor_flush.begin();
292 if (it->first > seq)
7c673cae 293 break;
11fdf7f2
TL
294 auto& v = it->second;
295 ls.insert(ls.end(), v.begin(), v.end());
296 waitfor_flush.erase(it);
7c673cae
FG
297 }
298 }
299
a8e16298
TL
300 void touch_cap(Capability *cap) {
301 caps.push_front(&cap->item_session_caps);
302 }
303 void touch_cap_bottom(Capability *cap) {
7c673cae
FG
304 caps.push_back(&cap->item_session_caps);
305 }
306 void touch_lease(ClientLease *r) {
307 leases.push_back(&r->item_session_lease);
308 }
309
310 // -- leases --
a8e16298 311 uint32_t lease_seq = 0;
7c673cae
FG
312
313 // -- completed requests --
314private:
315 // Has completed_requests been modified since the last time we
316 // wrote this session out?
a8e16298 317 bool completed_requests_dirty = false;
7c673cae 318
a8e16298
TL
319 unsigned num_trim_flushes_warnings = 0;
320 unsigned num_trim_requests_warnings = 0;
7c673cae
FG
321public:
322 void add_completed_request(ceph_tid_t t, inodeno_t created) {
323 info.completed_requests[t] = created;
324 completed_requests_dirty = true;
325 }
326 bool trim_completed_requests(ceph_tid_t mintid) {
327 // trim
328 bool erased_any = false;
329 while (!info.completed_requests.empty() &&
330 (mintid == 0 || info.completed_requests.begin()->first < mintid)) {
331 info.completed_requests.erase(info.completed_requests.begin());
332 erased_any = true;
333 }
334
335 if (erased_any) {
336 completed_requests_dirty = true;
337 }
338 return erased_any;
339 }
340 bool have_completed_request(ceph_tid_t tid, inodeno_t *pcreated) const {
341 map<ceph_tid_t,inodeno_t>::const_iterator p = info.completed_requests.find(tid);
342 if (p == info.completed_requests.end())
343 return false;
344 if (pcreated)
345 *pcreated = p->second;
346 return true;
347 }
348
349 void add_completed_flush(ceph_tid_t tid) {
350 info.completed_flushes.insert(tid);
351 }
352 bool trim_completed_flushes(ceph_tid_t mintid) {
353 bool erased_any = false;
354 while (!info.completed_flushes.empty() &&
355 (mintid == 0 || *info.completed_flushes.begin() < mintid)) {
356 info.completed_flushes.erase(info.completed_flushes.begin());
357 erased_any = true;
358 }
359 if (erased_any) {
360 completed_requests_dirty = true;
361 }
362 return erased_any;
363 }
364 bool have_completed_flush(ceph_tid_t tid) const {
365 return info.completed_flushes.count(tid);
366 }
367
368 unsigned get_num_completed_flushes() const { return info.completed_flushes.size(); }
369 unsigned get_num_trim_flushes_warnings() const {
370 return num_trim_flushes_warnings;
371 }
372 void inc_num_trim_flushes_warnings() { ++num_trim_flushes_warnings; }
373 void reset_num_trim_flushes_warnings() { num_trim_flushes_warnings = 0; }
374
375 unsigned get_num_completed_requests() const { return info.completed_requests.size(); }
376 unsigned get_num_trim_requests_warnings() const {
377 return num_trim_requests_warnings;
378 }
379 void inc_num_trim_requests_warnings() { ++num_trim_requests_warnings; }
380 void reset_num_trim_requests_warnings() { num_trim_requests_warnings = 0; }
381
382 bool has_dirty_completed_requests() const
383 {
384 return completed_requests_dirty;
385 }
386
387 void clear_dirty_completed_requests()
388 {
389 completed_requests_dirty = false;
390 }
391
392 int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
393 const vector<uint64_t> *gid_list, int new_uid, int new_gid);
394
a8e16298
TL
395 Session() = delete;
396 Session(ConnectionRef con) :
11fdf7f2
TL
397 recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
398 release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
399 recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
400 recall_caps_throttle2o(0.5),
a8e16298
TL
401 birth_time(clock::now()),
402 auth_caps(g_ceph_context),
403 item_session_list(this),
404 requests(0) // member_offset passed to front() manually
405 {
11fdf7f2 406 set_connection(std::move(con));
a8e16298 407 }
7c673cae 408 ~Session() override {
28e407b8
AA
409 if (state == STATE_CLOSED) {
410 item_session_list.remove_myself();
411 } else {
11fdf7f2 412 ceph_assert(!item_session_list.is_on_list());
28e407b8 413 }
11fdf7f2
TL
414 preopen_out_queue.clear();
415 }
416
417 void set_connection(ConnectionRef con) {
418 connection = std::move(con);
419 if (connection) {
420 socket_addr = connection->get_peer_socket_addr();
7c673cae
FG
421 }
422 }
11fdf7f2
TL
423 const ConnectionRef& get_connection() const {
424 return connection;
425 }
7c673cae
FG
426
427 void clear() {
428 pending_prealloc_inos.clear();
429 info.clear_meta();
430
431 cap_push_seq = 0;
11fdf7f2 432 last_cap_renew = clock::zero();
7c673cae
FG
433 }
434};
435
436class SessionFilter
437{
438protected:
439 // First is whether to filter, second is filter value
440 std::pair<bool, bool> reconnecting;
441
442public:
443 std::map<std::string, std::string> metadata;
444 std::string auth_name;
445 std::string state;
446 int64_t id;
447
448 SessionFilter()
449 : reconnecting(false, false), id(0)
450 {}
451
452 bool match(
453 const Session &session,
454 std::function<bool(client_t)> is_reconnecting) const;
455 int parse(const std::vector<std::string> &args, std::stringstream *ss);
456 void set_reconnecting(bool v)
457 {
458 reconnecting.first = true;
459 reconnecting.second = v;
460 }
461};
462
463/*
464 * session map
465 */
466
467class MDSRank;
468
469/**
470 * Encapsulate the serialized state associated with SessionMap. Allows
471 * encode/decode outside of live MDS instance.
472 */
473class SessionMapStore {
91327a77
AA
474public:
475 using clock = Session::clock;
476 using time = Session::time;
477
7c673cae
FG
478protected:
479 version_t version;
480 ceph::unordered_map<entity_name_t, Session*> session_map;
481 PerfCounters *logger;
91327a77
AA
482
483 // total request load avg
484 double decay_rate;
485 DecayCounter total_load_avg;
91327a77 486
7c673cae
FG
487public:
488 mds_rank_t rank;
489
490 version_t get_version() const {return version;}
491
492 virtual void encode_header(bufferlist *header_bl);
493 virtual void decode_header(bufferlist &header_bl);
494 virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
11fdf7f2 495 virtual void decode_legacy(bufferlist::const_iterator& blp);
7c673cae
FG
496 void dump(Formatter *f) const;
497
498 void set_rank(mds_rank_t r)
499 {
500 rank = r;
501 }
502
503 Session* get_or_add_session(const entity_inst_t& i) {
504 Session *s;
505 auto session_map_entry = session_map.find(i.name);
506 if (session_map_entry != session_map.end()) {
507 s = session_map_entry->second;
508 } else {
a8e16298 509 s = session_map[i.name] = new Session(ConnectionRef());
7c673cae 510 s->info.inst = i;
91327a77 511 s->last_cap_renew = Session::clock::now();
7c673cae
FG
512 if (logger) {
513 logger->set(l_mdssm_session_count, session_map.size());
514 logger->inc(l_mdssm_session_add);
515 }
516 }
517
518 return s;
519 }
520
521 static void generate_test_instances(list<SessionMapStore*>& ls);
522
523 void reset_state()
524 {
525 session_map.clear();
526 }
527
91327a77
AA
528 SessionMapStore()
529 : version(0), logger(nullptr),
11fdf7f2
TL
530 decay_rate(g_conf().get_val<double>("mds_request_load_average_decay_rate")),
531 total_load_avg(decay_rate), rank(MDS_RANK_NONE) {
91327a77 532 }
7c673cae
FG
533 virtual ~SessionMapStore() {};
534};
535
536class SessionMap : public SessionMapStore {
537public:
538 MDSRank *mds;
539
540protected:
a8e16298 541 version_t projected = 0, committing = 0, committed = 0;
7c673cae
FG
542public:
543 map<int,xlist<Session*>* > by_state;
544 uint64_t set_state(Session *session, int state);
11fdf7f2 545 map<version_t, MDSContext::vec > commit_waiters;
91327a77 546 void update_average_session_age();
7c673cae 547
a8e16298
TL
548 SessionMap() = delete;
549 explicit SessionMap(MDSRank *m) : mds(m) {}
7c673cae
FG
550
551 ~SessionMap() override
552 {
553 for (auto p : by_state)
554 delete p.second;
555
556 if (logger) {
557 g_ceph_context->get_perfcounters_collection()->remove(logger);
558 }
559
560 delete logger;
561 }
562
563 void register_perfcounters();
564
565 void set_version(const version_t v)
566 {
567 version = projected = v;
568 }
569
570 void set_projected(const version_t v)
571 {
572 projected = v;
573 }
574
575 version_t get_projected() const
576 {
577 return projected;
578 }
579
580 version_t get_committed() const
581 {
582 return committed;
583 }
584
585 version_t get_committing() const
586 {
587 return committing;
588 }
589
590 // sessions
11fdf7f2 591 void decode_legacy(bufferlist::const_iterator& blp) override;
7c673cae 592 bool empty() const { return session_map.empty(); }
11fdf7f2 593 const ceph::unordered_map<entity_name_t, Session*>& get_sessions() const
7c673cae
FG
594 {
595 return session_map;
596 }
597
598 bool is_any_state(int state) const {
599 map<int,xlist<Session*>* >::const_iterator p = by_state.find(state);
600 if (p == by_state.end() || p->second->empty())
601 return false;
602 return true;
603 }
604
605 bool have_unclosed_sessions() const {
606 return
607 is_any_state(Session::STATE_OPENING) ||
608 is_any_state(Session::STATE_OPEN) ||
609 is_any_state(Session::STATE_CLOSING) ||
610 is_any_state(Session::STATE_STALE) ||
611 is_any_state(Session::STATE_KILLING);
612 }
613 bool have_session(entity_name_t w) const {
614 return session_map.count(w);
615 }
616 Session* get_session(entity_name_t w) {
617 auto session_map_entry = session_map.find(w);
618 return (session_map_entry != session_map.end() ?
619 session_map_entry-> second : nullptr);
620 }
621 const Session* get_session(entity_name_t w) const {
622 ceph::unordered_map<entity_name_t, Session*>::const_iterator p = session_map.find(w);
623 if (p == session_map.end()) {
624 return NULL;
625 } else {
626 return p->second;
627 }
628 }
629
630 void add_session(Session *s);
631 void remove_session(Session *s);
632 void touch_session(Session *session);
633
634 Session *get_oldest_session(int state) {
635 auto by_state_entry = by_state.find(state);
636 if (by_state_entry == by_state.end() || by_state_entry->second->empty())
637 return 0;
638 return by_state_entry->second->front();
639 }
640
641 void dump();
642
a8e16298
TL
643 template<typename F>
644 void get_client_sessions(F&& f) const {
645 for (const auto& p : session_map) {
646 auto& session = p.second;
647 if (session->info.inst.name.is_client())
648 f(session);
649 }
650 }
651 template<typename C>
652 void get_client_session_set(C& c) const {
11fdf7f2 653 auto f = [&c](auto& s) {
a8e16298
TL
654 c.insert(s);
655 };
656 get_client_sessions(f);
7c673cae
FG
657 }
658
7c673cae
FG
659 // helpers
660 entity_inst_t& get_inst(entity_name_t w) {
11fdf7f2 661 ceph_assert(session_map.count(w));
7c673cae
FG
662 return session_map[w]->info.inst;
663 }
7c673cae
FG
664 version_t get_push_seq(client_t client) {
665 return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
666 }
667 bool have_completed_request(metareqid_t rid) {
668 Session *session = get_session(rid.name);
669 return session && session->have_completed_request(rid.tid, NULL);
670 }
671 void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
672 Session *session = get_session(c);
11fdf7f2 673 ceph_assert(session);
7c673cae
FG
674 session->trim_completed_requests(tid);
675 }
676
677 void wipe();
678 void wipe_ino_prealloc();
679
680 // -- loading, saving --
681 inodeno_t ino;
11fdf7f2 682 MDSContext::vec waiting_for_load;
7c673cae
FG
683
684 object_t get_object_name() const;
685
11fdf7f2 686 void load(MDSContext *onload);
7c673cae
FG
687 void _load_finish(
688 int operation_r,
689 int header_r,
690 int values_r,
691 bool first,
692 bufferlist &header_bl,
693 std::map<std::string, bufferlist> &session_vals,
694 bool more_session_vals);
695
696 void load_legacy();
697 void _load_legacy_finish(int r, bufferlist &bl);
698
11fdf7f2 699 void save(MDSContext *onsave, version_t needv=0);
7c673cae
FG
700 void _save_finish(version_t v);
701
702protected:
703 std::set<entity_name_t> dirty_sessions;
704 std::set<entity_name_t> null_sessions;
a8e16298 705 bool loaded_legacy = false;
81eedcae 706 void _mark_dirty(Session *session, bool may_save);
7c673cae
FG
707public:
708
709 /**
710 * Advance the version, and mark this session
711 * as dirty within the new version.
712 *
713 * Dirty means journalled but needing writeback
714 * to the backing store. Must have called
715 * mark_projected previously for this session.
716 */
81eedcae 717 void mark_dirty(Session *session, bool may_save=true);
7c673cae
FG
718
719 /**
720 * Advance the projected version, and mark this
721 * session as projected within the new version
722 *
723 * Projected means the session is updated in memory
724 * but we're waiting for the journal write of the update
725 * to finish. Must subsequently call mark_dirty
726 * for sessions in the same global order as calls
727 * to mark_projected.
728 */
729 version_t mark_projected(Session *session);
730
731 /**
732 * During replay, advance versions to account
733 * for a session modification, and mark the
734 * session dirty.
735 */
736 void replay_dirty_session(Session *session);
737
738 /**
739 * During replay, if a session no longer present
740 * would have consumed a version, advance `version`
741 * and `projected` to account for that.
742 */
743 void replay_advance_version();
744
81eedcae
TL
745 /**
746 * During replay, open sessions, advance versions and
747 * mark these sessions as dirty.
748 */
749 void replay_open_sessions(version_t event_cmapv,
750 map<client_t,entity_inst_t>& client_map,
751 map<client_t,client_metadata_t>& client_metadata_map);
752
7c673cae
FG
753 /**
754 * For these session IDs, if a session exists with this ID, and it has
755 * dirty completed_requests, then persist it immediately
756 * (ahead of usual project/dirty versioned writes
757 * of the map).
758 */
759 void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
760 MDSGatherBuilder *gather_bld);
91327a77
AA
761
762private:
11fdf7f2 763 time avg_birth_time = clock::zero();
91327a77
AA
764
765 uint64_t get_session_count_in_state(int state) {
766 return !is_any_state(state) ? 0 : by_state[state]->size();
767 }
768
769 void update_average_birth_time(const Session &s, bool added=true) {
770 uint32_t sessions = session_map.size();
771 time birth_time = s.get_birth_time();
772
773 if (sessions == 1) {
11fdf7f2 774 avg_birth_time = added ? birth_time : clock::zero();
91327a77
AA
775 return;
776 }
777
778 if (added) {
779 avg_birth_time = clock::time_point(
11fdf7f2
TL
780 ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) +
781 (birth_time - clock::zero()) / sessions);
91327a77
AA
782 } else {
783 avg_birth_time = clock::time_point(
11fdf7f2
TL
784 ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions -
785 (birth_time - clock::zero()) / (sessions - 1));
91327a77
AA
786 }
787 }
788
789public:
790 void hit_session(Session *session);
11fdf7f2 791 void handle_conf_change(const ConfigProxy &conf,
91327a77 792 const std::set <std::string> &changed);
7c673cae
FG
793};
794
795std::ostream& operator<<(std::ostream &out, const Session &s);
796
797
798#endif