// We scan every table to compute
// (1) smallest/largest for the table
// (2) largest sequence number in the table
+// (3) oldest blob file referred to by the table (if applicable)
//
// If we are unable to scan the file, then we ignore the table.
//
#ifndef ROCKSDB_LITE
-#ifndef __STDC_FORMAT_MACROS
-#define __STDC_FORMAT_MACROS
-#endif
-
-#include <inttypes.h>
+#include <cinttypes>
#include "db/builder.h"
-#include "db/db_impl.h"
+#include "db/db_impl/db_impl.h"
#include "db/dbformat.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
#include "db/write_batch_internal.h"
+#include "env/composite_env_wrapper.h"
+#include "file/filename.h"
+#include "file/writable_file_writer.h"
#include "options/cf_options.h"
#include "rocksdb/comparator.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/scoped_arena_iterator.h"
-#include "util/file_reader_writer.h"
-#include "util/filename.h"
#include "util/string_util.h"
-namespace rocksdb {
+namespace ROCKSDB_NAMESPACE {
namespace {
// once.
NewLRUCache(10, db_options_.table_cache_numshardbits)),
table_cache_(new TableCache(default_cf_iopts_, env_options_,
- raw_table_cache_.get())),
+ raw_table_cache_.get(),
+ /*block_cache_tracer=*/nullptr)),
wb_(db_options_.db_write_buffer_size),
wc_(db_options_.delayed_write_rate),
vset_(dbname_, &immutable_db_options_, env_options_,
- raw_table_cache_.get(), &wb_, &wc_),
+ raw_table_cache_.get(), &wb_, &wc_,
+ /*block_cache_tracer=*/nullptr),
next_file_number_(1),
db_lock_(nullptr) {
for (const auto& cfd : column_families) {
FileMetaData meta;
uint32_t column_family_id;
std::string column_family_name;
- SequenceNumber min_sequence;
- SequenceNumber max_sequence;
};
std::string const dbname_;
if (!status.ok()) {
return status;
}
- std::unique_ptr<SequentialFileReader> lfile_reader(
- new SequentialFileReader(std::move(lfile), logname));
+ std::unique_ptr<SequentialFileReader> lfile_reader(new SequentialFileReader(
+ NewLegacySequentialFileWrapper(lfile), logname));
// Create the log reader.
LogReporter reporter;
continue;
}
WriteBatchInternal::SetContents(&batch, record);
- status = WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr);
+ status =
+ WriteBatchInternal::InsertInto(&batch, cf_mems, nullptr, nullptr);
if (status.ok()) {
counter += WriteBatchInternal::Count(&batch);
} else {
if (range_del_iter != nullptr) {
range_del_iters.emplace_back(range_del_iter);
}
+
+ LegacyFileSystemWrapper fs(env_);
status = BuildTable(
- dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
- env_options_, table_cache_, iter.get(), std::move(range_del_iters),
- &meta, cfd->internal_comparator(),
- cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),
- {}, kMaxSequenceNumber, snapshot_checker, kNoCompression,
- 0 /* sample_for_compression */, CompressionOptions(), false,
- nullptr /* internal_stats */, TableFileCreationReason::kRecovery,
- nullptr /* event_logger */, 0 /* job_id */, Env::IO_HIGH,
- nullptr /* table_properties */, -1 /* level */, current_time,
- write_hint);
+ dbname_, env_, &fs, *cfd->ioptions(),
+ *cfd->GetLatestMutableCFOptions(), env_options_, table_cache_,
+ iter.get(), std::move(range_del_iters), &meta,
+ cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(),
+ cfd->GetID(), cfd->GetName(), {}, kMaxSequenceNumber,
+ snapshot_checker, kNoCompression, 0 /* sample_for_compression */,
+ CompressionOptions(), false, nullptr /* internal_stats */,
+ TableFileCreationReason::kRecovery, nullptr /* event_logger */,
+ 0 /* job_id */, Env::IO_HIGH, nullptr /* table_properties */,
+ -1 /* level */, current_time, write_hint);
ROCKS_LOG_INFO(db_options_.info_log,
"Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s",
log, counter, meta.fd.GetNumber(),
status =
AddColumnFamily(props->column_family_name, t->column_family_id);
}
+ t->meta.oldest_ancester_time = props->creation_time;
}
ColumnFamilyData* cfd = nullptr;
if (status.ok()) {
}
}
if (status.ok()) {
+ ReadOptions ropts;
+ ropts.total_order_seek = true;
InternalIterator* iter = table_cache_->NewIterator(
- ReadOptions(), env_options_, cfd->internal_comparator(), t->meta,
+ ropts, env_options_, cfd->internal_comparator(), t->meta,
nullptr /* range_del_agg */,
- cfd->GetLatestMutableCFOptions()->prefix_extractor.get());
- bool empty = true;
+ cfd->GetLatestMutableCFOptions()->prefix_extractor.get(),
+ /*table_reader_ptr=*/nullptr, /*file_read_hist=*/nullptr,
+ TableReaderCaller::kRepair, /*arena=*/nullptr, /*skip_filters=*/false,
+ /*level=*/-1, /*smallest_compaction_key=*/nullptr,
+ /*largest_compaction_key=*/nullptr);
ParsedInternalKey parsed;
- t->min_sequence = 0;
- t->max_sequence = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
Slice key = iter->key();
if (!ParseInternalKey(key, &parsed)) {
}
counter++;
- if (empty) {
- empty = false;
- t->meta.smallest.DecodeFrom(key);
- t->min_sequence = parsed.sequence;
- }
- t->meta.largest.DecodeFrom(key);
- if (parsed.sequence < t->min_sequence) {
- t->min_sequence = parsed.sequence;
- }
- if (parsed.sequence > t->max_sequence) {
- t->max_sequence = parsed.sequence;
- }
+
+ t->meta.UpdateBoundaries(key, iter->value(), parsed.sequence,
+ parsed.type);
}
if (!iter->status().ok()) {
status = iter->status();
SequenceNumber max_sequence = 0;
for (size_t i = 0; i < tables_.size(); i++) {
cf_id_to_tables[tables_[i].column_family_id].push_back(&tables_[i]);
- if (max_sequence < tables_[i].max_sequence) {
- max_sequence = tables_[i].max_sequence;
+ if (max_sequence < tables_[i].meta.fd.largest_seqno) {
+ max_sequence = tables_[i].meta.fd.largest_seqno;
}
}
vset_.SetLastAllocatedSequence(max_sequence);
// TODO(opt): separate out into multiple levels
for (const auto* table : cf_id_and_tables.second) {
- edit.AddFile(0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
- table->meta.fd.GetFileSize(), table->meta.smallest,
- table->meta.largest, table->min_sequence,
- table->max_sequence, table->meta.marked_for_compaction);
+ edit.AddFile(
+ 0, table->meta.fd.GetNumber(), table->meta.fd.GetPathId(),
+ table->meta.fd.GetFileSize(), table->meta.smallest,
+ table->meta.largest, table->meta.fd.smallest_seqno,
+ table->meta.fd.largest_seqno, table->meta.marked_for_compaction,
+ table->meta.oldest_blob_file_number,
+ table->meta.oldest_ancester_time, table->meta.file_creation_time,
+ table->meta.file_checksum, table->meta.file_checksum_func_name);
}
assert(next_file_number_ > 0);
vset_.MarkFileNumberUsed(next_file_number_ - 1);
}
Status RepairDB(const std::string& dbname, const Options& options) {
- DBOptions db_options(options);
- ColumnFamilyOptions cf_options(options);
+ Options opts(options);
+ if (opts.file_system == nullptr) {
+ opts.file_system.reset(new LegacyFileSystemWrapper(opts.env));
+ ;
+ }
+
+ DBOptions db_options(opts);
+ ColumnFamilyOptions cf_options(opts);
Repairer repairer(dbname, db_options,
{}, cf_options /* default_cf_opts */,
cf_options /* unknown_cf_opts */,
return repairer.Run();
}
-} // namespace rocksdb
+} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE