1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "crimson/osd/watch.h"
5 #include "messages/MWatchNotify.h"
// File-local logging helper: returns the seastar::logger that
// crimson::get_logger() hands out for the ceph_subsys_osd subsystem.
// The log statements further down in this file call it.
9 seastar::logger
& logger() {
10 return crimson::get_logger(ceph_subsys_osd
);
14 namespace crimson::osd
{
// Ordering predicate over notify references: lhs sorts before rhs when
// lhs->get_id() is smaller than rhs->get_id().
// NOTE(review): the statements between the signature and the return are not
// visible in this view -- confirm whether the references are null-checked.
16 bool Watch::NotifyCmp::operator()(NotifyRef lhs
, NotifyRef rhs
) const
20 return lhs
->get_id() < rhs
->get_id();
23 seastar::future
<> Watch::connect(crimson::net::ConnectionRef conn
, bool)
25 if (this->conn
== conn
) {
26 logger().debug("conn={} already connected", conn
);
29 this->conn
= std::move(conn
);
30 return seastar::now();
// Logs the notify id at info level, then builds an MWatchNotify message that
// carries notify->ninfo.notify_id together with CEPH_WATCH_EVENT_NOTIFY and
// hands it to conn->send(); the future returned by send() is returned.
// NOTE(review): the remaining constructor arguments of the message are not
// visible in this view. `conn` is dereferenced without a connectivity check
// here; the visible caller (start_notify) tests is_connected() first.
33 seastar::future
<> Watch::send_notify_msg(NotifyRef notify
)
35 logger().info("{} for notify(id={})", __func__
, notify
->ninfo
.notify_id
);
36 return conn
->send(make_message
<MWatchNotify
>(
39 notify
->ninfo
.notify_id
,
40 CEPH_WATCH_EVENT_NOTIFY
,
45 seastar::future
<> Watch::start_notify(NotifyRef notify
)
47 logger().info("{} adding notify(id={})", __func__
, notify
->ninfo
.notify_id
);
48 auto [ it
, emplaced
] = in_progress_notifies
.emplace(std::move(notify
));
49 ceph_assert(emplaced
);
50 ceph_assert(is_alive());
51 return is_connected() ? send_notify_msg(*it
) : seastar::now();
// Handles an acknowledgement from the watcher: walks in_progress_notifies
// with seastar::do_for_each and calls complete_watcher(this watch, reply_bl)
// on each entry; the visible tail clears in_progress_notifies and returns a
// ready future. The lambda keeps the watch alive through the
// shared_from_this() copy, while reply_bl is captured by reference.
// NOTE(review): `notify_id` is not referenced anywhere in the visible lines,
// so every in-progress notify is completed rather than only the acknowledged
// one -- confirm whether that is intended.
// NOTE(review): the lines joining do_for_each() to the clear() (presumably a
// continuation) are not visible in this view.
54 seastar::future
<> Watch::notify_ack(
55 const uint64_t notify_id
,
56 const ceph::bufferlist
& reply_bl
)
58 logger().info("{}", __func__
);
59 return seastar::do_for_each(in_progress_notifies
,
60 [this_shared
=shared_from_this(), &reply_bl
] (auto notify
) {
61 return notify
->complete_watcher(this_shared
, reply_bl
);
64 in_progress_notifies
.clear();
65 return seastar::now();
// Sends an MWatchNotify message with CEPH_WATCH_EVENT_DISCONNECT over `conn`.
// Returns a ready future without sending anything when is_connected() is
// false. A local empty bufferlist is declared before the message is built.
// NOTE(review): the other constructor arguments (presumably including
// `empty`) are not visible in this view.
69 seastar::future
<> Watch::send_disconnect_msg()
71 if (!is_connected()) {
72 return seastar::now();
74 ceph::bufferlist empty
;
75 return conn
->send(make_message
<MWatchNotify
>(
79 CEPH_WATCH_EVENT_DISCONNECT
,
// Forgets every notify this watch was tracking by clearing
// in_progress_notifies.
// NOTE(review): one statement ahead of the clear() is not visible in this
// view.
83 void Watch::discard_state()
86 in_progress_notifies
.clear();
// Tears the watch down. When `send_disconnect` is true a disconnect message
// is sent first via send_disconnect_msg(); once that future resolves, every
// entry of in_progress_notifies gets remove_watcher(this watch) called on it
// through seastar::do_for_each. The inner lambda holds a shared_from_this()
// copy; the outer continuation captures raw `this`.
// NOTE(review): the else-branch of the ternary and the continuation that
// follows do_for_each() are not visible in this view.
89 seastar::future
<> Watch::remove(const bool send_disconnect
)
91 logger().info("{}", __func__
);
92 auto disconnected
= send_disconnect
? send_disconnect_msg()
94 return std::move(disconnected
).then([this] {
95 return seastar::do_for_each(in_progress_notifies
,
96 [this_shared
=shared_from_this()] (auto notify
) {
97 return notify
->remove_watcher(this_shared
);
100 return seastar::now();
// Ordering for notify replies. Builds a (watcher_gid, watcher_cookie) pair
// for this object and for rhs.
// NOTE(review): the return statement is not visible in this view; presumably
// the two pairs are compared -- confirm.
105 bool notify_reply_t::operator<(const notify_reply_t
& rhs
) const
107 // comparing std::pairs to emphasize our legacy. ceph-osd stores
108 // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
109 // unfortunately, what seems to be an implementation detail, got
110 // exposed as part of our public API (the `reply_buffer` parameter
111 // of the `rados_notify` family).
112 const auto lhsp
= std::make_pair(watcher_gid
, watcher_cookie
);
113 const auto rhsp
= std::make_pair(rhs
.watcher_gid
, rhs
.watcher_cookie
);
117 seastar::future
<> Notify::remove_watcher(WatchRef watch
)
119 if (discarded
|| complete
) {
120 return seastar::now();
122 [[maybe_unused
]] const auto num_removed
= watchers
.erase(watch
);
123 assert(num_removed
> 0);
124 return maybe_send_completion();
// Records a watcher's reply and removes that watcher from the notify.
// Returns a ready future immediately when the notify is already discarded or
// complete. Otherwise a notify_reply_t, whose first field is
// watch->get_watcher_gid(), is emplaced into notify_replies and the watch is
// passed on to remove_watcher().
// NOTE(review): the first parameter (`watch`) and the remaining
// notify_reply_t fields are not visible in this view.
128 seastar::future
<> Notify::complete_watcher(
130 const ceph::bufferlist
& reply_bl
)
132 if (discarded
|| complete
) {
133 return seastar::now();
135 notify_replies
.emplace(notify_reply_t
{
136 watch
->get_watcher_gid(),
139 return remove_watcher(std::move(watch
));
// Finishes the notify once no watchers remain. Logs the number of watchers
// still in progress; when `watchers` is empty it encodes notify_replies into
// `bl`, declares a `missed` list (a stub, per the FIXME below), builds an
// MWatchNotify with CEPH_WATCH_EVENT_NOTIFY_COMPLETE and sends it over
// `conn`. With watchers still pending it returns a ready future.
// NOTE(review): several statements (the declaration of `bl`, the other
// message constructor arguments, and whatever happens between building and
// sending the reply) are not visible in this view.
142 seastar::future
<> Notify::maybe_send_completion()
144 logger().info("{} -- {} in progress watchers", __func__
, watchers
.size());
145 if (watchers
.empty()) {
148 encode(notify_replies
, bl
);
149 // FIXME: this is just a stub
150 std::list
<std::pair
<uint64_t,uint64_t>> missed
;
155 ceph::bufferlist empty
;
156 auto reply
= make_message
<MWatchNotify
>(
160 CEPH_WATCH_EVENT_NOTIFY_COMPLETE
,
164 return conn
->send(std::move(reply
));
166 return seastar::now();
169 } // namespace crimson::osd