1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 #include <gtest/gtest.h>
6 #include "common/ceph_argparse.h"
7 #include "common/common_init.h"
8 #include "global/global_init.h"
9 #include "common/config.h"
10 #include "common/Finisher.h"
11 #include "os/filestore/FileJournal.h"
12 #include "include/Context.h"
13 #include "common/Mutex.h"
14 #include "common/safe_io.h"
15 #include "os/filestore/JournalingObjectStore.h"
22 bool directio
, aio
, faio
;
23 const char *description
;
25 { false, false, false, "DIRECTIO OFF AIO OFF" },
26 { true, false, false, "DIRECTIO ON AIO OFF" },
27 { true, true, true, "DIRECTIO ON AIO ON"}
32 Mutex
wait_lock("lock");
52 : lock("C_Sync::lock"), done(false) {
53 c
= new C_SafeCond(&lock
, &cond
, &done
);
57 //cout << "wait" << std::endl;
60 //cout << "waited" << std::endl;
65 unsigned size_mb
= 200;
66 // Prefix that marks a command-line argument as a gtest option
67 const char GTEST_PRFIX
[] = "--gtest_";
69 int main(int argc
, char **argv
) {
70 vector
<const char*> args
;
71 argv_to_vec(argc
, (const char **)argv
, args
);
73 auto cct
= global_init(NULL
, args
, CEPH_ENTITY_TYPE_CLIENT
,
74 CODE_ENVIRONMENT_UTILITY
,
75 CINIT_FLAG_NO_DEFAULT_CONFIG_FILE
);
76 common_init_finish(g_ceph_context
);
79 sprintf(mb
, "%u", size_mb
);
80 g_ceph_context
->_conf
.set_val("osd_journal_size", mb
);
81 g_ceph_context
->_conf
.apply_changes(nullptr);
83 finisher
= new Finisher(g_ceph_context
);
87 for ( unsigned int i
= 0; i
< args
.size(); ++i
) {
88 if (strncmp(args
[i
], GTEST_PRFIX
, sizeof(GTEST_PRFIX
) - 1)) {
89 // Not a gtest argument: copy it into path.
90 size_t copy_len
= std::min(sizeof(path
) - 1, strlen(args
[i
]));
91 strncpy(path
, args
[i
], copy_len
);
92 path
[copy_len
] = '\0';
97 if ( path
[0] == '\0') {
98 srand(getpid() + time(0));
99 snprintf(path
, sizeof(path
), "/var/tmp/ceph_test_filejournal.tmp.%d", rand());
101 cout
<< "path " << path
<< std::endl
;
103 ::testing::InitGoogleTest(&argc
, argv
);
107 int r
= RUN_ALL_TESTS();
116 TEST(TestFileJournal
, Create
) {
117 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
118 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
119 g_ceph_context
->_conf
.apply_changes(nullptr);
121 for (unsigned i
= 0 ; i
< 3; ++i
) {
122 SCOPED_TRACE(subtests
[i
].description
);
123 fsid
.generate_random();
124 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
125 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
126 ASSERT_EQ(0, fj
.create());
130 TEST(TestFileJournal
, WriteSmall
) {
131 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
132 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
133 g_ceph_context
->_conf
.apply_changes(nullptr);
135 for (unsigned i
= 0 ; i
< 3; ++i
) {
136 SCOPED_TRACE(subtests
[i
].description
);
137 fsid
.generate_random();
138 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
139 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
140 ASSERT_EQ(0, fj
.create());
141 ASSERT_EQ(0, fj
.make_writeable());
143 vector
<ObjectStore::Transaction
> tls
;
146 int orig_len
= fj
.prepare_entry(tls
, &bl
);
147 fj
.reserve_throttle_and_backoff(bl
.length());
148 fj
.submit_entry(1, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
155 TEST(TestFileJournal
, WriteBig
) {
156 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
157 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
158 g_ceph_context
->_conf
.apply_changes(nullptr);
160 for (unsigned i
= 0 ; i
< 3; ++i
) {
161 SCOPED_TRACE(subtests
[i
].description
);
162 fsid
.generate_random();
163 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
164 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
165 ASSERT_EQ(0, fj
.create());
166 ASSERT_EQ(0, fj
.make_writeable());
169 while (bl
.length() < size_mb
*1000/2) {
171 memset(foo
, 1, sizeof(foo
));
172 bl
.append(foo
, sizeof(foo
));
174 vector
<ObjectStore::Transaction
> tls
;
175 int orig_len
= fj
.prepare_entry(tls
, &bl
);
176 fj
.reserve_throttle_and_backoff(bl
.length());
177 fj
.submit_entry(1, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
183 TEST(TestFileJournal
, WriteMany
) {
184 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
185 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
186 g_ceph_context
->_conf
.apply_changes(nullptr);
188 for (unsigned i
= 0 ; i
< 3; ++i
) {
189 SCOPED_TRACE(subtests
[i
].description
);
190 fsid
.generate_random();
191 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
192 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
193 ASSERT_EQ(0, fj
.create());
194 ASSERT_EQ(0, fj
.make_writeable());
196 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
198 vector
<ObjectStore::Transaction
> tls
;
202 for (int i
=0; i
<100; i
++) {
204 int orig_len
= fj
.prepare_entry(tls
, &bl
);
205 fj
.reserve_throttle_and_backoff(bl
.length());
206 fj
.submit_entry(seq
++, bl
, orig_len
, gb
.new_sub());
216 TEST(TestFileJournal
, WriteManyVecs
) {
217 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
218 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
219 g_ceph_context
->_conf
.apply_changes(nullptr);
221 for (unsigned i
= 0 ; i
< 3; ++i
) {
222 SCOPED_TRACE(subtests
[i
].description
);
223 fsid
.generate_random();
224 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
225 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
226 ASSERT_EQ(0, fj
.create());
227 ASSERT_EQ(0, fj
.make_writeable());
229 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
232 first
.append("small");
233 vector
<ObjectStore::Transaction
> tls
;
234 int orig_len
= fj
.prepare_entry(tls
, &first
);
235 fj
.reserve_throttle_and_backoff(first
.length());
236 fj
.submit_entry(1, first
, orig_len
, gb
.new_sub());
239 for (int i
=0; i
<IOV_MAX
* 2; i
++) {
240 bufferptr bp
= buffer::create_page_aligned(4096);
241 memset(bp
.c_str(), (char)i
, 4096);
244 bufferlist origbl
= bl
;
245 orig_len
= fj
.prepare_entry(tls
, &bl
);
246 fj
.reserve_throttle_and_backoff(bl
.length());
247 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
257 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
258 ASSERT_EQ(seq
, 2ull);
259 ASSERT_TRUE(inbl
.contents_equal(origbl
));
260 ASSERT_EQ(0, fj
.make_writeable());
266 TEST(TestFileJournal
, ReplaySmall
) {
267 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
268 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
269 g_ceph_context
->_conf
.apply_changes(nullptr);
271 vector
<ObjectStore::Transaction
> tls
;
273 for (unsigned i
= 0 ; i
< 3; ++i
) {
274 SCOPED_TRACE(subtests
[i
].description
);
275 fsid
.generate_random();
276 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
277 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
278 ASSERT_EQ(0, fj
.create());
279 ASSERT_EQ(0, fj
.make_writeable());
281 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
285 int orig_len
= fj
.prepare_entry(tls
, &bl
);
286 fj
.reserve_throttle_and_backoff(bl
.length());
287 fj
.submit_entry(1, bl
, orig_len
, gb
.new_sub());
289 orig_len
= fj
.prepare_entry(tls
, &bl
);
290 fj
.reserve_throttle_and_backoff(bl
.length());
291 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
293 orig_len
= fj
.prepare_entry(tls
, &bl
);
294 fj
.reserve_throttle_and_backoff(bl
.length());
295 fj
.submit_entry(3, bl
, orig_len
, gb
.new_sub());
306 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
307 ASSERT_EQ(seq
, 2ull);
308 inbl
.copy(0, inbl
.length(), v
);
309 ASSERT_EQ("small", v
);
313 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
314 ASSERT_EQ(seq
, 3ull);
315 inbl
.copy(0, inbl
.length(), v
);
316 ASSERT_EQ("small", v
);
320 ASSERT_TRUE(!fj
.read_entry(inbl
, seq
));
322 ASSERT_EQ(0, fj
.make_writeable());
327 TEST(TestFileJournal
, ReplayCorrupt
) {
328 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "true");
329 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
330 g_ceph_context
->_conf
.apply_changes(nullptr);
332 vector
<ObjectStore::Transaction
> tls
;
333 for (unsigned i
= 0 ; i
< 3; ++i
) {
334 SCOPED_TRACE(subtests
[i
].description
);
335 fsid
.generate_random();
336 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
337 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
338 ASSERT_EQ(0, fj
.create());
339 ASSERT_EQ(0, fj
.make_writeable());
341 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
343 const char *needle
= "i am a needle";
344 const char *newneedle
= "in a haystack";
347 int orig_len
= fj
.prepare_entry(tls
, &bl
);
348 fj
.reserve_throttle_and_backoff(bl
.length());
349 fj
.submit_entry(1, bl
, orig_len
, gb
.new_sub());
351 orig_len
= fj
.prepare_entry(tls
, &bl
);
352 fj
.reserve_throttle_and_backoff(bl
.length());
353 fj
.submit_entry(2, bl
, orig_len
, gb
.new_sub());
355 orig_len
= fj
.prepare_entry(tls
, &bl
);
356 fj
.reserve_throttle_and_backoff(bl
.length());
357 fj
.submit_entry(3, bl
, orig_len
, gb
.new_sub());
359 orig_len
= fj
.prepare_entry(tls
, &bl
);
360 fj
.reserve_throttle_and_backoff(bl
.length());
361 fj
.submit_entry(4, bl
, orig_len
, gb
.new_sub());
367 cout
<< "corrupting journal" << std::endl
;
369 int fd
= open(path
, O_RDONLY
);
371 int r
= safe_read_exact(fd
, buf
, sizeof(buf
));
374 for (unsigned o
=0; o
< sizeof(buf
) - strlen(needle
); o
++) {
375 if (memcmp(buf
+o
, needle
, strlen(needle
)) == 0) {
377 cout
<< "replacing at offset " << o
<< std::endl
;
378 memcpy(buf
+o
, newneedle
, strlen(newneedle
));
380 cout
<< "leaving at offset " << o
<< std::endl
;
387 fd
= open(path
, O_WRONLY
);
389 r
= safe_write(fd
, buf
, sizeof(buf
));
398 ASSERT_EQ(true, fj
.read_entry(inbl
, seq
));
399 ASSERT_EQ(seq
, 2ull);
400 inbl
.copy(0, inbl
.length(), v
);
401 ASSERT_EQ(needle
, v
);
405 ASSERT_FALSE(fj
.read_entry(inbl
, seq
, &corrupt
));
406 ASSERT_TRUE(corrupt
);
408 ASSERT_EQ(0, fj
.make_writeable());
413 TEST(TestFileJournal
, WriteTrim
) {
414 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
415 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
416 g_ceph_context
->_conf
.apply_changes(nullptr);
418 for (unsigned i
= 0 ; i
< 3; ++i
) {
419 SCOPED_TRACE(subtests
[i
].description
);
420 fsid
.generate_random();
421 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
422 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
423 ASSERT_EQ(0, fj
.create());
424 ASSERT_EQ(0, fj
.make_writeable());
430 memset(foo
, 1, sizeof(foo
));
432 uint64_t seq
= 1, committed
= 0;
433 vector
<ObjectStore::Transaction
> tls
;
435 for (unsigned i
=0; i
<size_mb
*2; i
++) {
437 bl
.push_back(buffer::copy(foo
, sizeof(foo
)));
439 ls
.push_back(new C_Sync
);
440 int orig_len
= fj
.prepare_entry(tls
, &bl
);
441 fj
.reserve_throttle_and_backoff(bl
.length());
442 fj
.submit_entry(seq
++, bl
, orig_len
, ls
.back()->c
);
444 while (ls
.size() > size_mb
/2) {
448 fj
.committed_thru(committed
);
455 fj
.committed_thru(++committed
);
458 ASSERT_TRUE(fj
.journalq_empty());
464 TEST(TestFileJournal
, WriteTrimSmall
) {
465 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "false");
466 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "0");
467 g_ceph_context
->_conf
.apply_changes(nullptr);
468 vector
<ObjectStore::Transaction
> tls
;
470 for (unsigned i
= 0 ; i
< 3; ++i
) {
471 SCOPED_TRACE(subtests
[i
].description
);
472 fsid
.generate_random();
473 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
474 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
475 ASSERT_EQ(0, fj
.create());
476 ASSERT_EQ(0, fj
.make_writeable());
482 memset(foo
, 1, sizeof(foo
));
484 uint64_t seq
= 1, committed
= 0;
486 for (unsigned i
=0; i
<size_mb
*2; i
++) {
488 for (int k
=0; k
<128; k
++)
489 bl
.push_back(buffer::copy(foo
, sizeof(foo
) / 128));
491 ls
.push_back(new C_Sync
);
492 int orig_len
= fj
.prepare_entry(tls
, &bl
);
493 fj
.reserve_throttle_and_backoff(bl
.length());
494 fj
.submit_entry(seq
++, bl
, orig_len
, ls
.back()->c
);
496 while (ls
.size() > size_mb
/2) {
500 fj
.committed_thru(committed
);
507 fj
.committed_thru(committed
);
514 TEST(TestFileJournal
, ReplayDetectCorruptFooterMagic
) {
515 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "true");
516 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "1");
517 g_ceph_context
->_conf
.apply_changes(nullptr);
519 vector
<ObjectStore::Transaction
> tls
;
520 for (unsigned i
= 0 ; i
< 3; ++i
) {
521 SCOPED_TRACE(subtests
[i
].description
);
522 fsid
.generate_random();
523 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
524 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
525 ASSERT_EQ(0, fj
.create());
526 ASSERT_EQ(0, fj
.make_writeable());
528 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
530 const char *needle
= "i am a needle";
531 for (unsigned i
= 1; i
<= 4; ++i
) {
534 int orig_len
= fj
.prepare_entry(tls
, &bl
);
535 fj
.reserve_throttle_and_backoff(bl
.length());
536 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
543 int orig_len
= fj
.prepare_entry(tls
, &bl
);
544 fj
.reserve_throttle_and_backoff(bl
.length());
545 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
549 int fd
= open(path
, O_WRONLY
);
551 cout
<< "corrupting journal" << std::endl
;
553 fj
.corrupt_footer_magic(fd
, 2);
557 bool corrupt
= false;
558 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
561 ASSERT_FALSE(corrupt
);
563 result
= fj
.read_entry(bl
, seq
, &corrupt
);
564 ASSERT_FALSE(result
);
565 ASSERT_TRUE(corrupt
);
567 ASSERT_EQ(0, fj
.make_writeable());
573 TEST(TestFileJournal
, ReplayDetectCorruptPayload
) {
574 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "true");
575 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "1");
576 g_ceph_context
->_conf
.apply_changes(nullptr);
578 vector
<ObjectStore::Transaction
> tls
;
579 for (unsigned i
= 0 ; i
< 3; ++i
) {
580 SCOPED_TRACE(subtests
[i
].description
);
581 fsid
.generate_random();
582 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
583 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
584 ASSERT_EQ(0, fj
.create());
585 ASSERT_EQ(0, fj
.make_writeable());
587 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
589 const char *needle
= "i am a needle";
590 for (unsigned i
= 1; i
<= 4; ++i
) {
593 int orig_len
= fj
.prepare_entry(tls
, &bl
);
594 fj
.reserve_throttle_and_backoff(bl
.length());
595 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
602 int orig_len
= fj
.prepare_entry(tls
, &bl
);
603 fj
.reserve_throttle_and_backoff(bl
.length());
604 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
608 int fd
= open(path
, O_WRONLY
);
610 cout
<< "corrupting journal" << std::endl
;
612 fj
.corrupt_payload(fd
, 2);
616 bool corrupt
= false;
617 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
620 ASSERT_FALSE(corrupt
);
622 result
= fj
.read_entry(bl
, seq
, &corrupt
);
623 ASSERT_FALSE(result
);
624 ASSERT_TRUE(corrupt
);
626 ASSERT_EQ(0, fj
.make_writeable());
632 TEST(TestFileJournal
, ReplayDetectCorruptHeader
) {
633 g_ceph_context
->_conf
.set_val("journal_ignore_corruption", "true");
634 g_ceph_context
->_conf
.set_val("journal_write_header_frequency", "1");
635 g_ceph_context
->_conf
.apply_changes(nullptr);
637 vector
<ObjectStore::Transaction
> tls
;
638 for (unsigned i
= 0 ; i
< 3; ++i
) {
639 SCOPED_TRACE(subtests
[i
].description
);
640 fsid
.generate_random();
641 FileJournal
fj(g_ceph_context
, fsid
, finisher
, &sync_cond
, path
,
642 subtests
[i
].directio
, subtests
[i
].aio
, subtests
[i
].faio
);
643 ASSERT_EQ(0, fj
.create());
644 ASSERT_EQ(0, fj
.make_writeable());
646 C_GatherBuilder
gb(g_ceph_context
, new C_SafeCond(&wait_lock
, &cond
, &done
));
648 const char *needle
= "i am a needle";
649 for (unsigned i
= 1; i
<= 4; ++i
) {
652 int orig_len
= fj
.prepare_entry(tls
, &bl
);
653 fj
.reserve_throttle_and_backoff(bl
.length());
654 fj
.submit_entry(i
, bl
, orig_len
, gb
.new_sub());
661 int orig_len
= fj
.prepare_entry(tls
, &bl
);
662 fj
.reserve_throttle_and_backoff(bl
.length());
663 fj
.submit_entry(5, bl
, orig_len
, new C_SafeCond(&wait_lock
, &cond
, &done
));
667 int fd
= open(path
, O_WRONLY
);
669 cout
<< "corrupting journal" << std::endl
;
671 fj
.corrupt_header_magic(fd
, 2);
675 bool corrupt
= false;
676 bool result
= fj
.read_entry(bl
, seq
, &corrupt
);
679 ASSERT_FALSE(corrupt
);
681 result
= fj
.read_entry(bl
, seq
, &corrupt
);
682 ASSERT_FALSE(result
);
683 ASSERT_TRUE(corrupt
);
685 ASSERT_EQ(0, fj
.make_writeable());