Op()
: type(0) { }
- Op(int t, string p, string k)
+ Op(int t, const string& p, const string& k)
: type(t), prefix(p), key(k) { }
- Op(int t, const string& p, string k, bufferlist& b)
+ Op(int t, const string& p, const string& k, const bufferlist& b)
: type(t), prefix(p), key(k), bl(b) { }
- Op(int t, const string& p, string start, string end)
+ Op(int t, const string& p, const string& start, const string& end)
: type(t), prefix(p), key(start), endkey(end) { }
void encode(bufferlist& encode_bl) const {
f->dump_int("type", type);
f->dump_string("prefix", prefix);
f->dump_string("key", key);
- if (endkey.length())
+ if (endkey.length()) {
f->dump_string("endkey", endkey);
+ }
+ }
+
+ int approx_size() const {
+ return 6 + 1 +
+ 4 + prefix.size() +
+ 4 + key.size() +
+ 4 + endkey.size() +
+ 4 + bl.length();
}
static void generate_test_instances(list<Op*>& ls) {
list<Op> ops;
uint64_t bytes, keys;
- Transaction() : bytes(0), keys(0) {}
+ Transaction() : bytes(6 + 4 + 8*2), keys(0) {}
enum {
OP_PUT = 1,
OP_ERASE = 2,
OP_COMPACT = 3,
+ OP_ERASE_RANGE = 4,
};
- void put(string prefix, string key, bufferlist& bl) {
+ void put(const string& prefix, const string& key, const bufferlist& bl) {
ops.push_back(Op(OP_PUT, prefix, key, bl));
++keys;
- bytes += prefix.length() + key.length() + bl.length();
+ bytes += ops.back().approx_size();
}
- void put(string prefix, version_t ver, bufferlist& bl) {
+ void put(const string& prefix, version_t ver, const bufferlist& bl) {
ostringstream os;
os << ver;
put(prefix, os.str(), bl);
}
- void put(string prefix, string key, version_t ver) {
+ void put(const string& prefix, const string& key, version_t ver) {
using ceph::encode;
bufferlist bl;
encode(ver, bl);
put(prefix, key, bl);
}
- void erase(string prefix, string key) {
+ void erase(const string& prefix, const string& key) {
ops.push_back(Op(OP_ERASE, prefix, key));
++keys;
- bytes += prefix.length() + key.length();
+ bytes += ops.back().approx_size();
}
- void erase(string prefix, version_t ver) {
+ void erase(const string& prefix, version_t ver) {
ostringstream os;
os << ver;
erase(prefix, os.str());
}
- void compact_prefix(string prefix) {
+ void erase_range(const string& prefix, const string& begin,
+ const string& end) {
+ ops.push_back(Op(OP_ERASE_RANGE, prefix, begin, end));
+ ++keys;
+ bytes += ops.back().approx_size();
+ }
+
+ void compact_prefix(const string& prefix) {
ops.push_back(Op(OP_COMPACT, prefix, string()));
}
- void compact_range(string prefix, string start, string end) {
+ void compact_range(const string& prefix, const string& start,
+ const string& end) {
ops.push_back(Op(OP_COMPACT, prefix, start, end));
}
bl.append("value");
ls.back()->put("prefix", "key", bl);
ls.back()->erase("prefix2", "key2");
+ ls.back()->erase_range("prefix3", "key3", "key4");
ls.back()->compact_prefix("prefix3");
ls.back()->compact_range("prefix4", "from", "to");
}
f->dump_string("key", op.key);
}
break;
+ case OP_ERASE_RANGE:
+ {
+ f->dump_string("type", "ERASE_RANGE");
+ f->dump_string("prefix", op.prefix);
+ f->dump_string("start", op.key);
+ f->dump_string("end", op.endkey);
+ }
+ break;
case OP_COMPACT:
{
f->dump_string("type", "COMPACT");
case Transaction::OP_ERASE:
dbt->rmkey(op.prefix, op.key);
break;
+ case Transaction::OP_ERASE_RANGE:
+ dbt->rm_range_keys(op.prefix, op.key, op.endkey);
+ break;
case Transaction::OP_COMPACT:
compact.push_back(make_pair(op.prefix, make_pair(op.key, op.endkey)));
break;
StoreIteratorImpl() : done(false) { }
virtual ~StoreIteratorImpl() { }
- bool add_chunk_entry(TransactionRef tx,
- string &prefix,
- string &key,
- bufferlist &value,
- uint64_t max) {
- auto tmp(std::make_shared<Transaction>());
- bufferlist tmp_bl;
- tmp->put(prefix, key, value);
- tmp->encode(tmp_bl);
-
- bufferlist tx_bl;
- tx->encode(tx_bl);
-
- size_t len = tx_bl.length() + tmp_bl.length();
-
- if (!tx->empty() && (len > max)) {
- return false;
- }
-
- tx->append(tmp);
- last_key.first = prefix;
- last_key.second = key;
-
- if (g_conf()->mon_sync_debug) {
- encode(prefix, crc_bl);
- encode(key, crc_bl);
- encode(value, crc_bl);
- }
-
- return true;
- }
-
virtual bool _is_valid() = 0;
public:
virtual bool has_next_chunk() {
return !done && _is_valid();
}
- virtual void get_chunk_tx(TransactionRef tx, uint64_t max) = 0;
+ virtual void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) = 0;
virtual pair<string,string> get_next_key() = 0;
};
typedef std::shared_ptr<StoreIteratorImpl> Synchronizer;
* differ from the one passed on to the function)
* @param last_key[out] Last key in the chunk
*/
- void get_chunk_tx(TransactionRef tx, uint64_t max) override {
+ void get_chunk_tx(TransactionRef tx, uint64_t max_bytes,
+ uint64_t max_keys) override {
ceph_assert(done == false);
ceph_assert(iter->valid() == true);
string key(iter->raw_key().second);
if (sync_prefixes.count(prefix)) {
bufferlist value = iter->value();
- if (!add_chunk_entry(tx, prefix, key, value, max))
+ if (tx->empty() ||
+ (tx->get_bytes() + value.length() + key.size() +
+ prefix.size() < max_bytes &&
+ tx->get_keys() < max_keys)) {
+ // NOTE: putting every key in a separate transaction is
+ // questionable as far as efficiency goes
+ auto tmp(std::make_shared<Transaction>());
+ tmp->put(prefix, key, value);
+ tx->append(tmp);
+ if (g_conf()->mon_sync_debug) {
+ encode(prefix, crc_bl);
+ encode(key, crc_bl);
+ encode(value, crc_bl);
+ }
+ } else {
+ last_key.first = prefix;
+ last_key.second = key;
return;
+ }
}
iter->next();
}
ceph_assert(r >= 0);
}
- void _open(string kv_type) {
+ void _open(const string& kv_type) {
string::const_reverse_iterator rit;
int pos = 0;
for (rit = path.rbegin(); rit != path.rend(); ++rit, ++pos) {