]> git.proxmox.com Git - ceph.git/blob - ceph/src/common/obj_bencher.cc
import new upstream nautilus stable release 14.2.8
[ceph.git] / ceph / src / common / obj_bencher.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18 #include "include/compat.h"
19 #include <pthread.h>
20 #include "common/Cond.h"
21 #include "obj_bencher.h"
22
// Metadata object that records the parameters of the last write run; used
// when the caller does not supply its own run name.
const std::string BENCH_LASTRUN_METADATA{"benchmark_last_metadata"};
// Common prefix of every object created by the benchmark.
const std::string BENCH_PREFIX{"benchmark_data"};
// printf template for object names: <prefix>_<hostname>_<pid>_object<objnum>.
const std::string BENCH_OBJ_NAME{BENCH_PREFIX + "_%s_%d_object%d"};

// Hostname cache; a leading NUL means it has not been filled in yet.
static char cached_hostname[30] = {};
// Pid embedded in object names; 0 means it has not been chosen yet.
int cached_pid{0};
29
30 static std::string generate_object_prefix_nopid() {
31 if (cached_hostname[0] == 0) {
32 gethostname(cached_hostname, sizeof(cached_hostname)-1);
33 cached_hostname[sizeof(cached_hostname)-1] = 0;
34 }
35
36 std::ostringstream oss;
37 oss << BENCH_PREFIX << "_" << cached_hostname;
38 return oss.str();
39 }
40
41 static std::string generate_object_prefix(int pid = 0) {
42 if (pid)
43 cached_pid = pid;
44 else if (!cached_pid)
45 cached_pid = getpid();
46
47 std::ostringstream oss;
48 oss << generate_object_prefix_nopid() << "_" << cached_pid;
49 return oss.str();
50 }
51
52 // this is 8x faster than previous impl based on chained, deduped functions call
53 static std::string generate_object_name_fast(int objnum, int pid = 0)
54 {
55 if (cached_hostname[0] == 0) {
56 gethostname(cached_hostname, sizeof(cached_hostname)-1);
57 cached_hostname[sizeof(cached_hostname)-1] = 0;
58 }
59
60 if (pid)
61 cached_pid = pid;
62 else if (!cached_pid)
63 cached_pid = getpid();
64
65 char name[512];
66 int n = snprintf(&name[0], sizeof(name), BENCH_OBJ_NAME.c_str(), cached_hostname, cached_pid, objnum);
67 ceph_assert(n > 0 && n < (int)sizeof(name));
68 return std::string(&name[0], (size_t)n);
69 }
70
71 static void sanitize_object_contents (bench_data *data, size_t length) {
72 // FIPS zeroization audit 20191115: this memset is not security related.
73 memset(data->object_contents, 'z', length);
74 }
75
76 ostream& ObjBencher::out(ostream& os, utime_t& t)
77 {
78 if (show_time)
79 return t.localtime(os) << " ";
80 else
81 return os;
82 }
83
84 ostream& ObjBencher::out(ostream& os)
85 {
86 utime_t cur_time = ceph_clock_now();
87 return out(os, cur_time);
88 }
89
90 void *ObjBencher::status_printer(void *_bencher) {
91 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
92 bench_data& data = bencher->data;
93 Formatter *formatter = bencher->formatter;
94 ostream *outstream = bencher->outstream;
95 Cond cond;
96 int i = 0;
97 int previous_writes = 0;
98 int cycleSinceChange = 0;
99 double bandwidth;
100 int iops = 0;
101 mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
102 bencher->lock.lock();
103 if (formatter)
104 formatter->open_array_section("datas");
105 while(!data.done) {
106 mono_time cur_time = mono_clock::now();
107 utime_t t = ceph_clock_now();
108
109 if (i % 20 == 0 && !formatter) {
110 if (i > 0)
111 t.localtime(cout)
112 << " min lat: " << data.min_latency
113 << " max lat: " << data.max_latency
114 << " avg lat: " << data.avg_latency << std::endl;
115 //I'm naughty and don't reset the fill
116 bencher->out(cout, t) << setfill(' ')
117 << setw(5) << "sec"
118 << setw(8) << "Cur ops"
119 << setw(10) << "started"
120 << setw(10) << "finished"
121 << setw(10) << "avg MB/s"
122 << setw(10) << "cur MB/s"
123 << setw(12) << "last lat(s)"
124 << setw(12) << "avg lat(s)" << std::endl;
125 }
126 if (cycleSinceChange)
127 bandwidth = (double)(data.finished - previous_writes)
128 * (data.op_size)
129 / (1024*1024)
130 / cycleSinceChange;
131 else
132 bandwidth = -1;
133
134 if (!std::isnan(bandwidth) && bandwidth > -1) {
135 if (bandwidth > data.idata.max_bandwidth)
136 data.idata.max_bandwidth = bandwidth;
137 if (bandwidth < data.idata.min_bandwidth)
138 data.idata.min_bandwidth = bandwidth;
139
140 ++data.idata.bandwidth_cycles;
141 double delta = bandwidth - data.idata.avg_bandwidth;
142 data.idata.avg_bandwidth += delta / data.idata.bandwidth_cycles;
143 data.idata.bandwidth_diff_sum += delta * (bandwidth - data.idata.avg_bandwidth);
144 }
145
146 if (cycleSinceChange)
147 iops = (double)(data.finished - previous_writes)
148 / cycleSinceChange;
149 else
150 iops = -1;
151
152 if (!std::isnan(iops) && iops > -1) {
153 if (iops > data.idata.max_iops)
154 data.idata.max_iops = iops;
155 if (iops < data.idata.min_iops)
156 data.idata.min_iops = iops;
157
158 ++data.idata.iops_cycles;
159 double delta = iops - data.idata.avg_iops;
160 data.idata.avg_iops += delta / data.idata.iops_cycles;
161 data.idata.iops_diff_sum += delta * (iops - data.idata.avg_iops);
162 }
163
164 if (formatter)
165 formatter->open_object_section("data");
166
167 // elapsed will be in seconds, by default
168 std::chrono::duration<double> elapsed = cur_time - data.start_time;
169 double avg_bandwidth = (double) (data.op_size) * (data.finished)
170 / elapsed.count() / (1024*1024);
171 if (previous_writes != data.finished) {
172 previous_writes = data.finished;
173 cycleSinceChange = 0;
174 if (!formatter) {
175 bencher->out(cout, t)
176 << setfill(' ')
177 << setw(5) << i
178 << ' ' << setw(7) << data.in_flight
179 << ' ' << setw(9) << data.started
180 << ' ' << setw(9) << data.finished
181 << ' ' << setw(9) << avg_bandwidth
182 << ' ' << setw(9) << bandwidth
183 << ' ' << setw(11) << (double)data.cur_latency.count()
184 << ' ' << setw(11) << data.avg_latency << std::endl;
185 } else {
186 formatter->dump_format("sec", "%d", i);
187 formatter->dump_format("cur_ops", "%d", data.in_flight);
188 formatter->dump_format("started", "%d", data.started);
189 formatter->dump_format("finished", "%d", data.finished);
190 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
191 formatter->dump_format("cur_bw", "%f", bandwidth);
192 formatter->dump_format("last_lat", "%f", (double)data.cur_latency.count());
193 formatter->dump_format("avg_lat", "%f", data.avg_latency);
194 }
195 }
196 else {
197 if (!formatter) {
198 bencher->out(cout, t)
199 << setfill(' ')
200 << setw(5) << i
201 << ' ' << setw(7) << data.in_flight
202 << ' ' << setw(9) << data.started
203 << ' ' << setw(9) << data.finished
204 << ' ' << setw(9) << avg_bandwidth
205 << ' ' << setw(9) << '0'
206 << ' ' << setw(11) << '-'
207 << ' '<< setw(11) << data.avg_latency << std::endl;
208 } else {
209 formatter->dump_format("sec", "%d", i);
210 formatter->dump_format("cur_ops", "%d", data.in_flight);
211 formatter->dump_format("started", "%d", data.started);
212 formatter->dump_format("finished", "%d", data.finished);
213 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
214 formatter->dump_format("cur_bw", "%f", 0);
215 formatter->dump_format("last_lat", "%f", 0);
216 formatter->dump_format("avg_lat", "%f", data.avg_latency);
217 }
218 }
219 if (formatter) {
220 formatter->close_section(); // data
221 formatter->flush(*outstream);
222 }
223 ++i;
224 ++cycleSinceChange;
225 cond.WaitInterval(bencher->lock, ONE_SECOND);
226 }
227 if (formatter)
228 formatter->close_section(); //datas
229 if (iops < 0) {
230 std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
231 data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
232 }
233 bencher->lock.unlock();
234 return NULL;
235 }
236
237 int ObjBencher::aio_bench(
238 int operation, int secondsToRun,
239 int concurrentios,
240 uint64_t op_size, uint64_t object_size,
241 unsigned max_objects,
242 bool cleanup, bool hints,
243 const std::string& run_name, bool reuse_bench, bool no_verify) {
244
245 if (concurrentios <= 0)
246 return -EINVAL;
247
248 int num_objects = 0;
249 int r = 0;
250 int prev_pid = 0;
251 std::chrono::duration<double> timePassed;
252
253 // default metadata object is used if user does not specify one
254 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
255
256 //get data from previous write run, if available
257 if (operation != OP_WRITE || reuse_bench) {
258 uint64_t prev_op_size, prev_object_size;
259 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
260 &num_objects, &prev_pid);
261 if (r < 0) {
262 if (r == -ENOENT) {
263 if (reuse_bench)
264 cerr << "Must write data before using reuse_bench for a write benchmark!" << std::endl;
265 else
266 cerr << "Must write data before running a read benchmark!" << std::endl;
267 }
268 return r;
269 }
270 object_size = prev_object_size;
271 op_size = prev_op_size;
272 }
273
274 char* contentsChars = new char[op_size];
275 lock.lock();
276 data.done = false;
277 data.hints = hints;
278 data.object_size = object_size;
279 data.op_size = op_size;
280 data.in_flight = 0;
281 data.started = 0;
282 data.finished = 0;
283 data.min_latency = 9999.0; // this better be higher than initial latency!
284 data.max_latency = 0;
285 data.avg_latency = 0;
286 data.latency_diff_sum = 0;
287 data.object_contents = contentsChars;
288 lock.unlock();
289
290 //fill in contentsChars deterministically so we can check returns
291 sanitize_object_contents(&data, data.op_size);
292
293 if (formatter)
294 formatter->open_object_section("bench");
295
296 if (OP_WRITE == operation) {
297 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects, prev_pid);
298 if (r != 0) goto out;
299 }
300 else if (OP_SEQ_READ == operation) {
301 r = seq_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
302 if (r != 0) goto out;
303 }
304 else if (OP_RAND_READ == operation) {
305 r = rand_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
306 if (r != 0) goto out;
307 }
308
309 if (OP_WRITE == operation && cleanup) {
310 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
311 &num_objects, &prev_pid);
312 if (r < 0) {
313 if (r == -ENOENT)
314 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
315 goto out;
316 }
317
318 data.start_time = mono_clock::now();
319 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
320
321 r = clean_up(num_objects, prev_pid, concurrentios);
322 if (r != 0) goto out;
323
324 timePassed = mono_clock::now() - data.start_time;
325 out(cout) << "Clean up completed and total clean up time :" << timePassed.count() << std::endl;
326
327 // lastrun file
328 r = sync_remove(run_name_meta);
329 if (r != 0) goto out;
330 }
331
332 out:
333 if (formatter) {
334 formatter->close_section(); // bench
335 formatter->flush(*outstream);
336 *outstream << std::endl;
337 }
338 delete[] contentsChars;
339 return r;
340 }
341
342 struct lock_cond {
343 explicit lock_cond(Mutex *_lock) : lock(_lock) {}
344 Mutex *lock;
345 Cond cond;
346 };
347
348 void _aio_cb(void *cb, void *arg) {
349 struct lock_cond *lc = (struct lock_cond *)arg;
350 lc->lock->lock();
351 lc->cond.Signal();
352 lc->lock->unlock();
353 }
354
355 int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
356 uint64_t *op_size, uint64_t* object_size,
357 int* num_objects, int* prevPid) {
358 int r = 0;
359 bufferlist object_data;
360
361 r = sync_read(metadata_file, object_data,
362 sizeof(int) * 2 + sizeof(size_t) * 2);
363 if (r <= 0) {
364 // treat an empty file as a file that does not exist
365 if (r == 0) {
366 r = -ENOENT;
367 }
368 return r;
369 }
370 auto p = object_data.cbegin();
371 decode(*object_size, p);
372 decode(*num_objects, p);
373 decode(*prevPid, p);
374 if (!p.end()) {
375 decode(*op_size, p);
376 } else {
377 *op_size = *object_size;
378 }
379
380 return 0;
381 }
382
383 int ObjBencher::write_bench(int secondsToRun,
384 int concurrentios, const string& run_name_meta,
385 unsigned max_objects, int prev_pid) {
386 if (concurrentios <= 0)
387 return -EINVAL;
388
389 if (!formatter) {
390 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
391 << data.op_size << " bytes to objects of size "
392 << data.object_size << " for up to "
393 << secondsToRun << " seconds or "
394 << max_objects << " objects"
395 << std::endl;
396 } else {
397 formatter->dump_format("concurrent_ios", "%d", concurrentios);
398 formatter->dump_format("object_size", "%d", data.object_size);
399 formatter->dump_format("op_size", "%d", data.op_size);
400 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
401 formatter->dump_format("max_objects", "%d", max_objects);
402 }
403 bufferlist* newContents = 0;
404
405 std::string prefix = prev_pid ? generate_object_prefix(prev_pid) : generate_object_prefix();
406 if (!formatter)
407 out(cout) << "Object prefix: " << prefix << std::endl;
408 else
409 formatter->dump_string("object_prefix", prefix);
410
411 std::vector<string> name(concurrentios);
412 std::string newName;
413 unique_ptr<bufferlist> contents[concurrentios];
414 int r = 0;
415 bufferlist b_write;
416 lock_cond lc(&lock);
417 double total_latency = 0;
418 std::vector<mono_time> start_times(concurrentios);
419 mono_time stopTime;
420 std::chrono::duration<double> timePassed;
421
422 unsigned writes_per_object = 1;
423 if (data.op_size)
424 writes_per_object = data.object_size / data.op_size;
425
426 r = completions_init(concurrentios);
427
428 //set up writes so I can start them together
429 for (int i = 0; i<concurrentios; ++i) {
430 name[i] = generate_object_name_fast(i / writes_per_object);
431 contents[i] = std::make_unique<bufferlist>();
432 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
433 contents[i]->append(data.object_contents, data.op_size);
434 }
435
436 pthread_t print_thread;
437
438 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
439 ceph_pthread_setname(print_thread, "write_stat");
440 lock.lock();
441 data.finished = 0;
442 data.start_time = mono_clock::now();
443 lock.unlock();
444 for (int i = 0; i<concurrentios; ++i) {
445 start_times[i] = mono_clock::now();
446 r = create_completion(i, _aio_cb, (void *)&lc);
447 if (r < 0)
448 goto ERR;
449 r = aio_write(name[i], i, *contents[i], data.op_size,
450 data.op_size * (i % writes_per_object));
451 if (r < 0) {
452 goto ERR;
453 }
454 lock.lock();
455 ++data.started;
456 ++data.in_flight;
457 lock.unlock();
458 }
459
460 //keep on adding new writes as old ones complete until we've passed minimum time
461 int slot;
462 int num_objects;
463
464 //don't need locking for reads because other thread doesn't write
465
466 stopTime = data.start_time + std::chrono::seconds(secondsToRun);
467 slot = 0;
468 lock.lock();
469 while (secondsToRun && mono_clock::now() < stopTime) {
470 bool found = false;
471 while (1) {
472 int old_slot = slot;
473 do {
474 if (completion_is_done(slot)) {
475 found = true;
476 break;
477 }
478 slot++;
479 if (slot == concurrentios) {
480 slot = 0;
481 }
482 } while (slot != old_slot);
483 if (found)
484 break;
485 lc.cond.Wait(lock);
486 }
487 lock.unlock();
488 //create new contents and name on the heap, and fill them
489 newName = generate_object_name_fast(data.started / writes_per_object);
490 newContents = contents[slot].get();
491 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
492 // we wrote to buffer, going around internal crc cache, so invalidate it now.
493 newContents->invalidate_crc();
494
495 completion_wait(slot);
496 lock.lock();
497 r = completion_ret(slot);
498 if (r != 0) {
499 lock.unlock();
500 goto ERR;
501 }
502 data.cur_latency = mono_clock::now() - start_times[slot];
503 total_latency += data.cur_latency.count();
504 if( data.cur_latency.count() > data.max_latency)
505 data.max_latency = data.cur_latency.count();
506 if (data.cur_latency.count() < data.min_latency)
507 data.min_latency = data.cur_latency.count();
508 ++data.finished;
509 double delta = data.cur_latency.count() - data.avg_latency;
510 data.avg_latency = total_latency / data.finished;
511 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
512 --data.in_flight;
513 lock.unlock();
514 release_completion(slot);
515
516 //write new stuff to backend
517 start_times[slot] = mono_clock::now();
518 r = create_completion(slot, _aio_cb, &lc);
519 if (r < 0)
520 goto ERR;
521 r = aio_write(newName, slot, *newContents, data.op_size,
522 data.op_size * (data.started % writes_per_object));
523 if (r < 0) {
524 goto ERR;
525 }
526 name[slot] = newName;
527 lock.lock();
528 ++data.started;
529 ++data.in_flight;
530 if (data.op_size) {
531 if (max_objects &&
532 data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
533 data.op_size))
534 break;
535 }
536 }
537 lock.unlock();
538
539 while (data.finished < data.started) {
540 slot = data.finished % concurrentios;
541 completion_wait(slot);
542 lock.lock();
543 r = completion_ret(slot);
544 if (r != 0) {
545 lock.unlock();
546 goto ERR;
547 }
548 data.cur_latency = mono_clock::now() - start_times[slot];
549 total_latency += data.cur_latency.count();
550 if (data.cur_latency.count() > data.max_latency)
551 data.max_latency = data.cur_latency.count();
552 if (data.cur_latency.count() < data.min_latency)
553 data.min_latency = data.cur_latency.count();
554 ++data.finished;
555 double delta = data.cur_latency.count() - data.avg_latency;
556 data.avg_latency = total_latency / data.finished;
557 data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
558 --data.in_flight;
559 lock.unlock();
560 release_completion(slot);
561 }
562
563 timePassed = mono_clock::now() - data.start_time;
564 lock.lock();
565 data.done = true;
566 lock.unlock();
567
568 pthread_join(print_thread, NULL);
569
570 double bandwidth;
571 bandwidth = ((double)data.finished)*((double)data.op_size) /
572 timePassed.count();
573 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
574
575 double bandwidth_stddev;
576 double iops_stddev;
577 double latency_stddev;
578 if (data.idata.bandwidth_cycles > 1) {
579 bandwidth_stddev = std::sqrt(data.idata.bandwidth_diff_sum / (data.idata.bandwidth_cycles - 1));
580 } else {
581 bandwidth_stddev = 0;
582 }
583 if (data.idata.iops_cycles > 1) {
584 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
585 } else {
586 iops_stddev = 0;
587 }
588 if (data.finished > 1) {
589 latency_stddev = std::sqrt(data.latency_diff_sum / (data.finished - 1));
590 } else {
591 latency_stddev = 0;
592 }
593
594 if (!formatter) {
595 out(cout) << "Total time run: " << timePassed.count() << std::endl
596 << "Total writes made: " << data.finished << std::endl
597 << "Write size: " << data.op_size << std::endl
598 << "Object size: " << data.object_size << std::endl
599 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
600 << "Stddev Bandwidth: " << bandwidth_stddev << std::endl
601 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
602 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
603 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
604 << "Stddev IOPS: " << iops_stddev << std::endl
605 << "Max IOPS: " << data.idata.max_iops << std::endl
606 << "Min IOPS: " << data.idata.min_iops << std::endl
607 << "Average Latency(s): " << data.avg_latency << std::endl
608 << "Stddev Latency(s): " << latency_stddev << std::endl
609 << "Max latency(s): " << data.max_latency << std::endl
610 << "Min latency(s): " << data.min_latency << std::endl;
611 } else {
612 formatter->dump_format("total_time_run", "%f", timePassed.count());
613 formatter->dump_format("total_writes_made", "%d", data.finished);
614 formatter->dump_format("write_size", "%d", data.op_size);
615 formatter->dump_format("object_size", "%d", data.object_size);
616 formatter->dump_format("bandwidth", "%f", bandwidth);
617 formatter->dump_format("stddev_bandwidth", "%f", bandwidth_stddev);
618 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
619 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
620 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
621 formatter->dump_format("stddev_iops", "%d", iops_stddev);
622 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
623 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
624 formatter->dump_format("average_latency", "%f", data.avg_latency);
625 formatter->dump_format("stddev_latency", "%f", latency_stddev);
626 formatter->dump_format("max_latency", "%f", data.max_latency);
627 formatter->dump_format("min_latency", "%f", data.min_latency);
628 }
629 //write object size/number data for read benchmarks
630 encode(data.object_size, b_write);
631 num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
632 encode(num_objects, b_write);
633 encode(prev_pid ? prev_pid : getpid(), b_write);
634 encode(data.op_size, b_write);
635
636 // persist meta-data for further cleanup or read
637 sync_write(run_name_meta, b_write, sizeof(int)*3);
638
639 completions_done();
640
641 return 0;
642
643 ERR:
644 lock.lock();
645 data.done = 1;
646 lock.unlock();
647 pthread_join(print_thread, NULL);
648 return r;
649 }
650
651 int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
652 lock_cond lc(&lock);
653
654 if (concurrentios <= 0)
655 return -EINVAL;
656
657 std::vector<string> name(concurrentios);
658 std::string newName;
659 unique_ptr<bufferlist> contents[concurrentios];
660 int index[concurrentios];
661 int errors = 0;
662 double total_latency = 0;
663 int r = 0;
664 std::vector<mono_time> start_times(concurrentios);
665 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
666 std::chrono::duration<double> timePassed;
667 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
668 //changes will be safe because string length should remain the same
669
670 unsigned reads_per_object = 1;
671 if (data.op_size)
672 reads_per_object = data.object_size / data.op_size;
673
674 r = completions_init(concurrentios);
675 if (r < 0)
676 return r;
677
678 //set up initial reads
679 for (int i = 0; i < concurrentios; ++i) {
680 name[i] = generate_object_name_fast(i / reads_per_object, pid);
681 contents[i] = std::make_unique<bufferlist>();
682 }
683
684 lock.lock();
685 data.finished = 0;
686 data.start_time = mono_clock::now();
687 lock.unlock();
688
689 pthread_t print_thread;
690 pthread_create(&print_thread, NULL, status_printer, (void *)this);
691 ceph_pthread_setname(print_thread, "seq_read_stat");
692
693 mono_time finish_time = data.start_time + time_to_run;
694 //start initial reads
695 for (int i = 0; i < concurrentios; ++i) {
696 index[i] = i;
697 start_times[i] = mono_clock::now();
698 create_completion(i, _aio_cb, (void *)&lc);
699 r = aio_read(name[i], i, contents[i].get(), data.op_size,
700 data.op_size * (i % reads_per_object));
701 if (r < 0) {
702 cerr << "r = " << r << std::endl;
703 goto ERR;
704 }
705 lock.lock();
706 ++data.started;
707 ++data.in_flight;
708 lock.unlock();
709 }
710
711 //keep on adding new reads as old ones complete
712 int slot;
713 bufferlist *cur_contents;
714
715 slot = 0;
716 while ((seconds_to_run && mono_clock::now() < finish_time) &&
717 num_objects > data.started) {
718 lock.lock();
719 int old_slot = slot;
720 bool found = false;
721 while (1) {
722 do {
723 if (completion_is_done(slot)) {
724 found = true;
725 break;
726 }
727 slot++;
728 if (slot == concurrentios) {
729 slot = 0;
730 }
731 } while (slot != old_slot);
732 if (found) {
733 break;
734 }
735 lc.cond.Wait(lock);
736 }
737
738 // calculate latency here, so memcmp doesn't inflate it
739 data.cur_latency = mono_clock::now() - start_times[slot];
740
741 cur_contents = contents[slot].get();
742 int current_index = index[slot];
743
744 // invalidate internal crc cache
745 cur_contents->invalidate_crc();
746
747 if (!no_verify) {
748 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
749 if ( (cur_contents->length() != data.op_size) ||
750 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
751 cerr << name[slot] << " is not correct!" << std::endl;
752 ++errors;
753 }
754 }
755
756 newName = generate_object_name_fast(data.started / reads_per_object, pid);
757 index[slot] = data.started;
758 lock.unlock();
759 completion_wait(slot);
760 lock.lock();
761 r = completion_ret(slot);
762 if (r < 0) {
763 cerr << "read got " << r << std::endl;
764 lock.unlock();
765 goto ERR;
766 }
767 total_latency += data.cur_latency.count();
768 if (data.cur_latency.count() > data.max_latency)
769 data.max_latency = data.cur_latency.count();
770 if (data.cur_latency.count() < data.min_latency)
771 data.min_latency = data.cur_latency.count();
772 ++data.finished;
773 data.avg_latency = total_latency / data.finished;
774 --data.in_flight;
775 lock.unlock();
776 release_completion(slot);
777
778 //start new read and check data if requested
779 start_times[slot] = mono_clock::now();
780 create_completion(slot, _aio_cb, (void *)&lc);
781 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
782 data.op_size * (data.started % reads_per_object));
783 if (r < 0) {
784 goto ERR;
785 }
786 lock.lock();
787 ++data.started;
788 ++data.in_flight;
789 lock.unlock();
790 name[slot] = newName;
791 }
792
793 //wait for final reads to complete
794 while (data.finished < data.started) {
795 slot = data.finished % concurrentios;
796 completion_wait(slot);
797 lock.lock();
798 r = completion_ret(slot);
799 if (r < 0) {
800 cerr << "read got " << r << std::endl;
801 lock.unlock();
802 goto ERR;
803 }
804 data.cur_latency = mono_clock::now() - start_times[slot];
805 total_latency += data.cur_latency.count();
806 if (data.cur_latency.count() > data.max_latency)
807 data.max_latency = data.cur_latency.count();
808 if (data.cur_latency.count() < data.min_latency)
809 data.min_latency = data.cur_latency.count();
810 ++data.finished;
811 data.avg_latency = total_latency / data.finished;
812 --data.in_flight;
813 release_completion(slot);
814 if (!no_verify) {
815 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
816 lock.unlock();
817 if ((contents[slot]->length() != data.op_size) ||
818 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
819 cerr << name[slot] << " is not correct!" << std::endl;
820 ++errors;
821 }
822 } else {
823 lock.unlock();
824 }
825 }
826
827 timePassed = mono_clock::now() - data.start_time;
828 lock.lock();
829 data.done = true;
830 lock.unlock();
831
832 pthread_join(print_thread, NULL);
833
834 double bandwidth;
835 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
836 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
837
838 double iops_stddev;
839 if (data.idata.iops_cycles > 1) {
840 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
841 } else {
842 iops_stddev = 0;
843 }
844
845 if (!formatter) {
846 out(cout) << "Total time run: " << timePassed.count() << std::endl
847 << "Total reads made: " << data.finished << std::endl
848 << "Read size: " << data.op_size << std::endl
849 << "Object size: " << data.object_size << std::endl
850 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
851 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
852 << "Stddev IOPS: " << iops_stddev << std::endl
853 << "Max IOPS: " << data.idata.max_iops << std::endl
854 << "Min IOPS: " << data.idata.min_iops << std::endl
855 << "Average Latency(s): " << data.avg_latency << std::endl
856 << "Max latency(s): " << data.max_latency << std::endl
857 << "Min latency(s): " << data.min_latency << std::endl;
858 } else {
859 formatter->dump_format("total_time_run", "%f", timePassed.count());
860 formatter->dump_format("total_reads_made", "%d", data.finished);
861 formatter->dump_format("read_size", "%d", data.op_size);
862 formatter->dump_format("object_size", "%d", data.object_size);
863 formatter->dump_format("bandwidth", "%f", bandwidth);
864 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
865 formatter->dump_format("stddev_iops", "%f", iops_stddev);
866 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
867 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
868 formatter->dump_format("average_latency", "%f", data.avg_latency);
869 formatter->dump_format("max_latency", "%f", data.max_latency);
870 formatter->dump_format("min_latency", "%f", data.min_latency);
871 }
872
873 completions_done();
874
875 return (errors > 0 ? -EIO : 0);
876
877 ERR:
878 lock.lock();
879 data.done = 1;
880 lock.unlock();
881 pthread_join(print_thread, NULL);
882 return r;
883 }
884
885 int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
886 {
887 lock_cond lc(&lock);
888
889 if (concurrentios <= 0)
890 return -EINVAL;
891
892 std::vector<string> name(concurrentios);
893 std::string newName;
894 unique_ptr<bufferlist> contents[concurrentios];
895 int index[concurrentios];
896 int errors = 0;
897 int r = 0;
898 double total_latency = 0;
899 std::vector<mono_time> start_times(concurrentios);
900 mono_clock::duration time_to_run = std::chrono::seconds(seconds_to_run);
901 std::chrono::duration<double> timePassed;
902 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
903 //changes will be safe because string length should remain the same
904
905 unsigned reads_per_object = 1;
906 if (data.op_size)
907 reads_per_object = data.object_size / data.op_size;
908
909 srand (time(NULL));
910
911 r = completions_init(concurrentios);
912 if (r < 0)
913 return r;
914
915 //set up initial reads
916 for (int i = 0; i < concurrentios; ++i) {
917 name[i] = generate_object_name_fast(i / reads_per_object, pid);
918 contents[i] = std::make_unique<bufferlist>();
919 }
920
921 lock.lock();
922 data.finished = 0;
923 data.start_time = mono_clock::now();
924 lock.unlock();
925
926 pthread_t print_thread;
927 pthread_create(&print_thread, NULL, status_printer, (void *)this);
928 ceph_pthread_setname(print_thread, "rand_read_stat");
929
930 mono_time finish_time = data.start_time + time_to_run;
931 //start initial reads
932 for (int i = 0; i < concurrentios; ++i) {
933 index[i] = i;
934 start_times[i] = mono_clock::now();
935 create_completion(i, _aio_cb, (void *)&lc);
936 r = aio_read(name[i], i, contents[i].get(), data.op_size,
937 data.op_size * (i % reads_per_object));
938 if (r < 0) {
939 cerr << "r = " << r << std::endl;
940 goto ERR;
941 }
942 lock.lock();
943 ++data.started;
944 ++data.in_flight;
945 lock.unlock();
946 }
947
948 //keep on adding new reads as old ones complete
949 int slot;
950 bufferlist *cur_contents;
951 int rand_id;
952
953 slot = 0;
954 while ((seconds_to_run && mono_clock::now() < finish_time)) {
955 lock.lock();
956 int old_slot = slot;
957 bool found = false;
958 while (1) {
959 do {
960 if (completion_is_done(slot)) {
961 found = true;
962 break;
963 }
964 slot++;
965 if (slot == concurrentios) {
966 slot = 0;
967 }
968 } while (slot != old_slot);
969 if (found) {
970 break;
971 }
972 lc.cond.Wait(lock);
973 }
974
975 // calculate latency here, so memcmp doesn't inflate it
976 data.cur_latency = mono_clock::now() - start_times[slot];
977
978 lock.unlock();
979
980 int current_index = index[slot];
981 cur_contents = contents[slot].get();
982 completion_wait(slot);
983 lock.lock();
984 r = completion_ret(slot);
985 if (r < 0) {
986 cerr << "read got " << r << std::endl;
987 lock.unlock();
988 goto ERR;
989 }
990
991 total_latency += data.cur_latency.count();
992 if (data.cur_latency.count() > data.max_latency)
993 data.max_latency = data.cur_latency.count();
994 if (data.cur_latency.count() < data.min_latency)
995 data.min_latency = data.cur_latency.count();
996 ++data.finished;
997 data.avg_latency = total_latency / data.finished;
998 --data.in_flight;
999 lock.unlock();
1000
1001 if (!no_verify) {
1002 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
1003 if ((cur_contents->length() != data.op_size) ||
1004 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
1005 cerr << name[slot] << " is not correct!" << std::endl;
1006 ++errors;
1007 }
1008 }
1009
1010 rand_id = rand() % num_objects;
1011 newName = generate_object_name_fast(rand_id / reads_per_object, pid);
1012 index[slot] = rand_id;
1013 release_completion(slot);
1014
1015 // invalidate internal crc cache
1016 cur_contents->invalidate_crc();
1017
1018 //start new read and check data if requested
1019 start_times[slot] = mono_clock::now();
1020 create_completion(slot, _aio_cb, (void *)&lc);
1021 r = aio_read(newName, slot, contents[slot].get(), data.op_size,
1022 data.op_size * (rand_id % reads_per_object));
1023 if (r < 0) {
1024 goto ERR;
1025 }
1026 lock.lock();
1027 ++data.started;
1028 ++data.in_flight;
1029 lock.unlock();
1030 name[slot] = newName;
1031 }
1032
1033
1034 //wait for final reads to complete
1035 while (data.finished < data.started) {
1036 slot = data.finished % concurrentios;
1037 completion_wait(slot);
1038 lock.lock();
1039 r = completion_ret(slot);
1040 if (r < 0) {
1041 cerr << "read got " << r << std::endl;
1042 lock.unlock();
1043 goto ERR;
1044 }
1045 data.cur_latency = mono_clock::now() - start_times[slot];
1046 total_latency += data.cur_latency.count();
1047 if (data.cur_latency.count() > data.max_latency)
1048 data.max_latency = data.cur_latency.count();
1049 if (data.cur_latency.count() < data.min_latency)
1050 data.min_latency = data.cur_latency.count();
1051 ++data.finished;
1052 data.avg_latency = total_latency / data.finished;
1053 --data.in_flight;
1054 release_completion(slot);
1055 if (!no_verify) {
1056 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
1057 lock.unlock();
1058 if ((contents[slot]->length() != data.op_size) ||
1059 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
1060 cerr << name[slot] << " is not correct!" << std::endl;
1061 ++errors;
1062 }
1063 } else {
1064 lock.unlock();
1065 }
1066 }
1067
1068 timePassed = mono_clock::now() - data.start_time;
1069 lock.lock();
1070 data.done = true;
1071 lock.unlock();
1072
1073 pthread_join(print_thread, NULL);
1074
1075 double bandwidth;
1076 bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
1077 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
1078
1079 double iops_stddev;
1080 if (data.idata.iops_cycles > 1) {
1081 iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
1082 } else {
1083 iops_stddev = 0;
1084 }
1085
1086 if (!formatter) {
1087 out(cout) << "Total time run: " << timePassed.count() << std::endl
1088 << "Total reads made: " << data.finished << std::endl
1089 << "Read size: " << data.op_size << std::endl
1090 << "Object size: " << data.object_size << std::endl
1091 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
1092 << "Average IOPS: " << (int)(data.finished/timePassed.count()) << std::endl
1093 << "Stddev IOPS: " << iops_stddev << std::endl
1094 << "Max IOPS: " << data.idata.max_iops << std::endl
1095 << "Min IOPS: " << data.idata.min_iops << std::endl
1096 << "Average Latency(s): " << data.avg_latency << std::endl
1097 << "Max latency(s): " << data.max_latency << std::endl
1098 << "Min latency(s): " << data.min_latency << std::endl;
1099 } else {
1100 formatter->dump_format("total_time_run", "%f", timePassed.count());
1101 formatter->dump_format("total_reads_made", "%d", data.finished);
1102 formatter->dump_format("read_size", "%d", data.op_size);
1103 formatter->dump_format("object_size", "%d", data.object_size);
1104 formatter->dump_format("bandwidth", "%f", bandwidth);
1105 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed.count()));
1106 formatter->dump_format("stddev_iops", "%f", iops_stddev);
1107 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1108 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1109 formatter->dump_format("average_latency", "%f", data.avg_latency);
1110 formatter->dump_format("max_latency", "%f", data.max_latency);
1111 formatter->dump_format("min_latency", "%f", data.min_latency);
1112 }
1113 completions_done();
1114
1115 return (errors > 0 ? -EIO : 0);
1116
1117 ERR:
1118 lock.lock();
1119 data.done = 1;
1120 lock.unlock();
1121 pthread_join(print_thread, NULL);
1122 return r;
1123 }
1124
1125 int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1126 int r = 0;
1127 uint64_t op_size, object_size;
1128 int num_objects;
1129 int prevPid;
1130
1131 // default meta object if user does not specify one
1132 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1133 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1134
1135 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1136 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1137 return -EINVAL;
1138 }
1139
1140 std::list<Object> unfiltered_objects;
1141 std::set<std::string> meta_namespaces, all_namespaces;
1142
1143 // If caller set all_nspaces this will be searching
1144 // across multiple namespaces.
1145 while (true) {
1146 bool objects_remain = get_objects(&unfiltered_objects, 20);
1147 if (!objects_remain)
1148 break;
1149
1150 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1151 for ( ; i != unfiltered_objects.end(); ++i) {
1152 if (i->first == run_name_meta) {
1153 meta_namespaces.insert(i->second);
1154 }
1155 if (i->first.substr(0, prefix.length()) == prefix) {
1156 all_namespaces.insert(i->second);
1157 }
1158 }
1159 }
1160
1161 std::set<std::string>::const_iterator i = all_namespaces.begin();
1162 for ( ; i != all_namespaces.end(); ++i) {
1163 set_namespace(*i);
1164
1165 // if no metadata file found we should try to do a linear search on the prefix
1166 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1167 int r = clean_up_slow(prefix, concurrentios);
1168 if (r < 0) {
1169 cerr << "clean_up_slow error r= " << r << std::endl;
1170 return r;
1171 }
1172 continue;
1173 }
1174
1175 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
1176 if (r < 0) {
1177 return r;
1178 }
1179
1180 r = clean_up(num_objects, prevPid, concurrentios);
1181 if (r != 0) return r;
1182
1183 r = sync_remove(run_name_meta);
1184 if (r != 0) return r;
1185 }
1186
1187 return 0;
1188 }
1189
1190 int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1191 lock_cond lc(&lock);
1192
1193 if (concurrentios <= 0)
1194 return -EINVAL;
1195
1196 std::vector<string> name(concurrentios);
1197 std::string newName;
1198 int r = 0;
1199 int slot = 0;
1200
1201 lock.lock();
1202 data.done = false;
1203 data.in_flight = 0;
1204 data.started = 0;
1205 data.finished = 0;
1206 lock.unlock();
1207
1208 // don't start more completions than files
1209 if (num_objects == 0) {
1210 return 0;
1211 } else if (num_objects < concurrentios) {
1212 concurrentios = num_objects;
1213 }
1214
1215 r = completions_init(concurrentios);
1216 if (r < 0)
1217 return r;
1218
1219 //set up initial removes
1220 for (int i = 0; i < concurrentios; ++i) {
1221 name[i] = generate_object_name_fast(i, prevPid);
1222 }
1223
1224 //start initial removes
1225 for (int i = 0; i < concurrentios; ++i) {
1226 create_completion(i, _aio_cb, (void *)&lc);
1227 r = aio_remove(name[i], i);
1228 if (r < 0) { //naughty, doesn't clean up heap
1229 cerr << "r = " << r << std::endl;
1230 goto ERR;
1231 }
1232 lock.lock();
1233 ++data.started;
1234 ++data.in_flight;
1235 lock.unlock();
1236 }
1237
1238 //keep on adding new removes as old ones complete
1239 while (data.started < num_objects) {
1240 lock.lock();
1241 int old_slot = slot;
1242 bool found = false;
1243 while (1) {
1244 do {
1245 if (completion_is_done(slot)) {
1246 found = true;
1247 break;
1248 }
1249 slot++;
1250 if (slot == concurrentios) {
1251 slot = 0;
1252 }
1253 } while (slot != old_slot);
1254 if (found) {
1255 break;
1256 }
1257 lc.cond.Wait(lock);
1258 }
1259 lock.unlock();
1260 newName = generate_object_name_fast(data.started, prevPid);
1261 completion_wait(slot);
1262 lock.lock();
1263 r = completion_ret(slot);
1264 if (r != 0 && r != -ENOENT) { // file does not exist
1265 cerr << "remove got " << r << std::endl;
1266 lock.unlock();
1267 goto ERR;
1268 }
1269 ++data.finished;
1270 --data.in_flight;
1271 lock.unlock();
1272 release_completion(slot);
1273
1274 //start new remove and check data if requested
1275 create_completion(slot, _aio_cb, (void *)&lc);
1276 r = aio_remove(newName, slot);
1277 if (r < 0) {
1278 goto ERR;
1279 }
1280 lock.lock();
1281 ++data.started;
1282 ++data.in_flight;
1283 lock.unlock();
1284 name[slot] = newName;
1285 }
1286
1287 //wait for final removes to complete
1288 while (data.finished < data.started) {
1289 slot = data.finished % concurrentios;
1290 completion_wait(slot);
1291 lock.lock();
1292 r = completion_ret(slot);
1293 if (r != 0 && r != -ENOENT) { // file does not exist
1294 cerr << "remove got " << r << std::endl;
1295 lock.unlock();
1296 goto ERR;
1297 }
1298 ++data.finished;
1299 --data.in_flight;
1300 release_completion(slot);
1301 lock.unlock();
1302 }
1303
1304 lock.lock();
1305 data.done = true;
1306 lock.unlock();
1307
1308 completions_done();
1309
1310 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1311
1312 return 0;
1313
1314 ERR:
1315 lock.lock();
1316 data.done = 1;
1317 lock.unlock();
1318 return r;
1319 }
1320
1321 /**
1322 * Return objects from the datastore which match a prefix.
1323 *
1324 * Clears the list and populates it with any objects which match the
1325 * prefix. The list is guaranteed to have at least one item when the
1326 * function returns true.
1327 *
1328 * @param prefix the prefix to match against
1329 * @param objects [out] return list of objects
1330 * @returns true if there are any objects in the store which match
1331 * the prefix, false if there are no more
1332 */
1333 bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1334 std::list<Object> unfiltered_objects;
1335
1336 objects->clear();
1337
1338 while (objects->empty()) {
1339 bool objects_remain = get_objects(&unfiltered_objects, 20);
1340 if (!objects_remain)
1341 return false;
1342
1343 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1344 for ( ; i != unfiltered_objects.end(); ++i) {
1345 if (i->first.substr(0, prefix.length()) == prefix) {
1346 objects->push_back(*i);
1347 }
1348 }
1349 }
1350
1351 return true;
1352 }
1353
1354 int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1355 lock_cond lc(&lock);
1356
1357 if (concurrentios <= 0)
1358 return -EINVAL;
1359
1360 std::vector<Object> name(concurrentios);
1361 Object newName;
1362 int r = 0;
1363 int slot = 0;
1364 std::list<Object> objects;
1365 bool objects_remain = true;
1366
1367 lock.lock();
1368 data.done = false;
1369 data.in_flight = 0;
1370 data.started = 0;
1371 data.finished = 0;
1372 lock.unlock();
1373
1374 out(cout) << "Warning: using slow linear search" << std::endl;
1375
1376 r = completions_init(concurrentios);
1377 if (r < 0)
1378 return r;
1379
1380 //set up initial removes
1381 for (int i = 0; i < concurrentios; ++i) {
1382 if (objects.empty()) {
1383 // if there are fewer objects than concurrent ios, don't generate extras
1384 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1385 if (!objects_found) {
1386 concurrentios = i;
1387 objects_remain = false;
1388 break;
1389 }
1390 }
1391
1392 name[i] = objects.front();
1393 objects.pop_front();
1394 }
1395
1396 //start initial removes
1397 for (int i = 0; i < concurrentios; ++i) {
1398 create_completion(i, _aio_cb, (void *)&lc);
1399 set_namespace(name[i].second);
1400 r = aio_remove(name[i].first, i);
1401 if (r < 0) { //naughty, doesn't clean up heap
1402 cerr << "r = " << r << std::endl;
1403 goto ERR;
1404 }
1405 lock.lock();
1406 ++data.started;
1407 ++data.in_flight;
1408 lock.unlock();
1409 }
1410
1411 //keep on adding new removes as old ones complete
1412 while (objects_remain) {
1413 lock.lock();
1414 int old_slot = slot;
1415 bool found = false;
1416 while (1) {
1417 do {
1418 if (completion_is_done(slot)) {
1419 found = true;
1420 break;
1421 }
1422 slot++;
1423 if (slot == concurrentios) {
1424 slot = 0;
1425 }
1426 } while (slot != old_slot);
1427 if (found) {
1428 break;
1429 }
1430 lc.cond.Wait(lock);
1431 }
1432 lock.unlock();
1433
1434 // get more objects if necessary
1435 if (objects.empty()) {
1436 objects_remain = more_objects_matching_prefix(prefix, &objects);
1437 // quit if there are no more
1438 if (!objects_remain) {
1439 break;
1440 }
1441 }
1442
1443 // get the next object
1444 newName = objects.front();
1445 objects.pop_front();
1446
1447 completion_wait(slot);
1448 lock.lock();
1449 r = completion_ret(slot);
1450 if (r != 0 && r != -ENOENT) { // file does not exist
1451 cerr << "remove got " << r << std::endl;
1452 lock.unlock();
1453 goto ERR;
1454 }
1455 ++data.finished;
1456 --data.in_flight;
1457 lock.unlock();
1458 release_completion(slot);
1459
1460 //start new remove and check data if requested
1461 create_completion(slot, _aio_cb, (void *)&lc);
1462 set_namespace(newName.second);
1463 r = aio_remove(newName.first, slot);
1464 if (r < 0) {
1465 goto ERR;
1466 }
1467 lock.lock();
1468 ++data.started;
1469 ++data.in_flight;
1470 lock.unlock();
1471 name[slot] = newName;
1472 }
1473
1474 //wait for final removes to complete
1475 while (data.finished < data.started) {
1476 slot = data.finished % concurrentios;
1477 completion_wait(slot);
1478 lock.lock();
1479 r = completion_ret(slot);
1480 if (r != 0 && r != -ENOENT) { // file does not exist
1481 cerr << "remove got " << r << std::endl;
1482 lock.unlock();
1483 goto ERR;
1484 }
1485 ++data.finished;
1486 --data.in_flight;
1487 release_completion(slot);
1488 lock.unlock();
1489 }
1490
1491 lock.lock();
1492 data.done = true;
1493 lock.unlock();
1494
1495 completions_done();
1496
1497 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1498
1499 return 0;
1500
1501 ERR:
1502 lock.lock();
1503 data.done = 1;
1504 lock.unlock();
1505 return -EIO;
1506 }