// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "rgw_compression.h"
20 #include "services/svc_sys_obj.h"
22 #define dout_subsys ceph_subsys_rgw
24 namespace rgw::putobj
{
26 int HeadObjectProcessor::process(bufferlist
&& data
, uint64_t logical_offset
)
28 const bool flush
= (data
.length() == 0);
30 // capture the first chunk for special handling
31 if (data_offset
< head_chunk_size
|| data_offset
== 0) {
33 // flush partial chunk
34 return process_first_chunk(std::move(head_data
), &processor
);
37 auto remaining
= head_chunk_size
- data_offset
;
38 auto count
= std::min
<uint64_t>(data
.length(), remaining
);
39 data
.splice(0, count
, &head_data
);
42 if (data_offset
== head_chunk_size
) {
43 // process the first complete chunk
44 ceph_assert(head_data
.length() == head_chunk_size
);
45 int r
= process_first_chunk(std::move(head_data
), &processor
);
50 if (data
.length() == 0) { // avoid flushing stripe processor
54 ceph_assert(processor
); // process_first_chunk() must initialize
56 // send everything else through the processor
57 auto write_offset
= data_offset
;
58 data_offset
+= data
.length();
59 return processor
->process(std::move(data
), write_offset
);
63 static int process_completed(const AioResultList
& completed
, RawObjSet
*written
)
65 std::optional
<int> error
;
66 for (auto& r
: completed
) {
68 written
->insert(r
.obj
.get_ref().obj
);
69 } else if (!error
) { // record first error code
73 return error
.value_or(0);
76 int RadosWriter::set_stripe_obj(const rgw_raw_obj
& raw_obj
)
78 stripe_obj
= store
->svc()->rados
->obj(raw_obj
);
79 return stripe_obj
.open();
82 int RadosWriter::process(bufferlist
&& bl
, uint64_t offset
)
84 bufferlist data
= std::move(bl
);
85 const uint64_t cost
= data
.length();
86 if (cost
== 0) { // no empty writes, use aio directly for creates
89 librados::ObjectWriteOperation op
;
93 op
.write(offset
, data
);
95 constexpr uint64_t id
= 0; // unused
96 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
97 return process_completed(c
, &written
);
100 int RadosWriter::write_exclusive(const bufferlist
& data
)
102 const uint64_t cost
= data
.length();
104 librados::ObjectWriteOperation op
;
105 op
.create(true); // exclusive create
108 constexpr uint64_t id
= 0; // unused
109 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
110 auto d
= aio
->drain();
111 c
.splice(c
.end(), d
);
112 return process_completed(c
, &written
);
115 int RadosWriter::drain()
117 return process_completed(aio
->drain(), &written
);
120 RadosWriter::~RadosWriter()
122 // wait on any outstanding aio completions
123 process_completed(aio
->drain(), &written
);
125 bool need_to_remove_head
= false;
126 std::optional
<rgw_raw_obj
> raw_head
;
127 if (!head_obj
.empty()) {
129 store
->getRados()->obj_to_raw(bucket_info
.placement_rule
, head_obj
, &*raw_head
);
133 * We should delete the object in the "multipart" namespace to avoid race condition.
134 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
135 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
136 * written by the second upload may be deleted by the first upload.
137 * details is describled on #11749
139 * The above comment still stands, but instead of searching for a specific object in the multipart
140 * namespace, we just make sure that we remove the object that is marked as the head object after
141 * we remove all the other raw objects. Note that we use different call to remove the head object,
142 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
144 for (const auto& obj
: written
) {
145 if (raw_head
&& obj
== *raw_head
) {
146 ldpp_dout(dpp
, 5) << "NOTE: we should not process the head object (" << obj
<< ") here" << dendl
;
147 need_to_remove_head
= true;
151 int r
= store
->getRados()->delete_raw_obj(obj
);
152 if (r
< 0 && r
!= -ENOENT
) {
153 ldpp_dout(dpp
, 5) << "WARNING: failed to remove obj (" << obj
<< "), leaked" << dendl
;
157 if (need_to_remove_head
) {
158 ldpp_dout(dpp
, 5) << "NOTE: we are going to process the head obj (" << *raw_head
<< ")" << dendl
;
159 int r
= store
->getRados()->delete_obj(obj_ctx
, bucket_info
, head_obj
, 0, 0);
160 if (r
< 0 && r
!= -ENOENT
) {
161 ldpp_dout(dpp
, 0) << "WARNING: failed to remove obj (" << *raw_head
<< "), leaked" << dendl
;
167 // advance to the next stripe
168 int ManifestObjectProcessor::next(uint64_t offset
, uint64_t *pstripe_size
)
170 // advance the manifest
171 int r
= manifest_gen
.create_next(offset
);
176 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
->getRados());
178 uint64_t chunk_size
= 0;
179 r
= store
->getRados()->get_max_chunk_size(stripe_obj
.pool
, &chunk_size
);
183 r
= writer
.set_stripe_obj(stripe_obj
);
188 chunk
= ChunkProcessor(&writer
, chunk_size
);
189 *pstripe_size
= manifest_gen
.cur_stripe_max_size();
195 int AtomicObjectProcessor::process_first_chunk(bufferlist
&& data
,
196 DataProcessor
**processor
)
198 first_chunk
= std::move(data
);
199 *processor
= &stripe
;
203 int AtomicObjectProcessor::prepare(optional_yield y
)
205 uint64_t max_head_chunk_size
;
206 uint64_t head_max_size
;
207 uint64_t chunk_size
= 0;
211 if (!store
->getRados()->get_obj_data_pool(bucket_info
.placement_rule
, head_obj
, &head_pool
)) {
215 int r
= store
->getRados()->get_max_chunk_size(head_pool
, &max_head_chunk_size
, &alignment
);
220 bool same_pool
= true;
222 if (bucket_info
.placement_rule
!= tail_placement_rule
) {
224 if (!store
->getRados()->get_obj_data_pool(tail_placement_rule
, head_obj
, &tail_pool
)) {
228 if (tail_pool
!= head_pool
) {
231 r
= store
->getRados()->get_max_chunk_size(tail_pool
, &chunk_size
);
241 head_max_size
= max_head_chunk_size
;
242 chunk_size
= max_head_chunk_size
;
245 uint64_t stripe_size
;
246 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
248 store
->getRados()->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
250 manifest
.set_trivial_rule(head_max_size
, stripe_size
);
252 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
253 bucket_info
.placement_rule
,
254 &tail_placement_rule
,
255 head_obj
.bucket
, head_obj
);
260 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
->getRados());
262 r
= writer
.set_stripe_obj(stripe_obj
);
267 set_head_chunk_size(head_max_size
);
268 // initialize the processors
269 chunk
= ChunkProcessor(&writer
, chunk_size
);
270 stripe
= StripeProcessor(&chunk
, this, head_max_size
);
274 int AtomicObjectProcessor::complete(size_t accounted_size
,
275 const std::string
& etag
,
276 ceph::real_time
*mtime
,
277 ceph::real_time set_mtime
,
278 std::map
<std::string
, bufferlist
>& attrs
,
279 ceph::real_time delete_at
,
280 const char *if_match
,
281 const char *if_nomatch
,
282 const std::string
*user_data
,
283 rgw_zone_set
*zones_trace
,
284 bool *pcanceled
, optional_yield y
)
286 int r
= writer
.drain();
290 const uint64_t actual_size
= get_actual_size();
291 r
= manifest_gen
.create_next(actual_size
);
296 obj_ctx
.set_atomic(head_obj
);
298 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, head_obj
);
300 /* some object types shouldn't be versioned, e.g., multipart parts */
301 op_target
.set_versioning_disabled(!bucket_info
.versioning_enabled());
303 RGWRados::Object::Write
obj_op(&op_target
);
305 obj_op
.meta
.data
= &first_chunk
;
306 obj_op
.meta
.manifest
= &manifest
;
307 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
308 obj_op
.meta
.if_match
= if_match
;
309 obj_op
.meta
.if_nomatch
= if_nomatch
;
310 obj_op
.meta
.mtime
= mtime
;
311 obj_op
.meta
.set_mtime
= set_mtime
;
312 obj_op
.meta
.owner
= owner
;
313 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
314 obj_op
.meta
.olh_epoch
= olh_epoch
;
315 obj_op
.meta
.delete_at
= delete_at
;
316 obj_op
.meta
.user_data
= user_data
;
317 obj_op
.meta
.zones_trace
= zones_trace
;
318 obj_op
.meta
.modify_tail
= true;
320 r
= obj_op
.write_meta(actual_size
, accounted_size
, attrs
, y
);
324 if (!obj_op
.meta
.canceled
) {
325 // on success, clear the set of objects for deletion
326 writer
.clear_written();
329 *pcanceled
= obj_op
.meta
.canceled
;
335 int MultipartObjectProcessor::process_first_chunk(bufferlist
&& data
,
336 DataProcessor
**processor
)
338 // write the first chunk of the head object as part of an exclusive create,
339 // then drain to wait for the result in case of EEXIST
340 int r
= writer
.write_exclusive(data
);
342 // randomize the oid prefix and reprepare the head/manifest
343 std::string
oid_rand(32, 0);
344 gen_rand_alphanumeric(store
->ctx(), oid_rand
.data(), oid_rand
.size());
346 mp
.init(target_obj
.key
.name
, upload_id
, oid_rand
);
347 manifest
.set_prefix(target_obj
.key
.name
+ "." + oid_rand
);
353 // resubmit the write op on the new head object
354 r
= writer
.write_exclusive(data
);
359 *processor
= &stripe
;
363 int MultipartObjectProcessor::prepare_head()
365 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
367 uint64_t stripe_size
;
370 int r
= store
->getRados()->get_max_chunk_size(tail_placement_rule
, target_obj
, &chunk_size
, &alignment
);
372 ldpp_dout(dpp
, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule
.to_str() << " obj=" << target_obj
<< " returned r=" << r
<< dendl
;
375 store
->getRados()->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
377 manifest
.set_multipart_part_rule(stripe_size
, part_num
);
379 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
380 bucket_info
.placement_rule
,
381 &tail_placement_rule
,
382 target_obj
.bucket
, target_obj
);
387 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
->getRados());
388 RGWSI_Tier_RADOS::raw_obj_to_obj(head_obj
.bucket
, stripe_obj
, &head_obj
);
389 head_obj
.index_hash_source
= target_obj
.key
.name
;
391 r
= writer
.set_stripe_obj(stripe_obj
);
395 stripe_size
= manifest_gen
.cur_stripe_max_size();
396 set_head_chunk_size(stripe_size
);
398 chunk
= ChunkProcessor(&writer
, chunk_size
);
399 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
403 int MultipartObjectProcessor::prepare(optional_yield y
)
405 manifest
.set_prefix(target_obj
.key
.name
+ "." + upload_id
);
407 return prepare_head();
410 int MultipartObjectProcessor::complete(size_t accounted_size
,
411 const std::string
& etag
,
412 ceph::real_time
*mtime
,
413 ceph::real_time set_mtime
,
414 std::map
<std::string
, bufferlist
>& attrs
,
415 ceph::real_time delete_at
,
416 const char *if_match
,
417 const char *if_nomatch
,
418 const std::string
*user_data
,
419 rgw_zone_set
*zones_trace
,
420 bool *pcanceled
, optional_yield y
)
422 int r
= writer
.drain();
426 const uint64_t actual_size
= get_actual_size();
427 r
= manifest_gen
.create_next(actual_size
);
432 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, head_obj
);
433 op_target
.set_versioning_disabled(true);
434 RGWRados::Object::Write
obj_op(&op_target
);
436 obj_op
.meta
.set_mtime
= set_mtime
;
437 obj_op
.meta
.mtime
= mtime
;
438 obj_op
.meta
.owner
= owner
;
439 obj_op
.meta
.delete_at
= delete_at
;
440 obj_op
.meta
.zones_trace
= zones_trace
;
441 obj_op
.meta
.modify_tail
= true;
443 r
= obj_op
.write_meta(actual_size
, accounted_size
, attrs
, y
);
448 RGWUploadPartInfo info
;
450 bool sorted_omap
= is_v2_upload_id(upload_id
);
454 snprintf(buf
, sizeof(buf
), "%08d", part_num
);
457 p
.append(part_num_str
);
461 info
.size
= actual_size
;
462 info
.accounted_size
= accounted_size
;
463 info
.modified
= real_clock::now();
464 info
.manifest
= manifest
;
467 r
= rgw_compression_info_from_attrset(attrs
, compressed
, info
.cs_info
);
469 ldpp_dout(dpp
, 1) << "cannot get compression info" << dendl
;
476 meta_obj
.init_ns(bucket_info
.bucket
, mp
.get_meta(), RGW_OBJ_NS_MULTIPART
);
477 meta_obj
.set_in_extra_data(true);
479 rgw_raw_obj raw_meta_obj
;
481 store
->getRados()->obj_to_raw(bucket_info
.placement_rule
, meta_obj
, &raw_meta_obj
);
483 auto obj_ctx
= store
->svc()->sysobj
->init_obj_ctx();
484 auto sysobj
= obj_ctx
.get_obj(raw_meta_obj
);
487 .set_must_exist(true)
488 .set(p
, bl
, null_yield
);
490 return r
== -ENOENT
? -ERR_NO_SUCH_UPLOAD
: r
;
493 if (!obj_op
.meta
.canceled
) {
494 // on success, clear the set of objects for deletion
495 writer
.clear_written();
498 *pcanceled
= obj_op
.meta
.canceled
;
503 int AppendObjectProcessor::process_first_chunk(bufferlist
&&data
, rgw::putobj::DataProcessor
**processor
)
505 int r
= writer
.write_exclusive(data
);
509 *processor
= &stripe
;
513 int AppendObjectProcessor::prepare(optional_yield y
)
516 int r
= store
->getRados()->get_obj_state(&obj_ctx
, bucket_info
, head_obj
, &astate
, y
);
520 cur_size
= astate
->size
;
521 *cur_accounted_size
= astate
->accounted_size
;
522 if (!astate
->exists
) {
524 ldpp_dout(dpp
, 5) << "ERROR: Append position should be zero" << dendl
;
525 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
530 gen_rand_alphanumeric(store
->ctx(), buf
, sizeof(buf
) - 1);
531 string oid_prefix
= head_obj
.key
.name
;
532 oid_prefix
.append(".");
533 oid_prefix
.append(buf
);
534 oid_prefix
.append("_");
535 manifest
.set_prefix(oid_prefix
);
538 // check whether the object appendable
539 map
<string
, bufferlist
>::iterator iter
= astate
->attrset
.find(RGW_ATTR_APPEND_PART_NUM
);
540 if (iter
== astate
->attrset
.end()) {
541 ldpp_dout(dpp
, 5) << "ERROR: The object is not appendable" << dendl
;
542 return -ERR_OBJECT_NOT_APPENDABLE
;
544 if (position
!= *cur_accounted_size
) {
545 ldpp_dout(dpp
, 5) << "ERROR: Append position should be equal to the obj size" << dendl
;
546 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
549 decode(cur_part_num
, iter
->second
);
550 } catch (buffer::error
& err
) {
551 ldpp_dout(dpp
, 5) << "ERROR: failed to decode part num" << dendl
;
555 //get the current obj etag
556 iter
= astate
->attrset
.find(RGW_ATTR_ETAG
);
557 if (iter
!= astate
->attrset
.end()) {
558 string s
= rgw_string_unquote(iter
->second
.c_str());
559 size_t pos
= s
.find("-");
560 cur_etag
= s
.substr(0, pos
);
563 iter
= astate
->attrset
.find(RGW_ATTR_STORAGE_CLASS
);
564 if (iter
!= astate
->attrset
.end()) {
565 tail_placement_rule
.storage_class
= iter
->second
.to_str();
567 cur_manifest
= &(*astate
->manifest
);
568 manifest
.set_prefix(cur_manifest
->get_prefix());
569 astate
->keep_tail
= true;
571 manifest
.set_multipart_part_rule(store
->ctx()->_conf
->rgw_obj_stripe_size
, cur_part_num
);
573 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
, bucket_info
.placement_rule
, &tail_placement_rule
, head_obj
.bucket
, head_obj
);
577 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
->getRados());
579 uint64_t chunk_size
= 0;
580 r
= store
->getRados()->get_max_chunk_size(stripe_obj
.pool
, &chunk_size
);
584 r
= writer
.set_stripe_obj(std::move(stripe_obj
));
589 uint64_t stripe_size
= manifest_gen
.cur_stripe_max_size();
591 uint64_t max_head_size
= std::min(chunk_size
, stripe_size
);
592 set_head_chunk_size(max_head_size
);
594 // initialize the processors
595 chunk
= ChunkProcessor(&writer
, chunk_size
);
596 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
601 int AppendObjectProcessor::complete(size_t accounted_size
, const string
&etag
, ceph::real_time
*mtime
,
602 ceph::real_time set_mtime
, map
<string
, bufferlist
> &attrs
,
603 ceph::real_time delete_at
, const char *if_match
, const char *if_nomatch
,
604 const string
*user_data
, rgw_zone_set
*zones_trace
, bool *pcanceled
,
607 int r
= writer
.drain();
610 const uint64_t actual_size
= get_actual_size();
611 r
= manifest_gen
.create_next(actual_size
);
615 obj_ctx
.set_atomic(head_obj
);
616 RGWRados::Object
op_target(store
->getRados(), bucket_info
, obj_ctx
, head_obj
);
617 //For Append obj, disable versioning
618 op_target
.set_versioning_disabled(true);
619 RGWRados::Object::Write
obj_op(&op_target
);
621 cur_manifest
->append(manifest
, store
->svc()->zone
);
622 obj_op
.meta
.manifest
= cur_manifest
;
624 obj_op
.meta
.manifest
= &manifest
;
626 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
627 obj_op
.meta
.mtime
= mtime
;
628 obj_op
.meta
.set_mtime
= set_mtime
;
629 obj_op
.meta
.owner
= owner
;
630 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
631 obj_op
.meta
.delete_at
= delete_at
;
632 obj_op
.meta
.user_data
= user_data
;
633 obj_op
.meta
.zones_trace
= zones_trace
;
634 obj_op
.meta
.modify_tail
= true;
635 obj_op
.meta
.appendable
= true;
636 //Add the append part number
637 bufferlist cur_part_num_bl
;
638 encode(cur_part_num
, cur_part_num_bl
);
639 attrs
[RGW_ATTR_APPEND_PART_NUM
] = cur_part_num_bl
;
641 if (!cur_etag
.empty()) {
643 char petag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
644 char final_etag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
645 char final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 16];
646 hex_to_buf(cur_etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
647 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
648 hex_to_buf(etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
649 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
650 hash
.Final((unsigned char *)final_etag
);
651 buf_to_hex((unsigned char *)final_etag
, sizeof(final_etag
), final_etag_str
);
652 snprintf(&final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2], sizeof(final_etag_str
) - CEPH_CRYPTO_MD5_DIGESTSIZE
* 2,
653 "-%lld", (long long)cur_part_num
);
655 etag_bl
.append(final_etag_str
, strlen(final_etag_str
) + 1);
656 attrs
[RGW_ATTR_ETAG
] = etag_bl
;
658 r
= obj_op
.write_meta(actual_size
+ cur_size
, accounted_size
+ *cur_accounted_size
, attrs
, y
);
662 if (!obj_op
.meta
.canceled
) {
663 // on success, clear the set of objects for deletion
664 writer
.clear_written();
667 *pcanceled
= obj_op
.meta
.canceled
;
669 *cur_accounted_size
+= accounted_size
;
674 } // namespace rgw::putobj