]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/watch.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / watch.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "crimson/osd/watch.h"
5 #include "messages/MWatchNotify.h"
6
7
8 namespace {
9 seastar::logger& logger() {
10 return crimson::get_logger(ceph_subsys_osd);
11 }
12 }
13
14 namespace crimson::osd {
15
16 bool Watch::NotifyCmp::operator()(NotifyRef lhs, NotifyRef rhs) const
17 {
18 ceph_assert(lhs);
19 ceph_assert(rhs);
20 return lhs->get_id() < rhs->get_id();
21 }
22
23 seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
24 {
25 if (this->conn == conn) {
26 logger().debug("conn={} already connected", conn);
27 }
28
29 this->conn = std::move(conn);
30 return seastar::now();
31 }
32
33 seastar::future<> Watch::send_notify_msg(NotifyRef notify)
34 {
35 logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);
36 return conn->send(make_message<MWatchNotify>(
37 winfo.cookie,
38 notify->user_version,
39 notify->ninfo.notify_id,
40 CEPH_WATCH_EVENT_NOTIFY,
41 notify->ninfo.bl,
42 notify->client_gid));
43 }
44
45 seastar::future<> Watch::start_notify(NotifyRef notify)
46 {
47 logger().info("{} adding notify(id={})", __func__, notify->ninfo.notify_id);
48 auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
49 ceph_assert(emplaced);
50 ceph_assert(is_alive());
51 return is_connected() ? send_notify_msg(*it) : seastar::now();
52 }
53
54 seastar::future<> Watch::notify_ack(
55 const uint64_t notify_id,
56 const ceph::bufferlist& reply_bl)
57 {
58 logger().info("{}", __func__);
59 return seastar::do_for_each(in_progress_notifies,
60 [this_shared=shared_from_this(), &reply_bl] (auto notify) {
61 return notify->complete_watcher(this_shared, reply_bl);
62 }
63 ).then([this] {
64 in_progress_notifies.clear();
65 return seastar::now();
66 });
67 }
68
69 seastar::future<> Watch::send_disconnect_msg()
70 {
71 if (!is_connected()) {
72 return seastar::now();
73 }
74 ceph::bufferlist empty;
75 return conn->send(make_message<MWatchNotify>(
76 winfo.cookie,
77 0,
78 0,
79 CEPH_WATCH_EVENT_DISCONNECT,
80 empty));
81 }
82
83 void Watch::discard_state()
84 {
85 ceph_assert(obc);
86 in_progress_notifies.clear();
87 }
88
89 seastar::future<> Watch::remove(const bool send_disconnect)
90 {
91 logger().info("{}", __func__);
92 auto disconnected = send_disconnect ? send_disconnect_msg()
93 : seastar::now();
94 return std::move(disconnected).then([this] {
95 return seastar::do_for_each(in_progress_notifies,
96 [this_shared=shared_from_this()] (auto notify) {
97 return notify->remove_watcher(this_shared);
98 }).then([this] {
99 discard_state();
100 return seastar::now();
101 });
102 });
103 }
104
105 bool notify_reply_t::operator<(const notify_reply_t& rhs) const
106 {
107 // comparing std::pairs to emphasize our legacy. ceph-osd stores
108 // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
109 // unfortunately, what seems to be an implementation detail, got
110 // exposed as part of our public API (the `reply_buffer` parameter
111 // of the `rados_notify` family).
112 const auto lhsp = std::make_pair(watcher_gid, watcher_cookie);
113 const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie);
114 return lhsp < rhsp;
115 }
116
117 seastar::future<> Notify::remove_watcher(WatchRef watch)
118 {
119 if (discarded || complete) {
120 return seastar::now();
121 }
122 [[maybe_unused]] const auto num_removed = watchers.erase(watch);
123 assert(num_removed > 0);
124 return maybe_send_completion();
125 }
126
127
128 seastar::future<> Notify::complete_watcher(
129 WatchRef watch,
130 const ceph::bufferlist& reply_bl)
131 {
132 if (discarded || complete) {
133 return seastar::now();
134 }
135 notify_replies.emplace(notify_reply_t{
136 watch->get_watcher_gid(),
137 watch->get_cookie(),
138 reply_bl});
139 return remove_watcher(std::move(watch));
140 }
141
142 seastar::future<> Notify::maybe_send_completion()
143 {
144 logger().info("{} -- {} in progress watchers", __func__, watchers.size());
145 if (watchers.empty()) {
146 // prepare reply
147 ceph::bufferlist bl;
148 encode(notify_replies, bl);
149 // FIXME: this is just a stub
150 std::list<std::pair<uint64_t,uint64_t>> missed;
151 encode(missed, bl);
152
153 complete = true;
154
155 ceph::bufferlist empty;
156 auto reply = make_message<MWatchNotify>(
157 ninfo.cookie,
158 user_version,
159 ninfo.notify_id,
160 CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
161 empty,
162 client_gid);
163 reply->set_data(bl);
164 return conn->send(std::move(reply));
165 }
166 return seastar::now();
167 }
168
169 } // namespace crimson::osd