]> git.proxmox.com Git - ceph.git/blame - ceph/src/cls/2pc_queue/cls_2pc_queue_client.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / cls / 2pc_queue / cls_2pc_queue_client.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "cls/2pc_queue/cls_2pc_queue_client.h"
5#include "cls/2pc_queue/cls_2pc_queue_ops.h"
6#include "cls/2pc_queue/cls_2pc_queue_const.h"
7#include "cls/queue/cls_queue_ops.h"
8#include "cls/queue/cls_queue_const.h"
9
10using namespace librados;
11
12void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) {
13 bufferlist in;
14 cls_queue_init_op call;
15 call.queue_size = size;
16 encode(call, in);
17 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in);
18}
19
20int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) {
21 cls_queue_get_capacity_ret op_ret;
22 auto iter = bl.cbegin();
23 try {
24 decode(op_ret, iter);
25 } catch (buffer::error& err) {
26 return -EIO;
27 }
28
29 size = op_ret.queue_capacity;
30
31 return 0;
32}
33
34#ifndef CLS_CLIENT_HIDE_IOCTX
20effc67 35int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) {
f67539c2
TL
36 bufferlist in, out;
37 const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out);
38 if (r < 0 ) {
39 return r;
40 }
41
42 return cls_2pc_queue_get_capacity_result(out, size);
43}
44#endif
45
// optionally async method for getting capacity (bytes)
// after answer is received, call cls_2pc_queue_get_capacity_result() to parse the results
48void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) {
49 bufferlist in;
50 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval);
51}
52
53
54int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) {
55 cls_2pc_queue_reserve_ret op_ret;
56 auto iter = bl.cbegin();
57 try {
58 decode(op_ret, iter);
59 } catch (buffer::error& err) {
60 return -EIO;
61 }
62 res_id = op_ret.id;
63
64 return 0;
65}
66
20effc67 67int cls_2pc_queue_reserve(IoCtx& io_ctx, const std::string& queue_name,
f67539c2
TL
68 uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) {
69 bufferlist in, out;
70 cls_2pc_queue_reserve_op reserve_op;
71 reserve_op.size = res_size;
72 reserve_op.entries = entries;
73
74 encode(reserve_op, in);
75 int rval;
76 ObjectWriteOperation op;
77 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval);
78 const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC);
79
80 if (r < 0) {
81 return r;
82 }
83
84 return cls_2pc_queue_reserve_result(out, res_id);
85}
86
87void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size,
88 uint32_t entries, bufferlist* obl, int* prval) {
89 bufferlist in;
90 cls_2pc_queue_reserve_op reserve_op;
91 reserve_op.size = res_size;
92 reserve_op.entries = entries;
93 encode(reserve_op, in);
94 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval);
95}
96
97void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec,
98 cls_2pc_reservation::id_t res_id) {
99 bufferlist in;
100 cls_2pc_queue_commit_op commit_op;
101 commit_op.id = res_id;
102 commit_op.bl_data_vec = std::move(bl_data_vec);
103 encode(commit_op, in);
104 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, in);
105}
106
107void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) {
108 bufferlist in;
109 cls_2pc_queue_abort_op abort_op;
110 abort_op.id = res_id;
111 encode(abort_op, in);
112 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in);
113}
114
115int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries,
116 bool *truncated, std::string& next_marker) {
117 cls_queue_list_ret ret;
118 auto iter = bl.cbegin();
119 try {
120 decode(ret, iter);
121 } catch (buffer::error& err) {
122 return -EIO;
123 }
124
125 entries = std::move(ret.entries);
126 *truncated = ret.is_truncated;
127
128 next_marker = std::move(ret.next_marker);
129
130 return 0;
131}
132
133#ifndef CLS_CLIENT_HIDE_IOCTX
20effc67
TL
134int cls_2pc_queue_list_entries(IoCtx& io_ctx,
135 const std::string& queue_name,
136 const std::string& marker, uint32_t max,
137 std::vector<cls_queue_entry>& entries,
138 bool *truncated, std::string& next_marker) {
f67539c2
TL
139 bufferlist in, out;
140 cls_queue_list_op op;
141 op.start_marker = marker;
142 op.max = max;
143 encode(op, in);
144
145 const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out);
146 if (r < 0) {
147 return r;
148 }
149 return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker);
150}
151#endif
152
153void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) {
154 bufferlist in;
155 cls_queue_list_op list_op;
156 list_op.start_marker = marker;
157 list_op.max = max;
158 encode(list_op, in);
159
160 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval);
161}
162
163int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) {
164 cls_2pc_queue_reservations_ret ret;
165 auto iter = bl.cbegin();
166 try {
167 decode(ret, iter);
168 } catch (buffer::error& err) {
169 return -EIO;
170 }
171
172 reservations = std::move(ret.reservations);
173
174 return 0;
175}
176
177#ifndef CLS_CLIENT_HIDE_IOCTX
178int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) {
179 bufferlist in, out;
180
181 const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out);
182 if (r < 0) {
183 return r;
184 }
185 return cls_2pc_queue_list_reservations_result(out, reservations);
186}
187#endif
188
189void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) {
190 bufferlist in;
191
192 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval);
193}
194
195void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) {
196 bufferlist in;
197 cls_queue_remove_op rem_op;
198 rem_op.end_marker = end_marker;
199 encode(rem_op, in);
200 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in);
201}
202
203void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time) {
204 bufferlist in;
205 cls_2pc_queue_expire_op expire_op;
206 expire_op.stale_time = stale_time;
207 encode(expire_op, in);
208 op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_EXPIRE_RESERVATIONS, in);
209}
210