]>
Commit | Line | Data |
---|---|---|
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- | |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <errno.h> | |
5 | ||
6 | #include "cls/rgw/cls_rgw_const.h" | |
7 | #include "cls/rgw/cls_rgw_client.h" | |
8 | ||
9 | #include "common/debug.h" | |
10 | ||
11 | using std::list; | |
12 | using std::map; | |
13 | using std::pair; | |
14 | using std::string; | |
15 | using std::vector; | |
16 | ||
17 | using ceph::real_time; | |
18 | ||
19 | using namespace librados; | |
20 | ||
21 | const string BucketIndexShardsManager::KEY_VALUE_SEPARATOR = "#"; | |
22 | const string BucketIndexShardsManager::SHARDS_SEPARATOR = ","; | |
23 | ||
24 | ||
25 | int CLSRGWConcurrentIO::operator()() { | |
26 | int ret = 0; | |
27 | iter = objs_container.begin(); | |
28 | for (; iter != objs_container.end() && max_aio-- > 0; ++iter) { | |
29 | ret = issue_op(iter->first, iter->second); | |
30 | if (ret < 0) | |
31 | break; | |
32 | } | |
33 | ||
34 | int num_completions = 0, r = 0; | |
35 | std::map<int, std::string> completed_objs; | |
36 | std::map<int, std::string> retry_objs; | |
37 | while (manager.wait_for_completions(valid_ret_code(), &num_completions, &r, | |
38 | need_multiple_rounds() ? &completed_objs : nullptr, | |
39 | !need_multiple_rounds() ? &retry_objs : nullptr)) { | |
40 | if (r >= 0 && ret >= 0) { | |
41 | for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { | |
42 | int issue_ret = issue_op(iter->first, iter->second); | |
43 | if (issue_ret < 0) { | |
44 | ret = issue_ret; | |
45 | break; | |
46 | } | |
47 | } | |
48 | } else if (ret >= 0) { | |
49 | ret = r; | |
50 | } | |
51 | ||
52 | // if we're at the end with this round, see if another round is needed | |
53 | if (iter == objs_container.end()) { | |
54 | if (need_multiple_rounds() && !completed_objs.empty()) { | |
55 | // For those objects which need another round, use them to reset | |
56 | // the container | |
57 | reset_container(completed_objs); | |
58 | iter = objs_container.begin(); | |
59 | } else if (! need_multiple_rounds() && !retry_objs.empty()) { | |
60 | reset_container(retry_objs); | |
61 | iter = objs_container.begin(); | |
62 | } | |
63 | ||
64 | // re-issue ops if container was reset above (i.e., iter != | |
65 | // objs_container.end()); if it was not reset above (i.e., iter | |
66 | // == objs_container.end()) the loop will exit immediately | |
67 | // without iterating | |
68 | for (; num_completions && iter != objs_container.end(); --num_completions, ++iter) { | |
69 | int issue_ret = issue_op(iter->first, iter->second); | |
70 | if (issue_ret < 0) { | |
71 | ret = issue_ret; | |
72 | break; | |
73 | } | |
74 | } | |
75 | } | |
76 | } | |
77 | ||
78 | if (ret < 0) { | |
79 | cleanup(); | |
80 | } | |
81 | return ret; | |
82 | } // CLSRGWConcurrentIO::operator()() | |
83 | ||
84 | ||
85 | /** | |
86 | * This class represents the bucket index object operation callback context. | |
87 | */ | |
88 | template <typename T> | |
89 | class ClsBucketIndexOpCtx : public ObjectOperationCompletion { | |
90 | private: | |
91 | T *data; | |
92 | int *ret_code; | |
93 | public: | |
94 | ClsBucketIndexOpCtx(T* _data, int *_ret_code) : data(_data), ret_code(_ret_code) { ceph_assert(data); } | |
95 | ~ClsBucketIndexOpCtx() override {} | |
96 | void handle_completion(int r, bufferlist& outbl) override { | |
97 | // if successful, or we're asked for a retry, copy result into | |
98 | // destination (*data) | |
99 | if (r >= 0 || r == RGWBIAdvanceAndRetryError) { | |
100 | try { | |
101 | auto iter = outbl.cbegin(); | |
102 | decode((*data), iter); | |
103 | } catch (ceph::buffer::error& err) { | |
104 | r = -EIO; | |
105 | } | |
106 | } | |
107 | if (ret_code) { | |
108 | *ret_code = r; | |
109 | } | |
110 | } | |
111 | }; | |
112 | ||
113 | void BucketIndexAioManager::do_completion(const int request_id) { | |
114 | std::lock_guard l{lock}; | |
115 | ||
116 | auto iter = pendings.find(request_id); | |
117 | ceph_assert(iter != pendings.end()); | |
118 | completions[request_id] = iter->second; | |
119 | pendings.erase(iter); | |
120 | ||
121 | // If the caller needs a list of finished objects, store them | |
122 | // for further processing | |
123 | auto miter = pending_objs.find(request_id); | |
124 | if (miter != pending_objs.end()) { | |
125 | completion_objs.emplace(request_id, miter->second); | |
126 | pending_objs.erase(miter); | |
127 | } | |
128 | ||
129 | cond.notify_all(); | |
130 | } | |
131 | ||
132 | bool BucketIndexAioManager::wait_for_completions(int valid_ret_code, | |
133 | int *num_completions, | |
134 | int *ret_code, | |
135 | std::map<int, std::string> *completed_objs, | |
136 | std::map<int, std::string> *retry_objs) | |
137 | { | |
138 | std::unique_lock locker{lock}; | |
139 | if (pendings.empty() && completions.empty()) { | |
140 | return false; | |
141 | } | |
142 | ||
143 | if (completions.empty()) { | |
144 | // Wait for AIO completion | |
145 | cond.wait(locker); | |
146 | } | |
147 | ||
148 | // Clear the completed AIOs | |
149 | auto iter = completions.begin(); | |
150 | for (; iter != completions.end(); ++iter) { | |
151 | int r = iter->second->get_return_value(); | |
152 | ||
153 | // see if we may need to copy completions or retries | |
154 | if (completed_objs || retry_objs) { | |
155 | auto liter = completion_objs.find(iter->first); | |
156 | if (liter != completion_objs.end()) { | |
157 | if (completed_objs && r == 0) { /* update list of successfully completed objs */ | |
158 | (*completed_objs)[liter->second.shard_id] = liter->second.oid; | |
159 | } | |
160 | ||
161 | if (r == RGWBIAdvanceAndRetryError) { | |
162 | r = 0; | |
163 | if (retry_objs) { | |
164 | (*retry_objs)[liter->second.shard_id] = liter->second.oid; | |
165 | } | |
166 | } | |
167 | } else { | |
168 | // NB: should we log an error here; currently no logging | |
169 | // context to use | |
170 | } | |
171 | } | |
172 | ||
173 | if (ret_code && (r < 0 && r != valid_ret_code)) { | |
174 | (*ret_code) = r; | |
175 | } | |
176 | ||
177 | iter->second->release(); | |
178 | } | |
179 | ||
180 | if (num_completions) { | |
181 | (*num_completions) = completions.size(); | |
182 | } | |
183 | ||
184 | completions.clear(); | |
185 | ||
186 | return true; | |
187 | } | |
188 | ||
189 | // note: currently only called by testing code | |
190 | void cls_rgw_bucket_init_index(ObjectWriteOperation& o) | |
191 | { | |
192 | bufferlist in; | |
193 | o.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in); | |
194 | } | |
195 | ||
196 | static bool issue_bucket_index_init_op(librados::IoCtx& io_ctx, | |
197 | const int shard_id, | |
198 | const string& oid, | |
199 | BucketIndexAioManager *manager) { | |
200 | bufferlist in; | |
201 | librados::ObjectWriteOperation op; | |
202 | op.create(true); | |
203 | op.exec(RGW_CLASS, RGW_BUCKET_INIT_INDEX, in); | |
204 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
205 | } | |
206 | ||
207 | static bool issue_bucket_index_clean_op(librados::IoCtx& io_ctx, | |
208 | const int shard_id, | |
209 | const string& oid, | |
210 | BucketIndexAioManager *manager) { | |
211 | bufferlist in; | |
212 | librados::ObjectWriteOperation op; | |
213 | op.remove(); | |
214 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
215 | } | |
216 | ||
217 | static bool issue_bucket_set_tag_timeout_op(librados::IoCtx& io_ctx, | |
218 | const int shard_id, | |
219 | const string& oid, | |
220 | uint64_t timeout, | |
221 | BucketIndexAioManager *manager) { | |
222 | bufferlist in; | |
223 | rgw_cls_tag_timeout_op call; | |
224 | call.tag_timeout = timeout; | |
225 | encode(call, in); | |
226 | ObjectWriteOperation op; | |
227 | op.exec(RGW_CLASS, RGW_BUCKET_SET_TAG_TIMEOUT, in); | |
228 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
229 | } | |
230 | ||
231 | int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id, const string& oid) | |
232 | { | |
233 | return issue_bucket_index_init_op(io_ctx, shard_id, oid, &manager); | |
234 | } | |
235 | ||
236 | void CLSRGWIssueBucketIndexInit::cleanup() | |
237 | { | |
238 | // Do best effort removal | |
239 | for (auto citer = objs_container.begin(); citer != iter; ++citer) { | |
240 | io_ctx.remove(citer->second); | |
241 | } | |
242 | } | |
243 | ||
244 | int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id, const string& oid) | |
245 | { | |
246 | return issue_bucket_index_clean_op(io_ctx, shard_id, oid, &manager); | |
247 | } | |
248 | ||
249 | int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id, const string& oid) | |
250 | { | |
251 | return issue_bucket_set_tag_timeout_op(io_ctx, shard_id, oid, tag_timeout, &manager); | |
252 | } | |
253 | ||
254 | void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation& o, | |
255 | bool absolute, | |
256 | const map<RGWObjCategory, rgw_bucket_category_stats>& stats) | |
257 | { | |
258 | rgw_cls_bucket_update_stats_op call; | |
259 | call.absolute = absolute; | |
260 | call.stats = stats; | |
261 | bufferlist in; | |
262 | encode(call, in); | |
263 | o.exec(RGW_CLASS, RGW_BUCKET_UPDATE_STATS, in); | |
264 | } | |
265 | ||
266 | void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag, | |
267 | const cls_rgw_obj_key& key, const string& locator, bool log_op, | |
268 | uint16_t bilog_flags, const rgw_zone_set& zones_trace) | |
269 | { | |
270 | rgw_cls_obj_prepare_op call; | |
271 | call.op = op; | |
272 | call.tag = tag; | |
273 | call.key = key; | |
274 | call.locator = locator; | |
275 | call.log_op = log_op; | |
276 | call.bilog_flags = bilog_flags; | |
277 | call.zones_trace = zones_trace; | |
278 | bufferlist in; | |
279 | encode(call, in); | |
280 | o.exec(RGW_CLASS, RGW_BUCKET_PREPARE_OP, in); | |
281 | } | |
282 | ||
283 | void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, RGWModifyOp op, const string& tag, | |
284 | const rgw_bucket_entry_ver& ver, | |
285 | const cls_rgw_obj_key& key, | |
286 | const rgw_bucket_dir_entry_meta& dir_meta, | |
287 | const list<cls_rgw_obj_key> *remove_objs, bool log_op, | |
288 | uint16_t bilog_flags, | |
289 | const rgw_zone_set *zones_trace, | |
290 | const std::string& obj_locator) | |
291 | { | |
292 | ||
293 | bufferlist in; | |
294 | rgw_cls_obj_complete_op call; | |
295 | call.op = op; | |
296 | call.tag = tag; | |
297 | call.key = key; | |
298 | call.ver = ver; | |
299 | call.locator = obj_locator; | |
300 | call.meta = dir_meta; | |
301 | call.log_op = log_op; | |
302 | call.bilog_flags = bilog_flags; | |
303 | if (remove_objs) | |
304 | call.remove_objs = *remove_objs; | |
305 | if (zones_trace) { | |
306 | call.zones_trace = *zones_trace; | |
307 | } | |
308 | encode(call, in); | |
309 | o.exec(RGW_CLASS, RGW_BUCKET_COMPLETE_OP, in); | |
310 | } | |
311 | ||
312 | void cls_rgw_bucket_list_op(librados::ObjectReadOperation& op, | |
313 | const cls_rgw_obj_key& start_obj, | |
314 | const std::string& filter_prefix, | |
315 | const std::string& delimiter, | |
316 | uint32_t num_entries, | |
317 | bool list_versions, | |
318 | rgw_cls_list_ret* result) | |
319 | { | |
320 | bufferlist in; | |
321 | rgw_cls_list_op call; | |
322 | call.start_obj = start_obj; | |
323 | call.filter_prefix = filter_prefix; | |
324 | call.delimiter = delimiter; | |
325 | call.num_entries = num_entries; | |
326 | call.list_versions = list_versions; | |
327 | encode(call, in); | |
328 | ||
329 | op.exec(RGW_CLASS, RGW_BUCKET_LIST, in, | |
330 | new ClsBucketIndexOpCtx<rgw_cls_list_ret>(result, NULL)); | |
331 | } | |
332 | ||
333 | static bool issue_bucket_list_op(librados::IoCtx& io_ctx, | |
334 | const int shard_id, | |
335 | const std::string& oid, | |
336 | const cls_rgw_obj_key& start_obj, | |
337 | const std::string& filter_prefix, | |
338 | const std::string& delimiter, | |
339 | uint32_t num_entries, | |
340 | bool list_versions, | |
341 | BucketIndexAioManager *manager, | |
342 | rgw_cls_list_ret *pdata) | |
343 | { | |
344 | librados::ObjectReadOperation op; | |
345 | cls_rgw_bucket_list_op(op, | |
346 | start_obj, filter_prefix, delimiter, | |
347 | num_entries, list_versions, pdata); | |
348 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
349 | } | |
350 | ||
351 | int CLSRGWIssueBucketList::issue_op(const int shard_id, const string& oid) | |
352 | { | |
353 | // set the marker depending on whether we've already queried this | |
354 | // shard and gotten a RGWBIAdvanceAndRetryError (defined | |
355 | // constant) return value; if we have use the marker in the return | |
356 | // to advance the search, otherwise use the marker passed in by the | |
357 | // caller | |
358 | cls_rgw_obj_key marker; | |
359 | auto iter = result.find(shard_id); | |
360 | if (iter != result.end()) { | |
361 | marker = iter->second.marker; | |
362 | } else { | |
363 | marker = start_obj; | |
364 | } | |
365 | ||
366 | return issue_bucket_list_op(io_ctx, shard_id, oid, | |
367 | marker, filter_prefix, delimiter, | |
368 | num_entries, list_versions, &manager, | |
369 | &result[shard_id]); | |
370 | } | |
371 | ||
372 | ||
373 | void CLSRGWIssueBucketList::reset_container(std::map<int, std::string>& objs) | |
374 | { | |
375 | objs_container.swap(objs); | |
376 | iter = objs_container.begin(); | |
377 | objs.clear(); | |
378 | } | |
379 | ||
380 | ||
381 | void cls_rgw_remove_obj(librados::ObjectWriteOperation& o, list<string>& keep_attr_prefixes) | |
382 | { | |
383 | bufferlist in; | |
384 | rgw_cls_obj_remove_op call; | |
385 | call.keep_attr_prefixes = keep_attr_prefixes; | |
386 | encode(call, in); | |
387 | o.exec(RGW_CLASS, RGW_OBJ_REMOVE, in); | |
388 | } | |
389 | ||
390 | void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation& o, const string& attr) | |
391 | { | |
392 | bufferlist in; | |
393 | rgw_cls_obj_store_pg_ver_op call; | |
394 | call.attr = attr; | |
395 | encode(call, in); | |
396 | o.exec(RGW_CLASS, RGW_OBJ_STORE_PG_VER, in); | |
397 | } | |
398 | ||
399 | void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation& o, const string& prefix, bool fail_if_exist) | |
400 | { | |
401 | bufferlist in; | |
402 | rgw_cls_obj_check_attrs_prefix call; | |
403 | call.check_prefix = prefix; | |
404 | call.fail_if_exist = fail_if_exist; | |
405 | encode(call, in); | |
406 | o.exec(RGW_CLASS, RGW_OBJ_CHECK_ATTRS_PREFIX, in); | |
407 | } | |
408 | ||
409 | void cls_rgw_obj_check_mtime(librados::ObjectOperation& o, const real_time& mtime, bool high_precision_time, RGWCheckMTimeType type) | |
410 | { | |
411 | bufferlist in; | |
412 | rgw_cls_obj_check_mtime call; | |
413 | call.mtime = mtime; | |
414 | call.high_precision_time = high_precision_time; | |
415 | call.type = type; | |
416 | encode(call, in); | |
417 | o.exec(RGW_CLASS, RGW_OBJ_CHECK_MTIME, in); | |
418 | } | |
419 | ||
420 | int cls_rgw_bi_get(librados::IoCtx& io_ctx, const string oid, | |
421 | BIIndexType index_type, const cls_rgw_obj_key& key, | |
422 | rgw_cls_bi_entry *entry) | |
423 | { | |
424 | bufferlist in, out; | |
425 | rgw_cls_bi_get_op call; | |
426 | call.key = key; | |
427 | call.type = index_type; | |
428 | encode(call, in); | |
429 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_GET, in, out); | |
430 | if (r < 0) | |
431 | return r; | |
432 | ||
433 | rgw_cls_bi_get_ret op_ret; | |
434 | auto iter = out.cbegin(); | |
435 | try { | |
436 | decode(op_ret, iter); | |
437 | } catch (ceph::buffer::error& err) { | |
438 | return -EIO; | |
439 | } | |
440 | ||
441 | *entry = op_ret.entry; | |
442 | ||
443 | return 0; | |
444 | } | |
445 | ||
446 | int cls_rgw_bi_put(librados::IoCtx& io_ctx, const string oid, const rgw_cls_bi_entry& entry) | |
447 | { | |
448 | bufferlist in, out; | |
449 | rgw_cls_bi_put_op call; | |
450 | call.entry = entry; | |
451 | encode(call, in); | |
452 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_PUT, in, out); | |
453 | if (r < 0) | |
454 | return r; | |
455 | ||
456 | return 0; | |
457 | } | |
458 | ||
459 | void cls_rgw_bi_put(ObjectWriteOperation& op, const string oid, const rgw_cls_bi_entry& entry) | |
460 | { | |
461 | bufferlist in, out; | |
462 | rgw_cls_bi_put_op call; | |
463 | call.entry = entry; | |
464 | encode(call, in); | |
465 | op.exec(RGW_CLASS, RGW_BI_PUT, in); | |
466 | } | |
467 | ||
468 | /* nb: any entries passed in are replaced with the results of the cls | |
469 | * call, so caller does not need to clear entries between calls | |
470 | */ | |
471 | int cls_rgw_bi_list(librados::IoCtx& io_ctx, const std::string& oid, | |
472 | const std::string& name_filter, const std::string& marker, uint32_t max, | |
473 | std::list<rgw_cls_bi_entry> *entries, bool *is_truncated) | |
474 | { | |
475 | bufferlist in, out; | |
476 | rgw_cls_bi_list_op call; | |
477 | call.name_filter = name_filter; | |
478 | call.marker = marker; | |
479 | call.max = max; | |
480 | encode(call, in); | |
481 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_BI_LIST, in, out); | |
482 | if (r < 0) | |
483 | return r; | |
484 | ||
485 | rgw_cls_bi_list_ret op_ret; | |
486 | auto iter = out.cbegin(); | |
487 | try { | |
488 | decode(op_ret, iter); | |
489 | } catch (ceph::buffer::error& err) { | |
490 | return -EIO; | |
491 | } | |
492 | ||
493 | entries->swap(op_ret.entries); | |
494 | *is_truncated = op_ret.is_truncated; | |
495 | ||
496 | return 0; | |
497 | } | |
498 | ||
499 | int cls_rgw_bucket_link_olh(librados::IoCtx& io_ctx, const string& oid, | |
500 | const cls_rgw_obj_key& key, const bufferlist& olh_tag, | |
501 | bool delete_marker, const string& op_tag, const rgw_bucket_dir_entry_meta *meta, | |
502 | uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace) | |
503 | { | |
504 | librados::ObjectWriteOperation op; | |
505 | cls_rgw_bucket_link_olh(op, key, olh_tag, delete_marker, op_tag, meta, | |
506 | olh_epoch, unmod_since, high_precision_time, log_op, | |
507 | zones_trace); | |
508 | ||
509 | return io_ctx.operate(oid, &op); | |
510 | } | |
511 | ||
512 | ||
513 | void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& key, | |
514 | const bufferlist& olh_tag, bool delete_marker, | |
515 | const string& op_tag, const rgw_bucket_dir_entry_meta *meta, | |
516 | uint64_t olh_epoch, ceph::real_time unmod_since, bool high_precision_time, bool log_op, const rgw_zone_set& zones_trace) | |
517 | { | |
518 | bufferlist in, out; | |
519 | rgw_cls_link_olh_op call; | |
520 | call.key = key; | |
521 | call.olh_tag = olh_tag.to_str(); | |
522 | call.op_tag = op_tag; | |
523 | call.delete_marker = delete_marker; | |
524 | if (meta) { | |
525 | call.meta = *meta; | |
526 | } | |
527 | call.olh_epoch = olh_epoch; | |
528 | call.log_op = log_op; | |
529 | call.unmod_since = unmod_since; | |
530 | call.high_precision_time = high_precision_time; | |
531 | call.zones_trace = zones_trace; | |
532 | encode(call, in); | |
533 | op.exec(RGW_CLASS, RGW_BUCKET_LINK_OLH, in); | |
534 | } | |
535 | ||
536 | int cls_rgw_bucket_unlink_instance(librados::IoCtx& io_ctx, const string& oid, | |
537 | const cls_rgw_obj_key& key, const string& op_tag, | |
538 | const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace) | |
539 | { | |
540 | librados::ObjectWriteOperation op; | |
541 | cls_rgw_bucket_unlink_instance(op, key, op_tag, olh_tag, olh_epoch, log_op, zones_trace); | |
542 | int r = io_ctx.operate(oid, &op); | |
543 | if (r < 0) | |
544 | return r; | |
545 | ||
546 | return 0; | |
547 | } | |
548 | ||
549 | void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation& op, | |
550 | const cls_rgw_obj_key& key, const string& op_tag, | |
551 | const string& olh_tag, uint64_t olh_epoch, bool log_op, const rgw_zone_set& zones_trace) | |
552 | { | |
553 | bufferlist in, out; | |
554 | rgw_cls_unlink_instance_op call; | |
555 | call.key = key; | |
556 | call.op_tag = op_tag; | |
557 | call.olh_epoch = olh_epoch; | |
558 | call.olh_tag = olh_tag; | |
559 | call.log_op = log_op; | |
560 | call.zones_trace = zones_trace; | |
561 | encode(call, in); | |
562 | op.exec(RGW_CLASS, RGW_BUCKET_UNLINK_INSTANCE, in); | |
563 | } | |
564 | ||
565 | void cls_rgw_get_olh_log(librados::ObjectReadOperation& op, const cls_rgw_obj_key& olh, uint64_t ver_marker, const string& olh_tag, rgw_cls_read_olh_log_ret& log_ret, int& op_ret) | |
566 | { | |
567 | bufferlist in; | |
568 | rgw_cls_read_olh_log_op call; | |
569 | call.olh = olh; | |
570 | call.ver_marker = ver_marker; | |
571 | call.olh_tag = olh_tag; | |
572 | encode(call, in); | |
573 | op.exec(RGW_CLASS, RGW_BUCKET_READ_OLH_LOG, in, new ClsBucketIndexOpCtx<rgw_cls_read_olh_log_ret>(&log_ret, &op_ret)); | |
574 | } | |
575 | ||
576 | int cls_rgw_get_olh_log(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, uint64_t ver_marker, | |
577 | const string& olh_tag, | |
578 | rgw_cls_read_olh_log_ret& log_ret) | |
579 | { | |
580 | int op_ret = 0; | |
581 | librados::ObjectReadOperation op; | |
582 | cls_rgw_get_olh_log(op, olh, ver_marker, olh_tag, log_ret, op_ret); | |
583 | int r = io_ctx.operate(oid, &op, NULL); | |
584 | if (r < 0) { | |
585 | return r; | |
586 | } | |
587 | if (op_ret < 0) { | |
588 | return op_ret; | |
589 | } | |
590 | ||
591 | return r; | |
592 | } | |
593 | ||
594 | void cls_rgw_trim_olh_log(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, uint64_t ver, const string& olh_tag) | |
595 | { | |
596 | bufferlist in; | |
597 | rgw_cls_trim_olh_log_op call; | |
598 | call.olh = olh; | |
599 | call.ver = ver; | |
600 | call.olh_tag = olh_tag; | |
601 | encode(call, in); | |
602 | op.exec(RGW_CLASS, RGW_BUCKET_TRIM_OLH_LOG, in); | |
603 | } | |
604 | ||
605 | int cls_rgw_clear_olh(IoCtx& io_ctx, string& oid, const cls_rgw_obj_key& olh, const string& olh_tag) | |
606 | { | |
607 | librados::ObjectWriteOperation op; | |
608 | cls_rgw_clear_olh(op, olh, olh_tag); | |
609 | ||
610 | return io_ctx.operate(oid, &op); | |
611 | } | |
612 | ||
613 | void cls_rgw_clear_olh(librados::ObjectWriteOperation& op, const cls_rgw_obj_key& olh, const string& olh_tag) | |
614 | { | |
615 | bufferlist in; | |
616 | rgw_cls_bucket_clear_olh_op call; | |
617 | call.key = olh; | |
618 | call.olh_tag = olh_tag; | |
619 | encode(call, in); | |
620 | op.exec(RGW_CLASS, RGW_BUCKET_CLEAR_OLH, in); | |
621 | } | |
622 | ||
623 | void cls_rgw_bilog_list(librados::ObjectReadOperation& op, | |
624 | const std::string& marker, uint32_t max, | |
625 | cls_rgw_bi_log_list_ret *pdata, int *ret) | |
626 | { | |
627 | cls_rgw_bi_log_list_op call; | |
628 | call.marker = marker; | |
629 | call.max = max; | |
630 | ||
631 | bufferlist in; | |
632 | encode(call, in); | |
633 | op.exec(RGW_CLASS, RGW_BI_LOG_LIST, in, new ClsBucketIndexOpCtx<cls_rgw_bi_log_list_ret>(pdata, ret)); | |
634 | } | |
635 | ||
636 | static bool issue_bi_log_list_op(librados::IoCtx& io_ctx, const string& oid, const int shard_id, | |
637 | BucketIndexShardsManager& marker_mgr, uint32_t max, | |
638 | BucketIndexAioManager *manager, | |
639 | cls_rgw_bi_log_list_ret *pdata) | |
640 | { | |
641 | librados::ObjectReadOperation op; | |
642 | cls_rgw_bilog_list(op, marker_mgr.get(shard_id, ""), max, pdata, nullptr); | |
643 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
644 | } | |
645 | ||
646 | int CLSRGWIssueBILogList::issue_op(const int shard_id, const string& oid) | |
647 | { | |
648 | return issue_bi_log_list_op(io_ctx, oid, shard_id, marker_mgr, max, &manager, &result[shard_id]); | |
649 | } | |
650 | ||
651 | void cls_rgw_bilog_trim(librados::ObjectWriteOperation& op, | |
652 | const std::string& start_marker, | |
653 | const std::string& end_marker) | |
654 | { | |
655 | cls_rgw_bi_log_trim_op call; | |
656 | call.start_marker = start_marker; | |
657 | call.end_marker = end_marker; | |
658 | ||
659 | bufferlist in; | |
660 | encode(call, in); | |
661 | op.exec(RGW_CLASS, RGW_BI_LOG_TRIM, in); | |
662 | } | |
663 | ||
664 | static bool issue_bi_log_trim(librados::IoCtx& io_ctx, const string& oid, const int shard_id, | |
665 | BucketIndexShardsManager& start_marker_mgr, | |
666 | BucketIndexShardsManager& end_marker_mgr, BucketIndexAioManager *manager) { | |
667 | cls_rgw_bi_log_trim_op call; | |
668 | librados::ObjectWriteOperation op; | |
669 | cls_rgw_bilog_trim(op, start_marker_mgr.get(shard_id, ""), | |
670 | end_marker_mgr.get(shard_id, "")); | |
671 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
672 | } | |
673 | ||
674 | int CLSRGWIssueBILogTrim::issue_op(const int shard_id, const string& oid) | |
675 | { | |
676 | return issue_bi_log_trim(io_ctx, oid, shard_id, start_marker_mgr, end_marker_mgr, &manager); | |
677 | } | |
678 | ||
679 | static bool issue_bucket_check_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager, | |
680 | rgw_cls_check_index_ret *pdata) { | |
681 | bufferlist in; | |
682 | librados::ObjectReadOperation op; | |
683 | op.exec(RGW_CLASS, RGW_BUCKET_CHECK_INDEX, in, new ClsBucketIndexOpCtx<rgw_cls_check_index_ret>( | |
684 | pdata, NULL)); | |
685 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
686 | } | |
687 | ||
688 | int CLSRGWIssueBucketCheck::issue_op(int shard_id, const string& oid) | |
689 | { | |
690 | return issue_bucket_check_index_op(io_ctx, shard_id, oid, &manager, &result[shard_id]); | |
691 | } | |
692 | ||
693 | static bool issue_bucket_rebuild_index_op(IoCtx& io_ctx, const int shard_id, const string& oid, | |
694 | BucketIndexAioManager *manager) { | |
695 | bufferlist in; | |
696 | librados::ObjectWriteOperation op; | |
697 | op.exec(RGW_CLASS, RGW_BUCKET_REBUILD_INDEX, in); | |
698 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
699 | } | |
700 | ||
701 | int CLSRGWIssueBucketRebuild::issue_op(const int shard_id, const string& oid) | |
702 | { | |
703 | return issue_bucket_rebuild_index_op(io_ctx, shard_id, oid, &manager); | |
704 | } | |
705 | ||
706 | void cls_rgw_encode_suggestion(char op, rgw_bucket_dir_entry& dirent, bufferlist& updates) | |
707 | { | |
708 | updates.append(op); | |
709 | encode(dirent, updates); | |
710 | } | |
711 | ||
712 | void cls_rgw_suggest_changes(ObjectWriteOperation& o, bufferlist& updates) | |
713 | { | |
714 | o.exec(RGW_CLASS, RGW_DIR_SUGGEST_CHANGES, updates); | |
715 | } | |
716 | ||
717 | int CLSRGWIssueGetDirHeader::issue_op(const int shard_id, const string& oid) | |
718 | { | |
719 | cls_rgw_obj_key empty_key; | |
720 | string empty_prefix; | |
721 | string empty_delimiter; | |
722 | return issue_bucket_list_op(io_ctx, shard_id, oid, | |
723 | empty_key, empty_prefix, empty_delimiter, | |
724 | 0, false, &manager, &result[shard_id]); | |
725 | } | |
726 | ||
727 | static bool issue_resync_bi_log(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) | |
728 | { | |
729 | bufferlist in; | |
730 | librados::ObjectWriteOperation op; | |
731 | op.exec(RGW_CLASS, RGW_BI_LOG_RESYNC, in); | |
732 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
733 | } | |
734 | ||
735 | int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id, const string& oid) | |
736 | { | |
737 | return issue_resync_bi_log(io_ctx, shard_id, oid, &manager); | |
738 | } | |
739 | ||
740 | static bool issue_bi_log_stop(librados::IoCtx& io_ctx, const int shard_id, const string& oid, BucketIndexAioManager *manager) | |
741 | { | |
742 | bufferlist in; | |
743 | librados::ObjectWriteOperation op; | |
744 | op.exec(RGW_CLASS, RGW_BI_LOG_STOP, in); | |
745 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
746 | } | |
747 | ||
748 | int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id, const string& oid) | |
749 | { | |
750 | return issue_bi_log_stop(io_ctx, shard_id, oid, &manager); | |
751 | } | |
752 | ||
753 | class GetDirHeaderCompletion : public ObjectOperationCompletion { | |
754 | boost::intrusive_ptr<RGWGetDirHeader_CB> cb; | |
755 | public: | |
756 | explicit GetDirHeaderCompletion(boost::intrusive_ptr<RGWGetDirHeader_CB> cb) | |
757 | : cb(std::move(cb)) {} | |
758 | ||
759 | void handle_completion(int r, bufferlist& outbl) override { | |
760 | rgw_cls_list_ret ret; | |
761 | try { | |
762 | auto iter = outbl.cbegin(); | |
763 | decode(ret, iter); | |
764 | } catch (ceph::buffer::error& err) { | |
765 | r = -EIO; | |
766 | } | |
767 | cb->handle_response(r, ret.dir.header); | |
768 | } | |
769 | }; | |
770 | ||
771 | int cls_rgw_get_dir_header_async(IoCtx& io_ctx, const string& oid, | |
772 | boost::intrusive_ptr<RGWGetDirHeader_CB> cb) | |
773 | { | |
774 | bufferlist in, out; | |
775 | rgw_cls_list_op call; | |
776 | call.num_entries = 0; | |
777 | encode(call, in); | |
778 | ObjectReadOperation op; | |
779 | op.exec(RGW_CLASS, RGW_BUCKET_LIST, in, | |
780 | new GetDirHeaderCompletion(std::move(cb))); | |
781 | AioCompletion *c = librados::Rados::aio_create_completion(nullptr, nullptr); | |
782 | int r = io_ctx.aio_operate(oid, c, &op, NULL); | |
783 | c->release(); | |
784 | if (r < 0) | |
785 | return r; | |
786 | ||
787 | return 0; | |
788 | } | |
789 | ||
790 | int cls_rgw_usage_log_read(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket, | |
791 | uint64_t start_epoch, uint64_t end_epoch, uint32_t max_entries, | |
792 | string& read_iter, map<rgw_user_bucket, rgw_usage_log_entry>& usage, | |
793 | bool *is_truncated) | |
794 | { | |
795 | if (is_truncated) | |
796 | *is_truncated = false; | |
797 | ||
798 | bufferlist in, out; | |
799 | rgw_cls_usage_log_read_op call; | |
800 | call.start_epoch = start_epoch; | |
801 | call.end_epoch = end_epoch; | |
802 | call.owner = user; | |
803 | call.max_entries = max_entries; | |
804 | call.bucket = bucket; | |
805 | call.iter = read_iter; | |
806 | encode(call, in); | |
807 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_USER_USAGE_LOG_READ, in, out); | |
808 | if (r < 0) | |
809 | return r; | |
810 | ||
811 | try { | |
812 | rgw_cls_usage_log_read_ret result; | |
813 | auto iter = out.cbegin(); | |
814 | decode(result, iter); | |
815 | read_iter = result.next_iter; | |
816 | if (is_truncated) | |
817 | *is_truncated = result.truncated; | |
818 | ||
819 | usage = result.usage; | |
820 | } catch (ceph::buffer::error& e) { | |
821 | return -EINVAL; | |
822 | } | |
823 | ||
824 | return 0; | |
825 | } | |
826 | ||
827 | int cls_rgw_usage_log_trim(IoCtx& io_ctx, const string& oid, const string& user, const string& bucket, | |
828 | uint64_t start_epoch, uint64_t end_epoch) | |
829 | { | |
830 | bufferlist in; | |
831 | rgw_cls_usage_log_trim_op call; | |
832 | call.start_epoch = start_epoch; | |
833 | call.end_epoch = end_epoch; | |
834 | call.user = user; | |
835 | call.bucket = bucket; | |
836 | encode(call, in); | |
837 | ||
838 | bool done = false; | |
839 | do { | |
840 | ObjectWriteOperation op; | |
841 | op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in); | |
842 | int r = io_ctx.operate(oid, &op); | |
843 | if (r == -ENODATA) | |
844 | done = true; | |
845 | else if (r < 0) | |
846 | return r; | |
847 | } while (!done); | |
848 | ||
849 | return 0; | |
850 | } | |
851 | ||
852 | void cls_rgw_usage_log_trim(librados::ObjectWriteOperation& op, const string& user, const string& bucket, uint64_t start_epoch, uint64_t end_epoch) | |
853 | { | |
854 | bufferlist in; | |
855 | rgw_cls_usage_log_trim_op call; | |
856 | call.start_epoch = start_epoch; | |
857 | call.end_epoch = end_epoch; | |
858 | call.user = user; | |
859 | call.bucket = bucket; | |
860 | encode(call, in); | |
861 | ||
862 | op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_TRIM, in); | |
863 | } | |
864 | ||
865 | void cls_rgw_usage_log_clear(ObjectWriteOperation& op) | |
866 | { | |
867 | bufferlist in; | |
868 | op.exec(RGW_CLASS, RGW_USAGE_LOG_CLEAR, in); | |
869 | } | |
870 | ||
871 | void cls_rgw_usage_log_add(ObjectWriteOperation& op, rgw_usage_log_info& info) | |
872 | { | |
873 | bufferlist in; | |
874 | rgw_cls_usage_log_add_op call; | |
875 | call.info = info; | |
876 | encode(call, in); | |
877 | op.exec(RGW_CLASS, RGW_USER_USAGE_LOG_ADD, in); | |
878 | } | |
879 | ||
880 | /* garbage collection */ | |
881 | ||
882 | void cls_rgw_gc_set_entry(ObjectWriteOperation& op, uint32_t expiration_secs, cls_rgw_gc_obj_info& info) | |
883 | { | |
884 | bufferlist in; | |
885 | cls_rgw_gc_set_entry_op call; | |
886 | call.expiration_secs = expiration_secs; | |
887 | call.info = info; | |
888 | encode(call, in); | |
889 | op.exec(RGW_CLASS, RGW_GC_SET_ENTRY, in); | |
890 | } | |
891 | ||
892 | void cls_rgw_gc_defer_entry(ObjectWriteOperation& op, uint32_t expiration_secs, const string& tag) | |
893 | { | |
894 | bufferlist in; | |
895 | cls_rgw_gc_defer_entry_op call; | |
896 | call.expiration_secs = expiration_secs; | |
897 | call.tag = tag; | |
898 | encode(call, in); | |
899 | op.exec(RGW_CLASS, RGW_GC_DEFER_ENTRY, in); | |
900 | } | |
901 | ||
902 | int cls_rgw_gc_list(IoCtx& io_ctx, string& oid, string& marker, uint32_t max, bool expired_only, | |
903 | list<cls_rgw_gc_obj_info>& entries, bool *truncated, string& next_marker) | |
904 | { | |
905 | bufferlist in, out; | |
906 | cls_rgw_gc_list_op call; | |
907 | call.marker = marker; | |
908 | call.max = max; | |
909 | call.expired_only = expired_only; | |
910 | encode(call, in); | |
911 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_GC_LIST, in, out); | |
912 | if (r < 0) | |
913 | return r; | |
914 | ||
915 | cls_rgw_gc_list_ret ret; | |
916 | try { | |
917 | auto iter = out.cbegin(); | |
918 | decode(ret, iter); | |
919 | } catch (ceph::buffer::error& err) { | |
920 | return -EIO; | |
921 | } | |
922 | ||
923 | entries.swap(ret.entries); | |
924 | ||
925 | if (truncated) | |
926 | *truncated = ret.truncated; | |
927 | next_marker = std::move(ret.next_marker); | |
928 | return r; | |
929 | } | |
930 | ||
931 | void cls_rgw_gc_remove(librados::ObjectWriteOperation& op, const vector<string>& tags) | |
932 | { | |
933 | bufferlist in; | |
934 | cls_rgw_gc_remove_op call; | |
935 | call.tags = tags; | |
936 | encode(call, in); | |
937 | op.exec(RGW_CLASS, RGW_GC_REMOVE, in); | |
938 | } | |
939 | ||
940 | int cls_rgw_lc_get_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head) | |
941 | { | |
942 | bufferlist in, out; | |
943 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_HEAD, in, out); | |
944 | if (r < 0) | |
945 | return r; | |
946 | ||
947 | cls_rgw_lc_get_head_ret ret; | |
948 | try { | |
949 | auto iter = out.cbegin(); | |
950 | decode(ret, iter); | |
951 | } catch (ceph::buffer::error& err) { | |
952 | return -EIO; | |
953 | } | |
954 | head = ret.head; | |
955 | ||
956 | return r; | |
957 | } | |
958 | ||
959 | int cls_rgw_lc_put_head(IoCtx& io_ctx, const string& oid, cls_rgw_lc_obj_head& head) | |
960 | { | |
961 | bufferlist in, out; | |
962 | cls_rgw_lc_put_head_op call; | |
963 | call.head = head; | |
964 | encode(call, in); | |
965 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_PUT_HEAD, in, out); | |
966 | return r; | |
967 | } | |
968 | ||
969 | int cls_rgw_lc_get_next_entry(IoCtx& io_ctx, const string& oid, const string& marker, | |
970 | cls_rgw_lc_entry& entry) | |
971 | { | |
972 | bufferlist in, out; | |
973 | cls_rgw_lc_get_next_entry_op call; | |
974 | call.marker = marker; | |
975 | encode(call, in); | |
976 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_NEXT_ENTRY, in, out); | |
977 | if (r < 0) | |
978 | return r; | |
979 | ||
980 | cls_rgw_lc_get_next_entry_ret ret; | |
981 | try { | |
982 | auto iter = out.cbegin(); | |
983 | decode(ret, iter); | |
984 | } catch (ceph::buffer::error& err) { | |
985 | return -EIO; | |
986 | } | |
987 | entry = ret.entry; | |
988 | ||
989 | return r; | |
990 | } | |
991 | ||
992 | int cls_rgw_lc_rm_entry(IoCtx& io_ctx, const string& oid, | |
993 | const cls_rgw_lc_entry& entry) | |
994 | { | |
995 | bufferlist in, out; | |
996 | cls_rgw_lc_rm_entry_op call; | |
997 | call.entry = entry; | |
998 | encode(call, in); | |
999 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_RM_ENTRY, in, out); | |
1000 | return r; | |
1001 | } | |
1002 | ||
1003 | int cls_rgw_lc_set_entry(IoCtx& io_ctx, const string& oid, | |
1004 | const cls_rgw_lc_entry& entry) | |
1005 | { | |
1006 | bufferlist in, out; | |
1007 | cls_rgw_lc_set_entry_op call; | |
1008 | call.entry = entry; | |
1009 | encode(call, in); | |
1010 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_SET_ENTRY, in, out); | |
1011 | return r; | |
1012 | } | |
1013 | ||
1014 | int cls_rgw_lc_get_entry(IoCtx& io_ctx, const string& oid, | |
1015 | const std::string& marker, cls_rgw_lc_entry& entry) | |
1016 | { | |
1017 | bufferlist in, out; | |
1018 | cls_rgw_lc_get_entry_op call{marker};; | |
1019 | encode(call, in); | |
1020 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_GET_ENTRY, in, out); | |
1021 | ||
1022 | if (r < 0) { | |
1023 | return r; | |
1024 | } | |
1025 | ||
1026 | cls_rgw_lc_get_entry_ret ret; | |
1027 | try { | |
1028 | auto iter = out.cbegin(); | |
1029 | decode(ret, iter); | |
1030 | } catch (ceph::buffer::error& err) { | |
1031 | return -EIO; | |
1032 | } | |
1033 | ||
1034 | entry = std::move(ret.entry); | |
1035 | return r; | |
1036 | } | |
1037 | ||
1038 | int cls_rgw_lc_list(IoCtx& io_ctx, const string& oid, | |
1039 | const string& marker, | |
1040 | uint32_t max_entries, | |
1041 | vector<cls_rgw_lc_entry>& entries) | |
1042 | { | |
1043 | bufferlist in, out; | |
1044 | cls_rgw_lc_list_entries_op op; | |
1045 | ||
1046 | entries.clear(); | |
1047 | ||
1048 | op.marker = marker; | |
1049 | op.max_entries = max_entries; | |
1050 | ||
1051 | encode(op, in); | |
1052 | ||
1053 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_LC_LIST_ENTRIES, in, out); | |
1054 | if (r < 0) | |
1055 | return r; | |
1056 | ||
1057 | cls_rgw_lc_list_entries_ret ret; | |
1058 | try { | |
1059 | auto iter = out.cbegin(); | |
1060 | decode(ret, iter); | |
1061 | } catch (ceph::buffer::error& err) { | |
1062 | return -EIO; | |
1063 | } | |
1064 | ||
1065 | std::sort(std::begin(ret.entries), std::end(ret.entries), | |
1066 | [](const cls_rgw_lc_entry& a, const cls_rgw_lc_entry& b) | |
1067 | { return a.bucket < b.bucket; }); | |
1068 | entries = std::move(ret.entries); | |
1069 | return r; | |
1070 | } | |
1071 | ||
1072 | void cls_rgw_mp_upload_part_info_update(librados::ObjectWriteOperation& op, | |
1073 | const std::string& part_key, | |
1074 | const RGWUploadPartInfo& info) | |
1075 | { | |
1076 | cls_rgw_mp_upload_part_info_update_op call; | |
1077 | call.part_key = part_key; | |
1078 | call.info = info; | |
1079 | ||
1080 | buffer::list in; | |
1081 | encode(call, in); | |
1082 | ||
1083 | op.exec(RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, in); | |
1084 | } | |
1085 | ||
1086 | void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry) | |
1087 | { | |
1088 | bufferlist in; | |
1089 | cls_rgw_reshard_add_op call; | |
1090 | call.entry = entry; | |
1091 | encode(call, in); | |
1092 | op.exec(RGW_CLASS, RGW_RESHARD_ADD, in); | |
1093 | } | |
1094 | ||
1095 | int cls_rgw_reshard_list(librados::IoCtx& io_ctx, const string& oid, string& marker, uint32_t max, | |
1096 | list<cls_rgw_reshard_entry>& entries, bool* is_truncated) | |
1097 | { | |
1098 | bufferlist in, out; | |
1099 | cls_rgw_reshard_list_op call; | |
1100 | call.marker = marker; | |
1101 | call.max = max; | |
1102 | encode(call, in); | |
1103 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_LIST, in, out); | |
1104 | if (r < 0) | |
1105 | return r; | |
1106 | ||
1107 | cls_rgw_reshard_list_ret op_ret; | |
1108 | auto iter = out.cbegin(); | |
1109 | try { | |
1110 | decode(op_ret, iter); | |
1111 | } catch (ceph::buffer::error& err) { | |
1112 | return -EIO; | |
1113 | } | |
1114 | ||
1115 | entries.swap(op_ret.entries); | |
1116 | *is_truncated = op_ret.is_truncated; | |
1117 | ||
1118 | return 0; | |
1119 | } | |
1120 | ||
1121 | int cls_rgw_reshard_get(librados::IoCtx& io_ctx, const string& oid, cls_rgw_reshard_entry& entry) | |
1122 | { | |
1123 | bufferlist in, out; | |
1124 | cls_rgw_reshard_get_op call; | |
1125 | call.entry = entry; | |
1126 | encode(call, in); | |
1127 | int r = io_ctx.exec(oid, RGW_CLASS, RGW_RESHARD_GET, in, out); | |
1128 | if (r < 0) | |
1129 | return r; | |
1130 | ||
1131 | cls_rgw_reshard_get_ret op_ret; | |
1132 | auto iter = out.cbegin(); | |
1133 | try { | |
1134 | decode(op_ret, iter); | |
1135 | } catch (ceph::buffer::error& err) { | |
1136 | return -EIO; | |
1137 | } | |
1138 | ||
1139 | entry = op_ret.entry; | |
1140 | ||
1141 | return 0; | |
1142 | } | |
1143 | ||
1144 | void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry) | |
1145 | { | |
1146 | bufferlist in; | |
1147 | cls_rgw_reshard_remove_op call; | |
1148 | call.tenant = entry.tenant; | |
1149 | call.bucket_name = entry.bucket_name; | |
1150 | call.bucket_id = entry.bucket_id; | |
1151 | encode(call, in); | |
1152 | op.exec(RGW_CLASS, RGW_RESHARD_REMOVE, in); | |
1153 | } | |
1154 | ||
1155 | int cls_rgw_set_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, | |
1156 | const cls_rgw_bucket_instance_entry& entry) | |
1157 | { | |
1158 | bufferlist in, out; | |
1159 | cls_rgw_set_bucket_resharding_op call; | |
1160 | call.entry = entry; | |
1161 | encode(call, in); | |
1162 | return io_ctx.exec(oid, RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in, out); | |
1163 | } | |
1164 | ||
1165 | int cls_rgw_clear_bucket_resharding(librados::IoCtx& io_ctx, const string& oid) | |
1166 | { | |
1167 | bufferlist in, out; | |
1168 | cls_rgw_clear_bucket_resharding_op call; | |
1169 | encode(call, in); | |
1170 | return io_ctx.exec(oid, RGW_CLASS, RGW_CLEAR_BUCKET_RESHARDING, in, out); | |
1171 | } | |
1172 | ||
1173 | int cls_rgw_get_bucket_resharding(librados::IoCtx& io_ctx, const string& oid, | |
1174 | cls_rgw_bucket_instance_entry *entry) | |
1175 | { | |
1176 | bufferlist in, out; | |
1177 | cls_rgw_get_bucket_resharding_op call; | |
1178 | encode(call, in); | |
1179 | int r= io_ctx.exec(oid, RGW_CLASS, RGW_GET_BUCKET_RESHARDING, in, out); | |
1180 | if (r < 0) | |
1181 | return r; | |
1182 | ||
1183 | cls_rgw_get_bucket_resharding_ret op_ret; | |
1184 | auto iter = out.cbegin(); | |
1185 | try { | |
1186 | decode(op_ret, iter); | |
1187 | } catch (ceph::buffer::error& err) { | |
1188 | return -EIO; | |
1189 | } | |
1190 | ||
1191 | *entry = op_ret.new_instance; | |
1192 | ||
1193 | return 0; | |
1194 | } | |
1195 | ||
1196 | void cls_rgw_guard_bucket_resharding(librados::ObjectOperation& op, int ret_err) | |
1197 | { | |
1198 | bufferlist in, out; | |
1199 | cls_rgw_guard_bucket_resharding_op call; | |
1200 | call.ret_err = ret_err; | |
1201 | encode(call, in); | |
1202 | op.exec(RGW_CLASS, RGW_GUARD_BUCKET_RESHARDING, in); | |
1203 | } | |
1204 | ||
1205 | static bool issue_set_bucket_resharding(librados::IoCtx& io_ctx, | |
1206 | const int shard_id, const string& oid, | |
1207 | const cls_rgw_bucket_instance_entry& entry, | |
1208 | BucketIndexAioManager *manager) { | |
1209 | bufferlist in; | |
1210 | cls_rgw_set_bucket_resharding_op call; | |
1211 | call.entry = entry; | |
1212 | encode(call, in); | |
1213 | librados::ObjectWriteOperation op; | |
1214 | op.assert_exists(); // the shard must exist; if not fail rather than recreate | |
1215 | op.exec(RGW_CLASS, RGW_SET_BUCKET_RESHARDING, in); | |
1216 | return manager->aio_operate(io_ctx, shard_id, oid, &op); | |
1217 | } | |
1218 | ||
1219 | int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id, const string& oid) | |
1220 | { | |
1221 | return issue_set_bucket_resharding(io_ctx, shard_id, oid, entry, &manager); | |
1222 | } |