]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/pg_backend.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / pg_backend.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "pg_backend.h"
5
20effc67 6#include <charconv>
9f95a23c
TL
7#include <optional>
8#include <boost/range/adaptor/filtered.hpp>
9#include <boost/range/adaptor/transformed.hpp>
10#include <boost/range/algorithm/copy.hpp>
11#include <fmt/format.h>
12#include <fmt/ostream.h>
13#include <seastar/core/print.hh>
14
15#include "messages/MOSDOp.h"
16#include "os/Transaction.h"
f67539c2 17#include "common/Checksummer.h"
9f95a23c
TL
18#include "common/Clock.h"
19
f67539c2 20#include "crimson/common/exception.h"
9f95a23c
TL
21#include "crimson/os/futurized_collection.h"
22#include "crimson/os/futurized_store.h"
23#include "crimson/osd/osd_operation.h"
24#include "replicated_backend.h"
f67539c2 25#include "replicated_recovery_backend.h"
9f95a23c
TL
26#include "ec_backend.h"
27#include "exceptions.h"
28
29namespace {
30 seastar::logger& logger() {
31 return crimson::get_logger(ceph_subsys_osd);
32 }
33}
34
20effc67
TL
35using std::runtime_error;
36using std::string;
37using std::string_view;
9f95a23c
TL
38using crimson::common::local_conf;
39
40std::unique_ptr<PGBackend>
41PGBackend::create(pg_t pgid,
42 const pg_shard_t pg_shard,
43 const pg_pool_t& pool,
44 crimson::os::CollectionRef coll,
45 crimson::osd::ShardServices& shard_services,
46 const ec_profile_t& ec_profile)
47{
48 switch (pool.type) {
49 case pg_pool_t::TYPE_REPLICATED:
50 return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
51 coll, shard_services);
52 case pg_pool_t::TYPE_ERASURE:
53 return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
54 std::move(ec_profile),
55 pool.stripe_width);
56 default:
57 throw runtime_error(seastar::format("unsupported pool type '{}'",
58 pool.type));
59 }
60}
61
62PGBackend::PGBackend(shard_id_t shard,
63 CollectionRef coll,
64 crimson::os::FuturizedStore* store)
65 : shard{shard},
66 coll{coll},
67 store{store}
68{}
69
20effc67
TL
70PGBackend::load_metadata_iertr::future
71 <PGBackend::loaded_object_md_t::ref>
9f95a23c
TL
72PGBackend::load_metadata(const hobject_t& oid)
73{
f67539c2
TL
74 if (__builtin_expect(stopping, false)) {
75 throw crimson::common::system_shutdown_exception();
76 }
77
20effc67 78 return interruptor::make_interruptible(store->get_attrs(
9f95a23c 79 coll,
20effc67 80 ghobject_t{oid, ghobject_t::NO_GEN, shard})).safe_then_interruptible(
9f95a23c
TL
81 [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
82 loaded_object_md_t::ref ret(new loaded_object_md_t());
83 if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
20effc67 84 bufferlist bl = std::move(oiiter->second);
9f95a23c 85 ret->os = ObjectState(
20effc67 86 object_info_t(bl, oid),
9f95a23c
TL
87 true);
88 } else {
89 logger().error(
90 "load_metadata: object {} present but missing object info",
91 oid);
92 return crimson::ct_error::object_corrupted::make();
93 }
94
95 if (oid.is_head()) {
96 if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
20effc67 97 bufferlist bl = std::move(ssiter->second);
9f95a23c
TL
98 ret->ss = SnapSet(bl);
99 } else {
100 /* TODO: add support for writing out snapsets
101 logger().error(
102 "load_metadata: object {} present but missing snapset",
103 oid);
104 //return crimson::ct_error::object_corrupted::make();
105 */
106 ret->ss = SnapSet();
107 }
108 }
109
110 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
111 std::move(ret));
112 }, crimson::ct_error::enoent::handle([oid] {
113 logger().debug(
114 "load_metadata: object {} doesn't exist, returning empty metadata",
115 oid);
116 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
117 new loaded_object_md_t{
118 ObjectState(
119 object_info_t(oid),
120 false),
121 oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
122 });
123 }));
124}
125
20effc67 126PGBackend::rep_op_fut_t
9f95a23c
TL
127PGBackend::mutate_object(
128 std::set<pg_shard_t> pg_shards,
129 crimson::osd::ObjectContextRef &&obc,
130 ceph::os::Transaction&& txn,
20effc67 131 osd_op_params_t&& osd_op_p,
9f95a23c
TL
132 epoch_t min_epoch,
133 epoch_t map_epoch,
f67539c2 134 std::vector<pg_log_entry_t>&& log_entries)
9f95a23c
TL
135{
136 logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
137 if (obc->obs.exists) {
138#if 0
139 obc->obs.oi.version = ctx->at_version;
140 obc->obs.oi.prior_version = ctx->obs->oi.version;
141#endif
142
f67539c2
TL
143 obc->obs.oi.prior_version = obc->obs.oi.version;
144 obc->obs.oi.version = osd_op_p.at_version;
145 if (osd_op_p.user_at_version > obc->obs.oi.user_version)
146 obc->obs.oi.user_version = osd_op_p.user_at_version;
20effc67
TL
147 obc->obs.oi.last_reqid = osd_op_p.req_id;
148 obc->obs.oi.mtime = osd_op_p.mtime;
9f95a23c
TL
149 obc->obs.oi.local_mtime = ceph_clock_now();
150
151 // object_info_t
152 {
153 ceph::bufferlist osv;
20effc67 154 obc->obs.oi.encode_no_oid(osv, CEPH_FEATURES_ALL);
9f95a23c
TL
155 // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
156 txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
157 }
158 } else {
159 // reset cached ObjectState without enforcing eviction
160 obc->obs.oi = object_info_t(obc->obs.oi.soid);
161 }
162 return _submit_transaction(
163 std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
f67539c2 164 std::move(osd_op_p), min_epoch, map_epoch, std::move(log_entries));
9f95a23c
TL
165}
166
167static inline bool _read_verify_data(
168 const object_info_t& oi,
169 const ceph::bufferlist& data)
170{
171 if (oi.is_data_digest() && oi.size == data.length()) {
172 // whole object? can we verify the checksum?
173 if (auto crc = data.crc32c(-1); crc != oi.data_digest) {
174 logger().error("full-object read crc {} != expected {} on {}",
175 crc, oi.data_digest, oi.soid);
176 // todo: mark soid missing, perform recovery, and retry
177 return false;
178 }
179 }
180 return true;
181}
182
20effc67
TL
183PGBackend::read_ierrorator::future<>
184PGBackend::read(const ObjectState& os, OSDOp& osd_op,
185 object_stat_sum_t& delta_stats)
9f95a23c 186{
f67539c2
TL
187 const auto& oi = os.oi;
188 const ceph_osd_op& op = osd_op.op;
189 const uint64_t offset = op.extent.offset;
190 uint64_t length = op.extent.length;
9f95a23c 191 logger().trace("read: {} {}~{}", oi.soid, offset, length);
f67539c2
TL
192
193 if (!os.exists || os.oi.is_whiteout()) {
194 logger().debug("{}: {} DNE", __func__, os.oi.soid);
195 return crimson::ct_error::enoent::make();
196 }
9f95a23c
TL
197 // are we beyond truncate_size?
198 size_t size = oi.size;
f67539c2
TL
199 if ((op.extent.truncate_seq > oi.truncate_seq) &&
200 (op.extent.truncate_size < offset + length) &&
201 (op.extent.truncate_size < size)) {
202 size = op.extent.truncate_size;
203 }
204 if (offset >= size) {
205 // read size was trimmed to zero and it is expected to do nothing,
206 return read_errorator::now();
9f95a23c
TL
207 }
208 if (!length) {
209 // read the whole object if length is 0
210 length = size;
211 }
20effc67
TL
212 return _read(oi.soid, offset, length, op.flags).safe_then_interruptible_tuple(
213 [&delta_stats, &oi, &osd_op](auto&& bl) -> read_errorator::future<> {
f67539c2 214 if (!_read_verify_data(oi, bl)) {
20effc67 215 // crc mismatches
f67539c2
TL
216 return crimson::ct_error::object_corrupted::make();
217 }
218 logger().debug("read: data length: {}", bl.length());
219 osd_op.rval = bl.length();
20effc67
TL
220 delta_stats.num_rd++;
221 delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
f67539c2
TL
222 osd_op.outdata = std::move(bl);
223 return read_errorator::now();
20effc67
TL
224 }, crimson::ct_error::input_output_error::handle([] {
225 return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
226 }),
227 read_errorator::pass_further{});
f67539c2
TL
228}
229
20effc67
TL
230PGBackend::read_ierrorator::future<>
231PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op,
232 object_stat_sum_t& delta_stats)
f67539c2
TL
233{
234 const auto& op = osd_op.op;
235 logger().trace("sparse_read: {} {}~{}",
236 os.oi.soid, op.extent.offset, op.extent.length);
20effc67 237 return interruptor::make_interruptible(store->fiemap(coll, ghobject_t{os.oi.soid},
f67539c2 238 op.extent.offset,
20effc67
TL
239 op.extent.length)).then_interruptible(
240 [&delta_stats, &os, &osd_op, this](auto&& m) {
f67539c2 241 return seastar::do_with(interval_set<uint64_t>{std::move(m)},
20effc67
TL
242 [&delta_stats, &os, &osd_op, this](auto&& extents) {
243 return interruptor::make_interruptible(store->readv(coll, ghobject_t{os.oi.soid},
244 extents, osd_op.op.flags)).safe_then_interruptible_tuple(
245 [&delta_stats, &os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
f67539c2
TL
246 if (_read_verify_data(os.oi, bl)) {
247 osd_op.op.extent.length = bl.length();
248 // re-encode since it might be modified
249 ceph::encode(extents, osd_op.outdata);
250 encode_destructively(bl, osd_op.outdata);
251 logger().trace("sparse_read got {} bytes from object {}",
252 osd_op.op.extent.length, os.oi.soid);
20effc67
TL
253 delta_stats.num_rd++;
254 delta_stats.num_rd_kb += shift_round_up(osd_op.op.extent.length, 10);
f67539c2
TL
255 return read_errorator::make_ready_future<>();
256 } else {
20effc67 257 // crc mismatches
f67539c2
TL
258 return crimson::ct_error::object_corrupted::make();
259 }
20effc67
TL
260 }, crimson::ct_error::input_output_error::handle([] {
261 return read_errorator::future<>{crimson::ct_error::object_corrupted::make()};
262 }),
263 read_errorator::pass_further{});
f67539c2
TL
264 });
265 });
266}
267
268namespace {
269
270 template<class CSum>
271 PGBackend::checksum_errorator::future<>
272 do_checksum(ceph::bufferlist& init_value_bl,
273 size_t chunk_size,
274 const ceph::bufferlist& buf,
275 ceph::bufferlist& result)
276 {
277 typename CSum::init_value_t init_value;
278 auto init_value_p = init_value_bl.cbegin();
279 try {
280 decode(init_value, init_value_p);
281 // chop off the consumed part
282 init_value_bl.splice(0, init_value_p.get_off());
283 } catch (const ceph::buffer::end_of_buffer&) {
284 logger().warn("{}: init value not provided", __func__);
285 return crimson::ct_error::invarg::make();
286 }
287 const uint32_t chunk_count = buf.length() / chunk_size;
288 ceph::bufferptr csum_data{
289 ceph::buffer::create(sizeof(typename CSum::value_t) * chunk_count)};
290 Checksummer::calculate<CSum>(
291 init_value, chunk_size, 0, buf.length(), buf, &csum_data);
292 encode(chunk_count, result);
293 result.append(std::move(csum_data));
294 return PGBackend::checksum_errorator::now();
9f95a23c 295 }
f67539c2
TL
296}
297
20effc67 298PGBackend::checksum_ierrorator::future<>
f67539c2
TL
299PGBackend::checksum(const ObjectState& os, OSDOp& osd_op)
300{
301 // sanity tests and normalize the argments
302 auto& checksum = osd_op.op.checksum;
303 if (checksum.offset == 0 && checksum.length == 0) {
304 // zeroed offset+length implies checksum whole object
305 checksum.length = os.oi.size;
306 } else if (checksum.offset >= os.oi.size) {
307 // read size was trimmed to zero, do nothing,
308 // see PGBackend::read()
309 return checksum_errorator::now();
310 }
311 if (checksum.chunk_size > 0) {
312 if (checksum.length == 0) {
313 logger().warn("{}: length required when chunk size provided", __func__);
314 return crimson::ct_error::invarg::make();
315 }
316 if (checksum.length % checksum.chunk_size != 0) {
317 logger().warn("{}: length not aligned to chunk size", __func__);
318 return crimson::ct_error::invarg::make();
319 }
320 } else {
321 checksum.chunk_size = checksum.length;
322 }
323 if (checksum.length == 0) {
324 uint32_t count = 0;
325 encode(count, osd_op.outdata);
326 return checksum_errorator::now();
327 }
328
329 // read the chunk to be checksum'ed
20effc67
TL
330 return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags)
331 .safe_then_interruptible(
f67539c2
TL
332 [&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> {
333 auto& checksum = osd_op.op.checksum;
334 if (read_bl.length() != checksum.length) {
335 logger().warn("checksum: bytes read {} != {}",
336 read_bl.length(), checksum.length);
337 return crimson::ct_error::invarg::make();
338 }
339 // calculate its checksum and put the result in outdata
340 switch (checksum.type) {
341 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32:
342 return do_checksum<Checksummer::xxhash32>(osd_op.indata,
343 checksum.chunk_size,
344 read_bl,
345 osd_op.outdata);
346 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64:
347 return do_checksum<Checksummer::xxhash64>(osd_op.indata,
348 checksum.chunk_size,
349 read_bl,
350 osd_op.outdata);
351 case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C:
352 return do_checksum<Checksummer::crc32c>(osd_op.indata,
353 checksum.chunk_size,
354 read_bl,
355 osd_op.outdata);
356 default:
357 logger().warn("checksum: unknown crc type ({})",
358 static_cast<uint32_t>(checksum.type));
359 return crimson::ct_error::invarg::make();
360 }
361 });
362}
363
20effc67 364PGBackend::cmp_ext_ierrorator::future<>
f67539c2
TL
365PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
366{
367 const ceph_osd_op& op = osd_op.op;
368 // return the index of the first unmatched byte in the payload, hence the
369 // strange limit and check
370 if (op.extent.length > MAX_ERRNO) {
371 return crimson::ct_error::invarg::make();
372 }
373 uint64_t obj_size = os.oi.size;
374 if (os.oi.truncate_seq < op.extent.truncate_seq &&
375 op.extent.offset + op.extent.length > op.extent.truncate_size) {
376 obj_size = op.extent.truncate_size;
377 }
378 uint64_t ext_len;
379 if (op.extent.offset >= obj_size) {
380 ext_len = 0;
381 } else if (op.extent.offset + op.extent.length > obj_size) {
382 ext_len = obj_size - op.extent.offset;
383 } else {
384 ext_len = op.extent.length;
385 }
20effc67 386 auto read_ext = ll_read_ierrorator::make_ready_future<ceph::bufferlist>();
f67539c2
TL
387 if (ext_len == 0) {
388 logger().debug("{}: zero length extent", __func__);
389 } else if (!os.exists || os.oi.is_whiteout()) {
390 logger().debug("{}: {} DNE", __func__, os.oi.soid);
391 } else {
392 read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0);
393 }
20effc67 394 return read_ext.safe_then_interruptible([&osd_op](auto&& read_bl) {
f67539c2
TL
395 int32_t retcode = 0;
396 for (unsigned index = 0; index < osd_op.indata.length(); index++) {
397 char byte_in_op = osd_op.indata[index];
398 char byte_from_disk = (index < read_bl.length() ? read_bl[index] : 0);
399 if (byte_in_op != byte_from_disk) {
400 logger().debug("cmp_ext: mismatch at {}", index);
401 retcode = -MAX_ERRNO - index;
402 break;
9f95a23c 403 }
f67539c2
TL
404 }
405 logger().debug("cmp_ext: {}", retcode);
406 osd_op.rval = retcode;
407 });
9f95a23c
TL
408}
409
20effc67
TL
410PGBackend::stat_ierrorator::future<>
411PGBackend::stat(
9f95a23c 412 const ObjectState& os,
20effc67
TL
413 OSDOp& osd_op,
414 object_stat_sum_t& delta_stats)
9f95a23c
TL
415{
416 if (os.exists/* TODO: && !os.is_whiteout() */) {
417 logger().debug("stat os.oi.size={}, os.oi.mtime={}", os.oi.size, os.oi.mtime);
418 encode(os.oi.size, osd_op.outdata);
419 encode(os.oi.mtime, osd_op.outdata);
420 } else {
421 logger().debug("stat object does not exist");
422 return crimson::ct_error::enoent::make();
423 }
20effc67 424 delta_stats.num_rd++;
9f95a23c 425 return stat_errorator::now();
9f95a23c
TL
426}
427
428bool PGBackend::maybe_create_new_object(
429 ObjectState& os,
20effc67
TL
430 ceph::os::Transaction& txn,
431 object_stat_sum_t& delta_stats)
9f95a23c
TL
432{
433 if (!os.exists) {
434 ceph_assert(!os.oi.is_whiteout());
435 os.exists = true;
436 os.oi.new_object();
437
438 txn.touch(coll->get_cid(), ghobject_t{os.oi.soid});
20effc67 439 delta_stats.num_objects++;
9f95a23c
TL
440 return false;
441 } else if (os.oi.is_whiteout()) {
442 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
20effc67 443 delta_stats.num_whiteouts--;
9f95a23c
TL
444 }
445 return true;
446}
447
20effc67
TL
448void PGBackend::update_size_and_usage(object_stat_sum_t& delta_stats,
449 object_info_t& oi, uint64_t offset,
450 uint64_t length, bool write_full)
451{
452 if (write_full ||
453 (offset + length > oi.size && length)) {
454 uint64_t new_size = offset + length;
455 delta_stats.num_bytes -= oi.size;
456 delta_stats.num_bytes += new_size;
457 oi.size = new_size;
458 }
459 delta_stats.num_wr++;
460 delta_stats.num_wr_kb += shift_round_up(length, 10);
461}
462
463void PGBackend::truncate_update_size_and_usage(object_stat_sum_t& delta_stats,
464 object_info_t& oi,
465 uint64_t truncate_size)
466{
467 if (oi.size != truncate_size) {
468 delta_stats.num_bytes -= oi.size;
469 delta_stats.num_bytes += truncate_size;
470 oi.size = truncate_size;
471 }
472}
473
f67539c2
TL
474static bool is_offset_and_length_valid(
475 const std::uint64_t offset,
476 const std::uint64_t length)
477{
478 if (const std::uint64_t max = local_conf()->osd_max_object_size;
479 offset >= max || length > max || offset + length > max) {
480 logger().debug("{} osd_max_object_size: {}, offset: {}, len: {}; "
481 "Hard limit of object size is 4GB",
482 __func__, max, offset, length);
483 return false;
484 } else {
485 return true;
486 }
487}
488
20effc67 489PGBackend::interruptible_future<> PGBackend::write(
9f95a23c
TL
490 ObjectState& os,
491 const OSDOp& osd_op,
f67539c2 492 ceph::os::Transaction& txn,
20effc67
TL
493 osd_op_params_t& osd_op_params,
494 object_stat_sum_t& delta_stats)
9f95a23c
TL
495{
496 const ceph_osd_op& op = osd_op.op;
497 uint64_t offset = op.extent.offset;
498 uint64_t length = op.extent.length;
499 bufferlist buf = osd_op.indata;
500 if (auto seq = os.oi.truncate_seq;
501 seq != 0 && op.extent.truncate_seq < seq) {
502 // old write, arrived after trimtrunc
503 if (offset + length > os.oi.size) {
504 // no-op
505 if (offset > os.oi.size) {
506 length = 0;
507 buf.clear();
508 } else {
509 // truncate
510 auto len = os.oi.size - offset;
511 buf.splice(len, length);
512 length = len;
513 }
514 }
515 } else if (op.extent.truncate_seq > seq) {
516 // write arrives before trimtrunc
517 if (os.exists && !os.oi.is_whiteout()) {
518 txn.truncate(coll->get_cid(),
519 ghobject_t{os.oi.soid}, op.extent.truncate_size);
520 if (op.extent.truncate_size != os.oi.size) {
521 os.oi.size = length;
f67539c2
TL
522 if (op.extent.truncate_size > os.oi.size) {
523 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
524 op.extent.truncate_size - os.oi.size);
525 } else {
526 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.truncate_size,
527 os.oi.size - op.extent.truncate_size);
528 }
9f95a23c 529 }
20effc67 530 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
9f95a23c
TL
531 }
532 os.oi.truncate_seq = op.extent.truncate_seq;
533 os.oi.truncate_size = op.extent.truncate_size;
534 }
20effc67 535 maybe_create_new_object(os, txn, delta_stats);
9f95a23c
TL
536 if (length == 0) {
537 if (offset > os.oi.size) {
538 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.offset);
20effc67 539 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
9f95a23c
TL
540 } else {
541 txn.nop();
542 }
543 } else {
544 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
545 offset, length, std::move(buf), op.flags);
20effc67 546 update_size_and_usage(delta_stats, os.oi, offset, length);
9f95a23c 547 }
f67539c2
TL
548 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
549 op.extent.length);
550
551 return seastar::now();
552}
553
20effc67 554PGBackend::interruptible_future<> PGBackend::write_same(
f67539c2
TL
555 ObjectState& os,
556 const OSDOp& osd_op,
557 ceph::os::Transaction& txn,
20effc67
TL
558 osd_op_params_t& osd_op_params,
559 object_stat_sum_t& delta_stats)
f67539c2
TL
560{
561 const ceph_osd_op& op = osd_op.op;
562 const uint64_t len = op.writesame.length;
563 if (len == 0) {
564 return seastar::now();
565 }
566 if (op.writesame.data_length == 0 ||
567 len % op.writesame.data_length != 0 ||
568 op.writesame.data_length != osd_op.indata.length()) {
569 throw crimson::osd::invalid_argument();
570 }
571 ceph::bufferlist repeated_indata;
572 for (uint64_t size = 0; size < len; size += op.writesame.data_length) {
573 repeated_indata.append(osd_op.indata);
574 }
20effc67 575 maybe_create_new_object(os, txn, delta_stats);
f67539c2
TL
576 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
577 op.writesame.offset, len,
578 std::move(repeated_indata), op.flags);
20effc67 579 update_size_and_usage(delta_stats, os.oi, op.writesame.offset, len);
f67539c2 580 osd_op_params.clean_regions.mark_data_region_dirty(op.writesame.offset, len);
9f95a23c
TL
581 return seastar::now();
582}
583
20effc67 584PGBackend::interruptible_future<> PGBackend::writefull(
9f95a23c
TL
585 ObjectState& os,
586 const OSDOp& osd_op,
f67539c2 587 ceph::os::Transaction& txn,
20effc67
TL
588 osd_op_params_t& osd_op_params,
589 object_stat_sum_t& delta_stats)
9f95a23c
TL
590{
591 const ceph_osd_op& op = osd_op.op;
592 if (op.extent.length != osd_op.indata.length()) {
593 throw crimson::osd::invalid_argument();
594 }
595
20effc67 596 const bool existing = maybe_create_new_object(os, txn, delta_stats);
9f95a23c
TL
597 if (existing && op.extent.length < os.oi.size) {
598 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
20effc67 599 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.truncate_size);
f67539c2
TL
600 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
601 os.oi.size - op.extent.length);
9f95a23c
TL
602 }
603 if (op.extent.length) {
604 txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
605 osd_op.indata, op.flags);
20effc67
TL
606 update_size_and_usage(delta_stats, os.oi, 0,
607 op.extent.length, true);
f67539c2
TL
608 osd_op_params.clean_regions.mark_data_region_dirty(0,
609 std::max((uint64_t) op.extent.length, os.oi.size));
610 }
611 return seastar::now();
612}
613
20effc67 614PGBackend::append_ierrorator::future<> PGBackend::append(
f67539c2
TL
615 ObjectState& os,
616 OSDOp& osd_op,
617 ceph::os::Transaction& txn,
20effc67
TL
618 osd_op_params_t& osd_op_params,
619 object_stat_sum_t& delta_stats)
f67539c2
TL
620{
621 const ceph_osd_op& op = osd_op.op;
622 if (op.extent.length != osd_op.indata.length()) {
623 return crimson::ct_error::invarg::make();
624 }
20effc67 625 maybe_create_new_object(os, txn, delta_stats);
f67539c2
TL
626 if (op.extent.length) {
627 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
628 os.oi.size /* offset */, op.extent.length,
629 std::move(osd_op.indata), op.flags);
20effc67
TL
630 update_size_and_usage(delta_stats, os.oi, os.oi.size,
631 op.extent.length);
f67539c2
TL
632 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
633 op.extent.length);
9f95a23c
TL
634 }
635 return seastar::now();
636}
637
20effc67 638PGBackend::write_iertr::future<> PGBackend::truncate(
f67539c2
TL
639 ObjectState& os,
640 const OSDOp& osd_op,
641 ceph::os::Transaction& txn,
20effc67
TL
642 osd_op_params_t& osd_op_params,
643 object_stat_sum_t& delta_stats)
f67539c2
TL
644{
645 if (!os.exists || os.oi.is_whiteout()) {
646 logger().debug("{} object dne, truncate is a no-op", __func__);
647 return write_ertr::now();
648 }
649 const ceph_osd_op& op = osd_op.op;
650 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
651 return crimson::ct_error::file_too_large::make();
652 }
653 if (op.extent.truncate_seq) {
654 assert(op.extent.offset == op.extent.truncate_size);
655 if (op.extent.truncate_seq <= os.oi.truncate_seq) {
656 logger().debug("{} truncate seq {} <= current {}, no-op",
657 __func__, op.extent.truncate_seq, os.oi.truncate_seq);
658 return write_ertr::make_ready_future<>();
659 } else {
660 logger().debug("{} truncate seq {} > current {}, truncating",
661 __func__, op.extent.truncate_seq, os.oi.truncate_seq);
662 os.oi.truncate_seq = op.extent.truncate_seq;
663 os.oi.truncate_size = op.extent.truncate_size;
664 }
665 }
20effc67 666 maybe_create_new_object(os, txn, delta_stats);
f67539c2
TL
667 if (os.oi.size != op.extent.offset) {
668 txn.truncate(coll->get_cid(),
669 ghobject_t{os.oi.soid}, op.extent.offset);
670 if (os.oi.size > op.extent.offset) {
671 // TODO: modified_ranges.union_of(trim);
672 osd_op_params.clean_regions.mark_data_region_dirty(
673 op.extent.offset,
674 os.oi.size - op.extent.offset);
675 } else {
676 // os.oi.size < op.extent.offset
677 osd_op_params.clean_regions.mark_data_region_dirty(
678 os.oi.size,
679 op.extent.offset - os.oi.size);
680 }
20effc67 681 truncate_update_size_and_usage(delta_stats, os.oi, op.extent.offset);
f67539c2
TL
682 os.oi.clear_data_digest();
683 }
20effc67 684 delta_stats.num_wr++;
f67539c2
TL
685 // ----
686 // do no set exists, or we will break above DELETE -> TRUNCATE munging.
687 return write_ertr::now();
688}
689
20effc67 690PGBackend::write_iertr::future<> PGBackend::zero(
f67539c2
TL
691 ObjectState& os,
692 const OSDOp& osd_op,
693 ceph::os::Transaction& txn,
20effc67
TL
694 osd_op_params_t& osd_op_params,
695 object_stat_sum_t& delta_stats)
f67539c2
TL
696{
697 if (!os.exists || os.oi.is_whiteout()) {
698 logger().debug("{} object dne, zero is a no-op", __func__);
699 return write_ertr::now();
700 }
701 const ceph_osd_op& op = osd_op.op;
702 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
703 return crimson::ct_error::file_too_large::make();
704 }
705 assert(op.extent.length);
706 txn.zero(coll->get_cid(),
707 ghobject_t{os.oi.soid},
708 op.extent.offset,
709 op.extent.length);
710 // TODO: modified_ranges.union_of(zeroed);
711 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
712 op.extent.length);
20effc67 713 delta_stats.num_wr++;
f67539c2
TL
714 os.oi.clear_data_digest();
715 return write_ertr::now();
716}
717
20effc67 718PGBackend::interruptible_future<> PGBackend::create(
9f95a23c
TL
719 ObjectState& os,
720 const OSDOp& osd_op,
20effc67
TL
721 ceph::os::Transaction& txn,
722 object_stat_sum_t& delta_stats)
9f95a23c
TL
723{
724 if (os.exists && !os.oi.is_whiteout() &&
725 (osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
726 // this is an exclusive create
727 throw crimson::osd::make_error(-EEXIST);
728 }
729
730 if (osd_op.indata.length()) {
731 // handle the legacy. `category` is no longer implemented.
732 try {
733 auto p = osd_op.indata.cbegin();
734 std::string category;
735 decode(category, p);
736 } catch (buffer::error&) {
737 throw crimson::osd::invalid_argument();
738 }
739 }
20effc67 740 maybe_create_new_object(os, txn, delta_stats);
9f95a23c
TL
741 txn.nop();
742 return seastar::now();
743}
744
20effc67
TL
745PGBackend::interruptible_future<>
746PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn)
9f95a23c
TL
747{
748 // todo: snapset
749 txn.remove(coll->get_cid(),
750 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
751 os.oi.size = 0;
752 os.oi.new_object();
753 os.exists = false;
754 // todo: update watchers
755 if (os.oi.is_whiteout()) {
756 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
757 }
758 return seastar::now();
759}
760
20effc67
TL
761PGBackend::interruptible_future<>
762PGBackend::remove(ObjectState& os, ceph::os::Transaction& txn,
763 object_stat_sum_t& delta_stats)
764{
765 // todo: snapset
766 txn.remove(coll->get_cid(),
767 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
768 delta_stats.num_bytes -= os.oi.size;
769 os.oi.size = 0;
770 os.oi.new_object();
771 os.exists = false;
772 // todo: update watchers
773 if (os.oi.is_whiteout()) {
774 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
775 delta_stats.num_whiteouts--;
776 }
777 delta_stats.num_objects--;
778 return seastar::now();
779}
780
781PGBackend::interruptible_future<std::tuple<std::vector<hobject_t>, hobject_t>>
9f95a23c
TL
782PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
783{
f67539c2
TL
784 if (__builtin_expect(stopping, false)) {
785 throw crimson::common::system_shutdown_exception();
786 }
787
9f95a23c 788 auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
20effc67
TL
789 return interruptor::make_interruptible(store->list_objects(coll,
790 gstart,
791 ghobject_t::get_max(),
792 limit))
793 .then_interruptible([](auto ret) {
f67539c2 794 auto& [gobjects, next] = ret;
9f95a23c
TL
795 std::vector<hobject_t> objects;
796 boost::copy(gobjects |
797 boost::adaptors::filtered([](const ghobject_t& o) {
798 if (o.is_pgmeta()) {
799 return false;
800 } else if (o.hobj.is_temp()) {
801 return false;
802 } else {
803 return o.is_no_gen();
804 }
805 }) |
806 boost::adaptors::transformed([](const ghobject_t& o) {
807 return o.hobj;
808 }),
809 std::back_inserter(objects));
f67539c2
TL
810 return seastar::make_ready_future<std::tuple<std::vector<hobject_t>, hobject_t>>(
811 std::make_tuple(objects, next.hobj));
9f95a23c
TL
812 });
813}
814
20effc67 815PGBackend::interruptible_future<> PGBackend::setxattr(
9f95a23c
TL
816 ObjectState& os,
817 const OSDOp& osd_op,
20effc67
TL
818 ceph::os::Transaction& txn,
819 object_stat_sum_t& delta_stats)
9f95a23c
TL
820{
821 if (local_conf()->osd_max_attr_size > 0 &&
822 osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
823 throw crimson::osd::make_error(-EFBIG);
824 }
825
826 const auto max_name_len = std::min<uint64_t>(
827 store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
828 if (osd_op.op.xattr.name_len > max_name_len) {
829 throw crimson::osd::make_error(-ENAMETOOLONG);
830 }
831
20effc67 832 maybe_create_new_object(os, txn, delta_stats);
9f95a23c 833
f67539c2 834 std::string name{"_"};
9f95a23c
TL
835 ceph::bufferlist val;
836 {
837 auto bp = osd_op.indata.cbegin();
f67539c2 838 bp.copy(osd_op.op.xattr.name_len, name);
9f95a23c
TL
839 bp.copy(osd_op.op.xattr.value_len, val);
840 }
841 logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
9f95a23c 842 txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
20effc67 843 delta_stats.num_wr++;
9f95a23c 844 return seastar::now();
9f95a23c
TL
845}
846
20effc67 847PGBackend::get_attr_ierrorator::future<> PGBackend::getxattr(
9f95a23c 848 const ObjectState& os,
20effc67
TL
849 OSDOp& osd_op,
850 object_stat_sum_t& delta_stats) const
9f95a23c
TL
851{
852 std::string name;
853 ceph::bufferlist val;
854 {
855 auto bp = osd_op.indata.cbegin();
856 std::string aname;
857 bp.copy(osd_op.op.xattr.name_len, aname);
858 name = "_" + aname;
859 }
860 logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
20effc67
TL
861 return getxattr(os.oi.soid, name).safe_then_interruptible(
862 [&delta_stats, &osd_op] (ceph::bufferlist&& val) {
863 osd_op.outdata = std::move(val);
9f95a23c 864 osd_op.op.xattr.value_len = osd_op.outdata.length();
20effc67
TL
865 delta_stats.num_rd++;
866 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
9f95a23c 867 return get_attr_errorator::now();
9f95a23c 868 });
9f95a23c
TL
869}
870
20effc67
TL
871PGBackend::get_attr_ierrorator::future<ceph::bufferlist>
872PGBackend::getxattr(
9f95a23c
TL
873 const hobject_t& soid,
874 std::string_view key) const
875{
f67539c2
TL
876 if (__builtin_expect(stopping, false)) {
877 throw crimson::common::system_shutdown_exception();
878 }
879
9f95a23c
TL
880 return store->get_attr(coll, ghobject_t{soid}, key);
881}
882
20effc67 883PGBackend::get_attr_ierrorator::future<> PGBackend::get_xattrs(
f67539c2 884 const ObjectState& os,
20effc67
TL
885 OSDOp& osd_op,
886 object_stat_sum_t& delta_stats) const
f67539c2
TL
887{
888 if (__builtin_expect(stopping, false)) {
889 throw crimson::common::system_shutdown_exception();
890 }
891 return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
20effc67 892 [&delta_stats, &osd_op](auto&& attrs) {
f67539c2 893 std::vector<std::pair<std::string, bufferlist>> user_xattrs;
20effc67 894 ceph::bufferlist bl;
f67539c2
TL
895 for (auto& [key, val] : attrs) {
896 if (key.size() > 1 && key[0] == '_') {
f67539c2
TL
897 bl.append(std::move(val));
898 user_xattrs.emplace_back(key.substr(1), std::move(bl));
899 }
900 }
901 ceph::encode(user_xattrs, osd_op.outdata);
20effc67
TL
902 delta_stats.num_rd++;
903 delta_stats.num_rd_kb += shift_round_up(bl.length(), 10);
f67539c2
TL
904 return get_attr_errorator::now();
905 });
906}
907
20effc67
TL
908namespace {
909
910template<typename U, typename V>
911int do_cmp_xattr(int op, const U& lhs, const V& rhs)
912{
913 switch (op) {
914 case CEPH_OSD_CMPXATTR_OP_EQ:
915 return lhs == rhs;
916 case CEPH_OSD_CMPXATTR_OP_NE:
917 return lhs != rhs;
918 case CEPH_OSD_CMPXATTR_OP_GT:
919 return lhs > rhs;
920 case CEPH_OSD_CMPXATTR_OP_GTE:
921 return lhs >= rhs;
922 case CEPH_OSD_CMPXATTR_OP_LT:
923 return lhs < rhs;
924 case CEPH_OSD_CMPXATTR_OP_LTE:
925 return lhs <= rhs;
926 default:
927 return -EINVAL;
928 }
929}
930
931} // anonymous namespace
932
933static int do_xattr_cmp_u64(int op, uint64_t lhs, bufferlist& rhs_xattr)
934{
935 uint64_t rhs;
936
937 if (rhs_xattr.length() > 0) {
938 const char* first = rhs_xattr.c_str();
939 if (auto [p, ec] = std::from_chars(first, first + rhs_xattr.length(), rhs);
940 ec != std::errc()) {
941 return -EINVAL;
942 }
943 } else {
944 rhs = 0;
945 }
946 logger().debug("do_xattr_cmp_u64 '{}' vs '{}' op {}", lhs, rhs, op);
947 return do_cmp_xattr(op, lhs, rhs);
948}
949
950PGBackend::cmp_xattr_ierrorator::future<> PGBackend::cmp_xattr(
951 const ObjectState& os,
952 OSDOp& osd_op,
953 object_stat_sum_t& delta_stats) const
954{
955 std::string name{"_"};
956 auto bp = osd_op.indata.cbegin();
957 bp.copy(osd_op.op.xattr.name_len, name);
958
959 logger().debug("cmpxattr on obj={} for attr={}", os.oi.soid, name);
960 return getxattr(os.oi.soid, name).safe_then_interruptible(
961 [&delta_stats, &osd_op] (auto &&xattr) {
962 int result = 0;
963 auto bp = osd_op.indata.cbegin();
964 bp += osd_op.op.xattr.name_len;
965
966 switch (osd_op.op.xattr.cmp_mode) {
967 case CEPH_OSD_CMPXATTR_MODE_STRING:
968 {
969 string lhs;
970 bp.copy(osd_op.op.xattr.value_len, lhs);
971 string_view rhs(xattr.c_str(), xattr.length());
972 result = do_cmp_xattr(osd_op.op.xattr.cmp_op, lhs, rhs);
973 logger().debug("cmpxattr lhs={}, rhs={}", lhs, rhs);
974 }
975 break;
976 case CEPH_OSD_CMPXATTR_MODE_U64:
977 {
978 uint64_t lhs;
979 try {
980 decode(lhs, bp);
981 } catch (ceph::buffer::error& e) {
982 logger().info("cmp_xattr: buffer error expection");
983 result = -EINVAL;
984 break;
985 }
986 result = do_xattr_cmp_u64(osd_op.op.xattr.cmp_op, lhs, xattr);
987 }
988 break;
989 default:
990 logger().info("bad cmp mode {}", osd_op.op.xattr.cmp_mode);
991 result = -EINVAL;
992 }
993 if (result == 0) {
994 logger().info("cmp_xattr: comparison returned false");
995 osd_op.rval = -ECANCELED;
996 } else {
997 osd_op.rval = result;
998 }
999 delta_stats.num_rd++;
1000 delta_stats.num_rd_kb += shift_round_up(osd_op.op.xattr.value_len, 10);
1001 });
1002}
1003
1004PGBackend::rm_xattr_iertr::future<>
1005PGBackend::rm_xattr(
f67539c2
TL
1006 ObjectState& os,
1007 const OSDOp& osd_op,
1008 ceph::os::Transaction& txn)
1009{
1010 if (__builtin_expect(stopping, false)) {
1011 throw crimson::common::system_shutdown_exception();
1012 }
1013 if (!os.exists || os.oi.is_whiteout()) {
1014 logger().debug("{}: {} DNE", __func__, os.oi.soid);
1015 return crimson::ct_error::enoent::make();
1016 }
1017 auto bp = osd_op.indata.cbegin();
1018 string attr_name{"_"};
1019 bp.copy(osd_op.op.xattr.name_len, attr_name);
1020 txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name);
20effc67 1021 return rm_xattr_iertr::now();
f67539c2
TL
1022}
1023
1024using get_omap_ertr =
1025 crimson::os::FuturizedStore::read_errorator::extend<
1026 crimson::ct_error::enodata>;
20effc67
TL
1027using get_omap_iertr =
1028 ::crimson::interruptible::interruptible_errorator<
1029 ::crimson::osd::IOInterruptCondition,
1030 get_omap_ertr>;
f67539c2 1031static
20effc67 1032get_omap_iertr::future<
f67539c2 1033 crimson::os::FuturizedStore::omap_values_t>
9f95a23c
TL
1034maybe_get_omap_vals_by_keys(
1035 crimson::os::FuturizedStore* store,
1036 const crimson::os::CollectionRef& coll,
1037 const object_info_t& oi,
1038 const std::set<std::string>& keys_to_get)
1039{
1040 if (oi.is_omap()) {
1041 return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
1042 } else {
f67539c2 1043 return crimson::ct_error::enodata::make();
9f95a23c
TL
1044 }
1045}
1046
f67539c2 1047static
20effc67 1048get_omap_iertr::future<
f67539c2 1049 std::tuple<bool, crimson::os::FuturizedStore::omap_values_t>>
9f95a23c
TL
1050maybe_get_omap_vals(
1051 crimson::os::FuturizedStore* store,
1052 const crimson::os::CollectionRef& coll,
1053 const object_info_t& oi,
1054 const std::string& start_after)
1055{
1056 if (oi.is_omap()) {
1057 return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after);
1058 } else {
f67539c2 1059 return crimson::ct_error::enodata::make();
9f95a23c
TL
1060 }
1061}
1062
20effc67 1063PGBackend::ll_read_ierrorator::future<ceph::bufferlist>
f67539c2
TL
1064PGBackend::omap_get_header(
1065 const crimson::os::CollectionRef& c,
1066 const ghobject_t& oid) const
1067{
1068 return store->omap_get_header(c, oid);
1069}
1070
20effc67 1071PGBackend::ll_read_ierrorator::future<>
f67539c2
TL
1072PGBackend::omap_get_header(
1073 const ObjectState& os,
20effc67
TL
1074 OSDOp& osd_op,
1075 object_stat_sum_t& delta_stats) const
f67539c2 1076{
20effc67
TL
1077 return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then_interruptible(
1078 [&delta_stats, &osd_op] (ceph::bufferlist&& header) {
f67539c2 1079 osd_op.outdata = std::move(header);
20effc67
TL
1080 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1081 delta_stats.num_rd++;
f67539c2
TL
1082 return seastar::now();
1083 });
1084}
1085
20effc67 1086PGBackend::ll_read_ierrorator::future<>
f67539c2 1087PGBackend::omap_get_keys(
9f95a23c 1088 const ObjectState& os,
20effc67
TL
1089 OSDOp& osd_op,
1090 object_stat_sum_t& delta_stats) const
9f95a23c 1091{
f67539c2
TL
1092 if (__builtin_expect(stopping, false)) {
1093 throw crimson::common::system_shutdown_exception();
1094 }
1095 if (!os.exists || os.oi.is_whiteout()) {
1096 logger().debug("{}: object does not exist: {}", os.oi.soid);
1097 return crimson::ct_error::enoent::make();
1098 }
9f95a23c
TL
1099 std::string start_after;
1100 uint64_t max_return;
1101 try {
1102 auto p = osd_op.indata.cbegin();
1103 decode(start_after, p);
1104 decode(max_return, p);
1105 } catch (buffer::error&) {
1106 throw crimson::osd::invalid_argument{};
1107 }
1108 max_return =
1109 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
1110
20effc67 1111
9f95a23c 1112 // TODO: truly chunk the reading
20effc67
TL
1113 return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then_interruptible(
1114 [=,&delta_stats, &osd_op](auto ret) {
9f95a23c
TL
1115 ceph::bufferlist result;
1116 bool truncated = false;
1117 uint32_t num = 0;
f67539c2
TL
1118 for (auto &[key, val] : std::get<1>(ret)) {
1119 if (num >= max_return ||
9f95a23c
TL
1120 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
1121 truncated = true;
1122 break;
1123 }
1124 encode(key, result);
f67539c2 1125 ++num;
9f95a23c
TL
1126 }
1127 encode(num, osd_op.outdata);
1128 osd_op.outdata.claim_append(result);
1129 encode(truncated, osd_op.outdata);
20effc67
TL
1130 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1131 delta_stats.num_rd++;
9f95a23c 1132 return seastar::now();
20effc67 1133 }).handle_error_interruptible(
f67539c2
TL
1134 crimson::ct_error::enodata::handle([&osd_op] {
1135 uint32_t num = 0;
1136 bool truncated = false;
1137 encode(num, osd_op.outdata);
1138 encode(truncated, osd_op.outdata);
1139 return seastar::now();
1140 }),
1141 ll_read_errorator::pass_further{}
1142 );
9f95a23c
TL
1143}
1144
20effc67 1145PGBackend::ll_read_ierrorator::future<>
f67539c2 1146PGBackend::omap_get_vals(
9f95a23c 1147 const ObjectState& os,
20effc67
TL
1148 OSDOp& osd_op,
1149 object_stat_sum_t& delta_stats) const
9f95a23c 1150{
f67539c2
TL
1151 if (__builtin_expect(stopping, false)) {
1152 throw crimson::common::system_shutdown_exception();
1153 }
1154
9f95a23c
TL
1155 std::string start_after;
1156 uint64_t max_return;
1157 std::string filter_prefix;
1158 try {
1159 auto p = osd_op.indata.cbegin();
1160 decode(start_after, p);
1161 decode(max_return, p);
1162 decode(filter_prefix, p);
1163 } catch (buffer::error&) {
1164 throw crimson::osd::invalid_argument{};
1165 }
1166
1167 max_return = \
1168 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
20effc67
TL
1169 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1170 delta_stats.num_rd++;
9f95a23c
TL
1171
1172 // TODO: truly chunk the reading
20effc67
TL
1173 return maybe_get_omap_vals(store, coll, os.oi, start_after)
1174 .safe_then_interruptible(
f67539c2
TL
1175 [=, &osd_op] (auto&& ret) {
1176 auto [done, vals] = std::move(ret);
9f95a23c
TL
1177 assert(done);
1178 ceph::bufferlist result;
1179 bool truncated = false;
1180 uint32_t num = 0;
1181 auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix)
1182 : std::begin(vals);
1183 for (; iter != std::end(vals); ++iter) {
1184 const auto& [key, value] = *iter;
1185 if (key.substr(0, filter_prefix.size()) != filter_prefix) {
1186 break;
f67539c2 1187 } else if (num >= max_return ||
9f95a23c
TL
1188 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
1189 truncated = true;
1190 break;
1191 }
1192 encode(key, result);
1193 encode(value, result);
f67539c2 1194 ++num;
9f95a23c
TL
1195 }
1196 encode(num, osd_op.outdata);
1197 osd_op.outdata.claim_append(result);
1198 encode(truncated, osd_op.outdata);
f67539c2 1199 return ll_read_errorator::now();
20effc67 1200 }).handle_error_interruptible(
f67539c2
TL
1201 crimson::ct_error::enodata::handle([&osd_op] {
1202 encode(uint32_t{0} /* num */, osd_op.outdata);
1203 encode(bool{false} /* truncated */, osd_op.outdata);
1204 return ll_read_errorator::now();
1205 }),
1206 ll_read_errorator::pass_further{}
1207 );
9f95a23c 1208}
f67539c2 1209
20effc67 1210PGBackend::ll_read_ierrorator::future<>
f67539c2 1211PGBackend::omap_get_vals_by_keys(
9f95a23c 1212 const ObjectState& os,
20effc67
TL
1213 OSDOp& osd_op,
1214 object_stat_sum_t& delta_stats) const
9f95a23c 1215{
f67539c2
TL
1216 if (__builtin_expect(stopping, false)) {
1217 throw crimson::common::system_shutdown_exception();
1218 }
1219 if (!os.exists || os.oi.is_whiteout()) {
1220 logger().debug("{}: object does not exist: {}", os.oi.soid);
1221 return crimson::ct_error::enoent::make();
1222 }
1223
9f95a23c
TL
1224 std::set<std::string> keys_to_get;
1225 try {
1226 auto p = osd_op.indata.cbegin();
1227 decode(keys_to_get, p);
1228 } catch (buffer::error&) {
1229 throw crimson::osd::invalid_argument();
1230 }
20effc67
TL
1231 delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1232 delta_stats.num_rd++;
1233 return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get)
1234 .safe_then_interruptible(
f67539c2 1235 [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
9f95a23c 1236 encode(vals, osd_op.outdata);
f67539c2 1237 return ll_read_errorator::now();
20effc67 1238 }).handle_error_interruptible(
f67539c2
TL
1239 crimson::ct_error::enodata::handle([&osd_op] {
1240 uint32_t num = 0;
1241 encode(num, osd_op.outdata);
1242 return ll_read_errorator::now();
1243 }),
1244 ll_read_errorator::pass_further{}
1245 );
9f95a23c
TL
1246}
1247
20effc67
TL
1248PGBackend::interruptible_future<>
1249PGBackend::omap_set_vals(
9f95a23c
TL
1250 ObjectState& os,
1251 const OSDOp& osd_op,
f67539c2 1252 ceph::os::Transaction& txn,
20effc67
TL
1253 osd_op_params_t& osd_op_params,
1254 object_stat_sum_t& delta_stats)
9f95a23c 1255{
20effc67 1256 maybe_create_new_object(os, txn, delta_stats);
9f95a23c
TL
1257
1258 ceph::bufferlist to_set_bl;
1259 try {
1260 auto p = osd_op.indata.cbegin();
1261 decode_str_str_map_to_bl(p, &to_set_bl);
1262 } catch (buffer::error&) {
1263 throw crimson::osd::invalid_argument{};
1264 }
1265
1266 txn.omap_setkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_set_bl);
20effc67
TL
1267 osd_op_params.clean_regions.mark_omap_dirty();
1268 delta_stats.num_wr++;
1269 delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
9f95a23c
TL
1270 os.oi.set_flag(object_info_t::FLAG_OMAP);
1271 os.oi.clear_omap_digest();
f67539c2
TL
1272 return seastar::now();
1273}
1274
20effc67
TL
1275PGBackend::interruptible_future<>
1276PGBackend::omap_set_header(
f67539c2
TL
1277 ObjectState& os,
1278 const OSDOp& osd_op,
20effc67
TL
1279 ceph::os::Transaction& txn,
1280 osd_op_params_t& osd_op_params,
1281 object_stat_sum_t& delta_stats)
f67539c2 1282{
20effc67 1283 maybe_create_new_object(os, txn, delta_stats);
f67539c2 1284 txn.omap_setheader(coll->get_cid(), ghobject_t{os.oi.soid}, osd_op.indata);
20effc67
TL
1285 osd_op_params.clean_regions.mark_omap_dirty();
1286 delta_stats.num_wr++;
f67539c2
TL
1287 os.oi.set_flag(object_info_t::FLAG_OMAP);
1288 os.oi.clear_omap_digest();
1289 return seastar::now();
1290}
1291
20effc67 1292PGBackend::interruptible_future<> PGBackend::omap_remove_range(
f67539c2
TL
1293 ObjectState& os,
1294 const OSDOp& osd_op,
20effc67
TL
1295 ceph::os::Transaction& txn,
1296 object_stat_sum_t& delta_stats)
f67539c2
TL
1297{
1298 std::string key_begin, key_end;
1299 try {
1300 auto p = osd_op.indata.cbegin();
1301 decode(key_begin, p);
1302 decode(key_end, p);
1303 } catch (buffer::error& e) {
1304 throw crimson::osd::invalid_argument{};
1305 }
1306 txn.omap_rmkeyrange(coll->get_cid(), ghobject_t{os.oi.soid}, key_begin, key_end);
20effc67
TL
1307 delta_stats.num_wr++;
1308 os.oi.clear_omap_digest();
1309 return seastar::now();
1310}
1311
1312PGBackend::interruptible_future<> PGBackend::omap_remove_key(
1313 ObjectState& os,
1314 const OSDOp& osd_op,
1315 ceph::os::Transaction& txn)
1316{
1317 ceph::bufferlist to_rm_bl;
1318 try {
1319 auto p = osd_op.indata.cbegin();
1320 decode_str_set_to_bl(p, &to_rm_bl);
1321 } catch (buffer::error& e) {
1322 throw crimson::osd::invalid_argument{};
1323 }
1324 txn.omap_rmkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_rm_bl);
1325 // TODO:
1326 // ctx->clean_regions.mark_omap_dirty();
1327 // ctx->delta_stats.num_wr++;
f67539c2 1328 os.oi.clear_omap_digest();
9f95a23c
TL
1329 return seastar::now();
1330}
f67539c2 1331
20effc67 1332PGBackend::omap_clear_iertr::future<>
f67539c2
TL
1333PGBackend::omap_clear(
1334 ObjectState& os,
1335 OSDOp& osd_op,
1336 ceph::os::Transaction& txn,
20effc67
TL
1337 osd_op_params_t& osd_op_params,
1338 object_stat_sum_t& delta_stats)
f67539c2
TL
1339{
1340 if (__builtin_expect(stopping, false)) {
1341 throw crimson::common::system_shutdown_exception();
1342 }
1343 if (!os.exists || os.oi.is_whiteout()) {
1344 logger().debug("{}: object does not exist: {}", os.oi.soid);
1345 return crimson::ct_error::enoent::make();
1346 }
1347 if (!os.oi.is_omap()) {
1348 return omap_clear_ertr::now();
1349 }
1350 txn.omap_clear(coll->get_cid(), ghobject_t{os.oi.soid});
1351 osd_op_params.clean_regions.mark_omap_dirty();
20effc67 1352 delta_stats.num_wr++;
f67539c2
TL
1353 os.oi.clear_omap_digest();
1354 os.oi.clear_flag(object_info_t::FLAG_OMAP);
1355 return omap_clear_ertr::now();
1356}
1357
20effc67
TL
1358PGBackend::interruptible_future<struct stat>
1359PGBackend::stat(
f67539c2
TL
1360 CollectionRef c,
1361 const ghobject_t& oid) const
1362{
1363 return store->stat(c, oid);
1364}
1365
20effc67 1366PGBackend::interruptible_future<std::map<uint64_t, uint64_t>>
f67539c2
TL
1367PGBackend::fiemap(
1368 CollectionRef c,
1369 const ghobject_t& oid,
1370 uint64_t off,
1371 uint64_t len)
1372{
1373 return store->fiemap(c, oid, off, len);
1374}
1375
1376void PGBackend::on_activate_complete() {
1377 peering.reset();
1378}
1379