]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | ||
3 | #include "bencher.h" | |
4 | #include "include/utime.h" | |
5 | #include <unistd.h> | |
6 | #include "include/memory.h" | |
7 | #include "common/Mutex.h" | |
8 | #include "common/Cond.h" | |
9 | ||
10 | template<typename T> | |
11 | struct C_Holder : public Context { | |
12 | T obj; | |
13 | explicit C_Holder( | |
14 | T obj) | |
15 | : obj(obj) {} | |
16 | void finish(int r) override { | |
17 | return; | |
18 | } | |
19 | }; | |
20 | ||
21 | struct OnDelete { | |
22 | Context *c; | |
23 | explicit OnDelete(Context *c) : c(c) {} | |
24 | ~OnDelete() { c->complete(0); } | |
25 | }; | |
26 | ||
27 | struct Cleanup : public Context { | |
28 | Bencher *bench; | |
29 | explicit Cleanup(Bencher *bench) : bench(bench) {} | |
30 | void finish(int r) override { | |
31 | bench->complete_op(); | |
32 | } | |
33 | }; | |
34 | ||
35 | struct OnWriteApplied : public Context { | |
36 | Bencher *bench; | |
37 | uint64_t seq; | |
38 | ceph::shared_ptr<OnDelete> on_delete; | |
39 | OnWriteApplied( | |
40 | Bencher *bench, uint64_t seq, | |
41 | ceph::shared_ptr<OnDelete> on_delete | |
42 | ) : bench(bench), seq(seq), on_delete(on_delete) {} | |
43 | void finish(int r) override { | |
44 | bench->stat_collector->write_applied(seq); | |
45 | } | |
46 | }; | |
47 | ||
48 | struct OnWriteCommit : public Context { | |
49 | Bencher *bench; | |
50 | uint64_t seq; | |
51 | ceph::shared_ptr<OnDelete> on_delete; | |
52 | OnWriteCommit( | |
53 | Bencher *bench, uint64_t seq, | |
54 | ceph::shared_ptr<OnDelete> on_delete | |
55 | ) : bench(bench), seq(seq), on_delete(on_delete) {} | |
56 | void finish(int r) override { | |
57 | bench->stat_collector->write_committed(seq); | |
58 | } | |
59 | }; | |
60 | ||
61 | struct OnReadComplete : public Context { | |
62 | Bencher *bench; | |
63 | uint64_t seq; | |
64 | boost::scoped_ptr<bufferlist> bl; | |
65 | OnReadComplete(Bencher *bench, uint64_t seq, bufferlist *bl) : | |
66 | bench(bench), seq(seq), bl(bl) {} | |
67 | void finish(int r) override { | |
68 | bench->stat_collector->read_complete(seq); | |
69 | bench->complete_op(); | |
70 | } | |
71 | }; | |
72 | ||
73 | void Bencher::start_op() { | |
74 | Mutex::Locker l(lock); | |
75 | while (open_ops >= max_in_flight) | |
76 | open_ops_cond.Wait(lock); | |
77 | ++open_ops; | |
78 | } | |
79 | ||
80 | void Bencher::drain_ops() { | |
81 | Mutex::Locker l(lock); | |
82 | while (open_ops) | |
83 | open_ops_cond.Wait(lock); | |
84 | } | |
85 | ||
86 | void Bencher::complete_op() { | |
87 | Mutex::Locker l(lock); | |
88 | assert(open_ops > 0); | |
89 | --open_ops; | |
90 | open_ops_cond.Signal(); | |
91 | } | |
92 | ||
93 | struct OnFinish { | |
94 | bool *done; | |
95 | Mutex *lock; | |
96 | Cond *cond; | |
97 | OnFinish( | |
98 | bool *done, | |
99 | Mutex *lock, | |
100 | Cond *cond) : | |
101 | done(done), lock(lock), cond(cond) {} | |
102 | ~OnFinish() { | |
103 | Mutex::Locker l(*lock); | |
104 | *done = true; | |
105 | cond->Signal(); | |
106 | } | |
107 | }; | |
108 | ||
109 | void Bencher::init( | |
110 | const set<std::string> &objects, | |
111 | uint64_t size, | |
112 | std::ostream *out | |
113 | ) | |
114 | { | |
115 | bufferlist bl; | |
116 | for (uint64_t i = 0; i < size; ++i) { | |
117 | bl.append(0); | |
118 | } | |
119 | Mutex lock("init_lock"); | |
120 | Cond cond; | |
121 | bool done = 0; | |
122 | { | |
123 | ceph::shared_ptr<OnFinish> on_finish( | |
124 | new OnFinish(&done, &lock, &cond)); | |
125 | uint64_t num = 0; | |
126 | for (set<std::string>::const_iterator i = objects.begin(); | |
127 | i != objects.end(); | |
128 | ++i, ++num) { | |
129 | if (!(num % 20)) | |
130 | *out << "Creating " << num << "/" << objects.size() << std::endl; | |
131 | backend->write( | |
132 | *i, | |
133 | 0, | |
134 | bl, | |
135 | new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish), | |
136 | new C_Holder<ceph::shared_ptr<OnFinish> >(on_finish) | |
137 | ); | |
138 | } | |
139 | } | |
140 | { | |
141 | Mutex::Locker l(lock); | |
142 | while (!done) | |
143 | cond.Wait(lock); | |
144 | } | |
145 | } | |
146 | ||
147 | void Bencher::run_bench() | |
148 | { | |
149 | time_t end = time(0) + max_duration; | |
150 | uint64_t ops = 0; | |
151 | ||
152 | bufferlist bl; | |
153 | ||
154 | while ((!max_duration || time(0) < end) && (!max_ops || ops < max_ops)) { | |
155 | start_op(); | |
156 | uint64_t seq = stat_collector->next_seq(); | |
157 | boost::tuple<std::string, uint64_t, uint64_t, OpType> next = | |
158 | (*op_dist)(); | |
159 | string obj_name = next.get<0>(); | |
160 | uint64_t offset = next.get<1>(); | |
161 | uint64_t length = next.get<2>(); | |
162 | OpType op_type = next.get<3>(); | |
163 | switch (op_type) { | |
164 | case WRITE: { | |
165 | ceph::shared_ptr<OnDelete> on_delete( | |
166 | new OnDelete(new Cleanup(this))); | |
167 | stat_collector->start_write(seq, length); | |
168 | while (bl.length() < length) { | |
169 | bl.append(rand()); | |
170 | } | |
171 | backend->write( | |
172 | obj_name, | |
173 | offset, | |
174 | bl, | |
175 | new OnWriteApplied( | |
176 | this, seq, on_delete), | |
177 | new OnWriteCommit( | |
178 | this, seq, on_delete) | |
179 | ); | |
180 | break; | |
181 | } | |
182 | case READ: { | |
183 | stat_collector->start_read(seq, length); | |
184 | bufferlist *read_bl = new bufferlist; | |
185 | backend->read( | |
186 | obj_name, | |
187 | offset, | |
188 | length, | |
189 | read_bl, | |
190 | new OnReadComplete( | |
191 | this, seq, read_bl) | |
192 | ); | |
193 | break; | |
194 | } | |
195 | default: { | |
196 | ceph_abort(); | |
197 | } | |
198 | } | |
199 | ops++; | |
200 | } | |
201 | drain_ops(); | |
202 | } |