]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/obj_bencher.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / common / obj_bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18#include "include/compat.h"
19#include <pthread.h>
9f95a23c
TL
20#include "common/ceph_mutex.h"
21#include "common/Clock.h"
7c673cae
FG
22#include "obj_bencher.h"
23
7c673cae
FG
// Name of the metadata object used when the caller supplies no run name.
const std::string BENCH_LASTRUN_METADATA{"benchmark_last_metadata"};
// Common prefix of every object name produced by the benchmark.
const std::string BENCH_PREFIX{"benchmark_data"};
// printf-style template for object names: <prefix>_<hostname>_<pid>_object<n>.
const std::string BENCH_OBJ_NAME{BENCH_PREFIX + "_%s_%d_object%d"};

// Hostname cache; an empty string means it has not been looked up yet.
static char cached_hostname[30] = {};
// Pid embedded in object names; 0 means no pid has been chosen yet.
int cached_pid = 0;
30
31static std::string generate_object_prefix_nopid() {
32 if (cached_hostname[0] == 0) {
33 gethostname(cached_hostname, sizeof(cached_hostname)-1);
34 cached_hostname[sizeof(cached_hostname)-1] = 0;
35 }
36
37 std::ostringstream oss;
38 oss << BENCH_PREFIX << "_" << cached_hostname;
39 return oss.str();
40}
41
42static std::string generate_object_prefix(int pid = 0) {
43 if (pid)
44 cached_pid = pid;
45 else if (!cached_pid)
46 cached_pid = getpid();
47
48 std::ostringstream oss;
49 oss << generate_object_prefix_nopid() << "_" << cached_pid;
50 return oss.str();
51}
52
11fdf7f2
TL
53// this is 8x faster than previous impl based on chained, deduped functions call
54static std::string generate_object_name_fast(int objnum, int pid = 0)
7c673cae 55{
11fdf7f2
TL
56 if (cached_hostname[0] == 0) {
57 gethostname(cached_hostname, sizeof(cached_hostname)-1);
58 cached_hostname[sizeof(cached_hostname)-1] = 0;
59 }
60
61 if (pid)
62 cached_pid = pid;
63 else if (!cached_pid)
64 cached_pid = getpid();
65
66 char name[512];
67 int n = snprintf(&name[0], sizeof(name), BENCH_OBJ_NAME.c_str(), cached_hostname, cached_pid, objnum);
68 ceph_assert(n > 0 && n < (int)sizeof(name));
69 return std::string(&name[0], (size_t)n);
7c673cae
FG
70}
71
72static void sanitize_object_contents (bench_data *data, size_t length) {
92f5a8d4 73 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
74 memset(data->object_contents, 'z', length);
75}
76
77ostream& ObjBencher::out(ostream& os, utime_t& t)
78{
79 if (show_time)
80 return t.localtime(os) << " ";
81 else
82 return os;
83}
84
85ostream& ObjBencher::out(ostream& os)
86{
87 utime_t cur_time = ceph_clock_now();
88 return out(os, cur_time);
89}
90
91void *ObjBencher::status_printer(void *_bencher) {
92 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
93 bench_data& data = bencher->data;
94 Formatter *formatter = bencher->formatter;
95 ostream *outstream = bencher->outstream;
9f95a23c 96 ceph::condition_variable cond;
7c673cae
FG
97 int i = 0;
98 int previous_writes = 0;
99 int cycleSinceChange = 0;
100 double bandwidth;
11fdf7f2
TL
101 int iops = 0;
102 mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
9f95a23c 103 std::unique_lock locker{bencher->lock};
7c673cae
FG
104 if (formatter)
105 formatter->open_array_section("datas");
106 while(!data.done) {
11fdf7f2
TL
107 mono_time cur_time = mono_clock::now();
108 utime_t t = ceph_clock_now();
7c673cae
FG
109
110 if (i % 20 == 0 && !formatter) {
111 if (i > 0)
11fdf7f2
TL
112 t.localtime(cout)
113 << " min lat: " << data.min_latency
7c673cae
FG
114 << " max lat: " << data.max_latency
115 << " avg lat: " << data.avg_latency << std::endl;
116 //I'm naughty and don't reset the fill
11fdf7f2 117 bencher->out(cout, t) << setfill(' ')
7c673cae
FG
118 << setw(5) << "sec"
119 << setw(8) << "Cur ops"
120 << setw(10) << "started"
121 << setw(10) << "finished"
122 << setw(10) << "avg MB/s"
123 << setw(10) << "cur MB/s"
124 << setw(12) << "last lat(s)"
125 << setw(12) << "avg lat(s)" << std::endl;
126 }
127 if (cycleSinceChange)
128 bandwidth = (double)(data.finished - previous_writes)
129 * (data.op_size)
130 / (1024*1024)
131 / cycleSinceChange;
132 else
133 bandwidth = -1;
134
135 if (!std::isnan(bandwidth) && bandwidth > -1) {
136 if (bandwidth > data.idata.max_bandwidth)
137 data.idata.max_bandwidth = bandwidth;
138 if (bandwidth < data.idata.min_bandwidth)
139 data.idata.min_bandwidth = bandwidth;
140
11fdf7f2
TL
141 ++data.idata.bandwidth_cycles;
142 double delta = bandwidth - data.idata.avg_bandwidth;
143 data.idata.avg_bandwidth += delta / data.idata.bandwidth_cycles;
144 data.idata.bandwidth_diff_sum += delta * (bandwidth - data.idata.avg_bandwidth);
7c673cae
FG
145 }
146
147 if (cycleSinceChange)
148 iops = (double)(data.finished - previous_writes)
149 / cycleSinceChange;
150 else
151 iops = -1;
152
153 if (!std::isnan(iops) && iops > -1) {
154 if (iops > data.idata.max_iops)
155 data.idata.max_iops = iops;
156 if (iops < data.idata.min_iops)
157 data.idata.min_iops = iops;
158
11fdf7f2
TL
159 ++data.idata.iops_cycles;
160 double delta = iops - data.idata.avg_iops;
161 data.idata.avg_iops += delta / data.idata.iops_cycles;
162 data.idata.iops_diff_sum += delta * (iops - data.idata.avg_iops);
7c673cae
FG
163 }
164
165 if (formatter)
166 formatter->open_object_section("data");
167
11fdf7f2
TL
168 // elapsed will be in seconds, by default
169 std::chrono::duration<double> elapsed = cur_time - data.start_time;
7c673cae 170 double avg_bandwidth = (double) (data.op_size) * (data.finished)
11fdf7f2 171 / elapsed.count() / (1024*1024);
7c673cae
FG
172 if (previous_writes != data.finished) {
173 previous_writes = data.finished;
174 cycleSinceChange = 0;
175 if (!formatter) {
11fdf7f2 176 bencher->out(cout, t)
7c673cae
FG
177 << setfill(' ')
178 << setw(5) << i
179 << ' ' << setw(7) << data.in_flight
180 << ' ' << setw(9) << data.started
181 << ' ' << setw(9) << data.finished
182 << ' ' << setw(9) << avg_bandwidth
183 << ' ' << setw(9) << bandwidth
11fdf7f2 184 << ' ' << setw(11) << (double)data.cur_latency.count()
7c673cae
FG
185 << ' ' << setw(11) << data.avg_latency << std::endl;
186 } else {
187 formatter->dump_format("sec", "%d", i);
188 formatter->dump_format("cur_ops", "%d", data.in_flight);
189 formatter->dump_format("started", "%d", data.started);
190 formatter->dump_format("finished", "%d", data.finished);
191 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
192 formatter->dump_format("cur_bw", "%f", bandwidth);
11fdf7f2 193 formatter->dump_format("last_lat", "%f", (double)data.cur_latency.count());
7c673cae
FG
194 formatter->dump_format("avg_lat", "%f", data.avg_latency);
195 }
196 }
197 else {
198 if (!formatter) {
11fdf7f2 199 bencher->out(cout, t)
7c673cae
FG
200 << setfill(' ')
201 << setw(5) << i
202 << ' ' << setw(7) << data.in_flight
203 << ' ' << setw(9) << data.started
204 << ' ' << setw(9) << data.finished
205 << ' ' << setw(9) << avg_bandwidth
206 << ' ' << setw(9) << '0'
207 << ' ' << setw(11) << '-'
208 << ' '<< setw(11) << data.avg_latency << std::endl;
209 } else {
210 formatter->dump_format("sec", "%d", i);
211 formatter->dump_format("cur_ops", "%d", data.in_flight);
212 formatter->dump_format("started", "%d", data.started);
213 formatter->dump_format("finished", "%d", data.finished);
214 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
215 formatter->dump_format("cur_bw", "%f", 0);
216 formatter->dump_format("last_lat", "%f", 0);
217 formatter->dump_format("avg_lat", "%f", data.avg_latency);
218 }
219 }
220 if (formatter) {
221 formatter->close_section(); // data
222 formatter->flush(*outstream);
223 }
224 ++i;
225 ++cycleSinceChange;
9f95a23c 226 cond.wait_for(locker, ONE_SECOND);
7c673cae
FG
227 }
228 if (formatter)
229 formatter->close_section(); //datas
11fdf7f2
TL
230 if (iops < 0) {
231 std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
232 data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
233 }
7c673cae
FG
234 return NULL;
235}
236
237int ObjBencher::aio_bench(
238 int operation, int secondsToRun,
239 int concurrentios,
240 uint64_t op_size, uint64_t object_size,
241 unsigned max_objects,
242 bool cleanup, bool hints,
11fdf7f2 243 const std::string& run_name, bool reuse_bench, bool no_verify) {
7c673cae
FG
244
245 if (concurrentios <= 0)
246 return -EINVAL;
247
9f95a23c 248 int num_ops = 0;
7c673cae
FG
249 int num_objects = 0;
250 int r = 0;
11fdf7f2
TL
251 int prev_pid = 0;
252 std::chrono::duration<double> timePassed;
7c673cae
FG
253
254 // default metadata object is used if user does not specify one
255 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
256
257 //get data from previous write run, if available
11fdf7f2 258 if (operation != OP_WRITE || reuse_bench) {
7c673cae
FG
259 uint64_t prev_op_size, prev_object_size;
260 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
9f95a23c 261 &num_ops, &num_objects, &prev_pid);
7c673cae 262 if (r < 0) {
11fdf7f2
TL
263 if (r == -ENOENT) {
264 if (reuse_bench)
265 cerr << "Must write data before using reuse_bench for a write benchmark!" << std::endl;
266 else
267 cerr << "Must write data before running a read benchmark!" << std::endl;
268 }
7c673cae
FG
269 return r;
270 }
271 object_size = prev_object_size;
272 op_size = prev_op_size;
273 }
274
275 char* contentsChars = new char[op_size];
11fdf7f2 276 lock.lock();
7c673cae
FG
277 data.done = false;
278 data.hints = hints;
279 data.object_size = object_size;
280 data.op_size = op_size;
281 data.in_flight = 0;
282 data.started = 0;
283 data.finished = 0;
284 data.min_latency = 9999.0; // this better be higher than initial latency!
285 data.max_latency = 0;
286 data.avg_latency = 0;
11fdf7f2 287 data.latency_diff_sum = 0;
7c673cae 288 data.object_contents = contentsChars;
11fdf7f2 289 lock.unlock();
7c673cae
FG
290
291 //fill in contentsChars deterministically so we can check returns
292 sanitize_object_contents(&data, data.op_size);
293
294 if (formatter)
295 formatter->open_object_section("bench");
296
297 if (OP_WRITE == operation) {
11fdf7f2 298 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects, prev_pid);
7c673cae
FG
299 if (r != 0) goto out;
300 }
301 else if (OP_SEQ_READ == operation) {
9f95a23c 302 r = seq_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
303 if (r != 0) goto out;
304 }
305 else if (OP_RAND_READ == operation) {
9f95a23c 306 r = rand_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
307 if (r != 0) goto out;
308 }
309
310 if (OP_WRITE == operation && cleanup) {
311 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
9f95a23c 312 &num_ops, &num_objects, &prev_pid);
7c673cae
FG
313 if (r < 0) {
314 if (r == -ENOENT)
315 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
316 goto out;
317 }
318
11fdf7f2 319 data.start_time = mono_clock::now();
7c673cae
FG
320 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
321
11fdf7f2 322 r = clean_up(num_objects, prev_pid, concurrentios);
7c673cae
FG
323 if (r != 0) goto out;
324
11fdf7f2
TL
325 timePassed = mono_clock::now() - data.start_time;
326 out(cout) << "Clean up completed and total clean up time :" << timePassed.count() << std::endl;
7c673cae
FG
327
328 // lastrun file
329 r = sync_remove(run_name_meta);
330 if (r != 0) goto out;
331 }
332
333 out:
334 if (formatter) {
335 formatter->close_section(); // bench
336 formatter->flush(*outstream);
337 *outstream << std::endl;
338 }
339 delete[] contentsChars;
340 return r;
341}
342
343struct lock_cond {
9f95a23c
TL
344 explicit lock_cond(ceph::mutex *_lock) : lock(_lock) {}
345 ceph::mutex *lock;
346 ceph::condition_variable cond;
7c673cae
FG
347};
348
349void _aio_cb(void *cb, void *arg) {
350 struct lock_cond *lc = (struct lock_cond *)arg;
11fdf7f2 351 lc->lock->lock();
9f95a23c 352 lc->cond.notify_all();
11fdf7f2 353 lc->lock->unlock();
7c673cae
FG
354}
355
356int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
357 uint64_t *op_size, uint64_t* object_size,
9f95a23c 358 int* num_ops, int* num_objects, int* prevPid) {
7c673cae
FG
359 int r = 0;
360 bufferlist object_data;
361
362 r = sync_read(metadata_file, object_data,
363 sizeof(int) * 2 + sizeof(size_t) * 2);
364 if (r <= 0) {
365 // treat an empty file as a file that does not exist
366 if (r == 0) {
367 r = -ENOENT;
368 }
369 return r;
370 }
11fdf7f2
TL
371 auto p = object_data.cbegin();
372 decode(*object_size, p);
9f95a23c 373 decode(*num_ops, p);
11fdf7f2 374 decode(*prevPid, p);
7c673cae 375 if (!p.end()) {
11fdf7f2 376 decode(*op_size, p);
7c673cae
FG
377 } else {
378 *op_size = *object_size;
379 }
9f95a23c
TL
380 unsigned ops_per_object = 1;
381 // make sure *op_size value is reasonable
382 if (*op_size > 0 && *object_size > *op_size) {
383 ops_per_object = *object_size / *op_size;
384 }
385 *num_objects = (*num_ops + ops_per_object - 1) / ops_per_object;
7c673cae
FG
386
387 return 0;
388}
389
390int ObjBencher::write_bench(int secondsToRun,
391 int concurrentios, const string& run_name_meta,
11fdf7f2 392 unsigned max_objects, int prev_pid) {
9f95a23c 393 if (concurrentios <= 0)
7c673cae 394 return -EINVAL;
9f95a23c 395
7c673cae
FG
396 if (!formatter) {
397 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
398 << data.op_size << " bytes to objects of size "
399 << data.object_size << " for up to "
400 << secondsToRun << " seconds or "
401 << max_objects << " objects"
402 << std::endl;
403 } else {
404 formatter->dump_format("concurrent_ios", "%d", concurrentios);
405 formatter->dump_format("object_size", "%d", data.object_size);
406 formatter->dump_format("op_size", "%d", data.op_size);
407 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
408 formatter->dump_format("max_objects", "%d", max_objects);
409 }
410 bufferlist* newContents = 0;
411
11fdf7f2 412 std::string prefix = prev_pid ? generate_object_prefix(prev_pid) : generate_object_prefix();
7c673cae
FG
413 if (!formatter)
414 out(cout) << "Object prefix: " << prefix << std::endl;
415 else
416 formatter->dump_string("object_prefix", prefix);
417
418 std::vector<string> name(concurrentios);
419 std::string newName;
11fdf7f2 420 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
421 int r = 0;
422 bufferlist b_write;
423 lock_cond lc(&lock);
11fdf7f2
TL
424 double total_latency = 0;
425 std::vector<mono_time> start_times(concurrentios);
426 mono_time stopTime;
427 std::chrono::duration<double> timePassed;
7c673cae
FG
428
429 unsigned writes_per_object = 1;
430 if (data.op_size)
431 writes_per_object = data.object_size / data.op_size;
432
433 r = completions_init(concurrentios);
434
435 //set up writes so I can start them together
436 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2
TL
437 name[i] = generate_object_name_fast(i / writes_per_object);
438 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
439 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
440 contents[i]->append(data.object_contents, data.op_size);
441 }
442
443 pthread_t print_thread;
444
445 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
446 ceph_pthread_setname(print_thread, "write_stat");
9f95a23c 447 std::unique_lock locker{lock};
7c673cae 448 data.finished = 0;
11fdf7f2 449 data.start_time = mono_clock::now();
9f95a23c 450 locker.unlock();
7c673cae 451 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2 452 start_times[i] = mono_clock::now();
7c673cae
FG
453 r = create_completion(i, _aio_cb, (void *)&lc);
454 if (r < 0)
455 goto ERR;
456 r = aio_write(name[i], i, *contents[i], data.op_size,
457 data.op_size * (i % writes_per_object));
11fdf7f2 458 if (r < 0) {
7c673cae
FG
459 goto ERR;
460 }
9f95a23c 461 locker.lock();
7c673cae
FG
462 ++data.started;
463 ++data.in_flight;
9f95a23c 464 locker.unlock();
7c673cae
FG
465 }
466
467 //keep on adding new writes as old ones complete until we've passed minimum time
468 int slot;
7c673cae
FG
469
470 //don't need locking for reads because other thread doesn't write
471
11fdf7f2 472 stopTime = data.start_time + std::chrono::seconds(secondsToRun);
7c673cae 473 slot = 0;
9f95a23c
TL
474 locker.lock();
475 while (data.finished < data.started) {
7c673cae
FG
476 bool found = false;
477 while (1) {
478 int old_slot = slot;
479 do {
480 if (completion_is_done(slot)) {
481 found = true;
482 break;
483 }
484 slot++;
485 if (slot == concurrentios) {
486 slot = 0;
487 }
488 } while (slot != old_slot);
489 if (found)
490 break;
9f95a23c 491 lc.cond.wait(locker);
7c673cae 492 }
9f95a23c 493 locker.unlock();
7c673cae
FG
494
495 completion_wait(slot);
9f95a23c 496 locker.lock();
7c673cae
FG
497 r = completion_ret(slot);
498 if (r != 0) {
9f95a23c 499 locker.unlock();
7c673cae
FG
500 goto ERR;
501 }
11fdf7f2
TL
502 data.cur_latency = mono_clock::now() - start_times[slot];
503 total_latency += data.cur_latency.count();
504 if( data.cur_latency.count() > data.max_latency)
505 data.max_latency = data.cur_latency.count();
506 if (data.cur_latency.count() < data.min_latency)
507 data.min_latency = data.cur_latency.count();
7c673cae 508 ++data.finished;
11fdf7f2 509 double delta = data.cur_latency.count() - data.avg_latency;
7c673cae 510 data.avg_latency = total_latency / data.finished;
11fdf7f2 511 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
7c673cae 512 --data.in_flight;
9f95a23c 513 locker.unlock();
7c673cae 514 release_completion(slot);
7c673cae 515
9f95a23c
TL
516 if (!secondsToRun || mono_clock::now() >= stopTime) {
517 locker.lock();
518 continue;
519 }
520
521 if (data.op_size && max_objects &&
522 data.started >=
523 (int)((data.object_size * max_objects + data.op_size - 1) /
524 data.op_size)) {
525 locker.lock();
526 continue;
527 }
528
7c673cae 529 //write new stuff to backend
9f95a23c
TL
530
531 //create new contents and name on the heap, and fill them
532 newName = generate_object_name_fast(data.started / writes_per_object);
533 newContents = contents[slot].get();
534 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
535 // we wrote to buffer, going around internal crc cache, so invalidate it now.
536 newContents->invalidate_crc();
537
11fdf7f2 538 start_times[slot] = mono_clock::now();
7c673cae
FG
539 r = create_completion(slot, _aio_cb, &lc);
540 if (r < 0)
541 goto ERR;
542 r = aio_write(newName, slot, *newContents, data.op_size,
543 data.op_size * (data.started % writes_per_object));
11fdf7f2 544 if (r < 0) {
7c673cae
FG
545 goto ERR;
546 }
547 name[slot] = newName;
9f95a23c 548 locker.lock();
7c673cae
FG
549 ++data.started;
550 ++data.in_flight;
7c673cae 551 }
9f95a23c 552 locker.unlock();
7c673cae 553
11fdf7f2 554 timePassed = mono_clock::now() - data.start_time;
9f95a23c 555 locker.lock();
7c673cae 556 data.done = true;
9f95a23c 557 locker.unlock();
7c673cae
FG
558
559 pthread_join(print_thread, NULL);
560
561 double bandwidth;
11fdf7f2
TL
562 bandwidth = ((double)data.finished)*((double)data.op_size) /
563 timePassed.count();
7c673cae
FG
564 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
565
11fdf7f2
TL
566 double bandwidth_stddev;
567 double iops_stddev;
568 double latency_stddev;
569 if (data.idata.bandwidth_cycles > 1) {
570 bandwidth_stddev = std::sqrt(data.idata.bandwidth_diff_sum / (data.idata.bandwidth_cycles - 1));
571 } else {
572 bandwidth_stddev = 0;
573 }
574 if (data.idata.iops_cycles > 1) {
575 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
576 } else {
577 iops_stddev = 0;
578 }
579 if (data.finished > 1) {
580 latency_stddev = std::sqrt(data.latency_diff_sum / (data.finished - 1));
581 } else {
582 latency_stddev = 0;
583 }
584
7c673cae 585 if (!formatter) {
11fdf7f2 586 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
587 << "Total writes made: " << data.finished << std::endl
588 << "Write size: " << data.op_size << std::endl
9f95a23c 589 << "Object size: " << data.object_size << std::endl
7c673cae 590 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2 591 << "Stddev Bandwidth: " << bandwidth_stddev << std::endl
7c673cae
FG
592 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
593 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
11fdf7f2
TL
594 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
595 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
596 << "Max IOPS: " << data.idata.max_iops << std::endl
597 << "Min IOPS: " << data.idata.min_iops << std::endl
598 << "Average Latency(s): " << data.avg_latency << std::endl
11fdf7f2 599 << "Stddev Latency(s): " << latency_stddev << std::endl
7c673cae
FG
600 << "Max latency(s): " << data.max_latency << std::endl
601 << "Min latency(s): " << data.min_latency << std::endl;
602 } else {
11fdf7f2 603 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
604 formatter->dump_format("total_writes_made", "%d", data.finished);
605 formatter->dump_format("write_size", "%d", data.op_size);
606 formatter->dump_format("object_size", "%d", data.object_size);
607 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2 608 formatter->dump_format("stddev_bandwidth", "%f", bandwidth_stddev);
7c673cae
FG
609 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
610 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
11fdf7f2
TL
611 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
612 formatter->dump_format("stddev_iops", "%d", iops_stddev);
7c673cae
FG
613 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
614 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
615 formatter->dump_format("average_latency", "%f", data.avg_latency);
11fdf7f2 616 formatter->dump_format("stddev_latency", "%f", latency_stddev);
28e407b8 617 formatter->dump_format("max_latency", "%f", data.max_latency);
7c673cae
FG
618 formatter->dump_format("min_latency", "%f", data.min_latency);
619 }
620 //write object size/number data for read benchmarks
11fdf7f2 621 encode(data.object_size, b_write);
9f95a23c 622 encode(data.finished, b_write);
11fdf7f2
TL
623 encode(prev_pid ? prev_pid : getpid(), b_write);
624 encode(data.op_size, b_write);
7c673cae
FG
625
626 // persist meta-data for further cleanup or read
627 sync_write(run_name_meta, b_write, sizeof(int)*3);
628
629 completions_done();
7c673cae
FG
630
631 return 0;
632
633 ERR:
9f95a23c 634 locker.lock();
7c673cae 635 data.done = 1;
9f95a23c 636 locker.unlock();
7c673cae 637 pthread_join(print_thread, NULL);
7c673cae
FG
638 return r;
639}
640
9f95a23c
TL
641int ObjBencher::seq_read_bench(
642 int seconds_to_run, int num_ops, int num_objects,
643 int concurrentios, int pid, bool no_verify) {
644
7c673cae
FG
645 lock_cond lc(&lock);
646
9f95a23c 647 if (concurrentios <= 0)
7c673cae
FG
648 return -EINVAL;
649
650 std::vector<string> name(concurrentios);
651 std::string newName;
11fdf7f2 652 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
653 int index[concurrentios];
654 int errors = 0;
7c673cae
FG
655 double total_latency = 0;
656 int r = 0;
11fdf7f2
TL
657 std::vector<mono_time> start_times(concurrentios);
658 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
659 std::chrono::duration<double> timePassed;
7c673cae
FG
660 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
661 //changes will be safe because string length should remain the same
662
11fdf7f2 663 unsigned reads_per_object = 1;
7c673cae 664 if (data.op_size)
11fdf7f2 665 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
666
667 r = completions_init(concurrentios);
668 if (r < 0)
669 return r;
670
671 //set up initial reads
672 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
673 name[i] = generate_object_name_fast(i / reads_per_object, pid);
674 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
675 }
676
9f95a23c 677 std::unique_lock locker{lock};
7c673cae 678 data.finished = 0;
11fdf7f2 679 data.start_time = mono_clock::now();
9f95a23c 680 locker.unlock();
7c673cae
FG
681
682 pthread_t print_thread;
683 pthread_create(&print_thread, NULL, status_printer, (void *)this);
684 ceph_pthread_setname(print_thread, "seq_read_stat");
685
11fdf7f2 686 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
687 //start initial reads
688 for (int i = 0; i < concurrentios; ++i) {
689 index[i] = i;
11fdf7f2 690 start_times[i] = mono_clock::now();
7c673cae 691 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
692 r = aio_read(name[i], i, contents[i].get(), data.op_size,
693 data.op_size * (i % reads_per_object));
694 if (r < 0) {
7c673cae
FG
695 cerr << "r = " << r << std::endl;
696 goto ERR;
697 }
9f95a23c 698 locker.lock();
7c673cae
FG
699 ++data.started;
700 ++data.in_flight;
9f95a23c 701 locker.unlock();
7c673cae
FG
702 }
703
704 //keep on adding new reads as old ones complete
705 int slot;
706 bufferlist *cur_contents;
707
708 slot = 0;
9f95a23c
TL
709 while (data.finished < data.started) {
710 locker.lock();
7c673cae
FG
711 int old_slot = slot;
712 bool found = false;
713 while (1) {
714 do {
715 if (completion_is_done(slot)) {
716 found = true;
717 break;
718 }
719 slot++;
720 if (slot == concurrentios) {
721 slot = 0;
722 }
723 } while (slot != old_slot);
724 if (found) {
725 break;
726 }
9f95a23c 727 lc.cond.wait(locker);
7c673cae
FG
728 }
729
730 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 731 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 732
11fdf7f2 733 cur_contents = contents[slot].get();
7c673cae 734 int current_index = index[slot];
9f95a23c 735
7c673cae
FG
736 // invalidate internal crc cache
737 cur_contents->invalidate_crc();
9f95a23c 738
7c673cae
FG
739 if (!no_verify) {
740 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
9f95a23c 741 if ( (cur_contents->length() != data.op_size) ||
7c673cae
FG
742 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
743 cerr << name[slot] << " is not correct!" << std::endl;
744 ++errors;
745 }
746 }
747
9f95a23c
TL
748 bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) &&
749 num_ops > data.started;
750 if (start_new_read) {
751 newName = generate_object_name_fast(data.started / reads_per_object, pid);
752 index[slot] = data.started;
753 }
754
755 locker.unlock();
7c673cae 756 completion_wait(slot);
9f95a23c 757 locker.lock();
7c673cae
FG
758 r = completion_ret(slot);
759 if (r < 0) {
760 cerr << "read got " << r << std::endl;
9f95a23c 761 locker.unlock();
7c673cae
FG
762 goto ERR;
763 }
11fdf7f2
TL
764 total_latency += data.cur_latency.count();
765 if (data.cur_latency.count() > data.max_latency)
766 data.max_latency = data.cur_latency.count();
767 if (data.cur_latency.count() < data.min_latency)
768 data.min_latency = data.cur_latency.count();
7c673cae
FG
769 ++data.finished;
770 data.avg_latency = total_latency / data.finished;
771 --data.in_flight;
9f95a23c 772 locker.unlock();
7c673cae
FG
773 release_completion(slot);
774
9f95a23c
TL
775 if (!start_new_read)
776 continue;
777
7c673cae 778 //start new read and check data if requested
11fdf7f2 779 start_times[slot] = mono_clock::now();
7c673cae 780 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
781 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
782 data.op_size * (data.started % reads_per_object));
7c673cae
FG
783 if (r < 0) {
784 goto ERR;
785 }
9f95a23c 786 locker.lock();
7c673cae
FG
787 ++data.started;
788 ++data.in_flight;
9f95a23c 789 locker.unlock();
7c673cae
FG
790 name[slot] = newName;
791 }
792
11fdf7f2 793 timePassed = mono_clock::now() - data.start_time;
9f95a23c 794 locker.lock();
7c673cae 795 data.done = true;
9f95a23c 796 locker.unlock();
7c673cae
FG
797
798 pthread_join(print_thread, NULL);
799
800 double bandwidth;
11fdf7f2 801 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 802 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
9f95a23c 803
11fdf7f2
TL
804 double iops_stddev;
805 if (data.idata.iops_cycles > 1) {
806 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
807 } else {
808 iops_stddev = 0;
809 }
7c673cae
FG
810
811 if (!formatter) {
11fdf7f2 812 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
813 << "Total reads made: " << data.finished << std::endl
814 << "Read size: " << data.op_size << std::endl
815 << "Object size: " << data.object_size << std::endl
816 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
817 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
818 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
819 << "Max IOPS: " << data.idata.max_iops << std::endl
820 << "Min IOPS: " << data.idata.min_iops << std::endl
821 << "Average Latency(s): " << data.avg_latency << std::endl
822 << "Max latency(s): " << data.max_latency << std::endl
823 << "Min latency(s): " << data.min_latency << std::endl;
824 } else {
11fdf7f2 825 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
826 formatter->dump_format("total_reads_made", "%d", data.finished);
827 formatter->dump_format("read_size", "%d", data.op_size);
828 formatter->dump_format("object_size", "%d", data.object_size);
829 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
830 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
831 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
832 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
833 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
834 formatter->dump_format("average_latency", "%f", data.avg_latency);
835 formatter->dump_format("max_latency", "%f", data.max_latency);
836 formatter->dump_format("min_latency", "%f", data.min_latency);
837 }
838
839 completions_done();
840
841 return (errors > 0 ? -EIO : 0);
842
843 ERR:
9f95a23c 844 locker.lock();
7c673cae 845 data.done = 1;
9f95a23c 846 locker.unlock();
7c673cae
FG
847 pthread_join(print_thread, NULL);
848 return r;
849}
850
9f95a23c
TL
851int ObjBencher::rand_read_bench(
852 int seconds_to_run, int num_ops, int num_objects,
853 int concurrentios, int pid, bool no_verify) {
854
7c673cae
FG
855 lock_cond lc(&lock);
856
857 if (concurrentios <= 0)
858 return -EINVAL;
859
860 std::vector<string> name(concurrentios);
861 std::string newName;
11fdf7f2 862 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
863 int index[concurrentios];
864 int errors = 0;
7c673cae 865 int r = 0;
11fdf7f2
TL
866 double total_latency = 0;
867 std::vector<mono_time> start_times(concurrentios);
868 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
869 std::chrono::duration<double> timePassed;
7c673cae
FG
870 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
871 //changes will be safe because string length should remain the same
872
11fdf7f2 873 unsigned reads_per_object = 1;
7c673cae 874 if (data.op_size)
11fdf7f2 875 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
876
877 srand (time(NULL));
878
879 r = completions_init(concurrentios);
880 if (r < 0)
881 return r;
882
883 //set up initial reads
884 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
885 name[i] = generate_object_name_fast(i / reads_per_object, pid);
886 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
887 }
888
9f95a23c 889 unique_lock locker{lock};
7c673cae 890 data.finished = 0;
11fdf7f2 891 data.start_time = mono_clock::now();
9f95a23c 892 locker.unlock();
7c673cae
FG
893
894 pthread_t print_thread;
895 pthread_create(&print_thread, NULL, status_printer, (void *)this);
896 ceph_pthread_setname(print_thread, "rand_read_stat");
897
11fdf7f2 898 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
899 //start initial reads
900 for (int i = 0; i < concurrentios; ++i) {
901 index[i] = i;
11fdf7f2 902 start_times[i] = mono_clock::now();
7c673cae 903 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
904 r = aio_read(name[i], i, contents[i].get(), data.op_size,
905 data.op_size * (i % reads_per_object));
906 if (r < 0) {
7c673cae
FG
907 cerr << "r = " << r << std::endl;
908 goto ERR;
909 }
9f95a23c 910 locker.lock();
7c673cae
FG
911 ++data.started;
912 ++data.in_flight;
9f95a23c 913 locker.unlock();
7c673cae
FG
914 }
915
916 //keep on adding new reads as old ones complete
917 int slot;
918 bufferlist *cur_contents;
919 int rand_id;
920
921 slot = 0;
9f95a23c
TL
922 while (data.finished < data.started) {
923 locker.lock();
7c673cae
FG
924 int old_slot = slot;
925 bool found = false;
926 while (1) {
927 do {
928 if (completion_is_done(slot)) {
929 found = true;
930 break;
931 }
932 slot++;
933 if (slot == concurrentios) {
934 slot = 0;
935 }
936 } while (slot != old_slot);
937 if (found) {
938 break;
939 }
9f95a23c 940 lc.cond.wait(locker);
7c673cae
FG
941 }
942
943 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 944 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 945
9f95a23c 946 locker.unlock();
7c673cae
FG
947
948 int current_index = index[slot];
11fdf7f2 949 cur_contents = contents[slot].get();
7c673cae 950 completion_wait(slot);
9f95a23c 951 locker.lock();
7c673cae
FG
952 r = completion_ret(slot);
953 if (r < 0) {
954 cerr << "read got " << r << std::endl;
9f95a23c 955 locker.unlock();
7c673cae
FG
956 goto ERR;
957 }
958
11fdf7f2
TL
959 total_latency += data.cur_latency.count();
960 if (data.cur_latency.count() > data.max_latency)
961 data.max_latency = data.cur_latency.count();
962 if (data.cur_latency.count() < data.min_latency)
963 data.min_latency = data.cur_latency.count();
7c673cae
FG
964 ++data.finished;
965 data.avg_latency = total_latency / data.finished;
966 --data.in_flight;
9f95a23c 967
7c673cae
FG
968 if (!no_verify) {
969 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
9f95a23c 970 if ((cur_contents->length() != data.op_size) ||
7c673cae
FG
971 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
972 cerr << name[slot] << " is not correct!" << std::endl;
973 ++errors;
974 }
9f95a23c
TL
975 }
976
977 locker.unlock();
978 release_completion(slot);
979
980 if (!seconds_to_run || mono_clock::now() >= finish_time)
981 continue;
982
983 //start new read and check data if requested
7c673cae 984
9f95a23c 985 rand_id = rand() % num_ops;
11fdf7f2 986 newName = generate_object_name_fast(rand_id / reads_per_object, pid);
7c673cae 987 index[slot] = rand_id;
7c673cae
FG
988
989 // invalidate internal crc cache
990 cur_contents->invalidate_crc();
991
11fdf7f2 992 start_times[slot] = mono_clock::now();
7c673cae 993 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
994 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
995 data.op_size * (rand_id % reads_per_object));
7c673cae
FG
996 if (r < 0) {
997 goto ERR;
998 }
9f95a23c 999 locker.lock();
7c673cae
FG
1000 ++data.started;
1001 ++data.in_flight;
9f95a23c 1002 locker.unlock();
7c673cae
FG
1003 name[slot] = newName;
1004 }
1005
11fdf7f2 1006 timePassed = mono_clock::now() - data.start_time;
9f95a23c 1007 locker.lock();
7c673cae 1008 data.done = true;
9f95a23c 1009 locker.unlock();
7c673cae
FG
1010
1011 pthread_join(print_thread, NULL);
1012
1013 double bandwidth;
11fdf7f2 1014 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 1015 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
9f95a23c 1016
11fdf7f2
TL
1017 double iops_stddev;
1018 if (data.idata.iops_cycles > 1) {
1019 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
1020 } else {
1021 iops_stddev = 0;
1022 }
7c673cae
FG
1023
1024 if (!formatter) {
11fdf7f2 1025 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
1026 << "Total reads made: " << data.finished << std::endl
1027 << "Read size: " << data.op_size << std::endl
1028 << "Object size: " << data.object_size << std::endl
1029 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
1030 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
1031 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
1032 << "Max IOPS: " << data.idata.max_iops << std::endl
1033 << "Min IOPS: " << data.idata.min_iops << std::endl
1034 << "Average Latency(s): " << data.avg_latency << std::endl
1035 << "Max latency(s): " << data.max_latency << std::endl
1036 << "Min latency(s): " << data.min_latency << std::endl;
1037 } else {
11fdf7f2 1038 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
1039 formatter->dump_format("total_reads_made", "%d", data.finished);
1040 formatter->dump_format("read_size", "%d", data.op_size);
1041 formatter->dump_format("object_size", "%d", data.object_size);
1042 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
1043 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
1044 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
1045 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1046 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1047 formatter->dump_format("average_latency", "%f", data.avg_latency);
1048 formatter->dump_format("max_latency", "%f", data.max_latency);
1049 formatter->dump_format("min_latency", "%f", data.min_latency);
1050 }
1051 completions_done();
1052
1053 return (errors > 0 ? -EIO : 0);
1054
1055 ERR:
9f95a23c 1056 locker.lock();
7c673cae 1057 data.done = 1;
9f95a23c 1058 locker.unlock();
7c673cae
FG
1059 pthread_join(print_thread, NULL);
1060 return r;
1061}
1062
1063int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1064 int r = 0;
1065 uint64_t op_size, object_size;
9f95a23c 1066 int num_ops, num_objects;
7c673cae
FG
1067 int prevPid;
1068
1069 // default meta object if user does not specify one
1070 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1071 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1072
1073 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1074 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1075 return -EINVAL;
1076 }
1077
1078 std::list<Object> unfiltered_objects;
1079 std::set<std::string> meta_namespaces, all_namespaces;
1080
1081 // If caller set all_nspaces this will be searching
1082 // across multiple namespaces.
1083 while (true) {
1084 bool objects_remain = get_objects(&unfiltered_objects, 20);
1085 if (!objects_remain)
1086 break;
1087
1088 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1089 for ( ; i != unfiltered_objects.end(); ++i) {
1090 if (i->first == run_name_meta) {
1091 meta_namespaces.insert(i->second);
1092 }
1093 if (i->first.substr(0, prefix.length()) == prefix) {
1094 all_namespaces.insert(i->second);
1095 }
1096 }
1097 }
1098
1099 std::set<std::string>::const_iterator i = all_namespaces.begin();
1100 for ( ; i != all_namespaces.end(); ++i) {
1101 set_namespace(*i);
1102
1103 // if no metadata file found we should try to do a linear search on the prefix
1104 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1105 int r = clean_up_slow(prefix, concurrentios);
1106 if (r < 0) {
1107 cerr << "clean_up_slow error r= " << r << std::endl;
1108 return r;
1109 }
1110 continue;
1111 }
1112
9f95a23c 1113 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_ops, &num_objects, &prevPid);
7c673cae
FG
1114 if (r < 0) {
1115 return r;
1116 }
1117
1118 r = clean_up(num_objects, prevPid, concurrentios);
1119 if (r != 0) return r;
1120
1121 r = sync_remove(run_name_meta);
1122 if (r != 0) return r;
1123 }
1124
1125 return 0;
1126}
1127
1128int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1129 lock_cond lc(&lock);
9f95a23c
TL
1130
1131 if (concurrentios <= 0)
7c673cae
FG
1132 return -EINVAL;
1133
1134 std::vector<string> name(concurrentios);
1135 std::string newName;
1136 int r = 0;
7c673cae
FG
1137 int slot = 0;
1138
9f95a23c 1139 unique_lock locker{lock};
7c673cae
FG
1140 data.done = false;
1141 data.in_flight = 0;
1142 data.started = 0;
1143 data.finished = 0;
9f95a23c 1144 locker.unlock();
7c673cae
FG
1145
1146 // don't start more completions than files
1147 if (num_objects == 0) {
1148 return 0;
1149 } else if (num_objects < concurrentios) {
1150 concurrentios = num_objects;
1151 }
1152
1153 r = completions_init(concurrentios);
1154 if (r < 0)
1155 return r;
1156
1157 //set up initial removes
1158 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2 1159 name[i] = generate_object_name_fast(i, prevPid);
7c673cae
FG
1160 }
1161
1162 //start initial removes
1163 for (int i = 0; i < concurrentios; ++i) {
1164 create_completion(i, _aio_cb, (void *)&lc);
1165 r = aio_remove(name[i], i);
1166 if (r < 0) { //naughty, doesn't clean up heap
1167 cerr << "r = " << r << std::endl;
1168 goto ERR;
1169 }
9f95a23c 1170 locker.lock();
7c673cae
FG
1171 ++data.started;
1172 ++data.in_flight;
9f95a23c 1173 locker.unlock();
7c673cae
FG
1174 }
1175
1176 //keep on adding new removes as old ones complete
9f95a23c
TL
1177 while (data.finished < data.started) {
1178 locker.lock();
7c673cae
FG
1179 int old_slot = slot;
1180 bool found = false;
1181 while (1) {
1182 do {
1183 if (completion_is_done(slot)) {
1184 found = true;
1185 break;
1186 }
1187 slot++;
1188 if (slot == concurrentios) {
1189 slot = 0;
1190 }
1191 } while (slot != old_slot);
1192 if (found) {
1193 break;
1194 }
9f95a23c 1195 lc.cond.wait(locker);
7c673cae 1196 }
9f95a23c 1197 locker.unlock();
7c673cae 1198 completion_wait(slot);
9f95a23c 1199 locker.lock();
7c673cae
FG
1200 r = completion_ret(slot);
1201 if (r != 0 && r != -ENOENT) { // file does not exist
1202 cerr << "remove got " << r << std::endl;
9f95a23c 1203 locker.unlock();
7c673cae
FG
1204 goto ERR;
1205 }
1206 ++data.finished;
1207 --data.in_flight;
9f95a23c 1208 locker.unlock();
7c673cae
FG
1209 release_completion(slot);
1210
9f95a23c
TL
1211 if (data.started >= num_objects)
1212 continue;
1213
7c673cae 1214 //start new remove and check data if requested
9f95a23c 1215 newName = generate_object_name_fast(data.started, prevPid);
7c673cae
FG
1216 create_completion(slot, _aio_cb, (void *)&lc);
1217 r = aio_remove(newName, slot);
1218 if (r < 0) {
1219 goto ERR;
1220 }
9f95a23c 1221 locker.lock();
7c673cae
FG
1222 ++data.started;
1223 ++data.in_flight;
9f95a23c 1224 locker.unlock();
7c673cae
FG
1225 name[slot] = newName;
1226 }
1227
9f95a23c 1228 locker.lock();
7c673cae 1229 data.done = true;
9f95a23c 1230 locker.unlock();
7c673cae
FG
1231
1232 completions_done();
1233
1234 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1235
1236 return 0;
1237
1238 ERR:
9f95a23c 1239 locker.lock();
7c673cae 1240 data.done = 1;
9f95a23c 1241 locker.unlock();
7c673cae
FG
1242 return r;
1243}
1244
1245/**
1246 * Return objects from the datastore which match a prefix.
1247 *
1248 * Clears the list and populates it with any objects which match the
1249 * prefix. The list is guaranteed to have at least one item when the
1250 * function returns true.
1251 *
1252 * @param prefix the prefix to match against
1253 * @param objects [out] return list of objects
1254 * @returns true if there are any objects in the store which match
1255 * the prefix, false if there are no more
1256 */
1257bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1258 std::list<Object> unfiltered_objects;
1259
1260 objects->clear();
1261
1262 while (objects->empty()) {
1263 bool objects_remain = get_objects(&unfiltered_objects, 20);
1264 if (!objects_remain)
1265 return false;
1266
1267 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1268 for ( ; i != unfiltered_objects.end(); ++i) {
1269 if (i->first.substr(0, prefix.length()) == prefix) {
1270 objects->push_back(*i);
1271 }
1272 }
1273 }
1274
1275 return true;
1276}
1277
1278int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1279 lock_cond lc(&lock);
1280
1281 if (concurrentios <= 0)
1282 return -EINVAL;
1283
1284 std::vector<Object> name(concurrentios);
1285 Object newName;
1286 int r = 0;
7c673cae
FG
1287 int slot = 0;
1288 std::list<Object> objects;
1289 bool objects_remain = true;
1290
9f95a23c 1291 std::unique_lock locker{lock};
7c673cae
FG
1292 data.done = false;
1293 data.in_flight = 0;
1294 data.started = 0;
1295 data.finished = 0;
9f95a23c 1296 locker.unlock();
7c673cae
FG
1297
1298 out(cout) << "Warning: using slow linear search" << std::endl;
1299
1300 r = completions_init(concurrentios);
1301 if (r < 0)
1302 return r;
1303
1304 //set up initial removes
1305 for (int i = 0; i < concurrentios; ++i) {
1306 if (objects.empty()) {
1307 // if there are fewer objects than concurrent ios, don't generate extras
1308 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1309 if (!objects_found) {
1310 concurrentios = i;
1311 objects_remain = false;
1312 break;
1313 }
1314 }
1315
1316 name[i] = objects.front();
1317 objects.pop_front();
1318 }
1319
1320 //start initial removes
1321 for (int i = 0; i < concurrentios; ++i) {
1322 create_completion(i, _aio_cb, (void *)&lc);
1323 set_namespace(name[i].second);
1324 r = aio_remove(name[i].first, i);
1325 if (r < 0) { //naughty, doesn't clean up heap
1326 cerr << "r = " << r << std::endl;
1327 goto ERR;
1328 }
9f95a23c 1329 locker.lock();
7c673cae
FG
1330 ++data.started;
1331 ++data.in_flight;
9f95a23c 1332 locker.unlock();
7c673cae
FG
1333 }
1334
1335 //keep on adding new removes as old ones complete
1336 while (objects_remain) {
9f95a23c 1337 locker.lock();
7c673cae
FG
1338 int old_slot = slot;
1339 bool found = false;
1340 while (1) {
1341 do {
1342 if (completion_is_done(slot)) {
1343 found = true;
1344 break;
1345 }
1346 slot++;
1347 if (slot == concurrentios) {
1348 slot = 0;
1349 }
1350 } while (slot != old_slot);
1351 if (found) {
1352 break;
1353 }
9f95a23c 1354 lc.cond.wait(locker);
7c673cae 1355 }
9f95a23c 1356 locker.unlock();
7c673cae
FG
1357
1358 // get more objects if necessary
1359 if (objects.empty()) {
1360 objects_remain = more_objects_matching_prefix(prefix, &objects);
1361 // quit if there are no more
1362 if (!objects_remain) {
1363 break;
1364 }
1365 }
1366
1367 // get the next object
1368 newName = objects.front();
1369 objects.pop_front();
1370
1371 completion_wait(slot);
9f95a23c 1372 locker.lock();
7c673cae
FG
1373 r = completion_ret(slot);
1374 if (r != 0 && r != -ENOENT) { // file does not exist
1375 cerr << "remove got " << r << std::endl;
9f95a23c 1376 locker.unlock();
7c673cae
FG
1377 goto ERR;
1378 }
1379 ++data.finished;
1380 --data.in_flight;
9f95a23c 1381 locker.unlock();
7c673cae
FG
1382 release_completion(slot);
1383
1384 //start new remove and check data if requested
1385 create_completion(slot, _aio_cb, (void *)&lc);
1386 set_namespace(newName.second);
1387 r = aio_remove(newName.first, slot);
1388 if (r < 0) {
1389 goto ERR;
1390 }
9f95a23c 1391 locker.lock();
7c673cae
FG
1392 ++data.started;
1393 ++data.in_flight;
9f95a23c 1394 locker.unlock();
7c673cae
FG
1395 name[slot] = newName;
1396 }
1397
1398 //wait for final removes to complete
1399 while (data.finished < data.started) {
1400 slot = data.finished % concurrentios;
1401 completion_wait(slot);
9f95a23c 1402 locker.lock();
7c673cae
FG
1403 r = completion_ret(slot);
1404 if (r != 0 && r != -ENOENT) { // file does not exist
1405 cerr << "remove got " << r << std::endl;
9f95a23c 1406 locker.unlock();
7c673cae
FG
1407 goto ERR;
1408 }
1409 ++data.finished;
1410 --data.in_flight;
1411 release_completion(slot);
9f95a23c 1412 locker.unlock();
7c673cae
FG
1413 }
1414
9f95a23c 1415 locker.lock();
7c673cae 1416 data.done = true;
9f95a23c 1417 locker.unlock();
7c673cae
FG
1418
1419 completions_done();
1420
1421 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1422
1423 return 0;
1424
1425 ERR:
9f95a23c 1426 locker.lock();
7c673cae 1427 data.done = 1;
9f95a23c 1428 locker.unlock();
7c673cae
FG
1429 return -EIO;
1430}