]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd_operations/peering_event.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd_operations / peering_event.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <seastar/core/future.hh>
5 #include <seastar/core/sleep.hh>
6
7 #include "messages/MOSDPGLog.h"
8
9 #include "common/Formatter.h"
10 #include "crimson/osd/pg.h"
11 #include "crimson/osd/osd.h"
12 #include "crimson/osd/osd_operations/peering_event.h"
13 #include "crimson/osd/osd_connection_priv.h"
14
15 namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_osd);
18 }
19 }
20
21 namespace crimson::osd {
22
23 void PeeringEvent::print(std::ostream &lhs) const
24 {
25 lhs << "PeeringEvent("
26 << "from=" << from
27 << " pgid=" << pgid
28 << " sent=" << evt.get_epoch_sent()
29 << " requested=" << evt.get_epoch_requested()
30 << " evt=" << evt.get_desc()
31 << ")";
32 }
33
34 void PeeringEvent::dump_detail(Formatter *f) const
35 {
36 f->open_object_section("PeeringEvent");
37 f->dump_stream("from") << from;
38 f->dump_stream("pgid") << pgid;
39 f->dump_int("sent", evt.get_epoch_sent());
40 f->dump_int("requested", evt.get_epoch_requested());
41 f->dump_string("evt", evt.get_desc());
42 f->close_section();
43 }
44
45
46 PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
47 {
48 return pg.peering_request_pg_pipeline;
49 }
50
51 seastar::future<> PeeringEvent::start()
52 {
53
54 logger().debug("{}: start", *this);
55
56 IRef ref = this;
57 auto maybe_delay = seastar::now();
58 if (delay) {
59 maybe_delay = seastar::sleep(
60 std::chrono::milliseconds(std::lround(delay * 1000)));
61 }
62 return maybe_delay.then([this] {
63 return get_pg();
64 }).then([this](Ref<PG> pg) {
65 if (!pg) {
66 logger().warn("{}: pg absent, did not create", *this);
67 on_pg_absent();
68 handle.exit();
69 return complete_rctx_no_pg();
70 }
71 return interruptor::with_interruption([this, pg] {
72 logger().debug("{}: pg present", *this);
73 return with_blocking_future_interruptible<interruptor::condition>(
74 handle.enter(pp(*pg).await_map)
75 ).then_interruptible([this, pg] {
76 return with_blocking_future_interruptible<interruptor::condition>(
77 pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
78 }).then_interruptible([this, pg](auto) {
79 return with_blocking_future_interruptible<interruptor::condition>(
80 handle.enter(pp(*pg).process));
81 }).then_interruptible([this, pg] {
82 // TODO: likely we should synchronize also with the pg log-based
83 // recovery.
84 return with_blocking_future_interruptible<interruptor::condition>(
85 handle.enter(BackfillRecovery::bp(*pg).process));
86 }).then_interruptible([this, pg] {
87 pg->do_peering_event(evt, ctx);
88 handle.exit();
89 return complete_rctx(pg);
90 }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
91 if (!pg->get_need_up_thru()) {
92 return seastar::now();
93 }
94 return shard_services.send_alive(pg->get_same_interval_since());
95 }).then_interruptible([this] {
96 return shard_services.send_pg_temp();
97 });
98 },
99 [this](std::exception_ptr ep) {
100 logger().debug("{}: interrupted with {}", *this, ep);
101 return seastar::now();
102 },
103 pg);
104 }).finally([ref=std::move(ref)] {
105 logger().debug("{}: complete", *ref);
106 });
107 }
108
109 void PeeringEvent::on_pg_absent()
110 {
111 logger().debug("{}: pg absent, dropping", *this);
112 }
113
114 PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
115 {
116 logger().debug("{}: submitting ctx", *this);
117 return shard_services.dispatch_context(
118 pg->get_collection_ref(),
119 std::move(ctx));
120 }
121
122 RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
123 {
124 return get_osd_priv(conn.get()).peering_request_conn_pipeline;
125 }
126
127 RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
128 {
129 return osd.peering_request_osd_pipeline;
130 }
131
132 void RemotePeeringEvent::on_pg_absent()
133 {
134 if (auto& e = get_event().get_event();
135 e.dynamic_type() == MQuery::static_type()) {
136 const auto map_epoch =
137 shard_services.get_osdmap_service().get_map()->get_epoch();
138 const auto& q = static_cast<const MQuery&>(e);
139 const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
140 if (q.query.type == q.query.LOG ||
141 q.query.type == q.query.FULLLOG) {
142 auto m = crimson::make_message<MOSDPGLog>(q.query.from, q.query.to,
143 map_epoch, empty,
144 q.query.epoch_sent);
145 ctx.send_osd_message(q.from.osd, std::move(m));
146 } else {
147 ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
148 q.query.epoch_sent,
149 map_epoch, empty,
150 PastIntervals{}});
151 }
152 }
153 }
154
155 PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
156 {
157 if (pg) {
158 return PeeringEvent::complete_rctx(pg);
159 } else {
160 logger().debug("{}: OSDState is {}", *this, osd.state);
161 return osd.state.when_active().then([this] {
162 assert(osd.state.is_active());
163 return shard_services.dispatch_context_messages(std::move(ctx));
164 });
165 }
166 }
167
168 seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
169 {
170 logger().debug("{}: OSDState is {}", *this, osd.state);
171 return osd.state.when_active().then([this] {
172 assert(osd.state.is_active());
173 return shard_services.dispatch_context_messages(std::move(ctx));
174 });
175 }
176
177 seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
178 {
179 return with_blocking_future(
180 handle.enter(op().await_active)
181 ).then([this] {
182 return osd.state.when_active();
183 }).then([this] {
184 return with_blocking_future(handle.enter(cp().await_map));
185 }).then([this] {
186 return with_blocking_future(
187 osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
188 }).then([this](auto epoch) {
189 logger().debug("{}: got map {}", *this, epoch);
190 return with_blocking_future(handle.enter(cp().get_pg));
191 }).then([this] {
192 return with_blocking_future(
193 osd.get_or_create_pg(
194 pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
195 });
196 }
197
198 seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
199 return seastar::make_ready_future<Ref<PG>>(pg);
200 }
201
202 LocalPeeringEvent::~LocalPeeringEvent() {}
203
204 }