]>
Commit | Line | Data |
---|---|---|
f67539c2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2020 Red Hat, Inc. | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | */ | |
14 | ||
15 | ||
16 | #include <cerrno> | |
17 | #include <chrono> | |
18 | #include <cstdint> | |
19 | #include <exception> | |
20 | #include <future> | |
21 | #include <iostream> | |
22 | #include <string_view> | |
23 | ||
24 | #include <boost/asio.hpp> | |
25 | #include <boost/system/error_code.hpp> | |
26 | #include <boost/program_options.hpp> | |
27 | ||
28 | #undef FMT_HEADER_ONLY | |
29 | #define FMT_HEADER_ONLY 1 | |
30 | #include <fmt/chrono.h> | |
31 | #include <fmt/format.h> | |
32 | #include <fmt/ostream.h> | |
33 | ||
34 | #include <spawn/spawn.hpp> | |
35 | ||
36 | #include "include/neorados/RADOS.hpp" | |
37 | ||
38 | #include "neorados/cls/fifo.h" | |
39 | ||
20effc67 TL |
40 | using namespace std; |
41 | ||
f67539c2 TL |
42 | namespace ba = boost::asio; |
43 | namespace bs = boost::system; | |
44 | namespace bpo = boost::program_options; | |
45 | namespace cb = ceph::buffer; | |
46 | namespace R = neorados; | |
47 | namespace RCf = neorados::cls::fifo; | |
48 | namespace fifo = rados::cls::fifo; | |
49 | namespace s = spawn; | |
50 | namespace sc = std::chrono; | |
51 | ||
52 | namespace { | |
53 | static constexpr auto PUSH = 0x01 << 0; | |
54 | static constexpr auto PULL = 0x01 << 1; | |
55 | static constexpr auto BOTH = PUSH | PULL; | |
56 | static constexpr auto CLEAN = 0x01 << 2; | |
57 | static constexpr auto METADATA = 0x01 << 3; | |
58 | static constexpr auto PARTINFO = 0x01 << 4; | |
59 | static constexpr auto LIST = 0x01 << 5; | |
60 | ||
61 | struct benchmark { | |
62 | std::uint32_t entries = 0; | |
63 | sc::duration<double> elapsed = 0ns; | |
64 | ||
65 | std::uint64_t ratio() const { | |
66 | return entries/std::max(elapsed, | |
67 | sc::duration<double>(1ns)).count(); | |
68 | } | |
69 | benchmark() = default; | |
70 | benchmark(std::uint32_t entries, sc::duration<double> elapsed) | |
71 | : entries(entries), elapsed(elapsed) {} | |
72 | }; | |
73 | ||
74 | benchmark push(RCf::FIFO& f, const std::uint32_t count, | |
75 | const std::uint32_t entry_size, const std::uint32_t push_entries, | |
76 | s::yield_context y) | |
77 | { | |
78 | cb::list entry; | |
79 | entry.push_back(cb::create_small_page_aligned(entry_size)); | |
80 | entry.zero(); | |
81 | ||
82 | std::vector entries(std::min(count, push_entries), entry); | |
83 | auto remaining = count; | |
84 | auto start = sc::steady_clock::now(); | |
85 | while (remaining) { | |
86 | if (entries.size() > remaining) { | |
87 | entries.resize(remaining); | |
88 | } | |
89 | f.push(entries, y); | |
90 | remaining -= entries.size(); | |
91 | } | |
92 | auto finish = sc::steady_clock::now(); | |
93 | return benchmark(count, (finish - start)); | |
94 | } | |
95 | ||
96 | benchmark pull(RCf::FIFO& f, const std::uint32_t count, | |
97 | const std::uint32_t pull_entries, s::yield_context y) | |
98 | { | |
99 | auto remaining = count; | |
100 | std::uint32_t got = 0; | |
101 | ||
102 | auto start = sc::steady_clock::now(); | |
103 | while (remaining) { | |
104 | auto [result, more] = f.list(std::min(remaining, pull_entries), | |
105 | std::nullopt, y); | |
106 | if (result.empty()) | |
107 | break; | |
108 | got += result.size(); | |
109 | remaining -= result.size(); | |
110 | f.trim(result.back().marker, false, y); | |
111 | } | |
112 | auto finish = sc::steady_clock::now(); | |
113 | return benchmark(got, (finish - start)); | |
114 | } | |
115 | ||
116 | void concurpull(const std::string& oid, const std::int64_t pool, | |
117 | const std::uint32_t count, const std::uint32_t pull_entries, | |
118 | std::promise<benchmark> notify, const bool* const exit_early) | |
119 | { | |
120 | ba::io_context c; | |
121 | benchmark bench; | |
122 | std::exception_ptr ex; | |
123 | s::spawn( | |
124 | c, | |
125 | [&](s::yield_context y) { | |
126 | try { | |
127 | auto r = R::RADOS::Builder{}.build(c, y); | |
128 | R::IOContext ioc(pool); | |
129 | auto f = RCf::FIFO::open(r, ioc, oid, y); | |
130 | auto remaining = count; | |
131 | std::uint32_t got = 0; | |
132 | ||
133 | auto start = sc::steady_clock::now(); | |
134 | while (remaining) { | |
135 | if (*exit_early) break; | |
136 | auto [result, more] = | |
137 | f->list(std::min(remaining, pull_entries), std::nullopt, y); | |
138 | if (result.empty()) { | |
139 | // We just keep going assuming they'll push more. | |
140 | continue; | |
141 | } | |
142 | got += result.size(); | |
143 | remaining -= result.size(); | |
144 | if (*exit_early) break; | |
145 | f->trim(result.back().marker, false, y); | |
146 | } | |
147 | auto finish = sc::steady_clock::now(); | |
148 | bench.entries = got; | |
149 | bench.elapsed = finish - start; | |
150 | } catch (const std::exception&) { | |
151 | ex = std::current_exception(); | |
152 | } | |
153 | }); | |
154 | c.run(); | |
155 | if (ex) { | |
156 | notify.set_exception(std::current_exception()); | |
157 | } else { | |
158 | notify.set_value(bench); | |
159 | } | |
160 | } | |
161 | ||
162 | void clean(R::RADOS& r, const R::IOContext& ioc, RCf::FIFO& f, | |
163 | s::yield_context y) | |
164 | { | |
165 | f.read_meta(y); | |
166 | const auto info = f.meta(); | |
167 | if (info.head_part_num > -1) { | |
168 | for (auto i = info.tail_part_num; i <= info.head_part_num; ++i) { | |
169 | R::WriteOp op; | |
170 | op.remove(); | |
171 | r.execute(info.part_oid(i), ioc, std::move(op), y); | |
172 | } | |
173 | } | |
174 | R::WriteOp op; | |
175 | op.remove(); | |
176 | r.execute(info.id, ioc, std::move(op), y); | |
177 | } | |
178 | } | |
179 | ||
180 | int main(int argc, char* argv[]) | |
181 | { | |
182 | const std::string_view prog(argv[0]); | |
183 | std::string command; | |
184 | try { | |
185 | std::uint32_t count = 0; | |
186 | std::string oid; | |
187 | std::string pool; | |
188 | std::uint32_t entry_size = 0; | |
189 | std::uint32_t push_entries = 0; | |
190 | std::uint32_t pull_entries = 0; | |
191 | std::uint64_t max_part_size = 0; | |
192 | std::uint64_t max_entry_size = 0; | |
193 | std::int64_t part_num = 0; | |
194 | std::string marker; | |
195 | ||
196 | bpo::options_description desc(fmt::format("{} options", prog)); | |
197 | desc.add_options() | |
198 | ("help", "show help") | |
199 | ("oid", bpo::value<std::string>(&oid)->default_value("fifo"s), | |
200 | "the base oid for the fifo") | |
201 | ("pool", bpo::value<std::string>(&pool)->default_value("fifo_benchmark"s), | |
202 | "the base oid for the fifo") | |
203 | ("count", bpo::value<std::uint32_t>(&count)->default_value(1024), | |
204 | "total count of items") | |
205 | ("entry-size", bpo::value<std::uint32_t>(&entry_size)->default_value(64), | |
206 | "size of entries to push") | |
207 | ("push-entries", | |
208 | bpo::value<std::uint32_t>(&push_entries) | |
209 | ->default_value(512), "entries to push per call") | |
210 | ("max-part-size", bpo::value<std::uint64_t>(&max_part_size) | |
211 | ->default_value(RCf::default_max_part_size), | |
212 | "maximum entry size allowed by FIFO") | |
213 | ("max-entry-size", bpo::value<std::uint64_t>(&max_entry_size) | |
214 | ->default_value(RCf::default_max_entry_size), | |
215 | "maximum entry size allowed by FIFO") | |
216 | ("pull-entries", | |
217 | bpo::value<uint32_t>(&pull_entries) | |
218 | ->default_value(512), "entries to pull per call") | |
219 | ("part-num", | |
220 | bpo::value<int64_t>(&part_num) | |
221 | ->default_value(-1), "partition number, -1 for head") | |
222 | ("marker", bpo::value<std::string>(&marker), "marker to begin list") | |
223 | ("command", bpo::value<std::string>(&command), | |
224 | "the operation to perform"); | |
225 | ||
226 | bpo::positional_options_description p; | |
227 | p.add("command", 1); | |
228 | ||
229 | bpo::variables_map vm; | |
230 | ||
231 | bpo::store(bpo::command_line_parser(argc, argv). | |
232 | options(desc).positional(p).run(), vm); | |
233 | ||
234 | bpo::notify(vm); | |
235 | ||
236 | if (vm.count("help")) { | |
237 | fmt::print(std::cout, "{}", desc); | |
238 | fmt::print(std::cout, "\n{} commands:\n", prog); | |
239 | fmt::print(std::cout, " push\t\t\t push entries into fifo\n"); | |
240 | fmt::print(std::cout, " pull\t\t\t retrieve and trim entries\n"); | |
241 | fmt::print(std::cout, " both\t\t\t both at once, in two threads\n"); | |
242 | fmt::print(std::cout, " metadata\t\t\t print metadata\n"); | |
243 | fmt::print(std::cout, " partinfo\t\t\t print metadata\n"); | |
244 | fmt::print(std::cout, " list\t\t\t list entries\n"); | |
245 | fmt::print(std::cout, " clean\t\t\t clean up\n"); | |
246 | return 0; | |
247 | } | |
248 | ||
249 | ||
250 | if (vm.find("command") == vm.end()) { | |
251 | fmt::print(std::cerr, "{}: a command is required\n", prog); | |
252 | return 1; | |
253 | } | |
254 | ||
255 | int op = 0; | |
256 | if (command == "push"s) { | |
257 | op = PUSH; | |
258 | } else if (command == "pull"s) { | |
259 | op = PULL; | |
260 | } else if (command == "both"s) { | |
261 | op = BOTH; | |
262 | } else if (command == "clean"s) { | |
263 | op = CLEAN; | |
264 | } else if (command == "metadata"s) { | |
265 | op = METADATA; | |
266 | } else if (command == "partinfo"s) { | |
267 | op = PARTINFO; | |
268 | } else if (command == "list"s) { | |
269 | op = LIST; | |
270 | } else { | |
271 | fmt::print(std::cerr, "{}: {} is not a valid command\n", | |
272 | prog, command); | |
273 | return 1; | |
274 | } | |
275 | ||
276 | if (!(op & PULL) && !vm["pull-entries"].defaulted()) { | |
277 | fmt::print(std::cerr, "{}: pull-entries is only meaningful when pulling\n", | |
278 | prog); | |
279 | return 1; | |
280 | } | |
281 | ||
282 | if (!(op & PUSH)) { | |
283 | for (const auto& p : { "entry-size"s, "push-entries"s, "max-part-size"s, | |
284 | "max-entry-size"s }) { | |
285 | if (!vm[p].defaulted()) { | |
286 | fmt::print(std::cerr, "{}: {} is only meaningful when pushing\n", | |
287 | prog, p); | |
288 | return 1; | |
289 | } | |
290 | } | |
291 | } | |
292 | ||
293 | if (!(op & BOTH) && !(op & LIST) && !vm["count"].defaulted()) { | |
294 | fmt::print(std::cerr, "{}: count is only meaningful when pulling, pushing, both, or listing\n", | |
295 | prog); | |
296 | return 1; | |
297 | } | |
298 | ||
299 | if (!(op & PARTINFO) && !vm["part-num"].defaulted()) { | |
300 | fmt::print(std::cerr, "{}: part-num is only meaningful when getting part info\n", | |
301 | prog); | |
302 | return 1; | |
303 | } | |
304 | ||
305 | if (count == 0) { | |
306 | fmt::print(std::cerr, "{}: count must be nonzero\n", prog); | |
307 | return 1; | |
308 | } | |
309 | ||
310 | if ((op & PULL) && (pull_entries == 0)) { | |
311 | fmt::print(std::cerr, | |
312 | "{}: pull-entries must be nonzero\n", prog); | |
313 | return 1; | |
314 | } | |
315 | ||
316 | if (!(op & LIST) && vm.count("marker") > 0) { | |
317 | fmt::print(std::cerr, "{}: marker is only meaningful when listing\n", | |
318 | prog); | |
319 | return 1; | |
320 | } | |
321 | ||
322 | if (op & PUSH) { | |
323 | if (entry_size == 0) { | |
324 | fmt::print(std::cerr, "{}: entry-size must be nonzero\n", prog); | |
325 | return 1; | |
326 | } | |
327 | if (push_entries== 0) { | |
328 | fmt::print(std::cerr, "{}: push-entries must be nonzero\n", prog); | |
329 | return 1; | |
330 | } | |
331 | if (max_entry_size == 0) { | |
332 | fmt::print(std::cerr, "{}: max-entry-size must be nonzero\n", prog); | |
333 | return 1; | |
334 | } | |
335 | if (max_part_size == 0) { | |
336 | fmt::print(std::cerr, "{}: max-part-size must be nonzero\n", prog); | |
337 | return 1; | |
338 | } | |
339 | if (entry_size > max_entry_size) { | |
340 | fmt::print(std::cerr, | |
341 | "{}: entry-size may not be greater than max-entry-size\n", | |
342 | prog); | |
343 | return 1; | |
344 | } | |
345 | if (max_entry_size >= max_part_size) { | |
346 | fmt::print(std::cerr, | |
347 | "{}: max-entry-size may be less than max-part-size\n", | |
348 | prog); | |
349 | return 1; | |
350 | } | |
351 | } | |
352 | ||
353 | ba::io_context c; | |
354 | benchmark pushmark, pullmark; | |
355 | fifo::info meta; | |
356 | fifo::part_header partinfo; | |
357 | bool more = false; | |
358 | std::vector<RCf::list_entry> entries; | |
359 | s::spawn( | |
360 | c, | |
361 | [&](s::yield_context y) { | |
362 | auto r = R::RADOS::Builder{}.build(c, y); | |
363 | bs::error_code ec; | |
364 | std::int64_t pid; | |
365 | pid = r.lookup_pool(pool, y[ec]); | |
366 | if (ec) { | |
367 | r.create_pool(pool, std::nullopt, y); | |
368 | pid = r.lookup_pool(pool, y); | |
369 | } | |
370 | const R::IOContext ioc(pid); | |
371 | auto f = RCf::FIFO::create(r, ioc, oid, y, std::nullopt, | |
372 | std::nullopt, false, max_part_size, | |
373 | max_entry_size); | |
374 | ||
375 | switch (op) { | |
376 | case PUSH: | |
377 | pushmark = push(*f, count, entry_size, push_entries, y); | |
378 | break; | |
379 | ||
380 | case PULL: | |
381 | pullmark = pull(*f, count, pull_entries, y); | |
382 | break; | |
383 | ||
384 | case METADATA: | |
385 | meta = f->meta(); | |
386 | break; | |
387 | ||
388 | case PARTINFO: | |
389 | meta = f->meta(); | |
390 | if (part_num == -1) { | |
391 | part_num = meta.head_part_num; | |
392 | } | |
393 | partinfo = f->get_part_info(part_num, y); | |
394 | break; | |
395 | ||
396 | case LIST: | |
397 | if (vm.count("marker") == 0) { | |
398 | std::tie(entries, more) = f->list(count, std::nullopt, y); | |
399 | } else { | |
400 | std::tie(entries, more) = f->list(count, marker, y); | |
401 | } | |
402 | break; | |
403 | ||
404 | case BOTH: { | |
405 | std::promise<benchmark> notify; | |
406 | bool exit_early = false; | |
407 | ||
408 | auto notifier = notify.get_future(); | |
409 | std::thread t(concurpull, oid, pid, count, pull_entries, | |
410 | std::move(notify), &exit_early); | |
411 | t.detach(); | |
412 | try { | |
413 | pushmark = push(*f, count, entry_size, push_entries, y); | |
414 | } catch (const std::exception&) { | |
415 | exit_early = true; | |
416 | notifier.wait(); | |
417 | throw; | |
418 | } | |
419 | pullmark = notifier.get(); | |
420 | } | |
421 | } | |
422 | ||
423 | if (op & CLEAN) | |
424 | clean(r, ioc, *f, y); | |
425 | }); | |
426 | c.run(); | |
427 | if (op & PUSH) { | |
428 | fmt::print("Pushed {} in {} at {}/s\n", | |
429 | pushmark.entries, pushmark.elapsed, pushmark.ratio()); | |
430 | } | |
431 | if (op & PULL) { | |
432 | if (pullmark.entries == count) { | |
433 | fmt::print(std::cout, "Pulled {} in {} at {}/s\n", | |
434 | pullmark.entries, pullmark.elapsed, pullmark.ratio()); | |
435 | } else { | |
436 | fmt::print(std::cout, "Pulled {} (of {} requested), in {} at {}/s\n", | |
437 | pullmark.entries, count, pullmark.elapsed, pullmark.ratio()); | |
438 | } | |
439 | } | |
440 | if (op & METADATA) { | |
441 | fmt::print(std::cout, "Metadata: [{}]\n", meta); | |
442 | } | |
443 | if (op & PARTINFO) { | |
444 | fmt::print(std::cout, "Info for partition {}: [{}]\n", part_num, partinfo); | |
445 | } | |
446 | if (op & LIST) { | |
447 | for (const auto& entry : entries) { | |
448 | fmt::print(std::cout, "{}\t{}\n", entry.marker, entry.mtime); | |
449 | } | |
450 | if (more) { | |
451 | fmt::print(std::cout, "..."); | |
452 | } | |
453 | } | |
454 | } catch (const std::exception& e) { | |
455 | if (command.empty()) { | |
456 | fmt::print(std::cerr, "{}: {}\n", prog, e.what()); | |
457 | } else { | |
458 | fmt::print(std::cerr, "{}: {}: {}\n", prog, command, e.what()); | |
459 | } | |
460 | return 1; | |
461 | } | |
462 | ||
463 | return 0; | |
464 | } |