1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Inktank, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
#include "include/types.h"
#include "include/buffer.h"
#include <utility>
#include <boost/scoped_ptr.hpp>
#include "kv/KeyValueDB.h"
#include "include/ceph_assert.h"
#include "common/Formatter.h"
#include "common/Finisher.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/safe_io.h"
#include "common/blkdev.h"
#include "common/PriorityCache.h"
35 #define dout_context g_ceph_context
40 boost::scoped_ptr
<KeyValueDB
> db
;
43 std::ofstream dump_fd_json
;
44 JSONFormatter dump_fmt
;
53 string
get_devname() {
54 char devname
[4096] = {0}, partition
[4096];
55 get_device_by_path(path
.c_str(), partition
, devname
,
60 std::shared_ptr
<PriorityCache::PriCache
> get_priority_cache() const {
61 return db
->get_priority_cache();
72 Op(int t
, string p
, string k
)
73 : type(t
), prefix(p
), key(k
) { }
74 Op(int t
, const string
& p
, string k
, bufferlist
& b
)
75 : type(t
), prefix(p
), key(k
), bl(b
) { }
76 Op(int t
, const string
& p
, string start
, string end
)
77 : type(t
), prefix(p
), key(start
), endkey(end
) { }
79 void encode(bufferlist
& encode_bl
) const {
80 ENCODE_START(2, 1, encode_bl
);
81 encode(type
, encode_bl
);
82 encode(prefix
, encode_bl
);
83 encode(key
, encode_bl
);
84 encode(bl
, encode_bl
);
85 encode(endkey
, encode_bl
);
86 ENCODE_FINISH(encode_bl
);
89 void decode(bufferlist::const_iterator
& decode_bl
) {
90 DECODE_START(2, decode_bl
);
91 decode(type
, decode_bl
);
92 decode(prefix
, decode_bl
);
93 decode(key
, decode_bl
);
94 decode(bl
, decode_bl
);
96 decode(endkey
, decode_bl
);
97 DECODE_FINISH(decode_bl
);
100 void dump(Formatter
*f
) const {
101 f
->dump_int("type", type
);
102 f
->dump_string("prefix", prefix
);
103 f
->dump_string("key", key
);
105 f
->dump_string("endkey", endkey
);
108 static void generate_test_instances(list
<Op
*>& ls
) {
109 ls
.push_back(new Op
);
110 // we get coverage here from the Transaction instances
115 typedef std::shared_ptr
<Transaction
> TransactionRef
;
118 uint64_t bytes
, keys
;
120 Transaction() : bytes(0), keys(0) {}
128 void put(string prefix
, string key
, bufferlist
& bl
) {
129 ops
.push_back(Op(OP_PUT
, prefix
, key
, bl
));
131 bytes
+= prefix
.length() + key
.length() + bl
.length();
134 void put(string prefix
, version_t ver
, bufferlist
& bl
) {
137 put(prefix
, os
.str(), bl
);
140 void put(string prefix
, string key
, version_t ver
) {
144 put(prefix
, key
, bl
);
147 void erase(string prefix
, string key
) {
148 ops
.push_back(Op(OP_ERASE
, prefix
, key
));
150 bytes
+= prefix
.length() + key
.length();
153 void erase(string prefix
, version_t ver
) {
156 erase(prefix
, os
.str());
159 void compact_prefix(string prefix
) {
160 ops
.push_back(Op(OP_COMPACT
, prefix
, string()));
163 void compact_range(string prefix
, string start
, string end
) {
164 ops
.push_back(Op(OP_COMPACT
, prefix
, start
, end
));
167 void encode(bufferlist
& bl
) const {
168 ENCODE_START(2, 1, bl
);
175 void decode(bufferlist::const_iterator
& bl
) {
185 static void generate_test_instances(list
<Transaction
*>& ls
) {
186 ls
.push_back(new Transaction
);
187 ls
.push_back(new Transaction
);
190 ls
.back()->put("prefix", "key", bl
);
191 ls
.back()->erase("prefix2", "key2");
192 ls
.back()->compact_prefix("prefix3");
193 ls
.back()->compact_range("prefix4", "from", "to");
196 void append(TransactionRef other
) {
197 ops
.splice(ops
.end(), other
->ops
);
199 bytes
+= other
->bytes
;
202 void append_from_encoded(bufferlist
& bl
) {
203 auto other(std::make_shared
<Transaction
>());
204 auto it
= bl
.cbegin();
210 return (size() == 0);
213 size_t size() const {
216 uint64_t get_keys() const {
219 uint64_t get_bytes() const {
223 void dump(ceph::Formatter
*f
, bool dump_val
=false) const {
224 f
->open_object_section("transaction");
225 f
->open_array_section("ops");
226 list
<Op
>::const_iterator it
;
228 for (it
= ops
.begin(); it
!= ops
.end(); ++it
) {
230 f
->open_object_section("op");
231 f
->dump_int("op_num", op_num
++);
235 f
->dump_string("type", "PUT");
236 f
->dump_string("prefix", op
.prefix
);
237 f
->dump_string("key", op
.key
);
238 f
->dump_unsigned("length", op
.bl
.length());
242 f
->dump_string("bl", os
.str());
248 f
->dump_string("type", "ERASE");
249 f
->dump_string("prefix", op
.prefix
);
250 f
->dump_string("key", op
.key
);
255 f
->dump_string("type", "COMPACT");
256 f
->dump_string("prefix", op
.prefix
);
257 f
->dump_string("start", op
.key
);
258 f
->dump_string("end", op
.endkey
);
263 f
->dump_string("type", "unknown");
264 f
->dump_unsigned("op_code", op
.type
);
271 f
->dump_unsigned("num_keys", keys
);
272 f
->dump_unsigned("num_bytes", bytes
);
277 int apply_transaction(MonitorDBStore::TransactionRef t
) {
278 KeyValueDB::Transaction dbt
= db
->get_transaction();
281 if (!g_conf()->mon_debug_dump_json
) {
284 bl
.write_fd(dump_fd_binary
);
286 t
->dump(&dump_fmt
, true);
287 dump_fmt
.flush(dump_fd_json
);
288 dump_fd_json
.flush();
292 list
<pair
<string
, pair
<string
,string
> > > compact
;
293 for (list
<Op
>::const_iterator it
= t
->ops
.begin();
298 case Transaction::OP_PUT
:
299 dbt
->set(op
.prefix
, op
.key
, op
.bl
);
301 case Transaction::OP_ERASE
:
302 dbt
->rmkey(op
.prefix
, op
.key
);
304 case Transaction::OP_COMPACT
:
305 compact
.push_back(make_pair(op
.prefix
, make_pair(op
.key
, op
.endkey
)));
308 derr
<< __func__
<< " unknown op type " << op
.type
<< dendl
;
313 int r
= db
->submit_transaction_sync(dbt
);
315 while (!compact
.empty()) {
316 if (compact
.front().second
.first
== string() &&
317 compact
.front().second
.second
== string())
318 db
->compact_prefix_async(compact
.front().first
);
320 db
->compact_range_async(compact
.front().first
, compact
.front().second
.first
, compact
.front().second
.second
);
324 ceph_abort_msg("failed to write to db");
329 struct C_DoTransaction
: public Context
{
330 MonitorDBStore
*store
;
331 MonitorDBStore::TransactionRef t
;
333 C_DoTransaction(MonitorDBStore
*s
, MonitorDBStore::TransactionRef t
,
335 : store(s
), t(t
), oncommit(f
)
337 void finish(int r
) override
{
338 /* The store serializes writes. Each transaction is handled
339 * sequentially by the io_work Finisher. If a transaction takes longer
340 * to apply its state to permanent storage, then no other transaction
341 * will be handled meanwhile.
 * We now inject a delay of random length, with a configured probability.
 * Sleeping before the transaction is applied is safe: it won't break the model.
346 double delay_prob
= g_conf()->mon_inject_transaction_delay_probability
;
347 if (delay_prob
&& (rand() % 10000 < delay_prob
* 10000.0)) {
349 double delay_max
= g_conf()->mon_inject_transaction_delay_max
;
350 delay
.set_from_double(delay_max
* (double)(rand() % 10000) / 10000.0);
351 lsubdout(g_ceph_context
, mon
, 1)
352 << "apply_transaction will be delayed for " << delay
353 << " seconds" << dendl
;
356 int ret
= store
->apply_transaction(t
);
357 oncommit
->complete(ret
);
364 * Queue a transaction to commit asynchronously. Trigger a context
365 * on completion (without any locks held).
367 void queue_transaction(MonitorDBStore::TransactionRef t
,
369 io_work
.queue(new C_DoTransaction(this, t
, oncommit
));
373 * block and flush all io activity
376 io_work
.wait_for_empty();
379 class StoreIteratorImpl
{
382 pair
<string
,string
> last_key
;
385 StoreIteratorImpl() : done(false) { }
386 virtual ~StoreIteratorImpl() { }
388 bool add_chunk_entry(TransactionRef tx
,
393 auto tmp(std::make_shared
<Transaction
>());
395 tmp
->put(prefix
, key
, value
);
401 size_t len
= tx_bl
.length() + tmp_bl
.length();
403 if (!tx
->empty() && (len
> max
)) {
408 last_key
.first
= prefix
;
409 last_key
.second
= key
;
411 if (g_conf()->mon_sync_debug
) {
412 encode(prefix
, crc_bl
);
414 encode(value
, crc_bl
);
// Subclass hook: report whether the underlying iteration position is still
// valid.  has_next_chunk() combines this with the `done` flag.
virtual bool _is_valid() = 0;
424 if (g_conf()->mon_sync_debug
)
425 return crc_bl
.crc32c(0);
428 pair
<string
,string
> get_last_key() {
431 virtual bool has_next_chunk() {
432 return !done
&& _is_valid();
// Add the next chunk of store entries to `tx`.  The implementation in this
// file forwards `tx` and `max` to add_chunk_entry() for every entry it
// considers.
virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
// Return a (prefix, key) pair for an upcoming entry.  The implementation in
// this file returns a default-constructed (empty) pair when its iterator
// runs out of entries.
virtual pair<string,string> get_next_key() = 0;
437 typedef std::shared_ptr
<StoreIteratorImpl
> Synchronizer
;
439 class WholeStoreIteratorImpl
: public StoreIteratorImpl
{
440 KeyValueDB::WholeSpaceIterator iter
;
441 set
<string
> sync_prefixes
;
444 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter
,
445 set
<string
> &prefixes
)
446 : StoreIteratorImpl(),
448 sync_prefixes(prefixes
)
451 ~WholeStoreIteratorImpl() override
{ }
454 * Obtain a chunk of the store
 * @param tx  Transaction that receives the entries making up the chunk
 * @param max Limit handed to add_chunk_entry() with each entry, where it
 *            is compared against the combined buffer lengths
 *
 * The last prefix/key pair accepted into the chunk is recorded in
 * last_key by add_chunk_entry().
462 void get_chunk_tx(TransactionRef tx
, uint64_t max
) override
{
463 ceph_assert(done
== false);
464 ceph_assert(iter
->valid() == true);
466 while (iter
->valid()) {
467 string
prefix(iter
->raw_key().first
);
468 string
key(iter
->raw_key().second
);
469 if (sync_prefixes
.count(prefix
)) {
470 bufferlist value
= iter
->value();
471 if (!add_chunk_entry(tx
, prefix
, key
, value
, max
))
476 ceph_assert(iter
->valid() == false);
480 pair
<string
,string
> get_next_key() override
{
481 ceph_assert(iter
->valid());
483 for (; iter
->valid(); iter
->next()) {
484 pair
<string
,string
> r
= iter
->raw_key();
485 if (sync_prefixes
.count(r
.first
) > 0) {
490 return pair
<string
,string
>();
493 bool _is_valid() override
{
494 return iter
->valid();
498 Synchronizer
get_synchronizer(pair
<string
,string
> &key
,
499 set
<string
> &prefixes
) {
500 KeyValueDB::WholeSpaceIterator iter
;
501 iter
= db
->get_wholespace_iterator();
503 if (!key
.first
.empty() && !key
.second
.empty())
504 iter
->upper_bound(key
.first
, key
.second
);
506 iter
->seek_to_first();
508 return std::shared_ptr
<StoreIteratorImpl
>(
509 new WholeStoreIteratorImpl(iter
, prefixes
)
513 KeyValueDB::Iterator
get_iterator(const string
&prefix
) {
514 ceph_assert(!prefix
.empty());
515 KeyValueDB::Iterator iter
= db
->get_iterator(prefix
);
516 iter
->seek_to_first();
520 KeyValueDB::WholeSpaceIterator
get_iterator() {
521 KeyValueDB::WholeSpaceIterator iter
;
522 iter
= db
->get_wholespace_iterator();
523 iter
->seek_to_first();
527 int get(const string
& prefix
, const string
& key
, bufferlist
& bl
) {
528 ceph_assert(bl
.length() == 0);
529 return db
->get(prefix
, key
, &bl
);
532 int get(const string
& prefix
, const version_t ver
, bufferlist
& bl
) {
535 return get(prefix
, os
.str(), bl
);
538 version_t
get(const string
& prefix
, const string
& key
) {
540 int err
= get(prefix
, key
, bl
);
542 if (err
== -ENOENT
) // if key doesn't exist, assume its value is 0
544 // we're not expecting any other negative return value, and we can't
545 // just return a negative value if we're returning a version_t
546 generic_dout(0) << "MonitorDBStore::get() error obtaining"
547 << " (" << prefix
<< ":" << key
<< "): "
548 << cpp_strerror(err
) << dendl
;
549 ceph_abort_msg("error obtaining key");
552 ceph_assert(bl
.length());
554 auto p
= bl
.cbegin();
559 bool exists(const string
& prefix
, const string
& key
) {
560 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
561 int err
= it
->lower_bound(key
);
565 return (it
->valid() && it
->key() == key
);
568 bool exists(const string
& prefix
, version_t ver
) {
571 return exists(prefix
, os
.str());
574 string
combine_strings(const string
& prefix
, const string
& value
) {
581 string
combine_strings(const string
& prefix
, const version_t ver
) {
584 return combine_strings(prefix
, os
.str());
587 void clear(set
<string
>& prefixes
) {
588 set
<string
>::iterator iter
;
589 KeyValueDB::Transaction dbt
= db
->get_transaction();
591 for (iter
= prefixes
.begin(); iter
!= prefixes
.end(); ++iter
) {
592 dbt
->rmkeys_by_prefix((*iter
));
594 int r
= db
->submit_transaction_sync(dbt
);
598 void _open(string kv_type
) {
599 string::const_reverse_iterator rit
;
601 for (rit
= path
.rbegin(); rit
!= path
.rend(); ++rit
, ++pos
) {
606 os
<< path
.substr(0, path
.size() - pos
) << "/store.db";
607 string full_path
= os
.str();
609 KeyValueDB
*db_ptr
= KeyValueDB::create(g_ceph_context
,
613 derr
<< __func__
<< " error initializing "
614 << kv_type
<< " db back storage in "
615 << full_path
<< dendl
;
616 ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
620 if (g_conf()->mon_debug_dump_transactions
) {
621 if (!g_conf()->mon_debug_dump_json
) {
622 dump_fd_binary
= ::open(
623 g_conf()->mon_debug_dump_location
.c_str(),
624 O_CREAT
|O_APPEND
|O_WRONLY
|O_CLOEXEC
, 0644);
625 if (dump_fd_binary
< 0) {
626 dump_fd_binary
= -errno
;
627 derr
<< "Could not open log file, got "
628 << cpp_strerror(dump_fd_binary
) << dendl
;
632 dump_fmt
.open_array_section("dump");
633 dump_fd_json
.open(g_conf()->mon_debug_dump_location
.c_str());
637 if (kv_type
== "rocksdb")
638 db
->init(g_conf()->mon_rocksdb_options
);
645 int open(ostream
&out
) {
647 int r
= read_meta("kv_backend", &kv_type
);
648 if (r
< 0 || kv_type
.empty()) {
649 // assume old monitors that did not mark the type were leveldb.
651 r
= write_meta("kv_backend", kv_type
);
660 // Monitors are few in number, so the resource cost of exposing
661 // very detailed stats is low: ramp up the priority of all the
662 // KV store's perf counters. Do this after open, because backend may
663 // not have constructed PerfCounters earlier.
664 if (db
->get_perf_counters()) {
665 db
->get_perf_counters()->set_prio_adjust(
666 PerfCountersBuilder::PRIO_USEFUL
- PerfCountersBuilder::PRIO_DEBUGONLY
);
674 int create_and_open(ostream
&out
) {
675 // record the type before open
677 int r
= read_meta("kv_backend", &kv_type
);
679 kv_type
= g_conf()->mon_keyvaluedb
;
680 r
= write_meta("kv_backend", kv_type
);
685 r
= db
->create_and_open(out
);
694 // there should be no work queued!
704 void compact_async() {
708 void compact_prefix(const string
& prefix
) {
709 db
->compact_prefix(prefix
);
712 uint64_t get_estimated_size(map
<string
, uint64_t> &extras
) {
713 return db
->get_estimated_size(extras
);
717 * write_meta - write a simple configuration key out-of-band
719 * Write a simple key/value pair for basic store configuration
720 * (e.g., a uuid or magic number) to an unopened/unmounted store.
721 * The default implementation writes this to a plaintext file in the
724 * A newline is appended.
726 * @param key key name (e.g., "fsid")
727 * @param value value (e.g., a uuid rendered as a string)
728 * @returns 0 for success, or an error code
730 int write_meta(const std::string
& key
,
731 const std::string
& value
) const {
734 int r
= safe_write_file(path
.c_str(), key
.c_str(),
735 v
.c_str(), v
.length(),
743 * read_meta - read a simple configuration key out-of-band
 * Read a simple key/value pair from an unopened/unmounted store.
747 * Trailing whitespace is stripped off.
749 * @param key key name
750 * @param value pointer to value string
751 * @returns 0 for success, or an error code
753 int read_meta(const std::string
& key
,
754 std::string
*value
) const {
756 int r
= safe_read_file(path
.c_str(), key
.c_str(),
760 // drop trailing newlines
761 while (r
&& isspace(buf
[r
-1])) {
764 *value
= string(buf
, r
);
768 explicit MonitorDBStore(const string
& path
)
774 io_work(g_ceph_context
, "monstore", "fn_monstore"),
778 ceph_assert(!is_open
);
780 if (!g_conf()->mon_debug_dump_json
) {
781 ::close(dump_fd_binary
);
783 dump_fmt
.close_section();
784 dump_fmt
.flush(dump_fd_json
);
785 dump_fd_json
.flush();
786 dump_fd_json
.close();
793 WRITE_CLASS_ENCODER(MonitorDBStore::Op
)
794 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction
)
796 #endif /* CEPH_MONITOR_DB_STORE_H */