]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | #include "PG.h" | |
3 | ||
4 | #include "include/types.h" | |
5 | #include "messages/MWatchNotify.h" | |
6 | ||
7 | #include <map> | |
8 | ||
9 | #include "OSD.h" | |
10 | #include "PrimaryLogPG.h" | |
11 | #include "Watch.h" | |
12 | #include "Session.h" | |
13 | ||
14 | #include "common/config.h" | |
15 | ||
16 | struct CancelableContext : public Context { | |
17 | virtual void cancel() = 0; | |
18 | }; | |
19 | ||
20 | #define dout_context osd->cct | |
21 | #define dout_subsys ceph_subsys_osd | |
22 | #undef dout_prefix | |
23 | #define dout_prefix _prefix(_dout, this) | |
24 | ||
25 | static ostream& _prefix( | |
26 | std::ostream* _dout, | |
27 | Notify *notify) { | |
28 | return *_dout << notify->gen_dbg_prefix(); | |
29 | } | |
30 | ||
31 | Notify::Notify( | |
32 | ConnectionRef client, | |
33 | uint64_t client_gid, | |
34 | bufferlist &payload, | |
35 | uint32_t timeout, | |
36 | uint64_t cookie, | |
37 | uint64_t notify_id, | |
38 | uint64_t version, | |
39 | OSDService *osd) | |
40 | : client(client), client_gid(client_gid), | |
41 | complete(false), | |
42 | discarded(false), | |
43 | timed_out(false), | |
44 | payload(payload), | |
45 | timeout(timeout), | |
46 | cookie(cookie), | |
47 | notify_id(notify_id), | |
48 | version(version), | |
49 | osd(osd), | |
50 | cb(NULL), | |
51 | lock("Notify::lock") {} | |
52 | ||
53 | NotifyRef Notify::makeNotifyRef( | |
54 | ConnectionRef client, | |
55 | uint64_t client_gid, | |
56 | bufferlist &payload, | |
57 | uint32_t timeout, | |
58 | uint64_t cookie, | |
59 | uint64_t notify_id, | |
60 | uint64_t version, | |
61 | OSDService *osd) { | |
62 | NotifyRef ret( | |
63 | new Notify( | |
64 | client, client_gid, | |
65 | payload, timeout, | |
66 | cookie, notify_id, | |
67 | version, osd)); | |
68 | ret->set_self(ret); | |
69 | return ret; | |
70 | } | |
71 | ||
72 | class NotifyTimeoutCB : public CancelableContext { | |
73 | NotifyRef notif; | |
74 | bool canceled; // protected by notif lock | |
75 | public: | |
76 | explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {} | |
77 | void finish(int) override { | |
78 | notif->osd->watch_lock.Unlock(); | |
79 | notif->lock.Lock(); | |
80 | if (!canceled) | |
81 | notif->do_timeout(); // drops lock | |
82 | else | |
83 | notif->lock.Unlock(); | |
84 | notif->osd->watch_lock.Lock(); | |
85 | } | |
86 | void cancel() override { | |
87 | assert(notif->lock.is_locked_by_me()); | |
88 | canceled = true; | |
89 | } | |
90 | }; | |
91 | ||
92 | void Notify::do_timeout() | |
93 | { | |
94 | assert(lock.is_locked_by_me()); | |
95 | dout(10) << "timeout" << dendl; | |
96 | cb = NULL; | |
97 | if (is_discarded()) { | |
98 | lock.Unlock(); | |
99 | return; | |
100 | } | |
101 | ||
102 | timed_out = true; // we will send the client an error code | |
103 | maybe_complete_notify(); | |
104 | assert(complete); | |
105 | set<WatchRef> _watchers; | |
106 | _watchers.swap(watchers); | |
107 | lock.Unlock(); | |
108 | ||
109 | for (set<WatchRef>::iterator i = _watchers.begin(); | |
110 | i != _watchers.end(); | |
111 | ++i) { | |
112 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); | |
113 | pg->lock(); | |
114 | if (!(*i)->is_discarded()) { | |
115 | (*i)->cancel_notify(self.lock()); | |
116 | } | |
117 | pg->unlock(); | |
118 | } | |
119 | } | |
120 | ||
121 | void Notify::register_cb() | |
122 | { | |
123 | assert(lock.is_locked_by_me()); | |
124 | { | |
125 | osd->watch_lock.Lock(); | |
126 | cb = new NotifyTimeoutCB(self.lock()); | |
127 | osd->watch_timer.add_event_after( | |
128 | timeout, | |
129 | cb); | |
130 | osd->watch_lock.Unlock(); | |
131 | } | |
132 | } | |
133 | ||
134 | void Notify::unregister_cb() | |
135 | { | |
136 | assert(lock.is_locked_by_me()); | |
137 | if (!cb) | |
138 | return; | |
139 | cb->cancel(); | |
140 | { | |
141 | osd->watch_lock.Lock(); | |
142 | osd->watch_timer.cancel_event(cb); | |
143 | cb = NULL; | |
144 | osd->watch_lock.Unlock(); | |
145 | } | |
146 | } | |
147 | ||
148 | void Notify::start_watcher(WatchRef watch) | |
149 | { | |
150 | Mutex::Locker l(lock); | |
151 | dout(10) << "start_watcher" << dendl; | |
152 | watchers.insert(watch); | |
153 | } | |
154 | ||
155 | void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl) | |
156 | { | |
157 | Mutex::Locker l(lock); | |
158 | dout(10) << "complete_watcher" << dendl; | |
159 | if (is_discarded()) | |
160 | return; | |
161 | assert(watchers.count(watch)); | |
162 | watchers.erase(watch); | |
163 | notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(), | |
164 | watch->get_cookie()), | |
165 | reply_bl)); | |
166 | maybe_complete_notify(); | |
167 | } | |
168 | ||
169 | void Notify::complete_watcher_remove(WatchRef watch) | |
170 | { | |
171 | Mutex::Locker l(lock); | |
172 | dout(10) << __func__ << dendl; | |
173 | if (is_discarded()) | |
174 | return; | |
175 | assert(watchers.count(watch)); | |
176 | watchers.erase(watch); | |
177 | maybe_complete_notify(); | |
178 | } | |
179 | ||
180 | void Notify::maybe_complete_notify() | |
181 | { | |
182 | dout(10) << "maybe_complete_notify -- " | |
183 | << watchers.size() | |
184 | << " in progress watchers " << dendl; | |
185 | if (watchers.empty() || timed_out) { | |
186 | // prepare reply | |
187 | bufferlist bl; | |
188 | ::encode(notify_replies, bl); | |
189 | list<pair<uint64_t,uint64_t> > missed; | |
190 | for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) { | |
191 | missed.push_back(make_pair((*p)->get_watcher_gid(), | |
192 | (*p)->get_cookie())); | |
193 | } | |
194 | ::encode(missed, bl); | |
195 | ||
196 | bufferlist empty; | |
197 | MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id, | |
198 | CEPH_WATCH_EVENT_NOTIFY_COMPLETE, empty)); | |
199 | reply->notifier_gid = client_gid; | |
200 | reply->set_data(bl); | |
201 | if (timed_out) | |
202 | reply->return_code = -ETIMEDOUT; | |
203 | client->send_message(reply); | |
204 | unregister_cb(); | |
205 | ||
206 | complete = true; | |
207 | } | |
208 | } | |
209 | ||
210 | void Notify::discard() | |
211 | { | |
212 | Mutex::Locker l(lock); | |
213 | discarded = true; | |
214 | unregister_cb(); | |
215 | watchers.clear(); | |
216 | } | |
217 | ||
218 | void Notify::init() | |
219 | { | |
220 | Mutex::Locker l(lock); | |
221 | register_cb(); | |
222 | maybe_complete_notify(); | |
223 | } | |
224 | ||
225 | #define dout_subsys ceph_subsys_osd | |
226 | #undef dout_prefix | |
227 | #define dout_prefix _prefix(_dout, watch.get()) | |
228 | ||
229 | static ostream& _prefix( | |
230 | std::ostream* _dout, | |
231 | Watch *watch) { | |
232 | return *_dout << watch->gen_dbg_prefix(); | |
233 | } | |
234 | ||
235 | class HandleWatchTimeout : public CancelableContext { | |
236 | WatchRef watch; | |
237 | public: | |
238 | bool canceled; // protected by watch->pg->lock | |
239 | explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
240 | void cancel() override { | |
241 | canceled = true; | |
242 | } | |
243 | void finish(int) override { ceph_abort(); /* not used */ } | |
244 | void complete(int) override { | |
245 | OSDService *osd(watch->osd); | |
246 | ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl; | |
247 | boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg); | |
248 | osd->watch_lock.Unlock(); | |
249 | pg->lock(); | |
250 | watch->cb = NULL; | |
251 | if (!watch->is_discarded() && !canceled) | |
252 | watch->pg->handle_watch_timeout(watch); | |
253 | delete this; // ~Watch requires pg lock! | |
254 | pg->unlock(); | |
255 | osd->watch_lock.Lock(); | |
256 | } | |
257 | }; | |
258 | ||
259 | class HandleDelayedWatchTimeout : public CancelableContext { | |
260 | WatchRef watch; | |
261 | public: | |
262 | bool canceled; | |
263 | explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
264 | void cancel() override { | |
265 | canceled = true; | |
266 | } | |
267 | void finish(int) override { | |
268 | OSDService *osd(watch->osd); | |
269 | dout(10) << "HandleWatchTimeoutDelayed" << dendl; | |
270 | assert(watch->pg->is_locked()); | |
271 | watch->cb = NULL; | |
272 | if (!watch->is_discarded() && !canceled) | |
273 | watch->pg->handle_watch_timeout(watch); | |
274 | } | |
275 | }; | |
276 | ||
277 | #define dout_subsys ceph_subsys_osd | |
278 | #undef dout_prefix | |
279 | #define dout_prefix _prefix(_dout, this) | |
280 | ||
281 | string Watch::gen_dbg_prefix() { | |
282 | stringstream ss; | |
283 | ss << pg->gen_prefix() << " -- Watch(" | |
284 | << make_pair(cookie, entity) << ") "; | |
285 | return ss.str(); | |
286 | } | |
287 | ||
288 | Watch::Watch( | |
289 | PrimaryLogPG *pg, | |
290 | OSDService *osd, | |
291 | ObjectContextRef obc, | |
292 | uint32_t timeout, | |
293 | uint64_t cookie, | |
294 | entity_name_t entity, | |
295 | const entity_addr_t &addr) | |
296 | : cb(NULL), | |
297 | osd(osd), | |
298 | pg(pg), | |
299 | obc(obc), | |
300 | timeout(timeout), | |
301 | cookie(cookie), | |
302 | addr(addr), | |
303 | will_ping(false), | |
304 | entity(entity), | |
305 | discarded(false) { | |
306 | dout(10) << "Watch()" << dendl; | |
307 | } | |
308 | ||
309 | Watch::~Watch() { | |
310 | dout(10) << "~Watch" << dendl; | |
311 | // users must have called remove() or discard() prior to this point | |
312 | assert(!obc); | |
313 | assert(!conn); | |
314 | } | |
315 | ||
316 | bool Watch::connected() { return !!conn; } | |
317 | ||
318 | Context *Watch::get_delayed_cb() | |
319 | { | |
320 | assert(!cb); | |
321 | cb = new HandleDelayedWatchTimeout(self.lock()); | |
322 | return cb; | |
323 | } | |
324 | ||
325 | void Watch::register_cb() | |
326 | { | |
327 | Mutex::Locker l(osd->watch_lock); | |
328 | if (cb) { | |
329 | dout(15) << "re-registering callback, timeout: " << timeout << dendl; | |
330 | cb->cancel(); | |
331 | osd->watch_timer.cancel_event(cb); | |
332 | } else { | |
333 | dout(15) << "registering callback, timeout: " << timeout << dendl; | |
334 | } | |
335 | cb = new HandleWatchTimeout(self.lock()); | |
336 | osd->watch_timer.add_event_after( | |
337 | timeout, | |
338 | cb); | |
339 | } | |
340 | ||
341 | void Watch::unregister_cb() | |
342 | { | |
343 | dout(15) << "unregister_cb" << dendl; | |
344 | if (!cb) | |
345 | return; | |
346 | dout(15) << "actually registered, cancelling" << dendl; | |
347 | cb->cancel(); | |
348 | { | |
349 | Mutex::Locker l(osd->watch_lock); | |
350 | osd->watch_timer.cancel_event(cb); // harmless if not registered with timer | |
351 | } | |
352 | cb = NULL; | |
353 | } | |
354 | ||
355 | void Watch::got_ping(utime_t t) | |
356 | { | |
357 | last_ping = t; | |
358 | if (conn) { | |
359 | register_cb(); | |
360 | } | |
361 | } | |
362 | ||
363 | void Watch::connect(ConnectionRef con, bool _will_ping) | |
364 | { | |
365 | if (conn == con) { | |
366 | dout(10) << __func__ << " con " << con << " - already connected" << dendl; | |
367 | return; | |
368 | } | |
369 | dout(10) << __func__ << " con " << con << dendl; | |
370 | conn = con; | |
371 | will_ping = _will_ping; | |
372 | Session* sessionref(static_cast<Session*>(con->get_priv())); | |
373 | if (sessionref) { | |
374 | sessionref->wstate.addWatch(self.lock()); | |
375 | sessionref->put(); | |
376 | for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); | |
377 | i != in_progress_notifies.end(); | |
378 | ++i) { | |
379 | send_notify(i->second); | |
380 | } | |
381 | } | |
382 | if (will_ping) { | |
383 | last_ping = ceph_clock_now(); | |
384 | register_cb(); | |
385 | } else { | |
386 | unregister_cb(); | |
387 | } | |
388 | } | |
389 | ||
390 | void Watch::disconnect() | |
391 | { | |
392 | dout(10) << "disconnect (con was " << conn << ")" << dendl; | |
393 | conn = ConnectionRef(); | |
394 | if (!will_ping) | |
395 | register_cb(); | |
396 | } | |
397 | ||
398 | void Watch::discard() | |
399 | { | |
400 | dout(10) << "discard" << dendl; | |
401 | for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); | |
402 | i != in_progress_notifies.end(); | |
403 | ++i) { | |
404 | i->second->discard(); | |
405 | } | |
406 | discard_state(); | |
407 | } | |
408 | ||
409 | void Watch::discard_state() | |
410 | { | |
411 | assert(pg->is_locked()); | |
412 | assert(!discarded); | |
413 | assert(obc); | |
414 | in_progress_notifies.clear(); | |
415 | unregister_cb(); | |
416 | discarded = true; | |
417 | if (conn) { | |
418 | Session* sessionref(static_cast<Session*>(conn->get_priv())); | |
419 | if (sessionref) { | |
420 | sessionref->wstate.removeWatch(self.lock()); | |
421 | sessionref->put(); | |
422 | } | |
423 | conn = ConnectionRef(); | |
424 | } | |
425 | obc = ObjectContextRef(); | |
426 | } | |
427 | ||
428 | bool Watch::is_discarded() const | |
429 | { | |
430 | return discarded; | |
431 | } | |
432 | ||
433 | void Watch::remove(bool send_disconnect) | |
434 | { | |
435 | dout(10) << "remove" << dendl; | |
436 | if (send_disconnect && conn) { | |
437 | bufferlist empty; | |
438 | MWatchNotify *reply(new MWatchNotify(cookie, 0, 0, | |
439 | CEPH_WATCH_EVENT_DISCONNECT, empty)); | |
440 | conn->send_message(reply); | |
441 | } | |
442 | for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin(); | |
443 | i != in_progress_notifies.end(); | |
444 | ++i) { | |
445 | i->second->complete_watcher_remove(self.lock()); | |
446 | } | |
447 | discard_state(); | |
448 | } | |
449 | ||
450 | void Watch::start_notify(NotifyRef notif) | |
451 | { | |
452 | assert(in_progress_notifies.find(notif->notify_id) == | |
453 | in_progress_notifies.end()); | |
454 | if (will_ping) { | |
455 | utime_t cutoff = ceph_clock_now(); | |
456 | cutoff.sec_ref() -= timeout; | |
457 | if (last_ping < cutoff) { | |
458 | dout(10) << __func__ << " " << notif->notify_id | |
459 | << " last_ping " << last_ping << " < cutoff " << cutoff | |
460 | << ", disconnecting" << dendl; | |
461 | disconnect(); | |
462 | return; | |
463 | } | |
464 | } | |
465 | dout(10) << "start_notify " << notif->notify_id << dendl; | |
466 | in_progress_notifies[notif->notify_id] = notif; | |
467 | notif->start_watcher(self.lock()); | |
468 | if (connected()) | |
469 | send_notify(notif); | |
470 | } | |
471 | ||
472 | void Watch::cancel_notify(NotifyRef notif) | |
473 | { | |
474 | dout(10) << "cancel_notify " << notif->notify_id << dendl; | |
475 | in_progress_notifies.erase(notif->notify_id); | |
476 | } | |
477 | ||
478 | void Watch::send_notify(NotifyRef notif) | |
479 | { | |
480 | dout(10) << "send_notify" << dendl; | |
481 | MWatchNotify *notify_msg = new MWatchNotify( | |
482 | cookie, notif->version, notif->notify_id, | |
483 | CEPH_WATCH_EVENT_NOTIFY, notif->payload); | |
484 | notify_msg->notifier_gid = notif->client_gid; | |
485 | conn->send_message(notify_msg); | |
486 | } | |
487 | ||
488 | void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl) | |
489 | { | |
490 | dout(10) << "notify_ack" << dendl; | |
491 | map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id); | |
492 | if (i != in_progress_notifies.end()) { | |
493 | i->second->complete_watcher(self.lock(), reply_bl); | |
494 | in_progress_notifies.erase(i); | |
495 | } | |
496 | } | |
497 | ||
498 | WatchRef Watch::makeWatchRef( | |
499 | PrimaryLogPG *pg, OSDService *osd, | |
500 | ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr) | |
501 | { | |
502 | WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr)); | |
503 | ret->set_self(ret); | |
504 | return ret; | |
505 | } | |
506 | ||
507 | void WatchConState::addWatch(WatchRef watch) | |
508 | { | |
509 | Mutex::Locker l(lock); | |
510 | watches.insert(watch); | |
511 | } | |
512 | ||
513 | void WatchConState::removeWatch(WatchRef watch) | |
514 | { | |
515 | Mutex::Locker l(lock); | |
516 | watches.erase(watch); | |
517 | } | |
518 | ||
519 | void WatchConState::reset(Connection *con) | |
520 | { | |
521 | set<WatchRef> _watches; | |
522 | { | |
523 | Mutex::Locker l(lock); | |
524 | _watches.swap(watches); | |
525 | } | |
526 | for (set<WatchRef>::iterator i = _watches.begin(); | |
527 | i != _watches.end(); | |
528 | ++i) { | |
529 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); | |
530 | pg->lock(); | |
531 | if (!(*i)->is_discarded()) { | |
532 | if ((*i)->is_connected(con)) { | |
533 | (*i)->disconnect(); | |
534 | } else { | |
535 | lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl; | |
536 | } | |
537 | } | |
538 | pg->unlock(); | |
539 | } | |
540 | } |