]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | /* | |
4 | * Ceph - scalable distributed file system | |
5 | * | |
6 | * Copyright (C) 2009 Sage Weil <sage@newdream.net> | |
7 | * | |
8 | * This is free software; you can redistribute it and/or | |
9 | * modify it under the terms of the GNU Lesser General Public | |
10 | * License version 2.1, as published by the Free Software | |
11 | * Foundation. See file COPYING. | |
12 | * | |
13 | * Series of functions to test your rados installation. Notice | |
14 | * that this code is not terribly robust -- for instance, if you | |
15 | * try and bench on a pool you don't have permission to access | |
16 | * it will just loop forever. | |
17 | */ | |
18 | #include "include/compat.h" | |
19 | #include <pthread.h> | |
20 | #include "common/Cond.h" | |
21 | #include "obj_bencher.h" | |
22 | ||
7c673cae FG |
// Metadata object name used for a run when the caller gives no run name.
const std::string BENCH_LASTRUN_METADATA{"benchmark_last_metadata"};
// Leading component of every generated benchmark object name.
const std::string BENCH_PREFIX{"benchmark_data"};
// Hostname cache filled lazily by generate_object_prefix_nopid();
// an empty string means "not looked up yet".
static char cached_hostname[30] = {};
// PID embedded in generated object names; 0 until first resolved.
int cached_pid{0};
27 | ||
28 | static std::string generate_object_prefix_nopid() { | |
29 | if (cached_hostname[0] == 0) { | |
30 | gethostname(cached_hostname, sizeof(cached_hostname)-1); | |
31 | cached_hostname[sizeof(cached_hostname)-1] = 0; | |
32 | } | |
33 | ||
34 | std::ostringstream oss; | |
35 | oss << BENCH_PREFIX << "_" << cached_hostname; | |
36 | return oss.str(); | |
37 | } | |
38 | ||
39 | static std::string generate_object_prefix(int pid = 0) { | |
40 | if (pid) | |
41 | cached_pid = pid; | |
42 | else if (!cached_pid) | |
43 | cached_pid = getpid(); | |
44 | ||
45 | std::ostringstream oss; | |
46 | oss << generate_object_prefix_nopid() << "_" << cached_pid; | |
47 | return oss.str(); | |
48 | } | |
49 | ||
50 | static std::string generate_object_name(int objnum, int pid = 0) | |
51 | { | |
52 | std::ostringstream oss; | |
53 | oss << generate_object_prefix(pid) << "_object" << objnum; | |
54 | return oss.str(); | |
55 | } | |
56 | ||
57 | static void sanitize_object_contents (bench_data *data, size_t length) { | |
58 | memset(data->object_contents, 'z', length); | |
59 | } | |
60 | ||
61 | ostream& ObjBencher::out(ostream& os, utime_t& t) | |
62 | { | |
63 | if (show_time) | |
64 | return t.localtime(os) << " "; | |
65 | else | |
66 | return os; | |
67 | } | |
68 | ||
69 | ostream& ObjBencher::out(ostream& os) | |
70 | { | |
71 | utime_t cur_time = ceph_clock_now(); | |
72 | return out(os, cur_time); | |
73 | } | |
74 | ||
75 | void *ObjBencher::status_printer(void *_bencher) { | |
76 | ObjBencher *bencher = static_cast<ObjBencher *>(_bencher); | |
77 | bench_data& data = bencher->data; | |
78 | Formatter *formatter = bencher->formatter; | |
79 | ostream *outstream = bencher->outstream; | |
80 | Cond cond; | |
81 | int i = 0; | |
82 | int previous_writes = 0; | |
83 | int cycleSinceChange = 0; | |
84 | double bandwidth; | |
85 | int iops; | |
86 | utime_t ONE_SECOND; | |
87 | ONE_SECOND.set_from_double(1.0); | |
88 | bencher->lock.Lock(); | |
89 | if (formatter) | |
90 | formatter->open_array_section("datas"); | |
91 | while(!data.done) { | |
92 | utime_t cur_time = ceph_clock_now(); | |
93 | ||
94 | if (i % 20 == 0 && !formatter) { | |
95 | if (i > 0) | |
96 | cur_time.localtime(cout) << " min lat: " << data.min_latency | |
97 | << " max lat: " << data.max_latency | |
98 | << " avg lat: " << data.avg_latency << std::endl; | |
99 | //I'm naughty and don't reset the fill | |
100 | bencher->out(cout, cur_time) << setfill(' ') | |
101 | << setw(5) << "sec" | |
102 | << setw(8) << "Cur ops" | |
103 | << setw(10) << "started" | |
104 | << setw(10) << "finished" | |
105 | << setw(10) << "avg MB/s" | |
106 | << setw(10) << "cur MB/s" | |
107 | << setw(12) << "last lat(s)" | |
108 | << setw(12) << "avg lat(s)" << std::endl; | |
109 | } | |
110 | if (cycleSinceChange) | |
111 | bandwidth = (double)(data.finished - previous_writes) | |
112 | * (data.op_size) | |
113 | / (1024*1024) | |
114 | / cycleSinceChange; | |
115 | else | |
116 | bandwidth = -1; | |
117 | ||
118 | if (!std::isnan(bandwidth) && bandwidth > -1) { | |
119 | if (bandwidth > data.idata.max_bandwidth) | |
120 | data.idata.max_bandwidth = bandwidth; | |
121 | if (bandwidth < data.idata.min_bandwidth) | |
122 | data.idata.min_bandwidth = bandwidth; | |
123 | ||
124 | data.history.bandwidth.push_back(bandwidth); | |
125 | } | |
126 | ||
127 | if (cycleSinceChange) | |
128 | iops = (double)(data.finished - previous_writes) | |
129 | / cycleSinceChange; | |
130 | else | |
131 | iops = -1; | |
132 | ||
133 | if (!std::isnan(iops) && iops > -1) { | |
134 | if (iops > data.idata.max_iops) | |
135 | data.idata.max_iops = iops; | |
136 | if (iops < data.idata.min_iops) | |
137 | data.idata.min_iops = iops; | |
138 | ||
139 | data.history.iops.push_back(iops); | |
140 | } | |
141 | ||
142 | if (formatter) | |
143 | formatter->open_object_section("data"); | |
144 | ||
145 | double avg_bandwidth = (double) (data.op_size) * (data.finished) | |
146 | / (double)(cur_time - data.start_time) / (1024*1024); | |
147 | if (previous_writes != data.finished) { | |
148 | previous_writes = data.finished; | |
149 | cycleSinceChange = 0; | |
150 | if (!formatter) { | |
151 | bencher->out(cout, cur_time) | |
152 | << setfill(' ') | |
153 | << setw(5) << i | |
154 | << ' ' << setw(7) << data.in_flight | |
155 | << ' ' << setw(9) << data.started | |
156 | << ' ' << setw(9) << data.finished | |
157 | << ' ' << setw(9) << avg_bandwidth | |
158 | << ' ' << setw(9) << bandwidth | |
159 | << ' ' << setw(11) << (double)data.cur_latency | |
160 | << ' ' << setw(11) << data.avg_latency << std::endl; | |
161 | } else { | |
162 | formatter->dump_format("sec", "%d", i); | |
163 | formatter->dump_format("cur_ops", "%d", data.in_flight); | |
164 | formatter->dump_format("started", "%d", data.started); | |
165 | formatter->dump_format("finished", "%d", data.finished); | |
166 | formatter->dump_format("avg_bw", "%f", avg_bandwidth); | |
167 | formatter->dump_format("cur_bw", "%f", bandwidth); | |
168 | formatter->dump_format("last_lat", "%f", (double)data.cur_latency); | |
169 | formatter->dump_format("avg_lat", "%f", data.avg_latency); | |
170 | } | |
171 | } | |
172 | else { | |
173 | if (!formatter) { | |
174 | bencher->out(cout, cur_time) | |
175 | << setfill(' ') | |
176 | << setw(5) << i | |
177 | << ' ' << setw(7) << data.in_flight | |
178 | << ' ' << setw(9) << data.started | |
179 | << ' ' << setw(9) << data.finished | |
180 | << ' ' << setw(9) << avg_bandwidth | |
181 | << ' ' << setw(9) << '0' | |
182 | << ' ' << setw(11) << '-' | |
183 | << ' '<< setw(11) << data.avg_latency << std::endl; | |
184 | } else { | |
185 | formatter->dump_format("sec", "%d", i); | |
186 | formatter->dump_format("cur_ops", "%d", data.in_flight); | |
187 | formatter->dump_format("started", "%d", data.started); | |
188 | formatter->dump_format("finished", "%d", data.finished); | |
189 | formatter->dump_format("avg_bw", "%f", avg_bandwidth); | |
190 | formatter->dump_format("cur_bw", "%f", 0); | |
191 | formatter->dump_format("last_lat", "%f", 0); | |
192 | formatter->dump_format("avg_lat", "%f", data.avg_latency); | |
193 | } | |
194 | } | |
195 | if (formatter) { | |
196 | formatter->close_section(); // data | |
197 | formatter->flush(*outstream); | |
198 | } | |
199 | ++i; | |
200 | ++cycleSinceChange; | |
201 | cond.WaitInterval(bencher->lock, ONE_SECOND); | |
202 | } | |
203 | if (formatter) | |
204 | formatter->close_section(); //datas | |
205 | bencher->lock.Unlock(); | |
206 | return NULL; | |
207 | } | |
208 | ||
209 | int ObjBencher::aio_bench( | |
210 | int operation, int secondsToRun, | |
211 | int concurrentios, | |
212 | uint64_t op_size, uint64_t object_size, | |
213 | unsigned max_objects, | |
214 | bool cleanup, bool hints, | |
215 | const std::string& run_name, bool no_verify) { | |
216 | ||
217 | if (concurrentios <= 0) | |
218 | return -EINVAL; | |
219 | ||
220 | int num_objects = 0; | |
221 | int r = 0; | |
222 | int prevPid = 0; | |
223 | utime_t runtime; | |
224 | ||
225 | // default metadata object is used if user does not specify one | |
226 | const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name); | |
227 | ||
228 | //get data from previous write run, if available | |
229 | if (operation != OP_WRITE) { | |
230 | uint64_t prev_op_size, prev_object_size; | |
231 | r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size, | |
232 | &num_objects, &prevPid); | |
233 | if (r < 0) { | |
234 | if (r == -ENOENT) | |
235 | cerr << "Must write data before running a read benchmark!" << std::endl; | |
236 | return r; | |
237 | } | |
238 | object_size = prev_object_size; | |
239 | op_size = prev_op_size; | |
240 | } | |
241 | ||
242 | char* contentsChars = new char[op_size]; | |
243 | lock.Lock(); | |
244 | data.done = false; | |
245 | data.hints = hints; | |
246 | data.object_size = object_size; | |
247 | data.op_size = op_size; | |
248 | data.in_flight = 0; | |
249 | data.started = 0; | |
250 | data.finished = 0; | |
251 | data.min_latency = 9999.0; // this better be higher than initial latency! | |
252 | data.max_latency = 0; | |
253 | data.avg_latency = 0; | |
254 | data.object_contents = contentsChars; | |
255 | lock.Unlock(); | |
256 | ||
257 | //fill in contentsChars deterministically so we can check returns | |
258 | sanitize_object_contents(&data, data.op_size); | |
259 | ||
260 | if (formatter) | |
261 | formatter->open_object_section("bench"); | |
262 | ||
263 | if (OP_WRITE == operation) { | |
264 | r = write_bench(secondsToRun, concurrentios, run_name_meta, max_objects); | |
265 | if (r != 0) goto out; | |
266 | } | |
267 | else if (OP_SEQ_READ == operation) { | |
268 | r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify); | |
269 | if (r != 0) goto out; | |
270 | } | |
271 | else if (OP_RAND_READ == operation) { | |
272 | r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify); | |
273 | if (r != 0) goto out; | |
274 | } | |
275 | ||
276 | if (OP_WRITE == operation && cleanup) { | |
277 | r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, | |
278 | &num_objects, &prevPid); | |
279 | if (r < 0) { | |
280 | if (r == -ENOENT) | |
281 | cerr << "Should never happen: bench metadata missing for current run!" << std::endl; | |
282 | goto out; | |
283 | } | |
284 | ||
285 | data.start_time = ceph_clock_now(); | |
286 | out(cout) << "Cleaning up (deleting benchmark objects)" << std::endl; | |
287 | ||
288 | r = clean_up(num_objects, prevPid, concurrentios); | |
289 | if (r != 0) goto out; | |
290 | ||
291 | runtime = ceph_clock_now() - data.start_time; | |
292 | out(cout) << "Clean up completed and total clean up time :" << runtime << std::endl; | |
293 | ||
294 | // lastrun file | |
295 | r = sync_remove(run_name_meta); | |
296 | if (r != 0) goto out; | |
297 | } | |
298 | ||
299 | out: | |
300 | if (formatter) { | |
301 | formatter->close_section(); // bench | |
302 | formatter->flush(*outstream); | |
303 | *outstream << std::endl; | |
304 | } | |
305 | delete[] contentsChars; | |
306 | return r; | |
307 | } | |
308 | ||
309 | struct lock_cond { | |
310 | explicit lock_cond(Mutex *_lock) : lock(_lock) {} | |
311 | Mutex *lock; | |
312 | Cond cond; | |
313 | }; | |
314 | ||
315 | void _aio_cb(void *cb, void *arg) { | |
316 | struct lock_cond *lc = (struct lock_cond *)arg; | |
317 | lc->lock->Lock(); | |
318 | lc->cond.Signal(); | |
319 | lc->lock->Unlock(); | |
320 | } | |
321 | ||
/**
 * Sample standard deviation (divisor n - 1) of the values in `v`.
 *
 * The mean and the sum of squared deviations are accumulated in double, so
 * integral element types are neither truncated part-way through nor
 * converted to unsigned when divided by v.size(). The final result is
 * converted back to T, so for integral T only the final value is truncated.
 *
 * @param v samples (not modified)
 * @return 0 when v has fewer than two elements, otherwise the sample stddev
 */
template<class T>
static T vec_stddev(const std::vector<T>& v)
{
  if (v.size() < 2)
    return 0;

  double mean = 0;
  for (const T& x : v)
    mean += static_cast<double>(x);
  mean /= v.size();

  double sum_sq = 0;
  for (const T& x : v) {
    const double dev = static_cast<double>(x) - mean;
    sum_sq += dev * dev;
  }
  return static_cast<T>(std::sqrt(sum_sq / (v.size() - 1)));
}
346 | ||
347 | int ObjBencher::fetch_bench_metadata(const std::string& metadata_file, | |
348 | uint64_t *op_size, uint64_t* object_size, | |
349 | int* num_objects, int* prevPid) { | |
350 | int r = 0; | |
351 | bufferlist object_data; | |
352 | ||
353 | r = sync_read(metadata_file, object_data, | |
354 | sizeof(int) * 2 + sizeof(size_t) * 2); | |
355 | if (r <= 0) { | |
356 | // treat an empty file as a file that does not exist | |
357 | if (r == 0) { | |
358 | r = -ENOENT; | |
359 | } | |
360 | return r; | |
361 | } | |
362 | bufferlist::iterator p = object_data.begin(); | |
363 | ::decode(*object_size, p); | |
364 | ::decode(*num_objects, p); | |
365 | ::decode(*prevPid, p); | |
366 | if (!p.end()) { | |
367 | ::decode(*op_size, p); | |
368 | } else { | |
369 | *op_size = *object_size; | |
370 | } | |
371 | ||
372 | return 0; | |
373 | } | |
374 | ||
375 | int ObjBencher::write_bench(int secondsToRun, | |
376 | int concurrentios, const string& run_name_meta, | |
377 | unsigned max_objects) { | |
378 | if (concurrentios <= 0) | |
379 | return -EINVAL; | |
380 | ||
381 | if (!formatter) { | |
382 | out(cout) << "Maintaining " << concurrentios << " concurrent writes of " | |
383 | << data.op_size << " bytes to objects of size " | |
384 | << data.object_size << " for up to " | |
385 | << secondsToRun << " seconds or " | |
386 | << max_objects << " objects" | |
387 | << std::endl; | |
388 | } else { | |
389 | formatter->dump_format("concurrent_ios", "%d", concurrentios); | |
390 | formatter->dump_format("object_size", "%d", data.object_size); | |
391 | formatter->dump_format("op_size", "%d", data.op_size); | |
392 | formatter->dump_format("seconds_to_run", "%d", secondsToRun); | |
393 | formatter->dump_format("max_objects", "%d", max_objects); | |
394 | } | |
395 | bufferlist* newContents = 0; | |
396 | ||
397 | std::string prefix = generate_object_prefix(); | |
398 | if (!formatter) | |
399 | out(cout) << "Object prefix: " << prefix << std::endl; | |
400 | else | |
401 | formatter->dump_string("object_prefix", prefix); | |
402 | ||
403 | std::vector<string> name(concurrentios); | |
404 | std::string newName; | |
405 | bufferlist* contents[concurrentios]; | |
406 | double total_latency = 0; | |
407 | std::vector<utime_t> start_times(concurrentios); | |
408 | utime_t stopTime; | |
409 | int r = 0; | |
410 | bufferlist b_write; | |
411 | lock_cond lc(&lock); | |
412 | utime_t runtime; | |
413 | utime_t timePassed; | |
414 | ||
415 | unsigned writes_per_object = 1; | |
416 | if (data.op_size) | |
417 | writes_per_object = data.object_size / data.op_size; | |
418 | ||
419 | r = completions_init(concurrentios); | |
420 | ||
421 | //set up writes so I can start them together | |
422 | for (int i = 0; i<concurrentios; ++i) { | |
423 | name[i] = generate_object_name(i / writes_per_object); | |
424 | contents[i] = new bufferlist(); | |
425 | snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", i); | |
426 | contents[i]->append(data.object_contents, data.op_size); | |
427 | } | |
428 | ||
429 | pthread_t print_thread; | |
430 | ||
431 | pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this); | |
432 | ceph_pthread_setname(print_thread, "write_stat"); | |
433 | lock.Lock(); | |
434 | data.finished = 0; | |
435 | data.start_time = ceph_clock_now(); | |
436 | lock.Unlock(); | |
437 | for (int i = 0; i<concurrentios; ++i) { | |
438 | start_times[i] = ceph_clock_now(); | |
439 | r = create_completion(i, _aio_cb, (void *)&lc); | |
440 | if (r < 0) | |
441 | goto ERR; | |
442 | r = aio_write(name[i], i, *contents[i], data.op_size, | |
443 | data.op_size * (i % writes_per_object)); | |
444 | if (r < 0) { //naughty, doesn't clean up heap | |
445 | goto ERR; | |
446 | } | |
447 | lock.Lock(); | |
448 | ++data.started; | |
449 | ++data.in_flight; | |
450 | lock.Unlock(); | |
451 | } | |
452 | ||
453 | //keep on adding new writes as old ones complete until we've passed minimum time | |
454 | int slot; | |
455 | int num_objects; | |
456 | ||
457 | //don't need locking for reads because other thread doesn't write | |
458 | ||
459 | runtime.set_from_double(secondsToRun); | |
460 | stopTime = data.start_time + runtime; | |
461 | slot = 0; | |
462 | lock.Lock(); | |
463 | while (!secondsToRun || ceph_clock_now() < stopTime) { | |
464 | bool found = false; | |
465 | while (1) { | |
466 | int old_slot = slot; | |
467 | do { | |
468 | if (completion_is_done(slot)) { | |
469 | found = true; | |
470 | break; | |
471 | } | |
472 | slot++; | |
473 | if (slot == concurrentios) { | |
474 | slot = 0; | |
475 | } | |
476 | } while (slot != old_slot); | |
477 | if (found) | |
478 | break; | |
479 | lc.cond.Wait(lock); | |
480 | } | |
481 | lock.Unlock(); | |
482 | //create new contents and name on the heap, and fill them | |
483 | newName = generate_object_name(data.started / writes_per_object); | |
484 | newContents = contents[slot]; | |
485 | snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started); | |
486 | // we wrote to buffer, going around internal crc cache, so invalidate it now. | |
487 | newContents->invalidate_crc(); | |
488 | ||
489 | completion_wait(slot); | |
490 | lock.Lock(); | |
491 | r = completion_ret(slot); | |
492 | if (r != 0) { | |
493 | lock.Unlock(); | |
494 | goto ERR; | |
495 | } | |
496 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
497 | data.history.latency.push_back(data.cur_latency); | |
498 | total_latency += data.cur_latency; | |
499 | if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
500 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
501 | ++data.finished; | |
502 | data.avg_latency = total_latency / data.finished; | |
503 | --data.in_flight; | |
504 | lock.Unlock(); | |
505 | release_completion(slot); | |
506 | timePassed = ceph_clock_now() - data.start_time; | |
507 | ||
508 | //write new stuff to backend | |
509 | start_times[slot] = ceph_clock_now(); | |
510 | r = create_completion(slot, _aio_cb, &lc); | |
511 | if (r < 0) | |
512 | goto ERR; | |
513 | r = aio_write(newName, slot, *newContents, data.op_size, | |
514 | data.op_size * (data.started % writes_per_object)); | |
515 | if (r < 0) {//naughty; doesn't clean up heap space. | |
516 | goto ERR; | |
517 | } | |
518 | name[slot] = newName; | |
519 | lock.Lock(); | |
520 | ++data.started; | |
521 | ++data.in_flight; | |
522 | if (max_objects && | |
523 | data.started >= (int)((data.object_size * max_objects + data.op_size - 1) / | |
524 | data.op_size)) | |
525 | break; | |
526 | } | |
527 | lock.Unlock(); | |
528 | ||
529 | while (data.finished < data.started) { | |
530 | slot = data.finished % concurrentios; | |
531 | completion_wait(slot); | |
532 | lock.Lock(); | |
533 | r = completion_ret(slot); | |
534 | if (r != 0) { | |
535 | lock.Unlock(); | |
536 | goto ERR; | |
537 | } | |
538 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
539 | data.history.latency.push_back(data.cur_latency); | |
540 | total_latency += data.cur_latency; | |
541 | if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
542 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
543 | ++data.finished; | |
544 | data.avg_latency = total_latency / data.finished; | |
545 | --data.in_flight; | |
546 | lock.Unlock(); | |
547 | release_completion(slot); | |
548 | delete contents[slot]; | |
549 | contents[slot] = 0; | |
550 | } | |
551 | ||
552 | timePassed = ceph_clock_now() - data.start_time; | |
553 | lock.Lock(); | |
554 | data.done = true; | |
555 | lock.Unlock(); | |
556 | ||
557 | pthread_join(print_thread, NULL); | |
558 | ||
559 | double bandwidth; | |
560 | bandwidth = ((double)data.finished)*((double)data.op_size)/(double)timePassed; | |
561 | bandwidth = bandwidth/(1024*1024); // we want it in MB/sec | |
562 | ||
563 | if (!formatter) { | |
564 | out(cout) << "Total time run: " << timePassed << std::endl | |
565 | << "Total writes made: " << data.finished << std::endl | |
566 | << "Write size: " << data.op_size << std::endl | |
567 | << "Object size: " << data.object_size << std::endl | |
568 | << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl | |
569 | << "Stddev Bandwidth: " << vec_stddev(data.history.bandwidth) << std::endl | |
570 | << "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl | |
571 | << "Min bandwidth (MB/sec): " << data.idata.min_bandwidth << std::endl | |
572 | << "Average IOPS: " << (int)(data.finished/timePassed) << std::endl | |
573 | << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl | |
574 | << "Max IOPS: " << data.idata.max_iops << std::endl | |
575 | << "Min IOPS: " << data.idata.min_iops << std::endl | |
576 | << "Average Latency(s): " << data.avg_latency << std::endl | |
577 | << "Stddev Latency(s): " << vec_stddev(data.history.latency) << std::endl | |
578 | << "Max latency(s): " << data.max_latency << std::endl | |
579 | << "Min latency(s): " << data.min_latency << std::endl; | |
580 | } else { | |
581 | formatter->dump_format("total_time_run", "%f", (double)timePassed); | |
582 | formatter->dump_format("total_writes_made", "%d", data.finished); | |
583 | formatter->dump_format("write_size", "%d", data.op_size); | |
584 | formatter->dump_format("object_size", "%d", data.object_size); | |
585 | formatter->dump_format("bandwidth", "%f", bandwidth); | |
586 | formatter->dump_format("stddev_bandwidth", "%f", vec_stddev(data.history.bandwidth)); | |
587 | formatter->dump_format("max_bandwidth", "%f", data.idata.max_bandwidth); | |
588 | formatter->dump_format("min_bandwidth", "%f", data.idata.min_bandwidth); | |
589 | formatter->dump_format("average_iops", "%d", (int)(data.finished/timePassed)); | |
590 | formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops)); | |
591 | formatter->dump_format("max_iops", "%d", data.idata.max_iops); | |
592 | formatter->dump_format("min_iops", "%d", data.idata.min_iops); | |
593 | formatter->dump_format("average_latency", "%f", data.avg_latency); | |
594 | formatter->dump_format("stddev_latency", "%f", vec_stddev(data.history.latency)); | |
595 | formatter->dump_format("max_latency:", "%f", data.max_latency); | |
596 | formatter->dump_format("min_latency", "%f", data.min_latency); | |
597 | } | |
598 | //write object size/number data for read benchmarks | |
599 | ::encode(data.object_size, b_write); | |
600 | num_objects = (data.finished + writes_per_object - 1) / writes_per_object; | |
601 | ::encode(num_objects, b_write); | |
602 | ::encode(getpid(), b_write); | |
603 | ::encode(data.op_size, b_write); | |
604 | ||
605 | // persist meta-data for further cleanup or read | |
606 | sync_write(run_name_meta, b_write, sizeof(int)*3); | |
607 | ||
608 | completions_done(); | |
609 | for (int i = 0; i < concurrentios; i++) | |
610 | if (contents[i]) | |
611 | delete contents[i]; | |
612 | ||
613 | return 0; | |
614 | ||
615 | ERR: | |
616 | lock.Lock(); | |
617 | data.done = 1; | |
618 | lock.Unlock(); | |
619 | pthread_join(print_thread, NULL); | |
620 | for (int i = 0; i < concurrentios; i++) | |
621 | if (contents[i]) | |
622 | delete contents[i]; | |
623 | return r; | |
624 | } | |
625 | ||
626 | int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) { | |
627 | lock_cond lc(&lock); | |
628 | ||
629 | if (concurrentios <= 0) | |
630 | return -EINVAL; | |
631 | ||
632 | std::vector<string> name(concurrentios); | |
633 | std::string newName; | |
634 | bufferlist* contents[concurrentios]; | |
635 | int index[concurrentios]; | |
636 | int errors = 0; | |
637 | utime_t start_time; | |
638 | std::vector<utime_t> start_times(concurrentios); | |
639 | utime_t time_to_run; | |
640 | time_to_run.set_from_double(seconds_to_run); | |
641 | double total_latency = 0; | |
642 | int r = 0; | |
643 | utime_t runtime; | |
644 | sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent | |
645 | //changes will be safe because string length should remain the same | |
646 | ||
647 | unsigned writes_per_object = 1; | |
648 | if (data.op_size) | |
649 | writes_per_object = data.object_size / data.op_size; | |
650 | ||
651 | r = completions_init(concurrentios); | |
652 | if (r < 0) | |
653 | return r; | |
654 | ||
655 | //set up initial reads | |
656 | for (int i = 0; i < concurrentios; ++i) { | |
657 | name[i] = generate_object_name(i / writes_per_object, pid); | |
658 | contents[i] = new bufferlist(); | |
659 | } | |
660 | ||
661 | lock.Lock(); | |
662 | data.finished = 0; | |
663 | data.start_time = ceph_clock_now(); | |
664 | lock.Unlock(); | |
665 | ||
666 | pthread_t print_thread; | |
667 | pthread_create(&print_thread, NULL, status_printer, (void *)this); | |
668 | ceph_pthread_setname(print_thread, "seq_read_stat"); | |
669 | ||
670 | utime_t finish_time = data.start_time + time_to_run; | |
671 | //start initial reads | |
672 | for (int i = 0; i < concurrentios; ++i) { | |
673 | index[i] = i; | |
674 | start_times[i] = ceph_clock_now(); | |
675 | create_completion(i, _aio_cb, (void *)&lc); | |
676 | r = aio_read(name[i], i, contents[i], data.op_size, | |
677 | data.op_size * (i % writes_per_object)); | |
678 | if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! | |
679 | cerr << "r = " << r << std::endl; | |
680 | goto ERR; | |
681 | } | |
682 | lock.Lock(); | |
683 | ++data.started; | |
684 | ++data.in_flight; | |
685 | lock.Unlock(); | |
686 | } | |
687 | ||
688 | //keep on adding new reads as old ones complete | |
689 | int slot; | |
690 | bufferlist *cur_contents; | |
691 | ||
692 | slot = 0; | |
693 | while ((!seconds_to_run || ceph_clock_now() < finish_time) && | |
694 | num_objects > data.started) { | |
695 | lock.Lock(); | |
696 | int old_slot = slot; | |
697 | bool found = false; | |
698 | while (1) { | |
699 | do { | |
700 | if (completion_is_done(slot)) { | |
701 | found = true; | |
702 | break; | |
703 | } | |
704 | slot++; | |
705 | if (slot == concurrentios) { | |
706 | slot = 0; | |
707 | } | |
708 | } while (slot != old_slot); | |
709 | if (found) { | |
710 | break; | |
711 | } | |
712 | lc.cond.Wait(lock); | |
713 | } | |
714 | ||
715 | // calculate latency here, so memcmp doesn't inflate it | |
716 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
717 | ||
718 | cur_contents = contents[slot]; | |
719 | int current_index = index[slot]; | |
720 | ||
721 | // invalidate internal crc cache | |
722 | cur_contents->invalidate_crc(); | |
723 | ||
724 | if (!no_verify) { | |
725 | snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index); | |
726 | if ( (cur_contents->length() != data.op_size) || | |
727 | (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) { | |
728 | cerr << name[slot] << " is not correct!" << std::endl; | |
729 | ++errors; | |
730 | } | |
731 | } | |
732 | ||
733 | newName = generate_object_name(data.started / writes_per_object, pid); | |
734 | index[slot] = data.started; | |
735 | lock.Unlock(); | |
736 | completion_wait(slot); | |
737 | lock.Lock(); | |
738 | r = completion_ret(slot); | |
739 | if (r < 0) { | |
740 | cerr << "read got " << r << std::endl; | |
741 | lock.Unlock(); | |
742 | goto ERR; | |
743 | } | |
744 | total_latency += data.cur_latency; | |
745 | if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
746 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
747 | ++data.finished; | |
748 | data.avg_latency = total_latency / data.finished; | |
749 | --data.in_flight; | |
750 | lock.Unlock(); | |
751 | release_completion(slot); | |
752 | ||
753 | //start new read and check data if requested | |
754 | start_times[slot] = ceph_clock_now(); | |
755 | create_completion(slot, _aio_cb, (void *)&lc); | |
756 | r = aio_read(newName, slot, contents[slot], data.op_size, | |
757 | data.op_size * (data.started % writes_per_object)); | |
758 | if (r < 0) { | |
759 | goto ERR; | |
760 | } | |
761 | lock.Lock(); | |
762 | ++data.started; | |
763 | ++data.in_flight; | |
764 | lock.Unlock(); | |
765 | name[slot] = newName; | |
766 | } | |
767 | ||
768 | //wait for final reads to complete | |
769 | while (data.finished < data.started) { | |
770 | slot = data.finished % concurrentios; | |
771 | completion_wait(slot); | |
772 | lock.Lock(); | |
773 | r = completion_ret(slot); | |
774 | if (r < 0) { | |
775 | cerr << "read got " << r << std::endl; | |
776 | lock.Unlock(); | |
777 | goto ERR; | |
778 | } | |
779 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
780 | total_latency += data.cur_latency; | |
781 | if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
782 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
783 | ++data.finished; | |
784 | data.avg_latency = total_latency / data.finished; | |
785 | --data.in_flight; | |
786 | release_completion(slot); | |
787 | if (!no_verify) { | |
788 | snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]); | |
789 | lock.Unlock(); | |
790 | if ((contents[slot]->length() != data.op_size) || | |
791 | (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) { | |
792 | cerr << name[slot] << " is not correct!" << std::endl; | |
793 | ++errors; | |
794 | } | |
795 | } else { | |
796 | lock.Unlock(); | |
797 | } | |
798 | delete contents[slot]; | |
799 | } | |
800 | ||
801 | runtime = ceph_clock_now() - data.start_time; | |
802 | lock.Lock(); | |
803 | data.done = true; | |
804 | lock.Unlock(); | |
805 | ||
806 | pthread_join(print_thread, NULL); | |
807 | ||
808 | double bandwidth; | |
809 | bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime; | |
810 | bandwidth = bandwidth/(1024*1024); // we want it in MB/sec | |
811 | ||
812 | if (!formatter) { | |
813 | out(cout) << "Total time run: " << runtime << std::endl | |
814 | << "Total reads made: " << data.finished << std::endl | |
815 | << "Read size: " << data.op_size << std::endl | |
816 | << "Object size: " << data.object_size << std::endl | |
817 | << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl | |
818 | << "Average IOPS: " << (int)(data.finished/runtime) << std::endl | |
819 | << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl | |
820 | << "Max IOPS: " << data.idata.max_iops << std::endl | |
821 | << "Min IOPS: " << data.idata.min_iops << std::endl | |
822 | << "Average Latency(s): " << data.avg_latency << std::endl | |
823 | << "Max latency(s): " << data.max_latency << std::endl | |
824 | << "Min latency(s): " << data.min_latency << std::endl; | |
825 | } else { | |
826 | formatter->dump_format("total_time_run", "%f", (double)runtime); | |
827 | formatter->dump_format("total_reads_made", "%d", data.finished); | |
828 | formatter->dump_format("read_size", "%d", data.op_size); | |
829 | formatter->dump_format("object_size", "%d", data.object_size); | |
830 | formatter->dump_format("bandwidth", "%f", bandwidth); | |
831 | formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime)); | |
832 | formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops)); | |
833 | formatter->dump_format("max_iops", "%d", data.idata.max_iops); | |
834 | formatter->dump_format("min_iops", "%d", data.idata.min_iops); | |
835 | formatter->dump_format("average_latency", "%f", data.avg_latency); | |
836 | formatter->dump_format("max_latency", "%f", data.max_latency); | |
837 | formatter->dump_format("min_latency", "%f", data.min_latency); | |
838 | } | |
839 | ||
840 | completions_done(); | |
841 | ||
842 | return (errors > 0 ? -EIO : 0); | |
843 | ||
844 | ERR: | |
845 | lock.Lock(); | |
846 | data.done = 1; | |
847 | lock.Unlock(); | |
848 | pthread_join(print_thread, NULL); | |
849 | return r; | |
850 | } | |
851 | ||
852 | int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) | |
853 | { | |
854 | lock_cond lc(&lock); | |
855 | ||
856 | if (concurrentios <= 0) | |
857 | return -EINVAL; | |
858 | ||
859 | std::vector<string> name(concurrentios); | |
860 | std::string newName; | |
861 | bufferlist* contents[concurrentios]; | |
862 | int index[concurrentios]; | |
863 | int errors = 0; | |
864 | utime_t start_time; | |
865 | std::vector<utime_t> start_times(concurrentios); | |
866 | utime_t time_to_run; | |
867 | time_to_run.set_from_double(seconds_to_run); | |
868 | double total_latency = 0; | |
869 | int r = 0; | |
870 | utime_t runtime; | |
871 | sanitize_object_contents(&data, data.op_size); //clean it up once; subsequent | |
872 | //changes will be safe because string length should remain the same | |
873 | ||
874 | unsigned writes_per_object = 1; | |
875 | if (data.op_size) | |
876 | writes_per_object = data.object_size / data.op_size; | |
877 | ||
878 | srand (time(NULL)); | |
879 | ||
880 | r = completions_init(concurrentios); | |
881 | if (r < 0) | |
882 | return r; | |
883 | ||
884 | //set up initial reads | |
885 | for (int i = 0; i < concurrentios; ++i) { | |
886 | name[i] = generate_object_name(i / writes_per_object, pid); | |
887 | contents[i] = new bufferlist(); | |
888 | } | |
889 | ||
890 | lock.Lock(); | |
891 | data.finished = 0; | |
892 | data.start_time = ceph_clock_now(); | |
893 | lock.Unlock(); | |
894 | ||
895 | pthread_t print_thread; | |
896 | pthread_create(&print_thread, NULL, status_printer, (void *)this); | |
897 | ceph_pthread_setname(print_thread, "rand_read_stat"); | |
898 | ||
899 | utime_t finish_time = data.start_time + time_to_run; | |
900 | //start initial reads | |
901 | for (int i = 0; i < concurrentios; ++i) { | |
902 | index[i] = i; | |
903 | start_times[i] = ceph_clock_now(); | |
904 | create_completion(i, _aio_cb, (void *)&lc); | |
905 | r = aio_read(name[i], i, contents[i], data.op_size, | |
906 | data.op_size * (i % writes_per_object)); | |
907 | if (r < 0) { //naughty, doesn't clean up heap -- oh, or handle the print thread! | |
908 | cerr << "r = " << r << std::endl; | |
909 | goto ERR; | |
910 | } | |
911 | lock.Lock(); | |
912 | ++data.started; | |
913 | ++data.in_flight; | |
914 | lock.Unlock(); | |
915 | } | |
916 | ||
917 | //keep on adding new reads as old ones complete | |
918 | int slot; | |
919 | bufferlist *cur_contents; | |
920 | int rand_id; | |
921 | ||
922 | slot = 0; | |
923 | while ((!seconds_to_run || ceph_clock_now() < finish_time)) { | |
924 | lock.Lock(); | |
925 | int old_slot = slot; | |
926 | bool found = false; | |
927 | while (1) { | |
928 | do { | |
929 | if (completion_is_done(slot)) { | |
930 | found = true; | |
931 | break; | |
932 | } | |
933 | slot++; | |
934 | if (slot == concurrentios) { | |
935 | slot = 0; | |
936 | } | |
937 | } while (slot != old_slot); | |
938 | if (found) { | |
939 | break; | |
940 | } | |
941 | lc.cond.Wait(lock); | |
942 | } | |
943 | ||
944 | // calculate latency here, so memcmp doesn't inflate it | |
945 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
946 | ||
947 | lock.Unlock(); | |
948 | ||
949 | int current_index = index[slot]; | |
950 | cur_contents = contents[slot]; | |
951 | completion_wait(slot); | |
952 | lock.Lock(); | |
953 | r = completion_ret(slot); | |
954 | if (r < 0) { | |
955 | cerr << "read got " << r << std::endl; | |
956 | lock.Unlock(); | |
957 | goto ERR; | |
958 | } | |
959 | ||
960 | total_latency += data.cur_latency; | |
961 | if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
962 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
963 | ++data.finished; | |
964 | data.avg_latency = total_latency / data.finished; | |
965 | --data.in_flight; | |
966 | lock.Unlock(); | |
967 | ||
968 | if (!no_verify) { | |
969 | snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index); | |
970 | if ((cur_contents->length() != data.op_size) || | |
971 | (memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) { | |
972 | cerr << name[slot] << " is not correct!" << std::endl; | |
973 | ++errors; | |
974 | } | |
975 | } | |
976 | ||
977 | rand_id = rand() % num_objects; | |
978 | newName = generate_object_name(rand_id / writes_per_object, pid); | |
979 | index[slot] = rand_id; | |
980 | release_completion(slot); | |
981 | ||
982 | // invalidate internal crc cache | |
983 | cur_contents->invalidate_crc(); | |
984 | ||
985 | //start new read and check data if requested | |
986 | start_times[slot] = ceph_clock_now(); | |
987 | create_completion(slot, _aio_cb, (void *)&lc); | |
988 | r = aio_read(newName, slot, contents[slot], data.op_size, | |
989 | data.op_size * (rand_id % writes_per_object)); | |
990 | if (r < 0) { | |
991 | goto ERR; | |
992 | } | |
993 | lock.Lock(); | |
994 | ++data.started; | |
995 | ++data.in_flight; | |
996 | lock.Unlock(); | |
997 | name[slot] = newName; | |
998 | } | |
999 | ||
1000 | ||
1001 | //wait for final reads to complete | |
1002 | while (data.finished < data.started) { | |
1003 | slot = data.finished % concurrentios; | |
1004 | completion_wait(slot); | |
1005 | lock.Lock(); | |
1006 | r = completion_ret(slot); | |
1007 | if (r < 0) { | |
1008 | cerr << "read got " << r << std::endl; | |
1009 | lock.Unlock(); | |
1010 | goto ERR; | |
1011 | } | |
1012 | data.cur_latency = ceph_clock_now() - start_times[slot]; | |
1013 | total_latency += data.cur_latency; | |
1014 | if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency; | |
1015 | if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency; | |
1016 | ++data.finished; | |
1017 | data.avg_latency = total_latency / data.finished; | |
1018 | --data.in_flight; | |
1019 | release_completion(slot); | |
1020 | if (!no_verify) { | |
1021 | snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]); | |
1022 | lock.Unlock(); | |
1023 | if ((contents[slot]->length() != data.op_size) || | |
1024 | (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) { | |
1025 | cerr << name[slot] << " is not correct!" << std::endl; | |
1026 | ++errors; | |
1027 | } | |
1028 | } else { | |
1029 | lock.Unlock(); | |
1030 | } | |
1031 | delete contents[slot]; | |
1032 | } | |
1033 | ||
1034 | runtime = ceph_clock_now() - data.start_time; | |
1035 | lock.Lock(); | |
1036 | data.done = true; | |
1037 | lock.Unlock(); | |
1038 | ||
1039 | pthread_join(print_thread, NULL); | |
1040 | ||
1041 | double bandwidth; | |
1042 | bandwidth = ((double)data.finished)*((double)data.op_size)/(double)runtime; | |
1043 | bandwidth = bandwidth/(1024*1024); // we want it in MB/sec | |
1044 | ||
1045 | if (!formatter) { | |
1046 | out(cout) << "Total time run: " << runtime << std::endl | |
1047 | << "Total reads made: " << data.finished << std::endl | |
1048 | << "Read size: " << data.op_size << std::endl | |
1049 | << "Object size: " << data.object_size << std::endl | |
1050 | << "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl | |
1051 | << "Average IOPS: " << (int)(data.finished/runtime) << std::endl | |
1052 | << "Stddev IOPS: " << vec_stddev(data.history.iops) << std::endl | |
1053 | << "Max IOPS: " << data.idata.max_iops << std::endl | |
1054 | << "Min IOPS: " << data.idata.min_iops << std::endl | |
1055 | << "Average Latency(s): " << data.avg_latency << std::endl | |
1056 | << "Max latency(s): " << data.max_latency << std::endl | |
1057 | << "Min latency(s): " << data.min_latency << std::endl; | |
1058 | } else { | |
1059 | formatter->dump_format("total_time_run", "%f", (double)runtime); | |
1060 | formatter->dump_format("total_reads_made", "%d", data.finished); | |
1061 | formatter->dump_format("read_size", "%d", data.op_size); | |
1062 | formatter->dump_format("object_size", "%d", data.object_size); | |
1063 | formatter->dump_format("bandwidth", "%f", bandwidth); | |
1064 | formatter->dump_format("average_iops", "%d", (int)(data.finished/runtime)); | |
1065 | formatter->dump_format("stddev_iops", "%d", vec_stddev(data.history.iops)); | |
1066 | formatter->dump_format("max_iops", "%d", data.idata.max_iops); | |
1067 | formatter->dump_format("min_iops", "%d", data.idata.min_iops); | |
1068 | formatter->dump_format("average_latency", "%f", data.avg_latency); | |
1069 | formatter->dump_format("max_latency", "%f", data.max_latency); | |
1070 | formatter->dump_format("min_latency", "%f", data.min_latency); | |
1071 | } | |
1072 | completions_done(); | |
1073 | ||
1074 | return (errors > 0 ? -EIO : 0); | |
1075 | ||
1076 | ERR: | |
1077 | lock.Lock(); | |
1078 | data.done = 1; | |
1079 | lock.Unlock(); | |
1080 | pthread_join(print_thread, NULL); | |
1081 | return r; | |
1082 | } | |
1083 | ||
1084 | int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) { | |
1085 | int r = 0; | |
1086 | uint64_t op_size, object_size; | |
1087 | int num_objects; | |
1088 | int prevPid; | |
1089 | ||
1090 | // default meta object if user does not specify one | |
1091 | const std::string run_name_meta = (run_name.empty() ? BENCH_LASTRUN_METADATA : run_name); | |
1092 | const std::string prefix = (orig_prefix.empty() ? generate_object_prefix_nopid() : orig_prefix); | |
1093 | ||
1094 | if (prefix.substr(0, BENCH_PREFIX.length()) != BENCH_PREFIX) { | |
1095 | cerr << "Specified --prefix invalid, it must begin with \"" << BENCH_PREFIX << "\"" << std::endl; | |
1096 | return -EINVAL; | |
1097 | } | |
1098 | ||
1099 | std::list<Object> unfiltered_objects; | |
1100 | std::set<std::string> meta_namespaces, all_namespaces; | |
1101 | ||
1102 | // If caller set all_nspaces this will be searching | |
1103 | // across multiple namespaces. | |
1104 | while (true) { | |
1105 | bool objects_remain = get_objects(&unfiltered_objects, 20); | |
1106 | if (!objects_remain) | |
1107 | break; | |
1108 | ||
1109 | std::list<Object>::const_iterator i = unfiltered_objects.begin(); | |
1110 | for ( ; i != unfiltered_objects.end(); ++i) { | |
1111 | if (i->first == run_name_meta) { | |
1112 | meta_namespaces.insert(i->second); | |
1113 | } | |
1114 | if (i->first.substr(0, prefix.length()) == prefix) { | |
1115 | all_namespaces.insert(i->second); | |
1116 | } | |
1117 | } | |
1118 | } | |
1119 | ||
1120 | std::set<std::string>::const_iterator i = all_namespaces.begin(); | |
1121 | for ( ; i != all_namespaces.end(); ++i) { | |
1122 | set_namespace(*i); | |
1123 | ||
1124 | // if no metadata file found we should try to do a linear search on the prefix | |
1125 | if (meta_namespaces.find(*i) == meta_namespaces.end()) { | |
1126 | int r = clean_up_slow(prefix, concurrentios); | |
1127 | if (r < 0) { | |
1128 | cerr << "clean_up_slow error r= " << r << std::endl; | |
1129 | return r; | |
1130 | } | |
1131 | continue; | |
1132 | } | |
1133 | ||
1134 | r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid); | |
1135 | if (r < 0) { | |
1136 | return r; | |
1137 | } | |
1138 | ||
1139 | r = clean_up(num_objects, prevPid, concurrentios); | |
1140 | if (r != 0) return r; | |
1141 | ||
1142 | r = sync_remove(run_name_meta); | |
1143 | if (r != 0) return r; | |
1144 | } | |
1145 | ||
1146 | return 0; | |
1147 | } | |
1148 | ||
1149 | int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) { | |
1150 | lock_cond lc(&lock); | |
1151 | ||
1152 | if (concurrentios <= 0) | |
1153 | return -EINVAL; | |
1154 | ||
1155 | std::vector<string> name(concurrentios); | |
1156 | std::string newName; | |
1157 | int r = 0; | |
1158 | utime_t runtime; | |
1159 | int slot = 0; | |
1160 | ||
1161 | lock.Lock(); | |
1162 | data.done = false; | |
1163 | data.in_flight = 0; | |
1164 | data.started = 0; | |
1165 | data.finished = 0; | |
1166 | lock.Unlock(); | |
1167 | ||
1168 | // don't start more completions than files | |
1169 | if (num_objects == 0) { | |
1170 | return 0; | |
1171 | } else if (num_objects < concurrentios) { | |
1172 | concurrentios = num_objects; | |
1173 | } | |
1174 | ||
1175 | r = completions_init(concurrentios); | |
1176 | if (r < 0) | |
1177 | return r; | |
1178 | ||
1179 | //set up initial removes | |
1180 | for (int i = 0; i < concurrentios; ++i) { | |
1181 | name[i] = generate_object_name(i, prevPid); | |
1182 | } | |
1183 | ||
1184 | //start initial removes | |
1185 | for (int i = 0; i < concurrentios; ++i) { | |
1186 | create_completion(i, _aio_cb, (void *)&lc); | |
1187 | r = aio_remove(name[i], i); | |
1188 | if (r < 0) { //naughty, doesn't clean up heap | |
1189 | cerr << "r = " << r << std::endl; | |
1190 | goto ERR; | |
1191 | } | |
1192 | lock.Lock(); | |
1193 | ++data.started; | |
1194 | ++data.in_flight; | |
1195 | lock.Unlock(); | |
1196 | } | |
1197 | ||
1198 | //keep on adding new removes as old ones complete | |
1199 | while (data.started < num_objects) { | |
1200 | lock.Lock(); | |
1201 | int old_slot = slot; | |
1202 | bool found = false; | |
1203 | while (1) { | |
1204 | do { | |
1205 | if (completion_is_done(slot)) { | |
1206 | found = true; | |
1207 | break; | |
1208 | } | |
1209 | slot++; | |
1210 | if (slot == concurrentios) { | |
1211 | slot = 0; | |
1212 | } | |
1213 | } while (slot != old_slot); | |
1214 | if (found) { | |
1215 | break; | |
1216 | } | |
1217 | lc.cond.Wait(lock); | |
1218 | } | |
1219 | lock.Unlock(); | |
1220 | newName = generate_object_name(data.started, prevPid); | |
1221 | completion_wait(slot); | |
1222 | lock.Lock(); | |
1223 | r = completion_ret(slot); | |
1224 | if (r != 0 && r != -ENOENT) { // file does not exist | |
1225 | cerr << "remove got " << r << std::endl; | |
1226 | lock.Unlock(); | |
1227 | goto ERR; | |
1228 | } | |
1229 | ++data.finished; | |
1230 | --data.in_flight; | |
1231 | lock.Unlock(); | |
1232 | release_completion(slot); | |
1233 | ||
1234 | //start new remove and check data if requested | |
1235 | create_completion(slot, _aio_cb, (void *)&lc); | |
1236 | r = aio_remove(newName, slot); | |
1237 | if (r < 0) { | |
1238 | goto ERR; | |
1239 | } | |
1240 | lock.Lock(); | |
1241 | ++data.started; | |
1242 | ++data.in_flight; | |
1243 | lock.Unlock(); | |
1244 | name[slot] = newName; | |
1245 | } | |
1246 | ||
1247 | //wait for final removes to complete | |
1248 | while (data.finished < data.started) { | |
1249 | slot = data.finished % concurrentios; | |
1250 | completion_wait(slot); | |
1251 | lock.Lock(); | |
1252 | r = completion_ret(slot); | |
1253 | if (r != 0 && r != -ENOENT) { // file does not exist | |
1254 | cerr << "remove got " << r << std::endl; | |
1255 | lock.Unlock(); | |
1256 | goto ERR; | |
1257 | } | |
1258 | ++data.finished; | |
1259 | --data.in_flight; | |
1260 | release_completion(slot); | |
1261 | lock.Unlock(); | |
1262 | } | |
1263 | ||
1264 | lock.Lock(); | |
1265 | data.done = true; | |
1266 | lock.Unlock(); | |
1267 | ||
1268 | completions_done(); | |
1269 | ||
1270 | out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl; | |
1271 | ||
1272 | return 0; | |
1273 | ||
1274 | ERR: | |
1275 | lock.Lock(); | |
1276 | data.done = 1; | |
1277 | lock.Unlock(); | |
1278 | return r; | |
1279 | } | |
1280 | ||
1281 | /** | |
1282 | * Return objects from the datastore which match a prefix. | |
1283 | * | |
1284 | * Clears the list and populates it with any objects which match the | |
1285 | * prefix. The list is guaranteed to have at least one item when the | |
1286 | * function returns true. | |
1287 | * | |
1288 | * @param prefix the prefix to match against | |
1289 | * @param objects [out] return list of objects | |
1290 | * @returns true if there are any objects in the store which match | |
1291 | * the prefix, false if there are no more | |
1292 | */ | |
1293 | bool ObjBencher::more_objects_matching_prefix(const std::string& prefix, std::list<Object>* objects) { | |
1294 | std::list<Object> unfiltered_objects; | |
1295 | ||
1296 | objects->clear(); | |
1297 | ||
1298 | while (objects->empty()) { | |
1299 | bool objects_remain = get_objects(&unfiltered_objects, 20); | |
1300 | if (!objects_remain) | |
1301 | return false; | |
1302 | ||
1303 | std::list<Object>::const_iterator i = unfiltered_objects.begin(); | |
1304 | for ( ; i != unfiltered_objects.end(); ++i) { | |
1305 | if (i->first.substr(0, prefix.length()) == prefix) { | |
1306 | objects->push_back(*i); | |
1307 | } | |
1308 | } | |
1309 | } | |
1310 | ||
1311 | return true; | |
1312 | } | |
1313 | ||
1314 | int ObjBencher::clean_up_slow(const std::string& prefix, int concurrentios) { | |
1315 | lock_cond lc(&lock); | |
1316 | ||
1317 | if (concurrentios <= 0) | |
1318 | return -EINVAL; | |
1319 | ||
1320 | std::vector<Object> name(concurrentios); | |
1321 | Object newName; | |
1322 | int r = 0; | |
1323 | utime_t runtime; | |
1324 | int slot = 0; | |
1325 | std::list<Object> objects; | |
1326 | bool objects_remain = true; | |
1327 | ||
1328 | lock.Lock(); | |
1329 | data.done = false; | |
1330 | data.in_flight = 0; | |
1331 | data.started = 0; | |
1332 | data.finished = 0; | |
1333 | lock.Unlock(); | |
1334 | ||
1335 | out(cout) << "Warning: using slow linear search" << std::endl; | |
1336 | ||
1337 | r = completions_init(concurrentios); | |
1338 | if (r < 0) | |
1339 | return r; | |
1340 | ||
1341 | //set up initial removes | |
1342 | for (int i = 0; i < concurrentios; ++i) { | |
1343 | if (objects.empty()) { | |
1344 | // if there are fewer objects than concurrent ios, don't generate extras | |
1345 | bool objects_found = more_objects_matching_prefix(prefix, &objects); | |
1346 | if (!objects_found) { | |
1347 | concurrentios = i; | |
1348 | objects_remain = false; | |
1349 | break; | |
1350 | } | |
1351 | } | |
1352 | ||
1353 | name[i] = objects.front(); | |
1354 | objects.pop_front(); | |
1355 | } | |
1356 | ||
1357 | //start initial removes | |
1358 | for (int i = 0; i < concurrentios; ++i) { | |
1359 | create_completion(i, _aio_cb, (void *)&lc); | |
1360 | set_namespace(name[i].second); | |
1361 | r = aio_remove(name[i].first, i); | |
1362 | if (r < 0) { //naughty, doesn't clean up heap | |
1363 | cerr << "r = " << r << std::endl; | |
1364 | goto ERR; | |
1365 | } | |
1366 | lock.Lock(); | |
1367 | ++data.started; | |
1368 | ++data.in_flight; | |
1369 | lock.Unlock(); | |
1370 | } | |
1371 | ||
1372 | //keep on adding new removes as old ones complete | |
1373 | while (objects_remain) { | |
1374 | lock.Lock(); | |
1375 | int old_slot = slot; | |
1376 | bool found = false; | |
1377 | while (1) { | |
1378 | do { | |
1379 | if (completion_is_done(slot)) { | |
1380 | found = true; | |
1381 | break; | |
1382 | } | |
1383 | slot++; | |
1384 | if (slot == concurrentios) { | |
1385 | slot = 0; | |
1386 | } | |
1387 | } while (slot != old_slot); | |
1388 | if (found) { | |
1389 | break; | |
1390 | } | |
1391 | lc.cond.Wait(lock); | |
1392 | } | |
1393 | lock.Unlock(); | |
1394 | ||
1395 | // get more objects if necessary | |
1396 | if (objects.empty()) { | |
1397 | objects_remain = more_objects_matching_prefix(prefix, &objects); | |
1398 | // quit if there are no more | |
1399 | if (!objects_remain) { | |
1400 | break; | |
1401 | } | |
1402 | } | |
1403 | ||
1404 | // get the next object | |
1405 | newName = objects.front(); | |
1406 | objects.pop_front(); | |
1407 | ||
1408 | completion_wait(slot); | |
1409 | lock.Lock(); | |
1410 | r = completion_ret(slot); | |
1411 | if (r != 0 && r != -ENOENT) { // file does not exist | |
1412 | cerr << "remove got " << r << std::endl; | |
1413 | lock.Unlock(); | |
1414 | goto ERR; | |
1415 | } | |
1416 | ++data.finished; | |
1417 | --data.in_flight; | |
1418 | lock.Unlock(); | |
1419 | release_completion(slot); | |
1420 | ||
1421 | //start new remove and check data if requested | |
1422 | create_completion(slot, _aio_cb, (void *)&lc); | |
1423 | set_namespace(newName.second); | |
1424 | r = aio_remove(newName.first, slot); | |
1425 | if (r < 0) { | |
1426 | goto ERR; | |
1427 | } | |
1428 | lock.Lock(); | |
1429 | ++data.started; | |
1430 | ++data.in_flight; | |
1431 | lock.Unlock(); | |
1432 | name[slot] = newName; | |
1433 | } | |
1434 | ||
1435 | //wait for final removes to complete | |
1436 | while (data.finished < data.started) { | |
1437 | slot = data.finished % concurrentios; | |
1438 | completion_wait(slot); | |
1439 | lock.Lock(); | |
1440 | r = completion_ret(slot); | |
1441 | if (r != 0 && r != -ENOENT) { // file does not exist | |
1442 | cerr << "remove got " << r << std::endl; | |
1443 | lock.Unlock(); | |
1444 | goto ERR; | |
1445 | } | |
1446 | ++data.finished; | |
1447 | --data.in_flight; | |
1448 | release_completion(slot); | |
1449 | lock.Unlock(); | |
1450 | } | |
1451 | ||
1452 | lock.Lock(); | |
1453 | data.done = true; | |
1454 | lock.Unlock(); | |
1455 | ||
1456 | completions_done(); | |
1457 | ||
1458 | out(cout) << "Removed " << data.finished << " object" << (data.finished != 1 ? "s" : "") << std::endl; | |
1459 | ||
1460 | return 0; | |
1461 | ||
1462 | ERR: | |
1463 | lock.Lock(); | |
1464 | data.done = 1; | |
1465 | lock.Unlock(); | |
1466 | return -EIO; | |
1467 | } |