]> git.proxmox.com Git - ceph.git/blob - ceph/src/mds/SessionMap.cc
update sources to v12.1.3
[ceph.git] / ceph / src / mds / SessionMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #include "MDSRank.h"
16 #include "MDCache.h"
17 #include "Mutation.h"
18 #include "SessionMap.h"
19 #include "osdc/Filer.h"
20 #include "common/Finisher.h"
21
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "include/assert.h"
25 #include "include/stringify.h"
26
27 #define dout_context g_ceph_context
28 #define dout_subsys ceph_subsys_mds
29 #undef dout_prefix
30 #define dout_prefix *_dout << "mds." << rank << ".sessionmap "
31
32 namespace {
33 class SessionMapIOContext : public MDSIOContextBase
34 {
35 protected:
36 SessionMap *sessionmap;
37 MDSRank *get_mds() override {return sessionmap->mds;}
38 public:
39 explicit SessionMapIOContext(SessionMap *sessionmap_) : sessionmap(sessionmap_) {
40 assert(sessionmap != NULL);
41 }
42 };
43 };
44
45 void SessionMap::register_perfcounters()
46 {
47 PerfCountersBuilder plb(g_ceph_context, "mds_sessions",
48 l_mdssm_first, l_mdssm_last);
49 plb.add_u64(l_mdssm_session_count, "session_count",
50 "Session count");
51 plb.add_u64_counter(l_mdssm_session_add, "session_add",
52 "Sessions added");
53 plb.add_u64_counter(l_mdssm_session_remove, "session_remove",
54 "Sessions removed");
55 logger = plb.create_perf_counters();
56 g_ceph_context->get_perfcounters_collection()->add(logger);
57 }
58
59 void SessionMap::dump()
60 {
61 dout(10) << "dump" << dendl;
62 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
63 p != session_map.end();
64 ++p)
65 dout(10) << p->first << " " << p->second
66 << " state " << p->second->get_state_name()
67 << " completed " << p->second->info.completed_requests
68 << " prealloc_inos " << p->second->info.prealloc_inos
69 << " used_inos " << p->second->info.used_inos
70 << dendl;
71 }
72
73
74 // ----------------
75 // LOAD
76
77
78 object_t SessionMap::get_object_name() const
79 {
80 char s[30];
81 snprintf(s, sizeof(s), "mds%d_sessionmap", int(mds->get_nodeid()));
82 return object_t(s);
83 }
84
85 namespace {
86 class C_IO_SM_Load : public SessionMapIOContext {
87 public:
88 const bool first; //< Am I the initial (header) load?
89 int header_r; //< Return value from OMAP header read
90 int values_r; //< Return value from OMAP value read
91 bufferlist header_bl;
92 std::map<std::string, bufferlist> session_vals;
93 bool more_session_vals = false;
94
95 C_IO_SM_Load(SessionMap *cm, const bool f)
96 : SessionMapIOContext(cm), first(f), header_r(0), values_r(0) {}
97
98 void finish(int r) override {
99 sessionmap->_load_finish(r, header_r, values_r, first, header_bl, session_vals,
100 more_session_vals);
101 }
102 };
103 }
104
105
106 /**
107 * Decode OMAP header. Call this once when loading.
108 */
109 void SessionMapStore::decode_header(
110 bufferlist &header_bl)
111 {
112 bufferlist::iterator q = header_bl.begin();
113 DECODE_START(1, q)
114 ::decode(version, q);
115 DECODE_FINISH(q);
116 }
117
118 void SessionMapStore::encode_header(
119 bufferlist *header_bl)
120 {
121 ENCODE_START(1, 1, *header_bl);
122 ::encode(version, *header_bl);
123 ENCODE_FINISH(*header_bl);
124 }
125
126 /**
127 * Decode and insert some serialized OMAP values. Call this
128 * repeatedly to insert batched loads.
129 */
130 void SessionMapStore::decode_values(std::map<std::string, bufferlist> &session_vals)
131 {
132 for (std::map<std::string, bufferlist>::iterator i = session_vals.begin();
133 i != session_vals.end(); ++i) {
134
135 entity_inst_t inst;
136
137 bool parsed = inst.name.parse(i->first);
138 if (!parsed) {
139 derr << "Corrupt entity name '" << i->first << "' in sessionmap" << dendl;
140 throw buffer::malformed_input("Corrupt entity name in sessionmap");
141 }
142
143 Session *s = get_or_add_session(inst);
144 if (s->is_closed())
145 s->set_state(Session::STATE_OPEN);
146 bufferlist::iterator q = i->second.begin();
147 s->decode(q);
148 }
149 }
150
151 /**
152 * An OMAP read finished.
153 */
154 void SessionMap::_load_finish(
155 int operation_r,
156 int header_r,
157 int values_r,
158 bool first,
159 bufferlist &header_bl,
160 std::map<std::string, bufferlist> &session_vals,
161 bool more_session_vals)
162 {
163 if (operation_r < 0) {
164 derr << "_load_finish got " << cpp_strerror(operation_r) << dendl;
165 mds->clog->error() << "error reading sessionmap '" << get_object_name()
166 << "' " << operation_r << " ("
167 << cpp_strerror(operation_r) << ")";
168 mds->damaged();
169 ceph_abort(); // Should be unreachable because damaged() calls respawn()
170 }
171
172 // Decode header
173 if (first) {
174 if (header_r != 0) {
175 derr << __func__ << ": header error: " << cpp_strerror(header_r) << dendl;
176 mds->clog->error() << "error reading sessionmap header "
177 << header_r << " (" << cpp_strerror(header_r) << ")";
178 mds->damaged();
179 ceph_abort(); // Should be unreachable because damaged() calls respawn()
180 }
181
182 if(header_bl.length() == 0) {
183 dout(4) << __func__ << ": header missing, loading legacy..." << dendl;
184 load_legacy();
185 return;
186 }
187
188 try {
189 decode_header(header_bl);
190 } catch (buffer::error &e) {
191 mds->clog->error() << "corrupt sessionmap header: " << e.what();
192 mds->damaged();
193 ceph_abort(); // Should be unreachable because damaged() calls respawn()
194 }
195 dout(10) << __func__ << " loaded version " << version << dendl;
196 }
197
198 if (values_r != 0) {
199 derr << __func__ << ": error reading values: "
200 << cpp_strerror(values_r) << dendl;
201 mds->clog->error() << "error reading sessionmap values: "
202 << values_r << " (" << cpp_strerror(values_r) << ")";
203 mds->damaged();
204 ceph_abort(); // Should be unreachable because damaged() calls respawn()
205 }
206
207 // Decode session_vals
208 try {
209 decode_values(session_vals);
210 } catch (buffer::error &e) {
211 mds->clog->error() << "corrupt sessionmap values: " << e.what();
212 mds->damaged();
213 ceph_abort(); // Should be unreachable because damaged() calls respawn()
214 }
215
216 if (more_session_vals) {
217 // Issue another read if we're not at the end of the omap
218 const std::string last_key = session_vals.rbegin()->first;
219 dout(10) << __func__ << ": continue omap load from '"
220 << last_key << "'" << dendl;
221 object_t oid = get_object_name();
222 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
223 C_IO_SM_Load *c = new C_IO_SM_Load(this, false);
224 ObjectOperation op;
225 op.omap_get_vals(last_key, "", g_conf->mds_sessionmap_keys_per_op,
226 &c->session_vals, &c->more_session_vals, &c->values_r);
227 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0,
228 new C_OnFinisher(c, mds->finisher));
229 } else {
230 // I/O is complete. Update `by_state`
231 dout(10) << __func__ << ": omap load complete" << dendl;
232 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
233 i != session_map.end(); ++i) {
234 Session *s = i->second;
235 auto by_state_entry = by_state.find(s->get_state());
236 if (by_state_entry == by_state.end())
237 by_state_entry = by_state.emplace(s->get_state(),
238 new xlist<Session*>).first;
239 by_state_entry->second->push_back(&s->item_session_list);
240 }
241
242 // Population is complete. Trigger load waiters.
243 dout(10) << __func__ << ": v " << version
244 << ", " << session_map.size() << " sessions" << dendl;
245 projected = committing = committed = version;
246 dump();
247 finish_contexts(g_ceph_context, waiting_for_load);
248 }
249 }
250
251 /**
252 * Populate session state from OMAP records in this
253 * rank's sessionmap object.
254 */
255 void SessionMap::load(MDSInternalContextBase *onload)
256 {
257 dout(10) << "load" << dendl;
258
259 if (onload)
260 waiting_for_load.push_back(onload);
261
262 C_IO_SM_Load *c = new C_IO_SM_Load(this, true);
263 object_t oid = get_object_name();
264 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
265
266 ObjectOperation op;
267 op.omap_get_header(&c->header_bl, &c->header_r);
268 op.omap_get_vals("", "", g_conf->mds_sessionmap_keys_per_op,
269 &c->session_vals, &c->more_session_vals, &c->values_r);
270
271 mds->objecter->read(oid, oloc, op, CEPH_NOSNAP, NULL, 0, new C_OnFinisher(c, mds->finisher));
272 }
273
274 namespace {
275 class C_IO_SM_LoadLegacy : public SessionMapIOContext {
276 public:
277 bufferlist bl;
278 explicit C_IO_SM_LoadLegacy(SessionMap *cm) : SessionMapIOContext(cm) {}
279 void finish(int r) override {
280 sessionmap->_load_legacy_finish(r, bl);
281 }
282 };
283 }
284
285
286 /**
287 * Load legacy (object data blob) SessionMap format, assuming
288 * that waiting_for_load has already been populated with
289 * the relevant completion. This is the fallback if we do not
290 * find an OMAP header when attempting to load normally.
291 */
292 void SessionMap::load_legacy()
293 {
294 dout(10) << __func__ << dendl;
295
296 C_IO_SM_LoadLegacy *c = new C_IO_SM_LoadLegacy(this);
297 object_t oid = get_object_name();
298 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
299
300 mds->objecter->read_full(oid, oloc, CEPH_NOSNAP, &c->bl, 0,
301 new C_OnFinisher(c, mds->finisher));
302 }
303
304 void SessionMap::_load_legacy_finish(int r, bufferlist &bl)
305 {
306 bufferlist::iterator blp = bl.begin();
307 if (r < 0) {
308 derr << "_load_finish got " << cpp_strerror(r) << dendl;
309 assert(0 == "failed to load sessionmap");
310 }
311 dump();
312 decode_legacy(blp); // note: this sets last_cap_renew = now()
313 dout(10) << "_load_finish v " << version
314 << ", " << session_map.size() << " sessions, "
315 << bl.length() << " bytes"
316 << dendl;
317 projected = committing = committed = version;
318 dump();
319
320 // Mark all sessions dirty, so that on next save() we will write
321 // a complete OMAP version of the data loaded from the legacy format
322 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
323 i != session_map.end(); ++i) {
324 // Don't use mark_dirty because on this occasion we want to ignore the
325 // keys_per_op limit and do one big write (upgrade must be atomic)
326 dirty_sessions.insert(i->first);
327 }
328 loaded_legacy = true;
329
330 finish_contexts(g_ceph_context, waiting_for_load);
331 }
332
333
334 // ----------------
335 // SAVE
336
337 namespace {
338 class C_IO_SM_Save : public SessionMapIOContext {
339 version_t version;
340 public:
341 C_IO_SM_Save(SessionMap *cm, version_t v) : SessionMapIOContext(cm), version(v) {}
342 void finish(int r) override {
343 if (r != 0) {
344 get_mds()->handle_write_error(r);
345 } else {
346 sessionmap->_save_finish(version);
347 }
348 }
349 };
350 }
351
352 void SessionMap::save(MDSInternalContextBase *onsave, version_t needv)
353 {
354 dout(10) << __func__ << ": needv " << needv << ", v " << version << dendl;
355
356 if (needv && committing >= needv) {
357 assert(committing > committed);
358 commit_waiters[committing].push_back(onsave);
359 return;
360 }
361
362 commit_waiters[version].push_back(onsave);
363
364 committing = version;
365 SnapContext snapc;
366 object_t oid = get_object_name();
367 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
368
369 ObjectOperation op;
370
371 /* Compose OSD OMAP transaction for full write */
372 bufferlist header_bl;
373 encode_header(&header_bl);
374 op.omap_set_header(header_bl);
375
376 /* If we loaded a legacy sessionmap, then erase the old data. If
377 * an old-versioned MDS tries to read it, it'll fail out safely
378 * with an end_of_buffer exception */
379 if (loaded_legacy) {
380 dout(4) << __func__ << " erasing legacy sessionmap" << dendl;
381 op.truncate(0);
382 loaded_legacy = false; // only need to truncate once.
383 }
384
385 dout(20) << " updating keys:" << dendl;
386 map<string, bufferlist> to_set;
387 for(std::set<entity_name_t>::iterator i = dirty_sessions.begin();
388 i != dirty_sessions.end(); ++i) {
389 const entity_name_t name = *i;
390 Session *session = session_map[name];
391
392 if (session->is_open() ||
393 session->is_closing() ||
394 session->is_stale() ||
395 session->is_killing()) {
396 dout(20) << " " << name << dendl;
397 // Serialize K
398 std::ostringstream k;
399 k << name;
400
401 // Serialize V
402 bufferlist bl;
403 session->info.encode(bl, mds->mdsmap->get_up_features());
404
405 // Add to RADOS op
406 to_set[k.str()] = bl;
407
408 session->clear_dirty_completed_requests();
409 } else {
410 dout(20) << " " << name << " (ignoring)" << dendl;
411 }
412 }
413 if (!to_set.empty()) {
414 op.omap_set(to_set);
415 }
416
417 dout(20) << " removing keys:" << dendl;
418 set<string> to_remove;
419 for(std::set<entity_name_t>::const_iterator i = null_sessions.begin();
420 i != null_sessions.end(); ++i) {
421 dout(20) << " " << *i << dendl;
422 std::ostringstream k;
423 k << *i;
424 to_remove.insert(k.str());
425 }
426 if (!to_remove.empty()) {
427 op.omap_rm_keys(to_remove);
428 }
429
430 dirty_sessions.clear();
431 null_sessions.clear();
432
433 mds->objecter->mutate(oid, oloc, op, snapc,
434 ceph::real_clock::now(),
435 0,
436 new C_OnFinisher(new C_IO_SM_Save(this, version),
437 mds->finisher));
438 }
439
440 void SessionMap::_save_finish(version_t v)
441 {
442 dout(10) << "_save_finish v" << v << dendl;
443 committed = v;
444
445 finish_contexts(g_ceph_context, commit_waiters[v]);
446 commit_waiters.erase(v);
447 }
448
449
450 /**
451 * Deserialize sessions, and update by_state index
452 */
453 void SessionMap::decode_legacy(bufferlist::iterator &p)
454 {
455 // Populate `sessions`
456 SessionMapStore::decode_legacy(p);
457
458 // Update `by_state`
459 for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
460 i != session_map.end(); ++i) {
461 Session *s = i->second;
462 auto by_state_entry = by_state.find(s->get_state());
463 if (by_state_entry == by_state.end())
464 by_state_entry = by_state.emplace(s->get_state(),
465 new xlist<Session*>).first;
466 by_state_entry->second->push_back(&s->item_session_list);
467 }
468 }
469
470 uint64_t SessionMap::set_state(Session *session, int s) {
471 if (session->state != s) {
472 session->set_state(s);
473 auto by_state_entry = by_state.find(s);
474 if (by_state_entry == by_state.end())
475 by_state_entry = by_state.emplace(s, new xlist<Session*>).first;
476 by_state_entry->second->push_back(&session->item_session_list);
477 }
478 return session->get_state_seq();
479 }
480
481 void SessionMapStore::decode_legacy(bufferlist::iterator& p)
482 {
483 utime_t now = ceph_clock_now();
484 uint64_t pre;
485 ::decode(pre, p);
486 if (pre == (uint64_t)-1) {
487 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p);
488 assert(struct_v >= 2);
489
490 ::decode(version, p);
491
492 while (!p.end()) {
493 entity_inst_t inst;
494 ::decode(inst.name, p);
495 Session *s = get_or_add_session(inst);
496 if (s->is_closed())
497 s->set_state(Session::STATE_OPEN);
498 s->decode(p);
499 }
500
501 DECODE_FINISH(p);
502 } else {
503 // --- old format ----
504 version = pre;
505
506 // this is a meaningless upper bound. can be ignored.
507 __u32 n;
508 ::decode(n, p);
509
510 while (n-- && !p.end()) {
511 bufferlist::iterator p2 = p;
512 Session *s = new Session;
513 s->info.decode(p);
514 if (session_map.count(s->info.inst.name)) {
515 // eager client connected too fast! aie.
516 dout(10) << " already had session for " << s->info.inst.name << ", recovering" << dendl;
517 entity_name_t n = s->info.inst.name;
518 delete s;
519 s = session_map[n];
520 p = p2;
521 s->info.decode(p);
522 } else {
523 session_map[s->info.inst.name] = s;
524 }
525 s->set_state(Session::STATE_OPEN);
526 s->last_cap_renew = now;
527 }
528 }
529 }
530
531 void SessionMapStore::dump(Formatter *f) const
532 {
533 f->open_array_section("Sessions");
534 for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
535 p != session_map.end();
536 ++p) {
537 f->open_object_section("Session");
538 f->open_object_section("entity name");
539 p->first.dump(f);
540 f->close_section(); // entity name
541 f->dump_string("state", p->second->get_state_name());
542 f->open_object_section("Session info");
543 p->second->info.dump(f);
544 f->close_section(); // Session info
545 f->close_section(); // Session
546 }
547 f->close_section(); // Sessions
548 }
549
550 void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls)
551 {
552 // pretty boring for now
553 ls.push_back(new SessionMapStore());
554 }
555
556 void SessionMap::wipe()
557 {
558 dout(1) << "wipe start" << dendl;
559 dump();
560 while (!session_map.empty()) {
561 Session *s = session_map.begin()->second;
562 remove_session(s);
563 }
564 version = ++projected;
565 dout(1) << "wipe result" << dendl;
566 dump();
567 dout(1) << "wipe done" << dendl;
568 }
569
570 void SessionMap::wipe_ino_prealloc()
571 {
572 for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
573 p != session_map.end();
574 ++p) {
575 p->second->pending_prealloc_inos.clear();
576 p->second->info.prealloc_inos.clear();
577 p->second->info.used_inos.clear();
578 }
579 projected = ++version;
580 }
581
582 void SessionMap::add_session(Session *s)
583 {
584 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
585
586 assert(session_map.count(s->info.inst.name) == 0);
587 session_map[s->info.inst.name] = s;
588 auto by_state_entry = by_state.find(s->state);
589 if (by_state_entry == by_state.end())
590 by_state_entry = by_state.emplace(s->state, new xlist<Session*>).first;
591 by_state_entry->second->push_back(&s->item_session_list);
592 s->get();
593
594 logger->set(l_mdssm_session_count, session_map.size());
595 logger->inc(l_mdssm_session_add);
596 }
597
598 void SessionMap::remove_session(Session *s)
599 {
600 dout(10) << __func__ << " s=" << s << " name=" << s->info.inst.name << dendl;
601
602 s->trim_completed_requests(0);
603 s->item_session_list.remove_myself();
604 session_map.erase(s->info.inst.name);
605 dirty_sessions.erase(s->info.inst.name);
606 null_sessions.insert(s->info.inst.name);
607 s->put();
608
609 logger->set(l_mdssm_session_count, session_map.size());
610 logger->inc(l_mdssm_session_remove);
611 }
612
613 void SessionMap::touch_session(Session *session)
614 {
615 dout(10) << __func__ << " s=" << session << " name=" << session->info.inst.name << dendl;
616
617 // Move to the back of the session list for this state (should
618 // already be on a list courtesy of add_session and set_state)
619 assert(session->item_session_list.is_on_list());
620 auto by_state_entry = by_state.find(session->state);
621 if (by_state_entry == by_state.end())
622 by_state_entry = by_state.emplace(session->state,
623 new xlist<Session*>).first;
624 by_state_entry->second->push_back(&session->item_session_list);
625
626 session->last_cap_renew = ceph_clock_now();
627 }
628
629 void SessionMap::_mark_dirty(Session *s)
630 {
631 if (dirty_sessions.count(s->info.inst.name))
632 return;
633
634 if (dirty_sessions.size() >= g_conf->mds_sessionmap_keys_per_op) {
635 // Pre-empt the usual save() call from journal segment trim, in
636 // order to avoid building up an oversized OMAP update operation
637 // from too many sessions modified at once
638 save(new C_MDSInternalNoop, version);
639 }
640
641 null_sessions.erase(s->info.inst.name);
642 dirty_sessions.insert(s->info.inst.name);
643 }
644
645 void SessionMap::mark_dirty(Session *s)
646 {
647 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
648 << " v=" << version << dendl;
649
650 _mark_dirty(s);
651 version++;
652 s->pop_pv(version);
653 }
654
655 void SessionMap::replay_dirty_session(Session *s)
656 {
657 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
658 << " v=" << version << dendl;
659
660 _mark_dirty(s);
661
662 replay_advance_version();
663 }
664
665 void SessionMap::replay_advance_version()
666 {
667 version++;
668 projected = version;
669 }
670
671 version_t SessionMap::mark_projected(Session *s)
672 {
673 dout(20) << __func__ << " s=" << s << " name=" << s->info.inst.name
674 << " pv=" << projected << " -> " << projected + 1 << dendl;
675 ++projected;
676 s->push_pv(projected);
677 return projected;
678 }
679
680 namespace {
681 class C_IO_SM_Save_One : public SessionMapIOContext {
682 MDSInternalContextBase *on_safe;
683 public:
684 C_IO_SM_Save_One(SessionMap *cm, MDSInternalContextBase *on_safe_)
685 : SessionMapIOContext(cm), on_safe(on_safe_) {}
686 void finish(int r) override {
687 if (r != 0) {
688 get_mds()->handle_write_error(r);
689 } else {
690 on_safe->complete(r);
691 }
692 }
693 };
694 }
695
696
697 void SessionMap::save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
698 MDSGatherBuilder *gather_bld)
699 {
700 assert(gather_bld != NULL);
701
702 std::vector<entity_name_t> write_sessions;
703
704 // Decide which sessions require a write
705 for (std::set<entity_name_t>::iterator i = tgt_sessions.begin();
706 i != tgt_sessions.end(); ++i) {
707 const entity_name_t &session_id = *i;
708
709 if (session_map.count(session_id) == 0) {
710 // Session isn't around any more, never mind.
711 continue;
712 }
713
714 Session *session = session_map[session_id];
715 if (!session->has_dirty_completed_requests()) {
716 // Session hasn't had completed_requests
717 // modified since last write, no need to
718 // write it now.
719 continue;
720 }
721
722 if (dirty_sessions.count(session_id) > 0) {
723 // Session is already dirtied, will be written, no
724 // need to pre-empt that.
725 continue;
726 }
727 // Okay, passed all our checks, now we write
728 // this session out. The version we write
729 // into the OMAP may now be higher-versioned
730 // than the version in the header, but that's
731 // okay because it's never a problem to have
732 // an overly-fresh copy of a session.
733 write_sessions.push_back(*i);
734 }
735
736 dout(4) << __func__ << ": writing " << write_sessions.size() << dendl;
737
738 // Batch writes into mds_sessionmap_keys_per_op
739 const uint32_t kpo = g_conf->mds_sessionmap_keys_per_op;
740 map<string, bufferlist> to_set;
741 for (uint32_t i = 0; i < write_sessions.size(); ++i) {
742 // Start a new write transaction?
743 if (i % g_conf->mds_sessionmap_keys_per_op == 0) {
744 to_set.clear();
745 }
746
747 const entity_name_t &session_id = write_sessions[i];
748 Session *session = session_map[session_id];
749 session->clear_dirty_completed_requests();
750
751 // Serialize K
752 std::ostringstream k;
753 k << session_id;
754
755 // Serialize V
756 bufferlist bl;
757 session->info.encode(bl, mds->mdsmap->get_up_features());
758
759 // Add to RADOS op
760 to_set[k.str()] = bl;
761
762 // Complete this write transaction?
763 if (i == write_sessions.size() - 1
764 || i % kpo == kpo - 1) {
765 ObjectOperation op;
766 op.omap_set(to_set);
767
768 SnapContext snapc;
769 object_t oid = get_object_name();
770 object_locator_t oloc(mds->mdsmap->get_metadata_pool());
771 MDSInternalContextBase *on_safe = gather_bld->new_sub();
772 mds->objecter->mutate(oid, oloc, op, snapc,
773 ceph::real_clock::now(),
774 0, new C_OnFinisher(
775 new C_IO_SM_Save_One(this, on_safe),
776 mds->finisher));
777 }
778 }
779 }
780
781 // =================
782 // Session
783
784 #undef dout_prefix
785 #define dout_prefix *_dout << "Session "
786
787 /**
788 * Calculate the length of the `requests` member list,
789 * because elist does not have a size() method.
790 *
791 * O(N) runtime. This would be const, but elist doesn't
792 * have const iterators.
793 */
794 size_t Session::get_request_count()
795 {
796 size_t result = 0;
797
798 elist<MDRequestImpl*>::iterator p = requests.begin(
799 member_offset(MDRequestImpl, item_session_request));
800 while (!p.end()) {
801 ++result;
802 ++p;
803 }
804
805 return result;
806 }
807
808 /**
809 * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
810 * with n_caps equal to the number of caps that were released
811 * in the message. Used to update state about how many caps a
812 * client has released since it was last instructed to RECALL_STATE.
813 */
814 void Session::notify_cap_release(size_t n_caps)
815 {
816 if (!recalled_at.is_zero()) {
817 recall_release_count += n_caps;
818 if (recall_release_count >= recall_count)
819 clear_recalled_at();
820 }
821 }
822
823 /**
824 * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
825 * message is sent to the client. Update our recall-related state
826 * in order to generate health metrics if the session doesn't see
827 * a commensurate number of calls to ::notify_cap_release
828 */
829 void Session::notify_recall_sent(const int new_limit)
830 {
831 if (recalled_at.is_zero()) {
832 // Entering recall phase, set up counters so we can later
833 // judge whether the client has respected the recall request
834 recalled_at = last_recall_sent = ceph_clock_now();
835 assert (new_limit < caps.size()); // Behaviour of Server::recall_client_state
836 recall_count = caps.size() - new_limit;
837 recall_release_count = 0;
838 } else {
839 last_recall_sent = ceph_clock_now();
840 }
841 }
842
843 void Session::clear_recalled_at()
844 {
845 recalled_at = last_recall_sent = utime_t();
846 recall_count = 0;
847 recall_release_count = 0;
848 }
849
850 void Session::set_client_metadata(map<string, string> const &meta)
851 {
852 info.client_metadata = meta;
853
854 _update_human_name();
855 }
856
857 /**
858 * Use client metadata to generate a somewhat-friendlier
859 * name for the client than its session ID.
860 *
861 * This is *not* guaranteed to be unique, and any machine
862 * consumers of session-related output should always use
863 * the session ID as a primary capacity and use this only
864 * as a presentation hint.
865 */
866 void Session::_update_human_name()
867 {
868 auto info_client_metadata_entry = info.client_metadata.find("hostname");
869 if (info_client_metadata_entry != info.client_metadata.end()) {
870 // Happy path, refer to clients by hostname
871 human_name = info_client_metadata_entry->second;
872 if (!info.auth_name.has_default_id()) {
873 // When a non-default entity ID is set by the user, assume they
874 // would like to see it in references to the client, if it's
875 // reasonable short. Limit the length because we don't want
876 // to put e.g. uuid-generated names into a "human readable"
877 // rendering.
878 const int arbitrarily_short = 16;
879 if (info.auth_name.get_id().size() < arbitrarily_short) {
880 human_name += std::string(":") + info.auth_name.get_id();
881 }
882 }
883 } else {
884 // Fallback, refer to clients by ID e.g. client.4567
885 human_name = stringify(info.inst.name.num());
886 }
887 }
888
889 void Session::decode(bufferlist::iterator &p)
890 {
891 info.decode(p);
892
893 _update_human_name();
894 }
895
896 int Session::check_access(CInode *in, unsigned mask,
897 int caller_uid, int caller_gid,
898 const vector<uint64_t> *caller_gid_list,
899 int new_uid, int new_gid)
900 {
901 string path;
902 CInode *diri = NULL;
903 if (!in->is_base())
904 diri = in->get_projected_parent_dn()->get_dir()->get_inode();
905 if (diri && diri->is_stray()){
906 path = in->get_projected_inode()->stray_prior_path;
907 dout(20) << __func__ << " stray_prior_path " << path << dendl;
908 } else {
909 in->make_path_string(path, true);
910 dout(20) << __func__ << " path " << path << dendl;
911 }
912 if (path.length())
913 path = path.substr(1); // drop leading /
914
915 if (in->inode.is_dir() &&
916 in->inode.has_layout() &&
917 in->inode.layout.pool_ns.length() &&
918 !connection->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2)) {
919 dout(10) << __func__ << " client doesn't support FS_FILE_LAYOUT_V2" << dendl;
920 return -EIO;
921 }
922
923 if (!auth_caps.is_capable(path, in->inode.uid, in->inode.gid, in->inode.mode,
924 caller_uid, caller_gid, caller_gid_list, mask,
925 new_uid, new_gid)) {
926 return -EACCES;
927 }
928 return 0;
929 }
930
931 int SessionFilter::parse(
932 const std::vector<std::string> &args,
933 std::stringstream *ss)
934 {
935 assert(ss != NULL);
936
937 for (const auto &s : args) {
938 dout(20) << __func__ << " parsing filter '" << s << "'" << dendl;
939
940 auto eq = s.find("=");
941 if (eq == std::string::npos || eq == s.size()) {
942 *ss << "Invalid filter '" << s << "'";
943 return -EINVAL;
944 }
945
946 // Keys that start with this are to be taken as referring
947 // to freeform client metadata fields.
948 const std::string metadata_prefix("client_metadata.");
949
950 auto k = s.substr(0, eq);
951 auto v = s.substr(eq + 1);
952
953 dout(20) << __func__ << " parsed k='" << k << "', v='" << v << "'" << dendl;
954
955 if (k.compare(0, metadata_prefix.size(), metadata_prefix) == 0
956 && k.size() > metadata_prefix.size()) {
957 // Filter on arbitrary metadata key (no fixed schema for this,
958 // so anything after the dot is a valid field to filter on)
959 auto metadata_key = k.substr(metadata_prefix.size());
960 metadata.insert(std::make_pair(metadata_key, v));
961 } else if (k == "auth_name") {
962 // Filter on client entity name
963 auth_name = v;
964 } else if (k == "state") {
965 state = v;
966 } else if (k == "id") {
967 std::string err;
968 id = strict_strtoll(v.c_str(), 10, &err);
969 if (!err.empty()) {
970 *ss << err;
971 return -EINVAL;
972 }
973 } else if (k == "reconnecting") {
974
975 /**
976 * Strict boolean parser. Allow true/false/0/1.
977 * Anything else is -EINVAL.
978 */
979 auto is_true = [](const std::string &bstr, bool *out) -> bool
980 {
981 assert(out != nullptr);
982
983 if (bstr == "true" || bstr == "1") {
984 *out = true;
985 return 0;
986 } else if (bstr == "false" || bstr == "0") {
987 *out = false;
988 return 0;
989 } else {
990 return -EINVAL;
991 }
992 };
993
994 bool bval;
995 int r = is_true(v, &bval);
996 if (r == 0) {
997 set_reconnecting(bval);
998 } else {
999 *ss << "Invalid boolean value '" << v << "'";
1000 return -EINVAL;
1001 }
1002 } else {
1003 *ss << "Invalid filter key '" << k << "'";
1004 return -EINVAL;
1005 }
1006 }
1007
1008 return 0;
1009 }
1010
1011 bool SessionFilter::match(
1012 const Session &session,
1013 std::function<bool(client_t)> is_reconnecting) const
1014 {
1015 for (const auto &m : metadata) {
1016 const auto &k = m.first;
1017 const auto &v = m.second;
1018 if (session.info.client_metadata.count(k) == 0) {
1019 return false;
1020 }
1021 if (session.info.client_metadata.at(k) != v) {
1022 return false;
1023 }
1024 }
1025
1026 if (!auth_name.empty() && auth_name != session.info.auth_name.get_id()) {
1027 return false;
1028 }
1029
1030 if (!state.empty() && state != session.get_state_name()) {
1031 return false;
1032 }
1033
1034 if (id != 0 && id != session.info.inst.name.num()) {
1035 return false;
1036 }
1037
1038 if (reconnecting.first) {
1039 const bool am_reconnecting = is_reconnecting(session.info.inst.name.num());
1040 if (reconnecting.second != am_reconnecting) {
1041 return false;
1042 }
1043 }
1044
1045 return true;
1046 }
1047
1048 std::ostream& operator<<(std::ostream &out, const Session &s)
1049 {
1050 if (s.get_human_name() == stringify(s.info.inst.name.num())) {
1051 out << s.get_human_name();
1052 } else {
1053 out << s.get_human_name() << " (" << std::dec << s.info.inst.name.num() << ")";
1054 }
1055 return out;
1056 }
1057