]> git.proxmox.com Git - ceph.git/blame - ceph/src/common/obj_bencher.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / common / obj_bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3/*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2009 Sage Weil <sage@newdream.net>
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 * Series of functions to test your rados installation. Notice
14 * that this code is not terribly robust -- for instance, if you
15 * try and bench on a pool you don't have permission to access
16 * it will just loop forever.
17 */
18#include "include/compat.h"
19#include <pthread.h>
20#include "common/Cond.h"
21#include "obj_bencher.h"
22
23#include <iostream>
24#include <fstream>
25
26#include <cerrno>
27
28#include <stdlib.h>
29#include <time.h>
30#include <sstream>
31#include <vector>
32
// Name of the metadata object used when the caller does not supply a run name.
const std::string BENCH_LASTRUN_METADATA("benchmark_last_metadata");
// First component of every benchmark object name.
const std::string BENCH_PREFIX("benchmark_data");
// Hostname cache filled on first use by generate_object_prefix_nopid();
// an empty string means "not looked up yet".
static char cached_hostname[30] = {};
// Pid cache used by generate_object_prefix(); 0 means "not set yet".
int cached_pid = 0;
37
38static std::string generate_object_prefix_nopid() {
39 if (cached_hostname[0] == 0) {
40 gethostname(cached_hostname, sizeof(cached_hostname)-1);
41 cached_hostname[sizeof(cached_hostname)-1] = 0;
42 }
43
44 std::ostringstream oss;
45 oss << BENCH_PREFIX << "_" << cached_hostname;
46 return oss.str();
47}
48
49static std::string generate_object_prefix(int pid = 0) {
50 if (pid)
51 cached_pid = pid;
52 else if (!cached_pid)
53 cached_pid = getpid();
54
55 std::ostringstream oss;
56 oss << generate_object_prefix_nopid() << "_" << cached_pid;
57 return oss.str();
58}
59
60static std::string generate_object_name(int objnum, int pid = 0)
61{
62 std::ostringstream oss;
63 oss << generate_object_prefix(pid) << "_object" << objnum;
64 return oss.str();
65}
66
67static void sanitize_object_contents (bench_data *data, size_t length) {
68 memset(data->object_contents, 'z', length);
69}
70
71ostream& ObjBencher::out(ostream& os, utime_t& t)
72{
73 if (show_time)
74 return t.localtime(os) << " ";
75 else
76 return os;
77}
78
79ostream& ObjBencher::out(ostream& os)
80{
81 utime_t cur_time = ceph_clock_now();
82 return out(os, cur_time);
83}
84
85void *ObjBencher::status_printer(void *_bencher) {
86 ObjBencher *bencher = static_cast<ObjBencher *>(_bencher);
87 bench_data& data = bencher->data;
88 Formatter *formatter = bencher->formatter;
89 ostream *outstream = bencher->outstream;
90 Cond cond;
91 int i = 0;
92 int previous_writes = 0;
93 int cycleSinceChange = 0;
94 double bandwidth;
95 int iops;
96 utime_t ONE_SECOND;
97 ONE_SECOND.set_from_double(1.0);
98 bencher->lock.Lock();
99 if (formatter)
100 formatter->open_array_section("datas");
101 while(!data.done) {
102 utime_t cur_time = ceph_clock_now();
103
104 if (i % 20 == 0 && !formatter) {
105 if (i > 0)
106 cur_time.localtime(cout) << " min lat: " << data.min_latency
107 << " max lat: " << data.max_latency
108 << " avg lat: " << data.avg_latency << std::endl;
109 //I'm naughty and don't reset the fill
110 bencher->out(cout, cur_time) << setfill(' ')
111 << setw(5) << "sec"
112 << setw(8) << "Cur ops"
113 << setw(10) << "started"
114 << setw(10) << "finished"
115 << setw(10) << "avg MB/s"
116 << setw(10) << "cur MB/s"
117 << setw(12) << "last lat(s)"
118 << setw(12) << "avg lat(s)" << std::endl;
119 }
120 if (cycleSinceChange)
121 bandwidth = (double)(data.finished - previous_writes)
122 * (data.op_size)
123 / (1024*1024)
124 / cycleSinceChange;
125 else
126 bandwidth = -1;
127
128 if (!std::isnan(bandwidth) && bandwidth > -1) {
129 if (bandwidth > data.idata.max_bandwidth)
130 data.idata.max_bandwidth = bandwidth;
131 if (bandwidth < data.idata.min_bandwidth)
132 data.idata.min_bandwidth = bandwidth;
133
134 data.history.bandwidth.push_back(bandwidth);
135 }
136
137 if (cycleSinceChange)
138 iops = (double)(data.finished - previous_writes)
139 / cycleSinceChange;
140 else
141 iops = -1;
142
143 if (!std::isnan(iops) && iops > -1) {
144 if (iops > data.idata.max_iops)
145 data.idata.max_iops = iops;
146 if (iops < data.idata.min_iops)
147 data.idata.min_iops = iops;
148
149 data.history.iops.push_back(iops);
150 }
151
152 if (formatter)
153 formatter->open_object_section("data");
154
155 double avg_bandwidth = (double) (data.op_size) * (data.finished)
156 / (double)(cur_time - data.start_time) / (1024*1024);
157 if (previous_writes != data.finished) {
158 previous_writes = data.finished;
159 cycleSinceChange = 0;
160 if (!formatter) {
161 bencher->out(cout, cur_time)
162 << setfill(' ')
163 << setw(5) << i
164 << ' ' << setw(7) << data.in_flight
165 << ' ' << setw(9) << data.started
166 << ' ' << setw(9) << data.finished
167 << ' ' << setw(9) << avg_bandwidth
168 << ' ' << setw(9) << bandwidth
169 << ' ' << setw(11) << (double)data.cur_latency
170 << ' ' << setw(11) << data.avg_latency << std::endl;
171 } else {
172 formatter->dump_format("sec", "%d", i);
173 formatter->dump_format("cur_ops", "%d", data.in_flight);
174 formatter->dump_format("started", "%d", data.started);
175 formatter->dump_format("finished", "%d", data.finished);
176 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
177 formatter->dump_format("cur_bw", "%f", bandwidth);
178 formatter->dump_format("last_lat", "%f", (double)data.cur_latency);
179 formatter->dump_format("avg_lat", "%f", data.avg_latency);
180 }
181 }
182 else {
183 if (!formatter) {
184 bencher->out(cout, cur_time)
185 << setfill(' ')
186 << setw(5) << i
187 << ' ' << setw(7) << data.in_flight
188 << ' ' << setw(9) << data.started
189 << ' ' << setw(9) << data.finished
190 << ' ' << setw(9) << avg_bandwidth
191 << ' ' << setw(9) << '0'
192 << ' ' << setw(11) << '-'
193 << ' '<< setw(11) << data.avg_latency << std::endl;
194 } else {
195 formatter->dump_format("sec", "%d", i);
196 formatter->dump_format("cur_ops", "%d", data.in_flight);
197 formatter->dump_format("started", "%d", data.started);
198 formatter->dump_format("finished", "%d", data.finished);
199 formatter->dump_format("avg_bw", "%f", avg_bandwidth);
200 formatter->dump_format("cur_bw", "%f", 0);
201 formatter->dump_format("last_lat", "%f", 0);
202 formatter->dump_format("avg_lat", "%f", data.avg_latency);
203 }
204 }
205 if (formatter) {
206 formatter->close_section(); // data
207 formatter->flush(*outstream);
208 }
209 ++i;
210 ++cycleSinceChange;
211 cond.WaitInterval(bencher->lock, ONE_SECOND);
212 }
213 if (formatter)
214 formatter->close_section(); //datas
215 bencher->lock.Unlock();
216 return NULL;
217}
218
219int ObjBencher::aio_bench(
220 int operation, int secondsToRun,
221 int concurrentios,
222 uint64_t op_size, uint64_t object_size,
223 unsigned max_objects,
224 bool cleanup, bool hints,
225 const std::string& run_name, bool no_verify) {
226
227 if (concurrentios <= 0)
228 return -EINVAL;
229
230 int num_objects = 0;
231 int r = 0;
232 int prevPid = 0;
233 utime_t runtime;
234
235 // default metadata object is used if user does not specify one
236 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
237
238 //get data from previous write run, if available
239 if (operation != OP_WRITE) {
240 uint64_t prev_op_size, prev_object_size;
241 r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
242 &num_objects, &prevPid);
243 if (r < 0) {
244 if (r == -ENOENT)
245 cerr << "Must write data before running a read benchmark!" << std::endl;
246 return r;
247 }
248 object_size = prev_object_size;
249 op_size = prev_op_size;
250 }
251
252 char* contentsChars = new char[op_size];
253 lock.Lock();
254 data.done = false;
255 data.hints = hints;
256 data.object_size = object_size;
257 data.op_size = op_size;
258 data.in_flight = 0;
259 data.started = 0;
260 data.finished = 0;
261 data.min_latency = 9999.0; // this better be higher than initial latency!
262 data.max_latency = 0;
263 data.avg_latency = 0;
264 data.object_contents = contentsChars;
265 lock.Unlock();
266
267 //fill in contentsChars deterministically so we can check returns
268 sanitize_object_contents(&data, data.op_size);
269
270 if (formatter)
271 formatter->open_object_section("bench");
272
273 if (OP_WRITE == operation) {
274 r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects);
275 if (r != 0) goto out;
276 }
277 else if (OP_SEQ_READ == operation) {
278 r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
279 if (r != 0) goto out;
280 }
281 else if (OP_RAND_READ == operation) {
282 r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
283 if (r != 0) goto out;
284 }
285
286 if (OP_WRITE == operation && cleanup) {
287 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
288 &num_objects, &prevPid);
289 if (r < 0) {
290 if (r == -ENOENT)
291 cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
292 goto out;
293 }
294
295 data.start_time = ceph_clock_now();
296 out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl;
297
298 r = clean_up(num_objects, prevPid, concurrentios);
299 if (r != 0) goto out;
300
301 runtime = ceph_clock_now() - data.start_time;
302 out(cout) << "Clean up completed and total clean up time :" << runtime << std::endl;
303
304 // lastrun file
305 r = sync_remove(run_name_meta);
306 if (r != 0) goto out;
307 }
308
309 out:
310 if (formatter) {
311 formatter->close_section(); // bench
312 formatter->flush(*outstream);
313 *outstream << std::endl;
314 }
315 delete[] contentsChars;
316 return r;
317}
318
319struct lock_cond {
320 explicit lock_cond(Mutex *_lock) : lock(_lock) {}
321 Mutex *lock;
322 Cond cond;
323};
324
325void _aio_cb(void *cb, void *arg) {
326 struct lock_cond *lc = (struct lock_cond *)arg;
327 lc->lock->Lock();
328 lc->cond.Signal();
329 lc->lock->Unlock();
330}
331
// Sample standard deviation (n - 1 in the denominator) of the values in v.
// Returns 0 when v holds fewer than two values. The intermediate sums are
// kept in T, so an integral T truncates along the way.
template<class T>
static T vec_stddev(std::vector<T>& v)
{
  if (v.size() < 2)
    return 0;

  // first pass: arithmetic mean
  T average = 0;
  for (size_t k = 0; k < v.size(); ++k)
    average += v[k];
  average /= v.size();

  // second pass: sum of squared deviations from the mean
  T spread = 0;
  for (size_t k = 0; k < v.size(); ++k) {
    T delta = v[k] - average;
    delta *= delta;
    spread += delta;
  }
  spread /= (v.size() - 1);

  return sqrt(spread);
}
356
357int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
358 uint64_t *op_size, uint64_t* object_size,
359 int* num_objects, int* prevPid) {
360 int r = 0;
361 bufferlist object_data;
362
363 r = sync_read(metadata_file, object_data,
364 sizeof(int) * 2 + sizeof(size_t) * 2);
365 if (r <= 0) {
366 // treat an empty file as a file that does not exist
367 if (r == 0) {
368 r = -ENOENT;
369 }
370 return r;
371 }
372 bufferlist::iterator p = object_data.begin();
373 ::decode(*object_size, p);
374 ::decode(*num_objects, p);
375 ::decode(*prevPid, p);
376 if (!p.end()) {
377 ::decode(*op_size, p);
378 } else {
379 *op_size = *object_size;
380 }
381
382 return 0;
383}
384
385int ObjBencher::write_bench(int secondsToRun,
386 int concurrentios, const string& run_name_meta,
387 unsigned max_objects) {
388 if (concurrentios <= 0)
389 return -EINVAL;
390
391 if (!formatter) {
392 out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
393 << data.op_size << " bytes to objects of size "
394 << data.object_size << " for up to "
395 << secondsToRun << " seconds or "
396 << max_objects << " objects"
397 << std::endl;
398 } else {
399 formatter->dump_format("concurrent_ios", "%d", concurrentios);
400 formatter->dump_format("object_size", "%d", data.object_size);
401 formatter->dump_format("op_size", "%d", data.op_size);
402 formatter->dump_format("seconds_to_run", "%d", secondsToRun);
403 formatter->dump_format("max_objects", "%d", max_objects);
404 }
405 bufferlist* newContents = 0;
406
407 std::string prefix = generate_object_prefix();
408 if (!formatter)
409 out(cout) << "Object prefix: " << prefix << std::endl;
410 else
411 formatter->dump_string("object_prefix", prefix);
412
413 std::vector<string> name(concurrentios);
414 std::string newName;
415 bufferlist* contents[concurrentios];
416 double total_latency = 0;
417 std::vector<utime_t> start_times(concurrentios);
418 utime_t stopTime;
419 int r = 0;
420 bufferlist b_write;
421 lock_cond lc(&lock);
422 utime_t runtime;
423 utime_t timePassed;
424
425 unsigned writes_per_object = 1;
426 if (data.op_size)
427 writes_per_object = data.object_size / data.op_size;
428
429 r = completions_init(concurrentios);
430
431 //set up writes so I can start them together
432 for (int i = 0; i<concurrentios; ++i) {
433 name[i] = generate_object_name(i / writes_per_object);
434 contents[i] = new bufferlist();
435 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i);
436 contents[i]->append(data.object_contents, data.op_size);
437 }
438
439 pthread_t print_thread;
440
441 pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
442 ceph_pthread_setname(print_thread, "write_stat");
443 lock.Lock();
444 data.finished = 0;
445 data.start_time = ceph_clock_now();
446 lock.Unlock();
447 for (int i = 0; i<concurrentios; ++i) {
448 start_times[i] = ceph_clock_now();
449 r = create_completion(i, _aio_cb, (void *)&lc);
450 if (r < 0)
451 goto ERR;
452 r = aio_write(name[i], i, *contents[i], data.op_size,
453 data.op_size * (i % writes_per_object));
454 if (r < 0) { //naughty, doesn't clean up heap
455 goto ERR;
456 }
457 lock.Lock();
458 ++data.started;
459 ++data.in_flight;
460 lock.Unlock();
461 }
462
463 //keep on adding new writes as old ones complete until we've passed minimum time
464 int slot;
465 int num_objects;
466
467 //don't need locking for reads because other thread doesn't write
468
469 runtime.set_from_double(secondsToRun);
470 stopTime = data.start_time + runtime;
471 slot = 0;
472 lock.Lock();
473 while (!secondsToRun || ceph_clock_now() < stopTime) {
474 bool found = false;
475 while (1) {
476 int old_slot = slot;
477 do {
478 if (completion_is_done(slot)) {
479 found = true;
480 break;
481 }
482 slot++;
483 if (slot == concurrentios) {
484 slot = 0;
485 }
486 } while (slot != old_slot);
487 if (found)
488 break;
489 lc.cond.Wait(lock);
490 }
491 lock.Unlock();
492 //create new contents and name on the heap, and fill them
493 newName = generate_object_name(data.started / writes_per_object);
494 newContents = contents[slot];
495 snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
496 // we wrote to buffer, going around internal crc cache, so invalidate it now.
497 newContents->invalidate_crc();
498
499 completion_wait(slot);
500 lock.Lock();
501 r = completion_ret(slot);
502 if (r != 0) {
503 lock.Unlock();
504 goto ERR;
505 }
506 data.cur_latency = ceph_clock_now() - start_times[slot];
507 data.history.latency.push_back(data.cur_latency);
508 total_latency += data.cur_latency;
509 if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
510 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
511 ++data.finished;
512 data.avg_latency = total_latency / data.finished;
513 --data.in_flight;
514 lock.Unlock();
515 release_completion(slot);
516 timePassed = ceph_clock_now() - data.start_time;
517
518 //write new stuff to backend
519 start_times[slot] = ceph_clock_now();
520 r = create_completion(slot, _aio_cb, &lc);
521 if (r < 0)
522 goto ERR;
523 r = aio_write(newName, slot, *newContents, data.op_size,
524 data.op_size * (data.started % writes_per_object));
525 if (r < 0) {//naughty; doesn't clean up heap space.
526 goto ERR;
527 }
528 name[slot] = newName;
529 lock.Lock();
530 ++data.started;
531 ++data.in_flight;
532 if (max_objects &&
533 data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
534 data.op_size))
535 break;
536 }
537 lock.Unlock();
538
539 while (data.finished < data.started) {
540 slot = data.finished % concurrentios;
541 completion_wait(slot);
542 lock.Lock();
543 r = completion_ret(slot);
544 if (r != 0) {
545 lock.Unlock();
546 goto ERR;
547 }
548 data.cur_latency = ceph_clock_now() - start_times[slot];
549 data.history.latency.push_back(data.cur_latency);
550 total_latency += data.cur_latency;
551 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
552 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
553 ++data.finished;
554 data.avg_latency = total_latency / data.finished;
555 --data.in_flight;
556 lock.Unlock();
557 release_completion(slot);
558 delete contents[slot];
559 contents[slot] = 0;
560 }
561
562 timePassed = ceph_clock_now() - data.start_time;
563 lock.Lock();
564 data.done = true;
565 lock.Unlock();
566
567 pthread_join(print_thread, NULL);
568
569 double bandwidth;
570 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)timePassed;
571 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
572
573 if (!formatter) {
574 out(cout) << "Total time run: " << timePassed << std::endl
575 << "Total writes made: " << data.finished << std::endl
576 << "Write size: " << data.op_size << std::endl
577 << "Object size: " << data.object_size << std::endl
578 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
579 << "Stddev Bandwidth: " << vec_stddev(data.history.bandwidth) << std::endl
580 << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
581 << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl
582 << "Average IOPS: " << (int)(data.finished/timePassed) << std::endl
583 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
584 << "Max IOPS: " << data.idata.max_iops << std::endl
585 << "Min IOPS: " << data.idata.min_iops << std::endl
586 << "Average Latency(s): " << data.avg_latency << std::endl
587 << "Stddev Latency(s): " << vec_stddev(data.history.latency) << std::endl
588 << "Max latency(s): " << data.max_latency << std::endl
589 << "Min latency(s): " << data.min_latency << std::endl;
590 } else {
591 formatter->dump_format("total_time_run", "%f", (double)timePassed);
592 formatter->dump_format("total_writes_made", "%d", data.finished);
593 formatter->dump_format("write_size", "%d", data.op_size);
594 formatter->dump_format("object_size", "%d", data.object_size);
595 formatter->dump_format("bandwidth", "%f", bandwidth);
596 formatter->dump_format("stddev_bandwidth", "%f", vec_stddev(data.history.bandwidth));
597 formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth);
598 formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth);
599 formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed));
600 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
601 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
602 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
603 formatter->dump_format("average_latency", "%f", data.avg_latency);
604 formatter->dump_format("stddev_latency", "%f", vec_stddev(data.history.latency));
605 formatter->dump_format("max_latency:", "%f", data.max_latency);
606 formatter->dump_format("min_latency", "%f", data.min_latency);
607 }
608 //write object size/number data for read benchmarks
609 ::encode(data.object_size, b_write);
610 num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
611 ::encode(num_objects, b_write);
612 ::encode(getpid(), b_write);
613 ::encode(data.op_size, b_write);
614
615 // persist meta-data for further cleanup or read
616 sync_write(run_name_meta, b_write, sizeof(int)*3);
617
618 completions_done();
619 for (int i = 0; i < concurrentios; i++)
620 if (contents[i])
621 delete contents[i];
622
623 return 0;
624
625 ERR:
626 lock.Lock();
627 data.done = 1;
628 lock.Unlock();
629 pthread_join(print_thread, NULL);
630 for (int i = 0; i < concurrentios; i++)
631 if (contents[i])
632 delete contents[i];
633 return r;
634}
635
636int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
637 lock_cond lc(&lock);
638
639 if (concurrentios <= 0)
640 return -EINVAL;
641
642 std::vector<string> name(concurrentios);
643 std::string newName;
644 bufferlist* contents[concurrentios];
645 int index[concurrentios];
646 int errors = 0;
647 utime_t start_time;
648 std::vector<utime_t> start_times(concurrentios);
649 utime_t time_to_run;
650 time_to_run.set_from_double(seconds_to_run);
651 double total_latency = 0;
652 int r = 0;
653 utime_t runtime;
654 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
655 //changes will be safe because string length should remain the same
656
657 unsigned writes_per_object = 1;
658 if (data.op_size)
659 writes_per_object = data.object_size / data.op_size;
660
661 r = completions_init(concurrentios);
662 if (r < 0)
663 return r;
664
665 //set up initial reads
666 for (int i = 0; i < concurrentios; ++i) {
667 name[i] = generate_object_name(i / writes_per_object, pid);
668 contents[i] = new bufferlist();
669 }
670
671 lock.Lock();
672 data.finished = 0;
673 data.start_time = ceph_clock_now();
674 lock.Unlock();
675
676 pthread_t print_thread;
677 pthread_create(&print_thread, NULL, status_printer, (void *)this);
678 ceph_pthread_setname(print_thread, "seq_read_stat");
679
680 utime_t finish_time = data.start_time + time_to_run;
681 //start initial reads
682 for (int i = 0; i < concurrentios; ++i) {
683 index[i] = i;
684 start_times[i] = ceph_clock_now();
685 create_completion(i, _aio_cb, (void *)&lc);
686 r = aio_read(name[i], i, contents[i], data.op_size,
687 data.op_size * (i % writes_per_object));
688 if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
689 cerr << "r = " << r << std::endl;
690 goto ERR;
691 }
692 lock.Lock();
693 ++data.started;
694 ++data.in_flight;
695 lock.Unlock();
696 }
697
698 //keep on adding new reads as old ones complete
699 int slot;
700 bufferlist *cur_contents;
701
702 slot = 0;
703 while ((!seconds_to_run || ceph_clock_now() < finish_time) &&
704 num_objects > data.started) {
705 lock.Lock();
706 int old_slot = slot;
707 bool found = false;
708 while (1) {
709 do {
710 if (completion_is_done(slot)) {
711 found = true;
712 break;
713 }
714 slot++;
715 if (slot == concurrentios) {
716 slot = 0;
717 }
718 } while (slot != old_slot);
719 if (found) {
720 break;
721 }
722 lc.cond.Wait(lock);
723 }
724
725 // calculate latency here, so memcmp doesn't inflate it
726 data.cur_latency = ceph_clock_now() - start_times[slot];
727
728 cur_contents = contents[slot];
729 int current_index = index[slot];
730
731 // invalidate internal crc cache
732 cur_contents->invalidate_crc();
733
734 if (!no_verify) {
735 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
736 if ( (cur_contents->length() != data.op_size) ||
737 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
738 cerr << name[slot] << " is not correct!" << std::endl;
739 ++errors;
740 }
741 }
742
743 newName = generate_object_name(data.started / writes_per_object, pid);
744 index[slot] = data.started;
745 lock.Unlock();
746 completion_wait(slot);
747 lock.Lock();
748 r = completion_ret(slot);
749 if (r < 0) {
750 cerr << "read got " << r << std::endl;
751 lock.Unlock();
752 goto ERR;
753 }
754 total_latency += data.cur_latency;
755 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
756 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
757 ++data.finished;
758 data.avg_latency = total_latency / data.finished;
759 --data.in_flight;
760 lock.Unlock();
761 release_completion(slot);
762
763 //start new read and check data if requested
764 start_times[slot] = ceph_clock_now();
765 create_completion(slot, _aio_cb, (void *)&lc);
766 r = aio_read(newName, slot, contents[slot], data.op_size,
767 data.op_size * (data.started % writes_per_object));
768 if (r < 0) {
769 goto ERR;
770 }
771 lock.Lock();
772 ++data.started;
773 ++data.in_flight;
774 lock.Unlock();
775 name[slot] = newName;
776 }
777
778 //wait for final reads to complete
779 while (data.finished < data.started) {
780 slot = data.finished % concurrentios;
781 completion_wait(slot);
782 lock.Lock();
783 r = completion_ret(slot);
784 if (r < 0) {
785 cerr << "read got " << r << std::endl;
786 lock.Unlock();
787 goto ERR;
788 }
789 data.cur_latency = ceph_clock_now() - start_times[slot];
790 total_latency += data.cur_latency;
791 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
792 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
793 ++data.finished;
794 data.avg_latency = total_latency / data.finished;
795 --data.in_flight;
796 release_completion(slot);
797 if (!no_verify) {
798 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
799 lock.Unlock();
800 if ((contents[slot]->length() != data.op_size) ||
801 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
802 cerr << name[slot] << " is not correct!" << std::endl;
803 ++errors;
804 }
805 } else {
806 lock.Unlock();
807 }
808 delete contents[slot];
809 }
810
811 runtime = ceph_clock_now() - data.start_time;
812 lock.Lock();
813 data.done = true;
814 lock.Unlock();
815
816 pthread_join(print_thread, NULL);
817
818 double bandwidth;
819 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
820 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
821
822 if (!formatter) {
823 out(cout) << "Total time run: " << runtime << std::endl
824 << "Total reads made: " << data.finished << std::endl
825 << "Read size: " << data.op_size << std::endl
826 << "Object size: " << data.object_size << std::endl
827 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
828 << "Average IOPS: " << (int)(data.finished/runtime) << std::endl
829 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
830 << "Max IOPS: " << data.idata.max_iops << std::endl
831 << "Min IOPS: " << data.idata.min_iops << std::endl
832 << "Average Latency(s): " << data.avg_latency << std::endl
833 << "Max latency(s): " << data.max_latency << std::endl
834 << "Min latency(s): " << data.min_latency << std::endl;
835 } else {
836 formatter->dump_format("total_time_run", "%f", (double)runtime);
837 formatter->dump_format("total_reads_made", "%d", data.finished);
838 formatter->dump_format("read_size", "%d", data.op_size);
839 formatter->dump_format("object_size", "%d", data.object_size);
840 formatter->dump_format("bandwidth", "%f", bandwidth);
841 formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
842 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
843 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
844 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
845 formatter->dump_format("average_latency", "%f", data.avg_latency);
846 formatter->dump_format("max_latency", "%f", data.max_latency);
847 formatter->dump_format("min_latency", "%f", data.min_latency);
848 }
849
850 completions_done();
851
852 return (errors > 0 ? -EIO : 0);
853
854 ERR:
855 lock.Lock();
856 data.done = 1;
857 lock.Unlock();
858 pthread_join(print_thread, NULL);
859 return r;
860}
861
862int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
863{
864 lock_cond lc(&lock);
865
866 if (concurrentios <= 0)
867 return -EINVAL;
868
869 std::vector<string> name(concurrentios);
870 std::string newName;
871 bufferlist* contents[concurrentios];
872 int index[concurrentios];
873 int errors = 0;
874 utime_t start_time;
875 std::vector<utime_t> start_times(concurrentios);
876 utime_t time_to_run;
877 time_to_run.set_from_double(seconds_to_run);
878 double total_latency = 0;
879 int r = 0;
880 utime_t runtime;
881 sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent
882 //changes will be safe because string length should remain the same
883
884 unsigned writes_per_object = 1;
885 if (data.op_size)
886 writes_per_object = data.object_size / data.op_size;
887
888 srand (time(NULL));
889
890 r = completions_init(concurrentios);
891 if (r < 0)
892 return r;
893
894 //set up initial reads
895 for (int i = 0; i < concurrentios; ++i) {
896 name[i] = generate_object_name(i / writes_per_object, pid);
897 contents[i] = new bufferlist();
898 }
899
900 lock.Lock();
901 data.finished = 0;
902 data.start_time = ceph_clock_now();
903 lock.Unlock();
904
905 pthread_t print_thread;
906 pthread_create(&print_thread, NULL, status_printer, (void *)this);
907 ceph_pthread_setname(print_thread, "rand_read_stat");
908
909 utime_t finish_time = data.start_time + time_to_run;
910 //start initial reads
911 for (int i = 0; i < concurrentios; ++i) {
912 index[i] = i;
913 start_times[i] = ceph_clock_now();
914 create_completion(i, _aio_cb, (void *)&lc);
915 r = aio_read(name[i], i, contents[i], data.op_size,
916 data.op_size * (i % writes_per_object));
917 if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread!
918 cerr << "r = " << r << std::endl;
919 goto ERR;
920 }
921 lock.Lock();
922 ++data.started;
923 ++data.in_flight;
924 lock.Unlock();
925 }
926
927 //keep on adding new reads as old ones complete
928 int slot;
929 bufferlist *cur_contents;
930 int rand_id;
931
932 slot = 0;
933 while ((!seconds_to_run || ceph_clock_now() < finish_time)) {
934 lock.Lock();
935 int old_slot = slot;
936 bool found = false;
937 while (1) {
938 do {
939 if (completion_is_done(slot)) {
940 found = true;
941 break;
942 }
943 slot++;
944 if (slot == concurrentios) {
945 slot = 0;
946 }
947 } while (slot != old_slot);
948 if (found) {
949 break;
950 }
951 lc.cond.Wait(lock);
952 }
953
954 // calculate latency here, so memcmp doesn't inflate it
955 data.cur_latency = ceph_clock_now() - start_times[slot];
956
957 lock.Unlock();
958
959 int current_index = index[slot];
960 cur_contents = contents[slot];
961 completion_wait(slot);
962 lock.Lock();
963 r = completion_ret(slot);
964 if (r < 0) {
965 cerr << "read got " << r << std::endl;
966 lock.Unlock();
967 goto ERR;
968 }
969
970 total_latency += data.cur_latency;
971 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
972 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
973 ++data.finished;
974 data.avg_latency = total_latency / data.finished;
975 --data.in_flight;
976 lock.Unlock();
977
978 if (!no_verify) {
979 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
980 if ((cur_contents->length() != data.op_size) ||
981 (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
982 cerr << name[slot] << " is not correct!" << std::endl;
983 ++errors;
984 }
985 }
986
987 rand_id = rand() % num_objects;
988 newName = generate_object_name(rand_id / writes_per_object, pid);
989 index[slot] = rand_id;
990 release_completion(slot);
991
992 // invalidate internal crc cache
993 cur_contents->invalidate_crc();
994
995 //start new read and check data if requested
996 start_times[slot] = ceph_clock_now();
997 create_completion(slot, _aio_cb, (void *)&lc);
998 r = aio_read(newName, slot, contents[slot], data.op_size,
999 data.op_size * (rand_id % writes_per_object));
1000 if (r < 0) {
1001 goto ERR;
1002 }
1003 lock.Lock();
1004 ++data.started;
1005 ++data.in_flight;
1006 lock.Unlock();
1007 name[slot] = newName;
1008 }
1009
1010
1011 //wait for final reads to complete
1012 while (data.finished < data.started) {
1013 slot = data.finished % concurrentios;
1014 completion_wait(slot);
1015 lock.Lock();
1016 r = completion_ret(slot);
1017 if (r < 0) {
1018 cerr << "read got " << r << std::endl;
1019 lock.Unlock();
1020 goto ERR;
1021 }
1022 data.cur_latency = ceph_clock_now() - start_times[slot];
1023 total_latency += data.cur_latency;
1024 if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
1025 if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
1026 ++data.finished;
1027 data.avg_latency = total_latency / data.finished;
1028 --data.in_flight;
1029 release_completion(slot);
1030 if (!no_verify) {
1031 snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
1032 lock.Unlock();
1033 if ((contents[slot]->length() != data.op_size) ||
1034 (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
1035 cerr << name[slot] << " is not correct!" << std::endl;
1036 ++errors;
1037 }
1038 } else {
1039 lock.Unlock();
1040 }
1041 delete contents[slot];
1042 }
1043
1044 runtime = ceph_clock_now() - data.start_time;
1045 lock.Lock();
1046 data.done = true;
1047 lock.Unlock();
1048
1049 pthread_join(print_thread, NULL);
1050
1051 double bandwidth;
1052 bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime;
1053 bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
1054
1055 if (!formatter) {
1056 out(cout) << "Total time run: " << runtime << std::endl
1057 << "Total reads made: " << data.finished << std::endl
1058 << "Read size: " << data.op_size << std::endl
1059 << "Object size: " << data.object_size << std::endl
1060 << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
1061 << "Average IOPS: " << (int)(data.finished/runtime) << std::endl
1062 << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl
1063 << "Max IOPS: " << data.idata.max_iops << std::endl
1064 << "Min IOPS: " << data.idata.min_iops << std::endl
1065 << "Average Latency(s): " << data.avg_latency << std::endl
1066 << "Max latency(s): " << data.max_latency << std::endl
1067 << "Min latency(s): " << data.min_latency << std::endl;
1068 } else {
1069 formatter->dump_format("total_time_run", "%f", (double)runtime);
1070 formatter->dump_format("total_reads_made", "%d", data.finished);
1071 formatter->dump_format("read_size", "%d", data.op_size);
1072 formatter->dump_format("object_size", "%d", data.object_size);
1073 formatter->dump_format("bandwidth", "%f", bandwidth);
1074 formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime));
1075 formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops));
1076 formatter->dump_format("max_iops", "%d", data.idata.max_iops);
1077 formatter->dump_format("min_iops", "%d", data.idata.min_iops);
1078 formatter->dump_format("average_latency", "%f", data.avg_latency);
1079 formatter->dump_format("max_latency", "%f", data.max_latency);
1080 formatter->dump_format("min_latency", "%f", data.min_latency);
1081 }
1082 completions_done();
1083
1084 return (errors > 0 ? -EIO : 0);
1085
1086 ERR:
1087 lock.Lock();
1088 data.done = 1;
1089 lock.Unlock();
1090 pthread_join(print_thread, NULL);
1091 return r;
1092}
1093
1094int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
1095 int r = 0;
1096 uint64_t op_size, object_size;
1097 int num_objects;
1098 int prevPid;
1099
1100 // default meta object if user does not specify one
1101 const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name);
1102 const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix);
1103
1104 if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) {
1105 cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl;
1106 return -EINVAL;
1107 }
1108
1109 std::list<Object> unfiltered_objects;
1110 std::set<std::string> meta_namespaces, all_namespaces;
1111
1112 // If caller set all_nspaces this will be searching
1113 // across multiple namespaces.
1114 while (true) {
1115 bool objects_remain = get_objects(&unfiltered_objects, 20);
1116 if (!objects_remain)
1117 break;
1118
1119 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1120 for ( ; i != unfiltered_objects.end(); ++i) {
1121 if (i->first == run_name_meta) {
1122 meta_namespaces.insert(i->second);
1123 }
1124 if (i->first.substr(0, prefix.length()) == prefix) {
1125 all_namespaces.insert(i->second);
1126 }
1127 }
1128 }
1129
1130 std::set<std::string>::const_iterator i = all_namespaces.begin();
1131 for ( ; i != all_namespaces.end(); ++i) {
1132 set_namespace(*i);
1133
1134 // if no metadata file found we should try to do a linear search on the prefix
1135 if (meta_namespaces.find(*i) == meta_namespaces.end()) {
1136 int r = clean_up_slow(prefix, concurrentios);
1137 if (r < 0) {
1138 cerr << "clean_up_slow error r= " << r << std::endl;
1139 return r;
1140 }
1141 continue;
1142 }
1143
1144 r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
1145 if (r < 0) {
1146 return r;
1147 }
1148
1149 r = clean_up(num_objects, prevPid, concurrentios);
1150 if (r != 0) return r;
1151
1152 r = sync_remove(run_name_meta);
1153 if (r != 0) return r;
1154 }
1155
1156 return 0;
1157}
1158
1159int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
1160 lock_cond lc(&lock);
1161
1162 if (concurrentios <= 0)
1163 return -EINVAL;
1164
1165 std::vector<string> name(concurrentios);
1166 std::string newName;
1167 int r = 0;
1168 utime_t runtime;
1169 int slot = 0;
1170
1171 lock.Lock();
1172 data.done = false;
1173 data.in_flight = 0;
1174 data.started = 0;
1175 data.finished = 0;
1176 lock.Unlock();
1177
1178 // don't start more completions than files
1179 if (num_objects == 0) {
1180 return 0;
1181 } else if (num_objects < concurrentios) {
1182 concurrentios = num_objects;
1183 }
1184
1185 r = completions_init(concurrentios);
1186 if (r < 0)
1187 return r;
1188
1189 //set up initial removes
1190 for (int i = 0; i < concurrentios; ++i) {
1191 name[i] = generate_object_name(i, prevPid);
1192 }
1193
1194 //start initial removes
1195 for (int i = 0; i < concurrentios; ++i) {
1196 create_completion(i, _aio_cb, (void *)&lc);
1197 r = aio_remove(name[i], i);
1198 if (r < 0) { //naughty, doesn't clean up heap
1199 cerr << "r = " << r << std::endl;
1200 goto ERR;
1201 }
1202 lock.Lock();
1203 ++data.started;
1204 ++data.in_flight;
1205 lock.Unlock();
1206 }
1207
1208 //keep on adding new removes as old ones complete
1209 while (data.started < num_objects) {
1210 lock.Lock();
1211 int old_slot = slot;
1212 bool found = false;
1213 while (1) {
1214 do {
1215 if (completion_is_done(slot)) {
1216 found = true;
1217 break;
1218 }
1219 slot++;
1220 if (slot == concurrentios) {
1221 slot = 0;
1222 }
1223 } while (slot != old_slot);
1224 if (found) {
1225 break;
1226 }
1227 lc.cond.Wait(lock);
1228 }
1229 lock.Unlock();
1230 newName = generate_object_name(data.started, prevPid);
1231 completion_wait(slot);
1232 lock.Lock();
1233 r = completion_ret(slot);
1234 if (r != 0 && r != -ENOENT) { // file does not exist
1235 cerr << "remove got " << r << std::endl;
1236 lock.Unlock();
1237 goto ERR;
1238 }
1239 ++data.finished;
1240 --data.in_flight;
1241 lock.Unlock();
1242 release_completion(slot);
1243
1244 //start new remove and check data if requested
1245 create_completion(slot, _aio_cb, (void *)&lc);
1246 r = aio_remove(newName, slot);
1247 if (r < 0) {
1248 goto ERR;
1249 }
1250 lock.Lock();
1251 ++data.started;
1252 ++data.in_flight;
1253 lock.Unlock();
1254 name[slot] = newName;
1255 }
1256
1257 //wait for final removes to complete
1258 while (data.finished < data.started) {
1259 slot = data.finished % concurrentios;
1260 completion_wait(slot);
1261 lock.Lock();
1262 r = completion_ret(slot);
1263 if (r != 0 && r != -ENOENT) { // file does not exist
1264 cerr << "remove got " << r << std::endl;
1265 lock.Unlock();
1266 goto ERR;
1267 }
1268 ++data.finished;
1269 --data.in_flight;
1270 release_completion(slot);
1271 lock.Unlock();
1272 }
1273
1274 lock.Lock();
1275 data.done = true;
1276 lock.Unlock();
1277
1278 completions_done();
1279
1280 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1281
1282 return 0;
1283
1284 ERR:
1285 lock.Lock();
1286 data.done = 1;
1287 lock.Unlock();
1288 return r;
1289}
1290
1291/**
1292 * Return objects from the datastore which match a prefix.
1293 *
1294 * Clears the list and populates it with any objects which match the
1295 * prefix. The list is guaranteed to have at least one item when the
1296 * function returns true.
1297 *
1298 * @param prefix the prefix to match against
1299 * @param objects [out] return list of objects
1300 * @returns true if there are any objects in the store which match
1301 * the prefix, false if there are no more
1302 */
1303bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) {
1304 std::list<Object> unfiltered_objects;
1305
1306 objects->clear();
1307
1308 while (objects->empty()) {
1309 bool objects_remain = get_objects(&unfiltered_objects, 20);
1310 if (!objects_remain)
1311 return false;
1312
1313 std::list<Object>::const_iterator i = unfiltered_objects.begin();
1314 for ( ; i != unfiltered_objects.end(); ++i) {
1315 if (i->first.substr(0, prefix.length()) == prefix) {
1316 objects->push_back(*i);
1317 }
1318 }
1319 }
1320
1321 return true;
1322}
1323
1324int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) {
1325 lock_cond lc(&lock);
1326
1327 if (concurrentios <= 0)
1328 return -EINVAL;
1329
1330 std::vector<Object> name(concurrentios);
1331 Object newName;
1332 int r = 0;
1333 utime_t runtime;
1334 int slot = 0;
1335 std::list<Object> objects;
1336 bool objects_remain = true;
1337
1338 lock.Lock();
1339 data.done = false;
1340 data.in_flight = 0;
1341 data.started = 0;
1342 data.finished = 0;
1343 lock.Unlock();
1344
1345 out(cout) << "Warning: using slow linear search" << std::endl;
1346
1347 r = completions_init(concurrentios);
1348 if (r < 0)
1349 return r;
1350
1351 //set up initial removes
1352 for (int i = 0; i < concurrentios; ++i) {
1353 if (objects.empty()) {
1354 // if there are fewer objects than concurrent ios, don't generate extras
1355 bool objects_found = more_objects_matching_prefix(prefix, &objects);
1356 if (!objects_found) {
1357 concurrentios = i;
1358 objects_remain = false;
1359 break;
1360 }
1361 }
1362
1363 name[i] = objects.front();
1364 objects.pop_front();
1365 }
1366
1367 //start initial removes
1368 for (int i = 0; i < concurrentios; ++i) {
1369 create_completion(i, _aio_cb, (void *)&lc);
1370 set_namespace(name[i].second);
1371 r = aio_remove(name[i].first, i);
1372 if (r < 0) { //naughty, doesn't clean up heap
1373 cerr << "r = " << r << std::endl;
1374 goto ERR;
1375 }
1376 lock.Lock();
1377 ++data.started;
1378 ++data.in_flight;
1379 lock.Unlock();
1380 }
1381
1382 //keep on adding new removes as old ones complete
1383 while (objects_remain) {
1384 lock.Lock();
1385 int old_slot = slot;
1386 bool found = false;
1387 while (1) {
1388 do {
1389 if (completion_is_done(slot)) {
1390 found = true;
1391 break;
1392 }
1393 slot++;
1394 if (slot == concurrentios) {
1395 slot = 0;
1396 }
1397 } while (slot != old_slot);
1398 if (found) {
1399 break;
1400 }
1401 lc.cond.Wait(lock);
1402 }
1403 lock.Unlock();
1404
1405 // get more objects if necessary
1406 if (objects.empty()) {
1407 objects_remain = more_objects_matching_prefix(prefix, &objects);
1408 // quit if there are no more
1409 if (!objects_remain) {
1410 break;
1411 }
1412 }
1413
1414 // get the next object
1415 newName = objects.front();
1416 objects.pop_front();
1417
1418 completion_wait(slot);
1419 lock.Lock();
1420 r = completion_ret(slot);
1421 if (r != 0 && r != -ENOENT) { // file does not exist
1422 cerr << "remove got " << r << std::endl;
1423 lock.Unlock();
1424 goto ERR;
1425 }
1426 ++data.finished;
1427 --data.in_flight;
1428 lock.Unlock();
1429 release_completion(slot);
1430
1431 //start new remove and check data if requested
1432 create_completion(slot, _aio_cb, (void *)&lc);
1433 set_namespace(newName.second);
1434 r = aio_remove(newName.first, slot);
1435 if (r < 0) {
1436 goto ERR;
1437 }
1438 lock.Lock();
1439 ++data.started;
1440 ++data.in_flight;
1441 lock.Unlock();
1442 name[slot] = newName;
1443 }
1444
1445 //wait for final removes to complete
1446 while (data.finished < data.started) {
1447 slot = data.finished % concurrentios;
1448 completion_wait(slot);
1449 lock.Lock();
1450 r = completion_ret(slot);
1451 if (r != 0 && r != -ENOENT) { // file does not exist
1452 cerr << "remove got " << r << std::endl;
1453 lock.Unlock();
1454 goto ERR;
1455 }
1456 ++data.finished;
1457 --data.in_flight;
1458 release_completion(slot);
1459 lock.Unlock();
1460 }
1461
1462 lock.Lock();
1463 data.done = true;
1464 lock.Unlock();
1465
1466 completions_done();
1467
1468 out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl;
1469
1470 return 0;
1471
1472 ERR:
1473 lock.Lock();
1474 data.done = 1;
1475 lock.Unlock();
1476 return -EIO;
1477}