]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/watch.cc
f71d915bb9d7ab6b3e14352b7ba42a0f5430bca7
[ceph.git] / ceph / src / crimson / osd / watch.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <algorithm>
5
6 #include <boost/range/adaptor/transformed.hpp>
7 #include <boost/range/algorithm_ext/insert.hpp>
8
9 #include "crimson/osd/watch.h"
10 #include "crimson/osd/osd_operations/internal_client_request.h"
11
12 #include "messages/MWatchNotify.h"
13
14
15 namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_osd);
18 }
19 }
20
21 namespace crimson::osd {
22
23 // a watcher can remove itself if it has not seen a notification after a period of time.
24 // in the case, we need to drop it also from the persisted `ObjectState` instance.
25 // this operation resembles a bit the `_UNWATCH` subop.
26 class WatchTimeoutRequest final : public InternalClientRequest {
27 public:
28 WatchTimeoutRequest(WatchRef watch, Ref<PG> pg)
29 : InternalClientRequest(std::move(pg)),
30 watch(std::move(watch)) {
31 }
32
33 const hobject_t& get_target_oid() const final;
34 PG::do_osd_ops_params_t get_do_osd_ops_params() const final;
35 std::vector<OSDOp> create_osd_ops() final;
36
37 private:
38 WatchRef watch;
39 };
40
41 const hobject_t& WatchTimeoutRequest::get_target_oid() const
42 {
43 assert(watch->obc);
44 return watch->obc->get_oid();
45 }
46
47 PG::do_osd_ops_params_t
48 WatchTimeoutRequest::get_do_osd_ops_params() const
49 {
50 osd_reqid_t reqid;
51 reqid.name = watch->entity_name;
52 PG::do_osd_ops_params_t params{
53 watch->conn,
54 reqid,
55 ceph_clock_now(),
56 get_pg().get_osdmap_epoch(),
57 entity_inst_t{ watch->entity_name, watch->winfo.addr },
58 0
59 };
60 logger().debug("{}: params.reqid={}", __func__, params.reqid);
61 return params;
62 }
63
64 std::vector<OSDOp> WatchTimeoutRequest::create_osd_ops()
65 {
66 logger().debug("{}", __func__);
67 assert(watch);
68 OSDOp osd_op;
69 osd_op.op.op = CEPH_OSD_OP_WATCH;
70 osd_op.op.flags = 0;
71 osd_op.op.watch.op = CEPH_OSD_WATCH_OP_UNWATCH;
72 osd_op.op.watch.cookie = watch->winfo.cookie;
73 return std::vector{std::move(osd_op)};
74 }
75
76 Watch::~Watch()
77 {
78 logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
79 }
80
81 seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
82 {
83 if (this->conn == conn) {
84 logger().debug("conn={} already connected", conn);
85 return seastar::now();
86 }
87 timeout_timer.cancel();
88 timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
89 this->conn = std::move(conn);
90 return seastar::now();
91 }
92
93 void Watch::disconnect()
94 {
95 ceph_assert(!conn);
96 timeout_timer.cancel();
97 timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
98 }
99
100 seastar::future<> Watch::send_notify_msg(NotifyRef notify)
101 {
102 logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);
103 return conn->send(crimson::make_message<MWatchNotify>(
104 winfo.cookie,
105 notify->user_version,
106 notify->ninfo.notify_id,
107 CEPH_WATCH_EVENT_NOTIFY,
108 notify->ninfo.bl,
109 notify->client_gid));
110 }
111
112 seastar::future<> Watch::start_notify(NotifyRef notify)
113 {
114 logger().info("{} adding notify(id={})", __func__, notify->ninfo.notify_id);
115 auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
116 ceph_assert(emplaced);
117 ceph_assert(is_alive());
118 return is_connected() ? send_notify_msg(*it) : seastar::now();
119 }
120
121 seastar::future<> Watch::notify_ack(
122 const uint64_t notify_id,
123 const ceph::bufferlist& reply_bl)
124 {
125 logger().info("{}", __func__);
126 return seastar::do_for_each(in_progress_notifies,
127 [this_shared=shared_from_this(), reply_bl] (auto notify) {
128 return notify->complete_watcher(this_shared, reply_bl);
129 }
130 ).then([this] {
131 in_progress_notifies.clear();
132 return seastar::now();
133 });
134 }
135
136 seastar::future<> Watch::send_disconnect_msg()
137 {
138 if (!is_connected()) {
139 return seastar::now();
140 }
141 ceph::bufferlist empty;
142 return conn->send(crimson::make_message<MWatchNotify>(
143 winfo.cookie,
144 0,
145 0,
146 CEPH_WATCH_EVENT_DISCONNECT,
147 empty));
148 }
149
150 void Watch::discard_state()
151 {
152 ceph_assert(obc);
153 in_progress_notifies.clear();
154 timeout_timer.cancel();
155 }
156
157 void Watch::got_ping(utime_t)
158 {
159 if (is_connected()) {
160 // using cancel() + arm() as rearm() has no overload for time delta.
161 timeout_timer.cancel();
162 timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
163 }
164 }
165
166 seastar::future<> Watch::remove()
167 {
168 logger().info("{}", __func__);
169 // in contrast to ceph-osd crimson sends CEPH_WATCH_EVENT_DISCONNECT directly
170 // from the timeout handler and _after_ CEPH_WATCH_EVENT_NOTIFY_COMPLETE.
171 // this simplifies the Watch::remove() interface as callers aren't obliged
172 // anymore to decide whether EVENT_DISCONNECT needs to be send or not -- it
173 // becomes an implementation detail of Watch.
174 return seastar::do_for_each(in_progress_notifies,
175 [this_shared=shared_from_this()] (auto notify) {
176 return notify->remove_watcher(this_shared);
177 }).then([this] {
178 discard_state();
179 return seastar::now();
180 });
181 }
182
183 void Watch::cancel_notify(const uint64_t notify_id)
184 {
185 logger().info("{} notify_id={}", __func__, notify_id);
186 const auto it = in_progress_notifies.find(notify_id);
187 assert(it != std::end(in_progress_notifies));
188 in_progress_notifies.erase(it);
189 }
190
191 void Watch::do_watch_timeout()
192 {
193 assert(pg);
194 auto [op, fut] = pg->get_shard_services().start_operation<WatchTimeoutRequest>(
195 shared_from_this(), pg);
196 std::ignore = std::move(fut).then([op=std::move(op), this] {
197 return send_disconnect_msg();
198 });
199 }
200
201 bool notify_reply_t::operator<(const notify_reply_t& rhs) const
202 {
203 // comparing std::pairs to emphasize our legacy. ceph-osd stores
204 // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
205 // unfortunately, what seems to be an implementation detail, got
206 // exposed as part of our public API (the `reply_buffer` parameter
207 // of the `rados_notify` family).
208 const auto lhsp = std::make_pair(watcher_gid, watcher_cookie);
209 const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie);
210 return lhsp < rhsp;
211 }
212
213 std::ostream &operator<<(std::ostream &out, const notify_reply_t &rhs)
214 {
215 out << "notify_reply_t{watcher_gid=" << rhs.watcher_gid
216 << ", watcher_cookie=" << rhs.watcher_cookie
217 << ", bl=" << rhs.bl << "}";
218 return out;
219 }
220
221 Notify::Notify(crimson::net::ConnectionRef conn,
222 const notify_info_t& ninfo,
223 const uint64_t client_gid,
224 const uint64_t user_version)
225 : ninfo(ninfo),
226 conn(std::move(conn)),
227 client_gid(client_gid),
228 user_version(user_version)
229 {}
230
231 seastar::future<> Notify::remove_watcher(WatchRef watch)
232 {
233 if (discarded || complete) {
234 return seastar::now();
235 }
236 [[maybe_unused]] const auto num_removed = watchers.erase(watch);
237 assert(num_removed > 0);
238 if (watchers.empty()) {
239 complete = true;
240 [[maybe_unused]] bool was_armed = timeout_timer.cancel();
241 assert(was_armed);
242 return send_completion();
243 } else {
244 return seastar::now();
245 }
246 }
247
248
249 seastar::future<> Notify::complete_watcher(
250 WatchRef watch,
251 const ceph::bufferlist& reply_bl)
252 {
253 if (discarded || complete) {
254 return seastar::now();
255 }
256 notify_replies.emplace(notify_reply_t{
257 watch->get_watcher_gid(),
258 watch->get_cookie(),
259 reply_bl});
260 return remove_watcher(std::move(watch));
261 }
262
263 seastar::future<> Notify::send_completion(
264 std::set<WatchRef> timedout_watchers)
265 {
266 logger().info("{} -- {} in progress watchers, timedout watchers {}",
267 __func__, watchers.size(), timedout_watchers.size());
268 logger().debug("{} sending notify replies: {}", __func__, notify_replies);
269
270 ceph::bufferlist empty;
271 auto reply = crimson::make_message<MWatchNotify>(
272 ninfo.cookie,
273 user_version,
274 ninfo.notify_id,
275 CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
276 empty,
277 client_gid);
278 ceph::bufferlist reply_bl;
279 {
280 std::vector<std::pair<uint64_t,uint64_t>> missed;
281 missed.reserve(std::size(timedout_watchers));
282 boost::insert(
283 missed, std::begin(missed),
284 timedout_watchers | boost::adaptors::transformed([] (auto w) {
285 return std::make_pair(w->get_watcher_gid(), w->get_cookie());
286 }));
287 ceph::encode(notify_replies, reply_bl);
288 ceph::encode(missed, reply_bl);
289 }
290 reply->set_data(std::move(reply_bl));
291 if (!timedout_watchers.empty()) {
292 reply->return_code = -ETIMEDOUT;
293 }
294 return conn->send(std::move(reply));
295 }
296
297 void Notify::do_notify_timeout()
298 {
299 logger().debug("{} complete={}", __func__, complete);
300 if (complete) {
301 return;
302 }
303 // it might be that `this` is kept alive only because of the reference
304 // a watcher stores and which is being removed by `cancel_notify()`.
305 // to avoid use-after-free we bump up the ref counter with `guard_ptr`.
306 [[maybe_unused]] auto guard_ptr = shared_from_this();
307 for (auto& watcher : watchers) {
308 logger().debug("canceling watcher cookie={} gid={} use_count={}",
309 watcher->get_cookie(),
310 watcher->get_watcher_gid(),
311 watcher->use_count());
312 watcher->cancel_notify(ninfo.notify_id);
313 }
314 std::ignore = send_completion(std::move(watchers));
315 watchers.clear();
316 }
317
318 } // namespace crimson::osd
319
// From fmt 9 on, formatting a type via its ostream `operator<<` has to be
// requested explicitly; do so for WatchTimeoutRequest.
#if FMT_VERSION >= 90000
template <> struct fmt::formatter<crimson::osd::WatchTimeoutRequest> : fmt::ostream_formatter {};
#endif