1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "replicated_backend.h"
6 #include "messages/MOSDRepOpReply.h"
8 #include "crimson/common/exception.h"
9 #include "crimson/common/log.h"
10 #include "crimson/os/futurized_store.h"
11 #include "crimson/osd/shard_services.h"
12 #include "osd/PeeringState.h"
15 seastar::logger
& logger() {
16 return crimson::get_logger(ceph_subsys_osd
);
// Constructor. Forwards whoami.shard, the collection and a pointer to the
// store obtained from shard_services to the PGBackend base, and initializes
// the shard_services member from the argument of the same name.
// NOTE(review): the embedded numbering of this listing jumps 20 -> 22 and
// 24 -> 27, so a parameter (presumably `whoami`, which is used below but not
// declared in view) and further member initializers are not shown here --
// confirm against the full file.
20 ReplicatedBackend::ReplicatedBackend(pg_t pgid
,
22 ReplicatedBackend::CollectionRef coll
,
23 crimson::osd::ShardServices
& shard_services
)
24 : PGBackend
{whoami
.shard
, coll
, &shard_services
.get_store()},
27 shard_services
{shard_services
}
// Low-level read of object `hoid`.
// Throws crimson::common::system_shutdown_exception when `stopping` is set;
// otherwise returns whatever store->read() yields for the object (wrapped in
// a ghobject_t) in collection `coll`, passing off/len/flags through unchanged.
// NOTE(review): the declarations of off, len and flags fall in lines this
// listing skips (31 -> 36); their types are not visible here.
30 ReplicatedBackend::ll_read_ierrorator::future
<ceph::bufferlist
>
31 ReplicatedBackend::_read(const hobject_t
& hoid
,
// shutdown is hinted to the compiler as the unlikely branch
36 if (__builtin_expect(stopping
, false)) {
37 throw crimson::common::system_shutdown_exception();
39 return store
->read(coll
, ghobject_t
{hoid
}, off
, len
, flags
);
// Submits a write transaction: runs `txn` through the shard's store and
// sends an MOSDRepOp carrying the encoded transaction and the encoded log
// entries to every shard in `pg_shards` other than `whoami`.
//
// Throws system_shutdown_exception when `stopping` is set and
// actingset_changed when `peering` is set.
//
// Returns a pair of futures: an already-resolved one, and `all_completed`,
// which yields the list of acked peers after the pending counter of this
// transaction has dropped to zero and its entry was erased from
// pending_trans.
//
// NOTE(review): this listing omits several original lines (e.g. the
// declaration of `pending_txn`, the guard in front of the throw at line 71,
// and most MOSDRepOp constructor arguments) -- confirm against the full file.
42 ReplicatedBackend::rep_op_fut_t
43 ReplicatedBackend::_submit_transaction(std::set
<pg_shard_t
>&& pg_shards
,
44 const hobject_t
& hoid
,
45 ceph::os::Transaction
&& txn
,
46 osd_op_params_t
&& osd_op_p
,
47 epoch_t min_epoch
, epoch_t map_epoch
,
48 std::vector
<pg_log_entry_t
>&& log_entries
)
// both bail-out conditions are hinted as unlikely
50 if (__builtin_expect(stopping
, false)) {
51 throw crimson::common::system_shutdown_exception();
53 if (__builtin_expect((bool)peering
, false)) {
54 throw crimson::common::actingset_changed(peering
->is_primary
);
// take a fresh transaction id and register a pending entry for it, built
// from the number of shards and the version of this op
57 const ceph_tid_t tid
= next_txn_id
++;
59 pending_trans
.try_emplace(tid
, pg_shards
.size(), osd_op_p
.at_version
).first
;
// encode before `txn` is moved into do_transaction() below; the encoded
// copy is what gets attached to each MOSDRepOp
60 bufferlist encoded_txn
;
61 encode(txn
, encoded_txn
);
63 logger().debug("ReplicatedBackend::_submit_transaction: do_transaction...");
64 auto all_completed
= interruptor::make_interruptible(
65 shard_services
.get_store().do_transaction(coll
, std::move(txn
)))
66 .then_interruptible([this, peers
=pending_txn
->second
.weak_from_this()] {
68 // for now, only actingset_changed can cause peers
71 throw crimson::common::actingset_changed(peering
->is_primary
);
// the finished local transaction takes one off the pending counter; when
// that was the last one, resolve and reset all_committed, otherwise wait
// on its shared future
73 if (--peers
->pending
== 0) {
74 peers
->all_committed
.set_value();
75 peers
->all_committed
= {};
76 return seastar::now();
78 return peers
->all_committed
.get_shared_future();
// everything committed: take the acked peers out of the entry, drop the
// entry from pending_trans and hand the peers to the caller
79 }).then_interruptible([pending_txn
, this] {
80 auto acked_peers
= std::move(pending_txn
->second
.acked_peers
);
81 pending_trans
.erase(pending_txn
);
82 return seastar::make_ready_future
<crimson::osd::acked_peers_t
>(std::move(acked_peers
));
// fan the op out to every shard except ourselves
85 for (auto pg_shard
: pg_shards
) {
86 if (pg_shard
!= whoami
) {
87 auto m
= crimson::make_message
<MOSDRepOp
>(
90 spg_t
{pgid
, pg_shard
.shard
},
92 CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
,
97 m
->set_data(encoded_txn
);
// remember the peer with a default-constructed eversion_t; the real
// last_complete_ondisk is filled in by got_rep_op_reply()
98 pending_txn
->second
.acked_peers
.push_back({pg_shard
, eversion_t
{}});
99 encode(log_entries
, m
->logbl
);
100 m
->pg_trim_to
= osd_op_p
.pg_trim_to
;
101 m
->min_last_complete_ondisk
= osd_op_p
.min_last_complete_ondisk
;
102 m
->set_rollback_to(osd_op_p
.at_version
);
103 // TODO: set more stuff. e.g., pg_states
// the result of send_to_osd() is deliberately discarded
104 (void) shard_services
.send_to_osd(pg_shard
.osd
, std::move(m
), map_epoch
);
107 return {seastar::now(), std::move(all_completed
)};
// Fails every pending transaction: sets an actingset_changed exception
// (built from peering->is_primary) on each entry's all_committed and then
// empties pending_trans.
// NOTE(review): `pi` is not used in the lines visible here; the embedded
// numbering skips 111-112, so it is presumably consumed there -- confirm.
110 void ReplicatedBackend::on_actingset_changed(peering_info_t pi
)
113 crimson::common::actingset_changed e_actingset_changed
{peering
->is_primary
};
114 for (auto& [tid
, pending_txn
] : pending_trans
) {
115 pending_txn
.all_committed
.set_exception(e_actingset_changed
);
117 pending_trans
.clear();
// Handles a reply to a replicated op.
// Looks up the pending transaction by the reply's tid and logs a warning
// when there is none. Otherwise, for the acked peer whose shard equals
// reply.from, stores the reply's last_complete_ondisk and decrements the
// pending counter; when the counter reaches zero, all_committed is resolved
// and reset.
// NOTE(review): the embedded numbering skips 125-126 and 134-139, so the
// early exit after the warning and the tail of the loop are not visible
// here -- confirm against the full file.
120 void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply
& reply
)
122 auto found
= pending_trans
.find(reply
.get_tid());
123 if (found
== pending_trans
.end()) {
124 logger().warn("{}: no matched pending rep op: {}", __func__
, reply
);
127 auto& peers
= found
->second
;
128 for (auto& peer
: peers
.acked_peers
) {
129 if (peer
.shard
== reply
.from
) {
130 peer
.last_complete_ondisk
= reply
.get_last_complete_ondisk();
// last outstanding commit: wake up whoever waits on all_committed
131 if (--peers
.pending
== 0) {
132 peers
.all_committed
.set_value();
133 peers
.all_committed
= {};
// Stops the backend: logs the collection id, sets a
// system_shutdown_exception on all_committed of every pending transaction,
// empties pending_trans and returns an already-resolved future.
// NOTE(review): the embedded numbering skips 143; a statement is presumably
// missing from this listing there -- confirm against the full file.
140 seastar::future
<> ReplicatedBackend::stop()
142 logger().info("ReplicatedBackend::stop {}", coll
->get_cid());
144 for (auto& [tid
, pending_on
] : pending_trans
) {
145 pending_on
.all_committed
.set_exception(
146 crimson::common::system_shutdown_exception());
148 pending_trans
.clear();
149 return seastar::now();
153 ReplicatedBackend::request_committed(const osd_reqid_t
& reqid
,
154 const eversion_t
& at_version
)
156 if (std::empty(pending_trans
)) {
157 return seastar::now();
159 auto iter
= pending_trans
.begin();
160 auto& pending_txn
= iter
->second
;
161 if (pending_txn
.at_version
> at_version
) {
162 return seastar::now();
164 for (; iter
->second
.at_version
< at_version
; ++iter
);
165 // As for now, the previous client_request with the same reqid
166 // mustn't have finished, as that would mean later client_requests
167 // has finished before earlier ones.
169 // The following line of code should be "assert(pending_txn.at_version == at_version)",
170 // as there can be only one transaction at any time in pending_trans due to
171 // PG::client_request_pg_pipeline. But there's a high possibility that we will
172 // improve the parallelism here in the future, which means there may be multiple
173 // client requests in flight, so we loosed the restriction to as follows. Correct
174 // me if I'm wrong:-)
175 assert(iter
!= pending_trans
.end() && iter
->second
.at_version
== at_version
);
176 if (iter
->second
.pending
) {
177 return iter
->second
.all_committed
.get_shared_future();
179 return seastar::now();