2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
19 * Copyright (C) 2015 Cloudius Systems, Ltd.
25 #include <seastar/core/reactor.hh>
26 #include <seastar/core/fstream.hh>
27 #include <seastar/core/shared_ptr.hh>
28 #include <seastar/core/app-template.hh>
29 #include <seastar/core/do_with.hh>
30 #include <seastar/core/seastar.hh>
31 #include <seastar/testing/test_case.hh>
32 #include <seastar/core/thread.hh>
33 #include <seastar/util/defer.hh>
35 #include <boost/range/adaptor/transformed.hpp>
36 #include <boost/algorithm/cxx11/any_of.hpp>
37 #include "mock_file.hh"
39 using namespace seastar
;
// Member of the `writer` test helper (the enclosing struct's opening line is
// not visible in this view): the output stream that all writes go through.
42 output_stream
<char> out
;
// Constructor: takes the file by value and moves it into
// make_file_output_stream() to build `out`.
43 writer(file f
) : out(make_file_output_stream(std::move(f
))) {}
// Member of the `reader` test helper (the enclosing struct's opening line is
// not visible in this view): the input stream that all reads go through.
47 input_stream
<char> in
;
// Constructor: moves the file into make_file_input_stream() with no extra
// options.
48 reader(file f
) : in(make_file_input_stream(std::move(f
))) {}
// Constructor: same as above, but also forwards caller-supplied
// file_input_stream_options to the stream factory.
49 reader(file f
, file_input_stream_options options
) : in(make_file_input_stream(std::move(f
), std::move(options
))) {}
// Test: write a 4096-byte block followed by an 8192-byte block to
// "testfile.tmp" through an output stream, then read all 4096 + 8192 bytes
// back and check the marker bytes at the start and end of each block.
// NOTE(review): several lines of this test are not visible in this view
// (buffer initialisation and release, the definition of `p`, closing braces).
52 SEASTAR_TEST_CASE(test_fstream
) {
// Semaphore created with an initial count of 0 and captured by the
// continuation chain below.
53 auto sem
= make_lw_shared
<semaphore
>(0);
// Open the file read-write, creating it if needed and truncating any
// previous content.
55 open_file_dma("testfile.tmp",
56 open_flags::rw
| open_flags::create
| open_flags::truncate
).then([sem
] (file f
) {
// The writer is held through a shared pointer so continuations can capture it.
57 auto w
= make_shared
<writer
>(std::move(f
));
// First block: 4096 bytes obtained with malloc.
58 auto buf
= static_cast<char*>(::malloc(4096));
// Write the first block; the continuation captures both `buf` and `w`.
63 w
->out
.write(buf
, 4096).then([buf
, w
] {
65 return make_ready_future
<>();
// Second block: 8192 bytes obtained with malloc.
67 auto buf
= static_cast<char*>(::malloc(8192));
// Write the second block, then close the output stream.
72 return w
->out
.write(buf
, 8192).then([buf
, w
] {
74 return w
->out
.close().then([w
] {});
// Reopen the same file read-only for verification.
77 return open_file_dma("testfile.tmp", open_flags::ro
);
79 /* file content after running the above:
80 * 00000000 5b 41 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[A..............|
81 * 00000010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
83 * 00000ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
84 * 00001000 5b 42 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |[B..............|
85 * 00001010 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 |................|
87 * 00002ff0 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 5d |...............]|
// Read back exactly the number of bytes written (4096 + 8192).
90 auto r
= make_shared
<reader
>(std::move(f
));
91 return r
->in
.read_exactly(4096 + 8192).then([r
] (temporary_buffer
<char> buf
) {
// First block must start with "[A" and end with ']' at offset 4095.
// NOTE(review): `p` is defined on a line not visible here; presumably it
// points at the data of `buf` -- confirm.
93 BOOST_REQUIRE(p
[0] == '[' && p
[1] == 'A' && p
[4095] == ']');
// Second block starts at offset 4096 with "[B" and ends with ']' at its
// last byte (4096 + 8191).
94 BOOST_REQUIRE(p
[4096] == '[' && p
[4096 + 1] == 'B' && p
[4096 + 8191] == ']');
95 return make_ready_future
<>();
// Test: write 8192 bytes of 'a' followed by 8192 bytes of 'b', then consume
// the file with a consumer that checks a prefix, asks the stream to skip a
// region, and checks the rest.
// NOTE(review): the consumer's struct header, the `_count` member and several
// branch conditions are on lines not visible in this view.
107 SEASTAR_TEST_CASE(test_consume_skip_bytes
) {
// The body runs inside seastar::async, so futures are waited on with
// .get()/.get0().
108 return seastar::async([] {
109 auto f
= open_file_dma("testfile.tmp",
110 open_flags::rw
| open_flags::create
| open_flags::truncate
).get0();
111 auto w
= make_lw_shared
<writer
>(std::move(f
));
// Helper: write `size` copies of the character `c` through the writer and
// wait for the write to finish.
112 auto write_block
= [w
] (char c
, size_t size
) {
113 std::vector
<char> vec(size
, c
);
114 w
->out
.write(&vec
.front(), vec
.size()).get();
116 write_block('a', 8192);
117 write_block('b', 8192);
118 w
->out
.close().get();
119 /* file content after running the above:
120 * 00000000 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
122 * 00002000 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 62 |bbbbbbbbbbbbbbbb|
// Reopen read-only; the reader is built with file_input_stream_options{512}.
126 f
= open_file_dma("testfile.tmp", open_flags::ro
).get0();
127 auto r
= make_lw_shared
<reader
>(std::move(f
), file_input_stream_options
{512});
// Short aliases for the stream's consumer result types.
130 using consumption_result_type
= typename input_stream
<char>::consumption_result_type
;
131 using stop_consuming_type
= typename
consumption_result_type::stop_consuming_type
;
132 using tmp_buf
= stop_consuming_type::tmp_buf
;
135 * Consumer reads the file as follows:
136 * - first 8000 bytes are read in 512-byte chunks and checked
137 * - next 2000 bytes are skipped (jumping over both read buffer size and DMA block)
138 * - the remaining 6384 bytes are read and checked
// Consumer call operator: receives one buffer and returns what the stream
// should do next.
140 future
<consumption_result_type
> operator()(tmp_buf buf
) {
// Portion of this buffer that still falls within the first 8000 bytes.
142 auto delta
= std::min(buf
.size(), 8000 - _count
);
// Every byte of that portion must be 'a'.
143 for (auto c
: buf
.share(0, delta
)) {
144 BOOST_REQUIRE_EQUAL(c
, 'a');
// Drop the checked prefix; what remains in `buf` was not part of it.
146 buf
.trim_front(delta
);
// At the 8000-byte mark: request a skip of 2000 bytes minus the bytes still
// held in `buf`.
149 if (_count
== 8000) {
150 return make_ready_future
<consumption_result_type
>(skip_bytes
{2000 - buf
.size()});
153 return make_ready_future
<consumption_result_type
>(continue_consuming
{});
155 return make_ready_future
<consumption_result_type
>(continue_consuming
{});
// Bytes checked after the skipped region are expected to be 'b'.
158 BOOST_REQUIRE_EQUAL(c
, 'b');
160 _count
+= buf
.size();
// 14384 = 8000 + 6384 (the two checked regions described above): keep
// consuming below that, fail above it, stop exactly at it.
161 if (_count
< 14384) {
162 return make_ready_future
<consumption_result_type
>(continue_consuming
{});
163 } else if (_count
> 14384) {
164 BOOST_FAIL("Read more than expected");
166 return make_ready_future
<consumption_result_type
>(stop_consuming_type({}));
// Drive the whole stream through a fresh consumer and wait for completion.
170 r
->in
.consume(consumer
{}).get();
// Test: write 40 bytes through an output stream, check that the resulting
// file size is exactly 40, then read the 40 bytes back and check the marker
// bytes.
// NOTE(review): buffer initialisation/release, the definition of `p` and the
// closing lines are not visible in this view.
175 SEASTAR_TEST_CASE(test_fstream_unaligned
) {
// Semaphore created with an initial count of 0; captured by the chain below.
176 auto sem
= make_lw_shared
<semaphore
>(0);
178 open_file_dma("testfile.tmp",
179 open_flags::rw
| open_flags::create
| open_flags::truncate
).then([sem
] (file f
) {
180 auto w
= make_shared
<writer
>(std::move(f
));
// 40-byte buffer obtained with malloc.
181 auto buf
= static_cast<char*>(::malloc(40));
// Write the 40 bytes, then close the output stream.
186 w
->out
.write(buf
, 40).then([buf
, w
] {
188 return w
->out
.close().then([w
] {});
// Reopen read-only to query the file size.
190 return open_file_dma("testfile.tmp", open_flags::ro
);
191 }).then([] (file f
) {
// do_with holds the file object while the size() future is pending.
192 return do_with(std::move(f
), [] (file
& f
) {
193 return f
.size().then([] (size_t size
) {
194 // assert that file was indeed truncated to the amount of bytes written.
195 BOOST_REQUIRE(size
== 40);
196 return make_ready_future
<>();
// Reopen read-only once more, this time to read the content back.
200 return open_file_dma("testfile.tmp", open_flags::ro
);
201 }).then([] (file f
) {
202 auto r
= make_shared
<reader
>(std::move(f
));
203 return r
->in
.read_exactly(40).then([r
] (temporary_buffer
<char> buf
) {
// Expect "[A" at the start and ']' at the last byte (offset 39).
// NOTE(review): `p` is defined on a line not visible here.
205 BOOST_REQUIRE(p
[0] == '[' && p
[1] == 'A' && p
[39] == ']');
206 return make_ready_future
<>();
208 return r
->in
.close();
210 }).finally([sem
] () {
// Shared helper for the test_consume_* cases below: writes `size` bytes of a
// counting pattern to "testfile.tmp", checks the reported size equals `size`,
// then consumes the file and verifies each buffer against the pattern.
// NOTE(review): the lines that finish the write and obtain `real_size`, and
// the condition guarding the consumer's first return, are not visible here.
218 future
<> test_consume_until_end(uint64_t size
) {
219 return open_file_dma("testfile.tmp",
220 open_flags::rw
| open_flags::create
| open_flags::truncate
).then([size
] (file f
) {
// `f` is passed by copy here; it is captured again by a later continuation.
221 return do_with(make_file_output_stream(f
), [size
] (output_stream
<char>& out
) {
// Fill the buffer with consecutive values starting at 0 (stored as char).
222 std::vector
<char> buf(size
);
223 std::iota(buf
.begin(), buf
.end(), 0);
224 return out
.write(buf
.data(), buf
.size()).then([&out
] {
// The size reported for the file must match the number of bytes written.
229 }).then([size
, f
] (size_t real_size
) {
230 BOOST_REQUIRE_EQUAL(size
, real_size
);
// Consumer: `offset` counts the bytes verified so far (mutable lambda state).
232 auto consumer
= [offset
= uint64_t(0), size
] (temporary_buffer
<char> buf
) mutable -> future
<input_stream
<char>::unconsumed_remainder
> {
// Returns a remainder holding an empty buffer -- presumably this tells the
// stream to stop consuming; TODO confirm against the input_stream docs.
234 return make_ready_future
<input_stream
<char>::unconsumed_remainder
>(temporary_buffer
<char>());
// The stream must never deliver more bytes than were written.
236 BOOST_REQUIRE(offset
+ buf
.size() <= size
);
// Rebuild the expected pattern for this buffer, starting at `offset`.
237 std::vector
<char> expected(buf
.size());
238 std::iota(expected
.begin(), expected
.end(), offset
);
239 offset
+= buf
.size();
240 BOOST_REQUIRE(std::equal(buf
.begin(), buf
.end(), expected
.begin()));
// Returns an empty optional -- presumably a request for more data; TODO
// confirm against the input_stream docs.
241 return make_ready_future
<input_stream
<char>::unconsumed_remainder
>(compat::nullopt
);
// do_with holds the stream and the consumer while consume() is pending.
243 return do_with(make_file_input_stream(f
), std::move(consumer
), [] (input_stream
<char>& in
, auto& consumer
) {
244 return in
.consume(consumer
).then([&in
] {
// Runs test_consume_until_end with a 4096-byte file.
253 SEASTAR_TEST_CASE(test_consume_aligned_file
) {
254 return test_consume_until_end(4096);
// Runs test_consume_until_end with a zero-length file.
257 SEASTAR_TEST_CASE(test_consume_empty_file
) {
258 return test_consume_until_end(0);
// Runs test_consume_until_end with a single-byte file.
261 SEASTAR_TEST_CASE(test_consume_unaligned_file
) {
262 return test_consume_until_end(1);
// Runs test_consume_until_end with a file of 2^20 + 1 bytes.
265 SEASTAR_TEST_CASE(test_consume_unaligned_file_large
) {
266 return test_consume_until_end((1 << 20) + 1);
// Test: write `flen` generated bytes to "file.tmp", then for a list of
// [start, end) ranges (several of them touching or passing the end of the
// file) open an input stream over the range and check that exactly the
// in-file part of the range is returned, with matching content.
// NOTE(review): part of the range list and the definitions of `end` and
// `more` are on lines not visible in this view.
269 SEASTAR_TEST_CASE(test_input_stream_esp_around_eof
) {
270 return seastar::async([] {
// File length used for the test.
271 auto flen
= uint64_t(5341);
272 auto rdist
= std::uniform_int_distribution
<char>();
// Default-constructed engine.
273 auto reng
= std::default_random_engine();
// Build `flen` bytes by drawing from the distribution once per index; the
// index value itself (x) is unused.
274 auto data
= boost::copy_range
<std::vector
<uint8_t>>(
275 boost::irange
<uint64_t>(0, flen
)
276 | boost::adaptors::transformed([&] (int x
) { return rdist(reng
); }));
277 auto f
= open_file_dma("file.tmp",
278 open_flags::rw
| open_flags::create
| open_flags::truncate
).get0();
// `f` is passed by copy; it is used again below to build the input streams.
279 auto out
= make_file_output_stream(f
);
280 out
.write(reinterpret_cast<const char*>(data
.data()), data
.size()).get();
282 //out.close().get(); // FIXME: closes underlying stream:?!
// Half-open byte range [start, end) to read from the file.
283 struct range
{ uint64_t start
; uint64_t end
; };
284 auto ranges
= std::vector
<range
>{{
// Last byte of the file only.
291 range
{flen
- 1, flen
},
// Starts on the last byte and ends one past the end of the file.
292 range
{flen
- 1, flen
+ 1},
// Starts exactly at the end of the file.
293 range
{flen
, flen
+ 1},
// Lies entirely past the end of the file.
294 range
{flen
+ 1, flen
+ 2},
// Starts inside the file and ends past its end.
297 range
{1023, flen
+ 2},
// Input streams below are created with a 512-byte buffer size.
304 auto opt
= file_input_stream_options();
305 opt
.buffer_size
= 512;
306 for (auto&& r
: ranges
) {
307 auto start
= r
.start
;
309 auto len
= end
- start
;
310 auto in
= make_file_input_stream(f
, start
, len
, opt
);
// Accumulates everything the stream returns for this range.
311 std::vector
<uint8_t> readback
;
314 auto rdata
= in
.read().get0();
315 for (size_t i
= 0; i
< rdata
.size(); ++i
) {
316 readback
.push_back(rdata
.get()[i
]);
// An empty buffer from read() clears `more`.
318 more
= !rdata
.empty();
// Expected byte count: the part of [start, end) that lies inside the file
// (both bounds clamped to flen).
321 auto xlen
= std::min(end
, flen
) - std::min(flen
, start
);
322 if (xlen
!= readback
.size()) {
323 BOOST_FAIL(format("Expected {:d} bytes but got {:d}, start={:d}, end={:d}", xlen
, readback
.size(), start
, end
));
// Content must match the written data starting at the clamped start offset.
325 BOOST_REQUIRE(std::equal(readback
.begin(), readback
.end(), data
.begin() + std::min(start
, flen
)));
// Test: write one 4096-byte block with dma_write, pass a duplicated file
// handle to every shard via smp::invoke_on_all, have each shard read the
// block back with dma_read, and require that no shard saw a mismatch.
// NOTE(review): the body of the fill loop is not visible in this view.
331 SEASTAR_TEST_CASE(file_handle_test
) {
332 return seastar::async([] {
333 auto f
= open_file_dma("testfile.tmp", open_flags::create
| open_flags::truncate
| open_flags::rw
).get0();
// 4096-byte buffer with 4096-byte alignment; released by the deferred
// action `del`.
334 auto buf
= static_cast<char*>(aligned_alloc(4096, 4096));
335 auto del
= defer([&] { ::free(buf
); });
// Loop over all 4096 buffer positions (presumably writing char(i) into each,
// matching the comparison in the read loop below -- confirm).
336 for (unsigned i
= 0; i
< 4096; ++i
) {
339 f
.dma_write(0, buf
, 4096).get();
// One slot per shard; each shard writes only the slot indexed by its own
// cpu_id.
340 auto bad
= std::vector
<unsigned>(smp::count
); // std::vector<bool> is special and unsuitable because it uses bitfields
// f.dup() yields the handle `fh` that is captured and passed to each shard.
341 smp::invoke_on_all([fh
= f
.dup(), &bad
] {
342 return seastar::async([fh
, &bad
] {
// Convert the handle back into a file object.
343 auto f
= fh
.to_file();
344 auto buf
= static_cast<char*>(aligned_alloc(4096, 4096));
345 auto del
= defer([&] { ::free(buf
); });
346 f
.dma_read(0, buf
, 4096).get();
// Flag this shard's slot if any byte differs from char(i).
347 for (unsigned i
= 0; i
< 4096; ++i
) {
348 bad
[engine().cpu_id()] |= buf
[i
] != char(i
);
// No shard may have flagged a mismatch.
352 BOOST_REQUIRE(!boost::algorithm::any_of_equal(bad
, 1u));
// Test: exercises the read-size behaviour of a file input stream that shares
// a file_input_stream_history across several streams over a mock read-only
// file. It checks the sizes of the reads issued to the mock file in three
// scenarios: a slow start, reading at full speed, and reading with many skips.
// NOTE(review): several loop headers, closing braces and the bodies of the
// wrapper's read()/skip() are on lines not visible in this view.
357 SEASTAR_TEST_CASE(test_fstream_slow_start
) {
358 return seastar::async([] {
// 128 MiB mock file, 260 KiB stream buffer, read-ahead of 1.
359 static constexpr size_t file_size
= 128 * 1024 * 1024;
360 static constexpr size_t buffer_size
= 260 * 1024;
361 static constexpr size_t read_ahead
= 1;
363 auto mock_file
= make_shared
<mock_read_only_file
>(file_size
);
// History object handed to every stream through options.dynamic_adjustments.
365 auto history
= make_lw_shared
<file_input_stream_history
>();
367 file_input_stream_options options
{};
368 options
.buffer_size
= buffer_size
;
369 options
.read_ahead
= read_ahead
;
370 options
.dynamic_adjustments
= history
;
// Number of read requests the mock file is allowed per stream read().
372 static constexpr size_t requests_at_slow_start
= 2; // 1 request + 1 read-ahead
373 static constexpr size_t requests_at_full_speed
= read_ahead
+ 1; // 1 request + read_ahead
// Size of the first read observed; set once by the first scenario's verifier
// and reused by later checks.
375 compat::optional
<size_t> initial_read_size
;
// Scenario 1: read the whole file, expecting read sizes to start small and
// grow until they reach buffer_size.
377 auto read_whole_file_with_slow_start
= [&] (auto fstr
) {
378 uint64_t total_read
= 0;
379 size_t previous_buffer_length
= 0;
381 // We don't want to assume too much about fstream internals, but with
382 // no history we should start with a buffer size somewhere in
383 // (0, buffer_size) range.
// Verifier for the initial read: the length must lie in
// [1, buffer_size - 1], or equal initial_read_size once that is known.
384 mock_file
->set_read_size_verifier([&] (size_t length
) {
385 BOOST_CHECK_LE(length
, initial_read_size
.value_or(buffer_size
- 1));
386 BOOST_CHECK_GE(length
, initial_read_size
.value_or(1));
387 previous_buffer_length
= length
;
388 if (!initial_read_size
) {
389 initial_read_size
= length
;
395 // We should leave slow start before reading the whole file.
396 BOOST_CHECK_LT(total_read
, file_size
);
398 mock_file
->set_allowed_read_requests(requests_at_slow_start
);
399 auto buf
= fstr
.read().get0();
400 BOOST_CHECK_GT(buf
.size(), 0u);
// Verifier for subsequent reads: the length may at most double (capped at
// buffer_size) and must not shrink.
402 mock_file
->set_read_size_verifier([&] (size_t length
) {
403 // There is no reason to reduce buffer size.
404 BOOST_CHECK_LE(length
, std::min(previous_buffer_length
* 2, buffer_size
));
405 BOOST_CHECK_GE(length
, previous_buffer_length
);
406 previous_buffer_length
= length
;
409 BOOST_TEST_MESSAGE(format("Size {:d}", buf
.size()));
410 total_read
+= buf
.size();
// A full-size buffer marks the end of the slow-start phase.
411 if (buf
.size() == buffer_size
) {
412 BOOST_TEST_MESSAGE("Leaving slow start phase.");
417 // Reading at full speed now
418 mock_file
->set_expected_read_size(buffer_size
);
419 while (total_read
!= file_size
) {
420 mock_file
->set_allowed_read_requests(requests_at_full_speed
);
421 auto buf
= fstr
.read().get0();
422 total_read
+= buf
.size();
// One more read after the whole file was consumed must return an empty
// buffer.
425 mock_file
->set_allowed_read_requests(requests_at_full_speed
);
426 auto buf
= fstr
.read().get0();
427 BOOST_CHECK_EQUAL(buf
.size(), 0u);
428 assert(buf
.size() == 0);
// Scenario 2: read the whole file expecting every read to be buffer_size.
// (The name says "while"; the log messages below describe reading the whole
// file.)
431 auto read_while_file_at_full_speed
= [&] (auto fstr
) {
432 uint64_t total_read
= 0;
434 mock_file
->set_expected_read_size(buffer_size
);
435 while (total_read
!= file_size
) {
436 mock_file
->set_allowed_read_requests(requests_at_full_speed
);
437 auto buf
= fstr
.read().get0();
438 total_read
+= buf
.size();
// Reading past the end must yield an empty buffer.
441 mock_file
->set_allowed_read_requests(requests_at_full_speed
);
442 auto buf
= fstr
.read().get0();
443 BOOST_CHECK_EQUAL(buf
.size(), 0u);
// Scenario 3: alternate reads with large skips and check that the read size
// does not grow along the way.
446 auto read_and_skip_a_lot
= [&] (auto fstr
) {
447 uint64_t total_read
= 0;
448 size_t previous_buffer_size
= buffer_size
;
// No limit on the number of read requests in this scenario.
450 mock_file
->set_allowed_read_requests(std::numeric_limits
<size_t>::max());
// Verifier: the length must not exceed the previous one and must be at
// least initial_read_size (or 1 if that is unset).
451 mock_file
->set_read_size_verifier([&] (size_t length
) {
452 // There is no reason to reduce buffer size.
453 BOOST_CHECK_LE(length
, previous_buffer_size
);
454 BOOST_CHECK_GE(length
, initial_read_size
.value_or(1));
455 previous_buffer_size
= length
;
// Each iteration: two reads, then a skip of up to 2 * buffer_size bytes
// (limited to what remains of the file). Skipped bytes count as consumed.
457 while (total_read
!= file_size
) {
458 auto buf
= fstr
.read().get0();
459 total_read
+= buf
.size();
461 buf
= fstr
.read().get0();
462 total_read
+= buf
.size();
464 auto skip_by
= std::min(file_size
- total_read
, buffer_size
* 2);
465 fstr
.skip(skip_by
).get();
466 total_read
+= skip_by
;
469 // We should be back at slow start at this stage.
470 BOOST_CHECK_LT(previous_buffer_size
, buffer_size
);
471 if (initial_read_size
) {
472 BOOST_CHECK_EQUAL(previous_buffer_size
, *initial_read_size
);
475 mock_file
->set_allowed_read_requests(requests_at_full_speed
);
476 auto buf
= fstr
.read().get0();
477 BOOST_CHECK_EQUAL(buf
.size(), 0u);
// Factory: builds a fresh input stream over the whole mock file with the
// shared `options` and returns it in a small movable wrapper that exposes
// read() and skip().
481 auto make_fstream
= [&] {
482 struct fstream_wrapper
{
483 input_stream
<char> s
;
484 fstream_wrapper(fstream_wrapper
&&) = default;
485 fstream_wrapper
& operator=(fstream_wrapper
&&) = default;
486 future
<temporary_buffer
<char>> read() {
489 future
<> skip(uint64_t n
) {
496 return fstream_wrapper
{make_file_input_stream(file(mock_file
), 0, file_size
, options
)};
// Scenario sequence. Each step uses a new stream; all streams receive the
// same `history` object through `options`.
499 BOOST_TEST_MESSAGE("Reading file, no history, expectiong a slow start");
500 read_whole_file_with_slow_start(make_fstream());
501 BOOST_TEST_MESSAGE("Reading file again, everything good so far, read at full speed");
502 read_while_file_at_full_speed(make_fstream());
503 BOOST_TEST_MESSAGE("Reading and skipping a lot");
504 read_and_skip_a_lot(make_fstream());
505 BOOST_TEST_MESSAGE("Reading file, bad history, we are back at slow start...");
506 read_whole_file_with_slow_start(make_fstream());
507 BOOST_TEST_MESSAGE("Reading file yet again, should've recovered by now");
508 read_while_file_at_full_speed(make_fstream());