1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "rgw_compression.h"
20 #include "services/svc_sys_obj.h"
21 #include "rgw_sal_rados.h"
23 #define dout_subsys ceph_subsys_rgw
27 namespace rgw::putobj
{
// Routes incoming data for an upload: bytes are first spliced into head_data
// (at most the amount still missing from head_chunk_size), the head buffer is
// passed to process_first_chunk(), and any remaining data is forwarded to
// `processor` at the running write offset.
// A zero-length `data` is treated as a flush request.
// `logical_offset` is not referenced in the lines visible here.
// NOTE(review): this listing appears to be missing lines (braces, some
// statements and error checks) -- confirm details against the complete file.
29 int HeadObjectProcessor::process(bufferlist
&& data
, uint64_t logical_offset
)
// an empty buffer is the flush signal
31 const bool flush
= (data
.length() == 0);
33 // capture the first chunk for special handling
34 if (data_offset
< head_chunk_size
|| data_offset
== 0) {
36 // flush partial chunk
37 return process_first_chunk(std::move(head_data
), &processor
);
// take no more than what is still needed to fill the head chunk
40 auto remaining
= head_chunk_size
- data_offset
;
41 auto count
= std::min
<uint64_t>(data
.length(), remaining
);
// move the first `count` bytes of data into head_data
42 data
.splice(0, count
, &head_data
);
45 if (data_offset
== head_chunk_size
) {
46 // process the first complete chunk
47 ceph_assert(head_data
.length() == head_chunk_size
);
48 int r
= process_first_chunk(std::move(head_data
), &processor
);
53 if (data
.length() == 0) { // avoid flushing stripe processor
57 ceph_assert(processor
); // process_first_chunk() must initialize
59 // send everything else through the processor
// capture the starting offset of this write before advancing data_offset
60 auto write_offset
= data_offset
;
61 data_offset
+= data
.length();
62 return processor
->process(std::move(data
), write_offset
);
// Folds a list of completed aio results into a single return code.
// In the visible branches, r.obj.get_ref().obj is inserted into *written,
// and otherwise the first error code is recorded. Returns the recorded
// error, or 0 when none was recorded.
// NOTE(review): the condition that selects between the two branches and the
// assignment to `error` are not visible in this listing -- confirm.
66 static int process_completed(const AioResultList
& completed
, RawObjSet
*written
)
// stays empty until an error is recorded
68 std::optional
<int> error
;
69 for (auto& r
: completed
) {
71 written
->insert(r
.obj
.get_ref().obj
);
72 } else if (!error
) { // record first error code
76 return error
.value_or(0);
// Attaches an allocation hint to `op` by calling set_alloc_hint2(0, 0, flags).
// The flags start at 0 and may gain librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE.
// NOTE(review): the guard around the |= is not visible in this listing;
// presumably it depends on `compressed`, which is read from the head
// object's state -- confirm.
79 void RadosWriter::add_write_hint(librados::ObjectWriteOperation
& op
) {
// look up the cached state of the head object
80 const rgw_obj obj
= head_obj
->get_obj();
81 const RGWObjState
*obj_state
= obj_ctx
.get_state(obj
);
82 const bool compressed
= obj_state
->compressed
;
83 uint32_t alloc_hint_flags
= 0;
85 alloc_hint_flags
|= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE
;
88 op
.set_alloc_hint2(0, 0, alloc_hint_flags
);
// Points stripe_obj at `raw_obj` (obtained through store->svc()->rados) and
// opens it. Returns the result of stripe_obj.open(dpp).
91 int RadosWriter::set_stripe_obj(const rgw_raw_obj
& raw_obj
)
93 stripe_obj
= store
->svc()->rados
->obj(raw_obj
);
94 return stripe_obj
.open(dpp
);
// Submits a write of `bl` at `offset` to the current stripe object through
// `aio`, using the buffer length as the aio cost, and returns the result of
// process_completed() over the completions handed back by aio->get().
// A zero-length buffer is special-cased (see the existing comment below).
// NOTE(review): the body of the zero-length branch and any lines between the
// op declaration and op.write() are not visible in this listing -- confirm.
97 int RadosWriter::process(bufferlist
&& bl
, uint64_t offset
)
// take ownership of the caller's buffer
99 bufferlist data
= std::move(bl
);
// the aio cost is the number of bytes being written
100 const uint64_t cost
= data
.length();
101 if (cost
== 0) { // no empty writes, use aio directly for creates
104 librados::ObjectWriteOperation op
;
109 op
.write(offset
, data
);
111 constexpr uint64_t id
= 0; // unused
112 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
// completed writes are recorded in `written`; the first error is returned
113 return process_completed(c
, &written
);
// Builds a write op that starts with an exclusive create (create(true)),
// submits it through `aio`, then drains all outstanding aio and returns
// process_completed() over both sets of completions.
// NOTE(review): the lines that attach `data` to `op` are not visible in this
// listing -- confirm against the complete file.
116 int RadosWriter::write_exclusive(const bufferlist
& data
)
118 const uint64_t cost
= data
.length();
120 librados::ObjectWriteOperation op
;
121 op
.create(true); // exclusive create
125 constexpr uint64_t id
= 0; // unused
126 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
// drain the remaining completions and append them to the results from get()
127 auto d
= aio
->drain();
128 c
.splice(c
.end(), d
);
129 return process_completed(c
, &written
);
// Drains `aio` and returns the process_completed() result for the
// completions it hands back (recording them in `written`).
132 int RadosWriter::drain()
134 return process_completed(aio
->drain(), &written
);
// Destructor: drains outstanding aio, then deletes every raw object still
// recorded in `written` via store->delete_raw_obj(). An entry equal to the
// head object's raw object is flagged instead, and the head is removed
// afterwards through head_obj->delete_object().
// Delete failures other than -ENOENT are logged at level 0 as leaks.
// NOTE(review): this listing appears to be missing lines (braces, the opener
// of the block comment below, possibly a `continue`) -- confirm details.
137 RadosWriter::~RadosWriter()
139 // wait on any outstanding aio completions
140 process_completed(aio
->drain(), &written
);
142 bool need_to_remove_head
= false;
143 std::optional
<rgw_raw_obj
> raw_head
;
144 if (!rgw::sal::Object::empty(head_obj
.get())) {
// NOTE(review): raw_head is dereferenced just below; the line that gives it
// a value is not visible in this listing -- confirm it is engaged first.
// NOTE(review): the dynamic_cast result is used without a visible null check.
146 rgw::sal::RadosObject
* obj
= dynamic_cast<rgw::sal::RadosObject
*>(head_obj
.get());
147 obj
->get_raw_obj(&*raw_head
);
151 * We should delete the object in the "multipart" namespace to avoid race condition.
152 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
153 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
154 * written by the second upload may be deleted by the first upload.
155 * details is describled on #11749
157 * The above comment still stands, but instead of searching for a specific object in the multipart
158 * namespace, we just make sure that we remove the object that is marked as the head object after
159 * we remove all the other raw objects. Note that we use different call to remove the head object,
160 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
162 for (const auto& obj
: written
) {
163 if (raw_head
&& obj
== *raw_head
) {
164 ldpp_dout(dpp
, 5) << "NOTE: we should not process the head object (" << obj
<< ") here" << dendl
;
165 need_to_remove_head
= true;
169 int r
= store
->delete_raw_obj(dpp
, obj
);
// a missing object (-ENOENT) is not reported as a leak
170 if (r
< 0 && r
!= -ENOENT
) {
171 ldpp_dout(dpp
, 0) << "WARNING: failed to remove obj (" << obj
<< "), leaked" << dendl
;
175 if (need_to_remove_head
) {
// NOTE(review): version_id is not referenced in the visible lines.
176 std::string version_id
;
177 ldpp_dout(dpp
, 5) << "NOTE: we are going to process the head obj (" << *raw_head
<< ")" << dendl
;
178 int r
= head_obj
->delete_object(dpp
, &obj_ctx
, null_yield
);
179 if (r
< 0 && r
!= -ENOENT
) {
180 ldpp_dout(dpp
, 0) << "WARNING: failed to remove obj (" << *raw_head
<< "), leaked" << dendl
;
// Moves on to the stripe that starts at `offset`: advances manifest_gen,
// points the writer at the resulting stripe object, rebuilds `chunk` with
// that object's raw chunk size, and stores the new stripe's maximum size in
// *pstripe_size.
// NOTE(review): the checks on `r` and the return statement are not visible
// in this listing -- confirm against the complete file.
186 // advance to the next stripe
187 int ManifestObjectProcessor::next(uint64_t offset
, uint64_t *pstripe_size
)
189 // advance the manifest
190 int r
= manifest_gen
.create_next(offset
);
195 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
// chunk size is queried per stripe object
197 uint64_t chunk_size
= 0;
198 r
= store
->get_raw_chunk_size(dpp
, stripe_obj
, &chunk_size
);
202 r
= writer
.set_stripe_obj(stripe_obj
);
207 chunk
= ChunkProcessor(&writer
, chunk_size
);
208 *pstripe_size
= manifest_gen
.cur_stripe_max_size();
// Stores `data` in first_chunk (which complete() later passes as
// obj_op.meta.data) and hands back &stripe through *processor as the
// processor for subsequent data.
// NOTE(review): the return statement is not visible in this listing.
214 int AtomicObjectProcessor::process_first_chunk(bufferlist
&& data
,
215 DataProcessor
**processor
)
217 first_chunk
= std::move(data
);
218 *processor
= &stripe
;
// Sizes the head and tail of an atomic upload and initializes the manifest,
// the writer and the chunk/stripe processors.
// Visible steps: query the max chunk size and alignment for the bucket's
// placement rule; when the tail placement rule differs and does not match,
// query the tail rule's chunk size as well; derive an aligned stripe size
// from the rgw_obj_stripe_size config value; set a trivial manifest rule;
// begin the manifest; open the first stripe object in the writer.
// `y` is not referenced in the lines visible here.
// NOTE(review): this listing is missing lines (error checks on `r`, the else
// branches, the tail of the create_begin() call, the return) -- confirm.
222 int AtomicObjectProcessor::prepare(optional_yield y
)
224 uint64_t max_head_chunk_size
;
225 uint64_t head_max_size
;
226 uint64_t chunk_size
= 0;
// chunk size and alignment for the head, from the bucket's placement rule
229 int r
= dynamic_cast<rgw::sal::RadosObject
*>(head_obj
.get())->get_max_chunk_size(
230 dpp
, head_obj
->get_bucket()->get_placement_rule(),
231 &max_head_chunk_size
, &alignment
);
236 bool same_pool
= true;
237 if (head_obj
->get_bucket()->get_placement_rule() != tail_placement_rule
) {
238 if (!head_obj
->placement_rules_match(head_obj
->get_bucket()->get_placement_rule(), tail_placement_rule
)) {
// the tail uses a different placement: ask for its own chunk size
240 r
= dynamic_cast<rgw::sal::RadosObject
*>(head_obj
.get())->get_max_chunk_size(dpp
, tail_placement_rule
, &chunk_size
);
// NOTE(review): presumably the same-pool case (the guard is not visible)
249 head_max_size
= max_head_chunk_size
;
250 chunk_size
= max_head_chunk_size
;
253 uint64_t stripe_size
;
254 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
// fit the configured stripe size to the pool alignment
256 dynamic_cast<rgw::sal::RadosObject
*>(head_obj
.get())->get_max_aligned_size(
257 default_stripe_size
, alignment
, &stripe_size
);
259 manifest
.set_trivial_rule(head_max_size
, stripe_size
);
261 rgw_obj obj
= head_obj
->get_obj();
263 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
264 head_obj
->get_bucket()->get_placement_rule(),
265 &tail_placement_rule
,
271 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
273 r
= writer
.set_stripe_obj(stripe_obj
);
278 set_head_chunk_size(head_max_size
);
279 // initialize the processors
280 chunk
= ChunkProcessor(&writer
, chunk_size
);
281 stripe
= StripeProcessor(&chunk
, this, head_max_size
);
// Finishes an atomic upload: drains the writer, advances the manifest to the
// actual size, and writes the head object's metadata (with first_chunk as its
// data) through RGWRados::Object::Write::write_meta().
// When the write was not canceled, the writer's set of written objects is
// cleared so they are not deleted later; *pcanceled receives
// obj_op.meta.canceled.
// `etag` is not referenced in the lines visible here.
// NOTE(review): the error checks on `r`, the guard around the *pcanceled
// store and the return statement are not visible in this listing -- confirm.
285 int AtomicObjectProcessor::complete(size_t accounted_size
,
286 const std::string
& etag
,
287 ceph::real_time
*mtime
,
288 ceph::real_time set_mtime
,
289 rgw::sal::Attrs
& attrs
,
290 ceph::real_time delete_at
,
291 const char *if_match
,
292 const char *if_nomatch
,
293 const std::string
*user_data
,
294 rgw_zone_set
*zones_trace
,
295 bool *pcanceled
, optional_yield y
)
// all outstanding writes are drained before the head is written
297 int r
= writer
.drain();
301 const uint64_t actual_size
= get_actual_size();
302 r
= manifest_gen
.create_next(actual_size
);
307 head_obj
->set_atomic(&obj_ctx
);
309 RGWRados::Object
op_target(store
->getRados(),
310 head_obj
->get_bucket()->get_info(),
311 obj_ctx
, head_obj
->get_obj());
312 RGWRados::Object::Write
obj_op(&op_target
);
314 /* some object types shouldn't be versioned, e.g., multipart parts */
315 op_target
.set_versioning_disabled(!head_obj
->get_bucket()->versioning_enabled());
// the buffered first chunk is written together with the head metadata
316 obj_op
.meta
.data
= &first_chunk
;
317 obj_op
.meta
.manifest
= &manifest
;
318 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
319 obj_op
.meta
.if_match
= if_match
;
320 obj_op
.meta
.if_nomatch
= if_nomatch
;
321 obj_op
.meta
.mtime
= mtime
;
322 obj_op
.meta
.set_mtime
= set_mtime
;
323 obj_op
.meta
.owner
= owner
;
324 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
325 obj_op
.meta
.olh_epoch
= olh_epoch
;
326 obj_op
.meta
.delete_at
= delete_at
;
327 obj_op
.meta
.user_data
= user_data
;
328 obj_op
.meta
.zones_trace
= zones_trace
;
329 obj_op
.meta
.modify_tail
= true;
331 r
= obj_op
.write_meta(dpp
, actual_size
, accounted_size
, attrs
, y
);
335 if (!obj_op
.meta
.canceled
) {
336 // on success, clear the set of objects for deletion
337 writer
.clear_written();
340 *pcanceled
= obj_op
.meta
.canceled
;
// Writes the first chunk of a multipart part with an exclusive create. The
// visible retry path generates a random alphanumeric suffix, re-initializes
// `mp` and the manifest prefix with it, and resubmits the write on the new
// head object. *processor is set to &stripe.
// NOTE(review): the condition that triggers the retry (the existing comment
// mentions EEXIST), the re-prepare call, the error checks on `r` and the
// return statement are not visible in this listing -- confirm.
346 int MultipartObjectProcessor::process_first_chunk(bufferlist
&& data
,
347 DataProcessor
**processor
)
349 // write the first chunk of the head object as part of an exclusive create,
350 // then drain to wait for the result in case of EEXIST
351 int r
= writer
.write_exclusive(data
);
353 // randomize the oid prefix and reprepare the head/manifest
354 std::string oid_rand
= gen_rand_alphanumeric(store
->ctx(), 32);
356 mp
.init(target_obj
->get_name(), upload_id
, oid_rand
);
// new prefix has the form "<object name>.<random suffix>"
357 manifest
.set_prefix(target_obj
->get_name() + "." + oid_rand
);
363 // resubmit the write op on the new head object
364 r
= writer
.write_exclusive(data
);
369 *processor
= &stripe
;
// Sets up the manifest, head object, writer and chunk/stripe processors for
// one multipart part.
// Visible steps: query chunk size and alignment for the tail placement rule;
// derive an aligned stripe size from the rgw_obj_stripe_size config value;
// set the multipart part rule for part_num; begin the manifest; map the
// first stripe's raw object onto head_obj; open that stripe in the writer;
// use the manifest's current stripe max size as the head chunk size.
// NOTE(review): the error checks on `r` (including the guard around the
// ERROR log) and the return statement are not visible in this listing.
373 int MultipartObjectProcessor::prepare_head()
375 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
377 uint64_t stripe_size
;
380 int r
= dynamic_cast<rgw::sal::RadosObject
*>(target_obj
.get())->get_max_chunk_size(dpp
,
381 tail_placement_rule
, &chunk_size
, &alignment
);
383 ldpp_dout(dpp
, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule
.to_str() << " obj=" << target_obj
<< " returned r=" << r
<< dendl
;
// fit the configured stripe size to the pool alignment
386 dynamic_cast<rgw::sal::RadosObject
*>(target_obj
.get())->get_max_aligned_size(
387 default_stripe_size
, alignment
, &stripe_size
);
389 manifest
.set_multipart_part_rule(stripe_size
, part_num
);
391 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
392 head_obj
->get_bucket()->get_placement_rule(),
393 &tail_placement_rule
,
394 target_obj
->get_bucket()->get_key(),
395 target_obj
->get_obj());
// the part's head object is derived from the first stripe's raw object
400 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
401 dynamic_cast<rgw::sal::RadosObject
*>(head_obj
.get())->raw_obj_to_obj(stripe_obj
);
402 head_obj
->set_hash_source(target_obj
->get_name());
404 r
= writer
.set_stripe_obj(stripe_obj
);
// from here on stripe_size holds the manifest's current stripe max size
408 stripe_size
= manifest_gen
.cur_stripe_max_size();
409 set_head_chunk_size(stripe_size
);
411 chunk
= ChunkProcessor(&writer
, chunk_size
);
412 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
// Sets the manifest prefix to "<target object name>.<upload_id>" and
// delegates the rest of the setup to prepare_head(), returning its result.
// `y` is not referenced in the lines visible here.
416 int MultipartObjectProcessor::prepare(optional_yield y
)
418 manifest
.set_prefix(target_obj
->get_name() + "." + upload_id
);
420 return prepare_head();
// Finishes one multipart part: drains the writer, advances the manifest to
// the actual size, writes the part's head metadata with versioning disabled,
// then records an RGWUploadPartInfo for the part as an omap value on the
// upload's meta object. An -ENOENT from the omap write is mapped to
// -ERR_NO_SUCH_UPLOAD.
// When the write was not canceled, the writer's set of written objects is
// cleared; *pcanceled receives obj_op.meta.canceled.
// `etag`, `if_match`, `if_nomatch` and `user_data` are not referenced in the
// lines visible here.
// NOTE(review): this listing is missing lines (error checks on `r`, the
// declarations of buf / p / bl / compressed / part_num_str, the encoding of
// `info`, the final return) -- confirm against the complete file.
423 int MultipartObjectProcessor::complete(size_t accounted_size
,
424 const std::string
& etag
,
425 ceph::real_time
*mtime
,
426 ceph::real_time set_mtime
,
427 std::map
<std::string
, bufferlist
>& attrs
,
428 ceph::real_time delete_at
,
429 const char *if_match
,
430 const char *if_nomatch
,
431 const std::string
*user_data
,
432 rgw_zone_set
*zones_trace
,
433 bool *pcanceled
, optional_yield y
)
435 int r
= writer
.drain();
439 const uint64_t actual_size
= get_actual_size();
440 r
= manifest_gen
.create_next(actual_size
);
445 RGWRados::Object
op_target(store
->getRados(),
446 head_obj
->get_bucket()->get_info(),
447 obj_ctx
, head_obj
->get_obj());
448 RGWRados::Object::Write
obj_op(&op_target
);
// parts are written unversioned, with metadata placed by the tail rule
450 op_target
.set_versioning_disabled(true);
451 op_target
.set_meta_placement_rule(&tail_placement_rule
);
452 obj_op
.meta
.set_mtime
= set_mtime
;
453 obj_op
.meta
.mtime
= mtime
;
454 obj_op
.meta
.owner
= owner
;
455 obj_op
.meta
.delete_at
= delete_at
;
456 obj_op
.meta
.zones_trace
= zones_trace
;
457 obj_op
.meta
.modify_tail
= true;
459 r
= obj_op
.write_meta(dpp
, actual_size
, accounted_size
, attrs
, y
);
// describe this part for the upload's part list
464 RGWUploadPartInfo info
;
466 bool sorted_omap
= is_v2_upload_id(upload_id
);
// zero-padded 8-digit part number
// NOTE(review): presumably used for the omap key when sorted_omap is set;
// the guard and the construction of `p` are not visible -- confirm.
470 snprintf(buf
, sizeof(buf
), "%08d", part_num
);
473 p
.append(part_num_str
);
477 info
.size
= actual_size
;
478 info
.accounted_size
= accounted_size
;
479 info
.modified
= real_clock::now();
480 info
.manifest
= manifest
;
483 r
= rgw_compression_info_from_attrset(attrs
, compressed
, info
.cs_info
);
485 ldpp_dout(dpp
, 1) << "cannot get compression info" << dendl
;
// the part info is stored on the upload's meta object in the multipart namespace
491 std::unique_ptr
<rgw::sal::Object
> meta_obj
=
492 head_obj
->get_bucket()->get_object(rgw_obj_key(mp
.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART
));
493 meta_obj
->set_in_extra_data(true);
495 r
= meta_obj
->omap_set_val_by_key(dpp
, p
, bl
, true, null_yield
);
// a missing meta object is reported as "no such upload"
497 return r
== -ENOENT
? -ERR_NO_SUCH_UPLOAD
: r
;
500 if (!obj_op
.meta
.canceled
) {
501 // on success, clear the set of objects for deletion
502 writer
.clear_written();
505 *pcanceled
= obj_op
.meta
.canceled
;
// Writes the first chunk with writer.write_exclusive() and hands back
// &stripe through *processor as the processor for subsequent data.
// NOTE(review): the check on `r` and the return statement are not visible
// in this listing -- confirm against the complete file.
510 int AppendObjectProcessor::process_first_chunk(bufferlist
&&data
, rgw::sal::DataProcessor
**processor
)
512 int r
= writer
.write_exclusive(data
);
516 *processor
= &stripe
;
// Prepares an append: reads the head object's state, validates the append
// position, and sets up the manifest, writer and chunk/stripe processors.
// Visible behavior:
//  - cur_size and *cur_accounted_size are taken from the object state;
//  - for an object that does not exist, a position error is logged and
//    -ERR_POSITION_NOT_EQUAL_TO_LENGTH returned (the guard on `position` is
//    not visible), otherwise a fresh "<name>.<random>_" manifest prefix is
//    generated;
//  - for an existing object, the RGW_ATTR_APPEND_PART_NUM attr must be
//    present (else -ERR_OBJECT_NOT_APPENDABLE) and `position` must equal
//    *cur_accounted_size (else -ERR_POSITION_NOT_EQUAL_TO_LENGTH); the part
//    number, the etag up to the first "-", and the storage class are read
//    from the attrs, and the existing manifest's prefix is reused.
// NOTE(review): this listing is missing lines (braces, else branches, error
// checks on `r`, the increment of cur_part_num if any, the return) --
// confirm against the complete file.
520 int AppendObjectProcessor::prepare(optional_yield y
)
523 int r
= head_obj
->get_obj_state(dpp
, &obj_ctx
, &astate
, y
);
527 cur_size
= astate
->size
;
528 *cur_accounted_size
= astate
->accounted_size
;
529 if (!astate
->exists
) {
531 ldpp_dout(dpp
, 5) << "ERROR: Append position should be zero" << dendl
;
532 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
// new object: build a manifest prefix of the form "<name>.<random>_"
537 gen_rand_alphanumeric(store
->ctx(), buf
, sizeof(buf
) - 1);
538 string oid_prefix
= head_obj
->get_name();
539 oid_prefix
.append(".");
540 oid_prefix
.append(buf
);
541 oid_prefix
.append("_");
542 manifest
.set_prefix(oid_prefix
);
545 // check whether the object appendable
546 map
<string
, bufferlist
>::iterator iter
= astate
->attrset
.find(RGW_ATTR_APPEND_PART_NUM
);
547 if (iter
== astate
->attrset
.end()) {
548 ldpp_dout(dpp
, 5) << "ERROR: The object is not appendable" << dendl
;
549 return -ERR_OBJECT_NOT_APPENDABLE
;
// appends must start exactly at the current accounted size
551 if (position
!= *cur_accounted_size
) {
552 ldpp_dout(dpp
, 5) << "ERROR: Append position should be equal to the obj size" << dendl
;
553 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
557 decode(cur_part_num
, iter
->second
);
558 } catch (buffer::error
& err
) {
559 ldpp_dout(dpp
, 5) << "ERROR: failed to decode part num" << dendl
;
563 //get the current obj etag
564 iter
= astate
->attrset
.find(RGW_ATTR_ETAG
);
565 if (iter
!= astate
->attrset
.end()) {
// keep only the part of the etag before the first "-"
566 string s
= rgw_string_unquote(iter
->second
.c_str());
567 size_t pos
= s
.find("-");
568 cur_etag
= s
.substr(0, pos
);
571 iter
= astate
->attrset
.find(RGW_ATTR_STORAGE_CLASS
);
572 if (iter
!= astate
->attrset
.end()) {
573 tail_placement_rule
.storage_class
= iter
->second
.to_str();
// continue the existing object's manifest and keep its tail
575 cur_manifest
= &(*astate
->manifest
);
576 manifest
.set_prefix(cur_manifest
->get_prefix());
577 astate
->keep_tail
= true;
579 manifest
.set_multipart_part_rule(store
->ctx()->_conf
->rgw_obj_stripe_size
, cur_part_num
);
581 rgw_obj obj
= head_obj
->get_obj();
583 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
, head_obj
->get_bucket()->get_placement_rule(), &tail_placement_rule
, obj
.bucket
, obj
);
587 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
589 uint64_t chunk_size
= 0;
590 r
= store
->get_raw_chunk_size(dpp
, stripe_obj
, &chunk_size
);
594 r
= writer
.set_stripe_obj(std::move(stripe_obj
));
599 uint64_t stripe_size
= manifest_gen
.cur_stripe_max_size();
// the head chunk is capped by both the chunk size and the stripe size
601 uint64_t max_head_size
= std::min(chunk_size
, stripe_size
);
602 set_head_chunk_size(max_head_size
);
604 // initialize the processors
605 chunk
= ChunkProcessor(&writer
, chunk_size
);
606 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
// Finishes an append: drains the writer, advances the manifest to the actual
// size, and writes the head metadata with versioning disabled and the object
// marked appendable. The sizes passed to write_meta() are the appended sizes
// plus cur_size / *cur_accounted_size.
// The current part number is stored in attrs[RGW_ATTR_APPEND_PART_NUM]. When
// a previous etag exists, the new etag is the hex MD5 over the previous and
// new etag digests, followed by "-<cur_part_num>".
// When the write was not canceled, the writer's set of written objects is
// cleared; *pcanceled receives obj_op.meta.canceled and *cur_accounted_size
// is increased by accounted_size.
// `if_match` and `if_nomatch` are not referenced in the lines visible here.
// NOTE(review): this listing is missing lines (end of the parameter list,
// error checks on `r`, the branch that chooses between the two manifest
// assignments, declarations of `hash` and `etag_bl`, the tail of the
// write_meta() call, the return) -- confirm against the complete file.
611 int AppendObjectProcessor::complete(size_t accounted_size
, const string
&etag
, ceph::real_time
*mtime
,
612 ceph::real_time set_mtime
, rgw::sal::Attrs
& attrs
,
613 ceph::real_time delete_at
, const char *if_match
, const char *if_nomatch
,
614 const string
*user_data
, rgw_zone_set
*zones_trace
, bool *pcanceled
,
617 int r
= writer
.drain();
620 const uint64_t actual_size
= get_actual_size();
621 r
= manifest_gen
.create_next(actual_size
);
625 head_obj
->set_atomic(&obj_ctx
);
626 RGWRados::Object
op_target(store
->getRados(),
627 head_obj
->get_bucket()->get_info(),
628 obj_ctx
, head_obj
->get_obj());
629 RGWRados::Object::Write
obj_op(&op_target
);
630 //For Append obj, disable versioning
631 op_target
.set_versioning_disabled(true);
// NOTE(review): the two manifest assignments below are presumably the two
// arms of a branch on whether cur_manifest is set -- the branch itself is
// not visible in this listing.
633 cur_manifest
->append(dpp
, manifest
, store
->get_zone());
634 obj_op
.meta
.manifest
= cur_manifest
;
636 obj_op
.meta
.manifest
= &manifest
;
638 obj_op
.meta
.ptag
= &unique_tag
; /* use req_id as operation tag */
639 obj_op
.meta
.mtime
= mtime
;
640 obj_op
.meta
.set_mtime
= set_mtime
;
641 obj_op
.meta
.owner
= owner
;
642 obj_op
.meta
.flags
= PUT_OBJ_CREATE
;
643 obj_op
.meta
.delete_at
= delete_at
;
644 obj_op
.meta
.user_data
= user_data
;
645 obj_op
.meta
.zones_trace
= zones_trace
;
646 obj_op
.meta
.modify_tail
= true;
647 obj_op
.meta
.appendable
= true;
648 //Add the append part number
649 bufferlist cur_part_num_bl
;
651 encode(cur_part_num
, cur_part_num_bl
);
652 attrs
[RGW_ATTR_APPEND_PART_NUM
] = cur_part_num_bl
;
// combine the previous etag with the new one when there is a previous etag
654 if (!cur_etag
.empty()) {
656 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
657 hash
.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW
);
658 char petag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
659 char final_etag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
// room for the hex digest plus the "-<part number>" suffix
660 char final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 16];
// hash the binary form of the previous etag, then of the new etag
661 hex_to_buf(cur_etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
662 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
663 hex_to_buf(etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
664 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
665 hash
.Final((unsigned char *)final_etag
);
666 buf_to_hex((unsigned char *)final_etag
, sizeof(final_etag
), final_etag_str
);
// append "-<cur_part_num>" right after the hex digest
667 snprintf(&final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2], sizeof(final_etag_str
) - CEPH_CRYPTO_MD5_DIGESTSIZE
* 2,
668 "-%lld", (long long)cur_part_num
);
// the stored etag includes the terminating NUL (strlen + 1)
670 etag_bl
.append(final_etag_str
, strlen(final_etag_str
) + 1);
671 attrs
[RGW_ATTR_ETAG
] = etag_bl
;
// sizes written are cumulative: existing size plus what was appended
673 r
= obj_op
.write_meta(dpp
, actual_size
+ cur_size
,
674 accounted_size
+ *cur_accounted_size
,
679 if (!obj_op
.meta
.canceled
) {
680 // on success, clear the set of objects for deletion
681 writer
.clear_written();
684 *pcanceled
= obj_op
.meta
.canceled
;
686 *cur_accounted_size
+= accounted_size
;
691 } // namespace rgw::putobj