// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <algorithm>
#include <boost/type_index.hpp>

#include "crimson/common/log.h"
#include "crimson/osd/backfill_state.h"

namespace {

seastar::logger& logger() {
  return crimson::get_logger(ceph_subsys_osd);
}

} // anonymous namespace

namespace crimson::osd {

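// BackfillState owns a boost::statechart machine that drives backfill on
// the primary OSD.  The machine is initiated right away, entering the
// Initial state and waiting there for the Triggered event.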
BackfillState::BackfillState(
  BackfillState::BackfillListener& backfill_listener,
  std::unique_ptr<BackfillState::PeeringFacade> peering_state,
  std::unique_ptr<BackfillState::PGFacade> pg)
  : backfill_machine(*this,
                     backfill_listener,
                     std::move(peering_state),
                     std::move(pg)),
    progress_tracker(
      std::make_unique<BackfillState::ProgressTracker>(backfill_machine))
{
  logger().debug("{}:{}", __func__, __LINE__);
  backfill_machine.initiate();
}

template <class S>
BackfillState::StateHelper<S>::StateHelper()
{
  logger().debug("enter {}",
                 boost::typeindex::type_id<S>().pretty_name());
}

template <class S>
BackfillState::StateHelper<S>::~StateHelper()
{
  logger().debug("exit {}",
                 boost::typeindex::type_id<S>().pretty_name());
}

BackfillState::~BackfillState() = default;

BackfillState::BackfillMachine::BackfillMachine(
  BackfillState& backfill_state,
  BackfillState::BackfillListener& backfill_listener,
  std::unique_ptr<BackfillState::PeeringFacade> peering_state,
  std::unique_ptr<BackfillState::PGFacade> pg)
  : backfill_state(backfill_state),
    backfill_listener(backfill_listener),
    peering_state(std::move(peering_state)),
    pg(std::move(pg))
{}

BackfillState::BackfillMachine::~BackfillMachine() = default;

BackfillState::Initial::Initial(my_context ctx)
  : my_base(ctx)
{
  backfill_state().last_backfill_started = peering_state().earliest_backfill();
  logger().debug("{}: bft={} from {}",
                 __func__, peering_state().get_backfill_targets(),
                 backfill_state().last_backfill_started);
  for (const auto& bt : peering_state().get_backfill_targets()) {
    logger().debug("{}: target shard {} from {}",
                   __func__, bt, peering_state().get_peer_last_backfill(bt));
  }
  ceph_assert(peering_state().get_backfill_targets().size());
  ceph_assert(!backfill_state().last_backfill_started.is_max());
}

boost::statechart::result
BackfillState::Initial::react(const BackfillState::Triggered& evt)
{
  logger().debug("{}: backfill triggered", __func__);
  ceph_assert(backfill_state().last_backfill_started ==
              peering_state().earliest_backfill());
  ceph_assert(peering_state().is_backfilling());
  // initialize BackfillIntervals
  for (const auto& bt : peering_state().get_backfill_targets()) {
    backfill_state().peer_backfill_info[bt].reset(
      peering_state().get_peer_last_backfill(bt));
  }
  backfill_state().backfill_info.reset(backfill_state().last_backfill_started);
  if (Enqueuing::all_enqueued(peering_state(),
                              backfill_state().backfill_info,
                              backfill_state().peer_backfill_info)) {
    logger().debug("{}: switching to Done state", __func__);
    return transit<BackfillState::Done>();
  } else {
    logger().debug("{}: switching to Enqueuing state", __func__);
    return transit<BackfillState::Enqueuing>();
  }
}


// -- Enqueuing
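// Bring the primary's cached BackfillInterval up to date.  If its version
// already matches the projected last_update there is nothing to do; if it
// lags behind but is still covered by the log (version >= log_tail), replay
// the log entries recorded since that version onto the interval instead of
// rescanning the store.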
void BackfillState::Enqueuing::maybe_update_range()
{
  if (auto& primary_bi = backfill_state().backfill_info;
      primary_bi.version >= pg().get_projected_last_update()) {
    logger().info("{}: bi is current", __func__);
    ceph_assert(primary_bi.version == pg().get_projected_last_update());
  } else if (primary_bi.version >= peering_state().get_log_tail()) {
#if 0
    if (peering_state().get_pg_log().get_log().empty() &&
        pg().get_projected_log().empty()) {
      /* Because we don't move log_tail on split, the log might be
       * empty even if log_tail != last_update. However, the only
       * way to get here with an empty log is if log_tail is actually
       * eversion_t(), because otherwise the entry which changed
       * last_update since the last scan would have to be present.
       */
      ceph_assert(primary_bi.version == eversion_t());
      return;
    }
#endif
    logger().debug("{}: bi is old, ({}) can be updated with log to {}",
                   __func__,
                   primary_bi.version,
                   pg().get_projected_last_update());
    logger().debug("{}: scanning pg log first", __func__);
    peering_state().scan_log_after(primary_bi.version,
      [&](const pg_log_entry_t& e) {
        logger().debug("maybe_update_range(lambda): updating from version {}",
                       e.version);
        if (e.soid >= primary_bi.begin && e.soid < primary_bi.end) {
          if (e.is_update()) {
            logger().debug("maybe_update_range(lambda): {} updated to ver {}",
                           e.soid, e.version);
            primary_bi.objects.erase(e.soid);
            primary_bi.objects.insert(std::make_pair(e.soid, e.version));
          } else if (e.is_delete()) {
            logger().debug("maybe_update_range(lambda): {} removed", e.soid);
            primary_bi.objects.erase(e.soid);
          }
        }
      });
    primary_bi.version = pg().get_projected_last_update();
  } else {
    ceph_abort_msg(
      "scan_range should have raised primary_bi.version past log_tail");
  }
}

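// Drop from each interval the objects that have already been handled:
// everything up to last_backfill_started (for a peer, up to the later of
// that and its own last_backfill).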
void BackfillState::Enqueuing::trim_backfill_infos()
{
  for (const auto& bt : peering_state().get_backfill_targets()) {
    backfill_state().peer_backfill_info[bt].trim_to(
      std::max(peering_state().get_peer_last_backfill(bt),
               backfill_state().last_backfill_started));
  }
  backfill_state().backfill_info.trim_to(
    backfill_state().last_backfill_started);
}

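// Everything has been enqueued when the local interval and every peer's
// interval extend to the end of the object namespace and carry no objects.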
/* static */ bool BackfillState::Enqueuing::all_enqueued(
  const PeeringFacade& peering_state,
  const BackfillInterval& backfill_info,
  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
{
  const bool all_local_enqueued =
    backfill_info.extends_to_end() && backfill_info.empty();
  const bool all_peer_enqueued = std::all_of(
    std::begin(peer_backfill_info),
    std::end(peer_backfill_info),
    [] (const auto& kv) {
      [[maybe_unused]] const auto& [shard, peer_backfill_info] = kv;
      return peer_backfill_info.extends_to_end() && peer_backfill_info.empty();
    });
  return all_local_enqueued && all_peer_enqueued;
}

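// The lowest hobject_t among the heads of the peers' intervals -- the next
// object some backfill target may still be missing.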
hobject_t BackfillState::Enqueuing::earliest_peer_backfill(
  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const
{
  hobject_t e = hobject_t::get_max();
  for (const pg_shard_t& bt : peering_state().get_backfill_targets()) {
    const auto iter = peer_backfill_info.find(bt);
    ceph_assert(iter != peer_backfill_info.end());
    e = std::min(e, iter->second.begin);
  }
  return e;
}

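// A replica rescan is due when any target's interval has run dry even
// though it doesn't reach the end of the namespace yet (see
// ReplicasScanning::replica_needs_scan below).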
bool BackfillState::Enqueuing::should_rescan_replicas(
  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
  const BackfillInterval& backfill_info) const
{
  const auto& targets = peering_state().get_backfill_targets();
  return std::any_of(std::begin(targets), std::end(targets),
    [&] (const auto& bt) {
      return ReplicasScanning::replica_needs_scan(peer_backfill_info.at(bt),
                                                  backfill_info);
    });
}

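// A primary rescan is due when the local interval has been drained down to
// (or past) every peer's position but doesn't reach the end of the
// namespace yet.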
bool BackfillState::Enqueuing::should_rescan_primary(
  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info,
  const BackfillInterval& backfill_info) const
{
  return backfill_info.begin <= earliest_peer_backfill(peer_backfill_info) &&
         !backfill_info.extends_to_end();
}

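// Once an object has been enqueued for push or drop, pop it off the
// intervals of the affected peers and advance last_backfill_started.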
void BackfillState::Enqueuing::trim_backfilled_object_from_intervals(
  BackfillState::Enqueuing::result_t&& result,
  hobject_t& last_backfill_started,
  std::map<pg_shard_t, BackfillInterval>& peer_backfill_info)
{
  std::for_each(std::begin(result.pbi_targets), std::end(result.pbi_targets),
    [&peer_backfill_info] (const auto& bt) {
      peer_backfill_info.at(bt).pop_front();
    });
  last_backfill_started = std::move(result.new_last_backfill_started);
}

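// `check` exists on some replicas but no longer on the primary: enqueue a
// drop on every target whose interval currently starts with it.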
BackfillState::Enqueuing::result_t
BackfillState::Enqueuing::remove_on_peers(const hobject_t& check)
{
  // set `new_last_backfill_started` to `check`
  result_t result { {}, check };
  for (const auto& bt : peering_state().get_backfill_targets()) {
    const auto& pbi = backfill_state().peer_backfill_info.at(bt);
    if (pbi.begin == check) {
      result.pbi_targets.insert(bt);
      const auto& version = pbi.objects.begin()->second;
      backfill_state().progress_tracker->enqueue_drop(pbi.begin);
      backfill_listener().enqueue_drop(bt, pbi.begin, version);
    }
  }
  logger().debug("{}: BACKFILL removing {} from peers {}",
                 __func__, check, result.pbi_targets);
  ceph_assert(!result.pbi_targets.empty());
  return result;
}

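// The object at the head of the primary's interval may be missing or stale
// on the targets: enqueue a push wherever the replica's recorded version
// differs from the primary's, or wherever the replica hasn't reached the
// object at all.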
BackfillState::Enqueuing::result_t
BackfillState::Enqueuing::update_on_peers(const hobject_t& check)
{
  logger().debug("{}: check={}", __func__, check);
  const auto& primary_bi = backfill_state().backfill_info;
  result_t result { {}, primary_bi.begin };

  for (const auto& bt : peering_state().get_backfill_targets()) {
    const auto& peer_bi = backfill_state().peer_backfill_info.at(bt);

    // Find all check peers that have the wrong version
    if (const eversion_t& obj_v = primary_bi.objects.begin()->second;
        check == primary_bi.begin && check == peer_bi.begin) {
      if (peer_bi.objects.begin()->second != obj_v &&
          backfill_state().progress_tracker->enqueue_push(primary_bi.begin)) {
        backfill_listener().enqueue_push(primary_bi.begin, obj_v);
      } else {
        // the versions match, or the object is already being recovered
      }
      result.pbi_targets.insert(bt);
    } else {
      // Only push to peers whose backfill line we've already caught up
      // with; otherwise the object merely appears to be missing on them
      // because their peer_bi.begin > backfill_info.begin.
      if (primary_bi.begin > peering_state().get_peer_last_backfill(bt) &&
          backfill_state().progress_tracker->enqueue_push(primary_bi.begin)) {
        backfill_listener().enqueue_push(primary_bi.begin, obj_v);
      }
    }
  }
  return result;
}

bool BackfillState::Enqueuing::all_emptied(
  const BackfillInterval& local_backfill_info,
  const std::map<pg_shard_t, BackfillInterval>& peer_backfill_info) const
{
  const auto& targets = peering_state().get_backfill_targets();
  const auto replicas_emptied =
    std::all_of(std::begin(targets), std::end(targets),
      [&] (const auto& bt) {
        return peer_backfill_info.at(bt).empty();
      });
  return local_backfill_info.empty() && replicas_emptied;
}

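// The heart of the machine.  Walk the local and peer intervals object by
// object, deciding for each whether to push it, drop it or leave it alone,
// until we run out of budget, run out of scanned range or have enqueued
// everything.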
BackfillState::Enqueuing::Enqueuing(my_context ctx)
  : my_base(ctx)
{
  auto& primary_bi = backfill_state().backfill_info;

  // update our local interval to cope with recent changes
  primary_bi.begin = backfill_state().last_backfill_started;
  if (primary_bi.version < peering_state().get_log_tail()) {
    // it may happen that the OSD is so flooded with modifying operations
    // that backfill would be spinning here over and over. For the sake of
    // performance and simplicity we don't synchronize with the entire PG.
    // The same can happen in the classical OSD.
    logger().warn("{}: bi is old, rescanning local backfill_info",
                  __func__);
    post_event(RequestPrimaryScanning{});
    return;
  } else {
    maybe_update_range();
  }
  trim_backfill_infos();

  while (!all_emptied(primary_bi, backfill_state().peer_backfill_info)) {
    if (!backfill_listener().budget_available()) {
      post_event(RequestWaiting{});
      return;
    } else if (should_rescan_replicas(backfill_state().peer_backfill_info,
                                      primary_bi)) {
      // Count simultaneous scans as a single op and let those complete
      post_event(RequestReplicasScanning{});
      return;
    }
    // Get the object within the set of peers to operate on and the set
    // of targets for which that object applies.
    if (const hobject_t check =
          earliest_peer_backfill(backfill_state().peer_backfill_info);
        check < primary_bi.begin) {
      // Don't increment ops here because deletions are cheap and, unlike
      // real recovery_ops, are not replied to; and we can't increment ops
      // without requeueing ourselves for recovery.
      auto result = remove_on_peers(check);
      trim_backfilled_object_from_intervals(
        std::move(result),
        backfill_state().last_backfill_started,
        backfill_state().peer_backfill_info);
    } else {
      auto result = update_on_peers(check);
      trim_backfilled_object_from_intervals(
        std::move(result),
        backfill_state().last_backfill_started,
        backfill_state().peer_backfill_info);
      primary_bi.pop_front();
    }
    backfill_listener().maybe_flush();
  }

  if (should_rescan_primary(backfill_state().peer_backfill_info,
                            primary_bi)) {
    // need to grab another chunk of the object namespace and restart
    // the queueing.
    logger().debug("{}: reached end for current local chunk", __func__);
    post_event(RequestPrimaryScanning{});
  } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
    post_event(RequestDone{});
  } else {
    logger().debug("{}: reached end for both local and all peers "
                   "but still has in-flight operations", __func__);
    post_event(RequestWaiting{});
  }
}

// -- PrimaryScanning
BackfillState::PrimaryScanning::PrimaryScanning(my_context ctx)
  : my_base(ctx)
{
  backfill_state().backfill_info.version = peering_state().get_last_update();
  backfill_listener().request_primary_scan(
    backfill_state().backfill_info.begin);
}

boost::statechart::result
BackfillState::PrimaryScanning::react(PrimaryScanned evt)
{
  logger().debug("{}", __func__);
  backfill_state().backfill_info = std::move(evt.result);
  return transit<Enqueuing>();
}

boost::statechart::result
BackfillState::PrimaryScanning::react(ObjectPushed evt)
{
  logger().debug("PrimaryScanning::react() on ObjectPushed; evt.object={}",
                 evt.object);
  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
  return discard_event();
}

// -- ReplicasScanning
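// A replica needs a rescan when its interval is exhausted (nothing left to
// compare against) while it hasn't yet covered the primary's position or
// reached the end of the namespace.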
bool BackfillState::ReplicasScanning::replica_needs_scan(
  const BackfillInterval& replica_backfill_info,
  const BackfillInterval& local_backfill_info)
{
  return replica_backfill_info.empty() &&
         replica_backfill_info.begin <= local_backfill_info.begin &&
         !replica_backfill_info.extends_to_end();
}

BackfillState::ReplicasScanning::ReplicasScanning(my_context ctx)
  : my_base(ctx)
{
  for (const auto& bt : peering_state().get_backfill_targets()) {
    if (const auto& pbi = backfill_state().peer_backfill_info.at(bt);
        replica_needs_scan(pbi, backfill_state().backfill_info)) {
      logger().debug("{}: scanning peer osd.{} from {}",
                     __func__, bt, pbi.end);
      backfill_listener().request_replica_scan(bt, pbi.end, hobject_t{});

      ceph_assert(waiting_on_backfill.find(bt) ==
                  waiting_on_backfill.end());
      waiting_on_backfill.insert(bt);
    }
  }
  ceph_assert(!waiting_on_backfill.empty());
  // TODO: start_recovery_op(hobject_t::get_max()); // XXX: was pbi.end
}

#if 0
BackfillState::ReplicasScanning::~ReplicasScanning()
{
  // TODO: finish_recovery_op(hobject_t::get_max());
}
#endif

boost::statechart::result
BackfillState::ReplicasScanning::react(ReplicaScanned evt)
{
  logger().debug("{}: got scan result from osd={}, result={}",
                 __func__, evt.from, evt.result);
  // TODO: maybe we'll be able to move waiting_on_backfill from
  // the machine to the state.
  ceph_assert(peering_state().is_backfill_target(evt.from));
  if (waiting_on_backfill.erase(evt.from)) {
    backfill_state().peer_backfill_info[evt.from] = std::move(evt.result);
    if (waiting_on_backfill.empty()) {
      ceph_assert(backfill_state().peer_backfill_info.size() ==
                  peering_state().get_backfill_targets().size());
      return transit<Enqueuing>();
    }
  } else {
    // we canceled backfill for a while because a peer was too full, and
    // this is an extra response from a non-too-full peer
    logger().debug("{}: canceled backfill (too full?)", __func__);
  }
  return discard_event();
}

boost::statechart::result
BackfillState::ReplicasScanning::react(ObjectPushed evt)
{
  logger().debug("ReplicasScanning::react() on ObjectPushed; evt.object={}",
                 evt.object);
  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
  return discard_event();
}


// -- Waiting
BackfillState::Waiting::Waiting(my_context ctx)
  : my_base(ctx)
{
}

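// Completions keep trickling in while we wait.  As soon as there is
// something left to enqueue again we go back to Enqueuing; once everything
// is both enqueued and completed we are Done.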
boost::statechart::result
BackfillState::Waiting::react(ObjectPushed evt)
{
  logger().debug("Waiting::react() on ObjectPushed; evt.object={}",
                 evt.object);
  backfill_state().progress_tracker->complete_to(evt.object, evt.stat);
  if (!Enqueuing::all_enqueued(peering_state(),
                               backfill_state().backfill_info,
                               backfill_state().peer_backfill_info)) {
    return transit<Enqueuing>();
  } else if (backfill_state().progress_tracker->tracked_objects_completed()) {
    return transit<Done>();
  } else {
    // we still have something to wait on
    logger().debug("Waiting::react() on ObjectPushed; still waiting");
    return discard_event();
  }
}

// -- Done
BackfillState::Done::Done(my_context ctx)
  : my_base(ctx)
{
  logger().info("{}: backfill is done", __func__);
  backfill_listener().backfilled();
}

// -- Crashed
BackfillState::Crashed::Crashed()
{
  ceph_abort_msg("Crashed: this should not happen");
}

// ProgressTracker is an intermediary between the BackfillListener and
// BackfillMachine + its states. All requests to push or drop an object
// are directed through it. The same happens with notifications about
// completing given operations, which are generated by BackfillListener
// and dispatched as e.g. ObjectPushed events.
// This allows ProgressTracker to track the list of in-flight operations,
// which is essential for deciding whether the entire machine should
// switch from Waiting to Done or stay in Waiting.
// ProgressTracker also coordinates .last_backfill_started and stats
// updates.
bool BackfillState::ProgressTracker::tracked_objects_completed() const
{
  return registry.empty();
}

bool BackfillState::ProgressTracker::enqueue_push(const hobject_t& obj)
{
  [[maybe_unused]] const auto [it, first_seen] = registry.try_emplace(
    obj, registry_item_t{op_stage_t::enqueued_push, std::nullopt});
  return first_seen;
}

void BackfillState::ProgressTracker::enqueue_drop(const hobject_t& obj)
{
  registry.try_emplace(
    obj, registry_item_t{op_stage_t::enqueued_drop, pg_stat_t{}});
}

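// Mark `obj` as completed, then sweep the registry from the front, folding
// into the PG stats every head entry that is no longer an in-flight push;
// the sweep stops at the first push still awaiting completion, so stats are
// accounted in backfill order.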
void BackfillState::ProgressTracker::complete_to(
  const hobject_t& obj,
  const pg_stat_t& stats)
{
  logger().debug("{}: obj={}", __func__, obj);
  if (auto completion_iter = registry.find(obj);
      completion_iter != std::end(registry)) {
    completion_iter->second =
      registry_item_t{ op_stage_t::completed_push, stats };
  } else {
    ceph_abort_msg("completing untracked object shall not happen");
  }
  for (auto it = std::begin(registry);
       it != std::end(registry) &&
         it->second.stage != op_stage_t::enqueued_push;
       it = registry.erase(it)) {
    auto& [soid, item] = *it;
    assert(item.stats);
    peering_state().update_complete_backfill_object_stats(
      soid,
      *item.stats);
  }
  if (Enqueuing::all_enqueued(peering_state(),
                              backfill_state().backfill_info,
                              backfill_state().peer_backfill_info) &&
      tracked_objects_completed()) {
    backfill_state().last_backfill_started = hobject_t::get_max();
    backfill_listener().update_peers_last_backfill(hobject_t::get_max());
  } else {
    backfill_listener().update_peers_last_backfill(obj);
  }
}

} // namespace crimson::osd