1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
2 // vim: ts=8 sw=2 smarttab expandtab
4 #include "messages/MOSDOp.h"
5 #include "messages/MOSDOpReply.h"
7 #include "crimson/common/exception.h"
8 #include "crimson/osd/pg.h"
9 #include "crimson/osd/osd.h"
10 #include "common/Formatter.h"
11 #include "crimson/osd/osd_operation_external_tracking.h"
12 #include "crimson/osd/osd_operations/client_request.h"
13 #include "crimson/osd/osd_connection_priv.h"
14 #include "osd/object_state_fmt.h"
// Accessor for the logger used by the functions in this file: returns
// whatever crimson::get_logger() yields for the ceph_subsys_osd subsystem.
17 seastar::logger
& logger() {
18 return crimson::get_logger(ceph_subsys_osd
);
22 namespace crimson::osd
{
// Re-dispatches every request currently held in `list` against `pg`.
// For each request: log it, reset its instance handle, then call
// with_pg_int() on it with the given shard_services and pg.
25 void ClientRequest::Orderer::requeue(
26 ShardServices
&shard_services
, Ref
<PG
> pg
)
28 for (auto &req
: list
) {
29 logger().debug("{}: {} requeueing {}", __func__
, *pg
, req
);
// The instance handle is reset before the request is started again.
30 req
.reset_instance_handle();
// The future returned by with_pg_int() is explicitly discarded here.
31 std::ignore
= req
.with_pg_int(shard_services
, pg
);
// Completes and removes every request held in `list`.
35 void ClientRequest::Orderer::clear_and_cancel()
// Explicit iterator loop with an empty increment clause: the iterator is
// advanced by the post-increment inside the remove_request() argument below.
37 for (auto i
= list
.begin(); i
!= list
.end(); ) {
// NOTE(review): this format string is an argument of a logging call whose
// opening line is not visible in this listing.
39 "ClientRequest::Orderer::clear_and_cancel: {}",
41 i
->complete_request();
// `i++` yields the current position and moves `i` on before
// remove_request() is entered.
42 remove_request(*(i
++));
// Records a CompletionEvent for this operation via track_event() and then
// calls set_value() on on_complete.
46 void ClientRequest::complete_request()
48 track_event
<CompletionEvent
>();
49 on_complete
.set_value();
// Constructor. From the initializers visible here: stores the address of
// shard_services in put_historic_shard_services, moves `conn` into the member
// of the same name, and initializes instance_handle with a newly allocated
// instance_handle_t.
// NOTE(review): the parameter list and the initializer list are only partly
// visible in this listing (the trailing comma after `conn` implies at least
// one further parameter).
52 ClientRequest::ClientRequest(
53 ShardServices
&shard_services
, crimson::net::ConnectionRef conn
,
55 : put_historic_shard_services(&shard_services
),
56 conn(std::move(conn
)),
58 instance_handle(new instance_handle_t
)
// Destructor: emits a debug log line that identifies this request.
61 ClientRequest::~ClientRequest()
63 logger().debug("{}: destroying", *this);
// Writes this request to `lhs` in the form "m=[<message>]", where <message>
// is *m streamed with operator<<.
66 void ClientRequest::print(std::ostream
&lhs
) const
68 lhs
<< "m=[" << *m
<< "]";
// Logs at debug level, then calls std::apply with a generic lambda that
// captures `f` and receives the applied elements as the pack `event`.
// NOTE(review): the lambda body and the tuple handed to std::apply are not
// visible in this listing; presumably each event is dumped into `f` - confirm.
71 void ClientRequest::dump_detail(Formatter
*f
) const
73 logger().debug("{}: dumping", *this);
74 std::apply([f
] (auto... event
) {
// Returns the client_request_conn_pipeline member of the object that
// get_osd_priv() returns for this request's connection.
79 ConnectionPipeline
&ClientRequest::get_connection_pipeline()
81 return get_osd_priv(conn
.get()).client_request_conn_pipeline
;
// Returns the request_pg_pipeline member of `pg`.
84 ClientRequest::PGPipeline
&ClientRequest::client_pp(PG
&pg
)
86 return pg
.request_pg_pipeline
;
// Inspects the ops carried by the message `m`. The predicate below reports
// whether a single op is a PG op according to ceph_osd_op_type_pg().
// NOTE(review): the algorithm call that consumes the range and the predicate
// is not visible in this listing - presumably an any-of style check; confirm.
89 bool ClientRequest::is_pg_op() const
92 begin(m
->ops
), end(m
->ops
),
93 [](auto& op
) { return ceph_osd_op_type_pg(op
.op
.op
); });
// Runs this request against `pgref` inside interruptor::with_interruption().
// Visible flow of the interruptible body:
//   1. if pg.can_discard_op(*m): call shard_services.send_incremental_map()
//      for the connection and m->get_map_epoch(), then remove this request
//      from the PG's client_request_orderer;
//   2. otherwise enter the await_map stage, block on pg.osdmap_gate
//      (wait_for_map with m->get_min_epoch()), enter the wait_for_active
//      stage, and block on pg.wait_for_active_blocker;
//   3. dispatch to process_pg_op() or process_op();
//   4. remove this request from the PG's client_request_orderer.
// The second callable passed to with_interruption() receives a
// std::exception_ptr and logs that the request was interrupted.
// NOTE(review): several lines of this function are not visible in this
// listing (for example the declaration of `pg` and the condition that selects
// between process_pg_op() and process_op()).
96 seastar::future
<> ClientRequest::with_pg_int(
97 ShardServices
&shard_services
, Ref
<PG
> pgref
)
99 epoch_t same_interval_since
= pgref
->get_interval_start_epoch();
100 logger().debug("{} same_interval_since: {}", *this, same_interval_since
);
101 if (m
->finish_decode()) {
// Each invocation takes the current instance_id (post-incremented); the
// value appears in the log lines below.
104 const auto this_instance_id
= instance_id
++;
// opref, pgref and instance_handle are moved into the last lambda of this
// function (see its capture list) - presumably to keep them alive until it
// runs; confirm.
105 OperationRef opref
{this};
106 auto instance_handle
= get_instance_handle();
107 auto &ihref
= *instance_handle
;
108 return interruptor::with_interruption(
109 [this, pgref
, this_instance_id
, &ihref
, &shard_services
]() mutable {
// Discard path.
111 if (pg
.can_discard_op(*m
)) {
112 return shard_services
.send_incremental_map(
113 std::ref(*conn
), m
->get_map_epoch()
114 ).then([this, this_instance_id
, pgref
] {
115 logger().debug("{}.{}: discarding", *this, this_instance_id
);
116 pgref
->client_request_orderer
.remove_request(*this);
118 return interruptor::now();
// Normal path: walk the pipeline stages and blockers in order.
121 return ihref
.enter_stage
<interruptor
>(client_pp(pg
).await_map
, *this
122 ).then_interruptible([this, this_instance_id
, &pg
, &ihref
] {
123 logger().debug("{}.{}: after await_map stage", *this, this_instance_id
);
124 return ihref
.enter_blocker(
125 *this, pg
.osdmap_gate
, &decltype(pg
.osdmap_gate
)::wait_for_map
,
126 m
->get_min_epoch(), nullptr);
127 }).then_interruptible([this, this_instance_id
, &pg
, &ihref
](auto map
) {
128 logger().debug("{}.{}: after wait_for_map", *this, this_instance_id
);
129 return ihref
.enter_stage
<interruptor
>(client_pp(pg
).wait_for_active
, *this);
130 }).then_interruptible([this, this_instance_id
, &pg
, &ihref
]() {
132 "{}.{}: after wait_for_active stage", *this, this_instance_id
);
133 return ihref
.enter_blocker(
135 pg
.wait_for_active_blocker
,
136 &decltype(pg
.wait_for_active_blocker
)::wait
);
137 }).then_interruptible([this, pgref
, this_instance_id
, &ihref
]() mutable
138 -> interruptible_future
<> {
140 "{}.{}: after wait_for_active", *this, this_instance_id
);
142 return process_pg_op(pgref
);
144 return process_op(ihref
, pgref
);
146 }).then_interruptible([this, this_instance_id
, pgref
] {
147 logger().debug("{}.{}: after process*", *this, this_instance_id
);
148 pgref
->client_request_orderer
.remove_request(*this);
// Interruption handler.
151 }, [this, this_instance_id
, pgref
](std::exception_ptr eptr
) {
152 // TODO: better debug output
153 logger().debug("{}.{}: interrupted {}", *this, this_instance_id
, eptr
);
155 [opref
=std::move(opref
), pgref
=std::move(pgref
),
156 instance_handle
=std::move(instance_handle
), &ihref
] {
// Entry point for running this request on a PG: records the address of
// shard_services, registers the request with the PG's client_request_orderer,
// obtains a future from on_complete, and starts with_pg_int(), discarding the
// future that with_pg_int() itself returns.
// NOTE(review): the return statement is not visible in this listing;
// presumably `ret` is returned - confirm.
161 seastar::future
<> ClientRequest::with_pg(
162 ShardServices
&shard_services
, Ref
<PG
> pgref
)
164 put_historic_shard_services
= &shard_services
;
165 pgref
->client_request_orderer
.add_request(*this);
166 auto ret
= on_complete
.get_future();
167 std::ignore
= with_pg_int(
168 shard_services
, std::move(pgref
)
// Handles a PG-level op: calls pg->do_pg_ops() and, once that yields a
// MOSDOpReply, sends the reply over `conn`.
// NOTE(review): the parameter list and the arguments passed to do_pg_ops()
// are not visible in this listing.
173 ClientRequest::interruptible_future
<>
174 ClientRequest::process_pg_op(
177 return pg
->do_pg_ops(
179 ).then_interruptible([this, pg
=std::move(pg
)](MURef
<MOSDOpReply
> reply
) {
180 return conn
->send(std::move(reply
));
// Builds an MOSDOpReply for the request message `m` carrying the error code
// `err` and sends it over `conn`; the result of conn->send() is returned.
// The reply is constructed with the PG's current osdmap epoch, the request's
// flags masked down to CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK, and a final
// boolean that is true when the request lacks CEPH_OSD_FLAG_RETURNVEC.
184 auto ClientRequest::reply_op_error(const Ref
<PG
>& pg
, int err
)
186 logger().debug("{}: replying with error {}", *this, err
);
187 auto reply
= crimson::make_message
<MOSDOpReply
>(
188 m
.get(), err
, pg
->get_osdmap_epoch(),
189 m
->get_flags() & (CEPH_OSD_FLAG_ACK
|CEPH_OSD_FLAG_ONDISK
),
190 !m
->has_flag(CEPH_OSD_FLAG_RETURNVEC
));
// The reply carries a default-constructed version, user version 0 and an
// empty list of per-op returns.
191 reply
->set_reply_versions(eversion_t(), 0);
192 reply
->set_op_returns(std::vector
<pg_log_op_return_item_t
>{});
193 return conn
->send(std::move(reply
));
// Processes a client op on `pg`, using `ihref` to move through the PG
// pipeline stages. Visible flow:
//   1. enter the recover_missing stage; when the PG is primary call
//      do_recover_missing() for the target object, otherwise log and skip;
//   2. query pg->already_complete(m->get_reqid()); a reply built from
//      `completed` (err, version, user_version) is sent with ACK | ONDISK;
//   3. enter the get_obc stage, fill op_info from the message and the PG's
//      osdmap, and continue under pg->with_locked_obc(): enter the process
//      stage, then call do_process();
//   4. errors are handled through PG::load_obc_ertr::all_same_way(), which
//      logs the code and replies with -code.value().
// NOTE(review): some lines of this function are not visible in this listing
// (for example the condition guarding the already-completed reply).
196 ClientRequest::interruptible_future
<>
197 ClientRequest::process_op(instance_handle_t
&ihref
, Ref
<PG
> &pg
)
199 return ihref
.enter_stage
<interruptor
>(
200 client_pp(*pg
).recover_missing
,
202 ).then_interruptible(
203 [this, pg
]() mutable {
204 if (pg
->is_primary()) {
205 return do_recover_missing(pg
, m
->get_hobj());
// Adjacent string literals are concatenated as-is, so the second literal
// starts with a space to keep "do_recover_missing" and "on" apart.
207 logger().debug("process_op: Skipping do_recover_missing"
208 " on non primary pg");
209 return interruptor::now();
211 }).then_interruptible([this, pg
, &ihref
]() mutable {
212 return pg
->already_complete(m
->get_reqid()).then_interruptible(
213 [this, pg
, &ihref
](auto completed
) mutable
214 -> PG::load_obc_iertr::future
<> {
216 auto reply
= crimson::make_message
<MOSDOpReply
>(
217 m
.get(), completed
->err
, pg
->get_osdmap_epoch(),
218 CEPH_OSD_FLAG_ACK
| CEPH_OSD_FLAG_ONDISK
, false);
219 reply
->set_reply_versions(completed
->version
, completed
->user_version
);
220 return conn
->send(std::move(reply
));
222 return ihref
.enter_stage
<interruptor
>(client_pp(*pg
).get_obc
, *this
223 ).then_interruptible(
224 [this, pg
, &ihref
]() mutable -> PG::load_obc_iertr::future
<> {
225 logger().debug("{}: in get_obc stage", *this);
226 op_info
.set_from_op(&*m
, *pg
->get_osdmap());
227 return pg
->with_locked_obc(
228 m
->get_hobj(), op_info
,
229 [this, pg
, &ihref
](auto obc
) mutable {
230 logger().debug("{}: got obc {}", *this, obc
->obs
);
231 return ihref
.enter_stage
<interruptor
>(
232 client_pp(*pg
).process
, *this
233 ).then_interruptible([this, pg
, obc
, &ihref
]() mutable {
234 return do_process(ihref
, pg
, obc
);
240 }).handle_error_interruptible(
241 PG::load_obc_ertr::all_same_way([this, pg
=std::move(pg
)](const auto &code
) {
242 logger().error("ClientRequest saw error code {}", code
);
// The error value is asserted positive and negated for the reply.
243 assert(code
.value() > 0);
244 return reply_op_error(pg
, -code
.value());
// Validates and executes a client op against `obc` on `pg`.
// Checks made before execution, each answered through reply_op_error()
// unless noted otherwise:
//   - CEPH_OSD_FLAG_PARALLELEXEC set                  -> -EINVAL
//   - pool has FLAG_EIO                               -> op dropped when the
//     request has CEPH_OSD_FLAG_SUPPORTSPOOLEIO, otherwise -EIO
//   - object name or key longer than osd_max_object_name_len, or namespace
//     longer than osd_max_object_namespace_len        -> -ENAMETOOLONG
//   - empty object name                               -> -EINVAL
//   - peer address blocklisted in the PG's osdmap     -> -EBLOCKLISTED
//   - object does not exist and the op may not write  -> -ENOENT
//   - ORDERSNAP set and snapc.seq < snapset seq       -> -EOLDSNAPC
//   - on a non-primary PG: misdirected ops are dropped; objects that cannot
//     serve a replica read                            -> -EAGAIN
// The op then runs through pg->do_osd_ops(). After `submitted` resolves the
// request enters the wait_repop stage; after `all_completed` resolves it
// enters the send_reply stage and sends the reply on `conn`. An eagain error
// at either level calls process_op() again.
// NOTE(review): some lines of this function (braces, `else` branches and a
// few argument lines) are not visible in this listing.
248 ClientRequest::interruptible_future
<>
249 ClientRequest::do_process(
250 instance_handle_t
&ihref
,
251 Ref
<PG
>& pg
, crimson::osd::ObjectContextRef obc
)
253 if (m
->has_flag(CEPH_OSD_FLAG_PARALLELEXEC
)) {
254 return reply_op_error(pg
, -EINVAL
);
// Bound by const reference: the pool info is only inspected below, so the
// per-op copy of pg_pool_t is unnecessary.
256 const pg_pool_t &pool
= pg
->get_pgpool().info
;
257 if (pool
.has_flag(pg_pool_t::FLAG_EIO
)) {
258 // drop op on the floor; the client will handle returning EIO
259 if (m
->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO
)) {
260 logger().debug("discarding op due to pool EIO flag");
261 return seastar::now();
263 logger().debug("replying EIO due to pool EIO flag");
264 return reply_op_error(pg
, -EIO
);
267 if (m
->get_oid().name
.size()
268 > crimson::common::local_conf()->osd_max_object_name_len
) {
269 return reply_op_error(pg
, -ENAMETOOLONG
);
270 } else if (m
->get_hobj().get_key().size()
271 > crimson::common::local_conf()->osd_max_object_name_len
) {
272 return reply_op_error(pg
, -ENAMETOOLONG
);
273 } else if (m
->get_hobj().nspace
.size()
274 > crimson::common::local_conf()->osd_max_object_namespace_len
) {
275 return reply_op_error(pg
, -ENAMETOOLONG
);
276 } else if (m
->get_hobj().oid
.name
.empty()) {
277 return reply_op_error(pg
, -EINVAL
);
278 } else if (pg
->get_osdmap()->is_blocklisted(conn
->get_peer_addr())) {
279 logger().info("{} is blocklisted", conn
->get_peer_addr());
280 return reply_op_error(pg
, -EBLOCKLISTED
);
283 if (!obc
->obs
.exists
&& !op_info
.may_write()) {
284 return reply_op_error(pg
, -ENOENT
);
287 SnapContext snapc
= get_snapc(pg
,obc
);
289 if ((m
->has_flag(CEPH_OSD_FLAG_ORDERSNAP
)) &&
290 snapc
.seq
< obc
->ssc
->snapset
.seq
) {
// The two literals form ONE format string (no comma between them), so the
// four placeholders line up with the arguments that follow.
291 logger().debug("{} ORDERSNAP flag set and snapc seq {}"
292 " < snapset seq {} on {}",
293 __func__
, snapc
.seq
, obc
->ssc
->snapset
.seq
,
295 return reply_op_error(pg
, -EOLDSNAPC
);
298 if (!pg
->is_primary()) {
299 // primary can handle both normal ops and balanced reads
300 if (is_misdirected(*pg
)) {
301 logger().trace("do_process: dropping misdirected op");
302 return seastar::now();
303 } else if (const hobject_t
& hoid
= m
->get_hobj();
304 !pg
->get_peering_state().can_serve_replica_read(hoid
)) {
305 logger().debug("{}: unstable write on replica, "
306 "bouncing to primary",
308 return reply_op_error(pg
, -EAGAIN
);
310 logger().debug("{}: serving replica read on oid {}",
311 __func__
, m
->get_hobj());
// Execute the op; `submitted` and `all_completed` are handled in turn.
314 return pg
->do_osd_ops(m
, conn
, obc
, op_info
, snapc
).safe_then_unpack_interruptible(
315 [this, pg
, &ihref
](auto submitted
, auto all_completed
) mutable {
316 return submitted
.then_interruptible([this, pg
, &ihref
] {
317 return ihref
.enter_stage
<interruptor
>(client_pp(*pg
).wait_repop
, *this);
318 }).then_interruptible(
319 [this, pg
, all_completed
=std::move(all_completed
), &ihref
]() mutable {
320 return all_completed
.safe_then_interruptible(
321 [this, pg
, &ihref
](MURef
<MOSDOpReply
> reply
) {
322 return ihref
.enter_stage
<interruptor
>(client_pp(*pg
).send_reply
, *this
323 ).then_interruptible(
324 [this, reply
=std::move(reply
)]() mutable {
325 logger().debug("{}: sending response", *this);
326 return conn
->send(std::move(reply
));
// eagain from all_completed: run process_op() again.
328 }, crimson::ct_error::eagain::handle([this, pg
, &ihref
]() mutable {
329 return process_op(ihref
, pg
);
// eagain from do_osd_ops() itself: run process_op() again.
332 }, crimson::ct_error::eagain::handle([this, pg
, &ihref
]() mutable {
333 return process_op(ihref
, pg
);
// Decides whether this op was sent to a PG instance that should not serve it.
// The decision is based on the request flags (BALANCE_READS / LOCALIZE_READS)
// and on op_info (may_read / may_write / may_cache).
// NOTE(review): the return statements of the individual branches are not
// visible in this listing; the existing branch comments describe the intent
// of each case.
337 bool ClientRequest::is_misdirected(const PG
& pg
) const
339 // otherwise take a closer look
340 if (const int flags
= m
->get_flags();
341 flags
& CEPH_OSD_FLAG_BALANCE_READS
||
342 flags
& CEPH_OSD_FLAG_LOCALIZE_READS
) {
343 if (!op_info
.may_read()) {
344 // no read found, so it can't be balanced read
347 if (op_info
.may_write() || op_info
.may_cache()) {
348 // write op, but i am not primary
351 // balanced reads; any replica will do
354 // neither balanced nor localize reads
// Hands this request to the registry reached through
// put_historic_shard_services by calling its put_historic().
// The pointer is asserted non-null first with ceph_assert_always.
358 void ClientRequest::put_historic() const
360 ceph_assert_always(put_historic_shard_services
);
361 put_historic_shard_services
->get_registry().put_historic(*this);
364 const SnapContext
ClientRequest::get_snapc(
366 crimson::osd::ObjectContextRef obc
) const
369 if (op_info
.may_write() || op_info
.may_cache()) {
371 if (pg
->get_pgpool().info
.is_pool_snaps_mode()) {
373 snapc
= pg
->get_pgpool().snapc
;
374 logger().debug("{} using pool's snapc snaps={}",
375 __func__
, snapc
.snaps
);
378 // client specified snapc
379 snapc
.seq
= m
->get_snap_seq();
380 snapc
.snaps
= m
->get_snaps();
381 logger().debug("{} client specified snapc seq={} snaps={}",
382 __func__
, snapc
.seq
, snapc
.snaps
);