]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/ops_executer.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / osd / ops_executer.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #pragma once
5
6 #include <memory>
7 #include <type_traits>
8 #include <utility>
9 #include <boost/intrusive_ptr.hpp>
10 #include <boost/smart_ptr/intrusive_ref_counter.hpp>
11 #include <boost/smart_ptr/local_shared_ptr.hpp>
12 #include <seastar/core/chunked_fifo.hh>
13 #include <seastar/core/future.hh>
14 #include <seastar/core/shared_future.hh>
15 #include <seastar/core/shared_ptr.hh>
16
17 #include "common/dout.h"
18 #include "common/static_ptr.h"
19 #include "messages/MOSDOp.h"
20 #include "os/Transaction.h"
21 #include "osd/osd_types.h"
22
23 #include "crimson/common/errorator.h"
24 #include "crimson/common/interruptible_future.h"
25 #include "crimson/common/type_helpers.h"
26 #include "crimson/osd/osd_operations/client_request.h"
27 #include "crimson/osd/osd_operations/peering_event.h"
28 #include "crimson/osd/pg_backend.h"
29 #include "crimson/osd/pg_interval_interrupt_condition.h"
30 #include "crimson/osd/shard_services.h"
31
32 struct ObjectState;
33 struct OSDOp;
34
35 namespace crimson::osd {
36 class PG;
37
38 // OpsExecuter -- a class for executing ops targeting a certain object.
39 class OpsExecuter : public seastar::enable_lw_shared_from_this<OpsExecuter> {
40 using call_errorator = crimson::errorator<
41 crimson::stateful_ec,
42 crimson::ct_error::enoent,
43 crimson::ct_error::invarg,
44 crimson::ct_error::permission_denied,
45 crimson::ct_error::operation_not_supported,
46 crimson::ct_error::input_output_error,
47 crimson::ct_error::value_too_large>;
48 using read_errorator = PGBackend::read_errorator;
49 using write_ertr = PGBackend::write_ertr;
50 using get_attr_errorator = PGBackend::get_attr_errorator;
51 using watch_errorator = crimson::errorator<
52 crimson::ct_error::enoent,
53 crimson::ct_error::invarg,
54 crimson::ct_error::not_connected,
55 crimson::ct_error::timed_out>;
56
57 using call_ierrorator =
58 ::crimson::interruptible::interruptible_errorator<
59 IOInterruptCondition, call_errorator>;
60 using read_ierrorator =
61 ::crimson::interruptible::interruptible_errorator<
62 IOInterruptCondition, read_errorator>;
63 using write_iertr =
64 ::crimson::interruptible::interruptible_errorator<
65 IOInterruptCondition, write_ertr>;
66 using get_attr_ierrorator =
67 ::crimson::interruptible::interruptible_errorator<
68 IOInterruptCondition, get_attr_errorator>;
69 using watch_ierrorator =
70 ::crimson::interruptible::interruptible_errorator<
71 IOInterruptCondition, watch_errorator>;
72
73 template <typename Errorator, typename T = void>
74 using interruptible_errorated_future =
75 ::crimson::interruptible::interruptible_errorated_future<
76 IOInterruptCondition, Errorator, T>;
77 using interruptor =
78 ::crimson::interruptible::interruptor<IOInterruptCondition>;
79 template <typename T = void>
80 using interruptible_future =
81 ::crimson::interruptible::interruptible_future<
82 IOInterruptCondition, T>;
83
84 public:
85 // ExecutableMessage -- an interface class to allow using OpsExecuter
86 // with other message types than just the `MOSDOp`. The type erasure
87 // happens in the ctor of `OpsExecuter`.
88 struct ExecutableMessage {
89 virtual crimson::net::ConnectionRef get_connection() const = 0;
90 virtual osd_reqid_t get_reqid() const = 0;
91 virtual utime_t get_mtime() const = 0;
92 virtual epoch_t get_map_epoch() const = 0;
93 virtual entity_inst_t get_orig_source_inst() const = 0;
94 virtual uint64_t get_features() const = 0;
95 };
96
97 template <class ImplT>
98 class ExecutableMessagePimpl final : ExecutableMessage {
99 const ImplT* pimpl;
100 public:
101 ExecutableMessagePimpl(const ImplT* pimpl) : pimpl(pimpl) {
102 }
103 crimson::net::ConnectionRef get_connection() const final {
104 return pimpl->get_connection();
105 }
106 osd_reqid_t get_reqid() const final {
107 return pimpl->get_reqid();
108 }
109 utime_t get_mtime() const final {
110 return pimpl->get_mtime();
111 };
112 epoch_t get_map_epoch() const final {
113 return pimpl->get_map_epoch();
114 }
115 entity_inst_t get_orig_source_inst() const final {
116 return pimpl->get_orig_source_inst();
117 }
118 uint64_t get_features() const final {
119 return pimpl->get_features();
120 }
121 };
122
123 // because OpsExecuter is pretty heavy-weight object we want to ensure
124 // it's not copied nor even moved by accident. Performance is the sole
125 // reason for prohibiting that.
126 OpsExecuter(OpsExecuter&&) = delete;
127 OpsExecuter(const OpsExecuter&) = delete;
128
129 using osd_op_errorator = crimson::compound_errorator_t<
130 call_errorator,
131 read_errorator,
132 write_ertr,
133 get_attr_errorator,
134 watch_errorator,
135 PGBackend::stat_errorator>;
136 using osd_op_ierrorator =
137 ::crimson::interruptible::interruptible_errorator<
138 IOInterruptCondition, osd_op_errorator>;
139
140 private:
141 // an operation can be divided into two stages: main and effect-exposing
142 // one. The former is performed immediately on call to `do_osd_op()` while
143 // the later on `submit_changes()` – after successfully processing main
144 // stages of all involved operations. When any stage fails, none of all
145 // scheduled effect-exposing stages will be executed.
146 // when operation requires this division, some variant of `with_effect()`
147 // should be used.
148 struct effect_t {
149 // an effect can affect PG, i.e. create a watch timeout
150 virtual osd_op_errorator::future<> execute(Ref<PG> pg) = 0;
151 virtual ~effect_t() = default;
152 };
153
154 Ref<PG> pg; // for the sake of object class
155 ObjectContextRef obc;
156 const OpInfo& op_info;
157 ceph::static_ptr<ExecutableMessage,
158 sizeof(ExecutableMessagePimpl<void>)> msg;
159 std::optional<osd_op_params_t> osd_op_params;
160 bool user_modify = false;
161 ceph::os::Transaction txn;
162
163 size_t num_read = 0; ///< count read ops
164 size_t num_write = 0; ///< count update ops
165 object_stat_sum_t delta_stats;
166
167 // this gizmo could be wrapped in std::optional for the sake of lazy
168 // initialization. we don't need it for ops that doesn't have effect
169 // TODO: verify the init overhead of chunked_fifo
170 seastar::chunked_fifo<std::unique_ptr<effect_t>> op_effects;
171
172 template <class Context, class MainFunc, class EffectFunc>
173 auto with_effect_on_obc(
174 Context&& ctx,
175 MainFunc&& main_func,
176 EffectFunc&& effect_func);
177
178 call_ierrorator::future<> do_op_call(OSDOp& osd_op);
179 watch_ierrorator::future<> do_op_watch(
180 OSDOp& osd_op,
181 ObjectState& os,
182 ceph::os::Transaction& txn);
183 watch_ierrorator::future<> do_op_watch_subop_watch(
184 OSDOp& osd_op,
185 ObjectState& os,
186 ceph::os::Transaction& txn);
187 watch_ierrorator::future<> do_op_watch_subop_reconnect(
188 OSDOp& osd_op,
189 ObjectState& os,
190 ceph::os::Transaction& txn);
191 watch_ierrorator::future<> do_op_watch_subop_unwatch(
192 OSDOp& osd_op,
193 ObjectState& os,
194 ceph::os::Transaction& txn);
195 watch_ierrorator::future<> do_op_watch_subop_ping(
196 OSDOp& osd_op,
197 ObjectState& os,
198 ceph::os::Transaction& txn);
199 watch_ierrorator::future<> do_op_list_watchers(
200 OSDOp& osd_op,
201 const ObjectState& os);
202 watch_ierrorator::future<> do_op_notify(
203 OSDOp& osd_op,
204 const ObjectState& os);
205 watch_ierrorator::future<> do_op_notify_ack(
206 OSDOp& osd_op,
207 const ObjectState& os);
208
209 template <class Func>
210 auto do_const_op(Func&& f);
211
212 template <class Func>
213 auto do_read_op(Func&& f) {
214 ++num_read;
215 // TODO: pass backend as read-only
216 return do_const_op(std::forward<Func>(f));
217 }
218
219 template <class Func>
220 auto do_write_op(Func&& f, bool um);
221
222 decltype(auto) dont_do_legacy_op() {
223 return crimson::ct_error::operation_not_supported::make();
224 }
225
226 public:
227 template <class MsgT>
228 OpsExecuter(Ref<PG> pg,
229 ObjectContextRef obc,
230 const OpInfo& op_info,
231 const MsgT& msg)
232 : pg(std::move(pg)),
233 obc(std::move(obc)),
234 op_info(op_info),
235 msg(std::in_place_type_t<ExecutableMessagePimpl<MsgT>>{}, &msg) {
236 }
237
238 template <class Func>
239 struct RollbackHelper;
240
241 template <class Func>
242 RollbackHelper<Func> create_rollbacker(Func&& func);
243
244 interruptible_errorated_future<osd_op_errorator>
245 execute_op(OSDOp& osd_op);
246
247 using rep_op_fut_tuple =
248 std::tuple<interruptible_future<>, osd_op_ierrorator::future<>>;
249 using rep_op_fut_t =
250 interruptible_future<rep_op_fut_tuple>;
251 template <typename MutFunc>
252 rep_op_fut_t flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&;
253
254 const hobject_t &get_target() const {
255 return obc->obs.oi.soid;
256 }
257
258 const auto& get_message() const {
259 return *msg;
260 }
261
262 size_t get_processed_rw_ops_num() const {
263 return num_read + num_write;
264 }
265
266 uint32_t get_pool_stripe_width() const;
267
268 bool has_seen_write() const {
269 return num_write > 0;
270 }
271
272 object_stat_sum_t& get_stats(){
273 return delta_stats;
274 }
275
276 version_t get_last_user_version() const;
277 };
278
279 template <class Context, class MainFunc, class EffectFunc>
280 auto OpsExecuter::with_effect_on_obc(
281 Context&& ctx,
282 MainFunc&& main_func,
283 EffectFunc&& effect_func)
284 {
285 using context_t = std::decay_t<Context>;
286 // the language offers implicit conversion to pointer-to-function for
287 // lambda only when it's closureless. We enforce this restriction due
288 // the fact that `flush_changes()` std::moves many executer's parts.
289 using allowed_effect_func_t =
290 seastar::future<> (*)(context_t&&, ObjectContextRef, Ref<PG>);
291 static_assert(std::is_convertible_v<EffectFunc, allowed_effect_func_t>,
292 "with_effect function is not allowed to capture");
293 struct task_t final : effect_t {
294 context_t ctx;
295 EffectFunc effect_func;
296 ObjectContextRef obc;
297
298 task_t(Context&& ctx, EffectFunc&& effect_func, ObjectContextRef obc)
299 : ctx(std::move(ctx)),
300 effect_func(std::move(effect_func)),
301 obc(std::move(obc)) {
302 }
303 osd_op_errorator::future<> execute(Ref<PG> pg) final {
304 return std::move(effect_func)(std::move(ctx),
305 std::move(obc),
306 std::move(pg));
307 }
308 };
309 auto task =
310 std::make_unique<task_t>(std::move(ctx), std::move(effect_func), obc);
311 auto& ctx_ref = task->ctx;
312 op_effects.emplace_back(std::move(task));
313 return std::forward<MainFunc>(main_func)(ctx_ref);
314 }
315
316 template <typename MutFunc>
317 OpsExecuter::rep_op_fut_t
318 OpsExecuter::flush_changes_n_do_ops_effects(MutFunc&& mut_func) &&
319 {
320 const bool want_mutate = !txn.empty();
321 // osd_op_params are instantiated by every wr-like operation.
322 assert(osd_op_params || !want_mutate);
323 assert(obc);
324 rep_op_fut_t maybe_mutated =
325 interruptor::make_ready_future<rep_op_fut_tuple>(
326 seastar::now(),
327 interruptor::make_interruptible(osd_op_errorator::now()));
328 if (want_mutate) {
329 osd_op_params->req_id = msg->get_reqid();
330 osd_op_params->mtime = msg->get_mtime();
331 auto [submitted, all_completed] = std::forward<MutFunc>(mut_func)(std::move(txn),
332 std::move(obc),
333 std::move(*osd_op_params),
334 user_modify);
335 maybe_mutated = interruptor::make_ready_future<rep_op_fut_tuple>(
336 std::move(submitted),
337 osd_op_ierrorator::future<>(std::move(all_completed)));
338 }
339 if (__builtin_expect(op_effects.empty(), true)) {
340 return maybe_mutated;
341 } else {
342 return maybe_mutated.then_unpack_interruptible(
343 [this, pg=std::move(pg)](auto&& submitted, auto&& all_completed) mutable {
344 return interruptor::make_ready_future<rep_op_fut_tuple>(
345 std::move(submitted),
346 all_completed.safe_then_interruptible([this, pg=std::move(pg)] {
347 // let's do the cleaning of `op_effects` in destructor
348 return interruptor::do_for_each(op_effects,
349 [pg=std::move(pg)](auto& op_effect) {
350 return op_effect->execute(pg);
351 });
352 }));
353 });
354 }
355 }
356
357 template <class Func>
358 struct OpsExecuter::RollbackHelper {
359 interruptible_future<> rollback_obc_if_modified(const std::error_code& e);
360 ObjectContextRef get_obc() const {
361 assert(ox);
362 return ox->obc;
363 }
364 seastar::lw_shared_ptr<OpsExecuter> ox;
365 Func func;
366 };
367
368 template <class Func>
369 inline OpsExecuter::RollbackHelper<Func>
370 OpsExecuter::create_rollbacker(Func&& func) {
371 return {shared_from_this(), std::forward<Func>(func)};
372 }
373
374
375 template <class Func>
376 OpsExecuter::interruptible_future<>
377 OpsExecuter::RollbackHelper<Func>::rollback_obc_if_modified(
378 const std::error_code& e)
379 {
380 // Oops, an operation had failed. do_osd_ops() altogether with
381 // OpsExecuter already dropped the ObjectStore::Transaction if
382 // there was any. However, this is not enough to completely
383 // rollback as we gave OpsExecuter the very single copy of `obc`
384 // we maintain and we did it for both reading and writing.
385 // Now all modifications must be reverted.
386 //
387 // Let's just reload from the store. Evicting from the shared
388 // LRU would be tricky as next MOSDOp (the one at `get_obc`
389 // phase) could actually already finished the lookup. Fortunately,
390 // this is supposed to live on cold paths, so performance is not
391 // a concern -- simplicity wins.
392 //
393 // The conditional's purpose is to efficiently handle hot errors
394 // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
395 // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
396 // typically append them before any write. If OpsExecuter hasn't
397 // seen any modifying operation, `obc` is supposed to be kept
398 // unchanged.
399 assert(ox);
400 const auto need_rollback = ox->has_seen_write();
401 crimson::get_logger(ceph_subsys_osd).debug(
402 "{}: object {} got error {}, need_rollback={}",
403 __func__,
404 ox->obc->get_oid(),
405 e,
406 need_rollback);
407 return need_rollback ? func(*ox->obc) : interruptor::now();
408 }
409
410 // PgOpsExecuter -- a class for executing ops targeting a certain PG.
411 class PgOpsExecuter {
412 template <typename T = void>
413 using interruptible_future =
414 ::crimson::interruptible::interruptible_future<
415 IOInterruptCondition, T>;
416
417 public:
418 PgOpsExecuter(const PG& pg, const MOSDOp& msg)
419 : pg(pg), nspace(msg.get_hobj().nspace) {
420 }
421
422 interruptible_future<> execute_op(OSDOp& osd_op);
423
424 private:
425 const PG& pg;
426 const std::string& nspace;
427 };
428
429 } // namespace crimson::osd