1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "services/svc_sys_obj.h"
21 #define dout_subsys ceph_subsys_rgw
23 namespace rgw::putobj
{
// Buffer the leading bytes of the object into head_data until head_chunk_size
// bytes have been collected, pass that first chunk to process_first_chunk(),
// and forward everything after it to the downstream processor. A zero-length
// bufferlist is treated as a flush request.
25 int HeadObjectProcessor::process(bufferlist
&& data
, uint64_t logical_offset
)
// an empty buffer means "flush"
27 const bool flush
= (data
.length() == 0);
29 // capture the first chunk for special handling
30 if (data_offset
< head_chunk_size
|| data_offset
== 0) {
32 // flush partial chunk
33 return process_first_chunk(std::move(head_data
), &processor
);
// number of bytes still missing from the head chunk
36 auto remaining
= head_chunk_size
- data_offset
;
// never take more than this call actually supplied
37 auto count
= std::min
<uint64_t>(data
.length(), remaining
);
// move the first `count` bytes of data onto the end of head_data
38 data
.splice(0, count
, &head_data
);
41 if (data_offset
== head_chunk_size
) {
42 // process the first complete chunk
43 ceph_assert(head_data
.length() == head_chunk_size
);
44 int r
= process_first_chunk(std::move(head_data
), &processor
);
49 if (data
.length() == 0) { // avoid flushing stripe processor
53 ceph_assert(processor
); // process_first_chunk() must initialize
55 // send everything else through the processor
// the downstream write starts at the offset reached before this buffer
56 auto write_offset
= data_offset
;
57 data_offset
+= data
.length();
58 return processor
->process(std::move(data
), write_offset
);
// Walk a list of aio results: in one branch the result's raw object is
// inserted into *written, otherwise the first error code seen is remembered.
// Returns that first error, or 0 when no error was recorded.
// NOTE(review): the condition selecting the insert branch is not shown next
// to the insert; presumably it is "result succeeded" - confirm.
62 static int process_completed(const AioResultList
& completed
, RawObjSet
*written
)
// stays empty until the first failing result is seen
64 std::optional
<int> error
;
65 for (auto& r
: completed
) {
67 written
->insert(r
.obj
.get_ref().obj
);
68 } else if (!error
) { // record first error code
72 return error
.value_or(0);
// Point stripe_obj at the rados object for raw_obj and open it.
// Returns whatever stripe_obj.open() returns.
75 int RadosWriter::set_stripe_obj(const rgw_raw_obj
& raw_obj
)
77 stripe_obj
= store
->svc
.rados
->obj(raw_obj
);
78 return stripe_obj
.open();
// Take ownership of bl and submit it through aio as a write to stripe_obj at
// the given offset. The buffer length is used as the aio cost. Returns the
// result of process_completed() over the completions handed back by submit().
81 int RadosWriter::process(bufferlist
&& bl
, uint64_t offset
)
83 bufferlist data
= std::move(bl
);
84 const uint64_t cost
= data
.length();
// zero-length input is special-cased rather than written
85 if (cost
== 0) { // no empty writes, use aio directly for creates
88 librados::ObjectWriteOperation op
;
92 op
.write(offset
, data
);
94 constexpr uint64_t id
= 0; // unused
95 auto c
= aio
->submit(stripe_obj
, &op
, cost
, id
);
// record the objects written so far and surface the first error, if any
96 return process_completed(c
, &written
);
// Submit an exclusive-create operation on stripe_obj through aio, then drain
// all outstanding aio so the outcome of the create is known before returning.
// Returns process_completed() over the combined completion lists.
99 int RadosWriter::write_exclusive(const bufferlist
& data
)
// the aio cost is the size of the payload
101 const uint64_t cost
= data
.length();
103 librados::ObjectWriteOperation op
;
104 op
.create(true); // exclusive create
107 constexpr uint64_t id
= 0; // unused
108 auto c
= aio
->submit(stripe_obj
, &op
, cost
, id
);
// wait for everything in flight, including the op just submitted
109 auto d
= aio
->drain();
// append the drained completions to those returned by submit()
110 c
.splice(c
.end(), d
);
111 return process_completed(c
, &written
);
// Wait for all outstanding aio via aio->drain() and return
// process_completed() over the results.
114 int RadosWriter::drain()
116 return process_completed(aio
->drain(), &written
);
// Clean up after a write that was not committed: drain outstanding aio, then
// remove every object still recorded in `written`. Objects are removed with
// store->delete_raw_obj(); the raw object that corresponds to head_obj is only
// flagged inside the loop and removed afterwards with store->delete_obj().
// Removal failures other than -ENOENT are logged as leaked objects.
119 RadosWriter::~RadosWriter()
121 // wait on any outstanding aio completions
122 process_completed(aio
->drain(), &written
);
124 bool need_to_remove_head
= false;
125 std::optional
<rgw_raw_obj
> raw_head
;
126 if (!head_obj
.empty()) {
// NOTE(review): raw_head is dereferenced below, so it must already hold a
// value at this point - confirm it is emplaced before this call.
128 store
->obj_to_raw(bucket_info
.placement_rule
, head_obj
, &*raw_head
);
132 * We should delete the object in the "multipart" namespace to avoid race condition.
133 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
134 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
135 * written by the second upload may be deleted by the first upload.
136 * details is describled on #11749
138 * The above comment still stands, but instead of searching for a specific object in the multipart
139 * namespace, we just make sure that we remove the object that is marked as the head object after
140 * we remove all the other raw objects. Note that we use different call to remove the head object,
141 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
143 for (const auto& obj
: written
) {
144 if (raw_head
&& obj
== *raw_head
) {
145 ldout(store
->ctx(), 5) << "NOTE: we should not process the head object (" << obj
<< ") here" << dendl
;
// remember to remove the head once the loop is done
146 need_to_remove_head
= true;
150 int r
= store
->delete_raw_obj(obj
);
// an object that is already gone is not treated as a failure
151 if (r
< 0 && r
!= -ENOENT
) {
152 ldout(store
->ctx(), 5) << "WARNING: failed to remove obj (" << obj
<< "), leaked" << dendl
;
156 if (need_to_remove_head
) {
157 ldout(store
->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head
<< ")" << dendl
;
158 int r
= store
->delete_obj(obj_ctx
, bucket_info
, head_obj
, 0, 0);
159 if (r
< 0 && r
!= -ENOENT
) {
160 ldout(store
->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head
<< "), leaked" << dendl
;
166 // advance to the next stripe
// Moves manifest_gen on to `offset`, looks up the max chunk size for the pool
// of the new stripe object, points the writer at that object, rebuilds the
// ChunkProcessor with the new chunk size, and reports the new stripe's
// maximum size through *pstripe_size.
167 int ManifestObjectProcessor::next(uint64_t offset
, uint64_t *pstripe_size
)
169 // advance the manifest
170 int r
= manifest_gen
.create_next(offset
);
// raw object that backs the stripe the manifest now points at
175 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
177 uint64_t chunk_size
= 0;
178 r
= store
->get_max_chunk_size(stripe_obj
.pool
, &chunk_size
);
182 r
= writer
.set_stripe_obj(stripe_obj
);
187 chunk
= ChunkProcessor(&writer
, chunk_size
);
188 *pstripe_size
= manifest_gen
.cur_stripe_max_size();
// Keep the first chunk in first_chunk instead of writing it now (complete()
// later hands it to the head write through obj_op.meta.data), and direct all
// following data to the stripe processor.
193 int AtomicObjectProcessor::process_first_chunk(bufferlist
&& data
,
194 DataProcessor
**processor
)
196 first_chunk
= std::move(data
);
197 *processor
= &stripe
;
// Set up the manifest, writer and processor chain for an atomic object write:
//  - resolve the head object's data pool and its max chunk size / alignment;
//  - when the tail placement rule differs from the bucket's, resolve the tail
//    pool and, if it is a different pool, query its chunk size separately;
//  - derive an aligned stripe size from rgw_obj_stripe_size;
//  - start the manifest, point the writer at the first stripe object, set the
//    head chunk size and build the chunk/stripe processors.
201 int AtomicObjectProcessor::prepare()
203 uint64_t max_head_chunk_size
;
204 uint64_t head_max_size
;
205 uint64_t chunk_size
= 0;
// where the head object's data lives
209 if (!store
->get_obj_data_pool(bucket_info
.placement_rule
, head_obj
, &head_pool
)) {
213 int r
= store
->get_max_chunk_size(head_pool
, &max_head_chunk_size
, &alignment
);
218 bool same_pool
= true;
// a distinct tail placement rule may put tail data in another pool
220 if (bucket_info
.placement_rule
!= tail_placement_rule
) {
222 if (!store
->get_obj_data_pool(tail_placement_rule
, head_obj
, &tail_pool
)) {
226 if (tail_pool
!= head_pool
) {
229 r
= store
->get_max_chunk_size(tail_pool
, &chunk_size
);
// NOTE(review): presumably the same-pool case, where head and tail share
// the head pool's chunk size - the guarding condition is not shown here.
239 head_max_size
= max_head_chunk_size
;
240 chunk_size
= max_head_chunk_size
;
243 uint64_t stripe_size
;
244 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
// fit the configured stripe size to the pool alignment
246 store
->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
248 manifest
.set_trivial_rule(head_max_size
, stripe_size
);
250 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
251 bucket_info
.placement_rule
,
252 &tail_placement_rule
,
253 head_obj
.bucket
, head_obj
);
258 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
260 r
= writer
.set_stripe_obj(stripe_obj
);
265 set_head_chunk_size(head_max_size
);
266 // initialize the processors
267 chunk
= ChunkProcessor(&writer
, chunk_size
);
268 stripe
= StripeProcessor(&chunk
, this, head_max_size
);
// Finish an atomic object write: drain the writer, close the manifest at the
// actual size written, then write the head object's metadata (carrying the
// buffered first chunk and the manifest) through RGWRados::Object::Write.
// If the write was not canceled, the writer's set of written objects is
// cleared so they are not removed on destruction. The canceled flag is
// reported through *pcanceled.
272 int AtomicObjectProcessor::complete(size_t accounted_size
,
273 const std::string
& etag
,
274 ceph::real_time
*mtime
,
275 ceph::real_time set_mtime
,
276 std::map
<std::string
, bufferlist
>& attrs
,
277 ceph::real_time delete_at
,
278 const char *if_match
,
279 const char *if_nomatch
,
280 const std::string
*user_data
,
281 rgw_zone_set
*zones_trace
,
// wait for all tail writes before touching the head
284 int r
= writer
.drain();
288 const uint64_t actual_size
= get_actual_size();
// tell the manifest generator where the data ends
289 r
= manifest_gen
.create_next(actual_size
);
294 obj_ctx
.set_atomic(head_obj
);
296 RGWRados::Object
op_target(store
, bucket_info
, obj_ctx
, head_obj
);
298 /* some object types shouldn't be versioned, e.g., multipart parts */
299 op_target
.set_versioning_disabled(!bucket_info
.versioning_enabled());
301 RGWRados::Object::Write
obj_op(&op_target
);
// the buffered first chunk is written together with the head metadata
303 obj_op
.meta
.data
= &first_chunk
;
304 obj_op
.meta
.manifest
= &manifest
;
305 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
306 obj_op
.meta
.if_match
= if_match
;
307 obj_op
.meta
.if_nomatch
= if_nomatch
;
308 obj_op
.meta
.mtime
= mtime
;
309 obj_op
.meta
.set_mtime
= set_mtime
;
310 obj_op
.meta
.owner
= owner
;
311 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
312 obj_op
.meta
.olh_epoch
= olh_epoch
;
313 obj_op
.meta
.delete_at
= delete_at
;
314 obj_op
.meta
.user_data
= user_data
;
315 obj_op
.meta
.zones_trace
= zones_trace
;
316 obj_op
.meta
.modify_tail
= true;
318 r
= obj_op
.write_meta(actual_size
, accounted_size
, attrs
);
322 if (!obj_op
.meta
.canceled
) {
323 // on success, clear the set of objects for deletion
324 writer
.clear_written();
327 *pcanceled
= obj_op
.meta
.canceled
;
// Write the first chunk of a multipart part with an exclusive create. The
// visible retry path generates a random 32-character oid component,
// re-initialises mp and the manifest prefix with it, and resubmits the
// exclusive write on the new head object. Afterwards further data is routed
// to the stripe processor.
333 int MultipartObjectProcessor::process_first_chunk(bufferlist
&& data
,
334 DataProcessor
**processor
)
336 // write the first chunk of the head object as part of an exclusive create,
337 // then drain to wait for the result in case of EEXIST
338 int r
= writer
.write_exclusive(data
);
340 // randomize the oid prefix and reprepare the head/manifest
// NOTE(review): if gen_rand_alphanumeric() NUL-terminates the buffer it is
// given, the last byte of oid_rand stays '\0' and becomes part of the
// prefix built below - confirm against its contract.
341 std::string
oid_rand(32, 0);
342 gen_rand_alphanumeric(store
->ctx(), oid_rand
.data(), oid_rand
.size());
344 mp
.init(target_obj
.key
.name
, upload_id
, oid_rand
);
345 manifest
.set_prefix(target_obj
.key
.name
+ "." + oid_rand
);
351 // resubmit the write op on the new head object
352 r
= writer
.write_exclusive(data
);
357 *processor
= &stripe
;
// Prepare the manifest, head object and processor chain for one multipart
// part: query chunk size and alignment for the tail placement rule, derive an
// aligned stripe size from rgw_obj_stripe_size, start the manifest with a
// multipart-part rule for part_num, make the first stripe object the head
// object, and build the chunk/stripe processors.
361 int MultipartObjectProcessor::prepare_head()
363 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
365 uint64_t stripe_size
;
368 int r
= store
->get_max_chunk_size(tail_placement_rule
, target_obj
, &chunk_size
, &alignment
);
370 ldout(store
->ctx(), 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule
.to_str() << " obj=" << target_obj
<< " returned r=" << r
<< dendl
;
// fit the configured stripe size to the pool alignment
373 store
->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
375 manifest
.set_multipart_part_rule(stripe_size
, part_num
);
377 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
378 bucket_info
.placement_rule
,
379 &tail_placement_rule
,
380 target_obj
.bucket
, target_obj
);
// the first stripe object of the part doubles as the part's head object
385 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
386 rgw_raw_obj_to_obj(head_obj
.bucket
, stripe_obj
, &head_obj
);
// hash on the target object's name rather than the part's own name
387 head_obj
.index_hash_source
= target_obj
.key
.name
;
389 r
= writer
.set_stripe_obj(stripe_obj
);
393 stripe_size
= manifest_gen
.cur_stripe_max_size();
// the head chunk is capped by both the chunk size and the stripe size
395 uint64_t max_head_size
= std::min(chunk_size
, stripe_size
);
396 set_head_chunk_size(max_head_size
);
398 chunk
= ChunkProcessor(&writer
, chunk_size
);
// NOTE(review): AppendObjectProcessor::prepare() passes stripe_size as the
// third StripeProcessor argument in the same situation, while this passes
// max_head_size - confirm which is intended when stripe_size > chunk_size.
399 stripe
= StripeProcessor(&chunk
, this, max_head_size
);
// Set the manifest prefix to "<object name>.<upload_id>" and delegate the
// rest of the setup to prepare_head(), whose result is returned.
403 int MultipartObjectProcessor::prepare()
405 manifest
.set_prefix(target_obj
.key
.name
+ "." + upload_id
);
407 return prepare_head();
// Finish one multipart part: drain the writer, close the manifest at the
// actual size, write the part's head metadata with versioning disabled, then
// fill in an RGWUploadPartInfo (sizes, modification time, manifest,
// compression info) and locate the upload's meta object in the multipart
// namespace. If the head write was not canceled, the writer's set of written
// objects is cleared; the canceled flag is reported through *pcanceled.
410 int MultipartObjectProcessor::complete(size_t accounted_size
,
411 const std::string
& etag
,
412 ceph::real_time
*mtime
,
413 ceph::real_time set_mtime
,
414 std::map
<std::string
, bufferlist
>& attrs
,
415 ceph::real_time delete_at
,
416 const char *if_match
,
417 const char *if_nomatch
,
418 const std::string
*user_data
,
419 rgw_zone_set
*zones_trace
,
// wait for all outstanding writes of this part
422 int r
= writer
.drain();
426 const uint64_t actual_size
= get_actual_size();
427 r
= manifest_gen
.create_next(actual_size
);
432 RGWRados::Object
op_target(store
, bucket_info
, obj_ctx
, head_obj
);
// parts are never versioned
433 op_target
.set_versioning_disabled(true);
434 RGWRados::Object::Write
obj_op(&op_target
);
436 obj_op
.meta
.set_mtime
= set_mtime
;
437 obj_op
.meta
.mtime
= mtime
;
438 obj_op
.meta
.owner
= owner
;
439 obj_op
.meta
.delete_at
= delete_at
;
440 obj_op
.meta
.zones_trace
= zones_trace
;
441 obj_op
.meta
.modify_tail
= true;
443 r
= obj_op
.write_meta(actual_size
, accounted_size
, attrs
);
// bookkeeping record describing this part
448 RGWUploadPartInfo info
;
450 bool sorted_omap
= is_v2_upload_id(upload_id
);
// zero-padded, 8-digit rendering of the part number
454 snprintf(buf
, sizeof(buf
), "%08d", part_num
);
457 p
.append(part_num_str
);
461 info
.size
= actual_size
;
462 info
.accounted_size
= accounted_size
;
463 info
.modified
= real_clock::now();
464 info
.manifest
= manifest
;
467 r
= rgw_compression_info_from_attrset(attrs
, compressed
, info
.cs_info
);
469 ldout(store
->ctx(), 1) << "cannot get compression info" << dendl
;
// the upload's meta object lives in the multipart namespace
476 meta_obj
.init_ns(bucket_info
.bucket
, mp
.get_meta(), RGW_OBJ_NS_MULTIPART
);
477 meta_obj
.set_in_extra_data(true);
479 rgw_raw_obj raw_meta_obj
;
481 store
->obj_to_raw(bucket_info
.placement_rule
, meta_obj
, &raw_meta_obj
);
// this is a sysobj context, a different object from the obj_ctx handed to
// op_target above
483 auto obj_ctx
= store
->svc
.sysobj
->init_obj_ctx();
484 auto sysobj
= obj_ctx
.get_obj(raw_meta_obj
);
487 .set_must_exist(true)
493 if (!obj_op
.meta
.canceled
) {
494 // on success, clear the set of objects for deletion
495 writer
.clear_written();
498 *pcanceled
= obj_op
.meta
.canceled
;
// Write the first chunk with an exclusive create through the writer, then
// direct all following data to the stripe processor.
503 int AppendObjectProcessor::process_first_chunk(bufferlist
&&data
, rgw::putobj::DataProcessor
**processor
)
505 int r
= writer
.write_exclusive(data
);
509 *processor
= &stripe
;
// Prepare an append to head_obj. The current object state is loaded and its
// size / accounted size recorded. For a non-existent object a fresh random
// manifest prefix "<name>.<random>_" is generated; for an existing one the
// object must carry RGW_ATTR_APPEND_PART_NUM, `position` must equal the
// current accounted size, and the current part number, etag (up to the first
// "-") and manifest prefix are picked up. The manifest, writer and processor
// chain are then set up as for a multipart part.
513 int AppendObjectProcessor::prepare()
516 int r
= store
->get_obj_state(&obj_ctx
, bucket_info
, head_obj
, &astate
);
520 cur_size
= astate
->size
;
521 *cur_accounted_size
= astate
->accounted_size
;
// first append: there is no existing object to extend
522 if (!astate
->exists
) {
524 ldout(store
->ctx(), 5) << "ERROR: Append position should be zero" << dendl
;
525 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
// one byte of buf is left out of the random fill
530 gen_rand_alphanumeric(store
->ctx(), buf
, sizeof(buf
) - 1);
531 string oid_prefix
= head_obj
.key
.name
;
532 oid_prefix
.append(".");
533 oid_prefix
.append(buf
);
534 oid_prefix
.append("_");
535 manifest
.set_prefix(oid_prefix
);
538 // check whether the object appendable
539 map
<string
, bufferlist
>::iterator iter
= astate
->attrset
.find(RGW_ATTR_APPEND_PART_NUM
);
540 if (iter
== astate
->attrset
.end()) {
541 ldout(store
->ctx(), 5) << "ERROR: The object is not appendable" << dendl
;
542 return -ERR_OBJECT_NOT_APPENDABLE
;
// the caller must append exactly at the current end of the object
544 if (position
!= *cur_accounted_size
) {
545 ldout(store
->ctx(), 5) << "ERROR: Append position should be equal to the obj size" << dendl
;
546 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
549 decode(cur_part_num
, iter
->second
);
550 } catch (buffer::error
& err
) {
551 ldout(store
->ctx(), 5) << "ERROR: failed to decode part num" << dendl
;
555 //get the current obj etag
556 iter
= astate
->attrset
.find(RGW_ATTR_ETAG
);
557 if (iter
!= astate
->attrset
.end()) {
558 string s
= rgw_string_unquote(iter
->second
.c_str());
// keep only what precedes the first "-" of the stored etag
559 size_t pos
= s
.find("-");
560 cur_etag
= s
.substr(0, pos
);
// reuse the existing object's manifest and its prefix
562 cur_manifest
= &astate
->manifest
;
563 manifest
.set_prefix(cur_manifest
->get_prefix());
565 manifest
.set_multipart_part_rule(store
->ctx()->_conf
->rgw_obj_stripe_size
, cur_part_num
);
567 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
, bucket_info
.placement_rule
, &tail_placement_rule
, head_obj
.bucket
, head_obj
);
571 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
573 uint64_t chunk_size
= 0;
574 r
= store
->get_max_chunk_size(stripe_obj
.pool
, &chunk_size
);
578 r
= writer
.set_stripe_obj(std::move(stripe_obj
));
583 uint64_t stripe_size
= manifest_gen
.cur_stripe_max_size();
// the head chunk is capped by both the chunk size and the stripe size
585 uint64_t max_head_size
= std::min(chunk_size
, stripe_size
);
586 set_head_chunk_size(max_head_size
);
588 // initialize the processors
589 chunk
= ChunkProcessor(&writer
, chunk_size
);
590 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
// Finish an append: drain the writer, close the manifest at the actual size,
// and write the head metadata with versioning disabled and the appendable
// flag set. The manifest written is either the existing manifest with the new
// one appended, or the new manifest alone. The current part number is stored
// in RGW_ATTR_APPEND_PART_NUM; when a previous etag exists, a combined etag is
// computed from the previous and new etags and suffixed with
// "-<cur_part_num>". Sizes passed to write_meta() are the totals after the
// append. On a non-canceled write the writer's written set is cleared, and
// *cur_accounted_size is advanced by accounted_size.
595 int AppendObjectProcessor::complete(size_t accounted_size
, const string
&etag
, ceph::real_time
*mtime
,
596 ceph::real_time set_mtime
, map
<string
, bufferlist
> &attrs
,
597 ceph::real_time delete_at
, const char *if_match
, const char *if_nomatch
,
598 const string
*user_data
, rgw_zone_set
*zones_trace
, bool *pcanceled
)
// wait for all appended data to be written
600 int r
= writer
.drain();
603 const uint64_t actual_size
= get_actual_size();
604 r
= manifest_gen
.create_next(actual_size
);
608 obj_ctx
.set_atomic(head_obj
);
609 RGWRados::Object
op_target(store
, bucket_info
, obj_ctx
, head_obj
);
610 //For Append obj, disable versioning
611 op_target
.set_versioning_disabled(true);
612 RGWRados::Object::Write
obj_op(&op_target
);
// existing object: extend its manifest with the newly written part
614 cur_manifest
->append(manifest
, store
->svc
.zone
);
615 obj_op
.meta
.manifest
= cur_manifest
;
// NOTE(review): presumably the no-existing-manifest case - the condition
// choosing between the two assignments is not shown here.
617 obj_op
.meta
.manifest
= &manifest
;
619 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
620 obj_op
.meta
.mtime
= mtime
;
621 obj_op
.meta
.set_mtime
= set_mtime
;
622 obj_op
.meta
.owner
= owner
;
623 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
624 obj_op
.meta
.delete_at
= delete_at
;
625 obj_op
.meta
.user_data
= user_data
;
626 obj_op
.meta
.zones_trace
= zones_trace
;
627 obj_op
.meta
.modify_tail
= true;
628 obj_op
.meta
.appendable
= true;
629 //Add the append part number
630 bufferlist cur_part_num_bl
;
631 encode(cur_part_num
, cur_part_num_bl
);
632 attrs
[RGW_ATTR_APPEND_PART_NUM
] = cur_part_num_bl
;
// only an object that already had an etag gets a combined one
634 if (!cur_etag
.empty()) {
636 char petag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
637 char final_etag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
// room for the hex digest plus the "-<part number>" suffix
638 char final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 16];
// feed the binary form of the previous etag into the hash...
639 hex_to_buf(cur_etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
640 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
// ...followed by the binary form of this append's etag
641 hex_to_buf(etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
642 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
643 hash
.Final((unsigned char *)final_etag
);
644 buf_to_hex((unsigned char *)final_etag
, sizeof(final_etag
), final_etag_str
);
// the suffix is written directly after the hex digest
645 snprintf(&final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2], sizeof(final_etag_str
) - CEPH_CRYPTO_MD5_DIGESTSIZE
* 2,
646 "-%lld", (long long)cur_part_num
);
// the terminating NUL is stored as part of the attribute value
648 etag_bl
.append(final_etag_str
, strlen(final_etag_str
) + 1);
649 attrs
[RGW_ATTR_ETAG
] = etag_bl
;
// sizes are totals: what existed before plus what this append added
651 r
= obj_op
.write_meta(actual_size
+ cur_size
, accounted_size
+ *cur_accounted_size
, attrs
);
655 if (!obj_op
.meta
.canceled
) {
656 // on success, clear the set of objects for deletion
657 writer
.clear_written();
660 *pcanceled
= obj_op
.meta
.canceled
;
// report the new accounted size back through the caller's pointer
662 *cur_accounted_size
+= accounted_size
;
667 } // namespace rgw::putobj