git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd_operations/client_request.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / crimson / osd / osd_operations / client_request.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <seastar/core/future.hh>
5
6 #include "messages/MOSDOp.h"
7 #include "messages/MOSDOpReply.h"
8
9 #include "crimson/common/exception.h"
10 #include "crimson/osd/pg.h"
11 #include "crimson/osd/osd.h"
12 #include "common/Formatter.h"
13 #include "crimson/osd/osd_operations/client_request.h"
14 #include "crimson/osd/osd_connection_priv.h"
15
16 namespace {
17 seastar::logger& logger() {
18 return crimson::get_logger(ceph_subsys_osd);
19 }
20 }
21
22 namespace crimson::osd {
23
24 ClientRequest::ClientRequest(
25 OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
26 : osd(osd), conn(conn), m(m)
27 {}
28
29 void ClientRequest::print(std::ostream &lhs) const
30 {
31 lhs << *m;
32 }
33
34 void ClientRequest::dump_detail(Formatter *f) const
35 {
36 }
37
38 ClientRequest::ConnectionPipeline &ClientRequest::cp()
39 {
40 return get_osd_priv(conn.get()).client_request_conn_pipeline;
41 }
42
43 ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
44 {
45 return pg.client_request_pg_pipeline;
46 }
47
48 bool ClientRequest::is_pg_op() const
49 {
50 return std::any_of(
51 begin(m->ops), end(m->ops),
52 [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
53 }
54
55 seastar::future<> ClientRequest::start()
56 {
57 logger().debug("{}: start", *this);
58
59 IRef opref = this;
60 return crimson::common::handle_system_shutdown(
61 [this, opref=std::move(opref)]() mutable {
62 return seastar::repeat([this, opref]() mutable {
63 return with_blocking_future(handle.enter(cp().await_map))
64 .then([this]() {
65 return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
66 }).then([this](epoch_t epoch) {
67 return with_blocking_future(handle.enter(cp().get_pg));
68 }).then([this] {
69 return with_blocking_future(osd.wait_for_pg(m->get_spg()));
70 }).then([this, opref](Ref<PG> pgref) {
71 PG &pg = *pgref;
72 if (pg.can_discard_op(*m)) {
73 return osd.send_incremental_map(conn, m->get_map_epoch());
74 }
75 return with_blocking_future(
76 handle.enter(pp(pg).await_map)
77 ).then([this, &pg]() mutable {
78 return with_blocking_future(
79 pg.osdmap_gate.wait_for_map(m->get_min_epoch()));
80 }).then([this, &pg](auto map) mutable {
81 return with_blocking_future(
82 handle.enter(pp(pg).wait_for_active));
83 }).then([this, &pg]() mutable {
84 return with_blocking_future(pg.wait_for_active_blocker.wait());
85 }).then([this, pgref=std::move(pgref)]() mutable {
86 if (m->finish_decode()) {
87 m->clear_payload();
88 }
89 if (is_pg_op()) {
90 return process_pg_op(pgref);
91 } else {
92 return process_op(pgref);
93 }
94 });
95 }).then([] {
96 return seastar::stop_iteration::yes;
97 }).handle_exception_type([](crimson::common::actingset_changed& e) {
98 if (e.is_primary()) {
99 logger().debug("operation restart, acting set changed");
100 return seastar::stop_iteration::no;
101 } else {
102 logger().debug("operation abort, up primary changed");
103 return seastar::stop_iteration::yes;
104 }
105 });
106 });
107 });
108 }
109
110 seastar::future<> ClientRequest::process_pg_op(
111 Ref<PG> &pg)
112 {
113 return pg->do_pg_ops(m)
114 .then([this, pg=std::move(pg)](Ref<MOSDOpReply> reply) {
115 return conn->send(reply);
116 });
117 }
118
119 seastar::future<> ClientRequest::process_op(
120 Ref<PG> &pgref)
121 {
122 PG& pg = *pgref;
123 return with_blocking_future(
124 handle.enter(pp(pg).recover_missing)
125 ).then([this, &pg, pgref] {
126 eversion_t ver;
127 const hobject_t& soid = m->get_hobj();
128 logger().debug("{} check for recovery, {}", *this, soid);
129 if (pg.is_unreadable_object(soid, &ver) ||
130 pg.is_degraded_or_backfilling_object(soid)) {
131 logger().debug("{} need to wait for recovery, {}", *this, soid);
132 if (pg.get_recovery_backend()->is_recovering(soid)) {
133 return pg.get_recovery_backend()->get_recovering(soid).wait_for_recovered();
134 } else {
135 auto [op, fut] = osd.get_shard_services().start_operation<UrgentRecovery>(
136 soid, ver, pgref, osd.get_shard_services(), pg.get_osdmap_epoch());
137 return std::move(fut);
138 }
139 }
140 return seastar::now();
141 }).then([this, &pg] {
142 return with_blocking_future(handle.enter(pp(pg).get_obc));
143 }).then([this, &pg]() -> PG::load_obc_ertr::future<> {
144 op_info.set_from_op(&*m, *pg.get_osdmap());
145 return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
146 return with_blocking_future(
147 handle.enter(pp(pg).process)
148 ).then([this, &pg, obc] {
149 if (!pg.is_primary()) {
150 // primary can handle both normal ops and balanced reads
151 if (is_misdirected(pg)) {
152 logger().trace("process_op: dropping misdirected op");
153 return seastar::make_ready_future<Ref<MOSDOpReply>>();
154 } else if (const hobject_t& hoid = m->get_hobj();
155 !pg.get_peering_state().can_serve_replica_read(hoid)) {
156 auto reply = make_message<MOSDOpReply>(
157 m.get(), -EAGAIN, pg.get_osdmap_epoch(),
158 m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
159 !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
160 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
161 }
162 }
163 return pg.do_osd_ops(m, obc, op_info);
164 }).then([this](Ref<MOSDOpReply> reply) {
165 if (reply) {
166 return conn->send(std::move(reply));
167 } else {
168 return seastar::now();
169 }
170 });
171 });
172 }).safe_then([pgref=std::move(pgref)] {
173 return seastar::now();
174 }, PG::load_obc_ertr::all_same_way([](auto &code) {
175 logger().error("ClientRequest saw error code {}", code);
176 return seastar::now();
177 }));
178 }
179
180 bool ClientRequest::is_misdirected(const PG& pg) const
181 {
182 // otherwise take a closer look
183 if (const int flags = m->get_flags();
184 flags & CEPH_OSD_FLAG_BALANCE_READS ||
185 flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
186 if (!op_info.may_read()) {
187 // no read found, so it can't be balanced read
188 return true;
189 }
190 if (op_info.may_write() || op_info.may_cache()) {
191 // write op, but i am not primary
192 return true;
193 }
194 // balanced reads; any replica will do
195 return pg.is_nonprimary();
196 }
197 // neither balanced nor localize reads
198 return true;
199 }
200
201 }