// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/smart_ptr/make_local_shared.hpp>

#include "crimson/osd/shard_services.h"

#include "messages/MOSDAlive.h"
#include "messages/MOSDMap.h"
#include "messages/MOSDPGCreated.h"
#include "messages/MOSDPGTemp.h"

#include "osd/osd_perf_counters.h"
#include "osd/PeeringState.h"
#include "crimson/common/config_proxy.h"
#include "crimson/mgr/client.h"
#include "crimson/mon/MonClient.h"
#include "crimson/net/Messenger.h"
#include "crimson/net/Connection.h"
#include "crimson/os/cyanstore/cyan_store.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/pg_meta.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

using std::vector;

namespace crimson::osd {

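// Per-reactor-shard slice of OSD state: owns this shard's PG map, op
// registry, and perf counters. next_tid seeds the top 8 bits of the tid
// with the shard id, so tids issued by different shards never collide.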
PerShardState::PerShardState(
  int whoami,
  ceph::mono_time startup_time,
  PerfCounters *perf,
  PerfCounters *recoverystate_perf,
  crimson::os::FuturizedStore &store)
  : whoami(whoami),
    store(store.get_sharded_store()),
    perf(perf), recoverystate_perf(recoverystate_perf),
    throttler(crimson::common::local_conf()),
    next_tid(
      static_cast<ceph_tid_t>(seastar::this_shard_id()) <<
      (std::numeric_limits<ceph_tid_t>::digits - 8)),
    startup_time(startup_time)
{}

seastar::future<> PerShardState::dump_ops_in_flight(Formatter *f) const
{
  registry.for_each_op([f](const auto &op) {
    op.dump(f);
  });
  return seastar::now();
}

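// Stop every PG owned by this shard in parallel; used during shutdown.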
seastar::future<> PerShardState::stop_pgs()
{
  assert_core();
  return seastar::parallel_for_each(
    pg_map.get_pgs(),
    [](auto& p) {
      return p.second->stop();
    });
}

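// Collect stats for every PG this shard is primary for, stamping each
// entry with the current map epoch before it is reported upward.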
std::map<pg_t, pg_stat_t> PerShardState::get_pg_stats() const
{
  assert_core();
  std::map<pg_t, pg_stat_t> ret;
  for (auto [pgid, pg] : pg_map.get_pgs()) {
    if (pg->is_primary()) {
      auto stats = pg->get_stats();
      // todo: update reported_epoch,reported_seq,last_fresh
      stats.reported_epoch = osdmap->get_epoch();
      ret.emplace(pgid.pgid, std::move(stats));
    }
  }
  return ret;
}

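// Fan a new map epoch out to all local PGs by queueing a PGAdvanceMap
// operation per PG; each PG then advances itself through the epochs.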
seastar::future<> PerShardState::broadcast_map_to_pgs(
  ShardServices &shard_services,
  epoch_t epoch)
{
  assert_core();
  auto &pgs = pg_map.get_pgs();
  return seastar::parallel_for_each(
    pgs.begin(), pgs.end(),
    [=, &shard_services](auto& pg) {
      return shard_services.start_operation<PGAdvanceMap>(
        shard_services,
        pg.second, epoch,
        PeeringCtx{}, false).second;
    });
}

Ref<PG> PerShardState::get_pg(spg_t pgid)
{
  assert_core();
  return pg_map.get_pg(pgid);
}

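// Return the heartbeat timestamps shared with a peer OSD, creating the
// entry on first use so all users of a peer see the same object.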
HeartbeatStampsRef PerShardState::get_hb_stamps(int peer)
{
  assert_core();
  auto [stamps, added] = heartbeat_stamps.try_emplace(peer);
  if (added) {
    stamps->second = ceph::make_ref<HeartbeatStamps>(peer);
  }
  return stamps->second;
}

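// Cluster-facing (non-sharded) OSD state. The ctor registers as a config
// observer and installs the perf counters; the reservers bound how many
// backfill, recovery, and snap-trim activities may run concurrently.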
OSDSingletonState::OSDSingletonState(
  int whoami,
  crimson::net::Messenger &cluster_msgr,
  crimson::net::Messenger &public_msgr,
  crimson::mon::Client &monc,
  crimson::mgr::Client &mgrc)
  : whoami(whoami),
    osdmap_gate("OSDSingletonState::osdmap_gate"),
    cluster_msgr(cluster_msgr),
    public_msgr(public_msgr),
    monc(monc),
    mgrc(mgrc),
    local_reserver(
      &cct,
      &finisher,
      crimson::common::local_conf()->osd_max_backfills,
      crimson::common::local_conf()->osd_min_recovery_priority),
    remote_reserver(
      &cct,
      &finisher,
      crimson::common::local_conf()->osd_max_backfills,
      crimson::common::local_conf()->osd_min_recovery_priority),
    snap_reserver(
      &cct,
      &finisher,
      crimson::common::local_conf()->osd_max_trimming_pgs)
{
  crimson::common::local_conf().add_observer(this);
  osdmaps[0] = boost::make_local_shared<OSDMap>();

  perf = build_osd_logger(&cct);
  cct.get_perfcounters_collection()->add(perf);

  recoverystate_perf = build_recoverystate_perf(&cct);
  cct.get_perfcounters_collection()->add(recoverystate_perf);
}

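// Send a message to a peer OSD, but only if the peer is still up and has
// not restarted since from_epoch; otherwise the message is dropped.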
seastar::future<> OSDSingletonState::send_to_osd(
  int peer, MessageURef m, epoch_t from_epoch)
{
  if (osdmap->is_down(peer)) {
    logger().info("{}: osd.{} is_down", __func__, peer);
    return seastar::now();
  } else if (osdmap->get_info(peer).up_from > from_epoch) {
    logger().info("{}: osd.{} {} > {}", __func__, peer,
                  osdmap->get_info(peer).up_from, from_epoch);
    return seastar::now();
  } else {
    auto conn = cluster_msgr.connect(
      osdmap->get_cluster_addrs(peer).front(), CEPH_ENTITY_TYPE_OSD);
    return conn->send(std::move(m));
  }
}

seastar::future<> OSDSingletonState::osdmap_subscribe(
  version_t epoch, bool force_request)
{
  logger().info("{}({})", __func__, epoch);
  if (monc.sub_want_increment("osdmap", epoch, CEPH_SUBSCRIBE_ONETIME) ||
      force_request) {
    return monc.renew_subs();
  } else {
    return seastar::now();
  }
}

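// pg_temp bookkeeping: pg_temp_wanted holds mappings not yet sent to the
// monitor, pg_temp_pending holds mappings sent but not yet reflected in a
// new map.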
void OSDSingletonState::queue_want_pg_temp(
  pg_t pgid,
  const vector<int>& want,
  bool forced)
{
  auto p = pg_temp_pending.find(pgid);
  if (p == pg_temp_pending.end() ||
      p->second.acting != want ||
      forced) {
    pg_temp_wanted[pgid] = {want, forced};
  }
}

void OSDSingletonState::remove_want_pg_temp(pg_t pgid)
{
  pg_temp_wanted.erase(pgid);
  pg_temp_pending.erase(pgid);
}

void OSDSingletonState::requeue_pg_temp()
{
  unsigned old_wanted = pg_temp_wanted.size();
  unsigned old_pending = pg_temp_pending.size();
  pg_temp_wanted.merge(pg_temp_pending);
  pg_temp_pending.clear();
  logger().debug(
    "{}: {} + {} -> {}",
    __func__,
    old_wanted,
    old_pending,
    pg_temp_wanted.size());
}

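// Flush wanted pg_temp mappings to the monitor. Forced and unforced
// entries are batched into at most two MOSDPGTemp messages (one per
// flavor), and everything sent moves from "wanted" to "pending".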
seastar::future<> OSDSingletonState::send_pg_temp()
{
  if (pg_temp_wanted.empty())
    return seastar::now();
  logger().debug("{}: {}", __func__, pg_temp_wanted);
  MURef<MOSDPGTemp> ms[2] = {nullptr, nullptr};
  for (auto& [pgid, pg_temp] : pg_temp_wanted) {
    auto& m = ms[pg_temp.forced];
    if (!m) {
      m = crimson::make_message<MOSDPGTemp>(osdmap->get_epoch());
      m->forced = pg_temp.forced;
    }
    m->pg_temp.emplace(pgid, pg_temp.acting);
  }
  pg_temp_pending.merge(pg_temp_wanted);
  pg_temp_wanted.clear();
  return seastar::parallel_for_each(std::begin(ms), std::end(ms),
    [this](auto& m) {
      if (m) {
        return monc.send_message(std::move(m));
      } else {
        return seastar::now();
      }
    });
}

std::ostream& operator<<(
  std::ostream& out,
  const OSDSingletonState::pg_temp_t& pg_temp)
{
  out << pg_temp.acting;
  if (pg_temp.forced) {
    out << " (forced)";
  }
  return out;
}

seastar::future<> OSDSingletonState::send_pg_created(pg_t pgid)
{
  logger().debug(__func__);
  auto o = get_osdmap();
  ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
  pg_created.insert(pgid);
  return monc.send_message(crimson::make_message<MOSDPGCreated>(pgid));
}

seastar::future<> OSDSingletonState::send_pg_created()
{
  logger().debug(__func__);
  auto o = get_osdmap();
  ceph_assert(o->require_osd_release >= ceph_release_t::luminous);
  return seastar::parallel_for_each(pg_created,
    [this](auto &pgid) {
      return monc.send_message(crimson::make_message<MOSDPGCreated>(pgid));
    });
}

void OSDSingletonState::prune_pg_created()
{
  logger().debug(__func__);
  auto o = get_osdmap();
  auto i = pg_created.begin();
  while (i != pg_created.end()) {
    auto p = o->get_pg_pool(i->pool());
    if (!p || !p->has_flag(pg_pool_t::FLAG_CREATING)) {
      logger().debug("{} pruning {}", __func__, *i);
      i = pg_created.erase(i);
    } else {
      logger().debug("{} keeping {}", __func__, *i);
      ++i;
    }
  }
}

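// Ask the monitor to bump our up_thru to at least `want`. up_thru_wanted
// deduplicates requests: only a strictly newer epoch triggers an MOSDAlive.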
seastar::future<> OSDSingletonState::send_alive(const epoch_t want)
{
  logger().info(
    "{} want={} up_thru_wanted={}",
    __func__,
    want,
    up_thru_wanted);

  if (want > up_thru_wanted) {
    up_thru_wanted = want;
  } else {
    logger().debug("{} want={} <= up_thru_wanted={}; skipping",
                   __func__, want, up_thru_wanted);
    return seastar::now();
  }
  if (!osdmap->exists(whoami)) {
    logger().warn("{} DNE", __func__);
    return seastar::now();
  } else if (const epoch_t up_thru = osdmap->get_up_thru(whoami);
             up_thru_wanted > up_thru) {
    logger().debug("{} up_thru_wanted={} up_thru={}", __func__, want, up_thru);
    return monc.send_message(
      crimson::make_message<MOSDAlive>(osdmap->get_epoch(), want));
  } else {
    logger().debug("{} {} <= {}", __func__, want, osdmap->get_up_thru(whoami));
    return seastar::now();
  }
}

const char** OSDSingletonState::get_tracked_conf_keys() const
{
  static const char* KEYS[] = {
    "osd_max_backfills",
    "osd_min_recovery_priority",
    "osd_max_trimming_pgs",
    nullptr
  };
  return KEYS;
}

void OSDSingletonState::handle_conf_change(
  const ConfigProxy& conf,
  const std::set<std::string> &changed)
{
  if (changed.count("osd_max_backfills")) {
    local_reserver.set_max(conf->osd_max_backfills);
    remote_reserver.set_max(conf->osd_max_backfills);
  }
  if (changed.count("osd_min_recovery_priority")) {
    local_reserver.set_min_priority(conf->osd_min_recovery_priority);
    remote_reserver.set_min_priority(conf->osd_min_recovery_priority);
  }
  if (changed.count("osd_max_trimming_pgs")) {
    snap_reserver.set_max(conf->osd_max_trimming_pgs);
  }
}

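// OSDMap access: decoded maps are cached in `osdmaps` and raw bufferlists
// in `map_bl_cache`; on a miss we fall back to the meta collection on disk.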
seastar::future<OSDSingletonState::local_cached_map_t>
OSDSingletonState::get_local_map(epoch_t e)
{
  // TODO: use LRU cache for managing osdmap, fallback to disk if we have to
  if (auto found = osdmaps.find(e); found) {
    return seastar::make_ready_future<local_cached_map_t>(std::move(found));
  } else {
    return load_map(e).then([e, this](std::unique_ptr<OSDMap> osdmap) {
      return seastar::make_ready_future<local_cached_map_t>(
        osdmaps.insert(e, std::move(osdmap)));
    });
  }
}

void OSDSingletonState::store_map_bl(
  ceph::os::Transaction& t,
  epoch_t e, bufferlist&& bl)
{
  meta_coll->store_map(t, e, bl);
  map_bl_cache.insert(e, std::move(bl));
}

seastar::future<bufferlist> OSDSingletonState::load_map_bl(
  epoch_t e)
{
  if (std::optional<bufferlist> found = map_bl_cache.find(e); found) {
    return seastar::make_ready_future<bufferlist>(*found);
  } else {
    return meta_coll->load_map(e);
  }
}

seastar::future<std::map<epoch_t, bufferlist>> OSDSingletonState::load_map_bls(
  epoch_t first,
  epoch_t last)
{
  logger().debug("{} loading maps [{},{}]",
                 __func__, first, last);
  ceph_assert(first <= last);
  return seastar::map_reduce(boost::make_counting_iterator<epoch_t>(first),
                             boost::make_counting_iterator<epoch_t>(last + 1),
                             [this](epoch_t e) {
    return load_map_bl(e).then([e](auto&& bl) {
      return seastar::make_ready_future<std::pair<epoch_t, bufferlist>>(
        std::make_pair(e, std::move(bl)));
    });
  },
  std::map<epoch_t, bufferlist>{},
  [](auto&& bls, auto&& epoch_bl) {
    bls.emplace(std::move(epoch_bl));
    return std::move(bls);
  });
}

seastar::future<std::unique_ptr<OSDMap>> OSDSingletonState::load_map(epoch_t e)
{
  auto o = std::make_unique<OSDMap>();
  if (e > 0) {
    return load_map_bl(e).then([o=std::move(o)](bufferlist bl) mutable {
      o->decode(bl);
      return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
    });
  } else {
    return seastar::make_ready_future<std::unique_ptr<OSDMap>>(std::move(o));
  }
}

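// Persist every map carried by an MOSDMap, starting at `start`. Full maps
// are stored as received; incremental maps are applied on top of the
// previous epoch's full map, and the result is re-encoded and stored.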
seastar::future<> OSDSingletonState::store_maps(ceph::os::Transaction& t,
                                                epoch_t start, Ref<MOSDMap> m)
{
  return seastar::do_for_each(
    boost::make_counting_iterator(start),
    boost::make_counting_iterator(m->get_last() + 1),
    [&t, m, this](epoch_t e) {
      if (auto p = m->maps.find(e); p != m->maps.end()) {
        auto o = std::make_unique<OSDMap>();
        o->decode(p->second);
        logger().info("store_maps osdmap.{}", e);
        store_map_bl(t, e, std::move(p->second));
        osdmaps.insert(e, std::move(o));
        return seastar::now();
      } else if (auto p = m->incremental_maps.find(e);
                 p != m->incremental_maps.end()) {
        return load_map(e - 1).then([e, bl=p->second, &t, this](auto o) {
          OSDMap::Incremental inc;
          auto i = bl.cbegin();
          inc.decode(i);
          o->apply_incremental(inc);
          bufferlist fbl;
          o->encode(fbl, inc.encode_features | CEPH_FEATURE_RESERVED);
          store_map_bl(t, e, std::move(fbl));
          osdmaps.insert(e, std::move(o));
          return seastar::now();
        });
      } else {
        logger().error("MOSDMap lied about what maps it had?");
        return seastar::now();
      }
    });
}

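// Construct (but do not initialize) a PG object. Pool metadata comes from
// the creating map if the pool still exists, otherwise from the final
// pg_pool_t persisted on disk; the collection is created or merely opened
// depending on do_create.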
seastar::future<Ref<PG>> ShardServices::make_pg(
  OSDMapService::cached_map_t create_map,
  spg_t pgid,
  bool do_create)
{
  using ec_profile_t = std::map<std::string, std::string>;
  auto get_pool_info_for_pg = [create_map, pgid, this] {
    if (create_map->have_pg_pool(pgid.pool())) {
      pg_pool_t pi = *create_map->get_pg_pool(pgid.pool());
      std::string name = create_map->get_pool_name(pgid.pool());
      ec_profile_t ec_profile;
      if (pi.is_erasure()) {
        ec_profile = create_map->get_erasure_code_profile(
          pi.erasure_code_profile);
      }
      return seastar::make_ready_future<
        std::tuple<pg_pool_t, std::string, ec_profile_t>
      >(std::make_tuple(
          std::move(pi),
          std::move(name),
          std::move(ec_profile)));
    } else {
      // pool was deleted; grab final pg_pool_t off disk.
      return get_pool_info(pgid.pool());
    }
  };
  auto get_collection = [pgid, do_create, this] {
    const coll_t cid{pgid};
    if (do_create) {
      return get_store().create_new_collection(cid);
    } else {
      return get_store().open_collection(cid);
    }
  };
  return seastar::when_all(
    std::move(get_pool_info_for_pg),
    std::move(get_collection)
  ).then([pgid, create_map, this](auto &&ret) {
    auto [pool, name, ec_profile] = std::move(std::get<0>(ret).get0());
    auto coll = std::move(std::get<1>(ret).get0());
    return seastar::make_ready_future<Ref<PG>>(
      new PG{
        pgid,
        pg_shard_t{local_state.whoami, pgid.shard},
        std::move(coll),
        std::move(pool),
        std::move(name),
        create_map,
        *this,
        ec_profile});
  });
}

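// Build and initialize a brand-new PG described by a PGCreateInfo: validate
// the create against the current map (a mon-requested pool must still
// exist, carry the crimson flag, and still be in CREATING), create the
// collection, write the initial on-disk state, and advance the new PG to
// the current epoch.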
seastar::future<Ref<PG>> ShardServices::handle_pg_create_info(
  std::unique_ptr<PGCreateInfo> info) {
  return seastar::do_with(
    std::move(info),
    [this](auto &info)
    -> seastar::future<Ref<PG>> {
      return get_map(info->epoch).then(
        [&info, this](cached_map_t startmap)
        -> seastar::future<std::tuple<Ref<PG>, cached_map_t>> {
          const spg_t &pgid = info->pgid;
          if (info->by_mon) {
            int64_t pool_id = pgid.pgid.pool();
            const pg_pool_t *pool = get_map()->get_pg_pool(pool_id);
            if (!pool) {
              logger().debug(
                "{} ignoring pgid {}, pool dne",
                __func__,
                pgid);
              local_state.pg_map.pg_creation_canceled(pgid);
              return seastar::make_ready_future<
                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
              >(std::make_tuple(Ref<PG>(), startmap));
            } else if (!pool->is_crimson()) {
              logger().debug(
                "{} ignoring pgid {}, pool lacks crimson flag",
                __func__,
                pgid);
              local_state.pg_map.pg_creation_canceled(pgid);
              return seastar::make_ready_future<
                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
              >(std::make_tuple(Ref<PG>(), startmap));
            }
            ceph_assert(get_map()->require_osd_release >=
                        ceph_release_t::octopus);
            if (!pool->has_flag(pg_pool_t::FLAG_CREATING)) {
              // this ensures we do not process old creating messages after the
              // pool's initial pgs have been created (and pg are subsequently
              // allowed to split or merge).
              logger().debug(
                "{} dropping {} create, pool does not have CREATING flag set",
                __func__,
                pgid);
              local_state.pg_map.pg_creation_canceled(pgid);
              return seastar::make_ready_future<
                std::tuple<Ref<PG>, OSDMapService::cached_map_t>
              >(std::make_tuple(Ref<PG>(), startmap));
            }
          }
          return make_pg(
            startmap, pgid, true
          ).then([startmap=std::move(startmap)](auto pg) mutable {
            return seastar::make_ready_future<
              std::tuple<Ref<PG>, OSDMapService::cached_map_t>
            >(std::make_tuple(std::move(pg), std::move(startmap)));
          });
        }).then([this, &info](auto &&ret)
                -> seastar::future<Ref<PG>> {
          auto [pg, startmap] = std::move(ret);
          if (!pg)
            return seastar::make_ready_future<Ref<PG>>(Ref<PG>());
          const pg_pool_t* pp = startmap->get_pg_pool(info->pgid.pool());

          int up_primary, acting_primary;
          vector<int> up, acting;
          startmap->pg_to_up_acting_osds(
            info->pgid.pgid, &up, &up_primary, &acting, &acting_primary);

          int role = startmap->calc_pg_role(
            pg_shard_t(local_state.whoami, info->pgid.shard),
            acting);

          PeeringCtx rctx;
          create_pg_collection(
            rctx.transaction,
            info->pgid,
            info->pgid.get_split_bits(pp->get_pg_num()));
          init_pg_ondisk(
            rctx.transaction,
            info->pgid,
            pp);

          pg->init(
            role,
            up,
            up_primary,
            acting,
            acting_primary,
            info->history,
            info->past_intervals,
            rctx.transaction);

          return start_operation<PGAdvanceMap>(
            *this, pg, get_map()->get_epoch(), std::move(rctx), true
          ).second.then([pg=pg] {
            return seastar::make_ready_future<Ref<PG>>(pg);
          });
        });
    });
}

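// Return the PG if it already exists; if a create info is supplied and
// creation is not already in flight, kick off asynchronous creation and
// return a future that resolves once the PG is instantiated.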
ShardServices::get_or_create_pg_ret
ShardServices::get_or_create_pg(
  PGMap::PGCreationBlockingEvent::TriggerI&& trigger,
  spg_t pgid,
  epoch_t epoch,
  std::unique_ptr<PGCreateInfo> info)
{
  if (info) {
    auto [fut, creating] = local_state.pg_map.wait_for_pg(
      std::move(trigger), pgid);
    if (!creating) {
      local_state.pg_map.set_creating(pgid);
      (void)handle_pg_create_info(
        std::move(info));
    }
    return std::move(fut);
  } else {
    return get_or_create_pg_ret(
      get_or_create_pg_ertr::ready_future_marker{},
      local_state.pg_map.get_pg(pgid));
  }
}

ShardServices::wait_for_pg_ret
ShardServices::wait_for_pg(
  PGMap::PGCreationBlockingEvent::TriggerI&& trigger, spg_t pgid)
{
  return local_state.pg_map.wait_for_pg(std::move(trigger), pgid).first;
}

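// Load an existing PG from disk: read its epoch from PGMeta, fetch the
// matching map, construct the PG object, and replay its persisted state.
// Failing to load a PG that should exist is fatal.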
seastar::future<Ref<PG>> ShardServices::load_pg(spg_t pgid)
{
  logger().debug("{}: {}", __func__, pgid);

  return seastar::do_with(PGMeta(get_store(), pgid), [](auto& pg_meta) {
    return pg_meta.get_epoch();
  }).then([this](epoch_t e) {
    return get_map(e);
  }).then([pgid, this](auto&& create_map) {
    return make_pg(std::move(create_map), pgid, false);
  }).then([this](Ref<PG> pg) {
    return pg->read_state(&get_store()).then([pg] {
      return seastar::make_ready_future<Ref<PG>>(std::move(pg));
    });
  }).handle_exception([pgid](auto ep) {
    logger().info("pg {} saw exception on load {}", pgid, ep);
    ceph_abort_msg("Could not load pg");
    return seastar::make_exception_future<Ref<PG>>(ep);
  });
}

seastar::future<> ShardServices::dispatch_context_transaction(
  crimson::os::CollectionRef col, PeeringCtx &ctx) {
  if (ctx.transaction.empty()) {
    logger().debug("ShardServices::dispatch_context_transaction: empty transaction");
    return seastar::now();
  }

  logger().debug("ShardServices::dispatch_context_transaction: do_transaction ...");
  auto ret = get_store().do_transaction(
    col,
    std::move(ctx.transaction));
  ctx.reset_transaction();
  return ret;
}

seastar::future<> ShardServices::dispatch_context_messages(
  BufferedRecoveryMessages &&ctx)
{
  auto ret = seastar::parallel_for_each(std::move(ctx.message_map),
    [this](auto& osd_messages) {
      auto& [peer, messages] = osd_messages;
      logger().debug("dispatch_context_messages sending messages to {}", peer);
      return seastar::parallel_for_each(
        std::move(messages), [=, peer=peer, this](auto& m) {
          return send_to_osd(peer, std::move(m), local_state.osdmap->get_epoch());
        });
    });
  ctx.message_map.clear();
  return ret;
}

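// Dispatch a full PeeringCtx: send any buffered messages and submit the
// accumulated transaction (if any) in parallel.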
seastar::future<> ShardServices::dispatch_context(
  crimson::os::CollectionRef col,
  PeeringCtx &&ctx)
{
  ceph_assert(col || ctx.transaction.empty());
  return seastar::when_all_succeed(
    dispatch_context_messages(
      BufferedRecoveryMessages{ctx}),
    col ? dispatch_context_transaction(col, ctx) : seastar::now()
  ).then_unpack([] {
    return seastar::now();
  });
}

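// Bring a peer up to date on OSDMaps. If we still have every epoch from
// `first`, send the full run [first, newest_map]; otherwise our history
// has been trimmed, so just send the current full map.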
seastar::future<> OSDSingletonState::send_incremental_map(
  crimson::net::Connection &conn,
  epoch_t first)
{
  if (first >= superblock.oldest_map) {
    return load_map_bls(
      first, superblock.newest_map
    ).then([this, &conn, first](auto&& bls) {
      auto m = crimson::make_message<MOSDMap>(
        monc.get_fsid(),
        osdmap->get_encoding_features());
      m->cluster_osdmap_trim_lower_bound = first;
      m->newest_map = superblock.newest_map;
      m->maps = std::move(bls);
      return conn.send(std::move(m));
    });
  } else {
    return load_map_bl(osdmap->get_epoch()
    ).then([this, &conn](auto&& bl) mutable {
      auto m = crimson::make_message<MOSDMap>(
        monc.get_fsid(),
        osdmap->get_encoding_features());
      /* TODO: once we support the tracking of superblock's
       * cluster_osdmap_trim_lower_bound, the MOSDMap should
       * be populated with this value instead of the oldest_map.
       * See: OSD::handle_osd_map for how classic updates the
       * cluster's trim lower bound.
       */
      m->cluster_osdmap_trim_lower_bound = superblock.oldest_map;
      m->newest_map = superblock.newest_map;
      m->maps.emplace(osdmap->get_epoch(), std::move(bl));
      return conn.send(std::move(m));
    });
  }
}

seastar::future<> OSDSingletonState::send_incremental_map_to_osd(
  int osd,
  epoch_t first)
{
  if (osdmap->is_down(osd)) {
    logger().info("{}: osd.{} is_down", __func__, osd);
    return seastar::now();
  } else {
    auto conn = cluster_msgr.connect(
      osdmap->get_cluster_addrs(osd).front(), CEPH_ENTITY_TYPE_OSD);
    return send_incremental_map(*conn, first);
  }
}

} // namespace crimson::osd