1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "pg_backend.h"
5
6 #include <charconv>
7 #include <optional>
8 #include <boost/range/adaptor/filtered.hpp>
9 #include <boost/range/adaptor/transformed.hpp>
10 #include <boost/range/algorithm/copy.hpp>
11 #include <fmt/format.h>
12 #include <fmt/ostream.h>
13 #include <seastar/core/print.hh>
14
15 #include "messages/MOSDOp.h"
16 #include "os/Transaction.h"
17 #include "common/Checksummer.h"
18 #include "common/Clock.h"
19
20 #include "crimson/common/exception.h"
21 #include "crimson/common/tmap_helpers.h"
22 #include "crimson/os/futurized_collection.h"
23 #include "crimson/os/futurized_store.h"
24 #include "crimson/osd/osd_operation.h"
25 #include "crimson/osd/object_context_loader.h"
26 #include "replicated_backend.h"
27 #include "replicated_recovery_backend.h"
28 #include "ec_backend.h"
29 #include "exceptions.h"
30
31 namespace {
32 seastar::logger& logger() {
33 return crimson::get_logger(ceph_subsys_osd);
34 }
35 }
36
37 using std::runtime_error;
38 using std::string;
39 using std::string_view;
40 using crimson::common::local_conf;
41
42 std::unique_ptr<PGBackend>
43 PGBackend::create(pg_t pgid,
44 const pg_shard_t pg_shard,
45 const pg_pool_t& pool,
46 crimson::os::CollectionRef coll,
47 crimson::osd::ShardServices& shard_services,
48 const ec_profile_t& ec_profile,
49 DoutPrefixProvider &dpp)
50 {
51 switch (pool.type) {
52 case pg_pool_t::TYPE_REPLICATED:
53 return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
54 coll, shard_services,
55 dpp);
56 case pg_pool_t::TYPE_ERASURE:
57 return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
58 std::move(ec_profile),
59 pool.stripe_width,
60 dpp);
61 default:
62 throw runtime_error(seastar::format("unsupported pool type '{}'",
63 pool.type));
64 }
65 }
66
67 PGBackend::PGBackend(shard_id_t shard,
68 CollectionRef coll,
69 crimson::osd::ShardServices &shard_services,
70 DoutPrefixProvider &dpp)
71 : shard{shard},
72 coll{coll},
73 shard_services{shard_services},
74 dpp{dpp},
75 store{&shard_services.get_store()}
76 {}
77
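// Load an object's metadata: the object_info_t stored under OI_ATTR and,
// for head objects, the SnapSet stored under SS_ATTR.  A missing object
// resolves to empty metadata; an object that exists but lacks a usable
// OI or SnapSet attr resolves to ct_error::object_corrupted.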
78 PGBackend::load_metadata_iertr::future
79 <PGBackend::loaded_object_md_t::ref>
80 PGBackend::load_metadata(const hobject_t& oid)
81 {
82 return interruptor::make_interruptible(store->get_attrs(
83 coll,
84 ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
85 [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
86 loaded_object_md_t::ref ret(new loaded_object_md_t());
87 if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
88 bufferlist bl = std::move(oiiter->second);
89 try {
90 ret->os = ObjectState(
91 object_info_t(bl, oid),
92 true);
93 } catch (const buffer::error&) {
94 logger().warn("unable to decode ObjectState");
95 throw crimson::osd::invalid_argument();
96 }
97 } else {
98 logger().error(
99 "load_metadata: object {} present but missing object info",
100 oid);
101 return crimson::ct_error::object_corrupted::make();
102 }
103
104 if (oid.is_head()) {
105 // Return object_corrupted when the object exists but the
106 // SnapSet is either not found or empty.
107 bool object_corrupted = true;
108 if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
109 object_corrupted = false;
110 bufferlist bl = std::move(ssiter->second);
111 if (bl.length()) {
112 ret->ssc = new crimson::osd::SnapSetContext(oid.get_snapdir());
113 try {
114 ret->ssc->snapset = SnapSet(bl);
115 ret->ssc->exists = true;
116 logger().debug(
117 "load_metadata: object {} and snapset {} present",
118 oid, ret->ssc->snapset);
119 } catch (const buffer::error&) {
120 logger().warn("unable to decode SnapSet");
121 throw crimson::osd::invalid_argument();
122 }
123 } else {
124 object_corrupted = true;
125 }
126 }
127 if (object_corrupted) {
128 logger().error(
129 "load_metadata: object {} present but missing snapset",
130 oid);
131 return crimson::ct_error::object_corrupted::make();
132 }
133 }
134
135 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
136 std::move(ret));
137 }, crimson::ct_error::enoent::handle([oid] {
138 logger().debug(
139 "load_metadata: object {} doesn't exist, returning empty metadata",
140 oid);
141 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
142 new loaded_object_md_t{
143 ObjectState(
144 object_info_t(oid),
145 false),
146 oid.is_head() ? (new crimson::osd::SnapSetContext(oid)) : nullptr
147 });
148 }));
149 }
150
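// Stamp the cached ObjectState with the new version, mtime and reqid from
// osd_op_p, persist the updated object_info_t (and, for head objects, the
// SnapSet) as attrs in the transaction, then hand the transaction and log
// entries to _submit_transaction() for replication.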
151 PGBackend::rep_op_fut_t
152 PGBackend::mutate_object(
153 std::set<pg_shard_t> pg_shards,
154 crimson::osd::ObjectContextRef &&obc,
155 ceph::os::Transaction&& txn,
156 osd_op_params_t&& osd_op_p,
157 epoch_t min_epoch,
158 epoch_t map_epoch,
159 std::vector<pg_log_entry_t>&& log_entries)
160 {
161 logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
162 if (obc->obs.exists) {
163 #if 0
164 obc->obs.oi.version = ctx->at_version;
165 obc->obs.oi.prior_version = ctx->obs->oi.version;
166 #endif
167
168 obc->obs.oi.prior_version = obc->obs.oi.version;
169 obc->obs.oi.version = osd_op_p.at_version;
170 if (osd_op_p.user_at_version > obc->obs.oi.user_version)
171 obc->obs.oi.user_version = osd_op_p.user_at_version;
172 obc->obs.oi.last_reqid = osd_op_p.req_id;
173 obc->obs.oi.mtime = osd_op_p.mtime;
174 obc->obs.oi.local_mtime = ceph_clock_now();
175
176 // object_info_t
177 {
178 ceph::bufferlist osv;
179 obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
180 // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
181 txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
182 }
183
184 // snapset
185 if (obc->obs.oi.soid.snap == CEPH_NOSNAP) {
186 logger().debug("final snapset {} in {}",
187 obc->ssc->snapset, obc->obs.oi.soid);
188 ceph::bufferlist bss;
189 encode(obc->ssc->snapset, bss);
190 txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, SS_ATTR, bss);
191 obc->ssc->exists = true;
192 } else {
193 logger().debug("no snapset (this is a clone)");
194 }
195 } else {
196 // reset cached ObjectState without enforcing eviction
197 obc->obs.oi = object_info_t(obc->obs.oi.soid);
198 }
199 return _submit_transaction(
200 std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
201 std::move(osd_op_p), min_epoch, map_epoch, std::move(log_entries));
202 }
203
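// Verify a read against the recorded data digest.  Only whole-object reads
// of objects carrying a digest are checked; a CRC mismatch is logged and
// reported by returning false.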
204 static inline bool _read_verify_data(
205 const object_info_t& oi,
206 const ceph::bufferlist& data)
207 {
208 if (oi.is_data_digest() && oi.size == data.length()) {
209 // whole object? can we verify the checksum?
210 if (auto crc = data.crc32c(-1); crc != oi.data_digest) {
211 logger().error("full-object read crc {} != expected {} on {}",
212 crc, oi.data_digest, oi.soid);
213 // todo: mark soid missing, perform recovery, and retry
214 return false;
215 }
216 }
217 return true;
218 }
219
220 PGBackend::read_ierrorator::future<>
221 PGBackend::read(const ObjectState& os, OSDOp& osd_op,
222 object_stat_sum_t& delta_stats)
223 {
224 const auto& oi = os.oi;
225 const ceph_osd_op& op = osd_op.op;
226 const uint64_t offset = op.extent.offset;
227 uint64_t length = op.extent.length;
228 logger().trace("read: {} {}~{}", oi.soid, offset, length);
229
230 if (!os.exists || os.oi.is_whiteout()) {
231 logger().debug("{}: {} DNE", __func__, os.oi.soid);
232 return crimson::ct_error::enoent::make();
233 }
234 // are we beyond truncate_size?
235 size_t size = oi.size;
236 if ((op.extent.truncate_seq > oi.truncate_seq) &&
237 (op.extent.truncate_size < offset + length) &&
238 (op.extent.truncate_size < size)) {
239 size = op.extent.truncate_size;
240 }
241 if (offset >= size) {
242 // read size was trimmed to zero and it is expected to do nothing,
243 return read_errorator::now();
244 }
245 if (!length) {
246 // read the whole object if length is 0
247 length = size;
248 }
249 return _read(oi.soid, offset, length, op.flags).safe_then_interruptible_tuple(
250 [&delta_stats, &oi, &osd_op](auto&& bl) -> read_errorator::future<> {
251 if (!_read_verify_data(oi, bl)) {
252 // crc mismatches
253 return crimson::ct_error::object_corrupted::make();
254 }
255 logger().debug("read: data length: {}", bl.length());
256 osd_op.op.extent.length = bl.length();
257 osd_op.rval = 0;
258 delta_stats.num_rd++;
259 delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
260 osd_op.outdata = std::move(bl);
261 return read_errorator::now();
262 }, crimson::ct_error::input_output_error::handle([] {
263 return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
264 }),
265 read_errorator::pass_further{});
266 }
267
268 PGBackend::read_ierrorator::future<>
269 PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
270 object_stat_sum_t& delta_stats)
271 {
272 if (!os.exists || os.oi.is_whiteout()) {
273 logger().debug("{}: {} DNE", __func__, os.oi.soid);
274 return crimson::ct_error::enoent::make();
275 }
276
277 const auto& op = osd_op.op;
278 /* clients (particularly cephfs) may send truncate operations out of order
279 * w.r.t. reads. op.extent.truncate_seq and op.extent.truncate_size allow
280 * the OSD to determine whether the client-submitted read needs to be
281 * adjusted to compensate for a truncate the OSD hasn't seen yet.
282 */
283 uint64_t adjusted_size = os.oi.size;
284 const uint64_t offset = op.extent.offset;
285 uint64_t adjusted_length = op.extent.length;
286 if ((os.oi.truncate_seq < op.extent.truncate_seq) &&
287 (op.extent.offset + op.extent.length > op.extent.truncate_size) &&
288 (adjusted_size > op.extent.truncate_size)) {
289 adjusted_size = op.extent.truncate_size;
290 }
291 if (offset > adjusted_size) {
292 adjusted_length = 0;
293 } else if (offset + adjusted_length > adjusted_size) {
294 adjusted_length = adjusted_size - offset;
295 }
296 logger().trace("sparse_read: {} {}~{}",
297 os.oi.soid, op.extent.offset, op.extent.length);
298 return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
299 offset, adjusted_length)).safe_then_interruptible(
300 [&delta_stats, &os, &osd_op, this](auto&& m) {
301 return seastar::do_with(interval_set<uint64_t>{std::move(m)},
302 [&delta_stats, &os, &osd_op, this](auto&& extents) {
303 return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
304 extents, osd_op.op.flags)).safe_then_interruptible_tuple(
305 [&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
306 if (_read_verify_data(os.oi, bl)) {
307 osd_op.op.extent.length = bl.length();
308 // re-encode since it might be modified
309 ceph::encode(extents, osd_op.outdata);
310 encode_destructively(bl, osd_op.outdata);
311 logger().trace("sparse_read got {} bytes from object {}",
312 osd_op.op.extent.length, os.oi.soid);
313 delta_stats.num_rd++;
314 delta_stats.num_rd_kb += shift_round_up(osd_op.op.extent.length, 10);
315 return read_errorator::make_ready_future<>();
316 } else {
317 // crc mismatches
318 return crimson::ct_error::object_corrupted::make();
319 }
320 }, crimson::ct_error::input_output_error::handle([] {
321 return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
322 }),
323 read_errorator::pass_further{});
324 });
325 });
326 }
327
328 namespace {
329
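// Decode the client-provided init value from init_value_bl, checksum `buf`
// in chunk_size-sized chunks with the CSum algorithm, and append the chunk
// count followed by the per-chunk checksums to `result`.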
330 template<class CSum>
331 PGBackend::checksum_errorator::future<>
332 do_checksum(ceph::bufferlist& init_value_bl,
333 size_t chunk_size,
334 const ceph::bufferlist& buf,
335 ceph::bufferlist& result)
336 {
337 typename CSum::init_value_t init_value;
338 auto init_value_p = init_value_bl.cbegin();
339 try {
340 decode(init_value, init_value_p);
341 // chop off the consumed part
342 init_value_bl.splice(0, init_value_p.get_off());
343 } catch (const ceph::buffer::end_of_buffer&) {
344 logger().warn("{}: init value not provided", __func__);
345 return crimson::ct_error::invarg::make();
346 }
347 const uint32_t chunk_count = buf.length() / chunk_size;
348 ceph::bufferptr csum_data{
349 ceph::buffer::create(sizeof(typename CSum::value_t) * chunk_count)};
350 Checksummer::calculate<CSum>(
351 init_value, chunk_size, 0, buf.length(), buf, &csum_data);
352 encode(chunk_count, result);
353 result.append(std::move(csum_data));
354 return PGBackend::checksum_errorator::now();
355 }
356 }
357
358 PGBackend::checksum_ierrorator::future<>
359 PGBackend::checksum(const ObjectState& os, OSDOp& osd_op)
360 {
361 // sanity tests and normalize the arguments
362 auto& checksum = osd_op.op.checksum;
363 if (checksum.offset == 0 && checksum.length == 0) {
364 // zeroed offset+length implies checksum whole object
365 checksum.length = os.oi.size;
366 } else if (checksum.offset >= os.oi.size) {
367 // read size was trimmed to zero, do nothing,
368 // see PGBackend::read()
369 return checksum_errorator::now();
370 }
371 if (checksum.chunk_size > 0) {
372 if (checksum.length == 0) {
373 logger().warn("{}: length required when chunk size provided", __func__);
374 return crimson::ct_error::invarg::make();
375 }
376 if (checksum.length % checksum.chunk_size != 0) {
377 logger().warn("{}: length not aligned to chunk size", __func__);
378 return crimson::ct_error::invarg::make();
379 }
380 } else {
381 checksum.chunk_size = checksum.length;
382 }
383 if (checksum.length == 0) {
384 uint32_t count = 0;
385 encode(count, osd_op.outdata);
386 return checksum_errorator::now();
387 }
388
389 // read the chunk to be checksum'ed
390 return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags)
391 .safe_then_interruptible(
392 [&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> {
393 auto& checksum = osd_op.op.checksum;
394 if (read_bl.length() != checksum.length) {
395 logger().warn("checksum: bytes read {} != {}",
396 read_bl.length(), checksum.length);
397 return crimson::ct_error::invarg::make();
398 }
399 // calculate its checksum and put the result in outdata
400 switch (checksum.type) {
401 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32:
402 return do_checksum<Checksummer::xxhash32>(osd_op.indata,
403 checksum.chunk_size,
404 read_bl,
405 osd_op.outdata);
406 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64:
407 return do_checksum<Checksummer::xxhash64>(osd_op.indata,
408 checksum.chunk_size,
409 read_bl,
410 osd_op.outdata);
411 case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C:
412 return do_checksum<Checksummer::crc32c>(osd_op.indata,
413 checksum.chunk_size,
414 read_bl,
415 osd_op.outdata);
416 default:
417 logger().warn("checksum: unknown crc type ({})",
418 static_cast<uint32_t>(checksum.type));
419 return crimson::ct_error::invarg::make();
420 }
421 });
422 }
423
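// CMPEXT: compare the client-supplied bytes in osd_op.indata against the
// on-disk extent.  The extent is clamped to the object size (adjusted for a
// truncate the OSD has not applied yet), and bytes beyond what was read
// compare as zero.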
424 PGBackend::cmp_ext_ierrorator::future<>
425 PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
426 {
427 const ceph_osd_op& op = osd_op.op;
428 uint64_t obj_size = os.oi.size;
429 if (os.oi.truncate_seq < op.extent.truncate_seq &&
430 op.extent.offset + op.extent.length > op.extent.truncate_size) {
431 obj_size = op.extent.truncate_size;
432 }
433 uint64_t ext_len;
434 if (op.extent.offset >= obj_size) {
435 ext_len = 0;
436 } else if (op.extent.offset + op.extent.length > obj_size) {
437 ext_len = obj_size - op.extent.offset;
438 } else {
439 ext_len = op.extent.length;
440 }
441 auto read_ext = ll_read_ierrorator::make_ready_future<ceph::bufferlist>();
442 if (ext_len == 0) {
443 logger().debug("{}: zero length extent", __func__);
444 } else if (!os.exists || os.oi.is_whiteout()) {
445 logger().debug("{}: {} DNE", __func__, os.oi.soid);
446 } else {
447 read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0);
448 }
449 return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl)
450 -> cmp_ext_errorator::future<> {
451 for (unsigned index = 0; index < osd_op.indata.length(); index++) {
452 char byte_in_op = osd_op.indata[index];
453 char byte_from_disk = (index < read_bl.length() ? read_bl[index] : 0);
454 if (byte_in_op != byte_from_disk) {
455 logger().debug("cmp_ext: mismatch at {}", index);
456 // Unlike other ops, we set osd_op.rval here and return a different
457 // error code via ct_error::cmp_fail.
458 osd_op.rval = -MAX_ERRNO - index;
459 return crimson::ct_error::cmp_fail::make();
460 }
461 }
462 osd_op.rval = 0;
463 return cmp_ext_errorator::make_ready_future<>();
464 });
465 }
466
467 PGBackend::stat_ierrorator::future<>
468 PGBackend::stat(
469 const ObjectState& os,
470 OSDOp& osd_op,
471 object_stat_sum_t& delta_stats)
472 {
473 if (os.exists/* TODO: && !os.is_whiteout() */) {
474 logger().debug("stat os.oi.size={}, os.oi.mtime={}", os.oi.size, os.oi.mtime);
475 encode(os.oi.size, osd_op.outdata);
476 encode(os.oi.mtime, osd_op.outdata);
477 } else {
478 logger().debug("stat object does not exist");
479 return crimson::ct_error::enoent::make();
480 }
481 delta_stats.num_rd++;
482 return stat_errorator::now();
483 }
484
485 PGBackend::write_iertr::future<> PGBackend::_writefull(
486 ObjectState& os,
487 off_t truncate_size,
488 const bufferlist& bl,
489 ceph::os::Transaction& txn,
490 osd_op_params_t& osd_op_params,
491 object_stat_sum_t& delta_stats,
492 unsigned flags)
493 {
494 const bool existing = maybe_create_new_object(os, txn, delta_stats);
495 if (existing && bl.length() < os.oi.size) {
496
497 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, bl.length());
498 truncate_update_size_and_usage(delta_stats, os.oi, truncate_size);
499
500 osd_op_params.clean_regions.mark_data_region_dirty(
501 bl.length(),
502 os.oi.size - bl.length());
503 }
504 if (bl.length()) {
505 txn.write(
506 coll->get_cid(), ghobject_t{os.oi.soid}, 0, bl.length(),
507 bl, flags);
508 update_size_and_usage(
509 delta_stats, os.oi, 0,
510 bl.length(), true);
511 osd_op_params.clean_regions.mark_data_region_dirty(
512 0,
513 std::max((uint64_t)bl.length(), os.oi.size));
514 }
515 return seastar::now();
516 }
517
518 PGBackend::write_iertr::future<> PGBackend::_truncate(
519 ObjectState& os,
520 ceph::os::Transaction& txn,
521 osd_op_params_t& osd_op_params,
522 object_stat_sum_t& delta_stats,
523 size_t offset,
524 size_t truncate_size,
525 uint32_t truncate_seq)
526 {
527 if (truncate_seq) {
528 assert(offset == truncate_size);
529 if (truncate_seq <= os.oi.truncate_seq) {
530 logger().debug("{} truncate seq {} <= current {}, no-op",
531 __func__, truncate_seq, os.oi.truncate_seq);
532 return write_ertr::make_ready_future<>();
533 } else {
534 logger().debug("{} truncate seq {} > current {}, truncating",
535 __func__, truncate_seq, os.oi.truncate_seq);
536 os.oi.truncate_seq = truncate_seq;
537 os.oi.truncate_size = truncate_size;
538 }
539 }
540 maybe_create_new_object(os, txn, delta_stats);
541 if (os.oi.size != offset) {
542 txn.truncate(
543 coll->get_cid(),
544 ghobject_t{os.oi.soid}, offset);
545 if (os.oi.size > offset) {
546 // TODO: modified_ranges.union_of(trim);
547 osd_op_params.clean_regions.mark_data_region_dirty(
548 offset,
549 os.oi.size - offset);
550 } else {
551 // os.oi.size < offset
552 osd_op_params.clean_regions.mark_data_region_dirty(
553 os.oi.size,
554 offset - os.oi.size);
555 }
556 truncate_update_size_and_usage(delta_stats, os.oi, offset);
557 os.oi.clear_data_digest();
558 }
559 delta_stats.num_wr++;
560 return write_ertr::now();
561 }
562
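// Ensure the object exists before a write: touch it into existence if it
// does not, or clear a whiteout if one is set.  Returns true if the object
// already existed, false if it was created here.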
563 bool PGBackend::maybe_create_new_object(
564 ObjectState& os,
565 ceph::os::Transaction& txn,
566 object_stat_sum_t& delta_stats)
567 {
568 if (!os.exists) {
569 ceph_assert(!os.oi.is_whiteout());
570 os.exists = true;
571 os.oi.new_object();
572
573 txn.touch(coll->get_cid(), ghobject_t{os.oi.soid});
574 delta_stats.num_objects++;
575 return false;
576 } else if (os.oi.is_whiteout()) {
577 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
578 delta_stats.num_whiteouts--;
579 }
580 return true;
581 }
582
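// Account a write of `length` bytes at `offset`: update oi.size and the
// num_bytes stat when the write extends the object (or is a write-full),
// and bump the per-op write counters.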
583 void PGBackend::update_size_and_usage(object_stat_sum_t& delta_stats,
584 object_info_t& oi, uint64_t offset,
585 uint64_t length, bool write_full)
586 {
587 if (write_full ||
588 (offset + length > oi.size && length)) {
589 uint64_t new_size = offset + length;
590 delta_stats.num_bytes -= oi.size;
591 delta_stats.num_bytes += new_size;
592 oi.size = new_size;
593 }
594 delta_stats.num_wr++;
595 delta_stats.num_wr_kb += shift_round_up(length, 10);
596 }
597
598 void PGBackend::truncate_update_size_and_usage(object_stat_sum_t& delta_stats,
599 object_info_t& oi,
600 uint64_t truncate_size)
601 {
602 if (oi.size != truncate_size) {
603 delta_stats.num_bytes -= oi.size;
604 delta_stats.num_bytes += truncate_size;
605 oi.size = truncate_size;
606 }
607 }
608
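// Validate that an offset/length pair stays within osd_max_object_size;
// callers map a failure to ct_error::file_too_large (-EFBIG).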
609 static bool is_offset_and_length_valid(
610 const std::uint64_t offset,
611 const std::uint64_t length)
612 {
613 if (const std::uint64_t max = local_conf()->osd_max_object_size;
614 offset >= max || length > max || offset + length > max) {
615 logger().debug("{} osd_max_object_size: {}, offset: {}, len: {}; "
616 "Hard limit of object size is 4GB",
617 __func__, max, offset, length);
618 return false;
619 } else {
620 return true;
621 }
622 }
623
624 PGBackend::interruptible_future<> PGBackend::set_allochint(
625 ObjectState& os,
626 const OSDOp& osd_op,
627 ceph::os::Transaction& txn,
628 object_stat_sum_t& delta_stats)
629 {
630 maybe_create_new_object(os, txn, delta_stats);
631
632 os.oi.expected_object_size = osd_op.op.alloc_hint.expected_object_size;
633 os.oi.expected_write_size = osd_op.op.alloc_hint.expected_write_size;
634 os.oi.alloc_hint_flags = osd_op.op.alloc_hint.flags;
635 txn.set_alloc_hint(coll->get_cid(),
636 ghobject_t{os.oi.soid},
637 os.oi.expected_object_size,
638 os.oi.expected_write_size,
639 os.oi.alloc_hint_flags);
640 return seastar::now();
641 }
642
643 PGBackend::write_iertr::future<> PGBackend::write(
644 ObjectState& os,
645 const OSDOp& osd_op,
646 ceph::os::Transaction& txn,
647 osd_op_params_t& osd_op_params,
648 object_stat_sum_t& delta_stats)
649 {
650 const ceph_osd_op& op = osd_op.op;
651 uint64_t offset = op.extent.offset;
652 uint64_t length = op.extent.length;
653 bufferlist buf = osd_op.indata;
654 if (op.extent.length != osd_op.indata.length()) {
655 return crimson::ct_error::invarg::make();
656 }
657
658 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
659 return crimson::ct_error::file_too_large::make();
660 }
661
662 if (auto seq = os.oi.truncate_seq;
663 seq != 0 && op.extent.truncate_seq < seq) {
664 // old write, arrived after trimtrunc
665 if (offset + length > os.oi.size) {
666 // no-op
667 if (offset > os.oi.size) {
668 length = 0;
669 buf.clear();
670 } else {
671 // truncate
672 auto len = os.oi.size - offset;
673 buf.splice(len, length);
674 length = len;
675 }
676 }
677 } else if (op.extent.truncate_seq > seq) {
678 // write arrives before trimtrunc
679 if (os.exists && !os.oi.is_whiteout()) {
680 txn.truncate(coll->get_cid(),
681 ghobject_t{os.oi.soid}, op.extent.truncate_size);
682 if (op.extent.truncate_size != os.oi.size) {
683 os.oi.size = length;
684 if (op.extent.truncate_size > os.oi.size) {
685 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
686 op.extent.truncate_size - os.oi.size);
687 } else {
688 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.truncate_size,
689 os.oi.size - op.extent.truncate_size);
690 }
691 }
692 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
693 }
694 os.oi.truncate_seq = op.extent.truncate_seq;
695 os.oi.truncate_size = op.extent.truncate_size;
696 }
697 maybe_create_new_object(os, txn, delta_stats);
698 if (length == 0) {
699 if (offset > os.oi.size) {
700 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.offset);
701 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
702 } else {
703 txn.nop();
704 }
705 } else {
706 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
707 offset, length, std::move(buf), op.flags);
708 update_size_and_usage(delta_stats, os.oi, offset, length);
709 }
710 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
711 op.extent.length);
712
713 return seastar::now();
714 }
715
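// WRITESAME: repeat the op.writesame.data_length-byte pattern from
// osd_op.indata across op.writesame.length bytes starting at
// op.writesame.offset.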
716 PGBackend::interruptible_future<> PGBackend::write_same(
717 ObjectState& os,
718 const OSDOp& osd_op,
719 ceph::os::Transaction& txn,
720 osd_op_params_t& osd_op_params,
721 object_stat_sum_t& delta_stats)
722 {
723 const ceph_osd_op& op = osd_op.op;
724 const uint64_t len = op.writesame.length;
725 if (len == 0) {
726 return seastar::now();
727 }
728 if (op.writesame.data_length == 0 ||
729 len % op.writesame.data_length != 0 ||
730 op.writesame.data_length != osd_op.indata.length()) {
731 throw crimson::osd::invalid_argument();
732 }
733 ceph::bufferlist repeated_indata;
734 for (uint64_t size = 0; size < len; size += op.writesame.data_length) {
735 repeated_indata.append(osd_op.indata);
736 }
737 maybe_create_new_object(os, txn, delta_stats);
738 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
739 op.writesame.offset, len,
740 std::move(repeated_indata), op.flags);
741 update_size_and_usage(delta_stats, os.oi, op.writesame.offset, len);
742 osd_op_params.clean_regions.mark_data_region_dirty(op.writesame.offset, len);
743 return seastar::now();
744 }
745
746 PGBackend::write_iertr::future<> PGBackend::writefull(
747 ObjectState& os,
748 const OSDOp& osd_op,
749 ceph::os::Transaction& txn,
750 osd_op_params_t& osd_op_params,
751 object_stat_sum_t& delta_stats)
752 {
753 const ceph_osd_op& op = osd_op.op;
754 if (op.extent.length != osd_op.indata.length()) {
755 return crimson::ct_error::invarg::make();
756 }
757 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
758 return crimson::ct_error::file_too_large::make();
759 }
760
761 return _writefull(
762 os,
763 op.extent.truncate_size,
764 osd_op.indata,
765 txn,
766 osd_op_params,
767 delta_stats,
768 op.flags);
769 }
770
771 PGBackend::rollback_iertr::future<> PGBackend::rollback(
772 ObjectState& os,
773 const OSDOp& osd_op,
774 ceph::os::Transaction& txn,
775 osd_op_params_t& osd_op_params,
776 object_stat_sum_t& delta_stats,
777 crimson::osd::ObjectContextRef head,
778 crimson::osd::ObjectContextLoader& obc_loader)
779 {
780 const ceph_osd_op& op = osd_op.op;
781 snapid_t snapid = (uint64_t)op.snap.snapid;
782 assert(os.oi.soid.is_head());
783 logger().debug("{} deleting {} and rolling back to old snap {}",
784 __func__, os.oi.soid, snapid);
785 hobject_t target_coid = os.oi.soid;
786 target_coid.snap = snapid;
787 return obc_loader.with_clone_obc_only<RWState::RWWRITE>(
788 head, target_coid,
789 [this, &os, &txn, &delta_stats, &osd_op_params]
790 (auto resolved_obc) {
791 if (resolved_obc->obs.oi.soid.is_head()) {
792 // no-op: The resolved oid returned the head object
793 logger().debug("PGBackend::rollback: loaded head_obc: {}"
794 " do nothing",
795 resolved_obc->obs.oi.soid);
796 return rollback_iertr::now();
797 }
798 /* TODO: https://tracker.ceph.com/issues/59114 This implementation will not
799 * behave correctly for a rados operation consisting of a mutation followed
800 * by a rollback to a snapshot since the last mutation of the object.
801 * The correct behavior would be for the rollback to undo the mutation
802 * earlier in the operation by resolving to the clone created at the start
803 * of the operation (see resolve_oid).
804 * Instead, it will select HEAD leaving that mutation intact since the SnapSet won't
805 * yet contain that clone. This behavior exists in classic as well.
806 */
807 logger().debug("PGBackend::rollback: loaded clone_obc: {}",
808 resolved_obc->obs.oi.soid);
809 // 1) Delete current head
810 if (os.exists) {
811 txn.remove(coll->get_cid(), ghobject_t{os.oi.soid,
812 ghobject_t::NO_GEN, shard});
813 }
814 // 2) Clone correct snapshot into head
815 txn.clone(coll->get_cid(), ghobject_t{resolved_obc->obs.oi.soid},
816 ghobject_t{os.oi.soid});
817 // Copy clone obc.os.oi to os.oi
818 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
819 os.oi.copy_user_bits(resolved_obc->obs.oi);
820 delta_stats.num_bytes -= os.oi.size;
821 delta_stats.num_bytes += resolved_obc->obs.oi.size;
822 osd_op_params.clean_regions.mark_data_region_dirty(0,
823 std::max(os.oi.size, resolved_obc->obs.oi.size));
824 osd_op_params.clean_regions.mark_omap_dirty();
825 // TODO: 3) Calculate clone_overlaps by following overlaps
826 // forward from rollback snapshot
827 // https://tracker.ceph.com/issues/58263
828 return rollback_iertr::now();
829 }).safe_then_interruptible([] {
830 logger().debug("PGBackend::rollback succeeded");
831 return rollback_iertr::now();
832 }, // there's no snapshot here, or there's no object.
833 // if there's no snapshot, we delete the object;
834 // otherwise, do nothing.
835 crimson::ct_error::enoent::handle(
836 [this, &os, &snapid, &txn, &delta_stats] {
837 logger().debug("PGBackend::rollback: deleting head on {}"
838 " with snap_id of {}"
839 " because got ENOENT|whiteout on obc lookup",
840 os.oi.soid, snapid);
841 return remove(os, txn, delta_stats, false);
842 }),
843 rollback_ertr::pass_further{},
844 crimson::ct_error::assert_all{"unexpected error in rollback"}
845 );
846 }
847
848 PGBackend::append_ierrorator::future<> PGBackend::append(
849 ObjectState& os,
850 OSDOp& osd_op,
851 ceph::os::Transaction& txn,
852 osd_op_params_t& osd_op_params,
853 object_stat_sum_t& delta_stats)
854 {
855 const ceph_osd_op& op = osd_op.op;
856 if (op.extent.length != osd_op.indata.length()) {
857 return crimson::ct_error::invarg::make();
858 }
859 maybe_create_new_object(os, txn, delta_stats);
860 if (op.extent.length) {
861 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
862 os.oi.size /* offset */, op.extent.length,
863 std::move(osd_op.indata), op.flags);
864 update_size_and_usage(delta_stats, os.oi, os.oi.size,
865 op.extent.length);
866 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
867 op.extent.length);
868 }
869 return seastar::now();
870 }
871
872 PGBackend::write_iertr::future<> PGBackend::truncate(
873 ObjectState& os,
874 const OSDOp& osd_op,
875 ceph::os::Transaction& txn,
876 osd_op_params_t& osd_op_params,
877 object_stat_sum_t& delta_stats)
878 {
879 if (!os.exists || os.oi.is_whiteout()) {
880 logger().debug("{} object dne, truncate is a no-op", __func__);
881 return write_ertr::now();
882 }
883 const ceph_osd_op& op = osd_op.op;
884 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
885 return crimson::ct_error::file_too_large::make();
886 }
887 return _truncate(
888 os, txn, osd_op_params, delta_stats,
889 op.extent.offset, op.extent.truncate_size, op.extent.truncate_seq);
890 }
891
892 PGBackend::write_iertr::future<> PGBackend::zero(
893 ObjectState& os,
894 const OSDOp& osd_op,
895 ceph::os::Transaction& txn,
896 osd_op_params_t& osd_op_params,
897 object_stat_sum_t& delta_stats)
898 {
899 if (!os.exists || os.oi.is_whiteout()) {
900 logger().debug("{} object dne, zero is a no-op", __func__);
901 return write_ertr::now();
902 }
903 const ceph_osd_op& op = osd_op.op;
904 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
905 return crimson::ct_error::file_too_large::make();
906 }
907
908 if (op.extent.offset >= os.oi.size || op.extent.length == 0) {
909 return write_iertr::now(); // noop
910 }
911
912 if (op.extent.offset + op.extent.length >= os.oi.size) {
913 return _truncate(
914 os, txn, osd_op_params, delta_stats,
915 op.extent.offset, op.extent.truncate_size, op.extent.truncate_seq);
916 }
917
918 txn.zero(coll->get_cid(),
919 ghobject_t{os.oi.soid},
920 op.extent.offset,
921 op.extent.length);
922 // TODO: modified_ranges.union_of(zeroed);
923 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
924 op.extent.length);
925 delta_stats.num_wr++;
926 os.oi.clear_data_digest();
927 return write_ertr::now();
928 }
929
930 PGBackend::create_iertr::future<> PGBackend::create(
931 ObjectState& os,
932 const OSDOp& osd_op,
933 ceph::os::Transaction& txn,
934 object_stat_sum_t& delta_stats)
935 {
936 if (os.exists && !os.oi.is_whiteout() &&
937 (osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
938 // this is an exclusive create
939 return crimson::ct_error::eexist::make();
940 }
941
942 if (osd_op.indata.length()) {
943 // handle the legacy `category` argument; it is no longer implemented.
944 try {
945 auto p = osd_op.indata.cbegin();
946 std::string category;
947 decode(category, p);
948 } catch (buffer::error&) {
949 return crimson::ct_error::invarg::make();
950 }
951 }
952 maybe_create_new_object(os, txn, delta_stats);
953 txn.create(coll->get_cid(),
954 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
955 return seastar::now();
956 }
957
958 PGBackend::interruptible_future<>
959 PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn)
960 {
961 // todo: snapset
962 txn.remove(coll->get_cid(),
963 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
964 os.oi.size = 0;
965 os.oi.new_object();
966 os.exists = false;
967 // todo: update watchers
968 if (os.oi.is_whiteout()) {
969 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
970 }
971 return seastar::now();
972 }
973
974 PGBackend::remove_iertr::future<>
975 PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn,
976 object_stat_sum_t& delta_stats, bool whiteout)
977 {
978 if (!os.exists) {
979 logger().debug("{} {} does not exist", __func__, os.oi.soid);
980 return crimson::ct_error::enoent::make();
981 }
982
986 if (whiteout && os.oi.is_whiteout()) {
987 logger().debug("{} whiteout set on {} ",__func__, os.oi.soid);
988 return seastar::now();
989 }
990 txn.remove(coll->get_cid(),
991 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
992 delta_stats.num_bytes -= os.oi.size;
993 os.oi.size = 0;
994 os.oi.new_object();
995
996 // todo: clone_overlap
997 if (whiteout) {
998 logger().debug("{} setting whiteout on {} ",__func__, os.oi.soid);
999 os.oi.set_flag(object_info_t::FLAG_WHITEOUT);
1000 delta_stats.num_whiteouts++;
1001 txn.create(coll->get_cid(),
1002 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
1003 return seastar::now();
1004 }
1005 // todo: update watchers
1006 if (os.oi.is_whiteout()) {
1007 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
1008 delta_stats.num_whiteouts--;
1009 }
1010 delta_stats.num_objects--;
1011 os.exists = false;
1012 return seastar::now();
1013 }
1014
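// List up to `limit` objects starting at `start`, skipping pgmeta, temp and
// generation-bearing ghobjects; returns the matching hobjects together with
// the cursor to continue from.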
1015 PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
1016 PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
1017 {
1018 auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
1019 return interruptor::make_interruptible(store->list_objects(coll,
1020 gstart,
1021 ghobject_t::get_max(),
1022 limit))
1023 .then_interruptible([](auto ret) {
1024 auto& [gobjects, next] = ret;
1025 std::vector<hobject_t> objects;
1026 boost::copy(gobjects |
1027 boost::adaptors::filtered([](const ghobject_t& o) {
1028 if (o.is_pgmeta()) {
1029 return false;
1030 } else if (o.hobj.is_temp()) {
1031 return false;
1032 } else {
1033 return o.is_no_gen();
1034 }
1035 }) |
1036 boost::adaptors::transformed([](const ghobject_t& o) {
1037 return o.hobj;
1038 }),
1039 std::back_inserter(objects));
1040 return seastar::make_ready_future<std::tuple<std::vector<hobject_t>, hobject_t>>(
1041 std::make_tuple(objects, next.hobj));
1042 });
1043 }
1044
1045 PGBackend::setxattr_ierrorator::future<> PGBackend::setxattr(
1046 ObjectState& os,
1047 const OSDOp& osd_op,
1048 ceph::os::Transaction& txn,
1049 object_stat_sum_t& delta_stats)
1050 {
1051 if (local_conf()->osd_max_attr_size > 0 &&
1052 osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
1053 return crimson::ct_error::file_too_large::make();
1054 }
1055
1056 const auto max_name_len = std::min<uint64_t>(
1057 store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
1058 if (osd_op.op.xattr.name_len > max_name_len) {
1059 return crimson::ct_error::enametoolong::make();
1060 }
1061
1062 maybe_create_new_object(os, txn, delta_stats);
1063
1064 std::string name{"_"};
1065 ceph::bufferlist val;
1066 {
1067 auto bp = osd_op.indata.cbegin();
1068 bp.copy(osd_op.op.xattr.name_len, name);
1069 bp.copy(osd_op.op.xattr.value_len, val);
1070 }
1071 logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
1072 txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
1073 delta_stats.num_wr++;
1074 return seastar::now();
1075 }
1076
1077 PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
1078 const ObjectState& os,
1079 OSDOp& osd_op,
1080 object_stat_sum_t& delta_stats) const
1081 {
1082 std::string name;
1083 ceph::bufferlist val;
1084 {
1085 auto bp = osd_op.indata.cbegin();
1086 std::string aname;
1087 bp.copy(osd_op.op.xattr.name_len, aname);
1088 name = "_" + aname;
1089 }
1090 logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
1091 return getxattr(os.oi.soid, std::move(name)).safe_then_interruptible(
1092 [&delta_stats, &osd_op] (ceph::bufferlist&& val) {
1093 osd_op.outdata = std::move(val);
1094 osd_op.op.xattr.value_len = osd_op.outdata.length();
1095 delta_stats.num_rd++;
1096 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1097 return get_attr_errorator::now();
1098 });
1099 }
1100
1101 PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
1102 PGBackend::getxattr(
1103 const hobject_t& soid,
1104 std::string_view key) const
1105 {
1106 return store->get_attr(coll, ghobject_t{soid}, key);
1107 }
1108
1109 PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
1110 PGBackend::getxattr(
1111 const hobject_t& soid,
1112 std::string&& key) const
1113 {
1114 return seastar::do_with(key, [this, &soid](auto &key) {
1115 return store->get_attr(coll, ghobject_t{soid}, key);
1116 });
1117 }
1118
1119 PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
1120 const ObjectState& os,
1121 OSDOp& osd_op,
1122 object_stat_sum_t& delta_stats) const
1123 {
1124 return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
1125 [&delta_stats, &osd_op](auto&& attrs) {
1126 std::vector<std::pair<std::string, bufferlist>> user_xattrs;
1127 for (auto& [key, val] : attrs) {
1128 if (key.size() > 1 && key[0] == '_') {
1129 ceph::bufferlist bl;
1130 bl.append(std::move(val));
1131 user_xattrs.emplace_back(key.substr(1), std::move(bl));
1132 }
1133 }
1134 ceph::encode(user_xattrs, osd_op.outdata);
1135 delta_stats.num_rd++;
1136 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1137 return get_attr_errorator::now();
1138 });
1139 }
1140
1141 namespace {
1142
1143 template<typename U, typename V>
1144 int do_cmp_xattr(int op, const U& lhs, const V& rhs)
1145 {
1146 switch (op) {
1147 case CEPH_OSD_CMPXATTR_OP_EQ:
1148 return lhs == rhs;
1149 case CEPH_OSD_CMPXATTR_OP_NE:
1150 return lhs != rhs;
1151 case CEPH_OSD_CMPXATTR_OP_GT:
1152 return lhs > rhs;
1153 case CEPH_OSD_CMPXATTR_OP_GTE:
1154 return lhs >= rhs;
1155 case CEPH_OSD_CMPXATTR_OP_LT:
1156 return lhs < rhs;
1157 case CEPH_OSD_CMPXATTR_OP_LTE:
1158 return lhs <= rhs;
1159 default:
1160 return -EINVAL;
1161 }
1162 }
1163
1164 } // anonymous namespace
1165
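// Numeric CMPXATTR: parse the stored xattr as a decimal uint64 (an empty
// value compares as 0) and compare it with the client-supplied lhs.
// Returns the comparison result, or -EINVAL if the value does not parse.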
1166 static int do_xattr_cmp_u64(int op, uint64_t lhs, bufferlist& rhs_xattr)
1167 {
1168 uint64_t rhs;
1169
1170 if (rhs_xattr.length() > 0) {
1171 const char* first = rhs_xattr.c_str();
1172 if (auto [p, ec] = std::from_chars(first, first + rhs_xattr.length(), rhs);
1173 ec != std::errc()) {
1174 return -EINVAL;
1175 }
1176 } else {
1177 rhs = 0;
1178 }
1179 logger().debug("do_xattr_cmp_u64 '{}' vs '{}' op {}", lhs, rhs, op);
1180 return do_cmp_xattr(op, lhs, rhs);
1181 }
1182
1183 PGBackend::cmp_xattr_ierrorator::future<> PGBackend::cmp_xattr(
1184 const ObjectState& os,
1185 OSDOp& osd_op,
1186 object_stat_sum_t& delta_stats) const
1187 {
1188 std::string name{"_"};
1189 auto bp = osd_op.indata.cbegin();
1190 bp.copy(osd_op.op.xattr.name_len, name);
1191
1192 logger().debug("cmpxattr on obj={} for attr={}", os.oi.soid, name);
1193 return getxattr(os.oi.soid, std::move(name)).safe_then_interruptible(
1194 [&delta_stats, &osd_op] (auto &&xattr) -> cmp_xattr_ierrorator::future<> {
1195 delta_stats.num_rd++;
1196 delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
1197
1198 int result = 0;
1199 auto bp = osd_op.indata.cbegin();
1200 bp += osd_op.op.xattr.name_len;
1201
1202 switch (osd_op.op.xattr.cmp_mode) {
1203 case CEPH_OSD_CMPXATTR_MODE_STRING:
1204 {
1205 string lhs;
1206 bp.copy(osd_op.op.xattr.value_len, lhs);
1207 string_view rhs(xattr.c_str(), xattr.length());
1208 result = do_cmp_xattr(osd_op.op.xattr.cmp_op, lhs, rhs);
1209 logger().debug("cmpxattr lhs={}, rhs={}", lhs, rhs);
1210 }
1211 break;
1212 case CEPH_OSD_CMPXATTR_MODE_U64:
1213 {
1214 uint64_t lhs;
1215 try {
1216 decode(lhs, bp);
1217 } catch (ceph::buffer::error& e) {
1218 logger().info("cmp_xattr: buffer error exception");
1219 result = -EINVAL;
1220 break;
1221 }
1222 result = do_xattr_cmp_u64(osd_op.op.xattr.cmp_op, lhs, xattr);
1223 }
1224 break;
1225 default:
1226 logger().info("bad cmp mode {}", osd_op.op.xattr.cmp_mode);
1227 result = -EINVAL;
1228 }
1229 if (result == 0) {
1230 logger().info("cmp_xattr: comparison returned false");
1231 return crimson::ct_error::ecanceled::make();
1232 } else if (result == -EINVAL) {
1233 return crimson::ct_error::invarg::make();
1234 } else {
1235 osd_op.rval = 1;
1236 return cmp_xattr_ierrorator::now();
1237 }
1238 }).handle_error_interruptible(
1239 crimson::ct_error::enodata::handle([&delta_stats, &osd_op] ()
1240 ->cmp_xattr_errorator::future<> {
1241 delta_stats.num_rd++;
1242 delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
1243 return crimson::ct_error::ecanceled::make();
1244 }),
1245 cmp_xattr_errorator::pass_further{}
1246 );
1247 }
1248
1249 PGBackend::rm_xattr_iertr::future<>
1250 PGBackend::rm_xattr(
1251 ObjectState& os,
1252 const OSDOp& osd_op,
1253 ceph::os::Transaction& txn)
1254 {
1255 if (!os.exists || os.oi.is_whiteout()) {
1256 logger().debug("{}: {} DNE", __func__, os.oi.soid);
1257 return crimson::ct_error::enoent::make();
1258 }
1259 auto bp = osd_op.indata.cbegin();
1260 string attr_name{"_"};
1261 bp.copy(osd_op.op.xattr.name_len, attr_name);
1262 txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name);
1263 return rm_xattr_iertr::now();
1264 }
1265
1266 void PGBackend::clone(
1267 /* const */object_info_t& snap_oi,
1268 const ObjectState& os,
1269 const ObjectState& d_os,
1270 ceph::os::Transaction& txn)
1271 {
1272 // See OpsExecutor::execute_clone documentation
1273 txn.clone(coll->get_cid(), ghobject_t{os.oi.soid}, ghobject_t{d_os.oi.soid});
1274 {
1275 ceph::bufferlist bv;
1276 snap_oi.encode_no_oid(bv, CEPH_FEATURES_ALL);
1277 txn.setattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, OI_ATTR, bv);
1278 }
1279 txn.rmattr(coll->get_cid(), ghobject_t{d_os.oi.soid}, SS_ATTR);
1280 }
1281
1282 using get_omap_ertr =
1283 crimson::os::FuturizedStore::Shard::read_errorator::extend<
1284 crimson::ct_error::enodata>;
1285 using get_omap_iertr =
1286 ::crimson::interruptible::interruptible_errorator<
1287 ::crimson::osd::IOInterruptCondition,
1288 get_omap_ertr>;
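// Omap read helpers: fetch the requested values when the object actually
// has an omap, otherwise fail with enodata so callers can encode an empty
// result instead of hitting the store.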
1289 static
1290 get_omap_iertr::future<
1291 crimson::os::FuturizedStore::Shard::omap_values_t>
1292 maybe_get_omap_vals_by_keys(
1293 crimson::os::FuturizedStore::Shard* store,
1294 const crimson::os::CollectionRef& coll,
1295 const object_info_t& oi,
1296 const std::set<std::string>& keys_to_get)
1297 {
1298 if (oi.is_omap()) {
1299 return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
1300 } else {
1301 return crimson::ct_error::enodata::make();
1302 }
1303 }
1304
1305 static
1306 get_omap_iertr::future<
1307 std::tuple<bool, crimson::os::FuturizedStore::Shard::omap_values_t>>
1308 maybe_get_omap_vals(
1309 crimson::os::FuturizedStore::Shard* store,
1310 const crimson::os::CollectionRef& coll,
1311 const object_info_t& oi,
1312 const std::string& start_after)
1313 {
1314 if (oi.is_omap()) {
1315 return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after);
1316 } else {
1317 return crimson::ct_error::enodata::make();
1318 }
1319 }
1320
1321 PGBackend::ll_read_ierrorator::future<ceph::bufferlist>
1322 PGBackend::omap_get_header(
1323 const crimson::os::CollectionRef& c,
1324 const ghobject_t& oid) const
1325 {
1326 return store->omap_get_header(c, oid)
1327 .handle_error(
1328 crimson::ct_error::enodata::handle([] {
1329 return seastar::make_ready_future<bufferlist>();
1330 }),
1331 ll_read_errorator::pass_further{}
1332 );
1333 }
1334
1335 PGBackend::ll_read_ierrorator::future<>
1336 PGBackend::omap_get_header(
1337 const ObjectState& os,
1338 OSDOp& osd_op,
1339 object_stat_sum_t& delta_stats) const
1340 {
1341 return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then_interruptible(
1342 [&delta_stats, &osd_op] (ceph::bufferlist&& header) {
1343 osd_op.outdata = std::move(header);
1344 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1345 delta_stats.num_rd++;
1346 return seastar::now();
1347 });
1348 }
1349
1350 PGBackend::ll_read_ierrorator::future<>
1351 PGBackend::omap_get_keys(
1352 const ObjectState& os,
1353 OSDOp& osd_op,
1354 object_stat_sum_t& delta_stats) const
1355 {
1356 if (!os.exists || os.oi.is_whiteout()) {
1357 logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
1358 return crimson::ct_error::enoent::make();
1359 }
1360 std::string start_after;
1361 uint64_t max_return;
1362 try {
1363 auto p = osd_op.indata.cbegin();
1364 decode(start_after, p);
1365 decode(max_return, p);
1366 } catch (buffer::error&) {
1367 throw crimson::osd::invalid_argument{};
1368 }
1369 max_return =
1370 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
1371
1372
1373 // TODO: truly chunk the reading
1374 return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
1375 [=,&delta_stats, &osd_op](auto ret) {
1376 ceph::bufferlist result;
1377 bool truncated = false;
1378 uint32_t num = 0;
1379 for (auto &[key, val] : std::get<1>(ret)) {
1380 if (num >= max_return ||
1381 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
1382 truncated = true;
1383 break;
1384 }
1385 encode(key, result);
1386 ++num;
1387 }
1388 encode(num, osd_op.outdata);
1389 osd_op.outdata.claim_append(result);
1390 encode(truncated, osd_op.outdata);
1391 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1392 delta_stats.num_rd++;
1393 return seastar::now();
1394 }).handle_error_interruptible(
1395 crimson::ct_error::enodata::handle([&osd_op] {
1396 uint32_t num = 0;
1397 bool truncated = false;
1398 encode(num, osd_op.outdata);
1399 encode(truncated, osd_op.outdata);
1400 osd_op.rval = 0;
1401 return seastar::now();
1402 }),
1403 ll_read_errorator::pass_further{}
1404 );
1405 }
1406 static
1407 PGBackend::omap_cmp_ertr::future<> do_omap_val_cmp(
1408 std::map<std::string, bufferlist, std::less<>> out,
1409 std::map<std::string, std::pair<bufferlist, int>> assertions)
1410 {
1411 bufferlist empty;
1412 for (const auto &[akey, avalue] : assertions) {
1413 const auto [abl, aflag] = avalue;
1414 auto out_entry = out.find(akey);
1415 bufferlist &bl = (out_entry != out.end()) ? out_entry->second : empty;
1416 switch (aflag) {
1417 case CEPH_OSD_CMPXATTR_OP_EQ:
1418 if (!(bl == abl)) {
1419 return crimson::ct_error::ecanceled::make();
1420 }
1421 break;
1422 case CEPH_OSD_CMPXATTR_OP_LT:
1423 if (!(bl < abl)) {
1424 return crimson::ct_error::ecanceled::make();
1425 }
1426 break;
1427 case CEPH_OSD_CMPXATTR_OP_GT:
1428 if (!(bl > abl)) {
1429 return crimson::ct_error::ecanceled::make();
1430 }
1431 break;
1432 default:
1433 return crimson::ct_error::invarg::make();
1434 }
1435 }
1436 return PGBackend::omap_cmp_ertr::now();
1437 }
1438 PGBackend::omap_cmp_iertr::future<>
1439 PGBackend::omap_cmp(
1440 const ObjectState& os,
1441 OSDOp& osd_op,
1442 object_stat_sum_t& delta_stats) const
1443 {
1444 if (!os.exists || os.oi.is_whiteout()) {
1445 logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
1446 return crimson::ct_error::enoent::make();
1447 }
1448
1449 auto bp = osd_op.indata.cbegin();
1450 std::map<std::string, std::pair<bufferlist, int> > assertions;
1451 try {
1452 decode(assertions, bp);
1453 } catch (buffer::error&) {
1454 return crimson::ct_error::invarg::make();
1455 }
1456
1457 delta_stats.num_rd++;
1458 if (os.oi.is_omap()) {
1459 std::set<std::string> to_get;
1460 for (auto &i: assertions) {
1461 to_get.insert(i.first);
1462 }
1463 return store->omap_get_values(coll, ghobject_t{os.oi.soid}, to_get)
1464 .safe_then([=, &osd_op] (auto&& out) -> omap_cmp_iertr::future<> {
1465 osd_op.rval = 0;
1466 return do_omap_val_cmp(out, assertions);
1467 });
1468 } else {
1469 return crimson::ct_error::ecanceled::make();
1470 }
1471 }
1472 PGBackend::ll_read_ierrorator::future<>
1473 PGBackend::omap_get_vals(
1474 const ObjectState& os,
1475 OSDOp& osd_op,
1476 object_stat_sum_t& delta_stats) const
1477 {
1478 if (!os.exists || os.oi.is_whiteout()) {
1479 logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
1480 return crimson::ct_error::enoent::make();
1481 }
1482 std::string start_after;
1483 uint64_t max_return;
1484 std::string filter_prefix;
1485 try {
1486 auto p = osd_op.indata.cbegin();
1487 decode(start_after, p);
1488 decode(max_return, p);
1489 decode(filter_prefix, p);
1490 } catch (buffer::error&) {
1491 throw crimson::osd::invalid_argument{};
1492 }
1493
1494 max_return = \
1495 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
1496 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1497 delta_stats.num_rd++;
1498
1499 // TODO: truly chunk the reading
1500 return maybe_get_omap_vals(store, coll, os.oi, start_after)
1501 .safe_then_interruptible(
1502 [=, &osd_op] (auto&& ret) {
1503 auto [done, vals] = std::move(ret);
1504 assert(done);
1505 ceph::bufferlist result;
1506 bool truncated = false;
1507 uint32_t num = 0;
1508 auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix)
1509 : std::begin(vals);
1510 for (; iter != std::end(vals); ++iter) {
1511 const auto& [key, value] = *iter;
1512 if (key.substr(0, filter_prefix.size()) != filter_prefix) {
1513 break;
1514 } else if (num >= max_return ||
1515 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
1516 truncated = true;
1517 break;
1518 }
1519 encode(key, result);
1520 encode(value, result);
1521 ++num;
1522 }
1523 encode(num, osd_op.outdata);
1524 osd_op.outdata.claim_append(result);
1525 encode(truncated, osd_op.outdata);
1526 return ll_read_errorator::now();
1527 }).handle_error_interruptible(
1528 crimson::ct_error::enodata::handle([&osd_op] {
1529 encode(uint32_t{0} /* num */, osd_op.outdata);
1530 encode(bool{false} /* truncated */, osd_op.outdata);
1531 osd_op.rval = 0;
1532 return ll_read_errorator::now();
1533 }),
1534 ll_read_errorator::pass_further{}
1535 );
1536 }
1537
1538 PGBackend::ll_read_ierrorator::future<>
1539 PGBackend::omap_get_vals_by_keys(
1540 const ObjectState& os,
1541 OSDOp& osd_op,
1542 object_stat_sum_t& delta_stats) const
1543 {
1544 if (!os.exists || os.oi.is_whiteout()) {
1545 logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
1546 return crimson::ct_error::enoent::make();
1547 }
1548
1549 std::set<std::string> keys_to_get;
1550 try {
1551 auto p = osd_op.indata.cbegin();
1552 decode(keys_to_get, p);
1553 } catch (buffer::error&) {
1554 throw crimson::osd::invalid_argument();
1555 }
1556 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1557 delta_stats.num_rd++;
1558 return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
1559 .safe_then_interruptible(
1560 [&osd_op] (crimson::os::FuturizedStore::Shard::omap_values_t&& vals) {
1561 encode(vals, osd_op.outdata);
1562 return ll_read_errorator::now();
1563 }).handle_error_interruptible(
1564 crimson::ct_error::enodata::handle([&osd_op] {
1565 uint32_t num = 0;
1566 encode(num, osd_op.outdata);
1567 osd_op.rval = 0;
1568 return ll_read_errorator::now();
1569 }),
1570 ll_read_errorator::pass_further{}
1571 );
1572 }
1573
1574 PGBackend::interruptible_future<>
1575 PGBackend::omap_set_vals(
1576 ObjectState& os,
1577 const OSDOp& osd_op,
1578 ceph::os::Transaction& txn,
1579 osd_op_params_t& osd_op_params,
1580 object_stat_sum_t& delta_stats)
1581 {
1582 maybe_create_new_object(os, txn, delta_stats);
1583
1584 ceph::bufferlist to_set_bl;
1585 try {
1586 auto p = osd_op.indata.cbegin();
1587 decode_str_str_map_to_bl(p, &to_set_bl);
1588 } catch (buffer::error&) {
1589 throw crimson::osd::invalid_argument{};
1590 }
1591
1592 txn.omap_setkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_set_bl);
1593 osd_op_params.clean_regions.mark_omap_dirty();
1594 delta_stats.num_wr++;
1595 delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
1596 os.oi.set_flag(object_info_t::FLAG_OMAP);
1597 os.oi.clear_omap_digest();
1598 return seastar::now();
1599 }
1600
1601 PGBackend::interruptible_future<>
1602 PGBackend::omap_set_header(
1603 ObjectState& os,
1604 const OSDOp& osd_op,
1605 ceph::os::Transaction& txn,
1606 osd_op_params_t& osd_op_params,
1607 object_stat_sum_t& delta_stats)
1608 {
1609 maybe_create_new_object(os, txn, delta_stats);
1610 txn.omap_setheader(coll->get_cid(), ghobject_t{os.oi.soid}, osd_op.indata);
1611 osd_op_params.clean_regions.mark_omap_dirty();
1612 delta_stats.num_wr++;
1613 os.oi.set_flag(object_info_t::FLAG_OMAP);
1614 os.oi.clear_omap_digest();
1615 return seastar::now();
1616 }
1617
1618 PGBackend::interruptible_future<> PGBackend::omap_remove_range(
1619 ObjectState& os,
1620 const OSDOp& osd_op,
1621 ceph::os::Transaction& txn,
1622 object_stat_sum_t& delta_stats)
1623 {
1624 std::string key_begin, key_end;
1625 try {
1626 auto p = osd_op.indata.cbegin();
1627 decode(key_begin, p);
1628 decode(key_end, p);
1629 } catch (buffer::error& e) {
1630 throw crimson::osd::invalid_argument{};
1631 }
1632 txn.omap_rmkeyrange(coll->get_cid(), ghobject_t{os.oi.soid}, key_begin, key_end);
1633 delta_stats.num_wr++;
1634 os.oi.clear_omap_digest();
1635 return seastar::now();
1636 }
1637
1638 PGBackend::interruptible_future<> PGBackend::omap_remove_key(
1639 ObjectState& os,
1640 const OSDOp& osd_op,
1641 ceph::os::Transaction& txn)
1642 {
1643 ceph::bufferlist to_rm_bl;
1644 try {
1645 auto p = osd_op.indata.cbegin();
1646 decode_str_set_to_bl(p, &to_rm_bl);
1647 } catch (buffer::error& e) {
1648 throw crimson::osd::invalid_argument{};
1649 }
1650 txn.omap_rmkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_rm_bl);
1651 // TODO:
1652 // ctx->clean_regions.mark_omap_dirty();
1653 // ctx->delta_stats.num_wr++;
1654 os.oi.clear_omap_digest();
1655 return seastar::now();
1656 }
1657
1658 PGBackend::omap_clear_iertr::future<>
1659 PGBackend::omap_clear(
1660 ObjectState& os,
1661 OSDOp& osd_op,
1662 ceph::os::Transaction& txn,
1663 osd_op_params_t& osd_op_params,
1664 object_stat_sum_t& delta_stats)
1665 {
1666 if (!os.exists || os.oi.is_whiteout()) {
1667 logger().debug("{}: object does not exist: {}", __func__, os.oi.soid);
1668 return crimson::ct_error::enoent::make();
1669 }
1670 if (!os.oi.is_omap()) {
1671 return omap_clear_ertr::now();
1672 }
1673 txn.omap_clear(coll->get_cid(), ghobject_t{os.oi.soid});
1674 osd_op_params.clean_regions.mark_omap_dirty();
1675 delta_stats.num_wr++;
1676 os.oi.clear_omap_digest();
1677 os.oi.clear_flag(object_info_t::FLAG_OMAP);
1678 return omap_clear_ertr::now();
1679 }
1680
1681 PGBackend::interruptible_future<struct stat>
1682 PGBackend::stat(
1683 CollectionRef c,
1684 const ghobject_t& oid) const
1685 {
1686 return store->stat(c, oid);
1687 }
1688
1689 PGBackend::read_errorator::future<std::map<uint64_t, uint64_t>>
1690 PGBackend::fiemap(
1691 CollectionRef c,
1692 const ghobject_t& oid,
1693 uint64_t off,
1694 uint64_t len)
1695 {
1696 return store->fiemap(c, oid, off, len);
1697 }
1698
1699 PGBackend::write_iertr::future<> PGBackend::tmapput(
1700 ObjectState& os,
1701 const OSDOp& osd_op,
1702 ceph::os::Transaction& txn,
1703 object_stat_sum_t& delta_stats,
1704 osd_op_params_t& osd_op_params)
1705 {
1706 logger().debug("PGBackend::tmapput: {}", os.oi.soid);
1707 auto ret = crimson::common::do_tmap_put(osd_op.indata.cbegin());
1708 if (!ret.has_value()) {
1709 logger().debug("PGBackend::tmapput: {}, ret={}", os.oi.soid, ret.error());
1710 ceph_assert(ret.error() == -EINVAL);
1711 return crimson::ct_error::invarg::make();
1712 } else {
1713 auto bl = std::move(ret.value());
1714 return _writefull(
1715 os,
1716 bl.length(),
1717 std::move(bl),
1718 txn,
1719 osd_op_params,
1720 delta_stats,
1721 0);
1722 }
1723 }
1724
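// TMAPUP: read the current object contents, apply the TMAP update ops from
// osd_op.indata via do_tmap_up(), and write the result back as a full-object
// write.  A missing object is treated as empty input.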
1725 PGBackend::tmapup_iertr::future<> PGBackend::tmapup(
1726 ObjectState& os,
1727 const OSDOp& osd_op,
1728 ceph::os::Transaction& txn,
1729 object_stat_sum_t& delta_stats,
1730 osd_op_params_t& osd_op_params)
1731 {
1732 logger().debug("PGBackend::tmapup: {}", os.oi.soid);
1733 return PGBackend::write_iertr::now(
1734 ).si_then([this, &os] {
1735 return _read(os.oi.soid, 0, os.oi.size, 0);
1736 }).handle_error_interruptible(
1737 crimson::ct_error::enoent::handle([](auto &) {
1738 return seastar::make_ready_future<bufferlist>();
1739 }),
1740 PGBackend::write_iertr::pass_further{},
1741 crimson::ct_error::assert_all{"read error in mutate_object_contents"}
1742 ).si_then([this, &os, &osd_op, &txn,
1743 &delta_stats, &osd_op_params]
1744 (auto &&bl) mutable -> PGBackend::tmapup_iertr::future<> {
1745 auto result = crimson::common::do_tmap_up(
1746 osd_op.indata.cbegin(),
1747 std::move(bl));
1748 if (!result.has_value()) {
1749 int ret = result.error();
1750 logger().debug("PGBackend::tmapup: {}, ret={}", os.oi.soid, ret);
1751 switch (ret) {
1752 case -EEXIST:
1753 return crimson::ct_error::eexist::make();
1754 case -ENOENT:
1755 return crimson::ct_error::enoent::make();
1756 case -EINVAL:
1757 return crimson::ct_error::invarg::make();
1758 default:
1759 ceph_assert(0 == "impossible error");
1760 return crimson::ct_error::invarg::make();
1761 }
1762 }
1763
1764 logger().debug(
1765 "PGBackend::tmapup: {}, result.value.length()={}, ret=0",
1766 os.oi.soid, result.value().length());
1767 return _writefull(
1768 os,
1769 result.value().length(),
1770 result.value(),
1771 txn,
1772 osd_op_params,
1773 delta_stats,
1774 0);
1775 });
1776 }
1777
1778 PGBackend::read_ierrorator::future<> PGBackend::tmapget(
1779 const ObjectState& os,
1780 OSDOp& osd_op,
1781 object_stat_sum_t& delta_stats)
1782 {
1783 logger().debug("PGBackend::tmapget: {}", os.oi.soid);
1784 const auto& oi = os.oi;
1785 logger().debug("PGBackend::tmapget: read {} 0~{}", oi.soid, oi.size);
1786 if (!os.exists || os.oi.is_whiteout()) {
1787 logger().debug("PGBackend::tmapget: {} DNE", os.oi.soid);
1788 return crimson::ct_error::enoent::make();
1789 }
1790
1791 return _read(oi.soid, 0, oi.size, 0).safe_then_interruptible_tuple(
1792 [&delta_stats, &osd_op](auto&& bl) -> read_errorator::future<> {
1793 logger().debug("PGBackend::tmapget: data length: {}", bl.length());
1794 osd_op.op.extent.length = bl.length();
1795 osd_op.rval = 0;
1796 delta_stats.num_rd++;
1797 delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
1798 osd_op.outdata = std::move(bl);
1799 return read_errorator::now();
1800 }, crimson::ct_error::input_output_error::handle([] {
1801 return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
1802 }),
1803 read_errorator::pass_further{});
1804 }
1805