]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/obj_bencher.cc
update sources to 12.2.7
[ceph.git] / ceph / src / common / obj_bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18#include "include/compat.h"
19#include <pthread.h>
20#include "common/Cond.h"
21#include "obj_bencher.h"
22
7c673cae
FG
// Metadata object name used by aio_bench when the caller passes an empty run name.
const std::string BENCH_LASTRUN_METADATA{"benchmark_last_metadata"};
// Leading component of every benchmark object name.
const std::string BENCH_PREFIX{"benchmark_data"};
// Hostname cache filled lazily by generate_object_prefix_nopid(); empty until then.
static char cached_hostname[30] = {};
// Pid embedded in object names; 0 until generate_object_prefix() sets it.
int cached_pid{0};
27
28static std::string generate_object_prefix_nopid() {
29 if (cached_hostname[0] == 0) {
30 gethostname(cached_hostname, sizeof(cached_hostname)-1);
31 cached_hostname[sizeof(cached_hostname)-1] = 0;
32 }
33
34 std::ostringstream oss;
35 oss << BENCH_PREFIX << "_" << cached_hostname;
36 return oss.str();
37}
38
39static std::string generate_object_prefix(int pid = 0) {
40 if (pid)
41 cached_pid = pid;
42 else if (!cached_pid)
43 cached_pid = getpid();
44
45 std::ostringstream oss;
46 oss << generate_object_prefix_nopid() << "_" << cached_pid;
47 return oss.str();
48}
49
50static std::string generate_object_name(int objnum, int pid = 0)
51{
52 std::ostringstream oss;
53 oss << generate_object_prefix(pid) << "_object" << objnum;
54 return oss.str();
55}
56
57static void sanitize_object_contents (bench_data *data, size_t length) {
58 memset(data->object_contents, 'z', length);
59}
60
61ostream& ObjBencher::out(ostream& os, utime_t& t)
62{
63 if (show_time)
64 return t.localtime(os) << " ";
65 else
66 return os;
67}
68
69ostream& ObjBencher::out(ostream& os)
70{
71 utime_t cur_time = ceph_clock_now();
72 return out(os, cur_time);
73}
74
75void *ObjBencher::status_printer(void *_bencher) {
76 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
77 bench_data& data = bencher->data;
78 Formatter *formatter = bencher->formatter;
79 ostream *outstream = bencher->outstream;
80 Cond cond;
81 int i = 0;
82 int previous_writes = 0;
83 int cycleSinceChange = 0;
84 double bandwidth;
85 int iops;
86 utime_t ONE_SECOND;
87 ONE_SECOND.set_from_double(1.0);
88 bencher->lock.Lock();
89 if (formatter)
90 formatter->open_array_section("datas");
91 while(!data.done) {
92 utime_t cur_time = ceph_clock_now();
93
94 if (i % 20 == 0 && !formatter) {
95 if (i > 0)
96 cur_time.localtime(cout) << " min lat: " << data.min_latency
97 << " max lat: " << data.max_latency
98 << " avg lat: " << data.avg_latency << std::endl;
99 //I'm naughty and don't reset the fill
100 bencher->out(cout, cur_time) << setfill(' ')
101 << setw(5) << "sec"
102 << setw(8) << "Cur ops"
103 << setw(10) << "started"
104 << setw(10) << "finished"
105 << setw(10) << "avg MB/s"
106 << setw(10) << "cur MB/s"
107 << setw(12) << "last lat(s)"
108 << setw(12) << "avg lat(s)" << std::endl;
109 }
110 if (cycleSinceChange)
111 bandwidth = (double)(data.finished - previous_writes)
112 * (data.op_size)
113 / (1024*1024)
114 / cycleSinceChange;
115 else
116 bandwidth = -1;
117
118 if (!std::isnan(bandwidth) && bandwidth > -1) {
119 if (bandwidth > data.idata.max_bandwidth)
120 data.idata.max_bandwidth = bandwidth;
121 if (bandwidth < data.idata.min_bandwidth)
122 data.idata.min_bandwidth = bandwidth;
123
124 data.history.bandwidth.push_back(bandwidth);
125 }
126
127 if (cycleSinceChange)
128 iops = (double)(data.finished - previous_writes)
129 / cycleSinceChange;
130 else
131 iops = -1;
132
133 if (!std::isnan(iops) && iops > -1) {
134 if (iops > data.idata.max_iops)
135 data.idata.max_iops = iops;
136 if (iops < data.idata.min_iops)
137 data.idata.min_iops = iops;
138
139 data.history.iops.push_back(iops);
140 }
141
142 if (formatter)
143 formatter->open_object_section("data");
144
145 double avg_bandwidth = (double) (data.op_size) * (data.finished)
146 / (double)(cur_time - data.start_time) / (1024*1024);
147 if (previous_writes != data.finished) {
148 previous_writes = data.finished;
149 cycleSinceChange = 0;
150 if (!formatter) {
151 bencher->out(cout, cur_time)
152 << setfill(' ')
153 << setw(5) << i
154 << ' ' << setw(7) << data.in_flight
155 << ' ' << setw(9) << data.started
156 << ' ' << setw(9) << data.finished
157 << ' ' << setw(9) << avg_bandwidth
158 << ' ' << setw(9) << bandwidth
159 << ' ' << setw(11) << (double)data.cur_latency
160 << ' ' << setw(11) << data.avg_latency << std::endl;
161 } else {
162 formatter->dump_format("sec", "%d", i);
163 formatter->dump_format("cur_ops", "%d", data.in_flight);
164 formatter->dump_format("started", "%d", data.started);
165 formatter->dump_format("finished", "%d", data.finished);
166 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
167 formatter->dump_format("cur_bw", "%f", bandwidth);
168 formatter->dump_format("last_lat", "%f", (double)data.cur_latency);
169 formatter->dump_format("avg_lat", "%f", data.avg_latency);
170 }
171 }
172 else {
173 if (!formatter) {
174 bencher->out(cout, cur_time)
175 << setfill(' ')
176 << setw(5) << i
177 << ' ' << setw(7) << data.in_flight
178 << ' ' << setw(9) << data.started
179 << ' ' << setw(9) << data.finished
180 << ' ' << setw(9) << avg_bandwidth
181 << ' ' << setw(9) << '0'
182 << ' ' << setw(11) << '-'
183 << ' '<< setw(11) << data.avg_latency << std::endl;
184 } else {
185 formatter->dump_format("sec", "%d", i);
186 formatter->dump_format("cur_ops", "%d", data.in_flight);
187 formatter->dump_format("started", "%d", data.started);
188 formatter->dump_format("finished", "%d", data.finished);
189 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
190 formatter->dump_format("cur_bw", "%f", 0);
191 formatter->dump_format("last_lat", "%f", 0);
192 formatter->dump_format("avg_lat", "%f", data.avg_latency);
193 }
194 }
195 if (formatter) {
196 formatter->close_section(); // data
197 formatter->flush(*outstream);
198 }
199 ++i;
200 ++cycleSinceChange;
201 cond.WaitInterval(bencher->lock, ONE_SECOND);
202 }
203 if (formatter)
204 formatter->close_section(); //datas
205 bencher->lock.Unlock();
206 return NULL;
207}
208
209int ObjBencher::aio_bench(
210 int operation, int secondsToRun,
211 int concurrentios,
212 uint64_t op_size, uint64_t object_size,
213 unsigned max_objects,
214 bool cleanup, bool hints,
215 const std::string& run_name, bool no_verify) {
216
217 if (concurrentios <= 0)
218 return -EINVAL;
219
220 int num_objects = 0;
221 int r = 0;
222 int prevPid = 0;
223 utime_t runtime;
224
225 // default metadata object is used if user does not specify one
226 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
227
228 //get data from previous write run, if available
229 if (operation != OP_WRITE) {
230 uint64_t prev_op_size, prev_object_size;
231 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
232 &num_objects, &prevPid);
233 if (r < 0) {
234 if (r == -ENOENT)
235 cerr << "Must write data before running a read benchmark!" << std::endl;
236 return r;
237 }
238 object_size = prev_object_size;
239 op_size = prev_op_size;
240 }
241
242 char* contentsChars = new char[op_size];
243 lock.Lock();
244 data.done = false;
245 data.hints = hints;
246 data.object_size = object_size;
247 data.op_size = op_size;
248 data.in_flight = 0;
249 data.started = 0;
250 data.finished = 0;
251 data.min_latency = 9999.0; // this better be higher than initial latency!
252 data.max_latency = 0;
253 data.avg_latency = 0;
254 data.object_contents = contentsChars;
255 lock.Unlock();
256
257 //fill in contentsChars deterministically so we can check returns
258 sanitize_object_contents(&data, data.op_size);
259
260 if (formatter)
261 formatter->open_object_section("bench");
262
263 if (OP_WRITE == operation) {
264 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects);
265 if (r != 0) goto out;
266 }
267 else if (OP_SEQ_READ == operation) {
268 r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
269 if (r != 0) goto out;
270 }
271 else if (OP_RAND_READ == operation) {
272 r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
273 if (r != 0) goto out;
274 }
275
276 if (OP_WRITE == operation && cleanup) {
277 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
278 &num_objects, &prevPid);
279 if (r < 0) {
280 if (r == -ENOENT)
281 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
282 goto out;
283 }
284
285 data.start_time = ceph_clock_now();
286 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
287
288 r = clean_up(num_objects, prevPid, concurrentios);
289 if (r != 0) goto out;
290
291 runtime = ceph_clock_now() - data.start_time;
292 out(cout) << "Clean up completed and total clean up time :" << runtime << std::endl;
293
294 // lastrun file
295 r = sync_remove(run_name_meta);
296 if (r != 0) goto out;
297 }
298
299 out:
300 if (formatter) {
301 formatter->close_section(); // bench
302 formatter->flush(*outstream);
303 *outstream << std::endl;
304 }
305 delete[] contentsChars;
306 return r;
307}
308
309struct lock_cond {
310 explicit lock_cond(Mutex *_lock) : lock(_lock) {}
311 Mutex *lock;
312 Cond cond;
313};
314
315void _aio_cb(void *cb, void *arg) {
316 struct lock_cond *lc = (struct lock_cond *)arg;
317 lc->lock->Lock();
318 lc->cond.Signal();
319 lc->lock->Unlock();
320}
321
/**
 * Sample standard deviation (divisor n - 1) of the values in `v`.
 *
 * @return 0 when `v` holds fewer than two samples. For integral T the mean,
 *         the variance and the result are truncated to T.
 */
template<class T>
static T vec_stddev(const std::vector<T>& v)
{
  if (v.size() < 2)
    return 0;

  // Divide by a T-typed count: dividing a signed T by size_t converts a
  // negative sum to a huge unsigned value.
  const T count = static_cast<T>(v.size());

  T mean = 0;
  for (const T& x : v)
    mean += x;
  mean /= count;

  T sum_sq = 0;
  for (const T& x : v) {
    const T dev = x - mean;
    sum_sq += dev * dev;
  }
  sum_sq /= (count - 1);
  return static_cast<T>(std::sqrt(sum_sq));
}
346
347int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
348 uint64_t *op_size, uint64_t* object_size,
349 int* num_objects, int* prevPid) {
350 int r = 0;
351 bufferlist object_data;
352
353 r = sync_read(metadata_file, object_data,
354 sizeof(int) * 2 + sizeof(size_t) * 2);
355 if (r <= 0) {
356 // treat an empty file as a file that does not exist
357 if (r == 0) {
358 r = -ENOENT;
359 }
360 return r;
361 }
362 bufferlist::iterator p = object_data.begin();
363 ::decode(*object_size, p);
364 ::decode(*num_objects, p);
365 ::decode(*prevPid, p);
366 if (!p.end()) {
367 ::decode(*op_size, p);
368 } else {
369 *op_size = *object_size;
370 }
371
372 return 0;
373}
374
375int ObjBencher::write_bench(int secondsToRun,
376 int concurrentios, const string& run_name_meta,
377 unsigned max_objects) {
378 if (concurrentios <= 0)
379 return -EINVAL;
380
381 if (!formatter) {
382 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
383 << data.op_size << " bytes to objects of size "
384 << data.object_size << " for up to "
385 << secondsToRun << " seconds or "
386 << max_objects << " objects"
387 << std::endl;
388 } else {
389 formatter->dump_format("concurrent_ios", "%d", concurrentios);
390 formatter->dump_format("object_size", "%d", data.object_size);
391 formatter->dump_format("op_size", "%d", data.op_size);
392 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
393 formatter->dump_format("max_objects", "%d", max_objects);
394 }
395 bufferlist* newContents = 0;
396
397 std::string prefix = generate_object_prefix();
398 if (!formatter)
399 out(cout) << "Object prefix: " << prefix << std::endl;
400 else
401 formatter->dump_string("object_prefix", prefix);
402
403 std::vector<string> name(concurrentios);
404 std::string newName;
405 bufferlist* contents[concurrentios];
406 double total_latency = 0;
407 std::vector<utime_t> start_times(concurrentios);
408 utime_t stopTime;
409 int r = 0;
410 bufferlist b_write;
411 lock_cond lc(&lock);
412 utime_t runtime;
413 utime_t timePassed;
414
415 unsigned writes_per_object = 1;
416 if (data.op_size)
417 writes_per_object = data.object_size / data.op_size;
418
419 r = completions_init(concurrentios);
420
421 //set up writes so I can start them together
422 for (int i = 0; i<concurrentios; ++i) {
423 name[i] = generate_object_name(i / writes_per_object);
424 contents[i] = new bufferlist();
425 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
426 contents[i]->append(data.object_contents, data.op_size);
427 }
428
429 pthread_t print_thread;
430
431 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
432 ceph_pthread_setname(print_thread, "write_stat");
433 lock.Lock();
434 data.finished = 0;
435 data.start_time = ceph_clock_now();
436 lock.Unlock();
437 for (int i = 0; i<concurrentios; ++i) {
438 start_times[i] = ceph_clock_now();
439 r = create_completion(i, _aio_cb, (void *)&lc);
440 if (r < 0)
441 goto ERR;
442 r = aio_write(name[i], i, *contents[i], data.op_size,
443 data.op_size * (i % writes_per_object));
444 if (r < 0) { //naughty, doesn't clean up heap
445 goto ERR;
446 }
447 lock.Lock();
448 ++data.started;
449 ++data.in_flight;
450 lock.Unlock();
451 }
452
453 //keep on adding new writes as old ones complete until we've passed minimum time
454 int slot;
455 int num_objects;
456
457 //don't need locking for reads because other thread doesn't write
458
459 runtime.set_from_double(secondsToRun);
460 stopTime = data.start_time + runtime;
461 slot = 0;
462 lock.Lock();
463 while (!secondsToRun || ceph_clock_now() < stopTime) {
464 bool found = false;
465 while (1) {
466 int old_slot = slot;
467 do {
468 if (completion_is_done(slot)) {
469 found = true;
470 break;
471 }
472 slot++;
473 if (slot == concurrentios) {
474 slot = 0;
475 }
476 } while (slot != old_slot);
477 if (found)
478 break;
479 lc.cond.Wait(lock);
480 }
481 lock.Unlock();
482 //create new contents and name on the heap, and fill them
483 newName = generate_object_name(data.started / writes_per_object);
484 newContents = contents[slot];
485 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
486 // we wrote to buffer, going around internal crc cache, so invalidate it now.
487 newContents->invalidate_crc();
488
489 completion_wait(slot);
490 lock.Lock();
491 r = completion_ret(slot);
492 if (r != 0) {
493 lock.Unlock();
494 goto ERR;
495 }
496 data.cur_latency = ceph_clock_now() - start_times[slot];
497 data.history.latency.push_back(data.cur_latency);
498 total_latency += data.cur_latency;
499 if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
500 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
501 ++data.finished;
502 data.avg_latency = total_latency / data.finished;
503 --data.in_flight;
504 lock.Unlock();
505 release_completion(slot);
506 timePassed = ceph_clock_now() - data.start_time;
507
508 //write new stuff to backend
509 start_times[slot] = ceph_clock_now();
510 r = create_completion(slot, _aio_cb, &lc);
511 if (r < 0)
512 goto ERR;
513 r = aio_write(newName, slot, *newContents, data.op_size,
514 data.op_size * (data.started % writes_per_object));
515 if (r < 0) {//naughty; doesn't clean up heap space.
516 goto ERR;
517 }
518 name[slot] = newName;
519 lock.Lock();
520 ++data.started;
521 ++data.in_flight;
522 if (max_objects &&
523 data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
524 data.op_size))
525 break;
526 }
527 lock.Unlock();
528
529 while (data.finished < data.started) {
530 slot = data.finished % concurrentios;
531 completion_wait(slot);
532 lock.Lock();
533 r = completion_ret(slot);
534 if (r != 0) {
535 lock.Unlock();
536 goto ERR;
537 }
538 data.cur_latency = ceph_clock_now() - start_times[slot];
539 data.history.latency.push_back(data.cur_latency);
540 total_latency += data.cur_latency;
541 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
542 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
543 ++data.finished;
544 data.avg_latency = total_latency / data.finished;
545 --data.in_flight;
546 lock.Unlock();
547 release_completion(slot);
548 delete contents[slot];
549 contents[slot] = 0;
550 }
551
552 timePassed = ceph_clock_now() - data.start_time;
553 lock.Lock();
554 data.done = true;
555 lock.Unlock();
556
557 pthread_join(print_thread, NULL);
558
559 double bandwidth;
560 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)timePassed;
561 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
562
563 if (!formatter) {
564 out(cout) << "Total time run: " << timePassed << std::endl
565 << "Total writes made: " << data.finished << std::endl
566 << "Write size: " << data.op_size << std::endl
567 << "Object size: " << data.object_size << std::endl
568 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
569 << "Stddev Bandwidth: " << vec_stddev(data.history.bandwidth) << std::endl
570 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
571 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
572 << "Average IOPS: " << (int)(data.finished/timePassed) << std::endl
573 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
574 << "Max IOPS: " << data.idata.max_iops << std::endl
575 << "Min IOPS: " << data.idata.min_iops << std::endl
576 << "Average Latency(s): " << data.avg_latency << std::endl
577 << "Stddev Latency(s): " << vec_stddev(data.history.latency) << std::endl
578 << "Max latency(s): " << data.max_latency << std::endl
579 << "Min latency(s): " << data.min_latency << std::endl;
580 } else {
581 formatter->dump_format("total_time_run", "%f", (double)timePassed);
582 formatter->dump_format("total_writes_made", "%d", data.finished);
583 formatter->dump_format("write_size", "%d", data.op_size);
584 formatter->dump_format("object_size", "%d", data.object_size);
585 formatter->dump_format("bandwidth", "%f", bandwidth);
586 formatter->dump_format("stddev_bandwidth", "%f", vec_stddev(data.history.bandwidth));
587 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
588 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
589 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed));
590 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
591 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
592 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
593 formatter->dump_format("average_latency", "%f", data.avg_latency);
594 formatter->dump_format("stddev_latency", "%f", vec_stddev(data.history.latency));
28e407b8 595 formatter->dump_format("max_latency", "%f", data.max_latency);
7c673cae
FG
596 formatter->dump_format("min_latency", "%f", data.min_latency);
597 }
598 //write object size/number data for read benchmarks
599 ::encode(data.object_size, b_write);
600 num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
601 ::encode(num_objects, b_write);
602 ::encode(getpid(), b_write);
603 ::encode(data.op_size, b_write);
604
605 // persist meta-data for further cleanup or read
606 sync_write(run_name_meta, b_write, sizeof(int)*3);
607
608 completions_done();
609 for (int i = 0; i < concurrentios; i++)
610 if (contents[i])
611 delete contents[i];
612
613 return 0;
614
615 ERR:
616 lock.Lock();
617 data.done = 1;
618 lock.Unlock();
619 pthread_join(print_thread, NULL);
620 for (int i = 0; i < concurrentios; i++)
621 if (contents[i])
622 delete contents[i];
623 return r;
624}
625
626int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
627 lock_cond lc(&lock);
628
629 if (concurrentios <= 0)
630 return -EINVAL;
631
632 std::vector<string> name(concurrentios);
633 std::string newName;
634 bufferlist* contents[concurrentios];
635 int index[concurrentios];
636 int errors = 0;
637 utime_t start_time;
638 std::vector<utime_t> start_times(concurrentios);
639 utime_t time_to_run;
640 time_to_run.set_from_double(seconds_to_run);
641 double total_latency = 0;
642 int r = 0;
643 utime_t runtime;
644 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
645 //changes will be safe because string length should remain the same
646
647 unsigned writes_per_object = 1;
648 if (data.op_size)
649 writes_per_object = data.object_size / data.op_size;
650
651 r = completions_init(concurrentios);
652 if (r < 0)
653 return r;
654
655 //set up initial reads
656 for (int i = 0; i < concurrentios; ++i) {
657 name[i] = generate_object_name(i / writes_per_object, pid);
658 contents[i] = new bufferlist();
659 }
660
661 lock.Lock();
662 data.finished = 0;
663 data.start_time = ceph_clock_now();
664 lock.Unlock();
665
666 pthread_t print_thread;
667 pthread_create(&print_thread, NULL, status_printer, (void *)this);
668 ceph_pthread_setname(print_thread, "seq_read_stat");
669
670 utime_t finish_time = data.start_time + time_to_run;
671 //start initial reads
672 for (int i = 0; i < concurrentios; ++i) {
673 index[i] = i;
674 start_times[i] = ceph_clock_now();
675 create_completion(i, _aio_cb, (void *)&lc);
676 r = aio_read(name[i], i, contents[i], data.op_size,
677 data.op_size * (i % writes_per_object));
678 if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
679 cerr << "r = " << r << std::endl;
680 goto ERR;
681 }
682 lock.Lock();
683 ++data.started;
684 ++data.in_flight;
685 lock.Unlock();
686 }
687
688 //keep on adding new reads as old ones complete
689 int slot;
690 bufferlist *cur_contents;
691
692 slot = 0;
693 while ((!seconds_to_run || ceph_clock_now() < finish_time) &&
694 num_objects > data.started) {
695 lock.Lock();
696 int old_slot = slot;
697 bool found = false;
698 while (1) {
699 do {
700 if (completion_is_done(slot)) {
701 found = true;
702 break;
703 }
704 slot++;
705 if (slot == concurrentios) {
706 slot = 0;
707 }
708 } while (slot != old_slot);
709 if (found) {
710 break;
711 }
712 lc.cond.Wait(lock);
713 }
714
715 // calculate latency here, so memcmp doesn't inflate it
716 data.cur_latency = ceph_clock_now() - start_times[slot];
717
718 cur_contents = contents[slot];
719 int current_index = index[slot];
720
721 // invalidate internal crc cache
722 cur_contents->invalidate_crc();
723
724 if (!no_verify) {
725 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
726 if ( (cur_contents->length() != data.op_size) ||
727 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
728 cerr << name[slot] << " is not correct!" << std::endl;
729 ++errors;
730 }
731 }
732
733 newName = generate_object_name(data.started / writes_per_object, pid);
734 index[slot] = data.started;
735 lock.Unlock();
736 completion_wait(slot);
737 lock.Lock();
738 r = completion_ret(slot);
739 if (r < 0) {
740 cerr << "read got " << r << std::endl;
741 lock.Unlock();
742 goto ERR;
743 }
744 total_latency += data.cur_latency;
745 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
746 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
747 ++data.finished;
748 data.avg_latency = total_latency / data.finished;
749 --data.in_flight;
750 lock.Unlock();
751 release_completion(slot);
752
753 //start new read and check data if requested
754 start_times[slot] = ceph_clock_now();
755 create_completion(slot, _aio_cb, (void *)&lc);
756 r = aio_read(newName, slot, contents[slot], data.op_size,
757 data.op_size * (data.started % writes_per_object));
758 if (r < 0) {
759 goto ERR;
760 }
761 lock.Lock();
762 ++data.started;
763 ++data.in_flight;
764 lock.Unlock();
765 name[slot] = newName;
766 }
767
768 //wait for final reads to complete
769 while (data.finished < data.started) {
770 slot = data.finished % concurrentios;
771 completion_wait(slot);
772 lock.Lock();
773 r = completion_ret(slot);
774 if (r < 0) {
775 cerr << "read got " << r << std::endl;
776 lock.Unlock();
777 goto ERR;
778 }
779 data.cur_latency = ceph_clock_now() - start_times[slot];
780 total_latency += data.cur_latency;
781 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
782 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
783 ++data.finished;
784 data.avg_latency = total_latency / data.finished;
785 --data.in_flight;
786 release_completion(slot);
787 if (!no_verify) {
788 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
789 lock.Unlock();
790 if ((contents[slot]->length() != data.op_size) ||
791 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
792 cerr << name[slot] << " is not correct!" << std::endl;
793 ++errors;
794 }
795 } else {
796 lock.Unlock();
797 }
798 delete contents[slot];
799 }
800
801 runtime = ceph_clock_now() - data.start_time;
802 lock.Lock();
803 data.done = true;
804 lock.Unlock();
805
806 pthread_join(print_thread, NULL);
807
808 double bandwidth;
809 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
810 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
811
812 if (!formatter) {
813 out(cout) << "Total time run: " << runtime << std::endl
814 << "Total reads made: " << data.finished << std::endl
815 << "Read size: " << data.op_size << std::endl
816 << "Object size: " << data.object_size << std::endl
817 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
818 << "Average IOPS: " << (int)(data.finished/runtime) << std::endl
819 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
820 << "Max IOPS: " << data.idata.max_iops << std::endl
821 << "Min IOPS: " << data.idata.min_iops << std::endl
822 << "Average Latency(s): " << data.avg_latency << std::endl
823 << "Max latency(s): " << data.max_latency << std::endl
824 << "Min latency(s): " << data.min_latency << std::endl;
825 } else {
826 formatter->dump_format("total_time_run", "%f", (double)runtime);
827 formatter->dump_format("total_reads_made", "%d", data.finished);
828 formatter->dump_format("read_size", "%d", data.op_size);
829 formatter->dump_format("object_size", "%d", data.object_size);
830 formatter->dump_format("bandwidth", "%f", bandwidth);
831 formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
832 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
833 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
834 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
835 formatter->dump_format("average_latency", "%f", data.avg_latency);
836 formatter->dump_format("max_latency", "%f", data.max_latency);
837 formatter->dump_format("min_latency", "%f", data.min_latency);
838 }
839
840 completions_done();
841
842 return (errors > 0 ? -EIO : 0);
843
844 ERR:
845 lock.Lock();
846 data.done = 1;
847 lock.Unlock();
848 pthread_join(print_thread, NULL);
849 return r;
850}
851
852int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
853{
854 lock_cond lc(&lock);
855
856 if (concurrentios <= 0)
857 return -EINVAL;
858
859 std::vector<string> name(concurrentios);
860 std::string newName;
861 bufferlist* contents[concurrentios];
862 int index[concurrentios];
863 int errors = 0;
864 utime_t start_time;
865 std::vector<utime_t> start_times(concurrentios);
866 utime_t time_to_run;
867 time_to_run.set_from_double(seconds_to_run);
868 double total_latency = 0;
869 int r = 0;
870 utime_t runtime;
871 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
872 //changes will be safe because string length should remain the same
873
874 unsigned writes_per_object = 1;
875 if (data.op_size)
876 writes_per_object = data.object_size / data.op_size;
877
878 srand (time(NULL));
879
880 r = completions_init(concurrentios);
881 if (r < 0)
882 return r;
883
884 //set up initial reads
885 for (int i = 0; i < concurrentios; ++i) {
886 name[i] = generate_object_name(i / writes_per_object, pid);
887 contents[i] = new bufferlist();
888 }
889
890 lock.Lock();
891 data.finished = 0;
892 data.start_time = ceph_clock_now();
893 lock.Unlock();
894
895 pthread_t print_thread;
896 pthread_create(&print_thread, NULL, status_printer, (void *)this);
897 ceph_pthread_setname(print_thread, "rand_read_stat");
898
899 utime_t finish_time = data.start_time + time_to_run;
900 //start initial reads
901 for (int i = 0; i < concurrentios; ++i) {
902 index[i] = i;
903 start_times[i] = ceph_clock_now();
904 create_completion(i, _aio_cb, (void *)&lc);
905 r = aio_read(name[i], i, contents[i], data.op_size,
906 data.op_size * (i % writes_per_object));
907 if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
908 cerr << "r = " << r << std::endl;
909 goto ERR;
910 }
911 lock.Lock();
912 ++data.started;
913 ++data.in_flight;
914 lock.Unlock();
915 }
916
917 //keep on adding new reads as old ones complete
918 int slot;
919 bufferlist *cur_contents;
920 int rand_id;
921
922 slot = 0;
923 while ((!seconds_to_run || ceph_clock_now() < finish_time)) {
924 lock.Lock();
925 int old_slot = slot;
926 bool found = false;
927 while (1) {
928 do {
929 if (completion_is_done(slot)) {
930 found = true;
931 break;
932 }
933 slot++;
934 if (slot == concurrentios) {
935 slot = 0;
936 }
937 } while (slot != old_slot);
938 if (found) {
939 break;
940 }
941 lc.cond.Wait(lock);
942 }
943
944 // calculate latency here, so memcmp doesn't inflate it
945 data.cur_latency = ceph_clock_now() - start_times[slot];
946
947 lock.Unlock();
948
949 int current_index = index[slot];
950 cur_contents = contents[slot];
951 completion_wait(slot);
952 lock.Lock();
953 r = completion_ret(slot);
954 if (r < 0) {
955 cerr << "read got " << r << std::endl;
956 lock.Unlock();
957 goto ERR;
958 }
959
960 total_latency += data.cur_latency;
961 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
962 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
963 ++data.finished;
964 data.avg_latency = total_latency / data.finished;
965 --data.in_flight;
966 lock.Unlock();
967
968 if (!no_verify) {
969 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
970 if ((cur_contents->length() != data.op_size) ||
971 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
972 cerr << name[slot] << " is not correct!" << std::endl;
973 ++errors;
974 }
975 }
976
977 rand_id = rand() % num_objects;
978 newName = generate_object_name(rand_id / writes_per_object, pid);
979 index[slot] = rand_id;
980 release_completion(slot);
981
982 // invalidate internal crc cache
983 cur_contents->invalidate_crc();
984
985 //start new read and check data if requested
986 start_times[slot] = ceph_clock_now();
987 create_completion(slot, _aio_cb, (void *)&lc);
988 r = aio_read(newName, slot, contents[slot], data.op_size,
989 data.op_size * (rand_id % writes_per_object));
990 if (r < 0) {
991 goto ERR;
992 }
993 lock.Lock();
994 ++data.started;
995 ++data.in_flight;
996 lock.Unlock();
997 name[slot] = newName;
998 }
999
1000
1001 //wait for final reads to complete
1002 while (data.finished < data.started) {
1003 slot = data.finished % concurrentios;
1004 completion_wait(slot);
1005 lock.Lock();
1006 r = completion_ret(slot);
1007 if (r < 0) {
1008 cerr << "read got " << r << std::endl;
1009 lock.Unlock();
1010 goto ERR;
1011 }
1012 data.cur_latency = ceph_clock_now() - start_times[slot];
1013 total_latency += data.cur_latency;
1014 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
1015 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
1016 ++data.finished;
1017 data.avg_latency = total_latency / data.finished;
1018 --data.in_flight;
1019 release_completion(slot);
1020 if (!no_verify) {
1021 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
1022 lock.Unlock();
1023 if ((contents[slot]->length() != data.op_size) ||
1024 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
1025 cerr << name[slot] << " is not correct!" << std::endl;
1026 ++errors;
1027 }
1028 } else {
1029 lock.Unlock();
1030 }
1031 delete contents[slot];
1032 }
1033
1034 runtime = ceph_clock_now() - data.start_time;
1035 lock.Lock();
1036 data.done = true;
1037 lock.Unlock();
1038
1039 pthread_join(print_thread, NULL);
1040
1041 double bandwidth;
1042 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
1043 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
1044
1045 if (!formatter) {
1046 out(cout) << "Total time run: " << runtime << std::endl
1047 << "Total reads made: " << data.finished << std::endl
1048 << "Read size: " << data.op_size << std::endl
1049 << "Object size: " << data.object_size << std::endl
1050 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
1051 << "Average IOPS: " << (int)(data.finished/runtime) << std::endl
1052 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
1053 << "Max IOPS: " << data.idata.max_iops << std::endl
1054 << "Min IOPS: " << data.idata.min_iops << std::endl
1055 << "Average Latency(s): " << data.avg_latency << std::endl
1056 << "Max latency(s): " << data.max_latency << std::endl
1057 << "Min latency(s): " << data.min_latency << std::endl;
1058 } else {
1059 formatter->dump_format("total_time_run", "%f", (double)runtime);
1060 formatter->dump_format("total_reads_made", "%d", data.finished);
1061 formatter->dump_format("read_size", "%d", data.op_size);
1062 formatter->dump_format("object_size", "%d", data.object_size);
1063 formatter->dump_format("bandwidth", "%f", bandwidth);
1064 formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
1065 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
1066 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1067 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1068 formatter->dump_format("average_latency", "%f", data.avg_latency);
1069 formatter->dump_format("max_latency", "%f", data.max_latency);
1070 formatter->dump_format("min_latency", "%f", data.min_latency);
1071 }
1072 completions_done();
1073
1074 return (errors > 0 ? -EIO : 0);
1075
1076 ERR:
1077 lock.Lock();
1078 data.done = 1;
1079 lock.Unlock();
1080 pthread_join(print_thread, NULL);
1081 return r;
1082}
1083
1084int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1085 int r = 0;
1086 uint64_t op_size, object_size;
1087 int num_objects;
1088 int prevPid;
1089
1090 // default meta object if user does not specify one
1091 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1092 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1093
1094 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1095 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1096 return -EINVAL;
1097 }
1098
1099 std::list<Object> unfiltered_objects;
1100 std::set<std::string> meta_namespaces, all_namespaces;
1101
1102 // If caller set all_nspaces this will be searching
1103 // across multiple namespaces.
1104 while (true) {
1105 bool objects_remain = get_objects(&unfiltered_objects, 20);
1106 if (!objects_remain)
1107 break;
1108
1109 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1110 for ( ; i != unfiltered_objects.end(); ++i) {
1111 if (i->first == run_name_meta) {
1112 meta_namespaces.insert(i->second);
1113 }
1114 if (i->first.substr(0, prefix.length()) == prefix) {
1115 all_namespaces.insert(i->second);
1116 }
1117 }
1118 }
1119
1120 std::set<std::string>::const_iterator i = all_namespaces.begin();
1121 for ( ; i != all_namespaces.end(); ++i) {
1122 set_namespace(*i);
1123
1124 // if no metadata file found we should try to do a linear search on the prefix
1125 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1126 int r = clean_up_slow(prefix, concurrentios);
1127 if (r < 0) {
1128 cerr << "clean_up_slow error r= " << r << std::endl;
1129 return r;
1130 }
1131 continue;
1132 }
1133
1134 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
1135 if (r < 0) {
1136 return r;
1137 }
1138
1139 r = clean_up(num_objects, prevPid, concurrentios);
1140 if (r != 0) return r;
1141
1142 r = sync_remove(run_name_meta);
1143 if (r != 0) return r;
1144 }
1145
1146 return 0;
1147}
1148
1149int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1150 lock_cond lc(&lock);
1151
1152 if (concurrentios <= 0)
1153 return -EINVAL;
1154
1155 std::vector<string> name(concurrentios);
1156 std::string newName;
1157 int r = 0;
1158 utime_t runtime;
1159 int slot = 0;
1160
1161 lock.Lock();
1162 data.done = false;
1163 data.in_flight = 0;
1164 data.started = 0;
1165 data.finished = 0;
1166 lock.Unlock();
1167
1168 // don't start more completions than files
1169 if (num_objects == 0) {
1170 return 0;
1171 } else if (num_objects < concurrentios) {
1172 concurrentios = num_objects;
1173 }
1174
1175 r = completions_init(concurrentios);
1176 if (r < 0)
1177 return r;
1178
1179 //set up initial removes
1180 for (int i = 0; i < concurrentios; ++i) {
1181 name[i] = generate_object_name(i, prevPid);
1182 }
1183
1184 //start initial removes
1185 for (int i = 0; i < concurrentios; ++i) {
1186 create_completion(i, _aio_cb, (void *)&lc);
1187 r = aio_remove(name[i], i);
1188 if (r < 0) { //naughty, doesn't clean up heap
1189 cerr << "r = " << r << std::endl;
1190 goto ERR;
1191 }
1192 lock.Lock();
1193 ++data.started;
1194 ++data.in_flight;
1195 lock.Unlock();
1196 }
1197
1198 //keep on adding new removes as old ones complete
1199 while (data.started < num_objects) {
1200 lock.Lock();
1201 int old_slot = slot;
1202 bool found = false;
1203 while (1) {
1204 do {
1205 if (completion_is_done(slot)) {
1206 found = true;
1207 break;
1208 }
1209 slot++;
1210 if (slot == concurrentios) {
1211 slot = 0;
1212 }
1213 } while (slot != old_slot);
1214 if (found) {
1215 break;
1216 }
1217 lc.cond.Wait(lock);
1218 }
1219 lock.Unlock();
1220 newName = generate_object_name(data.started, prevPid);
1221 completion_wait(slot);
1222 lock.Lock();
1223 r = completion_ret(slot);
1224 if (r != 0 && r != -ENOENT) { // file does not exist
1225 cerr << "remove got " << r << std::endl;
1226 lock.Unlock();
1227 goto ERR;
1228 }
1229 ++data.finished;
1230 --data.in_flight;
1231 lock.Unlock();
1232 release_completion(slot);
1233
1234 //start new remove and check data if requested
1235 create_completion(slot, _aio_cb, (void *)&lc);
1236 r = aio_remove(newName, slot);
1237 if (r < 0) {
1238 goto ERR;
1239 }
1240 lock.Lock();
1241 ++data.started;
1242 ++data.in_flight;
1243 lock.Unlock();
1244 name[slot] = newName;
1245 }
1246
1247 //wait for final removes to complete
1248 while (data.finished < data.started) {
1249 slot = data.finished % concurrentios;
1250 completion_wait(slot);
1251 lock.Lock();
1252 r = completion_ret(slot);
1253 if (r != 0 && r != -ENOENT) { // file does not exist
1254 cerr << "remove got " << r << std::endl;
1255 lock.Unlock();
1256 goto ERR;
1257 }
1258 ++data.finished;
1259 --data.in_flight;
1260 release_completion(slot);
1261 lock.Unlock();
1262 }
1263
1264 lock.Lock();
1265 data.done = true;
1266 lock.Unlock();
1267
1268 completions_done();
1269
1270 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1271
1272 return 0;
1273
1274 ERR:
1275 lock.Lock();
1276 data.done = 1;
1277 lock.Unlock();
1278 return r;
1279}
1280
1281/**
1282 * Return objects from the datastore which match a prefix.
1283 *
1284 * Clears the list and populates it with any objects which match the
1285 * prefix. The list is guaranteed to have at least one item when the
1286 * function returns true.
1287 *
1288 * @param prefix the prefix to match against
1289 * @param objects [out] return list of objects
1290 * @returns true if there are any objects in the store which match
1291 * the prefix, false if there are no more
1292 */
1293bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1294 std::list<Object> unfiltered_objects;
1295
1296 objects->clear();
1297
1298 while (objects->empty()) {
1299 bool objects_remain = get_objects(&unfiltered_objects, 20);
1300 if (!objects_remain)
1301 return false;
1302
1303 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1304 for ( ; i != unfiltered_objects.end(); ++i) {
1305 if (i->first.substr(0, prefix.length()) == prefix) {
1306 objects->push_back(*i);
1307 }
1308 }
1309 }
1310
1311 return true;
1312}
1313
1314int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1315 lock_cond lc(&lock);
1316
1317 if (concurrentios <= 0)
1318 return -EINVAL;
1319
1320 std::vector<Object> name(concurrentios);
1321 Object newName;
1322 int r = 0;
1323 utime_t runtime;
1324 int slot = 0;
1325 std::list<Object> objects;
1326 bool objects_remain = true;
1327
1328 lock.Lock();
1329 data.done = false;
1330 data.in_flight = 0;
1331 data.started = 0;
1332 data.finished = 0;
1333 lock.Unlock();
1334
1335 out(cout) << "Warning: using slow linear search" << std::endl;
1336
1337 r = completions_init(concurrentios);
1338 if (r < 0)
1339 return r;
1340
1341 //set up initial removes
1342 for (int i = 0; i < concurrentios; ++i) {
1343 if (objects.empty()) {
1344 // if there are fewer objects than concurrent ios, don't generate extras
1345 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1346 if (!objects_found) {
1347 concurrentios = i;
1348 objects_remain = false;
1349 break;
1350 }
1351 }
1352
1353 name[i] = objects.front();
1354 objects.pop_front();
1355 }
1356
1357 //start initial removes
1358 for (int i = 0; i < concurrentios; ++i) {
1359 create_completion(i, _aio_cb, (void *)&lc);
1360 set_namespace(name[i].second);
1361 r = aio_remove(name[i].first, i);
1362 if (r < 0) { //naughty, doesn't clean up heap
1363 cerr << "r = " << r << std::endl;
1364 goto ERR;
1365 }
1366 lock.Lock();
1367 ++data.started;
1368 ++data.in_flight;
1369 lock.Unlock();
1370 }
1371
1372 //keep on adding new removes as old ones complete
1373 while (objects_remain) {
1374 lock.Lock();
1375 int old_slot = slot;
1376 bool found = false;
1377 while (1) {
1378 do {
1379 if (completion_is_done(slot)) {
1380 found = true;
1381 break;
1382 }
1383 slot++;
1384 if (slot == concurrentios) {
1385 slot = 0;
1386 }
1387 } while (slot != old_slot);
1388 if (found) {
1389 break;
1390 }
1391 lc.cond.Wait(lock);
1392 }
1393 lock.Unlock();
1394
1395 // get more objects if necessary
1396 if (objects.empty()) {
1397 objects_remain = more_objects_matching_prefix(prefix, &objects);
1398 // quit if there are no more
1399 if (!objects_remain) {
1400 break;
1401 }
1402 }
1403
1404 // get the next object
1405 newName = objects.front();
1406 objects.pop_front();
1407
1408 completion_wait(slot);
1409 lock.Lock();
1410 r = completion_ret(slot);
1411 if (r != 0 && r != -ENOENT) { // file does not exist
1412 cerr << "remove got " << r << std::endl;
1413 lock.Unlock();
1414 goto ERR;
1415 }
1416 ++data.finished;
1417 --data.in_flight;
1418 lock.Unlock();
1419 release_completion(slot);
1420
1421 //start new remove and check data if requested
1422 create_completion(slot, _aio_cb, (void *)&lc);
1423 set_namespace(newName.second);
1424 r = aio_remove(newName.first, slot);
1425 if (r < 0) {
1426 goto ERR;
1427 }
1428 lock.Lock();
1429 ++data.started;
1430 ++data.in_flight;
1431 lock.Unlock();
1432 name[slot] = newName;
1433 }
1434
1435 //wait for final removes to complete
1436 while (data.finished < data.started) {
1437 slot = data.finished % concurrentios;
1438 completion_wait(slot);
1439 lock.Lock();
1440 r = completion_ret(slot);
1441 if (r != 0 && r != -ENOENT) { // file does not exist
1442 cerr << "remove got " << r << std::endl;
1443 lock.Unlock();
1444 goto ERR;
1445 }
1446 ++data.finished;
1447 --data.in_flight;
1448 release_completion(slot);
1449 lock.Unlock();
1450 }
1451
1452 lock.Lock();
1453 data.done = true;
1454 lock.Unlock();
1455
1456 completions_done();
1457
1458 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1459
1460 return 0;
1461
1462 ERR:
1463 lock.Lock();
1464 data.done = 1;
1465 lock.Unlock();
1466 return -EIO;
1467}