]> git.proxmox.com Git - ceph.git/blame - ceph/src/test/bench/bencher.cc
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / test / bench / bencher.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3#include "bencher.h"
4#include "include/utime.h"
5#include <unistd.h>
6#include "include/memory.h"
7#include "common/Mutex.h"
8#include "common/Cond.h"
9
10template<typename T>
11struct C_Holder : public Context {
12 T obj;
13 explicit C_Holder(
14 T obj)
15 : obj(obj) {}
16 void finish(int r) override {
17 return;
18 }
19};
20
21struct OnDelete {
22 Context *c;
23 explicit OnDelete(Context *c) : c(c) {}
24 ~OnDelete() { c->complete(0); }
25};
26
27struct Cleanup : public Context {
28 Bencher *bench;
29 explicit Cleanup(Bencher *bench) : bench(bench) {}
30 void finish(int r) override {
31 bench->complete_op();
32 }
33};
34
35struct OnWriteApplied : public Context {
36 Bencher *bench;
37 uint64_t seq;
38 ceph::shared_ptr<OnDelete> on_delete;
39 OnWriteApplied(
40 Bencher *bench, uint64_t seq,
41 ceph::shared_ptr<OnDelete> on_delete
42 ) : bench(bench), seq(seq), on_delete(on_delete) {}
43 void finish(int r) override {
44 bench->stat_collector->write_applied(seq);
45 }
46};
47
48struct OnWriteCommit : public Context {
49 Bencher *bench;
50 uint64_t seq;
51 ceph::shared_ptr<OnDelete> on_delete;
52 OnWriteCommit(
53 Bencher *bench, uint64_t seq,
54 ceph::shared_ptr<OnDelete> on_delete
55 ) : bench(bench), seq(seq), on_delete(on_delete) {}
56 void finish(int r) override {
57 bench->stat_collector->write_committed(seq);
58 }
59};
60
61struct OnReadComplete : public Context {
62 Bencher *bench;
63 uint64_t seq;
64 boost::scoped_ptr<bufferlist> bl;
65 OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) :
66 bench(bench), seq(seq), bl(bl) {}
67 void finish(int r) override {
68 bench->stat_collector->read_complete(seq);
69 bench->complete_op();
70 }
71};
72
73void Bencher::start_op() {
74 Mutex::Locker l(lock);
75 while (open_ops >= max_in_flight)
76 open_ops_cond.Wait(lock);
77 ++open_ops;
78}
79
80void Bencher::drain_ops() {
81 Mutex::Locker l(lock);
82 while (open_ops)
83 open_ops_cond.Wait(lock);
84}
85
86void Bencher::complete_op() {
87 Mutex::Locker l(lock);
88 assert(open_ops > 0);
89 --open_ops;
90 open_ops_cond.Signal();
91}
92
93struct OnFinish {
94 bool *done;
95 Mutex *lock;
96 Cond *cond;
97 OnFinish(
98 bool *done,
99 Mutex *lock,
100 Cond *cond) :
101 done(done), lock(lock), cond(cond) {}
102 ~OnFinish() {
103 Mutex::Locker l(*lock);
104 *done = true;
105 cond->Signal();
106 }
107};
108
109void Bencher::init(
110 const set<std::string> &objects,
111 uint64_t size,
112 std::ostream *out
113 )
114{
115 bufferlist bl;
116 for (uint64_t i = 0; i < size; ++i) {
117 bl.append(0);
118 }
119 Mutex lock("init_lock");
120 Cond cond;
121 bool done = 0;
122 {
123 ceph::shared_ptr<OnFinish> on_finish(
124 new OnFinish(&done, &lock, &cond));
125 uint64_t num = 0;
126 for (set<std::string>::const_iterator i = objects.begin();
127 i != objects.end();
128 ++i, ++num) {
129 if (!(num % 20))
130 *out << "Creating " << num << "/" << objects.size() << std::endl;
131 backend->write(
132 *i,
133 0,
134 bl,
135 new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish),
136 new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish)
137 );
138 }
139 }
140 {
141 Mutex::Locker l(lock);
142 while (!done)
143 cond.Wait(lock);
144 }
145}
146
147void Bencher::run_bench()
148{
149 time_t end = time(0) + max_duration;
150 uint64_t ops = 0;
151
152 bufferlist bl;
153
154 while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) {
155 start_op();
156 uint64_t seq = stat_collector->next_seq();
157 boost::tuple<std::string, uint64_t, uint64_t, OpType> next =
158 (*op_dist)();
159 string obj_name = next.get<0>();
160 uint64_t offset = next.get<1>();
161 uint64_t length = next.get<2>();
162 OpType op_type = next.get<3>();
163 switch (op_type) {
164 case WRITE: {
165 ceph::shared_ptr<OnDelete> on_delete(
166 new OnDelete(new Cleanup(this)));
167 stat_collector->start_write(seq, length);
168 while (bl.length() < length) {
169 bl.append(rand());
170 }
171 backend->write(
172 obj_name,
173 offset,
174 bl,
175 new OnWriteApplied(
176 this, seq, on_delete),
177 new OnWriteCommit(
178 this, seq, on_delete)
179 );
180 break;
181 }
182 case READ: {
183 stat_collector->start_read(seq, length);
184 bufferlist *read_bl = new bufferlist;
185 backend->read(
186 obj_name,
187 offset,
188 length,
189 read_bl,
190 new OnReadComplete(
191 this, seq, read_bl)
192 );
193 break;
194 }
195 default: {
196 ceph_abort();
197 }
198 }
199 ops++;
200 }
201 drain_ops();
202}