1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
9 #include "include/types.h"
12 #include "rgw_multi.h"
16 #include "services/svc_sys_obj.h"
17 #include "services/svc_tier_rados.h"
19 #define dout_subsys ceph_subsys_rgw
23 bool RGWMultiPart::xml_end(const char *el
)
25 RGWMultiPartNumber
*num_obj
= static_cast<RGWMultiPartNumber
*>(find_first("PartNumber"));
26 RGWMultiETag
*etag_obj
= static_cast<RGWMultiETag
*>(find_first("ETag"));
28 if (!num_obj
|| !etag_obj
)
31 string s
= num_obj
->get_data();
35 num
= atoi(s
.c_str());
37 s
= etag_obj
->get_data();
43 bool RGWMultiCompleteUpload::xml_end(const char *el
) {
44 XMLObjIter iter
= find("Part");
45 RGWMultiPart
*part
= static_cast<RGWMultiPart
*>(iter
.get_next());
47 int num
= part
->get_num();
48 string etag
= part
->get_etag();
50 part
= static_cast<RGWMultiPart
*>(iter
.get_next());
56 XMLObj
*RGWMultiXMLParser::alloc_obj(const char *el
) {
58 if (strcmp(el
, "CompleteMultipartUpload") == 0 ||
59 strcmp(el
, "MultipartUpload") == 0) {
60 obj
= new RGWMultiCompleteUpload();
61 } else if (strcmp(el
, "Part") == 0) {
62 obj
= new RGWMultiPart();
63 } else if (strcmp(el
, "PartNumber") == 0) {
64 obj
= new RGWMultiPartNumber();
65 } else if (strcmp(el
, "ETag") == 0) {
66 obj
= new RGWMultiETag();
72 bool is_v2_upload_id(const string
& upload_id
)
74 const char *uid
= upload_id
.c_str();
76 return (strncmp(uid
, MULTIPART_UPLOAD_ID_PREFIX
, sizeof(MULTIPART_UPLOAD_ID_PREFIX
) - 1) == 0) ||
77 (strncmp(uid
, MULTIPART_UPLOAD_ID_PREFIX_LEGACY
, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY
) - 1) == 0);
80 int list_multipart_parts(rgw::sal::RGWRadosStore
*store
, RGWBucketInfo
& bucket_info
,
82 const string
& upload_id
,
83 const string
& meta_oid
, int num_parts
,
84 int marker
, map
<uint32_t, RGWUploadPartInfo
>& parts
,
85 int *next_marker
, bool *truncated
,
88 map
<string
, bufferlist
> parts_map
;
89 map
<string
, bufferlist
>::iterator iter
;
92 obj
.init_ns(bucket_info
.bucket
, meta_oid
, RGW_OBJ_NS_MULTIPART
);
93 obj
.set_in_extra_data(true);
96 store
->getRados()->obj_to_raw(bucket_info
.placement_rule
, obj
, &raw_obj
);
98 bool sorted_omap
= is_v2_upload_id(upload_id
) && !assume_unsorted
;
102 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
103 auto sysobj
= obj_ctx
.get_obj(raw_obj
);
110 snprintf(buf
, sizeof(buf
), "%08d", marker
);
113 ret
= sysobj
.omap().get_vals(p
, num_parts
+ 1, &parts_map
,
114 nullptr, null_yield
);
116 ret
= sysobj
.omap().get_all(&parts_map
, null_yield
);
125 uint32_t expected_next
= marker
+ 1;
127 for (i
= 0, iter
= parts_map
.begin();
128 (i
< num_parts
|| !sorted_omap
) && iter
!= parts_map
.end();
130 bufferlist
& bl
= iter
->second
;
131 auto bli
= bl
.cbegin();
132 RGWUploadPartInfo info
;
135 } catch (buffer::error
& err
) {
136 ldout(cct
, 0) << "ERROR: could not part info, caught buffer::error" <<
141 if (info
.num
!= expected_next
) {
142 /* ouch, we expected a specific part num here, but we got a
143 * different one. Either a part is missing, or it could be a
144 * case of mixed rgw versions working on the same upload,
145 * where one gateway doesn't support correctly sorted omap
146 * keys for multipart upload just assume data is unsorted.
148 return list_multipart_parts(store
, bucket_info
, cct
, upload_id
,
149 meta_oid
, num_parts
, marker
, parts
,
150 next_marker
, truncated
, true);
155 (int)info
.num
> marker
) {
156 parts
[info
.num
] = info
;
163 *truncated
= (iter
!= parts_map
.end());
166 /* rebuild a map with only num_parts entries */
167 map
<uint32_t, RGWUploadPartInfo
> new_parts
;
168 map
<uint32_t, RGWUploadPartInfo
>::iterator piter
;
169 for (i
= 0, piter
= parts
.begin();
170 i
< num_parts
&& piter
!= parts
.end();
172 new_parts
[piter
->first
] = piter
->second
;
173 last_num
= piter
->first
;
177 *truncated
= (piter
!= parts
.end());
180 parts
.swap(new_parts
);
184 *next_marker
= last_num
;
190 int list_multipart_parts(rgw::sal::RGWRadosStore
*store
, struct req_state
*s
,
191 const string
& upload_id
,
192 const string
& meta_oid
, int num_parts
,
193 int marker
, map
<uint32_t, RGWUploadPartInfo
>& parts
,
194 int *next_marker
, bool *truncated
,
195 bool assume_unsorted
)
197 return list_multipart_parts(store
, s
->bucket_info
, s
->cct
, upload_id
,
198 meta_oid
, num_parts
, marker
, parts
,
199 next_marker
, truncated
, assume_unsorted
);
202 int abort_multipart_upload(rgw::sal::RGWRadosStore
*store
, CephContext
*cct
,
203 RGWObjectCtx
*obj_ctx
, RGWBucketInfo
& bucket_info
,
207 meta_obj
.init_ns(bucket_info
.bucket
, mp_obj
.get_meta(), RGW_OBJ_NS_MULTIPART
);
208 meta_obj
.set_in_extra_data(true);
209 meta_obj
.index_hash_source
= mp_obj
.get_key();
210 cls_rgw_obj_chain chain
;
211 list
<rgw_obj_index_key
> remove_objs
;
212 map
<uint32_t, RGWUploadPartInfo
> obj_parts
;
216 uint64_t parts_accounted_size
= 0;
219 ret
= list_multipart_parts(store
, bucket_info
, cct
,
220 mp_obj
.get_upload_id(), mp_obj
.get_meta(),
221 1000, marker
, obj_parts
, &marker
, &truncated
);
223 ldout(cct
, 20) << __func__
<< ": list_multipart_parts returned " <<
225 return (ret
== -ENOENT
) ? -ERR_NO_SUCH_UPLOAD
: ret
;
228 for (auto obj_iter
= obj_parts
.begin();
229 obj_iter
!= obj_parts
.end();
231 RGWUploadPartInfo
& obj_part
= obj_iter
->second
;
233 if (obj_part
.manifest
.empty()) {
234 string oid
= mp_obj
.get_part(obj_iter
->second
.num
);
235 obj
.init_ns(bucket_info
.bucket
, oid
, RGW_OBJ_NS_MULTIPART
);
236 obj
.index_hash_source
= mp_obj
.get_key();
237 ret
= store
->getRados()->delete_obj(*obj_ctx
, bucket_info
, obj
, 0);
238 if (ret
< 0 && ret
!= -ENOENT
)
241 store
->getRados()->update_gc_chain(meta_obj
, obj_part
.manifest
, &chain
);
242 RGWObjManifest::obj_iterator oiter
= obj_part
.manifest
.obj_begin();
243 if (oiter
!= obj_part
.manifest
.obj_end()) {
245 rgw_raw_obj raw_head
= oiter
.get_location().get_raw_obj(store
->getRados());
246 RGWSI_Tier_RADOS::raw_obj_to_obj(bucket_info
.bucket
, raw_head
, &head
);
248 rgw_obj_index_key key
;
249 head
.key
.get_index_key(&key
);
250 remove_objs
.push_back(key
);
253 parts_accounted_size
+= obj_part
.accounted_size
;
257 /* use upload id as tag and do it synchronously */
258 ret
= store
->getRados()->send_chain_to_gc(chain
, mp_obj
.get_upload_id());
260 ldout(cct
, 5) << __func__
<< ": gc->send_chain() returned " << ret
<< dendl
;
261 if (ret
== -ENOENT
) {
262 return -ERR_NO_SUCH_UPLOAD
;
264 //Delete objects inline if send chain to gc fails
265 store
->getRados()->delete_objs_inline(chain
, mp_obj
.get_upload_id());
268 RGWRados::Object
del_target(store
->getRados(), bucket_info
, *obj_ctx
, meta_obj
);
269 RGWRados::Object::Delete
del_op(&del_target
);
270 del_op
.params
.bucket_owner
= bucket_info
.owner
;
271 del_op
.params
.versioning_status
= 0;
272 if (!remove_objs
.empty()) {
273 del_op
.params
.remove_objs
= &remove_objs
;
276 del_op
.params
.abortmp
= true;
277 del_op
.params
.parts_accounted_size
= parts_accounted_size
;
279 // and also remove the metadata obj
280 ret
= del_op
.delete_obj(null_yield
);
282 ldout(cct
, 20) << __func__
<< ": del_op.delete_obj returned " <<
285 return (ret
== -ENOENT
) ? -ERR_NO_SUCH_UPLOAD
: ret
;
288 int list_bucket_multiparts(rgw::sal::RGWRadosStore
*store
, RGWBucketInfo
& bucket_info
,
289 const string
& prefix
, const string
& marker
,
291 const int& max_uploads
,
292 vector
<rgw_bucket_dir_entry
> *objs
,
293 map
<string
, bool> *common_prefixes
, bool *is_truncated
)
295 RGWRados::Bucket
target(store
->getRados(), bucket_info
);
296 RGWRados::Bucket::List
list_op(&target
);
297 MultipartMetaFilter mp_filter
;
299 list_op
.params
.prefix
= prefix
;
300 list_op
.params
.delim
= delim
;
301 list_op
.params
.marker
= marker
;
302 list_op
.params
.ns
= RGW_OBJ_NS_MULTIPART
;
303 list_op
.params
.filter
= &mp_filter
;
305 return(list_op
.list_objects(max_uploads
, objs
, common_prefixes
, is_truncated
, null_yield
));
308 int abort_bucket_multiparts(rgw::sal::RGWRadosStore
*store
, CephContext
*cct
, RGWBucketInfo
& bucket_info
,
309 string
& prefix
, string
& delim
)
311 constexpr int max
= 1000;
312 int ret
, num_deleted
= 0;
313 vector
<rgw_bucket_dir_entry
> objs
;
314 RGWObjectCtx
obj_ctx(store
);
319 ret
= list_bucket_multiparts(store
, bucket_info
, prefix
, marker
, delim
,
320 max
, &objs
, nullptr, &is_truncated
);
322 ldout(store
->ctx(), 0) << __func__
<<
323 " ERROR : calling list_bucket_multiparts; ret=" << ret
<<
324 "; bucket=\"" << bucket_info
.bucket
<< "\"; prefix=\"" <<
325 prefix
<< "\"; delim=\"" << delim
<< "\"" << dendl
;
328 ldout(store
->ctx(), 20) << __func__
<<
329 " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
330 bucket_info
.bucket
<< "\"; objs.size()=" << objs
.size() <<
331 "; is_truncated=" << is_truncated
<< dendl
;
335 for (const auto& obj
: objs
) {
336 rgw_obj_key
key(obj
.key
);
337 if (!mp
.from_meta(key
.name
))
339 ret
= abort_multipart_upload(store
, cct
, &obj_ctx
, bucket_info
, mp
);
341 // we're doing a best-effort; if something cannot be found,
342 // log it and keep moving forward
343 if (ret
!= -ENOENT
&& ret
!= -ERR_NO_SUCH_UPLOAD
) {
344 ldout(store
->ctx(), 0) << __func__
<<
345 " ERROR : failed to abort and clean-up multipart upload \"" <<
346 key
.get_oid() << "\"" << dendl
;
349 ldout(store
->ctx(), 10) << __func__
<<
350 " NOTE : unable to find part(s) of "
351 "aborted multipart upload of \"" << key
.get_oid() <<
352 "\" for cleaning up" << dendl
;
358 ldout(store
->ctx(), 0) << __func__
<<
359 " WARNING : aborted " << num_deleted
<<
360 " incomplete multipart uploads" << dendl
;
363 } while (is_truncated
);