]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | #ifndef CEPH_WATCH_H | |
15 | #define CEPH_WATCH_H | |
16 | ||
#include "include/memory.h"
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>

#include "msg/Messenger.h"
#include "include/Context.h"
22 | ||
/// State of a watcher.
/// NOTE(review): this enum is not referenced anywhere else in this header;
/// the meaning of each state is presumably "notify not yet delivered" vs.
/// "notify delivered" -- confirm against the code that uses it.
enum WatcherState {
  WATCHER_PENDING = 0,
  WATCHER_NOTIFIED = 1
};
27 | ||
28 | class OSDService; | |
29 | class PrimaryLogPG; | |
30 | void intrusive_ptr_add_ref(PrimaryLogPG *pg); | |
31 | void intrusive_ptr_release(PrimaryLogPG *pg); | |
32 | struct ObjectContext; | |
33 | class MWatchNotify; | |
34 | ||
35 | class Watch; | |
36 | typedef ceph::shared_ptr<Watch> WatchRef; | |
37 | typedef ceph::weak_ptr<Watch> WWatchRef; | |
38 | ||
39 | class Notify; | |
40 | typedef ceph::shared_ptr<Notify> NotifyRef; | |
41 | typedef ceph::weak_ptr<Notify> WNotifyRef; | |
42 | ||
43 | struct CancelableContext; | |
44 | ||
45 | /** | |
46 | * Notify tracks the progress of a particular notify | |
47 | * | |
48 | * References are held by Watch and the timeout callback. | |
49 | */ | |
50 | class Notify { | |
51 | friend class NotifyTimeoutCB; | |
52 | friend class Watch; | |
53 | WNotifyRef self; | |
54 | ConnectionRef client; | |
55 | uint64_t client_gid; | |
56 | bool complete; | |
57 | bool discarded; | |
58 | bool timed_out; ///< true if the notify timed out | |
59 | set<WatchRef> watchers; | |
60 | ||
61 | bufferlist payload; | |
62 | uint32_t timeout; | |
63 | uint64_t cookie; | |
64 | uint64_t notify_id; | |
65 | uint64_t version; | |
66 | ||
67 | OSDService *osd; | |
68 | CancelableContext *cb; | |
69 | Mutex lock; | |
70 | ||
71 | /// (gid,cookie) -> reply_bl for everyone who acked the notify | |
72 | multimap<pair<uint64_t,uint64_t>,bufferlist> notify_replies; | |
73 | ||
74 | /// true if this notify is being discarded | |
75 | bool is_discarded() { | |
76 | return discarded || complete; | |
77 | } | |
78 | ||
79 | /// Sends notify completion if watchers.empty() or timeout | |
80 | void maybe_complete_notify(); | |
81 | ||
82 | /// Called on Notify timeout | |
83 | void do_timeout(); | |
84 | ||
85 | Notify( | |
86 | ConnectionRef client, | |
87 | uint64_t client_gid, | |
88 | bufferlist &payload, | |
89 | uint32_t timeout, | |
90 | uint64_t cookie, | |
91 | uint64_t notify_id, | |
92 | uint64_t version, | |
93 | OSDService *osd); | |
94 | ||
95 | /// registers a timeout callback with the watch_timer | |
96 | void register_cb(); | |
97 | ||
98 | /// removes the timeout callback, called on completion or cancellation | |
99 | void unregister_cb(); | |
100 | public: | |
101 | ||
102 | string gen_dbg_prefix() { | |
103 | stringstream ss; | |
104 | ss << "Notify(" << make_pair(cookie, notify_id) << " " | |
105 | << " watchers=" << watchers.size() | |
106 | << ") "; | |
107 | return ss.str(); | |
108 | } | |
109 | void set_self(NotifyRef _self) { | |
110 | self = _self; | |
111 | } | |
112 | static NotifyRef makeNotifyRef( | |
113 | ConnectionRef client, | |
114 | uint64_t client_gid, | |
115 | bufferlist &payload, | |
116 | uint32_t timeout, | |
117 | uint64_t cookie, | |
118 | uint64_t notify_id, | |
119 | uint64_t version, | |
120 | OSDService *osd); | |
121 | ||
122 | /// Call after creation to initialize | |
123 | void init(); | |
124 | ||
125 | /// Called once per watcher prior to init() | |
126 | void start_watcher( | |
127 | WatchRef watcher ///< [in] watcher to complete | |
128 | ); | |
129 | ||
130 | /// Called once per NotifyAck | |
131 | void complete_watcher( | |
132 | WatchRef watcher, ///< [in] watcher to complete | |
133 | bufferlist& reply_bl ///< [in] reply buffer from the notified watcher | |
134 | ); | |
135 | /// Called when a watcher unregisters or times out | |
136 | void complete_watcher_remove( | |
137 | WatchRef watcher ///< [in] watcher to complete | |
138 | ); | |
139 | ||
140 | /// Called when the notify is canceled due to a new peering interval | |
141 | void discard(); | |
142 | }; | |
143 | ||
144 | /** | |
145 | * Watch is a mapping between a Connection and an ObjectContext | |
146 | * | |
147 | * References are held by ObjectContext and the timeout callback | |
148 | */ | |
149 | class HandleWatchTimeout; | |
150 | class HandleDelayedWatchTimeout; | |
151 | class Watch { | |
152 | WWatchRef self; | |
153 | friend class HandleWatchTimeout; | |
154 | friend class HandleDelayedWatchTimeout; | |
155 | ConnectionRef conn; | |
156 | CancelableContext *cb; | |
157 | ||
158 | OSDService *osd; | |
159 | boost::intrusive_ptr<PrimaryLogPG> pg; | |
160 | ceph::shared_ptr<ObjectContext> obc; | |
161 | ||
162 | std::map<uint64_t, NotifyRef> in_progress_notifies; | |
163 | ||
164 | // Could have watch_info_t here, but this file includes osd_types.h | |
165 | uint32_t timeout; ///< timeout in seconds | |
166 | uint64_t cookie; | |
167 | entity_addr_t addr; | |
168 | ||
169 | bool will_ping; ///< is client new enough to ping the watch | |
170 | utime_t last_ping; ///< last cilent ping | |
171 | ||
172 | entity_name_t entity; | |
173 | bool discarded; | |
174 | ||
175 | Watch( | |
176 | PrimaryLogPG *pg, OSDService *osd, | |
177 | ceph::shared_ptr<ObjectContext> obc, uint32_t timeout, | |
178 | uint64_t cookie, entity_name_t entity, | |
179 | const entity_addr_t& addr); | |
180 | ||
181 | /// Registers the timeout callback with watch_timer | |
182 | void register_cb(); | |
183 | ||
184 | /// send a Notify message when connected for notif | |
185 | void send_notify(NotifyRef notif); | |
186 | ||
187 | /// Cleans up state on discard or remove (including Connection state, obc) | |
188 | void discard_state(); | |
189 | public: | |
190 | /// Unregisters the timeout callback | |
191 | void unregister_cb(); | |
192 | ||
193 | /// note receipt of a ping | |
194 | void got_ping(utime_t t); | |
195 | utime_t get_last_ping() const { | |
196 | return last_ping; | |
197 | } | |
198 | ||
199 | bool is_connected() const { | |
200 | return conn.get() != NULL; | |
201 | } | |
202 | bool is_connected(Connection *con) const { | |
203 | return conn.get() == con; | |
204 | } | |
205 | ||
206 | /// NOTE: must be called with pg lock held | |
207 | ~Watch(); | |
208 | ||
209 | uint64_t get_watcher_gid() const { | |
210 | return entity.num(); | |
211 | } | |
212 | ||
213 | string gen_dbg_prefix(); | |
214 | static WatchRef makeWatchRef( | |
215 | PrimaryLogPG *pg, OSDService *osd, | |
216 | ceph::shared_ptr<ObjectContext> obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t &addr); | |
217 | void set_self(WatchRef _self) { | |
218 | self = _self; | |
219 | } | |
220 | ||
221 | /// Does not grant a ref count! | |
222 | boost::intrusive_ptr<PrimaryLogPG> get_pg() { return pg; } | |
223 | ||
224 | ceph::shared_ptr<ObjectContext> get_obc() { return obc; } | |
225 | ||
226 | uint64_t get_cookie() const { return cookie; } | |
227 | entity_name_t get_entity() const { return entity; } | |
228 | entity_addr_t get_peer_addr() const { return addr; } | |
229 | uint32_t get_timeout() const { return timeout; } | |
230 | ||
231 | /// Generates context for use if watch timeout is delayed by scrub or recovery | |
232 | Context *get_delayed_cb(); | |
233 | ||
234 | /// True if currently connected | |
235 | bool connected(); | |
236 | ||
237 | /// Transitions Watch to connected, unregister_cb, resends pending Notifies | |
238 | void connect( | |
239 | ConnectionRef con, ///< [in] Reference to new connection | |
240 | bool will_ping ///< [in] client is new and will send pings | |
241 | ); | |
242 | ||
243 | /// Transitions watch to disconnected, register_cb | |
244 | void disconnect(); | |
245 | ||
246 | /// Called if Watch state is discarded due to new peering interval | |
247 | void discard(); | |
248 | ||
249 | /// True if removed or discarded | |
250 | bool is_discarded() const; | |
251 | ||
252 | /// Called on unwatch | |
253 | void remove(bool send_disconnect); | |
254 | ||
255 | /// Adds notif as in-progress notify | |
256 | void start_notify( | |
257 | NotifyRef notif ///< [in] Reference to new in-progress notify | |
258 | ); | |
259 | ||
260 | /// Removes timed out notify | |
261 | void cancel_notify( | |
262 | NotifyRef notif ///< [in] notify which timed out | |
263 | ); | |
264 | ||
265 | /// Call when notify_ack received on notify_id | |
266 | void notify_ack( | |
267 | uint64_t notify_id, ///< [in] id of acked notify | |
268 | bufferlist& reply_bl ///< [in] notify reply buffer | |
269 | ); | |
270 | }; | |
271 | ||
272 | /** | |
273 | * Holds weak refs to Watch structures corresponding to a connection | |
274 | * Lives in the Session object of an OSD connection | |
275 | */ | |
276 | class WatchConState { | |
277 | Mutex lock; | |
278 | std::set<WatchRef> watches; | |
279 | public: | |
280 | CephContext* cct; | |
281 | WatchConState(CephContext* cct) : lock("WatchConState"), cct(cct) {} | |
282 | ||
283 | /// Add a watch | |
284 | void addWatch( | |
285 | WatchRef watch ///< [in] Ref to new watch object | |
286 | ); | |
287 | ||
288 | /// Remove a watch | |
289 | void removeWatch( | |
290 | WatchRef watch ///< [in] Ref to watch object to remove | |
291 | ); | |
292 | ||
293 | /// Called on session reset, disconnects watchers | |
294 | void reset(Connection *con); | |
295 | }; | |
296 | ||
297 | #endif |