]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/obj_bencher.cc
update ceph source to reef 18.2.0
[ceph.git] / ceph / src / common / obj_bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18#include "include/compat.h"
19#include <pthread.h>
9f95a23c
TL
20#include "common/ceph_mutex.h"
21#include "common/Clock.h"
7c673cae
FG
22#include "obj_bencher.h"
23
20effc67
TL
24using std::ostream;
25using std::cerr;
26using std::cout;
27using std::setfill;
28using std::setprecision;
29using std::setw;
30using std::string;
31using std::unique_lock;
32using std::unique_ptr;
33
7c673cae
FG
// Object that stores the parameters of the last write run; used when the
// caller does not supply its own run name.
const std::string BENCH_LASTRUN_METADATA = "benchmark_last_metadata";
// Common prefix shared by every benchmark data object name.
const std::string BENCH_PREFIX = "benchmark_data";
// snprintf pattern for object names: <prefix>_<hostname>_<pid>_object<objnum>
const std::string BENCH_OBJ_NAME = BENCH_PREFIX + "_%s_%d_object%d";

// Lazily populated by the object-name generators; an empty first byte
// (hostname) or 0 (pid) means "not looked up yet".
static char cached_hostname[30] = {};
int cached_pid = 0;
40
41static std::string generate_object_prefix_nopid() {
42 if (cached_hostname[0] == 0) {
43 gethostname(cached_hostname, sizeof(cached_hostname)-1);
44 cached_hostname[sizeof(cached_hostname)-1] = 0;
45 }
46
47 std::ostringstream oss;
48 oss << BENCH_PREFIX << "_" << cached_hostname;
49 return oss.str();
50}
51
52static std::string generate_object_prefix(int pid = 0) {
53 if (pid)
54 cached_pid = pid;
55 else if (!cached_pid)
56 cached_pid = getpid();
57
58 std::ostringstream oss;
59 oss << generate_object_prefix_nopid() << "_" << cached_pid;
60 return oss.str();
61}
62
11fdf7f2
TL
63// this is 8x faster than previous impl based on chained, deduped functions call
64static std::string generate_object_name_fast(int objnum, int pid = 0)
7c673cae 65{
11fdf7f2
TL
66 if (cached_hostname[0] == 0) {
67 gethostname(cached_hostname, sizeof(cached_hostname)-1);
68 cached_hostname[sizeof(cached_hostname)-1] = 0;
69 }
70
71 if (pid)
72 cached_pid = pid;
73 else if (!cached_pid)
74 cached_pid = getpid();
75
76 char name[512];
77 int n = snprintf(&name[0], sizeof(name), BENCH_OBJ_NAME.c_str(), cached_hostname, cached_pid, objnum);
78 ceph_assert(n > 0 && n < (int)sizeof(name));
79 return std::string(&name[0], (size_t)n);
7c673cae
FG
80}
81
82static void sanitize_object_contents (bench_data *data, size_t length) {
92f5a8d4 83 // FIPS zeroization audit 20191115: this memset is not security related.
7c673cae
FG
84 memset(data->object_contents, 'z', length);
85}
86
87ostream& ObjBencher::out(ostream& os, utime_t& t)
88{
89 if (show_time)
90 return t.localtime(os) << " ";
91 else
92 return os;
93}
94
95ostream& ObjBencher::out(ostream& os)
96{
97 utime_t cur_time = ceph_clock_now();
98 return out(os, cur_time);
99}
100
101void *ObjBencher::status_printer(void *_bencher) {
102 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
103 bench_data& data = bencher->data;
104 Formatter *formatter = bencher->formatter;
105 ostream *outstream = bencher->outstream;
9f95a23c 106 ceph::condition_variable cond;
7c673cae
FG
107 int i = 0;
108 int previous_writes = 0;
109 int cycleSinceChange = 0;
110 double bandwidth;
11fdf7f2
TL
111 int iops = 0;
112 mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
9f95a23c 113 std::unique_lock locker{bencher->lock};
7c673cae
FG
114 if (formatter)
115 formatter->open_array_section("datas");
116 while(!data.done) {
11fdf7f2
TL
117 mono_time cur_time = mono_clock::now();
118 utime_t t = ceph_clock_now();
7c673cae
FG
119
120 if (i % 20 == 0 && !formatter) {
121 if (i > 0)
11fdf7f2
TL
122 t.localtime(cout)
123 << " min lat: " << data.min_latency
7c673cae
FG
124 << " max lat: " << data.max_latency
125 << " avg lat: " << data.avg_latency << std::endl;
126 //I'm naughty and don't reset the fill
11fdf7f2 127 bencher->out(cout, t) << setfill(' ')
7c673cae
FG
128 << setw(5) << "sec"
129 << setw(8) << "Cur ops"
130 << setw(10) << "started"
131 << setw(10) << "finished"
132 << setw(10) << "avg MB/s"
133 << setw(10) << "cur MB/s"
134 << setw(12) << "last lat(s)"
135 << setw(12) << "avg lat(s)" << std::endl;
136 }
137 if (cycleSinceChange)
138 bandwidth = (double)(data.finished - previous_writes)
139 * (data.op_size)
140 / (1024*1024)
141 / cycleSinceChange;
142 else
143 bandwidth = -1;
144
145 if (!std::isnan(bandwidth) && bandwidth > -1) {
146 if (bandwidth > data.idata.max_bandwidth)
147 data.idata.max_bandwidth = bandwidth;
148 if (bandwidth < data.idata.min_bandwidth)
149 data.idata.min_bandwidth = bandwidth;
150
11fdf7f2
TL
151 ++data.idata.bandwidth_cycles;
152 double delta = bandwidth - data.idata.avg_bandwidth;
153 data.idata.avg_bandwidth += delta / data.idata.bandwidth_cycles;
154 data.idata.bandwidth_diff_sum += delta * (bandwidth - data.idata.avg_bandwidth);
7c673cae
FG
155 }
156
157 if (cycleSinceChange)
158 iops = (double)(data.finished - previous_writes)
159 / cycleSinceChange;
160 else
161 iops = -1;
162
163 if (!std::isnan(iops) && iops > -1) {
164 if (iops > data.idata.max_iops)
165 data.idata.max_iops = iops;
166 if (iops < data.idata.min_iops)
167 data.idata.min_iops = iops;
168
11fdf7f2
TL
169 ++data.idata.iops_cycles;
170 double delta = iops - data.idata.avg_iops;
171 data.idata.avg_iops += delta / data.idata.iops_cycles;
172 data.idata.iops_diff_sum += delta * (iops - data.idata.avg_iops);
7c673cae
FG
173 }
174
175 if (formatter)
176 formatter->open_object_section("data");
177
11fdf7f2
TL
178 // elapsed will be in seconds, by default
179 std::chrono::duration<double> elapsed = cur_time - data.start_time;
7c673cae 180 double avg_bandwidth = (double) (data.op_size) * (data.finished)
11fdf7f2 181 / elapsed.count() / (1024*1024);
7c673cae
FG
182 if (previous_writes != data.finished) {
183 previous_writes = data.finished;
184 cycleSinceChange = 0;
185 if (!formatter) {
11fdf7f2 186 bencher->out(cout, t)
7c673cae
FG
187 << setfill(' ')
188 << setw(5) << i
189 << ' ' << setw(7) << data.in_flight
190 << ' ' << setw(9) << data.started
191 << ' ' << setw(9) << data.finished
192 << ' ' << setw(9) << avg_bandwidth
193 << ' ' << setw(9) << bandwidth
11fdf7f2 194 << ' ' << setw(11) << (double)data.cur_latency.count()
7c673cae
FG
195 << ' ' << setw(11) << data.avg_latency << std::endl;
196 } else {
197 formatter->dump_format("sec", "%d", i);
198 formatter->dump_format("cur_ops", "%d", data.in_flight);
199 formatter->dump_format("started", "%d", data.started);
200 formatter->dump_format("finished", "%d", data.finished);
201 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
202 formatter->dump_format("cur_bw", "%f", bandwidth);
11fdf7f2 203 formatter->dump_format("last_lat", "%f", (double)data.cur_latency.count());
7c673cae
FG
204 formatter->dump_format("avg_lat", "%f", data.avg_latency);
205 }
206 }
207 else {
208 if (!formatter) {
11fdf7f2 209 bencher->out(cout, t)
7c673cae
FG
210 << setfill(' ')
211 << setw(5) << i
212 << ' ' << setw(7) << data.in_flight
213 << ' ' << setw(9) << data.started
214 << ' ' << setw(9) << data.finished
215 << ' ' << setw(9) << avg_bandwidth
216 << ' ' << setw(9) << '0'
217 << ' ' << setw(11) << '-'
218 << ' '<< setw(11) << data.avg_latency << std::endl;
219 } else {
220 formatter->dump_format("sec", "%d", i);
221 formatter->dump_format("cur_ops", "%d", data.in_flight);
222 formatter->dump_format("started", "%d", data.started);
223 formatter->dump_format("finished", "%d", data.finished);
224 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
225 formatter->dump_format("cur_bw", "%f", 0);
226 formatter->dump_format("last_lat", "%f", 0);
227 formatter->dump_format("avg_lat", "%f", data.avg_latency);
228 }
229 }
230 if (formatter) {
231 formatter->close_section(); // data
232 formatter->flush(*outstream);
233 }
234 ++i;
235 ++cycleSinceChange;
9f95a23c 236 cond.wait_for(locker, ONE_SECOND);
7c673cae
FG
237 }
238 if (formatter)
239 formatter->close_section(); //datas
11fdf7f2
TL
240 if (iops < 0) {
241 std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
242 data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
243 }
7c673cae
FG
244 return NULL;
245}
246
247int ObjBencher::aio_bench(
248 int operation, int secondsToRun,
249 int concurrentios,
250 uint64_t op_size, uint64_t object_size,
251 unsigned max_objects,
252 bool cleanup, bool hints,
11fdf7f2 253 const std::string& run_name, bool reuse_bench, bool no_verify) {
7c673cae
FG
254
255 if (concurrentios <= 0)
256 return -EINVAL;
257
9f95a23c 258 int num_ops = 0;
7c673cae
FG
259 int num_objects = 0;
260 int r = 0;
11fdf7f2
TL
261 int prev_pid = 0;
262 std::chrono::duration<double> timePassed;
7c673cae
FG
263
264 // default metadata object is used if user does not specify one
265 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
266
267 //get data from previous write run, if available
11fdf7f2 268 if (operation != OP_WRITE || reuse_bench) {
7c673cae
FG
269 uint64_t prev_op_size, prev_object_size;
270 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
9f95a23c 271 &num_ops, &num_objects, &prev_pid);
7c673cae 272 if (r < 0) {
11fdf7f2
TL
273 if (r == -ENOENT) {
274 if (reuse_bench)
275 cerr << "Must write data before using reuse_bench for a write benchmark!" << std::endl;
276 else
277 cerr << "Must write data before running a read benchmark!" << std::endl;
278 }
7c673cae
FG
279 return r;
280 }
281 object_size = prev_object_size;
282 op_size = prev_op_size;
283 }
284
285 char* contentsChars = new char[op_size];
11fdf7f2 286 lock.lock();
7c673cae
FG
287 data.done = false;
288 data.hints = hints;
289 data.object_size = object_size;
290 data.op_size = op_size;
291 data.in_flight = 0;
292 data.started = 0;
293 data.finished = 0;
294 data.min_latency = 9999.0; // this better be higher than initial latency!
295 data.max_latency = 0;
296 data.avg_latency = 0;
11fdf7f2 297 data.latency_diff_sum = 0;
7c673cae 298 data.object_contents = contentsChars;
11fdf7f2 299 lock.unlock();
7c673cae
FG
300
301 //fill in contentsChars deterministically so we can check returns
302 sanitize_object_contents(&data, data.op_size);
303
304 if (formatter)
305 formatter->open_object_section("bench");
306
307 if (OP_WRITE == operation) {
11fdf7f2 308 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects, prev_pid);
7c673cae
FG
309 if (r != 0) goto out;
310 }
311 else if (OP_SEQ_READ == operation) {
9f95a23c 312 r = seq_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
313 if (r != 0) goto out;
314 }
315 else if (OP_RAND_READ == operation) {
9f95a23c 316 r = rand_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
317 if (r != 0) goto out;
318 }
319
320 if (OP_WRITE == operation && cleanup) {
321 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
9f95a23c 322 &num_ops, &num_objects, &prev_pid);
7c673cae
FG
323 if (r < 0) {
324 if (r == -ENOENT)
325 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
326 goto out;
327 }
328
11fdf7f2 329 data.start_time = mono_clock::now();
7c673cae
FG
330 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
331
11fdf7f2 332 r = clean_up(num_objects, prev_pid, concurrentios);
7c673cae
FG
333 if (r != 0) goto out;
334
11fdf7f2
TL
335 timePassed = mono_clock::now() - data.start_time;
336 out(cout) << "Clean up completed and total clean up time :" << timePassed.count() << std::endl;
7c673cae
FG
337
338 // lastrun file
339 r = sync_remove(run_name_meta);
340 if (r != 0) goto out;
341 }
342
343 out:
344 if (formatter) {
345 formatter->close_section(); // bench
346 formatter->flush(*outstream);
347 *outstream << std::endl;
348 }
349 delete[] contentsChars;
350 return r;
351}
352
353struct lock_cond {
9f95a23c
TL
354 explicit lock_cond(ceph::mutex *_lock) : lock(_lock) {}
355 ceph::mutex *lock;
356 ceph::condition_variable cond;
7c673cae
FG
357};
358
359void _aio_cb(void *cb, void *arg) {
360 struct lock_cond *lc = (struct lock_cond *)arg;
11fdf7f2 361 lc->lock->lock();
9f95a23c 362 lc->cond.notify_all();
11fdf7f2 363 lc->lock->unlock();
7c673cae
FG
364}
365
366int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
367 uint64_t *op_size, uint64_t* object_size,
9f95a23c 368 int* num_ops, int* num_objects, int* prevPid) {
7c673cae
FG
369 int r = 0;
370 bufferlist object_data;
371
372 r = sync_read(metadata_file, object_data,
373 sizeof(int) * 2 + sizeof(size_t) * 2);
374 if (r <= 0) {
375 // treat an empty file as a file that does not exist
376 if (r == 0) {
377 r = -ENOENT;
378 }
379 return r;
380 }
11fdf7f2
TL
381 auto p = object_data.cbegin();
382 decode(*object_size, p);
9f95a23c 383 decode(*num_ops, p);
11fdf7f2 384 decode(*prevPid, p);
7c673cae 385 if (!p.end()) {
11fdf7f2 386 decode(*op_size, p);
7c673cae
FG
387 } else {
388 *op_size = *object_size;
389 }
9f95a23c
TL
390 unsigned ops_per_object = 1;
391 // make sure *op_size value is reasonable
392 if (*op_size > 0 && *object_size > *op_size) {
393 ops_per_object = *object_size / *op_size;
394 }
395 *num_objects = (*num_ops + ops_per_object - 1) / ops_per_object;
7c673cae
FG
396
397 return 0;
398}
399
400int ObjBencher::write_bench(int secondsToRun,
401 int concurrentios, const string& run_name_meta,
11fdf7f2 402 unsigned max_objects, int prev_pid) {
9f95a23c 403 if (concurrentios <= 0)
7c673cae 404 return -EINVAL;
9f95a23c 405
7c673cae
FG
406 if (!formatter) {
407 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
408 << data.op_size << " bytes to objects of size "
409 << data.object_size << " for up to "
410 << secondsToRun << " seconds or "
411 << max_objects << " objects"
412 << std::endl;
413 } else {
414 formatter->dump_format("concurrent_ios", "%d", concurrentios);
415 formatter->dump_format("object_size", "%d", data.object_size);
416 formatter->dump_format("op_size", "%d", data.op_size);
417 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
418 formatter->dump_format("max_objects", "%d", max_objects);
419 }
420 bufferlist* newContents = 0;
421
11fdf7f2 422 std::string prefix = prev_pid ? generate_object_prefix(prev_pid) : generate_object_prefix();
7c673cae
FG
423 if (!formatter)
424 out(cout) << "Object prefix: " << prefix << std::endl;
425 else
426 formatter->dump_string("object_prefix", prefix);
427
428 std::vector<string> name(concurrentios);
429 std::string newName;
11fdf7f2 430 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
431 int r = 0;
432 bufferlist b_write;
433 lock_cond lc(&lock);
11fdf7f2
TL
434 double total_latency = 0;
435 std::vector<mono_time> start_times(concurrentios);
436 mono_time stopTime;
437 std::chrono::duration<double> timePassed;
7c673cae
FG
438
439 unsigned writes_per_object = 1;
440 if (data.op_size)
441 writes_per_object = data.object_size / data.op_size;
442
443 r = completions_init(concurrentios);
444
445 //set up writes so I can start them together
446 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2
TL
447 name[i] = generate_object_name_fast(i / writes_per_object);
448 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
449 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
450 contents[i]->append(data.object_contents, data.op_size);
451 }
452
453 pthread_t print_thread;
454
455 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
456 ceph_pthread_setname(print_thread, "write_stat");
9f95a23c 457 std::unique_lock locker{lock};
7c673cae 458 data.finished = 0;
11fdf7f2 459 data.start_time = mono_clock::now();
9f95a23c 460 locker.unlock();
7c673cae 461 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2 462 start_times[i] = mono_clock::now();
7c673cae
FG
463 r = create_completion(i, _aio_cb, (void *)&lc);
464 if (r < 0)
465 goto ERR;
466 r = aio_write(name[i], i, *contents[i], data.op_size,
467 data.op_size * (i % writes_per_object));
11fdf7f2 468 if (r < 0) {
7c673cae
FG
469 goto ERR;
470 }
9f95a23c 471 locker.lock();
7c673cae
FG
472 ++data.started;
473 ++data.in_flight;
9f95a23c 474 locker.unlock();
7c673cae
FG
475 }
476
477 //keep on adding new writes as old ones complete until we've passed minimum time
478 int slot;
7c673cae
FG
479
480 //don't need locking for reads because other thread doesn't write
481
11fdf7f2 482 stopTime = data.start_time + std::chrono::seconds(secondsToRun);
7c673cae 483 slot = 0;
9f95a23c
TL
484 locker.lock();
485 while (data.finished < data.started) {
7c673cae
FG
486 bool found = false;
487 while (1) {
488 int old_slot = slot;
489 do {
490 if (completion_is_done(slot)) {
491 found = true;
492 break;
493 }
494 slot++;
495 if (slot == concurrentios) {
496 slot = 0;
497 }
498 } while (slot != old_slot);
499 if (found)
500 break;
9f95a23c 501 lc.cond.wait(locker);
7c673cae 502 }
9f95a23c 503 locker.unlock();
7c673cae
FG
504
505 completion_wait(slot);
9f95a23c 506 locker.lock();
7c673cae
FG
507 r = completion_ret(slot);
508 if (r != 0) {
9f95a23c 509 locker.unlock();
7c673cae
FG
510 goto ERR;
511 }
11fdf7f2
TL
512 data.cur_latency = mono_clock::now() - start_times[slot];
513 total_latency += data.cur_latency.count();
514 if( data.cur_latency.count() > data.max_latency)
515 data.max_latency = data.cur_latency.count();
516 if (data.cur_latency.count() < data.min_latency)
517 data.min_latency = data.cur_latency.count();
7c673cae 518 ++data.finished;
11fdf7f2 519 double delta = data.cur_latency.count() - data.avg_latency;
7c673cae 520 data.avg_latency = total_latency / data.finished;
11fdf7f2 521 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
7c673cae 522 --data.in_flight;
9f95a23c 523 locker.unlock();
7c673cae 524 release_completion(slot);
7c673cae 525
9f95a23c
TL
526 if (!secondsToRun || mono_clock::now() >= stopTime) {
527 locker.lock();
528 continue;
529 }
530
531 if (data.op_size && max_objects &&
532 data.started >=
533 (int)((data.object_size * max_objects + data.op_size - 1) /
534 data.op_size)) {
535 locker.lock();
536 continue;
537 }
538
7c673cae 539 //write new stuff to backend
9f95a23c
TL
540
541 //create new contents and name on the heap, and fill them
542 newName = generate_object_name_fast(data.started / writes_per_object);
543 newContents = contents[slot].get();
544 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
545 // we wrote to buffer, going around internal crc cache, so invalidate it now.
546 newContents->invalidate_crc();
547
11fdf7f2 548 start_times[slot] = mono_clock::now();
7c673cae
FG
549 r = create_completion(slot, _aio_cb, &lc);
550 if (r < 0)
551 goto ERR;
552 r = aio_write(newName, slot, *newContents, data.op_size,
553 data.op_size * (data.started % writes_per_object));
11fdf7f2 554 if (r < 0) {
7c673cae
FG
555 goto ERR;
556 }
557 name[slot] = newName;
9f95a23c 558 locker.lock();
7c673cae
FG
559 ++data.started;
560 ++data.in_flight;
7c673cae 561 }
9f95a23c 562 locker.unlock();
7c673cae 563
11fdf7f2 564 timePassed = mono_clock::now() - data.start_time;
9f95a23c 565 locker.lock();
7c673cae 566 data.done = true;
9f95a23c 567 locker.unlock();
7c673cae
FG
568
569 pthread_join(print_thread, NULL);
570
571 double bandwidth;
11fdf7f2
TL
572 bandwidth = ((double)data.finished)*((double)data.op_size) /
573 timePassed.count();
7c673cae
FG
574 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
575
11fdf7f2
TL
576 double bandwidth_stddev;
577 double iops_stddev;
578 double latency_stddev;
579 if (data.idata.bandwidth_cycles > 1) {
580 bandwidth_stddev = std::sqrt(data.idata.bandwidth_diff_sum / (data.idata.bandwidth_cycles - 1));
581 } else {
582 bandwidth_stddev = 0;
583 }
584 if (data.idata.iops_cycles > 1) {
585 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
586 } else {
587 iops_stddev = 0;
588 }
589 if (data.finished > 1) {
590 latency_stddev = std::sqrt(data.latency_diff_sum / (data.finished - 1));
591 } else {
592 latency_stddev = 0;
593 }
594
7c673cae 595 if (!formatter) {
11fdf7f2 596 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
597 << "Total writes made: " << data.finished << std::endl
598 << "Write size: " << data.op_size << std::endl
9f95a23c 599 << "Object size: " << data.object_size << std::endl
7c673cae 600 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2 601 << "Stddev Bandwidth: " << bandwidth_stddev << std::endl
7c673cae
FG
602 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
603 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
11fdf7f2
TL
604 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
605 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
606 << "Max IOPS: " << data.idata.max_iops << std::endl
607 << "Min IOPS: " << data.idata.min_iops << std::endl
608 << "Average Latency(s): " << data.avg_latency << std::endl
11fdf7f2 609 << "Stddev Latency(s): " << latency_stddev << std::endl
7c673cae
FG
610 << "Max latency(s): " << data.max_latency << std::endl
611 << "Min latency(s): " << data.min_latency << std::endl;
612 } else {
11fdf7f2 613 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
614 formatter->dump_format("total_writes_made", "%d", data.finished);
615 formatter->dump_format("write_size", "%d", data.op_size);
616 formatter->dump_format("object_size", "%d", data.object_size);
617 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2 618 formatter->dump_format("stddev_bandwidth", "%f", bandwidth_stddev);
7c673cae
FG
619 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
620 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
11fdf7f2
TL
621 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
622 formatter->dump_format("stddev_iops", "%d", iops_stddev);
7c673cae
FG
623 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
624 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
625 formatter->dump_format("average_latency", "%f", data.avg_latency);
11fdf7f2 626 formatter->dump_format("stddev_latency", "%f", latency_stddev);
28e407b8 627 formatter->dump_format("max_latency", "%f", data.max_latency);
7c673cae
FG
628 formatter->dump_format("min_latency", "%f", data.min_latency);
629 }
630 //write object size/number data for read benchmarks
11fdf7f2 631 encode(data.object_size, b_write);
9f95a23c 632 encode(data.finished, b_write);
11fdf7f2
TL
633 encode(prev_pid ? prev_pid : getpid(), b_write);
634 encode(data.op_size, b_write);
7c673cae
FG
635
636 // persist meta-data for further cleanup or read
637 sync_write(run_name_meta, b_write, sizeof(int)*3);
638
639 completions_done();
7c673cae
FG
640
641 return 0;
642
643 ERR:
9f95a23c 644 locker.lock();
7c673cae 645 data.done = 1;
9f95a23c 646 locker.unlock();
7c673cae 647 pthread_join(print_thread, NULL);
7c673cae
FG
648 return r;
649}
650
9f95a23c
TL
651int ObjBencher::seq_read_bench(
652 int seconds_to_run, int num_ops, int num_objects,
653 int concurrentios, int pid, bool no_verify) {
654
7c673cae
FG
655 lock_cond lc(&lock);
656
9f95a23c 657 if (concurrentios <= 0)
7c673cae
FG
658 return -EINVAL;
659
660 std::vector<string> name(concurrentios);
661 std::string newName;
11fdf7f2 662 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
663 int index[concurrentios];
664 int errors = 0;
7c673cae
FG
665 double total_latency = 0;
666 int r = 0;
11fdf7f2
TL
667 std::vector<mono_time> start_times(concurrentios);
668 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
669 std::chrono::duration<double> timePassed;
7c673cae
FG
670 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
671 //changes will be safe because string length should remain the same
672
11fdf7f2 673 unsigned reads_per_object = 1;
7c673cae 674 if (data.op_size)
11fdf7f2 675 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
676
677 r = completions_init(concurrentios);
678 if (r < 0)
679 return r;
680
681 //set up initial reads
682 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
683 name[i] = generate_object_name_fast(i / reads_per_object, pid);
684 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
685 }
686
9f95a23c 687 std::unique_lock locker{lock};
7c673cae 688 data.finished = 0;
11fdf7f2 689 data.start_time = mono_clock::now();
9f95a23c 690 locker.unlock();
7c673cae
FG
691
692 pthread_t print_thread;
693 pthread_create(&print_thread, NULL, status_printer, (void *)this);
694 ceph_pthread_setname(print_thread, "seq_read_stat");
695
11fdf7f2 696 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
697 //start initial reads
698 for (int i = 0; i < concurrentios; ++i) {
699 index[i] = i;
11fdf7f2 700 start_times[i] = mono_clock::now();
7c673cae 701 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
702 r = aio_read(name[i], i, contents[i].get(), data.op_size,
703 data.op_size * (i % reads_per_object));
704 if (r < 0) {
7c673cae
FG
705 cerr << "r = " << r << std::endl;
706 goto ERR;
707 }
9f95a23c 708 locker.lock();
7c673cae
FG
709 ++data.started;
710 ++data.in_flight;
9f95a23c 711 locker.unlock();
7c673cae
FG
712 }
713
714 //keep on adding new reads as old ones complete
715 int slot;
716 bufferlist *cur_contents;
717
718 slot = 0;
9f95a23c
TL
719 while (data.finished < data.started) {
720 locker.lock();
7c673cae
FG
721 int old_slot = slot;
722 bool found = false;
723 while (1) {
724 do {
725 if (completion_is_done(slot)) {
726 found = true;
727 break;
728 }
729 slot++;
730 if (slot == concurrentios) {
731 slot = 0;
732 }
733 } while (slot != old_slot);
734 if (found) {
735 break;
736 }
9f95a23c 737 lc.cond.wait(locker);
7c673cae
FG
738 }
739
740 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 741 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 742
11fdf7f2 743 cur_contents = contents[slot].get();
7c673cae 744 int current_index = index[slot];
9f95a23c 745
7c673cae
FG
746 // invalidate internal crc cache
747 cur_contents->invalidate_crc();
9f95a23c 748
7c673cae
FG
749 if (!no_verify) {
750 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
9f95a23c 751 if ( (cur_contents->length() != data.op_size) ||
7c673cae
FG
752 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
753 cerr << name[slot] << " is not correct!" << std::endl;
754 ++errors;
755 }
756 }
757
9f95a23c
TL
758 bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) &&
759 num_ops > data.started;
760 if (start_new_read) {
761 newName = generate_object_name_fast(data.started / reads_per_object, pid);
762 index[slot] = data.started;
763 }
764
765 locker.unlock();
7c673cae 766 completion_wait(slot);
9f95a23c 767 locker.lock();
7c673cae
FG
768 r = completion_ret(slot);
769 if (r < 0) {
770 cerr << "read got " << r << std::endl;
9f95a23c 771 locker.unlock();
7c673cae
FG
772 goto ERR;
773 }
11fdf7f2
TL
774 total_latency += data.cur_latency.count();
775 if (data.cur_latency.count() > data.max_latency)
776 data.max_latency = data.cur_latency.count();
777 if (data.cur_latency.count() < data.min_latency)
778 data.min_latency = data.cur_latency.count();
7c673cae
FG
779 ++data.finished;
780 data.avg_latency = total_latency / data.finished;
781 --data.in_flight;
9f95a23c 782 locker.unlock();
7c673cae
FG
783 release_completion(slot);
784
9f95a23c
TL
785 if (!start_new_read)
786 continue;
787
7c673cae 788 //start new read and check data if requested
11fdf7f2 789 start_times[slot] = mono_clock::now();
7c673cae 790 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
791 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
792 data.op_size * (data.started % reads_per_object));
7c673cae
FG
793 if (r < 0) {
794 goto ERR;
795 }
9f95a23c 796 locker.lock();
7c673cae
FG
797 ++data.started;
798 ++data.in_flight;
9f95a23c 799 locker.unlock();
7c673cae
FG
800 name[slot] = newName;
801 }
802
11fdf7f2 803 timePassed = mono_clock::now() - data.start_time;
9f95a23c 804 locker.lock();
7c673cae 805 data.done = true;
9f95a23c 806 locker.unlock();
7c673cae
FG
807
808 pthread_join(print_thread, NULL);
809
810 double bandwidth;
11fdf7f2 811 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 812 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
9f95a23c 813
11fdf7f2
TL
814 double iops_stddev;
815 if (data.idata.iops_cycles > 1) {
816 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
817 } else {
818 iops_stddev = 0;
819 }
7c673cae
FG
820
821 if (!formatter) {
11fdf7f2 822 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
823 << "Total reads made: " << data.finished << std::endl
824 << "Read size: " << data.op_size << std::endl
825 << "Object size: " << data.object_size << std::endl
826 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
827 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
828 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
829 << "Max IOPS: " << data.idata.max_iops << std::endl
830 << "Min IOPS: " << data.idata.min_iops << std::endl
831 << "Average Latency(s): " << data.avg_latency << std::endl
832 << "Max latency(s): " << data.max_latency << std::endl
833 << "Min latency(s): " << data.min_latency << std::endl;
834 } else {
11fdf7f2 835 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
836 formatter->dump_format("total_reads_made", "%d", data.finished);
837 formatter->dump_format("read_size", "%d", data.op_size);
838 formatter->dump_format("object_size", "%d", data.object_size);
839 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
840 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
841 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
842 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
843 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
844 formatter->dump_format("average_latency", "%f", data.avg_latency);
845 formatter->dump_format("max_latency", "%f", data.max_latency);
846 formatter->dump_format("min_latency", "%f", data.min_latency);
847 }
848
849 completions_done();
850
851 return (errors > 0 ? -EIO : 0);
852
853 ERR:
9f95a23c 854 locker.lock();
7c673cae 855 data.done = 1;
9f95a23c 856 locker.unlock();
7c673cae
FG
857 pthread_join(print_thread, NULL);
858 return r;
859}
860
9f95a23c
TL
861int ObjBencher::rand_read_bench(
862 int seconds_to_run, int num_ops, int num_objects,
863 int concurrentios, int pid, bool no_verify) {
864
7c673cae
FG
865 lock_cond lc(&lock);
866
867 if (concurrentios <= 0)
868 return -EINVAL;
869
870 std::vector<string> name(concurrentios);
871 std::string newName;
11fdf7f2 872 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
873 int index[concurrentios];
874 int errors = 0;
7c673cae 875 int r = 0;
11fdf7f2
TL
876 double total_latency = 0;
877 std::vector<mono_time> start_times(concurrentios);
878 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
879 std::chrono::duration<double> timePassed;
7c673cae
FG
880 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
881 //changes will be safe because string length should remain the same
882
11fdf7f2 883 unsigned reads_per_object = 1;
7c673cae 884 if (data.op_size)
11fdf7f2 885 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
886
887 srand (time(NULL));
888
889 r = completions_init(concurrentios);
890 if (r < 0)
891 return r;
892
893 //set up initial reads
894 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
895 name[i] = generate_object_name_fast(i / reads_per_object, pid);
896 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
897 }
898
9f95a23c 899 unique_lock locker{lock};
7c673cae 900 data.finished = 0;
11fdf7f2 901 data.start_time = mono_clock::now();
9f95a23c 902 locker.unlock();
7c673cae
FG
903
904 pthread_t print_thread;
905 pthread_create(&print_thread, NULL, status_printer, (void *)this);
906 ceph_pthread_setname(print_thread, "rand_read_stat");
907
11fdf7f2 908 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
909 //start initial reads
910 for (int i = 0; i < concurrentios; ++i) {
911 index[i] = i;
11fdf7f2 912 start_times[i] = mono_clock::now();
7c673cae 913 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
914 r = aio_read(name[i], i, contents[i].get(), data.op_size,
915 data.op_size * (i % reads_per_object));
916 if (r < 0) {
7c673cae
FG
917 cerr << "r = " << r << std::endl;
918 goto ERR;
919 }
9f95a23c 920 locker.lock();
7c673cae
FG
921 ++data.started;
922 ++data.in_flight;
9f95a23c 923 locker.unlock();
7c673cae
FG
924 }
925
926 //keep on adding new reads as old ones complete
927 int slot;
928 bufferlist *cur_contents;
929 int rand_id;
930
931 slot = 0;
9f95a23c
TL
932 while (data.finished < data.started) {
933 locker.lock();
7c673cae
FG
934 int old_slot = slot;
935 bool found = false;
936 while (1) {
937 do {
938 if (completion_is_done(slot)) {
939 found = true;
940 break;
941 }
942 slot++;
943 if (slot == concurrentios) {
944 slot = 0;
945 }
946 } while (slot != old_slot);
947 if (found) {
948 break;
949 }
9f95a23c 950 lc.cond.wait(locker);
7c673cae
FG
951 }
952
953 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 954 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 955
9f95a23c 956 locker.unlock();
7c673cae
FG
957
958 int current_index = index[slot];
11fdf7f2 959 cur_contents = contents[slot].get();
7c673cae 960 completion_wait(slot);
9f95a23c 961 locker.lock();
7c673cae
FG
962 r = completion_ret(slot);
963 if (r < 0) {
964 cerr << "read got " << r << std::endl;
9f95a23c 965 locker.unlock();
7c673cae
FG
966 goto ERR;
967 }
968
11fdf7f2
TL
969 total_latency += data.cur_latency.count();
970 if (data.cur_latency.count() > data.max_latency)
971 data.max_latency = data.cur_latency.count();
972 if (data.cur_latency.count() < data.min_latency)
973 data.min_latency = data.cur_latency.count();
7c673cae
FG
974 ++data.finished;
975 data.avg_latency = total_latency / data.finished;
976 --data.in_flight;
9f95a23c 977
7c673cae
FG
978 if (!no_verify) {
979 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
9f95a23c 980 if ((cur_contents->length() != data.op_size) ||
7c673cae
FG
981 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
982 cerr << name[slot] << " is not correct!" << std::endl;
983 ++errors;
984 }
9f95a23c
TL
985 }
986
987 locker.unlock();
988 release_completion(slot);
989
990 if (!seconds_to_run || mono_clock::now() >= finish_time)
991 continue;
992
993 //start new read and check data if requested
7c673cae 994
9f95a23c 995 rand_id = rand() % num_ops;
11fdf7f2 996 newName = generate_object_name_fast(rand_id / reads_per_object, pid);
7c673cae 997 index[slot] = rand_id;
7c673cae
FG
998
999 // invalidate internal crc cache
1000 cur_contents->invalidate_crc();
1001
11fdf7f2 1002 start_times[slot] = mono_clock::now();
7c673cae 1003 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
1004 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
1005 data.op_size * (rand_id % reads_per_object));
7c673cae
FG
1006 if (r < 0) {
1007 goto ERR;
1008 }
9f95a23c 1009 locker.lock();
7c673cae
FG
1010 ++data.started;
1011 ++data.in_flight;
9f95a23c 1012 locker.unlock();
7c673cae
FG
1013 name[slot] = newName;
1014 }
1015
11fdf7f2 1016 timePassed = mono_clock::now() - data.start_time;
9f95a23c 1017 locker.lock();
7c673cae 1018 data.done = true;
9f95a23c 1019 locker.unlock();
7c673cae
FG
1020
1021 pthread_join(print_thread, NULL);
1022
1023 double bandwidth;
11fdf7f2 1024 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 1025 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
9f95a23c 1026
11fdf7f2
TL
1027 double iops_stddev;
1028 if (data.idata.iops_cycles > 1) {
1029 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
1030 } else {
1031 iops_stddev = 0;
1032 }
7c673cae
FG
1033
1034 if (!formatter) {
11fdf7f2 1035 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
1036 << "Total reads made: " << data.finished << std::endl
1037 << "Read size: " << data.op_size << std::endl
1038 << "Object size: " << data.object_size << std::endl
1039 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
1040 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
1041 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
1042 << "Max IOPS: " << data.idata.max_iops << std::endl
1043 << "Min IOPS: " << data.idata.min_iops << std::endl
1044 << "Average Latency(s): " << data.avg_latency << std::endl
1045 << "Max latency(s): " << data.max_latency << std::endl
1046 << "Min latency(s): " << data.min_latency << std::endl;
1047 } else {
11fdf7f2 1048 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
1049 formatter->dump_format("total_reads_made", "%d", data.finished);
1050 formatter->dump_format("read_size", "%d", data.op_size);
1051 formatter->dump_format("object_size", "%d", data.object_size);
1052 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
1053 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
1054 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
1055 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1056 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1057 formatter->dump_format("average_latency", "%f", data.avg_latency);
1058 formatter->dump_format("max_latency", "%f", data.max_latency);
1059 formatter->dump_format("min_latency", "%f", data.min_latency);
1060 }
1061 completions_done();
1062
1063 return (errors > 0 ? -EIO : 0);
1064
1065 ERR:
9f95a23c 1066 locker.lock();
7c673cae 1067 data.done = 1;
9f95a23c 1068 locker.unlock();
7c673cae
FG
1069 pthread_join(print_thread, NULL);
1070 return r;
1071}
1072
1073int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1074 int r = 0;
1075 uint64_t op_size, object_size;
9f95a23c 1076 int num_ops, num_objects;
7c673cae
FG
1077 int prevPid;
1078
1079 // default meta object if user does not specify one
1080 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1081 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1082
1083 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1084 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1085 return -EINVAL;
1086 }
1087
1088 std::list<Object> unfiltered_objects;
1089 std::set<std::string> meta_namespaces, all_namespaces;
1090
1091 // If caller set all_nspaces this will be searching
1092 // across multiple namespaces.
1093 while (true) {
1094 bool objects_remain = get_objects(&unfiltered_objects, 20);
1095 if (!objects_remain)
1096 break;
1097
1098 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1099 for ( ; i != unfiltered_objects.end(); ++i) {
1100 if (i->first == run_name_meta) {
1101 meta_namespaces.insert(i->second);
1102 }
1103 if (i->first.substr(0, prefix.length()) == prefix) {
1104 all_namespaces.insert(i->second);
1105 }
1106 }
1107 }
1108
1109 std::set<std::string>::const_iterator i = all_namespaces.begin();
1110 for ( ; i != all_namespaces.end(); ++i) {
1111 set_namespace(*i);
1112
1113 // if no metadata file found we should try to do a linear search on the prefix
1114 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1115 int r = clean_up_slow(prefix, concurrentios);
1116 if (r < 0) {
1117 cerr << "clean_up_slow error r= " << r << std::endl;
1118 return r;
1119 }
1120 continue;
1121 }
1122
9f95a23c 1123 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_ops, &num_objects, &prevPid);
7c673cae
FG
1124 if (r < 0) {
1125 return r;
1126 }
1127
1128 r = clean_up(num_objects, prevPid, concurrentios);
1129 if (r != 0) return r;
1130
1131 r = sync_remove(run_name_meta);
1132 if (r != 0) return r;
1133 }
1134
1135 return 0;
1136}
1137
1138int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1139 lock_cond lc(&lock);
9f95a23c
TL
1140
1141 if (concurrentios <= 0)
7c673cae
FG
1142 return -EINVAL;
1143
1144 std::vector<string> name(concurrentios);
1145 std::string newName;
1146 int r = 0;
7c673cae
FG
1147 int slot = 0;
1148
9f95a23c 1149 unique_lock locker{lock};
7c673cae
FG
1150 data.done = false;
1151 data.in_flight = 0;
1152 data.started = 0;
1153 data.finished = 0;
9f95a23c 1154 locker.unlock();
7c673cae
FG
1155
1156 // don't start more completions than files
1157 if (num_objects == 0) {
1158 return 0;
1159 } else if (num_objects < concurrentios) {
1160 concurrentios = num_objects;
1161 }
1162
1163 r = completions_init(concurrentios);
1164 if (r < 0)
1165 return r;
1166
1167 //set up initial removes
1168 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2 1169 name[i] = generate_object_name_fast(i, prevPid);
7c673cae
FG
1170 }
1171
1172 //start initial removes
1173 for (int i = 0; i < concurrentios; ++i) {
1174 create_completion(i, _aio_cb, (void *)&lc);
1175 r = aio_remove(name[i], i);
1176 if (r < 0) { //naughty, doesn't clean up heap
1177 cerr << "r = " << r << std::endl;
1178 goto ERR;
1179 }
9f95a23c 1180 locker.lock();
7c673cae
FG
1181 ++data.started;
1182 ++data.in_flight;
9f95a23c 1183 locker.unlock();
7c673cae
FG
1184 }
1185
1186 //keep on adding new removes as old ones complete
9f95a23c
TL
1187 while (data.finished < data.started) {
1188 locker.lock();
7c673cae
FG
1189 int old_slot = slot;
1190 bool found = false;
1191 while (1) {
1192 do {
1193 if (completion_is_done(slot)) {
1194 found = true;
1195 break;
1196 }
1197 slot++;
1198 if (slot == concurrentios) {
1199 slot = 0;
1200 }
1201 } while (slot != old_slot);
1202 if (found) {
1203 break;
1204 }
9f95a23c 1205 lc.cond.wait(locker);
7c673cae 1206 }
9f95a23c 1207 locker.unlock();
7c673cae 1208 completion_wait(slot);
9f95a23c 1209 locker.lock();
7c673cae
FG
1210 r = completion_ret(slot);
1211 if (r != 0 && r != -ENOENT) { // file does not exist
1212 cerr << "remove got " << r << std::endl;
9f95a23c 1213 locker.unlock();
7c673cae
FG
1214 goto ERR;
1215 }
1216 ++data.finished;
1217 --data.in_flight;
9f95a23c 1218 locker.unlock();
7c673cae
FG
1219 release_completion(slot);
1220
9f95a23c
TL
1221 if (data.started >= num_objects)
1222 continue;
1223
7c673cae 1224 //start new remove and check data if requested
9f95a23c 1225 newName = generate_object_name_fast(data.started, prevPid);
7c673cae
FG
1226 create_completion(slot, _aio_cb, (void *)&lc);
1227 r = aio_remove(newName, slot);
1228 if (r < 0) {
1229 goto ERR;
1230 }
9f95a23c 1231 locker.lock();
7c673cae
FG
1232 ++data.started;
1233 ++data.in_flight;
9f95a23c 1234 locker.unlock();
7c673cae
FG
1235 name[slot] = newName;
1236 }
1237
9f95a23c 1238 locker.lock();
7c673cae 1239 data.done = true;
9f95a23c 1240 locker.unlock();
7c673cae
FG
1241
1242 completions_done();
1243
1244 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1245
1246 return 0;
1247
1248 ERR:
9f95a23c 1249 locker.lock();
7c673cae 1250 data.done = 1;
9f95a23c 1251 locker.unlock();
7c673cae
FG
1252 return r;
1253}
1254
1255/**
1256 * Return objects from the datastore which match a prefix.
1257 *
1258 * Clears the list and populates it with any objects which match the
1259 * prefix. The list is guaranteed to have at least one item when the
1260 * function returns true.
1261 *
1262 * @param prefix the prefix to match against
1263 * @param objects [out] return list of objects
1264 * @returns true if there are any objects in the store which match
1265 * the prefix, false if there are no more
1266 */
1267bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1268 std::list<Object> unfiltered_objects;
1269
1270 objects->clear();
1271
1272 while (objects->empty()) {
1273 bool objects_remain = get_objects(&unfiltered_objects, 20);
1274 if (!objects_remain)
1275 return false;
1276
1277 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1278 for ( ; i != unfiltered_objects.end(); ++i) {
1279 if (i->first.substr(0, prefix.length()) == prefix) {
1280 objects->push_back(*i);
1281 }
1282 }
1283 }
1284
1285 return true;
1286}
1287
1288int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1289 lock_cond lc(&lock);
1290
1291 if (concurrentios <= 0)
1292 return -EINVAL;
1293
1294 std::vector<Object> name(concurrentios);
1295 Object newName;
1296 int r = 0;
7c673cae
FG
1297 int slot = 0;
1298 std::list<Object> objects;
1299 bool objects_remain = true;
1300
9f95a23c 1301 std::unique_lock locker{lock};
7c673cae
FG
1302 data.done = false;
1303 data.in_flight = 0;
1304 data.started = 0;
1305 data.finished = 0;
9f95a23c 1306 locker.unlock();
7c673cae
FG
1307
1308 out(cout) << "Warning: using slow linear search" << std::endl;
1309
1310 r = completions_init(concurrentios);
1311 if (r < 0)
1312 return r;
1313
1314 //set up initial removes
1315 for (int i = 0; i < concurrentios; ++i) {
1316 if (objects.empty()) {
1317 // if there are fewer objects than concurrent ios, don't generate extras
1318 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1319 if (!objects_found) {
1320 concurrentios = i;
1321 objects_remain = false;
1322 break;
1323 }
1324 }
1325
1326 name[i] = objects.front();
1327 objects.pop_front();
1328 }
1329
1330 //start initial removes
1331 for (int i = 0; i < concurrentios; ++i) {
1332 create_completion(i, _aio_cb, (void *)&lc);
1333 set_namespace(name[i].second);
1334 r = aio_remove(name[i].first, i);
1335 if (r < 0) { //naughty, doesn't clean up heap
1336 cerr << "r = " << r << std::endl;
1337 goto ERR;
1338 }
9f95a23c 1339 locker.lock();
7c673cae
FG
1340 ++data.started;
1341 ++data.in_flight;
9f95a23c 1342 locker.unlock();
7c673cae
FG
1343 }
1344
1345 //keep on adding new removes as old ones complete
1346 while (objects_remain) {
9f95a23c 1347 locker.lock();
7c673cae
FG
1348 int old_slot = slot;
1349 bool found = false;
1350 while (1) {
1351 do {
1352 if (completion_is_done(slot)) {
1353 found = true;
1354 break;
1355 }
1356 slot++;
1357 if (slot == concurrentios) {
1358 slot = 0;
1359 }
1360 } while (slot != old_slot);
1361 if (found) {
1362 break;
1363 }
9f95a23c 1364 lc.cond.wait(locker);
7c673cae 1365 }
9f95a23c 1366 locker.unlock();
7c673cae
FG
1367
1368 // get more objects if necessary
1369 if (objects.empty()) {
1370 objects_remain = more_objects_matching_prefix(prefix, &objects);
1371 // quit if there are no more
1372 if (!objects_remain) {
1373 break;
1374 }
1375 }
1376
1377 // get the next object
1378 newName = objects.front();
1379 objects.pop_front();
1380
1381 completion_wait(slot);
9f95a23c 1382 locker.lock();
7c673cae
FG
1383 r = completion_ret(slot);
1384 if (r != 0 && r != -ENOENT) { // file does not exist
1385 cerr << "remove got " << r << std::endl;
9f95a23c 1386 locker.unlock();
7c673cae
FG
1387 goto ERR;
1388 }
1389 ++data.finished;
1390 --data.in_flight;
9f95a23c 1391 locker.unlock();
7c673cae
FG
1392 release_completion(slot);
1393
1394 //start new remove and check data if requested
1395 create_completion(slot, _aio_cb, (void *)&lc);
1396 set_namespace(newName.second);
1397 r = aio_remove(newName.first, slot);
1398 if (r < 0) {
1399 goto ERR;
1400 }
9f95a23c 1401 locker.lock();
7c673cae
FG
1402 ++data.started;
1403 ++data.in_flight;
9f95a23c 1404 locker.unlock();
7c673cae
FG
1405 name[slot] = newName;
1406 }
1407
1408 //wait for final removes to complete
1409 while (data.finished < data.started) {
1410 slot = data.finished % concurrentios;
1411 completion_wait(slot);
9f95a23c 1412 locker.lock();
7c673cae
FG
1413 r = completion_ret(slot);
1414 if (r != 0 && r != -ENOENT) { // file does not exist
1415 cerr << "remove got " << r << std::endl;
9f95a23c 1416 locker.unlock();
7c673cae
FG
1417 goto ERR;
1418 }
1419 ++data.finished;
1420 --data.in_flight;
1421 release_completion(slot);
9f95a23c 1422 locker.unlock();
7c673cae
FG
1423 }
1424
9f95a23c 1425 locker.lock();
7c673cae 1426 data.done = true;
9f95a23c 1427 locker.unlock();
7c673cae
FG
1428
1429 completions_done();
1430
1431 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1432
1433 return 0;
1434
1435 ERR:
9f95a23c 1436 locker.lock();
7c673cae 1437 data.done = 1;
9f95a23c 1438 locker.unlock();
7c673cae
FG
1439 return -EIO;
1440}