]>
Commit | Line | Data |
---|---|---|
9f95a23c TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #pragma once | |
5 | ||
1e59de90 TL |
6 | #include <seastar/core/future.hh> |
7 | ||
8 | #include <boost/intrusive/list.hpp> | |
9 | #include <boost/intrusive_ptr.hpp> | |
10 | ||
9f95a23c TL |
11 | #include "osd/osd_op_util.h" |
12 | #include "crimson/net/Connection.h" | |
20effc67 | 13 | #include "crimson/osd/object_context.h" |
1e59de90 | 14 | #include "crimson/osd/osdmap_gate.h" |
9f95a23c | 15 | #include "crimson/osd/osd_operation.h" |
20effc67 TL |
16 | #include "crimson/osd/osd_operations/client_request_common.h" |
17 | #include "crimson/osd/osd_operations/common/pg_pipeline.h" | |
1e59de90 TL |
18 | #include "crimson/osd/pg_activation_blocker.h" |
19 | #include "crimson/osd/pg_map.h" | |
9f95a23c | 20 | #include "crimson/common/type_helpers.h" |
1e59de90 | 21 | #include "crimson/common/utility.h" |
f67539c2 | 22 | #include "messages/MOSDOp.h" |
9f95a23c TL |
23 | |
24 | namespace crimson::osd { | |
25 | class PG; | |
26 | class OSD; | |
1e59de90 | 27 | class ShardServices; |
9f95a23c | 28 | |
1e59de90 | 29 | class ClientRequest final : public PhasedOperationT<ClientRequest>, |
20effc67 | 30 | private CommonClientRequest { |
1e59de90 TL |
31 | // Initially set to primary core, updated to pg core after move, |
32 | // used by put_historic | |
33 | ShardServices *put_historic_shard_services = nullptr; | |
34 | ||
9f95a23c | 35 | crimson::net::ConnectionRef conn; |
1e59de90 | 36 | // must be after conn due to ConnectionPipeline's life-time |
9f95a23c TL |
37 | Ref<MOSDOp> m; |
38 | OpInfo op_info; | |
1e59de90 TL |
39 | seastar::promise<> on_complete; |
40 | unsigned instance_id = 0; | |
9f95a23c TL |
41 | |
42 | public: | |
// Per-PG pipeline used by ClientRequest; extends CommonPGPipeline with three
// additional phases:
//   - await_map  : OrderedExclusivePhaseT
//   - wait_repop : OrderedConcurrentPhaseT
//   - send_reply : OrderedExclusivePhaseT
// Each phase exposes a type_name string identifying it.
// NOTE(review): the friend `ReqRequest` below may be a typo for `RepRequest`
// -- confirm against the class that actually needs access.
20effc67 | 43 | class PGPipeline : public CommonPGPipeline { |
1e59de90 TL |
44 | public: |
45 | struct AwaitMap : OrderedExclusivePhaseT<AwaitMap> { | |
46 | static constexpr auto type_name = "ClientRequest::PGPipeline::await_map"; | |
47 | } await_map; | |
48 | struct WaitRepop : OrderedConcurrentPhaseT<WaitRepop> { | |
49 | static constexpr auto type_name = "ClientRequest::PGPipeline::wait_repop"; | |
50 | } wait_repop; | |
51 | struct SendReply : OrderedExclusivePhaseT<SendReply> { | |
52 | static constexpr auto type_name = "ClientRequest::PGPipeline::send_reply"; | |
53 | } send_reply; | |
9f95a23c | 54 | friend class ClientRequest; |
1e59de90 TL |
55 | friend class LttngBackend; |
56 | friend class HistoricBackend; | |
57 | friend class ReqRequest; | |
58 | friend class LogMissingRequest; | |
59 | friend class LogMissingRequestReply; | |
60 | }; | |
61 | ||
62 | /** | |
63 | * instance_handle_t | |
64 | * | |
65 | * Client request is, at present, the only Operation which can be requeued. | |
66 | * This is, mostly, fine. However, reusing the PipelineHandle or | |
67 | * BlockingEvent structures before proving that the prior instance has stopped | |
68 | * can create hangs or crashes due to violations of the BlockerT and | |
69 | * PipelineHandle invariants. | |
70 | * | |
71 | * To solve this, we create an instance_handle_t which contains the events | |
72 | * for the portion of execution that can be rerun as well as the | |
73 | * PipelineHandle. ClientRequest::with_pg_int grabs a reference to the current | |
74 | * instance_handle_t and releases its PipelineHandle in the finally block. | |
75 | * On requeue, we create a new instance_handle_t with a fresh PipelineHandle | |
76 | * and events tuple and use it for the next invocation of | |
77 | * with_pg_int. | |
78 | */ | |
79 | std::tuple< | |
80 | StartEvent, | |
81 | ConnectionPipeline::AwaitActive::BlockingEvent, | |
82 | ConnectionPipeline::AwaitMap::BlockingEvent, | |
83 | OSD_OSDMapGate::OSDMapBlocker::BlockingEvent, | |
84 | ConnectionPipeline::GetPG::BlockingEvent, | |
85 | PGMap::PGCreationBlockingEvent, | |
86 | CompletionEvent | |
87 | > tracking_events; | |
88 | ||
89 | class instance_handle_t : public boost::intrusive_ref_counter< | |
90 | instance_handle_t, boost::thread_unsafe_counter> { | |
91 | public: | |
92 | // intrusive_ptr because seastar::lw_shared_ptr includes a cpu debug check | |
93 | // that we will fail since the core on which we allocate the request may not | |
94 | // be the core on which we perform with_pg_int. This is harmless, since we | |
95 | // don't leave any references on the source core, so we just bypass it by using | |
96 | // intrusive_ptr instead. | |
97 | using ref_t = boost::intrusive_ptr<instance_handle_t>; | |
98 | PipelineHandle handle; | |
99 | ||
100 | std::tuple< | |
101 | PGPipeline::AwaitMap::BlockingEvent, | |
102 | PG_OSDMapGate::OSDMapBlocker::BlockingEvent, | |
103 | PGPipeline::WaitForActive::BlockingEvent, | |
104 | PGActivationBlocker::BlockingEvent, | |
105 | PGPipeline::RecoverMissing::BlockingEvent, | |
106 | PGPipeline::GetOBC::BlockingEvent, | |
107 | PGPipeline::Process::BlockingEvent, | |
108 | PGPipeline::WaitRepop::BlockingEvent, | |
109 | PGPipeline::SendReply::BlockingEvent, | |
110 | CompletionEvent | |
111 | > pg_tracking_events; | |
112 | ||
// Invokes `f` with a BlockingEventT::Trigger<ClientRequest> built from the
// BlockingEventT entry of pg_tracking_events and `op`.
//
// BlockingEventT: selects which element of pg_tracking_events backs the trigger.
// InterruptorT:   when void (the default) the result of `f` is returned as is;
//                 otherwise it is wrapped in InterruptorT::futurize_t<ret_t>.
// f:              callable taking the trigger; called synchronously, here.
// op:             the ClientRequest the trigger is associated with.
113 | template <typename BlockingEventT, typename InterruptorT=void, typename F> | |
114 | auto with_blocking_event(F &&f, ClientRequest &op) { | |
115 | auto ret = std::forward<F>(f)( | |
116 | typename BlockingEventT::template Trigger<ClientRequest>{ | |
117 | std::get<BlockingEventT>(pg_tracking_events), op | |
118 | }); | |
119 | if constexpr (std::is_same_v<InterruptorT, void>) { | |
120 | return ret; | |
121 | } else { | |
122 | using ret_t = decltype(ret); | |
123 | return typename InterruptorT::template futurize_t<ret_t>{std::move(ret)}; | |
124 | } | |
125 | } | |
126 | ||
// Enters pipeline stage `stage` through this instance's PipelineHandle
// (handle.enter<ClientRequest>), passing a trigger for
// StageT::BlockingEvent obtained via with_blocking_event.
// InterruptorT is forwarded to with_blocking_event and controls whether the
// returned value is wrapped in an interruptible future type.
127 | template <typename InterruptorT=void, typename StageT> | |
128 | auto enter_stage(StageT &stage, ClientRequest &op) { | |
129 | return this->template with_blocking_event< | |
130 | typename StageT::BlockingEvent, | |
131 | InterruptorT>( | |
132 | [&stage, this](auto &&trigger) { | |
133 | return handle.template enter<ClientRequest>( | |
134 | stage, std::move(trigger)); | |
135 | }, op); | |
136 | } | |
137 | ||
// Calls `method` on `obj` (via apply_method_to_tuple) with a trigger for
// BlockingObj::Blocker::BlockingEvent as the first argument, followed by
// `args`. The trigger is produced by with_blocking_event, and InterruptorT is
// forwarded to it.
//
// NOTE(review): `args` is captured as
// std::forward_as_tuple(std::move(args)...), i.e. a tuple of rvalue
// references -- lvalue arguments are also passed on as rvalues, and the tuple
// only refers to the caller's objects. This is valid because
// with_blocking_event invokes the lambda before returning.
138 | template < | |
139 | typename InterruptorT=void, typename BlockingObj, typename Method, | |
140 | typename... Args> | |
141 | auto enter_blocker( | |
142 | ClientRequest &op, BlockingObj &obj, Method method, Args&&... args) { | |
143 | return this->template with_blocking_event< | |
144 | typename BlockingObj::Blocker::BlockingEvent, | |
145 | InterruptorT>( | |
146 | [&obj, method, | |
147 | args=std::forward_as_tuple(std::move(args)...)](auto &&trigger) mutable { | |
148 | return apply_method_to_tuple( | |
149 | obj, method, | |
150 | std::tuple_cat( | |
151 | std::forward_as_tuple(std::move(trigger)), | |
152 | std::move(args)) | |
153 | ); | |
154 | }, op); | |
155 | } | |
9f95a23c | 156 | }; |
1e59de90 TL |
157 | instance_handle_t::ref_t instance_handle; |
// Replaces instance_handle with a newly allocated instance_handle_t. The
// previous instance (if any) loses the reference held by this member.
158 | void reset_instance_handle() { | |
159 | instance_handle = new instance_handle_t; | |
160 | } | |
// Returns a copy of the current instance_handle reference (by value).
161 | auto get_instance_handle() { return instance_handle; } | |
162 | ||
163 | using ordering_hook_t = boost::intrusive::list_member_hook<>; | |
164 | ordering_hook_t ordering_hook; | |
// Keeps ClientRequests in an intrusive list, linked through
// ClientRequest::ordering_hook, in the order they were added.
// The list holds a reference on each request it contains: add_request()
// takes one with intrusive_ptr_add_ref and remove_request() drops it with
// intrusive_ptr_release.
165 | class Orderer { | |
166 | using list_t = boost::intrusive::list< | |
167 | ClientRequest, | |
168 | boost::intrusive::member_hook< | |
169 | ClientRequest, | |
170 | typename ClientRequest::ordering_hook_t, | |
171 | &ClientRequest::ordering_hook> | |
172 | >; | |
173 | list_t list; | |
174 | ||
175 | public: | |
// Appends `request` to the list; asserts that it is not already linked.
176 | void add_request(ClientRequest &request) { | |
177 | assert(!request.ordering_hook.is_linked()); | |
178 | intrusive_ptr_add_ref(&request); | |
179 | list.push_back(request); | |
180 | } | |
// Unlinks `request`; asserts that it is currently linked. The reference is
// released only after the erase, so the request stays alive while unlinking.
181 | void remove_request(ClientRequest &request) { | |
182 | assert(request.ordering_hook.is_linked()); | |
183 | list.erase(list_t::s_iterator_to(request)); | |
184 | intrusive_ptr_release(&request); | |
185 | } | |
// Declared here, defined elsewhere.
186 | void requeue(ShardServices &shard_services, Ref<PG> pg); | |
187 | void clear_and_cancel(); | |
188 | }; | |
189 | void complete_request(); | |
9f95a23c TL |
190 | |
191 | static constexpr OperationTypeCode type = OperationTypeCode::client_request; | |
192 | ||
1e59de90 TL |
193 | ClientRequest( |
194 | ShardServices &shard_services, | |
195 | crimson::net::ConnectionRef, Ref<MOSDOp> &&m); | |
20effc67 | 196 | ~ClientRequest(); |
9f95a23c TL |
197 | |
198 | void print(std::ostream &) const final; | |
199 | void dump_detail(Formatter *f) const final; | |
200 | ||
1e59de90 TL |
201 | static constexpr bool can_create() { return false; } |
202 | spg_t get_pgid() const { | |
203 | return m->get_spg(); | |
204 | } | |
205 | PipelineHandle &get_handle() { return instance_handle->handle; } | |
206 | epoch_t get_epoch() const { return m->get_min_epoch(); } | |
207 | ||
208 | ConnectionPipeline &get_connection_pipeline(); | |
// Asserts that `conn` is set, obtains a foreign reference to it through
// conn.get_foreign(), then clears `conn` and resolves to that foreign
// reference.
// NOTE(review): the continuation captures `this`; presumably the caller keeps
// the request alive until the returned future resolves -- confirm.
209 | seastar::future<crimson::net::ConnectionFRef> prepare_remote_submission() { | |
210 | assert(conn); | |
211 | return conn.get_foreign( | |
212 | ).then([this](auto f_conn) { | |
213 | conn.reset(); | |
214 | return f_conn; | |
215 | }); | |
216 | } | |
// Counterpart of prepare_remote_submission(): asserts that `conn` is empty
// and re-populates it from the foreign reference `_conn` using
// make_local_shared_foreign.
217 | void finish_remote_submission(crimson::net::ConnectionFRef _conn) { | |
218 | assert(!conn); | |
219 | conn = make_local_shared_foreign(std::move(_conn)); | |
220 | } | |
221 | ||
222 | seastar::future<> with_pg_int( | |
223 | ShardServices &shard_services, Ref<PG> pg); | |
224 | ||
9f95a23c | 225 | public: |
1e59de90 TL |
226 | seastar::future<> with_pg( |
227 | ShardServices &shard_services, Ref<PG> pgref); | |
9f95a23c TL |
228 | |
229 | private: | |
20effc67 TL |
230 | template <typename FuncT> |
231 | interruptible_future<> with_sequencer(FuncT&& func); | |
1e59de90 | 232 | auto reply_op_error(const Ref<PG>& pg, int err); |
20effc67 | 233 | |
1e59de90 TL |
234 | interruptible_future<> do_process( |
235 | instance_handle_t &ihref, | |
20effc67 TL |
236 | Ref<PG>& pg, |
237 | crimson::osd::ObjectContextRef obc); | |
238 | ::crimson::interruptible::interruptible_future< | |
239 | ::crimson::osd::IOInterruptCondition> process_pg_op( | |
f67539c2 | 240 | Ref<PG> &pg); |
20effc67 | 241 | ::crimson::interruptible::interruptible_future< |
1e59de90 TL |
242 | ::crimson::osd::IOInterruptCondition> process_op( |
243 | instance_handle_t &ihref, | |
244 | Ref<PG> &pg); | |
9f95a23c TL |
245 | bool is_pg_op() const; |
246 | ||
aee94f69 | 247 | PGPipeline &client_pp(PG &pg); |
f67539c2 | 248 | |
20effc67 TL |
249 | template <typename Errorator> |
250 | using interruptible_errorator = | |
251 | ::crimson::interruptible::interruptible_errorator< | |
252 | ::crimson::osd::IOInterruptCondition, | |
253 | Errorator>; | |
1e59de90 | 254 | |
f67539c2 | 255 | bool is_misdirected(const PG& pg) const; |
1e59de90 TL |
256 | |
257 | const SnapContext get_snapc( | |
258 | Ref<PG>& pg, | |
259 | crimson::osd::ObjectContextRef obc) const; | |
260 | ||
261 | public: | |
262 | ||
263 | friend class LttngBackend; | |
264 | friend class HistoricBackend; | |
265 | ||
// Returns the timestamp reported by this operation's StartEvent.
266 | auto get_started() const { | |
267 | return get_event<StartEvent>().get_timestamp(); | |
268 | }; | |
269 | ||
// Returns the timestamp reported by this operation's CompletionEvent.
270 | auto get_completed() const { | |
271 | return get_event<CompletionEvent>().get_timestamp(); | |
272 | }; | |
273 | ||
274 | void put_historic() const; | |
9f95a23c TL |
275 | }; |
276 | ||
277 | } | |
1e59de90 TL |
278 | |
279 | #if FMT_VERSION >= 90000 | |
280 | template <> struct fmt::formatter<crimson::osd::ClientRequest> : fmt::ostream_formatter {}; | |
281 | #endif |