// NOTE: stray blame-export header removed (original commit: 20effc67).
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- |
2 | // vim: ts=8 sw=2 smarttab expandtab | |
3 | ||
4 | #include "crimson/os/seastore/journal.h" | |
5 | #include "crimson/os/seastore/extent_placement_manager.h" | |
6 | ||
namespace {
  // File-local accessor for the seastore_tm subsystem logger; used by the
  // plain logger().debug(...) call sites in this file.
  seastar::logger& logger() {
    return crimson::get_logger(ceph_subsys_seastore_tm);
  }
}
12 | ||
13 | SET_SUBSYS(seastore_tm); | |
14 | ||
15 | namespace crimson::os::seastore { | |
16 | ||
17 | SegmentedAllocator::SegmentedAllocator( | |
18 | SegmentProvider& sp, | |
19 | SegmentManager& sm, | |
20 | LBAManager& lba_manager, | |
21 | Journal& journal, | |
22 | Cache& cache) | |
23 | : segment_provider(sp), | |
24 | segment_manager(sm), | |
25 | lba_manager(lba_manager), | |
26 | journal(journal), | |
27 | cache(cache) | |
28 | { | |
29 | std::generate_n( | |
30 | std::back_inserter(writers), | |
31 | crimson::common::get_conf<uint64_t>( | |
32 | "seastore_init_rewrite_segments_num_per_device"), | |
33 | [&] { | |
34 | return Writer{ | |
35 | segment_provider, | |
36 | segment_manager, | |
37 | lba_manager, | |
38 | journal, | |
39 | cache}; | |
40 | }); | |
41 | } | |
42 | ||
SegmentedAllocator::Writer::finish_record_ret
SegmentedAllocator::Writer::finish_write(
  Transaction& t,
  ool_record_t& record) {
  // After the record has been persisted, repoint each contained extent's
  // LBA mapping from its previous paddr to its final out-of-line paddr,
  // and tell the cache the extent is no longer a delayed write.  The
  // record is emptied once every extent has been processed.
  return trans_intr::do_for_each(record.get_extents(),
    [this, &t](auto& ool_extent) {
    LOG_PREFIX(SegmentedAllocator::Writer::finish_write);
    auto& lextent = ool_extent.get_lextent();
    DEBUGT("extent: {}, ool_paddr: {}",
      t,
      *lextent,
      ool_extent.get_ool_paddr());
    return lba_manager.update_mapping(
      t,
      lextent->get_laddr(),
      lextent->get_paddr(),
      ool_extent.get_ool_paddr()
    ).si_then([&ool_extent, &t, &lextent, this] {
      // Clear the delayed-write placement state now that the extent has a
      // final on-disk location.
      lextent->backend_type = device_type_t::NONE;
      lextent->hint = {};
      cache.mark_delayed_extent_ool(t, lextent, ool_extent.get_ool_paddr());
      return finish_record_iertr::now();
    });
  }).si_then([&record] {
    // All extents relocated; reset the record for reuse by the caller.
    record.clear();
  });
}
70 | ||
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::_write(
  Transaction& t,
  ool_record_t& record)
{
  // Encode `record` at the current allocation offset of the open segment,
  // submit the write, then finish by updating LBA mappings (finish_write).
  auto record_size = record.get_encoded_record_length();
  // Advance the allocation cursor before submitting, so subsequent records
  // are staged past this one.
  allocated_to += record_size.get_encoded_length();
  segment_provider.update_segment_avail_bytes(
    paddr_t::make_seg_paddr(
      current_segment->segment->get_segment_id(),
      allocated_to));
  bufferlist bl = record.encode(
    current_segment->segment->get_segment_id(),
    0);
  // Register this write in inflight_writes so roll_segment() can wait for
  // it to drain before closing the segment.
  seastar::promise<> pr;
  current_segment->inflight_writes.emplace_back(pr.get_future());
  LOG_PREFIX(SegmentedAllocator::Writer::_write);

  DEBUGT(
    "written {} extents, {} bytes to segment {} at {}",
    t,
    record.get_num_extents(),
    bl.length(),
    current_segment->segment->get_segment_id(),
    record.get_base());

  // account transactional ool writes before write()
  auto& stats = t.get_ool_write_stats();
  stats.extents.num += record.get_num_extents();
  stats.extents.bytes += record_size.dlength;
  stats.header_raw_bytes += record_size.get_raw_mdlength();
  stats.header_bytes += record_size.get_mdlength();
  stats.data_bytes += record_size.dlength;
  stats.num_records += 1;

  return trans_intr::make_interruptible(
    current_segment->segment->write(record.get_base(), bl).safe_then(
      [this, pr=std::move(pr), &t,
       it=(--current_segment->inflight_writes.end()),
       cs=current_segment]() mutable {
      LOG_PREFIX(SegmentedAllocator::Writer::_write);
      if (cs->outdated) {
        // Our segment was rolled while the write was in flight: fulfill the
        // promise so roll_segment()'s drain (when_all_succeed) can resolve.
        DEBUGT("segment rolled", t);
        pr.set_value();
      } else{
        // Segment not rolled: nobody waits on this future, drop the entry.
        // NOTE(review): this erases via current_segment, which in this
        // branch is presumably still the same object as `cs` -- confirm;
        // using `cs->inflight_writes` directly would be unambiguous.
        DEBUGT("segment not rolled", t);
        current_segment->inflight_writes.erase(it);
      }
      return seastar::now();
    })
  ).si_then([this, &record, &t]() mutable {
    return finish_write(t, record);
  });
}
125 | ||
126 | void SegmentedAllocator::Writer::add_extent_to_write( | |
127 | ool_record_t& record, | |
128 | LogicalCachedExtentRef& extent) { | |
129 | logger().debug( | |
130 | "SegmentedAllocator::Writer::add_extent_to_write: " | |
131 | "add extent {} to record", | |
132 | extent); | |
133 | extent->prepare_write(); | |
134 | record.add_extent(extent); | |
135 | } | |
136 | ||
SegmentedAllocator::Writer::write_iertr::future<>
SegmentedAllocator::Writer::write(
  Transaction& t,
  std::list<LogicalCachedExtentRef>& extents)
{
  // Drain `extents` into ool_record_t batches and persist each batch to
  // the current segment, rolling to a fresh segment whenever the next
  // extent would overflow it.  Extents are erased from `extents` as they
  // are staged; the repeat loop terminates when the list is empty.
  auto write_func = [this, &extents, &t] {
    return seastar::do_with(ool_record_t(segment_manager.get_block_size()),
      [this, &extents, &t](auto& record) {
      return trans_intr::repeat([this, &record, &t, &extents]()
        -> write_iertr::future<seastar::stop_iteration> {
        if (extents.empty()) {
          return seastar::make_ready_future<
            seastar::stop_iteration>(seastar::stop_iteration::yes);
        }

        // Wait out any in-progress segment roll before staging more work.
        return segment_rotation_guard.wait(
          [this] {
            return !rolling_segment;
          },
          [this, &record, &extents, &t]() -> write_iertr::future<> {
            LOG_PREFIX(SegmentedAllocator::Writer::write);
            record.set_base(allocated_to);
            for (auto it = extents.begin();
                 it != extents.end();) {
              auto& extent = *it;
              auto wouldbe_length =
                record.get_wouldbe_encoded_record_length(extent);
              if (_needs_roll(wouldbe_length)) {
                // reached the segment end, write and roll
                assert(!rolling_segment);
                rolling_segment = true;
                auto num_extents = record.get_num_extents();
                DEBUGT(
                  "end of segment, writing {} extents to segment {} at {}",
                  t,
                  num_extents,
                  current_segment->segment->get_segment_id(),
                  allocated_to);
                // Flush what is staged so far (if anything), roll, then let
                // the next repeat iteration continue against the new
                // segment; rolling_segment is always cleared on the way out.
                return (num_extents ?
                        _write(t, record) :
                        write_iertr::now()
                ).si_then([this]() mutable {
                  return roll_segment(false);
                }).finally([this] {
                  rolling_segment = false;
                  segment_rotation_guard.broadcast();
                });
              }
              add_extent_to_write(record, extent);
              it = extents.erase(it);
            }

            // Everything remaining fit in this record: write it out.
            DEBUGT(
              "writing {} extents to segment {} at {}",
              t,
              record.get_num_extents(),
              current_segment->segment->get_segment_id(),
              allocated_to);
            return _write(t, record);
          }
        ).si_then([]()
          -> write_iertr::future<seastar::stop_iteration> {
          return seastar::make_ready_future<
            seastar::stop_iteration>(seastar::stop_iteration::no);
        });
      });
    });
  };

  // Entry gates: defer behind an active roll, or open the very first
  // segment before writing anything.
  if (rolling_segment) {
    return segment_rotation_guard.wait([this] {
      return !rolling_segment;
    }, std::move(write_func));

  } else if (!current_segment) {
    return trans_intr::make_interruptible(roll_segment(true)).si_then(
      [write_func=std::move(write_func)] {
      return write_func();
    });
  }
  return write_func();
}
219 | ||
220 | bool SegmentedAllocator::Writer::_needs_roll(segment_off_t length) const { | |
221 | return allocated_to + length > current_segment->segment->get_write_capacity(); | |
222 | } | |
223 | ||
224 | SegmentedAllocator::Writer::init_segment_ertr::future<> | |
225 | SegmentedAllocator::Writer::init_segment(Segment& segment) { | |
226 | bufferptr bp( | |
227 | ceph::buffer::create_page_aligned( | |
228 | segment_manager.get_block_size())); | |
229 | bp.zero(); | |
230 | auto header =segment_header_t{ | |
231 | journal.get_segment_seq(), | |
232 | segment.get_segment_id(), | |
233 | NO_DELTAS, 0, true}; | |
234 | logger().debug("SegmentedAllocator::Writer::init_segment: initting {}, {}", | |
235 | segment.get_segment_id(), | |
236 | header); | |
237 | ceph::bufferlist bl; | |
238 | encode(header, bl); | |
239 | bl.cbegin().copy(bl.length(), bp.c_str()); | |
240 | bl.clear(); | |
241 | bl.append(bp); | |
242 | allocated_to = segment_manager.get_block_size(); | |
243 | return segment.write(0, bl).handle_error( | |
244 | crimson::ct_error::input_output_error::pass_further{}, | |
245 | crimson::ct_error::assert_all{ | |
246 | "Invalid error when initing segment"} | |
247 | ); | |
248 | } | |
249 | ||
250 | SegmentedAllocator::Writer::roll_segment_ertr::future<> | |
251 | SegmentedAllocator::Writer::roll_segment(bool set_rolling) { | |
252 | LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); | |
253 | DEBUG("set_rolling {}", set_rolling); | |
254 | if (set_rolling) { | |
255 | rolling_segment = true; | |
256 | } | |
257 | assert(rolling_segment); | |
258 | if (current_segment) { | |
259 | (void) seastar::with_gate(writer_guard, [this] { | |
260 | auto fut = seastar::now(); | |
261 | if (!current_segment->inflight_writes.empty()) { | |
262 | fut = seastar::when_all_succeed( | |
263 | current_segment->inflight_writes.begin(), | |
264 | current_segment->inflight_writes.end()); | |
265 | } | |
266 | current_segment->outdated = true; | |
267 | return fut.then( | |
268 | [cs=std::move(current_segment), this, it=(--open_segments.end())] { | |
269 | return cs->segment->close().safe_then([this, cs, it] { | |
270 | LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); | |
271 | assert((*it).get() == cs.get()); | |
272 | segment_provider.close_segment(cs->segment->get_segment_id()); | |
273 | open_segments.erase(it); | |
274 | DEBUG("closed segment: {}", cs->segment->get_segment_id()); | |
275 | }); | |
276 | }); | |
277 | }).handle_exception_type([](seastar::gate_closed_exception e) { | |
278 | LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); | |
279 | DEBUG(" writer_guard closed, should be stopping"); | |
280 | return seastar::now(); | |
281 | }); | |
282 | } | |
283 | ||
284 | return segment_provider.get_segment( | |
285 | segment_manager.get_device_id() | |
286 | ).safe_then([this](auto segment) { | |
287 | return segment_manager.open(segment); | |
288 | }).safe_then([this](auto segref) { | |
289 | LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); | |
290 | DEBUG("opened new segment: {}", segref->get_segment_id()); | |
291 | return init_segment(*segref).safe_then([segref=std::move(segref), this] { | |
292 | LOG_PREFIX(SegmentedAllocator::Writer::roll_segment); | |
293 | assert(!current_segment.get()); | |
294 | current_segment.reset(new open_segment_wrapper_t()); | |
295 | current_segment->segment = segref; | |
296 | open_segments.emplace_back(current_segment); | |
297 | rolling_segment = false; | |
298 | segment_rotation_guard.broadcast(); | |
299 | DEBUG("inited new segment: {}", segref->get_segment_id()); | |
300 | }); | |
301 | }).handle_error( | |
302 | roll_segment_ertr::pass_further{}, | |
303 | crimson::ct_error::all_same_way([] { ceph_assert(0 == "TODO"); }) | |
304 | ); | |
305 | } | |
306 | ||
307 | } |