// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "replicated_backend.h"

#include "messages/MOSDRepOp.h"  // assumed direct include for make_message<MOSDRepOp> below
#include "messages/MOSDRepOpReply.h"

#include "crimson/common/log.h"
#include "crimson/os/cyanstore/cyan_object.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/shard_services.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

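// One backend instance serves a single PG: it is wired to the PG's
// collection, this OSD's own shard id, and the shard-local services
// (object store and messenger) used by the read and write paths below.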
ReplicatedBackend::ReplicatedBackend(pg_t pgid,
                                     pg_shard_t whoami,
                                     ReplicatedBackend::CollectionRef coll,
                                     crimson::osd::ShardServices& shard_services)
  : PGBackend{whoami.shard, coll, &shard_services.get_store()},
    pgid{pgid},
    whoami{whoami},
    shard_services{shard_services}
{}

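// Low-level object read. In a replicated pool reads are satisfied locally,
// so this simply forwards to the underlying FuturizedStore.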
ReplicatedBackend::ll_read_errorator::future<ceph::bufferlist>
ReplicatedBackend::_read(const hobject_t& hoid,
                         const uint64_t off,
                         const uint64_t len,
                         const uint32_t flags)
{
  return store->read(coll, ghobject_t{hoid}, off, len, flags);
}

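// Primary-side write path: encode the transaction once, apply it to the
// local store, and ship it to every other acting shard as an MOSDRepOp.
// Progress lives in a pending_on_t (declared in replicated_backend.h): the
// number of shards still outstanding, the per-peer acks collected so far,
// and a promise fulfilled once every shard, local and remote, has
// committed. The returned future resolves with the collected acks.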
seastar::future<crimson::osd::acked_peers_t>
ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                       const hobject_t& hoid,
                                       ceph::os::Transaction&& txn,
                                       osd_reqid_t req_id,
                                       epoch_t min_epoch, epoch_t map_epoch,
                                       eversion_t ver)
{
  const ceph_tid_t tid = next_txn_id++;
  auto pending_txn =
    pending_trans.emplace(tid, pending_on_t{pg_shards.size()}).first;
  bufferlist encoded_txn;
  encode(txn, encoded_txn);

  return seastar::parallel_for_each(std::move(pg_shards),
    [=, encoded_txn=std::move(encoded_txn), txn=std::move(txn)]
    (auto pg_shard) mutable {
      if (pg_shard == whoami) {
        // apply the transaction to the local object store
        return shard_services.get_store().do_transaction(coll, std::move(txn));
      } else {
        // replicate the encoded transaction to a peer shard
        auto m = make_message<MOSDRepOp>(req_id, whoami,
                                         spg_t{pgid, pg_shard.shard}, hoid,
                                         CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                         map_epoch, min_epoch,
                                         tid, ver);
        m->set_data(encoded_txn);
        pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
        // TODO: set more stuff. e.g., pg_states
        return shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
      }
    }).then([&peers=pending_txn->second] {
      // the local commit is accounted for here; replica acks are counted
      // down in got_rep_op_reply(), and whichever decrement reaches zero
      // fulfils the all_committed promise
      if (--peers.pending == 0) {
        peers.all_committed.set_value();
      }
      return peers.all_committed.get_future();
    }).then([tid, pending_txn, this] {
      // every shard has committed: reset the promise, drop the bookkeeping
      // entry, and hand the collected acks back to the caller
      pending_txn->second.all_committed = {};
      auto acked_peers = std::move(pending_txn->second.acked_peers);
      pending_trans.erase(pending_txn);
      return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
    });
}

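// Handle a commit acknowledgement from a replica: record the peer's
// last-complete-on-disk version and, when the final outstanding shard has
// reported, fulfil the promise awaited in _submit_transaction().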
void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
{
  auto found = pending_trans.find(reply.get_tid());
  if (found == pending_trans.end()) {
    logger().warn("{}: no matched pending rep op: {}", __func__, reply);
    return;
  }
  auto& peers = found->second;
  for (auto& peer : peers.acked_peers) {
    if (peer.shard == reply.from) {
      peer.last_complete_ondisk = reply.get_last_complete_ondisk();
      if (--peers.pending == 0) {
        peers.all_committed.set_value();
      }
      return;
    }
  }
}