]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/LevelDBStore.cc
update sources to v12.1.0
[ceph.git] / ceph / src / kv / LevelDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
4
5 #include <set>
6 #include <map>
7 #include <string>
8 #include <cerrno>
9
10 using std::string;
11
12 #include "include/memory.h"
13
14 #include "common/debug.h"
15 #include "common/perf_counters.h"
16
17 // re-include our assert to clobber the system one; fix dout:
18 #include "include/assert.h"
19
20 #define dout_context cct
21 #define dout_subsys ceph_subsys_leveldb
22 #undef dout_prefix
23 #define dout_prefix *_dout << "leveldb: "
24
// Adapter that routes leveldb's internal log messages into the Ceph
// logging subsystem (dout).  Installed as ldoptions.info_log by
// LevelDBStore::do_open() when leveldb_log_to_ceph_log is enabled.
class CephLevelDBLogger : public leveldb::Logger {
  CephContext *cct;   // ref-counted; held for the logger's lifetime
public:
  explicit CephLevelDBLogger(CephContext *c) : cct(c) {
    cct->get();   // take a ref so cct outlives this logger
  }
  ~CephLevelDBLogger() override {
    cct->put();   // release the ref taken in the constructor
  }

  // Write an entry to the log file with the specified format.
  void Logv(const char* format, va_list ap) override {
    // dout(1) opens the log statement; it is closed by dendl below.
    dout(1);
    // leveldb messages are one-liners; 64K is ample and anything longer
    // is silently truncated by vsnprintf.
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
43
// Factory for a leveldb Logger bound to the global CephContext.
// Caller owns the returned pointer (typically handed to leveldb via
// Options::info_log and deleted alongside the store).
leveldb::Logger *create_leveldb_ceph_logger()
{
  return new CephLevelDBLogger(g_ceph_context);
}
48
// Seed this store's option struct from the global ceph configuration.
// NOTE(review): option_str is currently ignored here — looks intentional
// (callers tweak `options` directly before open), but confirm.
// Always returns 0.
int LevelDBStore::init(string option_str)
{
  // init defaults. caller can override these if they want
  // prior to calling open.
  options.write_buffer_size = g_conf->leveldb_write_buffer_size;
  options.cache_size = g_conf->leveldb_cache_size;
  options.block_size = g_conf->leveldb_block_size;
  options.bloom_size = g_conf->leveldb_bloom_size;
  options.compression_enabled = g_conf->leveldb_compression;
  options.paranoid_checks = g_conf->leveldb_paranoid;
  options.max_open_files = g_conf->leveldb_max_open_files;
  options.log_file = g_conf->leveldb_log;
  return 0;
}
63
64 int LevelDBStore::do_open(ostream &out, bool create_if_missing)
65 {
66 leveldb::Options ldoptions;
67
68 if (options.write_buffer_size)
69 ldoptions.write_buffer_size = options.write_buffer_size;
70 if (options.max_open_files)
71 ldoptions.max_open_files = options.max_open_files;
72 if (options.cache_size) {
73 leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
74 db_cache.reset(_db_cache);
75 ldoptions.block_cache = db_cache.get();
76 }
77 if (options.block_size)
78 ldoptions.block_size = options.block_size;
79 if (options.bloom_size) {
80 #ifdef HAVE_LEVELDB_FILTER_POLICY
81 const leveldb::FilterPolicy *_filterpolicy =
82 leveldb::NewBloomFilterPolicy(options.bloom_size);
83 filterpolicy.reset(_filterpolicy);
84 ldoptions.filter_policy = filterpolicy.get();
85 #else
86 assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
87 #endif
88 }
89 if (options.compression_enabled)
90 ldoptions.compression = leveldb::kSnappyCompression;
91 else
92 ldoptions.compression = leveldb::kNoCompression;
93 if (options.block_restart_interval)
94 ldoptions.block_restart_interval = options.block_restart_interval;
95
96 ldoptions.error_if_exists = options.error_if_exists;
97 ldoptions.paranoid_checks = options.paranoid_checks;
98 ldoptions.create_if_missing = create_if_missing;
99
100 if (g_conf->leveldb_log_to_ceph_log) {
101 ceph_logger = new CephLevelDBLogger(g_ceph_context);
102 ldoptions.info_log = ceph_logger;
103 }
104
105 if (options.log_file.length()) {
106 leveldb::Env *env = leveldb::Env::Default();
107 env->NewLogger(options.log_file, &ldoptions.info_log);
108 }
109
110 leveldb::DB *_db;
111 leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
112 db.reset(_db);
113 if (!status.ok()) {
114 out << status.ToString() << std::endl;
115 return -EINVAL;
116 }
117
118 PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
119 plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
120 plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
121 plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
122 plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
123 plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
124 plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
125 plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
126 plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
127 plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
128 logger = plb.create_perf_counters();
129 cct->get_perfcounters_collection()->add(logger);
130
131 if (g_conf->leveldb_compact_on_mount) {
132 derr << "Compacting leveldb store..." << dendl;
133 compact();
134 derr << "Finished compacting leveldb store" << dendl;
135 }
136 return 0;
137 }
138
139 int LevelDBStore::_test_init(const string& dir)
140 {
141 leveldb::Options options;
142 options.create_if_missing = true;
143 leveldb::DB *db;
144 leveldb::Status status = leveldb::DB::Open(options, dir, &db);
145 delete db;
146 return status.ok() ? 0 : -EIO;
147 }
148
149 LevelDBStore::~LevelDBStore()
150 {
151 close();
152 delete logger;
153 delete ceph_logger;
154
155 // Ensure db is destroyed before dependent db_cache and filterpolicy
156 db.reset();
157 }
158
// Shut down the background compaction thread (if running) and unregister
// the perf counters.  Safe to call more than once; the counters object
// itself is freed in the destructor, not here.
void LevelDBStore::close()
{
  // stop compaction thread
  compact_queue_lock.Lock();
  if (compact_thread.is_started()) {
    compact_queue_stop = true;
    compact_queue_cond.Signal();
    // must drop the lock before joining, or the thread would deadlock
    // trying to re-acquire it on its way out
    compact_queue_lock.Unlock();
    compact_thread.join();
  } else {
    compact_queue_lock.Unlock();
  }

  if (logger)
    cct->get_perfcounters_collection()->remove(logger);
}
175
176 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
177 {
178 utime_t start = ceph_clock_now();
179 LevelDBTransactionImpl * _t =
180 static_cast<LevelDBTransactionImpl *>(t.get());
181 leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
182 utime_t lat = ceph_clock_now() - start;
183 logger->inc(l_leveldb_txns);
184 logger->tinc(l_leveldb_submit_latency, lat);
185 return s.ok() ? 0 : -1;
186 }
187
188 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
189 {
190 utime_t start = ceph_clock_now();
191 LevelDBTransactionImpl * _t =
192 static_cast<LevelDBTransactionImpl *>(t.get());
193 leveldb::WriteOptions options;
194 options.sync = true;
195 leveldb::Status s = db->Write(options, &(_t->bat));
196 utime_t lat = ceph_clock_now() - start;
197 logger->inc(l_leveldb_txns);
198 logger->tinc(l_leveldb_submit_sync_latency, lat);
199 return s.ok() ? 0 : -1;
200 }
201
// Queue a Put of key <prefix>\0<k> -> to_set_bl into the write batch.
// Three paths, chosen by the shape/size of the bufferlist, all feeding
// leveldb a (pointer, length) Slice; WriteBatch::Put copies the bytes,
// so the temporaries below only need to live until Put returns.
void LevelDBStore::LevelDBTransactionImpl::set(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  string key = combine_strings(prefix, k);
  size_t bllen = to_set_bl.length();
  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && bllen > 0) {
    // bufferlist contains just one ptr or they're contiguous
    bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
  } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
    // 2+ bufferptrs that are not contiguous
    // allocate buffer on stack and copy bl contents to that buffer
    // make sure the buffer isn't too large or we might crash here...
    char* slicebuf = (char*) alloca(bllen);
    leveldb::Slice newslice(slicebuf, bllen);
    std::list<buffer::ptr>::const_iterator pb;
    for (pb = to_set_bl.buffers().begin(); pb != to_set_bl.buffers().end(); ++pb) {
      size_t ptrlen = (*pb).length();
      memcpy((void*)slicebuf, (*pb).c_str(), ptrlen);
      slicebuf += ptrlen;   // advance past the bytes just copied
    }
    bat.Put(leveldb::Slice(key), newslice);
  } else {
    // 2+ bufferptrs that are not contiguous, and enormous in size:
    // take a shallow copy so the non-const c_str() can linearize it
    bufferlist val = to_set_bl;
    bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
  }
}
232
233 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
234 const string &k)
235 {
236 string key = combine_strings(prefix, k);
237 bat.Delete(leveldb::Slice(key));
238 }
239
240 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
241 {
242 KeyValueDB::Iterator it = db->get_iterator(prefix);
243 for (it->seek_to_first();
244 it->valid();
245 it->next()) {
246 bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
247 }
248 }
249
250 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
251 {
252 KeyValueDB::Iterator it = db->get_iterator(prefix);
253 it->lower_bound(start);
254 while (it->valid()) {
255 if (it->key() >= end) {
256 break;
257 }
258 bat.Delete(combine_strings(prefix, it->key()));
259 it->next();
260 }
261 }
262
263 int LevelDBStore::get(
264 const string &prefix,
265 const std::set<string> &keys,
266 std::map<string, bufferlist> *out)
267 {
268 utime_t start = ceph_clock_now();
269 for (std::set<string>::const_iterator i = keys.begin();
270 i != keys.end(); ++i) {
271 std::string value;
272 std::string bound = combine_strings(prefix, *i);
273 auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
274 if (status.ok())
275 (*out)[*i].append(value);
276 }
277 utime_t lat = ceph_clock_now() - start;
278 logger->inc(l_leveldb_gets);
279 logger->tinc(l_leveldb_get_latency, lat);
280 return 0;
281 }
282
283 int LevelDBStore::get(const string &prefix,
284 const string &key,
285 bufferlist *out)
286 {
287 assert(out && (out->length() == 0));
288 utime_t start = ceph_clock_now();
289 int r = 0;
290 string value, k;
291 leveldb::Status s;
292 k = combine_strings(prefix, key);
293 s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
294 if (s.ok()) {
295 out->append(value);
296 } else {
297 r = -ENOENT;
298 }
299 utime_t lat = ceph_clock_now() - start;
300 logger->inc(l_leveldb_gets);
301 logger->tinc(l_leveldb_get_latency, lat);
302 return r;
303 }
304
305 string LevelDBStore::combine_strings(const string &prefix, const string &value)
306 {
307 string out = prefix;
308 out.push_back(0);
309 out.append(value);
310 return out;
311 }
312
313 bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
314 {
315 bufferlist bl;
316 bl.append(bufferptr(in.data(), in.size()));
317 return bl;
318 }
319
320 int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
321 {
322 size_t prefix_len = 0;
323
324 // Find separator inside Slice
325 char* separator = (char*) memchr(in.data(), 0, in.size());
326 if (separator == NULL)
327 return -EINVAL;
328 prefix_len = size_t(separator - in.data());
329 if (prefix_len >= in.size())
330 return -EINVAL;
331
332 if (prefix)
333 *prefix = string(in.data(), prefix_len);
334 if (key)
335 *key = string(separator+1, in.size() - prefix_len - 1);
336 return 0;
337 }
338
// Synchronously compact the entire database (blocks until done).
void LevelDBStore::compact()
{
  logger->inc(l_leveldb_compact);
  db->CompactRange(NULL, NULL);   // NULL,NULL = full key range
}
344
345
// Background thread: drain queued compaction ranges until told to stop
// via compact_queue_stop (set by close()).
void LevelDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      // take a range by value, then drop the lock while compacting so
      // producers can keep enqueueing work
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_leveldb_compact_queue_len, compact_queue.size());
      compact_queue_lock.Unlock();
      logger->inc(l_leveldb_compact_range);
      compact_range(range.first, range.second);
      compact_queue_lock.Lock();
      continue;
    }
    // queue empty: sleep until new work arrives or we are stopped
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
364
365 void LevelDBStore::compact_range_async(const string& start, const string& end)
366 {
367 Mutex::Locker l(compact_queue_lock);
368
369 // try to merge adjacent ranges. this is O(n), but the queue should
370 // be short. note that we do not cover all overlap cases and merge
371 // opportunities here, but we capture the ones we currently need.
372 list< pair<string,string> >::iterator p = compact_queue.begin();
373 while (p != compact_queue.end()) {
374 if (p->first == start && p->second == end) {
375 // dup; no-op
376 return;
377 }
378 if (p->first <= end && p->first > start) {
379 // merge with existing range to the right
380 compact_queue.push_back(make_pair(start, p->second));
381 compact_queue.erase(p);
382 logger->inc(l_leveldb_compact_queue_merge);
383 break;
384 }
385 if (p->second >= start && p->second < end) {
386 // merge with existing range to the left
387 compact_queue.push_back(make_pair(p->first, end));
388 compact_queue.erase(p);
389 logger->inc(l_leveldb_compact_queue_merge);
390 break;
391 }
392 ++p;
393 }
394 if (p == compact_queue.end()) {
395 // no merge, new entry.
396 compact_queue.push_back(make_pair(start, end));
397 logger->set(l_leveldb_compact_queue_len, compact_queue.size());
398 }
399 compact_queue_cond.Signal();
400 if (!compact_thread.is_started()) {
401 compact_thread.create("levdbst_compact");
402 }
403 }