1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
10 #include <boost/algorithm/string/predicate.hpp>
11 #include <boost/optional.hpp>
13 #include "common/Clock.h"
14 #include "common/armor.h"
15 #include "common/mime.h"
16 #include "common/utf8.h"
17 #include "common/ceph_json.h"
19 #include "rgw_rados.h"
23 #include "rgw_acl_s3.h"
24 #include "rgw_acl_swift.h"
26 #include "rgw_bucket.h"
28 #include "rgw_multi.h"
29 #include "rgw_multi_del.h"
31 #include "rgw_cors_s3.h"
32 #include "rgw_rest_conn.h"
33 #include "rgw_rest_s3.h"
35 #include "rgw_client_io.h"
36 #include "rgw_compression.h"
38 #include "cls/lock/cls_lock_client.h"
39 #include "cls/rgw/cls_rgw_client.h"
42 #include "include/assert.h"
44 #include "compressor/Compressor.h"
46 #include "rgw_acl_swift.h"
48 #define dout_context g_ceph_context
49 #define dout_subsys ceph_subsys_rgw
// NOTE(review): this chunk is a garbled extraction — statements are split
// across physical lines and some interior source lines are missing; the code
// below is left byte-identical, only comments are added.
52 using namespace librados
;
// MD5 digest type from ceph::crypto — presumably used for the ETag/checksum
// accumulation later in this file (see etag_sum usage); confirm in full source.
53 using ceph::crypto::MD5
;
// Object-name namespace tag for multipart-upload meta/part objects
// (RGW_OBJ_NS_MULTIPART).
56 static string mp_ns
= RGW_OBJ_NS_MULTIPART
;
// Object-name namespace tag for shadow objects (RGW_OBJ_NS_SHADOW).
57 static string shadow_ns
= RGW_OBJ_NS_SHADOW
;
// Forward declaration: populates a req_info for a request being forwarded on
// behalf of the named bucket.
59 static void forward_req_info(CephContext
*cct
, req_info
& info
, const std::string
& bucket_name
);
// Forward declaration: proxies the current request to the master zone.
// `in_data` carries the request body; `jp` presumably receives the parsed
// JSON reply; `forward_info`, when non-null, overrides the request info used
// for the forwarded call — TODO confirm against the definition.
60 static int forward_request_to_master(struct req_state
*s
, obj_version
*objv
, RGWRados
*store
,
61 bufferlist
& in_data
, JSONParser
*jp
, req_info
*forward_info
= nullptr);
// File-scope filter instance used when listing multipart-upload meta objects
// (MultipartMetaFilter is declared in rgw_multi.h).
63 static MultipartMetaFilter mp_filter
;
65 static int parse_range(const char *range
, off_t
& ofs
, off_t
& end
, bool *partial_content
)
72 *partial_content
= false;
74 size_t pos
= s
.find("bytes=");
75 if (pos
== string::npos
) {
77 while (isspace(s
[pos
]))
80 while (isalpha(s
[end
]))
82 if (strncasecmp(s
.c_str(), "bytes", end
- pos
) != 0)
84 while (isspace(s
[end
]))
88 s
= s
.substr(end
+ 1);
90 s
= s
.substr(pos
+ 6); /* size of("bytes=") */
93 if (pos
== string::npos
)
96 *partial_content
= true;
98 ofs_str
= s
.substr(0, pos
);
99 end_str
= s
.substr(pos
+ 1);
100 if (end_str
.length()) {
101 end
= atoll(end_str
.c_str());
106 if (ofs_str
.length()) {
107 ofs
= atoll(ofs_str
.c_str());
108 } else { // RFC2616 suffix-byte-range-spec
113 if (end
>= 0 && end
< ofs
)
121 static int decode_policy(CephContext
*cct
,
123 RGWAccessControlPolicy
*policy
)
125 bufferlist::iterator iter
= bl
.begin();
127 policy
->decode(iter
);
128 } catch (buffer::error
& err
) {
129 ldout(cct
, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
132 if (cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
133 RGWAccessControlPolicy_S3
*s3policy
= static_cast<RGWAccessControlPolicy_S3
*>(policy
);
134 ldout(cct
, 15) << __func__
<< " Read AccessControlPolicy";
135 s3policy
->to_xml(*_dout
);
142 static int get_user_policy_from_attr(CephContext
* const cct
,
143 RGWRados
* const store
,
144 map
<string
, bufferlist
>& attrs
,
145 RGWAccessControlPolicy
& policy
/* out */)
147 auto aiter
= attrs
.find(RGW_ATTR_ACL
);
148 if (aiter
!= attrs
.end()) {
149 int ret
= decode_policy(cct
, aiter
->second
, &policy
);
160 static int get_bucket_instance_policy_from_attr(CephContext
*cct
,
162 RGWBucketInfo
& bucket_info
,
163 map
<string
, bufferlist
>& bucket_attrs
,
164 RGWAccessControlPolicy
*policy
,
167 map
<string
, bufferlist
>::iterator aiter
= bucket_attrs
.find(RGW_ATTR_ACL
);
169 if (aiter
!= bucket_attrs
.end()) {
170 int ret
= decode_policy(cct
, aiter
->second
, policy
);
174 ldout(cct
, 0) << "WARNING: couldn't find acl header for bucket, generating default" << dendl
;
176 /* object exists, but policy is broken */
177 int r
= rgw_get_user_info_by_uid(store
, bucket_info
.owner
, uinfo
);
181 policy
->create_default(bucket_info
.owner
, uinfo
.display_name
);
186 static int get_obj_policy_from_attr(CephContext
*cct
,
188 RGWObjectCtx
& obj_ctx
,
189 RGWBucketInfo
& bucket_info
,
190 map
<string
, bufferlist
>& bucket_attrs
,
191 RGWAccessControlPolicy
*policy
,
197 RGWRados::Object
op_target(store
, bucket_info
, obj_ctx
, obj
);
198 RGWRados::Object::Read
rop(&op_target
);
200 ret
= rop
.get_attr(RGW_ATTR_ACL
, bl
);
202 ret
= decode_policy(cct
, bl
, policy
);
205 } else if (ret
== -ENODATA
) {
206 /* object exists, but policy is broken */
207 ldout(cct
, 0) << "WARNING: couldn't find acl header for object, generating default" << dendl
;
209 ret
= rgw_get_user_info_by_uid(store
, bucket_info
.owner
, uinfo
);
213 policy
->create_default(bucket_info
.owner
, uinfo
.display_name
);
220 * Get the AccessControlPolicy for an object off of disk.
221 * policy: must point to a valid RGWACL, and will be filled upon return.
222 * bucket: name of the bucket containing the object.
223 * object: name of the object to get the ACL for.
224 * Returns: 0 on success, -ERR# otherwise.
226 static int get_bucket_policy_from_attr(CephContext
*cct
,
228 RGWBucketInfo
& bucket_info
,
229 map
<string
, bufferlist
>& bucket_attrs
,
230 RGWAccessControlPolicy
*policy
)
232 rgw_raw_obj instance_obj
;
233 store
->get_bucket_instance_obj(bucket_info
.bucket
, instance_obj
);
234 return get_bucket_instance_policy_from_attr(cct
, store
, bucket_info
, bucket_attrs
,
235 policy
, instance_obj
);
// Fetch all extended attributes of `obj` into `attrs` by preparing a read op
// (prepare() stats the object and fills the requested params — confirm
// semantics in rgw_rados.h). Returns 0 on success, negative error otherwise.
// NOTE(review): extraction gaps — the function's surrounding braces (orig
// lines 239/247) are missing from this chunk; code left byte-identical.
238 static int get_obj_attrs(RGWRados
*store
, struct req_state
*s
, rgw_obj
& obj
, map
<string
, bufferlist
>& attrs
)
240 RGWRados::Object
op_target(store
, s
->bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
241 RGWRados::Object::Read
read_op(&op_target
);
// Ask prepare() to fill the caller's attrs map and the request error struct.
243 read_op
.params
.attrs
= &attrs
;
244 read_op
.params
.perr
= &s
->err
;
246 return read_op
.prepare();
// Read-modify-write of a single xattr: loads the object's current attrs,
// overwrites (or adds) `attr_name` with `attr_val`, and writes the full attr
// set back. The object is marked atomic before the write so the update is
// guarded against concurrent modification — confirm set_atomic semantics in
// rgw_rados.h.
// NOTE(review): extraction gaps — braces and the error check on `r` after
// prepare() (orig lines ~259-261) are missing from this chunk; code left
// byte-identical.
249 static int modify_obj_attr(RGWRados
*store
, struct req_state
*s
, rgw_obj
& obj
, const char* attr_name
, bufferlist
& attr_val
)
251 map
<string
, bufferlist
> attrs
;
252 RGWRados::Object
op_target(store
, s
->bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
253 RGWRados::Object::Read
read_op(&op_target
);
// prepare() fills `attrs` with the object's current attribute set.
255 read_op
.params
.attrs
= &attrs
;
256 read_op
.params
.perr
= &s
->err
;
258 int r
= read_op
.prepare();
// Use read_op.state.obj (the resolved object) for the atomic flag and the
// subsequent attr write, not the caller's `obj`.
262 store
->set_atomic(s
->obj_ctx
, read_op
.state
.obj
);
263 attrs
[attr_name
] = attr_val
;
264 return store
->set_attrs(s
->obj_ctx
, s
->bucket_info
, read_op
.state
.obj
, attrs
, NULL
);
// Stat a system (raw) object, returning its xattrs in `attrs` and its size in
// `*obj_size`; `objv_tracker` carries object-version state through the stat.
// NOTE(review): extraction gaps — the trailing return of `ret` (orig lines
// ~277-278) and the braces are missing from this chunk; code left
// byte-identical.
267 static int get_system_obj_attrs(RGWRados
*store
, struct req_state
*s
, rgw_raw_obj
& obj
, map
<string
, bufferlist
>& attrs
,
268 uint64_t *obj_size
, RGWObjVersionTracker
*objv_tracker
)
270 RGWRados::SystemObject
src(store
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
271 RGWRados::SystemObject::Read
rop(&src
);
// Request both the attr map and the object size from the stat call.
273 rop
.stat_params
.attrs
= &attrs
;
274 rop
.stat_params
.obj_size
= obj_size
;
276 int ret
= rop
.stat(objv_tracker
);
280 static int read_bucket_policy(RGWRados
*store
,
282 RGWBucketInfo
& bucket_info
,
283 map
<string
, bufferlist
>& bucket_attrs
,
284 RGWAccessControlPolicy
*policy
,
287 if (!s
->system_request
&& bucket_info
.flags
& BUCKET_SUSPENDED
) {
288 ldout(s
->cct
, 0) << "NOTICE: bucket " << bucket_info
.bucket
.name
<< " is suspended" << dendl
;
289 return -ERR_USER_SUSPENDED
;
292 if (bucket
.name
.empty()) {
296 int ret
= get_bucket_policy_from_attr(s
->cct
, store
, bucket_info
, bucket_attrs
, policy
);
297 if (ret
== -ENOENT
) {
298 ret
= -ERR_NO_SUCH_BUCKET
;
304 static int read_obj_policy(RGWRados
*store
,
306 RGWBucketInfo
& bucket_info
,
307 map
<string
, bufferlist
>& bucket_attrs
,
308 RGWAccessControlPolicy
*policy
,
313 upload_id
= s
->info
.args
.get("uploadId");
316 if (!s
->system_request
&& bucket_info
.flags
& BUCKET_SUSPENDED
) {
317 ldout(s
->cct
, 0) << "NOTICE: bucket " << bucket_info
.bucket
.name
<< " is suspended" << dendl
;
318 return -ERR_USER_SUSPENDED
;
321 if (!upload_id
.empty()) {
322 /* multipart upload */
323 RGWMPObj
mp(object
.name
, upload_id
);
324 string oid
= mp
.get_meta();
325 obj
.init_ns(bucket
, oid
, mp_ns
);
326 obj
.set_in_extra_data(true);
328 obj
= rgw_obj(bucket
, object
);
330 RGWObjectCtx
*obj_ctx
= static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
331 int ret
= get_obj_policy_from_attr(s
->cct
, store
, *obj_ctx
,
332 bucket_info
, bucket_attrs
, policy
, obj
);
333 if (ret
== -ENOENT
) {
334 /* object does not exist checking the bucket's ACL to make sure
335 that we send a proper error code */
336 RGWAccessControlPolicy
bucket_policy(s
->cct
);
337 ret
= get_bucket_policy_from_attr(s
->cct
, store
, bucket_info
, bucket_attrs
, &bucket_policy
);
342 const rgw_user
& bucket_owner
= bucket_policy
.get_owner().get_id();
343 if (bucket_owner
.compare(s
->user
->user_id
) != 0 &&
344 ! s
->auth
.identity
->is_admin_of(bucket_owner
) &&
345 ! bucket_policy
.verify_permission(*s
->auth
.identity
, s
->perm_mask
,
357 * Get the AccessControlPolicy for a user, bucket or object off of disk.
358 * s: The req_state to draw information from.
359 * only_bucket: If true, reads the user and bucket ACLs rather than the object ACL.
360 * Returns: 0 on success, -ERR# otherwise.
362 int rgw_build_bucket_policies(RGWRados
* store
, struct req_state
* s
)
366 RGWUserInfo bucket_owner_info
;
367 RGWObjectCtx
obj_ctx(store
);
369 string bi
= s
->info
.args
.get(RGW_SYS_PARAM_PREFIX
"bucket-instance");
371 ret
= rgw_bucket_parse_bucket_instance(bi
, &s
->bucket_instance_id
, &s
->bucket_instance_shard_id
);
377 if(s
->dialect
.compare("s3") == 0) {
378 s
->bucket_acl
= new RGWAccessControlPolicy_S3(s
->cct
);
379 } else if(s
->dialect
.compare("swift") == 0) {
380 /* We aren't allocating the account policy for those operations using
381 * the Swift's infrastructure that don't really need req_state::user.
382 * Typical example here is the implementation of /info. */
383 if (!s
->user
->user_id
.empty()) {
384 s
->user_acl
= std::unique_ptr
<RGWAccessControlPolicy
>(
385 new RGWAccessControlPolicy_SWIFTAcct(s
->cct
));
387 s
->bucket_acl
= new RGWAccessControlPolicy_SWIFT(s
->cct
);
389 s
->bucket_acl
= new RGWAccessControlPolicy(s
->cct
);
392 /* check if copy source is within the current domain */
393 if (!s
->src_bucket_name
.empty()) {
394 RGWBucketInfo source_info
;
396 if (s
->bucket_instance_id
.empty()) {
397 ret
= store
->get_bucket_info(obj_ctx
, s
->src_tenant_name
, s
->src_bucket_name
, source_info
, NULL
);
399 ret
= store
->get_bucket_instance_info(obj_ctx
, s
->bucket_instance_id
, source_info
, NULL
, NULL
);
402 string
& zonegroup
= source_info
.zonegroup
;
403 s
->local_source
= store
->get_zonegroup().equals(zonegroup
);
409 std::string display_name
;
412 s
->user
->display_name
,
415 if (!s
->bucket_name
.empty()) {
416 s
->bucket_exists
= true;
417 if (s
->bucket_instance_id
.empty()) {
418 ret
= store
->get_bucket_info(obj_ctx
, s
->bucket_tenant
, s
->bucket_name
, s
->bucket_info
, NULL
, &s
->bucket_attrs
);
420 ret
= store
->get_bucket_instance_info(obj_ctx
, s
->bucket_instance_id
, s
->bucket_info
, NULL
, &s
->bucket_attrs
);
423 if (ret
!= -ENOENT
) {
425 rgw_make_bucket_entry_name(s
->bucket_tenant
, s
->bucket_name
, bucket_log
);
426 ldout(s
->cct
, 0) << "NOTICE: couldn't get bucket from bucket_name (name=" << bucket_log
<< ")" << dendl
;
429 s
->bucket_exists
= false;
431 s
->bucket
= s
->bucket_info
.bucket
;
433 if (s
->bucket_exists
) {
434 ret
= read_bucket_policy(store
, s
, s
->bucket_info
, s
->bucket_attrs
, s
->bucket_acl
, s
->bucket
);
436 s
->bucket_info
.owner
,
437 s
->bucket_acl
->get_owner().get_display_name(),
440 s
->bucket_acl
->create_default(s
->user
->user_id
, s
->user
->display_name
);
441 ret
= -ERR_NO_SUCH_BUCKET
;
444 s
->bucket_owner
= s
->bucket_acl
->get_owner();
446 RGWZoneGroup zonegroup
;
447 int r
= store
->get_zonegroup(s
->bucket_info
.zonegroup
, zonegroup
);
449 if (!zonegroup
.endpoints
.empty()) {
450 s
->zonegroup_endpoint
= zonegroup
.endpoints
.front();
452 // use zonegroup's master zone endpoints
453 auto z
= zonegroup
.zones
.find(zonegroup
.master_zone
);
454 if (z
!= zonegroup
.zones
.end() && !z
->second
.endpoints
.empty()) {
455 s
->zonegroup_endpoint
= z
->second
.endpoints
.front();
458 s
->zonegroup_name
= zonegroup
.get_name();
460 if (r
< 0 && ret
== 0) {
464 if (s
->bucket_exists
&& !store
->get_zonegroup().equals(s
->bucket_info
.zonegroup
)) {
465 ldout(s
->cct
, 0) << "NOTICE: request for data in a different zonegroup (" << s
->bucket_info
.zonegroup
<< " != " << store
->get_zonegroup().get_id() << ")" << dendl
;
466 /* we now need to make sure that the operation actually requires copy source, that is
467 * it's a copy operation
469 if (store
->get_zonegroup().is_master
&& s
->system_request
) {
470 /*If this is the master, don't redirect*/
471 } else if (!s
->local_source
||
472 (s
->op
!= OP_PUT
&& s
->op
!= OP_COPY
) ||
474 return -ERR_PERMANENT_REDIRECT
;
479 /* handle user ACL only for those APIs which support it */
481 map
<string
, bufferlist
> uattrs
;
483 ret
= rgw_get_user_attrs_by_uid(store
, acct_acl_user
.uid
, uattrs
);
485 ret
= get_user_policy_from_attr(s
->cct
, store
, uattrs
, *s
->user_acl
);
487 if (-ENOENT
== ret
) {
488 /* In already existing clusters users won't have ACL. In such case
489 * assuming that only account owner has the rights seems to be
490 * reasonable. That allows to have only one verification logic.
491 * NOTE: there is small compatibility kludge for global, empty tenant:
492 * 1. if we try to reach an existing bucket, its owner is considered
494 * 2. otherwise account owner is identity stored in s->user->user_id. */
495 s
->user_acl
->create_default(acct_acl_user
.uid
,
496 acct_acl_user
.display_name
);
499 ldout(s
->cct
, 0) << "NOTICE: couldn't get user attrs for handling ACL (user_id="
513 * Get the AccessControlPolicy for a bucket or object off of disk.
514 * s: The req_state to draw information from.
515 * only_bucket: If true, reads the bucket ACL rather than the object ACL.
516 * Returns: 0 on success, -ERR# otherwise.
518 int rgw_build_object_policies(RGWRados
*store
, struct req_state
*s
,
523 if (!s
->object
.empty()) {
524 if (!s
->bucket_exists
) {
525 return -ERR_NO_SUCH_BUCKET
;
527 s
->object_acl
= new RGWAccessControlPolicy(s
->cct
);
529 rgw_obj
obj(s
->bucket
, s
->object
);
531 store
->set_atomic(s
->obj_ctx
, obj
);
533 store
->set_prefetch_data(s
->obj_ctx
, obj
);
535 ret
= read_obj_policy(store
, s
, s
->bucket_info
, s
->bucket_attrs
, s
->object_acl
, s
->bucket
, s
->object
);
// Common pre-exec hook for bucket/object ops: emits the bucket information
// from the request state (dump_bucket_from_state) before the op runs.
// NOTE(review): extraction gaps — braces (and any lines between orig 542-545)
// are missing from this chunk; code left byte-identical.
541 static void rgw_bucket_object_pre_exec(struct req_state
*s
)
546 dump_bucket_from_state(s
);
549 int RGWGetObj::verify_permission()
551 obj
= rgw_obj(s
->bucket
, s
->object
);
552 store
->set_atomic(s
->obj_ctx
, obj
);
554 store
->set_prefetch_data(s
->obj_ctx
, obj
);
557 if (!verify_object_permission(s
, RGW_PERM_READ
)) {
565 int RGWOp::verify_op_mask()
567 uint32_t required_mask
= op_mask();
569 ldout(s
->cct
, 20) << "required_mask= " << required_mask
570 << " user.op_mask=" << s
->user
->op_mask
<< dendl
;
572 if ((s
->user
->op_mask
& required_mask
) != required_mask
) {
576 if (!s
->system_request
&& (required_mask
& RGW_OP_TYPE_MODIFY
) && !store
->zone_is_writeable()) {
577 ldout(s
->cct
, 5) << "NOTICE: modify request to a read-only zone by a non-system user, permission denied" << dendl
;
584 int RGWOp::do_aws4_auth_completion()
588 if (s
->aws4_auth_needs_complete
) {
590 ret
= RGW_Auth_S3::authorize_aws4_auth_complete(store
, s
);
591 s
->aws4_auth_needs_complete
= false;
595 /* verify signature */
596 if (s
->aws4_auth
->signature
!= s
->aws4_auth
->new_signature
) {
597 ret
= -ERR_SIGNATURE_NO_MATCH
;
598 ldout(s
->cct
, 20) << "delayed aws4 auth failed" << dendl
;
601 /* authorization ok */
602 dout(10) << "v4 auth ok" << dendl
;
608 int RGWOp::init_quota()
610 /* no quota enforcement for system requests */
611 if (s
->system_request
)
614 /* init quota related stuff */
615 if (!(s
->user
->op_mask
& RGW_OP_TYPE_MODIFY
)) {
619 /* only interested in object related ops */
620 if (s
->object
.empty()) {
624 RGWUserInfo owner_info
;
627 if (s
->user
->user_id
== s
->bucket_owner
.get_id()) {
630 int r
= rgw_get_user_info_by_uid(store
, s
->bucket_info
.owner
, owner_info
);
636 if (s
->bucket_info
.quota
.enabled
) {
637 bucket_quota
= s
->bucket_info
.quota
;
638 } else if (uinfo
->bucket_quota
.enabled
) {
639 bucket_quota
= uinfo
->bucket_quota
;
641 bucket_quota
= store
->get_bucket_quota();
644 if (uinfo
->user_quota
.enabled
) {
645 user_quota
= uinfo
->user_quota
;
647 user_quota
= store
->get_user_quota();
653 static bool validate_cors_rule_method(RGWCORSRule
*rule
, const char *req_meth
) {
657 dout(5) << "req_meth is null" << dendl
;
661 if (strcmp(req_meth
, "GET") == 0) flags
= RGW_CORS_GET
;
662 else if (strcmp(req_meth
, "POST") == 0) flags
= RGW_CORS_POST
;
663 else if (strcmp(req_meth
, "PUT") == 0) flags
= RGW_CORS_PUT
;
664 else if (strcmp(req_meth
, "DELETE") == 0) flags
= RGW_CORS_DELETE
;
665 else if (strcmp(req_meth
, "HEAD") == 0) flags
= RGW_CORS_HEAD
;
667 if ((rule
->get_allowed_methods() & flags
) == flags
) {
668 dout(10) << "Method " << req_meth
<< " is supported" << dendl
;
670 dout(5) << "Method " << req_meth
<< " is not supported" << dendl
;
677 int RGWOp::read_bucket_cors()
681 map
<string
, bufferlist
>::iterator aiter
= s
->bucket_attrs
.find(RGW_ATTR_CORS
);
682 if (aiter
== s
->bucket_attrs
.end()) {
683 ldout(s
->cct
, 20) << "no CORS configuration attr found" << dendl
;
685 return 0; /* no CORS configuration found */
692 bufferlist::iterator iter
= bl
.begin();
694 bucket_cors
.decode(iter
);
695 } catch (buffer::error
& err
) {
696 ldout(s
->cct
, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
699 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
700 RGWCORSConfiguration_S3
*s3cors
= static_cast<RGWCORSConfiguration_S3
*>(&bucket_cors
);
701 ldout(s
->cct
, 15) << "Read RGWCORSConfiguration";
702 s3cors
->to_xml(*_dout
);
709 * If any of the header field-names is not an ASCII case-insensitive match for
710 * any of the values in list of headers do not set any additional headers and
711 * terminate this set of steps.
713 static void get_cors_response_headers(RGWCORSRule
*rule
, const char *req_hdrs
, string
& hdrs
, string
& exp_hdrs
, unsigned *max_age
) {
716 get_str_list(req_hdrs
, hl
);
717 for(list
<string
>::iterator it
= hl
.begin(); it
!= hl
.end(); ++it
) {
718 if (!rule
->is_header_allowed((*it
).c_str(), (*it
).length())) {
719 dout(5) << "Header " << (*it
) << " is not registered in this rule" << dendl
;
721 if (hdrs
.length() > 0) hdrs
.append(",");
726 rule
->format_exp_headers(exp_hdrs
);
727 *max_age
= rule
->get_max_age();
731 * Generate the CORS header response
733 * This is described in the CORS standard, section 6.2.
735 bool RGWOp::generate_cors_headers(string
& origin
, string
& method
, string
& headers
, string
& exp_headers
, unsigned *max_age
)
738 const char *orig
= s
->info
.env
->get("HTTP_ORIGIN");
745 op_ret
= read_bucket_cors();
751 dout(2) << "No CORS configuration set yet for this bucket" << dendl
;
756 RGWCORSRule
*rule
= bucket_cors
.host_name_rule(orig
);
761 * Set the Allowed-Origin header to an asterisk if this is allowed in the rule
762 * and no Authorization was sent by the client
764 * The origin parameter specifies a URI that may access the resource. The browser must enforce this.
765 * For requests without credentials, the server may specify "*" as a wildcard,
766 * thereby allowing any origin to access the resource.
768 const char *authorization
= s
->info
.env
->get("HTTP_AUTHORIZATION");
769 if (!authorization
&& rule
->has_wildcard_origin())
773 const char *req_meth
= s
->info
.env
->get("HTTP_ACCESS_CONTROL_REQUEST_METHOD");
775 req_meth
= s
->info
.method
;
781 if (!validate_cors_rule_method(rule
, req_meth
)) {
787 const char *req_hdrs
= s
->info
.env
->get("HTTP_ACCESS_CONTROL_REQUEST_HEADERS");
790 get_cors_response_headers(rule
, req_hdrs
, headers
, exp_headers
, max_age
);
795 int RGWGetObj::read_user_manifest_part(rgw_bucket
& bucket
,
796 const rgw_bucket_dir_entry
& ent
,
797 RGWAccessControlPolicy
* const bucket_policy
,
798 const off_t start_ofs
,
801 ldout(s
->cct
, 20) << "user manifest obj=" << ent
.key
.name
<< "[" << ent
.key
.instance
<< "]" << dendl
;
802 RGWGetObj_CB
cb(this);
803 RGWGetDataCB
* filter
= &cb
;
804 boost::optional
<RGWGetObj_Decompress
> decompress
;
806 int64_t cur_ofs
= start_ofs
;
807 int64_t cur_end
= end_ofs
;
809 rgw_obj
part(bucket
, ent
.key
);
811 map
<string
, bufferlist
> attrs
;
814 RGWObjectCtx
obj_ctx(store
);
815 RGWAccessControlPolicy
obj_policy(s
->cct
);
817 ldout(s
->cct
, 20) << "reading obj=" << part
<< " ofs=" << cur_ofs
<< " end=" << cur_end
<< dendl
;
819 obj_ctx
.obj
.set_atomic(part
);
820 store
->set_prefetch_data(&obj_ctx
, part
);
822 RGWRados::Object
op_target(store
, s
->bucket_info
, obj_ctx
, part
);
823 RGWRados::Object::Read
read_op(&op_target
);
825 read_op
.conds
.if_match
= ent
.meta
.etag
.c_str();
826 read_op
.params
.attrs
= &attrs
;
827 read_op
.params
.obj_size
= &obj_size
;
828 read_op
.params
.perr
= &s
->err
;
830 op_ret
= read_op
.prepare();
833 op_ret
= read_op
.range_to_ofs(obj_size
, cur_ofs
, cur_end
);
836 bool need_decompress
;
837 op_ret
= rgw_compression_info_from_attrset(attrs
, need_decompress
, cs_info
);
839 lderr(s
->cct
) << "ERROR: failed to decode compression info, cannot decompress" << dendl
;
845 if (cs_info
.orig_size
!= ent
.meta
.size
) {
846 // hmm.. something wrong, object not as expected, abort!
847 ldout(s
->cct
, 0) << "ERROR: expected cs_info.orig_size=" << cs_info
.orig_size
<<
848 ", actual read size=" << ent
.meta
.size
<< dendl
;
851 decompress
.emplace(s
->cct
, &cs_info
, partial_content
, filter
);
852 filter
= &*decompress
;
856 if (obj_size
!= ent
.meta
.size
) {
857 // hmm.. something wrong, object not as expected, abort!
858 ldout(s
->cct
, 0) << "ERROR: expected obj_size=" << obj_size
<< ", actual read size=" << ent
.meta
.size
<< dendl
;
863 op_ret
= rgw_policy_from_attrset(s
->cct
, attrs
, &obj_policy
);
867 /* We can use global user_acl because LOs cannot have segments
868 * stored inside different accounts. */
869 if (s
->system_request
) {
870 ldout(s
->cct
, 2) << "overriding permissions due to system operation" << dendl
;
871 } else if (s
->auth
.identity
->is_admin_of(s
->user
->user_id
)) {
872 ldout(s
->cct
, 2) << "overriding permissions due to admin operation" << dendl
;
873 } else if (!verify_object_permission(s
, s
->user_acl
.get(), bucket_policy
,
874 &obj_policy
, RGW_PERM_READ
)) {
878 if (ent
.meta
.size
== 0) {
882 perfcounter
->inc(l_rgw_get_b
, cur_end
- cur_ofs
);
883 filter
->fixup_range(cur_ofs
, cur_end
);
884 op_ret
= read_op
.iterate(cur_ofs
, cur_end
, filter
);
886 op_ret
= filter
->flush();
890 static int iterate_user_manifest_parts(CephContext
* const cct
,
891 RGWRados
* const store
,
894 RGWBucketInfo
*pbucket_info
,
895 const string
& obj_prefix
,
896 RGWAccessControlPolicy
* const bucket_policy
,
897 uint64_t * const ptotal_len
,
898 uint64_t * const pobj_size
,
899 string
* const pobj_sum
,
900 int (*cb
)(rgw_bucket
& bucket
,
901 const rgw_bucket_dir_entry
& ent
,
902 RGWAccessControlPolicy
* const bucket_policy
,
906 void * const cb_param
)
908 rgw_bucket
& bucket
= pbucket_info
->bucket
;
909 uint64_t obj_ofs
= 0, len_count
= 0;
910 bool found_start
= false, found_end
= false, handled_end
= false;
913 vector
<rgw_bucket_dir_entry
> objs
;
915 utime_t start_time
= ceph_clock_now();
917 RGWRados::Bucket
target(store
, *pbucket_info
);
918 RGWRados::Bucket::List
list_op(&target
);
920 list_op
.params
.prefix
= obj_prefix
;
921 list_op
.params
.delim
= delim
;
925 #define MAX_LIST_OBJS 100
926 int r
= list_op
.list_objects(MAX_LIST_OBJS
, &objs
, NULL
, &is_truncated
);
931 for (rgw_bucket_dir_entry
& ent
: objs
) {
932 uint64_t cur_total_len
= obj_ofs
;
933 uint64_t start_ofs
= 0, end_ofs
= ent
.meta
.size
;
935 if (!found_start
&& cur_total_len
+ ent
.meta
.size
> (uint64_t)ofs
) {
936 start_ofs
= ofs
- obj_ofs
;
940 obj_ofs
+= ent
.meta
.size
;
942 etag_sum
.Update((const byte
*)ent
.meta
.etag
.c_str(),
943 ent
.meta
.etag
.length());
946 if (!found_end
&& obj_ofs
> (uint64_t)end
) {
947 end_ofs
= end
- cur_total_len
+ 1;
951 perfcounter
->tinc(l_rgw_get_lat
,
952 (ceph_clock_now() - start_time
));
954 if (found_start
&& !handled_end
) {
955 len_count
+= end_ofs
- start_ofs
;
958 r
= cb(bucket
, ent
, bucket_policy
, start_ofs
, end_ofs
, cb_param
);
965 handled_end
= found_end
;
966 start_time
= ceph_clock_now();
968 } while (is_truncated
);
971 *ptotal_len
= len_count
;
974 *pobj_size
= obj_ofs
;
977 complete_etag(etag_sum
, pobj_sum
);
983 struct rgw_slo_part
{
984 RGWAccessControlPolicy
*bucket_policy
;
990 rgw_slo_part() : bucket_policy(NULL
), size(0) {}
993 static int iterate_slo_parts(CephContext
*cct
,
997 map
<uint64_t, rgw_slo_part
>& slo_parts
,
998 int (*cb
)(rgw_bucket
& bucket
,
999 const rgw_bucket_dir_entry
& ent
,
1000 RGWAccessControlPolicy
*bucket_policy
,
1006 bool found_start
= false, found_end
= false;
1008 if (slo_parts
.empty()) {
1012 utime_t start_time
= ceph_clock_now();
1014 map
<uint64_t, rgw_slo_part
>::iterator iter
= slo_parts
.upper_bound(ofs
);
1015 if (iter
!= slo_parts
.begin()) {
1019 uint64_t obj_ofs
= iter
->first
;
1021 for (; iter
!= slo_parts
.end() && !found_end
; ++iter
) {
1022 rgw_slo_part
& part
= iter
->second
;
1023 rgw_bucket_dir_entry ent
;
1025 ent
.key
.name
= part
.obj_name
;
1026 ent
.meta
.size
= part
.size
;
1027 ent
.meta
.etag
= part
.etag
;
1029 uint64_t cur_total_len
= obj_ofs
;
1030 uint64_t start_ofs
= 0, end_ofs
= ent
.meta
.size
;
1032 if (!found_start
&& cur_total_len
+ ent
.meta
.size
> (uint64_t)ofs
) {
1033 start_ofs
= ofs
- obj_ofs
;
1037 obj_ofs
+= ent
.meta
.size
;
1039 if (!found_end
&& obj_ofs
> (uint64_t)end
) {
1040 end_ofs
= end
- cur_total_len
+ 1;
1044 perfcounter
->tinc(l_rgw_get_lat
,
1045 (ceph_clock_now() - start_time
));
1049 int r
= cb(part
.bucket
, ent
, part
.bucket_policy
, start_ofs
, end_ofs
, cb_param
);
1055 start_time
= ceph_clock_now();
1061 static int get_obj_user_manifest_iterate_cb(rgw_bucket
& bucket
,
1062 const rgw_bucket_dir_entry
& ent
,
1063 RGWAccessControlPolicy
* const bucket_policy
,
1064 const off_t start_ofs
,
1065 const off_t end_ofs
,
1068 RGWGetObj
*op
= static_cast<RGWGetObj
*>(param
);
1069 return op
->read_user_manifest_part(bucket
, ent
, bucket_policy
, start_ofs
, end_ofs
);
1072 int RGWGetObj::handle_user_manifest(const char *prefix
)
1074 ldout(s
->cct
, 2) << "RGWGetObj::handle_user_manifest() prefix=" << prefix
<< dendl
;
1076 string prefix_str
= prefix
;
1077 size_t pos
= prefix_str
.find('/');
1078 if (pos
== string::npos
)
1081 string bucket_name_raw
, bucket_name
;
1082 bucket_name_raw
= prefix_str
.substr(0, pos
);
1083 url_decode(bucket_name_raw
, bucket_name
);
1085 string obj_prefix_raw
, obj_prefix
;
1086 obj_prefix_raw
= prefix_str
.substr(pos
+ 1);
1087 url_decode(obj_prefix_raw
, obj_prefix
);
1091 RGWAccessControlPolicy
_bucket_policy(s
->cct
);
1092 RGWAccessControlPolicy
*bucket_policy
;
1093 RGWBucketInfo bucket_info
;
1094 RGWBucketInfo
*pbucket_info
;
1096 if (bucket_name
.compare(s
->bucket
.name
) != 0) {
1097 map
<string
, bufferlist
> bucket_attrs
;
1098 RGWObjectCtx
obj_ctx(store
);
1099 int r
= store
->get_bucket_info(obj_ctx
, s
->user
->user_id
.tenant
,
1100 bucket_name
, bucket_info
, NULL
,
1103 ldout(s
->cct
, 0) << "could not get bucket info for bucket="
1104 << bucket_name
<< dendl
;
1107 bucket
= bucket_info
.bucket
;
1108 pbucket_info
= &bucket_info
;
1109 bucket_policy
= &_bucket_policy
;
1110 r
= read_bucket_policy(store
, s
, bucket_info
, bucket_attrs
, bucket_policy
, bucket
);
1112 ldout(s
->cct
, 0) << "failed to read bucket policy" << dendl
;
1117 pbucket_info
= &s
->bucket_info
;
1118 bucket_policy
= s
->bucket_acl
;
1121 /* dry run to find out:
1122 * - total length (of the parts we are going to send to client),
1123 * - overall DLO's content size,
1124 * - md5 sum of overall DLO's content (for etag of Swift API). */
1125 int r
= iterate_user_manifest_parts(s
->cct
, store
, ofs
, end
,
1126 pbucket_info
, obj_prefix
, bucket_policy
,
1127 &total_len
, &s
->obj_size
, &lo_etag
,
1128 nullptr /* cb */, nullptr /* cb arg */);
1135 send_response_data(bl
, 0, 0);
1139 r
= iterate_user_manifest_parts(s
->cct
, store
, ofs
, end
,
1140 pbucket_info
, obj_prefix
, bucket_policy
,
1141 nullptr, nullptr, nullptr,
1142 get_obj_user_manifest_iterate_cb
, (void *)this);
1149 send_response_data(bl
, 0, 0);
1155 int RGWGetObj::handle_slo_manifest(bufferlist
& bl
)
1157 RGWSLOInfo slo_info
;
1158 bufferlist::iterator bliter
= bl
.begin();
1160 ::decode(slo_info
, bliter
);
1161 } catch (buffer::error
& err
) {
1162 ldout(s
->cct
, 0) << "ERROR: failed to decode slo manifest" << dendl
;
1165 ldout(s
->cct
, 2) << "RGWGetObj::handle_slo_manifest()" << dendl
;
1167 list
<RGWAccessControlPolicy
> allocated_policies
;
1168 map
<string
, RGWAccessControlPolicy
*> policies
;
1169 map
<string
, rgw_bucket
> buckets
;
1171 map
<uint64_t, rgw_slo_part
> slo_parts
;
1176 for (const auto& entry
: slo_info
.entries
) {
1177 const string
& path
= entry
.path
;
1179 /* If the path starts with slashes, strip them all. */
1180 const size_t pos_init
= path
.find_first_not_of('/');
1181 /* According to the documentation of std::string::find following check
1182 * is not necessary as we should get the std::string::npos propagation
1183 * here. This might be true with the accuracy to implementation's bugs.
1184 * See following question on SO:
1185 * http://stackoverflow.com/questions/1011790/why-does-stdstring-findtext-stdstringnpos-not-return-npos
1187 if (pos_init
== string::npos
) {
1191 const size_t pos_sep
= path
.find('/', pos_init
);
1192 if (pos_sep
== string::npos
) {
1196 string bucket_name
= path
.substr(pos_init
, pos_sep
- pos_init
);
1197 string obj_name
= path
.substr(pos_sep
+ 1);
1200 RGWAccessControlPolicy
*bucket_policy
;
1202 if (bucket_name
.compare(s
->bucket
.name
) != 0) {
1203 const auto& piter
= policies
.find(bucket_name
);
1204 if (piter
!= policies
.end()) {
1205 bucket_policy
= piter
->second
;
1206 bucket
= buckets
[bucket_name
];
1208 allocated_policies
.push_back(RGWAccessControlPolicy(s
->cct
));
1209 RGWAccessControlPolicy
& _bucket_policy
= allocated_policies
.back();
1211 RGWBucketInfo bucket_info
;
1212 map
<string
, bufferlist
> bucket_attrs
;
1213 RGWObjectCtx
obj_ctx(store
);
1214 int r
= store
->get_bucket_info(obj_ctx
, s
->user
->user_id
.tenant
,
1215 bucket_name
, bucket_info
, nullptr,
1218 ldout(s
->cct
, 0) << "could not get bucket info for bucket="
1219 << bucket_name
<< dendl
;
1222 bucket
= bucket_info
.bucket
;
1223 bucket_policy
= &_bucket_policy
;
1224 r
= read_bucket_policy(store
, s
, bucket_info
, bucket_attrs
, bucket_policy
,
1227 ldout(s
->cct
, 0) << "failed to read bucket policy for bucket "
1231 buckets
[bucket_name
] = bucket
;
1232 policies
[bucket_name
] = bucket_policy
;
1236 bucket_policy
= s
->bucket_acl
;
1240 part
.bucket_policy
= bucket_policy
;
1241 part
.bucket
= bucket
;
1242 part
.obj_name
= obj_name
;
1243 part
.size
= entry
.size_bytes
;
1244 part
.etag
= entry
.etag
;
1245 ldout(s
->cct
, 20) << "slo_part: ofs=" << ofs
1246 << " bucket=" << part
.bucket
1247 << " obj=" << part
.obj_name
1248 << " size=" << part
.size
1249 << " etag=" << part
.etag
1252 etag_sum
.Update((const byte
*)entry
.etag
.c_str(),
1253 entry
.etag
.length());
1255 slo_parts
[total_len
] = part
;
1256 total_len
+= part
.size
;
1259 complete_etag(etag_sum
, &lo_etag
);
1261 s
->obj_size
= slo_info
.total_size
;
1262 ldout(s
->cct
, 20) << "s->obj_size=" << s
->obj_size
<< dendl
;
1265 ofs
= total_len
- std::min(-ofs
, static_cast<off_t
>(total_len
));
1268 if (end
< 0 || end
>= static_cast<off_t
>(total_len
)) {
1269 end
= total_len
- 1;
1272 total_len
= end
- ofs
+ 1;
1274 int r
= iterate_slo_parts(s
->cct
, store
, ofs
, end
, slo_parts
,
1275 get_obj_user_manifest_iterate_cb
, (void *)this);
1283 int RGWGetObj::get_data_cb(bufferlist
& bl
, off_t bl_ofs
, off_t bl_len
)
1285 /* garbage collection related handling */
1286 utime_t start_time
= ceph_clock_now();
1287 if (start_time
> gc_invalidate_time
) {
1288 int r
= store
->defer_gc(s
->obj_ctx
, s
->bucket_info
, obj
);
1290 dout(0) << "WARNING: could not defer gc entry for obj" << dendl
;
1292 gc_invalidate_time
= start_time
;
1293 gc_invalidate_time
+= (s
->cct
->_conf
->rgw_gc_obj_min_wait
/ 2);
1295 return send_response_data(bl
, bl_ofs
, bl_len
);
1298 bool RGWGetObj::prefetch_data()
1300 /* HEAD request, stop prefetch*/
1305 bool prefetch_first_chunk
= true;
1306 range_str
= s
->info
.env
->get("HTTP_RANGE");
1309 int r
= parse_range(range_str
, ofs
, end
, &partial_content
);
1310 /* error on parsing the range, stop prefetch and will fail in execte() */
1312 range_parsed
= false;
1315 range_parsed
= true;
1317 /* range get goes to shadown objects, stop prefetch */
1318 if (ofs
>= s
->cct
->_conf
->rgw_max_chunk_size
) {
1319 prefetch_first_chunk
= false;
1323 return get_data
&& prefetch_first_chunk
;
1325 void RGWGetObj::pre_exec()
1327 rgw_bucket_object_pre_exec(s
);
1330 static bool object_is_expired(map
<string
, bufferlist
>& attrs
) {
1331 map
<string
, bufferlist
>::iterator iter
= attrs
.find(RGW_ATTR_DELETE_AT
);
1332 if (iter
!= attrs
.end()) {
1335 ::decode(delete_at
, iter
->second
);
1336 } catch (buffer::error
& err
) {
1337 dout(0) << "ERROR: " << __func__
<< ": failed to decode " RGW_ATTR_DELETE_AT
" attr" << dendl
;
1341 if (delete_at
<= ceph_clock_now() && !delete_at
.is_zero()) {
1349 void RGWGetObj::execute()
1351 utime_t start_time
= s
->time
;
1353 gc_invalidate_time
= ceph_clock_now();
1354 gc_invalidate_time
+= (s
->cct
->_conf
->rgw_gc_obj_min_wait
/ 2);
1356 bool need_decompress
;
1357 int64_t ofs_x
, end_x
;
1359 RGWGetObj_CB
cb(this);
1360 RGWGetDataCB
* filter
= (RGWGetDataCB
*)&cb
;
1361 boost::optional
<RGWGetObj_Decompress
> decompress
;
1362 std::unique_ptr
<RGWGetDataCB
> decrypt
;
1363 map
<string
, bufferlist
>::iterator attr_iter
;
1365 perfcounter
->inc(l_rgw_get
);
1367 RGWRados::Object
op_target(store
, s
->bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
1368 RGWRados::Object::Read
read_op(&op_target
);
1370 op_ret
= get_params();
1374 op_ret
= init_common();
1378 read_op
.conds
.mod_ptr
= mod_ptr
;
1379 read_op
.conds
.unmod_ptr
= unmod_ptr
;
1380 read_op
.conds
.high_precision_time
= s
->system_request
; /* system request need to use high precision time */
1381 read_op
.conds
.mod_zone_id
= mod_zone_id
;
1382 read_op
.conds
.mod_pg_ver
= mod_pg_ver
;
1383 read_op
.conds
.if_match
= if_match
;
1384 read_op
.conds
.if_nomatch
= if_nomatch
;
1385 read_op
.params
.attrs
= &attrs
;
1386 read_op
.params
.lastmod
= &lastmod
;
1387 read_op
.params
.obj_size
= &s
->obj_size
;
1388 read_op
.params
.perr
= &s
->err
;
1390 op_ret
= read_op
.prepare();
1394 /* STAT ops don't need data, and do no i/o */
1395 if (get_type() == RGW_OP_STAT_OBJ
) {
1399 /* start gettorrent */
1400 if (torrent
.get_flag())
1402 torrent
.init(s
, store
);
1403 torrent
.get_torrent_file(op_ret
, read_op
, total_len
, bl
, obj
);
1406 ldout(s
->cct
, 0) << "ERROR: failed to get_torrent_file ret= " << op_ret
1410 op_ret
= send_response_data(bl
, 0, total_len
);
1413 ldout(s
->cct
, 0) << "ERROR: failed to send_response_data ret= " << op_ret
1419 /* end gettorrent */
1421 op_ret
= rgw_compression_info_from_attrset(attrs
, need_decompress
, cs_info
);
1423 lderr(s
->cct
) << "ERROR: failed to decode compression info, cannot decompress" << dendl
;
1426 if (need_decompress
) {
1427 s
->obj_size
= cs_info
.orig_size
;
1428 decompress
.emplace(s
->cct
, &cs_info
, partial_content
, filter
);
1429 filter
= &*decompress
;
1431 // for range requests with obj size 0
1432 if (range_str
&& !(s
->obj_size
)) {
1438 op_ret
= read_op
.range_to_ofs(s
->obj_size
, ofs
, end
);
1441 total_len
= (ofs
<= end
? end
+ 1 - ofs
: 0);
1443 attr_iter
= attrs
.find(RGW_ATTR_USER_MANIFEST
);
1444 if (attr_iter
!= attrs
.end() && !skip_manifest
) {
1445 op_ret
= handle_user_manifest(attr_iter
->second
.c_str());
1447 ldout(s
->cct
, 0) << "ERROR: failed to handle user manifest ret="
1454 attr_iter
= attrs
.find(RGW_ATTR_SLO_MANIFEST
);
1455 if (attr_iter
!= attrs
.end() && !skip_manifest
) {
1457 op_ret
= handle_slo_manifest(attr_iter
->second
);
1459 ldout(s
->cct
, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret
1466 /* Check whether the object has expired. Swift API documentation
1467 * stands that we should return 404 Not Found in such case. */
1468 if (need_object_expiration() && object_is_expired(attrs
)) {
1475 /* STAT ops don't need data, and do no i/o */
1476 if (get_type() == RGW_OP_STAT_OBJ
) {
1480 attr_iter
= attrs
.find(RGW_ATTR_MANIFEST
);
1481 op_ret
= this->get_decrypt_filter(&decrypt
, filter
,
1482 attr_iter
!= attrs
.end() ? &(attr_iter
->second
) : nullptr);
1483 if (decrypt
!= nullptr) {
1484 filter
= decrypt
.get();
1490 if (!get_data
|| ofs
> end
) {
1491 send_response_data(bl
, 0, 0);
1495 perfcounter
->inc(l_rgw_get_b
, end
- ofs
);
1499 filter
->fixup_range(ofs_x
, end_x
);
1500 op_ret
= read_op
.iterate(ofs_x
, end_x
, filter
);
1503 op_ret
= filter
->flush();
1505 perfcounter
->tinc(l_rgw_get_lat
,
1506 (ceph_clock_now() - start_time
));
1511 op_ret
= send_response_data(bl
, 0, 0);
1518 send_response_data_error();
1521 int RGWGetObj::init_common()
1524 /* range parsed error when prefetch*/
1525 if (!range_parsed
) {
1526 int r
= parse_range(range_str
, ofs
, end
, &partial_content
);
1532 if (parse_time(if_mod
, &mod_time
) < 0)
1534 mod_ptr
= &mod_time
;
1538 if (parse_time(if_unmod
, &unmod_time
) < 0)
1540 unmod_ptr
= &unmod_time
;
1546 int RGWListBuckets::verify_permission()
1548 if (!verify_user_permission(s
, RGW_PERM_READ
)) {
1555 int RGWGetUsage::verify_permission()
1557 if (s
->auth
.identity
->is_anonymous()) {
1564 void RGWListBuckets::execute()
1567 bool started
= false;
1568 uint64_t total_count
= 0;
1570 uint64_t max_buckets
= s
->cct
->_conf
->rgw_list_buckets_max_chunk
;
1572 op_ret
= get_params();
1577 if (supports_account_metadata()) {
1578 op_ret
= rgw_get_user_attrs_by_uid(store
, s
->user
->user_id
, attrs
);
1584 is_truncated
= false;
1586 RGWUserBuckets buckets
;
1587 uint64_t read_count
;
1589 read_count
= min(limit
- total_count
, (uint64_t)max_buckets
);
1591 read_count
= max_buckets
;
1594 op_ret
= rgw_read_user_buckets(store
, s
->user
->user_id
, buckets
,
1595 marker
, end_marker
, read_count
,
1596 should_get_stats(), &is_truncated
,
1599 /* hmm.. something wrong here.. the user was authenticated, so it
1601 ldout(s
->cct
, 10) << "WARNING: failed on rgw_get_user_buckets uid="
1602 << s
->user
->user_id
<< dendl
;
1605 map
<string
, RGWBucketEnt
>& m
= buckets
.get_buckets();
1606 map
<string
, RGWBucketEnt
>::iterator iter
;
1607 for (iter
= m
.begin(); iter
!= m
.end(); ++iter
) {
1608 RGWBucketEnt
& bucket
= iter
->second
;
1609 buckets_size
+= bucket
.size
;
1610 buckets_size_rounded
+= bucket
.size_rounded
;
1611 buckets_objcount
+= bucket
.count
;
1613 buckets_count
+= m
.size();
1614 total_count
+= m
.size();
1616 done
= (m
.size() < read_count
|| (limit
>= 0 && total_count
>= (uint64_t)limit
));
1619 send_response_begin(buckets
.count() > 0);
1624 send_response_data(buckets
);
1626 map
<string
, RGWBucketEnt
>::reverse_iterator riter
= m
.rbegin();
1627 marker
= riter
->first
;
1629 } while (is_truncated
&& !done
);
1633 send_response_begin(false);
1635 send_response_end();
1638 void RGWGetUsage::execute()
1640 uint64_t start_epoch
= 0;
1641 uint64_t end_epoch
= (uint64_t)-1;
1642 op_ret
= get_params();
1646 if (!start_date
.empty()) {
1647 op_ret
= utime_t::parse_date(start_date
, &start_epoch
, NULL
);
1649 ldout(store
->ctx(), 0) << "ERROR: failed to parse start date" << dendl
;
1654 if (!end_date
.empty()) {
1655 op_ret
= utime_t::parse_date(end_date
, &end_epoch
, NULL
);
1657 ldout(store
->ctx(), 0) << "ERROR: failed to parse end date" << dendl
;
1662 uint32_t max_entries
= 1000;
1664 bool is_truncated
= true;
1666 RGWUsageIter usage_iter
;
1668 while (is_truncated
) {
1669 op_ret
= store
->read_usage(s
->user
->user_id
, start_epoch
, end_epoch
, max_entries
,
1670 &is_truncated
, usage_iter
, usage
);
1672 if (op_ret
== -ENOENT
) {
1674 is_truncated
= false;
1682 op_ret
= rgw_user_sync_all_stats(store
, s
->user
->user_id
);
1684 ldout(store
->ctx(), 0) << "ERROR: failed to sync user stats: " << dendl
;
1688 string user_str
= s
->user
->user_id
.to_str();
1689 op_ret
= store
->cls_user_get_header(user_str
, &header
);
1691 ldout(store
->ctx(), 0) << "ERROR: can't read user header: " << dendl
;
1698 int RGWStatAccount::verify_permission()
1700 if (!verify_user_permission(s
, RGW_PERM_READ
)) {
1707 void RGWStatAccount::execute()
1710 bool is_truncated
= false;
1711 uint64_t max_buckets
= s
->cct
->_conf
->rgw_list_buckets_max_chunk
;
1714 RGWUserBuckets buckets
;
1716 op_ret
= rgw_read_user_buckets(store
, s
->user
->user_id
, buckets
, marker
,
1717 string(), max_buckets
, true, &is_truncated
);
1719 /* hmm.. something wrong here.. the user was authenticated, so it
1721 ldout(s
->cct
, 10) << "WARNING: failed on rgw_get_user_buckets uid="
1722 << s
->user
->user_id
<< dendl
;
1725 map
<string
, RGWBucketEnt
>& m
= buckets
.get_buckets();
1726 map
<string
, RGWBucketEnt
>::iterator iter
;
1727 for (iter
= m
.begin(); iter
!= m
.end(); ++iter
) {
1728 RGWBucketEnt
& bucket
= iter
->second
;
1729 buckets_size
+= bucket
.size
;
1730 buckets_size_rounded
+= bucket
.size_rounded
;
1731 buckets_objcount
+= bucket
.count
;
1733 marker
= iter
->first
;
1735 buckets_count
+= m
.size();
1738 } while (is_truncated
);
1741 int RGWGetBucketVersioning::verify_permission()
1743 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
1750 void RGWGetBucketVersioning::pre_exec()
1752 rgw_bucket_object_pre_exec(s
);
1755 void RGWGetBucketVersioning::execute()
1757 versioned
= s
->bucket_info
.versioned();
1758 versioning_enabled
= s
->bucket_info
.versioning_enabled();
1761 int RGWSetBucketVersioning::verify_permission()
1763 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
1770 void RGWSetBucketVersioning::pre_exec()
1772 rgw_bucket_object_pre_exec(s
);
1775 void RGWSetBucketVersioning::execute()
1777 op_ret
= get_params();
1781 if (!store
->is_meta_master()) {
1782 op_ret
= forward_request_to_master(s
, NULL
, store
, in_data
, nullptr);
1784 ldout(s
->cct
, 20) << __func__
<< "forward_request_to_master returned ret=" << op_ret
<< dendl
;
1789 if (enable_versioning
) {
1790 s
->bucket_info
.flags
|= BUCKET_VERSIONED
;
1791 s
->bucket_info
.flags
&= ~BUCKET_VERSIONS_SUSPENDED
;
1793 s
->bucket_info
.flags
|= (BUCKET_VERSIONED
| BUCKET_VERSIONS_SUSPENDED
);
1796 op_ret
= store
->put_bucket_instance_info(s
->bucket_info
, false, real_time(),
1799 ldout(s
->cct
, 0) << "NOTICE: put_bucket_info on bucket=" << s
->bucket
.name
1800 << " returned err=" << op_ret
<< dendl
;
1805 int RGWGetBucketWebsite::verify_permission()
1807 if (s
->user
->user_id
.compare(s
->bucket_owner
.get_id()) != 0)
1813 void RGWGetBucketWebsite::pre_exec()
1815 rgw_bucket_object_pre_exec(s
);
1818 void RGWGetBucketWebsite::execute()
1820 if (!s
->bucket_info
.has_website
) {
1825 int RGWSetBucketWebsite::verify_permission()
1827 if (s
->user
->user_id
.compare(s
->bucket_owner
.get_id()) != 0)
1833 void RGWSetBucketWebsite::pre_exec()
1835 rgw_bucket_object_pre_exec(s
);
1838 void RGWSetBucketWebsite::execute()
1840 op_ret
= get_params();
1845 s
->bucket_info
.has_website
= true;
1846 s
->bucket_info
.website_conf
= website_conf
;
1848 op_ret
= store
->put_bucket_instance_info(s
->bucket_info
, false, real_time(), &s
->bucket_attrs
);
1850 ldout(s
->cct
, 0) << "NOTICE: put_bucket_info on bucket=" << s
->bucket
.name
<< " returned err=" << op_ret
<< dendl
;
1855 int RGWDeleteBucketWebsite::verify_permission()
1857 if (s
->user
->user_id
.compare(s
->bucket_owner
.get_id()) != 0)
1863 void RGWDeleteBucketWebsite::pre_exec()
1865 rgw_bucket_object_pre_exec(s
);
1868 void RGWDeleteBucketWebsite::execute()
1870 s
->bucket_info
.has_website
= false;
1871 s
->bucket_info
.website_conf
= RGWBucketWebsiteConf();
1873 op_ret
= store
->put_bucket_instance_info(s
->bucket_info
, false, real_time(), &s
->bucket_attrs
);
1875 ldout(s
->cct
, 0) << "NOTICE: put_bucket_info on bucket=" << s
->bucket
.name
<< " returned err=" << op_ret
<< dendl
;
1880 int RGWStatBucket::verify_permission()
1882 if (!verify_bucket_permission(s
, RGW_PERM_READ
)) {
1889 void RGWStatBucket::pre_exec()
1891 rgw_bucket_object_pre_exec(s
);
1894 void RGWStatBucket::execute()
1896 if (!s
->bucket_exists
) {
1897 op_ret
= -ERR_NO_SUCH_BUCKET
;
1901 RGWUserBuckets buckets
;
1902 bucket
.bucket
= s
->bucket
;
1903 buckets
.add(bucket
);
1904 map
<string
, RGWBucketEnt
>& m
= buckets
.get_buckets();
1905 op_ret
= store
->update_containers_stats(m
);
1910 map
<string
, RGWBucketEnt
>::iterator iter
= m
.find(bucket
.bucket
.name
);
1911 if (iter
!= m
.end()) {
1912 bucket
= iter
->second
;
1919 int RGWListBucket::verify_permission()
1921 if (!verify_bucket_permission(s
, RGW_PERM_READ
)) {
1928 int RGWListBucket::parse_max_keys()
1930 if (!max_keys
.empty()) {
1932 max
= strtol(max_keys
.c_str(), &endptr
, 10);
1934 while (*endptr
&& isspace(*endptr
)) // ignore white space
1947 void RGWListBucket::pre_exec()
1949 rgw_bucket_object_pre_exec(s
);
1952 void RGWListBucket::execute()
1954 if (!s
->bucket_exists
) {
1955 op_ret
= -ERR_NO_SUCH_BUCKET
;
1959 op_ret
= get_params();
1963 if (need_container_stats()) {
1964 map
<string
, RGWBucketEnt
> m
;
1965 m
[s
->bucket
.name
] = RGWBucketEnt();
1966 m
.begin()->second
.bucket
= s
->bucket
;
1967 op_ret
= store
->update_containers_stats(m
);
1969 bucket
= m
.begin()->second
;
1973 RGWRados::Bucket
target(store
, s
->bucket_info
);
1974 if (shard_id
>= 0) {
1975 target
.set_shard_id(shard_id
);
1977 RGWRados::Bucket::List
list_op(&target
);
1979 list_op
.params
.prefix
= prefix
;
1980 list_op
.params
.delim
= delimiter
;
1981 list_op
.params
.marker
= marker
;
1982 list_op
.params
.end_marker
= end_marker
;
1983 list_op
.params
.list_versions
= list_versions
;
1985 op_ret
= list_op
.list_objects(max
, &objs
, &common_prefixes
, &is_truncated
);
1986 if (op_ret
>= 0 && !delimiter
.empty()) {
1987 next_marker
= list_op
.get_next_marker();
1991 int RGWGetBucketLogging::verify_permission()
1993 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
2000 int RGWGetBucketLocation::verify_permission()
2002 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
2009 int RGWCreateBucket::verify_permission()
2011 /* This check is mostly needed for S3 that doesn't support account ACL.
2012 * Swift doesn't allow to delegate any permission to an anonymous user,
2013 * so it will become an early exit in such case. */
2014 if (s
->auth
.identity
->is_anonymous()) {
2018 if (!verify_user_permission(s
, RGW_PERM_WRITE
)) {
2022 if (s
->user
->user_id
.tenant
!= s
->bucket_tenant
) {
2023 ldout(s
->cct
, 10) << "user cannot create a bucket in a different tenant"
2024 << " (user_id.tenant=" << s
->user
->user_id
.tenant
2025 << " requested=" << s
->bucket_tenant
<< ")"
2029 if (s
->user
->max_buckets
< 0) {
2033 if (s
->user
->max_buckets
) {
2034 RGWUserBuckets buckets
;
2036 bool is_truncated
= false;
2037 op_ret
= rgw_read_user_buckets(store
, s
->user
->user_id
, buckets
,
2038 marker
, string(), s
->user
->max_buckets
,
2039 false, &is_truncated
);
2044 if ((int)buckets
.count() >= s
->user
->max_buckets
) {
2045 return -ERR_TOO_MANY_BUCKETS
;
2052 static int forward_request_to_master(struct req_state
*s
, obj_version
*objv
,
2053 RGWRados
*store
, bufferlist
& in_data
,
2054 JSONParser
*jp
, req_info
*forward_info
)
2056 if (!store
->rest_master_conn
) {
2057 ldout(s
->cct
, 0) << "rest connection is invalid" << dendl
;
2060 ldout(s
->cct
, 0) << "sending request to master zonegroup" << dendl
;
2061 bufferlist response
;
2062 string uid_str
= s
->user
->user_id
.to_str();
2063 #define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
2064 int ret
= store
->rest_master_conn
->forward(uid_str
, (forward_info
? *forward_info
: s
->info
),
2065 objv
, MAX_REST_RESPONSE
, &in_data
, &response
);
2069 ldout(s
->cct
, 20) << "response: " << response
.c_str() << dendl
;
2070 if (jp
&& !jp
->parse(response
.c_str(), response
.length())) {
2071 ldout(s
->cct
, 0) << "failed parsing response from master zonegroup" << dendl
;
2078 void RGWCreateBucket::pre_exec()
2080 rgw_bucket_object_pre_exec(s
);
2083 static void prepare_add_del_attrs(const map
<string
, bufferlist
>& orig_attrs
,
2084 map
<string
, bufferlist
>& out_attrs
,
2085 map
<string
, bufferlist
>& out_rmattrs
)
2087 for (const auto& kv
: orig_attrs
) {
2088 const string
& name
= kv
.first
;
2090 /* Check if the attr is user-defined metadata item. */
2091 if (name
.compare(0, sizeof(RGW_ATTR_META_PREFIX
) - 1,
2092 RGW_ATTR_META_PREFIX
) == 0) {
2093 /* For the objects all existing meta attrs have to be removed. */
2094 out_rmattrs
[name
] = kv
.second
;
2095 } else if (out_attrs
.find(name
) == std::end(out_attrs
)) {
2096 out_attrs
[name
] = kv
.second
;
2101 /* Fuse resource metadata basing on original attributes in @orig_attrs, set
2102 * of _custom_ attribute names to remove in @rmattr_names and attributes in
2103 * @out_attrs. Place results in @out_attrs.
2105 * NOTE: it's supposed that all special attrs already present in @out_attrs
2106 * will be preserved without any change. Special attributes are those which
2107 * names start with RGW_ATTR_META_PREFIX. They're complement to custom ones
2108 * used for X-Account-Meta-*, X-Container-Meta-*, X-Amz-Meta and so on. */
2109 static void prepare_add_del_attrs(const map
<string
, bufferlist
>& orig_attrs
,
2110 const set
<string
>& rmattr_names
,
2111 map
<string
, bufferlist
>& out_attrs
)
2113 for (const auto& kv
: orig_attrs
) {
2114 const string
& name
= kv
.first
;
2116 /* Check if the attr is user-defined metadata item. */
2117 if (name
.compare(0, strlen(RGW_ATTR_META_PREFIX
),
2118 RGW_ATTR_META_PREFIX
) == 0) {
2119 /* For the buckets all existing meta attrs are preserved,
2120 except those that are listed in rmattr_names. */
2121 if (rmattr_names
.find(name
) != std::end(rmattr_names
)) {
2122 const auto aiter
= out_attrs
.find(name
);
2124 if (aiter
!= std::end(out_attrs
)) {
2125 out_attrs
.erase(aiter
);
2128 /* emplace() won't alter the map if the key is already present.
2129 * This behaviour is fully intensional here. */
2130 out_attrs
.emplace(kv
);
2132 } else if (out_attrs
.find(name
) == std::end(out_attrs
)) {
2133 out_attrs
[name
] = kv
.second
;
2139 static void populate_with_generic_attrs(const req_state
* const s
,
2140 map
<string
, bufferlist
>& out_attrs
)
2142 for (const auto& kv
: s
->generic_attrs
) {
2143 bufferlist
& attrbl
= out_attrs
[kv
.first
];
2144 const string
& val
= kv
.second
;
2146 attrbl
.append(val
.c_str(), val
.size() + 1);
2151 static int filter_out_quota_info(std::map
<std::string
, bufferlist
>& add_attrs
,
2152 const std::set
<std::string
>& rmattr_names
,
2153 RGWQuotaInfo
& quota
,
2154 bool * quota_extracted
= nullptr)
2156 bool extracted
= false;
2158 /* Put new limit on max objects. */
2159 auto iter
= add_attrs
.find(RGW_ATTR_QUOTA_NOBJS
);
2161 if (std::end(add_attrs
) != iter
) {
2163 static_cast<int64_t>(strict_strtoll(iter
->second
.c_str(), 10, &err
));
2167 add_attrs
.erase(iter
);
2171 /* Put new limit on bucket (container) size. */
2172 iter
= add_attrs
.find(RGW_ATTR_QUOTA_MSIZE
);
2173 if (iter
!= add_attrs
.end()) {
2175 static_cast<int64_t>(strict_strtoll(iter
->second
.c_str(), 10, &err
));
2179 add_attrs
.erase(iter
);
2183 for (const auto& name
: rmattr_names
) {
2184 /* Remove limit on max objects. */
2185 if (name
.compare(RGW_ATTR_QUOTA_NOBJS
) == 0) {
2186 quota
.max_objects
= -1;
2190 /* Remove limit on max bucket size. */
2191 if (name
.compare(RGW_ATTR_QUOTA_MSIZE
) == 0) {
2192 quota
.max_size
= -1;
2197 /* Swift requries checking on raw usage instead of the 4 KiB rounded one. */
2198 quota
.check_on_raw
= true;
2199 quota
.enabled
= quota
.max_size
> 0 || quota
.max_objects
> 0;
2201 if (quota_extracted
) {
2202 *quota_extracted
= extracted
;
2209 static void filter_out_website(std::map
<std::string
, ceph::bufferlist
>& add_attrs
,
2210 const std::set
<std::string
>& rmattr_names
,
2211 RGWBucketWebsiteConf
& ws_conf
)
2215 /* Let's define a mapping between each custom attribute and the memory where
2216 * attribute's value should be stored. The memory location is expressed by
2217 * a non-const reference. */
2218 const auto mapping
= {
2219 std::make_pair(RGW_ATTR_WEB_INDEX
, std::ref(ws_conf
.index_doc_suffix
)),
2220 std::make_pair(RGW_ATTR_WEB_ERROR
, std::ref(ws_conf
.error_doc
)),
2221 std::make_pair(RGW_ATTR_WEB_LISTINGS
, std::ref(lstval
)),
2222 std::make_pair(RGW_ATTR_WEB_LIST_CSS
, std::ref(ws_conf
.listing_css_doc
)),
2223 std::make_pair(RGW_ATTR_SUBDIR_MARKER
, std::ref(ws_conf
.subdir_marker
))
2226 for (const auto& kv
: mapping
) {
2227 const char * const key
= kv
.first
;
2228 auto& target
= kv
.second
;
2230 auto iter
= add_attrs
.find(key
);
2232 if (std::end(add_attrs
) != iter
) {
2233 /* The "target" is a reference to ws_conf. */
2234 target
= iter
->second
.c_str();
2235 add_attrs
.erase(iter
);
2238 if (rmattr_names
.count(key
)) {
2239 target
= std::string();
2243 if (! lstval
.empty()) {
2244 ws_conf
.listing_enabled
= boost::algorithm::iequals(lstval
, "true");
2249 void RGWCreateBucket::execute()
2251 RGWAccessControlPolicy
old_policy(s
->cct
);
2253 buffer::list corsbl
;
2256 rgw_make_bucket_entry_name(s
->bucket_tenant
, s
->bucket_name
, bucket_name
);
2257 rgw_raw_obj
obj(store
->get_zone_params().domain_root
, bucket_name
);
2258 obj_version objv
, *pobjv
= NULL
;
2260 op_ret
= get_params();
2264 if (!store
->get_zonegroup().is_master
&&
2265 store
->get_zonegroup().api_name
!= location_constraint
) {
2266 ldout(s
->cct
, 0) << "location constraint (" << location_constraint
<< ") doesn't match zonegroup" << " (" << store
->get_zonegroup().api_name
<< ")" << dendl
;
2271 /* we need to make sure we read bucket info, it's not read before for this
2272 * specific request */
2273 RGWObjectCtx
& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
2274 op_ret
= store
->get_bucket_info(obj_ctx
, s
->bucket_tenant
, s
->bucket_name
,
2275 s
->bucket_info
, NULL
, &s
->bucket_attrs
);
2276 if (op_ret
< 0 && op_ret
!= -ENOENT
)
2278 s
->bucket_exists
= (op_ret
!= -ENOENT
);
2280 s
->bucket_owner
.set_id(s
->user
->user_id
);
2281 s
->bucket_owner
.set_name(s
->user
->display_name
);
2282 if (s
->bucket_exists
) {
2283 int r
= get_bucket_policy_from_attr(s
->cct
, store
, s
->bucket_info
,
2284 s
->bucket_attrs
, &old_policy
);
2286 if (old_policy
.get_owner().get_id().compare(s
->user
->user_id
) != 0) {
2293 RGWBucketInfo master_info
;
2294 rgw_bucket
*pmaster_bucket
;
2295 uint32_t *pmaster_num_shards
;
2296 real_time creation_time
;
2298 if (!store
->is_meta_master()) {
2300 op_ret
= forward_request_to_master(s
, NULL
, store
, in_data
, &jp
);
2305 JSONDecoder::decode_json("entry_point_object_ver", ep_objv
, &jp
);
2306 JSONDecoder::decode_json("object_ver", objv
, &jp
);
2307 JSONDecoder::decode_json("bucket_info", master_info
, &jp
);
2308 ldout(s
->cct
, 20) << "parsed: objv.tag=" << objv
.tag
<< " objv.ver=" << objv
.ver
<< dendl
;
2309 ldout(s
->cct
, 20) << "got creation time: << " << master_info
.creation_time
<< dendl
;
2310 pmaster_bucket
= &master_info
.bucket
;
2311 creation_time
= master_info
.creation_time
;
2312 pmaster_num_shards
= &master_info
.num_shards
;
2315 pmaster_bucket
= NULL
;
2316 pmaster_num_shards
= NULL
;
2319 string zonegroup_id
;
2321 if (s
->system_request
) {
2322 zonegroup_id
= s
->info
.args
.get(RGW_SYS_PARAM_PREFIX
"zonegroup");
2323 if (zonegroup_id
.empty()) {
2324 zonegroup_id
= store
->get_zonegroup().get_id();
2327 zonegroup_id
= store
->get_zonegroup().get_id();
2330 if (s
->bucket_exists
) {
2331 string selected_placement_rule
;
2333 bucket
.tenant
= s
->bucket_tenant
;
2334 bucket
.name
= s
->bucket_name
;
2335 op_ret
= store
->select_bucket_placement(*(s
->user
), zonegroup_id
,
2337 &selected_placement_rule
, nullptr);
2338 if (selected_placement_rule
!= s
->bucket_info
.placement_rule
) {
2344 /* Encode special metadata first as we're using std::map::emplace under
2345 * the hood. This method will add the new items only if the map doesn't
2346 * contain such keys yet. */
2347 policy
.encode(aclbl
);
2348 emplace_attr(RGW_ATTR_ACL
, std::move(aclbl
));
2351 cors_config
.encode(corsbl
);
2352 emplace_attr(RGW_ATTR_CORS
, std::move(corsbl
));
2355 RGWQuotaInfo quota_info
;
2356 const RGWQuotaInfo
* pquota_info
= nullptr;
2357 if (need_metadata_upload()) {
2358 /* It's supposed that following functions WILL NOT change any special
2359 * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
2360 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
, false);
2361 prepare_add_del_attrs(s
->bucket_attrs
, rmattr_names
, attrs
);
2362 populate_with_generic_attrs(s
, attrs
);
2364 op_ret
= filter_out_quota_info(attrs
, rmattr_names
, quota_info
);
2368 pquota_info
= "a_info
;
2371 /* Web site of Swift API. */
2372 filter_out_website(attrs
, rmattr_names
, s
->bucket_info
.website_conf
);
2373 s
->bucket_info
.has_website
= !s
->bucket_info
.website_conf
.is_empty();
2376 s
->bucket
.tenant
= s
->bucket_tenant
; /* ignored if bucket exists */
2377 s
->bucket
.name
= s
->bucket_name
;
2379 /* Handle updates of the metadata for Swift's object versioning. */
2380 if (swift_ver_location
) {
2381 s
->bucket_info
.swift_ver_location
= *swift_ver_location
;
2382 s
->bucket_info
.swift_versioning
= (! swift_ver_location
->empty());
2385 op_ret
= store
->create_bucket(*(s
->user
), s
->bucket
, zonegroup_id
,
2386 placement_rule
, s
->bucket_info
.swift_ver_location
,
2388 info
, pobjv
, &ep_objv
, creation_time
,
2389 pmaster_bucket
, pmaster_num_shards
, true);
2390 /* continue if EEXIST and create_bucket will fail below. this way we can
2391 * recover from a partial create by retrying it. */
2392 ldout(s
->cct
, 20) << "rgw_create_bucket returned ret=" << op_ret
<< " bucket=" << s
->bucket
<< dendl
;
2394 if (op_ret
&& op_ret
!= -EEXIST
)
2397 existed
= (op_ret
== -EEXIST
);
2400 /* bucket already existed, might have raced with another bucket creation, or
2401 * might be partial bucket creation that never completed. Read existing bucket
2402 * info, verify that the reported bucket owner is the current user.
2403 * If all is ok then update the user's list of buckets.
2404 * Otherwise inform client about a name conflict.
2406 if (info
.owner
.compare(s
->user
->user_id
) != 0) {
2410 s
->bucket
= info
.bucket
;
2413 op_ret
= rgw_link_bucket(store
, s
->user
->user_id
, s
->bucket
,
2414 info
.creation_time
, false);
2415 if (op_ret
&& !existed
&& op_ret
!= -EEXIST
) {
2416 /* if it exists (or previously existed), don't remove it! */
2417 op_ret
= rgw_unlink_bucket(store
, s
->user
->user_id
, s
->bucket
.tenant
,
2420 ldout(s
->cct
, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
2423 } else if (op_ret
== -EEXIST
|| (op_ret
== 0 && existed
)) {
2424 op_ret
= -ERR_BUCKET_EXISTS
;
2427 if (need_metadata_upload() && existed
) {
2428 /* OK, it looks we lost race with another request. As it's required to
2429 * handle metadata fusion and upload, the whole operation becomes very
2430 * similar in nature to PutMetadataBucket. However, as the attrs may
2431 * changed in the meantime, we have to refresh. */
2434 RGWObjectCtx
& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
2435 RGWBucketInfo binfo
;
2436 map
<string
, bufferlist
> battrs
;
2438 op_ret
= store
->get_bucket_info(obj_ctx
, s
->bucket_tenant
, s
->bucket_name
,
2439 binfo
, nullptr, &battrs
);
2442 } else if (binfo
.owner
.compare(s
->user
->user_id
) != 0) {
2443 /* New bucket doesn't belong to the account we're operating on. */
2447 s
->bucket_info
= binfo
;
2448 s
->bucket_attrs
= battrs
;
2453 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
, false);
2454 prepare_add_del_attrs(s
->bucket_attrs
, rmattr_names
, attrs
);
2455 populate_with_generic_attrs(s
, attrs
);
2456 op_ret
= filter_out_quota_info(attrs
, rmattr_names
, s
->bucket_info
.quota
);
2461 /* Handle updates of the metadata for Swift's object versioning. */
2462 if (swift_ver_location
) {
2463 s
->bucket_info
.swift_ver_location
= *swift_ver_location
;
2464 s
->bucket_info
.swift_versioning
= (! swift_ver_location
->empty());
2467 /* Web site of Swift API. */
2468 filter_out_website(attrs
, rmattr_names
, s
->bucket_info
.website_conf
);
2469 s
->bucket_info
.has_website
= !s
->bucket_info
.website_conf
.is_empty();
2471 /* This will also set the quota on the bucket. */
2472 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
,
2473 &s
->bucket_info
.objv_tracker
);
2474 } while (op_ret
== -ECANCELED
&& tries
++ < 20);
2476 /* Restore the proper return code. */
2478 op_ret
= -ERR_BUCKET_EXISTS
;
2483 int RGWDeleteBucket::verify_permission()
2485 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
)) {
2492 void RGWDeleteBucket::pre_exec()
2494 rgw_bucket_object_pre_exec(s
);
2497 void RGWDeleteBucket::execute()
2501 if (s
->bucket_name
.empty())
2504 if (!s
->bucket_exists
) {
2505 ldout(s
->cct
, 0) << "ERROR: bucket " << s
->bucket_name
<< " not found" << dendl
;
2506 op_ret
= -ERR_NO_SUCH_BUCKET
;
2509 RGWObjVersionTracker ot
;
2510 ot
.read_version
= s
->bucket_info
.ep_objv
;
2512 if (s
->system_request
) {
2513 string tag
= s
->info
.args
.get(RGW_SYS_PARAM_PREFIX
"tag");
2514 string ver_str
= s
->info
.args
.get(RGW_SYS_PARAM_PREFIX
"ver");
2516 ot
.read_version
.tag
= tag
;
2519 ver
= strict_strtol(ver_str
.c_str(), 10, &err
);
2521 ldout(s
->cct
, 0) << "failed to parse ver param" << dendl
;
2525 ot
.read_version
.ver
= ver
;
2529 op_ret
= rgw_bucket_sync_user_stats(store
, s
->user
->user_id
, s
->bucket_info
);
2531 ldout(s
->cct
, 1) << "WARNING: failed to sync user stats before bucket delete: op_ret= " << op_ret
<< dendl
;
2534 op_ret
= store
->check_bucket_empty(s
->bucket_info
);
2539 if (!store
->is_meta_master()) {
2541 op_ret
= forward_request_to_master(s
, &ot
.read_version
, store
, in_data
,
2544 if (op_ret
== -ENOENT
) {
2545 /* adjust error, we want to return with NoSuchBucket and not
2547 op_ret
= -ERR_NO_SUCH_BUCKET
;
2553 op_ret
= store
->delete_bucket(s
->bucket_info
, ot
, false);
2555 if (op_ret
== -ECANCELED
) {
2556 // lost a race, either with mdlog sync or another delete bucket operation.
2557 // in either case, we've already called rgw_unlink_bucket()
2563 op_ret
= rgw_unlink_bucket(store
, s
->user
->user_id
, s
->bucket
.tenant
,
2564 s
->bucket
.name
, false);
2566 ldout(s
->cct
, 0) << "WARNING: failed to unlink bucket: ret=" << op_ret
2578 int RGWPutObj::verify_permission()
2582 RGWAccessControlPolicy
cs_policy(s
->cct
);
2583 map
<string
, bufferlist
> cs_attrs
;
2584 rgw_bucket
cs_bucket(copy_source_bucket_info
.bucket
);
2585 rgw_obj_key
cs_object(copy_source_object_name
, copy_source_version_id
);
2587 rgw_obj
obj(cs_bucket
, cs_object
);
2588 store
->set_atomic(s
->obj_ctx
, obj
);
2589 store
->set_prefetch_data(s
->obj_ctx
, obj
);
2591 /* check source object permissions */
2592 if (read_obj_policy(store
, s
, copy_source_bucket_info
, cs_attrs
, &cs_policy
, cs_bucket
, cs_object
) < 0) {
2596 /* admin request overrides permission checks */
2597 if (! s
->auth
.identity
->is_admin_of(cs_policy
.get_owner().get_id()) &&
2598 ! cs_policy
.verify_permission(*s
->auth
.identity
, s
->perm_mask
, RGW_PERM_READ
)) {
2604 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
)) {
2611 void RGWPutObjProcessor_Multipart::get_mp(RGWMPObj
** _mp
){
2615 int RGWPutObjProcessor_Multipart::prepare(RGWRados
*store
, string
*oid_rand
)
2617 string oid
= obj_str
;
2618 upload_id
= s
->info
.args
.get("uploadId");
2620 mp
.init(oid
, upload_id
);
2622 mp
.init(oid
, upload_id
, *oid_rand
);
2625 part_num
= s
->info
.args
.get("partNumber");
2626 if (part_num
.empty()) {
2627 ldout(s
->cct
, 10) << "part number is empty" << dendl
;
2632 uint64_t num
= (uint64_t)strict_strtol(part_num
.c_str(), 10, &err
);
2635 ldout(s
->cct
, 10) << "bad part number: " << part_num
<< ": " << err
<< dendl
;
2639 string upload_prefix
= oid
+ ".";
2642 upload_prefix
.append(upload_id
);
2644 upload_prefix
.append(*oid_rand
);
2648 target_obj
.init(bucket
, oid
);
2650 manifest
.set_prefix(upload_prefix
);
2652 manifest
.set_multipart_part_rule(store
->ctx()->_conf
->rgw_obj_stripe_size
, num
);
2654 int r
= manifest_gen
.create_begin(store
->ctx(), &manifest
, s
->bucket_info
.placement_rule
, bucket
, target_obj
);
2659 cur_obj
= manifest_gen
.get_cur_obj(store
);
2660 rgw_raw_obj_to_obj(bucket
, cur_obj
, &head_obj
);
2661 head_obj
.index_hash_source
= obj_str
;
2663 r
= prepare_init(store
, NULL
);
2671 int RGWPutObjProcessor_Multipart::do_complete(size_t accounted_size
,
2673 real_time
*mtime
, real_time set_mtime
,
2674 map
<string
, bufferlist
>& attrs
,
2675 real_time delete_at
,
2676 const char *if_match
,
2677 const char *if_nomatch
, const string
*user_data
)
2679 complete_writing_data();
2681 RGWRados::Object
op_target(store
, s
->bucket_info
, obj_ctx
, head_obj
);
2682 RGWRados::Object::Write
head_obj_op(&op_target
);
2684 head_obj_op
.meta
.set_mtime
= set_mtime
;
2685 head_obj_op
.meta
.mtime
= mtime
;
2686 head_obj_op
.meta
.owner
= s
->owner
.get_id();
2687 head_obj_op
.meta
.delete_at
= delete_at
;
2689 int r
= head_obj_op
.write_meta(obj_len
, accounted_size
, attrs
);
2694 RGWUploadPartInfo info
;
2696 bool sorted_omap
= is_v2_upload_id(upload_id
);
2700 int part_num_int
= strict_strtol(part_num
.c_str(), 10, &err
);
2702 dout(10) << "bad part number specified: " << part_num
<< dendl
;
2706 snprintf(buf
, sizeof(buf
), "%08d", part_num_int
);
2711 info
.num
= atoi(part_num
.c_str());
2713 info
.size
= obj_len
;
2714 info
.accounted_size
= accounted_size
;
2715 info
.modified
= real_clock::now();
2716 info
.manifest
= manifest
;
2719 r
= rgw_compression_info_from_attrset(attrs
, compressed
, info
.cs_info
);
2721 dout(1) << "cannot get compression info" << dendl
;
2727 string multipart_meta_obj
= mp
.get_meta();
2730 meta_obj
.init_ns(bucket
, multipart_meta_obj
, mp_ns
);
2731 meta_obj
.set_in_extra_data(true);
2733 rgw_raw_obj raw_meta_obj
;
2735 store
->obj_to_raw(s
->bucket_info
.placement_rule
, meta_obj
, &raw_meta_obj
);
2737 r
= store
->omap_set(raw_meta_obj
, p
, bl
);
2742 RGWPutObjProcessor
*RGWPutObj::select_processor(RGWObjectCtx
& obj_ctx
, bool *is_multipart
)
2744 RGWPutObjProcessor
*processor
;
2746 bool multipart
= s
->info
.args
.exists("uploadId");
2748 uint64_t part_size
= s
->cct
->_conf
->rgw_obj_stripe_size
;
2751 processor
= new RGWPutObjProcessor_Atomic(obj_ctx
, s
->bucket_info
, s
->bucket
, s
->object
.name
, part_size
, s
->req_id
, s
->bucket_info
.versioning_enabled());
2752 (static_cast<RGWPutObjProcessor_Atomic
*>(processor
))->set_olh_epoch(olh_epoch
);
2753 (static_cast<RGWPutObjProcessor_Atomic
*>(processor
))->set_version_id(version_id
);
2755 processor
= new RGWPutObjProcessor_Multipart(obj_ctx
, s
->bucket_info
, part_size
, s
);
2759 *is_multipart
= multipart
;
2765 void RGWPutObj::dispose_processor(RGWPutObjDataProcessor
*processor
)
2770 void RGWPutObj::pre_exec()
2772 rgw_bucket_object_pre_exec(s
);
2775 class RGWPutObj_CB
: public RGWGetDataCB
2779 RGWPutObj_CB(RGWPutObj
*_op
) : op(_op
) {}
2780 ~RGWPutObj_CB() override
{}
2782 int handle_data(bufferlist
& bl
, off_t bl_ofs
, off_t bl_len
) override
{
2783 return op
->get_data_cb(bl
, bl_ofs
, bl_len
);
2787 int RGWPutObj::get_data_cb(bufferlist
& bl
, off_t bl_ofs
, off_t bl_len
)
2790 bl
.copy(bl_ofs
, bl_len
, bl_tmp
);
2792 bl_aux
.append(bl_tmp
);
2797 int RGWPutObj::get_data(const off_t fst
, const off_t lst
, bufferlist
& bl
)
2799 RGWPutObj_CB
cb(this);
2800 RGWGetDataCB
* filter
= &cb
;
2801 boost::optional
<RGWGetObj_Decompress
> decompress
;
2802 std::unique_ptr
<RGWGetDataCB
> decrypt
;
2803 RGWCompressionInfo cs_info
;
2804 map
<string
, bufferlist
> attrs
;
2805 map
<string
, bufferlist
>::iterator attr_iter
;
2809 int64_t new_ofs
, new_end
;
2814 rgw_obj_key
obj_key(copy_source_object_name
, copy_source_version_id
);
2815 rgw_obj
obj(copy_source_bucket_info
.bucket
, obj_key
);
2817 RGWRados::Object
op_target(store
, copy_source_bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
2818 RGWRados::Object::Read
read_op(&op_target
);
2819 read_op
.params
.obj_size
= &obj_size
;
2820 read_op
.params
.attrs
= &attrs
;
2822 ret
= read_op
.prepare();
2826 bool need_decompress
;
2827 op_ret
= rgw_compression_info_from_attrset(attrs
, need_decompress
, cs_info
);
2829 lderr(s
->cct
) << "ERROR: failed to decode compression info, cannot decompress" << dendl
;
2833 bool partial_content
= true;
2834 if (need_decompress
)
2836 obj_size
= cs_info
.orig_size
;
2837 decompress
.emplace(s
->cct
, &cs_info
, partial_content
, filter
);
2838 filter
= &*decompress
;
2841 attr_iter
= attrs
.find(RGW_ATTR_MANIFEST
);
2842 op_ret
= this->get_decrypt_filter(&decrypt
,
2845 attr_iter
!= attrs
.end() ? &(attr_iter
->second
) : nullptr);
2846 if (decrypt
!= nullptr) {
2847 filter
= decrypt
.get();
2853 ret
= read_op
.range_to_ofs(obj_size
, new_ofs
, new_end
);
2857 filter
->fixup_range(new_ofs
, new_end
);
2858 ret
= read_op
.iterate(new_ofs
, new_end
, filter
);
2861 ret
= filter
->flush();
2863 bl
.claim_append(bl_aux
);
2868 // special handling for compression type = "random" with multipart uploads
2869 static CompressorRef
get_compressor_plugin(const req_state
*s
,
2870 const std::string
& compression_type
)
2872 if (compression_type
!= "random") {
2873 return Compressor::create(s
->cct
, compression_type
);
2876 bool is_multipart
{false};
2877 const auto& upload_id
= s
->info
.args
.get("uploadId", &is_multipart
);
2879 if (!is_multipart
) {
2880 return Compressor::create(s
->cct
, compression_type
);
2883 // use a hash of the multipart upload id so all parts use the same plugin
2884 const auto alg
= std::hash
<std::string
>{}(upload_id
) % Compressor::COMP_ALG_LAST
;
2885 if (alg
== Compressor::COMP_ALG_NONE
) {
2888 return Compressor::create(s
->cct
, alg
);
2891 void RGWPutObj::execute()
2893 RGWPutObjProcessor
*processor
= NULL
;
2894 RGWPutObjDataProcessor
*filter
= nullptr;
2895 std::unique_ptr
<RGWPutObjDataProcessor
> encrypt
;
2896 char supplied_md5_bin
[CEPH_CRYPTO_MD5_DIGESTSIZE
+ 1];
2897 char supplied_md5
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 1];
2898 char calc_md5
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 1];
2899 unsigned char m
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
2901 bufferlist bl
, aclbl
, bs
;
2903 map
<string
, string
>::iterator iter
;
2908 const auto& compression_type
= store
->get_zone_params().get_compression_type(
2909 s
->bucket_info
.placement_rule
);
2910 CompressorRef plugin
;
2911 boost::optional
<RGWPutObj_Compress
> compressor
;
2913 bool need_calc_md5
= (dlo_manifest
== NULL
) && (slo_info
== NULL
);
2914 perfcounter
->inc(l_rgw_put
);
2916 if (s
->object
.empty()) {
2920 if (!s
->bucket_exists
) {
2921 op_ret
= -ERR_NO_SUCH_BUCKET
;
2925 op_ret
= get_params();
2927 ldout(s
->cct
, 20) << "get_params() returned ret=" << op_ret
<< dendl
;
2931 op_ret
= get_system_versioning_params(s
, &olh_epoch
, &version_id
);
2933 ldout(s
->cct
, 20) << "get_system_versioning_params() returned ret="
2938 if (supplied_md5_b64
) {
2939 need_calc_md5
= true;
2941 ldout(s
->cct
, 15) << "supplied_md5_b64=" << supplied_md5_b64
<< dendl
;
2942 op_ret
= ceph_unarmor(supplied_md5_bin
, &supplied_md5_bin
[CEPH_CRYPTO_MD5_DIGESTSIZE
+ 1],
2943 supplied_md5_b64
, supplied_md5_b64
+ strlen(supplied_md5_b64
));
2944 ldout(s
->cct
, 15) << "ceph_armor ret=" << op_ret
<< dendl
;
2945 if (op_ret
!= CEPH_CRYPTO_MD5_DIGESTSIZE
) {
2946 op_ret
= -ERR_INVALID_DIGEST
;
2950 buf_to_hex((const unsigned char *)supplied_md5_bin
, CEPH_CRYPTO_MD5_DIGESTSIZE
, supplied_md5
);
2951 ldout(s
->cct
, 15) << "supplied_md5=" << supplied_md5
<< dendl
;
2954 if (!chunked_upload
) { /* with chunked upload we don't know how big is the upload.
2955 we also check sizes at the end anyway */
2956 op_ret
= store
->check_quota(s
->bucket_owner
.get_id(), s
->bucket
,
2957 user_quota
, bucket_quota
, s
->content_length
);
2959 ldout(s
->cct
, 20) << "check_quota() returned ret=" << op_ret
<< dendl
;
2964 if (supplied_etag
) {
2965 strncpy(supplied_md5
, supplied_etag
, sizeof(supplied_md5
) - 1);
2966 supplied_md5
[sizeof(supplied_md5
) - 1] = '\0';
2969 processor
= select_processor(*static_cast<RGWObjectCtx
*>(s
->obj_ctx
), &multipart
);
2971 // no filters by default
2974 /* Handle object versioning of Swift API. */
2976 rgw_obj
obj(s
->bucket
, s
->object
);
2977 op_ret
= store
->swift_versioning_copy(*static_cast<RGWObjectCtx
*>(s
->obj_ctx
),
2978 s
->bucket_owner
.get_id(),
2986 op_ret
= processor
->prepare(store
, NULL
);
2988 ldout(s
->cct
, 20) << "processor->prepare() returned ret=" << op_ret
2993 fst
= copy_source_range_fst
;
2994 lst
= copy_source_range_lst
;
2996 op_ret
= get_encrypt_filter(&encrypt
, filter
);
3000 if (encrypt
!= nullptr) {
3001 filter
= encrypt
.get();
3003 //no encryption, we can try compression
3004 if (compression_type
!= "none") {
3005 plugin
= get_compressor_plugin(s
, compression_type
);
3007 ldout(s
->cct
, 1) << "Cannot load plugin for compression type "
3008 << compression_type
<< dendl
;
3010 compressor
.emplace(s
->cct
, plugin
, filter
);
3011 filter
= &*compressor
;
3021 len
= get_data(data_in
);
3023 uint64_t cur_lst
= min(fst
+ s
->cct
->_conf
->rgw_max_chunk_size
- 1, lst
);
3024 op_ret
= get_data(fst
, cur_lst
, data_in
);
3027 len
= data_in
.length();
3028 s
->content_length
+= len
;
3036 bufferlist
&data
= data_in
;
3037 if (len
&& s
->aws4_auth_streaming_mode
) {
3038 /* use unwrapped data */
3039 data
= s
->aws4_auth
->bl
;
3040 len
= data
.length();
3043 if (need_calc_md5
) {
3044 hash
.Update((const byte
*)data
.c_str(), data
.length());
3047 /* save data for producing torrent data */
3048 torrent
.save_data(data_in
);
3050 /* do we need this operation to be synchronous? if we're dealing with an object with immutable
3051 * head, e.g., multipart object we need to make sure we're the first one writing to this object
3053 bool need_to_wait
= (ofs
== 0) && multipart
;
3055 bufferlist orig_data
;
3061 op_ret
= put_data_and_throttle(filter
, data
, ofs
, need_to_wait
);
3063 if (!need_to_wait
|| op_ret
!= -EEXIST
) {
3064 ldout(s
->cct
, 20) << "processor->thottle_data() returned ret="
3068 /* need_to_wait == true and op_ret == -EEXIST */
3069 ldout(s
->cct
, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl
;
3071 /* restore original data */
3072 data
.swap(orig_data
);
3074 /* restart processing with different oid suffix */
3076 dispose_processor(processor
);
3077 processor
= select_processor(*static_cast<RGWObjectCtx
*>(s
->obj_ctx
), &multipart
);
3082 gen_rand_alphanumeric(store
->ctx(), buf
, sizeof(buf
) - 1);
3083 oid_rand
.append(buf
);
3085 op_ret
= processor
->prepare(store
, &oid_rand
);
3087 ldout(s
->cct
, 0) << "ERROR: processor->prepare() returned "
3092 op_ret
= get_encrypt_filter(&encrypt
, filter
);
3096 if (encrypt
!= nullptr) {
3097 filter
= encrypt
.get();
3100 compressor
.emplace(s
->cct
, plugin
, filter
);
3101 filter
= &*compressor
;
3104 op_ret
= put_data_and_throttle(filter
, data
, ofs
, false);
3115 op_ret
= put_data_and_throttle(filter
, flush
, ofs
, false);
3121 if (!chunked_upload
&&
3122 ofs
!= s
->content_length
&&
3123 !s
->aws4_auth_streaming_mode
) {
3124 op_ret
= -ERR_REQUEST_TIMEOUT
;
3129 perfcounter
->inc(l_rgw_put_b
, s
->obj_size
);
3131 if (s
->aws4_auth_needs_complete
) {
3133 /* complete aws4 auth */
3135 op_ret
= RGW_Auth_S3::authorize_aws4_auth_complete(store
, s
);
3140 s
->aws4_auth_needs_complete
= false;
3142 /* verify signature */
3144 if (s
->aws4_auth
->signature
!= s
->aws4_auth
->new_signature
) {
3145 op_ret
= -ERR_SIGNATURE_NO_MATCH
;
3146 ldout(s
->cct
, 20) << "delayed aws4 auth failed" << dendl
;
3150 /* authorization ok */
3152 dout(10) << "v4 auth ok" << dendl
;
3155 op_ret
= store
->check_quota(s
->bucket_owner
.get_id(), s
->bucket
,
3156 user_quota
, bucket_quota
, s
->obj_size
);
3158 ldout(s
->cct
, 20) << "second check_quota() returned op_ret=" << op_ret
<< dendl
;
3164 if (compressor
&& compressor
->is_compressed()) {
3166 RGWCompressionInfo cs_info
;
3167 cs_info
.compression_type
= plugin
->get_type_name();
3168 cs_info
.orig_size
= s
->obj_size
;
3169 cs_info
.blocks
= move(compressor
->get_compression_blocks());
3170 ::encode(cs_info
, tmp
);
3171 attrs
[RGW_ATTR_COMPRESSION
] = tmp
;
3172 ldout(s
->cct
, 20) << "storing " << RGW_ATTR_COMPRESSION
3173 << " with type=" << cs_info
.compression_type
3174 << ", orig_size=" << cs_info
.orig_size
3175 << ", blocks=" << cs_info
.blocks
.size() << dendl
;
3178 buf_to_hex(m
, CEPH_CRYPTO_MD5_DIGESTSIZE
, calc_md5
);
3182 if (supplied_md5_b64
&& strcmp(calc_md5
, supplied_md5
)) {
3183 op_ret
= -ERR_BAD_DIGEST
;
3187 policy
.encode(aclbl
);
3188 emplace_attr(RGW_ATTR_ACL
, std::move(aclbl
));
3191 op_ret
= encode_dlo_manifest_attr(dlo_manifest
, attrs
);
3193 ldout(s
->cct
, 0) << "bad user manifest: " << dlo_manifest
<< dendl
;
3196 complete_etag(hash
, &etag
);
3197 ldout(s
->cct
, 10) << __func__
<< ": calculated md5 for user manifest: " << etag
<< dendl
;
3201 bufferlist manifest_bl
;
3202 ::encode(*slo_info
, manifest_bl
);
3203 emplace_attr(RGW_ATTR_SLO_MANIFEST
, std::move(manifest_bl
));
3205 hash
.Update((byte
*)slo_info
->raw_data
, slo_info
->raw_data_len
);
3206 complete_etag(hash
, &etag
);
3207 ldout(s
->cct
, 10) << __func__
<< ": calculated md5 for user manifest: " << etag
<< dendl
;
3210 if (supplied_etag
&& etag
.compare(supplied_etag
) != 0) {
3211 op_ret
= -ERR_UNPROCESSABLE_ENTITY
;
3214 bl
.append(etag
.c_str(), etag
.size() + 1);
3215 emplace_attr(RGW_ATTR_ETAG
, std::move(bl
));
3217 populate_with_generic_attrs(s
, attrs
);
3218 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
);
3219 encode_delete_at_attr(delete_at
, attrs
);
3221 /* Add a custom metadata to expose the information whether an object
3222 * is an SLO or not. Appending the attribute must be performed AFTER
3223 * processing any input from user in order to prohibit overwriting. */
3225 bufferlist slo_userindicator_bl
;
3226 ::encode("True", slo_userindicator_bl
);
3227 emplace_attr(RGW_ATTR_SLO_UINDICATOR
, std::move(slo_userindicator_bl
));
3230 op_ret
= processor
->complete(s
->obj_size
, etag
, &mtime
, real_time(), attrs
,
3231 (delete_at
? *delete_at
: real_time()), if_match
, if_nomatch
,
3232 (user_data
.empty() ? nullptr : &user_data
));
3234 /* produce torrent */
3235 if (s
->cct
->_conf
->rgw_torrent_flag
&& (ofs
== torrent
.get_data_len()))
3237 torrent
.init(s
, store
);
3238 torrent
.set_create_date(mtime
);
3239 op_ret
= torrent
.handle_data();
3242 ldout(s
->cct
, 0) << "ERROR: torrent.handle_data() returned " << op_ret
<< dendl
;
3248 dispose_processor(processor
);
3249 perfcounter
->tinc(l_rgw_put_lat
,
3250 (ceph_clock_now() - s
->time
));
3253 int RGWPostObj::verify_permission()
3258 RGWPutObjProcessor *RGWPostObj::select_processor(RGWObjectCtx& obj_ctx)
3260 RGWPutObjProcessor *processor;
3262 uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
3264 processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_info, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
3269 void RGWPostObj::dispose_processor(RGWPutObjDataProcessor *processor)
3274 void RGWPostObj::pre_exec()
3276 rgw_bucket_object_pre_exec(s
);
3279 void RGWPostObj::execute()
3281 RGWPutObjDataProcessor
*filter
= nullptr;
3282 boost::optional
<RGWPutObj_Compress
> compressor
;
3283 CompressorRef plugin
;
3285 /* Read in the data from the POST form. */
3286 op_ret
= get_params();
3291 op_ret
= verify_params();
3296 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
)) {
3301 /* Start iteration over data fields. It's necessary as Swift's FormPost
3302 * is capable to handle multiple files in single form. */
3304 std::unique_ptr
<RGWPutObjDataProcessor
> encrypt
;
3305 char calc_md5
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 1];
3306 unsigned char m
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
3308 ceph::buffer::list bl
, aclbl
;
3311 op_ret
= store
->check_quota(s
->bucket_owner
.get_id(),
3320 RGWPutObjProcessor_Atomic
processor(*static_cast<RGWObjectCtx
*>(s
->obj_ctx
),
3323 get_current_filename(),
3325 s
->cct
->_conf
->rgw_obj_stripe_size
,
3327 s
->bucket_info
.versioning_enabled());
3328 /* No filters by default. */
3329 filter
= &processor
;
3331 op_ret
= processor
.prepare(store
, nullptr);
3336 op_ret
= get_encrypt_filter(&encrypt
, filter
);
3340 if (encrypt
!= nullptr) {
3341 filter
= encrypt
.get();
3343 const auto& compression_type
= store
->get_zone_params().get_compression_type(
3344 s
->bucket_info
.placement_rule
);
3345 if (compression_type
!= "none") {
3346 plugin
= Compressor::create(s
->cct
, compression_type
);
3348 ldout(s
->cct
, 1) << "Cannot load plugin for compression type "
3349 << compression_type
<< dendl
;
3351 compressor
.emplace(s
->cct
, plugin
, filter
);
3352 filter
= &*compressor
;
3359 ceph::bufferlist data
;
3360 len
= get_data(data
, again
);
3371 hash
.Update((const byte
*)data
.c_str(), data
.length());
3372 op_ret
= put_data_and_throttle(filter
, data
, ofs
, false);
3376 if (ofs
> max_len
) {
3377 op_ret
= -ERR_TOO_LARGE
;
3384 op_ret
= put_data_and_throttle(filter
, flush
, ofs
, false);
3387 if (len
< min_len
) {
3388 op_ret
= -ERR_TOO_SMALL
;
3394 op_ret
= store
->check_quota(s
->bucket_owner
.get_id(), s
->bucket
,
3395 user_quota
, bucket_quota
, s
->obj_size
);
3401 buf_to_hex(m
, CEPH_CRYPTO_MD5_DIGESTSIZE
, calc_md5
);
3404 bl
.append(etag
.c_str(), etag
.size() + 1);
3405 emplace_attr(RGW_ATTR_ETAG
, std::move(bl
));
3407 policy
.encode(aclbl
);
3408 emplace_attr(RGW_ATTR_ACL
, std::move(aclbl
));
3410 const std::string content_type
= get_current_content_type();
3411 if (! content_type
.empty()) {
3412 ceph::bufferlist ct_bl
;
3413 ct_bl
.append(content_type
.c_str(), content_type
.size() + 1);
3414 emplace_attr(RGW_ATTR_CONTENT_TYPE
, std::move(ct_bl
));
3417 if (compressor
&& compressor
->is_compressed()) {
3418 ceph::bufferlist tmp
;
3419 RGWCompressionInfo cs_info
;
3420 cs_info
.compression_type
= plugin
->get_type_name();
3421 cs_info
.orig_size
= s
->obj_size
;
3422 cs_info
.blocks
= move(compressor
->get_compression_blocks());
3423 ::encode(cs_info
, tmp
);
3424 emplace_attr(RGW_ATTR_COMPRESSION
, std::move(tmp
));
3427 op_ret
= processor
.complete(s
->obj_size
, etag
, nullptr, real_time(),
3428 attrs
, (delete_at
? *delete_at
: real_time()));
3429 } while (is_next_file_to_upload());
3433 void RGWPutMetadataAccount::filter_out_temp_url(map
<string
, bufferlist
>& add_attrs
,
3434 const set
<string
>& rmattr_names
,
3435 map
<int, string
>& temp_url_keys
)
3437 map
<string
, bufferlist
>::iterator iter
;
3439 iter
= add_attrs
.find(RGW_ATTR_TEMPURL_KEY1
);
3440 if (iter
!= add_attrs
.end()) {
3441 temp_url_keys
[0] = iter
->second
.c_str();
3442 add_attrs
.erase(iter
);
3445 iter
= add_attrs
.find(RGW_ATTR_TEMPURL_KEY2
);
3446 if (iter
!= add_attrs
.end()) {
3447 temp_url_keys
[1] = iter
->second
.c_str();
3448 add_attrs
.erase(iter
);
3451 for (const string
& name
: rmattr_names
) {
3452 if (name
.compare(RGW_ATTR_TEMPURL_KEY1
) == 0) {
3453 temp_url_keys
[0] = string();
3455 if (name
.compare(RGW_ATTR_TEMPURL_KEY2
) == 0) {
3456 temp_url_keys
[1] = string();
3461 int RGWPutMetadataAccount::init_processing()
3463 /* First, go to the base class. At the time of writing the method was
3464 * responsible only for initializing the quota. This isn't necessary
3465 * here as we are touching metadata only. I'm putting this call only
3466 * for the future. */
3467 op_ret
= RGWOp::init_processing();
3472 op_ret
= get_params();
3477 op_ret
= rgw_get_user_attrs_by_uid(store
, s
->user
->user_id
, orig_attrs
,
3485 policy
.encode(acl_bl
);
3486 attrs
.emplace(RGW_ATTR_ACL
, std::move(acl_bl
));
3489 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
, false);
3490 prepare_add_del_attrs(orig_attrs
, rmattr_names
, attrs
);
3491 populate_with_generic_attrs(s
, attrs
);
3493 /* Try extract the TempURL-related stuff now to allow verify_permission
3494 * evaluate whether we need FULL_CONTROL or not. */
3495 filter_out_temp_url(attrs
, rmattr_names
, temp_url_keys
);
3497 /* The same with quota except a client needs to be reseller admin. */
3498 op_ret
= filter_out_quota_info(attrs
, rmattr_names
, new_quota
,
3499 &new_quota_extracted
);
3507 int RGWPutMetadataAccount::verify_permission()
3509 if (s
->auth
.identity
->is_anonymous()) {
3513 if (!verify_user_permission(s
, RGW_PERM_WRITE
)) {
3517 /* Altering TempURL keys requires FULL_CONTROL. */
3518 if (!temp_url_keys
.empty() && s
->perm_mask
!= RGW_PERM_FULL_CONTROL
) {
3522 /* We are failing this intensionally to allow system user/reseller admin
3523 * override in rgw_process.cc. This is the way to specify a given RGWOp
3524 * expect extra privileges. */
3525 if (new_quota_extracted
) {
3532 void RGWPutMetadataAccount::execute()
3534 /* Params have been extracted earlier. See init_processing(). */
3535 RGWUserInfo new_uinfo
;
3536 op_ret
= rgw_get_user_info_by_uid(store
, s
->user
->user_id
, new_uinfo
,
3542 /* Handle the TempURL-related stuff. */
3543 if (!temp_url_keys
.empty()) {
3544 for (auto& pair
: temp_url_keys
) {
3545 new_uinfo
.temp_url_keys
[pair
.first
] = std::move(pair
.second
);
3549 /* Handle the quota extracted at the verify_permission step. */
3550 if (new_quota_extracted
) {
3551 new_uinfo
.user_quota
= std::move(new_quota
);
3554 /* We are passing here the current (old) user info to allow the function
3555 * optimize-out some operations. */
3556 op_ret
= rgw_store_user_info(store
, new_uinfo
, s
->user
,
3557 &acct_op_tracker
, real_time(), false, &attrs
);
3560 int RGWPutMetadataBucket::verify_permission()
3562 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
)) {
3569 void RGWPutMetadataBucket::pre_exec()
3571 rgw_bucket_object_pre_exec(s
);
3574 void RGWPutMetadataBucket::execute()
3576 op_ret
= get_params();
3581 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
, false);
3583 if (!placement_rule
.empty() &&
3584 placement_rule
!= s
->bucket_info
.placement_rule
) {
3589 /* Encode special metadata first as we're using std::map::emplace under
3590 * the hood. This method will add the new items only if the map doesn't
3591 * contain such keys yet. */
3593 if (s
->dialect
.compare("swift") == 0) {
3594 auto old_policy
= static_cast<RGWAccessControlPolicy_SWIFT
*>(s
->bucket_acl
);
3595 auto new_policy
= static_cast<RGWAccessControlPolicy_SWIFT
*>(&policy
);
3596 new_policy
->filter_merge(policy_rw_mask
, old_policy
);
3597 policy
= *new_policy
;
3601 emplace_attr(RGW_ATTR_ACL
, std::move(bl
));
3606 cors_config
.encode(bl
);
3607 emplace_attr(RGW_ATTR_CORS
, std::move(bl
));
3610 /* It's supposed that following functions WILL NOT change any special
3611 * attributes (like RGW_ATTR_ACL) if they are already present in attrs. */
3612 prepare_add_del_attrs(s
->bucket_attrs
, rmattr_names
, attrs
);
3613 populate_with_generic_attrs(s
, attrs
);
3615 /* According to the Swift's behaviour and its container_quota WSGI middleware
3616 * implementation: anyone with write permissions is able to set the bucket
3617 * quota. This stays in contrast to account quotas that can be set only by
3618 * clients holding reseller admin privileges. */
3619 op_ret
= filter_out_quota_info(attrs
, rmattr_names
, s
->bucket_info
.quota
);
3624 if (swift_ver_location
) {
3625 s
->bucket_info
.swift_ver_location
= *swift_ver_location
;
3626 s
->bucket_info
.swift_versioning
= (! swift_ver_location
->empty());
3629 /* Web site of Swift API. */
3630 filter_out_website(attrs
, rmattr_names
, s
->bucket_info
.website_conf
);
3631 s
->bucket_info
.has_website
= !s
->bucket_info
.website_conf
.is_empty();
3633 /* Setting attributes also stores the provided bucket info. Due to this
3634 * fact, the new quota settings can be serialized with the same call. */
3635 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
,
3636 &s
->bucket_info
.objv_tracker
);
3639 int RGWPutMetadataObject::verify_permission()
3641 if (!verify_object_permission(s
, RGW_PERM_WRITE
)) {
3648 void RGWPutMetadataObject::pre_exec()
3650 rgw_bucket_object_pre_exec(s
);
3653 void RGWPutMetadataObject::execute()
3655 rgw_obj
obj(s
->bucket
, s
->object
);
3656 map
<string
, bufferlist
> attrs
, orig_attrs
, rmattrs
;
3658 store
->set_atomic(s
->obj_ctx
, obj
);
3660 op_ret
= get_params();
3665 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
);
3666 /* check if obj exists, read orig attrs */
3667 op_ret
= get_obj_attrs(store
, s
, obj
, orig_attrs
);
3672 /* Check whether the object has expired. Swift API documentation
3673 * stands that we should return 404 Not Found in such case. */
3674 if (need_object_expiration() && object_is_expired(orig_attrs
)) {
3679 /* Filter currently existing attributes. */
3680 prepare_add_del_attrs(orig_attrs
, attrs
, rmattrs
);
3681 populate_with_generic_attrs(s
, attrs
);
3682 encode_delete_at_attr(delete_at
, attrs
);
3685 op_ret
= encode_dlo_manifest_attr(dlo_manifest
, attrs
);
3687 ldout(s
->cct
, 0) << "bad user manifest: " << dlo_manifest
<< dendl
;
3692 op_ret
= store
->set_attrs(s
->obj_ctx
, s
->bucket_info
, obj
, attrs
, &rmattrs
);
3695 int RGWDeleteObj::handle_slo_manifest(bufferlist
& bl
)
3697 RGWSLOInfo slo_info
;
3698 bufferlist::iterator bliter
= bl
.begin();
3700 ::decode(slo_info
, bliter
);
3701 } catch (buffer::error
& err
) {
3702 ldout(s
->cct
, 0) << "ERROR: failed to decode slo manifest" << dendl
;
3707 deleter
= std::unique_ptr
<RGWBulkDelete::Deleter
>(\
3708 new RGWBulkDelete::Deleter(store
, s
));
3709 } catch (std::bad_alloc
) {
3713 list
<RGWBulkDelete::acct_path_t
> items
;
3714 for (const auto& iter
: slo_info
.entries
) {
3715 const string
& path_str
= iter
.path
;
3717 const size_t sep_pos
= path_str
.find('/', 1 /* skip first slash */);
3718 if (string::npos
== sep_pos
) {
3722 RGWBulkDelete::acct_path_t path
;
3725 url_decode(path_str
.substr(1, sep_pos
- 1), bucket_name
);
3728 url_decode(path_str
.substr(sep_pos
+ 1), obj_name
);
3730 path
.bucket_name
= bucket_name
;
3731 path
.obj_key
= obj_name
;
3733 items
.push_back(path
);
3736 /* Request removal of the manifest object itself. */
3737 RGWBulkDelete::acct_path_t path
;
3738 path
.bucket_name
= s
->bucket_name
;
3739 path
.obj_key
= s
->object
;
3740 items
.push_back(path
);
3742 int ret
= deleter
->delete_chunk(items
);
3750 int RGWDeleteObj::verify_permission()
3752 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
)) {
3759 void RGWDeleteObj::pre_exec()
3761 rgw_bucket_object_pre_exec(s
);
3764 void RGWDeleteObj::execute()
3766 if (!s
->bucket_exists
) {
3767 op_ret
= -ERR_NO_SUCH_BUCKET
;
3771 op_ret
= get_params();
3776 rgw_obj
obj(s
->bucket
, s
->object
);
3777 map
<string
, bufferlist
> attrs
;
3780 if (!s
->object
.empty()) {
3781 if (need_object_expiration() || multipart_delete
) {
3782 /* check if obj exists, read orig attrs */
3783 op_ret
= get_obj_attrs(store
, s
, obj
, attrs
);
3789 if (multipart_delete
) {
3790 const auto slo_attr
= attrs
.find(RGW_ATTR_SLO_MANIFEST
);
3792 if (slo_attr
!= attrs
.end()) {
3793 op_ret
= handle_slo_manifest(slo_attr
->second
);
3795 ldout(s
->cct
, 0) << "ERROR: failed to handle slo manifest ret=" << op_ret
<< dendl
;
3798 op_ret
= -ERR_NOT_SLO_MANIFEST
;
3804 RGWObjectCtx
*obj_ctx
= static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
3805 obj_ctx
->obj
.set_atomic(obj
);
3807 bool ver_restored
= false;
3808 op_ret
= store
->swift_versioning_restore(*obj_ctx
, s
->bucket_owner
.get_id(),
3809 s
->bucket_info
, obj
, ver_restored
);
3814 if (!ver_restored
) {
3815 /* Swift's versioning mechanism hasn't found any previous version of
3816 * the object that could be restored. This means we should proceed
3817 * with the regular delete path. */
3818 RGWRados::Object
del_target(store
, s
->bucket_info
, *obj_ctx
, obj
);
3819 RGWRados::Object::Delete
del_op(&del_target
);
3821 op_ret
= get_system_versioning_params(s
, &del_op
.params
.olh_epoch
,
3822 &del_op
.params
.marker_version_id
);
3827 del_op
.params
.bucket_owner
= s
->bucket_owner
.get_id();
3828 del_op
.params
.versioning_status
= s
->bucket_info
.versioning_status();
3829 del_op
.params
.obj_owner
= s
->owner
;
3830 del_op
.params
.unmod_since
= unmod_since
;
3831 del_op
.params
.high_precision_time
= s
->system_request
; /* system request uses high precision time */
3833 op_ret
= del_op
.delete_obj();
3835 delete_marker
= del_op
.result
.delete_marker
;
3836 version_id
= del_op
.result
.version_id
;
3839 /* Check whether the object has expired. Swift API documentation
3840 * stands that we should return 404 Not Found in such case. */
3841 if (need_object_expiration() && object_is_expired(attrs
)) {
3847 if (op_ret
== -ERR_PRECONDITION_FAILED
&& no_precondition_error
) {
3856 bool RGWCopyObj::parse_copy_location(const string
& url_src
, string
& bucket_name
, rgw_obj_key
& key
)
3861 size_t pos
= url_src
.find('?');
3862 if (pos
== string::npos
) {
3865 name_str
= url_src
.substr(0, pos
);
3866 params_str
= url_src
.substr(pos
+ 1);
3871 url_decode(name_str
, dec_src
);
3872 const char *src
= dec_src
.c_str();
3874 if (*src
== '/') ++src
;
3878 pos
= str
.find('/');
3879 if (pos
==string::npos
)
3882 bucket_name
= str
.substr(0, pos
);
3883 key
.name
= str
.substr(pos
+ 1);
3885 if (key
.name
.empty()) {
3889 if (!params_str
.empty()) {
3891 args
.set(params_str
);
3894 key
.instance
= args
.get("versionId", NULL
);
3900 int RGWCopyObj::verify_permission()
3902 RGWAccessControlPolicy
src_policy(s
->cct
);
3903 op_ret
= get_params();
3907 op_ret
= get_system_versioning_params(s
, &olh_epoch
, &version_id
);
3911 map
<string
, bufferlist
> src_attrs
;
3913 RGWObjectCtx
& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
3915 if (s
->bucket_instance_id
.empty()) {
3916 op_ret
= store
->get_bucket_info(obj_ctx
, src_tenant_name
, src_bucket_name
, src_bucket_info
, NULL
, &src_attrs
);
3918 /* will only happen in intra region sync where the source and dest bucket is the same */
3919 op_ret
= store
->get_bucket_instance_info(obj_ctx
, s
->bucket_instance_id
, src_bucket_info
, NULL
, &src_attrs
);
3922 if (op_ret
== -ENOENT
) {
3923 op_ret
= -ERR_NO_SUCH_BUCKET
;
3928 src_bucket
= src_bucket_info
.bucket
;
3930 /* get buckets info (source and dest) */
3931 if (s
->local_source
&& source_zone
.empty()) {
3932 rgw_obj
src_obj(src_bucket
, src_object
);
3933 store
->set_atomic(s
->obj_ctx
, src_obj
);
3934 store
->set_prefetch_data(s
->obj_ctx
, src_obj
);
3936 /* check source object permissions */
3937 op_ret
= read_obj_policy(store
, s
, src_bucket_info
, src_attrs
, &src_policy
,
3938 src_bucket
, src_object
);
3943 /* admin request overrides permission checks */
3944 if (! s
->auth
.identity
->is_admin_of(src_policy
.get_owner().get_id()) &&
3945 ! src_policy
.verify_permission(*s
->auth
.identity
, s
->perm_mask
,
3951 RGWAccessControlPolicy
dest_bucket_policy(s
->cct
);
3952 map
<string
, bufferlist
> dest_attrs
;
3954 if (src_bucket_name
.compare(dest_bucket_name
) == 0) { /* will only happen if s->local_source
3955 or intra region sync */
3956 dest_bucket_info
= src_bucket_info
;
3957 dest_attrs
= src_attrs
;
3959 op_ret
= store
->get_bucket_info(obj_ctx
, dest_tenant_name
, dest_bucket_name
,
3960 dest_bucket_info
, nullptr, &dest_attrs
);
3962 if (op_ret
== -ENOENT
) {
3963 op_ret
= -ERR_NO_SUCH_BUCKET
;
3969 dest_bucket
= dest_bucket_info
.bucket
;
3971 rgw_obj
dest_obj(dest_bucket
, dest_object
);
3972 store
->set_atomic(s
->obj_ctx
, dest_obj
);
3974 /* check dest bucket permissions */
3975 op_ret
= read_bucket_policy(store
, s
, dest_bucket_info
, dest_attrs
,
3976 &dest_bucket_policy
, dest_bucket
);
3981 /* admin request overrides permission checks */
3982 if (! s
->auth
.identity
->is_admin_of(dest_policy
.get_owner().get_id()) &&
3983 ! dest_bucket_policy
.verify_permission(*s
->auth
.identity
, s
->perm_mask
,
3988 op_ret
= init_dest_policy();
3997 int RGWCopyObj::init_common()
4000 if (parse_time(if_mod
, &mod_time
) < 0) {
4004 mod_ptr
= &mod_time
;
4008 if (parse_time(if_unmod
, &unmod_time
) < 0) {
4012 unmod_ptr
= &unmod_time
;
4016 dest_policy
.encode(aclbl
);
4017 emplace_attr(RGW_ATTR_ACL
, std::move(aclbl
));
4019 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
);
4020 populate_with_generic_attrs(s
, attrs
);
4025 static void copy_obj_progress_cb(off_t ofs
, void *param
)
4027 RGWCopyObj
*op
= static_cast<RGWCopyObj
*>(param
);
4028 op
->progress_cb(ofs
);
4031 void RGWCopyObj::progress_cb(off_t ofs
)
4033 if (!s
->cct
->_conf
->rgw_copy_obj_progress
)
4036 if (ofs
- last_ofs
< s
->cct
->_conf
->rgw_copy_obj_progress_every_bytes
)
4039 send_partial_response(ofs
);
4044 void RGWCopyObj::pre_exec()
4046 rgw_bucket_object_pre_exec(s
);
4049 void RGWCopyObj::execute()
4051 if (init_common() < 0)
4054 rgw_obj
src_obj(src_bucket
, src_object
);
4055 rgw_obj
dst_obj(dest_bucket
, dest_object
);
4057 RGWObjectCtx
& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
4058 obj_ctx
.obj
.set_atomic(src_obj
);
4059 obj_ctx
.obj
.set_atomic(dst_obj
);
4061 encode_delete_at_attr(delete_at
, attrs
);
4063 bool high_precision_time
= (s
->system_request
);
4065 /* Handle object versioning of Swift API. In case of copying to remote this
4066 * should fail gently (op_ret == 0) as the dst_obj will not exist here. */
4067 op_ret
= store
->swift_versioning_copy(obj_ctx
,
4068 dest_bucket_info
.owner
,
4075 op_ret
= store
->copy_obj(obj_ctx
,
4089 high_precision_time
,
4094 attrs
, RGW_OBJ_CATEGORY_MAIN
,
4096 (delete_at
? *delete_at
: real_time()),
4097 (version_id
.empty() ? NULL
: &version_id
),
4098 &s
->req_id
, /* use req_id as tag */
4101 copy_obj_progress_cb
, (void *)this
4105 int RGWGetACLs::verify_permission()
4108 if (!s
->object
.empty()) {
4109 perm
= verify_object_permission(s
, RGW_PERM_READ_ACP
);
4111 perm
= verify_bucket_permission(s
, RGW_PERM_READ_ACP
);
4119 void RGWGetACLs::pre_exec()
4121 rgw_bucket_object_pre_exec(s
);
4124 void RGWGetACLs::execute()
4127 RGWAccessControlPolicy
*acl
= (!s
->object
.empty() ? s
->object_acl
: s
->bucket_acl
);
4128 RGWAccessControlPolicy_S3
*s3policy
= static_cast<RGWAccessControlPolicy_S3
*>(acl
);
4129 s3policy
->to_xml(ss
);
4135 int RGWPutACLs::verify_permission()
4138 if (!s
->object
.empty()) {
4139 perm
= verify_object_permission(s
, RGW_PERM_WRITE_ACP
);
4141 perm
= verify_bucket_permission(s
, RGW_PERM_WRITE_ACP
);
4149 int RGWGetLC::verify_permission()
4152 perm
= verify_bucket_permission(s
, RGW_PERM_READ_ACP
);
4159 int RGWPutLC::verify_permission()
4162 perm
= verify_bucket_permission(s
, RGW_PERM_WRITE_ACP
);
4169 int RGWDeleteLC::verify_permission()
4172 perm
= verify_bucket_permission(s
, RGW_PERM_WRITE_ACP
);
4179 void RGWPutACLs::pre_exec()
4181 rgw_bucket_object_pre_exec(s
);
4184 void RGWGetLC::pre_exec()
4186 rgw_bucket_object_pre_exec(s
);
4189 void RGWPutLC::pre_exec()
4191 rgw_bucket_object_pre_exec(s
);
4194 void RGWDeleteLC::pre_exec()
4196 rgw_bucket_object_pre_exec(s
);
4199 void RGWPutACLs::execute()
4203 RGWAccessControlPolicy_S3
*policy
= NULL
;
4204 RGWACLXMLParser_S3
parser(s
->cct
);
4205 RGWAccessControlPolicy_S3
new_policy(s
->cct
);
4207 char *new_data
= NULL
;
4210 op_ret
= 0; /* XXX redundant? */
4212 if (!parser
.init()) {
4218 RGWAccessControlPolicy
*existing_policy
= (s
->object
.empty() ? s
->bucket_acl
: s
->object_acl
);
4220 owner
= existing_policy
->get_owner();
4222 op_ret
= get_params();
4226 ldout(s
->cct
, 15) << "read len=" << len
<< " data=" << (data
? data
: "") << dendl
;
4228 if (!s
->canned_acl
.empty() && len
) {
4233 if (!s
->canned_acl
.empty() || s
->has_acl_header
) {
4234 op_ret
= get_policy_from_state(store
, s
, ss
);
4238 new_data
= strdup(ss
.str().c_str());
4241 len
= ss
.str().size();
4244 if (!parser
.parse(data
, len
, 1)) {
4248 policy
= static_cast<RGWAccessControlPolicy_S3
*>(parser
.find_first("AccessControlPolicy"));
4254 // forward bucket acl requests to meta master zone
4255 if (s
->object
.empty() && !store
->is_meta_master()) {
4257 // include acl data unless it was generated from a canned_acl
4258 if (s
->canned_acl
.empty()) {
4259 in_data
.append(data
, len
);
4261 op_ret
= forward_request_to_master(s
, NULL
, store
, in_data
, NULL
);
4263 ldout(s
->cct
, 20) << __func__
<< "forward_request_to_master returned ret=" << op_ret
<< dendl
;
4268 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
4269 ldout(s
->cct
, 15) << "Old AccessControlPolicy";
4270 policy
->to_xml(*_dout
);
4274 op_ret
= policy
->rebuild(store
, &owner
, new_policy
);
4278 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
4279 ldout(s
->cct
, 15) << "New AccessControlPolicy:";
4280 new_policy
.to_xml(*_dout
);
4284 new_policy
.encode(bl
);
4285 map
<string
, bufferlist
> attrs
;
4287 if (!s
->object
.empty()) {
4288 obj
= rgw_obj(s
->bucket
, s
->object
);
4289 store
->set_atomic(s
->obj_ctx
, obj
);
4290 //if instance is empty, we should modify the latest object
4291 op_ret
= modify_obj_attr(store
, s
, obj
, RGW_ATTR_ACL
, bl
);
4293 attrs
= s
->bucket_attrs
;
4294 attrs
[RGW_ATTR_ACL
] = bl
;
4295 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
, &s
->bucket_info
.objv_tracker
);
4297 if (op_ret
== -ECANCELED
) {
4298 op_ret
= 0; /* lost a race, but it's ok because acls are immutable */
4302 static void get_lc_oid(struct req_state
*s
, string
& oid
)
4304 string shard_id
= s
->bucket
.name
+ ':' +s
->bucket
.bucket_id
;
4305 int max_objs
= (s
->cct
->_conf
->rgw_lc_max_objs
> HASH_PRIME
)?HASH_PRIME
:s
->cct
->_conf
->rgw_lc_max_objs
;
4306 int index
= ceph_str_hash_linux(shard_id
.c_str(), shard_id
.size()) % HASH_PRIME
% max_objs
;
4307 oid
= lc_oid_prefix
;
4309 snprintf(buf
, 32, ".%d", index
);
4314 void RGWPutLC::execute()
4318 RGWLifecycleConfiguration_S3
*config
= NULL
;
4319 RGWLCXMLParser_S3
parser(s
->cct
);
4320 RGWLifecycleConfiguration_S3
new_config(s
->cct
);
4322 if (!parser
.init()) {
4327 op_ret
= get_params();
4331 ldout(s
->cct
, 15) << "read len=" << len
<< " data=" << (data
? data
: "") << dendl
;
4333 if (!parser
.parse(data
, len
, 1)) {
4334 op_ret
= -ERR_MALFORMED_XML
;
4337 config
= static_cast<RGWLifecycleConfiguration_S3
*>(parser
.find_first("LifecycleConfiguration"));
4339 op_ret
= -ERR_MALFORMED_XML
;
4343 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
4344 ldout(s
->cct
, 15) << "Old LifecycleConfiguration:";
4345 config
->to_xml(*_dout
);
4349 op_ret
= config
->rebuild(store
, new_config
);
4353 if (s
->cct
->_conf
->subsys
.should_gather(ceph_subsys_rgw
, 15)) {
4354 ldout(s
->cct
, 15) << "New LifecycleConfiguration:";
4355 new_config
.to_xml(*_dout
);
4359 new_config
.encode(bl
);
4360 map
<string
, bufferlist
> attrs
;
4361 attrs
= s
->bucket_attrs
;
4362 attrs
[RGW_ATTR_LC
] = bl
;
4363 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
, &s
->bucket_info
.objv_tracker
);
4366 string shard_id
= s
->bucket
.tenant
+ ':' + s
->bucket
.name
+ ':' + s
->bucket
.bucket_id
;
4369 pair
<string
, int> entry(shard_id
, lc_uninitial
);
4370 int max_lock_secs
= s
->cct
->_conf
->rgw_lc_lock_max_time
;
4371 rados::cls::lock::Lock
l(lc_index_lock_name
);
4372 utime_t
time(max_lock_secs
, 0);
4373 l
.set_duration(time
);
4374 l
.set_cookie(cookie
);
4375 librados::IoCtx
*ctx
= store
->get_lc_pool_ctx();
4377 op_ret
= l
.lock_exclusive(ctx
, oid
);
4378 if (op_ret
== -EBUSY
) {
4379 dout(0) << "RGWLC::RGWPutLC() failed to acquire lock on, sleep 5, try again" << oid
<< dendl
;
4384 dout(0) << "RGWLC::RGWPutLC() failed to acquire lock " << oid
<< op_ret
<< dendl
;
4387 op_ret
= cls_rgw_lc_set_entry(*ctx
, oid
, entry
);
4389 dout(0) << "RGWLC::RGWPutLC() failed to set entry " << oid
<< op_ret
<< dendl
;
4397 void RGWDeleteLC::execute()
4400 map
<string
, bufferlist
> orig_attrs
, attrs
;
4401 map
<string
, bufferlist
>::iterator iter
;
4403 store
->get_bucket_instance_obj(s
->bucket
, obj
);
4404 store
->set_prefetch_data(s
->obj_ctx
, obj
);
4405 op_ret
= get_system_obj_attrs(store
, s
, obj
, orig_attrs
, NULL
, &s
->bucket_info
.objv_tracker
);
4409 for (iter
= orig_attrs
.begin(); iter
!= orig_attrs
.end(); ++iter
) {
4410 const string
& name
= iter
->first
;
4411 dout(10) << "DeleteLC : attr: " << name
<< dendl
;
4412 if (name
.compare(0, (sizeof(RGW_ATTR_LC
) - 1), RGW_ATTR_LC
) != 0) {
4413 if (attrs
.find(name
) == attrs
.end()) {
4414 attrs
[name
] = iter
->second
;
4418 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
, &s
->bucket_info
.objv_tracker
);
4419 string shard_id
= s
->bucket
.name
+ ':' +s
->bucket
.bucket_id
;
4420 pair
<string
, int> entry(shard_id
, lc_uninitial
);
4423 int max_lock_secs
= s
->cct
->_conf
->rgw_lc_lock_max_time
;
4424 librados::IoCtx
*ctx
= store
->get_lc_pool_ctx();
4425 rados::cls::lock::Lock
l(lc_index_lock_name
);
4426 utime_t
time(max_lock_secs
, 0);
4427 l
.set_duration(time
);
4429 op_ret
= l
.lock_exclusive(ctx
, oid
);
4430 if (op_ret
== -EBUSY
) {
4431 dout(0) << "RGWLC::RGWDeleteLC() failed to acquire lock on, sleep 5, try again" << oid
<< dendl
;
4436 dout(0) << "RGWLC::RGWDeleteLC() failed to acquire lock " << oid
<< op_ret
<< dendl
;
4439 op_ret
= cls_rgw_lc_rm_entry(*ctx
, oid
, entry
);
4441 dout(0) << "RGWLC::RGWDeleteLC() failed to set entry " << oid
<< op_ret
<< dendl
;
4449 int RGWGetCORS::verify_permission()
4451 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
4458 void RGWGetCORS::execute()
4460 op_ret
= read_bucket_cors();
4465 dout(2) << "No CORS configuration set yet for this bucket" << dendl
;
4471 int RGWPutCORS::verify_permission()
4473 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
4480 void RGWPutCORS::execute()
4484 op_ret
= get_params();
4488 map
<string
, bufferlist
> attrs
= s
->bucket_attrs
;
4489 attrs
[RGW_ATTR_CORS
] = cors_bl
;
4490 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
, &s
->bucket_info
.objv_tracker
);
4493 int RGWDeleteCORS::verify_permission()
4495 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
4502 void RGWDeleteCORS::execute()
4504 op_ret
= read_bucket_cors();
4511 dout(2) << "No CORS configuration set yet for this bucket" << dendl
;
4515 store
->get_bucket_instance_obj(s
->bucket
, obj
);
4516 store
->set_prefetch_data(s
->obj_ctx
, obj
);
4517 map
<string
, bufferlist
> orig_attrs
, attrs
, rmattrs
;
4518 map
<string
, bufferlist
>::iterator iter
;
4520 op_ret
= get_system_obj_attrs(store
, s
, obj
, orig_attrs
, NULL
, &s
->bucket_info
.objv_tracker
);
4524 /* only remove meta attrs */
4525 for (iter
= orig_attrs
.begin(); iter
!= orig_attrs
.end(); ++iter
) {
4526 const string
& name
= iter
->first
;
4527 dout(10) << "DeleteCORS : attr: " << name
<< dendl
;
4528 if (name
.compare(0, (sizeof(RGW_ATTR_CORS
) - 1), RGW_ATTR_CORS
) == 0) {
4529 rmattrs
[name
] = iter
->second
;
4530 } else if (attrs
.find(name
) == attrs
.end()) {
4531 attrs
[name
] = iter
->second
;
4534 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, attrs
, &s
->bucket_info
.objv_tracker
);
4537 void RGWOptionsCORS::get_response_params(string
& hdrs
, string
& exp_hdrs
, unsigned *max_age
) {
4538 get_cors_response_headers(rule
, req_hdrs
, hdrs
, exp_hdrs
, max_age
);
4541 int RGWOptionsCORS::validate_cors_request(RGWCORSConfiguration
*cc
) {
4542 rule
= cc
->host_name_rule(origin
);
4544 dout(10) << "There is no cors rule present for " << origin
<< dendl
;
4548 if (!validate_cors_rule_method(rule
, req_meth
)) {
4554 void RGWOptionsCORS::execute()
4556 op_ret
= read_bucket_cors();
4560 origin
= s
->info
.env
->get("HTTP_ORIGIN");
4563 "Preflight request without mandatory Origin header"
4568 req_meth
= s
->info
.env
->get("HTTP_ACCESS_CONTROL_REQUEST_METHOD");
4571 "Preflight request without mandatory Access-control-request-method header"
4577 dout(2) << "No CORS configuration set yet for this bucket" << dendl
;
4581 req_hdrs
= s
->info
.env
->get("HTTP_ACCESS_CONTROL_REQUEST_HEADERS");
4582 op_ret
= validate_cors_request(&bucket_cors
);
4584 origin
= req_meth
= NULL
;
4590 int RGWGetRequestPayment::verify_permission()
4595 void RGWGetRequestPayment::pre_exec()
4597 rgw_bucket_object_pre_exec(s
);
4600 void RGWGetRequestPayment::execute()
4602 requester_pays
= s
->bucket_info
.requester_pays
;
4605 int RGWSetRequestPayment::verify_permission()
4607 if (false == s
->auth
.identity
->is_owner_of(s
->bucket_owner
.get_id())) {
4614 void RGWSetRequestPayment::pre_exec()
4616 rgw_bucket_object_pre_exec(s
);
4619 void RGWSetRequestPayment::execute()
4621 op_ret
= get_params();
4626 s
->bucket_info
.requester_pays
= requester_pays
;
4627 op_ret
= store
->put_bucket_instance_info(s
->bucket_info
, false, real_time(),
4630 ldout(s
->cct
, 0) << "NOTICE: put_bucket_info on bucket=" << s
->bucket
.name
4631 << " returned err=" << op_ret
<< dendl
;
4636 int RGWInitMultipart::verify_permission()
4638 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
))
4644 void RGWInitMultipart::pre_exec()
4646 rgw_bucket_object_pre_exec(s
);
4649 void RGWInitMultipart::execute()
4652 map
<string
, bufferlist
> attrs
;
4655 if (get_params() < 0)
4658 if (s
->object
.empty())
4661 policy
.encode(aclbl
);
4662 attrs
[RGW_ATTR_ACL
] = aclbl
;
4664 populate_with_generic_attrs(s
, attrs
);
4666 /* select encryption mode */
4667 op_ret
= prepare_encryption(attrs
);
4671 rgw_get_request_metadata(s
->cct
, s
->info
, attrs
);
4675 gen_rand_alphanumeric(s
->cct
, buf
, sizeof(buf
) - 1);
4676 upload_id
= MULTIPART_UPLOAD_ID_PREFIX
; /* v2 upload id */
4677 upload_id
.append(buf
);
4679 string tmp_obj_name
;
4680 RGWMPObj
mp(s
->object
.name
, upload_id
);
4681 tmp_obj_name
= mp
.get_meta();
4683 obj
.init_ns(s
->bucket
, tmp_obj_name
, mp_ns
);
4684 // the meta object will be indexed with 0 size, we c
4685 obj
.set_in_extra_data(true);
4686 obj
.index_hash_source
= s
->object
.name
;
4688 RGWRados::Object
op_target(store
, s
->bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), obj
);
4689 op_target
.set_versioning_disabled(true); /* no versioning for multipart meta */
4691 RGWRados::Object::Write
obj_op(&op_target
);
4693 obj_op
.meta
.owner
= s
->owner
.get_id();
4694 obj_op
.meta
.category
= RGW_OBJ_CATEGORY_MULTIMETA
;
4695 obj_op
.meta
.flags
= PUT_OBJ_CREATE_EXCL
;
4697 op_ret
= obj_op
.write_meta(0, 0, attrs
);
4698 } while (op_ret
== -EEXIST
);
4701 static int get_multipart_info(RGWRados
*store
, struct req_state
*s
,
4703 RGWAccessControlPolicy
*policy
,
4704 map
<string
, bufferlist
>& attrs
)
4706 map
<string
, bufferlist
>::iterator iter
;
4710 obj
.init_ns(s
->bucket
, meta_oid
, mp_ns
);
4711 obj
.set_in_extra_data(true);
4713 int op_ret
= get_obj_attrs(store
, s
, obj
, attrs
);
4715 if (op_ret
== -ENOENT
) {
4716 return -ERR_NO_SUCH_UPLOAD
;
4722 for (iter
= attrs
.begin(); iter
!= attrs
.end(); ++iter
) {
4723 string name
= iter
->first
;
4724 if (name
.compare(RGW_ATTR_ACL
) == 0) {
4725 bufferlist
& bl
= iter
->second
;
4726 bufferlist::iterator bli
= bl
.begin();
4728 ::decode(*policy
, bli
);
4729 } catch (buffer::error
& err
) {
4730 ldout(s
->cct
, 0) << "ERROR: could not decode policy, caught buffer::error" << dendl
;
4741 int RGWCompleteMultipart::verify_permission()
4743 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
))
4749 void RGWCompleteMultipart::pre_exec()
4751 rgw_bucket_object_pre_exec(s
);
4754 void RGWCompleteMultipart::execute()
4756 RGWMultiCompleteUpload
*parts
;
4757 map
<int, string
>::iterator iter
;
4758 RGWMultiXMLParser parser
;
4760 map
<uint32_t, RGWUploadPartInfo
> obj_parts
;
4761 map
<uint32_t, RGWUploadPartInfo
>::iterator obj_iter
;
4762 map
<string
, bufferlist
> attrs
;
4765 char final_etag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
4766 char final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 16];
4771 RGWObjManifest manifest
;
4772 uint64_t olh_epoch
= 0;
4775 op_ret
= get_params();
4778 op_ret
= get_system_versioning_params(s
, &olh_epoch
, &version_id
);
4783 if (!data
|| !len
) {
4784 op_ret
= -ERR_MALFORMED_XML
;
4788 if (!parser
.init()) {
4793 if (!parser
.parse(data
, len
, 1)) {
4794 op_ret
= -ERR_MALFORMED_XML
;
4798 parts
= static_cast<RGWMultiCompleteUpload
*>(parser
.find_first("CompleteMultipartUpload"));
4799 if (!parts
|| parts
->parts
.empty()) {
4800 op_ret
= -ERR_MALFORMED_XML
;
4804 if ((int)parts
->parts
.size() >
4805 s
->cct
->_conf
->rgw_multipart_part_upload_limit
) {
4810 mp
.init(s
->object
.name
, upload_id
);
4811 meta_oid
= mp
.get_meta();
4813 int total_parts
= 0;
4814 int handled_parts
= 0;
4815 int max_parts
= 1000;
4818 RGWCompressionInfo cs_info
;
4819 bool compressed
= false;
4820 uint64_t accounted_size
= 0;
4822 uint64_t min_part_size
= s
->cct
->_conf
->rgw_multipart_min_part_size
;
4824 list
<rgw_obj_index_key
> remove_objs
; /* objects to be removed from index listing */
4826 bool versioned_object
= s
->bucket_info
.versioning_enabled();
4828 iter
= parts
->parts
.begin();
4830 meta_obj
.init_ns(s
->bucket
, meta_oid
, mp_ns
);
4831 meta_obj
.set_in_extra_data(true);
4832 meta_obj
.index_hash_source
= s
->object
.name
;
4834 op_ret
= get_obj_attrs(store
, s
, meta_obj
, attrs
);
4837 ldout(s
->cct
, 0) << "ERROR: failed to get obj attrs, obj=" << meta_obj
4838 << " ret=" << op_ret
<< dendl
;
4843 op_ret
= list_multipart_parts(store
, s
, upload_id
, meta_oid
, max_parts
,
4844 marker
, obj_parts
, &marker
, &truncated
);
4845 if (op_ret
== -ENOENT
) {
4846 op_ret
= -ERR_NO_SUCH_UPLOAD
;
4851 total_parts
+= obj_parts
.size();
4852 if (!truncated
&& total_parts
!= (int)parts
->parts
.size()) {
4853 ldout(s
->cct
, 0) << "NOTICE: total parts mismatch: have: " << total_parts
4854 << " expected: " << parts
->parts
.size() << dendl
;
4855 op_ret
= -ERR_INVALID_PART
;
4859 for (obj_iter
= obj_parts
.begin(); iter
!= parts
->parts
.end() && obj_iter
!= obj_parts
.end(); ++iter
, ++obj_iter
, ++handled_parts
) {
4860 uint64_t part_size
= obj_iter
->second
.accounted_size
;
4861 if (handled_parts
< (int)parts
->parts
.size() - 1 &&
4862 part_size
< min_part_size
) {
4863 op_ret
= -ERR_TOO_SMALL
;
4867 char petag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
4868 if (iter
->first
!= (int)obj_iter
->first
) {
4869 ldout(s
->cct
, 0) << "NOTICE: parts num mismatch: next requested: "
4870 << iter
->first
<< " next uploaded: "
4871 << obj_iter
->first
<< dendl
;
4872 op_ret
= -ERR_INVALID_PART
;
4875 string part_etag
= rgw_string_unquote(iter
->second
);
4876 if (part_etag
.compare(obj_iter
->second
.etag
) != 0) {
4877 ldout(s
->cct
, 0) << "NOTICE: etag mismatch: part: " << iter
->first
4878 << " etag: " << iter
->second
<< dendl
;
4879 op_ret
= -ERR_INVALID_PART
;
4883 hex_to_buf(obj_iter
->second
.etag
.c_str(), petag
,
4884 CEPH_CRYPTO_MD5_DIGESTSIZE
);
4885 hash
.Update((const byte
*)petag
, sizeof(petag
));
4887 RGWUploadPartInfo
& obj_part
= obj_iter
->second
;
4889 /* update manifest for part */
4890 string oid
= mp
.get_part(obj_iter
->second
.num
);
4892 src_obj
.init_ns(s
->bucket
, oid
, mp_ns
);
4894 if (obj_part
.manifest
.empty()) {
4895 ldout(s
->cct
, 0) << "ERROR: empty manifest for object part: obj="
4896 << src_obj
<< dendl
;
4897 op_ret
= -ERR_INVALID_PART
;
4900 manifest
.append(obj_part
.manifest
, store
);
4903 if (obj_part
.cs_info
.compression_type
!= "none") {
4904 if (compressed
&& cs_info
.compression_type
!= obj_part
.cs_info
.compression_type
) {
4905 ldout(s
->cct
, 0) << "ERROR: compression type was changed during multipart upload ("
4906 << cs_info
.compression_type
<< ">>" << obj_part
.cs_info
.compression_type
<< ")" << dendl
;
4907 op_ret
= -ERR_INVALID_PART
;
4910 int new_ofs
; // offset in compression data for new part
4911 if (cs_info
.blocks
.size() > 0)
4912 new_ofs
= cs_info
.blocks
.back().new_ofs
+ cs_info
.blocks
.back().len
;
4915 for (const auto& block
: obj_part
.cs_info
.blocks
) {
4916 compression_block cb
;
4917 cb
.old_ofs
= block
.old_ofs
+ cs_info
.orig_size
;
4918 cb
.new_ofs
= new_ofs
;
4920 cs_info
.blocks
.push_back(cb
);
4921 new_ofs
= cb
.new_ofs
+ cb
.len
;
4924 cs_info
.compression_type
= obj_part
.cs_info
.compression_type
;
4925 cs_info
.orig_size
+= obj_part
.cs_info
.orig_size
;
4929 rgw_obj_index_key remove_key
;
4930 src_obj
.key
.get_index_key(&remove_key
);
4932 remove_objs
.push_back(remove_key
);
4934 ofs
+= obj_part
.size
;
4935 accounted_size
+= obj_part
.accounted_size
;
4937 } while (truncated
);
4938 hash
.Final((byte
*)final_etag
);
4940 buf_to_hex((unsigned char *)final_etag
, sizeof(final_etag
), final_etag_str
);
4941 snprintf(&final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2], sizeof(final_etag_str
) - CEPH_CRYPTO_MD5_DIGESTSIZE
* 2,
4942 "-%lld", (long long)parts
->parts
.size());
4943 etag
= final_etag_str
;
4944 ldout(s
->cct
, 10) << "calculated etag: " << final_etag_str
<< dendl
;
4946 etag_bl
.append(final_etag_str
, strlen(final_etag_str
) + 1);
4948 attrs
[RGW_ATTR_ETAG
] = etag_bl
;
4951 // write compression attribute to full object
4953 ::encode(cs_info
, tmp
);
4954 attrs
[RGW_ATTR_COMPRESSION
] = tmp
;
4957 target_obj
.init(s
->bucket
, s
->object
.name
);
4958 if (versioned_object
) {
4959 store
->gen_rand_obj_instance_name(&target_obj
);
4962 RGWObjectCtx
& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
4964 obj_ctx
.obj
.set_atomic(target_obj
);
4966 RGWRados::Object
op_target(store
, s
->bucket_info
, *static_cast<RGWObjectCtx
*>(s
->obj_ctx
), target_obj
);
4967 RGWRados::Object::Write
obj_op(&op_target
);
4969 obj_op
.meta
.manifest
= &manifest
;
4970 obj_op
.meta
.remove_objs
= &remove_objs
;
4972 obj_op
.meta
.ptag
= &s
->req_id
; /* use req_id as operation tag */
4973 obj_op
.meta
.owner
= s
->owner
.get_id();
4974 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
4975 op_ret
= obj_op
.write_meta(ofs
, accounted_size
, attrs
);
4979 // remove the upload obj
4980 int r
= store
->delete_obj(*static_cast<RGWObjectCtx
*>(s
->obj_ctx
),
4981 s
->bucket_info
, meta_obj
, 0);
4983 ldout(store
->ctx(), 0) << "WARNING: failed to remove object " << meta_obj
<< dendl
;
4987 int RGWAbortMultipart::verify_permission()
4989 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
))
4995 void RGWAbortMultipart::pre_exec()
4997 rgw_bucket_object_pre_exec(s
);
5000 void RGWAbortMultipart::execute()
5005 upload_id
= s
->info
.args
.get("uploadId");
5006 map
<string
, bufferlist
> attrs
;
5010 if (upload_id
.empty() || s
->object
.empty())
5013 mp
.init(s
->object
.name
, upload_id
);
5014 meta_oid
= mp
.get_meta();
5016 op_ret
= get_multipart_info(store
, s
, meta_oid
, NULL
, attrs
);
5020 RGWObjectCtx
*obj_ctx
= static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
5021 op_ret
= abort_multipart_upload(store
, s
->cct
, obj_ctx
, s
->bucket_info
, mp
);
5024 int RGWListMultipart::verify_permission()
5026 if (!verify_object_permission(s
, RGW_PERM_READ
))
5032 void RGWListMultipart::pre_exec()
5034 rgw_bucket_object_pre_exec(s
);
5037 void RGWListMultipart::execute()
5039 map
<string
, bufferlist
> xattrs
;
5043 op_ret
= get_params();
5047 mp
.init(s
->object
.name
, upload_id
);
5048 meta_oid
= mp
.get_meta();
5050 op_ret
= get_multipart_info(store
, s
, meta_oid
, &policy
, xattrs
);
5054 op_ret
= list_multipart_parts(store
, s
, upload_id
, meta_oid
, max_parts
,
5055 marker
, parts
, NULL
, &truncated
);
5058 int RGWListBucketMultiparts::verify_permission()
5060 if (!verify_bucket_permission(s
, RGW_PERM_READ
))
5066 void RGWListBucketMultiparts::pre_exec()
5068 rgw_bucket_object_pre_exec(s
);
5071 void RGWListBucketMultiparts::execute()
5073 vector
<rgw_bucket_dir_entry
> objs
;
5076 op_ret
= get_params();
5080 if (s
->prot_flags
& RGW_REST_SWIFT
) {
5082 path_args
= s
->info
.args
.get("path");
5083 if (!path_args
.empty()) {
5084 if (!delimiter
.empty() || !prefix
.empty()) {
5092 marker_meta
= marker
.get_meta();
5094 RGWRados::Bucket
target(store
, s
->bucket_info
);
5095 RGWRados::Bucket::List
list_op(&target
);
5097 list_op
.params
.prefix
= prefix
;
5098 list_op
.params
.delim
= delimiter
;
5099 list_op
.params
.marker
= marker_meta
;
5100 list_op
.params
.ns
= mp_ns
;
5101 list_op
.params
.filter
= &mp_filter
;
5103 op_ret
= list_op
.list_objects(max_uploads
, &objs
, &common_prefixes
,
5105 if (!objs
.empty()) {
5106 vector
<rgw_bucket_dir_entry
>::iterator iter
;
5107 RGWMultipartUploadEntry entry
;
5108 for (iter
= objs
.begin(); iter
!= objs
.end(); ++iter
) {
5109 rgw_obj_key
key(iter
->key
);
5110 if (!entry
.mp
.from_meta(key
.name
))
5113 uploads
.push_back(entry
);
5115 next_marker
= entry
;
5119 void RGWGetHealthCheck::execute()
5121 if (!g_conf
->rgw_healthcheck_disabling_path
.empty() &&
5122 (::access(g_conf
->rgw_healthcheck_disabling_path
.c_str(), F_OK
) == 0)) {
5123 /* Disabling path specified & existent in the filesystem. */
5124 op_ret
= -ERR_SERVICE_UNAVAILABLE
; /* 503 */
5126 op_ret
= 0; /* 200 OK */
5130 int RGWDeleteMultiObj::verify_permission()
5132 if (!verify_bucket_permission(s
, RGW_PERM_WRITE
))
5138 void RGWDeleteMultiObj::pre_exec()
5140 rgw_bucket_object_pre_exec(s
);
5143 void RGWDeleteMultiObj::execute()
5145 RGWMultiDelDelete
*multi_delete
;
5146 vector
<rgw_obj_key
>::iterator iter
;
5147 RGWMultiDelXMLParser parser
;
5148 int num_processed
= 0;
5149 RGWObjectCtx
*obj_ctx
= static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
5151 op_ret
= get_params();
5161 if (!parser
.init()) {
5166 if (!parser
.parse(data
, len
, 1)) {
5171 multi_delete
= static_cast<RGWMultiDelDelete
*>(parser
.find_first("Delete"));
5172 if (!multi_delete
) {
5177 if (multi_delete
->is_quiet())
5181 if (multi_delete
->objects
.empty()) {
5185 for (iter
= multi_delete
->objects
.begin();
5186 iter
!= multi_delete
->objects
.end() && num_processed
< max_to_delete
;
5187 ++iter
, num_processed
++) {
5188 rgw_obj
obj(bucket
, *iter
);
5190 obj_ctx
->obj
.set_atomic(obj
);
5192 RGWRados::Object
del_target(store
, s
->bucket_info
, *obj_ctx
, obj
);
5193 RGWRados::Object::Delete
del_op(&del_target
);
5195 del_op
.params
.bucket_owner
= s
->bucket_owner
.get_id();
5196 del_op
.params
.versioning_status
= s
->bucket_info
.versioning_status();
5197 del_op
.params
.obj_owner
= s
->owner
;
5199 op_ret
= del_op
.delete_obj();
5200 if (op_ret
== -ENOENT
) {
5204 send_partial_response(*iter
, del_op
.result
.delete_marker
,
5205 del_op
.result
.version_id
, op_ret
);
5208 /* set the return code to zero, errors at this point will be
5209 dumped to the response */
5213 // will likely segfault if begin_response() has not been called
5225 bool RGWBulkDelete::Deleter::verify_permission(RGWBucketInfo
& binfo
,
5226 map
<string
, bufferlist
>& battrs
,
5227 ACLOwner
& bucket_owner
/* out */)
5229 RGWAccessControlPolicy
bacl(store
->ctx());
5230 int ret
= read_bucket_policy(store
, s
, binfo
, battrs
, &bacl
, binfo
.bucket
);
5235 bucket_owner
= bacl
.get_owner();
5237 /* We can use global user_acl because each BulkDelete request is allowed
5238 * to work on entities from a single account only. */
5239 return verify_bucket_permission(s
, s
->user_acl
.get(), &bacl
, RGW_PERM_WRITE
);
5242 bool RGWBulkDelete::Deleter::delete_single(const acct_path_t
& path
)
5244 auto& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
5246 RGWBucketInfo binfo
;
5247 map
<string
, bufferlist
> battrs
;
5250 int ret
= store
->get_bucket_info(obj_ctx
, s
->user
->user_id
.tenant
,
5251 path
.bucket_name
, binfo
, nullptr,
5257 if (!verify_permission(binfo
, battrs
, bowner
)) {
5262 if (!path
.obj_key
.empty()) {
5263 rgw_obj
obj(binfo
.bucket
, path
.obj_key
);
5264 obj_ctx
.obj
.set_atomic(obj
);
5266 RGWRados::Object
del_target(store
, binfo
, obj_ctx
, obj
);
5267 RGWRados::Object::Delete
del_op(&del_target
);
5269 del_op
.params
.bucket_owner
= binfo
.owner
;
5270 del_op
.params
.versioning_status
= binfo
.versioning_status();
5271 del_op
.params
.obj_owner
= bowner
;
5273 ret
= del_op
.delete_obj();
5278 RGWObjVersionTracker ot
;
5279 ot
.read_version
= binfo
.ep_objv
;
5281 ret
= store
->delete_bucket(binfo
, ot
);
5283 ret
= rgw_unlink_bucket(store
, binfo
.owner
, binfo
.bucket
.tenant
,
5284 binfo
.bucket
.name
, false);
5286 ldout(s
->cct
, 0) << "WARNING: failed to unlink bucket: ret=" << ret
5294 if (!store
->get_zonegroup().is_master
) {
5296 ret
= forward_request_to_master(s
, &ot
.read_version
, store
, in_data
,
5299 if (ret
== -ENOENT
) {
5300 /* adjust error, we want to return with NoSuchBucket and not
5302 ret
= -ERR_NO_SUCH_BUCKET
;
5314 if (-ENOENT
== ret
) {
5315 ldout(store
->ctx(), 20) << "cannot find bucket = " << path
.bucket_name
<< dendl
;
5318 ldout(store
->ctx(), 20) << "cannot get bucket info, ret = " << ret
5321 fail_desc_t failed_item
= {
5325 failures
.push_back(failed_item
);
5330 ldout(store
->ctx(), 20) << "wrong auth for " << path
<< dendl
;
5332 fail_desc_t failed_item
= {
5336 failures
.push_back(failed_item
);
5341 if (-ENOENT
== ret
) {
5342 ldout(store
->ctx(), 20) << "cannot find entry " << path
<< dendl
;
5345 fail_desc_t failed_item
= {
5349 failures
.push_back(failed_item
);
5354 bool RGWBulkDelete::Deleter::delete_chunk(const std::list
<acct_path_t
>& paths
)
5356 ldout(store
->ctx(), 20) << "in delete_chunk" << dendl
;
5357 for (auto path
: paths
) {
5358 ldout(store
->ctx(), 20) << "bulk deleting path: " << path
<< dendl
;
5359 delete_single(path
);
5365 int RGWBulkDelete::verify_permission()
5370 void RGWBulkDelete::pre_exec()
5372 rgw_bucket_object_pre_exec(s
);
5375 void RGWBulkDelete::execute()
5377 deleter
= std::unique_ptr
<Deleter
>(new Deleter(store
, s
));
5379 bool is_truncated
= false;
5381 list
<RGWBulkDelete::acct_path_t
> items
;
5383 int ret
= get_data(items
, &is_truncated
);
5388 ret
= deleter
->delete_chunk(items
);
5389 } while (!op_ret
&& is_truncated
);
5395 constexpr std::array
<int, 2> RGWBulkUploadOp::terminal_errors
;
5397 int RGWBulkUploadOp::verify_permission()
5399 if (s
->auth
.identity
->is_anonymous()) {
5403 if (! verify_user_permission(s
, RGW_PERM_WRITE
)) {
5407 if (s
->user
->user_id
.tenant
!= s
->bucket_tenant
) {
5408 ldout(s
->cct
, 10) << "user cannot create a bucket in a different tenant"
5409 << " (user_id.tenant=" << s
->user
->user_id
.tenant
5410 << " requested=" << s
->bucket_tenant
<< ")"
5415 if (s
->user
->max_buckets
< 0) {
5422 void RGWBulkUploadOp::pre_exec()
5424 rgw_bucket_object_pre_exec(s
);
5427 boost::optional
<std::pair
<std::string
, rgw_obj_key
>>
5428 RGWBulkUploadOp::parse_path(const boost::string_ref
& path
)
5430 /* We need to skip all slashes at the beginning in order to preserve
5431 * compliance with Swift. */
5432 const size_t start_pos
= path
.find_first_not_of('/');
5434 if (boost::string_ref::npos
!= start_pos
) {
5435 /* Seperator is the first slash after the leading ones. */
5436 const size_t sep_pos
= path
.substr(start_pos
).find('/');
5438 if (boost::string_ref::npos
!= sep_pos
) {
5439 const auto bucket_name
= path
.substr(start_pos
, sep_pos
- start_pos
);
5440 const auto obj_name
= path
.substr(sep_pos
+ 1);
5442 return std::make_pair(bucket_name
.to_string(),
5443 rgw_obj_key(obj_name
.to_string()));
5445 /* It's guaranteed here that bucket name is at least one character
5446 * long and is different than slash. */
5447 return std::make_pair(path
.substr(start_pos
).to_string(),
5455 std::pair
<std::string
, std::string
>
5456 RGWBulkUploadOp::handle_upload_path(struct req_state
*s
)
5458 std::string bucket_path
, file_prefix
;
5459 if (! s
->init_state
.url_bucket
.empty()) {
5460 file_prefix
= bucket_path
= s
->init_state
.url_bucket
+ "/";
5461 if (! s
->object
.empty()) {
5462 std::string
& object_name
= s
->object
.name
;
5464 /* As rgw_obj_key::empty() already verified emptiness of s->object.name,
5465 * we can safely examine its last element. */
5466 if (object_name
.back() == '/') {
5467 file_prefix
.append(object_name
);
5469 file_prefix
.append(object_name
).append("/");
5473 return std::make_pair(bucket_path
, file_prefix
);
// Verify the requester may create one more bucket: enforce the per-user
// bucket-count quota before a directory entry in the archive is turned
// into a bucket.
// NOTE(review): the declaration of `marker` and the error/return lines are
// in original lines missing from this extract.
5476 int RGWBulkUploadOp::handle_dir_verify_permission()
// max_buckets <= 0 means unlimited here, so only check when positive.
5478 if (s
->user
->max_buckets
> 0) {
5479 RGWUserBuckets buckets
;
5481 bool is_truncated
= false;
// List the user's existing buckets (up to max_buckets of them) so the
// current count can be compared against the quota.
5482 op_ret
= rgw_read_user_buckets(store
, s
->user
->user_id
, buckets
,
5483 marker
, std::string(), s
->user
->max_buckets
,
5484 false, &is_truncated
);
// Already at (or over) the limit: refuse with the dedicated error code.
5489 if (buckets
.count() >= static_cast<size_t>(s
->user
->max_buckets
)) {
5490 return -ERR_TOO_MANY_BUCKETS
;
5497 static void forward_req_info(CephContext
*cct
, req_info
& info
, const std::string
& bucket_name
)
5499 /* the request of container or object level will contain bucket name.
5500 * only at account level need to append the bucket name */
5501 if (info
.script_uri
.find(bucket_name
) != std::string::npos
) {
5505 ldout(cct
, 20) << "append the bucket: "<< bucket_name
<< " to req_info" << dendl
;
5506 info
.script_uri
.append("/").append(bucket_name
);
5507 info
.request_uri_aws4
= info
.request_uri
= info
.script_uri
;
5508 info
.effective_uri
= "/" + bucket_name
;
// Create a bucket for a directory entry found in the uploaded TAR archive,
// mirroring the ordinary bucket-creation path (quota check, master
// forwarding in multisite, placement verification, ACL setup, linking).
// NOTE(review): a significant number of original lines (error returns,
// closing braces, the JSONParser `jp` declaration) are missing from this
// extract; comments below only assert what the visible code shows.
5511 int RGWBulkUploadOp::handle_dir(const boost::string_ref path
)
5513 ldout(s
->cct
, 20) << "bulk upload: got directory=" << path
<< dendl
;
// Enforce the per-user bucket-count quota first.
5515 op_ret
= handle_dir_verify_permission();
// Split the archive path; only the bucket component matters here, the
// object component is deliberately discarded ("object_junk").
5520 std::string bucket_name
;
5521 rgw_obj_key object_junk
;
5522 std::tie(bucket_name
, object_junk
) = *parse_path(path
);
// Bucket entry-point object in the zone's domain_root pool.
5524 rgw_raw_obj
obj(store
->get_zone_params().domain_root
,
5525 rgw_make_bucket_entry_name(s
->bucket_tenant
, bucket_name
));
5527 /* we need to make sure we read bucket info, it's not read before for this
5528 * specific request */
5529 RGWBucketInfo binfo
;
5530 std::map
<std::string
, ceph::bufferlist
> battrs
;
5531 op_ret
= store
->get_bucket_info(*dir_ctx
, s
->bucket_tenant
, bucket_name
,
5532 binfo
, NULL
, &battrs
);
// ENOENT is fine (bucket does not exist yet); any other error aborts.
5533 if (op_ret
< 0 && op_ret
!= -ENOENT
) {
5536 const bool bucket_exists
= (op_ret
!= -ENOENT
);
// Existing bucket: only its current owner may "re-create" it.
5538 if (bucket_exists
) {
5539 RGWAccessControlPolicy
old_policy(s
->cct
);
5540 int r
= get_bucket_policy_from_attr(s
->cct
, store
, binfo
,
5541 battrs
, &old_policy
);
5543 if (old_policy
.get_owner().get_id().compare(s
->user
->user_id
) != 0) {
// Multisite: when this zone is not the metadata master, forward the
// creation to the master and adopt the versions/creation time it returns.
5550 RGWBucketInfo master_info
;
5551 rgw_bucket
*pmaster_bucket
= nullptr;
5552 uint32_t *pmaster_num_shards
= nullptr;
5553 real_time creation_time
;
5554 obj_version objv
, ep_objv
, *pobjv
= nullptr;
5556 if (! store
->is_meta_master()) {
5558 ceph::bufferlist in_data
;
5559 req_info info
= s
->info
;
5560 forward_req_info(s
->cct
, info
, bucket_name
);
5561 op_ret
= forward_request_to_master(s
, nullptr, store
, in_data
, &jp
, &info
);
// Parse the master's JSON reply into the local version/bucket-info state.
5566 JSONDecoder::decode_json("entry_point_object_ver", ep_objv
, &jp
);
5567 JSONDecoder::decode_json("object_ver", objv
, &jp
);
5568 JSONDecoder::decode_json("bucket_info", master_info
, &jp
);
5570 ldout(s
->cct
, 20) << "parsed: objv.tag=" << objv
.tag
<< " objv.ver="
5571 << objv
.ver
<< dendl
;
5572 ldout(s
->cct
, 20) << "got creation_time="<< master_info
.creation_time
5575 pmaster_bucket
= &master_info
.bucket
;
5576 creation_time
= master_info
.creation_time
;
5577 pmaster_num_shards
= &master_info
.num_shards
;
// Master zone (or local path): no master-supplied bucket/shard count.
5580 pmaster_bucket
= nullptr;
5581 pmaster_num_shards
= nullptr;
// For an existing bucket, verify the placement rule that would be
// selected now matches the one the bucket was created with.
5585 std::string placement_rule
;
5586 if (bucket_exists
) {
5587 std::string selected_placement_rule
;
5589 bucket
.tenant
= s
->bucket_tenant
;
5590 bucket
.name
= s
->bucket_name
;
5591 op_ret
= store
->select_bucket_placement(*(s
->user
),
5592 store
->get_zonegroup().get_id(),
5594 &selected_placement_rule
,
5596 if (selected_placement_rule
!= binfo
.placement_rule
) {
5598 ldout(s
->cct
, 20) << "bulk upload: non-coherent placement rule" << dendl
;
5603 /* Create metadata: ACLs. */
5604 std::map
<std::string
, ceph::bufferlist
> attrs
;
5605 RGWAccessControlPolicy policy
;
5606 policy
.create_default(s
->user
->user_id
, s
->user
->display_name
);
5607 ceph::bufferlist aclbl
;
5608 policy
.encode(aclbl
);
5609 attrs
.emplace(RGW_ATTR_ACL
, std::move(aclbl
));
// No explicit quota supplied for buckets created through bulk upload.
5611 RGWQuotaInfo quota_info
;
5612 const RGWQuotaInfo
* pquota_info
= nullptr;
5615 bucket
.tenant
= s
->bucket_tenant
; /* ignored if bucket exists */
5616 bucket
.name
= bucket_name
;
5619 RGWBucketInfo out_info
;
5620 op_ret
= store
->create_bucket(*(s
->user
),
5622 store
->get_zonegroup().get_id(),
5623 placement_rule
, binfo
.swift_ver_location
,
5625 out_info
, pobjv
, &ep_objv
, creation_time
,
5626 pmaster_bucket
, pmaster_num_shards
, true);
5627 /* continue if EEXIST and create_bucket will fail below. this way we can
5628 * recover from a partial create by retrying it. */
5629 ldout(s
->cct
, 20) << "rgw_create_bucket returned ret=" << op_ret
5630 << ", bucket=" << bucket
<< dendl
;
// Any error other than EEXIST is fatal for this directory entry.
5632 if (op_ret
&& op_ret
!= -EEXIST
) {
5636 const bool existed
= (op_ret
== -EEXIST
);
5638 /* bucket already existed, might have raced with another bucket creation, or
5639 * might be partial bucket creation that never completed. Read existing bucket
5640 * info, verify that the reported bucket owner is the current user.
5641 * If all is ok then update the user's list of buckets.
5642 * Otherwise inform client about a name conflict.
5644 if (out_info
.owner
.compare(s
->user
->user_id
) != 0) {
5646 ldout(s
->cct
, 20) << "bulk upload: conflicting bucket name" << dendl
;
5649 bucket
= out_info
.bucket
;
// Link the (possibly pre-existing) bucket into the user's bucket list.
5652 op_ret
= rgw_link_bucket(store
, s
->user
->user_id
, bucket
,
5653 out_info
.creation_time
, false);
// Linking failed for a bucket we just created: roll the link back,
// but never unlink a bucket that existed before this request.
5654 if (op_ret
&& !existed
&& op_ret
!= -EEXIST
) {
5655 /* if it exists (or previously existed), don't remove it! */
5656 op_ret
= rgw_unlink_bucket(store
, s
->user
->user_id
,
5657 bucket
.tenant
, bucket
.name
);
5659 ldout(s
->cct
, 0) << "bulk upload: WARNING: failed to unlink bucket: ret="
// Bucket already present: report ERR_BUCKET_EXISTS (the caller treats
// this as a non-fatal outcome for directory entries).
5662 } else if (op_ret
== -EEXIST
|| (op_ret
== 0 && existed
)) {
5663 ldout(s
->cct
, 20) << "bulk upload: containers already exists"
5665 op_ret
= -ERR_BUCKET_EXISTS
;
// Check that the requester may create an object in the target bucket.
// On success, fills bucket_owner (out) from the bucket ACL and returns the
// result of the bucket write-permission check.
// NOTE(review): the early-return branch after read_bucket_policy() failure
// is in original lines missing from this extract.
5672 bool RGWBulkUploadOp::handle_file_verify_permission(RGWBucketInfo
& binfo
,
5673 std::map
<std::string
, ceph::bufferlist
>& battrs
,
5674 ACLOwner
& bucket_owner
/* out */)
// Load the bucket ACL from the bucket's stored attributes.
5676 RGWAccessControlPolicy
bacl(store
->ctx());
5677 op_ret
= read_bucket_policy(store
, s
, binfo
, battrs
, &bacl
, binfo
.bucket
);
5679 ldout(s
->cct
, 20) << "bulk upload: cannot read_policy() for bucket"
// Report the owner back to the caller, then evaluate write access using
// both the user ACL and the bucket ACL.
5684 bucket_owner
= bacl
.get_owner();
5685 return verify_bucket_permission(s
, s
->user_acl
.get(), &bacl
, RGW_PERM_WRITE
);
// Store one regular file extracted from the TAR archive as a RADOS object:
// size check, bucket lookup, permission and quota checks, optional
// compression, chunked streaming write, MD5/ETag computation, attribute
// setup and final atomic completion.
// NOTE(review): many original lines (returns, loop/brace structure, the
// declarations of bowner/hash/ofs/len) are missing from this extract;
// comments below only describe what the visible code shows.
5688 int RGWBulkUploadOp::handle_file(const boost::string_ref path
,
5690 AlignedStreamGetter
& body
)
5693 ldout(s
->cct
, 20) << "bulk upload: got file=" << path
<< ", size=" << size
// Processor chain: `filter` points at the final data sink; a compressor
// may be spliced in front of it below.
5696 RGWPutObjDataProcessor
*filter
= nullptr;
5697 boost::optional
<RGWPutObj_Compress
> compressor
;
// Reject files larger than the configured maximum PUT size.
5699 if (size
> static_cast<const size_t>(s
->cct
->_conf
->rgw_max_put_size
)) {
5700 op_ret
= -ERR_TOO_LARGE
;
// Split the archive path into target bucket and object key (stored into
// the op's `object` member).
5704 std::string bucket_name
;
5706 std::tie(bucket_name
, object
) = *parse_path(path
);
5708 auto& obj_ctx
= *static_cast<RGWObjectCtx
*>(s
->obj_ctx
);
5709 RGWBucketInfo binfo
;
5710 std::map
<std::string
, ceph::bufferlist
> battrs
;
// The target bucket must already exist (created by a preceding directory
// entry or beforehand); tenant is taken from the requesting user.
5712 op_ret
= store
->get_bucket_info(obj_ctx
, s
->user
->user_id
.tenant
,
5713 bucket_name
, binfo
, nullptr, &battrs
);
5714 if (op_ret
== -ENOENT
) {
5715 ldout(s
->cct
, 20) << "bulk upload: non existent directory=" << bucket_name
5717 } else if (op_ret
< 0) {
// ACL-based authorization; also yields the bucket owner for quota checks.
5721 if (! handle_file_verify_permission(binfo
, battrs
, bowner
)) {
5722 ldout(s
->cct
, 20) << "bulk upload: object creation unauthorized" << dendl
;
// Pre-write quota check against the declared file size.
5727 op_ret
= store
->check_quota(bowner
.get_id(), binfo
.bucket
,
5728 user_quota
, bucket_quota
, size
);
// Atomic PUT processor writes the object in stripes.
5733 RGWPutObjProcessor_Atomic
processor(obj_ctx
,
5738 s
->cct
->_conf
->rgw_obj_stripe_size
,
5740 binfo
.versioning_enabled());
5742 /* No filters by default. */
5743 filter
= &processor
;
5745 op_ret
= processor
.prepare(store
, nullptr);
5747 ldout(s
->cct
, 20) << "bulk upload: cannot prepare processor due to ret="
// Optionally splice a compression filter in front of the processor,
// according to the placement rule's configured compression type.
5752 const auto& compression_type
= store
->get_zone_params().get_compression_type(
5753 binfo
.placement_rule
);
5754 CompressorRef plugin
;
5755 if (compression_type
!= "none") {
5756 plugin
= Compressor::create(s
->cct
, compression_type
);
5758 ldout(s
->cct
, 1) << "Cannot load plugin for rgw_compression_type "
5759 << compression_type
<< dendl
;
5761 compressor
.emplace(s
->cct
, plugin
, filter
);
5762 filter
= &*compressor
;
5766 /* Upload file content. */
// Stream body data in chunks of at most rgw_max_chunk_size, feeding the
// MD5 hash and the (possibly compressing) processor chain.
5771 ceph::bufferlist data
;
5772 len
= body
.get_at_most(s
->cct
->_conf
->rgw_max_chunk_size
, data
);
5774 ldout(s
->cct
, 20) << "bulk upload: body=" << data
.c_str() << dendl
;
5778 } else if (len
> 0) {
5779 hash
.Update((const byte
*)data
.c_str(), data
.length());
5780 op_ret
= put_data_and_throttle(filter
, data
, ofs
, false);
5782 ldout(s
->cct
, 20) << "processor->thottle_data() returned ret="
// Declared size and actually received size must agree.
5793 ldout(s
->cct
, 10) << "bulk upload: real file size different from declared"
// Re-check quota after the write (counters may have moved meanwhile).
5798 op_ret
= store
->check_quota(bowner
.get_id(), binfo
.bucket
,
5799 user_quota
, bucket_quota
, size
);
5801 ldout(s
->cct
, 20) << "bulk upload: quota exceeded for path=" << path
// Finalize the MD5 digest into a hex string for the ETag.
5806 char calc_md5
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 1];
5807 unsigned char m
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
5809 buf_to_hex(m
, CEPH_CRYPTO_MD5_DIGESTSIZE
, calc_md5
);
5811 /* Create metadata: ETAG. */
5812 std::map
<std::string
, ceph::bufferlist
> attrs
;
5813 std::string etag
= calc_md5
;
5814 ceph::bufferlist etag_bl
;
5815 etag_bl
.append(etag
.c_str(), etag
.size() + 1);
5816 attrs
.emplace(RGW_ATTR_ETAG
, std::move(etag_bl
));
5818 /* Create metadata: ACLs. */
5819 RGWAccessControlPolicy policy
;
5820 policy
.create_default(s
->user
->user_id
, s
->user
->display_name
);
5821 ceph::bufferlist aclbl
;
5822 policy
.encode(aclbl
);
5823 attrs
.emplace(RGW_ATTR_ACL
, std::move(aclbl
));
5825 /* Create metadata: compression info. */
// Record compression metadata only when the compressor actually ran and
// produced a compressed result.
5826 if (compressor
&& compressor
->is_compressed()) {
5827 ceph::bufferlist tmp
;
5828 RGWCompressionInfo cs_info
;
5829 cs_info
.compression_type
= plugin
->get_type_name();
5830 cs_info
.orig_size
= s
->obj_size
;
5831 cs_info
.blocks
= std::move(compressor
->get_compression_blocks());
5832 ::encode(cs_info
, tmp
);
5833 attrs
.emplace(RGW_ATTR_COMPRESSION
, std::move(tmp
));
5836 /* Complete the transaction. */
// Atomically publish the object with its ETag and attributes; no
// modification-time override and no delete-at time are supplied.
5837 op_ret
= processor
.complete(size
, etag
, nullptr, ceph::real_time(), attrs
,
5838 ceph::real_time() /* delete_at */);
5840 ldout(s
->cct
, 20) << "bulk upload: processor::complete returned op_ret="
// Main loop of the bulk upload: read the archive 512-byte TAR block by
// block, dispatch regular files to handle_file() and directories to
// handle_dir(), collect per-entry failures, and abort early on errors
// listed in terminal_errors (mirroring Swift's Bulk middleware).
// NOTE(review): loop head, error returns and closing braces are in
// original lines missing from this extract.
5847 void RGWBulkUploadOp::execute()
5849 ceph::bufferlist
buffer(64 * 1024);
5851 ldout(s
->cct
, 20) << "bulk upload: start" << dendl
;
5853 /* Create an instance of stream-abstracting class. Having this indirection
5854 * allows for easy introduction of decompressors like gzip and bzip2. */
5855 auto stream
= create_stream();
5860 /* Handling the $UPLOAD_PATH accordingly to the Swift's Bulk middleware. See:
5861 * https://github.com/openstack/swift/blob/2.13.0/swift/common/middleware/bulk.py#L31-L41 */
5862 std::string bucket_path
, file_prefix
;
5863 std::tie(bucket_path
, file_prefix
) = handle_upload_path(s
);
// Tracks consecutive zeroed blocks to detect the TAR end-of-archive.
5865 auto status
= rgw::tar::StatusIndicator::create();
// Read exactly one TAR block per iteration.
5867 op_ret
= stream
->get_exactly(rgw::tar::BLOCK_SIZE
, buffer
);
5869 ldout(s
->cct
, 2) << "bulk upload: cannot read header" << dendl
;
5873 /* We need to re-interpret the buffer as a TAR block. Exactly two blocks
5874 * must be tracked to detect out end-of-archive. It occurs when both of
5875 * them are empty (zeroed). Tracing this particular inter-block dependency
5876 * is responsibility of the rgw::tar::StatusIndicator class. */
5877 boost::optional
<rgw::tar::HeaderView
> header
;
5878 std::tie(status
, header
) = rgw::tar::interpret_block(status
, buffer
);
5880 if (! status
.empty() && header
) {
5881 /* This specific block isn't empty (entirely zeroed), so we can parse
5882 * it as a TAR header and dispatch. At the moment we do support only
5883 * regular files and directories. Everything else (symlinks, devices)
5884 * will be ignored but won't cease the whole upload. */
5885 switch (header
->get_filetype()) {
5886 case rgw::tar::FileType::NORMAL_FILE
: {
5887 ldout(s
->cct
, 2) << "bulk upload: handling regular file" << dendl
;
// NOTE(review): when bucket_path is non-empty, this binds a
// boost::string_ref to the temporary std::string produced by
// `file_prefix + ...to_string()` -- that temporary dies at the end
// of the full expression, leaving `filename` dangling. Looks like
// it should be a std::string instead; confirm against upstream.
5889 boost::string_ref filename
= bucket_path
.empty() ? header
->get_filename() : \
5890 file_prefix
+ header
->get_filename().to_string();
// Wrap the stream so reads stop at the file size and the destructor
// drains up to the next 512-byte block boundary.
5891 auto body
= AlignedStreamGetter(0, header
->get_filesize(),
5892 rgw::tar::BLOCK_SIZE
, *stream
);
5893 op_ret
= handle_file(filename
,
5894 header
->get_filesize(),
5897 /* Only regular files counts. */
// Record a per-entry failure without aborting the whole archive.
5900 failures
.emplace_back(op_ret
, filename
.to_string());
5904 case rgw::tar::FileType::DIRECTORY
: {
5905 ldout(s
->cct
, 2) << "bulk upload: handling regular directory" << dendl
;
5907 boost::string_ref dirname
= bucket_path
.empty() ? header
->get_filename() : bucket_path
;
5908 op_ret
= handle_dir(dirname
);
// ERR_BUCKET_EXISTS is an expected, non-fatal outcome for directories.
5909 if (op_ret
< 0 && op_ret
!= -ERR_BUCKET_EXISTS
) {
5910 failures
.emplace_back(op_ret
, dirname
.to_string());
5915 /* Not recognized. Skip. */
5921 /* In case of any problems with sub-request authorization Swift simply
5922 * terminates whole upload immediately. */
5923 if (boost::algorithm::contains(std::initializer_list
<int>{ op_ret
},
5925 ldout(s
->cct
, 2) << "bulk upload: terminating due to ret=" << op_ret
5930 ldout(s
->cct
, 2) << "bulk upload: an empty block" << dendl
;
// Loop until the StatusIndicator has seen the end-of-archive marker.
5935 } while (! status
.eof());
5940 RGWBulkUploadOp::AlignedStreamGetter::~AlignedStreamGetter()
5942 const size_t aligned_legnth
= length
+ (-length
% alignment
);
5943 ceph::bufferlist junk
;
5945 DecoratedStreamGetter::get_exactly(aligned_legnth
- position
, junk
);
5948 ssize_t
RGWBulkUploadOp::AlignedStreamGetter::get_at_most(const size_t want
,
5949 ceph::bufferlist
& dst
)
5951 const size_t max_to_read
= std::min(want
, length
- position
);
5952 const auto len
= DecoratedStreamGetter::get_at_most(max_to_read
, dst
);
5959 ssize_t
RGWBulkUploadOp::AlignedStreamGetter::get_exactly(const size_t want
,
5960 ceph::bufferlist
& dst
)
5962 const auto len
= DecoratedStreamGetter::get_exactly(want
, dst
);
5969 int RGWSetAttrs::verify_permission()
5972 if (!s
->object
.empty()) {
5973 perm
= verify_object_permission(s
, RGW_PERM_WRITE
);
5975 perm
= verify_bucket_permission(s
, RGW_PERM_WRITE
);
5983 void RGWSetAttrs::pre_exec()
5985 rgw_bucket_object_pre_exec(s
);
5988 void RGWSetAttrs::execute()
5990 op_ret
= get_params();
5994 rgw_obj
obj(s
->bucket
, s
->object
);
5996 store
->set_atomic(s
->obj_ctx
, obj
);
5998 if (!s
->object
.empty()) {
5999 op_ret
= store
->set_attrs(s
->obj_ctx
, s
->bucket_info
, obj
, attrs
, nullptr);
6001 for (auto& iter
: attrs
) {
6002 s
->bucket_attrs
[iter
.first
] = std::move(iter
.second
);
6004 op_ret
= rgw_bucket_set_attrs(store
, s
->bucket_info
, s
->bucket_attrs
,
6005 &s
->bucket_info
.objv_tracker
);
6009 void RGWGetObjLayout::pre_exec()
6011 rgw_bucket_object_pre_exec(s
);
6014 void RGWGetObjLayout::execute()
6016 rgw_obj
obj(s
->bucket
, s
->object
);
6017 RGWRados::Object
target(store
,
6019 *static_cast<RGWObjectCtx
*>(s
->obj_ctx
),
6020 rgw_obj(s
->bucket
, s
->object
));
6021 RGWRados::Object::Read
stat_op(&target
);
6023 op_ret
= stat_op
.prepare();
6028 head_obj
= stat_op
.state
.head_obj
;
6030 op_ret
= target
.get_manifest(&manifest
);
// Out-of-line (empty-bodied) destructor definition for RGWHandler; the
// body is in original lines not shown in this extract.
6034 RGWHandler::~RGWHandler()
// Bind this handler to the store, the per-request state and the client
// I/O object. The body (member assignments and return) is in original
// lines missing from this extract.
6038 int RGWHandler::init(RGWRados
*_store
,
6039 struct req_state
*_s
,
6040 rgw::io::BasicClient
*cio
)
// Build the bucket-level policies for the current request and log the
// outcome. The visible tail maps -ENODATA specially; the mapped value and
// final return are in original lines missing from this extract.
6048 int RGWHandler::do_init_permissions()
6050 int ret
= rgw_build_bucket_policies(store
, s
);
6053 ldout(s
->cct
, 10) << "read_permissions on " << s
->bucket
<< " ret=" << ret
<< dendl
;
// -ENODATA (missing ACL data) gets translated rather than passed through.
6054 if (ret
== -ENODATA
)
// Build object-level policies for the given op (bucket info was read
// earlier -- see the original comment). only_bucket selects the
// bucket-only path, handled in original lines missing from this extract.
6061 int RGWHandler::do_read_permissions(RGWOp
*op
, bool only_bucket
)
6064 /* already read bucket info */
// Object path: build object policies, honoring the op's prefetch hint.
6067 int ret
= rgw_build_object_policies(store
, s
, op
->prefetch_data());
6070 ldout(s
->cct
, 10) << "read_permissions on " << s
->bucket
<< ":"
6071 << s
->object
<< " only_bucket=" << only_bucket
6072 << " ret=" << ret
<< dendl
;
// -ENODATA (missing ACL data) gets translated rather than passed through.
6073 if (ret
== -ENODATA
)
6080 int RGWOp::error_handler(int err_no
, string
*error_content
) {
6081 return dialect_handler
->error_handler(err_no
, error_content
);
6084 int RGWHandler::error_handler(int err_no
, string
*error_content
) {
6085 // This is the do-nothing error handler