#define CEPH_OSD_SESSION_H
#include "common/RefCountedObj.h"
-#include "common/Mutex.h"
-#include "include/Spinlock.h"
+#include "common/ceph_mutex.h"
+#include "global/global_context.h"
+#include "include/spinlock.h"
#include "OSDCap.h"
#include "Watch.h"
#include "OSDMap.h"
+#include "PeeringState.h"
+
+//#define PG_DEBUG_REFS
-struct Session;
-typedef boost::intrusive_ptr<Session> SessionRef;
-struct Backoff;
-typedef boost::intrusive_ptr<Backoff> BackoffRef;
class PG;
#ifdef PG_DEBUG_REFS
#include "common/tracked_int_ptr.hpp"
STATE_ACKED = 2, ///< backoff acked
STATE_DELETING = 3 ///< backoff deleted, but un-acked
};
- std::atomic_int state = {STATE_NEW};
+ std::atomic<int> state = {STATE_NEW};
spg_t pgid; ///< owning pgid
uint64_t id = 0; ///< unique id (within the Session)
}
}
- Mutex lock;
+ ceph::mutex lock = ceph::make_mutex("Backoff::lock");
// NOTE: the owning PG and session are either
// - *both* set, or
// - both null (teardown), or
// - only session is set (and state == DELETING)
PGRef pg; ///< owning pg
- SessionRef session; ///< owning session
+ ceph::ref_t<class Session> session; ///< owning session
hobject_t begin, end; ///< [) range to block, unless ==, then single obj
- Backoff(spg_t pgid, PGRef pg, SessionRef s,
- uint64_t i,
- const hobject_t& b, const hobject_t& e)
- : RefCountedObject(g_ceph_context, 0),
- pgid(pgid),
- id(i),
- lock("Backoff::lock"),
- pg(pg),
- session(s),
- begin(b),
- end(e) {}
-
friend ostream& operator<<(ostream& out, const Backoff& b) {
return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
<< " " << b.get_state_name()
<< " session " << b.session
<< " pg " << b.pg << ")";
}
+
+private:
+ FRIEND_MAKE_REF(Backoff);
+ // ctor is private: instances are created via ceph::make_ref<Backoff>(...)
+ Backoff(spg_t pgid, PGRef pg, ceph::ref_t<Session> s,
+ uint64_t i,
+ const hobject_t& b, const hobject_t& e)
+ : RefCountedObject(g_ceph_context),
+ pgid(pgid),
+ id(i),
+ pg(std::move(pg)), // move, like `session` below — avoids a refcount bump
+ session(std::move(s)),
+ begin(b),
+ end(e) {}
};
struct Session : public RefCountedObject {
EntityName entity_name;
OSDCap caps;
- int64_t auid;
ConnectionRef con;
+ entity_addr_t socket_addr; ///< peer address, captured from con at construction
WatchConState wstate;
- Mutex session_dispatch_lock;
+ ceph::mutex session_dispatch_lock =
+ ceph::make_mutex("Session::session_dispatch_lock");
boost::intrusive::list<OpRequest> waiting_on_map;
- Spinlock sent_epoch_lock;
- epoch_t last_sent_epoch;
- Spinlock received_map_lock;
- epoch_t received_map_epoch; // largest epoch seen in MOSDMap from here
+ ceph::spinlock sent_epoch_lock;
+ epoch_t last_sent_epoch = 0;
/// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
- Mutex backoff_lock;
- std::atomic_int backoff_count= {0}; ///< simple count of backoffs
- map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
+ ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
+ std::atomic<int> backoff_count = {0}; ///< simple count of backoffs
+ map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> backoffs;
std::atomic<uint64_t> backoff_seq = {0};
- explicit Session(CephContext *cct) :
- RefCountedObject(cct),
- auid(-1), con(0),
- wstate(cct),
- session_dispatch_lock("Session::session_dispatch_lock"),
- last_sent_epoch(0), received_map_epoch(0),
- backoff_lock("Session::backoff_lock")
- {}
+ // for heartbeat connections only
+ int peer = -1;
+ HeartbeatStampsRef stamps;
+
+ entity_addr_t& get_peer_socket_addr() {
+ return socket_addr;
+ }
void ack_backoff(
CephContext *cct,
const hobject_t& start,
const hobject_t& end);
- BackoffRef have_backoff(spg_t pgid, const hobject_t& oid) {
+ ceph::ref_t<Backoff> have_backoff(spg_t pgid, const hobject_t& oid) {
if (!backoff_count.load()) {
return nullptr;
}
- Mutex::Locker l(backoff_lock);
- assert(!backoff_count == backoffs.empty());
+ std::lock_guard l(backoff_lock);
+ ceph_assert((backoff_count.load() == 0) == backoffs.empty());
auto i = backoffs.find(pgid);
if (i == backoffs.end()) {
return nullptr;
bool check_backoff(
CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m);
- void add_backoff(BackoffRef b) {
- Mutex::Locker l(backoff_lock);
- assert(!backoff_count == backoffs.empty());
- backoffs[b->pgid][b->begin].insert(b);
+ void add_backoff(ceph::ref_t<Backoff> b) {
+ std::lock_guard l(backoff_lock);
+ ceph_assert((backoff_count.load() == 0) == backoffs.empty());
+ backoffs[b->pgid][b->begin].insert(std::move(b));
++backoff_count;
}
// called by PG::release_*_backoffs and PG::clear_backoffs()
- void rm_backoff(BackoffRef b) {
- Mutex::Locker l(backoff_lock);
- assert(b->lock.is_locked_by_me());
- assert(b->session == this);
+ void rm_backoff(const ceph::ref_t<Backoff>& b) {
+ std::lock_guard l(backoff_lock);
+ ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
+ ceph_assert(b->session == this);
auto i = backoffs.find(b->pgid);
if (i != backoffs.end()) {
// may race with clear_backoffs()
}
}
}
- assert(!backoff_count == backoffs.empty());
+ ceph_assert((backoff_count.load() == 0) == backoffs.empty());
}
void clear_backoffs();
+
+private:
+ FRIEND_MAKE_REF(Session);
+ explicit Session(CephContext *cct, Connection *con_) :
+ RefCountedObject(cct),
+ con(con_),
+ socket_addr(con_->get_peer_socket_addr()),
+ wstate(cct)
+ {}
};
#endif