]> git.proxmox.com Git - ceph.git/blame - ceph/src/seastar/apps/io_tester/io_tester.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / seastar / apps / io_tester / io_tester.cc
CommitLineData
11fdf7f2
TL
1/*
2 * This file is open source software, licensed to you under the terms
3 * of the Apache License, Version 2.0 (the "License"). See the NOTICE file
4 * distributed with this work for additional information regarding copyright
5 * ownership. You may not use this file except in compliance with the License.
6 *
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing,
12 * software distributed under the License is distributed on an
13 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 * KIND, either express or implied. See the License for the
15 * specific language governing permissions and limitations
16 * under the License.
17 */
18/*
19 * Copyright (C) 2017 ScyllaDB
20 */
21#include <seastar/core/app-template.hh>
22#include <seastar/core/distributed.hh>
23#include <seastar/core/reactor.hh>
24#include <seastar/core/future.hh>
25#include <seastar/core/shared_ptr.hh>
26#include <seastar/core/file.hh>
27#include <seastar/core/sleep.hh>
28#include <seastar/core/align.hh>
29#include <seastar/core/timer.hh>
30#include <seastar/core/thread.hh>
9f95a23c 31#include <seastar/core/print.hh>
f67539c2
TL
32#include <seastar/core/loop.hh>
33#include <seastar/core/with_scheduling_group.hh>
20effc67
TL
34#include <seastar/core/metrics_api.hh>
35#include <seastar/core/io_intent.hh>
11fdf7f2
TL
36#include <chrono>
37#include <vector>
38#include <boost/range/irange.hpp>
39#include <boost/algorithm/string.hpp>
40#include <boost/accumulators/accumulators.hpp>
41#include <boost/accumulators/statistics/stats.hpp>
42#include <boost/accumulators/statistics/max.hpp>
43#include <boost/accumulators/statistics/mean.hpp>
44#include <boost/accumulators/statistics/p_square_quantile.hpp>
45#include <boost/accumulators/statistics/extended_p_square.hpp>
46#include <boost/accumulators/statistics/extended_p_square_quantile.hpp>
47#include <boost/range/adaptor/filtered.hpp>
48#include <boost/range/adaptor/map.hpp>
49#include <boost/array.hpp>
50#include <iomanip>
51#include <random>
52#include <yaml-cpp/yaml.h>
53
54using namespace seastar;
55using namespace std::chrono_literals;
56using namespace boost::accumulators;
57
// Process-wide random engine, seeded from the wall clock (in microseconds) so
// that random positions and fill bytes differ from run to run.
static auto random_seed = std::chrono::time_point_cast<std::chrono::microseconds>(
        std::chrono::system_clock::now()).time_since_epoch().count();
static std::default_random_engine random_generator(random_seed);

class context;
11fdf7f2
TL
/// The kinds of load a job can generate.
enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu };

namespace std {

/// Hashes a request_type by its integral value so it can be used as a key in
/// unordered containers.
template <>
struct hash<request_type> {
    size_t operator() (const request_type& kind) const {
        const int as_int = static_cast<int>(kind);
        return static_cast<size_t>(as_int);
    }
};

}
74
/// A byte count decoded from YAML (see convert<byte_size>), e.g. "4kB".
struct byte_size {
    std::uint64_t size;
};

/// A duration decoded from YAML (see convert<duration_time>), e.g. "10us".
struct duration_time {
    std::chrono::duration<float> time;
};
82
83class shard_config {
84 std::unordered_set<unsigned> _shards;
85public:
86 shard_config()
87 : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {}
88 shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {}
89
90 bool is_set(unsigned cpu) const {
91 return _shards.count(cpu);
92 }
93};
94
95struct shard_info {
20effc67
TL
96 unsigned parallelism = 0;
97 unsigned rps = 0;
11fdf7f2
TL
98 unsigned shares = 10;
99 uint64_t request_size = 4 << 10;
100 std::chrono::duration<float> think_time = 0ms;
20effc67 101 std::chrono::duration<float> think_after = 0ms;
11fdf7f2
TL
102 std::chrono::duration<float> execution_time = 1ms;
103 seastar::scheduling_group scheduling_group = seastar::default_scheduling_group();
104};
105
f67539c2
TL
/// Extra per-job file options (the `options` YAML section).
struct options {
    // When set, the test file / device is opened with open_flags::dsync.
    bool dsync{false};
};
109
11fdf7f2
TL
110class class_data;
111
112struct job_config {
113 std::string name;
114 request_type type;
115 shard_config shard_placement;
116 ::shard_info shard_info;
f67539c2 117 ::options options;
20effc67
TL
118 // size of each individual file. Every class and every shard have its file, so in a normal
119 // system with many shards we'll naturally have many files and that will push the data out
120 // of the disk's cache
121 uint64_t file_size;
122 uint64_t offset_in_bdev;
11fdf7f2
TL
123 std::unique_ptr<class_data> gen_class_data();
124};
125
// Latency quantiles reported for every I/O class: p50, p95, p99 and p99.9.
std::array<double, 4> quantiles{{0.5, 0.95, 0.99, 0.999}};
// Set from --keep-files; when false, test files are removed right after they are opened.
static bool keep_files{false};
11fdf7f2
TL
128
129class class_data {
130protected:
131 using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>;
132
133 job_config _config;
134 uint64_t _alignment;
135 uint64_t _last_pos = 0;
20effc67 136 uint64_t _offset = 0;
11fdf7f2
TL
137
138 io_priority_class _iop;
139 seastar::scheduling_group _sg;
140
141 size_t _data = 0;
142 std::chrono::duration<float> _total_duration;
143
144 std::chrono::steady_clock::time_point _start = {};
145 accumulator_type _latencies;
20effc67 146 uint64_t _requests = 0;
11fdf7f2
TL
147 std::uniform_int_distribution<uint32_t> _pos_distribution;
148 file _file;
20effc67
TL
149 bool _think = false;
150 timer<> _thinker;
11fdf7f2 151
20effc67
TL
152 virtual future<> do_start(sstring dir, directory_entry_type type) = 0;
153 virtual future<size_t> issue_request(char *buf, io_intent* intent) = 0;
11fdf7f2 154public:
11fdf7f2
TL
155 class_data(job_config cfg)
156 : _config(std::move(cfg))
157 , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512)
20effc67 158 , _iop(io_priority_class::register_one(name(), _config.shard_info.shares))
11fdf7f2
TL
159 , _sg(cfg.shard_info.scheduling_group)
160 , _latencies(extended_p_square_probabilities = quantiles)
20effc67
TL
161 , _pos_distribution(0, _config.file_size / _config.shard_info.request_size)
162 , _thinker([this] { think_tick(); })
163 {
164 if (_config.shard_info.think_after > 0us) {
165 _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after));
166 } else if (_config.shard_info.think_time > 0us) {
167 _think = true;
168 }
169 }
11fdf7f2 170
9f95a23c
TL
171 virtual ~class_data() = default;
172
20effc67
TL
173private:
174
175 void think_tick() {
176 if (_think) {
177 _think = false;
178 _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after));
179 } else {
180 _think = true;
181 _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
182 }
183 }
184
185 future<> issue_requests_in_parallel(std::chrono::steady_clock::time_point stop, unsigned parallelism) {
186 return parallel_for_each(boost::irange(0u, parallelism), [this, stop] (auto dummy) mutable {
187 auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
188 auto buf = bufptr.get();
189 return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable {
190 auto start = std::chrono::steady_clock::now();
191 return issue_request(buf, nullptr).then([this, start, stop] (auto size) {
192 auto now = std::chrono::steady_clock::now();
193 if (now < stop) {
194 this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
195 }
196 return think();
197 });
198 }).finally([bufptr = std::move(bufptr)] {});
199 });
200 }
201
202 future<> issue_requests_at_rate(std::chrono::steady_clock::time_point stop, unsigned rps) {
203 return do_with(io_intent{}, 0u, [this, stop, rps] (io_intent& intent, unsigned& in_flight) {
204 auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment);
205 auto buf = bufptr.get();
206 auto pause = std::chrono::duration_cast<std::chrono::microseconds>(1s) / rps;
207 return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop, pause, &intent, &in_flight] () mutable {
208 auto start = std::chrono::steady_clock::now();
209 in_flight++;
210 (void)issue_request(buf, &intent).then([this, start, stop, &in_flight] (auto size) {
211 auto now = std::chrono::steady_clock::now();
212 if (now < stop) {
213 this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start));
214 }
215 in_flight--;
216 return make_ready_future<>();
217 });
218 return seastar::sleep(std::max(std::chrono::duration_cast<std::chrono::microseconds>((start + pause) - std::chrono::steady_clock::now()), 0us));
219 }).then([&intent, &in_flight] {
220 intent.cancel();
221 return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); });
222 }).finally([bufptr = std::move(bufptr)] {});
223 });
224 }
225
226public:
11fdf7f2
TL
227 future<> issue_requests(std::chrono::steady_clock::time_point stop) {
228 _start = std::chrono::steady_clock::now();
229 return with_scheduling_group(_sg, [this, stop] {
20effc67
TL
230 if (parallelism() != 0) {
231 return issue_requests_in_parallel(stop, parallelism());
232 } else /* rps() != 0 */ {
233 assert(rps() != 0);
234 return issue_requests_at_rate(stop, rps());
235 }
11fdf7f2
TL
236 }).then([this] {
237 _total_duration = std::chrono::steady_clock::now() - _start;
238 });
239 }
240
241 future<> think() {
20effc67 242 if (_think) {
11fdf7f2
TL
243 return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time));
244 } else {
245 return make_ready_future<>();
246 }
247 }
248 // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting
249 // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate
250 // this file differently depending on the type:
251 //
252 // sequential reads : will read the file from pos = 0 onwards, back to 0 on EOF
253 // sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF
254 // random reads : will read the file at random positions, between 0 and EOF
255 // random writes : will overwrite the file at a random position, between 0 and EOF
256 // append : will write to the file from pos = EOF onwards, always appending to the end.
257 // cpu : CPU-only load, file is not created.
20effc67
TL
258 future<> start(sstring dir, directory_entry_type type) {
259 return do_start(dir, type);
11fdf7f2 260 }
9f95a23c
TL
261
262 future<> stop() {
263 if (_file) {
264 return _file.close();
265 }
266 return make_ready_future<>();
267 }
20effc67
TL
268
269 const sstring name() const {
270 return _config.name;
271 }
272
11fdf7f2
TL
273protected:
274 sstring type_str() const {
275 return std::unordered_map<request_type, sstring>{
276 { request_type::seqread, "SEQ READ" },
277 { request_type::seqwrite, "SEQ WRITE" },
278 { request_type::randread, "RAND READ" },
279 { request_type::randwrite, "RAND WRITE" },
280 { request_type::append , "APPEND" },
281 { request_type::cpu , "CPU" },
282 }[_config.type];;
283 }
284
11fdf7f2
TL
285 request_type req_type() const {
286 return _config.type;
287 }
288
289 sstring think_time() const {
290 if (_config.shard_info.think_time == std::chrono::duration<float>(0)) {
291 return "NO think time";
292 } else {
293 return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count());
294 }
295 }
296
297 size_t req_size() const {
298 return _config.shard_info.request_size;
299 }
300
301 unsigned parallelism() const {
302 return _config.shard_info.parallelism;
303 }
304
20effc67
TL
305 unsigned rps() const {
306 return _config.shard_info.rps;
307 }
308
11fdf7f2
TL
309 unsigned shares() const {
310 return _config.shard_info.shares;
311 }
312
313 std::chrono::duration<float> total_duration() const {
314 return _total_duration;
315 }
316
20effc67
TL
317 uint64_t file_size_mb() const {
318 return _config.file_size >> 20;
319 }
320
11fdf7f2
TL
321 uint64_t total_data() const {
322 return _data;
323 }
324
325 uint64_t max_latency() const {
326 return max(_latencies);
327 }
328
329 uint64_t average_latency() const {
330 return mean(_latencies);
331 }
332
333 uint64_t quantile_latency(double q) const {
334 return quantile(_latencies, quantile_probability = q);
335 }
336
20effc67
TL
337 uint64_t requests() const noexcept {
338 return _requests;
339 }
340
11fdf7f2
TL
341 bool is_sequential() const {
342 return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite);
343 }
344 bool is_random() const {
345 return (req_type() == request_type::randread) || (req_type() == request_type::randwrite);
346 }
347
348 uint64_t get_pos() {
349 uint64_t pos;
350 if (is_random()) {
351 pos = _pos_distribution(random_generator) * req_size();
352 } else {
353 pos = _last_pos + req_size();
20effc67 354 if (is_sequential() && (pos >= _config.file_size)) {
11fdf7f2
TL
355 pos = 0;
356 }
357 }
358 _last_pos = pos;
20effc67 359 return pos + _offset;
11fdf7f2
TL
360 }
361
362 void add_result(size_t data, std::chrono::microseconds latency) {
363 _data += data;
364 _latencies(latency.count());
20effc67 365 _requests++;
11fdf7f2
TL
366 }
367
368public:
20effc67 369 virtual void emit_results(YAML::Emitter& out) = 0;
11fdf7f2
TL
370};
371
372class io_class_data : public class_data {
373public:
374 io_class_data(job_config cfg) : class_data(std::move(cfg)) {}
375
20effc67
TL
376 future<> do_start(sstring path, directory_entry_type type) override {
377 if (type == directory_entry_type::directory) {
378 return do_start_on_directory(path);
379 }
380
381 if (type == directory_entry_type::block_device) {
382 return do_start_on_bdev(path);
383 }
384
385 throw std::runtime_error(format("Unsupported storage. {} should be directory or block device", path));
386 }
387
388private:
389 future<> do_start_on_directory(sstring dir) {
f67539c2 390 auto fname = format("{}/test-{}-{:d}", dir, name(), this_shard_id());
20effc67 391 auto flags = open_flags::rw | open_flags::create;
f67539c2
TL
392 if (_config.options.dsync) {
393 flags |= open_flags::dsync;
394 }
20effc67
TL
395 file_open_options options;
396 options.extent_allocation_size_hint = _config.file_size;
397 options.append_is_unlikely = true;
398 return open_file_dma(fname, flags, options).then([this, fname] (auto f) {
11fdf7f2 399 _file = f;
20effc67
TL
400 auto maybe_remove_file = [] (sstring fname) {
401 return keep_files ? make_ready_future<>() : remove_file(fname);
402 };
403 return maybe_remove_file(fname).then([this] {
404 return _file.size().then([this] (uint64_t size) {
405 return _file.truncate(_config.file_size).then([this, size] {
406 if (size >= _config.file_size) {
407 return make_ready_future<>();
408 }
409
410 auto bufsize = 256ul << 10;
411 return do_with(boost::irange(0ul, (_config.file_size / bufsize) + 1), [this, bufsize] (auto& pos) mutable {
412 return max_concurrent_for_each(pos.begin(), pos.end(), 64, [this, bufsize] (auto pos) mutable {
413 auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096);
414 auto buf = bufptr.get();
415 std::uniform_int_distribution<char> fill('@', '~');
416 memset(buf, fill(random_generator), bufsize);
417 pos = pos * bufsize;
418 return _file.dma_write(pos, buf, bufsize).finally([this, bufptr = std::move(bufptr), pos] {
419 if ((this->req_type() == request_type::append) && (pos > _last_pos)) {
420 _last_pos = pos;
421 }
422 }).discard_result();
423 });
424 }).then([this] {
425 return _file.flush();
426 });
11fdf7f2
TL
427 });
428 });
429 });
11fdf7f2
TL
430 });
431 }
432
20effc67
TL
433 future<> do_start_on_bdev(sstring name) {
434 auto flags = open_flags::rw;
435 if (_config.options.dsync) {
436 flags |= open_flags::dsync;
437 }
438
439 return open_file_dma(name, flags).then([this] (auto f) {
440 _file = std::move(f);
441 return _file.size().then([this] (uint64_t size) {
442 auto shard_area_size = align_down<uint64_t>(size / smp::count, 1 << 20);
443 if (_config.offset_in_bdev + _config.file_size > shard_area_size) {
444 throw std::runtime_error("Data doesn't fit the blockdevice");
445 }
446 _offset = shard_area_size * this_shard_id() + _config.offset_in_bdev;
447 return make_ready_future<>();
448 });
449 });
450 }
451
452 void emit_one_metrics(YAML::Emitter& out, sstring m_name) {
453 const auto& values = seastar::metrics::impl::get_value_map();
454 const auto& mf = values.find(m_name);
455 assert(mf != values.end());
456 for (auto&& mi : mf->second) {
457 auto&& cname = mi.first.find("class");
458 if (cname != mi.first.end() && cname->second == name()) {
459 out << YAML::Key << m_name << YAML::Value << mi.second->get_function()().d();
460 }
461 }
11fdf7f2
TL
462 }
463
20effc67
TL
464 void emit_metrics(YAML::Emitter& out) {
465 emit_one_metrics(out, "io_queue_total_exec_sec");
466 emit_one_metrics(out, "io_queue_total_delay_sec");
467 emit_one_metrics(out, "io_queue_total_operations");
468 emit_one_metrics(out, "io_queue_starvation_time_sec");
469 }
470
471public:
472 virtual void emit_results(YAML::Emitter& out) override {
11fdf7f2 473 auto throughput_kbs = (total_data() >> 10) / total_duration().count();
20effc67
TL
474 auto iops = requests() / total_duration().count();
475 out << YAML::Key << "throughput" << YAML::Value << throughput_kbs << YAML::Comment("kB/s");
476 out << YAML::Key << "IOPS" << YAML::Value << iops;
477 out << YAML::Key << "latencies" << YAML::Comment("usec");
478 out << YAML::BeginMap;
479 out << YAML::Key << "average" << YAML::Value << average_latency();
11fdf7f2 480 for (auto& q: quantiles) {
20effc67 481 out << YAML::Key << fmt::format("p{}", q) << YAML::Value << quantile_latency(q);
11fdf7f2 482 }
20effc67
TL
483 out << YAML::Key << "max" << YAML::Value << max_latency();
484 out << YAML::EndMap;
485 out << YAML::Key << "stats" << YAML::BeginMap;
486 out << YAML::Key << "total_requests" << YAML::Value << requests();
487 emit_metrics(out);
488 out << YAML::EndMap;
11fdf7f2
TL
489 }
490};
491
492class read_io_class_data : public io_class_data {
493public:
494 read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
495
20effc67
TL
496 future<size_t> issue_request(char *buf, io_intent* intent) override {
497 return _file.dma_read(this->get_pos(), buf, this->req_size(), _iop, intent);
11fdf7f2
TL
498 }
499};
500
501class write_io_class_data : public io_class_data {
502public:
503 write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {}
504
20effc67
TL
505 future<size_t> issue_request(char *buf, io_intent* intent) override {
506 return _file.dma_write(this->get_pos(), buf, this->req_size(), _iop, intent);
11fdf7f2
TL
507 }
508};
509
510class cpu_class_data : public class_data {
511public:
512 cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {}
513
20effc67 514 future<> do_start(sstring dir, directory_entry_type type) override {
11fdf7f2
TL
515 return make_ready_future<>();
516 }
517
20effc67 518 future<size_t> issue_request(char *buf, io_intent* intent) override {
11fdf7f2
TL
519 // We do want the execution time to be a busy loop, and not just a bunch of
520 // continuations until our time is up: by doing this we can also simulate the behavior
521 // of I/O continuations in the face of reactor stalls.
522 auto start = std::chrono::steady_clock::now();
523 do {
524 } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time);
525 return make_ready_future<size_t>(1);
526 }
527
20effc67 528 virtual void emit_results(YAML::Emitter& out) override {
11fdf7f2 529 auto throughput = total_data() / total_duration().count();
20effc67 530 out << YAML::Key << "throughput" << YAML::Value << throughput;
11fdf7f2
TL
531 }
532};
533
534std::unique_ptr<class_data> job_config::gen_class_data() {
535 if (type == request_type::cpu) {
536 return std::make_unique<cpu_class_data>(*this);
537 } else if ((type == request_type::seqread) || (type == request_type::randread)) {
538 return std::make_unique<read_io_class_data>(*this);
539 } else {
540 return std::make_unique<write_io_class_data>(*this);
541 }
542}
543
544/// YAML parsing functions
545namespace YAML {
546template<>
547struct convert<byte_size> {
548 static bool decode(const Node& node, byte_size& bs) {
549 auto str = node.as<std::string>();
550 unsigned shift = 0;
551 if (str.back() == 'B') {
552 str.pop_back();
553 shift = std::unordered_map<char, unsigned>{
554 { 'k', 10 },
555 { 'M', 20 },
556 { 'G', 30 },
557 }[str.back()];
558 str.pop_back();
559 }
560 bs.size = (boost::lexical_cast<size_t>(str) << shift);
561 return bs.size >= 512;
562 }
563};
564
565template<>
566struct convert<duration_time> {
567 static bool decode(const Node& node, duration_time& dt) {
568 auto str = node.as<std::string>();
569 if (str == "0") {
570 dt.time = 0ns;
571 return true;
572 }
573 if (str.back() != 's') {
574 return false;
575 }
576 str.pop_back();
577 std::unordered_map<char, std::chrono::duration<float>> unit = {
578 { 'n', 1ns },
579 { 'u', 1us },
580 { 'm', 1ms },
581 };
582
583 if (unit.count(str.back())) {
584 auto u = str.back();
585 str.pop_back();
586 dt.time = (boost::lexical_cast<size_t>(str) * unit[u]);
587 } else {
588 dt.time = (boost::lexical_cast<size_t>(str) * 1s);
589 }
590 return true;
591 }
592};
593
594template<>
595struct convert<shard_config> {
596 static bool decode(const Node& node, shard_config& shards) {
597 try {
598 auto str = node.as<std::string>();
599 return (str == "all");
600 } catch (YAML::TypedBadConversion<std::string>& e) {
601 shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>()));
602 return true;
603 }
604 return false;
605 }
606};
607
608template<>
609struct convert<request_type> {
610 static bool decode(const Node& node, request_type& rt) {
611 static std::unordered_map<std::string, request_type> mappings = {
612 { "seqread", request_type::seqread },
613 { "seqwrite", request_type::seqwrite},
614 { "randread", request_type::randread },
615 { "randwrite", request_type::randwrite },
616 { "append", request_type::append},
617 { "cpu", request_type::cpu},
618 };
619 auto reqstr = node.as<std::string>();
620 if (!mappings.count(reqstr)) {
621 return false;
622 }
623 rt = mappings[reqstr];
624 return true;
625 }
626};
627
628template<>
629struct convert<shard_info> {
630 static bool decode(const Node& node, shard_info& sl) {
631 if (node["parallelism"]) {
632 sl.parallelism = node["parallelism"].as<unsigned>();
633 }
20effc67
TL
634 if (node["rps"]) {
635 sl.rps = node["rps"].as<unsigned>();
636 }
637 if ((sl.parallelism == 0) == (sl.rps == 0)) {
638 fmt::print("Must specify exactly one of 'parallelism' or 'rps' parameters\n");
639 return false;
640 }
641
11fdf7f2
TL
642 if (node["shares"]) {
643 sl.shares = node["shares"].as<unsigned>();
644 }
645 if (node["reqsize"]) {
646 sl.request_size = node["reqsize"].as<byte_size>().size;
647 }
648 if (node["think_time"]) {
649 sl.think_time = node["think_time"].as<duration_time>().time;
650 }
20effc67
TL
651 if (node["think_after"]) {
652 sl.think_after = node["think_after"].as<duration_time>().time;
653 }
11fdf7f2
TL
654 if (node["execution_time"]) {
655 sl.execution_time = node["execution_time"].as<duration_time>().time;
656 }
657 return true;
658 }
659};
660
f67539c2
TL
661template<>
662struct convert<options> {
663 static bool decode(const Node& node, options& op) {
664 if (node["dsync"]) {
665 op.dsync = node["dsync"].as<bool>();
666 }
667 return true;
668 }
669};
670
11fdf7f2
TL
671template<>
672struct convert<job_config> {
673 static bool decode(const Node& node, job_config& cl) {
674 cl.name = node["name"].as<std::string>();
675 cl.type = node["type"].as<request_type>();
676 cl.shard_placement = node["shards"].as<shard_config>();
20effc67
TL
677 // The data_size is used to divide the available (and effectively
678 // constant) disk space between workloads. Each shard inside the
679 // workload thus uses its portion of the assigned space.
680 if (node["data_size"]) {
681 cl.file_size = node["data_size"].as<byte_size>().size / smp::count;
682 } else {
683 cl.file_size = 1ull << 30; // 1G by default
684 }
11fdf7f2
TL
685 if (node["shard_info"]) {
686 cl.shard_info = node["shard_info"].as<shard_info>();
687 }
f67539c2
TL
688 if (node["options"]) {
689 cl.options = node["options"].as<options>();
690 }
11fdf7f2
TL
691 return true;
692 }
693};
694}
695
696/// Each shard has one context, and the context is responsible for creating the classes that should
697/// run in this shard.
698class context {
699 std::vector<std::unique_ptr<class_data>> _cl;
700
701 sstring _dir;
20effc67 702 directory_entry_type _type;
11fdf7f2
TL
703 std::chrono::seconds _duration;
704
705 semaphore _finished;
706public:
20effc67 707 context(sstring dir, directory_entry_type dtype, std::vector<job_config> req_config, unsigned duration)
11fdf7f2 708 : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config
f67539c2 709 | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(this_shard_id()); })
11fdf7f2
TL
710 | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); })
711 ))
712 , _dir(dir)
20effc67 713 , _type(dtype)
11fdf7f2
TL
714 , _duration(duration)
715 , _finished(0)
716 {}
717
9f95a23c
TL
718 future<> stop() {
719 return parallel_for_each(_cl, [] (std::unique_ptr<class_data>& cl) {
720 return cl->stop();
721 });
722 }
11fdf7f2
TL
723
724 future<> start() {
725 return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) {
20effc67 726 return cl->start(_dir, _type);
11fdf7f2
TL
727 });
728 }
729
730 future<> issue_requests() {
731 return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) {
732 return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] {
733 _finished.signal(1);
734 });
735 });
736 }
737
20effc67
TL
738 future<> emit_results(YAML::Emitter& out) {
739 return _finished.wait(_cl.size()).then([this, &out] {
11fdf7f2 740 for (auto& cl: _cl) {
20effc67
TL
741 out << YAML::Key << cl->name();
742 out << YAML::BeginMap;
743 cl->emit_results(out);
744 out << YAML::EndMap;
11fdf7f2
TL
745 }
746 return make_ready_future<>();
747 });
748 }
749};
750
20effc67
TL
751static void show_results(distributed<context>& ctx) {
752 YAML::Emitter out;
753 out << YAML::BeginDoc;
754 out << YAML::BeginSeq;
755 for (unsigned i = 0; i < smp::count; ++i) {
756 out << YAML::BeginMap;
757 out << YAML::Key << "shard" << YAML::Value << i;
758 ctx.invoke_on(i, [&out] (auto& c) {
759 return c.emit_results(out);
760 }).get();
761 out << YAML::EndMap;
762 }
763 out << YAML::EndSeq;
764 out << YAML::EndDoc;
765 std::cout << out.c_str();
11fdf7f2
TL
766}
767
768int main(int ac, char** av) {
769 namespace bpo = boost::program_options;
770
771 app_template app;
772 auto opt_add = app.add_options();
773 opt_add
20effc67 774 ("storage", bpo::value<sstring>()->default_value("."), "directory or block device where to execute the test")
11fdf7f2
TL
775 ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test")
776 ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification")
20effc67 777 ("keep-files", bpo::value<bool>()->default_value(false), "keep test files, next run may re-use them")
11fdf7f2
TL
778 ;
779
780 distributed<context> ctx;
781 return app.run(ac, av, [&] {
782 return seastar::async([&] {
783 auto& opts = app.configuration();
20effc67
TL
784 auto& storage = opts["storage"].as<sstring>();
785
786 auto st_type = engine().file_type(storage).get0();
787
788 if (!st_type) {
789 throw std::runtime_error(format("Unknown storage {}", storage));
790 }
11fdf7f2 791
20effc67
TL
792 if (*st_type == directory_entry_type::directory) {
793 auto fs = file_system_at(storage).get0();
794 if (fs != fs_type::xfs) {
795 throw std::runtime_error(format("This is a performance test. {} is not on XFS", storage));
796 }
11fdf7f2
TL
797 }
798
20effc67 799 keep_files = opts["keep-files"].as<bool>();
11fdf7f2
TL
800 auto& duration = opts["duration"].as<unsigned>();
801 auto& yaml = opts["conf"].as<sstring>();
802 YAML::Node doc = YAML::LoadFile(yaml);
803 auto reqs = doc.as<std::vector<job_config>>();
804
805 parallel_for_each(reqs, [] (auto& r) {
806 return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) {
807 r.shard_info.scheduling_group = sg;
808 });
809 }).get();
810
20effc67
TL
811 if (*st_type == directory_entry_type::block_device) {
812 uint64_t off = 0;
813 for (job_config& r : reqs) {
814 r.offset_in_bdev = off;
815 off += r.file_size;
816 }
817 }
818
819 ctx.start(storage, *st_type, reqs, duration).get0();
11fdf7f2
TL
820 engine().at_exit([&ctx] {
821 return ctx.stop();
822 });
823 std::cout << "Creating initial files..." << std::endl;
824 ctx.invoke_on_all([] (auto& c) {
825 return c.start();
826 }).get();
827 std::cout << "Starting evaluation..." << std::endl;
828 ctx.invoke_on_all([] (auto& c) {
829 return c.issue_requests();
830 }).get();
20effc67 831 show_results(ctx);
9f95a23c 832 ctx.stop().get0();
11fdf7f2
TL
833 }).or_terminate();
834 });
835}