#include "CInode.h"
#include "Capability.h"
+#include "MDSContext.h"
#include "msg/Message.h"
enum {
STATE_KILLING = 5
};
- const char *get_state_name(int s) const {
+ static std::string_view get_state_name(int s) {
switch (s) {
case STATE_CLOSED: return "closed";
case STATE_OPENING: return "opening";
std::deque<version_t> projected;
// request load average for this session
- mutable DecayCounter load_avg;
- DecayRate load_avg_rate;
+ DecayCounter load_avg;
// Ephemeral state for tracking progress of capability recalls
// caps being recalled recently by this session; used for Beacon warnings
- mutable DecayCounter recall_caps;
+ DecayCounter recall_caps;
// caps that have been released
- mutable DecayCounter release_caps;
+ DecayCounter release_caps;
// throttle on caps recalled
- mutable DecayCounter recall_caps_throttle;
+ DecayCounter recall_caps_throttle;
+ // second order throttle that prevents recalling too quickly
+ DecayCounter recall_caps_throttle2o;
// New limit in SESSION_RECALL
uint32_t recall_limit = 0;
time birth_time;
public:
+ Session *reclaiming_from = nullptr;
void push_pv(version_t pv)
{
- assert(projected.empty() || projected.back() != pv);
+ ceph_assert(projected.empty() || projected.back() != pv);
projected.push_back(pv);
}
void pop_pv(version_t v)
{
- assert(!projected.empty());
- assert(projected.front() == v);
+ ceph_assert(!projected.empty());
+ ceph_assert(projected.front() == v);
projected.pop_front();
}
state_seq++;
}
}
- void decode(bufferlist::iterator &p);
+ void decode(bufferlist::const_iterator &p);
template<typename T>
void set_client_metadata(T&& meta)
{
info.client_metadata = std::forward<T>(meta);
_update_human_name();
}
- std::string get_human_name() const {return human_name;}
+
+ const std::string& get_human_name() const {return human_name;}
session_info_t info; ///< durable bits
MDSAuthCaps auth_caps;
+protected:
ConnectionRef connection;
+public:
+ entity_addr_t socket_addr;
xlist<Session*>::item item_session_list;
- list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
+ list<Message::ref> preopen_out_queue; ///< messages for client, queued before they connect
elist<MDRequestImpl*> requests;
size_t get_request_count();
void notify_cap_release(size_t n_caps);
uint64_t notify_recall_sent(size_t new_limit);
- double get_recall_caps_throttle() const {
- return recall_caps_throttle.get(ceph_clock_now());
+ auto get_recall_caps_throttle() const {
+ return recall_caps_throttle.get();
}
- double get_recall_caps() const {
- return recall_caps.get(ceph_clock_now());
+ auto get_recall_caps_throttle2o() const {
+ return recall_caps_throttle2o.get();
}
- double get_release_caps() const {
- return release_caps.get(ceph_clock_now());
+ auto get_recall_caps() const {
+ return recall_caps.get();
+ }
+ auto get_release_caps() const {
+ return release_caps.get();
}
inodeno_t next_ino() const {
return info.prealloc_inos.range_start();
}
inodeno_t take_ino(inodeno_t ino = 0) {
- assert(!info.prealloc_inos.empty());
+ ceph_assert(!info.prealloc_inos.empty());
if (ino) {
if (info.prealloc_inos.contains(ino))
return info.get_client();
}
- const char *get_state_name() const { return get_state_name(state); }
+ std::string_view get_state_name() const { return get_state_name(state); }
uint64_t get_state_seq() const { return state_seq; }
bool is_closed() const { return state == STATE_CLOSED; }
bool is_opening() const { return state == STATE_OPENING; }
++importing_count;
}
void dec_importing() {
- assert(importing_count > 0);
+ ceph_assert(importing_count > 0);
--importing_count;
}
bool is_importing() const { return importing_count > 0; }
void set_load_avg_decay_rate(double rate) {
- assert(is_open() || is_stale());
- load_avg_rate.set_halflife(rate);
+ ceph_assert(is_open() || is_stale());
+ load_avg = DecayCounter(rate);
}
uint64_t get_load_avg() const {
- return (uint64_t)load_avg.get(ceph_clock_now(), load_avg_rate);
+ return (uint64_t)load_avg.get();
}
void hit_session() {
- load_avg.hit(ceph_clock_now(), load_avg_rate);
+ load_avg.adjust();
}
double get_session_uptime() const {
private:
uint32_t cap_gen = 0;
version_t cap_push_seq = 0; // cap push seq #
- map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
+ map<version_t, MDSContext::vec > waitfor_flush; // flush session messages
public:
xlist<Capability*> caps; // inodes with caps; front=most recently used
xlist<ClientLease*> leases; // metadata leases to clients
- time last_cap_renew = time::min();
- time last_seen = time::min();
+ time last_cap_renew = clock::zero();
+ time last_seen = clock::zero();
void inc_cap_gen() { ++cap_gen; }
uint32_t get_cap_gen() const { return cap_gen; }
version_t inc_push_seq() { return ++cap_push_seq; }
version_t get_push_seq() const { return cap_push_seq; }
- version_t wait_for_flush(MDSInternalContextBase* c) {
+ version_t wait_for_flush(MDSContext* c) {
waitfor_flush[get_push_seq()].push_back(c);
return get_push_seq();
}
- void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
+ void finish_flush(version_t seq, MDSContext::vec& ls) {
while (!waitfor_flush.empty()) {
- if (waitfor_flush.begin()->first > seq)
+ auto it = waitfor_flush.begin();
+ if (it->first > seq)
break;
- ls.splice(ls.end(), waitfor_flush.begin()->second);
- waitfor_flush.erase(waitfor_flush.begin());
+ auto& v = it->second;
+ ls.insert(ls.end(), v.begin(), v.end());
+ waitfor_flush.erase(it);
}
}
void touch_cap(Capability *cap) {
caps.push_front(&cap->item_session_caps);
}
+
void touch_cap_bottom(Capability *cap) {
caps.push_back(&cap->item_session_caps);
}
+
void touch_lease(ClientLease *r) {
leases.push_back(&r->item_session_lease);
}
+ bool is_any_flush_waiter() {
+ return !waitfor_flush.empty();
+ }
+
// -- leases --
uint32_t lease_seq = 0;
Session() = delete;
Session(ConnectionRef con) :
- recall_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
- release_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
- recall_caps_throttle(ceph_clock_now(), g_conf->get_val<double>("mds_recall_max_decay_rate")),
+ recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+ release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+ recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+ recall_caps_throttle2o(0.5),
birth_time(clock::now()),
auth_caps(g_ceph_context),
item_session_list(this),
requests(0) // member_offset passed to front() manually
{
- connection = std::move(con);
+ set_connection(std::move(con));
}
~Session() override {
if (state == STATE_CLOSED) {
item_session_list.remove_myself();
} else {
- assert(!item_session_list.is_on_list());
+ ceph_assert(!item_session_list.is_on_list());
}
- while (!preopen_out_queue.empty()) {
- preopen_out_queue.front()->put();
- preopen_out_queue.pop_front();
+ preopen_out_queue.clear();
+ }
+
+ void set_connection(ConnectionRef con) {
+ connection = std::move(con);
+ if (connection) {
+ socket_addr = connection->get_peer_socket_addr();
}
}
+ const ConnectionRef& get_connection() const {
+ return connection;
+ }
void clear() {
pending_prealloc_inos.clear();
info.clear_meta();
cap_push_seq = 0;
- last_cap_renew = time::min();
+ last_cap_renew = clock::zero();
}
};
// total request load avg
double decay_rate;
DecayCounter total_load_avg;
- DecayRate total_load_avg_rate;
public:
mds_rank_t rank;
virtual void encode_header(bufferlist *header_bl);
virtual void decode_header(bufferlist &header_bl);
virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
- virtual void decode_legacy(bufferlist::iterator& blp);
+ virtual void decode_legacy(bufferlist::const_iterator& blp);
void dump(Formatter *f) const;
void set_rank(mds_rank_t r)
SessionMapStore()
: version(0), logger(nullptr),
- decay_rate(g_conf->get_val<double>("mds_request_load_average_decay_rate")),
- total_load_avg_rate(decay_rate), rank(MDS_RANK_NONE) {
+ decay_rate(g_conf().get_val<double>("mds_request_load_average_decay_rate")),
+ total_load_avg(decay_rate), rank(MDS_RANK_NONE) {
}
virtual ~SessionMapStore() {};
};
public:
map<int,xlist<Session*>* > by_state;
uint64_t set_state(Session *session, int state);
- map<version_t, list<MDSInternalContextBase*> > commit_waiters;
+ map<version_t, MDSContext::vec > commit_waiters;
void update_average_session_age();
SessionMap() = delete;
}
// sessions
- void decode_legacy(bufferlist::iterator& blp) override;
+ void decode_legacy(bufferlist::const_iterator& blp) override;
bool empty() const { return session_map.empty(); }
- const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
+ const ceph::unordered_map<entity_name_t, Session*>& get_sessions() const
{
return session_map;
}
}
template<typename C>
void get_client_session_set(C& c) const {
- auto f = [&c](Session* s) {
+ auto f = [&c](auto& s) {
c.insert(s);
};
get_client_sessions(f);
}
- void replay_open_sessions(map<client_t,entity_inst_t>& client_map) {
- for (map<client_t,entity_inst_t>::iterator p = client_map.begin();
- p != client_map.end();
- ++p) {
- Session *s = get_or_add_session(p->second);
- set_state(s, Session::STATE_OPEN);
- replay_dirty_session(s);
- }
- }
-
// helpers
entity_inst_t& get_inst(entity_name_t w) {
- assert(session_map.count(w));
+ ceph_assert(session_map.count(w));
return session_map[w]->info.inst;
}
- version_t inc_push_seq(client_t client) {
- return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
- }
version_t get_push_seq(client_t client) {
return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
}
}
void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
Session *session = get_session(c);
- assert(session);
+ ceph_assert(session);
session->trim_completed_requests(tid);
}
// -- loading, saving --
inodeno_t ino;
- list<MDSInternalContextBase*> waiting_for_load;
+ MDSContext::vec waiting_for_load;
object_t get_object_name() const;
- void load(MDSInternalContextBase *onload);
+ void load(MDSContext *onload);
void _load_finish(
int operation_r,
int header_r,
void load_legacy();
void _load_legacy_finish(int r, bufferlist &bl);
- void save(MDSInternalContextBase *onsave, version_t needv=0);
+ void save(MDSContext *onsave, version_t needv=0);
void _save_finish(version_t v);
protected:
std::set<entity_name_t> dirty_sessions;
std::set<entity_name_t> null_sessions;
bool loaded_legacy = false;
- void _mark_dirty(Session *session);
+ void _mark_dirty(Session *session, bool may_save);
public:
/**
* to the backing store. Must have called
* mark_projected previously for this session.
*/
- void mark_dirty(Session *session);
+ void mark_dirty(Session *session, bool may_save=true);
/**
* Advance the projected version, and mark this
*/
void replay_advance_version();
+ /**
+ * During replay, open sessions, advance versions and
+ * mark these sessions as dirty.
+ */
+ void replay_open_sessions(version_t event_cmapv,
+ map<client_t,entity_inst_t>& client_map,
+ map<client_t,client_metadata_t>& client_metadata_map);
+
/**
* For these session IDs, if a session exists with this ID, and it has
* dirty completed_requests, then persist it immediately
MDSGatherBuilder *gather_bld);
private:
- time avg_birth_time = time::min();
+ time avg_birth_time = clock::zero();
uint64_t get_session_count_in_state(int state) {
return !is_any_state(state) ? 0 : by_state[state]->size();
time birth_time = s.get_birth_time();
if (sessions == 1) {
- avg_birth_time = added ? birth_time : time::min();
+ avg_birth_time = added ? birth_time : clock::zero();
return;
}
if (added) {
avg_birth_time = clock::time_point(
- ((avg_birth_time - time::min()) / sessions) * (sessions - 1) +
- (birth_time - time::min()) / sessions);
+ ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) +
+ (birth_time - clock::zero()) / sessions);
} else {
avg_birth_time = clock::time_point(
- ((avg_birth_time - time::min()) / (sessions - 1)) * sessions -
- (birth_time - time::min()) / (sessions - 1));
+ ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions -
+ (birth_time - clock::zero()) / (sessions - 1));
}
}
public:
void hit_session(Session *session);
- void handle_conf_change(const struct md_config_t *conf,
+ void handle_conf_change(const ConfigProxy &conf,
const std::set <std::string> &changed);
};