]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_multi.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_multi.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #include <string.h>
5
6 #include <iostream>
7 #include <map>
8
9 #include "include/types.h"
10
11 #include "rgw_xml.h"
12 #include "rgw_multi.h"
13 #include "rgw_op.h"
14 #include "rgw_sal.h"
15 #include "rgw_sal_rados.h"
16
17 #include "services/svc_sys_obj.h"
18 #include "services/svc_tier_rados.h"
19
20 #define dout_subsys ceph_subsys_rgw
21
22
23
24 bool RGWMultiPart::xml_end(const char *el)
25 {
26 RGWMultiPartNumber *num_obj = static_cast<RGWMultiPartNumber *>(find_first("PartNumber"));
27 RGWMultiETag *etag_obj = static_cast<RGWMultiETag *>(find_first("ETag"));
28
29 if (!num_obj || !etag_obj)
30 return false;
31
32 string s = num_obj->get_data();
33 if (s.empty())
34 return false;
35
36 num = atoi(s.c_str());
37
38 s = etag_obj->get_data();
39 etag = s;
40
41 return true;
42 }
43
44 bool RGWMultiCompleteUpload::xml_end(const char *el) {
45 XMLObjIter iter = find("Part");
46 RGWMultiPart *part = static_cast<RGWMultiPart *>(iter.get_next());
47 while (part) {
48 int num = part->get_num();
49 string etag = part->get_etag();
50 parts[num] = etag;
51 part = static_cast<RGWMultiPart *>(iter.get_next());
52 }
53 return true;
54 }
55
56
57 XMLObj *RGWMultiXMLParser::alloc_obj(const char *el) {
58 XMLObj *obj = NULL;
59 if (strcmp(el, "CompleteMultipartUpload") == 0 ||
60 strcmp(el, "MultipartUpload") == 0) {
61 obj = new RGWMultiCompleteUpload();
62 } else if (strcmp(el, "Part") == 0) {
63 obj = new RGWMultiPart();
64 } else if (strcmp(el, "PartNumber") == 0) {
65 obj = new RGWMultiPartNumber();
66 } else if (strcmp(el, "ETag") == 0) {
67 obj = new RGWMultiETag();
68 }
69
70 return obj;
71 }
72
73 bool is_v2_upload_id(const string& upload_id)
74 {
75 const char *uid = upload_id.c_str();
76
77 return (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX, sizeof(MULTIPART_UPLOAD_ID_PREFIX) - 1) == 0) ||
78 (strncmp(uid, MULTIPART_UPLOAD_ID_PREFIX_LEGACY, sizeof(MULTIPART_UPLOAD_ID_PREFIX_LEGACY) - 1) == 0);
79 }
80
81 int list_multipart_parts(const DoutPrefixProvider *dpp, rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info,
82 CephContext *cct,
83 const string& upload_id,
84 const string& meta_oid, int num_parts,
85 int marker, map<uint32_t, RGWUploadPartInfo>& parts,
86 int *next_marker, bool *truncated,
87 bool assume_unsorted)
88 {
89 map<string, bufferlist> parts_map;
90 map<string, bufferlist>::iterator iter;
91
92 rgw_obj obj;
93 obj.init_ns(bucket_info.bucket, meta_oid, RGW_OBJ_NS_MULTIPART);
94 obj.set_in_extra_data(true);
95
96 rgw_raw_obj raw_obj;
97 store->getRados()->obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
98
99 bool sorted_omap = is_v2_upload_id(upload_id) && !assume_unsorted;
100
101 parts.clear();
102
103 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
104 auto sysobj = obj_ctx.get_obj(raw_obj);
105 int ret;
106 if (sorted_omap) {
107 string p;
108 p = "part.";
109 char buf[32];
110
111 snprintf(buf, sizeof(buf), "%08d", marker);
112 p.append(buf);
113
114 ret = sysobj.omap().get_vals(dpp, p, num_parts + 1, &parts_map,
115 nullptr, null_yield);
116 } else {
117 ret = sysobj.omap().get_all(dpp, &parts_map, null_yield);
118 }
119 if (ret < 0) {
120 return ret;
121 }
122
123 int i;
124 int last_num = 0;
125
126 uint32_t expected_next = marker + 1;
127
128 for (i = 0, iter = parts_map.begin();
129 (i < num_parts || !sorted_omap) && iter != parts_map.end();
130 ++iter, ++i) {
131 bufferlist& bl = iter->second;
132 auto bli = bl.cbegin();
133 RGWUploadPartInfo info;
134 try {
135 decode(info, bli);
136 } catch (buffer::error& err) {
137 ldpp_dout(dpp, 0) << "ERROR: could not part info, caught buffer::error" <<
138 dendl;
139 return -EIO;
140 }
141 if (sorted_omap) {
142 if (info.num != expected_next) {
143 /* ouch, we expected a specific part num here, but we got a
144 * different one. Either a part is missing, or it could be a
145 * case of mixed rgw versions working on the same upload,
146 * where one gateway doesn't support correctly sorted omap
147 * keys for multipart upload just assume data is unsorted.
148 */
149 return list_multipart_parts(dpp, store, bucket_info, cct, upload_id,
150 meta_oid, num_parts, marker, parts,
151 next_marker, truncated, true);
152 }
153 expected_next++;
154 }
155 if (sorted_omap ||
156 (int)info.num > marker) {
157 parts[info.num] = info;
158 last_num = info.num;
159 }
160 }
161
162 if (sorted_omap) {
163 if (truncated) {
164 *truncated = (iter != parts_map.end());
165 }
166 } else {
167 /* rebuild a map with only num_parts entries */
168 map<uint32_t, RGWUploadPartInfo> new_parts;
169 map<uint32_t, RGWUploadPartInfo>::iterator piter;
170 for (i = 0, piter = parts.begin();
171 i < num_parts && piter != parts.end();
172 ++i, ++piter) {
173 new_parts[piter->first] = piter->second;
174 last_num = piter->first;
175 }
176
177 if (truncated) {
178 *truncated = (piter != parts.end());
179 }
180
181 parts.swap(new_parts);
182 }
183
184 if (next_marker) {
185 *next_marker = last_num;
186 }
187
188 return 0;
189 }
190
191 int list_multipart_parts(const DoutPrefixProvider *dpp,
192 rgw::sal::RGWRadosStore *store, struct req_state *s,
193 const string& upload_id,
194 const string& meta_oid, int num_parts,
195 int marker, map<uint32_t, RGWUploadPartInfo>& parts,
196 int *next_marker, bool *truncated,
197 bool assume_unsorted)
198 {
199 return list_multipart_parts(dpp, store, s->bucket->get_info(), s->cct, upload_id,
200 meta_oid, num_parts, marker, parts,
201 next_marker, truncated, assume_unsorted);
202 }
203
204 int abort_multipart_upload(const DoutPrefixProvider *dpp,
205 rgw::sal::RGWRadosStore *store, CephContext *cct,
206 RGWObjectCtx *obj_ctx, RGWBucketInfo& bucket_info,
207 RGWMPObj& mp_obj)
208 {
209 rgw_obj meta_obj;
210 meta_obj.init_ns(bucket_info.bucket, mp_obj.get_meta(), RGW_OBJ_NS_MULTIPART);
211 meta_obj.set_in_extra_data(true);
212 meta_obj.index_hash_source = mp_obj.get_key();
213 cls_rgw_obj_chain chain;
214 list<rgw_obj_index_key> remove_objs;
215 map<uint32_t, RGWUploadPartInfo> obj_parts;
216 bool truncated;
217 int marker = 0;
218 int ret;
219 uint64_t parts_accounted_size = 0;
220
221 do {
222 ret = list_multipart_parts(dpp, store, bucket_info, cct,
223 mp_obj.get_upload_id(), mp_obj.get_meta(),
224 1000, marker, obj_parts, &marker, &truncated);
225 if (ret < 0) {
226 ldpp_dout(dpp, 20) << __func__ << ": list_multipart_parts returned " <<
227 ret << dendl;
228 return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
229 }
230
231 for (auto obj_iter = obj_parts.begin();
232 obj_iter != obj_parts.end();
233 ++obj_iter) {
234 RGWUploadPartInfo& obj_part = obj_iter->second;
235 rgw_obj obj;
236 if (obj_part.manifest.empty()) {
237 string oid = mp_obj.get_part(obj_iter->second.num);
238 obj.init_ns(bucket_info.bucket, oid, RGW_OBJ_NS_MULTIPART);
239 obj.index_hash_source = mp_obj.get_key();
240 ret = store->getRados()->delete_obj(dpp, *obj_ctx, bucket_info, obj, 0);
241 if (ret < 0 && ret != -ENOENT)
242 return ret;
243 } else {
244 store->getRados()->update_gc_chain(dpp, meta_obj, obj_part.manifest, &chain);
245 RGWObjManifest::obj_iterator oiter = obj_part.manifest.obj_begin(dpp);
246 if (oiter != obj_part.manifest.obj_end(dpp)) {
247 rgw_obj head;
248 rgw_raw_obj raw_head = oiter.get_location().get_raw_obj(store);
249 RGWSI_Tier_RADOS::raw_obj_to_obj(bucket_info.bucket, raw_head, &head);
250
251 rgw_obj_index_key key;
252 head.key.get_index_key(&key);
253 remove_objs.push_back(key);
254 }
255 }
256 parts_accounted_size += obj_part.accounted_size;
257 }
258 } while (truncated);
259
260 /* use upload id as tag and do it synchronously */
261 ret = store->getRados()->send_chain_to_gc(chain, mp_obj.get_upload_id());
262 if (ret < 0) {
263 ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
264 if (ret == -ENOENT) {
265 return -ERR_NO_SUCH_UPLOAD;
266 }
267 //Delete objects inline if send chain to gc fails
268 store->getRados()->delete_objs_inline(dpp, chain, mp_obj.get_upload_id());
269 }
270
271 RGWRados::Object del_target(store->getRados(), bucket_info, *obj_ctx, meta_obj);
272 RGWRados::Object::Delete del_op(&del_target);
273 del_op.params.bucket_owner = bucket_info.owner;
274 del_op.params.versioning_status = 0;
275 if (!remove_objs.empty()) {
276 del_op.params.remove_objs = &remove_objs;
277 }
278
279 del_op.params.abortmp = true;
280 del_op.params.parts_accounted_size = parts_accounted_size;
281
282 // and also remove the metadata obj
283 ret = del_op.delete_obj(null_yield, dpp);
284 if (ret < 0) {
285 ldpp_dout(dpp, 20) << __func__ << ": del_op.delete_obj returned " <<
286 ret << dendl;
287 }
288 return (ret == -ENOENT) ? -ERR_NO_SUCH_UPLOAD : ret;
289 }
290
291 int list_bucket_multiparts(const DoutPrefixProvider *dpp,
292 rgw::sal::RGWRadosStore *store, RGWBucketInfo& bucket_info,
293 const string& prefix, const string& marker,
294 const string& delim,
295 const int& max_uploads,
296 vector<rgw_bucket_dir_entry> *objs,
297 map<string, bool> *common_prefixes, bool *is_truncated)
298 {
299 RGWRados::Bucket target(store->getRados(), bucket_info);
300 RGWRados::Bucket::List list_op(&target);
301 MultipartMetaFilter mp_filter;
302
303 list_op.params.prefix = prefix;
304 list_op.params.delim = delim;
305 list_op.params.marker = marker;
306 list_op.params.ns = RGW_OBJ_NS_MULTIPART;
307 list_op.params.filter = &mp_filter;
308
309 return(list_op.list_objects(dpp, max_uploads, objs, common_prefixes, is_truncated, null_yield));
310 }
311
312 int abort_bucket_multiparts(const DoutPrefixProvider *dpp,
313 rgw::sal::RGWRadosStore *store, CephContext *cct,
314 RGWBucketInfo& bucket_info, string& prefix, string& delim)
315 {
316 constexpr int max = 1000;
317 int ret, num_deleted = 0;
318 vector<rgw_bucket_dir_entry> objs;
319 RGWObjectCtx obj_ctx(store);
320 string marker;
321 bool is_truncated;
322
323 do {
324 ret = list_bucket_multiparts(dpp, store, bucket_info, prefix, marker, delim,
325 max, &objs, nullptr, &is_truncated);
326 if (ret < 0) {
327 ldpp_dout(dpp, 0) << __func__ <<
328 " ERROR : calling list_bucket_multiparts; ret=" << ret <<
329 "; bucket=\"" << bucket_info.bucket << "\"; prefix=\"" <<
330 prefix << "\"; delim=\"" << delim << "\"" << dendl;
331 return ret;
332 }
333 ldpp_dout(dpp, 20) << __func__ <<
334 " INFO: aborting and cleaning up multipart upload(s); bucket=\"" <<
335 bucket_info.bucket << "\"; objs.size()=" << objs.size() <<
336 "; is_truncated=" << is_truncated << dendl;
337
338 if (!objs.empty()) {
339 RGWMPObj mp;
340 for (const auto& obj : objs) {
341 rgw_obj_key key(obj.key);
342 if (!mp.from_meta(key.name))
343 continue;
344 ret = abort_multipart_upload(dpp, store, cct, &obj_ctx, bucket_info, mp);
345 if (ret < 0) {
346 // we're doing a best-effort; if something cannot be found,
347 // log it and keep moving forward
348 if (ret != -ENOENT && ret != -ERR_NO_SUCH_UPLOAD) {
349 ldpp_dout(dpp, 0) << __func__ <<
350 " ERROR : failed to abort and clean-up multipart upload \"" <<
351 key.get_oid() << "\"" << dendl;
352 return ret;
353 } else {
354 ldpp_dout(dpp, 10) << __func__ <<
355 " NOTE : unable to find part(s) of "
356 "aborted multipart upload of \"" << key.get_oid() <<
357 "\" for cleaning up" << dendl;
358 }
359 }
360 num_deleted++;
361 }
362 if (num_deleted) {
363 ldpp_dout(dpp, 0) << __func__ <<
364 " WARNING : aborted " << num_deleted <<
365 " incomplete multipart uploads" << dendl;
366 }
367 }
368 } while (is_truncated);
369
370 return 0;
371 }