]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "cls/2pc_queue/cls_2pc_queue_client.h" | |
5 | #include "cls/2pc_queue/cls_2pc_queue_ops.h" | |
6 | #include "cls/2pc_queue/cls_2pc_queue_const.h" | |
7 | #include "cls/queue/cls_queue_ops.h" | |
8 | #include "cls/queue/cls_queue_const.h" | |
9 | ||
10 | using namespace librados; | |
11 | ||
12 | void cls_2pc_queue_init(ObjectWriteOperation& op, const std::string& queue_name, uint64_t size) { | |
13 | bufferlist in; | |
14 | cls_queue_init_op call; | |
15 | call.queue_size = size; | |
16 | encode(call, in); | |
17 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_INIT, in); | |
18 | } | |
19 | ||
20 | int cls_2pc_queue_get_capacity_result(const bufferlist& bl, uint64_t& size) { | |
21 | cls_queue_get_capacity_ret op_ret; | |
22 | auto iter = bl.cbegin(); | |
23 | try { | |
24 | decode(op_ret, iter); | |
25 | } catch (buffer::error& err) { | |
26 | return -EIO; | |
27 | } | |
28 | ||
29 | size = op_ret.queue_capacity; | |
30 | ||
31 | return 0; | |
32 | } | |
33 | ||
34 | #ifndef CLS_CLIENT_HIDE_IOCTX | |
20effc67 | 35 | int cls_2pc_queue_get_capacity(IoCtx& io_ctx, const std::string& queue_name, uint64_t& size) { |
f67539c2 TL |
36 | bufferlist in, out; |
37 | const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, out); | |
38 | if (r < 0 ) { | |
39 | return r; | |
40 | } | |
41 | ||
42 | return cls_2pc_queue_get_capacity_result(out, size); | |
43 | } | |
44 | #endif | |
45 | ||
46 | // optionally async method for getting capacity (bytes) | |
47 | // after answer is received, call cls_2pc_queue_get_capacity_result() to prase the results | |
48 | void cls_2pc_queue_get_capacity(ObjectReadOperation& op, bufferlist* obl, int* prval) { | |
49 | bufferlist in; | |
50 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_GET_CAPACITY, in, obl, prval); | |
51 | } | |
52 | ||
53 | ||
54 | int cls_2pc_queue_reserve_result(const bufferlist& bl, cls_2pc_reservation::id_t& res_id) { | |
55 | cls_2pc_queue_reserve_ret op_ret; | |
56 | auto iter = bl.cbegin(); | |
57 | try { | |
58 | decode(op_ret, iter); | |
59 | } catch (buffer::error& err) { | |
60 | return -EIO; | |
61 | } | |
62 | res_id = op_ret.id; | |
63 | ||
64 | return 0; | |
65 | } | |
66 | ||
20effc67 | 67 | int cls_2pc_queue_reserve(IoCtx& io_ctx, const std::string& queue_name, |
f67539c2 TL |
68 | uint64_t res_size, uint32_t entries, cls_2pc_reservation::id_t& res_id) { |
69 | bufferlist in, out; | |
70 | cls_2pc_queue_reserve_op reserve_op; | |
71 | reserve_op.size = res_size; | |
72 | reserve_op.entries = entries; | |
73 | ||
74 | encode(reserve_op, in); | |
75 | int rval; | |
76 | ObjectWriteOperation op; | |
77 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, &out, &rval); | |
78 | const auto r = io_ctx.operate(queue_name, &op, librados::OPERATION_RETURNVEC); | |
79 | ||
80 | if (r < 0) { | |
81 | return r; | |
82 | } | |
83 | ||
84 | return cls_2pc_queue_reserve_result(out, res_id); | |
85 | } | |
86 | ||
87 | void cls_2pc_queue_reserve(ObjectWriteOperation& op, uint64_t res_size, | |
88 | uint32_t entries, bufferlist* obl, int* prval) { | |
89 | bufferlist in; | |
90 | cls_2pc_queue_reserve_op reserve_op; | |
91 | reserve_op.size = res_size; | |
92 | reserve_op.entries = entries; | |
93 | encode(reserve_op, in); | |
94 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_RESERVE, in, obl, prval); | |
95 | } | |
96 | ||
97 | void cls_2pc_queue_commit(ObjectWriteOperation& op, std::vector<bufferlist> bl_data_vec, | |
98 | cls_2pc_reservation::id_t res_id) { | |
99 | bufferlist in; | |
100 | cls_2pc_queue_commit_op commit_op; | |
101 | commit_op.id = res_id; | |
102 | commit_op.bl_data_vec = std::move(bl_data_vec); | |
103 | encode(commit_op, in); | |
104 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_COMMIT, in); | |
105 | } | |
106 | ||
107 | void cls_2pc_queue_abort(ObjectWriteOperation& op, cls_2pc_reservation::id_t res_id) { | |
108 | bufferlist in; | |
109 | cls_2pc_queue_abort_op abort_op; | |
110 | abort_op.id = res_id; | |
111 | encode(abort_op, in); | |
112 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_ABORT, in); | |
113 | } | |
114 | ||
115 | int cls_2pc_queue_list_entries_result(const bufferlist& bl, std::vector<cls_queue_entry>& entries, | |
116 | bool *truncated, std::string& next_marker) { | |
117 | cls_queue_list_ret ret; | |
118 | auto iter = bl.cbegin(); | |
119 | try { | |
120 | decode(ret, iter); | |
121 | } catch (buffer::error& err) { | |
122 | return -EIO; | |
123 | } | |
124 | ||
125 | entries = std::move(ret.entries); | |
126 | *truncated = ret.is_truncated; | |
127 | ||
128 | next_marker = std::move(ret.next_marker); | |
129 | ||
130 | return 0; | |
131 | } | |
132 | ||
133 | #ifndef CLS_CLIENT_HIDE_IOCTX | |
20effc67 TL |
134 | int cls_2pc_queue_list_entries(IoCtx& io_ctx, |
135 | const std::string& queue_name, | |
136 | const std::string& marker, uint32_t max, | |
137 | std::vector<cls_queue_entry>& entries, | |
138 | bool *truncated, std::string& next_marker) { | |
f67539c2 TL |
139 | bufferlist in, out; |
140 | cls_queue_list_op op; | |
141 | op.start_marker = marker; | |
142 | op.max = max; | |
143 | encode(op, in); | |
144 | ||
145 | const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, out); | |
146 | if (r < 0) { | |
147 | return r; | |
148 | } | |
149 | return cls_2pc_queue_list_entries_result(out, entries, truncated, next_marker); | |
150 | } | |
151 | #endif | |
152 | ||
153 | void cls_2pc_queue_list_entries(ObjectReadOperation& op, const std::string& marker, uint32_t max, bufferlist* obl, int* prval) { | |
154 | bufferlist in; | |
155 | cls_queue_list_op list_op; | |
156 | list_op.start_marker = marker; | |
157 | list_op.max = max; | |
158 | encode(list_op, in); | |
159 | ||
160 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_ENTRIES, in, obl, prval); | |
161 | } | |
162 | ||
163 | int cls_2pc_queue_list_reservations_result(const bufferlist& bl, cls_2pc_reservations& reservations) { | |
164 | cls_2pc_queue_reservations_ret ret; | |
165 | auto iter = bl.cbegin(); | |
166 | try { | |
167 | decode(ret, iter); | |
168 | } catch (buffer::error& err) { | |
169 | return -EIO; | |
170 | } | |
171 | ||
172 | reservations = std::move(ret.reservations); | |
173 | ||
174 | return 0; | |
175 | } | |
176 | ||
177 | #ifndef CLS_CLIENT_HIDE_IOCTX | |
178 | int cls_2pc_queue_list_reservations(IoCtx& io_ctx, const std::string& queue_name, cls_2pc_reservations& reservations) { | |
179 | bufferlist in, out; | |
180 | ||
181 | const auto r = io_ctx.exec(queue_name, TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, out); | |
182 | if (r < 0) { | |
183 | return r; | |
184 | } | |
185 | return cls_2pc_queue_list_reservations_result(out, reservations); | |
186 | } | |
187 | #endif | |
188 | ||
189 | void cls_2pc_queue_list_reservations(ObjectReadOperation& op, bufferlist* obl, int* prval) { | |
190 | bufferlist in; | |
191 | ||
192 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_LIST_RESERVATIONS, in, obl, prval); | |
193 | } | |
194 | ||
195 | void cls_2pc_queue_remove_entries(ObjectWriteOperation& op, const std::string& end_marker) { | |
196 | bufferlist in; | |
197 | cls_queue_remove_op rem_op; | |
198 | rem_op.end_marker = end_marker; | |
199 | encode(rem_op, in); | |
200 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_REMOVE_ENTRIES, in); | |
201 | } | |
202 | ||
203 | void cls_2pc_queue_expire_reservations(librados::ObjectWriteOperation& op, ceph::coarse_real_time stale_time) { | |
204 | bufferlist in; | |
205 | cls_2pc_queue_expire_op expire_op; | |
206 | expire_op.stale_time = stale_time; | |
207 | encode(expire_op, in); | |
208 | op.exec(TPC_QUEUE_CLASS, TPC_QUEUE_EXPIRE_RESERVATIONS, in); | |
209 | } | |
210 |