]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include <seastar/core/future.hh> | |
5 | ||
6 | #include "messages/MOSDOp.h" | |
7 | #include "messages/MOSDOpReply.h" | |
8 | ||
f67539c2 | 9 | #include "crimson/common/exception.h" |
9f95a23c TL |
10 | #include "crimson/osd/pg.h" |
11 | #include "crimson/osd/osd.h" | |
12 | #include "common/Formatter.h" | |
13 | #include "crimson/osd/osd_operations/client_request.h" | |
14 | #include "crimson/osd/osd_connection_priv.h" | |
15 | ||
16 | namespace { | |
17 | seastar::logger& logger() { | |
18 | return crimson::get_logger(ceph_subsys_osd); | |
19 | } | |
20 | } | |
21 | ||
22 | namespace crimson::osd { | |
23 | ||
24 | ClientRequest::ClientRequest( | |
25 | OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m) | |
26 | : osd(osd), conn(conn), m(m) | |
27 | {} | |
28 | ||
29 | void ClientRequest::print(std::ostream &lhs) const | |
30 | { | |
31 | lhs << *m; | |
32 | } | |
33 | ||
34 | void ClientRequest::dump_detail(Formatter *f) const | |
35 | { | |
36 | } | |
37 | ||
38 | ClientRequest::ConnectionPipeline &ClientRequest::cp() | |
39 | { | |
40 | return get_osd_priv(conn.get()).client_request_conn_pipeline; | |
41 | } | |
42 | ||
43 | ClientRequest::PGPipeline &ClientRequest::pp(PG &pg) | |
44 | { | |
45 | return pg.client_request_pg_pipeline; | |
46 | } | |
47 | ||
48 | bool ClientRequest::is_pg_op() const | |
49 | { | |
50 | return std::any_of( | |
51 | begin(m->ops), end(m->ops), | |
52 | [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); | |
53 | } | |
54 | ||
/**
 * Drive this client request through its pipeline stages.
 *
 * Each attempt does the following:
 *  - enter the connection pipeline and wait for an OSD map at least as new
 *    as the op's min epoch;
 *  - look up the target PG;
 *  - enter the PG pipeline, wait for the PG's map and for the PG to become
 *    active;
 *  - dispatch to process_pg_op() or process_op().
 *
 * The body runs inside crimson::common::handle_system_shutdown(). The
 * attempt is wrapped in seastar::repeat. A crimson::common::actingset_changed
 * exception restarts it when e.is_primary() is true and stops it otherwise.
 * A normal completion stops the loop.
 *
 * @return future that resolves once the request has been handled,
 *         discarded or aborted.
 */
seastar::future<> ClientRequest::start()
{
  logger().debug("{}: start", *this);

  // Reference to this operation, captured by the continuations below
  // (presumably to keep the operation alive until they finish).
  IRef opref = this;
  return crimson::common::handle_system_shutdown(
    [this, opref=std::move(opref)]() mutable {
    return seastar::repeat([this, opref]() mutable {
      return with_blocking_future(handle.enter(cp().await_map))
      .then([this]() {
        return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
      }).then([this](epoch_t epoch) {
        return with_blocking_future(handle.enter(cp().get_pg));
      }).then([this] {
        return with_blocking_future(osd.wait_for_pg(m->get_spg()));
      }).then([this, opref](Ref<PG> pgref) {
        // `pg` aliases the PG owned by `pgref`. `pgref` is moved into the
        // last continuation of this chain, which keeps the PG alive while
        // the intermediate continuations use the `&pg` capture.
        PG &pg = *pgref;
        if (pg.can_discard_op(*m)) {
          // The op is not executed; instead send_incremental_map() is
          // called for the client connection with the op's map epoch.
          return osd.send_incremental_map(conn, m->get_map_epoch());
        }
        return with_blocking_future(
          handle.enter(pp(pg).await_map)
        ).then([this, &pg]() mutable {
          return with_blocking_future(
            pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
        }).then([this, &pg](auto map) mutable {
          return with_blocking_future(
            handle.enter(pp(pg).wait_for_active));
        }).then([this, &pg]() mutable {
          return with_blocking_future(pg.wait_for_active_blocker.wait());
        }).then([this, pgref=std::move(pgref)]() mutable {
          // Drop the raw payload once finish_decode() reports success.
          if (m->finish_decode()) {
            m->clear_payload();
          }
          if (is_pg_op()) {
            return process_pg_op(pgref);
          } else {
            return process_op(pgref);
          }
        });
      }).then([] {
        return seastar::stop_iteration::yes;
      }).handle_exception_type([](crimson::common::actingset_changed& e) {
        // Acting set changed mid-flight: restart the whole attempt if
        // e.is_primary(), otherwise give up on this request.
        if (e.is_primary()) {
          logger().debug("operation restart, acting set changed");
          return seastar::stop_iteration::no;
        } else {
          logger().debug("operation abort, up primary changed");
          return seastar::stop_iteration::yes;
        }
      });
    });
  });
}
109 | ||
110 | seastar::future<> ClientRequest::process_pg_op( | |
f67539c2 | 111 | Ref<PG> &pg) |
9f95a23c | 112 | { |
f67539c2 TL |
113 | return pg->do_pg_ops(m) |
114 | .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) { | |
9f95a23c TL |
115 | return conn->send(reply); |
116 | }); | |
117 | } | |
118 | ||
/**
 * Execute a regular (non-PG) client op against @p pgref.
 *
 * Stages (each entered through the PG pipeline):
 *  - recover_missing: if the target object is unreadable or
 *    degraded/backfilling, wait for recovery. It joins an in-flight
 *    recovery or starts an UrgentRecovery operation.
 *  - get_obc: fill op_info from the message and the PG's osdmap, then run
 *    the rest under pg.with_locked_obc().
 *  - process: on a non-primary, drop misdirected ops or answer -EAGAIN when
 *    the replica cannot serve the read; otherwise run pg.do_osd_ops() and
 *    send the reply, if any, on the connection.
 *
 * @param pgref target PG. It is moved into the final safe_then
 *              continuation, which keeps the PG alive while the `&pg`
 *              captures are used. The caller's Ref is left empty.
 * @return future that resolves when processing finishes. load_obc_ertr
 *         errors are logged and swallowed.
 */
seastar::future<> ClientRequest::process_op(
  Ref<PG> &pgref)
{
  PG& pg = *pgref;
  return with_blocking_future(
    handle.enter(pp(pg).recover_missing)
  ).then([this, &pg, pgref] {
    eversion_t ver;
    const hobject_t& soid = m->get_hobj();
    logger().debug("{} check for recovery, {}", *this, soid);
    if (pg.is_unreadable_object(soid, &ver) ||
        pg.is_degraded_or_backfilling_object(soid)) {
      logger().debug("{} need to wait for recovery, {}", *this, soid);
      if (pg.get_recovery_backend()->is_recovering(soid)) {
        // Recovery already running for this object: wait for it.
        return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
      } else {
        // Start an UrgentRecovery for the object and wait on its future.
        auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
          soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
        return std::move(fut);
      }
    }
    return seastar::now();
  }).then([this, &pg] {
    return with_blocking_future(handle.enter(pp(pg).get_obc));
  }).then([this, &pg]() -> PG::load_obc_ertr::future<> {
    op_info.set_from_op(&*m, *pg.get_osdmap());
    return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
      return with_blocking_future(
        handle.enter(pp(pg).process)
      ).then([this, &pg, obc] {
        if (!pg.is_primary()) {
          // The primary handles both normal ops and balanced reads; a
          // non-primary may only serve ops that is_misdirected() accepts.
          if (is_misdirected(pg)) {
            // An empty reply means nothing is sent (see the next step).
            logger().trace("process_op: dropping misdirected op");
            return seastar::make_ready_future<Ref<MOSDOpReply>>();
          } else if (const hobject_t& hoid = m->get_hobj();
                     !pg.get_peering_state().can_serve_replica_read(hoid)) {
            // This replica cannot serve the read now: reply with -EAGAIN
            // (presumably so the client retries elsewhere; confirm).
            auto reply = make_message<MOSDOpReply>(
              m.get(), -EAGAIN, pg.get_osdmap_epoch(),
              m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
              !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
            return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
          }
        }
        return pg.do_osd_ops(m, obc, op_info);
      }).then([this](Ref<MOSDOpReply> reply) {
        if (reply) {
          return conn->send(std::move(reply));
        } else {
          return seastar::now();
        }
      });
    });
  }).safe_then([pgref=std::move(pgref)] {
    return seastar::now();
  }, PG::load_obc_ertr::all_same_way([](auto &code) {
    // NOTE(review): the error is only logged; no reply reaches the client
    // on this path. Confirm this is intended.
    logger().error("ClientRequest saw error code {}", code);
    return seastar::now();
  }));
}
f67539c2 TL |
179 | |
180 | bool ClientRequest::is_misdirected(const PG& pg) const | |
181 | { | |
182 | // otherwise take a closer look | |
183 | if (const int flags = m->get_flags(); | |
184 | flags & CEPH_OSD_FLAG_BALANCE_READS || | |
185 | flags & CEPH_OSD_FLAG_LOCALIZE_READS) { | |
186 | if (!op_info.may_read()) { | |
187 | // no read found, so it can't be balanced read | |
188 | return true; | |
189 | } | |
190 | if (op_info.may_write() || op_info.may_cache()) { | |
191 | // write op, but i am not primary | |
192 | return true; | |
193 | } | |
194 | // balanced reads; any replica will do | |
195 | return pg.is_nonprimary(); | |
196 | } | |
197 | // neither balanced nor localize reads | |
198 | return true; | |
199 | } | |
200 | ||
9f95a23c | 201 | } |