// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "replicated_backend.h"

#include "messages/MOSDRepOpReply.h"

#include "crimson/common/exception.h"
#include "crimson/common/log.h"
#include "crimson/os/futurized_store.h"
#include "crimson/osd/shard_services.h"
#include "osd/PeeringState.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

ReplicatedBackend::ReplicatedBackend(pg_t pgid,
                                     pg_shard_t whoami,
                                     ReplicatedBackend::CollectionRef coll,
                                     crimson::osd::ShardServices& shard_services)
  : PGBackend{whoami.shard, coll, &shard_services.get_store()},
    pgid{pgid},
    whoami{whoami},
    shard_services{shard_services}
{}

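// Serve a low-level object read directly from the local object store.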
ReplicatedBackend::ll_read_ierrorator::future<ceph::bufferlist>
ReplicatedBackend::_read(const hobject_t& hoid,
                         const uint64_t off,
                         const uint64_t len,
                         const uint32_t flags)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }
  return store->read(coll, ghobject_t{hoid}, off, len, flags);
}

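// Commit the transaction locally and replicate it to the peer shards in
// pg_shards via MOSDRepOp messages.  The all_completed future resolves only
// after the local do_transaction() has finished and every peer has
// acknowledged the commit via got_rep_op_reply().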
ReplicatedBackend::rep_op_fut_t
ReplicatedBackend::_submit_transaction(std::set<pg_shard_t>&& pg_shards,
                                       const hobject_t& hoid,
                                       ceph::os::Transaction&& txn,
                                       osd_op_params_t&& osd_op_p,
                                       epoch_t min_epoch, epoch_t map_epoch,
                                       std::vector<pg_log_entry_t>&& log_entries)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }
  if (__builtin_expect((bool)peering, false)) {
    throw crimson::common::actingset_changed(peering->is_primary);
  }

  const ceph_tid_t tid = next_txn_id++;
  auto pending_txn =
    pending_trans.try_emplace(tid, pg_shards.size(), osd_op_p.at_version).first;
  bufferlist encoded_txn;
  encode(txn, encoded_txn);

  logger().debug("ReplicatedBackend::_submit_transaction: do_transaction...");
  auto all_completed = interruptor::make_interruptible(
      shard_services.get_store().do_transaction(coll, std::move(txn)))
    .then_interruptible([this, peers=pending_txn->second.weak_from_this()] {
      if (!peers) {
        // for now, only actingset_changed can cause peers
        // to be nullptr
        assert(peering);
        throw crimson::common::actingset_changed(peering->is_primary);
      }
      if (--peers->pending == 0) {
        peers->all_committed.set_value();
        peers->all_committed = {};
        return seastar::now();
      }
      return peers->all_committed.get_shared_future();
    }).then_interruptible([pending_txn, this] {
      auto acked_peers = std::move(pending_txn->second.acked_peers);
      pending_trans.erase(pending_txn);
      return seastar::make_ready_future<crimson::osd::acked_peers_t>(std::move(acked_peers));
    });

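  // send the encoded transaction to every other shard in the acting set;
  // their replies are matched back to this tid in got_rep_op_reply()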
  for (auto pg_shard : pg_shards) {
    if (pg_shard != whoami) {
      auto m = crimson::make_message<MOSDRepOp>(
        osd_op_p.req_id,
        whoami,
        spg_t{pgid, pg_shard.shard},
        hoid,
        CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
        map_epoch,
        min_epoch,
        tid,
        osd_op_p.at_version);
      m->set_data(encoded_txn);
      pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
      encode(log_entries, m->logbl);
      m->pg_trim_to = osd_op_p.pg_trim_to;
      m->min_last_complete_ondisk = osd_op_p.min_last_complete_ondisk;
      m->set_rollback_to(osd_op_p.at_version);
      // TODO: set more stuff. e.g., pg_states
      (void) shard_services.send_to_osd(pg_shard.osd, std::move(m), map_epoch);
    }
  }
  return {seastar::now(), std::move(all_completed)};
}

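// Remember the new peering state and abort every in-flight transaction with
// an actingset_changed exception.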
void ReplicatedBackend::on_actingset_changed(peering_info_t pi)
{
  peering.emplace(pi);
  crimson::common::actingset_changed e_actingset_changed{peering->is_primary};
  for (auto& [tid, pending_txn] : pending_trans) {
    pending_txn.all_committed.set_exception(e_actingset_changed);
  }
  pending_trans.clear();
}

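// Handle a commit acknowledgement from a replica: record its
// last_complete_ondisk and, once every expected ack (including the local
// commit) has arrived, wake up the waiters on all_committed.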
void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply& reply)
{
  auto found = pending_trans.find(reply.get_tid());
  if (found == pending_trans.end()) {
    logger().warn("{}: no matched pending rep op: {}", __func__, reply);
    return;
  }
  auto& peers = found->second;
  for (auto& peer : peers.acked_peers) {
    if (peer.shard == reply.from) {
      peer.last_complete_ondisk = reply.get_last_complete_ondisk();
      if (--peers.pending == 0) {
        peers.all_committed.set_value();
        peers.all_committed = {};
      }
      return;
    }
  }
}

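// Shut down the backend: refuse new work via the stopping flag and fail any
// still-pending transactions with system_shutdown_exception.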
seastar::future<> ReplicatedBackend::stop()
{
  logger().info("ReplicatedBackend::stop {}", coll->get_cid());
  stopping = true;
  for (auto& [tid, pending_on] : pending_trans) {
    pending_on.all_committed.set_exception(
      crimson::common::system_shutdown_exception());
  }
  pending_trans.clear();
  return seastar::now();
}

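// Wait for the in-flight transaction carrying at_version to be committed by
// all replicas; returns a ready future if nothing is pending or that version
// has already been committed.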
seastar::future<>
ReplicatedBackend::request_committed(const osd_reqid_t& reqid,
                                     const eversion_t& at_version)
{
  if (std::empty(pending_trans)) {
    return seastar::now();
  }
  auto iter = pending_trans.begin();
  auto& pending_txn = iter->second;
  if (pending_txn.at_version > at_version) {
    return seastar::now();
  }
  for (; iter->second.at_version < at_version; ++iter);
  // As of now, the previous client_request with the same reqid must not have
  // finished, as that would mean later client_requests finished before
  // earlier ones.
  //
  // Ideally the assertion below would be
  // "assert(pending_txn.at_version == at_version)", since
  // PG::client_request_pg_pipeline allows only one transaction in
  // pending_trans at any time. But the parallelism here is likely to be
  // improved in the future, which would allow multiple client requests in
  // flight, so the restriction is loosened as follows.
  assert(iter != pending_trans.end() && iter->second.at_version == at_version);
  if (iter->second.pending) {
    return iter->second.all_committed.get_shared_future();
  } else {
    return seastar::now();
  }
}