1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under the BSD-style license found in the
3 // LICENSE file in the root directory of this source tree. An additional grant
4 // of patent rights can be found in the PATENTS file in the same directory.
6 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7 // Use of this source code is governed by a BSD-style license that can be
8 // found in the LICENSE file. See the AUTHORS file for names of contributors.
11 #include <sys/ioctl.h>
14 #ifdef ROCKSDB_MALLOC_USABLE_SIZE
16 #include <malloc_np.h>
21 #include <sys/types.h>
24 #include <unordered_set>
36 #ifdef ROCKSDB_FALLOCATE_PRESENT
40 #include "env/env_chroot.h"
41 #include "port/port.h"
42 #include "rocksdb/env.h"
43 #include "util/coding.h"
44 #include "util/log_buffer.h"
45 #include "util/mutexlock.h"
46 #include "util/string_util.h"
47 #include "util/sync_point.h"
48 #include "util/testharness.h"
49 #include "util/testutil.h"
52 static const size_t kPageSize
= sysconf(_SC_PAGESIZE
);
54 static const size_t kPageSize
= 4 * 1024;
59 static const int kDelayMicros
= 100000;
62 explicit Deleter(void (*fn
)(void*)) : fn_(fn
) {}
64 void operator()(void* ptr
) {
73 std::unique_ptr
<char, Deleter
> NewAligned(const size_t size
, const char ch
) {
76 if (!(ptr
= reinterpret_cast<char*>(_aligned_malloc(size
, kPageSize
)))) {
77 return std::unique_ptr
<char, Deleter
>(nullptr, Deleter(_aligned_free
));
79 std::unique_ptr
<char, Deleter
> uptr(ptr
, Deleter(_aligned_free
));
81 if (posix_memalign(reinterpret_cast<void**>(&ptr
), kPageSize
, size
) != 0) {
82 return std::unique_ptr
<char, Deleter
>(nullptr, Deleter(free
));
84 std::unique_ptr
<char, Deleter
> uptr(ptr
, Deleter(free
));
86 memset(uptr
.get(), ch
, size
);
90 class EnvPosixTest
: public testing::Test
{
98 EnvPosixTest() : env_(Env::Default()), direct_io_(false) {}
101 class EnvPosixTestWithParam
102 : public EnvPosixTest
,
103 public ::testing::WithParamInterface
<std::pair
<Env
*, bool>> {
105 EnvPosixTestWithParam() {
106 std::pair
<Env
*, bool> param_pair
= GetParam();
107 env_
= param_pair
.first
;
108 direct_io_
= param_pair
.second
;
111 void WaitThreadPoolsEmpty() {
112 // Wait until the thread pools are empty.
113 while (env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) != 0) {
114 Env::Default()->SleepForMicroseconds(kDelayMicros
);
116 while (env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
) != 0) {
117 Env::Default()->SleepForMicroseconds(kDelayMicros
);
121 ~EnvPosixTestWithParam() { WaitThreadPoolsEmpty(); }
// Thread-pool callback used by the scheduling tests.
// `ptr` must point to a live std::atomic<bool>; it is set to true so the
// scheduling test can observe that the task has run.
static void SetBool(void* ptr) {
  // static_cast is the appropriate named cast for void* -> object pointer.
  static_cast<std::atomic<bool>*>(ptr)->store(true);
}
128 TEST_P(EnvPosixTestWithParam
, RunImmediately
) {
129 std::atomic
<bool> called(false);
130 env_
->Schedule(&SetBool
, &called
);
131 Env::Default()->SleepForMicroseconds(kDelayMicros
);
132 ASSERT_TRUE(called
.load());
133 WaitThreadPoolsEmpty();
136 TEST_P(EnvPosixTestWithParam
, UnSchedule
) {
137 std::atomic
<bool> called(false);
138 env_
->SetBackgroundThreads(1, Env::LOW
);
140 /* Block the low priority queue */
141 test::SleepingBackgroundTask sleeping_task
, sleeping_task1
;
142 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task
,
145 /* Schedule another task */
146 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &sleeping_task1
,
147 Env::Priority::LOW
, &sleeping_task1
);
149 /* Remove it with a different tag */
150 ASSERT_EQ(0, env_
->UnSchedule(&called
, Env::Priority::LOW
));
152 /* Remove it from the queue with the right tag */
153 ASSERT_EQ(1, env_
->UnSchedule(&sleeping_task1
, Env::Priority::LOW
));
155 // Unblock background thread
156 sleeping_task
.WakeUp();
158 /* Schedule another task */
159 env_
->Schedule(&SetBool
, &called
);
160 for (int i
= 0; i
< kDelayMicros
; i
++) {
164 Env::Default()->SleepForMicroseconds(1);
166 ASSERT_TRUE(called
.load());
168 ASSERT_TRUE(!sleeping_task
.IsSleeping() && !sleeping_task1
.IsSleeping());
169 WaitThreadPoolsEmpty();
172 TEST_P(EnvPosixTestWithParam
, RunMany
) {
173 std::atomic
<int> last_id(0);
176 std::atomic
<int>* last_id_ptr
; // Pointer to shared slot
177 int id
; // Order# for the execution of this callback
179 CB(std::atomic
<int>* p
, int i
) : last_id_ptr(p
), id(i
) {}
181 static void Run(void* v
) {
182 CB
* cb
= reinterpret_cast<CB
*>(v
);
183 int cur
= cb
->last_id_ptr
->load();
184 ASSERT_EQ(cb
->id
- 1, cur
);
185 cb
->last_id_ptr
->store(cb
->id
);
189 // Schedule in different order than start time
194 env_
->Schedule(&CB::Run
, &cb1
);
195 env_
->Schedule(&CB::Run
, &cb2
);
196 env_
->Schedule(&CB::Run
, &cb3
);
197 env_
->Schedule(&CB::Run
, &cb4
);
199 Env::Default()->SleepForMicroseconds(kDelayMicros
);
200 int cur
= last_id
.load(std::memory_order_acquire
);
202 WaitThreadPoolsEmpty();
211 static void ThreadBody(void* arg
) {
212 State
* s
= reinterpret_cast<State
*>(arg
);
219 TEST_P(EnvPosixTestWithParam
, StartThread
) {
222 state
.num_running
= 3;
223 for (int i
= 0; i
< 3; i
++) {
224 env_
->StartThread(&ThreadBody
, &state
);
228 int num
= state
.num_running
;
233 Env::Default()->SleepForMicroseconds(kDelayMicros
);
235 ASSERT_EQ(state
.val
, 3);
236 WaitThreadPoolsEmpty();
239 TEST_P(EnvPosixTestWithParam
, TwoPools
) {
240 // Data structures to signal tasks to run.
242 port::CondVar
cv(&mutex
);
243 bool should_start
= false;
247 CB(const std::string
& pool_name
, int pool_size
, port::Mutex
* trigger_mu
,
248 port::CondVar
* trigger_cv
, bool* _should_start
)
252 pool_size_(pool_size
),
253 pool_name_(pool_name
),
254 trigger_mu_(trigger_mu
),
255 trigger_cv_(trigger_cv
),
256 should_start_(_should_start
) {}
258 static void Run(void* v
) {
259 CB
* cb
= reinterpret_cast<CB
*>(v
);
267 // make sure we don't have more than pool_size_ jobs running.
268 ASSERT_LE(num_running_
, pool_size_
.load());
272 MutexLock
l(trigger_mu_
);
273 while (!(*should_start_
)) {
287 return num_finished_
;
290 void Reset(int pool_size
) {
291 pool_size_
.store(pool_size
);
299 std::atomic
<int> pool_size_
;
300 std::string pool_name_
;
301 port::Mutex
* trigger_mu_
;
302 port::CondVar
* trigger_cv_
;
306 const int kLowPoolSize
= 2;
307 const int kHighPoolSize
= 4;
310 CB
low_pool_job("low", kLowPoolSize
, &mutex
, &cv
, &should_start
);
311 CB
high_pool_job("high", kHighPoolSize
, &mutex
, &cv
, &should_start
);
313 env_
->SetBackgroundThreads(kLowPoolSize
);
314 env_
->SetBackgroundThreads(kHighPoolSize
, Env::Priority::HIGH
);
316 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
317 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
319 // schedule same number of jobs in each pool
320 for (int i
= 0; i
< kJobs
; i
++) {
321 env_
->Schedule(&CB::Run
, &low_pool_job
);
322 env_
->Schedule(&CB::Run
, &high_pool_job
, Env::Priority::HIGH
);
324 // Wait a short while for the jobs to be dispatched.
326 while ((unsigned int)(kJobs
- kLowPoolSize
) !=
327 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) ||
328 (unsigned int)(kJobs
- kHighPoolSize
) !=
329 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
)) {
330 env_
->SleepForMicroseconds(kDelayMicros
);
331 if (++sleep_count
> 100) {
336 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
337 env_
->GetThreadPoolQueueLen());
338 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
339 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
340 ASSERT_EQ((unsigned int)(kJobs
- kHighPoolSize
),
341 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
343 // Trigger jobs to run.
350 // wait for all jobs to finish
351 while (low_pool_job
.NumFinished() < kJobs
||
352 high_pool_job
.NumFinished() < kJobs
) {
353 env_
->SleepForMicroseconds(kDelayMicros
);
356 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
357 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
359 // Hold jobs to schedule;
360 should_start
= false;
362 // call IncBackgroundThreadsIfNeeded to two pools. One increasing and
363 // the other decreasing
364 env_
->IncBackgroundThreadsIfNeeded(kLowPoolSize
- 1, Env::Priority::LOW
);
365 env_
->IncBackgroundThreadsIfNeeded(kHighPoolSize
+ 1, Env::Priority::HIGH
);
366 high_pool_job
.Reset(kHighPoolSize
+ 1);
367 low_pool_job
.Reset(kLowPoolSize
);
369 // schedule same number of jobs in each pool
370 for (int i
= 0; i
< kJobs
; i
++) {
371 env_
->Schedule(&CB::Run
, &low_pool_job
);
372 env_
->Schedule(&CB::Run
, &high_pool_job
, Env::Priority::HIGH
);
374 // Wait a short while for the jobs to be dispatched.
376 while ((unsigned int)(kJobs
- kLowPoolSize
) !=
377 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
) ||
378 (unsigned int)(kJobs
- (kHighPoolSize
+ 1)) !=
379 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
)) {
380 env_
->SleepForMicroseconds(kDelayMicros
);
381 if (++sleep_count
> 100) {
385 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
386 env_
->GetThreadPoolQueueLen());
387 ASSERT_EQ((unsigned int)(kJobs
- kLowPoolSize
),
388 env_
->GetThreadPoolQueueLen(Env::Priority::LOW
));
389 ASSERT_EQ((unsigned int)(kJobs
- (kHighPoolSize
+ 1)),
390 env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
392 // Trigger jobs to run.
399 // wait for all jobs to finish
400 while (low_pool_job
.NumFinished() < kJobs
||
401 high_pool_job
.NumFinished() < kJobs
) {
402 env_
->SleepForMicroseconds(kDelayMicros
);
405 env_
->SetBackgroundThreads(kHighPoolSize
, Env::Priority::HIGH
);
406 WaitThreadPoolsEmpty();
409 TEST_P(EnvPosixTestWithParam
, DecreaseNumBgThreads
) {
410 std::vector
<test::SleepingBackgroundTask
> tasks(10);
412 // Set the number of threads to 1 first.
413 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
414 Env::Default()->SleepForMicroseconds(kDelayMicros
);
416 // Schedule 3 tasks. Task 0 running; tasks 1, 2 waiting.
417 for (size_t i
= 0; i
< 3; i
++) {
418 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[i
],
419 Env::Priority::HIGH
);
420 Env::Default()->SleepForMicroseconds(kDelayMicros
);
422 ASSERT_EQ(2U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
423 ASSERT_TRUE(tasks
[0].IsSleeping());
424 ASSERT_TRUE(!tasks
[1].IsSleeping());
425 ASSERT_TRUE(!tasks
[2].IsSleeping());
427 // Increase to 2 threads. Task 0, 1 running; 2 waiting
428 env_
->SetBackgroundThreads(2, Env::Priority::HIGH
);
429 Env::Default()->SleepForMicroseconds(kDelayMicros
);
430 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
431 ASSERT_TRUE(tasks
[0].IsSleeping());
432 ASSERT_TRUE(tasks
[1].IsSleeping());
433 ASSERT_TRUE(!tasks
[2].IsSleeping());
435 // Shrink back to 1 thread. Still task 0, 1 running, 2 waiting
436 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
437 Env::Default()->SleepForMicroseconds(kDelayMicros
);
438 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
439 ASSERT_TRUE(tasks
[0].IsSleeping());
440 ASSERT_TRUE(tasks
[1].IsSleeping());
441 ASSERT_TRUE(!tasks
[2].IsSleeping());
443 // The last task finishes. Task 0 running, 2 waiting.
445 Env::Default()->SleepForMicroseconds(kDelayMicros
);
446 ASSERT_EQ(1U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
447 ASSERT_TRUE(tasks
[0].IsSleeping());
448 ASSERT_TRUE(!tasks
[1].IsSleeping());
449 ASSERT_TRUE(!tasks
[2].IsSleeping());
451 // Increase to 5 threads. Task 0 and 2 running.
452 env_
->SetBackgroundThreads(5, Env::Priority::HIGH
);
453 Env::Default()->SleepForMicroseconds(kDelayMicros
);
454 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
455 ASSERT_TRUE(tasks
[0].IsSleeping());
456 ASSERT_TRUE(tasks
[2].IsSleeping());
458 // Change number of threads a couple of times while there is no sufficient
460 env_
->SetBackgroundThreads(7, Env::Priority::HIGH
);
461 Env::Default()->SleepForMicroseconds(kDelayMicros
);
463 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
464 env_
->SetBackgroundThreads(3, Env::Priority::HIGH
);
465 Env::Default()->SleepForMicroseconds(kDelayMicros
);
466 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
467 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
468 Env::Default()->SleepForMicroseconds(kDelayMicros
);
469 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
470 env_
->SetBackgroundThreads(5, Env::Priority::HIGH
);
471 Env::Default()->SleepForMicroseconds(kDelayMicros
);
472 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
473 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
474 Env::Default()->SleepForMicroseconds(kDelayMicros
);
475 ASSERT_EQ(0U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
477 Env::Default()->SleepForMicroseconds(kDelayMicros
* 50);
479 // Enqueue 5 more tasks. Thread pool size now is 4.
480 // Task 0, 3, 4, 5 running;6, 7 waiting.
481 for (size_t i
= 3; i
< 8; i
++) {
482 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[i
],
483 Env::Priority::HIGH
);
485 Env::Default()->SleepForMicroseconds(kDelayMicros
);
486 ASSERT_EQ(2U, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
487 ASSERT_TRUE(tasks
[3].IsSleeping());
488 ASSERT_TRUE(tasks
[4].IsSleeping());
489 ASSERT_TRUE(tasks
[5].IsSleeping());
490 ASSERT_TRUE(!tasks
[6].IsSleeping());
491 ASSERT_TRUE(!tasks
[7].IsSleeping());
493 // Wake up task 0, 3 and 4. Task 5, 6, 7 running.
498 Env::Default()->SleepForMicroseconds(kDelayMicros
);
499 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
500 for (size_t i
= 5; i
< 8; i
++) {
501 ASSERT_TRUE(tasks
[i
].IsSleeping());
504 // Shrink back to 1 thread. Still task 5, 6, 7 running
505 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
506 Env::Default()->SleepForMicroseconds(kDelayMicros
);
507 ASSERT_TRUE(tasks
[5].IsSleeping());
508 ASSERT_TRUE(tasks
[6].IsSleeping());
509 ASSERT_TRUE(tasks
[7].IsSleeping());
511 // Wake up task 6. Task 5, 7 running
513 Env::Default()->SleepForMicroseconds(kDelayMicros
);
514 ASSERT_TRUE(tasks
[5].IsSleeping());
515 ASSERT_TRUE(!tasks
[6].IsSleeping());
516 ASSERT_TRUE(tasks
[7].IsSleeping());
518 // Wake up task 7. Task 5 running
520 Env::Default()->SleepForMicroseconds(kDelayMicros
);
521 ASSERT_TRUE(!tasks
[7].IsSleeping());
523 // Enqueue tasks 8 and 9. Task 5 running; one of 8, 9 might be running.
524 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[8],
525 Env::Priority::HIGH
);
526 env_
->Schedule(&test::SleepingBackgroundTask::DoSleepTask
, &tasks
[9],
527 Env::Priority::HIGH
);
528 Env::Default()->SleepForMicroseconds(kDelayMicros
);
529 ASSERT_GT(env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
), (unsigned int)0);
530 ASSERT_TRUE(!tasks
[8].IsSleeping() || !tasks
[9].IsSleeping());
532 // Increase to 4 threads. Task 5, 8, 9 running.
533 env_
->SetBackgroundThreads(4, Env::Priority::HIGH
);
534 Env::Default()->SleepForMicroseconds(kDelayMicros
);
535 ASSERT_EQ((unsigned int)0, env_
->GetThreadPoolQueueLen(Env::Priority::HIGH
));
536 ASSERT_TRUE(tasks
[8].IsSleeping());
537 ASSERT_TRUE(tasks
[9].IsSleeping());
539 // Shrink to 1 thread
540 env_
->SetBackgroundThreads(1, Env::Priority::HIGH
);
544 Env::Default()->SleepForMicroseconds(kDelayMicros
);
545 ASSERT_TRUE(!tasks
[9].IsSleeping());
546 ASSERT_TRUE(tasks
[8].IsSleeping());
550 Env::Default()->SleepForMicroseconds(kDelayMicros
);
551 ASSERT_TRUE(!tasks
[8].IsSleeping());
553 // Wake up the last thread
556 Env::Default()->SleepForMicroseconds(kDelayMicros
);
557 ASSERT_TRUE(!tasks
[5].IsSleeping());
558 WaitThreadPoolsEmpty();
561 #if (defined OS_LINUX || defined OS_WIN)
562 // Travis doesn't support fallocate or getting unique ID from files for whatever
567 bool IsSingleVarint(const std::string
& s
) {
571 if (!GetVarint64(&slice
, &v
)) {
575 return slice
.size() == 0;
578 bool IsUniqueIDValid(const std::string
& s
) {
579 return !s
.empty() && !IsSingleVarint(s
);
582 const size_t MAX_ID_SIZE
= 100;
583 char temp_id
[MAX_ID_SIZE
];
588 // Determine whether we can use the FS_IOC_GETVERSION ioctl
589 // on a file in directory DIR. Create a temporary file therein,
590 // try to apply the ioctl (save that result), cleanup and
591 // return the result. Return true if it is supported, and
592 // false if anything fails.
593 // Note that this function "knows" that dir has just been created
594 // and is empty, so we create a simply-named test file: "f".
595 bool ioctl_support__FS_IOC_GETVERSION(const std::string
& dir
) {
599 const std::string file
= dir
+ "/f";
602 fd
= open(file
.c_str(), O_CREAT
| O_RDWR
| O_TRUNC
, 0644);
603 } while (fd
< 0 && errno
== EINTR
);
605 bool ok
= (fd
>= 0 && ioctl(fd
, FS_IOC_GETVERSION
, &version
) >= 0);
608 unlink(file
.c_str());
614 // To ensure that Env::GetUniqueId-related tests work correctly, the files
615 // should be stored in regular storage like "hard disk" or "flash device",
616 // and not on a tmpfs file system (like /dev/shm and /tmp on some systems).
617 // Otherwise we cannot get the correct id.
619 // This function serves as the replacement for test::TmpDir(), which may be
620 // customized to be on a file system that doesn't work with GetUniqueId().
622 class IoctlFriendlyTmpdir
{
624 explicit IoctlFriendlyTmpdir() {
627 const char *fmt
= "%s/rocksdb.XXXXXX";
628 const char *tmp
= getenv("TEST_IOCTL_FRIENDLY_TMPDIR");
636 snprintf(dir_buf
, sizeof dir_buf
, fmt
, tmp
);
637 auto result
= _mktemp(dir_buf
);
638 assert(result
!= nullptr);
639 BOOL ret
= CreateDirectory(dir_buf
, NULL
);
643 std::list
<std::string
> candidate_dir_list
= {"/var/tmp", "/tmp"};
645 // If $TEST_IOCTL_FRIENDLY_TMPDIR/rocksdb.XXXXXX fits, use
646 // $TEST_IOCTL_FRIENDLY_TMPDIR; subtract 2 for the "%s", and
647 // add 1 for the trailing NUL byte.
648 if (tmp
&& strlen(tmp
) + strlen(fmt
) - 2 + 1 <= sizeof dir_buf
) {
649 // use $TEST_IOCTL_FRIENDLY_TMPDIR value
650 candidate_dir_list
.push_front(tmp
);
653 for (const std::string
& d
: candidate_dir_list
) {
654 snprintf(dir_buf
, sizeof dir_buf
, fmt
, d
.c_str());
655 if (mkdtemp(dir_buf
)) {
656 if (ioctl_support__FS_IOC_GETVERSION(dir_buf
)) {
660 // Diagnose ioctl-related failure only if this is the
661 // directory specified via that envvar.
662 if (tmp
&& tmp
== d
) {
663 fprintf(stderr
, "TEST_IOCTL_FRIENDLY_TMPDIR-specified directory is "
664 "not suitable: %s\n", d
.c_str());
666 rmdir(dir_buf
); // ignore failure
669 // mkdtemp failed: diagnose it, but don't give up.
670 fprintf(stderr
, "mkdtemp(%s/...) failed: %s\n", d
.c_str(),
675 fprintf(stderr
, "failed to find an ioctl-friendly temporary directory;"
676 " specify one via the TEST_IOCTL_FRIENDLY_TMPDIR envvar\n");
681 ~IoctlFriendlyTmpdir() {
685 const std::string
& name() const {
694 TEST_F(EnvPosixTest
, PositionedAppend
) {
695 unique_ptr
<WritableFile
> writable_file
;
697 options
.use_direct_writes
= true;
698 options
.use_mmap_writes
= false;
699 IoctlFriendlyTmpdir ift
;
700 ASSERT_OK(env_
->NewWritableFile(ift
.name() + "/f", &writable_file
, options
));
701 const size_t kBlockSize
= 4096;
702 const size_t kPageSize
= 4096;
703 const size_t kDataSize
= kPageSize
;
704 // Write a page worth of 'a'
705 auto data_ptr
= NewAligned(kDataSize
, 'a');
706 Slice
data_a(data_ptr
.get(), kDataSize
);
707 ASSERT_OK(writable_file
->PositionedAppend(data_a
, 0U));
708 // Write a page worth of 'b' right after the first sector
709 data_ptr
= NewAligned(kDataSize
, 'b');
710 Slice
data_b(data_ptr
.get(), kDataSize
);
711 ASSERT_OK(writable_file
->PositionedAppend(data_b
, kBlockSize
));
712 ASSERT_OK(writable_file
->Close());
713 // The file now has 1 sector worth of a followed by a page worth of b
716 unique_ptr
<SequentialFile
> seq_file
;
717 ASSERT_OK(env_
->NewSequentialFile(ift
.name() + "/f", &seq_file
, options
));
718 char scratch
[kPageSize
* 2];
720 ASSERT_OK(seq_file
->Read(sizeof(scratch
), &result
, scratch
));
721 ASSERT_EQ(kPageSize
+ kBlockSize
, result
.size());
722 ASSERT_EQ('a', result
[kBlockSize
- 1]);
723 ASSERT_EQ('b', result
[kBlockSize
]);
725 #endif // !ROCKSDB_LITE
727 // Only works in linux platforms
728 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueID
) {
730 if (env_
== Env::Default()) {
732 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
733 IoctlFriendlyTmpdir ift
;
734 std::string fname
= ift
.name() + "/testfile";
735 unique_ptr
<WritableFile
> wfile
;
736 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
738 unique_ptr
<RandomAccessFile
> file
;
741 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
742 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
743 ASSERT_TRUE(id_size
> 0);
744 std::string
unique_id1(temp_id
, id_size
);
745 ASSERT_TRUE(IsUniqueIDValid(unique_id1
));
747 // Get Unique ID again
748 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
749 id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
750 ASSERT_TRUE(id_size
> 0);
751 std::string
unique_id2(temp_id
, id_size
);
752 ASSERT_TRUE(IsUniqueIDValid(unique_id2
));
754 // Get Unique ID again after waiting some time.
755 env_
->SleepForMicroseconds(1000000);
756 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
757 id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
758 ASSERT_TRUE(id_size
> 0);
759 std::string
unique_id3(temp_id
, id_size
);
760 ASSERT_TRUE(IsUniqueIDValid(unique_id3
));
762 // Check IDs are the same.
763 ASSERT_EQ(unique_id1
, unique_id2
);
764 ASSERT_EQ(unique_id2
, unique_id3
);
767 env_
->DeleteFile(fname
);
771 // only works in linux platforms
772 #ifdef ROCKSDB_FALLOCATE_PRESENT
773 TEST_P(EnvPosixTestWithParam
, AllocateTest
) {
774 if (env_
== Env::Default()) {
775 IoctlFriendlyTmpdir ift
;
776 std::string fname
= ift
.name() + "/preallocate_testfile";
778 // Try fallocate in a file to see whether the target file system supports
780 // Skip the test if fallocate is not supported.
781 std::string fname_test_fallocate
= ift
.name() + "/preallocate_testfile_2";
784 fd
= open(fname_test_fallocate
.c_str(), O_CREAT
| O_RDWR
| O_TRUNC
, 0644);
785 } while (fd
< 0 && errno
== EINTR
);
788 int alloc_status
= fallocate(fd
, 0, 0, 1);
791 if (alloc_status
!= 0) {
793 fprintf(stderr
, "Warning: fallocate() fails, %s\n", strerror(err_number
));
796 ASSERT_OK(env_
->DeleteFile(fname_test_fallocate
));
797 if (alloc_status
!= 0 && err_number
== EOPNOTSUPP
) {
798 // The filesystem containing the file does not support fallocate
803 soptions
.use_mmap_writes
= false;
804 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
805 unique_ptr
<WritableFile
> wfile
;
806 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
809 size_t kPreallocateSize
= 100 * 1024 * 1024;
810 size_t kBlockSize
= 512;
811 size_t kPageSize
= 4096;
812 size_t kDataSize
= 1024 * 1024;
813 auto data_ptr
= NewAligned(kDataSize
, 'A');
814 Slice
data(data_ptr
.get(), kDataSize
);
815 wfile
->SetPreallocationBlockSize(kPreallocateSize
);
816 wfile
->PrepareWrite(wfile
->GetFileSize(), kDataSize
);
817 ASSERT_OK(wfile
->Append(data
));
818 ASSERT_OK(wfile
->Flush());
821 ASSERT_EQ(stat(fname
.c_str(), &f_stat
), 0);
822 ASSERT_EQ((unsigned int)kDataSize
, f_stat
.st_size
);
823 // verify that blocks are preallocated
824 // Note here that we don't check the exact number of blocks preallocated --
825 // we only require that number of allocated blocks is at least what we
827 // It looks like some FS give us more blocks than we asked for. That's fine.
828 // It might be worth investigating further.
829 ASSERT_LE((unsigned int)(kPreallocateSize
/ kBlockSize
), f_stat
.st_blocks
);
831 // close the file, should deallocate the blocks
834 stat(fname
.c_str(), &f_stat
);
835 ASSERT_EQ((unsigned int)kDataSize
, f_stat
.st_size
);
836 // verify that preallocated blocks were deallocated on file close
837 // Because the FS might give us more blocks, we add a full page to the size
838 // and expect the number of blocks to be less or equal to that.
839 ASSERT_GE((f_stat
.st_size
+ kPageSize
+ kBlockSize
- 1) / kBlockSize
,
840 (unsigned int)f_stat
.st_blocks
);
843 #endif // ROCKSDB_FALLOCATE_PRESENT
845 // Returns true if any of the strings in ss are the prefix of another string.
846 bool HasPrefix(const std::unordered_set
<std::string
>& ss
) {
847 for (const std::string
& s
: ss
) {
851 for (size_t i
= 1; i
< s
.size(); ++i
) {
852 if (ss
.count(s
.substr(0, i
)) != 0) {
860 // Only works in linux and WIN platforms
861 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueIDConcurrent
) {
862 if (env_
== Env::Default()) {
863 // Check whether a bunch of concurrently existing files have unique IDs.
865 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
868 IoctlFriendlyTmpdir ift
;
869 std::vector
<std::string
> fnames
;
870 for (int i
= 0; i
< 1000; ++i
) {
871 fnames
.push_back(ift
.name() + "/" + "testfile" + ToString(i
));
874 unique_ptr
<WritableFile
> wfile
;
875 ASSERT_OK(env_
->NewWritableFile(fnames
[i
], &wfile
, soptions
));
878 // Collect and check whether the IDs are unique.
879 std::unordered_set
<std::string
> ids
;
880 for (const std::string fname
: fnames
) {
881 unique_ptr
<RandomAccessFile
> file
;
882 std::string unique_id
;
883 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
884 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
885 ASSERT_TRUE(id_size
> 0);
886 unique_id
= std::string(temp_id
, id_size
);
887 ASSERT_TRUE(IsUniqueIDValid(unique_id
));
889 ASSERT_TRUE(ids
.count(unique_id
) == 0);
890 ids
.insert(unique_id
);
894 for (const std::string fname
: fnames
) {
895 ASSERT_OK(env_
->DeleteFile(fname
));
898 ASSERT_TRUE(!HasPrefix(ids
));
902 // Only works in linux and WIN platforms
903 TEST_P(EnvPosixTestWithParam
, RandomAccessUniqueIDDeletes
) {
904 if (env_
== Env::Default()) {
906 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
908 IoctlFriendlyTmpdir ift
;
909 std::string fname
= ift
.name() + "/" + "testfile";
911 // Check that after file is deleted we don't get same ID again in a new
913 std::unordered_set
<std::string
> ids
;
914 for (int i
= 0; i
< 1000; ++i
) {
917 unique_ptr
<WritableFile
> wfile
;
918 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
922 std::string unique_id
;
924 unique_ptr
<RandomAccessFile
> file
;
925 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
926 size_t id_size
= file
->GetUniqueId(temp_id
, MAX_ID_SIZE
);
927 ASSERT_TRUE(id_size
> 0);
928 unique_id
= std::string(temp_id
, id_size
);
931 ASSERT_TRUE(IsUniqueIDValid(unique_id
));
932 ASSERT_TRUE(ids
.count(unique_id
) == 0);
933 ids
.insert(unique_id
);
936 ASSERT_OK(env_
->DeleteFile(fname
));
939 ASSERT_TRUE(!HasPrefix(ids
));
943 // Only works in linux platforms
945 TEST_P(EnvPosixTestWithParam
, DISABLED_InvalidateCache
) {
947 TEST_P(EnvPosixTestWithParam
, InvalidateCache
) {
949 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
951 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
952 std::string fname
= test::TmpDir(env_
) + "/" + "testfile";
954 const size_t kSectorSize
= 512;
955 auto data
= NewAligned(kSectorSize
, 0);
956 Slice
slice(data
.get(), kSectorSize
);
960 unique_ptr
<WritableFile
> wfile
;
961 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
962 if (soptions
.use_direct_writes
) {
963 soptions
.use_direct_writes
= false;
966 ASSERT_OK(env_
->NewWritableFile(fname
, &wfile
, soptions
));
967 ASSERT_OK(wfile
->Append(slice
));
968 ASSERT_OK(wfile
->InvalidateCache(0, 0));
969 ASSERT_OK(wfile
->Close());
974 unique_ptr
<RandomAccessFile
> file
;
975 auto scratch
= NewAligned(kSectorSize
, 0);
977 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
978 if (soptions
.use_direct_reads
) {
979 soptions
.use_direct_reads
= false;
982 ASSERT_OK(env_
->NewRandomAccessFile(fname
, &file
, soptions
));
983 ASSERT_OK(file
->Read(0, kSectorSize
, &result
, scratch
.get()));
984 ASSERT_EQ(memcmp(scratch
.get(), data
.get(), kSectorSize
), 0);
985 ASSERT_OK(file
->InvalidateCache(0, 11));
986 ASSERT_OK(file
->InvalidateCache(0, 0));
991 unique_ptr
<SequentialFile
> file
;
992 auto scratch
= NewAligned(kSectorSize
, 0);
994 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
995 if (soptions
.use_direct_reads
) {
996 soptions
.use_direct_reads
= false;
999 ASSERT_OK(env_
->NewSequentialFile(fname
, &file
, soptions
));
1000 if (file
->use_direct_io()) {
1001 ASSERT_OK(file
->PositionedRead(0, kSectorSize
, &result
, scratch
.get()));
1003 ASSERT_OK(file
->Read(kSectorSize
, &result
, scratch
.get()));
1005 ASSERT_EQ(memcmp(scratch
.get(), data
.get(), kSectorSize
), 0);
1006 ASSERT_OK(file
->InvalidateCache(0, 11));
1007 ASSERT_OK(file
->InvalidateCache(0, 0));
1010 ASSERT_OK(env_
->DeleteFile(fname
));
1011 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1013 #endif // not TRAVIS
1014 #endif // OS_LINUX || OS_WIN
1016 class TestLogger
: public Logger
{
1019 virtual void Logv(const char* format
, va_list ap
) override
{
1022 char new_format
[550];
1023 std::fill_n(new_format
, sizeof(new_format
), '2');
1026 va_copy(backup_ap
, ap
);
1027 int n
= vsnprintf(new_format
, sizeof(new_format
) - 1, format
, backup_ap
);
1028 // 48 bytes for extra information + bytes allocated
1030 // When we have n == -1 there is not a terminating zero expected
1037 if (new_format
[0] == '[') {
1039 ASSERT_TRUE(n
<= 56 + (512 - static_cast<int>(sizeof(struct timeval
))));
1041 ASSERT_TRUE(n
<= 48 + (512 - static_cast<int>(sizeof(struct timeval
))));
1046 for (size_t i
= 0; i
< sizeof(new_format
); i
++) {
1047 if (new_format
[i
] == 'x') {
1049 } else if (new_format
[i
] == '\0') {
1059 TEST_P(EnvPosixTestWithParam
, LogBufferTest
) {
1060 TestLogger test_logger
;
1061 test_logger
.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL
);
1062 test_logger
.log_count
= 0;
1063 test_logger
.char_x_count
= 0;
1064 test_logger
.char_0_count
= 0;
1065 LogBuffer
log_buffer(InfoLogLevel::INFO_LEVEL
, &test_logger
);
1066 LogBuffer
log_buffer_debug(DEBUG_LEVEL
, &test_logger
);
1069 std::fill_n(bytes200
, sizeof(bytes200
), '1');
1070 bytes200
[sizeof(bytes200
) - 1] = '\0';
1072 std::fill_n(bytes600
, sizeof(bytes600
), '1');
1073 bytes600
[sizeof(bytes600
) - 1] = '\0';
1074 char bytes9000
[9000];
1075 std::fill_n(bytes9000
, sizeof(bytes9000
), '1');
1076 bytes9000
[sizeof(bytes9000
) - 1] = '\0';
1078 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx", bytes200
);
1079 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx", bytes600
);
1080 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx%sx", bytes200
, bytes200
, bytes200
);
1081 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx", bytes200
, bytes600
);
1082 ROCKS_LOG_BUFFER(&log_buffer
, "x%sx%sx", bytes600
, bytes9000
);
1084 ROCKS_LOG_BUFFER(&log_buffer_debug
, "x%sx", bytes200
);
1085 test_logger
.SetInfoLogLevel(DEBUG_LEVEL
);
1086 ROCKS_LOG_BUFFER(&log_buffer_debug
, "x%sx%sx%sx", bytes600
, bytes9000
,
1089 ASSERT_EQ(0, test_logger
.log_count
);
1090 log_buffer
.FlushBufferToLog();
1091 log_buffer_debug
.FlushBufferToLog();
1092 ASSERT_EQ(6, test_logger
.log_count
);
1093 ASSERT_EQ(6, test_logger
.char_0_count
);
1094 ASSERT_EQ(10, test_logger
.char_x_count
);
1097 class TestLogger2
: public Logger
{
1099 explicit TestLogger2(size_t max_log_size
) : max_log_size_(max_log_size
) {}
1101 virtual void Logv(const char* format
, va_list ap
) override
{
1102 char new_format
[2000];
1103 std::fill_n(new_format
, sizeof(new_format
), '2');
1106 va_copy(backup_ap
, ap
);
1107 int n
= vsnprintf(new_format
, sizeof(new_format
) - 1, format
, backup_ap
);
1108 // 48 bytes for extra information + bytes allocated
1110 n
<= 48 + static_cast<int>(max_log_size_
- sizeof(struct timeval
)));
1111 ASSERT_TRUE(n
> static_cast<int>(max_log_size_
- sizeof(struct timeval
)));
1115 size_t max_log_size_
;
// Logs an 8999-character string through a LogBuffer using
// ROCKS_LOG_BUFFER_MAX_SZ with an explicit size limit, then flushes it to a
// TestLogger2 constructed with the same limit.
1118 TEST_P(EnvPosixTestWithParam
, LogBufferMaxSizeTest
) {
// 8999 '1' characters followed by a terminating '\0'.
1119 char bytes9000
[9000];
1120 std::fill_n(bytes9000
, sizeof(bytes9000
), '1');
1121 bytes9000
[sizeof(bytes9000
) - 1] = '\0';
// Runs exactly twice: max_log_size is 256, then 1024 (the step is 1024 - 256).
1123 for (size_t max_log_size
= 256; max_log_size
<= 1024;
1124 max_log_size
+= 1024 - 256) {
1125 TestLogger2
test_logger(max_log_size
);
1126 test_logger
.SetInfoLogLevel(InfoLogLevel::INFO_LEVEL
);
1127 LogBuffer
log_buffer(InfoLogLevel::INFO_LEVEL
, &test_logger
);
1128 ROCKS_LOG_BUFFER_MAX_SZ(&log_buffer
, max_log_size
, "%s", bytes9000
);
1129 log_buffer
.FlushBufferToLog();
// Checks the preallocation bookkeeping of a WritableFile: after each
// PrepareWrite()/Append() pair, GetPreallocationStatus() must report the
// expected last allocated block (0, 1, 2, then 7).
1133 TEST_P(EnvPosixTestWithParam
, Preallocation
) {
1134 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1135 const std::string src
= test::TmpDir(env_
) + "/" + "testfile";
1136 unique_ptr
<WritableFile
> srcfile
;
1137 EnvOptions soptions
;
// Both direct-I/O options follow the fixture's direct_io_ flag.
1138 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1139 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1140 if (soptions
.use_direct_writes
) {
// Hook the "NewWritableFile:O_DIRECT" sync point; the callback treats its
// argument as an int*.
1141 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1142 "NewWritableFile:O_DIRECT", [&](void* arg
) {
1143 int* val
= static_cast<int*>(arg
);
1148 ASSERT_OK(env_
->NewWritableFile(src
, &srcfile
, soptions
));
// Preallocation block size: 1 MiB.
1149 srcfile
->SetPreallocationBlockSize(1024 * 1024);
1151 // No writes should mean no preallocation
1152 size_t block_size
, last_allocated_block
;
1153 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1154 ASSERT_EQ(last_allocated_block
, 0UL);
1156 // Small write should preallocate one block
1157 size_t kStrSize
= 4096;
// NewAligned() returns a kPageSize-aligned buffer filled with the given byte.
1158 auto data
= NewAligned(kStrSize
, 'A');
1159 Slice
str(data
.get(), kStrSize
);
1160 srcfile
->PrepareWrite(srcfile
->GetFileSize(), kStrSize
);
1161 srcfile
->Append(str
);
1162 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1163 ASSERT_EQ(last_allocated_block
, 1UL);
1165 // Write an entire preallocation block, make sure the last allocated block is now two.
1167 auto buf_ptr
= NewAligned(block_size
, ' ');
1168 Slice
buf(buf_ptr
.get(), block_size
);
1169 srcfile
->PrepareWrite(srcfile
->GetFileSize(), block_size
);
1170 srcfile
->Append(buf
);
1171 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
1172 ASSERT_EQ(last_allocated_block
, 2UL);
1175 // Write five more blocks at once, ensure we're where we need to be.
1177 auto buf_ptr
= NewAligned(block_size
* 5, ' ');
1178 Slice buf
= Slice(buf_ptr
.get(), block_size
* 5);
1179 srcfile
->PrepareWrite(srcfile
->GetFileSize(), buf
.size());
1180 srcfile
->Append(buf
);
1181 srcfile
->GetPreallocationStatus(&block_size
, &last_allocated_block
);
// Two blocks before plus the five just written.
1182 ASSERT_EQ(last_allocated_block
, 7UL);
1184 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1187 // Test that the two ways to get children file attributes (in bulk or
1188 // individually) behave consistently.
1189 TEST_P(EnvPosixTestWithParam
, ConsistentChildrenAttributes
) {
1190 rocksdb::SyncPoint::GetInstance()->EnableProcessing();
1191 EnvOptions soptions
;
// Both direct-I/O options follow the fixture's direct_io_ flag.
1192 soptions
.use_direct_reads
= soptions
.use_direct_writes
= direct_io_
;
1193 const int kNumChildren
= 10;
// Create kNumChildren files named testfile_<i> in the test directory.
1196 for (int i
= 0; i
< kNumChildren
; ++i
) {
1197 std::ostringstream oss
;
1198 oss
<< test::TmpDir(env_
) << "/testfile_" << i
;
1199 const std::string path
= oss
.str();
1200 unique_ptr
<WritableFile
> file
;
1201 #if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && !defined(OS_AIX)
1202 if (soptions
.use_direct_writes
) {
// Hook the "NewWritableFile:O_DIRECT" sync point; the callback treats its
// argument as an int*.
1203 rocksdb::SyncPoint::GetInstance()->SetCallBack(
1204 "NewWritableFile:O_DIRECT", [&](void* arg
) {
1205 int* val
= static_cast<int*>(arg
);
1210 ASSERT_OK(env_
->NewWritableFile(path
, &file
, soptions
));
// Aligned buffer of data.size() bytes filled with 'T'.
1211 auto buf_ptr
= NewAligned(data
.size(), 'T');
1212 Slice
buf(buf_ptr
.get(), data
.size());
// Grow the payload by 4096 bytes per iteration; the size check further down
// expects file i to hold 4096 * i bytes.
1214 data
.append(std::string(4096, 'T'));
// Bulk query: attributes of every child of the test directory.
1217 std::vector
<Env::FileAttributes
> file_attrs
;
1218 ASSERT_OK(env_
->GetChildrenFileAttributes(test::TmpDir(env_
), &file_attrs
));
1219 for (int i
= 0; i
< kNumChildren
; ++i
) {
1220 std::ostringstream oss
;
1221 oss
<< "testfile_" << i
;
1222 const std::string name
= oss
.str();
1223 const std::string path
= test::TmpDir(env_
) + "/" + name
;
// Locate this file's entry in the bulk listing by name.
1225 auto file_attrs_iter
= std::find_if(
1226 file_attrs
.begin(), file_attrs
.end(),
1227 [&name
](const Env::FileAttributes
& fm
) { return fm
.name
== name
; });
1228 ASSERT_TRUE(file_attrs_iter
!= file_attrs
.end());
// Individual query: the size must be 4096 * i and match the bulk entry.
1230 ASSERT_OK(env_
->GetFileSize(path
, &size
));
1231 ASSERT_EQ(size
, 4096 * i
);
1232 ASSERT_EQ(size
, file_attrs_iter
->size_bytes
);
1234 rocksdb::SyncPoint::GetInstance()->ClearTrace();
1237 // Test that WritableFileWrapper forwards all calls to WritableFile.
1238 TEST_P(EnvPosixTestWithParam
, WritableFileWrapper
) {
// Stub WritableFile whose methods check the order in which they are called.
1239 class Base
: public WritableFile
{
// Expects the shared step counter to equal x, then advances the counter.
1243 void inc(int x
) const {
1244 EXPECT_EQ(x
, (*step_
)++);
1247 explicit Base(int* step
) : step_(step
) {
// Each stub below passes its expected position in the call sequence to inc().
1251 Status
Append(const Slice
& data
) override
{ inc(1); return Status::OK(); }
// Truncate neither checks nor advances the step counter.
1252 Status
Truncate(uint64_t size
) override
{ return Status::OK(); }
1253 Status
Close() override
{ inc(2); return Status::OK(); }
1254 Status
Flush() override
{ inc(3); return Status::OK(); }
1255 Status
Sync() override
{ inc(4); return Status::OK(); }
1256 Status
Fsync() override
{ inc(5); return Status::OK(); }
1257 void SetIOPriority(Env::IOPriority pri
) override
{ inc(6); }
1258 uint64_t GetFileSize() override
{ inc(7); return 0; }
1259 void GetPreallocationStatus(size_t* block_size
,
1260 size_t* last_allocated_block
) override
{
1263 size_t GetUniqueId(char* id
, size_t max_size
) const override
{
1267 Status
InvalidateCache(size_t offset
, size_t length
) override
{
1269 return Status::OK();
1273 Status
Allocate(uint64_t offset
, uint64_t len
) override
{
1275 return Status::OK();
1277 Status
RangeSync(uint64_t offset
, uint64_t nbytes
) override
{
1279 return Status::OK();
// WritableFileWrapper subclass under test. CallProtectedMethods() presumably
// exposes the wrapper's protected methods to the test -- confirm against its
// body.
1288 class Wrapper
: public WritableFileWrapper
{
1290 explicit Wrapper(WritableFile
* target
) : WritableFileWrapper(target
) {}
1292 void CallProtectedMethods() {
1308 w
.SetIOPriority(Env::IOPriority::IO_HIGH
);
1310 w
.GetPreallocationStatus(nullptr, nullptr);
1311 w
.GetUniqueId(nullptr, 0);
1312 w
.InvalidateCache(0, 0);
1313 w
.CallProtectedMethods();
// The step counter must have reached 14 after the calls above.
1316 EXPECT_EQ(14, step
);
// Exercises RandomRWFile with a fixed sequence of overlapping writes and
// reads, and repeats some of the reads after the file has been reopened.
1319 TEST_P(EnvPosixTestWithParam
, PosixRandomRWFile
) {
1320 const std::string path
= test::TmpDir(env_
) + "/random_rw_file";
// Delete the file at this path before starting; the status is not checked.
1322 env_
->DeleteFile(path
);
1324 std::unique_ptr
<RandomRWFile
> file
;
1325 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
// Reading 10 bytes yields only the 4 bytes written so far.
1330 ASSERT_OK(file
->Write(0, "ABCD"));
1331 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1332 ASSERT_EQ(read_res
.ToString(), "ABCD");
// Overwrite from offset 2, extending past the previously written bytes.
1334 ASSERT_OK(file
->Write(2, "XXXX"));
1335 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1336 ASSERT_EQ(read_res
.ToString(), "ABXXXX");
// Write at offset 10, beyond the 6 bytes written so far.
1338 ASSERT_OK(file
->Write(10, "ZZZ"));
1339 ASSERT_OK(file
->Read(10, 10, &read_res
, buf
));
1340 ASSERT_EQ(read_res
.ToString(), "ZZZ");
// Single-byte overwrite in the middle of the previous write.
1342 ASSERT_OK(file
->Write(11, "Y"));
1343 ASSERT_OK(file
->Read(10, 10, &read_res
, buf
));
1344 ASSERT_EQ(read_res
.ToString(), "ZYZ");
1346 ASSERT_OK(file
->Write(200, "FFFFF"));
1347 ASSERT_OK(file
->Read(200, 10, &read_res
, buf
));
1348 ASSERT_EQ(read_res
.ToString(), "FFFFF");
// Append directly after the bytes written at offset 200.
1350 ASSERT_OK(file
->Write(205, "XXXX"));
1351 ASSERT_OK(file
->Read(200, 10, &read_res
, buf
));
1352 ASSERT_EQ(read_res
.ToString(), "FFFFFXXXX");
1354 ASSERT_OK(file
->Write(5, "QQQQ"));
1355 ASSERT_OK(file
->Read(0, 9, &read_res
, buf
));
1356 ASSERT_EQ(read_res
.ToString(), "ABXXXQQQQ");
1358 ASSERT_OK(file
->Read(2, 4, &read_res
, buf
));
1359 ASSERT_EQ(read_res
.ToString(), "XXXQ");
1361 // Close file and reopen it
1363 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
// The reads below expect the same contents as before the reopen.
1365 ASSERT_OK(file
->Read(0, 9, &read_res
, buf
));
1366 ASSERT_EQ(read_res
.ToString(), "ABXXXQQQQ");
1368 ASSERT_OK(file
->Read(10, 3, &read_res
, buf
));
1369 ASSERT_EQ(read_res
.ToString(), "ZYZ");
1371 ASSERT_OK(file
->Read(200, 9, &read_res
, buf
));
1372 ASSERT_EQ(read_res
.ToString(), "FFFFFXXXX");
// 16 bytes are written at offset 4, but only 10 bytes are read back from
// offset 0, so the expected result ends after the first six 'T's.
1374 ASSERT_OK(file
->Write(4, "TTTTTTTTTTTTTTTT"));
1375 ASSERT_OK(file
->Read(0, 10, &read_res
, buf
));
1376 ASSERT_EQ(read_res
.ToString(), "ABXXTTTTTT");
// Remove the test file; the status is not checked.
1379 env_
->DeleteFile(path
);
// Pairs a RandomRWFile with an in-memory std::string that receives the same
// writes, so that data read from the file can be compared with the string.
1382 class RandomRWFileWithMirrorString
{
1384 explicit RandomRWFileWithMirrorString(RandomRWFile
* _file
) : file_(_file
) {}
// Applies the write to the mirror string and to the file; the file write is
// asserted to succeed.
1386 void Write(size_t offset
, const std::string
& data
) {
1387 // Write to mirror string
1388 StringWrite(offset
, data
);
1391 Status s
= file_
->Write(offset
, data
);
1392 ASSERT_OK(s
) << s
.ToString();
// Reads up to n bytes at offset from both the mirror and the file and asserts
// that the two results are equal. Both results are cut at their first '\0'
// before the comparison.
1395 void Read(size_t offset
= 0, size_t n
= 1000000) {
1396 Slice
str_res(nullptr, 0);
// The expected result stays empty when offset is at or past the mirror's end.
1397 if (offset
< file_mirror_
.size()) {
1398 size_t str_res_sz
= std::min(file_mirror_
.size() - offset
, n
);
1399 str_res
= Slice(file_mirror_
.data() + offset
, str_res_sz
);
1400 StopSliceAtNull(&str_res
);
1404 Status s
= file_
->Read(offset
, n
, &file_res
, buf_
);
1405 ASSERT_OK(s
) << s
.ToString();
1406 StopSliceAtNull(&file_res
);
// On mismatch the failure message also carries offset and n.
1408 ASSERT_EQ(str_res
.ToString(), file_res
.ToString()) << offset
<< " " << n
;
// Points the helper at another RandomRWFile; the mirror string is untouched.
1411 void SetFile(RandomRWFile
* _file
) { file_
= _file
; }
// Copies src into the mirror at offset, first extending the mirror with '\0'
// bytes when the write reaches past its current end.
1414 void StringWrite(size_t offset
, const std::string
& src
) {
1415 if (offset
+ src
.size() > file_mirror_
.size()) {
1416 file_mirror_
.resize(offset
+ src
.size(), '\0');
// Write directly into the string's storage through a const_cast pointer.
1419 char* pos
= const_cast<char*>(file_mirror_
.data() + offset
);
1420 memcpy(pos
, src
.data(), src
.size());
// Shortens *slc so that it ends just before a '\0' byte, if it contains one.
1423 void StopSliceAtNull(Slice
* slc
) {
1424 for (size_t i
= 0; i
< slc
->size(); i
++) {
1425 if ((*slc
)[i
] == '\0') {
1426 *slc
= Slice(slc
->data(), i
);
// File under test; set by the constructor and replaceable through SetFile().
1433 RandomRWFile
* file_
;
// In-memory copy of everything written through Write().
1434 std::string file_mirror_
;
// Randomized RandomRWFile test: runs 10000 rounds of one write and one read
// at random offsets, with every read checked against the in-memory mirror
// kept by RandomRWFileWithMirrorString.
1437 TEST_P(EnvPosixTestWithParam
, PosixRandomRWFileRandomized
) {
1438 const std::string path
= test::TmpDir(env_
) + "/random_rw_file_rand";
// Delete the file at this path before starting; the status is not checked.
1439 env_
->DeleteFile(path
);
1441 unique_ptr
<RandomRWFile
> file
;
1442 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
1443 RandomRWFileWithMirrorString
file_with_mirror(file
.get());
1447 for (int i
= 0; i
< 10000; i
++) {
1448 // Generate random data
1449 test::RandomString(&rnd
, 10, &buf
);
1451 // Pick random offset for write
1452 size_t write_off
= rnd
.Next() % 1000;
1453 file_with_mirror
.Write(write_off
, buf
);
1455 // Pick random offset for read
1456 size_t read_off
= rnd
.Next() % 1000;
// Read sizes range from 0 to 19 bytes.
1457 size_t read_sz
= rnd
.Next() % 20;
1458 file_with_mirror
.Read(read_off
, read_sz
);
1461 // Reopen the file every 500 iters
1462 ASSERT_OK(env_
->NewRandomRWFile(path
, &file
, EnvOptions()));
// Hand the reopened file to the helper; its mirror string is kept.
1463 file_with_mirror
.SetFile(file
.get());
// Remove the test file; the status is not checked.
1468 env_
->DeleteFile(path
);
// Instantiations of the EnvPosixTestWithParam suite. Each parameter is a
// std::pair<Env*, bool>: the Env to test and, judging by the instantiation
// names, whether direct I/O is enabled.
1471 INSTANTIATE_TEST_CASE_P(DefaultEnvWithoutDirectIO
, EnvPosixTestWithParam
,
1472 ::testing::Values(std::pair
<Env
*, bool>(Env::Default(),
// The direct-I/O instantiation of the default Env is left out of ROCKSDB_LITE
// builds.
1474 #if !defined(ROCKSDB_LITE)
1475 INSTANTIATE_TEST_CASE_P(DefaultEnvWithDirectIO
, EnvPosixTestWithParam
,
1476 ::testing::Values(std::pair
<Env
*, bool>(Env::Default(),
1478 #endif // !defined(ROCKSDB_LITE)
// The chroot-based instantiations are left out of ROCKSDB_LITE and OS_WIN
// builds.
1480 #if !defined(ROCKSDB_LITE) && !defined(OS_WIN)
// Env created by NewChrootEnv() from the default Env and the test temp
// directory; used by both chroot instantiations below.
1481 static unique_ptr
<Env
> chroot_env(NewChrootEnv(Env::Default(),
1482 test::TmpDir(Env::Default())));
1483 INSTANTIATE_TEST_CASE_P(
1484 ChrootEnvWithoutDirectIO
, EnvPosixTestWithParam
,
1485 ::testing::Values(std::pair
<Env
*, bool>(chroot_env
.get(), false)));
1486 INSTANTIATE_TEST_CASE_P(
1487 ChrootEnvWithDirectIO
, EnvPosixTestWithParam
,
1488 ::testing::Values(std::pair
<Env
*, bool>(chroot_env
.get(), true)));
1489 #endif // !defined(ROCKSDB_LITE) && !defined(OS_WIN)
1491 } // namespace rocksdb
// Entry point of the test binary: passes the command line to googletest,
// then runs all tests and returns their result.
1493 int main(int argc
, char** argv
) {
1494 ::testing::InitGoogleTest(&argc
, argv
);
1495 return RUN_ALL_TESTS();