]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_putobj_processor.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_putobj_processor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "rgw_aio.h"
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "rgw_compression.h"
20 #include "services/svc_sys_obj.h"
21 #include "rgw_sal_rados.h"
22
23 #define dout_subsys ceph_subsys_rgw
24
25 namespace rgw::putobj {
26
27 int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
28 {
29 const bool flush = (data.length() == 0);
30
31 // capture the first chunk for special handling
32 if (data_offset < head_chunk_size || data_offset == 0) {
33 if (flush) {
34 // flush partial chunk
35 return process_first_chunk(std::move(head_data), &processor);
36 }
37
38 auto remaining = head_chunk_size - data_offset;
39 auto count = std::min<uint64_t>(data.length(), remaining);
40 data.splice(0, count, &head_data);
41 data_offset += count;
42
43 if (data_offset == head_chunk_size) {
44 // process the first complete chunk
45 ceph_assert(head_data.length() == head_chunk_size);
46 int r = process_first_chunk(std::move(head_data), &processor);
47 if (r < 0) {
48 return r;
49 }
50 }
51 if (data.length() == 0) { // avoid flushing stripe processor
52 return 0;
53 }
54 }
55 ceph_assert(processor); // process_first_chunk() must initialize
56
57 // send everything else through the processor
58 auto write_offset = data_offset;
59 data_offset += data.length();
60 return processor->process(std::move(data), write_offset);
61 }
62
63
64 static int process_completed(const AioResultList& completed, RawObjSet *written)
65 {
66 std::optional<int> error;
67 for (auto& r : completed) {
68 if (r.result >= 0) {
69 written->insert(r.obj.get_ref().obj);
70 } else if (!error) { // record first error code
71 error = r.result;
72 }
73 }
74 return error.value_or(0);
75 }
76
77 int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
78 {
79 stripe_obj = store->svc()->rados->obj(raw_obj);
80 return stripe_obj.open(dpp);
81 }
82
83 int RadosWriter::process(bufferlist&& bl, uint64_t offset)
84 {
85 bufferlist data = std::move(bl);
86 const uint64_t cost = data.length();
87 if (cost == 0) { // no empty writes, use aio directly for creates
88 return 0;
89 }
90 librados::ObjectWriteOperation op;
91 if (offset == 0) {
92 op.write_full(data);
93 } else {
94 op.write(offset, data);
95 }
96 constexpr uint64_t id = 0; // unused
97 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
98 return process_completed(c, &written);
99 }
100
101 int RadosWriter::write_exclusive(const bufferlist& data)
102 {
103 const uint64_t cost = data.length();
104
105 librados::ObjectWriteOperation op;
106 op.create(true); // exclusive create
107 op.write_full(data);
108
109 constexpr uint64_t id = 0; // unused
110 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
111 auto d = aio->drain();
112 c.splice(c.end(), d);
113 return process_completed(c, &written);
114 }
115
116 int RadosWriter::drain()
117 {
118 return process_completed(aio->drain(), &written);
119 }
120
121 RadosWriter::~RadosWriter()
122 {
123 // wait on any outstanding aio completions
124 process_completed(aio->drain(), &written);
125
126 bool need_to_remove_head = false;
127 std::optional<rgw_raw_obj> raw_head;
128 if (!rgw::sal::RGWObject::empty(head_obj.get())) {
129 raw_head.emplace();
130 head_obj->get_raw_obj(&*raw_head);
131 }
132
133 /**
134 * We should delete the object in the "multipart" namespace to avoid race condition.
135 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
136 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
137 * written by the second upload may be deleted by the first upload.
138 * details is describled on #11749
139 *
140 * The above comment still stands, but instead of searching for a specific object in the multipart
141 * namespace, we just make sure that we remove the object that is marked as the head object after
142 * we remove all the other raw objects. Note that we use different call to remove the head object,
143 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
144 */
145 for (const auto& obj : written) {
146 if (raw_head && obj == *raw_head) {
147 ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
148 need_to_remove_head = true;
149 continue;
150 }
151
152 int r = store->delete_raw_obj(dpp, obj);
153 if (r < 0 && r != -ENOENT) {
154 ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
155 }
156 }
157
158 if (need_to_remove_head) {
159 std::string version_id;
160 ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
161 int r = head_obj->delete_object(dpp, &obj_ctx, ACLOwner(), bucket->get_acl_owner(), ceph::real_time(),
162 false, 0, version_id, null_yield);
163 if (r < 0 && r != -ENOENT) {
164 ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
165 }
166 }
167 }
168
169
170 // advance to the next stripe
171 int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
172 {
173 // advance the manifest
174 int r = manifest_gen.create_next(offset);
175 if (r < 0) {
176 return r;
177 }
178
179 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
180
181 uint64_t chunk_size = 0;
182 r = store->get_raw_chunk_size(dpp, stripe_obj, &chunk_size);
183 if (r < 0) {
184 return r;
185 }
186 r = writer.set_stripe_obj(stripe_obj);
187 if (r < 0) {
188 return r;
189 }
190
191 chunk = ChunkProcessor(&writer, chunk_size);
192 *pstripe_size = manifest_gen.cur_stripe_max_size();
193 return 0;
194 }
195
196
197
198 int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
199 DataProcessor **processor)
200 {
201 first_chunk = std::move(data);
202 *processor = &stripe;
203 return 0;
204 }
205
206 int AtomicObjectProcessor::prepare(optional_yield y)
207 {
208 uint64_t max_head_chunk_size;
209 uint64_t head_max_size;
210 uint64_t chunk_size = 0;
211 uint64_t alignment;
212
213 int r = head_obj->get_max_chunk_size(dpp, bucket->get_placement_rule(),
214 &max_head_chunk_size, &alignment);
215 if (r < 0) {
216 return r;
217 }
218
219 bool same_pool = true;
220 if (bucket->get_placement_rule() != tail_placement_rule) {
221 if (!head_obj->placement_rules_match(bucket->get_placement_rule(), tail_placement_rule)) {
222 same_pool = false;
223 r = head_obj->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size);
224 if (r < 0) {
225 return r;
226 }
227 head_max_size = 0;
228 }
229 }
230
231 if (same_pool) {
232 head_max_size = max_head_chunk_size;
233 chunk_size = max_head_chunk_size;
234 }
235
236 uint64_t stripe_size;
237 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
238
239 head_obj->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
240
241 manifest.set_trivial_rule(head_max_size, stripe_size);
242
243 rgw_obj obj = head_obj->get_obj();
244
245 r = manifest_gen.create_begin(store->ctx(), &manifest,
246 bucket->get_placement_rule(),
247 &tail_placement_rule,
248 obj.bucket, obj);
249 if (r < 0) {
250 return r;
251 }
252
253 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
254
255 r = writer.set_stripe_obj(stripe_obj);
256 if (r < 0) {
257 return r;
258 }
259
260 set_head_chunk_size(head_max_size);
261 // initialize the processors
262 chunk = ChunkProcessor(&writer, chunk_size);
263 stripe = StripeProcessor(&chunk, this, head_max_size);
264 return 0;
265 }
266
267 int AtomicObjectProcessor::complete(size_t accounted_size,
268 const std::string& etag,
269 ceph::real_time *mtime,
270 ceph::real_time set_mtime,
271 rgw::sal::RGWAttrs& attrs,
272 ceph::real_time delete_at,
273 const char *if_match,
274 const char *if_nomatch,
275 const std::string *user_data,
276 rgw_zone_set *zones_trace,
277 bool *pcanceled, optional_yield y)
278 {
279 int r = writer.drain();
280 if (r < 0) {
281 return r;
282 }
283 const uint64_t actual_size = get_actual_size();
284 r = manifest_gen.create_next(actual_size);
285 if (r < 0) {
286 return r;
287 }
288
289 head_obj->set_atomic(&obj_ctx);
290
291 std::unique_ptr<rgw::sal::RGWObject::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
292
293 /* some object types shouldn't be versioned, e.g., multipart parts */
294 obj_op->params.versioning_disabled = !bucket->versioning_enabled();
295 obj_op->params.data = &first_chunk;
296 obj_op->params.manifest = &manifest;
297 obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
298 obj_op->params.if_match = if_match;
299 obj_op->params.if_nomatch = if_nomatch;
300 obj_op->params.mtime = mtime;
301 obj_op->params.set_mtime = set_mtime;
302 obj_op->params.owner = ACLOwner(owner);
303 obj_op->params.flags = PUT_OBJ_CREATE;
304 obj_op->params.olh_epoch = olh_epoch;
305 obj_op->params.delete_at = delete_at;
306 obj_op->params.user_data = user_data;
307 obj_op->params.zones_trace = zones_trace;
308 obj_op->params.modify_tail = true;
309 obj_op->params.attrs = &attrs;
310
311 r = obj_op->prepare(y);
312 if (r < 0) {
313 return r;
314 }
315
316 r = obj_op->write_meta(dpp, actual_size, accounted_size, y);
317 if (r < 0) {
318 return r;
319 }
320 if (!obj_op->params.canceled) {
321 // on success, clear the set of objects for deletion
322 writer.clear_written();
323 }
324 if (pcanceled) {
325 *pcanceled = obj_op->params.canceled;
326 }
327 return 0;
328 }
329
330
331 int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
332 DataProcessor **processor)
333 {
334 // write the first chunk of the head object as part of an exclusive create,
335 // then drain to wait for the result in case of EEXIST
336 int r = writer.write_exclusive(data);
337 if (r == -EEXIST) {
338 // randomize the oid prefix and reprepare the head/manifest
339 std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32);
340
341 mp.init(target_obj->get_name(), upload_id, oid_rand);
342 manifest.set_prefix(target_obj->get_name() + "." + oid_rand);
343
344 r = prepare_head();
345 if (r < 0) {
346 return r;
347 }
348 // resubmit the write op on the new head object
349 r = writer.write_exclusive(data);
350 }
351 if (r < 0) {
352 return r;
353 }
354 *processor = &stripe;
355 return 0;
356 }
357
358 int MultipartObjectProcessor::prepare_head()
359 {
360 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
361 uint64_t chunk_size;
362 uint64_t stripe_size;
363 uint64_t alignment;
364
365 int r = target_obj->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size, &alignment);
366 if (r < 0) {
367 ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
368 return r;
369 }
370 target_obj->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
371
372 manifest.set_multipart_part_rule(stripe_size, part_num);
373
374 r = manifest_gen.create_begin(store->ctx(), &manifest,
375 bucket->get_placement_rule(),
376 &tail_placement_rule,
377 target_obj->get_bucket()->get_key(),
378 target_obj->get_obj());
379 if (r < 0) {
380 return r;
381 }
382
383 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
384 head_obj->raw_obj_to_obj(stripe_obj);
385 head_obj->set_hash_source(target_obj->get_name());
386
387 r = writer.set_stripe_obj(stripe_obj);
388 if (r < 0) {
389 return r;
390 }
391 stripe_size = manifest_gen.cur_stripe_max_size();
392 set_head_chunk_size(stripe_size);
393
394 chunk = ChunkProcessor(&writer, chunk_size);
395 stripe = StripeProcessor(&chunk, this, stripe_size);
396 return 0;
397 }
398
399 int MultipartObjectProcessor::prepare(optional_yield y)
400 {
401 manifest.set_prefix(target_obj->get_name() + "." + upload_id);
402
403 return prepare_head();
404 }
405
406 int MultipartObjectProcessor::complete(size_t accounted_size,
407 const std::string& etag,
408 ceph::real_time *mtime,
409 ceph::real_time set_mtime,
410 std::map<std::string, bufferlist>& attrs,
411 ceph::real_time delete_at,
412 const char *if_match,
413 const char *if_nomatch,
414 const std::string *user_data,
415 rgw_zone_set *zones_trace,
416 bool *pcanceled, optional_yield y)
417 {
418 int r = writer.drain();
419 if (r < 0) {
420 return r;
421 }
422 const uint64_t actual_size = get_actual_size();
423 r = manifest_gen.create_next(actual_size);
424 if (r < 0) {
425 return r;
426 }
427
428 std::unique_ptr<rgw::sal::RGWObject::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
429
430 obj_op->params.versioning_disabled = true;
431 obj_op->params.set_mtime = set_mtime;
432 obj_op->params.mtime = mtime;
433 obj_op->params.owner = ACLOwner(owner);
434 obj_op->params.delete_at = delete_at;
435 obj_op->params.zones_trace = zones_trace;
436 obj_op->params.modify_tail = true;
437 obj_op->params.attrs = &attrs;
438 r = obj_op->prepare(y);
439 if (r < 0) {
440 return r;
441 }
442
443 r = obj_op->write_meta(dpp, actual_size, accounted_size, y);
444 if (r < 0)
445 return r;
446
447 bufferlist bl;
448 RGWUploadPartInfo info;
449 string p = "part.";
450 bool sorted_omap = is_v2_upload_id(upload_id);
451
452 if (sorted_omap) {
453 char buf[32];
454 snprintf(buf, sizeof(buf), "%08d", part_num);
455 p.append(buf);
456 } else {
457 p.append(part_num_str);
458 }
459 info.num = part_num;
460 info.etag = etag;
461 info.size = actual_size;
462 info.accounted_size = accounted_size;
463 info.modified = real_clock::now();
464 info.manifest = manifest;
465
466 bool compressed;
467 r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
468 if (r < 0) {
469 ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
470 return r;
471 }
472
473 encode(info, bl);
474
475 std::unique_ptr<rgw::sal::RGWObject> meta_obj =
476 bucket->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
477 meta_obj->set_in_extra_data(true);
478
479 r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
480 if (r < 0) {
481 return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
482 }
483
484 if (!obj_op->params.canceled) {
485 // on success, clear the set of objects for deletion
486 writer.clear_written();
487 }
488 if (pcanceled) {
489 *pcanceled = obj_op->params.canceled;
490 }
491 return 0;
492 }
493
494 int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
495 {
496 int r = writer.write_exclusive(data);
497 if (r < 0) {
498 return r;
499 }
500 *processor = &stripe;
501 return 0;
502 }
503
504 int AppendObjectProcessor::prepare(optional_yield y)
505 {
506 RGWObjState *astate;
507 int r = head_obj->get_obj_state(dpp, &obj_ctx, *bucket, &astate, y);
508 if (r < 0) {
509 return r;
510 }
511 cur_size = astate->size;
512 *cur_accounted_size = astate->accounted_size;
513 if (!astate->exists) {
514 if (position != 0) {
515 ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl;
516 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
517 } else {
518 cur_part_num = 1;
519 //set the prefix
520 char buf[33];
521 gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
522 string oid_prefix = head_obj->get_name();
523 oid_prefix.append(".");
524 oid_prefix.append(buf);
525 oid_prefix.append("_");
526 manifest.set_prefix(oid_prefix);
527 }
528 } else {
529 // check whether the object appendable
530 map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
531 if (iter == astate->attrset.end()) {
532 ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl;
533 return -ERR_OBJECT_NOT_APPENDABLE;
534 }
535 if (position != *cur_accounted_size) {
536 ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl;
537 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
538 }
539 try {
540 using ceph::decode;
541 decode(cur_part_num, iter->second);
542 } catch (buffer::error& err) {
543 ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl;
544 return -EIO;
545 }
546 cur_part_num++;
547 //get the current obj etag
548 iter = astate->attrset.find(RGW_ATTR_ETAG);
549 if (iter != astate->attrset.end()) {
550 string s = rgw_string_unquote(iter->second.c_str());
551 size_t pos = s.find("-");
552 cur_etag = s.substr(0, pos);
553 }
554
555 iter = astate->attrset.find(RGW_ATTR_STORAGE_CLASS);
556 if (iter != astate->attrset.end()) {
557 tail_placement_rule.storage_class = iter->second.to_str();
558 }
559 cur_manifest = &(*astate->manifest);
560 manifest.set_prefix(cur_manifest->get_prefix());
561 astate->keep_tail = true;
562 }
563 manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);
564
565 rgw_obj obj = head_obj->get_obj();
566
567 r = manifest_gen.create_begin(store->ctx(), &manifest, bucket->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
568 if (r < 0) {
569 return r;
570 }
571 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
572
573 uint64_t chunk_size = 0;
574 r = store->get_raw_chunk_size(dpp, stripe_obj, &chunk_size);
575 if (r < 0) {
576 return r;
577 }
578 r = writer.set_stripe_obj(std::move(stripe_obj));
579 if (r < 0) {
580 return r;
581 }
582
583 uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
584
585 uint64_t max_head_size = std::min(chunk_size, stripe_size);
586 set_head_chunk_size(max_head_size);
587
588 // initialize the processors
589 chunk = ChunkProcessor(&writer, chunk_size);
590 stripe = StripeProcessor(&chunk, this, stripe_size);
591
592 return 0;
593 }
594
595 int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
596 ceph::real_time set_mtime, rgw::sal::RGWAttrs& attrs,
597 ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
598 const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
599 optional_yield y)
600 {
601 int r = writer.drain();
602 if (r < 0)
603 return r;
604 const uint64_t actual_size = get_actual_size();
605 r = manifest_gen.create_next(actual_size);
606 if (r < 0) {
607 return r;
608 }
609 head_obj->set_atomic(&obj_ctx);
610 std::unique_ptr<rgw::sal::RGWObject::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
611 //For Append obj, disable versioning
612 obj_op->params.versioning_disabled = true;
613 if (cur_manifest) {
614 cur_manifest->append(dpp, manifest, store->svc()->zone);
615 obj_op->params.manifest = cur_manifest;
616 } else {
617 obj_op->params.manifest = &manifest;
618 }
619 obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
620 obj_op->params.mtime = mtime;
621 obj_op->params.set_mtime = set_mtime;
622 obj_op->params.owner = ACLOwner(owner);
623 obj_op->params.flags = PUT_OBJ_CREATE;
624 obj_op->params.delete_at = delete_at;
625 obj_op->params.user_data = user_data;
626 obj_op->params.zones_trace = zones_trace;
627 obj_op->params.modify_tail = true;
628 obj_op->params.appendable = true;
629 obj_op->params.attrs = &attrs;
630 //Add the append part number
631 bufferlist cur_part_num_bl;
632 using ceph::encode;
633 encode(cur_part_num, cur_part_num_bl);
634 attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
635 //calculate the etag
636 if (!cur_etag.empty()) {
637 MD5 hash;
638 char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
639 char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
640 char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
641 hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
642 hash.Update((const unsigned char *)petag, sizeof(petag));
643 hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
644 hash.Update((const unsigned char *)petag, sizeof(petag));
645 hash.Final((unsigned char *)final_etag);
646 buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
647 snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
648 "-%lld", (long long)cur_part_num);
649 bufferlist etag_bl;
650 etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
651 attrs[RGW_ATTR_ETAG] = etag_bl;
652 }
653 r = obj_op->prepare(y);
654 if (r < 0) {
655 return r;
656 }
657 r = obj_op->write_meta(dpp, actual_size + cur_size, accounted_size + *cur_accounted_size, y);
658 if (r < 0) {
659 return r;
660 }
661 if (!obj_op->params.canceled) {
662 // on success, clear the set of objects for deletion
663 writer.clear_written();
664 }
665 if (pcanceled) {
666 *pcanceled = obj_op->params.canceled;
667 }
668 *cur_accounted_size += accounted_size;
669
670 return 0;
671 }
672
673 } // namespace rgw::putobj