1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
9 #include <boost/intrusive_ptr.hpp>
10 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
11 #include <boost/smart_ptr/local_shared_ptr.hpp>
12 #include <seastar/core/chunked_fifo.hh>
13 #include <seastar/core/future.hh>
14 #include <seastar/core/shared_future.hh>
15 #include <seastar/core/shared_ptr.hh>
17 #include "common/dout.h"
18 #include "common/static_ptr.h"
19 #include "messages/MOSDOp.h"
20 #include "os/Transaction.h"
21 #include "osd/osd_types.h"
23 #include "crimson/common/errorator.h"
24 #include "crimson/common/interruptible_future.h"
25 #include "crimson/common/type_helpers.h"
26 #include "crimson/osd/osd_operations/client_request.h"
27 #include "crimson/osd/osd_operations/peering_event.h"
28 #include "crimson/osd/pg_backend.h"
29 #include "crimson/osd/pg_interval_interrupt_condition.h"
30 #include "crimson/osd/shard_services.h"
35 namespace crimson::osd
{
38 // OpsExecuter -- a class for executing ops targeting a certain object.
39 class OpsExecuter
: public seastar::enable_lw_shared_from_this
<OpsExecuter
> {
40 using call_errorator
= crimson::errorator
<
42 crimson::ct_error::enoent
,
43 crimson::ct_error::invarg
,
44 crimson::ct_error::permission_denied
,
45 crimson::ct_error::operation_not_supported
,
46 crimson::ct_error::input_output_error
,
47 crimson::ct_error::value_too_large
>;
48 using read_errorator
= PGBackend::read_errorator
;
49 using write_ertr
= PGBackend::write_ertr
;
50 using get_attr_errorator
= PGBackend::get_attr_errorator
;
51 using watch_errorator
= crimson::errorator
<
52 crimson::ct_error::enoent
,
53 crimson::ct_error::invarg
,
54 crimson::ct_error::not_connected
,
55 crimson::ct_error::timed_out
>;
57 using call_ierrorator
=
58 ::crimson::interruptible::interruptible_errorator
<
59 IOInterruptCondition
, call_errorator
>;
60 using read_ierrorator
=
61 ::crimson::interruptible::interruptible_errorator
<
62 IOInterruptCondition
, read_errorator
>;
64 ::crimson::interruptible::interruptible_errorator
<
65 IOInterruptCondition
, write_ertr
>;
66 using get_attr_ierrorator
=
67 ::crimson::interruptible::interruptible_errorator
<
68 IOInterruptCondition
, get_attr_errorator
>;
69 using watch_ierrorator
=
70 ::crimson::interruptible::interruptible_errorator
<
71 IOInterruptCondition
, watch_errorator
>;
73 template <typename Errorator
, typename T
= void>
74 using interruptible_errorated_future
=
75 ::crimson::interruptible::interruptible_errorated_future
<
76 IOInterruptCondition
, Errorator
, T
>;
78 ::crimson::interruptible::interruptor
<IOInterruptCondition
>;
79 template <typename T
= void>
80 using interruptible_future
=
81 ::crimson::interruptible::interruptible_future
<
82 IOInterruptCondition
, T
>;
85 // ExecutableMessage -- an interface class to allow using OpsExecuter
86 // with other message types than just the `MOSDOp`. The type erasure
87 // happens in the ctor of `OpsExecuter`.
88 struct ExecutableMessage
{
// Accessors every wrapped message has to provide. ExecutableMessagePimpl
// implements each of them by forwarding the call to the wrapped message
// object, so OpsExecuter only ever talks to this interface.
virtual crimson::net::ConnectionRef get_connection() const = 0;
virtual osd_reqid_t get_reqid() const = 0;
virtual utime_t get_mtime() const = 0;
virtual epoch_t get_map_epoch() const = 0;
virtual entity_inst_t get_orig_source_inst() const = 0;
virtual uint64_t get_features() const = 0;
97 template <class ImplT
>
98 class ExecutableMessagePimpl final
: ExecutableMessage
{
101 ExecutableMessagePimpl(const ImplT
* pimpl
) : pimpl(pimpl
) {
103 crimson::net::ConnectionRef
get_connection() const final
{
104 return pimpl
->get_connection();
106 osd_reqid_t
get_reqid() const final
{
107 return pimpl
->get_reqid();
109 utime_t
get_mtime() const final
{
110 return pimpl
->get_mtime();
112 epoch_t
get_map_epoch() const final
{
113 return pimpl
->get_map_epoch();
115 entity_inst_t
get_orig_source_inst() const final
{
116 return pimpl
->get_orig_source_inst();
118 uint64_t get_features() const final
{
119 return pimpl
->get_features();
123 // because OpsExecuter is pretty heavy-weight object we want to ensure
124 // it's not copied nor even moved by accident. Performance is the sole
125 // reason for prohibiting that.
126 OpsExecuter(OpsExecuter
&&) = delete;
127 OpsExecuter(const OpsExecuter
&) = delete;
129 using osd_op_errorator
= crimson::compound_errorator_t
<
135 PGBackend::stat_errorator
>;
136 using osd_op_ierrorator
=
137 ::crimson::interruptible::interruptible_errorator
<
138 IOInterruptCondition
, osd_op_errorator
>;
141 // an operation can be divided into two stages: main and effect-exposing
142 // one. The former is performed immediately on call to `do_osd_op()` while
// the latter on `submit_changes()` – after successfully processing main
144 // stages of all involved operations. When any stage fails, none of all
145 // scheduled effect-exposing stages will be executed.
146 // when operation requires this division, some variant of `with_effect()`
149 // an effect can affect PG, i.e. create a watch timeout
150 virtual osd_op_errorator::future
<> execute(Ref
<PG
> pg
) = 0;
151 virtual ~effect_t() = default;
154 Ref
<PG
> pg
; // for the sake of object class
155 ObjectContextRef obc
;
156 const OpInfo
& op_info
;
157 ceph::static_ptr
<ExecutableMessage
,
158 sizeof(ExecutableMessagePimpl
<void>)> msg
;
159 std::optional
<osd_op_params_t
> osd_op_params
;
160 bool user_modify
= false;
161 ceph::os::Transaction txn
;
163 size_t num_read
= 0; ///< count read ops
164 size_t num_write
= 0; ///< count update ops
165 object_stat_sum_t delta_stats
;
167 // this gizmo could be wrapped in std::optional for the sake of lazy
168 // initialization. we don't need it for ops that doesn't have effect
169 // TODO: verify the init overhead of chunked_fifo
170 seastar::chunked_fifo
<std::unique_ptr
<effect_t
>> op_effects
;
172 template <class Context
, class MainFunc
, class EffectFunc
>
173 auto with_effect_on_obc(
175 MainFunc
&& main_func
,
176 EffectFunc
&& effect_func
);
// Handler for the object-class "call" op (presumably CEPH_OSD_OP_CALL —
// confirm in the .cc). Its failures are limited to call_errorator.
call_ierrorator::future<> do_op_call(OSDOp& osd_op);
179 watch_ierrorator::future
<> do_op_watch(
182 ceph::os::Transaction
& txn
);
183 watch_ierrorator::future
<> do_op_watch_subop_watch(
186 ceph::os::Transaction
& txn
);
187 watch_ierrorator::future
<> do_op_watch_subop_reconnect(
190 ceph::os::Transaction
& txn
);
191 watch_ierrorator::future
<> do_op_watch_subop_unwatch(
194 ceph::os::Transaction
& txn
);
195 watch_ierrorator::future
<> do_op_watch_subop_ping(
198 ceph::os::Transaction
& txn
);
199 watch_ierrorator::future
<> do_op_list_watchers(
201 const ObjectState
& os
);
202 watch_ierrorator::future
<> do_op_notify(
204 const ObjectState
& os
);
205 watch_ierrorator::future
<> do_op_notify_ack(
207 const ObjectState
& os
);
// Non-mutating op runner; do_read_op() forwards to it. The definition is
// out of line (not visible here).
template <class Func>
auto do_const_op(Func&& f);
212 template <class Func
>
213 auto do_read_op(Func
&& f
) {
215 // TODO: pass backend as read-only
216 return do_const_op(std::forward
<Func
>(f
));
// Mutating op runner; the definition is out of line.
// NOTE(review): `um` presumably feeds `user_modify` — confirm against the
// definition.
template <class Func>
auto do_write_op(Func&& f, bool um);
222 decltype(auto) dont_do_legacy_op() {
223 return crimson::ct_error::operation_not_supported::make();
227 template <class MsgT
>
228 OpsExecuter(Ref
<PG
> pg
,
229 ObjectContextRef obc
,
230 const OpInfo
& op_info
,
235 msg(std::in_place_type_t
<ExecutableMessagePimpl
<MsgT
>>{}, &msg
) {
// Defined further below. It keeps the executer alive through a
// lw_shared_ptr and, on error, runs the stored callable on the object
// context only if the executer has seen a write.
template <class Func>
struct RollbackHelper;

// Builds a RollbackHelper from shared_from_this() and the forwarded
// callable.
template <class Func>
RollbackHelper<Func> create_rollbacker(Func&& func);

// Runs a single OSDOp; its failures are restricted to osd_op_errorator.
interruptible_errorated_future<osd_op_errorator>
execute_op(OSDOp& osd_op);

// Pair of futures produced when changes are flushed. The first is the
// "submitted" future returned by the mutation callback; the second is the
// "all completed" future, possibly chained with the queued op effects.
using rep_op_fut_tuple =
  std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
250 interruptible_future
<rep_op_fut_tuple
>;
// Hands the accumulated transaction and op params to `mut_func` (presumably
// only when `txn` is non-empty — see the definition). When op_effects is
// non-empty, each queued effect is executed against the PG after the
// "all completed" future resolves successfully. Rvalue-qualified because
// it std::moves `txn` and `*osd_op_params` out of the executer.
template <typename MutFunc>
rep_op_fut_t flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&;
254 const hobject_t
&get_target() const {
255 return obc
->obs
.oi
.soid
;
258 const auto& get_message() const {
262 size_t get_processed_rw_ops_num() const {
263 return num_read
+ num_write
;
// Presumably the stripe width of the target PG's pool; defined out of line.
uint32_t get_pool_stripe_width() const;
268 bool has_seen_write() const {
269 return num_write
> 0;
272 object_stat_sum_t
& get_stats(){
// Defined out of line; presumably reports the object's user version as
// seen by this executer — confirm against the definition.
version_t get_last_user_version() const;
279 template <class Context
, class MainFunc
, class EffectFunc
>
280 auto OpsExecuter::with_effect_on_obc(
282 MainFunc
&& main_func
,
283 EffectFunc
&& effect_func
)
285 using context_t
= std::decay_t
<Context
>;
286 // the language offers implicit conversion to pointer-to-function for
287 // lambda only when it's closureless. We enforce this restriction due
288 // the fact that `flush_changes()` std::moves many executer's parts.
289 using allowed_effect_func_t
=
290 seastar::future
<> (*)(context_t
&&, ObjectContextRef
, Ref
<PG
>);
291 static_assert(std::is_convertible_v
<EffectFunc
, allowed_effect_func_t
>,
292 "with_effect function is not allowed to capture");
293 struct task_t final
: effect_t
{
295 EffectFunc effect_func
;
296 ObjectContextRef obc
;
298 task_t(Context
&& ctx
, EffectFunc
&& effect_func
, ObjectContextRef obc
)
299 : ctx(std::move(ctx
)),
300 effect_func(std::move(effect_func
)),
301 obc(std::move(obc
)) {
303 osd_op_errorator::future
<> execute(Ref
<PG
> pg
) final
{
304 return std::move(effect_func
)(std::move(ctx
),
310 std::make_unique
<task_t
>(std::move(ctx
), std::move(effect_func
), obc
);
311 auto& ctx_ref
= task
->ctx
;
312 op_effects
.emplace_back(std::move(task
));
313 return std::forward
<MainFunc
>(main_func
)(ctx_ref
);
316 template <typename MutFunc
>
317 OpsExecuter::rep_op_fut_t
318 OpsExecuter::flush_changes_n_do_ops_effects(MutFunc
&& mut_func
) &&
320 const bool want_mutate
= !txn
.empty();
321 // osd_op_params are instantiated by every wr-like operation.
322 assert(osd_op_params
|| !want_mutate
);
324 rep_op_fut_t maybe_mutated
=
325 interruptor::make_ready_future
<rep_op_fut_tuple
>(
327 interruptor::make_interruptible(osd_op_errorator::now()));
329 osd_op_params
->req_id
= msg
->get_reqid();
330 osd_op_params
->mtime
= msg
->get_mtime();
331 auto [submitted
, all_completed
] = std::forward
<MutFunc
>(mut_func
)(std::move(txn
),
333 std::move(*osd_op_params
),
335 maybe_mutated
= interruptor::make_ready_future
<rep_op_fut_tuple
>(
336 std::move(submitted
),
337 osd_op_ierrorator::future
<>(std::move(all_completed
)));
339 if (__builtin_expect(op_effects
.empty(), true)) {
340 return maybe_mutated
;
342 return maybe_mutated
.then_unpack_interruptible(
343 [this, pg
=std::move(pg
)](auto&& submitted
, auto&& all_completed
) mutable {
344 return interruptor::make_ready_future
<rep_op_fut_tuple
>(
345 std::move(submitted
),
346 all_completed
.safe_then_interruptible([this, pg
=std::move(pg
)] {
347 // let's do the cleaning of `op_effects` in destructor
348 return interruptor::do_for_each(op_effects
,
349 [pg
=std::move(pg
)](auto& op_effect
) {
350 return op_effect
->execute(pg
);
357 template <class Func
>
358 struct OpsExecuter::RollbackHelper
{
359 interruptible_future
<> rollback_obc_if_modified(const std::error_code
& e
);
360 ObjectContextRef
get_obc() const {
364 seastar::lw_shared_ptr
<OpsExecuter
> ox
;
368 template <class Func
>
369 inline OpsExecuter::RollbackHelper
<Func
>
370 OpsExecuter::create_rollbacker(Func
&& func
) {
371 return {shared_from_this(), std::forward
<Func
>(func
)};
375 template <class Func
>
376 OpsExecuter::interruptible_future
<>
377 OpsExecuter::RollbackHelper
<Func
>::rollback_obc_if_modified(
378 const std::error_code
& e
)
380 // Oops, an operation had failed. do_osd_ops() altogether with
381 // OpsExecuter already dropped the ObjectStore::Transaction if
382 // there was any. However, this is not enough to completely
383 // rollback as we gave OpsExecuter the very single copy of `obc`
384 // we maintain and we did it for both reading and writing.
385 // Now all modifications must be reverted.
387 // Let's just reload from the store. Evicting from the shared
388 // LRU would be tricky as next MOSDOp (the one at `get_obc`
389 // phase) could actually already finished the lookup. Fortunately,
390 // this is supposed to live on cold paths, so performance is not
391 // a concern -- simplicity wins.
393 // The conditional's purpose is to efficiently handle hot errors
394 // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
395 // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
396 // typically append them before any write. If OpsExecuter hasn't
397 // seen any modifying operation, `obc` is supposed to be kept
400 const auto need_rollback
= ox
->has_seen_write();
401 crimson::get_logger(ceph_subsys_osd
).debug(
402 "{}: object {} got error {}, need_rollback={}",
407 return need_rollback
? func(*ox
->obc
) : interruptor::now();
410 // PgOpsExecuter -- a class for executing ops targeting a certain PG.
411 class PgOpsExecuter
{
412 template <typename T
= void>
413 using interruptible_future
=
414 ::crimson::interruptible::interruptible_future
<
415 IOInterruptCondition
, T
>;
418 PgOpsExecuter(const PG
& pg
, const MOSDOp
& msg
)
419 : pg(pg
), nspace(msg
.get_hobj().nspace
) {
422 interruptible_future
<> execute_op(OSDOp
& osd_op
);
426 const std::string
& nspace
;
429 } // namespace crimson::osd