#include "db/blob/blob_log_format.h"
#include "file/writable_file_writer.h"
#include "monitoring/statistics.h"
-#include "rocksdb/env.h"
+#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
#include "util/coding.h"
#include "util/stop_watch.h"
namespace ROCKSDB_NAMESPACE {
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
- Env* env, Statistics* statistics,
- uint64_t log_number, bool use_fs, uint64_t boffset)
+ SystemClock* clock, Statistics* statistics,
+ uint64_t log_number, bool use_fs, bool do_flush,
+ uint64_t boffset)
: dest_(std::move(dest)),
- env_(env),
+ clock_(clock),
statistics_(statistics),
log_number_(log_number),
block_offset_(boffset),
use_fsync_(use_fs),
+ do_flush_(do_flush),
last_elem_type_(kEtNone) {}
BlobLogWriter::~BlobLogWriter() = default;
Status BlobLogWriter::Sync() {
TEST_SYNC_POINT("BlobLogWriter::Sync");
- StopWatch sync_sw(env_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
+ StopWatch sync_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
Status s = dest_->Sync(use_fsync_);
RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
return s;
Status s = dest_->Append(Slice(str));
if (s.ok()) {
block_offset_ += str.size();
- s = dest_->Flush();
+ if (do_flush_) {
+ s = dest_->Flush();
+ }
}
last_elem_type_ = kEtFileHdr;
RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
std::string str;
footer.EncodeTo(&str);
- Status s = dest_->Append(Slice(str));
- if (s.ok()) {
- block_offset_ += str.size();
-
- s = Sync();
-
+ Status s;
+ if (dest_->seen_error()) {
+ s.PermitUncheckedError();
+ return Status::IOError("Seen Error. Skip closing.");
+ } else {
+ s = dest_->Append(Slice(str));
if (s.ok()) {
- s = dest_->Close();
+ block_offset_ += str.size();
+
+ s = Sync();
if (s.ok()) {
- assert(!!checksum_method == !!checksum_value);
+ s = dest_->Close();
+
+ if (s.ok()) {
+ assert(!!checksum_method == !!checksum_value);
- if (checksum_method) {
- assert(checksum_method->empty());
+ if (checksum_method) {
+ assert(checksum_method->empty());
- std::string method = dest_->GetFileChecksumFuncName();
- if (method != kUnknownFileChecksumFuncName) {
- *checksum_method = std::move(method);
+ std::string method = dest_->GetFileChecksumFuncName();
+ if (method != kUnknownFileChecksumFuncName) {
+ *checksum_method = std::move(method);
+ }
}
- }
- if (checksum_value) {
- assert(checksum_value->empty());
+ if (checksum_value) {
+ assert(checksum_value->empty());
- std::string value = dest_->GetFileChecksum();
- if (value != kUnknownFileChecksum) {
- *checksum_value = std::move(value);
+ std::string value = dest_->GetFileChecksum();
+ if (value != kUnknownFileChecksum) {
+ *checksum_value = std::move(value);
+ }
}
}
}
const Slice& key, const Slice& val,
uint64_t* key_offset,
uint64_t* blob_offset) {
- StopWatch write_sw(env_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
+ StopWatch write_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_WRITE_MICROS);
Status s = dest_->Append(Slice(headerbuf));
if (s.ok()) {
s = dest_->Append(key);
if (s.ok()) {
s = dest_->Append(val);
}
- if (s.ok()) {
+ if (do_flush_ && s.ok()) {
s = dest_->Flush();
}