]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | /* |
2 | * This file is open source software, licensed to you under the terms | |
3 | * of the Apache License, Version 2.0 (the "License"). See the NOTICE file | |
4 | * distributed with this work for additional information regarding copyright | |
5 | * ownership. You may not use this file except in compliance with the License. | |
6 | * | |
7 | * You may obtain a copy of the License at | |
8 | * | |
9 | * http://www.apache.org/licenses/LICENSE-2.0 | |
10 | * | |
11 | * Unless required by applicable law or agreed to in writing, | |
12 | * software distributed under the License is distributed on an | |
13 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | |
14 | * KIND, either express or implied. See the License for the | |
15 | * specific language governing permissions and limitations | |
16 | * under the License. | |
17 | */ | |
18 | /* | |
19 | * Copyright (C) 2017 ScyllaDB | |
20 | */ | |
21 | #include <seastar/core/app-template.hh> | |
22 | #include <seastar/core/distributed.hh> | |
23 | #include <seastar/core/reactor.hh> | |
24 | #include <seastar/core/future.hh> | |
25 | #include <seastar/core/shared_ptr.hh> | |
26 | #include <seastar/core/file.hh> | |
27 | #include <seastar/core/sleep.hh> | |
28 | #include <seastar/core/align.hh> | |
29 | #include <seastar/core/timer.hh> | |
30 | #include <seastar/core/thread.hh> | |
9f95a23c | 31 | #include <seastar/core/print.hh> |
f67539c2 TL |
32 | #include <seastar/core/loop.hh> |
33 | #include <seastar/core/with_scheduling_group.hh> | |
20effc67 TL |
34 | #include <seastar/core/metrics_api.hh> |
35 | #include <seastar/core/io_intent.hh> | |
11fdf7f2 TL |
36 | #include <chrono> |
37 | #include <vector> | |
38 | #include <boost/range/irange.hpp> | |
39 | #include <boost/algorithm/string.hpp> | |
40 | #include <boost/accumulators/accumulators.hpp> | |
41 | #include <boost/accumulators/statistics/stats.hpp> | |
42 | #include <boost/accumulators/statistics/max.hpp> | |
43 | #include <boost/accumulators/statistics/mean.hpp> | |
44 | #include <boost/accumulators/statistics/p_square_quantile.hpp> | |
45 | #include <boost/accumulators/statistics/extended_p_square.hpp> | |
46 | #include <boost/accumulators/statistics/extended_p_square_quantile.hpp> | |
47 | #include <boost/range/adaptor/filtered.hpp> | |
48 | #include <boost/range/adaptor/map.hpp> | |
49 | #include <boost/array.hpp> | |
50 | #include <iomanip> | |
51 | #include <random> | |
52 | #include <yaml-cpp/yaml.h> | |
53 | ||
54 | using namespace seastar; | |
55 | using namespace std::chrono_literals; | |
56 | using namespace boost::accumulators; | |
57 | ||
58 | static auto random_seed = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count(); | |
59 | static std::default_random_engine random_generator(random_seed); | |
11fdf7f2 | 60 | |
9f95a23c | 61 | class context; |
11fdf7f2 TL |
62 | enum class request_type { seqread, seqwrite, randread, randwrite, append, cpu }; |
63 | ||
64 | namespace std { | |
65 | ||
66 | template <> | |
67 | struct hash<request_type> { | |
68 | size_t operator() (const request_type& type) const { | |
69 | return static_cast<size_t>(type); | |
70 | } | |
71 | }; | |
72 | ||
73 | } | |
74 | ||
// Strong typedef for byte counts parsed from YAML (e.g. "4kB", "1GB").
struct byte_size {
    uint64_t size;
};

// Strong typedef for durations parsed from YAML (e.g. "100ms", "1s").
struct duration_time {
    std::chrono::duration<float> time;
};
82 | ||
83 | class shard_config { | |
84 | std::unordered_set<unsigned> _shards; | |
85 | public: | |
86 | shard_config() | |
87 | : _shards(boost::copy_range<std::unordered_set<unsigned>>(boost::irange(0u, smp::count))) {} | |
88 | shard_config(std::unordered_set<unsigned> s) : _shards(std::move(s)) {} | |
89 | ||
90 | bool is_set(unsigned cpu) const { | |
91 | return _shards.count(cpu); | |
92 | } | |
93 | }; | |
94 | ||
95 | struct shard_info { | |
20effc67 TL |
96 | unsigned parallelism = 0; |
97 | unsigned rps = 0; | |
11fdf7f2 TL |
98 | unsigned shares = 10; |
99 | uint64_t request_size = 4 << 10; | |
100 | std::chrono::duration<float> think_time = 0ms; | |
20effc67 | 101 | std::chrono::duration<float> think_after = 0ms; |
11fdf7f2 TL |
102 | std::chrono::duration<float> execution_time = 1ms; |
103 | seastar::scheduling_group scheduling_group = seastar::default_scheduling_group(); | |
104 | }; | |
105 | ||
f67539c2 TL |
106 | struct options { |
107 | bool dsync = false; | |
108 | }; | |
109 | ||
11fdf7f2 TL |
110 | class class_data; |
111 | ||
112 | struct job_config { | |
113 | std::string name; | |
114 | request_type type; | |
115 | shard_config shard_placement; | |
116 | ::shard_info shard_info; | |
f67539c2 | 117 | ::options options; |
20effc67 TL |
118 | // size of each individual file. Every class and every shard have its file, so in a normal |
119 | // system with many shards we'll naturally have many files and that will push the data out | |
120 | // of the disk's cache | |
121 | uint64_t file_size; | |
122 | uint64_t offset_in_bdev; | |
11fdf7f2 TL |
123 | std::unique_ptr<class_data> gen_class_data(); |
124 | }; | |
125 | ||
126 | std::array<double, 4> quantiles = { 0.5, 0.95, 0.99, 0.999}; | |
20effc67 | 127 | static bool keep_files = false; |
11fdf7f2 TL |
128 | |
129 | class class_data { | |
130 | protected: | |
131 | using accumulator_type = accumulator_set<double, stats<tag::extended_p_square_quantile(quadratic), tag::mean, tag::max>>; | |
132 | ||
133 | job_config _config; | |
134 | uint64_t _alignment; | |
135 | uint64_t _last_pos = 0; | |
20effc67 | 136 | uint64_t _offset = 0; |
11fdf7f2 TL |
137 | |
138 | io_priority_class _iop; | |
139 | seastar::scheduling_group _sg; | |
140 | ||
141 | size_t _data = 0; | |
142 | std::chrono::duration<float> _total_duration; | |
143 | ||
144 | std::chrono::steady_clock::time_point _start = {}; | |
145 | accumulator_type _latencies; | |
20effc67 | 146 | uint64_t _requests = 0; |
11fdf7f2 TL |
147 | std::uniform_int_distribution<uint32_t> _pos_distribution; |
148 | file _file; | |
20effc67 TL |
149 | bool _think = false; |
150 | timer<> _thinker; | |
11fdf7f2 | 151 | |
20effc67 TL |
152 | virtual future<> do_start(sstring dir, directory_entry_type type) = 0; |
153 | virtual future<size_t> issue_request(char *buf, io_intent* intent) = 0; | |
11fdf7f2 | 154 | public: |
11fdf7f2 TL |
155 | class_data(job_config cfg) |
156 | : _config(std::move(cfg)) | |
157 | , _alignment(_config.shard_info.request_size >= 4096 ? 4096 : 512) | |
20effc67 | 158 | , _iop(io_priority_class::register_one(name(), _config.shard_info.shares)) |
11fdf7f2 TL |
159 | , _sg(cfg.shard_info.scheduling_group) |
160 | , _latencies(extended_p_square_probabilities = quantiles) | |
20effc67 TL |
161 | , _pos_distribution(0, _config.file_size / _config.shard_info.request_size) |
162 | , _thinker([this] { think_tick(); }) | |
163 | { | |
164 | if (_config.shard_info.think_after > 0us) { | |
165 | _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after)); | |
166 | } else if (_config.shard_info.think_time > 0us) { | |
167 | _think = true; | |
168 | } | |
169 | } | |
11fdf7f2 | 170 | |
9f95a23c TL |
171 | virtual ~class_data() = default; |
172 | ||
20effc67 TL |
173 | private: |
174 | ||
175 | void think_tick() { | |
176 | if (_think) { | |
177 | _think = false; | |
178 | _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_after)); | |
179 | } else { | |
180 | _think = true; | |
181 | _thinker.arm(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time)); | |
182 | } | |
183 | } | |
184 | ||
185 | future<> issue_requests_in_parallel(std::chrono::steady_clock::time_point stop, unsigned parallelism) { | |
186 | return parallel_for_each(boost::irange(0u, parallelism), [this, stop] (auto dummy) mutable { | |
187 | auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment); | |
188 | auto buf = bufptr.get(); | |
189 | return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop] () mutable { | |
190 | auto start = std::chrono::steady_clock::now(); | |
191 | return issue_request(buf, nullptr).then([this, start, stop] (auto size) { | |
192 | auto now = std::chrono::steady_clock::now(); | |
193 | if (now < stop) { | |
194 | this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start)); | |
195 | } | |
196 | return think(); | |
197 | }); | |
198 | }).finally([bufptr = std::move(bufptr)] {}); | |
199 | }); | |
200 | } | |
201 | ||
202 | future<> issue_requests_at_rate(std::chrono::steady_clock::time_point stop, unsigned rps) { | |
203 | return do_with(io_intent{}, 0u, [this, stop, rps] (io_intent& intent, unsigned& in_flight) { | |
204 | auto bufptr = allocate_aligned_buffer<char>(this->req_size(), _alignment); | |
205 | auto buf = bufptr.get(); | |
206 | auto pause = std::chrono::duration_cast<std::chrono::microseconds>(1s) / rps; | |
207 | return do_until([stop] { return std::chrono::steady_clock::now() > stop; }, [this, buf, stop, pause, &intent, &in_flight] () mutable { | |
208 | auto start = std::chrono::steady_clock::now(); | |
209 | in_flight++; | |
210 | (void)issue_request(buf, &intent).then([this, start, stop, &in_flight] (auto size) { | |
211 | auto now = std::chrono::steady_clock::now(); | |
212 | if (now < stop) { | |
213 | this->add_result(size, std::chrono::duration_cast<std::chrono::microseconds>(now - start)); | |
214 | } | |
215 | in_flight--; | |
216 | return make_ready_future<>(); | |
217 | }); | |
218 | return seastar::sleep(std::max(std::chrono::duration_cast<std::chrono::microseconds>((start + pause) - std::chrono::steady_clock::now()), 0us)); | |
219 | }).then([&intent, &in_flight] { | |
220 | intent.cancel(); | |
221 | return do_until([&in_flight] { return in_flight == 0; }, [] { return seastar::sleep(100ms /* ¯\_(ツ)_/¯ */); }); | |
222 | }).finally([bufptr = std::move(bufptr)] {}); | |
223 | }); | |
224 | } | |
225 | ||
226 | public: | |
11fdf7f2 TL |
227 | future<> issue_requests(std::chrono::steady_clock::time_point stop) { |
228 | _start = std::chrono::steady_clock::now(); | |
229 | return with_scheduling_group(_sg, [this, stop] { | |
20effc67 TL |
230 | if (parallelism() != 0) { |
231 | return issue_requests_in_parallel(stop, parallelism()); | |
232 | } else /* rps() != 0 */ { | |
233 | assert(rps() != 0); | |
234 | return issue_requests_at_rate(stop, rps()); | |
235 | } | |
11fdf7f2 TL |
236 | }).then([this] { |
237 | _total_duration = std::chrono::steady_clock::now() - _start; | |
238 | }); | |
239 | } | |
240 | ||
241 | future<> think() { | |
20effc67 | 242 | if (_think) { |
11fdf7f2 TL |
243 | return seastar::sleep(std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time)); |
244 | } else { | |
245 | return make_ready_future<>(); | |
246 | } | |
247 | } | |
248 | // Generate the test file for reads and writes alike. It is much simpler to just generate one file per job instead of expecting | |
249 | // job dependencies between creators and consumers. So every job (a class in a shard) will have its own file and will operate | |
250 | // this file differently depending on the type: | |
251 | // | |
252 | // sequential reads : will read the file from pos = 0 onwards, back to 0 on EOF | |
253 | // sequential writes : will write the file from pos = 0 onwards, back to 0 on EOF | |
254 | // random reads : will read the file at random positions, between 0 and EOF | |
255 | // random writes : will overwrite the file at a random position, between 0 and EOF | |
256 | // append : will write to the file from pos = EOF onwards, always appending to the end. | |
257 | // cpu : CPU-only load, file is not created. | |
20effc67 TL |
258 | future<> start(sstring dir, directory_entry_type type) { |
259 | return do_start(dir, type); | |
11fdf7f2 | 260 | } |
9f95a23c TL |
261 | |
262 | future<> stop() { | |
263 | if (_file) { | |
264 | return _file.close(); | |
265 | } | |
266 | return make_ready_future<>(); | |
267 | } | |
20effc67 TL |
268 | |
269 | const sstring name() const { | |
270 | return _config.name; | |
271 | } | |
272 | ||
11fdf7f2 TL |
273 | protected: |
274 | sstring type_str() const { | |
275 | return std::unordered_map<request_type, sstring>{ | |
276 | { request_type::seqread, "SEQ READ" }, | |
277 | { request_type::seqwrite, "SEQ WRITE" }, | |
278 | { request_type::randread, "RAND READ" }, | |
279 | { request_type::randwrite, "RAND WRITE" }, | |
280 | { request_type::append , "APPEND" }, | |
281 | { request_type::cpu , "CPU" }, | |
282 | }[_config.type];; | |
283 | } | |
284 | ||
11fdf7f2 TL |
285 | request_type req_type() const { |
286 | return _config.type; | |
287 | } | |
288 | ||
289 | sstring think_time() const { | |
290 | if (_config.shard_info.think_time == std::chrono::duration<float>(0)) { | |
291 | return "NO think time"; | |
292 | } else { | |
293 | return format("{:d} us think time", std::chrono::duration_cast<std::chrono::microseconds>(_config.shard_info.think_time).count()); | |
294 | } | |
295 | } | |
296 | ||
297 | size_t req_size() const { | |
298 | return _config.shard_info.request_size; | |
299 | } | |
300 | ||
301 | unsigned parallelism() const { | |
302 | return _config.shard_info.parallelism; | |
303 | } | |
304 | ||
20effc67 TL |
305 | unsigned rps() const { |
306 | return _config.shard_info.rps; | |
307 | } | |
308 | ||
11fdf7f2 TL |
309 | unsigned shares() const { |
310 | return _config.shard_info.shares; | |
311 | } | |
312 | ||
313 | std::chrono::duration<float> total_duration() const { | |
314 | return _total_duration; | |
315 | } | |
316 | ||
20effc67 TL |
317 | uint64_t file_size_mb() const { |
318 | return _config.file_size >> 20; | |
319 | } | |
320 | ||
11fdf7f2 TL |
321 | uint64_t total_data() const { |
322 | return _data; | |
323 | } | |
324 | ||
325 | uint64_t max_latency() const { | |
326 | return max(_latencies); | |
327 | } | |
328 | ||
329 | uint64_t average_latency() const { | |
330 | return mean(_latencies); | |
331 | } | |
332 | ||
333 | uint64_t quantile_latency(double q) const { | |
334 | return quantile(_latencies, quantile_probability = q); | |
335 | } | |
336 | ||
20effc67 TL |
337 | uint64_t requests() const noexcept { |
338 | return _requests; | |
339 | } | |
340 | ||
11fdf7f2 TL |
341 | bool is_sequential() const { |
342 | return (req_type() == request_type::seqread) || (req_type() == request_type::seqwrite); | |
343 | } | |
344 | bool is_random() const { | |
345 | return (req_type() == request_type::randread) || (req_type() == request_type::randwrite); | |
346 | } | |
347 | ||
348 | uint64_t get_pos() { | |
349 | uint64_t pos; | |
350 | if (is_random()) { | |
351 | pos = _pos_distribution(random_generator) * req_size(); | |
352 | } else { | |
353 | pos = _last_pos + req_size(); | |
20effc67 | 354 | if (is_sequential() && (pos >= _config.file_size)) { |
11fdf7f2 TL |
355 | pos = 0; |
356 | } | |
357 | } | |
358 | _last_pos = pos; | |
20effc67 | 359 | return pos + _offset; |
11fdf7f2 TL |
360 | } |
361 | ||
362 | void add_result(size_t data, std::chrono::microseconds latency) { | |
363 | _data += data; | |
364 | _latencies(latency.count()); | |
20effc67 | 365 | _requests++; |
11fdf7f2 TL |
366 | } |
367 | ||
368 | public: | |
20effc67 | 369 | virtual void emit_results(YAML::Emitter& out) = 0; |
11fdf7f2 TL |
370 | }; |
371 | ||
372 | class io_class_data : public class_data { | |
373 | public: | |
374 | io_class_data(job_config cfg) : class_data(std::move(cfg)) {} | |
375 | ||
20effc67 TL |
376 | future<> do_start(sstring path, directory_entry_type type) override { |
377 | if (type == directory_entry_type::directory) { | |
378 | return do_start_on_directory(path); | |
379 | } | |
380 | ||
381 | if (type == directory_entry_type::block_device) { | |
382 | return do_start_on_bdev(path); | |
383 | } | |
384 | ||
385 | throw std::runtime_error(format("Unsupported storage. {} should be directory or block device", path)); | |
386 | } | |
387 | ||
388 | private: | |
389 | future<> do_start_on_directory(sstring dir) { | |
f67539c2 | 390 | auto fname = format("{}/test-{}-{:d}", dir, name(), this_shard_id()); |
20effc67 | 391 | auto flags = open_flags::rw | open_flags::create; |
f67539c2 TL |
392 | if (_config.options.dsync) { |
393 | flags |= open_flags::dsync; | |
394 | } | |
20effc67 TL |
395 | file_open_options options; |
396 | options.extent_allocation_size_hint = _config.file_size; | |
397 | options.append_is_unlikely = true; | |
398 | return open_file_dma(fname, flags, options).then([this, fname] (auto f) { | |
11fdf7f2 | 399 | _file = f; |
20effc67 TL |
400 | auto maybe_remove_file = [] (sstring fname) { |
401 | return keep_files ? make_ready_future<>() : remove_file(fname); | |
402 | }; | |
403 | return maybe_remove_file(fname).then([this] { | |
404 | return _file.size().then([this] (uint64_t size) { | |
405 | return _file.truncate(_config.file_size).then([this, size] { | |
406 | if (size >= _config.file_size) { | |
407 | return make_ready_future<>(); | |
408 | } | |
409 | ||
410 | auto bufsize = 256ul << 10; | |
411 | return do_with(boost::irange(0ul, (_config.file_size / bufsize) + 1), [this, bufsize] (auto& pos) mutable { | |
412 | return max_concurrent_for_each(pos.begin(), pos.end(), 64, [this, bufsize] (auto pos) mutable { | |
413 | auto bufptr = allocate_aligned_buffer<char>(bufsize, 4096); | |
414 | auto buf = bufptr.get(); | |
415 | std::uniform_int_distribution<char> fill('@', '~'); | |
416 | memset(buf, fill(random_generator), bufsize); | |
417 | pos = pos * bufsize; | |
418 | return _file.dma_write(pos, buf, bufsize).finally([this, bufptr = std::move(bufptr), pos] { | |
419 | if ((this->req_type() == request_type::append) && (pos > _last_pos)) { | |
420 | _last_pos = pos; | |
421 | } | |
422 | }).discard_result(); | |
423 | }); | |
424 | }).then([this] { | |
425 | return _file.flush(); | |
426 | }); | |
11fdf7f2 TL |
427 | }); |
428 | }); | |
429 | }); | |
11fdf7f2 TL |
430 | }); |
431 | } | |
432 | ||
20effc67 TL |
433 | future<> do_start_on_bdev(sstring name) { |
434 | auto flags = open_flags::rw; | |
435 | if (_config.options.dsync) { | |
436 | flags |= open_flags::dsync; | |
437 | } | |
438 | ||
439 | return open_file_dma(name, flags).then([this] (auto f) { | |
440 | _file = std::move(f); | |
441 | return _file.size().then([this] (uint64_t size) { | |
442 | auto shard_area_size = align_down<uint64_t>(size / smp::count, 1 << 20); | |
443 | if (_config.offset_in_bdev + _config.file_size > shard_area_size) { | |
444 | throw std::runtime_error("Data doesn't fit the blockdevice"); | |
445 | } | |
446 | _offset = shard_area_size * this_shard_id() + _config.offset_in_bdev; | |
447 | return make_ready_future<>(); | |
448 | }); | |
449 | }); | |
450 | } | |
451 | ||
452 | void emit_one_metrics(YAML::Emitter& out, sstring m_name) { | |
453 | const auto& values = seastar::metrics::impl::get_value_map(); | |
454 | const auto& mf = values.find(m_name); | |
455 | assert(mf != values.end()); | |
456 | for (auto&& mi : mf->second) { | |
457 | auto&& cname = mi.first.find("class"); | |
458 | if (cname != mi.first.end() && cname->second == name()) { | |
459 | out << YAML::Key << m_name << YAML::Value << mi.second->get_function()().d(); | |
460 | } | |
461 | } | |
11fdf7f2 TL |
462 | } |
463 | ||
20effc67 TL |
464 | void emit_metrics(YAML::Emitter& out) { |
465 | emit_one_metrics(out, "io_queue_total_exec_sec"); | |
466 | emit_one_metrics(out, "io_queue_total_delay_sec"); | |
467 | emit_one_metrics(out, "io_queue_total_operations"); | |
468 | emit_one_metrics(out, "io_queue_starvation_time_sec"); | |
469 | } | |
470 | ||
471 | public: | |
472 | virtual void emit_results(YAML::Emitter& out) override { | |
11fdf7f2 | 473 | auto throughput_kbs = (total_data() >> 10) / total_duration().count(); |
20effc67 TL |
474 | auto iops = requests() / total_duration().count(); |
475 | out << YAML::Key << "throughput" << YAML::Value << throughput_kbs << YAML::Comment("kB/s"); | |
476 | out << YAML::Key << "IOPS" << YAML::Value << iops; | |
477 | out << YAML::Key << "latencies" << YAML::Comment("usec"); | |
478 | out << YAML::BeginMap; | |
479 | out << YAML::Key << "average" << YAML::Value << average_latency(); | |
11fdf7f2 | 480 | for (auto& q: quantiles) { |
20effc67 | 481 | out << YAML::Key << fmt::format("p{}", q) << YAML::Value << quantile_latency(q); |
11fdf7f2 | 482 | } |
20effc67 TL |
483 | out << YAML::Key << "max" << YAML::Value << max_latency(); |
484 | out << YAML::EndMap; | |
485 | out << YAML::Key << "stats" << YAML::BeginMap; | |
486 | out << YAML::Key << "total_requests" << YAML::Value << requests(); | |
487 | emit_metrics(out); | |
488 | out << YAML::EndMap; | |
11fdf7f2 TL |
489 | } |
490 | }; | |
491 | ||
492 | class read_io_class_data : public io_class_data { | |
493 | public: | |
494 | read_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {} | |
495 | ||
20effc67 TL |
496 | future<size_t> issue_request(char *buf, io_intent* intent) override { |
497 | return _file.dma_read(this->get_pos(), buf, this->req_size(), _iop, intent); | |
11fdf7f2 TL |
498 | } |
499 | }; | |
500 | ||
501 | class write_io_class_data : public io_class_data { | |
502 | public: | |
503 | write_io_class_data(job_config cfg) : io_class_data(std::move(cfg)) {} | |
504 | ||
20effc67 TL |
505 | future<size_t> issue_request(char *buf, io_intent* intent) override { |
506 | return _file.dma_write(this->get_pos(), buf, this->req_size(), _iop, intent); | |
11fdf7f2 TL |
507 | } |
508 | }; | |
509 | ||
510 | class cpu_class_data : public class_data { | |
511 | public: | |
512 | cpu_class_data(job_config cfg) : class_data(std::move(cfg)) {} | |
513 | ||
20effc67 | 514 | future<> do_start(sstring dir, directory_entry_type type) override { |
11fdf7f2 TL |
515 | return make_ready_future<>(); |
516 | } | |
517 | ||
20effc67 | 518 | future<size_t> issue_request(char *buf, io_intent* intent) override { |
11fdf7f2 TL |
519 | // We do want the execution time to be a busy loop, and not just a bunch of |
520 | // continuations until our time is up: by doing this we can also simulate the behavior | |
521 | // of I/O continuations in the face of reactor stalls. | |
522 | auto start = std::chrono::steady_clock::now(); | |
523 | do { | |
524 | } while ((std::chrono::steady_clock::now() - start) < _config.shard_info.execution_time); | |
525 | return make_ready_future<size_t>(1); | |
526 | } | |
527 | ||
20effc67 | 528 | virtual void emit_results(YAML::Emitter& out) override { |
11fdf7f2 | 529 | auto throughput = total_data() / total_duration().count(); |
20effc67 | 530 | out << YAML::Key << "throughput" << YAML::Value << throughput; |
11fdf7f2 TL |
531 | } |
532 | }; | |
533 | ||
534 | std::unique_ptr<class_data> job_config::gen_class_data() { | |
535 | if (type == request_type::cpu) { | |
536 | return std::make_unique<cpu_class_data>(*this); | |
537 | } else if ((type == request_type::seqread) || (type == request_type::randread)) { | |
538 | return std::make_unique<read_io_class_data>(*this); | |
539 | } else { | |
540 | return std::make_unique<write_io_class_data>(*this); | |
541 | } | |
542 | } | |
543 | ||
544 | /// YAML parsing functions | |
545 | namespace YAML { | |
546 | template<> | |
547 | struct convert<byte_size> { | |
548 | static bool decode(const Node& node, byte_size& bs) { | |
549 | auto str = node.as<std::string>(); | |
550 | unsigned shift = 0; | |
551 | if (str.back() == 'B') { | |
552 | str.pop_back(); | |
553 | shift = std::unordered_map<char, unsigned>{ | |
554 | { 'k', 10 }, | |
555 | { 'M', 20 }, | |
556 | { 'G', 30 }, | |
557 | }[str.back()]; | |
558 | str.pop_back(); | |
559 | } | |
560 | bs.size = (boost::lexical_cast<size_t>(str) << shift); | |
561 | return bs.size >= 512; | |
562 | } | |
563 | }; | |
564 | ||
565 | template<> | |
566 | struct convert<duration_time> { | |
567 | static bool decode(const Node& node, duration_time& dt) { | |
568 | auto str = node.as<std::string>(); | |
569 | if (str == "0") { | |
570 | dt.time = 0ns; | |
571 | return true; | |
572 | } | |
573 | if (str.back() != 's') { | |
574 | return false; | |
575 | } | |
576 | str.pop_back(); | |
577 | std::unordered_map<char, std::chrono::duration<float>> unit = { | |
578 | { 'n', 1ns }, | |
579 | { 'u', 1us }, | |
580 | { 'm', 1ms }, | |
581 | }; | |
582 | ||
583 | if (unit.count(str.back())) { | |
584 | auto u = str.back(); | |
585 | str.pop_back(); | |
586 | dt.time = (boost::lexical_cast<size_t>(str) * unit[u]); | |
587 | } else { | |
588 | dt.time = (boost::lexical_cast<size_t>(str) * 1s); | |
589 | } | |
590 | return true; | |
591 | } | |
592 | }; | |
593 | ||
594 | template<> | |
595 | struct convert<shard_config> { | |
596 | static bool decode(const Node& node, shard_config& shards) { | |
597 | try { | |
598 | auto str = node.as<std::string>(); | |
599 | return (str == "all"); | |
600 | } catch (YAML::TypedBadConversion<std::string>& e) { | |
601 | shards = shard_config(boost::copy_range<std::unordered_set<unsigned>>(node.as<std::vector<unsigned>>())); | |
602 | return true; | |
603 | } | |
604 | return false; | |
605 | } | |
606 | }; | |
607 | ||
608 | template<> | |
609 | struct convert<request_type> { | |
610 | static bool decode(const Node& node, request_type& rt) { | |
611 | static std::unordered_map<std::string, request_type> mappings = { | |
612 | { "seqread", request_type::seqread }, | |
613 | { "seqwrite", request_type::seqwrite}, | |
614 | { "randread", request_type::randread }, | |
615 | { "randwrite", request_type::randwrite }, | |
616 | { "append", request_type::append}, | |
617 | { "cpu", request_type::cpu}, | |
618 | }; | |
619 | auto reqstr = node.as<std::string>(); | |
620 | if (!mappings.count(reqstr)) { | |
621 | return false; | |
622 | } | |
623 | rt = mappings[reqstr]; | |
624 | return true; | |
625 | } | |
626 | }; | |
627 | ||
628 | template<> | |
629 | struct convert<shard_info> { | |
630 | static bool decode(const Node& node, shard_info& sl) { | |
631 | if (node["parallelism"]) { | |
632 | sl.parallelism = node["parallelism"].as<unsigned>(); | |
633 | } | |
20effc67 TL |
634 | if (node["rps"]) { |
635 | sl.rps = node["rps"].as<unsigned>(); | |
636 | } | |
637 | if ((sl.parallelism == 0) == (sl.rps == 0)) { | |
638 | fmt::print("Must specify exactly one of 'parallelism' or 'rps' parameters\n"); | |
639 | return false; | |
640 | } | |
641 | ||
11fdf7f2 TL |
642 | if (node["shares"]) { |
643 | sl.shares = node["shares"].as<unsigned>(); | |
644 | } | |
645 | if (node["reqsize"]) { | |
646 | sl.request_size = node["reqsize"].as<byte_size>().size; | |
647 | } | |
648 | if (node["think_time"]) { | |
649 | sl.think_time = node["think_time"].as<duration_time>().time; | |
650 | } | |
20effc67 TL |
651 | if (node["think_after"]) { |
652 | sl.think_after = node["think_after"].as<duration_time>().time; | |
653 | } | |
11fdf7f2 TL |
654 | if (node["execution_time"]) { |
655 | sl.execution_time = node["execution_time"].as<duration_time>().time; | |
656 | } | |
657 | return true; | |
658 | } | |
659 | }; | |
660 | ||
f67539c2 TL |
661 | template<> |
662 | struct convert<options> { | |
663 | static bool decode(const Node& node, options& op) { | |
664 | if (node["dsync"]) { | |
665 | op.dsync = node["dsync"].as<bool>(); | |
666 | } | |
667 | return true; | |
668 | } | |
669 | }; | |
670 | ||
11fdf7f2 TL |
671 | template<> |
672 | struct convert<job_config> { | |
673 | static bool decode(const Node& node, job_config& cl) { | |
674 | cl.name = node["name"].as<std::string>(); | |
675 | cl.type = node["type"].as<request_type>(); | |
676 | cl.shard_placement = node["shards"].as<shard_config>(); | |
20effc67 TL |
677 | // The data_size is used to divide the available (and effectively |
678 | // constant) disk space between workloads. Each shard inside the | |
679 | // workload thus uses its portion of the assigned space. | |
680 | if (node["data_size"]) { | |
681 | cl.file_size = node["data_size"].as<byte_size>().size / smp::count; | |
682 | } else { | |
683 | cl.file_size = 1ull << 30; // 1G by default | |
684 | } | |
11fdf7f2 TL |
685 | if (node["shard_info"]) { |
686 | cl.shard_info = node["shard_info"].as<shard_info>(); | |
687 | } | |
f67539c2 TL |
688 | if (node["options"]) { |
689 | cl.options = node["options"].as<options>(); | |
690 | } | |
11fdf7f2 TL |
691 | return true; |
692 | } | |
693 | }; | |
694 | } | |
695 | ||
696 | /// Each shard has one context, and the context is responsible for creating the classes that should | |
697 | /// run in this shard. | |
698 | class context { | |
699 | std::vector<std::unique_ptr<class_data>> _cl; | |
700 | ||
701 | sstring _dir; | |
20effc67 | 702 | directory_entry_type _type; |
11fdf7f2 TL |
703 | std::chrono::seconds _duration; |
704 | ||
705 | semaphore _finished; | |
706 | public: | |
20effc67 | 707 | context(sstring dir, directory_entry_type dtype, std::vector<job_config> req_config, unsigned duration) |
11fdf7f2 | 708 | : _cl(boost::copy_range<std::vector<std::unique_ptr<class_data>>>(req_config |
f67539c2 | 709 | | boost::adaptors::filtered([] (auto& cfg) { return cfg.shard_placement.is_set(this_shard_id()); }) |
11fdf7f2 TL |
710 | | boost::adaptors::transformed([] (auto& cfg) { return cfg.gen_class_data(); }) |
711 | )) | |
712 | , _dir(dir) | |
20effc67 | 713 | , _type(dtype) |
11fdf7f2 TL |
714 | , _duration(duration) |
715 | , _finished(0) | |
716 | {} | |
717 | ||
9f95a23c TL |
718 | future<> stop() { |
719 | return parallel_for_each(_cl, [] (std::unique_ptr<class_data>& cl) { | |
720 | return cl->stop(); | |
721 | }); | |
722 | } | |
11fdf7f2 TL |
723 | |
724 | future<> start() { | |
725 | return parallel_for_each(_cl, [this] (std::unique_ptr<class_data>& cl) { | |
20effc67 | 726 | return cl->start(_dir, _type); |
11fdf7f2 TL |
727 | }); |
728 | } | |
729 | ||
730 | future<> issue_requests() { | |
731 | return parallel_for_each(_cl.begin(), _cl.end(), [this] (std::unique_ptr<class_data>& cl) { | |
732 | return cl->issue_requests(std::chrono::steady_clock::now() + _duration).finally([this] { | |
733 | _finished.signal(1); | |
734 | }); | |
735 | }); | |
736 | } | |
737 | ||
20effc67 TL |
738 | future<> emit_results(YAML::Emitter& out) { |
739 | return _finished.wait(_cl.size()).then([this, &out] { | |
11fdf7f2 | 740 | for (auto& cl: _cl) { |
20effc67 TL |
741 | out << YAML::Key << cl->name(); |
742 | out << YAML::BeginMap; | |
743 | cl->emit_results(out); | |
744 | out << YAML::EndMap; | |
11fdf7f2 TL |
745 | } |
746 | return make_ready_future<>(); | |
747 | }); | |
748 | } | |
749 | }; | |
750 | ||
20effc67 TL |
751 | static void show_results(distributed<context>& ctx) { |
752 | YAML::Emitter out; | |
753 | out << YAML::BeginDoc; | |
754 | out << YAML::BeginSeq; | |
755 | for (unsigned i = 0; i < smp::count; ++i) { | |
756 | out << YAML::BeginMap; | |
757 | out << YAML::Key << "shard" << YAML::Value << i; | |
758 | ctx.invoke_on(i, [&out] (auto& c) { | |
759 | return c.emit_results(out); | |
760 | }).get(); | |
761 | out << YAML::EndMap; | |
762 | } | |
763 | out << YAML::EndSeq; | |
764 | out << YAML::EndDoc; | |
765 | std::cout << out.c_str(); | |
11fdf7f2 TL |
766 | } |
767 | ||
768 | int main(int ac, char** av) { | |
769 | namespace bpo = boost::program_options; | |
770 | ||
771 | app_template app; | |
772 | auto opt_add = app.add_options(); | |
773 | opt_add | |
20effc67 | 774 | ("storage", bpo::value<sstring>()->default_value("."), "directory or block device where to execute the test") |
11fdf7f2 TL |
775 | ("duration", bpo::value<unsigned>()->default_value(10), "for how long (in seconds) to run the test") |
776 | ("conf", bpo::value<sstring>()->default_value("./conf.yaml"), "YAML file containing benchmark specification") | |
20effc67 | 777 | ("keep-files", bpo::value<bool>()->default_value(false), "keep test files, next run may re-use them") |
11fdf7f2 TL |
778 | ; |
779 | ||
780 | distributed<context> ctx; | |
781 | return app.run(ac, av, [&] { | |
782 | return seastar::async([&] { | |
783 | auto& opts = app.configuration(); | |
20effc67 TL |
784 | auto& storage = opts["storage"].as<sstring>(); |
785 | ||
786 | auto st_type = engine().file_type(storage).get0(); | |
787 | ||
788 | if (!st_type) { | |
789 | throw std::runtime_error(format("Unknown storage {}", storage)); | |
790 | } | |
11fdf7f2 | 791 | |
20effc67 TL |
792 | if (*st_type == directory_entry_type::directory) { |
793 | auto fs = file_system_at(storage).get0(); | |
794 | if (fs != fs_type::xfs) { | |
795 | throw std::runtime_error(format("This is a performance test. {} is not on XFS", storage)); | |
796 | } | |
11fdf7f2 TL |
797 | } |
798 | ||
20effc67 | 799 | keep_files = opts["keep-files"].as<bool>(); |
11fdf7f2 TL |
800 | auto& duration = opts["duration"].as<unsigned>(); |
801 | auto& yaml = opts["conf"].as<sstring>(); | |
802 | YAML::Node doc = YAML::LoadFile(yaml); | |
803 | auto reqs = doc.as<std::vector<job_config>>(); | |
804 | ||
805 | parallel_for_each(reqs, [] (auto& r) { | |
806 | return seastar::create_scheduling_group(r.name, r.shard_info.shares).then([&r] (seastar::scheduling_group sg) { | |
807 | r.shard_info.scheduling_group = sg; | |
808 | }); | |
809 | }).get(); | |
810 | ||
20effc67 TL |
811 | if (*st_type == directory_entry_type::block_device) { |
812 | uint64_t off = 0; | |
813 | for (job_config& r : reqs) { | |
814 | r.offset_in_bdev = off; | |
815 | off += r.file_size; | |
816 | } | |
817 | } | |
818 | ||
819 | ctx.start(storage, *st_type, reqs, duration).get0(); | |
11fdf7f2 TL |
820 | engine().at_exit([&ctx] { |
821 | return ctx.stop(); | |
822 | }); | |
823 | std::cout << "Creating initial files..." << std::endl; | |
824 | ctx.invoke_on_all([] (auto& c) { | |
825 | return c.start(); | |
826 | }).get(); | |
827 | std::cout << "Starting evaluation..." << std::endl; | |
828 | ctx.invoke_on_all([] (auto& c) { | |
829 | return c.issue_requests(); | |
830 | }).get(); | |
20effc67 | 831 | show_results(ctx); |
9f95a23c | 832 | ctx.stop().get0(); |
11fdf7f2 TL |
833 | }).or_terminate(); |
834 | }); | |
835 | } |