git.proxmox.com Git - ceph.git/blob - ceph/src/test/osdc/object_cacher_stress.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / test / osdc / object_cacher_stress.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include <cstdlib>
5 #include <ctime>
6 #include <sstream>
7 #include <string>
8 #include <vector>
9 #include <boost/scoped_ptr.hpp>
10
11 #include "common/ceph_argparse.h"
12 #include "common/common_init.h"
13 #include "common/config.h"
14 #include "common/Mutex.h"
15 #include "common/snap_types.h"
16 #include "global/global_init.h"
17 #include "include/atomic.h"
18 #include "include/buffer.h"
19 #include "include/Context.h"
20 #include "include/stringify.h"
21 #include "osdc/ObjectCacher.h"
22
23 #include "FakeWriteback.h"
24 #include "MemWriteback.h"
25
26 // XXX: Only tests default namespace
27 struct op_data {
28 op_data(std::string oid, uint64_t offset, uint64_t len, bool read)
29 : extent(oid, 0, offset, len, 0), is_read(read)
30 {
31 extent.oloc.pool = 0;
32 extent.buffer_extents.push_back(make_pair(0, len));
33 }
34
35 ObjectExtent extent;
36 bool is_read;
37 ceph::bufferlist result;
38 atomic_t done;
39 };
40
41 class C_Count : public Context {
42 op_data *m_op;
43 atomic_t *m_outstanding;
44 public:
45 C_Count(op_data *op, atomic_t *outstanding)
46 : m_op(op), m_outstanding(outstanding) {}
47 void finish(int r) override {
48 m_op->done.inc();
49 assert(m_outstanding->read() > 0);
50 m_outstanding->dec();
51 }
52 };
53
54 int stress_test(uint64_t num_ops, uint64_t num_objs,
55 uint64_t max_obj_size, uint64_t delay_ns,
56 uint64_t max_op_len, float percent_reads)
57 {
58 Mutex lock("object_cacher_stress::object_cacher");
59 FakeWriteback writeback(g_ceph_context, &lock, delay_ns);
60
61 ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
62 g_conf->client_oc_size,
63 g_conf->client_oc_max_objects,
64 g_conf->client_oc_max_dirty,
65 g_conf->client_oc_target_dirty,
66 g_conf->client_oc_max_dirty_age,
67 true);
68 obc.start();
69
70 atomic_t outstanding_reads;
71 vector<ceph::shared_ptr<op_data> > ops;
72 ObjectCacher::ObjectSet object_set(NULL, 0, 0);
73 SnapContext snapc;
74 ceph::buffer::ptr bp(max_op_len);
75 ceph::bufferlist bl;
76 uint64_t journal_tid = 0;
77 bp.zero();
78 bl.append(bp);
79
80 // schedule ops
81 std::cout << "Test configuration:\n\n"
82 << setw(10) << "ops: " << num_ops << "\n"
83 << setw(10) << "objects: " << num_objs << "\n"
84 << setw(10) << "obj size: " << max_obj_size << "\n"
85 << setw(10) << "delay: " << delay_ns << "\n"
86 << setw(10) << "max op len: " << max_op_len << "\n"
87 << setw(10) << "percent reads: " << percent_reads << "\n\n";
88
89 for (uint64_t i = 0; i < num_ops; ++i) {
90 uint64_t offset = random() % max_obj_size;
91 uint64_t max_len = MIN(max_obj_size - offset, max_op_len);
92 // no zero-length operations
93 uint64_t length = random() % (MAX(max_len - 1, 1)) + 1;
94 std::string oid = "test" + stringify(random() % num_objs);
95 bool is_read = random() < percent_reads * RAND_MAX;
96 ceph::shared_ptr<op_data> op(new op_data(oid, offset, length, is_read));
97 ops.push_back(op);
98 std::cout << "op " << i << " " << (is_read ? "read" : "write")
99 << " " << op->extent << "\n";
100 if (op->is_read) {
101 ObjectCacher::OSDRead *rd = obc.prepare_read(CEPH_NOSNAP, &op->result, 0);
102 rd->extents.push_back(op->extent);
103 outstanding_reads.inc();
104 Context *completion = new C_Count(op.get(), &outstanding_reads);
105 lock.Lock();
106 int r = obc.readx(rd, &object_set, completion);
107 lock.Unlock();
108 assert(r >= 0);
109 if ((uint64_t)r == length)
110 completion->complete(r);
111 else
112 assert(r == 0);
113 } else {
114 ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl,
115 ceph::real_time::min(), 0,
116 ++journal_tid);
117 wr->extents.push_back(op->extent);
118 lock.Lock();
119 obc.writex(wr, &object_set, NULL);
120 lock.Unlock();
121 }
122 }
123
124 // check that all reads completed
125 for (uint64_t i = 0; i < num_ops; ++i) {
126 if (!ops[i]->is_read)
127 continue;
128 std::cout << "waiting for read " << i << ops[i]->extent << std::endl;
129 uint64_t done = 0;
130 while (done == 0) {
131 done = ops[i]->done.read();
132 if (!done) {
133 usleep(500);
134 }
135 }
136 if (done > 1) {
137 std::cout << "completion called more than once!\n" << std::endl;
138 return EXIT_FAILURE;
139 }
140 }
141
142 lock.Lock();
143 obc.release_set(&object_set);
144 lock.Unlock();
145
146 int r = 0;
147 Mutex mylock("librbd::ImageCtx::flush_cache");
148 Cond cond;
149 bool done;
150 Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
151 lock.Lock();
152 bool already_flushed = obc.flush_set(&object_set, onfinish);
153 std::cout << "already flushed = " << already_flushed << std::endl;
154 lock.Unlock();
155 mylock.Lock();
156 while (!done) {
157 cond.Wait(mylock);
158 }
159 mylock.Unlock();
160
161 lock.Lock();
162 bool unclean = obc.release_set(&object_set);
163 lock.Unlock();
164
165 if (unclean) {
166 std::cout << "unclean buffers left over!" << std::endl;
167 return EXIT_FAILURE;
168 }
169
170 obc.stop();
171
172 std::cout << "Test completed successfully." << std::endl;
173
174 return EXIT_SUCCESS;
175 }
176
177 int correctness_test(uint64_t delay_ns)
178 {
179 std::cerr << "starting correctness test" << std::endl;
180 Mutex lock("object_cacher_stress::object_cacher");
181 MemWriteback writeback(g_ceph_context, &lock, delay_ns);
182
183 ObjectCacher obc(g_ceph_context, "test", writeback, lock, NULL, NULL,
184 1<<21, // max cache size, 2MB
185 1, // max objects, just one
186 1<<18, // max dirty, 256KB
187 1<<17, // target dirty, 128KB
188 g_conf->client_oc_max_dirty_age,
189 true);
190 obc.start();
191 std::cerr << "just start()ed ObjectCacher" << std::endl;
192
193 SnapContext snapc;
194 ceph_tid_t journal_tid = 0;
195 std::string oid("correctness_test_obj");
196 ObjectCacher::ObjectSet object_set(NULL, 0, 0);
197 ceph::bufferlist zeroes_bl;
198 zeroes_bl.append_zero(1<<20);
199
200 // set up a 4MB all-zero object
201 std::cerr << "writing 4x1MB object" << std::endl;
202 std::map<int, C_SaferCond> create_finishers;
203 for (int i = 0; i < 4; ++i) {
204 ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, zeroes_bl,
205 ceph::real_time::min(), 0,
206 ++journal_tid);
207 ObjectExtent extent(oid, 0, zeroes_bl.length()*i, zeroes_bl.length(), 0);
208 extent.oloc.pool = 0;
209 extent.buffer_extents.push_back(make_pair(0, 1<<20));
210 wr->extents.push_back(extent);
211 lock.Lock();
212 obc.writex(wr, &object_set, &create_finishers[i]);
213 lock.Unlock();
214 }
215
216 // write some 1-valued bits at 256-KB intervals for checking consistency
217 std::cerr << "Writing some 0xff values" << std::endl;
218 ceph::buffer::ptr ones(1<<16);
219 memset(ones.c_str(), 0xff, ones.length());
220 ceph::bufferlist ones_bl;
221 ones_bl.append(ones);
222 for (int i = 1<<18; i < 1<<22; i+=1<<18) {
223 ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, ones_bl,
224 ceph::real_time::min(), 0,
225 ++journal_tid);
226 ObjectExtent extent(oid, 0, i, ones_bl.length(), 0);
227 extent.oloc.pool = 0;
228 extent.buffer_extents.push_back(make_pair(0, 1<<16));
229 wr->extents.push_back(extent);
230 lock.Lock();
231 obc.writex(wr, &object_set, &create_finishers[i]);
232 lock.Unlock();
233 }
234
235 for (auto i = create_finishers.begin(); i != create_finishers.end(); ++i) {
236 i->second.wait();
237 }
238 std::cout << "Finished setting up object" << std::endl;
239 lock.Lock();
240 C_SaferCond flushcond;
241 bool done = obc.flush_all(&flushcond);
242 if (!done) {
243 std::cout << "Waiting for flush" << std::endl;
244 lock.Unlock();
245 flushcond.wait();
246 lock.Lock();
247 }
248 lock.Unlock();
249
250 /* now read the back half of the object in, check consistency,
251 */
252 std::cout << "Reading back half of object (1<<21~1<<21)" << std::endl;
253 bufferlist readbl;
254 C_SaferCond backreadcond;
255 ObjectCacher::OSDRead *back_half_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
256 ObjectExtent back_half_extent(oid, 0, 1<<21, 1<<21, 0);
257 back_half_extent.oloc.pool = 0;
258 back_half_extent.buffer_extents.push_back(make_pair(0, 1<<21));
259 back_half_rd->extents.push_back(back_half_extent);
260 lock.Lock();
261 int r = obc.readx(back_half_rd, &object_set, &backreadcond);
262 lock.Unlock();
263 assert(r >= 0);
264 if (r == 0) {
265 std::cout << "Waiting to read data into cache" << std::endl;
266 r = backreadcond.wait();
267 }
268
269 assert(r == 1<<21);
270
271 /* Read the whole object in,
272 * verify we have to wait for it to complete,
273 * overwrite a small piece, (http://tracker.ceph.com/issues/16002),
274 * and check consistency */
275
276 readbl.clear();
277 std::cout<< "Reading whole object (0~1<<22)" << std::endl;
278 C_SaferCond frontreadcond;
279 ObjectCacher::OSDRead *whole_rd = obc.prepare_read(CEPH_NOSNAP, &readbl, 0);
280 ObjectExtent whole_extent(oid, 0, 0, 1<<22, 0);
281 whole_extent.oloc.pool = 0;
282 whole_extent.buffer_extents.push_back(make_pair(0, 1<<22));
283 whole_rd->extents.push_back(whole_extent);
284 lock.Lock();
285 r = obc.readx(whole_rd, &object_set, &frontreadcond);
286 // we cleared out the cache by reading back half, it shouldn't pass immediately!
287 assert(r == 0);
288 std::cout << "Data (correctly) not available without fetching" << std::endl;
289
290 ObjectCacher::OSDWrite *verify_wr = obc.prepare_write(snapc, ones_bl,
291 ceph::real_time::min(), 0,
292 ++journal_tid);
293 ObjectExtent verify_extent(oid, 0, (1<<18)+(1<<16), ones_bl.length(), 0);
294 verify_extent.oloc.pool = 0;
295 verify_extent.buffer_extents.push_back(make_pair(0, 1<<16));
296 verify_wr->extents.push_back(verify_extent);
297 C_SaferCond verify_finisher;
298 obc.writex(verify_wr, &object_set, &verify_finisher);
299 lock.Unlock();
300 std::cout << "wrote dirtying data" << std::endl;
301
302 std::cout << "Waiting to read data into cache" << std::endl;
303 frontreadcond.wait();
304 verify_finisher.wait();
305
306 std::cout << "Validating data" << std::endl;
307
308 for (int i = 1<<18; i < 1<<22; i+=1<<18) {
309 bufferlist ones_maybe;
310 ones_maybe.substr_of(readbl, i, ones_bl.length());
311 assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
312 }
313 bufferlist ones_maybe;
314 ones_maybe.substr_of(readbl, (1<<18)+(1<<16), ones_bl.length());
315 assert(0 == memcmp(ones_maybe.c_str(), ones_bl.c_str(), ones_bl.length()));
316
317 std::cout << "validated that data is 0xff where it should be" << std::endl;
318
319 lock.Lock();
320 C_SaferCond flushcond2;
321 done = obc.flush_all(&flushcond2);
322 if (!done) {
323 std::cout << "Waiting for final write flush" << std::endl;
324 lock.Unlock();
325 flushcond2.wait();
326 lock.Lock();
327 }
328
329 bool unclean = obc.release_set(&object_set);
330 if (unclean) {
331 std::cout << "unclean buffers left over!" << std::endl;
332 vector<ObjectExtent> discard_extents;
333 int i = 0;
334 for (auto oi = object_set.objects.begin(); !oi.end(); ++oi) {
335 discard_extents.emplace_back(oid, i++, 0, 1<<22, 0);
336 }
337 obc.discard_set(&object_set, discard_extents);
338 lock.Unlock();
339 obc.stop();
340 goto fail;
341 }
342 lock.Unlock();
343
344 obc.stop();
345
346 std::cout << "Testing ObjectCacher correctness complete" << std::endl;
347 return EXIT_SUCCESS;
348
349 fail:
350 return EXIT_FAILURE;
351 }
352
353 int main(int argc, const char **argv)
354 {
355 std::vector<const char*> args;
356 argv_to_vec(argc, argv, args);
357 env_to_vec(args);
358 auto cct = global_init(NULL, args, CEPH_ENTITY_TYPE_CLIENT,
359 CODE_ENVIRONMENT_UTILITY, 0);
360
361 long long delay_ns = 0;
362 long long num_ops = 1000;
363 long long obj_bytes = 4 << 20;
364 long long max_len = 128 << 10;
365 long long num_objs = 10;
366 float percent_reads = 0.90;
367 int seed = time(0) % 100000;
368 bool stress = false;
369 bool correctness = false;
370 std::ostringstream err;
371 std::vector<const char*>::iterator i;
372 for (i = args.begin(); i != args.end();) {
373 if (ceph_argparse_witharg(args, i, &delay_ns, err, "--delay-ns", (char*)NULL)) {
374 if (!err.str().empty()) {
375 cerr << argv[0] << ": " << err.str() << std::endl;
376 return EXIT_FAILURE;
377 }
378 } else if (ceph_argparse_witharg(args, i, &num_ops, err, "--ops", (char*)NULL)) {
379 if (!err.str().empty()) {
380 cerr << argv[0] << ": " << err.str() << std::endl;
381 return EXIT_FAILURE;
382 }
383 } else if (ceph_argparse_witharg(args, i, &num_objs, err, "--objects", (char*)NULL)) {
384 if (!err.str().empty()) {
385 cerr << argv[0] << ": " << err.str() << std::endl;
386 return EXIT_FAILURE;
387 }
388 } else if (ceph_argparse_witharg(args, i, &obj_bytes, err, "--obj-size", (char*)NULL)) {
389 if (!err.str().empty()) {
390 cerr << argv[0] << ": " << err.str() << std::endl;
391 return EXIT_FAILURE;
392 }
393 } else if (ceph_argparse_witharg(args, i, &max_len, err, "--max-op-size", (char*)NULL)) {
394 if (!err.str().empty()) {
395 cerr << argv[0] << ": " << err.str() << std::endl;
396 return EXIT_FAILURE;
397 }
398 } else if (ceph_argparse_witharg(args, i, &percent_reads, err, "--percent-read", (char*)NULL)) {
399 if (!err.str().empty()) {
400 cerr << argv[0] << ": " << err.str() << std::endl;
401 return EXIT_FAILURE;
402 }
403 } else if (ceph_argparse_witharg(args, i, &seed, err, "--seed", (char*)NULL)) {
404 if (!err.str().empty()) {
405 cerr << argv[0] << ": " << err.str() << std::endl;
406 return EXIT_FAILURE;
407 }
408 } else if (ceph_argparse_flag(args, i, "--stress-test", NULL)) {
409 stress = true;
410 } else if (ceph_argparse_flag(args, i, "--correctness-test", NULL)) {
411 correctness = true;
412 } else {
413 cerr << "unknown option " << *i << std::endl;
414 return EXIT_FAILURE;
415 }
416 }
417
418 if (stress) {
419 srandom(seed);
420 return stress_test(num_ops, num_objs, obj_bytes, delay_ns, max_len, percent_reads);
421 }
422 if (correctness) {
423 return correctness_test(delay_ns);
424 }
425 }