#include <memory>
#include <system_error>
#include <vector>
+#include <fstream>
#include "os/ObjectStore.h"
#include "global/global_init.h"
#include "include/intarith.h"
#include "include/stringify.h"
#include "include/random.h"
+#include "include/str_list.h"
#include "common/perf_counters.h"
+#include "common/TracepointProvider.h"
#include <fio.h>
#include <optgroup.h>
struct Options {
thread_data* td;
char* conf;
+ char* perf_output_file;
+ char* throttle_values;
+ char* deferred_throttle_values;
unsigned long long
+ cycle_throttle_period,
oi_attr_len_low,
oi_attr_len_high,
snapset_attr_len_low,
unsigned simulate_pglog;
unsigned single_pool_mode;
unsigned preallocate_files;
+ unsigned check_files;
};
template <class Func> // void Func(fio_option&)
o.help = "Path to a ceph configuration file";
o.off1 = offsetof(Options, conf);
}),
+ make_option([] (fio_option& o) {
+ o.name = "perf_output_file";
+ o.lname = "perf output target";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "Path to which to write json formatted perf output";
+ o.off1 = offsetof(Options, perf_output_file);
+ o.def = 0;
+ }),
make_option([] (fio_option& o) {
o.name = "oi_attr_len";
o.lname = "OI Attr length";
o.off1 = offsetof(Options, preallocate_files);
o.def = "1";
}),
+ make_option([] (fio_option& o) {
+ o.name = "check_files";
+ o.lname = "ensure files exist and are correct on init";
+ o.type = FIO_OPT_BOOL;
+ o.help = "Enables/disables checking of files on init";
+ o.off1 = offsetof(Options, check_files);
+ o.def = "0";
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_throttle";
+ o.lname = "set bluestore throttle";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "comma delimited list of throttle values",
+ o.off1 = offsetof(Options, throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "bluestore_deferred_throttle";
+ o.lname = "set bluestore deferred throttle";
+ o.type = FIO_OPT_STR_STORE;
+ o.help = "comma delimited list of throttle values",
+ o.off1 = offsetof(Options, deferred_throttle_values);
+ o.def = 0;
+ }),
+ make_option([] (fio_option& o) {
+ o.name = "vary_bluestore_throttle_period";
+ o.lname = "period between different throttle values";
+ o.type = FIO_OPT_STR_VAL;
+ o.help = "set to non-zero value to periodically cycle through throttle options";
+ o.off1 = offsetof(Options, cycle_throttle_period);
+ o.def = "0";
+ o.minval = 0;
+ }),
{} // fio expects a 'null'-terminated list
};
int ref_count;
const bool unlink; //< unlink objects on destruction
+ // file to which to output formatted perf information
+ const std::optional<std::string> perf_output_file;
+
explicit Engine(thread_data* td);
~Engine();
--ref_count;
if (!ref_count) {
ostringstream ostr;
- Formatter* f = Formatter::create("json-pretty", "json-pretty", "json-pretty");
+ Formatter* f = Formatter::create(
+ "json-pretty", "json-pretty", "json-pretty");
+ f->open_object_section("perf_output");
cct->get_perfcounters_collection()->dump_formatted(f, false);
- ostr << "FIO plugin ";
- f->flush(ostr);
if (g_conf()->rocksdb_perf) {
+ f->open_object_section("rocksdb_perf");
os->get_db_statistics(f);
- ostr << "FIO get_db_statistics ";
- f->flush(ostr);
+ f->close_section();
}
- ostr << "Mempools: ";
- f->open_object_section("mempools");
mempool::dump(f);
+ {
+ f->open_object_section("db_histogram");
+ os->generate_db_histogram(f);
+ f->close_section();
+ }
f->close_section();
- f->flush(ostr);
- ostr << "Generate db histogram: ";
- os->generate_db_histogram(f);
f->flush(ostr);
delete f;
destroy_collections(os, collections);
}
os->umount();
- dout(0) << ostr.str() << dendl;
+ dout(0) << "FIO plugin perf dump:" << dendl;
+ dout(0) << ostr.str() << dendl;
+ if (perf_output_file) {
+ try {
+ std::ofstream foutput(*perf_output_file);
+ foutput << ostr.str() << std::endl;
+ } catch (std::exception &e) {
+ std::cerr << "Unable to write formatted output to "
+ << *perf_output_file
+ << ", exception: " << e.what()
+ << std::endl;
+ }
+ }
}
}
};
+TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so",
+ "bluestore_tracing");
+
Engine::Engine(thread_data* td)
: ref_count(0),
- unlink(td->o.unlink)
+ unlink(td->o.unlink),
+ perf_output_file(
+ static_cast<Options*>(td->eo)->perf_output_file ?
+ std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) :
+ std::nullopt)
{
// add the ceph command line arguments
auto o = static_cast<Options*>(td->eo);
CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
common_init_finish(g_ceph_context);
+ TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context);
+
// create the ObjectStore
os.reset(ObjectStore::create(g_ceph_context,
g_conf().get_val<std::string>("osd objectstore"),
/// or just a client using its own objects from the shared pool
struct Job {
Engine* engine; //< shared ptr to the global Engine
+ const unsigned subjob_number; //< subjob num
std::vector<Collection> collections; //< job's private collections to spread objects over
std::vector<Object> objects; //< associate an object with each fio_file
std::vector<io_u*> events; //< completions for fio_ceph_os_event()
bufferptr one_for_all_data; //< preallocated buffer long enough
//< to use for vairious operations
+ std::mutex throttle_lock;
+ const vector<unsigned> throttle_values;
+ const vector<unsigned> deferred_throttle_values;
+ std::chrono::duration<double> cycle_throttle_period;
+ mono_clock::time_point last = ceph::mono_clock::zero();
+ unsigned index = 0;
+
+ static vector<unsigned> parse_throttle_str(const char *p) {
+ vector<unsigned> ret;
+ if (p == nullptr) {
+ return ret;
+ }
+ ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
+ if (s.size() > 0) {
+ ret.push_back(std::stoul(std::string(s)));
+ }
+ });
+ return ret;
+ }
+ void check_throttle();
Job(Engine* engine, const thread_data* td);
~Job();
Job::Job(Engine* engine, const thread_data* td)
: engine(engine),
+ subjob_number(td->subjob_number),
events(td->o.iodepth),
- unlink(td->o.unlink)
+ unlink(td->o.unlink),
+ throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
+ deferred_throttle_values(
+ parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
+ cycle_throttle_period(
+ static_cast<Options*>(td->eo)->cycle_throttle_period)
{
engine->ref();
auto o = static_cast<Options*>(td->eo);
// create an object for each file in the job
objects.reserve(td->o.nr_files);
+ unsigned checked_or_preallocated = 0;
for (uint32_t i = 0; i < td->o.nr_files; i++) {
auto f = td->files[i];
f->real_file_size = file_size;
throw std::system_error(r, std::system_category(), "job init");
}
}
+ if (o->check_files) {
+ auto& oid = objects.back().oid;
+ struct stat st;
+ int r = engine->os->stat(coll.ch, oid, &st);
+ if (r || ((unsigned)st.st_size) != file_size) {
+ derr << "Problem checking " << oid << ", r=" << r
+ << ", st.st_size=" << st.st_size
+ << ", file_size=" << file_size
+ << ", nr_files=" << td->o.nr_files << dendl;
+ engine->deref();
+ throw std::system_error(
+ r, std::system_category(), "job init -- cannot check file");
+ }
+ }
+ if (o->check_files || o->preallocate_files) {
+ ++checked_or_preallocated;
+ }
+ }
+ if (o->check_files) {
+ derr << "fio_ceph_objectstore checked " << checked_or_preallocated
+ << " files"<< dendl;
+ }
+ if (o->preallocate_files ){
+ derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated
+ << " files"<< dendl;
}
}
engine->deref();
}
+void Job::check_throttle()
+{
+ if (subjob_number != 0)
+ return;
+
+ std::lock_guard<std::mutex> l(throttle_lock);
+ if (throttle_values.empty() && deferred_throttle_values.empty())
+ return;
+
+ if (ceph::mono_clock::is_zero(last) ||
+ ((cycle_throttle_period != cycle_throttle_period.zero()) &&
+ (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
+ unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
+ unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
+ if (!throttle_values.empty()) {
+ std::string val = std::to_string(throttle_values[index % tvals]);
+ std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ if (!deferred_throttle_values.empty()) {
+ std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
+ std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
+ int r = engine->cct->_conf.set_val(
+ "bluestore_throttle_deferred_bytes",
+ val,
+ nullptr);
+ ceph_assert(r == 0);
+ }
+ engine->cct->_conf.apply_changes(nullptr);
+ index++;
+ index %= tvals * dtvals;
+ last = ceph::mono_clock::now();
+ }
+}
+
int fio_ceph_os_setup(thread_data* td)
{
// if there are multiple jobs, they must run in the same process against a
auto& coll = object.coll;
auto& os = job->engine->os;
+ job->check_throttle();
+
if (u->ddir == DDIR_WRITE) {
// provide a hint if we're likely to read this data back
const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
u->error = r;
td_verror(td, u->error, "xfer");
} else {
- bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
+ bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf));
u->resid = u->xfer_buflen - r;
}
return FIO_Q_COMPLETED;