*/
#include "include/compat.h"
#include <pthread.h>
-#include "common/Cond.h"
+#include "common/ceph_mutex.h"
+#include "common/Clock.h"
#include "obj_bencher.h"
const std::string BENCH_LASTRUN_METADATA = "benchmark_last_metadata";
bench_data& data = bencher->data;
Formatter *formatter = bencher->formatter;
ostream *outstream = bencher->outstream;
- Cond cond;
+ ceph::condition_variable cond;
int i = 0;
int previous_writes = 0;
int cycleSinceChange = 0;
double bandwidth;
int iops = 0;
mono_clock::duration ONE_SECOND = std::chrono::seconds(1);
- bencher->lock.lock();
+ std::unique_lock locker{bencher->lock};
if (formatter)
formatter->open_array_section("datas");
while(!data.done) {
}
++i;
++cycleSinceChange;
- cond.WaitInterval(bencher->lock, ONE_SECOND);
+ cond.wait_for(locker, ONE_SECOND);
}
if (formatter)
formatter->close_section(); //datas
std::chrono::duration<double> runtime = mono_clock::now() - data.start_time;
data.idata.min_iops = data.idata.max_iops = data.finished / runtime.count();
}
- bencher->lock.unlock();
return NULL;
}
if (concurrentios <= 0)
return -EINVAL;
+ int num_ops = 0;
int num_objects = 0;
int r = 0;
int prev_pid = 0;
if (operation != OP_WRITE || reuse_bench) {
uint64_t prev_op_size, prev_object_size;
r = fetch_bench_metadata(run_name_meta, &prev_op_size, &prev_object_size,
- &num_objects, &prev_pid);
+ &num_ops, &num_objects, &prev_pid);
if (r < 0) {
if (r == -ENOENT) {
if (reuse_bench)
if (r != 0) goto out;
}
else if (OP_SEQ_READ == operation) {
- r = seq_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
+ r = seq_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
if (r != 0) goto out;
}
else if (OP_RAND_READ == operation) {
- r = rand_read_bench(secondsToRun, num_objects, concurrentios, prev_pid, no_verify);
+ r = rand_read_bench(secondsToRun, num_ops, num_objects, concurrentios, prev_pid, no_verify);
if (r != 0) goto out;
}
if (OP_WRITE == operation && cleanup) {
r = fetch_bench_metadata(run_name_meta, &op_size, &object_size,
- &num_objects, &prev_pid);
+ &num_ops, &num_objects, &prev_pid);
if (r < 0) {
if (r == -ENOENT)
cerr << "Should never happen: bench metadata missing for current run!" << std::endl;
}
struct lock_cond {
- explicit lock_cond(Mutex *_lock) : lock(_lock) {}
- Mutex *lock;
- Cond cond;
+ explicit lock_cond(ceph::mutex *_lock) : lock(_lock) {}
+ ceph::mutex *lock;
+ ceph::condition_variable cond;
};
void _aio_cb(void *cb, void *arg) {
struct lock_cond *lc = (struct lock_cond *)arg;
lc->lock->lock();
- lc->cond.Signal();
+ lc->cond.notify_all();
lc->lock->unlock();
}
int ObjBencher::fetch_bench_metadata(const std::string& metadata_file,
uint64_t *op_size, uint64_t* object_size,
- int* num_objects, int* prevPid) {
+ int* num_ops, int* num_objects, int* prevPid) {
int r = 0;
bufferlist object_data;
}
auto p = object_data.cbegin();
decode(*object_size, p);
- decode(*num_objects, p);
+ decode(*num_ops, p);
decode(*prevPid, p);
if (!p.end()) {
decode(*op_size, p);
} else {
*op_size = *object_size;
}
+ unsigned ops_per_object = 1;
+ // make sure *op_size value is reasonable
+ if (*op_size > 0 && *object_size > *op_size) {
+ ops_per_object = *object_size / *op_size;
+ }
+ *num_objects = (*num_ops + ops_per_object - 1) / ops_per_object;
return 0;
}
int ObjBencher::write_bench(int secondsToRun,
int concurrentios, const string& run_name_meta,
unsigned max_objects, int prev_pid) {
- if (concurrentios <= 0)
+ if (concurrentios <= 0)
return -EINVAL;
-
+
if (!formatter) {
out(cout) << "Maintaining " << concurrentios << " concurrent writes of "
<< data.op_size << " bytes to objects of size "
pthread_create(&print_thread, NULL, ObjBencher::status_printer, (void *)this);
ceph_pthread_setname(print_thread, "write_stat");
- lock.lock();
+ std::unique_lock locker{lock};
data.finished = 0;
data.start_time = mono_clock::now();
- lock.unlock();
+ locker.unlock();
for (int i = 0; i<concurrentios; ++i) {
start_times[i] = mono_clock::now();
r = create_completion(i, _aio_cb, (void *)&lc);
if (r < 0) {
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
}
//keep on adding new writes as old ones complete until we've passed minimum time
int slot;
- int num_objects;
//don't need locking for reads because other thread doesn't write
stopTime = data.start_time + std::chrono::seconds(secondsToRun);
slot = 0;
- lock.lock();
- while (secondsToRun && mono_clock::now() < stopTime) {
+ locker.lock();
+ while (data.finished < data.started) {
bool found = false;
while (1) {
int old_slot = slot;
} while (slot != old_slot);
if (found)
break;
- lc.cond.Wait(lock);
+ lc.cond.wait(locker);
}
- lock.unlock();
- //create new contents and name on the heap, and fill them
- newName = generate_object_name_fast(data.started / writes_per_object);
- newContents = contents[slot].get();
- snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
- // we wrote to buffer, going around internal crc cache, so invalidate it now.
- newContents->invalidate_crc();
+ locker.unlock();
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r != 0) {
- lock.unlock();
+ locker.unlock();
goto ERR;
}
data.cur_latency = mono_clock::now() - start_times[slot];
data.avg_latency = total_latency / data.finished;
data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
--data.in_flight;
- lock.unlock();
+ locker.unlock();
release_completion(slot);
+ if (!secondsToRun || mono_clock::now() >= stopTime) {
+ locker.lock();
+ continue;
+ }
+
+ if (data.op_size && max_objects &&
+ data.started >=
+ (int)((data.object_size * max_objects + data.op_size - 1) /
+ data.op_size)) {
+ locker.lock();
+ continue;
+ }
+
//write new stuff to backend
+
+ //create new contents and name on the heap, and fill them
+ newName = generate_object_name_fast(data.started / writes_per_object);
+ newContents = contents[slot].get();
+ snprintf(newContents->c_str(), data.op_size, "I'm the %16dth op!", data.started);
+ // we wrote to buffer, going around internal crc cache, so invalidate it now.
+ newContents->invalidate_crc();
+
start_times[slot] = mono_clock::now();
r = create_completion(slot, _aio_cb, &lc);
if (r < 0)
goto ERR;
}
name[slot] = newName;
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- if (data.op_size) {
- if (max_objects &&
- data.started >= (int)((data.object_size * max_objects + data.op_size - 1) /
- data.op_size))
- break;
- }
- }
- lock.unlock();
-
- while (data.finished < data.started) {
- slot = data.finished % concurrentios;
- completion_wait(slot);
- lock.lock();
- r = completion_ret(slot);
- if (r != 0) {
- lock.unlock();
- goto ERR;
- }
- data.cur_latency = mono_clock::now() - start_times[slot];
- total_latency += data.cur_latency.count();
- if (data.cur_latency.count() > data.max_latency)
- data.max_latency = data.cur_latency.count();
- if (data.cur_latency.count() < data.min_latency)
- data.min_latency = data.cur_latency.count();
- ++data.finished;
- double delta = data.cur_latency.count() - data.avg_latency;
- data.avg_latency = total_latency / data.finished;
- data.latency_diff_sum += delta * (data.cur_latency.count() - data.avg_latency);
- --data.in_flight;
- lock.unlock();
- release_completion(slot);
}
+ locker.unlock();
timePassed = mono_clock::now() - data.start_time;
- lock.lock();
+ locker.lock();
data.done = true;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
out(cout) << "Total time run: " << timePassed.count() << std::endl
<< "Total writes made: " << data.finished << std::endl
<< "Write size: " << data.op_size << std::endl
- << "Object size: " << data.object_size << std::endl
+ << "Object size: " << data.object_size << std::endl
<< "Bandwidth (MB/sec): " << setprecision(6) << bandwidth << std::endl
<< "Stddev Bandwidth: " << bandwidth_stddev << std::endl
<< "Max bandwidth (MB/sec): " << data.idata.max_bandwidth << std::endl
}
//write object size/number data for read benchmarks
encode(data.object_size, b_write);
- num_objects = (data.finished + writes_per_object - 1) / writes_per_object;
- encode(num_objects, b_write);
+ encode(data.finished, b_write);
encode(prev_pid ? prev_pid : getpid(), b_write);
encode(data.op_size, b_write);
return 0;
ERR:
- lock.lock();
+ locker.lock();
data.done = 1;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
return r;
}
-int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
+int ObjBencher::seq_read_bench(
+ int seconds_to_run, int num_ops, int num_objects,
+ int concurrentios, int pid, bool no_verify) {
+
lock_cond lc(&lock);
- if (concurrentios <= 0)
+ if (concurrentios <= 0)
return -EINVAL;
std::vector<string> name(concurrentios);
contents[i] = std::make_unique<bufferlist>();
}
- lock.lock();
+ std::unique_lock locker{lock};
data.finished = 0;
data.start_time = mono_clock::now();
- lock.unlock();
+ locker.unlock();
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
}
//keep on adding new reads as old ones complete
bufferlist *cur_contents;
slot = 0;
- while ((seconds_to_run && mono_clock::now() < finish_time) &&
- num_objects > data.started) {
- lock.lock();
+ while (data.finished < data.started) {
+ locker.lock();
int old_slot = slot;
bool found = false;
while (1) {
if (found) {
break;
}
- lc.cond.Wait(lock);
+ lc.cond.wait(locker);
}
// calculate latency here, so memcmp doesn't inflate it
cur_contents = contents[slot].get();
int current_index = index[slot];
-
+
// invalidate internal crc cache
cur_contents->invalidate_crc();
-
+
if (!no_verify) {
snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
- if ( (cur_contents->length() != data.op_size) ||
+ if ( (cur_contents->length() != data.op_size) ||
(memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0) ) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
}
- newName = generate_object_name_fast(data.started / reads_per_object, pid);
- index[slot] = data.started;
- lock.unlock();
+ bool start_new_read = (seconds_to_run && mono_clock::now() < finish_time) &&
+ num_ops > data.started;
+ if (start_new_read) {
+ newName = generate_object_name_fast(data.started / reads_per_object, pid);
+ index[slot] = data.started;
+ }
+
+ locker.unlock();
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.unlock();
+ locker.unlock();
goto ERR;
}
total_latency += data.cur_latency.count();
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
- lock.unlock();
+ locker.unlock();
release_completion(slot);
+ if (!start_new_read)
+ continue;
+
//start new read and check data if requested
start_times[slot] = mono_clock::now();
create_completion(slot, _aio_cb, (void *)&lc);
if (r < 0) {
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
name[slot] = newName;
}
- //wait for final reads to complete
- while (data.finished < data.started) {
- slot = data.finished % concurrentios;
- completion_wait(slot);
- lock.lock();
- r = completion_ret(slot);
- if (r < 0) {
- cerr << "read got " << r << std::endl;
- lock.unlock();
- goto ERR;
- }
- data.cur_latency = mono_clock::now() - start_times[slot];
- total_latency += data.cur_latency.count();
- if (data.cur_latency.count() > data.max_latency)
- data.max_latency = data.cur_latency.count();
- if (data.cur_latency.count() < data.min_latency)
- data.min_latency = data.cur_latency.count();
- ++data.finished;
- data.avg_latency = total_latency / data.finished;
- --data.in_flight;
- release_completion(slot);
- if (!no_verify) {
- snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
- lock.unlock();
- if ((contents[slot]->length() != data.op_size) ||
- (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
- cerr << name[slot] << " is not correct!" << std::endl;
- ++errors;
- }
- } else {
- lock.unlock();
- }
- }
-
timePassed = mono_clock::now() - data.start_time;
- lock.lock();
+ locker.lock();
data.done = true;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
double bandwidth;
bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-
+
double iops_stddev;
if (data.idata.iops_cycles > 1) {
iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
return (errors > 0 ? -EIO : 0);
ERR:
- lock.lock();
+ locker.lock();
data.done = 1;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
return r;
}
-int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
-{
+int ObjBencher::rand_read_bench(
+ int seconds_to_run, int num_ops, int num_objects,
+ int concurrentios, int pid, bool no_verify) {
+
lock_cond lc(&lock);
if (concurrentios <= 0)
contents[i] = std::make_unique<bufferlist>();
}
- lock.lock();
+ unique_lock locker{lock};
data.finished = 0;
data.start_time = mono_clock::now();
- lock.unlock();
+ locker.unlock();
pthread_t print_thread;
pthread_create(&print_thread, NULL, status_printer, (void *)this);
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
}
//keep on adding new reads as old ones complete
int rand_id;
slot = 0;
- while ((seconds_to_run && mono_clock::now() < finish_time)) {
- lock.lock();
+ while (data.finished < data.started) {
+ locker.lock();
int old_slot = slot;
bool found = false;
while (1) {
if (found) {
break;
}
- lc.cond.Wait(lock);
+ lc.cond.wait(locker);
}
// calculate latency here, so memcmp doesn't inflate it
data.cur_latency = mono_clock::now() - start_times[slot];
- lock.unlock();
+ locker.unlock();
int current_index = index[slot];
cur_contents = contents[slot].get();
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
- lock.unlock();
+ locker.unlock();
goto ERR;
}
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
- lock.unlock();
-
+
if (!no_verify) {
snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", current_index);
- if ((cur_contents->length() != data.op_size) ||
+ if ((cur_contents->length() != data.op_size) ||
(memcmp(data.object_contents, cur_contents->c_str(), data.op_size) != 0)) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
- }
+ }
+
+ locker.unlock();
+ release_completion(slot);
+
+ if (!seconds_to_run || mono_clock::now() >= finish_time)
+ continue;
+
+ //start new read and check data if requested
- rand_id = rand() % num_objects;
+ rand_id = rand() % num_ops;
newName = generate_object_name_fast(rand_id / reads_per_object, pid);
index[slot] = rand_id;
- release_completion(slot);
// invalidate internal crc cache
cur_contents->invalidate_crc();
- //start new read and check data if requested
start_times[slot] = mono_clock::now();
create_completion(slot, _aio_cb, (void *)&lc);
r = aio_read(newName, slot, contents[slot].get(), data.op_size,
if (r < 0) {
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
name[slot] = newName;
}
-
- //wait for final reads to complete
- while (data.finished < data.started) {
- slot = data.finished % concurrentios;
- completion_wait(slot);
- lock.lock();
- r = completion_ret(slot);
- if (r < 0) {
- cerr << "read got " << r << std::endl;
- lock.unlock();
- goto ERR;
- }
- data.cur_latency = mono_clock::now() - start_times[slot];
- total_latency += data.cur_latency.count();
- if (data.cur_latency.count() > data.max_latency)
- data.max_latency = data.cur_latency.count();
- if (data.cur_latency.count() < data.min_latency)
- data.min_latency = data.cur_latency.count();
- ++data.finished;
- data.avg_latency = total_latency / data.finished;
- --data.in_flight;
- release_completion(slot);
- if (!no_verify) {
- snprintf(data.object_contents, data.op_size, "I'm the %16dth op!", index[slot]);
- lock.unlock();
- if ((contents[slot]->length() != data.op_size) ||
- (memcmp(data.object_contents, contents[slot]->c_str(), data.op_size) != 0)) {
- cerr << name[slot] << " is not correct!" << std::endl;
- ++errors;
- }
- } else {
- lock.unlock();
- }
- }
-
timePassed = mono_clock::now() - data.start_time;
- lock.lock();
+ locker.lock();
data.done = true;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
double bandwidth;
bandwidth = ((double)data.finished)*((double)data.op_size)/timePassed.count();
bandwidth = bandwidth/(1024*1024); // we want it in MB/sec
-
+
double iops_stddev;
if (data.idata.iops_cycles > 1) {
iops_stddev = std::sqrt(data.idata.iops_diff_sum / (data.idata.iops_cycles - 1));
return (errors > 0 ? -EIO : 0);
ERR:
- lock.lock();
+ locker.lock();
data.done = 1;
- lock.unlock();
+ locker.unlock();
pthread_join(print_thread, NULL);
return r;
}
int ObjBencher::clean_up(const std::string& orig_prefix, int concurrentios, const std::string& run_name) {
int r = 0;
uint64_t op_size, object_size;
- int num_objects;
+ int num_ops, num_objects;
int prevPid;
// default meta object if user does not specify one
continue;
}
- r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_objects, &prevPid);
+ r = fetch_bench_metadata(run_name_meta, &op_size, &object_size, &num_ops, &num_objects, &prevPid);
if (r < 0) {
return r;
}
int ObjBencher::clean_up(int num_objects, int prevPid, int concurrentios) {
lock_cond lc(&lock);
-
- if (concurrentios <= 0)
+
+ if (concurrentios <= 0)
return -EINVAL;
std::vector<string> name(concurrentios);
int r = 0;
int slot = 0;
- lock.lock();
+ unique_lock locker{lock};
data.done = false;
data.in_flight = 0;
data.started = 0;
data.finished = 0;
- lock.unlock();
+ locker.unlock();
// don't start more completions than files
if (num_objects == 0) {
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
}
//keep on adding new removes as old ones complete
- while (data.started < num_objects) {
- lock.lock();
+ while (data.finished < data.started) {
+ locker.lock();
int old_slot = slot;
bool found = false;
while (1) {
if (found) {
break;
}
- lc.cond.Wait(lock);
+ lc.cond.wait(locker);
}
- lock.unlock();
- newName = generate_object_name_fast(data.started, prevPid);
+ locker.unlock();
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.unlock();
+ locker.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
- lock.unlock();
+ locker.unlock();
release_completion(slot);
+ if (data.started >= num_objects)
+ continue;
+
//start new remove and check data if requested
+ newName = generate_object_name_fast(data.started, prevPid);
create_completion(slot, _aio_cb, (void *)&lc);
r = aio_remove(newName, slot);
if (r < 0) {
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
name[slot] = newName;
}
- //wait for final removes to complete
- while (data.finished < data.started) {
- slot = data.finished % concurrentios;
- completion_wait(slot);
- lock.lock();
- r = completion_ret(slot);
- if (r != 0 && r != -ENOENT) { // file does not exist
- cerr << "remove got " << r << std::endl;
- lock.unlock();
- goto ERR;
- }
- ++data.finished;
- --data.in_flight;
- release_completion(slot);
- lock.unlock();
- }
-
- lock.lock();
+ locker.lock();
data.done = true;
- lock.unlock();
+ locker.unlock();
completions_done();
return 0;
ERR:
- lock.lock();
+ locker.lock();
data.done = 1;
- lock.unlock();
+ locker.unlock();
return r;
}
std::list<Object> objects;
bool objects_remain = true;
- lock.lock();
+ std::unique_lock locker{lock};
data.done = false;
data.in_flight = 0;
data.started = 0;
data.finished = 0;
- lock.unlock();
+ locker.unlock();
out(cout) << "Warning: using slow linear search" << std::endl;
cerr << "r = " << r << std::endl;
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
}
//keep on adding new removes as old ones complete
while (objects_remain) {
- lock.lock();
+ locker.lock();
int old_slot = slot;
bool found = false;
while (1) {
if (found) {
break;
}
- lc.cond.Wait(lock);
+ lc.cond.wait(locker);
}
- lock.unlock();
+ locker.unlock();
// get more objects if necessary
if (objects.empty()) {
objects.pop_front();
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.unlock();
+ locker.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
- lock.unlock();
+ locker.unlock();
release_completion(slot);
//start new remove and check data if requested
if (r < 0) {
goto ERR;
}
- lock.lock();
+ locker.lock();
++data.started;
++data.in_flight;
- lock.unlock();
+ locker.unlock();
name[slot] = newName;
}
while (data.finished < data.started) {
slot = data.finished % concurrentios;
completion_wait(slot);
- lock.lock();
+ locker.lock();
r = completion_ret(slot);
if (r != 0 && r != -ENOENT) { // file does not exist
cerr << "remove got " << r << std::endl;
- lock.unlock();
+ locker.unlock();
goto ERR;
}
++data.finished;
--data.in_flight;
release_completion(slot);
- lock.unlock();
+ locker.unlock();
}
- lock.lock();
+ locker.lock();
data.done = true;
- lock.unlock();
+ locker.unlock();
completions_done();
return 0;
ERR:
- lock.lock();
+ locker.lock();
data.done = 1;
- lock.unlock();
+ locker.unlock();
return -EIO;
}