]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/SessionMap.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / mds / SessionMap.h
index 50ffde9cc04f3477dacca53705f2c890307b001c..8d3c9cc3eea051d005333b7f94d03d2edb54476b 100644 (file)
@@ -27,12 +27,14 @@ using std::set;
 #include "mdstypes.h"
 #include "mds/MDSAuthCaps.h"
 #include "common/perf_counters.h"
+#include "common/DecayCounter.h"
 
 class CInode;
 struct MDRequestImpl;
 
 #include "CInode.h"
 #include "Capability.h"
+#include "MDSContext.h"
 #include "msg/Message.h"
 
 enum {
@@ -40,6 +42,11 @@ enum {
   l_mdssm_session_count,
   l_mdssm_session_add,
   l_mdssm_session_remove,
+  l_mdssm_session_open,
+  l_mdssm_session_stale,
+  l_mdssm_total_load,
+  l_mdssm_avg_load,
+  l_mdssm_avg_session_uptime,
   l_mdssm_last,
 };
 
@@ -63,6 +70,11 @@ public:
     + additional dimension of 'importing' (with counter)
 
   */
+
+  using clock = ceph::coarse_mono_clock;
+  using time = ceph::coarse_mono_time;
+
+
   enum {
     STATE_CLOSED = 0,
     STATE_OPENING = 1,   // journaling open
@@ -72,7 +84,7 @@ public:
     STATE_KILLING = 5
   };
 
-  const char *get_state_name(int s) const {
+  static std::string_view get_state_name(int s) {
     switch (s) {
     case STATE_CLOSED: return "closed";
     case STATE_OPENING: return "opening";
@@ -85,9 +97,9 @@ public:
   }
 
 private:
-  int state;
-  uint64_t state_seq;
-  int importing_count;
+  int state = STATE_CLOSED;
+  uint64_t state_seq = 0;
+  int importing_count = 0;
   friend class SessionMap;
 
   // Human (friendly) name is soft state generated from client metadata
@@ -98,20 +110,40 @@ private:
   // that appropriate mark_dirty calls follow.
   std::deque<version_t> projected;
 
+  // request load average for this session
+  DecayCounter load_avg;
 
+  // Ephemeral state for tracking progress of capability recalls
+  // caps being recalled recently by this session; used for Beacon warnings
+  DecayCounter recall_caps;
+  // caps that have been released
+  DecayCounter release_caps;
+  // throttle on caps recalled
+  DecayCounter recall_caps_throttle;
+  // second order throttle that prevents recalling too quickly
+  DecayCounter recall_caps_throttle2o;
+  // New limit in SESSION_RECALL
+  uint32_t recall_limit = 0;
+
+  // session start time -- used to track average session time
+  // note that this is initialized in the constructor rather
+  // than at the time of adding a session to the sessionmap
+  // as journal replay of sessionmap will not call add_session().
+  time birth_time;
 
 public:
+  Session *reclaiming_from = nullptr;
 
   void push_pv(version_t pv)
   {
-    assert(projected.empty() || projected.back() != pv);
+    ceph_assert(projected.empty() || projected.back() != pv);
     projected.push_back(pv);
   }
 
   void pop_pv(version_t v)
   {
-    assert(!projected.empty());
-    assert(projected.front() == v);
+    ceph_assert(!projected.empty());
+    ceph_assert(projected.front() == v);
     projected.pop_front();
   }
 
@@ -123,24 +155,27 @@ public:
       state_seq++;
     }
   }
-  void decode(bufferlist::iterator &p);
-  void set_client_metadata(std::map<std::string, std::string> const &meta);
-  std::string get_human_name() const {return human_name;}
+  void decode(bufferlist::const_iterator &p);
+  template<typename T>
+  void set_client_metadata(T&& meta)
+  {
+    info.client_metadata = std::forward<T>(meta);
+    _update_human_name();
+  }
 
-  // Ephemeral state for tracking progress of capability recalls
-  utime_t recalled_at;  // When was I asked to SESSION_RECALL?
-  utime_t last_recall_sent;
-  uint32_t recall_count;  // How many caps was I asked to SESSION_RECALL?
-  uint32_t recall_release_count;  // How many caps have I actually revoked?
+  const std::string& get_human_name() const {return human_name;}
 
   session_info_t info;                         ///< durable bits
 
   MDSAuthCaps auth_caps;
 
+protected:
   ConnectionRef connection;
+public:
+  entity_addr_t socket_addr;
   xlist<Session*>::item item_session_list;
 
-  list<Message*> preopen_out_queue;  ///< messages for client, queued before they connect
+  list<Message::ref> preopen_out_queue;  ///< messages for client, queued before they connect
 
   elist<MDRequestImpl*> requests;
   size_t get_request_count();
@@ -148,8 +183,19 @@ public:
   interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
 
   void notify_cap_release(size_t n_caps);
-  void notify_recall_sent(const size_t new_limit);
-  void clear_recalled_at();
+  uint64_t notify_recall_sent(size_t new_limit);
+  auto get_recall_caps_throttle() const {
+    return recall_caps_throttle.get();
+  }
+  auto get_recall_caps_throttle2o() const {
+    return recall_caps_throttle2o.get();
+  }
+  auto get_recall_caps() const {
+    return recall_caps.get();
+  }
+  auto get_release_caps() const {
+    return release_caps.get();
+  }
 
   inodeno_t next_ino() const {
     if (info.prealloc_inos.empty())
@@ -157,7 +203,7 @@ public:
     return info.prealloc_inos.range_start();
   }
   inodeno_t take_ino(inodeno_t ino = 0) {
-    assert(!info.prealloc_inos.empty());
+    ceph_assert(!info.prealloc_inos.empty());
 
     if (ino) {
       if (info.prealloc_inos.contains(ino))
@@ -180,7 +226,7 @@ public:
     return info.get_client();
   }
 
-  const char *get_state_name() const { return get_state_name(state); }
+  std::string_view get_state_name() const { return get_state_name(state); }
   uint64_t get_state_seq() const { return state_seq; }
   bool is_closed() const { return state == STATE_CLOSED; }
   bool is_opening() const { return state == STATE_OPENING; }
@@ -193,56 +239,91 @@ public:
     ++importing_count;
   }
   void dec_importing() {
-    assert(importing_count > 0);
+    ceph_assert(importing_count > 0);
     --importing_count;
   }
   bool is_importing() const { return importing_count > 0; }
 
+  void set_load_avg_decay_rate(double rate) {
+    ceph_assert(is_open() || is_stale());
+    load_avg = DecayCounter(rate);
+  }
+  uint64_t get_load_avg() const {
+    return (uint64_t)load_avg.get();
+  }
+  void hit_session() {
+    load_avg.adjust();
+  }
+
+  double get_session_uptime() const {
+    chrono::duration<double> uptime = clock::now() - birth_time;
+    return uptime.count();
+  }
+
+  time get_birth_time() const {
+    return birth_time;
+  }
+
   // -- caps --
 private:
-  version_t cap_push_seq;        // cap push seq #
-  map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
+  uint32_t cap_gen = 0;
+  version_t cap_push_seq = 0;        // cap push seq #
+  map<version_t, MDSContext::vec > waitfor_flush; // flush session messages
 
 public:
   xlist<Capability*> caps;     // inodes with caps; front=most recently used
   xlist<ClientLease*> leases;  // metadata leases to clients
-  utime_t last_cap_renew;
+  time last_cap_renew = clock::zero();
+  time last_seen = clock::zero();
+
+  void inc_cap_gen() { ++cap_gen; }
+  uint32_t get_cap_gen() const { return cap_gen; }
 
-public:
   version_t inc_push_seq() { return ++cap_push_seq; }
   version_t get_push_seq() const { return cap_push_seq; }
 
-  version_t wait_for_flush(MDSInternalContextBase* c) {
+  version_t wait_for_flush(MDSContext* c) {
     waitfor_flush[get_push_seq()].push_back(c);
     return get_push_seq();
   }
-  void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
+  void finish_flush(version_t seq, MDSContext::vec& ls) {
     while (!waitfor_flush.empty()) {
-      if (waitfor_flush.begin()->first > seq)
+      auto it = waitfor_flush.begin();
+      if (it->first > seq)
        break;
-      ls.splice(ls.end(), waitfor_flush.begin()->second);
-      waitfor_flush.erase(waitfor_flush.begin());
+      auto& v = it->second;
+      ls.insert(ls.end(), v.begin(), v.end());
+      waitfor_flush.erase(it);
     }
   }
 
-  void add_cap(Capability *cap) {
+  void touch_cap(Capability *cap) {
+    caps.push_front(&cap->item_session_caps);
+  }
+
+  void touch_cap_bottom(Capability *cap) {
     caps.push_back(&cap->item_session_caps);
   }
+
   void touch_lease(ClientLease *r) {
     leases.push_back(&r->item_session_lease);
   }
 
+  bool is_any_flush_waiter() {
+    return !waitfor_flush.empty();
+  }
+
   // -- leases --
-  uint32_t lease_seq;
+  uint32_t lease_seq = 0;
 
   // -- completed requests --
 private:
   // Has completed_requests been modified since the last time we
   // wrote this session out?
-  bool completed_requests_dirty;
+  bool completed_requests_dirty = false;
 
-  unsigned num_trim_flushes_warnings;
-  unsigned num_trim_requests_warnings;
+  unsigned num_trim_flushes_warnings = 0;
+  unsigned num_trim_requests_warnings = 0;
 public:
   void add_completed_request(ceph_tid_t t, inodeno_t created) {
     info.completed_requests[t] = created;
@@ -317,33 +398,44 @@ public:
   int check_access(CInode *in, unsigned mask, int caller_uid, int caller_gid,
                   const vector<uint64_t> *gid_list, int new_uid, int new_gid);
 
-
-  Session() : 
-    state(STATE_CLOSED), state_seq(0), importing_count(0),
-    recall_count(0), recall_release_count(0),
+  Session() = delete;
+  Session(ConnectionRef con) :
+    recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+    recall_caps_throttle2o(0.5),
+    birth_time(clock::now()),
     auth_caps(g_ceph_context),
-    connection(NULL), item_session_list(this),
-    requests(0),  // member_offset passed to front() manually
-    cap_push_seq(0),
-    lease_seq(0),
-    completed_requests_dirty(false),
-    num_trim_flushes_warnings(0),
-    num_trim_requests_warnings(0) { }
+    item_session_list(this),
+    requests(0)  // member_offset passed to front() manually
+  {
+    set_connection(std::move(con));
+  }
   ~Session() override {
-    assert(!item_session_list.is_on_list());
-    while (!preopen_out_queue.empty()) {
-      preopen_out_queue.front()->put();
-      preopen_out_queue.pop_front();
+    if (state == STATE_CLOSED) {
+      item_session_list.remove_myself();
+    } else {
+      ceph_assert(!item_session_list.is_on_list());
+    }
+    preopen_out_queue.clear();
+  }
+
+  void set_connection(ConnectionRef con) {
+    connection = std::move(con);
+    if (connection) {
+      socket_addr = connection->get_peer_socket_addr();
     }
   }
+  const ConnectionRef& get_connection() const {
+    return connection;
+  }
 
   void clear() {
     pending_prealloc_inos.clear();
     info.clear_meta();
 
     cap_push_seq = 0;
-    last_cap_renew = utime_t();
-
+    last_cap_renew = clock::zero();
   }
 };
 
@@ -385,10 +477,19 @@ class MDSRank;
  * encode/decode outside of live MDS instance.
  */
 class SessionMapStore {
+public:
+  using clock = Session::clock;
+  using time = Session::time;
+
 protected:
   version_t version;
   ceph::unordered_map<entity_name_t, Session*> session_map;
   PerfCounters *logger;
+
+  // total request load avg
+  double decay_rate;
+  DecayCounter total_load_avg;
+
 public:
   mds_rank_t rank;
 
@@ -397,7 +498,7 @@ public:
   virtual void encode_header(bufferlist *header_bl);
   virtual void decode_header(bufferlist &header_bl);
   virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
-  virtual void decode_legacy(bufferlist::iterator& blp);
+  virtual void decode_legacy(bufferlist::const_iterator& blp);
   void dump(Formatter *f) const;
 
   void set_rank(mds_rank_t r)
@@ -411,9 +512,9 @@ public:
     if (session_map_entry != session_map.end()) {
       s = session_map_entry->second;
     } else {
-      s = session_map[i.name] = new Session;
+      s = session_map[i.name] = new Session(ConnectionRef());
       s->info.inst = i;
-      s->last_cap_renew = ceph_clock_now();
+      s->last_cap_renew = Session::clock::now();
       if (logger) {
         logger->set(l_mdssm_session_count, session_map.size());
         logger->inc(l_mdssm_session_add);
@@ -430,7 +531,11 @@ public:
     session_map.clear();
   }
 
-  SessionMapStore() : version(0), logger(nullptr), rank(MDS_RANK_NONE) {}
+  SessionMapStore()
+    : version(0), logger(nullptr),
+      decay_rate(g_conf().get_val<double>("mds_request_load_average_decay_rate")),
+      total_load_avg(decay_rate), rank(MDS_RANK_NONE) {
+  }
   virtual ~SessionMapStore() {};
 };
 
@@ -439,16 +544,15 @@ public:
   MDSRank *mds;
 
 protected:
-  version_t projected, committing, committed;
+  version_t projected = 0, committing = 0, committed = 0;
 public:
   map<int,xlist<Session*>* > by_state;
   uint64_t set_state(Session *session, int state);
-  map<version_t, list<MDSInternalContextBase*> > commit_waiters;
+  map<version_t, MDSContext::vec > commit_waiters;
+  void update_average_session_age();
 
-  explicit SessionMap(MDSRank *m) : mds(m),
-                      projected(0), committing(0), committed(0),
-                       loaded_legacy(false)
-  { }
+  SessionMap() = delete;
+  explicit SessionMap(MDSRank *m) : mds(m) {}
 
   ~SessionMap() override
   {
@@ -490,9 +594,9 @@ public:
   }
 
   // sessions
-  void decode_legacy(bufferlist::iterator& blp) override;
+  void decode_legacy(bufferlist::const_iterator& blp) override;
   bool empty() const { return session_map.empty(); }
-  const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
+  const ceph::unordered_map<entity_name_t, Session*>get_sessions() const
   {
     return session_map;
   }
@@ -542,39 +646,27 @@ public:
 
   void dump();
 
-  void get_client_set(set<client_t>& s) {
-    for (ceph::unordered_map<entity_name_t,Session*>::iterator p = session_map.begin();
-        p != session_map.end();
-        ++p)
-      if (p->second->info.inst.name.is_client())
-       s.insert(p->second->info.inst.name.num());
-  }
-  void get_client_session_set(set<Session*>& s) const {
-    for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
-        p != session_map.end();
-        ++p)
-      if (p->second->info.inst.name.is_client())
-       s.insert(p->second);
-  }
-
-  void open_sessions(map<client_t,entity_inst_t>& client_map) {
-    for (map<client_t,entity_inst_t>::iterator p = client_map.begin(); 
-        p != client_map.end(); 
-        ++p) {
-      Session *s = get_or_add_session(p->second);
-      set_state(s, Session::STATE_OPEN);
-      version++;
+  template<typename F>
+  void get_client_sessions(F&& f) const {
+    for (const auto& p : session_map) {
+      auto& session = p.second;
+      if (session->info.inst.name.is_client())
+       f(session);
     }
   }
+  template<typename C>
+  void get_client_session_set(C& c) const {
+    auto f = [&c](auto& s) {
+      c.insert(s);
+    };
+    get_client_sessions(f);
+  }
 
   // helpers
   entity_inst_t& get_inst(entity_name_t w) {
-    assert(session_map.count(w));
+    ceph_assert(session_map.count(w));
     return session_map[w]->info.inst;
   }
-  version_t inc_push_seq(client_t client) {
-    return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
-  }
   version_t get_push_seq(client_t client) {
     return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
   }
@@ -584,7 +676,7 @@ public:
   }
   void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
     Session *session = get_session(c);
-    assert(session);
+    ceph_assert(session);
     session->trim_completed_requests(tid);
   }
 
@@ -593,11 +685,11 @@ public:
 
   // -- loading, saving --
   inodeno_t ino;
-  list<MDSInternalContextBase*> waiting_for_load;
+  MDSContext::vec waiting_for_load;
 
   object_t get_object_name() const;
 
-  void load(MDSInternalContextBase *onload);
+  void load(MDSContext *onload);
   void _load_finish(
       int operation_r,
       int header_r,
@@ -610,14 +702,14 @@ public:
   void load_legacy();
   void _load_legacy_finish(int r, bufferlist &bl);
 
-  void save(MDSInternalContextBase *onsave, version_t needv=0);
+  void save(MDSContext *onsave, version_t needv=0);
   void _save_finish(version_t v);
 
 protected:
   std::set<entity_name_t> dirty_sessions;
   std::set<entity_name_t> null_sessions;
-  bool loaded_legacy;
-  void _mark_dirty(Session *session);
+  bool loaded_legacy = false;
+  void _mark_dirty(Session *session, bool may_save);
 public:
 
   /**
@@ -628,7 +720,7 @@ public:
    * to the backing store.  Must have called
    * mark_projected previously for this session.
    */
-  void mark_dirty(Session *session);
+  void mark_dirty(Session *session, bool may_save=true);
 
   /**
    * Advance the projected version, and mark this
@@ -656,6 +748,14 @@ public:
    */
   void replay_advance_version();
 
+  /**
+   * During replay, open sessions, advance versions and
+   * mark these sessions as dirty.
+   */
+  void replay_open_sessions(version_t event_cmapv,
+                           map<client_t,entity_inst_t>& client_map,
+                           map<client_t,client_metadata_t>& client_metadata_map);
+
   /**
    * For these session IDs, if a session exists with this ID, and it has
    * dirty completed_requests, then persist it immediately
@@ -664,6 +764,38 @@ public:
    */
   void save_if_dirty(const std::set<entity_name_t> &tgt_sessions,
                      MDSGatherBuilder *gather_bld);
+
+private:
+  time avg_birth_time = clock::zero();
+
+  uint64_t get_session_count_in_state(int state) {
+    return !is_any_state(state) ? 0 : by_state[state]->size();
+  }
+
+  void update_average_birth_time(const Session &s, bool added=true) {
+    uint32_t sessions = session_map.size();
+    time birth_time = s.get_birth_time();
+
+    if (sessions == 1) {
+      avg_birth_time = added ? birth_time : clock::zero();
+      return;
+    }
+
+    if (added) {
+      avg_birth_time = clock::time_point(
+        ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) +
+        (birth_time - clock::zero()) / sessions);
+    } else {
+      avg_birth_time = clock::time_point(
+        ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions -
+        (birth_time - clock::zero()) / (sessions - 1));
+    }
+  }
+
+public:
+  void hit_session(Session *session);
+  void handle_conf_change(const ConfigProxy &conf,
+                          const std::set <std::string> &changed);
 };
 
 std::ostream& operator<<(std::ostream &out, const Session &s);