]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/osd/osd_operations/snaptrim_event.cc
e63e78481ac1b60fa840f5a48ac24a7f658335c3
[ceph.git] / ceph / src / crimson / osd / osd_operations / snaptrim_event.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "crimson/osd/osd_operations/snaptrim_event.h"
5 #include "crimson/osd/ops_executer.h"
6 #include "crimson/osd/pg.h"
7 #include <seastar/core/sleep.hh>
8
namespace {
  // File-local accessor for the OSD-subsystem logger used by every
  // log statement in this translation unit.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_osd);
  }
}
14
namespace crimson {
  // Event-backend registrations for the two snap-trim operation types.
  // Both return an empty tuple: no additional event backends are plugged
  // in beyond the framework defaults.
  template <>
  struct EventBackendRegistry<osd::SnapTrimEvent> {
    static std::tuple<> get_backends() {
      return {};
    }
  };

  template <>
  struct EventBackendRegistry<osd::SnapTrimObjSubEvent> {
    static std::tuple<> get_backends() {
      return {};
    }
  };
}
30
31 namespace crimson::osd {
32
33 void SnapTrimEvent::SubOpBlocker::dump_detail(Formatter *f) const
34 {
35 f->open_array_section("dependent_operations");
36 {
37 for (const auto &kv : subops) {
38 f->dump_unsigned("op_id", kv.first);
39 }
40 }
41 f->close_section();
42 }
43
44 template <class... Args>
45 void SnapTrimEvent::SubOpBlocker::emplace_back(Args&&... args)
46 {
47 subops.emplace_back(std::forward<Args>(args)...);
48 };
49
// Wait for every registered sub-operation to finish by draining, in
// registration order, the completion futures stored in `subops`.
// NOTE: each stored future is moved out of the container, so this may
// be invoked only once per batch of sub-operations.
SnapTrimEvent::remove_or_update_iertr::future<>
SnapTrimEvent::SubOpBlocker::wait_completion()
{
  return interruptor::do_for_each(subops, [](auto&& kv) {
    return std::move(kv.second);
  });
}
57
58 void SnapTrimEvent::print(std::ostream &lhs) const
59 {
60 lhs << "SnapTrimEvent("
61 << "pgid=" << pg->get_pgid()
62 << " snapid=" << snapid
63 << " needs_pause=" << needs_pause
64 << ")";
65 }
66
// Structured (Formatter) dump; only the pgid is reported for this event.
void SnapTrimEvent::dump_detail(Formatter *f) const
{
  f->open_object_section("SnapTrimEvent");
  f->dump_stream("pgid") << pg->get_pgid();
  f->close_section();
}
73
// Entry point: run the trim logic against our PG.  The IRef captured in
// the finally-continuation pins `this` alive until the whole chain has
// resolved; `handle.complete()` then releases the pipeline resources.
SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
SnapTrimEvent::start()
{
  logger().debug("{}: {}", *this, __func__);
  return with_pg(
    pg->get_shard_services(), pg
  ).finally([ref=IRef{this}, this] {
    logger().debug("{}: complete", *ref);
    return handle.complete();
  });
}
85
// The common per-PG request pipeline this operation marches through.
CommonPGPipeline& SnapTrimEvent::pp()
{
  return pg->request_pg_pipeline;
}
90
// Trim up to `osd_pg_max_concurrent_snap_trims` objects of `snapid` in
// one pass.  Returns stop_iteration::yes when the snap mapper has no
// more objects for this snapid, stop_iteration::no when the caller
// should invoke us again, and eagain if the PG interrupted us.
SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
SnapTrimEvent::with_pg(
  ShardServices &shard_services, Ref<PG> _pg)
{
  return interruptor::with_interruption([&shard_services, this] {
    // Walk the common PG pipeline stages first.
    return enter_stage<interruptor>(
      pp().wait_for_active
    ).then_interruptible([this] {
      return with_blocking_event<PGActivationBlocker::BlockingEvent,
                                 interruptor>([this] (auto&& trigger) {
        return pg->wait_for_active_blocker.wait(std::move(trigger));
      });
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        pp().recover_missing);
    }).then_interruptible([] {
      //return do_recover_missing(pg, get_target_oid());
      return seastar::now();
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        pp().get_obc);
    }).then_interruptible([this] {
      return enter_stage<interruptor>(
        pp().process);
    }).then_interruptible([&shard_services, this] {
      // The snap mapper query is synchronous; run it inside
      // interruptor::async so it executes off the reactor's fast path.
      return interruptor::async([this] {
        std::vector<hobject_t> to_trim;
        using crimson::common::local_conf;
        const auto max =
          local_conf().get_val<uint64_t>("osd_pg_max_concurrent_snap_trims");
        // we need to look for at least 1 snaptrim, otherwise we'll misinterpret
        // the ENOENT below and erase snapid.
        int r = snap_mapper.get_next_objects_to_trim(
          snapid,
          max,
          &to_trim);
        if (r == -ENOENT) {
          // ENOENT means the mapper is exhausted for this snapid;
          // signal that to the caller with an empty vector.
          to_trim.clear(); // paranoia
          return to_trim;
        } else if (r != 0) {
          logger().error("{}: get_next_objects_to_trim returned {}",
                         *this, cpp_strerror(r));
          ceph_abort_msg("get_next_objects_to_trim returned an invalid code");
        } else {
          assert(!to_trim.empty());
        }
        logger().debug("{}: async almost done line {}", *this, __LINE__);
        return to_trim;
      }).then_interruptible([&shard_services, this] (const auto& to_trim) {
        if (to_trim.empty()) {
          // the legit ENOENT -> done
          logger().debug("{}: to_trim is empty! Stopping iteration", *this);
          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
            seastar::stop_iteration::yes);
        }
        // Launch one SnapTrimObjSubEvent per object and register each
        // (id, future) pair with the sub-op blocker.
        for (const auto& object : to_trim) {
          logger().debug("{}: trimming {}", *this, object);
          auto [op, fut] = shard_services.start_operation_may_interrupt<
            interruptor, SnapTrimObjSubEvent>(
            pg,
            object,
            snapid);
          subop_blocker.emplace_back(
            op->get_id(),
            std::move(fut)
          );
        }
        return enter_stage<interruptor>(
          wait_subop
        ).then_interruptible([this] {
          logger().debug("{}: awaiting completion", *this);
          return subop_blocker.wait_completion();
        }).safe_then_interruptible([this] {
          if (!needs_pause) {
            return interruptor::now();
          }
          // let's know operators we're waiting
          return enter_stage<interruptor>(
            wait_trim_timer
          ).then_interruptible([this] {
            using crimson::common::local_conf;
            const auto time_to_sleep =
              local_conf().template get_val<double>("osd_snap_trim_sleep");
            logger().debug("{}: time_to_sleep {}", *this, time_to_sleep);
            // TODO: this logic should be more sophisticated and distinguish
            // between SSDs, HDDs and the hybrid case
            return seastar::sleep(
              std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
          });
        }).safe_then_interruptible([this] {
          logger().debug("{}: all completed", *this);
          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
            seastar::stop_iteration::no);
        });
      });
    });
  }, [this](std::exception_ptr eptr) -> snap_trim_ertr::future<seastar::stop_iteration> {
    // Interruption (e.g. PG state change) -> ask the caller to retry.
    logger().debug("{}: interrupted {}", *this, eptr);
    return crimson::ct_error::eagain::make();
  }, pg);
}
192
193
// The common per-PG request pipeline this sub-operation marches through.
CommonPGPipeline& SnapTrimObjSubEvent::pp()
{
  return pg->request_pg_pipeline;
}
198
// Entry point for trimming a single clone object.  Mirrors
// SnapTrimEvent::start(): the IRef keeps `this` alive for the whole
// chain and `handle.complete()` tears down the pipeline state.
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
SnapTrimObjSubEvent::start()
{
  logger().debug("{}: start", *this);
  return with_pg(
    pg->get_shard_services(), pg
  ).finally([ref=IRef{this}, this] {
    logger().debug("{}: complete", *ref);
    return handle.complete();
  });
}
210
211 SnapTrimObjSubEvent::remove_or_update_iertr::future<>
212 SnapTrimObjSubEvent::remove_clone(
213 ObjectContextRef obc,
214 ObjectContextRef head_obc,
215 ceph::os::Transaction& txn,
216 std::vector<pg_log_entry_t>& log_entries
217 ) {
218 const auto p = std::find(
219 head_obc->ssc->snapset.clones.begin(),
220 head_obc->ssc->snapset.clones.end(),
221 coid.snap);
222 if (p == head_obc->ssc->snapset.clones.end()) {
223 logger().error("{}: Snap {} not in clones",
224 *this, coid.snap);
225 return crimson::ct_error::enoent::make();
226 }
227 assert(p != head_obc->ssc->snapset.clones.end());
228 snapid_t last = coid.snap;
229 delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(last);
230
231 if (p != head_obc->ssc->snapset.clones.begin()) {
232 // not the oldest... merge overlap into next older clone
233 std::vector<snapid_t>::iterator n = p - 1;
234 hobject_t prev_coid = coid;
235 prev_coid.snap = *n;
236
237 // does the classical OSD really need is_present_clone(prev_coid)?
238 delta_stats.num_bytes -= head_obc->ssc->snapset.get_clone_bytes(*n);
239 head_obc->ssc->snapset.clone_overlap[*n].intersection_of(
240 head_obc->ssc->snapset.clone_overlap[*p]);
241 delta_stats.num_bytes += head_obc->ssc->snapset.get_clone_bytes(*n);
242 }
243 delta_stats.num_objects--;
244 if (obc->obs.oi.is_dirty()) {
245 delta_stats.num_objects_dirty--;
246 }
247 if (obc->obs.oi.is_omap()) {
248 delta_stats.num_objects_omap--;
249 }
250 if (obc->obs.oi.is_whiteout()) {
251 logger().debug("{}: trimming whiteout on {}",
252 *this, coid);
253 delta_stats.num_whiteouts--;
254 }
255 delta_stats.num_object_clones--;
256
257 obc->obs.exists = false;
258 head_obc->ssc->snapset.clones.erase(p);
259 head_obc->ssc->snapset.clone_overlap.erase(last);
260 head_obc->ssc->snapset.clone_size.erase(last);
261 head_obc->ssc->snapset.clone_snaps.erase(last);
262
263 log_entries.emplace_back(
264 pg_log_entry_t{
265 pg_log_entry_t::DELETE,
266 coid,
267 osd_op_p.at_version,
268 obc->obs.oi.version,
269 0,
270 osd_reqid_t(),
271 obc->obs.oi.mtime, // will be replaced in `apply_to()`
272 0}
273 );
274 txn.remove(
275 pg->get_collection_ref()->get_cid(),
276 ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
277 obc->obs.oi = object_info_t(coid);
278 return OpsExecuter::snap_map_remove(coid, pg->snap_mapper, pg->osdriver, txn);
279 }
280
281 void SnapTrimObjSubEvent::remove_head_whiteout(
282 ObjectContextRef obc,
283 ObjectContextRef head_obc,
284 ceph::os::Transaction& txn,
285 std::vector<pg_log_entry_t>& log_entries
286 ) {
287 // NOTE: this arguably constitutes minor interference with the
288 // tiering agent if this is a cache tier since a snap trim event
289 // is effectively evicting a whiteout we might otherwise want to
290 // keep around.
291 const auto head_oid = coid.get_head();
292 logger().info("{}: {} removing {}",
293 *this, coid, head_oid);
294 log_entries.emplace_back(
295 pg_log_entry_t{
296 pg_log_entry_t::DELETE,
297 head_oid,
298 osd_op_p.at_version,
299 head_obc->obs.oi.version,
300 0,
301 osd_reqid_t(),
302 obc->obs.oi.mtime, // will be replaced in `apply_to()`
303 0}
304 );
305 logger().info("{}: remove snap head", *this);
306 object_info_t& oi = head_obc->obs.oi;
307 delta_stats.num_objects--;
308 if (oi.is_dirty()) {
309 delta_stats.num_objects_dirty--;
310 }
311 if (oi.is_omap()) {
312 delta_stats.num_objects_omap--;
313 }
314 if (oi.is_whiteout()) {
315 logger().debug("{}: trimming whiteout on {}",
316 *this, oi.soid);
317 delta_stats.num_whiteouts--;
318 }
319 head_obc->obs.exists = false;
320 head_obc->obs.oi = object_info_t(head_oid);
321 txn.remove(pg->get_collection_ref()->get_cid(),
322 ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD});
323 }
324
// The clone still belongs to some snaps: rewrite its clone_snaps entry
// to `new_snaps`, bump the clone's object_info version, persist the new
// OI attr in `txn`, append a MODIFY log entry and refresh the snap
// mapper accordingly.
SnapTrimObjSubEvent::interruptible_future<>
SnapTrimObjSubEvent::adjust_snaps(
  ObjectContextRef obc,
  ObjectContextRef head_obc,
  const std::set<snapid_t>& new_snaps,
  ceph::os::Transaction& txn,
  std::vector<pg_log_entry_t>& log_entries
) {
  // clone_snaps is stored newest-first, hence the reverse iteration.
  head_obc->ssc->snapset.clone_snaps[coid.snap] =
    std::vector<snapid_t>(new_snaps.rbegin(), new_snaps.rend());

  // we still do a 'modify' event on this object just to trigger a
  // snapmapper.update ... :(
  obc->obs.oi.prior_version = obc->obs.oi.version;
  obc->obs.oi.version = osd_op_p.at_version;
  ceph::bufferlist bl;
  encode(obc->obs.oi,
         bl,
         pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
  txn.setattr(
    pg->get_collection_ref()->get_cid(),
    ghobject_t{coid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
    OI_ATTR,
    bl);
  log_entries.emplace_back(
    pg_log_entry_t{
      pg_log_entry_t::MODIFY,
      coid,
      obc->obs.oi.version,
      obc->obs.oi.prior_version,
      0,
      osd_reqid_t(),
      obc->obs.oi.mtime,
      0}
  );
  return OpsExecuter::snap_map_modify(
    coid, new_snaps, pg->snap_mapper, pg->osdriver, txn);
}
363
364 void SnapTrimObjSubEvent::update_head(
365 ObjectContextRef obc,
366 ObjectContextRef head_obc,
367 ceph::os::Transaction& txn,
368 std::vector<pg_log_entry_t>& log_entries
369 ) {
370 const auto head_oid = coid.get_head();
371 logger().info("{}: writing updated snapset on {}, snapset is {}",
372 *this, head_oid, head_obc->ssc->snapset);
373 log_entries.emplace_back(
374 pg_log_entry_t{
375 pg_log_entry_t::MODIFY,
376 head_oid,
377 osd_op_p.at_version,
378 head_obc->obs.oi.version,
379 0,
380 osd_reqid_t(),
381 obc->obs.oi.mtime,
382 0}
383 );
384
385 head_obc->obs.oi.prior_version = head_obc->obs.oi.version;
386 head_obc->obs.oi.version = osd_op_p.at_version;
387
388 std::map<std::string, ceph::bufferlist, std::less<>> attrs;
389 ceph::bufferlist bl;
390 encode(head_obc->ssc->snapset, bl);
391 attrs[SS_ATTR] = std::move(bl);
392
393 bl.clear();
394 head_obc->obs.oi.encode_no_oid(bl,
395 pg->get_osdmap()->get_features(CEPH_ENTITY_TYPE_OSD, nullptr));
396 attrs[OI_ATTR] = std::move(bl);
397 txn.setattrs(
398 pg->get_collection_ref()->get_cid(),
399 ghobject_t{head_oid, ghobject_t::NO_GEN, shard_id_t::NO_SHARD},
400 attrs);
401 }
402
403 SnapTrimObjSubEvent::remove_or_update_iertr::future<
404 SnapTrimObjSubEvent::remove_or_update_ret_t>
405 SnapTrimObjSubEvent::remove_or_update(
406 ObjectContextRef obc,
407 ObjectContextRef head_obc)
408 {
409 auto citer = head_obc->ssc->snapset.clone_snaps.find(coid.snap);
410 if (citer == head_obc->ssc->snapset.clone_snaps.end()) {
411 logger().error("{}: No clone_snaps in snapset {} for object {}",
412 *this, head_obc->ssc->snapset, coid);
413 return crimson::ct_error::enoent::make();
414 }
415 const auto& old_snaps = citer->second;
416 if (old_snaps.empty()) {
417 logger().error("{}: no object info snaps for object {}",
418 *this, coid);
419 return crimson::ct_error::enoent::make();
420 }
421 if (head_obc->ssc->snapset.seq == 0) {
422 logger().error("{}: no snapset.seq for object {}",
423 *this, coid);
424 return crimson::ct_error::enoent::make();
425 }
426 const OSDMapRef& osdmap = pg->get_osdmap();
427 std::set<snapid_t> new_snaps;
428 for (const auto& old_snap : old_snaps) {
429 if (!osdmap->in_removed_snaps_queue(pg->get_info().pgid.pgid.pool(),
430 old_snap)
431 && old_snap != snap_to_trim) {
432 new_snaps.insert(old_snap);
433 }
434 }
435
436 return seastar::do_with(ceph::os::Transaction{}, [=, this](auto &txn) {
437 std::vector<pg_log_entry_t> log_entries{};
438
439 int64_t num_objects_before_trim = delta_stats.num_objects;
440 osd_op_p.at_version = pg->next_version();
441 auto ret = remove_or_update_iertr::now();
442 if (new_snaps.empty()) {
443 // remove clone from snapset
444 logger().info("{}: {} snaps {} -> {} ... deleting",
445 *this, coid, old_snaps, new_snaps);
446 ret = remove_clone(obc, head_obc, txn, log_entries);
447 } else {
448 // save adjusted snaps for this object
449 logger().info("{}: {} snaps {} -> {}",
450 *this, coid, old_snaps, new_snaps);
451 ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries);
452 }
453 return std::move(ret).safe_then_interruptible(
454 [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable {
455 osd_op_p.at_version = pg->next_version();
456
457 // save head snapset
458 logger().debug("{}: {} new snapset {} on {}",
459 *this, coid, head_obc->ssc->snapset, head_obc->obs.oi);
460 if (head_obc->ssc->snapset.clones.empty() && head_obc->obs.oi.is_whiteout()) {
461 remove_head_whiteout(obc, head_obc, txn, log_entries);
462 } else {
463 update_head(obc, head_obc, txn, log_entries);
464 }
465 // Stats reporting - Set number of objects trimmed
466 if (num_objects_before_trim > delta_stats.num_objects) {
467 //int64_t num_objects_trimmed =
468 // num_objects_before_trim - delta_stats.num_objects;
469 //add_objects_trimmed_count(num_objects_trimmed);
470 }
471 }).safe_then_interruptible(
472 [&txn, log_entries=std::move(log_entries)] () mutable {
473 return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>(
474 std::make_pair(std::move(txn), std::move(log_entries)));
475 });
476 });
477 }
478
// Drive the trim of one clone through the PG pipeline: wait for the PG
// to become active, take write locks on both the head and the clone
// obcs, compute the mutation via remove_or_update(), submit the
// resulting transaction and wait for the repop to complete.
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
SnapTrimObjSubEvent::with_pg(
  ShardServices &shard_services, Ref<PG> _pg)
{
  return enter_stage<interruptor>(
    pp().wait_for_active
  ).then_interruptible([this] {
    return with_blocking_event<PGActivationBlocker::BlockingEvent,
                               interruptor>([this] (auto&& trigger) {
      return pg->wait_for_active_blocker.wait(std::move(trigger));
    });
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      pp().recover_missing);
  }).then_interruptible([] {
    //return do_recover_missing(pg, get_target_oid());
    return seastar::now();
  }).then_interruptible([this] {
    return enter_stage<interruptor>(
      pp().get_obc);
  }).then_interruptible([this] {
    logger().debug("{}: getting obc for {}", *this, coid);
    // end of commonality
    // with_head_and_clone_obc lock both clone's and head's obcs
    return pg->obc_loader.with_head_and_clone_obc<RWState::RWWRITE>(
      coid,
      [this](auto head_obc, auto clone_obc) {
      logger().debug("{}: got clone_obc={}", *this, clone_obc->get_oid());
      return enter_stage<interruptor>(
        pp().process
      ).then_interruptible(
        [this,clone_obc=std::move(clone_obc), head_obc=std::move(head_obc)]() mutable {
        logger().debug("{}: processing clone_obc={}", *this, clone_obc->get_oid());
        return remove_or_update(
          clone_obc, head_obc
        ).safe_then_unpack_interruptible([clone_obc, this]
          (auto&& txn, auto&& log_entries) mutable {
          // Hand the mutation to the PG; `submitted` resolves once the
          // transaction is queued, `all_completed` once it is durable.
          auto [submitted, all_completed] = pg->submit_transaction(
            std::move(clone_obc),
            std::move(txn),
            std::move(osd_op_p),
            std::move(log_entries));
          return submitted.then_interruptible(
            [all_completed=std::move(all_completed), this] () mutable {
            return enter_stage<interruptor>(
              wait_repop
            ).then_interruptible([all_completed=std::move(all_completed)] () mutable {
              return std::move(all_completed);
            });
          });
        });
      });
    }).handle_error_interruptible(
      remove_or_update_iertr::pass_further{},
      crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
    );
  });
}
537
538 void SnapTrimObjSubEvent::print(std::ostream &lhs) const
539 {
540 lhs << "SnapTrimObjSubEvent("
541 << "coid=" << coid
542 << " snapid=" << snap_to_trim
543 << ")";
544 }
545
// Structured (Formatter) dump; only the clone oid is reported.
void SnapTrimObjSubEvent::dump_detail(Formatter *f) const
{
  f->open_object_section("SnapTrimObjSubEvent");
  f->dump_stream("coid") << coid;
  f->close_section();
}
552
553 } // namespace crimson::osd