]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/LevelDBStore.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / kv / LevelDBStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "LevelDBStore.h"
4
5 #include <set>
6 #include <map>
7 #include <string>
8 #include "include/memory.h"
9 #include <errno.h>
10 using std::string;
11 #include "common/debug.h"
12 #include "common/perf_counters.h"
13
14 #define dout_context cct
15 #define dout_subsys ceph_subsys_leveldb
16 #undef dout_prefix
17 #define dout_prefix *_dout << "leveldb: "
18
// Adapter that routes leveldb's internal log messages into the Ceph
// debug log (dout) instead of a leveldb-managed log file.
class CephLevelDBLogger : public leveldb::Logger {
  CephContext *cct;
public:
  explicit CephLevelDBLogger(CephContext *c) : cct(c) {
    cct->get();  // hold a ref so cct outlives this logger
  }
  ~CephLevelDBLogger() override {
    cct->put();
  }

  // Write an entry to the log file with the specified format.
  // Called by leveldb with printf-style arguments; the message is
  // formatted into a fixed-size stack buffer (longer messages are
  // truncated by vsnprintf) and emitted at debug level 1.
  void Logv(const char* format, va_list ap) override {
    dout(1);  // opens the dout statement; completed by dendl below
    char buf[65536];
    vsnprintf(buf, sizeof(buf), format, ap);
    *_dout << buf << dendl;
  }
};
37
38 leveldb::Logger *create_leveldb_ceph_logger()
39 {
40 return new CephLevelDBLogger(g_ceph_context);
41 }
42
// Seed this store's tunables from the global ceph configuration.
// These are defaults only; the caller may override any field of
// 'options' before calling open.  'option_str' is unused here.
// Always returns 0.
int LevelDBStore::init(string option_str)
{
  // init defaults. caller can override these if they want
  // prior to calling open.
  options.write_buffer_size = g_conf->leveldb_write_buffer_size;
  options.cache_size = g_conf->leveldb_cache_size;
  options.block_size = g_conf->leveldb_block_size;
  options.bloom_size = g_conf->leveldb_bloom_size;
  options.compression_enabled = g_conf->leveldb_compression;
  options.paranoid_checks = g_conf->leveldb_paranoid;
  options.max_open_files = g_conf->leveldb_max_open_files;
  options.log_file = g_conf->leveldb_log;
  return 0;
}
57
// Open (or create) the leveldb database at 'path': translate our
// generic 'options' into leveldb::Options, wire up logging, open the
// db, and register perf counters.
//
// @param out receives a human-readable error message on failure
// @param create_if_missing passed through to leveldb; when false a
//        missing database is an error
// @return 0 on success, -EINVAL if leveldb could not open the db
int LevelDBStore::do_open(ostream &out, bool create_if_missing)
{
  leveldb::Options ldoptions;

  // Only override leveldb's built-in defaults for knobs the caller
  // actually set (zero/empty means "keep the leveldb default").
  if (options.write_buffer_size)
    ldoptions.write_buffer_size = options.write_buffer_size;
  if (options.max_open_files)
    ldoptions.max_open_files = options.max_open_files;
  if (options.cache_size) {
    // Cache is kept in a member smart pointer so it outlives the db
    // handle; the destructor resets db first.
    leveldb::Cache *_db_cache = leveldb::NewLRUCache(options.cache_size);
    db_cache.reset(_db_cache);
    ldoptions.block_cache = db_cache.get();
  }
  if (options.block_size)
    ldoptions.block_size = options.block_size;
  if (options.bloom_size) {
#ifdef HAVE_LEVELDB_FILTER_POLICY
    // Same lifetime consideration as the block cache above.
    const leveldb::FilterPolicy *_filterpolicy =
	leveldb::NewBloomFilterPolicy(options.bloom_size);
    filterpolicy.reset(_filterpolicy);
    ldoptions.filter_policy = filterpolicy.get();
#else
    assert(0 == "bloom size set but installed leveldb doesn't support bloom filters");
#endif
  }
  if (options.compression_enabled)
    ldoptions.compression = leveldb::kSnappyCompression;
  else
    ldoptions.compression = leveldb::kNoCompression;
  if (options.block_restart_interval)
    ldoptions.block_restart_interval = options.block_restart_interval;

  ldoptions.error_if_exists = options.error_if_exists;
  ldoptions.paranoid_checks = options.paranoid_checks;
  ldoptions.create_if_missing = create_if_missing;

  // Route leveldb's internal logging into the ceph log if configured.
  // NOTE(review): if both leveldb_log_to_ceph_log and options.log_file
  // are set, the file logger below overwrites info_log, so ceph_logger
  // ends up unused (it is still freed in the destructor).
  if (g_conf->leveldb_log_to_ceph_log) {
    ceph_logger = new CephLevelDBLogger(g_ceph_context);
    ldoptions.info_log = ceph_logger;
  }

  if (options.log_file.length()) {
    leveldb::Env *env = leveldb::Env::Default();
    env->NewLogger(options.log_file, &ldoptions.info_log);
  }

  leveldb::DB *_db;
  leveldb::Status status = leveldb::DB::Open(ldoptions, path, &_db);
  // db takes ownership; assumes leveldb stores NULL in _db on failure
  // -- TODO confirm against the bundled leveldb version.
  db.reset(_db);
  if (!status.ok()) {
    out << status.ToString() << std::endl;
    return -EINVAL;
  }

  // Register per-instance perf counters.
  PerfCountersBuilder plb(g_ceph_context, "leveldb", l_leveldb_first, l_leveldb_last);
  plb.add_u64_counter(l_leveldb_gets, "leveldb_get", "Gets");
  plb.add_u64_counter(l_leveldb_txns, "leveldb_transaction", "Transactions");
  plb.add_time_avg(l_leveldb_get_latency, "leveldb_get_latency", "Get Latency");
  plb.add_time_avg(l_leveldb_submit_latency, "leveldb_submit_latency", "Submit Latency");
  plb.add_time_avg(l_leveldb_submit_sync_latency, "leveldb_submit_sync_latency", "Submit Sync Latency");
  plb.add_u64_counter(l_leveldb_compact, "leveldb_compact", "Compactions");
  plb.add_u64_counter(l_leveldb_compact_range, "leveldb_compact_range", "Compactions by range");
  plb.add_u64_counter(l_leveldb_compact_queue_merge, "leveldb_compact_queue_merge", "Mergings of ranges in compaction queue");
  plb.add_u64(l_leveldb_compact_queue_len, "leveldb_compact_queue_len", "Length of compaction queue");
  logger = plb.create_perf_counters();
  cct->get_perfcounters_collection()->add(logger);

  // Optional full compaction at mount time (can be slow; logged loudly).
  if (g_conf->leveldb_compact_on_mount) {
    derr << "Compacting leveldb store..." << dendl;
    compact();
    derr << "Finished compacting leveldb store" << dendl;
  }
  return 0;
}
132
133 int LevelDBStore::_test_init(const string& dir)
134 {
135 leveldb::Options options;
136 options.create_if_missing = true;
137 leveldb::DB *db;
138 leveldb::Status status = leveldb::DB::Open(options, dir, &db);
139 delete db;
140 return status.ok() ? 0 : -EIO;
141 }
142
// Stop background work, free the loggers, and tear the db down in a
// safe order relative to the helper objects it references.
LevelDBStore::~LevelDBStore()
{
  close();
  delete logger;
  delete ceph_logger;

  // Ensure db is destroyed before dependent db_cache and filterpolicy
  db.reset();
}
152
// Stop the background compaction thread (if one was started) and
// deregister the perf counters from the collection.
void LevelDBStore::close()
{
  // stop compaction thread
  compact_queue_lock.Lock();
  if (compact_thread.is_started()) {
    compact_queue_stop = true;
    compact_queue_cond.Signal();
    // drop the lock before join() so the thread can re-acquire it
    // and observe compact_queue_stop
    compact_queue_lock.Unlock();
    compact_thread.join();
  } else {
    compact_queue_lock.Unlock();
  }

  if (logger)
    cct->get_perfcounters_collection()->remove(logger);
}
169
170 int LevelDBStore::submit_transaction(KeyValueDB::Transaction t)
171 {
172 utime_t start = ceph_clock_now();
173 LevelDBTransactionImpl * _t =
174 static_cast<LevelDBTransactionImpl *>(t.get());
175 leveldb::Status s = db->Write(leveldb::WriteOptions(), &(_t->bat));
176 utime_t lat = ceph_clock_now() - start;
177 logger->inc(l_leveldb_txns);
178 logger->tinc(l_leveldb_submit_latency, lat);
179 return s.ok() ? 0 : -1;
180 }
181
182 int LevelDBStore::submit_transaction_sync(KeyValueDB::Transaction t)
183 {
184 utime_t start = ceph_clock_now();
185 LevelDBTransactionImpl * _t =
186 static_cast<LevelDBTransactionImpl *>(t.get());
187 leveldb::WriteOptions options;
188 options.sync = true;
189 leveldb::Status s = db->Write(options, &(_t->bat));
190 utime_t lat = ceph_clock_now() - start;
191 logger->inc(l_leveldb_txns);
192 logger->tinc(l_leveldb_submit_sync_latency, lat);
193 return s.ok() ? 0 : -1;
194 }
195
// Queue a Put of (prefix\0k -> to_set_bl) into the write batch.
// Three paths, chosen to avoid copying the value where possible:
//   1. contiguous, non-empty bufferlist: point the Slice straight at
//      the existing bytes
//   2. small (<=32KB) fragmented bufferlist: flatten into an alloca'd
//      stack buffer
//   3. large fragmented (or empty) bufferlist: copy the bufferlist and
//      let c_str() flatten it on the heap
void LevelDBStore::LevelDBTransactionImpl::set(
  const string &prefix,
  const string &k,
  const bufferlist &to_set_bl)
{
  string key = combine_strings(prefix, k);
  size_t bllen = to_set_bl.length();
  // bufferlist::c_str() is non-constant, so we can't call c_str()
  if (to_set_bl.is_contiguous() && bllen > 0) {
    // bufferlist contains just one ptr or they're contiguous
    bat.Put(leveldb::Slice(key), leveldb::Slice(to_set_bl.buffers().front().c_str(), bllen));
  } else if ((bllen <= 32 * 1024) && (bllen > 0)) {
    // 2+ bufferptrs that are not contiguous
    // allocate buffer on stack and copy bl contents to that buffer
    // make sure the buffer isn't too large or we might crash here...
    char* slicebuf = (char*) alloca(bllen);
    // newslice keeps the original start; slicebuf is advanced below
    leveldb::Slice newslice(slicebuf, bllen);
    std::list<buffer::ptr>::const_iterator pb;
    for (pb = to_set_bl.buffers().begin(); pb != to_set_bl.buffers().end(); ++pb) {
      size_t ptrlen = (*pb).length();
      memcpy((void*)slicebuf, (*pb).c_str(), ptrlen);
      slicebuf += ptrlen;
    }
    bat.Put(leveldb::Slice(key), newslice);
  } else {
    // 2+ bufferptrs that are not contiguous, and enormous in size
    bufferlist val = to_set_bl;
    bat.Put(leveldb::Slice(key), leveldb::Slice(val.c_str(), val.length()));
  }
}
226
227 void LevelDBStore::LevelDBTransactionImpl::rmkey(const string &prefix,
228 const string &k)
229 {
230 string key = combine_strings(prefix, k);
231 bat.Delete(leveldb::Slice(key));
232 }
233
234 void LevelDBStore::LevelDBTransactionImpl::rmkeys_by_prefix(const string &prefix)
235 {
236 KeyValueDB::Iterator it = db->get_iterator(prefix);
237 for (it->seek_to_first();
238 it->valid();
239 it->next()) {
240 bat.Delete(leveldb::Slice(combine_strings(prefix, it->key())));
241 }
242 }
243
244 void LevelDBStore::LevelDBTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
245 {
246 KeyValueDB::Iterator it = db->get_iterator(prefix);
247 it->lower_bound(start);
248 while (it->valid()) {
249 if (it->key() >= end) {
250 break;
251 }
252 bat.Delete(combine_strings(prefix, it->key()));
253 it->next();
254 }
255 }
256
257 int LevelDBStore::get(
258 const string &prefix,
259 const std::set<string> &keys,
260 std::map<string, bufferlist> *out)
261 {
262 utime_t start = ceph_clock_now();
263 for (std::set<string>::const_iterator i = keys.begin();
264 i != keys.end(); ++i) {
265 std::string value;
266 std::string bound = combine_strings(prefix, *i);
267 auto status = db->Get(leveldb::ReadOptions(), leveldb::Slice(bound), &value);
268 if (status.ok())
269 (*out)[*i].append(value);
270 }
271 utime_t lat = ceph_clock_now() - start;
272 logger->inc(l_leveldb_gets);
273 logger->tinc(l_leveldb_get_latency, lat);
274 return 0;
275 }
276
277 int LevelDBStore::get(const string &prefix,
278 const string &key,
279 bufferlist *out)
280 {
281 assert(out && (out->length() == 0));
282 utime_t start = ceph_clock_now();
283 int r = 0;
284 string value, k;
285 leveldb::Status s;
286 k = combine_strings(prefix, key);
287 s = db->Get(leveldb::ReadOptions(), leveldb::Slice(k), &value);
288 if (s.ok()) {
289 out->append(value);
290 } else {
291 r = -ENOENT;
292 }
293 utime_t lat = ceph_clock_now() - start;
294 logger->inc(l_leveldb_gets);
295 logger->tinc(l_leveldb_get_latency, lat);
296 return r;
297 }
298
299 string LevelDBStore::combine_strings(const string &prefix, const string &value)
300 {
301 string out = prefix;
302 out.push_back(0);
303 out.append(value);
304 return out;
305 }
306
307 bufferlist LevelDBStore::to_bufferlist(leveldb::Slice in)
308 {
309 bufferlist bl;
310 bl.append(bufferptr(in.data(), in.size()));
311 return bl;
312 }
313
314 int LevelDBStore::split_key(leveldb::Slice in, string *prefix, string *key)
315 {
316 size_t prefix_len = 0;
317
318 // Find separator inside Slice
319 char* separator = (char*) memchr(in.data(), 0, in.size());
320 if (separator == NULL)
321 return -EINVAL;
322 prefix_len = size_t(separator - in.data());
323 if (prefix_len >= in.size())
324 return -EINVAL;
325
326 if (prefix)
327 *prefix = string(in.data(), prefix_len);
328 if (key)
329 *key = string(separator+1, in.size() - prefix_len - 1);
330 return 0;
331 }
332
// Synchronously compact the entire database; passing NULL for both
// range endpoints tells leveldb to cover the full key range.
void LevelDBStore::compact()
{
  logger->inc(l_leveldb_compact);
  db->CompactRange(NULL, NULL);
}
338
339
// Body of the background compaction thread: drain compact_queue,
// releasing the queue lock while the (slow) compact_range() runs so
// producers can keep enqueueing; sleep on the condvar when idle.
// Exits when compact_queue_stop is set (see close()).
void LevelDBStore::compact_thread_entry()
{
  compact_queue_lock.Lock();
  while (!compact_queue_stop) {
    while (!compact_queue.empty()) {
      pair<string,string> range = compact_queue.front();
      compact_queue.pop_front();
      logger->set(l_leveldb_compact_queue_len, compact_queue.size());
      // drop the lock during the actual compaction; the queue may be
      // modified concurrently while we work
      compact_queue_lock.Unlock();
      logger->inc(l_leveldb_compact_range);
      compact_range(range.first, range.second);
      compact_queue_lock.Lock();
      continue;
    }
    // queue is empty: wait until new work or stop is signalled
    compact_queue_cond.Wait(compact_queue_lock);
  }
  compact_queue_lock.Unlock();
}
358
// Queue an asynchronous compaction of [start, end], merging it with an
// already-queued overlapping range when possible, and lazily start the
// compaction thread on first use.
void LevelDBStore::compact_range_async(const string& start, const string& end)
{
  Mutex::Locker l(compact_queue_lock);

  // try to merge adjacent ranges. this is O(n), but the queue should
  // be short. note that we do not cover all overlap cases and merge
  // opportunities here, but we capture the ones we currently need.
  list< pair<string,string> >::iterator p = compact_queue.begin();
  while (p != compact_queue.end()) {
    if (p->first == start && p->second == end) {
      // dup; no-op
      return;
    }
    if (p->first <= end && p->first > start) {
      // merge with existing range to the right
      compact_queue.push_back(make_pair(start, p->second));
      compact_queue.erase(p);
      logger->inc(l_leveldb_compact_queue_merge);
      break;
    }
    if (p->second >= start && p->second < end) {
      // merge with existing range to the left
      compact_queue.push_back(make_pair(p->first, end));
      compact_queue.erase(p);
      logger->inc(l_leveldb_compact_queue_merge);
      break;
    }
    ++p;
  }
  if (p == compact_queue.end()) {
    // no merge, new entry.
    compact_queue.push_back(make_pair(start, end));
    logger->set(l_leveldb_compact_queue_len, compact_queue.size());
  }
  // NOTE(review): on the merge paths above the queue-length perf
  // counter is not refreshed (erase+push_back keeps the size equal,
  // so the stale value happens to remain correct).
  compact_queue_cond.Signal();
  if (!compact_thread.is_started()) {
    compact_thread.create("levdbst_compact");
  }
}