]> git.proxmox.com Git - ceph.git/blob - ceph/src/crimson/os/seastore/transaction_manager.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / crimson / os / seastore / transaction_manager.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "include/denc.h"
5 #include "include/intarith.h"
6
7 #include "crimson/os/seastore/logging.h"
8 #include "crimson/os/seastore/transaction_manager.h"
9 #include "crimson/os/seastore/segment_manager.h"
10 #include "crimson/os/seastore/journal.h"
11
12 SET_SUBSYS(seastore_tm);
13
14 namespace crimson::os::seastore {
15
16 TransactionManager::TransactionManager(
17 SegmentManager &_segment_manager,
18 SegmentCleanerRef _segment_cleaner,
19 JournalRef _journal,
20 CacheRef _cache,
21 LBAManagerRef _lba_manager,
22 ExtentPlacementManagerRef&& epm,
23 ExtentReader& scanner)
24 : segment_manager(_segment_manager),
25 segment_cleaner(std::move(_segment_cleaner)),
26 cache(std::move(_cache)),
27 lba_manager(std::move(_lba_manager)),
28 journal(std::move(_journal)),
29 epm(std::move(epm)),
30 scanner(scanner)
31 {
32 segment_cleaner->set_extent_callback(this);
33 journal->set_write_pipeline(&write_pipeline);
34 register_metrics();
35 }
36
37 TransactionManager::mkfs_ertr::future<> TransactionManager::mkfs()
38 {
39 LOG_PREFIX(TransactionManager::mkfs);
40 segment_cleaner->mount(
41 segment_manager.get_device_id(),
42 scanner.get_segment_managers());
43 return journal->open_for_write().safe_then([this, FNAME](auto addr) {
44 DEBUG("about to do_with");
45 segment_cleaner->init_mkfs(addr);
46 return with_transaction_intr(
47 Transaction::src_t::MUTATE,
48 "mkfs_tm",
49 [this, FNAME](auto& t)
50 {
51 DEBUGT("about to cache->mkfs", t);
52 cache->init();
53 return cache->mkfs(t
54 ).si_then([this, &t] {
55 return lba_manager->mkfs(t);
56 }).si_then([this, FNAME, &t] {
57 DEBUGT("about to submit_transaction", t);
58 return submit_transaction_direct(t);
59 });
60 }).handle_error(
61 crimson::ct_error::eagain::handle([] {
62 ceph_assert(0 == "eagain impossible");
63 return mkfs_ertr::now();
64 }),
65 mkfs_ertr::pass_further{}
66 );
67 }).safe_then([this] {
68 return close();
69 });
70 }
71
72 TransactionManager::mount_ertr::future<> TransactionManager::mount()
73 {
74 LOG_PREFIX(TransactionManager::mount);
75 cache->init();
76 segment_cleaner->mount(
77 segment_manager.get_device_id(),
78 scanner.get_segment_managers());
79 return segment_cleaner->init_segments().safe_then(
80 [this](auto&& segments) {
81 return journal->replay(
82 std::move(segments),
83 [this](const auto &offsets, const auto &e) {
84 auto start_seq = offsets.write_result.start_seq;
85 segment_cleaner->update_journal_tail_target(
86 cache->get_oldest_dirty_from().value_or(start_seq));
87 return cache->replay_delta(
88 start_seq,
89 offsets.record_block_base,
90 e);
91 });
92 }).safe_then([this] {
93 return journal->open_for_write();
94 }).safe_then([this, FNAME](auto addr) {
95 segment_cleaner->set_journal_head(addr);
96 return seastar::do_with(
97 create_weak_transaction(
98 Transaction::src_t::READ, "mount"),
99 [this, FNAME](auto &tref) {
100 return with_trans_intr(
101 *tref,
102 [this, FNAME](auto &t) {
103 return cache->init_cached_extents(t, [this](auto &t, auto &e) {
104 return lba_manager->init_cached_extent(t, e);
105 }).si_then([this, FNAME, &t] {
106 assert(segment_cleaner->debug_check_space(
107 *segment_cleaner->get_empty_space_tracker()));
108 return lba_manager->scan_mapped_space(
109 t,
110 [this, FNAME, &t](paddr_t addr, extent_len_t len) {
111 TRACET(
112 "marking {}~{} used",
113 t,
114 addr,
115 len);
116 if (addr.is_real()) {
117 segment_cleaner->mark_space_used(
118 addr,
119 len ,
120 /* init_scan = */ true);
121 }
122 });
123 });
124 });
125 });
126 }).safe_then([this] {
127 segment_cleaner->complete_init();
128 }).handle_error(
129 mount_ertr::pass_further{},
130 crimson::ct_error::all_same_way([] {
131 ceph_assert(0 == "unhandled error");
132 return mount_ertr::now();
133 }));
134 }
135
136 TransactionManager::close_ertr::future<> TransactionManager::close() {
137 LOG_PREFIX(TransactionManager::close);
138 DEBUG("enter");
139 return segment_cleaner->stop(
140 ).then([this] {
141 return cache->close();
142 }).safe_then([this] {
143 cache->dump_contents();
144 return journal->close();
145 }).safe_then([FNAME] {
146 DEBUG("completed");
147 return seastar::now();
148 });
149 }
150
151 TransactionManager::ref_ret TransactionManager::inc_ref(
152 Transaction &t,
153 LogicalCachedExtentRef &ref)
154 {
155 return lba_manager->incref_extent(t, ref->get_laddr()).si_then([](auto r) {
156 return r.refcount;
157 }).handle_error_interruptible(
158 ref_iertr::pass_further{},
159 ct_error::all_same_way([](auto e) {
160 ceph_assert(0 == "unhandled error, TODO");
161 }));
162 }
163
164 TransactionManager::ref_ret TransactionManager::inc_ref(
165 Transaction &t,
166 laddr_t offset)
167 {
168 return lba_manager->incref_extent(t, offset).si_then([](auto result) {
169 return result.refcount;
170 });
171 }
172
173 TransactionManager::ref_ret TransactionManager::dec_ref(
174 Transaction &t,
175 LogicalCachedExtentRef &ref)
176 {
177 LOG_PREFIX(TransactionManager::dec_ref);
178 return lba_manager->decref_extent(t, ref->get_laddr()
179 ).si_then([this, FNAME, &t, ref](auto ret) {
180 if (ret.refcount == 0) {
181 DEBUGT(
182 "extent {} refcount 0",
183 t,
184 *ref);
185 cache->retire_extent(t, ref);
186 stats.extents_retired_total++;
187 stats.extents_retired_bytes += ref->get_length();
188 }
189 return ret.refcount;
190 });
191 }
192
193 TransactionManager::ref_ret TransactionManager::dec_ref(
194 Transaction &t,
195 laddr_t offset)
196 {
197 LOG_PREFIX(TransactionManager::dec_ref);
198 return lba_manager->decref_extent(t, offset
199 ).si_then([this, FNAME, offset, &t](auto result) -> ref_ret {
200 if (result.refcount == 0 && !result.addr.is_zero()) {
201 DEBUGT("offset {} refcount 0", t, offset);
202 return cache->retire_extent_addr(
203 t, result.addr, result.length
204 ).si_then([result, this] {
205 stats.extents_retired_total++;
206 stats.extents_retired_bytes += result.length;
207 return ref_ret(
208 interruptible::ready_future_marker{},
209 0);
210 });
211 } else {
212 return ref_ret(
213 interruptible::ready_future_marker{},
214 result.refcount);
215 }
216 });
217 }
218
219 TransactionManager::refs_ret TransactionManager::dec_ref(
220 Transaction &t,
221 std::vector<laddr_t> offsets)
222 {
223 return seastar::do_with(std::move(offsets), std::vector<unsigned>(),
224 [this, &t] (auto &&offsets, auto &refcnt) {
225 return trans_intr::do_for_each(offsets.begin(), offsets.end(),
226 [this, &t, &refcnt] (auto &laddr) {
227 return this->dec_ref(t, laddr).si_then([&refcnt] (auto ref) {
228 refcnt.push_back(ref);
229 return ref_iertr::now();
230 });
231 }).si_then([&refcnt] {
232 return ref_iertr::make_ready_future<std::vector<unsigned>>(std::move(refcnt));
233 });
234 });
235 }
236
237 TransactionManager::submit_transaction_iertr::future<>
238 TransactionManager::submit_transaction(
239 Transaction &t)
240 {
241 LOG_PREFIX(TransactionManager::submit_transaction);
242 return trans_intr::make_interruptible(
243 t.get_handle().enter(write_pipeline.reserve_projected_usage)
244 ).then_interruptible([this, FNAME, &t] {
245 size_t projected_usage = t.get_allocation_size();
246 DEBUGT("waiting for projected_usage: {}", t, projected_usage);
247 return trans_intr::make_interruptible(
248 segment_cleaner->reserve_projected_usage(projected_usage)
249 ).then_interruptible([this, &t] {
250 return submit_transaction_direct(t);
251 }).finally([this, FNAME, projected_usage, &t] {
252 DEBUGT("releasing projected_usage: {}", t, projected_usage);
253 segment_cleaner->release_projected_usage(projected_usage);
254 });
255 });
256 }
257
258 TransactionManager::submit_transaction_direct_ret
259 TransactionManager::submit_transaction_direct(
260 Transaction &tref)
261 {
262 LOG_PREFIX(TransactionManager::submit_transaction_direct);
263 DEBUGT("about to alloc delayed extents", tref);
264
265 return trans_intr::make_interruptible(
266 tref.get_handle().enter(write_pipeline.ool_writes)
267 ).then_interruptible([this, &tref] {
268 return epm->delayed_alloc_or_ool_write(tref
269 ).handle_error_interruptible(
270 crimson::ct_error::input_output_error::pass_further(),
271 crimson::ct_error::assert_all("invalid error")
272 );
273 }).si_then([this, FNAME, &tref] {
274 DEBUGT("about to prepare", tref);
275 return tref.get_handle().enter(write_pipeline.prepare);
276 }).si_then([this, FNAME, &tref]() mutable
277 -> submit_transaction_iertr::future<> {
278 auto record = cache->prepare_record(tref);
279
280 tref.get_handle().maybe_release_collection_lock();
281
282 DEBUGT("about to submit to journal", tref);
283
284 return journal->submit_record(std::move(record), tref.get_handle()
285 ).safe_then([this, FNAME, &tref](auto submit_result) mutable {
286 auto start_seq = submit_result.write_result.start_seq;
287 auto end_seq = submit_result.write_result.get_end_seq();
288 DEBUGT("journal commit to record_block_base={}, start_seq={}, end_seq={}",
289 tref,
290 submit_result.record_block_base,
291 start_seq,
292 end_seq);
293 segment_cleaner->set_journal_head(end_seq);
294 cache->complete_commit(
295 tref,
296 submit_result.record_block_base,
297 start_seq,
298 segment_cleaner.get());
299 lba_manager->complete_transaction(tref);
300 segment_cleaner->update_journal_tail_target(
301 cache->get_oldest_dirty_from().value_or(start_seq));
302 auto to_release = tref.get_segment_to_release();
303 if (to_release != NULL_SEG_ID) {
304 return segment_manager.release(to_release
305 ).safe_then([this, to_release] {
306 segment_cleaner->mark_segment_released(to_release);
307 });
308 } else {
309 return SegmentManager::release_ertr::now();
310 }
311 }).safe_then([&tref] {
312 return tref.get_handle().complete();
313 }).handle_error(
314 submit_transaction_iertr::pass_further{},
315 crimson::ct_error::all_same_way([](auto e) {
316 ceph_assert(0 == "Hit error submitting to journal");
317 })
318 );
319 }).finally([&tref]() {
320 tref.get_handle().exit();
321 });
322 }
323
324 TransactionManager::get_next_dirty_extents_ret
325 TransactionManager::get_next_dirty_extents(
326 Transaction &t,
327 journal_seq_t seq,
328 size_t max_bytes)
329 {
330 return cache->get_next_dirty_extents(t, seq, max_bytes);
331 }
332
333 TransactionManager::rewrite_extent_ret
334 TransactionManager::rewrite_logical_extent(
335 Transaction& t,
336 LogicalCachedExtentRef extent)
337 {
338 LOG_PREFIX(TransactionManager::rewrite_logical_extent);
339 if (extent->has_been_invalidated()) {
340 ERRORT("{} has been invalidated", t, *extent);
341 }
342 assert(!extent->has_been_invalidated());
343 DEBUGT("rewriting {}", t, *extent);
344
345 auto lextent = extent->cast<LogicalCachedExtent>();
346 cache->retire_extent(t, extent);
347 auto nlextent = epm->alloc_new_extent_by_type(
348 t,
349 lextent->get_type(),
350 lextent->get_length(),
351 placement_hint_t::REWRITE)->cast<LogicalCachedExtent>();
352 lextent->get_bptr().copy_out(
353 0,
354 lextent->get_length(),
355 nlextent->get_bptr().c_str());
356 nlextent->set_laddr(lextent->get_laddr());
357 nlextent->set_pin(lextent->get_pin().duplicate());
358
359 DEBUGT(
360 "rewriting {} into {}",
361 t,
362 *lextent,
363 *nlextent);
364
365 /* This update_mapping is, strictly speaking, unnecessary for delayed_alloc
366 * extents since we're going to do it again once we either do the ool write
367 * or allocate a relative inline addr. TODO: refactor SegmentCleaner to
368 * avoid this complication. */
369 return lba_manager->update_mapping(
370 t,
371 lextent->get_laddr(),
372 lextent->get_paddr(),
373 nlextent->get_paddr());
374 }
375
376 TransactionManager::rewrite_extent_ret TransactionManager::rewrite_extent(
377 Transaction &t,
378 CachedExtentRef extent)
379 {
380 LOG_PREFIX(TransactionManager::rewrite_extent);
381 {
382 auto updated = cache->update_extent_from_transaction(t, extent);
383 if (!updated) {
384 DEBUGT("{} is already retired, skipping", t, *extent);
385 return rewrite_extent_iertr::now();
386 }
387 extent = updated;
388 }
389
390 if (extent->get_type() == extent_types_t::ROOT) {
391 DEBUGT("marking root {} for rewrite", t, *extent);
392 cache->duplicate_for_write(t, extent);
393 return rewrite_extent_iertr::now();
394 }
395
396 if (extent->is_logical()) {
397 return rewrite_logical_extent(t, extent->cast<LogicalCachedExtent>());
398 } else {
399 return lba_manager->rewrite_extent(t, extent);
400 }
401 }
402
403 TransactionManager::get_extent_if_live_ret TransactionManager::get_extent_if_live(
404 Transaction &t,
405 extent_types_t type,
406 paddr_t addr,
407 laddr_t laddr,
408 segment_off_t len)
409 {
410 LOG_PREFIX(TransactionManager::get_extent_if_live);
411 DEBUGT("type {}, addr {}, laddr {}, len {}", t, type, addr, laddr, len);
412
413 return cache->get_extent_if_cached(t, addr, type
414 ).si_then([this, FNAME, &t, type, addr, laddr, len](auto extent)
415 -> get_extent_if_live_ret {
416 if (extent) {
417 return get_extent_if_live_ret (
418 interruptible::ready_future_marker{},
419 extent);
420 }
421
422 if (is_logical_type(type)) {
423 using inner_ret = LBAManager::get_mapping_iertr::future<CachedExtentRef>;
424 return lba_manager->get_mapping(
425 t,
426 laddr).si_then([=, &t] (LBAPinRef pin) -> inner_ret {
427 ceph_assert(pin->get_laddr() == laddr);
428 if (pin->get_paddr() == addr) {
429 if (pin->get_length() != (extent_len_t)len) {
430 ERRORT(
431 "Invalid pin laddr {} paddr {} len {} found for "
432 "extent laddr {} len{}",
433 t,
434 pin->get_laddr(),
435 pin->get_paddr(),
436 pin->get_length(),
437 laddr,
438 len);
439 }
440 ceph_assert(pin->get_length() == (extent_len_t)len);
441 return cache->get_extent_by_type(
442 t,
443 type,
444 addr,
445 laddr,
446 len,
447 [this, pin=std::move(pin)](CachedExtent &extent) mutable {
448 auto lref = extent.cast<LogicalCachedExtent>();
449 assert(!lref->has_pin());
450 assert(!lref->has_been_invalidated());
451 assert(!pin->has_been_invalidated());
452 lref->set_pin(std::move(pin));
453 lba_manager->add_pin(lref->get_pin());
454 });
455 } else {
456 return inner_ret(
457 interruptible::ready_future_marker{},
458 CachedExtentRef());
459 }
460 }).handle_error_interruptible(crimson::ct_error::enoent::handle([] {
461 return CachedExtentRef();
462 }), crimson::ct_error::pass_further_all{});
463 } else {
464 DEBUGT("non-logical extent {}", t, addr);
465 return lba_manager->get_physical_extent_if_live(
466 t,
467 type,
468 addr,
469 laddr,
470 len);
471 }
472 });
473 }
474
475 TransactionManager::~TransactionManager() {}
476
477 void TransactionManager::register_metrics()
478 {
479 namespace sm = seastar::metrics;
480 metrics.add_group("tm", {
481 sm::make_counter("extents_retired_total", stats.extents_retired_total,
482 sm::description("total number of retired extents in TransactionManager")),
483 sm::make_counter("extents_retired_bytes", stats.extents_retired_bytes,
484 sm::description("total size of retired extents in TransactionManager")),
485 sm::make_counter("extents_mutated_total", stats.extents_mutated_total,
486 sm::description("total number of mutated extents in TransactionManager")),
487 sm::make_counter("extents_mutated_bytes", stats.extents_mutated_bytes,
488 sm::description("total size of mutated extents in TransactionManager")),
489 sm::make_counter("extents_allocated_total", stats.extents_allocated_total,
490 sm::description("total number of allocated extents in TransactionManager")),
491 sm::make_counter("extents_allocated_bytes", stats.extents_allocated_bytes,
492 sm::description("total size of allocated extents in TransactionManager")),
493 });
494 }
495
496 }