]> git.proxmox.com Git - ceph.git/blame - ceph/src/crimson/osd/osd_operations/client_request.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / osd_operations / client_request.cc
CommitLineData
20effc67
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2// vim: ts=8 sw=2 smarttab expandtab
9f95a23c
TL
3
4#include <seastar/core/future.hh>
5
6#include "messages/MOSDOp.h"
7#include "messages/MOSDOpReply.h"
8
f67539c2 9#include "crimson/common/exception.h"
9f95a23c
TL
10#include "crimson/osd/pg.h"
11#include "crimson/osd/osd.h"
12#include "common/Formatter.h"
20effc67 13#include "crimson/osd/osd_operation_sequencer.h"
9f95a23c
TL
14#include "crimson/osd/osd_operations/client_request.h"
15#include "crimson/osd/osd_connection_priv.h"
16
17namespace {
18 seastar::logger& logger() {
19 return crimson::get_logger(ceph_subsys_osd);
20 }
21}
22
23namespace crimson::osd {
24
25ClientRequest::ClientRequest(
26 OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
20effc67
TL
27 : osd(osd),
28 conn(conn),
29 m(m),
30 sequencer(get_osd_priv(conn.get()).op_sequencer[m->get_spg()])
9f95a23c
TL
31{}
32
20effc67
TL
33ClientRequest::~ClientRequest()
34{
35 logger().debug("{}: destroying", *this);
36}
37
9f95a23c
TL
38void ClientRequest::print(std::ostream &lhs) const
39{
20effc67 40 lhs << "m=[" << *m << "]";
9f95a23c
TL
41}
42
43void ClientRequest::dump_detail(Formatter *f) const
44{
45}
46
47ClientRequest::ConnectionPipeline &ClientRequest::cp()
48{
49 return get_osd_priv(conn.get()).client_request_conn_pipeline;
50}
51
52ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
53{
54 return pg.client_request_pg_pipeline;
55}
56
20effc67
TL
57bool ClientRequest::same_session_and_pg(const ClientRequest& other_op) const
58{
59 return &get_osd_priv(conn.get()) == &get_osd_priv(other_op.conn.get()) &&
60 m->get_spg() == other_op.m->get_spg();
61}
62
9f95a23c
TL
63bool ClientRequest::is_pg_op() const
64{
65 return std::any_of(
66 begin(m->ops), end(m->ops),
67 [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
68}
69
20effc67
TL
70template <typename FuncT>
71ClientRequest::interruptible_future<> ClientRequest::with_sequencer(FuncT&& func)
72{
73 return sequencer.start_op(*this, handle, osd.get_shard_services().registry, std::forward<FuncT>(func));
74}
75
9f95a23c
TL
76seastar::future<> ClientRequest::start()
77{
78 logger().debug("{}: start", *this);
79
20effc67
TL
80 return seastar::repeat([this, opref=IRef{this}]() mutable {
81 logger().debug("{}: in repeat", *this);
f67539c2
TL
82 return with_blocking_future(handle.enter(cp().await_map))
83 .then([this]() {
20effc67
TL
84 return with_blocking_future(
85 osd.osdmap_gate.wait_for_map(
86 m->get_min_epoch()));
f67539c2
TL
87 }).then([this](epoch_t epoch) {
88 return with_blocking_future(handle.enter(cp().get_pg));
89 }).then([this] {
90 return with_blocking_future(osd.wait_for_pg(m->get_spg()));
20effc67
TL
91 }).then([this](Ref<PG> pgref) mutable {
92 return interruptor::with_interruption([this, pgref]() mutable {
93 epoch_t same_interval_since = pgref->get_interval_start_epoch();
94 logger().debug("{} same_interval_since: {}", *this, same_interval_since);
95 if (m->finish_decode()) {
96 m->clear_payload();
97 }
98 return with_sequencer(interruptor::wrap_function([pgref, this] {
99 PG &pg = *pgref;
100 if (pg.can_discard_op(*m)) {
101 return osd.send_incremental_map(
102 conn, m->get_map_epoch()).then([this] {
103 sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
104 return interruptor::now();
105 });
106 }
107 return with_blocking_future_interruptible<interruptor::condition>(
108 handle.enter(pp(pg).await_map)
109 ).then_interruptible([this, &pg] {
110 return with_blocking_future_interruptible<interruptor::condition>(
111 pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
112 }).then_interruptible([this, &pg](auto map) {
113 return with_blocking_future_interruptible<interruptor::condition>(
114 handle.enter(pp(pg).wait_for_active));
115 }).then_interruptible([this, &pg]() {
116 return with_blocking_future_interruptible<interruptor::condition>(
117 pg.wait_for_active_blocker.wait());
118 }).then_interruptible([this, pgref=std::move(pgref)]() mutable {
119 if (is_pg_op()) {
120 return process_pg_op(pgref).then_interruptible([this] {
121 sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
122 });
123 } else {
124 return process_op(pgref).then_interruptible([this] (const seq_mode_t mode) {
125 if (mode == seq_mode_t::IN_ORDER) {
126 sequencer.finish_op_in_order(*this);
127 } else {
128 assert(mode == seq_mode_t::OUT_OF_ORDER);
129 sequencer.finish_op_out_of_order(*this, osd.get_shard_services().registry);
130 }
131 });
132 }
133 });
134 })).then_interruptible([pgref] {
135 return seastar::stop_iteration::yes;
136 });
137 }, [this, pgref](std::exception_ptr eptr) {
138 if (should_abort_request(*this, std::move(eptr))) {
139 sequencer.abort();
140 return seastar::stop_iteration::yes;
141 } else {
142 sequencer.maybe_reset(*this);
143 return seastar::stop_iteration::no;
144 }
145 }, pgref);
f67539c2 146 });
9f95a23c
TL
147 });
148}
149
20effc67
TL
150ClientRequest::interruptible_future<>
151ClientRequest::process_pg_op(
f67539c2 152 Ref<PG> &pg)
9f95a23c 153{
f67539c2 154 return pg->do_pg_ops(m)
20effc67
TL
155 .then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
156 return conn->send(std::move(reply));
9f95a23c
TL
157 });
158}
159
20effc67
TL
160ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
161ClientRequest::process_op(Ref<PG> &pg)
9f95a23c 162{
20effc67
TL
163 return with_blocking_future_interruptible<interruptor::condition>(
164 handle.enter(pp(*pg).recover_missing))
165 .then_interruptible(
166 [this, pg]() mutable {
167 return do_recover_missing(pg, m->get_hobj());
168 }).then_interruptible([this, pg]() mutable {
169 return pg->already_complete(m->get_reqid()).then_unpack_interruptible(
170 [this, pg](bool completed, int ret) mutable
171 -> PG::load_obc_iertr::future<seq_mode_t> {
172 if (completed) {
173 auto reply = crimson::make_message<MOSDOpReply>(
174 m.get(), ret, pg->get_osdmap_epoch(),
175 CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
176 return conn->send(std::move(reply)).then([] {
177 return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
178 });
f67539c2 179 } else {
20effc67
TL
180 return with_blocking_future_interruptible<interruptor::condition>(
181 handle.enter(pp(*pg).get_obc)).then_interruptible(
182 [this, pg]() mutable -> PG::load_obc_iertr::future<seq_mode_t> {
183 logger().debug("{}: got obc lock", *this);
184 op_info.set_from_op(&*m, *pg->get_osdmap());
185 // XXX: `do_with()` is just a workaround for `with_obc_func_t` imposing
186 // `future<void>`.
187 return seastar::do_with(seq_mode_t{}, [this, &pg] (seq_mode_t& mode) {
188 return pg->with_locked_obc(m->get_hobj(), op_info,
189 [this, pg, &mode](auto obc) mutable {
190 return with_blocking_future_interruptible<interruptor::condition>(
191 handle.enter(pp(*pg).process)
192 ).then_interruptible([this, pg, obc, &mode]() mutable {
193 return do_process(pg, obc).then_interruptible([&mode] (seq_mode_t _mode) {
194 mode = _mode;
195 return seastar::now();
196 });
197 });
198 }).safe_then_interruptible([&mode] {
199 return PG::load_obc_iertr::make_ready_future<seq_mode_t>(mode);
200 });
201 });
202 });
f67539c2 203 }
f67539c2 204 });
20effc67
TL
205 }).safe_then_interruptible([pg=std::move(pg)] (const seq_mode_t mode) {
206 return seastar::make_ready_future<seq_mode_t>(mode);
9f95a23c
TL
207 }, PG::load_obc_ertr::all_same_way([](auto &code) {
208 logger().error("ClientRequest saw error code {}", code);
20effc67
TL
209 return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
210 }));
211}
212
213ClientRequest::interruptible_future<ClientRequest::seq_mode_t>
214ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
215{
216 if (!pg->is_primary()) {
217 // primary can handle both normal ops and balanced reads
218 if (is_misdirected(*pg)) {
219 logger().trace("do_process: dropping misdirected op");
220 return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
221 } else if (const hobject_t& hoid = m->get_hobj();
222 !pg->get_peering_state().can_serve_replica_read(hoid)) {
223 auto reply = crimson::make_message<MOSDOpReply>(
224 m.get(), -EAGAIN, pg->get_osdmap_epoch(),
225 m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
226 !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
227 return conn->send(std::move(reply)).then([] {
228 return seastar::make_ready_future<seq_mode_t>(seq_mode_t::OUT_OF_ORDER);
229 });
230 }
231 }
232 return pg->do_osd_ops(m, obc, op_info).safe_then_unpack_interruptible(
233 [this, pg](auto submitted, auto all_completed) mutable {
234 return submitted.then_interruptible(
235 [this, pg] {
236 return with_blocking_future_interruptible<interruptor::condition>(
237 handle.enter(pp(*pg).wait_repop));
238 }).then_interruptible(
239 [this, pg, all_completed=std::move(all_completed)]() mutable {
240 return all_completed.safe_then_interruptible(
241 [this, pg](MURef<MOSDOpReply> reply) {
242 return with_blocking_future_interruptible<interruptor::condition>(
243 handle.enter(pp(*pg).send_reply)).then_interruptible(
244 [this, reply=std::move(reply)]() mutable{
245 return conn->send(std::move(reply)).then([] {
246 return seastar::make_ready_future<seq_mode_t>(seq_mode_t::IN_ORDER);
247 });
248 });
249 }, crimson::ct_error::eagain::handle([this, pg]() mutable {
250 return process_op(pg);
251 }));
252 });
253 }, crimson::ct_error::eagain::handle([this, pg]() mutable {
254 return process_op(pg);
9f95a23c
TL
255 }));
256}
f67539c2
TL
257
258bool ClientRequest::is_misdirected(const PG& pg) const
259{
260 // otherwise take a closer look
261 if (const int flags = m->get_flags();
262 flags & CEPH_OSD_FLAG_BALANCE_READS ||
263 flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
264 if (!op_info.may_read()) {
265 // no read found, so it can't be balanced read
266 return true;
267 }
268 if (op_info.may_write() || op_info.may_cache()) {
269 // write op, but i am not primary
270 return true;
271 }
272 // balanced reads; any replica will do
273 return pg.is_nonprimary();
274 }
275 // neither balanced nor localize reads
276 return true;
277}
278
9f95a23c 279}