]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/osd_operations/compound_peering_request.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / crimson / osd / osd_operations / compound_peering_request.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <seastar/core/future.hh>
5
6#include "osd/PeeringState.h"
7
9f95a23c
TL
8#include "messages/MOSDPGCreate2.h"
9
10#include "common/Formatter.h"
11
f67539c2 12#include "crimson/common/exception.h"
9f95a23c
TL
13#include "crimson/osd/pg.h"
14#include "crimson/osd/osd.h"
15#include "crimson/osd/osd_operations/compound_peering_request.h"
16
17namespace {
18 seastar::logger& logger() {
19 return crimson::get_logger(ceph_subsys_osd);
20 }
21}
22
23namespace {
24using namespace crimson::osd;
25
26struct compound_state {
27 seastar::promise<BufferedRecoveryMessages> promise;
28 // assuming crimson-osd won't need to be compatible with pre-octopus
29 // releases
20effc67 30 BufferedRecoveryMessages ctx;
9f95a23c
TL
31 compound_state() = default;
32 ~compound_state() {
33 promise.set_value(std::move(ctx));
34 }
35};
36using compound_state_ref = seastar::lw_shared_ptr<compound_state>;
37
38class PeeringSubEvent : public RemotePeeringEvent {
39 compound_state_ref state;
40public:
41 template <typename... Args>
42 PeeringSubEvent(compound_state_ref state, Args &&... args) :
43 RemotePeeringEvent(std::forward<Args>(args)...), state(state) {}
44
20effc67
TL
45 PeeringEvent::interruptible_future<>
46 complete_rctx(Ref<crimson::osd::PG> pg) final {
9f95a23c
TL
47 logger().debug("{}: submitting ctx transaction", *this);
48 state->ctx.accept_buffered_messages(ctx);
49 state = {};
50 if (!pg) {
51 ceph_assert(ctx.transaction.empty());
52 return seastar::now();
53 } else {
54 return osd.get_shard_services().dispatch_context_transaction(
55 pg->get_collection_ref(), ctx);
56 }
57 }
58};
59
20effc67 60std::vector<crimson::OperationRef> handle_pg_create(
9f95a23c
TL
61 OSD &osd,
62 crimson::net::ConnectionRef conn,
63 compound_state_ref state,
64 Ref<MOSDPGCreate2> m)
65{
20effc67 66 std::vector<crimson::OperationRef> ret;
f67539c2
TL
67 for (auto& [pgid, when] : m->pgs) {
68 const auto &[created, created_stamp] = when;
9f95a23c
TL
69 auto q = m->pg_extra.find(pgid);
70 ceph_assert(q != m->pg_extra.end());
f67539c2 71 auto& [history, pi] = q->second;
9f95a23c 72 logger().debug(
f67539c2
TL
73 "{}: {} e{} @{} "
74 "history {} pi {}",
75 __func__, pgid, created, created_stamp,
76 history, pi);
77 if (!pi.empty() &&
78 m->epoch < pi.get_bounds().second) {
9f95a23c 79 logger().error(
f67539c2
TL
80 "got pg_create on {} epoch {} "
81 "unmatched past_intervals {} (history {})",
82 pgid, m->epoch,
83 pi, history);
9f95a23c
TL
84 } else {
85 auto op = osd.get_shard_services().start_operation<PeeringSubEvent>(
86 state,
87 osd,
88 conn,
89 osd.get_shard_services(),
90 pg_shard_t(),
91 pgid,
92 m->epoch,
93 m->epoch,
94 NullEvt(),
95 true,
f67539c2 96 new PGCreateInfo(pgid, m->epoch, history, pi, true)).first;
9f95a23c
TL
97 ret.push_back(op);
98 }
99 }
100 return ret;
101}
102
20effc67 103struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> {
9f95a23c
TL
104 static constexpr const char * type_name = "CompoundOpBlocker";
105
20effc67
TL
106 std::vector<crimson::OperationRef> subops;
107 SubOpBlocker(std::vector<crimson::OperationRef> &&subops)
108 : subops(subops) {}
9f95a23c
TL
109
110 virtual void dump_detail(Formatter *f) const {
111 f->open_array_section("dependent_operations");
112 {
113 for (auto &i : subops) {
114 i->dump_brief(f);
115 }
116 }
117 f->close_section();
118 }
119};
120
121} // namespace
122
123namespace crimson::osd {
124
125CompoundPeeringRequest::CompoundPeeringRequest(
126 OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m)
127 : osd(osd),
128 conn(conn),
129 m(m)
130{}
131
132void CompoundPeeringRequest::print(std::ostream &lhs) const
133{
134 lhs << *m;
135}
136
137void CompoundPeeringRequest::dump_detail(Formatter *f) const
138{
139 f->dump_stream("message") << *m;
140}
141
142seastar::future<> CompoundPeeringRequest::start()
143{
144 logger().info("{}: starting", *this);
145 auto state = seastar::make_lw_shared<compound_state>();
146 auto blocker = std::make_unique<SubOpBlocker>(
147 [&] {
148 assert((m->get_type() == MSG_OSD_PG_CREATE2));
149 return handle_pg_create(
150 osd,
151 conn,
152 state,
153 boost::static_pointer_cast<MOSDPGCreate2>(m));
154 }());
155
156 IRef ref = this;
157 logger().info("{}: about to fork future", *this);
f67539c2
TL
158 return crimson::common::handle_system_shutdown(
159 [this, ref, blocker=std::move(blocker), state]() mutable {
160 return with_blocking_future(
161 blocker->make_blocking_future(state->promise.get_future())
162 ).then([this, blocker=std::move(blocker)](auto &&ctx) {
163 logger().info("{}: sub events complete", *this);
164 return osd.get_shard_services().dispatch_context_messages(std::move(ctx));
165 }).then([this, ref=std::move(ref)] {
166 logger().info("{}: complete", *this);
167 });
9f95a23c
TL
168 });
169}
170
171} // namespace crimson::osd