// ceph/src/crimson/osd/osd_operations/client_request.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
3
4 #include "messages/MOSDOp.h"
5 #include "messages/MOSDOpReply.h"
6
7 #include "crimson/common/exception.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/osd.h"
10 #include "common/Formatter.h"
11 #include "crimson/osd/osd_operation_external_tracking.h"
12 #include "crimson/osd/osd_operations/client_request.h"
13 #include "crimson/osd/osd_connection_priv.h"
14 #include "osd/object_state_fmt.h"
15
namespace {
  // File-local shorthand for the OSD-subsystem logger.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
21
22 namespace crimson::osd {
23
24
void ClientRequest::Orderer::requeue(
  ShardServices &shard_services, Ref<PG> pg)
{
  // Re-submit every tracked request against the given PG.  Each request
  // gets a fresh instance handle so pipeline state from the interrupted
  // run is not reused; the future from with_pg_int() is intentionally
  // detached — completion is signalled through the request's own
  // on_complete promise (see complete_request()).
  for (auto &req: list) {
    logger().debug("{}: {} requeueing {}", __func__, *pg, req);
    req.reset_instance_handle();
    std::ignore = req.with_pg_int(shard_services, pg);
  }
}
34
void ClientRequest::Orderer::clear_and_cancel()
{
  // Complete and unlink every tracked request.  The iterator is advanced
  // *before* remove_request() runs (note the post-increment) because
  // removal unlinks the element from `list`, which would otherwise
  // invalidate `i`.
  for (auto i = list.begin(); i != list.end(); ) {
    logger().debug(
      "ClientRequest::Orderer::clear_and_cancel: {}",
      *i);
    i->complete_request();
    remove_request(*(i++));
  }
}
45
void ClientRequest::complete_request()
{
  // Record the terminal tracking event, then wake whoever is waiting on
  // the future returned by with_pg().
  track_event<CompletionEvent>();
  on_complete.set_value();
}
51
// Takes ownership of the client connection and the MOSDOp message.  The
// initial instance handle is allocated eagerly so per-instance pipeline
// state exists before the first with_pg_int() run.
ClientRequest::ClientRequest(
  ShardServices &shard_services, crimson::net::ConnectionRef conn,
  Ref<MOSDOp> &&m)
  : put_historic_shard_services(&shard_services),
    conn(std::move(conn)),
    m(std::move(m)),
    instance_handle(new instance_handle_t)
{}
60
ClientRequest::~ClientRequest()
{
  // Destruction is logged to aid lifetime debugging of in-flight ops.
  logger().debug("{}: destroying", *this);
}
65
// Render a one-line human-readable summary (the wrapped MOSDOp) for logs.
void ClientRequest::print(std::ostream &lhs) const
{
  lhs << "m=[" << *m << "]";
}
70
void ClientRequest::dump_detail(Formatter *f) const
{
  logger().debug("{}: dumping", *this);
  // tracking_events is a tuple of event objects; dump each of them in
  // order via a fold expression.
  std::apply([f] (auto... event) {
    (..., event.dump(f));
  }, tracking_events);
}
78
// Per-connection ordering pipeline for client requests, stored in the
// connection's OSD-private data.
ConnectionPipeline &ClientRequest::get_connection_pipeline()
{
  return get_osd_priv(conn.get()).client_request_conn_pipeline;
}
83
// Per-PG ordering pipeline used by the staged processing below.
ClientRequest::PGPipeline &ClientRequest::client_pp(PG &pg)
{
  return pg.request_pg_pipeline;
}
88
89 bool ClientRequest::is_pg_op() const
90 {
91 return std::any_of(
92 begin(m->ops), end(m->ops),
93 [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
94 }
95
// Drive one "instance" of this request against the given PG through the
// pipeline stages.  On interruption (e.g. interval change) the Orderer may
// call requeue(), which starts a fresh instance with a new instance id and
// a reset instance handle.
seastar::future<> ClientRequest::with_pg_int(
  ShardServices &shard_services, Ref<PG> pgref)
{
  epoch_t same_interval_since = pgref->get_interval_start_epoch();
  logger().debug("{} same_interval_since: {}", *this, same_interval_since);
  // Lazily finish decoding the message; clear the payload once decoded so
  // a later re-run does not decode it again.
  if (m->finish_decode()) {
    m->clear_payload();
  }
  const auto this_instance_id = instance_id++;
  // opref/instance_handle copies below keep this operation and its
  // pipeline state alive until the final finally() runs.
  OperationRef opref{this};
  auto instance_handle = get_instance_handle();
  auto &ihref = *instance_handle;
  return interruptor::with_interruption(
    [this, pgref, this_instance_id, &ihref, &shard_services]() mutable {
      PG &pg = *pgref;
      if (pg.can_discard_op(*m)) {
        // Op targets a stale epoch; push the client a map update and drop.
        return shard_services.send_incremental_map(
          std::ref(*conn), m->get_map_epoch()
        ).then([this, this_instance_id, pgref] {
          logger().debug("{}.{}: discarding", *this, this_instance_id);
          pgref->client_request_orderer.remove_request(*this);
          complete_request();
          return interruptor::now();
        });
      }
      // Stage 1: serialize behind other requests waiting for maps.
      return ihref.enter_stage<interruptor>(client_pp(pg).await_map, *this
      ).then_interruptible([this, this_instance_id, &pg, &ihref] {
        logger().debug("{}.{}: after await_map stage", *this, this_instance_id);
        // Block until the PG has at least the op's min epoch.
        return ihref.enter_blocker(
          *this, pg.osdmap_gate, &decltype(pg.osdmap_gate)::wait_for_map,
          m->get_min_epoch(), nullptr);
      }).then_interruptible([this, this_instance_id, &pg, &ihref](auto map) {
        logger().debug("{}.{}: after wait_for_map", *this, this_instance_id);
        // Stage 2: wait for the PG to become active.
        return ihref.enter_stage<interruptor>(client_pp(pg).wait_for_active, *this);
      }).then_interruptible([this, this_instance_id, &pg, &ihref]() {
        logger().debug(
          "{}.{}: after wait_for_active stage", *this, this_instance_id);
        return ihref.enter_blocker(
          *this,
          pg.wait_for_active_blocker,
          &decltype(pg.wait_for_active_blocker)::wait);
      }).then_interruptible([this, pgref, this_instance_id, &ihref]() mutable
                            -> interruptible_future<> {
        logger().debug(
          "{}.{}: after wait_for_active", *this, this_instance_id);
        // Dispatch: PG-scoped ops vs. ordinary object ops.
        if (is_pg_op()) {
          return process_pg_op(pgref);
        } else {
          return process_op(ihref, pgref);
        }
      }).then_interruptible([this, this_instance_id, pgref] {
        logger().debug("{}.{}: after process*", *this, this_instance_id);
        pgref->client_request_orderer.remove_request(*this);
        complete_request();
      });
    }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
      // TODO: better debug output
      // Interruption handler: the request stays in the orderer and will be
      // requeued by the Orderer when appropriate.
      logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
    }, pgref).finally(
      [opref=std::move(opref), pgref=std::move(pgref),
       instance_handle=std::move(instance_handle), &ihref] {
        ihref.handle.exit();
      });
}
160
// Public entry point: register with the PG's orderer and kick off the
// first instance.  The future from with_pg_int() is detached; the caller
// instead waits on on_complete, which survives requeues across instances.
seastar::future<> ClientRequest::with_pg(
  ShardServices &shard_services, Ref<PG> pgref)
{
  put_historic_shard_services = &shard_services;
  pgref->client_request_orderer.add_request(*this);
  auto ret = on_complete.get_future();
  std::ignore = with_pg_int(
    shard_services, std::move(pgref)
  );
  return ret;
}
172
// Execute PG-scoped ops (e.g. pg listing operations) and send the reply
// straight back on the client connection.
ClientRequest::interruptible_future<>
ClientRequest::process_pg_op(
  Ref<PG> &pg)
{
  return pg->do_pg_ops(
    m
  ).then_interruptible([this, pg=std::move(pg)](MURef<MOSDOpReply> reply) {
    return conn->send(std::move(reply));
  });
}
183
184 auto ClientRequest::reply_op_error(const Ref<PG>& pg, int err)
185 {
186 logger().debug("{}: replying with error {}", *this, err);
187 auto reply = crimson::make_message<MOSDOpReply>(
188 m.get(), err, pg->get_osdmap_epoch(),
189 m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
190 !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
191 reply->set_reply_versions(eversion_t(), 0);
192 reply->set_op_returns(std::vector<pg_log_op_return_item_t>{});
193 return conn->send(std::move(reply));
194 }
195
// Process an ordinary (object-scoped) op: recover the target if missing,
// short-circuit already-completed requests, then take the object context
// lock and run do_process().  Load errors are converted into error replies.
ClientRequest::interruptible_future<>
ClientRequest::process_op(instance_handle_t &ihref, Ref<PG> &pg)
{
  return ihref.enter_stage<interruptor>(
    client_pp(*pg).recover_missing,
    *this
  ).then_interruptible(
    [this, pg]() mutable {
    // Only the primary drives recovery of a missing object.
    if (pg->is_primary()) {
      return do_recover_missing(pg, m->get_hobj());
    } else {
      logger().debug("process_op: Skipping do_recover_missing"
                     "on non primary pg");
      return interruptor::now();
    }
  }).then_interruptible([this, pg, &ihref]() mutable {
    // Dup detection: if this reqid already committed, resend the recorded
    // result instead of re-executing.
    return pg->already_complete(m->get_reqid()).then_interruptible(
      [this, pg, &ihref](auto completed) mutable
      -> PG::load_obc_iertr::future<> {
      if (completed) {
        auto reply = crimson::make_message<MOSDOpReply>(
          m.get(), completed->err, pg->get_osdmap_epoch(),
          CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, false);
        reply->set_reply_versions(completed->version, completed->user_version);
        return conn->send(std::move(reply));
      } else {
        // Stage: acquire the object context, then the process stage while
        // holding the obc lock for the duration of do_process().
        return ihref.enter_stage<interruptor>(client_pp(*pg).get_obc, *this
        ).then_interruptible(
          [this, pg, &ihref]() mutable -> PG::load_obc_iertr::future<> {
          logger().debug("{}: in get_obc stage", *this);
          op_info.set_from_op(&*m, *pg->get_osdmap());
          return pg->with_locked_obc(
            m->get_hobj(), op_info,
            [this, pg, &ihref](auto obc) mutable {
            logger().debug("{}: got obc {}", *this, obc->obs);
            return ihref.enter_stage<interruptor>(
              client_pp(*pg).process, *this
            ).then_interruptible([this, pg, obc, &ihref]() mutable {
              return do_process(ihref, pg, obc);
            });
          });
        });
      }
    });
  }).handle_error_interruptible(
    PG::load_obc_ertr::all_same_way([this, pg=std::move(pg)](const auto &code) {
      // obc load failures carry positive error codes; reply with -errno.
      logger().error("ClientRequest saw error code {}", code);
      assert(code.value() > 0);
      return reply_op_error(pg, -code.value());
    }));
}
247
248 ClientRequest::interruptible_future<>
249 ClientRequest::do_process(
250 instance_handle_t &ihref,
251 Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
252 {
253 if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
254 return reply_op_error(pg, -EINVAL);
255 }
256 const pg_pool_t pool = pg->get_pgpool().info;
257 if (pool.has_flag(pg_pool_t::FLAG_EIO)) {
258 // drop op on the floor; the client will handle returning EIO
259 if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
260 logger().debug("discarding op due to pool EIO flag");
261 return seastar::now();
262 } else {
263 logger().debug("replying EIO due to pool EIO flag");
264 return reply_op_error(pg, -EIO);
265 }
266 }
267 if (m->get_oid().name.size()
268 > crimson::common::local_conf()->osd_max_object_name_len) {
269 return reply_op_error(pg, -ENAMETOOLONG);
270 } else if (m->get_hobj().get_key().size()
271 > crimson::common::local_conf()->osd_max_object_name_len) {
272 return reply_op_error(pg, -ENAMETOOLONG);
273 } else if (m->get_hobj().nspace.size()
274 > crimson::common::local_conf()->osd_max_object_namespace_len) {
275 return reply_op_error(pg, -ENAMETOOLONG);
276 } else if (m->get_hobj().oid.name.empty()) {
277 return reply_op_error(pg, -EINVAL);
278 } else if (pg->get_osdmap()->is_blocklisted(conn->get_peer_addr())) {
279 logger().info("{} is blocklisted", conn->get_peer_addr());
280 return reply_op_error(pg, -EBLOCKLISTED);
281 }
282
283 if (!obc->obs.exists && !op_info.may_write()) {
284 return reply_op_error(pg, -ENOENT);
285 }
286
287 SnapContext snapc = get_snapc(pg,obc);
288
289 if ((m->has_flag(CEPH_OSD_FLAG_ORDERSNAP)) &&
290 snapc.seq < obc->ssc->snapset.seq) {
291 logger().debug("{} ORDERSNAP flag set and snapc seq {}",
292 " < snapset seq {} on {}",
293 __func__, snapc.seq, obc->ssc->snapset.seq,
294 obc->obs.oi.soid);
295 return reply_op_error(pg, -EOLDSNAPC);
296 }
297
298 if (!pg->is_primary()) {
299 // primary can handle both normal ops and balanced reads
300 if (is_misdirected(*pg)) {
301 logger().trace("do_process: dropping misdirected op");
302 return seastar::now();
303 } else if (const hobject_t& hoid = m->get_hobj();
304 !pg->get_peering_state().can_serve_replica_read(hoid)) {
305 logger().debug("{}: unstable write on replica, "
306 "bouncing to primary",
307 __func__);
308 return reply_op_error(pg, -EAGAIN);
309 } else {
310 logger().debug("{}: serving replica read on oid {}",
311 __func__, m->get_hobj());
312 }
313 }
314 return pg->do_osd_ops(m, conn, obc, op_info, snapc).safe_then_unpack_interruptible(
315 [this, pg, &ihref](auto submitted, auto all_completed) mutable {
316 return submitted.then_interruptible([this, pg, &ihref] {
317 return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
318 }).then_interruptible(
319 [this, pg, all_completed=std::move(all_completed), &ihref]() mutable {
320 return all_completed.safe_then_interruptible(
321 [this, pg, &ihref](MURef<MOSDOpReply> reply) {
322 return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
323 ).then_interruptible(
324 [this, reply=std::move(reply)]() mutable {
325 logger().debug("{}: sending response", *this);
326 return conn->send(std::move(reply));
327 });
328 }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
329 return process_op(ihref, pg);
330 }));
331 });
332 }, crimson::ct_error::eagain::handle([this, pg, &ihref]() mutable {
333 return process_op(ihref, pg);
334 }));
335 }
336
337 bool ClientRequest::is_misdirected(const PG& pg) const
338 {
339 // otherwise take a closer look
340 if (const int flags = m->get_flags();
341 flags & CEPH_OSD_FLAG_BALANCE_READS ||
342 flags & CEPH_OSD_FLAG_LOCALIZE_READS) {
343 if (!op_info.may_read()) {
344 // no read found, so it can't be balanced read
345 return true;
346 }
347 if (op_info.may_write() || op_info.may_cache()) {
348 // write op, but i am not primary
349 return true;
350 }
351 // balanced reads; any replica will do
352 return false;
353 }
354 // neither balanced nor localize reads
355 return true;
356 }
357
// Hand this (finished) op to the historic-op registry of the shard it last
// ran on; put_historic_shard_services is set in the ctor and with_pg().
void ClientRequest::put_historic() const
{
  ceph_assert_always(put_historic_shard_services);
  put_historic_shard_services->get_registry().put_historic(*this);
}
363
364 const SnapContext ClientRequest::get_snapc(
365 Ref<PG>& pg,
366 crimson::osd::ObjectContextRef obc) const
367 {
368 SnapContext snapc;
369 if (op_info.may_write() || op_info.may_cache()) {
370 // snap
371 if (pg->get_pgpool().info.is_pool_snaps_mode()) {
372 // use pool's snapc
373 snapc = pg->get_pgpool().snapc;
374 logger().debug("{} using pool's snapc snaps={}",
375 __func__, snapc.snaps);
376
377 } else {
378 // client specified snapc
379 snapc.seq = m->get_snap_seq();
380 snapc.snaps = m->get_snaps();
381 logger().debug("{} client specified snapc seq={} snaps={}",
382 __func__, snapc.seq, snapc.snaps);
383 }
384 }
385 return snapc;
386 }
387
388 }