// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <seastar/core/future.hh>

#include "include/types.h"
#include "common/Formatter.h"
#include "crimson/osd/pg.h"
#include "crimson/osd/osdmap_service.h"
#include "crimson/osd/shard_services.h"
#include "crimson/osd/osd_operations/pg_advance_map.h"
#include "crimson/osd/osd_operation_external_tracking.h"
#include "osd/PeeringState.h"

namespace {
  seastar::logger &logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}

namespace crimson::osd {
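
// PGAdvanceMap is scheduled when a PG is created and whenever new osdmaps
// are broadcast to existing PGs; it carries the PG from its current osdmap
// epoch up to `to`.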
PGAdvanceMap::PGAdvanceMap(
  ShardServices &shard_services, Ref<PG> pg, epoch_t to,
  PeeringCtx &&rctx, bool do_init)
  : shard_services(shard_services), pg(pg), to(to),
    rctx(std::move(rctx)), do_init(do_init)
{
  logger().debug("{}: created", *this);
}

PGAdvanceMap::~PGAdvanceMap() {}

void PGAdvanceMap::print(std::ostream &lhs) const
{
  lhs << "PGAdvanceMap("
      << "pg=" << pg->get_pgid()
      << " from=" << (from ? *from : -1)
      << " to=" << to
      << " do_init=" << do_init
      << ")";
}

void PGAdvanceMap::dump_detail(Formatter *f) const
{
  f->open_object_section("PGAdvanceMap");
  f->dump_stream("pgid") << pg->get_pgid();
  if (from) {
    f->dump_int("from", *from);
  }
  f->dump_int("to", to);
  f->dump_bool("do_init", do_init);
  f->close_section();
}
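
// The PG's peering pipeline; map advances run through the same pipeline as
// peering requests, so the two are ordered with respect to each other.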
PGPeeringPipeline &PGAdvanceMap::peering_pp(PG &pg)
{
  return pg.peering_request_pg_pipeline;
}
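
// Enter the PG's peering pipeline, then walk the PG through every osdmap
// epoch in (from, to], activate the final map, and dispatch the resulting
// peering context.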
seastar::future<> PGAdvanceMap::start()
{
  using cached_map_t = OSDMapService::cached_map_t;

  logger().debug("{}: start", *this);

  IRef ref = this;
  return enter_stage<>(
    peering_pp(*pg).process
  ).then([this] {
    /*
     * PGAdvanceMap is scheduled at pg creation and when
     * broadcasting new osdmaps to pgs. We are not able to serialize
     * between the two different PGAdvanceMap callers since a new pg
     * will get advanced to the latest osdmap at its creation.
     * As a result, we may need to adjust the epoch range the
     * PGAdvanceMap operation covers; `from` is therefore captured
     * only after entering the pipeline.
     * See: https://tracker.ceph.com/issues/61744
     */
    from = pg->get_osdmap_epoch();
    auto fut = seastar::now();
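    // A PG created by this operation is first initialized and activated
    // against its creation map before any further epochs are applied.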
    if (do_init) {
      fut = pg->handle_initialize(rctx).then([this] {
        return pg->handle_activate_map(rctx);
      });
    }
    return fut.then([this] {
      ceph_assert(std::cmp_less_equal(*from, to));
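      // Advance one epoch at a time: fetch each map in (*from, to] and feed
      // it to the PG.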
      return seastar::do_for_each(
        boost::make_counting_iterator(*from + 1),
        boost::make_counting_iterator(to + 1),
        [this](epoch_t next_epoch) {
          logger().debug("{}: start: getting map {}",
                         *this, next_epoch);
          return shard_services.get_map(next_epoch).then(
            [this] (cached_map_t&& next_map) {
              logger().debug("{}: advancing map to {}",
                             *this, next_map->get_epoch());
              return pg->handle_advance_map(next_map, rctx);
            });
        }).then([this] {
          return pg->handle_activate_map(rctx).then([this] {
            logger().debug("{}: map activated", *this);
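            // Only a freshly created PG (do_init) is registered with the
            // shard services here, once its first map is active.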
            if (do_init) {
              shard_services.pg_created(pg->get_pgid(), pg);
              logger().info("PGAdvanceMap::start new pg {}", *pg);
            }
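            // In parallel: request an up_thru update from the monitor if the
            // PG needs one, and dispatch the accumulated peering context
            // (messages and transaction) against the PG's collection.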
            return seastar::when_all_succeed(
              pg->get_need_up_thru()
              ? shard_services.send_alive(
                  pg->get_same_interval_since())
              : seastar::now(),
              shard_services.dispatch_context(
                pg->get_collection_ref(),
                std::move(rctx)));
          });
        }).then_unpack([this] {
          logger().debug("{}: sending pg temp", *this);
          return shard_services.send_pg_temp();
        });
    });
  }).then([this, ref=std::move(ref)] {
    logger().debug("{}: complete", *this);