#include "mdstypes.h"
#include "mds/MDSAuthCaps.h"
#include "common/perf_counters.h"
+#include "common/DecayCounter.h"
class CInode;
struct MDRequestImpl;
#include "CInode.h"
#include "Capability.h"
+#include "MDSContext.h"
#include "msg/Message.h"
// Perf counter indices for the SessionMap subsystem.
enum {
  l_mdssm_session_count,
  l_mdssm_session_add,
  l_mdssm_session_remove,
  l_mdssm_session_open,
  l_mdssm_session_stale,
  l_mdssm_total_load,
  l_mdssm_avg_load,
  l_mdssm_avg_session_uptime,
  l_mdssm_last,
};
+ additional dimension of 'importing' (with counter)
*/
+
+ using clock = ceph::coarse_mono_clock;
+ using time = ceph::coarse_mono_time;
+
+
enum {
STATE_CLOSED = 0,
STATE_OPENING = 1, // journaling open
STATE_KILLING = 5
};
- const char *get_state_name(int s) const {
+ static std::string_view get_state_name(int s) {
switch (s) {
case STATE_CLOSED: return "closed";
case STATE_OPENING: return "opening";
}
private:
- int state;
- uint64_t state_seq;
- int importing_count;
+ int state = STATE_CLOSED;
+ uint64_t state_seq = 0;
+ int importing_count = 0;
friend class SessionMap;
// Human (friendly) name is soft state generated from client metadata
// that appropriate mark_dirty calls follow.
std::deque<version_t> projected;
+ // request load average for this session
+ DecayCounter load_avg;
+ // Ephemeral state for tracking progress of capability recalls
+ // caps being recalled recently by this session; used for Beacon warnings
+ DecayCounter recall_caps;
+ // caps that have been released
+ DecayCounter release_caps;
+ // throttle on caps recalled
+ DecayCounter recall_caps_throttle;
+ // second order throttle that prevents recalling too quickly
+ DecayCounter recall_caps_throttle2o;
+ // New limit in SESSION_RECALL
+ uint32_t recall_limit = 0;
+
+ // session start time -- used to track average session time
+ // note that this is initialized in the constructor rather
+ // than at the time of adding a session to the sessionmap
+ // as journal replay of sessionmap will not call add_session().
+ time birth_time;
public:
+ Session *reclaiming_from = nullptr;
void push_pv(version_t pv)
{
- assert(projected.empty() || projected.back() != pv);
+ ceph_assert(projected.empty() || projected.back() != pv);
projected.push_back(pv);
}
void pop_pv(version_t v)
{
- assert(!projected.empty());
- assert(projected.front() == v);
+ ceph_assert(!projected.empty());
+ ceph_assert(projected.front() == v);
projected.pop_front();
}
state_seq++;
}
}
- void decode(bufferlist::iterator &p);
- void set_client_metadata(std::map<std::string, std::string> const &meta);
- std::string get_human_name() const {return human_name;}
+ void decode(bufferlist::const_iterator &p);
+ template<typename T>
+ void set_client_metadata(T&& meta)
+ {
+ info.client_metadata = std::forward<T>(meta);
+ _update_human_name();
+ }
- // Ephemeral state for tracking progress of capability recalls
- utime_t recalled_at; // When was I asked to SESSION_RECALL?
- utime_t last_recall_sent;
- uint32_t recall_count; // How many caps was I asked to SESSION_RECALL?
- uint32_t recall_release_count; // How many caps have I actually revoked?
+ const std::string& get_human_name() const {return human_name;}
session_info_t info; ///< durable bits
MDSAuthCaps auth_caps;
+protected:
ConnectionRef connection;
+public:
+ entity_addr_t socket_addr;
xlist<Session*>::item item_session_list;
- list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
+ list<Message::ref> preopen_out_queue; ///< messages for client, queued before they connect
elist<MDRequestImpl*> requests;
size_t get_request_count();
interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
void notify_cap_release(size_t n_caps);
- void notify_recall_sent(const int new_limit);
- void clear_recalled_at();
+ uint64_t notify_recall_sent(size_t new_limit);
+ auto get_recall_caps_throttle() const {
+ return recall_caps_throttle.get();
+ }
+ auto get_recall_caps_throttle2o() const {
+ return recall_caps_throttle2o.get();
+ }
+ auto get_recall_caps() const {
+ return recall_caps.get();
+ }
+ auto get_release_caps() const {
+ return release_caps.get();
+ }
inodeno_t next_ino() const {
if (info.prealloc_inos.empty())
return info.prealloc_inos.range_start();
}
inodeno_t take_ino(inodeno_t ino = 0) {
- assert(!info.prealloc_inos.empty());
+ ceph_assert(!info.prealloc_inos.empty());
if (ino) {
if (info.prealloc_inos.contains(ino))
return info.get_client();
}
- const char *get_state_name() const { return get_state_name(state); }
+ std::string_view get_state_name() const { return get_state_name(state); }
uint64_t get_state_seq() const { return state_seq; }
bool is_closed() const { return state == STATE_CLOSED; }
bool is_opening() const { return state == STATE_OPENING; }
++importing_count;
}
void dec_importing() {
- assert(importing_count > 0);
+ ceph_assert(importing_count > 0);
--importing_count;
}
bool is_importing() const { return importing_count > 0; }
+ void set_load_avg_decay_rate(double rate) {
+ ceph_assert(is_open() || is_stale());
+ load_avg = DecayCounter(rate);
+ }
+ uint64_t get_load_avg() const {
+ return (uint64_t)load_avg.get();
+ }
+ void hit_session() {
+ load_avg.adjust();
+ }
+
+ double get_session_uptime() const {
+ chrono::duration<double> uptime = clock::now() - birth_time;
+ return uptime.count();
+ }
+
+ time get_birth_time() const {
+ return birth_time;
+ }
+
// -- caps --
private:
- version_t cap_push_seq; // cap push seq #
- map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
+ uint32_t cap_gen = 0;
+ version_t cap_push_seq = 0; // cap push seq #
+ map<version_t, MDSContext::vec > waitfor_flush; // flush session messages
public:
xlist<Capability*> caps; // inodes with caps; front=most recently used
xlist<ClientLease*> leases; // metadata leases to clients
- utime_t last_cap_renew;
+ time last_cap_renew = clock::zero();
+ time last_seen = clock::zero();
+
+ void inc_cap_gen() { ++cap_gen; }
+ uint32_t get_cap_gen() const { return cap_gen; }
-public:
version_t inc_push_seq() { return ++cap_push_seq; }
version_t get_push_seq() const { return cap_push_seq; }
- version_t wait_for_flush(MDSInternalContextBase* c) {
+ version_t wait_for_flush(MDSContext* c) {
waitfor_flush[get_push_seq()].push_back(c);
return get_push_seq();
}
- void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
+ void finish_flush(version_t seq, MDSContext::vec& ls) {
while (!waitfor_flush.empty()) {
- if (waitfor_flush.begin()->first > seq)
+ auto it = waitfor_flush.begin();
+ if (it->first > seq)
break;
- ls.splice(ls.end(), waitfor_flush.begin()->second);
- waitfor_flush.erase(waitfor_flush.begin());
+ auto& v = it->second;
+ ls.insert(ls.end(), v.begin(), v.end());
+ waitfor_flush.erase(it);
}
}
- void add_cap(Capability *cap) {
+ void touch_cap(Capability *cap) {
+ caps.push_front(&cap->item_session_caps);
+ }
+
+ void touch_cap_bottom(Capability *cap) {
caps.push_back(&cap->item_session_caps);
}
+
  /// Track lease r on this session's lease list (appended at the back).
  void touch_lease(ClientLease *r) {
    leases.push_back(&r->item_session_lease);
  }
+ bool is_any_flush_waiter() {
+ return !waitfor_flush.empty();
+ }
+
// -- leases --
- uint32_t lease_seq;
+ uint32_t lease_seq = 0;
// -- completed requests --
private:
// Has completed_requests been modified since the last time we
// wrote this session out?
- bool completed_requests_dirty;
+ bool completed_requests_dirty = false;
- unsigned num_trim_flushes_warnings;
- unsigned num_trim_requests_warnings;
+ unsigned num_trim_flushes_warnings = 0;
+ unsigned num_trim_requests_warnings = 0;
public:
void add_completed_request(ceph_tid_t t, inodeno_t created) {
info.completed_requests[t] = created;
int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
const vector<uint64_t> *gid_list, int new_uid, int new_gid);
-
- Session() :
- state(STATE_CLOSED), state_seq(0), importing_count(0),
- recall_count(0), recall_release_count(0),
+ Session() = delete;
+ Session(ConnectionRef con) :
+ recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+ release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+ recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+ recall_caps_throttle2o(0.5),
+ birth_time(clock::now()),
auth_caps(g_ceph_context),
- connection(NULL), item_session_list(this),
- requests(0), // member_offset passed to front() manually
- cap_push_seq(0),
- lease_seq(0),
- completed_requests_dirty(false),
- num_trim_flushes_warnings(0),
- num_trim_requests_warnings(0) { }
+ item_session_list(this),
+ requests(0) // member_offset passed to front() manually
+ {
+ set_connection(std::move(con));
+ }
~Session() override {
- assert(!item_session_list.is_on_list());
- while (!preopen_out_queue.empty()) {
- preopen_out_queue.front()->put();
- preopen_out_queue.pop_front();
+ if (state == STATE_CLOSED) {
+ item_session_list.remove_myself();
+ } else {
+ ceph_assert(!item_session_list.is_on_list());
+ }
+ preopen_out_queue.clear();
+ }
+
+ void set_connection(ConnectionRef con) {
+ connection = std::move(con);
+ if (connection) {
+ socket_addr = connection->get_peer_socket_addr();
}
}
+ const ConnectionRef& get_connection() const {
+ return connection;
+ }
void clear() {
pending_prealloc_inos.clear();
info.clear_meta();
cap_push_seq = 0;
- last_cap_renew = utime_t();
-
+ last_cap_renew = clock::zero();
}
};
* encode/decode outside of live MDS instance.
*/
class SessionMapStore {
+public:
+ using clock = Session::clock;
+ using time = Session::time;
+
protected:
version_t version;
ceph::unordered_map<entity_name_t, Session*> session_map;
PerfCounters *logger;
+
+ // total request load avg
+ double decay_rate;
+ DecayCounter total_load_avg;
+
public:
mds_rank_t rank;
virtual void encode_header(bufferlist *header_bl);
virtual void decode_header(bufferlist &header_bl);
virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
- virtual void decode_legacy(bufferlist::iterator& blp);
+ virtual void decode_legacy(bufferlist::const_iterator& blp);
void dump(Formatter *f) const;
void set_rank(mds_rank_t r)
if (session_map_entry != session_map.end()) {
s = session_map_entry->second;
} else {
- s = session_map[i.name] = new Session;
+ s = session_map[i.name] = new Session(ConnectionRef());
s->info.inst = i;
- s->last_cap_renew = ceph_clock_now();
+ s->last_cap_renew = Session::clock::now();
if (logger) {
logger->set(l_mdssm_session_count, session_map.size());
logger->inc(l_mdssm_session_add);
session_map.clear();
}
- SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE) {}
+ SessionMapStore()
+ : version(0), logger(nullptr),
+ decay_rate(g_conf().get_val<double>("mds_request_load_average_decay_rate")),
+ total_load_avg(decay_rate), rank(MDS_RANK_NONE) {
+ }
virtual ~SessionMapStore() {};
};
MDSRank *mds;
protected:
- version_t projected, committing, committed;
+ version_t projected = 0, committing = 0, committed = 0;
public:
map<int,xlist<Session*>* > by_state;
uint64_t set_state(Session *session, int state);
- map<version_t, list<MDSInternalContextBase*> > commit_waiters;
+ map<version_t, MDSContext::vec > commit_waiters;
+ void update_average_session_age();
- explicit SessionMap(MDSRank *m) : mds(m),
- projected(0), committing(0), committed(0),
- loaded_legacy(false)
- { }
+ SessionMap() = delete;
+ explicit SessionMap(MDSRank *m) : mds(m) {}
~SessionMap() override
{
}
// sessions
- void decode_legacy(bufferlist::iterator& blp) override;
+ void decode_legacy(bufferlist::const_iterator& blp) override;
bool empty() const { return session_map.empty(); }
- const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
+ const ceph::unordered_map<entity_name_t, Session*>& get_sessions() const
{
return session_map;
}
void dump();
- void get_client_set(set<client_t>& s) {
- for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
- p != session_map.end();
- ++p)
- if (p->second->info.inst.name.is_client())
- s.insert(p->second->info.inst.name.num());
- }
- void get_client_session_set(set<Session*>& s) const {
- for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
- p != session_map.end();
- ++p)
- if (p->second->info.inst.name.is_client())
- s.insert(p->second);
- }
-
- void open_sessions(map<client_t,entity_inst_t>& client_map) {
- for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
- p != client_map.end();
- ++p) {
- Session *s = get_or_add_session(p->second);
- set_state(s, Session::STATE_OPEN);
- version++;
+ template<typename F>
+ void get_client_sessions(F&& f) const {
+ for (const auto& p : session_map) {
+ auto& session = p.second;
+ if (session->info.inst.name.is_client())
+ f(session);
}
}
+ template<typename C>
+ void get_client_session_set(C& c) const {
+ auto f = [&c](auto& s) {
+ c.insert(s);
+ };
+ get_client_sessions(f);
+ }
// helpers
entity_inst_t& get_inst(entity_name_t w) {
- assert(session_map.count(w));
+ ceph_assert(session_map.count(w));
return session_map[w]->info.inst;
}
- version_t inc_push_seq(client_t client) {
- return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
- }
version_t get_push_seq(client_t client) {
return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
}
}
void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
Session *session = get_session(c);
- assert(session);
+ ceph_assert(session);
session->trim_completed_requests(tid);
}
// -- loading, saving --
inodeno_t ino;
- list<MDSInternalContextBase*> waiting_for_load;
+ MDSContext::vec waiting_for_load;
object_t get_object_name() const;
- void load(MDSInternalContextBase *onload);
+ void load(MDSContext *onload);
void _load_finish(
int operation_r,
int header_r,
void load_legacy();
void _load_legacy_finish(int r, bufferlist &bl);
- void save(MDSInternalContextBase *onsave, version_t needv=0);
+ void save(MDSContext *onsave, version_t needv=0);
void _save_finish(version_t v);
protected:
std::set<entity_name_t> dirty_sessions;
std::set<entity_name_t> null_sessions;
- bool loaded_legacy;
- void _mark_dirty(Session *session);
+ bool loaded_legacy = false;
+ void _mark_dirty(Session *session, bool may_save);
public:
/**
* to the backing store. Must have called
* mark_projected previously for this session.
*/
- void mark_dirty(Session *session);
+ void mark_dirty(Session *session, bool may_save=true);
/**
* Advance the projected version, and mark this
*/
void replay_advance_version();
+ /**
+ * During replay, open sessions, advance versions and
+ * mark these sessions as dirty.
+ */
+ void replay_open_sessions(version_t event_cmapv,
+ map<client_t,entity_inst_t>& client_map,
+ map<client_t,client_metadata_t>& client_metadata_map);
+
/**
* For these session IDs, if a session exists with this ID, and it has
* dirty completed_requests, then persist it immediately
*/
void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
MDSGatherBuilder *gather_bld);
+
+private:
+ time avg_birth_time = clock::zero();
+
+ uint64_t get_session_count_in_state(int state) {
+ return !is_any_state(state) ? 0 : by_state[state]->size();
+ }
+
+ void update_average_birth_time(const Session &s, bool added=true) {
+ uint32_t sessions = session_map.size();
+ time birth_time = s.get_birth_time();
+
+ if (sessions == 1) {
+ avg_birth_time = added ? birth_time : clock::zero();
+ return;
+ }
+
+ if (added) {
+ avg_birth_time = clock::time_point(
+ ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) +
+ (birth_time - clock::zero()) / sessions);
+ } else {
+ avg_birth_time = clock::time_point(
+ ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions -
+ (birth_time - clock::zero()) / (sessions - 1));
+ }
+ }
+
+public:
+ void hit_session(Session *session);
+ void handle_conf_change(const ConfigProxy &conf,
+ const std::set <std::string> &changed);
};
std::ostream& operator<<(std::ostream &out, const Session &s);