]> git.proxmox.com Git - ceph.git/blob - ceph/src/osd/Watch.h
8d6d93a7d1c778e768af394a9892aa0d99a7570c
[ceph.git] / ceph / src / osd / Watch.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14 #ifndef CEPH_WATCH_H
15 #define CEPH_WATCH_H
16
17 #include <set>
18 #include "msg/Connection.h"
19 #include "include/Context.h"
20
21 enum WatcherState {
22 WATCHER_PENDING,
23 WATCHER_NOTIFIED,
24 };
25
26 class OSDService;
27 class PrimaryLogPG;
28 void intrusive_ptr_add_ref(PrimaryLogPG *pg);
29 void intrusive_ptr_release(PrimaryLogPG *pg);
30 struct ObjectContext;
31 class MWatchNotify;
32
33 class Watch;
34 typedef std::shared_ptr<Watch> WatchRef;
35 typedef std::weak_ptr<Watch> WWatchRef;
36
37 class Notify;
38 typedef std::shared_ptr<Notify> NotifyRef;
39 typedef std::weak_ptr<Notify> WNotifyRef;
40
41 struct CancelableContext;
42
43 /**
44 * Notify tracks the progress of a particular notify
45 *
46 * References are held by Watch and the timeout callback.
47 */
48 class Notify {
49 friend class NotifyTimeoutCB;
50 friend class Watch;
51 WNotifyRef self;
52 ConnectionRef client;
53 uint64_t client_gid;
54 bool complete;
55 bool discarded;
56 bool timed_out; ///< true if the notify timed out
57 std::set<WatchRef> watchers;
58
59 ceph::buffer::list payload;
60 uint32_t timeout;
61 uint64_t cookie;
62 uint64_t notify_id;
63 uint64_t version;
64
65 OSDService *osd;
66 CancelableContext *cb;
67 ceph::mutex lock = ceph::make_mutex("Notify::lock");
68
69 /// (gid,cookie) -> reply_bl for everyone who acked the notify
70 std::multimap<std::pair<uint64_t,uint64_t>, ceph::buffer::list> notify_replies;
71
72 /// true if this notify is being discarded
73 bool is_discarded() {
74 return discarded || complete;
75 }
76
77 /// Sends notify completion if watchers.empty() or timeout
78 void maybe_complete_notify();
79
80 /// Called on Notify timeout
81 void do_timeout();
82
83 Notify(
84 ConnectionRef client,
85 uint64_t client_gid,
86 ceph::buffer::list& payload,
87 uint32_t timeout,
88 uint64_t cookie,
89 uint64_t notify_id,
90 uint64_t version,
91 OSDService *osd);
92
93 /// registers a timeout callback with the watch_timer
94 void register_cb();
95
96 /// removes the timeout callback, called on completion or cancellation
97 void unregister_cb();
98 public:
99
100 std::ostream& gen_dbg_prefix(std::ostream& out) {
101 return out << "Notify(" << std::make_pair(cookie, notify_id) << " "
102 << " watchers=" << watchers.size()
103 << ") ";
104 }
105 void set_self(NotifyRef _self) {
106 self = _self;
107 }
108 static NotifyRef makeNotifyRef(
109 ConnectionRef client,
110 uint64_t client_gid,
111 ceph::buffer::list &payload,
112 uint32_t timeout,
113 uint64_t cookie,
114 uint64_t notify_id,
115 uint64_t version,
116 OSDService *osd);
117
118 /// Call after creation to initialize
119 void init();
120
121 /// Called once per watcher prior to init()
122 void start_watcher(
123 WatchRef watcher ///< [in] watcher to complete
124 );
125
126 /// Called once per NotifyAck
127 void complete_watcher(
128 WatchRef watcher, ///< [in] watcher to complete
129 ceph::buffer::list& reply_bl ///< [in] reply buffer from the notified watcher
130 );
131 /// Called when a watcher unregisters or times out
132 void complete_watcher_remove(
133 WatchRef watcher ///< [in] watcher to complete
134 );
135
136 /// Called when the notify is canceled due to a new peering interval
137 void discard();
138 };
139
140 /**
141 * Watch is a mapping between a Connection and an ObjectContext
142 *
143 * References are held by ObjectContext and the timeout callback
144 */
145 class HandleWatchTimeout;
146 class HandleDelayedWatchTimeout;
147 class Watch {
148 WWatchRef self;
149 friend class HandleWatchTimeout;
150 friend class HandleDelayedWatchTimeout;
151 ConnectionRef conn;
152 CancelableContext *cb;
153
154 OSDService *osd;
155 boost::intrusive_ptr<PrimaryLogPG> pg;
156 std::shared_ptr<ObjectContext> obc;
157
158 std::map<uint64_t, NotifyRef> in_progress_notifies;
159
160 // Could have watch_info_t here, but this file includes osd_types.h
161 uint32_t timeout; ///< timeout in seconds
162 uint64_t cookie;
163 entity_addr_t addr;
164
165 bool will_ping; ///< is client new enough to ping the watch
166 utime_t last_ping; ///< last client ping
167
168 entity_name_t entity;
169 bool discarded;
170
171 Watch(
172 PrimaryLogPG *pg, OSDService *osd,
173 std::shared_ptr<ObjectContext> obc, uint32_t timeout,
174 uint64_t cookie, entity_name_t entity,
175 const entity_addr_t& addr);
176
177 /// Registers the timeout callback with watch_timer
178 void register_cb();
179
180 /// send a Notify message when connected for notif
181 void send_notify(NotifyRef notif);
182
183 /// Cleans up state on discard or remove (including Connection state, obc)
184 void discard_state();
185 public:
186 /// Unregisters the timeout callback
187 void unregister_cb();
188
189 /// note receipt of a ping
190 void got_ping(utime_t t);
191 utime_t get_last_ping() const {
192 return last_ping;
193 }
194
195 /// True if currently connected
196 bool is_connected() const {
197 return conn.get() != NULL;
198 }
199 bool is_connected(Connection *con) const {
200 return conn.get() == con;
201 }
202
203 /// NOTE: must be called with pg lock held
204 ~Watch();
205
206 uint64_t get_watcher_gid() const {
207 return entity.num();
208 }
209
210 std::ostream& gen_dbg_prefix(std::ostream& out);
211 static WatchRef makeWatchRef(
212 PrimaryLogPG *pg, OSDService *osd,
213 std::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t &addr);
214 void set_self(WatchRef _self) {
215 self = _self;
216 }
217
218 /// Does not grant a ref count!
219 boost::intrusive_ptr<PrimaryLogPG> get_pg() { return pg; }
220
221 std::shared_ptr<ObjectContext> get_obc() { return obc; }
222
223 uint64_t get_cookie() const { return cookie; }
224 entity_name_t get_entity() const { return entity; }
225 entity_addr_t get_peer_addr() const { return addr; }
226 uint32_t get_timeout() const { return timeout; }
227
228 /// Generates context for use if watch timeout is delayed by scrub or recovery
229 Context *get_delayed_cb();
230
231 /// Transitions Watch to connected, unregister_cb, resends pending Notifies
232 void connect(
233 ConnectionRef con, ///< [in] Reference to new connection
234 bool will_ping ///< [in] client is new and will send pings
235 );
236
237 /// Transitions watch to disconnected, register_cb
238 void disconnect();
239
240 /// Called if Watch state is discarded due to new peering interval
241 void discard();
242
243 /// True if removed or discarded
244 bool is_discarded() const;
245
246 /// Called on unwatch
247 void remove(bool send_disconnect);
248
249 /// Adds notif as in-progress notify
250 void start_notify(
251 NotifyRef notif ///< [in] Reference to new in-progress notify
252 );
253
254 /// Removes timed out notify
255 void cancel_notify(
256 NotifyRef notif ///< [in] notify which timed out
257 );
258
259 /// Call when notify_ack received on notify_id
260 void notify_ack(
261 uint64_t notify_id, ///< [in] id of acked notify
262 ceph::buffer::list& reply_bl ///< [in] notify reply buffer
263 );
264 };
265
266 /**
267 * Holds weak refs to Watch structures corresponding to a connection
268 * Lives in the Session object of an OSD connection
269 */
270 class WatchConState {
271 ceph::mutex lock = ceph::make_mutex("WatchConState");
272 std::set<WatchRef> watches;
273 public:
274 CephContext* cct;
275 explicit WatchConState(CephContext* cct) : cct(cct) {}
276
277 /// Add a watch
278 void addWatch(
279 WatchRef watch ///< [in] Ref to new watch object
280 );
281
282 /// Remove a watch
283 void removeWatch(
284 WatchRef watch ///< [in] Ref to watch object to remove
285 );
286
287 /// Called on session reset, disconnects watchers
288 void reset(Connection *con);
289 };
290
291 #endif