]> git.proxmox.com Git - ceph.git/blame - ceph/src/mon/MonitorDBStore.h
import ceph quincy 17.2.6
[ceph.git] / ceph / src / mon / MonitorDBStore.h
CommitLineData
7c673cae
FG
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2012 Inktank, Inc.
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.
 */
13#ifndef CEPH_MONITOR_DB_STORE_H
14#define CEPH_MONITOR_DB_STORE_H
15
#include "include/types.h"
#include "include/buffer.h"
#include <cctype>
#include <fstream>
#include <list>
#include <map>
#include <memory>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <boost/scoped_ptr.hpp>
#include "kv/KeyValueDB.h"

#include "include/ceph_assert.h"

#include "common/Formatter.h"
#include "common/Finisher.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/safe_io.h"
#include "common/blkdev.h"
#include "common/PriorityCache.h"
34
35#define dout_context g_ceph_context
36
37class MonitorDBStore
38{
f67539c2 39 std::string path;
7c673cae
FG
40 boost::scoped_ptr<KeyValueDB> db;
41 bool do_dump;
42 int dump_fd_binary;
43 std::ofstream dump_fd_json;
f67539c2 44 ceph::JSONFormatter dump_fmt;
7c673cae
FG
45
46
47 Finisher io_work;
48
49 bool is_open;
50
51 public:
52
f67539c2 53 std::string get_devname() {
11fdf7f2
TL
54 char devname[4096] = {0}, partition[4096];
55 get_device_by_path(path.c_str(), partition, devname,
56 sizeof(devname));
57 return devname;
58 }
59
20effc67
TL
60 std::string get_path() {
61 return path;
62 }
63
eafe8130
TL
64 std::shared_ptr<PriorityCache::PriCache> get_priority_cache() const {
65 return db->get_priority_cache();
66 }
67
7c673cae
FG
68 struct Op {
69 uint8_t type;
f67539c2
TL
70 std::string prefix;
71 std::string key, endkey;
72 ceph::buffer::list bl;
7c673cae
FG
73
74 Op()
75 : type(0) { }
f67539c2 76 Op(int t, const std::string& p, const std::string& k)
7c673cae 77 : type(t), prefix(p), key(k) { }
f67539c2 78 Op(int t, const std::string& p, const std::string& k, const ceph::buffer::list& b)
7c673cae 79 : type(t), prefix(p), key(k), bl(b) { }
f67539c2 80 Op(int t, const std::string& p, const std::string& start, const std::string& end)
7c673cae
FG
81 : type(t), prefix(p), key(start), endkey(end) { }
82
f67539c2 83 void encode(ceph::buffer::list& encode_bl) const {
7c673cae 84 ENCODE_START(2, 1, encode_bl);
11fdf7f2
TL
85 encode(type, encode_bl);
86 encode(prefix, encode_bl);
87 encode(key, encode_bl);
88 encode(bl, encode_bl);
89 encode(endkey, encode_bl);
7c673cae
FG
90 ENCODE_FINISH(encode_bl);
91 }
92
f67539c2 93 void decode(ceph::buffer::list::const_iterator& decode_bl) {
7c673cae 94 DECODE_START(2, decode_bl);
11fdf7f2
TL
95 decode(type, decode_bl);
96 decode(prefix, decode_bl);
97 decode(key, decode_bl);
98 decode(bl, decode_bl);
7c673cae 99 if (struct_v >= 2)
11fdf7f2 100 decode(endkey, decode_bl);
7c673cae
FG
101 DECODE_FINISH(decode_bl);
102 }
103
f67539c2 104 void dump(ceph::Formatter *f) const {
7c673cae
FG
105 f->dump_int("type", type);
106 f->dump_string("prefix", prefix);
107 f->dump_string("key", key);
9f95a23c 108 if (endkey.length()) {
7c673cae 109 f->dump_string("endkey", endkey);
9f95a23c
TL
110 }
111 }
112
113 int approx_size() const {
114 return 6 + 1 +
115 4 + prefix.size() +
116 4 + key.size() +
117 4 + endkey.size() +
118 4 + bl.length();
7c673cae
FG
119 }
120
f67539c2 121 static void generate_test_instances(std::list<Op*>& ls) {
7c673cae
FG
122 ls.push_back(new Op);
123 // we get coverage here from the Transaction instances
124 }
125 };
126
127 struct Transaction;
11fdf7f2 128 typedef std::shared_ptr<Transaction> TransactionRef;
7c673cae 129 struct Transaction {
f67539c2 130 std::list<Op> ops;
7c673cae
FG
131 uint64_t bytes, keys;
132
9f95a23c 133 Transaction() : bytes(6 + 4 + 8*2), keys(0) {}
7c673cae
FG
134
135 enum {
136 OP_PUT = 1,
137 OP_ERASE = 2,
138 OP_COMPACT = 3,
9f95a23c 139 OP_ERASE_RANGE = 4,
7c673cae
FG
140 };
141
f67539c2 142 void put(const std::string& prefix, const std::string& key, const ceph::buffer::list& bl) {
7c673cae
FG
143 ops.push_back(Op(OP_PUT, prefix, key, bl));
144 ++keys;
9f95a23c 145 bytes += ops.back().approx_size();
7c673cae
FG
146 }
147
f67539c2
TL
148 void put(const std::string& prefix, version_t ver, const ceph::buffer::list& bl) {
149 std::ostringstream os;
7c673cae
FG
150 os << ver;
151 put(prefix, os.str(), bl);
152 }
153
f67539c2 154 void put(const std::string& prefix, const std::string& key, version_t ver) {
11fdf7f2 155 using ceph::encode;
f67539c2 156 ceph::buffer::list bl;
11fdf7f2 157 encode(ver, bl);
7c673cae
FG
158 put(prefix, key, bl);
159 }
160
f67539c2 161 void erase(const std::string& prefix, const std::string& key) {
7c673cae
FG
162 ops.push_back(Op(OP_ERASE, prefix, key));
163 ++keys;
9f95a23c 164 bytes += ops.back().approx_size();
7c673cae
FG
165 }
166
f67539c2
TL
167 void erase(const std::string& prefix, version_t ver) {
168 std::ostringstream os;
7c673cae
FG
169 os << ver;
170 erase(prefix, os.str());
171 }
172
f67539c2
TL
173 void erase_range(const std::string& prefix, const std::string& begin,
174 const std::string& end) {
9f95a23c
TL
175 ops.push_back(Op(OP_ERASE_RANGE, prefix, begin, end));
176 ++keys;
177 bytes += ops.back().approx_size();
178 }
179
f67539c2
TL
180 void compact_prefix(const std::string& prefix) {
181 ops.push_back(Op(OP_COMPACT, prefix, {}));
7c673cae
FG
182 }
183
f67539c2
TL
184 void compact_range(const std::string& prefix, const std::string& start,
185 const std::string& end) {
7c673cae
FG
186 ops.push_back(Op(OP_COMPACT, prefix, start, end));
187 }
188
f67539c2 189 void encode(ceph::buffer::list& bl) const {
7c673cae 190 ENCODE_START(2, 1, bl);
11fdf7f2
TL
191 encode(ops, bl);
192 encode(bytes, bl);
193 encode(keys, bl);
7c673cae
FG
194 ENCODE_FINISH(bl);
195 }
196
f67539c2 197 void decode(ceph::buffer::list::const_iterator& bl) {
7c673cae 198 DECODE_START(2, bl);
11fdf7f2 199 decode(ops, bl);
7c673cae 200 if (struct_v >= 2) {
11fdf7f2
TL
201 decode(bytes, bl);
202 decode(keys, bl);
7c673cae
FG
203 }
204 DECODE_FINISH(bl);
205 }
206
f67539c2 207 static void generate_test_instances(std::list<Transaction*>& ls) {
7c673cae
FG
208 ls.push_back(new Transaction);
209 ls.push_back(new Transaction);
f67539c2 210 ceph::buffer::list bl;
7c673cae
FG
211 bl.append("value");
212 ls.back()->put("prefix", "key", bl);
213 ls.back()->erase("prefix2", "key2");
9f95a23c 214 ls.back()->erase_range("prefix3", "key3", "key4");
7c673cae
FG
215 ls.back()->compact_prefix("prefix3");
216 ls.back()->compact_range("prefix4", "from", "to");
217 }
218
219 void append(TransactionRef other) {
220 ops.splice(ops.end(), other->ops);
221 keys += other->keys;
222 bytes += other->bytes;
223 }
224
f67539c2 225 void append_from_encoded(ceph::buffer::list& bl) {
7c673cae 226 auto other(std::make_shared<Transaction>());
11fdf7f2 227 auto it = bl.cbegin();
7c673cae
FG
228 other->decode(it);
229 append(other);
230 }
231
232 bool empty() {
233 return (size() == 0);
234 }
235
236 size_t size() const {
237 return ops.size();
238 }
239 uint64_t get_keys() const {
240 return keys;
241 }
242 uint64_t get_bytes() const {
243 return bytes;
244 }
245
246 void dump(ceph::Formatter *f, bool dump_val=false) const {
247 f->open_object_section("transaction");
248 f->open_array_section("ops");
7c673cae 249 int op_num = 0;
f67539c2 250 for (auto it = ops.begin(); it != ops.end(); ++it) {
7c673cae
FG
251 const Op& op = *it;
252 f->open_object_section("op");
253 f->dump_int("op_num", op_num++);
254 switch (op.type) {
255 case OP_PUT:
256 {
257 f->dump_string("type", "PUT");
258 f->dump_string("prefix", op.prefix);
259 f->dump_string("key", op.key);
260 f->dump_unsigned("length", op.bl.length());
261 if (dump_val) {
f67539c2 262 std::ostringstream os;
7c673cae
FG
263 op.bl.hexdump(os);
264 f->dump_string("bl", os.str());
265 }
266 }
267 break;
268 case OP_ERASE:
269 {
270 f->dump_string("type", "ERASE");
271 f->dump_string("prefix", op.prefix);
272 f->dump_string("key", op.key);
273 }
274 break;
9f95a23c
TL
275 case OP_ERASE_RANGE:
276 {
277 f->dump_string("type", "ERASE_RANGE");
278 f->dump_string("prefix", op.prefix);
279 f->dump_string("start", op.key);
280 f->dump_string("end", op.endkey);
281 }
282 break;
7c673cae
FG
283 case OP_COMPACT:
284 {
285 f->dump_string("type", "COMPACT");
286 f->dump_string("prefix", op.prefix);
287 f->dump_string("start", op.key);
288 f->dump_string("end", op.endkey);
289 }
290 break;
291 default:
292 {
293 f->dump_string("type", "unknown");
294 f->dump_unsigned("op_code", op.type);
295 break;
296 }
297 }
298 f->close_section();
299 }
300 f->close_section();
301 f->dump_unsigned("num_keys", keys);
302 f->dump_unsigned("num_bytes", bytes);
303 f->close_section();
304 }
305 };
306
307 int apply_transaction(MonitorDBStore::TransactionRef t) {
308 KeyValueDB::Transaction dbt = db->get_transaction();
309
310 if (do_dump) {
11fdf7f2 311 if (!g_conf()->mon_debug_dump_json) {
f67539c2 312 ceph::buffer::list bl;
7c673cae
FG
313 t->encode(bl);
314 bl.write_fd(dump_fd_binary);
315 } else {
316 t->dump(&dump_fmt, true);
317 dump_fmt.flush(dump_fd_json);
318 dump_fd_json.flush();
319 }
320 }
321
f67539c2
TL
322 std::list<std::pair<std::string, std::pair<std::string,std::string>>> compact;
323 for (auto it = t->ops.begin(); it != t->ops.end(); ++it) {
7c673cae
FG
324 const Op& op = *it;
325 switch (op.type) {
326 case Transaction::OP_PUT:
327 dbt->set(op.prefix, op.key, op.bl);
328 break;
329 case Transaction::OP_ERASE:
330 dbt->rmkey(op.prefix, op.key);
331 break;
9f95a23c
TL
332 case Transaction::OP_ERASE_RANGE:
333 dbt->rm_range_keys(op.prefix, op.key, op.endkey);
334 break;
7c673cae
FG
335 case Transaction::OP_COMPACT:
336 compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
337 break;
338 default:
339 derr << __func__ << " unknown op type " << op.type << dendl;
340 ceph_abort();
341 break;
342 }
343 }
344 int r = db->submit_transaction_sync(dbt);
345 if (r >= 0) {
346 while (!compact.empty()) {
f67539c2
TL
347 if (compact.front().second.first == std::string() &&
348 compact.front().second.second == std::string())
7c673cae
FG
349 db->compact_prefix_async(compact.front().first);
350 else
351 db->compact_range_async(compact.front().first, compact.front().second.first, compact.front().second.second);
352 compact.pop_front();
353 }
354 } else {
11fdf7f2 355 ceph_abort_msg("failed to write to db");
7c673cae
FG
356 }
357 return r;
358 }
359
360 struct C_DoTransaction : public Context {
361 MonitorDBStore *store;
362 MonitorDBStore::TransactionRef t;
363 Context *oncommit;
364 C_DoTransaction(MonitorDBStore *s, MonitorDBStore::TransactionRef t,
365 Context *f)
366 : store(s), t(t), oncommit(f)
367 {}
368 void finish(int r) override {
369 /* The store serializes writes. Each transaction is handled
370 * sequentially by the io_work Finisher. If a transaction takes longer
371 * to apply its state to permanent storage, then no other transaction
372 * will be handled meanwhile.
373 *
374 * We will now randomly inject random delays. We can safely sleep prior
375 * to applying the transaction as it won't break the model.
376 */
11fdf7f2 377 double delay_prob = g_conf()->mon_inject_transaction_delay_probability;
7c673cae
FG
378 if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
379 utime_t delay;
11fdf7f2 380 double delay_max = g_conf()->mon_inject_transaction_delay_max;
7c673cae
FG
381 delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
382 lsubdout(g_ceph_context, mon, 1)
383 << "apply_transaction will be delayed for " << delay
384 << " seconds" << dendl;
385 delay.sleep();
386 }
387 int ret = store->apply_transaction(t);
388 oncommit->complete(ret);
389 }
390 };
391
392 /**
393 * queue transaction
394 *
395 * Queue a transaction to commit asynchronously. Trigger a context
396 * on completion (without any locks held).
397 */
398 void queue_transaction(MonitorDBStore::TransactionRef t,
399 Context *oncommit) {
400 io_work.queue(new C_DoTransaction(this, t, oncommit));
401 }
402
403 /**
404 * block and flush all io activity
405 */
406 void flush() {
407 io_work.wait_for_empty();
408 }
409
410 class StoreIteratorImpl {
411 protected:
412 bool done;
f67539c2
TL
413 std::pair<std::string,std::string> last_key;
414 ceph::buffer::list crc_bl;
7c673cae
FG
415
416 StoreIteratorImpl() : done(false) { }
417 virtual ~StoreIteratorImpl() { }
418
7c673cae
FG
419 virtual bool _is_valid() = 0;
420
421 public:
422 __u32 crc() {
11fdf7f2 423 if (g_conf()->mon_sync_debug)
7c673cae
FG
424 return crc_bl.crc32c(0);
425 return 0;
426 }
f67539c2 427 std::pair<std::string,std::string> get_last_key() {
7c673cae
FG
428 return last_key;
429 }
430 virtual bool has_next_chunk() {
431 return !done && _is_valid();
432 }
9f95a23c
TL
433 virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
434 uint64_t max_keys) = 0;
f67539c2 435 virtual std::pair<std::string,std::string> get_next_key() = 0;
7c673cae 436 };
11fdf7f2 437 typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
7c673cae
FG
438
439 class WholeStoreIteratorImpl : public StoreIteratorImpl {
440 KeyValueDB::WholeSpaceIterator iter;
f67539c2 441 std::set<std::string> sync_prefixes;
7c673cae
FG
442
443 public:
444 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter,
f67539c2 445 std::set<std::string> &prefixes)
7c673cae
FG
446 : StoreIteratorImpl(),
447 iter(iter),
448 sync_prefixes(prefixes)
449 { }
450
451 ~WholeStoreIteratorImpl() override { }
452
453 /**
454 * Obtain a chunk of the store
455 *
456 * @param bl Encoded transaction that will recreate the chunk
457 * @param first_key Pair containing the first key to obtain, and that
458 * will contain the first key in the chunk (that may
459 * differ from the one passed on to the function)
460 * @param last_key[out] Last key in the chunk
461 */
9f95a23c
TL
462 void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
463 uint64_t max_keys) override {
f67539c2 464 using ceph::encode;
11fdf7f2
TL
465 ceph_assert(done == false);
466 ceph_assert(iter->valid() == true);
7c673cae
FG
467
468 while (iter->valid()) {
f67539c2
TL
469 std::string prefix(iter->raw_key().first);
470 std::string key(iter->raw_key().second);
7c673cae 471 if (sync_prefixes.count(prefix)) {
f67539c2 472 ceph::buffer::list value = iter->value();
9f95a23c
TL
473 if (tx->empty() ||
474 (tx->get_bytes() + value.length() + key.size() +
475 prefix.size() < max_bytes &&
476 tx->get_keys() < max_keys)) {
477 // NOTE: putting every key in a separate transaction is
478 // questionable as far as efficiency goes
479 auto tmp(std::make_shared<Transaction>());
480 tmp->put(prefix, key, value);
481 tx->append(tmp);
482 if (g_conf()->mon_sync_debug) {
483 encode(prefix, crc_bl);
484 encode(key, crc_bl);
485 encode(value, crc_bl);
486 }
487 } else {
488 last_key.first = prefix;
489 last_key.second = key;
7c673cae 490 return;
9f95a23c 491 }
7c673cae
FG
492 }
493 iter->next();
494 }
11fdf7f2 495 ceph_assert(iter->valid() == false);
7c673cae
FG
496 done = true;
497 }
498
f67539c2 499 std::pair<std::string,std::string> get_next_key() override {
11fdf7f2 500 ceph_assert(iter->valid());
7c673cae
FG
501
502 for (; iter->valid(); iter->next()) {
f67539c2 503 std::pair<std::string,std::string> r = iter->raw_key();
7c673cae
FG
504 if (sync_prefixes.count(r.first) > 0) {
505 iter->next();
506 return r;
507 }
508 }
f67539c2 509 return std::pair<std::string,std::string>();
7c673cae
FG
510 }
511
512 bool _is_valid() override {
513 return iter->valid();
514 }
515 };
516
f67539c2
TL
517 Synchronizer get_synchronizer(std::pair<std::string,std::string> &key,
518 std::set<std::string> &prefixes) {
7c673cae 519 KeyValueDB::WholeSpaceIterator iter;
11fdf7f2 520 iter = db->get_wholespace_iterator();
7c673cae
FG
521
522 if (!key.first.empty() && !key.second.empty())
523 iter->upper_bound(key.first, key.second);
524 else
525 iter->seek_to_first();
526
11fdf7f2 527 return std::shared_ptr<StoreIteratorImpl>(
7c673cae
FG
528 new WholeStoreIteratorImpl(iter, prefixes)
529 );
530 }
531
f67539c2 532 KeyValueDB::Iterator get_iterator(const std::string &prefix) {
11fdf7f2 533 ceph_assert(!prefix.empty());
7c673cae
FG
534 KeyValueDB::Iterator iter = db->get_iterator(prefix);
535 iter->seek_to_first();
536 return iter;
537 }
538
539 KeyValueDB::WholeSpaceIterator get_iterator() {
540 KeyValueDB::WholeSpaceIterator iter;
11fdf7f2 541 iter = db->get_wholespace_iterator();
7c673cae
FG
542 iter->seek_to_first();
543 return iter;
544 }
545
f67539c2 546 int get(const std::string& prefix, const std::string& key, ceph::buffer::list& bl) {
11fdf7f2 547 ceph_assert(bl.length() == 0);
7c673cae
FG
548 return db->get(prefix, key, &bl);
549 }
550
f67539c2
TL
551 int get(const std::string& prefix, const version_t ver, ceph::buffer::list& bl) {
552 std::ostringstream os;
7c673cae
FG
553 os << ver;
554 return get(prefix, os.str(), bl);
555 }
556
f67539c2
TL
557 version_t get(const std::string& prefix, const std::string& key) {
558 using ceph::decode;
559 ceph::buffer::list bl;
7c673cae
FG
560 int err = get(prefix, key, bl);
561 if (err < 0) {
562 if (err == -ENOENT) // if key doesn't exist, assume its value is 0
563 return 0;
564 // we're not expecting any other negative return value, and we can't
565 // just return a negative value if we're returning a version_t
566 generic_dout(0) << "MonitorDBStore::get() error obtaining"
567 << " (" << prefix << ":" << key << "): "
568 << cpp_strerror(err) << dendl;
11fdf7f2 569 ceph_abort_msg("error obtaining key");
7c673cae
FG
570 }
571
11fdf7f2 572 ceph_assert(bl.length());
7c673cae 573 version_t ver;
11fdf7f2
TL
574 auto p = bl.cbegin();
575 decode(ver, p);
7c673cae
FG
576 return ver;
577 }
578
f67539c2 579 bool exists(const std::string& prefix, const std::string& key) {
7c673cae
FG
580 KeyValueDB::Iterator it = db->get_iterator(prefix);
581 int err = it->lower_bound(key);
582 if (err < 0)
583 return false;
584
585 return (it->valid() && it->key() == key);
586 }
587
f67539c2
TL
588 bool exists(const std::string& prefix, version_t ver) {
589 std::ostringstream os;
7c673cae
FG
590 os << ver;
591 return exists(prefix, os.str());
592 }
593
f67539c2
TL
594 std::string combine_strings(const std::string& prefix, const std::string& value) {
595 std::string out = prefix;
7c673cae
FG
596 out.push_back('_');
597 out.append(value);
598 return out;
599 }
600
f67539c2
TL
601 std::string combine_strings(const std::string& prefix, const version_t ver) {
602 std::ostringstream os;
7c673cae
FG
603 os << ver;
604 return combine_strings(prefix, os.str());
605 }
606
f67539c2 607 void clear(std::set<std::string>& prefixes) {
7c673cae
FG
608 KeyValueDB::Transaction dbt = db->get_transaction();
609
f67539c2 610 for (auto iter = prefixes.begin(); iter != prefixes.end(); ++iter) {
7c673cae
FG
611 dbt->rmkeys_by_prefix((*iter));
612 }
613 int r = db->submit_transaction_sync(dbt);
11fdf7f2 614 ceph_assert(r >= 0);
7c673cae
FG
615 }
616
f67539c2 617 void _open(const std::string& kv_type) {
7c673cae 618 int pos = 0;
f67539c2 619 for (auto rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {
7c673cae
FG
620 if (*rit != '/')
621 break;
622 }
f67539c2 623 std::ostringstream os;
7c673cae 624 os << path.substr(0, path.size() - pos) << "/store.db";
f67539c2 625 std::string full_path = os.str();
7c673cae
FG
626
627 KeyValueDB *db_ptr = KeyValueDB::create(g_ceph_context,
628 kv_type,
629 full_path);
630 if (!db_ptr) {
631 derr << __func__ << " error initializing "
632 << kv_type << " db back storage in "
633 << full_path << dendl;
11fdf7f2 634 ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
7c673cae
FG
635 }
636 db.reset(db_ptr);
637
11fdf7f2
TL
638 if (g_conf()->mon_debug_dump_transactions) {
639 if (!g_conf()->mon_debug_dump_json) {
7c673cae 640 dump_fd_binary = ::open(
11fdf7f2 641 g_conf()->mon_debug_dump_location.c_str(),
91327a77 642 O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644);
7c673cae
FG
643 if (dump_fd_binary < 0) {
644 dump_fd_binary = -errno;
645 derr << "Could not open log file, got "
646 << cpp_strerror(dump_fd_binary) << dendl;
647 }
648 } else {
649 dump_fmt.reset();
650 dump_fmt.open_array_section("dump");
11fdf7f2 651 dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str());
7c673cae
FG
652 }
653 do_dump = true;
654 }
655 if (kv_type == "rocksdb")
11fdf7f2 656 db->init(g_conf()->mon_rocksdb_options);
7c673cae
FG
657 else
658 db->init();
3efd9988
FG
659
660
7c673cae
FG
661 }
662
f67539c2
TL
663 int open(std::ostream &out) {
664 std::string kv_type;
7c673cae 665 int r = read_meta("kv_backend", &kv_type);
31f18b77
FG
666 if (r < 0 || kv_type.empty()) {
667 // assume old monitors that did not mark the type were leveldb.
7c673cae 668 kv_type = "leveldb";
31f18b77
FG
669 r = write_meta("kv_backend", kv_type);
670 if (r < 0)
671 return r;
672 }
7c673cae
FG
673 _open(kv_type);
674 r = db->open(out);
675 if (r < 0)
676 return r;
3efd9988
FG
677
678 // Monitors are few in number, so the resource cost of exposing
679 // very detailed stats is low: ramp up the priority of all the
680 // KV store's perf counters. Do this after open, because backend may
681 // not have constructed PerfCounters earlier.
682 if (db->get_perf_counters()) {
683 db->get_perf_counters()->set_prio_adjust(
684 PerfCountersBuilder::PRIO_USEFUL - PerfCountersBuilder::PRIO_DEBUGONLY);
685 }
686
7c673cae
FG
687 io_work.start();
688 is_open = true;
689 return 0;
690 }
691
f67539c2 692 int create_and_open(std::ostream &out) {
7c673cae 693 // record the type before open
f67539c2 694 std::string kv_type;
7c673cae
FG
695 int r = read_meta("kv_backend", &kv_type);
696 if (r < 0) {
11fdf7f2 697 kv_type = g_conf()->mon_keyvaluedb;
7c673cae
FG
698 r = write_meta("kv_backend", kv_type);
699 if (r < 0)
700 return r;
701 }
702 _open(kv_type);
703 r = db->create_and_open(out);
704 if (r < 0)
705 return r;
706 io_work.start();
707 is_open = true;
708 return 0;
709 }
710
711 void close() {
712 // there should be no work queued!
713 io_work.stop();
714 is_open = false;
715 db.reset(NULL);
716 }
717
718 void compact() {
719 db->compact();
720 }
721
11fdf7f2
TL
722 void compact_async() {
723 db->compact_async();
724 }
725
f67539c2 726 void compact_prefix(const std::string& prefix) {
7c673cae
FG
727 db->compact_prefix(prefix);
728 }
729
f67539c2 730 uint64_t get_estimated_size(std::map<std::string, uint64_t> &extras) {
7c673cae
FG
731 return db->get_estimated_size(extras);
732 }
733
734 /**
735 * write_meta - write a simple configuration key out-of-band
736 *
737 * Write a simple key/value pair for basic store configuration
738 * (e.g., a uuid or magic number) to an unopened/unmounted store.
739 * The default implementation writes this to a plaintext file in the
740 * path.
741 *
742 * A newline is appended.
743 *
744 * @param key key name (e.g., "fsid")
745 * @param value value (e.g., a uuid rendered as a string)
746 * @returns 0 for success, or an error code
747 */
748 int write_meta(const std::string& key,
749 const std::string& value) const {
f67539c2 750 std::string v = value;
7c673cae
FG
751 v += "\n";
752 int r = safe_write_file(path.c_str(), key.c_str(),
eafe8130
TL
753 v.c_str(), v.length(),
754 0600);
7c673cae
FG
755 if (r < 0)
756 return r;
757 return 0;
758 }
759
760 /**
761 * read_meta - read a simple configuration key out-of-band
762 *
763 * Read a simple key value to an unopened/mounted store.
764 *
765 * Trailing whitespace is stripped off.
766 *
767 * @param key key name
768 * @param value pointer to value string
769 * @returns 0 for success, or an error code
770 */
771 int read_meta(const std::string& key,
772 std::string *value) const {
773 char buf[4096];
774 int r = safe_read_file(path.c_str(), key.c_str(),
775 buf, sizeof(buf));
776 if (r <= 0)
777 return r;
778 // drop trailing newlines
779 while (r && isspace(buf[r-1])) {
780 --r;
781 }
f67539c2 782 *value = std::string(buf, r);
7c673cae
FG
783 return 0;
784 }
785
f67539c2 786 explicit MonitorDBStore(const std::string& path)
7c673cae
FG
787 : path(path),
788 db(0),
789 do_dump(false),
790 dump_fd_binary(-1),
791 dump_fmt(true),
792 io_work(g_ceph_context, "monstore", "fn_monstore"),
793 is_open(false) {
794 }
795 ~MonitorDBStore() {
11fdf7f2 796 ceph_assert(!is_open);
7c673cae 797 if (do_dump) {
11fdf7f2 798 if (!g_conf()->mon_debug_dump_json) {
7c673cae
FG
799 ::close(dump_fd_binary);
800 } else {
801 dump_fmt.close_section();
802 dump_fmt.flush(dump_fd_json);
803 dump_fd_json.flush();
804 dump_fd_json.close();
805 }
806 }
807 }
808
809};
810
811WRITE_CLASS_ENCODER(MonitorDBStore::Op)
812WRITE_CLASS_ENCODER(MonitorDBStore::Transaction)
813
814#endif /* CEPH_MONITOR_DB_STORE_H */