#include "env/composite_env_wrapper.h"
#include "logging/logging.h"
#include "port/port.h"
+#include "rocksdb/system_clock.h"
#include "util/crc32c.h"
namespace ROCKSDB_NAMESPACE {
return s;
}
-Status NewRandomAccessCacheFile(Env* const env, const std::string& filepath,
- std::unique_ptr<RandomAccessFile>* file,
+Status NewRandomAccessCacheFile(const std::shared_ptr<FileSystem>& fs,
+ const std::string& filepath,
+ std::unique_ptr<FSRandomAccessFile>* file,
const bool use_direct_reads = true) {
- assert(env);
+ assert(fs.get());
- EnvOptions opt;
+ FileOptions opt;
opt.use_direct_reads = use_direct_reads;
- Status s = env->NewRandomAccessFile(filepath, file, opt);
- return s;
+ return fs->NewRandomAccessFile(filepath, opt, file, nullptr);
}
//
// <-- 4 --><-- 4 --><-- 4 --><-- 4 --><-- key size --><-- v-size -->
//
struct CacheRecordHeader {
- CacheRecordHeader()
- : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
+ CacheRecordHeader() : magic_(0), crc_(0), key_size_(0), val_size_(0) {}
CacheRecordHeader(const uint32_t magic, const uint32_t key_size,
const uint32_t val_size)
: magic_(magic), crc_(0), key_size_(key_size), val_size_(val_size) {}
rwlock_.AssertHeld();
ROCKS_LOG_DEBUG(log_, "Opening cache file %s", Path().c_str());
+ assert(env_);
- std::unique_ptr<RandomAccessFile> file;
- Status status =
- NewRandomAccessCacheFile(env_, Path(), &file, enable_direct_reads);
+ std::unique_ptr<FSRandomAccessFile> file;
+ Status status = NewRandomAccessCacheFile(env_->GetFileSystem(), Path(), &file,
+ enable_direct_reads);
if (!status.ok()) {
Error(log_, "Error opening random access file %s. %s", Path().c_str(),
status.ToString().c_str());
return false;
}
- freader_.reset(new RandomAccessFileReader(
- NewLegacyRandomAccessFileWrapper(file), Path(), env_));
+ freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
+ env_->GetSystemClock().get()));
return true;
}
Slice result;
Status s = freader_->Read(IOOptions(), lba.off_, lba.size_, &result, scratch,
- nullptr);
+ nullptr, Env::IO_TOTAL /* rate_limiter_priority */);
if (!s.ok()) {
Error(log_, "Error reading from file %s. %s", Path().c_str(),
s.ToString().c_str());
// We can fail to reserve space if every file in the system
// is being currently accessed
/* sleep override */
- Env::Default()->SleepForMicroseconds(1000000);
+ SystemClock::Default()->SleepForMicroseconds(1000000);
}
DispatchIO(io);