]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "test/librados_test_stub/TestWatchNotify.h" | |
5 | #include "include/Context.h" | |
6 | #include "include/stringify.h" | |
7 | #include "common/Finisher.h" | |
8 | #include "test/librados_test_stub/TestRadosClient.h" | |
9 | #include <boost/bind.hpp> | |
10 | #include <boost/function.hpp> | |
11 | #include "include/assert.h" | |
12 | ||
13 | #define dout_subsys ceph_subsys_rados | |
14 | #undef dout_prefix | |
15 | #define dout_prefix *_dout << "TestWatchNotify::" << __func__ << ": " | |
16 | ||
17 | namespace librados { | |
18 | ||
19 | std::ostream& operator<<(std::ostream& out, | |
20 | const TestWatchNotify::WatcherID &watcher_id) { | |
21 | out << "(" << watcher_id.first << "," << watcher_id.second << ")"; | |
22 | return out; | |
23 | } | |
24 | ||
25 | TestWatchNotify::TestWatchNotify() | |
26 | : m_lock("librados::TestWatchNotify::m_lock") { | |
27 | } | |
28 | ||
29 | void TestWatchNotify::flush(TestRadosClient *rados_client) { | |
30 | CephContext *cct = rados_client->cct(); | |
31 | ||
32 | ldout(cct, 20) << "enter" << dendl; | |
33 | // block until we know no additional async notify callbacks will occur | |
34 | Mutex::Locker locker(m_lock); | |
35 | while (m_pending_notifies > 0) { | |
36 | m_file_watcher_cond.Wait(m_lock); | |
37 | } | |
38 | } | |
39 | ||
40 | int TestWatchNotify::list_watchers(const std::string& o, | |
41 | std::list<obj_watch_t> *out_watchers) { | |
42 | Mutex::Locker lock(m_lock); | |
43 | SharedWatcher watcher = get_watcher(o); | |
44 | ||
45 | out_watchers->clear(); | |
46 | for (TestWatchNotify::WatchHandles::iterator it = | |
47 | watcher->watch_handles.begin(); | |
48 | it != watcher->watch_handles.end(); ++it) { | |
49 | obj_watch_t obj; | |
50 | strcpy(obj.addr, it->second.addr.c_str()); | |
51 | obj.watcher_id = static_cast<int64_t>(it->second.gid); | |
52 | obj.cookie = it->second.handle; | |
53 | obj.timeout_seconds = 30; | |
54 | out_watchers->push_back(obj); | |
55 | } | |
56 | return 0; | |
57 | } | |
58 | ||
59 | void TestWatchNotify::aio_flush(TestRadosClient *rados_client, | |
60 | Context *on_finish) { | |
61 | rados_client->get_aio_finisher()->queue(on_finish); | |
62 | } | |
63 | ||
64 | void TestWatchNotify::aio_watch(TestRadosClient *rados_client, | |
65 | const std::string& o, uint64_t gid, | |
66 | uint64_t *handle, | |
67 | librados::WatchCtx2 *watch_ctx, | |
68 | Context *on_finish) { | |
69 | int r = watch(rados_client, o, gid, handle, nullptr, watch_ctx); | |
70 | rados_client->get_aio_finisher()->queue(on_finish, r); | |
71 | } | |
72 | ||
73 | void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client, | |
74 | uint64_t handle, Context *on_finish) { | |
75 | unwatch(rados_client, handle); | |
76 | rados_client->get_aio_finisher()->queue(on_finish); | |
77 | } | |
78 | ||
79 | void TestWatchNotify::aio_notify(TestRadosClient *rados_client, | |
80 | const std::string& oid, bufferlist& bl, | |
81 | uint64_t timeout_ms, bufferlist *pbl, | |
82 | Context *on_notify) { | |
83 | CephContext *cct = rados_client->cct(); | |
84 | ||
85 | Mutex::Locker lock(m_lock); | |
86 | ++m_pending_notifies; | |
87 | uint64_t notify_id = ++m_notify_id; | |
88 | ||
89 | ldout(cct, 20) << "oid=" << oid << ": notify_id=" << notify_id << dendl; | |
90 | ||
91 | SharedWatcher watcher = get_watcher(oid); | |
92 | ||
93 | SharedNotifyHandle notify_handle(new NotifyHandle()); | |
94 | notify_handle->rados_client = rados_client; | |
95 | notify_handle->pbl = pbl; | |
96 | notify_handle->on_notify = on_notify; | |
97 | for (auto &watch_handle_pair : watcher->watch_handles) { | |
98 | WatchHandle &watch_handle = watch_handle_pair.second; | |
99 | notify_handle->pending_watcher_ids.insert(std::make_pair( | |
100 | watch_handle.gid, watch_handle.handle)); | |
101 | } | |
102 | watcher->notify_handles[notify_id] = notify_handle; | |
103 | ||
104 | FunctionContext *ctx = new FunctionContext( | |
105 | boost::bind(&TestWatchNotify::execute_notify, this, rados_client, oid, bl, | |
106 | notify_id)); | |
107 | rados_client->get_aio_finisher()->queue(ctx); | |
108 | } | |
109 | ||
110 | int TestWatchNotify::notify(TestRadosClient *rados_client, | |
111 | const std::string& oid, bufferlist& bl, | |
112 | uint64_t timeout_ms, bufferlist *pbl) { | |
113 | C_SaferCond cond; | |
114 | aio_notify(rados_client, oid, bl, timeout_ms, pbl, &cond); | |
115 | return cond.wait(); | |
116 | } | |
117 | ||
118 | void TestWatchNotify::notify_ack(TestRadosClient *rados_client, | |
119 | const std::string& o, uint64_t notify_id, | |
120 | uint64_t handle, uint64_t gid, | |
121 | bufferlist& bl) { | |
122 | CephContext *cct = rados_client->cct(); | |
123 | ldout(cct, 20) << "notify_id=" << notify_id << ", handle=" << handle | |
124 | << ", gid=" << gid << dendl; | |
125 | Mutex::Locker lock(m_lock); | |
126 | WatcherID watcher_id = std::make_pair(gid, handle); | |
127 | ack_notify(rados_client, o, notify_id, watcher_id, bl); | |
128 | finish_notify(rados_client, o, notify_id); | |
129 | } | |
130 | ||
131 | int TestWatchNotify::watch(TestRadosClient *rados_client, | |
132 | const std::string& o, uint64_t gid, | |
133 | uint64_t *handle, librados::WatchCtx *ctx, | |
134 | librados::WatchCtx2 *ctx2) { | |
135 | CephContext *cct = rados_client->cct(); | |
136 | ||
137 | Mutex::Locker lock(m_lock); | |
138 | SharedWatcher watcher = get_watcher(o); | |
139 | ||
140 | WatchHandle watch_handle; | |
141 | watch_handle.rados_client = rados_client; | |
142 | watch_handle.addr = "127.0.0.1:0/" + stringify(rados_client->get_nonce()); | |
143 | watch_handle.nonce = rados_client->get_nonce(); | |
144 | watch_handle.gid = gid; | |
145 | watch_handle.handle = ++m_handle; | |
146 | watch_handle.watch_ctx = ctx; | |
147 | watch_handle.watch_ctx2 = ctx2; | |
148 | watcher->watch_handles[watch_handle.handle] = watch_handle; | |
149 | ||
150 | *handle = watch_handle.handle; | |
151 | ||
152 | ldout(cct, 20) << "oid=" << o << ", gid=" << gid << ": handle=" << *handle | |
153 | << dendl; | |
154 | return 0; | |
155 | } | |
156 | ||
157 | int TestWatchNotify::unwatch(TestRadosClient *rados_client, | |
158 | uint64_t handle) { | |
159 | CephContext *cct = rados_client->cct(); | |
160 | ||
161 | ldout(cct, 20) << "handle=" << handle << dendl; | |
162 | Mutex::Locker locker(m_lock); | |
163 | for (FileWatchers::iterator it = m_file_watchers.begin(); | |
164 | it != m_file_watchers.end(); ++it) { | |
165 | SharedWatcher watcher = it->second; | |
166 | ||
167 | WatchHandles::iterator w_it = watcher->watch_handles.find(handle); | |
168 | if (w_it != watcher->watch_handles.end()) { | |
169 | watcher->watch_handles.erase(w_it); | |
170 | if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { | |
171 | m_file_watchers.erase(it); | |
172 | } | |
173 | break; | |
174 | } | |
175 | } | |
176 | return 0; | |
177 | } | |
178 | ||
179 | TestWatchNotify::SharedWatcher TestWatchNotify::get_watcher( | |
180 | const std::string& oid) { | |
181 | assert(m_lock.is_locked()); | |
182 | SharedWatcher &watcher = m_file_watchers[oid]; | |
183 | if (!watcher) { | |
184 | watcher.reset(new Watcher()); | |
185 | } | |
186 | return watcher; | |
187 | } | |
188 | ||
189 | void TestWatchNotify::execute_notify(TestRadosClient *rados_client, | |
190 | const std::string &oid, | |
191 | bufferlist &bl, uint64_t notify_id) { | |
192 | CephContext *cct = rados_client->cct(); | |
193 | ||
194 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; | |
195 | ||
196 | Mutex::Locker lock(m_lock); | |
197 | SharedWatcher watcher = get_watcher(oid); | |
198 | WatchHandles &watch_handles = watcher->watch_handles; | |
199 | ||
200 | NotifyHandles::iterator n_it = watcher->notify_handles.find(notify_id); | |
201 | if (n_it == watcher->notify_handles.end()) { | |
202 | ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id | |
203 | << ": not found" << dendl; | |
204 | return; | |
205 | } | |
206 | ||
207 | SharedNotifyHandle notify_handle = n_it->second; | |
208 | WatcherIDs watcher_ids(notify_handle->pending_watcher_ids); | |
209 | for (WatcherIDs::iterator w_id_it = watcher_ids.begin(); | |
210 | w_id_it != watcher_ids.end(); ++w_id_it) { | |
211 | WatcherID watcher_id = *w_id_it; | |
212 | WatchHandles::iterator w_it = watch_handles.find(watcher_id.second); | |
213 | if (w_it == watch_handles.end()) { | |
214 | // client disconnected before notification processed | |
215 | notify_handle->pending_watcher_ids.erase(watcher_id); | |
216 | } else { | |
217 | WatchHandle watch_handle = w_it->second; | |
218 | assert(watch_handle.gid == watcher_id.first); | |
219 | assert(watch_handle.handle == watcher_id.second); | |
220 | ||
221 | uint64_t notifier_id = rados_client->get_instance_id(); | |
222 | watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext( | |
223 | [this, oid, bl, notify_id, watch_handle, notifier_id](int r) { | |
224 | bufferlist notify_bl; | |
225 | notify_bl.append(bl); | |
226 | ||
227 | if (watch_handle.watch_ctx2 != NULL) { | |
228 | watch_handle.watch_ctx2->handle_notify(notify_id, | |
229 | watch_handle.handle, | |
230 | notifier_id, notify_bl); | |
231 | } else if (watch_handle.watch_ctx != NULL) { | |
232 | watch_handle.watch_ctx->notify(0, 0, notify_bl); | |
233 | ||
234 | // auto ack old-style watch/notify clients | |
235 | ack_notify(watch_handle.rados_client, oid, notify_id, | |
236 | {watch_handle.gid, watch_handle.handle}, bufferlist()); | |
237 | } | |
238 | })); | |
239 | } | |
240 | } | |
241 | ||
242 | finish_notify(rados_client, oid, notify_id); | |
243 | ||
244 | if (--m_pending_notifies == 0) { | |
245 | m_file_watcher_cond.Signal(); | |
246 | } | |
247 | } | |
248 | ||
249 | void TestWatchNotify::ack_notify(TestRadosClient *rados_client, | |
250 | const std::string &oid, | |
251 | uint64_t notify_id, | |
252 | const WatcherID &watcher_id, | |
253 | const bufferlist &bl) { | |
254 | CephContext *cct = rados_client->cct(); | |
255 | ||
256 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id | |
257 | << ", WatcherID=" << watcher_id << dendl; | |
258 | ||
259 | assert(m_lock.is_locked()); | |
260 | SharedWatcher watcher = get_watcher(oid); | |
261 | ||
262 | NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); | |
263 | if (it == watcher->notify_handles.end()) { | |
264 | ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id | |
265 | << ", WatcherID=" << watcher_id << ": not found" << dendl; | |
266 | return; | |
267 | } | |
268 | ||
269 | bufferlist response; | |
270 | response.append(bl); | |
271 | ||
272 | SharedNotifyHandle notify_handle = it->second; | |
273 | notify_handle->notify_responses[watcher_id] = response; | |
274 | notify_handle->pending_watcher_ids.erase(watcher_id); | |
275 | } | |
276 | ||
277 | void TestWatchNotify::finish_notify(TestRadosClient *rados_client, | |
278 | const std::string &oid, | |
279 | uint64_t notify_id) { | |
280 | CephContext *cct = rados_client->cct(); | |
281 | ||
282 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id << dendl; | |
283 | ||
284 | assert(m_lock.is_locked()); | |
285 | SharedWatcher watcher = get_watcher(oid); | |
286 | ||
287 | NotifyHandles::iterator it = watcher->notify_handles.find(notify_id); | |
288 | if (it == watcher->notify_handles.end()) { | |
289 | ldout(cct, 1) << "oid=" << oid << ", notify_id=" << notify_id | |
290 | << ": not found" << dendl; | |
291 | return; | |
292 | } | |
293 | ||
294 | SharedNotifyHandle notify_handle = it->second; | |
295 | if (!notify_handle->pending_watcher_ids.empty()) { | |
296 | ldout(cct, 10) << "oid=" << oid << ", notify_id=" << notify_id | |
297 | << ": pending watchers, returning" << dendl; | |
298 | return; | |
299 | } | |
300 | ||
301 | ldout(cct, 20) << "oid=" << oid << ", notify_id=" << notify_id | |
302 | << ": completing" << dendl; | |
303 | ||
304 | if (notify_handle->pbl != NULL) { | |
305 | ::encode(notify_handle->notify_responses, *notify_handle->pbl); | |
306 | ::encode(notify_handle->pending_watcher_ids, *notify_handle->pbl); | |
307 | } | |
308 | ||
309 | notify_handle->rados_client->get_aio_finisher()->queue( | |
310 | notify_handle->on_notify, 0); | |
311 | watcher->notify_handles.erase(notify_id); | |
312 | if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { | |
313 | m_file_watchers.erase(oid); | |
314 | } | |
315 | } | |
316 | ||
317 | void TestWatchNotify::blacklist(uint32_t nonce) { | |
318 | Mutex::Locker locker(m_lock); | |
319 | ||
320 | for (auto file_it = m_file_watchers.begin(); | |
321 | file_it != m_file_watchers.end(); ) { | |
322 | auto &watcher = file_it->second; | |
323 | for (auto w_it = watcher->watch_handles.begin(); | |
324 | w_it != watcher->watch_handles.end();) { | |
325 | if (w_it->second.nonce == nonce) { | |
326 | w_it = watcher->watch_handles.erase(w_it); | |
327 | } else { | |
328 | ++w_it; | |
329 | } | |
330 | } | |
331 | if (watcher->watch_handles.empty() && watcher->notify_handles.empty()) { | |
332 | file_it = m_file_watchers.erase(file_it); | |
333 | } else { | |
334 | ++file_it; | |
335 | } | |
336 | } | |
337 | } | |
338 | ||
339 | } // namespace librados |