1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include <gtest/gtest.h>
6 #include "common/ceph_argparse.h"
7 #include "common/common_init.h"
8 #include "global/global_init.h"
9 #include "common/config.h"
10 #include "common/Finisher.h"
11 #include "os/filestore/FileJournal.h"
12 #include "include/Context.h"
13 #include "common/Mutex.h"
14 #include "common/safe_io.h"
15 #include "os/filestore/JournalingObjectStore.h"
22 bool directio
, aio
, faio
;
23 const char *description
;
25 { false, false, false, "DIRECTIO OFF AIO OFF" },
26 { true, false, false, "DIRECTIO ON AIO OFF" },
27 { true, true, true, "DIRECTIO ON AIO ON"}
32 Mutex
wait_lock("lock");
52 : lock("C_Sync::lock"), done(false) {
53 c
= new C_SafeCond(&lock
, &cond
, &done
);
57 //cout << "wait" << std::endl;
60 //cout << "waited" << std::endl;
65 unsigned size_mb
= 200;
66 //Gtest argument prefix
67 const char GTEST_PRFIX
[] = "--gtest_";
69 int main(int argc
, char **argv
) {
70 vector
<const char*> args
;
71 argv_to_vec(argc
, (const char **)argv
, args
);
73 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
74 CODE_ENVIRONMENT_UTILITY
, 0);
75 common_init_finish(g_ceph_context
);
78 sprintf(mb
, "%u", size_mb
);
79 g_ceph_context
->_conf
->set_val("osd_journal_size", mb
);
80 g_ceph_context
->_conf
->apply_changes(NULL
);
82 finisher
= new Finisher(g_ceph_context
);
86 for ( unsigned int i
= 0; i
< args
.size(); ++i
) {
87 if (strncmp(args
[i
], GTEST_PRFIX
, sizeof(GTEST_PRFIX
) - 1)) {
88 //Non gtest argument, set to path.
89 size_t copy_len
= std::min(sizeof(path
) - 1, strlen(args
[i
]));
90 strncpy(path
, args
[i
], copy_len
);
91 path
[copy_len
] = '\0';
96 if ( path
[0] == '\0') {
97 srand(getpid() + time(0));
98 snprintf(path
, sizeof(path
), "/var/tmp/ceph_test_filejournal.tmp.%d", rand());
100 cout
<< "path " << path
<< std::endl
;
102 ::testing::InitGoogleTest(&argc
, argv
);
106 int r
= RUN_ALL_TESTS();
115 TEST(TestFileJournal
, Create
) {
116 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
117 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
118 g_ceph_context
->_conf
->apply_changes(NULL
);
120 for (unsigned i
= 0 ; i
< 3; ++i
) {
121 SCOPED_TRACE(subtests
[i
].description
);
122 fsid
.generate_random();
123 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
124 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
125 ASSERT_EQ(0, fj
.create());
129 TEST(TestFileJournal
, WriteSmall
) {
130 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
131 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
132 g_ceph_context
->_conf
->apply_changes(NULL
);
134 for (unsigned i
= 0 ; i
< 3; ++i
) {
135 SCOPED_TRACE(subtests
[i
].description
);
136 fsid
.generate_random();
137 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
138 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
139 ASSERT_EQ(0, fj
.create());
140 ASSERT_EQ(0, fj
.make_writeable());
142 vector
<ObjectStore::Transaction
> tls
;
145 int orig_len
= fj
.prepare_entry(tls
, &bl
);
146 fj
.reserve_throttle_and_backoff(bl
.length());
147 fj
.submit_entry(1, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
154 TEST(TestFileJournal
, WriteBig
) {
155 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
156 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
157 g_ceph_context
->_conf
->apply_changes(NULL
);
159 for (unsigned i
= 0 ; i
< 3; ++i
) {
160 SCOPED_TRACE(subtests
[i
].description
);
161 fsid
.generate_random();
162 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
163 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
164 ASSERT_EQ(0, fj
.create());
165 ASSERT_EQ(0, fj
.make_writeable());
168 while (bl
.length() < size_mb
*1000/2) {
170 memset(foo
, 1, sizeof(foo
));
171 bl
.append(foo
, sizeof(foo
));
173 vector
<ObjectStore::Transaction
> tls
;
174 int orig_len
= fj
.prepare_entry(tls
, &bl
);
175 fj
.reserve_throttle_and_backoff(bl
.length());
176 fj
.submit_entry(1, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
182 TEST(TestFileJournal
, WriteMany
) {
183 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
184 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
185 g_ceph_context
->_conf
->apply_changes(NULL
);
187 for (unsigned i
= 0 ; i
< 3; ++i
) {
188 SCOPED_TRACE(subtests
[i
].description
);
189 fsid
.generate_random();
190 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
191 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
192 ASSERT_EQ(0, fj
.create());
193 ASSERT_EQ(0, fj
.make_writeable());
195 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
197 vector
<ObjectStore::Transaction
> tls
;
201 for (int i
=0; i
<100; i
++) {
203 int orig_len
= fj
.prepare_entry(tls
, &bl
);
204 fj
.reserve_throttle_and_backoff(bl
.length());
205 fj
.submit_entry(seq
++, bl
, orig_len
, gb
.new_sub());
215 TEST(TestFileJournal
, WriteManyVecs
) {
216 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
217 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
218 g_ceph_context
->_conf
->apply_changes(NULL
);
220 for (unsigned i
= 0 ; i
< 3; ++i
) {
221 SCOPED_TRACE(subtests
[i
].description
);
222 fsid
.generate_random();
223 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
224 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
225 ASSERT_EQ(0, fj
.create());
226 ASSERT_EQ(0, fj
.make_writeable());
228 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
231 first
.append("small");
232 vector
<ObjectStore::Transaction
> tls
;
233 int orig_len
= fj
.prepare_entry(tls
, &first
);
234 fj
.reserve_throttle_and_backoff(first
.length());
235 fj
.submit_entry(1, first
, orig_len
, gb
.new_sub());
238 for (int i
=0; i
<IOV_MAX
* 2; i
++) {
239 bufferptr bp
= buffer::create_page_aligned(4096);
240 memset(bp
.c_str(), (char)i
, 4096);
243 bufferlist origbl
= bl
;
244 orig_len
= fj
.prepare_entry(tls
, &bl
);
245 fj
.reserve_throttle_and_backoff(bl
.length());
246 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
256 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
257 ASSERT_EQ(seq
, 2ull);
258 ASSERT_TRUE(inbl
.contents_equal(origbl
));
259 ASSERT_EQ(0, fj
.make_writeable());
265 TEST(TestFileJournal
, ReplaySmall
) {
266 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
267 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
268 g_ceph_context
->_conf
->apply_changes(NULL
);
270 vector
<ObjectStore::Transaction
> tls
;
272 for (unsigned i
= 0 ; i
< 3; ++i
) {
273 SCOPED_TRACE(subtests
[i
].description
);
274 fsid
.generate_random();
275 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
276 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
277 ASSERT_EQ(0, fj
.create());
278 ASSERT_EQ(0, fj
.make_writeable());
280 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
284 int orig_len
= fj
.prepare_entry(tls
, &bl
);
285 fj
.reserve_throttle_and_backoff(bl
.length());
286 fj
.submit_entry(1, bl
, orig_len
, gb
.new_sub());
288 orig_len
= fj
.prepare_entry(tls
, &bl
);
289 fj
.reserve_throttle_and_backoff(bl
.length());
290 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
292 orig_len
= fj
.prepare_entry(tls
, &bl
);
293 fj
.reserve_throttle_and_backoff(bl
.length());
294 fj
.submit_entry(3, bl
, orig_len
, gb
.new_sub());
305 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
306 ASSERT_EQ(seq
, 2ull);
307 inbl
.copy(0, inbl
.length(), v
);
308 ASSERT_EQ("small", v
);
312 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
313 ASSERT_EQ(seq
, 3ull);
314 inbl
.copy(0, inbl
.length(), v
);
315 ASSERT_EQ("small", v
);
319 ASSERT_TRUE(!fj
.read_entry(inbl
, seq
));
321 ASSERT_EQ(0, fj
.make_writeable());
326 TEST(TestFileJournal
, ReplayCorrupt
) {
327 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "true");
328 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
329 g_ceph_context
->_conf
->apply_changes(NULL
);
331 vector
<ObjectStore::Transaction
> tls
;
332 for (unsigned i
= 0 ; i
< 3; ++i
) {
333 SCOPED_TRACE(subtests
[i
].description
);
334 fsid
.generate_random();
335 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
336 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
337 ASSERT_EQ(0, fj
.create());
338 ASSERT_EQ(0, fj
.make_writeable());
340 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
342 const char *needle
= "i am a needle";
343 const char *newneedle
= "in a haystack";
346 int orig_len
= fj
.prepare_entry(tls
, &bl
);
347 fj
.reserve_throttle_and_backoff(bl
.length());
348 fj
.submit_entry(1, bl
, orig_len
, gb
.new_sub());
350 orig_len
= fj
.prepare_entry(tls
, &bl
);
351 fj
.reserve_throttle_and_backoff(bl
.length());
352 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
354 orig_len
= fj
.prepare_entry(tls
, &bl
);
355 fj
.reserve_throttle_and_backoff(bl
.length());
356 fj
.submit_entry(3, bl
, orig_len
, gb
.new_sub());
358 orig_len
= fj
.prepare_entry(tls
, &bl
);
359 fj
.reserve_throttle_and_backoff(bl
.length());
360 fj
.submit_entry(4, bl
, orig_len
, gb
.new_sub());
366 cout
<< "corrupting journal" << std::endl
;
368 int fd
= open(path
, O_RDONLY
);
370 int r
= safe_read_exact(fd
, buf
, sizeof(buf
));
373 for (unsigned o
=0; o
< sizeof(buf
) - strlen(needle
); o
++) {
374 if (memcmp(buf
+o
, needle
, strlen(needle
)) == 0) {
376 cout
<< "replacing at offset " << o
<< std::endl
;
377 memcpy(buf
+o
, newneedle
, strlen(newneedle
));
379 cout
<< "leaving at offset " << o
<< std::endl
;
386 fd
= open(path
, O_WRONLY
);
388 r
= safe_write(fd
, buf
, sizeof(buf
));
397 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
398 ASSERT_EQ(seq
, 2ull);
399 inbl
.copy(0, inbl
.length(), v
);
400 ASSERT_EQ(needle
, v
);
404 ASSERT_FALSE(fj
.read_entry(inbl
, seq
, &corrupt
));
405 ASSERT_TRUE(corrupt
);
407 ASSERT_EQ(0, fj
.make_writeable());
412 TEST(TestFileJournal
, WriteTrim
) {
413 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
414 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
415 g_ceph_context
->_conf
->apply_changes(NULL
);
417 for (unsigned i
= 0 ; i
< 3; ++i
) {
418 SCOPED_TRACE(subtests
[i
].description
);
419 fsid
.generate_random();
420 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
421 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
422 ASSERT_EQ(0, fj
.create());
423 ASSERT_EQ(0, fj
.make_writeable());
429 memset(foo
, 1, sizeof(foo
));
431 uint64_t seq
= 1, committed
= 0;
432 vector
<ObjectStore::Transaction
> tls
;
434 for (unsigned i
=0; i
<size_mb
*2; i
++) {
436 bl
.push_back(buffer::copy(foo
, sizeof(foo
)));
438 ls
.push_back(new C_Sync
);
439 int orig_len
= fj
.prepare_entry(tls
, &bl
);
440 fj
.reserve_throttle_and_backoff(bl
.length());
441 fj
.submit_entry(seq
++, bl
, orig_len
, ls
.back()->c
);
443 while (ls
.size() > size_mb
/2) {
447 fj
.committed_thru(committed
);
454 fj
.committed_thru(++committed
);
457 ASSERT_TRUE(fj
.journalq_empty());
463 TEST(TestFileJournal
, WriteTrimSmall
) {
464 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "false");
465 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "0");
466 g_ceph_context
->_conf
->apply_changes(NULL
);
467 vector
<ObjectStore::Transaction
> tls
;
469 for (unsigned i
= 0 ; i
< 3; ++i
) {
470 SCOPED_TRACE(subtests
[i
].description
);
471 fsid
.generate_random();
472 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
473 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
474 ASSERT_EQ(0, fj
.create());
475 ASSERT_EQ(0, fj
.make_writeable());
481 memset(foo
, 1, sizeof(foo
));
483 uint64_t seq
= 1, committed
= 0;
485 for (unsigned i
=0; i
<size_mb
*2; i
++) {
487 for (int k
=0; k
<128; k
++)
488 bl
.push_back(buffer::copy(foo
, sizeof(foo
) / 128));
490 ls
.push_back(new C_Sync
);
491 int orig_len
= fj
.prepare_entry(tls
, &bl
);
492 fj
.reserve_throttle_and_backoff(bl
.length());
493 fj
.submit_entry(seq
++, bl
, orig_len
, ls
.back()->c
);
495 while (ls
.size() > size_mb
/2) {
499 fj
.committed_thru(committed
);
506 fj
.committed_thru(committed
);
513 TEST(TestFileJournal
, ReplayDetectCorruptFooterMagic
) {
514 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "true");
515 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "1");
516 g_ceph_context
->_conf
->apply_changes(NULL
);
518 vector
<ObjectStore::Transaction
> tls
;
519 for (unsigned i
= 0 ; i
< 3; ++i
) {
520 SCOPED_TRACE(subtests
[i
].description
);
521 fsid
.generate_random();
522 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
523 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
524 ASSERT_EQ(0, fj
.create());
525 ASSERT_EQ(0, fj
.make_writeable());
527 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
529 const char *needle
= "i am a needle";
530 for (unsigned i
= 1; i
<= 4; ++i
) {
533 int orig_len
= fj
.prepare_entry(tls
, &bl
);
534 fj
.reserve_throttle_and_backoff(bl
.length());
535 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
542 int orig_len
= fj
.prepare_entry(tls
, &bl
);
543 fj
.reserve_throttle_and_backoff(bl
.length());
544 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
548 int fd
= open(path
, O_WRONLY
);
550 cout
<< "corrupting journal" << std::endl
;
552 fj
.corrupt_footer_magic(fd
, 2);
556 bool corrupt
= false;
557 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
560 ASSERT_FALSE(corrupt
);
562 result
= fj
.read_entry(bl
, seq
, &corrupt
);
563 ASSERT_FALSE(result
);
564 ASSERT_TRUE(corrupt
);
566 ASSERT_EQ(0, fj
.make_writeable());
572 TEST(TestFileJournal
, ReplayDetectCorruptPayload
) {
573 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "true");
574 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "1");
575 g_ceph_context
->_conf
->apply_changes(NULL
);
577 vector
<ObjectStore::Transaction
> tls
;
578 for (unsigned i
= 0 ; i
< 3; ++i
) {
579 SCOPED_TRACE(subtests
[i
].description
);
580 fsid
.generate_random();
581 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
582 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
583 ASSERT_EQ(0, fj
.create());
584 ASSERT_EQ(0, fj
.make_writeable());
586 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
588 const char *needle
= "i am a needle";
589 for (unsigned i
= 1; i
<= 4; ++i
) {
592 int orig_len
= fj
.prepare_entry(tls
, &bl
);
593 fj
.reserve_throttle_and_backoff(bl
.length());
594 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
601 int orig_len
= fj
.prepare_entry(tls
, &bl
);
602 fj
.reserve_throttle_and_backoff(bl
.length());
603 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
607 int fd
= open(path
, O_WRONLY
);
609 cout
<< "corrupting journal" << std::endl
;
611 fj
.corrupt_payload(fd
, 2);
615 bool corrupt
= false;
616 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
619 ASSERT_FALSE(corrupt
);
621 result
= fj
.read_entry(bl
, seq
, &corrupt
);
622 ASSERT_FALSE(result
);
623 ASSERT_TRUE(corrupt
);
625 ASSERT_EQ(0, fj
.make_writeable());
631 TEST(TestFileJournal
, ReplayDetectCorruptHeader
) {
632 g_ceph_context
->_conf
->set_val("journal_ignore_corruption", "true");
633 g_ceph_context
->_conf
->set_val("journal_write_header_frequency", "1");
634 g_ceph_context
->_conf
->apply_changes(NULL
);
636 vector
<ObjectStore::Transaction
> tls
;
637 for (unsigned i
= 0 ; i
< 3; ++i
) {
638 SCOPED_TRACE(subtests
[i
].description
);
639 fsid
.generate_random();
640 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
641 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
642 ASSERT_EQ(0, fj
.create());
643 ASSERT_EQ(0, fj
.make_writeable());
645 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
647 const char *needle
= "i am a needle";
648 for (unsigned i
= 1; i
<= 4; ++i
) {
651 int orig_len
= fj
.prepare_entry(tls
, &bl
);
652 fj
.reserve_throttle_and_backoff(bl
.length());
653 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
660 int orig_len
= fj
.prepare_entry(tls
, &bl
);
661 fj
.reserve_throttle_and_backoff(bl
.length());
662 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
666 int fd
= open(path
, O_WRONLY
);
668 cout
<< "corrupting journal" << std::endl
;
670 fj
.corrupt_header_magic(fd
, 2);
674 bool corrupt
= false;
675 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
678 ASSERT_FALSE(corrupt
);
680 result
= fj
.read_entry(bl
, seq
, &corrupt
);
681 ASSERT_FALSE(result
);
682 ASSERT_TRUE(corrupt
);
684 ASSERT_EQ(0, fj
.make_writeable());