1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
10 #include <boost/utility/string_ref.hpp>
11 #include <boost/format.hpp>
13 #include "common/errno.h"
14 #include "common/ceph_json.h"
15 #include "include/scope_guard.h"
17 #include "rgw_rados.h"
20 #include "rgw_acl_s3.h"
21 #include "rgw_tag_s3.h"
23 #include "include/types.h"
24 #include "rgw_bucket.h"
26 #include "rgw_string.h"
27 #include "rgw_multi.h"
29 #include "rgw_bucket_sync.h"
31 #include "services/svc_zone.h"
32 #include "services/svc_sys_obj.h"
33 #include "services/svc_bucket.h"
34 #include "services/svc_bucket_sync.h"
35 #include "services/svc_meta.h"
36 #include "services/svc_meta_be_sobj.h"
37 #include "services/svc_user.h"
38 #include "services/svc_cls.h"
39 #include "services/svc_bilog_rados.h"
40 #include "services/svc_datalog_rados.h"
42 #include "include/rados/librados.hpp"
43 // until everything is moved from rgw_common
44 #include "rgw_common.h"
45 #include "rgw_reshard.h"
48 // stolen from src/cls/version/cls_version.cc
49 #define VERSION_ATTR "ceph.objclass.version"
51 #include "cls/user/cls_user_types.h"
55 #define dout_context g_ceph_context
56 #define dout_subsys ceph_subsys_rgw
58 #define BUCKET_TAG_TIMEOUT 30
// Page size used for each bucket listing call; callers carry a marker
// from one call to the next to continue where the previous page ended.
static constexpr size_t listing_max_entries{1000};
66 * The tenant_name is always returned on purpose. May be empty, of course.
// Split a bucket specification into tenant, bucket name and (optionally)
// bucket instance.
//
// expected format: [tenant/]bucket:bucket_instance
// The tenant:bucket:bucket_instance form is also accepted when the input
// carries no '/'.
//
// bucket          - text to parse
// tenant_name     - always written; empty when the input names no tenant
// bucket_name     - always written
// bucket_instance - optional (may be nullptr); written only when the input
//                   contains a ':' separator, otherwise left untouched
static void parse_bucket(const std::string& bucket,
                         std::string *tenant_name,
                         std::string *bucket_name,
                         std::string *bucket_instance = nullptr /* optional */)
{
  const auto npos = std::string::npos;

  const auto slash = bucket.find('/');
  if (slash != npos) {
    *tenant_name = bucket.substr(0, slash);
  } else {
    tenant_name->clear();
  }

  // everything after the optional "tenant/" prefix
  std::string bn = (slash == npos) ? bucket : bucket.substr(slash + 1);

  const auto colon = bn.find(':');
  if (colon == npos) {
    // no instance part at all: bucket_instance is deliberately not touched
    *bucket_name = std::move(bn);
    return;
  }
  *bucket_name = bn.substr(0, colon);

  // Work on a local copy so that the tenant:bucket:instance handling below
  // never dereferences a null bucket_instance.
  std::string instance = bn.substr(colon + 1);

  /*
   * deal with the possible tenant:bucket:bucket_instance case
   */
  if (tenant_name->empty()) {
    const auto inner = instance.find(':');
    if (inner != npos) {
      *tenant_name = *bucket_name;
      *bucket_name = instance.substr(0, inner);
      instance.erase(0, inner + 1);
    }
  }

  if (bucket_instance) {
    *bucket_instance = std::move(instance);
  }
}
107 * Note that this is not a reversal of parse_bucket(). That one deals
108 * with the syntax we need in metadata and such. This one deals with
109 * the representation in RADOS pools. We chose '/' because it's not
110 * acceptable in bucket names and thus qualified buckets cannot conflict
111 * with the legacy or S3 buckets.
// Build the entry name for a bucket: empty when there is no bucket name,
// the bare bucket name when there is no tenant, "tenant/bucket" otherwise.
std::string rgw_make_bucket_entry_name(const std::string& tenant_name,
                                       const std::string& bucket_name) {
  if (bucket_name.empty()) {
    return std::string();
  }
  if (tenant_name.empty()) {
    return bucket_name;
  }

  std::string qualified(tenant_name);
  qualified += "/";
  qualified += bucket_name;
  return qualified;
}
129 * Tenants are separated from buckets in URLs by a colon in S3.
130 * This function is not to be used on Swift URLs, not even for COPY arguments.
// Split an S3 URL bucket of the form [tenant:]bucket.
//
// bucket      - bucket text taken from the URL
// auth_tenant - tenant used when the text carries no ':' separator
// tenant_name - receives the tenant
// bucket_name - receives the bucket name
void rgw_parse_url_bucket(const std::string &bucket, const std::string& auth_tenant,
                          std::string &tenant_name, std::string &bucket_name) {
  const auto colon = bucket.find(':');
  if (colon == std::string::npos) {
    tenant_name = auth_tenant;
    bucket_name = bucket;
    return;
  }

  /*
   * N.B.: We allow ":bucket" syntax with explicit empty tenant in order
   * to refer to the legacy tenant, in case users in new named tenants
   * want to access old global buckets.
   */
  // Compute both halves before writing either output, so the split stays
  // correct even if an output argument aliases `bucket`.
  std::string tenant_part = bucket.substr(0, colon);
  std::string name_part = bucket.substr(colon + 1);
  tenant_name = std::move(tenant_part);
  bucket_name = std::move(name_part);
}
151 * Get all the buckets owned by a user and fill up an RGWUserBuckets with them.
152 * Returns: 0 on success, -ERR# on failure.
154 int rgw_read_user_buckets(rgw::sal::RGWRadosStore
* store
,
155 const rgw_user
& user_id
,
156 rgw::sal::RGWBucketList
& buckets
,
157 const string
& marker
,
158 const string
& end_marker
,
162 rgw::sal::RGWRadosUser
user(store
, user_id
);
163 return user
.list_buckets(marker
, end_marker
, max
, need_stats
, buckets
);
166 int rgw_bucket_parse_bucket_instance(const string
& bucket_instance
, string
*bucket_name
, string
*bucket_id
, int *shard_id
)
168 auto pos
= bucket_instance
.rfind(':');
169 if (pos
== string::npos
) {
173 string first
= bucket_instance
.substr(0, pos
);
174 string second
= bucket_instance
.substr(pos
+ 1);
176 pos
= first
.find(':');
178 if (pos
== string::npos
) {
180 *bucket_name
= first
;
185 *bucket_name
= first
.substr(0, pos
);
186 *bucket_id
= first
.substr(pos
+ 1);
189 *shard_id
= strict_strtol(second
.c_str(), 10, &err
);
197 // parse key in format: [tenant/]name:instance[:shard_id]
198 int rgw_bucket_parse_bucket_key(CephContext
*cct
, const string
& key
,
199 rgw_bucket
*bucket
, int *shard_id
)
201 boost::string_ref name
{key
};
202 boost::string_ref instance
;
205 auto pos
= name
.find('/');
206 if (pos
!= string::npos
) {
207 auto tenant
= name
.substr(0, pos
);
208 bucket
->tenant
.assign(tenant
.begin(), tenant
.end());
209 name
= name
.substr(pos
+ 1);
211 bucket
->tenant
.clear();
214 // split name:instance
215 pos
= name
.find(':');
216 if (pos
!= string::npos
) {
217 instance
= name
.substr(pos
+ 1);
218 name
= name
.substr(0, pos
);
220 bucket
->name
.assign(name
.begin(), name
.end());
222 // split instance:shard
223 pos
= instance
.find(':');
224 if (pos
== string::npos
) {
225 bucket
->bucket_id
.assign(instance
.begin(), instance
.end());
233 auto shard
= instance
.substr(pos
+ 1);
235 auto id
= strict_strtol(shard
.data(), 10, &err
);
238 ldout(cct
, 0) << "ERROR: failed to parse bucket shard '"
239 << instance
.data() << "': " << err
<< dendl
;
247 instance
= instance
.substr(0, pos
);
248 bucket
->bucket_id
.assign(instance
.begin(), instance
.end());
252 static void dump_mulipart_index_results(list
<rgw_obj_index_key
>& objs_to_unlink
,
255 for (const auto& o
: objs_to_unlink
) {
256 f
->dump_string("object", o
.name
);
260 void check_bad_user_bucket_mapping(rgw::sal::RGWRadosStore
*store
, const rgw_user
& user_id
,
263 rgw::sal::RGWBucketList user_buckets
;
264 rgw::sal::RGWRadosUser
user(store
, user_id
);
267 CephContext
*cct
= store
->ctx();
269 size_t max_entries
= cct
->_conf
->rgw_list_buckets_max_chunk
;
272 int ret
= user
.list_buckets(marker
, string(), max_entries
, false, user_buckets
);
274 ldout(store
->ctx(), 0) << "failed to read user buckets: "
275 << cpp_strerror(-ret
) << dendl
;
279 map
<string
, rgw::sal::RGWBucket
*>& buckets
= user_buckets
.get_buckets();
280 for (map
<string
, rgw::sal::RGWBucket
*>::iterator i
= buckets
.begin();
285 rgw::sal::RGWBucket
* bucket
= i
->second
;
287 RGWBucketInfo bucket_info
;
289 int r
= store
->getRados()->get_bucket_info(store
->svc(), user_id
.tenant
, bucket
->get_name(), bucket_info
, &mtime
, null_yield
);
291 ldout(store
->ctx(), 0) << "could not get bucket info for bucket=" << bucket
<< dendl
;
295 rgw_bucket
& actual_bucket
= bucket_info
.bucket
;
297 if (actual_bucket
.name
.compare(bucket
->get_name()) != 0 ||
298 actual_bucket
.tenant
.compare(bucket
->get_tenant()) != 0 ||
299 actual_bucket
.marker
.compare(bucket
->get_marker()) != 0 ||
300 actual_bucket
.bucket_id
.compare(bucket
->get_bucket_id()) != 0) {
301 cout
<< "bucket info mismatch: expected " << actual_bucket
<< " got " << bucket
<< std::endl
;
303 cout
<< "fixing" << std::endl
;
304 r
= store
->ctl()->bucket
->link_bucket(user_id
, actual_bucket
,
305 bucket_info
.creation_time
,
308 cerr
<< "failed to fix bucket: " << cpp_strerror(-r
) << std::endl
;
313 } while (user_buckets
.is_truncated());
316 // note: function type conforms to RGWRados::check_filter_t
317 bool rgw_bucket_object_check_filter(const string
& oid
)
321 return rgw_obj_key::oid_to_key_in_ns(oid
, &key
, ns
);
324 int rgw_remove_object(rgw::sal::RGWRadosStore
*store
, const RGWBucketInfo
& bucket_info
, const rgw_bucket
& bucket
, rgw_obj_key
& key
)
326 RGWObjectCtx
rctx(store
);
328 if (key
.instance
.empty()) {
329 key
.instance
= "null";
332 rgw_obj
obj(bucket
, key
);
334 return store
->getRados()->delete_obj(rctx
, bucket_info
, obj
, bucket_info
.versioning_status());
338 static int rgw_remove_bucket(rgw::sal::RGWRadosStore
*store
, rgw_bucket
& bucket
, bool delete_children
, optional_yield y
)
341 map
<RGWObjCategory
, RGWStorageStats
> stats
;
342 std::vector
<rgw_bucket_dir_entry
> objs
;
343 map
<string
, bool> common_prefixes
;
346 string bucket_ver
, master_ver
;
348 ret
= store
->getRados()->get_bucket_info(store
->svc(), bucket
.tenant
, bucket
.name
, info
, NULL
, null_yield
);
352 ret
= store
->getRados()->get_bucket_stats(info
, RGW_NO_SHARD
, &bucket_ver
, &master_ver
, stats
, NULL
);
356 RGWRados::Bucket
target(store
->getRados(), info
);
357 RGWRados::Bucket::List
list_op(&target
);
358 CephContext
*cct
= store
->ctx();
360 list_op
.params
.list_versions
= true;
361 list_op
.params
.allow_unordered
= true;
363 bool is_truncated
= false;
367 ret
= list_op
.list_objects(listing_max_entries
, &objs
, &common_prefixes
,
368 &is_truncated
, null_yield
);
372 if (!objs
.empty() && !delete_children
) {
373 lderr(store
->ctx()) << "ERROR: could not remove non-empty bucket " << bucket
.name
<< dendl
;
377 for (const auto& obj
: objs
) {
378 rgw_obj_key
key(obj
.key
);
379 ret
= rgw_remove_object(store
, info
, bucket
, key
);
380 if (ret
< 0 && ret
!= -ENOENT
) {
384 } while(is_truncated
);
386 string prefix
, delimiter
;
388 ret
= abort_bucket_multiparts(store
, cct
, info
, prefix
, delimiter
);
393 ret
= store
->ctl()->bucket
->sync_user_stats(info
.owner
, info
);
395 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret
<< dendl
;
398 RGWObjVersionTracker objv_tracker
;
  // if we deleted children above we will force delete, as any that
  // remain are detritus from a prior bug
402 ret
= store
->getRados()->delete_bucket(info
, objv_tracker
, null_yield
, !delete_children
);
404 lderr(store
->ctx()) << "ERROR: could not remove bucket " <<
405 bucket
.name
<< dendl
;
409 ret
= store
->ctl()->bucket
->unlink_bucket(info
.owner
, bucket
, null_yield
, false);
411 lderr(store
->ctx()) << "ERROR: unable to remove user bucket information" << dendl
;
417 static int aio_wait(librados::AioCompletion
*handle
)
419 librados::AioCompletion
*c
= (librados::AioCompletion
*)handle
;
420 c
->wait_for_complete();
421 int ret
= c
->get_return_value();
426 static int drain_handles(list
<librados::AioCompletion
*>& pending
)
429 while (!pending
.empty()) {
430 librados::AioCompletion
*handle
= pending
.front();
432 int r
= aio_wait(handle
);
440 int rgw_remove_bucket_bypass_gc(rgw::sal::RGWRadosStore
*store
, rgw_bucket
& bucket
,
441 int concurrent_max
, bool keep_index_consistent
,
445 map
<RGWObjCategory
, RGWStorageStats
> stats
;
446 std::vector
<rgw_bucket_dir_entry
> objs
;
447 map
<string
, bool> common_prefixes
;
449 RGWObjectCtx
obj_ctx(store
);
450 CephContext
*cct
= store
->ctx();
452 string bucket_ver
, master_ver
;
454 ret
= store
->getRados()->get_bucket_info(store
->svc(), bucket
.tenant
, bucket
.name
, info
, NULL
, null_yield
);
458 ret
= store
->getRados()->get_bucket_stats(info
, RGW_NO_SHARD
, &bucket_ver
, &master_ver
, stats
, NULL
);
462 string prefix
, delimiter
;
464 ret
= abort_bucket_multiparts(store
, cct
, info
, prefix
, delimiter
);
469 RGWRados::Bucket
target(store
->getRados(), info
);
470 RGWRados::Bucket::List
list_op(&target
);
472 list_op
.params
.list_versions
= true;
473 list_op
.params
.allow_unordered
= true;
475 std::list
<librados::AioCompletion
*> handles
;
477 int max_aio
= concurrent_max
;
478 bool is_truncated
= true;
480 while (is_truncated
) {
482 ret
= list_op
.list_objects(listing_max_entries
, &objs
, &common_prefixes
,
483 &is_truncated
, null_yield
);
487 std::vector
<rgw_bucket_dir_entry
>::iterator it
= objs
.begin();
488 for (; it
!= objs
.end(); ++it
) {
489 RGWObjState
*astate
= NULL
;
490 rgw_obj
obj(bucket
, (*it
).key
);
492 ret
= store
->getRados()->get_obj_state(&obj_ctx
, info
, obj
, &astate
, false, y
);
493 if (ret
== -ENOENT
) {
494 dout(1) << "WARNING: cannot find obj state for obj " << obj
.get_oid() << dendl
;
498 lderr(store
->ctx()) << "ERROR: get obj state returned with error " << ret
<< dendl
;
502 if (astate
->manifest
) {
503 RGWObjManifest
& manifest
= *astate
->manifest
;
504 RGWObjManifest::obj_iterator miter
= manifest
.obj_begin();
505 rgw_obj head_obj
= manifest
.get_obj();
506 rgw_raw_obj raw_head_obj
;
507 store
->getRados()->obj_to_raw(info
.placement_rule
, head_obj
, &raw_head_obj
);
510 for (; miter
!= manifest
.obj_end() && max_aio
--; ++miter
) {
512 ret
= drain_handles(handles
);
514 lderr(store
->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret
<< dendl
;
517 max_aio
= concurrent_max
;
520 rgw_raw_obj last_obj
= miter
.get_location().get_raw_obj(store
->getRados());
521 if (last_obj
== raw_head_obj
) {
522 // have the head obj deleted at the end
526 ret
= store
->getRados()->delete_raw_obj_aio(last_obj
, handles
);
528 lderr(store
->ctx()) << "ERROR: delete obj aio failed with " << ret
<< dendl
;
531 } // for all shadow objs
533 ret
= store
->getRados()->delete_obj_aio(head_obj
, info
, astate
, handles
, keep_index_consistent
, null_yield
);
535 lderr(store
->ctx()) << "ERROR: delete obj aio failed with " << ret
<< dendl
;
541 ret
= drain_handles(handles
);
543 lderr(store
->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret
<< dendl
;
546 max_aio
= concurrent_max
;
548 obj_ctx
.invalidate(obj
);
549 } // for all RGW objects
552 ret
= drain_handles(handles
);
554 lderr(store
->ctx()) << "ERROR: could not drain handles as aio completion returned with " << ret
<< dendl
;
558 ret
= store
->ctl()->bucket
->sync_user_stats(info
.owner
, info
);
560 dout(1) << "WARNING: failed sync user stats before bucket delete. ret=" << ret
<< dendl
;
563 RGWObjVersionTracker objv_tracker
;
565 // this function can only be run if caller wanted children to be
566 // deleted, so we can ignore the check for children as any that
567 // remain are detritus from a prior bug
568 ret
= store
->getRados()->delete_bucket(info
, objv_tracker
, y
, false);
570 lderr(store
->ctx()) << "ERROR: could not remove bucket " << bucket
.name
<< dendl
;
574 ret
= store
->ctl()->bucket
->unlink_bucket(info
.owner
, bucket
, null_yield
, false);
576 lderr(store
->ctx()) << "ERROR: unable to remove user bucket information" << dendl
;
// Store msg in *sink when a sink was supplied and msg is not empty;
// otherwise leave the sink untouched.
static void set_err_msg(std::string *sink, std::string msg)
{
  if (sink && !msg.empty()) {
    // msg is our own by-value copy, so hand over its buffer instead of
    // copying it a second time
    *sink = std::move(msg);
  }
}
588 int RGWBucket::init(rgw::sal::RGWRadosStore
*storage
, RGWBucketAdminOpState
& op_state
,
589 optional_yield y
, std::string
*err_msg
,
590 map
<string
, bufferlist
> *pattrs
)
593 set_err_msg(err_msg
, "no storage!");
599 rgw_user user_id
= op_state
.get_user_id();
600 bucket
.tenant
= user_id
.tenant
;
601 bucket
.name
= op_state
.get_bucket_name();
603 if (bucket
.name
.empty() && user_id
.empty())
606 // split possible tenant/name
607 auto pos
= bucket
.name
.find('/');
608 if (pos
!= string::npos
) {
609 bucket
.tenant
= bucket
.name
.substr(0, pos
);
610 bucket
.name
= bucket
.name
.substr(pos
+ 1);
613 if (!bucket
.name
.empty()) {
614 int r
= store
->ctl()->bucket
->read_bucket_info(
615 bucket
, &bucket_info
, y
,
616 RGWBucketCtl::BucketInstance::GetParams().set_attrs(pattrs
),
619 set_err_msg(err_msg
, "failed to fetch bucket info for bucket=" + bucket
.name
);
623 op_state
.set_bucket(bucket_info
.bucket
);
626 if (!user_id
.empty()) {
627 int r
= store
->ctl()->user
->get_info_by_uid(user_id
, &user_info
, y
);
629 set_err_msg(err_msg
, "failed to fetch user info");
633 op_state
.display_name
= user_info
.display_name
;
640 bool rgw_find_bucket_by_id(CephContext
*cct
, RGWMetadataManager
*mgr
,
641 const string
& marker
, const string
& bucket_id
, rgw_bucket
* bucket_out
)
644 bool truncated
= false;
647 int ret
= mgr
->list_keys_init("bucket.instance", marker
, &handle
);
649 cerr
<< "ERROR: can't get key: " << cpp_strerror(-ret
) << std::endl
;
650 mgr
->list_keys_complete(handle
);
655 ret
= mgr
->list_keys_next(handle
, 1000, keys
, &truncated
);
657 cerr
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret
) << std::endl
;
658 mgr
->list_keys_complete(handle
);
661 for (list
<string
>::iterator iter
= keys
.begin(); iter
!= keys
.end(); ++iter
) {
663 ret
= rgw_bucket_parse_bucket_key(cct
, s
, bucket_out
, nullptr);
667 if (bucket_id
== bucket_out
->bucket_id
) {
668 mgr
->list_keys_complete(handle
);
673 mgr
->list_keys_complete(handle
);
677 int RGWBucket::link(RGWBucketAdminOpState
& op_state
, optional_yield y
,
678 map
<string
, bufferlist
>& attrs
, std::string
*err_msg
)
680 if (!op_state
.is_user_op()) {
681 set_err_msg(err_msg
, "empty user id");
685 string bucket_id
= op_state
.get_bucket_id();
687 std::string display_name
= op_state
.get_user_display_name();
688 rgw_bucket
& bucket
= op_state
.get_bucket();
689 if (!bucket_id
.empty() && bucket_id
!= bucket
.bucket_id
) {
691 "specified bucket id does not match " + bucket
.bucket_id
);
694 rgw_bucket old_bucket
= bucket
;
695 rgw_user user_id
= op_state
.get_user_id();
696 bucket
.tenant
= user_id
.tenant
;
697 if (!op_state
.new_bucket_name
.empty()) {
698 auto pos
= op_state
.new_bucket_name
.find('/');
699 if (pos
!= string::npos
) {
700 bucket
.tenant
= op_state
.new_bucket_name
.substr(0, pos
);
701 bucket
.name
= op_state
.new_bucket_name
.substr(pos
+ 1);
703 bucket
.name
= op_state
.new_bucket_name
;
707 RGWObjVersionTracker objv_tracker
;
708 RGWObjVersionTracker old_version
= bucket_info
.objv_tracker
;
710 map
<string
, bufferlist
>::iterator aiter
= attrs
.find(RGW_ATTR_ACL
);
711 if (aiter
== attrs
.end()) {
712 // should never happen; only pre-argonaut buckets lacked this.
713 ldout(store
->ctx(), 0) << "WARNING: can't bucket link because no acl on bucket=" << old_bucket
.name
<< dendl
;
715 "While crossing the Anavros you have displeased the goddess Hera."
716 " You must sacrifice your ancient bucket " + bucket
.bucket_id
);
719 bufferlist
& aclbl
= aiter
->second
;
720 RGWAccessControlPolicy policy
;
723 auto iter
= aclbl
.cbegin();
724 decode(policy
, iter
);
725 owner
= policy
.get_owner();
726 } catch (buffer::error
& err
) {
727 set_err_msg(err_msg
, "couldn't decode policy");
731 auto bucket_ctl
= store
->ctl()->bucket
;
732 int r
= bucket_ctl
->unlink_bucket(owner
.get_id(), old_bucket
, y
, false);
734 set_err_msg(err_msg
, "could not unlink policy from user " + owner
.get_id().to_str());
738 // now update the user for the bucket...
739 if (display_name
.empty()) {
740 ldout(store
->ctx(), 0) << "WARNING: user " << user_info
.user_id
<< " has no display name set" << dendl
;
743 RGWAccessControlPolicy policy_instance
;
744 policy_instance
.create_default(user_info
.user_id
, display_name
);
745 owner
= policy_instance
.get_owner();
748 policy_instance
.encode(aclbl
);
750 auto instance_params
= RGWBucketCtl::BucketInstance::PutParams().set_attrs(&attrs
);
752 bucket_info
.owner
= user_info
.user_id
;
753 if (bucket
!= old_bucket
) {
754 bucket_info
.bucket
= bucket
;
755 bucket_info
.objv_tracker
.version_for_read()->ver
= 0;
756 instance_params
.set_exclusive(true);
759 r
= bucket_ctl
->store_bucket_instance_info(bucket
, bucket_info
, y
, instance_params
);
761 set_err_msg(err_msg
, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r
));
765 RGWBucketEntryPoint ep
;
766 ep
.bucket
= bucket_info
.bucket
;
767 ep
.owner
= user_info
.user_id
;
768 ep
.creation_time
= bucket_info
.creation_time
;
770 map
<string
, bufferlist
> ep_attrs
;
771 rgw_ep_info ep_data
{ep
, ep_attrs
};
774 r
= store
->ctl()->bucket
->link_bucket(user_info
.user_id
,
779 set_err_msg(err_msg
, "failed to relink bucket");
783 if (bucket
!= old_bucket
) {
784 // like RGWRados::delete_bucket -- excepting no bucket_index work.
785 r
= bucket_ctl
->remove_bucket_entrypoint_info(old_bucket
, y
,
786 RGWBucketCtl::Bucket::RemoveParams()
787 .set_objv_tracker(&ep_data
.ep_objv
));
789 set_err_msg(err_msg
, "failed to unlink old bucket endpoint " + old_bucket
.tenant
+ "/" + old_bucket
.name
);
793 r
= bucket_ctl
->remove_bucket_instance_info(old_bucket
, bucket_info
, y
,
794 RGWBucketCtl::BucketInstance::RemoveParams()
795 .set_objv_tracker(&old_version
));
797 set_err_msg(err_msg
, "failed to unlink old bucket info");
805 int RGWBucket::chown(RGWBucketAdminOpState
& op_state
, const string
& marker
,
806 optional_yield y
, std::string
*err_msg
)
808 int ret
= store
->ctl()->bucket
->chown(store
, bucket_info
, user_info
.user_id
,
809 user_info
.display_name
, marker
, y
);
811 set_err_msg(err_msg
, "Failed to change object ownership: " + cpp_strerror(-ret
));
817 int RGWBucket::unlink(RGWBucketAdminOpState
& op_state
, optional_yield y
, std::string
*err_msg
)
819 rgw_bucket bucket
= op_state
.get_bucket();
821 if (!op_state
.is_user_op()) {
822 set_err_msg(err_msg
, "could not fetch user or user bucket info");
826 int r
= store
->ctl()->bucket
->unlink_bucket(user_info
.user_id
, bucket
, y
);
828 set_err_msg(err_msg
, "error unlinking bucket" + cpp_strerror(-r
));
834 int RGWBucket::set_quota(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
)
836 rgw_bucket bucket
= op_state
.get_bucket();
837 RGWBucketInfo bucket_info
;
838 map
<string
, bufferlist
> attrs
;
839 int r
= store
->getRados()->get_bucket_info(store
->svc(), bucket
.tenant
, bucket
.name
, bucket_info
, NULL
, null_yield
, &attrs
);
841 set_err_msg(err_msg
, "could not get bucket info for bucket=" + bucket
.name
+ ": " + cpp_strerror(-r
));
845 bucket_info
.quota
= op_state
.quota
;
846 r
= store
->getRados()->put_bucket_instance_info(bucket_info
, false, real_time(), &attrs
);
848 set_err_msg(err_msg
, "ERROR: failed writing bucket instance info: " + cpp_strerror(-r
));
854 int RGWBucket::remove(RGWBucketAdminOpState
& op_state
, optional_yield y
, bool bypass_gc
,
855 bool keep_index_consistent
, std::string
*err_msg
)
857 bool delete_children
= op_state
.will_delete_children();
858 rgw_bucket bucket
= op_state
.get_bucket();
862 if (delete_children
) {
863 ret
= rgw_remove_bucket_bypass_gc(store
, bucket
, op_state
.get_max_aio(), keep_index_consistent
, y
);
865 set_err_msg(err_msg
, "purge objects should be set for gc to be bypassed");
869 ret
= rgw_remove_bucket(store
, bucket
, delete_children
, y
);
873 set_err_msg(err_msg
, "unable to remove bucket" + cpp_strerror(-ret
));
880 int RGWBucket::remove_object(RGWBucketAdminOpState
& op_state
, std::string
*err_msg
)
882 rgw_bucket bucket
= op_state
.get_bucket();
883 std::string object_name
= op_state
.get_object_name();
885 rgw_obj_key
key(object_name
);
887 int ret
= rgw_remove_object(store
, bucket_info
, bucket
, key
);
889 set_err_msg(err_msg
, "unable to remove object" + cpp_strerror(-ret
));
896 static void dump_bucket_index(const RGWRados::ent_map_t
& result
, Formatter
*f
)
898 for (auto iter
= result
.begin(); iter
!= result
.end(); ++iter
) {
899 f
->dump_string("object", iter
->first
);
903 static void dump_bucket_usage(map
<RGWObjCategory
, RGWStorageStats
>& stats
, Formatter
*formatter
)
905 map
<RGWObjCategory
, RGWStorageStats
>::iterator iter
;
907 formatter
->open_object_section("usage");
908 for (iter
= stats
.begin(); iter
!= stats
.end(); ++iter
) {
909 RGWStorageStats
& s
= iter
->second
;
910 const char *cat_name
= rgw_obj_category_name(iter
->first
);
911 formatter
->open_object_section(cat_name
);
913 formatter
->close_section();
915 formatter
->close_section();
918 static void dump_index_check(map
<RGWObjCategory
, RGWStorageStats
> existing_stats
,
919 map
<RGWObjCategory
, RGWStorageStats
> calculated_stats
,
920 Formatter
*formatter
)
922 formatter
->open_object_section("check_result");
923 formatter
->open_object_section("existing_header");
924 dump_bucket_usage(existing_stats
, formatter
);
925 formatter
->close_section();
926 formatter
->open_object_section("calculated_header");
927 dump_bucket_usage(calculated_stats
, formatter
);
928 formatter
->close_section();
929 formatter
->close_section();
932 int RGWBucket::check_bad_index_multipart(RGWBucketAdminOpState
& op_state
,
933 RGWFormatterFlusher
& flusher
,std::string
*err_msg
)
935 bool fix_index
= op_state
.will_fix_index();
936 rgw_bucket bucket
= op_state
.get_bucket();
938 map
<string
, bool> common_prefixes
;
941 map
<string
, bool> meta_objs
;
942 map
<rgw_obj_index_key
, string
> all_objs
;
944 RGWBucketInfo bucket_info
;
945 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
946 int r
= store
->getRados()->get_bucket_instance_info(obj_ctx
, bucket
, bucket_info
, nullptr, nullptr, null_yield
);
948 ldout(store
->ctx(), 0) << "ERROR: " << __func__
<< "(): get_bucket_instance_info(bucket=" << bucket
<< ") returned r=" << r
<< dendl
;
952 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
953 RGWRados::Bucket::List
list_op(&target
);
955 list_op
.params
.list_versions
= true;
956 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
959 vector
<rgw_bucket_dir_entry
> result
;
960 int r
= list_op
.list_objects(listing_max_entries
, &result
,
961 &common_prefixes
, &is_truncated
, null_yield
);
963 set_err_msg(err_msg
, "failed to list objects in bucket=" + bucket
.name
+
964 " err=" + cpp_strerror(-r
));
969 vector
<rgw_bucket_dir_entry
>::iterator iter
;
970 for (iter
= result
.begin(); iter
!= result
.end(); ++iter
) {
971 rgw_obj_index_key key
= iter
->key
;
972 rgw_obj
obj(bucket
, key
);
973 string oid
= obj
.get_oid();
975 int pos
= oid
.find_last_of('.');
977 /* obj has no suffix */
981 string name
= oid
.substr(0, pos
);
982 string suffix
= oid
.substr(pos
+ 1);
984 if (suffix
.compare("meta") == 0) {
985 meta_objs
[name
] = true;
987 all_objs
[key
] = name
;
991 } while (is_truncated
);
993 list
<rgw_obj_index_key
> objs_to_unlink
;
994 Formatter
*f
= flusher
.get_formatter();
996 f
->open_array_section("invalid_multipart_entries");
998 for (auto aiter
= all_objs
.begin(); aiter
!= all_objs
.end(); ++aiter
) {
999 string
& name
= aiter
->second
;
1001 if (meta_objs
.find(name
) == meta_objs
.end()) {
1002 objs_to_unlink
.push_back(aiter
->first
);
1005 if (objs_to_unlink
.size() > listing_max_entries
) {
1007 int r
= store
->getRados()->remove_objs_from_index(bucket_info
, objs_to_unlink
);
1009 set_err_msg(err_msg
, "ERROR: remove_obj_from_index() returned error: " +
1015 dump_mulipart_index_results(objs_to_unlink
, flusher
.get_formatter());
1017 objs_to_unlink
.clear();
1022 int r
= store
->getRados()->remove_objs_from_index(bucket_info
, objs_to_unlink
);
1024 set_err_msg(err_msg
, "ERROR: remove_obj_from_index() returned error: " +
1031 dump_mulipart_index_results(objs_to_unlink
, f
);
1038 int RGWBucket::check_object_index(RGWBucketAdminOpState
& op_state
,
1039 RGWFormatterFlusher
& flusher
,
1041 std::string
*err_msg
)
1044 bool fix_index
= op_state
.will_fix_index();
1047 set_err_msg(err_msg
, "check-objects flag requires fix index enabled");
1051 store
->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info
, BUCKET_TAG_TIMEOUT
);
1054 string empty_delimiter
;
1055 rgw_obj_index_key marker
;
1056 bool is_truncated
= true;
1057 bool cls_filtered
= true;
1059 Formatter
*formatter
= flusher
.get_formatter();
1060 formatter
->open_object_section("objects");
1061 uint16_t expansion_factor
= 1;
1062 while (is_truncated
) {
1063 RGWRados::ent_map_t result
;
1064 result
.reserve(listing_max_entries
);
1066 int r
= store
->getRados()->cls_bucket_list_ordered(
1067 bucket_info
, RGW_NO_SHARD
, marker
, prefix
, empty_delimiter
,
1068 listing_max_entries
, true, expansion_factor
,
1069 result
, &is_truncated
, &cls_filtered
, &marker
,
1070 y
, rgw_bucket_object_check_filter
);
1073 } else if (r
< 0 && r
!= -ENOENT
) {
1074 set_err_msg(err_msg
, "ERROR: failed operation r=" + cpp_strerror(-r
));
1077 if (result
.size() < listing_max_entries
/ 8) {
1079 } else if (result
.size() > listing_max_entries
* 7 / 8 &&
1080 expansion_factor
> 1) {
1084 dump_bucket_index(result
, formatter
);
1088 formatter
->close_section();
1090 store
->getRados()->cls_obj_set_bucket_tag_timeout(bucket_info
, 0);
1096 int RGWBucket::check_index(RGWBucketAdminOpState
& op_state
,
1097 map
<RGWObjCategory
, RGWStorageStats
>& existing_stats
,
1098 map
<RGWObjCategory
, RGWStorageStats
>& calculated_stats
,
1099 std::string
*err_msg
)
1101 bool fix_index
= op_state
.will_fix_index();
1103 int r
= store
->getRados()->bucket_check_index(bucket_info
, &existing_stats
, &calculated_stats
);
1105 set_err_msg(err_msg
, "failed to check index error=" + cpp_strerror(-r
));
1110 r
= store
->getRados()->bucket_rebuild_index(bucket_info
);
1112 set_err_msg(err_msg
, "failed to rebuild index err=" + cpp_strerror(-r
));
1120 int RGWBucket::sync(RGWBucketAdminOpState
& op_state
, map
<string
, bufferlist
> *attrs
, std::string
*err_msg
)
1122 if (!store
->svc()->zone
->is_meta_master()) {
1123 set_err_msg(err_msg
, "ERROR: failed to update bucket sync: only allowed on meta master zone");
1126 bool sync
= op_state
.will_sync_bucket();
1128 bucket_info
.flags
&= ~BUCKET_DATASYNC_DISABLED
;
1130 bucket_info
.flags
|= BUCKET_DATASYNC_DISABLED
;
1133 int r
= store
->getRados()->put_bucket_instance_info(bucket_info
, false, real_time(), attrs
);
1135 set_err_msg(err_msg
, "ERROR: failed writing bucket instance info:" + cpp_strerror(-r
));
1139 int shards_num
= bucket_info
.num_shards
? bucket_info
.num_shards
: 1;
1140 int shard_id
= bucket_info
.num_shards
? 0 : -1;
1143 r
= store
->svc()->bilog_rados
->log_stop(bucket_info
, -1);
1145 set_err_msg(err_msg
, "ERROR: failed writing stop bilog:" + cpp_strerror(-r
));
1149 r
= store
->svc()->bilog_rados
->log_start(bucket_info
, -1);
1151 set_err_msg(err_msg
, "ERROR: failed writing resync bilog:" + cpp_strerror(-r
));
1156 for (int i
= 0; i
< shards_num
; ++i
, ++shard_id
) {
1157 r
= store
->svc()->datalog_rados
->add_entry(bucket_info
, shard_id
);
1159 set_err_msg(err_msg
, "ERROR: failed writing data log:" + cpp_strerror(-r
));
1168 int RGWBucket::policy_bl_to_stream(bufferlist
& bl
, ostream
& o
)
1170 RGWAccessControlPolicy_S3
policy(g_ceph_context
);
1171 int ret
= decode_bl(bl
, policy
);
1173 ldout(store
->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl
;
1179 int rgw_object_get_attr(rgw::sal::RGWRadosStore
* store
, const RGWBucketInfo
& bucket_info
,
1180 const rgw_obj
& obj
, const char* attr_name
,
1181 bufferlist
& out_bl
, optional_yield y
)
1183 RGWObjectCtx
obj_ctx(store
);
1184 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, obj
);
1185 RGWRados::Object::Read
rop(&op_target
);
1187 return rop
.get_attr(attr_name
, out_bl
, y
);
1190 int RGWBucket::get_policy(RGWBucketAdminOpState
& op_state
, RGWAccessControlPolicy
& policy
, optional_yield y
)
1192 std::string object_name
= op_state
.get_object_name();
1193 rgw_bucket bucket
= op_state
.get_bucket();
1195 RGWBucketInfo bucket_info
;
1196 map
<string
, bufferlist
> attrs
;
1197 int ret
= store
->getRados()->get_bucket_info(store
->svc(), bucket
.tenant
, bucket
.name
, bucket_info
, NULL
, null_yield
, &attrs
);
1202 if (!object_name
.empty()) {
1204 rgw_obj
obj(bucket
, object_name
);
1206 ret
= rgw_object_get_attr(store
, bucket_info
, obj
, RGW_ATTR_ACL
, bl
, y
);
1211 ret
= decode_bl(bl
, policy
);
1213 ldout(store
->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl
;
1218 map
<string
, bufferlist
>::iterator aiter
= attrs
.find(RGW_ATTR_ACL
);
1219 if (aiter
== attrs
.end()) {
1223 ret
= decode_bl(aiter
->second
, policy
);
1225 ldout(store
->ctx(),0) << "failed to decode RGWAccessControlPolicy" << dendl
;
1232 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1233 RGWAccessControlPolicy
& policy
)
1237 int ret
= bucket
.init(store
, op_state
, null_yield
);
1241 ret
= bucket
.get_policy(op_state
, policy
, null_yield
);
1248 /* Wrappers to facilitate RESTful interface */
// Formatter flavour of get_policy(): fetches the policy with
// get_policy(store, op_state, policy) and dumps it to the flusher formatter
// inside an object section named "policy".
1251 int RGWBucketAdminOp::get_policy(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1252 RGWFormatterFlusher
& flusher
)
1254 RGWAccessControlPolicy
policy(store
->ctx());
1256 int ret
= get_policy(store
, op_state
, policy
);
1260 Formatter
*formatter
= flusher
.get_formatter();
1264 formatter
->open_object_section("policy");
1265 policy
.dump(formatter
);
1266 formatter
->close_section();
// Fetch the policy selected by op_state into an RGWAccessControlPolicy_S3
// using get_policy(store, op_state, policy).
// NOTE(review): the last parameter and the statement that emits the policy
// are not shown here - presumably an output stream; confirm against the header.
1273 int RGWBucketAdminOp::dump_s3_policy(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1276 RGWAccessControlPolicy_S3
policy(store
->ctx());
1278 int ret
= get_policy(store
, op_state
, policy
);
// Initialise a bucket handle from op_state and return the result of
// bucket.unlink(); both calls use null_yield.
1287 int RGWBucketAdminOp::unlink(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
)
1291 int ret
= bucket
.init(store
, op_state
, null_yield
);
1295 return bucket
.unlink(op_state
, null_yield
);
// Initialise a bucket handle from op_state (collecting the bucket attrs) and
// return the result of bucket.link(). The attrs gathered by init() and the
// err out-parameter are forwarded to link(); both calls use null_yield.
1298 int RGWBucketAdminOp::link(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
, string
*err
)
1301 map
<string
, bufferlist
> attrs
;
1303 int ret
= bucket
.init(store
, op_state
, null_yield
, err
, &attrs
);
1307 return bucket
.link(op_state
, null_yield
, attrs
, err
);
// Change bucket ownership in three steps: bucket.init() (collecting attrs),
// bucket.link() with those attrs, then bucket.chown() which receives marker.
// err is passed to every step; all calls use null_yield. The value returned
// is that of bucket.chown() when that call is reached.
1311 int RGWBucketAdminOp::chown(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
, const string
& marker
, string
*err
)
1314 map
<string
, bufferlist
> attrs
;
1316 int ret
= bucket
.init(store
, op_state
, null_yield
, err
, &attrs
);
1320 ret
= bucket
.link(op_state
, null_yield
, attrs
, err
);
1324 return bucket
.chown(op_state
, marker
, null_yield
, err
);
// Run the bucket index checks and report through the flusher formatter.
// Sequence: bucket.init(), check_bad_index_multipart(), check_object_index(),
// check_index() (which fills existing_stats and calculated_stats), then
// dump_index_check() writes both stat maps to the formatter.
// NOTE(review): bucket.init() uses null_yield; y is only passed to
// check_object_index().
1328 int RGWBucketAdminOp::check_index(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1329 RGWFormatterFlusher
& flusher
, optional_yield y
)
1332 map
<RGWObjCategory
, RGWStorageStats
> existing_stats
;
1333 map
<RGWObjCategory
, RGWStorageStats
> calculated_stats
;
1338 ret
= bucket
.init(store
, op_state
, null_yield
);
1342 Formatter
*formatter
= flusher
.get_formatter();
1345 ret
= bucket
.check_bad_index_multipart(op_state
, flusher
);
1349 ret
= bucket
.check_object_index(op_state
, flusher
, y
);
1353 ret
= bucket
.check_index(op_state
, existing_stats
, calculated_stats
);
1357 dump_index_check(existing_stats
, calculated_stats
, formatter
);
// Initialise a bucket handle from op_state and remove the bucket.
// bypass_gc and keep_index_consistent are forwarded to bucket.remove(); y is
// used for both init() and remove(). Any error text that remove() leaves in
// err_msg is logged through lderr with an "ERROR: " prefix.
1363 int RGWBucketAdminOp::remove_bucket(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1364 optional_yield y
, bool bypass_gc
, bool keep_index_consistent
)
1368 int ret
= bucket
.init(store
, op_state
, y
);
1372 std::string err_msg
;
1373 ret
= bucket
.remove(op_state
, y
, bypass_gc
, keep_index_consistent
, &err_msg
);
1374 if (!err_msg
.empty()) {
1375 lderr(store
->ctx()) << "ERROR: " << err_msg
<< dendl
;
// Initialise a bucket handle from op_state (null_yield) and return the result
// of bucket.remove_object(op_state).
1380 int RGWBucketAdminOp::remove_object(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
)
1384 int ret
= bucket
.init(store
, op_state
, null_yield
);
1388 return bucket
.remove_object(op_state
);
// Initialise a bucket handle from op_state (collecting attrs) and return the
// result of bucket.sync(), which receives a pointer to those attrs. err_msg is
// handed to both init() and sync().
1391 int RGWBucketAdminOp::sync_bucket(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
, string
*err_msg
)
1394 map
<string
, bufferlist
> attrs
;
1395 int ret
= bucket
.init(store
, op_state
, null_yield
, err_msg
, &attrs
);
1400 return bucket
.sync(op_state
, &attrs
, err_msg
);
// Dump one bucket as a "stats" object section on formatter.
// Loads the bucket info (with mtime and attrs) and the aggregated bucket
// stats (RGW_NO_SHARD), then emits: bucket name, num_shards, tenant,
// zonegroup, placement rule, explicit placement, id, marker, index type,
// owner, ver/master_ver, mtime, creation_time, max_marker, usage and quota.
// When the attrs contain RGW_ATTR_TAGS the tag set is decoded and dumped too;
// a buffer::error raised while decoding is caught and reported on cerr.
// A failure path after get_bucket_stats() also reports on cerr rather than
// through the ceph logging macros.
1403 static int bucket_stats(rgw::sal::RGWRadosStore
*store
,
1404 const std::string
& tenant_name
,
1405 const std::string
& bucket_name
,
1406 Formatter
*formatter
)
1408 RGWBucketInfo bucket_info
;
1409 map
<RGWObjCategory
, RGWStorageStats
> stats
;
1410 map
<string
, bufferlist
> attrs
;
1413 int r
= store
->getRados()->get_bucket_info(store
->svc(),
1414 tenant_name
, bucket_name
, bucket_info
,
1415 &mtime
, null_yield
, &attrs
);
1420 rgw_bucket
& bucket
= bucket_info
.bucket
;
1422 string bucket_ver
, master_ver
;
1424 int ret
= store
->getRados()->get_bucket_stats(bucket_info
, RGW_NO_SHARD
,
1425 &bucket_ver
, &master_ver
, stats
,
1428 cerr
<< "error getting bucket stats bucket=" << bucket
.name
<< " ret=" << ret
<< std::endl
;
1433 utime_t
ctime_ut(bucket_info
.creation_time
);
1435 formatter
->open_object_section("stats");
1436 formatter
->dump_string("bucket", bucket
.name
);
1437 formatter
->dump_int("num_shards", bucket_info
.num_shards
);
1438 formatter
->dump_string("tenant", bucket
.tenant
);
1439 formatter
->dump_string("zonegroup", bucket_info
.zonegroup
);
1440 formatter
->dump_string("placement_rule", bucket_info
.placement_rule
.to_str());
1441 ::encode_json("explicit_placement", bucket
.explicit_placement
, formatter
);
1442 formatter
->dump_string("id", bucket
.bucket_id
);
1443 formatter
->dump_string("marker", bucket
.marker
);
1444 formatter
->dump_stream("index_type") << bucket_info
.index_type
;
1445 ::encode_json("owner", bucket_info
.owner
, formatter
);
1446 formatter
->dump_string("ver", bucket_ver
);
1447 formatter
->dump_string("master_ver", master_ver
);
1448 ut
.gmtime(formatter
->dump_stream("mtime"));
1449 ctime_ut
.gmtime(formatter
->dump_stream("creation_time"));
1450 formatter
->dump_string("max_marker", max_marker
);
1451 dump_bucket_usage(stats
, formatter
);
1452 encode_json("bucket_quota", bucket_info
.quota
, formatter
);
// optional: bucket tags, only when the attribute is present
1455 auto iter
= attrs
.find(RGW_ATTR_TAGS
);
1456 if (iter
!= attrs
.end()) {
1457 RGWObjTagSet_S3 tagset
;
1458 bufferlist::const_iterator piter
{&iter
->second
};
1460 tagset
.decode(piter
);
1461 tagset
.dump(formatter
);
1462 } catch (buffer::error
& err
) {
1463 cerr
<< "ERROR: caught buffer:error, couldn't decode TagSet" << std::endl
;
1467 // TODO: bucket CORS
1469 formatter
->close_section();
// Report, per user, how full the index shards of each bucket are.
// Output layout on the flusher formatter: array "users" -> object "user"
// (user_id) -> array "buckets" -> object "bucket" with num_objects,
// num_shards, objects_per_shard and fill_status.
// Configuration read from store->ctx()->_conf:
//   rgw_list_buckets_max_chunk     - page size for user.list_buckets()
//   rgw_safe_max_objects_per_shard - denominator of the fill percentage
//   rgw_shard_warning_threshold    - WARN threshold in percent; values above
//                                    100 are replaced by 90
// For each bucket the object count is the sum of num_objects over all stats
// categories; objects per shard is that sum divided by num_shards (or the sum
// itself when num_shards is 0). fill_pct above 100 yields an "OVER n%" status
// and fill_pct at or above the threshold yields "WARN n%". A bucket entry is
// written when warn is set or warnings_only is false.
// NOTE(review): fill_pct divides by safe_max_objs_per_shard with no visible
// guard against a configured value of 0 - confirm the option has a lower bound.
1474 int RGWBucketAdminOp::limit_check(rgw::sal::RGWRadosStore
*store
,
1475 RGWBucketAdminOpState
& op_state
,
1476 const std::list
<std::string
>& user_ids
,
1477 RGWFormatterFlusher
& flusher
,
1481 const size_t max_entries
=
1482 store
->ctx()->_conf
->rgw_list_buckets_max_chunk
;
1484 const size_t safe_max_objs_per_shard
=
1485 store
->ctx()->_conf
->rgw_safe_max_objects_per_shard
;
1487 uint16_t shard_warn_pct
=
1488 store
->ctx()->_conf
->rgw_shard_warning_threshold
;
1489 if (shard_warn_pct
> 100)
1490 shard_warn_pct
= 90;
1492 Formatter
*formatter
= flusher
.get_formatter();
1495 formatter
->open_array_section("users");
1497 for (const auto& user_id
: user_ids
) {
1499 formatter
->open_object_section("user");
1500 formatter
->dump_string("user_id", user_id
);
1501 formatter
->open_array_section("buckets");
1504 rgw::sal::RGWBucketList buckets
;
1506 rgw::sal::RGWRadosUser
user(store
, rgw_user(user_id
));
1508 ret
= user
.list_buckets(marker
, string(), max_entries
, false, buckets
);
1513 map
<string
, rgw::sal::RGWBucket
*>& m_buckets
= buckets
.get_buckets();
1515 for (const auto& iter
: m_buckets
) {
1516 auto bucket
= iter
.second
;
1517 uint32_t num_shards
= 1;
1518 uint64_t num_objects
= 0;
1520 /* need info for num_shards */
1523 marker
= bucket
->get_name(); /* Casey's location for marker update,
1524 * as we may now not reach the end of
1527 ret
= store
->getRados()->get_bucket_info(store
->svc(), bucket
->get_tenant(),
1528 bucket
->get_name(), info
, nullptr,
1533 /* need stats for num_entries */
1534 string bucket_ver
, master_ver
;
1535 std::map
<RGWObjCategory
, RGWStorageStats
> stats
;
1536 ret
= store
->getRados()->get_bucket_stats(info
, RGW_NO_SHARD
, &bucket_ver
,
1537 &master_ver
, stats
, nullptr);
1542 for (const auto& s
: stats
) {
1543 num_objects
+= s
.second
.num_objects
;
1546 num_shards
= info
.num_shards
;
1547 uint64_t objs_per_shard
=
1548 (num_shards
) ? num_objects
/num_shards
: num_objects
;
1552 uint64_t fill_pct
= objs_per_shard
* 100 / safe_max_objs_per_shard
;
1553 if (fill_pct
> 100) {
1554 ss
<< "OVER " << fill_pct
<< "%";
1556 } else if (fill_pct
>= shard_warn_pct
) {
1557 ss
<< "WARN " << fill_pct
<< "%";
1564 if (warn
|| !warnings_only
) {
1565 formatter
->open_object_section("bucket");
1566 formatter
->dump_string("bucket", bucket
->get_name());
1567 formatter
->dump_string("tenant", bucket
->get_tenant());
1568 formatter
->dump_int("num_objects", num_objects
);
1569 formatter
->dump_int("num_shards", num_shards
);
1570 formatter
->dump_int("objects_per_shard", objs_per_shard
);
1571 formatter
->dump_string("fill_status", ss
.str());
1572 formatter
->close_section();
1576 formatter
->flush(cout
);
1577 } while (buckets
.is_truncated()); /* foreach: bucket */
1579 formatter
->close_section();
1580 formatter
->close_section();
1581 formatter
->flush(cout
);
1583 } /* foreach: user_id */
1585 formatter
->close_section();
1586 formatter
->flush(cout
);
1589 } /* RGWBucketAdminOp::limit_check */
// Report bucket information through the flusher formatter.
// When a bucket name is present the bucket is initialised first, and
// -ERR_NO_SUCH_BUCKET is returned on a path following that init.
// Three reporting paths are visible:
//   1. op_state.is_user_op(): page through the buckets of the user with
//      list_buckets() (rgw_list_buckets_max_chunk per call, stats not
//      requested), optionally filtered to bucket_name, inside an array
//      section "buckets". The marker advances to the last key seen and the
//      loop continues while the listing is truncated.
//   2. a bucket name without a user op: a single bucket_stats() call.
//   3. otherwise: walk every "bucket" metadata key with list_keys_init /
//      list_keys_next (1000 keys per call) / list_keys_complete.
// In paths 1 and 3 each bucket is emitted either via bucket_stats() or as a
// plain "bucket" string - presumably selected by show_stats; confirm.
// NOTE(review): bucket_stats() is always called with user_id.tenant, also in
// the path that walks all metadata keys.
1591 int RGWBucketAdminOp::info(rgw::sal::RGWRadosStore
*store
,
1592 RGWBucketAdminOpState
& op_state
,
1593 RGWFormatterFlusher
& flusher
)
1597 const std::string
& bucket_name
= op_state
.get_bucket_name();
1598 if (!bucket_name
.empty()) {
1599 ret
= bucket
.init(store
, op_state
, null_yield
);
1601 return -ERR_NO_SUCH_BUCKET
;
1606 Formatter
*formatter
= flusher
.get_formatter();
1609 CephContext
*cct
= store
->ctx();
1611 const size_t max_entries
= cct
->_conf
->rgw_list_buckets_max_chunk
;
1613 const bool show_stats
= op_state
.will_fetch_stats();
1614 const rgw_user
& user_id
= op_state
.get_user_id();
1615 if (op_state
.is_user_op()) {
1616 formatter
->open_array_section("buckets");
1618 rgw::sal::RGWBucketList buckets
;
1619 rgw::sal::RGWRadosUser
user(store
, op_state
.get_user_id());
1621 const std::string empty_end_marker
;
1622 constexpr bool no_need_stats
= false; // set need_stats to false
1626 ret
= user
.list_buckets(marker
, empty_end_marker
, max_entries
,
1627 no_need_stats
, buckets
);
1632 const std::string
* marker_cursor
= nullptr;
1633 map
<string
, rgw::sal::RGWBucket
*>& m
= buckets
.get_buckets();
1635 for (const auto& i
: m
) {
1636 const std::string
& obj_name
= i
.first
;
1637 if (!bucket_name
.empty() && bucket_name
!= obj_name
) {
1642 bucket_stats(store
, user_id
.tenant
, obj_name
, formatter
);
1644 formatter
->dump_string("bucket", obj_name
);
1647 marker_cursor
= &obj_name
;
1649 if (marker_cursor
) {
1650 marker
= *marker_cursor
;
1654 } while (buckets
.is_truncated());
1656 formatter
->close_section();
1657 } else if (!bucket_name
.empty()) {
1658 ret
= bucket_stats(store
, user_id
.tenant
, bucket_name
, formatter
);
1663 void *handle
= nullptr;
1664 bool truncated
= true;
1666 formatter
->open_array_section("buckets");
1667 ret
= store
->ctl()->meta
.mgr
->list_keys_init("bucket", &handle
);
1668 while (ret
== 0 && truncated
) {
1669 std::list
<std::string
> buckets
;
1670 constexpr int max_keys
= 1000;
1671 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, max_keys
, buckets
,
1673 for (auto& bucket_name
: buckets
) {
1675 bucket_stats(store
, user_id
.tenant
, bucket_name
, formatter
);
1677 formatter
->dump_string("bucket", bucket_name
);
1681 store
->ctl()->meta
.mgr
->list_keys_complete(handle
);
1683 formatter
->close_section();
// Initialise a bucket handle from op_state (null_yield) and return the result
// of bucket.set_quota(op_state).
1691 int RGWBucketAdminOp::set_quota(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
)
1695 int ret
= bucket
.init(store
, op_state
, null_yield
);
1698 return bucket
.set_quota(op_state
);
// Remove the bucket index object of every shard of one bucket instance.
// A bucket_info with num_shards == 0 is handled as a single iteration that
// uses shard id -1; otherwise shard ids 0 .. num_shards-1 are visited.
// For each shard a RGWRados::BucketShard is initialised and passed to
// bi_remove(); failure messages for either step are written to cerr.
1701 static int purge_bucket_instance(rgw::sal::RGWRadosStore
*store
, const RGWBucketInfo
& bucket_info
)
1703 int max_shards
= (bucket_info
.num_shards
> 0 ? bucket_info
.num_shards
: 1);
1704 for (int i
= 0; i
< max_shards
; i
++) {
1705 RGWRados::BucketShard
bs(store
->getRados());
1706 int shard_id
= (bucket_info
.num_shards
> 0 ? i
: -1);
1707 int ret
= bs
.init(bucket_info
.bucket
, shard_id
, nullptr);
1709 cerr
<< "ERROR: bs.init(bucket=" << bucket_info
.bucket
<< ", shard=" << shard_id
1710 << "): " << cpp_strerror(-ret
) << std::endl
;
1713 ret
= store
->getRados()->bi_remove(bs
);
1715 cerr
<< "ERROR: failed to remove bucket index object: "
1716 << cpp_strerror(-ret
) << std::endl
;
// Split "tenant/bucket" at the first '/' into a (tenant, bucket) pair.
// A name without any '/' yields an empty tenant and the unchanged name.
inline auto split_tenant(const std::string& bucket_name){
  const std::string::size_type slash = bucket_name.find('/');
  if (slash == std::string::npos) {
    // no tenant prefix present
    return std::make_pair(std::string(), bucket_name);
  }
  std::string tenant = bucket_name.substr(0, slash);
  std::string bucket = bucket_name.substr(slash + 1);
  return std::make_pair(std::move(tenant), std::move(bucket));
}
// List of bucket instance infos; used below for the stale-instance results.
1731 using bucket_instance_ls
= std::vector
<RGWBucketInfo
>;
// Collect the stale instances of one bucket into stale_instances.
// lst holds the bucket instance keys that belong to bucket_name. Steps:
//   1. Load each instance; those whose reshard_status is DONE go straight to
//      stale_instances, the rest are parked in other_instances.
//   2. Load the current bucket info (bucket_name is split into tenant and
//      bucket). One path appends all of other_instances to stale_instances
//      (bucket no longer exists); another logs the read error.
//   3. Nothing further is processed while the current bucket reports
//      reshard_status IN_PROGRESS.
//   4. Instances whose bucket_id equals the current bucket_id or the
//      new_bucket_instance_id are removed from other_instances.
//   5. If anything is left, the reshard lock is taken (released by a scope
//      guard) and the remaining instances are appended to stale_instances.
1732 void get_stale_instances(rgw::sal::RGWRadosStore
*store
, const std::string
& bucket_name
,
1733 const vector
<std::string
>& lst
,
1734 bucket_instance_ls
& stale_instances
)
1737 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
1739 bucket_instance_ls other_instances
;
1740 // first iterate over the entries, and pick up the done buckets; these
1741 // are guaranteed to be stale
1742 for (const auto& bucket_instance
: lst
){
1743 RGWBucketInfo binfo
;
1744 int r
= store
->getRados()->get_bucket_instance_info(obj_ctx
, bucket_instance
,
1745 binfo
, nullptr,nullptr, null_yield
);
1747 // this can only happen if someone deletes us right when we're processing
1748 lderr(store
->ctx()) << "Bucket instance is invalid: " << bucket_instance
1749 << cpp_strerror(-r
) << dendl
;
1752 if (binfo
.reshard_status
== cls_rgw_reshard_status::DONE
)
1753 stale_instances
.emplace_back(std::move(binfo
));
1755 other_instances
.emplace_back(std::move(binfo
));
1759 // Read the cur bucket info, if the bucket doesn't exist we can simply return
1760 // all the instances
1761 auto [tenant
, bucket
] = split_tenant(bucket_name
);
1762 RGWBucketInfo cur_bucket_info
;
1763 int r
= store
->getRados()->get_bucket_info(store
->svc(), tenant
, bucket
, cur_bucket_info
, nullptr, null_yield
);
1766 // bucket doesn't exist, everything is stale then
1767 stale_instances
.insert(std::end(stale_instances
),
1768 std::make_move_iterator(other_instances
.begin()),
1769 std::make_move_iterator(other_instances
.end()));
1771 // all bets are off if we can't read the bucket, just return the sureshot stale instances
1772 lderr(store
->ctx()) << "error: reading bucket info for bucket: "
1773 << bucket
<< cpp_strerror(-r
) << dendl
;
1778 // Don't process further in this round if bucket is resharding
1779 if (cur_bucket_info
.reshard_status
== cls_rgw_reshard_status::IN_PROGRESS
)
1782 other_instances
.erase(std::remove_if(other_instances
.begin(), other_instances
.end(),
1783 [&cur_bucket_info
](const RGWBucketInfo
& b
){
1784 return (b
.bucket
.bucket_id
== cur_bucket_info
.bucket
.bucket_id
||
1785 b
.bucket
.bucket_id
== cur_bucket_info
.new_bucket_instance_id
);
1787 other_instances
.end());
1789 // check if there are still instances left
1790 if (other_instances
.empty()) {
1794 // Now we have a bucket with instances where the reshard status is none, this
1795 // usually happens when the reshard process couldn't complete, lockdown the
1796 // bucket and walk through these instances to make sure no one else interferes
1799 RGWBucketReshardLock
reshard_lock(store
, cur_bucket_info
, true);
1800 r
= reshard_lock
.lock();
1802 // most likely bucket is under reshard, return the sureshot stale instances
1803 ldout(store
->ctx(), 5) << __func__
1804 << "failed to take reshard lock; reshard underway likey" << dendl
;
1807 auto sg
= make_scope_guard([&reshard_lock
](){ reshard_lock
.unlock();} );
1808 // this should be fast enough that we may not need to renew locks and check
1809 // exit status?, should we read the values of the instances again?
1810 stale_instances
.insert(std::end(stale_instances
),
1811 std::make_move_iterator(other_instances
.begin()),
1812 std::make_move_iterator(other_instances
.end()));
// Walk every "bucket.instance" metadata key and hand the stale instances of
// each bucket to process_f.
// Keys are fetched in pages of default_max_keys (1000). Each page is grouped
// by the part of the key before the first ':' so that get_stale_instances()
// runs once per bucket; keys without ':' are not added to the map. process_f
// is invoked as process_f(stale_lst, formatter, store) for every bucket in
// the page. Output goes into an array section "keys"; a scope guard completes
// the key listing, closes the section and flushes the formatter to cout.
// Errors from list_keys_init()/list_keys_next() are reported on cerr;
// -ENOENT from list_keys_next() is not treated as an error.
1818 static int process_stale_instances(rgw::sal::RGWRadosStore
*store
, RGWBucketAdminOpState
& op_state
,
1819 RGWFormatterFlusher
& flusher
,
1820 std::function
<void(const bucket_instance_ls
&,
1822 rgw::sal::RGWRadosStore
*)> process_f
)
1826 Formatter
*formatter
= flusher
.get_formatter();
1827 static constexpr auto default_max_keys
= 1000;
1829 int ret
= store
->ctl()->meta
.mgr
->list_keys_init("bucket.instance", marker
, &handle
);
1831 cerr
<< "ERROR: can't get key: " << cpp_strerror(-ret
) << std::endl
;
1837 formatter
->open_array_section("keys");
1838 auto g
= make_scope_guard([&store
, &handle
, &formatter
]() {
1839 store
->ctl()->meta
.mgr
->list_keys_complete(handle
);
1840 formatter
->close_section(); // keys
1841 formatter
->flush(cout
);
1845 list
<std::string
> keys
;
1847 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, default_max_keys
, keys
, &truncated
);
1848 if (ret
< 0 && ret
!= -ENOENT
) {
1849 cerr
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret
) << std::endl
;
1851 } if (ret
!= -ENOENT
) {
1852 // partition the list of buckets by buckets as the listing is un sorted,
1853 // since it would minimize the reads to bucket_info
1854 std::unordered_map
<std::string
, std::vector
<std::string
>> bucket_instance_map
;
1855 for (auto &key
: keys
) {
1856 auto pos
= key
.find(':');
1857 if(pos
!= std::string::npos
)
1858 bucket_instance_map
[key
.substr(0,pos
)].emplace_back(std::move(key
));
1860 for (const auto& kv
: bucket_instance_map
) {
1861 bucket_instance_ls stale_lst
;
1862 get_stale_instances(store
, kv
.first
, kv
.second
, stale_lst
);
1863 process_f(stale_lst
, formatter
, store
);
1866 } while (truncated
);
1871 int RGWBucketAdminOp::list_stale_instances(rgw::sal::RGWRadosStore
*store
,
1872 RGWBucketAdminOpState
& op_state
,
1873 RGWFormatterFlusher
& flusher
)
1875 auto process_f
= [](const bucket_instance_ls
& lst
,
1876 Formatter
*formatter
,
1877 rgw::sal::RGWRadosStore
*){
1878 for (const auto& binfo
: lst
)
1879 formatter
->dump_string("key", binfo
.bucket
.get_key());
1881 return process_stale_instances(store
, op_state
, flusher
, process_f
);
// Delete stale bucket instances and report a status for each one.
// The callback given to process_stale_instances() calls
// purge_bucket_instance() for every stale instance and removes the
// "bucket.instance:<key>" metadata entry, then writes an object section
// "delete_status" holding the instance key and the negated return code.
// NOTE(review): whether the metadata removal is conditional on the purge
// succeeding is not visible in these lines - confirm before relying on it.
1885 int RGWBucketAdminOp::clear_stale_instances(rgw::sal::RGWRadosStore
*store
,
1886 RGWBucketAdminOpState
& op_state
,
1887 RGWFormatterFlusher
& flusher
)
1889 auto process_f
= [](const bucket_instance_ls
& lst
,
1890 Formatter
*formatter
,
1891 rgw::sal::RGWRadosStore
*store
){
1892 for (const auto &binfo
: lst
) {
1893 int ret
= purge_bucket_instance(store
, binfo
);
1895 auto md_key
= "bucket.instance:" + binfo
.bucket
.get_key();
1896 ret
= store
->ctl()->meta
.mgr
->remove(md_key
, null_yield
);
1898 formatter
->open_object_section("delete_status");
1899 formatter
->dump_string("bucket_instance", binfo
.bucket
.get_key());
1900 formatter
->dump_int("status", -ret
);
1901 formatter
->close_section();
1905 return process_stale_instances(store
, op_state
, flusher
, process_f
);
// Load the info and attrs of one bucket and return the result of
// rgw::lc::fix_lc_shard_entry() for it.
1908 static int fix_single_bucket_lc(rgw::sal::RGWRadosStore
*store
,
1909 const std::string
& tenant_name
,
1910 const std::string
& bucket_name
)
1912 RGWBucketInfo bucket_info
;
1913 map
<std::string
, bufferlist
> bucket_attrs
;
1914 int ret
= store
->getRados()->get_bucket_info(store
->svc(), tenant_name
, bucket_name
,
1915 bucket_info
, nullptr, null_yield
, &bucket_attrs
);
1917 // TODO: Should we handle the case where the bucket could've been removed between
1918 // listing and fetching?
1922 return rgw::lc::fix_lc_shard_entry(store
, bucket_info
, bucket_attrs
);
// Write one "bucket_entry" object section: the bucket name (prefixed with
// "tenant/" when tenant_name is not empty) and an integer status.
1925 static void format_lc_status(Formatter
* formatter
,
1926 const std::string
& tenant_name
,
1927 const std::string
& bucket_name
,
1930 formatter
->open_object_section("bucket_entry");
1931 std::string entry
= tenant_name
.empty() ? bucket_name
: tenant_name
+ "/" + bucket_name
;
1932 formatter
->dump_string("bucket", entry
);
1933 formatter
->dump_int("status", status
);
1934 formatter
->close_section(); // bucket_entry
1937 static void process_single_lc_entry(rgw::sal::RGWRadosStore
*store
,
1938 Formatter
*formatter
,
1939 const std::string
& tenant_name
,
1940 const std::string
& bucket_name
)
1942 int ret
= fix_single_bucket_lc(store
, tenant_name
, bucket_name
);
1943 format_lc_status(formatter
, tenant_name
, bucket_name
, -ret
);
// Fix lifecycle shard entries for one bucket or for all buckets.
// With a bucket name in op_state only that bucket is processed (tenant taken
// from op_state.get_user_id()) and the formatter is flushed to cout.
// The other path walks all "bucket" metadata keys in pages of
// default_max_keys (1000), splits each key into tenant and bucket name and
// processes it, inside an array section "lc_fix_status". A scope guard
// completes the key listing, closes the section and flushes; the formatter
// is also flushed after every page. Listing errors go to std::cerr; -ENOENT
// from list_keys_next() is not treated as an error.
1946 int RGWBucketAdminOp::fix_lc_shards(rgw::sal::RGWRadosStore
*store
,
1947 RGWBucketAdminOpState
& op_state
,
1948 RGWFormatterFlusher
& flusher
)
1952 Formatter
*formatter
= flusher
.get_formatter();
1953 static constexpr auto default_max_keys
= 1000;
1956 if (const std::string
& bucket_name
= op_state
.get_bucket_name();
1957 ! bucket_name
.empty()) {
1958 const rgw_user user_id
= op_state
.get_user_id();
1959 process_single_lc_entry(store
, formatter
, user_id
.tenant
, bucket_name
);
1960 formatter
->flush(cout
);
1962 int ret
= store
->ctl()->meta
.mgr
->list_keys_init("bucket", marker
, &handle
);
1964 std::cerr
<< "ERROR: can't get key: " << cpp_strerror(-ret
) << std::endl
;
1969 formatter
->open_array_section("lc_fix_status");
1970 auto sg
= make_scope_guard([&store
, &handle
, &formatter
](){
1971 store
->ctl()->meta
.mgr
->list_keys_complete(handle
);
1972 formatter
->close_section(); // lc_fix_status
1973 formatter
->flush(cout
);
1976 list
<std::string
> keys
;
1977 ret
= store
->ctl()->meta
.mgr
->list_keys_next(handle
, default_max_keys
, keys
, &truncated
);
1978 if (ret
< 0 && ret
!= -ENOENT
) {
1979 std::cerr
<< "ERROR: lists_keys_next(): " << cpp_strerror(-ret
) << std::endl
;
1981 } if (ret
!= -ENOENT
) {
1982 for (const auto &key
:keys
) {
1983 auto [tenant_name
, bucket_name
] = split_tenant(key
);
1984 process_single_lc_entry(store
, formatter
, tenant_name
, bucket_name
);
1987 formatter
->flush(cout
); // regularly flush every 1k entries
1988 } while (truncated
);
// Check the RGW_ATTR_DELETE_AT attribute of an object.
// The attribute is read with rgw_object_get_attr() and decoded into
// delete_at. false is returned on the path following the attribute read
// (no attribute) and on the path following the decode (parse failure).
// The final test is: delete_at is not zero and is at or before
// ceph_clock_now().
1996 static bool has_object_expired(rgw::sal::RGWRadosStore
*store
,
1997 const RGWBucketInfo
& bucket_info
,
1998 const rgw_obj_key
& key
, utime_t
& delete_at
)
2000 rgw_obj
obj(bucket_info
.bucket
, key
);
2001 bufferlist delete_at_bl
;
2003 int ret
= rgw_object_get_attr(store
, bucket_info
, obj
, RGW_ATTR_DELETE_AT
, delete_at_bl
, null_yield
);
2005 return false; // no delete at attr, proceed
2008 ret
= decode_bl(delete_at_bl
, delete_at
);
2010 return false; // failed to parse
2013 if (delete_at
<= ceph_clock_now() && !delete_at
.is_zero()) {
// Find (and remove) expired objects in a resharded bucket.
// A bucket whose bucket_id equals its marker is treated as not resharded and
// skipped with return value 0. Otherwise objects are listed in pages of
// listing_max_entries (unordered listing allowed, versions listed when the
// bucket is versioned). Every object for which has_object_expired() is true
// gets an "object_status" section with its name and delete_at; the result of
// rgw_remove_object() is dumped as "status". All entries sit inside the array
// section "expired_deletion_status", which a scope guard closes and flushes.
// NOTE(review): the use of dry_run is not visible in these lines - presumably
// it guards the rgw_remove_object() call; confirm.
2020 static int fix_bucket_obj_expiry(rgw::sal::RGWRadosStore
*store
,
2021 const RGWBucketInfo
& bucket_info
,
2022 RGWFormatterFlusher
& flusher
, bool dry_run
)
2024 if (bucket_info
.bucket
.bucket_id
== bucket_info
.bucket
.marker
) {
2025 lderr(store
->ctx()) << "Not a resharded bucket skipping" << dendl
;
2026 return 0; // not a resharded bucket, move along
2029 Formatter
*formatter
= flusher
.get_formatter();
2030 formatter
->open_array_section("expired_deletion_status");
2031 auto sg
= make_scope_guard([&formatter
] {
2032 formatter
->close_section();
2033 formatter
->flush(std::cout
);
2036 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
2037 RGWRados::Bucket::List
list_op(&target
);
2039 list_op
.params
.list_versions
= bucket_info
.versioned();
2040 list_op
.params
.allow_unordered
= true;
2042 bool is_truncated
{false};
2044 std::vector
<rgw_bucket_dir_entry
> objs
;
2046 int ret
= list_op
.list_objects(listing_max_entries
, &objs
, nullptr,
2047 &is_truncated
, null_yield
);
2049 lderr(store
->ctx()) << "ERROR failed to list objects in the bucket" << dendl
;
2052 for (const auto& obj
: objs
) {
2053 rgw_obj_key
key(obj
.key
);
2055 if (has_object_expired(store
, bucket_info
, key
, delete_at
)) {
2056 formatter
->open_object_section("object_status");
2057 formatter
->dump_string("object", key
.name
);
2058 formatter
->dump_stream("delete_at") << delete_at
;
2061 ret
= rgw_remove_object(store
, bucket_info
, bucket_info
.bucket
, key
);
2062 formatter
->dump_int("status", ret
);
2065 formatter
->close_section(); // object_status
2068 formatter
->flush(cout
); // regularly flush every 1k entries
2069 } while (is_truncated
);
// Initialise an RGWBucket from op_state (null_yield) and return the result of
// fix_bucket_obj_expiry() for its bucket info, forwarding flusher and dry_run.
// An init failure path logs "failed to initialize bucket".
2074 int RGWBucketAdminOp::fix_obj_expiry(rgw::sal::RGWRadosStore
*store
,
2075 RGWBucketAdminOpState
& op_state
,
2076 RGWFormatterFlusher
& flusher
, bool dry_run
)
2078 RGWBucket admin_bucket
;
2079 int ret
= admin_bucket
.init(store
, op_state
, null_yield
);
2081 lderr(store
->ctx()) << "failed to initialize bucket" << dendl
;
2085 return fix_bucket_obj_expiry(store
, admin_bucket
.get_bucket_info(), flusher
, dry_run
);
// Emit this data change as JSON fields: "entity_type" (a value derived in the
// switch over entity_type), "key" and "timestamp" (converted to utime_t).
2088 void rgw_data_change::dump(Formatter
*f
) const
2091 switch (entity_type
) {
2092 case ENTITY_TYPE_BUCKET
:
2098 encode_json("entity_type", type
, f
);
2099 encode_json("key", key
, f
);
2100 utime_t
ut(timestamp
);
2101 encode_json("timestamp", ut
, f
);
// Populate this data change from JSON. The "entity_type" string "bucket" maps
// to ENTITY_TYPE_BUCKET; the other visible assignment sets
// ENTITY_TYPE_UNKNOWN. "key" is read directly and "timestamp" is read as a
// utime_t and converted with to_real_time().
2104 void rgw_data_change::decode_json(JSONObj
*obj
) {
2106 JSONDecoder::decode_json("entity_type", s
, obj
);
2107 if (s
== "bucket") {
2108 entity_type
= ENTITY_TYPE_BUCKET
;
2110 entity_type
= ENTITY_TYPE_UNKNOWN
;
2112 JSONDecoder::decode_json("key", key
, obj
);
2114 JSONDecoder::decode_json("timestamp", ut
, obj
);
2115 timestamp
= ut
.to_real_time();
2118 void rgw_data_change_log_entry::dump(Formatter
*f
) const
2120 encode_json("log_id", log_id
, f
);
2121 utime_t
ut(log_timestamp
);
2122 encode_json("log_timestamp", ut
, f
);
2123 encode_json("entry", entry
, f
);
// Populate this log entry from JSON: "log_id" and "entry" are read directly,
// "log_timestamp" is read into ut and converted with to_real_time().
2126 void rgw_data_change_log_entry::decode_json(JSONObj
*obj
) {
2127 JSONDecoder::decode_json("log_id", log_id
, obj
);
2129 JSONDecoder::decode_json("log_timestamp", ut
, obj
);
2130 log_timestamp
= ut
.to_real_time();
2131 JSONDecoder::decode_json("entry", entry
, obj
);
// Set up the data changes log.
// cct comes from zone_svc->ctx(); changes is constructed with
// rgw_data_log_changes_size. num_shards is read from rgw_data_log_num_shards
// and oids is allocated as an array of num_shards strings. A per-shard name
// is formatted as "<prefix>.<index>", where prefix is
// rgw_data_log_obj_prefix or "data_log" when that option is empty.
// Finally a ChangesRenewThread is created and started under the thread name
// "rgw_dt_lg_renew".
2135 RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone
*zone_svc
, RGWSI_Cls
*cls_svc
)
2136 : cct(zone_svc
->ctx()), changes(cct
->_conf
->rgw_data_log_changes_size
)
2138 svc
.zone
= zone_svc
;
2141 num_shards
= cct
->_conf
->rgw_data_log_num_shards
;
2143 oids
= new string
[num_shards
];
2145 string prefix
= cct
->_conf
->rgw_data_log_obj_prefix
;
2147 if (prefix
.empty()) {
2148 prefix
= "data_log";
2151 for (int i
= 0; i
< num_shards
; i
++) {
2153 snprintf(buf
, sizeof(buf
), "%s.%d", prefix
.c_str(), i
);
2157 renew_thread
= new ChangesRenewThread(cct
, this);
2158 renew_thread
->create("rgw_dt_lg_renew");
// Pick the data-log shard index for a bucket shard: the ceph_str_hash_linux
// hash of the bucket name plus the shard id (only when the shard id is
// positive), taken modulo num_shards.
// NOTE(review): the return statement is not shown; r is presumably returned.
2161 int RGWDataChangesLog::choose_oid(const rgw_bucket_shard
& bs
) {
2162 const string
& name
= bs
.bucket
.name
;
2163 int shard_shift
= (bs
.shard_id
> 0 ? bs
.shard_id
: 0);
2164 uint32_t r
= (ceph_str_hash_linux(name
.c_str(), name
.size()) + shard_shift
) % num_shards
;
// Re-log every bucket shard that was registered in cur_cycle.
// There is an early path when svc.zone->need_to_log_data() is false.
// cur_cycle is swapped into a local map, then for each bucket shard a
// cls_log_entry is prepared (entity type bucket, key bs.get_key(), one common
// timestamp ut) and grouped by the log shard index from choose_oid().
// Each group is written with a single svc.cls->timelog.add() call; a failure
// is logged through lderr. For the buckets of a group update_renewed() is
// called with expiration = now + rgw_data_log_window, where now is sampled
// just before the add() call.
2169 int RGWDataChangesLog::renew_entries()
2171 if (!svc
.zone
->need_to_log_data())
2174 /* we can't keep the bucket name as part of the cls_log_entry, and we need
2175 * it later, so we keep two lists under the map */
2176 map
<int, pair
<list
<rgw_bucket_shard
>, list
<cls_log_entry
> > > m
;
2179 map
<rgw_bucket_shard
, bool> entries
;
2180 entries
.swap(cur_cycle
);
2183 map
<rgw_bucket_shard
, bool>::iterator iter
;
2185 real_time ut
= real_clock::now();
2186 for (iter
= entries
.begin(); iter
!= entries
.end(); ++iter
) {
2187 const rgw_bucket_shard
& bs
= iter
->first
;
2189 int index
= choose_oid(bs
);
2191 cls_log_entry entry
;
2193 rgw_data_change change
;
2195 change
.entity_type
= ENTITY_TYPE_BUCKET
;
2196 change
.key
= bs
.get_key();
2197 change
.timestamp
= ut
;
2200 svc
.cls
->timelog
.prepare_entry(entry
, ut
, section
, change
.key
, bl
);
2202 m
[index
].first
.push_back(bs
);
2203 m
[index
].second
.emplace_back(std::move(entry
));
2206 map
<int, pair
<list
<rgw_bucket_shard
>, list
<cls_log_entry
> > >::iterator miter
;
2207 for (miter
= m
.begin(); miter
!= m
.end(); ++miter
) {
2208 list
<cls_log_entry
>& entries
= miter
->second
.second
;
2210 real_time now
= real_clock::now();
2212 int ret
= svc
.cls
->timelog
.add(oids
[miter
->first
], entries
, nullptr, true, null_yield
);
2214 /* we don't really need to have a special handling for failed cases here,
2215 * as this is just an optimization. */
2216 lderr(cct
) << "ERROR: svc.cls->timelog.add() returned " << ret
<< dendl
;
2220 real_time expiration
= now
;
2221 expiration
+= make_timespan(cct
->_conf
->rgw_data_log_window
);
2223 list
<rgw_bucket_shard
>& buckets
= miter
->second
.first
;
2224 list
<rgw_bucket_shard
>::iterator liter
;
2225 for (liter
= buckets
.begin(); liter
!= buckets
.end(); ++liter
) {
2226 update_renewed(*liter
, expiration
);
2233 void RGWDataChangesLog::_get_change(const rgw_bucket_shard
& bs
, ChangeStatusPtr
& status
)
2235 ceph_assert(ceph_mutex_is_locked(lock
));
2236 if (!changes
.find(bs
, status
)) {
2237 status
= ChangeStatusPtr(new ChangeStatus
);
2238 changes
.add(bs
, status
);
2242 void RGWDataChangesLog::register_renew(rgw_bucket_shard
& bs
)
2244 std::lock_guard l
{lock
};
2245 cur_cycle
[bs
] = true;
// Record a new expiration for bs: while holding lock, fetch (or create) the
// change status through _get_change() and store expiration in its
// cur_expiration field. The update is logged at level 20.
2248 void RGWDataChangesLog::update_renewed(rgw_bucket_shard
& bs
, real_time
& expiration
)
2250 std::lock_guard l
{lock
};
2251 ChangeStatusPtr status
;
2252 _get_change(bs
, status
);
2254 ldout(cct
, 20) << "RGWDataChangesLog::update_renewd() bucket_name=" << bs
.bucket
.name
<< " shard_id=" << bs
.shard_id
<< " expiration=" << expiration
<< dendl
;
2255 status
->cur_expiration
= expiration
;
// Return the data-log shard index for (bucket, shard_id) by building an
// rgw_bucket_shard and passing it to choose_oid().
2258 int RGWDataChangesLog::get_log_shard_id(rgw_bucket
& bucket
, int shard_id
) {
2259 rgw_bucket_shard
bs(bucket
, shard_id
);
2261 return choose_oid(bs
);
// Apply the optional bucket filter: when a bucket_filter is set, its
// filter(bucket, y) result is returned.
// NOTE(review): the value returned when no filter is installed is not shown -
// presumably true (accept everything); confirm.
2264 bool RGWDataChangesLog::filter_bucket(const rgw_bucket
& bucket
, optional_yield y
) const
2266 if (!bucket_filter
) {
2270 return bucket_filter
->filter(bucket
, y
);
// Record that a bucket shard changed, writing at most one log entry per
// expiration window.
// Flow visible in this block:
//   - filter_bucket() gates the whole operation;
//   - the observer is notified with the bucket key and mark_modified() is
//     called for the chosen log shard index;
//   - the per-shard status is fetched through _get_change();
//   - with status->lock held: if now is still before cur_expiration nothing
//     needs to be sent; if another update is pending, a reference to its
//     cond is taken, the lock is dropped and cond->wait() is used to wait;
//   - otherwise this caller becomes the pending sender: a new RefCountedCond
//     is installed, cur_sent is set to now, and timelog.add() is issued with
//     the status lock released. The send repeats while it succeeded but the
//     clock has already passed expiration;
//   - finally pending is cleared, cur_expiration becomes cur_sent plus
//     rgw_data_log_window (start time of the operation, not completion), and
//     status->cond is reset to NULL.
// The statement order around status->lock is significant; do not reorder.
2273 int RGWDataChangesLog::add_entry(const RGWBucketInfo
& bucket_info
, int shard_id
) {
2274 auto& bucket
= bucket_info
.bucket
;
2276 if (!filter_bucket(bucket
, null_yield
)) {
2281 observer
->on_bucket_changed(bucket
.get_key());
2284 rgw_bucket_shard
bs(bucket
, shard_id
);
2286 int index
= choose_oid(bs
);
2287 mark_modified(index
, bs
);
2291 ChangeStatusPtr status
;
2292 _get_change(bs
, status
);
2296 real_time now
= real_clock::now();
2298 status
->lock
.lock();
2300 ldout(cct
, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket
.name
<< " shard_id=" << shard_id
<< " now=" << now
<< " cur_expiration=" << status
->cur_expiration
<< dendl
;
2302 if (now
< status
->cur_expiration
) {
2303 /* no need to send, recently completed */
2304 status
->lock
.unlock();
2310 RefCountedCond
*cond
;
2312 if (status
->pending
) {
2313 cond
= status
->cond
;
2317 status
->cond
->get();
2318 status
->lock
.unlock();
2320 int ret
= cond
->wait();
2328 status
->cond
= new RefCountedCond
;
2329 status
->pending
= true;
2331 string
& oid
= oids
[index
];
2332 real_time expiration
;
2337 status
->cur_sent
= now
;
2340 expiration
+= ceph::make_timespan(cct
->_conf
->rgw_data_log_window
);
2342 status
->lock
.unlock();
2345 rgw_data_change change
;
2346 change
.entity_type
= ENTITY_TYPE_BUCKET
;
2347 change
.key
= bs
.get_key();
2348 change
.timestamp
= now
;
2352 ldout(cct
, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now
<< " cur_expiration=" << expiration
<< dendl
;
2354 ret
= svc
.cls
->timelog
.add(oid
, now
, section
, change
.key
, bl
, null_yield
);
2356 now
= real_clock::now();
2358 status
->lock
.lock();
2360 } while (!ret
&& real_clock::now() > expiration
);
2362 cond
= status
->cond
;
2364 status
->pending
= false;
2365 status
->cur_expiration
= status
->cur_sent
; /* time of when operation started, not completed */
2366 status
->cur_expiration
+= make_timespan(cct
->_conf
->rgw_data_log_window
);
2367 status
->cond
= NULL
;
2368 status
->lock
.unlock();
// List the entries of one data-log shard.
// A shard index at or beyond num_shards is rejected by the first check.
// The raw cls_log_entry records come from svc.cls->timelog.list() for
// oids[shard]; each one is converted to an rgw_data_change_log_entry (id,
// timestamp, decoded payload) and appended to entries. A buffer::error while
// decoding the payload is caught and logged through lderr.
// NOTE(review): a negative shard value is not checked before indexing oids.
2376 int RGWDataChangesLog::list_entries(int shard
, const real_time
& start_time
, const real_time
& end_time
, int max_entries
,
2377 list
<rgw_data_change_log_entry
>& entries
,
2378 const string
& marker
,
2381 if (shard
>= num_shards
)
2384 list
<cls_log_entry
> log_entries
;
2386 int ret
= svc
.cls
->timelog
.list(oids
[shard
], start_time
, end_time
,
2387 max_entries
, log_entries
, marker
,
2388 out_marker
, truncated
, null_yield
);
2392 list
<cls_log_entry
>::iterator iter
;
2393 for (iter
= log_entries
.begin(); iter
!= log_entries
.end(); ++iter
) {
2394 rgw_data_change_log_entry log_entry
;
2395 log_entry
.log_id
= iter
->id
;
2396 real_time rt
= iter
->timestamp
.to_real_time();
2397 log_entry
.log_timestamp
= rt
;
2398 auto liter
= iter
->data
.cbegin();
2400 decode(log_entry
.entry
, liter
);
2401 } catch (buffer::error
& err
) {
2402 lderr(cct
) << "ERROR: failed to decode data changes log entry" << dendl
;
2405 entries
.push_back(log_entry
);
// List entries across shards, resuming from marker.
// Starting at marker.shard, shards are listed one after another until
// max_entries entries were collected or the last shard was passed; each call
// asks only for the number of entries still missing. When moving on to the
// next shard marker.marker is cleared. -ENOENT from a shard gets dedicated
// handling. *ptruncated is set to whether marker.shard is still below
// num_shards when the loop ends.
2411 int RGWDataChangesLog::list_entries(const real_time
& start_time
, const real_time
& end_time
, int max_entries
,
2412 list
<rgw_data_change_log_entry
>& entries
, LogMarker
& marker
, bool *ptruncated
) {
2416 for (; marker
.shard
< num_shards
&& (int)entries
.size() < max_entries
;
2417 marker
.shard
++, marker
.marker
.clear()) {
2418 int ret
= list_entries(marker
.shard
, start_time
, end_time
, max_entries
- entries
.size(), entries
,
2419 marker
.marker
, NULL
, &truncated
);
2420 if (ret
== -ENOENT
) {
2432 *ptruncated
= (marker
.shard
< num_shards
);
2437 int RGWDataChangesLog::get_info(int shard_id
, RGWDataChangesLogInfo
*info
)
2439 if (shard_id
>= num_shards
)
2442 string oid
= oids
[shard_id
];
2444 cls_log_header header
;
2446 int ret
= svc
.cls
->timelog
.info(oid
, &header
, null_yield
);
2447 if ((ret
< 0) && (ret
!= -ENOENT
))
2450 info
->marker
= header
.max_marker
;
2451 info
->last_update
= header
.max_time
.to_real_time();
2456 int RGWDataChangesLog::trim_entries(int shard_id
, const real_time
& start_time
, const real_time
& end_time
,
2457 const string
& start_marker
, const string
& end_marker
)
2459 if (shard_id
> num_shards
)
2462 return svc
.cls
->timelog
.trim(oids
[shard_id
], start_time
, end_time
,
2463 start_marker
, end_marker
, nullptr, null_yield
);
2466 bool RGWDataChangesLog::going_down()
2471 RGWDataChangesLog::~RGWDataChangesLog() {
2473 renew_thread
->stop();
2474 renew_thread
->join();
2475 delete renew_thread
;
2479 void *RGWDataChangesLog::ChangesRenewThread::entry() {
2481 dout(2) << "RGWDataChangesLog::ChangesRenewThread: start" << dendl
;
2482 int r
= log
->renew_entries();
2484 dout(0) << "ERROR: RGWDataChangesLog::renew_entries returned error r=" << r
<< dendl
;
2487 if (log
->going_down())
2490 int interval
= cct
->_conf
->rgw_data_log_window
* 3 / 4;
2491 std::unique_lock locker
{lock
};
2492 cond
.wait_for(locker
, std::chrono::seconds(interval
));
2498 void RGWDataChangesLog::ChangesRenewThread::stop()
2500 std::lock_guard l
{lock
};
2504 void RGWDataChangesLog::mark_modified(int shard_id
, const rgw_bucket_shard
& bs
)
2506 auto key
= bs
.get_key();
2508 std::shared_lock rl
{modified_lock
}; // read lock to check for existence
2509 auto shard
= modified_shards
.find(shard_id
);
2510 if (shard
!= modified_shards
.end() && shard
->second
.count(key
)) {
2515 std::unique_lock wl
{modified_lock
}; // write lock for insertion
2516 modified_shards
[shard_id
].insert(key
);
2519 void RGWDataChangesLog::read_clear_modified(map
<int, set
<string
> > &modified
)
2521 std::unique_lock wl
{modified_lock
};
2522 modified
.swap(modified_shards
);
2523 modified_shards
.clear();
2526 void RGWBucketCompleteInfo::dump(Formatter
*f
) const {
2527 encode_json("bucket_info", info
, f
);
2528 encode_json("attrs", attrs
, f
);
2531 void RGWBucketCompleteInfo::decode_json(JSONObj
*obj
) {
2532 JSONDecoder::decode_json("bucket_info", info
, obj
);
2533 JSONDecoder::decode_json("attrs", attrs
, obj
);
2536 class RGWBucketMetadataHandler
: public RGWBucketMetadataHandlerBase
{
2539 RGWSI_Bucket
*bucket
{nullptr};
2543 RGWBucketCtl
*bucket
{nullptr};
2546 RGWBucketMetadataHandler() {}
2548 void init(RGWSI_Bucket
*bucket_svc
,
2549 RGWBucketCtl
*bucket_ctl
) override
{
2550 base_init(bucket_svc
->ctx(),
2551 bucket_svc
->get_ep_be_handler().get());
2552 svc
.bucket
= bucket_svc
;
2553 ctl
.bucket
= bucket_ctl
;
2556 string
get_type() override
{ return "bucket"; }
2558 RGWMetadataObject
*get_meta_obj(JSONObj
*jo
, const obj_version
& objv
, const ceph::real_time
& mtime
) override
{
2559 RGWBucketEntryPoint be
;
2562 decode_json_obj(be
, jo
);
2563 } catch (JSONDecoder::err
& e
) {
2567 return new RGWBucketEntryMetadataObject(be
, objv
, mtime
);
2570 int do_get(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWMetadataObject
**obj
, optional_yield y
) override
{
2571 RGWObjVersionTracker ot
;
2572 RGWBucketEntryPoint be
;
2575 map
<string
, bufferlist
> attrs
;
2577 RGWSI_Bucket_EP_Ctx
ctx(op
->ctx());
2579 int ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
, entry
, &be
, &ot
, &mtime
, &attrs
, y
);
2583 RGWBucketEntryMetadataObject
*mdo
= new RGWBucketEntryMetadataObject(be
, ot
.read_version
, mtime
, std::move(attrs
));
2590 int do_put(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
,
2591 RGWMetadataObject
*obj
,
2592 RGWObjVersionTracker
& objv_tracker
,
2594 RGWMDLogSyncType type
) override
;
2596 int do_remove(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWObjVersionTracker
& objv_tracker
,
2597 optional_yield y
) override
{
2598 RGWBucketEntryPoint be
;
2600 real_time orig_mtime
;
2602 RGWSI_Bucket_EP_Ctx
ctx(op
->ctx());
2604 int ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
, entry
, &be
, &objv_tracker
, &orig_mtime
, nullptr, y
);
2609 * We're unlinking the bucket but we don't want to update the entrypoint here - we're removing
2610 * it immediately and don't want to invalidate our cached objv_version or the bucket obj removal
2611 * will incorrectly fail.
2613 ret
= ctl
.bucket
->unlink_bucket(be
.owner
, be
.bucket
, y
, false);
2615 lderr(svc
.bucket
->ctx()) << "could not unlink bucket=" << entry
<< " owner=" << be
.owner
<< dendl
;
2618 ret
= svc
.bucket
->remove_bucket_entrypoint_info(ctx
, entry
, &objv_tracker
, y
);
2620 lderr(svc
.bucket
->ctx()) << "could not delete bucket=" << entry
<< dendl
;
2626 int call(std::function
<int(RGWSI_Bucket_EP_Ctx
& ctx
)> f
) {
2627 return call(nullopt
, f
);
2630 int call(std::optional
<RGWSI_MetaBackend_CtxParams
> bectx_params
,
2631 std::function
<int(RGWSI_Bucket_EP_Ctx
& ctx
)> f
) {
2632 return be_handler
->call(bectx_params
, [&](RGWSI_MetaBackend_Handler::Op
*op
) {
2633 RGWSI_Bucket_EP_Ctx
ctx(op
->ctx());
2639 class RGWMetadataHandlerPut_Bucket
: public RGWMetadataHandlerPut_SObj
2641 RGWBucketMetadataHandler
*bhandler
;
2642 RGWBucketEntryMetadataObject
*obj
;
2644 RGWMetadataHandlerPut_Bucket(RGWBucketMetadataHandler
*_handler
,
2645 RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
,
2646 RGWMetadataObject
*_obj
, RGWObjVersionTracker
& objv_tracker
,
2648 RGWMDLogSyncType type
) : RGWMetadataHandlerPut_SObj(_handler
, op
, entry
, obj
, objv_tracker
, y
, type
),
2649 bhandler(_handler
) {
2650 obj
= static_cast<RGWBucketEntryMetadataObject
*>(_obj
);
2652 ~RGWMetadataHandlerPut_Bucket() {}
2654 void encode_obj(bufferlist
*bl
) override
{
2655 obj
->get_ep().encode(*bl
);
2658 int put_checked() override
;
2659 int put_post() override
;
2662 int RGWBucketMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
,
2663 RGWMetadataObject
*obj
,
2664 RGWObjVersionTracker
& objv_tracker
,
2666 RGWMDLogSyncType type
)
2668 RGWMetadataHandlerPut_Bucket
put_op(this, op
, entry
, obj
, objv_tracker
, y
, type
);
2669 return do_put_operate(&put_op
);
2672 int RGWMetadataHandlerPut_Bucket::put_checked()
2674 RGWBucketEntryMetadataObject
*orig_obj
= static_cast<RGWBucketEntryMetadataObject
*>(old_obj
);
2677 obj
->set_pattrs(&orig_obj
->get_attrs());
2680 auto& be
= obj
->get_ep();
2681 auto mtime
= obj
->get_mtime();
2682 auto pattrs
= obj
->get_pattrs();
2684 RGWSI_Bucket_EP_Ctx
ctx(op
->ctx());
2686 return bhandler
->svc
.bucket
->store_bucket_entrypoint_info(ctx
, entry
,
2695 int RGWMetadataHandlerPut_Bucket::put_post()
2697 auto& be
= obj
->get_ep();
2703 ret
= bhandler
->ctl
.bucket
->link_bucket(be
.owner
, be
.bucket
, be
.creation_time
, y
, false);
2705 ret
= bhandler
->ctl
.bucket
->unlink_bucket(be
.owner
, be
.bucket
, y
, false);
2711 static void get_md5_digest(const RGWBucketEntryPoint
*be
, string
& md5_digest
) {
2713 char md5
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 1];
2714 unsigned char m
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
2717 Formatter
*f
= new JSONFormatter(false);
2722 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
2723 hash
.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW
);
2724 hash
.Update((const unsigned char *)bl
.c_str(), bl
.length());
2727 buf_to_hex(m
, CEPH_CRYPTO_MD5_DIGESTSIZE
, md5
);
2734 #define ARCHIVE_META_ATTR RGW_ATTR_PREFIX "zone.archive.info"
2736 struct archive_meta_info
{
2737 rgw_bucket orig_bucket
;
2739 bool from_attrs(CephContext
*cct
, map
<string
, bufferlist
>& attrs
) {
2740 auto iter
= attrs
.find(ARCHIVE_META_ATTR
);
2741 if (iter
== attrs
.end()) {
2745 auto bliter
= iter
->second
.cbegin();
2748 } catch (buffer::error
& err
) {
2749 ldout(cct
, 0) << "ERROR: failed to decode archive meta info" << dendl
;
2756 void store_in_attrs(map
<string
, bufferlist
>& attrs
) const {
2757 encode(attrs
[ARCHIVE_META_ATTR
]);
2760 void encode(bufferlist
& bl
) const {
2761 ENCODE_START(1, 1, bl
);
2762 encode(orig_bucket
, bl
);
2766 void decode(bufferlist::const_iterator
& bl
) {
2767 DECODE_START(1, bl
);
2768 decode(orig_bucket
, bl
);
2772 WRITE_CLASS_ENCODER(archive_meta_info
)
2774 class RGWArchiveBucketMetadataHandler
: public RGWBucketMetadataHandler
{
2776 RGWArchiveBucketMetadataHandler() {}
2778 int do_remove(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWObjVersionTracker
& objv_tracker
,
2779 optional_yield y
) override
{
2780 auto cct
= svc
.bucket
->ctx();
2782 RGWSI_Bucket_EP_Ctx
ctx(op
->ctx());
2784 ldout(cct
, 5) << "SKIP: bucket removal is not allowed on archive zone: bucket:" << entry
<< " ... proceeding to rename" << dendl
;
2786 string tenant_name
, bucket_name
;
2787 parse_bucket(entry
, &tenant_name
, &bucket_name
);
2788 rgw_bucket entry_bucket
;
2789 entry_bucket
.tenant
= tenant_name
;
2790 entry_bucket
.name
= bucket_name
;
2794 /* read original entrypoint */
2796 RGWBucketEntryPoint be
;
2797 map
<string
, bufferlist
> attrs
;
2798 int ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
, entry
, &be
, &objv_tracker
, &mtime
, &attrs
, y
);
2803 string bi_meta_name
= RGWSI_Bucket::get_bi_meta_key(be
.bucket
);
2805 /* read original bucket instance info */
2807 map
<string
, bufferlist
> attrs_m
;
2808 ceph::real_time orig_mtime
;
2809 RGWBucketInfo old_bi
;
2811 ret
= ctl
.bucket
->read_bucket_instance_info(be
.bucket
, &old_bi
, y
, RGWBucketCtl::BucketInstance::GetParams()
2812 .set_mtime(&orig_mtime
)
2813 .set_attrs(&attrs_m
));
2818 archive_meta_info ami
;
2820 if (!ami
.from_attrs(svc
.bucket
->ctx(), attrs_m
)) {
2821 ami
.orig_bucket
= old_bi
.bucket
;
2822 ami
.store_in_attrs(attrs_m
);
2825 /* generate a new bucket instance. We could have avoided this if we could just point a new
2826 * bucket entry point to the old bucket instance, however, due to limitation in the way
2827 * we index buckets under the user, bucket entrypoint and bucket instance of the same
2828 * bucket need to have the same name, so we need to copy the old bucket instance into
2829 * to a new entry with the new name
2832 string new_bucket_name
;
2834 RGWBucketInfo new_bi
= old_bi
;
2835 RGWBucketEntryPoint new_be
= be
;
2839 get_md5_digest(&new_be
, md5_digest
);
2840 new_bucket_name
= ami
.orig_bucket
.name
+ "-deleted-" + md5_digest
;
2842 new_bi
.bucket
.name
= new_bucket_name
;
2843 new_bi
.objv_tracker
.clear();
2845 new_be
.bucket
.name
= new_bucket_name
;
2847 ret
= ctl
.bucket
->store_bucket_instance_info(be
.bucket
, new_bi
, y
, RGWBucketCtl::BucketInstance::PutParams()
2848 .set_exclusive(false)
2849 .set_mtime(orig_mtime
)
2850 .set_attrs(&attrs_m
)
2851 .set_orig_info(&old_bi
));
2853 ldout(cct
, 0) << "ERROR: failed to put new bucket instance info for bucket=" << new_bi
.bucket
<< " ret=" << ret
<< dendl
;
2857 /* store a new entrypoint */
2859 RGWObjVersionTracker ot
;
2860 ot
.generate_new_write_ver(cct
);
2862 ret
= svc
.bucket
->store_bucket_entrypoint_info(ctx
, RGWSI_Bucket::get_entrypoint_meta_key(new_be
.bucket
),
2863 new_be
, true, mtime
, &attrs
, nullptr, y
);
2865 ldout(cct
, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be
.bucket
<< " ret=" << ret
<< dendl
;
2869 /* link new bucket */
2871 ret
= ctl
.bucket
->link_bucket(new_be
.owner
, new_be
.bucket
, new_be
.creation_time
, y
, false);
2873 ldout(cct
, 0) << "ERROR: failed to link new bucket for bucket=" << new_be
.bucket
<< " ret=" << ret
<< dendl
;
2877 /* clean up old stuff */
2879 ret
= ctl
.bucket
->unlink_bucket(be
.owner
, entry_bucket
, y
, false);
2881 lderr(cct
) << "could not unlink bucket=" << entry
<< " owner=" << be
.owner
<< dendl
;
2884 // if (ret == -ECANCELED) it means that there was a race here, and someone
2885 // wrote to the bucket entrypoint just before we removed it. The question is
2886 // whether it was a newly created bucket entrypoint ... in which case we
2887 // should ignore the error and move forward, or whether it is a higher version
2888 // of the same bucket instance ... in which we should retry
2889 ret
= svc
.bucket
->remove_bucket_entrypoint_info(ctx
,
2890 RGWSI_Bucket::get_entrypoint_meta_key(be
.bucket
),
2894 ldout(cct
, 0) << "ERROR: failed to put new bucket entrypoint for bucket=" << new_be
.bucket
<< " ret=" << ret
<< dendl
;
2898 ret
= ctl
.bucket
->remove_bucket_instance_info(be
.bucket
, old_bi
, y
);
2900 lderr(cct
) << "could not delete bucket=" << entry
<< dendl
;
2909 int do_put(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
,
2910 RGWMetadataObject
*obj
,
2911 RGWObjVersionTracker
& objv_tracker
,
2913 RGWMDLogSyncType type
) override
{
2914 if (entry
.find("-deleted-") != string::npos
) {
2915 RGWObjVersionTracker ot
;
2916 RGWMetadataObject
*robj
;
2917 int ret
= do_get(op
, entry
, &robj
, y
);
2918 if (ret
!= -ENOENT
) {
2922 ot
.read_version
= robj
->get_version();
2925 ret
= do_remove(op
, entry
, ot
, y
);
2932 return RGWBucketMetadataHandler::do_put(op
, entry
, obj
,
2933 objv_tracker
, y
, type
);
2938 class RGWBucketInstanceMetadataHandler
: public RGWBucketInstanceMetadataHandlerBase
{
2939 int read_bucket_instance_entry(RGWSI_Bucket_BI_Ctx
& ctx
,
2940 const string
& entry
,
2941 RGWBucketCompleteInfo
*bi
,
2942 ceph::real_time
*pmtime
,
2944 return svc
.bucket
->read_bucket_instance_info(ctx
,
2953 RGWSI_Zone
*zone
{nullptr};
2954 RGWSI_Bucket
*bucket
{nullptr};
2955 RGWSI_BucketIndex
*bi
{nullptr};
2958 RGWBucketInstanceMetadataHandler() {}
2960 void init(RGWSI_Zone
*zone_svc
,
2961 RGWSI_Bucket
*bucket_svc
,
2962 RGWSI_BucketIndex
*bi_svc
) {
2963 base_init(bucket_svc
->ctx(),
2964 bucket_svc
->get_bi_be_handler().get());
2965 svc
.zone
= zone_svc
;
2966 svc
.bucket
= bucket_svc
;
2970 string
get_type() override
{ return "bucket.instance"; }
2972 RGWMetadataObject
*get_meta_obj(JSONObj
*jo
, const obj_version
& objv
, const ceph::real_time
& mtime
) override
{
2973 RGWBucketCompleteInfo bci
;
2976 decode_json_obj(bci
, jo
);
2977 } catch (JSONDecoder::err
& e
) {
2981 return new RGWBucketInstanceMetadataObject(bci
, objv
, mtime
);
2984 int do_get(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWMetadataObject
**obj
, optional_yield y
) override
{
2985 RGWBucketCompleteInfo bci
;
2988 RGWSI_Bucket_BI_Ctx
ctx(op
->ctx());
2990 int ret
= svc
.bucket
->read_bucket_instance_info(ctx
, entry
, &bci
.info
, &mtime
, &bci
.attrs
, y
);
2994 RGWBucketInstanceMetadataObject
*mdo
= new RGWBucketInstanceMetadataObject(bci
, bci
.info
.objv_tracker
.read_version
, mtime
);
3001 int do_put(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
,
3002 RGWMetadataObject
*_obj
, RGWObjVersionTracker
& objv_tracker
,
3004 RGWMDLogSyncType sync_type
) override
;
3006 int do_remove(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWObjVersionTracker
& objv_tracker
,
3007 optional_yield y
) override
{
3008 RGWBucketCompleteInfo bci
;
3010 RGWSI_Bucket_BI_Ctx
ctx(op
->ctx());
3012 int ret
= read_bucket_instance_entry(ctx
, entry
, &bci
, nullptr, y
);
3013 if (ret
< 0 && ret
!= -ENOENT
)
3016 return svc
.bucket
->remove_bucket_instance_info(ctx
, entry
, bci
.info
, &bci
.info
.objv_tracker
, y
);
3019 int call(std::function
<int(RGWSI_Bucket_BI_Ctx
& ctx
)> f
) {
3020 return call(nullopt
, f
);
3023 int call(std::optional
<RGWSI_MetaBackend_CtxParams
> bectx_params
,
3024 std::function
<int(RGWSI_Bucket_BI_Ctx
& ctx
)> f
) {
3025 return be_handler
->call(bectx_params
, [&](RGWSI_MetaBackend_Handler::Op
*op
) {
3026 RGWSI_Bucket_BI_Ctx
ctx(op
->ctx());
3032 class RGWMetadataHandlerPut_BucketInstance
: public RGWMetadataHandlerPut_SObj
3035 RGWBucketInstanceMetadataHandler
*bihandler
;
3036 RGWBucketInstanceMetadataObject
*obj
;
3038 RGWMetadataHandlerPut_BucketInstance(CephContext
*cct
,
3039 RGWBucketInstanceMetadataHandler
*_handler
,
3040 RGWSI_MetaBackend_Handler::Op
*_op
, string
& entry
,
3041 RGWMetadataObject
*_obj
, RGWObjVersionTracker
& objv_tracker
,
3043 RGWMDLogSyncType type
) : RGWMetadataHandlerPut_SObj(_handler
, _op
, entry
, obj
, objv_tracker
, y
, type
),
3044 bihandler(_handler
) {
3045 obj
= static_cast<RGWBucketInstanceMetadataObject
*>(_obj
);
3047 auto& bci
= obj
->get_bci();
3048 obj
->set_pattrs(&bci
.attrs
);
3051 void encode_obj(bufferlist
*bl
) override
{
3052 obj
->get_bucket_info().encode(*bl
);
3055 int put_check() override
;
3056 int put_checked() override
;
3057 int put_post() override
;
3060 int RGWBucketInstanceMetadataHandler::do_put(RGWSI_MetaBackend_Handler::Op
*op
,
3062 RGWMetadataObject
*obj
,
3063 RGWObjVersionTracker
& objv_tracker
,
3065 RGWMDLogSyncType type
)
3067 RGWMetadataHandlerPut_BucketInstance
put_op(svc
.bucket
->ctx(), this, op
, entry
, obj
,
3068 objv_tracker
, y
, type
);
3069 return do_put_operate(&put_op
);
3072 int RGWMetadataHandlerPut_BucketInstance::put_check()
3076 RGWBucketCompleteInfo
& bci
= obj
->get_bci();
3078 RGWBucketInstanceMetadataObject
*orig_obj
= static_cast<RGWBucketInstanceMetadataObject
*>(old_obj
);
3080 RGWBucketCompleteInfo
*old_bci
= (orig_obj
? &orig_obj
->get_bci() : nullptr);
3082 bool exists
= (!!orig_obj
);
3084 if (!exists
|| old_bci
->info
.bucket
.bucket_id
!= bci
.info
.bucket
.bucket_id
) {
3085 /* a new bucket, we need to select a new bucket placement for it */
3088 string bucket_instance
;
3089 parse_bucket(entry
, &tenant_name
, &bucket_name
, &bucket_instance
);
3091 RGWZonePlacementInfo rule_info
;
3092 bci
.info
.bucket
.name
= bucket_name
;
3093 bci
.info
.bucket
.bucket_id
= bucket_instance
;
3094 bci
.info
.bucket
.tenant
= tenant_name
;
3095 // if the sync module never writes data, don't require the zone to specify all placement targets
3096 if (bihandler
->svc
.zone
->sync_module_supports_writes()) {
3097 ret
= bihandler
->svc
.zone
->select_bucket_location_by_rule(bci
.info
.placement_rule
, &rule_info
);
3099 ldout(cct
, 0) << "ERROR: select_bucket_placement() returned " << ret
<< dendl
;
3103 bci
.info
.index_type
= rule_info
.index_type
;
3105 /* existing bucket, keep its placement */
3106 bci
.info
.bucket
.explicit_placement
= old_bci
->info
.bucket
.explicit_placement
;
3107 bci
.info
.placement_rule
= old_bci
->info
.placement_rule
;
3110 /* record the read version (if any), store the new version */
3111 bci
.info
.objv_tracker
.read_version
= objv_tracker
.read_version
;
3112 bci
.info
.objv_tracker
.write_version
= objv_tracker
.write_version
;
3117 int RGWMetadataHandlerPut_BucketInstance::put_checked()
3119 RGWBucketInstanceMetadataObject
*orig_obj
= static_cast<RGWBucketInstanceMetadataObject
*>(old_obj
);
3121 RGWBucketInfo
*orig_info
= (orig_obj
? &orig_obj
->get_bucket_info() : nullptr);
3123 auto& info
= obj
->get_bucket_info();
3124 auto mtime
= obj
->get_mtime();
3125 auto pattrs
= obj
->get_pattrs();
3127 RGWSI_Bucket_BI_Ctx
ctx(op
->ctx());
3129 return bihandler
->svc
.bucket
->store_bucket_instance_info(ctx
,
3139 int RGWMetadataHandlerPut_BucketInstance::put_post()
3141 RGWBucketCompleteInfo
& bci
= obj
->get_bci();
3143 objv_tracker
= bci
.info
.objv_tracker
;
3145 int ret
= bihandler
->svc
.bi
->init_index(bci
.info
);
3150 return STATUS_APPLIED
;
3153 class RGWArchiveBucketInstanceMetadataHandler
: public RGWBucketInstanceMetadataHandler
{
3155 RGWArchiveBucketInstanceMetadataHandler() {}
3157 int do_remove(RGWSI_MetaBackend_Handler::Op
*op
, string
& entry
, RGWObjVersionTracker
& objv_tracker
, optional_yield y
) override
{
3158 ldout(cct
, 0) << "SKIP: bucket instance removal is not allowed on archive zone: bucket.instance:" << entry
<< dendl
;
3163 bool RGWBucketCtl::DataLogFilter::filter(const rgw_bucket
& bucket
, optional_yield y
) const
3165 return bucket_ctl
->bucket_exports_data(bucket
, null_yield
);
3168 RGWBucketCtl::RGWBucketCtl(RGWSI_Zone
*zone_svc
,
3169 RGWSI_Bucket
*bucket_svc
,
3170 RGWSI_Bucket_Sync
*bucket_sync_svc
,
3171 RGWSI_BucketIndex
*bi_svc
) : cct(zone_svc
->ctx()),
3172 datalog_filter(this)
3174 svc
.zone
= zone_svc
;
3175 svc
.bucket
= bucket_svc
;
3176 svc
.bucket_sync
= bucket_sync_svc
;
3180 void RGWBucketCtl::init(RGWUserCtl
*user_ctl
,
3181 RGWBucketMetadataHandler
*_bm_handler
,
3182 RGWBucketInstanceMetadataHandler
*_bmi_handler
,
3183 RGWDataChangesLog
*datalog
)
3185 ctl
.user
= user_ctl
;
3187 bm_handler
= _bm_handler
;
3188 bmi_handler
= _bmi_handler
;
3190 bucket_be_handler
= bm_handler
->get_be_handler();
3191 bi_be_handler
= bmi_handler
->get_be_handler();
3193 datalog
->set_bucket_filter(&datalog_filter
);
3196 int RGWBucketCtl::call(std::function
<int(RGWSI_Bucket_X_Ctx
& ctx
)> f
) {
3197 return bm_handler
->call([&](RGWSI_Bucket_EP_Ctx
& ep_ctx
) {
3198 return bmi_handler
->call([&](RGWSI_Bucket_BI_Ctx
& bi_ctx
) {
3199 RGWSI_Bucket_X_Ctx ctx
{ep_ctx
, bi_ctx
};
3205 int RGWBucketCtl::read_bucket_entrypoint_info(const rgw_bucket
& bucket
,
3206 RGWBucketEntryPoint
*info
,
3208 const Bucket::GetParams
& params
)
3210 return bm_handler
->call(params
.bectx_params
, [&](RGWSI_Bucket_EP_Ctx
& ctx
) {
3211 return svc
.bucket
->read_bucket_entrypoint_info(ctx
,
3212 RGWSI_Bucket::get_entrypoint_meta_key(bucket
),
3214 params
.objv_tracker
,
3219 params
.refresh_version
);
3223 int RGWBucketCtl::store_bucket_entrypoint_info(const rgw_bucket
& bucket
,
3224 RGWBucketEntryPoint
& info
,
3226 const Bucket::PutParams
& params
)
3228 return bm_handler
->call([&](RGWSI_Bucket_EP_Ctx
& ctx
) {
3229 return svc
.bucket
->store_bucket_entrypoint_info(ctx
,
3230 RGWSI_Bucket::get_entrypoint_meta_key(bucket
),
3235 params
.objv_tracker
,
3240 int RGWBucketCtl::remove_bucket_entrypoint_info(const rgw_bucket
& bucket
,
3242 const Bucket::RemoveParams
& params
)
3244 return bm_handler
->call([&](RGWSI_Bucket_EP_Ctx
& ctx
) {
3245 return svc
.bucket
->remove_bucket_entrypoint_info(ctx
,
3246 RGWSI_Bucket::get_entrypoint_meta_key(bucket
),
3247 params
.objv_tracker
,
3252 int RGWBucketCtl::read_bucket_instance_info(const rgw_bucket
& bucket
,
3253 RGWBucketInfo
*info
,
3255 const BucketInstance::GetParams
& params
)
3257 int ret
= bmi_handler
->call(params
.bectx_params
, [&](RGWSI_Bucket_BI_Ctx
& ctx
) {
3258 return svc
.bucket
->read_bucket_instance_info(ctx
,
3259 RGWSI_Bucket::get_bi_meta_key(bucket
),
3265 params
.refresh_version
);
3272 if (params
.objv_tracker
) {
3273 *params
.objv_tracker
= info
->objv_tracker
;
3279 int RGWBucketCtl::read_bucket_info(const rgw_bucket
& bucket
,
3280 RGWBucketInfo
*info
,
3282 const BucketInstance::GetParams
& params
,
3283 RGWObjVersionTracker
*ep_objv_tracker
)
3285 const rgw_bucket
*b
= &bucket
;
3287 std::optional
<RGWBucketEntryPoint
> ep
;
3289 if (b
->bucket_id
.empty()) {
3292 int r
= read_bucket_entrypoint_info(*b
, &(*ep
), y
, RGWBucketCtl::Bucket::GetParams()
3293 .set_bectx_params(params
.bectx_params
)
3294 .set_objv_tracker(ep_objv_tracker
));
3302 int ret
= bmi_handler
->call(params
.bectx_params
, [&](RGWSI_Bucket_BI_Ctx
& ctx
) {
3303 return svc
.bucket
->read_bucket_instance_info(ctx
,
3304 RGWSI_Bucket::get_bi_meta_key(*b
),
3310 params
.refresh_version
);
3317 if (params
.objv_tracker
) {
3318 *params
.objv_tracker
= info
->objv_tracker
;
3324 int RGWBucketCtl::do_store_bucket_instance_info(RGWSI_Bucket_BI_Ctx
& ctx
,
3325 const rgw_bucket
& bucket
,
3326 RGWBucketInfo
& info
,
3328 const BucketInstance::PutParams
& params
)
3330 if (params
.objv_tracker
) {
3331 info
.objv_tracker
= *params
.objv_tracker
;
3334 return svc
.bucket
->store_bucket_instance_info(ctx
,
3335 RGWSI_Bucket::get_bi_meta_key(bucket
),
3344 int RGWBucketCtl::store_bucket_instance_info(const rgw_bucket
& bucket
,
3345 RGWBucketInfo
& info
,
3347 const BucketInstance::PutParams
& params
)
3349 return bmi_handler
->call([&](RGWSI_Bucket_BI_Ctx
& ctx
) {
3350 return do_store_bucket_instance_info(ctx
, bucket
, info
, y
, params
);
3354 int RGWBucketCtl::remove_bucket_instance_info(const rgw_bucket
& bucket
,
3355 RGWBucketInfo
& info
,
3357 const BucketInstance::RemoveParams
& params
)
3359 if (params
.objv_tracker
) {
3360 info
.objv_tracker
= *params
.objv_tracker
;
3363 return bmi_handler
->call([&](RGWSI_Bucket_BI_Ctx
& ctx
) {
3364 return svc
.bucket
->remove_bucket_instance_info(ctx
,
3365 RGWSI_Bucket::get_bi_meta_key(bucket
),
3372 int RGWBucketCtl::do_store_linked_bucket_info(RGWSI_Bucket_X_Ctx
& ctx
,
3373 RGWBucketInfo
& info
,
3374 RGWBucketInfo
*orig_info
,
3375 bool exclusive
, real_time mtime
,
3376 obj_version
*pep_objv
,
3377 map
<string
, bufferlist
> *pattrs
,
3378 bool create_entry_point
,
3381 bool create_head
= !info
.has_instance_obj
|| create_entry_point
;
3383 int ret
= svc
.bucket
->store_bucket_instance_info(ctx
.bi
,
3384 RGWSI_Bucket::get_bi_meta_key(info
.bucket
),
3395 return 0; /* done! */
3397 RGWBucketEntryPoint entry_point
;
3398 entry_point
.bucket
= info
.bucket
;
3399 entry_point
.owner
= info
.owner
;
3400 entry_point
.creation_time
= info
.creation_time
;
3401 entry_point
.linked
= true;
3402 RGWObjVersionTracker ot
;
3403 if (pep_objv
&& !pep_objv
->tag
.empty()) {
3404 ot
.write_version
= *pep_objv
;
3406 ot
.generate_new_write_ver(cct
);
3408 *pep_objv
= ot
.write_version
;
3411 ret
= svc
.bucket
->store_bucket_entrypoint_info(ctx
.ep
,
3412 RGWSI_Bucket::get_entrypoint_meta_key(info
.bucket
),
3424 int RGWBucketCtl::convert_old_bucket_info(RGWSI_Bucket_X_Ctx
& ctx
,
3425 const rgw_bucket
& bucket
,
3428 RGWBucketEntryPoint entry_point
;
3430 RGWObjVersionTracker ot
;
3431 map
<string
, bufferlist
> attrs
;
3433 auto cct
= svc
.bucket
->ctx();
3435 ldout(cct
, 10) << "RGWRados::convert_old_bucket_info(): bucket=" << bucket
<< dendl
;
3437 int ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
.ep
,
3438 RGWSI_Bucket::get_entrypoint_meta_key(bucket
),
3439 &entry_point
, &ot
, &ep_mtime
, &attrs
, y
);
3441 ldout(cct
, 0) << "ERROR: get_bucket_entrypoint_info() returned " << ret
<< " bucket=" << bucket
<< dendl
;
3445 if (!entry_point
.has_bucket_info
) {
3446 /* already converted! */
3450 info
= entry_point
.old_bucket_info
;
3452 ot
.generate_new_write_ver(cct
);
3454 ret
= do_store_linked_bucket_info(ctx
, info
, nullptr, false, ep_mtime
, &ot
.write_version
, &attrs
, true, y
);
3456 ldout(cct
, 0) << "ERROR: failed to put_linked_bucket_info(): " << ret
<< dendl
;
3463 int RGWBucketCtl::set_bucket_instance_attrs(RGWBucketInfo
& bucket_info
,
3464 map
<string
, bufferlist
>& attrs
,
3465 RGWObjVersionTracker
*objv_tracker
,
3468 return call([&](RGWSI_Bucket_X_Ctx
& ctx
) {
3469 rgw_bucket
& bucket
= bucket_info
.bucket
;
3471 if (!bucket_info
.has_instance_obj
) {
3472 /* an old bucket object, need to convert it */
3473 int ret
= convert_old_bucket_info(ctx
, bucket
, y
);
3475 ldout(cct
, 0) << "ERROR: failed converting old bucket info: " << ret
<< dendl
;
3480 return do_store_bucket_instance_info(ctx
.bi
,
3484 BucketInstance::PutParams().set_attrs(&attrs
)
3485 .set_objv_tracker(objv_tracker
)
3486 .set_orig_info(&bucket_info
));
3491 int RGWBucketCtl::link_bucket(const rgw_user
& user_id
,
3492 const rgw_bucket
& bucket
,
3493 ceph::real_time creation_time
,
3495 bool update_entrypoint
,
3498 return bm_handler
->call([&](RGWSI_Bucket_EP_Ctx
& ctx
) {
3499 return do_link_bucket(ctx
, user_id
, bucket
, creation_time
, y
,
3500 update_entrypoint
, pinfo
);
3504 int RGWBucketCtl::do_link_bucket(RGWSI_Bucket_EP_Ctx
& ctx
,
3505 const rgw_user
& user_id
,
3506 const rgw_bucket
& bucket
,
3507 ceph::real_time creation_time
,
3509 bool update_entrypoint
,
3514 RGWBucketEntryPoint ep
;
3515 RGWObjVersionTracker ot
;
3516 RGWObjVersionTracker
& rot
= (pinfo
) ? pinfo
->ep_objv
: ot
;
3517 map
<string
, bufferlist
> attrs
, *pattrs
= nullptr;
3520 if (update_entrypoint
) {
3521 meta_key
= RGWSI_Bucket::get_entrypoint_meta_key(bucket
);
3524 pattrs
= &pinfo
->attrs
;
3526 ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
,
3531 if (ret
< 0 && ret
!= -ENOENT
) {
3532 ldout(cct
, 0) << "ERROR: store->get_bucket_entrypoint_info() returned: "
3533 << cpp_strerror(-ret
) << dendl
;
3539 ret
= ctl
.user
->add_bucket(user_id
, bucket
, creation_time
);
3541 ldout(cct
, 0) << "ERROR: error adding bucket to user directory:"
3542 << " user=" << user_id
3543 << " bucket=" << bucket
3544 << " err=" << cpp_strerror(-ret
)
3549 if (!update_entrypoint
)
3555 ret
= svc
.bucket
->store_bucket_entrypoint_info(
3556 ctx
, meta_key
, ep
, false, real_time(), pattrs
, &rot
, y
);
3563 int r
= do_unlink_bucket(ctx
, user_id
, bucket
, y
, true);
3565 ldout(cct
, 0) << "ERROR: failed unlinking bucket on error cleanup: "
3566 << cpp_strerror(-r
) << dendl
;
3571 int RGWBucketCtl::unlink_bucket(const rgw_user
& user_id
, const rgw_bucket
& bucket
, optional_yield y
, bool update_entrypoint
)
3573 return bm_handler
->call([&](RGWSI_Bucket_EP_Ctx
& ctx
) {
3574 return do_unlink_bucket(ctx
, user_id
, bucket
, y
, update_entrypoint
);
3578 int RGWBucketCtl::do_unlink_bucket(RGWSI_Bucket_EP_Ctx
& ctx
,
3579 const rgw_user
& user_id
,
3580 const rgw_bucket
& bucket
,
3582 bool update_entrypoint
)
3584 int ret
= ctl
.user
->remove_bucket(user_id
, bucket
);
3586 ldout(cct
, 0) << "ERROR: error removing bucket from directory: "
3587 << cpp_strerror(-ret
)<< dendl
;
3590 if (!update_entrypoint
)
3593 RGWBucketEntryPoint ep
;
3594 RGWObjVersionTracker ot
;
3595 map
<string
, bufferlist
> attrs
;
3596 string meta_key
= RGWSI_Bucket::get_entrypoint_meta_key(bucket
);
3597 ret
= svc
.bucket
->read_bucket_entrypoint_info(ctx
, meta_key
, &ep
, &ot
, nullptr, &attrs
, y
);
3606 if (ep
.owner
!= user_id
) {
3607 ldout(cct
, 0) << "bucket entry point user mismatch, can't unlink bucket: " << ep
.owner
<< " != " << user_id
<< dendl
;
3612 return svc
.bucket
->store_bucket_entrypoint_info(ctx
, meta_key
, ep
, false, real_time(), &attrs
, &ot
, y
);
3615 int RGWBucketCtl::set_acl(ACLOwner
& owner
, rgw_bucket
& bucket
,
3616 RGWBucketInfo
& bucket_info
, bufferlist
& bl
,
3619 // set owner and acl
3620 bucket_info
.owner
= owner
.get_id();
3621 std::map
<std::string
, bufferlist
> attrs
{{RGW_ATTR_ACL
, bl
}};
3623 int r
= store_bucket_instance_info(bucket
, bucket_info
, y
,
3624 BucketInstance::PutParams().set_attrs(&attrs
));
3626 cerr
<< "ERROR: failed to set bucket owner: " << cpp_strerror(-r
) << std::endl
;
3633 // TODO: remove RGWRados dependency for bucket listing
3634 int RGWBucketCtl::chown(rgw::sal::RGWRadosStore
*store
, RGWBucketInfo
& bucket_info
,
3635 const rgw_user
& user_id
, const std::string
& display_name
,
3636 const std::string
& marker
, optional_yield y
)
3638 std::vector
<rgw_bucket_dir_entry
> objs
;
3639 map
<string
, bool> common_prefixes
;
3641 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
3642 RGWRados::Bucket::List
list_op(&target
);
3644 list_op
.params
.list_versions
= true;
3645 list_op
.params
.allow_unordered
= true;
3646 list_op
.params
.marker
= marker
;
3648 bool is_truncated
= false;
3650 int max_entries
= 1000;
3652 //Loop through objects and update object acls to point to bucket owner
3655 RGWObjectCtx
obj_ctx(store
);
3657 int ret
= list_op
.list_objects(max_entries
, &objs
, &common_prefixes
, &is_truncated
, y
);
3659 ldout(store
->ctx(), 0) << "ERROR: list objects failed: " << cpp_strerror(-ret
) << dendl
;
3663 list_op
.params
.marker
= list_op
.get_next_marker();
3664 count
+= objs
.size();
3666 for (const auto& obj
: objs
) {
3668 rgw_obj
r_obj(bucket_info
.bucket
, obj
.key
);
3669 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, r_obj
);
3670 RGWRados::Object::Read
read_op(&op_target
);
3672 map
<string
, bufferlist
> attrs
;
3673 read_op
.params
.attrs
= &attrs
;
3674 ret
= read_op
.prepare(y
);
3676 ldout(store
->ctx(), 0) << "ERROR: failed to read object " << obj
.key
.name
<< cpp_strerror(-ret
) << dendl
;
3679 const auto& aiter
= attrs
.find(RGW_ATTR_ACL
);
3680 if (aiter
== attrs
.end()) {
3681 ldout(store
->ctx(), 0) << "ERROR: no acls found for object " << obj
.key
.name
<< " .Continuing with next object." << dendl
;
3684 bufferlist
& bl
= aiter
->second
;
3685 RGWAccessControlPolicy
policy(store
->ctx());
3689 owner
= policy
.get_owner();
3690 } catch (buffer::error
& err
) {
3691 ldout(store
->ctx(), 0) << "ERROR: decode policy failed" << err
<< dendl
;
3695 //Get the ACL from the policy
3696 RGWAccessControlList
& acl
= policy
.get_acl();
3698 //Remove grant that is set to old owner
3699 acl
.remove_canon_user_grant(owner
.get_id());
3701 //Create a grant and add grant
3703 grant
.set_canon(user_id
, display_name
, RGW_PERM_FULL_CONTROL
);
3704 acl
.add_grant(&grant
);
3706 //Update the ACL owner to the new user
3707 owner
.set_id(user_id
);
3708 owner
.set_name(display_name
);
3709 policy
.set_owner(owner
);
3714 obj_ctx
.set_atomic(r_obj
);
3715 ret
= store
->getRados()->set_attr(&obj_ctx
, bucket_info
, r_obj
, RGW_ATTR_ACL
, bl
);
3717 ldout(store
->ctx(), 0) << "ERROR: modify attr failed " << cpp_strerror(-ret
) << dendl
;
3722 cerr
<< count
<< " objects processed in " << bucket_info
.bucket
.name
3723 << ". Next marker " << list_op
.params
.marker
.name
<< std::endl
;
3724 } while(is_truncated
);
3728 int RGWBucketCtl::read_bucket_stats(const rgw_bucket
& bucket
,
3729 RGWBucketEnt
*result
,
3732 return call([&](RGWSI_Bucket_X_Ctx
& ctx
) {
3733 return svc
.bucket
->read_bucket_stats(ctx
, bucket
, result
, y
);
3737 int RGWBucketCtl::read_buckets_stats(map
<string
, RGWBucketEnt
>& m
,
3740 return call([&](RGWSI_Bucket_X_Ctx
& ctx
) {
3741 return svc
.bucket
->read_buckets_stats(ctx
, m
, y
);
3745 int RGWBucketCtl::sync_user_stats(const rgw_user
& user_id
,
3746 const RGWBucketInfo
& bucket_info
,
3753 int r
= svc
.bi
->read_stats(bucket_info
, pent
, null_yield
);
3755 ldout(cct
, 20) << __func__
<< "(): failed to read bucket stats (r=" << r
<< ")" << dendl
;
3759 return ctl
.user
->flush_bucket_stats(user_id
, *pent
);
3762 int RGWBucketCtl::get_sync_policy_handler(std::optional
<rgw_zone_id
> zone
,
3763 std::optional
<rgw_bucket
> bucket
,
3764 RGWBucketSyncPolicyHandlerRef
*phandler
,
3767 int r
= call([&](RGWSI_Bucket_X_Ctx
& ctx
) {
3768 return svc
.bucket_sync
->get_policy_handler(ctx
, zone
, bucket
, phandler
, y
);
3771 ldout(cct
, 20) << __func__
<< "(): failed to get policy handler for bucket=" << bucket
<< " (r=" << r
<< ")" << dendl
;
3777 int RGWBucketCtl::bucket_exports_data(const rgw_bucket
& bucket
,
3781 RGWBucketSyncPolicyHandlerRef handler
;
3783 int r
= get_sync_policy_handler(std::nullopt
, bucket
, &handler
, y
);
3788 return handler
->bucket_exports_data();
3791 int RGWBucketCtl::bucket_imports_data(const rgw_bucket
& bucket
,
3795 RGWBucketSyncPolicyHandlerRef handler
;
3797 int r
= get_sync_policy_handler(std::nullopt
, bucket
, &handler
, y
);
3802 return handler
->bucket_imports_data();
3805 RGWBucketMetadataHandlerBase
*RGWBucketMetaHandlerAllocator::alloc()
3807 return new RGWBucketMetadataHandler();
3810 RGWBucketInstanceMetadataHandlerBase
*RGWBucketInstanceMetaHandlerAllocator::alloc()
3812 return new RGWBucketInstanceMetadataHandler();
3815 RGWBucketMetadataHandlerBase
*RGWArchiveBucketMetaHandlerAllocator::alloc()
3817 return new RGWArchiveBucketMetadataHandler();
3820 RGWBucketInstanceMetadataHandlerBase
*RGWArchiveBucketInstanceMetaHandlerAllocator::alloc()
3822 return new RGWArchiveBucketInstanceMetadataHandler();