]>
Commit | Line | Data |
---|---|---|
20effc67 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
9f95a23c TL |
3 | |
4 | #include <seastar/core/future.hh> | |
5 | ||
6 | #include "messages/MOSDOp.h" | |
7 | #include "messages/MOSDOpReply.h" | |
8 | ||
f67539c2 | 9 | #include "crimson/common/exception.h" |
9f95a23c TL |
10 | #include "crimson/osd/pg.h" |
11 | #include "crimson/osd/osd.h" | |
12 | #include "common/Formatter.h" | |
20effc67 | 13 | #include "crimson/osd/osd_operation_sequencer.h" |
9f95a23c TL |
14 | #include "crimson/osd/osd_operations/client_request.h" |
15 | #include "crimson/osd/osd_connection_priv.h" | |
16 | ||
namespace {
  // File-local accessor for the OSD-subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
22 | ||
23 | namespace crimson::osd { | |
24 | ||
// Bind this op to its OSD, connection and message, and select the
// OpSequencer bucket for the message's target pg out of the connection's
// per-session private data.
ClientRequest::ClientRequest(
  OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
  // NOTE(review): `m(m)` copies (refcount bump) instead of moving despite
  // the rvalue-ref parameter -- presumably deliberate, because the
  // `sequencer` initializer below still dereferences the parameter `m`;
  // moving first would empty it. Confirm member declaration order in the
  // header before changing this to std::move.
  : osd(osd),
    conn(conn),
    m(m),
    sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()])
{}
32 | ||
20effc67 TL |
ClientRequest::~ClientRequest()
{
  // Debug-level breadcrumb so op lifetimes can be followed in the logs.
  logger().debug("{}: destroying", *this);
}
37 | ||
9f95a23c TL |
38 | void ClientRequest::print(std::ostream &lhs) const |
39 | { | |
20effc67 | 40 | lhs << "m=[" << *m << "]"; |
9f95a23c TL |
41 | } |
42 | ||
// Formatter hook for op introspection; currently adds no per-op detail
// beyond whatever the base Operation machinery emits.
void ClientRequest::dump_detail(Formatter *f) const
{
}
46 | ||
// Per-connection pipeline: stages shared by all client requests arriving on
// this connection (await_map, get_pg -- see start()).
ClientRequest::ConnectionPipeline &ClientRequest::cp()
{
  return get_osd_priv(conn.get()).client_request_conn_pipeline;
}
51 | ||
// Per-PG pipeline: stages entered once the target PG is known (await_map,
// wait_for_active, recover_missing, get_obc, process, wait_repop,
// send_reply -- see start()/process_op()/do_process()).
ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
{
  return pg.client_request_pg_pipeline;
}
56 | ||
20effc67 TL |
57 | bool ClientRequest::same_session_and_pg(const ClientRequest& other_op) const |
58 | { | |
59 | return &get_osd_priv(conn.get()) == &get_osd_priv(other_op.conn.get()) && | |
60 | m->get_spg() == other_op.m->get_spg(); | |
61 | } | |
62 | ||
9f95a23c TL |
63 | bool ClientRequest::is_pg_op() const |
64 | { | |
65 | return std::any_of( | |
66 | begin(m->ops), end(m->ops), | |
67 | [](auto& op) { return ceph_osd_op_type_pg(op.op.op); }); | |
68 | } | |
69 | ||
20effc67 TL |
// Funnel `func` through this op's OpSequencer (serializing client ops that
// share the sequencer -- see same_session_and_pg()); the pipeline handle and
// the OSD's op registry are forwarded to start_op().
template <typename FuncT>
ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
{
  return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func));
}
75 | ||
9f95a23c TL |
// Entry point: drive the request through the connection- and PG-level
// pipeline stages, then dispatch to process_pg_op()/process_op().  The whole
// body may be re-run (seastar::repeat) when the interruption handler at the
// bottom decides a retry is appropriate.
seastar::future<> ClientRequest::start()
{
  logger().debug("{}: start", *this);

  // `opref` keeps this op alive for the duration of the repeat loop.
  return seastar::repeat([this, opref=IRef{this}]() mutable {
    logger().debug("{}: in repeat", *this);
    // Connection-level stages: wait for an osdmap recent enough for this
    // message, then look up (possibly waiting for) the target PG.
    return with_blocking_future(handle.enter(cp().await_map))
      .then([this]() {
      return with_blocking_future(
        osd.osdmap_gate.wait_for_map(
          m->get_min_epoch()));
    }).then([this](epoch_t epoch) {
      return with_blocking_future(handle.enter(cp().get_pg));
    }).then([this] {
      return with_blocking_future(osd.wait_for_pg(m->get_spg()));
    }).then([this](Ref<PG> pgref) mutable {
      // Everything below runs under with_interruption; interruptions surface
      // in the handler lambda further down.
      return interruptor::with_interruption([this, pgref]() mutable {
        epoch_t same_interval_since = pgref->get_interval_start_epoch();
        logger().debug("{} same_interval_since: {}", *this, same_interval_since);
        if (m->finish_decode()) {
          m->clear_payload();
        }
        // From here the op occupies its sequencer slot; note that every exit
        // path below releases it via one of the finish_op_*() calls (either
        // directly or after process_op() reports the completion mode).
        return with_sequencer(interruptor::wrap_function([pgref, this] {
          PG &pg = *pgref;
          if (pg.can_discard_op(*m)) {
            // Discardable op: just make sure the client catches up on maps.
            return osd.send_incremental_map(
              conn, m->get_map_epoch()).then([this] {
              sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
              return interruptor::now();
            });
          }
          // PG-level stages: PG osdmap gate, then wait until the PG is
          // active before touching any objects.
          return with_blocking_future_interruptible<interruptor::condition>(
            handle.enter(pp(pg).await_map)
          ).then_interruptible([this, &pg] {
            return with_blocking_future_interruptible<interruptor::condition>(
              pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
          }).then_interruptible([this, &pg](auto map) {
            return with_blocking_future_interruptible<interruptor::condition>(
              handle.enter(pp(pg).wait_for_active));
          }).then_interruptible([this, &pg]() {
            return with_blocking_future_interruptible<interruptor::condition>(
              pg.wait_for_active_blocker.wait());
          }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
            if (is_pg_op()) {
              // PG-wide ops always release the sequencer out of order.
              return process_pg_op(pgref).then_interruptible([this] {
                sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
              });
            } else {
              // Object op: process_op() reports whether its reply went out
              // in order, which decides how the sequencer slot is released.
              return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) {
                if (mode == seq_mode_t::IN_ORDER) {
                  sequencer.finish_op_in_order(*this);
                } else {
                  assert(mode == seq_mode_t::OUT_OF_ORDER);
                  sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
                }
              });
            }
          });
        })).then_interruptible([pgref] {
          return seastar::stop_iteration::yes;
        });
      }, [this, pgref](std::exception_ptr eptr) {
        // Interruption handler: either abandon the request entirely, or
        // reset the sequencer bookkeeping and retry the repeat loop.
        if (should_abort_request(*this, std::move(eptr))) {
          sequencer.abort();
          return seastar::stop_iteration::yes;
        } else {
          sequencer.maybe_reset(*this);
          return seastar::stop_iteration::no;
        }
      }, pgref);
    });
  });
}
149 | ||
20effc67 TL |
// Execute a PG-typed op (see is_pg_op()) and send the reply back on the
// originating connection.
// NOTE(review): the caller's `pg` reference is moved from when captured into
// the continuation; callers must not reuse it afterwards.
ClientRequest::interruptible_future<>
ClientRequest::process_pg_op(
  Ref<PG> &pg)
{
  return pg->do_pg_ops(m)
    .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
      return conn->send(std::move(reply));
    });
}
159 | ||
20effc67 TL |
// Handle a regular object op: recover the target object if missing, answer
// immediately if pg->already_complete() says this reqid was already carried
// out (presumably a client resend -- confirm), otherwise take the object
// context and run the ops via do_process().  Returns whether the reply was
// issued IN_ORDER or OUT_OF_ORDER so start() can release the sequencer slot
// accordingly.
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
ClientRequest::process_op(Ref<PG> &pg)
{
  return with_blocking_future_interruptible<interruptor::condition>(
    handle.enter(pp(*pg).recover_missing))
  .then_interruptible(
    [this, pg]() mutable {
    return do_recover_missing(pg, m->get_hobj());
  }).then_interruptible([this, pg]() mutable {
    return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
      [this, pg](bool completed, int ret) mutable
      -> PG::load_obc_iertr::future<seq_mode_t> {
      if (completed) {
        // Already done: just reply with the stored result code.
        auto reply = crimson::make_message<MOSDOpReply>(
          m.get(), ret, pg->get_osdmap_epoch(),
          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
        return conn->send(std::move(reply)).then([] {
          return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
        });
      } else {
        return with_blocking_future_interruptible<interruptor::condition>(
          handle.enter(pp(*pg).get_obc)).then_interruptible(
          [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
          logger().debug("{}: got obc lock", *this);
          op_info.set_from_op(&*m, *pg->get_osdmap());
          // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
          // `future<void>`.  The completion mode computed by do_process() is
          // smuggled out through the `mode` reference.
          return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
            return pg->with_locked_obc(m->get_hobj(), op_info,
                                       [this, pg, &mode](auto obc) mutable {
              return with_blocking_future_interruptible<interruptor::condition>(
                handle.enter(pp(*pg).process)
              ).then_interruptible([this, pg, obc, &mode]() mutable {
                return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
                  mode = _mode;
                  return seastar::now();
                });
              });
            }).safe_then_interruptible([&mode] {
              return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
            });
          });
        });
      }
    });
  }).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) {
    return seastar::make_ready_future<seq_mode_t>(mode);
  }, PG::load_obc_ertr::all_same_way([](auto &code) {
    // Any obc-load error is logged and collapsed into an OUT_OF_ORDER
    // completion; no reply is sent on this path.
    logger().error("ClientRequest saw error code {}", code);
    return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
  }));
}
212 | ||
// Execute the message's ops against `obc` on `pg` and send the reply.  On a
// non-primary, only ops that pass is_misdirected() and the peering state's
// replica-read check are served; otherwise the op is dropped or bounced back
// with -EAGAIN so the client retries.
ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
  if (!pg->is_primary()) {
    // primary can handle both normal ops and balanced reads
    if (is_misdirected(*pg)) {
      logger().trace("do_process: dropping misdirected op");
      return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
    } else if (const hobject_t& hoid = m->get_hobj();
               !pg->get_peering_state().can_serve_replica_read(hoid)) {
      // Replica read we cannot serve right now: ask the client to retry.
      auto reply = crimson::make_message<MOSDOpReply>(
        m.get(), -EAGAIN, pg->get_osdmap_epoch(),
        m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
        !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
      return conn->send(std::move(reply)).then([] {
        return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
      });
    }
  }
  return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
    [this, pg](auto submitted, auto all_completed) mutable {
    // Wait for submission to finish before entering the wait_repop stage.
    return submitted.then_interruptible(
      [this, pg] {
      return with_blocking_future_interruptible<interruptor::condition>(
          handle.enter(pp(*pg).wait_repop));
    }).then_interruptible(
      [this, pg, all_completed=std::move(all_completed)]() mutable {
      return all_completed.safe_then_interruptible(
        [this, pg](MURef<MOSDOpReply> reply) {
        // The reply goes out through the send_reply pipeline stage; this is
        // the only path that reports IN_ORDER.
        return with_blocking_future_interruptible<interruptor::condition>(
            handle.enter(pp(*pg).send_reply)).then_interruptible(
              [this, reply=std::move(reply)]() mutable{
              return conn->send(std::move(reply)).then([] {
                return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
              });
            });
      }, crimson::ct_error::eagain::handle([this, pg]() mutable {
        // EAGAIN on completion: re-run the whole op from process_op().
        return process_op(pg);
      }));
    });
  }, crimson::ct_error::eagain::handle([this, pg]() mutable {
    // EAGAIN on submission: re-run the whole op from process_op().
    return process_op(pg);
  }));
}
f67539c2 TL |
257 | |
258 | bool ClientRequest::is_misdirected(const PG& pg) const | |
259 | { | |
260 | // otherwise take a closer look | |
261 | if (const int flags = m->get_flags(); | |
262 | flags & CEPH_OSD_FLAG_BALANCE_READS || | |
263 | flags & CEPH_OSD_FLAG_LOCALIZE_READS) { | |
264 | if (!op_info.may_read()) { | |
265 | // no read found, so it can't be balanced read | |
266 | return true; | |
267 | } | |
268 | if (op_info.may_write() || op_info.may_cache()) { | |
269 | // write op, but i am not primary | |
270 | return true; | |
271 | } | |
272 | // balanced reads; any replica will do | |
273 | return pg.is_nonprimary(); | |
274 | } | |
275 | // neither balanced nor localize reads | |
276 | return true; | |
277 | } | |
278 | ||
9f95a23c | 279 | } |