1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2012 Inktank, Inc.
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
13 #ifndef CEPH_MONITOR_DB_STORE_H
14 #define CEPH_MONITOR_DB_STORE_H
16 #include "include/types.h"
17 #include "include/buffer.h"
21 #include <boost/scoped_ptr.hpp>
24 #include "kv/KeyValueDB.h"
26 #include "include/assert.h"
27 #include "common/Formatter.h"
28 #include "common/Finisher.h"
29 #include "common/errno.h"
30 #include "common/debug.h"
31 #include "common/safe_io.h"
33 #define dout_context g_ceph_context
// Owning handle to the backing key/value database. Reads, iterators and
// transaction submission in this class all go through it.
38 boost::scoped_ptr
<KeyValueDB
> db
;
// Output stream for the JSON transaction dump; opened on
// g_conf->mon_debug_dump_location in _open().
41 std::ofstream dump_fd_json
;
// JSON formatter that applied transactions are dumped into before being
// flushed to dump_fd_json.
42 JSONFormatter dump_fmt
;
59 Op(int t
, string p
, string k
)
60 : type(t
), prefix(p
), key(k
) { }
61 Op(int t
, const string
& p
, string k
, bufferlist
& b
)
62 : type(t
), prefix(p
), key(k
), bl(b
) { }
63 Op(int t
, const string
& p
, string start
, string end
)
64 : type(t
), prefix(p
), key(start
), endkey(end
) { }
// Serialize this op into encode_bl.
// Fields are written in a fixed order -- type, prefix, key, bl, endkey --
// and that order must not change, since it is the encoded layout.
// ENCODE_START / ENCODE_FINISH bracket the payload; the macros are defined
// outside this file (presumably the arguments 2 and 1 are the struct version
// and the compat version -- confirm against the macro definition).
66 void encode(bufferlist
& encode_bl
) const {
67 ENCODE_START(2, 1, encode_bl
);
68 ::encode(type
, encode_bl
);
69 ::encode(prefix
, encode_bl
);
70 ::encode(key
, encode_bl
);
71 ::encode(bl
, encode_bl
);
72 ::encode(endkey
, encode_bl
);
73 ENCODE_FINISH(encode_bl
);
76 void decode(bufferlist::iterator
& decode_bl
) {
77 DECODE_START(2, decode_bl
);
78 ::decode(type
, decode_bl
);
79 ::decode(prefix
, decode_bl
);
80 ::decode(key
, decode_bl
);
81 ::decode(bl
, decode_bl
);
83 ::decode(endkey
, decode_bl
);
84 DECODE_FINISH(decode_bl
);
87 void dump(Formatter
*f
) const {
88 f
->dump_int("type", type
);
89 f
->dump_string("prefix", prefix
);
90 f
->dump_string("key", key
);
92 f
->dump_string("endkey", endkey
);
95 static void generate_test_instances(list
<Op
*>& ls
) {
97 // we get coverage here from the Transaction instances
102 typedef ceph::shared_ptr
<Transaction
> TransactionRef
;
105 uint64_t bytes
, keys
;
// Start an empty transaction: both accounting counters begin at zero.
107 Transaction() : bytes(0), keys(0) {}
115 void put(string prefix
, string key
, bufferlist
& bl
) {
116 ops
.push_back(Op(OP_PUT
, prefix
, key
, bl
));
118 bytes
+= prefix
.length() + key
.length() + bl
.length();
121 void put(string prefix
, version_t ver
, bufferlist
& bl
) {
124 put(prefix
, os
.str(), bl
);
127 void put(string prefix
, string key
, version_t ver
) {
130 put(prefix
, key
, bl
);
133 void erase(string prefix
, string key
) {
134 ops
.push_back(Op(OP_ERASE
, prefix
, key
));
136 bytes
+= prefix
.length() + key
.length();
139 void erase(string prefix
, version_t ver
) {
142 erase(prefix
, os
.str());
145 void compact_prefix(string prefix
) {
146 ops
.push_back(Op(OP_COMPACT
, prefix
, string()));
149 void compact_range(string prefix
, string start
, string end
) {
150 ops
.push_back(Op(OP_COMPACT
, prefix
, start
, end
));
153 void encode(bufferlist
& bl
) const {
154 ENCODE_START(2, 1, bl
);
161 void decode(bufferlist::iterator
& bl
) {
171 static void generate_test_instances(list
<Transaction
*>& ls
) {
172 ls
.push_back(new Transaction
);
173 ls
.push_back(new Transaction
);
176 ls
.back()->put("prefix", "key", bl
);
177 ls
.back()->erase("prefix2", "key2");
178 ls
.back()->compact_prefix("prefix3");
179 ls
.back()->compact_range("prefix4", "from", "to");
182 void append(TransactionRef other
) {
183 ops
.splice(ops
.end(), other
->ops
);
185 bytes
+= other
->bytes
;
188 void append_from_encoded(bufferlist
& bl
) {
189 auto other(std::make_shared
<Transaction
>());
190 bufferlist::iterator it
= bl
.begin();
196 return (size() == 0);
199 size_t size() const {
202 uint64_t get_keys() const {
205 uint64_t get_bytes() const {
209 void dump(ceph::Formatter
*f
, bool dump_val
=false) const {
210 f
->open_object_section("transaction");
211 f
->open_array_section("ops");
212 list
<Op
>::const_iterator it
;
214 for (it
= ops
.begin(); it
!= ops
.end(); ++it
) {
216 f
->open_object_section("op");
217 f
->dump_int("op_num", op_num
++);
221 f
->dump_string("type", "PUT");
222 f
->dump_string("prefix", op
.prefix
);
223 f
->dump_string("key", op
.key
);
224 f
->dump_unsigned("length", op
.bl
.length());
228 f
->dump_string("bl", os
.str());
234 f
->dump_string("type", "ERASE");
235 f
->dump_string("prefix", op
.prefix
);
236 f
->dump_string("key", op
.key
);
241 f
->dump_string("type", "COMPACT");
242 f
->dump_string("prefix", op
.prefix
);
243 f
->dump_string("start", op
.key
);
244 f
->dump_string("end", op
.endkey
);
249 f
->dump_string("type", "unknown");
250 f
->dump_unsigned("op_code", op
.type
);
257 f
->dump_unsigned("num_keys", keys
);
258 f
->dump_unsigned("num_bytes", bytes
);
263 int apply_transaction(MonitorDBStore::TransactionRef t
) {
264 KeyValueDB::Transaction dbt
= db
->get_transaction();
267 if (!g_conf
->mon_debug_dump_json
) {
270 bl
.write_fd(dump_fd_binary
);
272 t
->dump(&dump_fmt
, true);
273 dump_fmt
.flush(dump_fd_json
);
274 dump_fd_json
.flush();
278 list
<pair
<string
, pair
<string
,string
> > > compact
;
279 for (list
<Op
>::const_iterator it
= t
->ops
.begin();
284 case Transaction::OP_PUT
:
285 dbt
->set(op
.prefix
, op
.key
, op
.bl
);
287 case Transaction::OP_ERASE
:
288 dbt
->rmkey(op
.prefix
, op
.key
);
290 case Transaction::OP_COMPACT
:
291 compact
.push_back(make_pair(op
.prefix
, make_pair(op
.key
, op
.endkey
)));
294 derr
<< __func__
<< " unknown op type " << op
.type
<< dendl
;
299 int r
= db
->submit_transaction_sync(dbt
);
301 while (!compact
.empty()) {
302 if (compact
.front().second
.first
== string() &&
303 compact
.front().second
.second
== string())
304 db
->compact_prefix_async(compact
.front().first
);
306 db
->compact_range_async(compact
.front().first
, compact
.front().second
.first
, compact
.front().second
.second
);
310 assert(0 == "failed to write to db");
315 struct C_DoTransaction
: public Context
{
316 MonitorDBStore
*store
;
317 MonitorDBStore::TransactionRef t
;
319 C_DoTransaction(MonitorDBStore
*s
, MonitorDBStore::TransactionRef t
,
321 : store(s
), t(t
), oncommit(f
)
323 void finish(int r
) override
{
324 /* The store serializes writes. Each transaction is handled
325 * sequentially by the io_work Finisher. If a transaction takes longer
326 * to apply its state to permanent storage, then no other transaction
327 * will be handled meanwhile.
329 * We will now randomly inject random delays. We can safely sleep prior
330 * to applying the transaction as it won't break the model.
332 double delay_prob
= g_conf
->mon_inject_transaction_delay_probability
;
333 if (delay_prob
&& (rand() % 10000 < delay_prob
* 10000.0)) {
335 double delay_max
= g_conf
->mon_inject_transaction_delay_max
;
336 delay
.set_from_double(delay_max
* (double)(rand() % 10000) / 10000.0);
337 lsubdout(g_ceph_context
, mon
, 1)
338 << "apply_transaction will be delayed for " << delay
339 << " seconds" << dendl
;
342 int ret
= store
->apply_transaction(t
);
343 oncommit
->complete(ret
);
350 * Queue a transaction to commit asynchronously. Trigger a context
351 * on completion (without any locks held).
353 void queue_transaction(MonitorDBStore::TransactionRef t
,
355 io_work
.queue(new C_DoTransaction(this, t
, oncommit
));
359 * block and flush all io activity
362 io_work
.wait_for_empty();
365 class StoreIteratorImpl
{
368 pair
<string
,string
> last_key
;
// A fresh iterator is not done; has_next_chunk() consults this flag.
371 StoreIteratorImpl() : done(false) { }
// Virtual destructor: the class is used as a polymorphic base
// (see WholeStoreIteratorImpl) and is held through a shared pointer.
372 virtual ~StoreIteratorImpl() { }
374 bool add_chunk_entry(TransactionRef tx
,
379 auto tmp(std::make_shared
<Transaction
>());
381 tmp
->put(prefix
, key
, value
);
387 size_t len
= tx_bl
.length() + tmp_bl
.length();
389 if (!tx
->empty() && (len
> max
)) {
394 last_key
.first
= prefix
;
395 last_key
.second
= key
;
397 if (g_conf
->mon_sync_debug
) {
398 ::encode(prefix
, crc_bl
);
399 ::encode(key
, crc_bl
);
400 ::encode(value
, crc_bl
);
406 virtual bool _is_valid() = 0;
410 if (g_conf
->mon_sync_debug
)
411 return crc_bl
.crc32c(0);
414 pair
<string
,string
> get_last_key() {
417 virtual bool has_next_chunk() {
418 return !done
&& _is_valid();
// Produce the next chunk of the store into tx. Subclasses implement this;
// the visible override feeds entries to add_chunk_entry() together with max.
// NOTE(review): max looks like a size bound on the chunk -- confirm the
// unit against add_chunk_entry().
420 virtual void get_chunk_tx(TransactionRef tx
, uint64_t max
) = 0;
// Return the (prefix, key) pair of the next entry of interest.
// NOTE(review): the visible override scans forward for a key whose prefix
// is being synced and has an empty-pair fallback -- confirm exact contract.
421 virtual pair
<string
,string
> get_next_key() = 0;
423 typedef ceph::shared_ptr
<StoreIteratorImpl
> Synchronizer
;
425 class WholeStoreIteratorImpl
: public StoreIteratorImpl
{
426 KeyValueDB::WholeSpaceIterator iter
;
427 set
<string
> sync_prefixes
;
430 WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator iter
,
431 set
<string
> &prefixes
)
432 : StoreIteratorImpl(),
434 sync_prefixes(prefixes
)
// Nothing to release explicitly; members clean up after themselves.
437 ~WholeStoreIteratorImpl() override
{ }
 * Obtain a chunk of the store
 *
 * @param tx   Transaction that accumulates the chunk: each entry whose
 *             prefix is being synced is offered to add_chunk_entry(),
 *             which records the last key it accepted (see get_last_key())
 * @param max  Size bound for the chunk, forwarded to add_chunk_entry()
448 void get_chunk_tx(TransactionRef tx
, uint64_t max
) override
{
449 assert(done
== false);
450 assert(iter
->valid() == true);
452 while (iter
->valid()) {
453 string
prefix(iter
->raw_key().first
);
454 string
key(iter
->raw_key().second
);
455 if (sync_prefixes
.count(prefix
)) {
456 bufferlist value
= iter
->value();
457 if (!add_chunk_entry(tx
, prefix
, key
, value
, max
))
462 assert(iter
->valid() == false);
466 pair
<string
,string
> get_next_key() override
{
467 assert(iter
->valid());
469 for (; iter
->valid(); iter
->next()) {
470 pair
<string
,string
> r
= iter
->raw_key();
471 if (sync_prefixes
.count(r
.first
) > 0) {
476 return pair
<string
,string
>();
// Validity of this synchronizer is simply the validity of the wrapped
// whole-space iterator.
479 bool _is_valid() override
{
480 return iter
->valid();
484 Synchronizer
get_synchronizer(pair
<string
,string
> &key
,
485 set
<string
> &prefixes
) {
486 KeyValueDB::WholeSpaceIterator iter
;
487 iter
= db
->get_iterator();
489 if (!key
.first
.empty() && !key
.second
.empty())
490 iter
->upper_bound(key
.first
, key
.second
);
492 iter
->seek_to_first();
494 return ceph::shared_ptr
<StoreIteratorImpl
>(
495 new WholeStoreIteratorImpl(iter
, prefixes
)
499 KeyValueDB::Iterator
get_iterator(const string
&prefix
) {
500 assert(!prefix
.empty());
501 KeyValueDB::Iterator iter
= db
->get_iterator(prefix
);
502 iter
->seek_to_first();
506 KeyValueDB::WholeSpaceIterator
get_iterator() {
507 KeyValueDB::WholeSpaceIterator iter
;
508 iter
= db
->get_iterator();
509 iter
->seek_to_first();
513 int get(const string
& prefix
, const string
& key
, bufferlist
& bl
) {
514 assert(bl
.length() == 0);
515 return db
->get(prefix
, key
, &bl
);
518 int get(const string
& prefix
, const version_t ver
, bufferlist
& bl
) {
521 return get(prefix
, os
.str(), bl
);
524 version_t
get(const string
& prefix
, const string
& key
) {
526 int err
= get(prefix
, key
, bl
);
528 if (err
== -ENOENT
) // if key doesn't exist, assume its value is 0
530 // we're not expecting any other negative return value, and we can't
531 // just return a negative value if we're returning a version_t
532 generic_dout(0) << "MonitorDBStore::get() error obtaining"
533 << " (" << prefix
<< ":" << key
<< "): "
534 << cpp_strerror(err
) << dendl
;
535 assert(0 == "error obtaining key");
540 bufferlist::iterator p
= bl
.begin();
545 bool exists(const string
& prefix
, const string
& key
) {
546 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
547 int err
= it
->lower_bound(key
);
551 return (it
->valid() && it
->key() == key
);
554 bool exists(const string
& prefix
, version_t ver
) {
557 return exists(prefix
, os
.str());
560 string
combine_strings(const string
& prefix
, const string
& value
) {
567 string
combine_strings(const string
& prefix
, const version_t ver
) {
570 return combine_strings(prefix
, os
.str());
573 void clear(set
<string
>& prefixes
) {
574 set
<string
>::iterator iter
;
575 KeyValueDB::Transaction dbt
= db
->get_transaction();
577 for (iter
= prefixes
.begin(); iter
!= prefixes
.end(); ++iter
) {
578 dbt
->rmkeys_by_prefix((*iter
));
580 int r
= db
->submit_transaction_sync(dbt
);
584 void _open(string kv_type
) {
585 string::const_reverse_iterator rit
;
587 for (rit
= path
.rbegin(); rit
!= path
.rend(); ++rit
, ++pos
) {
592 os
<< path
.substr(0, path
.size() - pos
) << "/store.db";
593 string full_path
= os
.str();
595 KeyValueDB
*db_ptr
= KeyValueDB::create(g_ceph_context
,
599 derr
<< __func__
<< " error initializing "
600 << kv_type
<< " db back storage in "
601 << full_path
<< dendl
;
602 assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
606 if (g_conf
->mon_debug_dump_transactions
) {
607 if (!g_conf
->mon_debug_dump_json
) {
608 dump_fd_binary
= ::open(
609 g_conf
->mon_debug_dump_location
.c_str(),
610 O_CREAT
|O_APPEND
|O_WRONLY
, 0644);
611 if (dump_fd_binary
< 0) {
612 dump_fd_binary
= -errno
;
613 derr
<< "Could not open log file, got "
614 << cpp_strerror(dump_fd_binary
) << dendl
;
618 dump_fmt
.open_array_section("dump");
619 dump_fd_json
.open(g_conf
->mon_debug_dump_location
.c_str());
623 if (kv_type
== "rocksdb")
624 db
->init(g_conf
->mon_rocksdb_options
);
629 int open(ostream
&out
) {
631 int r
= read_meta("kv_backend", &kv_type
);
632 if (r
< 0 || kv_type
.empty()) {
633 // assume old monitors that did not mark the type were leveldb.
635 r
= write_meta("kv_backend", kv_type
);
648 int create_and_open(ostream
&out
) {
649 // record the type before open
651 int r
= read_meta("kv_backend", &kv_type
);
653 kv_type
= g_conf
->mon_keyvaluedb
;
654 r
= write_meta("kv_backend", kv_type
);
659 r
= db
->create_and_open(out
);
668 // there should be no work queued!
// Compact the given prefix by calling db->compact_prefix() directly.
// This differs from Transaction::compact_prefix(), which only queues an op,
// and from apply_transaction(), which uses the *_async compaction calls.
678 void compact_prefix(const string
& prefix
) {
679 db
->compact_prefix(prefix
);
// Thin forwarder to db->get_estimated_size(); returns its result unchanged.
// NOTE(review): extras is passed by non-const reference, so it is presumably
// filled in by the backend with a size breakdown -- confirm against KeyValueDB.
682 uint64_t get_estimated_size(map
<string
, uint64_t> &extras
) {
683 return db
->get_estimated_size(extras
);
687 * write_meta - write a simple configuration key out-of-band
689 * Write a simple key/value pair for basic store configuration
690 * (e.g., a uuid or magic number) to an unopened/unmounted store.
691 * The default implementation writes this to a plaintext file in the
694 * A newline is appended.
696 * @param key key name (e.g., "fsid")
697 * @param value value (e.g., a uuid rendered as a string)
698 * @returns 0 for success, or an error code
700 int write_meta(const std::string
& key
,
701 const std::string
& value
) const {
704 int r
= safe_write_file(path
.c_str(), key
.c_str(),
705 v
.c_str(), v
.length());
712 * read_meta - read a simple configuration key out-of-band
714 * Read a simple key value to an unopened/mounted store.
716 * Trailing whitespace is stripped off.
718 * @param key key name
719 * @param value pointer to value string
720 * @returns 0 for success, or an error code
722 int read_meta(const std::string
& key
,
723 std::string
*value
) const {
725 int r
= safe_read_file(path
.c_str(), key
.c_str(),
729 // drop trailing newlines
730 while (r
&& isspace(buf
[r
-1])) {
733 *value
= string(buf
, r
);
737 explicit MonitorDBStore(const string
& path
)
743 io_work(g_ceph_context
, "monstore", "fn_monstore"),
749 if (!g_conf
->mon_debug_dump_json
) {
750 ::close(dump_fd_binary
);
752 dump_fmt
.close_section();
753 dump_fmt
.flush(dump_fd_json
);
754 dump_fd_json
.flush();
755 dump_fd_json
.close();
// Hook Op and Transaction into the encoding framework.
// NOTE(review): WRITE_CLASS_ENCODER is defined outside this file; presumably
// it generates free encode()/decode() overloads that forward to the member
// functions of each type -- confirm against the macro definition.
762 WRITE_CLASS_ENCODER(MonitorDBStore::Op
)
763 WRITE_CLASS_ENCODER(MonitorDBStore::Transaction
)
765 #endif /* CEPH_MONITOR_DB_STORE_H */