// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
4 | #include <seastar/core/future.hh> | |
20effc67 | 5 | #include <seastar/core/sleep.hh> |
9f95a23c TL |
6 | |
7 | #include "messages/MOSDPGLog.h" | |
8 | ||
9 | #include "common/Formatter.h" | |
10 | #include "crimson/osd/pg.h" | |
11 | #include "crimson/osd/osd.h" | |
12 | #include "crimson/osd/osd_operations/peering_event.h" | |
13 | #include "crimson/osd/osd_connection_priv.h" | |
14 | ||
15 | namespace { | |
16 | seastar::logger& logger() { | |
17 | return crimson::get_logger(ceph_subsys_osd); | |
18 | } | |
19 | } | |
20 | ||
21 | namespace crimson::osd { | |
22 | ||
23 | void PeeringEvent::print(std::ostream &lhs) const | |
24 | { | |
25 | lhs << "PeeringEvent(" | |
26 | << "from=" << from | |
27 | << " pgid=" << pgid | |
28 | << " sent=" << evt.get_epoch_sent() | |
29 | << " requested=" << evt.get_epoch_requested() | |
30 | << " evt=" << evt.get_desc() | |
31 | << ")"; | |
32 | } | |
33 | ||
34 | void PeeringEvent::dump_detail(Formatter *f) const | |
35 | { | |
36 | f->open_object_section("PeeringEvent"); | |
37 | f->dump_stream("from") << from; | |
38 | f->dump_stream("pgid") << pgid; | |
39 | f->dump_int("sent", evt.get_epoch_sent()); | |
40 | f->dump_int("requested", evt.get_epoch_requested()); | |
41 | f->dump_string("evt", evt.get_desc()); | |
42 | f->close_section(); | |
43 | } | |
44 | ||
45 | ||
46 | PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg) | |
47 | { | |
48 | return pg.peering_request_pg_pipeline; | |
49 | } | |
50 | ||
seastar::future<> PeeringEvent::start()
{

  logger().debug("{}: start", *this);

  // Keep this operation alive until the whole continuation chain resolves.
  IRef ref = this;
  // Optional artificial delay before processing; `delay` is in seconds
  // (fractional), converted here to whole milliseconds.
  auto maybe_delay = seastar::now();
  if (delay) {
    maybe_delay = seastar::sleep(
      std::chrono::milliseconds(std::lround(delay * 1000)));
  }
  return maybe_delay.then([this] {
    return get_pg();
  }).then([this](Ref<PG> pg) {
    if (!pg) {
      // No PG and none was created; let the subclass decide what (if
      // anything) must still be sent, then flush without a PG.
      logger().warn("{}: pg absent, did not create", *this);
      on_pg_absent();
      handle.exit();
      return complete_rctx_no_pg();
    }
    // The remainder runs under interruption control tied to `pg`.
    return interruptor::with_interruption([this, pg] {
      logger().debug("{}: pg present", *this);
      // Stage 1: wait for our slot in the PG's await_map stage.
      return with_blocking_future_interruptible<interruptor::condition>(
        handle.enter(pp(*pg).await_map)
      ).then_interruptible([this, pg] {
        // Stage 2: block until the PG has the epoch this event was sent in.
        return with_blocking_future_interruptible<interruptor::condition>(
          pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
      }).then_interruptible([this, pg](auto) {
        // Stage 3: enter the serialized processing stage.
        return with_blocking_future_interruptible<interruptor::condition>(
          handle.enter(pp(*pg).process));
      }).then_interruptible([this, pg] {
        // TODO: likely we should synchronize also with the pg log-based
        // recovery.
        return with_blocking_future_interruptible<interruptor::condition>(
          handle.enter(BackfillRecovery::bp(*pg).process));
      }).then_interruptible([this, pg] {
        // Deliver the event to the PG's peering state machine, then
        // dispatch whatever the state machine queued in `ctx`.
        pg->do_peering_event(evt, ctx);
        handle.exit();
        return complete_rctx(pg);
      }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
        if (!pg->get_need_up_thru()) {
          return seastar::now();
        }
        // The PG needs an up_thru update in the next map; ask the monitor.
        return shard_services.send_alive(pg->get_same_interval_since());
      }).then_interruptible([this] {
        return shard_services.send_pg_temp();
      });
    },
    [this](std::exception_ptr ep) {
      // Interruption (e.g. new map interval) is expected; just log and stop.
      logger().debug("{}: interrupted with {}", *this, ep);
      return seastar::now();
    },
    pg);
  }).finally([ref=std::move(ref)] {
    logger().debug("{}: complete", *ref);
  });
}
108 | ||
// Default policy when the target PG does not exist: drop the event.
// RemotePeeringEvent overrides this to reply to queries.
void PeeringEvent::on_pg_absent()
{
  logger().debug("{}: pg absent, dropping", *this);
}
113 | ||
20effc67 | 114 | PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg) |
9f95a23c TL |
115 | { |
116 | logger().debug("{}: submitting ctx", *this); | |
117 | return shard_services.dispatch_context( | |
118 | pg->get_collection_ref(), | |
119 | std::move(ctx)); | |
120 | } | |
121 | ||
122 | RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp() | |
123 | { | |
124 | return get_osd_priv(conn.get()).peering_request_conn_pipeline; | |
125 | } | |
126 | ||
20effc67 TL |
127 | RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op() |
128 | { | |
129 | return osd.peering_request_osd_pipeline; | |
130 | } | |
131 | ||
// A peer asked about a PG we don't have.  For MQuery events we must still
// answer — with an empty log or an empty notify — so the querying OSD can
// make progress; any other event type is silently dropped (base behavior
// already logged the drop).
void RemotePeeringEvent::on_pg_absent()
{
  if (auto& e = get_event().get_event();
      e.dynamic_type() == MQuery::static_type()) {
    const auto map_epoch =
      shard_services.get_osdmap_service().get_map()->get_epoch();
    const auto& q = static_cast<const MQuery&>(e);
    // Info for a PG that doesn't exist: default-constructed, just the id.
    const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
    if (q.query.type == q.query.LOG ||
        q.query.type == q.query.FULLLOG) {
      // Log queries get an empty MOSDPGLog reply.
      auto m = crimson::make_message<MOSDPGLog>(q.query.from, q.query.to,
                                                map_epoch, empty,
                                                q.query.epoch_sent);
      ctx.send_osd_message(q.from.osd, std::move(m));
    } else {
      // Everything else gets an empty notify (no past intervals).
      ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
                                   q.query.epoch_sent,
                                   map_epoch, empty,
                                   PastIntervals{}});
    }
  }
}
154 | ||
20effc67 | 155 | PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg) |
9f95a23c TL |
156 | { |
157 | if (pg) { | |
158 | return PeeringEvent::complete_rctx(pg); | |
159 | } else { | |
20effc67 TL |
160 | logger().debug("{}: OSDState is {}", *this, osd.state); |
161 | return osd.state.when_active().then([this] { | |
162 | assert(osd.state.is_active()); | |
163 | return shard_services.dispatch_context_messages(std::move(ctx)); | |
164 | }); | |
9f95a23c TL |
165 | } |
166 | } | |
167 | ||
20effc67 TL |
168 | seastar::future<> RemotePeeringEvent::complete_rctx_no_pg() |
169 | { | |
170 | logger().debug("{}: OSDState is {}", *this, osd.state); | |
171 | return osd.state.when_active().then([this] { | |
172 | assert(osd.state.is_active()); | |
173 | return shard_services.dispatch_context_messages(std::move(ctx)); | |
174 | }); | |
175 | } | |
176 | ||
// Resolve (or create) the PG a remote peering event targets.  The stages
// must run in this exact order: OSD active gate, connection await_map,
// OSD map gate for the event's epoch, then the get_pg stage that may
// create the PG from evt.create_info.
seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
{
  return with_blocking_future(
    handle.enter(op().await_active)
  ).then([this] {
    // Don't touch PGs until the OSD itself is active.
    return osd.state.when_active();
  }).then([this] {
    return with_blocking_future(handle.enter(cp().await_map));
  }).then([this] {
    // Need the map the sender was at (or newer) before looking up the PG.
    return with_blocking_future(
      osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
  }).then([this](auto epoch) {
    logger().debug("{}: got map {}", *this, epoch);
    return with_blocking_future(handle.enter(cp().get_pg));
  }).then([this] {
    return with_blocking_future(
      osd.get_or_create_pg(
        pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
  });
}
197 | ||
198 | seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() { | |
199 | return seastar::make_ready_future<Ref<PG>>(pg); | |
200 | } | |
201 | ||
// Out-of-line (empty) destructor definition anchors the vtable in this TU.
LocalPeeringEvent::~LocalPeeringEvent() {}
203 | ||
204 | } |