]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd_operations/client_request.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / osd_operations / client_request.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <seastar/core/future.hh>
5
6 #include "messages/MOSDOp.h"
7 #include "messages/MOSDOpReply.h"
8
9 #include "crimson/osd/pg.h"
10 #include "crimson/osd/osd.h"
11 #include "common/Formatter.h"
12 #include "crimson/osd/osd_operations/client_request.h"
13 #include "crimson/osd/osd_connection_priv.h"
14
15 namespace {
16 seastar::logger& logger() {
17 return crimson::get_logger(ceph_subsys_osd);
18 }
19 }
20
21 namespace crimson::osd {
22
23 ClientRequest::ClientRequest(
24 OSD &osd, crimson::net::ConnectionRef conn, Ref<MOSDOp> &&m)
25 : osd(osd), conn(conn), m(m)
26 {}
27
28 void ClientRequest::print(std::ostream &lhs) const
29 {
30 lhs << *m;
31 }
32
33 void ClientRequest::dump_detail(Formatter *f) const
34 {
35 }
36
37 ClientRequest::ConnectionPipeline &ClientRequest::cp()
38 {
39 return get_osd_priv(conn.get()).client_request_conn_pipeline;
40 }
41
42 ClientRequest::PGPipeline &ClientRequest::pp(PG &pg)
43 {
44 return pg.client_request_pg_pipeline;
45 }
46
47 bool ClientRequest::is_pg_op() const
48 {
49 return std::any_of(
50 begin(m->ops), end(m->ops),
51 [](auto& op) { return ceph_osd_op_type_pg(op.op.op); });
52 }
53
54 seastar::future<> ClientRequest::start()
55 {
56 logger().debug("{}: start", *this);
57
58 IRef opref = this;
59 return with_blocking_future(handle.enter(cp().await_map))
60 .then([this]() {
61 return with_blocking_future(osd.osdmap_gate.wait_for_map(m->get_min_epoch()));
62 }).then([this](epoch_t epoch) {
63 return with_blocking_future(handle.enter(cp().get_pg));
64 }).then([this] {
65 return with_blocking_future(osd.wait_for_pg(m->get_spg()));
66 }).then([this, opref=std::move(opref)](Ref<PG> pgref) {
67 return seastar::do_with(
68 std::move(pgref), std::move(opref), [this](auto pgref, auto opref) {
69 PG &pg = *pgref;
70 return with_blocking_future(
71 handle.enter(pp(pg).await_map)
72 ).then([this, &pg]() mutable {
73 return with_blocking_future(
74 pg.osdmap_gate.wait_for_map(m->get_map_epoch()));
75 }).then([this, &pg](auto map) mutable {
76 return with_blocking_future(
77 handle.enter(pp(pg).wait_for_active));
78 }).then([this, &pg]() mutable {
79 return with_blocking_future(pg.wait_for_active_blocker.wait());
80 }).then([this, &pg]() mutable {
81 if (m->finish_decode()) {
82 m->clear_payload();
83 }
84 if (is_pg_op()) {
85 return process_pg_op(pg);
86 } else {
87 return process_op(pg);
88 }
89 });
90 });
91 });
92 }
93
94 seastar::future<> ClientRequest::process_pg_op(
95 PG &pg)
96 {
97 return pg.do_pg_ops(m)
98 .then([this](Ref<MOSDOpReply> reply) {
99 return conn->send(reply);
100 });
101 }
102
103 seastar::future<> ClientRequest::process_op(
104 PG &pg)
105 {
106 return with_blocking_future(
107 handle.enter(pp(pg).get_obc)
108 ).then([this, &pg]() {
109 op_info.set_from_op(&*m, *pg.get_osdmap());
110 return pg.with_locked_obc(
111 m,
112 op_info,
113 this,
114 [this, &pg](auto obc) {
115 return with_blocking_future(handle.enter(pp(pg).process)
116 ).then([this, &pg, obc]() {
117 return pg.do_osd_ops(m, obc);
118 }).then([this](Ref<MOSDOpReply> reply) {
119 return conn->send(reply);
120 });
121 });
122 }).safe_then([] {
123 return seastar::now();
124 }, PG::load_obc_ertr::all_same_way([](auto &code) {
125 logger().error("ClientRequest saw error code {}", code);
126 return seastar::now();
127 }));
128 }
129 }