#include <fstream>
#include "kv/KeyValueDB.h"
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "common/Formatter.h"
#include "common/Finisher.h"
#include "common/errno.h"
#include "common/debug.h"
#include "common/safe_io.h"
+#include "common/blkdev.h"
#define dout_context g_ceph_context
public:
+ string get_devname() {
+ char devname[4096] = {0}, partition[4096];
+ get_device_by_path(path.c_str(), partition, devname,
+ sizeof(devname));
+ return devname;
+ }
+
struct Op {
uint8_t type;
string prefix;
void encode(bufferlist& encode_bl) const {
ENCODE_START(2, 1, encode_bl);
- ::encode(type, encode_bl);
- ::encode(prefix, encode_bl);
- ::encode(key, encode_bl);
- ::encode(bl, encode_bl);
- ::encode(endkey, encode_bl);
+ encode(type, encode_bl);
+ encode(prefix, encode_bl);
+ encode(key, encode_bl);
+ encode(bl, encode_bl);
+ encode(endkey, encode_bl);
ENCODE_FINISH(encode_bl);
}
- void decode(bufferlist::iterator& decode_bl) {
+ void decode(bufferlist::const_iterator& decode_bl) {
DECODE_START(2, decode_bl);
- ::decode(type, decode_bl);
- ::decode(prefix, decode_bl);
- ::decode(key, decode_bl);
- ::decode(bl, decode_bl);
+ decode(type, decode_bl);
+ decode(prefix, decode_bl);
+ decode(key, decode_bl);
+ decode(bl, decode_bl);
if (struct_v >= 2)
- ::decode(endkey, decode_bl);
+ decode(endkey, decode_bl);
DECODE_FINISH(decode_bl);
}
};
struct Transaction;
- typedef ceph::shared_ptr<Transaction> TransactionRef;
+ typedef std::shared_ptr<Transaction> TransactionRef;
struct Transaction {
list<Op> ops;
uint64_t bytes, keys;
}
void put(string prefix, string key, version_t ver) {
+ using ceph::encode;
bufferlist bl;
- ::encode(ver, bl);
+ encode(ver, bl);
put(prefix, key, bl);
}
void encode(bufferlist& bl) const {
ENCODE_START(2, 1, bl);
- ::encode(ops, bl);
- ::encode(bytes, bl);
- ::encode(keys, bl);
+ encode(ops, bl);
+ encode(bytes, bl);
+ encode(keys, bl);
ENCODE_FINISH(bl);
}
- void decode(bufferlist::iterator& bl) {
+ void decode(bufferlist::const_iterator& bl) {
DECODE_START(2, bl);
- ::decode(ops, bl);
+ decode(ops, bl);
if (struct_v >= 2) {
- ::decode(bytes, bl);
- ::decode(keys, bl);
+ decode(bytes, bl);
+ decode(keys, bl);
}
DECODE_FINISH(bl);
}
void append_from_encoded(bufferlist& bl) {
auto other(std::make_shared<Transaction>());
- bufferlist::iterator it = bl.begin();
+ auto it = bl.cbegin();
other->decode(it);
append(other);
}
KeyValueDB::Transaction dbt = db->get_transaction();
if (do_dump) {
- if (!g_conf->mon_debug_dump_json) {
+ if (!g_conf()->mon_debug_dump_json) {
bufferlist bl;
t->encode(bl);
bl.write_fd(dump_fd_binary);
compact.pop_front();
}
} else {
- assert(0 == "failed to write to db");
+ ceph_abort_msg("failed to write to db");
}
return r;
}
* We will now randomly inject random delays. We can safely sleep prior
* to applying the transaction as it won't break the model.
*/
- double delay_prob = g_conf->mon_inject_transaction_delay_probability;
+ double delay_prob = g_conf()->mon_inject_transaction_delay_probability;
if (delay_prob && (rand() % 10000 < delay_prob * 10000.0)) {
utime_t delay;
- double delay_max = g_conf->mon_inject_transaction_delay_max;
+ double delay_max = g_conf()->mon_inject_transaction_delay_max;
delay.set_from_double(delay_max * (double)(rand() % 10000) / 10000.0);
lsubdout(g_ceph_context, mon, 1)
<< "apply_transaction will be delayed for " << delay
last_key.first = prefix;
last_key.second = key;
- if (g_conf->mon_sync_debug) {
- ::encode(prefix, crc_bl);
- ::encode(key, crc_bl);
- ::encode(value, crc_bl);
+ if (g_conf()->mon_sync_debug) {
+ encode(prefix, crc_bl);
+ encode(key, crc_bl);
+ encode(value, crc_bl);
}
return true;
public:
__u32 crc() {
- if (g_conf->mon_sync_debug)
+ if (g_conf()->mon_sync_debug)
return crc_bl.crc32c(0);
return 0;
}
virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
virtual pair<string,string> get_next_key() = 0;
};
- typedef ceph::shared_ptr<StoreIteratorImpl> Synchronizer;
+ typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
class WholeStoreIteratorImpl : public StoreIteratorImpl {
KeyValueDB::WholeSpaceIterator iter;
* @param last_key[out] Last key in the chunk
*/
void get_chunk_tx(TransactionRef tx, uint64_t max) override {
- assert(done == false);
- assert(iter->valid() == true);
+ ceph_assert(done == false);
+ ceph_assert(iter->valid() == true);
while (iter->valid()) {
string prefix(iter->raw_key().first);
}
iter->next();
}
- assert(iter->valid() == false);
+ ceph_assert(iter->valid() == false);
done = true;
}
pair<string,string> get_next_key() override {
- assert(iter->valid());
+ ceph_assert(iter->valid());
for (; iter->valid(); iter->next()) {
pair<string,string> r = iter->raw_key();
Synchronizer get_synchronizer(pair<string,string> &key,
set<string> &prefixes) {
KeyValueDB::WholeSpaceIterator iter;
- iter = db->get_iterator();
+ iter = db->get_wholespace_iterator();
if (!key.first.empty() && !key.second.empty())
iter->upper_bound(key.first, key.second);
else
iter->seek_to_first();
- return ceph::shared_ptr<StoreIteratorImpl>(
+ return std::shared_ptr<StoreIteratorImpl>(
new WholeStoreIteratorImpl(iter, prefixes)
);
}
KeyValueDB::Iterator get_iterator(const string &prefix) {
- assert(!prefix.empty());
+ ceph_assert(!prefix.empty());
KeyValueDB::Iterator iter = db->get_iterator(prefix);
iter->seek_to_first();
return iter;
KeyValueDB::WholeSpaceIterator get_iterator() {
KeyValueDB::WholeSpaceIterator iter;
- iter = db->get_iterator();
+ iter = db->get_wholespace_iterator();
iter->seek_to_first();
return iter;
}
int get(const string& prefix, const string& key, bufferlist& bl) {
- assert(bl.length() == 0);
+ ceph_assert(bl.length() == 0);
return db->get(prefix, key, &bl);
}
generic_dout(0) << "MonitorDBStore::get() error obtaining"
<< " (" << prefix << ":" << key << "): "
<< cpp_strerror(err) << dendl;
- assert(0 == "error obtaining key");
+ ceph_abort_msg("error obtaining key");
}
- assert(bl.length());
+ ceph_assert(bl.length());
version_t ver;
- bufferlist::iterator p = bl.begin();
- ::decode(ver, p);
+ auto p = bl.cbegin();
+ decode(ver, p);
return ver;
}
dbt->rmkeys_by_prefix((*iter));
}
int r = db->submit_transaction_sync(dbt);
- assert(r >= 0);
+ ceph_assert(r >= 0);
}
void _open(string kv_type) {
derr << __func__ << " error initializing "
<< kv_type << " db back storage in "
<< full_path << dendl;
- assert(0 == "MonitorDBStore: error initializing keyvaluedb back storage");
+ ceph_abort_msg("MonitorDBStore: error initializing keyvaluedb back storage");
}
db.reset(db_ptr);
- if (g_conf->mon_debug_dump_transactions) {
- if (!g_conf->mon_debug_dump_json) {
+ if (g_conf()->mon_debug_dump_transactions) {
+ if (!g_conf()->mon_debug_dump_json) {
dump_fd_binary = ::open(
- g_conf->mon_debug_dump_location.c_str(),
+ g_conf()->mon_debug_dump_location.c_str(),
O_CREAT|O_APPEND|O_WRONLY|O_CLOEXEC, 0644);
if (dump_fd_binary < 0) {
dump_fd_binary = -errno;
} else {
dump_fmt.reset();
dump_fmt.open_array_section("dump");
- dump_fd_json.open(g_conf->mon_debug_dump_location.c_str());
+ dump_fd_json.open(g_conf()->mon_debug_dump_location.c_str());
}
do_dump = true;
}
if (kv_type == "rocksdb")
- db->init(g_conf->mon_rocksdb_options);
+ db->init(g_conf()->mon_rocksdb_options);
else
db->init();
string kv_type;
int r = read_meta("kv_backend", &kv_type);
if (r < 0) {
- kv_type = g_conf->mon_keyvaluedb;
+ kv_type = g_conf()->mon_keyvaluedb;
r = write_meta("kv_backend", kv_type);
if (r < 0)
return r;
db->compact();
}
+ void compact_async() {
+ db->compact_async();
+ }
+
void compact_prefix(const string& prefix) {
db->compact_prefix(prefix);
}
is_open(false) {
}
~MonitorDBStore() {
- assert(!is_open);
+ ceph_assert(!is_open);
if (do_dump) {
- if (!g_conf->mon_debug_dump_json) {
+ if (!g_conf()->mon_debug_dump_json) {
::close(dump_fd_binary);
} else {
dump_fmt.close_section();