]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | /* | |
5 | * Ceph - scalable distributed file system | |
6 | * | |
7 | * Copyright (C) 2018 Red Hat, Inc. | |
8 | * | |
9 | * This is free software; you can redistribute it and/or | |
10 | * modify it under the terms of the GNU Lesser General Public | |
11 | * License version 2.1, as published by the Free Software | |
12 | * Foundation. See file COPYING. | |
13 | * | |
14 | */ | |
15 | ||
1e59de90 | 16 | #include "include/rados/librados.hpp" |
11fdf7f2 TL |
17 | #include "rgw_aio.h" |
18 | #include "rgw_putobj_processor.h" | |
19 | #include "rgw_multi.h" | |
9f95a23c | 20 | #include "rgw_compression.h" |
11fdf7f2 | 21 | #include "services/svc_sys_obj.h" |
39ae355f | 22 | #include "services/svc_zone.h" |
f67539c2 | 23 | #include "rgw_sal_rados.h" |
11fdf7f2 TL |
24 | |
25 | #define dout_subsys ceph_subsys_rgw | |
26 | ||
20effc67 TL |
27 | using namespace std; |
28 | ||
11fdf7f2 TL |
29 | namespace rgw::putobj { |
30 | ||
1e59de90 TL |
31 | /* |
32 | * For the cloudtiered objects, update the object manifest with the | |
33 | * cloudtier config info read from the attrs. | |
34 | * Since these attrs are used internally for only replication, do not store them | |
35 | * in the head object. | |
36 | */ | |
37 | void read_cloudtier_info_from_attrs(rgw::sal::Attrs& attrs, RGWObjCategory& category, | |
38 | RGWObjManifest& manifest) { | |
39 | auto attr_iter = attrs.find(RGW_ATTR_CLOUD_TIER_TYPE); | |
40 | if (attr_iter != attrs.end()) { | |
41 | auto i = attr_iter->second; | |
42 | string m = i.to_str(); | |
43 | ||
44 | if (m == "cloud-s3") { | |
45 | category = RGWObjCategory::CloudTiered; | |
46 | manifest.set_tier_type("cloud-s3"); | |
47 | ||
48 | auto config_iter = attrs.find(RGW_ATTR_CLOUD_TIER_CONFIG); | |
49 | if (config_iter != attrs.end()) { | |
50 | auto i = config_iter->second.cbegin(); | |
51 | RGWObjTier tier_config; | |
52 | ||
53 | try { | |
54 | using ceph::decode; | |
55 | decode(tier_config, i); | |
56 | manifest.set_tier_config(tier_config); | |
57 | attrs.erase(config_iter); | |
58 | } catch (buffer::error& err) { | |
59 | } | |
60 | } | |
61 | } | |
62 | attrs.erase(attr_iter); | |
63 | } | |
64 | } | |
65 | ||
11fdf7f2 TL |
66 | int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset) |
67 | { | |
68 | const bool flush = (data.length() == 0); | |
69 | ||
70 | // capture the first chunk for special handling | |
71 | if (data_offset < head_chunk_size || data_offset == 0) { | |
72 | if (flush) { | |
73 | // flush partial chunk | |
74 | return process_first_chunk(std::move(head_data), &processor); | |
75 | } | |
76 | ||
77 | auto remaining = head_chunk_size - data_offset; | |
78 | auto count = std::min<uint64_t>(data.length(), remaining); | |
79 | data.splice(0, count, &head_data); | |
80 | data_offset += count; | |
81 | ||
82 | if (data_offset == head_chunk_size) { | |
83 | // process the first complete chunk | |
84 | ceph_assert(head_data.length() == head_chunk_size); | |
85 | int r = process_first_chunk(std::move(head_data), &processor); | |
86 | if (r < 0) { | |
87 | return r; | |
88 | } | |
89 | } | |
90 | if (data.length() == 0) { // avoid flushing stripe processor | |
91 | return 0; | |
92 | } | |
93 | } | |
94 | ceph_assert(processor); // process_first_chunk() must initialize | |
95 | ||
96 | // send everything else through the processor | |
97 | auto write_offset = data_offset; | |
98 | data_offset += data.length(); | |
99 | return processor->process(std::move(data), write_offset); | |
100 | } | |
101 | ||
102 | ||
103 | static int process_completed(const AioResultList& completed, RawObjSet *written) | |
104 | { | |
105 | std::optional<int> error; | |
106 | for (auto& r : completed) { | |
107 | if (r.result >= 0) { | |
108 | written->insert(r.obj.get_ref().obj); | |
109 | } else if (!error) { // record first error code | |
110 | error = r.result; | |
111 | } | |
112 | } | |
113 | return error.value_or(0); | |
114 | } | |
115 | ||
20effc67 | 116 | void RadosWriter::add_write_hint(librados::ObjectWriteOperation& op) { |
1e59de90 TL |
117 | const RGWObjStateManifest *sm = obj_ctx.get_state(head_obj); |
118 | const bool compressed = sm->state.compressed; | |
20effc67 TL |
119 | uint32_t alloc_hint_flags = 0; |
120 | if (compressed) { | |
121 | alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE; | |
122 | } | |
123 | ||
124 | op.set_alloc_hint2(0, 0, alloc_hint_flags); | |
125 | } | |
126 | ||
11fdf7f2 TL |
127 | int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj) |
128 | { | |
1e59de90 | 129 | stripe_obj = store->svc.rados->obj(raw_obj); |
b3b6e05e | 130 | return stripe_obj.open(dpp); |
11fdf7f2 TL |
131 | } |
132 | ||
133 | int RadosWriter::process(bufferlist&& bl, uint64_t offset) | |
134 | { | |
135 | bufferlist data = std::move(bl); | |
136 | const uint64_t cost = data.length(); | |
137 | if (cost == 0) { // no empty writes, use aio directly for creates | |
138 | return 0; | |
139 | } | |
140 | librados::ObjectWriteOperation op; | |
20effc67 | 141 | add_write_hint(op); |
11fdf7f2 TL |
142 | if (offset == 0) { |
143 | op.write_full(data); | |
144 | } else { | |
145 | op.write(offset, data); | |
146 | } | |
147 | constexpr uint64_t id = 0; // unused | |
9f95a23c | 148 | auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); |
11fdf7f2 TL |
149 | return process_completed(c, &written); |
150 | } | |
151 | ||
152 | int RadosWriter::write_exclusive(const bufferlist& data) | |
153 | { | |
154 | const uint64_t cost = data.length(); | |
155 | ||
156 | librados::ObjectWriteOperation op; | |
157 | op.create(true); // exclusive create | |
20effc67 | 158 | add_write_hint(op); |
11fdf7f2 TL |
159 | op.write_full(data); |
160 | ||
161 | constexpr uint64_t id = 0; // unused | |
9f95a23c | 162 | auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id); |
11fdf7f2 TL |
163 | auto d = aio->drain(); |
164 | c.splice(c.end(), d); | |
165 | return process_completed(c, &written); | |
166 | } | |
167 | ||
168 | int RadosWriter::drain() | |
169 | { | |
170 | return process_completed(aio->drain(), &written); | |
171 | } | |
172 | ||
173 | RadosWriter::~RadosWriter() | |
174 | { | |
175 | // wait on any outstanding aio completions | |
176 | process_completed(aio->drain(), &written); | |
177 | ||
178 | bool need_to_remove_head = false; | |
179 | std::optional<rgw_raw_obj> raw_head; | |
1e59de90 | 180 | if (!head_obj.empty()) { |
11fdf7f2 | 181 | raw_head.emplace(); |
1e59de90 | 182 | store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head); |
11fdf7f2 TL |
183 | } |
184 | ||
185 | /** | |
186 | * We should delete the object in the "multipart" namespace to avoid race condition. | |
187 | * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart | |
188 | * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects | |
189 | * written by the second upload may be deleted by the first upload. | |
190 | * details is describled on #11749 | |
191 | * | |
192 | * The above comment still stands, but instead of searching for a specific object in the multipart | |
193 | * namespace, we just make sure that we remove the object that is marked as the head object after | |
194 | * we remove all the other raw objects. Note that we use different call to remove the head object, | |
195 | * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme. | |
196 | */ | |
197 | for (const auto& obj : written) { | |
198 | if (raw_head && obj == *raw_head) { | |
9f95a23c | 199 | ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl; |
11fdf7f2 TL |
200 | need_to_remove_head = true; |
201 | continue; | |
202 | } | |
203 | ||
b3b6e05e | 204 | int r = store->delete_raw_obj(dpp, obj); |
11fdf7f2 | 205 | if (r < 0 && r != -ENOENT) { |
f67539c2 | 206 | ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl; |
11fdf7f2 TL |
207 | } |
208 | } | |
209 | ||
210 | if (need_to_remove_head) { | |
f67539c2 | 211 | std::string version_id; |
9f95a23c | 212 | ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl; |
1e59de90 | 213 | int r = store->delete_obj(dpp, obj_ctx, bucket_info, head_obj, 0, 0); |
11fdf7f2 | 214 | if (r < 0 && r != -ENOENT) { |
9f95a23c | 215 | ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl; |
11fdf7f2 TL |
216 | } |
217 | } | |
218 | } | |
219 | ||
220 | ||
221 | // advance to the next stripe | |
222 | int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size) | |
223 | { | |
224 | // advance the manifest | |
225 | int r = manifest_gen.create_next(offset); | |
226 | if (r < 0) { | |
227 | return r; | |
228 | } | |
229 | ||
f67539c2 | 230 | rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); |
11fdf7f2 TL |
231 | |
232 | uint64_t chunk_size = 0; | |
1e59de90 | 233 | r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size, dpp); |
11fdf7f2 TL |
234 | if (r < 0) { |
235 | return r; | |
236 | } | |
237 | r = writer.set_stripe_obj(stripe_obj); | |
238 | if (r < 0) { | |
239 | return r; | |
240 | } | |
241 | ||
242 | chunk = ChunkProcessor(&writer, chunk_size); | |
243 | *pstripe_size = manifest_gen.cur_stripe_max_size(); | |
244 | return 0; | |
245 | } | |
246 | ||
247 | ||
9f95a23c | 248 | |
11fdf7f2 TL |
249 | int AtomicObjectProcessor::process_first_chunk(bufferlist&& data, |
250 | DataProcessor **processor) | |
251 | { | |
252 | first_chunk = std::move(data); | |
253 | *processor = &stripe; | |
254 | return 0; | |
255 | } | |
256 | ||
9f95a23c | 257 | int AtomicObjectProcessor::prepare(optional_yield y) |
11fdf7f2 TL |
258 | { |
259 | uint64_t max_head_chunk_size; | |
260 | uint64_t head_max_size; | |
261 | uint64_t chunk_size = 0; | |
262 | uint64_t alignment; | |
1e59de90 TL |
263 | rgw_pool head_pool; |
264 | ||
265 | if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) { | |
266 | return -EIO; | |
267 | } | |
11fdf7f2 | 268 | |
1e59de90 | 269 | int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, dpp, &alignment); |
11fdf7f2 TL |
270 | if (r < 0) { |
271 | return r; | |
272 | } | |
273 | ||
274 | bool same_pool = true; | |
1e59de90 TL |
275 | if (bucket_info.placement_rule != tail_placement_rule) { |
276 | rgw_pool tail_pool; | |
277 | if (!store->get_obj_data_pool(tail_placement_rule, head_obj, &tail_pool)) { | |
278 | return -EIO; | |
279 | } | |
280 | ||
281 | if (tail_pool != head_pool) { | |
11fdf7f2 | 282 | same_pool = false; |
1e59de90 TL |
283 | |
284 | r = store->get_max_chunk_size(tail_pool, &chunk_size, dpp); | |
11fdf7f2 TL |
285 | if (r < 0) { |
286 | return r; | |
287 | } | |
1e59de90 | 288 | |
11fdf7f2 TL |
289 | head_max_size = 0; |
290 | } | |
291 | } | |
292 | ||
293 | if (same_pool) { | |
39ae355f | 294 | RGWZonePlacementInfo placement_info; |
1e59de90 | 295 | if (!store->svc.zone->get_zone_params().get_placement(bucket_info.placement_rule.name, &placement_info) || placement_info.inline_data) { |
39ae355f TL |
296 | head_max_size = max_head_chunk_size; |
297 | } else { | |
298 | head_max_size = 0; | |
299 | } | |
11fdf7f2 TL |
300 | chunk_size = max_head_chunk_size; |
301 | } | |
302 | ||
303 | uint64_t stripe_size; | |
304 | const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size; | |
305 | ||
1e59de90 | 306 | store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size); |
11fdf7f2 TL |
307 | |
308 | manifest.set_trivial_rule(head_max_size, stripe_size); | |
309 | ||
310 | r = manifest_gen.create_begin(store->ctx(), &manifest, | |
1e59de90 | 311 | bucket_info.placement_rule, |
11fdf7f2 | 312 | &tail_placement_rule, |
1e59de90 | 313 | head_obj.bucket, head_obj); |
11fdf7f2 TL |
314 | if (r < 0) { |
315 | return r; | |
316 | } | |
317 | ||
f67539c2 | 318 | rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); |
11fdf7f2 TL |
319 | |
320 | r = writer.set_stripe_obj(stripe_obj); | |
321 | if (r < 0) { | |
322 | return r; | |
323 | } | |
324 | ||
325 | set_head_chunk_size(head_max_size); | |
326 | // initialize the processors | |
327 | chunk = ChunkProcessor(&writer, chunk_size); | |
328 | stripe = StripeProcessor(&chunk, this, head_max_size); | |
329 | return 0; | |
330 | } | |
331 | ||
332 | int AtomicObjectProcessor::complete(size_t accounted_size, | |
333 | const std::string& etag, | |
334 | ceph::real_time *mtime, | |
335 | ceph::real_time set_mtime, | |
20effc67 | 336 | rgw::sal::Attrs& attrs, |
11fdf7f2 TL |
337 | ceph::real_time delete_at, |
338 | const char *if_match, | |
339 | const char *if_nomatch, | |
340 | const std::string *user_data, | |
341 | rgw_zone_set *zones_trace, | |
9f95a23c | 342 | bool *pcanceled, optional_yield y) |
11fdf7f2 TL |
343 | { |
344 | int r = writer.drain(); | |
345 | if (r < 0) { | |
346 | return r; | |
347 | } | |
348 | const uint64_t actual_size = get_actual_size(); | |
349 | r = manifest_gen.create_next(actual_size); | |
350 | if (r < 0) { | |
351 | return r; | |
352 | } | |
353 | ||
1e59de90 | 354 | obj_ctx.set_atomic(head_obj); |
11fdf7f2 | 355 | |
1e59de90 | 356 | RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); |
11fdf7f2 TL |
357 | |
358 | /* some object types shouldn't be versioned, e.g., multipart parts */ | |
1e59de90 TL |
359 | op_target.set_versioning_disabled(!bucket_info.versioning_enabled()); |
360 | ||
361 | RGWRados::Object::Write obj_op(&op_target); | |
20effc67 TL |
362 | obj_op.meta.data = &first_chunk; |
363 | obj_op.meta.manifest = &manifest; | |
364 | obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */ | |
365 | obj_op.meta.if_match = if_match; | |
366 | obj_op.meta.if_nomatch = if_nomatch; | |
367 | obj_op.meta.mtime = mtime; | |
368 | obj_op.meta.set_mtime = set_mtime; | |
369 | obj_op.meta.owner = owner; | |
370 | obj_op.meta.flags = PUT_OBJ_CREATE; | |
371 | obj_op.meta.olh_epoch = olh_epoch; | |
372 | obj_op.meta.delete_at = delete_at; | |
373 | obj_op.meta.user_data = user_data; | |
374 | obj_op.meta.zones_trace = zones_trace; | |
375 | obj_op.meta.modify_tail = true; | |
376 | ||
1e59de90 TL |
377 | read_cloudtier_info_from_attrs(attrs, obj_op.meta.category, manifest); |
378 | ||
20effc67 | 379 | r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y); |
f67539c2 | 380 | if (r < 0) { |
39ae355f TL |
381 | if (r == -ETIMEDOUT) { |
382 | // The head object write may eventually succeed, clear the set of objects for deletion. if it | |
383 | // doesn't ever succeed, we'll orphan any tail objects as if we'd crashed before that write | |
384 | writer.clear_written(); | |
385 | } | |
f67539c2 TL |
386 | return r; |
387 | } | |
20effc67 | 388 | if (!obj_op.meta.canceled) { |
11fdf7f2 TL |
389 | // on success, clear the set of objects for deletion |
390 | writer.clear_written(); | |
391 | } | |
392 | if (pcanceled) { | |
20effc67 | 393 | *pcanceled = obj_op.meta.canceled; |
11fdf7f2 TL |
394 | } |
395 | return 0; | |
396 | } | |
397 | ||
398 | ||
399 | int MultipartObjectProcessor::process_first_chunk(bufferlist&& data, | |
400 | DataProcessor **processor) | |
401 | { | |
402 | // write the first chunk of the head object as part of an exclusive create, | |
403 | // then drain to wait for the result in case of EEXIST | |
404 | int r = writer.write_exclusive(data); | |
405 | if (r == -EEXIST) { | |
406 | // randomize the oid prefix and reprepare the head/manifest | |
cd265ab1 | 407 | std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32); |
11fdf7f2 | 408 | |
1e59de90 TL |
409 | mp.init(target_obj.key.name, upload_id, oid_rand); |
410 | manifest.set_prefix(target_obj.key.name + "." + oid_rand); | |
11fdf7f2 TL |
411 | |
412 | r = prepare_head(); | |
413 | if (r < 0) { | |
414 | return r; | |
415 | } | |
416 | // resubmit the write op on the new head object | |
417 | r = writer.write_exclusive(data); | |
418 | } | |
419 | if (r < 0) { | |
420 | return r; | |
421 | } | |
422 | *processor = &stripe; | |
423 | return 0; | |
424 | } | |
425 | ||
426 | int MultipartObjectProcessor::prepare_head() | |
427 | { | |
428 | const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size; | |
429 | uint64_t chunk_size; | |
430 | uint64_t stripe_size; | |
431 | uint64_t alignment; | |
432 | ||
1e59de90 | 433 | int r = store->get_max_chunk_size(tail_placement_rule, target_obj, &chunk_size, dpp, &alignment); |
11fdf7f2 | 434 | if (r < 0) { |
9f95a23c | 435 | ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl; |
11fdf7f2 TL |
436 | return r; |
437 | } | |
1e59de90 | 438 | store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size); |
11fdf7f2 TL |
439 | |
440 | manifest.set_multipart_part_rule(stripe_size, part_num); | |
441 | ||
442 | r = manifest_gen.create_begin(store->ctx(), &manifest, | |
1e59de90 | 443 | bucket_info.placement_rule, |
f67539c2 | 444 | &tail_placement_rule, |
1e59de90 | 445 | target_obj.bucket, target_obj); |
11fdf7f2 TL |
446 | if (r < 0) { |
447 | return r; | |
448 | } | |
449 | ||
f67539c2 | 450 | rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); |
1e59de90 TL |
451 | RGWSI_Tier_RADOS::raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj); |
452 | head_obj.index_hash_source = target_obj.key.name; | |
11fdf7f2 TL |
453 | |
454 | r = writer.set_stripe_obj(stripe_obj); | |
455 | if (r < 0) { | |
456 | return r; | |
457 | } | |
458 | stripe_size = manifest_gen.cur_stripe_max_size(); | |
9f95a23c | 459 | set_head_chunk_size(stripe_size); |
11fdf7f2 TL |
460 | |
461 | chunk = ChunkProcessor(&writer, chunk_size); | |
9f95a23c | 462 | stripe = StripeProcessor(&chunk, this, stripe_size); |
11fdf7f2 TL |
463 | return 0; |
464 | } | |
465 | ||
9f95a23c | 466 | int MultipartObjectProcessor::prepare(optional_yield y) |
11fdf7f2 | 467 | { |
1e59de90 | 468 | manifest.set_prefix(target_obj.key.name + "." + upload_id); |
11fdf7f2 TL |
469 | |
470 | return prepare_head(); | |
471 | } | |
472 | ||
473 | int MultipartObjectProcessor::complete(size_t accounted_size, | |
474 | const std::string& etag, | |
475 | ceph::real_time *mtime, | |
476 | ceph::real_time set_mtime, | |
477 | std::map<std::string, bufferlist>& attrs, | |
478 | ceph::real_time delete_at, | |
479 | const char *if_match, | |
480 | const char *if_nomatch, | |
481 | const std::string *user_data, | |
482 | rgw_zone_set *zones_trace, | |
9f95a23c | 483 | bool *pcanceled, optional_yield y) |
11fdf7f2 TL |
484 | { |
485 | int r = writer.drain(); | |
486 | if (r < 0) { | |
487 | return r; | |
488 | } | |
489 | const uint64_t actual_size = get_actual_size(); | |
490 | r = manifest_gen.create_next(actual_size); | |
491 | if (r < 0) { | |
492 | return r; | |
493 | } | |
494 | ||
1e59de90 | 495 | RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); |
20effc67 TL |
496 | op_target.set_versioning_disabled(true); |
497 | op_target.set_meta_placement_rule(&tail_placement_rule); | |
1e59de90 TL |
498 | |
499 | RGWRados::Object::Write obj_op(&op_target); | |
20effc67 TL |
500 | obj_op.meta.set_mtime = set_mtime; |
501 | obj_op.meta.mtime = mtime; | |
502 | obj_op.meta.owner = owner; | |
503 | obj_op.meta.delete_at = delete_at; | |
504 | obj_op.meta.zones_trace = zones_trace; | |
505 | obj_op.meta.modify_tail = true; | |
11fdf7f2 | 506 | |
20effc67 | 507 | r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y); |
11fdf7f2 TL |
508 | if (r < 0) |
509 | return r; | |
510 | ||
11fdf7f2 TL |
511 | RGWUploadPartInfo info; |
512 | string p = "part."; | |
513 | bool sorted_omap = is_v2_upload_id(upload_id); | |
514 | ||
515 | if (sorted_omap) { | |
516 | char buf[32]; | |
517 | snprintf(buf, sizeof(buf), "%08d", part_num); | |
518 | p.append(buf); | |
519 | } else { | |
520 | p.append(part_num_str); | |
521 | } | |
522 | info.num = part_num; | |
523 | info.etag = etag; | |
524 | info.size = actual_size; | |
525 | info.accounted_size = accounted_size; | |
526 | info.modified = real_clock::now(); | |
527 | info.manifest = manifest; | |
528 | ||
529 | bool compressed; | |
530 | r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info); | |
531 | if (r < 0) { | |
9f95a23c | 532 | ldpp_dout(dpp, 1) << "cannot get compression info" << dendl; |
11fdf7f2 TL |
533 | return r; |
534 | } | |
535 | ||
1e59de90 TL |
536 | rgw_obj meta_obj; |
537 | meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART); | |
538 | meta_obj.set_in_extra_data(true); | |
539 | ||
540 | rgw_raw_obj meta_raw_obj; | |
541 | store->obj_to_raw(bucket_info.placement_rule, meta_obj, &meta_raw_obj); | |
542 | ||
543 | rgw_rados_ref meta_obj_ref; | |
544 | r = store->get_raw_obj_ref(dpp, meta_raw_obj, &meta_obj_ref); | |
545 | if (r < 0) { | |
546 | ldpp_dout(dpp, -1) << "ERROR: failed to get obj ref of meta obj with ret=" << r << dendl; | |
547 | return r; | |
548 | } | |
549 | ||
550 | librados::ObjectWriteOperation op; | |
551 | cls_rgw_mp_upload_part_info_update(op, p, info); | |
552 | r = rgw_rados_operate(dpp, meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, &op, y); | |
553 | ldpp_dout(dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl; | |
11fdf7f2 | 554 | |
1e59de90 TL |
555 | if (r == -EOPNOTSUPP) { |
556 | // New CLS call to update part info is not yet supported. Fall back to the old handling. | |
557 | bufferlist bl; | |
558 | encode(info, bl); | |
11fdf7f2 | 559 | |
1e59de90 TL |
560 | map<string, bufferlist> m; |
561 | m[p] = bl; | |
562 | ||
563 | op = librados::ObjectWriteOperation{}; | |
564 | op.assert_exists(); // detect races with abort | |
565 | op.omap_set(m); | |
566 | r = rgw_rados_operate(dpp, meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, &op, y); | |
567 | } | |
11fdf7f2 | 568 | if (r < 0) { |
9f95a23c | 569 | return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r; |
11fdf7f2 TL |
570 | } |
571 | ||
20effc67 | 572 | if (!obj_op.meta.canceled) { |
11fdf7f2 TL |
573 | // on success, clear the set of objects for deletion |
574 | writer.clear_written(); | |
575 | } | |
576 | if (pcanceled) { | |
20effc67 | 577 | *pcanceled = obj_op.meta.canceled; |
11fdf7f2 TL |
578 | } |
579 | return 0; | |
580 | } | |
581 | ||
20effc67 | 582 | int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor) |
11fdf7f2 TL |
583 | { |
584 | int r = writer.write_exclusive(data); | |
585 | if (r < 0) { | |
586 | return r; | |
587 | } | |
588 | *processor = &stripe; | |
589 | return 0; | |
590 | } | |
591 | ||
9f95a23c | 592 | int AppendObjectProcessor::prepare(optional_yield y) |
11fdf7f2 TL |
593 | { |
594 | RGWObjState *astate; | |
1e59de90 TL |
595 | int r = store->get_obj_state(dpp, &obj_ctx, bucket_info, head_obj, |
596 | &astate, &cur_manifest, y); | |
11fdf7f2 TL |
597 | if (r < 0) { |
598 | return r; | |
599 | } | |
600 | cur_size = astate->size; | |
601 | *cur_accounted_size = astate->accounted_size; | |
602 | if (!astate->exists) { | |
603 | if (position != 0) { | |
9f95a23c | 604 | ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl; |
11fdf7f2 TL |
605 | return -ERR_POSITION_NOT_EQUAL_TO_LENGTH; |
606 | } else { | |
607 | cur_part_num = 1; | |
608 | //set the prefix | |
609 | char buf[33]; | |
610 | gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1); | |
1e59de90 | 611 | string oid_prefix = head_obj.key.name; |
11fdf7f2 TL |
612 | oid_prefix.append("."); |
613 | oid_prefix.append(buf); | |
614 | oid_prefix.append("_"); | |
615 | manifest.set_prefix(oid_prefix); | |
616 | } | |
617 | } else { | |
618 | // check whether the object appendable | |
619 | map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM); | |
620 | if (iter == astate->attrset.end()) { | |
9f95a23c | 621 | ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl; |
11fdf7f2 TL |
622 | return -ERR_OBJECT_NOT_APPENDABLE; |
623 | } | |
624 | if (position != *cur_accounted_size) { | |
9f95a23c | 625 | ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl; |
11fdf7f2 TL |
626 | return -ERR_POSITION_NOT_EQUAL_TO_LENGTH; |
627 | } | |
628 | try { | |
f67539c2 | 629 | using ceph::decode; |
11fdf7f2 TL |
630 | decode(cur_part_num, iter->second); |
631 | } catch (buffer::error& err) { | |
9f95a23c | 632 | ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl; |
11fdf7f2 TL |
633 | return -EIO; |
634 | } | |
635 | cur_part_num++; | |
636 | //get the current obj etag | |
637 | iter = astate->attrset.find(RGW_ATTR_ETAG); | |
638 | if (iter != astate->attrset.end()) { | |
639 | string s = rgw_string_unquote(iter->second.c_str()); | |
640 | size_t pos = s.find("-"); | |
641 | cur_etag = s.substr(0, pos); | |
642 | } | |
9f95a23c TL |
643 | |
644 | iter = astate->attrset.find(RGW_ATTR_STORAGE_CLASS); | |
645 | if (iter != astate->attrset.end()) { | |
646 | tail_placement_rule.storage_class = iter->second.to_str(); | |
1e59de90 TL |
647 | } else { |
648 | tail_placement_rule.storage_class = RGW_STORAGE_CLASS_STANDARD; | |
9f95a23c | 649 | } |
11fdf7f2 | 650 | manifest.set_prefix(cur_manifest->get_prefix()); |
f6b5b4d7 | 651 | astate->keep_tail = true; |
11fdf7f2 TL |
652 | } |
653 | manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num); | |
654 | ||
1e59de90 | 655 | r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, &tail_placement_rule, head_obj.bucket, head_obj); |
11fdf7f2 TL |
656 | if (r < 0) { |
657 | return r; | |
658 | } | |
f67539c2 | 659 | rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store); |
11fdf7f2 TL |
660 | |
661 | uint64_t chunk_size = 0; | |
1e59de90 | 662 | r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size, dpp); |
11fdf7f2 TL |
663 | if (r < 0) { |
664 | return r; | |
665 | } | |
666 | r = writer.set_stripe_obj(std::move(stripe_obj)); | |
667 | if (r < 0) { | |
668 | return r; | |
669 | } | |
670 | ||
671 | uint64_t stripe_size = manifest_gen.cur_stripe_max_size(); | |
672 | ||
673 | uint64_t max_head_size = std::min(chunk_size, stripe_size); | |
674 | set_head_chunk_size(max_head_size); | |
675 | ||
676 | // initialize the processors | |
677 | chunk = ChunkProcessor(&writer, chunk_size); | |
678 | stripe = StripeProcessor(&chunk, this, stripe_size); | |
679 | ||
680 | return 0; | |
681 | } | |
682 | ||
683 | int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime, | |
20effc67 | 684 | ceph::real_time set_mtime, rgw::sal::Attrs& attrs, |
11fdf7f2 | 685 | ceph::real_time delete_at, const char *if_match, const char *if_nomatch, |
9f95a23c TL |
686 | const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled, |
687 | optional_yield y) | |
11fdf7f2 TL |
688 | { |
689 | int r = writer.drain(); | |
690 | if (r < 0) | |
691 | return r; | |
692 | const uint64_t actual_size = get_actual_size(); | |
693 | r = manifest_gen.create_next(actual_size); | |
694 | if (r < 0) { | |
695 | return r; | |
696 | } | |
1e59de90 TL |
697 | obj_ctx.set_atomic(head_obj); |
698 | RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj); | |
11fdf7f2 | 699 | //For Append obj, disable versioning |
20effc67 | 700 | op_target.set_versioning_disabled(true); |
1e59de90 | 701 | RGWRados::Object::Write obj_op(&op_target); |
11fdf7f2 | 702 | if (cur_manifest) { |
1e59de90 | 703 | cur_manifest->append(dpp, manifest, store->svc.zone->get_zonegroup(), store->svc.zone->get_zone_params()); |
20effc67 | 704 | obj_op.meta.manifest = cur_manifest; |
11fdf7f2 | 705 | } else { |
20effc67 TL |
706 | obj_op.meta.manifest = &manifest; |
707 | } | |
708 | obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */ | |
709 | obj_op.meta.mtime = mtime; | |
710 | obj_op.meta.set_mtime = set_mtime; | |
711 | obj_op.meta.owner = owner; | |
712 | obj_op.meta.flags = PUT_OBJ_CREATE; | |
713 | obj_op.meta.delete_at = delete_at; | |
714 | obj_op.meta.user_data = user_data; | |
715 | obj_op.meta.zones_trace = zones_trace; | |
716 | obj_op.meta.modify_tail = true; | |
717 | obj_op.meta.appendable = true; | |
11fdf7f2 TL |
718 | //Add the append part number |
719 | bufferlist cur_part_num_bl; | |
f67539c2 | 720 | using ceph::encode; |
11fdf7f2 TL |
721 | encode(cur_part_num, cur_part_num_bl); |
722 | attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl; | |
723 | //calculate the etag | |
724 | if (!cur_etag.empty()) { | |
725 | MD5 hash; | |
20effc67 TL |
726 | // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes |
727 | hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW); | |
11fdf7f2 TL |
728 | char petag[CEPH_CRYPTO_MD5_DIGESTSIZE]; |
729 | char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE]; | |
730 | char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; | |
731 | hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE); | |
732 | hash.Update((const unsigned char *)petag, sizeof(petag)); | |
733 | hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE); | |
734 | hash.Update((const unsigned char *)petag, sizeof(petag)); | |
735 | hash.Final((unsigned char *)final_etag); | |
736 | buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str); | |
737 | snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, | |
738 | "-%lld", (long long)cur_part_num); | |
739 | bufferlist etag_bl; | |
740 | etag_bl.append(final_etag_str, strlen(final_etag_str) + 1); | |
741 | attrs[RGW_ATTR_ETAG] = etag_bl; | |
742 | } | |
20effc67 TL |
743 | r = obj_op.write_meta(dpp, actual_size + cur_size, |
744 | accounted_size + *cur_accounted_size, | |
745 | attrs, y); | |
11fdf7f2 TL |
746 | if (r < 0) { |
747 | return r; | |
748 | } | |
20effc67 | 749 | if (!obj_op.meta.canceled) { |
11fdf7f2 TL |
750 | // on success, clear the set of objects for deletion |
751 | writer.clear_written(); | |
752 | } | |
753 | if (pcanceled) { | |
20effc67 | 754 | *pcanceled = obj_op.meta.canceled; |
11fdf7f2 TL |
755 | } |
756 | *cur_accounted_size += accounted_size; | |
757 | ||
758 | return 0; | |
759 | } | |
760 | ||
761 | } // namespace rgw::putobj |