/*
 * This file is open source software, licensed to you under the terms
 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
 * distributed with this work for additional information regarding copyright
 * ownership. You may not use this file except in compliance with the License.
 *
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

/*
 * Copyright (C) 2021 ScyllaDB
 */
23 #include <seastar/core/thread.hh>
24 #include <seastar/core/sleep.hh>
25 #include <seastar/testing/test_case.hh>
26 #include <seastar/testing/thread_test_case.hh>
27 #include <seastar/testing/test_runner.hh>
28 #include <seastar/core/reactor.hh>
29 #include <seastar/core/smp.hh>
30 #include <seastar/core/when_all.hh>
31 #include <seastar/core/file.hh>
32 #include <seastar/core/io_queue.hh>
33 #include <seastar/core/io_intent.hh>
34 #include <seastar/core/internal/io_request.hh>
35 #include <seastar/core/internal/io_sink.hh>
36 #include <seastar/util/internal/iovec_utils.hh>
using namespace seastar;

// Backing store for the fake-file helpers below: maps a write position
// (file-offset index) to the int value that was "written" there.
// NOTE(review): the tests access this as `file.data`, so in the complete
// file this map appears to be a member of a fake-file struct whose opening
// line is not visible in this chunk — confirm against the full source.
std::unordered_map<uint64_t, int> data;
// Build a 1-byte write request at offset `idx` backed by `buf`.
// fd 0 and nowait_works=false are fine here: the request is never submitted
// to the kernel, only drained through the test io_sink.
static internal::io_request make_write_req(size_t idx, int* buf) {
    return internal::io_request::make_write(0, idx, buf, 1, false);
}
// Build a vectored write request of `nr` iovecs, each `buf_len` bytes long,
// starting at offset `idx`. The iovecs are appended to the caller-owned
// `vecs` so their storage outlives the returned request.
static internal::io_request make_writev_req(size_t idx, int* buf, size_t nr, size_t buf_len, std::vector<::iovec>& vecs) {
    for (unsigned i = 0; i < nr; i++) {
        vecs.push_back({ &buf[i], buf_len });
    }
    return internal::io_request::make_writev(0, idx, vecs, false);
}
55 void execute_write_req(const internal::io_request
& rq
, io_completion
* desc
) {
56 const auto& op
= rq
.as
<internal::io_request::operation::write
>();
57 data
[op
.pos
] = *(reinterpret_cast<int*>(op
.addr
));
58 desc
->complete_with(op
.size
);
// Emulate completion of a vectored write request: record the int value at
// the head of every iovec under consecutive file positions, then complete
// the request with the total byte count across all iovecs.
void execute_writev_req(const internal::io_request& rq, io_completion* desc) {
    const auto& op = rq.as<internal::io_request::operation::writev>();
    // NOTE(review): this declaration is not visible in this chunk — it is
    // reconstructed from the accumulation and the complete_with(len) below.
    size_t len = 0;
    for (unsigned i = 0; i < op.iov_len; i++) {
        data[op.pos + i] = *(reinterpret_cast<int*>(op.iovec[i].iov_base));
        len += op.iovec[i].iov_len;
    }
    desc->complete_with(len);
}
// Minimal io_queue harness: one io_group with a default (zeroed) config, an
// io_sink the tests drain by hand, and a periodic timer that replenishes the
// group's fair-queue capacity so queued requests can actually be dispatched.
// NOTE(review): several lines of this struct (the `group`, `queue` and
// `kicker` member declarations and parts of the constructor) are not visible
// in this chunk — they are reconstructed from the visible initializer list,
// the visible kick() body, and the tests' use of `tio.queue` / `tio.sink`.
// Confirm against the full source.
struct io_queue_for_tests {
    io_group_ptr group;
    internal::io_sink sink;
    io_queue queue;
    timer<> kicker;

    io_queue_for_tests()
        : group(std::make_shared<io_group>(io_queue::config{0}))
        , sink()
        , queue(group, sink)
        , kicker([this] { kick(); })
    {
        // Kick twice per millisecond so tests that sleep ~500ms always see
        // replenished capacity.
        kicker.arm_periodic(std::chrono::microseconds(500));
    }

    void kick() {
        for (auto&& fg : group->_fgs) {
            fg->replenish_capacity(std::chrono::steady_clock::now());
        }
    }
};
// Queue one 1-byte write, let the queue dispatch it, drain the sink by
// "executing" the request against the fake file, and check the value landed.
SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
    io_queue_for_tests tio;
    // NOTE(review): this declaration is not visible in this chunk — it is
    // reconstructed from the `file.` usage below; confirm the type name.
    fake_file file;

    auto val = std::make_unique<int>(42);
    auto f = tio.queue.queue_request(default_priority_class(), internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(0, val.get()), nullptr, {})
    .then([&file] (size_t len) {
        BOOST_REQUIRE(file.data[0] == 42);
    });

    seastar::sleep(std::chrono::milliseconds(500)).get();
    tio.queue.poll_io_queue();
    tio.sink.drain([&file] (const internal::io_request& rq, io_completion* desc) -> bool {
        file.execute_write_req(rq, desc);
        return true;
    });

    f.get();
}
// How to corrupt one part of a large split request:
//   none    - complete every part successfully
//   partial - shorten the part's first iovec, simulating a short write
//   error   - fail the part with -EIO
enum class part_flaw { none, partial, error };
// Queue a writev spanning 3 * max_write bytes so the queue must split it
// into three parts, complete the parts one by one while optionally injecting
// `flaw` into one of them, and verify the total completed length.
static void do_test_large_request_flow(part_flaw flaw) {
    io_queue_for_tests tio;
    // NOTE(review): this declaration is not visible in this chunk — it is
    // reconstructed from the `file.` usage below; confirm the type name.
    fake_file file;

    int values[3] = { 13, 42, 73 };

    auto limits = tio.queue.get_request_limits();

    std::vector<::iovec> vecs;
    auto f = tio.queue.queue_request(default_priority_class(), internal::io_direction_and_length(internal::io_direction_and_length::write_idx, limits.max_write * 3),
        file.make_writev_req(0, values, 3, limits.max_write, vecs), nullptr, std::move(vecs))
    .then([&file, &values, &limits, flaw] (size_t len) {
        // The first part always completes in full.
        size_t expected = limits.max_write;

        BOOST_REQUIRE_EQUAL(file.data[0 * limits.max_write], values[0]);

        if (flaw == part_flaw::none) {
            BOOST_REQUIRE_EQUAL(file.data[1 * limits.max_write], values[1]);
            BOOST_REQUIRE_EQUAL(file.data[2 * limits.max_write], values[2]);
            expected += 2 * limits.max_write;
        }

        if (flaw == part_flaw::partial) {
            BOOST_REQUIRE_EQUAL(file.data[1 * limits.max_write], values[1]);
            expected += limits.max_write / 2;
        }

        BOOST_REQUIRE_EQUAL(len, expected);
    });

    for (int i = 0; i < 3; i++) {
        seastar::sleep(std::chrono::milliseconds(500)).get();
        tio.queue.poll_io_queue();
        tio.sink.drain([&file, i, flaw] (const internal::io_request& rq, io_completion* desc) -> bool {
            // NOTE(review): the `i == 1` guard is reconstructed — its line is
            // not visible in this chunk, but `i` is captured by the lambda and
            // the flaw must hit exactly one part for `expected` above to add
            // up. Confirm against the full source.
            if (i == 1) {
                if (flaw == part_flaw::partial) {
                    // `iovec` is a pointer member, so shortening the first
                    // vector through the const request is well-formed.
                    const auto& op = rq.as<internal::io_request::operation::writev>();
                    op.iovec[0].iov_len /= 2;
                }

                if (flaw == part_flaw::error) {
                    desc->complete_with(-EIO);
                    return true;
                }
            }

            file.execute_writev_req(rq, desc);
            return true;
        });
    }

    f.get();
}
// Split flow with every part completing in full.
SEASTAR_THREAD_TEST_CASE(test_large_request_flow) {
    do_test_large_request_flow(part_flaw::none);
}
// Split flow where one part completes short (first iovec halved).
SEASTAR_THREAD_TEST_CASE(test_large_request_flow_partial) {
    do_test_large_request_flow(part_flaw::partial);
}
// Split flow where one part fails with -EIO.
SEASTAR_THREAD_TEST_CASE(test_large_request_flow_error) {
    do_test_large_request_flow(part_flaw::error);
}
// Exercise internal::intent_reference move semantics across three states:
// armed (tracking a live io_intent), cancelled, and empty (moved-from).
// Moving always empties the source; a cancelled state travels with the move.
SEASTAR_THREAD_TEST_CASE(test_intent_safe_ref) {
    // True iff retrieving through the reference throws cancelled_error.
    // NOTE(review): the try-body lines are not visible in this chunk — they
    // are reconstructed around the visible catch clause; confirm.
    auto get_cancelled = [] (internal::intent_reference& iref) -> bool {
        try {
            iref.retrieve();
            return false;
        } catch(seastar::cancelled_error& err) {
            return true;
        }
    };

    io_intent intent, intent_x;

    // A freshly armed reference retrieves its intent.
    internal::intent_reference ref_orig(&intent);
    BOOST_REQUIRE(ref_orig.retrieve() == &intent);

    // Move-construct from an armed reference: the source is emptied.
    internal::intent_reference ref_armed(std::move(ref_orig));
    BOOST_REQUIRE(ref_orig.retrieve() == nullptr);
    BOOST_REQUIRE(ref_armed.retrieve() == &intent);

    // Move-assign over another armed reference.
    internal::intent_reference ref_armed_2(&intent_x);
    ref_armed_2 = std::move(ref_armed);
    BOOST_REQUIRE(ref_armed.retrieve() == nullptr);
    BOOST_REQUIRE(ref_armed_2.retrieve() == &intent);

    // NOTE(review): this call is not visible in this chunk — it is
    // reconstructed because the checks below require the intent to have been
    // cancelled at this point. Confirm against the full source.
    intent.cancel();

    BOOST_REQUIRE(get_cancelled(ref_armed_2));

    // Test move cancelled
    internal::intent_reference ref_cancelled(std::move(ref_armed_2));
    BOOST_REQUIRE(ref_armed_2.retrieve() == nullptr);
    BOOST_REQUIRE(get_cancelled(ref_cancelled));

    internal::intent_reference ref_cancelled_2(&intent_x);
    ref_cancelled_2 = std::move(ref_cancelled);
    BOOST_REQUIRE(ref_cancelled.retrieve() == nullptr);
    BOOST_REQUIRE(get_cancelled(ref_cancelled_2));

    // Moving from an already-emptied reference leaves both sides empty.
    internal::intent_reference ref_empty(std::move(ref_orig));
    BOOST_REQUIRE(ref_empty.retrieve() == nullptr);

    internal::intent_reference ref_empty_2(&intent_x);
    ref_empty_2 = std::move(ref_empty);
    BOOST_REQUIRE(ref_empty_2.retrieve() == nullptr);
}
// Total number of requests randomly spread across the three kinds below.
static constexpr int nr_requests = 24;
// Randomly queue "legacy" (no intent), "live" (intent never cancelled) and
// "dead" (intent cancelled before dispatch) requests across two priority
// classes; verify the dead ones resolve as cancelled immediately and never
// touch the file, while the rest complete after the queue is polled/drained.
SEASTAR_THREAD_TEST_CASE(test_io_cancellation) {
    // NOTE(review): this declaration is not visible in this chunk — it is
    // reconstructed from the `file.` usage below; confirm the type name.
    fake_file file;

    io_queue_for_tests tio;
    io_priority_class pc0 = io_priority_class::register_one("a", 100);
    io_priority_class pc1 = io_priority_class::register_one("b", 100);

    // NOTE(review): these declarations (and the increments at the end of each
    // helper) are not visible in this chunk — they are reconstructed from the
    // by-value captures and the per-request distinct offset/value pattern.
    size_t idx = 0;
    int val = 100;

    io_intent live, dead;

    std::vector<future<>> finished;
    std::vector<future<>> cancelled;

    // No intent attached — must always complete with the value written.
    auto queue_legacy_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), nullptr, {})
            .then([&file, idx, val, buf = std::move(buf)] (size_t len) {
                BOOST_REQUIRE(file.data[idx] == val);
                return make_ready_future<>();
            });
        finished.push_back(std::move(f));
        idx++;
        val++;
    };

    // Attached to the `live` intent, which is never cancelled — completes.
    auto queue_live_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), &live, {})
            .then([&file, idx, val, buf = std::move(buf)] (size_t len) {
                BOOST_REQUIRE(file.data[idx] == val);
                return make_ready_future<>();
            });
        finished.push_back(std::move(f));
        idx++;
        val++;
    };

    // Attached to the `dead` intent — must fail (cancelled) and never write.
    auto queue_dead_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), &dead, {})
            .then_wrapped([buf = std::move(buf)] (auto&& f) {
                // NOTE(review): the try/catch framing is reconstructed — only
                // the BOOST_REQUIRE(false) line is visible in this chunk. The
                // future must resolve exceptionally, never with a value.
                try {
                    f.get();
                    BOOST_REQUIRE(false);
                } catch (...) {}
                return make_ready_future<>();
            })
            .then([&file, idx] () {
                BOOST_REQUIRE(file.data[idx] == 0);
            });
        cancelled.push_back(std::move(f));
        idx++;
        val++;
    };

    auto seed = std::random_device{}();
    std::default_random_engine reng(seed);
    std::uniform_int_distribution<> dice(0, 5);

    for (int i = 0; i < nr_requests; i++) {
        int pc = dice(reng) % 2;
        if (dice(reng) < 3) {
            fmt::print("queue live req to pc {}\n", pc);
            queue_live_request(tio, pc == 0 ? pc0 : pc1);
        } else if (dice(reng) < 5) {
            fmt::print("queue dead req to pc {}\n", pc);
            queue_dead_request(tio, pc == 0 ? pc0 : pc1);
        } else {
            fmt::print("queue legacy req to pc {}\n", pc);
            queue_legacy_request(tio, pc == 0 ? pc0 : pc1);
        }
    }

    // NOTE(review): this call is not visible in this chunk — it is
    // reconstructed because the "dead" futures can only resolve once the
    // intent is cancelled. Confirm against the full source.
    dead.cancel();

    // cancelled requests must resolve right at once
    when_all_succeed(cancelled.begin(), cancelled.end()).get();

    seastar::sleep(std::chrono::milliseconds(500)).get();
    tio.queue.poll_io_queue();
    tio.sink.drain([&file] (const internal::io_request& rq, io_completion* desc) -> bool {
        file.execute_write_req(rq, desc);
        return true;
    });

    when_all_succeed(finished.begin(), finished.end()).get();
}
// Check splitting of plain (single-buffer) read requests: each part must
// preserve fd/opcode/nowait, carry no iovecs, and cover consecutive slices
// of the offset range and of the memory buffer.
SEASTAR_TEST_CASE(test_request_buffer_split) {
    // Verify part `idx` of `parts` against the expected position/size/address.
    auto ensure = [] (const std::vector<internal::io_request::part>& parts, const internal::io_request& req, int idx, uint64_t pos, size_t size, uintptr_t mem) {
        BOOST_REQUIRE(parts[idx].req.opcode() == req.opcode());
        const auto& op = req.as<internal::io_request::operation::read>();
        const auto& sub_op = parts[idx].req.as<internal::io_request::operation::read>();
        BOOST_REQUIRE_EQUAL(sub_op.fd, op.fd);
        BOOST_REQUIRE_EQUAL(sub_op.pos, pos);
        BOOST_REQUIRE_EQUAL(sub_op.size, size);
        BOOST_REQUIRE_EQUAL(sub_op.addr, reinterpret_cast<void*>(mem));
        BOOST_REQUIRE_EQUAL(sub_op.nowait_works, op.nowait_works);
        BOOST_REQUIRE_EQUAL(parts[idx].iovecs.size(), 0);
        BOOST_REQUIRE_EQUAL(parts[idx].size, sub_op.size);
    };

    // NOTE(review): the braces delimiting the three sections below are not
    // visible in this chunk, but the repeated `req`/`parts` declarations
    // require separate scopes — reconstructed accordingly.
    {
        // 17 bytes split at 21 — fits, no split.
        internal::io_request req = internal::io_request::make_read(5, 13, reinterpret_cast<void*>(0x420), 17, true);
        auto parts = req.split(21);
        BOOST_REQUIRE_EQUAL(parts.size(), 1);
        ensure(parts, req, 0, 13, 17, 0x420);
    }

    {
        // 24 bytes split at 12 — exact multiple, no shorter tail.
        internal::io_request req = internal::io_request::make_read(7, 24, reinterpret_cast<void*>(0x4321), 24, true);
        auto parts = req.split(12);
        BOOST_REQUIRE_EQUAL(parts.size(), 2);
        ensure(parts, req, 0, 24, 12, 0x4321);
        ensure(parts, req, 1, 24 + 12, 12, 0x4321 + 12);
    }

    {
        // 33 bytes split at 13 — 13 + 13 + a 7-byte tail.
        internal::io_request req = internal::io_request::make_read(9, 42, reinterpret_cast<void*>(0x1234), 33, true);
        auto parts = req.split(13);
        BOOST_REQUIRE_EQUAL(parts.size(), 3);
        ensure(parts, req, 0, 42, 13, 0x1234);
        ensure(parts, req, 1, 42 + 13, 13, 0x1234 + 13);
        ensure(parts, req, 2, 42 + 26, 7, 0x1234 + 26);
    }

    return make_ready_future<>();
}
// Trace-log every iovec of a readv request, printing bases relative to
// `buf_off` so different runs are comparable. No-op unless trace logging
// is enabled.
static void show_request(const internal::io_request& req, void* buf_off, std::string pfx = "") {
    if (!seastar_logger.is_enabled(log_level::trace)) {
        return;
    }

    const auto& op = req.as<internal::io_request::operation::readv>();
    seastar_logger.trace("{}{} iovecs on req:", pfx, op.iov_len);
    for (unsigned i = 0; i < op.iov_len; i++) {
        seastar_logger.trace("{} base={} len={}", pfx, reinterpret_cast<uintptr_t>(op.iovec[i].iov_base) - reinterpret_cast<uintptr_t>(buf_off), op.iovec[i].iov_len);
    }
}
// Trace-log the outcome of an io_request split: per-part size, the part's
// own iovecs (bases relative to `buf_off`), and the embedded sub-request.
// No-op unless trace logging is enabled.
static void show_request_parts(const std::vector<internal::io_request::part>& parts, void* buf_off) {
    if (!seastar_logger.is_enabled(log_level::trace)) {
        return;
    }

    seastar_logger.trace("{} parts", parts.size());
    for (const auto& p : parts) {
        seastar_logger.trace(" size={} iovecs={}", p.size, p.iovecs.size());
        seastar_logger.trace(" {} iovecs on part:", p.iovecs.size());
        for (const auto& iov : p.iovecs) {
            seastar_logger.trace(" base={} len={}", reinterpret_cast<uintptr_t>(iov.iov_base) - reinterpret_cast<uintptr_t>(buf_off), iov.iov_len);
        }
        show_request(p.req, buf_off, " ");
    }
}
// Randomized check of vectored-request splitting. Each iteration builds a
// random set of iovecs over one backing buffer, splits the request at a
// random maximum length, and verifies via an increment trick that the parts
// tile the original buffer exactly once: bytes covered by vecs are bumped to
// 1 before the split and to 2 via the parts' iovecs after it, while the rest
// of the buffer must stay 0. Runs until enough edge cases (no-split and
// no-tail outcomes) have been observed.
SEASTAR_TEST_CASE(test_request_iovec_split) {
    char large_buffer[1025];

    // Zero the whole backing buffer.
    auto clear_buffer = [&large_buffer] {
        memset(large_buffer, 0, sizeof(large_buffer));
    };

    // Increment every byte covered by the given iovecs.
    auto bump_buffer = [] (const std::vector<::iovec>& vecs) {
        for (auto&& v : vecs) {
            for (unsigned i = 0; i < v.iov_len; i++) {
                (reinterpret_cast<char*>(v.iov_base))[i]++;
            }
        }
    };

    // First `len` bytes must all equal `value`; the rest must still be 0.
    // NOTE(review): the `if (i < len)`/else framing and the flag assignments
    // are partially reconstructed — only the comparisons are visible in this
    // chunk; the two flags ground the reconstruction.
    auto check_buffer = [&large_buffer] (size_t len, char value) {
        assert(len < sizeof(large_buffer));
        bool fill_match = true;
        bool train_match = true;
        for (unsigned i = 0; i < sizeof(large_buffer); i++) {
            if (i < len) {
                if (large_buffer[i] != value) {
                    fill_match = false;
                }
            } else {
                if (large_buffer[i] != '\0') {
                    train_match = false;
                }
            }
        }
        BOOST_REQUIRE_EQUAL(fill_match, true);
        BOOST_REQUIRE_EQUAL(train_match, true);
    };

    // Verify part `idx`: same fd/opcode/nowait, expected position, and the
    // sub-request's iovecs mirroring the part's own iovec list.
    auto ensure = [] (const std::vector<internal::io_request::part>& parts, const internal::io_request& req, int idx, uint64_t pos) {
        BOOST_REQUIRE(parts[idx].req.opcode() == req.opcode());
        const auto& op = req.as<internal::io_request::operation::writev>();
        const auto& sub_op = parts[idx].req.as<internal::io_request::operation::writev>();
        BOOST_REQUIRE_EQUAL(sub_op.fd, op.fd);
        BOOST_REQUIRE_EQUAL(sub_op.pos, pos);
        BOOST_REQUIRE_EQUAL(sub_op.iov_len, parts[idx].iovecs.size());
        BOOST_REQUIRE_EQUAL(sub_op.nowait_works, op.nowait_works);
        BOOST_REQUIRE_EQUAL(parts[idx].size, internal::iovec_len(parts[idx].iovecs));

        for (unsigned iov = 0; iov < parts[idx].iovecs.size(); iov++) {
            BOOST_REQUIRE_EQUAL(sub_op.iovec[iov].iov_base, parts[idx].iovecs[iov].iov_base);
            BOOST_REQUIRE_EQUAL(sub_op.iovec[iov].iov_len, parts[idx].iovecs[iov].iov_len);
        }
    };

    std::default_random_engine& reng = testing::local_random_engine;
    auto dice = std::uniform_int_distribution<uint16_t>(1, 31);
    auto stop = std::chrono::steady_clock::now() + std::chrono::seconds(4);

    // NOTE(review): the `iter` declaration and the `do {` opener are not
    // visible in this chunk — reconstructed from `iter++`, the `} while`
    // condition, and the final log line.
    unsigned iter = 0;
    unsigned no_splits = 0;
    unsigned no_tails = 0;

    do {
        seastar_logger.debug("===== iter {} =====", iter++);
        std::vector<::iovec> vecs;
        unsigned nr_vecs = dice(reng) % 13 + 1;
        seastar_logger.debug("Generate {} iovecs", nr_vecs);
        // NOTE(review): the `total` and `iov` declarations are reconstructed —
        // not visible in this chunk, but required by the visible uses.
        size_t total = 0;
        for (unsigned i = 0; i < nr_vecs; i++) {
            ::iovec iov;
            iov.iov_base = reinterpret_cast<void*>(large_buffer + total);
            iov.iov_len = dice(reng);
            assert(iov.iov_len != 0);
            total += iov.iov_len;
            vecs.push_back(std::move(iov));
        }

        // NOTE(review): these two calls are reconstructed — not visible in
        // this chunk, but check_buffer(total, 1) below requires the covered
        // bytes to have been zeroed and bumped exactly once.
        clear_buffer();
        bump_buffer(vecs);
        check_buffer(total, 1);

        size_t file_off = dice(reng);
        internal::io_request req = internal::io_request::make_readv(5, file_off, vecs, true);

        show_request(req, large_buffer);

        size_t max_len = dice(reng) * 3;
        unsigned nr_parts = (total + max_len - 1) / max_len;
        seastar_logger.debug("Split {} into {}-bytes ({} parts)", total, max_len, nr_parts);
        auto parts = req.split(max_len);
        show_request_parts(parts, large_buffer);
        BOOST_REQUIRE_EQUAL(parts.size(), nr_parts);

        size_t parts_total = 0;
        for (unsigned p = 0; p < nr_parts; p++) {
            ensure(parts, req, p, file_off + parts_total);
            if (p < nr_parts - 1) {
                // Only the last part may be shorter than max_len.
                BOOST_REQUIRE_EQUAL(parts[p].size, max_len);
            }
            parts_total += parts[p].size;
            bump_buffer(parts[p].iovecs);
        }
        BOOST_REQUIRE_EQUAL(parts_total, total);
        check_buffer(total, 2);

        // NOTE(review): the counter increments are reconstructed — not
        // visible in this chunk, but the loop condition and final log line
        // require them.
        if (parts.size() == 1) {
            no_splits++;
        }
        if (parts.back().size == max_len) {
            no_tails++;
        }
    } while (std::chrono::steady_clock::now() < stop || iter < 32 || no_splits < 16 || no_tails < 16);

    seastar_logger.info("{} iters ({} no-splits, {} no-tails)", iter, no_splits, no_tails);

    return make_ready_future<>();
}