1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "KineticStore.h"
4 #include "common/ceph_crypto.h"
11 #include "common/perf_counters.h"
13 #define dout_subsys ceph_subsys_kinetic
15 int KineticStore::init()
17 // init defaults. caller can override these if they want
18 // prior to calling open.
19 host
= cct
->_conf
->kinetic_host
;
20 port
= cct
->_conf
->kinetic_port
;
21 user_id
= cct
->_conf
->kinetic_user_id
;
22 hmac_key
= cct
->_conf
->kinetic_hmac_key
;
23 use_ssl
= cct
->_conf
->kinetic_use_ssl
;
27 int KineticStore::_test_init(CephContext
*c
)
29 kinetic::KineticConnectionFactory conn_factory
=
30 kinetic::NewKineticConnectionFactory();
32 kinetic::ConnectionOptions options
;
33 options
.host
= cct
->_conf
->kinetic_host
;
34 options
.port
= cct
->_conf
->kinetic_port
;
35 options
.user_id
= cct
->_conf
->kinetic_user_id
;
36 options
.hmac_key
= cct
->_conf
->kinetic_hmac_key
;
37 options
.use_ssl
= cct
->_conf
->kinetic_use_ssl
;
39 kinetic::Status status
= conn_factory
.NewThreadsafeBlockingConnection(options
, kinetic_conn
, 10);
42 derr
<< __func__
<< " Unable to connect to kinetic store " << options
.host
43 << ":" << options
.port
<< " : " << status
.ToString() << dendl
;
44 return status
.ok() ? 0 : -EIO
;
47 int KineticStore::open(ostream
&out
, const vector
<ColumnFamily
>& cfs
)
50 ceph_abort_msg("Not implemented");
52 return do_open(out
, false);
55 int KineticStore::create_and_open(ostream
&out
, const vector
<ColumnFamily
>& cfs
)
58 ceph_abort_msg("Not implemented");
60 return do_open(out
, true);
63 int KineticStore::do_open(ostream
&out
, bool create_if_missing
)
65 kinetic::KineticConnectionFactory conn_factory
=
66 kinetic::NewKineticConnectionFactory();
67 kinetic::ConnectionOptions options
;
70 options
.user_id
= user_id
;
71 options
.hmac_key
= hmac_key
;
72 options
.use_ssl
= use_ssl
;
73 kinetic::Status status
= conn_factory
.NewThreadsafeBlockingConnection(options
, kinetic_conn
, 10);
75 derr
<< "Unable to connect to kinetic store " << host
<< ":" << port
76 << " : " << status
.ToString() << dendl
;
80 PerfCountersBuilder
plb(g_ceph_context
, "kinetic", l_kinetic_first
, l_kinetic_last
);
81 plb
.add_u64_counter(l_kinetic_gets
, "kinetic_get", "Gets");
82 plb
.add_u64_counter(l_kinetic_txns
, "kinetic_transaction", "Transactions");
83 logger
= plb
.create_perf_counters();
84 cct
->get_perfcounters_collection()->add(logger
);
88 KineticStore::KineticStore(CephContext
*c
) :
92 host
= c
->_conf
->kinetic_host
;
93 port
= c
->_conf
->kinetic_port
;
94 user_id
= c
->_conf
->kinetic_user_id
;
95 hmac_key
= c
->_conf
->kinetic_hmac_key
;
96 use_ssl
= c
->_conf
->kinetic_use_ssl
;
99 KineticStore::~KineticStore()
105 void KineticStore::close()
107 kinetic_conn
.reset();
109 cct
->get_perfcounters_collection()->remove(logger
);
112 int KineticStore::submit_transaction(KeyValueDB::Transaction t
)
114 KineticTransactionImpl
* _t
=
115 static_cast<KineticTransactionImpl
*>(t
.get());
117 dout(20) << "kinetic submit_transaction" << dendl
;
119 for (vector
<KineticOp
>::iterator it
= _t
->ops
.begin();
120 it
!= _t
->ops
.end(); ++it
) {
121 kinetic::KineticStatus
status(kinetic::StatusCode::OK
, "");
122 if (it
->type
== KINETIC_OP_WRITE
) {
123 string
data(it
->data
.c_str(), it
->data
.length());
124 kinetic::KineticRecord
record(data
, "", "",
125 com::seagate::kinetic::client::proto::Message_Algorithm_SHA1
);
126 dout(30) << "kinetic before put of " << it
->key
<< " (" << data
.length() << " bytes)" << dendl
;
127 status
= kinetic_conn
->Put(it
->key
, "", kinetic::WriteMode::IGNORE_VERSION
,
129 dout(30) << "kinetic after put of " << it
->key
<< dendl
;
131 ceph_assert(it
->type
== KINETIC_OP_DELETE
);
132 dout(30) << "kinetic before delete" << dendl
;
133 status
= kinetic_conn
->Delete(it
->key
, "",
134 kinetic::WriteMode::IGNORE_VERSION
);
135 dout(30) << "kinetic after delete" << dendl
;
138 derr
<< "kinetic error submitting transaction: "
139 << status
.message() << dendl
;
144 logger
->inc(l_kinetic_txns
);
148 int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t
)
150 return submit_transaction(t
);
153 void KineticStore::KineticTransactionImpl::set(
154 const string
&prefix
,
156 const bufferlist
&to_set_bl
)
158 string key
= combine_strings(prefix
, k
);
159 dout(30) << "kinetic set key " << key
<< dendl
;
160 ops
.push_back(KineticOp(KINETIC_OP_WRITE
, key
, to_set_bl
));
163 void KineticStore::KineticTransactionImpl::rmkey(const string
&prefix
,
166 string key
= combine_strings(prefix
, k
);
167 dout(30) << "kinetic rm key " << key
<< dendl
;
168 ops
.push_back(KineticOp(KINETIC_OP_DELETE
, key
));
171 void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string
&prefix
)
173 dout(20) << "kinetic rmkeys_by_prefix " << prefix
<< dendl
;
174 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
175 for (it
->seek_to_first();
178 string key
= combine_strings(prefix
, it
->key());
179 ops
.push_back(KineticOp(KINETIC_OP_DELETE
, key
));
180 dout(30) << "kinetic rm key by prefix: " << key
<< dendl
;
184 void KineticStore::KineticTransactionImpl::rm_range_keys(const string
&prefix
, const string
&start
, const string
&end
)
186 KeyValueDB::Iterator it
= db
->get_iterator(prefix
);
187 it
->lower_bound(start
);
188 while (it
->valid()) {
189 if (it
->key() >= end
) {
193 KineticOp(KINETIC_OP_DELETE
, combine_strings(prefix
, it
->key())));
198 int KineticStore::get(
199 const string
&prefix
,
200 const std::set
<string
> &keys
,
201 std::map
<string
, bufferlist
> *out
)
203 dout(30) << "kinetic get prefix: " << prefix
<< " keys: " << keys
<< dendl
;
204 for (std::set
<string
>::const_iterator i
= keys
.begin();
207 unique_ptr
<kinetic::KineticRecord
> record
;
208 string key
= combine_strings(prefix
, *i
);
209 dout(30) << "before get key " << key
<< dendl
;
210 kinetic::KineticStatus status
= kinetic_conn
->Get(key
, record
);
213 dout(30) << "kinetic get got key: " << key
<< dendl
;
214 out
->insert(make_pair(key
, to_bufferlist(*record
.get())));
216 logger
->inc(l_kinetic_gets
);
220 string
KineticStore::combine_strings(const string
&prefix
, const string
&value
)
228 bufferlist
KineticStore::to_bufferlist(const kinetic::KineticRecord
&record
)
231 bl
.append(*(record
.value()));
235 int KineticStore::split_key(string
&in
, string
*prefix
, string
*key
)
237 size_t prefix_len
= 0;
238 char* in_data
= in
.c_str();
240 // Find separator inside Slice
241 char* separator
= (char*) memchr((void*)in_data
, 1, in
.size());
242 if (separator
== NULL
)
244 prefix_len
= size_t(separator
- in_data
);
245 if (prefix_len
>= in
.size())
248 // Fetch prefix and/or key directly from Slice
250 *prefix
= string(in_data
, prefix_len
);
252 *key
= string(separator
+1, in
.size()-prefix_len
-1);
256 KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection
*conn
) : kinetic_conn(conn
),
257 kinetic_status(kinetic::StatusCode::OK
, "")
259 dout(30) << "kinetic iterator constructor()" << dendl
;
260 const static string last_key
= "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
261 kinetic::KeyRangeIterator it
=
262 kinetic_conn
->IterateKeyRange("", true, last_key
, true, 1024);
263 while (it
!= kinetic::KeyRangeEnd()) {
266 dout(30) << "kinetic iterator added " << *it
<< dendl
;
267 } catch (std::runtime_error
&e
) {
268 kinetic_status
= kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR
, e
.what());
273 keys_iter
= keys
.begin();
276 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string
&prefix
)
278 dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix
<< dendl
;
279 keys_iter
= keys
.lower_bound(prefix
);
283 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
285 dout(30) << "kinetic iterator seek_to_last()" << dendl
;
286 keys_iter
= keys
.end();
287 if (keys
.begin() != keys_iter
)
292 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string
&prefix
)
294 dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix
<< dendl
;
295 keys_iter
= keys
.upper_bound(prefix
+ "\2");
296 if (keys
.begin() == keys_iter
) {
297 keys_iter
= keys
.end();
304 int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string
&prefix
, const string
&after
) {
305 dout(30) << "kinetic iterator upper_bound()" << dendl
;
306 string bound
= combine_strings(prefix
, after
);
307 keys_iter
= keys
.upper_bound(bound
);
311 int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string
&prefix
, const string
&to
) {
312 dout(30) << "kinetic iterator lower_bound()" << dendl
;
313 string bound
= combine_strings(prefix
, to
);
314 keys_iter
= keys
.lower_bound(bound
);
318 bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
319 dout(30) << "kinetic iterator valid()" << dendl
;
320 return keys_iter
!= keys
.end();
323 int KineticStore::KineticWholeSpaceIteratorImpl::next() {
324 dout(30) << "kinetic iterator next()" << dendl
;
325 if (keys_iter
!= keys
.end()) {
332 int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
333 dout(30) << "kinetic iterator prev()" << dendl
;
334 if (keys_iter
!= keys
.begin()) {
338 keys_iter
= keys
.end();
342 string
KineticStore::KineticWholeSpaceIteratorImpl::key() {
343 dout(30) << "kinetic iterator key()" << dendl
;
345 split_key(*keys_iter
, NULL
, &out_key
);
349 pair
<string
,string
> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
350 dout(30) << "kinetic iterator raw_key()" << dendl
;
352 split_key(*keys_iter
, &prefix
, &key
);
353 return make_pair(prefix
, key
);
356 bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string
&prefix
) {
357 // Look for "prefix\1" right in *keys_iter without making a copy
358 string key
= *keys_iter
;
359 if ((key
.size() > prefix
.length()) && (key
[prefix
.length()] == '\1')) {
360 return memcmp(key
.c_str(), prefix
.c_str(), prefix
.length()) == 0;
367 bufferlist
KineticStore::KineticWholeSpaceIteratorImpl::value() {
368 dout(30) << "kinetic iterator value()" << dendl
;
369 unique_ptr
<kinetic::KineticRecord
> record
;
370 kinetic_status
= kinetic_conn
->Get(*keys_iter
, record
);
371 return to_bufferlist(*record
.get());
374 int KineticStore::KineticWholeSpaceIteratorImpl::status() {
375 dout(30) << "kinetic iterator status()" << dendl
;
376 return kinetic_status
.ok() ? 0 : -1;