#pragma once
#include <time.h>
+
#include <atomic>
#include <memory>
-#include "port/sys_time.h"
#include "file/writable_file_writer.h"
#include "monitoring/iostats_context_imp.h"
+#include "port/sys_time.h"
#include "rocksdb/env.h"
+#include "rocksdb/file_system.h"
+#include "rocksdb/perf_level.h"
#include "rocksdb/slice.h"
#include "test_util/sync_point.h"
#include "util/mutexlock.h"
const std::string& fname, const EnvOptions& options, Env* env,
InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
: Logger(log_level),
- file_(std::move(writable_file), fname, options, env),
- last_flush_micros_(0),
env_(env),
+ clock_(env_->GetSystemClock().get()),
+ file_(std::move(writable_file), fname, options, clock_),
+ last_flush_micros_(0),
flush_pending_(false) {}
~EnvLogger() {
if (!closed_) {
closed_ = true;
- CloseHelper();
+ CloseHelper().PermitUncheckedError();
}
}
private:
+ // A guard to prepare file operations, such as mutex and skip
+ // I/O context.
+ class FileOpGuard {
+ public:
+ explicit FileOpGuard(EnvLogger& logger)
+ : logger_(logger), prev_perf_level_(GetPerfLevel()) {
+ // Preserve iostats not to pollute writes from user writes. We might
+ // need a better solution than this.
+ SetPerfLevel(PerfLevel::kDisable);
+ IOSTATS_SET_DISABLE(true);
+ logger.mutex_.Lock();
+ }
+ ~FileOpGuard() {
+ logger_.mutex_.Unlock();
+ IOSTATS_SET_DISABLE(false);
+ SetPerfLevel(prev_perf_level_);
+ }
+
+ private:
+ EnvLogger& logger_;
+ PerfLevel prev_perf_level_;
+ };
+
void FlushLocked() {
mutex_.AssertHeld();
if (flush_pending_) {
flush_pending_ = false;
- file_.Flush();
+ file_.Flush().PermitUncheckedError();
}
- last_flush_micros_ = env_->NowMicros();
+ last_flush_micros_ = clock_->NowMicros();
}
void Flush() override {
TEST_SYNC_POINT("EnvLogger::Flush:Begin1");
TEST_SYNC_POINT("EnvLogger::Flush:Begin2");
- MutexLock l(&mutex_);
+ FileOpGuard guard(*this);
FlushLocked();
}
Status CloseImpl() override { return CloseHelper(); }
Status CloseHelper() {
- mutex_.Lock();
+ FileOpGuard guard(*this);
const auto close_status = file_.Close();
- mutex_.Unlock();
if (close_status.ok()) {
return close_status;
char* p = base;
char* limit = base + bufsize;
- struct timeval now_tv;
- gettimeofday(&now_tv, nullptr);
+ port::TimeVal now_tv;
+ port::GetTimeOfDay(&now_tv, nullptr);
const time_t seconds = now_tv.tv_sec;
struct tm t;
- localtime_r(&seconds, &t);
- p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
+ port::LocalTimeR(&seconds, &t);
+ p += snprintf(p, limit - p, "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llu ",
t.tm_year + 1900, t.tm_mon + 1, t.tm_mday, t.tm_hour,
t.tm_min, t.tm_sec, static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id));
}
assert(p <= limit);
- mutex_.Lock();
- // We will ignore any error returned by Append().
- file_.Append(Slice(base, p - base));
- flush_pending_ = true;
- const uint64_t now_micros = env_->NowMicros();
- if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
- FlushLocked();
+ {
+ FileOpGuard guard(*this);
+ // We will ignore any error returned by Append().
+ file_.Append(Slice(base, p - base)).PermitUncheckedError();
+ flush_pending_ = true;
+ const uint64_t now_micros = clock_->NowMicros();
+ if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
+ FlushLocked();
+ }
}
- mutex_.Unlock();
if (base != buffer) {
delete[] base;
}
}
private:
+ Env* env_;
+ SystemClock* clock_;
WritableFileWriter file_;
mutable port::Mutex mutex_; // Mutex to protect the shared variables below.
const static uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
- Env* env_;
std::atomic<bool> flush_pending_;
};