]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/SessionMap.cc
import ceph 15.2.14
[ceph.git] / ceph / src / mds / SessionMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDCache.h"
17 #include "Mutation.h"
18 #include "SessionMap.h"
19 #include "osdc/Filer.h"
20 #include "common/Finisher.h"
21
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "common/DecayCounter.h"
25 #include "include/ceph_assert.h"
26 #include "include/stringify.h"
27
28 #define dout_context g_ceph_context
29 #define dout_subsys ceph_subsys_mds
30 #undef dout_prefix
31 #define dout_prefix *_dout << "mds." << rank << ".sessionmap "
32
33 namespace {
34 class SessionMapIOContext : public MDSIOContextBase
35 {
36 protected:
37 SessionMap *sessionmap;
38 MDSRank *get_mds() override {return sessionmap->mds;}
39 public:
40 explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) {
41 ceph_assert(sessionmap != NULL);
42 }
43 };
44 };
45
46 void SessionMap::register_perfcounters()
47 {
48 PerfCountersBuilder plb(g_ceph_context, "mds_sessions",
49 l_mdssm_first, l_mdssm_last);
50
51 plb.add_u64(l_mdssm_session_count, "session_count",
52 "Session count", "sess", PerfCountersBuilder::PRIO_INTERESTING);
53
54 plb.set_prio_default(PerfCountersBuilder::PRIO_USEFUL);
55 plb.add_u64_counter(l_mdssm_session_add, "session_add",
56 "Sessions added");
57 plb.add_u64_counter(l_mdssm_session_remove, "session_remove",
58 "Sessions removed");
59 plb.add_u64(l_mdssm_session_open, "sessions_open",
60 "Sessions currently open");
61 plb.add_u64(l_mdssm_session_stale, "sessions_stale",
62 "Sessions currently stale");
63 plb.add_u64(l_mdssm_total_load, "total_load", "Total Load");
64 plb.add_u64(l_mdssm_avg_load, "average_load", "Average Load");
65 plb.add_u64(l_mdssm_avg_session_uptime, "avg_session_uptime",
66 "Average session uptime");
67
68 logger = plb.create_perf_counters();
69 g_ceph_context->get_perfcounters_collection()->add(logger);
70 }
71
72 void SessionMap::dump()
73 {
74 dout(10) << "dump" << dendl;
75 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
76 p != session_map.end();
77 ++p)
78 dout(10) << p->first << " " << p->second
79 << " state " << p->second->get_state_name()
80 << " completed " << p->second->info.completed_requests
81 << " prealloc_inos " << p->second->info.prealloc_inos
82 << " delegated_inos " << p->second->delegated_inos
83 << " used_inos " << p->second->info.used_inos
84 << dendl;
85 }
86
87
88 // ----------------
89 // LOAD
90
91
92 object_t SessionMap::get_object_name() const
93 {
94 char s[30];
95 snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid()));
96 return object_t(s);
97 }
98
99 namespace {
100 class C_IO_SM_Load : public SessionMapIOContext {
101 public:
102 const bool first; //< Am I the initial (header) load?
103 int header_r; //< Return value from OMAP header read
104 int values_r; //< Return value from OMAP value read
105 bufferlist header_bl;
106 std::map<std::string, bufferlist> session_vals;
107 bool more_session_vals = false;
108
109 C_IO_SM_Load(SessionMap *cm, const bool f)
110 : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
111
112 void finish(int r) override {
113 sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals,
114 more_session_vals);
115 }
116 void print(ostream& out) const override {
117 out << "session_load";
118 }
119 };
120 }
121
122
123 /**
124 * Decode OMAP header. Call this once when loading.
125 */
126 void SessionMapStore::decode_header(
127 bufferlist &header_bl)
128 {
129 auto q = header_bl.cbegin();
130 DECODE_START(1, q)
131 decode(version, q);
132 DECODE_FINISH(q);
133 }
134
135 void SessionMapStore::encode_header(
136 bufferlist *header_bl)
137 {
138 ENCODE_START(1, 1, *header_bl);
139 encode(version, *header_bl);
140 ENCODE_FINISH(*header_bl);
141 }
142
143 /**
144 * Decode and insert some serialized OMAP values. Call this
145 * repeatedly to insert batched loads.
146 */
147 void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
148 {
149 for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
150 i != session_vals.end(); ++i) {
151
152 entity_inst_t inst;
153
154 bool parsed = inst.name.parse(i->first);
155 if (!parsed) {
156 derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
157 throw buffer::malformed_input("Corrupt entity name in sessionmap");
158 }
159
160 Session *s = get_or_add_session(inst);
161 if (s->is_closed()) {
162 s->set_state(Session::STATE_OPEN);
163 s->set_load_avg_decay_rate(decay_rate);
164 }
165 auto q = i->second.cbegin();
166 s->decode(q);
167 }
168 }
169
170 /**
171 * An OMAP read finished.
172 */
173 void SessionMap::_load_finish(
174 int operation_r,
175 int header_r,
176 int values_r,
177 bool first,
178 bufferlist &header_bl,
179 std::map<std::string, bufferlist> &session_vals,
180 bool more_session_vals)
181 {
182 if (operation_r < 0) {
183 derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
184 mds->clog->error() << "error reading sessionmap '" << get_object_name()
185 << "' " << operation_r << " ("
186 << cpp_strerror(operation_r) << ")";
187 mds->damaged();
188 ceph_abort(); // Should be unreachable because damaged() calls respawn()
189 }
190
191 // Decode header
192 if (first) {
193 if (header_r != 0) {
194 derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
195 mds->clog->error() << "error reading sessionmap header "
196 << header_r << " (" << cpp_strerror(header_r) << ")";
197 mds->damaged();
198 ceph_abort(); // Should be unreachable because damaged() calls respawn()
199 }
200
201 if(header_bl.length() == 0) {
202 dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
203 load_legacy();
204 return;
205 }
206
207 try {
208 decode_header(header_bl);
209 } catch (buffer::error &e) {
210 mds->clog->error() << "corrupt sessionmap header: " << e.what();
211 mds->damaged();
212 ceph_abort(); // Should be unreachable because damaged() calls respawn()
213 }
214 dout(10) << __func__ << " loaded version " << version << dendl;
215 }
216
217 if (values_r != 0) {
218 derr << __func__ << ": error reading values: "
219 << cpp_strerror(values_r) << dendl;
220 mds->clog->error() << "error reading sessionmap values: "
221 << values_r << " (" << cpp_strerror(values_r) << ")";
222 mds->damaged();
223 ceph_abort(); // Should be unreachable because damaged() calls respawn()
224 }
225
226 // Decode session_vals
227 try {
228 decode_values(session_vals);
229 } catch (buffer::error &e) {
230 mds->clog->error() << "corrupt sessionmap values: " << e.what();
231 mds->damaged();
232 ceph_abort(); // Should be unreachable because damaged() calls respawn()
233 }
234
235 if (more_session_vals) {
236 // Issue another read if we're not at the end of the omap
237 const std::string last_key = session_vals.rbegin()->first;
238 dout(10) << __func__ << ": continue omap load from '"
239 << last_key << "'" << dendl;
240 object_t oid = get_object_name();
241 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
242 C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
243 ObjectOperation op;
244 op.omap_get_vals(last_key, "", g_conf()->mds_sessionmap_keys_per_op,
245 &c->session_vals, &c->more_session_vals, &c->values_r);
246 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
247 new C_OnFinisher(c, mds->finisher));
248 } else {
249 // I/O is complete. Update `by_state`
250 dout(10) << __func__ << ": omap load complete" << dendl;
251 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
252 i != session_map.end(); ++i) {
253 Session *s = i->second;
254 auto by_state_entry = by_state.find(s->get_state());
255 if (by_state_entry == by_state.end())
256 by_state_entry = by_state.emplace(s->get_state(),
257 new xlist<Session*>).first;
258 by_state_entry->second->push_back(&s->item_session_list);
259 }
260
261 // Population is complete. Trigger load waiters.
262 dout(10) << __func__ << ": v " << version
263 << ", " << session_map.size() << " sessions" << dendl;
264 projected = committing = committed = version;
265 dump();
266 finish_contexts(g_ceph_context, waiting_for_load);
267 }
268 }
269
270 /**
271 * Populate session state from OMAP records in this
272 * rank's sessionmap object.
273 */
274 void SessionMap::load(MDSContext *onload)
275 {
276 dout(10) << "load" << dendl;
277
278 if (onload)
279 waiting_for_load.push_back(onload);
280
281 C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
282 object_t oid = get_object_name();
283 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
284
285 ObjectOperation op;
286 op.omap_get_header(&c->header_bl, &c->header_r);
287 op.omap_get_vals("", "", g_conf()->mds_sessionmap_keys_per_op,
288 &c->session_vals, &c->more_session_vals, &c->values_r);
289
290 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
291 }
292
293 namespace {
294 class C_IO_SM_LoadLegacy : public SessionMapIOContext {
295 public:
296 bufferlist bl;
297 explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
298 void finish(int r) override {
299 sessionmap->_load_legacy_finish(r, bl);
300 }
301 void print(ostream& out) const override {
302 out << "session_load_legacy";
303 }
304 };
305 }
306
307
308 /**
309 * Load legacy (object data blob) SessionMap format, assuming
310 * that waiting_for_load has already been populated with
311 * the relevant completion. This is the fallback if we do not
312 * find an OMAP header when attempting to load normally.
313 */
314 void SessionMap::load_legacy()
315 {
316 dout(10) << __func__ << dendl;
317
318 C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
319 object_t oid = get_object_name();
320 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
321
322 mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
323 new C_OnFinisher(c, mds->finisher));
324 }
325
326 void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
327 {
328 auto blp = bl.cbegin();
329 if (r < 0) {
330 derr << "_load_finish got " << cpp_strerror(r) << dendl;
331 ceph_abort_msg("failed to load sessionmap");
332 }
333 dump();
334 decode_legacy(blp); // note: this sets last_cap_renew = now()
335 dout(10) << "_load_finish v " << version
336 << ", " << session_map.size() << " sessions, "
337 << bl.length() << " bytes"
338 << dendl;
339 projected = committing = committed = version;
340 dump();
341
342 // Mark all sessions dirty, so that on next save() we will write
343 // a complete OMAP version of the data loaded from the legacy format
344 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
345 i != session_map.end(); ++i) {
346 // Don't use mark_dirty because on this occasion we want to ignore the
347 // keys_per_op limit and do one big write (upgrade must be atomic)
348 dirty_sessions.insert(i->first);
349 }
350 loaded_legacy = true;
351
352 finish_contexts(g_ceph_context, waiting_for_load);
353 }
354
355
356 // ----------------
357 // SAVE
358
359 namespace {
360 class C_IO_SM_Save : public SessionMapIOContext {
361 version_t version;
362 public:
363 C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {}
364 void finish(int r) override {
365 if (r != 0) {
366 get_mds()->handle_write_error(r);
367 } else {
368 sessionmap->_save_finish(version);
369 }
370 }
371 void print(ostream& out) const override {
372 out << "session_save";
373 }
374 };
375 }
376
377 void SessionMap::save(MDSContext *onsave, version_t needv)
378 {
379 dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
380
381 if (needv && committing >= needv) {
382 ceph_assert(committing > committed);
383 commit_waiters[committing].push_back(onsave);
384 return;
385 }
386
387 commit_waiters[version].push_back(onsave);
388
389 committing = version;
390 SnapContext snapc;
391 object_t oid = get_object_name();
392 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
393
394 ObjectOperation op;
395
396 /* Compose OSD OMAP transaction for full write */
397 bufferlist header_bl;
398 encode_header(&header_bl);
399 op.omap_set_header(header_bl);
400
401 /* If we loaded a legacy sessionmap, then erase the old data. If
402 * an old-versioned MDS tries to read it, it'll fail out safely
403 * with an end_of_buffer exception */
404 if (loaded_legacy) {
405 dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
406 op.truncate(0);
407 loaded_legacy = false; // only need to truncate once.
408 }
409
410 dout(20) << " updating keys:" << dendl;
411 map<string, bufferlist> to_set;
412 for(std::set<entity_name_t>::iterator i = dirty_sessions.begin();
413 i != dirty_sessions.end(); ++i) {
414 const entity_name_t name = *i;
415 Session *session = session_map[name];
416
417 if (session->is_open() ||
418 session->is_closing() ||
419 session->is_stale() ||
420 session->is_killing()) {
421 dout(20) << " " << name << dendl;
422 // Serialize K
423 std::ostringstream k;
424 k << name;
425
426 // Serialize V
427 bufferlist bl;
428 session->info.encode(bl, mds->mdsmap->get_up_features());
429
430 // Add to RADOS op
431 to_set[k.str()] = bl;
432
433 session->clear_dirty_completed_requests();
434 } else {
435 dout(20) << " " << name << " (ignoring)" << dendl;
436 }
437 }
438 if (!to_set.empty()) {
439 op.omap_set(to_set);
440 }
441
442 dout(20) << " removing keys:" << dendl;
443 set<string> to_remove;
444 for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
445 i != null_sessions.end(); ++i) {
446 dout(20) << " " << *i << dendl;
447 std::ostringstream k;
448 k << *i;
449 to_remove.insert(k.str());
450 }
451 if (!to_remove.empty()) {
452 op.omap_rm_keys(to_remove);
453 }
454
455 dirty_sessions.clear();
456 null_sessions.clear();
457
458 mds->objecter->mutate(oid, oloc, op, snapc,
459 ceph::real_clock::now(),
460 0,
461 new C_OnFinisher(new C_IO_SM_Save(this, version),
462 mds->finisher));
463 }
464
465 void SessionMap::_save_finish(version_t v)
466 {
467 dout(10) << "_save_finish v" << v << dendl;
468 committed = v;
469
470 finish_contexts(g_ceph_context, commit_waiters[v]);
471 commit_waiters.erase(v);
472 }
473
474
475 /**
476 * Deserialize sessions, and update by_state index
477 */
478 void SessionMap::decode_legacy(bufferlist::const_iterator &p)
479 {
480 // Populate `sessions`
481 SessionMapStore::decode_legacy(p);
482
483 // Update `by_state`
484 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
485 i != session_map.end(); ++i) {
486 Session *s = i->second;
487 auto by_state_entry = by_state.find(s->get_state());
488 if (by_state_entry == by_state.end())
489 by_state_entry = by_state.emplace(s->get_state(),
490 new xlist<Session*>).first;
491 by_state_entry->second->push_back(&s->item_session_list);
492 }
493 }
494
495 uint64_t SessionMap::set_state(Session *session, int s) {
496 if (session->state != s) {
497 session->set_state(s);
498 auto by_state_entry = by_state.find(s);
499 if (by_state_entry == by_state.end())
500 by_state_entry = by_state.emplace(s, new xlist<Session*>).first;
501 by_state_entry->second->push_back(&session->item_session_list);
502
503 if (session->is_open() || session->is_stale()) {
504 session->set_load_avg_decay_rate(decay_rate);
505 }
506
507 // refresh number of sessions for states which have perf
508 // couters associated
509 logger->set(l_mdssm_session_open,
510 get_session_count_in_state(Session::STATE_OPEN));
511 logger->set(l_mdssm_session_stale,
512 get_session_count_in_state(Session::STATE_STALE));
513 }
514
515 return session->get_state_seq();
516 }
517
518 void SessionMapStore::decode_legacy(bufferlist::const_iterator& p)
519 {
520 auto now = clock::now();
521 uint64_t pre;
522 decode(pre, p);
523 if (pre == (uint64_t)-1) {
524 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
525 ceph_assert(struct_v >= 2);
526
527 decode(version, p);
528
529 while (!p.end()) {
530 entity_inst_t inst;
531 decode(inst.name, p);
532 Session *s = get_or_add_session(inst);
533 if (s->is_closed()) {
534 s->set_state(Session::STATE_OPEN);
535 s->set_load_avg_decay_rate(decay_rate);
536 }
537 s->decode(p);
538 }
539
540 DECODE_FINISH(p);
541 } else {
542 // --- old format ----
543 version = pre;
544
545 // this is a meaningless upper bound. can be ignored.
546 __u32 n;
547 decode(n, p);
548
549 while (n-- && !p.end()) {
550 auto p2 = p;
551 Session *s = new Session(ConnectionRef());
552 s->info.decode(p);
553 {
554 auto& name = s->info.inst.name;
555 auto it = session_map.find(name);
556 if (it != session_map.end()) {
557 // eager client connected too fast! aie.
558 dout(10) << " already had session for " << name << ", recovering" << dendl;
559 delete s;
560 s = it->second;
561 p = p2;
562 s->info.decode(p);
563 } else {
564 it->second = s;
565 }
566 }
567 s->set_state(Session::STATE_OPEN);
568 s->set_load_avg_decay_rate(decay_rate);
569 s->last_cap_renew = now;
570 }
571 }
572 }
573
574 void Session::dump(Formatter *f, bool cap_dump) const
575 {
576 f->dump_int("id", info.inst.name.num());
577 f->dump_object("entity", info.inst);
578 f->dump_string("state", get_state_name());
579 f->dump_int("num_leases", leases.size());
580 f->dump_int("num_caps", caps.size());
581 if (cap_dump) {
582 f->open_array_section("caps");
583 for (const auto& cap : caps) {
584 f->dump_object("cap", *cap);
585 }
586 f->close_section();
587 }
588 if (is_open() || is_stale()) {
589 f->dump_unsigned("request_load_avg", get_load_avg());
590 }
591 f->dump_float("uptime", get_session_uptime());
592 f->dump_unsigned("requests_in_flight", get_request_count());
593 f->dump_unsigned("num_completed_requests", get_num_completed_requests());
594 f->dump_unsigned("num_completed_flushes", get_num_completed_flushes());
595 f->dump_bool("reconnecting", reconnecting);
596 f->dump_object("recall_caps", recall_caps);
597 f->dump_object("release_caps", release_caps);
598 f->dump_object("recall_caps_throttle", recall_caps_throttle);
599 f->dump_object("recall_caps_throttle2o", recall_caps_throttle2o);
600 f->dump_object("session_cache_liveness", session_cache_liveness);
601 f->dump_object("cap_acquisition", cap_acquisition);
602 info.dump(f);
603 }
604
605 void SessionMapStore::dump(Formatter *f) const
606 {
607 f->open_array_section("sessions");
608 for (const auto& p : session_map) {
609 f->dump_object("session", *p.second);
610 }
611 f->close_section(); // Sessions
612 }
613
614 void SessionMapStore::generate_test_instances(std::list<SessionMapStore*>& ls)
615 {
616 // pretty boring for now
617 ls.push_back(new SessionMapStore());
618 }
619
620 void SessionMap::wipe()
621 {
622 dout(1) << "wipe start" << dendl;
623 dump();
624 while (!session_map.empty()) {
625 Session *s = session_map.begin()->second;
626 remove_session(s);
627 }
628 version = ++projected;
629 dout(1) << "wipe result" << dendl;
630 dump();
631 dout(1) << "wipe done" << dendl;
632 }
633
634 void SessionMap::wipe_ino_prealloc()
635 {
636 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
637 p != session_map.end();
638 ++p) {
639 p->second->pending_prealloc_inos.clear();
640 p->second->delegated_inos.clear();
641 p->second->info.prealloc_inos.clear();
642 p->second->info.used_inos.clear();
643 }
644 projected = ++version;
645 }
646
647 void SessionMap::add_session(Session *s)
648 {
649 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
650
651 ceph_assert(session_map.count(s->info.inst.name) == 0);
652 session_map[s->info.inst.name] = s;
653 auto by_state_entry = by_state.find(s->state);
654 if (by_state_entry == by_state.end())
655 by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first;
656 by_state_entry->second->push_back(&s->item_session_list);
657 s->get();
658
659 update_average_birth_time(*s);
660
661 logger->set(l_mdssm_session_count, session_map.size());
662 logger->inc(l_mdssm_session_add);
663 }
664
665 void SessionMap::remove_session(Session *s)
666 {
667 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
668
669 update_average_birth_time(*s, false);
670
671 s->trim_completed_requests(0);
672 s->item_session_list.remove_myself();
673 session_map.erase(s->info.inst.name);
674 dirty_sessions.erase(s->info.inst.name);
675 null_sessions.insert(s->info.inst.name);
676 s->put();
677
678 logger->set(l_mdssm_session_count, session_map.size());
679 logger->inc(l_mdssm_session_remove);
680 }
681
682 void SessionMap::touch_session(Session *session)
683 {
684 dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl;
685
686 // Move to the back of the session list for this state (should
687 // already be on a list courtesy of add_session and set_state)
688 ceph_assert(session->item_session_list.is_on_list());
689 auto by_state_entry = by_state.find(session->state);
690 if (by_state_entry == by_state.end())
691 by_state_entry = by_state.emplace(session->state,
692 new xlist<Session*>).first;
693 by_state_entry->second->push_back(&session->item_session_list);
694
695 session->last_cap_renew = clock::now();
696 }
697
698 void SessionMap::_mark_dirty(Session *s, bool may_save)
699 {
700 if (dirty_sessions.count(s->info.inst.name))
701 return;
702
703 if (may_save &&
704 dirty_sessions.size() >= g_conf()->mds_sessionmap_keys_per_op) {
705 // Pre-empt the usual save() call from journal segment trim, in
706 // order to avoid building up an oversized OMAP update operation
707 // from too many sessions modified at once
708 save(new C_MDSInternalNoop, version);
709 }
710
711 null_sessions.erase(s->info.inst.name);
712 dirty_sessions.insert(s->info.inst.name);
713 }
714
715 void SessionMap::mark_dirty(Session *s, bool may_save)
716 {
717 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
718 << " v=" << version << dendl;
719
720 _mark_dirty(s, may_save);
721 version++;
722 s->pop_pv(version);
723 }
724
725 void SessionMap::replay_dirty_session(Session *s)
726 {
727 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
728 << " v=" << version << dendl;
729
730 _mark_dirty(s, false);
731
732 replay_advance_version();
733 }
734
735 void SessionMap::replay_advance_version()
736 {
737 version++;
738 projected = version;
739 }
740
741 void SessionMap::replay_open_sessions(version_t event_cmapv,
742 map<client_t,entity_inst_t>& client_map,
743 map<client_t,client_metadata_t>& client_metadata_map)
744 {
745 unsigned already_saved;
746
747 if (version + client_map.size() < event_cmapv)
748 goto bad;
749
750 // Server::finish_force_open_sessions() marks sessions dirty one by one.
751 // Marking a session dirty may flush all existing dirty sessions. So it's
752 // possible that some sessions are already saved in sessionmap.
753 already_saved = client_map.size() - (event_cmapv - version);
754 for (const auto& p : client_map) {
755 Session *s = get_or_add_session(p.second);
756 auto q = client_metadata_map.find(p.first);
757 if (q != client_metadata_map.end())
758 s->info.client_metadata.merge(q->second);
759
760 if (already_saved > 0) {
761 if (s->is_closed())
762 goto bad;
763
764 --already_saved;
765 continue;
766 }
767
768 set_state(s, Session::STATE_OPEN);
769 replay_dirty_session(s);
770 }
771 return;
772
773 bad:
774 mds->clog->error() << "error replaying open sessions(" << client_map.size()
775 << ") sessionmap v " << event_cmapv << " table " << version;
776 ceph_assert(g_conf()->mds_wipe_sessions);
777 mds->sessionmap.wipe();
778 mds->sessionmap.set_version(event_cmapv);
779 }
780
781 version_t SessionMap::mark_projected(Session *s)
782 {
783 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
784 << " pv=" << projected << " -> " << projected + 1 << dendl;
785 ++projected;
786 s->push_pv(projected);
787 return projected;
788 }
789
790 namespace {
791 class C_IO_SM_Save_One : public SessionMapIOContext {
792 MDSContext *on_safe;
793 public:
794 C_IO_SM_Save_One(SessionMap *cm, MDSContext *on_safe_)
795 : SessionMapIOContext(cm), on_safe(on_safe_) {}
796 void finish(int r) override {
797 if (r != 0) {
798 get_mds()->handle_write_error(r);
799 } else {
800 on_safe->complete(r);
801 }
802 }
803 void print(ostream& out) const override {
804 out << "session_save_one";
805 }
806 };
807 }
808
809
810 void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
811 MDSGatherBuilder *gather_bld)
812 {
813 ceph_assert(gather_bld != NULL);
814
815 std::vector<entity_name_t> write_sessions;
816
817 // Decide which sessions require a write
818 for (std::set<entity_name_t>::iterator i = tgt_sessions.begin();
819 i != tgt_sessions.end(); ++i) {
820 const entity_name_t &session_id = *i;
821
822 if (session_map.count(session_id) == 0) {
823 // Session isn't around any more, never mind.
824 continue;
825 }
826
827 Session *session = session_map[session_id];
828 if (!session->has_dirty_completed_requests()) {
829 // Session hasn't had completed_requests
830 // modified since last write, no need to
831 // write it now.
832 continue;
833 }
834
835 if (dirty_sessions.count(session_id) > 0) {
836 // Session is already dirtied, will be written, no
837 // need to pre-empt that.
838 continue;
839 }
840 // Okay, passed all our checks, now we write
841 // this session out. The version we write
842 // into the OMAP may now be higher-versioned
843 // than the version in the header, but that's
844 // okay because it's never a problem to have
845 // an overly-fresh copy of a session.
846 write_sessions.push_back(*i);
847 }
848
849 dout(4) << __func__ << ": writing " << write_sessions.size() << dendl;
850
851 // Batch writes into mds_sessionmap_keys_per_op
852 const uint32_t kpo = g_conf()->mds_sessionmap_keys_per_op;
853 map<string, bufferlist> to_set;
854 for (uint32_t i = 0; i < write_sessions.size(); ++i) {
855 const entity_name_t &session_id = write_sessions[i];
856 Session *session = session_map[session_id];
857 session->clear_dirty_completed_requests();
858
859 // Serialize K
860 std::ostringstream k;
861 k << session_id;
862
863 // Serialize V
864 bufferlist bl;
865 session->info.encode(bl, mds->mdsmap->get_up_features());
866
867 // Add to RADOS op
868 to_set[k.str()] = bl;
869
870 // Complete this write transaction?
871 if (i == write_sessions.size() - 1
872 || i % kpo == kpo - 1) {
873 ObjectOperation op;
874 op.omap_set(to_set);
875 to_set.clear(); // clear to start a new transaction
876
877 SnapContext snapc;
878 object_t oid = get_object_name();
879 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
880 MDSContext *on_safe = gather_bld->new_sub();
881 mds->objecter->mutate(oid, oloc, op, snapc,
882 ceph::real_clock::now(), 0,
883 new C_OnFinisher(
884 new C_IO_SM_Save_One(this, on_safe),
885 mds->finisher));
886 }
887 }
888 }
889
890 // =================
891 // Session
892
893 #undef dout_prefix
894 #define dout_prefix *_dout << "Session "
895
896 /**
897 * Calculate the length of the `requests` member list,
898 * because elist does not have a size() method.
899 *
900 * O(N) runtime.
901 */
902 size_t Session::get_request_count() const
903 {
904 size_t result = 0;
905 for (auto p = requests.begin(); !p.end(); ++p)
906 ++result;
907 return result;
908 }
909
910 /**
911 * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
912 * with n_caps equal to the number of caps that were released
913 * in the message. Used to update state about how many caps a
914 * client has released since it was last instructed to RECALL_STATE.
915 */
916 void Session::notify_cap_release(size_t n_caps)
917 {
918 recall_caps.hit(-(double)n_caps);
919 release_caps.hit(n_caps);
920 }
921
922 /**
923 * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
924 * message is sent to the client. Update our recall-related state
925 * in order to generate health metrics if the session doesn't see
926 * a commensurate number of calls to ::notify_cap_release
927 */
928 uint64_t Session::notify_recall_sent(size_t new_limit)
929 {
930 const auto num_caps = caps.size();
931 ceph_assert(new_limit < num_caps); // Behaviour of Server::recall_client_state
932 const auto count = num_caps-new_limit;
933 uint64_t new_change;
934 if (recall_limit != new_limit) {
935 new_change = count;
936 } else {
937 new_change = 0; /* no change! */
938 }
939
940 /* Always hit the session counter as a RECALL message is still sent to the
941 * client and we do not want the MDS to burn its global counter tokens on a
942 * session that is not releasing caps (i.e. allow the session counter to
943 * throttle future RECALL messages).
944 */
945 recall_caps_throttle.hit(count);
946 recall_caps_throttle2o.hit(count);
947 recall_caps.hit(count);
948 return new_change;
949 }
950
951 /**
952 * Use client metadata to generate a somewhat-friendlier
953 * name for the client than its session ID.
954 *
955 * This is *not* guaranteed to be unique, and any machine
956 * consumers of session-related output should always use
957 * the session ID as a primary capacity and use this only
958 * as a presentation hint.
959 */
960 void Session::_update_human_name()
961 {
962 auto info_client_metadata_entry = info.client_metadata.find("hostname");
963 if (info_client_metadata_entry != info.client_metadata.end()) {
964 // Happy path, refer to clients by hostname
965 human_name = info_client_metadata_entry->second;
966 if (!info.auth_name.has_default_id()) {
967 // When a non-default entity ID is set by the user, assume they
968 // would like to see it in references to the client, if it's
969 // reasonable short. Limit the length because we don't want
970 // to put e.g. uuid-generated names into a "human readable"
971 // rendering.
972 const int arbitrarily_short = 16;
973 if (info.auth_name.get_id().size() < arbitrarily_short) {
974 human_name += std::string(":") + info.auth_name.get_id();
975 }
976 }
977 } else {
978 // Fallback, refer to clients by ID e.g. client.4567
979 human_name = stringify(info.inst.name.num());
980 }
981 }
982
983 void Session::decode(bufferlist::const_iterator &p)
984 {
985 info.decode(p);
986
987 _update_human_name();
988 }
989
990 int Session::check_access(CInode *in, unsigned mask,
991 int caller_uid, int caller_gid,
992 const vector<uint64_t> *caller_gid_list,
993 int new_uid, int new_gid)
994 {
995 string path;
996 CInode *diri = NULL;
997 if (!in->is_base())
998 diri = in->get_projected_parent_dn()->get_dir()->get_inode();
999 if (diri && diri->is_stray()){
1000 path = in->get_projected_inode()->stray_prior_path;
1001 dout(20) << __func__ << " stray_prior_path " << path << dendl;
1002 } else {
1003 in->make_path_string(path, true);
1004 dout(20) << __func__ << " path " << path << dendl;
1005 }
1006 if (path.length())
1007 path = path.substr(1); // drop leading /
1008
1009 if (in->inode.is_dir() &&
1010 in->inode.has_layout() &&
1011 in->inode.layout.pool_ns.length() &&
1012 !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
1013 dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl;
1014 return -EIO;
1015 }
1016
1017 if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode,
1018 caller_uid, caller_gid, caller_gid_list, mask,
1019 new_uid, new_gid,
1020 info.inst.addr)) {
1021 return -EACCES;
1022 }
1023 return 0;
1024 }
1025
1026 // track total and per session load
1027 void SessionMap::hit_session(Session *session) {
1028 uint64_t sessions = get_session_count_in_state(Session::STATE_OPEN) +
1029 get_session_count_in_state(Session::STATE_STALE) +
1030 get_session_count_in_state(Session::STATE_CLOSING);
1031 ceph_assert(sessions != 0);
1032
1033 double total_load = total_load_avg.hit();
1034 double avg_load = total_load / sessions;
1035
1036 logger->set(l_mdssm_total_load, (uint64_t)total_load);
1037 logger->set(l_mdssm_avg_load, (uint64_t)avg_load);
1038
1039 session->hit_session();
1040 }
1041
1042 void SessionMap::handle_conf_change(const std::set<std::string>& changed)
1043 {
1044 auto apply_to_open_sessions = [this](auto f) {
1045 if (auto it = by_state.find(Session::STATE_OPEN); it != by_state.end()) {
1046 for (const auto &session : *(it->second)) {
1047 f(session);
1048 }
1049 }
1050 if (auto it = by_state.find(Session::STATE_STALE); it != by_state.end()) {
1051 for (const auto &session : *(it->second)) {
1052 f(session);
1053 }
1054 }
1055 };
1056
1057 if (changed.count("mds_request_load_average_decay_rate")) {
1058 auto d = g_conf().get_val<double>("mds_request_load_average_decay_rate");
1059
1060 decay_rate = d;
1061 total_load_avg = DecayCounter(d);
1062
1063 auto mut = [d](auto s) {
1064 s->set_load_avg_decay_rate(d);
1065 };
1066 apply_to_open_sessions(mut);
1067 }
1068 if (changed.count("mds_recall_max_decay_rate")) {
1069 auto d = g_conf().get_val<double>("mds_recall_max_decay_rate");
1070 auto mut = [d](auto s) {
1071 s->recall_caps_throttle = DecayCounter(d);
1072 };
1073 apply_to_open_sessions(mut);
1074 }
1075 if (changed.count("mds_recall_warning_decay_rate")) {
1076 auto d = g_conf().get_val<double>("mds_recall_warning_decay_rate");
1077 auto mut = [d](auto s) {
1078 s->recall_caps = DecayCounter(d);
1079 s->release_caps = DecayCounter(d);
1080 };
1081 apply_to_open_sessions(mut);
1082 }
1083 if (changed.count("mds_session_cache_liveness_decay_rate")) {
1084 auto d = g_conf().get_val<double>("mds_session_cache_liveness_decay_rate");
1085 auto mut = [d](auto s) {
1086 s->session_cache_liveness = DecayCounter(d);
1087 s->session_cache_liveness.hit(s->caps.size()); /* so the MDS doesn't immediately start trimming a new session */
1088 };
1089 apply_to_open_sessions(mut);
1090 }
1091 if (changed.count("mds_session_cap_acquisition_decay_rate")) {
1092 auto d = g_conf().get_val<double>("mds_session_cap_acquisition_decay_rate");
1093 auto mut = [d](auto s) {
1094 s->cap_acquisition = DecayCounter(d);
1095 };
1096 apply_to_open_sessions(mut);
1097 }
1098 }
1099
1100 void SessionMap::update_average_session_age() {
1101 if (!session_map.size()) {
1102 return;
1103 }
1104
1105 double avg_uptime = std::chrono::duration<double>(clock::now()-avg_birth_time).count();
1106 logger->set(l_mdssm_avg_session_uptime, (uint64_t)avg_uptime);
1107 }
1108
1109 int SessionFilter::parse(
1110 const std::vector<std::string> &args,
1111 std::stringstream *ss)
1112 {
1113 ceph_assert(ss != NULL);
1114
1115 for (const auto &s : args) {
1116 dout(20) << __func__ << " parsing filter '" << s << "'" << dendl;
1117
1118 auto eq = s.find("=");
1119 if (eq == std::string::npos || eq == s.size()) {
1120 // allow this to be a bare id for compatibility with pre-octopus asok
1121 // 'session evict'.
1122 std::string err;
1123 id = strict_strtoll(s.c_str(), 10, &err);
1124 if (!err.empty()) {
1125 *ss << "Invalid filter '" << s << "'";
1126 return -EINVAL;
1127 }
1128 return 0;
1129 }
1130
1131 // Keys that start with this are to be taken as referring
1132 // to freeform client metadata fields.
1133 const std::string metadata_prefix("client_metadata.");
1134
1135 auto k = s.substr(0, eq);
1136 auto v = s.substr(eq + 1);
1137
1138 dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl;
1139
1140 if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0
1141 && k.size() > metadata_prefix.size()) {
1142 // Filter on arbitrary metadata key (no fixed schema for this,
1143 // so anything after the dot is a valid field to filter on)
1144 auto metadata_key = k.substr(metadata_prefix.size());
1145 metadata.insert(std::make_pair(metadata_key, v));
1146 } else if (k == "auth_name") {
1147 // Filter on client entity name
1148 auth_name = v;
1149 } else if (k == "state") {
1150 state = v;
1151 } else if (k == "id") {
1152 std::string err;
1153 id = strict_strtoll(v.c_str(), 10, &err);
1154 if (!err.empty()) {
1155 *ss << err;
1156 return -EINVAL;
1157 }
1158 } else if (k == "reconnecting") {
1159
1160 /**
1161 * Strict boolean parser. Allow true/false/0/1.
1162 * Anything else is -EINVAL.
1163 */
1164 auto is_true = [](std::string_view bstr, bool *out) -> bool
1165 {
1166 ceph_assert(out != nullptr);
1167
1168 if (bstr == "true" || bstr == "1") {
1169 *out = true;
1170 return 0;
1171 } else if (bstr == "false" || bstr == "0") {
1172 *out = false;
1173 return 0;
1174 } else {
1175 return -EINVAL;
1176 }
1177 };
1178
1179 bool bval;
1180 int r = is_true(v, &bval);
1181 if (r == 0) {
1182 set_reconnecting(bval);
1183 } else {
1184 *ss << "Invalid boolean value '" << v << "'";
1185 return -EINVAL;
1186 }
1187 } else {
1188 *ss << "Invalid filter key '" << k << "'";
1189 return -EINVAL;
1190 }
1191 }
1192
1193 return 0;
1194 }
1195
1196 bool SessionFilter::match(
1197 const Session &session,
1198 std::function<bool(client_t)> is_reconnecting) const
1199 {
1200 for (const auto &m : metadata) {
1201 const auto &k = m.first;
1202 const auto &v = m.second;
1203 auto it = session.info.client_metadata.find(k);
1204 if (it == session.info.client_metadata.end()) {
1205 return false;
1206 }
1207 if (it->second != v) {
1208 return false;
1209 }
1210 }
1211
1212 if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) {
1213 return false;
1214 }
1215
1216 if (!state.empty() && state != session.get_state_name()) {
1217 return false;
1218 }
1219
1220 if (id != 0 && id != session.info.inst.name.num()) {
1221 return false;
1222 }
1223
1224 if (reconnecting.first) {
1225 const bool am_reconnecting = is_reconnecting(session.info.inst.name.num());
1226 if (reconnecting.second != am_reconnecting) {
1227 return false;
1228 }
1229 }
1230
1231 return true;
1232 }
1233
1234 std::ostream& operator<<(std::ostream &out, const Session &s)
1235 {
1236 if (s.get_human_name() == stringify(s.get_client())) {
1237 out << s.get_human_name();
1238 } else {
1239 out << s.get_human_name() << " (" << std::dec << s.get_client() << ")";
1240 }
1241 return out;
1242 }
1243