// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */
15 #ifndef CEPH_OSD_SESSION_H
16 #define CEPH_OSD_SESSION_H
18 #include "common/RefCountedObj.h"
19 #include "common/ceph_mutex.h"
20 #include "global/global_context.h"
21 #include "include/spinlock.h"
25 #include "PeeringState.h"
27 //#define PG_DEBUG_REFS
31 #include "common/tracked_int_ptr.hpp"
32 typedef TrackedIntPtr
<PG
> PGRef
;
34 typedef boost::intrusive_ptr
<PG
> PGRef
;
/*
 * A Backoff represents one instance of either a PG or an OID
 * being plugged at the client.  It's refcounted and linked from
 * the PG {pg_oid}_backoffs map and from the client Session
 * object.
 *
 * The Backoff has a lock that protects its internal fields.
 *
 * The PG has a backoff_lock that protects its maps to Backoffs.
 * This lock is *inside* of Backoff::lock.
 *
 * The Session has a backoff_lock that protects its map of pg and
 * oid backoffs.  This lock is *inside* the Backoff::lock *and*
 * PG::backoff_lock.
 *
 * That is, the lock ordering is
 *
 *  Backoff::lock
 *     PG::backoff_lock
 *     Session::backoff_lock
 *
 * When the Session goes away, we move our backoff lists aside,
 * then we lock each of the Backoffs we
 * previously referenced and clear the Session* pointer.  If the PG
 * is still linked, we unlink it, too.
 *
 * When the PG clears the backoff, it will send an unblock message
 * if the Session* is still non-null, and unlink the session.
 *
 */
68 struct Backoff
: public RefCountedObject
{
70 STATE_NEW
= 1, ///< backoff in flight to client
71 STATE_ACKED
= 2, ///< backoff acked
72 STATE_DELETING
= 3 ///< backoff deleted, but un-acked
74 std::atomic
<int> state
= {STATE_NEW
};
75 spg_t pgid
; ///< owning pgid
76 uint64_t id
= 0; ///< unique id (within the Session)
79 return state
.load() == STATE_NEW
;
81 bool is_acked() const {
82 return state
.load() == STATE_ACKED
;
84 bool is_deleting() const {
85 return state
.load() == STATE_DELETING
;
87 const char *get_state_name() const {
88 switch (state
.load()) {
89 case STATE_NEW
: return "new";
90 case STATE_ACKED
: return "acked";
91 case STATE_DELETING
: return "deleting";
92 default: return "???";
96 ceph::mutex lock
= ceph::make_mutex("Backoff::lock");
97 // NOTE: the owning PG and session are either
99 // - both null (teardown), or
100 // - only session is set (and state == DELETING)
101 PGRef pg
; ///< owning pg
102 ceph::ref_t
<struct Session
> session
; ///< owning session
103 hobject_t begin
, end
; ///< [) range to block, unless ==, then single obj
105 friend ostream
& operator<<(ostream
& out
, const Backoff
& b
) {
106 return out
<< "Backoff(" << &b
<< " " << b
.pgid
<< " " << b
.id
107 << " " << b
.get_state_name()
108 << " [" << b
.begin
<< "," << b
.end
<< ") "
109 << " session " << b
.session
110 << " pg " << b
.pg
<< ")";
114 FRIEND_MAKE_REF(Backoff
);
115 Backoff(spg_t pgid
, PGRef pg
, ceph::ref_t
<Session
> s
,
117 const hobject_t
& b
, const hobject_t
& e
)
118 : RefCountedObject(g_ceph_context
),
122 session(std::move(s
)),
129 struct Session
: public RefCountedObject
{
130 EntityName entity_name
;
133 entity_addr_t socket_addr
;
134 WatchConState wstate
;
136 ceph::mutex session_dispatch_lock
=
137 ceph::make_mutex("Session::session_dispatch_lock");
138 boost::intrusive::list
<OpRequest
> waiting_on_map
;
140 ceph::spinlock sent_epoch_lock
;
141 epoch_t last_sent_epoch
= 0;
143 /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
144 ceph::mutex backoff_lock
= ceph::make_mutex("Session::backoff_lock");
145 std::atomic
<int> backoff_count
= {0}; ///< simple count of backoffs
146 std::map
<spg_t
, std::map
<hobject_t
, std::set
<ceph::ref_t
<Backoff
>>>> backoffs
;
148 std::atomic
<uint64_t> backoff_seq
= {0};
150 // for heartbeat connections only
152 HeartbeatStampsRef stamps
;
154 entity_addr_t
& get_peer_socket_addr() {
162 const hobject_t
& start
,
163 const hobject_t
& end
);
165 ceph::ref_t
<Backoff
> have_backoff(spg_t pgid
, const hobject_t
& oid
) {
166 if (!backoff_count
.load()) {
169 std::lock_guard
l(backoff_lock
);
170 ceph_assert(!backoff_count
== backoffs
.empty());
171 auto i
= backoffs
.find(pgid
);
172 if (i
== backoffs
.end()) {
175 auto p
= i
->second
.lower_bound(oid
);
176 if (p
!= i
->second
.begin() &&
177 (p
== i
->second
.end() || p
->first
> oid
)) {
180 if (p
!= i
->second
.end()) {
181 int r
= cmp(oid
, p
->first
);
182 if (r
== 0 || r
> 0) {
183 for (auto& q
: p
->second
) {
184 if (r
== 0 || oid
< q
->end
) {
194 CephContext
*cct
, spg_t pgid
, const hobject_t
& oid
, const Message
*m
);
196 void add_backoff(ceph::ref_t
<Backoff
> b
) {
197 std::lock_guard
l(backoff_lock
);
198 ceph_assert(!backoff_count
== backoffs
.empty());
199 backoffs
[b
->pgid
][b
->begin
].insert(std::move(b
));
203 // called by PG::release_*_backoffs and PG::clear_backoffs()
204 void rm_backoff(const ceph::ref_t
<Backoff
>& b
) {
205 std::lock_guard
l(backoff_lock
);
206 ceph_assert(ceph_mutex_is_locked_by_me(b
->lock
));
207 ceph_assert(b
->session
== this);
208 auto i
= backoffs
.find(b
->pgid
);
209 if (i
!= backoffs
.end()) {
210 // may race with clear_backoffs()
211 auto p
= i
->second
.find(b
->begin
);
212 if (p
!= i
->second
.end()) {
213 auto q
= p
->second
.find(b
);
214 if (q
!= p
->second
.end()) {
217 if (p
->second
.empty()) {
219 if (i
->second
.empty()) {
226 ceph_assert(!backoff_count
== backoffs
.empty());
228 void clear_backoffs();
231 FRIEND_MAKE_REF(Session
);
232 explicit Session(CephContext
*cct
, Connection
*con_
) :
233 RefCountedObject(cct
),
235 socket_addr(con_
->get_peer_socket_addr()),