]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/mds/SessionMap.h
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / mds / SessionMap.h
index 23e874831e40acc412b1bab220f51b9b8e8f08d3..8d3c9cc3eea051d005333b7f94d03d2edb54476b 100644 (file)
@@ -34,6 +34,7 @@ struct MDRequestImpl;
 
 #include "CInode.h"
 #include "Capability.h"
+#include "MDSContext.h"
 #include "msg/Message.h"
 
 enum {
@@ -83,7 +84,7 @@ public:
     STATE_KILLING = 5
   };
 
-  const char *get_state_name(int s) const {
+  static std::string_view get_state_name(int s) {
     switch (s) {
     case STATE_CLOSED: return "closed";
     case STATE_OPENING: return "opening";
@@ -110,16 +111,17 @@ private:
   std::deque<version_t> projected;
 
   // request load average for this session
-  mutable DecayCounter load_avg;
-  DecayRate    load_avg_rate;
+  DecayCounter load_avg;
 
   // Ephemeral state for tracking progress of capability recalls
   // caps being recalled recently by this session; used for Beacon warnings
-  mutable DecayCounter recall_caps;
+  DecayCounter recall_caps;
   // caps that have been released
-  mutable DecayCounter release_caps;
+  DecayCounter release_caps;
   // throttle on caps recalled
-  mutable DecayCounter recall_caps_throttle;
+  DecayCounter recall_caps_throttle;
+  // second order throttle that prevents recalling too quickly
+  DecayCounter recall_caps_throttle2o;
   // New limit in SESSION_RECALL
   uint32_t recall_limit = 0;
 
@@ -130,17 +132,18 @@ private:
   time birth_time;
 
 public:
+  Session *reclaiming_from = nullptr;
 
   void push_pv(version_t pv)
   {
-    assert(projected.empty() || projected.back() != pv);
+    ceph_assert(projected.empty() || projected.back() != pv);
     projected.push_back(pv);
   }
 
   void pop_pv(version_t v)
   {
-    assert(!projected.empty());
-    assert(projected.front() == v);
+    ceph_assert(!projected.empty());
+    ceph_assert(projected.front() == v);
     projected.pop_front();
   }
 
@@ -152,23 +155,27 @@ public:
       state_seq++;
     }
   }
-  void decode(bufferlist::iterator &p);
+  void decode(bufferlist::const_iterator &p);
   template<typename T>
   void set_client_metadata(T&& meta)
   {
     info.client_metadata = std::forward<T>(meta);
     _update_human_name();
   }
-  std::string get_human_name() const {return human_name;}
+
+  const std::string& get_human_name() const {return human_name;}
 
   session_info_t info;                         ///< durable bits
 
   MDSAuthCaps auth_caps;
 
+protected:
   ConnectionRef connection;
+public:
+  entity_addr_t socket_addr;
   xlist<Session*>::item item_session_list;
 
-  list<Message*> preopen_out_queue;  ///< messages for client, queued before they connect
+  list<Message::ref> preopen_out_queue;  ///< messages for client, queued before they connect
 
   elist<MDRequestImpl*> requests;
   size_t get_request_count();
@@ -177,14 +184,17 @@ public:
 
   void notify_cap_release(size_t n_caps);
   uint64_t notify_recall_sent(size_t new_limit);
-  double get_recall_caps_throttle() const {
-    return recall_caps_throttle.get(ceph_clock_now());
+  auto get_recall_caps_throttle() const {
+    return recall_caps_throttle.get();
   }
-  double get_recall_caps() const {
-    return recall_caps.get(ceph_clock_now());
+  auto get_recall_caps_throttle2o() const {
+    return recall_caps_throttle2o.get();
   }
-  double get_release_caps() const {
-    return release_caps.get(ceph_clock_now());
+  auto get_recall_caps() const {
+    return recall_caps.get();
+  }
+  auto get_release_caps() const {
+    return release_caps.get();
   }
 
   inodeno_t next_ino() const {
@@ -193,7 +203,7 @@ public:
     return info.prealloc_inos.range_start();
   }
   inodeno_t take_ino(inodeno_t ino = 0) {
-    assert(!info.prealloc_inos.empty());
+    ceph_assert(!info.prealloc_inos.empty());
 
     if (ino) {
       if (info.prealloc_inos.contains(ino))
@@ -216,7 +226,7 @@ public:
     return info.get_client();
   }
 
-  const char *get_state_name() const { return get_state_name(state); }
+  std::string_view get_state_name() const { return get_state_name(state); }
   uint64_t get_state_seq() const { return state_seq; }
   bool is_closed() const { return state == STATE_CLOSED; }
   bool is_opening() const { return state == STATE_OPENING; }
@@ -229,20 +239,20 @@ public:
     ++importing_count;
   }
   void dec_importing() {
-    assert(importing_count > 0);
+    ceph_assert(importing_count > 0);
     --importing_count;
   }
   bool is_importing() const { return importing_count > 0; }
 
   void set_load_avg_decay_rate(double rate) {
-    assert(is_open() || is_stale());
-    load_avg_rate.set_halflife(rate);
+    ceph_assert(is_open() || is_stale());
+    load_avg = DecayCounter(rate);
   }
   uint64_t get_load_avg() const {
-    return (uint64_t)load_avg.get(ceph_clock_now(), load_avg_rate);
+    return (uint64_t)load_avg.get();
   }
   void hit_session() {
-    load_avg.hit(ceph_clock_now(), load_avg_rate);
+    load_avg.adjust();
   }
 
   double get_session_uptime() const {
@@ -258,13 +268,13 @@ public:
 private:
   uint32_t cap_gen = 0;
   version_t cap_push_seq = 0;        // cap push seq #
-  map<version_t, list<MDSInternalContextBase*> > waitfor_flush; // flush session messages
+  map<version_t, MDSContext::vec > waitfor_flush; // flush session messages
 
 public:
   xlist<Capability*> caps;     // inodes with caps; front=most recently used
   xlist<ClientLease*> leases;  // metadata leases to clients
-  time last_cap_renew = time::min();
-  time last_seen = time::min();
+  time last_cap_renew = clock::zero();
+  time last_seen = clock::zero();
 
   void inc_cap_gen() { ++cap_gen; }
   uint32_t get_cap_gen() const { return cap_gen; }
@@ -272,29 +282,37 @@ public:
   version_t inc_push_seq() { return ++cap_push_seq; }
   version_t get_push_seq() const { return cap_push_seq; }
 
-  version_t wait_for_flush(MDSInternalContextBase* c) {
+  version_t wait_for_flush(MDSContext* c) {
     waitfor_flush[get_push_seq()].push_back(c);
     return get_push_seq();
   }
-  void finish_flush(version_t seq, list<MDSInternalContextBase*>& ls) {
+  void finish_flush(version_t seq, MDSContext::vec& ls) {
     while (!waitfor_flush.empty()) {
-      if (waitfor_flush.begin()->first > seq)
+      auto it = waitfor_flush.begin();
+      if (it->first > seq)
        break;
-      ls.splice(ls.end(), waitfor_flush.begin()->second);
-      waitfor_flush.erase(waitfor_flush.begin());
+      auto& v = it->second;
+      ls.insert(ls.end(), v.begin(), v.end());
+      waitfor_flush.erase(it);
     }
   }
 
   void touch_cap(Capability *cap) {
     caps.push_front(&cap->item_session_caps);
   }
+
   void touch_cap_bottom(Capability *cap) {
     caps.push_back(&cap->item_session_caps);
   }
+
   void touch_lease(ClientLease *r) {
     leases.push_back(&r->item_session_lease);
   }
 
+  bool is_any_flush_waiter() {
+    return !waitfor_flush.empty();
+  }
+
   // -- leases --
   uint32_t lease_seq = 0;
 
@@ -382,34 +400,42 @@ public:
 
   Session() = delete;
   Session(ConnectionRef con) :
-    recall_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
-    release_caps(ceph_clock_now(), g_conf->get_val<double>("mds_recall_warning_decay_rate")),
-    recall_caps_throttle(ceph_clock_now(), g_conf->get_val<double>("mds_recall_max_decay_rate")),
+    recall_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    release_caps(g_conf().get_val<double>("mds_recall_warning_decay_rate")),
+    recall_caps_throttle(g_conf().get_val<double>("mds_recall_max_decay_rate")),
+    recall_caps_throttle2o(0.5),
     birth_time(clock::now()),
     auth_caps(g_ceph_context),
     item_session_list(this),
     requests(0)  // member_offset passed to front() manually
   {
-    connection = std::move(con);
+    set_connection(std::move(con));
   }
   ~Session() override {
     if (state == STATE_CLOSED) {
       item_session_list.remove_myself();
     } else {
-      assert(!item_session_list.is_on_list());
+      ceph_assert(!item_session_list.is_on_list());
     }
-    while (!preopen_out_queue.empty()) {
-      preopen_out_queue.front()->put();
-      preopen_out_queue.pop_front();
+    preopen_out_queue.clear();
+  }
+
+  void set_connection(ConnectionRef con) {
+    connection = std::move(con);
+    if (connection) {
+      socket_addr = connection->get_peer_socket_addr();
     }
   }
+  const ConnectionRef& get_connection() const {
+    return connection;
+  }
 
   void clear() {
     pending_prealloc_inos.clear();
     info.clear_meta();
 
     cap_push_seq = 0;
-    last_cap_renew = time::min();
+    last_cap_renew = clock::zero();
   }
 };
 
@@ -463,7 +489,6 @@ protected:
   // total request load avg
   double decay_rate;
   DecayCounter total_load_avg;
-  DecayRate    total_load_avg_rate;
 
 public:
   mds_rank_t rank;
@@ -473,7 +498,7 @@ public:
   virtual void encode_header(bufferlist *header_bl);
   virtual void decode_header(bufferlist &header_bl);
   virtual void decode_values(std::map<std::string, bufferlist> &session_vals);
-  virtual void decode_legacy(bufferlist::iterator& blp);
+  virtual void decode_legacy(bufferlist::const_iterator& blp);
   void dump(Formatter *f) const;
 
   void set_rank(mds_rank_t r)
@@ -508,8 +533,8 @@ public:
 
   SessionMapStore()
     : version(0), logger(nullptr),
-      decay_rate(g_conf->get_val<double>("mds_request_load_average_decay_rate")),
-      total_load_avg_rate(decay_rate), rank(MDS_RANK_NONE) {
+      decay_rate(g_conf().get_val<double>("mds_request_load_average_decay_rate")),
+      total_load_avg(decay_rate), rank(MDS_RANK_NONE) {
   }
   virtual ~SessionMapStore() {};
 };
@@ -523,7 +548,7 @@ protected:
 public:
   map<int,xlist<Session*>* > by_state;
   uint64_t set_state(Session *session, int state);
-  map<version_t, list<MDSInternalContextBase*> > commit_waiters;
+  map<version_t, MDSContext::vec > commit_waiters;
   void update_average_session_age();
 
   SessionMap() = delete;
@@ -569,9 +594,9 @@ public:
   }
 
   // sessions
-  void decode_legacy(bufferlist::iterator& blp) override;
+  void decode_legacy(bufferlist::const_iterator& blp) override;
   bool empty() const { return session_map.empty(); }
-  const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
+  const ceph::unordered_map<entity_name_t, Session*>get_sessions() const
   {
     return session_map;
   }
@@ -631,30 +656,17 @@ public:
   }
   template<typename C>
   void get_client_session_set(C& c) const {
-    auto f = [&c](Session* s) {
+    auto f = [&c](auto& s) {
       c.insert(s);
     };
     get_client_sessions(f);
   }
 
-  void replay_open_sessions(map<client_t,entity_inst_t>& client_map) {
-    for (map<client_t,entity_inst_t>::iterator p = client_map.begin(); 
-        p != client_map.end(); 
-        ++p) {
-      Session *s = get_or_add_session(p->second);
-      set_state(s, Session::STATE_OPEN);
-      replay_dirty_session(s);
-    }
-  }
-
   // helpers
   entity_inst_t& get_inst(entity_name_t w) {
-    assert(session_map.count(w));
+    ceph_assert(session_map.count(w));
     return session_map[w]->info.inst;
   }
-  version_t inc_push_seq(client_t client) {
-    return get_session(entity_name_t::CLIENT(client.v))->inc_push_seq();
-  }
   version_t get_push_seq(client_t client) {
     return get_session(entity_name_t::CLIENT(client.v))->get_push_seq();
   }
@@ -664,7 +676,7 @@ public:
   }
   void trim_completed_requests(entity_name_t c, ceph_tid_t tid) {
     Session *session = get_session(c);
-    assert(session);
+    ceph_assert(session);
     session->trim_completed_requests(tid);
   }
 
@@ -673,11 +685,11 @@ public:
 
   // -- loading, saving --
   inodeno_t ino;
-  list<MDSInternalContextBase*> waiting_for_load;
+  MDSContext::vec waiting_for_load;
 
   object_t get_object_name() const;
 
-  void load(MDSInternalContextBase *onload);
+  void load(MDSContext *onload);
   void _load_finish(
       int operation_r,
       int header_r,
@@ -690,14 +702,14 @@ public:
   void load_legacy();
   void _load_legacy_finish(int r, bufferlist &bl);
 
-  void save(MDSInternalContextBase *onsave, version_t needv=0);
+  void save(MDSContext *onsave, version_t needv=0);
   void _save_finish(version_t v);
 
 protected:
   std::set<entity_name_t> dirty_sessions;
   std::set<entity_name_t> null_sessions;
   bool loaded_legacy = false;
-  void _mark_dirty(Session *session);
+  void _mark_dirty(Session *session, bool may_save);
 public:
 
   /**
@@ -708,7 +720,7 @@ public:
    * to the backing store.  Must have called
    * mark_projected previously for this session.
    */
-  void mark_dirty(Session *session);
+  void mark_dirty(Session *session, bool may_save=true);
 
   /**
    * Advance the projected version, and mark this
@@ -736,6 +748,14 @@ public:
    */
   void replay_advance_version();
 
+  /**
+   * During replay, open sessions, advance versions and
+   * mark these sessions as dirty.
+   */
+  void replay_open_sessions(version_t event_cmapv,
+                           map<client_t,entity_inst_t>& client_map,
+                           map<client_t,client_metadata_t>& client_metadata_map);
+
   /**
    * For these session IDs, if a session exists with this ID, and it has
    * dirty completed_requests, then persist it immediately
@@ -746,7 +766,7 @@ public:
                      MDSGatherBuilder *gather_bld);
 
 private:
-  time avg_birth_time = time::min();
+  time avg_birth_time = clock::zero();
 
   uint64_t get_session_count_in_state(int state) {
     return !is_any_state(state) ? 0 : by_state[state]->size();
@@ -757,24 +777,24 @@ private:
     time birth_time = s.get_birth_time();
 
     if (sessions == 1) {
-      avg_birth_time = added ? birth_time : time::min();
+      avg_birth_time = added ? birth_time : clock::zero();
       return;
     }
 
     if (added) {
       avg_birth_time = clock::time_point(
-        ((avg_birth_time - time::min()) / sessions) * (sessions - 1) +
-        (birth_time - time::min()) / sessions);
+        ((avg_birth_time - clock::zero()) / sessions) * (sessions - 1) +
+        (birth_time - clock::zero()) / sessions);
     } else {
       avg_birth_time = clock::time_point(
-        ((avg_birth_time - time::min()) / (sessions - 1)) * sessions -
-        (birth_time - time::min()) / (sessions - 1));
+        ((avg_birth_time - clock::zero()) / (sessions - 1)) * sessions -
+        (birth_time - clock::zero()) / (sessions - 1));
     }
   }
 
 public:
   void hit_session(Session *session);
-  void handle_conf_change(const struct md_config_t *conf,
+  void handle_conf_change(const ConfigProxy &conf,
                           const std::set <std::string> &changed);
 };