]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/driver/rados/rgw_putobj_processor.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / rgw / driver / rados / rgw_putobj_processor.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4/*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
1e59de90 16#include "include/rados/librados.hpp"
11fdf7f2
TL
17#include "rgw_aio.h"
18#include "rgw_putobj_processor.h"
19#include "rgw_multi.h"
9f95a23c 20#include "rgw_compression.h"
11fdf7f2 21#include "services/svc_sys_obj.h"
39ae355f 22#include "services/svc_zone.h"
f67539c2 23#include "rgw_sal_rados.h"
11fdf7f2
TL
24
25#define dout_subsys ceph_subsys_rgw
26
20effc67
TL
27using namespace std;
28
11fdf7f2
TL
29namespace rgw::putobj {
30
1e59de90
TL
31/*
32 * For the cloudtiered objects, update the object manifest with the
33 * cloudtier config info read from the attrs.
34 * Since these attrs are used internally for only replication, do not store them
35 * in the head object.
36 */
37void read_cloudtier_info_from_attrs(rgw::sal::Attrs& attrs, RGWObjCategory& category,
38 RGWObjManifest& manifest) {
39 auto attr_iter = attrs.find(RGW_ATTR_CLOUD_TIER_TYPE);
40 if (attr_iter != attrs.end()) {
41 auto i = attr_iter->second;
42 string m = i.to_str();
43
44 if (m == "cloud-s3") {
45 category = RGWObjCategory::CloudTiered;
46 manifest.set_tier_type("cloud-s3");
47
48 auto config_iter = attrs.find(RGW_ATTR_CLOUD_TIER_CONFIG);
49 if (config_iter != attrs.end()) {
50 auto i = config_iter->second.cbegin();
51 RGWObjTier tier_config;
52
53 try {
54 using ceph::decode;
55 decode(tier_config, i);
56 manifest.set_tier_config(tier_config);
57 attrs.erase(config_iter);
58 } catch (buffer::error& err) {
59 }
60 }
61 }
62 attrs.erase(attr_iter);
63 }
64}
65
11fdf7f2
TL
66int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
67{
68 const bool flush = (data.length() == 0);
69
70 // capture the first chunk for special handling
71 if (data_offset < head_chunk_size || data_offset == 0) {
72 if (flush) {
73 // flush partial chunk
74 return process_first_chunk(std::move(head_data), &processor);
75 }
76
77 auto remaining = head_chunk_size - data_offset;
78 auto count = std::min<uint64_t>(data.length(), remaining);
79 data.splice(0, count, &head_data);
80 data_offset += count;
81
82 if (data_offset == head_chunk_size) {
83 // process the first complete chunk
84 ceph_assert(head_data.length() == head_chunk_size);
85 int r = process_first_chunk(std::move(head_data), &processor);
86 if (r < 0) {
87 return r;
88 }
89 }
90 if (data.length() == 0) { // avoid flushing stripe processor
91 return 0;
92 }
93 }
94 ceph_assert(processor); // process_first_chunk() must initialize
95
96 // send everything else through the processor
97 auto write_offset = data_offset;
98 data_offset += data.length();
99 return processor->process(std::move(data), write_offset);
100}
101
102
103static int process_completed(const AioResultList& completed, RawObjSet *written)
104{
105 std::optional<int> error;
106 for (auto& r : completed) {
107 if (r.result >= 0) {
108 written->insert(r.obj.get_ref().obj);
109 } else if (!error) { // record first error code
110 error = r.result;
111 }
112 }
113 return error.value_or(0);
114}
115
20effc67 116void RadosWriter::add_write_hint(librados::ObjectWriteOperation& op) {
1e59de90
TL
117 const RGWObjStateManifest *sm = obj_ctx.get_state(head_obj);
118 const bool compressed = sm->state.compressed;
20effc67
TL
119 uint32_t alloc_hint_flags = 0;
120 if (compressed) {
121 alloc_hint_flags |= librados::ALLOC_HINT_FLAG_INCOMPRESSIBLE;
122 }
123
124 op.set_alloc_hint2(0, 0, alloc_hint_flags);
125}
126
11fdf7f2
TL
127int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
128{
1e59de90 129 stripe_obj = store->svc.rados->obj(raw_obj);
b3b6e05e 130 return stripe_obj.open(dpp);
11fdf7f2
TL
131}
132
133int RadosWriter::process(bufferlist&& bl, uint64_t offset)
134{
135 bufferlist data = std::move(bl);
136 const uint64_t cost = data.length();
137 if (cost == 0) { // no empty writes, use aio directly for creates
138 return 0;
139 }
140 librados::ObjectWriteOperation op;
20effc67 141 add_write_hint(op);
11fdf7f2
TL
142 if (offset == 0) {
143 op.write_full(data);
144 } else {
145 op.write(offset, data);
146 }
147 constexpr uint64_t id = 0; // unused
9f95a23c 148 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
11fdf7f2
TL
149 return process_completed(c, &written);
150}
151
152int RadosWriter::write_exclusive(const bufferlist& data)
153{
154 const uint64_t cost = data.length();
155
156 librados::ObjectWriteOperation op;
157 op.create(true); // exclusive create
20effc67 158 add_write_hint(op);
11fdf7f2
TL
159 op.write_full(data);
160
161 constexpr uint64_t id = 0; // unused
9f95a23c 162 auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
11fdf7f2
TL
163 auto d = aio->drain();
164 c.splice(c.end(), d);
165 return process_completed(c, &written);
166}
167
168int RadosWriter::drain()
169{
170 return process_completed(aio->drain(), &written);
171}
172
173RadosWriter::~RadosWriter()
174{
175 // wait on any outstanding aio completions
176 process_completed(aio->drain(), &written);
177
178 bool need_to_remove_head = false;
179 std::optional<rgw_raw_obj> raw_head;
1e59de90 180 if (!head_obj.empty()) {
11fdf7f2 181 raw_head.emplace();
1e59de90 182 store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head);
11fdf7f2
TL
183 }
184
185 /**
186 * We should delete the object in the "multipart" namespace to avoid race condition.
187 * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
188 * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
189 * written by the second upload may be deleted by the first upload.
190 * details is describled on #11749
191 *
192 * The above comment still stands, but instead of searching for a specific object in the multipart
193 * namespace, we just make sure that we remove the object that is marked as the head object after
194 * we remove all the other raw objects. Note that we use different call to remove the head object,
195 * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
196 */
197 for (const auto& obj : written) {
198 if (raw_head && obj == *raw_head) {
9f95a23c 199 ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
11fdf7f2
TL
200 need_to_remove_head = true;
201 continue;
202 }
203
b3b6e05e 204 int r = store->delete_raw_obj(dpp, obj);
11fdf7f2 205 if (r < 0 && r != -ENOENT) {
f67539c2 206 ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
11fdf7f2
TL
207 }
208 }
209
210 if (need_to_remove_head) {
f67539c2 211 std::string version_id;
9f95a23c 212 ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
1e59de90 213 int r = store->delete_obj(dpp, obj_ctx, bucket_info, head_obj, 0, 0);
11fdf7f2 214 if (r < 0 && r != -ENOENT) {
9f95a23c 215 ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
11fdf7f2
TL
216 }
217 }
218}
219
220
221// advance to the next stripe
222int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
223{
224 // advance the manifest
225 int r = manifest_gen.create_next(offset);
226 if (r < 0) {
227 return r;
228 }
229
f67539c2 230 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
11fdf7f2
TL
231
232 uint64_t chunk_size = 0;
1e59de90 233 r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size, dpp);
11fdf7f2
TL
234 if (r < 0) {
235 return r;
236 }
237 r = writer.set_stripe_obj(stripe_obj);
238 if (r < 0) {
239 return r;
240 }
241
242 chunk = ChunkProcessor(&writer, chunk_size);
243 *pstripe_size = manifest_gen.cur_stripe_max_size();
244 return 0;
245}
246
247
9f95a23c 248
11fdf7f2
TL
249int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
250 DataProcessor **processor)
251{
252 first_chunk = std::move(data);
253 *processor = &stripe;
254 return 0;
255}
256
9f95a23c 257int AtomicObjectProcessor::prepare(optional_yield y)
11fdf7f2
TL
258{
259 uint64_t max_head_chunk_size;
260 uint64_t head_max_size;
261 uint64_t chunk_size = 0;
262 uint64_t alignment;
1e59de90
TL
263 rgw_pool head_pool;
264
265 if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
266 return -EIO;
267 }
11fdf7f2 268
1e59de90 269 int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, dpp, &alignment);
11fdf7f2
TL
270 if (r < 0) {
271 return r;
272 }
273
274 bool same_pool = true;
1e59de90
TL
275 if (bucket_info.placement_rule != tail_placement_rule) {
276 rgw_pool tail_pool;
277 if (!store->get_obj_data_pool(tail_placement_rule, head_obj, &tail_pool)) {
278 return -EIO;
279 }
280
281 if (tail_pool != head_pool) {
11fdf7f2 282 same_pool = false;
1e59de90
TL
283
284 r = store->get_max_chunk_size(tail_pool, &chunk_size, dpp);
11fdf7f2
TL
285 if (r < 0) {
286 return r;
287 }
1e59de90 288
11fdf7f2
TL
289 head_max_size = 0;
290 }
291 }
292
293 if (same_pool) {
39ae355f 294 RGWZonePlacementInfo placement_info;
1e59de90 295 if (!store->svc.zone->get_zone_params().get_placement(bucket_info.placement_rule.name, &placement_info) || placement_info.inline_data) {
39ae355f
TL
296 head_max_size = max_head_chunk_size;
297 } else {
298 head_max_size = 0;
299 }
11fdf7f2
TL
300 chunk_size = max_head_chunk_size;
301 }
302
303 uint64_t stripe_size;
304 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
305
1e59de90 306 store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
11fdf7f2
TL
307
308 manifest.set_trivial_rule(head_max_size, stripe_size);
309
310 r = manifest_gen.create_begin(store->ctx(), &manifest,
1e59de90 311 bucket_info.placement_rule,
11fdf7f2 312 &tail_placement_rule,
1e59de90 313 head_obj.bucket, head_obj);
11fdf7f2
TL
314 if (r < 0) {
315 return r;
316 }
317
f67539c2 318 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
11fdf7f2
TL
319
320 r = writer.set_stripe_obj(stripe_obj);
321 if (r < 0) {
322 return r;
323 }
324
325 set_head_chunk_size(head_max_size);
326 // initialize the processors
327 chunk = ChunkProcessor(&writer, chunk_size);
328 stripe = StripeProcessor(&chunk, this, head_max_size);
329 return 0;
330}
331
332int AtomicObjectProcessor::complete(size_t accounted_size,
333 const std::string& etag,
334 ceph::real_time *mtime,
335 ceph::real_time set_mtime,
20effc67 336 rgw::sal::Attrs& attrs,
11fdf7f2
TL
337 ceph::real_time delete_at,
338 const char *if_match,
339 const char *if_nomatch,
340 const std::string *user_data,
341 rgw_zone_set *zones_trace,
9f95a23c 342 bool *pcanceled, optional_yield y)
11fdf7f2
TL
343{
344 int r = writer.drain();
345 if (r < 0) {
346 return r;
347 }
348 const uint64_t actual_size = get_actual_size();
349 r = manifest_gen.create_next(actual_size);
350 if (r < 0) {
351 return r;
352 }
353
1e59de90 354 obj_ctx.set_atomic(head_obj);
11fdf7f2 355
1e59de90 356 RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
11fdf7f2
TL
357
358 /* some object types shouldn't be versioned, e.g., multipart parts */
1e59de90
TL
359 op_target.set_versioning_disabled(!bucket_info.versioning_enabled());
360
361 RGWRados::Object::Write obj_op(&op_target);
20effc67
TL
362 obj_op.meta.data = &first_chunk;
363 obj_op.meta.manifest = &manifest;
364 obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
365 obj_op.meta.if_match = if_match;
366 obj_op.meta.if_nomatch = if_nomatch;
367 obj_op.meta.mtime = mtime;
368 obj_op.meta.set_mtime = set_mtime;
369 obj_op.meta.owner = owner;
370 obj_op.meta.flags = PUT_OBJ_CREATE;
371 obj_op.meta.olh_epoch = olh_epoch;
372 obj_op.meta.delete_at = delete_at;
373 obj_op.meta.user_data = user_data;
374 obj_op.meta.zones_trace = zones_trace;
375 obj_op.meta.modify_tail = true;
376
1e59de90
TL
377 read_cloudtier_info_from_attrs(attrs, obj_op.meta.category, manifest);
378
20effc67 379 r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
f67539c2 380 if (r < 0) {
39ae355f
TL
381 if (r == -ETIMEDOUT) {
382 // The head object write may eventually succeed, clear the set of objects for deletion. if it
383 // doesn't ever succeed, we'll orphan any tail objects as if we'd crashed before that write
384 writer.clear_written();
385 }
f67539c2
TL
386 return r;
387 }
20effc67 388 if (!obj_op.meta.canceled) {
11fdf7f2
TL
389 // on success, clear the set of objects for deletion
390 writer.clear_written();
391 }
392 if (pcanceled) {
20effc67 393 *pcanceled = obj_op.meta.canceled;
11fdf7f2
TL
394 }
395 return 0;
396}
397
398
399int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
400 DataProcessor **processor)
401{
402 // write the first chunk of the head object as part of an exclusive create,
403 // then drain to wait for the result in case of EEXIST
404 int r = writer.write_exclusive(data);
405 if (r == -EEXIST) {
406 // randomize the oid prefix and reprepare the head/manifest
cd265ab1 407 std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32);
11fdf7f2 408
1e59de90
TL
409 mp.init(target_obj.key.name, upload_id, oid_rand);
410 manifest.set_prefix(target_obj.key.name + "." + oid_rand);
11fdf7f2
TL
411
412 r = prepare_head();
413 if (r < 0) {
414 return r;
415 }
416 // resubmit the write op on the new head object
417 r = writer.write_exclusive(data);
418 }
419 if (r < 0) {
420 return r;
421 }
422 *processor = &stripe;
423 return 0;
424}
425
426int MultipartObjectProcessor::prepare_head()
427{
428 const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
429 uint64_t chunk_size;
430 uint64_t stripe_size;
431 uint64_t alignment;
432
1e59de90 433 int r = store->get_max_chunk_size(tail_placement_rule, target_obj, &chunk_size, dpp, &alignment);
11fdf7f2 434 if (r < 0) {
9f95a23c 435 ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
11fdf7f2
TL
436 return r;
437 }
1e59de90 438 store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
11fdf7f2
TL
439
440 manifest.set_multipart_part_rule(stripe_size, part_num);
441
442 r = manifest_gen.create_begin(store->ctx(), &manifest,
1e59de90 443 bucket_info.placement_rule,
f67539c2 444 &tail_placement_rule,
1e59de90 445 target_obj.bucket, target_obj);
11fdf7f2
TL
446 if (r < 0) {
447 return r;
448 }
449
f67539c2 450 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
1e59de90
TL
451 RGWSI_Tier_RADOS::raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj);
452 head_obj.index_hash_source = target_obj.key.name;
11fdf7f2
TL
453
454 r = writer.set_stripe_obj(stripe_obj);
455 if (r < 0) {
456 return r;
457 }
458 stripe_size = manifest_gen.cur_stripe_max_size();
9f95a23c 459 set_head_chunk_size(stripe_size);
11fdf7f2
TL
460
461 chunk = ChunkProcessor(&writer, chunk_size);
9f95a23c 462 stripe = StripeProcessor(&chunk, this, stripe_size);
11fdf7f2
TL
463 return 0;
464}
465
9f95a23c 466int MultipartObjectProcessor::prepare(optional_yield y)
11fdf7f2 467{
1e59de90 468 manifest.set_prefix(target_obj.key.name + "." + upload_id);
11fdf7f2
TL
469
470 return prepare_head();
471}
472
473int MultipartObjectProcessor::complete(size_t accounted_size,
474 const std::string& etag,
475 ceph::real_time *mtime,
476 ceph::real_time set_mtime,
477 std::map<std::string, bufferlist>& attrs,
478 ceph::real_time delete_at,
479 const char *if_match,
480 const char *if_nomatch,
481 const std::string *user_data,
482 rgw_zone_set *zones_trace,
9f95a23c 483 bool *pcanceled, optional_yield y)
11fdf7f2
TL
484{
485 int r = writer.drain();
486 if (r < 0) {
487 return r;
488 }
489 const uint64_t actual_size = get_actual_size();
490 r = manifest_gen.create_next(actual_size);
491 if (r < 0) {
492 return r;
493 }
494
1e59de90 495 RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
20effc67
TL
496 op_target.set_versioning_disabled(true);
497 op_target.set_meta_placement_rule(&tail_placement_rule);
1e59de90
TL
498
499 RGWRados::Object::Write obj_op(&op_target);
20effc67
TL
500 obj_op.meta.set_mtime = set_mtime;
501 obj_op.meta.mtime = mtime;
502 obj_op.meta.owner = owner;
503 obj_op.meta.delete_at = delete_at;
504 obj_op.meta.zones_trace = zones_trace;
505 obj_op.meta.modify_tail = true;
11fdf7f2 506
20effc67 507 r = obj_op.write_meta(dpp, actual_size, accounted_size, attrs, y);
11fdf7f2
TL
508 if (r < 0)
509 return r;
510
11fdf7f2
TL
511 RGWUploadPartInfo info;
512 string p = "part.";
513 bool sorted_omap = is_v2_upload_id(upload_id);
514
515 if (sorted_omap) {
516 char buf[32];
517 snprintf(buf, sizeof(buf), "%08d", part_num);
518 p.append(buf);
519 } else {
520 p.append(part_num_str);
521 }
522 info.num = part_num;
523 info.etag = etag;
524 info.size = actual_size;
525 info.accounted_size = accounted_size;
526 info.modified = real_clock::now();
527 info.manifest = manifest;
528
529 bool compressed;
530 r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
531 if (r < 0) {
9f95a23c 532 ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
11fdf7f2
TL
533 return r;
534 }
535
1e59de90
TL
536 rgw_obj meta_obj;
537 meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
538 meta_obj.set_in_extra_data(true);
539
540 rgw_raw_obj meta_raw_obj;
541 store->obj_to_raw(bucket_info.placement_rule, meta_obj, &meta_raw_obj);
542
543 rgw_rados_ref meta_obj_ref;
544 r = store->get_raw_obj_ref(dpp, meta_raw_obj, &meta_obj_ref);
545 if (r < 0) {
546 ldpp_dout(dpp, -1) << "ERROR: failed to get obj ref of meta obj with ret=" << r << dendl;
547 return r;
548 }
549
550 librados::ObjectWriteOperation op;
551 cls_rgw_mp_upload_part_info_update(op, p, info);
552 r = rgw_rados_operate(dpp, meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, &op, y);
553 ldpp_dout(dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl;
11fdf7f2 554
1e59de90
TL
555 if (r == -EOPNOTSUPP) {
556 // New CLS call to update part info is not yet supported. Fall back to the old handling.
557 bufferlist bl;
558 encode(info, bl);
11fdf7f2 559
1e59de90
TL
560 map<string, bufferlist> m;
561 m[p] = bl;
562
563 op = librados::ObjectWriteOperation{};
564 op.assert_exists(); // detect races with abort
565 op.omap_set(m);
566 r = rgw_rados_operate(dpp, meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, &op, y);
567 }
11fdf7f2 568 if (r < 0) {
9f95a23c 569 return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
11fdf7f2
TL
570 }
571
20effc67 572 if (!obj_op.meta.canceled) {
11fdf7f2
TL
573 // on success, clear the set of objects for deletion
574 writer.clear_written();
575 }
576 if (pcanceled) {
20effc67 577 *pcanceled = obj_op.meta.canceled;
11fdf7f2
TL
578 }
579 return 0;
580}
581
20effc67 582int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor)
11fdf7f2
TL
583{
584 int r = writer.write_exclusive(data);
585 if (r < 0) {
586 return r;
587 }
588 *processor = &stripe;
589 return 0;
590}
591
9f95a23c 592int AppendObjectProcessor::prepare(optional_yield y)
11fdf7f2
TL
593{
594 RGWObjState *astate;
1e59de90
TL
595 int r = store->get_obj_state(dpp, &obj_ctx, bucket_info, head_obj,
596 &astate, &cur_manifest, y);
11fdf7f2
TL
597 if (r < 0) {
598 return r;
599 }
600 cur_size = astate->size;
601 *cur_accounted_size = astate->accounted_size;
602 if (!astate->exists) {
603 if (position != 0) {
9f95a23c 604 ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl;
11fdf7f2
TL
605 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
606 } else {
607 cur_part_num = 1;
608 //set the prefix
609 char buf[33];
610 gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
1e59de90 611 string oid_prefix = head_obj.key.name;
11fdf7f2
TL
612 oid_prefix.append(".");
613 oid_prefix.append(buf);
614 oid_prefix.append("_");
615 manifest.set_prefix(oid_prefix);
616 }
617 } else {
618 // check whether the object appendable
619 map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
620 if (iter == astate->attrset.end()) {
9f95a23c 621 ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl;
11fdf7f2
TL
622 return -ERR_OBJECT_NOT_APPENDABLE;
623 }
624 if (position != *cur_accounted_size) {
9f95a23c 625 ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl;
11fdf7f2
TL
626 return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
627 }
628 try {
f67539c2 629 using ceph::decode;
11fdf7f2
TL
630 decode(cur_part_num, iter->second);
631 } catch (buffer::error& err) {
9f95a23c 632 ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl;
11fdf7f2
TL
633 return -EIO;
634 }
635 cur_part_num++;
636 //get the current obj etag
637 iter = astate->attrset.find(RGW_ATTR_ETAG);
638 if (iter != astate->attrset.end()) {
639 string s = rgw_string_unquote(iter->second.c_str());
640 size_t pos = s.find("-");
641 cur_etag = s.substr(0, pos);
642 }
9f95a23c
TL
643
644 iter = astate->attrset.find(RGW_ATTR_STORAGE_CLASS);
645 if (iter != astate->attrset.end()) {
646 tail_placement_rule.storage_class = iter->second.to_str();
1e59de90
TL
647 } else {
648 tail_placement_rule.storage_class = RGW_STORAGE_CLASS_STANDARD;
9f95a23c 649 }
11fdf7f2 650 manifest.set_prefix(cur_manifest->get_prefix());
f6b5b4d7 651 astate->keep_tail = true;
11fdf7f2
TL
652 }
653 manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);
654
1e59de90 655 r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, &tail_placement_rule, head_obj.bucket, head_obj);
11fdf7f2
TL
656 if (r < 0) {
657 return r;
658 }
f67539c2 659 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
11fdf7f2
TL
660
661 uint64_t chunk_size = 0;
1e59de90 662 r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size, dpp);
11fdf7f2
TL
663 if (r < 0) {
664 return r;
665 }
666 r = writer.set_stripe_obj(std::move(stripe_obj));
667 if (r < 0) {
668 return r;
669 }
670
671 uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
672
673 uint64_t max_head_size = std::min(chunk_size, stripe_size);
674 set_head_chunk_size(max_head_size);
675
676 // initialize the processors
677 chunk = ChunkProcessor(&writer, chunk_size);
678 stripe = StripeProcessor(&chunk, this, stripe_size);
679
680 return 0;
681}
682
683int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
20effc67 684 ceph::real_time set_mtime, rgw::sal::Attrs& attrs,
11fdf7f2 685 ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
9f95a23c
TL
686 const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
687 optional_yield y)
11fdf7f2
TL
688{
689 int r = writer.drain();
690 if (r < 0)
691 return r;
692 const uint64_t actual_size = get_actual_size();
693 r = manifest_gen.create_next(actual_size);
694 if (r < 0) {
695 return r;
696 }
1e59de90
TL
697 obj_ctx.set_atomic(head_obj);
698 RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
11fdf7f2 699 //For Append obj, disable versioning
20effc67 700 op_target.set_versioning_disabled(true);
1e59de90 701 RGWRados::Object::Write obj_op(&op_target);
11fdf7f2 702 if (cur_manifest) {
1e59de90 703 cur_manifest->append(dpp, manifest, store->svc.zone->get_zonegroup(), store->svc.zone->get_zone_params());
20effc67 704 obj_op.meta.manifest = cur_manifest;
11fdf7f2 705 } else {
20effc67
TL
706 obj_op.meta.manifest = &manifest;
707 }
708 obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
709 obj_op.meta.mtime = mtime;
710 obj_op.meta.set_mtime = set_mtime;
711 obj_op.meta.owner = owner;
712 obj_op.meta.flags = PUT_OBJ_CREATE;
713 obj_op.meta.delete_at = delete_at;
714 obj_op.meta.user_data = user_data;
715 obj_op.meta.zones_trace = zones_trace;
716 obj_op.meta.modify_tail = true;
717 obj_op.meta.appendable = true;
11fdf7f2
TL
718 //Add the append part number
719 bufferlist cur_part_num_bl;
f67539c2 720 using ceph::encode;
11fdf7f2
TL
721 encode(cur_part_num, cur_part_num_bl);
722 attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
723 //calculate the etag
724 if (!cur_etag.empty()) {
725 MD5 hash;
20effc67
TL
726 // Allow use of MD5 digest in FIPS mode for non-cryptographic purposes
727 hash.SetFlags(EVP_MD_CTX_FLAG_NON_FIPS_ALLOW);
11fdf7f2
TL
728 char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
729 char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
730 char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
731 hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
732 hash.Update((const unsigned char *)petag, sizeof(petag));
733 hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
734 hash.Update((const unsigned char *)petag, sizeof(petag));
735 hash.Final((unsigned char *)final_etag);
736 buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
737 snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
738 "-%lld", (long long)cur_part_num);
739 bufferlist etag_bl;
740 etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
741 attrs[RGW_ATTR_ETAG] = etag_bl;
742 }
20effc67
TL
743 r = obj_op.write_meta(dpp, actual_size + cur_size,
744 accounted_size + *cur_accounted_size,
745 attrs, y);
11fdf7f2
TL
746 if (r < 0) {
747 return r;
748 }
20effc67 749 if (!obj_op.meta.canceled) {
11fdf7f2
TL
750 // on success, clear the set of objects for deletion
751 writer.clear_written();
752 }
753 if (pcanceled) {
20effc67 754 *pcanceled = obj_op.meta.canceled;
11fdf7f2
TL
755 }
756 *cur_accounted_size += accounted_size;
757
758 return 0;
759}
760
761} // namespace rgw::putobj