// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab ft=cpp

/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2018 Red Hat, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 *
 */

#include "rgw_aio.h"
#include "rgw_putobj_processor.h"
#include "rgw_multi.h"
#include "rgw_compression.h"
#include "services/svc_sys_obj.h"
#include "rgw_sal_rados.h"

#define dout_subsys ceph_subsys_rgw

using namespace std;

namespace rgw::putobj {

int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
{
  const bool flush = (data.length() == 0);

  // capture the first chunk for special handling
  if (data_offset < head_chunk_size || data_offset == 0) {
    if (flush) {
      // flush partial chunk
      return process_first_chunk(std::move(head_data), &processor);
    }

    auto remaining = head_chunk_size - data_offset;
    auto count = std::min<uint64_t>(data.length(), remaining);
    data.splice(0, count, &head_data);
    data_offset += count;

    if (data_offset == head_chunk_size) {
      // process the first complete chunk
      ceph_assert(head_data.length() == head_chunk_size);
      int r = process_first_chunk(std::move(head_data), &processor);
      if (r < 0) {
        return r;
      }
    }
    if (data.length() == 0) { // avoid flushing stripe processor
      return 0;
    }
  }
  ceph_assert(processor); // process_first_chunk() must initialize

  // send everything else through the processor
  auto write_offset = data_offset;
  data_offset += data.length();
  return processor->process(std::move(data), write_offset);
}

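// record the raw objects written by completed aio operations (so RadosWriter
// can clean them up on failure) and return the first error code seen, or 0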
static int process_completed(const AioResultList& completed, RawObjSet *written)
{
  std::optional<int> error;
  for (auto& r : completed) {
    if (r.result >= 0) {
      written->insert(r.obj.get_ref().obj);
    } else if (!error) { // record first error code
      error = r.result;
    }
  }
  return error.value_or(0);
}

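// add an allocation hint to the write op; if rgw has already compressed the
// data, hint to the osd that it is incompressible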
void RadosWriter::add_write_hint(librados::ObjectWriteOperation& op) {
  const rgw_obj obj = head_obj->get_obj();
  const RGWObjState *obj_state = obj_ctx.get_state(obj);
  const bool compressed = obj_state->compressed;
  uint32_t alloc_hint_flags = 0;
  if (compressed) {
    alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
  }

  op.set_alloc_hint2(0, 0, alloc_hint_flags);
}

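// open a handle to the given raw object; subsequent writes target this stripe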
int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
{
  stripe_obj = store->svc()->rados->obj(raw_obj);
  return stripe_obj.open(dpp);
}

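// write a chunk of data to the current stripe object, using write_full() at
// offset 0 and a regular write() otherwise; the op is issued asynchronously via Aio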
int RadosWriter::process(bufferlist&& bl, uint64_t offset)
{
  bufferlist data = std::move(bl);
  const uint64_t cost = data.length();
  if (cost == 0) { // no empty writes, use aio directly for creates
    return 0;
  }
  librados::ObjectWriteOperation op;
  add_write_hint(op);
  if (offset == 0) {
    op.write_full(data);
  } else {
    op.write(offset, data);
  }
  constexpr uint64_t id = 0; // unused
  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
  return process_completed(c, &written);
}

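// write the buffer with an exclusive create and drain all outstanding aio,
// so errors such as -EEXIST are returned to the caller synchronously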
int RadosWriter::write_exclusive(const bufferlist& data)
{
  const uint64_t cost = data.length();

  librados::ObjectWriteOperation op;
  op.create(true); // exclusive create
  add_write_hint(op);
  op.write_full(data);

  constexpr uint64_t id = 0; // unused
  auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
  auto d = aio->drain();
  c.splice(c.end(), d);
  return process_completed(c, &written);
}

int RadosWriter::drain()
{
  return process_completed(aio->drain(), &written);
}

RadosWriter::~RadosWriter()
{
  // wait on any outstanding aio completions
  process_completed(aio->drain(), &written);

  bool need_to_remove_head = false;
  std::optional<rgw_raw_obj> raw_head;
  if (!rgw::sal::Object::empty(head_obj.get())) {
    raw_head.emplace();
    rgw::sal::RadosObject* obj = dynamic_cast<rgw::sal::RadosObject*>(head_obj.get());
    obj->get_raw_obj(&*raw_head);
  }

  /**
   * We should delete the object in the "multipart" namespace to avoid a race condition.
   * The race arises because the multipart object is the gatekeeper of a multipart upload:
   * once it is deleted, a second upload could start with the same suffix ("2/"), and
   * objects written by the second upload may then be deleted by the first upload.
   * Details are described in #11749.
   *
   * The above comment still stands, but instead of searching for a specific object in the multipart
   * namespace, we just make sure that we remove the object that is marked as the head object after
   * we remove all the other raw objects. Note that we use a different call to remove the head object,
   * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
   */
  for (const auto& obj : written) {
    if (raw_head && obj == *raw_head) {
      ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
      need_to_remove_head = true;
      continue;
    }

    int r = store->delete_raw_obj(dpp, obj);
    if (r < 0 && r != -ENOENT) {
      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
    }
  }

  if (need_to_remove_head) {
    std::string version_id;
    ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
    int r = head_obj->delete_object(dpp, &obj_ctx, null_yield);
    if (r < 0 && r != -ENOENT) {
      ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
    }
  }
}


// advance to the next stripe
int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
{
  // advance the manifest
  int r = manifest_gen.create_next(offset);
  if (r < 0) {
    return r;
  }

  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  uint64_t chunk_size = 0;
  r = store->get_raw_chunk_size(dpp, stripe_obj, &chunk_size);
  if (r < 0) {
    return r;
  }
  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    return r;
  }

  chunk = ChunkProcessor(&writer, chunk_size);
  *pstripe_size = manifest_gen.cur_stripe_max_size();
  return 0;
}


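// store the first chunk in memory; it is written as the head object's data
// when complete() writes the object metadata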
int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
                                               DataProcessor **processor)
{
  first_chunk = std::move(data);
  *processor = &stripe;
  return 0;
}

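// choose head/tail chunk and stripe sizes based on the head and tail placement
// rules, then initialize the manifest generator and the chunk/stripe processors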
int AtomicObjectProcessor::prepare(optional_yield y)
{
  uint64_t max_head_chunk_size;
  uint64_t head_max_size;
  uint64_t chunk_size = 0;
  uint64_t alignment;

  int r = dynamic_cast<rgw::sal::RadosObject*>(head_obj.get())->get_max_chunk_size(
      dpp, head_obj->get_bucket()->get_placement_rule(),
      &max_head_chunk_size, &alignment);
  if (r < 0) {
    return r;
  }

  bool same_pool = true;
  if (head_obj->get_bucket()->get_placement_rule() != tail_placement_rule) {
    if (!head_obj->placement_rules_match(head_obj->get_bucket()->get_placement_rule(), tail_placement_rule)) {
      same_pool = false;
      r = dynamic_cast<rgw::sal::RadosObject*>(head_obj.get())->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size);
      if (r < 0) {
        return r;
      }
      head_max_size = 0;
    }
  }

  if (same_pool) {
    head_max_size = max_head_chunk_size;
    chunk_size = max_head_chunk_size;
  }

  uint64_t stripe_size;
  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;

  dynamic_cast<rgw::sal::RadosObject*>(head_obj.get())->get_max_aligned_size(
      default_stripe_size, alignment, &stripe_size);

  manifest.set_trivial_rule(head_max_size, stripe_size);

  rgw_obj obj = head_obj->get_obj();

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                head_obj->get_bucket()->get_placement_rule(),
                                &tail_placement_rule,
                                obj.bucket, obj);
  if (r < 0) {
    return r;
  }

  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    return r;
  }

  set_head_chunk_size(head_max_size);
  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, head_max_size);
  return 0;
}

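// flush any remaining data, finalize the manifest, and write the head object's
// metadata; on success the written raw objects are no longer scheduled for cleanup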
int AtomicObjectProcessor::complete(size_t accounted_size,
                                    const std::string& etag,
                                    ceph::real_time *mtime,
                                    ceph::real_time set_mtime,
                                    rgw::sal::Attrs& attrs,
                                    ceph::real_time delete_at,
                                    const char *if_match,
                                    const char *if_nomatch,
                                    const std::string *user_data,
                                    rgw_zone_set *zones_trace,
                                    bool *pcanceled, optional_yield y)
{
  int r = writer.drain();
  if (r < 0) {
    return r;
  }
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }

  head_obj->set_atomic(&obj_ctx);

  RGWRados::Object op_target(store->getRados(),
                             head_obj->get_bucket()->get_info(),
                             obj_ctx, head_obj->get_obj());
  RGWRados::Object::Write obj_op(&op_target);

  /* some object types shouldn't be versioned, e.g., multipart parts */
  op_target.set_versioning_disabled(!head_obj->get_bucket()->versioning_enabled());
  obj_op.meta.data = &first_chunk;
  obj_op.meta.manifest = &manifest;
  obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
  obj_op.meta.if_match = if_match;
  obj_op.meta.if_nomatch = if_nomatch;
  obj_op.meta.mtime = mtime;
  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.flags = PUT_OBJ_CREATE;
  obj_op.meta.olh_epoch = olh_epoch;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.user_data = user_data;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;

  r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
  if (r < 0) {
    return r;
  }
  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  return 0;
}


int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
                                                  DataProcessor **processor)
{
  // write the first chunk of the head object as part of an exclusive create,
  // then drain to wait for the result in case of EEXIST
  int r = writer.write_exclusive(data);
  if (r == -EEXIST) {
    // randomize the oid prefix and reprepare the head/manifest
    std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32);

    mp.init(target_obj->get_name(), upload_id, oid_rand);
    manifest.set_prefix(target_obj->get_name() + "." + oid_rand);

    r = prepare_head();
    if (r < 0) {
      return r;
    }
    // resubmit the write op on the new head object
    r = writer.write_exclusive(data);
  }
  if (r < 0) {
    return r;
  }
  *processor = &stripe;
  return 0;
}

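// set up the manifest for this part, map its first stripe to the part's head
// object, and initialize the chunk/stripe processors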
int MultipartObjectProcessor::prepare_head()
{
  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
  uint64_t chunk_size;
  uint64_t stripe_size;
  uint64_t alignment;

  int r = dynamic_cast<rgw::sal::RadosObject*>(target_obj.get())->get_max_chunk_size(dpp,
      tail_placement_rule, &chunk_size, &alignment);
  if (r < 0) {
    ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
    return r;
  }
  dynamic_cast<rgw::sal::RadosObject*>(target_obj.get())->get_max_aligned_size(
      default_stripe_size, alignment, &stripe_size);

  manifest.set_multipart_part_rule(stripe_size, part_num);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                head_obj->get_bucket()->get_placement_rule(),
                                &tail_placement_rule,
                                target_obj->get_bucket()->get_key(),
                                target_obj->get_obj());
  if (r < 0) {
    return r;
  }

  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
  dynamic_cast<rgw::sal::RadosObject*>(head_obj.get())->raw_obj_to_obj(stripe_obj);
  head_obj->set_hash_source(target_obj->get_name());

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    return r;
  }
  stripe_size = manifest_gen.cur_stripe_max_size();
  set_head_chunk_size(stripe_size);

  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, stripe_size);
  return 0;
}

int MultipartObjectProcessor::prepare(optional_yield y)
{
  manifest.set_prefix(target_obj->get_name() + "." + upload_id);

  return prepare_head();
}

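// flush the part's data, write its head object metadata, then record the part
// info (etag, size, manifest) in the multipart meta object's omap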
int MultipartObjectProcessor::complete(size_t accounted_size,
                                       const std::string& etag,
                                       ceph::real_time *mtime,
                                       ceph::real_time set_mtime,
                                       std::map<std::string, bufferlist>& attrs,
                                       ceph::real_time delete_at,
                                       const char *if_match,
                                       const char *if_nomatch,
                                       const std::string *user_data,
                                       rgw_zone_set *zones_trace,
                                       bool *pcanceled, optional_yield y)
{
  int r = writer.drain();
  if (r < 0) {
    return r;
  }
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }

  RGWRados::Object op_target(store->getRados(),
                             head_obj->get_bucket()->get_info(),
                             obj_ctx, head_obj->get_obj());
  RGWRados::Object::Write obj_op(&op_target);

  op_target.set_versioning_disabled(true);
  op_target.set_meta_placement_rule(&tail_placement_rule);
  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.mtime = mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;

  r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
  if (r < 0)
    return r;

  bufferlist bl;
  RGWUploadPartInfo info;
  string p = "part.";
  bool sorted_omap = is_v2_upload_id(upload_id);

  if (sorted_omap) {
    char buf[32];
    snprintf(buf, sizeof(buf), "%08d", part_num);
    p.append(buf);
  } else {
    p.append(part_num_str);
  }
  info.num = part_num;
  info.etag = etag;
  info.size = actual_size;
  info.accounted_size = accounted_size;
  info.modified = real_clock::now();
  info.manifest = manifest;

  bool compressed;
  r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
  if (r < 0) {
    ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
    return r;
  }

  encode(info, bl);

  std::unique_ptr<rgw::sal::Object> meta_obj =
    head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
  meta_obj->set_in_extra_data(true);

  r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
  if (r < 0) {
    return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
  }

  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  return 0;
}

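// the first chunk of the appended data is written with an exclusive create
// of the current stripe object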
int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor)
{
  int r = writer.write_exclusive(data);
  if (r < 0) {
    return r;
  }
  *processor = &stripe;
  return 0;
}

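// load the current object state, validate the append position, pick the next
// append part number, and set up the manifest and processors for the new part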
int AppendObjectProcessor::prepare(optional_yield y)
{
  RGWObjState *astate;
  int r = head_obj->get_obj_state(dpp, &obj_ctx, &astate, y);
  if (r < 0) {
    return r;
  }
  cur_size = astate->size;
  *cur_accounted_size = astate->accounted_size;
  if (!astate->exists) {
    if (position != 0) {
      ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl;
      return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
    } else {
      cur_part_num = 1;
      // set the prefix
      char buf[33];
      gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
      string oid_prefix = head_obj->get_name();
      oid_prefix.append(".");
      oid_prefix.append(buf);
      oid_prefix.append("_");
      manifest.set_prefix(oid_prefix);
    }
  } else {
    // check whether the object is appendable
    map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
    if (iter == astate->attrset.end()) {
      ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl;
      return -ERR_OBJECT_NOT_APPENDABLE;
    }
    if (position != *cur_accounted_size) {
      ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl;
      return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
    }
    try {
      using ceph::decode;
      decode(cur_part_num, iter->second);
    } catch (buffer::error& err) {
      ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl;
      return -EIO;
    }
    cur_part_num++;
    // get the current obj etag
    iter = astate->attrset.find(RGW_ATTR_ETAG);
    if (iter != astate->attrset.end()) {
      string s = rgw_string_unquote(iter->second.c_str());
      size_t pos = s.find("-");
      cur_etag = s.substr(0, pos);
    }

    iter = astate->attrset.find(RGW_ATTR_STORAGE_CLASS);
    if (iter != astate->attrset.end()) {
      tail_placement_rule.storage_class = iter->second.to_str();
    }
    cur_manifest = &(*astate->manifest);
    manifest.set_prefix(cur_manifest->get_prefix());
    astate->keep_tail = true;
  }
  manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);

  rgw_obj obj = head_obj->get_obj();

  r = manifest_gen.create_begin(store->ctx(), &manifest, head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
  if (r < 0) {
    return r;
  }
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  uint64_t chunk_size = 0;
  r = store->get_raw_chunk_size(dpp, stripe_obj, &chunk_size);
  if (r < 0) {
    return r;
  }
  r = writer.set_stripe_obj(std::move(stripe_obj));
  if (r < 0) {
    return r;
  }

  uint64_t stripe_size = manifest_gen.cur_stripe_max_size();

  uint64_t max_head_size = std::min(chunk_size, stripe_size);
  set_head_chunk_size(max_head_size);

  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, stripe_size);

  return 0;
}

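// flush remaining data, merge the new part into the existing manifest (if any),
// recompute the multipart-style etag, and update the head object's metadata,
// size and accounted size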
int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
                                    ceph::real_time set_mtime, rgw::sal::Attrs& attrs,
                                    ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
                                    const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
                                    optional_yield y)
{
  int r = writer.drain();
  if (r < 0)
    return r;
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }
  head_obj->set_atomic(&obj_ctx);
  RGWRados::Object op_target(store->getRados(),
                             head_obj->get_bucket()->get_info(),
                             obj_ctx, head_obj->get_obj());
  RGWRados::Object::Write obj_op(&op_target);
  // for append objects, disable versioning
  op_target.set_versioning_disabled(true);
  if (cur_manifest) {
    cur_manifest->append(dpp, manifest, store->get_zone());
    obj_op.meta.manifest = cur_manifest;
  } else {
    obj_op.meta.manifest = &manifest;
  }
  obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
  obj_op.meta.mtime = mtime;
  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.flags = PUT_OBJ_CREATE;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.user_data = user_data;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;
  obj_op.meta.appendable = true;
  // add the append part number
  bufferlist cur_part_num_bl;
  using ceph::encode;
  encode(cur_part_num, cur_part_num_bl);
  attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
  // calculate the etag
  if (!cur_etag.empty()) {
    MD5 hash;
    // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
    hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
    char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
    char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
    char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
    hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
    hash.Update((const unsigned char *)petag, sizeof(petag));
    hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
    hash.Update((const unsigned char *)petag, sizeof(petag));
    hash.Final((unsigned char *)final_etag);
    buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
    snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
             "-%lld", (long long)cur_part_num);
    bufferlist etag_bl;
    etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
    attrs[RGW_ATTR_ETAG] = etag_bl;
  }
  r = obj_op.write_meta(dpp, actual_size + cur_size,
                        accounted_size + *cur_accounted_size,
                        attrs, y);
  if (r < 0) {
    return r;
  }
  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  *cur_accounted_size += accounted_size;

  return 0;
}

} // namespace rgw::putobj