]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "cls/journal/cls_journal_client.h" | |
31f18b77 | 5 | #include "include/rados/librados.hpp" |
7c673cae FG |
6 | #include "include/buffer.h" |
7 | #include "include/Context.h" | |
8 | #include "common/Cond.h" | |
9 | #include <errno.h> | |
7c673cae FG |
10 | |
11 | namespace cls { | |
12 | namespace journal { | |
13 | namespace client { | |
14 | ||
15 | namespace { | |
16 | ||
17 | struct C_AioExec : public Context { | |
18 | librados::IoCtx &ioctx; | |
19 | std::string oid; | |
20 | ||
21 | C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid) | |
22 | : ioctx(_ioctx), oid(_oid) { | |
23 | } | |
24 | ||
25 | static void rados_callback(rados_completion_t c, void *arg) { | |
26 | Context *ctx = reinterpret_cast<Context *>(arg); | |
27 | ctx->complete(rados_aio_get_return_value(c)); | |
28 | } | |
29 | }; | |
30 | ||
31 | struct C_ClientList : public C_AioExec { | |
32 | std::set<cls::journal::Client> *clients; | |
33 | Context *on_finish; | |
34 | bufferlist outbl; | |
35 | ||
36 | C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid, | |
37 | std::set<cls::journal::Client> *_clients, | |
38 | Context *_on_finish) | |
39 | : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {} | |
40 | ||
41 | void send(const std::string &start_after) { | |
42 | bufferlist inbl; | |
43 | ::encode(start_after, inbl); | |
44 | ::encode(JOURNAL_MAX_RETURN, inbl); | |
45 | ||
46 | librados::ObjectReadOperation op; | |
47 | op.exec("journal", "client_list", inbl); | |
48 | ||
49 | outbl.clear(); | |
50 | librados::AioCompletion *rados_completion = | |
51 | librados::Rados::aio_create_completion(this, rados_callback, NULL); | |
52 | int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); | |
53 | assert(r == 0); | |
54 | rados_completion->release(); | |
55 | } | |
56 | ||
57 | void complete(int r) override { | |
58 | if (r < 0) { | |
59 | finish(r); | |
60 | return; | |
61 | } | |
62 | ||
63 | try { | |
64 | bufferlist::iterator iter = outbl.begin(); | |
65 | std::set<cls::journal::Client> partial_clients; | |
66 | ::decode(partial_clients, iter); | |
67 | ||
68 | std::string start_after; | |
69 | if (!partial_clients.empty()) { | |
70 | start_after = partial_clients.rbegin()->id; | |
71 | clients->insert(partial_clients.begin(), partial_clients.end()); | |
72 | } | |
73 | ||
74 | if (partial_clients.size() < JOURNAL_MAX_RETURN) { | |
75 | finish(0); | |
76 | } else { | |
77 | send(start_after); | |
78 | } | |
79 | } catch (const buffer::error &err) { | |
80 | finish(-EBADMSG); | |
81 | } | |
82 | } | |
83 | ||
84 | void finish(int r) override { | |
85 | on_finish->complete(r); | |
86 | delete this; | |
87 | } | |
88 | }; | |
89 | ||
90 | struct C_ImmutableMetadata : public C_AioExec { | |
91 | uint8_t *order; | |
92 | uint8_t *splay_width; | |
93 | int64_t *pool_id; | |
94 | Context *on_finish; | |
95 | bufferlist outbl; | |
96 | ||
97 | C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, | |
98 | uint8_t *_order, uint8_t *_splay_width, | |
99 | int64_t *_pool_id, Context *_on_finish) | |
100 | : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width), | |
101 | pool_id(_pool_id), on_finish(_on_finish) { | |
102 | } | |
103 | ||
104 | void send() { | |
105 | librados::ObjectReadOperation op; | |
106 | bufferlist inbl; | |
107 | op.exec("journal", "get_order", inbl); | |
108 | op.exec("journal", "get_splay_width", inbl); | |
109 | op.exec("journal", "get_pool_id", inbl); | |
110 | ||
111 | librados::AioCompletion *rados_completion = | |
112 | librados::Rados::aio_create_completion(this, rados_callback, NULL); | |
113 | int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); | |
114 | assert(r == 0); | |
115 | rados_completion->release(); | |
116 | } | |
117 | ||
118 | void finish(int r) override { | |
119 | if (r == 0) { | |
120 | try { | |
121 | bufferlist::iterator iter = outbl.begin(); | |
122 | ::decode(*order, iter); | |
123 | ::decode(*splay_width, iter); | |
124 | ::decode(*pool_id, iter); | |
125 | } catch (const buffer::error &err) { | |
126 | r = -EBADMSG; | |
127 | } | |
128 | } | |
129 | on_finish->complete(r); | |
130 | } | |
131 | }; | |
132 | ||
133 | struct C_MutableMetadata : public C_AioExec { | |
134 | uint64_t *minimum_set; | |
135 | uint64_t *active_set; | |
136 | C_ClientList *client_list; | |
137 | bufferlist outbl; | |
138 | ||
139 | C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid, | |
140 | uint64_t *_minimum_set, uint64_t *_active_set, | |
141 | C_ClientList *_client_list) | |
142 | : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set), | |
143 | active_set(_active_set), client_list(_client_list) {} | |
144 | ||
145 | void send() { | |
146 | librados::ObjectReadOperation op; | |
147 | bufferlist inbl; | |
148 | op.exec("journal", "get_minimum_set", inbl); | |
149 | op.exec("journal", "get_active_set", inbl); | |
150 | ||
151 | librados::AioCompletion *rados_completion = | |
152 | librados::Rados::aio_create_completion(this, rados_callback, NULL); | |
153 | int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl); | |
154 | assert(r == 0); | |
155 | rados_completion->release(); | |
156 | } | |
157 | ||
158 | void finish(int r) override { | |
159 | if (r == 0) { | |
160 | try { | |
161 | bufferlist::iterator iter = outbl.begin(); | |
162 | ::decode(*minimum_set, iter); | |
163 | ::decode(*active_set, iter); | |
164 | client_list->send(""); | |
165 | } catch (const buffer::error &err) { | |
166 | r = -EBADMSG; | |
167 | } | |
168 | } | |
169 | if (r < 0) { | |
170 | client_list->complete(r); | |
171 | } | |
172 | } | |
173 | }; | |
174 | ||
175 | ||
176 | } // anonymous namespace | |
177 | ||
178 | void create(librados::ObjectWriteOperation *op, | |
179 | uint8_t order, uint8_t splay, int64_t pool_id) { | |
180 | bufferlist bl; | |
181 | ::encode(order, bl); | |
182 | ::encode(splay, bl); | |
183 | ::encode(pool_id, bl); | |
184 | ||
185 | op->exec("journal", "create", bl); | |
186 | } | |
187 | ||
188 | int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order, | |
189 | uint8_t splay, int64_t pool_id) { | |
190 | librados::ObjectWriteOperation op; | |
191 | create(&op, order, splay, pool_id); | |
192 | ||
193 | int r = ioctx.operate(oid, &op); | |
194 | if (r < 0) { | |
195 | return r; | |
196 | } | |
197 | return 0; | |
198 | } | |
199 | ||
200 | void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid, | |
201 | uint8_t *order, uint8_t *splay_width, | |
202 | int64_t *pool_id, Context *on_finish) { | |
203 | C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order, | |
204 | splay_width, pool_id, | |
205 | on_finish); | |
206 | metadata->send(); | |
207 | } | |
208 | ||
209 | void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid, | |
210 | uint64_t *minimum_set, uint64_t *active_set, | |
211 | std::set<cls::journal::Client> *clients, | |
212 | Context *on_finish) { | |
213 | C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); | |
214 | C_MutableMetadata *metadata = new C_MutableMetadata( | |
215 | ioctx, oid, minimum_set, active_set, client_list); | |
216 | metadata->send(); | |
217 | } | |
218 | ||
219 | void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) { | |
220 | bufferlist bl; | |
221 | ::encode(object_set, bl); | |
222 | op->exec("journal", "set_minimum_set", bl); | |
223 | } | |
224 | ||
225 | void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) { | |
226 | bufferlist bl; | |
227 | ::encode(object_set, bl); | |
228 | op->exec("journal", "set_active_set", bl); | |
229 | } | |
230 | ||
231 | int get_client(librados::IoCtx &ioctx, const std::string &oid, | |
232 | const std::string &id, cls::journal::Client *client) { | |
233 | librados::ObjectReadOperation op; | |
234 | get_client_start(&op, id); | |
235 | ||
236 | bufferlist out_bl; | |
237 | int r = ioctx.operate(oid, &op, &out_bl); | |
238 | if (r < 0) { | |
239 | return r; | |
240 | } | |
241 | ||
242 | bufferlist::iterator iter = out_bl.begin(); | |
243 | r = get_client_finish(&iter, client); | |
244 | if (r < 0) { | |
245 | return r; | |
246 | } | |
247 | return 0; | |
248 | } | |
249 | ||
250 | void get_client_start(librados::ObjectReadOperation *op, | |
251 | const std::string &id) { | |
252 | bufferlist bl; | |
253 | ::encode(id, bl); | |
254 | op->exec("journal", "get_client", bl); | |
255 | } | |
256 | ||
257 | int get_client_finish(bufferlist::iterator *iter, | |
258 | cls::journal::Client *client) { | |
259 | try { | |
260 | ::decode(*client, *iter); | |
261 | } catch (const buffer::error &err) { | |
262 | return -EBADMSG; | |
263 | } | |
264 | return 0; | |
265 | } | |
266 | ||
267 | int client_register(librados::IoCtx &ioctx, const std::string &oid, | |
268 | const std::string &id, const bufferlist &data) { | |
269 | librados::ObjectWriteOperation op; | |
270 | client_register(&op, id, data); | |
271 | return ioctx.operate(oid, &op); | |
272 | } | |
273 | ||
274 | void client_register(librados::ObjectWriteOperation *op, | |
275 | const std::string &id, const bufferlist &data) { | |
276 | bufferlist bl; | |
277 | ::encode(id, bl); | |
278 | ::encode(data, bl); | |
279 | op->exec("journal", "client_register", bl); | |
280 | } | |
281 | ||
282 | int client_update_data(librados::IoCtx &ioctx, const std::string &oid, | |
283 | const std::string &id, const bufferlist &data) { | |
284 | librados::ObjectWriteOperation op; | |
285 | client_update_data(&op, id, data); | |
286 | return ioctx.operate(oid, &op); | |
287 | } | |
288 | ||
289 | void client_update_data(librados::ObjectWriteOperation *op, | |
290 | const std::string &id, const bufferlist &data) { | |
291 | bufferlist bl; | |
292 | ::encode(id, bl); | |
293 | ::encode(data, bl); | |
294 | op->exec("journal", "client_update_data", bl); | |
295 | } | |
296 | ||
297 | int client_update_state(librados::IoCtx &ioctx, const std::string &oid, | |
298 | const std::string &id, cls::journal::ClientState state) { | |
299 | librados::ObjectWriteOperation op; | |
300 | client_update_state(&op, id, state); | |
301 | return ioctx.operate(oid, &op); | |
302 | } | |
303 | ||
304 | void client_update_state(librados::ObjectWriteOperation *op, | |
305 | const std::string &id, | |
306 | cls::journal::ClientState state) { | |
307 | bufferlist bl; | |
308 | ::encode(id, bl); | |
309 | ::encode(static_cast<uint8_t>(state), bl); | |
310 | op->exec("journal", "client_update_state", bl); | |
311 | } | |
312 | ||
313 | int client_unregister(librados::IoCtx &ioctx, const std::string &oid, | |
314 | const std::string &id) { | |
315 | librados::ObjectWriteOperation op; | |
316 | client_unregister(&op, id); | |
317 | return ioctx.operate(oid, &op); | |
318 | } | |
319 | ||
320 | void client_unregister(librados::ObjectWriteOperation *op, | |
321 | const std::string &id) { | |
322 | ||
323 | bufferlist bl; | |
324 | ::encode(id, bl); | |
325 | op->exec("journal", "client_unregister", bl); | |
326 | } | |
327 | ||
328 | void client_commit(librados::ObjectWriteOperation *op, const std::string &id, | |
329 | const cls::journal::ObjectSetPosition &commit_position) { | |
330 | bufferlist bl; | |
331 | ::encode(id, bl); | |
332 | ::encode(commit_position, bl); | |
333 | op->exec("journal", "client_commit", bl); | |
334 | } | |
335 | ||
336 | int client_list(librados::IoCtx &ioctx, const std::string &oid, | |
337 | std::set<cls::journal::Client> *clients) { | |
338 | C_SaferCond cond; | |
339 | client_list(ioctx, oid, clients, &cond); | |
340 | return cond.wait(); | |
341 | } | |
342 | ||
343 | void client_list(librados::IoCtx &ioctx, const std::string &oid, | |
344 | std::set<cls::journal::Client> *clients, Context *on_finish) { | |
345 | C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish); | |
346 | client_list->send(""); | |
347 | } | |
348 | ||
349 | int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid, | |
350 | uint64_t *tag_tid) { | |
351 | librados::ObjectReadOperation op; | |
352 | get_next_tag_tid_start(&op); | |
353 | ||
354 | bufferlist out_bl; | |
355 | int r = ioctx.operate(oid, &op, &out_bl); | |
356 | if (r < 0) { | |
357 | return r; | |
358 | } | |
359 | ||
360 | bufferlist::iterator iter = out_bl.begin(); | |
361 | r = get_next_tag_tid_finish(&iter, tag_tid); | |
362 | if (r < 0) { | |
363 | return r; | |
364 | } | |
365 | return 0; | |
366 | } | |
367 | ||
368 | void get_next_tag_tid_start(librados::ObjectReadOperation *op) { | |
369 | bufferlist bl; | |
370 | op->exec("journal", "get_next_tag_tid", bl); | |
371 | } | |
372 | ||
373 | int get_next_tag_tid_finish(bufferlist::iterator *iter, | |
374 | uint64_t *tag_tid) { | |
375 | try { | |
376 | ::decode(*tag_tid, *iter); | |
377 | } catch (const buffer::error &err) { | |
378 | return -EBADMSG; | |
379 | } | |
380 | return 0; | |
381 | } | |
382 | ||
383 | int get_tag(librados::IoCtx &ioctx, const std::string &oid, | |
384 | uint64_t tag_tid, cls::journal::Tag *tag) { | |
385 | librados::ObjectReadOperation op; | |
386 | get_tag_start(&op, tag_tid); | |
387 | ||
388 | bufferlist out_bl; | |
389 | int r = ioctx.operate(oid, &op, &out_bl); | |
390 | if (r < 0) { | |
391 | return r; | |
392 | } | |
393 | ||
394 | bufferlist::iterator iter = out_bl.begin(); | |
395 | r = get_tag_finish(&iter, tag); | |
396 | if (r < 0) { | |
397 | return r; | |
398 | } | |
399 | return 0; | |
400 | } | |
401 | ||
402 | void get_tag_start(librados::ObjectReadOperation *op, | |
403 | uint64_t tag_tid) { | |
404 | bufferlist bl; | |
405 | ::encode(tag_tid, bl); | |
406 | op->exec("journal", "get_tag", bl); | |
407 | } | |
408 | ||
409 | int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) { | |
410 | try { | |
411 | ::decode(*tag, *iter); | |
412 | } catch (const buffer::error &err) { | |
413 | return -EBADMSG; | |
414 | } | |
415 | return 0; | |
416 | } | |
417 | ||
418 | int tag_create(librados::IoCtx &ioctx, const std::string &oid, | |
419 | uint64_t tag_tid, uint64_t tag_class, | |
420 | const bufferlist &data) { | |
421 | librados::ObjectWriteOperation op; | |
422 | tag_create(&op, tag_tid, tag_class, data); | |
423 | return ioctx.operate(oid, &op); | |
424 | } | |
425 | ||
426 | void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid, | |
427 | uint64_t tag_class, const bufferlist &data) { | |
428 | bufferlist bl; | |
429 | ::encode(tag_tid, bl); | |
430 | ::encode(tag_class, bl); | |
431 | ::encode(data, bl); | |
432 | op->exec("journal", "tag_create", bl); | |
433 | } | |
434 | ||
435 | int tag_list(librados::IoCtx &ioctx, const std::string &oid, | |
436 | const std::string &client_id, boost::optional<uint64_t> tag_class, | |
437 | std::set<cls::journal::Tag> *tags) { | |
438 | tags->clear(); | |
439 | uint64_t start_after_tag_tid = 0; | |
440 | while (true) { | |
441 | librados::ObjectReadOperation op; | |
442 | tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id, | |
443 | tag_class); | |
444 | ||
445 | bufferlist out_bl; | |
446 | int r = ioctx.operate(oid, &op, &out_bl); | |
447 | if (r < 0) { | |
448 | return r; | |
449 | } | |
450 | ||
451 | bufferlist::iterator iter = out_bl.begin(); | |
452 | std::set<cls::journal::Tag> decode_tags; | |
453 | r = tag_list_finish(&iter, &decode_tags); | |
454 | if (r < 0) { | |
455 | return r; | |
456 | } | |
457 | ||
458 | tags->insert(decode_tags.begin(), decode_tags.end()); | |
459 | if (decode_tags.size() < JOURNAL_MAX_RETURN) { | |
460 | break; | |
461 | } | |
462 | } | |
463 | return 0; | |
464 | } | |
465 | ||
466 | void tag_list_start(librados::ObjectReadOperation *op, | |
467 | uint64_t start_after_tag_tid, uint64_t max_return, | |
468 | const std::string &client_id, | |
469 | boost::optional<uint64_t> tag_class) { | |
470 | bufferlist bl; | |
471 | ::encode(start_after_tag_tid, bl); | |
472 | ::encode(max_return, bl); | |
473 | ::encode(client_id, bl); | |
474 | ::encode(tag_class, bl); | |
475 | op->exec("journal", "tag_list", bl); | |
476 | } | |
477 | ||
478 | int tag_list_finish(bufferlist::iterator *iter, | |
479 | std::set<cls::journal::Tag> *tags) { | |
480 | try { | |
481 | ::decode(*tags, *iter); | |
482 | } catch (const buffer::error &err) { | |
483 | return -EBADMSG; | |
484 | } | |
485 | return 0; | |
486 | } | |
487 | ||
488 | void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) { | |
489 | bufferlist bl; | |
490 | ::encode(soft_max_size, bl); | |
491 | op->exec("journal", "guard_append", bl); | |
492 | } | |
493 | ||
494 | } // namespace client | |
495 | } // namespace journal | |
496 | } // namespace cls |