1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
18 #include "SessionMap.h"
19 #include "osdc/Filer.h"
20 #include "common/Finisher.h"
22 #include "common/config.h"
23 #include "common/errno.h"
24 #include "include/assert.h"
25 #include "include/stringify.h"
27 #define dout_context g_ceph_context
28 #define dout_subsys ceph_subsys_mds
30 #define dout_prefix *_dout << "mds." << rank << ".sessionmap "
33 class SessionMapIOContext
: public MDSIOContextBase
36 SessionMap
*sessionmap
;
37 MDSRank
*get_mds() override
{return sessionmap
->mds
;}
39 explicit SessionMapIOContext(SessionMap
*sessionmap_
) : sessionmap(sessionmap_
) {
40 assert(sessionmap
!= NULL
);
45 void SessionMap::register_perfcounters()
47 PerfCountersBuilder
plb(g_ceph_context
, "mds_sessions",
48 l_mdssm_first
, l_mdssm_last
);
49 plb
.add_u64(l_mdssm_session_count
, "session_count",
50 "Session count", "sess", PerfCountersBuilder::PRIO_INTERESTING
);
51 plb
.add_u64_counter(l_mdssm_session_add
, "session_add",
53 plb
.add_u64_counter(l_mdssm_session_remove
, "session_remove",
55 logger
= plb
.create_perf_counters();
56 g_ceph_context
->get_perfcounters_collection()->add(logger
);
59 void SessionMap::dump()
61 dout(10) << "dump" << dendl
;
62 for (ceph::unordered_map
<entity_name_t
,Session
*>::iterator p
= session_map
.begin();
63 p
!= session_map
.end();
65 dout(10) << p
->first
<< " " << p
->second
66 << " state " << p
->second
->get_state_name()
67 << " completed " << p
->second
->info
.completed_requests
68 << " prealloc_inos " << p
->second
->info
.prealloc_inos
69 << " used_inos " << p
->second
->info
.used_inos
78 object_t
SessionMap::get_object_name() const
81 snprintf(s
, sizeof(s
), "mds%d_sessionmap", int(mds
->get_nodeid()));
86 class C_IO_SM_Load
: public SessionMapIOContext
{
88 const bool first
; //< Am I the initial (header) load?
89 int header_r
; //< Return value from OMAP header read
90 int values_r
; //< Return value from OMAP value read
92 std::map
<std::string
, bufferlist
> session_vals
;
93 bool more_session_vals
= false;
95 C_IO_SM_Load(SessionMap
*cm
, const bool f
)
96 : SessionMapIOContext(cm
), first(f
), header_r(0), values_r(0) {}
98 void finish(int r
) override
{
99 sessionmap
->_load_finish(r
, header_r
, values_r
, first
, header_bl
, session_vals
,
107 * Decode OMAP header. Call this once when loading.
109 void SessionMapStore::decode_header(
110 bufferlist
&header_bl
)
112 bufferlist::iterator q
= header_bl
.begin();
114 ::decode(version
, q
);
118 void SessionMapStore::encode_header(
119 bufferlist
*header_bl
)
121 ENCODE_START(1, 1, *header_bl
);
122 ::encode(version
, *header_bl
);
123 ENCODE_FINISH(*header_bl
);
127 * Decode and insert some serialized OMAP values. Call this
128 * repeatedly to insert batched loads.
130 void SessionMapStore::decode_values(std::map
<std::string
, bufferlist
> &session_vals
)
132 for (std::map
<std::string
, bufferlist
>::iterator i
= session_vals
.begin();
133 i
!= session_vals
.end(); ++i
) {
137 bool parsed
= inst
.name
.parse(i
->first
);
139 derr
<< "Corrupt entity name '" << i
->first
<< "' in sessionmap" << dendl
;
140 throw buffer::malformed_input("Corrupt entity name in sessionmap");
143 Session
*s
= get_or_add_session(inst
);
145 s
->set_state(Session::STATE_OPEN
);
146 bufferlist::iterator q
= i
->second
.begin();
152 * An OMAP read finished.
154 void SessionMap::_load_finish(
159 bufferlist
&header_bl
,
160 std::map
<std::string
, bufferlist
> &session_vals
,
161 bool more_session_vals
)
163 if (operation_r
< 0) {
164 derr
<< "_load_finish got " << cpp_strerror(operation_r
) << dendl
;
165 mds
->clog
->error() << "error reading sessionmap '" << get_object_name()
166 << "' " << operation_r
<< " ("
167 << cpp_strerror(operation_r
) << ")";
169 ceph_abort(); // Should be unreachable because damaged() calls respawn()
175 derr
<< __func__
<< ": header error: " << cpp_strerror(header_r
) << dendl
;
176 mds
->clog
->error() << "error reading sessionmap header "
177 << header_r
<< " (" << cpp_strerror(header_r
) << ")";
179 ceph_abort(); // Should be unreachable because damaged() calls respawn()
182 if(header_bl
.length() == 0) {
183 dout(4) << __func__
<< ": header missing, loading legacy..." << dendl
;
189 decode_header(header_bl
);
190 } catch (buffer::error
&e
) {
191 mds
->clog
->error() << "corrupt sessionmap header: " << e
.what();
193 ceph_abort(); // Should be unreachable because damaged() calls respawn()
195 dout(10) << __func__
<< " loaded version " << version
<< dendl
;
199 derr
<< __func__
<< ": error reading values: "
200 << cpp_strerror(values_r
) << dendl
;
201 mds
->clog
->error() << "error reading sessionmap values: "
202 << values_r
<< " (" << cpp_strerror(values_r
) << ")";
204 ceph_abort(); // Should be unreachable because damaged() calls respawn()
207 // Decode session_vals
209 decode_values(session_vals
);
210 } catch (buffer::error
&e
) {
211 mds
->clog
->error() << "corrupt sessionmap values: " << e
.what();
213 ceph_abort(); // Should be unreachable because damaged() calls respawn()
216 if (more_session_vals
) {
217 // Issue another read if we're not at the end of the omap
218 const std::string last_key
= session_vals
.rbegin()->first
;
219 dout(10) << __func__
<< ": continue omap load from '"
220 << last_key
<< "'" << dendl
;
221 object_t oid
= get_object_name();
222 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
223 C_IO_SM_Load
*c
= new C_IO_SM_Load(this, false);
225 op
.omap_get_vals(last_key
, "", g_conf
->mds_sessionmap_keys_per_op
,
226 &c
->session_vals
, &c
->more_session_vals
, &c
->values_r
);
227 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, NULL
, 0,
228 new C_OnFinisher(c
, mds
->finisher
));
230 // I/O is complete. Update `by_state`
231 dout(10) << __func__
<< ": omap load complete" << dendl
;
232 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
233 i
!= session_map
.end(); ++i
) {
234 Session
*s
= i
->second
;
235 auto by_state_entry
= by_state
.find(s
->get_state());
236 if (by_state_entry
== by_state
.end())
237 by_state_entry
= by_state
.emplace(s
->get_state(),
238 new xlist
<Session
*>).first
;
239 by_state_entry
->second
->push_back(&s
->item_session_list
);
242 // Population is complete. Trigger load waiters.
243 dout(10) << __func__
<< ": v " << version
244 << ", " << session_map
.size() << " sessions" << dendl
;
245 projected
= committing
= committed
= version
;
247 finish_contexts(g_ceph_context
, waiting_for_load
);
252 * Populate session state from OMAP records in this
253 * rank's sessionmap object.
255 void SessionMap::load(MDSInternalContextBase
*onload
)
257 dout(10) << "load" << dendl
;
260 waiting_for_load
.push_back(onload
);
262 C_IO_SM_Load
*c
= new C_IO_SM_Load(this, true);
263 object_t oid
= get_object_name();
264 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
267 op
.omap_get_header(&c
->header_bl
, &c
->header_r
);
268 op
.omap_get_vals("", "", g_conf
->mds_sessionmap_keys_per_op
,
269 &c
->session_vals
, &c
->more_session_vals
, &c
->values_r
);
271 mds
->objecter
->read(oid
, oloc
, op
, CEPH_NOSNAP
, NULL
, 0, new C_OnFinisher(c
, mds
->finisher
));
275 class C_IO_SM_LoadLegacy
: public SessionMapIOContext
{
278 explicit C_IO_SM_LoadLegacy(SessionMap
*cm
) : SessionMapIOContext(cm
) {}
279 void finish(int r
) override
{
280 sessionmap
->_load_legacy_finish(r
, bl
);
287 * Load legacy (object data blob) SessionMap format, assuming
288 * that waiting_for_load has already been populated with
289 * the relevant completion. This is the fallback if we do not
290 * find an OMAP header when attempting to load normally.
292 void SessionMap::load_legacy()
294 dout(10) << __func__
<< dendl
;
296 C_IO_SM_LoadLegacy
*c
= new C_IO_SM_LoadLegacy(this);
297 object_t oid
= get_object_name();
298 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
300 mds
->objecter
->read_full(oid
, oloc
, CEPH_NOSNAP
, &c
->bl
, 0,
301 new C_OnFinisher(c
, mds
->finisher
));
304 void SessionMap::_load_legacy_finish(int r
, bufferlist
&bl
)
306 bufferlist::iterator blp
= bl
.begin();
308 derr
<< "_load_finish got " << cpp_strerror(r
) << dendl
;
309 assert(0 == "failed to load sessionmap");
312 decode_legacy(blp
); // note: this sets last_cap_renew = now()
313 dout(10) << "_load_finish v " << version
314 << ", " << session_map
.size() << " sessions, "
315 << bl
.length() << " bytes"
317 projected
= committing
= committed
= version
;
320 // Mark all sessions dirty, so that on next save() we will write
321 // a complete OMAP version of the data loaded from the legacy format
322 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
323 i
!= session_map
.end(); ++i
) {
324 // Don't use mark_dirty because on this occasion we want to ignore the
325 // keys_per_op limit and do one big write (upgrade must be atomic)
326 dirty_sessions
.insert(i
->first
);
328 loaded_legacy
= true;
330 finish_contexts(g_ceph_context
, waiting_for_load
);
338 class C_IO_SM_Save
: public SessionMapIOContext
{
341 C_IO_SM_Save(SessionMap
*cm
, version_t v
) : SessionMapIOContext(cm
), version(v
) {}
342 void finish(int r
) override
{
344 get_mds()->handle_write_error(r
);
346 sessionmap
->_save_finish(version
);
352 void SessionMap::save(MDSInternalContextBase
*onsave
, version_t needv
)
354 dout(10) << __func__
<< ": needv " << needv
<< ", v " << version
<< dendl
;
356 if (needv
&& committing
>= needv
) {
357 assert(committing
> committed
);
358 commit_waiters
[committing
].push_back(onsave
);
362 commit_waiters
[version
].push_back(onsave
);
364 committing
= version
;
366 object_t oid
= get_object_name();
367 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
371 /* Compose OSD OMAP transaction for full write */
372 bufferlist header_bl
;
373 encode_header(&header_bl
);
374 op
.omap_set_header(header_bl
);
376 /* If we loaded a legacy sessionmap, then erase the old data. If
377 * an old-versioned MDS tries to read it, it'll fail out safely
378 * with an end_of_buffer exception */
380 dout(4) << __func__
<< " erasing legacy sessionmap" << dendl
;
382 loaded_legacy
= false; // only need to truncate once.
385 dout(20) << " updating keys:" << dendl
;
386 map
<string
, bufferlist
> to_set
;
387 for(std::set
<entity_name_t
>::iterator i
= dirty_sessions
.begin();
388 i
!= dirty_sessions
.end(); ++i
) {
389 const entity_name_t name
= *i
;
390 Session
*session
= session_map
[name
];
392 if (session
->is_open() ||
393 session
->is_closing() ||
394 session
->is_stale() ||
395 session
->is_killing()) {
396 dout(20) << " " << name
<< dendl
;
398 std::ostringstream k
;
403 session
->info
.encode(bl
, mds
->mdsmap
->get_up_features());
406 to_set
[k
.str()] = bl
;
408 session
->clear_dirty_completed_requests();
410 dout(20) << " " << name
<< " (ignoring)" << dendl
;
413 if (!to_set
.empty()) {
417 dout(20) << " removing keys:" << dendl
;
418 set
<string
> to_remove
;
419 for(std::set
<entity_name_t
>::const_iterator i
= null_sessions
.begin();
420 i
!= null_sessions
.end(); ++i
) {
421 dout(20) << " " << *i
<< dendl
;
422 std::ostringstream k
;
424 to_remove
.insert(k
.str());
426 if (!to_remove
.empty()) {
427 op
.omap_rm_keys(to_remove
);
430 dirty_sessions
.clear();
431 null_sessions
.clear();
433 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
434 ceph::real_clock::now(),
436 new C_OnFinisher(new C_IO_SM_Save(this, version
),
440 void SessionMap::_save_finish(version_t v
)
442 dout(10) << "_save_finish v" << v
<< dendl
;
445 finish_contexts(g_ceph_context
, commit_waiters
[v
]);
446 commit_waiters
.erase(v
);
451 * Deserialize sessions, and update by_state index
453 void SessionMap::decode_legacy(bufferlist::iterator
&p
)
455 // Populate `sessions`
456 SessionMapStore::decode_legacy(p
);
459 for (ceph::unordered_map
<entity_name_t
, Session
*>::iterator i
= session_map
.begin();
460 i
!= session_map
.end(); ++i
) {
461 Session
*s
= i
->second
;
462 auto by_state_entry
= by_state
.find(s
->get_state());
463 if (by_state_entry
== by_state
.end())
464 by_state_entry
= by_state
.emplace(s
->get_state(),
465 new xlist
<Session
*>).first
;
466 by_state_entry
->second
->push_back(&s
->item_session_list
);
470 uint64_t SessionMap::set_state(Session
*session
, int s
) {
471 if (session
->state
!= s
) {
472 session
->set_state(s
);
473 auto by_state_entry
= by_state
.find(s
);
474 if (by_state_entry
== by_state
.end())
475 by_state_entry
= by_state
.emplace(s
, new xlist
<Session
*>).first
;
476 by_state_entry
->second
->push_back(&session
->item_session_list
);
478 return session
->get_state_seq();
481 void SessionMapStore::decode_legacy(bufferlist::iterator
& p
)
483 utime_t now
= ceph_clock_now();
486 if (pre
== (uint64_t)-1) {
487 DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, p
);
488 assert(struct_v
>= 2);
490 ::decode(version
, p
);
494 ::decode(inst
.name
, p
);
495 Session
*s
= get_or_add_session(inst
);
497 s
->set_state(Session::STATE_OPEN
);
503 // --- old format ----
506 // this is a meaningless upper bound. can be ignored.
510 while (n
-- && !p
.end()) {
511 bufferlist::iterator p2
= p
;
512 Session
*s
= new Session
;
514 if (session_map
.count(s
->info
.inst
.name
)) {
515 // eager client connected too fast! aie.
516 dout(10) << " already had session for " << s
->info
.inst
.name
<< ", recovering" << dendl
;
517 entity_name_t n
= s
->info
.inst
.name
;
523 session_map
[s
->info
.inst
.name
] = s
;
525 s
->set_state(Session::STATE_OPEN
);
526 s
->last_cap_renew
= now
;
531 void SessionMapStore::dump(Formatter
*f
) const
533 f
->open_array_section("Sessions");
534 for (ceph::unordered_map
<entity_name_t
,Session
*>::const_iterator p
= session_map
.begin();
535 p
!= session_map
.end();
537 f
->open_object_section("Session");
538 f
->open_object_section("entity name");
540 f
->close_section(); // entity name
541 f
->dump_string("state", p
->second
->get_state_name());
542 f
->open_object_section("Session info");
543 p
->second
->info
.dump(f
);
544 f
->close_section(); // Session info
545 f
->close_section(); // Session
547 f
->close_section(); // Sessions
550 void SessionMapStore::generate_test_instances(list
<SessionMapStore
*>& ls
)
552 // pretty boring for now
553 ls
.push_back(new SessionMapStore());
556 void SessionMap::wipe()
558 dout(1) << "wipe start" << dendl
;
560 while (!session_map
.empty()) {
561 Session
*s
= session_map
.begin()->second
;
564 version
= ++projected
;
565 dout(1) << "wipe result" << dendl
;
567 dout(1) << "wipe done" << dendl
;
570 void SessionMap::wipe_ino_prealloc()
572 for (ceph::unordered_map
<entity_name_t
,Session
*>::iterator p
= session_map
.begin();
573 p
!= session_map
.end();
575 p
->second
->pending_prealloc_inos
.clear();
576 p
->second
->info
.prealloc_inos
.clear();
577 p
->second
->info
.used_inos
.clear();
579 projected
= ++version
;
582 void SessionMap::add_session(Session
*s
)
584 dout(10) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
<< dendl
;
586 assert(session_map
.count(s
->info
.inst
.name
) == 0);
587 session_map
[s
->info
.inst
.name
] = s
;
588 auto by_state_entry
= by_state
.find(s
->state
);
589 if (by_state_entry
== by_state
.end())
590 by_state_entry
= by_state
.emplace(s
->state
, new xlist
<Session
*>).first
;
591 by_state_entry
->second
->push_back(&s
->item_session_list
);
594 logger
->set(l_mdssm_session_count
, session_map
.size());
595 logger
->inc(l_mdssm_session_add
);
598 void SessionMap::remove_session(Session
*s
)
600 dout(10) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
<< dendl
;
602 s
->trim_completed_requests(0);
603 s
->item_session_list
.remove_myself();
604 session_map
.erase(s
->info
.inst
.name
);
605 dirty_sessions
.erase(s
->info
.inst
.name
);
606 null_sessions
.insert(s
->info
.inst
.name
);
609 logger
->set(l_mdssm_session_count
, session_map
.size());
610 logger
->inc(l_mdssm_session_remove
);
613 void SessionMap::touch_session(Session
*session
)
615 dout(10) << __func__
<< " s=" << session
<< " name=" << session
->info
.inst
.name
<< dendl
;
617 // Move to the back of the session list for this state (should
618 // already be on a list courtesy of add_session and set_state)
619 assert(session
->item_session_list
.is_on_list());
620 auto by_state_entry
= by_state
.find(session
->state
);
621 if (by_state_entry
== by_state
.end())
622 by_state_entry
= by_state
.emplace(session
->state
,
623 new xlist
<Session
*>).first
;
624 by_state_entry
->second
->push_back(&session
->item_session_list
);
626 session
->last_cap_renew
= ceph_clock_now();
629 void SessionMap::_mark_dirty(Session
*s
)
631 if (dirty_sessions
.count(s
->info
.inst
.name
))
634 if (dirty_sessions
.size() >= g_conf
->mds_sessionmap_keys_per_op
) {
635 // Pre-empt the usual save() call from journal segment trim, in
636 // order to avoid building up an oversized OMAP update operation
637 // from too many sessions modified at once
638 save(new C_MDSInternalNoop
, version
);
641 null_sessions
.erase(s
->info
.inst
.name
);
642 dirty_sessions
.insert(s
->info
.inst
.name
);
645 void SessionMap::mark_dirty(Session
*s
)
647 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
648 << " v=" << version
<< dendl
;
655 void SessionMap::replay_dirty_session(Session
*s
)
657 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
658 << " v=" << version
<< dendl
;
662 replay_advance_version();
665 void SessionMap::replay_advance_version()
671 version_t
SessionMap::mark_projected(Session
*s
)
673 dout(20) << __func__
<< " s=" << s
<< " name=" << s
->info
.inst
.name
674 << " pv=" << projected
<< " -> " << projected
+ 1 << dendl
;
676 s
->push_pv(projected
);
681 class C_IO_SM_Save_One
: public SessionMapIOContext
{
682 MDSInternalContextBase
*on_safe
;
684 C_IO_SM_Save_One(SessionMap
*cm
, MDSInternalContextBase
*on_safe_
)
685 : SessionMapIOContext(cm
), on_safe(on_safe_
) {}
686 void finish(int r
) override
{
688 get_mds()->handle_write_error(r
);
690 on_safe
->complete(r
);
697 void SessionMap::save_if_dirty(const std::set
<entity_name_t
> &tgt_sessions
,
698 MDSGatherBuilder
*gather_bld
)
700 assert(gather_bld
!= NULL
);
702 std::vector
<entity_name_t
> write_sessions
;
704 // Decide which sessions require a write
705 for (std::set
<entity_name_t
>::iterator i
= tgt_sessions
.begin();
706 i
!= tgt_sessions
.end(); ++i
) {
707 const entity_name_t
&session_id
= *i
;
709 if (session_map
.count(session_id
) == 0) {
710 // Session isn't around any more, never mind.
714 Session
*session
= session_map
[session_id
];
715 if (!session
->has_dirty_completed_requests()) {
716 // Session hasn't had completed_requests
717 // modified since last write, no need to
722 if (dirty_sessions
.count(session_id
) > 0) {
723 // Session is already dirtied, will be written, no
724 // need to pre-empt that.
727 // Okay, passed all our checks, now we write
728 // this session out. The version we write
729 // into the OMAP may now be higher-versioned
730 // than the version in the header, but that's
731 // okay because it's never a problem to have
732 // an overly-fresh copy of a session.
733 write_sessions
.push_back(*i
);
736 dout(4) << __func__
<< ": writing " << write_sessions
.size() << dendl
;
738 // Batch writes into mds_sessionmap_keys_per_op
739 const uint32_t kpo
= g_conf
->mds_sessionmap_keys_per_op
;
740 map
<string
, bufferlist
> to_set
;
741 for (uint32_t i
= 0; i
< write_sessions
.size(); ++i
) {
742 // Start a new write transaction?
743 if (i
% g_conf
->mds_sessionmap_keys_per_op
== 0) {
747 const entity_name_t
&session_id
= write_sessions
[i
];
748 Session
*session
= session_map
[session_id
];
749 session
->clear_dirty_completed_requests();
752 std::ostringstream k
;
757 session
->info
.encode(bl
, mds
->mdsmap
->get_up_features());
760 to_set
[k
.str()] = bl
;
762 // Complete this write transaction?
763 if (i
== write_sessions
.size() - 1
764 || i
% kpo
== kpo
- 1) {
769 object_t oid
= get_object_name();
770 object_locator_t
oloc(mds
->mdsmap
->get_metadata_pool());
771 MDSInternalContextBase
*on_safe
= gather_bld
->new_sub();
772 mds
->objecter
->mutate(oid
, oloc
, op
, snapc
,
773 ceph::real_clock::now(),
775 new C_IO_SM_Save_One(this, on_safe
),
785 #define dout_prefix *_dout << "Session "
788 * Calculate the length of the `requests` member list,
789 * because elist does not have a size() method.
791 * O(N) runtime. This would be const, but elist doesn't
792 * have const iterators.
794 size_t Session::get_request_count()
798 elist
<MDRequestImpl
*>::iterator p
= requests
.begin(
799 member_offset(MDRequestImpl
, item_session_request
));
809 * Capped in response to a CEPH_MSG_CLIENT_CAPRELEASE message,
810 * with n_caps equal to the number of caps that were released
811 * in the message. Used to update state about how many caps a
812 * client has released since it was last instructed to RECALL_STATE.
814 void Session::notify_cap_release(size_t n_caps
)
816 if (!recalled_at
.is_zero()) {
817 recall_release_count
+= n_caps
;
818 if (recall_release_count
>= recall_count
)
824 * Called when a CEPH_MSG_CLIENT_SESSION->CEPH_SESSION_RECALL_STATE
825 * message is sent to the client. Update our recall-related state
826 * in order to generate health metrics if the session doesn't see
827 * a commensurate number of calls to ::notify_cap_release
829 void Session::notify_recall_sent(const size_t new_limit
)
831 if (recalled_at
.is_zero()) {
832 // Entering recall phase, set up counters so we can later
833 // judge whether the client has respected the recall request
834 recalled_at
= last_recall_sent
= ceph_clock_now();
835 assert (new_limit
< caps
.size()); // Behaviour of Server::recall_client_state
836 recall_count
= caps
.size() - new_limit
;
837 recall_release_count
= 0;
839 last_recall_sent
= ceph_clock_now();
843 void Session::clear_recalled_at()
845 recalled_at
= last_recall_sent
= utime_t();
847 recall_release_count
= 0;
850 void Session::set_client_metadata(map
<string
, string
> const &meta
)
852 info
.client_metadata
= meta
;
854 _update_human_name();
858 * Use client metadata to generate a somewhat-friendlier
859 * name for the client than its session ID.
861 * This is *not* guaranteed to be unique, and any machine
862 * consumers of session-related output should always use
863 * the session ID as a primary capacity and use this only
864 * as a presentation hint.
866 void Session::_update_human_name()
868 auto info_client_metadata_entry
= info
.client_metadata
.find("hostname");
869 if (info_client_metadata_entry
!= info
.client_metadata
.end()) {
870 // Happy path, refer to clients by hostname
871 human_name
= info_client_metadata_entry
->second
;
872 if (!info
.auth_name
.has_default_id()) {
873 // When a non-default entity ID is set by the user, assume they
874 // would like to see it in references to the client, if it's
875 // reasonable short. Limit the length because we don't want
876 // to put e.g. uuid-generated names into a "human readable"
878 const int arbitrarily_short
= 16;
879 if (info
.auth_name
.get_id().size() < arbitrarily_short
) {
880 human_name
+= std::string(":") + info
.auth_name
.get_id();
884 // Fallback, refer to clients by ID e.g. client.4567
885 human_name
= stringify(info
.inst
.name
.num());
889 void Session::decode(bufferlist::iterator
&p
)
893 _update_human_name();
896 int Session::check_access(CInode
*in
, unsigned mask
,
897 int caller_uid
, int caller_gid
,
898 const vector
<uint64_t> *caller_gid_list
,
899 int new_uid
, int new_gid
)
904 diri
= in
->get_projected_parent_dn()->get_dir()->get_inode();
905 if (diri
&& diri
->is_stray()){
906 path
= in
->get_projected_inode()->stray_prior_path
;
907 dout(20) << __func__
<< " stray_prior_path " << path
<< dendl
;
909 in
->make_path_string(path
, true);
910 dout(20) << __func__
<< " path " << path
<< dendl
;
913 path
= path
.substr(1); // drop leading /
915 if (in
->inode
.is_dir() &&
916 in
->inode
.has_layout() &&
917 in
->inode
.layout
.pool_ns
.length() &&
918 !connection
->has_feature(CEPH_FEATURE_FS_FILE_LAYOUT_V2
)) {
919 dout(10) << __func__
<< " client doesn't support FS_FILE_LAYOUT_V2" << dendl
;
923 if (!auth_caps
.is_capable(path
, in
->inode
.uid
, in
->inode
.gid
, in
->inode
.mode
,
924 caller_uid
, caller_gid
, caller_gid_list
, mask
,
931 int SessionFilter::parse(
932 const std::vector
<std::string
> &args
,
933 std::stringstream
*ss
)
937 for (const auto &s
: args
) {
938 dout(20) << __func__
<< " parsing filter '" << s
<< "'" << dendl
;
940 auto eq
= s
.find("=");
941 if (eq
== std::string::npos
|| eq
== s
.size()) {
942 *ss
<< "Invalid filter '" << s
<< "'";
946 // Keys that start with this are to be taken as referring
947 // to freeform client metadata fields.
948 const std::string
metadata_prefix("client_metadata.");
950 auto k
= s
.substr(0, eq
);
951 auto v
= s
.substr(eq
+ 1);
953 dout(20) << __func__
<< " parsed k='" << k
<< "', v='" << v
<< "'" << dendl
;
955 if (k
.compare(0, metadata_prefix
.size(), metadata_prefix
) == 0
956 && k
.size() > metadata_prefix
.size()) {
957 // Filter on arbitrary metadata key (no fixed schema for this,
958 // so anything after the dot is a valid field to filter on)
959 auto metadata_key
= k
.substr(metadata_prefix
.size());
960 metadata
.insert(std::make_pair(metadata_key
, v
));
961 } else if (k
== "auth_name") {
962 // Filter on client entity name
964 } else if (k
== "state") {
966 } else if (k
== "id") {
968 id
= strict_strtoll(v
.c_str(), 10, &err
);
973 } else if (k
== "reconnecting") {
976 * Strict boolean parser. Allow true/false/0/1.
977 * Anything else is -EINVAL.
979 auto is_true
= [](const std::string
&bstr
, bool *out
) -> bool
981 assert(out
!= nullptr);
983 if (bstr
== "true" || bstr
== "1") {
986 } else if (bstr
== "false" || bstr
== "0") {
995 int r
= is_true(v
, &bval
);
997 set_reconnecting(bval
);
999 *ss
<< "Invalid boolean value '" << v
<< "'";
1003 *ss
<< "Invalid filter key '" << k
<< "'";
1011 bool SessionFilter::match(
1012 const Session
&session
,
1013 std::function
<bool(client_t
)> is_reconnecting
) const
1015 for (const auto &m
: metadata
) {
1016 const auto &k
= m
.first
;
1017 const auto &v
= m
.second
;
1018 if (session
.info
.client_metadata
.count(k
) == 0) {
1021 if (session
.info
.client_metadata
.at(k
) != v
) {
1026 if (!auth_name
.empty() && auth_name
!= session
.info
.auth_name
.get_id()) {
1030 if (!state
.empty() && state
!= session
.get_state_name()) {
1034 if (id
!= 0 && id
!= session
.info
.inst
.name
.num()) {
1038 if (reconnecting
.first
) {
1039 const bool am_reconnecting
= is_reconnecting(session
.info
.inst
.name
.num());
1040 if (reconnecting
.second
!= am_reconnecting
) {
1048 std::ostream
& operator<<(std::ostream
&out
, const Session
&s
)
1050 if (s
.get_human_name() == stringify(s
.info
.inst
.name
.num())) {
1051 out
<< s
.get_human_name();
1053 out
<< s
.get_human_name() << " (" << std::dec
<< s
.info
.inst
.name
.num() << ")";