1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
6 #include <seastar/core/future.hh>
8 #include <boost/intrusive/list.hpp>
9 #include <boost/intrusive_ptr.hpp>
11 #include "osd/osd_op_util.h"
12 #include "crimson/net/Connection.h"
13 #include "crimson/osd/object_context.h"
14 #include "crimson/osd/osdmap_gate.h"
15 #include "crimson/osd/osd_operation.h"
16 #include "crimson/osd/osd_operations/client_request_common.h"
17 #include "crimson/osd/osd_operations/common/pg_pipeline.h"
18 #include "crimson/osd/pg_activation_blocker.h"
19 #include "crimson/osd/pg_map.h"
20 #include "crimson/common/type_helpers.h"
21 #include "crimson/common/utility.h"
22 #include "messages/MOSDOp.h"
24 namespace crimson::osd
{
29 class ClientRequest final
: public PhasedOperationT
<ClientRequest
>,
30 private CommonClientRequest
{
31 // Initially set to primary core, updated to pg core after move,
32 // used by put_historic
33 ShardServices
*put_historic_shard_services
= nullptr;
35 crimson::net::ConnectionRef conn
;
36 // must be after conn due to ConnectionPipeline's life-time
39 seastar::promise
<> on_complete
;
40 unsigned instance_id
= 0;
43 class PGPipeline
: public CommonPGPipeline
{
45 struct AwaitMap
: OrderedExclusivePhaseT
<AwaitMap
> {
46 static constexpr auto type_name
= "ClientRequest::PGPipeline::await_map";
48 struct WaitRepop
: OrderedConcurrentPhaseT
<WaitRepop
> {
49 static constexpr auto type_name
= "ClientRequest::PGPipeline::wait_repop";
51 struct SendReply
: OrderedExclusivePhaseT
<SendReply
> {
52 static constexpr auto type_name
= "ClientRequest::PGPipeline::send_reply";
54 friend class ClientRequest
;
55 friend class LttngBackend
;
56 friend class HistoricBackend
;
57 friend class ReqRequest
;
58 friend class LogMissingRequest
;
59 friend class LogMissingRequestReply
;
65 * Client request is, at present, the only Operation which can be requeued.
66 * This is, mostly, fine. However, reusing the PipelineHandle or
67 * BlockingEvent structures before proving that the prior instance has stopped
68 * can create hangs or crashes due to violations of the BlockerT and
69 * PipelineHandle invariants.
71 * To solve this, we create an instance_handle_t which contains the events
72 * for the portion of execution that can be rerun as well as the
73 * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current
74 * instance_handle_t and releases its PipelineHandle in the finally block.
75 * On requeue, we create a new instance_handle_t with a fresh PipelineHandle
76 * and events tuple and use it and use it for the next invocation of
81 ConnectionPipeline::AwaitActive::BlockingEvent
,
82 ConnectionPipeline::AwaitMap::BlockingEvent
,
83 OSD_OSDMapGate::OSDMapBlocker::BlockingEvent
,
84 ConnectionPipeline::GetPG::BlockingEvent
,
85 PGMap::PGCreationBlockingEvent
,
89 class instance_handle_t
: public boost::intrusive_ref_counter
<
90 instance_handle_t
, boost::thread_unsafe_counter
> {
92 // intrusive_ptr because seastar::lw_shared_ptr includes a cpu debug check
93 // that we will fail since the core on which we allocate the request may not
94 // be the core on which we perform with_pg_int. This is harmless, since we
95 // don't leave any references on the source core, so we just bypass it by using
96 // intrusive_ptr instead.
97 using ref_t
= boost::intrusive_ptr
<instance_handle_t
>;
98 PipelineHandle handle
;
101 PGPipeline::AwaitMap::BlockingEvent
,
102 PG_OSDMapGate::OSDMapBlocker::BlockingEvent
,
103 PGPipeline::WaitForActive::BlockingEvent
,
104 PGActivationBlocker::BlockingEvent
,
105 PGPipeline::RecoverMissing::BlockingEvent
,
106 PGPipeline::GetOBC::BlockingEvent
,
107 PGPipeline::Process::BlockingEvent
,
108 PGPipeline::WaitRepop::BlockingEvent
,
109 PGPipeline::SendReply::BlockingEvent
,
111 > pg_tracking_events
;
113 template <typename BlockingEventT
, typename InterruptorT
=void, typename F
>
114 auto with_blocking_event(F
&&f
, ClientRequest
&op
) {
115 auto ret
= std::forward
<F
>(f
)(
116 typename
BlockingEventT::template Trigger
<ClientRequest
>{
117 std::get
<BlockingEventT
>(pg_tracking_events
), op
119 if constexpr (std::is_same_v
<InterruptorT
, void>) {
122 using ret_t
= decltype(ret
);
123 return typename
InterruptorT::template futurize_t
<ret_t
>{std::move(ret
)};
127 template <typename InterruptorT
=void, typename StageT
>
128 auto enter_stage(StageT
&stage
, ClientRequest
&op
) {
129 return this->template with_blocking_event
<
130 typename
StageT::BlockingEvent
,
132 [&stage
, this](auto &&trigger
) {
133 return handle
.template enter
<ClientRequest
>(
134 stage
, std::move(trigger
));
139 typename InterruptorT
=void, typename BlockingObj
, typename Method
,
142 ClientRequest
&op
, BlockingObj
&obj
, Method method
, Args
&&... args
) {
143 return this->template with_blocking_event
<
144 typename
BlockingObj::Blocker::BlockingEvent
,
147 args
=std::forward_as_tuple(std::move(args
)...)](auto &&trigger
) mutable {
148 return apply_method_to_tuple(
151 std::forward_as_tuple(std::move(trigger
)),
157 instance_handle_t::ref_t instance_handle
;
158 void reset_instance_handle() {
159 instance_handle
= new instance_handle_t
;
161 auto get_instance_handle() { return instance_handle
; }
163 using ordering_hook_t
= boost::intrusive::list_member_hook
<>;
164 ordering_hook_t ordering_hook
;
166 using list_t
= boost::intrusive::list
<
168 boost::intrusive::member_hook
<
170 typename
ClientRequest::ordering_hook_t
,
171 &ClientRequest::ordering_hook
>
176 void add_request(ClientRequest
&request
) {
177 assert(!request
.ordering_hook
.is_linked());
178 intrusive_ptr_add_ref(&request
);
179 list
.push_back(request
);
181 void remove_request(ClientRequest
&request
) {
182 assert(request
.ordering_hook
.is_linked());
183 list
.erase(list_t::s_iterator_to(request
));
184 intrusive_ptr_release(&request
);
186 void requeue(ShardServices
&shard_services
, Ref
<PG
> pg
);
187 void clear_and_cancel();
189 void complete_request();
191 static constexpr OperationTypeCode type
= OperationTypeCode::client_request
;
194 ShardServices
&shard_services
,
195 crimson::net::ConnectionRef
, Ref
<MOSDOp
> &&m
);
198 void print(std::ostream
&) const final
;
199 void dump_detail(Formatter
*f
) const final
;
201 static constexpr bool can_create() { return false; }
202 spg_t
get_pgid() const {
205 PipelineHandle
&get_handle() { return instance_handle
->handle
; }
206 epoch_t
get_epoch() const { return m
->get_min_epoch(); }
208 ConnectionPipeline
&get_connection_pipeline();
209 seastar::future
<crimson::net::ConnectionFRef
> prepare_remote_submission() {
211 return conn
.get_foreign(
212 ).then([this](auto f_conn
) {
217 void finish_remote_submission(crimson::net::ConnectionFRef _conn
) {
219 conn
= make_local_shared_foreign(std::move(_conn
));
222 seastar::future
<> with_pg_int(
223 ShardServices
&shard_services
, Ref
<PG
> pg
);
226 seastar::future
<> with_pg(
227 ShardServices
&shard_services
, Ref
<PG
> pgref
);
230 template <typename FuncT
>
231 interruptible_future
<> with_sequencer(FuncT
&& func
);
232 auto reply_op_error(const Ref
<PG
>& pg
, int err
);
234 interruptible_future
<> do_process(
235 instance_handle_t
&ihref
,
237 crimson::osd::ObjectContextRef obc
);
238 ::crimson::interruptible::interruptible_future
<
239 ::crimson::osd::IOInterruptCondition
> process_pg_op(
241 ::crimson::interruptible::interruptible_future
<
242 ::crimson::osd::IOInterruptCondition
> process_op(
243 instance_handle_t
&ihref
,
245 bool is_pg_op() const;
247 PGPipeline
&client_pp(PG
&pg
);
249 template <typename Errorator
>
250 using interruptible_errorator
=
251 ::crimson::interruptible::interruptible_errorator
<
252 ::crimson::osd::IOInterruptCondition
,
255 bool is_misdirected(const PG
& pg
) const;
257 const SnapContext
get_snapc(
259 crimson::osd::ObjectContextRef obc
) const;
263 friend class LttngBackend
;
264 friend class HistoricBackend
;
266 auto get_started() const {
267 return get_event
<StartEvent
>().get_timestamp();
270 auto get_completed() const {
271 return get_event
<CompletionEvent
>().get_timestamp();
274 void put_historic() const;
279 #if FMT_VERSION >= 90000
280 template <> struct fmt::formatter
<crimson::osd::ClientRequest
> : fmt::ostream_formatter
{};