]> git.proxmox.com Git - ceph.git/blob - ceph/src/rocksdb/utilities/date_tiered/date_tiered_db_impl.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rocksdb / utilities / date_tiered / date_tiered_db_impl.cc
1 // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2 // Use of this source code is governed by a BSD-style license that can be
3 // found in the LICENSE file. See the AUTHORS file for names of contributors.
4 #ifndef ROCKSDB_LITE
5
6 #include "utilities/date_tiered/date_tiered_db_impl.h"
7
8 #include <limits>
9
10 #include "db/db_impl.h"
11 #include "db/db_iter.h"
12 #include "db/write_batch_internal.h"
13 #include "monitoring/instrumented_mutex.h"
14 #include "options/options_helper.h"
15 #include "rocksdb/convenience.h"
16 #include "rocksdb/env.h"
17 #include "rocksdb/iterator.h"
18 #include "rocksdb/utilities/date_tiered_db.h"
19 #include "table/merging_iterator.h"
20 #include "util/coding.h"
21 #include "util/filename.h"
22 #include "util/string_util.h"
23
24 namespace rocksdb {
25
26 // Open the db inside DateTieredDBImpl because options needs pointer to its ttl
27 DateTieredDBImpl::DateTieredDBImpl(
28 DB* db, Options options,
29 const std::vector<ColumnFamilyDescriptor>& descriptors,
30 const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl,
31 int64_t column_family_interval)
32 : db_(db),
33 cf_options_(ColumnFamilyOptions(options)),
34 ioptions_(ImmutableCFOptions(options)),
35 ttl_(ttl),
36 column_family_interval_(column_family_interval),
37 mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS,
38 options.use_adaptive_mutex) {
39 latest_timebound_ = std::numeric_limits<int64_t>::min();
40 for (size_t i = 0; i < handles.size(); ++i) {
41 const auto& name = descriptors[i].name;
42 int64_t timestamp = 0;
43 try {
44 timestamp = ParseUint64(name);
45 } catch (const std::invalid_argument&) {
46 // Bypass unrelated column family, e.g. default
47 db_->DestroyColumnFamilyHandle(handles[i]);
48 continue;
49 }
50 if (timestamp > latest_timebound_) {
51 latest_timebound_ = timestamp;
52 }
53 handle_map_.insert(std::make_pair(timestamp, handles[i]));
54 }
55 }
56
57 DateTieredDBImpl::~DateTieredDBImpl() {
58 for (auto handle : handle_map_) {
59 db_->DestroyColumnFamilyHandle(handle.second);
60 }
61 delete db_;
62 db_ = nullptr;
63 }
64
65 Status DateTieredDB::Open(const Options& options, const std::string& dbname,
66 DateTieredDB** dbptr, int64_t ttl,
67 int64_t column_family_interval, bool read_only) {
68 DBOptions db_options(options);
69 ColumnFamilyOptions cf_options(options);
70 std::vector<ColumnFamilyDescriptor> descriptors;
71 std::vector<ColumnFamilyHandle*> handles;
72 DB* db;
73 Status s;
74
75 // Get column families
76 std::vector<std::string> column_family_names;
77 s = DB::ListColumnFamilies(db_options, dbname, &column_family_names);
78 if (!s.ok()) {
79 // No column family found. Use default
80 s = DB::Open(options, dbname, &db);
81 if (!s.ok()) {
82 return s;
83 }
84 } else {
85 for (auto name : column_family_names) {
86 descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options));
87 }
88
89 // Open database
90 if (read_only) {
91 s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db);
92 } else {
93 s = DB::Open(db_options, dbname, descriptors, &handles, &db);
94 }
95 }
96
97 if (s.ok()) {
98 *dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl,
99 column_family_interval);
100 }
101 return s;
102 }
103
104 // Checks if the string is stale or not according to TTl provided
105 bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) {
106 if (ttl <= 0) {
107 // Data is fresh if TTL is non-positive
108 return false;
109 }
110 int64_t curtime;
111 if (!env->GetCurrentTime(&curtime).ok()) {
112 // Treat the data as fresh if could not get current time
113 return false;
114 }
115 return curtime >= keytime + ttl;
116 }
117
118 // Drop column family when all data in that column family is expired
119 // TODO(jhli): Can be made a background job
120 Status DateTieredDBImpl::DropObsoleteColumnFamilies() {
121 int64_t curtime;
122 Status s;
123 s = db_->GetEnv()->GetCurrentTime(&curtime);
124 if (!s.ok()) {
125 return s;
126 }
127 {
128 InstrumentedMutexLock l(&mutex_);
129 auto iter = handle_map_.begin();
130 while (iter != handle_map_.end()) {
131 if (iter->first <= curtime - ttl_) {
132 s = db_->DropColumnFamily(iter->second);
133 if (!s.ok()) {
134 return s;
135 }
136 delete iter->second;
137 iter = handle_map_.erase(iter);
138 } else {
139 break;
140 }
141 }
142 }
143 return Status::OK();
144 }
145
146 // Get timestamp from user key
147 Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) {
148 if (key.size() < kTSLength) {
149 return Status::Corruption("Bad timestamp in key");
150 }
151 const char* pos = key.data() + key.size() - 8;
152 int64_t timestamp = 0;
153 if (port::kLittleEndian) {
154 int bytes_to_fill = 8;
155 for (int i = 0; i < bytes_to_fill; ++i) {
156 timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
157 << ((bytes_to_fill - i - 1) << 3));
158 }
159 } else {
160 memcpy(&timestamp, pos, sizeof(timestamp));
161 }
162 *result = timestamp;
163 return Status::OK();
164 }
165
166 Status DateTieredDBImpl::CreateColumnFamily(
167 ColumnFamilyHandle** column_family) {
168 int64_t curtime;
169 Status s;
170 mutex_.AssertHeld();
171 s = db_->GetEnv()->GetCurrentTime(&curtime);
172 if (!s.ok()) {
173 return s;
174 }
175 int64_t new_timebound;
176 if (handle_map_.empty()) {
177 new_timebound = curtime + column_family_interval_;
178 } else {
179 new_timebound =
180 latest_timebound_ +
181 ((curtime - latest_timebound_) / column_family_interval_ + 1) *
182 column_family_interval_;
183 }
184 std::string cf_name = ToString(new_timebound);
185 latest_timebound_ = new_timebound;
186 s = db_->CreateColumnFamily(cf_options_, cf_name, column_family);
187 if (s.ok()) {
188 handle_map_.insert(std::make_pair(new_timebound, *column_family));
189 }
190 return s;
191 }
192
193 Status DateTieredDBImpl::FindColumnFamily(int64_t keytime,
194 ColumnFamilyHandle** column_family,
195 bool create_if_missing) {
196 *column_family = nullptr;
197 {
198 InstrumentedMutexLock l(&mutex_);
199 auto iter = handle_map_.upper_bound(keytime);
200 if (iter == handle_map_.end()) {
201 if (!create_if_missing) {
202 return Status::NotFound();
203 } else {
204 return CreateColumnFamily(column_family);
205 }
206 }
207 // Move to previous element to get the appropriate time window
208 *column_family = iter->second;
209 }
210 return Status::OK();
211 }
212
213 Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key,
214 const Slice& val) {
215 int64_t timestamp = 0;
216 Status s;
217 s = GetTimestamp(key, &timestamp);
218 if (!s.ok()) {
219 return s;
220 }
221 DropObsoleteColumnFamilies();
222
223 // Prune request to obsolete data
224 if (IsStale(timestamp, ttl_, db_->GetEnv())) {
225 return Status::InvalidArgument();
226 }
227
228 // Decide column family (i.e. the time window) to put into
229 ColumnFamilyHandle* column_family;
230 s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
231 if (!s.ok()) {
232 return s;
233 }
234
235 // Efficiently put with WriteBatch
236 WriteBatch batch;
237 batch.Put(column_family, key, val);
238 return Write(options, &batch);
239 }
240
241 Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key,
242 std::string* value) {
243 int64_t timestamp = 0;
244 Status s;
245 s = GetTimestamp(key, &timestamp);
246 if (!s.ok()) {
247 return s;
248 }
249 // Prune request to obsolete data
250 if (IsStale(timestamp, ttl_, db_->GetEnv())) {
251 return Status::NotFound();
252 }
253
254 // Decide column family to get from
255 ColumnFamilyHandle* column_family;
256 s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
257 if (!s.ok()) {
258 return s;
259 }
260 if (column_family == nullptr) {
261 // Cannot find column family
262 return Status::NotFound();
263 }
264
265 // Get value with key
266 return db_->Get(options, column_family, key, value);
267 }
268
269 bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options, const Slice& key,
270 std::string* value, bool* value_found) {
271 int64_t timestamp = 0;
272 Status s;
273 s = GetTimestamp(key, &timestamp);
274 if (!s.ok()) {
275 // Cannot get current time
276 return false;
277 }
278 // Decide column family to get from
279 ColumnFamilyHandle* column_family;
280 s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
281 if (!s.ok() || column_family == nullptr) {
282 // Cannot find column family
283 return false;
284 }
285 if (IsStale(timestamp, ttl_, db_->GetEnv())) {
286 return false;
287 }
288 return db_->KeyMayExist(options, column_family, key, value, value_found);
289 }
290
291 Status DateTieredDBImpl::Delete(const WriteOptions& options, const Slice& key) {
292 int64_t timestamp = 0;
293 Status s;
294 s = GetTimestamp(key, &timestamp);
295 if (!s.ok()) {
296 return s;
297 }
298 DropObsoleteColumnFamilies();
299 // Prune request to obsolete data
300 if (IsStale(timestamp, ttl_, db_->GetEnv())) {
301 return Status::NotFound();
302 }
303
304 // Decide column family to get from
305 ColumnFamilyHandle* column_family;
306 s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
307 if (!s.ok()) {
308 return s;
309 }
310 if (column_family == nullptr) {
311 // Cannot find column family
312 return Status::NotFound();
313 }
314
315 // Get value with key
316 return db_->Delete(options, column_family, key);
317 }
318
319 Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key,
320 const Slice& value) {
321 // Decide column family to get from
322 int64_t timestamp = 0;
323 Status s;
324 s = GetTimestamp(key, &timestamp);
325 if (!s.ok()) {
326 // Cannot get current time
327 return s;
328 }
329 ColumnFamilyHandle* column_family;
330 s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
331 if (!s.ok()) {
332 return s;
333 }
334 WriteBatch batch;
335 batch.Merge(column_family, key, value);
336 return Write(options, &batch);
337 }
338
339 Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
340 class Handler : public WriteBatch::Handler {
341 public:
342 explicit Handler() {}
343 WriteBatch updates_ttl;
344 Status batch_rewrite_status;
345 virtual Status PutCF(uint32_t column_family_id, const Slice& key,
346 const Slice& value) override {
347 WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value);
348 return Status::OK();
349 }
350 virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
351 const Slice& value) override {
352 WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value);
353 return Status::OK();
354 }
355 virtual Status DeleteCF(uint32_t column_family_id,
356 const Slice& key) override {
357 WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
358 return Status::OK();
359 }
360 virtual void LogData(const Slice& blob) override {
361 updates_ttl.PutLogData(blob);
362 }
363 };
364 Handler handler;
365 updates->Iterate(&handler);
366 if (!handler.batch_rewrite_status.ok()) {
367 return handler.batch_rewrite_status;
368 } else {
369 return db_->Write(opts, &(handler.updates_ttl));
370 }
371 }
372
373 Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
374 if (handle_map_.empty()) {
375 return NewEmptyIterator();
376 }
377
378 DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
379
380 auto db_iter = NewArenaWrappedDbIterator(
381 db_impl->GetEnv(), opts, ioptions_, cf_options_.comparator,
382 kMaxSequenceNumber, cf_options_.max_sequential_skip_in_iterations, 0);
383
384 auto arena = db_iter->GetArena();
385 MergeIteratorBuilder builder(cf_options_.comparator, arena);
386 for (auto& item : handle_map_) {
387 auto handle = item.second;
388 builder.AddIterator(db_impl->NewInternalIterator(
389 arena, db_iter->GetRangeDelAggregator(), handle));
390 }
391 auto internal_iter = builder.Finish();
392 db_iter->SetIterUnderDBIter(internal_iter);
393 return db_iter;
394 }
395 } // namespace rocksdb
396 #endif // ROCKSDB_LITE