1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
5 * Ceph - scalable distributed file system
7 * Copyright (C) 2018 Red Hat, Inc.
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "rgw_compression.h"
20 #include "services/svc_sys_obj.h"
21 #include "rgw_sal_rados.h"
23 #define dout_subsys ceph_subsys_rgw
25 namespace rgw::putobj
{
// Collects the first head_chunk_size bytes of the upload in head_data, hands
// that chunk to process_first_chunk(), and sends all later data through the
// processor that call installs. A zero-length buffer is treated as a flush.
// NOTE(review): logical_offset is not read in the lines visible here; write
// offsets are derived from data_offset instead.
// NOTE(review): this listing skips some original lines (braces, error checks),
// so only the visible statements are described.
27 int HeadObjectProcessor::process(bufferlist
&& data
, uint64_t logical_offset
)
// an empty buffer is the flush signal
29 const bool flush
= (data
.length() == 0);
31 // capture the first chunk for special handling
32 if (data_offset
< head_chunk_size
|| data_offset
== 0) {
34 // flush partial chunk
35 return process_first_chunk(std::move(head_data
), &processor
);
// move at most the bytes still missing from the head chunk out of data and
// into head_data
38 auto remaining
= head_chunk_size
- data_offset
;
39 auto count
= std::min
<uint64_t>(data
.length(), remaining
);
40 data
.splice(0, count
, &head_data
);
43 if (data_offset
== head_chunk_size
) {
44 // process the first complete chunk
45 ceph_assert(head_data
.length() == head_chunk_size
);
46 int r
= process_first_chunk(std::move(head_data
), &processor
);
51 if (data
.length() == 0) { // avoid flushing stripe processor
55 ceph_assert(processor
); // process_first_chunk() must initialize
57 // send everything else through the processor
// the write lands at the offset held before data_offset is advanced
58 auto write_offset
= data_offset
;
59 data_offset
+= data
.length();
60 return processor
->process(std::move(data
), write_offset
);
// Folds a list of completed aio results into the caller's bookkeeping.
// Entries taking the first branch have their raw object inserted into
// *written; on the else branch the first error code seen is remembered.
// Returns that remembered error, or 0 when none was recorded.
// NOTE(review): the condition selecting the insert branch is not visible in
// this listing.
64 static int process_completed(const AioResultList
& completed
, RawObjSet
*written
)
66 std::optional
<int> error
;
67 for (auto& r
: completed
) {
69 written
->insert(r
.obj
.get_ref().obj
);
70 } else if (!error
) { // record first error code
// an empty optional means no error was recorded, which maps to 0
74 return error
.value_or(0);
// Retargets the writer at raw_obj: obtains an object handle from the rados
// service, stores it in stripe_obj and returns the result of opening it.
77 int RadosWriter::set_stripe_obj(const rgw_raw_obj
& raw_obj
)
79 stripe_obj
= store
->svc()->rados
->obj(raw_obj
);
80 return stripe_obj
.open(dpp
);
// Submits a write of bl at the given offset on the current stripe object
// through aio, using the buffer length as the throttle cost. The completions
// returned by aio->get() are passed to process_completed(), whose result is
// returned.
// NOTE(review): the body of the cost == 0 branch and part of the op setup are
// not visible in this listing.
83 int RadosWriter::process(bufferlist
&& bl
, uint64_t offset
)
85 bufferlist data
= std::move(bl
);
86 const uint64_t cost
= data
.length();
87 if (cost
== 0) { // no empty writes, use aio directly for creates
90 librados::ObjectWriteOperation op
;
94 op
.write(offset
, data
);
96 constexpr uint64_t id
= 0; // unused
97 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
98 return process_completed(c
, &written
);
// Submits an exclusive-create operation on the current stripe object and then
// drains the aio queue, so the outcome of the create is known before
// returning. Completions from get() and drain() are merged into one list and
// handed to process_completed().
// NOTE(review): how `data` is attached to the op is not visible in this
// listing; only its length (as cost) and the create(true) call are shown.
101 int RadosWriter::write_exclusive(const bufferlist
& data
)
103 const uint64_t cost
= data
.length();
105 librados::ObjectWriteOperation op
;
106 op
.create(true); // exclusive create
109 constexpr uint64_t id
= 0; // unused
110 auto c
= aio
->get(stripe_obj
, Aio::librados_op(std::move(op
), y
), cost
, id
);
// wait for everything outstanding and append those results to c
111 auto d
= aio
->drain();
112 c
.splice(c
.end(), d
);
113 return process_completed(c
, &written
);
// Drains the aio queue and feeds the completions to process_completed();
// returns its result (first recorded error, or 0).
116 int RadosWriter::drain()
118 return process_completed(aio
->drain(), &written
);
// Destructor: drains outstanding aio, then removes every raw object still
// listed in `written`. An entry equal to the head object's raw object is not
// deleted in the loop; it only sets need_to_remove_head, and the head is
// removed afterwards through head_obj->delete_object(). Failures other than
// -ENOENT are logged as leaked objects.
// NOTE(review): some original lines are not visible in this listing,
// including whatever populates raw_head before it is dereferenced.
121 RadosWriter::~RadosWriter()
123 // wait on any outstanding aio completions
124 process_completed(aio
->drain(), &written
);
126 bool need_to_remove_head
= false;
127 std::optional
<rgw_raw_obj
> raw_head
;
128 if (!rgw::sal::RGWObject::empty(head_obj
.get())) {
130 head_obj
->get_raw_obj(&*raw_head
);
134 * We should delete the object in the "multipart" namespace to avoid race condition.
135 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
136 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
137 * written by the second upload may be deleted by the first upload.
138 * details is describled on #11749
140 * The above comment still stands, but instead of searching for a specific object in the multipart
141 * namespace, we just make sure that we remove the object that is marked as the head object after
142 * we remove all the other raw objects. Note that we use different call to remove the head object,
143 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
145 for (const auto& obj
: written
) {
146 if (raw_head
&& obj
== *raw_head
) {
147 ldpp_dout(dpp
, 5) << "NOTE: we should not process the head object (" << obj
<< ") here" << dendl
;
148 need_to_remove_head
= true;
152 int r
= store
->delete_raw_obj(dpp
, obj
);
// a missing object (-ENOENT) is not reported as a failure
153 if (r
< 0 && r
!= -ENOENT
) {
154 ldpp_dout(dpp
, 0) << "WARNING: failed to remove obj (" << obj
<< "), leaked" << dendl
;
// the head object is removed last, via the object-level delete call
158 if (need_to_remove_head
) {
159 std::string version_id
;
160 ldpp_dout(dpp
, 5) << "NOTE: we are going to process the head obj (" << *raw_head
<< ")" << dendl
;
161 int r
= head_obj
->delete_object(dpp
, &obj_ctx
, ACLOwner(), bucket
->get_acl_owner(), ceph::real_time(),
162 false, 0, version_id
, null_yield
);
163 if (r
< 0 && r
!= -ENOENT
) {
164 ldpp_dout(dpp
, 0) << "WARNING: failed to remove obj (" << *raw_head
<< "), leaked" << dendl
;
170 // advance to the next stripe
// Moves the manifest generator to `offset`, looks up the chunk size for the
// new current stripe object, points the writer at that object, rebuilds the
// chunk processor with the new chunk size, and reports the stripe's maximum
// size through *pstripe_size.
// NOTE(review): the error checks between the calls are not visible in this
// listing.
171 int ManifestObjectProcessor::next(uint64_t offset
, uint64_t *pstripe_size
)
173 // advance the manifest
174 int r
= manifest_gen
.create_next(offset
);
179 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
181 uint64_t chunk_size
= 0;
182 r
= store
->get_raw_chunk_size(dpp
, stripe_obj
, &chunk_size
);
186 r
= writer
.set_stripe_obj(stripe_obj
);
191 chunk
= ChunkProcessor(&writer
, chunk_size
);
192 *pstripe_size
= manifest_gen
.cur_stripe_max_size();
// Keeps the first chunk in first_chunk instead of writing it immediately
// (complete() later passes &first_chunk as the write op's data), and directs
// all subsequent data to the stripe processor via *processor.
198 int AtomicObjectProcessor::process_first_chunk(bufferlist
&& data
,
199 DataProcessor
**processor
)
201 first_chunk
= std::move(data
);
202 *processor
= &stripe
;
// Sets up the manifest, writer and processors for an atomic object upload:
//  - queries the head object's max chunk size and alignment for the bucket's
//    placement rule;
//  - when the tail placement rule differs and placement_rules_match() reports
//    a mismatch, queries a separate chunk size for the tail rule;
//  - derives an aligned stripe size from rgw_obj_stripe_size;
//  - starts the manifest generator and points the writer at the first stripe;
//  - sets the head chunk size and constructs the chunk/stripe processors.
// NOTE(review): several branches and error checks are not visible in this
// listing (e.g. how same_pool is updated and which branch assigns
// head_max_size/chunk_size), so they are not described.
206 int AtomicObjectProcessor::prepare(optional_yield y
)
208 uint64_t max_head_chunk_size
;
209 uint64_t head_max_size
;
210 uint64_t chunk_size
= 0;
213 int r
= head_obj
->get_max_chunk_size(dpp
, bucket
->get_placement_rule(),
214 &max_head_chunk_size
, &alignment
);
219 bool same_pool
= true;
220 if (bucket
->get_placement_rule() != tail_placement_rule
) {
221 if (!head_obj
->placement_rules_match(bucket
->get_placement_rule(), tail_placement_rule
)) {
223 r
= head_obj
->get_max_chunk_size(dpp
, tail_placement_rule
, &chunk_size
);
232 head_max_size
= max_head_chunk_size
;
233 chunk_size
= max_head_chunk_size
;
236 uint64_t stripe_size
;
237 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
// the configured stripe size is adjusted to the alignment obtained above
239 head_obj
->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
241 manifest
.set_trivial_rule(head_max_size
, stripe_size
);
243 rgw_obj obj
= head_obj
->get_obj();
245 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
246 bucket
->get_placement_rule(),
247 &tail_placement_rule
,
253 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
255 r
= writer
.set_stripe_obj(stripe_obj
);
260 set_head_chunk_size(head_max_size
);
261 // initialize the processors
262 chunk
= ChunkProcessor(&writer
, chunk_size
);
263 stripe
= StripeProcessor(&chunk
, this, head_max_size
);
// Finishes an atomic upload: drains outstanding writes, closes the manifest
// at the actual size, marks the head object atomic, fills in the write op
// parameters (first chunk data, manifest, tag, conditionals, times, owner,
// flags, attrs) and runs prepare() followed by write_meta(). When the op was
// not canceled, the writer's set of written objects is cleared so they are
// not cleaned up. The canceled flag is reported through *pcanceled.
// NOTE(review): `etag` is not referenced in the visible lines; error checks
// and any null check around *pcanceled are not visible in this listing.
267 int AtomicObjectProcessor::complete(size_t accounted_size
,
268 const std::string
& etag
,
269 ceph::real_time
*mtime
,
270 ceph::real_time set_mtime
,
271 rgw::sal::RGWAttrs
& attrs
,
272 ceph::real_time delete_at
,
273 const char *if_match
,
274 const char *if_nomatch
,
275 const std::string
*user_data
,
276 rgw_zone_set
*zones_trace
,
277 bool *pcanceled
, optional_yield y
)
// wait for all data writes before touching the head object
279 int r
= writer
.drain();
283 const uint64_t actual_size
= get_actual_size();
284 r
= manifest_gen
.create_next(actual_size
);
289 head_obj
->set_atomic(&obj_ctx
);
291 std::unique_ptr
<rgw::sal::RGWObject::WriteOp
> obj_op
= head_obj
->get_write_op(&obj_ctx
);
293 /* some object types shouldn't be versioned, e.g., multipart parts */
294 obj_op
->params
.versioning_disabled
= !bucket
->versioning_enabled();
// the buffered first chunk is passed as the op's data
295 obj_op
->params
.data
= &first_chunk
;
296 obj_op
->params
.manifest
= &manifest
;
297 obj_op
->params
.ptag
= &unique_tag
; /* use req_id as operation tag */
298 obj_op
->params
.if_match
= if_match
;
299 obj_op
->params
.if_nomatch
= if_nomatch
;
300 obj_op
->params
.mtime
= mtime
;
301 obj_op
->params
.set_mtime
= set_mtime
;
302 obj_op
->params
.owner
= ACLOwner(owner
);
303 obj_op
->params
.flags
= PUT_OBJ_CREATE
;
304 obj_op
->params
.olh_epoch
= olh_epoch
;
305 obj_op
->params
.delete_at
= delete_at
;
306 obj_op
->params
.user_data
= user_data
;
307 obj_op
->params
.zones_trace
= zones_trace
;
308 obj_op
->params
.modify_tail
= true;
309 obj_op
->params
.attrs
= &attrs
;
311 r
= obj_op
->prepare(y
);
316 r
= obj_op
->write_meta(dpp
, actual_size
, accounted_size
, y
);
320 if (!obj_op
->params
.canceled
) {
321 // on success, clear the set of objects for deletion
322 writer
.clear_written();
325 *pcanceled
= obj_op
->params
.canceled
;
// Writes the first chunk of the part's head object with an exclusive create.
// The visible retry path picks a fresh random 32-character oid suffix,
// re-initializes `mp` and the manifest prefix with it, and resubmits the
// exclusive write against the new head object. Subsequent data is routed to
// the stripe processor via *processor.
// NOTE(review): the condition that triggers the retry (the comment mentions
// EEXIST) and the re-prepare call are not visible in this listing.
331 int MultipartObjectProcessor::process_first_chunk(bufferlist
&& data
,
332 DataProcessor
**processor
)
334 // write the first chunk of the head object as part of an exclusive create,
335 // then drain to wait for the result in case of EEXIST
336 int r
= writer
.write_exclusive(data
);
338 // randomize the oid prefix and reprepare the head/manifest
339 std::string oid_rand
= gen_rand_alphanumeric(store
->ctx(), 32);
341 mp
.init(target_obj
->get_name(), upload_id
, oid_rand
);
342 manifest
.set_prefix(target_obj
->get_name() + "." + oid_rand
);
348 // resubmit the write op on the new head object
349 r
= writer
.write_exclusive(data
);
354 *processor
= &stripe
;
// Prepares the manifest, head object and processors for one multipart part:
// queries chunk size and alignment for the tail placement rule, derives an
// aligned stripe size from rgw_obj_stripe_size, sets the multipart part rule
// for part_num, starts the manifest generator, maps the first stripe's raw
// object onto head_obj (hashing on the target object's name), points the
// writer at it, and builds the chunk/stripe processors using the generator's
// current stripe max size.
// NOTE(review): error checks, including the condition guarding the ERROR log
// below, are not visible in this listing.
358 int MultipartObjectProcessor::prepare_head()
360 const uint64_t default_stripe_size
= store
->ctx()->_conf
->rgw_obj_stripe_size
;
362 uint64_t stripe_size
;
365 int r
= target_obj
->get_max_chunk_size(dpp
, tail_placement_rule
, &chunk_size
, &alignment
);
367 ldpp_dout(dpp
, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule
.to_str() << " obj=" << target_obj
<< " returned r=" << r
<< dendl
;
370 target_obj
->get_max_aligned_size(default_stripe_size
, alignment
, &stripe_size
);
372 manifest
.set_multipart_part_rule(stripe_size
, part_num
);
374 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
,
375 bucket
->get_placement_rule(),
376 &tail_placement_rule
,
377 target_obj
->get_bucket()->get_key(),
378 target_obj
->get_obj());
// the first stripe object doubles as this part's head object
383 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
384 head_obj
->raw_obj_to_obj(stripe_obj
);
385 head_obj
->set_hash_source(target_obj
->get_name());
387 r
= writer
.set_stripe_obj(stripe_obj
);
// stripe_size is overwritten with the generator's value before use below
391 stripe_size
= manifest_gen
.cur_stripe_max_size();
392 set_head_chunk_size(stripe_size
);
394 chunk
= ChunkProcessor(&writer
, chunk_size
);
395 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
// Sets the manifest prefix to "<target object name>.<upload_id>" and then
// delegates the rest of the setup to prepare_head(), returning its result.
// NOTE(review): the yield argument `y` is not used in the visible lines.
399 int MultipartObjectProcessor::prepare(optional_yield y
)
401 manifest
.set_prefix(target_obj
->get_name() + "." + upload_id
);
403 return prepare_head();
// Finishes one multipart part: drains outstanding writes, closes the manifest
// at the actual size, writes the part's head object metadata with versioning
// disabled, then records an RGWUploadPartInfo (size, accounted size,
// modification time, manifest, compression info) in the omap of the multipart
// meta object. An -ENOENT result from the omap update is returned as
// -ERR_NO_SUCH_UPLOAD. When the op was not canceled, the writer's set of
// written objects is cleared; the canceled flag is reported via *pcanceled.
// NOTE(review): many original lines are not visible in this listing (the
// declarations of buf, p, bl and compressed, the branch on sorted_omap, the
// encoding of info, error checks), so those steps are not described. Several
// parameters (etag, if_match, if_nomatch, user_data) are not referenced in
// the visible lines.
406 int MultipartObjectProcessor::complete(size_t accounted_size
,
407 const std::string
& etag
,
408 ceph::real_time
*mtime
,
409 ceph::real_time set_mtime
,
410 std::map
<std::string
, bufferlist
>& attrs
,
411 ceph::real_time delete_at
,
412 const char *if_match
,
413 const char *if_nomatch
,
414 const std::string
*user_data
,
415 rgw_zone_set
*zones_trace
,
416 bool *pcanceled
, optional_yield y
)
418 int r
= writer
.drain();
422 const uint64_t actual_size
= get_actual_size();
423 r
= manifest_gen
.create_next(actual_size
);
428 std::unique_ptr
<rgw::sal::RGWObject::WriteOp
> obj_op
= head_obj
->get_write_op(&obj_ctx
);
430 obj_op
->params
.versioning_disabled
= true;
431 obj_op
->params
.set_mtime
= set_mtime
;
432 obj_op
->params
.mtime
= mtime
;
433 obj_op
->params
.owner
= ACLOwner(owner
);
434 obj_op
->params
.delete_at
= delete_at
;
435 obj_op
->params
.zones_trace
= zones_trace
;
436 obj_op
->params
.modify_tail
= true;
437 obj_op
->params
.attrs
= &attrs
;
438 r
= obj_op
->prepare(y
);
443 r
= obj_op
->write_meta(dpp
, actual_size
, accounted_size
, y
);
448 RGWUploadPartInfo info
;
// the omap key is built either from a zero-padded part number or from
// part_num_str; presumably sorted_omap selects between them (TODO confirm)
450 bool sorted_omap
= is_v2_upload_id(upload_id
);
454 snprintf(buf
, sizeof(buf
), "%08d", part_num
);
457 p
.append(part_num_str
);
461 info
.size
= actual_size
;
462 info
.accounted_size
= accounted_size
;
463 info
.modified
= real_clock::now();
464 info
.manifest
= manifest
;
467 r
= rgw_compression_info_from_attrset(attrs
, compressed
, info
.cs_info
);
469 ldpp_dout(dpp
, 1) << "cannot get compression info" << dendl
;
// the part entry is stored on the upload's meta object in the multipart
// namespace
475 std::unique_ptr
<rgw::sal::RGWObject
> meta_obj
=
476 bucket
->get_object(rgw_obj_key(mp
.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART
));
477 meta_obj
->set_in_extra_data(true);
479 r
= meta_obj
->omap_set_val_by_key(dpp
, p
, bl
, true, null_yield
);
481 return r
== -ENOENT
? -ERR_NO_SUCH_UPLOAD
: r
;
484 if (!obj_op
->params
.canceled
) {
485 // on success, clear the set of objects for deletion
486 writer
.clear_written();
489 *pcanceled
= obj_op
->params
.canceled
;
// Writes the first chunk of the appended part with an exclusive create on the
// current stripe object, then routes subsequent data to the stripe processor
// via *processor.
// NOTE(review): the error check on r is not visible in this listing.
494 int AppendObjectProcessor::process_first_chunk(bufferlist
&&data
, rgw::putobj::DataProcessor
**processor
)
496 int r
= writer
.write_exclusive(data
);
500 *processor
= &stripe
;
// Prepares an append operation on head_obj.
// Reads the current object state and records its size and accounted size.
// For an object that does not exist yet, a fresh manifest prefix of the form
// "<name>.<random>_" is generated; the visible error message indicates the
// append position is expected to be zero in that case.
// For an existing object, the visible checks require the
// RGW_ATTR_APPEND_PART_NUM attribute (else -ERR_OBJECT_NOT_APPENDABLE) and
// position == *cur_accounted_size (else -ERR_POSITION_NOT_EQUAL_TO_LENGTH).
// The stored part number is decoded, the current etag (text before the first
// "-") and storage class are picked up from the attributes, the existing
// manifest's prefix is reused, and keep_tail is set on the state.
// Finally the manifest generator, writer and chunk/stripe processors are set
// up, with the head chunk size being min(chunk_size, stripe_size).
// NOTE(review): many original lines are not visible in this listing (the
// declarations of astate and buf, the else branch structure, the position
// check for new objects, any change to cur_part_num, error checks).
504 int AppendObjectProcessor::prepare(optional_yield y
)
507 int r
= head_obj
->get_obj_state(dpp
, &obj_ctx
, *bucket
, &astate
, y
);
511 cur_size
= astate
->size
;
512 *cur_accounted_size
= astate
->accounted_size
;
513 if (!astate
->exists
) {
515 ldpp_dout(dpp
, 5) << "ERROR: Append position should be zero" << dendl
;
516 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
// new object: build the prefix "<object name>.<random chars>_"
521 gen_rand_alphanumeric(store
->ctx(), buf
, sizeof(buf
) - 1);
522 string oid_prefix
= head_obj
->get_name();
523 oid_prefix
.append(".");
524 oid_prefix
.append(buf
);
525 oid_prefix
.append("_");
526 manifest
.set_prefix(oid_prefix
);
529 // check whether the object appendable
530 map
<string
, bufferlist
>::iterator iter
= astate
->attrset
.find(RGW_ATTR_APPEND_PART_NUM
);
531 if (iter
== astate
->attrset
.end()) {
532 ldpp_dout(dpp
, 5) << "ERROR: The object is not appendable" << dendl
;
533 return -ERR_OBJECT_NOT_APPENDABLE
;
535 if (position
!= *cur_accounted_size
) {
536 ldpp_dout(dpp
, 5) << "ERROR: Append position should be equal to the obj size" << dendl
;
537 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH
;
541 decode(cur_part_num
, iter
->second
);
542 } catch (buffer::error
& err
) {
543 ldpp_dout(dpp
, 5) << "ERROR: failed to decode part num" << dendl
;
547 //get the current obj etag
548 iter
= astate
->attrset
.find(RGW_ATTR_ETAG
);
549 if (iter
!= astate
->attrset
.end()) {
// keep only the part of the unquoted etag before the first "-"
550 string s
= rgw_string_unquote(iter
->second
.c_str());
551 size_t pos
= s
.find("-");
552 cur_etag
= s
.substr(0, pos
);
555 iter
= astate
->attrset
.find(RGW_ATTR_STORAGE_CLASS
);
556 if (iter
!= astate
->attrset
.end()) {
557 tail_placement_rule
.storage_class
= iter
->second
.to_str();
// continue the existing manifest: reuse its prefix and keep the tail
559 cur_manifest
= &(*astate
->manifest
);
560 manifest
.set_prefix(cur_manifest
->get_prefix());
561 astate
->keep_tail
= true;
563 manifest
.set_multipart_part_rule(store
->ctx()->_conf
->rgw_obj_stripe_size
, cur_part_num
);
565 rgw_obj obj
= head_obj
->get_obj();
567 r
= manifest_gen
.create_begin(store
->ctx(), &manifest
, bucket
->get_placement_rule(), &tail_placement_rule
, obj
.bucket
, obj
);
571 rgw_raw_obj stripe_obj
= manifest_gen
.get_cur_obj(store
);
573 uint64_t chunk_size
= 0;
574 r
= store
->get_raw_chunk_size(dpp
, stripe_obj
, &chunk_size
);
578 r
= writer
.set_stripe_obj(std::move(stripe_obj
));
583 uint64_t stripe_size
= manifest_gen
.cur_stripe_max_size();
// the head chunk is capped by both the chunk size and the stripe size
585 uint64_t max_head_size
= std::min(chunk_size
, stripe_size
);
586 set_head_chunk_size(max_head_size
);
588 // initialize the processors
589 chunk
= ChunkProcessor(&writer
, chunk_size
);
590 stripe
= StripeProcessor(&chunk
, this, stripe_size
);
// Finishes an append: drains outstanding writes, closes the new manifest at
// the actual size, marks the head object atomic and builds a write op with
// versioning disabled and `appendable` set. The visible lines show the new
// manifest being appended to cur_manifest (which is then used as the op's
// manifest) and, alternatively, &manifest being used directly. The current
// part number is encoded into attrs[RGW_ATTR_APPEND_PART_NUM]. When a
// previous etag exists, a combined etag is computed by hashing the binary
// forms of the previous and new etags and suffixing "-<cur_part_num>"; it is
// stored in attrs[RGW_ATTR_ETAG]. Metadata is written with sizes that add the
// pre-existing size/accounted size, and *cur_accounted_size is advanced by
// accounted_size.
// NOTE(review): the end of the parameter list, the declarations of `hash` and
// `etag_bl`, the condition choosing between cur_manifest and &manifest, and
// the error checks are not visible in this listing. if_match and if_nomatch
// are not referenced in the visible lines.
595 int AppendObjectProcessor::complete(size_t accounted_size
, const string
&etag
, ceph::real_time
*mtime
,
596 ceph::real_time set_mtime
, rgw::sal::RGWAttrs
& attrs
,
597 ceph::real_time delete_at
, const char *if_match
, const char *if_nomatch
,
598 const string
*user_data
, rgw_zone_set
*zones_trace
, bool *pcanceled
,
601 int r
= writer
.drain();
604 const uint64_t actual_size
= get_actual_size();
605 r
= manifest_gen
.create_next(actual_size
);
609 head_obj
->set_atomic(&obj_ctx
);
610 std::unique_ptr
<rgw::sal::RGWObject::WriteOp
> obj_op
= head_obj
->get_write_op(&obj_ctx
);
611 //For Append obj, disable versioning
612 obj_op
->params
.versioning_disabled
= true;
614 cur_manifest
->append(dpp
, manifest
, store
->svc()->zone
);
615 obj_op
->params
.manifest
= cur_manifest
;
617 obj_op
->params
.manifest
= &manifest
;
619 obj_op
->params
.ptag
= &unique_tag
; /* use req_id as operation tag */
620 obj_op
->params
.mtime
= mtime
;
621 obj_op
->params
.set_mtime
= set_mtime
;
622 obj_op
->params
.owner
= ACLOwner(owner
);
623 obj_op
->params
.flags
= PUT_OBJ_CREATE
;
624 obj_op
->params
.delete_at
= delete_at
;
625 obj_op
->params
.user_data
= user_data
;
626 obj_op
->params
.zones_trace
= zones_trace
;
627 obj_op
->params
.modify_tail
= true;
628 obj_op
->params
.appendable
= true;
629 obj_op
->params
.attrs
= &attrs
;
630 //Add the append part number
631 bufferlist cur_part_num_bl
;
633 encode(cur_part_num
, cur_part_num_bl
);
634 attrs
[RGW_ATTR_APPEND_PART_NUM
] = cur_part_num_bl
;
// combine the previous etag with the new one only when a previous one exists
636 if (!cur_etag
.empty()) {
638 char petag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
639 char final_etag
[CEPH_CRYPTO_MD5_DIGESTSIZE
];
640 char final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2 + 16];
// petag is reused: first the previous etag's bytes, then the new etag's
641 hex_to_buf(cur_etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
642 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
643 hex_to_buf(etag
.c_str(), petag
, CEPH_CRYPTO_MD5_DIGESTSIZE
);
644 hash
.Update((const unsigned char *)petag
, sizeof(petag
));
645 hash
.Final((unsigned char *)final_etag
);
646 buf_to_hex((unsigned char *)final_etag
, sizeof(final_etag
), final_etag_str
);
// the "-<part number>" suffix is written right after the hex digest
647 snprintf(&final_etag_str
[CEPH_CRYPTO_MD5_DIGESTSIZE
* 2], sizeof(final_etag_str
) - CEPH_CRYPTO_MD5_DIGESTSIZE
* 2,
648 "-%lld", (long long)cur_part_num
);
// the stored etag includes the terminating NUL (strlen + 1)
650 etag_bl
.append(final_etag_str
, strlen(final_etag_str
) + 1);
651 attrs
[RGW_ATTR_ETAG
] = etag_bl
;
653 r
= obj_op
->prepare(y
);
// sizes written are the appended amount plus what the object already held
657 r
= obj_op
->write_meta(dpp
, actual_size
+ cur_size
, accounted_size
+ *cur_accounted_size
, y
);
661 if (!obj_op
->params
.canceled
) {
662 // on success, clear the set of objects for deletion
663 writer
.clear_written();
666 *pcanceled
= obj_op
->params
.canceled
;
668 *cur_accounted_size
+= accounted_size
;
673 } // namespace rgw::putobj