]> git.proxmox.com Git - ceph.git/blame - ceph/src/mds/SessionMap.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / mds / SessionMap.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15#include "MDSRank.h"
16#include "MDCache.h"
17#include "Mutation.h"
18#include "SessionMap.h"
19#include "osdc/Filer.h"
20#include "common/Finisher.h"
21
22#include "common/config.h"
23#include "common/errno.h"
91327a77 24#include "common/DecayCounter.h"
11fdf7f2 25#include "include/ceph_assert.h"
7c673cae
FG
26#include "include/stringify.h"
27
28#define dout_context g_ceph_context
29#define dout_subsys ceph_subsys_mds
30#undef dout_prefix
31#define dout_prefix *_dout << "mds." << rank << ".sessionmap "
32
33namespace {
34class SessionMapIOContext : public MDSIOContextBase
35{
36 protected:
37 SessionMap *sessionmap;
38 MDSRank *get_mds() override {return sessionmap->mds;}
39 public:
40 explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) {
11fdf7f2 41 ceph_assert(sessionmap != NULL);
7c673cae
FG
42 }
43};
44};
45
46void SessionMap::register_perfcounters()
47{
48 PerfCountersBuilder plb(g_ceph_context, "mds_sessions",
49 l_mdssm_first, l_mdssm_last);
91327a77 50
7c673cae 51 plb.add_u64(l_mdssm_session_count, "session_count",
b32b8144 52 "Session count", "sess", PerfCountersBuilder::PRIO_INTERESTING);
91327a77
AA
53
54 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
7c673cae
FG
55 plb.add_u64_counter(l_mdssm_session_add, "session_add",
56 "Sessions added");
57 plb.add_u64_counter(l_mdssm_session_remove, "session_remove",
58 "Sessions removed");
91327a77
AA
59 plb.add_u64(l_mdssm_session_open, "sessions_open",
60 "Sessions currently open");
61 plb.add_u64(l_mdssm_session_stale, "sessions_stale",
62 "Sessions currently stale");
63 plb.add_u64(l_mdssm_total_load, "total_load", "Total Load");
64 plb.add_u64(l_mdssm_avg_load, "average_load", "Average Load");
65 plb.add_u64(l_mdssm_avg_session_uptime, "avg_session_uptime",
66 "Average session uptime");
67
7c673cae
FG
68 logger = plb.create_perf_counters();
69 g_ceph_context->get_perfcounters_collection()->add(logger);
70}
71
72void SessionMap::dump()
73{
74 dout(10) << "dump" << dendl;
75 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
76 p != session_map.end();
77 ++p)
78 dout(10) << p->first << " " << p->second
79 << " state " << p->second->get_state_name()
80 << " completed " << p->second->info.completed_requests
f67539c2 81 << " free_prealloc_inos " << p->second->free_prealloc_inos
9f95a23c 82 << " delegated_inos " << p->second->delegated_inos
7c673cae
FG
83 << dendl;
84}
85
86
87// ----------------
88// LOAD
89
90
91object_t SessionMap::get_object_name() const
92{
93 char s[30];
94 snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid()));
95 return object_t(s);
96}
97
98namespace {
99class C_IO_SM_Load : public SessionMapIOContext {
100public:
101 const bool first; //< Am I the initial (header) load?
102 int header_r; //< Return value from OMAP header read
103 int values_r; //< Return value from OMAP value read
104 bufferlist header_bl;
105 std::map<std::string, bufferlist> session_vals;
106 bool more_session_vals = false;
107
108 C_IO_SM_Load(SessionMap *cm, const bool f)
109 : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
110
111 void finish(int r) override {
112 sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals,
113 more_session_vals);
114 }
91327a77
AA
115 void print(ostream& out) const override {
116 out << "session_load";
117 }
7c673cae
FG
118};
119}
120
121
122/**
123 * Decode OMAP header. Call this once when loading.
124 */
125void SessionMapStore::decode_header(
126 bufferlist &header_bl)
127{
11fdf7f2 128 auto q = header_bl.cbegin();
7c673cae 129 DECODE_START(1, q)
11fdf7f2 130 decode(version, q);
7c673cae
FG
131 DECODE_FINISH(q);
132}
133
134void SessionMapStore::encode_header(
135 bufferlist *header_bl)
136{
137 ENCODE_START(1, 1, *header_bl);
11fdf7f2 138 encode(version, *header_bl);
7c673cae
FG
139 ENCODE_FINISH(*header_bl);
140}
141
142/**
143 * Decode and insert some serialized OMAP values. Call this
144 * repeatedly to insert batched loads.
145 */
146void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
147{
148 for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
149 i != session_vals.end(); ++i) {
150
151 entity_inst_t inst;
152
153 bool parsed = inst.name.parse(i->first);
154 if (!parsed) {
155 derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
156 throw buffer::malformed_input("Corrupt entity name in sessionmap");
157 }
158
159 Session *s = get_or_add_session(inst);
91327a77 160 if (s->is_closed()) {
7c673cae 161 s->set_state(Session::STATE_OPEN);
91327a77
AA
162 s->set_load_avg_decay_rate(decay_rate);
163 }
11fdf7f2 164 auto q = i->second.cbegin();
7c673cae
FG
165 s->decode(q);
166 }
167}
168
169/**
170 * An OMAP read finished.
171 */
172void SessionMap::_load_finish(
173 int operation_r,
174 int header_r,
175 int values_r,
176 bool first,
177 bufferlist &header_bl,
178 std::map<std::string, bufferlist> &session_vals,
179 bool more_session_vals)
180{
181 if (operation_r < 0) {
182 derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
183 mds->clog->error() << "error reading sessionmap '" << get_object_name()
184 << "' " << operation_r << " ("
185 << cpp_strerror(operation_r) << ")";
186 mds->damaged();
187 ceph_abort(); // Should be unreachable because damaged() calls respawn()
188 }
189
190 // Decode header
191 if (first) {
192 if (header_r != 0) {
193 derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
194 mds->clog->error() << "error reading sessionmap header "
195 << header_r << " (" << cpp_strerror(header_r) << ")";
196 mds->damaged();
197 ceph_abort(); // Should be unreachable because damaged() calls respawn()
198 }
199
200 if(header_bl.length() == 0) {
201 dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
202 load_legacy();
203 return;
204 }
205
206 try {
207 decode_header(header_bl);
208 } catch (buffer::error &e) {
209 mds->clog->error() << "corrupt sessionmap header: " << e.what();
210 mds->damaged();
211 ceph_abort(); // Should be unreachable because damaged() calls respawn()
212 }
213 dout(10) << __func__ << " loaded version " << version << dendl;
214 }
215
216 if (values_r != 0) {
217 derr << __func__ << ": error reading values: "
218 << cpp_strerror(values_r) << dendl;
219 mds->clog->error() << "error reading sessionmap values: "
220 << values_r << " (" << cpp_strerror(values_r) << ")";
221 mds->damaged();
222 ceph_abort(); // Should be unreachable because damaged() calls respawn()
223 }
224
225 // Decode session_vals
226 try {
227 decode_values(session_vals);
228 } catch (buffer::error &e) {
229 mds->clog->error() << "corrupt sessionmap values: " << e.what();
230 mds->damaged();
231 ceph_abort(); // Should be unreachable because damaged() calls respawn()
232 }
233
234 if (more_session_vals) {
235 // Issue another read if we're not at the end of the omap
236 const std::string last_key = session_vals.rbegin()->first;
237 dout(10) << __func__ << ": continue omap load from '"
238 << last_key << "'" << dendl;
239 object_t oid = get_object_name();
b3b6e05e 240 object_locator_t oloc(mds->get_metadata_pool());
7c673cae
FG
241 C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
242 ObjectOperation op;
11fdf7f2 243 op.omap_get_vals(last_key, "", g_conf()->mds_sessionmap_keys_per_op,
7c673cae
FG
244 &c->session_vals, &c->more_session_vals, &c->values_r);
245 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
246 new C_OnFinisher(c, mds->finisher));
247 } else {
248 // I/O is complete. Update `by_state`
249 dout(10) << __func__ << ": omap load complete" << dendl;
250 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
251 i != session_map.end(); ++i) {
252 Session *s = i->second;
253 auto by_state_entry = by_state.find(s->get_state());
254 if (by_state_entry == by_state.end())
255 by_state_entry = by_state.emplace(s->get_state(),
256 new xlist<Session*>).first;
257 by_state_entry->second->push_back(&s->item_session_list);
258 }
259
260 // Population is complete. Trigger load waiters.
261 dout(10) << __func__ << ": v " << version
262 << ", " << session_map.size() << " sessions" << dendl;
263 projected = committing = committed = version;
264 dump();
265 finish_contexts(g_ceph_context, waiting_for_load);
266 }
267}
268
269/**
270 * Populate session state from OMAP records in this
271 * rank's sessionmap object.
272 */
11fdf7f2 273void SessionMap::load(MDSContext *onload)
7c673cae
FG
274{
275 dout(10) << "load" << dendl;
276
277 if (onload)
278 waiting_for_load.push_back(onload);
279
280 C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
281 object_t oid = get_object_name();
b3b6e05e 282 object_locator_t oloc(mds->get_metadata_pool());
7c673cae
FG
283
284 ObjectOperation op;
285 op.omap_get_header(&c->header_bl, &c->header_r);
11fdf7f2 286 op.omap_get_vals("", "", g_conf()->mds_sessionmap_keys_per_op,
7c673cae
FG
287 &c->session_vals, &c->more_session_vals, &c->values_r);
288
289 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
290}
291
292namespace {
293class C_IO_SM_LoadLegacy : public SessionMapIOContext {
294public:
295 bufferlist bl;
296 explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
297 void finish(int r) override {
298 sessionmap->_load_legacy_finish(r, bl);
299 }
91327a77
AA
300 void print(ostream& out) const override {
301 out << "session_load_legacy";
302 }
7c673cae
FG
303};
304}
305
306
307/**
308 * Load legacy (object data blob) SessionMap format, assuming
309 * that waiting_for_load has already been populated with
310 * the relevant completion. This is the fallback if we do not
311 * find an OMAP header when attempting to load normally.
312 */
313void SessionMap::load_legacy()
314{
315 dout(10) << __func__ << dendl;
316
317 C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
318 object_t oid = get_object_name();
b3b6e05e 319 object_locator_t oloc(mds->get_metadata_pool());
7c673cae
FG
320
321 mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
322 new C_OnFinisher(c, mds->finisher));
323}
324
325void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
326{
11fdf7f2 327 auto blp = bl.cbegin();
7c673cae
FG
328 if (r < 0) {
329 derr << "_load_finish got " << cpp_strerror(r) << dendl;
11fdf7f2 330 ceph_abort_msg("failed to load sessionmap");
7c673cae
FG
331 }
332 dump();
333 decode_legacy(blp); // note: this sets last_cap_renew = now()
334 dout(10) << "_load_finish v " << version
335 << ", " << session_map.size() << " sessions, "
336 << bl.length() << " bytes"
337 << dendl;
338 projected = committing = committed = version;
339 dump();
340
341 // Mark all sessions dirty, so that on next save() we will write
342 // a complete OMAP version of the data loaded from the legacy format
343 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
344 i != session_map.end(); ++i) {
345 // Don't use mark_dirty because on this occasion we want to ignore the
346 // keys_per_op limit and do one big write (upgrade must be atomic)
347 dirty_sessions.insert(i->first);
348 }
349 loaded_legacy = true;
350
351 finish_contexts(g_ceph_context, waiting_for_load);
352}
353
354
355// ----------------
356// SAVE
357
358namespace {
359class C_IO_SM_Save : public SessionMapIOContext {
360 version_t version;
361public:
362 C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {}
363 void finish(int r) override {
364 if (r != 0) {
365 get_mds()->handle_write_error(r);
366 } else {
367 sessionmap->_save_finish(version);
368 }
369 }
91327a77
AA
370 void print(ostream& out) const override {
371 out << "session_save";
372 }
7c673cae
FG
373};
374}
375
11fdf7f2 376void SessionMap::save(MDSContext *onsave, version_t needv)
7c673cae
FG
377{
378 dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
379
380 if (needv && committing >= needv) {
11fdf7f2 381 ceph_assert(committing > committed);
7c673cae
FG
382 commit_waiters[committing].push_back(onsave);
383 return;
384 }
385
386 commit_waiters[version].push_back(onsave);
387
388 committing = version;
389 SnapContext snapc;
390 object_t oid = get_object_name();
b3b6e05e 391 object_locator_t oloc(mds->get_metadata_pool());
7c673cae
FG
392
393 ObjectOperation op;
394
395 /* Compose OSD OMAP transaction for full write */
396 bufferlist header_bl;
397 encode_header(&header_bl);
398 op.omap_set_header(header_bl);
399
400 /* If we loaded a legacy sessionmap, then erase the old data. If
401 * an old-versioned MDS tries to read it, it'll fail out safely
402 * with an end_of_buffer exception */
403 if (loaded_legacy) {
404 dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
405 op.truncate(0);
406 loaded_legacy = false; // only need to truncate once.
407 }
408
409 dout(20) << " updating keys:" << dendl;
410 map<string, bufferlist> to_set;
411 for(std::set<entity_name_t>::iterator i = dirty_sessions.begin();
412 i != dirty_sessions.end(); ++i) {
413 const entity_name_t name = *i;
414 Session *session = session_map[name];
415
416 if (session->is_open() ||
417 session->is_closing() ||
418 session->is_stale() ||
419 session->is_killing()) {
420 dout(20) << " " << name << dendl;
421 // Serialize K
f67539c2
TL
422 CachedStackStringStream css;
423 *css << name;
7c673cae
FG
424
425 // Serialize V
426 bufferlist bl;
427 session->info.encode(bl, mds->mdsmap->get_up_features());
428
429 // Add to RADOS op
f67539c2 430 to_set[std::string(css->strv())] = bl;
7c673cae
FG
431
432 session->clear_dirty_completed_requests();
433 } else {
434 dout(20) << " " << name << " (ignoring)" << dendl;
435 }
436 }
437 if (!to_set.empty()) {
438 op.omap_set(to_set);
439 }
440
441 dout(20) << " removing keys:" << dendl;
442 set<string> to_remove;
443 for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
444 i != null_sessions.end(); ++i) {
445 dout(20) << " " << *i << dendl;
f67539c2
TL
446 CachedStackStringStream css;
447 *css << *i;
448 to_remove.insert(css->str());
7c673cae
FG
449 }
450 if (!to_remove.empty()) {
451 op.omap_rm_keys(to_remove);
452 }
453
454 dirty_sessions.clear();
455 null_sessions.clear();
456
457 mds->objecter->mutate(oid, oloc, op, snapc,
458 ceph::real_clock::now(),
459 0,
460 new C_OnFinisher(new C_IO_SM_Save(this, version),
461 mds->finisher));
462}
463
464void SessionMap::_save_finish(version_t v)
465{
466 dout(10) << "_save_finish v" << v << dendl;
467 committed = v;
468
469 finish_contexts(g_ceph_context, commit_waiters[v]);
470 commit_waiters.erase(v);
471}
472
473
474/**
475 * Deserialize sessions, and update by_state index
476 */
11fdf7f2 477void SessionMap::decode_legacy(bufferlist::const_iterator &p)
7c673cae
FG
478{
479 // Populate `sessions`
480 SessionMapStore::decode_legacy(p);
481
482 // Update `by_state`
483 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
484 i != session_map.end(); ++i) {
485 Session *s = i->second;
486 auto by_state_entry = by_state.find(s->get_state());
487 if (by_state_entry == by_state.end())
488 by_state_entry = by_state.emplace(s->get_state(),
489 new xlist<Session*>).first;
490 by_state_entry->second->push_back(&s->item_session_list);
491 }
492}
493
494uint64_t SessionMap::set_state(Session *session, int s) {
495 if (session->state != s) {
496 session->set_state(s);
497 auto by_state_entry = by_state.find(s);
498 if (by_state_entry == by_state.end())
499 by_state_entry = by_state.emplace(s, new xlist<Session*>).first;
500 by_state_entry->second->push_back(&session->item_session_list);
91327a77
AA
501
502 if (session->is_open() || session->is_stale()) {
503 session->set_load_avg_decay_rate(decay_rate);
504 }
505
506 // refresh number of sessions for states which have perf
507 // couters associated
508 logger->set(l_mdssm_session_open,
509 get_session_count_in_state(Session::STATE_OPEN));
510 logger->set(l_mdssm_session_stale,
511 get_session_count_in_state(Session::STATE_STALE));
7c673cae 512 }
91327a77 513
7c673cae
FG
514 return session->get_state_seq();
515}
516
11fdf7f2 517void SessionMapStore::decode_legacy(bufferlist::const_iterator& p)
7c673cae 518{
91327a77 519 auto now = clock::now();
7c673cae 520 uint64_t pre;
11fdf7f2 521 decode(pre, p);
7c673cae
FG
522 if (pre == (uint64_t)-1) {
523 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
11fdf7f2 524 ceph_assert(struct_v >= 2);
7c673cae 525
11fdf7f2 526 decode(version, p);
7c673cae
FG
527
528 while (!p.end()) {
529 entity_inst_t inst;
11fdf7f2 530 decode(inst.name, p);
7c673cae 531 Session *s = get_or_add_session(inst);
91327a77 532 if (s->is_closed()) {
7c673cae 533 s->set_state(Session::STATE_OPEN);
91327a77
AA
534 s->set_load_avg_decay_rate(decay_rate);
535 }
7c673cae
FG
536 s->decode(p);
537 }
538
539 DECODE_FINISH(p);
540 } else {
541 // --- old format ----
542 version = pre;
543
544 // this is a meaningless upper bound. can be ignored.
545 __u32 n;
11fdf7f2 546 decode(n, p);
7c673cae
FG
547
548 while (n-- && !p.end()) {
a8e16298
TL
549 auto p2 = p;
550 Session *s = new Session(ConnectionRef());
7c673cae 551 s->info.decode(p);
92f5a8d4
TL
552 {
553 auto& name = s->info.inst.name;
554 auto it = session_map.find(name);
555 if (it != session_map.end()) {
556 // eager client connected too fast! aie.
557 dout(10) << " already had session for " << name << ", recovering" << dendl;
558 delete s;
559 s = it->second;
560 p = p2;
561 s->info.decode(p);
562 } else {
563 it->second = s;
564 }
7c673cae
FG
565 }
566 s->set_state(Session::STATE_OPEN);
91327a77 567 s->set_load_avg_decay_rate(decay_rate);
7c673cae
FG
568 s->last_cap_renew = now;
569 }
570 }
571}
572
adb31ebb 573void Session::dump(Formatter *f, bool cap_dump) const
92f5a8d4
TL
574{
575 f->dump_int("id", info.inst.name.num());
576 f->dump_object("entity", info.inst);
577 f->dump_string("state", get_state_name());
578 f->dump_int("num_leases", leases.size());
579 f->dump_int("num_caps", caps.size());
adb31ebb
TL
580 if (cap_dump) {
581 f->open_array_section("caps");
582 for (const auto& cap : caps) {
583 f->dump_object("cap", *cap);
584 }
585 f->close_section();
586 }
92f5a8d4
TL
587 if (is_open() || is_stale()) {
588 f->dump_unsigned("request_load_avg", get_load_avg());
589 }
590 f->dump_float("uptime", get_session_uptime());
591 f->dump_unsigned("requests_in_flight", get_request_count());
b3b6e05e
TL
592 f->dump_unsigned("num_completed_requests", get_num_completed_requests());
593 f->dump_unsigned("num_completed_flushes", get_num_completed_flushes());
92f5a8d4
TL
594 f->dump_bool("reconnecting", reconnecting);
595 f->dump_object("recall_caps", recall_caps);
596 f->dump_object("release_caps", release_caps);
597 f->dump_object("recall_caps_throttle", recall_caps_throttle);
598 f->dump_object("recall_caps_throttle2o", recall_caps_throttle2o);
599 f->dump_object("session_cache_liveness", session_cache_liveness);
adb31ebb 600 f->dump_object("cap_acquisition", cap_acquisition);
f67539c2
TL
601
602 f->open_array_section("delegated_inos");
603 for (const auto& [start, len] : delegated_inos) {
604 f->open_object_section("ino_range");
605 f->dump_stream("start") << start;
606 f->dump_unsigned("length", len);
607 f->close_section();
608 }
609 f->close_section();
610
92f5a8d4
TL
611 info.dump(f);
612}
613
7c673cae
FG
614void SessionMapStore::dump(Formatter *f) const
615{
92f5a8d4
TL
616 f->open_array_section("sessions");
617 for (const auto& p : session_map) {
618 f->dump_object("session", *p.second);
7c673cae
FG
619 }
620 f->close_section(); // Sessions
621}
622
9f95a23c 623void SessionMapStore::generate_test_instances(std::list<SessionMapStore*>& ls)
7c673cae
FG
624{
625 // pretty boring for now
626 ls.push_back(new SessionMapStore());
627}
628
629void SessionMap::wipe()
630{
631 dout(1) << "wipe start" << dendl;
632 dump();
633 while (!session_map.empty()) {
634 Session *s = session_map.begin()->second;
635 remove_session(s);
636 }
637 version = ++projected;
638 dout(1) << "wipe result" << dendl;
639 dump();
640 dout(1) << "wipe done" << dendl;
641}
642
643void SessionMap::wipe_ino_prealloc()
644{
645 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
646 p != session_map.end();
647 ++p) {
648 p->second->pending_prealloc_inos.clear();
f67539c2 649 p->second->free_prealloc_inos.clear();
9f95a23c 650 p->second->delegated_inos.clear();
7c673cae 651 p->second->info.prealloc_inos.clear();
7c673cae
FG
652 }
653 projected = ++version;
654}
655
656void SessionMap::add_session(Session *s)
657{
658 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
659
11fdf7f2 660 ceph_assert(session_map.count(s->info.inst.name) == 0);
7c673cae
FG
661 session_map[s->info.inst.name] = s;
662 auto by_state_entry = by_state.find(s->state);
663 if (by_state_entry == by_state.end())
664 by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first;
665 by_state_entry->second->push_back(&s->item_session_list);
666 s->get();
667
91327a77
AA
668 update_average_birth_time(*s);
669
7c673cae
FG
670 logger->set(l_mdssm_session_count, session_map.size());
671 logger->inc(l_mdssm_session_add);
672}
673
674void SessionMap::remove_session(Session *s)
675{
676 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
677
91327a77
AA
678 update_average_birth_time(*s, false);
679
7c673cae
FG
680 s->trim_completed_requests(0);
681 s->item_session_list.remove_myself();
682 session_map.erase(s->info.inst.name);
683 dirty_sessions.erase(s->info.inst.name);
684 null_sessions.insert(s->info.inst.name);
685 s->put();
686
687 logger->set(l_mdssm_session_count, session_map.size());
688 logger->inc(l_mdssm_session_remove);
689}
690
691void SessionMap::touch_session(Session *session)
692{
693 dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl;
694
695 // Move to the back of the session list for this state (should
696 // already be on a list courtesy of add_session and set_state)
11fdf7f2 697 ceph_assert(session->item_session_list.is_on_list());
7c673cae
FG
698 auto by_state_entry = by_state.find(session->state);
699 if (by_state_entry == by_state.end())
700 by_state_entry = by_state.emplace(session->state,
701 new xlist<Session*>).first;
702 by_state_entry->second->push_back(&session->item_session_list);
703
91327a77 704 session->last_cap_renew = clock::now();
7c673cae
FG
705}
706
81eedcae 707void SessionMap::_mark_dirty(Session *s, bool may_save)
7c673cae 708{
31f18b77
FG
709 if (dirty_sessions.count(s->info.inst.name))
710 return;
711
81eedcae
TL
712 if (may_save &&
713 dirty_sessions.size() >= g_conf()->mds_sessionmap_keys_per_op) {
7c673cae
FG
714 // Pre-empt the usual save() call from journal segment trim, in
715 // order to avoid building up an oversized OMAP update operation
716 // from too many sessions modified at once
717 save(new C_MDSInternalNoop, version);
718 }
719
31f18b77 720 null_sessions.erase(s->info.inst.name);
7c673cae
FG
721 dirty_sessions.insert(s->info.inst.name);
722}
723
81eedcae 724void SessionMap::mark_dirty(Session *s, bool may_save)
7c673cae
FG
725{
726 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
727 << " v=" << version << dendl;
728
81eedcae 729 _mark_dirty(s, may_save);
7c673cae
FG
730 version++;
731 s->pop_pv(version);
732}
733
734void SessionMap::replay_dirty_session(Session *s)
735{
736 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
737 << " v=" << version << dendl;
738
81eedcae 739 _mark_dirty(s, false);
7c673cae
FG
740
741 replay_advance_version();
742}
743
744void SessionMap::replay_advance_version()
745{
746 version++;
747 projected = version;
748}
749
81eedcae
TL
750void SessionMap::replay_open_sessions(version_t event_cmapv,
751 map<client_t,entity_inst_t>& client_map,
752 map<client_t,client_metadata_t>& client_metadata_map)
753{
754 unsigned already_saved;
755
756 if (version + client_map.size() < event_cmapv)
757 goto bad;
758
759 // Server::finish_force_open_sessions() marks sessions dirty one by one.
760 // Marking a session dirty may flush all existing dirty sessions. So it's
761 // possible that some sessions are already saved in sessionmap.
762 already_saved = client_map.size() - (event_cmapv - version);
763 for (const auto& p : client_map) {
764 Session *s = get_or_add_session(p.second);
765 auto q = client_metadata_map.find(p.first);
766 if (q != client_metadata_map.end())
767 s->info.client_metadata.merge(q->second);
768
769 if (already_saved > 0) {
770 if (s->is_closed())
771 goto bad;
772
773 --already_saved;
774 continue;
775 }
776
777 set_state(s, Session::STATE_OPEN);
778 replay_dirty_session(s);
779 }
780 return;
781
782bad:
783 mds->clog->error() << "error replaying open sessions(" << client_map.size()
784 << ") sessionmap v " << event_cmapv << " table " << version;
785 ceph_assert(g_conf()->mds_wipe_sessions);
786 mds->sessionmap.wipe();
787 mds->sessionmap.set_version(event_cmapv);
788}
789
7c673cae
FG
790version_t SessionMap::mark_projected(Session *s)
791{
792 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
793 << " pv=" << projected << " -> " << projected + 1 << dendl;
794 ++projected;
795 s->push_pv(projected);
796 return projected;
797}
798
799namespace {
800class C_IO_SM_Save_One : public SessionMapIOContext {
11fdf7f2 801 MDSContext *on_safe;
7c673cae 802public:
11fdf7f2 803 C_IO_SM_Save_One(SessionMap *cm, MDSContext *on_safe_)
7c673cae
FG
804 : SessionMapIOContext(cm), on_safe(on_safe_) {}
805 void finish(int r) override {
806 if (r != 0) {
807 get_mds()->handle_write_error(r);
808 } else {
809 on_safe->complete(r);
810 }
811 }
91327a77
AA
812 void print(ostream& out) const override {
813 out << "session_save_one";
814 }
7c673cae
FG
815};
816}
817
818
819void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
820 MDSGatherBuilder *gather_bld)
821{
11fdf7f2 822 ceph_assert(gather_bld != NULL);
7c673cae
FG
823
824 std::vector<entity_name_t> write_sessions;
825
826 // Decide which sessions require a write
827 for (std::set<entity_name_t>::iterator i = tgt_sessions.begin();
828 i != tgt_sessions.end(); ++i) {
829 const entity_name_t &session_id = *i;
830
831 if (session_map.count(session_id) == 0) {
832 // Session isn't around any more, never mind.
833 continue;
834 }
835
836 Session *session = session_map[session_id];
837 if (!session->has_dirty_completed_requests()) {
838 // Session hasn't had completed_requests
839 // modified since last write, no need to
840 // write it now.
841 continue;
842 }
843
844 if (dirty_sessions.count(session_id) > 0) {
845 // Session is already dirtied, will be written, no
846 // need to pre-empt that.
847 continue;
848 }
849 // Okay, passed all our checks, now we write
850 // this session out. The version we write
851 // into the OMAP may now be higher-versioned
852 // than the version in the header, but that's
853 // okay because it's never a problem to have
854 // an overly-fresh copy of a session.
855 write_sessions.push_back(*i);
856 }
857
858 dout(4) << __func__ << ": writing " << write_sessions.size() << dendl;
859
860 // Batch writes into mds_sessionmap_keys_per_op
11fdf7f2 861 const uint32_t kpo = g_conf()->mds_sessionmap_keys_per_op;
7c673cae
FG
862 map<string, bufferlist> to_set;
863 for (uint32_t i = 0; i < write_sessions.size(); ++i) {
7c673cae
FG
864 const entity_name_t &session_id = write_sessions[i];
865 Session *session = session_map[session_id];
866 session->clear_dirty_completed_requests();
867
868 // Serialize K
f67539c2
TL
869 CachedStackStringStream css;
870 *css << session_id;
7c673cae
FG
871
872 // Serialize V
873 bufferlist bl;
874 session->info.encode(bl, mds->mdsmap->get_up_features());
875
876 // Add to RADOS op
f67539c2 877 to_set[css->str()] = bl;
7c673cae
FG
878
879 // Complete this write transaction?
880 if (i == write_sessions.size() - 1
881 || i % kpo == kpo - 1) {
882 ObjectOperation op;
883 op.omap_set(to_set);
11fdf7f2 884 to_set.clear(); // clear to start a new transaction
7c673cae
FG
885
886 SnapContext snapc;
887 object_t oid = get_object_name();
b3b6e05e 888 object_locator_t oloc(mds->get_metadata_pool());
11fdf7f2 889 MDSContext *on_safe = gather_bld->new_sub();
7c673cae 890 mds->objecter->mutate(oid, oloc, op, snapc,
91327a77
AA
891 ceph::real_clock::now(), 0,
892 new C_OnFinisher(
7c673cae
FG
893 new C_IO_SM_Save_One(this, on_safe),
894 mds->finisher));
895 }
896 }
897}
898
899// =================
900// Session
901
902#undef dout_prefix
903#define dout_prefix *_dout << "Session "
904
905/**
906 * Calculate the length of the `requests` member list,
907 * because elist does not have a size() method.
908 *
92f5a8d4 909 * O(N) runtime.
7c673cae 910 */
92f5a8d4 911size_t Session::get_request_count() const
7c673cae
FG
912{
913 size_t result = 0;
9f95a23c 914 for (auto p = requests.begin(); !p.end(); ++p)
7c673cae 915 ++result;
7c673cae
FG
916 return result;
917}
918
919/**
920 * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
921 * with n_caps equal to the number of caps that were released
922 * in the message. Used to update state about how many caps a
923 * client has released since it was last instructed to RECALL_STATE.
924 */
925void Session::notify_cap_release(size_t n_caps)
926{
11fdf7f2
TL
927 recall_caps.hit(-(double)n_caps);
928 release_caps.hit(n_caps);
7c673cae
FG
929}
930
931/**
932 * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
933 * message is sent to the client. Update our recall-related state
934 * in order to generate health metrics if the session doesn't see
935 * a commensurate number of calls to ::notify_cap_release
936 */
a8e16298 937uint64_t Session::notify_recall_sent(size_t new_limit)
7c673cae 938{
a8e16298
TL
939 const auto num_caps = caps.size();
940 ceph_assert(new_limit < num_caps); // Behaviour of Server::recall_client_state
941 const auto count = num_caps-new_limit;
942 uint64_t new_change;
943 if (recall_limit != new_limit) {
944 new_change = count;
7c673cae 945 } else {
a8e16298 946 new_change = 0; /* no change! */
7c673cae 947 }
7c673cae 948
a8e16298
TL
949 /* Always hit the session counter as a RECALL message is still sent to the
950 * client and we do not want the MDS to burn its global counter tokens on a
951 * session that is not releasing caps (i.e. allow the session counter to
952 * throttle future RECALL messages).
953 */
11fdf7f2
TL
954 recall_caps_throttle.hit(count);
955 recall_caps_throttle2o.hit(count);
956 recall_caps.hit(count);
a8e16298 957 return new_change;
7c673cae
FG
958}
959
960/**
961 * Use client metadata to generate a somewhat-friendlier
962 * name for the client than its session ID.
963 *
964 * This is *not* guaranteed to be unique, and any machine
965 * consumers of session-related output should always use
966 * the session ID as a primary capacity and use this only
967 * as a presentation hint.
968 */
969void Session::_update_human_name()
970{
971 auto info_client_metadata_entry = info.client_metadata.find("hostname");
972 if (info_client_metadata_entry != info.client_metadata.end()) {
973 // Happy path, refer to clients by hostname
974 human_name = info_client_metadata_entry->second;
975 if (!info.auth_name.has_default_id()) {
976 // When a non-default entity ID is set by the user, assume they
977 // would like to see it in references to the client, if it's
978 // reasonable short. Limit the length because we don't want
979 // to put e.g. uuid-generated names into a "human readable"
980 // rendering.
981 const int arbitrarily_short = 16;
982 if (info.auth_name.get_id().size() < arbitrarily_short) {
983 human_name += std::string(":") + info.auth_name.get_id();
984 }
985 }
986 } else {
987 // Fallback, refer to clients by ID e.g. client.4567
988 human_name = stringify(info.inst.name.num());
989 }
990}
991
11fdf7f2 992void Session::decode(bufferlist::const_iterator &p)
7c673cae
FG
993{
994 info.decode(p);
995
f67539c2
TL
996 free_prealloc_inos = info.prealloc_inos;
997
7c673cae
FG
998 _update_human_name();
999}
1000
1001int Session::check_access(CInode *in, unsigned mask,
1002 int caller_uid, int caller_gid,
1003 const vector<uint64_t> *caller_gid_list,
1004 int new_uid, int new_gid)
1005{
1006 string path;
1007 CInode *diri = NULL;
1008 if (!in->is_base())
1009 diri = in->get_projected_parent_dn()->get_dir()->get_inode();
1010 if (diri && diri->is_stray()){
11fdf7f2 1011 path = in->get_projected_inode()->stray_prior_path;
7c673cae
FG
1012 dout(20) << __func__ << " stray_prior_path " << path << dendl;
1013 } else {
1014 in->make_path_string(path, true);
1015 dout(20) << __func__ << " path " << path << dendl;
1016 }
1017 if (path.length())
1018 path = path.substr(1); // drop leading /
1019
f67539c2
TL
1020 const auto& inode = in->get_inode();
1021 if (in->is_dir() &&
1022 inode->has_layout() &&
1023 inode->layout.pool_ns.length() &&
7c673cae
FG
1024 !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
1025 dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl;
f67539c2 1026 return -CEPHFS_EIO;
7c673cae
FG
1027 }
1028
f67539c2 1029 if (!auth_caps.is_capable(path, inode->uid, inode->gid, inode->mode,
7c673cae 1030 caller_uid, caller_gid, caller_gid_list, mask,
11fdf7f2 1031 new_uid, new_gid,
92f5a8d4 1032 info.inst.addr)) {
f67539c2 1033 return -CEPHFS_EACCES;
7c673cae
FG
1034 }
1035 return 0;
1036}
1037
91327a77
AA
1038// track total and per session load
1039void SessionMap::hit_session(Session *session) {
1040 uint64_t sessions = get_session_count_in_state(Session::STATE_OPEN) +
f91f0fd5
TL
1041 get_session_count_in_state(Session::STATE_STALE) +
1042 get_session_count_in_state(Session::STATE_CLOSING);
11fdf7f2 1043 ceph_assert(sessions != 0);
91327a77 1044
11fdf7f2 1045 double total_load = total_load_avg.hit();
91327a77
AA
1046 double avg_load = total_load / sessions;
1047
1048 logger->set(l_mdssm_total_load, (uint64_t)total_load);
1049 logger->set(l_mdssm_avg_load, (uint64_t)avg_load);
1050
1051 session->hit_session();
1052}
1053
92f5a8d4 1054void SessionMap::handle_conf_change(const std::set<std::string>& changed)
a8e16298 1055{
11fdf7f2
TL
1056 auto apply_to_open_sessions = [this](auto f) {
1057 if (auto it = by_state.find(Session::STATE_OPEN); it != by_state.end()) {
a8e16298 1058 for (const auto &session : *(it->second)) {
11fdf7f2 1059 f(session);
a8e16298
TL
1060 }
1061 }
11fdf7f2 1062 if (auto it = by_state.find(Session::STATE_STALE); it != by_state.end()) {
a8e16298 1063 for (const auto &session : *(it->second)) {
11fdf7f2 1064 f(session);
a8e16298
TL
1065 }
1066 }
11fdf7f2
TL
1067 };
1068
1069 if (changed.count("mds_request_load_average_decay_rate")) {
1070 auto d = g_conf().get_val<double>("mds_request_load_average_decay_rate");
11fdf7f2
TL
1071
1072 decay_rate = d;
1073 total_load_avg = DecayCounter(d);
1074
1075 auto mut = [d](auto s) {
1076 s->set_load_avg_decay_rate(d);
1077 };
1078 apply_to_open_sessions(mut);
a8e16298
TL
1079 }
1080 if (changed.count("mds_recall_max_decay_rate")) {
11fdf7f2
TL
1081 auto d = g_conf().get_val<double>("mds_recall_max_decay_rate");
1082 auto mut = [d](auto s) {
1083 s->recall_caps_throttle = DecayCounter(d);
1084 };
1085 apply_to_open_sessions(mut);
a8e16298
TL
1086 }
1087 if (changed.count("mds_recall_warning_decay_rate")) {
11fdf7f2
TL
1088 auto d = g_conf().get_val<double>("mds_recall_warning_decay_rate");
1089 auto mut = [d](auto s) {
1090 s->recall_caps = DecayCounter(d);
1091 s->release_caps = DecayCounter(d);
1092 };
1093 apply_to_open_sessions(mut);
91327a77 1094 }
92f5a8d4
TL
1095 if (changed.count("mds_session_cache_liveness_decay_rate")) {
1096 auto d = g_conf().get_val<double>("mds_session_cache_liveness_decay_rate");
1097 auto mut = [d](auto s) {
1098 s->session_cache_liveness = DecayCounter(d);
1099 s->session_cache_liveness.hit(s->caps.size()); /* so the MDS doesn't immediately start trimming a new session */
1100 };
1101 apply_to_open_sessions(mut);
1102 }
adb31ebb
TL
1103 if (changed.count("mds_session_cap_acquisition_decay_rate")) {
1104 auto d = g_conf().get_val<double>("mds_session_cap_acquisition_decay_rate");
1105 auto mut = [d](auto s) {
1106 s->cap_acquisition = DecayCounter(d);
1107 };
1108 apply_to_open_sessions(mut);
1109 }
91327a77
AA
1110}
1111
1112void SessionMap::update_average_session_age() {
1113 if (!session_map.size()) {
1114 return;
1115 }
1116
1117 double avg_uptime = std::chrono::duration<double>(clock::now()-avg_birth_time).count();
1118 logger->set(l_mdssm_avg_session_uptime, (uint64_t)avg_uptime);
1119}
1120
7c673cae
FG
1121int SessionFilter::parse(
1122 const std::vector<std::string> &args,
f67539c2 1123 std::ostream *ss)
7c673cae 1124{
11fdf7f2 1125 ceph_assert(ss != NULL);
7c673cae
FG
1126
1127 for (const auto &s : args) {
1128 dout(20) << __func__ << " parsing filter '" << s << "'" << dendl;
1129
1130 auto eq = s.find("=");
1131 if (eq == std::string::npos || eq == s.size()) {
9f95a23c
TL
1132 // allow this to be a bare id for compatibility with pre-octopus asok
1133 // 'session evict'.
1134 std::string err;
1135 id = strict_strtoll(s.c_str(), 10, &err);
1136 if (!err.empty()) {
1137 *ss << "Invalid filter '" << s << "'";
f67539c2 1138 return -CEPHFS_EINVAL;
9f95a23c
TL
1139 }
1140 return 0;
7c673cae
FG
1141 }
1142
1143 // Keys that start with this are to be taken as referring
1144 // to freeform client metadata fields.
1145 const std::string metadata_prefix("client_metadata.");
1146
1147 auto k = s.substr(0, eq);
1148 auto v = s.substr(eq + 1);
1149
1150 dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl;
1151
1152 if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0
1153 && k.size() > metadata_prefix.size()) {
1154 // Filter on arbitrary metadata key (no fixed schema for this,
1155 // so anything after the dot is a valid field to filter on)
1156 auto metadata_key = k.substr(metadata_prefix.size());
1157 metadata.insert(std::make_pair(metadata_key, v));
1158 } else if (k == "auth_name") {
1159 // Filter on client entity name
1160 auth_name = v;
1161 } else if (k == "state") {
1162 state = v;
1163 } else if (k == "id") {
1164 std::string err;
1165 id = strict_strtoll(v.c_str(), 10, &err);
1166 if (!err.empty()) {
1167 *ss << err;
f67539c2 1168 return -CEPHFS_EINVAL;
7c673cae
FG
1169 }
1170 } else if (k == "reconnecting") {
1171
1172 /**
1173 * Strict boolean parser. Allow true/false/0/1.
f67539c2 1174 * Anything else is -CEPHFS_EINVAL.
7c673cae 1175 */
11fdf7f2 1176 auto is_true = [](std::string_view bstr, bool *out) -> bool
7c673cae 1177 {
11fdf7f2 1178 ceph_assert(out != nullptr);
7c673cae
FG
1179
1180 if (bstr == "true" || bstr == "1") {
1181 *out = true;
1182 return 0;
1183 } else if (bstr == "false" || bstr == "0") {
1184 *out = false;
1185 return 0;
1186 } else {
f67539c2 1187 return -CEPHFS_EINVAL;
7c673cae
FG
1188 }
1189 };
1190
1191 bool bval;
1192 int r = is_true(v, &bval);
1193 if (r == 0) {
1194 set_reconnecting(bval);
1195 } else {
1196 *ss << "Invalid boolean value '" << v << "'";
f67539c2 1197 return -CEPHFS_EINVAL;
7c673cae
FG
1198 }
1199 } else {
1200 *ss << "Invalid filter key '" << k << "'";
f67539c2 1201 return -CEPHFS_EINVAL;
7c673cae
FG
1202 }
1203 }
1204
1205 return 0;
1206}
1207
1208bool SessionFilter::match(
1209 const Session &session,
1210 std::function<bool(client_t)> is_reconnecting) const
1211{
1212 for (const auto &m : metadata) {
1213 const auto &k = m.first;
1214 const auto &v = m.second;
11fdf7f2
TL
1215 auto it = session.info.client_metadata.find(k);
1216 if (it == session.info.client_metadata.end()) {
7c673cae
FG
1217 return false;
1218 }
11fdf7f2 1219 if (it->second != v) {
7c673cae
FG
1220 return false;
1221 }
1222 }
1223
1224 if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) {
1225 return false;
1226 }
1227
1228 if (!state.empty() && state != session.get_state_name()) {
1229 return false;
1230 }
1231
1232 if (id != 0 && id != session.info.inst.name.num()) {
1233 return false;
1234 }
1235
1236 if (reconnecting.first) {
1237 const bool am_reconnecting = is_reconnecting(session.info.inst.name.num());
1238 if (reconnecting.second != am_reconnecting) {
1239 return false;
1240 }
1241 }
1242
1243 return true;
1244}
1245
1246std::ostream& operator<<(std::ostream &out, const Session &s)
1247{
11fdf7f2 1248 if (s.get_human_name() == stringify(s.get_client())) {
7c673cae
FG
1249 out << s.get_human_name();
1250 } else {
11fdf7f2 1251 out << s.get_human_name() << " (" << std::dec << s.get_client() << ")";
7c673cae
FG
1252 }
1253 return out;
1254}
1255