]>
git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/replicated_backend.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "replicated_backend.h"
6 #include "messages/MOSDRepOpReply.h"
8 #include "crimson/common/log.h"
9 #include "crimson/os/cyanstore/cyan_object.h"
10 #include "crimson/os/futurized_store.h"
11 #include "crimson/osd/shard_services.h"
// Accessor for the logger used by this file: returns whatever
// crimson::get_logger() yields for ceph_subsys_osd. Later code in this file
// logs through it (e.g. logger().warn(...)).
// NOTE(review): this text is a lossy web extraction -- the leading numbers
// are the original file's line numbers and the closing brace of this function
// is not visible here.
14 seastar::logger
& logger() {
15 return crimson::get_logger(ceph_subsys_osd
);
// Constructor. Visible parameters: pgid, coll and a reference to the
// crimson::osd::ShardServices instance.
// NOTE(review): `whoami` is used in the initializer list below but no such
// parameter is visible (embedded line numbers jump 19 -> 21 and 23 -> 26), so
// at least one parameter and possibly some member initializers were dropped by
// the extraction -- confirm against the real file.
19 ReplicatedBackend::ReplicatedBackend(pg_t pgid
,
21 ReplicatedBackend::CollectionRef coll
,
22 crimson::osd::ShardServices
& shard_services
)
// The PGBackend base is built from whoami.shard, coll and the address of the
// store returned by shard_services.get_store().
23 : PGBackend
{whoami
.shard
, coll
, &shard_services
.get_store()},
// The shard_services member is initialized from the constructor argument of
// the same name.
26 shard_services
{shard_services
}
// Reads from object `hoid`. Returns an ll_read_errorator future carrying a
// ceph::bufferlist.
// NOTE(review): `off`, `len` and `flags` are used below but their
// declarations are not visible (embedded line numbers jump 30 -> 35); any
// statements on the dropped lines are unknown -- confirm against the real file.
29 ReplicatedBackend::ll_read_errorator::future
<ceph::bufferlist
>
30 ReplicatedBackend::_read(const hobject_t
& hoid
,
// Forwards directly to store->read() on collection `coll`, wrapping hoid in a
// ghobject_t and passing off/len/flags through unchanged.
35 return store
->read(coll
, ghobject_t
{hoid
}, off
, len
, flags
);
// Submits `txn` for object `hoid` to every shard in `pg_shards` and returns a
// future that resolves to the crimson::osd::acked_peers_t collected for this
// transaction.
//
// Visible flow:
//   1. take a fresh tid from next_txn_id and register a pending_on_t entry in
//      pending_trans under that tid;
//   2. encode the transaction once into a bufferlist;
//   3. for each shard, either apply the transaction on the local store
//      (shard == whoami) or send an MOSDRepOp carrying the encoded bytes;
//   4. wait on the entry's all_committed future, then move the acked peers
//      out, erase the entry and return them.
//
// NOTE(review): this text is a lossy web extraction. Several original lines
// are missing (embedded numbers jump 41 -> 43, 43 -> 46, 46 -> 48, 56 -> 58,
// 60 -> 63), including the rest of the parameter list, the declaration of
// `pending_txn`, the origin of `req_id`, the trailing make_message arguments
// and presumably an `else` -- confirm against the real file.
38 seastar::future
<crimson::osd::acked_peers_t
>
39 ReplicatedBackend::_submit_transaction(std::set
<pg_shard_t
>&& pg_shards
,
40 const hobject_t
& hoid
,
41 ceph::os::Transaction
&& txn
,
43 epoch_t min_epoch
, epoch_t map_epoch
,
// Post-increment: this transaction gets the current counter value as its tid.
46 const ceph_tid_t tid
= next_txn_id
++;
// Insert a pending_on_t built from the number of target shards, keyed by tid;
// `.first` of emplace()'s result is the iterator to that entry. It is
// presumably what the code below calls `pending_txn` (the declaration line is
// not visible).
48 pending_trans
.emplace(tid
, pending_on_t
{pg_shards
.size()}).first
;
// Encode the transaction once, before fanning out over the shards.
49 bufferlist encoded_txn
;
50 encode(txn
, encoded_txn
);
// Fan out over the shards. The lambda copies its default captures ([=]) and
// takes ownership of encoded_txn and txn via init-captures; it is `mutable`
// so that txn can be moved out of the closure below.
52 return seastar::parallel_for_each(std::move(pg_shards
),
53 [=, encoded_txn
=std::move(encoded_txn
), txn
=std::move(txn
)]
54 (auto pg_shard
) mutable {
// Local shard: run the transaction on this OSD's store.
55 if (pg_shard
== whoami
) {
56 return shard_services
.get_store().do_transaction(coll
,std::move(txn
));
// Any other shard (presumably the else branch): build an MOSDRepOp addressed
// to that shard's spg_t with the ACK|ONDISK flags.
58 auto m
= make_message
<MOSDRepOp
>(req_id
, whoami
,
59 spg_t
{pgid
, pg_shard
.shard
}, hoid
,
60 CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
,
// Attach the encoded transaction as the message data.
63 m
->set_data(encoded_txn
);
// Record the shard in acked_peers with a default-constructed eversion_t.
// got_rep_op_reply() later fills in last_complete_ondisk for the matching
// entry.
64 pending_txn
->second
.acked_peers
.push_back({pg_shard
, eversion_t
{}});
65 // TODO: set more stuff. e.g., pg_states
66 return shard_services
.send_to_osd(pg_shard
.osd
, std::move(m
), map_epoch
);
// Runs after every per-shard future above has resolved. `peers` refers to the
// pending_trans entry, which is erased only in the final continuation.
68 }).then([&peers
=pending_txn
->second
] {
// Decrement the outstanding count; if it reaches zero here, fulfil
// all_committed immediately. got_rep_op_reply() performs the same
// decrement-and-fulfil for each matching reply.
69 if (--peers
.pending
== 0) {
70 peers
.all_committed
.set_value();
72 return peers
.all_committed
.get_future();
// Runs once all_committed has been fulfilled.
73 }).then([tid
, pending_txn
, this] {
// Reset all_committed to a default-constructed value, move the collected
// peers out of the entry, then erase the entry from pending_trans.
74 pending_txn
->second
.all_committed
= {};
75 auto acked_peers
= std::move(pending_txn
->second
.acked_peers
);
76 pending_trans
.erase(pending_txn
);
// Return the acked peers to the caller as a ready future.
77 return seastar::make_ready_future
<crimson::osd::acked_peers_t
>(std::move(acked_peers
));
81 void ReplicatedBackend::got_rep_op_reply(const MOSDRepOpReply
& reply
)
83 auto found
= pending_trans
.find(reply
.get_tid());
84 if (found
== pending_trans
.end()) {
85 logger().warn("{}: no matched pending rep op: {}", __func__
, reply
);
88 auto& peers
= found
->second
;
89 for (auto& peer
: peers
.acked_peers
) {
90 if (peer
.shard
== reply
.from
) {
91 peer
.last_complete_ondisk
= reply
.get_last_complete_ondisk();
92 if (--peers
.pending
== 0) {
93 peers
.all_committed
.set_value();