]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd_operations/peering_event.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / osd_operations / peering_event.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <seastar/core/future.hh>
5
6 #include "messages/MOSDPGLog.h"
7
8 #include "common/Formatter.h"
9 #include "crimson/osd/pg.h"
10 #include "crimson/osd/osd.h"
11 #include "crimson/osd/osd_operations/peering_event.h"
12 #include "crimson/osd/osd_connection_priv.h"
13
14 namespace {
15 seastar::logger& logger() {
16 return crimson::get_logger(ceph_subsys_osd);
17 }
18 }
19
20 namespace crimson::osd {
21
22 void PeeringEvent::print(std::ostream &lhs) const
23 {
24 lhs << "PeeringEvent("
25 << "from=" << from
26 << " pgid=" << pgid
27 << " sent=" << evt.get_epoch_sent()
28 << " requested=" << evt.get_epoch_requested()
29 << " evt=" << evt.get_desc()
30 << ")";
31 }
32
33 void PeeringEvent::dump_detail(Formatter *f) const
34 {
35 f->open_object_section("PeeringEvent");
36 f->dump_stream("from") << from;
37 f->dump_stream("pgid") << pgid;
38 f->dump_int("sent", evt.get_epoch_sent());
39 f->dump_int("requested", evt.get_epoch_requested());
40 f->dump_string("evt", evt.get_desc());
41 f->close_section();
42 }
43
44
45 PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
46 {
47 return pg.peering_request_pg_pipeline;
48 }
49
50 seastar::future<> PeeringEvent::start()
51 {
52
53 logger().debug("{}: start", *this);
54
55 IRef ref = this;
56 return get_pg().then([this](Ref<PG> pg) {
57 if (!pg) {
58 logger().warn("{}: pg absent, did not create", *this);
59 on_pg_absent();
60 handle.exit();
61 return complete_rctx(pg);
62 } else {
63 logger().debug("{}: pg present", *this);
64 return with_blocking_future(handle.enter(pp(*pg).await_map)
65 ).then([this, pg] {
66 return with_blocking_future(
67 pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
68 }).then([this, pg](auto) {
69 return with_blocking_future(handle.enter(pp(*pg).process));
70 }).then([this, pg] {
71 pg->do_peering_event(evt, ctx);
72 handle.exit();
73 return complete_rctx(pg);
74 });
75 }
76 }).then([this, ref=std::move(ref)] {
77 logger().debug("{}: complete", *this);
78 });
79 }
80
81 void PeeringEvent::on_pg_absent()
82 {
83 logger().debug("{}: pg absent, dropping", *this);
84 }
85
86 seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
87 {
88 logger().debug("{}: submitting ctx", *this);
89 return shard_services.dispatch_context(
90 pg->get_collection_ref(),
91 std::move(ctx));
92 }
93
94 RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
95 {
96 return get_osd_priv(conn.get()).peering_request_conn_pipeline;
97 }
98
99 void RemotePeeringEvent::on_pg_absent()
100 {
101 if (auto& e = get_event().get_event();
102 e.dynamic_type() == MQuery::static_type()) {
103 const auto map_epoch =
104 shard_services.get_osdmap_service().get_map()->get_epoch();
105 const auto& q = static_cast<const MQuery&>(e);
106 const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
107 if (q.query.type == q.query.LOG ||
108 q.query.type == q.query.FULLLOG) {
109 auto m = ceph::make_message<MOSDPGLog>(q.query.from, q.query.to,
110 map_epoch, empty,
111 q.query.epoch_sent);
112 ctx.send_osd_message(q.from.osd, std::move(m));
113 } else {
114 ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
115 q.query.epoch_sent,
116 map_epoch, empty,
117 PastIntervals{}});
118 }
119 }
120 }
121
122 seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
123 {
124 if (pg) {
125 return PeeringEvent::complete_rctx(pg);
126 } else {
127 return shard_services.dispatch_context_messages(std::move(ctx));
128 }
129 }
130
131 seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
132 {
133 return with_blocking_future(
134 handle.enter(cp().await_map)
135 ).then([this] {
136 return with_blocking_future(
137 osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
138 }).then([this](auto epoch) {
139 logger().debug("{}: got map {}", *this, epoch);
140 return with_blocking_future(handle.enter(cp().get_pg));
141 }).then([this] {
142 return with_blocking_future(
143 osd.get_or_create_pg(
144 pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
145 });
146 }
147
148 seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
149 return seastar::make_ready_future<Ref<PG>>(pg);
150 }
151
152 LocalPeeringEvent::~LocalPeeringEvent() {}
153
154 }