// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "pg.h"

#include <functional>

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"

#include "osd/OSDMap.h"

#include "os/Transaction.h"

#include "crimson/common/exception.h"
#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/osdop_params.h"
#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

namespace std::chrono {
std::ostream& operator<<(std::ostream& out, const signedspan& d)
{
  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
  auto ns = std::abs((d % 1s).count());
  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
  return out;
}
}

namespace crimson::osd {

using crimson::common::local_conf;

class RecoverablePredicate : public IsPGRecoverablePredicate {
public:
  bool operator()(const set<pg_shard_t> &have) const override {
    return !have.empty();
  }
};

class ReadablePredicate: public IsPGReadablePredicate {
  pg_shard_t whoami;
public:
  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
  bool operator()(const set<pg_shard_t> &have) const override {
    return have.count(whoami);
  }
};

PG::PG(
  spg_t pgid,
  pg_shard_t pg_shard,
  crimson::os::CollectionRef coll_ref,
  pg_pool_t&& pool,
  std::string&& name,
  cached_map_t osdmap,
  ShardServices &shard_services,
  ec_profile_t profile)
  : pgid{pgid},
    pg_whoami{pg_shard},
    coll_ref{coll_ref},
    pgmeta_oid{pgid.make_pgmeta_oid()},
    osdmap_gate("PG::osdmap_gate", std::nullopt),
    shard_services{shard_services},
    osdmap{osdmap},
    backend(
      PGBackend::create(
        pgid.pgid,
        pg_shard,
        pool,
        coll_ref,
        shard_services,
        profile)),
    recovery_backend(
      std::make_unique<ReplicatedRecoveryBackend>(
        *this, shard_services, coll_ref, backend.get())),
    recovery_handler(
      std::make_unique<PGRecovery>(this)),
    peering_state(
      shard_services.get_cct(),
      pg_shard,
      pgid,
      PGPool(
        osdmap,
        pgid.pool(),
        pool,
        name),
      osdmap,
      this,
      this),
    wait_for_active_blocker(this)
{
  peering_state.set_backend_predicates(
    new ReadablePredicate(pg_whoami),
    new RecoverablePredicate());
  osdmap_gate.got_map(osdmap->get_epoch());
}

PG::~PG() {}

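// PeeringState flush hook: kick off an asynchronous (empty) store transaction
// and deliver an IntervalFlush peering event once it commits; the `false`
// return indicates the flush did not complete inline.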
bool PG::try_flush_or_schedule_async() {
  (void)shard_services.get_store().do_transaction(
    coll_ref,
    ObjectStore::Transaction()).then(
      [this, epoch=get_osdmap_epoch()]() {
        return shard_services.start_operation<LocalPeeringEvent>(
          this,
          shard_services,
          pg_whoami,
          pgid,
          epoch,
          epoch,
          PeeringState::IntervalFlush());
      });
  return false;
}

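// Arm a one-shot timer that, after `delay`, queues a CheckReadable peering
// event stamped with the given last_peering_reset epoch.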
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  check_readable_timer.cancel();
  check_readable_timer.set_callback([last_peering_reset, this] {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      PeeringState::CheckReadable{});
  });
  check_readable_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}

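// Re-evaluate the WAIT and LAGGY states against the current monotonic clock
// and publish updated stats if either state could be cleared.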
void PG::recheck_readable()
{
  bool changed = false;
  const auto mnow = shard_services.get_mnow();
  if (peering_state.state_test(PG_STATE_WAIT)) {
    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
    if (mnow < prior_readable_until_ub) {
      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
    } else {
      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
      peering_state.state_clear(PG_STATE_WAIT);
      peering_state.clear_prior_readable_until_ub();
      changed = true;
    }
  }
  if (peering_state.state_test(PG_STATE_LAGGY)) {
    auto readable_until = peering_state.get_readable_until();
    if (readable_until == readable_until.zero()) {
      logger().info("{} still laggy (mnow {}, readable_until zero)",
                    __func__, mnow);
    } else if (mnow >= readable_until) {
      logger().info("{} still laggy (mnow {} >= readable_until {})",
                    __func__, mnow, readable_until);
    } else {
      logger().info("{} no longer laggy (mnow {} < readable_until {})",
                    __func__, mnow, readable_until);
      peering_state.state_clear(PG_STATE_LAGGY);
      changed = true;
    }
  }
  if (changed) {
    publish_stats_to_osd();
    if (!peering_state.state_test(PG_STATE_WAIT) &&
        !peering_state.state_test(PG_STATE_LAGGY)) {
      // TODO: requeue ops waiting for readable
    }
  }
}

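// Compute this PG's share of the OSD-wide pg log entry budget, clamped to the
// configured per-PG minimum and maximum.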
unsigned PG::get_target_pg_log_entries() const
{
  const unsigned num_pgs = shard_services.get_pg_num();
  const unsigned target =
    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
  const unsigned min_pg_log_entries =
    local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
  if (num_pgs > 0 && target > 0) {
    // target an even spread of our budgeted log entries across all
    // PGs. note that while we only get to control the entry count
    // for primary PGs, we'll normally be responsible for a mix of
    // primary and replica PGs (for the same pool(s) even), so this
    // will work out.
    const unsigned max_pg_log_entries =
      local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
    return std::clamp(target / num_pgs,
                      min_pg_log_entries,
                      max_pg_log_entries);
  } else {
    // fall back to a per-pg value.
    return min_pg_log_entries;
  }
}

void PG::on_activate(interval_set<snapid_t>)
{
  projected_last_update = peering_state.get_info().last_update;
}

void PG::on_activate_complete()
{
  wait_for_active_blocker.on_active();

  if (peering_state.needs_recovery()) {
    logger().info("{}: requesting recovery",
                  __func__);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::DoRecovery{});
  } else if (peering_state.needs_backfill()) {
    logger().info("{}: requesting backfill",
                  __func__);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::RequestBackfill{});
  } else {
    logger().debug("{}: no need to recover or backfill, AllReplicasRecovered"
                   " for pg: {}", __func__, pgid);
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::AllReplicasRecovered{});
  }
  backend->on_activate_complete();
}

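// Persist dirty PG metadata: build the omap key/value updates (and at most one
// key removal) for the pgmeta object and add them to the transaction `t`.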
void PG::prepare_write(pg_info_t &info,
                       pg_info_t &last_written_info,
                       PastIntervals &past_intervals,
                       PGLog &pglog,
                       bool dirty_info,
                       bool dirty_big_info,
                       bool need_write_epoch,
                       ceph::os::Transaction &t)
{
  std::map<string,bufferlist> km;
  std::string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      shard_services.get_cct(),
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      true,
      nullptr,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll_ref->get_cid(), pgmeta_oid,
    peering_state.get_pool().info.require_rollback());
  if (!km.empty()) {
    t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
  }
  if (!key_to_remove.empty()) {
    t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
  }
}

std::pair<ghobject_t, bool>
PG::do_delete_work(ceph::os::Transaction &t, ghobject_t _next)
{
  // TODO
  shard_services.dec_pg_num();
  return {_next, false};
}

void PG::scrub_requested(scrub_level_t scrub_level, scrub_type_t scrub_type)
{
  // TODO: should update the stats upon finishing the scrub
  peering_state.update_stats([scrub_level, this](auto& history, auto& stats) {
    const utime_t now = ceph_clock_now();
    history.last_scrub = peering_state.get_info().last_update;
    history.last_scrub_stamp = now;
    history.last_clean_scrub_stamp = now;
    if (scrub_level == scrub_level_t::deep) {
      history.last_deep_scrub = history.last_scrub;
      history.last_deep_scrub_stamp = now;
    }
    // yes, please publish the stats
    return true;
  });
}

void PG::log_state_enter(const char *state) {
  logger().info("Entering state: {}", state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  logger().info(
    "Exiting state: {}, entered at {}, {} spent on {} events",
    state_name,
    enter_time,
    event_dur,
    events);
}

ceph::signedspan PG::get_mnow()
{
  return shard_services.get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return shard_services.get_hb_stamps(peer);
}

void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  renew_lease_timer.cancel();
  renew_lease_timer.set_callback([last_peering_reset, this] {
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      RenewLease{});
  });
  renew_lease_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}


void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  bool backfill,
  ObjectStore::Transaction &t)
{
  peering_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, backfill, t);
}

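// Load the PG's persistent state at startup: pg_info and past_intervals via
// PGMeta, then the pg log and missing set, and finally queue an Initialize
// peering event once the primary/up/acting roles are set.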
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  return seastar::do_with(PGMeta(store, pgid), [] (auto& pg_meta) {
    return pg_meta.load();
  }).then([this, store](auto&& ret) {
    auto [pg_info, past_intervals] = std::move(ret);
    return peering_state.init_from_disk_state(
      std::move(pg_info),
      std::move(past_intervals),
      [this, store] (PGLog &pglog) {
        return pglog.read_log_and_missing_crimson(
          *store,
          coll_ref,
          peering_state.get_info(),
          pgmeta_oid);
      });
  }).then([this]() {
    int primary, up_primary;
    vector<int> acting, up;
    peering_state.get_osdmap()->pg_to_up_acting_osds(
      pgid.pgid, &up, &up_primary, &acting, &primary);
    peering_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    int rr = OSDMap::calc_pg_role(pg_whoami, acting);
    peering_state.set_role(rr);

    epoch_t epoch = get_osdmap_epoch();
    (void) shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      epoch,
      epoch,
      PeeringState::Initialize());

    return seastar::now();
  });
}

void PG::do_peering_event(
  const boost::statechart::event_base &evt,
  PeeringCtx &rctx)
{
  peering_state.handle_event(
    evt,
    &rctx);
  peering_state.write_if_dirty(rctx.transaction);
}

void PG::do_peering_event(
  PGPeeringEvent& evt, PeeringCtx &rctx)
{
  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
    logger().debug("{} handling {} for pg: {}", __func__, evt.get_desc(), pgid);
    do_peering_event(evt.get_event(), rctx);
  } else {
    logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
  }
}

void PG::handle_advance_map(
  cached_map_t next_map, PeeringCtx &rctx)
{
  vector<int> newup, newacting;
  int up_primary, acting_primary;
  next_map->pg_to_up_acting_osds(
    pgid.pgid,
    &newup, &up_primary,
    &newacting, &acting_primary);
  peering_state.advance_map(
    next_map,
    peering_state.get_osdmap(),
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
  osdmap_gate.got_map(next_map->get_epoch());
}

void PG::handle_activate_map(PeeringCtx &rctx)
{
  peering_state.activate_map(rctx);
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  PeeringState::Initialize evt;
  peering_state.handle_event(evt, &rctx);
}


void PG::print(ostream& out) const
{
  out << peering_state << " ";
}

void PG::dump_primary(Formatter* f)
{
  peering_state.dump_peering_state(f);

  f->open_array_section("recovery_state");
  PeeringState::QueryState q(f);
  peering_state.handle_event(q, 0);
  f->close_section();

  // TODO: snap_trimq
  // TODO: scrubber state
  // TODO: agent state
}

std::ostream& operator<<(std::ostream& os, const PG& pg)
{
  os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
  pg.print(os);
  return os;
}

void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
{
  f->dump_stream("pgid") << pg->pgid;
}

void PG::WaitForActiveBlocker::on_active()
{
  p.set_value();
  p = {};
}

blocking_future<> PG::WaitForActiveBlocker::wait()
{
  if (pg->peering_state.is_active()) {
    return make_blocking_future(seastar::now());
  } else {
    return make_blocking_future(p.get_shared_future());
  }
}

seastar::future<> PG::WaitForActiveBlocker::stop()
{
  p.set_exception(crimson::common::system_shutdown_exception());
  return seastar::now();
}

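// Primary-side write path: append a log entry for the client op, hand the
// transaction and log entries to the backend for replication, and record the
// peers' last_complete_ondisk once all acks are in.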
seastar::future<> PG::submit_transaction(const OpInfo& op_info,
                                         const std::vector<OSDOp>& ops,
                                         ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
                                         const osd_op_params_t& osd_op_p)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  epoch_t map_epoch = get_osdmap_epoch();

  if (__builtin_expect(osd_op_p.at_version.epoch != map_epoch, false)) {
    throw crimson::common::actingset_changed(is_primary());
  }

  std::vector<pg_log_entry_t> log_entries;
  log_entries.emplace_back(obc->obs.exists ?
                             pg_log_entry_t::MODIFY : pg_log_entry_t::DELETE,
                           obc->obs.oi.soid, osd_op_p.at_version, obc->obs.oi.version,
                           osd_op_p.user_modify ? osd_op_p.at_version.version : 0,
                           osd_op_p.req->get_reqid(), osd_op_p.req->get_mtime(),
                           op_info.allows_returnvec() && !ops.empty() ? ops.back().rval.code : 0);
  // TODO: refactor the submit_transaction
  if (op_info.allows_returnvec()) {
    // also the per-op values are recorded in the pg log
    log_entries.back().set_op_returns(ops);
    logger().debug("{} op_returns: {}",
                   __func__, log_entries.back().op_returns);
  }
  log_entries.back().clean_regions = std::move(osd_op_p.clean_regions);
  peering_state.pre_submit_op(obc->obs.oi.soid, log_entries, osd_op_p.at_version);
  peering_state.append_log_with_trim_to_updated(std::move(log_entries), osd_op_p.at_version,
                                                txn, true, false);

  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                std::move(obc),
                                std::move(txn),
                                std::move(osd_op_p),
                                peering_state.get_last_peering_reset(),
                                map_epoch,
                                std::move(log_entries)).then(
    [this, last_complete=peering_state.get_info().last_complete,
     at_version=osd_op_p.at_version](auto acked) {
      for (const auto& peer : acked) {
        peering_state.update_peer_last_complete_ondisk(
          peer.shard, peer.last_complete_ondisk);
      }
      peering_state.complete_write(at_version, last_complete);
      return seastar::now();
    });
}

osd_op_params_t&& PG::fill_op_params_bump_pg_version(
  osd_op_params_t&& osd_op_p,
  Ref<MOSDOp> m,
  const bool user_modify)
{
  osd_op_p.req = std::move(m);
  osd_op_p.at_version = next_version();
  osd_op_p.pg_trim_to = get_pg_trim_to();
  osd_op_p.min_last_complete_ondisk = get_min_last_complete_ondisk();
  osd_op_p.last_complete = get_info().last_complete;
  if (user_modify) {
    osd_op_p.user_at_version = osd_op_p.at_version.version;
  }
  return std::move(osd_op_p);
}

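// Build an error MOSDOpReply for a failed client op, reloading the obc from
// the store first if the op sequence may have modified it (see below).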
seastar::future<Ref<MOSDOpReply>> PG::handle_failed_op(
  const std::error_code& e,
  ObjectContextRef obc,
  const OpsExecuter& ox,
  const MOSDOp& m) const
{
  // Oops, an operation has failed. do_osd_ops(), together with
  // OpsExecuter, has already dropped the ObjectStore::Transaction
  // if there was any. However, this is not enough for a complete
  // rollback, as we gave OpsExecuter the single copy of `obc` we
  // maintain, and we did so for both reading and writing.
  // Now all modifications must be reverted.
  //
  // Let's just reload from the store. Evicting from the shared
  // LRU would be tricky, as the next MOSDOp (the one at the `get_obc`
  // phase) could actually have finished the lookup already. Fortunately,
  // this is supposed to live on cold paths, so performance is not
  // a concern -- simplicity wins.
  //
  // The conditional's purpose is to efficiently handle hot errors
  // which may appear as a result of e.g. CEPH_OSD_OP_CMPXATTR or
  // CEPH_OSD_OP_OMAP_CMP. These are read-like ops and clients
  // typically put them before any write. If OpsExecuter hasn't
  // seen any modifying operation, `obc` is supposed to be kept
  // unchanged.
  assert(e.value() > 0);
  const bool need_reload_obc = ox.has_seen_write();
  logger().debug(
    "{}: {} - object {} got error code {}, {}; need_reload_obc {}",
    __func__,
    m,
    obc->obs.oi.soid,
    e.value(),
    e.message(),
    need_reload_obc);
  return (need_reload_obc ? reload_obc(*obc)
                          : load_obc_ertr::now()
  ).safe_then([&e, &m, obc = std::move(obc), this] {
    auto reply = make_message<MOSDOpReply>(
      &m, -e.value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(
      peering_state.get_info().last_update,
      peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, load_obc_ertr::assert_all{ "can't live with object state messed up" });
}

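// Execute a client MOSDOp: run each OSDOp through OpsExecuter, flush any
// resulting transaction via submit_transaction(), and produce the reply;
// failures are routed through handle_failed_op().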
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
  Ref<MOSDOp> m,
  ObjectContextRef obc,
  const OpInfo &op_info)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }

  using osd_op_errorator = OpsExecuter::osd_op_errorator;
  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                   : m->get_hobj();
  auto ox = std::make_unique<OpsExecuter>(
    obc, op_info, get_pool().info, get_backend(), *m);
  return crimson::do_for_each(
    m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
      logger().debug(
        "do_osd_ops: {} - object {} - handling op {}",
        *m,
        obc->obs.oi.soid,
        ceph_osd_op_name(osd_op.op.op));
      return ox->execute_op(osd_op);
  }).safe_then([this, obc, m, ox = ox.get(), &op_info] {
    logger().debug(
      "do_osd_ops: {} - object {} all operations successful",
      *m,
      obc->obs.oi.soid);
    return std::move(*ox).flush_changes(
      [m] (auto&& obc) -> osd_op_errorator::future<> {
        logger().debug(
          "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
          *m,
          obc->obs.oi.soid);
        return osd_op_errorator::now();
      },
      [this, m, &op_info] (auto&& txn,
                           auto&& obc,
                           auto&& osd_op_p,
                           bool user_modify) -> osd_op_errorator::future<> {
        logger().debug(
          "do_osd_ops: {} - object {} submitting txn",
          *m,
          obc->obs.oi.soid);
        auto filled_osd_op_p = fill_op_params_bump_pg_version(
          std::move(osd_op_p),
          std::move(m),
          user_modify);
        return submit_transaction(
          op_info,
          filled_osd_op_p.req->ops,
          std::move(obc),
          std::move(txn),
          std::move(filled_osd_op_p));
      });
  }).safe_then([this,
                m,
                obc,
                rvec = op_info.allows_returnvec()] {
    // TODO: should stop at the first op which returns a negative retval,
    // cmpext uses it for returning the index of first unmatched byte
    int result = m->ops.empty() ? 0 : m->ops.back().rval.code;
    if (result > 0 && !rvec) {
      result = 0;
    }
    auto reply = make_message<MOSDOpReply>(m.get(),
                                           result,
                                           get_osdmap_epoch(),
                                           0,
                                           false);
    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
    logger().debug(
      "do_osd_ops: {} - object {} sending reply",
      *m,
      obc->obs.oi.soid);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, osd_op_errorator::all_same_way([ox = ox.get(),
                                     m,
                                     obc,
                                     this] (const std::error_code& e) {
    return handle_failed_op(e, std::move(obc), *ox, *m);
  })).handle_exception_type([ox_deleter = std::move(ox),
                             m,
                             obc,
                             this] (const crimson::osd::error& e) {
    // we need this handler because of throwing paths which aren't errorated yet.
749 logger().debug("encountered the legacy error handling path!");
750 return handle_failed_op(e.code(), std::move(obc), *ox_deleter, *m);
751 });
752 }
753
754 seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
755 {
756 if (__builtin_expect(stopping, false)) {
757 throw crimson::common::system_shutdown_exception();
758 }
759
760 auto ox = std::make_unique<PgOpsExecuter>(std::as_const(*this),
761 std::as_const(*m));
762 return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
763 logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
764 return ox->execute_op(osd_op);
765 }).then([m, this, ox = std::move(ox)] {
766 auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
767 CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
768 false);
769 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
770 }).handle_exception_type([=](const crimson::osd::error& e) {
771 auto reply = make_message<MOSDOpReply>(
772 m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
773 reply->set_enoent_reply_versions(peering_state.get_info().last_update,
774 peering_state.get_info().last_user_version);
775 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
776 });
777 }
778
779 hobject_t PG::get_oid(const MOSDOp &m)
780 {
781 return (m.get_snapid() == CEPH_SNAPDIR ?
782 m.get_hobj().get_head() :
783 m.get_hobj());
784 }
785
786 RWState::State PG::get_lock_type(const OpInfo &op_info)
787 {
788
789 if (op_info.rwordered() && op_info.may_read()) {
790 return RWState::RWEXCL;
791 } else if (op_info.rwordered()) {
792 return RWState::RWWRITE;
793 } else {
794 ceph_assert(op_info.may_read());
795 return RWState::RWREAD;
796 }
797 }
798
799 std::optional<hobject_t> PG::resolve_oid(
800 const SnapSet &ss,
801 const hobject_t &oid)
802 {
803 if (oid.snap > ss.seq) {
804 return oid.get_head();
805 } else {
806 // which clone would it be?
807 auto clone = std::upper_bound(
808 begin(ss.clones), end(ss.clones),
809 oid.snap);
810 if (clone == end(ss.clones)) {
811 // Doesn't exist, > last clone, < ss.seq
812 return std::nullopt;
813 }
814 auto citer = ss.clone_snaps.find(*clone);
815 // TODO: how do we want to handle this kind of logic error?
816 ceph_assert(citer != ss.clone_snaps.end());
817
818 if (std::find(
819 citer->second.begin(),
820 citer->second.end(),
821 *clone) == citer->second.end()) {
822 return std::nullopt;
823 } else {
824 auto soid = oid;
825 soid.snap = *clone;
826 return std::optional<hobject_t>(soid);
827 }
828 }
829 }
830
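// Look up (or load and cache) the ObjectContext of a head object and invoke
// `func` on it while holding the lock mode selected by `State`.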
template<RWState::State State>
PG::load_obc_ertr::future<>
PG::with_head_obc(hobject_t oid, with_obc_func_t&& func)
{
  assert(oid.is_head());
  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
  return obc->with_lock<State>(
    [oid=std::move(oid), existed=existed, obc=std::move(obc),
     func=std::move(func), this] {
      auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
      if (existed) {
        logger().debug("with_head_obc: found {} in cache", oid);
      } else {
        logger().debug("with_head_obc: cache miss on {}", oid);
        loaded = obc->with_promoted_lock<State>([this, obc] {
          return load_head_obc(obc);
        });
      }
      return loaded.safe_then([func=std::move(func)](auto obc) {
        return func(std::move(obc));
      });
    });
}

template<RWState::State State>
PG::load_obc_ertr::future<>
PG::with_clone_obc(hobject_t oid, with_obc_func_t&& func)
{
  assert(!oid.is_head());
  return with_head_obc<RWState::RWREAD>(oid.get_head(),
    [oid, func=std::move(func), this](auto head) -> load_obc_ertr::future<> {
      auto coid = resolve_oid(head->get_ro_ss(), oid);
      if (!coid) {
        // TODO: return crimson::ct_error::enoent::make();
        logger().error("with_clone_obc: {} clone not found", coid);
        return load_obc_ertr::make_ready_future<>();
      }
      auto [clone, existed] = shard_services.obc_registry.get_cached_obc(*coid);
      return clone->template with_lock<State>(
        [coid=*coid, existed=existed,
         head=std::move(head), clone=std::move(clone),
         func=std::move(func), this]() -> load_obc_ertr::future<> {
          auto loaded = load_obc_ertr::make_ready_future<ObjectContextRef>(clone);
          if (existed) {
            logger().debug("with_clone_obc: found {} in cache", coid);
          } else {
            logger().debug("with_clone_obc: cache miss on {}", coid);
            loaded = clone->template with_promoted_lock<State>(
              [coid, clone, head, this] {
                return backend->load_metadata(coid).safe_then(
                  [coid, clone=std::move(clone), head=std::move(head)](auto md) mutable {
                    clone->set_clone_state(std::move(md->os), std::move(head));
                    return clone;
                  });
              });
          }
          return loaded.safe_then([func=std::move(func)](auto clone) {
            return func(std::move(clone));
          });
        });
    });
}

// explicitly instantiate the used instantiations
template PG::load_obc_ertr::future<>
PG::with_head_obc<RWState::RWNONE>(hobject_t, with_obc_func_t&&);

PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::load_head_obc(ObjectContextRef obc)
{
  hobject_t oid = obc->get_oid();
  return backend->load_metadata(oid).safe_then([obc=std::move(obc)](auto md)
    -> load_obc_ertr::future<crimson::osd::ObjectContextRef> {
    const hobject_t& oid = md->os.oi.soid;
    logger().debug(
      "load_head_obc: loaded obs {} for {}", md->os.oi, oid);
    if (!md->ss) {
      logger().error(
        "load_head_obc: oid {} missing snapset", oid);
      return crimson::ct_error::object_corrupted::make();
    }
    obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
    logger().debug(
      "load_head_obc: returning obc {} for {}",
      obc->obs.oi, obc->obs.oi.soid);
    return load_obc_ertr::make_ready_future<
      crimson::osd::ObjectContextRef>(obc);
  });
}

PG::load_obc_ertr::future<>
PG::reload_obc(crimson::osd::ObjectContext& obc) const
{
  assert(obc.is_head());
  return backend->load_metadata(obc.get_oid()).safe_then([&obc](auto md)
    -> load_obc_ertr::future<> {
    logger().debug(
      "{}: reloaded obs {} for {}",
      __func__,
      md->os.oi,
      obc.get_oid());
    if (!md->ss) {
      logger().error(
        "{}: oid {} missing snapset",
        __func__,
        obc.get_oid());
      return crimson::ct_error::object_corrupted::make();
    }
    obc.set_head_state(std::move(md->os), std::move(*(md->ss)));
    return load_obc_ertr::now();
  });
}

PG::load_obc_ertr::future<>
PG::with_locked_obc(Ref<MOSDOp> &m, const OpInfo &op_info,
                    Operation *op, PG::with_obc_func_t &&f)
{
  if (__builtin_expect(stopping, false)) {
    throw crimson::common::system_shutdown_exception();
  }
  const hobject_t oid = get_oid(*m);
  switch (get_lock_type(op_info)) {
  case RWState::RWREAD:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWREAD>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWREAD>(oid, std::move(f));
    }
  case RWState::RWWRITE:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
    }
  case RWState::RWEXCL:
    if (oid.is_head()) {
      return with_head_obc<RWState::RWWRITE>(oid, std::move(f));
    } else {
      return with_clone_obc<RWState::RWWRITE>(oid, std::move(f));
    }
  default:
    ceph_abort();
  };
}

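// Replica-side write path: decode the transaction and log entries carried by
// the MOSDRepOp, apply them locally, then ack to the primary with our
// last_complete_ondisk.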
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
  if (__builtin_expect(stopping, false)) {
    return seastar::make_exception_future<>(
      crimson::common::system_shutdown_exception());
  }

  if (can_discard_replica_op(*req)) {
    return seastar::now();
  }

  ceph::os::Transaction txn;
  auto encoded_txn = req->get_data().cbegin();
  decode(txn, encoded_txn);
  auto p = req->logbl.cbegin();
  std::vector<pg_log_entry_t> log_entries;
  decode(log_entries, p);
  peering_state.append_log(std::move(log_entries), req->pg_trim_to,
                           req->version, req->min_last_complete_ondisk, txn, !txn.empty(), false);
  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
    .then([req, lcod=peering_state.get_info().last_complete, this] {
      peering_state.update_last_complete_ondisk(lcod);
      const auto map_epoch = get_osdmap_epoch();
      auto reply = make_message<MOSDRepOpReply>(
        req.get(), pg_whoami, 0,
        map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
      reply->set_last_complete_ondisk(lcod);
      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
    });
}

void PG::handle_rep_op_reply(crimson::net::ConnectionRef conn,
                             const MOSDRepOpReply& m)
{
  if (!can_discard_replica_op(m)) {
    backend->got_rep_op_reply(m);
  }
}

template <typename MsgType>
bool PG::can_discard_replica_op(const MsgType& m) const
{
  // if a repop is replied after a replica goes down in a new osdmap, and
  // before the pg advances to this new osdmap, the repop replies before this
  // repop can be discarded by that replica OSD, because the primary resets the
  // connection to it when handling the new osdmap marking it down, and also
  // resets the messenger session when the replica reconnects. to avoid the
  // out-of-order replies, the messages from that replica should be discarded.
  const auto osdmap = peering_state.get_osdmap();
  const int from_osd = m.get_source().num();
  if (osdmap->is_down(from_osd)) {
    return true;
  }
  // Mostly, this overlaps with the old_peering_msg
  // condition. An important exception is pushes
  // sent by replicas not in the acting set, since
  // if such a replica goes down it does not cause
  // a new interval.
  if (osdmap->get_down_at(from_osd) >= m.map_epoch) {
    return true;
  }
  // same pg?
  // if pg changes *at all*, we reset and repeer!
  if (epoch_t lpr = peering_state.get_last_peering_reset();
      lpr > m.map_epoch) {
    logger().debug("{}: pg changed {} after {}, dropping",
                   __func__, get_info().history, m.map_epoch);
    return true;
  }
  return false;
}

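// Shut the PG down: mark it stopping and stop the osdmap gate, the blockers
// and the recovery/backend machinery in sequence.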
seastar::future<> PG::stop()
{
  logger().info("PG {} {}", pgid, __func__);
  stopping = true;
  return osdmap_gate.stop().then([this] {
    return wait_for_active_blocker.stop();
  }).then([this] {
    return recovery_handler->stop();
  }).then([this] {
    return recovery_backend->stop();
  }).then([this] {
    return backend->stop();
  });
}

void PG::on_change(ceph::os::Transaction &t) {
  recovery_backend->on_peering_interval_change(t);
  backend->on_actingset_changed({ is_primary() });
}

bool PG::can_discard_op(const MOSDOp& m) const {
  return __builtin_expect(m.get_map_epoch()
      < peering_state.get_info().history.same_primary_since, false);
}

bool PG::is_degraded_or_backfilling_object(const hobject_t& soid) const {
  /* The conditions below may clear (on_local_recover, before we queue
   * the transaction) before we actually requeue the degraded waiters
   * in on_global_recover after the transaction completes.
   */
  if (peering_state.get_pg_log().get_missing().get_items().count(soid))
    return true;
  ceph_assert(!get_acting_recovery_backfill().empty());
  for (auto& peer : get_acting_recovery_backfill()) {
    if (peer == get_primary()) continue;
    auto peer_missing_entry = peering_state.get_peer_missing().find(peer);
    // If an object is missing on an async_recovery_target, return false.
    // This will not block the op and the object is async recovered later.
    if (peer_missing_entry != peering_state.get_peer_missing().end() &&
        peer_missing_entry->second.get_items().count(soid)) {
      return true;
    }
    // Object is degraded if after last_backfill AND
    // we are backfilling it
    if (is_backfill_target(peer) &&
        peering_state.get_peer_info(peer).last_backfill <= soid &&
        recovery_handler->backfill_state->get_last_backfill_started() >= soid &&
        recovery_backend->is_recovering(soid)) {
      return true;
    }
  }
  return false;
}

}