// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*-
// vim: ts=8 sw=2 smarttab expandtab

#include "crimson/os/seastore/journal.h"
#include "crimson/os/seastore/extent_placement_manager.h"

namespace {
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_seastore_tm);
  }
}

SET_SUBSYS(seastore_tm);

namespace crimson::os::seastore {

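// SegmentedAllocator owns a pool of Writers used to place logical extents
// out-of-line (OOL) into segments obtained from the SegmentProvider.  The
// pool size comes from the seastore_init_rewrite_segments_num_per_device
// config option.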
SegmentedAllocator::SegmentedAllocator(
  SegmentProvider& sp,
  SegmentManager& sm,
  LBAManager& lba_manager,
  Journal& journal,
  Cache& cache)
  : segment_provider(sp),
    segment_manager(sm),
    lba_manager(lba_manager),
    journal(journal),
    cache(cache)
{
  std::generate_n(
    std::back_inserter(writers),
    crimson::common::get_conf<uint64_t>(
      "seastore_init_rewrite_segments_num_per_device"),
    [&] {
      return Writer{
        segment_provider,
        segment_manager,
        lba_manager,
        journal,
        cache};
    });
}

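// Once the OOL data is durable, point each extent's LBA mapping at its new
// out-of-line paddr, mark the delayed extent OOL in the cache, and clear
// the record.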
SegmentedAllocator::Writer::finish_record_ret
SegmentedAllocator::Writer::finish_write(
  Transaction& t,
  ool_record_t& record) {
  return trans_intr::do_for_each(record.get_extents(),
    [this, &t](auto& ool_extent) {
    LOG_PREFIX(SegmentedAllocator::Writer::finish_write);
    auto& lextent = ool_extent.get_lextent();
    DEBUGT("extent: {}, ool_paddr: {}",
      t,
      *lextent,
      ool_extent.get_ool_paddr());
    return lba_manager.update_mapping(
      t,
      lextent->get_laddr(),
      lextent->get_paddr(),
      ool_extent.get_ool_paddr()
    ).si_then([&ool_extent, &t, &lextent, this] {
      lextent->backend_type = device_type_t::NONE;
      lextent->hint = {};
      cache.mark_delayed_extent_ool(t, lextent, ool_extent.get_ool_paddr());
      return finish_record_iertr::now();
    });
  }).si_then([&record] {
    record.clear();
  });
}

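// Encode the accumulated record and submit it to the current segment at the
// record's base offset.  The write is tracked in inflight_writes so that
// roll_segment() can wait for it before closing the segment; per-transaction
// OOL write stats are accounted before the write is issued.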
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::_write(
  Transaction& t,
  ool_record_t& record)
{
  auto record_size = record.get_encoded_record_length();
  allocated_to += record_size.get_encoded_length();
  segment_provider.update_segment_avail_bytes(
    paddr_t::make_seg_paddr(
      current_segment->segment->get_segment_id(),
      allocated_to));
  bufferlist bl = record.encode(
    current_segment->segment->get_segment_id(),
    0);
  seastar::promise<> pr;
  current_segment->inflight_writes.emplace_back(pr.get_future());
  LOG_PREFIX(SegmentedAllocator::Writer::_write);

  DEBUGT(
    "written {} extents, {} bytes to segment {} at {}",
    t,
    record.get_num_extents(),
    bl.length(),
    current_segment->segment->get_segment_id(),
    record.get_base());

  // account transactional ool writes before write()
  auto& stats = t.get_ool_write_stats();
  stats.extents.num += record.get_num_extents();
  stats.extents.bytes += record_size.dlength;
  stats.header_raw_bytes += record_size.get_raw_mdlength();
  stats.header_bytes += record_size.get_mdlength();
  stats.data_bytes += record_size.dlength;
  stats.num_records += 1;

  return trans_intr::make_interruptible(
    current_segment->segment->write(record.get_base(), bl).safe_then(
      [this, pr=std::move(pr), &t,
       it=(--current_segment->inflight_writes.end()),
       cs=current_segment]() mutable {
        LOG_PREFIX(SegmentedAllocator::Writer::_write);
        if (cs->outdated) {
          DEBUGT("segment rolled", t);
          pr.set_value();
        } else {
          DEBUGT("segment not rolled", t);
          current_segment->inflight_writes.erase(it);
        }
        return seastar::now();
      })
  ).si_then([this, &record, &t]() mutable {
    return finish_write(t, record);
  });
}

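// Prepare the extent's buffer for writing and append it to the in-progress
// OOL record.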
void SegmentedAllocator::Writer::add_extent_to_write(
  ool_record_t& record,
  LogicalCachedExtentRef& extent) {
  logger().debug(
    "SegmentedAllocator::Writer::add_extent_to_write: "
    "add extent {} to record",
    extent);
  extent->prepare_write();
  record.add_extent(extent);
}

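// Batch the given logical extents into OOL records and write them to the
// current segment.  Whenever the next extent would overflow the segment
// (_needs_roll), the accumulated record is flushed and the writer rolls to
// a fresh segment; concurrent callers block on segment_rotation_guard while
// a roll is in progress.
//
// A caller-side sketch (names on the calling side are illustrative only):
//   writer.write(t, delayed_extents).si_then([] {
//     // extents now have their OOL paddrs recorded via the LBAManager
//   });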
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::write(
  Transaction& t,
  std::list<LogicalCachedExtentRef>& extents)
{
  auto write_func = [this, &extents, &t] {
    return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
      [this, &extents, &t](auto& record) {
      return trans_intr::repeat([this, &record, &t, &extents]()
        -> write_iertr::future<seastar::stop_iteration> {
        if (extents.empty()) {
          return seastar::make_ready_future<
            seastar::stop_iteration>(seastar::stop_iteration::yes);
        }

        return segment_rotation_guard.wait(
          [this] {
            return !rolling_segment;
          },
          [this, &record, &extents, &t]() -> write_iertr::future<> {
            LOG_PREFIX(SegmentedAllocator::Writer::write);
            record.set_base(allocated_to);
            for (auto it = extents.begin();
                 it != extents.end();) {
              auto& extent = *it;
              auto wouldbe_length =
                record.get_wouldbe_encoded_record_length(extent);
              if (_needs_roll(wouldbe_length)) {
                // reached the segment end, write and roll
                assert(!rolling_segment);
                rolling_segment = true;
                auto num_extents = record.get_num_extents();
                DEBUGT(
                  "end of segment, writing {} extents to segment {} at {}",
                  t,
                  num_extents,
                  current_segment->segment->get_segment_id(),
                  allocated_to);
                return (num_extents ?
                        _write(t, record) :
                        write_iertr::now()
                ).si_then([this]() mutable {
                  return roll_segment(false);
                }).finally([this] {
                  rolling_segment = false;
                  segment_rotation_guard.broadcast();
                });
              }
              add_extent_to_write(record, extent);
              it = extents.erase(it);
            }

            DEBUGT(
              "writing {} extents to segment {} at {}",
              t,
              record.get_num_extents(),
              current_segment->segment->get_segment_id(),
              allocated_to);
            return _write(t, record);
          }
        ).si_then([]()
          -> write_iertr::future<seastar::stop_iteration> {
          return seastar::make_ready_future<
            seastar::stop_iteration>(seastar::stop_iteration::no);
        });
      });
    });
  };

  if (rolling_segment) {
    return segment_rotation_guard.wait([this] {
      return !rolling_segment;
    }, std::move(write_func));
  } else if (!current_segment) {
    return trans_intr::make_interruptible(roll_segment(true)).si_then(
      [write_func=std::move(write_func)] {
      return write_func();
    });
  }
  return write_func();
}

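// True if appending `length` more bytes at allocated_to would exceed the
// current segment's write capacity.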
bool SegmentedAllocator::Writer::_needs_roll(segment_off_t length) const {
  return allocated_to + length > current_segment->segment->get_write_capacity();
}

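// Write a segment_header_t into the first block of a freshly opened segment
// and advance allocated_to past it, so OOL records never overwrite the
// header.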
SegmentedAllocator::Writer::init_segment_ertr::future<>
SegmentedAllocator::Writer::init_segment(Segment& segment) {
  bufferptr bp(
    ceph::buffer::create_page_aligned(
      segment_manager.get_block_size()));
  bp.zero();
  auto header = segment_header_t{
    journal.get_segment_seq(),
    segment.get_segment_id(),
    NO_DELTAS, 0, true};
  logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}",
    segment.get_segment_id(),
    header);
  ceph::bufferlist bl;
  encode(header, bl);
  bl.cbegin().copy(bl.length(), bp.c_str());
  bl.clear();
  bl.append(bp);
  allocated_to = segment_manager.get_block_size();
  return segment.write(0, bl).handle_error(
    crimson::ct_error::input_output_error::pass_further{},
    crimson::ct_error::assert_all{
      "Invalid error when initing segment"}
  );
}

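// Switch the writer to a new segment.  If one is currently open, mark it
// outdated and close it in the background (under writer_guard) once its
// in-flight writes complete; then get a fresh segment from the provider,
// open it, write its header via init_segment(), make it current, and wake
// any writers blocked on segment_rotation_guard.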
SegmentedAllocator::Writer::roll_segment_ertr::future<>
SegmentedAllocator::Writer::roll_segment(bool set_rolling) {
  LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
  DEBUG("set_rolling {}", set_rolling);
  if (set_rolling) {
    rolling_segment = true;
  }
  assert(rolling_segment);
  if (current_segment) {
    (void) seastar::with_gate(writer_guard, [this] {
      auto fut = seastar::now();
      if (!current_segment->inflight_writes.empty()) {
        fut = seastar::when_all_succeed(
          current_segment->inflight_writes.begin(),
          current_segment->inflight_writes.end());
      }
      current_segment->outdated = true;
      return fut.then(
        [cs=std::move(current_segment), this, it=(--open_segments.end())] {
        return cs->segment->close().safe_then([this, cs, it] {
          LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
          assert((*it).get() == cs.get());
          segment_provider.close_segment(cs->segment->get_segment_id());
          open_segments.erase(it);
          DEBUG("closed segment: {}", cs->segment->get_segment_id());
        });
      });
    }).handle_exception_type([](seastar::gate_closed_exception e) {
      LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
      DEBUG("writer_guard closed, should be stopping");
      return seastar::now();
    });
  }

  return segment_provider.get_segment(
    segment_manager.get_device_id()
  ).safe_then([this](auto segment) {
    return segment_manager.open(segment);
  }).safe_then([this](auto segref) {
    LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
    DEBUG("opened new segment: {}", segref->get_segment_id());
    return init_segment(*segref).safe_then([segref=std::move(segref), this] {
      LOG_PREFIX(SegmentedAllocator::Writer::roll_segment);
      assert(!current_segment.get());
      current_segment.reset(new open_segment_wrapper_t());
      current_segment->segment = segref;
      open_segments.emplace_back(current_segment);
      rolling_segment = false;
      segment_rotation_guard.broadcast();
      DEBUG("inited new segment: {}", segref->get_segment_id());
    });
  }).handle_error(
    roll_segment_ertr::pass_further{},
    crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); })
  );
}

}