git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/tests/unit/fstream_test.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / seastar / tests / unit / fstream_test.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18 /*
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
20 */
21
22 #include <algorithm>
23 #include <iostream>
24 #include <numeric>
25 #include <seastar/core/reactor.hh>
26 #include <seastar/core/fstream.hh>
27 #include <seastar/core/shared_ptr.hh>
28 #include <seastar/core/app-template.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/core/seastar.hh>
31 #include <seastar/testing/test_case.hh>
32 #include <seastar/core/thread.hh>
33 #include <seastar/util/defer.hh>
34 #include <random>
35 #include <boost/range/adaptor/transformed.hpp>
36 #include <boost/algorithm/cxx11/any_of.hpp>
37 #include "mock_file.hh"
38
39 using namespace seastar;
40
41 struct writer {
42 output_stream<char> out;
43 writer(file f) : out(make_file_output_stream(std::move(f))) {}
44 };
45
46 struct reader {
47 input_stream<char> in;
48 reader(file f) : in(make_file_input_stream(std::move(f))) {}
49 reader(file f, file_input_stream_options options) : in(make_file_input_stream(std::move(f), std::move(options))) {}
50 };
51
52 SEASTAR_TEST_CASE(test_fstream) {
53 auto sem = make_lw_shared<semaphore>(0);
54
55 open_file_dma("testfile.tmp",
56 open_flags::rw | open_flags::create | open_flags::truncate).then([sem] (file f) {
57 auto w = make_shared<writer>(std::move(f));
58 auto buf = static_cast<char*>(::malloc(4096));
59 memset(buf, 0, 4096);
60 buf[0] = '[';
61 buf[1] = 'A';
62 buf[4095] = ']';
63 w->out.write(buf, 4096).then([buf, w] {
64 ::free(buf);
65 return make_ready_future<>();
66 }).then([w] {
67 auto buf = static_cast<char*>(::malloc(8192));
68 memset(buf, 0, 8192);
69 buf[0] = '[';
70 buf[1] = 'B';
71 buf[8191] = ']';
72 return w->out.write(buf, 8192).then([buf, w] {
73 ::free(buf);
74 return w->out.close().then([w] {});
75 });
76 }).then([] {
77 return open_file_dma("testfile.tmp", open_flags::ro);
78 }).then([] (file f) {
79 /* file content after running the above:
80 * 00000000 5b 41 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[A..............|
81 * 00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
82 * *
83 * 00000ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
84 * 00001000 5b 42 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[B..............|
85 * 00001010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
86 * *
87 * 00002ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
88 * 00003000
89 */
90 auto r = make_shared<reader>(std::move(f));
91 return r->in.read_exactly(4096 + 8192).then([r] (temporary_buffer<char> buf) {
92 auto p = buf.get();
93 BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[4095] == ']');
94 BOOST_REQUIRE(p[4096] == '[' && p[4096 + 1] == 'B' && p[4096 + 8191] == ']');
95 return make_ready_future<>();
96 }).then([r] {
97 return r->in.close();
98 }).finally([r] {});
99 }).finally([sem] () {
100 sem->signal();
101 });
102 });
103
104 return sem->wait();
105 }
106
107 SEASTAR_TEST_CASE(test_consume_skip_bytes) {
108 return seastar::async([] {
109 auto f = open_file_dma("testfile.tmp",
110 open_flags::rw | open_flags::create | open_flags::truncate).get0();
111 auto w = make_lw_shared<writer>(std::move(f));
112 auto write_block = [w] (char c, size_t size) {
113 std::vector<char> vec(size, c);
114 w->out.write(&vec.front(), vec.size()).get();
115 };
116 write_block('a', 8192);
117 write_block('b', 8192);
118 w->out.close().get();
119 /* file content after running the above:
120 * 00000000 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
121 * *
122 * 00002000 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbbbbbbb|
123 * *
124 * 00004000
125 */
126 f = open_file_dma("testfile.tmp", open_flags::ro).get0();
127 auto r = make_lw_shared<reader>(std::move(f), file_input_stream_options{512});
128 struct consumer {
129 uint64_t _count = 0;
130 using consumption_result_type = typename input_stream<char>::consumption_result_type;
131 using stop_consuming_type = typename consumption_result_type::stop_consuming_type;
132 using tmp_buf = stop_consuming_type::tmp_buf;
133
134 /*
135 * Consumer reads the file as follows:
136 * - first 8000 bytes are read in 512-byte chunks and checked
137 * - next 2000 bytes are skipped (jumping over both read buffer size and DMA block)
138 * - the remaining 6384 bytes are read and checked
139 */
140 future<consumption_result_type> operator()(tmp_buf buf) {
141 if (_count < 8000) {
142 auto delta = std::min(buf.size(), 8000 - _count);
143 for (auto c : buf.share(0, delta)) {
144 BOOST_REQUIRE_EQUAL(c, 'a');
145 }
146 buf.trim_front(delta);
147 _count += delta;
148
149 if (_count == 8000) {
150 return make_ready_future<consumption_result_type>(skip_bytes{2000 - buf.size()});
151 } else {
152 assert(buf.empty());
153 return make_ready_future<consumption_result_type>(continue_consuming{});
154 }
155 return make_ready_future<consumption_result_type>(continue_consuming{});
156 } else {
157 for (auto c : buf) {
158 BOOST_REQUIRE_EQUAL(c, 'b');
159 }
160 _count += buf.size();
161 if (_count < 14384) {
162 return make_ready_future<consumption_result_type>(continue_consuming{});
163 } else if (_count > 14384) {
164 BOOST_FAIL("Read more than expected");
165 }
166 return make_ready_future<consumption_result_type>(stop_consuming_type({}));
167 }
168 }
169 };
170 r->in.consume(consumer{}).get();
171 r->in.close().get();
172 });
173 }
174
175 SEASTAR_TEST_CASE(test_fstream_unaligned) {
176 auto sem = make_lw_shared<semaphore>(0);
177
178 open_file_dma("testfile.tmp",
179 open_flags::rw | open_flags::create | open_flags::truncate).then([sem] (file f) {
180 auto w = make_shared<writer>(std::move(f));
181 auto buf = static_cast<char*>(::malloc(40));
182 memset(buf, 0, 40);
183 buf[0] = '[';
184 buf[1] = 'A';
185 buf[39] = ']';
186 w->out.write(buf, 40).then([buf, w] {
187 ::free(buf);
188 return w->out.close().then([w] {});
189 }).then([] {
190 return open_file_dma("testfile.tmp", open_flags::ro);
191 }).then([] (file f) {
192 return do_with(std::move(f), [] (file& f) {
193 return f.size().then([] (size_t size) {
194 // assert that file was indeed truncated to the amount of bytes written.
195 BOOST_REQUIRE(size == 40);
196 return make_ready_future<>();
197 });
198 });
199 }).then([] {
200 return open_file_dma("testfile.tmp", open_flags::ro);
201 }).then([] (file f) {
202 auto r = make_shared<reader>(std::move(f));
203 return r->in.read_exactly(40).then([r] (temporary_buffer<char> buf) {
204 auto p = buf.get();
205 BOOST_REQUIRE(p[0] == '[' && p[1] == 'A' && p[39] == ']');
206 return make_ready_future<>();
207 }).then([r] {
208 return r->in.close();
209 }).finally([r] {});
210 }).finally([sem] () {
211 sem->signal();
212 });
213 });
214
215 return sem->wait();
216 }
217
218 future<> test_consume_until_end(uint64_t size) {
219 return open_file_dma("testfile.tmp",
220 open_flags::rw | open_flags::create | open_flags::truncate).then([size] (file f) {
221 return do_with(make_file_output_stream(f), [size] (output_stream<char>& out) {
222 std::vector<char> buf(size);
223 std::iota(buf.begin(), buf.end(), 0);
224 return out.write(buf.data(), buf.size()).then([&out] {
225 return out.flush();
226 });
227 }).then([f] {
228 return f.size();
229 }).then([size, f] (size_t real_size) {
230 BOOST_REQUIRE_EQUAL(size, real_size);
231 }).then([size, f] {
232 auto consumer = [offset = uint64_t(0), size] (temporary_buffer<char> buf) mutable -> future<input_stream<char>::unconsumed_remainder> {
233 if (!buf) {
234 return make_ready_future<input_stream<char>::unconsumed_remainder>(temporary_buffer<char>());
235 }
236 BOOST_REQUIRE(offset + buf.size() <= size);
237 std::vector<char> expected(buf.size());
238 std::iota(expected.begin(), expected.end(), offset);
239 offset += buf.size();
240 BOOST_REQUIRE(std::equal(buf.begin(), buf.end(), expected.begin()));
241 return make_ready_future<input_stream<char>::unconsumed_remainder>(compat::nullopt);
242 };
243 return do_with(make_file_input_stream(f), std::move(consumer), [] (input_stream<char>& in, auto& consumer) {
244 return in.consume(consumer).then([&in] {
245 return in.close();
246 });
247 });
248 });
249 });
250 }
251
252
253 SEASTAR_TEST_CASE(test_consume_aligned_file) {
254 return test_consume_until_end(4096);
255 }
256
257 SEASTAR_TEST_CASE(test_consume_empty_file) {
258 return test_consume_until_end(0);
259 }
260
261 SEASTAR_TEST_CASE(test_consume_unaligned_file) {
262 return test_consume_until_end(1);
263 }
264
265 SEASTAR_TEST_CASE(test_consume_unaligned_file_large) {
266 return test_consume_until_end((1 << 20) + 1);
267 }
268
269 SEASTAR_TEST_CASE(test_input_stream_esp_around_eof) {
270 return seastar::async([] {
271 auto flen = uint64_t(5341);
272 auto rdist = std::uniform_int_distribution<char>();
273 auto reng = std::default_random_engine();
274 auto data = boost::copy_range<std::vector<uint8_t>>(
275 boost::irange<uint64_t>(0, flen)
276 | boost::adaptors::transformed([&] (int x) { return rdist(reng); }));
277 auto f = open_file_dma("file.tmp",
278 open_flags::rw | open_flags::create | open_flags::truncate).get0();
279 auto out = make_file_output_stream(f);
280 out.write(reinterpret_cast<const char*>(data.data()), data.size()).get();
281 out.flush().get();
282 //out.close().get(); // FIXME: closes underlying stream:?!
283 struct range { uint64_t start; uint64_t end; };
284 auto ranges = std::vector<range>{{
285 range{0, flen},
286 range{0, flen * 2},
287 range{0, flen + 1},
288 range{0, flen - 1},
289 range{0, 1},
290 range{1, 2},
291 range{flen - 1, flen},
292 range{flen - 1, flen + 1},
293 range{flen, flen + 1},
294 range{flen + 1, flen + 2},
295 range{1023, flen-1},
296 range{1023, flen},
297 range{1023, flen + 2},
298 range{8193, 8194},
299 range{1023, 1025},
300 range{1023, 1024},
301 range{1024, 1025},
302 range{1023, 4097},
303 }};
304 auto opt = file_input_stream_options();
305 opt.buffer_size = 512;
306 for (auto&& r : ranges) {
307 auto start = r.start;
308 auto end = r.end;
309 auto len = end - start;
310 auto in = make_file_input_stream(f, start, len, opt);
311 std::vector<uint8_t> readback;
312 auto more = true;
313 while (more) {
314 auto rdata = in.read().get0();
315 for (size_t i = 0; i < rdata.size(); ++i) {
316 readback.push_back(rdata.get()[i]);
317 }
318 more = !rdata.empty();
319 }
320 //in.close().get();
321 auto xlen = std::min(end, flen) - std::min(flen, start);
322 if (xlen != readback.size()) {
323 BOOST_FAIL(format("Expected {:d} bytes but got {:d}, start={:d}, end={:d}", xlen, readback.size(), start, end));
324 }
325 BOOST_REQUIRE(std::equal(readback.begin(), readback.end(), data.begin() + std::min(start, flen)));
326 }
327 f.close().get();
328 });
329 }
330
331 SEASTAR_TEST_CASE(file_handle_test) {
332 return seastar::async([] {
333 auto f = open_file_dma("testfile.tmp", open_flags::create | open_flags::truncate | open_flags::rw).get0();
334 auto buf = static_cast<char*>(aligned_alloc(4096, 4096));
335 auto del = defer([&] { ::free(buf); });
336 for (unsigned i = 0; i < 4096; ++i) {
337 buf[i] = i;
338 }
339 f.dma_write(0, buf, 4096).get();
340 auto bad = std::vector<unsigned>(smp::count); // std::vector<bool> is special and unsuitable because it uses bitfields
341 smp::invoke_on_all([fh = f.dup(), &bad] {
342 return seastar::async([fh, &bad] {
343 auto f = fh.to_file();
344 auto buf = static_cast<char*>(aligned_alloc(4096, 4096));
345 auto del = defer([&] { ::free(buf); });
346 f.dma_read(0, buf, 4096).get();
347 for (unsigned i = 0; i < 4096; ++i) {
348 bad[engine().cpu_id()] |= buf[i] != char(i);
349 }
350 });
351 }).get();
352 BOOST_REQUIRE(!boost::algorithm::any_of_equal(bad, 1u));
353 f.close().get();
354 });
355 }
356
// Drives file input streams over a mock file (mock_read_only_file, declared
// in mock_file.hh) and checks how the sizes of the read requests evolve
// across five consecutive streams that all share one
// file_input_stream_history object through options.dynamic_adjustments.
// NOTE(review): the semantics of the mock's set_* hooks are defined in
// mock_file.hh and are not visible here; the comments below only describe how
// this test uses them.
357 SEASTAR_TEST_CASE(test_fstream_slow_start) {
358 return seastar::async([] {
359 static constexpr size_t file_size = 128 * 1024 * 1024;
360 static constexpr size_t buffer_size = 260 * 1024;
361 static constexpr size_t read_ahead = 1;
362
363 auto mock_file = make_shared<mock_read_only_file>(file_size);
364
// Shared by every stream created through make_fstream() below.
365 auto history = make_lw_shared<file_input_stream_history>();
366
367 file_input_stream_options options{};
368 options.buffer_size = buffer_size;
369 options.read_ahead = read_ahead;
370 options.dynamic_adjustments = history;
371
372 static constexpr size_t requests_at_slow_start = 2; // 1 request + 1 read-ahead
373 static constexpr size_t requests_at_full_speed = read_ahead + 1; // 1 request + read_ahead
374
// Size of the first read request seen by the first verifier below; empty
// until that verifier has run once. Later phases compare against it.
375 compat::optional<size_t> initial_read_size;
376
// Reads the whole file through `fstr`, expecting request sizes to start
// small and grow (never by more than doubling, never shrinking) until a
// full buffer_size chunk is returned, and to stay at buffer_size afterwards.
377 auto read_whole_file_with_slow_start = [&] (auto fstr) {
378 uint64_t total_read = 0;
379 size_t previous_buffer_length = 0;
380
381 // We don't want to assume too much about fstream internals, but with
382 // no history we should start with buffer sizes somewhere in the
383 // (0, buffer_size) range.
384 mock_file->set_read_size_verifier([&] (size_t length) {
385 BOOST_CHECK_LE(length, initial_read_size.value_or(buffer_size - 1));
386 BOOST_CHECK_GE(length, initial_read_size.value_or(1));
387 previous_buffer_length = length;
388 if (!initial_read_size) {
389 initial_read_size = length;
390 }
391 });
392
393 // Slow start phase
394 while (true) {
395 // We should leave slow start before reading the whole file.
396 BOOST_CHECK_LT(total_read, file_size);
397
398 mock_file->set_allowed_read_requests(requests_at_slow_start);
399 auto buf = fstr.read().get0();
400 BOOST_CHECK_GT(buf.size(), 0u);
401
// From the second read onwards a different verifier is installed: each
// request may be at most twice the previous one, capped at buffer_size.
402 mock_file->set_read_size_verifier([&] (size_t length) {
403 // There is no reason to reduce buffer size.
404 BOOST_CHECK_LE(length, std::min(previous_buffer_length * 2, buffer_size));
405 BOOST_CHECK_GE(length, previous_buffer_length);
406 previous_buffer_length = length;
407 });
408
409 BOOST_TEST_MESSAGE(format("Size {:d}", buf.size()));
410 total_read += buf.size();
411 if (buf.size() == buffer_size) {
412 BOOST_TEST_MESSAGE("Leaving slow start phase.");
413 break;
414 }
415 }
416
417 // Reading at full speed now
418 mock_file->set_expected_read_size(buffer_size);
419 while (total_read != file_size) {
420 mock_file->set_allowed_read_requests(requests_at_full_speed);
421 auto buf = fstr.read().get0();
422 total_read += buf.size();
423 }
424
// One more read after the last byte must return an empty buffer.
425 mock_file->set_allowed_read_requests(requests_at_full_speed);
426 auto buf = fstr.read().get0();
427 BOOST_CHECK_EQUAL(buf.size(), 0u);
428 assert(buf.size() == 0);
429 };
430
// Reads the whole file through `fstr`, telling the mock to expect
// buffer_size as the read size from the very first request.
431 auto read_while_file_at_full_speed = [&] (auto fstr) {
432 uint64_t total_read = 0;
433
434 mock_file->set_expected_read_size(buffer_size);
435 while (total_read != file_size) {
436 mock_file->set_allowed_read_requests(requests_at_full_speed);
437 auto buf = fstr.read().get0();
438 total_read += buf.size();
439 }
440
441 mock_file->set_allowed_read_requests(requests_at_full_speed);
442 auto buf = fstr.read().get0();
443 BOOST_CHECK_EQUAL(buf.size(), 0u);
444 };
445
// Alternates two reads with a skip of up to 2 * buffer_size bytes until the
// end of the file, checking that request sizes never grow and that the last
// one seen is below buffer_size (and equal to initial_read_size when known).
446 auto read_and_skip_a_lot = [&] (auto fstr) {
447 uint64_t total_read = 0;
448 size_t previous_buffer_size = buffer_size;
449
450 mock_file->set_allowed_read_requests(std::numeric_limits<size_t>::max());
451 mock_file->set_read_size_verifier([&] (size_t length) {
452 // There is no reason to reduce buffer size.
453 BOOST_CHECK_LE(length, previous_buffer_size);
454 BOOST_CHECK_GE(length, initial_read_size.value_or(1));
455 previous_buffer_size = length;
456 });
457 while (total_read != file_size) {
458 auto buf = fstr.read().get0();
459 total_read += buf.size();
460
461 buf = fstr.read().get0();
462 total_read += buf.size();
463
// Skipped bytes are counted as progress so the loop ends at file_size.
464 auto skip_by = std::min(file_size - total_read, buffer_size * 2);
465 fstr.skip(skip_by).get();
466 total_read += skip_by;
467 }
468
469 // We should be back at slow start at this stage.
470 BOOST_CHECK_LT(previous_buffer_size, buffer_size);
471 if (initial_read_size) {
472 BOOST_CHECK_EQUAL(previous_buffer_size, *initial_read_size);
473 }
474
475 mock_file->set_allowed_read_requests(requests_at_full_speed);
476 auto buf = fstr.read().get0();
477 BOOST_CHECK_EQUAL(buf.size(), 0u);
478
479 };
480
// Creates a fresh input stream over the whole mock file using `options`,
// wrapped in a move-only helper whose destructor closes the stream and
// waits for the close to finish.
481 auto make_fstream = [&] {
482 struct fstream_wrapper {
483 input_stream<char> s;
484 fstream_wrapper(fstream_wrapper&&) = default;
485 fstream_wrapper& operator=(fstream_wrapper&&) = default;
486 future<temporary_buffer<char>> read() {
487 return s.read();
488 }
489 future<> skip(uint64_t n) {
490 return s.skip(n);
491 }
492 ~fstream_wrapper() {
493 s.close().get();
494 }
495 };
496 return fstream_wrapper{make_file_input_stream(file(mock_file), 0, file_size, options)};
497 };
498
// The five phases run in this order on purpose: each one depends on the
// state the previous streams left behind in the shared `history`.
499 BOOST_TEST_MESSAGE("Reading file, no history, expectiong a slow start");
500 read_whole_file_with_slow_start(make_fstream());
501 BOOST_TEST_MESSAGE("Reading file again, everything good so far, read at full speed");
502 read_while_file_at_full_speed(make_fstream());
503 BOOST_TEST_MESSAGE("Reading and skipping a lot");
504 read_and_skip_a_lot(make_fstream());
505 BOOST_TEST_MESSAGE("Reading file, bad history, we are back at slow start...");
506 read_whole_file_with_slow_start(make_fstream());
507 BOOST_TEST_MESSAGE("Reading file yet again, should've recovered by now");
508 read_while_file_at_full_speed(make_fstream());
509 });
510 }