]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/osd_operations/peering_event.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / osd_operations / peering_event.cc
CommitLineData
9f95a23c
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include <seastar/core/future.hh>
5
6#include "messages/MOSDPGLog.h"
7
8#include "common/Formatter.h"
9#include "crimson/osd/pg.h"
10#include "crimson/osd/osd.h"
11#include "crimson/osd/osd_operations/peering_event.h"
12#include "crimson/osd/osd_connection_priv.h"
13
14namespace {
15 seastar::logger& logger() {
16 return crimson::get_logger(ceph_subsys_osd);
17 }
18}
19
20namespace crimson::osd {
21
22void PeeringEvent::print(std::ostream &lhs) const
23{
24 lhs << "PeeringEvent("
25 << "from=" << from
26 << " pgid=" << pgid
27 << " sent=" << evt.get_epoch_sent()
28 << " requested=" << evt.get_epoch_requested()
29 << " evt=" << evt.get_desc()
30 << ")";
31}
32
33void PeeringEvent::dump_detail(Formatter *f) const
34{
35 f->open_object_section("PeeringEvent");
36 f->dump_stream("from") << from;
37 f->dump_stream("pgid") << pgid;
38 f->dump_int("sent", evt.get_epoch_sent());
39 f->dump_int("requested", evt.get_epoch_requested());
40 f->dump_string("evt", evt.get_desc());
41 f->close_section();
42}
43
44
45PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg)
46{
47 return pg.peering_request_pg_pipeline;
48}
49
50seastar::future<> PeeringEvent::start()
51{
52
53 logger().debug("{}: start", *this);
54
55 IRef ref = this;
56 return get_pg().then([this](Ref<PG> pg) {
57 if (!pg) {
58 logger().warn("{}: pg absent, did not create", *this);
59 on_pg_absent();
60 handle.exit();
61 return complete_rctx(pg);
62 } else {
63 logger().debug("{}: pg present", *this);
64 return with_blocking_future(handle.enter(pp(*pg).await_map)
65 ).then([this, pg] {
66 return with_blocking_future(
67 pg->osdmap_gate.wait_for_map(evt.get_epoch_sent()));
68 }).then([this, pg](auto) {
69 return with_blocking_future(handle.enter(pp(*pg).process));
70 }).then([this, pg] {
71 pg->do_peering_event(evt, ctx);
72 handle.exit();
73 return complete_rctx(pg);
74 });
75 }
76 }).then([this, ref=std::move(ref)] {
77 logger().debug("{}: complete", *this);
78 });
79}
80
81void PeeringEvent::on_pg_absent()
82{
83 logger().debug("{}: pg absent, dropping", *this);
84}
85
86seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg)
87{
88 logger().debug("{}: submitting ctx", *this);
89 return shard_services.dispatch_context(
90 pg->get_collection_ref(),
91 std::move(ctx));
92}
93
94RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp()
95{
96 return get_osd_priv(conn.get()).peering_request_conn_pipeline;
97}
98
99void RemotePeeringEvent::on_pg_absent()
100{
101 if (auto& e = get_event().get_event();
102 e.dynamic_type() == MQuery::static_type()) {
103 const auto map_epoch =
104 shard_services.get_osdmap_service().get_map()->get_epoch();
105 const auto& q = static_cast<const MQuery&>(e);
106 const pg_info_t empty{spg_t{pgid.pgid, q.query.to}};
107 if (q.query.type == q.query.LOG ||
108 q.query.type == q.query.FULLLOG) {
109 auto m = ceph::make_message<MOSDPGLog>(q.query.from, q.query.to,
110 map_epoch, empty,
111 q.query.epoch_sent);
112 ctx.send_osd_message(q.from.osd, std::move(m));
113 } else {
114 ctx.send_notify(q.from.osd, {q.query.from, q.query.to,
115 q.query.epoch_sent,
116 map_epoch, empty,
117 PastIntervals{}});
118 }
119 }
120}
121
122seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg)
123{
124 if (pg) {
125 return PeeringEvent::complete_rctx(pg);
126 } else {
127 return shard_services.dispatch_context_messages(std::move(ctx));
128 }
129}
130
131seastar::future<Ref<PG>> RemotePeeringEvent::get_pg()
132{
133 return with_blocking_future(
134 handle.enter(cp().await_map)
135 ).then([this] {
136 return with_blocking_future(
137 osd.osdmap_gate.wait_for_map(evt.get_epoch_sent()));
138 }).then([this](auto epoch) {
139 logger().debug("{}: got map {}", *this, epoch);
140 return with_blocking_future(handle.enter(cp().get_pg));
141 }).then([this] {
142 return with_blocking_future(
143 osd.get_or_create_pg(
144 pgid, evt.get_epoch_sent(), std::move(evt.create_info)));
145 });
146}
147
148seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() {
149 return seastar::make_ready_future<Ref<PG>>(pg);
150}
151
152LocalPeeringEvent::~LocalPeeringEvent() {}
153
154}