]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/osd_operations/peering_event.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd_operations / peering_event.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <seastar/core/future.hh>
20effc67 5#include <seastar/core/sleep.hh>
9f95a23c
TL
6
7#include "messages/MOSDPGLog.h"
8
9#include "common/Formatter.h"
10#include "crimson/osd/pg.h"
11#include "crimson/osd/osd.h"
12#include "crimson/osd/osd_operations/peering_event.h"
13#include "crimson/osd/osd_connection_priv.h"
14
15namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_osd);
18 }
19}
20
21namespace crimson::osd {
22
23void PeeringEvent::print(std::ostream &lhs) const
24{
25 lhs << "PeeringEvent("
26 << "from=" << from
27 << " pgid=" << pgid
28 << " sent=" << evt.get_epoch_sent()
29 << " requested=" << evt.get_epoch_requested()
30 << " evt=" << evt.get_desc()
31 << ")";
32}
33
34void PeeringEvent::dump_detail(Formatter *f) const
35{
36 f->open_object_section("PeeringEvent");
37 f->dump_stream("from") << from;
38 f->dump_stream("pgid") << pgid;
39 f->dump_int("sent", evt.get_epoch_sent());
40 f->dump_int("requested", evt.get_epoch_requested());
41 f->dump_string("evt", evt.get_desc());
42 f->close_section();
43}
44
45
46PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
47{
48 return pg.peering_request_pg_pipeline;
49}
50
51seastar::future<> PeeringEvent::start()
52{
53
54 logger().debug("{}: start", *this);
55
56 IRef ref = this;
20effc67
TL
57 auto maybe_delay = seastar::now();
58 if (delay) {
59 maybe_delay = seastar::sleep(
60 std::chrono::milliseconds(std::lround(delay * 1000)));
61 }
62 return maybe_delay.then([this] {
f67539c2
TL
63 return get_pg();
64 }).then([this](Ref<PG> pg) {
9f95a23c
TL
65 if (!pg) {
66 logger().warn("{}: pg absent, did not create", *this);
67 on_pg_absent();
68 handle.exit();
20effc67
TL
69 return complete_rctx_no_pg();
70 }
71 return interruptor::with_interruption([this, pg] {
9f95a23c 72 logger().debug("{}: pg present", *this);
20effc67
TL
73 return with_blocking_future_interruptible<interruptor::condition>(
74 handle.enter(pp(*pg).await_map)
75 ).then_interruptible([this, pg] {
76 return with_blocking_future_interruptible<interruptor::condition>(
77 pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
78 }).then_interruptible([this, pg](auto) {
79 return with_blocking_future_interruptible<interruptor::condition>(
80 handle.enter(pp(*pg).process));
81 }).then_interruptible([this, pg] {
f67539c2
TL
82 // TODO: likely we should synchronize also with the pg log-based
83 // recovery.
20effc67 84 return with_blocking_future_interruptible<interruptor::condition>(
f67539c2 85 handle.enter(BackfillRecovery::bp(*pg).process));
20effc67
TL
86 }).then_interruptible([this, pg] {
87 pg->do_peering_event(evt, ctx);
88 handle.exit();
89 return complete_rctx(pg);
90 }).then_interruptible([this, pg] () -> PeeringEvent::interruptible_future<> {
91 if (!pg->get_need_up_thru()) {
92 return seastar::now();
93 }
94 return shard_services.send_alive(pg->get_same_interval_since());
95 }).then_interruptible([this] {
96 return shard_services.send_pg_temp();
9f95a23c 97 });
20effc67
TL
98 },
99 [this](std::exception_ptr ep) {
100 logger().debug("{}: interrupted with {}", *this, ep);
101 return seastar::now();
102 },
103 pg);
104 }).finally([ref=std::move(ref)] {
105 logger().debug("{}: complete", *ref);
9f95a23c
TL
106 });
107}
108
109void PeeringEvent::on_pg_absent()
110{
111 logger().debug("{}: pg absent, dropping", *this);
112}
113
20effc67 114PeeringEvent::interruptible_future<> PeeringEvent::complete_rctx(Ref<PG> pg)
9f95a23c
TL
115{
116 logger().debug("{}: submitting ctx", *this);
117 return shard_services.dispatch_context(
118 pg->get_collection_ref(),
119 std::move(ctx));
120}
121
122RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
123{
124 return get_osd_priv(conn.get()).peering_request_conn_pipeline;
125}
126
20effc67
TL
127RemotePeeringEvent::OSDPipeline &RemotePeeringEvent::op()
128{
129 return osd.peering_request_osd_pipeline;
130}
131
9f95a23c
TL
132void RemotePeeringEvent::on_pg_absent()
133{
134 if (auto& e = get_event().get_event();
135 e.dynamic_type() == MQuery::static_type()) {
136 const auto map_epoch =
137 shard_services.get_osdmap_service().get_map()->get_epoch();
138 const auto& q = static_cast<const MQuery&>(e);
139 const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
140 if (q.query.type == q.query.LOG ||
141 q.query.type == q.query.FULLLOG) {
20effc67 142 auto m = crimson::make_message<MOSDPGLog>(q.query.from, q.query.to,
9f95a23c
TL
143 map_epoch, empty,
144 q.query.epoch_sent);
145 ctx.send_osd_message(q.from.osd, std::move(m));
146 } else {
147 ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
148 q.query.epoch_sent,
149 map_epoch, empty,
150 PastIntervals{}});
151 }
152 }
153}
154
20effc67 155PeeringEvent::interruptible_future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
9f95a23c
TL
156{
157 if (pg) {
158 return PeeringEvent::complete_rctx(pg);
159 } else {
20effc67
TL
160 logger().debug("{}: OSDState is {}", *this, osd.state);
161 return osd.state.when_active().then([this] {
162 assert(osd.state.is_active());
163 return shard_services.dispatch_context_messages(std::move(ctx));
164 });
9f95a23c
TL
165 }
166}
167
20effc67
TL
168seastar::future<> RemotePeeringEvent::complete_rctx_no_pg()
169{
170 logger().debug("{}: OSDState is {}", *this, osd.state);
171 return osd.state.when_active().then([this] {
172 assert(osd.state.is_active());
173 return shard_services.dispatch_context_messages(std::move(ctx));
174 });
175}
176
9f95a23c
TL
177seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
178{
179 return with_blocking_future(
20effc67 180 handle.enter(op().await_active)
9f95a23c 181 ).then([this] {
20effc67
TL
182 return osd.state.when_active();
183 }).then([this] {
184 return with_blocking_future(handle.enter(cp().await_map));
185 }).then([this] {
9f95a23c
TL
186 return with_blocking_future(
187 osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
188 }).then([this](auto epoch) {
189 logger().debug("{}: got map {}", *this, epoch);
190 return with_blocking_future(handle.enter(cp().get_pg));
191 }).then([this] {
192 return with_blocking_future(
193 osd.get_or_create_pg(
194 pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
195 });
196}
197
198seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
199 return seastar::make_ready_future<Ref<PG>>(pg);
200}
201
202LocalPeeringEvent::~LocalPeeringEvent() {}
203
204}