#include <cmath>
#include <sys/vfs.h>
#include <sys/sysmacros.h>
-#include <boost/filesystem.hpp>
#include <boost/range/irange.hpp>
#include <boost/program_options.hpp>
#include <boost/iterator/counting_iterator.hpp>
#include <fstream>
#include <wordexp.h>
#include <yaml-cpp/yaml.h>
+#include <fmt/printf.h>
#include <seastar/core/thread.hh>
#include <seastar/core/sstring.hh>
#include <seastar/core/posix.hh>
using namespace seastar;
using namespace std::chrono_literals;
-namespace fs = std::experimental::filesystem;
+namespace fs = seastar::compat::filesystem;
logger iotune_logger("iotune");
using iotune_clock = std::chrono::steady_clock;
static thread_local std::default_random_engine random_generator(std::chrono::duration_cast<std::chrono::nanoseconds>(iotune_clock::now().time_since_epoch()).count());
-template <typename Type>
-Type read_sys_file_as(fs::path sys_file) {
- return boost::lexical_cast<Type>(read_first_line(sys_file));
-}
-
void check_device_properties(fs::path dev_sys_file) {
auto sched_file = dev_sys_file / "queue" / "scheduler";
auto sched_string = read_first_line(sched_file);
}
auto nomerges_file = dev_sys_file / "queue" / "nomerges";
- auto nomerges = read_sys_file_as<unsigned>(nomerges_file);
+ auto nomerges = read_first_line_as<unsigned>(nomerges_file);
if (nomerges != 2u) {
        iotune_logger.warn("nomerges for {} set to {}. It is recommended to set it to 2 before evaluation so that merges are disabled. Results can be skewed otherwise.",
nomerges_file.string(), nomerges);
if (fs::exists(sys_file / "slaves")) {
for (auto& dev : fs::directory_iterator(sys_file / "slaves")) {
is_leaf = false;
- scan_device(read_first_line(dev / "dev"));
+ scan_device(read_first_line(dev.path() / "dev"));
}
}
} else {
check_device_properties(sys_file);
auto queue_dir = sys_file / "queue";
- auto disk_min_io_size = read_sys_file_as<uint64_t>(queue_dir / "minimum_io_size");
+ auto disk_min_io_size = read_first_line_as<uint64_t>(queue_dir / "minimum_io_size");
_min_data_transfer_size = std::max(_min_data_transfer_size, disk_min_io_size);
- _max_iodepth += read_sys_file_as<uint64_t>(queue_dir / "nr_requests");
+ _max_iodepth += read_first_line_as<uint64_t>(queue_dir / "nr_requests");
_disks_per_array++;
}
} catch (std::system_error& se) {
virtual uint64_t get_pos() {
if (_position >= _size_limit) {
- throw invalid_position();
+ // Wrap around if reaching EOF. The write bandwidth is lower,
+ // and we also split the write bandwidth among shards, while we
+ // read only from shard 0, so shard 0's file may not be large
+ // enough to read from.
+ _position = 0;
}
auto pos = _position;
_position += _buffer_size;
uint64_t _bytes = 0;
unsigned _requests = 0;
size_t _buffer_size;
- std::chrono::duration<double> _duration;
std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _start_measuring;
std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_measuring;
std::chrono::time_point<iotune_clock, std::chrono::duration<double>> _end_load;
io_worker(size_t buffer_size, std::chrono::duration<double> duration, std::unique_ptr<request_issuer> reqs, std::unique_ptr<position_generator> pos)
: _buffer_size(buffer_size)
- , _duration(duration)
, _start_measuring(iotune_clock::now() + std::chrono::duration<double>(10ms))
, _end_measuring(_start_measuring + duration)
, _end_load(_end_measuring + 10ms)
auto worker = worker_ptr.get();
auto concurrency = boost::irange<unsigned, unsigned>(0, max_os_concurrency, 1);
- return parallel_for_each(std::move(concurrency), [this, worker] (unsigned idx) {
+ return parallel_for_each(std::move(concurrency), [worker] (unsigned idx) {
auto bufptr = worker->get_buffer();
auto buf = bufptr.get();
- return do_until([worker] { return worker->should_stop(); }, [this, buf, worker, idx] {
+ return do_until([worker] { return worker->should_stop(); }, [buf, worker] {
return worker->issue_request(buf);
- }).finally([this, alive = std::move(bufptr)] {});
+ }).finally([alive = std::move(bufptr)] {});
}).then_wrapped([this, worker = std::move(worker_ptr), update_file_size] (future<> f) {
try {
f.get();
}
future<> stop() {
- return make_ready_future<>();
+ return _file.close();
}
};
}
future<> create_data_file() {
- return _iotune_test_file.invoke_on_all([this] (test_file& tf) {
+ return _iotune_test_file.invoke_on_all([] (test_file& tf) {
return tf.create_data_file();
});
}
string_to_file(conf_file, buf);
}
-void write_property_file(sstring conf_file, struct std::vector<disk_descriptor> disk_descriptors) {
+void write_property_file(sstring conf_file, std::vector<disk_descriptor> disk_descriptors) {
YAML::Emitter out;
out << YAML::BeginMap;
out << YAML::Key << "disks";
mnt_candidate = current;
candidate_id = st.st_dev;
current = current.parent_path();
- } while (!current.empty());
+ } while (mnt_candidate != current);
return mnt_candidate;
}
auto format = configuration["format"].as<sstring>();
auto duration = std::chrono::duration<double>(configuration["duration"].as<unsigned>() * 1s);
- struct std::vector<disk_descriptor> disk_descriptors;
+ std::vector<disk_descriptor> disk_descriptors;
std::unordered_map<sstring, sstring> mountpoint_map;
// We want to evaluate once per mountpoint, but we still want to write in one of the
// directories that we were provided - we may not have permissions to write into the
::iotune_multi_shard_context iotune_tests(test_directory);
iotune_tests.start().get();
- iotune_tests.create_data_file().get();
-
auto stop = defer([&iotune_tests] {
- iotune_tests.stop().get();
+ try {
+ iotune_tests.stop().get();
+ } catch (...) {
+ fmt::print("Error occurred during iotune context shutdown: {}", std::current_exception());
+ abort();
+ }
});
+ iotune_tests.create_data_file().get();
+
fmt::print("Starting Evaluation. This may take a while...\n");
fmt::print("Measuring sequential write bandwidth: ");
std::cout.flush();