]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_putobj_processor.cc
4ac9ccb017c6926cf84e625d6a51de31a1ad812d
[ceph.git] / ceph / src / rgw / rgw_putobj_processor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "rgw_aio.h"
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "rgw_compression.h"
20 #include "services/svc_sys_obj.h"
21
22 #define dout_subsys ceph_subsys_rgw
23
24 namespace rgw::putobj {
25
26 int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
27 {
28 const bool flush = (data.length() == 0);
29
30 // capture the first chunk for special handling
31 if (data_offset < head_chunk_size || data_offset == 0) {
32 if (flush) {
33 // flush partial chunk
34 return process_first_chunk(std::move(head_data), &processor);
35 }
36
37 auto remaining = head_chunk_size - data_offset;
38 auto count = std::min<uint64_t>(data.length(), remaining);
39 data.splice(0, count, &head_data);
40 data_offset += count;
41
42 if (data_offset == head_chunk_size) {
43 // process the first complete chunk
44 ceph_assert(head_data.length() == head_chunk_size);
45 int r = process_first_chunk(std::move(head_data), &processor);
46 if (r < 0) {
47 return r;
48 }
49 }
50 if (data.length() == 0) { // avoid flushing stripe processor
51 return 0;
52 }
53 }
54 ceph_assert(processor); // process_first_chunk() must initialize
55
56 // send everything else through the processor
57 auto write_offset = data_offset;
58 data_offset += data.length();
59 return processor->process(std::move(data), write_offset);
60 }
61
62
63 static int process_completed(const AioResultList& completed, RawObjSet *written)
64 {
65 std::optional<int> error;
66 for (auto& r : completed) {
67 if (r.result >= 0) {
68 written->insert(r.obj.get_ref().obj);
69 } else if (!error) { // record first error code
70 error = r.result;
71 }
72 }
73 return error.value_or(0);
74 }
75
76 int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
77 {
78 stripe_obj = store->svc()->rados->obj(raw_obj);
79 return stripe_obj.open();
80 }
81
82 int RadosWriter::process(bufferlist&& bl, uint64_t offset)
83 {
84 bufferlist data = std::move(bl);
85 const uint64_t cost = data.length();
86 if (cost == 0) { // no empty writes, use aio directly for creates
87 return 0;
88 }
89 librados::ObjectWriteOperation op;
90 if (offset == 0) {
91 op.write_full(data);
92 } else {
93 op.write(offset, data);
94 }
95 constexpr uint64_t id = 0; // unused
96 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
97 return process_completed(c, &written);
98 }
99
100 int RadosWriter::write_exclusive(const bufferlist& data)
101 {
102 const uint64_t cost = data.length();
103
104 librados::ObjectWriteOperation op;
105 op.create(true); // exclusive create
106 op.write_full(data);
107
108 constexpr uint64_t id = 0; // unused
109 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
110 auto d = aio->drain();
111 c.splice(c.end(), d);
112 return process_completed(c, &written);
113 }
114
115 int RadosWriter::drain()
116 {
117 return process_completed(aio->drain(), &written);
118 }
119
120 RadosWriter::~RadosWriter()
121 {
122 // wait on any outstanding aio completions
123 process_completed(aio->drain(), &written);
124
125 bool need_to_remove_head = false;
126 std::optional<rgw_raw_obj> raw_head;
127 if (!head_obj.empty()) {
128 raw_head.emplace();
129 store->getRados()->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head);
130 }
131
132 /**
133 * We should delete the object in the "multipart" namespace to avoid race condition.
134 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
135 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
136 * written by the second upload may be deleted by the first upload.
137 * details is describled on #11749
138 *
139 * The above comment still stands, but instead of searching for a specific object in the multipart
140 * namespace, we just make sure that we remove the object that is marked as the head object after
141 * we remove all the other raw objects. Note that we use different call to remove the head object,
142 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
143 */
144 for (const auto& obj : written) {
145 if (raw_head && obj == *raw_head) {
146 ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
147 need_to_remove_head = true;
148 continue;
149 }
150
151 int r = store->getRados()->delete_raw_obj(obj);
152 if (r < 0 && r != -ENOENT) {
153 ldpp_dout(dpp, 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
154 }
155 }
156
157 if (need_to_remove_head) {
158 ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
159 int r = store->getRados()->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
160 if (r < 0 && r != -ENOENT) {
161 ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
162 }
163 }
164 }
165
166
167 // advance to the next stripe
168 int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
169 {
170 // advance the manifest
171 int r = manifest_gen.create_next(offset);
172 if (r < 0) {
173 return r;
174 }
175
176 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store->getRados());
177
178 uint64_t chunk_size = 0;
179 r = store->getRados()->get_max_chunk_size(stripe_obj.pool, &chunk_size);
180 if (r < 0) {
181 return r;
182 }
183 r = writer.set_stripe_obj(stripe_obj);
184 if (r < 0) {
185 return r;
186 }
187
188 chunk = ChunkProcessor(&writer, chunk_size);
189 *pstripe_size = manifest_gen.cur_stripe_max_size();
190 return 0;
191 }
192
193
194
195 int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
196 DataProcessor **processor)
197 {
198 first_chunk = std::move(data);
199 *processor = &stripe;
200 return 0;
201 }
202
203 int AtomicObjectProcessor::prepare(optional_yield y)
204 {
205 uint64_t max_head_chunk_size;
206 uint64_t head_max_size;
207 uint64_t chunk_size = 0;
208 uint64_t alignment;
209 rgw_pool head_pool;
210
211 if (!store->getRados()->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
212 return -EIO;
213 }
214
215 int r = store->getRados()->get_max_chunk_size(head_pool, &max_head_chunk_size, &alignment);
216 if (r < 0) {
217 return r;
218 }
219
220 bool same_pool = true;
221
222 if (bucket_info.placement_rule != tail_placement_rule) {
223 rgw_pool tail_pool;
224 if (!store->getRados()->get_obj_data_pool(tail_placement_rule, head_obj, &tail_pool)) {
225 return -EIO;
226 }
227
228 if (tail_pool != head_pool) {
229 same_pool = false;
230
231 r = store->getRados()->get_max_chunk_size(tail_pool, &chunk_size);
232 if (r < 0) {
233 return r;
234 }
235
236 head_max_size = 0;
237 }
238 }
239
240 if (same_pool) {
241 head_max_size = max_head_chunk_size;
242 chunk_size = max_head_chunk_size;
243 }
244
245 uint64_t stripe_size;
246 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
247
248 store->getRados()->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
249
250 manifest.set_trivial_rule(head_max_size, stripe_size);
251
252 r = manifest_gen.create_begin(store->ctx(), &manifest,
253 bucket_info.placement_rule,
254 &tail_placement_rule,
255 head_obj.bucket, head_obj);
256 if (r < 0) {
257 return r;
258 }
259
260 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store->getRados());
261
262 r = writer.set_stripe_obj(stripe_obj);
263 if (r < 0) {
264 return r;
265 }
266
267 set_head_chunk_size(head_max_size);
268 // initialize the processors
269 chunk = ChunkProcessor(&writer, chunk_size);
270 stripe = StripeProcessor(&chunk, this, head_max_size);
271 return 0;
272 }
273
274 int AtomicObjectProcessor::complete(size_t accounted_size,
275 const std::string& etag,
276 ceph::real_time *mtime,
277 ceph::real_time set_mtime,
278 std::map<std::string, bufferlist>& attrs,
279 ceph::real_time delete_at,
280 const char *if_match,
281 const char *if_nomatch,
282 const std::string *user_data,
283 rgw_zone_set *zones_trace,
284 bool *pcanceled, optional_yield y)
285 {
286 int r = writer.drain();
287 if (r < 0) {
288 return r;
289 }
290 const uint64_t actual_size = get_actual_size();
291 r = manifest_gen.create_next(actual_size);
292 if (r < 0) {
293 return r;
294 }
295
296 obj_ctx.set_atomic(head_obj);
297
298 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, head_obj);
299
300 /* some object types shouldn't be versioned, e.g., multipart parts */
301 op_target.set_versioning_disabled(!bucket_info.versioning_enabled());
302
303 RGWRados::Object::Write obj_op(&op_target);
304
305 obj_op.meta.data = &first_chunk;
306 obj_op.meta.manifest = &manifest;
307 obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
308 obj_op.meta.if_match = if_match;
309 obj_op.meta.if_nomatch = if_nomatch;
310 obj_op.meta.mtime = mtime;
311 obj_op.meta.set_mtime = set_mtime;
312 obj_op.meta.owner = owner;
313 obj_op.meta.flags = PUT_OBJ_CREATE;
314 obj_op.meta.olh_epoch = olh_epoch;
315 obj_op.meta.delete_at = delete_at;
316 obj_op.meta.user_data = user_data;
317 obj_op.meta.zones_trace = zones_trace;
318 obj_op.meta.modify_tail = true;
319
320 r = obj_op.write_meta(actual_size, accounted_size, attrs, y);
321 if (r < 0) {
322 return r;
323 }
324 if (!obj_op.meta.canceled) {
325 // on success, clear the set of objects for deletion
326 writer.clear_written();
327 }
328 if (pcanceled) {
329 *pcanceled = obj_op.meta.canceled;
330 }
331 return 0;
332 }
333
334
335 int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
336 DataProcessor **processor)
337 {
338 // write the first chunk of the head object as part of an exclusive create,
339 // then drain to wait for the result in case of EEXIST
340 int r = writer.write_exclusive(data);
341 if (r == -EEXIST) {
342 // randomize the oid prefix and reprepare the head/manifest
343 std::string oid_rand(32, 0);
344 gen_rand_alphanumeric(store->ctx(), oid_rand.data(), oid_rand.size());
345
346 mp.init(target_obj.key.name, upload_id, oid_rand);
347 manifest.set_prefix(target_obj.key.name + "." + oid_rand);
348
349 r = prepare_head();
350 if (r < 0) {
351 return r;
352 }
353 // resubmit the write op on the new head object
354 r = writer.write_exclusive(data);
355 }
356 if (r < 0) {
357 return r;
358 }
359 *processor = &stripe;
360 return 0;
361 }
362
363 int MultipartObjectProcessor::prepare_head()
364 {
365 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
366 uint64_t chunk_size;
367 uint64_t stripe_size;
368 uint64_t alignment;
369
370 int r = store->getRados()->get_max_chunk_size(tail_placement_rule, target_obj, &chunk_size, &alignment);
371 if (r < 0) {
372 ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
373 return r;
374 }
375 store->getRados()->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
376
377 manifest.set_multipart_part_rule(stripe_size, part_num);
378
379 r = manifest_gen.create_begin(store->ctx(), &manifest,
380 bucket_info.placement_rule,
381 &tail_placement_rule,
382 target_obj.bucket, target_obj);
383 if (r < 0) {
384 return r;
385 }
386
387 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store->getRados());
388 RGWSI_Tier_RADOS::raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj);
389 head_obj.index_hash_source = target_obj.key.name;
390
391 r = writer.set_stripe_obj(stripe_obj);
392 if (r < 0) {
393 return r;
394 }
395 stripe_size = manifest_gen.cur_stripe_max_size();
396 set_head_chunk_size(stripe_size);
397
398 chunk = ChunkProcessor(&writer, chunk_size);
399 stripe = StripeProcessor(&chunk, this, stripe_size);
400 return 0;
401 }
402
403 int MultipartObjectProcessor::prepare(optional_yield y)
404 {
405 manifest.set_prefix(target_obj.key.name + "." + upload_id);
406
407 return prepare_head();
408 }
409
410 int MultipartObjectProcessor::complete(size_t accounted_size,
411 const std::string& etag,
412 ceph::real_time *mtime,
413 ceph::real_time set_mtime,
414 std::map<std::string, bufferlist>& attrs,
415 ceph::real_time delete_at,
416 const char *if_match,
417 const char *if_nomatch,
418 const std::string *user_data,
419 rgw_zone_set *zones_trace,
420 bool *pcanceled, optional_yield y)
421 {
422 int r = writer.drain();
423 if (r < 0) {
424 return r;
425 }
426 const uint64_t actual_size = get_actual_size();
427 r = manifest_gen.create_next(actual_size);
428 if (r < 0) {
429 return r;
430 }
431
432 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, head_obj);
433 op_target.set_versioning_disabled(true);
434 RGWRados::Object::Write obj_op(&op_target);
435
436 obj_op.meta.set_mtime = set_mtime;
437 obj_op.meta.mtime = mtime;
438 obj_op.meta.owner = owner;
439 obj_op.meta.delete_at = delete_at;
440 obj_op.meta.zones_trace = zones_trace;
441 obj_op.meta.modify_tail = true;
442
443 r = obj_op.write_meta(actual_size, accounted_size, attrs, y);
444 if (r < 0)
445 return r;
446
447 bufferlist bl;
448 RGWUploadPartInfo info;
449 string p = "part.";
450 bool sorted_omap = is_v2_upload_id(upload_id);
451
452 if (sorted_omap) {
453 char buf[32];
454 snprintf(buf, sizeof(buf), "%08d", part_num);
455 p.append(buf);
456 } else {
457 p.append(part_num_str);
458 }
459 info.num = part_num;
460 info.etag = etag;
461 info.size = actual_size;
462 info.accounted_size = accounted_size;
463 info.modified = real_clock::now();
464 info.manifest = manifest;
465
466 bool compressed;
467 r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
468 if (r < 0) {
469 ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
470 return r;
471 }
472
473 encode(info, bl);
474
475 rgw_obj meta_obj;
476 meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
477 meta_obj.set_in_extra_data(true);
478
479 rgw_raw_obj raw_meta_obj;
480
481 store->getRados()->obj_to_raw(bucket_info.placement_rule, meta_obj, &raw_meta_obj);
482
483 auto obj_ctx = store->svc()->sysobj->init_obj_ctx();
484 auto sysobj = obj_ctx.get_obj(raw_meta_obj);
485
486 r = sysobj.omap()
487 .set_must_exist(true)
488 .set(p, bl, null_yield);
489 if (r < 0) {
490 return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
491 }
492
493 if (!obj_op.meta.canceled) {
494 // on success, clear the set of objects for deletion
495 writer.clear_written();
496 }
497 if (pcanceled) {
498 *pcanceled = obj_op.meta.canceled;
499 }
500 return 0;
501 }
502
503 int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
504 {
505 int r = writer.write_exclusive(data);
506 if (r < 0) {
507 return r;
508 }
509 *processor = &stripe;
510 return 0;
511 }
512
513 int AppendObjectProcessor::prepare(optional_yield y)
514 {
515 RGWObjState *astate;
516 int r = store->getRados()->get_obj_state(&obj_ctx, bucket_info, head_obj, &astate, y);
517 if (r < 0) {
518 return r;
519 }
520 cur_size = astate->size;
521 *cur_accounted_size = astate->accounted_size;
522 if (!astate->exists) {
523 if (position != 0) {
524 ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl;
525 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
526 } else {
527 cur_part_num = 1;
528 //set the prefix
529 char buf[33];
530 gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
531 string oid_prefix = head_obj.key.name;
532 oid_prefix.append(".");
533 oid_prefix.append(buf);
534 oid_prefix.append("_");
535 manifest.set_prefix(oid_prefix);
536 }
537 } else {
538 // check whether the object appendable
539 map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
540 if (iter == astate->attrset.end()) {
541 ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl;
542 return -ERR_OBJECT_NOT_APPENDABLE;
543 }
544 if (position != *cur_accounted_size) {
545 ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl;
546 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
547 }
548 try {
549 decode(cur_part_num, iter->second);
550 } catch (buffer::error& err) {
551 ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl;
552 return -EIO;
553 }
554 cur_part_num++;
555 //get the current obj etag
556 iter = astate->attrset.find(RGW_ATTR_ETAG);
557 if (iter != astate->attrset.end()) {
558 string s = rgw_string_unquote(iter->second.c_str());
559 size_t pos = s.find("-");
560 cur_etag = s.substr(0, pos);
561 }
562
563 iter = astate->attrset.find(RGW_ATTR_STORAGE_CLASS);
564 if (iter != astate->attrset.end()) {
565 tail_placement_rule.storage_class = iter->second.to_str();
566 }
567 cur_manifest = &(*astate->manifest);
568 manifest.set_prefix(cur_manifest->get_prefix());
569 astate->keep_tail = true;
570 }
571 manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);
572
573 r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, &tail_placement_rule, head_obj.bucket, head_obj);
574 if (r < 0) {
575 return r;
576 }
577 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store->getRados());
578
579 uint64_t chunk_size = 0;
580 r = store->getRados()->get_max_chunk_size(stripe_obj.pool, &chunk_size);
581 if (r < 0) {
582 return r;
583 }
584 r = writer.set_stripe_obj(std::move(stripe_obj));
585 if (r < 0) {
586 return r;
587 }
588
589 uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
590
591 uint64_t max_head_size = std::min(chunk_size, stripe_size);
592 set_head_chunk_size(max_head_size);
593
594 // initialize the processors
595 chunk = ChunkProcessor(&writer, chunk_size);
596 stripe = StripeProcessor(&chunk, this, stripe_size);
597
598 return 0;
599 }
600
601 int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
602 ceph::real_time set_mtime, map <string, bufferlist> &attrs,
603 ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
604 const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
605 optional_yield y)
606 {
607 int r = writer.drain();
608 if (r < 0)
609 return r;
610 const uint64_t actual_size = get_actual_size();
611 r = manifest_gen.create_next(actual_size);
612 if (r < 0) {
613 return r;
614 }
615 obj_ctx.set_atomic(head_obj);
616 RGWRados::Object op_target(store->getRados(), bucket_info, obj_ctx, head_obj);
617 //For Append obj, disable versioning
618 op_target.set_versioning_disabled(true);
619 RGWRados::Object::Write obj_op(&op_target);
620 if (cur_manifest) {
621 cur_manifest->append(manifest, store->svc()->zone);
622 obj_op.meta.manifest = cur_manifest;
623 } else {
624 obj_op.meta.manifest = &manifest;
625 }
626 obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
627 obj_op.meta.mtime = mtime;
628 obj_op.meta.set_mtime = set_mtime;
629 obj_op.meta.owner = owner;
630 obj_op.meta.flags = PUT_OBJ_CREATE;
631 obj_op.meta.delete_at = delete_at;
632 obj_op.meta.user_data = user_data;
633 obj_op.meta.zones_trace = zones_trace;
634 obj_op.meta.modify_tail = true;
635 obj_op.meta.appendable = true;
636 //Add the append part number
637 bufferlist cur_part_num_bl;
638 encode(cur_part_num, cur_part_num_bl);
639 attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
640 //calculate the etag
641 if (!cur_etag.empty()) {
642 MD5 hash;
643 char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
644 char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
645 char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
646 hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
647 hash.Update((const unsigned char *)petag, sizeof(petag));
648 hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
649 hash.Update((const unsigned char *)petag, sizeof(petag));
650 hash.Final((unsigned char *)final_etag);
651 buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
652 snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
653 "-%lld", (long long)cur_part_num);
654 bufferlist etag_bl;
655 etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
656 attrs[RGW_ATTR_ETAG] = etag_bl;
657 }
658 r = obj_op.write_meta(actual_size + cur_size, accounted_size + *cur_accounted_size, attrs, y);
659 if (r < 0) {
660 return r;
661 }
662 if (!obj_op.meta.canceled) {
663 // on success, clear the set of objects for deletion
664 writer.clear_written();
665 }
666 if (pcanceled) {
667 *pcanceled = obj_op.meta.canceled;
668 }
669 *cur_accounted_size += accounted_size;
670
671 return 0;
672 }
673
674 } // namespace rgw::putobj