1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2020 Red Hat, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
22 #include <string_view>
24 #include <boost/asio.hpp>
25 #include <boost/system/error_code.hpp>
26 #include <boost/program_options.hpp>
28 #undef FMT_HEADER_ONLY
29 #define FMT_HEADER_ONLY 1
30 #include <fmt/chrono.h>
31 #include <fmt/format.h>
32 #include <fmt/ostream.h>
34 #include <spawn/spawn.hpp>
36 #include "include/neorados/RADOS.hpp"
38 #include "neorados/cls/fifo.h"
42 namespace ba
= boost::asio
;
43 namespace bs
= boost::system
;
44 namespace bpo
= boost::program_options
;
45 namespace cb
= ceph::buffer
;
46 namespace R
= neorados
;
47 namespace RCf
= neorados::cls::fifo
;
48 namespace fifo
= rados::cls::fifo
;
50 namespace sc
= std::chrono
;
53 static constexpr auto PUSH
= 0x01 << 0;
54 static constexpr auto PULL
= 0x01 << 1;
55 static constexpr auto BOTH
= PUSH
| PULL
;
56 static constexpr auto CLEAN
= 0x01 << 2;
57 static constexpr auto METADATA
= 0x01 << 3;
58 static constexpr auto PARTINFO
= 0x01 << 4;
59 static constexpr auto LIST
= 0x01 << 5;
62 std::uint32_t entries
= 0;
63 sc::duration
<double> elapsed
= 0ns
;
65 std::uint64_t ratio() const {
66 return entries
/std::max(elapsed
,
67 sc::duration
<double>(1ns
)).count();
69 benchmark() = default;
70 benchmark(std::uint32_t entries
, sc::duration
<double> elapsed
)
71 : entries(entries
), elapsed(elapsed
) {}
74 benchmark
push(RCf::FIFO
& f
, const std::uint32_t count
,
75 const std::uint32_t entry_size
, const std::uint32_t push_entries
,
79 entry
.push_back(cb::create_small_page_aligned(entry_size
));
82 std::vector
entries(std::min(count
, push_entries
), entry
);
83 auto remaining
= count
;
84 auto start
= sc::steady_clock::now();
86 if (entries
.size() > remaining
) {
87 entries
.resize(remaining
);
90 remaining
-= entries
.size();
92 auto finish
= sc::steady_clock::now();
93 return benchmark(count
, (finish
- start
));
96 benchmark
pull(RCf::FIFO
& f
, const std::uint32_t count
,
97 const std::uint32_t pull_entries
, s::yield_context y
)
99 auto remaining
= count
;
100 std::uint32_t got
= 0;
102 auto start
= sc::steady_clock::now();
104 auto [result
, more
] = f
.list(std::min(remaining
, pull_entries
),
108 got
+= result
.size();
109 remaining
-= result
.size();
110 f
.trim(result
.back().marker
, false, y
);
112 auto finish
= sc::steady_clock::now();
113 return benchmark(got
, (finish
- start
));
116 void concurpull(const std::string
& oid
, const std::int64_t pool
,
117 const std::uint32_t count
, const std::uint32_t pull_entries
,
118 std::promise
<benchmark
> notify
, const bool* const exit_early
)
122 std::exception_ptr ex
;
125 [&](s::yield_context y
) {
127 auto r
= R::RADOS::Builder
{}.build(c
, y
);
128 R::IOContext
ioc(pool
);
129 auto f
= RCf::FIFO::open(r
, ioc
, oid
, y
);
130 auto remaining
= count
;
131 std::uint32_t got
= 0;
133 auto start
= sc::steady_clock::now();
135 if (*exit_early
) break;
136 auto [result
, more
] =
137 f
->list(std::min(remaining
, pull_entries
), std::nullopt
, y
);
138 if (result
.empty()) {
139 // We just keep going assuming they'll push more.
142 got
+= result
.size();
143 remaining
-= result
.size();
144 if (*exit_early
) break;
145 f
->trim(result
.back().marker
, false, y
);
147 auto finish
= sc::steady_clock::now();
149 bench
.elapsed
= finish
- start
;
150 } catch (const std::exception
&) {
151 ex
= std::current_exception();
156 notify
.set_exception(std::current_exception());
158 notify
.set_value(bench
);
162 void clean(R::RADOS
& r
, const R::IOContext
& ioc
, RCf::FIFO
& f
,
166 const auto info
= f
.meta();
167 if (info
.head_part_num
> -1) {
168 for (auto i
= info
.tail_part_num
; i
<= info
.head_part_num
; ++i
) {
171 r
.execute(info
.part_oid(i
), ioc
, std::move(op
), y
);
176 r
.execute(info
.id
, ioc
, std::move(op
), y
);
180 int main(int argc
, char* argv
[])
182 const std::string_view
prog(argv
[0]);
185 std::uint32_t count
= 0;
188 std::uint32_t entry_size
= 0;
189 std::uint32_t push_entries
= 0;
190 std::uint32_t pull_entries
= 0;
191 std::uint64_t max_part_size
= 0;
192 std::uint64_t max_entry_size
= 0;
193 std::int64_t part_num
= 0;
196 bpo::options_description
desc(fmt::format("{} options", prog
));
198 ("help", "show help")
199 ("oid", bpo::value
<std::string
>(&oid
)->default_value("fifo"s
),
200 "the base oid for the fifo")
201 ("pool", bpo::value
<std::string
>(&pool
)->default_value("fifo_benchmark"s
),
202 "the base oid for the fifo")
203 ("count", bpo::value
<std::uint32_t>(&count
)->default_value(1024),
204 "total count of items")
205 ("entry-size", bpo::value
<std::uint32_t>(&entry_size
)->default_value(64),
206 "size of entries to push")
208 bpo::value
<std::uint32_t>(&push_entries
)
209 ->default_value(512), "entries to push per call")
210 ("max-part-size", bpo::value
<std::uint64_t>(&max_part_size
)
211 ->default_value(RCf::default_max_part_size
),
212 "maximum entry size allowed by FIFO")
213 ("max-entry-size", bpo::value
<std::uint64_t>(&max_entry_size
)
214 ->default_value(RCf::default_max_entry_size
),
215 "maximum entry size allowed by FIFO")
217 bpo::value
<uint32_t>(&pull_entries
)
218 ->default_value(512), "entries to pull per call")
220 bpo::value
<int64_t>(&part_num
)
221 ->default_value(-1), "partition number, -1 for head")
222 ("marker", bpo::value
<std::string
>(&marker
), "marker to begin list")
223 ("command", bpo::value
<std::string
>(&command
),
224 "the operation to perform");
226 bpo::positional_options_description p
;
229 bpo::variables_map vm
;
231 bpo::store(bpo::command_line_parser(argc
, argv
).
232 options(desc
).positional(p
).run(), vm
);
236 if (vm
.count("help")) {
237 fmt::print(std::cout
, "{}", desc
);
238 fmt::print(std::cout
, "\n{} commands:\n", prog
);
239 fmt::print(std::cout
, " push\t\t\t push entries into fifo\n");
240 fmt::print(std::cout
, " pull\t\t\t retrieve and trim entries\n");
241 fmt::print(std::cout
, " both\t\t\t both at once, in two threads\n");
242 fmt::print(std::cout
, " metadata\t\t\t print metadata\n");
243 fmt::print(std::cout
, " partinfo\t\t\t print metadata\n");
244 fmt::print(std::cout
, " list\t\t\t list entries\n");
245 fmt::print(std::cout
, " clean\t\t\t clean up\n");
250 if (vm
.find("command") == vm
.end()) {
251 fmt::print(std::cerr
, "{}: a command is required\n", prog
);
256 if (command
== "push"s
) {
258 } else if (command
== "pull"s
) {
260 } else if (command
== "both"s
) {
262 } else if (command
== "clean"s
) {
264 } else if (command
== "metadata"s
) {
266 } else if (command
== "partinfo"s
) {
268 } else if (command
== "list"s
) {
271 fmt::print(std::cerr
, "{}: {} is not a valid command\n",
276 if (!(op
& PULL
) && !vm
["pull-entries"].defaulted()) {
277 fmt::print(std::cerr
, "{}: pull-entries is only meaningful when pulling\n",
283 for (const auto& p
: { "entry-size"s
, "push-entries"s
, "max-part-size"s
,
284 "max-entry-size"s
}) {
285 if (!vm
[p
].defaulted()) {
286 fmt::print(std::cerr
, "{}: {} is only meaningful when pushing\n",
293 if (!(op
& BOTH
) && !(op
& LIST
) && !vm
["count"].defaulted()) {
294 fmt::print(std::cerr
, "{}: count is only meaningful when pulling, pushing, both, or listing\n",
299 if (!(op
& PARTINFO
) && !vm
["part-num"].defaulted()) {
300 fmt::print(std::cerr
, "{}: part-num is only meaningful when getting part info\n",
306 fmt::print(std::cerr
, "{}: count must be nonzero\n", prog
);
310 if ((op
& PULL
) && (pull_entries
== 0)) {
311 fmt::print(std::cerr
,
312 "{}: pull-entries must be nonzero\n", prog
);
316 if (!(op
& LIST
) && vm
.count("marker") > 0) {
317 fmt::print(std::cerr
, "{}: marker is only meaningful when listing\n",
323 if (entry_size
== 0) {
324 fmt::print(std::cerr
, "{}: entry-size must be nonzero\n", prog
);
327 if (push_entries
== 0) {
328 fmt::print(std::cerr
, "{}: push-entries must be nonzero\n", prog
);
331 if (max_entry_size
== 0) {
332 fmt::print(std::cerr
, "{}: max-entry-size must be nonzero\n", prog
);
335 if (max_part_size
== 0) {
336 fmt::print(std::cerr
, "{}: max-part-size must be nonzero\n", prog
);
339 if (entry_size
> max_entry_size
) {
340 fmt::print(std::cerr
,
341 "{}: entry-size may not be greater than max-entry-size\n",
345 if (max_entry_size
>= max_part_size
) {
346 fmt::print(std::cerr
,
347 "{}: max-entry-size may be less than max-part-size\n",
354 benchmark pushmark
, pullmark
;
356 fifo::part_header partinfo
;
358 std::vector
<RCf::list_entry
> entries
;
361 [&](s::yield_context y
) {
362 auto r
= R::RADOS::Builder
{}.build(c
, y
);
365 pid
= r
.lookup_pool(pool
, y
[ec
]);
367 r
.create_pool(pool
, std::nullopt
, y
);
368 pid
= r
.lookup_pool(pool
, y
);
370 const R::IOContext
ioc(pid
);
371 auto f
= RCf::FIFO::create(r
, ioc
, oid
, y
, std::nullopt
,
372 std::nullopt
, false, max_part_size
,
377 pushmark
= push(*f
, count
, entry_size
, push_entries
, y
);
381 pullmark
= pull(*f
, count
, pull_entries
, y
);
390 if (part_num
== -1) {
391 part_num
= meta
.head_part_num
;
393 partinfo
= f
->get_part_info(part_num
, y
);
397 if (vm
.count("marker") == 0) {
398 std::tie(entries
, more
) = f
->list(count
, std::nullopt
, y
);
400 std::tie(entries
, more
) = f
->list(count
, marker
, y
);
405 std::promise
<benchmark
> notify
;
406 bool exit_early
= false;
408 auto notifier
= notify
.get_future();
409 std::thread
t(concurpull
, oid
, pid
, count
, pull_entries
,
410 std::move(notify
), &exit_early
);
413 pushmark
= push(*f
, count
, entry_size
, push_entries
, y
);
414 } catch (const std::exception
&) {
419 pullmark
= notifier
.get();
424 clean(r
, ioc
, *f
, y
);
428 fmt::print("Pushed {} in {} at {}/s\n",
429 pushmark
.entries
, pushmark
.elapsed
, pushmark
.ratio());
432 if (pullmark
.entries
== count
) {
433 fmt::print(std::cout
, "Pulled {} in {} at {}/s\n",
434 pullmark
.entries
, pullmark
.elapsed
, pullmark
.ratio());
436 fmt::print(std::cout
, "Pulled {} (of {} requested), in {} at {}/s\n",
437 pullmark
.entries
, count
, pullmark
.elapsed
, pullmark
.ratio());
441 fmt::print(std::cout
, "Metadata: [{}]\n", meta
);
444 fmt::print(std::cout
, "Info for partition {}: [{}]\n", part_num
, partinfo
);
447 for (const auto& entry
: entries
) {
448 fmt::print(std::cout
, "{}\t{}\n", entry
.marker
, entry
.mtime
);
451 fmt::print(std::cout
, "...");
454 } catch (const std::exception
& e
) {
455 if (command
.empty()) {
456 fmt::print(std::cerr
, "{}: {}\n", prog
, e
.what());
458 fmt::print(std::cerr
, "{}: {}: {}\n", prog
, command
, e
.what());