]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | #include "PG.h" | |
3 | ||
4 | #include "include/types.h" | |
5 | #include "messages/MWatchNotify.h" | |
6 | ||
7 | #include <map> | |
8 | ||
9 | #include "OSD.h" | |
10 | #include "PrimaryLogPG.h" | |
11 | #include "Watch.h" | |
12 | #include "Session.h" | |
13 | ||
14 | #include "common/config.h" | |
15 | ||
7c673cae FG |
16 | #define dout_context osd->cct |
17 | #define dout_subsys ceph_subsys_osd | |
18 | #undef dout_prefix | |
19 | #define dout_prefix _prefix(_dout, this) | |
20 | ||
f67539c2 TL |
21 | using std::list; |
22 | using std::make_pair; | |
23 | using std::pair; | |
24 | using std::ostream; | |
25 | using std::set; | |
20effc67 | 26 | using std::vector; |
f67539c2 TL |
27 | |
28 | using ceph::bufferlist; | |
29 | using ceph::decode; | |
30 | using ceph::encode; | |
31 | ||
32 | struct CancelableContext : public Context { | |
33 | virtual void cancel() = 0; | |
34 | }; | |
35 | ||
36 | ||
7c673cae FG |
37 | static ostream& _prefix( |
38 | std::ostream* _dout, | |
39 | Notify *notify) { | |
11fdf7f2 | 40 | return notify->gen_dbg_prefix(*_dout); |
7c673cae FG |
41 | } |
42 | ||
43 | Notify::Notify( | |
44 | ConnectionRef client, | |
45 | uint64_t client_gid, | |
46 | bufferlist &payload, | |
47 | uint32_t timeout, | |
48 | uint64_t cookie, | |
49 | uint64_t notify_id, | |
50 | uint64_t version, | |
51 | OSDService *osd) | |
9f95a23c TL |
52 | : client(client), |
53 | client_gid(client_gid), | |
7c673cae FG |
54 | complete(false), |
55 | discarded(false), | |
56 | timed_out(false), | |
57 | payload(payload), | |
58 | timeout(timeout), | |
59 | cookie(cookie), | |
60 | notify_id(notify_id), | |
61 | version(version), | |
62 | osd(osd), | |
9f95a23c | 63 | cb(nullptr) {} |
7c673cae FG |
64 | |
65 | NotifyRef Notify::makeNotifyRef( | |
66 | ConnectionRef client, | |
67 | uint64_t client_gid, | |
68 | bufferlist &payload, | |
69 | uint32_t timeout, | |
70 | uint64_t cookie, | |
71 | uint64_t notify_id, | |
72 | uint64_t version, | |
73 | OSDService *osd) { | |
74 | NotifyRef ret( | |
75 | new Notify( | |
76 | client, client_gid, | |
77 | payload, timeout, | |
78 | cookie, notify_id, | |
79 | version, osd)); | |
80 | ret->set_self(ret); | |
81 | return ret; | |
82 | } | |
83 | ||
84 | class NotifyTimeoutCB : public CancelableContext { | |
85 | NotifyRef notif; | |
86 | bool canceled; // protected by notif lock | |
87 | public: | |
88 | explicit NotifyTimeoutCB(NotifyRef notif) : notif(notif), canceled(false) {} | |
89 | void finish(int) override { | |
9f95a23c TL |
90 | notif->osd->watch_lock.unlock(); |
91 | notif->lock.lock(); | |
7c673cae FG |
92 | if (!canceled) |
93 | notif->do_timeout(); // drops lock | |
94 | else | |
9f95a23c TL |
95 | notif->lock.unlock(); |
96 | notif->osd->watch_lock.lock(); | |
7c673cae FG |
97 | } |
98 | void cancel() override { | |
9f95a23c | 99 | ceph_assert(ceph_mutex_is_locked(notif->lock)); |
7c673cae FG |
100 | canceled = true; |
101 | } | |
102 | }; | |
103 | ||
104 | void Notify::do_timeout() | |
105 | { | |
9f95a23c | 106 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae | 107 | dout(10) << "timeout" << dendl; |
11fdf7f2 | 108 | cb = nullptr; |
7c673cae | 109 | if (is_discarded()) { |
9f95a23c | 110 | lock.unlock(); |
7c673cae FG |
111 | return; |
112 | } | |
113 | ||
114 | timed_out = true; // we will send the client an error code | |
115 | maybe_complete_notify(); | |
11fdf7f2 | 116 | ceph_assert(complete); |
7c673cae FG |
117 | set<WatchRef> _watchers; |
118 | _watchers.swap(watchers); | |
9f95a23c | 119 | lock.unlock(); |
7c673cae | 120 | |
f67539c2 | 121 | for (auto i = _watchers.begin(); i != _watchers.end(); ++i) { |
7c673cae FG |
122 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); |
123 | pg->lock(); | |
124 | if (!(*i)->is_discarded()) { | |
125 | (*i)->cancel_notify(self.lock()); | |
126 | } | |
127 | pg->unlock(); | |
128 | } | |
129 | } | |
130 | ||
131 | void Notify::register_cb() | |
132 | { | |
9f95a23c | 133 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae | 134 | { |
9f95a23c | 135 | std::lock_guard l{osd->watch_lock}; |
7c673cae | 136 | cb = new NotifyTimeoutCB(self.lock()); |
3efd9988 FG |
137 | if (!osd->watch_timer.add_event_after(timeout, cb)) { |
138 | cb = nullptr; | |
139 | } | |
7c673cae FG |
140 | } |
141 | } | |
142 | ||
143 | void Notify::unregister_cb() | |
144 | { | |
9f95a23c | 145 | ceph_assert(ceph_mutex_is_locked(lock)); |
7c673cae FG |
146 | if (!cb) |
147 | return; | |
148 | cb->cancel(); | |
149 | { | |
9f95a23c | 150 | std::lock_guard l{osd->watch_lock}; |
7c673cae | 151 | osd->watch_timer.cancel_event(cb); |
11fdf7f2 | 152 | cb = nullptr; |
7c673cae FG |
153 | } |
154 | } | |
155 | ||
156 | void Notify::start_watcher(WatchRef watch) | |
157 | { | |
11fdf7f2 | 158 | std::lock_guard l(lock); |
7c673cae FG |
159 | dout(10) << "start_watcher" << dendl; |
160 | watchers.insert(watch); | |
161 | } | |
162 | ||
163 | void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl) | |
164 | { | |
11fdf7f2 | 165 | std::lock_guard l(lock); |
7c673cae FG |
166 | dout(10) << "complete_watcher" << dendl; |
167 | if (is_discarded()) | |
168 | return; | |
11fdf7f2 | 169 | ceph_assert(watchers.count(watch)); |
7c673cae FG |
170 | watchers.erase(watch); |
171 | notify_replies.insert(make_pair(make_pair(watch->get_watcher_gid(), | |
172 | watch->get_cookie()), | |
173 | reply_bl)); | |
174 | maybe_complete_notify(); | |
175 | } | |
176 | ||
177 | void Notify::complete_watcher_remove(WatchRef watch) | |
178 | { | |
11fdf7f2 | 179 | std::lock_guard l(lock); |
7c673cae FG |
180 | dout(10) << __func__ << dendl; |
181 | if (is_discarded()) | |
182 | return; | |
11fdf7f2 | 183 | ceph_assert(watchers.count(watch)); |
7c673cae FG |
184 | watchers.erase(watch); |
185 | maybe_complete_notify(); | |
186 | } | |
187 | ||
188 | void Notify::maybe_complete_notify() | |
189 | { | |
190 | dout(10) << "maybe_complete_notify -- " | |
191 | << watchers.size() | |
192 | << " in progress watchers " << dendl; | |
193 | if (watchers.empty() || timed_out) { | |
194 | // prepare reply | |
195 | bufferlist bl; | |
11fdf7f2 | 196 | encode(notify_replies, bl); |
20effc67 TL |
197 | vector<pair<uint64_t,uint64_t>> missed; |
198 | missed.reserve(watchers.size()); | |
199 | for (auto& watcher : watchers) { | |
200 | missed.emplace_back(watcher->get_watcher_gid(), | |
201 | watcher->get_cookie()); | |
7c673cae | 202 | } |
11fdf7f2 | 203 | encode(missed, bl); |
7c673cae FG |
204 | |
205 | bufferlist empty; | |
9f95a23c TL |
206 | auto* const reply = new MWatchNotify( |
207 | cookie, | |
208 | version, | |
209 | notify_id, | |
210 | CEPH_WATCH_EVENT_NOTIFY_COMPLETE, | |
211 | empty, | |
212 | client_gid); | |
7c673cae FG |
213 | reply->set_data(bl); |
214 | if (timed_out) | |
215 | reply->return_code = -ETIMEDOUT; | |
216 | client->send_message(reply); | |
217 | unregister_cb(); | |
218 | ||
219 | complete = true; | |
220 | } | |
221 | } | |
222 | ||
223 | void Notify::discard() | |
224 | { | |
11fdf7f2 | 225 | std::lock_guard l(lock); |
7c673cae FG |
226 | discarded = true; |
227 | unregister_cb(); | |
228 | watchers.clear(); | |
229 | } | |
230 | ||
231 | void Notify::init() | |
232 | { | |
11fdf7f2 | 233 | std::lock_guard l(lock); |
7c673cae FG |
234 | register_cb(); |
235 | maybe_complete_notify(); | |
236 | } | |
237 | ||
238 | #define dout_subsys ceph_subsys_osd | |
239 | #undef dout_prefix | |
240 | #define dout_prefix _prefix(_dout, watch.get()) | |
241 | ||
242 | static ostream& _prefix( | |
243 | std::ostream* _dout, | |
244 | Watch *watch) { | |
11fdf7f2 | 245 | return watch->gen_dbg_prefix(*_dout); |
7c673cae FG |
246 | } |
247 | ||
248 | class HandleWatchTimeout : public CancelableContext { | |
249 | WatchRef watch; | |
250 | public: | |
251 | bool canceled; // protected by watch->pg->lock | |
252 | explicit HandleWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
253 | void cancel() override { | |
254 | canceled = true; | |
255 | } | |
256 | void finish(int) override { ceph_abort(); /* not used */ } | |
257 | void complete(int) override { | |
258 | OSDService *osd(watch->osd); | |
259 | ldout(osd->cct, 10) << "HandleWatchTimeout" << dendl; | |
260 | boost::intrusive_ptr<PrimaryLogPG> pg(watch->pg); | |
9f95a23c | 261 | osd->watch_lock.unlock(); |
7c673cae | 262 | pg->lock(); |
11fdf7f2 | 263 | watch->cb = nullptr; |
7c673cae FG |
264 | if (!watch->is_discarded() && !canceled) |
265 | watch->pg->handle_watch_timeout(watch); | |
266 | delete this; // ~Watch requires pg lock! | |
267 | pg->unlock(); | |
9f95a23c | 268 | osd->watch_lock.lock(); |
7c673cae FG |
269 | } |
270 | }; | |
271 | ||
272 | class HandleDelayedWatchTimeout : public CancelableContext { | |
273 | WatchRef watch; | |
274 | public: | |
275 | bool canceled; | |
276 | explicit HandleDelayedWatchTimeout(WatchRef watch) : watch(watch), canceled(false) {} | |
277 | void cancel() override { | |
278 | canceled = true; | |
279 | } | |
280 | void finish(int) override { | |
281 | OSDService *osd(watch->osd); | |
282 | dout(10) << "HandleWatchTimeoutDelayed" << dendl; | |
11fdf7f2 TL |
283 | ceph_assert(watch->pg->is_locked()); |
284 | watch->cb = nullptr; | |
7c673cae FG |
285 | if (!watch->is_discarded() && !canceled) |
286 | watch->pg->handle_watch_timeout(watch); | |
287 | } | |
288 | }; | |
289 | ||
290 | #define dout_subsys ceph_subsys_osd | |
291 | #undef dout_prefix | |
292 | #define dout_prefix _prefix(_dout, this) | |
293 | ||
11fdf7f2 TL |
294 | std::ostream& Watch::gen_dbg_prefix(std::ostream& out) { |
295 | return pg->gen_prefix(out) << " -- Watch(" | |
296 | << make_pair(cookie, entity) << ") "; | |
7c673cae FG |
297 | } |
298 | ||
299 | Watch::Watch( | |
300 | PrimaryLogPG *pg, | |
301 | OSDService *osd, | |
302 | ObjectContextRef obc, | |
303 | uint32_t timeout, | |
304 | uint64_t cookie, | |
305 | entity_name_t entity, | |
306 | const entity_addr_t &addr) | |
307 | : cb(NULL), | |
308 | osd(osd), | |
309 | pg(pg), | |
310 | obc(obc), | |
311 | timeout(timeout), | |
312 | cookie(cookie), | |
313 | addr(addr), | |
314 | will_ping(false), | |
315 | entity(entity), | |
316 | discarded(false) { | |
317 | dout(10) << "Watch()" << dendl; | |
318 | } | |
319 | ||
320 | Watch::~Watch() { | |
321 | dout(10) << "~Watch" << dendl; | |
322 | // users must have called remove() or discard() prior to this point | |
11fdf7f2 | 323 | ceph_assert(!obc); |
9f95a23c | 324 | ceph_assert(!is_connected()); |
7c673cae FG |
325 | } |
326 | ||
7c673cae FG |
327 | Context *Watch::get_delayed_cb() |
328 | { | |
11fdf7f2 | 329 | ceph_assert(!cb); |
7c673cae FG |
330 | cb = new HandleDelayedWatchTimeout(self.lock()); |
331 | return cb; | |
332 | } | |
333 | ||
334 | void Watch::register_cb() | |
335 | { | |
11fdf7f2 | 336 | std::lock_guard l(osd->watch_lock); |
7c673cae FG |
337 | if (cb) { |
338 | dout(15) << "re-registering callback, timeout: " << timeout << dendl; | |
339 | cb->cancel(); | |
340 | osd->watch_timer.cancel_event(cb); | |
341 | } else { | |
342 | dout(15) << "registering callback, timeout: " << timeout << dendl; | |
343 | } | |
344 | cb = new HandleWatchTimeout(self.lock()); | |
3efd9988 FG |
345 | if (!osd->watch_timer.add_event_after(timeout, cb)) { |
346 | cb = nullptr; | |
347 | } | |
7c673cae FG |
348 | } |
349 | ||
350 | void Watch::unregister_cb() | |
351 | { | |
352 | dout(15) << "unregister_cb" << dendl; | |
353 | if (!cb) | |
354 | return; | |
355 | dout(15) << "actually registered, cancelling" << dendl; | |
356 | cb->cancel(); | |
357 | { | |
11fdf7f2 | 358 | std::lock_guard l(osd->watch_lock); |
7c673cae FG |
359 | osd->watch_timer.cancel_event(cb); // harmless if not registered with timer |
360 | } | |
11fdf7f2 | 361 | cb = nullptr; |
7c673cae FG |
362 | } |
363 | ||
364 | void Watch::got_ping(utime_t t) | |
365 | { | |
366 | last_ping = t; | |
9f95a23c | 367 | if (is_connected()) { |
7c673cae FG |
368 | register_cb(); |
369 | } | |
370 | } | |
371 | ||
372 | void Watch::connect(ConnectionRef con, bool _will_ping) | |
373 | { | |
9f95a23c | 374 | if (is_connected(con.get())) { |
7c673cae FG |
375 | dout(10) << __func__ << " con " << con << " - already connected" << dendl; |
376 | return; | |
377 | } | |
378 | dout(10) << __func__ << " con " << con << dendl; | |
379 | conn = con; | |
380 | will_ping = _will_ping; | |
11fdf7f2 TL |
381 | auto priv = con->get_priv(); |
382 | if (priv) { | |
383 | auto sessionref = static_cast<Session*>(priv.get()); | |
7c673cae | 384 | sessionref->wstate.addWatch(self.lock()); |
11fdf7f2 | 385 | priv.reset(); |
f67539c2 | 386 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
387 | i != in_progress_notifies.end(); |
388 | ++i) { | |
389 | send_notify(i->second); | |
390 | } | |
391 | } | |
392 | if (will_ping) { | |
393 | last_ping = ceph_clock_now(); | |
394 | register_cb(); | |
395 | } else { | |
396 | unregister_cb(); | |
397 | } | |
398 | } | |
399 | ||
400 | void Watch::disconnect() | |
401 | { | |
402 | dout(10) << "disconnect (con was " << conn << ")" << dendl; | |
403 | conn = ConnectionRef(); | |
404 | if (!will_ping) | |
405 | register_cb(); | |
406 | } | |
407 | ||
408 | void Watch::discard() | |
409 | { | |
410 | dout(10) << "discard" << dendl; | |
f67539c2 | 411 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
412 | i != in_progress_notifies.end(); |
413 | ++i) { | |
414 | i->second->discard(); | |
415 | } | |
416 | discard_state(); | |
417 | } | |
418 | ||
419 | void Watch::discard_state() | |
420 | { | |
11fdf7f2 TL |
421 | ceph_assert(pg->is_locked()); |
422 | ceph_assert(!discarded); | |
423 | ceph_assert(obc); | |
7c673cae FG |
424 | in_progress_notifies.clear(); |
425 | unregister_cb(); | |
426 | discarded = true; | |
9f95a23c | 427 | if (is_connected()) { |
11fdf7f2 TL |
428 | if (auto priv = conn->get_priv(); priv) { |
429 | auto session = static_cast<Session*>(priv.get()); | |
430 | session->wstate.removeWatch(self.lock()); | |
7c673cae FG |
431 | } |
432 | conn = ConnectionRef(); | |
433 | } | |
434 | obc = ObjectContextRef(); | |
435 | } | |
436 | ||
437 | bool Watch::is_discarded() const | |
438 | { | |
439 | return discarded; | |
440 | } | |
441 | ||
442 | void Watch::remove(bool send_disconnect) | |
443 | { | |
444 | dout(10) << "remove" << dendl; | |
9f95a23c | 445 | if (send_disconnect && is_connected()) { |
7c673cae FG |
446 | bufferlist empty; |
447 | MWatchNotify *reply(new MWatchNotify(cookie, 0, 0, | |
448 | CEPH_WATCH_EVENT_DISCONNECT, empty)); | |
449 | conn->send_message(reply); | |
450 | } | |
f67539c2 | 451 | for (auto i = in_progress_notifies.begin(); |
7c673cae FG |
452 | i != in_progress_notifies.end(); |
453 | ++i) { | |
454 | i->second->complete_watcher_remove(self.lock()); | |
455 | } | |
456 | discard_state(); | |
457 | } | |
458 | ||
459 | void Watch::start_notify(NotifyRef notif) | |
460 | { | |
11fdf7f2 | 461 | ceph_assert(in_progress_notifies.find(notif->notify_id) == |
7c673cae FG |
462 | in_progress_notifies.end()); |
463 | if (will_ping) { | |
464 | utime_t cutoff = ceph_clock_now(); | |
465 | cutoff.sec_ref() -= timeout; | |
466 | if (last_ping < cutoff) { | |
467 | dout(10) << __func__ << " " << notif->notify_id | |
468 | << " last_ping " << last_ping << " < cutoff " << cutoff | |
469 | << ", disconnecting" << dendl; | |
470 | disconnect(); | |
471 | return; | |
472 | } | |
473 | } | |
474 | dout(10) << "start_notify " << notif->notify_id << dendl; | |
475 | in_progress_notifies[notif->notify_id] = notif; | |
476 | notif->start_watcher(self.lock()); | |
9f95a23c | 477 | if (is_connected()) |
7c673cae FG |
478 | send_notify(notif); |
479 | } | |
480 | ||
481 | void Watch::cancel_notify(NotifyRef notif) | |
482 | { | |
483 | dout(10) << "cancel_notify " << notif->notify_id << dendl; | |
484 | in_progress_notifies.erase(notif->notify_id); | |
485 | } | |
486 | ||
487 | void Watch::send_notify(NotifyRef notif) | |
488 | { | |
489 | dout(10) << "send_notify" << dendl; | |
490 | MWatchNotify *notify_msg = new MWatchNotify( | |
9f95a23c TL |
491 | cookie, |
492 | notif->version, | |
493 | notif->notify_id, | |
494 | CEPH_WATCH_EVENT_NOTIFY, | |
495 | notif->payload, | |
496 | notif->client_gid); | |
7c673cae FG |
497 | conn->send_message(notify_msg); |
498 | } | |
499 | ||
500 | void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl) | |
501 | { | |
502 | dout(10) << "notify_ack" << dendl; | |
f67539c2 | 503 | auto i = in_progress_notifies.find(notify_id); |
7c673cae FG |
504 | if (i != in_progress_notifies.end()) { |
505 | i->second->complete_watcher(self.lock(), reply_bl); | |
506 | in_progress_notifies.erase(i); | |
507 | } | |
508 | } | |
509 | ||
510 | WatchRef Watch::makeWatchRef( | |
511 | PrimaryLogPG *pg, OSDService *osd, | |
512 | ObjectContextRef obc, uint32_t timeout, uint64_t cookie, entity_name_t entity, const entity_addr_t& addr) | |
513 | { | |
514 | WatchRef ret(new Watch(pg, osd, obc, timeout, cookie, entity, addr)); | |
515 | ret->set_self(ret); | |
516 | return ret; | |
517 | } | |
518 | ||
519 | void WatchConState::addWatch(WatchRef watch) | |
520 | { | |
11fdf7f2 | 521 | std::lock_guard l(lock); |
7c673cae FG |
522 | watches.insert(watch); |
523 | } | |
524 | ||
525 | void WatchConState::removeWatch(WatchRef watch) | |
526 | { | |
11fdf7f2 | 527 | std::lock_guard l(lock); |
7c673cae FG |
528 | watches.erase(watch); |
529 | } | |
530 | ||
531 | void WatchConState::reset(Connection *con) | |
532 | { | |
533 | set<WatchRef> _watches; | |
534 | { | |
11fdf7f2 | 535 | std::lock_guard l(lock); |
7c673cae FG |
536 | _watches.swap(watches); |
537 | } | |
538 | for (set<WatchRef>::iterator i = _watches.begin(); | |
539 | i != _watches.end(); | |
540 | ++i) { | |
541 | boost::intrusive_ptr<PrimaryLogPG> pg((*i)->get_pg()); | |
542 | pg->lock(); | |
543 | if (!(*i)->is_discarded()) { | |
544 | if ((*i)->is_connected(con)) { | |
545 | (*i)->disconnect(); | |
546 | } else { | |
547 | lgeneric_derr(cct) << __func__ << " not still connected to " << (*i) << dendl; | |
548 | } | |
549 | } | |
550 | pg->unlock(); | |
551 | } | |
552 | } |