]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/pg_backend.cc
buildsys: switch source download to quincy
[ceph.git] / ceph / src / crimson / osd / pg_backend.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "pg_backend.h"
5
6#include <optional>
7#include <boost/range/adaptor/filtered.hpp>
8#include <boost/range/adaptor/transformed.hpp>
9#include <boost/range/algorithm/copy.hpp>
10#include <fmt/format.h>
11#include <fmt/ostream.h>
12#include <seastar/core/print.hh>
13
14#include "messages/MOSDOp.h"
15#include "os/Transaction.h"
f67539c2 16#include "common/Checksummer.h"
9f95a23c
TL
17#include "common/Clock.h"
18
f67539c2 19#include "crimson/common/exception.h"
9f95a23c
TL
20#include "crimson/os/futurized_collection.h"
21#include "crimson/os/futurized_store.h"
22#include "crimson/osd/osd_operation.h"
23#include "replicated_backend.h"
f67539c2 24#include "replicated_recovery_backend.h"
9f95a23c
TL
25#include "ec_backend.h"
26#include "exceptions.h"
27
28namespace {
29 seastar::logger& logger() {
30 return crimson::get_logger(ceph_subsys_osd);
31 }
32}
33
34using crimson::common::local_conf;
35
36std::unique_ptr<PGBackend>
37PGBackend::create(pg_t pgid,
38 const pg_shard_t pg_shard,
39 const pg_pool_t& pool,
40 crimson::os::CollectionRef coll,
41 crimson::osd::ShardServices& shard_services,
42 const ec_profile_t& ec_profile)
43{
44 switch (pool.type) {
45 case pg_pool_t::TYPE_REPLICATED:
46 return std::make_unique<ReplicatedBackend>(pgid, pg_shard,
47 coll, shard_services);
48 case pg_pool_t::TYPE_ERASURE:
49 return std::make_unique<ECBackend>(pg_shard.shard, coll, shard_services,
50 std::move(ec_profile),
51 pool.stripe_width);
52 default:
53 throw runtime_error(seastar::format("unsupported pool type '{}'",
54 pool.type));
55 }
56}
57
58PGBackend::PGBackend(shard_id_t shard,
59 CollectionRef coll,
60 crimson::os::FuturizedStore* store)
61 : shard{shard},
62 coll{coll},
63 store{store}
64{}
65
66PGBackend::load_metadata_ertr::future<PGBackend::loaded_object_md_t::ref>
67PGBackend::load_metadata(const hobject_t& oid)
68{
f67539c2
TL
69 if (__builtin_expect(stopping, false)) {
70 throw crimson::common::system_shutdown_exception();
71 }
72
9f95a23c
TL
73 return store->get_attrs(
74 coll,
75 ghobject_t{oid, ghobject_t::NO_GEN, shard}).safe_then(
76 [oid](auto &&attrs) -> load_metadata_ertr::future<loaded_object_md_t::ref>{
77 loaded_object_md_t::ref ret(new loaded_object_md_t());
78 if (auto oiiter = attrs.find(OI_ATTR); oiiter != attrs.end()) {
79 bufferlist bl;
80 bl.push_back(std::move(oiiter->second));
81 ret->os = ObjectState(
82 object_info_t(bl),
83 true);
84 } else {
85 logger().error(
86 "load_metadata: object {} present but missing object info",
87 oid);
88 return crimson::ct_error::object_corrupted::make();
89 }
90
91 if (oid.is_head()) {
92 if (auto ssiter = attrs.find(SS_ATTR); ssiter != attrs.end()) {
93 bufferlist bl;
94 bl.push_back(std::move(ssiter->second));
95 ret->ss = SnapSet(bl);
96 } else {
97 /* TODO: add support for writing out snapsets
98 logger().error(
99 "load_metadata: object {} present but missing snapset",
100 oid);
101 //return crimson::ct_error::object_corrupted::make();
102 */
103 ret->ss = SnapSet();
104 }
105 }
106
107 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
108 std::move(ret));
109 }, crimson::ct_error::enoent::handle([oid] {
110 logger().debug(
111 "load_metadata: object {} doesn't exist, returning empty metadata",
112 oid);
113 return load_metadata_ertr::make_ready_future<loaded_object_md_t::ref>(
114 new loaded_object_md_t{
115 ObjectState(
116 object_info_t(oid),
117 false),
118 oid.is_head() ? std::optional<SnapSet>(SnapSet()) : std::nullopt
119 });
120 }));
121}
122
123seastar::future<crimson::osd::acked_peers_t>
124PGBackend::mutate_object(
125 std::set<pg_shard_t> pg_shards,
126 crimson::osd::ObjectContextRef &&obc,
127 ceph::os::Transaction&& txn,
f67539c2 128 const osd_op_params_t& osd_op_p,
9f95a23c
TL
129 epoch_t min_epoch,
130 epoch_t map_epoch,
f67539c2 131 std::vector<pg_log_entry_t>&& log_entries)
9f95a23c
TL
132{
133 logger().trace("mutate_object: num_ops={}", txn.get_num_ops());
134 if (obc->obs.exists) {
135#if 0
136 obc->obs.oi.version = ctx->at_version;
137 obc->obs.oi.prior_version = ctx->obs->oi.version;
138#endif
139
f67539c2
TL
140 auto& m = osd_op_p.req;
141 obc->obs.oi.prior_version = obc->obs.oi.version;
142 obc->obs.oi.version = osd_op_p.at_version;
143 if (osd_op_p.user_at_version > obc->obs.oi.user_version)
144 obc->obs.oi.user_version = osd_op_p.user_at_version;
145 obc->obs.oi.last_reqid = m->get_reqid();
146 obc->obs.oi.mtime = m->get_mtime();
9f95a23c
TL
147 obc->obs.oi.local_mtime = ceph_clock_now();
148
149 // object_info_t
150 {
151 ceph::bufferlist osv;
f67539c2 152 encode(obc->obs.oi, osv, CEPH_FEATURES_ALL);
9f95a23c
TL
153 // TODO: get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
154 txn.setattr(coll->get_cid(), ghobject_t{obc->obs.oi.soid}, OI_ATTR, osv);
155 }
156 } else {
157 // reset cached ObjectState without enforcing eviction
158 obc->obs.oi = object_info_t(obc->obs.oi.soid);
159 }
160 return _submit_transaction(
161 std::move(pg_shards), obc->obs.oi.soid, std::move(txn),
f67539c2 162 std::move(osd_op_p), min_epoch, map_epoch, std::move(log_entries));
9f95a23c
TL
163}
164
165static inline bool _read_verify_data(
166 const object_info_t& oi,
167 const ceph::bufferlist& data)
168{
169 if (oi.is_data_digest() && oi.size == data.length()) {
170 // whole object? can we verify the checksum?
171 if (auto crc = data.crc32c(-1); crc != oi.data_digest) {
172 logger().error("full-object read crc {} != expected {} on {}",
173 crc, oi.data_digest, oi.soid);
174 // todo: mark soid missing, perform recovery, and retry
175 return false;
176 }
177 }
178 return true;
179}
180
f67539c2
TL
181PGBackend::read_errorator::future<>
182PGBackend::read(const ObjectState& os, OSDOp& osd_op)
9f95a23c 183{
f67539c2
TL
184 const auto& oi = os.oi;
185 const ceph_osd_op& op = osd_op.op;
186 const uint64_t offset = op.extent.offset;
187 uint64_t length = op.extent.length;
9f95a23c 188 logger().trace("read: {} {}~{}", oi.soid, offset, length);
f67539c2
TL
189
190 if (!os.exists || os.oi.is_whiteout()) {
191 logger().debug("{}: {} DNE", __func__, os.oi.soid);
192 return crimson::ct_error::enoent::make();
193 }
9f95a23c
TL
194 // are we beyond truncate_size?
195 size_t size = oi.size;
f67539c2
TL
196 if ((op.extent.truncate_seq > oi.truncate_seq) &&
197 (op.extent.truncate_size < offset + length) &&
198 (op.extent.truncate_size < size)) {
199 size = op.extent.truncate_size;
200 }
201 if (offset >= size) {
202 // read size was trimmed to zero and it is expected to do nothing,
203 return read_errorator::now();
9f95a23c
TL
204 }
205 if (!length) {
206 // read the whole object if length is 0
207 length = size;
208 }
f67539c2
TL
209 return _read(oi.soid, offset, length, op.flags).safe_then(
210 [&oi, &osd_op](auto&& bl) -> read_errorator::future<> {
211 if (!_read_verify_data(oi, bl)) {
212 return crimson::ct_error::object_corrupted::make();
213 }
214 logger().debug("read: data length: {}", bl.length());
215 osd_op.rval = bl.length();
216 osd_op.outdata = std::move(bl);
217 return read_errorator::now();
218 });
219}
220
221PGBackend::read_errorator::future<>
222PGBackend::sparse_read(const ObjectState& os, OSDOp& osd_op)
223{
224 const auto& op = osd_op.op;
225 logger().trace("sparse_read: {} {}~{}",
226 os.oi.soid, op.extent.offset, op.extent.length);
227 return store->fiemap(coll, ghobject_t{os.oi.soid},
228 op.extent.offset,
229 op.extent.length).then([&os, &osd_op, this](auto&& m) {
230 return seastar::do_with(interval_set<uint64_t>{std::move(m)},
231 [&os, &osd_op, this](auto&& extents) {
232 return store->readv(coll, ghobject_t{os.oi.soid},
233 extents, osd_op.op.flags).safe_then(
234 [&os, &osd_op, &extents](auto&& bl) -> read_errorator::future<> {
235 if (_read_verify_data(os.oi, bl)) {
236 osd_op.op.extent.length = bl.length();
237 // re-encode since it might be modified
238 ceph::encode(extents, osd_op.outdata);
239 encode_destructively(bl, osd_op.outdata);
240 logger().trace("sparse_read got {} bytes from object {}",
241 osd_op.op.extent.length, os.oi.soid);
242 return read_errorator::make_ready_future<>();
243 } else {
244 // TODO: repair it if crc mismatches
245 return crimson::ct_error::object_corrupted::make();
246 }
247 });
248 });
249 });
250}
251
252namespace {
253
254 template<class CSum>
255 PGBackend::checksum_errorator::future<>
256 do_checksum(ceph::bufferlist& init_value_bl,
257 size_t chunk_size,
258 const ceph::bufferlist& buf,
259 ceph::bufferlist& result)
260 {
261 typename CSum::init_value_t init_value;
262 auto init_value_p = init_value_bl.cbegin();
263 try {
264 decode(init_value, init_value_p);
265 // chop off the consumed part
266 init_value_bl.splice(0, init_value_p.get_off());
267 } catch (const ceph::buffer::end_of_buffer&) {
268 logger().warn("{}: init value not provided", __func__);
269 return crimson::ct_error::invarg::make();
270 }
271 const uint32_t chunk_count = buf.length() / chunk_size;
272 ceph::bufferptr csum_data{
273 ceph::buffer::create(sizeof(typename CSum::value_t) * chunk_count)};
274 Checksummer::calculate<CSum>(
275 init_value, chunk_size, 0, buf.length(), buf, &csum_data);
276 encode(chunk_count, result);
277 result.append(std::move(csum_data));
278 return PGBackend::checksum_errorator::now();
9f95a23c 279 }
f67539c2
TL
280}
281
282PGBackend::checksum_errorator::future<>
283PGBackend::checksum(const ObjectState& os, OSDOp& osd_op)
284{
285 // sanity tests and normalize the argments
286 auto& checksum = osd_op.op.checksum;
287 if (checksum.offset == 0 && checksum.length == 0) {
288 // zeroed offset+length implies checksum whole object
289 checksum.length = os.oi.size;
290 } else if (checksum.offset >= os.oi.size) {
291 // read size was trimmed to zero, do nothing,
292 // see PGBackend::read()
293 return checksum_errorator::now();
294 }
295 if (checksum.chunk_size > 0) {
296 if (checksum.length == 0) {
297 logger().warn("{}: length required when chunk size provided", __func__);
298 return crimson::ct_error::invarg::make();
299 }
300 if (checksum.length % checksum.chunk_size != 0) {
301 logger().warn("{}: length not aligned to chunk size", __func__);
302 return crimson::ct_error::invarg::make();
303 }
304 } else {
305 checksum.chunk_size = checksum.length;
306 }
307 if (checksum.length == 0) {
308 uint32_t count = 0;
309 encode(count, osd_op.outdata);
310 return checksum_errorator::now();
311 }
312
313 // read the chunk to be checksum'ed
314 return _read(os.oi.soid, checksum.offset, checksum.length, osd_op.op.flags).safe_then(
315 [&osd_op](auto&& read_bl) mutable -> checksum_errorator::future<> {
316 auto& checksum = osd_op.op.checksum;
317 if (read_bl.length() != checksum.length) {
318 logger().warn("checksum: bytes read {} != {}",
319 read_bl.length(), checksum.length);
320 return crimson::ct_error::invarg::make();
321 }
322 // calculate its checksum and put the result in outdata
323 switch (checksum.type) {
324 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH32:
325 return do_checksum<Checksummer::xxhash32>(osd_op.indata,
326 checksum.chunk_size,
327 read_bl,
328 osd_op.outdata);
329 case CEPH_OSD_CHECKSUM_OP_TYPE_XXHASH64:
330 return do_checksum<Checksummer::xxhash64>(osd_op.indata,
331 checksum.chunk_size,
332 read_bl,
333 osd_op.outdata);
334 case CEPH_OSD_CHECKSUM_OP_TYPE_CRC32C:
335 return do_checksum<Checksummer::crc32c>(osd_op.indata,
336 checksum.chunk_size,
337 read_bl,
338 osd_op.outdata);
339 default:
340 logger().warn("checksum: unknown crc type ({})",
341 static_cast<uint32_t>(checksum.type));
342 return crimson::ct_error::invarg::make();
343 }
344 });
345}
346
347PGBackend::cmp_ext_errorator::future<>
348PGBackend::cmp_ext(const ObjectState& os, OSDOp& osd_op)
349{
350 const ceph_osd_op& op = osd_op.op;
351 // return the index of the first unmatched byte in the payload, hence the
352 // strange limit and check
353 if (op.extent.length > MAX_ERRNO) {
354 return crimson::ct_error::invarg::make();
355 }
356 uint64_t obj_size = os.oi.size;
357 if (os.oi.truncate_seq < op.extent.truncate_seq &&
358 op.extent.offset + op.extent.length > op.extent.truncate_size) {
359 obj_size = op.extent.truncate_size;
360 }
361 uint64_t ext_len;
362 if (op.extent.offset >= obj_size) {
363 ext_len = 0;
364 } else if (op.extent.offset + op.extent.length > obj_size) {
365 ext_len = obj_size - op.extent.offset;
366 } else {
367 ext_len = op.extent.length;
368 }
369 auto read_ext = ll_read_errorator::make_ready_future<ceph::bufferlist>();
370 if (ext_len == 0) {
371 logger().debug("{}: zero length extent", __func__);
372 } else if (!os.exists || os.oi.is_whiteout()) {
373 logger().debug("{}: {} DNE", __func__, os.oi.soid);
374 } else {
375 read_ext = _read(os.oi.soid, op.extent.offset, ext_len, 0);
376 }
377 return read_ext.safe_then([&osd_op](auto&& read_bl) {
378 int32_t retcode = 0;
379 for (unsigned index = 0; index < osd_op.indata.length(); index++) {
380 char byte_in_op = osd_op.indata[index];
381 char byte_from_disk = (index < read_bl.length() ? read_bl[index] : 0);
382 if (byte_in_op != byte_from_disk) {
383 logger().debug("cmp_ext: mismatch at {}", index);
384 retcode = -MAX_ERRNO - index;
385 break;
9f95a23c 386 }
f67539c2
TL
387 }
388 logger().debug("cmp_ext: {}", retcode);
389 osd_op.rval = retcode;
390 });
9f95a23c
TL
391}
392
393PGBackend::stat_errorator::future<> PGBackend::stat(
394 const ObjectState& os,
395 OSDOp& osd_op)
396{
397 if (os.exists/* TODO: && !os.is_whiteout() */) {
398 logger().debug("stat os.oi.size={}, os.oi.mtime={}", os.oi.size, os.oi.mtime);
399 encode(os.oi.size, osd_op.outdata);
400 encode(os.oi.mtime, osd_op.outdata);
401 } else {
402 logger().debug("stat object does not exist");
403 return crimson::ct_error::enoent::make();
404 }
405 return stat_errorator::now();
406 // TODO: ctx->delta_stats.num_rd++;
407}
408
409bool PGBackend::maybe_create_new_object(
410 ObjectState& os,
411 ceph::os::Transaction& txn)
412{
413 if (!os.exists) {
414 ceph_assert(!os.oi.is_whiteout());
415 os.exists = true;
416 os.oi.new_object();
417
418 txn.touch(coll->get_cid(), ghobject_t{os.oi.soid});
419 // TODO: delta_stats.num_objects++
420 return false;
421 } else if (os.oi.is_whiteout()) {
422 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
423 // TODO: delta_stats.num_whiteouts--
424 }
425 return true;
426}
427
f67539c2
TL
428static bool is_offset_and_length_valid(
429 const std::uint64_t offset,
430 const std::uint64_t length)
431{
432 if (const std::uint64_t max = local_conf()->osd_max_object_size;
433 offset >= max || length > max || offset + length > max) {
434 logger().debug("{} osd_max_object_size: {}, offset: {}, len: {}; "
435 "Hard limit of object size is 4GB",
436 __func__, max, offset, length);
437 return false;
438 } else {
439 return true;
440 }
441}
442
9f95a23c
TL
443seastar::future<> PGBackend::write(
444 ObjectState& os,
445 const OSDOp& osd_op,
f67539c2
TL
446 ceph::os::Transaction& txn,
447 osd_op_params_t& osd_op_params)
9f95a23c
TL
448{
449 const ceph_osd_op& op = osd_op.op;
450 uint64_t offset = op.extent.offset;
451 uint64_t length = op.extent.length;
452 bufferlist buf = osd_op.indata;
453 if (auto seq = os.oi.truncate_seq;
454 seq != 0 && op.extent.truncate_seq < seq) {
455 // old write, arrived after trimtrunc
456 if (offset + length > os.oi.size) {
457 // no-op
458 if (offset > os.oi.size) {
459 length = 0;
460 buf.clear();
461 } else {
462 // truncate
463 auto len = os.oi.size - offset;
464 buf.splice(len, length);
465 length = len;
466 }
467 }
468 } else if (op.extent.truncate_seq > seq) {
469 // write arrives before trimtrunc
470 if (os.exists && !os.oi.is_whiteout()) {
471 txn.truncate(coll->get_cid(),
472 ghobject_t{os.oi.soid}, op.extent.truncate_size);
473 if (op.extent.truncate_size != os.oi.size) {
474 os.oi.size = length;
475 // TODO: truncate_update_size_and_usage()
f67539c2
TL
476 if (op.extent.truncate_size > os.oi.size) {
477 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
478 op.extent.truncate_size - os.oi.size);
479 } else {
480 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.truncate_size,
481 os.oi.size - op.extent.truncate_size);
482 }
9f95a23c
TL
483 }
484 }
485 os.oi.truncate_seq = op.extent.truncate_seq;
486 os.oi.truncate_size = op.extent.truncate_size;
487 }
488 maybe_create_new_object(os, txn);
489 if (length == 0) {
490 if (offset > os.oi.size) {
491 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.offset);
492 } else {
493 txn.nop();
494 }
495 } else {
496 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
497 offset, length, std::move(buf), op.flags);
498 os.oi.size = std::max(offset + length, os.oi.size);
499 }
f67539c2
TL
500 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
501 op.extent.length);
502
503 return seastar::now();
504}
505
506seastar::future<> PGBackend::write_same(
507 ObjectState& os,
508 const OSDOp& osd_op,
509 ceph::os::Transaction& txn,
510 osd_op_params_t& osd_op_params)
511{
512 const ceph_osd_op& op = osd_op.op;
513 const uint64_t len = op.writesame.length;
514 if (len == 0) {
515 return seastar::now();
516 }
517 if (op.writesame.data_length == 0 ||
518 len % op.writesame.data_length != 0 ||
519 op.writesame.data_length != osd_op.indata.length()) {
520 throw crimson::osd::invalid_argument();
521 }
522 ceph::bufferlist repeated_indata;
523 for (uint64_t size = 0; size < len; size += op.writesame.data_length) {
524 repeated_indata.append(osd_op.indata);
525 }
526 maybe_create_new_object(os, txn);
527 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
528 op.writesame.offset, len,
529 std::move(repeated_indata), op.flags);
530 os.oi.size = len;
531 osd_op_params.clean_regions.mark_data_region_dirty(op.writesame.offset, len);
9f95a23c
TL
532 return seastar::now();
533}
534
535seastar::future<> PGBackend::writefull(
536 ObjectState& os,
537 const OSDOp& osd_op,
f67539c2
TL
538 ceph::os::Transaction& txn,
539 osd_op_params_t& osd_op_params)
9f95a23c
TL
540{
541 const ceph_osd_op& op = osd_op.op;
542 if (op.extent.length != osd_op.indata.length()) {
543 throw crimson::osd::invalid_argument();
544 }
545
546 const bool existing = maybe_create_new_object(os, txn);
547 if (existing && op.extent.length < os.oi.size) {
548 txn.truncate(coll->get_cid(), ghobject_t{os.oi.soid}, op.extent.length);
f67539c2
TL
549 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.length,
550 os.oi.size - op.extent.length);
9f95a23c
TL
551 }
552 if (op.extent.length) {
553 txn.write(coll->get_cid(), ghobject_t{os.oi.soid}, 0, op.extent.length,
554 osd_op.indata, op.flags);
555 os.oi.size = op.extent.length;
f67539c2
TL
556 osd_op_params.clean_regions.mark_data_region_dirty(0,
557 std::max((uint64_t) op.extent.length, os.oi.size));
558 }
559 return seastar::now();
560}
561
562PGBackend::append_errorator::future<> PGBackend::append(
563 ObjectState& os,
564 OSDOp& osd_op,
565 ceph::os::Transaction& txn,
566 osd_op_params_t& osd_op_params)
567{
568 const ceph_osd_op& op = osd_op.op;
569 if (op.extent.length != osd_op.indata.length()) {
570 return crimson::ct_error::invarg::make();
571 }
572 maybe_create_new_object(os, txn);
573 if (op.extent.length) {
574 txn.write(coll->get_cid(), ghobject_t{os.oi.soid},
575 os.oi.size /* offset */, op.extent.length,
576 std::move(osd_op.indata), op.flags);
577 os.oi.size += op.extent.length;
578 osd_op_params.clean_regions.mark_data_region_dirty(os.oi.size,
579 op.extent.length);
9f95a23c
TL
580 }
581 return seastar::now();
582}
583
f67539c2
TL
584PGBackend::write_ertr::future<> PGBackend::truncate(
585 ObjectState& os,
586 const OSDOp& osd_op,
587 ceph::os::Transaction& txn,
588 osd_op_params_t& osd_op_params)
589{
590 if (!os.exists || os.oi.is_whiteout()) {
591 logger().debug("{} object dne, truncate is a no-op", __func__);
592 return write_ertr::now();
593 }
594 const ceph_osd_op& op = osd_op.op;
595 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
596 return crimson::ct_error::file_too_large::make();
597 }
598 if (op.extent.truncate_seq) {
599 assert(op.extent.offset == op.extent.truncate_size);
600 if (op.extent.truncate_seq <= os.oi.truncate_seq) {
601 logger().debug("{} truncate seq {} <= current {}, no-op",
602 __func__, op.extent.truncate_seq, os.oi.truncate_seq);
603 return write_ertr::make_ready_future<>();
604 } else {
605 logger().debug("{} truncate seq {} > current {}, truncating",
606 __func__, op.extent.truncate_seq, os.oi.truncate_seq);
607 os.oi.truncate_seq = op.extent.truncate_seq;
608 os.oi.truncate_size = op.extent.truncate_size;
609 }
610 }
611 maybe_create_new_object(os, txn);
612 if (os.oi.size != op.extent.offset) {
613 txn.truncate(coll->get_cid(),
614 ghobject_t{os.oi.soid}, op.extent.offset);
615 if (os.oi.size > op.extent.offset) {
616 // TODO: modified_ranges.union_of(trim);
617 osd_op_params.clean_regions.mark_data_region_dirty(
618 op.extent.offset,
619 os.oi.size - op.extent.offset);
620 } else {
621 // os.oi.size < op.extent.offset
622 osd_op_params.clean_regions.mark_data_region_dirty(
623 os.oi.size,
624 op.extent.offset - os.oi.size);
625 }
626 os.oi.size = op.extent.offset;
627 os.oi.clear_data_digest();
628 }
629 // TODO: truncate_update_size_and_usage()
630 // TODO: ctx->delta_stats.num_wr++;
631 // ----
632 // do no set exists, or we will break above DELETE -> TRUNCATE munging.
633 return write_ertr::now();
634}
635
636PGBackend::write_ertr::future<> PGBackend::zero(
637 ObjectState& os,
638 const OSDOp& osd_op,
639 ceph::os::Transaction& txn,
640 osd_op_params_t& osd_op_params)
641{
642 if (!os.exists || os.oi.is_whiteout()) {
643 logger().debug("{} object dne, zero is a no-op", __func__);
644 return write_ertr::now();
645 }
646 const ceph_osd_op& op = osd_op.op;
647 if (!is_offset_and_length_valid(op.extent.offset, op.extent.length)) {
648 return crimson::ct_error::file_too_large::make();
649 }
650 assert(op.extent.length);
651 txn.zero(coll->get_cid(),
652 ghobject_t{os.oi.soid},
653 op.extent.offset,
654 op.extent.length);
655 // TODO: modified_ranges.union_of(zeroed);
656 osd_op_params.clean_regions.mark_data_region_dirty(op.extent.offset,
657 op.extent.length);
658 // TODO: ctx->delta_stats.num_wr++;
659 os.oi.clear_data_digest();
660 return write_ertr::now();
661}
662
9f95a23c
TL
663seastar::future<> PGBackend::create(
664 ObjectState& os,
665 const OSDOp& osd_op,
666 ceph::os::Transaction& txn)
667{
668 if (os.exists && !os.oi.is_whiteout() &&
669 (osd_op.op.flags & CEPH_OSD_OP_FLAG_EXCL)) {
670 // this is an exclusive create
671 throw crimson::osd::make_error(-EEXIST);
672 }
673
674 if (osd_op.indata.length()) {
675 // handle the legacy. `category` is no longer implemented.
676 try {
677 auto p = osd_op.indata.cbegin();
678 std::string category;
679 decode(category, p);
680 } catch (buffer::error&) {
681 throw crimson::osd::invalid_argument();
682 }
683 }
684 maybe_create_new_object(os, txn);
685 txn.nop();
686 return seastar::now();
687}
688
689seastar::future<> PGBackend::remove(ObjectState& os,
690 ceph::os::Transaction& txn)
691{
692 // todo: snapset
693 txn.remove(coll->get_cid(),
694 ghobject_t{os.oi.soid, ghobject_t::NO_GEN, shard});
695 os.oi.size = 0;
696 os.oi.new_object();
697 os.exists = false;
698 // todo: update watchers
699 if (os.oi.is_whiteout()) {
700 os.oi.clear_flag(object_info_t::FLAG_WHITEOUT);
701 }
702 return seastar::now();
703}
704
f67539c2 705seastar::future<std::tuple<std::vector<hobject_t>, hobject_t>>
9f95a23c
TL
706PGBackend::list_objects(const hobject_t& start, uint64_t limit) const
707{
f67539c2
TL
708 if (__builtin_expect(stopping, false)) {
709 throw crimson::common::system_shutdown_exception();
710 }
711
9f95a23c
TL
712 auto gstart = start.is_min() ? ghobject_t{} : ghobject_t{start, 0, shard};
713 return store->list_objects(coll,
714 gstart,
715 ghobject_t::get_max(),
716 limit)
f67539c2
TL
717 .then([](auto ret) {
718 auto& [gobjects, next] = ret;
9f95a23c
TL
719 std::vector<hobject_t> objects;
720 boost::copy(gobjects |
721 boost::adaptors::filtered([](const ghobject_t& o) {
722 if (o.is_pgmeta()) {
723 return false;
724 } else if (o.hobj.is_temp()) {
725 return false;
726 } else {
727 return o.is_no_gen();
728 }
729 }) |
730 boost::adaptors::transformed([](const ghobject_t& o) {
731 return o.hobj;
732 }),
733 std::back_inserter(objects));
f67539c2
TL
734 return seastar::make_ready_future<std::tuple<std::vector<hobject_t>, hobject_t>>(
735 std::make_tuple(objects, next.hobj));
9f95a23c
TL
736 });
737}
738
739seastar::future<> PGBackend::setxattr(
740 ObjectState& os,
741 const OSDOp& osd_op,
742 ceph::os::Transaction& txn)
743{
744 if (local_conf()->osd_max_attr_size > 0 &&
745 osd_op.op.xattr.value_len > local_conf()->osd_max_attr_size) {
746 throw crimson::osd::make_error(-EFBIG);
747 }
748
749 const auto max_name_len = std::min<uint64_t>(
750 store->get_max_attr_name_length(), local_conf()->osd_max_attr_name_len);
751 if (osd_op.op.xattr.name_len > max_name_len) {
752 throw crimson::osd::make_error(-ENAMETOOLONG);
753 }
754
755 maybe_create_new_object(os, txn);
756
f67539c2 757 std::string name{"_"};
9f95a23c
TL
758 ceph::bufferlist val;
759 {
760 auto bp = osd_op.indata.cbegin();
f67539c2 761 bp.copy(osd_op.op.xattr.name_len, name);
9f95a23c
TL
762 bp.copy(osd_op.op.xattr.value_len, val);
763 }
764 logger().debug("setxattr on obj={} for attr={}", os.oi.soid, name);
765
766 txn.setattr(coll->get_cid(), ghobject_t{os.oi.soid}, name, val);
767 return seastar::now();
768 //ctx->delta_stats.num_wr++;
769}
770
771PGBackend::get_attr_errorator::future<> PGBackend::getxattr(
772 const ObjectState& os,
773 OSDOp& osd_op) const
774{
775 std::string name;
776 ceph::bufferlist val;
777 {
778 auto bp = osd_op.indata.cbegin();
779 std::string aname;
780 bp.copy(osd_op.op.xattr.name_len, aname);
781 name = "_" + aname;
782 }
783 logger().debug("getxattr on obj={} for attr={}", os.oi.soid, name);
784 return getxattr(os.oi.soid, name).safe_then([&osd_op] (ceph::bufferptr val) {
785 osd_op.outdata.clear();
786 osd_op.outdata.push_back(std::move(val));
787 osd_op.op.xattr.value_len = osd_op.outdata.length();
788 return get_attr_errorator::now();
789 //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
790 });
791 //ctx->delta_stats.num_rd++;
792}
793
794PGBackend::get_attr_errorator::future<ceph::bufferptr> PGBackend::getxattr(
795 const hobject_t& soid,
796 std::string_view key) const
797{
f67539c2
TL
798 if (__builtin_expect(stopping, false)) {
799 throw crimson::common::system_shutdown_exception();
800 }
801
9f95a23c
TL
802 return store->get_attr(coll, ghobject_t{soid}, key);
803}
804
f67539c2
TL
805PGBackend::get_attr_errorator::future<> PGBackend::get_xattrs(
806 const ObjectState& os,
807 OSDOp& osd_op) const
808{
809 if (__builtin_expect(stopping, false)) {
810 throw crimson::common::system_shutdown_exception();
811 }
812 return store->get_attrs(coll, ghobject_t{os.oi.soid}).safe_then(
813 [&osd_op](auto&& attrs) {
814 std::vector<std::pair<std::string, bufferlist>> user_xattrs;
815 for (auto& [key, val] : attrs) {
816 if (key.size() > 1 && key[0] == '_') {
817 ceph::bufferlist bl;
818 bl.append(std::move(val));
819 user_xattrs.emplace_back(key.substr(1), std::move(bl));
820 }
821 }
822 ceph::encode(user_xattrs, osd_op.outdata);
823 return get_attr_errorator::now();
824 });
825}
826
827PGBackend::rm_xattr_ertr::future<> PGBackend::rm_xattr(
828 ObjectState& os,
829 const OSDOp& osd_op,
830 ceph::os::Transaction& txn)
831{
832 if (__builtin_expect(stopping, false)) {
833 throw crimson::common::system_shutdown_exception();
834 }
835 if (!os.exists || os.oi.is_whiteout()) {
836 logger().debug("{}: {} DNE", __func__, os.oi.soid);
837 return crimson::ct_error::enoent::make();
838 }
839 auto bp = osd_op.indata.cbegin();
840 string attr_name{"_"};
841 bp.copy(osd_op.op.xattr.name_len, attr_name);
842 txn.rmattr(coll->get_cid(), ghobject_t{os.oi.soid}, attr_name);
843 return rm_xattr_ertr::now();
844}
845
846using get_omap_ertr =
847 crimson::os::FuturizedStore::read_errorator::extend<
848 crimson::ct_error::enodata>;
849static
850get_omap_ertr::future<
851 crimson::os::FuturizedStore::omap_values_t>
9f95a23c
TL
852maybe_get_omap_vals_by_keys(
853 crimson::os::FuturizedStore* store,
854 const crimson::os::CollectionRef& coll,
855 const object_info_t& oi,
856 const std::set<std::string>& keys_to_get)
857{
858 if (oi.is_omap()) {
859 return store->omap_get_values(coll, ghobject_t{oi.soid}, keys_to_get);
860 } else {
f67539c2 861 return crimson::ct_error::enodata::make();
9f95a23c
TL
862 }
863}
864
f67539c2
TL
865static
866get_omap_ertr::future<
867 std::tuple<bool, crimson::os::FuturizedStore::omap_values_t>>
9f95a23c
TL
868maybe_get_omap_vals(
869 crimson::os::FuturizedStore* store,
870 const crimson::os::CollectionRef& coll,
871 const object_info_t& oi,
872 const std::string& start_after)
873{
874 if (oi.is_omap()) {
875 return store->omap_get_values(coll, ghobject_t{oi.soid}, start_after);
876 } else {
f67539c2 877 return crimson::ct_error::enodata::make();
9f95a23c
TL
878 }
879}
880
f67539c2
TL
881PGBackend::ll_read_errorator::future<ceph::bufferlist>
882PGBackend::omap_get_header(
883 const crimson::os::CollectionRef& c,
884 const ghobject_t& oid) const
885{
886 return store->omap_get_header(c, oid);
887}
888
889PGBackend::ll_read_errorator::future<>
890PGBackend::omap_get_header(
891 const ObjectState& os,
892 OSDOp& osd_op) const
893{
894 return omap_get_header(coll, ghobject_t{os.oi.soid}).safe_then(
895 [&osd_op] (ceph::bufferlist&& header) {
896 osd_op.outdata = std::move(header);
897 return seastar::now();
898 });
899}
900
901PGBackend::ll_read_errorator::future<>
902PGBackend::omap_get_keys(
9f95a23c
TL
903 const ObjectState& os,
904 OSDOp& osd_op) const
905{
f67539c2
TL
906 if (__builtin_expect(stopping, false)) {
907 throw crimson::common::system_shutdown_exception();
908 }
909 if (!os.exists || os.oi.is_whiteout()) {
910 logger().debug("{}: object does not exist: {}", os.oi.soid);
911 return crimson::ct_error::enoent::make();
912 }
9f95a23c
TL
913 std::string start_after;
914 uint64_t max_return;
915 try {
916 auto p = osd_op.indata.cbegin();
917 decode(start_after, p);
918 decode(max_return, p);
919 } catch (buffer::error&) {
920 throw crimson::osd::invalid_argument{};
921 }
922 max_return =
923 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
924
925 // TODO: truly chunk the reading
f67539c2
TL
926 return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then(
927 [=, &osd_op](auto ret) {
9f95a23c
TL
928 ceph::bufferlist result;
929 bool truncated = false;
930 uint32_t num = 0;
f67539c2
TL
931 for (auto &[key, val] : std::get<1>(ret)) {
932 if (num >= max_return ||
9f95a23c
TL
933 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
934 truncated = true;
935 break;
936 }
937 encode(key, result);
f67539c2 938 ++num;
9f95a23c
TL
939 }
940 encode(num, osd_op.outdata);
941 osd_op.outdata.claim_append(result);
942 encode(truncated, osd_op.outdata);
943 return seastar::now();
f67539c2
TL
944 }).handle_error(
945 crimson::ct_error::enodata::handle([&osd_op] {
946 uint32_t num = 0;
947 bool truncated = false;
948 encode(num, osd_op.outdata);
949 encode(truncated, osd_op.outdata);
950 return seastar::now();
951 }),
952 ll_read_errorator::pass_further{}
953 );
9f95a23c
TL
954 // TODO:
955 //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
956 //ctx->delta_stats.num_rd++;
957}
958
f67539c2
TL
959PGBackend::ll_read_errorator::future<>
960PGBackend::omap_get_vals(
9f95a23c
TL
961 const ObjectState& os,
962 OSDOp& osd_op) const
963{
f67539c2
TL
964 if (__builtin_expect(stopping, false)) {
965 throw crimson::common::system_shutdown_exception();
966 }
967
9f95a23c
TL
968 std::string start_after;
969 uint64_t max_return;
970 std::string filter_prefix;
971 try {
972 auto p = osd_op.indata.cbegin();
973 decode(start_after, p);
974 decode(max_return, p);
975 decode(filter_prefix, p);
976 } catch (buffer::error&) {
977 throw crimson::osd::invalid_argument{};
978 }
979
980 max_return = \
981 std::min(max_return, local_conf()->osd_max_omap_entries_per_request);
982
983 // TODO: truly chunk the reading
f67539c2
TL
984 return maybe_get_omap_vals(store, coll, os.oi, start_after).safe_then(
985 [=, &osd_op] (auto&& ret) {
986 auto [done, vals] = std::move(ret);
9f95a23c
TL
987 assert(done);
988 ceph::bufferlist result;
989 bool truncated = false;
990 uint32_t num = 0;
991 auto iter = filter_prefix > start_after ? vals.lower_bound(filter_prefix)
992 : std::begin(vals);
993 for (; iter != std::end(vals); ++iter) {
994 const auto& [key, value] = *iter;
995 if (key.substr(0, filter_prefix.size()) != filter_prefix) {
996 break;
f67539c2 997 } else if (num >= max_return ||
9f95a23c
TL
998 result.length() >= local_conf()->osd_max_omap_bytes_per_request) {
999 truncated = true;
1000 break;
1001 }
1002 encode(key, result);
1003 encode(value, result);
f67539c2 1004 ++num;
9f95a23c
TL
1005 }
1006 encode(num, osd_op.outdata);
1007 osd_op.outdata.claim_append(result);
1008 encode(truncated, osd_op.outdata);
f67539c2
TL
1009 return ll_read_errorator::now();
1010 }).handle_error(
1011 crimson::ct_error::enodata::handle([&osd_op] {
1012 encode(uint32_t{0} /* num */, osd_op.outdata);
1013 encode(bool{false} /* truncated */, osd_op.outdata);
1014 return ll_read_errorator::now();
1015 }),
1016 ll_read_errorator::pass_further{}
1017 );
9f95a23c
TL
1018
1019 // TODO:
1020 //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1021 //ctx->delta_stats.num_rd++;
1022}
f67539c2
TL
1023
1024PGBackend::ll_read_errorator::future<>
1025PGBackend::omap_get_vals_by_keys(
9f95a23c
TL
1026 const ObjectState& os,
1027 OSDOp& osd_op) const
1028{
f67539c2
TL
1029 if (__builtin_expect(stopping, false)) {
1030 throw crimson::common::system_shutdown_exception();
1031 }
1032 if (!os.exists || os.oi.is_whiteout()) {
1033 logger().debug("{}: object does not exist: {}", os.oi.soid);
1034 return crimson::ct_error::enoent::make();
1035 }
1036
9f95a23c
TL
1037 std::set<std::string> keys_to_get;
1038 try {
1039 auto p = osd_op.indata.cbegin();
1040 decode(keys_to_get, p);
1041 } catch (buffer::error&) {
1042 throw crimson::osd::invalid_argument();
1043 }
f67539c2
TL
1044 return maybe_get_omap_vals_by_keys(store, coll, os.oi, keys_to_get).safe_then(
1045 [&osd_op] (crimson::os::FuturizedStore::omap_values_t&& vals) {
9f95a23c 1046 encode(vals, osd_op.outdata);
f67539c2
TL
1047 return ll_read_errorator::now();
1048 }).handle_error(
1049 crimson::ct_error::enodata::handle([&osd_op] {
1050 uint32_t num = 0;
1051 encode(num, osd_op.outdata);
1052 return ll_read_errorator::now();
1053 }),
1054 ll_read_errorator::pass_further{}
1055 );
9f95a23c
TL
1056
1057 // TODO:
1058 //ctx->delta_stats.num_rd_kb += shift_round_up(osd_op.outdata.length(), 10);
1059 //ctx->delta_stats.num_rd++;
1060}
1061
1062seastar::future<> PGBackend::omap_set_vals(
1063 ObjectState& os,
1064 const OSDOp& osd_op,
f67539c2
TL
1065 ceph::os::Transaction& txn,
1066 osd_op_params_t& osd_op_params)
9f95a23c
TL
1067{
1068 maybe_create_new_object(os, txn);
1069
1070 ceph::bufferlist to_set_bl;
1071 try {
1072 auto p = osd_op.indata.cbegin();
1073 decode_str_str_map_to_bl(p, &to_set_bl);
1074 } catch (buffer::error&) {
1075 throw crimson::osd::invalid_argument{};
1076 }
1077
1078 txn.omap_setkeys(coll->get_cid(), ghobject_t{os.oi.soid}, to_set_bl);
1079
1080 // TODO:
1081 //ctx->clean_regions.mark_omap_dirty();
1082
1083 // TODO:
1084 //ctx->delta_stats.num_wr++;
1085 //ctx->delta_stats.num_wr_kb += shift_round_up(to_set_bl.length(), 10);
1086 os.oi.set_flag(object_info_t::FLAG_OMAP);
1087 os.oi.clear_omap_digest();
f67539c2
TL
1088 osd_op_params.clean_regions.mark_omap_dirty();
1089 return seastar::now();
1090}
1091
1092seastar::future<> PGBackend::omap_set_header(
1093 ObjectState& os,
1094 const OSDOp& osd_op,
1095 ceph::os::Transaction& txn)
1096{
1097 maybe_create_new_object(os, txn);
1098 txn.omap_setheader(coll->get_cid(), ghobject_t{os.oi.soid}, osd_op.indata);
1099 //TODO:
1100 //ctx->clean_regions.mark_omap_dirty();
1101 //ctx->delta_stats.num_wr++;
1102 os.oi.set_flag(object_info_t::FLAG_OMAP);
1103 os.oi.clear_omap_digest();
1104 return seastar::now();
1105}
1106
1107seastar::future<> PGBackend::omap_remove_range(
1108 ObjectState& os,
1109 const OSDOp& osd_op,
1110 ceph::os::Transaction& txn)
1111{
1112 std::string key_begin, key_end;
1113 try {
1114 auto p = osd_op.indata.cbegin();
1115 decode(key_begin, p);
1116 decode(key_end, p);
1117 } catch (buffer::error& e) {
1118 throw crimson::osd::invalid_argument{};
1119 }
1120 txn.omap_rmkeyrange(coll->get_cid(), ghobject_t{os.oi.soid}, key_begin, key_end);
1121 //TODO:
1122 //ctx->delta_stats.num_wr++;
1123 os.oi.clear_omap_digest();
9f95a23c
TL
1124 return seastar::now();
1125}
f67539c2
TL
1126
1127PGBackend::omap_clear_ertr::future<>
1128PGBackend::omap_clear(
1129 ObjectState& os,
1130 OSDOp& osd_op,
1131 ceph::os::Transaction& txn,
1132 osd_op_params_t& osd_op_params)
1133{
1134 if (__builtin_expect(stopping, false)) {
1135 throw crimson::common::system_shutdown_exception();
1136 }
1137 if (!os.exists || os.oi.is_whiteout()) {
1138 logger().debug("{}: object does not exist: {}", os.oi.soid);
1139 return crimson::ct_error::enoent::make();
1140 }
1141 if (!os.oi.is_omap()) {
1142 return omap_clear_ertr::now();
1143 }
1144 txn.omap_clear(coll->get_cid(), ghobject_t{os.oi.soid});
1145 osd_op_params.clean_regions.mark_omap_dirty();
1146 os.oi.clear_omap_digest();
1147 os.oi.clear_flag(object_info_t::FLAG_OMAP);
1148 return omap_clear_ertr::now();
1149}
1150
1151seastar::future<struct stat> PGBackend::stat(
1152 CollectionRef c,
1153 const ghobject_t& oid) const
1154{
1155 return store->stat(c, oid);
1156}
1157
1158seastar::future<std::map<uint64_t, uint64_t>>
1159PGBackend::fiemap(
1160 CollectionRef c,
1161 const ghobject_t& oid,
1162 uint64_t off,
1163 uint64_t len)
1164{
1165 return store->fiemap(c, oid, off, len);
1166}
1167
1168void PGBackend::on_activate_complete() {
1169 peering.reset();
1170}
1171