1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <seastar/core/future.hh>
6 #include "messages/MOSDOp.h"
7 #include "messages/MOSDOpReply.h"
9 #include "crimson/osd/pg.h"
10 #include "crimson/osd/osd.h"
11 #include "common/Formatter.h"
12 #include "crimson/osd/osd_operations/client_request.h"
13 #include "crimson/osd/osd_connection_priv.h"
// File-local accessor for the logger used throughout this file.
// Returns a reference to the seastar::logger that crimson::get_logger()
// hands back for the ceph_subsys_osd subsystem.
// NOTE(review): the closing brace of this function is not present in this
// extract.
16 seastar::logger
& logger() {
17 return crimson::get_logger(ceph_subsys_osd
);
21 namespace crimson::osd
{
// Constructs a ClientRequest from the owning OSD, the connection the
// message arrived on, and the MOSDOp message itself.
//
// osd  - stored in the `osd` member (initialised from a reference).
// conn - connection handle, copied into the `conn` member.
// m    - message reference, taken as an rvalue reference.
//
// NOTE(review): `m` is declared `Ref<MOSDOp> &&` but is named in the
// initialiser (`m(m)`), so it is an lvalue there and the member is
// copy-initialised rather than move-initialised. Consider
// `m(std::move(m))` if the extra reference copy is unintended.
// NOTE(review): the constructor body is not present in this extract.
23 ClientRequest::ClientRequest(
24 OSD
&osd
, crimson::net::ConnectionRef conn
, Ref
<MOSDOp
> &&m
)
25 : osd(osd
), conn(conn
), m(m
)
// Const member taking an output stream by reference.
// NOTE(review): only the signature is present in this extract; presumably
// it writes a description of the request to `lhs` -- confirm against the
// full file.
28 void ClientRequest::print(std::ostream
&lhs
) const
// Const member taking a Formatter pointer.
// NOTE(review): only the signature is present in this extract; the body
// (and whether `f` is used at all) must be checked in the full file.
33 void ClientRequest::dump_detail(Formatter
*f
) const
// Returns the per-connection pipeline for this request: the
// `client_request_conn_pipeline` member of the private data that
// get_osd_priv() yields for the raw connection pointer (`conn.get()`).
37 ClientRequest::ConnectionPipeline
&ClientRequest::cp()
39 return get_osd_priv(conn
.get()).client_request_conn_pipeline
;
// Returns the per-PG pipeline for client requests, i.e. the
// `client_request_pg_pipeline` member of the given PG.
//
// pg - the placement group whose pipeline is returned (by reference).
42 ClientRequest::PGPipeline
&ClientRequest::pp(PG
&pg
)
44 return pg
.client_request_pg_pipeline
;
// Classifies the request by inspecting the ops carried in the message.
// The visible code passes the range [begin(m->ops), end(m->ops)) together
// with a predicate that evaluates ceph_osd_op_type_pg() on each op's
// opcode (`op.op.op`).
// NOTE(review): the algorithm call receiving these arguments is not
// present in this extract, so whether the result means "any" or "all"
// ops are PG ops must be confirmed against the full file.
47 bool ClientRequest::is_pg_op() const
50 begin(m
->ops
), end(m
->ops
),
// Predicate: true when the op's opcode is a PG-type op.
51 [](auto& op
) { return ceph_osd_op_type_pg(op
.op
.op
); });
// Entry point that drives the request through the connection-level and
// PG-level pipeline stages and then dispatches it for processing.
// Returns a seastar::future<>.
//
// Visible sequence of stages (each step is wrapped in
// with_blocking_future()):
//   1. enter cp().await_map, then wait on osd.osdmap_gate for the
//      message's minimum epoch (m->get_min_epoch());
//   2. enter cp().get_pg, then wait for the PG named by m->get_spg();
//   3. enter pp(pg).await_map, then wait on pg.osdmap_gate for
//      m->get_map_epoch();
//   4. enter pp(pg).wait_for_active, then wait on
//      pg.wait_for_active_blocker;
//   5. call m->finish_decode() and hand off to process_pg_op(pg) or
//      process_op(pg).
//
// NOTE(review): this extract omits several lines (some continuation
// openers, the declarations of `opref` and `pg`, and the branches around
// the final dispatch). The comments below describe only what is visible.
54 seastar::future
<> ClientRequest::start()
56 logger().debug("{}: start", *this);
// Connection pipeline: wait for a sufficiently recent OSD map.
59 return with_blocking_future(handle
.enter(cp().await_map
))
61 return with_blocking_future(osd
.osdmap_gate
.wait_for_map(m
->get_min_epoch()));
62 }).then([this](epoch_t epoch
) {
// Connection pipeline: PG lookup stage.
63 return with_blocking_future(handle
.enter(cp().get_pg
));
65 return with_blocking_future(osd
.wait_for_pg(m
->get_spg()));
// `opref` is moved into this continuation and then, together with the
// resolved PG reference, into seastar::do_with(), which passes both to
// the inner lambda.
66 }).then([this, opref
=std::move(opref
)](Ref
<PG
> pgref
) {
67 return seastar::do_with(
68 std::move(pgref
), std::move(opref
), [this](auto pgref
, auto opref
) {
// NOTE(review): `pg` is captured by reference below; its declaration is
// not visible here (presumably bound from `pgref` -- confirm).
// PG pipeline: wait until the PG has the map epoch the op was sent at.
70 return with_blocking_future(
71 handle
.enter(pp(pg
).await_map
)
72 ).then([this, &pg
]() mutable {
73 return with_blocking_future(
74 pg
.osdmap_gate
.wait_for_map(m
->get_map_epoch()));
75 }).then([this, &pg
](auto map
) mutable {
// PG pipeline: wait for the PG to become active.
76 return with_blocking_future(
77 handle
.enter(pp(pg
).wait_for_active
));
78 }).then([this, &pg
]() mutable {
79 return with_blocking_future(pg
.wait_for_active_blocker
.wait());
80 }).then([this, &pg
]() mutable {
// NOTE(review): the body of this `if` and the condition that selects
// between process_pg_op() and process_op() are missing from this extract;
// do not read the two returns below as belonging to the finish_decode()
// branch.
81 if (m
->finish_decode()) {
85 return process_pg_op(pg
);
87 return process_op(pg
);
// Handles a PG-level operation: passes the message to pg.do_pg_ops() and,
// once that yields an MOSDOpReply, sends the reply on the request's
// connection. Returns a seastar::future<>.
// NOTE(review): the parameter list is not present in this extract; `pg`
// is presumably the PG argument passed in by start() -- confirm.
94 seastar::future
<> ClientRequest::process_pg_op(
97 return pg
.do_pg_ops(m
)
98 .then([this](Ref
<MOSDOpReply
> reply
) {
99 return conn
->send(reply
);
// Handles an object-level operation. Returns a seastar::future<>.
//
// Visible sequence:
//   1. enter the pp(pg).get_obc pipeline stage;
//   2. fill `op_info` from the message and the PG's current OSD map;
//   3. call pg.with_locked_obc() with a callback that enters the
//      pp(pg).process stage, runs pg.do_osd_ops(m, obc) and sends the
//      resulting MOSDOpReply on the connection;
//   4. finish with seastar::now() on success; on error, an
//      all_same_way handler logs the code and also finishes with
//      seastar::now().
//
// NOTE(review): the parameter list, some arguments to with_locked_obc()
// and the closing lines of the function are not present in this extract.
103 seastar::future
<> ClientRequest::process_op(
106 return with_blocking_future(
107 handle
.enter(pp(pg
).get_obc
)
108 ).then([this, &pg
]() {
// Populate op_info from the message using the PG's OSD map.
109 op_info
.set_from_op(&*m
, *pg
.get_osdmap());
110 return pg
.with_locked_obc(
// Callback invoked with the object context (`obc`).
114 [this, &pg
](auto obc
) {
115 return with_blocking_future(handle
.enter(pp(pg
).process
)
116 ).then([this, &pg
, obc
]() {
117 return pg
.do_osd_ops(m
, obc
);
118 }).then([this](Ref
<MOSDOpReply
> reply
) {
119 return conn
->send(reply
);
123 return seastar::now();
// Error path: every PG::load_obc_ertr error is handled the same way. The
// visible handler only logs the code and resolves normally; it does not
// send a reply.
124 }, PG::load_obc_ertr::all_same_way([](auto &code
) {
125 logger().error("ClientRequest saw error code {}", code
);
126 return seastar::now();