]> git.proxmox.com Git - ceph.git/blob - ceph/src/kv/KineticStore.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / kv / KineticStore.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 #include "KineticStore.h"
4 #include "common/ceph_crypto.h"
5
6 #include <set>
7 #include <map>
8 #include <string>
9 #include <errno.h>
10 using std::string;
11 #include "common/perf_counters.h"
12
13 #define dout_subsys ceph_subsys_kinetic
14
15 int KineticStore::init()
16 {
17 // init defaults. caller can override these if they want
18 // prior to calling open.
19 host = cct->_conf->kinetic_host;
20 port = cct->_conf->kinetic_port;
21 user_id = cct->_conf->kinetic_user_id;
22 hmac_key = cct->_conf->kinetic_hmac_key;
23 use_ssl = cct->_conf->kinetic_use_ssl;
24 return 0;
25 }
26
27 int KineticStore::_test_init(CephContext *c)
28 {
29 kinetic::KineticConnectionFactory conn_factory =
30 kinetic::NewKineticConnectionFactory();
31
32 kinetic::ConnectionOptions options;
33 options.host = cct->_conf->kinetic_host;
34 options.port = cct->_conf->kinetic_port;
35 options.user_id = cct->_conf->kinetic_user_id;
36 options.hmac_key = cct->_conf->kinetic_hmac_key;
37 options.use_ssl = cct->_conf->kinetic_use_ssl;
38
39 kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
40 kinetic_conn.reset();
41 if (!status.ok())
42 derr << __func__ << " Unable to connect to kinetic store " << options.host
43 << ":" << options.port << " : " << status.ToString() << dendl;
44 return status.ok() ? 0 : -EIO;
45 }
46
47 int KineticStore::open(ostream &out, const vector<ColumnFamily>& cfs)
48 {
49 if (!cfs.empty()) {
50 ceph_abort_msg("Not implemented");
51 }
52 return do_open(out, false);
53 }
54
55 int KineticStore::create_and_open(ostream &out, const vector<ColumnFamily>& cfs)
56 {
57 if (!cfs.empty()) {
58 ceph_abort_msg("Not implemented");
59 }
60 return do_open(out, true);
61 }
62
63 int KineticStore::do_open(ostream &out, bool create_if_missing)
64 {
65 kinetic::KineticConnectionFactory conn_factory =
66 kinetic::NewKineticConnectionFactory();
67 kinetic::ConnectionOptions options;
68 options.host = host;
69 options.port = port;
70 options.user_id = user_id;
71 options.hmac_key = hmac_key;
72 options.use_ssl = use_ssl;
73 kinetic::Status status = conn_factory.NewThreadsafeBlockingConnection(options, kinetic_conn, 10);
74 if (!status.ok()) {
75 derr << "Unable to connect to kinetic store " << host << ":" << port
76 << " : " << status.ToString() << dendl;
77 return -EINVAL;
78 }
79
80 PerfCountersBuilder plb(g_ceph_context, "kinetic", l_kinetic_first, l_kinetic_last);
81 plb.add_u64_counter(l_kinetic_gets, "kinetic_get", "Gets");
82 plb.add_u64_counter(l_kinetic_txns, "kinetic_transaction", "Transactions");
83 logger = plb.create_perf_counters();
84 cct->get_perfcounters_collection()->add(logger);
85 return 0;
86 }
87
88 KineticStore::KineticStore(CephContext *c) :
89 cct(c),
90 logger(NULL)
91 {
92 host = c->_conf->kinetic_host;
93 port = c->_conf->kinetic_port;
94 user_id = c->_conf->kinetic_user_id;
95 hmac_key = c->_conf->kinetic_hmac_key;
96 use_ssl = c->_conf->kinetic_use_ssl;
97 }
98
99 KineticStore::~KineticStore()
100 {
101 close();
102 delete logger;
103 }
104
105 void KineticStore::close()
106 {
107 kinetic_conn.reset();
108 if (logger)
109 cct->get_perfcounters_collection()->remove(logger);
110 }
111
112 int KineticStore::submit_transaction(KeyValueDB::Transaction t)
113 {
114 KineticTransactionImpl * _t =
115 static_cast<KineticTransactionImpl *>(t.get());
116
117 dout(20) << "kinetic submit_transaction" << dendl;
118
119 for (vector<KineticOp>::iterator it = _t->ops.begin();
120 it != _t->ops.end(); ++it) {
121 kinetic::KineticStatus status(kinetic::StatusCode::OK, "");
122 if (it->type == KINETIC_OP_WRITE) {
123 string data(it->data.c_str(), it->data.length());
124 kinetic::KineticRecord record(data, "", "",
125 com::seagate::kinetic::client::proto::Message_Algorithm_SHA1);
126 dout(30) << "kinetic before put of " << it->key << " (" << data.length() << " bytes)" << dendl;
127 status = kinetic_conn->Put(it->key, "", kinetic::WriteMode::IGNORE_VERSION,
128 record);
129 dout(30) << "kinetic after put of " << it->key << dendl;
130 } else {
131 ceph_assert(it->type == KINETIC_OP_DELETE);
132 dout(30) << "kinetic before delete" << dendl;
133 status = kinetic_conn->Delete(it->key, "",
134 kinetic::WriteMode::IGNORE_VERSION);
135 dout(30) << "kinetic after delete" << dendl;
136 }
137 if (!status.ok()) {
138 derr << "kinetic error submitting transaction: "
139 << status.message() << dendl;
140 return -1;
141 }
142 }
143
144 logger->inc(l_kinetic_txns);
145 return 0;
146 }
147
148 int KineticStore::submit_transaction_sync(KeyValueDB::Transaction t)
149 {
150 return submit_transaction(t);
151 }
152
153 void KineticStore::KineticTransactionImpl::set(
154 const string &prefix,
155 const string &k,
156 const bufferlist &to_set_bl)
157 {
158 string key = combine_strings(prefix, k);
159 dout(30) << "kinetic set key " << key << dendl;
160 ops.push_back(KineticOp(KINETIC_OP_WRITE, key, to_set_bl));
161 }
162
163 void KineticStore::KineticTransactionImpl::rmkey(const string &prefix,
164 const string &k)
165 {
166 string key = combine_strings(prefix, k);
167 dout(30) << "kinetic rm key " << key << dendl;
168 ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
169 }
170
171 void KineticStore::KineticTransactionImpl::rmkeys_by_prefix(const string &prefix)
172 {
173 dout(20) << "kinetic rmkeys_by_prefix " << prefix << dendl;
174 KeyValueDB::Iterator it = db->get_iterator(prefix);
175 for (it->seek_to_first();
176 it->valid();
177 it->next()) {
178 string key = combine_strings(prefix, it->key());
179 ops.push_back(KineticOp(KINETIC_OP_DELETE, key));
180 dout(30) << "kinetic rm key by prefix: " << key << dendl;
181 }
182 }
183
184 void KineticStore::KineticTransactionImpl::rm_range_keys(const string &prefix, const string &start, const string &end)
185 {
186 KeyValueDB::Iterator it = db->get_iterator(prefix);
187 it->lower_bound(start);
188 while (it->valid()) {
189 if (it->key() >= end) {
190 break;
191 }
192 ops.push_back(
193 KineticOp(KINETIC_OP_DELETE, combine_strings(prefix, it->key())));
194 it->next();
195 }
196 }
197
198 int KineticStore::get(
199 const string &prefix,
200 const std::set<string> &keys,
201 std::map<string, bufferlist> *out)
202 {
203 dout(30) << "kinetic get prefix: " << prefix << " keys: " << keys << dendl;
204 for (std::set<string>::const_iterator i = keys.begin();
205 i != keys.end();
206 ++i) {
207 unique_ptr<kinetic::KineticRecord> record;
208 string key = combine_strings(prefix, *i);
209 dout(30) << "before get key " << key << dendl;
210 kinetic::KineticStatus status = kinetic_conn->Get(key, record);
211 if (!status.ok())
212 break;
213 dout(30) << "kinetic get got key: " << key << dendl;
214 out->insert(make_pair(key, to_bufferlist(*record.get())));
215 }
216 logger->inc(l_kinetic_gets);
217 return 0;
218 }
219
220 string KineticStore::combine_strings(const string &prefix, const string &value)
221 {
222 string out = prefix;
223 out.push_back(1);
224 out.append(value);
225 return out;
226 }
227
228 bufferlist KineticStore::to_bufferlist(const kinetic::KineticRecord &record)
229 {
230 bufferlist bl;
231 bl.append(*(record.value()));
232 return bl;
233 }
234
235 int KineticStore::split_key(string &in, string *prefix, string *key)
236 {
237 size_t prefix_len = 0;
238 char* in_data = in.c_str();
239
240 // Find separator inside Slice
241 char* separator = (char*) memchr((void*)in_data, 1, in.size());
242 if (separator == NULL)
243 return -EINVAL;
244 prefix_len = size_t(separator - in_data);
245 if (prefix_len >= in.size())
246 return -EINVAL;
247
248 // Fetch prefix and/or key directly from Slice
249 if (prefix)
250 *prefix = string(in_data, prefix_len);
251 if (key)
252 *key = string(separator+1, in.size()-prefix_len-1);
253 return 0;
254 }
255
256 KineticStore::KineticWholeSpaceIteratorImpl::KineticWholeSpaceIteratorImpl(kinetic::BlockingKineticConnection *conn) : kinetic_conn(conn),
257 kinetic_status(kinetic::StatusCode::OK, "")
258 {
259 dout(30) << "kinetic iterator constructor()" << dendl;
260 const static string last_key = "\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF";
261 kinetic::KeyRangeIterator it =
262 kinetic_conn->IterateKeyRange("", true, last_key, true, 1024);
263 while (it != kinetic::KeyRangeEnd()) {
264 try {
265 keys.insert(*it);
266 dout(30) << "kinetic iterator added " << *it << dendl;
267 } catch (std::runtime_error &e) {
268 kinetic_status = kinetic::KineticStatus(kinetic::StatusCode::CLIENT_INTERNAL_ERROR, e.what());
269 return;
270 }
271 ++it;
272 }
273 keys_iter = keys.begin();
274 }
275
276 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_first(const string &prefix)
277 {
278 dout(30) << "kinetic iterator seek_to_first(prefix): " << prefix << dendl;
279 keys_iter = keys.lower_bound(prefix);
280 return 0;
281 }
282
283 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last()
284 {
285 dout(30) << "kinetic iterator seek_to_last()" << dendl;
286 keys_iter = keys.end();
287 if (keys.begin() != keys_iter)
288 --keys_iter;
289 return 0;
290 }
291
292 int KineticStore::KineticWholeSpaceIteratorImpl::seek_to_last(const string &prefix)
293 {
294 dout(30) << "kinetic iterator seek_to_last(prefix): " << prefix << dendl;
295 keys_iter = keys.upper_bound(prefix + "\2");
296 if (keys.begin() == keys_iter) {
297 keys_iter = keys.end();
298 } else {
299 --keys_iter;
300 }
301 return 0;
302 }
303
304 int KineticStore::KineticWholeSpaceIteratorImpl::upper_bound(const string &prefix, const string &after) {
305 dout(30) << "kinetic iterator upper_bound()" << dendl;
306 string bound = combine_strings(prefix, after);
307 keys_iter = keys.upper_bound(bound);
308 return 0;
309 }
310
311 int KineticStore::KineticWholeSpaceIteratorImpl::lower_bound(const string &prefix, const string &to) {
312 dout(30) << "kinetic iterator lower_bound()" << dendl;
313 string bound = combine_strings(prefix, to);
314 keys_iter = keys.lower_bound(bound);
315 return 0;
316 }
317
318 bool KineticStore::KineticWholeSpaceIteratorImpl::valid() {
319 dout(30) << "kinetic iterator valid()" << dendl;
320 return keys_iter != keys.end();
321 }
322
323 int KineticStore::KineticWholeSpaceIteratorImpl::next() {
324 dout(30) << "kinetic iterator next()" << dendl;
325 if (keys_iter != keys.end()) {
326 ++keys_iter;
327 return 0;
328 }
329 return -1;
330 }
331
332 int KineticStore::KineticWholeSpaceIteratorImpl::prev() {
333 dout(30) << "kinetic iterator prev()" << dendl;
334 if (keys_iter != keys.begin()) {
335 --keys_iter;
336 return 0;
337 }
338 keys_iter = keys.end();
339 return -1;
340 }
341
342 string KineticStore::KineticWholeSpaceIteratorImpl::key() {
343 dout(30) << "kinetic iterator key()" << dendl;
344 string out_key;
345 split_key(*keys_iter, NULL, &out_key);
346 return out_key;
347 }
348
349 pair<string,string> KineticStore::KineticWholeSpaceIteratorImpl::raw_key() {
350 dout(30) << "kinetic iterator raw_key()" << dendl;
351 string prefix, key;
352 split_key(*keys_iter, &prefix, &key);
353 return make_pair(prefix, key);
354 }
355
356 bool KineticStore::KineticWholeSpaceIteratorImpl::raw_key_is_prefixed(const string &prefix) {
357 // Look for "prefix\1" right in *keys_iter without making a copy
358 string key = *keys_iter;
359 if ((key.size() > prefix.length()) && (key[prefix.length()] == '\1')) {
360 return memcmp(key.c_str(), prefix.c_str(), prefix.length()) == 0;
361 } else {
362 return false;
363 }
364 }
365
366
367 bufferlist KineticStore::KineticWholeSpaceIteratorImpl::value() {
368 dout(30) << "kinetic iterator value()" << dendl;
369 unique_ptr<kinetic::KineticRecord> record;
370 kinetic_status = kinetic_conn->Get(*keys_iter, record);
371 return to_bufferlist(*record.get());
372 }
373
374 int KineticStore::KineticWholeSpaceIteratorImpl::status() {
375 dout(30) << "kinetic iterator status()" << dendl;
376 return kinetic_status.ok() ? 0 : -1;
377 }