]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/obj_bencher.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / common / obj_bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18#include "include/compat.h"
19#include <pthread.h>
20#include "common/Cond.h"
21#include "obj_bencher.h"
22
7c673cae
FG
// Metadata object used when the caller does not supply a run name.
const std::string BENCH_LASTRUN_METADATA{"benchmark_last_metadata"};
// Prefix shared by every benchmark data object name.
const std::string BENCH_PREFIX{"benchmark_data"};
// snprintf template for object names: hostname (%s), pid (%d), object number (%d).
const std::string BENCH_OBJ_NAME{BENCH_PREFIX + std::string("_%s_%d_object%d")};

// Hostname cache; an empty string means "not resolved yet".
static char cached_hostname[30] = {};
// Pid embedded in object names; 0 means "not chosen yet".
int cached_pid{0};
29
30static std::string generate_object_prefix_nopid() {
31 if (cached_hostname[0] == 0) {
32 gethostname(cached_hostname, sizeof(cached_hostname)-1);
33 cached_hostname[sizeof(cached_hostname)-1] = 0;
34 }
35
36 std::ostringstream oss;
37 oss << BENCH_PREFIX << "_" << cached_hostname;
38 return oss.str();
39}
40
41static std::string generate_object_prefix(int pid = 0) {
42 if (pid)
43 cached_pid = pid;
44 else if (!cached_pid)
45 cached_pid = getpid();
46
47 std::ostringstream oss;
48 oss << generate_object_prefix_nopid() << "_" << cached_pid;
49 return oss.str();
50}
51
11fdf7f2
TL
52// this is 8x faster than previous impl based on chained, deduped functions call
53static std::string generate_object_name_fast(int objnum, int pid = 0)
7c673cae 54{
11fdf7f2
TL
55 if (cached_hostname[0] == 0) {
56 gethostname(cached_hostname, sizeof(cached_hostname)-1);
57 cached_hostname[sizeof(cached_hostname)-1] = 0;
58 }
59
60 if (pid)
61 cached_pid = pid;
62 else if (!cached_pid)
63 cached_pid = getpid();
64
65 char name[512];
66 int n = snprintf(&name[0], sizeof(name), BENCH_OBJ_NAME.c_str(), cached_hostname, cached_pid, objnum);
67 ceph_assert(n > 0 && n < (int)sizeof(name));
68 return std::string(&name[0], (size_t)n);
7c673cae
FG
69}
70
71static void sanitize_object_contents (bench_data *data, size_t length) {
72 memset(data->object_contents, 'z', length);
73}
74
75ostream& ObjBencher::out(ostream& os, utime_t& t)
76{
77 if (show_time)
78 return t.localtime(os) << " ";
79 else
80 return os;
81}
82
83ostream& ObjBencher::out(ostream& os)
84{
85 utime_t cur_time = ceph_clock_now();
86 return out(os, cur_time);
87}
88
89void *ObjBencher::status_printer(void *_bencher) {
90 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
91 bench_data& data = bencher->data;
92 Formatter *formatter = bencher->formatter;
93 ostream *outstream = bencher->outstream;
94 Cond cond;
95 int i = 0;
96 int previous_writes = 0;
97 int cycleSinceChange = 0;
98 double bandwidth;
11fdf7f2
TL
99 int iops = 0;
100 mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
101 bencher->lock.lock();
7c673cae
FG
102 if (formatter)
103 formatter->open_array_section("datas");
104 while(!data.done) {
11fdf7f2
TL
105 mono_time cur_time = mono_clock::now();
106 utime_t t = ceph_clock_now();
7c673cae
FG
107
108 if (i % 20 == 0 && !formatter) {
109 if (i > 0)
11fdf7f2
TL
110 t.localtime(cout)
111 << " min lat: " << data.min_latency
7c673cae
FG
112 << " max lat: " << data.max_latency
113 << " avg lat: " << data.avg_latency << std::endl;
114 //I'm naughty and don't reset the fill
11fdf7f2 115 bencher->out(cout, t) << setfill(' ')
7c673cae
FG
116 << setw(5) << "sec"
117 << setw(8) << "Cur ops"
118 << setw(10) << "started"
119 << setw(10) << "finished"
120 << setw(10) << "avg MB/s"
121 << setw(10) << "cur MB/s"
122 << setw(12) << "last lat(s)"
123 << setw(12) << "avg lat(s)" << std::endl;
124 }
125 if (cycleSinceChange)
126 bandwidth = (double)(data.finished - previous_writes)
127 * (data.op_size)
128 / (1024*1024)
129 / cycleSinceChange;
130 else
131 bandwidth = -1;
132
133 if (!std::isnan(bandwidth) && bandwidth > -1) {
134 if (bandwidth > data.idata.max_bandwidth)
135 data.idata.max_bandwidth = bandwidth;
136 if (bandwidth < data.idata.min_bandwidth)
137 data.idata.min_bandwidth = bandwidth;
138
11fdf7f2
TL
139 ++data.idata.bandwidth_cycles;
140 double delta = bandwidth - data.idata.avg_bandwidth;
141 data.idata.avg_bandwidth += delta / data.idata.bandwidth_cycles;
142 data.idata.bandwidth_diff_sum += delta * (bandwidth - data.idata.avg_bandwidth);
7c673cae
FG
143 }
144
145 if (cycleSinceChange)
146 iops = (double)(data.finished - previous_writes)
147 / cycleSinceChange;
148 else
149 iops = -1;
150
151 if (!std::isnan(iops) && iops > -1) {
152 if (iops > data.idata.max_iops)
153 data.idata.max_iops = iops;
154 if (iops < data.idata.min_iops)
155 data.idata.min_iops = iops;
156
11fdf7f2
TL
157 ++data.idata.iops_cycles;
158 double delta = iops - data.idata.avg_iops;
159 data.idata.avg_iops += delta / data.idata.iops_cycles;
160 data.idata.iops_diff_sum += delta * (iops - data.idata.avg_iops);
7c673cae
FG
161 }
162
163 if (formatter)
164 formatter->open_object_section("data");
165
11fdf7f2
TL
166 // elapsed will be in seconds, by default
167 std::chrono::duration<double> elapsed = cur_time - data.start_time;
7c673cae 168 double avg_bandwidth = (double) (data.op_size) * (data.finished)
11fdf7f2 169 / elapsed.count() / (1024*1024);
7c673cae
FG
170 if (previous_writes != data.finished) {
171 previous_writes = data.finished;
172 cycleSinceChange = 0;
173 if (!formatter) {
11fdf7f2 174 bencher->out(cout, t)
7c673cae
FG
175 << setfill(' ')
176 << setw(5) << i
177 << ' ' << setw(7) << data.in_flight
178 << ' ' << setw(9) << data.started
179 << ' ' << setw(9) << data.finished
180 << ' ' << setw(9) << avg_bandwidth
181 << ' ' << setw(9) << bandwidth
11fdf7f2 182 << ' ' << setw(11) << (double)data.cur_latency.count()
7c673cae
FG
183 << ' ' << setw(11) << data.avg_latency << std::endl;
184 } else {
185 formatter->dump_format("sec", "%d", i);
186 formatter->dump_format("cur_ops", "%d", data.in_flight);
187 formatter->dump_format("started", "%d", data.started);
188 formatter->dump_format("finished", "%d", data.finished);
189 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
190 formatter->dump_format("cur_bw", "%f", bandwidth);
11fdf7f2 191 formatter->dump_format("last_lat", "%f", (double)data.cur_latency.count());
7c673cae
FG
192 formatter->dump_format("avg_lat", "%f", data.avg_latency);
193 }
194 }
195 else {
196 if (!formatter) {
11fdf7f2 197 bencher->out(cout, t)
7c673cae
FG
198 << setfill(' ')
199 << setw(5) << i
200 << ' ' << setw(7) << data.in_flight
201 << ' ' << setw(9) << data.started
202 << ' ' << setw(9) << data.finished
203 << ' ' << setw(9) << avg_bandwidth
204 << ' ' << setw(9) << '0'
205 << ' ' << setw(11) << '-'
206 << ' '<< setw(11) << data.avg_latency << std::endl;
207 } else {
208 formatter->dump_format("sec", "%d", i);
209 formatter->dump_format("cur_ops", "%d", data.in_flight);
210 formatter->dump_format("started", "%d", data.started);
211 formatter->dump_format("finished", "%d", data.finished);
212 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
213 formatter->dump_format("cur_bw", "%f", 0);
214 formatter->dump_format("last_lat", "%f", 0);
215 formatter->dump_format("avg_lat", "%f", data.avg_latency);
216 }
217 }
218 if (formatter) {
219 formatter->close_section(); // data
220 formatter->flush(*outstream);
221 }
222 ++i;
223 ++cycleSinceChange;
224 cond.WaitInterval(bencher->lock, ONE_SECOND);
225 }
226 if (formatter)
227 formatter->close_section(); //datas
11fdf7f2
TL
228 if (iops < 0) {
229 std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
230 data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
231 }
232 bencher->lock.unlock();
7c673cae
FG
233 return NULL;
234}
235
236int ObjBencher::aio_bench(
237 int operation, int secondsToRun,
238 int concurrentios,
239 uint64_t op_size, uint64_t object_size,
240 unsigned max_objects,
241 bool cleanup, bool hints,
11fdf7f2 242 const std::string& run_name, bool reuse_bench, bool no_verify) {
7c673cae
FG
243
244 if (concurrentios <= 0)
245 return -EINVAL;
246
247 int num_objects = 0;
248 int r = 0;
11fdf7f2
TL
249 int prev_pid = 0;
250 std::chrono::duration<double> timePassed;
7c673cae
FG
251
252 // default metadata object is used if user does not specify one
253 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
254
255 //get data from previous write run, if available
11fdf7f2 256 if (operation != OP_WRITE || reuse_bench) {
7c673cae
FG
257 uint64_t prev_op_size, prev_object_size;
258 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
11fdf7f2 259 &num_objects, &prev_pid);
7c673cae 260 if (r < 0) {
11fdf7f2
TL
261 if (r == -ENOENT) {
262 if (reuse_bench)
263 cerr << "Must write data before using reuse_bench for a write benchmark!" << std::endl;
264 else
265 cerr << "Must write data before running a read benchmark!" << std::endl;
266 }
7c673cae
FG
267 return r;
268 }
269 object_size = prev_object_size;
270 op_size = prev_op_size;
271 }
272
273 char* contentsChars = new char[op_size];
11fdf7f2 274 lock.lock();
7c673cae
FG
275 data.done = false;
276 data.hints = hints;
277 data.object_size = object_size;
278 data.op_size = op_size;
279 data.in_flight = 0;
280 data.started = 0;
281 data.finished = 0;
282 data.min_latency = 9999.0; // this better be higher than initial latency!
283 data.max_latency = 0;
284 data.avg_latency = 0;
11fdf7f2 285 data.latency_diff_sum = 0;
7c673cae 286 data.object_contents = contentsChars;
11fdf7f2 287 lock.unlock();
7c673cae
FG
288
289 //fill in contentsChars deterministically so we can check returns
290 sanitize_object_contents(&data, data.op_size);
291
292 if (formatter)
293 formatter->open_object_section("bench");
294
295 if (OP_WRITE == operation) {
11fdf7f2 296 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects, prev_pid);
7c673cae
FG
297 if (r != 0) goto out;
298 }
299 else if (OP_SEQ_READ == operation) {
11fdf7f2 300 r = seq_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
301 if (r != 0) goto out;
302 }
303 else if (OP_RAND_READ == operation) {
11fdf7f2 304 r = rand_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
7c673cae
FG
305 if (r != 0) goto out;
306 }
307
308 if (OP_WRITE == operation && cleanup) {
309 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
11fdf7f2 310 &num_objects, &prev_pid);
7c673cae
FG
311 if (r < 0) {
312 if (r == -ENOENT)
313 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
314 goto out;
315 }
316
11fdf7f2 317 data.start_time = mono_clock::now();
7c673cae
FG
318 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
319
11fdf7f2 320 r = clean_up(num_objects, prev_pid, concurrentios);
7c673cae
FG
321 if (r != 0) goto out;
322
11fdf7f2
TL
323 timePassed = mono_clock::now() - data.start_time;
324 out(cout) << "Clean up completed and total clean up time :" << timePassed.count() << std::endl;
7c673cae
FG
325
326 // lastrun file
327 r = sync_remove(run_name_meta);
328 if (r != 0) goto out;
329 }
330
331 out:
332 if (formatter) {
333 formatter->close_section(); // bench
334 formatter->flush(*outstream);
335 *outstream << std::endl;
336 }
337 delete[] contentsChars;
338 return r;
339}
340
341struct lock_cond {
342 explicit lock_cond(Mutex *_lock) : lock(_lock) {}
343 Mutex *lock;
344 Cond cond;
345};
346
347void _aio_cb(void *cb, void *arg) {
348 struct lock_cond *lc = (struct lock_cond *)arg;
11fdf7f2 349 lc->lock->lock();
7c673cae 350 lc->cond.Signal();
11fdf7f2 351 lc->lock->unlock();
7c673cae
FG
352}
353
354int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
355 uint64_t *op_size, uint64_t* object_size,
356 int* num_objects, int* prevPid) {
357 int r = 0;
358 bufferlist object_data;
359
360 r = sync_read(metadata_file, object_data,
361 sizeof(int) * 2 + sizeof(size_t) * 2);
362 if (r <= 0) {
363 // treat an empty file as a file that does not exist
364 if (r == 0) {
365 r = -ENOENT;
366 }
367 return r;
368 }
11fdf7f2
TL
369 auto p = object_data.cbegin();
370 decode(*object_size, p);
371 decode(*num_objects, p);
372 decode(*prevPid, p);
7c673cae 373 if (!p.end()) {
11fdf7f2 374 decode(*op_size, p);
7c673cae
FG
375 } else {
376 *op_size = *object_size;
377 }
378
379 return 0;
380}
381
382int ObjBencher::write_bench(int secondsToRun,
383 int concurrentios, const string& run_name_meta,
11fdf7f2 384 unsigned max_objects, int prev_pid) {
7c673cae
FG
385 if (concurrentios <= 0)
386 return -EINVAL;
387
388 if (!formatter) {
389 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
390 << data.op_size << " bytes to objects of size "
391 << data.object_size << " for up to "
392 << secondsToRun << " seconds or "
393 << max_objects << " objects"
394 << std::endl;
395 } else {
396 formatter->dump_format("concurrent_ios", "%d", concurrentios);
397 formatter->dump_format("object_size", "%d", data.object_size);
398 formatter->dump_format("op_size", "%d", data.op_size);
399 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
400 formatter->dump_format("max_objects", "%d", max_objects);
401 }
402 bufferlist* newContents = 0;
403
11fdf7f2 404 std::string prefix = prev_pid ? generate_object_prefix(prev_pid) : generate_object_prefix();
7c673cae
FG
405 if (!formatter)
406 out(cout) << "Object prefix: " << prefix << std::endl;
407 else
408 formatter->dump_string("object_prefix", prefix);
409
410 std::vector<string> name(concurrentios);
411 std::string newName;
11fdf7f2 412 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
413 int r = 0;
414 bufferlist b_write;
415 lock_cond lc(&lock);
11fdf7f2
TL
416 double total_latency = 0;
417 std::vector<mono_time> start_times(concurrentios);
418 mono_time stopTime;
419 std::chrono::duration<double> timePassed;
7c673cae
FG
420
421 unsigned writes_per_object = 1;
422 if (data.op_size)
423 writes_per_object = data.object_size / data.op_size;
424
425 r = completions_init(concurrentios);
426
427 //set up writes so I can start them together
428 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2
TL
429 name[i] = generate_object_name_fast(i / writes_per_object);
430 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
431 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
432 contents[i]->append(data.object_contents, data.op_size);
433 }
434
435 pthread_t print_thread;
436
437 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
438 ceph_pthread_setname(print_thread, "write_stat");
11fdf7f2 439 lock.lock();
7c673cae 440 data.finished = 0;
11fdf7f2
TL
441 data.start_time = mono_clock::now();
442 lock.unlock();
7c673cae 443 for (int i = 0; i<concurrentios; ++i) {
11fdf7f2 444 start_times[i] = mono_clock::now();
7c673cae
FG
445 r = create_completion(i, _aio_cb, (void *)&lc);
446 if (r < 0)
447 goto ERR;
448 r = aio_write(name[i], i, *contents[i], data.op_size,
449 data.op_size * (i % writes_per_object));
11fdf7f2 450 if (r < 0) {
7c673cae
FG
451 goto ERR;
452 }
11fdf7f2 453 lock.lock();
7c673cae
FG
454 ++data.started;
455 ++data.in_flight;
11fdf7f2 456 lock.unlock();
7c673cae
FG
457 }
458
459 //keep on adding new writes as old ones complete until we've passed minimum time
460 int slot;
461 int num_objects;
462
463 //don't need locking for reads because other thread doesn't write
464
11fdf7f2 465 stopTime = data.start_time + std::chrono::seconds(secondsToRun);
7c673cae 466 slot = 0;
11fdf7f2
TL
467 lock.lock();
468 while (secondsToRun && mono_clock::now() < stopTime) {
7c673cae
FG
469 bool found = false;
470 while (1) {
471 int old_slot = slot;
472 do {
473 if (completion_is_done(slot)) {
474 found = true;
475 break;
476 }
477 slot++;
478 if (slot == concurrentios) {
479 slot = 0;
480 }
481 } while (slot != old_slot);
482 if (found)
483 break;
484 lc.cond.Wait(lock);
485 }
11fdf7f2 486 lock.unlock();
7c673cae 487 //create new contents and name on the heap, and fill them
11fdf7f2
TL
488 newName = generate_object_name_fast(data.started / writes_per_object);
489 newContents = contents[slot].get();
7c673cae
FG
490 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
491 // we wrote to buffer, going around internal crc cache, so invalidate it now.
492 newContents->invalidate_crc();
493
494 completion_wait(slot);
11fdf7f2 495 lock.lock();
7c673cae
FG
496 r = completion_ret(slot);
497 if (r != 0) {
11fdf7f2 498 lock.unlock();
7c673cae
FG
499 goto ERR;
500 }
11fdf7f2
TL
501 data.cur_latency = mono_clock::now() - start_times[slot];
502 total_latency += data.cur_latency.count();
503 if( data.cur_latency.count() > data.max_latency)
504 data.max_latency = data.cur_latency.count();
505 if (data.cur_latency.count() < data.min_latency)
506 data.min_latency = data.cur_latency.count();
7c673cae 507 ++data.finished;
11fdf7f2 508 double delta = data.cur_latency.count() - data.avg_latency;
7c673cae 509 data.avg_latency = total_latency / data.finished;
11fdf7f2 510 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
7c673cae 511 --data.in_flight;
11fdf7f2 512 lock.unlock();
7c673cae 513 release_completion(slot);
7c673cae
FG
514
515 //write new stuff to backend
11fdf7f2 516 start_times[slot] = mono_clock::now();
7c673cae
FG
517 r = create_completion(slot, _aio_cb, &lc);
518 if (r < 0)
519 goto ERR;
520 r = aio_write(newName, slot, *newContents, data.op_size,
521 data.op_size * (data.started % writes_per_object));
11fdf7f2 522 if (r < 0) {
7c673cae
FG
523 goto ERR;
524 }
525 name[slot] = newName;
11fdf7f2 526 lock.lock();
7c673cae
FG
527 ++data.started;
528 ++data.in_flight;
11fdf7f2
TL
529 if (data.op_size) {
530 if (max_objects &&
531 data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
532 data.op_size))
533 break;
534 }
7c673cae 535 }
11fdf7f2 536 lock.unlock();
7c673cae
FG
537
538 while (data.finished < data.started) {
539 slot = data.finished % concurrentios;
540 completion_wait(slot);
11fdf7f2 541 lock.lock();
7c673cae
FG
542 r = completion_ret(slot);
543 if (r != 0) {
11fdf7f2 544 lock.unlock();
7c673cae
FG
545 goto ERR;
546 }
11fdf7f2
TL
547 data.cur_latency = mono_clock::now() - start_times[slot];
548 total_latency += data.cur_latency.count();
549 if (data.cur_latency.count() > data.max_latency)
550 data.max_latency = data.cur_latency.count();
551 if (data.cur_latency.count() < data.min_latency)
552 data.min_latency = data.cur_latency.count();
7c673cae 553 ++data.finished;
11fdf7f2 554 double delta = data.cur_latency.count() - data.avg_latency;
7c673cae 555 data.avg_latency = total_latency / data.finished;
11fdf7f2 556 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
7c673cae 557 --data.in_flight;
11fdf7f2 558 lock.unlock();
7c673cae 559 release_completion(slot);
7c673cae
FG
560 }
561
11fdf7f2
TL
562 timePassed = mono_clock::now() - data.start_time;
563 lock.lock();
7c673cae 564 data.done = true;
11fdf7f2 565 lock.unlock();
7c673cae
FG
566
567 pthread_join(print_thread, NULL);
568
569 double bandwidth;
11fdf7f2
TL
570 bandwidth = ((double)data.finished)*((double)data.op_size) /
571 timePassed.count();
7c673cae
FG
572 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
573
11fdf7f2
TL
574 double bandwidth_stddev;
575 double iops_stddev;
576 double latency_stddev;
577 if (data.idata.bandwidth_cycles > 1) {
578 bandwidth_stddev = std::sqrt(data.idata.bandwidth_diff_sum / (data.idata.bandwidth_cycles - 1));
579 } else {
580 bandwidth_stddev = 0;
581 }
582 if (data.idata.iops_cycles > 1) {
583 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
584 } else {
585 iops_stddev = 0;
586 }
587 if (data.finished > 1) {
588 latency_stddev = std::sqrt(data.latency_diff_sum / (data.finished - 1));
589 } else {
590 latency_stddev = 0;
591 }
592
7c673cae 593 if (!formatter) {
11fdf7f2 594 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
595 << "Total writes made: " << data.finished << std::endl
596 << "Write size: " << data.op_size << std::endl
597 << "Object size: " << data.object_size << std::endl
598 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2 599 << "Stddev Bandwidth: " << bandwidth_stddev << std::endl
7c673cae
FG
600 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
601 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
11fdf7f2
TL
602 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
603 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
604 << "Max IOPS: " << data.idata.max_iops << std::endl
605 << "Min IOPS: " << data.idata.min_iops << std::endl
606 << "Average Latency(s): " << data.avg_latency << std::endl
11fdf7f2 607 << "Stddev Latency(s): " << latency_stddev << std::endl
7c673cae
FG
608 << "Max latency(s): " << data.max_latency << std::endl
609 << "Min latency(s): " << data.min_latency << std::endl;
610 } else {
11fdf7f2 611 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
612 formatter->dump_format("total_writes_made", "%d", data.finished);
613 formatter->dump_format("write_size", "%d", data.op_size);
614 formatter->dump_format("object_size", "%d", data.object_size);
615 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2 616 formatter->dump_format("stddev_bandwidth", "%f", bandwidth_stddev);
7c673cae
FG
617 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
618 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
11fdf7f2
TL
619 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
620 formatter->dump_format("stddev_iops", "%d", iops_stddev);
7c673cae
FG
621 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
622 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
623 formatter->dump_format("average_latency", "%f", data.avg_latency);
11fdf7f2 624 formatter->dump_format("stddev_latency", "%f", latency_stddev);
28e407b8 625 formatter->dump_format("max_latency", "%f", data.max_latency);
7c673cae
FG
626 formatter->dump_format("min_latency", "%f", data.min_latency);
627 }
628 //write object size/number data for read benchmarks
11fdf7f2 629 encode(data.object_size, b_write);
7c673cae 630 num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
11fdf7f2
TL
631 encode(num_objects, b_write);
632 encode(prev_pid ? prev_pid : getpid(), b_write);
633 encode(data.op_size, b_write);
7c673cae
FG
634
635 // persist meta-data for further cleanup or read
636 sync_write(run_name_meta, b_write, sizeof(int)*3);
637
638 completions_done();
7c673cae
FG
639
640 return 0;
641
642 ERR:
11fdf7f2 643 lock.lock();
7c673cae 644 data.done = 1;
11fdf7f2 645 lock.unlock();
7c673cae 646 pthread_join(print_thread, NULL);
7c673cae
FG
647 return r;
648}
649
650int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
651 lock_cond lc(&lock);
652
653 if (concurrentios <= 0)
654 return -EINVAL;
655
656 std::vector<string> name(concurrentios);
657 std::string newName;
11fdf7f2 658 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
659 int index[concurrentios];
660 int errors = 0;
7c673cae
FG
661 double total_latency = 0;
662 int r = 0;
11fdf7f2
TL
663 std::vector<mono_time> start_times(concurrentios);
664 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
665 std::chrono::duration<double> timePassed;
7c673cae
FG
666 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
667 //changes will be safe because string length should remain the same
668
11fdf7f2 669 unsigned reads_per_object = 1;
7c673cae 670 if (data.op_size)
11fdf7f2 671 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
672
673 r = completions_init(concurrentios);
674 if (r < 0)
675 return r;
676
677 //set up initial reads
678 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
679 name[i] = generate_object_name_fast(i / reads_per_object, pid);
680 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
681 }
682
11fdf7f2 683 lock.lock();
7c673cae 684 data.finished = 0;
11fdf7f2
TL
685 data.start_time = mono_clock::now();
686 lock.unlock();
7c673cae
FG
687
688 pthread_t print_thread;
689 pthread_create(&print_thread, NULL, status_printer, (void *)this);
690 ceph_pthread_setname(print_thread, "seq_read_stat");
691
11fdf7f2 692 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
693 //start initial reads
694 for (int i = 0; i < concurrentios; ++i) {
695 index[i] = i;
11fdf7f2 696 start_times[i] = mono_clock::now();
7c673cae 697 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
698 r = aio_read(name[i], i, contents[i].get(), data.op_size,
699 data.op_size * (i % reads_per_object));
700 if (r < 0) {
7c673cae
FG
701 cerr << "r = " << r << std::endl;
702 goto ERR;
703 }
11fdf7f2 704 lock.lock();
7c673cae
FG
705 ++data.started;
706 ++data.in_flight;
11fdf7f2 707 lock.unlock();
7c673cae
FG
708 }
709
710 //keep on adding new reads as old ones complete
711 int slot;
712 bufferlist *cur_contents;
713
714 slot = 0;
11fdf7f2 715 while ((seconds_to_run && mono_clock::now() < finish_time) &&
7c673cae 716 num_objects > data.started) {
11fdf7f2 717 lock.lock();
7c673cae
FG
718 int old_slot = slot;
719 bool found = false;
720 while (1) {
721 do {
722 if (completion_is_done(slot)) {
723 found = true;
724 break;
725 }
726 slot++;
727 if (slot == concurrentios) {
728 slot = 0;
729 }
730 } while (slot != old_slot);
731 if (found) {
732 break;
733 }
734 lc.cond.Wait(lock);
735 }
736
737 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 738 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 739
11fdf7f2 740 cur_contents = contents[slot].get();
7c673cae
FG
741 int current_index = index[slot];
742
743 // invalidate internal crc cache
744 cur_contents->invalidate_crc();
745
746 if (!no_verify) {
747 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
748 if ( (cur_contents->length() != data.op_size) ||
749 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
750 cerr << name[slot] << " is not correct!" << std::endl;
751 ++errors;
752 }
753 }
754
11fdf7f2 755 newName = generate_object_name_fast(data.started / reads_per_object, pid);
7c673cae 756 index[slot] = data.started;
11fdf7f2 757 lock.unlock();
7c673cae 758 completion_wait(slot);
11fdf7f2 759 lock.lock();
7c673cae
FG
760 r = completion_ret(slot);
761 if (r < 0) {
762 cerr << "read got " << r << std::endl;
11fdf7f2 763 lock.unlock();
7c673cae
FG
764 goto ERR;
765 }
11fdf7f2
TL
766 total_latency += data.cur_latency.count();
767 if (data.cur_latency.count() > data.max_latency)
768 data.max_latency = data.cur_latency.count();
769 if (data.cur_latency.count() < data.min_latency)
770 data.min_latency = data.cur_latency.count();
7c673cae
FG
771 ++data.finished;
772 data.avg_latency = total_latency / data.finished;
773 --data.in_flight;
11fdf7f2 774 lock.unlock();
7c673cae
FG
775 release_completion(slot);
776
777 //start new read and check data if requested
11fdf7f2 778 start_times[slot] = mono_clock::now();
7c673cae 779 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
780 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
781 data.op_size * (data.started % reads_per_object));
7c673cae
FG
782 if (r < 0) {
783 goto ERR;
784 }
11fdf7f2 785 lock.lock();
7c673cae
FG
786 ++data.started;
787 ++data.in_flight;
11fdf7f2 788 lock.unlock();
7c673cae
FG
789 name[slot] = newName;
790 }
791
792 //wait for final reads to complete
793 while (data.finished < data.started) {
794 slot = data.finished % concurrentios;
795 completion_wait(slot);
11fdf7f2 796 lock.lock();
7c673cae
FG
797 r = completion_ret(slot);
798 if (r < 0) {
799 cerr << "read got " << r << std::endl;
11fdf7f2 800 lock.unlock();
7c673cae
FG
801 goto ERR;
802 }
11fdf7f2
TL
803 data.cur_latency = mono_clock::now() - start_times[slot];
804 total_latency += data.cur_latency.count();
805 if (data.cur_latency.count() > data.max_latency)
806 data.max_latency = data.cur_latency.count();
807 if (data.cur_latency.count() < data.min_latency)
808 data.min_latency = data.cur_latency.count();
7c673cae
FG
809 ++data.finished;
810 data.avg_latency = total_latency / data.finished;
811 --data.in_flight;
812 release_completion(slot);
813 if (!no_verify) {
814 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
11fdf7f2 815 lock.unlock();
7c673cae
FG
816 if ((contents[slot]->length() != data.op_size) ||
817 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
818 cerr << name[slot] << " is not correct!" << std::endl;
819 ++errors;
820 }
821 } else {
11fdf7f2 822 lock.unlock();
7c673cae 823 }
7c673cae
FG
824 }
825
11fdf7f2
TL
826 timePassed = mono_clock::now() - data.start_time;
827 lock.lock();
7c673cae 828 data.done = true;
11fdf7f2 829 lock.unlock();
7c673cae
FG
830
831 pthread_join(print_thread, NULL);
832
833 double bandwidth;
11fdf7f2 834 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 835 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
11fdf7f2
TL
836
837 double iops_stddev;
838 if (data.idata.iops_cycles > 1) {
839 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
840 } else {
841 iops_stddev = 0;
842 }
7c673cae
FG
843
844 if (!formatter) {
11fdf7f2 845 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
846 << "Total reads made: " << data.finished << std::endl
847 << "Read size: " << data.op_size << std::endl
848 << "Object size: " << data.object_size << std::endl
849 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
850 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
851 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
852 << "Max IOPS: " << data.idata.max_iops << std::endl
853 << "Min IOPS: " << data.idata.min_iops << std::endl
854 << "Average Latency(s): " << data.avg_latency << std::endl
855 << "Max latency(s): " << data.max_latency << std::endl
856 << "Min latency(s): " << data.min_latency << std::endl;
857 } else {
11fdf7f2 858 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
859 formatter->dump_format("total_reads_made", "%d", data.finished);
860 formatter->dump_format("read_size", "%d", data.op_size);
861 formatter->dump_format("object_size", "%d", data.object_size);
862 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
863 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
864 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
865 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
866 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
867 formatter->dump_format("average_latency", "%f", data.avg_latency);
868 formatter->dump_format("max_latency", "%f", data.max_latency);
869 formatter->dump_format("min_latency", "%f", data.min_latency);
870 }
871
872 completions_done();
873
874 return (errors > 0 ? -EIO : 0);
875
876 ERR:
11fdf7f2 877 lock.lock();
7c673cae 878 data.done = 1;
11fdf7f2 879 lock.unlock();
7c673cae
FG
880 pthread_join(print_thread, NULL);
881 return r;
882}
883
884int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
885{
886 lock_cond lc(&lock);
887
888 if (concurrentios <= 0)
889 return -EINVAL;
890
891 std::vector<string> name(concurrentios);
892 std::string newName;
11fdf7f2 893 unique_ptr<bufferlist> contents[concurrentios];
7c673cae
FG
894 int index[concurrentios];
895 int errors = 0;
7c673cae 896 int r = 0;
11fdf7f2
TL
897 double total_latency = 0;
898 std::vector<mono_time> start_times(concurrentios);
899 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
900 std::chrono::duration<double> timePassed;
7c673cae
FG
901 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
902 //changes will be safe because string length should remain the same
903
11fdf7f2 904 unsigned reads_per_object = 1;
7c673cae 905 if (data.op_size)
11fdf7f2 906 reads_per_object = data.object_size / data.op_size;
7c673cae
FG
907
908 srand (time(NULL));
909
910 r = completions_init(concurrentios);
911 if (r < 0)
912 return r;
913
914 //set up initial reads
915 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2
TL
916 name[i] = generate_object_name_fast(i / reads_per_object, pid);
917 contents[i] = std::make_unique<bufferlist>();
7c673cae
FG
918 }
919
11fdf7f2 920 lock.lock();
7c673cae 921 data.finished = 0;
11fdf7f2
TL
922 data.start_time = mono_clock::now();
923 lock.unlock();
7c673cae
FG
924
925 pthread_t print_thread;
926 pthread_create(&print_thread, NULL, status_printer, (void *)this);
927 ceph_pthread_setname(print_thread, "rand_read_stat");
928
11fdf7f2 929 mono_time finish_time = data.start_time + time_to_run;
7c673cae
FG
930 //start initial reads
931 for (int i = 0; i < concurrentios; ++i) {
932 index[i] = i;
11fdf7f2 933 start_times[i] = mono_clock::now();
7c673cae 934 create_completion(i, _aio_cb, (void *)&lc);
11fdf7f2
TL
935 r = aio_read(name[i], i, contents[i].get(), data.op_size,
936 data.op_size * (i % reads_per_object));
937 if (r < 0) {
7c673cae
FG
938 cerr << "r = " << r << std::endl;
939 goto ERR;
940 }
11fdf7f2 941 lock.lock();
7c673cae
FG
942 ++data.started;
943 ++data.in_flight;
11fdf7f2 944 lock.unlock();
7c673cae
FG
945 }
946
947 //keep on adding new reads as old ones complete
948 int slot;
949 bufferlist *cur_contents;
950 int rand_id;
951
952 slot = 0;
11fdf7f2
TL
953 while ((seconds_to_run && mono_clock::now() < finish_time)) {
954 lock.lock();
7c673cae
FG
955 int old_slot = slot;
956 bool found = false;
957 while (1) {
958 do {
959 if (completion_is_done(slot)) {
960 found = true;
961 break;
962 }
963 slot++;
964 if (slot == concurrentios) {
965 slot = 0;
966 }
967 } while (slot != old_slot);
968 if (found) {
969 break;
970 }
971 lc.cond.Wait(lock);
972 }
973
974 // calculate latency here, so memcmp doesn't inflate it
11fdf7f2 975 data.cur_latency = mono_clock::now() - start_times[slot];
7c673cae 976
11fdf7f2 977 lock.unlock();
7c673cae
FG
978
979 int current_index = index[slot];
11fdf7f2 980 cur_contents = contents[slot].get();
7c673cae 981 completion_wait(slot);
11fdf7f2 982 lock.lock();
7c673cae
FG
983 r = completion_ret(slot);
984 if (r < 0) {
985 cerr << "read got " << r << std::endl;
11fdf7f2 986 lock.unlock();
7c673cae
FG
987 goto ERR;
988 }
989
11fdf7f2
TL
990 total_latency += data.cur_latency.count();
991 if (data.cur_latency.count() > data.max_latency)
992 data.max_latency = data.cur_latency.count();
993 if (data.cur_latency.count() < data.min_latency)
994 data.min_latency = data.cur_latency.count();
7c673cae
FG
995 ++data.finished;
996 data.avg_latency = total_latency / data.finished;
997 --data.in_flight;
11fdf7f2 998 lock.unlock();
7c673cae
FG
999
1000 if (!no_verify) {
1001 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
1002 if ((cur_contents->length() != data.op_size) ||
1003 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
1004 cerr << name[slot] << " is not correct!" << std::endl;
1005 ++errors;
1006 }
1007 }
1008
1009 rand_id = rand() % num_objects;
11fdf7f2 1010 newName = generate_object_name_fast(rand_id / reads_per_object, pid);
7c673cae
FG
1011 index[slot] = rand_id;
1012 release_completion(slot);
1013
1014 // invalidate internal crc cache
1015 cur_contents->invalidate_crc();
1016
1017 //start new read and check data if requested
11fdf7f2 1018 start_times[slot] = mono_clock::now();
7c673cae 1019 create_completion(slot, _aio_cb, (void *)&lc);
11fdf7f2
TL
1020 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
1021 data.op_size * (rand_id % reads_per_object));
7c673cae
FG
1022 if (r < 0) {
1023 goto ERR;
1024 }
11fdf7f2 1025 lock.lock();
7c673cae
FG
1026 ++data.started;
1027 ++data.in_flight;
11fdf7f2 1028 lock.unlock();
7c673cae
FG
1029 name[slot] = newName;
1030 }
1031
1032
1033 //wait for final reads to complete
1034 while (data.finished < data.started) {
1035 slot = data.finished % concurrentios;
1036 completion_wait(slot);
11fdf7f2 1037 lock.lock();
7c673cae
FG
1038 r = completion_ret(slot);
1039 if (r < 0) {
1040 cerr << "read got " << r << std::endl;
11fdf7f2 1041 lock.unlock();
7c673cae
FG
1042 goto ERR;
1043 }
11fdf7f2
TL
1044 data.cur_latency = mono_clock::now() - start_times[slot];
1045 total_latency += data.cur_latency.count();
1046 if (data.cur_latency.count() > data.max_latency)
1047 data.max_latency = data.cur_latency.count();
1048 if (data.cur_latency.count() < data.min_latency)
1049 data.min_latency = data.cur_latency.count();
7c673cae
FG
1050 ++data.finished;
1051 data.avg_latency = total_latency / data.finished;
1052 --data.in_flight;
1053 release_completion(slot);
1054 if (!no_verify) {
1055 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
11fdf7f2 1056 lock.unlock();
7c673cae
FG
1057 if ((contents[slot]->length() != data.op_size) ||
1058 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
1059 cerr << name[slot] << " is not correct!" << std::endl;
1060 ++errors;
1061 }
1062 } else {
11fdf7f2 1063 lock.unlock();
7c673cae 1064 }
7c673cae
FG
1065 }
1066
11fdf7f2
TL
1067 timePassed = mono_clock::now() - data.start_time;
1068 lock.lock();
7c673cae 1069 data.done = true;
11fdf7f2 1070 lock.unlock();
7c673cae
FG
1071
1072 pthread_join(print_thread, NULL);
1073
1074 double bandwidth;
11fdf7f2 1075 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
7c673cae 1076 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
11fdf7f2
TL
1077
1078 double iops_stddev;
1079 if (data.idata.iops_cycles > 1) {
1080 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
1081 } else {
1082 iops_stddev = 0;
1083 }
7c673cae
FG
1084
1085 if (!formatter) {
11fdf7f2 1086 out(cout) << "Total time run: " << timePassed.count() << std::endl
7c673cae
FG
1087 << "Total reads made: " << data.finished << std::endl
1088 << "Read size: " << data.op_size << std::endl
1089 << "Object size: " << data.object_size << std::endl
1090 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
11fdf7f2
TL
1091 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
1092 << "Stddev IOPS: " << iops_stddev << std::endl
7c673cae
FG
1093 << "Max IOPS: " << data.idata.max_iops << std::endl
1094 << "Min IOPS: " << data.idata.min_iops << std::endl
1095 << "Average Latency(s): " << data.avg_latency << std::endl
1096 << "Max latency(s): " << data.max_latency << std::endl
1097 << "Min latency(s): " << data.min_latency << std::endl;
1098 } else {
11fdf7f2 1099 formatter->dump_format("total_time_run", "%f", timePassed.count());
7c673cae
FG
1100 formatter->dump_format("total_reads_made", "%d", data.finished);
1101 formatter->dump_format("read_size", "%d", data.op_size);
1102 formatter->dump_format("object_size", "%d", data.object_size);
1103 formatter->dump_format("bandwidth", "%f", bandwidth);
11fdf7f2
TL
1104 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
1105 formatter->dump_format("stddev_iops", "%f", iops_stddev);
7c673cae
FG
1106 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1107 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1108 formatter->dump_format("average_latency", "%f", data.avg_latency);
1109 formatter->dump_format("max_latency", "%f", data.max_latency);
1110 formatter->dump_format("min_latency", "%f", data.min_latency);
1111 }
1112 completions_done();
1113
1114 return (errors > 0 ? -EIO : 0);
1115
1116 ERR:
11fdf7f2 1117 lock.lock();
7c673cae 1118 data.done = 1;
11fdf7f2 1119 lock.unlock();
7c673cae
FG
1120 pthread_join(print_thread, NULL);
1121 return r;
1122}
1123
1124int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1125 int r = 0;
1126 uint64_t op_size, object_size;
1127 int num_objects;
1128 int prevPid;
1129
1130 // default meta object if user does not specify one
1131 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1132 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1133
1134 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1135 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1136 return -EINVAL;
1137 }
1138
1139 std::list<Object> unfiltered_objects;
1140 std::set<std::string> meta_namespaces, all_namespaces;
1141
1142 // If caller set all_nspaces this will be searching
1143 // across multiple namespaces.
1144 while (true) {
1145 bool objects_remain = get_objects(&unfiltered_objects, 20);
1146 if (!objects_remain)
1147 break;
1148
1149 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1150 for ( ; i != unfiltered_objects.end(); ++i) {
1151 if (i->first == run_name_meta) {
1152 meta_namespaces.insert(i->second);
1153 }
1154 if (i->first.substr(0, prefix.length()) == prefix) {
1155 all_namespaces.insert(i->second);
1156 }
1157 }
1158 }
1159
1160 std::set<std::string>::const_iterator i = all_namespaces.begin();
1161 for ( ; i != all_namespaces.end(); ++i) {
1162 set_namespace(*i);
1163
1164 // if no metadata file found we should try to do a linear search on the prefix
1165 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1166 int r = clean_up_slow(prefix, concurrentios);
1167 if (r < 0) {
1168 cerr << "clean_up_slow error r= " << r << std::endl;
1169 return r;
1170 }
1171 continue;
1172 }
1173
1174 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
1175 if (r < 0) {
1176 return r;
1177 }
1178
1179 r = clean_up(num_objects, prevPid, concurrentios);
1180 if (r != 0) return r;
1181
1182 r = sync_remove(run_name_meta);
1183 if (r != 0) return r;
1184 }
1185
1186 return 0;
1187}
1188
1189int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1190 lock_cond lc(&lock);
1191
1192 if (concurrentios <= 0)
1193 return -EINVAL;
1194
1195 std::vector<string> name(concurrentios);
1196 std::string newName;
1197 int r = 0;
7c673cae
FG
1198 int slot = 0;
1199
11fdf7f2 1200 lock.lock();
7c673cae
FG
1201 data.done = false;
1202 data.in_flight = 0;
1203 data.started = 0;
1204 data.finished = 0;
11fdf7f2 1205 lock.unlock();
7c673cae
FG
1206
1207 // don't start more completions than files
1208 if (num_objects == 0) {
1209 return 0;
1210 } else if (num_objects < concurrentios) {
1211 concurrentios = num_objects;
1212 }
1213
1214 r = completions_init(concurrentios);
1215 if (r < 0)
1216 return r;
1217
1218 //set up initial removes
1219 for (int i = 0; i < concurrentios; ++i) {
11fdf7f2 1220 name[i] = generate_object_name_fast(i, prevPid);
7c673cae
FG
1221 }
1222
1223 //start initial removes
1224 for (int i = 0; i < concurrentios; ++i) {
1225 create_completion(i, _aio_cb, (void *)&lc);
1226 r = aio_remove(name[i], i);
1227 if (r < 0) { //naughty, doesn't clean up heap
1228 cerr << "r = " << r << std::endl;
1229 goto ERR;
1230 }
11fdf7f2 1231 lock.lock();
7c673cae
FG
1232 ++data.started;
1233 ++data.in_flight;
11fdf7f2 1234 lock.unlock();
7c673cae
FG
1235 }
1236
1237 //keep on adding new removes as old ones complete
1238 while (data.started < num_objects) {
11fdf7f2 1239 lock.lock();
7c673cae
FG
1240 int old_slot = slot;
1241 bool found = false;
1242 while (1) {
1243 do {
1244 if (completion_is_done(slot)) {
1245 found = true;
1246 break;
1247 }
1248 slot++;
1249 if (slot == concurrentios) {
1250 slot = 0;
1251 }
1252 } while (slot != old_slot);
1253 if (found) {
1254 break;
1255 }
1256 lc.cond.Wait(lock);
1257 }
11fdf7f2
TL
1258 lock.unlock();
1259 newName = generate_object_name_fast(data.started, prevPid);
7c673cae 1260 completion_wait(slot);
11fdf7f2 1261 lock.lock();
7c673cae
FG
1262 r = completion_ret(slot);
1263 if (r != 0 && r != -ENOENT) { // file does not exist
1264 cerr << "remove got " << r << std::endl;
11fdf7f2 1265 lock.unlock();
7c673cae
FG
1266 goto ERR;
1267 }
1268 ++data.finished;
1269 --data.in_flight;
11fdf7f2 1270 lock.unlock();
7c673cae
FG
1271 release_completion(slot);
1272
1273 //start new remove and check data if requested
1274 create_completion(slot, _aio_cb, (void *)&lc);
1275 r = aio_remove(newName, slot);
1276 if (r < 0) {
1277 goto ERR;
1278 }
11fdf7f2 1279 lock.lock();
7c673cae
FG
1280 ++data.started;
1281 ++data.in_flight;
11fdf7f2 1282 lock.unlock();
7c673cae
FG
1283 name[slot] = newName;
1284 }
1285
1286 //wait for final removes to complete
1287 while (data.finished < data.started) {
1288 slot = data.finished % concurrentios;
1289 completion_wait(slot);
11fdf7f2 1290 lock.lock();
7c673cae
FG
1291 r = completion_ret(slot);
1292 if (r != 0 && r != -ENOENT) { // file does not exist
1293 cerr << "remove got " << r << std::endl;
11fdf7f2 1294 lock.unlock();
7c673cae
FG
1295 goto ERR;
1296 }
1297 ++data.finished;
1298 --data.in_flight;
1299 release_completion(slot);
11fdf7f2 1300 lock.unlock();
7c673cae
FG
1301 }
1302
11fdf7f2 1303 lock.lock();
7c673cae 1304 data.done = true;
11fdf7f2 1305 lock.unlock();
7c673cae
FG
1306
1307 completions_done();
1308
1309 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1310
1311 return 0;
1312
1313 ERR:
11fdf7f2 1314 lock.lock();
7c673cae 1315 data.done = 1;
11fdf7f2 1316 lock.unlock();
7c673cae
FG
1317 return r;
1318}
1319
1320/**
1321 * Return objects from the datastore which match a prefix.
1322 *
1323 * Clears the list and populates it with any objects which match the
1324 * prefix. The list is guaranteed to have at least one item when the
1325 * function returns true.
1326 *
1327 * @param prefix the prefix to match against
1328 * @param objects [out] return list of objects
1329 * @returns true if there are any objects in the store which match
1330 * the prefix, false if there are no more
1331 */
1332bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1333 std::list<Object> unfiltered_objects;
1334
1335 objects->clear();
1336
1337 while (objects->empty()) {
1338 bool objects_remain = get_objects(&unfiltered_objects, 20);
1339 if (!objects_remain)
1340 return false;
1341
1342 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1343 for ( ; i != unfiltered_objects.end(); ++i) {
1344 if (i->first.substr(0, prefix.length()) == prefix) {
1345 objects->push_back(*i);
1346 }
1347 }
1348 }
1349
1350 return true;
1351}
1352
1353int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1354 lock_cond lc(&lock);
1355
1356 if (concurrentios <= 0)
1357 return -EINVAL;
1358
1359 std::vector<Object> name(concurrentios);
1360 Object newName;
1361 int r = 0;
7c673cae
FG
1362 int slot = 0;
1363 std::list<Object> objects;
1364 bool objects_remain = true;
1365
11fdf7f2 1366 lock.lock();
7c673cae
FG
1367 data.done = false;
1368 data.in_flight = 0;
1369 data.started = 0;
1370 data.finished = 0;
11fdf7f2 1371 lock.unlock();
7c673cae
FG
1372
1373 out(cout) << "Warning: using slow linear search" << std::endl;
1374
1375 r = completions_init(concurrentios);
1376 if (r < 0)
1377 return r;
1378
1379 //set up initial removes
1380 for (int i = 0; i < concurrentios; ++i) {
1381 if (objects.empty()) {
1382 // if there are fewer objects than concurrent ios, don't generate extras
1383 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1384 if (!objects_found) {
1385 concurrentios = i;
1386 objects_remain = false;
1387 break;
1388 }
1389 }
1390
1391 name[i] = objects.front();
1392 objects.pop_front();
1393 }
1394
1395 //start initial removes
1396 for (int i = 0; i < concurrentios; ++i) {
1397 create_completion(i, _aio_cb, (void *)&lc);
1398 set_namespace(name[i].second);
1399 r = aio_remove(name[i].first, i);
1400 if (r < 0) { //naughty, doesn't clean up heap
1401 cerr << "r = " << r << std::endl;
1402 goto ERR;
1403 }
11fdf7f2 1404 lock.lock();
7c673cae
FG
1405 ++data.started;
1406 ++data.in_flight;
11fdf7f2 1407 lock.unlock();
7c673cae
FG
1408 }
1409
1410 //keep on adding new removes as old ones complete
1411 while (objects_remain) {
11fdf7f2 1412 lock.lock();
7c673cae
FG
1413 int old_slot = slot;
1414 bool found = false;
1415 while (1) {
1416 do {
1417 if (completion_is_done(slot)) {
1418 found = true;
1419 break;
1420 }
1421 slot++;
1422 if (slot == concurrentios) {
1423 slot = 0;
1424 }
1425 } while (slot != old_slot);
1426 if (found) {
1427 break;
1428 }
1429 lc.cond.Wait(lock);
1430 }
11fdf7f2 1431 lock.unlock();
7c673cae
FG
1432
1433 // get more objects if necessary
1434 if (objects.empty()) {
1435 objects_remain = more_objects_matching_prefix(prefix, &objects);
1436 // quit if there are no more
1437 if (!objects_remain) {
1438 break;
1439 }
1440 }
1441
1442 // get the next object
1443 newName = objects.front();
1444 objects.pop_front();
1445
1446 completion_wait(slot);
11fdf7f2 1447 lock.lock();
7c673cae
FG
1448 r = completion_ret(slot);
1449 if (r != 0 && r != -ENOENT) { // file does not exist
1450 cerr << "remove got " << r << std::endl;
11fdf7f2 1451 lock.unlock();
7c673cae
FG
1452 goto ERR;
1453 }
1454 ++data.finished;
1455 --data.in_flight;
11fdf7f2 1456 lock.unlock();
7c673cae
FG
1457 release_completion(slot);
1458
1459 //start new remove and check data if requested
1460 create_completion(slot, _aio_cb, (void *)&lc);
1461 set_namespace(newName.second);
1462 r = aio_remove(newName.first, slot);
1463 if (r < 0) {
1464 goto ERR;
1465 }
11fdf7f2 1466 lock.lock();
7c673cae
FG
1467 ++data.started;
1468 ++data.in_flight;
11fdf7f2 1469 lock.unlock();
7c673cae
FG
1470 name[slot] = newName;
1471 }
1472
1473 //wait for final removes to complete
1474 while (data.finished < data.started) {
1475 slot = data.finished % concurrentios;
1476 completion_wait(slot);
11fdf7f2 1477 lock.lock();
7c673cae
FG
1478 r = completion_ret(slot);
1479 if (r != 0 && r != -ENOENT) { // file does not exist
1480 cerr << "remove got " << r << std::endl;
11fdf7f2 1481 lock.unlock();
7c673cae
FG
1482 goto ERR;
1483 }
1484 ++data.finished;
1485 --data.in_flight;
1486 release_completion(slot);
11fdf7f2 1487 lock.unlock();
7c673cae
FG
1488 }
1489
11fdf7f2 1490 lock.lock();
7c673cae 1491 data.done = true;
11fdf7f2 1492 lock.unlock();
7c673cae
FG
1493
1494 completions_done();
1495
1496 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1497
1498 return 0;
1499
1500 ERR:
11fdf7f2 1501 lock.lock();
7c673cae 1502 data.done = 1;
11fdf7f2 1503 lock.unlock();
7c673cae
FG
1504 return -EIO;
1505}