]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/journal/cls_journal_client.cc
update sources to v12.1.0
[ceph.git] / ceph / src / cls / journal / cls_journal_client.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "cls/journal/cls_journal_client.h"
31f18b77 5#include "include/rados/librados.hpp"
7c673cae
FG
6#include "include/buffer.h"
7#include "include/Context.h"
8#include "common/Cond.h"
9#include <errno.h>
7c673cae
FG
10
11namespace cls {
12namespace journal {
13namespace client {
14
15namespace {
16
17struct C_AioExec : public Context {
18 librados::IoCtx &ioctx;
19 std::string oid;
20
21 C_AioExec(librados::IoCtx &_ioctx, const std::string &_oid)
22 : ioctx(_ioctx), oid(_oid) {
23 }
24
25 static void rados_callback(rados_completion_t c, void *arg) {
26 Context *ctx = reinterpret_cast<Context *>(arg);
27 ctx->complete(rados_aio_get_return_value(c));
28 }
29};
30
31struct C_ClientList : public C_AioExec {
32 std::set<cls::journal::Client> *clients;
33 Context *on_finish;
34 bufferlist outbl;
35
36 C_ClientList(librados::IoCtx &_ioctx, const std::string &_oid,
37 std::set<cls::journal::Client> *_clients,
38 Context *_on_finish)
39 : C_AioExec(_ioctx, _oid), clients(_clients), on_finish(_on_finish) {}
40
41 void send(const std::string &start_after) {
42 bufferlist inbl;
43 ::encode(start_after, inbl);
44 ::encode(JOURNAL_MAX_RETURN, inbl);
45
46 librados::ObjectReadOperation op;
47 op.exec("journal", "client_list", inbl);
48
49 outbl.clear();
50 librados::AioCompletion *rados_completion =
51 librados::Rados::aio_create_completion(this, rados_callback, NULL);
52 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
53 assert(r == 0);
54 rados_completion->release();
55 }
56
57 void complete(int r) override {
58 if (r < 0) {
59 finish(r);
60 return;
61 }
62
63 try {
64 bufferlist::iterator iter = outbl.begin();
65 std::set<cls::journal::Client> partial_clients;
66 ::decode(partial_clients, iter);
67
68 std::string start_after;
69 if (!partial_clients.empty()) {
70 start_after = partial_clients.rbegin()->id;
71 clients->insert(partial_clients.begin(), partial_clients.end());
72 }
73
74 if (partial_clients.size() < JOURNAL_MAX_RETURN) {
75 finish(0);
76 } else {
77 send(start_after);
78 }
79 } catch (const buffer::error &err) {
80 finish(-EBADMSG);
81 }
82 }
83
84 void finish(int r) override {
85 on_finish->complete(r);
86 delete this;
87 }
88};
89
90struct C_ImmutableMetadata : public C_AioExec {
91 uint8_t *order;
92 uint8_t *splay_width;
93 int64_t *pool_id;
94 Context *on_finish;
95 bufferlist outbl;
96
97 C_ImmutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
98 uint8_t *_order, uint8_t *_splay_width,
99 int64_t *_pool_id, Context *_on_finish)
100 : C_AioExec(_ioctx, _oid), order(_order), splay_width(_splay_width),
101 pool_id(_pool_id), on_finish(_on_finish) {
102 }
103
104 void send() {
105 librados::ObjectReadOperation op;
106 bufferlist inbl;
107 op.exec("journal", "get_order", inbl);
108 op.exec("journal", "get_splay_width", inbl);
109 op.exec("journal", "get_pool_id", inbl);
110
111 librados::AioCompletion *rados_completion =
112 librados::Rados::aio_create_completion(this, rados_callback, NULL);
113 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
114 assert(r == 0);
115 rados_completion->release();
116 }
117
118 void finish(int r) override {
119 if (r == 0) {
120 try {
121 bufferlist::iterator iter = outbl.begin();
122 ::decode(*order, iter);
123 ::decode(*splay_width, iter);
124 ::decode(*pool_id, iter);
125 } catch (const buffer::error &err) {
126 r = -EBADMSG;
127 }
128 }
129 on_finish->complete(r);
130 }
131};
132
133struct C_MutableMetadata : public C_AioExec {
134 uint64_t *minimum_set;
135 uint64_t *active_set;
136 C_ClientList *client_list;
137 bufferlist outbl;
138
139 C_MutableMetadata(librados::IoCtx &_ioctx, const std::string &_oid,
140 uint64_t *_minimum_set, uint64_t *_active_set,
141 C_ClientList *_client_list)
142 : C_AioExec(_ioctx, _oid), minimum_set(_minimum_set),
143 active_set(_active_set), client_list(_client_list) {}
144
145 void send() {
146 librados::ObjectReadOperation op;
147 bufferlist inbl;
148 op.exec("journal", "get_minimum_set", inbl);
149 op.exec("journal", "get_active_set", inbl);
150
151 librados::AioCompletion *rados_completion =
152 librados::Rados::aio_create_completion(this, rados_callback, NULL);
153 int r = ioctx.aio_operate(oid, rados_completion, &op, &outbl);
154 assert(r == 0);
155 rados_completion->release();
156 }
157
158 void finish(int r) override {
159 if (r == 0) {
160 try {
161 bufferlist::iterator iter = outbl.begin();
162 ::decode(*minimum_set, iter);
163 ::decode(*active_set, iter);
164 client_list->send("");
165 } catch (const buffer::error &err) {
166 r = -EBADMSG;
167 }
168 }
169 if (r < 0) {
170 client_list->complete(r);
171 }
172 }
173};
174
175
176} // anonymous namespace
177
178void create(librados::ObjectWriteOperation *op,
179 uint8_t order, uint8_t splay, int64_t pool_id) {
180 bufferlist bl;
181 ::encode(order, bl);
182 ::encode(splay, bl);
183 ::encode(pool_id, bl);
184
185 op->exec("journal", "create", bl);
186}
187
188int create(librados::IoCtx &ioctx, const std::string &oid, uint8_t order,
189 uint8_t splay, int64_t pool_id) {
190 librados::ObjectWriteOperation op;
191 create(&op, order, splay, pool_id);
192
193 int r = ioctx.operate(oid, &op);
194 if (r < 0) {
195 return r;
196 }
197 return 0;
198}
199
200void get_immutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
201 uint8_t *order, uint8_t *splay_width,
202 int64_t *pool_id, Context *on_finish) {
203 C_ImmutableMetadata *metadata = new C_ImmutableMetadata(ioctx, oid, order,
204 splay_width, pool_id,
205 on_finish);
206 metadata->send();
207}
208
209void get_mutable_metadata(librados::IoCtx &ioctx, const std::string &oid,
210 uint64_t *minimum_set, uint64_t *active_set,
211 std::set<cls::journal::Client> *clients,
212 Context *on_finish) {
213 C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
214 C_MutableMetadata *metadata = new C_MutableMetadata(
215 ioctx, oid, minimum_set, active_set, client_list);
216 metadata->send();
217}
218
219void set_minimum_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
220 bufferlist bl;
221 ::encode(object_set, bl);
222 op->exec("journal", "set_minimum_set", bl);
223}
224
225void set_active_set(librados::ObjectWriteOperation *op, uint64_t object_set) {
226 bufferlist bl;
227 ::encode(object_set, bl);
228 op->exec("journal", "set_active_set", bl);
229}
230
231int get_client(librados::IoCtx &ioctx, const std::string &oid,
232 const std::string &id, cls::journal::Client *client) {
233 librados::ObjectReadOperation op;
234 get_client_start(&op, id);
235
236 bufferlist out_bl;
237 int r = ioctx.operate(oid, &op, &out_bl);
238 if (r < 0) {
239 return r;
240 }
241
242 bufferlist::iterator iter = out_bl.begin();
243 r = get_client_finish(&iter, client);
244 if (r < 0) {
245 return r;
246 }
247 return 0;
248}
249
250void get_client_start(librados::ObjectReadOperation *op,
251 const std::string &id) {
252 bufferlist bl;
253 ::encode(id, bl);
254 op->exec("journal", "get_client", bl);
255}
256
257int get_client_finish(bufferlist::iterator *iter,
258 cls::journal::Client *client) {
259 try {
260 ::decode(*client, *iter);
261 } catch (const buffer::error &err) {
262 return -EBADMSG;
263 }
264 return 0;
265}
266
267int client_register(librados::IoCtx &ioctx, const std::string &oid,
268 const std::string &id, const bufferlist &data) {
269 librados::ObjectWriteOperation op;
270 client_register(&op, id, data);
271 return ioctx.operate(oid, &op);
272}
273
274void client_register(librados::ObjectWriteOperation *op,
275 const std::string &id, const bufferlist &data) {
276 bufferlist bl;
277 ::encode(id, bl);
278 ::encode(data, bl);
279 op->exec("journal", "client_register", bl);
280}
281
282int client_update_data(librados::IoCtx &ioctx, const std::string &oid,
283 const std::string &id, const bufferlist &data) {
284 librados::ObjectWriteOperation op;
285 client_update_data(&op, id, data);
286 return ioctx.operate(oid, &op);
287}
288
289void client_update_data(librados::ObjectWriteOperation *op,
290 const std::string &id, const bufferlist &data) {
291 bufferlist bl;
292 ::encode(id, bl);
293 ::encode(data, bl);
294 op->exec("journal", "client_update_data", bl);
295}
296
297int client_update_state(librados::IoCtx &ioctx, const std::string &oid,
298 const std::string &id, cls::journal::ClientState state) {
299 librados::ObjectWriteOperation op;
300 client_update_state(&op, id, state);
301 return ioctx.operate(oid, &op);
302}
303
304void client_update_state(librados::ObjectWriteOperation *op,
305 const std::string &id,
306 cls::journal::ClientState state) {
307 bufferlist bl;
308 ::encode(id, bl);
309 ::encode(static_cast<uint8_t>(state), bl);
310 op->exec("journal", "client_update_state", bl);
311}
312
313int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
314 const std::string &id) {
315 librados::ObjectWriteOperation op;
316 client_unregister(&op, id);
317 return ioctx.operate(oid, &op);
318}
319
320void client_unregister(librados::ObjectWriteOperation *op,
321 const std::string &id) {
322
323 bufferlist bl;
324 ::encode(id, bl);
325 op->exec("journal", "client_unregister", bl);
326}
327
328void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
329 const cls::journal::ObjectSetPosition &commit_position) {
330 bufferlist bl;
331 ::encode(id, bl);
332 ::encode(commit_position, bl);
333 op->exec("journal", "client_commit", bl);
334}
335
336int client_list(librados::IoCtx &ioctx, const std::string &oid,
337 std::set<cls::journal::Client> *clients) {
338 C_SaferCond cond;
339 client_list(ioctx, oid, clients, &cond);
340 return cond.wait();
341}
342
343void client_list(librados::IoCtx &ioctx, const std::string &oid,
344 std::set<cls::journal::Client> *clients, Context *on_finish) {
345 C_ClientList *client_list = new C_ClientList(ioctx, oid, clients, on_finish);
346 client_list->send("");
347}
348
349int get_next_tag_tid(librados::IoCtx &ioctx, const std::string &oid,
350 uint64_t *tag_tid) {
351 librados::ObjectReadOperation op;
352 get_next_tag_tid_start(&op);
353
354 bufferlist out_bl;
355 int r = ioctx.operate(oid, &op, &out_bl);
356 if (r < 0) {
357 return r;
358 }
359
360 bufferlist::iterator iter = out_bl.begin();
361 r = get_next_tag_tid_finish(&iter, tag_tid);
362 if (r < 0) {
363 return r;
364 }
365 return 0;
366}
367
368void get_next_tag_tid_start(librados::ObjectReadOperation *op) {
369 bufferlist bl;
370 op->exec("journal", "get_next_tag_tid", bl);
371}
372
373int get_next_tag_tid_finish(bufferlist::iterator *iter,
374 uint64_t *tag_tid) {
375 try {
376 ::decode(*tag_tid, *iter);
377 } catch (const buffer::error &err) {
378 return -EBADMSG;
379 }
380 return 0;
381}
382
383int get_tag(librados::IoCtx &ioctx, const std::string &oid,
384 uint64_t tag_tid, cls::journal::Tag *tag) {
385 librados::ObjectReadOperation op;
386 get_tag_start(&op, tag_tid);
387
388 bufferlist out_bl;
389 int r = ioctx.operate(oid, &op, &out_bl);
390 if (r < 0) {
391 return r;
392 }
393
394 bufferlist::iterator iter = out_bl.begin();
395 r = get_tag_finish(&iter, tag);
396 if (r < 0) {
397 return r;
398 }
399 return 0;
400}
401
402void get_tag_start(librados::ObjectReadOperation *op,
403 uint64_t tag_tid) {
404 bufferlist bl;
405 ::encode(tag_tid, bl);
406 op->exec("journal", "get_tag", bl);
407}
408
409int get_tag_finish(bufferlist::iterator *iter, cls::journal::Tag *tag) {
410 try {
411 ::decode(*tag, *iter);
412 } catch (const buffer::error &err) {
413 return -EBADMSG;
414 }
415 return 0;
416}
417
418int tag_create(librados::IoCtx &ioctx, const std::string &oid,
419 uint64_t tag_tid, uint64_t tag_class,
420 const bufferlist &data) {
421 librados::ObjectWriteOperation op;
422 tag_create(&op, tag_tid, tag_class, data);
423 return ioctx.operate(oid, &op);
424}
425
426void tag_create(librados::ObjectWriteOperation *op, uint64_t tag_tid,
427 uint64_t tag_class, const bufferlist &data) {
428 bufferlist bl;
429 ::encode(tag_tid, bl);
430 ::encode(tag_class, bl);
431 ::encode(data, bl);
432 op->exec("journal", "tag_create", bl);
433}
434
435int tag_list(librados::IoCtx &ioctx, const std::string &oid,
436 const std::string &client_id, boost::optional<uint64_t> tag_class,
437 std::set<cls::journal::Tag> *tags) {
438 tags->clear();
439 uint64_t start_after_tag_tid = 0;
440 while (true) {
441 librados::ObjectReadOperation op;
442 tag_list_start(&op, start_after_tag_tid, JOURNAL_MAX_RETURN, client_id,
443 tag_class);
444
445 bufferlist out_bl;
446 int r = ioctx.operate(oid, &op, &out_bl);
447 if (r < 0) {
448 return r;
449 }
450
451 bufferlist::iterator iter = out_bl.begin();
452 std::set<cls::journal::Tag> decode_tags;
453 r = tag_list_finish(&iter, &decode_tags);
454 if (r < 0) {
455 return r;
456 }
457
458 tags->insert(decode_tags.begin(), decode_tags.end());
459 if (decode_tags.size() < JOURNAL_MAX_RETURN) {
460 break;
461 }
462 }
463 return 0;
464}
465
466void tag_list_start(librados::ObjectReadOperation *op,
467 uint64_t start_after_tag_tid, uint64_t max_return,
468 const std::string &client_id,
469 boost::optional<uint64_t> tag_class) {
470 bufferlist bl;
471 ::encode(start_after_tag_tid, bl);
472 ::encode(max_return, bl);
473 ::encode(client_id, bl);
474 ::encode(tag_class, bl);
475 op->exec("journal", "tag_list", bl);
476}
477
478int tag_list_finish(bufferlist::iterator *iter,
479 std::set<cls::journal::Tag> *tags) {
480 try {
481 ::decode(*tags, *iter);
482 } catch (const buffer::error &err) {
483 return -EBADMSG;
484 }
485 return 0;
486}
487
488void guard_append(librados::ObjectWriteOperation *op, uint64_t soft_max_size) {
489 bufferlist bl;
490 ::encode(soft_max_size, bl);
491 op->exec("journal", "guard_append", bl);
492}
493
494} // namespace client
495} // namespace journal
496} // namespace cls