1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include "cls/rgw/cls_rgw_const.h"
7 #include "cls/rgw/cls_rgw_client.h"
9 #include "common/debug.h"
17 using ceph::real_time
;
19 using namespace librados
;
21 const string
BucketIndexShardsManager::KEY_VALUE_SEPARATOR
= "#";
22 const string
BucketIndexShardsManager::SHARDS_SEPARATOR
= ",";
25 int CLSRGWConcurrentIO::operator()() {
27 iter
= objs_container
.begin();
28 for (; iter
!= objs_container
.end() && max_aio
-- > 0; ++iter
) {
29 ret
= issue_op(iter
->first
, iter
->second
);
34 int num_completions
= 0, r
= 0;
35 std::map
<int, std::string
> completed_objs
;
36 std::map
<int, std::string
> retry_objs
;
37 while (manager
.wait_for_completions(valid_ret_code(), &num_completions
, &r
,
38 need_multiple_rounds() ? &completed_objs
: nullptr,
39 !need_multiple_rounds() ? &retry_objs
: nullptr)) {
40 if (r
>= 0 && ret
>= 0) {
41 for (; num_completions
&& iter
!= objs_container
.end(); --num_completions
, ++iter
) {
42 int issue_ret
= issue_op(iter
->first
, iter
->second
);
48 } else if (ret
>= 0) {
52 // if we're at the end with this round, see if another round is needed
53 if (iter
== objs_container
.end()) {
54 if (need_multiple_rounds() && !completed_objs
.empty()) {
55 // For those objects which need another round, use them to reset
57 reset_container(completed_objs
);
58 iter
= objs_container
.begin();
59 } else if (! need_multiple_rounds() && !retry_objs
.empty()) {
60 reset_container(retry_objs
);
61 iter
= objs_container
.begin();
64 // re-issue ops if container was reset above (i.e., iter !=
65 // objs_container.end()); if it was not reset above (i.e., iter
66 // == objs_container.end()) the loop will exit immediately
68 for (; num_completions
&& iter
!= objs_container
.end(); --num_completions
, ++iter
) {
69 int issue_ret
= issue_op(iter
->first
, iter
->second
);
82 } // CLSRGWConcurrintIO::operator()()
86 * This class represents the bucket index object operation callback context.
89 class ClsBucketIndexOpCtx
: public ObjectOperationCompletion
{
94 ClsBucketIndexOpCtx(T
* _data
, int *_ret_code
) : data(_data
), ret_code(_ret_code
) { ceph_assert(data
); }
95 ~ClsBucketIndexOpCtx() override
{}
96 void handle_completion(int r
, bufferlist
& outbl
) override
{
97 // if successful, or we're asked for a retry, copy result into
98 // destination (*data)
99 if (r
>= 0 || r
== RGWBIAdvanceAndRetryError
) {
101 auto iter
= outbl
.cbegin();
102 decode((*data
), iter
);
103 } catch (ceph::buffer::error
& err
) {
113 void BucketIndexAioManager::do_completion(const int request_id
) {
114 std::lock_guard l
{lock
};
116 auto iter
= pendings
.find(request_id
);
117 ceph_assert(iter
!= pendings
.end());
118 completions
[request_id
] = iter
->second
;
119 pendings
.erase(iter
);
121 // If the caller needs a list of finished objects, store them
122 // for further processing
123 auto miter
= pending_objs
.find(request_id
);
124 if (miter
!= pending_objs
.end()) {
125 completion_objs
.emplace(request_id
, miter
->second
);
126 pending_objs
.erase(miter
);
132 bool BucketIndexAioManager::wait_for_completions(int valid_ret_code
,
133 int *num_completions
,
135 std::map
<int, std::string
> *completed_objs
,
136 std::map
<int, std::string
> *retry_objs
)
138 std::unique_lock locker
{lock
};
139 if (pendings
.empty() && completions
.empty()) {
143 if (completions
.empty()) {
144 // Wait for AIO completion
148 // Clear the completed AIOs
149 auto iter
= completions
.begin();
150 for (; iter
!= completions
.end(); ++iter
) {
151 int r
= iter
->second
->get_return_value();
153 // see if we may need to copy completions or retries
154 if (completed_objs
|| retry_objs
) {
155 auto liter
= completion_objs
.find(iter
->first
);
156 if (liter
!= completion_objs
.end()) {
157 if (completed_objs
&& r
== 0) { /* update list of successfully completed objs */
158 (*completed_objs
)[liter
->second
.shard_id
] = liter
->second
.oid
;
161 if (r
== RGWBIAdvanceAndRetryError
) {
164 (*retry_objs
)[liter
->second
.shard_id
] = liter
->second
.oid
;
168 // NB: should we log an error here; currently no logging
173 if (ret_code
&& (r
< 0 && r
!= valid_ret_code
)) {
177 iter
->second
->release();
180 if (num_completions
) {
181 (*num_completions
) = completions
.size();
189 // note: currently only called by tesing code
190 void cls_rgw_bucket_init_index(ObjectWriteOperation
& o
)
193 o
.exec(RGW_CLASS
, RGW_BUCKET_INIT_INDEX
, in
);
196 static bool issue_bucket_index_init_op(librados::IoCtx
& io_ctx
,
199 BucketIndexAioManager
*manager
) {
201 librados::ObjectWriteOperation op
;
203 op
.exec(RGW_CLASS
, RGW_BUCKET_INIT_INDEX
, in
);
204 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
207 static bool issue_bucket_index_clean_op(librados::IoCtx
& io_ctx
,
210 BucketIndexAioManager
*manager
) {
212 librados::ObjectWriteOperation op
;
214 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
217 static bool issue_bucket_set_tag_timeout_op(librados::IoCtx
& io_ctx
,
221 BucketIndexAioManager
*manager
) {
223 rgw_cls_tag_timeout_op call
;
224 call
.tag_timeout
= timeout
;
226 ObjectWriteOperation op
;
227 op
.exec(RGW_CLASS
, RGW_BUCKET_SET_TAG_TIMEOUT
, in
);
228 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
231 int CLSRGWIssueBucketIndexInit::issue_op(const int shard_id
, const string
& oid
)
233 return issue_bucket_index_init_op(io_ctx
, shard_id
, oid
, &manager
);
236 void CLSRGWIssueBucketIndexInit::cleanup()
238 // Do best effort removal
239 for (auto citer
= objs_container
.begin(); citer
!= iter
; ++citer
) {
240 io_ctx
.remove(citer
->second
);
244 int CLSRGWIssueBucketIndexClean::issue_op(const int shard_id
, const string
& oid
)
246 return issue_bucket_index_clean_op(io_ctx
, shard_id
, oid
, &manager
);
249 int CLSRGWIssueSetTagTimeout::issue_op(const int shard_id
, const string
& oid
)
251 return issue_bucket_set_tag_timeout_op(io_ctx
, shard_id
, oid
, tag_timeout
, &manager
);
254 void cls_rgw_bucket_update_stats(librados::ObjectWriteOperation
& o
,
256 const map
<RGWObjCategory
, rgw_bucket_category_stats
>& stats
)
258 rgw_cls_bucket_update_stats_op call
;
259 call
.absolute
= absolute
;
263 o
.exec(RGW_CLASS
, RGW_BUCKET_UPDATE_STATS
, in
);
266 void cls_rgw_bucket_prepare_op(ObjectWriteOperation
& o
, RGWModifyOp op
, const string
& tag
,
267 const cls_rgw_obj_key
& key
, const string
& locator
, bool log_op
,
268 uint16_t bilog_flags
, const rgw_zone_set
& zones_trace
)
270 rgw_cls_obj_prepare_op call
;
274 call
.locator
= locator
;
275 call
.log_op
= log_op
;
276 call
.bilog_flags
= bilog_flags
;
277 call
.zones_trace
= zones_trace
;
280 o
.exec(RGW_CLASS
, RGW_BUCKET_PREPARE_OP
, in
);
283 void cls_rgw_bucket_complete_op(ObjectWriteOperation
& o
, RGWModifyOp op
, const string
& tag
,
284 const rgw_bucket_entry_ver
& ver
,
285 const cls_rgw_obj_key
& key
,
286 const rgw_bucket_dir_entry_meta
& dir_meta
,
287 const list
<cls_rgw_obj_key
> *remove_objs
, bool log_op
,
288 uint16_t bilog_flags
,
289 const rgw_zone_set
*zones_trace
)
293 rgw_cls_obj_complete_op call
;
298 call
.meta
= dir_meta
;
299 call
.log_op
= log_op
;
300 call
.bilog_flags
= bilog_flags
;
302 call
.remove_objs
= *remove_objs
;
304 call
.zones_trace
= *zones_trace
;
307 o
.exec(RGW_CLASS
, RGW_BUCKET_COMPLETE_OP
, in
);
310 void cls_rgw_bucket_list_op(librados::ObjectReadOperation
& op
,
311 const cls_rgw_obj_key
& start_obj
,
312 const std::string
& filter_prefix
,
313 const std::string
& delimiter
,
314 uint32_t num_entries
,
316 rgw_cls_list_ret
* result
)
319 rgw_cls_list_op call
;
320 call
.start_obj
= start_obj
;
321 call
.filter_prefix
= filter_prefix
;
322 call
.delimiter
= delimiter
;
323 call
.num_entries
= num_entries
;
324 call
.list_versions
= list_versions
;
327 op
.exec(RGW_CLASS
, RGW_BUCKET_LIST
, in
,
328 new ClsBucketIndexOpCtx
<rgw_cls_list_ret
>(result
, NULL
));
331 static bool issue_bucket_list_op(librados::IoCtx
& io_ctx
,
333 const std::string
& oid
,
334 const cls_rgw_obj_key
& start_obj
,
335 const std::string
& filter_prefix
,
336 const std::string
& delimiter
,
337 uint32_t num_entries
,
339 BucketIndexAioManager
*manager
,
340 rgw_cls_list_ret
*pdata
)
342 librados::ObjectReadOperation op
;
343 cls_rgw_bucket_list_op(op
,
344 start_obj
, filter_prefix
, delimiter
,
345 num_entries
, list_versions
, pdata
);
346 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
349 int CLSRGWIssueBucketList::issue_op(const int shard_id
, const string
& oid
)
351 // set the marker depending on whether we've already queried this
352 // shard and gotten a RGWBIAdvanceAndRetryError (defined
353 // constant) return value; if we have use the marker in the return
354 // to advance the search, otherwise use the marker passed in by the
356 cls_rgw_obj_key marker
;
357 auto iter
= result
.find(shard_id
);
358 if (iter
!= result
.end()) {
359 marker
= iter
->second
.marker
;
364 return issue_bucket_list_op(io_ctx
, shard_id
, oid
,
365 marker
, filter_prefix
, delimiter
,
366 num_entries
, list_versions
, &manager
,
371 void CLSRGWIssueBucketList::reset_container(std::map
<int, std::string
>& objs
)
373 objs_container
.swap(objs
);
374 iter
= objs_container
.begin();
379 void cls_rgw_remove_obj(librados::ObjectWriteOperation
& o
, list
<string
>& keep_attr_prefixes
)
382 rgw_cls_obj_remove_op call
;
383 call
.keep_attr_prefixes
= keep_attr_prefixes
;
385 o
.exec(RGW_CLASS
, RGW_OBJ_REMOVE
, in
);
388 void cls_rgw_obj_store_pg_ver(librados::ObjectWriteOperation
& o
, const string
& attr
)
391 rgw_cls_obj_store_pg_ver_op call
;
394 o
.exec(RGW_CLASS
, RGW_OBJ_STORE_PG_VER
, in
);
397 void cls_rgw_obj_check_attrs_prefix(librados::ObjectOperation
& o
, const string
& prefix
, bool fail_if_exist
)
400 rgw_cls_obj_check_attrs_prefix call
;
401 call
.check_prefix
= prefix
;
402 call
.fail_if_exist
= fail_if_exist
;
404 o
.exec(RGW_CLASS
, RGW_OBJ_CHECK_ATTRS_PREFIX
, in
);
407 void cls_rgw_obj_check_mtime(librados::ObjectOperation
& o
, const real_time
& mtime
, bool high_precision_time
, RGWCheckMTimeType type
)
410 rgw_cls_obj_check_mtime call
;
412 call
.high_precision_time
= high_precision_time
;
415 o
.exec(RGW_CLASS
, RGW_OBJ_CHECK_MTIME
, in
);
418 int cls_rgw_bi_get(librados::IoCtx
& io_ctx
, const string oid
,
419 BIIndexType index_type
, const cls_rgw_obj_key
& key
,
420 rgw_cls_bi_entry
*entry
)
423 rgw_cls_bi_get_op call
;
425 call
.type
= index_type
;
427 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_BI_GET
, in
, out
);
431 rgw_cls_bi_get_ret op_ret
;
432 auto iter
= out
.cbegin();
434 decode(op_ret
, iter
);
435 } catch (ceph::buffer::error
& err
) {
439 *entry
= op_ret
.entry
;
444 int cls_rgw_bi_put(librados::IoCtx
& io_ctx
, const string oid
, const rgw_cls_bi_entry
& entry
)
447 rgw_cls_bi_put_op call
;
450 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_BI_PUT
, in
, out
);
457 void cls_rgw_bi_put(ObjectWriteOperation
& op
, const string oid
, const rgw_cls_bi_entry
& entry
)
460 rgw_cls_bi_put_op call
;
463 op
.exec(RGW_CLASS
, RGW_BI_PUT
, in
);
466 /* nb: any entries passed in are replaced with the results of the cls
467 * call, so caller does not need to clear entries between calls
469 int cls_rgw_bi_list(librados::IoCtx
& io_ctx
, const std::string
& oid
,
470 const std::string
& name_filter
, const std::string
& marker
, uint32_t max
,
471 std::list
<rgw_cls_bi_entry
> *entries
, bool *is_truncated
)
474 rgw_cls_bi_list_op call
;
475 call
.name_filter
= name_filter
;
476 call
.marker
= marker
;
479 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_BI_LIST
, in
, out
);
483 rgw_cls_bi_list_ret op_ret
;
484 auto iter
= out
.cbegin();
486 decode(op_ret
, iter
);
487 } catch (ceph::buffer::error
& err
) {
491 entries
->swap(op_ret
.entries
);
492 *is_truncated
= op_ret
.is_truncated
;
497 int cls_rgw_bucket_link_olh(librados::IoCtx
& io_ctx
, const string
& oid
,
498 const cls_rgw_obj_key
& key
, const bufferlist
& olh_tag
,
499 bool delete_marker
, const string
& op_tag
, const rgw_bucket_dir_entry_meta
*meta
,
500 uint64_t olh_epoch
, ceph::real_time unmod_since
, bool high_precision_time
, bool log_op
, const rgw_zone_set
& zones_trace
)
502 librados::ObjectWriteOperation op
;
503 cls_rgw_bucket_link_olh(op
, key
, olh_tag
, delete_marker
, op_tag
, meta
,
504 olh_epoch
, unmod_since
, high_precision_time
, log_op
,
507 return io_ctx
.operate(oid
, &op
);
511 void cls_rgw_bucket_link_olh(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& key
,
512 const bufferlist
& olh_tag
, bool delete_marker
,
513 const string
& op_tag
, const rgw_bucket_dir_entry_meta
*meta
,
514 uint64_t olh_epoch
, ceph::real_time unmod_since
, bool high_precision_time
, bool log_op
, const rgw_zone_set
& zones_trace
)
517 rgw_cls_link_olh_op call
;
519 call
.olh_tag
= olh_tag
.to_str();
520 call
.op_tag
= op_tag
;
521 call
.delete_marker
= delete_marker
;
525 call
.olh_epoch
= olh_epoch
;
526 call
.log_op
= log_op
;
527 call
.unmod_since
= unmod_since
;
528 call
.high_precision_time
= high_precision_time
;
529 call
.zones_trace
= zones_trace
;
531 op
.exec(RGW_CLASS
, RGW_BUCKET_LINK_OLH
, in
);
534 int cls_rgw_bucket_unlink_instance(librados::IoCtx
& io_ctx
, const string
& oid
,
535 const cls_rgw_obj_key
& key
, const string
& op_tag
,
536 const string
& olh_tag
, uint64_t olh_epoch
, bool log_op
, const rgw_zone_set
& zones_trace
)
538 librados::ObjectWriteOperation op
;
539 cls_rgw_bucket_unlink_instance(op
, key
, op_tag
, olh_tag
, olh_epoch
, log_op
, zones_trace
);
540 int r
= io_ctx
.operate(oid
, &op
);
547 void cls_rgw_bucket_unlink_instance(librados::ObjectWriteOperation
& op
,
548 const cls_rgw_obj_key
& key
, const string
& op_tag
,
549 const string
& olh_tag
, uint64_t olh_epoch
, bool log_op
, const rgw_zone_set
& zones_trace
)
552 rgw_cls_unlink_instance_op call
;
554 call
.op_tag
= op_tag
;
555 call
.olh_epoch
= olh_epoch
;
556 call
.olh_tag
= olh_tag
;
557 call
.log_op
= log_op
;
558 call
.zones_trace
= zones_trace
;
560 op
.exec(RGW_CLASS
, RGW_BUCKET_UNLINK_INSTANCE
, in
);
563 void cls_rgw_get_olh_log(librados::ObjectReadOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver_marker
, const string
& olh_tag
, rgw_cls_read_olh_log_ret
& log_ret
, int& op_ret
)
566 rgw_cls_read_olh_log_op call
;
568 call
.ver_marker
= ver_marker
;
569 call
.olh_tag
= olh_tag
;
571 op
.exec(RGW_CLASS
, RGW_BUCKET_READ_OLH_LOG
, in
, new ClsBucketIndexOpCtx
<rgw_cls_read_olh_log_ret
>(&log_ret
, &op_ret
));
574 int cls_rgw_get_olh_log(IoCtx
& io_ctx
, string
& oid
, const cls_rgw_obj_key
& olh
, uint64_t ver_marker
,
575 const string
& olh_tag
,
576 rgw_cls_read_olh_log_ret
& log_ret
)
579 librados::ObjectReadOperation op
;
580 cls_rgw_get_olh_log(op
, olh
, ver_marker
, olh_tag
, log_ret
, op_ret
);
581 int r
= io_ctx
.operate(oid
, &op
, NULL
);
592 void cls_rgw_trim_olh_log(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& olh
, uint64_t ver
, const string
& olh_tag
)
595 rgw_cls_trim_olh_log_op call
;
598 call
.olh_tag
= olh_tag
;
600 op
.exec(RGW_CLASS
, RGW_BUCKET_TRIM_OLH_LOG
, in
);
603 int cls_rgw_clear_olh(IoCtx
& io_ctx
, string
& oid
, const cls_rgw_obj_key
& olh
, const string
& olh_tag
)
605 librados::ObjectWriteOperation op
;
606 cls_rgw_clear_olh(op
, olh
, olh_tag
);
608 return io_ctx
.operate(oid
, &op
);
611 void cls_rgw_clear_olh(librados::ObjectWriteOperation
& op
, const cls_rgw_obj_key
& olh
, const string
& olh_tag
)
614 rgw_cls_bucket_clear_olh_op call
;
616 call
.olh_tag
= olh_tag
;
618 op
.exec(RGW_CLASS
, RGW_BUCKET_CLEAR_OLH
, in
);
621 void cls_rgw_bilog_list(librados::ObjectReadOperation
& op
,
622 const std::string
& marker
, uint32_t max
,
623 cls_rgw_bi_log_list_ret
*pdata
, int *ret
)
625 cls_rgw_bi_log_list_op call
;
626 call
.marker
= marker
;
631 op
.exec(RGW_CLASS
, RGW_BI_LOG_LIST
, in
, new ClsBucketIndexOpCtx
<cls_rgw_bi_log_list_ret
>(pdata
, ret
));
634 static bool issue_bi_log_list_op(librados::IoCtx
& io_ctx
, const string
& oid
, const int shard_id
,
635 BucketIndexShardsManager
& marker_mgr
, uint32_t max
,
636 BucketIndexAioManager
*manager
,
637 cls_rgw_bi_log_list_ret
*pdata
)
639 librados::ObjectReadOperation op
;
640 cls_rgw_bilog_list(op
, marker_mgr
.get(shard_id
, ""), max
, pdata
, nullptr);
641 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
644 int CLSRGWIssueBILogList::issue_op(const int shard_id
, const string
& oid
)
646 return issue_bi_log_list_op(io_ctx
, oid
, shard_id
, marker_mgr
, max
, &manager
, &result
[shard_id
]);
649 void cls_rgw_bilog_trim(librados::ObjectWriteOperation
& op
,
650 const std::string
& start_marker
,
651 const std::string
& end_marker
)
653 cls_rgw_bi_log_trim_op call
;
654 call
.start_marker
= start_marker
;
655 call
.end_marker
= end_marker
;
659 op
.exec(RGW_CLASS
, RGW_BI_LOG_TRIM
, in
);
662 static bool issue_bi_log_trim(librados::IoCtx
& io_ctx
, const string
& oid
, const int shard_id
,
663 BucketIndexShardsManager
& start_marker_mgr
,
664 BucketIndexShardsManager
& end_marker_mgr
, BucketIndexAioManager
*manager
) {
665 cls_rgw_bi_log_trim_op call
;
666 librados::ObjectWriteOperation op
;
667 cls_rgw_bilog_trim(op
, start_marker_mgr
.get(shard_id
, ""),
668 end_marker_mgr
.get(shard_id
, ""));
669 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
672 int CLSRGWIssueBILogTrim::issue_op(const int shard_id
, const string
& oid
)
674 return issue_bi_log_trim(io_ctx
, oid
, shard_id
, start_marker_mgr
, end_marker_mgr
, &manager
);
677 static bool issue_bucket_check_index_op(IoCtx
& io_ctx
, const int shard_id
, const string
& oid
, BucketIndexAioManager
*manager
,
678 rgw_cls_check_index_ret
*pdata
) {
680 librados::ObjectReadOperation op
;
681 op
.exec(RGW_CLASS
, RGW_BUCKET_CHECK_INDEX
, in
, new ClsBucketIndexOpCtx
<rgw_cls_check_index_ret
>(
683 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
686 int CLSRGWIssueBucketCheck::issue_op(int shard_id
, const string
& oid
)
688 return issue_bucket_check_index_op(io_ctx
, shard_id
, oid
, &manager
, &result
[shard_id
]);
691 static bool issue_bucket_rebuild_index_op(IoCtx
& io_ctx
, const int shard_id
, const string
& oid
,
692 BucketIndexAioManager
*manager
) {
694 librados::ObjectWriteOperation op
;
695 op
.exec(RGW_CLASS
, RGW_BUCKET_REBUILD_INDEX
, in
);
696 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
699 int CLSRGWIssueBucketRebuild::issue_op(const int shard_id
, const string
& oid
)
701 return issue_bucket_rebuild_index_op(io_ctx
, shard_id
, oid
, &manager
);
704 void cls_rgw_encode_suggestion(char op
, rgw_bucket_dir_entry
& dirent
, bufferlist
& updates
)
707 encode(dirent
, updates
);
710 void cls_rgw_suggest_changes(ObjectWriteOperation
& o
, bufferlist
& updates
)
712 o
.exec(RGW_CLASS
, RGW_DIR_SUGGEST_CHANGES
, updates
);
715 int CLSRGWIssueGetDirHeader::issue_op(const int shard_id
, const string
& oid
)
717 cls_rgw_obj_key empty_key
;
719 string empty_delimiter
;
720 return issue_bucket_list_op(io_ctx
, shard_id
, oid
,
721 empty_key
, empty_prefix
, empty_delimiter
,
722 0, false, &manager
, &result
[shard_id
]);
725 static bool issue_resync_bi_log(librados::IoCtx
& io_ctx
, const int shard_id
, const string
& oid
, BucketIndexAioManager
*manager
)
728 librados::ObjectWriteOperation op
;
729 op
.exec(RGW_CLASS
, RGW_BI_LOG_RESYNC
, in
);
730 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
733 int CLSRGWIssueResyncBucketBILog::issue_op(const int shard_id
, const string
& oid
)
735 return issue_resync_bi_log(io_ctx
, shard_id
, oid
, &manager
);
738 static bool issue_bi_log_stop(librados::IoCtx
& io_ctx
, const int shard_id
, const string
& oid
, BucketIndexAioManager
*manager
)
741 librados::ObjectWriteOperation op
;
742 op
.exec(RGW_CLASS
, RGW_BI_LOG_STOP
, in
);
743 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
746 int CLSRGWIssueBucketBILogStop::issue_op(const int shard_id
, const string
& oid
)
748 return issue_bi_log_stop(io_ctx
, shard_id
, oid
, &manager
);
751 class GetDirHeaderCompletion
: public ObjectOperationCompletion
{
752 RGWGetDirHeader_CB
*ret_ctx
;
754 explicit GetDirHeaderCompletion(RGWGetDirHeader_CB
*_ctx
) : ret_ctx(_ctx
) {}
755 ~GetDirHeaderCompletion() override
{
758 void handle_completion(int r
, bufferlist
& outbl
) override
{
759 rgw_cls_list_ret ret
;
761 auto iter
= outbl
.cbegin();
763 } catch (ceph::buffer::error
& err
) {
767 ret_ctx
->handle_response(r
, ret
.dir
.header
);
771 int cls_rgw_get_dir_header_async(IoCtx
& io_ctx
, string
& oid
, RGWGetDirHeader_CB
*ctx
)
774 rgw_cls_list_op call
;
775 call
.num_entries
= 0;
777 ObjectReadOperation op
;
778 GetDirHeaderCompletion
*cb
= new GetDirHeaderCompletion(ctx
);
779 op
.exec(RGW_CLASS
, RGW_BUCKET_LIST
, in
, cb
);
780 AioCompletion
*c
= librados::Rados::aio_create_completion(nullptr, nullptr);
781 int r
= io_ctx
.aio_operate(oid
, c
, &op
, NULL
);
789 int cls_rgw_usage_log_read(IoCtx
& io_ctx
, const string
& oid
, const string
& user
, const string
& bucket
,
790 uint64_t start_epoch
, uint64_t end_epoch
, uint32_t max_entries
,
791 string
& read_iter
, map
<rgw_user_bucket
, rgw_usage_log_entry
>& usage
,
795 *is_truncated
= false;
798 rgw_cls_usage_log_read_op call
;
799 call
.start_epoch
= start_epoch
;
800 call
.end_epoch
= end_epoch
;
802 call
.max_entries
= max_entries
;
803 call
.bucket
= bucket
;
804 call
.iter
= read_iter
;
806 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_USER_USAGE_LOG_READ
, in
, out
);
811 rgw_cls_usage_log_read_ret result
;
812 auto iter
= out
.cbegin();
813 decode(result
, iter
);
814 read_iter
= result
.next_iter
;
816 *is_truncated
= result
.truncated
;
818 usage
= result
.usage
;
819 } catch (ceph::buffer::error
& e
) {
826 int cls_rgw_usage_log_trim(IoCtx
& io_ctx
, const string
& oid
, const string
& user
, const string
& bucket
,
827 uint64_t start_epoch
, uint64_t end_epoch
)
830 rgw_cls_usage_log_trim_op call
;
831 call
.start_epoch
= start_epoch
;
832 call
.end_epoch
= end_epoch
;
834 call
.bucket
= bucket
;
839 ObjectWriteOperation op
;
840 op
.exec(RGW_CLASS
, RGW_USER_USAGE_LOG_TRIM
, in
);
841 int r
= io_ctx
.operate(oid
, &op
);
851 void cls_rgw_usage_log_trim(librados::ObjectWriteOperation
& op
, const string
& user
, const string
& bucket
, uint64_t start_epoch
, uint64_t end_epoch
)
854 rgw_cls_usage_log_trim_op call
;
855 call
.start_epoch
= start_epoch
;
856 call
.end_epoch
= end_epoch
;
858 call
.bucket
= bucket
;
861 op
.exec(RGW_CLASS
, RGW_USER_USAGE_LOG_TRIM
, in
);
864 void cls_rgw_usage_log_clear(ObjectWriteOperation
& op
)
867 op
.exec(RGW_CLASS
, RGW_USAGE_LOG_CLEAR
, in
);
870 void cls_rgw_usage_log_add(ObjectWriteOperation
& op
, rgw_usage_log_info
& info
)
873 rgw_cls_usage_log_add_op call
;
876 op
.exec(RGW_CLASS
, RGW_USER_USAGE_LOG_ADD
, in
);
879 /* garbage collection */
881 void cls_rgw_gc_set_entry(ObjectWriteOperation
& op
, uint32_t expiration_secs
, cls_rgw_gc_obj_info
& info
)
884 cls_rgw_gc_set_entry_op call
;
885 call
.expiration_secs
= expiration_secs
;
888 op
.exec(RGW_CLASS
, RGW_GC_SET_ENTRY
, in
);
891 void cls_rgw_gc_defer_entry(ObjectWriteOperation
& op
, uint32_t expiration_secs
, const string
& tag
)
894 cls_rgw_gc_defer_entry_op call
;
895 call
.expiration_secs
= expiration_secs
;
898 op
.exec(RGW_CLASS
, RGW_GC_DEFER_ENTRY
, in
);
901 int cls_rgw_gc_list(IoCtx
& io_ctx
, string
& oid
, string
& marker
, uint32_t max
, bool expired_only
,
902 list
<cls_rgw_gc_obj_info
>& entries
, bool *truncated
, string
& next_marker
)
905 cls_rgw_gc_list_op call
;
906 call
.marker
= marker
;
908 call
.expired_only
= expired_only
;
910 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_GC_LIST
, in
, out
);
914 cls_rgw_gc_list_ret ret
;
916 auto iter
= out
.cbegin();
918 } catch (ceph::buffer::error
& err
) {
922 entries
.swap(ret
.entries
);
925 *truncated
= ret
.truncated
;
926 next_marker
= std::move(ret
.next_marker
);
930 void cls_rgw_gc_remove(librados::ObjectWriteOperation
& op
, const vector
<string
>& tags
)
933 cls_rgw_gc_remove_op call
;
936 op
.exec(RGW_CLASS
, RGW_GC_REMOVE
, in
);
939 int cls_rgw_lc_get_head(IoCtx
& io_ctx
, const string
& oid
, cls_rgw_lc_obj_head
& head
)
942 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_GET_HEAD
, in
, out
);
946 cls_rgw_lc_get_head_ret ret
;
948 auto iter
= out
.cbegin();
950 } catch (ceph::buffer::error
& err
) {
958 int cls_rgw_lc_put_head(IoCtx
& io_ctx
, const string
& oid
, cls_rgw_lc_obj_head
& head
)
961 cls_rgw_lc_put_head_op call
;
964 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_PUT_HEAD
, in
, out
);
968 int cls_rgw_lc_get_next_entry(IoCtx
& io_ctx
, const string
& oid
, string
& marker
,
969 cls_rgw_lc_entry
& entry
)
972 cls_rgw_lc_get_next_entry_op call
;
973 call
.marker
= marker
;
975 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_GET_NEXT_ENTRY
, in
, out
);
979 cls_rgw_lc_get_next_entry_ret ret
;
981 auto iter
= out
.cbegin();
983 } catch (ceph::buffer::error
& err
) {
991 int cls_rgw_lc_rm_entry(IoCtx
& io_ctx
, const string
& oid
,
992 const cls_rgw_lc_entry
& entry
)
995 cls_rgw_lc_rm_entry_op call
;
998 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_RM_ENTRY
, in
, out
);
1002 int cls_rgw_lc_set_entry(IoCtx
& io_ctx
, const string
& oid
,
1003 const cls_rgw_lc_entry
& entry
)
1006 cls_rgw_lc_set_entry_op call
;
1009 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_SET_ENTRY
, in
, out
);
1013 int cls_rgw_lc_get_entry(IoCtx
& io_ctx
, const string
& oid
,
1014 const std::string
& marker
, cls_rgw_lc_entry
& entry
)
1017 cls_rgw_lc_get_entry_op call
{marker
};;
1019 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_GET_ENTRY
, in
, out
);
1025 cls_rgw_lc_get_entry_ret ret
;
1027 auto iter
= out
.cbegin();
1029 } catch (ceph::buffer::error
& err
) {
1033 entry
= std::move(ret
.entry
);
1037 int cls_rgw_lc_list(IoCtx
& io_ctx
, const string
& oid
,
1038 const string
& marker
,
1039 uint32_t max_entries
,
1040 vector
<cls_rgw_lc_entry
>& entries
)
1043 cls_rgw_lc_list_entries_op op
;
1048 op
.max_entries
= max_entries
;
1052 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_LC_LIST_ENTRIES
, in
, out
);
1056 cls_rgw_lc_list_entries_ret ret
;
1058 auto iter
= out
.cbegin();
1060 } catch (ceph::buffer::error
& err
) {
1064 std::sort(std::begin(ret
.entries
), std::end(ret
.entries
),
1065 [](const cls_rgw_lc_entry
& a
, const cls_rgw_lc_entry
& b
)
1066 { return a
.bucket
< b
.bucket
; });
1067 entries
= std::move(ret
.entries
);
1071 void cls_rgw_reshard_add(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
)
1074 cls_rgw_reshard_add_op call
;
1077 op
.exec(RGW_CLASS
, RGW_RESHARD_ADD
, in
);
1080 int cls_rgw_reshard_list(librados::IoCtx
& io_ctx
, const string
& oid
, string
& marker
, uint32_t max
,
1081 list
<cls_rgw_reshard_entry
>& entries
, bool* is_truncated
)
1084 cls_rgw_reshard_list_op call
;
1085 call
.marker
= marker
;
1088 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_RESHARD_LIST
, in
, out
);
1092 cls_rgw_reshard_list_ret op_ret
;
1093 auto iter
= out
.cbegin();
1095 decode(op_ret
, iter
);
1096 } catch (ceph::buffer::error
& err
) {
1100 entries
.swap(op_ret
.entries
);
1101 *is_truncated
= op_ret
.is_truncated
;
1106 int cls_rgw_reshard_get(librados::IoCtx
& io_ctx
, const string
& oid
, cls_rgw_reshard_entry
& entry
)
1109 cls_rgw_reshard_get_op call
;
1112 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_RESHARD_GET
, in
, out
);
1116 cls_rgw_reshard_get_ret op_ret
;
1117 auto iter
= out
.cbegin();
1119 decode(op_ret
, iter
);
1120 } catch (ceph::buffer::error
& err
) {
1124 entry
= op_ret
.entry
;
1129 void cls_rgw_reshard_remove(librados::ObjectWriteOperation
& op
, const cls_rgw_reshard_entry
& entry
)
1132 cls_rgw_reshard_remove_op call
;
1133 call
.tenant
= entry
.tenant
;
1134 call
.bucket_name
= entry
.bucket_name
;
1135 call
.bucket_id
= entry
.bucket_id
;
1137 op
.exec(RGW_CLASS
, RGW_RESHARD_REMOVE
, in
);
1140 int cls_rgw_set_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
1141 const cls_rgw_bucket_instance_entry
& entry
)
1144 cls_rgw_set_bucket_resharding_op call
;
1147 return io_ctx
.exec(oid
, RGW_CLASS
, RGW_SET_BUCKET_RESHARDING
, in
, out
);
1150 int cls_rgw_clear_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
)
1153 cls_rgw_clear_bucket_resharding_op call
;
1155 return io_ctx
.exec(oid
, RGW_CLASS
, RGW_CLEAR_BUCKET_RESHARDING
, in
, out
);
1158 int cls_rgw_get_bucket_resharding(librados::IoCtx
& io_ctx
, const string
& oid
,
1159 cls_rgw_bucket_instance_entry
*entry
)
1162 cls_rgw_get_bucket_resharding_op call
;
1164 int r
= io_ctx
.exec(oid
, RGW_CLASS
, RGW_GET_BUCKET_RESHARDING
, in
, out
);
1168 cls_rgw_get_bucket_resharding_ret op_ret
;
1169 auto iter
= out
.cbegin();
1171 decode(op_ret
, iter
);
1172 } catch (ceph::buffer::error
& err
) {
1176 *entry
= op_ret
.new_instance
;
1181 void cls_rgw_guard_bucket_resharding(librados::ObjectOperation
& op
, int ret_err
)
1184 cls_rgw_guard_bucket_resharding_op call
;
1185 call
.ret_err
= ret_err
;
1187 op
.exec(RGW_CLASS
, RGW_GUARD_BUCKET_RESHARDING
, in
);
1190 static bool issue_set_bucket_resharding(librados::IoCtx
& io_ctx
,
1191 const int shard_id
, const string
& oid
,
1192 const cls_rgw_bucket_instance_entry
& entry
,
1193 BucketIndexAioManager
*manager
) {
1195 cls_rgw_set_bucket_resharding_op call
;
1198 librados::ObjectWriteOperation op
;
1199 op
.exec(RGW_CLASS
, RGW_SET_BUCKET_RESHARDING
, in
);
1200 return manager
->aio_operate(io_ctx
, shard_id
, oid
, &op
);
1203 int CLSRGWIssueSetBucketResharding::issue_op(const int shard_id
, const string
& oid
)
1205 return issue_set_bucket_resharding(io_ctx
, shard_id
, oid
, entry
, &manager
);