1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
6 #include "utilities/date_tiered/date_tiered_db_impl.h"
10 #include "db/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/write_batch_internal.h"
13 #include "monitoring/instrumented_mutex.h"
14 #include "options/options_helper.h"
15 #include "rocksdb/convenience.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/iterator.h"
18 #include "rocksdb/utilities/date_tiered_db.h"
19 #include "table/merging_iterator.h"
20 #include "util/coding.h"
21 #include "util/filename.h"
22 #include "util/string_util.h"
26 // Open the db inside DateTieredDBImpl because options needs pointer to its ttl
//
// Constructor: builds the timebound -> column family handle map
// (handle_map_) from column families that the caller has already opened.
//
//   db                      underlying DB; its Env is handed to mutex_.
//   options                 source of cf_options_, ioptions_ and the mutex
//                           statistics / adaptive-mutex settings.
//   descriptors, handles    indexed in lock-step below, so descriptors[i]
//                           is expected to describe handles[i].
//   ttl                     time-to-live; NOTE(review): its member
//                           initializer is not visible in this listing.
//   column_family_interval  stored in column_family_interval_.
//
// NOTE(review): this listing appears to be missing lines (for example the
// `try {` that pairs with the `catch` below, and the end of the catch
// branch) - confirm against the original file before relying on details.
27 DateTieredDBImpl::DateTieredDBImpl(
28 DB
* db
, Options options
,
29 const std::vector
<ColumnFamilyDescriptor
>& descriptors
,
30 const std::vector
<ColumnFamilyHandle
*>& handles
, int64_t ttl
,
31 int64_t column_family_interval
)
33 cf_options_(ColumnFamilyOptions(options
)),
34 ioptions_(ImmutableCFOptions(options
)),
36 column_family_interval_(column_family_interval
),
37 mutex_(options
.statistics
.get(), db
->GetEnv(), DB_MUTEX_WAIT_MICROS
,
38 options
.use_adaptive_mutex
) {
// Start at the smallest int64_t so that any parsed timebound replaces it.
39 latest_timebound_
= std::numeric_limits
<int64_t>::min();
40 for (size_t i
= 0; i
< handles
.size(); ++i
) {
41 const auto& name
= descriptors
[i
].name
;
42 int64_t timestamp
= 0;
// The column family name is parsed as the numeric timebound of that family.
// NOTE(review): the result of ParseUint64 is stored in a signed int64_t.
44 timestamp
= ParseUint64(name
);
45 } catch (const std::invalid_argument
&) {
46 // Bypass unrelated column family, e.g. default
47 db_
->DestroyColumnFamilyHandle(handles
[i
]);
// Track the largest timebound seen among the parsed column families.
50 if (timestamp
> latest_timebound_
) {
51 latest_timebound_
= timestamp
;
53 handle_map_
.insert(std::make_pair(timestamp
, handles
[i
]));
// Destructor: hands every column family handle stored in handle_map_ back
// to the underlying DB via DestroyColumnFamilyHandle.
// NOTE(review): the end of the destructor is not visible in this listing, so
// any further cleanup (e.g. of db_ itself) cannot be confirmed from here.
57 DateTieredDBImpl::~DateTieredDBImpl() {
58 for (auto handle
: handle_map_
) {
59 db_
->DestroyColumnFamilyHandle(handle
.second
);
// Opens (or creates) the database at `dbname` and wraps it in a newly
// allocated DateTieredDBImpl stored in *dbptr.
//
//   options                 split into DBOptions and ColumnFamilyOptions;
//                           the same ColumnFamilyOptions is used for every
//                           listed column family.
//   dbname                  path of the database.
//   dbptr                   out-parameter receiving the new
//                           DateTieredDBImpl (allocated with `new`).
//   ttl, column_family_interval
//                           forwarded unchanged to the DateTieredDBImpl
//                           constructor.
//   read_only               presumably selects DB::OpenForReadOnly over
//                           DB::Open below - the condition itself is not
//                           visible in this listing.
//
// NOTE(review): the declarations of `db` and `s`, the branch conditions and
// the return statements are missing from this listing; the control flow
// described by the inline comments should be confirmed against the original.
65 Status
DateTieredDB::Open(const Options
& options
, const std::string
& dbname
,
66 DateTieredDB
** dbptr
, int64_t ttl
,
67 int64_t column_family_interval
, bool read_only
) {
68 DBOptions
db_options(options
);
69 ColumnFamilyOptions
cf_options(options
);
70 std::vector
<ColumnFamilyDescriptor
> descriptors
;
71 std::vector
<ColumnFamilyHandle
*> handles
;
75 // Get column families
76 std::vector
<std::string
> column_family_names
;
77 s
= DB::ListColumnFamilies(db_options
, dbname
, &column_family_names
);
79 // No column family found. Use default
// This open passes no descriptors, so `descriptors` and `handles` stay empty
// on this path.
80 s
= DB::Open(options
, dbname
, &db
);
// One descriptor per existing column family, all sharing cf_options.
85 for (auto name
: column_family_names
) {
86 descriptors
.emplace_back(ColumnFamilyDescriptor(name
, cf_options
));
91 s
= DB::OpenForReadOnly(db_options
, dbname
, descriptors
, &handles
, &db
);
93 s
= DB::Open(db_options
, dbname
, descriptors
, &handles
, &db
);
98 *dbptr
= new DateTieredDBImpl(db
, options
, descriptors
, handles
, ttl
,
99 column_family_interval
);
104 // Checks whether a key timestamp is stale according to the provided TTL.
//
//   keytime  timestamp taken from the key.
//   ttl      time-to-live, in the same unit as keytime and as the value
//            written by Env::GetCurrentTime.
//   env      clock source.
//
// Returns true when the current time is at or past keytime + ttl.
// According to the inline comments, a non-positive ttl and a failure to read
// the clock are both treated as "fresh"; NOTE(review): the statements that
// implement those two cases (and the declaration of `curtime`) are not
// visible in this listing.
105 bool DateTieredDBImpl::IsStale(int64_t keytime
, int64_t ttl
, Env
* env
) {
107 // Data is fresh if TTL is non-positive
111 if (!env
->GetCurrentTime(&curtime
).ok()) {
112 // Treat the data as fresh if could not get current time
// Stale exactly when the TTL window [keytime, keytime + ttl) has elapsed.
115 return curtime
>= keytime
+ ttl
;
118 // Drop column family when all data in that column family is expired
119 // TODO(jhli): Can be made a background job
//
// Reads the current time from the DB's Env, then, while holding mutex_,
// walks handle_map_ and drops every column family whose timebound (the map
// key) is at or before `curtime - ttl_`, erasing the entry from the map.
// NOTE(review): the handling of a failed GetCurrentTime / DropColumnFamily,
// the iterator advance for entries that are kept, and the return statement
// are not visible in this listing.
120 Status
DateTieredDBImpl::DropObsoleteColumnFamilies() {
123 s
= db_
->GetEnv()->GetCurrentTime(&curtime
);
// Guards handle_map_ for the rest of the function.
128 InstrumentedMutexLock
l(&mutex_
);
129 auto iter
= handle_map_
.begin();
130 while (iter
!= handle_map_
.end()) {
// The map key is the timebound of the column family.
131 if (iter
->first
<= curtime
- ttl_
) {
132 s
= db_
->DropColumnFamily(iter
->second
);
// erase() returns the iterator following the removed entry, which keeps the
// loop iterator valid.
137 iter
= handle_map_
.erase(iter
);
146 // Get timestamp from user key
//
//   key     user key whose trailing 8 bytes hold the timestamp.
//   result  out-parameter for the decoded timestamp; NOTE(review): the
//           store into *result and the success return are not visible in
//           this listing.
//
// Returns Status::Corruption("Bad timestamp in key") when the key is shorter
// than kTSLength.
// NOTE(review): the length check uses kTSLength while the offset below uses
// the literal 8 - presumably kTSLength == 8; confirm.
147 Status
DateTieredDBImpl::GetTimestamp(const Slice
& key
, int64_t* result
) {
148 if (key
.size() < kTSLength
) {
149 return Status::Corruption("Bad timestamp in key");
// Points at the last 8 bytes of the key.
151 const char* pos
= key
.data() + key
.size() - 8;
152 int64_t timestamp
= 0;
153 if (port::kLittleEndian
) {
154 int bytes_to_fill
= 8;
// pos[0] is shifted into the most significant byte and pos[7] into the least
// significant one, i.e. the key suffix is read as a big-endian integer.
155 for (int i
= 0; i
< bytes_to_fill
; ++i
) {
156 timestamp
|= (static_cast<uint64_t>(static_cast<unsigned char>(pos
[i
]))
157 << ((bytes_to_fill
- i
- 1) << 3));
// On the other branch the 8 bytes are copied verbatim.
// NOTE(review): `×tamp` below looks like a mis-encoded `&timestamp`.
160 memcpy(×tamp
, pos
, sizeof(timestamp
));
// Creates the column family for a new time window and registers it.
//
//   column_family  out-parameter receiving the handle of the created family.
//
// The new family is named after its timebound (ToString(new_timebound)),
// latest_timebound_ is advanced to that timebound, and the handle is
// inserted into handle_map_ under it.
// This function takes no lock itself; the visible caller, FindColumnFamily,
// invokes it while holding mutex_.
// NOTE(review): error handling after GetCurrentTime / CreateColumnFamily,
// the start of the non-empty-map expression and the return statement are not
// visible in this listing. latest_timebound_ is assigned before
// db_->CreateColumnFamily is called - confirm that is intended when the
// creation fails.
166 Status
DateTieredDBImpl::CreateColumnFamily(
167 ColumnFamilyHandle
** column_family
) {
171 s
= db_
->GetEnv()->GetCurrentTime(&curtime
);
175 int64_t new_timebound
;
// First window: ends one interval after the current time.
176 if (handle_map_
.empty()) {
177 new_timebound
= curtime
+ column_family_interval_
;
// Otherwise: a whole number of intervals, enough to step past curtime
// (presumably added to latest_timebound_ - that part is not visible).
181 ((curtime
- latest_timebound_
) / column_family_interval_
+ 1) *
182 column_family_interval_
;
184 std::string cf_name
= ToString(new_timebound
);
185 latest_timebound_
= new_timebound
;
186 s
= db_
->CreateColumnFamily(cf_options_
, cf_name
, column_family
);
188 handle_map_
.insert(std::make_pair(new_timebound
, *column_family
));
// Looks up the column family (time window) that a key timestamp belongs to.
//
//   keytime            timestamp taken from the key.
//   column_family      out-parameter; reset to nullptr on entry, then set to
//                      the matching or newly created handle.
//   create_if_missing  when no existing window lies after keytime, create a
//                      new column family instead of returning NotFound.
//
// Holds mutex_ for the whole lookup, including the CreateColumnFamily call.
// NOTE(review): the final return statement is not visible in this listing.
193 Status
DateTieredDBImpl::FindColumnFamily(int64_t keytime
,
194 ColumnFamilyHandle
** column_family
,
195 bool create_if_missing
) {
196 *column_family
= nullptr;
198 InstrumentedMutexLock
l(&mutex_
);
// First entry whose timebound is strictly greater than keytime.
199 auto iter
= handle_map_
.upper_bound(keytime
);
200 if (iter
== handle_map_
.end()) {
201 if (!create_if_missing
) {
202 return Status::NotFound();
204 return CreateColumnFamily(column_family
);
207 // Use the handle found by upper_bound directly; no step back is performed
208 *column_family
= iter
->second
;
// Writes a key/value pair into the column family of the key's time window.
//
//   options  write options forwarded to Write().
//   key      user key; its timestamp is extracted with GetTimestamp.
//   (the value parameter, used below as `val`, is declared on a line that is
//   not visible in this listing)
//
// Steps: parse the timestamp, drop obsolete column families, reject stale
// keys with Status::InvalidArgument, find or create the target column
// family, then write through a WriteBatch.
// NOTE(review): the checks on `s` after each step and the declaration of
// `batch` are not visible here.
213 Status
DateTieredDBImpl::Put(const WriteOptions
& options
, const Slice
& key
,
215 int64_t timestamp
= 0;
217 s
= GetTimestamp(key
, ×tamp
);
// The Status returned by DropObsoleteColumnFamilies is not used here.
221 DropObsoleteColumnFamilies();
223 // Prune request to obsolete data
224 if (IsStale(timestamp
, ttl_
, db_
->GetEnv())) {
225 return Status::InvalidArgument();
228 // Decide column family (i.e. the time window) to put into
229 ColumnFamilyHandle
* column_family
;
230 s
= FindColumnFamily(timestamp
, &column_family
, true /*create_if_missing*/);
235 // Efficiently put with WriteBatch
237 batch
.Put(column_family
, key
, val
);
238 return Write(options
, &batch
);
// Reads the value for `key` from the column family of its time window.
//
//   options  read options forwarded to the underlying DB.
//   key      user key; its timestamp is extracted with GetTimestamp.
//   value    out-parameter filled by db_->Get.
//
// Returns Status::NotFound when the key is stale under ttl_ or when no
// column family covers its timestamp; column families are never created on
// this path (create_if_missing is false).
// NOTE(review): the checks on `s` after GetTimestamp / FindColumnFamily are
// not visible in this listing.
241 Status
DateTieredDBImpl::Get(const ReadOptions
& options
, const Slice
& key
,
242 std::string
* value
) {
243 int64_t timestamp
= 0;
245 s
= GetTimestamp(key
, ×tamp
);
249 // Prune request to obsolete data
250 if (IsStale(timestamp
, ttl_
, db_
->GetEnv())) {
251 return Status::NotFound();
254 // Decide column family to get from
255 ColumnFamilyHandle
* column_family
;
256 s
= FindColumnFamily(timestamp
, &column_family
, false /*create_if_missing*/);
260 if (column_family
== nullptr) {
261 // Cannot find column family
262 return Status::NotFound();
265 // Get value with key
266 return db_
->Get(options
, column_family
, key
, value
);
// Cheap existence probe for `key` in the column family of its time window.
//
//   options      read options forwarded to the underlying DB.
//   key          user key; its timestamp is extracted with GetTimestamp.
//   value        forwarded to db_->KeyMayExist.
//   value_found  forwarded to db_->KeyMayExist.
//
// Column families are never created on this path (create_if_missing is
// false).
// NOTE(review): the values returned when the timestamp cannot be parsed,
// when no column family is found, and when the key is stale are not visible
// in this listing.
269 bool DateTieredDBImpl::KeyMayExist(const ReadOptions
& options
, const Slice
& key
,
270 std::string
* value
, bool* value_found
) {
271 int64_t timestamp
= 0;
273 s
= GetTimestamp(key
, ×tamp
);
275 // Failure here means the timestamp could not be read from the key
278 // Decide column family to get from
279 ColumnFamilyHandle
* column_family
;
280 s
= FindColumnFamily(timestamp
, &column_family
, false /*create_if_missing*/);
281 if (!s
.ok() || column_family
== nullptr) {
282 // Cannot find column family
// Staleness is checked after the column family lookup in this function.
285 if (IsStale(timestamp
, ttl_
, db_
->GetEnv())) {
288 return db_
->KeyMayExist(options
, column_family
, key
, value
, value_found
);
// Deletes `key` from the column family of its time window.
//
//   options  write options forwarded to the underlying DB.
//   key      user key; its timestamp is extracted with GetTimestamp.
//
// Obsolete column families are dropped first. Returns Status::NotFound when
// the key is stale under ttl_ or when no column family covers its timestamp;
// column families are never created on this path.
// Unlike Put and Merge, the delete is issued directly with db_->Delete
// rather than through Write().
// NOTE(review): the checks on `s` after GetTimestamp / FindColumnFamily are
// not visible in this listing.
291 Status
DateTieredDBImpl::Delete(const WriteOptions
& options
, const Slice
& key
) {
292 int64_t timestamp
= 0;
294 s
= GetTimestamp(key
, ×tamp
);
// The Status returned by DropObsoleteColumnFamilies is not used here.
298 DropObsoleteColumnFamilies();
299 // Prune request to obsolete data
300 if (IsStale(timestamp
, ttl_
, db_
->GetEnv())) {
301 return Status::NotFound();
304 // Decide column family to delete from
305 ColumnFamilyHandle
* column_family
;
306 s
= FindColumnFamily(timestamp
, &column_family
, false /*create_if_missing*/);
310 if (column_family
== nullptr) {
311 // Cannot find column family
312 return Status::NotFound();
315 // Delete the key from that column family
316 return db_
->Delete(options
, column_family
, key
);
// Applies a merge operand to `key` in the column family of its time window.
//
//   options  write options forwarded to Write().
//   key      user key; its timestamp is extracted with GetTimestamp.
//   value    merge operand.
//
// The target column family is created when missing (create_if_missing is
// true) and the merge is submitted through a WriteBatch.
// NOTE(review): no IsStale / DropObsoleteColumnFamilies call is visible in
// this function, unlike Put - confirm whether that is intended. The checks
// on `s` and the declaration of `batch` are not visible in this listing.
319 Status
DateTieredDBImpl::Merge(const WriteOptions
& options
, const Slice
& key
,
320 const Slice
& value
) {
321 // Decide column family to merge into
322 int64_t timestamp
= 0;
324 s
= GetTimestamp(key
, ×tamp
);
326 // Failure here means the timestamp could not be read from the key
329 ColumnFamilyHandle
* column_family
;
330 s
= FindColumnFamily(timestamp
, &column_family
, true /*create_if_missing*/);
335 batch
.Merge(column_family
, key
, value
);
336 return Write(options
, &batch
);
// Replays `updates` into a fresh WriteBatch and writes that batch to the
// underlying DB.
//
//   opts     write options forwarded to db_->Write.
//   updates  batch to replay; it is iterated with a local
//            WriteBatch::Handler.
//
// The handler copies every Put / Merge / Delete (keyed by column family id)
// and every log-data blob into its own batch, updates_ttl. If
// handler.batch_rewrite_status is not OK after iteration it is returned
// instead of writing.
// NOTE(review): the return statements of the handler callbacks, the
// declaration of `handler`, and any assignment to batch_rewrite_status are
// not visible in this listing. The Status returned by updates->Iterate is
// not used in the visible code.
339 Status
DateTieredDBImpl::Write(const WriteOptions
& opts
, WriteBatch
* updates
) {
340 class Handler
: public WriteBatch::Handler
{
342 explicit Handler() {}
// Batch that receives the replayed operations.
343 WriteBatch updates_ttl
;
344 Status batch_rewrite_status
;
345 virtual Status
PutCF(uint32_t column_family_id
, const Slice
& key
,
346 const Slice
& value
) override
{
347 WriteBatchInternal::Put(&updates_ttl
, column_family_id
, key
, value
);
350 virtual Status
MergeCF(uint32_t column_family_id
, const Slice
& key
,
351 const Slice
& value
) override
{
352 WriteBatchInternal::Merge(&updates_ttl
, column_family_id
, key
, value
);
355 virtual Status
DeleteCF(uint32_t column_family_id
,
356 const Slice
& key
) override
{
357 WriteBatchInternal::Delete(&updates_ttl
, column_family_id
, key
);
360 virtual void LogData(const Slice
& blob
) override
{
361 updates_ttl
.PutLogData(blob
);
365 updates
->Iterate(&handler
);
366 if (!handler
.batch_rewrite_status
.ok()) {
367 return handler
.batch_rewrite_status
;
// Write the replayed batch, not the caller's original one.
369 return db_
->Write(opts
, &(handler
.updates_ttl
));
// Builds one iterator that merges the internal iterators of every column
// family in handle_map_.
//
//   opts  read options passed to NewArenaWrappedDbIterator.
//
// Returns NewEmptyIterator() when handle_map_ is empty. Otherwise an
// arena-wrapped DB iterator is created, one internal iterator per column
// family is allocated from its arena and added to a MergeIteratorBuilder
// ordered by cf_options_.comparator, and the merged iterator is installed
// under the DB iterator.
// NOTE(review): db_ is reinterpret_cast to DBImpl*, which is only valid if
// db_ really is a DBImpl - confirm. No lock on mutex_ is visible while
// handle_map_ is read here. The final return statement is not visible in
// this listing.
373 Iterator
* DateTieredDBImpl::NewIterator(const ReadOptions
& opts
) {
374 if (handle_map_
.empty()) {
375 return NewEmptyIterator();
378 DBImpl
* db_impl
= reinterpret_cast<DBImpl
*>(db_
);
380 auto db_iter
= NewArenaWrappedDbIterator(
381 db_impl
->GetEnv(), opts
, ioptions_
, cf_options_
.comparator
,
382 kMaxSequenceNumber
, cf_options_
.max_sequential_skip_in_iterations
, 0);
// Child iterators are allocated from the DB iterator's own arena.
384 auto arena
= db_iter
->GetArena();
385 MergeIteratorBuilder
builder(cf_options_
.comparator
, arena
);
386 for (auto& item
: handle_map_
) {
387 auto handle
= item
.second
;
388 builder
.AddIterator(db_impl
->NewInternalIterator(
389 arena
, db_iter
->GetRangeDelAggregator(), handle
));
391 auto internal_iter
= builder
.Finish();
392 db_iter
->SetIterUnderDBIter(internal_iter
);
395 } // namespace rocksdb
396 #endif // ROCKSDB_LITE