]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/test/fio/fio_ceph_objectstore.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / test / fio / fio_ceph_objectstore.cc
index b8c296247ae5af5fc6fff69bce6a3dab7624b1e0..33f900b80fca671c700d241361aae387f5c4cba2 100644 (file)
@@ -11,6 +11,7 @@
 #include <memory>
 #include <system_error>
 #include <vector>
+#include <fstream>
 
 #include "os/ObjectStore.h"
 #include "global/global_init.h"
@@ -18,7 +19,9 @@
 #include "include/intarith.h"
 #include "include/stringify.h"
 #include "include/random.h"
+#include "include/str_list.h"
 #include "common/perf_counters.h"
+#include "common/TracepointProvider.h"
 
 #include <fio.h>
 #include <optgroup.h>
@@ -35,7 +38,11 @@ namespace {
 struct Options {
   thread_data* td;
   char* conf;
+  char* perf_output_file;
+  char* throttle_values;
+  char* deferred_throttle_values;
   unsigned long long
+    cycle_throttle_period,
     oi_attr_len_low,
     oi_attr_len_high,
     snapset_attr_len_low,
@@ -49,6 +56,7 @@ struct Options {
   unsigned simulate_pglog;
   unsigned single_pool_mode;
   unsigned preallocate_files;
+  unsigned check_files;
 };
 
 template <class Func> // void Func(fio_option&)
@@ -70,6 +78,14 @@ static std::vector<fio_option> ceph_options{
     o.help   = "Path to a ceph configuration file";
     o.off1   = offsetof(Options, conf);
   }),
+  make_option([] (fio_option& o) {
+    o.name   = "perf_output_file";
+    o.lname  = "perf output target";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "Path to which to write json formatted perf output";
+    o.off1   = offsetof(Options, perf_output_file);
+    o.def    = 0;
+  }),
   make_option([] (fio_option& o) {
     o.name   = "oi_attr_len";
     o.lname  = "OI Attr length";
@@ -144,6 +160,39 @@ static std::vector<fio_option> ceph_options{
     o.off1   = offsetof(Options, preallocate_files);
     o.def    = "1";
   }),
+  make_option([] (fio_option& o) {
+    o.name   = "check_files";
+    o.lname  = "ensure files exist and are correct on init";
+    o.type   = FIO_OPT_BOOL;
+    o.help   = "Enables/disables checking of files on init";
+    o.off1   = offsetof(Options, check_files);
+    o.def    = "0";
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "bluestore_throttle";
+    o.lname  = "set bluestore throttle";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "comma delimited list of throttle values",
+    o.off1   = offsetof(Options, throttle_values);
+    o.def    = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "bluestore_deferred_throttle";
+    o.lname  = "set bluestore deferred throttle";
+    o.type   = FIO_OPT_STR_STORE;
+    o.help   = "comma delimited list of throttle values",
+    o.off1   = offsetof(Options, deferred_throttle_values);
+    o.def    = 0;
+  }),
+  make_option([] (fio_option& o) {
+    o.name   = "vary_bluestore_throttle_period";
+    o.lname = "period between different throttle values";
+    o.type   = FIO_OPT_STR_VAL;
+    o.help = "set to non-zero value to periodically cycle through throttle options";
+    o.off1   = offsetof(Options, cycle_throttle_period);
+    o.def    = "0";
+    o.minval = 0;
+  }),
   {} // fio expects a 'null'-terminated list
 };
 
@@ -263,6 +312,9 @@ struct Engine {
   int ref_count;
   const bool unlink; //< unlink objects on destruction
 
+  // file to which to output formatted perf information
+  const std::optional<std::string> perf_output_file;
+
   explicit Engine(thread_data* td);
   ~Engine();
 
@@ -281,23 +333,23 @@ struct Engine {
     --ref_count;
     if (!ref_count) {
       ostringstream ostr;
-      Formatter* f = Formatter::create("json-pretty", "json-pretty", "json-pretty");
+      Formatter* f = Formatter::create(
+       "json-pretty", "json-pretty", "json-pretty");
+      f->open_object_section("perf_output");
       cct->get_perfcounters_collection()->dump_formatted(f, false);
-      ostr << "FIO plugin ";
-      f->flush(ostr);
       if (g_conf()->rocksdb_perf) {
+       f->open_object_section("rocksdb_perf");
         os->get_db_statistics(f);
-        ostr << "FIO get_db_statistics ";
-        f->flush(ostr);
+       f->close_section();
       }
-      ostr << "Mempools: ";
-      f->open_object_section("mempools");
       mempool::dump(f);
+      {
+       f->open_object_section("db_histogram");
+       os->generate_db_histogram(f);
+       f->close_section();
+      }
       f->close_section();
-      f->flush(ostr);
       
-      ostr << "Generate db histogram: ";
-      os->generate_db_histogram(f);
       f->flush(ostr);
       delete f;
 
@@ -305,14 +357,33 @@ struct Engine {
        destroy_collections(os, collections);
       }
       os->umount();
-      dout(0) <<  ostr.str() << dendl;
+      dout(0) << "FIO plugin perf dump:" << dendl;
+      dout(0) << ostr.str() << dendl;
+      if (perf_output_file) {
+       try {
+         std::ofstream foutput(*perf_output_file);
+         foutput << ostr.str() << std::endl;
+       } catch (std::exception &e) {
+         std::cerr << "Unable to write formatted output to "
+                   << *perf_output_file
+                   << ", exception: " << e.what()
+                   << std::endl;
+       }
+      }
     }
   }
 };
 
+TracepointProvider::Traits bluestore_tracepoint_traits("libbluestore_tp.so",
+                                                      "bluestore_tracing");
+
 Engine::Engine(thread_data* td)
   : ref_count(0),
-    unlink(td->o.unlink)
+    unlink(td->o.unlink),
+    perf_output_file(
+      static_cast<Options*>(td->eo)->perf_output_file ?
+      std::make_optional(static_cast<Options*>(td->eo)->perf_output_file) :
+      std::nullopt)
 {
   // add the ceph command line arguments
   auto o = static_cast<Options*>(td->eo);
@@ -334,6 +405,8 @@ Engine::Engine(thread_data* td)
                    CINIT_FLAG_NO_DEFAULT_CONFIG_FILE);
   common_init_finish(g_ceph_context);
 
+  TracepointProvider::initialize<bluestore_tracepoint_traits>(g_ceph_context);
+
   // create the ObjectStore
   os.reset(ObjectStore::create(g_ceph_context,
                                g_conf().get_val<std::string>("osd objectstore"),
@@ -397,6 +470,7 @@ struct Object {
 /// or just a client using its own objects from the shared pool
 struct Job {
   Engine* engine; //< shared ptr to the global Engine
+  const unsigned subjob_number; //< subjob num
   std::vector<Collection> collections; //< job's private collections to spread objects over
   std::vector<Object> objects; //< associate an object with each fio_file
   std::vector<io_u*> events; //< completions for fio_ceph_os_event()
@@ -404,6 +478,26 @@ struct Job {
 
   bufferptr one_for_all_data; //< preallocated buffer long enough
                               //< to use for vairious operations
+  std::mutex throttle_lock;
+  const vector<unsigned> throttle_values;
+  const vector<unsigned> deferred_throttle_values;
+  std::chrono::duration<double> cycle_throttle_period;
+  mono_clock::time_point last = ceph::mono_clock::zero();
+  unsigned index = 0;
+
+  static vector<unsigned> parse_throttle_str(const char *p) {
+    vector<unsigned> ret;
+    if (p == nullptr) {
+      return ret;
+    }
+    ceph::for_each_substr(p, ",\"", [&ret] (auto &&s) mutable {
+      if (s.size() > 0) {
+       ret.push_back(std::stoul(std::string(s)));
+      }
+    });
+    return ret;
+  }
+  void check_throttle();
 
   Job(Engine* engine, const thread_data* td);
   ~Job();
@@ -411,8 +505,15 @@ struct Job {
 
 Job::Job(Engine* engine, const thread_data* td)
   : engine(engine),
+    subjob_number(td->subjob_number),
     events(td->o.iodepth),
-    unlink(td->o.unlink)
+    unlink(td->o.unlink),
+    throttle_values(
+      parse_throttle_str(static_cast<Options*>(td->eo)->throttle_values)),
+    deferred_throttle_values(
+      parse_throttle_str(static_cast<Options*>(td->eo)->deferred_throttle_values)),
+    cycle_throttle_period(
+      static_cast<Options*>(td->eo)->cycle_throttle_period)
 {
   engine->ref();
   auto o = static_cast<Options*>(td->eo);
@@ -441,6 +542,7 @@ Job::Job(Engine* engine, const thread_data* td)
 
   // create an object for each file in the job
   objects.reserve(td->o.nr_files);
+  unsigned checked_or_preallocated = 0;
   for (uint32_t i = 0; i < td->o.nr_files; i++) {
     auto f = td->files[i];
     f->real_file_size = file_size;
@@ -460,6 +562,31 @@ Job::Job(Engine* engine, const thread_data* td)
         throw std::system_error(r, std::system_category(), "job init");
       }
     }
+    if (o->check_files) {
+      auto& oid = objects.back().oid;
+      struct stat st;
+      int r = engine->os->stat(coll.ch, oid, &st);
+      if (r || ((unsigned)st.st_size) != file_size) {
+       derr << "Problem checking " << oid << ", r=" << r
+            << ", st.st_size=" << st.st_size
+            << ", file_size=" << file_size
+            << ", nr_files=" << td->o.nr_files << dendl;
+        engine->deref();
+        throw std::system_error(
+         r, std::system_category(), "job init -- cannot check file");
+      }
+    }
+    if (o->check_files || o->preallocate_files) {
+      ++checked_or_preallocated;
+    }
+  }
+  if (o->check_files) {
+    derr << "fio_ceph_objectstore checked " << checked_or_preallocated
+        << " files"<< dendl;
+  }
+  if (o->preallocate_files ){
+    derr << "fio_ceph_objectstore preallocated " << checked_or_preallocated
+        << " files"<< dendl;
   }
 }
 
@@ -482,6 +609,45 @@ Job::~Job()
   engine->deref();
 }
 
+void Job::check_throttle()
+{
+  if (subjob_number != 0)
+    return;
+
+  std::lock_guard<std::mutex> l(throttle_lock);
+  if (throttle_values.empty() && deferred_throttle_values.empty())
+    return;
+
+  if (ceph::mono_clock::is_zero(last) ||
+      ((cycle_throttle_period != cycle_throttle_period.zero()) &&
+       (ceph::mono_clock::now() - last) > cycle_throttle_period)) {
+    unsigned tvals = throttle_values.size() ? throttle_values.size() : 1;
+    unsigned dtvals = deferred_throttle_values.size() ? deferred_throttle_values.size() : 1;
+    if (!throttle_values.empty()) {
+      std::string val = std::to_string(throttle_values[index % tvals]);
+      std::cerr << "Setting bluestore_throttle_bytes to " << val << std::endl;
+      int r = engine->cct->_conf.set_val(
+       "bluestore_throttle_bytes",
+       val,
+       nullptr);
+      ceph_assert(r == 0);
+    }
+    if (!deferred_throttle_values.empty()) {
+      std::string val = std::to_string(deferred_throttle_values[(index / tvals) % dtvals]);
+      std::cerr << "Setting bluestore_deferred_throttle_bytes to " << val << std::endl;
+      int r = engine->cct->_conf.set_val(
+       "bluestore_throttle_deferred_bytes",
+       val,
+       nullptr);
+      ceph_assert(r == 0);
+    }
+    engine->cct->_conf.apply_changes(nullptr);
+    index++;
+    index %= tvals * dtvals;
+    last = ceph::mono_clock::now();
+  }
+}
+
 int fio_ceph_os_setup(thread_data* td)
 {
   // if there are multiple jobs, they must run in the same process against a
@@ -567,6 +733,8 @@ enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
   auto& coll = object.coll;
   auto& os = job->engine->os;
 
+  job->check_throttle();
+
   if (u->ddir == DDIR_WRITE) {
     // provide a hint if we're likely to read this data back
     const int flags = td_rw(td) ? CEPH_OSD_OP_FLAG_FADVISE_WILLNEED : 0;
@@ -699,7 +867,7 @@ enum fio_q_status fio_ceph_os_queue(thread_data* td, io_u* u)
       u->error = r;
       td_verror(td, u->error, "xfer");
     } else {
-      bl.copy(0, bl.length(), static_cast<char*>(u->xfer_buf));
+      bl.begin().copy(bl.length(), static_cast<char*>(u->xfer_buf));
       u->resid = u->xfer_buflen - r;
     }
     return FIO_Q_COMPLETED;