]>
git.proxmox.com Git - ceph.git/blob - ceph/src/cls/journal/cls_journal_client.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "cls/journal/cls_journal_client.h"
5 #include "include/rados/librados.hpp"
6 #include "include/buffer.h"
7 #include "include/Context.h"
8 #include "common/Cond.h"
17 struct C_AioExec
: public Context
{
18 librados::IoCtx
&ioctx
;
21 C_AioExec(librados::IoCtx
&_ioctx
, const std::string
&_oid
)
22 : ioctx(_ioctx
), oid(_oid
) {
25 static void rados_callback(rados_completion_t c
, void *arg
) {
26 Context
*ctx
= reinterpret_cast<Context
*>(arg
);
27 ctx
->complete(rados_aio_get_return_value(c
));
31 struct C_ClientList
: public C_AioExec
{
32 std::set
<cls::journal::Client
> *clients
;
36 C_ClientList(librados::IoCtx
&_ioctx
, const std::string
&_oid
,
37 std::set
<cls::journal::Client
> *_clients
,
39 : C_AioExec(_ioctx
, _oid
), clients(_clients
), on_finish(_on_finish
) {}
41 void send(const std::string
&start_after
) {
43 ::encode(start_after
, inbl
);
44 ::encode(JOURNAL_MAX_RETURN
, inbl
);
46 librados::ObjectReadOperation op
;
47 op
.exec("journal", "client_list", inbl
);
50 librados::AioCompletion
*rados_completion
=
51 librados::Rados::aio_create_completion(this, rados_callback
, NULL
);
52 int r
= ioctx
.aio_operate(oid
, rados_completion
, &op
, &outbl
);
54 rados_completion
->release();
57 void complete(int r
) override
{
64 bufferlist::iterator iter
= outbl
.begin();
65 std::set
<cls::journal::Client
> partial_clients
;
66 ::decode(partial_clients
, iter
);
68 std::string start_after
;
69 if (!partial_clients
.empty()) {
70 start_after
= partial_clients
.rbegin()->id
;
71 clients
->insert(partial_clients
.begin(), partial_clients
.end());
74 if (partial_clients
.size() < JOURNAL_MAX_RETURN
) {
79 } catch (const buffer::error
&err
) {
84 void finish(int r
) override
{
85 on_finish
->complete(r
);
90 struct C_ImmutableMetadata
: public C_AioExec
{
97 C_ImmutableMetadata(librados::IoCtx
&_ioctx
, const std::string
&_oid
,
98 uint8_t *_order
, uint8_t *_splay_width
,
99 int64_t *_pool_id
, Context
*_on_finish
)
100 : C_AioExec(_ioctx
, _oid
), order(_order
), splay_width(_splay_width
),
101 pool_id(_pool_id
), on_finish(_on_finish
) {
105 librados::ObjectReadOperation op
;
107 op
.exec("journal", "get_order", inbl
);
108 op
.exec("journal", "get_splay_width", inbl
);
109 op
.exec("journal", "get_pool_id", inbl
);
111 librados::AioCompletion
*rados_completion
=
112 librados::Rados::aio_create_completion(this, rados_callback
, NULL
);
113 int r
= ioctx
.aio_operate(oid
, rados_completion
, &op
, &outbl
);
115 rados_completion
->release();
118 void finish(int r
) override
{
121 bufferlist::iterator iter
= outbl
.begin();
122 ::decode(*order
, iter
);
123 ::decode(*splay_width
, iter
);
124 ::decode(*pool_id
, iter
);
125 } catch (const buffer::error
&err
) {
129 on_finish
->complete(r
);
133 struct C_MutableMetadata
: public C_AioExec
{
134 uint64_t *minimum_set
;
135 uint64_t *active_set
;
136 C_ClientList
*client_list
;
139 C_MutableMetadata(librados::IoCtx
&_ioctx
, const std::string
&_oid
,
140 uint64_t *_minimum_set
, uint64_t *_active_set
,
141 C_ClientList
*_client_list
)
142 : C_AioExec(_ioctx
, _oid
), minimum_set(_minimum_set
),
143 active_set(_active_set
), client_list(_client_list
) {}
146 librados::ObjectReadOperation op
;
148 op
.exec("journal", "get_minimum_set", inbl
);
149 op
.exec("journal", "get_active_set", inbl
);
151 librados::AioCompletion
*rados_completion
=
152 librados::Rados::aio_create_completion(this, rados_callback
, NULL
);
153 int r
= ioctx
.aio_operate(oid
, rados_completion
, &op
, &outbl
);
155 rados_completion
->release();
158 void finish(int r
) override
{
161 bufferlist::iterator iter
= outbl
.begin();
162 ::decode(*minimum_set
, iter
);
163 ::decode(*active_set
, iter
);
164 client_list
->send("");
165 } catch (const buffer::error
&err
) {
170 client_list
->complete(r
);
176 } // anonymous namespace
178 void create(librados::ObjectWriteOperation
*op
,
179 uint8_t order
, uint8_t splay
, int64_t pool_id
) {
183 ::encode(pool_id
, bl
);
185 op
->exec("journal", "create", bl
);
188 int create(librados::IoCtx
&ioctx
, const std::string
&oid
, uint8_t order
,
189 uint8_t splay
, int64_t pool_id
) {
190 librados::ObjectWriteOperation op
;
191 create(&op
, order
, splay
, pool_id
);
193 int r
= ioctx
.operate(oid
, &op
);
200 void get_immutable_metadata(librados::IoCtx
&ioctx
, const std::string
&oid
,
201 uint8_t *order
, uint8_t *splay_width
,
202 int64_t *pool_id
, Context
*on_finish
) {
203 C_ImmutableMetadata
*metadata
= new C_ImmutableMetadata(ioctx
, oid
, order
,
204 splay_width
, pool_id
,
209 void get_mutable_metadata(librados::IoCtx
&ioctx
, const std::string
&oid
,
210 uint64_t *minimum_set
, uint64_t *active_set
,
211 std::set
<cls::journal::Client
> *clients
,
212 Context
*on_finish
) {
213 C_ClientList
*client_list
= new C_ClientList(ioctx
, oid
, clients
, on_finish
);
214 C_MutableMetadata
*metadata
= new C_MutableMetadata(
215 ioctx
, oid
, minimum_set
, active_set
, client_list
);
219 void set_minimum_set(librados::ObjectWriteOperation
*op
, uint64_t object_set
) {
221 ::encode(object_set
, bl
);
222 op
->exec("journal", "set_minimum_set", bl
);
225 void set_active_set(librados::ObjectWriteOperation
*op
, uint64_t object_set
) {
227 ::encode(object_set
, bl
);
228 op
->exec("journal", "set_active_set", bl
);
231 int get_client(librados::IoCtx
&ioctx
, const std::string
&oid
,
232 const std::string
&id
, cls::journal::Client
*client
) {
233 librados::ObjectReadOperation op
;
234 get_client_start(&op
, id
);
237 int r
= ioctx
.operate(oid
, &op
, &out_bl
);
242 bufferlist::iterator iter
= out_bl
.begin();
243 r
= get_client_finish(&iter
, client
);
250 void get_client_start(librados::ObjectReadOperation
*op
,
251 const std::string
&id
) {
254 op
->exec("journal", "get_client", bl
);
257 int get_client_finish(bufferlist::iterator
*iter
,
258 cls::journal::Client
*client
) {
260 ::decode(*client
, *iter
);
261 } catch (const buffer::error
&err
) {
267 int client_register(librados::IoCtx
&ioctx
, const std::string
&oid
,
268 const std::string
&id
, const bufferlist
&data
) {
269 librados::ObjectWriteOperation op
;
270 client_register(&op
, id
, data
);
271 return ioctx
.operate(oid
, &op
);
274 void client_register(librados::ObjectWriteOperation
*op
,
275 const std::string
&id
, const bufferlist
&data
) {
279 op
->exec("journal", "client_register", bl
);
282 int client_update_data(librados::IoCtx
&ioctx
, const std::string
&oid
,
283 const std::string
&id
, const bufferlist
&data
) {
284 librados::ObjectWriteOperation op
;
285 client_update_data(&op
, id
, data
);
286 return ioctx
.operate(oid
, &op
);
289 void client_update_data(librados::ObjectWriteOperation
*op
,
290 const std::string
&id
, const bufferlist
&data
) {
294 op
->exec("journal", "client_update_data", bl
);
297 int client_update_state(librados::IoCtx
&ioctx
, const std::string
&oid
,
298 const std::string
&id
, cls::journal::ClientState state
) {
299 librados::ObjectWriteOperation op
;
300 client_update_state(&op
, id
, state
);
301 return ioctx
.operate(oid
, &op
);
304 void client_update_state(librados::ObjectWriteOperation
*op
,
305 const std::string
&id
,
306 cls::journal::ClientState state
) {
309 ::encode(static_cast<uint8_t>(state
), bl
);
310 op
->exec("journal", "client_update_state", bl
);
313 int client_unregister(librados::IoCtx
&ioctx
, const std::string
&oid
,
314 const std::string
&id
) {
315 librados::ObjectWriteOperation op
;
316 client_unregister(&op
, id
);
317 return ioctx
.operate(oid
, &op
);
320 void client_unregister(librados::ObjectWriteOperation
*op
,
321 const std::string
&id
) {
325 op
->exec("journal", "client_unregister", bl
);
328 void client_commit(librados::ObjectWriteOperation
*op
, const std::string
&id
,
329 const cls::journal::ObjectSetPosition
&commit_position
) {
332 ::encode(commit_position
, bl
);
333 op
->exec("journal", "client_commit", bl
);
336 int client_list(librados::IoCtx
&ioctx
, const std::string
&oid
,
337 std::set
<cls::journal::Client
> *clients
) {
339 client_list(ioctx
, oid
, clients
, &cond
);
343 void client_list(librados::IoCtx
&ioctx
, const std::string
&oid
,
344 std::set
<cls::journal::Client
> *clients
, Context
*on_finish
) {
345 C_ClientList
*client_list
= new C_ClientList(ioctx
, oid
, clients
, on_finish
);
346 client_list
->send("");
349 int get_next_tag_tid(librados::IoCtx
&ioctx
, const std::string
&oid
,
351 librados::ObjectReadOperation op
;
352 get_next_tag_tid_start(&op
);
355 int r
= ioctx
.operate(oid
, &op
, &out_bl
);
360 bufferlist::iterator iter
= out_bl
.begin();
361 r
= get_next_tag_tid_finish(&iter
, tag_tid
);
368 void get_next_tag_tid_start(librados::ObjectReadOperation
*op
) {
370 op
->exec("journal", "get_next_tag_tid", bl
);
373 int get_next_tag_tid_finish(bufferlist::iterator
*iter
,
376 ::decode(*tag_tid
, *iter
);
377 } catch (const buffer::error
&err
) {
383 int get_tag(librados::IoCtx
&ioctx
, const std::string
&oid
,
384 uint64_t tag_tid
, cls::journal::Tag
*tag
) {
385 librados::ObjectReadOperation op
;
386 get_tag_start(&op
, tag_tid
);
389 int r
= ioctx
.operate(oid
, &op
, &out_bl
);
394 bufferlist::iterator iter
= out_bl
.begin();
395 r
= get_tag_finish(&iter
, tag
);
402 void get_tag_start(librados::ObjectReadOperation
*op
,
405 ::encode(tag_tid
, bl
);
406 op
->exec("journal", "get_tag", bl
);
409 int get_tag_finish(bufferlist::iterator
*iter
, cls::journal::Tag
*tag
) {
411 ::decode(*tag
, *iter
);
412 } catch (const buffer::error
&err
) {
418 int tag_create(librados::IoCtx
&ioctx
, const std::string
&oid
,
419 uint64_t tag_tid
, uint64_t tag_class
,
420 const bufferlist
&data
) {
421 librados::ObjectWriteOperation op
;
422 tag_create(&op
, tag_tid
, tag_class
, data
);
423 return ioctx
.operate(oid
, &op
);
426 void tag_create(librados::ObjectWriteOperation
*op
, uint64_t tag_tid
,
427 uint64_t tag_class
, const bufferlist
&data
) {
429 ::encode(tag_tid
, bl
);
430 ::encode(tag_class
, bl
);
432 op
->exec("journal", "tag_create", bl
);
435 int tag_list(librados::IoCtx
&ioctx
, const std::string
&oid
,
436 const std::string
&client_id
, boost::optional
<uint64_t> tag_class
,
437 std::set
<cls::journal::Tag
> *tags
) {
439 uint64_t start_after_tag_tid
= 0;
441 librados::ObjectReadOperation op
;
442 tag_list_start(&op
, start_after_tag_tid
, JOURNAL_MAX_RETURN
, client_id
,
446 int r
= ioctx
.operate(oid
, &op
, &out_bl
);
451 bufferlist::iterator iter
= out_bl
.begin();
452 std::set
<cls::journal::Tag
> decode_tags
;
453 r
= tag_list_finish(&iter
, &decode_tags
);
458 tags
->insert(decode_tags
.begin(), decode_tags
.end());
459 if (decode_tags
.size() < JOURNAL_MAX_RETURN
) {
466 void tag_list_start(librados::ObjectReadOperation
*op
,
467 uint64_t start_after_tag_tid
, uint64_t max_return
,
468 const std::string
&client_id
,
469 boost::optional
<uint64_t> tag_class
) {
471 ::encode(start_after_tag_tid
, bl
);
472 ::encode(max_return
, bl
);
473 ::encode(client_id
, bl
);
474 ::encode(tag_class
, bl
);
475 op
->exec("journal", "tag_list", bl
);
478 int tag_list_finish(bufferlist::iterator
*iter
,
479 std::set
<cls::journal::Tag
> *tags
) {
481 ::decode(*tags
, *iter
);
482 } catch (const buffer::error
&err
) {
488 void guard_append(librados::ObjectWriteOperation
*op
, uint64_t soft_max_size
) {
490 ::encode(soft_max_size
, bl
);
491 op
->exec("journal", "guard_append", bl
);
494 } // namespace client
495 } // namespace journal