]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/seastore.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / crimson / os / seastore / seastore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "seastore.h"
5
6 #include <algorithm>
7
8 #include <boost/algorithm/string/trim.hpp>
9 #include <fmt/format.h>
10 #include <fmt/ostream.h>
11
12 #include <seastar/core/file.hh>
13 #include <seastar/core/fstream.hh>
14 #include <seastar/core/metrics.hh>
15 #include <seastar/core/shared_mutex.hh>
16
17 #include "common/safe_io.h"
18 #include "include/stringify.h"
19 #include "os/Transaction.h"
20
21 #include "crimson/common/buffer_io.h"
22
23 #include "crimson/os/futurized_collection.h"
24
25 #include "crimson/os/seastore/backref_manager.h"
26 #include "crimson/os/seastore/async_cleaner.h"
27 #include "crimson/os/seastore/collection_manager/flat_collection_manager.h"
28 #include "crimson/os/seastore/onode_manager/staged-fltree/fltree_onode_manager.h"
29 #include "crimson/os/seastore/omap_manager/btree/btree_omap_manager.h"
30 #include "crimson/os/seastore/onode_manager.h"
31 #include "crimson/os/seastore/object_data_handler.h"
32
33
34 using std::string;
35 using crimson::common::local_conf;
36
// fmt formatter specialization so op_type_t can appear directly in format
// strings (e.g. the metric descriptions built by Shard::register_metrics()).
// Unrecognized values (including MAX) render as "unknown".
template <> struct fmt::formatter<crimson::os::seastore::op_type_t>
  : fmt::formatter<std::string_view> {
  using op_type_t = crimson::os::seastore::op_type_t;
  // parse is inherited from formatter<string_view>.
  template <typename FormatContext>
  auto format(op_type_t op, FormatContext& ctx) {
    std::string_view name = "unknown";
    switch (op) {
    case op_type_t::TRANSACTION:
      name = "transaction";
      break;
    case op_type_t::READ:
      name = "read";
      break;
    case op_type_t::WRITE:
      name = "write";
      break;
    case op_type_t::GET_ATTR:
      name = "get_attr";
      break;
    case op_type_t::GET_ATTRS:
      name = "get_attrs";
      break;
    case op_type_t::STAT:
      name = "stat";
      break;
    case op_type_t::OMAP_GET_VALUES:
      name = "omap_get_values";
      break;
    case op_type_t::OMAP_LIST:
      name = "omap_list";
      break;
    case op_type_t::MAX:
      name = "unknown";
      break;
    }
    return formatter<string_view>::format(name, ctx);
  }
};
76
77 SET_SUBSYS(seastore);
78
79 namespace crimson::os::seastore {
80
81 class FileMDStore final : public SeaStore::MDStore {
82 std::string root;
83 public:
84 FileMDStore(const std::string& root) : root(root) {}
85
86 write_meta_ret write_meta(
87 const std::string& key, const std::string& value) final {
88 std::string path = fmt::format("{}/{}", root, key);
89 ceph::bufferlist bl;
90 bl.append(value + "\n");
91 return crimson::write_file(std::move(bl), path);
92 }
93
94 read_meta_ret read_meta(const std::string& key) final {
95 std::string path = fmt::format("{}/{}", root, key);
96 return seastar::file_exists(
97 path
98 ).then([path] (bool exist) {
99 if (exist) {
100 return crimson::read_file(path)
101 .then([] (auto tmp_buf) {
102 std::string v = {tmp_buf.get(), tmp_buf.size()};
103 std::size_t pos = v.find("\n");
104 std::string str = v.substr(0, pos);
105 return seastar::make_ready_future<std::optional<std::string>>(str);
106 });
107 } else {
108 return seastar::make_ready_future<std::optional<std::string>>(std::nullopt);
109 }
110 });
111 }
112 };
113
114 using crimson::common::get_conf;
115
// Per-shard store state.  `dev` must outlive this shard; only the
// shard-local view of it is cached in `device`.  Tunables are read
// from the config once at construction time.
SeaStore::Shard::Shard(
  std::string root,
  Device* dev,
  bool is_test)
  :root(root),
   max_object_size(
     get_conf<uint64_t>("seastore_default_max_object_size")),
   is_test(is_test),
   throttler(
     get_conf<uint64_t>("seastore_max_concurrent_transactions"))
{
  device = &(dev->get_sharded_device());
  register_metrics();
}
130
// `mdstore` holds simple key/value metadata (fsid, type, mkfs_done)
// kept alongside the data device under `root`.
SeaStore::SeaStore(
  const std::string& root,
  MDStoreRef mdstore)
  : root(root),
    mdstore(std::move(mdstore))
{
}
138
// Defaulted out of line (member types are only complete in this TU).
SeaStore::~SeaStore() = default;
140
// Register this shard's seastar metrics:
//   - one "op_lat" histogram per op_type_t, distinguished by the
//     "latency" label,
//   - gauges for the throttler's running/pending transaction counts.
void SeaStore::Shard::register_metrics()
{
  namespace sm = seastar::metrics;
  using op_type_t = crimson::os::seastore::op_type_t;
  std::pair<op_type_t, sm::label_instance> labels_by_op_type[] = {
    {op_type_t::TRANSACTION, sm::label_instance("latency", "TRANSACTION")},
    {op_type_t::READ, sm::label_instance("latency", "READ")},
    {op_type_t::WRITE, sm::label_instance("latency", "WRITE")},
    {op_type_t::GET_ATTR, sm::label_instance("latency", "GET_ATTR")},
    {op_type_t::GET_ATTRS, sm::label_instance("latency", "GET_ATTRS")},
    {op_type_t::STAT, sm::label_instance("latency", "STAT")},
    {op_type_t::OMAP_GET_VALUES, sm::label_instance("latency", "OMAP_GET_VALUES")},
    {op_type_t::OMAP_LIST, sm::label_instance("latency", "OMAP_LIST")},
  };

  for (auto& [op_type, label] : labels_by_op_type) {
    // uses the fmt::formatter<op_type_t> specialization above
    auto desc = fmt::format("latency of seastore operation (optype={})",
                            op_type);
    metrics.add_group(
      "seastore",
      {
        sm::make_histogram(
          // capture op_type by value: the loop variable dies with the loop
          "op_lat", [this, op_type=op_type] {
            return get_latency(op_type);
          },
          sm::description(desc),
          {label}
        ),
      }
    );
  }

  metrics.add_group(
    "seastore",
    {
      sm::make_gauge(
        "concurrent_transactions",
        [this] {
          return throttler.get_current();
        },
        sm::description("transactions that are running inside seastore")
      ),
      sm::make_gauge(
        "pending_transactions",
        [this] {
          return throttler.get_pending();
        },
        sm::description("transactions waiting to get "
                        "through seastore's throttler")
      )
    }
  );
}
194
// Bring up the store on the primary core: create and start the main
// device, then start one Shard per seastar shard via `shard_stores`.
// Only SSD and RANDOM_BLOCK_SSD main-device types are supported.
seastar::future<> SeaStore::start()
{
  ceph_assert(seastar::this_shard_id() == primary_core);
#ifndef NDEBUG
  bool is_test = true;
#else
  bool is_test = false;
#endif
  using crimson::common::get_conf;
  std::string type = get_conf<std::string>("seastore_main_device_type");
  device_type_t d_type = string_to_device_type(type);
  assert(d_type == device_type_t::SSD ||
         d_type == device_type_t::RANDOM_BLOCK_SSD);

  ceph_assert(root != "");
  return Device::make_device(root, d_type
  ).then([this](DeviceRef device_obj) {
    // SeaStore takes ownership of the device for its whole lifetime
    device = std::move(device_obj);
    return device->start();
  }).then([this, is_test] {
    ceph_assert(device);
    return shard_stores.start(root, device.get(), is_test);
  });
}
219
// Test-only startup path: adopt an externally created (already started)
// device and run a single shard store.  Requires an empty `root` — the
// test device is not file-backed.
seastar::future<> SeaStore::test_start(DeviceRef device_obj)
{
  ceph_assert(device_obj);
  ceph_assert(root == "");
  device = std::move(device_obj);
  return shard_stores.start_single(root, device.get(), true);
}
227
// Tear down in reverse order of start(): secondary devices first, then
// the main device, then the per-shard stores.
seastar::future<> SeaStore::stop()
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return seastar::do_for_each(secondaries, [](auto& sec_dev) {
    return sec_dev->stop();
  }).then([this] {
    secondaries.clear();
    // device may be null if start() never ran (e.g. mkfs-only paths)
    if (device) {
      return device->stop();
    } else {
      return seastar::now();
    }
  }).then([this] {
    return shard_stores.stop();
  });
}
244
// Test-only mount: mounts managers on the local shard only, skipping
// device mounting and secondary-device discovery done by mount().
SeaStore::mount_ertr::future<> SeaStore::test_mount()
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return shard_stores.local().mount_managers();
}
250
// Mount the main device, then for each secondary recorded in its
// superblock: open block.<dtype>.<id> under root, start + mount it,
// verify its magic, and publish it to every shard.  Finally mount the
// per-shard managers.  Any error here is fatal (assert_all).
SeaStore::mount_ertr::future<> SeaStore::mount()
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return device->mount(
  ).safe_then([this] {
    auto sec_devices = device->get_sharded_device().get_secondary_devices();
    return crimson::do_for_each(sec_devices, [this](auto& device_entry) {
      device_id_t id = device_entry.first;
      magic_t magic = device_entry.second.magic;
      device_type_t dtype = device_entry.second.dtype;
      // path convention must match what mkfs() scanned for
      std::string path =
        fmt::format("{}/block.{}.{}", root, dtype, std::to_string(id));
      return Device::make_device(path, dtype
      ).then([this, path, magic](DeviceRef sec_dev) {
        return sec_dev->start(
        ).then([this, magic, sec_dev = std::move(sec_dev)]() mutable {
          return sec_dev->mount(
          ).safe_then([this, sec_dev=std::move(sec_dev), magic]() mutable {
            boost::ignore_unused(magic);  // avoid clang warning;
            // sanity-check the device is the one the superblock expects
            assert(sec_dev->get_sharded_device().get_magic() == magic);
            secondaries.emplace_back(std::move(sec_dev));
          });
        }).safe_then([this] {
          // make the just-mounted secondary visible on every shard
          return set_secondaries();
        });
      });
    }).safe_then([this] {
      return shard_stores.invoke_on_all([](auto &local_store) {
        return local_store.mount_managers();
      });
    });
  }).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in SeaStore::mount"
    }
  );
}
288
// Construct this shard's managers (transaction/collection/onode) and
// mount the transaction manager; mount errors are fatal.
seastar::future<> SeaStore::Shard::mount_managers()
{
  init_managers();
  return transaction_manager->mount(
  ).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in mount_managers"
    });
}
298
// Unmount every shard's store; fans out from the primary core.
seastar::future<> SeaStore::umount()
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return shard_stores.invoke_on_all([](auto &local_store) {
    return local_store.umount();
  });
}
306
// Shard-local unmount: close the transaction manager (if it was ever
// created), then secondary devices, then the main device, and finally
// drop all manager state.  Order matters — managers reference devices.
seastar::future<> SeaStore::Shard::umount()
{
  return [this] {
    // transaction_manager is null if mount_managers() never ran
    if (transaction_manager) {
      return transaction_manager->close();
    } else {
      return TransactionManager::close_ertr::now();
    }
  }().safe_then([this] {
    return crimson::do_for_each(
      secondaries,
      [](auto& sec_dev) -> SegmentManager::close_ertr::future<>
    {
      return sec_dev->close();
    });
  }).safe_then([this] {
    return device->close();
  }).safe_then([this] {
    secondaries.clear();
    transaction_manager.reset();
    collection_manager.reset();
    onode_manager.reset();
  }).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in SeaStore::umount"
    }
  );
}
335
// Persist the OSD fsid, or verify it if one is already recorded.
// read_meta() appears to return (-1, _) when the key is absent and
// (0, value) on success; a mismatch with an existing fsid is fatal.
seastar::future<> SeaStore::write_fsid(uuid_d new_osd_fsid)
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  LOG_PREFIX(SeaStore::write_fsid);
  return read_meta("fsid").then([this, FNAME, new_osd_fsid] (auto tuple) {
    auto [ret, fsid] = tuple;
    std::string str_fsid = stringify(new_osd_fsid);
    if (ret == -1) {
      // first boot: record the fsid
      return write_meta("fsid", stringify(new_osd_fsid));
    } else if (ret == 0 && fsid != str_fsid) {
      ERROR("on-disk fsid {} != provided {}",
            fsid, stringify(new_osd_fsid));
      throw std::runtime_error("store fsid error");
    } else {
      return seastar::now();
    }
  });
}
354
// Format this shard's managers: mkfs the transaction manager, remount
// it (managers are re-created after mkfs), then in a single retriable
// transaction mkfs the onode manager, create the collection root, and
// record it.  Errors are fatal.
seastar::future<>
SeaStore::Shard::mkfs_managers()
{
  init_managers();
  return transaction_manager->mkfs(
  ).safe_then([this] {
    // re-create managers on top of the freshly formatted TM
    init_managers();
    return transaction_manager->mount();
  }).safe_then([this] {
    // retry on transaction conflicts (eagain)
    return repeat_eagain([this] {
      return transaction_manager->with_transaction_intr(
        Transaction::src_t::MUTATE,
        "mkfs_seastore",
        [this](auto& t)
      {
        return onode_manager->mkfs(t
        ).si_then([this, &t] {
          return collection_manager->mkfs(t);
        }).si_then([this, &t](auto coll_root) {
          transaction_manager->write_collection_root(
            t, coll_root);
          return transaction_manager->submit_transaction(t);
        });
      });
    });
  }).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in Shard::mkfs_managers"
    }
  );
}
386
// Publish the most recently added secondary device to every shard.
// NOTE(review): dereferences rbegin() — assumes `secondaries` is
// non-empty; callers add a device before invoking this.
seastar::future<> SeaStore::set_secondaries()
{
  auto sec_dev_ite = secondaries.rbegin();
  Device* sec_dev = sec_dev_ite->get();
  return shard_stores.invoke_on_all([sec_dev](auto &local_store) {
    local_store.set_secondaries(sec_dev->get_sharded_device());
  });
}
395
// Test-only mkfs: formats the local shard only, skipping the secondary
// device scan done by mkfs().  No-op if mkfs already completed.
SeaStore::mkfs_ertr::future<> SeaStore::test_mkfs(uuid_d new_osd_fsid)
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return read_meta("mkfs_done").then([this, new_osd_fsid] (auto tuple) {
    auto [done, value] = tuple;
    if (done == 0) {
      // already formatted
      return seastar::now();
    }
    return shard_stores.local().mkfs_managers(
    ).then([this, new_osd_fsid] {
      return prepare_meta(new_osd_fsid);
    });
  });
}
410
// Finalize mkfs metadata: record/verify fsid, verify or write the
// store "type" marker, then set "mkfs_done".  A pre-existing non-
// seastore type marker is fatal.
seastar::future<> SeaStore::prepare_meta(uuid_d new_osd_fsid)
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return write_fsid(new_osd_fsid).then([this] {
    return read_meta("type").then([this] (auto tuple) {
      auto [ret, type] = tuple;
      if (ret == 0 && type == "seastore") {
        return seastar::now();
      } else if (ret == 0 && type != "seastore") {
        LOG_PREFIX(SeaStore::prepare_meta);
        ERROR("expected seastore, but type is {}", type);
        throw std::runtime_error("store type error");
      } else {
        return write_meta("type", "seastore");
      }
    });
  }).then([this] {
    return write_meta("mkfs_done", "yes");
  });
}
431
// Full store format.  Idempotent: returns immediately if "mkfs_done"
// is already set.  Otherwise:
//   1. scan `root` for secondary device files named block.<dtype>.<id>,
//      start and mkfs each as a secondary, collecting their specs in
//      `sds`,
//   2. mkfs the primary device with those secondary specs,
//   3. mount everything, mkfs the per-shard managers, write metadata,
//   4. unmount again (mkfs leaves the store unmounted).
SeaStore::mkfs_ertr::future<> SeaStore::mkfs(uuid_d new_osd_fsid)
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  return read_meta("mkfs_done").then([this, new_osd_fsid] (auto tuple) {
    auto [done, value] = tuple;
    if (done == 0) {
      // already formatted
      return seastar::now();
    } else {
      return seastar::do_with(
        secondary_device_set_t(),
        [this, new_osd_fsid](auto& sds) {
        auto fut = seastar::now();
        LOG_PREFIX(SeaStore::mkfs);
        DEBUG("root: {}", root);
        if (!root.empty()) {
          fut = seastar::open_directory(root
          ).then([this, &sds, new_osd_fsid](seastar::file rdir) mutable {
            std::unique_ptr<seastar::file> root_f =
              std::make_unique<seastar::file>(std::move(rdir));
            auto sub = root_f->list_directory(
              [this, &sds, new_osd_fsid](auto de) mutable -> seastar::future<>
            {
              LOG_PREFIX(SeaStore::mkfs);
              DEBUG("found file: {}", de.name);
              if (de.name.find("block.") == 0
                  && de.name.length() > 6 /* 6 for "block." */) {
                std::string entry_name = de.name;
                // name layout: block.<dtype>.<id>
                auto dtype_end = entry_name.find_first_of('.', 6);
                device_type_t dtype =
                  string_to_device_type(
                    entry_name.substr(6, dtype_end - 6));
                if (dtype == device_type_t::NONE) {
                  // invalid device type
                  return seastar::now();
                }
                auto id = std::stoi(entry_name.substr(dtype_end + 1));
                std::string path = fmt::format("{}/{}", root, entry_name);
                return Device::make_device(path, dtype
                ).then([this, &sds, id, dtype, new_osd_fsid](DeviceRef sec_dev) {
                  auto p_sec_dev = sec_dev.get();
                  secondaries.emplace_back(std::move(sec_dev));
                  return p_sec_dev->start(
                  ).then([&sds, id, dtype, new_osd_fsid, p_sec_dev]() {
                    // random magic ties the secondary to this primary
                    magic_t magic = (magic_t)std::rand();
                    sds.emplace(
                      (device_id_t)id,
                      device_spec_t{magic, dtype, (device_id_t)id});
                    return p_sec_dev->mkfs(device_config_t::create_secondary(
                      new_osd_fsid, id, dtype, magic)
                    ).handle_error(crimson::ct_error::assert_all{"not possible"});
                  });
                }).then([this] {
                  return set_secondaries();
                });
              }
              return seastar::now();
            });
            // keep root_f alive until the directory listing completes
            return sub.done().then([root_f=std::move(root_f)] {});
          });
        }
        return fut.then([this, &sds, new_osd_fsid] {
          device_id_t id = 0;
          device_type_t d_type = device->get_device_type();
          assert(d_type == device_type_t::SSD ||
                 d_type == device_type_t::RANDOM_BLOCK_SSD);
          if (d_type == device_type_t::RANDOM_BLOCK_SSD) {
            id = static_cast<device_id_t>(DEVICE_ID_RANDOM_BLOCK_MIN);
          }

          return device->mkfs(
            device_config_t::create_primary(new_osd_fsid, id, d_type, sds)
          );
        }).safe_then([this] {
          return crimson::do_for_each(secondaries, [](auto& sec_dev) {
            return sec_dev->mount();
          });
        }).safe_then([this] {
          return device->mount();
        }).safe_then([this] {
          return shard_stores.invoke_on_all([] (auto &local_store) {
            return local_store.mkfs_managers();
          });
        }).safe_then([this, new_osd_fsid] {
          return prepare_meta(new_osd_fsid);
        }).safe_then([this] {
          // mkfs leaves the store unmounted; callers mount() explicitly
          return umount();
        }).handle_error(
          crimson::ct_error::assert_all{
            "Invalid error in SeaStore::mkfs"
          }
        );
      });
    }
  });
}
527
528 using coll_core_t = FuturizedStore::coll_core_t;
529 seastar::future<std::vector<coll_core_t>>
530 SeaStore::list_collections()
531 {
532 ceph_assert(seastar::this_shard_id() == primary_core);
533 return shard_stores.map([](auto &local_store) {
534 return local_store.list_collections();
535 }).then([](std::vector<std::vector<coll_core_t>> results) {
536 std::vector<coll_core_t> collections;
537 for (auto& colls : results) {
538 collections.insert(collections.end(), colls.begin(), colls.end());
539 }
540 return seastar::make_ready_future<std::vector<coll_core_t>>(
541 std::move(collections));
542 });
543 }
544
// Shard-local storage statistics, delegated to the transaction manager.
store_statfs_t SeaStore::Shard::stat() const
{
  return transaction_manager->store_stat();
}
549
// Store-wide statistics: sum the per-shard store_statfs_t values.
seastar::future<store_statfs_t> SeaStore::stat() const
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  LOG_PREFIX(SeaStore::stat);
  DEBUG("");
  return shard_stores.map_reduce0(
    [](const SeaStore::Shard &local_store) {
      return local_store.stat();
    },
    store_statfs_t(),
    // accumulate each shard's stats into the running total
    [](auto &&ss, auto &&ret) {
      ss.add(ret);
      return std::move(ss);
    }
  ).then([](store_statfs_t ss) {
    return seastar::make_ready_future<store_statfs_t>(std::move(ss));
  });
}
568
// Look up the split-bits of collection `ch` by scanning the on-disk
// collection list within transaction `t`; std::nullopt if the
// collection is not found.
TransactionManager::read_extent_iertr::future<std::optional<unsigned>>
SeaStore::Shard::get_coll_bits(CollectionRef ch, Transaction &t) const
{
  return transaction_manager->read_collection_root(t)
    .si_then([this, ch, &t](auto coll_root) {
      return collection_manager->list(coll_root, t);
    }).si_then([ch](auto colls) {
      auto it = std::find_if(colls.begin(), colls.end(),
        [ch](const std::pair<coll_t, coll_info_t>& element) {
          return element.first == ch->get_cid();
      });
      if (it != colls.end()) {
        return TransactionManager::read_extent_iertr::make_ready_future<
          std::optional<unsigned>>(it->second.split_bits);
      } else {
        return TransactionManager::read_extent_iertr::make_ready_future<
          std::optional<unsigned>>(std::nullopt);
      }
    });
}
589
// Compute the object-name ranges ([obj_begin, obj_end) and the temp
// section [temp_begin, temp_end)) covered by collection `ch` given its
// split `bits`.  For PG collections the range is derived from the
// reversed pg hash; for non-PG (e.g. meta) collections the whole hash
// space is covered and the temp range is empty.
col_obj_ranges_t
SeaStore::get_objs_range(CollectionRef ch, unsigned bits)
{
  col_obj_ranges_t obj_ranges;
  spg_t pgid;
  constexpr uint32_t MAX_HASH = std::numeric_limits<uint32_t>::max();
  const std::string_view MAX_NSPACE = "\xff";
  if (ch->get_cid().is_pg(&pgid)) {
    obj_ranges.obj_begin.shard_id = pgid.shard;
    obj_ranges.temp_begin = obj_ranges.obj_begin;

    obj_ranges.obj_begin.hobj.pool = pgid.pool();
    // temp objects live in the mirrored negative pool id
    obj_ranges.temp_begin.hobj.pool = -2ll - pgid.pool();

    obj_ranges.obj_end = obj_ranges.obj_begin;
    obj_ranges.temp_end = obj_ranges.temp_begin;

    uint32_t reverse_hash = hobject_t::_reverse_bits(pgid.ps());
    obj_ranges.obj_begin.hobj.set_bitwise_key_u32(reverse_hash);
    obj_ranges.temp_begin.hobj.set_bitwise_key_u32(reverse_hash);

    // 64-bit sum so the +1ull<<(32-bits) increment cannot wrap
    uint64_t end_hash = reverse_hash + (1ull << (32 - bits));
    if (end_hash > MAX_HASH) {
      // make sure end hobj is even greater than the maximum possible hobj
      obj_ranges.obj_end.hobj.set_bitwise_key_u32(MAX_HASH);
      obj_ranges.temp_end.hobj.set_bitwise_key_u32(MAX_HASH);
      obj_ranges.obj_end.hobj.nspace = MAX_NSPACE;
    } else {
      obj_ranges.obj_end.hobj.set_bitwise_key_u32(end_hash);
      obj_ranges.temp_end.hobj.set_bitwise_key_u32(end_hash);
    }
  } else {
    obj_ranges.obj_begin.shard_id = shard_id_t::NO_SHARD;
    obj_ranges.obj_begin.hobj.pool = -1ull;

    obj_ranges.obj_end = obj_ranges.obj_begin;
    obj_ranges.obj_begin.hobj.set_bitwise_key_u32(0);
    obj_ranges.obj_end.hobj.set_bitwise_key_u32(MAX_HASH);
    obj_ranges.obj_end.hobj.nspace = MAX_NSPACE;
    // no separate temp section
    obj_ranges.temp_begin = obj_ranges.obj_end;
    obj_ranges.temp_end = obj_ranges.obj_end;
  }

  obj_ranges.obj_begin.generation = 0;
  obj_ranges.obj_end.generation = 0;
  obj_ranges.temp_begin.generation = 0;
  obj_ranges.temp_end.generation = 0;
  return obj_ranges;
}
640
641 static std::list<std::pair<ghobject_t, ghobject_t>>
642 get_ranges(CollectionRef ch,
643 ghobject_t start,
644 ghobject_t end,
645 col_obj_ranges_t obj_ranges)
646 {
647 ceph_assert(start <= end);
648 std::list<std::pair<ghobject_t, ghobject_t>> ranges;
649 if (start < obj_ranges.temp_end) {
650 ranges.emplace_back(
651 std::max(obj_ranges.temp_begin, start),
652 std::min(obj_ranges.temp_end, end));
653 }
654 if (end > obj_ranges.obj_begin) {
655 ranges.emplace_back(
656 std::max(obj_ranges.obj_begin, start),
657 std::min(obj_ranges.obj_end, end));
658 }
659 return ranges;
660 }
661
// List up to `limit` objects of `ch` in [start, end), returning the
// objects found plus the next cursor.  A collection with no recorded
// split-bits yields an empty list with a max cursor.  Otherwise the
// temp and object sections are walked in order via the onode manager,
// decrementing `limit` as results accumulate; the whole transaction
// is retried on conflict (repeat_eagain).
seastar::future<std::tuple<std::vector<ghobject_t>, ghobject_t>>
SeaStore::Shard::list_objects(CollectionRef ch,
                              const ghobject_t& start,
                              const ghobject_t& end,
                              uint64_t limit) const
{
  ceph_assert(start <= end);
  using list_iertr = OnodeManager::list_onodes_iertr;
  using RetType = typename OnodeManager::list_onodes_bare_ret;
  return seastar::do_with(
    RetType(std::vector<ghobject_t>(), start),
    std::move(limit),
    [this, ch, start, end](auto& ret, auto& limit) {
    return repeat_eagain([this, ch, start, end, &limit, &ret] {
      return transaction_manager->with_transaction_intr(
        Transaction::src_t::READ,
        "list_objects",
        [this, ch, start, end, &limit, &ret](auto &t)
      {
        return get_coll_bits(
          ch, t
        ).si_then([this, ch, &t, start, end, &limit, &ret](auto bits) {
          if (!bits) {
            // unknown collection: nothing to list
            return list_iertr::make_ready_future<
              OnodeManager::list_onodes_bare_ret
              >(std::make_tuple(
                  std::vector<ghobject_t>(),
                  ghobject_t::get_max()));
          } else {
            auto filter = SeaStore::get_objs_range(ch, *bits);
            using list_iertr = OnodeManager::list_onodes_iertr;
            using repeat_ret = list_iertr::future<seastar::stop_iteration>;
            return trans_intr::repeat(
              [this, &t, &ret, &limit,
               filter, ranges = get_ranges(ch, start, end, filter)
              ]() mutable -> repeat_ret {
                if (limit == 0 || ranges.empty()) {
                  return list_iertr::make_ready_future<
                    seastar::stop_iteration
                    >(seastar::stop_iteration::yes);
                }
                // consume the next sub-range (temp section first)
                auto ite = ranges.begin();
                auto pstart = ite->first;
                auto pend = ite->second;
                ranges.pop_front();
                return onode_manager->list_onodes(
                  t, pstart, pend, limit
                ).si_then([&limit, &ret, pend](auto &&_ret) mutable {
                  auto &next_objects = std::get<0>(_ret);
                  auto &ret_objects = std::get<0>(ret);
                  ret_objects.insert(
                    ret_objects.end(),
                    next_objects.begin(),
                    next_objects.end());
                  // cursor advances to wherever this sub-listing stopped
                  std::get<1>(ret) = std::get<1>(_ret);
                  assert(limit >= next_objects.size());
                  limit -= next_objects.size();
                  assert(limit == 0 ||
                         std::get<1>(_ret) == pend ||
                         std::get<1>(_ret) == ghobject_t::get_max());
                  return list_iertr::make_ready_future<
                    seastar::stop_iteration
                    >(seastar::stop_iteration::no);
                });
              }).si_then([&ret] {
                return list_iertr::make_ready_future<
                  OnodeManager::list_onodes_bare_ret>(std::move(ret));
              });
          }
        });
      }).safe_then([&ret](auto&& _ret) {
        ret = std::move(_ret);
      });
    }).safe_then([&ret] {
      return std::move(ret);
    }).handle_error(
      crimson::ct_error::assert_all{
        "Invalid error in SeaStore::list_objects"
      }
    );
  });
}
744
// Return a collection handle for `cid`.  No on-disk state is created
// here; persistence happens when a transaction touching it commits.
seastar::future<CollectionRef>
SeaStore::Shard::create_new_collection(const coll_t& cid)
{
  LOG_PREFIX(SeaStore::create_new_collection);
  DEBUG("{}", cid);
  return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
}
752
753 seastar::future<CollectionRef>
754 SeaStore::Shard::open_collection(const coll_t& cid)
755 {
756 LOG_PREFIX(SeaStore::open_collection);
757 DEBUG("{}", cid);
758 return list_collections().then([cid, this] (auto colls_cores) {
759 if (auto found = std::find(colls_cores.begin(),
760 colls_cores.end(),
761 std::make_pair(cid, seastar::this_shard_id()));
762 found != colls_cores.end()) {
763 return seastar::make_ready_future<CollectionRef>(_get_collection(cid));
764 } else {
765 return seastar::make_ready_future<CollectionRef>();
766 }
767 });
768 }
769
// Read the collection list from disk in a retriable read transaction;
// every collection is reported as owned by the current core.
seastar::future<std::vector<coll_core_t>>
SeaStore::Shard::list_collections()
{
  return seastar::do_with(
    std::vector<coll_core_t>(),
    [this](auto &ret) {
      return repeat_eagain([this, &ret] {
        return transaction_manager->with_transaction_intr(
          Transaction::src_t::READ,
          "list_collections",
          [this, &ret](auto& t)
        {
          return transaction_manager->read_collection_root(t
          ).si_then([this, &t](auto coll_root) {
            return collection_manager->list(coll_root, t);
          }).si_then([&ret](auto colls) {
            ret.resize(colls.size());
            std::transform(
              colls.begin(), colls.end(), ret.begin(),
              [](auto p) {
              return std::make_pair(p.first, seastar::this_shard_id());
            });
          });
        });
      }).safe_then([&ret] {
        return seastar::make_ready_future<std::vector<coll_core_t>>(ret);
      });
    }
  ).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in SeaStore::list_collections"
    }
  );
}
804
// Read `len` bytes (0 means "to end of object") of `oid` starting at
// `offset`.  Reads beyond the object's size return an empty bufferlist;
// reads that cross the end are clipped.  `op_flags` is currently unused.
SeaStore::Shard::read_errorator::future<ceph::bufferlist>
SeaStore::Shard::read(
  CollectionRef ch,
  const ghobject_t& oid,
  uint64_t offset,
  size_t len,
  uint32_t op_flags)
{
  LOG_PREFIX(SeaStore::read);
  DEBUG("oid {} offset {} len {}", oid, offset, len);
  return repeat_with_onode<ceph::bufferlist>(
    ch,
    oid,
    Transaction::src_t::READ,
    "read_obj",
    op_type_t::READ,
    [=, this](auto &t, auto &onode) -> ObjectDataHandler::read_ret {
      size_t size = onode.get_layout().size;

      if (offset >= size) {
        // entirely past EOF: empty result, not an error
        return seastar::make_ready_future<ceph::bufferlist>();
      }

      size_t corrected_len = (len == 0) ?
        size - offset :
        std::min(size - offset, len);

      return ObjectDataHandler(max_object_size).read(
        ObjectDataHandler::context_t{
          *transaction_manager,
          t,
          onode,
        },
        offset,
        corrected_len);
    });
}
842
843 SeaStore::Shard::read_errorator::future<ceph::bufferlist>
844 SeaStore::Shard::readv(
845 CollectionRef ch,
846 const ghobject_t& _oid,
847 interval_set<uint64_t>& m,
848 uint32_t op_flags)
849 {
850 return seastar::do_with(
851 _oid,
852 ceph::bufferlist{},
853 [=, this, &m](auto &oid, auto &ret) {
854 return crimson::do_for_each(
855 m,
856 [=, this, &oid, &ret](auto &p) {
857 return read(
858 ch, oid, p.first, p.second, op_flags
859 ).safe_then([&ret](auto bl) {
860 ret.claim_append(bl);
861 });
862 }).safe_then([&ret] {
863 return read_errorator::make_ready_future<ceph::bufferlist>
864 (std::move(ret));
865 });
866 });
867 return read_errorator::make_ready_future<ceph::bufferlist>();
868 }
869
870 using crimson::os::seastore::omap_manager::BtreeOMapManager;
871
// Fetch a single xattr of `oid`.  OI_ATTR and SS_ATTR are served from
// the onode layout's inline fields when present; everything else goes
// through the xattr omap tree.  Missing keys surface as enodata
// (via _omap_get_value); EIO is fatal, other errors pass through.
SeaStore::Shard::get_attr_errorator::future<ceph::bufferlist>
SeaStore::Shard::get_attr(
  CollectionRef ch,
  const ghobject_t& oid,
  std::string_view name) const
{
  auto c = static_cast<SeastoreCollection*>(ch.get());
  LOG_PREFIX(SeaStore::get_attr);
  DEBUG("{} {}", c->get_cid(), oid);
  return repeat_with_onode<ceph::bufferlist>(
    c,
    oid,
    Transaction::src_t::READ,
    "get_attr",
    op_type_t::GET_ATTR,
    [=, this](auto &t, auto& onode) -> _omap_get_value_ret {
      auto& layout = onode.get_layout();
      if (name == OI_ATTR && layout.oi_size) {
        // object-info is cached inline in the onode
        ceph::bufferlist bl;
        bl.append(ceph::bufferptr(&layout.oi[0], layout.oi_size));
        return seastar::make_ready_future<ceph::bufferlist>(std::move(bl));
      }
      if (name == SS_ATTR && layout.ss_size) {
        // snapset is cached inline in the onode
        ceph::bufferlist bl;
        bl.append(ceph::bufferptr(&layout.ss[0], layout.ss_size));
        return seastar::make_ready_future<ceph::bufferlist>(std::move(bl));
      }
      return _omap_get_value(
        t,
        layout.xattr_root.get(
          onode.get_metadata_hint(device->get_block_size())),
        name);
    }
  ).handle_error(crimson::ct_error::input_output_error::handle([FNAME] {
    ERROR("EIO when getting attrs");
    abort();
  }), crimson::ct_error::pass_further_all{});
}
910
// Fetch all xattrs of `oid`: list the xattr omap tree, then overlay
// the inline OI_ATTR/SS_ATTR values from the onode layout if present.
// EIO is fatal, other errors pass through.
SeaStore::Shard::get_attrs_ertr::future<SeaStore::Shard::attrs_t>
SeaStore::Shard::get_attrs(
  CollectionRef ch,
  const ghobject_t& oid)
{
  LOG_PREFIX(SeaStore::get_attrs);
  auto c = static_cast<SeastoreCollection*>(ch.get());
  DEBUG("{} {}", c->get_cid(), oid);
  return repeat_with_onode<attrs_t>(
    c,
    oid,
    Transaction::src_t::READ,
    "get_addrs",
    op_type_t::GET_ATTRS,
    [=, this](auto &t, auto& onode) {
      auto& layout = onode.get_layout();
      // list the entire xattr tree (no start key, exclusive bounds)
      return omap_list(onode, layout.xattr_root, t, std::nullopt,
                       OMapManager::omap_list_config_t().with_inclusive(false, false)
      ).si_then([&layout](auto p) {
        auto& attrs = std::get<1>(p);
        ceph::bufferlist bl;
        if (layout.oi_size) {
          bl.append(ceph::bufferptr(&layout.oi[0], layout.oi_size));
          attrs.emplace(OI_ATTR, std::move(bl));
        }
        if (layout.ss_size) {
          bl.clear();
          bl.append(ceph::bufferptr(&layout.ss[0], layout.ss_size));
          attrs.emplace(SS_ATTR, std::move(bl));
        }
        return seastar::make_ready_future<omap_values_t>(std::move(attrs));
      });
    }
  ).handle_error(crimson::ct_error::input_output_error::handle([FNAME] {
    ERROR("EIO when getting attrs");
    abort();
  }), crimson::ct_error::pass_further_all{});
}
949
950 seastar::future<struct stat> SeaStore::Shard::stat(
951 CollectionRef c,
952 const ghobject_t& oid)
953 {
954 LOG_PREFIX(SeaStore::stat);
955 return repeat_with_onode<struct stat>(
956 c,
957 oid,
958 Transaction::src_t::READ,
959 "stat",
960 op_type_t::STAT,
961 [=, this, &oid](auto &t, auto &onode) {
962 struct stat st;
963 auto &olayout = onode.get_layout();
964 st.st_size = olayout.size;
965 st.st_blksize = device->get_block_size();
966 st.st_blocks = (st.st_size + st.st_blksize - 1) / st.st_blksize;
967 st.st_nlink = 1;
968 DEBUGT("cid {}, oid {}, return size {}", t, c->get_cid(), oid, st.st_size);
969 return seastar::make_ready_future<struct stat>(st);
970 }
971 ).handle_error(
972 crimson::ct_error::assert_all{
973 "Invalid error in SeaStore::stat"
974 }
975 );
976 }
977
// The omap header is stored as a regular xattr under a reserved key,
// so this is just a get_attr by another name.
SeaStore::Shard::get_attr_errorator::future<ceph::bufferlist>
SeaStore::Shard::omap_get_header(
  CollectionRef ch,
  const ghobject_t& oid)
{
  return get_attr(ch, oid, OMAP_HEADER_XATTR_KEY);
}
985
// Point-lookup of specific omap keys of `oid`; keys that do not exist
// are simply absent from the result map.
SeaStore::Shard::read_errorator::future<SeaStore::Shard::omap_values_t>
SeaStore::Shard::omap_get_values(
  CollectionRef ch,
  const ghobject_t &oid,
  const omap_keys_t &keys)
{
  auto c = static_cast<SeastoreCollection*>(ch.get());
  return repeat_with_onode<omap_values_t>(
    c,
    oid,
    Transaction::src_t::READ,
    "omap_get_values",
    op_type_t::OMAP_GET_VALUES,
    [this, keys](auto &t, auto &onode) {
      omap_root_t omap_root = onode.get_layout().omap_root.get(
        onode.get_metadata_hint(device->get_block_size()));
      return _omap_get_values(
        t,
        std::move(omap_root),
        keys);
    });
}
1008
// Look up a single key in the given omap tree.  A null root (no tree
// yet) or a missing key both surface as enodata.
SeaStore::Shard::_omap_get_value_ret
SeaStore::Shard::_omap_get_value(
  Transaction &t,
  omap_root_t &&root,
  std::string_view key) const
{
  return seastar::do_with(
    BtreeOMapManager(*transaction_manager),
    std::move(root),
    std::string(key),  // own the key for the lifetime of the lookup
    [&t](auto &manager, auto& root, auto& key) -> _omap_get_value_ret {
      if (root.is_null()) {
        return crimson::ct_error::enodata::make();
      }
      return manager.omap_get_value(root, t, key
      ).si_then([](auto opt) -> _omap_get_value_ret {
        if (!opt) {
          return crimson::ct_error::enodata::make();
        }
        return seastar::make_ready_future<ceph::bufferlist>(std::move(*opt));
      });
    }
  );
}
1033
// Batch point-lookup: query each requested key in turn, collecting
// only the keys that exist.  A null root short-circuits to an empty
// result.
SeaStore::Shard::_omap_get_values_ret
SeaStore::Shard::_omap_get_values(
  Transaction &t,
  omap_root_t &&omap_root,
  const omap_keys_t &keys) const
{
  if (omap_root.is_null()) {
    return seastar::make_ready_future<omap_values_t>();
  }
  return seastar::do_with(
    BtreeOMapManager(*transaction_manager),
    std::move(omap_root),
    omap_values_t(),
    [&](auto &manager, auto &root, auto &ret) {
      return trans_intr::do_for_each(
        keys.begin(),
        keys.end(),
        [&](auto &key) {
          return manager.omap_get_value(
            root,
            t,
            key
          ).si_then([&ret, &key](auto &&p) {
            if (p) {
              bufferlist bl;
              bl.append(*p);
              // NOTE(review): `key` refers into the const `keys` set, so
              // std::move here degrades to a copy — harmless, just not a move
              ret.emplace(
                std::move(key),
                std::move(bl));
            }
            return seastar::now();
          });
        }
      ).si_then([&ret] {
        return std::move(ret);
      });
    }
  );
}
1073
// Shared listing helper for omap (and xattr) trees.
// Lists entries starting from `start` (inclusivity governed by `config`)
// with no upper bound.  A null root short-circuits to (true, {}); the
// bool presumably flags that listing is complete — confirm against
// OMapManager::omap_list.
SeaStore::Shard::omap_list_ret
SeaStore::Shard::omap_list(
  Onode &onode,
  const omap_root_le_t& omap_root,
  Transaction& t,
  const std::optional<std::string>& start,
  OMapManager::omap_list_config_t config) const
{
  auto root = omap_root.get(
    onode.get_metadata_hint(device->get_block_size()));
  if (root.is_null()) {
    return seastar::make_ready_future<omap_list_bare_ret>(
      true, omap_values_t{}
    );
  }
  return seastar::do_with(
    BtreeOMapManager(*transaction_manager),
    root,
    start,
    std::optional<std::string>(std::nullopt),  // no end bound
    [&t, config](auto &manager, auto &root, auto &start, auto &end) {
      return manager.omap_list(root, t, start, end, config);
    });
}
1098
// List an object's omap entries following `start` (exclusive per
// with_inclusive(false, false) below).  Returns a (bool, values) tuple;
// the bool presumably indicates the listing reached the end — confirm
// against omap_list.
SeaStore::Shard::omap_get_values_ret_t
SeaStore::Shard::omap_get_values(
  CollectionRef ch,
  const ghobject_t &oid,
  const std::optional<string> &start)
{
  auto c = static_cast<SeastoreCollection*>(ch.get());
  LOG_PREFIX(SeaStore::omap_get_values);
  DEBUG("{} {}", c->get_cid(), oid);
  using ret_bare_t = std::tuple<bool, SeaStore::Shard::omap_values_t>;
  return repeat_with_onode<ret_bare_t>(
    c,
    oid,
    Transaction::src_t::READ,
    "omap_list",
    op_type_t::OMAP_LIST,
    [this, start](auto &t, auto &onode) {
      return omap_list(
        onode,
        onode.get_layout().omap_root,
        t,
        start,
        OMapManager::omap_list_config_t().with_inclusive(false, false));
    });
}
1124
1125 SeaStore::Shard::_fiemap_ret SeaStore::Shard::_fiemap(
1126 Transaction &t,
1127 Onode &onode,
1128 uint64_t off,
1129 uint64_t len) const
1130 {
1131 return seastar::do_with(
1132 ObjectDataHandler(max_object_size),
1133 [=, this, &t, &onode] (auto &objhandler) {
1134 return objhandler.fiemap(
1135 ObjectDataHandler::context_t{
1136 *transaction_manager,
1137 t,
1138 onode,
1139 },
1140 off,
1141 len);
1142 });
1143 }
1144
// Public fiemap entry point: clamp the requested range to the object's
// current size, then delegate to _fiemap.  len == 0 means "to end of
// object".  An offset at or past the object size returns an empty map.
SeaStore::Shard::read_errorator::future<std::map<uint64_t, uint64_t>>
SeaStore::Shard::fiemap(
  CollectionRef ch,
  const ghobject_t& oid,
  uint64_t off,
  uint64_t len)
{
  LOG_PREFIX(SeaStore::fiemap);
  DEBUG("oid: {}, off: {}, len: {} ", oid, off, len);
  return repeat_with_onode<std::map<uint64_t, uint64_t>>(
    ch,
    oid,
    Transaction::src_t::READ,
    "fiemap_read",
    op_type_t::READ,
    [=, this](auto &t, auto &onode) -> _fiemap_ret {
      size_t size = onode.get_layout().size;
      if (off >= size) {
        INFOT("fiemap offset is over onode size!", t);
        return seastar::make_ready_future<std::map<uint64_t, uint64_t>>();
      }
      // Clamp: whole tail for len == 0, otherwise at most size - off.
      size_t adjust_len = (len == 0) ?
        size - off:
        std::min(size - off, len);
      return _fiemap(t, onode, off, adjust_len);
    });
}
1172
1173 void SeaStore::Shard::on_error(ceph::os::Transaction &t) {
1174 LOG_PREFIX(SeaStore::on_error);
1175 ERROR(" transaction dump:\n");
1176 JSONFormatter f(true);
1177 f.open_object_section("transaction");
1178 t.dump(&f);
1179 f.close_section();
1180 std::stringstream str;
1181 f.flush(str);
1182 ERROR("{}", str.str());
1183 abort();
1184 }
1185
// Apply a full ceph::os::Transaction to this shard.
// Ops are consumed one at a time via _do_transaction_step; onodes
// touched along the way are cached in `onodes` (indexed by op oid) and
// dirtied ones collected in `d_onodes` for a final write_dirty pass
// before the seastore transaction is submitted.
seastar::future<> SeaStore::Shard::do_transaction_no_callbacks(
  CollectionRef _ch,
  ceph::os::Transaction&& _t)
{
  // repeat_with_internal_context ensures ordering via collection lock
  return repeat_with_internal_context(
    _ch,
    std::move(_t),
    Transaction::src_t::MUTATE,
    "do_transaction",
    op_type_t::TRANSACTION,
    [this](auto &ctx) {
      return with_trans_intr(*ctx.transaction, [&, this](auto &t) {
        // onodes is pre-sized to the number of objects named by the
        // transaction; d_onodes starts empty.
        return seastar::do_with(std::vector<OnodeRef>(ctx.iter.objects.size()),
          std::vector<OnodeRef>(),
          [this, &ctx](auto& onodes, auto& d_onodes) mutable {
          return trans_intr::repeat(
            [this, &ctx, &onodes, &d_onodes]() mutable
            -> tm_iertr::future<seastar::stop_iteration>
          {
            if (ctx.iter.have_op()) {
              return _do_transaction_step(
                ctx, ctx.ch, onodes, d_onodes, ctx.iter
              ).si_then([] {
                return seastar::make_ready_future<seastar::stop_iteration>(
                  seastar::stop_iteration::no);
              });
            } else {
              return seastar::make_ready_future<seastar::stop_iteration>(
                seastar::stop_iteration::yes);
            };
          }).si_then([this, &ctx, &d_onodes] {
            // Persist layout changes of all onodes dirtied above.
            return onode_manager->write_dirty(*ctx.transaction, d_onodes);
          });
        }).si_then([this, &ctx] {
          return transaction_manager->submit_transaction(*ctx.transaction);
        });
      });
    });
}
1226
1227
// Wait until all mutations previously queued on this collection are
// durable: take the collection ordering lock with a dummy handle (so we
// queue behind in-flight transactions), then flush the transaction
// manager.
seastar::future<> SeaStore::Shard::flush(CollectionRef ch)
{
  return seastar::do_with(
    get_dummy_ordering_handle(),
    [this, ch](auto &handle) {
      return handle.take_collection_lock(
        static_cast<SeastoreCollection&>(*ch).ordering_lock
      ).then([this, &handle] {
        return transaction_manager->flush(handle);
      });
    });
}
1240
1241 SeaStore::Shard::tm_ret
1242 SeaStore::Shard::_do_transaction_step(
1243 internal_context_t &ctx,
1244 CollectionRef &col,
1245 std::vector<OnodeRef> &onodes,
1246 std::vector<OnodeRef> &d_onodes,
1247 ceph::os::Transaction::iterator &i)
1248 {
1249 auto op = i.decode_op();
1250
1251 using ceph::os::Transaction;
1252 if (op->op == Transaction::OP_NOP)
1253 return tm_iertr::now();
1254
1255 switch (op->op) {
1256 case Transaction::OP_RMCOLL:
1257 {
1258 coll_t cid = i.get_cid(op->cid);
1259 return _remove_collection(ctx, cid);
1260 }
1261 case Transaction::OP_MKCOLL:
1262 {
1263 coll_t cid = i.get_cid(op->cid);
1264 return _create_collection(ctx, cid, op->split_bits);
1265 }
1266 case Transaction::OP_COLL_HINT:
1267 {
1268 ceph::bufferlist hint;
1269 i.decode_bl(hint);
1270 return tm_iertr::now();
1271 }
1272 }
1273
1274 using onode_iertr = OnodeManager::get_onode_iertr::extend<
1275 crimson::ct_error::value_too_large>;
1276 auto fut = onode_iertr::make_ready_future<OnodeRef>(OnodeRef());
1277 bool create = false;
1278 if (op->op == Transaction::OP_TOUCH ||
1279 op->op == Transaction::OP_CREATE ||
1280 op->op == Transaction::OP_WRITE ||
1281 op->op == Transaction::OP_ZERO) {
1282 create = true;
1283 }
1284 if (!onodes[op->oid]) {
1285 if (!create) {
1286 fut = onode_manager->get_onode(*ctx.transaction, i.get_oid(op->oid));
1287 } else {
1288 fut = onode_manager->get_or_create_onode(
1289 *ctx.transaction, i.get_oid(op->oid));
1290 }
1291 }
1292 return fut.si_then([&, op, this](auto&& get_onode) -> tm_ret {
1293 LOG_PREFIX(SeaStore::_do_transaction_step);
1294 OnodeRef &o = onodes[op->oid];
1295 if (!o) {
1296 assert(get_onode);
1297 o = get_onode;
1298 d_onodes.push_back(get_onode);
1299 }
1300 try {
1301 switch (op->op) {
1302 case Transaction::OP_REMOVE:
1303 {
1304 TRACET("removing {}", *ctx.transaction, i.get_oid(op->oid));
1305 return _remove(ctx, onodes[op->oid]);
1306 }
1307 case Transaction::OP_CREATE:
1308 case Transaction::OP_TOUCH:
1309 {
1310 return _touch(ctx, onodes[op->oid]);
1311 }
1312 case Transaction::OP_WRITE:
1313 {
1314 uint64_t off = op->off;
1315 uint64_t len = op->len;
1316 uint32_t fadvise_flags = i.get_fadvise_flags();
1317 ceph::bufferlist bl;
1318 i.decode_bl(bl);
1319 return _write(
1320 ctx, onodes[op->oid], off, len, std::move(bl),
1321 fadvise_flags);
1322 }
1323 case Transaction::OP_TRUNCATE:
1324 {
1325 uint64_t off = op->off;
1326 return _truncate(ctx, onodes[op->oid], off);
1327 }
1328 case Transaction::OP_SETATTR:
1329 {
1330 std::string name = i.decode_string();
1331 std::map<std::string, bufferlist> to_set;
1332 ceph::bufferlist& bl = to_set[name];
1333 i.decode_bl(bl);
1334 return _setattrs(ctx, onodes[op->oid], std::move(to_set));
1335 }
1336 case Transaction::OP_SETATTRS:
1337 {
1338 std::map<std::string, bufferlist> to_set;
1339 i.decode_attrset(to_set);
1340 return _setattrs(ctx, onodes[op->oid], std::move(to_set));
1341 }
1342 case Transaction::OP_RMATTR:
1343 {
1344 std::string name = i.decode_string();
1345 return _rmattr(ctx, onodes[op->oid], name);
1346 }
1347 case Transaction::OP_RMATTRS:
1348 {
1349 return _rmattrs(ctx, onodes[op->oid]);
1350 }
1351 case Transaction::OP_OMAP_SETKEYS:
1352 {
1353 std::map<std::string, ceph::bufferlist> aset;
1354 i.decode_attrset(aset);
1355 return _omap_set_values(ctx, onodes[op->oid], std::move(aset));
1356 }
1357 case Transaction::OP_OMAP_SETHEADER:
1358 {
1359 ceph::bufferlist bl;
1360 i.decode_bl(bl);
1361 return _omap_set_header(ctx, onodes[op->oid], std::move(bl));
1362 }
1363 case Transaction::OP_OMAP_RMKEYS:
1364 {
1365 omap_keys_t keys;
1366 i.decode_keyset(keys);
1367 return _omap_rmkeys(ctx, onodes[op->oid], std::move(keys));
1368 }
1369 case Transaction::OP_OMAP_RMKEYRANGE:
1370 {
1371 string first, last;
1372 first = i.decode_string();
1373 last = i.decode_string();
1374 return _omap_rmkeyrange(
1375 ctx, onodes[op->oid],
1376 std::move(first), std::move(last));
1377 }
1378 case Transaction::OP_OMAP_CLEAR:
1379 {
1380 return _omap_clear(ctx, onodes[op->oid]);
1381 }
1382 case Transaction::OP_ZERO:
1383 {
1384 objaddr_t off = op->off;
1385 extent_len_t len = op->len;
1386 return _zero(ctx, onodes[op->oid], off, len);
1387 }
1388 case Transaction::OP_SETALLOCHINT:
1389 {
1390 // TODO
1391 return tm_iertr::now();
1392 }
1393 default:
1394 ERROR("bad op {}", static_cast<unsigned>(op->op));
1395 return crimson::ct_error::input_output_error::make();
1396 }
1397 } catch (std::exception &e) {
1398 ERROR("got exception {}", e);
1399 return crimson::ct_error::input_output_error::make();
1400 }
1401 }).handle_error_interruptible(
1402 tm_iertr::pass_further{},
1403 crimson::ct_error::enoent::handle([op] {
1404 //OMAP_CLEAR, TRUNCATE, REMOVE etc ops will tolerate absent onode.
1405 if (op->op == Transaction::OP_CLONERANGE ||
1406 op->op == Transaction::OP_CLONE ||
1407 op->op == Transaction::OP_CLONERANGE2 ||
1408 op->op == Transaction::OP_COLL_ADD ||
1409 op->op == Transaction::OP_SETATTR ||
1410 op->op == Transaction::OP_SETATTRS ||
1411 op->op == Transaction::OP_RMATTR ||
1412 op->op == Transaction::OP_OMAP_SETKEYS ||
1413 op->op == Transaction::OP_OMAP_RMKEYS ||
1414 op->op == Transaction::OP_OMAP_RMKEYRANGE ||
1415 op->op == Transaction::OP_OMAP_SETHEADER) {
1416 ceph_abort_msg("unexpected enoent error");
1417 }
1418 return seastar::now();
1419 }),
1420 crimson::ct_error::assert_all{
1421 "Invalid error in SeaStore::do_transaction_step"
1422 }
1423 );
1424 }
1425
1426 SeaStore::Shard::tm_ret
1427 SeaStore::Shard::_remove(
1428 internal_context_t &ctx,
1429 OnodeRef &onode)
1430 {
1431 LOG_PREFIX(SeaStore::_remove);
1432 DEBUGT("onode={}", *ctx.transaction, *onode);
1433 auto fut = BtreeOMapManager::omap_clear_iertr::now();
1434 auto omap_root = onode->get_layout().omap_root.get(
1435 onode->get_metadata_hint(device->get_block_size()));
1436 if (omap_root.get_location() != L_ADDR_NULL) {
1437 fut = seastar::do_with(
1438 BtreeOMapManager(*transaction_manager),
1439 onode->get_layout().omap_root.get(
1440 onode->get_metadata_hint(device->get_block_size())),
1441 [&ctx, onode](auto &omap_manager, auto &omap_root) {
1442 return omap_manager.omap_clear(
1443 omap_root,
1444 *ctx.transaction
1445 );
1446 });
1447 }
1448 return fut.si_then([this, &ctx, onode] {
1449 return seastar::do_with(
1450 ObjectDataHandler(max_object_size),
1451 [=, this, &ctx](auto &objhandler) {
1452 return objhandler.clear(
1453 ObjectDataHandler::context_t{
1454 *transaction_manager,
1455 *ctx.transaction,
1456 *onode,
1457 });
1458 });
1459 }).si_then([this, &ctx, onode]() mutable {
1460 return onode_manager->erase_onode(*ctx.transaction, onode);
1461 }).handle_error_interruptible(
1462 crimson::ct_error::input_output_error::pass_further(),
1463 crimson::ct_error::assert_all(
1464 "Invalid error in SeaStore::_remove"
1465 )
1466 );
1467 }
1468
// Touch is a no-op beyond onode resolution: the onode was already
// created (if needed) by _do_transaction_step before dispatching here.
SeaStore::Shard::tm_ret
SeaStore::Shard::_touch(
  internal_context_t &ctx,
  OnodeRef &onode)
{
  LOG_PREFIX(SeaStore::_touch);
  DEBUGT("onode={}", *ctx.transaction, *onode);
  return tm_iertr::now();
}
1478
// Write `len` bytes at `offset`, extending the object's recorded size
// first if the write grows it.  fadvise_flags are currently unused by
// the data path below.
SeaStore::Shard::tm_ret
SeaStore::Shard::_write(
  internal_context_t &ctx,
  OnodeRef &onode,
  uint64_t offset, size_t len,
  ceph::bufferlist &&_bl,
  uint32_t fadvise_flags)
{
  LOG_PREFIX(SeaStore::_write);
  DEBUGT("onode={} {}~{}", *ctx.transaction, *onode, offset, len);
  {
    // Grow (never shrink) the size in the mutable layout before writing.
    auto &object_size = onode->get_mutable_layout(*ctx.transaction).size;
    object_size = std::max<uint64_t>(
      offset + len,
      object_size);
  }
  return seastar::do_with(
    std::move(_bl),
    ObjectDataHandler(max_object_size),
    [=, this, &ctx, &onode](auto &bl, auto &objhandler) {
      return objhandler.write(
        ObjectDataHandler::context_t{
          *transaction_manager,
          *ctx.transaction,
          *onode,
        },
        offset,
        bl);
    });
}
1509
// Zero the range [offset, offset+len), growing the recorded object size
// if the range extends past it.
SeaStore::Shard::tm_ret
SeaStore::Shard::_zero(
  internal_context_t &ctx,
  OnodeRef &onode,
  objaddr_t offset,
  extent_len_t len)
{
  LOG_PREFIX(SeaStore::_zero);
  DEBUGT("onode={} {}~{}", *ctx.transaction, *onode, offset, len);
  // Reject ranges reaching the object-size limit.  NOTE(review): `>=`
  // also rejects a range ending exactly at max_object_size, and the sum
  // could wrap for extreme inputs — confirm intended bounds.
  if (offset + len >= max_object_size) {
    return crimson::ct_error::input_output_error::make();
  }
  auto &object_size = onode->get_mutable_layout(*ctx.transaction).size;
  object_size = std::max<uint64_t>(offset + len, object_size);
  return seastar::do_with(
    ObjectDataHandler(max_object_size),
    [=, this, &ctx, &onode](auto &objhandler) {
      return objhandler.zero(
        ObjectDataHandler::context_t{
          *transaction_manager,
          *ctx.transaction,
          *onode,
        },
        offset,
        len);
    });
}
1537
// Insert/overwrite key/value pairs under `omap_root` (used for both the
// omap and the xattr tree), creating the tree lazily on first use.  If
// the root moves as a result, `mutable_omap_root` is updated so the
// change lands in the onode layout.
SeaStore::Shard::omap_set_kvs_ret
SeaStore::Shard::_omap_set_kvs(
  OnodeRef &onode,
  const omap_root_le_t& omap_root,
  Transaction& t,
  omap_root_le_t& mutable_omap_root,
  std::map<std::string, ceph::bufferlist>&& kvs)
{
  return seastar::do_with(
    BtreeOMapManager(*transaction_manager),
    omap_root.get(onode->get_metadata_hint(device->get_block_size())),
    [&, keys=std::move(kvs)](auto &omap_manager, auto &root) {
      // Create the omap tree on demand if this onode has none yet.
      tm_iertr::future<> maybe_create_root =
        !root.is_null() ?
        tm_iertr::now() :
        omap_manager.initialize_omap(
          t, onode->get_metadata_hint(device->get_block_size())
        ).si_then([&root](auto new_root) {
          root = new_root;
        });
      return maybe_create_root.si_then(
        // NOTE(review): the enclosing lambda is not `mutable`, so its
        // `keys` capture is const and this std::move likely degrades to
        // a copy — verify if the extra copy matters.
        [&, keys=std::move(keys)]() mutable {
        return omap_manager.omap_set_keys(root, t, std::move(keys));
      }).si_then([&] {
        return tm_iertr::make_ready_future<omap_root_t>(std::move(root));
      }).si_then([&mutable_omap_root](auto root) {
        // Propagate a relocated root back into the onode layout.
        if (root.must_update()) {
          mutable_omap_root.update(root);
        }
      });
    }
  );
}
1571
1572 SeaStore::Shard::tm_ret
1573 SeaStore::Shard::_omap_set_values(
1574 internal_context_t &ctx,
1575 OnodeRef &onode,
1576 std::map<std::string, ceph::bufferlist> &&aset)
1577 {
1578 LOG_PREFIX(SeaStore::_omap_set_values);
1579 DEBUGT("{} {} keys", *ctx.transaction, *onode, aset.size());
1580 return _omap_set_kvs(
1581 onode,
1582 onode->get_layout().omap_root,
1583 *ctx.transaction,
1584 onode->get_mutable_layout(*ctx.transaction).omap_root,
1585 std::move(aset));
1586 }
1587
1588 SeaStore::Shard::tm_ret
1589 SeaStore::Shard::_omap_set_header(
1590 internal_context_t &ctx,
1591 OnodeRef &onode,
1592 ceph::bufferlist &&header)
1593 {
1594 LOG_PREFIX(SeaStore::_omap_set_header);
1595 DEBUGT("{} {} bytes", *ctx.transaction, *onode, header.length());
1596 std::map<std::string, bufferlist> to_set;
1597 to_set[OMAP_HEADER_XATTR_KEY] = header;
1598 return _setattrs(ctx, onode,std::move(to_set));
1599 }
1600
1601 SeaStore::Shard::tm_ret
1602 SeaStore::Shard::_omap_clear(
1603 internal_context_t &ctx,
1604 OnodeRef &onode)
1605 {
1606 LOG_PREFIX(SeaStore::_omap_clear);
1607 DEBUGT("{} {} keys", *ctx.transaction, *onode);
1608 return _xattr_rmattr(ctx, onode, std::string(OMAP_HEADER_XATTR_KEY))
1609 .si_then([this, &ctx, &onode]() -> tm_ret {
1610 if (auto omap_root = onode->get_layout().omap_root.get(
1611 onode->get_metadata_hint(device->get_block_size()));
1612 omap_root.is_null()) {
1613 return seastar::now();
1614 } else {
1615 return seastar::do_with(
1616 BtreeOMapManager(*transaction_manager),
1617 onode->get_layout().omap_root.get(
1618 onode->get_metadata_hint(device->get_block_size())),
1619 [&ctx, &onode](
1620 auto &omap_manager,
1621 auto &omap_root) {
1622 return omap_manager.omap_clear(
1623 omap_root,
1624 *ctx.transaction)
1625 .si_then([&] {
1626 if (omap_root.must_update()) {
1627 onode->get_mutable_layout(*ctx.transaction
1628 ).omap_root.update(omap_root);
1629 }
1630 });
1631 });
1632 }
1633 });
1634 }
1635
// Remove the given keys from the object's omap tree, one at a time.
// No-op when the object has no omap.  A relocated root is written back
// into the mutable layout afterwards.
SeaStore::Shard::tm_ret
SeaStore::Shard::_omap_rmkeys(
  internal_context_t &ctx,
  OnodeRef &onode,
  omap_keys_t &&keys)
{
  LOG_PREFIX(SeaStore::_omap_rmkeys);
  DEBUGT("{} {} keys", *ctx.transaction, *onode, keys.size());
  auto omap_root = onode->get_layout().omap_root.get(
    onode->get_metadata_hint(device->get_block_size()));
  if (omap_root.is_null()) {
    return seastar::now();
  } else {
    return seastar::do_with(
      BtreeOMapManager(*transaction_manager),
      onode->get_layout().omap_root.get(
        onode->get_metadata_hint(device->get_block_size())),
      std::move(keys),
      [&ctx, &onode](
        auto &omap_manager,
        auto &omap_root,
        auto &keys) {
        return trans_intr::do_for_each(
          keys.begin(),
          keys.end(),
          [&](auto &p) {
            return omap_manager.omap_rm_key(
              omap_root,
              *ctx.transaction,
              p);
          }
        ).si_then([&] {
          if (omap_root.must_update()) {
            onode->get_mutable_layout(*ctx.transaction
            ).omap_root.update(omap_root);
          }
        });
      }
    );
  }
}
1677
// Remove all omap keys in [first, last) — start inclusive, end
// exclusive per with_inclusive(true, false).  Aborts on an inverted
// range; no-op when the object has no omap.
SeaStore::Shard::tm_ret
SeaStore::Shard::_omap_rmkeyrange(
  internal_context_t &ctx,
  OnodeRef &onode,
  std::string first,
  std::string last)
{
  LOG_PREFIX(SeaStore::_omap_rmkeyrange);
  DEBUGT("{} first={} last={}", *ctx.transaction, *onode, first, last);
  if (first > last) {
    ERRORT("range error, first: {} > last:{}", *ctx.transaction, first, last);
    ceph_abort();
  }
  auto omap_root = onode->get_layout().omap_root.get(
    onode->get_metadata_hint(device->get_block_size()));
  if (omap_root.is_null()) {
    return seastar::now();
  } else {
    return seastar::do_with(
      BtreeOMapManager(*transaction_manager),
      onode->get_layout().omap_root.get(
        onode->get_metadata_hint(device->get_block_size())),
      std::move(first),
      std::move(last),
      [&ctx, &onode](
        auto &omap_manager,
        auto &omap_root,
        auto &first,
        auto &last) {
        // Unbounded batch: delete everything in range in one pass.
        auto config = OMapManager::omap_list_config_t()
          .with_inclusive(true, false)
          .without_max();
        return omap_manager.omap_rm_key_range(
          omap_root,
          *ctx.transaction,
          first,
          last,
          config
        ).si_then([&] {
          if (omap_root.must_update()) {
            onode->get_mutable_layout(*ctx.transaction
            ).omap_root.update(omap_root);
          }
        });
      });
  }
}
1725
// Truncate (or extend) the object to `size`: record the new size in the
// layout, then let ObjectDataHandler adjust the data extents.
SeaStore::Shard::tm_ret
SeaStore::Shard::_truncate(
  internal_context_t &ctx,
  OnodeRef &onode,
  uint64_t size)
{
  LOG_PREFIX(SeaStore::_truncate);
  DEBUGT("onode={} size={}", *ctx.transaction, *onode, size);
  onode->get_mutable_layout(*ctx.transaction).size = size;
  return seastar::do_with(
    ObjectDataHandler(max_object_size),
    [=, this, &ctx, &onode](auto &objhandler) {
      return objhandler.truncate(
        ObjectDataHandler::context_t{
          *transaction_manager,
          *ctx.transaction,
          *onode
        },
        size);
    });
}
1747
1748 SeaStore::Shard::tm_ret
1749 SeaStore::Shard::_setattrs(
1750 internal_context_t &ctx,
1751 OnodeRef &onode,
1752 std::map<std::string, bufferlist>&& aset)
1753 {
1754 LOG_PREFIX(SeaStore::_setattrs);
1755 DEBUGT("onode={}", *ctx.transaction, *onode);
1756
1757 auto fut = tm_iertr::now();
1758 auto& layout = onode->get_mutable_layout(*ctx.transaction);
1759 if (auto it = aset.find(OI_ATTR); it != aset.end()) {
1760 auto& val = it->second;
1761 if (likely(val.length() <= onode_layout_t::MAX_OI_LENGTH)) {
1762 maybe_inline_memcpy(
1763 &layout.oi[0],
1764 val.c_str(),
1765 val.length(),
1766 onode_layout_t::MAX_OI_LENGTH);
1767
1768 if (!layout.oi_size) {
1769 // if oi was not in the layout, it probably exists in the omap,
1770 // need to remove it first
1771 fut = _xattr_rmattr(ctx, onode, OI_ATTR);
1772 }
1773 layout.oi_size = val.length();
1774 aset.erase(it);
1775 } else {
1776 layout.oi_size = 0;
1777 }
1778 }
1779
1780 if (auto it = aset.find(SS_ATTR); it != aset.end()) {
1781 auto& val = it->second;
1782 if (likely(val.length() <= onode_layout_t::MAX_SS_LENGTH)) {
1783 maybe_inline_memcpy(
1784 &layout.ss[0],
1785 val.c_str(),
1786 val.length(),
1787 onode_layout_t::MAX_SS_LENGTH);
1788
1789 if (!layout.ss_size) {
1790 fut = _xattr_rmattr(ctx, onode, SS_ATTR);
1791 }
1792 layout.ss_size = val.length();
1793
1794 aset.erase(it);
1795 } else {
1796 layout.ss_size = 0;
1797 }
1798 }
1799
1800 if (aset.empty()) {
1801 return fut;
1802 }
1803
1804 return fut.si_then(
1805 [this, onode, &ctx, &layout,
1806 aset=std::move(aset)]() mutable {
1807 return _omap_set_kvs(
1808 onode,
1809 onode->get_layout().xattr_root,
1810 *ctx.transaction,
1811 layout.xattr_root,
1812 std::move(aset));
1813 });
1814 }
1815
// Remove a single xattr.  OI/SS values kept inline in the layout are
// wiped in place; anything else is removed from the xattr omap tree.
SeaStore::Shard::tm_ret
SeaStore::Shard::_rmattr(
  internal_context_t &ctx,
  OnodeRef &onode,
  std::string name)
{
  LOG_PREFIX(SeaStore::_rmattr);
  DEBUGT("onode={}", *ctx.transaction, *onode);
  auto& layout = onode->get_mutable_layout(*ctx.transaction);
  if ((name == OI_ATTR) && (layout.oi_size > 0)) {
    memset(&layout.oi[0], 0, layout.oi_size);
    layout.oi_size = 0;
    return tm_iertr::now();
  } else if ((name == SS_ATTR) && (layout.ss_size > 0)) {
    memset(&layout.ss[0], 0, layout.ss_size);
    layout.ss_size = 0;
    return tm_iertr::now();
  } else {
    return _xattr_rmattr(
      ctx,
      onode,
      std::move(name));
  }
}
1840
// Remove a key from the object's xattr omap tree.  No-op when the tree
// does not exist; a relocated root is written back to the layout.
SeaStore::Shard::tm_ret
SeaStore::Shard::_xattr_rmattr(
  internal_context_t &ctx,
  OnodeRef &onode,
  std::string &&name)
{
  LOG_PREFIX(SeaStore::_xattr_rmattr);
  DEBUGT("onode={}", *ctx.transaction, *onode);
  auto xattr_root = onode->get_layout().xattr_root.get(
    onode->get_metadata_hint(device->get_block_size()));
  if (xattr_root.is_null()) {
    return seastar::now();
  } else {
    return seastar::do_with(
      BtreeOMapManager(*transaction_manager),
      onode->get_layout().xattr_root.get(
        onode->get_metadata_hint(device->get_block_size())),
      std::move(name),
      [&ctx, &onode](auto &omap_manager, auto &xattr_root, auto &name) {
        return omap_manager.omap_rm_key(xattr_root, *ctx.transaction, name)
        .si_then([&] {
          if (xattr_root.must_update()) {
            onode->get_mutable_layout(*ctx.transaction
            ).xattr_root.update(xattr_root);
          }
        });
      });
  }
}
1870
// Remove all xattrs: wipe the inline OI/SS copies in the layout, then
// clear the xattr omap tree.
SeaStore::Shard::tm_ret
SeaStore::Shard::_rmattrs(
  internal_context_t &ctx,
  OnodeRef &onode)
{
  LOG_PREFIX(SeaStore::_rmattrs);
  DEBUGT("onode={}", *ctx.transaction, *onode);
  auto& layout = onode->get_mutable_layout(*ctx.transaction);
  memset(&layout.oi[0], 0, layout.oi_size);
  layout.oi_size = 0;
  memset(&layout.ss[0], 0, layout.ss_size);
  layout.ss_size = 0;
  return _xattr_clear(ctx, onode);
}
1885
// Clear the whole xattr omap tree.  No-op when it does not exist; a
// relocated root is written back to the layout.
SeaStore::Shard::tm_ret
SeaStore::Shard::_xattr_clear(
  internal_context_t &ctx,
  OnodeRef &onode)
{
  LOG_PREFIX(SeaStore::_xattr_clear);
  DEBUGT("onode={}", *ctx.transaction, *onode);
  auto xattr_root = onode->get_layout().xattr_root.get(
    onode->get_metadata_hint(device->get_block_size()));
  if (xattr_root.is_null()) {
    return seastar::now();
  } else {
    return seastar::do_with(
      BtreeOMapManager(*transaction_manager),
      onode->get_layout().xattr_root.get(
        onode->get_metadata_hint(device->get_block_size())),
      [&ctx, &onode](auto &omap_manager, auto &xattr_root) {
        return omap_manager.omap_clear(xattr_root, *ctx.transaction)
        .si_then([&] {
          if (xattr_root.must_update()) {
            onode->get_mutable_layout(*ctx.transaction
            ).xattr_root.update(xattr_root);
          }
        });
      });
  }
}
1913
// Create a collection: register it in the collection manager and write
// back the collection root if it moved.
SeaStore::Shard::tm_ret
SeaStore::Shard::_create_collection(
  internal_context_t &ctx,
  const coll_t& cid, int bits)
{
  return transaction_manager->read_collection_root(
    *ctx.transaction
  ).si_then([=, this, &ctx](auto _cmroot) {
    return seastar::do_with(
      _cmroot,
      [=, this, &ctx](auto &cmroot) {
        return collection_manager->create(
          cmroot,
          *ctx.transaction,
          cid,
          bits
        ).si_then([this, &ctx, &cmroot] {
          if (cmroot.must_update()) {
            transaction_manager->write_collection_root(
              *ctx.transaction,
              cmroot);
          }
        });
      }
    );
  }).handle_error_interruptible(
    tm_iertr::pass_further{},
    crimson::ct_error::assert_all{
      "Invalid error in SeaStore::_create_collection"
    }
  );
}
1946
1947 SeaStore::Shard::tm_ret
1948 SeaStore::Shard::_remove_collection(
1949 internal_context_t &ctx,
1950 const coll_t& cid)
1951 {
1952 return transaction_manager->read_collection_root(
1953 *ctx.transaction
1954 ).si_then([=, this, &ctx](auto _cmroot) {
1955 return seastar::do_with(
1956 _cmroot,
1957 [=, this, &ctx](auto &cmroot) {
1958 return collection_manager->remove(
1959 cmroot,
1960 *ctx.transaction,
1961 cid
1962 ).si_then([this, &ctx, &cmroot] {
1963 // param here denotes whether it already existed, probably error
1964 if (cmroot.must_update()) {
1965 transaction_manager->write_collection_root(
1966 *ctx.transaction,
1967 cmroot);
1968 }
1969 });
1970 });
1971 }).handle_error_interruptible(
1972 tm_iertr::pass_further{},
1973 crimson::ct_error::assert_all{
1974 "Invalid error in SeaStore::_create_collection"
1975 }
1976 );
1977 }
1978
// Construct a fresh in-memory collection handle for `cid`.  Note: every
// call returns a new SeastoreCollection instance; no caching here.
boost::intrusive_ptr<SeastoreCollection>
SeaStore::Shard::_get_collection(const coll_t& cid)
{
  return new SeastoreCollection{cid};
}
1984
// Persist a key/value pair into the root metadata, in its own MUTATE
// transaction, retrying on eagain until it commits.
seastar::future<> SeaStore::Shard::write_meta(
  const std::string& key,
  const std::string& value)
{
  LOG_PREFIX(SeaStore::write_meta);
  DEBUG("key: {}; value: {}", key, value);
  // Copy key/value into do_with so they outlive the retry loop.
  return seastar::do_with(
    key, value,
    [this, FNAME](auto& key, auto& value) {
      return repeat_eagain([this, FNAME, &key, &value] {
        return transaction_manager->with_transaction_intr(
          Transaction::src_t::MUTATE,
          "write_meta",
          [this, FNAME, &key, &value](auto& t)
        {
          DEBUGT("Have transaction, key: {}; value: {}", t, key, value);
          return transaction_manager->update_root_meta(
            t, key, value
          ).si_then([this, &t] {
            return transaction_manager->submit_transaction(t);
          });
        });
      });
    }).handle_error(
      crimson::ct_error::assert_all{"Invalid error in SeaStore::write_meta"}
    );
}
2012
// Read a metadata key from the mdstore.  Returns (0, value) on success
// and (-1, "") when the key is absent.  Must run on the primary core.
seastar::future<std::tuple<int, std::string>>
SeaStore::read_meta(const std::string& key)
{
  ceph_assert(seastar::this_shard_id() == primary_core);
  LOG_PREFIX(SeaStore::read_meta);
  DEBUG("key: {}", key);
  return mdstore->read_meta(key).safe_then([](auto v) {
    if (v) {
      return std::make_tuple(0, std::move(*v));
    } else {
      return std::make_tuple(-1, std::string(""));
    }
  }).handle_error(
    crimson::ct_error::assert_all{
      "Invalid error in SeaStore::read_meta"
    }
  );
}
2031
// Report the store's fsid as recorded in the device metadata.
uuid_d SeaStore::Shard::get_fsid() const
{
  return device->get_meta().seastore_id;
}
2036
// (Re)build the manager stack.  Old managers are released first so the
// new transaction manager is constructed against a clean slate; the
// collection and onode managers then layer on top of it.
void SeaStore::Shard::init_managers()
{
  transaction_manager.reset();
  collection_manager.reset();
  onode_manager.reset();

  transaction_manager = make_transaction_manager(
      device, secondaries, is_test);
  collection_manager = std::make_unique<collection_manager::FlatCollectionManager>(
      *transaction_manager);
  onode_manager = std::make_unique<crimson::os::seastore::onode::FLTreeOnodeManager>(
      *transaction_manager);
}
2050
2051 std::unique_ptr<SeaStore> make_seastore(
2052 const std::string &device)
2053 {
2054 auto mdstore = std::make_unique<FileMDStore>(device);
2055 return std::make_unique<SeaStore>(
2056 device,
2057 std::move(mdstore));
2058 }
2059
2060 std::unique_ptr<SeaStore> make_test_seastore(
2061 SeaStore::MDStoreRef mdstore)
2062 {
2063 return std::make_unique<SeaStore>(
2064 "",
2065 std::move(mdstore));
2066 }
2067
2068 }