// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
-// This source code is licensed under the BSD-style license found in the
-// LICENSE file in the root directory of this source tree. An additional grant
-// of patent rights can be found in the PATENTS file in the same directory.
+// This source code is licensed under both the GPLv2 (found in the
+// COPYING file in the root directory) and Apache 2.0 License
+// (found in the LICENSE.Apache file in the root directory).
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
#include <sys/ioctl.h>
#include <sys/mman.h>
#include <sys/stat.h>
-#ifdef OS_LINUX
+#if defined(OS_LINUX) || defined(OS_SOLARIS) || defined(OS_ANDROID)
#include <sys/statfs.h>
#include <sys/syscall.h>
+#include <sys/sysmacros.h>
#endif
+#include <sys/statvfs.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include "rocksdb/options.h"
#include "rocksdb/slice.h"
#include "util/coding.h"
+#include "util/compression_context_cache.h"
#include "util/logging.h"
#include "util/random.h"
#include "util/string_util.h"
return new ThreadStatusUpdater();
}
+inline mode_t GetDBFileMode(bool allow_non_owner_access) {
+ return allow_non_owner_access ? 0644 : 0600;
+}
+
// list of pathnames that are locked
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;
-static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
- mutex_lockedFiles.Lock();
- if (lock) {
- // If it already exists in the lockedFiles set, then it is already locked,
- // and fail this lock attempt. Otherwise, insert it into lockedFiles.
- // This check is needed because fcntl() does not detect lock conflict
- // if the fcntl is issued by the same thread that earlier acquired
- // this lock.
- if (lockedFiles.insert(fname).second == false) {
- mutex_lockedFiles.Unlock();
- errno = ENOLCK;
- return -1;
- }
- } else {
- // If we are unlocking, then verify that we had locked it earlier,
- // it should already exist in lockedFiles. Remove it from lockedFiles.
- if (lockedFiles.erase(fname) != 1) {
- mutex_lockedFiles.Unlock();
- errno = ENOLCK;
- return -1;
- }
- }
+static int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
int value = fcntl(fd, F_SETLK, &f);
- if (value == -1 && lock) {
- // if there is an error in locking, then remove the pathname from lockedfiles
- lockedFiles.erase(fname);
- }
- mutex_lockedFiles.Unlock();
+
return value;
}
std::string filename;
};
+int cloexec_flags(int flags, const EnvOptions* options) {
+ // If the system supports opening the file with cloexec enabled,
+ // do so, as this avoids a race condition if a db is opened around
+ // the same time that a child process is forked
+#ifdef O_CLOEXEC
+ if (options == nullptr || options->set_fd_cloexec) {
+ flags |= O_CLOEXEC;
+ }
+#endif
+ return flags;
+}
+
class PosixEnv : public Env {
public:
PosixEnv();
const EnvOptions& options) override {
result->reset();
int fd = -1;
- int flags = O_RDONLY;
+ int flags = cloexec_flags(O_RDONLY, &options);
FILE* file = nullptr;
if (options.use_direct_reads && !options.use_mmap_reads) {
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), flags, 0644);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
- return IOError(fname, errno);
+ return IOError("While opening a file for sequentially reading", fname,
+ errno);
}
SetFD_CLOEXEC(fd, &options);
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
- return IOError(fname, errno);
+ return IOError("While fcntl NoCache", fname, errno);
}
#endif
} else {
} while (file == nullptr && errno == EINTR);
if (file == nullptr) {
close(fd);
- return IOError(fname, errno);
+ return IOError("While opening file for sequentially read", fname,
+ errno);
}
}
result->reset(new PosixSequentialFile(fname, file, fd, options));
result->reset();
Status s;
int fd;
- int flags = O_RDONLY;
+ int flags = cloexec_flags(O_RDONLY, &options);
+
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef ROCKSDB_LITE
return Status::IOError(fname, "Direct I/O not supported in RocksDB lite");
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), flags, 0644);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
- return IOError(fname, errno);
+ return IOError("While open a file for random read", fname, errno);
}
SetFD_CLOEXEC(fd, &options);
result->reset(new PosixMmapReadableFile(fd, fname, base,
size, options));
} else {
- s = IOError(fname, errno);
+ s = IOError("while mmap file for read", fname, errno);
+ close(fd);
}
}
- close(fd);
} else {
if (options.use_direct_reads && !options.use_mmap_reads) {
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
- return IOError(fname, errno);
+ return IOError("while fcntl NoCache", fname, errno);
}
#endif
}
return s;
}
- virtual Status NewWritableFile(const std::string& fname,
- unique_ptr<WritableFile>* result,
- const EnvOptions& options) override {
+ virtual Status OpenWritableFile(const std::string& fname,
+ unique_ptr<WritableFile>* result,
+ const EnvOptions& options,
+ bool reopen = false) {
result->reset();
Status s;
int fd = -1;
- int flags = O_CREAT | O_TRUNC;
+ int flags = (reopen) ? (O_CREAT | O_APPEND) : (O_CREAT | O_TRUNC);
// Direct IO mode with O_DIRECT flag or F_NOCAHCE (MAC OSX)
if (options.use_direct_writes && !options.use_mmap_writes) {
// Note: we should avoid O_APPEND here due to ta the following bug:
flags |= O_WRONLY;
}
+ flags = cloexec_flags(flags, &options);
+
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), flags, 0644);
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
- s = IOError(fname, errno);
+ s = IOError("While open a file for appending", fname, errno);
return s;
}
SetFD_CLOEXEC(fd, &options);
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
- s = IOError(fname, errno);
+ s = IOError("While fcntl NoCache an opened file for appending", fname,
+ errno);
return s;
}
#elif defined(OS_SOLARIS)
if (directio(fd, DIRECTIO_ON) == -1) {
if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
close(fd);
- s = IOError(fname, errno);
+ s = IOError("While calling directio()", fname, errno);
return s;
}
}
return s;
}
+ virtual Status NewWritableFile(const std::string& fname,
+ unique_ptr<WritableFile>* result,
+ const EnvOptions& options) override {
+ return OpenWritableFile(fname, result, options, false);
+ }
+
+ virtual Status ReopenWritableFile(const std::string& fname,
+ unique_ptr<WritableFile>* result,
+ const EnvOptions& options) override {
+ return OpenWritableFile(fname, result, options, true);
+ }
+
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
unique_ptr<WritableFile>* result,
flags |= O_WRONLY;
}
+ flags = cloexec_flags(flags, &options);
+
do {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(old_fname.c_str(), flags, 0644);
+ fd = open(old_fname.c_str(), flags,
+ GetDBFileMode(allow_non_owner_access_));
} while (fd < 0 && errno == EINTR);
if (fd < 0) {
- s = IOError(fname, errno);
+ s = IOError("while reopen file for write", fname, errno);
return s;
}
SetFD_CLOEXEC(fd, &options);
// rename into place
if (rename(old_fname.c_str(), fname.c_str()) != 0) {
- s = IOError(old_fname, errno);
+ s = IOError("while rename file to " + fname, old_fname, errno);
close(fd);
return s;
}
#ifdef OS_MACOSX
if (fcntl(fd, F_NOCACHE, 1) == -1) {
close(fd);
- s = IOError(fname, errno);
+ s = IOError("while fcntl NoCache for reopened file for append", fname,
+ errno);
return s;
}
#elif defined(OS_SOLARIS)
if (directio(fd, DIRECTIO_ON) == -1) {
if (errno != ENOTTY) { // ZFS filesystems don't support DIRECTIO_ON
close(fd);
- s = IOError(fname, errno);
+ s = IOError("while calling directio()", fname, errno);
return s;
}
}
result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
}
return s;
-
- return s;
}
virtual Status NewRandomRWFile(const std::string& fname,
unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override {
int fd = -1;
+ int flags = cloexec_flags(O_RDWR, &options);
+
while (fd < 0) {
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), O_CREAT | O_RDWR, 0644);
+
+ fd = open(fname.c_str(), flags, GetDBFileMode(allow_non_owner_access_));
if (fd < 0) {
// Error while opening the file
if (errno == EINTR) {
continue;
}
- return IOError(fname, errno);
+ return IOError("While open file for random read/write", fname, errno);
}
}
return Status::OK();
}
+ virtual Status NewMemoryMappedFileBuffer(
+ const std::string& fname,
+ unique_ptr<MemoryMappedFileBuffer>* result) override {
+ int fd = -1;
+ Status status;
+ int flags = cloexec_flags(O_RDWR, nullptr);
+
+ while (fd < 0) {
+ IOSTATS_TIMER_GUARD(open_nanos);
+ fd = open(fname.c_str(), flags, 0644);
+ if (fd < 0) {
+ // Error while opening the file
+ if (errno == EINTR) {
+ continue;
+ }
+ status =
+ IOError("While open file for raw mmap buffer access", fname, errno);
+ break;
+ }
+ }
+ uint64_t size;
+ if (status.ok()) {
+ status = GetFileSize(fname, &size);
+ }
+ void* base = nullptr;
+ if (status.ok()) {
+ base = mmap(nullptr, static_cast<size_t>(size), PROT_READ | PROT_WRITE,
+ MAP_SHARED, fd, 0);
+ if (base == MAP_FAILED) {
+ status = IOError("while mmap file for read", fname, errno);
+ }
+ }
+ if (status.ok()) {
+ result->reset(
+ new PosixMemoryMappedFileBuffer(base, static_cast<size_t>(size)));
+ }
+ if (fd >= 0) {
+ // don't need to keep it open after mmap has been called
+ close(fd);
+ }
+ return status;
+ }
+
virtual Status NewDirectory(const std::string& name,
unique_ptr<Directory>* result) override {
result->reset();
int fd;
+ int flags = cloexec_flags(0, nullptr);
{
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(name.c_str(), 0);
+ fd = open(name.c_str(), flags);
}
if (fd < 0) {
- return IOError(name, errno);
+ return IOError("While open directory", name, errno);
} else {
result->reset(new PosixDirectory(fd));
}
return Status::OK();
}
- switch (errno) {
+ int err = errno;
+ switch (err) {
case EACCES:
case ELOOP:
case ENAMETOOLONG:
case ENOTDIR:
return Status::NotFound();
default:
- assert(result == EIO || result == ENOMEM);
- return Status::IOError("Unexpected error(" + ToString(result) +
+ assert(err == EIO || err == ENOMEM);
+ return Status::IOError("Unexpected error(" + ToString(err) +
") accessing file `" + fname + "' ");
}
}
case ENOTDIR:
return Status::NotFound();
default:
- return IOError(dir, errno);
+ return IOError("While opendir", dir, errno);
}
}
struct dirent* entry;
virtual Status DeleteFile(const std::string& fname) override {
Status result;
if (unlink(fname.c_str()) != 0) {
- result = IOError(fname, errno);
+ result = IOError("while unlink() file", fname, errno);
}
return result;
};
virtual Status CreateDir(const std::string& name) override {
Status result;
if (mkdir(name.c_str(), 0755) != 0) {
- result = IOError(name, errno);
+ result = IOError("While mkdir", name, errno);
}
return result;
};
Status result;
if (mkdir(name.c_str(), 0755) != 0) {
if (errno != EEXIST) {
- result = IOError(name, errno);
+ result = IOError("While mkdir if missing", name, errno);
} else if (!DirExists(name)) { // Check that name is actually a
// directory.
// Message is taken from mkdir
virtual Status DeleteDir(const std::string& name) override {
Status result;
if (rmdir(name.c_str()) != 0) {
- result = IOError(name, errno);
+ result = IOError("file rmdir", name, errno);
}
return result;
};
struct stat sbuf;
if (stat(fname.c_str(), &sbuf) != 0) {
*size = 0;
- s = IOError(fname, errno);
+ s = IOError("while stat a file for size", fname, errno);
} else {
*size = sbuf.st_size;
}
uint64_t* file_mtime) override {
struct stat s;
if (stat(fname.c_str(), &s) !=0) {
- return IOError(fname, errno);
+ return IOError("while stat a file for modification time", fname, errno);
}
*file_mtime = static_cast<uint64_t>(s.st_mtime);
return Status::OK();
const std::string& target) override {
Status result;
if (rename(src.c_str(), target.c_str()) != 0) {
- result = IOError(src, errno);
+ result = IOError("While renaming a file to " + target, src, errno);
}
return result;
}
if (errno == EXDEV) {
return Status::NotSupported("No cross FS links allowed");
}
- result = IOError(src, errno);
+ result = IOError("while link file to " + target, src, errno);
}
return result;
}
+ Status NumFileLinks(const std::string& fname, uint64_t* count) override {
+ struct stat s;
+ if (stat(fname.c_str(), &s) != 0) {
+ return IOError("while stat a file for num file links", fname, errno);
+ }
+ *count = static_cast<uint64_t>(s.st_nlink);
+ return Status::OK();
+ }
+
+ virtual Status AreFilesSame(const std::string& first,
+ const std::string& second, bool* res) override {
+ struct stat statbuf[2];
+ if (stat(first.c_str(), &statbuf[0]) != 0) {
+ return IOError("stat file", first, errno);
+ }
+ if (stat(second.c_str(), &statbuf[1]) != 0) {
+ return IOError("stat file", second, errno);
+ }
+
+ if (major(statbuf[0].st_dev) != major(statbuf[1].st_dev) ||
+ minor(statbuf[0].st_dev) != minor(statbuf[1].st_dev) ||
+ statbuf[0].st_ino != statbuf[1].st_ino) {
+ *res = false;
+ } else {
+ *res = true;
+ }
+ return Status::OK();
+ }
+
virtual Status LockFile(const std::string& fname, FileLock** lock) override {
*lock = nullptr;
Status result;
+
+ mutex_lockedFiles.Lock();
+ // If it already exists in the lockedFiles set, then it is already locked,
+ // and fail this lock attempt. Otherwise, insert it into lockedFiles.
+ // This check is needed because fcntl() does not detect lock conflict
+ // if the fcntl is issued by the same thread that earlier acquired
+ // this lock.
+ // We must do this check *before* opening the file:
+ // Otherwise, we will open a new file descriptor. Locks are associated with
+ // a process, not a file descriptor and when *any* file descriptor is closed,
+ // all locks the process holds for that *file* are released
+ if (lockedFiles.insert(fname).second == false) {
+ mutex_lockedFiles.Unlock();
+ errno = ENOLCK;
+ return IOError("lock ", fname, errno);
+ }
+
int fd;
+ int flags = cloexec_flags(O_RDWR | O_CREAT, nullptr);
+
{
IOSTATS_TIMER_GUARD(open_nanos);
- fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
+ fd = open(fname.c_str(), flags, 0644);
}
if (fd < 0) {
- result = IOError(fname, errno);
- } else if (LockOrUnlock(fname, fd, true) == -1) {
- result = IOError("lock " + fname, errno);
+ result = IOError("while open a file for lock", fname, errno);
+ } else if (LockOrUnlock(fd, true) == -1) {
+ // if there is an error in locking, then remove the pathname from lockedfiles
+ lockedFiles.erase(fname);
+ result = IOError("While lock file", fname, errno);
close(fd);
} else {
SetFD_CLOEXEC(fd, nullptr);
my_lock->filename = fname;
*lock = my_lock;
}
+
+ mutex_lockedFiles.Unlock();
return result;
}
virtual Status UnlockFile(FileLock* lock) override {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
Status result;
- if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
- result = IOError("unlock", errno);
+ mutex_lockedFiles.Lock();
+ // If we are unlocking, then verify that we had locked it earlier,
+ // it should already exist in lockedFiles. Remove it from lockedFiles.
+ if (lockedFiles.erase(my_lock->filename) != 1) {
+ errno = ENOLCK;
+ result = IOError("unlock", my_lock->filename, errno);
+ } else if (LockOrUnlock(my_lock->fd_, false) == -1) {
+ result = IOError("unlock", my_lock->filename, errno);
}
close(my_lock->fd_);
delete my_lock;
+ mutex_lockedFiles.Unlock();
return result;
}
virtual void Schedule(void (*function)(void* arg1), void* arg,
Priority pri = LOW, void* tag = nullptr,
- void (*unschedFunction)(void* arg) = 0) override;
+ void (*unschedFunction)(void* arg) = nullptr) override;
virtual int UnSchedule(void* arg, Priority pri) override;
return gettid(pthread_self());
}
+ virtual Status GetFreeSpace(const std::string& fname,
+ uint64_t* free_space) override {
+ struct statvfs sbuf;
+
+ if (statvfs(fname.c_str(), &sbuf) < 0) {
+ return IOError("While doing statvfs", fname, errno);
+ }
+
+ *free_space = ((uint64_t)sbuf.f_bsize * sbuf.f_bfree);
+ return Status::OK();
+ }
+
virtual Status NewLogger(const std::string& fname,
shared_ptr<Logger>* result) override {
FILE* f;
{
IOSTATS_TIMER_GUARD(open_nanos);
- f = fopen(fname.c_str(), "w");
+ f = fopen(fname.c_str(), "w"
+#ifdef __GLIBC_PREREQ
+#if __GLIBC_PREREQ(2, 7)
+ "e" // glibc extension to enable O_CLOEXEC
+#endif
+#endif
+ );
}
if (f == nullptr) {
result->reset();
- return IOError(fname, errno);
+ return IOError("when fopen a file for new logger", fname, errno);
} else {
int fd = fileno(f);
#ifdef ROCKSDB_FALLOCATE_PRESENT
if (errno == EFAULT || errno == EINVAL)
return Status::InvalidArgument(strerror(errno));
else
- return IOError("GetHostName", errno);
+ return IOError("GetHostName", name, errno);
}
return Status::OK();
}
virtual Status GetCurrentTime(int64_t* unix_time) override {
time_t ret = time(nullptr);
if (ret == (time_t) -1) {
- return IOError("GetCurrentTime", errno);
+ return IOError("GetCurrentTime", "", errno);
}
*unix_time = (int64_t) ret;
return Status::OK();
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) override {
- if (db_path.find('/') == 0) {
+ if (!db_path.empty() && db_path[0] == '/') {
*output_path = db_path;
return Status::OK();
}
// Allow increasing the number of worker threads.
virtual void SetBackgroundThreads(int num, Priority pri) override {
- assert(pri >= Priority::LOW && pri <= Priority::HIGH);
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].SetBackgroundThreads(num);
}
+ virtual int GetBackgroundThreads(Priority pri) override {
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
+ return thread_pools_[pri].GetBackgroundThreads();
+ }
+
+ virtual Status SetAllowNonOwnerAccess(bool allow_non_owner_access) override {
+ allow_non_owner_access_ = allow_non_owner_access;
+ return Status::OK();
+ }
+
// Allow increasing the number of worker threads.
virtual void IncBackgroundThreadsIfNeeded(int num, Priority pri) override {
- assert(pri >= Priority::LOW && pri <= Priority::HIGH);
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].IncBackgroundThreadsIfNeeded(num);
}
virtual void LowerThreadPoolIOPriority(Priority pool = LOW) override {
- assert(pool >= Priority::LOW && pool <= Priority::HIGH);
+ assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
#ifdef OS_LINUX
thread_pools_[pool].LowerIOPriority();
+#else
+ (void)pool;
+#endif
+ }
+
+ virtual void LowerThreadPoolCPUPriority(Priority pool = LOW) override {
+ assert(pool >= Priority::BOTTOM && pool <= Priority::HIGH);
+#ifdef OS_LINUX
+ thread_pools_[pool].LowerCPUPriority();
+#else
+ (void)pool;
#endif
}
// breaks TransactionLogIteratorStallAtLastRecord unit test. Fix the unit
// test and make this false
optimized.fallocate_with_keep_size = true;
+ optimized.writable_file_max_buffer_size =
+ db_options.writable_file_max_buffer_size;
return optimized;
}
return false;
}
#else
+ (void)path;
return false;
#endif
}
std::vector<ThreadPoolImpl> thread_pools_;
pthread_mutex_t mu_;
std::vector<pthread_t> threads_to_join_;
+ // If true, allow non owner read access for db files. Otherwise, non-owner
+ // has no access to db files.
+ bool allow_non_owner_access_;
};
PosixEnv::PosixEnv()
: checkedDiskForMmap_(false),
forceMmapOff_(false),
page_size_(getpagesize()),
- thread_pools_(Priority::TOTAL) {
+ thread_pools_(Priority::TOTAL),
+ allow_non_owner_access_(true) {
ThreadPoolImpl::PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr));
for (int pool_id = 0; pool_id < Env::Priority::TOTAL; ++pool_id) {
thread_pools_[pool_id].SetThreadPriority(
void PosixEnv::Schedule(void (*function)(void* arg1), void* arg, Priority pri,
void* tag, void (*unschedFunction)(void* arg)) {
- assert(pri >= Priority::LOW && pri <= Priority::HIGH);
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
thread_pools_[pri].Schedule(function, arg, tag, unschedFunction);
}
}
unsigned int PosixEnv::GetThreadPoolQueueLen(Priority pri) const {
- assert(pri >= Priority::LOW && pri <= Priority::HIGH);
+ assert(pri >= Priority::BOTTOM && pri <= Priority::HIGH);
return thread_pools_[pri].GetQueueLen();
}
// the destructor of static PosixEnv will go first, then the
// the singletons of ThreadLocalPtr.
ThreadLocalPtr::InitSingletons();
+ CompressionContextCache::InitSingleton();
+ INIT_SYNC_POINT_SINGLETONS();
static PosixEnv default_env;
return &default_env;
}