]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/pg.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / crimson / osd / pg.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "pg.h"
5
6 #include <functional>
7
8 #include <boost/range/adaptor/filtered.hpp>
9 #include <boost/range/adaptor/map.hpp>
10 #include <boost/range/adaptor/transformed.hpp>
11 #include <boost/range/algorithm/copy.hpp>
12 #include <boost/range/algorithm/max_element.hpp>
13 #include <boost/range/numeric.hpp>
14 #include <fmt/format.h>
15 #include <fmt/ostream.h>
16
17 #include "messages/MOSDOp.h"
18 #include "messages/MOSDOpReply.h"
19 #include "messages/MOSDPGInfo.h"
20 #include "messages/MOSDPGLog.h"
21 #include "messages/MOSDPGNotify.h"
22 #include "messages/MOSDPGQuery.h"
23 #include "messages/MOSDRepOp.h"
24 #include "messages/MOSDRepOpReply.h"
25
26 #include "osd/OSDMap.h"
27
28 #include "os/Transaction.h"
29
30 #include "crimson/net/Connection.h"
31 #include "crimson/net/Messenger.h"
32 #include "crimson/os/cyanstore/cyan_store.h"
33 #include "crimson/os/futurized_collection.h"
34 #include "crimson/osd/exceptions.h"
35 #include "crimson/osd/pg_meta.h"
36 #include "crimson/osd/pg_backend.h"
37 #include "crimson/osd/ops_executer.h"
38 #include "crimson/osd/osd_operations/peering_event.h"
39
40 namespace {
41 seastar::logger& logger() {
42 return crimson::get_logger(ceph_subsys_osd);
43 }
44 }
45
46 namespace std::chrono {
47 std::ostream& operator<<(std::ostream& out, const signedspan& d)
48 {
49 auto s = std::chrono::duration_cast<std::chrono::seconds>(d).count();
50 auto ns = std::abs((d % 1s).count());
51 fmt::print(out, "{}{}s", s, ns ? fmt::format(".{:0>9}", ns) : "");
52 return out;
53 }
54 }
55
56 namespace crimson::osd {
57
58 using crimson::common::local_conf;
59
60 class RecoverablePredicate : public IsPGRecoverablePredicate {
61 public:
62 bool operator()(const set<pg_shard_t> &have) const override {
63 return !have.empty();
64 }
65 };
66
67 class ReadablePredicate: public IsPGReadablePredicate {
68 pg_shard_t whoami;
69 public:
70 explicit ReadablePredicate(pg_shard_t whoami) : whoami(whoami) {}
71 bool operator()(const set<pg_shard_t> &have) const override {
72 return have.count(whoami);
73 }
74 };
75
76 PG::PG(
77 spg_t pgid,
78 pg_shard_t pg_shard,
79 crimson::os::CollectionRef coll_ref,
80 pg_pool_t&& pool,
81 std::string&& name,
82 cached_map_t osdmap,
83 ShardServices &shard_services,
84 ec_profile_t profile)
85 : pgid{pgid},
86 pg_whoami{pg_shard},
87 coll_ref{coll_ref},
88 pgmeta_oid{pgid.make_pgmeta_oid()},
89 osdmap_gate("PG::osdmap_gate", std::nullopt),
90 shard_services{shard_services},
91 osdmap{osdmap},
92 backend(
93 PGBackend::create(
94 pgid.pgid,
95 pg_shard,
96 pool,
97 coll_ref,
98 shard_services,
99 profile)),
100 peering_state(
101 shard_services.get_cct(),
102 pg_shard,
103 pgid,
104 PGPool(
105 shard_services.get_cct(),
106 osdmap,
107 pgid.pool(),
108 pool,
109 name),
110 osdmap,
111 this,
112 this),
113 wait_for_active_blocker(this)
114 {
115 peering_state.set_backend_predicates(
116 new ReadablePredicate(pg_whoami),
117 new RecoverablePredicate());
118 osdmap_gate.got_map(osdmap->get_epoch());
119 }
120
121 PG::~PG() {}
122
123 bool PG::try_flush_or_schedule_async() {
124 (void)shard_services.get_store().do_transaction(
125 coll_ref,
126 ObjectStore::Transaction()).then(
127 [this, epoch=get_osdmap_epoch()]() {
128 return shard_services.start_operation<LocalPeeringEvent>(
129 this,
130 shard_services,
131 pg_whoami,
132 pgid,
133 epoch,
134 epoch,
135 PeeringState::IntervalFlush());
136 });
137 return false;
138 }
139
140 void PG::queue_check_readable(epoch_t last_peering_reset, ceph::timespan delay)
141 {
142 // handle the peering event in the background
143 check_readable_timer.cancel();
144 check_readable_timer.set_callback([last_peering_reset, this] {
145 shard_services.start_operation<LocalPeeringEvent>(
146 this,
147 shard_services,
148 pg_whoami,
149 pgid,
150 last_peering_reset,
151 last_peering_reset,
152 PeeringState::CheckReadable{});
153 });
154 check_readable_timer.arm(
155 std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
156 }
157
158 void PG::recheck_readable()
159 {
160 bool changed = false;
161 const auto mnow = shard_services.get_mnow();
162 if (peering_state.state_test(PG_STATE_WAIT)) {
163 auto prior_readable_until_ub = peering_state.get_prior_readable_until_ub();
164 if (mnow < prior_readable_until_ub) {
165 logger().info("{} will wait (mnow {} < prior_readable_until_ub {})",
166 __func__, mnow, prior_readable_until_ub);
167 } else {
168 logger().info("{} no longer wait (mnow {} >= prior_readable_until_ub {})",
169 __func__, mnow, prior_readable_until_ub);
170 peering_state.state_clear(PG_STATE_WAIT);
171 peering_state.clear_prior_readable_until_ub();
172 changed = true;
173 }
174 }
175 if (peering_state.state_test(PG_STATE_LAGGY)) {
176 auto readable_until = peering_state.get_readable_until();
177 if (readable_until == readable_until.zero()) {
178 logger().info("{} still laggy (mnow {}, readable_until zero)",
179 __func__, mnow);
180 } else if (mnow >= readable_until) {
181 logger().info("{} still laggy (mnow {} >= readable_until {})",
182 __func__, mnow, readable_until);
183 } else {
184 logger().info("{} no longer laggy (mnow {} < readable_until {})",
185 __func__, mnow, readable_until);
186 peering_state.state_clear(PG_STATE_LAGGY);
187 changed = true;
188 }
189 }
190 if (changed) {
191 publish_stats_to_osd();
192 if (!peering_state.state_test(PG_STATE_WAIT) &&
193 !peering_state.state_test(PG_STATE_LAGGY)) {
194 // TODO: requeue ops waiting for readable
195 }
196 }
197 }
198
199 unsigned PG::get_target_pg_log_entries() const
200 {
201 const unsigned num_pgs = shard_services.get_pg_num();
202 const unsigned target =
203 local_conf().get_val<uint64_t>("osd_target_pg_log_entries_per_osd");
204 const unsigned min_pg_log_entries =
205 local_conf().get_val<uint64_t>("osd_min_pg_log_entries");
206 if (num_pgs > 0 && target > 0) {
207 // target an even spread of our budgeted log entries across all
208 // PGs. note that while we only get to control the entry count
209 // for primary PGs, we'll normally be responsible for a mix of
210 // primary and replica PGs (for the same pool(s) even), so this
211 // will work out.
212 const unsigned max_pg_log_entries =
213 local_conf().get_val<uint64_t>("osd_max_pg_log_entries");
214 return std::clamp(target / num_pgs,
215 min_pg_log_entries,
216 max_pg_log_entries);
217 } else {
218 // fall back to a per-pg value.
219 return min_pg_log_entries;
220 }
221 }
222
223 void PG::on_activate(interval_set<snapid_t>)
224 {
225 projected_last_update = peering_state.get_info().last_update;
226 }
227
228 void PG::on_activate_complete()
229 {
230 wait_for_active_blocker.on_active();
231
232 if (peering_state.needs_recovery()) {
233 shard_services.start_operation<LocalPeeringEvent>(
234 this,
235 shard_services,
236 pg_whoami,
237 pgid,
238 get_osdmap_epoch(),
239 get_osdmap_epoch(),
240 PeeringState::DoRecovery{});
241 } else if (peering_state.needs_backfill()) {
242 shard_services.start_operation<LocalPeeringEvent>(
243 this,
244 shard_services,
245 pg_whoami,
246 pgid,
247 get_osdmap_epoch(),
248 get_osdmap_epoch(),
249 PeeringState::RequestBackfill{});
250 } else {
251 shard_services.start_operation<LocalPeeringEvent>(
252 this,
253 shard_services,
254 pg_whoami,
255 pgid,
256 get_osdmap_epoch(),
257 get_osdmap_epoch(),
258 PeeringState::AllReplicasRecovered{});
259 }
260 }
261
262 void PG::prepare_write(pg_info_t &info,
263 pg_info_t &last_written_info,
264 PastIntervals &past_intervals,
265 PGLog &pglog,
266 bool dirty_info,
267 bool dirty_big_info,
268 bool need_write_epoch,
269 ceph::os::Transaction &t)
270 {
271 std::map<string,bufferlist> km;
272 std::string key_to_remove;
273 if (dirty_big_info || dirty_info) {
274 int ret = prepare_info_keymap(
275 shard_services.get_cct(),
276 &km,
277 &key_to_remove,
278 get_osdmap_epoch(),
279 info,
280 last_written_info,
281 past_intervals,
282 dirty_big_info,
283 need_write_epoch,
284 true,
285 nullptr,
286 this);
287 ceph_assert(ret == 0);
288 }
289 pglog.write_log_and_missing(
290 t, &km, coll_ref->get_cid(), pgmeta_oid,
291 peering_state.get_pool().info.require_rollback());
292 if (!km.empty()) {
293 t.omap_setkeys(coll_ref->get_cid(), pgmeta_oid, km);
294 }
295 if (!key_to_remove.empty()) {
296 t.omap_rmkey(coll_ref->get_cid(), pgmeta_oid, key_to_remove);
297 }
298 }
299
300 void PG::do_delete_work(ceph::os::Transaction &t)
301 {
302 // TODO
303 shard_services.dec_pg_num();
304 }
305
306 void PG::log_state_enter(const char *state) {
307 logger().info("Entering state: {}", state);
308 }
309
310 void PG::log_state_exit(
311 const char *state_name, utime_t enter_time,
312 uint64_t events, utime_t event_dur) {
313 logger().info(
314 "Exiting state: {}, entered at {}, {} spent on {} events",
315 state_name,
316 enter_time,
317 event_dur,
318 events);
319 }
320
321 ceph::signedspan PG::get_mnow()
322 {
323 return shard_services.get_mnow();
324 }
325
326 HeartbeatStampsRef PG::get_hb_stamps(int peer)
327 {
328 return shard_services.get_hb_stamps(peer);
329 }
330
331 void PG::schedule_renew_lease(epoch_t last_peering_reset, ceph::timespan delay)
332 {
333 // handle the peering event in the background
334 renew_lease_timer.cancel();
335 renew_lease_timer.set_callback([last_peering_reset, this] {
336 shard_services.start_operation<LocalPeeringEvent>(
337 this,
338 shard_services,
339 pg_whoami,
340 pgid,
341 last_peering_reset,
342 last_peering_reset,
343 RenewLease{});
344 });
345 renew_lease_timer.arm(
346 std::chrono::duration_cast<seastar::lowres_clock::duration>(delay));
347 }
348
349
350 void PG::init(
351 int role,
352 const vector<int>& newup, int new_up_primary,
353 const vector<int>& newacting, int new_acting_primary,
354 const pg_history_t& history,
355 const PastIntervals& pi,
356 bool backfill,
357 ObjectStore::Transaction &t)
358 {
359 peering_state.init(
360 role, newup, new_up_primary, newacting,
361 new_acting_primary, history, pi, backfill, t);
362 }
363
364 seastar::future<> PG::read_state(crimson::os::FuturizedStore* store)
365 {
366 return PGMeta{store, pgid}.load(
367 ).then([this, store](pg_info_t pg_info, PastIntervals past_intervals) {
368 return peering_state.init_from_disk_state(
369 std::move(pg_info),
370 std::move(past_intervals),
371 [this, store] (PGLog &pglog) {
372 return pglog.read_log_and_missing_crimson(
373 *store,
374 coll_ref,
375 peering_state.get_info(),
376 pgmeta_oid);
377 });
378 }).then([this]() {
379 int primary, up_primary;
380 vector<int> acting, up;
381 peering_state.get_osdmap()->pg_to_up_acting_osds(
382 pgid.pgid, &up, &up_primary, &acting, &primary);
383 peering_state.init_primary_up_acting(
384 up,
385 acting,
386 up_primary,
387 primary);
388 int rr = OSDMap::calc_pg_role(pg_whoami, acting);
389 peering_state.set_role(rr);
390
391 epoch_t epoch = get_osdmap_epoch();
392 shard_services.start_operation<LocalPeeringEvent>(
393 this,
394 shard_services,
395 pg_whoami,
396 pgid,
397 epoch,
398 epoch,
399 PeeringState::Initialize());
400
401 return seastar::now();
402 });
403 }
404
405 void PG::do_peering_event(
406 const boost::statechart::event_base &evt,
407 PeeringCtx &rctx)
408 {
409 peering_state.handle_event(
410 evt,
411 &rctx);
412 peering_state.write_if_dirty(rctx.transaction);
413 }
414
415 void PG::do_peering_event(
416 PGPeeringEvent& evt, PeeringCtx &rctx)
417 {
418 if (!peering_state.pg_has_reset_since(evt.get_epoch_requested())) {
419 logger().debug("{} handling {}", __func__, evt.get_desc());
420 do_peering_event(evt.get_event(), rctx);
421 } else {
422 logger().debug("{} ignoring {} -- pg has reset", __func__, evt.get_desc());
423 }
424 }
425
426 void PG::handle_advance_map(
427 cached_map_t next_map, PeeringCtx &rctx)
428 {
429 vector<int> newup, newacting;
430 int up_primary, acting_primary;
431 next_map->pg_to_up_acting_osds(
432 pgid.pgid,
433 &newup, &up_primary,
434 &newacting, &acting_primary);
435 peering_state.advance_map(
436 next_map,
437 peering_state.get_osdmap(),
438 newup,
439 up_primary,
440 newacting,
441 acting_primary,
442 rctx);
443 osdmap_gate.got_map(next_map->get_epoch());
444 }
445
446 void PG::handle_activate_map(PeeringCtx &rctx)
447 {
448 peering_state.activate_map(rctx);
449 }
450
451 void PG::handle_initialize(PeeringCtx &rctx)
452 {
453 PeeringState::Initialize evt;
454 peering_state.handle_event(evt, &rctx);
455 }
456
457
458 void PG::print(ostream& out) const
459 {
460 out << peering_state << " ";
461 }
462
463
464 std::ostream& operator<<(std::ostream& os, const PG& pg)
465 {
466 os << " pg_epoch " << pg.get_osdmap_epoch() << " ";
467 pg.print(os);
468 return os;
469 }
470
471 void PG::WaitForActiveBlocker::dump_detail(Formatter *f) const
472 {
473 f->dump_stream("pgid") << pg->pgid;
474 }
475
476 void PG::WaitForActiveBlocker::on_active()
477 {
478 p.set_value();
479 p = {};
480 }
481
482 blocking_future<> PG::WaitForActiveBlocker::wait()
483 {
484 if (pg->peering_state.is_active()) {
485 return make_blocking_future(seastar::now());
486 } else {
487 return make_blocking_future(p.get_shared_future());
488 }
489 }
490
491 seastar::future<> PG::submit_transaction(ObjectContextRef&& obc,
492 ceph::os::Transaction&& txn,
493 const MOSDOp& req)
494 {
495 epoch_t map_epoch = get_osdmap_epoch();
496 eversion_t at_version{map_epoch, projected_last_update.version + 1};
497 return backend->mutate_object(peering_state.get_acting_recovery_backfill(),
498 std::move(obc),
499 std::move(txn),
500 req,
501 peering_state.get_last_peering_reset(),
502 map_epoch,
503 at_version).then([this](auto acked) {
504 for (const auto& peer : acked) {
505 peering_state.update_peer_last_complete_ondisk(
506 peer.shard, peer.last_complete_ondisk);
507 }
508 return seastar::now();
509 });
510 }
511
512 seastar::future<Ref<MOSDOpReply>> PG::do_osd_ops(
513 Ref<MOSDOp> m,
514 ObjectContextRef obc)
515 {
516 using osd_op_errorator = OpsExecuter::osd_op_errorator;
517 const auto oid = m->get_snapid() == CEPH_SNAPDIR ? m->get_hobj().get_head()
518 : m->get_hobj();
519 auto ox =
520 std::make_unique<OpsExecuter>(obc, *this/* as const& */, m);
521 return crimson::do_for_each(
522 m->ops, [obc, m, ox = ox.get()](OSDOp& osd_op) {
523 logger().debug(
524 "do_osd_ops: {} - object {} - handling op {}",
525 *m,
526 obc->obs.oi.soid,
527 ceph_osd_op_name(osd_op.op.op));
528 return ox->execute_osd_op(osd_op);
529 }).safe_then([this, obc, m, ox = ox.get()] {
530 logger().debug(
531 "do_osd_ops: {} - object {} all operations successful",
532 *m,
533 obc->obs.oi.soid);
534 return std::move(*ox).submit_changes(
535 [this, m] (auto&& txn, auto&& obc) -> osd_op_errorator::future<> {
536 // XXX: the entire lambda could be scheduled conditionally. ::if_then()?
537 if (txn.empty()) {
538 logger().debug(
539 "do_osd_ops: {} - object {} txn is empty, bypassing mutate",
540 *m,
541 obc->obs.oi.soid);
542 return osd_op_errorator::now();
543 } else {
544 logger().debug(
545 "do_osd_ops: {} - object {} submitting txn",
546 *m,
547 obc->obs.oi.soid);
548 return submit_transaction(std::move(obc), std::move(txn), *m);
549 }
550 });
551 }).safe_then([m, obc, this, ox_deleter = std::move(ox)] {
552 auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
553 0, false);
554 reply->add_flags(CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK);
555 logger().debug(
556 "do_osd_ops: {} - object {} sending reply",
557 *m,
558 obc->obs.oi.soid);
559 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
560 }, OpsExecuter::osd_op_errorator::all_same_way([=] (const std::error_code& e) {
561 assert(e.value() > 0);
562 logger().debug(
563 "do_osd_ops: {} - object {} got error code {}, {}",
564 *m,
565 obc->obs.oi.soid,
566 e.value(),
567 e.message());
568 auto reply = make_message<MOSDOpReply>(
569 m.get(), -e.value(), get_osdmap_epoch(), 0, false);
570 reply->set_enoent_reply_versions(peering_state.get_info().last_update,
571 peering_state.get_info().last_user_version);
572 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
573 })).handle_exception_type([=,&oid](const crimson::osd::error& e) {
574 // we need this handler because throwing path which aren't errorated yet.
575 logger().debug(
576 "do_osd_ops: {} - object {} got unhandled exception {} ({})",
577 *m,
578 obc->obs.oi.soid,
579 e.code(),
580 e.what());
581 auto reply = make_message<MOSDOpReply>(
582 m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
583 reply->set_enoent_reply_versions(peering_state.get_info().last_update,
584 peering_state.get_info().last_user_version);
585 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
586 });
587 }
588
589 seastar::future<Ref<MOSDOpReply>> PG::do_pg_ops(Ref<MOSDOp> m)
590 {
591 auto ox = std::make_unique<OpsExecuter>(*this/* as const& */, m);
592 return seastar::do_for_each(m->ops, [ox = ox.get()](OSDOp& osd_op) {
593 logger().debug("will be handling pg op {}", ceph_osd_op_name(osd_op.op.op));
594 return ox->execute_pg_op(osd_op);
595 }).then([m, this, ox = std::move(ox)] {
596 auto reply = make_message<MOSDOpReply>(m.get(), 0, get_osdmap_epoch(),
597 CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
598 false);
599 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
600 }).handle_exception_type([=](const crimson::osd::error& e) {
601 auto reply = make_message<MOSDOpReply>(
602 m.get(), -e.code().value(), get_osdmap_epoch(), 0, false);
603 reply->set_enoent_reply_versions(peering_state.get_info().last_update,
604 peering_state.get_info().last_user_version);
605 return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
606 });
607 }
608
609 std::pair<hobject_t, RWState::State> PG::get_oid_and_lock(
610 const MOSDOp &m,
611 const OpInfo &op_info)
612 {
613 auto oid = m.get_snapid() == CEPH_SNAPDIR ?
614 m.get_hobj().get_head() : m.get_hobj();
615
616 RWState::State lock_type = RWState::RWNONE;
617 if (op_info.rwordered() && op_info.may_read()) {
618 lock_type = RWState::RWState::RWEXCL;
619 } else if (op_info.rwordered()) {
620 lock_type = RWState::RWState::RWWRITE;
621 } else {
622 ceph_assert(op_info.may_read());
623 lock_type = RWState::RWState::RWREAD;
624 }
625 return std::make_pair(oid, lock_type);
626 }
627
628 std::optional<hobject_t> PG::resolve_oid(
629 const SnapSet &ss,
630 const hobject_t &oid)
631 {
632 if (oid.snap > ss.seq) {
633 return oid.get_head();
634 } else {
635 // which clone would it be?
636 auto clone = std::upper_bound(
637 begin(ss.clones), end(ss.clones),
638 oid.snap);
639 if (clone == end(ss.clones)) {
640 // Doesn't exist, > last clone, < ss.seq
641 return std::nullopt;
642 }
643 auto citer = ss.clone_snaps.find(*clone);
644 // TODO: how do we want to handle this kind of logic error?
645 ceph_assert(citer != ss.clone_snaps.end());
646
647 if (std::find(
648 citer->second.begin(),
649 citer->second.end(),
650 *clone) == citer->second.end()) {
651 return std::nullopt;
652 } else {
653 auto soid = oid;
654 soid.snap = *clone;
655 return std::optional<hobject_t>(soid);
656 }
657 }
658 }
659
660 PG::load_obc_ertr::future<
661 std::pair<crimson::osd::ObjectContextRef, bool>>
662 PG::get_or_load_clone_obc(hobject_t oid, ObjectContextRef head)
663 {
664 ceph_assert(!oid.is_head());
665 using ObjectContextRef = crimson::osd::ObjectContextRef;
666 auto coid = resolve_oid(head->get_ro_ss(), oid);
667 if (!coid) {
668 return load_obc_ertr::make_ready_future<
669 std::pair<crimson::osd::ObjectContextRef, bool>>(
670 std::make_pair(ObjectContextRef(), true)
671 );
672 }
673 auto [obc, existed] = shard_services.obc_registry.get_cached_obc(*coid);
674 if (existed) {
675 return load_obc_ertr::make_ready_future<
676 std::pair<crimson::osd::ObjectContextRef, bool>>(
677 std::make_pair(obc, true)
678 );
679 } else {
680 bool got = obc->maybe_get_excl();
681 ceph_assert(got);
682 return backend->load_metadata(*coid).safe_then(
683 [oid, obc=std::move(obc), head](auto &&md) mutable {
684 obc->set_clone_state(std::move(md->os), std::move(head));
685 return load_obc_ertr::make_ready_future<
686 std::pair<crimson::osd::ObjectContextRef, bool>>(
687 std::make_pair(obc, false)
688 );
689 });
690 }
691 }
692
693 PG::load_obc_ertr::future<
694 std::pair<crimson::osd::ObjectContextRef, bool>>
695 PG::get_or_load_head_obc(hobject_t oid)
696 {
697 ceph_assert(oid.is_head());
698 auto [obc, existed] = shard_services.obc_registry.get_cached_obc(oid);
699 if (existed) {
700 logger().debug(
701 "{}: found {} in cache",
702 __func__,
703 oid);
704 return load_obc_ertr::make_ready_future<
705 std::pair<crimson::osd::ObjectContextRef, bool>>(
706 std::make_pair(std::move(obc), true)
707 );
708 } else {
709 logger().debug(
710 "{}: cache miss on {}",
711 __func__,
712 oid);
713 bool got = obc->maybe_get_excl();
714 ceph_assert(got);
715 return backend->load_metadata(oid).safe_then(
716 [oid, obc=std::move(obc)](auto md) ->
717 load_obc_ertr::future<
718 std::pair<crimson::osd::ObjectContextRef, bool>>
719 {
720 logger().debug(
721 "{}: loaded obs {} for {}",
722 __func__,
723 md->os.oi,
724 oid);
725 if (!md->ss) {
726 logger().error(
727 "{}: oid {} missing snapset",
728 __func__,
729 oid);
730 return crimson::ct_error::object_corrupted::make();
731 }
732 obc->set_head_state(std::move(md->os), std::move(*(md->ss)));
733 logger().debug(
734 "{}: returning obc {} for {}",
735 __func__,
736 obc->obs.oi,
737 obc->obs.oi.soid);
738 return load_obc_ertr::make_ready_future<
739 std::pair<crimson::osd::ObjectContextRef, bool>>(
740 std::make_pair(obc, false)
741 );
742 });
743 }
744 }
745
746 PG::load_obc_ertr::future<crimson::osd::ObjectContextRef>
747 PG::get_locked_obc(
748 Operation *op, const hobject_t &oid, RWState::State type)
749 {
750 return get_or_load_head_obc(oid.get_head()).safe_then(
751 [this, op, oid, type](auto p) -> load_obc_ertr::future<ObjectContextRef>{
752 auto &[head_obc, head_existed] = p;
753 if (oid.is_head()) {
754 if (head_existed) {
755 return head_obc->get_lock_type(op, type).then([head_obc=head_obc] {
756 ceph_assert(head_obc->loaded);
757 return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
758 });
759 } else {
760 head_obc->degrade_excl_to(type);
761 return load_obc_ertr::make_ready_future<ObjectContextRef>(head_obc);
762 }
763 } else {
764 return head_obc->get_lock_type(op, RWState::RWREAD).then(
765 [this, head_obc=head_obc, oid] {
766 ceph_assert(head_obc->loaded);
767 return get_or_load_clone_obc(oid, head_obc);
768 }).safe_then([head_obc=head_obc, op, oid, type](auto p) {
769 auto &[obc, existed] = p;
770 if (existed) {
771 return load_obc_ertr::future<>(
772 obc->get_lock_type(op, type)).safe_then([obc=obc] {
773 ceph_assert(obc->loaded);
774 return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
775 });
776 } else {
777 obc->degrade_excl_to(type);
778 return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
779 }
780 }).safe_then([head_obc=head_obc](auto obc) {
781 head_obc->put_lock_type(RWState::RWREAD);
782 return load_obc_ertr::make_ready_future<ObjectContextRef>(obc);
783 });
784 }
785 });
786 }
787
788 seastar::future<> PG::handle_rep_op(Ref<MOSDRepOp> req)
789 {
790 ceph::os::Transaction txn;
791 auto encoded_txn = req->get_data().cbegin();
792 decode(txn, encoded_txn);
793 return shard_services.get_store().do_transaction(coll_ref, std::move(txn))
794 .then([req, lcod=peering_state.get_info().last_complete, this] {
795 peering_state.update_last_complete_ondisk(lcod);
796 const auto map_epoch = get_osdmap_epoch();
797 auto reply = make_message<MOSDRepOpReply>(
798 req.get(), pg_whoami, 0,
799 map_epoch, req->get_min_epoch(), CEPH_OSD_FLAG_ONDISK);
800 reply->set_last_complete_ondisk(lcod);
801 return shard_services.send_to_osd(req->from.osd, reply, map_epoch);
802 });
803 }
804
805 void PG::handle_rep_op_reply(crimson::net::Connection* conn,
806 const MOSDRepOpReply& m)
807 {
808 backend->got_rep_op_reply(m);
809 }
810
811 }