// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 *
 */

#ifndef CEPH_OSD_SESSION_H
#define CEPH_OSD_SESSION_H

#include "common/RefCountedObj.h"
#include "common/ceph_mutex.h"
#include "global/global_context.h"
#include "include/spinlock.h"
#include "OSDCap.h"
#include "Watch.h"
#include "OSDMap.h"
#include "PeeringState.h"

//#define PG_DEBUG_REFS

class PG;
#ifdef PG_DEBUG_REFS
#include "common/tracked_int_ptr.hpp"
typedef TrackedIntPtr<PG> PGRef;
#else
typedef boost::intrusive_ptr<PG> PGRef;
#endif

/*
 * A Backoff represents one instance of either a PG or an OID
 * being plugged at the client.  It's refcounted and linked from
 * the PG {pg_oid}_backoffs map and from the client Session
 * object.
 *
 * The Backoff has a lock that protects its internal fields.
 *
 * The PG has a backoff_lock that protects its maps to Backoffs.
 * This lock is *inside* Backoff::lock.
 *
 * The Session has a backoff_lock that protects its map of pg and
 * oid backoffs.  This lock is *inside* the Backoff::lock *and*
 * PG::backoff_lock.
 *
 * That is, the ordering is
 *
 *   Backoff::lock
 *     PG::backoff_lock
 *       Session::backoff_lock
 *
 * When the Session goes away, we move our backoff lists aside,
 * then we lock each of the Backoffs we previously referenced and
 * clear the Session* pointer.  If the PG is still linked, we
 * unlink it, too.
 *
 * When the PG clears the backoff, it will send an unblock message
 * if the Session* is still non-null, and unlink the session.
 *
 */
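
/*
 * Illustrative sketch only (not part of the upstream header): the nesting
 * above means a thread may take PG::backoff_lock while already holding
 * Backoff::lock, and Session::backoff_lock while holding either or both,
 * but never in the reverse order.  A hypothetical unlink path, assuming a
 * local `b` holding a Backoff ref (PG::backoff_lock here is the PG-side
 * mutex the comment above refers to):
 *
 *   std::lock_guard l{b->lock};                    // outermost: Backoff::lock
 *   if (b->pg) {
 *     std::lock_guard g{b->pg->backoff_lock};      // nested inside Backoff::lock
 *     // ... erase b from the PG's backoff map ...
 *   }
 *   if (b->session) {
 *     std::lock_guard s{b->session->backoff_lock}; // innermost lock
 *     // ... erase b from Session::backoffs ...
 *   }
 */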

struct Backoff : public RefCountedObject {
  enum {
    STATE_NEW = 1,     ///< backoff in flight to client
    STATE_ACKED = 2,   ///< backoff acked
    STATE_DELETING = 3 ///< backoff deleted, but un-acked
  };
  std::atomic<int> state = {STATE_NEW};
  spg_t pgid;          ///< owning pgid
  uint64_t id = 0;     ///< unique id (within the Session)

  bool is_new() const {
    return state.load() == STATE_NEW;
  }
  bool is_acked() const {
    return state.load() == STATE_ACKED;
  }
  bool is_deleting() const {
    return state.load() == STATE_DELETING;
  }
  const char *get_state_name() const {
    switch (state.load()) {
    case STATE_NEW: return "new";
    case STATE_ACKED: return "acked";
    case STATE_DELETING: return "deleting";
    default: return "???";
    }
  }

  ceph::mutex lock = ceph::make_mutex("Backoff::lock");
  // NOTE: the owning PG and session are either
  //  - *both* set, or
  //  - both null (teardown), or
  //  - only session is set (and state == DELETING)
  PGRef pg;                            ///< owning pg
  ceph::ref_t<class Session> session;  ///< owning session
  hobject_t begin, end;  ///< [) range to block, unless ==, then single obj

  friend ostream& operator<<(ostream& out, const Backoff& b) {
    return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
               << " " << b.get_state_name()
               << " [" << b.begin << "," << b.end << ") "
               << " session " << b.session
               << " pg " << b.pg << ")";
  }

private:
  FRIEND_MAKE_REF(Backoff);
  Backoff(spg_t pgid, PGRef pg, ceph::ref_t<Session> s,
          uint64_t i,
          const hobject_t& b, const hobject_t& e)
    : RefCountedObject(g_ceph_context),
      pgid(pgid),
      id(i),
      pg(pg),
      session(std::move(s)),
      begin(b),
      end(e) {}
};
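
/*
 * A minimal lifecycle sketch (illustration only, not upstream code): a
 * Backoff starts out STATE_NEW while the backoff is in flight to the client,
 * becomes STATE_ACKED once the client acks it, and STATE_DELETING if it is
 * released before the ack arrives, at which point only `session` remains set
 * (see the NOTE above).  Assuming the ceph::make_ref factory that
 * FRIEND_MAKE_REF is there to enable, with hypothetical locals `pgid`, `pg`,
 * `session`, `id`, `begin` and `end`:
 *
 *   auto b = ceph::make_ref<Backoff>(pgid, pg, session, id, begin, end);
 *   ceph_assert(b->is_new());         // created, not yet acked
 *   // ... client acks the backoff ...
 *   b->state = Backoff::STATE_ACKED;
 *   ceph_assert(b->is_acked());
 */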

struct Session : public RefCountedObject {
  EntityName entity_name;
  OSDCap caps;
  ConnectionRef con;
  entity_addr_t socket_addr;
  WatchConState wstate;

  ceph::mutex session_dispatch_lock =
    ceph::make_mutex("Session::session_dispatch_lock");
  boost::intrusive::list<OpRequest> waiting_on_map;

  ceph::spinlock sent_epoch_lock;
  epoch_t last_sent_epoch = 0;

  /// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
  ceph::mutex backoff_lock = ceph::make_mutex("Session::backoff_lock");
  std::atomic<int> backoff_count = {0};  ///< simple count of backoffs
  map<spg_t,map<hobject_t,set<ceph::ref_t<Backoff>>>> backoffs;

  std::atomic<uint64_t> backoff_seq = {0};

  // for heartbeat connections only
  int peer = -1;
  HeartbeatStampsRef stamps;

  entity_addr_t& get_peer_socket_addr() {
    return socket_addr;
  }

  void ack_backoff(
    CephContext *cct,
    spg_t pgid,
    uint64_t id,
    const hobject_t& start,
    const hobject_t& end);

  ceph::ref_t<Backoff> have_backoff(spg_t pgid, const hobject_t& oid) {
    // fast path: no backoffs registered on this session at all
    if (!backoff_count.load()) {
      return nullptr;
    }
    std::lock_guard l(backoff_lock);
    ceph_assert(!backoff_count == backoffs.empty());
    auto i = backoffs.find(pgid);
    if (i == backoffs.end()) {
      return nullptr;
    }
    // backoffs are keyed by their begin bound; if lower_bound lands past oid,
    // step back to the previous entry, whose range may still cover oid
    auto p = i->second.lower_bound(oid);
    if (p != i->second.begin() &&
        (p == i->second.end() || p->first > oid)) {
      --p;
    }
    if (p != i->second.end()) {
      int r = cmp(oid, p->first);
      if (r == 0 || r > 0) {
        // oid >= begin; it is blocked if begin == oid (single-object backoff)
        // or oid falls before the backoff's end bound
        for (auto& q : p->second) {
          if (r == 0 || oid < q->end) {
            return &(*q);
          }
        }
      }
    }
    return nullptr;
  }

  bool check_backoff(
    CephContext *cct, spg_t pgid, const hobject_t& oid, const Message *m);

  void add_backoff(ceph::ref_t<Backoff> b) {
    std::lock_guard l(backoff_lock);
    ceph_assert(!backoff_count == backoffs.empty());
    backoffs[b->pgid][b->begin].insert(std::move(b));
    ++backoff_count;
  }

  // called by PG::release_*_backoffs and PG::clear_backoffs()
  void rm_backoff(const ceph::ref_t<Backoff>& b) {
    std::lock_guard l(backoff_lock);
    ceph_assert(ceph_mutex_is_locked_by_me(b->lock));
    ceph_assert(b->session == this);
    auto i = backoffs.find(b->pgid);
    if (i != backoffs.end()) {
      // may race with clear_backoffs()
      auto p = i->second.find(b->begin);
      if (p != i->second.end()) {
        auto q = p->second.find(b);
        if (q != p->second.end()) {
          p->second.erase(q);
          --backoff_count;
          if (p->second.empty()) {
            i->second.erase(p);
            if (i->second.empty()) {
              backoffs.erase(i);
            }
          }
        }
      }
    }
    ceph_assert(!backoff_count == backoffs.empty());
  }
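
  /*
   * Caller-side sketch (illustration only, not upstream code): per the
   * asserts above, the caller of rm_backoff() must already hold the
   * Backoff's own lock, and rm_backoff() then takes Session::backoff_lock
   * internally, which matches the documented
   * Backoff::lock -> PG::backoff_lock -> Session::backoff_lock ordering.
   * Assuming a hypothetical local `b` of type ceph::ref_t<Backoff>:
   *
   *   std::lock_guard l{b->lock};   // outer lock, held by the PG-side caller
   *   if (b->session) {
   *     b->session->rm_backoff(b);  // takes Session::backoff_lock inside
   *   }
   */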
  void clear_backoffs();

private:
  FRIEND_MAKE_REF(Session);
  explicit Session(CephContext *cct, Connection *con_) :
    RefCountedObject(cct),
    con(con_),
    socket_addr(con_->get_peer_socket_addr()),
    wstate(cct)
  {}
};

#endif