]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/cls_fifo/bench_cls_fifo.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / test / cls_fifo / bench_cls_fifo.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2020 Red Hat, Inc.
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15
16#include <cerrno>
17#include <chrono>
18#include <cstdint>
19#include <exception>
20#include <future>
21#include <iostream>
22#include <string_view>
23
24#include <boost/asio.hpp>
25#include <boost/system/error_code.hpp>
26#include <boost/program_options.hpp>
27
28#undef FMT_HEADER_ONLY
29#define FMT_HEADER_ONLY 1
30#include <fmt/chrono.h>
31#include <fmt/format.h>
32#include <fmt/ostream.h>
33
34#include <spawn/spawn.hpp>
35
36#include "include/neorados/RADOS.hpp"
37
38#include "neorados/cls/fifo.h"
39
20effc67
TL
40using namespace std;
41
f67539c2
TL
42namespace ba = boost::asio;
43namespace bs = boost::system;
44namespace bpo = boost::program_options;
45namespace cb = ceph::buffer;
46namespace R = neorados;
47namespace RCf = neorados::cls::fifo;
48namespace fifo = rados::cls::fifo;
49namespace s = spawn;
50namespace sc = std::chrono;
51
52namespace {
53static constexpr auto PUSH = 0x01 << 0;
54static constexpr auto PULL = 0x01 << 1;
55static constexpr auto BOTH = PUSH | PULL;
56static constexpr auto CLEAN = 0x01 << 2;
57static constexpr auto METADATA = 0x01 << 3;
58static constexpr auto PARTINFO = 0x01 << 4;
59static constexpr auto LIST = 0x01 << 5;
60
61struct benchmark {
62 std::uint32_t entries = 0;
63 sc::duration<double> elapsed = 0ns;
64
65 std::uint64_t ratio() const {
66 return entries/std::max(elapsed,
67 sc::duration<double>(1ns)).count();
68 }
69 benchmark() = default;
70 benchmark(std::uint32_t entries, sc::duration<double> elapsed)
71 : entries(entries), elapsed(elapsed) {}
72};
73
74benchmark push(RCf::FIFO& f, const std::uint32_t count,
75 const std::uint32_t entry_size, const std::uint32_t push_entries,
76 s::yield_context y)
77{
78 cb::list entry;
79 entry.push_back(cb::create_small_page_aligned(entry_size));
80 entry.zero();
81
82 std::vector entries(std::min(count, push_entries), entry);
83 auto remaining = count;
84 auto start = sc::steady_clock::now();
85 while (remaining) {
86 if (entries.size() > remaining) {
87 entries.resize(remaining);
88 }
89 f.push(entries, y);
90 remaining -= entries.size();
91 }
92 auto finish = sc::steady_clock::now();
93 return benchmark(count, (finish - start));
94}
95
96benchmark pull(RCf::FIFO& f, const std::uint32_t count,
97 const std::uint32_t pull_entries, s::yield_context y)
98{
99 auto remaining = count;
100 std::uint32_t got = 0;
101
102 auto start = sc::steady_clock::now();
103 while (remaining) {
104 auto [result, more] = f.list(std::min(remaining, pull_entries),
105 std::nullopt, y);
106 if (result.empty())
107 break;
108 got += result.size();
109 remaining -= result.size();
110 f.trim(result.back().marker, false, y);
111 }
112 auto finish = sc::steady_clock::now();
113 return benchmark(got, (finish - start));
114}
115
116void concurpull(const std::string& oid, const std::int64_t pool,
117 const std::uint32_t count, const std::uint32_t pull_entries,
118 std::promise<benchmark> notify, const bool* const exit_early)
119{
120 ba::io_context c;
121 benchmark bench;
122 std::exception_ptr ex;
123 s::spawn(
124 c,
125 [&](s::yield_context y) {
126 try {
127 auto r = R::RADOS::Builder{}.build(c, y);
128 R::IOContext ioc(pool);
129 auto f = RCf::FIFO::open(r, ioc, oid, y);
130 auto remaining = count;
131 std::uint32_t got = 0;
132
133 auto start = sc::steady_clock::now();
134 while (remaining) {
135 if (*exit_early) break;
136 auto [result, more] =
137 f->list(std::min(remaining, pull_entries), std::nullopt, y);
138 if (result.empty()) {
139 // We just keep going assuming they'll push more.
140 continue;
141 }
142 got += result.size();
143 remaining -= result.size();
144 if (*exit_early) break;
145 f->trim(result.back().marker, false, y);
146 }
147 auto finish = sc::steady_clock::now();
148 bench.entries = got;
149 bench.elapsed = finish - start;
150 } catch (const std::exception&) {
151 ex = std::current_exception();
152 }
153 });
154 c.run();
155 if (ex) {
156 notify.set_exception(std::current_exception());
157 } else {
158 notify.set_value(bench);
159 }
160}
161
162void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f,
163 s::yield_context y)
164{
165 f.read_meta(y);
166 const auto info = f.meta();
167 if (info.head_part_num > -1) {
168 for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) {
169 R::WriteOp op;
170 op.remove();
171 r.execute(info.part_oid(i), ioc, std::move(op), y);
172 }
173 }
174 R::WriteOp op;
175 op.remove();
176 r.execute(info.id, ioc, std::move(op), y);
177}
178}
179
180int main(int argc, char* argv[])
181{
182 const std::string_view prog(argv[0]);
183 std::string command;
184 try {
185 std::uint32_t count = 0;
186 std::string oid;
187 std::string pool;
188 std::uint32_t entry_size = 0;
189 std::uint32_t push_entries = 0;
190 std::uint32_t pull_entries = 0;
191 std::uint64_t max_part_size = 0;
192 std::uint64_t max_entry_size = 0;
193 std::int64_t part_num = 0;
194 std::string marker;
195
196 bpo::options_description desc(fmt::format("{} options", prog));
197 desc.add_options()
198 ("help", "show help")
199 ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s),
200 "the base oid for the fifo")
201 ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s),
202 "the base oid for the fifo")
203 ("count", bpo::value<std::uint32_t>(&count)->default_value(1024),
204 "total count of items")
205 ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64),
206 "size of entries to push")
207 ("push-entries",
208 bpo::value<std::uint32_t>(&push_entries)
209 ->default_value(512), "entries to push per call")
210 ("max-part-size", bpo::value<std::uint64_t>(&max_part_size)
211 ->default_value(RCf::default_max_part_size),
212 "maximum entry size allowed by FIFO")
213 ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size)
214 ->default_value(RCf::default_max_entry_size),
215 "maximum entry size allowed by FIFO")
216 ("pull-entries",
217 bpo::value<uint32_t>(&pull_entries)
218 ->default_value(512), "entries to pull per call")
219 ("part-num",
220 bpo::value<int64_t>(&part_num)
221 ->default_value(-1), "partition number, -1 for head")
222 ("marker", bpo::value<std::string>(&marker), "marker to begin list")
223 ("command", bpo::value<std::string>(&command),
224 "the operation to perform");
225
226 bpo::positional_options_description p;
227 p.add("command", 1);
228
229 bpo::variables_map vm;
230
231 bpo::store(bpo::command_line_parser(argc, argv).
232 options(desc).positional(p).run(), vm);
233
234 bpo::notify(vm);
235
236 if (vm.count("help")) {
237 fmt::print(std::cout, "{}", desc);
238 fmt::print(std::cout, "\n{} commands:\n", prog);
239 fmt::print(std::cout, " push\t\t\t push entries into fifo\n");
240 fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n");
241 fmt::print(std::cout, " both\t\t\t both at once, in two threads\n");
242 fmt::print(std::cout, " metadata\t\t\t print metadata\n");
243 fmt::print(std::cout, " partinfo\t\t\t print metadata\n");
244 fmt::print(std::cout, " list\t\t\t list entries\n");
245 fmt::print(std::cout, " clean\t\t\t clean up\n");
246 return 0;
247 }
248
249
250 if (vm.find("command") == vm.end()) {
251 fmt::print(std::cerr, "{}: a command is required\n", prog);
252 return 1;
253 }
254
255 int op = 0;
256 if (command == "push"s) {
257 op = PUSH;
258 } else if (command == "pull"s) {
259 op = PULL;
260 } else if (command == "both"s) {
261 op = BOTH;
262 } else if (command == "clean"s) {
263 op = CLEAN;
264 } else if (command == "metadata"s) {
265 op = METADATA;
266 } else if (command == "partinfo"s) {
267 op = PARTINFO;
268 } else if (command == "list"s) {
269 op = LIST;
270 } else {
271 fmt::print(std::cerr, "{}: {} is not a valid command\n",
272 prog, command);
273 return 1;
274 }
275
276 if (!(op & PULL) && !vm["pull-entries"].defaulted()) {
277 fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n",
278 prog);
279 return 1;
280 }
281
282 if (!(op & PUSH)) {
283 for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s,
284 "max-entry-size"s }) {
285 if (!vm[p].defaulted()) {
286 fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n",
287 prog, p);
288 return 1;
289 }
290 }
291 }
292
293 if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) {
294 fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n",
295 prog);
296 return 1;
297 }
298
299 if (!(op & PARTINFO) && !vm["part-num"].defaulted()) {
300 fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n",
301 prog);
302 return 1;
303 }
304
305 if (count == 0) {
306 fmt::print(std::cerr, "{}: count must be nonzero\n", prog);
307 return 1;
308 }
309
310 if ((op & PULL) && (pull_entries == 0)) {
311 fmt::print(std::cerr,
312 "{}: pull-entries must be nonzero\n", prog);
313 return 1;
314 }
315
316 if (!(op & LIST) && vm.count("marker") > 0) {
317 fmt::print(std::cerr, "{}: marker is only meaningful when listing\n",
318 prog);
319 return 1;
320 }
321
322 if (op & PUSH) {
323 if (entry_size == 0) {
324 fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog);
325 return 1;
326 }
327 if (push_entries== 0) {
328 fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog);
329 return 1;
330 }
331 if (max_entry_size == 0) {
332 fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog);
333 return 1;
334 }
335 if (max_part_size == 0) {
336 fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog);
337 return 1;
338 }
339 if (entry_size > max_entry_size) {
340 fmt::print(std::cerr,
341 "{}: entry-size may not be greater than max-entry-size\n",
342 prog);
343 return 1;
344 }
345 if (max_entry_size >= max_part_size) {
346 fmt::print(std::cerr,
347 "{}: max-entry-size may be less than max-part-size\n",
348 prog);
349 return 1;
350 }
351 }
352
353 ba::io_context c;
354 benchmark pushmark, pullmark;
355 fifo::info meta;
356 fifo::part_header partinfo;
357 bool more = false;
358 std::vector<RCf::list_entry> entries;
359 s::spawn(
360 c,
361 [&](s::yield_context y) {
362 auto r = R::RADOS::Builder{}.build(c, y);
363 bs::error_code ec;
364 std::int64_t pid;
365 pid = r.lookup_pool(pool, y[ec]);
366 if (ec) {
367 r.create_pool(pool, std::nullopt, y);
368 pid = r.lookup_pool(pool, y);
369 }
370 const R::IOContext ioc(pid);
371 auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt,
372 std::nullopt, false, max_part_size,
373 max_entry_size);
374
375 switch (op) {
376 case PUSH:
377 pushmark = push(*f, count, entry_size, push_entries, y);
378 break;
379
380 case PULL:
381 pullmark = pull(*f, count, pull_entries, y);
382 break;
383
384 case METADATA:
385 meta = f->meta();
386 break;
387
388 case PARTINFO:
389 meta = f->meta();
390 if (part_num == -1) {
391 part_num = meta.head_part_num;
392 }
393 partinfo = f->get_part_info(part_num, y);
394 break;
395
396 case LIST:
397 if (vm.count("marker") == 0) {
398 std::tie(entries, more) = f->list(count, std::nullopt, y);
399 } else {
400 std::tie(entries, more) = f->list(count, marker, y);
401 }
402 break;
403
404 case BOTH: {
405 std::promise<benchmark> notify;
406 bool exit_early = false;
407
408 auto notifier = notify.get_future();
409 std::thread t(concurpull, oid, pid, count, pull_entries,
410 std::move(notify), &exit_early);
411 t.detach();
412 try {
413 pushmark = push(*f, count, entry_size, push_entries, y);
414 } catch (const std::exception&) {
415 exit_early = true;
416 notifier.wait();
417 throw;
418 }
419 pullmark = notifier.get();
420 }
421 }
422
423 if (op & CLEAN)
424 clean(r, ioc, *f, y);
425 });
426 c.run();
427 if (op & PUSH) {
428 fmt::print("Pushed {} in {} at {}/s\n",
429 pushmark.entries, pushmark.elapsed, pushmark.ratio());
430 }
431 if (op & PULL) {
432 if (pullmark.entries == count) {
433 fmt::print(std::cout, "Pulled {} in {} at {}/s\n",
434 pullmark.entries, pullmark.elapsed, pullmark.ratio());
435 } else {
436 fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n",
437 pullmark.entries, count, pullmark.elapsed, pullmark.ratio());
438 }
439 }
440 if (op & METADATA) {
441 fmt::print(std::cout, "Metadata: [{}]\n", meta);
442 }
443 if (op & PARTINFO) {
444 fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo);
445 }
446 if (op & LIST) {
447 for (const auto& entry : entries) {
448 fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime);
449 }
450 if (more) {
451 fmt::print(std::cout, "...");
452 }
453 }
454 } catch (const std::exception& e) {
455 if (command.empty()) {
456 fmt::print(std::cerr, "{}: {}\n", prog, e.what());
457 } else {
458 fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what());
459 }
460 return 1;
461 }
462
463 return 0;
464}