]> git.proxmox.com Git - ceph.git/blob - ceph/src/test/librados_test_stub/TestWatchNotify.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / librados_test_stub / TestWatchNotify.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "test/librados_test_stub/TestWatchNotify.h"
5 #include "include/Context.h"
6 #include "include/stringify.h"
7 #include "common/Finisher.h"
8 #include "test/librados_test_stub/TestRadosClient.h"
9 #include <boost/bind.hpp>
10 #include <boost/function.hpp>
11 #include "include/assert.h"
12
13 #define dout_subsys ceph_subsys_rados
14 #undef dout_prefix
15 #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": "
16
17 namespace librados {
18
19 std::ostream& operator<<(std::ostream& out,
20 const TestWatchNotify::WatcherID &watcher_id) {
21 out << "(" << watcher_id.first << "," << watcher_id.second << ")";
22 return out;
23 }
24
25 TestWatchNotify::TestWatchNotify()
26 : m_lock("librados::TestWatchNotify::m_lock") {
27 }
28
29 void TestWatchNotify::flush(TestRadosClient *rados_client) {
30 CephContext *cct = rados_client->cct();
31
32 ldout(cct, 20) << "enter" << dendl;
33 // block until we know no additional async notify callbacks will occur
34 Mutex::Locker locker(m_lock);
35 while (m_pending_notifies > 0) {
36 m_file_watcher_cond.Wait(m_lock);
37 }
38 }
39
40 int TestWatchNotify::list_watchers(const std::string& o,
41 std::list<obj_watch_t> *out_watchers) {
42 Mutex::Locker lock(m_lock);
43 SharedWatcher watcher = get_watcher(o);
44
45 out_watchers->clear();
46 for (TestWatchNotify::WatchHandles::iterator it =
47 watcher->watch_handles.begin();
48 it != watcher->watch_handles.end(); ++it) {
49 obj_watch_t obj;
50 strcpy(obj.addr, it->second.addr.c_str());
51 obj.watcher_id = static_cast<int64_t>(it->second.gid);
52 obj.cookie = it->second.handle;
53 obj.timeout_seconds = 30;
54 out_watchers->push_back(obj);
55 }
56 return 0;
57 }
58
59 void TestWatchNotify::aio_flush(TestRadosClient *rados_client,
60 Context *on_finish) {
61 rados_client->get_aio_finisher()->queue(on_finish);
62 }
63
64 void TestWatchNotify::aio_watch(TestRadosClient *rados_client,
65 const std::string& o, uint64_t gid,
66 uint64_t *handle,
67 librados::WatchCtx2 *watch_ctx,
68 Context *on_finish) {
69 int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx);
70 rados_client->get_aio_finisher()->queue(on_finish, r);
71 }
72
73 void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
74 uint64_t handle, Context *on_finish) {
75 unwatch(rados_client, handle);
76 rados_client->get_aio_finisher()->queue(on_finish);
77 }
78
79 void TestWatchNotify::aio_notify(TestRadosClient *rados_client,
80 const std::string& oid, bufferlist& bl,
81 uint64_t timeout_ms, bufferlist *pbl,
82 Context *on_notify) {
83 CephContext *cct = rados_client->cct();
84
85 Mutex::Locker lock(m_lock);
86 ++m_pending_notifies;
87 uint64_t notify_id = ++m_notify_id;
88
89 ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl;
90
91 SharedWatcher watcher = get_watcher(oid);
92
93 SharedNotifyHandle notify_handle(new NotifyHandle());
94 notify_handle->rados_client = rados_client;
95 notify_handle->pbl = pbl;
96 notify_handle->on_notify = on_notify;
97 for (auto &watch_handle_pair : watcher->watch_handles) {
98 WatchHandle &watch_handle = watch_handle_pair.second;
99 notify_handle->pending_watcher_ids.insert(std::make_pair(
100 watch_handle.gid, watch_handle.handle));
101 }
102 watcher->notify_handles[notify_id] = notify_handle;
103
104 FunctionContext *ctx = new FunctionContext(
105 boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl,
106 notify_id));
107 rados_client->get_aio_finisher()->queue(ctx);
108 }
109
110 int TestWatchNotify::notify(TestRadosClient *rados_client,
111 const std::string& oid, bufferlist& bl,
112 uint64_t timeout_ms, bufferlist *pbl) {
113 C_SaferCond cond;
114 aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond);
115 return cond.wait();
116 }
117
118 void TestWatchNotify::notify_ack(TestRadosClient *rados_client,
119 const std::string& o, uint64_t notify_id,
120 uint64_t handle, uint64_t gid,
121 bufferlist& bl) {
122 CephContext *cct = rados_client->cct();
123 ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle
124 << ", gid=" << gid << dendl;
125 Mutex::Locker lock(m_lock);
126 WatcherID watcher_id = std::make_pair(gid, handle);
127 ack_notify(rados_client, o, notify_id, watcher_id, bl);
128 finish_notify(rados_client, o, notify_id);
129 }
130
131 int TestWatchNotify::watch(TestRadosClient *rados_client,
132 const std::string& o, uint64_t gid,
133 uint64_t *handle, librados::WatchCtx *ctx,
134 librados::WatchCtx2 *ctx2) {
135 CephContext *cct = rados_client->cct();
136
137 Mutex::Locker lock(m_lock);
138 SharedWatcher watcher = get_watcher(o);
139
140 WatchHandle watch_handle;
141 watch_handle.rados_client = rados_client;
142 watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce());
143 watch_handle.nonce = rados_client->get_nonce();
144 watch_handle.gid = gid;
145 watch_handle.handle = ++m_handle;
146 watch_handle.watch_ctx = ctx;
147 watch_handle.watch_ctx2 = ctx2;
148 watcher->watch_handles[watch_handle.handle] = watch_handle;
149
150 *handle = watch_handle.handle;
151
152 ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle
153 << dendl;
154 return 0;
155 }
156
157 int TestWatchNotify::unwatch(TestRadosClient *rados_client,
158 uint64_t handle) {
159 CephContext *cct = rados_client->cct();
160
161 ldout(cct, 20) << "handle=" << handle << dendl;
162 Mutex::Locker locker(m_lock);
163 for (FileWatchers::iterator it = m_file_watchers.begin();
164 it != m_file_watchers.end(); ++it) {
165 SharedWatcher watcher = it->second;
166
167 WatchHandles::iterator w_it = watcher->watch_handles.find(handle);
168 if (w_it != watcher->watch_handles.end()) {
169 watcher->watch_handles.erase(w_it);
170 if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
171 m_file_watchers.erase(it);
172 }
173 break;
174 }
175 }
176 return 0;
177 }
178
179 TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher(
180 const std::string& oid) {
181 assert(m_lock.is_locked());
182 SharedWatcher &watcher = m_file_watchers[oid];
183 if (!watcher) {
184 watcher.reset(new Watcher());
185 }
186 return watcher;
187 }
188
189 void TestWatchNotify::execute_notify(TestRadosClient *rados_client,
190 const std::string &oid,
191 bufferlist &bl, uint64_t notify_id) {
192 CephContext *cct = rados_client->cct();
193
194 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
195
196 Mutex::Locker lock(m_lock);
197 SharedWatcher watcher = get_watcher(oid);
198 WatchHandles &watch_handles = watcher->watch_handles;
199
200 NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id);
201 if (n_it == watcher->notify_handles.end()) {
202 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
203 << ": not found" << dendl;
204 return;
205 }
206
207 SharedNotifyHandle notify_handle = n_it->second;
208 WatcherIDs watcher_ids(notify_handle->pending_watcher_ids);
209 for (WatcherIDs::iterator w_id_it = watcher_ids.begin();
210 w_id_it != watcher_ids.end(); ++w_id_it) {
211 WatcherID watcher_id = *w_id_it;
212 WatchHandles::iterator w_it = watch_handles.find(watcher_id.second);
213 if (w_it == watch_handles.end()) {
214 // client disconnected before notification processed
215 notify_handle->pending_watcher_ids.erase(watcher_id);
216 } else {
217 WatchHandle watch_handle = w_it->second;
218 assert(watch_handle.gid == watcher_id.first);
219 assert(watch_handle.handle == watcher_id.second);
220
221 uint64_t notifier_id = rados_client->get_instance_id();
222 watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
223 [this, oid, bl, notify_id, watch_handle, notifier_id](int r) {
224 bufferlist notify_bl;
225 notify_bl.append(bl);
226
227 if (watch_handle.watch_ctx2 != NULL) {
228 watch_handle.watch_ctx2->handle_notify(notify_id,
229 watch_handle.handle,
230 notifier_id, notify_bl);
231 } else if (watch_handle.watch_ctx != NULL) {
232 watch_handle.watch_ctx->notify(0, 0, notify_bl);
233
234 // auto ack old-style watch/notify clients
235 ack_notify(watch_handle.rados_client, oid, notify_id,
236 {watch_handle.gid, watch_handle.handle}, bufferlist());
237 }
238 }));
239 }
240 }
241
242 finish_notify(rados_client, oid, notify_id);
243
244 if (--m_pending_notifies == 0) {
245 m_file_watcher_cond.Signal();
246 }
247 }
248
249 void TestWatchNotify::ack_notify(TestRadosClient *rados_client,
250 const std::string &oid,
251 uint64_t notify_id,
252 const WatcherID &watcher_id,
253 const bufferlist &bl) {
254 CephContext *cct = rados_client->cct();
255
256 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
257 << ", WatcherID=" << watcher_id << dendl;
258
259 assert(m_lock.is_locked());
260 SharedWatcher watcher = get_watcher(oid);
261
262 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
263 if (it == watcher->notify_handles.end()) {
264 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
265 << ", WatcherID=" << watcher_id << ": not found" << dendl;
266 return;
267 }
268
269 bufferlist response;
270 response.append(bl);
271
272 SharedNotifyHandle notify_handle = it->second;
273 notify_handle->notify_responses[watcher_id] = response;
274 notify_handle->pending_watcher_ids.erase(watcher_id);
275 }
276
277 void TestWatchNotify::finish_notify(TestRadosClient *rados_client,
278 const std::string &oid,
279 uint64_t notify_id) {
280 CephContext *cct = rados_client->cct();
281
282 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl;
283
284 assert(m_lock.is_locked());
285 SharedWatcher watcher = get_watcher(oid);
286
287 NotifyHandles::iterator it = watcher->notify_handles.find(notify_id);
288 if (it == watcher->notify_handles.end()) {
289 ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id
290 << ": not found" << dendl;
291 return;
292 }
293
294 SharedNotifyHandle notify_handle = it->second;
295 if (!notify_handle->pending_watcher_ids.empty()) {
296 ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id
297 << ": pending watchers, returning" << dendl;
298 return;
299 }
300
301 ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id
302 << ": completing" << dendl;
303
304 if (notify_handle->pbl != NULL) {
305 ::encode(notify_handle->notify_responses, *notify_handle->pbl);
306 ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl);
307 }
308
309 notify_handle->rados_client->get_aio_finisher()->queue(
310 notify_handle->on_notify, 0);
311 watcher->notify_handles.erase(notify_id);
312 if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
313 m_file_watchers.erase(oid);
314 }
315 }
316
317 void TestWatchNotify::blacklist(uint32_t nonce) {
318 Mutex::Locker locker(m_lock);
319
320 for (auto file_it = m_file_watchers.begin();
321 file_it != m_file_watchers.end(); ) {
322 auto &watcher = file_it->second;
323 for (auto w_it = watcher->watch_handles.begin();
324 w_it != watcher->watch_handles.end();) {
325 if (w_it->second.nonce == nonce) {
326 w_it = watcher->watch_handles.erase(w_it);
327 } else {
328 ++w_it;
329 }
330 }
331 if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) {
332 file_it = m_file_watchers.erase(file_it);
333 } else {
334 ++file_it;
335 }
336 }
337 }
338
339 } // namespace librados