]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <seastar/core/future.hh> | |
5 | ||
6 | #include "messages/MOSDPGLog.h" | |
7 | ||
8 | #include "common/Formatter.h" | |
9 | #include "crimson/osd/pg.h" | |
10 | #include "crimson/osd/osd.h" | |
11 | #include "crimson/osd/osd_operations/peering_event.h" | |
12 | #include "crimson/osd/osd_connection_priv.h" | |
13 | ||
14 | namespace { | |
15 | seastar::logger& logger() { | |
16 | return crimson::get_logger(ceph_subsys_osd); | |
17 | } | |
18 | } | |
19 | ||
20 | namespace crimson::osd { | |
21 | ||
22 | void PeeringEvent::print(std::ostream &lhs) const | |
23 | { | |
24 | lhs << "PeeringEvent(" | |
25 | << "from=" << from | |
26 | << " pgid=" << pgid | |
27 | << " sent=" << evt.get_epoch_sent() | |
28 | << " requested=" << evt.get_epoch_requested() | |
29 | << " evt=" << evt.get_desc() | |
30 | << ")"; | |
31 | } | |
32 | ||
33 | void PeeringEvent::dump_detail(Formatter *f) const | |
34 | { | |
35 | f->open_object_section("PeeringEvent"); | |
36 | f->dump_stream("from") << from; | |
37 | f->dump_stream("pgid") << pgid; | |
38 | f->dump_int("sent", evt.get_epoch_sent()); | |
39 | f->dump_int("requested", evt.get_epoch_requested()); | |
40 | f->dump_string("evt", evt.get_desc()); | |
41 | f->close_section(); | |
42 | } | |
43 | ||
44 | ||
45 | PeeringEvent::PGPipeline &PeeringEvent::pp(PG &pg) | |
46 | { | |
47 | return pg.peering_request_pg_pipeline; | |
48 | } | |
49 | ||
50 | seastar::future<> PeeringEvent::start() | |
51 | { | |
52 | ||
53 | logger().debug("{}: start", *this); | |
54 | ||
55 | IRef ref = this; | |
56 | return get_pg().then([this](Ref<PG> pg) { | |
57 | if (!pg) { | |
58 | logger().warn("{}: pg absent, did not create", *this); | |
59 | on_pg_absent(); | |
60 | handle.exit(); | |
61 | return complete_rctx(pg); | |
62 | } else { | |
63 | logger().debug("{}: pg present", *this); | |
64 | return with_blocking_future(handle.enter(pp(*pg).await_map) | |
65 | ).then([this, pg] { | |
66 | return with_blocking_future( | |
67 | pg->osdmap_gate.wait_for_map(evt.get_epoch_sent())); | |
68 | }).then([this, pg](auto) { | |
69 | return with_blocking_future(handle.enter(pp(*pg).process)); | |
70 | }).then([this, pg] { | |
71 | pg->do_peering_event(evt, ctx); | |
72 | handle.exit(); | |
73 | return complete_rctx(pg); | |
74 | }); | |
75 | } | |
76 | }).then([this, ref=std::move(ref)] { | |
77 | logger().debug("{}: complete", *this); | |
78 | }); | |
79 | } | |
80 | ||
81 | void PeeringEvent::on_pg_absent() | |
82 | { | |
83 | logger().debug("{}: pg absent, dropping", *this); | |
84 | } | |
85 | ||
86 | seastar::future<> PeeringEvent::complete_rctx(Ref<PG> pg) | |
87 | { | |
88 | logger().debug("{}: submitting ctx", *this); | |
89 | return shard_services.dispatch_context( | |
90 | pg->get_collection_ref(), | |
91 | std::move(ctx)); | |
92 | } | |
93 | ||
94 | RemotePeeringEvent::ConnectionPipeline &RemotePeeringEvent::cp() | |
95 | { | |
96 | return get_osd_priv(conn.get()).peering_request_conn_pipeline; | |
97 | } | |
98 | ||
99 | void RemotePeeringEvent::on_pg_absent() | |
100 | { | |
101 | if (auto& e = get_event().get_event(); | |
102 | e.dynamic_type() == MQuery::static_type()) { | |
103 | const auto map_epoch = | |
104 | shard_services.get_osdmap_service().get_map()->get_epoch(); | |
105 | const auto& q = static_cast<const MQuery&>(e); | |
106 | const pg_info_t empty{spg_t{pgid.pgid, q.query.to}}; | |
107 | if (q.query.type == q.query.LOG || | |
108 | q.query.type == q.query.FULLLOG) { | |
109 | auto m = ceph::make_message<MOSDPGLog>(q.query.from, q.query.to, | |
110 | map_epoch, empty, | |
111 | q.query.epoch_sent); | |
112 | ctx.send_osd_message(q.from.osd, std::move(m)); | |
113 | } else { | |
114 | ctx.send_notify(q.from.osd, {q.query.from, q.query.to, | |
115 | q.query.epoch_sent, | |
116 | map_epoch, empty, | |
117 | PastIntervals{}}); | |
118 | } | |
119 | } | |
120 | } | |
121 | ||
122 | seastar::future<> RemotePeeringEvent::complete_rctx(Ref<PG> pg) | |
123 | { | |
124 | if (pg) { | |
125 | return PeeringEvent::complete_rctx(pg); | |
126 | } else { | |
127 | return shard_services.dispatch_context_messages(std::move(ctx)); | |
128 | } | |
129 | } | |
130 | ||
131 | seastar::future<Ref<PG>> RemotePeeringEvent::get_pg() | |
132 | { | |
133 | return with_blocking_future( | |
134 | handle.enter(cp().await_map) | |
135 | ).then([this] { | |
136 | return with_blocking_future( | |
137 | osd.osdmap_gate.wait_for_map(evt.get_epoch_sent())); | |
138 | }).then([this](auto epoch) { | |
139 | logger().debug("{}: got map {}", *this, epoch); | |
140 | return with_blocking_future(handle.enter(cp().get_pg)); | |
141 | }).then([this] { | |
142 | return with_blocking_future( | |
143 | osd.get_or_create_pg( | |
144 | pgid, evt.get_epoch_sent(), std::move(evt.create_info))); | |
145 | }); | |
146 | } | |
147 | ||
148 | seastar::future<Ref<PG>> LocalPeeringEvent::get_pg() { | |
149 | return seastar::make_ready_future<Ref<PG>>(pg); | |
150 | } | |
151 | ||
152 | LocalPeeringEvent::~LocalPeeringEvent() {} | |
153 | ||
154 | } |