1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Inktank, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
16 #include "include/types.h"
17 #include "include/buffer.h"
21 #include <boost/scoped_ptr.hpp>
24 #include "kv/KeyValueDB.h"
26 #include "include/ceph_assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
32 #include "common/blkdev.h"
33 #include "common/PriorityCache.h"
35 #define dout_context g_ceph_context
40 boost::scoped_ptr
<KeyValueDB
> db
;
43 std::ofstream dump_fd_json
;
44 ceph::JSONFormatter dump_fmt
;
53 std::string
get_devname() {
54 char devname
[4096] = {0}, partition
[4096];
55 get_device_by_path(path
.c_str(), partition
, devname
,
60 std::shared_ptr
<PriorityCache::PriCache
> get_priority_cache() const {
61 return db
->get_priority_cache();
67 std::string key
, endkey
;
68 ceph::buffer::list bl
;
72 Op(int t
, const std::string
& p
, const std::string
& k
)
73 : type(t
), prefix(p
), key(k
) { }
74 Op(int t
, const std::string
& p
, const std::string
& k
, const ceph::buffer::list
& b
)
75 : type(t
), prefix(p
), key(k
), bl(b
) { }
76 Op(int t
, const std::string
& p
, const std::string
& start
, const std::string
& end
)
77 : type(t
), prefix(p
), key(start
), endkey(end
) { }
79 void encode(ceph::buffer::list
& encode_bl
) const {
80 ENCODE_START(2, 1, encode_bl
);
81 encode(type
, encode_bl
);
82 encode(prefix
, encode_bl
);
83 encode(key
, encode_bl
);
84 encode(bl
, encode_bl
);
85 encode(endkey
, encode_bl
);
86 ENCODE_FINISH(encode_bl
);
89 void decode(ceph::buffer::list::const_iterator
& decode_bl
) {
90 DECODE_START(2, decode_bl
);
91 decode(type
, decode_bl
);
92 decode(prefix
, decode_bl
);
93 decode(key
, decode_bl
);
94 decode(bl
, decode_bl
);
96 decode(endkey
, decode_bl
);
97 DECODE_FINISH(decode_bl
);
100 void dump(ceph::Formatter
*f
) const {
101 f
->dump_int("type", type
);
102 f
->dump_string("prefix", prefix
);
103 f
->dump_string("key", key
);
104 if (endkey
.length()) {
105 f
->dump_string("endkey", endkey
);
109 int approx_size() const {
117 static void generate_test_instances(std::list
<Op
*>& ls
) {
118 ls
.push_back(new Op
);
119 // we get coverage here from the Transaction instances
124 typedef std::shared_ptr
<Transaction
> TransactionRef
;
127 uint64_t bytes
, keys
;
129 Transaction() : bytes(6 + 4 + 8*2), keys(0) {}
138 void put(const std::string
& prefix
, const std::string
& key
, const ceph::buffer::list
& bl
) {
139 ops
.push_back(Op(OP_PUT
, prefix
, key
, bl
));
141 bytes
+= ops
.back().approx_size();
144 void put(const std::string
& prefix
, version_t ver
, const ceph::buffer::list
& bl
) {
145 std::ostringstream os
;
147 put(prefix
, os
.str(), bl
);
150 void put(const std::string
& prefix
, const std::string
& key
, version_t ver
) {
152 ceph::buffer::list bl
;
154 put(prefix
, key
, bl
);
157 void erase(const std::string
& prefix
, const std::string
& key
) {
158 ops
.push_back(Op(OP_ERASE
, prefix
, key
));
160 bytes
+= ops
.back().approx_size();
163 void erase(const std::string
& prefix
, version_t ver
) {
164 std::ostringstream os
;
166 erase(prefix
, os
.str());
169 void erase_range(const std::string
& prefix
, const std::string
& begin
,
170 const std::string
& end
) {
171 ops
.push_back(Op(OP_ERASE_RANGE
, prefix
, begin
, end
));
173 bytes
+= ops
.back().approx_size();
176 void compact_prefix(const std::string
& prefix
) {
177 ops
.push_back(Op(OP_COMPACT
, prefix
, {}));
180 void compact_range(const std::string
& prefix
, const std::string
& start
,
181 const std::string
& end
) {
182 ops
.push_back(Op(OP_COMPACT
, prefix
, start
, end
));
185 void encode(ceph::buffer::list
& bl
) const {
186 ENCODE_START(2, 1, bl
);
193 void decode(ceph::buffer::list::const_iterator
& bl
) {
203 static void generate_test_instances(std::list
<Transaction
*>& ls
) {
204 ls
.push_back(new Transaction
);
205 ls
.push_back(new Transaction
);
206 ceph::buffer::list bl
;
208 ls
.back()->put("prefix", "key", bl
);
209 ls
.back()->erase("prefix2", "key2");
210 ls
.back()->erase_range("prefix3", "key3", "key4");
211 ls
.back()->compact_prefix("prefix3");
212 ls
.back()->compact_range("prefix4", "from", "to");
215 void append(TransactionRef other
) {
216 ops
.splice(ops
.end(), other
->ops
);
218 bytes
+= other
->bytes
;
221 void append_from_encoded(ceph::buffer::list
& bl
) {
222 auto other(std::make_shared
<Transaction
>());
223 auto it
= bl
.cbegin();
229 return (size() == 0);
232 size_t size() const {
235 uint64_t get_keys() const {
238 uint64_t get_bytes() const {
242 void dump(ceph::Formatter
*f
, bool dump_val
=false) const {
243 f
->open_object_section("transaction");
244 f
->open_array_section("ops");
246 for (auto it
= ops
.begin(); it
!= ops
.end(); ++it
) {
248 f
->open_object_section("op");
249 f
->dump_int("op_num", op_num
++);
253 f
->dump_string("type", "PUT");
254 f
->dump_string("prefix", op
.prefix
);
255 f
->dump_string("key", op
.key
);
256 f
->dump_unsigned("length", op
.bl
.length());
258 std::ostringstream os
;
260 f
->dump_string("bl", os
.str());
266 f
->dump_string("type", "ERASE");
267 f
->dump_string("prefix", op
.prefix
);
268 f
->dump_string("key", op
.key
);
273 f
->dump_string("type", "ERASE_RANGE");
274 f
->dump_string("prefix", op
.prefix
);
275 f
->dump_string("start", op
.key
);
276 f
->dump_string("end", op
.endkey
);
281 f
->dump_string("type", "COMPACT");
282 f
->dump_string("prefix", op
.prefix
);
283 f
->dump_string("start", op
.key
);
284 f
->dump_string("end", op
.endkey
);
289 f
->dump_string("type", "unknown");
290 f
->dump_unsigned("op_code", op
.type
);
297 f
->dump_unsigned("num_keys", keys
);
298 f
->dump_unsigned("num_bytes", bytes
);
303 int apply_transaction(MonitorDBStore::TransactionRef t
) {
304 KeyValueDB::Transaction dbt
= db
->get_transaction();
307 if (!g_conf()->mon_debug_dump_json
) {
308 ceph::buffer::list bl
;
310 bl
.write_fd(dump_fd_binary
);
312 t
->dump(&dump_fmt
, true);
313 dump_fmt
.flush(dump_fd_json
);
314 dump_fd_json
.flush();
318 std::list
<std::pair
<std::string
, std::pair
<std::string
,std::string
>>> compact
;
319 for (auto it
= t
->ops
.begin(); it
!= t
->ops
.end(); ++it
) {
322 case Transaction::OP_PUT
:
323 dbt
->set(op
.prefix
, op
.key
, op
.bl
);
325 case Transaction::OP_ERASE
:
326 dbt
->rmkey(op
.prefix
, op
.key
);
328 case Transaction::OP_ERASE_RANGE
:
329 dbt
->rm_range_keys(op
.prefix
, op
.key
, op
.endkey
);
331 case Transaction::OP_COMPACT
:
332 compact
.push_back(make_pair(op
.prefix
, make_pair(op
.key
, op
.endkey
)));
335 derr
<< __func__
<< " unknown op type " << op
.type
<< dendl
;
340 int r
= db
->submit_transaction_sync(dbt
);
342 while (!compact
.empty()) {
343 if (compact
.front().second
.first
== std::string() &&
344 compact
.front().second
.second
== std::string())
345 db
->compact_prefix_async(compact
.front().first
);
347 db
->compact_range_async(compact
.front().first
, compact
.front().second
.first
, compact
.front().second
.second
);
351 ceph_abort_msg("failed to write to db");
356 struct C_DoTransaction
: public Context
{
357 MonitorDBStore
*store
;
358 MonitorDBStore::TransactionRef t
;
360 C_DoTransaction(MonitorDBStore
*s
, MonitorDBStore::TransactionRef t
,
362 : store(s
), t(t
), oncommit(f
)
364 void finish(int r
) override
{
365 /* The store serializes writes. Each transaction is handled
366 * sequentially by the io_work Finisher. If a transaction takes longer
367 * to apply its state to permanent storage, then no other transaction
368 * will be handled meanwhile.
370 * With a configured probability we now inject a delay of random length.
371 * We can safely sleep prior to applying the transaction as it won't break the model.
373 double delay_prob
= g_conf()->mon_inject_transaction_delay_probability
;
374 if (delay_prob
&& (rand() % 10000 < delay_prob
* 10000.0)) {
376 double delay_max
= g_conf()->mon_inject_transaction_delay_max
;
377 delay
.set_from_double(delay_max
* (double)(rand() % 10000) / 10000.0);
378 lsubdout(g_ceph_context
, mon
, 1)
379 << "apply_transaction will be delayed for " << delay
380 << " seconds" << dendl
;
383 int ret
= store
->apply_transaction(t
);
384 oncommit
->complete(ret
);
391 * Queue a transaction to commit asynchronously. Trigger a context
392 * on completion (without any locks held).
394 void queue_transaction(MonitorDBStore::TransactionRef t
,
396 io_work
.queue(new C_DoTransaction(this, t
, oncommit
));
400 * block and flush all io activity
403 io_work
.wait_for_empty();
406 class StoreIteratorImpl
{
409 std::pair
<std::string
,std::string
> last_key
;
410 ceph::buffer::list crc_bl
;
412 StoreIteratorImpl() : done(false) { }
413 virtual ~StoreIteratorImpl() { }
415 virtual bool _is_valid() = 0;
419 if (g_conf()->mon_sync_debug
)
420 return crc_bl
.crc32c(0);
423 std::pair
<std::string
,std::string
> get_last_key() {
426 virtual bool has_next_chunk() {
427 return !done
&& _is_valid();
429 virtual void get_chunk_tx(TransactionRef tx
, uint64_t max_bytes
,
430 uint64_t max_keys
) = 0;
431 virtual std::pair
<std::string
,std::string
> get_next_key() = 0;
433 typedef std::shared_ptr
<StoreIteratorImpl
> Synchronizer
;
435 class WholeStoreIteratorImpl
: public StoreIteratorImpl
{
436 KeyValueDB::WholeSpaceIterator iter
;
437 std::set
<std::string
> sync_prefixes
;
440 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter
,
441 std::set
<std::string
> &prefixes
)
442 : StoreIteratorImpl(),
444 sync_prefixes(prefixes
)
447 ~WholeStoreIteratorImpl() override
{ }
450 * Obtain a chunk of the store
452 * @param tx Transaction that will recreate the chunk
453 * @param max_bytes Byte budget checked against the transaction before
454 * each key is added
455 * @param max_keys Key-count budget checked against the transaction
456 * before each key is added; the last key added is kept in last_key
458 void get_chunk_tx(TransactionRef tx
, uint64_t max_bytes
,
459 uint64_t max_keys
) override
{
461 ceph_assert(done
== false);
462 ceph_assert(iter
->valid() == true);
464 while (iter
->valid()) {
465 std::string
prefix(iter
->raw_key().first
);
466 std::string
key(iter
->raw_key().second
);
467 if (sync_prefixes
.count(prefix
)) {
468 ceph::buffer::list value
= iter
->value();
470 (tx
->get_bytes() + value
.length() + key
.size() +
471 prefix
.size() < max_bytes
&&
472 tx
->get_keys() < max_keys
)) {
473 // NOTE: putting every key in a separate transaction is
474 // questionable as far as efficiency goes
475 auto tmp(std::make_shared
<Transaction
>());
476 tmp
->put(prefix
, key
, value
);
478 if (g_conf()->mon_sync_debug
) {
479 encode(prefix
, crc_bl
);
481 encode(value
, crc_bl
);
484 last_key
.first
= prefix
;
485 last_key
.second
= key
;
491 ceph_assert(iter
->valid() == false);
495 std::pair
<std::string
,std::string
> get_next_key() override
{
496 ceph_assert(iter
->valid());
498 for (; iter
->valid(); iter
->next()) {
499 std::pair
<std::string
,std::string
> r
= iter
->raw_key();
500 if (sync_prefixes
.count(r
.first
) > 0) {
505 return std::pair
<std::string
,std::string
>();
508 bool _is_valid() override
{
509 return iter
->valid();
513 Synchronizer
get_synchronizer(std::pair
<std::string
,std::string
> &key
,
514 std::set
<std::string
> &prefixes
) {
515 KeyValueDB::WholeSpaceIterator iter
;
516 iter
= db
->get_wholespace_iterator();
518 if (!key
.first
.empty() && !key
.second
.empty())
519 iter
->upper_bound(key
.first
, key
.second
);
521 iter
->seek_to_first();
523 return std::shared_ptr
<StoreIteratorImpl
>(
524 new WholeStoreIteratorImpl(iter
, prefixes
)
528 KeyValueDB::Iterator
get_iterator(const std::string
&prefix
) {
529 ceph_assert(!prefix
.empty());
530 KeyValueDB::Iterator iter
= db
->get_iterator(prefix
);
531 iter
->seek_to_first();
535 KeyValueDB::WholeSpaceIterator
get_iterator() {
536 KeyValueDB::WholeSpaceIterator iter
;
537 iter
= db
->get_wholespace_iterator();
538 iter
->seek_to_first();
542 int get(const std::string
& prefix
, const std::string
& key
, ceph::buffer::list
& bl
) {
543 ceph_assert(bl
.length() == 0);
544 return db
->get(prefix
, key
, &bl
);
547 int get(const std::string
& prefix
, const version_t ver
, ceph::buffer::list
& bl
) {
548 std::ostringstream os
;
550 return get(prefix
, os
.str(), bl
);
553 version_t
get(const std::string
& prefix
, const std::string
& key
) {
555 ceph::buffer::list bl
;
556 int err
= get(prefix
, key
, bl
);
558 if (err
== -ENOENT
) // if key doesn't exist, assume its value is 0
560 // we're not expecting any other negative return value, and we can't
561 // just return a negative value if we're returning a version_t
562 generic_dout(0) << "MonitorDBStore::get() error obtaining"
563 << " (" << prefix
<< ":" << key
<< "): "
564 << cpp_strerror(err
) << dendl
;
565 ceph_abort_msg("error obtaining key");
568 ceph_assert(bl
.length());
570 auto p
= bl
.cbegin();
575 bool exists(const std::string
& prefix
, const std::string
& key
) {
576 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
577 int err
= it
->lower_bound(key
);
581 return (it
->valid() && it
->key() == key
);
584 bool exists(const std::string
& prefix
, version_t ver
) {
585 std::ostringstream os
;
587 return exists(prefix
, os
.str());
590 std::string
combine_strings(const std::string
& prefix
, const std::string
& value
) {
591 std::string out
= prefix
;
597 std::string
combine_strings(const std::string
& prefix
, const version_t ver
) {
598 std::ostringstream os
;
600 return combine_strings(prefix
, os
.str());
603 void clear(std::set
<std::string
>& prefixes
) {
604 KeyValueDB::Transaction dbt
= db
->get_transaction();
606 for (auto iter
= prefixes
.begin(); iter
!= prefixes
.end(); ++iter
) {
607 dbt
->rmkeys_by_prefix((*iter
));
609 int r
= db
->submit_transaction_sync(dbt
);
613 void _open(const std::string
& kv_type
) {
615 for (auto rit
= path
.rbegin(); rit
!= path
.rend(); ++rit
, ++pos
) {
619 std::ostringstream os
;
620 os
<< path
.substr(0, path
.size() - pos
) << "/store.db";
621 std::string full_path
= os
.str();
623 KeyValueDB
*db_ptr
= KeyValueDB::create(g_ceph_context
,
627 derr
<< __func__
<< " error initializing "
628 << kv_type
<< " db back storage in "
629 << full_path
<< dendl
;
630 ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
634 if (g_conf()->mon_debug_dump_transactions
) {
635 if (!g_conf()->mon_debug_dump_json
) {
636 dump_fd_binary
= ::open(
637 g_conf()->mon_debug_dump_location
.c_str(),
638 O_CREAT
|O_APPEND
|O_WRONLY
|O_CLOEXEC
, 0644);
639 if (dump_fd_binary
< 0) {
640 dump_fd_binary
= -errno
;
641 derr
<< "Could not open log file, got "
642 << cpp_strerror(dump_fd_binary
) << dendl
;
646 dump_fmt
.open_array_section("dump");
647 dump_fd_json
.open(g_conf()->mon_debug_dump_location
.c_str());
651 if (kv_type
== "rocksdb")
652 db
->init(g_conf()->mon_rocksdb_options
);
659 int open(std::ostream
&out
) {
661 int r
= read_meta("kv_backend", &kv_type
);
662 if (r
< 0 || kv_type
.empty()) {
663 // assume old monitors that did not mark the type were leveldb.
665 r
= write_meta("kv_backend", kv_type
);
674 // Monitors are few in number, so the resource cost of exposing
675 // very detailed stats is low: ramp up the priority of all the
676 // KV store's perf counters. Do this after open, because backend may
677 // not have constructed PerfCounters earlier.
678 if (db
->get_perf_counters()) {
679 db
->get_perf_counters()->set_prio_adjust(
680 PerfCountersBuilder::PRIO_USEFUL
- PerfCountersBuilder::PRIO_DEBUGONLY
);
688 int create_and_open(std::ostream
&out
) {
689 // record the type before open
691 int r
= read_meta("kv_backend", &kv_type
);
693 kv_type
= g_conf()->mon_keyvaluedb
;
694 r
= write_meta("kv_backend", kv_type
);
699 r
= db
->create_and_open(out
);
708 // there should be no work queued!
718 void compact_async() {
722 void compact_prefix(const std::string
& prefix
) {
723 db
->compact_prefix(prefix
);
726 uint64_t get_estimated_size(std::map
<std::string
, uint64_t> &extras
) {
727 return db
->get_estimated_size(extras
);
731 * write_meta - write a simple configuration key out-of-band
733 * Write a simple key/value pair for basic store configuration
734 * (e.g., a uuid or magic number) to an unopened/unmounted store.
735 * The default implementation writes this to a plaintext file in the
738 * A newline is appended.
740 * @param key key name (e.g., "fsid")
741 * @param value value (e.g., a uuid rendered as a string)
742 * @returns 0 for success, or an error code
744 int write_meta(const std::string
& key
,
745 const std::string
& value
) const {
746 std::string v
= value
;
748 int r
= safe_write_file(path
.c_str(), key
.c_str(),
749 v
.c_str(), v
.length(),
757 * read_meta - read a simple configuration key out-of-band
759 * Read a simple key value from an unopened/unmounted store.
761 * Trailing whitespace is stripped off.
763 * @param key key name
764 * @param value pointer to value string
765 * @returns 0 for success, or an error code
767 int read_meta(const std::string
& key
,
768 std::string
*value
) const {
770 int r
= safe_read_file(path
.c_str(), key
.c_str(),
774 // drop trailing newlines
775 while (r
&& isspace(buf
[r
-1])) {
778 *value
= std::string(buf
, r
);
782 explicit MonitorDBStore(const std::string
& path
)
788 io_work(g_ceph_context
, "monstore", "fn_monstore"),
792 ceph_assert(!is_open
);
794 if (!g_conf()->mon_debug_dump_json
) {
795 ::close(dump_fd_binary
);
797 dump_fmt
.close_section();
798 dump_fmt
.flush(dump_fd_json
);
799 dump_fd_json
.flush();
800 dump_fd_json
.close();
807 WRITE_CLASS_ENCODER(MonitorDBStore::Op
)
808 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction
)
810 #endif /* CEPH_MONITOR_DB_STORE_H */