]> git.proxmox.com Git - ceph.git/blame - ceph/src/rocksdb/db/compacted_db_impl.cc
buildsys: change download over to reef release
[ceph.git] / ceph / src / rocksdb / db / compacted_db_impl.cc
CommitLineData
7c673cae 1// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
11fdf7f2
TL
2// This source code is licensed under both the GPLv2 (found in the
3// COPYING file in the root directory) and Apache 2.0 License
4// (found in the LICENSE.Apache file in the root directory).
7c673cae
FG
5
6#ifndef ROCKSDB_LITE
7#include "db/compacted_db_impl.h"
20effc67 8
f67539c2 9#include "db/db_impl/db_impl.h"
7c673cae
FG
10#include "db/version_set.h"
11#include "table/get_context.h"
20effc67 12#include "util/cast_util.h"
7c673cae 13
f67539c2 14namespace ROCKSDB_NAMESPACE {
7c673cae
FG
15
16extern void MarkKeyMayExist(void* arg);
17extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
18 const Slice& v, bool hit_and_return);
19
20CompactedDBImpl::CompactedDBImpl(
21 const DBOptions& options, const std::string& dbname)
11fdf7f2
TL
22 : DBImpl(options, dbname), cfd_(nullptr), version_(nullptr),
23 user_comparator_(nullptr) {
7c673cae
FG
24}
25
26CompactedDBImpl::~CompactedDBImpl() {
27}
28
29size_t CompactedDBImpl::FindFile(const Slice& key) {
7c673cae 30 size_t right = files_.num_files - 1;
11fdf7f2
TL
31 auto cmp = [&](const FdWithKeyRange& f, const Slice& k) -> bool {
32 return user_comparator_->Compare(ExtractUserKey(f.largest_key), k) < 0;
33 };
34 return static_cast<size_t>(std::lower_bound(files_.files,
35 files_.files + right, key, cmp) - files_.files);
7c673cae
FG
36}
37
38Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
39 const Slice& key, PinnableSlice* value) {
40 GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
41 GetContext::kNotFound, key, value, nullptr, nullptr,
20effc67 42 nullptr, true, nullptr, nullptr);
7c673cae 43 LookupKey lkey(key, kMaxSequenceNumber);
20effc67
TL
44 Status s = files_.files[FindFile(key)].fd.table_reader->Get(
45 options, lkey.internal_key(), &get_context, nullptr);
46 if (!s.ok() && !s.IsNotFound()) {
47 return s;
48 }
7c673cae
FG
49 if (get_context.State() == GetContext::kFound) {
50 return Status::OK();
51 }
52 return Status::NotFound();
53}
54
55std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
56 const std::vector<ColumnFamilyHandle*>&,
57 const std::vector<Slice>& keys, std::vector<std::string>* values) {
58 autovector<TableReader*, 16> reader_list;
59 for (const auto& key : keys) {
60 const FdWithKeyRange& f = files_.files[FindFile(key)];
61 if (user_comparator_->Compare(key, ExtractUserKey(f.smallest_key)) < 0) {
62 reader_list.push_back(nullptr);
63 } else {
64 LookupKey lkey(key, kMaxSequenceNumber);
65 f.fd.table_reader->Prepare(lkey.internal_key());
66 reader_list.push_back(f.fd.table_reader);
67 }
68 }
69 std::vector<Status> statuses(keys.size(), Status::NotFound());
70 values->resize(keys.size());
71 int idx = 0;
72 for (auto* r : reader_list) {
73 if (r != nullptr) {
74 PinnableSlice pinnable_val;
75 std::string& value = (*values)[idx];
76 GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
77 GetContext::kNotFound, keys[idx], &pinnable_val,
20effc67 78 nullptr, nullptr, nullptr, true, nullptr, nullptr);
7c673cae 79 LookupKey lkey(keys[idx], kMaxSequenceNumber);
20effc67
TL
80 Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
81 if (!s.ok() && !s.IsNotFound()) {
82 statuses[idx] = s;
83 } else {
84 value.assign(pinnable_val.data(), pinnable_val.size());
85 if (get_context.State() == GetContext::kFound) {
86 statuses[idx] = Status::OK();
87 }
7c673cae
FG
88 }
89 }
90 ++idx;
91 }
92 return statuses;
93}
94
95Status CompactedDBImpl::Init(const Options& options) {
11fdf7f2 96 SuperVersionContext sv_context(/* create_superversion */ true);
7c673cae
FG
97 mutex_.Lock();
98 ColumnFamilyDescriptor cf(kDefaultColumnFamilyName,
99 ColumnFamilyOptions(options));
100 Status s = Recover({cf}, true /* read only */, false, true);
101 if (s.ok()) {
20effc67
TL
102 cfd_ = static_cast_with_check<ColumnFamilyHandleImpl>(DefaultColumnFamily())
103 ->cfd();
11fdf7f2 104 cfd_->InstallSuperVersion(&sv_context, &mutex_);
7c673cae
FG
105 }
106 mutex_.Unlock();
11fdf7f2 107 sv_context.Clean();
7c673cae
FG
108 if (!s.ok()) {
109 return s;
110 }
111 NewThreadStatusCfInfo(cfd_);
112 version_ = cfd_->GetSuperVersion()->current;
113 user_comparator_ = cfd_->user_comparator();
114 auto* vstorage = version_->storage_info();
115 if (vstorage->num_non_empty_levels() == 0) {
116 return Status::NotSupported("no file exists");
117 }
118 const LevelFilesBrief& l0 = vstorage->LevelFilesBrief(0);
119 // L0 should not have files
120 if (l0.num_files > 1) {
121 return Status::NotSupported("L0 contain more than 1 file");
122 }
123 if (l0.num_files == 1) {
124 if (vstorage->num_non_empty_levels() > 1) {
125 return Status::NotSupported("Both L0 and other level contain files");
126 }
127 files_ = l0;
128 return Status::OK();
129 }
130
131 for (int i = 1; i < vstorage->num_non_empty_levels() - 1; ++i) {
132 if (vstorage->LevelFilesBrief(i).num_files > 0) {
133 return Status::NotSupported("Other levels also contain files");
134 }
135 }
136
137 int level = vstorage->num_non_empty_levels() - 1;
138 if (vstorage->LevelFilesBrief(level).num_files > 0) {
139 files_ = vstorage->LevelFilesBrief(level);
140 return Status::OK();
141 }
142 return Status::NotSupported("no file exists");
143}
144
145Status CompactedDBImpl::Open(const Options& options,
146 const std::string& dbname, DB** dbptr) {
147 *dbptr = nullptr;
148
149 if (options.max_open_files != -1) {
150 return Status::InvalidArgument("require max_open_files = -1");
151 }
152 if (options.merge_operator.get() != nullptr) {
153 return Status::InvalidArgument("merge operator is not supported");
154 }
155 DBOptions db_options(options);
156 std::unique_ptr<CompactedDBImpl> db(new CompactedDBImpl(db_options, dbname));
157 Status s = db->Init(options);
158 if (s.ok()) {
20effc67 159 db->StartPeriodicWorkScheduler();
7c673cae
FG
160 ROCKS_LOG_INFO(db->immutable_db_options_.info_log,
161 "Opened the db as fully compacted mode");
162 LogFlush(db->immutable_db_options_.info_log);
163 *dbptr = db.release();
164 }
165 return s;
166}
167
f67539c2 168} // namespace ROCKSDB_NAMESPACE
7c673cae 169#endif // ROCKSDB_LITE