1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
9 #include <boost/scoped_ptr.hpp>
11 #include "common/ceph_argparse.h"
12 #include "common/ceph_mutex.h"
13 #include "common/common_init.h"
14 #include "common/config.h"
15 #include "common/snap_types.h"
16 #include "global/global_init.h"
17 #include "include/buffer.h"
18 #include "include/Context.h"
19 #include "include/stringify.h"
20 #include "osdc/ObjectCacher.h"
22 #include "FakeWriteback.h"
23 #include "MemWriteback.h"
29 // XXX: Only tests default namespace
31 op_data(const std::string
&oid
, uint64_t offset
, uint64_t len
, bool read
)
32 : extent(oid
, 0, offset
, len
, 0), is_read(read
)
35 extent
.buffer_extents
.push_back(make_pair(0, len
));
40 ceph::bufferlist result
;
41 std::atomic
<unsigned> done
= { 0 };
44 class C_Count
: public Context
{
46 std::atomic
<unsigned> *m_outstanding
= nullptr;
48 C_Count(op_data
*op
, std::atomic
<unsigned> *outstanding
)
49 : m_op(op
), m_outstanding(outstanding
) {}
50 void finish(int r
) override
{
52 ceph_assert(*m_outstanding
> 0);
57 int stress_test(uint64_t num_ops
, uint64_t num_objs
,
58 uint64_t max_obj_size
, uint64_t delay_ns
,
59 uint64_t max_op_len
, float percent_reads
)
61 ceph::mutex lock
= ceph::make_mutex("object_cacher_stress::object_cacher");
62 FakeWriteback
writeback(g_ceph_context
, &lock
, delay_ns
);
64 ObjectCacher
obc(g_ceph_context
, "test", writeback
, lock
, NULL
, NULL
,
65 g_conf()->client_oc_size
,
66 g_conf()->client_oc_max_objects
,
67 g_conf()->client_oc_max_dirty
,
68 g_conf()->client_oc_target_dirty
,
69 g_conf()->client_oc_max_dirty_age
,
73 std::atomic
<unsigned> outstanding_reads
= { 0 };
74 vector
<std::shared_ptr
<op_data
> > ops
;
75 ObjectCacher::ObjectSet
object_set(NULL
, 0, 0);
77 ceph::buffer::ptr
bp(max_op_len
);
79 uint64_t journal_tid
= 0;
84 std::cout
<< "Test configuration:\n\n"
85 << setw(10) << "ops: " << num_ops
<< "\n"
86 << setw(10) << "objects: " << num_objs
<< "\n"
87 << setw(10) << "obj size: " << max_obj_size
<< "\n"
88 << setw(10) << "delay: " << delay_ns
<< "\n"
89 << setw(10) << "max op len: " << max_op_len
<< "\n"
90 << setw(10) << "percent reads: " << percent_reads
<< "\n\n";
92 for (uint64_t i
= 0; i
< num_ops
; ++i
) {
93 uint64_t offset
= random() % max_obj_size
;
94 uint64_t max_len
= std::min(max_obj_size
- offset
, max_op_len
);
95 // no zero-length operations
96 uint64_t length
= random() % (std::max
<uint64_t>(max_len
- 1, 1)) + 1;
97 std::string oid
= "test" + stringify(random() % num_objs
);
98 bool is_read
= random() < percent_reads
* float(RAND_MAX
);
99 std::shared_ptr
<op_data
> op(new op_data(oid
, offset
, length
, is_read
));
101 std::cout
<< "op " << i
<< " " << (is_read
? "read" : "write")
102 << " " << op
->extent
<< "\n";
104 ObjectCacher::OSDRead
*rd
= obc
.prepare_read(CEPH_NOSNAP
, &op
->result
, 0);
105 rd
->extents
.push_back(op
->extent
);
107 Context
*completion
= new C_Count(op
.get(), &outstanding_reads
);
109 int r
= obc
.readx(rd
, &object_set
, completion
);
112 if ((uint64_t)r
== length
)
113 completion
->complete(r
);
117 ObjectCacher::OSDWrite
*wr
= obc
.prepare_write(snapc
, bl
,
118 ceph::real_clock::zero(), 0,
120 wr
->extents
.push_back(op
->extent
);
122 obc
.writex(wr
, &object_set
, NULL
);
127 // check that all reads completed
128 for (uint64_t i
= 0; i
< num_ops
; ++i
) {
129 if (!ops
[i
]->is_read
)
131 std::cout
<< "waiting for read " << i
<< ops
[i
]->extent
<< std::endl
;
140 std::cout
<< "completion called more than once!\n" << std::endl
;
146 obc
.release_set(&object_set
);
150 ceph::mutex mylock
= ceph::make_mutex("librbd::ImageCtx::flush_cache");
151 ceph::condition_variable cond
;
153 Context
*onfinish
= new C_SafeCond(mylock
, cond
, &done
, &r
);
155 bool already_flushed
= obc
.flush_set(&object_set
, onfinish
);
156 std::cout
<< "already flushed = " << already_flushed
<< std::endl
;
159 std::unique_lock locker
{mylock
};
160 cond
.wait(locker
, [&done
] { return done
; });
163 bool unclean
= obc
.release_set(&object_set
);
167 std::cout
<< "unclean buffers left over!" << std::endl
;
173 std::cout
<< "Test completed successfully." << std::endl
;
178 int correctness_test(uint64_t delay_ns
)
180 std::cerr
<< "starting correctness test" << std::endl
;
181 ceph::mutex lock
= ceph::make_mutex("object_cacher_stress::object_cacher");
182 MemWriteback
writeback(g_ceph_context
, &lock
, delay_ns
);
184 ObjectCacher
obc(g_ceph_context
, "test", writeback
, lock
, NULL
, NULL
,
185 1<<21, // max cache size, 2MB
186 1, // max objects, just one
187 1<<18, // max dirty, 256KB
188 1<<17, // target dirty, 128KB
189 g_conf()->client_oc_max_dirty_age
,
192 std::cerr
<< "just start()ed ObjectCacher" << std::endl
;
195 ceph_tid_t journal_tid
= 0;
196 std::string
oid("correctness_test_obj");
197 ObjectCacher::ObjectSet
object_set(NULL
, 0, 0);
198 ceph::bufferlist zeroes_bl
;
199 zeroes_bl
.append_zero(1<<20);
201 // set up a 4MB all-zero object
202 std::cerr
<< "writing 4x1MB object" << std::endl
;
203 std::map
<int, C_SaferCond
> create_finishers
;
204 for (int i
= 0; i
< 4; ++i
) {
205 ObjectCacher::OSDWrite
*wr
= obc
.prepare_write(snapc
, zeroes_bl
,
206 ceph::real_clock::zero(), 0,
208 ObjectExtent
extent(oid
, 0, zeroes_bl
.length()*i
, zeroes_bl
.length(), 0);
209 extent
.oloc
.pool
= 0;
210 extent
.buffer_extents
.push_back(make_pair(0, 1<<20));
211 wr
->extents
.push_back(extent
);
213 obc
.writex(wr
, &object_set
, &create_finishers
[i
]);
217 // write some 1-valued bits at 256-KB intervals for checking consistency
218 std::cerr
<< "Writing some 0xff values" << std::endl
;
219 ceph::buffer::ptr
ones(1<<16);
220 memset(ones
.c_str(), 0xff, ones
.length());
221 ceph::bufferlist ones_bl
;
222 ones_bl
.append(ones
);
223 for (int i
= 1<<18; i
< 1<<22; i
+=1<<18) {
224 ObjectCacher::OSDWrite
*wr
= obc
.prepare_write(snapc
, ones_bl
,
225 ceph::real_clock::zero(), 0,
227 ObjectExtent
extent(oid
, 0, i
, ones_bl
.length(), 0);
228 extent
.oloc
.pool
= 0;
229 extent
.buffer_extents
.push_back(make_pair(0, 1<<16));
230 wr
->extents
.push_back(extent
);
232 obc
.writex(wr
, &object_set
, &create_finishers
[i
]);
236 for (auto i
= create_finishers
.begin(); i
!= create_finishers
.end(); ++i
) {
239 std::cout
<< "Finished setting up object" << std::endl
;
241 C_SaferCond flushcond
;
242 bool done
= obc
.flush_all(&flushcond
);
244 std::cout
<< "Waiting for flush" << std::endl
;
251 /* now read the back half of the object in, check consistency,
253 std::cout
<< "Reading back half of object (1<<21~1<<21)" << std::endl
;
255 C_SaferCond backreadcond
;
256 ObjectCacher::OSDRead
*back_half_rd
= obc
.prepare_read(CEPH_NOSNAP
, &readbl
, 0);
257 ObjectExtent
back_half_extent(oid
, 0, 1<<21, 1<<21, 0);
258 back_half_extent
.oloc
.pool
= 0;
259 back_half_extent
.buffer_extents
.push_back(make_pair(0, 1<<21));
260 back_half_rd
->extents
.push_back(back_half_extent
);
262 int r
= obc
.readx(back_half_rd
, &object_set
, &backreadcond
);
266 std::cout
<< "Waiting to read data into cache" << std::endl
;
267 r
= backreadcond
.wait();
270 ceph_assert(r
== 1<<21);
272 /* Read the whole object in,
273 * verify we have to wait for it to complete,
274 * overwrite a small piece, (http://tracker.ceph.com/issues/16002),
275 * and check consistency */
278 std::cout
<< "Reading whole object (0~1<<22)" << std::endl
;
279 C_SaferCond frontreadcond
;
280 ObjectCacher::OSDRead
*whole_rd
= obc
.prepare_read(CEPH_NOSNAP
, &readbl
, 0);
281 ObjectExtent
whole_extent(oid
, 0, 0, 1<<22, 0);
282 whole_extent
.oloc
.pool
= 0;
283 whole_extent
.buffer_extents
.push_back(make_pair(0, 1<<22));
284 whole_rd
->extents
.push_back(whole_extent
);
286 r
= obc
.readx(whole_rd
, &object_set
, &frontreadcond
);
287 // we cleared out the cache by reading back half, it shouldn't pass immediately!
289 std::cout
<< "Data (correctly) not available without fetching" << std::endl
;
291 ObjectCacher::OSDWrite
*verify_wr
= obc
.prepare_write(snapc
, ones_bl
,
292 ceph::real_clock::zero(), 0,
294 ObjectExtent
verify_extent(oid
, 0, (1<<18)+(1<<16), ones_bl
.length(), 0);
295 verify_extent
.oloc
.pool
= 0;
296 verify_extent
.buffer_extents
.push_back(make_pair(0, 1<<16));
297 verify_wr
->extents
.push_back(verify_extent
);
298 C_SaferCond verify_finisher
;
299 obc
.writex(verify_wr
, &object_set
, &verify_finisher
);
301 std::cout
<< "wrote dirtying data" << std::endl
;
303 std::cout
<< "Waiting to read data into cache" << std::endl
;
304 frontreadcond
.wait();
305 verify_finisher
.wait();
307 std::cout
<< "Validating data" << std::endl
;
309 for (int i
= 1<<18; i
< 1<<22; i
+=1<<18) {
310 bufferlist ones_maybe
;
311 ones_maybe
.substr_of(readbl
, i
, ones_bl
.length());
312 ceph_assert(0 == memcmp(ones_maybe
.c_str(), ones_bl
.c_str(), ones_bl
.length()));
314 bufferlist ones_maybe
;
315 ones_maybe
.substr_of(readbl
, (1<<18)+(1<<16), ones_bl
.length());
316 ceph_assert(0 == memcmp(ones_maybe
.c_str(), ones_bl
.c_str(), ones_bl
.length()));
318 std::cout
<< "validated that data is 0xff where it should be" << std::endl
;
321 C_SaferCond flushcond2
;
322 done
= obc
.flush_all(&flushcond2
);
324 std::cout
<< "Waiting for final write flush" << std::endl
;
330 bool unclean
= obc
.release_set(&object_set
);
332 std::cout
<< "unclean buffers left over!" << std::endl
;
333 vector
<ObjectExtent
> discard_extents
;
335 for (auto oi
= object_set
.objects
.begin(); !oi
.end(); ++oi
) {
336 discard_extents
.emplace_back(oid
, i
++, 0, 1<<22, 0);
338 obc
.discard_set(&object_set
, discard_extents
);
347 std::cout
<< "Testing ObjectCacher correctness complete" << std::endl
;
354 int main(int argc
, const char **argv
)
356 auto args
= argv_to_vec(argc
, argv
);
357 auto cct
= global_init(nullptr, args
, CEPH_ENTITY_TYPE_CLIENT
,
358 CODE_ENVIRONMENT_UTILITY
,
359 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
361 long long delay_ns
= 0;
362 long long num_ops
= 1000;
363 long long obj_bytes
= 4 << 20;
364 long long max_len
= 128 << 10;
365 long long num_objs
= 10;
366 float percent_reads
= 0.90;
367 int seed
= time(0) % 100000;
369 bool correctness
= false;
370 std::ostringstream err
;
371 std::vector
<const char*>::iterator i
;
372 for (i
= args
.begin(); i
!= args
.end();) {
373 if (ceph_argparse_witharg(args
, i
, &delay_ns
, err
, "--delay-ns", (char*)NULL
)) {
374 if (!err
.str().empty()) {
375 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
378 } else if (ceph_argparse_witharg(args
, i
, &num_ops
, err
, "--ops", (char*)NULL
)) {
379 if (!err
.str().empty()) {
380 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
383 } else if (ceph_argparse_witharg(args
, i
, &num_objs
, err
, "--objects", (char*)NULL
)) {
384 if (!err
.str().empty()) {
385 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
388 } else if (ceph_argparse_witharg(args
, i
, &obj_bytes
, err
, "--obj-size", (char*)NULL
)) {
389 if (!err
.str().empty()) {
390 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
393 } else if (ceph_argparse_witharg(args
, i
, &max_len
, err
, "--max-op-size", (char*)NULL
)) {
394 if (!err
.str().empty()) {
395 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
398 } else if (ceph_argparse_witharg(args
, i
, &percent_reads
, err
, "--percent-read", (char*)NULL
)) {
399 if (!err
.str().empty()) {
400 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
403 } else if (ceph_argparse_witharg(args
, i
, &seed
, err
, "--seed", (char*)NULL
)) {
404 if (!err
.str().empty()) {
405 cerr
<< argv
[0] << ": " << err
.str() << std::endl
;
408 } else if (ceph_argparse_flag(args
, i
, "--stress-test", NULL
)) {
410 } else if (ceph_argparse_flag(args
, i
, "--correctness-test", NULL
)) {
413 cerr
<< "unknown option " << *i
<< std::endl
;
420 return stress_test(num_ops
, num_objs
, obj_bytes
, delay_ns
, max_len
, percent_reads
);
423 return correctness_test(delay_ns
);