1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "pg_backend.h"
8 #include <boost/range/adaptor/filtered.hpp>
9 #include <boost/range/adaptor/transformed.hpp>
10 #include <boost/range/algorithm/copy.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <seastar/core/print.hh>
15 #include "messages/MOSDOp.h"
16 #include "os/Transaction.h"
17 #include "common/Checksummer.h"
18 #include "common/Clock.h"
20 #include "crimson/common/exception.h"
21 #include "crimson/common/tmap_helpers.h"
22 #include "crimson/os/futurized_collection.h"
23 #include "crimson/os/futurized_store.h"
24 #include "crimson/osd/osd_operation.h"
25 #include "crimson/osd/object_context_loader.h"
26 #include "replicated_backend.h"
27 #include "replicated_recovery_backend.h"
28 #include "ec_backend.h"
29 #include "exceptions.h"
32 seastar::logger
& logger() {
33 return crimson::get_logger(ceph_subsys_osd
);
37 using std::runtime_error
;
39 using std::string_view
;
40 using crimson::common::local_conf
;
42 std::unique_ptr
<PGBackend
>
43 PGBackend::create(pg_t pgid
,
44 const pg_shard_t pg_shard
,
45 const pg_pool_t
& pool
,
46 crimson::os::CollectionRef coll
,
47 crimson::osd::ShardServices
& shard_services
,
48 const ec_profile_t
& ec_profile
,
49 DoutPrefixProvider
&dpp
)
52 case pg_pool_t::TYPE_REPLICATED
:
53 return std::make_unique
<ReplicatedBackend
>(pgid
, pg_shard
,
56 case pg_pool_t::TYPE_ERASURE
:
57 return std::make_unique
<ECBackend
>(pg_shard
.shard
, coll
, shard_services
,
58 std::move(ec_profile
),
62 throw runtime_error(seastar::format("unsupported pool type '{}'",
67 PGBackend::PGBackend(shard_id_t shard
,
69 crimson::osd::ShardServices
&shard_services
,
70 DoutPrefixProvider
&dpp
)
73 shard_services
{shard_services
},
75 store
{&shard_services
.get_store()}
78 PGBackend::load_metadata_iertr::future
79 <PGBackend::loaded_object_md_t::ref
>
80 PGBackend::load_metadata(const hobject_t
& oid
)
82 return interruptor::make_interruptible(store
->get_attrs(
84 ghobject_t
{oid
, ghobject_t::NO_GEN
, shard
})).safe_then_interruptible(
85 [oid
](auto &&attrs
) -> load_metadata_ertr::future
<loaded_object_md_t::ref
>{
86 loaded_object_md_t::ref
ret(new loaded_object_md_t());
87 if (auto oiiter
= attrs
.find(OI_ATTR
); oiiter
!= attrs
.end()) {
88 bufferlist bl
= std::move(oiiter
->second
);
90 ret
->os
= ObjectState(
91 object_info_t(bl
, oid
),
93 } catch (const buffer::error
&) {
94 logger().warn("unable to decode ObjectState");
95 throw crimson::osd::invalid_argument();
99 "load_metadata: object {} present but missing object info",
101 return crimson::ct_error::object_corrupted::make();
105 // Returning object_corrupted when the object exists and the
106 // Snapset is either not found or empty.
107 bool object_corrupted
= true;
108 if (auto ssiter
= attrs
.find(SS_ATTR
); ssiter
!= attrs
.end()) {
109 object_corrupted
= false;
110 bufferlist bl
= std::move(ssiter
->second
);
112 ret
->ssc
= new crimson::osd::SnapSetContext(oid
.get_snapdir());
114 ret
->ssc
->snapset
= SnapSet(bl
);
115 ret
->ssc
->exists
= true;
117 "load_metadata: object {} and snapset {} present",
118 oid
, ret
->ssc
->snapset
);
119 } catch (const buffer::error
&) {
120 logger().warn("unable to decode SnapSet");
121 throw crimson::osd::invalid_argument();
124 object_corrupted
= true;
127 if (object_corrupted
) {
129 "load_metadata: object {} present but missing snapset",
131 return crimson::ct_error::object_corrupted::make();
135 return load_metadata_ertr::make_ready_future
<loaded_object_md_t::ref
>(
137 }, crimson::ct_error::enoent::handle([oid
] {
139 "load_metadata: object {} doesn't exist, returning empty metadata",
141 return load_metadata_ertr::make_ready_future
<loaded_object_md_t::ref
>(
142 new loaded_object_md_t
{
146 oid
.is_head() ? (new crimson::osd::SnapSetContext(oid
)) : nullptr
151 PGBackend::rep_op_fut_t
152 PGBackend::mutate_object(
153 std::set
<pg_shard_t
> pg_shards
,
154 crimson::osd::ObjectContextRef
&&obc
,
155 ceph::os::Transaction
&& txn
,
156 osd_op_params_t
&& osd_op_p
,
159 std::vector
<pg_log_entry_t
>&& log_entries
)
161 logger().trace("mutate_object: num_ops={}", txn
.get_num_ops());
162 if (obc
->obs
.exists
) {
164 obc
->obs
.oi
.version
= ctx
->at_version
;
165 obc
->obs
.oi
.prior_version
= ctx
->obs
->oi
.version
;
168 obc
->obs
.oi
.prior_version
= obc
->obs
.oi
.version
;
169 obc
->obs
.oi
.version
= osd_op_p
.at_version
;
170 if (osd_op_p
.user_at_version
> obc
->obs
.oi
.user_version
)
171 obc
->obs
.oi
.user_version
= osd_op_p
.user_at_version
;
172 obc
->obs
.oi
.last_reqid
= osd_op_p
.req_id
;
173 obc
->obs
.oi
.mtime
= osd_op_p
.mtime
;
174 obc
->obs
.oi
.local_mtime
= ceph_clock_now();
178 ceph::bufferlist osv
;
179 obc
->obs
.oi
.encode_no_oid(osv
, CEPH_FEATURES_ALL
);
180 // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
181 txn
.setattr(coll
->get_cid(), ghobject_t
{obc
->obs
.oi
.soid
}, OI_ATTR
, osv
);
185 if (obc
->obs
.oi
.soid
.snap
== CEPH_NOSNAP
) {
186 logger().debug("final snapset {} in {}",
187 obc
->ssc
->snapset
, obc
->obs
.oi
.soid
);
188 ceph::bufferlist bss
;
189 encode(obc
->ssc
->snapset
, bss
);
190 txn
.setattr(coll
->get_cid(), ghobject_t
{obc
->obs
.oi
.soid
}, SS_ATTR
, bss
);
191 obc
->ssc
->exists
= true;
193 logger().debug("no snapset (this is a clone)");
196 // reset cached ObjectState without enforcing eviction
197 obc
->obs
.oi
= object_info_t(obc
->obs
.oi
.soid
);
199 return _submit_transaction(
200 std::move(pg_shards
), obc
->obs
.oi
.soid
, std::move(txn
),
201 std::move(osd_op_p
), min_epoch
, map_epoch
, std::move(log_entries
));
204 static inline bool _read_verify_data(
205 const object_info_t
& oi
,
206 const ceph::bufferlist
& data
)
208 if (oi
.is_data_digest() && oi
.size
== data
.length()) {
209 // whole object? can we verify the checksum?
210 if (auto crc
= data
.crc32c(-1); crc
!= oi
.data_digest
) {
211 logger().error("full-object read crc {} != expected {} on {}",
212 crc
, oi
.data_digest
, oi
.soid
);
213 // todo: mark soid missing, perform recovery, and retry
220 PGBackend::read_ierrorator::future
<>
221 PGBackend::read(const ObjectState
& os
, OSDOp
& osd_op
,
222 object_stat_sum_t
& delta_stats
)
224 const auto& oi
= os
.oi
;
225 const ceph_osd_op
& op
= osd_op
.op
;
226 const uint64_t offset
= op
.extent
.offset
;
227 uint64_t length
= op
.extent
.length
;
228 logger().trace("read: {} {}~{}", oi
.soid
, offset
, length
);
230 if (!os
.exists
|| os
.oi
.is_whiteout()) {
231 logger().debug("{}: {} DNE", __func__
, os
.oi
.soid
);
232 return crimson::ct_error::enoent::make();
234 // are we beyond truncate_size?
235 size_t size
= oi
.size
;
236 if ((op
.extent
.truncate_seq
> oi
.truncate_seq
) &&
237 (op
.extent
.truncate_size
< offset
+ length
) &&
238 (op
.extent
.truncate_size
< size
)) {
239 size
= op
.extent
.truncate_size
;
241 if (offset
>= size
) {
242 // read size was trimmed to zero and it is expected to do nothing,
243 return read_errorator::now();
246 // read the whole object if length is 0
249 return _read(oi
.soid
, offset
, length
, op
.flags
).safe_then_interruptible_tuple(
250 [&delta_stats
, &oi
, &osd_op
](auto&& bl
) -> read_errorator::future
<> {
251 if (!_read_verify_data(oi
, bl
)) {
253 return crimson::ct_error::object_corrupted::make();
255 logger().debug("read: data length: {}", bl
.length());
256 osd_op
.op
.extent
.length
= bl
.length();
258 delta_stats
.num_rd
++;
259 delta_stats
.num_rd_kb
+= shift_round_up(bl
.length(), 10);
260 osd_op
.outdata
= std::move(bl
);
261 return read_errorator::now();
262 }, crimson::ct_error::input_output_error::handle([] {
263 return read_errorator::future
<>{crimson::ct_error::object_corrupted::make()};
265 read_errorator::pass_further
{});
268 PGBackend::read_ierrorator::future
<>
269 PGBackend::sparse_read(const ObjectState
& os
, OSDOp
& osd_op
,
270 object_stat_sum_t
& delta_stats
)
272 if (!os
.exists
|| os
.oi
.is_whiteout()) {
273 logger().debug("{}: {} DNE", __func__
, os
.oi
.soid
);
274 return crimson::ct_error::enoent::make();
277 const auto& op
= osd_op
.op
;
278 /* clients (particularly cephfs) may send truncate operations out of order
279 * w.r.t. reads. op.extent.truncate_seq and op.extent.truncate_size allow
280 * the OSD to determine whether the client submitted read needs to be
281 * adjusted to compensate for a truncate the OSD hasn't seen yet.
283 uint64_t adjusted_size
= os
.oi
.size
;
284 const uint64_t offset
= op
.extent
.offset
;
285 uint64_t adjusted_length
= op
.extent
.length
;
286 if ((os
.oi
.truncate_seq
< op
.extent
.truncate_seq
) &&
287 (op
.extent
.offset
+ op
.extent
.length
> op
.extent
.truncate_size
) &&
288 (adjusted_size
> op
.extent
.truncate_size
)) {
289 adjusted_size
= op
.extent
.truncate_size
;
291 if (offset
> adjusted_size
) {
293 } else if (offset
+ adjusted_length
> adjusted_size
) {
294 adjusted_length
= adjusted_size
- offset
;
296 logger().trace("sparse_read: {} {}~{}",
297 os
.oi
.soid
, op
.extent
.offset
, op
.extent
.length
);
298 return interruptor::make_interruptible(store
->fiemap(coll
, ghobject_t
{os
.oi
.soid
},
299 offset
, adjusted_length
)).safe_then_interruptible(
300 [&delta_stats
, &os
, &osd_op
, this](auto&& m
) {
301 return seastar::do_with(interval_set
<uint64_t>{std::move(m
)},
302 [&delta_stats
, &os
, &osd_op
, this](auto&& extents
) {
303 return interruptor::make_interruptible(store
->readv(coll
, ghobject_t
{os
.oi
.soid
},
304 extents
, osd_op
.op
.flags
)).safe_then_interruptible_tuple(
305 [&delta_stats
, &os
, &osd_op
, &extents
](auto&& bl
) -> read_errorator::future
<> {
306 if (_read_verify_data(os
.oi
, bl
)) {
307 osd_op
.op
.extent
.length
= bl
.length();
308 // re-encode since it might be modified
309 ceph::encode(extents
, osd_op
.outdata
);
310 encode_destructively(bl
, osd_op
.outdata
);
311 logger().trace("sparse_read got {} bytes from object {}",
312 osd_op
.op
.extent
.length
, os
.oi
.soid
);
313 delta_stats
.num_rd
++;
314 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.op
.extent
.length
, 10);
315 return read_errorator::make_ready_future
<>();
318 return crimson::ct_error::object_corrupted::make();
320 }, crimson::ct_error::input_output_error::handle([] {
321 return read_errorator::future
<>{crimson::ct_error::object_corrupted::make()};
323 read_errorator::pass_further
{});
331 PGBackend::checksum_errorator::future
<>
332 do_checksum(ceph::bufferlist
& init_value_bl
,
334 const ceph::bufferlist
& buf
,
335 ceph::bufferlist
& result
)
337 typename
CSum::init_value_t init_value
;
338 auto init_value_p
= init_value_bl
.cbegin();
340 decode(init_value
, init_value_p
);
341 // chop off the consumed part
342 init_value_bl
.splice(0, init_value_p
.get_off());
343 } catch (const ceph::buffer::end_of_buffer
&) {
344 logger().warn("{}: init value not provided", __func__
);
345 return crimson::ct_error::invarg::make();
347 const uint32_t chunk_count
= buf
.length() / chunk_size
;
348 ceph::bufferptr csum_data
{
349 ceph::buffer::create(sizeof(typename
CSum::value_t
) * chunk_count
)};
350 Checksummer::calculate
<CSum
>(
351 init_value
, chunk_size
, 0, buf
.length(), buf
, &csum_data
);
352 encode(chunk_count
, result
);
353 result
.append(std::move(csum_data
));
354 return PGBackend::checksum_errorator::now();
358 PGBackend::checksum_ierrorator::future
<>
359 PGBackend::checksum(const ObjectState
& os
, OSDOp
& osd_op
)
361 // sanity tests and normalize the arguments
362 auto& checksum
= osd_op
.op
.checksum
;
363 if (checksum
.offset
== 0 && checksum
.length
== 0) {
364 // zeroed offset+length implies checksum whole object
365 checksum
.length
= os
.oi
.size
;
366 } else if (checksum
.offset
>= os
.oi
.size
) {
367 // read size was trimmed to zero, do nothing,
368 // see PGBackend::read()
369 return checksum_errorator::now();
371 if (checksum
.chunk_size
> 0) {
372 if (checksum
.length
== 0) {
373 logger().warn("{}: length required when chunk size provided", __func__
);
374 return crimson::ct_error::invarg::make();
376 if (checksum
.length
% checksum
.chunk_size
!= 0) {
377 logger().warn("{}: length not aligned to chunk size", __func__
);
378 return crimson::ct_error::invarg::make();
381 checksum
.chunk_size
= checksum
.length
;
383 if (checksum
.length
== 0) {
385 encode(count
, osd_op
.outdata
);
386 return checksum_errorator::now();
389 // read the chunk to be checksum'ed
390 return _read(os
.oi
.soid
, checksum
.offset
, checksum
.length
, osd_op
.op
.flags
)
391 .safe_then_interruptible(
392 [&osd_op
](auto&& read_bl
) mutable -> checksum_errorator::future
<> {
393 auto& checksum
= osd_op
.op
.checksum
;
394 if (read_bl
.length() != checksum
.length
) {
395 logger().warn("checksum: bytes read {} != {}",
396 read_bl
.length(), checksum
.length
);
397 return crimson::ct_error::invarg::make();
399 // calculate its checksum and put the result in outdata
400 switch (checksum
.type
) {
401 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32
:
402 return do_checksum
<Checksummer::xxhash32
>(osd_op
.indata
,
406 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64
:
407 return do_checksum
<Checksummer::xxhash64
>(osd_op
.indata
,
411 case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C
:
412 return do_checksum
<Checksummer::crc32c
>(osd_op
.indata
,
417 logger().warn("checksum: unknown crc type ({})",
418 static_cast<uint32_t>(checksum
.type
));
419 return crimson::ct_error::invarg::make();
424 PGBackend::cmp_ext_ierrorator::future
<>
425 PGBackend::cmp_ext(const ObjectState
& os
, OSDOp
& osd_op
)
427 const ceph_osd_op
& op
= osd_op
.op
;
428 uint64_t obj_size
= os
.oi
.size
;
429 if (os
.oi
.truncate_seq
< op
.extent
.truncate_seq
&&
430 op
.extent
.offset
+ op
.extent
.length
> op
.extent
.truncate_size
) {
431 obj_size
= op
.extent
.truncate_size
;
434 if (op
.extent
.offset
>= obj_size
) {
436 } else if (op
.extent
.offset
+ op
.extent
.length
> obj_size
) {
437 ext_len
= obj_size
- op
.extent
.offset
;
439 ext_len
= op
.extent
.length
;
441 auto read_ext
= ll_read_ierrorator::make_ready_future
<ceph::bufferlist
>();
443 logger().debug("{}: zero length extent", __func__
);
444 } else if (!os
.exists
|| os
.oi
.is_whiteout()) {
445 logger().debug("{}: {} DNE", __func__
, os
.oi
.soid
);
447 read_ext
= _read(os
.oi
.soid
, op
.extent
.offset
, ext_len
, 0);
449 return read_ext
.safe_then_interruptible([&osd_op
](auto&& read_bl
)
450 -> cmp_ext_errorator::future
<> {
451 for (unsigned index
= 0; index
< osd_op
.indata
.length(); index
++) {
452 char byte_in_op
= osd_op
.indata
[index
];
453 char byte_from_disk
= (index
< read_bl
.length() ? read_bl
[index
] : 0);
454 if (byte_in_op
!= byte_from_disk
) {
455 logger().debug("cmp_ext: mismatch at {}", index
);
456 // Unlike other ops, we set osd_op.rval here and return a different
457 // error code via ct_error::cmp_fail.
458 osd_op
.rval
= -MAX_ERRNO
- index
;
459 return crimson::ct_error::cmp_fail::make();
463 return cmp_ext_errorator::make_ready_future
<>();
467 PGBackend::stat_ierrorator::future
<>
469 const ObjectState
& os
,
471 object_stat_sum_t
& delta_stats
)
473 if (os
.exists
/* TODO: && !os.is_whiteout() */) {
474 logger().debug("stat os.oi.size={}, os.oi.mtime={}", os
.oi
.size
, os
.oi
.mtime
);
475 encode(os
.oi
.size
, osd_op
.outdata
);
476 encode(os
.oi
.mtime
, osd_op
.outdata
);
478 logger().debug("stat object does not exist");
479 return crimson::ct_error::enoent::make();
481 delta_stats
.num_rd
++;
482 return stat_errorator::now();
485 PGBackend::write_iertr::future
<> PGBackend::_writefull(
488 const bufferlist
& bl
,
489 ceph::os::Transaction
& txn
,
490 osd_op_params_t
& osd_op_params
,
491 object_stat_sum_t
& delta_stats
,
494 const bool existing
= maybe_create_new_object(os
, txn
, delta_stats
);
495 if (existing
&& bl
.length() < os
.oi
.size
) {
497 txn
.truncate(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, bl
.length());
498 truncate_update_size_and_usage(delta_stats
, os
.oi
, truncate_size
);
500 osd_op_params
.clean_regions
.mark_data_region_dirty(
502 os
.oi
.size
- bl
.length());
506 coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, 0, bl
.length(),
508 update_size_and_usage(
509 delta_stats
, os
.oi
, 0,
511 osd_op_params
.clean_regions
.mark_data_region_dirty(
513 std::max((uint64_t)bl
.length(), os
.oi
.size
));
515 return seastar::now();
518 PGBackend::write_iertr::future
<> PGBackend::_truncate(
520 ceph::os::Transaction
& txn
,
521 osd_op_params_t
& osd_op_params
,
522 object_stat_sum_t
& delta_stats
,
524 size_t truncate_size
,
525 uint32_t truncate_seq
)
528 assert(offset
== truncate_size
);
529 if (truncate_seq
<= os
.oi
.truncate_seq
) {
530 logger().debug("{} truncate seq {} <= current {}, no-op",
531 __func__
, truncate_seq
, os
.oi
.truncate_seq
);
532 return write_ertr::make_ready_future
<>();
534 logger().debug("{} truncate seq {} > current {}, truncating",
535 __func__
, truncate_seq
, os
.oi
.truncate_seq
);
536 os
.oi
.truncate_seq
= truncate_seq
;
537 os
.oi
.truncate_size
= truncate_size
;
540 maybe_create_new_object(os
, txn
, delta_stats
);
541 if (os
.oi
.size
!= offset
) {
544 ghobject_t
{os
.oi
.soid
}, offset
);
545 if (os
.oi
.size
> offset
) {
546 // TODO: modified_ranges.union_of(trim);
547 osd_op_params
.clean_regions
.mark_data_region_dirty(
549 os
.oi
.size
- offset
);
551 // os.oi.size < offset
552 osd_op_params
.clean_regions
.mark_data_region_dirty(
554 offset
- os
.oi
.size
);
556 truncate_update_size_and_usage(delta_stats
, os
.oi
, offset
);
557 os
.oi
.clear_data_digest();
559 delta_stats
.num_wr
++;
560 return write_ertr::now();
563 bool PGBackend::maybe_create_new_object(
565 ceph::os::Transaction
& txn
,
566 object_stat_sum_t
& delta_stats
)
569 ceph_assert(!os
.oi
.is_whiteout());
573 txn
.touch(coll
->get_cid(), ghobject_t
{os
.oi
.soid
});
574 delta_stats
.num_objects
++;
576 } else if (os
.oi
.is_whiteout()) {
577 os
.oi
.clear_flag(object_info_t::FLAG_WHITEOUT
);
578 delta_stats
.num_whiteouts
--;
583 void PGBackend::update_size_and_usage(object_stat_sum_t
& delta_stats
,
584 object_info_t
& oi
, uint64_t offset
,
585 uint64_t length
, bool write_full
)
588 (offset
+ length
> oi
.size
&& length
)) {
589 uint64_t new_size
= offset
+ length
;
590 delta_stats
.num_bytes
-= oi
.size
;
591 delta_stats
.num_bytes
+= new_size
;
594 delta_stats
.num_wr
++;
595 delta_stats
.num_wr_kb
+= shift_round_up(length
, 10);
598 void PGBackend::truncate_update_size_and_usage(object_stat_sum_t
& delta_stats
,
600 uint64_t truncate_size
)
602 if (oi
.size
!= truncate_size
) {
603 delta_stats
.num_bytes
-= oi
.size
;
604 delta_stats
.num_bytes
+= truncate_size
;
605 oi
.size
= truncate_size
;
609 static bool is_offset_and_length_valid(
610 const std::uint64_t offset
,
611 const std::uint64_t length
)
613 if (const std::uint64_t max
= local_conf()->osd_max_object_size
;
614 offset
>= max
|| length
> max
|| offset
+ length
> max
) {
615 logger().debug("{} osd_max_object_size: {}, offset: {}, len: {}; "
616 "Hard limit of object size is 4GB",
617 __func__
, max
, offset
, length
);
624 PGBackend::interruptible_future
<> PGBackend::set_allochint(
627 ceph::os::Transaction
& txn
,
628 object_stat_sum_t
& delta_stats
)
630 maybe_create_new_object(os
, txn
, delta_stats
);
632 os
.oi
.expected_object_size
= osd_op
.op
.alloc_hint
.expected_object_size
;
633 os
.oi
.expected_write_size
= osd_op
.op
.alloc_hint
.expected_write_size
;
634 os
.oi
.alloc_hint_flags
= osd_op
.op
.alloc_hint
.flags
;
635 txn
.set_alloc_hint(coll
->get_cid(),
636 ghobject_t
{os
.oi
.soid
},
637 os
.oi
.expected_object_size
,
638 os
.oi
.expected_write_size
,
639 os
.oi
.alloc_hint_flags
);
640 return seastar::now();
643 PGBackend::write_iertr::future
<> PGBackend::write(
646 ceph::os::Transaction
& txn
,
647 osd_op_params_t
& osd_op_params
,
648 object_stat_sum_t
& delta_stats
)
650 const ceph_osd_op
& op
= osd_op
.op
;
651 uint64_t offset
= op
.extent
.offset
;
652 uint64_t length
= op
.extent
.length
;
653 bufferlist buf
= osd_op
.indata
;
654 if (op
.extent
.length
!= osd_op
.indata
.length()) {
655 return crimson::ct_error::invarg::make();
658 if (!is_offset_and_length_valid(op
.extent
.offset
, op
.extent
.length
)) {
659 return crimson::ct_error::file_too_large::make();
662 if (auto seq
= os
.oi
.truncate_seq
;
663 seq
!= 0 && op
.extent
.truncate_seq
< seq
) {
664 // old write, arrived after trimtrunc
665 if (offset
+ length
> os
.oi
.size
) {
667 if (offset
> os
.oi
.size
) {
672 auto len
= os
.oi
.size
- offset
;
673 buf
.splice(len
, length
);
677 } else if (op
.extent
.truncate_seq
> seq
) {
678 // write arrives before trimtrunc
679 if (os
.exists
&& !os
.oi
.is_whiteout()) {
680 txn
.truncate(coll
->get_cid(),
681 ghobject_t
{os
.oi
.soid
}, op
.extent
.truncate_size
);
682 if (op
.extent
.truncate_size
!= os
.oi
.size
) {
684 if (op
.extent
.truncate_size
> os
.oi
.size
) {
685 osd_op_params
.clean_regions
.mark_data_region_dirty(os
.oi
.size
,
686 op
.extent
.truncate_size
- os
.oi
.size
);
688 osd_op_params
.clean_regions
.mark_data_region_dirty(op
.extent
.truncate_size
,
689 os
.oi
.size
- op
.extent
.truncate_size
);
692 truncate_update_size_and_usage(delta_stats
, os
.oi
, op
.extent
.truncate_size
);
694 os
.oi
.truncate_seq
= op
.extent
.truncate_seq
;
695 os
.oi
.truncate_size
= op
.extent
.truncate_size
;
697 maybe_create_new_object(os
, txn
, delta_stats
);
699 if (offset
> os
.oi
.size
) {
700 txn
.truncate(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, op
.extent
.offset
);
701 truncate_update_size_and_usage(delta_stats
, os
.oi
, op
.extent
.offset
);
706 txn
.write(coll
->get_cid(), ghobject_t
{os
.oi
.soid
},
707 offset
, length
, std::move(buf
), op
.flags
);
708 update_size_and_usage(delta_stats
, os
.oi
, offset
, length
);
710 osd_op_params
.clean_regions
.mark_data_region_dirty(op
.extent
.offset
,
713 return seastar::now();
716 PGBackend::interruptible_future
<> PGBackend::write_same(
719 ceph::os::Transaction
& txn
,
720 osd_op_params_t
& osd_op_params
,
721 object_stat_sum_t
& delta_stats
)
723 const ceph_osd_op
& op
= osd_op
.op
;
724 const uint64_t len
= op
.writesame
.length
;
726 return seastar::now();
728 if (op
.writesame
.data_length
== 0 ||
729 len
% op
.writesame
.data_length
!= 0 ||
730 op
.writesame
.data_length
!= osd_op
.indata
.length()) {
731 throw crimson::osd::invalid_argument();
733 ceph::bufferlist repeated_indata
;
734 for (uint64_t size
= 0; size
< len
; size
+= op
.writesame
.data_length
) {
735 repeated_indata
.append(osd_op
.indata
);
737 maybe_create_new_object(os
, txn
, delta_stats
);
738 txn
.write(coll
->get_cid(), ghobject_t
{os
.oi
.soid
},
739 op
.writesame
.offset
, len
,
740 std::move(repeated_indata
), op
.flags
);
741 update_size_and_usage(delta_stats
, os
.oi
, op
.writesame
.offset
, len
);
742 osd_op_params
.clean_regions
.mark_data_region_dirty(op
.writesame
.offset
, len
);
743 return seastar::now();
746 PGBackend::write_iertr::future
<> PGBackend::writefull(
749 ceph::os::Transaction
& txn
,
750 osd_op_params_t
& osd_op_params
,
751 object_stat_sum_t
& delta_stats
)
753 const ceph_osd_op
& op
= osd_op
.op
;
754 if (op
.extent
.length
!= osd_op
.indata
.length()) {
755 return crimson::ct_error::invarg::make();
757 if (!is_offset_and_length_valid(op
.extent
.offset
, op
.extent
.length
)) {
758 return crimson::ct_error::file_too_large::make();
763 op
.extent
.truncate_size
,
771 PGBackend::rollback_iertr::future
<> PGBackend::rollback(
774 ceph::os::Transaction
& txn
,
775 osd_op_params_t
& osd_op_params
,
776 object_stat_sum_t
& delta_stats
,
777 crimson::osd::ObjectContextRef head
,
778 crimson::osd::ObjectContextLoader
& obc_loader
)
780 const ceph_osd_op
& op
= osd_op
.op
;
781 snapid_t snapid
= (uint64_t)op
.snap
.snapid
;
782 assert(os
.oi
.soid
.is_head());
783 logger().debug("{} deleting {} and rolling back to old snap {}",
784 __func__
, os
.oi
.soid
,snapid
);
785 hobject_t target_coid
= os
.oi
.soid
;
786 target_coid
.snap
= snapid
;
787 return obc_loader
.with_clone_obc_only
<RWState::RWWRITE
>(
789 [this, &os
, &txn
, &delta_stats
, &osd_op_params
]
790 (auto resolved_obc
) {
791 if (resolved_obc
->obs
.oi
.soid
.is_head()) {
792 // no-op: The resolved oid returned the head object
793 logger().debug("PGBackend::rollback: loaded head_obc: {}"
795 resolved_obc
->obs
.oi
.soid
);
796 return rollback_iertr::now();
798 /* TODO: https://tracker.ceph.com/issues/59114 This implementation will not
799 * behave correctly for a rados operation consisting of a mutation followed
800 * by a rollback to a snapshot since the last mutation of the object.
801 * The correct behavior would be for the rollback to undo the mutation
802 * earlier in the operation by resolving to the clone created at the start
803 * of the operation (see resolve_oid).
804 * Instead, it will select HEAD leaving that mutation intact since the SnapSet won't
805 * yet contain that clone. This behavior exists in classic as well.
807 logger().debug("PGBackend::rollback: loaded clone_obc: {}",
808 resolved_obc
->obs
.oi
.soid
);
809 // 1) Delete current head
811 txn
.remove(coll
->get_cid(), ghobject_t
{os
.oi
.soid
,
812 ghobject_t::NO_GEN
, shard
});
814 // 2) Clone correct snapshot into head
815 txn
.clone(coll
->get_cid(), ghobject_t
{resolved_obc
->obs
.oi
.soid
},
816 ghobject_t
{os
.oi
.soid
});
817 // Copy clone obc.os.oi to os.oi
818 os
.oi
.clear_flag(object_info_t::FLAG_WHITEOUT
);
819 os
.oi
.copy_user_bits(resolved_obc
->obs
.oi
);
820 delta_stats
.num_bytes
-= os
.oi
.size
;
821 delta_stats
.num_bytes
+= resolved_obc
->obs
.oi
.size
;
822 osd_op_params
.clean_regions
.mark_data_region_dirty(0,
823 std::max(os
.oi
.size
, resolved_obc
->obs
.oi
.size
));
824 osd_op_params
.clean_regions
.mark_omap_dirty();
825 // TODO: 3) Calculate clone_overlaps by following overlaps
826 // forward from rollback snapshot
827 // https://tracker.ceph.com/issues/58263
828 return rollback_iertr::now();
829 }).safe_then_interruptible([] {
830 logger().debug("PGBackend::rollback succefully");
831 return rollback_iertr::now();
832 },// there's no snapshot here, or there's no object.
833 // if there's no snapshot, we delete the object;
834 // otherwise, do nothing.
835 crimson::ct_error::enoent::handle(
836 [this, &os
, &snapid
, &txn
, &delta_stats
] {
837 logger().debug("PGBackend::rollback: deleting head on {}"
838 " with snap_id of {}"
839 " because got ENOENT|whiteout on obc lookup",
841 return remove(os
, txn
, delta_stats
, false);
843 rollback_ertr::pass_further
{},
844 crimson::ct_error::assert_all
{"unexpected error in rollback"}
848 PGBackend::append_ierrorator::future
<> PGBackend::append(
851 ceph::os::Transaction
& txn
,
852 osd_op_params_t
& osd_op_params
,
853 object_stat_sum_t
& delta_stats
)
855 const ceph_osd_op
& op
= osd_op
.op
;
856 if (op
.extent
.length
!= osd_op
.indata
.length()) {
857 return crimson::ct_error::invarg::make();
859 maybe_create_new_object(os
, txn
, delta_stats
);
860 if (op
.extent
.length
) {
861 txn
.write(coll
->get_cid(), ghobject_t
{os
.oi
.soid
},
862 os
.oi
.size
/* offset */, op
.extent
.length
,
863 std::move(osd_op
.indata
), op
.flags
);
864 update_size_and_usage(delta_stats
, os
.oi
, os
.oi
.size
,
866 osd_op_params
.clean_regions
.mark_data_region_dirty(os
.oi
.size
,
869 return seastar::now();
872 PGBackend::write_iertr::future
<> PGBackend::truncate(
875 ceph::os::Transaction
& txn
,
876 osd_op_params_t
& osd_op_params
,
877 object_stat_sum_t
& delta_stats
)
879 if (!os
.exists
|| os
.oi
.is_whiteout()) {
880 logger().debug("{} object dne, truncate is a no-op", __func__
);
881 return write_ertr::now();
883 const ceph_osd_op
& op
= osd_op
.op
;
884 if (!is_offset_and_length_valid(op
.extent
.offset
, op
.extent
.length
)) {
885 return crimson::ct_error::file_too_large::make();
888 os
, txn
, osd_op_params
, delta_stats
,
889 op
.extent
.offset
, op
.extent
.truncate_size
, op
.extent
.truncate_seq
);
892 PGBackend::write_iertr::future
<> PGBackend::zero(
895 ceph::os::Transaction
& txn
,
896 osd_op_params_t
& osd_op_params
,
897 object_stat_sum_t
& delta_stats
)
899 if (!os
.exists
|| os
.oi
.is_whiteout()) {
900 logger().debug("{} object dne, zero is a no-op", __func__
);
901 return write_ertr::now();
903 const ceph_osd_op
& op
= osd_op
.op
;
904 if (!is_offset_and_length_valid(op
.extent
.offset
, op
.extent
.length
)) {
905 return crimson::ct_error::file_too_large::make();
908 if (op
.extent
.offset
>= os
.oi
.size
|| op
.extent
.length
== 0) {
909 return write_iertr::now(); // noop
912 if (op
.extent
.offset
+ op
.extent
.length
>= os
.oi
.size
) {
914 os
, txn
, osd_op_params
, delta_stats
,
915 op
.extent
.offset
, op
.extent
.truncate_size
, op
.extent
.truncate_seq
);
918 txn
.zero(coll
->get_cid(),
919 ghobject_t
{os
.oi
.soid
},
922 // TODO: modified_ranges.union_of(zeroed);
923 osd_op_params
.clean_regions
.mark_data_region_dirty(op
.extent
.offset
,
925 delta_stats
.num_wr
++;
926 os
.oi
.clear_data_digest();
927 return write_ertr::now();
930 PGBackend::create_iertr::future
<> PGBackend::create(
933 ceph::os::Transaction
& txn
,
934 object_stat_sum_t
& delta_stats
)
936 if (os
.exists
&& !os
.oi
.is_whiteout() &&
937 (osd_op
.op
.flags
& CEPH_OSD_OP_FLAG_EXCL
)) {
938 // this is an exclusive create
939 return crimson::ct_error::eexist::make();
942 if (osd_op
.indata
.length()) {
943 // handle the legacy. `category` is no longer implemented.
945 auto p
= osd_op
.indata
.cbegin();
946 std::string category
;
948 } catch (buffer::error
&) {
949 return crimson::ct_error::invarg::make();
952 maybe_create_new_object(os
, txn
, delta_stats
);
953 txn
.create(coll
->get_cid(),
954 ghobject_t
{os
.oi
.soid
, ghobject_t::NO_GEN
, shard
});
955 return seastar::now();
958 PGBackend::interruptible_future
<>
959 PGBackend::remove(ObjectState
& os
, ceph::os::Transaction
& txn
)
962 txn
.remove(coll
->get_cid(),
963 ghobject_t
{os
.oi
.soid
, ghobject_t::NO_GEN
, shard
});
967 // todo: update watchers
968 if (os
.oi
.is_whiteout()) {
969 os
.oi
.clear_flag(object_info_t::FLAG_WHITEOUT
);
971 return seastar::now();
974 PGBackend::remove_iertr::future
<>
975 PGBackend::remove(ObjectState
& os
, ceph::os::Transaction
& txn
,
976 object_stat_sum_t
& delta_stats
, bool whiteout
)
979 return crimson::ct_error::enoent::make();
983 logger().debug("{} {} does not exist",__func__
, os
.oi
.soid
);
984 return seastar::now();
986 if (whiteout
&& os
.oi
.is_whiteout()) {
987 logger().debug("{} whiteout set on {} ",__func__
, os
.oi
.soid
);
988 return seastar::now();
990 txn
.remove(coll
->get_cid(),
991 ghobject_t
{os
.oi
.soid
, ghobject_t::NO_GEN
, shard
});
992 delta_stats
.num_bytes
-= os
.oi
.size
;
996 // todo: clone_overlap
998 logger().debug("{} setting whiteout on {} ",__func__
, os
.oi
.soid
);
999 os
.oi
.set_flag(object_info_t::FLAG_WHITEOUT
);
1000 delta_stats
.num_whiteouts
++;
1001 txn
.create(coll
->get_cid(),
1002 ghobject_t
{os
.oi
.soid
, ghobject_t::NO_GEN
, shard
});
1003 return seastar::now();
1005 // todo: update watchers
1006 if (os
.oi
.is_whiteout()) {
1007 os
.oi
.clear_flag(object_info_t::FLAG_WHITEOUT
);
1008 delta_stats
.num_whiteouts
--;
1010 delta_stats
.num_objects
--;
1012 return seastar::now();
1015 PGBackend::interruptible_future
<std::tuple
<std::vector
<hobject_t
>, hobject_t
>>
1016 PGBackend::list_objects(const hobject_t
& start
, uint64_t limit
) const
1018 auto gstart
= start
.is_min() ? ghobject_t
{} : ghobject_t
{start
, 0, shard
};
1019 return interruptor::make_interruptible(store
->list_objects(coll
,
1021 ghobject_t::get_max(),
1023 .then_interruptible([](auto ret
) {
1024 auto& [gobjects
, next
] = ret
;
1025 std::vector
<hobject_t
> objects
;
1026 boost::copy(gobjects
|
1027 boost::adaptors::filtered([](const ghobject_t
& o
) {
1028 if (o
.is_pgmeta()) {
1030 } else if (o
.hobj
.is_temp()) {
1033 return o
.is_no_gen();
1036 boost::adaptors::transformed([](const ghobject_t
& o
) {
1039 std::back_inserter(objects
));
1040 return seastar::make_ready_future
<std::tuple
<std::vector
<hobject_t
>, hobject_t
>>(
1041 std::make_tuple(objects
, next
.hobj
));
1045 PGBackend::setxattr_ierrorator::future
<> PGBackend::setxattr(
1047 const OSDOp
& osd_op
,
1048 ceph::os::Transaction
& txn
,
1049 object_stat_sum_t
& delta_stats
)
1051 if (local_conf()->osd_max_attr_size
> 0 &&
1052 osd_op
.op
.xattr
.value_len
> local_conf()->osd_max_attr_size
) {
1053 return crimson::ct_error::file_too_large::make();
1056 const auto max_name_len
= std::min
<uint64_t>(
1057 store
->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len
);
1058 if (osd_op
.op
.xattr
.name_len
> max_name_len
) {
1059 return crimson::ct_error::enametoolong::make();
1062 maybe_create_new_object(os
, txn
, delta_stats
);
1064 std::string name
{"_"};
1065 ceph::bufferlist val
;
1067 auto bp
= osd_op
.indata
.cbegin();
1068 bp
.copy(osd_op
.op
.xattr
.name_len
, name
);
1069 bp
.copy(osd_op
.op
.xattr
.value_len
, val
);
1071 logger().debug("setxattr on obj={} for attr={}", os
.oi
.soid
, name
);
1072 txn
.setattr(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, name
, val
);
1073 delta_stats
.num_wr
++;
1074 return seastar::now();
1077 PGBackend::get_attr_ierrorator::future
<> PGBackend::getxattr(
1078 const ObjectState
& os
,
1080 object_stat_sum_t
& delta_stats
) const
1083 ceph::bufferlist val
;
1085 auto bp
= osd_op
.indata
.cbegin();
1087 bp
.copy(osd_op
.op
.xattr
.name_len
, aname
);
1090 logger().debug("getxattr on obj={} for attr={}", os
.oi
.soid
, name
);
1091 return getxattr(os
.oi
.soid
, std::move(name
)).safe_then_interruptible(
1092 [&delta_stats
, &osd_op
] (ceph::bufferlist
&& val
) {
1093 osd_op
.outdata
= std::move(val
);
1094 osd_op
.op
.xattr
.value_len
= osd_op
.outdata
.length();
1095 delta_stats
.num_rd
++;
1096 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.outdata
.length(), 10);
1097 return get_attr_errorator::now();
1101 PGBackend::get_attr_ierrorator::future
<ceph::bufferlist
>
1102 PGBackend::getxattr(
1103 const hobject_t
& soid
,
1104 std::string_view key
) const
1106 return store
->get_attr(coll
, ghobject_t
{soid
}, key
);
1109 PGBackend::get_attr_ierrorator::future
<ceph::bufferlist
>
1110 PGBackend::getxattr(
1111 const hobject_t
& soid
,
1112 std::string
&& key
) const
1114 return seastar::do_with(key
, [this, &soid
](auto &key
) {
1115 return store
->get_attr(coll
, ghobject_t
{soid
}, key
);
1119 PGBackend::get_attr_ierrorator::future
<> PGBackend::get_xattrs(
1120 const ObjectState
& os
,
1122 object_stat_sum_t
& delta_stats
) const
1124 return store
->get_attrs(coll
, ghobject_t
{os
.oi
.soid
}).safe_then(
1125 [&delta_stats
, &osd_op
](auto&& attrs
) {
1126 std::vector
<std::pair
<std::string
, bufferlist
>> user_xattrs
;
1127 ceph::bufferlist bl
;
1128 for (auto& [key
, val
] : attrs
) {
1129 if (key
.size() > 1 && key
[0] == '_') {
1130 bl
.append(std::move(val
));
1131 user_xattrs
.emplace_back(key
.substr(1), std::move(bl
));
1134 ceph::encode(user_xattrs
, osd_op
.outdata
);
1135 delta_stats
.num_rd
++;
1136 delta_stats
.num_rd_kb
+= shift_round_up(bl
.length(), 10);
1137 return get_attr_errorator::now();
1143 template<typename U
, typename V
>
1144 int do_cmp_xattr(int op
, const U
& lhs
, const V
& rhs
)
1147 case CEPH_OSD_CMPXATTR_OP_EQ
:
1149 case CEPH_OSD_CMPXATTR_OP_NE
:
1151 case CEPH_OSD_CMPXATTR_OP_GT
:
1153 case CEPH_OSD_CMPXATTR_OP_GTE
:
1155 case CEPH_OSD_CMPXATTR_OP_LT
:
1157 case CEPH_OSD_CMPXATTR_OP_LTE
:
1164 } // anonymous namespace
1166 static int do_xattr_cmp_u64(int op
, uint64_t lhs
, bufferlist
& rhs_xattr
)
1170 if (rhs_xattr
.length() > 0) {
1171 const char* first
= rhs_xattr
.c_str();
1172 if (auto [p
, ec
] = std::from_chars(first
, first
+ rhs_xattr
.length(), rhs
);
1173 ec
!= std::errc()) {
1179 logger().debug("do_xattr_cmp_u64 '{}' vs '{}' op {}", lhs
, rhs
, op
);
1180 return do_cmp_xattr(op
, lhs
, rhs
);
1183 PGBackend::cmp_xattr_ierrorator::future
<> PGBackend::cmp_xattr(
1184 const ObjectState
& os
,
1186 object_stat_sum_t
& delta_stats
) const
1188 std::string name
{"_"};
1189 auto bp
= osd_op
.indata
.cbegin();
1190 bp
.copy(osd_op
.op
.xattr
.name_len
, name
);
1192 logger().debug("cmpxattr on obj={} for attr={}", os
.oi
.soid
, name
);
1193 return getxattr(os
.oi
.soid
, std::move(name
)).safe_then_interruptible(
1194 [&delta_stats
, &osd_op
] (auto &&xattr
) -> cmp_xattr_ierrorator::future
<> {
1195 delta_stats
.num_rd
++;
1196 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.op
.xattr
.value_len
, 10);
1199 auto bp
= osd_op
.indata
.cbegin();
1200 bp
+= osd_op
.op
.xattr
.name_len
;
1202 switch (osd_op
.op
.xattr
.cmp_mode
) {
1203 case CEPH_OSD_CMPXATTR_MODE_STRING
:
1206 bp
.copy(osd_op
.op
.xattr
.value_len
, lhs
);
1207 string_view
rhs(xattr
.c_str(), xattr
.length());
1208 result
= do_cmp_xattr(osd_op
.op
.xattr
.cmp_op
, lhs
, rhs
);
1209 logger().debug("cmpxattr lhs={}, rhs={}", lhs
, rhs
);
1212 case CEPH_OSD_CMPXATTR_MODE_U64
:
1217 } catch (ceph::buffer::error
& e
) {
1218 logger().info("cmp_xattr: buffer error expection");
1222 result
= do_xattr_cmp_u64(osd_op
.op
.xattr
.cmp_op
, lhs
, xattr
);
1226 logger().info("bad cmp mode {}", osd_op
.op
.xattr
.cmp_mode
);
1230 logger().info("cmp_xattr: comparison returned false");
1231 return crimson::ct_error::ecanceled::make();
1232 } else if (result
== -EINVAL
) {
1233 return crimson::ct_error::invarg::make();
1236 return cmp_xattr_ierrorator::now();
1238 }).handle_error_interruptible(
1239 crimson::ct_error::enodata::handle([&delta_stats
, &osd_op
] ()
1240 ->cmp_xattr_errorator::future
<> {
1241 delta_stats
.num_rd
++;
1242 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.op
.xattr
.value_len
, 10);
1243 return crimson::ct_error::ecanceled::make();
1245 cmp_xattr_errorator::pass_further
{}
1249 PGBackend::rm_xattr_iertr::future
<>
1250 PGBackend::rm_xattr(
1252 const OSDOp
& osd_op
,
1253 ceph::os::Transaction
& txn
)
1255 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1256 logger().debug("{}: {} DNE", __func__
, os
.oi
.soid
);
1257 return crimson::ct_error::enoent::make();
1259 auto bp
= osd_op
.indata
.cbegin();
1260 string attr_name
{"_"};
1261 bp
.copy(osd_op
.op
.xattr
.name_len
, attr_name
);
1262 txn
.rmattr(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, attr_name
);
1263 return rm_xattr_iertr::now();
1266 void PGBackend::clone(
1267 /* const */object_info_t
& snap_oi
,
1268 const ObjectState
& os
,
1269 const ObjectState
& d_os
,
1270 ceph::os::Transaction
& txn
)
1272 // See OpsExecutor::execute_clone documentation
1273 txn
.clone(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, ghobject_t
{d_os
.oi
.soid
});
1275 ceph::bufferlist bv
;
1276 snap_oi
.encode_no_oid(bv
, CEPH_FEATURES_ALL
);
1277 txn
.setattr(coll
->get_cid(), ghobject_t
{d_os
.oi
.soid
}, OI_ATTR
, bv
);
1279 txn
.rmattr(coll
->get_cid(), ghobject_t
{d_os
.oi
.soid
}, SS_ATTR
);
1282 using get_omap_ertr
=
1283 crimson::os::FuturizedStore::Shard::read_errorator::extend
<
1284 crimson::ct_error::enodata
>;
1285 using get_omap_iertr
=
1286 ::crimson::interruptible::interruptible_errorator
<
1287 ::crimson::osd::IOInterruptCondition
,
1290 get_omap_iertr::future
<
1291 crimson::os::FuturizedStore::Shard::omap_values_t
>
1292 maybe_get_omap_vals_by_keys(
1293 crimson::os::FuturizedStore::Shard
* store
,
1294 const crimson::os::CollectionRef
& coll
,
1295 const object_info_t
& oi
,
1296 const std::set
<std::string
>& keys_to_get
)
1299 return store
->omap_get_values(coll
, ghobject_t
{oi
.soid
}, keys_to_get
);
1301 return crimson::ct_error::enodata::make();
1306 get_omap_iertr::future
<
1307 std::tuple
<bool, crimson::os::FuturizedStore::Shard::omap_values_t
>>
1308 maybe_get_omap_vals(
1309 crimson::os::FuturizedStore::Shard
* store
,
1310 const crimson::os::CollectionRef
& coll
,
1311 const object_info_t
& oi
,
1312 const std::string
& start_after
)
1315 return store
->omap_get_values(coll
, ghobject_t
{oi
.soid
}, start_after
);
1317 return crimson::ct_error::enodata::make();
1321 PGBackend::ll_read_ierrorator::future
<ceph::bufferlist
>
1322 PGBackend::omap_get_header(
1323 const crimson::os::CollectionRef
& c
,
1324 const ghobject_t
& oid
) const
1326 return store
->omap_get_header(c
, oid
)
1328 crimson::ct_error::enodata::handle([] {
1329 return seastar::make_ready_future
<bufferlist
>();
1331 ll_read_errorator::pass_further
{}
1335 PGBackend::ll_read_ierrorator::future
<>
1336 PGBackend::omap_get_header(
1337 const ObjectState
& os
,
1339 object_stat_sum_t
& delta_stats
) const
1341 return omap_get_header(coll
, ghobject_t
{os
.oi
.soid
}).safe_then_interruptible(
1342 [&delta_stats
, &osd_op
] (ceph::bufferlist
&& header
) {
1343 osd_op
.outdata
= std::move(header
);
1344 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.outdata
.length(), 10);
1345 delta_stats
.num_rd
++;
1346 return seastar::now();
1350 PGBackend::ll_read_ierrorator::future
<>
1351 PGBackend::omap_get_keys(
1352 const ObjectState
& os
,
1354 object_stat_sum_t
& delta_stats
) const
1356 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1357 logger().debug("{}: object does not exist: {}", os
.oi
.soid
);
1358 return crimson::ct_error::enoent::make();
1360 std::string start_after
;
1361 uint64_t max_return
;
1363 auto p
= osd_op
.indata
.cbegin();
1364 decode(start_after
, p
);
1365 decode(max_return
, p
);
1366 } catch (buffer::error
&) {
1367 throw crimson::osd::invalid_argument
{};
1370 std::min(max_return
, local_conf()->osd_max_omap_entries_per_request
);
1373 // TODO: truly chunk the reading
1374 return maybe_get_omap_vals(store
, coll
, os
.oi
, start_after
).safe_then_interruptible(
1375 [=,&delta_stats
, &osd_op
](auto ret
) {
1376 ceph::bufferlist result
;
1377 bool truncated
= false;
1379 for (auto &[key
, val
] : std::get
<1>(ret
)) {
1380 if (num
>= max_return
||
1381 result
.length() >= local_conf()->osd_max_omap_bytes_per_request
) {
1385 encode(key
, result
);
1388 encode(num
, osd_op
.outdata
);
1389 osd_op
.outdata
.claim_append(result
);
1390 encode(truncated
, osd_op
.outdata
);
1391 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.outdata
.length(), 10);
1392 delta_stats
.num_rd
++;
1393 return seastar::now();
1394 }).handle_error_interruptible(
1395 crimson::ct_error::enodata::handle([&osd_op
] {
1397 bool truncated
= false;
1398 encode(num
, osd_op
.outdata
);
1399 encode(truncated
, osd_op
.outdata
);
1401 return seastar::now();
1403 ll_read_errorator::pass_further
{}
1407 PGBackend::omap_cmp_ertr::future
<> do_omap_val_cmp(
1408 std::map
<std::string
, bufferlist
, std::less
<>> out
,
1409 std::map
<std::string
, std::pair
<bufferlist
, int>> assertions
)
1412 for (const auto &[akey
, avalue
] : assertions
) {
1413 const auto [abl
, aflag
] = avalue
;
1414 auto out_entry
= out
.find(akey
);
1415 bufferlist
&bl
= (out_entry
!= out
.end()) ? out_entry
->second
: empty
;
1417 case CEPH_OSD_CMPXATTR_OP_EQ
:
1419 return crimson::ct_error::ecanceled::make();
1422 case CEPH_OSD_CMPXATTR_OP_LT
:
1424 return crimson::ct_error::ecanceled::make();
1427 case CEPH_OSD_CMPXATTR_OP_GT
:
1429 return crimson::ct_error::ecanceled::make();
1433 return crimson::ct_error::invarg::make();
1436 return PGBackend::omap_cmp_ertr::now();
1438 PGBackend::omap_cmp_iertr::future
<>
1439 PGBackend::omap_cmp(
1440 const ObjectState
& os
,
1442 object_stat_sum_t
& delta_stats
) const
1444 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1445 logger().debug("{}: object does not exist: {}", os
.oi
.soid
);
1446 return crimson::ct_error::enoent::make();
1449 auto bp
= osd_op
.indata
.cbegin();
1450 std::map
<std::string
, std::pair
<bufferlist
, int> > assertions
;
1452 decode(assertions
, bp
);
1453 } catch (buffer::error
&) {
1454 return crimson::ct_error::invarg::make();
1457 delta_stats
.num_rd
++;
1458 if (os
.oi
.is_omap()) {
1459 std::set
<std::string
> to_get
;
1460 for (auto &i
: assertions
) {
1461 to_get
.insert(i
.first
);
1463 return store
->omap_get_values(coll
, ghobject_t
{os
.oi
.soid
}, to_get
)
1464 .safe_then([=, &osd_op
] (auto&& out
) -> omap_cmp_iertr::future
<> {
1466 return do_omap_val_cmp(out
, assertions
);
1469 return crimson::ct_error::ecanceled::make();
1472 PGBackend::ll_read_ierrorator::future
<>
1473 PGBackend::omap_get_vals(
1474 const ObjectState
& os
,
1476 object_stat_sum_t
& delta_stats
) const
1478 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1479 logger().debug("{}: object does not exist: {}", os
.oi
.soid
);
1480 return crimson::ct_error::enoent::make();
1482 std::string start_after
;
1483 uint64_t max_return
;
1484 std::string filter_prefix
;
1486 auto p
= osd_op
.indata
.cbegin();
1487 decode(start_after
, p
);
1488 decode(max_return
, p
);
1489 decode(filter_prefix
, p
);
1490 } catch (buffer::error
&) {
1491 throw crimson::osd::invalid_argument
{};
1495 std::min(max_return
, local_conf()->osd_max_omap_entries_per_request
);
1496 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.outdata
.length(), 10);
1497 delta_stats
.num_rd
++;
1499 // TODO: truly chunk the reading
1500 return maybe_get_omap_vals(store
, coll
, os
.oi
, start_after
)
1501 .safe_then_interruptible(
1502 [=, &osd_op
] (auto&& ret
) {
1503 auto [done
, vals
] = std::move(ret
);
1505 ceph::bufferlist result
;
1506 bool truncated
= false;
1508 auto iter
= filter_prefix
> start_after
? vals
.lower_bound(filter_prefix
)
1510 for (; iter
!= std::end(vals
); ++iter
) {
1511 const auto& [key
, value
] = *iter
;
1512 if (key
.substr(0, filter_prefix
.size()) != filter_prefix
) {
1514 } else if (num
>= max_return
||
1515 result
.length() >= local_conf()->osd_max_omap_bytes_per_request
) {
1519 encode(key
, result
);
1520 encode(value
, result
);
1523 encode(num
, osd_op
.outdata
);
1524 osd_op
.outdata
.claim_append(result
);
1525 encode(truncated
, osd_op
.outdata
);
1526 return ll_read_errorator::now();
1527 }).handle_error_interruptible(
1528 crimson::ct_error::enodata::handle([&osd_op
] {
1529 encode(uint32_t{0} /* num */, osd_op
.outdata
);
1530 encode(bool{false} /* truncated */, osd_op
.outdata
);
1532 return ll_read_errorator::now();
1534 ll_read_errorator::pass_further
{}
1538 PGBackend::ll_read_ierrorator::future
<>
1539 PGBackend::omap_get_vals_by_keys(
1540 const ObjectState
& os
,
1542 object_stat_sum_t
& delta_stats
) const
1544 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1545 logger().debug("{}: object does not exist: {}", __func__
, os
.oi
.soid
);
1546 return crimson::ct_error::enoent::make();
1549 std::set
<std::string
> keys_to_get
;
1551 auto p
= osd_op
.indata
.cbegin();
1552 decode(keys_to_get
, p
);
1553 } catch (buffer::error
&) {
1554 throw crimson::osd::invalid_argument();
1556 delta_stats
.num_rd_kb
+= shift_round_up(osd_op
.outdata
.length(), 10);
1557 delta_stats
.num_rd
++;
1558 return maybe_get_omap_vals_by_keys(store
, coll
, os
.oi
, keys_to_get
)
1559 .safe_then_interruptible(
1560 [&osd_op
] (crimson::os::FuturizedStore::Shard::omap_values_t
&& vals
) {
1561 encode(vals
, osd_op
.outdata
);
1562 return ll_read_errorator::now();
1563 }).handle_error_interruptible(
1564 crimson::ct_error::enodata::handle([&osd_op
] {
1566 encode(num
, osd_op
.outdata
);
1568 return ll_read_errorator::now();
1570 ll_read_errorator::pass_further
{}
1574 PGBackend::interruptible_future
<>
1575 PGBackend::omap_set_vals(
1577 const OSDOp
& osd_op
,
1578 ceph::os::Transaction
& txn
,
1579 osd_op_params_t
& osd_op_params
,
1580 object_stat_sum_t
& delta_stats
)
1582 maybe_create_new_object(os
, txn
, delta_stats
);
1584 ceph::bufferlist to_set_bl
;
1586 auto p
= osd_op
.indata
.cbegin();
1587 decode_str_str_map_to_bl(p
, &to_set_bl
);
1588 } catch (buffer::error
&) {
1589 throw crimson::osd::invalid_argument
{};
1592 txn
.omap_setkeys(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, to_set_bl
);
1593 osd_op_params
.clean_regions
.mark_omap_dirty();
1594 delta_stats
.num_wr
++;
1595 delta_stats
.num_wr_kb
+= shift_round_up(to_set_bl
.length(), 10);
1596 os
.oi
.set_flag(object_info_t::FLAG_OMAP
);
1597 os
.oi
.clear_omap_digest();
1598 return seastar::now();
1601 PGBackend::interruptible_future
<>
1602 PGBackend::omap_set_header(
1604 const OSDOp
& osd_op
,
1605 ceph::os::Transaction
& txn
,
1606 osd_op_params_t
& osd_op_params
,
1607 object_stat_sum_t
& delta_stats
)
1609 maybe_create_new_object(os
, txn
, delta_stats
);
1610 txn
.omap_setheader(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, osd_op
.indata
);
1611 osd_op_params
.clean_regions
.mark_omap_dirty();
1612 delta_stats
.num_wr
++;
1613 os
.oi
.set_flag(object_info_t::FLAG_OMAP
);
1614 os
.oi
.clear_omap_digest();
1615 return seastar::now();
1618 PGBackend::interruptible_future
<> PGBackend::omap_remove_range(
1620 const OSDOp
& osd_op
,
1621 ceph::os::Transaction
& txn
,
1622 object_stat_sum_t
& delta_stats
)
1624 std::string key_begin
, key_end
;
1626 auto p
= osd_op
.indata
.cbegin();
1627 decode(key_begin
, p
);
1629 } catch (buffer::error
& e
) {
1630 throw crimson::osd::invalid_argument
{};
1632 txn
.omap_rmkeyrange(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, key_begin
, key_end
);
1633 delta_stats
.num_wr
++;
1634 os
.oi
.clear_omap_digest();
1635 return seastar::now();
1638 PGBackend::interruptible_future
<> PGBackend::omap_remove_key(
1640 const OSDOp
& osd_op
,
1641 ceph::os::Transaction
& txn
)
1643 ceph::bufferlist to_rm_bl
;
1645 auto p
= osd_op
.indata
.cbegin();
1646 decode_str_set_to_bl(p
, &to_rm_bl
);
1647 } catch (buffer::error
& e
) {
1648 throw crimson::osd::invalid_argument
{};
1650 txn
.omap_rmkeys(coll
->get_cid(), ghobject_t
{os
.oi
.soid
}, to_rm_bl
);
1652 // ctx->clean_regions.mark_omap_dirty();
1653 // ctx->delta_stats.num_wr++;
1654 os
.oi
.clear_omap_digest();
1655 return seastar::now();
1658 PGBackend::omap_clear_iertr::future
<>
1659 PGBackend::omap_clear(
1662 ceph::os::Transaction
& txn
,
1663 osd_op_params_t
& osd_op_params
,
1664 object_stat_sum_t
& delta_stats
)
1666 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1667 logger().debug("{}: object does not exist: {}", os
.oi
.soid
);
1668 return crimson::ct_error::enoent::make();
1670 if (!os
.oi
.is_omap()) {
1671 return omap_clear_ertr::now();
1673 txn
.omap_clear(coll
->get_cid(), ghobject_t
{os
.oi
.soid
});
1674 osd_op_params
.clean_regions
.mark_omap_dirty();
1675 delta_stats
.num_wr
++;
1676 os
.oi
.clear_omap_digest();
1677 os
.oi
.clear_flag(object_info_t::FLAG_OMAP
);
1678 return omap_clear_ertr::now();
1681 PGBackend::interruptible_future
<struct stat
>
1684 const ghobject_t
& oid
) const
1686 return store
->stat(c
, oid
);
1689 PGBackend::read_errorator::future
<std::map
<uint64_t, uint64_t>>
1692 const ghobject_t
& oid
,
1696 return store
->fiemap(c
, oid
, off
, len
);
1699 PGBackend::write_iertr::future
<> PGBackend::tmapput(
1701 const OSDOp
& osd_op
,
1702 ceph::os::Transaction
& txn
,
1703 object_stat_sum_t
& delta_stats
,
1704 osd_op_params_t
& osd_op_params
)
1706 logger().debug("PGBackend::tmapput: {}", os
.oi
.soid
);
1707 auto ret
= crimson::common::do_tmap_put(osd_op
.indata
.cbegin());
1708 if (!ret
.has_value()) {
1709 logger().debug("PGBackend::tmapup: {}, ret={}", os
.oi
.soid
, ret
.error());
1710 ceph_assert(ret
.error() == -EINVAL
);
1711 return crimson::ct_error::invarg::make();
1713 auto bl
= std::move(ret
.value());
1725 PGBackend::tmapup_iertr::future
<> PGBackend::tmapup(
1727 const OSDOp
& osd_op
,
1728 ceph::os::Transaction
& txn
,
1729 object_stat_sum_t
& delta_stats
,
1730 osd_op_params_t
& osd_op_params
)
1732 logger().debug("PGBackend::tmapup: {}", os
.oi
.soid
);
1733 return PGBackend::write_iertr::now(
1734 ).si_then([this, &os
] {
1735 return _read(os
.oi
.soid
, 0, os
.oi
.size
, 0);
1736 }).handle_error_interruptible(
1737 crimson::ct_error::enoent::handle([](auto &) {
1738 return seastar::make_ready_future
<bufferlist
>();
1740 PGBackend::write_iertr::pass_further
{},
1741 crimson::ct_error::assert_all
{"read error in mutate_object_contents"}
1742 ).si_then([this, &os
, &osd_op
, &txn
,
1743 &delta_stats
, &osd_op_params
]
1744 (auto &&bl
) mutable -> PGBackend::tmapup_iertr::future
<> {
1745 auto result
= crimson::common::do_tmap_up(
1746 osd_op
.indata
.cbegin(),
1748 if (!result
.has_value()) {
1749 int ret
= result
.error();
1750 logger().debug("PGBackend::tmapup: {}, ret={}", os
.oi
.soid
, ret
);
1753 return crimson::ct_error::eexist::make();
1755 return crimson::ct_error::enoent::make();
1757 return crimson::ct_error::invarg::make();
1759 ceph_assert(0 == "impossible error");
1760 return crimson::ct_error::invarg::make();
1765 "PGBackend::tmapup: {}, result.value.length()={}, ret=0",
1766 os
.oi
.soid
, result
.value().length());
1769 result
.value().length(),
1778 PGBackend::read_ierrorator::future
<> PGBackend::tmapget(
1779 const ObjectState
& os
,
1781 object_stat_sum_t
& delta_stats
)
1783 logger().debug("PGBackend::tmapget: {}", os
.oi
.soid
);
1784 const auto& oi
= os
.oi
;
1785 logger().debug("PGBackend::tmapget: read {} 0~{}", oi
.soid
, oi
.size
);
1786 if (!os
.exists
|| os
.oi
.is_whiteout()) {
1787 logger().debug("PGBackend::tmapget: {} DNE", os
.oi
.soid
);
1788 return crimson::ct_error::enoent::make();
1791 return _read(oi
.soid
, 0, oi
.size
, 0).safe_then_interruptible_tuple(
1792 [&delta_stats
, &osd_op
](auto&& bl
) -> read_errorator::future
<> {
1793 logger().debug("PGBackend::tmapget: data length: {}", bl
.length());
1794 osd_op
.op
.extent
.length
= bl
.length();
1796 delta_stats
.num_rd
++;
1797 delta_stats
.num_rd_kb
+= shift_round_up(bl
.length(), 10);
1798 osd_op
.outdata
= std::move(bl
);
1799 return read_errorator::now();
1800 }, crimson::ct_error::input_output_error::handle([] {
1801 return read_errorator::future
<>{crimson::ct_error::object_corrupted::make()};
1803 read_errorator::pass_further
{});