1 // Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
2 // This source code is licensed under both the GPLv2 (found in the
3 // COPYING file in the root directory) and Apache 2.0 License
4 // (found in the LICENSE.Apache file in the root directory).
7 #include "db/compacted_db_impl.h"
9 #include "db/db_impl/db_impl.h"
10 #include "db/version_set.h"
11 #include "table/get_context.h"
12 #include "util/cast_util.h"
14 namespace ROCKSDB_NAMESPACE
{
16 extern void MarkKeyMayExist(void* arg
);
17 extern bool SaveValue(void* arg
, const ParsedInternalKey
& parsed_key
,
18 const Slice
& v
, bool hit_and_return
);
20 CompactedDBImpl::CompactedDBImpl(
21 const DBOptions
& options
, const std::string
& dbname
)
22 : DBImpl(options
, dbname
), cfd_(nullptr), version_(nullptr),
23 user_comparator_(nullptr) {
26 CompactedDBImpl::~CompactedDBImpl() {
29 size_t CompactedDBImpl::FindFile(const Slice
& key
) {
30 size_t right
= files_
.num_files
- 1;
31 auto cmp
= [&](const FdWithKeyRange
& f
, const Slice
& k
) -> bool {
32 return user_comparator_
->Compare(ExtractUserKey(f
.largest_key
), k
) < 0;
34 return static_cast<size_t>(std::lower_bound(files_
.files
,
35 files_
.files
+ right
, key
, cmp
) - files_
.files
);
38 Status
CompactedDBImpl::Get(const ReadOptions
& options
, ColumnFamilyHandle
*,
39 const Slice
& key
, PinnableSlice
* value
) {
40 GetContext
get_context(user_comparator_
, nullptr, nullptr, nullptr,
41 GetContext::kNotFound
, key
, value
, nullptr, nullptr,
42 nullptr, true, nullptr, nullptr);
43 LookupKey
lkey(key
, kMaxSequenceNumber
);
44 Status s
= files_
.files
[FindFile(key
)].fd
.table_reader
->Get(
45 options
, lkey
.internal_key(), &get_context
, nullptr);
46 if (!s
.ok() && !s
.IsNotFound()) {
49 if (get_context
.State() == GetContext::kFound
) {
52 return Status::NotFound();
55 std::vector
<Status
> CompactedDBImpl::MultiGet(const ReadOptions
& options
,
56 const std::vector
<ColumnFamilyHandle
*>&,
57 const std::vector
<Slice
>& keys
, std::vector
<std::string
>* values
) {
58 autovector
<TableReader
*, 16> reader_list
;
59 for (const auto& key
: keys
) {
60 const FdWithKeyRange
& f
= files_
.files
[FindFile(key
)];
61 if (user_comparator_
->Compare(key
, ExtractUserKey(f
.smallest_key
)) < 0) {
62 reader_list
.push_back(nullptr);
64 LookupKey
lkey(key
, kMaxSequenceNumber
);
65 f
.fd
.table_reader
->Prepare(lkey
.internal_key());
66 reader_list
.push_back(f
.fd
.table_reader
);
69 std::vector
<Status
> statuses(keys
.size(), Status::NotFound());
70 values
->resize(keys
.size());
72 for (auto* r
: reader_list
) {
74 PinnableSlice pinnable_val
;
75 std::string
& value
= (*values
)[idx
];
76 GetContext
get_context(user_comparator_
, nullptr, nullptr, nullptr,
77 GetContext::kNotFound
, keys
[idx
], &pinnable_val
,
78 nullptr, nullptr, nullptr, true, nullptr, nullptr);
79 LookupKey
lkey(keys
[idx
], kMaxSequenceNumber
);
80 Status s
= r
->Get(options
, lkey
.internal_key(), &get_context
, nullptr);
81 if (!s
.ok() && !s
.IsNotFound()) {
84 value
.assign(pinnable_val
.data(), pinnable_val
.size());
85 if (get_context
.State() == GetContext::kFound
) {
86 statuses
[idx
] = Status::OK();
95 Status
CompactedDBImpl::Init(const Options
& options
) {
96 SuperVersionContext
sv_context(/* create_superversion */ true);
98 ColumnFamilyDescriptor
cf(kDefaultColumnFamilyName
,
99 ColumnFamilyOptions(options
));
100 Status s
= Recover({cf
}, true /* read only */, false, true);
102 cfd_
= static_cast_with_check
<ColumnFamilyHandleImpl
>(DefaultColumnFamily())
104 cfd_
->InstallSuperVersion(&sv_context
, &mutex_
);
111 NewThreadStatusCfInfo(cfd_
);
112 version_
= cfd_
->GetSuperVersion()->current
;
113 user_comparator_
= cfd_
->user_comparator();
114 auto* vstorage
= version_
->storage_info();
115 if (vstorage
->num_non_empty_levels() == 0) {
116 return Status::NotSupported("no file exists");
118 const LevelFilesBrief
& l0
= vstorage
->LevelFilesBrief(0);
119 // L0 should not have files
120 if (l0
.num_files
> 1) {
121 return Status::NotSupported("L0 contain more than 1 file");
123 if (l0
.num_files
== 1) {
124 if (vstorage
->num_non_empty_levels() > 1) {
125 return Status::NotSupported("Both L0 and other level contain files");
131 for (int i
= 1; i
< vstorage
->num_non_empty_levels() - 1; ++i
) {
132 if (vstorage
->LevelFilesBrief(i
).num_files
> 0) {
133 return Status::NotSupported("Other levels also contain files");
137 int level
= vstorage
->num_non_empty_levels() - 1;
138 if (vstorage
->LevelFilesBrief(level
).num_files
> 0) {
139 files_
= vstorage
->LevelFilesBrief(level
);
142 return Status::NotSupported("no file exists");
145 Status
CompactedDBImpl::Open(const Options
& options
,
146 const std::string
& dbname
, DB
** dbptr
) {
149 if (options
.max_open_files
!= -1) {
150 return Status::InvalidArgument("require max_open_files = -1");
152 if (options
.merge_operator
.get() != nullptr) {
153 return Status::InvalidArgument("merge operator is not supported");
155 DBOptions
db_options(options
);
156 std::unique_ptr
<CompactedDBImpl
> db(new CompactedDBImpl(db_options
, dbname
));
157 Status s
= db
->Init(options
);
159 db
->StartPeriodicWorkScheduler();
160 ROCKS_LOG_INFO(db
->immutable_db_options_
.info_log
,
161 "Opened the db as fully compacted mode");
162 LogFlush(db
->immutable_db_options_
.info_log
);
163 *dbptr
= db
.release();
168 } // namespace ROCKSDB_NAMESPACE
169 #endif // ROCKSDB_LITE