]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_putobj_processor.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_putobj_processor.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 /*
5 * Ceph - scalable distributed file system
6 *
7 * Copyright (C) 2018 Red Hat, Inc.
8 *
9 * This is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Lesser General Public
11 * License version 2.1, as published by the Free Software
12 * Foundation. See file COPYING.
13 *
14 */
15
16 #include "rgw_aio.h"
17 #include "rgw_putobj_processor.h"
18 #include "rgw_multi.h"
19 #include "services/svc_sys_obj.h"
20
21 #define dout_subsys ceph_subsys_rgw
22
23 namespace rgw::putobj {
24
// Buffer the first head_chunk_size bytes of the stream into head_data and
// hand them to process_first_chunk() once complete (or on flush, for short
// objects); everything after the head chunk is forwarded to the processor
// that process_first_chunk() installed. Note: logical_offset is not
// consulted; the internal data_offset tracks the write position.
int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
{
  // an empty buffer is the caller's signal to flush
  const bool flush = (data.length() == 0);

  // capture the first chunk for special handling
  // (the data_offset == 0 test covers head_chunk_size == 0, where the first
  // call must still run process_first_chunk() to initialize the processor)
  if (data_offset < head_chunk_size || data_offset == 0) {
    if (flush) {
      // flush partial chunk
      return process_first_chunk(std::move(head_data), &processor);
    }

    // splice as much as fits into the remaining head-chunk space
    auto remaining = head_chunk_size - data_offset;
    auto count = std::min<uint64_t>(data.length(), remaining);
    data.splice(0, count, &head_data);
    data_offset += count;

    if (data_offset == head_chunk_size) {
      // process the first complete chunk
      ceph_assert(head_data.length() == head_chunk_size);
      int r = process_first_chunk(std::move(head_data), &processor);
      if (r < 0) {
        return r;
      }
    }
    if (data.length() == 0) { // avoid flushing stripe processor
      return 0;
    }
  }
  ceph_assert(processor); // process_first_chunk() must initialize

  // send everything else through the processor
  auto write_offset = data_offset;
  data_offset += data.length();
  return processor->process(std::move(data), write_offset);
}
60
61
62 static int process_completed(const AioResultList& completed, RawObjSet *written)
63 {
64 std::optional<int> error;
65 for (auto& r : completed) {
66 if (r.result >= 0) {
67 written->insert(r.obj.get_ref().obj);
68 } else if (!error) { // record first error code
69 error = r.result;
70 }
71 }
72 return error.value_or(0);
73 }
74
75 int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
76 {
77 stripe_obj = store->svc.rados->obj(raw_obj);
78 return stripe_obj.open();
79 }
80
81 int RadosWriter::process(bufferlist&& bl, uint64_t offset)
82 {
83 bufferlist data = std::move(bl);
84 const uint64_t cost = data.length();
85 if (cost == 0) { // no empty writes, use aio directly for creates
86 return 0;
87 }
88 librados::ObjectWriteOperation op;
89 if (offset == 0) {
90 op.write_full(data);
91 } else {
92 op.write(offset, data);
93 }
94 constexpr uint64_t id = 0; // unused
95 auto c = aio->submit(stripe_obj, &op, cost, id);
96 return process_completed(c, &written);
97 }
98
99 int RadosWriter::write_exclusive(const bufferlist& data)
100 {
101 const uint64_t cost = data.length();
102
103 librados::ObjectWriteOperation op;
104 op.create(true); // exclusive create
105 op.write_full(data);
106
107 constexpr uint64_t id = 0; // unused
108 auto c = aio->submit(stripe_obj, &op, cost, id);
109 auto d = aio->drain();
110 c.splice(c.end(), d);
111 return process_completed(c, &written);
112 }
113
114 int RadosWriter::drain()
115 {
116 return process_completed(aio->drain(), &written);
117 }
118
// Destructor: any raw objects still tracked in 'written' belong to an
// upload that never completed successfully (complete() clears the set on
// success), so delete them to avoid leaking tail data.
RadosWriter::~RadosWriter()
{
  // wait on any outstanding aio completions
  process_completed(aio->drain(), &written);

  bool need_to_remove_head = false;
  std::optional<rgw_raw_obj> raw_head;
  if (!head_obj.empty()) {
    raw_head.emplace();
    store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head);
  }

  /**
   * We should delete the object in the "multipart" namespace to avoid race condition.
   * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
   * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
   * written by the second upload may be deleted by the first upload.
   * details is describled on #11749
   *
   * The above comment still stands, but instead of searching for a specific object in the multipart
   * namespace, we just make sure that we remove the object that is marked as the head object after
   * we remove all the other raw objects. Note that we use different call to remove the head object,
   * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
   */
  for (const auto& obj : written) {
    // the head object must be removed last, and via delete_obj() below so
    // the bucket index stays consistent; skip it in this raw-delete pass
    if (raw_head && obj == *raw_head) {
      ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
      need_to_remove_head = true;
      continue;
    }

    // best-effort: a failed delete only leaks the raw object (logged)
    int r = store->delete_raw_obj(obj);
    if (r < 0 && r != -ENOENT) {
      ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
    }
  }

  if (need_to_remove_head) {
    ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
    int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
    if (r < 0 && r != -ENOENT) {
      ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
    }
  }
}
164
165
166 // advance to the next stripe
167 int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
168 {
169 // advance the manifest
170 int r = manifest_gen.create_next(offset);
171 if (r < 0) {
172 return r;
173 }
174
175 rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
176
177 uint64_t chunk_size = 0;
178 r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
179 if (r < 0) {
180 return r;
181 }
182 r = writer.set_stripe_obj(stripe_obj);
183 if (r < 0) {
184 return r;
185 }
186
187 chunk = ChunkProcessor(&writer, chunk_size);
188 *pstripe_size = manifest_gen.cur_stripe_max_size();
189 return 0;
190 }
191
192
// Keep the head chunk in memory (it is written together with the head
// object's metadata in complete(), as obj_op.meta.data) and route all
// subsequent data through the stripe processor.
int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
                                               DataProcessor **processor)
{
  first_chunk = std::move(data);
  *processor = &stripe;
  return 0;
}
200
// Prepare the atomic-write pipeline: decide how much data (if any) lives
// in the head object, size the chunk/stripe processors, and start the
// manifest. Returns 0 on success or a negative error code.
int AtomicObjectProcessor::prepare()
{
  uint64_t max_head_chunk_size;
  uint64_t head_max_size;
  uint64_t chunk_size = 0;
  uint64_t alignment;
  rgw_pool head_pool;

  // resolve the data pool the head object will be written to
  if (!store->get_obj_data_pool(bucket_info.placement_rule, head_obj, &head_pool)) {
    return -EIO;
  }

  int r = store->get_max_chunk_size(head_pool, &max_head_chunk_size, &alignment);
  if (r < 0) {
    return r;
  }

  bool same_pool = true;

  // when tail placement maps to a different pool than the head, the head
  // object carries no data (head_max_size = 0) and chunking follows the
  // tail pool's limits
  if (bucket_info.placement_rule != tail_placement_rule) {
    rgw_pool tail_pool;
    if (!store->get_obj_data_pool(tail_placement_rule, head_obj, &tail_pool)) {
      return -EIO;
    }

    if (tail_pool != head_pool) {
      same_pool = false;

      r = store->get_max_chunk_size(tail_pool, &chunk_size);
      if (r < 0) {
        return r;
      }

      head_max_size = 0;
    }
  }

  if (same_pool) {
    // head and tail share a pool: the head may hold a full chunk of data
    head_max_size = max_head_chunk_size;
    chunk_size = max_head_chunk_size;
  }

  uint64_t stripe_size;
  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;

  // adjust the configured stripe size to satisfy the pool's alignment
  store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);

  manifest.set_trivial_rule(head_max_size, stripe_size);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                bucket_info.placement_rule,
                                &tail_placement_rule,
                                head_obj.bucket, head_obj);
  if (r < 0) {
    return r;
  }

  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    return r;
  }

  set_head_chunk_size(head_max_size);
  // initialize the processors
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, head_max_size);
  return 0;
}
271
// Finish the atomic write: drain outstanding io, close out the manifest,
// and write the head object's metadata (carrying the buffered first chunk
// and the manifest) through the bucket-index-aware write path. A write
// that lost a race is reported via *pcanceled, not as an error — the
// function still returns 0 in that case.
int AtomicObjectProcessor::complete(size_t accounted_size,
                                    const std::string& etag,
                                    ceph::real_time *mtime,
                                    ceph::real_time set_mtime,
                                    std::map<std::string, bufferlist>& attrs,
                                    ceph::real_time delete_at,
                                    const char *if_match,
                                    const char *if_nomatch,
                                    const std::string *user_data,
                                    rgw_zone_set *zones_trace,
                                    bool *pcanceled)
{
  // all tail data must be on disk before the head becomes visible
  int r = writer.drain();
  if (r < 0) {
    return r;
  }
  // seal the manifest at the number of bytes actually written
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }

  obj_ctx.set_atomic(head_obj);

  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);

  /* some object types shouldn't be versioned, e.g., multipart parts */
  op_target.set_versioning_disabled(!bucket_info.versioning_enabled());

  RGWRados::Object::Write obj_op(&op_target);

  obj_op.meta.data = &first_chunk;
  obj_op.meta.manifest = &manifest;
  obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
  obj_op.meta.if_match = if_match;
  obj_op.meta.if_nomatch = if_nomatch;
  obj_op.meta.mtime = mtime;
  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.flags = PUT_OBJ_CREATE;
  obj_op.meta.olh_epoch = olh_epoch;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.user_data = user_data;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;

  r = obj_op.write_meta(actual_size, accounted_size, attrs);
  if (r < 0) {
    return r;
  }
  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  return 0;
}
331
332
333 int MultipartObjectProcessor::process_first_chunk(bufferlist&& data,
334 DataProcessor **processor)
335 {
336 // write the first chunk of the head object as part of an exclusive create,
337 // then drain to wait for the result in case of EEXIST
338 int r = writer.write_exclusive(data);
339 if (r == -EEXIST) {
340 // randomize the oid prefix and reprepare the head/manifest
341 std::string oid_rand(32, 0);
342 gen_rand_alphanumeric(store->ctx(), oid_rand.data(), oid_rand.size());
343
344 mp.init(target_obj.key.name, upload_id, oid_rand);
345 manifest.set_prefix(target_obj.key.name + "." + oid_rand);
346
347 r = prepare_head();
348 if (r < 0) {
349 return r;
350 }
351 // resubmit the write op on the new head object
352 r = writer.write_exclusive(data);
353 }
354 if (r < 0) {
355 return r;
356 }
357 *processor = &stripe;
358 return 0;
359 }
360
// Build (or rebuild, after an oid-prefix change) the head object and
// manifest for this multipart part, and size the chunk/stripe processors.
int MultipartObjectProcessor::prepare_head()
{
  const uint64_t default_stripe_size = store->ctx()->_conf->rgw_obj_stripe_size;
  uint64_t chunk_size;
  uint64_t stripe_size;
  uint64_t alignment;

  int r = store->get_max_chunk_size(tail_placement_rule, target_obj, &chunk_size, &alignment);
  if (r < 0) {
    ldout(store->ctx(), 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
    return r;
  }
  // align the configured stripe size to the pool's requirement
  store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);

  manifest.set_multipart_part_rule(stripe_size, part_num);

  r = manifest_gen.create_begin(store->ctx(), &manifest,
                                bucket_info.placement_rule,
                                &tail_placement_rule,
                                target_obj.bucket, target_obj);
  if (r < 0) {
    return r;
  }

  // the part's head object is the manifest's first raw object; hash it to
  // the same shard as the target object via index_hash_source
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
  rgw_raw_obj_to_obj(head_obj.bucket, stripe_obj, &head_obj);
  head_obj.index_hash_source = target_obj.key.name;

  r = writer.set_stripe_obj(stripe_obj);
  if (r < 0) {
    return r;
  }
  // note: stripe_size is reassigned here — from the aligned default above
  // to the current stripe's actual maximum
  stripe_size = manifest_gen.cur_stripe_max_size();

  uint64_t max_head_size = std::min(chunk_size, stripe_size);
  set_head_chunk_size(max_head_size);

  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, max_head_size);
  return 0;
}
402
403 int MultipartObjectProcessor::prepare()
404 {
405 manifest.set_prefix(target_obj.key.name + "." + upload_id);
406
407 return prepare_head();
408 }
409
// Finish writing a multipart part: drain io, seal the manifest, write the
// part's head-object metadata, then record the part's info (etag, sizes,
// manifest, compression) in the upload's meta object omap so that
// complete-multipart can later assemble the parts. Returns 0 on success.
int MultipartObjectProcessor::complete(size_t accounted_size,
                                       const std::string& etag,
                                       ceph::real_time *mtime,
                                       ceph::real_time set_mtime,
                                       std::map<std::string, bufferlist>& attrs,
                                       ceph::real_time delete_at,
                                       const char *if_match,
                                       const char *if_nomatch,
                                       const std::string *user_data,
                                       rgw_zone_set *zones_trace,
                                       bool *pcanceled)
{
  int r = writer.drain();
  if (r < 0) {
    return r;
  }
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }

  // parts are never versioned
  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
  op_target.set_versioning_disabled(true);
  RGWRados::Object::Write obj_op(&op_target);

  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.mtime = mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;

  r = obj_op.write_meta(actual_size, accounted_size, attrs);
  if (r < 0)
    return r;

  bufferlist bl;
  RGWUploadPartInfo info;
  string p = "part.";
  bool sorted_omap = is_v2_upload_id(upload_id);

  // v2 upload ids get zero-padded part keys so omap order == part order
  if (sorted_omap) {
    char buf[32];
    snprintf(buf, sizeof(buf), "%08d", part_num);
    p.append(buf);
  } else {
    p.append(part_num_str);
  }
  info.num = part_num;
  info.etag = etag;
  info.size = actual_size;
  info.accounted_size = accounted_size;
  info.modified = real_clock::now();
  info.manifest = manifest;

  bool compressed;
  r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
  if (r < 0) {
    ldout(store->ctx(), 1) << "cannot get compression info" << dendl;
    return r;
  }

  encode(info, bl);

  // the upload's meta object lives in the multipart namespace, extra-data pool
  rgw_obj meta_obj;
  meta_obj.init_ns(bucket_info.bucket, mp.get_meta(), RGW_OBJ_NS_MULTIPART);
  meta_obj.set_in_extra_data(true);

  rgw_raw_obj raw_meta_obj;

  store->obj_to_raw(bucket_info.placement_rule, meta_obj, &raw_meta_obj);

  // NOTE(review): this local obj_ctx (a sysobj ctx) shadows the member
  // obj_ctx used above — appears intentional, but confirm
  auto obj_ctx = store->svc.sysobj->init_obj_ctx();
  auto sysobj = obj_ctx.get_obj(raw_meta_obj);

  // must_exist: fail if the upload was aborted and its meta obj removed
  r = sysobj.omap()
            .set_must_exist(true)
            .set(p, bl);
  if (r < 0) {
    return r;
  }

  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  return 0;
}
502
503 int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
504 {
505 int r = writer.write_exclusive(data);
506 if (r < 0) {
507 return r;
508 }
509 *processor = &stripe;
510 return 0;
511 }
512
// Prepare an append write: validate that the target either doesn't exist
// (position must be 0; a fresh random oid prefix is generated) or is an
// appendable object whose size equals the requested position, then set up
// the manifest and processors for the next append part.
int AppendObjectProcessor::prepare()
{
  RGWObjState *astate;
  int r = store->get_obj_state(&obj_ctx, bucket_info, head_obj, &astate);
  if (r < 0) {
    return r;
  }
  cur_size = astate->size;
  *cur_accounted_size = astate->accounted_size;
  if (!astate->exists) {
    // first append must start at offset 0
    if (position != 0) {
      ldout(store->ctx(), 5) << "ERROR: Append position should be zero" << dendl;
      return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
    } else {
      cur_part_num = 1;
      //set the prefix
      char buf[33];
      gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
      string oid_prefix = head_obj.key.name;
      oid_prefix.append(".");
      oid_prefix.append(buf);
      oid_prefix.append("_");
      manifest.set_prefix(oid_prefix);
    }
  } else {
    // check whether the object appendable
    // (appendable objects carry their next part number in an xattr)
    map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
    if (iter == astate->attrset.end()) {
      ldout(store->ctx(), 5) << "ERROR: The object is not appendable" << dendl;
      return -ERR_OBJECT_NOT_APPENDABLE;
    }
    // appends must be contiguous: position == current accounted size
    if (position != *cur_accounted_size) {
      ldout(store->ctx(), 5) << "ERROR: Append position should be equal to the obj size" << dendl;
      return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
    }
    try {
      decode(cur_part_num, iter->second);
    } catch (buffer::error& err) {
      ldout(store->ctx(), 5) << "ERROR: failed to decode part num" << dendl;
      return -EIO;
    }
    cur_part_num++;
    //get the current obj etag
    // (the stored etag looks like "<md5>-<parts>"; keep only the md5 part,
    // which complete() chains into the new combined etag)
    iter = astate->attrset.find(RGW_ATTR_ETAG);
    if (iter != astate->attrset.end()) {
      string s = rgw_string_unquote(iter->second.c_str());
      size_t pos = s.find("-");
      cur_etag = s.substr(0, pos);
    }
    // keep appending under the existing manifest's prefix
    cur_manifest = &astate->manifest;
    manifest.set_prefix(cur_manifest->get_prefix());
  }
  manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);

  r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, &tail_placement_rule, head_obj.bucket, head_obj);
  if (r < 0) {
    return r;
  }
  rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);

  uint64_t chunk_size = 0;
  r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
  if (r < 0) {
    return r;
  }
  r = writer.set_stripe_obj(std::move(stripe_obj));
  if (r < 0) {
    return r;
  }

  uint64_t stripe_size = manifest_gen.cur_stripe_max_size();

  uint64_t max_head_size = std::min(chunk_size, stripe_size);
  set_head_chunk_size(max_head_size);

  // initialize the processors
  // NOTE(review): the first-stripe bound here is stripe_size, while the
  // atomic/multipart processors use the head size — confirm intentional
  chunk = ChunkProcessor(&writer, chunk_size);
  stripe = StripeProcessor(&chunk, this, stripe_size);

  return 0;
}
594
// Finish an append part: drain io, seal this part's manifest and splice it
// onto the existing object's manifest (if any), then rewrite the head
// metadata with the combined sizes, the incremented part-number xattr, and
// a chained etag of the form md5(prev_md5 || part_md5)-<part_num>.
int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
                                    ceph::real_time set_mtime, map <string, bufferlist> &attrs,
                                    ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
                                    const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled)
{
  int r = writer.drain();
  if (r < 0)
    return r;
  const uint64_t actual_size = get_actual_size();
  r = manifest_gen.create_next(actual_size);
  if (r < 0) {
    return r;
  }
  obj_ctx.set_atomic(head_obj);
  RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
  //For Append obj, disable versioning
  op_target.set_versioning_disabled(true);
  RGWRados::Object::Write obj_op(&op_target);
  // append this part's manifest onto the existing one when the object
  // already existed; otherwise this part's manifest is the whole object
  if (cur_manifest) {
    cur_manifest->append(manifest, store->svc.zone);
    obj_op.meta.manifest = cur_manifest;
  } else {
    obj_op.meta.manifest = &manifest;
  }
  obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
  obj_op.meta.mtime = mtime;
  obj_op.meta.set_mtime = set_mtime;
  obj_op.meta.owner = owner;
  obj_op.meta.flags = PUT_OBJ_CREATE;
  obj_op.meta.delete_at = delete_at;
  obj_op.meta.user_data = user_data;
  obj_op.meta.zones_trace = zones_trace;
  obj_op.meta.modify_tail = true;
  obj_op.meta.appendable = true;
  //Add the append part number
  bufferlist cur_part_num_bl;
  encode(cur_part_num, cur_part_num_bl);
  attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
  //calculate the etag
  // hash the binary digests of the previous etag and this part's etag,
  // then append "-<part_num>", mirroring the multipart etag convention
  if (!cur_etag.empty()) {
    MD5 hash;
    char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
    char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
    char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
    hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
    hash.Update((const unsigned char *)petag, sizeof(petag));
    hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
    hash.Update((const unsigned char *)petag, sizeof(petag));
    hash.Final((unsigned char *)final_etag);
    buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
    snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
             "-%lld", (long long)cur_part_num);
    bufferlist etag_bl;
    etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
    attrs[RGW_ATTR_ETAG] = etag_bl;
  }
  // sizes accumulate across appends: new part + existing object
  r = obj_op.write_meta(actual_size + cur_size, accounted_size + *cur_accounted_size, attrs);
  if (r < 0) {
    return r;
  }
  if (!obj_op.meta.canceled) {
    // on success, clear the set of objects for deletion
    writer.clear_written();
  }
  if (pcanceled) {
    *pcanceled = obj_op.meta.canceled;
  }
  *cur_accounted_size += accounted_size;

  return 0;
}
666
667 } // namespace rgw::putobj