// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "pg.h"

#include <functional>

#include <boost/range/adaptor/filtered.hpp>
#include <boost/range/adaptor/map.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/algorithm/copy.hpp>
#include <boost/range/algorithm/max_element.hpp>
#include <boost/range/numeric.hpp>
#include <fmt/format.h>
#include <fmt/ostream.h>

#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGLog.h"
#include "messages/MOSDPGNotify.h"
#include "messages/MOSDPGQuery.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"

#include "osd/OSDMap.h"

#include "os/Transaction.h"

#include "crimson/net/Connection.h"
#include "crimson/net/Messenger.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/os/futurized_collection.h"
#include "crimson/osd/exceptions.h"
#include "crimson/osd/pg_meta.h"
#include "crimson/osd/pg_backend.h"
#include "crimson/osd/ops_executer.h"
#include "crimson/osd/osd_operations/peering_event.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

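// signedspan durations (used by the monotonic-clock helpers below) get a
// human-readable "<sec>.<nanoseconds>s" form so they can be interpolated
// into log messages.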
namespace std::chrono {
std::ostream& operator<<(std::ostream& out, const signedspan& d)
{
  auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
  auto ns = std::abs((d % 1s).count());
  fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
  return out;
}
}

namespace crimson::osd {

using crimson::common::local_conf;

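// Predicates handed to PeeringState (see the constructor below): a PG is
// considered recoverable as long as any shard holds the data, and readable
// when this OSD's own shard does.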
class RecoverablePredicate : public IsPGRecoverablePredicate {
public:
  bool operator()(const set<pg_shard_t> &have) const override {
    return !have.empty();
  }
};

class ReadablePredicate: public IsPGReadablePredicate {
  pg_shard_t whoami;
public:
  explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
  bool operator()(const set<pg_shard_t> &have) const override {
    return have.count(whoami);
  }
};

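// The constructor wires the PG into its collection, creates the replicated
// or erasure-coded backend, and seeds PeeringState with the current osdmap.
// The osdmap gate is primed with that epoch so queued operations can start
// flowing.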
PG::PG(
  spg_t pgid,
  pg_shard_t pg_shard,
  crimson::os::CollectionRef coll_ref,
  pg_pool_t&& pool,
  std::string&& name,
  cached_map_t osdmap,
  ShardServices &shard_services,
  ec_profile_t profile)
  : pgid{pgid},
    pg_whoami{pg_shard},
    coll_ref{coll_ref},
    pgmeta_oid{pgid.make_pgmeta_oid()},
    osdmap_gate("PG::osdmap_gate", std::nullopt),
    shard_services{shard_services},
    osdmap{osdmap},
    backend(
      PGBackend::create(
        pgid.pgid,
        pg_shard,
        pool,
        coll_ref,
        shard_services,
        profile)),
    peering_state(
      shard_services.get_cct(),
      pg_shard,
      pgid,
      PGPool(
        shard_services.get_cct(),
        osdmap,
        pgid.pool(),
        pool,
        name),
      osdmap,
      this,
      this),
    wait_for_active_blocker(this)
{
  peering_state.set_backend_predicates(
    new ReadablePredicate(pg_whoami),
    new RecoverablePredicate());
  osdmap_gate.got_map(osdmap->get_epoch());
}

PG::~PG() {}

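// Flush the store by queueing an empty transaction; once it completes, an
// IntervalFlush peering event is delivered in the background.  Returning
// false signals that the flush has not completed synchronously.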
bool PG::try_flush_or_schedule_async() {
  (void)shard_services.get_store().do_transaction(
    coll_ref,
    ObjectStore::Transaction()).then(
      [this, epoch=get_osdmap_epoch()]() {
        return shard_services.start_operation<LocalPeeringEvent>(
          this,
          shard_services,
          pg_whoami,
          pgid,
          epoch,
          epoch,
          PeeringState::IntervalFlush());
      });
  return false;
}

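// Re-arm the readable check: after `delay` a CheckReadable event is queued.
// The event carries last_peering_reset, so it is ignored if the PG has reset
// in the meantime.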
void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  check_readable_timer.cancel();
  check_readable_timer.set_callback([last_peering_reset, this] {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      PeeringState::CheckReadable{});
    });
  check_readable_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}

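// Called when the readable state may have changed: drop PG_STATE_WAIT once
// the prior interval's readable_until upper bound has passed, drop
// PG_STATE_LAGGY once mnow is back inside readable_until, and republish
// stats if anything changed.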
void PG::recheck_readable()
{
  bool changed = false;
  const auto mnow = shard_services.get_mnow();
  if (peering_state.state_test(PG_STATE_WAIT)) {
    auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
    if (mnow < prior_readable_until_ub) {
      logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
    } else {
      logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
                    __func__, mnow, prior_readable_until_ub);
      peering_state.state_clear(PG_STATE_WAIT);
      peering_state.clear_prior_readable_until_ub();
      changed = true;
    }
  }
  if (peering_state.state_test(PG_STATE_LAGGY)) {
    auto readable_until = peering_state.get_readable_until();
    if (readable_until == readable_until.zero()) {
      logger().info("{} still laggy (mnow {}, readable_until zero)",
                    __func__, mnow);
    } else if (mnow >= readable_until) {
      logger().info("{} still laggy (mnow {} >= readable_until {})",
                    __func__, mnow, readable_until);
    } else {
      logger().info("{} no longer laggy (mnow {} < readable_until {})",
                    __func__, mnow, readable_until);
      peering_state.state_clear(PG_STATE_LAGGY);
      changed = true;
    }
  }
  if (changed) {
    publish_stats_to_osd();
    if (!peering_state.state_test(PG_STATE_WAIT) &&
        !peering_state.state_test(PG_STATE_LAGGY)) {
      // TODO: requeue ops waiting for readable
    }
  }
}

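// Per-PG log length budget.  Example (illustrative numbers, not defaults):
// with osd_target_pg_log_entries_per_osd = 300000 and 100 PGs on this OSD,
// each PG targets 3000 entries, clamped to the configured
// [osd_min_pg_log_entries, osd_max_pg_log_entries] range.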
unsigned PG::get_target_pg_log_entries() const
{
  const unsigned num_pgs = shard_services.get_pg_num();
  const unsigned target =
    local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
  const unsigned min_pg_log_entries =
    local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
  if (num_pgs > 0 && target > 0) {
    // target an even spread of our budgeted log entries across all
    // PGs.  note that while we only get to control the entry count
    // for primary PGs, we'll normally be responsible for a mix of
    // primary and replica PGs (for the same pool(s) even), so this
    // will work out.
    const unsigned max_pg_log_entries =
      local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
    return std::clamp(target / num_pgs,
                      min_pg_log_entries,
                      max_pg_log_entries);
  } else {
    // fall back to a per-pg value.
    return min_pg_log_entries;
  }
}

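// Activation callbacks from PeeringState.  on_activate() records the log
// head as the projected version for new writes; on_activate_complete()
// unblocks ops waiting for the PG to go active and queues the appropriate
// follow-up event (recovery, backfill, or AllReplicasRecovered).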
void PG::on_activate(interval_set<snapid_t>)
{
  projected_last_update = peering_state.get_info().last_update;
}

void PG::on_activate_complete()
{
  wait_for_active_blocker.on_active();

  if (peering_state.needs_recovery()) {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::DoRecovery{});
  } else if (peering_state.needs_backfill()) {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::RequestBackfill{});
  } else {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      get_osdmap_epoch(),
      get_osdmap_epoch(),
      PeeringState::AllReplicasRecovered{});
  }
}

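// Persist dirty PG metadata: info/past_intervals are serialized into an omap
// keymap on the pgmeta object, and the pg log/missing set is appended to the
// same transaction.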
void PG::prepare_write(pg_info_t &info,
                       pg_info_t &last_written_info,
                       PastIntervals &past_intervals,
                       PGLog &pglog,
                       bool dirty_info,
                       bool dirty_big_info,
                       bool need_write_epoch,
                       ceph::os::Transaction &t)
{
  std::map<string,bufferlist> km;
  std::string key_to_remove;
  if (dirty_big_info || dirty_info) {
    int ret = prepare_info_keymap(
      shard_services.get_cct(),
      &km,
      &key_to_remove,
      get_osdmap_epoch(),
      info,
      last_written_info,
      past_intervals,
      dirty_big_info,
      need_write_epoch,
      true,
      nullptr,
      this);
    ceph_assert(ret == 0);
  }
  pglog.write_log_and_missing(
    t, &km, coll_ref->get_cid(), pgmeta_oid,
    peering_state.get_pool().info.require_rollback());
  if (!km.empty()) {
    t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
  }
  if (!key_to_remove.empty()) {
    t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
  }
}

ghobject_t PG::do_delete_work(ceph::os::Transaction &t,
                              ghobject_t _next)
{
  // TODO
  shard_services.dec_pg_num();
  // nothing is deleted yet; hand the cursor back unchanged
  return _next;
}

void PG::log_state_enter(const char *state) {
  logger().info("Entering state: {}", state);
}

void PG::log_state_exit(
  const char *state_name, utime_t enter_time,
  uint64_t events, utime_t event_dur) {
  logger().info(
    "Exiting state: {}, entered at {}, {} spent on {} events",
    state_name,
    enter_time,
    event_dur,
    events);
}

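// Read-lease plumbing: PeeringState consumes the shard's monotonic clock and
// per-peer heartbeat stamps, and schedule_renew_lease() arms a timer that
// fires a RenewLease peering event after the requested delay.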
ceph::signedspan PG::get_mnow()
{
  return shard_services.get_mnow();
}

HeartbeatStampsRef PG::get_hb_stamps(int peer)
{
  return shard_services.get_hb_stamps(peer);
}

void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
{
  // handle the peering event in the background
  renew_lease_timer.cancel();
  renew_lease_timer.set_callback([last_peering_reset, this] {
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      last_peering_reset,
      last_peering_reset,
      RenewLease{});
    });
  renew_lease_timer.arm(
    std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
}


void PG::init(
  int role,
  const vector<int>& newup, int new_up_primary,
  const vector<int>& newacting, int new_acting_primary,
  const pg_history_t& history,
  const PastIntervals& pi,
  bool backfill,
  ObjectStore::Transaction &t)
{
  peering_state.init(
    role, newup, new_up_primary, newacting,
    new_acting_primary, history, pi, backfill, t);
}

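// Rebuild in-memory state from disk on startup: load pg_info and
// past_intervals from the pgmeta object, replay the pg log/missing set,
// recompute up/acting and our role from the osdmap known to PeeringState,
// then kick the state machine with Initialize.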
seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
{
  return PGMeta{store, pgid}.load(
  ).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
    return peering_state.init_from_disk_state(
      std::move(pg_info),
      std::move(past_intervals),
      [this, store] (PGLog &pglog) {
        return pglog.read_log_and_missing_crimson(
          *store,
          coll_ref,
          peering_state.get_info(),
          pgmeta_oid);
      });
  }).then([this]() {
    int primary, up_primary;
    vector<int> acting, up;
    peering_state.get_osdmap()->pg_to_up_acting_osds(
      pgid.pgid, &up, &up_primary, &acting, &primary);
    peering_state.init_primary_up_acting(
      up,
      acting,
      up_primary,
      primary);
    int rr = OSDMap::calc_pg_role(pg_whoami, acting);
    peering_state.set_role(rr);

    epoch_t epoch = get_osdmap_epoch();
    shard_services.start_operation<LocalPeeringEvent>(
      this,
      shard_services,
      pg_whoami,
      pgid,
      epoch,
      epoch,
      PeeringState::Initialize());

    return seastar::now();
  });
}

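// Feed an event into the PeeringState state machine.  The PGPeeringEvent
// overload drops events that were requested before the last PG reset; any
// state dirtied by handling the event is written via rctx.transaction.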
void PG::do_peering_event(
  const boost::statechart::event_base &evt,
  PeeringCtx &rctx)
{
  peering_state.handle_event(
    evt,
    &rctx);
  peering_state.write_if_dirty(rctx.transaction);
}

void PG::do_peering_event(
  PGPeeringEvent& evt, PeeringCtx &rctx)
{
  if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
    logger().debug("{} handling {}", __func__, evt.get_desc());
    do_peering_event(evt.get_event(), rctx);
  } else {
    logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
  }
}

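// OSDMap progression: recompute up/acting for the next map, advance
// PeeringState to it, and only then release operations gated on that epoch
// via osdmap_gate.got_map().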
void PG::handle_advance_map(
  cached_map_t next_map, PeeringCtx &rctx)
{
  vector<int> newup, newacting;
  int up_primary, acting_primary;
  next_map->pg_to_up_acting_osds(
    pgid.pgid,
    &newup, &up_primary,
    &newacting, &acting_primary);
  peering_state.advance_map(
    next_map,
    peering_state.get_osdmap(),
    newup,
    up_primary,
    newacting,
    acting_primary,
    rctx);
  osdmap_gate.got_map(next_map->get_epoch());
}

void PG::handle_activate_map(PeeringCtx &rctx)
{
  peering_state.activate_map(rctx);
}

void PG::handle_initialize(PeeringCtx &rctx)
{
  PeeringState::Initialize evt;
  peering_state.handle_event(evt, &rctx);
}


void PG::print(ostream& out) const
{
  out << peering_state << " ";
}


std::ostream& operator<<(std::ostream& os, const PG& pg)
{
  os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
  pg.print(os);
  return os;
}

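// WaitForActiveBlocker: client ops block on wait() until the PG becomes
// active; on_active() wakes all current waiters and resets the promise so
// later waits block again until the next activation.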
void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
{
  f->dump_stream("pgid") << pg->pgid;
}

void PG::WaitForActiveBlocker::on_active()
{
  p.set_value();
  p = {};
}

blocking_future<> PG::WaitForActiveBlocker::wait()
{
  if (pg->peering_state.is_active()) {
    return make_blocking_future(seastar::now());
  } else {
    return make_blocking_future(p.get_shared_future());
  }
}

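// Replicate a client write: stamp it with the next projected version and
// hand it to the backend for the acting/backfill set; once the peers ack,
// record each peer's last_complete_ondisk.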
seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
                                         ceph::os::Transaction&& txn,
                                         const MOSDOp& req)
{
  epoch_t map_epoch = get_osdmap_epoch();
  eversion_t at_version{map_epoch, projected_last_update.version + 1};
  return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
                                std::move(obc),
                                std::move(txn),
                                req,
                                peering_state.get_last_peering_reset(),
                                map_epoch,
                                at_version).then([this](auto acked) {
    for (const auto& peer : acked) {
      peering_state.update_peer_last_complete_ondisk(
        peer.shard, peer.last_complete_ondisk);
    }
    return seastar::now();
  });
}

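// Execute a client MOSDOp against a single object: run each OSDOp through
// OpsExecuter, submit the resulting transaction (if any), and translate
// success, errorated failures and stray exceptions into an MOSDOpReply.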
seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
  Ref<MOSDOp> m,
  ObjectContextRef obc)
{
  using osd_op_errorator = OpsExecuter::osd_op_errorator;
  const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
                                                   : m->get_hobj();
  auto ox =
    std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
  return crimson::do_for_each(
    m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
    logger().debug(
      "do_osd_ops: {} - object {} - handling op {}",
      *m,
      obc->obs.oi.soid,
      ceph_osd_op_name(osd_op.op.op));
    return ox->execute_osd_op(osd_op);
  }).safe_then([this, obc, m, ox = ox.get()] {
    logger().debug(
      "do_osd_ops: {} - object {} all operations successful",
      *m,
      obc->obs.oi.soid);
    return std::move(*ox).submit_changes(
      [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
        // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
        if (txn.empty()) {
          logger().debug(
            "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
            *m,
            obc->obs.oi.soid);
          return osd_op_errorator::now();
        } else {
          logger().debug(
            "do_osd_ops: {} - object {} submitting txn",
            *m,
            obc->obs.oi.soid);
          return submit_transaction(std::move(obc), std::move(txn), *m);
        }
      });
  }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                           0, false);
    reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
    logger().debug(
      "do_osd_ops: {} - object {} sending reply",
      *m,
      obc->obs.oi.soid);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code& e) {
    assert(e.value() > 0);
    logger().debug(
      "do_osd_ops: {} - object {} got error code {}, {}",
      *m,
      obc->obs.oi.soid,
      e.value(),
      e.message());
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
    // we need this handler because some throwing paths aren't errorated yet.
    logger().debug(
      "do_osd_ops: {} - object {} got unhandled exception {} ({})",
      *m,
      obc->obs.oi.soid,
      e.code(),
      e.what());
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  });
}

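// PG-scoped ops (e.g. PGLS) don't target a specific object; they only need
// the OpsExecuter's const view of the PG.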
seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
{
  auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
  return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
    logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
    return ox->execute_pg_op(osd_op);
  }).then([m, this, ox = std::move(ox)] {
    auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
                                           CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
                                           false);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  }).handle_exception_type([=](const crimson::osd::error& e) {
    auto reply = make_message<MOSDOpReply>(
      m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
    reply->set_enoent_reply_versions(peering_state.get_info().last_update,
                                     peering_state.get_info().last_user_version);
    return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
  });
}

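// Map an incoming op to the object it targets (head for SNAPDIR reads) and
// the RW lock mode it requires: excl for ops that both read and write, write
// for write-only ops, read otherwise.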
std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
  const MOSDOp &m,
  const OpInfo &op_info)
{
  auto oid = m.get_snapid() == CEPH_SNAPDIR ?
    m.get_hobj().get_head() : m.get_hobj();

  RWState::State lock_type = RWState::RWNONE;
  if (op_info.rwordered() && op_info.may_read()) {
    lock_type = RWState::RWState::RWEXCL;
  } else if (op_info.rwordered()) {
    lock_type = RWState::RWState::RWWRITE;
  } else {
    ceph_assert(op_info.may_read());
    lock_type = RWState::RWState::RWREAD;
  }
  return std::make_pair(oid, lock_type);
}

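// Resolve a snap read against the head's SnapSet: reads newer than the last
// snapshot go to the head object, otherwise pick the clone covering the
// requested snapid, or nullopt if none does.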
std::optional<hobject_t> PG::resolve_oid(
  const SnapSet &ss,
  const hobject_t &oid)
{
  if (oid.snap > ss.seq) {
    return oid.get_head();
  } else {
    // which clone would it be?
    auto clone = std::upper_bound(
      begin(ss.clones), end(ss.clones),
      oid.snap);
    if (clone == end(ss.clones)) {
      // Doesn't exist, > last clone, < ss.seq
      return std::nullopt;
    }
    auto citer = ss.clone_snaps.find(*clone);
    // TODO: how do we want to handle this kind of logic error?
    ceph_assert(citer != ss.clone_snaps.end());

    if (std::find(
          citer->second.begin(),
          citer->second.end(),
          *clone) == citer->second.end()) {
      return std::nullopt;
    } else {
      auto soid = oid;
      soid.snap = *clone;
      return std::optional<hobject_t>(soid);
    }
  }
}

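// ObjectContext loading.  Both helpers return {obc, existed}: `existed` is
// true when the obc came out of the registry cache (or, for clones, when the
// snapid resolves to nothing and a null obc is returned); a freshly created
// obc comes back still holding the exclusive lock taken while loading its
// metadata.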
PG::load_obc_ertr::future<
  std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
{
  ceph_assert(!oid.is_head());
  using ObjectContextRef = crimson::osd::ObjectContextRef;
  auto coid = resolve_oid(head->get_ro_ss(), oid);
  if (!coid) {
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(ObjectContextRef(), true)
      );
  }
  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
  if (existed) {
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(obc, true)
      );
  } else {
    bool got = obc->maybe_get_excl();
    ceph_assert(got);
    return backend->load_metadata(*coid).safe_then(
      [oid, obc=std::move(obc), head](auto &&md) mutable {
        obc->set_clone_state(std::move(md->os), std::move(head));
        return load_obc_ertr::make_ready_future<
          std::pair<crimson::osd::ObjectContextRef, bool>>(
            std::make_pair(obc, false)
          );
      });
  }
}

PG::load_obc_ertr::future<
  std::pair<crimson::osd::ObjectContextRef, bool>>
PG::get_or_load_head_obc(hobject_t oid)
{
  ceph_assert(oid.is_head());
  auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
  if (existed) {
    logger().debug(
      "{}: found {} in cache",
      __func__,
      oid);
    return load_obc_ertr::make_ready_future<
      std::pair<crimson::osd::ObjectContextRef, bool>>(
        std::make_pair(std::move(obc), true)
      );
  } else {
    logger().debug(
      "{}: cache miss on {}",
      __func__,
      oid);
    bool got = obc->maybe_get_excl();
    ceph_assert(got);
    return backend->load_metadata(oid).safe_then(
      [oid, obc=std::move(obc)](auto md) ->
        load_obc_ertr::future<
          std::pair<crimson::osd::ObjectContextRef, bool>>
      {
        logger().debug(
          "{}: loaded obs {} for {}",
          __func__,
          md->os.oi,
          oid);
        if (!md->ss) {
          logger().error(
            "{}: oid {} missing snapset",
            __func__,
            oid);
          return crimson::ct_error::object_corrupted::make();
        }
        obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
        logger().debug(
          "{}: returning obc {} for {}",
          __func__,
          obc->obs.oi,
          obc->obs.oi.soid);
        return load_obc_ertr::make_ready_future<
          std::pair<crimson::osd::ObjectContextRef, bool>>(
            std::make_pair(obc, false)
          );
      });
  }
}

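// Take the lock an op needs: the head obc is always locked first (with the
// requested mode for head ops, RWREAD while resolving a clone); freshly
// loaded obcs arrive exclusively locked and are degraded to the requested
// mode, and the temporary head read lock is dropped once the clone obc is
// held.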
PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
PG::get_locked_obc(
  Operation *op, const hobject_t &oid, RWState::State type)
{
  return get_or_load_head_obc(oid.get_head()).safe_then(
    [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef> {
      auto &[head_obc, head_existed] = p;
      if (oid.is_head()) {
        if (head_existed) {
          return head_obc->get_lock_type(op, type).then([head_obc=head_obc] {
            ceph_assert(head_obc->loaded);
            return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
          });
        } else {
          head_obc->degrade_excl_to(type);
          return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
        }
      } else {
        return head_obc->get_lock_type(op, RWState::RWREAD).then(
          [this, head_obc=head_obc, oid] {
            ceph_assert(head_obc->loaded);
            return get_or_load_clone_obc(oid, head_obc);
          }).safe_then([head_obc=head_obc, op, oid, type](auto p) {
            auto &[obc, existed] = p;
            if (existed) {
              return load_obc_ertr::future<>(
                obc->get_lock_type(op, type)).safe_then([obc=obc] {
                ceph_assert(obc->loaded);
                return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
              });
            } else {
              obc->degrade_excl_to(type);
              return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
            }
          }).safe_then([head_obc=head_obc](auto obc) {
            head_obc->put_lock_type(RWState::RWREAD);
            return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
          });
      }
    });
}

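// Replica side of replication: decode and apply the primary's transaction,
// then ack with our last_complete_ondisk.  Acks arriving on the primary are
// simply forwarded to the backend.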
seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
{
  ceph::os::Transaction txn;
  auto encoded_txn = req->get_data().cbegin();
  decode(txn, encoded_txn);
  return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
    .then([req, lcod=peering_state.get_info().last_complete, this] {
      peering_state.update_last_complete_ondisk(lcod);
      const auto map_epoch = get_osdmap_epoch();
      auto reply = make_message<MOSDRepOpReply>(
        req.get(), pg_whoami, 0,
        map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
      reply->set_last_complete_ondisk(lcod);
      return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
    });
}

void PG::handle_rep_op_reply(crimson::net::Connection* conn,
                             const MOSDRepOpReply& m)
{
  backend->got_rep_op_reply(m);
}

}