1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <boost/range/adaptor/transformed.hpp>
7 #include <boost/range/algorithm_ext/insert.hpp>
9 #include "crimson/osd/watch.h"
10 #include "crimson/osd/osd_operations/internal_client_request.h"
12 #include "messages/MWatchNotify.h"
// accessor for the logger that crimson::get_logger() hands out for the
// ceph_subsys_osd subsystem; the log statements below go through it.
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
21 namespace crimson::osd
{
23 // a watcher can remove itself if it has not seen a notification after a period of time.
24 // in that case, we need to drop it also from the persisted `ObjectState` instance.
25 // this operation resembles a bit the `_UNWATCH` subop.
//
// the request is an InternalClientRequest; it supplies the target object,
// the do_osd_ops parameters and the list of ops through the three `final`
// overrides declared below (their definitions follow the class).
26 class WatchTimeoutRequest final
: public InternalClientRequest
{
// both references are taken by value and moved: `pg` goes to the
// InternalClientRequest base, `watch` is kept in the `watch` member.
28 WatchTimeoutRequest(WatchRef watch
, Ref
<PG
> pg
)
29 : InternalClientRequest(std::move(pg
)),
30 watch(std::move(watch
)) {
// oid reported by the watch's `obc`.
33 const hobject_t
& get_target_oid() const final
;
// parameters assembled from the watch and the PG.
34 PG::do_osd_ops_params_t
get_do_osd_ops_params() const final
;
// the single WATCH/UNWATCH op to run.
35 std::vector
<OSDOp
> create_osd_ops() final
;
// returns the oid obtained from the watch's `obc` via get_oid().
// `watch->obc` is dereferenced directly in the visible line.
// NOTE(review): the embedded numbering skips lines here, so a preceding
// check may exist that this excerpt does not show.
41 const hobject_t
& WatchTimeoutRequest::get_target_oid() const
44 return watch
->obc
->get_oid();
// assembles the `PG::do_osd_ops_params_t` used to execute this request.
// in the visible lines: `reqid.name` is set to the watch's entity name, and
// the initializer receives the PG's osdmap epoch plus an `entity_inst_t`
// built from the watch's entity name and `winfo.addr`. the resulting
// `params.reqid` is logged at debug level.
// NOTE(review): the declaration of `reqid`, several initializer entries and
// the return statement are not visible in this excerpt.
47 PG::do_osd_ops_params_t
48 WatchTimeoutRequest::get_do_osd_ops_params() const
51 reqid
.name
= watch
->entity_name
;
52 PG::do_osd_ops_params_t params
{
56 get_pg().get_osdmap_epoch(),
57 entity_inst_t
{ watch
->entity_name
, watch
->winfo
.addr
},
60 logger().debug("{}: params.reqid={}", __func__
, params
.reqid
);
// returns a one-element vector holding the op to execute: a
// CEPH_OSD_OP_WATCH whose watch sub-op is CEPH_OSD_WATCH_OP_UNWATCH and
// whose cookie is copied from the watch's `winfo.cookie`.
// NOTE(review): the declaration of `osd_op` is not visible in this excerpt.
64 std::vector
<OSDOp
> WatchTimeoutRequest::create_osd_ops()
66 logger().debug("{}", __func__
);
69 osd_op
.op
.op
= CEPH_OSD_OP_WATCH
;
71 osd_op
.op
.watch
.op
= CEPH_OSD_WATCH_OP_UNWATCH
;
72 osd_op
.op
.watch
.cookie
= watch
->winfo
.cookie
;
73 return std::vector
{std::move(osd_op
)};
// debug trace with the watcher gid and cookie.
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt -- confirm which member this statement belongs to.
78 logger().debug("{} gid={} cookie={}", __func__
, get_watcher_gid(), get_cookie());
// makes `conn` the connection of this watch.
// when `conn` already is the stored connection only a debug message is
// emitted. otherwise the timeout timer is restarted with
// `winfo.timeout_seconds` and `conn` is moved into `this->conn`.
// the unnamed bool parameter is not used. both paths return seastar::now().
81 seastar::future
<> Watch::connect(crimson::net::ConnectionRef conn
, bool)
83 if (this->conn
== conn
) {
84 logger().debug("conn={} already connected", conn
);
85 return seastar::now();
// cancel() + arm() restarts the timer with a relative timeout.
87 timeout_timer
.cancel();
88 timeout_timer
.arm(std::chrono::seconds
{winfo
.timeout_seconds
});
89 this->conn
= std::move(conn
);
90 return seastar::now();
// restarts the timeout timer with `winfo.timeout_seconds`.
// NOTE(review): the embedded numbering skips a line between the signature
// and the timer handling, so part of the body is not visible here.
93 void Watch::disconnect()
96 timeout_timer
.cancel();
97 timeout_timer
.arm(std::chrono::seconds
{winfo
.timeout_seconds
});
// sends an MWatchNotify with CEPH_WATCH_EVENT_NOTIFY for `notify` over
// `conn` and returns the future produced by that send. the visible
// arguments are the notify's user_version, notify_id and client_gid.
// `conn` is dereferenced without a check here; start_notify() only calls
// this when is_connected() holds.
// NOTE(review): some message arguments are not visible in this excerpt.
100 seastar::future
<> Watch::send_notify_msg(NotifyRef notify
)
102 logger().info("{} for notify(id={})", __func__
, notify
->ninfo
.notify_id
);
103 return conn
->send(crimson::make_message
<MWatchNotify
>(
105 notify
->user_version
,
106 notify
->ninfo
.notify_id
,
107 CEPH_WATCH_EVENT_NOTIFY
,
109 notify
->client_gid
));
// registers `notify` in `in_progress_notifies` and, when the watch is
// connected, forwards it through send_notify_msg(); otherwise returns
// seastar::now(). asserts that the notify was not registered before and
// that the watch is_alive().
112 seastar::future
<> Watch::start_notify(NotifyRef notify
)
114 logger().info("{} adding notify(id={})", __func__
, notify
->ninfo
.notify_id
);
// `notify` is moved into the container; from here on use `*it`.
115 auto [ it
, emplaced
] = in_progress_notifies
.emplace(std::move(notify
));
116 ceph_assert(emplaced
);
117 ceph_assert(is_alive());
118 return is_connected() ? send_notify_msg(*it
) : seastar::now();
// walks `in_progress_notifies` and calls complete_watcher() on every entry,
// passing a shared reference to this watch and a copy of `reply_bl` (both
// captured by the lambda). the visible lines also clear
// `in_progress_notifies`.
// NOTE(review): `notify_id` is not referenced in the visible lines, i.e. the
// ack does not look restricted to a single notify -- confirm this is
// intended.
// NOTE(review): the line joining the loop with the clear() is not visible in
// this excerpt.
121 seastar::future
<> Watch::notify_ack(
122 const uint64_t notify_id
,
123 const ceph::bufferlist
& reply_bl
)
125 logger().info("{}", __func__
);
126 return seastar::do_for_each(in_progress_notifies
,
127 [this_shared
=shared_from_this(), reply_bl
] (auto notify
) {
128 return notify
->complete_watcher(this_shared
, reply_bl
);
131 in_progress_notifies
.clear();
132 return seastar::now();
// sends an MWatchNotify with CEPH_WATCH_EVENT_DISCONNECT over `conn`.
// returns seastar::now() without sending anything when the watch is not
// connected.
// NOTE(review): most message arguments are not visible in this excerpt;
// `empty` is presumably passed as the payload -- confirm.
136 seastar::future
<> Watch::send_disconnect_msg()
138 if (!is_connected()) {
139 return seastar::now();
141 ceph::bufferlist empty
;
142 return conn
->send(crimson::make_message
<MWatchNotify
>(
146 CEPH_WATCH_EVENT_DISCONNECT
,
// drops the in-progress notifies and cancels the timeout timer.
// NOTE(review): the embedded numbering skips lines around these statements,
// so further steps may exist that this excerpt does not show.
150 void Watch::discard_state()
153 in_progress_notifies
.clear();
154 timeout_timer
.cancel();
// restarts the timeout timer with `winfo.timeout_seconds`, but only while
// the watch is connected. the utime_t argument is unnamed and unused.
157 void Watch::got_ping(utime_t
)
159 if (is_connected()) {
160 // using cancel() + arm() as rearm() has no overload for time delta.
161 timeout_timer
.cancel();
162 timeout_timer
.arm(std::chrono::seconds
{winfo
.timeout_seconds
});
// calls remove_watcher() for this watch on every entry of
// `in_progress_notifies`; the lambda captures a shared reference to this
// watch and passes it along.
// NOTE(review): the continuation that follows the loop is only partially
// visible in this excerpt.
166 seastar::future
<> Watch::remove()
168 logger().info("{}", __func__
);
169 // in contrast to ceph-osd crimson sends CEPH_WATCH_EVENT_DISCONNECT directly
170 // from the timeout handler and _after_ CEPH_WATCH_EVENT_NOTIFY_COMPLETE.
171 // this simplifies the Watch::remove() interface as callers aren't obliged
172 // anymore to decide whether EVENT_DISCONNECT needs to be sent or not -- it
173 // becomes an implementation detail of Watch.
174 return seastar::do_for_each(in_progress_notifies
,
175 [this_shared
=shared_from_this()] (auto notify
) {
176 return notify
->remove_watcher(this_shared
);
179 return seastar::now();
// erases the notify identified by `notify_id` from `in_progress_notifies`.
// the entry is expected to exist; this is checked with plain assert().
// NOTE(review): other checks in this file use ceph_assert(); plain assert()
// is compiled out under NDEBUG, in which case erase() would be handed the
// end iterator -- confirm the choice is deliberate.
183 void Watch::cancel_notify(const uint64_t notify_id
)
185 logger().info("{} notify_id={}", __func__
, notify_id
);
186 const auto it
= in_progress_notifies
.find(notify_id
);
187 assert(it
!= std::end(in_progress_notifies
));
188 in_progress_notifies
.erase(it
);
// starts a WatchTimeoutRequest for this watch through the PG's shard
// services and, once the operation's future resolves, calls
// send_disconnect_msg(). the resulting future is dropped via std::ignore.
// the continuation takes ownership of `op` and captures `this` as a raw
// pointer.
// NOTE(review): presumably `op` keeps this watch referenced (the request
// stores the WatchRef it was constructed with) -- confirm the lifetime.
191 void Watch::do_watch_timeout()
194 auto [op
, fut
] = pg
->get_shard_services().start_operation
<WatchTimeoutRequest
>(
195 shared_from_this(), pg
);
196 std::ignore
= std::move(fut
).then([op
=std::move(op
), this] {
197 return send_disconnect_msg();
// ordering for notify_reply_t. builds a (watcher_gid, watcher_cookie) pair
// for each side.
// NOTE(review): the return statement is not visible in this excerpt;
// presumably the two pairs are compared -- confirm.
201 bool notify_reply_t::operator<(const notify_reply_t
& rhs
) const
203 // comparing std::pairs to emphasize our legacy. ceph-osd stores
204 // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
205 // unfortunately, what seems to be an implementation detail, got
206 // exposed as part of our public API (the `reply_buffer` parameter
207 // of the `rados_notify` family).
208 const auto lhsp
= std::make_pair(watcher_gid
, watcher_cookie
);
209 const auto rhsp
= std::make_pair(rhs
.watcher_gid
, rhs
.watcher_cookie
);
// writes `rhs` to `out` as
// "notify_reply_t{watcher_gid=..., watcher_cookie=..., bl=...}".
// NOTE(review): the return statement is not visible in this excerpt.
213 std::ostream
&operator<<(std::ostream
&out
, const notify_reply_t
&rhs
)
215 out
<< "notify_reply_t{watcher_gid=" << rhs
.watcher_gid
216 << ", watcher_cookie=" << rhs
.watcher_cookie
217 << ", bl=" << rhs
.bl
<< "}";
// constructor. `conn` is moved into the member of the same name;
// `client_gid` and `user_version` are copied into their members.
// NOTE(review): the initializer consuming `ninfo` and the constructor body
// are not visible in this excerpt.
221 Notify::Notify(crimson::net::ConnectionRef conn
,
222 const notify_info_t
& ninfo
,
223 const uint64_t client_gid
,
224 const uint64_t user_version
)
226 conn(std::move(conn
)),
227 client_gid(client_gid
),
228 user_version(user_version
)
// detaches `watch` from this notify.
// does nothing (seastar::now()) once the notify is `discarded` or
// `complete`. otherwise the watcher is erased from `watchers`; when that
// leaves the set empty the timeout timer is cancelled and the result of
// send_completion() is returned.
231 seastar::future
<> Notify::remove_watcher(WatchRef watch
)
233 if (discarded
|| complete
) {
234 return seastar::now();
// the watcher is expected to be registered; checked with assert() only.
236 [[maybe_unused
]] const auto num_removed
= watchers
.erase(watch
);
237 assert(num_removed
> 0);
238 if (watchers
.empty()) {
240 [[maybe_unused
]] bool was_armed
= timeout_timer
.cancel();
242 return send_completion();
244 return seastar::now();
// records the reply of a watcher and then detaches it via remove_watcher().
// does nothing (seastar::now()) once the notify is `discarded` or
// `complete`. the entry added to `notify_replies` starts with the watcher's
// gid.
// NOTE(review): the `watch` parameter declaration and the remaining fields
// of the notify_reply_t are not visible in this excerpt.
249 seastar::future
<> Notify::complete_watcher(
251 const ceph::bufferlist
& reply_bl
)
253 if (discarded
|| complete
) {
254 return seastar::now();
256 notify_replies
.emplace(notify_reply_t
{
257 watch
->get_watcher_gid(),
260 return remove_watcher(std::move(watch
));
// builds the MWatchNotify carrying CEPH_WATCH_EVENT_NOTIFY_COMPLETE and
// sends it over `conn`, returning the future of that send.
// the payload is `notify_replies` followed by the (gid, cookie) pairs of
// `timedout_watchers`, encoded in that order. `return_code` is set to
// -ETIMEDOUT when `timedout_watchers` is not empty.
// NOTE(review): most arguments of the message constructor and the call that
// fills `missed` are only partially visible in this excerpt.
263 seastar::future
<> Notify::send_completion(
264 std::set
<WatchRef
> timedout_watchers
)
266 logger().info("{} -- {} in progress watchers, timedout watchers {}",
267 __func__
, watchers
.size(), timedout_watchers
.size());
268 logger().debug("{} sending notify replies: {}", __func__
, notify_replies
);
270 ceph::bufferlist empty
;
271 auto reply
= crimson::make_message
<MWatchNotify
>(
275 CEPH_WATCH_EVENT_NOTIFY_COMPLETE
,
278 ceph::bufferlist reply_bl
;
// `missed` collects one (gid, cookie) pair per timed out watcher.
280 std::vector
<std::pair
<uint64_t,uint64_t>> missed
;
281 missed
.reserve(std::size(timedout_watchers
));
283 missed
, std::begin(missed
),
284 timedout_watchers
| boost::adaptors::transformed([] (auto w
) {
285 return std::make_pair(w
->get_watcher_gid(), w
->get_cookie());
// encoding order matters: replies first, then the missed watchers.
287 ceph::encode(notify_replies
, reply_bl
);
288 ceph::encode(missed
, reply_bl
);
290 reply
->set_data(std::move(reply_bl
));
291 if (!timedout_watchers
.empty()) {
292 reply
->return_code
= -ETIMEDOUT
;
294 return conn
->send(std::move(reply
));
// timeout handling for the notify: every watcher still in `watchers` gets
// cancel_notify() for this notify's id, then send_completion() is called
// with `watchers` moved in as the timed out set. the future returned by
// send_completion() is dropped via std::ignore.
// NOTE(review): the embedded numbering skips lines after the first debug
// statement, so an early exit may exist that this excerpt does not show.
297 void Notify::do_notify_timeout()
299 logger().debug("{} complete={}", __func__
, complete
);
303 // it might be that `this` is kept alive only because of the reference
304 // a watcher stores and which is being removed by `cancel_notify()`.
305 // to avoid use-after-free we bump up the ref counter with `guard_ptr`.
306 [[maybe_unused
]] auto guard_ptr
= shared_from_this();
307 for (auto& watcher
: watchers
) {
308 logger().debug("canceling watcher cookie={} gid={} use_count={}",
309 watcher
->get_cookie(),
310 watcher
->get_watcher_gid(),
311 watcher
->use_count());
312 watcher
->cancel_notify(ninfo
.notify_id
);
314 std::ignore
= send_completion(std::move(watchers
));
318 } // namespace crimson::osd
// for FMT_VERSION >= 90000 WatchTimeoutRequest gets an explicit
// fmt::formatter specialization derived from fmt::ostream_formatter.
// NOTE(review): the matching #endif is not visible in this excerpt.
320 #if FMT_VERSION >= 90000
321 template <> struct fmt::formatter
<crimson::osd::WatchTimeoutRequest
> : fmt::ostream_formatter
{};