1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "crimson/osd/osd_operations/snaptrim_event.h"
5 #include "crimson/osd/ops_executer.h"
6 #include "crimson/osd/pg.h"
7 #include <seastar/core/sleep.hh>
// Logger used by every log statement in this file; it is the logger of the
// OSD subsystem (ceph_subsys_osd).
10 seastar::logger
& logger() {
11 return crimson::get_logger(ceph_subsys_osd
);
// EventBackendRegistry specialization for osd::SnapTrimEvent.
// get_backends() is declared to return std::tuple<>, an empty tuple, so no
// event backends are registered for this operation type.
17 struct EventBackendRegistry
<osd::SnapTrimEvent
> {
18 static std::tuple
<> get_backends() {
// EventBackendRegistry specialization for osd::SnapTrimObjSubEvent.
// get_backends() is declared to return std::tuple<>, an empty tuple, so no
// event backends are registered for this operation type.
24 struct EventBackendRegistry
<osd::SnapTrimObjSubEvent
> {
25 static std::tuple
<> get_backends() {
31 namespace crimson::osd
{
// Formatter dump of the sub-op blocker: opens a "dependent_operations" array
// and emits one "op_id" (the `.first` member of each `subops` element) per
// tracked sub-operation.
33 void SnapTrimEvent::SubOpBlocker::dump_detail(Formatter
*f
) const
35 f
->open_array_section("dependent_operations");
37 for (const auto &kv
: subops
) {
38 f
->dump_unsigned("op_id", kv
.first
);
// Registers one sub-operation by perfect-forwarding the arguments to
// `subops.emplace_back`. Each stored element is later read as an op id in
// `.first` (dump_detail) and a future in `.second` (wait_completion).
44 template <class... Args
>
45 void SnapTrimEvent::SubOpBlocker::emplace_back(Args
&&... args
)
47 subops
.emplace_back(std::forward
<Args
>(args
)...);
// Returns a future that resolves once the future stored in `.second` of every
// `subops` element has resolved (driven by interruptor::do_for_each).
// The futures are moved out of `subops`, so the same entries cannot be
// awaited a second time.
50 SnapTrimEvent::remove_or_update_iertr::future
<>
51 SnapTrimEvent::SubOpBlocker::wait_completion()
53 return interruptor::do_for_each(subops
, [](auto&& kv
) {
54 return std::move(kv
.second
);
// Writes a one-line description of the event: "SnapTrimEvent(" followed by
// the PG id, the snapid and the needs_pause flag.
58 void SnapTrimEvent::print(std::ostream
&lhs
) const
60 lhs
<< "SnapTrimEvent("
61 << "pgid=" << pg
->get_pgid()
62 << " snapid=" << snapid
63 << " needs_pause=" << needs_pause
// Formatter dump of the event: opens a "SnapTrimEvent" object section and
// emits the PG id as "pgid".
67 void SnapTrimEvent::dump_detail(Formatter
*f
) const
69 f
->open_object_section("SnapTrimEvent");
70 f
->dump_stream("pgid") << pg
->get_pgid();
// Entry point of the operation. Logs, then runs the PG-bound work with the
// PG's shard services and the PG itself (the callee sits on a line not shown
// here -- presumably with_pg()). Because finally() is used, `handle` is
// marked complete whether that work succeeds or fails.
74 SnapTrimEvent::snap_trim_ertr::future
<seastar::stop_iteration
>
75 SnapTrimEvent::start()
77 logger().debug("{}: {}", *this, __func__
);
79 pg
->get_shard_services(), pg
// `ref` holds an IRef to this operation for the lifetime of the
// continuation (presumably to keep it alive until handle.complete() runs).
80 ).finally([ref
=IRef
{this}, this] {
81 logger().debug("{}: complete", *ref
);
82 return handle
.complete();
// Pipeline this operation walks through: the PG's request_pg_pipeline.
86 CommonPGPipeline
& SnapTrimEvent::pp()
88 return pg
->request_pg_pipeline
;
// Performs one snap-trim round for this PG inside
// interruptor::with_interruption:
//  1. walks pipeline stages of pp() and waits, via wait_for_active_blocker,
//     for the PG to become active;
//  2. asks snap_mapper.get_next_objects_to_trim() for a batch of objects;
//  3. starts one SnapTrimObjSubEvent per object, tracks it in
//     subop_blocker, and waits for all of them;
//  4. either finishes immediately (interruptor::now()) or sleeps for
//     osd_snap_trim_sleep.
// Resolves to stop_iteration::yes when the batch is empty (nothing left to
// trim) and to stop_iteration::no after a completed batch. If the chain is
// interrupted, the handler logs and returns ct_error::eagain.
91 SnapTrimEvent::snap_trim_ertr::future
<seastar::stop_iteration
>
92 SnapTrimEvent::with_pg(
93 ShardServices
&shard_services
, Ref
<PG
> _pg
)
95 return interruptor::with_interruption([&shard_services
, this] {
96 return enter_stage
<interruptor
>(
98 ).then_interruptible([this] {
// block until the PG reports itself active
99 return with_blocking_event
<PGActivationBlocker::BlockingEvent
,
100 interruptor
>([this] (auto&& trigger
) {
101 return pg
->wait_for_active_blocker
.wait(std::move(trigger
));
103 }).then_interruptible([this] {
104 return enter_stage
<interruptor
>(
105 pp().recover_missing
);
106 }).then_interruptible([] {
// missing-object recovery is not wired up yet; this step is a no-op
107 //return do_recover_missing(pg, get_target_oid());
108 return seastar::now();
109 }).then_interruptible([this] {
110 return enter_stage
<interruptor
>(
112 }).then_interruptible([this] {
113 return enter_stage
<interruptor
>(
115 }).then_interruptible([&shard_services
, this] {
// collect the next batch of objects to trim inside interruptor::async
// (NOTE(review): presumably because the snap-mapper lookup may block)
116 return interruptor::async([this] {
117 std::vector
<hobject_t
> to_trim
;
118 using crimson::common::local_conf
;
// batch-size limit from osd_pg_max_concurrent_snap_trims (presumably passed
// to get_next_objects_to_trim below -- TODO confirm)
120 local_conf().get_val
<uint64_t>("osd_pg_max_concurrent_snap_trims");
121 // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
122 // the ENOENT below and erase snapid.
123 int r
= snap_mapper
.get_next_objects_to_trim(
128 to_trim
.clear(); // paranoia
// unexpected return codes are treated as fatal
131 logger().error("{}: get_next_objects_to_trim returned {}",
132 *this, cpp_strerror(r
));
133 ceph_abort_msg("get_next_objects_to_trim returned an invalid code");
135 assert(!to_trim
.empty());
137 logger().debug("{}: async almost done line {}", *this, __LINE__
);
139 }).then_interruptible([&shard_services
, this] (const auto& to_trim
) {
140 if (to_trim
.empty()) {
141 // the legit ENOENT -> done
142 logger().debug("{}: to_trim is empty! Stopping iteration", *this);
143 return snap_trim_iertr::make_ready_future
<seastar::stop_iteration
>(
144 seastar::stop_iteration::yes
);
// start one SnapTrimObjSubEvent per object and register it with
// subop_blocker so the round can wait for all of them below
146 for (const auto& object
: to_trim
) {
147 logger().debug("{}: trimming {}", *this, object
);
148 auto [op
, fut
] = shard_services
.start_operation_may_interrupt
<
149 interruptor
, SnapTrimObjSubEvent
>(
153 subop_blocker
.emplace_back(
// all sub-operations are started; enter the next stage and wait for them
158 return enter_stage
<interruptor
>(
160 ).then_interruptible([this] {
161 logger().debug("{}: awaiting completion", *this);
162 return subop_blocker
.wait_completion();
163 }).safe_then_interruptible([this] {
165 return interruptor::now();
167 // let operators know we're waiting
168 return enter_stage
<interruptor
>(
170 ).then_interruptible([this] {
171 using crimson::common::local_conf
;
// osd_snap_trim_sleep is multiplied by 1000 and used as milliseconds, i.e.
// the option is interpreted as seconds
172 const auto time_to_sleep
=
173 local_conf().template get_val
<double>("osd_snap_trim_sleep");
174 logger().debug("{}: time_to_sleep {}", *this, time_to_sleep
);
175 // TODO: this logic should be more sophisticated and distinguish
176 // between SSDs, HDDs and the hybrid case
177 return seastar::sleep(
178 std::chrono::milliseconds(std::lround(time_to_sleep
* 1000)));
// batch finished: tell the caller to keep iterating
180 }).safe_then_interruptible([this] {
181 logger().debug("{}: all completed", *this);
182 return snap_trim_iertr::make_ready_future
<seastar::stop_iteration
>(
183 seastar::stop_iteration::no
);
// interruption handler: log the exception and report eagain to the caller
187 }, [this](std::exception_ptr eptr
) -> snap_trim_ertr::future
<seastar::stop_iteration
> {
188 logger().debug("{}: interrupted {}", *this, eptr
);
189 return crimson::ct_error::eagain::make();
// Pipeline this sub-operation walks through: the PG's request_pg_pipeline.
194 CommonPGPipeline
& SnapTrimObjSubEvent::pp()
196 return pg
->request_pg_pipeline
;
// Entry point of the per-object trim. Logs, then runs the PG-bound work with
// the PG's shard services and the PG itself (the callee sits on a line not
// shown here -- presumably with_pg()). Because finally() is used, `handle`
// is marked complete whether that work succeeds or fails.
199 SnapTrimObjSubEvent::remove_or_update_iertr::future
<>
200 SnapTrimObjSubEvent::start()
202 logger().debug("{}: start", *this);
204 pg
->get_shard_services(), pg
// `ref` holds an IRef to this operation for the lifetime of the
// continuation (presumably to keep it alive until handle.complete() runs).
205 ).finally([ref
=IRef
{this}, this] {
206 logger().debug("{}: complete", *ref
);
207 return handle
.complete();
// Deletes the clone `coid` and removes it from its head's snapset.
//  - Locates the clone in head_obc's snapset.clones; fails with
//    ct_error::enoent if it is not there.
//  - Adjusts delta_stats: drops the clone's bytes and, when an older clone
//    exists, merges this clone's overlap into it and re-accounts that older
//    clone's bytes. Also decrements the object, dirty, omap, whiteout and
//    clone counters as applicable.
//  - Erases the clone from clones, clone_overlap, clone_size and
//    clone_snaps, and appends a DELETE pg log entry.
//  - Queues a transaction operation on the clone object in `txn` (the call
//    line is not shown here -- presumably txn.remove).
//  - Returns the future of OpsExecuter::snap_map_remove for `coid`.
211 SnapTrimObjSubEvent::remove_or_update_iertr::future
<>
212 SnapTrimObjSubEvent::remove_clone(
213 ObjectContextRef obc
,
214 ObjectContextRef head_obc
,
215 ceph::os::Transaction
& txn
,
216 std::vector
<pg_log_entry_t
>& log_entries
218 const auto p
= std::find(
219 head_obc
->ssc
->snapset
.clones
.begin(),
220 head_obc
->ssc
->snapset
.clones
.end(),
222 if (p
== head_obc
->ssc
->snapset
.clones
.end()) {
223 logger().error("{}: Snap {} not in clones",
225 return crimson::ct_error::enoent::make();
// redundant after the early return above; kept as a sanity check
227 assert(p
!= head_obc
->ssc
->snapset
.clones
.end());
228 snapid_t last
= coid
.snap
;
229 delta_stats
.num_bytes
-= head_obc
->ssc
->snapset
.get_clone_bytes(last
);
231 if (p
!= head_obc
->ssc
->snapset
.clones
.begin()) {
232 // not the oldest... merge overlap into next older clone
233 std::vector
<snapid_t
>::iterator n
= p
- 1;
234 hobject_t prev_coid
= coid
;
237 // does the classical OSD really need is_present_clone(prev_coid)?
// re-account the older clone's bytes around the overlap merge: subtract
// them with the old overlap, shrink the overlap, then add them back
238 delta_stats
.num_bytes
-= head_obc
->ssc
->snapset
.get_clone_bytes(*n
);
239 head_obc
->ssc
->snapset
.clone_overlap
[*n
].intersection_of(
240 head_obc
->ssc
->snapset
.clone_overlap
[*p
]);
241 delta_stats
.num_bytes
+= head_obc
->ssc
->snapset
.get_clone_bytes(*n
);
243 delta_stats
.num_objects
--;
244 if (obc
->obs
.oi
.is_dirty()) {
245 delta_stats
.num_objects_dirty
--;
247 if (obc
->obs
.oi
.is_omap()) {
248 delta_stats
.num_objects_omap
--;
250 if (obc
->obs
.oi
.is_whiteout()) {
251 logger().debug("{}: trimming whiteout on {}",
253 delta_stats
.num_whiteouts
--;
255 delta_stats
.num_object_clones
--;
// forget the clone both in its own object context and in the head snapset
257 obc
->obs
.exists
= false;
258 head_obc
->ssc
->snapset
.clones
.erase(p
);
259 head_obc
->ssc
->snapset
.clone_overlap
.erase(last
);
260 head_obc
->ssc
->snapset
.clone_size
.erase(last
);
261 head_obc
->ssc
->snapset
.clone_snaps
.erase(last
);
263 log_entries
.emplace_back(
265 pg_log_entry_t::DELETE
,
271 obc
->obs
.oi
.mtime
, // will be replaced in `apply_to()`
275 pg
->get_collection_ref()->get_cid(),
276 ghobject_t
{coid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
});
// reset the clone's object info to a fresh object_info_t for coid
277 obc
->obs
.oi
= object_info_t(coid
);
278 return OpsExecuter::snap_map_remove(coid
, pg
->snap_mapper
, pg
->osdriver
, txn
);
// Removes the head object of `coid` (the caller invokes this when the head
// is a whiteout with no clones left).
//  - Appends a DELETE pg log entry for the head.
//  - Decrements the object counters in delta_stats (the guards around the
//    dirty/omap decrements are not shown here -- presumably is_dirty() /
//    is_omap() checks).
//  - Resets the head object context and queues txn.remove for the head
//    object.
281 void SnapTrimObjSubEvent::remove_head_whiteout(
282 ObjectContextRef obc
,
283 ObjectContextRef head_obc
,
284 ceph::os::Transaction
& txn
,
285 std::vector
<pg_log_entry_t
>& log_entries
287 // NOTE: this arguably constitutes minor interference with the
288 // tiering agent if this is a cache tier since a snap trim event
289 // is effectively evicting a whiteout we might otherwise want to
291 const auto head_oid
= coid
.get_head();
292 logger().info("{}: {} removing {}",
293 *this, coid
, head_oid
);
294 log_entries
.emplace_back(
296 pg_log_entry_t::DELETE
,
299 head_obc
->obs
.oi
.version
,
302 obc
->obs
.oi
.mtime
, // will be replaced in `apply_to()`
305 logger().info("{}: remove snap head", *this);
306 object_info_t
& oi
= head_obc
->obs
.oi
;
307 delta_stats
.num_objects
--;
309 delta_stats
.num_objects_dirty
--;
312 delta_stats
.num_objects_omap
--;
314 if (oi
.is_whiteout()) {
315 logger().debug("{}: trimming whiteout on {}",
317 delta_stats
.num_whiteouts
--;
// mark the head as gone in memory and delete it on disk
319 head_obc
->obs
.exists
= false;
320 head_obc
->obs
.oi
= object_info_t(head_oid
);
321 txn
.remove(pg
->get_collection_ref()->get_cid(),
322 ghobject_t
{head_oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
});
// Keeps the clone `coid` but shrinks its snap list to `new_snaps`.
//  - Stores new_snaps in the head snapset's clone_snaps in descending order
//    (built from rbegin/rend).
//  - Bumps the clone's version to osd_op_p.at_version, saving the old one
//    as prior_version.
//  - Presumably re-encodes the clone's object info into `txn`; those
//    statements are only partially shown here -- TODO confirm.
//  - Appends a MODIFY pg log entry.
//  - Returns the future of OpsExecuter::snap_map_modify for `coid`.
325 SnapTrimObjSubEvent::interruptible_future
<>
326 SnapTrimObjSubEvent::adjust_snaps(
327 ObjectContextRef obc
,
328 ObjectContextRef head_obc
,
329 const std::set
<snapid_t
>& new_snaps
,
330 ceph::os::Transaction
& txn
,
331 std::vector
<pg_log_entry_t
>& log_entries
333 head_obc
->ssc
->snapset
.clone_snaps
[coid
.snap
] =
334 std::vector
<snapid_t
>(new_snaps
.rbegin(), new_snaps
.rend());
336 // we still do a 'modify' event on this object just to trigger a
337 // snapmapper.update ... :(
338 obc
->obs
.oi
.prior_version
= obc
->obs
.oi
.version
;
339 obc
->obs
.oi
.version
= osd_op_p
.at_version
;
343 pg
->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD
, nullptr));
345 pg
->get_collection_ref()->get_cid(),
346 ghobject_t
{coid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
},
349 log_entries
.emplace_back(
351 pg_log_entry_t::MODIFY
,
354 obc
->obs
.oi
.prior_version
,
360 return OpsExecuter::snap_map_modify(
361 coid
, new_snaps
, pg
->snap_mapper
, pg
->osdriver
, txn
);
// Persists the already-modified snapset of the head object of `coid`.
//  - Appends a MODIFY pg log entry for the head.
//  - Bumps the head's version to osd_op_p.at_version, saving the old one as
//    prior_version.
//  - Encodes the snapset under SS_ATTR and the object info under OI_ATTR.
//  - Writes both attributes for the head object in `txn`; the write call
//    line is not shown here -- presumably txn.setattrs.
364 void SnapTrimObjSubEvent::update_head(
365 ObjectContextRef obc
,
366 ObjectContextRef head_obc
,
367 ceph::os::Transaction
& txn
,
368 std::vector
<pg_log_entry_t
>& log_entries
370 const auto head_oid
= coid
.get_head();
371 logger().info("{}: writing updated snapset on {}, snapset is {}",
372 *this, head_oid
, head_obc
->ssc
->snapset
);
373 log_entries
.emplace_back(
375 pg_log_entry_t::MODIFY
,
378 head_obc
->obs
.oi
.version
,
385 head_obc
->obs
.oi
.prior_version
= head_obc
->obs
.oi
.version
;
386 head_obc
->obs
.oi
.version
= osd_op_p
.at_version
;
// std::less<> makes the attribute map support heterogeneous lookup
388 std::map
<std::string
, ceph::bufferlist
, std::less
<>> attrs
;
390 encode(head_obc
->ssc
->snapset
, bl
);
391 attrs
[SS_ATTR
] = std::move(bl
);
394 head_obc
->obs
.oi
.encode_no_oid(bl
,
395 pg
->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD
, nullptr));
396 attrs
[OI_ATTR
] = std::move(bl
);
398 pg
->get_collection_ref()->get_cid(),
399 ghobject_t
{head_oid
, ghobject_t::NO_GEN
, shard_id_t::NO_SHARD
},
403 SnapTrimObjSubEvent::remove_or_update_iertr::future
<
404 SnapTrimObjSubEvent::remove_or_update_ret_t
>
405 SnapTrimObjSubEvent::remove_or_update(
406 ObjectContextRef obc
,
407 ObjectContextRef head_obc
)
409 auto citer
= head_obc
->ssc
->snapset
.clone_snaps
.find(coid
.snap
);
410 if (citer
== head_obc
->ssc
->snapset
.clone_snaps
.end()) {
411 logger().error("{}: No clone_snaps in snapset {} for object {}",
412 *this, head_obc
->ssc
->snapset
, coid
);
413 return crimson::ct_error::enoent::make();
415 const auto& old_snaps
= citer
->second
;
416 if (old_snaps
.empty()) {
417 logger().error("{}: no object info snaps for object {}",
419 return crimson::ct_error::enoent::make();
421 if (head_obc
->ssc
->snapset
.seq
== 0) {
422 logger().error("{}: no snapset.seq for object {}",
424 return crimson::ct_error::enoent::make();
426 const OSDMapRef
& osdmap
= pg
->get_osdmap();
427 std::set
<snapid_t
> new_snaps
;
428 for (const auto& old_snap
: old_snaps
) {
429 if (!osdmap
->in_removed_snaps_queue(pg
->get_info().pgid
.pgid
.pool(),
431 && old_snap
!= snap_to_trim
) {
432 new_snaps
.insert(old_snap
);
436 return seastar::do_with(ceph::os::Transaction
{}, [=, this](auto &txn
) {
437 std::vector
<pg_log_entry_t
> log_entries
{};
439 int64_t num_objects_before_trim
= delta_stats
.num_objects
;
440 osd_op_p
.at_version
= pg
->next_version();
441 auto ret
= remove_or_update_iertr::now();
442 if (new_snaps
.empty()) {
443 // remove clone from snapset
444 logger().info("{}: {} snaps {} -> {} ... deleting",
445 *this, coid
, old_snaps
, new_snaps
);
446 ret
= remove_clone(obc
, head_obc
, txn
, log_entries
);
448 // save adjusted snaps for this object
449 logger().info("{}: {} snaps {} -> {}",
450 *this, coid
, old_snaps
, new_snaps
);
451 ret
= adjust_snaps(obc
, head_obc
, new_snaps
, txn
, log_entries
);
453 return std::move(ret
).safe_then_interruptible(
454 [&txn
, obc
, num_objects_before_trim
, log_entries
=std::move(log_entries
), head_obc
=std::move(head_obc
), this]() mutable {
455 osd_op_p
.at_version
= pg
->next_version();
458 logger().debug("{}: {} new snapset {} on {}",
459 *this, coid
, head_obc
->ssc
->snapset
, head_obc
->obs
.oi
);
460 if (head_obc
->ssc
->snapset
.clones
.empty() && head_obc
->obs
.oi
.is_whiteout()) {
461 remove_head_whiteout(obc
, head_obc
, txn
, log_entries
);
463 update_head(obc
, head_obc
, txn
, log_entries
);
465 // Stats reporting - Set number of objects trimmed
466 if (num_objects_before_trim
> delta_stats
.num_objects
) {
467 //int64_t num_objects_trimmed =
468 // num_objects_before_trim - delta_stats.num_objects;
469 //add_objects_trimmed_count(num_objects_trimmed);
471 }).safe_then_interruptible(
472 [&txn
, log_entries
=std::move(log_entries
)] () mutable {
473 return remove_or_update_iertr::make_ready_future
<remove_or_update_ret_t
>(
474 std::make_pair(std::move(txn
), std::move(log_entries
)));
// Trims a single object:
//  1. walks pipeline stages of pp() and waits, via wait_for_active_blocker,
//     for the PG to become active;
//  2. loads the head and clone object contexts with RWState::RWWRITE
//     through obc_loader.with_head_and_clone_obc;
//  3. builds the transaction and log entries with remove_or_update();
//  4. submits them with pg->submit_transaction, waiting first for
//     submission and then, after one more stage, for completion.
// Errors from remove_or_update are passed to the caller; any other error
// trips ct_error::assert_all.
479 SnapTrimObjSubEvent::remove_or_update_iertr::future
<>
480 SnapTrimObjSubEvent::with_pg(
481 ShardServices
&shard_services
, Ref
<PG
> _pg
)
483 return enter_stage
<interruptor
>(
485 ).then_interruptible([this] {
// block until the PG reports itself active
486 return with_blocking_event
<PGActivationBlocker::BlockingEvent
,
487 interruptor
>([this] (auto&& trigger
) {
488 return pg
->wait_for_active_blocker
.wait(std::move(trigger
));
490 }).then_interruptible([this] {
491 return enter_stage
<interruptor
>(
492 pp().recover_missing
);
493 }).then_interruptible([] {
// missing-object recovery is not wired up yet; this step is a no-op
494 //return do_recover_missing(pg, get_target_oid());
495 return seastar::now();
496 }).then_interruptible([this] {
497 return enter_stage
<interruptor
>(
499 }).then_interruptible([this] {
500 logger().debug("{}: getting obc for {}", *this, coid
);
501 // end of commonality
502 // with_head_and_clone_obc locks both the clone's and the head's obcs
503 return pg
->obc_loader
.with_head_and_clone_obc
<RWState::RWWRITE
>(
505 [this](auto head_obc
, auto clone_obc
) {
506 logger().debug("{}: got clone_obc={}", *this, clone_obc
->get_oid());
507 return enter_stage
<interruptor
>(
509 ).then_interruptible(
510 [this,clone_obc
=std::move(clone_obc
), head_obc
=std::move(head_obc
)]() mutable {
511 logger().debug("{}: processing clone_obc={}", *this, clone_obc
->get_oid());
512 return remove_or_update(
514 ).safe_then_unpack_interruptible([clone_obc
, this]
515 (auto&& txn
, auto&& log_entries
) mutable {
// submit_transaction yields two futures: one for submission and one for
// completion; the completion future is awaited after the next stage
516 auto [submitted
, all_completed
] = pg
->submit_transaction(
517 std::move(clone_obc
),
520 std::move(log_entries
));
521 return submitted
.then_interruptible(
522 [all_completed
=std::move(all_completed
), this] () mutable {
523 return enter_stage
<interruptor
>(
525 ).then_interruptible([all_completed
=std::move(all_completed
)] () mutable {
526 return std::move(all_completed
);
// forward remove_or_update errors to the caller; anything else is a bug
531 }).handle_error_interruptible(
532 remove_or_update_iertr::pass_further
{},
533 crimson::ct_error::assert_all
{"unexpected error in SnapTrimObjSubEvent"}
// Writes a one-line description of the sub-event: "SnapTrimObjSubEvent("
// followed by (among other fields) the snap being trimmed as "snapid".
538 void SnapTrimObjSubEvent::print(std::ostream
&lhs
) const
540 lhs
<< "SnapTrimObjSubEvent("
542 << " snapid=" << snap_to_trim
// Formatter dump of the sub-event: opens a "SnapTrimObjSubEvent" object
// section and emits the clone object id as "coid".
546 void SnapTrimObjSubEvent::dump_detail(Formatter
*f
) const
548 f
->open_object_section("SnapTrimObjSubEvent");
549 f
->dump_stream("coid") << coid
;
553 } // namespace crimson::osd