]> git.proxmox.com Git - ceph.git/blob - ceph/src/seastar/tests/unit/io_queue_test.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / seastar / tests / unit / io_queue_test.cc
1 /*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18
19 /*
20 * Copyright (C) 2021 ScyllaDB
21 */
22
23 #include <seastar/core/thread.hh>
24 #include <seastar/core/sleep.hh>
25 #include <seastar/testing/test_case.hh>
26 #include <seastar/testing/thread_test_case.hh>
27 #include <seastar/testing/test_runner.hh>
28 #include <seastar/core/reactor.hh>
29 #include <seastar/core/smp.hh>
30 #include <seastar/core/when_all.hh>
31 #include <seastar/core/file.hh>
32 #include <seastar/core/io_queue.hh>
33 #include <seastar/core/io_intent.hh>
34 #include <seastar/core/internal/io_request.hh>
35 #include <seastar/core/internal/io_sink.hh>
36 #include <seastar/util/internal/iovec_utils.hh>
37
38 using namespace seastar;
39
40 struct fake_file {
41 std::unordered_map<uint64_t, int> data;
42
43 static internal::io_request make_write_req(size_t idx, int* buf) {
44 return internal::io_request::make_write(0, idx, buf, 1, false);
45 }
46
47 static internal::io_request make_writev_req(size_t idx, int* buf, size_t nr, size_t buf_len, std::vector<::iovec>& vecs) {
48 vecs.reserve(nr);
49 for (unsigned i = 0; i < nr; i++) {
50 vecs.push_back({ &buf[i], buf_len });
51 }
52 return internal::io_request::make_writev(0, idx, vecs, false);
53 }
54
55 void execute_write_req(const internal::io_request& rq, io_completion* desc) {
56 const auto& op = rq.as<internal::io_request::operation::write>();
57 data[op.pos] = *(reinterpret_cast<int*>(op.addr));
58 desc->complete_with(op.size);
59 }
60
61 void execute_writev_req(const internal::io_request& rq, io_completion* desc) {
62 size_t len = 0;
63 const auto& op = rq.as<internal::io_request::operation::writev>();
64 for (unsigned i = 0; i < op.iov_len; i++) {
65 data[op.pos + i] = *(reinterpret_cast<int*>(op.iovec[i].iov_base));
66 len += op.iovec[i].iov_len;
67 }
68 desc->complete_with(len);
69 }
70 };
71
// Minimal io_queue harness for these tests: one io_group with a default
// config, an io_sink the queue dispatches into, and a periodic timer that
// keeps replenishing the group's fair-group capacity so queued requests
// keep flowing without a real reactor I/O backend behind them.
struct io_queue_for_tests {
    io_group_ptr group;
    internal::io_sink sink;
    io_queue queue;
    timer<> kicker;     // fires every 500us and calls kick()

    io_queue_for_tests()
        : group(std::make_shared<io_group>(io_queue::config{0}))
        , sink()
        , queue(group, sink)
        , kicker([this] { kick(); })
    {
        kicker.arm_periodic(std::chrono::microseconds(500));
    }

    // Replenish the capacity of every fair group in the io_group as of
    // "now", so pending requests become dispatchable on the next poll.
    // NOTE(review): reaches into io_group's private _fgs member -- assumes
    // this test is befriended by / has access to io_group internals.
    void kick() {
        for (auto&& fg : group->_fgs) {
            fg->replenish_capacity(std::chrono::steady_clock::now());
        }
    }
};
93
94 SEASTAR_THREAD_TEST_CASE(test_basic_flow) {
95 io_queue_for_tests tio;
96 fake_file file;
97
98 auto val = std::make_unique<int>(42);
99 auto f = tio.queue.queue_request(default_priority_class(), internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(0, val.get()), nullptr, {})
100 .then([&file] (size_t len) {
101 BOOST_REQUIRE(file.data[0] == 42);
102 });
103
104 seastar::sleep(std::chrono::milliseconds(500)).get();
105 tio.queue.poll_io_queue();
106 tio.sink.drain([&file] (const internal::io_request& rq, io_completion* desc) -> bool {
107 file.execute_write_req(rq, desc);
108 return true;
109 });
110
111 f.get();
112 }
113
114 enum class part_flaw { none, partial, error };
115
// Drives one large writev request that the queue must split into three
// max_write-sized sub-requests. Depending on 'flaw', the middle part is
// left intact, truncated to half its length, or failed with EIO; the final
// completion length must reflect only the successfully written prefix.
static void do_test_large_request_flow(part_flaw flaw) {
    io_queue_for_tests tio;
    fake_file file;
    int values[3] = { 13, 42, 73 };

    auto limits = tio.queue.get_request_limits();

    std::vector<::iovec> vecs;
    // Three iovecs of max_write bytes each -> the request splits into
    // exactly three parts.
    auto f = tio.queue.queue_request(default_priority_class(), internal::io_direction_and_length(internal::io_direction_and_length::write_idx, limits.max_write * 3),
            file.make_writev_req(0, values, 3, limits.max_write, vecs), nullptr, std::move(vecs))
    .then([&file, &values, &limits, flaw] (size_t len) {
        // The first part always completes in full.
        size_t expected = limits.max_write;

        BOOST_REQUIRE_EQUAL(file.data[0 * limits.max_write], values[0]);

        if (flaw == part_flaw::none) {
            BOOST_REQUIRE_EQUAL(file.data[1 * limits.max_write], values[1]);
            BOOST_REQUIRE_EQUAL(file.data[2 * limits.max_write], values[2]);
            expected += 2 * limits.max_write;
        }

        if (flaw == part_flaw::partial) {
            // Middle part was shortened to half; the third part's bytes
            // do not count towards the total.
            BOOST_REQUIRE_EQUAL(file.data[1 * limits.max_write], values[1]);
            expected += limits.max_write / 2;
        }

        BOOST_REQUIRE_EQUAL(len, expected);
    });

    // Three dispatch rounds, one sub-request per round. On round i == 1
    // inject the requested flaw into the middle sub-request.
    for (int i = 0; i < 3; i++) {
        seastar::sleep(std::chrono::milliseconds(500)).get();
        tio.queue.poll_io_queue();
        tio.sink.drain([&file, i, flaw] (const internal::io_request& rq, io_completion* desc) -> bool {
            if (i == 1) {
                if (flaw == part_flaw::partial) {
                    const auto& op = rq.as<internal::io_request::operation::writev>();
                    op.iovec[0].iov_len /= 2;
                }
                if (flaw == part_flaw::error) {
                    desc->complete_with(-EIO);
                    return true;
                }
            }
            file.execute_writev_req(rq, desc);
            return true;
        });
    }

    f.get();
}
166
// Large request split into three parts; every part completes fully.
SEASTAR_THREAD_TEST_CASE(test_large_request_flow) {
    do_test_large_request_flow(part_flaw::none);
}

// Large request split into three parts; the middle part completes short.
SEASTAR_THREAD_TEST_CASE(test_large_request_flow_partial) {
    do_test_large_request_flow(part_flaw::partial);
}

// Large request split into three parts; the middle part fails with EIO.
SEASTAR_THREAD_TEST_CASE(test_large_request_flow_error) {
    do_test_large_request_flow(part_flaw::error);
}
178
179 SEASTAR_THREAD_TEST_CASE(test_intent_safe_ref) {
180 auto get_cancelled = [] (internal::intent_reference& iref) -> bool {
181 try {
182 iref.retrieve();
183 return false;
184 } catch(seastar::cancelled_error& err) {
185 return true;
186 }
187 };
188
189 io_intent intent, intent_x;
190
191 internal::intent_reference ref_orig(&intent);
192 BOOST_REQUIRE(ref_orig.retrieve() == &intent);
193
194 // Test move armed
195 internal::intent_reference ref_armed(std::move(ref_orig));
196 BOOST_REQUIRE(ref_orig.retrieve() == nullptr);
197 BOOST_REQUIRE(ref_armed.retrieve() == &intent);
198
199 internal::intent_reference ref_armed_2(&intent_x);
200 ref_armed_2 = std::move(ref_armed);
201 BOOST_REQUIRE(ref_armed.retrieve() == nullptr);
202 BOOST_REQUIRE(ref_armed_2.retrieve() == &intent);
203
204 intent.cancel();
205 BOOST_REQUIRE(get_cancelled(ref_armed_2));
206
207 // Test move cancelled
208 internal::intent_reference ref_cancelled(std::move(ref_armed_2));
209 BOOST_REQUIRE(ref_armed_2.retrieve() == nullptr);
210 BOOST_REQUIRE(get_cancelled(ref_cancelled));
211
212 internal::intent_reference ref_cancelled_2(&intent_x);
213 ref_cancelled_2 = std::move(ref_cancelled);
214 BOOST_REQUIRE(ref_cancelled.retrieve() == nullptr);
215 BOOST_REQUIRE(get_cancelled(ref_cancelled_2));
216
217 // Test move empty
218 internal::intent_reference ref_empty(std::move(ref_orig));
219 BOOST_REQUIRE(ref_empty.retrieve() == nullptr);
220
221 internal::intent_reference ref_empty_2(&intent_x);
222 ref_empty_2 = std::move(ref_empty);
223 BOOST_REQUIRE(ref_empty_2.retrieve() == nullptr);
224 }
225
// Number of requests test_io_cancellation spreads randomly across two
// priority classes and the live/dead/legacy intent variants.
static constexpr int nr_requests = 24;

SEASTAR_THREAD_TEST_CASE(test_io_cancellation) {
    fake_file file;

    io_queue_for_tests tio;
    io_priority_class pc0 = io_priority_class::register_one("a", 100);
    io_priority_class pc1 = io_priority_class::register_one("b", 100);

    size_t idx = 0;     // file position of the next request (unique per request)
    int val = 100;      // value the next request writes (unique per request)

    io_intent live, dead;   // 'dead' is cancelled below, 'live' never is

    std::vector<future<>> finished;     // must complete with their data written
    std::vector<future<>> cancelled;    // must fail without touching the file

    // No intent attached -- the request cannot be cancelled.
    auto queue_legacy_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), nullptr, {})
        .then([&file, idx, val, buf = std::move(buf)] (size_t len) {
            BOOST_REQUIRE(file.data[idx] == val);
            return make_ready_future<>();
        });
        finished.push_back(std::move(f));
        idx++;
        val++;
    };

    // Attached to the 'live' intent -- expected to complete normally.
    auto queue_live_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), &live, {})
        .then([&file, idx, val, buf = std::move(buf)] (size_t len) {
            BOOST_REQUIRE(file.data[idx] == val);
            return make_ready_future<>();
        });
        finished.push_back(std::move(f));
        idx++;
        val++;
    };

    // Attached to the 'dead' intent -- once the intent is cancelled the
    // future must resolve with an exception and the write must never
    // reach the fake file.
    auto queue_dead_request = [&] (io_queue_for_tests& q, io_priority_class& pc) {
        auto buf = std::make_unique<int>(val);
        auto f = q.queue.queue_request(pc, internal::io_direction_and_length(internal::io_direction_and_length::write_idx, 0), file.make_write_req(idx, buf.get()), &dead, {})
        .then_wrapped([buf = std::move(buf)] (auto&& f) {
            try {
                f.get();
                BOOST_REQUIRE(false);   // a cancelled request must not succeed
            } catch(...) {}
            return make_ready_future<>();
        })
        .then([&file, idx] () {
            BOOST_REQUIRE(file.data[idx] == 0);     // nothing was written here
        });
        cancelled.push_back(std::move(f));
        idx++;
        val++;
    };

    auto seed = std::random_device{}();
    std::default_random_engine reng(seed);
    std::uniform_int_distribution<> dice(0, 5);

    // Randomly interleave the three request flavors over the two classes.
    for (int i = 0; i < nr_requests; i++) {
        int pc = dice(reng) % 2;
        if (dice(reng) < 3) {
            fmt::print("queue live req to pc {}\n", pc);
            queue_live_request(tio, pc == 0 ? pc0 : pc1);
        } else if (dice(reng) < 5) {
            fmt::print("queue dead req to pc {}\n", pc);
            queue_dead_request(tio, pc == 0 ? pc0 : pc1);
        } else {
            fmt::print("queue legacy req to pc {}\n", pc);
            queue_legacy_request(tio, pc == 0 ? pc0 : pc1);
        }
    }

    dead.cancel();

    // cancelled requests must resolve right at once

    when_all_succeed(cancelled.begin(), cancelled.end()).get();

    seastar::sleep(std::chrono::milliseconds(500)).get();
    tio.queue.poll_io_queue();
    tio.sink.drain([&file] (const internal::io_request& rq, io_completion* desc) -> bool {
        file.execute_write_req(rq, desc);
        return true;
    });

    when_all_succeed(finished.begin(), finished.end()).get();
}
318
// Checks io_request::split() for plain (single-buffer) read requests:
// every part must inherit fd/opcode/nowait from the parent and cover a
// consecutive slice of both the file range and the source buffer.
SEASTAR_TEST_CASE(test_request_buffer_split) {
    // Verify part 'idx': same opcode/fd/nowait as the parent request, the
    // expected position, size and buffer address, and no iovecs (a plain
    // read splits into plain reads).
    auto ensure = [] (const std::vector<internal::io_request::part>& parts, const internal::io_request& req, int idx, uint64_t pos, size_t size, uintptr_t mem) {
        BOOST_REQUIRE(parts[idx].req.opcode() == req.opcode());
        const auto& op = req.as<internal::io_request::operation::read>();
        const auto& sub_op = parts[idx].req.as<internal::io_request::operation::read>();
        BOOST_REQUIRE_EQUAL(sub_op.fd, op.fd);
        BOOST_REQUIRE_EQUAL(sub_op.pos, pos);
        BOOST_REQUIRE_EQUAL(sub_op.size, size);
        BOOST_REQUIRE_EQUAL(sub_op.addr, reinterpret_cast<void*>(mem));
        BOOST_REQUIRE_EQUAL(sub_op.nowait_works, op.nowait_works);
        BOOST_REQUIRE_EQUAL(parts[idx].iovecs.size(), 0);
        BOOST_REQUIRE_EQUAL(parts[idx].size, sub_op.size);
    };

    // No split: a 17-byte request fits under the 21-byte limit.
    {
        internal::io_request req = internal::io_request::make_read(5, 13, reinterpret_cast<void*>(0x420), 17, true);
        auto parts = req.split(21);
        BOOST_REQUIRE_EQUAL(parts.size(), 1);
        ensure(parts, req, 0, 13, 17, 0x420);
    }

    // Without tail: 24 bytes split at 12 -- two equal parts.
    {
        internal::io_request req = internal::io_request::make_read(7, 24, reinterpret_cast<void*>(0x4321), 24, true);
        auto parts = req.split(12);
        BOOST_REQUIRE_EQUAL(parts.size(), 2);
        ensure(parts, req, 0, 24, 12, 0x4321);
        ensure(parts, req, 1, 24 + 12, 12, 0x4321 + 12);
    }

    // With tail: 33 bytes split at 13 -- two full parts plus a 7-byte tail.
    {
        internal::io_request req = internal::io_request::make_read(9, 42, reinterpret_cast<void*>(0x1234), 33, true);
        auto parts = req.split(13);
        BOOST_REQUIRE_EQUAL(parts.size(), 3);
        ensure(parts, req, 0, 42, 13, 0x1234);
        ensure(parts, req, 1, 42 + 13, 13, 0x1234 + 13);
        ensure(parts, req, 2, 42 + 26, 7, 0x1234 + 26);
    }

    return make_ready_future<>();
}
362
363 static void show_request(const internal::io_request& req, void* buf_off, std::string pfx = "") {
364 if (!seastar_logger.is_enabled(log_level::trace)) {
365 return;
366 }
367
368 const auto& op = req.as<internal::io_request::operation::readv>();
369 seastar_logger.trace("{}{} iovecs on req:", pfx, op.iov_len);
370 for (unsigned i = 0; i < op.iov_len; i++) {
371 seastar_logger.trace("{} base={} len={}", pfx, reinterpret_cast<uintptr_t>(op.iovec[i].iov_base) - reinterpret_cast<uintptr_t>(buf_off), op.iovec[i].iov_len);
372 }
373 }
374
375 static void show_request_parts(const std::vector<internal::io_request::part>& parts, void* buf_off) {
376 if (!seastar_logger.is_enabled(log_level::trace)) {
377 return;
378 }
379
380 seastar_logger.trace("{} parts", parts.size());
381 for (const auto& p : parts) {
382 seastar_logger.trace(" size={} iovecs={}", p.size, p.iovecs.size());
383 seastar_logger.trace(" {} iovecs on part:", p.iovecs.size());
384 for (const auto& iov : p.iovecs) {
385 seastar_logger.trace(" base={} len={}", reinterpret_cast<uintptr_t>(iov.iov_base) - reinterpret_cast<uintptr_t>(buf_off), iov.iov_len);
386 }
387 show_request(p.req, buf_off, " ");
388 }
389 }
390
// Randomized check of io_request::split() for vectored (readv) requests:
// generates random iovec layouts over a shared buffer, splits at a random
// max_len, and verifies the parts exactly tile the original request --
// every byte is covered exactly once (proven by bumping each byte through
// the part iovecs and comparing against the expected fill value).
SEASTAR_TEST_CASE(test_request_iovec_split) {
    char large_buffer[1025];

    auto clear_buffer = [&large_buffer] {
        memset(large_buffer, 0, sizeof(large_buffer));
    };

    // Increment every byte addressed by the given iovecs.
    auto bump_buffer = [] (const std::vector<::iovec>& vecs) {
        for (auto&& v : vecs) {
            for (unsigned i = 0; i < v.iov_len; i++) {
                (reinterpret_cast<char*>(v.iov_base))[i]++;
            }
        }
    };

    // Require the first 'len' bytes to equal 'value' and the rest to stay
    // zero -- i.e. the iovecs covered exactly [0, len), each byte once.
    auto check_buffer = [&large_buffer] (size_t len, char value) {
        assert(len < sizeof(large_buffer));
        bool fill_match = true;
        bool train_match = true;
        for (unsigned i = 0; i < sizeof(large_buffer); i++) {
            if (i < len) {
                if (large_buffer[i] != value) {
                    fill_match = false;
                }
            } else {
                if (large_buffer[i] != '\0') {
                    train_match = false;
                }
            }
        }
        BOOST_REQUIRE_EQUAL(fill_match, true);
        BOOST_REQUIRE_EQUAL(train_match, true);
    };

    // Structural check of part 'idx': same opcode/fd/nowait as the parent,
    // expected file position, and iovecs consistent between the part
    // descriptor and its embedded sub-request.
    auto ensure = [] (const std::vector<internal::io_request::part>& parts, const internal::io_request& req, int idx, uint64_t pos) {
        BOOST_REQUIRE(parts[idx].req.opcode() == req.opcode());
        const auto& op = req.as<internal::io_request::operation::writev>();
        const auto& sub_op = parts[idx].req.as<internal::io_request::operation::writev>();
        BOOST_REQUIRE_EQUAL(sub_op.fd, op.fd);
        BOOST_REQUIRE_EQUAL(sub_op.pos, pos);
        BOOST_REQUIRE_EQUAL(sub_op.iov_len, parts[idx].iovecs.size());
        BOOST_REQUIRE_EQUAL(sub_op.nowait_works, op.nowait_works);
        BOOST_REQUIRE_EQUAL(parts[idx].size, internal::iovec_len(parts[idx].iovecs));

        for (unsigned iov = 0; iov < parts[idx].iovecs.size(); iov++) {
            BOOST_REQUIRE_EQUAL(sub_op.iovec[iov].iov_base, parts[idx].iovecs[iov].iov_base);
            BOOST_REQUIRE_EQUAL(sub_op.iovec[iov].iov_len, parts[idx].iovecs[iov].iov_len);
        }
    };

    std::default_random_engine& reng = testing::local_random_engine;
    auto dice = std::uniform_int_distribution<uint16_t>(1, 31);
    auto stop = std::chrono::steady_clock::now() + std::chrono::seconds(4);
    uint64_t iter = 0;
    unsigned no_splits = 0;     // iterations where the request was not split
    unsigned no_tails = 0;      // iterations where the last part was full-sized

    do {
        seastar_logger.debug("===== iter {} =====", iter++);
        // Random layout: 1..13 iovecs of 1..31 bytes each, laid out
        // back-to-back in large_buffer.
        std::vector<::iovec> vecs;
        unsigned nr_vecs = dice(reng) % 13 + 1;
        seastar_logger.debug("Generate {} iovecs", nr_vecs);
        size_t total = 0;
        for (unsigned i = 0; i < nr_vecs; i++) {
            ::iovec iov;
            iov.iov_base = reinterpret_cast<void*>(large_buffer + total);
            iov.iov_len = dice(reng);
            assert(iov.iov_len != 0);
            total += iov.iov_len;
            vecs.push_back(std::move(iov));
        }

        assert(total > 0);
        clear_buffer();
        bump_buffer(vecs);
        check_buffer(total, 1);     // sanity: vecs tile [0, total) exactly once

        size_t file_off = dice(reng);
        internal::io_request req = internal::io_request::make_readv(5, file_off, vecs, true);

        show_request(req, large_buffer);

        size_t max_len = dice(reng) * 3;
        unsigned nr_parts = (total + max_len - 1) / max_len;
        seastar_logger.debug("Split {} into {}-bytes ({} parts)", total, max_len, nr_parts);
        auto parts = req.split(max_len);
        show_request_parts(parts, large_buffer);
        BOOST_REQUIRE_EQUAL(parts.size(), nr_parts);

        size_t parts_total = 0;
        for (unsigned p = 0; p < nr_parts; p++) {
            ensure(parts, req, p, file_off + parts_total);
            if (p < nr_parts - 1) {
                // Every part but the last must be exactly max_len long.
                BOOST_REQUIRE_EQUAL(parts[p].size, max_len);
            }
            parts_total += parts[p].size;
            bump_buffer(parts[p].iovecs);
        }
        BOOST_REQUIRE_EQUAL(parts_total, total);
        check_buffer(total, 2);     // each byte bumped exactly once via the parts

        if (parts.size() == 1) {
            no_splits++;
        }
        if (parts.back().size == max_len) {
            no_tails++;
        }
        // Run for at least 4 seconds and 32 iterations, and until the
        // corner cases (no-split, no-tail) were each hit enough times.
    } while (std::chrono::steady_clock::now() < stop || iter < 32 || no_splits < 16 || no_tails < 16);

    seastar_logger.info("{} iters ({} no-splits, {} no-tails)", iter, no_splits, no_tails);

    return make_ready_future<>();
}