1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include <seastar/core/future.hh>
6 #include "messages/MOSDOp.h"
7 #include "messages/MOSDOpReply.h"
9 #include "crimson/common/exception.h"
10 #include "crimson/osd/pg.h"
11 #include "crimson/osd/osd.h"
12 #include "common/Formatter.h"
13 #include "crimson/osd/osd_operations/client_request.h"
14 #include "crimson/osd/osd_connection_priv.h"
// Accessor for the logger used by the logging calls in this file: returns
// whatever crimson::get_logger() yields for the ceph_subsys_osd subsystem.
17 seastar::logger
& logger() {
18 return crimson::get_logger(ceph_subsys_osd
);
22 namespace crimson::osd
{
// Constructs a ClientRequest from an OSD reference, a connection reference
// and the MOSDOp message; each argument initializes the member of the same
// name.
// NOTE(review): `m` is declared as an rvalue reference, but inside the
// initializer list it is a named lvalue, so `m(m)` does not move from it.
// Confirm whether `m(std::move(m))` was intended.
24 ClientRequest::ClientRequest(
25 OSD
&osd
, crimson::net::ConnectionRef conn
, Ref
<MOSDOp
> &&m
)
26 : osd(osd
), conn(conn
), m(m
)
// Const member taking the output stream `lhs`.
// NOTE(review): the body is not visible in this view; presumably it writes a
// description of this request to `lhs` -- confirm against the full file.
29 void ClientRequest::print(std::ostream
&lhs
) const
// Const member taking a Formatter pointer `f`.
// NOTE(review): the body is not visible in this view; presumably it emits
// request details through `f` -- confirm against the full file.
34 void ClientRequest::dump_detail(Formatter
*f
) const
// Returns (by reference) the client_request_conn_pipeline member of the
// object that get_osd_priv() yields for this request's connection.
38 ClientRequest::ConnectionPipeline
&ClientRequest::cp()
40 return get_osd_priv(conn
.get()).client_request_conn_pipeline
;
// Returns (by reference) the client_request_pg_pipeline member of the given
// PG.
43 ClientRequest::PGPipeline
&ClientRequest::pp(PG
&pg
)
45 return pg
.client_request_pg_pipeline
;
// Tests the operations carried by the message: the predicate passes each
// entry's op code (op.op.op) to ceph_osd_op_type_pg(), over the range
// [begin(m->ops), end(m->ops)).
// NOTE(review): the call that consumes the range and predicate is not
// visible in this view -- presumably a standard algorithm such as
// std::any_of; confirm which one before relying on the exact semantics.
48 bool ClientRequest::is_pg_op() const
51 begin(m
->ops
), end(m
->ops
),
52 [](auto& op
) { return ceph_osd_op_type_pg(op
.op
.op
); });
// Runs this client request and returns a future<> for the whole chain.
// The chain is built inside seastar::repeat(), which is itself wrapped by
// crimson::common::handle_system_shutdown().  Visible stages, in order:
//   1. enter the connection pipeline's await_map stage, then wait on
//      osd.osdmap_gate for the message's minimum epoch;
//   2. enter the connection pipeline's get_pg stage, then wait for the PG
//      identified by m->get_spg();
//   3. if pg.can_discard_op(*m) holds, call osd.send_incremental_map() with
//      this connection and the message's map epoch instead of processing;
//   4. otherwise enter the PG pipeline's await_map stage, wait on the PG's
//      osdmap_gate, enter wait_for_active and wait on
//      wait_for_active_blocker;
//   5. hand the PG reference to process_pg_op() or process_op().
// An actingset_changed exception ends in either stop_iteration::no (logged
// as "operation restart") or stop_iteration::yes (logged as "operation
// abort").
// NOTE(review): several original lines are not visible in this view -- the
// declarations of `opref` and `pg`, some `.then(` openers, the statement
// guarded by finish_decode(), and the conditions choosing between the paired
// return statements.  Confirm those against the full file.
55 seastar::future
<> ClientRequest::start()
57 logger().debug("{}: start", *this);
60 return crimson::common::handle_system_shutdown(
61 [this, opref
=std::move(opref
)]() mutable {
// Each iteration of repeat() captures a copy of opref.
62 return seastar::repeat([this, opref
]() mutable {
// Connection-level stage: wait until the OSD has the map epoch the message
// requires.
63 return with_blocking_future(handle
.enter(cp().await_map
))
65 return with_blocking_future(osd
.osdmap_gate
.wait_for_map(m
->get_min_epoch()));
// The epoch value delivered by wait_for_map() is not used in the visible
// body of this continuation.
66 }).then([this](epoch_t epoch
) {
67 return with_blocking_future(handle
.enter(cp().get_pg
));
69 return with_blocking_future(osd
.wait_for_pg(m
->get_spg()));
70 }).then([this, opref
](Ref
<PG
> pgref
) {
// NOTE(review): `pg` is declared on a line not visible here; presumably it
// is a reference obtained from pgref -- confirm.
72 if (pg
.can_discard_op(*m
)) {
73 return osd
.send_incremental_map(conn
, m
->get_map_epoch());
// PG-level stages: the continuations below capture `pg` by reference.
75 return with_blocking_future(
76 handle
.enter(pp(pg
).await_map
)
77 ).then([this, &pg
]() mutable {
78 return with_blocking_future(
79 pg
.osdmap_gate
.wait_for_map(m
->get_min_epoch()));
80 }).then([this, &pg
](auto map
) mutable {
81 return with_blocking_future(
82 handle
.enter(pp(pg
).wait_for_active
));
83 }).then([this, &pg
]() mutable {
84 return with_blocking_future(pg
.wait_for_active_blocker
.wait());
// pgref is moved into this final continuation's capture and then passed on
// to the processing function.
85 }).then([this, pgref
=std::move(pgref
)]() mutable {
86 if (m
->finish_decode()) {
// NOTE(review): the condition selecting between the two calls below is not
// visible; presumably is_pg_op() -- confirm.
90 return process_pg_op(pgref
);
92 return process_op(pgref
);
// Normal completion: stop repeating.
96 return seastar::stop_iteration::yes
;
97 }).handle_exception_type([](crimson::common::actingset_changed
& e
) {
// NOTE(review): the condition separating the restart path from the abort
// path is not visible in this view.
99 logger().debug("operation restart, acting set changed");
100 return seastar::stop_iteration::no
;
102 logger().debug("operation abort, up primary changed");
103 return seastar::stop_iteration::yes
;
// Handles a PG-level operation: calls pg->do_pg_ops(m) and, when the
// resulting MOSDOpReply is available, sends it on this request's connection.
// `pg` is moved into the continuation's capture.
// NOTE(review): the parameter list is not visible in this view.
// NOTE(review): `conn->send(reply)` passes the reply as an lvalue, whereas
// process_op() uses `conn->send(std::move(reply))`; confirm whether the two
// should agree.
110 seastar::future
<> ClientRequest::process_pg_op(
113 return pg
->do_pg_ops(m
)
114 .then([this, pg
=std::move(pg
)](Ref
<MOSDOpReply
> reply
) {
115 return conn
->send(reply
);
// Handles an object-level operation on a PG.  Visible stages, in order:
//   1. enter the PG pipeline's recover_missing stage; if the target object
//      is unreadable, or degraded/backfilling, wait for it -- either on the
//      recovery already in progress, or on a newly started UrgentRecovery
//      operation;
//   2. enter the get_obc stage, fill op_info from the message and the PG's
//      osdmap, and call pg.with_locked_obc();
//   3. inside that callback enter the process stage; on a non-primary PG
//      either drop a misdirected op (empty reply), answer -EAGAIN when a
//      replica read cannot be served, or fall through to pg.do_osd_ops();
//   4. send the reply on this request's connection;
//   5. map every PG::load_obc_ertr error to a logged message and a ready
//      future.
// NOTE(review): the parameter list, the declarations of `pg` and `ver`, and
// some branch conditions are on lines not visible in this view.
119 seastar::future
<> ClientRequest::process_op(
123 return with_blocking_future(
124 handle
.enter(pp(pg
).recover_missing
)
125 ).then([this, &pg
, pgref
] {
127 const hobject_t
& soid
= m
->get_hobj();
128 logger().debug("{} check for recovery, {}", *this, soid
);
// `ver` is passed by address to is_unreadable_object() and later handed to
// UrgentRecovery; presumably it is an out-parameter -- confirm.
129 if (pg
.is_unreadable_object(soid
, &ver
) ||
130 pg
.is_degraded_or_backfilling_object(soid
)) {
131 logger().debug("{} need to wait for recovery, {}", *this, soid
);
// Recovery already running for this object: wait on it.
132 if (pg
.get_recovery_backend()->is_recovering(soid
)) {
133 return pg
.get_recovery_backend()->get_recovering(soid
).wait_for_recovered();
// Otherwise start an UrgentRecovery operation and wait on its future.
135 auto [op
, fut
] = osd
.get_shard_services().start_operation
<UrgentRecovery
>(
136 soid
, ver
, pgref
, osd
.get_shard_services(), pg
.get_osdmap_epoch());
137 return std::move(fut
);
// No recovery wait on this path.
140 return seastar::now();
141 }).then([this, &pg
] {
142 return with_blocking_future(handle
.enter(pp(pg
).get_obc
));
143 }).then([this, &pg
]() -> PG::load_obc_ertr::future
<> {
144 op_info
.set_from_op(&*m
, *pg
.get_osdmap());
145 return pg
.with_locked_obc(m
, op_info
, this, [this, &pg
](auto obc
) {
146 return with_blocking_future(
147 handle
.enter(pp(pg
).process
)
148 ).then([this, &pg
, obc
] {
149 if (!pg
.is_primary()) {
150 // primary can handle both normal ops and balanced reads
151 if (is_misdirected(pg
)) {
152 logger().trace("process_op: dropping misdirected op");
// A default-constructed reply marks "nothing to send".
153 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>();
154 } else if (const hobject_t
& hoid
= m
->get_hobj();
155 !pg
.get_peering_state().can_serve_replica_read(hoid
)) {
// Replica read cannot be served here: answer -EAGAIN.
156 auto reply
= make_message
<MOSDOpReply
>(
157 m
.get(), -EAGAIN
, pg
.get_osdmap_epoch(),
158 m
->get_flags() & (CEPH_OSD_FLAG_ACK
|CEPH_OSD_FLAG_ONDISK
),
159 !m
->has_flag(CEPH_OSD_FLAG_RETURNVEC
));
160 return seastar::make_ready_future
<Ref
<MOSDOpReply
>>(std::move(reply
));
163 return pg
.do_osd_ops(m
, obc
, op_info
);
164 }).then([this](Ref
<MOSDOpReply
> reply
) {
// NOTE(review): the condition choosing between sending and the bare
// seastar::now() below is not visible; presumably a null-reply check --
// confirm.
166 return conn
->send(std::move(reply
));
168 return seastar::now();
// pgref is moved into the success continuation's capture.
172 }).safe_then([pgref
=std::move(pgref
)] {
173 return seastar::now();
174 }, PG::load_obc_ertr::all_same_way([](auto &code
) {
175 logger().error("ClientRequest saw error code {}", code
);
176 return seastar::now();
// Decides whether this op was sent to a PG that should not serve it.
// Visible logic: when the message flags include CEPH_OSD_FLAG_BALANCE_READS
// or CEPH_OSD_FLAG_LOCALIZE_READS, the op is examined via op_info
// (may_read / may_write / may_cache) and the visible outcome on that path is
// pg.is_nonprimary().
// NOTE(review): the first check of the function, the return statements of
// the other branches, and the end of the function are not visible in this
// view; confirm their values against the full file.
180 bool ClientRequest::is_misdirected(const PG
& pg
) const
182 // otherwise take a closer look
183 if (const int flags
= m
->get_flags();
184 flags
& CEPH_OSD_FLAG_BALANCE_READS
||
185 flags
& CEPH_OSD_FLAG_LOCALIZE_READS
) {
186 if (!op_info
.may_read()) {
187 // no read found, so it can't be balanced read
190 if (op_info
.may_write() || op_info
.may_cache()) {
191 // write op, but i am not primary
194 // balanced reads; any replica will do
195 return pg
.is_nonprimary();
197 // neither balanced nor localize reads