]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <seastar/core/future.hh> | |
5 | ||
6 | #include "osd/PeeringState.h" | |
7 | ||
9f95a23c TL |
8 | #include "messages/MOSDPGCreate2.h" |
9 | ||
10 | #include "common/Formatter.h" | |
11 | ||
f67539c2 | 12 | #include "crimson/common/exception.h" |
9f95a23c TL |
13 | #include "crimson/osd/pg.h" |
14 | #include "crimson/osd/osd.h" | |
15 | #include "crimson/osd/osd_operations/compound_peering_request.h" | |
16 | ||
17 | namespace { | |
18 | seastar::logger& logger() { | |
19 | return crimson::get_logger(ceph_subsys_osd); | |
20 | } | |
21 | } | |
22 | ||
23 | namespace { | |
24 | using namespace crimson::osd; | |
25 | ||
26 | struct compound_state { | |
27 | seastar::promise<BufferedRecoveryMessages> promise; | |
28 | // assuming crimson-osd won't need to be compatible with pre-octopus | |
29 | // releases | |
20effc67 | 30 | BufferedRecoveryMessages ctx; |
9f95a23c TL |
31 | compound_state() = default; |
32 | ~compound_state() { | |
33 | promise.set_value(std::move(ctx)); | |
34 | } | |
35 | }; | |
36 | using compound_state_ref = seastar::lw_shared_ptr<compound_state>; | |
37 | ||
38 | class PeeringSubEvent : public RemotePeeringEvent { | |
39 | compound_state_ref state; | |
40 | public: | |
41 | template <typename... Args> | |
42 | PeeringSubEvent(compound_state_ref state, Args &&... args) : | |
43 | RemotePeeringEvent(std::forward<Args>(args)...), state(state) {} | |
44 | ||
20effc67 TL |
45 | PeeringEvent::interruptible_future<> |
46 | complete_rctx(Ref<crimson::osd::PG> pg) final { | |
9f95a23c TL |
47 | logger().debug("{}: submitting ctx transaction", *this); |
48 | state->ctx.accept_buffered_messages(ctx); | |
49 | state = {}; | |
50 | if (!pg) { | |
51 | ceph_assert(ctx.transaction.empty()); | |
52 | return seastar::now(); | |
53 | } else { | |
54 | return osd.get_shard_services().dispatch_context_transaction( | |
55 | pg->get_collection_ref(), ctx); | |
56 | } | |
57 | } | |
58 | }; | |
59 | ||
20effc67 | 60 | std::vector<crimson::OperationRef> handle_pg_create( |
9f95a23c TL |
61 | OSD &osd, |
62 | crimson::net::ConnectionRef conn, | |
63 | compound_state_ref state, | |
64 | Ref<MOSDPGCreate2> m) | |
65 | { | |
20effc67 | 66 | std::vector<crimson::OperationRef> ret; |
f67539c2 TL |
67 | for (auto& [pgid, when] : m->pgs) { |
68 | const auto &[created, created_stamp] = when; | |
9f95a23c TL |
69 | auto q = m->pg_extra.find(pgid); |
70 | ceph_assert(q != m->pg_extra.end()); | |
f67539c2 | 71 | auto& [history, pi] = q->second; |
9f95a23c | 72 | logger().debug( |
f67539c2 TL |
73 | "{}: {} e{} @{} " |
74 | "history {} pi {}", | |
75 | __func__, pgid, created, created_stamp, | |
76 | history, pi); | |
77 | if (!pi.empty() && | |
78 | m->epoch < pi.get_bounds().second) { | |
9f95a23c | 79 | logger().error( |
f67539c2 TL |
80 | "got pg_create on {} epoch {} " |
81 | "unmatched past_intervals {} (history {})", | |
82 | pgid, m->epoch, | |
83 | pi, history); | |
9f95a23c TL |
84 | } else { |
85 | auto op = osd.get_shard_services().start_operation<PeeringSubEvent>( | |
86 | state, | |
87 | osd, | |
88 | conn, | |
89 | osd.get_shard_services(), | |
90 | pg_shard_t(), | |
91 | pgid, | |
92 | m->epoch, | |
93 | m->epoch, | |
94 | NullEvt(), | |
95 | true, | |
f67539c2 | 96 | new PGCreateInfo(pgid, m->epoch, history, pi, true)).first; |
9f95a23c TL |
97 | ret.push_back(op); |
98 | } | |
99 | } | |
100 | return ret; | |
101 | } | |
102 | ||
20effc67 | 103 | struct SubOpBlocker : crimson::BlockerT<SubOpBlocker> { |
9f95a23c TL |
104 | static constexpr const char * type_name = "CompoundOpBlocker"; |
105 | ||
20effc67 TL |
106 | std::vector<crimson::OperationRef> subops; |
107 | SubOpBlocker(std::vector<crimson::OperationRef> &&subops) | |
108 | : subops(subops) {} | |
9f95a23c TL |
109 | |
110 | virtual void dump_detail(Formatter *f) const { | |
111 | f->open_array_section("dependent_operations"); | |
112 | { | |
113 | for (auto &i : subops) { | |
114 | i->dump_brief(f); | |
115 | } | |
116 | } | |
117 | f->close_section(); | |
118 | } | |
119 | }; | |
120 | ||
121 | } // namespace | |
122 | ||
123 | namespace crimson::osd { | |
124 | ||
125 | CompoundPeeringRequest::CompoundPeeringRequest( | |
126 | OSD &osd, crimson::net::ConnectionRef conn, Ref<Message> m) | |
127 | : osd(osd), | |
128 | conn(conn), | |
129 | m(m) | |
130 | {} | |
131 | ||
132 | void CompoundPeeringRequest::print(std::ostream &lhs) const | |
133 | { | |
134 | lhs << *m; | |
135 | } | |
136 | ||
137 | void CompoundPeeringRequest::dump_detail(Formatter *f) const | |
138 | { | |
139 | f->dump_stream("message") << *m; | |
140 | } | |
141 | ||
142 | seastar::future<> CompoundPeeringRequest::start() | |
143 | { | |
144 | logger().info("{}: starting", *this); | |
145 | auto state = seastar::make_lw_shared<compound_state>(); | |
146 | auto blocker = std::make_unique<SubOpBlocker>( | |
147 | [&] { | |
148 | assert((m->get_type() == MSG_OSD_PG_CREATE2)); | |
149 | return handle_pg_create( | |
150 | osd, | |
151 | conn, | |
152 | state, | |
153 | boost::static_pointer_cast<MOSDPGCreate2>(m)); | |
154 | }()); | |
155 | ||
156 | IRef ref = this; | |
157 | logger().info("{}: about to fork future", *this); | |
f67539c2 TL |
158 | return crimson::common::handle_system_shutdown( |
159 | [this, ref, blocker=std::move(blocker), state]() mutable { | |
160 | return with_blocking_future( | |
161 | blocker->make_blocking_future(state->promise.get_future()) | |
162 | ).then([this, blocker=std::move(blocker)](auto &&ctx) { | |
163 | logger().info("{}: sub events complete", *this); | |
164 | return osd.get_shard_services().dispatch_context_messages(std::move(ctx)); | |
165 | }).then([this, ref=std::move(ref)] { | |
166 | logger().info("{}: complete", *this); | |
167 | }); | |
9f95a23c TL |
168 | }); |
169 | } | |
170 | ||
171 | } // namespace crimson::osd |