]> git.proxmox.com Git - ceph.git/blob - ceph/src/key_value_store/kv_flat_btree_async.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / key_value_store / kv_flat_btree_async.cc
1 /*
2 * Key-value store using librados
3 *
4 * September 2, 2012
5 * Eleanor Cawthon
6 * eleanor.cawthon@inktank.com
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "include/compat.h"
15 #include "key_value_store/key_value_structure.h"
16 #include "key_value_store/kv_flat_btree_async.h"
17 #include "key_value_store/kvs_arg_types.h"
18 #include "include/rados/librados.hpp"
19 #include "common/ceph_context.h"
20 #include "common/Clock.h"
21 #include "include/types.h"
22
23 #include <errno.h>
24 #include <string>
25 #include <iostream>
26 #include <cassert>
27 #include <climits>
28 #include <cmath>
29 #include <sstream>
30 #include <stdlib.h>
31 #include <iterator>
32
33 using ceph::bufferlist;
34 using namespace std;
35
36 bool index_data::is_timed_out(utime_t now, utime_t timeout) const {
37 return prefix != "" && now - ts > timeout;
38 }
39
40 void IndexCache::clear() {
41 k2itmap.clear();
42 t2kmap.clear();
43 }
44
45 void IndexCache::push(const string &key, const index_data &idata) {
46 if (cache_size == 0) {
47 return;
48 }
49 index_data old_idata;
50 std::map<key_data, std::pair<index_data, utime_t> >::iterator old_it =
51 k2itmap.lower_bound(key_data(key));
52 if (old_it != k2itmap.end()) {
53 t2kmap.erase(old_it->second.second);
54 k2itmap.erase(old_it);
55 }
56 std::map<key_data, std::pair<index_data, utime_t> >::iterator new_it =
57 k2itmap.find(idata.kdata);
58 if (new_it != k2itmap.end()) {
59 utime_t old_time = new_it->second.second;
60 t2kmap.erase(old_time);
61 }
62 utime_t time = ceph_clock_now();
63 k2itmap[idata.kdata] = std::make_pair(idata, time);
64 t2kmap[time] = idata.kdata;
65 if ((int)k2itmap.size() > cache_size) {
66 pop();
67 }
68
69 }
70
71 void IndexCache::push(const index_data &idata) {
72 if (cache_size == 0) {
73 return;
74 }
75 if (k2itmap.count(idata.kdata) > 0) {
76 utime_t old_time = k2itmap[idata.kdata].second;
77 t2kmap.erase(old_time);
78 k2itmap.erase(idata.kdata);
79 }
80 utime_t time = ceph_clock_now();
81 k2itmap[idata.kdata] = std::make_pair(idata, time);
82 t2kmap[time] = idata.kdata;
83 if ((int)k2itmap.size() > cache_size) {
84 pop();
85 }
86 }
87
88 void IndexCache::pop() {
89 if (cache_size == 0) {
90 return;
91 }
92 std::map<utime_t, key_data>::iterator it = t2kmap.begin();
93 utime_t time = it->first;
94 key_data kdata = it->second;
95 k2itmap.erase(kdata);
96 t2kmap.erase(time);
97 }
98
99 void IndexCache::erase(key_data kdata) {
100 if (cache_size == 0) {
101 return;
102 }
103 if (k2itmap.count(kdata) > 0) {
104 utime_t c = k2itmap[kdata].second;
105 k2itmap.erase(kdata);
106 t2kmap.erase(c);
107 }
108 }
109
110 int IndexCache::get(const string &key, index_data *idata) const {
111 if (cache_size == 0) {
112 return -ENODATA;
113 }
114 if ((int)k2itmap.size() == 0) {
115 return -ENODATA;
116 }
117 std::map<key_data, std::pair<index_data, utime_t> >::const_iterator it =
118 k2itmap.lower_bound(key_data(key));
119 if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) {
120 return -ENODATA;
121 } else {
122 *idata = it->second.first;
123 }
124 return 0;
125 }
126
127 int IndexCache::get(const string &key, index_data *idata,
128 index_data *next_idata) const {
129 if (cache_size == 0) {
130 return -ENODATA;
131 }
132 std::map<key_data, std::pair<index_data, utime_t> >::const_iterator it =
133 k2itmap.lower_bound(key_data(key));
134 if (it == k2itmap.end() || ++it == k2itmap.end()) {
135 return -ENODATA;
136 } else {
137 --it;
138 if (!(it->second.first.min_kdata < key_data(key))){
139 //stale, should be reread.
140 return -ENODATA;
141 } else {
142 *idata = it->second.first;
143 ++it;
144 if (it != k2itmap.end()) {
145 *next_idata = it->second.first;
146 }
147 }
148 }
149 return 0;
150 }
151
152 int KvFlatBtreeAsync::nothing() {
153 return 0;
154 }
155
156 int KvFlatBtreeAsync::wait() {
157 if (rand() % 10 == 0) {
158 usleep(wait_ms);
159 }
160 return 0;
161 }
162
163 int KvFlatBtreeAsync::suicide() {
164 if (rand() % 10 == 0) {
165 if (verbose) cout << client_name << " is suiciding" << std::endl;
166 return 1;
167 }
168 return 0;
169 }
170
171 int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data)
172 {
173 if (verbose) cout << "\t\t" << client_name << "-next: finding next of "
174 << idata.str()
175 << std::endl;
176 int err = 0;
177 librados::ObjectReadOperation oro;
178 std::map<std::string, bufferlist> kvs;
179 oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err);
180 err = io_ctx.operate(index_name, &oro, NULL);
181 if (err < 0){
182 if (verbose) cout << "\t\t\t" << client_name
183 << "-next: getting index failed with error "
184 << err << std::endl;
185 return err;
186 }
187 if (!kvs.empty()) {
188 out_data->kdata.parse(kvs.begin()->first);
189 auto b = kvs.begin()->second.cbegin();
190 out_data->decode(b);
191 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
192 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
193 << std::endl;
194 //the client died after deleting the object. clean up.
195 cleanup(idata, err);
196 }
197 } else {
198 err = -EOVERFLOW;
199 }
200 return err;
201 }
202
203 int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data)
204 {
205 if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of "
206 << idata.str() << std::endl;
207 int err = 0;
208 bufferlist inbl;
209 idata_from_idata_args in_args;
210 in_args.idata = idata;
211 in_args.encode(inbl);
212 bufferlist outbl;
213 err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl);
214 if (err < 0){
215 if (verbose) cout << "\t\t\t" << client_name
216 << "-prev: getting index failed with error "
217 << err << std::endl;
218 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
219 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
220 << std::endl;
221 //the client died after deleting the object. clean up.
222 err = cleanup(idata, err);
223 if (err == -ESUICIDE) {
224 return err;
225 } else {
226 err = 0;
227 }
228 }
229 return err;
230 }
231 auto it = outbl.cbegin();
232 in_args.decode(it);
233 *out_data = in_args.next_idata;
234 if (verbose) cout << "\t\t" << client_name << "-prev: prev is "
235 << out_data->str()
236 << std::endl;
237 return err;
238 }
239
240 int KvFlatBtreeAsync::read_index(const string &key, index_data * idata,
241 index_data * next_idata, bool force_update) {
242 int err = 0;
243 if (!force_update) {
244 if (verbose) cout << "\t" << client_name
245 << "-read_index: getting index_data for " << key
246 << " from cache" << std::endl;
247 icache_lock.lock();
248 if (next_idata != NULL) {
249 err = icache.get(key, idata, next_idata);
250 } else {
251 err = icache.get(key, idata);
252 }
253 icache_lock.unlock();
254
255 if (err == 0) {
256 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
257 return err;
258 } else {
259 if (verbose) cout << "NOT IN CACHE" << std::endl;
260 }
261 }
262
263 if (verbose) cout << "\t" << client_name
264 << "-read_index: getting index_data for " << key
265 << " from object" << std::endl;
266 librados::ObjectReadOperation oro;
267 bufferlist raw_val;
268 std::set<std::string> key_set;
269 key_set.insert(key_data(key).encoded());
270 std::map<std::string, bufferlist> kvmap;
271 std::map<std::string, bufferlist> dupmap;
272 oro.omap_get_vals_by_keys(key_set, &dupmap, &err);
273 oro.omap_get_vals2(key_data(key).encoded(),
274 (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2),
275 &kvmap, nullptr, &err);
276 err = io_ctx.operate(index_name, &oro, NULL);
277 utime_t mytime = ceph_clock_now();
278 if (err < 0){
279 cerr << "\t" << client_name
280 << "-read_index: getting keys failed with "
281 << err << std::endl;
282 ceph_abort_msg(client_name + "-read_index: reading index failed");
283 return err;
284 }
285 kvmap.insert(dupmap.begin(), dupmap.end());
286 for (map<string, bufferlist>::iterator it = ++kvmap.begin();
287 it != kvmap.end();
288 ++it) {
289 bufferlist bl = it->second;
290 auto blit = bl.cbegin();
291 index_data this_idata;
292 this_idata.decode(blit);
293 if (this_idata.is_timed_out(mytime, timeout)) {
294 if (verbose) cout << client_name
295 << " THINKS THE OTHER CLIENT DIED. (mytime is "
296 << mytime.sec() << "." << mytime.usec() << ", idata.ts is "
297 << this_idata.ts.sec() << "." << this_idata.ts.usec()
298 << ", it has been " << (mytime - this_idata.ts).sec()
299 << '.' << (mytime - this_idata.ts).usec()
300 << ", timeout is " << timeout << ")" << std::endl;
301 //the client died after deleting the object. clean up.
302 if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) {
303 return -ESUICIDE;
304 }
305 return read_index(key, idata, next_idata, force_update);
306 }
307 std::scoped_lock l{icache_lock};
308 icache.push(this_idata);
309 }
310 auto b = kvmap.begin()->second.cbegin();
311 idata->decode(b);
312 idata->kdata.parse(kvmap.begin()->first);
313 if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is "
314 << kvmap.size()
315 << ", idata is " << idata->str() << std::endl;
316
317 ceph_assert(idata->obj != "");
318 icache_lock.lock();
319 icache.push(key, *idata);
320 icache_lock.unlock();
321
322 if (next_idata != NULL && idata->kdata.prefix != "1") {
323 next_idata->kdata.parse((++kvmap.begin())->first);
324 auto nb = (++kvmap.begin())->second.cbegin();
325 next_idata->decode(nb);
326 std::scoped_lock l{icache_lock};
327 icache.push(*next_idata);
328 }
329 return err;
330 }
331
332 int KvFlatBtreeAsync::split(const index_data &idata) {
333 int err = 0;
334 opmap['l']++;
335
336 if (idata.prefix != "") {
337 return -EPREFIX;
338 }
339
340 rebalance_args args;
341 args.bound = 2 * k - 1;
342 args.comparator = CEPH_OSD_CMPXATTR_OP_GT;
343 err = read_object(idata.obj, &args);
344 args.odata.max_kdata = idata.kdata;
345 if (err < 0) {
346 if (verbose) cout << "\t\t" << client_name << "-split: read object "
347 << args.odata.name
348 << " got " << err << std::endl;
349 return err;
350 }
351
352 if (verbose) cout << "\t\t" << client_name << "-split: splitting "
353 << idata.obj
354 << ", which has size " << args.odata.size
355 << " and actual size " << args.odata.omap.size() << std::endl;
356
357 ///////preparations that happen outside the critical section
358 //for prefix index
359 vector<object_data> to_create;
360 vector<object_data> to_delete;
361 to_delete.push_back(object_data(idata.min_kdata,
362 args.odata.max_kdata, args.odata.name, args.odata.version));
363
364 //for lower half object
365 std::map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
366 client_index_lock.lock();
367 to_create.push_back(object_data(to_string(client_name, client_index++)));
368 client_index_lock.unlock();
369 for (int i = 0; i < k; i++) {
370 to_create[0].omap.insert(*it);
371 ++it;
372 }
373 to_create[0].min_kdata = idata.min_kdata;
374 to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
375
376 //for upper half object
377 client_index_lock.lock();
378 to_create.push_back(object_data(to_create[0].max_kdata,
379 args.odata.max_kdata,
380 to_string(client_name, client_index++)));
381 client_index_lock.unlock();
382 to_create[1].omap.insert(
383 ++args.odata.omap.find(to_create[0].omap.rbegin()->first),
384 args.odata.omap.end());
385
386 //setting up operations
387 librados::ObjectWriteOperation owos[6];
388 vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops;
389 index_data out_data;
390 set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err);
391 ops.push_back(std::make_pair(
392 std::pair<int, string>(ADD_PREFIX, index_name),
393 &owos[0]));
394 for (int i = 1; i < 6; i++) {
395 ops.push_back(std::make_pair(std::make_pair(0,""), &owos[i]));
396 }
397 set_up_ops(to_create, to_delete, &ops, out_data, &err);
398
399 /////BEGIN CRITICAL SECTION/////
400 //put prefix on index entry for idata.val
401 err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops);
402 if (err < 0) {
403 return err;
404 }
405 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
406 << std::endl;
407 /////END CRITICAL SECTION/////
408 icache_lock.lock();
409 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
410 it != out_data.to_delete.end(); ++it) {
411 icache.erase(it->max);
412 }
413 for (vector<create_data>::iterator it = out_data.to_create.begin();
414 it != out_data.to_create.end(); ++it) {
415 icache.push(index_data(*it));
416 }
417 icache_lock.unlock();
418 return err;
419 }
420
421 int KvFlatBtreeAsync::rebalance(const index_data &idata1,
422 const index_data &next_idata){
423 opmap['m']++;
424 int err = 0;
425
426 if (idata1.prefix != "") {
427 return -EPREFIX;
428 }
429
430 rebalance_args args1;
431 args1.bound = k + 1;
432 args1.comparator = CEPH_OSD_CMPXATTR_OP_LT;
433 index_data idata2 = next_idata;
434
435 rebalance_args args2;
436 args2.bound = k + 1;
437 args2.comparator = CEPH_OSD_CMPXATTR_OP_LT;
438
439 if (idata1.kdata.prefix == "1") {
440 //this is the highest key in the index, so it doesn't have a next.
441
442 //read the index for the previous entry
443 err = prev(idata1, &idata2);
444 if (err == -ERANGE) {
445 if (verbose) cout << "\t\t" << client_name
446 << "-rebalance: this is the only node, "
447 << "so aborting" << std::endl;
448 return -EUCLEAN;
449 } else if (err < 0) {
450 return err;
451 }
452
453 //read the first object
454 err = read_object(idata1.obj, &args2);
455 if (err < 0) {
456 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
457 << std::endl;
458 if (err == -ENOENT) {
459 return -ECANCELED;
460 }
461 return err;
462 }
463 args2.odata.min_kdata = idata1.min_kdata;
464 args2.odata.max_kdata = idata1.kdata;
465
466 //read the second object
467 args1.bound = 2 * k + 1;
468 err = read_object(idata2.obj, &args1);
469 if (err < 0) {
470 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
471 << std::endl;
472 return err;
473 }
474 args1.odata.min_kdata = idata2.min_kdata;
475 args1.odata.max_kdata = idata2.kdata;
476
477 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
478 << idata2.obj
479 << ". size: " << args1.odata.size << " version: "
480 << args1.odata.version
481 << std::endl;
482 } else {
483 assert (next_idata.obj != "");
484 //there is a next key, so get it.
485 err = read_object(idata1.obj, &args1);
486 if (err < 0) {
487 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
488 << std::endl;
489 return err;
490 }
491 args1.odata.min_kdata = idata1.min_kdata;
492 args1.odata.max_kdata = idata1.kdata;
493
494 args2.bound = 2 * k + 1;
495 err = read_object(idata2.obj, &args2);
496 if (err < 0) {
497 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
498 << std::endl;
499 if (err == -ENOENT) {
500 return -ECANCELED;
501 }
502 return err;
503 }
504 args2.odata.min_kdata = idata2.min_kdata;
505 args2.odata.max_kdata = idata2.kdata;
506
507 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
508 << idata2.obj
509 << ". size: " << args2.odata.size << " version: "
510 << args2.odata.version
511 << std::endl;
512 }
513
514 if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is "
515 << args1.odata.max_kdata.encoded() << ","
516 << args1.odata.name << " with size " << args1.odata.size
517 << " , o2 is " << args2.odata.max_kdata.encoded()
518 << "," << args2.odata.name << " with size " << args2.odata.size
519 << std::endl;
520
521 //calculations
522 if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k
523 && (int)args2.odata.size > k
524 && (int)args2.odata.size <= 2*k) {
525 //nothing to do
526 if (verbose) cout << "\t\t" << client_name
527 << "-rebalance: both sizes in range, so"
528 << " aborting " << std::endl;
529 return -EBALANCE;
530 } else if (idata1.prefix != "" || idata2.prefix != "") {
531 return -EPREFIX;
532 }
533
534 //this is the high object. it gets created regardless of rebalance or merge.
535 client_index_lock.lock();
536 string o2w = to_string(client_name, client_index++);
537 client_index_lock.unlock();
538 index_data idata;
539 vector<object_data> to_create;
540 vector<object_data> to_delete;
541 librados::ObjectWriteOperation create[2];//possibly only 1 will be used
542 librados::ObjectWriteOperation other_ops[6];
543 vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops;
544 ops.push_back(std::make_pair(
545 std::pair<int, string>(ADD_PREFIX, index_name),
546 &other_ops[0]));
547
548 if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) {
549 //merge
550 if (verbose) cout << "\t\t" << client_name << "-rebalance: merging "
551 << args1.odata.name
552 << " and " << args2.odata.name << " to get " << o2w
553 << std::endl;
554 std::map<string, bufferlist> write2_map;
555 write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end());
556 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
557 to_create.push_back(object_data(args1.odata.min_kdata,
558 args2.odata.max_kdata, o2w, write2_map));
559 ops.push_back(std::make_pair(
560 std::pair<int, std::string>(MAKE_OBJECT, o2w),
561 &create[0]));
562 ceph_assert((int)write2_map.size() <= 2*k);
563 } else {
564 //rebalance
565 if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing "
566 << args1.odata.name
567 << " and " << args2.odata.name << std::endl;
568 std::map<std::string, bufferlist> write1_map;
569 std::map<std::string, bufferlist> write2_map;
570 std::map<std::string, bufferlist>::iterator it;
571 client_index_lock.lock();
572 string o1w = to_string(client_name, client_index++);
573 client_index_lock.unlock();
574 int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
575 / 2.0);
576 if (args1.odata.max_kdata != idata1.kdata) {
577 //this should be true if idata1 is the high object
578 target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size)
579 / 2.0);
580 }
581 for (it = args1.odata.omap.begin();
582 it != args1.odata.omap.end() && (int)write1_map.size()
583 < target_size_1;
584 ++it) {
585 write1_map.insert(*it);
586 }
587 if (it != args1.odata.omap.end()){
588 //write1_map is full, so put the rest in write2_map
589 write2_map.insert(it, args1.odata.omap.end());
590 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
591 } else {
592 //args1.odata.omap was small, and write2_map still needs more
593 std::map<std::string, bufferlist>::iterator it2;
594 for(it2 = args2.odata.omap.begin();
595 (it2 != args2.odata.omap.end()) && ((int)write1_map.size()
596 < target_size_1);
597 ++it2) {
598 write1_map.insert(*it2);
599 }
600 write2_map.insert(it2, args2.odata.omap.end());
601 }
602 if (verbose) cout << "\t\t" << client_name
603 << "-rebalance: write1_map has size "
604 << write1_map.size() << ", write2_map.size() is " << write2_map.size()
605 << std::endl;
606 //at this point, write1_map and write2_map should have the correct pairs
607 to_create.push_back(object_data(args1.odata.min_kdata,
608 key_data(write1_map.rbegin()->first),
609 o1w,write1_map));
610 to_create.push_back(object_data( key_data(write1_map.rbegin()->first),
611 args2.odata.max_kdata, o2w, write2_map));
612 ops.push_back(std::make_pair(
613 std::pair<int, std::string>(MAKE_OBJECT, o1w),
614 &create[0]));
615 ops.push_back(std::make_pair(
616 std::pair<int, std::string>(MAKE_OBJECT, o2w),
617 &create[1]));
618 }
619
620 to_delete.push_back(object_data(args1.odata.min_kdata,
621 args1.odata.max_kdata, args1.odata.name, args1.odata.version));
622 to_delete.push_back(object_data(args2.odata.min_kdata,
623 args2.odata.max_kdata, args2.odata.name, args2.odata.version));
624 for (int i = 1; i < 6; i++) {
625 ops.push_back(std::make_pair(std::make_pair(0,""), &other_ops[i]));
626 }
627
628 index_data out_data;
629 set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err);
630 set_up_ops(to_create, to_delete, &ops, out_data, &err);
631
632 //at this point, all operations should be completely set up.
633 /////BEGIN CRITICAL SECTION/////
634 err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops);
635 if (err < 0) {
636 return err;
637 }
638 icache_lock.lock();
639 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
640 it != out_data.to_delete.end(); ++it) {
641 icache.erase(it->max);
642 }
643 for (vector<create_data>::iterator it = out_data.to_create.begin();
644 it != out_data.to_create.end(); ++it) {
645 icache.push(index_data(*it));
646 }
647 icache_lock.unlock();
648 if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
649 << std::endl;
650 /////END CRITICAL SECTION/////
651 return err;
652 }
653
654 int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) {
655 librados::ObjectReadOperation get_obj;
656 librados::AioCompletion * obj_aioc = rados.aio_create_completion();
657 int err;
658 bufferlist unw_bl;
659 odata->name = obj;
660 get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err);
661 get_obj.getxattr("unwritable", &unw_bl, &err);
662 io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL);
663 obj_aioc->wait_for_complete();
664 err = obj_aioc->get_return_value();
665 if (err < 0){
666 //possibly -ENOENT, meaning someone else deleted it.
667 obj_aioc->release();
668 return err;
669 }
670 odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1";
671 odata->version = obj_aioc->get_version64();
672 odata->size = odata->omap.size();
673 obj_aioc->release();
674 return 0;
675 }
676
677 int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) {
678 bufferlist inbl;
679 args->encode(inbl);
680 bufferlist outbl;
681 int err;
682 librados::AioCompletion * a = rados.aio_create_completion();
683 io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl);
684 a->wait_for_complete();
685 err = a->get_return_value();
686 if (err < 0) {
687 if (verbose) cout << "\t\t" << client_name
688 << "-read_object: reading failed with "
689 << err << std::endl;
690 a->release();
691 return err;
692 }
693 auto it = outbl.cbegin();
694 args->decode(it);
695 args->odata.name = obj;
696 args->odata.version = a->get_version64();
697 a->release();
698 return err;
699 }
700
701 void KvFlatBtreeAsync::set_up_prefix_index(
702 const vector<object_data> &to_create,
703 const vector<object_data> &to_delete,
704 librados::ObjectWriteOperation * owo,
705 index_data * idata,
706 int * err) {
707 std::map<std::string, std::pair<bufferlist, int> > assertions;
708 std::map<string, bufferlist> to_insert;
709 idata->prefix = "1";
710 idata->ts = ceph_clock_now();
711 for(vector<object_data>::const_iterator it = to_create.begin();
712 it != to_create.end();
713 ++it) {
714 create_data c(it->min_kdata, it->max_kdata, it->name);
715 idata->to_create.push_back(c);
716 }
717 for(vector<object_data>::const_iterator it = to_delete.begin();
718 it != to_delete.end();
719 ++it) {
720 delete_data d(it->min_kdata, it->max_kdata, it->name, it->version);
721 idata->to_delete.push_back(d);
722 }
723 for(vector<object_data>::const_iterator it = to_delete.begin();
724 it != to_delete.end();
725 ++it) {
726 idata->obj = it->name;
727 idata->min_kdata = it->min_kdata;
728 idata->kdata = it->max_kdata;
729 bufferlist insert;
730 idata->encode(insert);
731 to_insert[it->max_kdata.encoded()] = insert;
732 index_data this_entry;
733 this_entry.min_kdata = idata->min_kdata;
734 this_entry.kdata = idata->kdata;
735 this_entry.obj = idata->obj;
736 assertions[it->max_kdata.encoded()] = std::pair<bufferlist, int>
737 (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
738 if (verbose) cout << "\t\t\t" << client_name
739 << "-setup_prefix: will assert "
740 << this_entry.str() << std::endl;
741 }
742 ceph_assert(*err == 0);
743 owo->omap_cmp(assertions, err);
744 if (to_create.size() <= 2) {
745 owo->omap_set(to_insert);
746 }
747 }
748
749 //some args can be null if there are no corresponding entries in p
750 void KvFlatBtreeAsync::set_up_ops(
751 const vector<object_data> &create_vector,
752 const vector<object_data> &delete_vector,
753 vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > * ops,
754 const index_data &idata,
755 int * err) {
756 vector<std::pair<std::pair<int, string>,
757 librados::ObjectWriteOperation* > >::iterator it;
758
759 //skip the prefixing part
760 for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {}
761 std::map<string, bufferlist> to_insert;
762 std::set<string> to_remove;
763 std::map<string, std::pair<bufferlist, int> > assertions;
764 if (create_vector.size() > 0) {
765 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
766 it->first = std::pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj);
767 set_up_unwrite_object(delete_vector[i].version, it->second);
768 ++it;
769 }
770 }
771 for (int i = 0; i < (int)idata.to_create.size(); ++i) {
772 index_data this_entry(idata.to_create[i].max, idata.to_create[i].min,
773 idata.to_create[i].obj);
774 to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry);
775 if (idata.to_create.size() <= 2) {
776 it->first = std::pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj);
777 } else {
778 it->first = std::pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj);
779 }
780 set_up_make_object(create_vector[i].omap, it->second);
781 ++it;
782 }
783 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
784 index_data this_entry = idata;
785 this_entry.obj = idata.to_delete[i].obj;
786 this_entry.min_kdata = idata.to_delete[i].min;
787 this_entry.kdata = idata.to_delete[i].max;
788 if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert "
789 << this_entry.str() << std::endl;
790 assertions[idata.to_delete[i].max.encoded()] = std::pair<bufferlist, int>(
791 to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
792 to_remove.insert(idata.to_delete[i].max.encoded());
793 it->first = std::pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj);
794 set_up_delete_object(it->second);
795 ++it;
796 }
797 if ((int)idata.to_create.size() <= 2) {
798 it->second->omap_cmp(assertions, err);
799 }
800 it->second->omap_rm_keys(to_remove);
801 it->second->omap_set(to_insert);
802
803
804 it->first = std::pair<int, string>(REMOVE_PREFIX, index_name);
805 }
806
807 void KvFlatBtreeAsync::set_up_make_object(
808 const std::map<std::string, bufferlist> &to_set,
809 librados::ObjectWriteOperation *owo) {
810 bufferlist inbl;
811 encode(to_set, inbl);
812 owo->exec("kvs", "create_with_omap", inbl);
813 }
814
815 void KvFlatBtreeAsync::set_up_unwrite_object(
816 const int &ver, librados::ObjectWriteOperation *owo) {
817 if (ver > 0) {
818 owo->assert_version(ver);
819 }
820 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0"));
821 owo->setxattr("unwritable", to_bl("1"));
822 }
823
824 void KvFlatBtreeAsync::set_up_restore_object(
825 librados::ObjectWriteOperation *owo) {
826 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
827 owo->setxattr("unwritable", to_bl("0"));
828 }
829
830 void KvFlatBtreeAsync::set_up_delete_object(
831 librados::ObjectWriteOperation *owo) {
832 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
833 owo->remove();
834 }
835
836 int KvFlatBtreeAsync::perform_ops(const string &debug_prefix,
837 const index_data &idata,
838 vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > *ops) {
839 int err = 0;
840 vector<librados::AioCompletion*> aiocs(idata.to_create.size());
841 int count = 0;
842 for (vector<std::pair<std::pair<int, string>,
843 librados::ObjectWriteOperation*> >::iterator it = ops->begin();
844 it != ops->end(); ++it) {
845 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
846 return -ESUICIDE;
847 }
848 switch (it->first.first) {
849 case ADD_PREFIX://prefixing
850 if (verbose) cout << debug_prefix << " adding prefix" << std::endl;
851 err = io_ctx.operate(index_name, it->second);
852 if (err < 0) {
853 if (verbose) cout << debug_prefix << " prefixing the index failed with "
854 << err << std::endl;
855 return -EPREFIX;
856 }
857 if (verbose) cout << debug_prefix << " prefix added." << std::endl;
858 break;
859 case UNWRITE_OBJECT://marking
860 if (verbose) cout << debug_prefix << " marking " << it->first.second
861 << std::endl;
862 err = io_ctx.operate(it->first.second, it->second);
863 if (err < 0) {
864 //most likely because it changed, in which case it will be -ERANGE
865 if (verbose) cout << debug_prefix << " marking " << it->first.second
866 << "failed with code" << err << std::endl;
867 if (it->first.second == (*idata.to_delete.begin()).max.encoded()) {
868 if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) {
869 return -ESUICIDE;
870 }
871 } else {
872 if (cleanup(idata, -ERANGE) == -ESUICIDE) {
873 return -ESUICIDE;
874 }
875 }
876 return err;
877 }
878 if (verbose) cout << debug_prefix << " marked " << it->first.second
879 << std::endl;
880 break;
881 case MAKE_OBJECT://creating
882 if (verbose) cout << debug_prefix << " creating " << it->first.second
883 << std::endl;
884 err = io_ctx.operate(it->first.second, it->second);
885 if (err < 0) {
886 //this can happen if someone else was cleaning up after us.
887 if (verbose) cout << debug_prefix << " creating " << it->first.second
888 << " failed"
889 << " with code " << err << std::endl;
890 if (err == -EEXIST) {
891 //someone thinks we died, so die
892 if (verbose) cout << client_name << " is suiciding!" << std::endl;
893 return -ESUICIDE;
894 } else {
895 ceph_abort();
896 }
897 return err;
898 }
899 if (verbose || idata.to_create.size() > 2) {
900 cout << debug_prefix << " created object " << it->first.second
901 << std::endl;
902 }
903 break;
904 case AIO_MAKE_OBJECT:
905 cout << debug_prefix << " launching asynchronous create "
906 << it->first.second << std::endl;
907 aiocs[count] = rados.aio_create_completion();
908 io_ctx.aio_operate(it->first.second, aiocs[count], it->second);
909 count++;
910 if ((int)idata.to_create.size() == count) {
911 cout << "starting aiowrite waiting loop" << std::endl;
912 for (count -= 1; count >= 0; count--) {
913 aiocs[count]->wait_for_complete();
914 err = aiocs[count]->get_return_value();
915 if (err < 0) {
916 //this can happen if someone else was cleaning up after us.
917 cerr << debug_prefix << " a create failed"
918 << " with code " << err << std::endl;
919 if (err == -EEXIST) {
920 //someone thinks we died, so die
921 cerr << client_name << " is suiciding!" << std::endl;
922 return -ESUICIDE;
923 } else {
924 ceph_abort();
925 }
926 return err;
927 }
928 if (verbose || idata.to_create.size() > 2) {
929 cout << debug_prefix << " completed aio " << aiocs.size() - count
930 << "/" << aiocs.size() << std::endl;
931 }
932 }
933 }
934 break;
935 case REMOVE_OBJECT://deleting
936 if (verbose) cout << debug_prefix << " deleting " << it->first.second
937 << std::endl;
938 err = io_ctx.operate(it->first.second, it->second);
939 if (err < 0) {
940 //if someone else called cleanup on this prefix first
941 if (verbose) cout << debug_prefix << " deleting " << it->first.second
942 << "failed with code" << err << std::endl;
943 }
944 if (verbose) cout << debug_prefix << " deleted " << it->first.second
945 << std::endl;
946 break;
947 case REMOVE_PREFIX://rewriting index
948 if (verbose) cout << debug_prefix << " updating index " << std::endl;
949 err = io_ctx.operate(index_name, it->second);
950 if (err < 0) {
951 if (verbose) cout << debug_prefix
952 << " rewriting the index failed with code " << err
953 << ". someone else must have thought we died, so dying" << std::endl;
954 return -ETIMEDOUT;
955 }
956 if (verbose) cout << debug_prefix << " updated index." << std::endl;
957 break;
958 case RESTORE_OBJECT:
959 if (verbose) cout << debug_prefix << " restoring " << it->first.second
960 << std::endl;
961 err = io_ctx.operate(it->first.second, it->second);
962 if (err < 0) {
963 if (verbose) cout << debug_prefix << "restoring " << it->first.second
964 << " failed"
965 << " with " << err << std::endl;
966 return err;
967 }
968 if (verbose) cout << debug_prefix << " restored " << it->first.second
969 << std::endl;
970 break;
971 default:
972 if (verbose) cout << debug_prefix << " performing unknown op on "
973 << it->first.second
974 << std::endl;
975 err = io_ctx.operate(index_name, it->second);
976 if (err < 0) {
977 if (verbose) cout << debug_prefix << " unknown op on "
978 << it->first.second
979 << " failed with " << err << std::endl;
980 return err;
981 }
982 if (verbose) cout << debug_prefix << " unknown op on "
983 << it->first.second
984 << " succeeded." << std::endl;
985 break;
986 }
987 }
988
989 return err;
990 }
991
/**
 * Recover from a split/rebalance/set_many that was interrupted while its
 * index entry still carried a prefix, by rolling it back or forward.
 *
 * @param idata the prefixed index entry. Its to_create / to_delete lists
 *              describe the objects involved in the interrupted operation.
 *              It must have a non-empty prefix (asserted below).
 * @param error selects the recovery strategy:
 *   - -EFIRSTOBJ: only the index needs fixing. Rewrite each to_delete entry
 *                 back to its unprefixed form.
 *   - -ERANGE:    restore the first to_delete object, then rewrite the index
 *                 as for -EFIRSTOBJ.
 *   - -ENOENT:    roll forward by replaying the ops built by set_up_ops().
 *   - otherwise:  roll back. Mark the to_create objects, restore the
 *                 to_delete objects (switching to roll-forward if a restore
 *                 returns -ENOENT), remove the to_create objects, then
 *                 rewrite the index.
 * @return the (non-negative) result of the final operation on success;
 *         -ESUICIDE if the injected interrupt fired;
 *         -ECANCELED if the guarded index rewrite failed (for example
 *         because someone else already cleaned up).
 */
int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) {
  if (verbose) cout << "\t\t" << client_name << ": cleaning up after "
      << idata.str()
      << std::endl;
  int err = 0;
  ceph_assert(idata.prefix != "");
  // Index entries to write (unprefixed), keyed by encoded max key.
  std::map<std::string,bufferlist> new_index;
  // Compare-and-set guards: the index must still hold the prefixed entry.
  std::map<std::string, std::pair<bufferlist, int> > assertions;
  switch (error) {
  case -EFIRSTOBJ: {
    //this happens if the split or rebalance failed to mark the first object,
    //meaning only the index needs to be changed.
    //restore objects that had been marked unwritable.
    for(vector<delete_data >::const_iterator it =
        idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      // Replacement entry: same object and bounds, without the prefix.
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      // Expected current entry: a copy of the prefixed idata with this
      // object's bounds. Only rewrite if the index still matches it.
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          std::pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
    }

    //update the index
    librados::ObjectWriteOperation update_index;
    // NOTE(review): &err receives the omap_cmp per-op result, but err is
    // overwritten by operate() below.
    update_index.omap_cmp(assertions, &err);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  case -ERANGE: {
    //this happens if a split or rebalance fails to mark an object. It is a
    //special case of rolling back that does not have to deal with new objects.

    //restore objects that had been marked unwritable.
    vector<delete_data >::const_iterator it;
    for(it = idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      // Same unprefixed-entry / prefixed-assertion pair as in -EFIRSTOBJ.
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          std::pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
    }
    // Only the first to_delete object is restored here.
    // NOTE(review): it->obj is dereferenced without checking that
    // to_delete is non-empty.
    it = idata.to_delete.begin();
    librados::ObjectWriteOperation restore;
    set_up_restore_object(&restore);
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
        << it->obj
        << std::endl;
    err = io_ctx.operate(it->obj, &restore);
    if (err < 0) {
      //i.e., -ECANCELED because the object was already restored by someone
      //else
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
          << it->obj
          << " failed with " << err << std::endl;
    } else {
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
          << it->obj
          << std::endl;
    }

    //update the index
    librados::ObjectWriteOperation update_index;
    update_index.omap_cmp(assertions, &err);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  case -ENOENT: {
    if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward"
        << std::endl;
    //all changes were created except for updating the index and possibly
    //deleting the objects. roll forward.
    // One op slot per to_delete object plus one. set_up_ops() fills them in
    // from idata alone (no new/old object data is passed).
    vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops;
    vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1);
    for (int i = 0; i <= (int)idata.to_delete.size(); ++i) {
      ops.push_back(std::make_pair(std::pair<int, std::string>(0, ""), &owos[i]));
    }
    set_up_ops(vector<object_data>(),
        vector<object_data>(), &ops, idata, &err);
    err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops);
    if (err < 0) {
      if (err == -ESUICIDE) {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index"
        << std::endl;
    break;
  }
  default: {
    //roll back all changes.
    if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back"
        << std::endl;
    // NOTE(review): these shadow the function-scope new_index/assertions.
    std::map<std::string,bufferlist> new_index;
    std::set<std::string> to_remove;
    std::map<std::string, std::pair<bufferlist, int> > assertions;

    //mark the objects to be created. if someone else already has, die.
    // Failures here are logged and ignored; processing continues.
    for(vector<create_data >::const_reverse_iterator it =
        idata.to_create.rbegin();
        it != idata.to_create.rend(); ++it) {
      librados::ObjectWriteOperation rm;
      set_up_unwrite_object(0, &rm);
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &rm);
      if (err < 0) {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
            << it->obj
            << " failed with " << err << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked "
            << it->obj
            << std::endl;
      }
    }

    //restore objects that had been marked unwritable.
    for(vector<delete_data >::const_iterator it =
        idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          std::pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
      librados::ObjectWriteOperation restore;
      set_up_restore_object(&restore);
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &restore);
      if (err == -ENOENT) {
        //it had gotten far enough to be rolled forward - unmark the objects
        //and roll forward.
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: roll forward instead"
            << std::endl;
        for(vector<create_data >::const_iterator cit =
            idata.to_create.begin();
            cit != idata.to_create.end(); ++cit) {
          librados::ObjectWriteOperation res;
          set_up_restore_object(&res);
          // NOTE(review): every other interrupt check in this function
          // returns -ESUICIDE; this one returns -ECANCELED.
          if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
              == 1 ) {
            return -ECANCELED;
          }
          if (verbose) cout << "\t\t\t" << client_name
              << "-cleanup: restoring " << cit->obj
              << std::endl;
          err = io_ctx.operate(cit->obj, &res);
          if (err < 0) {
            if (verbose) cout << "\t\t\t" << client_name
                << "-cleanup: restoring "
                << cit->obj << " failed with " << err << std::endl;
          }
          if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
              << cit->obj
              << std::endl;
        }
        // Recurse with the roll-forward strategy.
        return cleanup(idata, -ENOENT);
      } else if (err < 0) {
        //i.e., -ECANCELED because the object was already restored by someone
        //else
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: restoring " << it->obj
            << " failed with " << err << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
            << it->obj
            << std::endl;
      }
    }

    //remove the new objects
    // Their index keys are collected so the index rewrite can drop them.
    for(vector<create_data >::const_reverse_iterator it =
        idata.to_create.rbegin();
        it != idata.to_create.rend(); ++it) {
      to_remove.insert(it->max.encoded());
      librados::ObjectWriteOperation rm;
      rm.remove();
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &rm);
      if (err < 0) {
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: failed to remove "
            << it->obj << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed "
            << it->obj
            << std::endl;
      }
    }

    //update the index
    // Guarded by the assertions: drop new-object keys, restore old entries.
    librados::ObjectWriteOperation update_index;
    update_index.omap_cmp(assertions, &err);
    update_index.omap_rm_keys(to_remove);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  }
  return err;
}
1297
1298 string KvFlatBtreeAsync::to_string(string s, int i) {
1299 stringstream ret;
1300 ret << s << i;
1301 return ret.str();
1302 }
1303
1304 string KvFlatBtreeAsync::get_name() {
1305 return rados_id;
1306 }
1307
1308 void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) {
1309 interrupt = inject;
1310 wait_ms = wait_time;
1311 }
1312
1313 int KvFlatBtreeAsync::setup(int argc, const char** argv) {
1314 int r = rados.init(rados_id.c_str());
1315 if (r < 0) {
1316 cerr << "error during init" << r << std::endl;
1317 return r;
1318 }
1319 r = rados.conf_parse_argv(argc, argv);
1320 if (r < 0) {
1321 cerr << "error during parsing args" << r << std::endl;
1322 return r;
1323 }
1324 r = rados.conf_parse_env(NULL);
1325 if (r < 0) {
1326 cerr << "error during parsing env" << r << std::endl;
1327 return r;
1328 }
1329 r = rados.conf_read_file(NULL);
1330 if (r < 0) {
1331 cerr << "error during read file: " << r << std::endl;
1332 return r;
1333 }
1334 r = rados.connect();
1335 if (r < 0) {
1336 cerr << "error during connect: " << r << std::endl;
1337 return r;
1338 }
1339 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
1340 if (r < 0) {
1341 cerr << "error creating io ctx: " << r << std::endl;
1342 rados.shutdown();
1343 return r;
1344 }
1345
1346 librados::ObjectWriteOperation make_index;
1347 make_index.create(true);
1348 std::map<std::string,bufferlist> index_map;
1349 index_data idata;
1350 idata.obj = client_name;
1351 idata.min_kdata.raw_key = "";
1352 idata.kdata = key_data("");
1353 index_map["1"] = to_bl(idata);
1354 make_index.omap_set(index_map);
1355 r = io_ctx.operate(index_name, &make_index);
1356 if (r < 0) {
1357 if (verbose) cout << client_name << ": Making the index failed with code "
1358 << r
1359 << std::endl;
1360 return 0;
1361 }
1362 if (verbose) cout << client_name << ": created index object" << std::endl;
1363
1364 librados::ObjectWriteOperation make_max_obj;
1365 make_max_obj.create(true);
1366 make_max_obj.setxattr("unwritable", to_bl("0"));
1367 make_max_obj.setxattr("size", to_bl("0"));
1368 r = io_ctx.operate(client_name, &make_max_obj);
1369 if (r < 0) {
1370 if (verbose) cout << client_name << ": Setting xattr failed with code "
1371 << r
1372 << std::endl;
1373 }
1374
1375 return 0;
1376 }
1377
1378 int KvFlatBtreeAsync::set(const string &key, const bufferlist &val,
1379 bool update_on_existing) {
1380 if (verbose) cout << client_name << " is "
1381 << (update_on_existing? "updating " : "setting ")
1382 << key << std::endl;
1383 int err = 0;
1384 utime_t mytime;
1385 index_data idata(key);
1386
1387 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1388 err = read_index(key, &idata, NULL, false);
1389 if (err < 0) {
1390 if (verbose) cout << "\t" << client_name
1391 << ": getting oid failed with code "
1392 << err << std::endl;
1393 return err;
1394 }
1395 if (verbose) cout << "\t" << client_name << ": index data is " << idata.str()
1396 << ", object is " << idata.obj << std::endl;
1397
1398 err = set_op(key, val, update_on_existing, idata);
1399
1400 if (verbose) cout << "\t" << client_name << ": finished set with " << err
1401 << std::endl;
1402 return err;
1403 }
1404
/**
 * Insert key -> val into idata.obj via the "kvs.omap_insert" class method,
 * splitting the object and/or refreshing the index and retrying as needed.
 *
 * @param key, val            pair to write.
 * @param update_on_existing  false makes the insert exclusive.
 * @param idata               index entry for the target object. It is
 *                            refreshed in place when the index is re-read.
 * @return 0 on success; -EEXIST for an exclusive insert of an existing key;
 *         -ESUICIDE if the injected interrupt fired; otherwise a negative
 *         error from the write, read_index(), or split handling.
 */
int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val,
    bool update_on_existing, index_data &idata) {
  //write

  bufferlist inbl;
  omap_set_args args;
  // Object size limit passed to the class method; exceeding it yields
  // -EKEYREJECTED below.
  args.bound = 2 * k;
  args.exclusive = !update_on_existing;
  args.omap[key] = val;
  args.encode(inbl);

  librados::ObjectWriteOperation owo;
  owo.exec("kvs", "omap_insert", inbl);
  if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
    if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
    return -ESUICIDE;
  }
  if (verbose) cout << "\t" << client_name << ": inserting " << key
      << " into object "
      << idata.obj << std::endl;
  int err = io_ctx.operate(idata.obj, &owo);
  if (err < 0) {
    switch (err) {
    case -EEXIST: {
      //the key already exists and this is an exclusive insert.
      cerr << "\t" << client_name << ": writing key failed with "
          << err << std::endl;
      return err;
    }
    case -EKEYREJECTED: {
      //the object needs to be split.
      // Loop until split() succeeds or reports -ENOENT / -EBALANCE.
      // handle_set_rm_errors() takes err by reference and may reset it
      // (e.g. to the result of re-reading the index), which also ends the
      // loop.
      do {
        if (verbose) cout << "\t" << client_name << ": running split on "
            << idata.obj
            << std::endl;
        err = read_index(key, &idata, NULL, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        err = split(idata);
        if (err < 0 && err != -ENOENT && err != -EBALANCE) {
          if (verbose) cerr << "\t" << client_name << ": split failed with "
              << err << std::endl;
          int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL);
          switch (ret) {
          case -ESUICIDE:
            if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
            return ret;
          case 1:
            // The key now lives in a different object: start over.
            return set_op(key, val, update_on_existing, idata);
          case 2:
            // Unexpected error: give up.
            return err;
          }
          // Any other ret falls through to the loop condition.
        }
      } while (err < 0 && err != -EBALANCE && err != -ENOENT);
      err = read_index(key, &idata, NULL, true);
      if (err < 0) {
        if (verbose) cout << "\t" << client_name
            << ": getting oid failed with code "
            << err << std::endl;
        return err;
      }
      return set_op(key, val, update_on_existing, idata);
    }
    default:
      if (verbose) cerr << "\t" << client_name << ": writing obj failed with "
          << err << std::endl;
      // -ENOENT / -EACCES suggest stale index data (e.g. a cached entry for
      // an object that has since been replaced): refresh and retry.
      if (err == -ENOENT || err == -EACCES) {
        if (err == -ENOENT) {
          if (verbose) cout << "CACHE FAILURE" << std::endl;
        }
        err = read_index(key, &idata, NULL, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        if (verbose) cout << "\t" << client_name << ": index data is "
            << idata.str()
            << ", object is " << idata.obj << std::endl;
        return set_op(key, val, update_on_existing, idata);
      } else {
        return err;
      }
    }
  }
  return 0;
}
1497
1498 int KvFlatBtreeAsync::remove(const string &key) {
1499 if (verbose) cout << client_name << ": removing " << key << std::endl;
1500 int err = 0;
1501 string obj;
1502 utime_t mytime;
1503 index_data idata;
1504 index_data next_idata;
1505
1506 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1507 err = read_index(key, &idata, &next_idata, false);
1508 if (err < 0) {
1509 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1510 return err;
1511 }
1512 obj = idata.obj;
1513 if (verbose) cout << "\t" << client_name << ": idata is " << idata.str()
1514 << ", next_idata is " << next_idata.str()
1515 << ", obj is " << obj << std::endl;
1516
1517 err = remove_op(key, idata, next_idata);
1518
1519 if (verbose) cout << "\t" << client_name << ": finished remove with " << err
1520 << " and exiting" << std::endl;
1521 return err;
1522 }
1523
/**
 * Remove key from idata.obj via the "kvs.omap_remove" class method,
 * rebalancing with the next object and/or refreshing the index and
 * retrying as needed.
 *
 * @param key        key to remove.
 * @param idata      index entry for the object holding key (refreshed in
 *                   place when the index is re-read).
 * @param next_idata index entry for the following object (likewise).
 * @return 0 on success; -ENODATA if the key is not in the object;
 *         -ESUICIDE if the injected interrupt fired; otherwise a negative
 *         error from the write, read_index(), or rebalance handling.
 */
int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata,
    index_data &next_idata) {
  //write
  bufferlist inbl;
  omap_rm_args args;
  // Size bound passed to the class method; violating it yields
  // -EKEYREJECTED below (presumably the object would drop below k entries).
  args.bound = k;
  args.omap.insert(key);
  args.encode(inbl);

  librados::ObjectWriteOperation owo;
  owo.exec("kvs", "omap_remove", inbl);
  if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
    if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
    return -ESUICIDE;
  }
  if (verbose) cout << "\t" << client_name << ": removing " << key << " from "
      << idata.obj
      << std::endl;
  int err = io_ctx.operate(idata.obj, &owo);
  if (err < 0) {
    if (verbose) cout << "\t" << client_name << ": writing obj failed with "
        << err << std::endl;
    switch (err) {
    case -ENODATA: {
      //the key does not exist in the object
      return err;
    }
    case -EKEYREJECTED: {
      //the object needs to be split.
      // (In practice: rebalanced with next_idata.) Loop until rebalance()
      // succeeds or reports -ENOENT / -EBALANCE. handle_set_rm_errors()
      // may reset err by reference.
      do {
        if (verbose) cerr << "\t" << client_name << ": running rebalance on "
            << idata.obj << std::endl;
        err = read_index(key, &idata, &next_idata, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        err = rebalance(idata, next_idata);
        if (err < 0 && err != -ENOENT && err != -EBALANCE) {
          if (verbose) cerr << "\t" << client_name << ": rebalance returned "
              << err << std::endl;
          int ret = handle_set_rm_errors(err, idata.obj, key, &idata,
              &next_idata);
          switch (ret) {
          case -ESUICIDE:
            if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
            return err;
          case 1:
            // The key now lives in a different object: start over.
            return remove_op(key, idata, next_idata);
          case 2:
            // Unexpected error: give up.
            return err;
            break;
          case -EUCLEAN:
            //this is the only node, so it's ok to go below k.
            // Retry the removal with no lower bound.
            librados::ObjectWriteOperation owo;
            bufferlist inbl;
            omap_rm_args args;
            args.bound = 0;
            args.omap.insert(key);
            args.encode(inbl);
            owo.exec("kvs", "omap_remove", inbl);
            if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
                == 1 ) {
              if (verbose) cout << client_name << " IS SUICIDING!"
                  << std::endl;
              return -ESUICIDE;
            }
            if (verbose) cout << "\t" << client_name << ": removing " << key
                << " from "
                << idata.obj
                << std::endl;
            // NOTE(review): this declaration shadows the outer err. If the
            // removal fails, the outer err stays -EUCLEAN and the do/while
            // runs rebalance again.
            int err = io_ctx.operate(idata.obj, &owo);
            if (err == 0) {
              return 0;
            }
          }
        }
      } while (err < 0 && err != -EBALANCE && err != -ENOENT);
      err = read_index(key, &idata, &next_idata, true);
      if (err < 0) {
        if (verbose) cout << "\t" << client_name
            << ": getting oid failed with code "
            << err << std::endl;
        return err;
      }
      // NOTE(review): remove() re-reads the index (possibly from cache),
      // so the refreshed idata above is not used. set_op calls itself
      // here instead.
      return remove(key);
    }
    default:
      // -ENOENT / -EACCES suggest stale index data: refresh and retry.
      if (err == -ENOENT || err == -EACCES) {
        err = read_index(key, &idata, &next_idata, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        if (verbose) cout << "\t" << client_name << ": index data is "
            << idata.str()
            << ", object is " << idata.obj << std::endl;
        //idea: we read the time every time we read the index anyway - store it.
        return remove_op(key, idata, next_idata);
      } else {
        return err;
      }
    }
  }
  return 0;
}
1634
1635 int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj,
1636 string key,
1637 index_data * idata, index_data * next_idata) {
1638 if (err == -ESUICIDE) {
1639 return err;
1640 } else if (err == -ECANCELED //if an object was unwritable or index changed
1641 || err == -EPREFIX //if there is currently a prefix
1642 || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup
1643 || err == -EACCES) //possible if we were acting on old index data
1644 {
1645 err = read_index(key, idata, next_idata, true);
1646 if (err < 0) {
1647 return err;
1648 }
1649 if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str()
1650 << std::endl;
1651 if (idata->obj != obj) {
1652 //someone else has split or cleaned up or something. start over.
1653 return 1;//meaning repeat
1654 }
1655 } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES
1656 && err != -EUCLEAN){
1657 if (verbose) cout << "\t" << client_name
1658 << ": split encountered an unexpected error: " << err
1659 << std::endl;
1660 return 2;
1661 }
1662 return err;
1663 }
1664
1665 int KvFlatBtreeAsync::get(const string &key, bufferlist *val) {
1666 opmap['g']++;
1667 if (verbose) cout << client_name << ": getting " << key << std::endl;
1668 int err = 0;
1669 index_data idata;
1670 utime_t mytime;
1671
1672 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1673 return -ESUICIDE;
1674 }
1675 err = read_index(key, &idata, NULL, false);
1676 mytime = ceph_clock_now();
1677 if (err < 0) {
1678 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1679 return err;
1680 }
1681
1682 err = get_op(key, val, idata);
1683
1684 if (verbose) cout << client_name << ": got " << key << " with " << err
1685 << std::endl;
1686
1687 return err;
1688 }
1689
1690 int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val,
1691 index_data &idata) {
1692 int err = 0;
1693 std::set<std::string> key_set;
1694 key_set.insert(key);
1695 std::map<std::string,bufferlist> omap;
1696 librados::ObjectReadOperation read;
1697 read.omap_get_vals_by_keys(key_set, &omap, &err);
1698 err = io_ctx.operate(idata.obj, &read, NULL);
1699 if (err < 0) {
1700 if (err == -ENOENT) {
1701 err = read_index(key, &idata, NULL, true);
1702 if (err < 0) {
1703 if (verbose) cout << "\t" << client_name
1704 << ": getting oid failed with code "
1705 << err << std::endl;
1706 return err;
1707 }
1708 if (verbose) cout << "\t" << client_name << ": index data is "
1709 << idata.str()
1710 << ", object is " << idata.obj << std::endl;
1711 return get_op(key, val, idata);
1712 } else {
1713 if (verbose) cout << client_name
1714 << ": get encountered an unexpected error: " << err
1715 << std::endl;
1716 return err;
1717 }
1718 }
1719
1720 *val = omap[key];
1721 return err;
1722 }
1723
1724 void *KvFlatBtreeAsync::pset(void *ptr) {
1725 struct aio_set_args *args = (struct aio_set_args *)ptr;
1726 *args->err =
1727 args->kvba->KvFlatBtreeAsync::set((string)args->key,
1728 (bufferlist)args->val, (bool)args->exc);
1729 args->cb(args->err, args->cb_args);
1730 delete args;
1731 return NULL;
1732 }
1733
1734 void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val,
1735 bool exclusive, callback cb, void * cb_args, int * err) {
1736 aio_set_args *args = new aio_set_args();
1737 args->kvba = this;
1738 args->key = key;
1739 args->val = val;
1740 args->exc = exclusive;
1741 args->cb = cb;
1742 args->cb_args = cb_args;
1743 args->err = err;
1744 pthread_t t;
1745 int r = pthread_create(&t, NULL, pset, (void*)args);
1746 if (r < 0) {
1747 *args->err = r;
1748 return;
1749 }
1750 pthread_detach(t);
1751 }
1752
1753 void *KvFlatBtreeAsync::prm(void *ptr) {
1754 struct aio_rm_args *args = (struct aio_rm_args *)ptr;
1755 *args->err =
1756 args->kvba->KvFlatBtreeAsync::remove((string)args->key);
1757 args->cb(args->err, args->cb_args);
1758 delete args;
1759 return NULL;
1760 }
1761
1762 void KvFlatBtreeAsync::aio_remove(const string &key,
1763 callback cb, void * cb_args, int * err) {
1764 aio_rm_args * args = new aio_rm_args();
1765 args->kvba = this;
1766 args->key = key;
1767 args->cb = cb;
1768 args->cb_args = cb_args;
1769 args->err = err;
1770 pthread_t t;
1771 int r = pthread_create(&t, NULL, prm, (void*)args);
1772 if (r < 0) {
1773 *args->err = r;
1774 return;
1775 }
1776 pthread_detach(t);
1777 }
1778
1779 void *KvFlatBtreeAsync::pget(void *ptr) {
1780 struct aio_get_args *args = (struct aio_get_args *)ptr;
1781 *args->err =
1782 args->kvba->KvFlatBtreeAsync::get((string)args->key,
1783 (bufferlist *)args->val);
1784 args->cb(args->err, args->cb_args);
1785 delete args;
1786 return NULL;
1787 }
1788
1789 void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val,
1790 callback cb, void * cb_args, int * err) {
1791 aio_get_args * args = new aio_get_args();
1792 args->kvba = this;
1793 args->key = key;
1794 args->val = val;
1795 args->cb = cb;
1796 args->cb_args = cb_args;
1797 args->err = err;
1798 pthread_t t;
1799 int r = pthread_create(&t, NULL, pget, (void*)args);
1800 if (r < 0) {
1801 *args->err = r;
1802 return;
1803 }
1804 pthread_detach(t);
1805 }
1806
1807 int KvFlatBtreeAsync::set_many(const std::map<string, bufferlist> &in_map) {
1808 int err = 0;
1809 bufferlist inbl;
1810 bufferlist outbl;
1811 std::set<string> keys;
1812
1813 std::map<string, bufferlist> big_map;
1814 for (map<string, bufferlist>::const_iterator it = in_map.begin();
1815 it != in_map.end(); ++it) {
1816 keys.insert(it->first);
1817 big_map.insert(*it);
1818 }
1819
1820 if (verbose) cout << "created key set and big_map" << std::endl;
1821
1822 encode(keys, inbl);
1823 librados::AioCompletion * aioc = rados.aio_create_completion();
1824 io_ctx.aio_exec(index_name, aioc, "kvs", "read_many", inbl, &outbl);
1825 aioc->wait_for_complete();
1826 err = aioc->get_return_value();
1827 aioc->release();
1828 if (err < 0) {
1829 cerr << "getting index failed with " << err << std::endl;
1830 return err;
1831 }
1832
1833 std::map<string, bufferlist> imap;//read from the index
1834 auto blit = outbl.cbegin();
1835 decode(imap, blit);
1836
1837 if (verbose) cout << "finished reading index for objects. there are "
1838 << imap.size() << " entries that need to be changed. " << std::endl;
1839
1840
1841 vector<object_data> to_delete;
1842
1843 vector<object_data> to_create;
1844
1845 if (verbose) cout << "setting up to_delete and to_create vectors from index "
1846 << "map" << std::endl;
1847 //set up to_delete from index map
1848 for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end();
1849 ++it){
1850 index_data idata;
1851 blit = it->second.begin();
1852 idata.decode(blit);
1853 to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj));
1854 err = read_object(idata.obj, &to_delete[to_delete.size() - 1]);
1855 if (err < 0) {
1856 if (verbose) cout << "reading " << idata.obj << " failed with " << err
1857 << std::endl;
1858 return set_many(in_map);
1859 }
1860
1861 big_map.insert(to_delete[to_delete.size() - 1].omap.begin(),
1862 to_delete[to_delete.size() - 1].omap.end());
1863 }
1864
1865 to_create.push_back(object_data(
1866 to_string(client_name, client_index++)));
1867 to_create[0].min_kdata = to_delete[0].min_kdata;
1868
1869 for(map<string, bufferlist>::iterator it = big_map.begin();
1870 it != big_map.end(); ++it) {
1871 if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) {
1872 to_create[to_create.size() - 1].max_kdata =
1873 key_data(to_create[to_create.size() - 1]
1874 .omap.rbegin()->first);
1875
1876 to_create.push_back(object_data(
1877 to_string(client_name, client_index++)));
1878 to_create[to_create.size() - 1].min_kdata =
1879 to_create[to_create.size() - 2].max_kdata;
1880 }
1881
1882 to_create[to_create.size() - 1].omap.insert(*it);
1883 }
1884 to_create[to_create.size() - 1].max_kdata =
1885 to_delete[to_delete.size() - 1].max_kdata;
1886
1887 vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size()
1888 + to_create.size());
1889 vector<std::pair<std::pair<int, string>, librados::ObjectWriteOperation*> > ops;
1890
1891
1892 index_data idata;
1893 set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err);
1894
1895 if (verbose) cout << "finished making to_create and to_delete. "
1896 << std::endl;
1897
1898 ops.push_back(std::make_pair(
1899 std::pair<int, string>(ADD_PREFIX, index_name),
1900 &owos[0]));
1901 for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size();
1902 i++) {
1903 ops.push_back(std::make_pair(std::make_pair(0,""), &owos[i]));
1904 }
1905
1906 set_up_ops(to_create, to_delete, &ops, idata, &err);
1907
1908 cout << "finished setting up ops. Starting critical section..." << std::endl;
1909
1910 /////BEGIN CRITICAL SECTION/////
1911 //put prefix on index entry for idata.val
1912 err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops);
1913 if (err < 0) {
1914 return set_many(in_map);
1915 }
1916 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
1917 << std::endl;
1918 /////END CRITICAL SECTION/////
1919 std::scoped_lock l{icache_lock};
1920 for (vector<delete_data>::iterator it = idata.to_delete.begin();
1921 it != idata.to_delete.end(); ++it) {
1922 icache.erase(it->max);
1923 }
1924 for (vector<create_data>::iterator it = idata.to_create.begin();
1925 it != idata.to_create.end(); ++it) {
1926 icache.push(index_data(*it));
1927 }
1928 return err;
1929 }
1930
1931 int KvFlatBtreeAsync::remove_all() {
1932 if (verbose) cout << client_name << ": removing all" << std::endl;
1933 int err = 0;
1934 librados::ObjectReadOperation oro;
1935 librados::AioCompletion * oro_aioc = rados.aio_create_completion();
1936 std::map<std::string, bufferlist> index_set;
1937 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1938 err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL);
1939 if (err < 0){
1940 if (err == -ENOENT) {
1941 return 0;
1942 }
1943 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1944 return err;
1945 }
1946 oro_aioc->wait_for_complete();
1947 oro_aioc->release();
1948
1949 librados::ObjectWriteOperation rm_index;
1950 librados::AioCompletion * rm_index_aioc = rados.aio_create_completion();
1951 std::map<std::string,bufferlist> new_index;
1952 new_index["1"] = index_set["1"];
1953 rm_index.omap_clear();
1954 rm_index.omap_set(new_index);
1955 io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index);
1956 err = rm_index_aioc->get_return_value();
1957 rm_index_aioc->release();
1958 if (err < 0) {
1959 if (verbose) cout << "rm index aioc failed with " << err
1960 << std::endl;
1961 return err;
1962 }
1963
1964 if (!index_set.empty()) {
1965 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1966 it != index_set.end(); ++it){
1967 librados::ObjectWriteOperation sub;
1968 if (it->first == "1") {
1969 sub.omap_clear();
1970 } else {
1971 sub.remove();
1972 }
1973 index_data idata;
1974 auto b = it->second.cbegin();
1975 idata.decode(b);
1976 io_ctx.operate(idata.obj, &sub);
1977 }
1978 }
1979
1980 icache.clear();
1981
1982 return 0;
1983 }
1984
1985 int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) {
1986 if (verbose) cout << client_name << ": getting all keys" << std::endl;
1987 int err = 0;
1988 librados::ObjectReadOperation oro;
1989 std::map<std::string,bufferlist> index_set;
1990 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1991 io_ctx.operate(index_name, &oro, NULL);
1992 if (err < 0){
1993 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1994 return err;
1995 }
1996 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1997 it != index_set.end(); ++it){
1998 librados::ObjectReadOperation sub;
1999 std::set<std::string> ret;
2000 sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err);
2001 index_data idata;
2002 auto b = it->second.cbegin();
2003 idata.decode(b);
2004 io_ctx.operate(idata.obj, &sub, NULL);
2005 keys->insert(ret.begin(), ret.end());
2006 }
2007 return err;
2008 }
2009
2010 int KvFlatBtreeAsync::get_all_keys_and_values(
2011 std::map<std::string,bufferlist> *kv_map) {
2012 if (verbose) cout << client_name << ": getting all keys and values"
2013 << std::endl;
2014 int err = 0;
2015 librados::ObjectReadOperation first_read;
2016 std::set<std::string> index_set;
2017 first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err);
2018 io_ctx.operate(index_name, &first_read, NULL);
2019 if (err < 0){
2020 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2021 return err;
2022 }
2023 for (std::set<std::string>::iterator it = index_set.begin();
2024 it != index_set.end(); ++it){
2025 librados::ObjectReadOperation sub;
2026 std::map<std::string, bufferlist> ret;
2027 sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err);
2028 io_ctx.operate(*it, &sub, NULL);
2029 kv_map->insert(ret.begin(), ret.end());
2030 }
2031 return err;
2032 }
2033
2034 bool KvFlatBtreeAsync::is_consistent() {
2035 int err;
2036 bool ret = true;
2037 if (verbose) cout << client_name << ": checking consistency" << std::endl;
2038 std::map<std::string,bufferlist> index;
2039 std::map<std::string, std::set<std::string> > sub_objs;
2040 librados::ObjectReadOperation oro;
2041 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2042 io_ctx.operate(index_name, &oro, NULL);
2043 if (err < 0){
2044 //probably because the index doesn't exist - this might be ok.
2045 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2046 oit != io_ctx.nobjects_end(); ++oit) {
2047 //if this executes, there are floating objects.
2048 cerr << "Not consistent! found floating object " << oit->get_oid()
2049 << std::endl;
2050 ret = false;
2051 }
2052 return ret;
2053 }
2054
2055 std::map<std::string, string> parsed_index;
2056 std::set<std::string> onames;
2057 std::set<std::string> special_names;
2058 for (map<std::string,bufferlist>::iterator it = index.begin();
2059 it != index.end(); ++it) {
2060 if (it->first != "") {
2061 index_data idata;
2062 auto b = it->second.cbegin();
2063 idata.decode(b);
2064 if (idata.prefix != "") {
2065 for(vector<delete_data>::iterator dit = idata.to_delete.begin();
2066 dit != idata.to_delete.end(); ++dit) {
2067 librados::ObjectReadOperation oro;
2068 librados::AioCompletion * aioc = rados.aio_create_completion();
2069 bufferlist un;
2070 oro.getxattr("unwritable", &un, &err);
2071 io_ctx.aio_operate(dit->obj, aioc, &oro, NULL);
2072 aioc->wait_for_complete();
2073 err = aioc->get_return_value();
2074 if (ceph_clock_now() - idata.ts > timeout) {
2075 if (err < 0) {
2076 aioc->release();
2077 if (err == -ENOENT) {
2078 continue;
2079 } else {
2080 cerr << "Not consistent! reading object " << dit->obj
2081 << "returned " << err << std::endl;
2082 ret = false;
2083 break;
2084 }
2085 }
2086 if (atoi(string(un.c_str(), un.length()).c_str()) != 1 &&
2087 aioc->get_version64() != dit->version) {
2088 cerr << "Not consistent! object " << dit->obj << " has been "
2089 << " modified since the client died was not cleaned up."
2090 << std::endl;
2091 ret = false;
2092 }
2093 }
2094 special_names.insert(dit->obj);
2095 aioc->release();
2096 }
2097 for(vector<create_data >::iterator cit = idata.to_create.begin();
2098 cit != idata.to_create.end(); ++cit) {
2099 special_names.insert(cit->obj);
2100 }
2101 }
2102 parsed_index.insert(std::make_pair(it->first, idata.obj));
2103 onames.insert(idata.obj);
2104 }
2105 }
2106
2107 //make sure that an object exists iff it either is the index
2108 //or is listed in the index
2109 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2110 oit != io_ctx.nobjects_end(); ++oit) {
2111 string name = oit->get_oid();
2112 if (name != index_name && onames.count(name) == 0
2113 && special_names.count(name) == 0) {
2114 cerr << "Not consistent! found floating object " << name << std::endl;
2115 ret = false;
2116 }
2117 }
2118
2119 //check objects
2120 string prev = "";
2121 for (std::map<std::string, string>::iterator it = parsed_index.begin();
2122 it != parsed_index.end();
2123 ++it) {
2124 librados::ObjectReadOperation read;
2125 read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err);
2126 err = io_ctx.operate(it->second, &read, NULL);
2127 int size_int = (int)sub_objs[it->second].size();
2128
2129 //check that size is in the right range
2130 if (it->first != "1" && special_names.count(it->second) == 0 &&
2131 err != -ENOENT && (size_int > 2*k|| size_int < k)
2132 && parsed_index.size() > 1) {
2133 cerr << "Not consistent! Object " << *it << " has size " << size_int
2134 << ", which is outside the acceptable range." << std::endl;
2135 ret = false;
2136 }
2137
2138 //check that all keys belong in that object
2139 for(std::set<std::string>::iterator subit = sub_objs[it->second].begin();
2140 subit != sub_objs[it->second].end(); ++subit) {
2141 if ((it->first != "1"
2142 && *subit > it->first.substr(1,it->first.length()))
2143 || *subit <= prev) {
2144 cerr << "Not consistent! key " << *subit << " does not belong in "
2145 << *it << std::endl;
2146 cerr << "not last element, i.e. " << it->first << " not equal to 1? "
2147 << (it->first != "1") << std::endl
2148 << "greater than " << it->first.substr(1,it->first.length())
2149 <<"? " << (*subit > it->first.substr(1,it->first.length()))
2150 << std::endl
2151 << "less than or equal to " << prev << "? "
2152 << (*subit <= prev) << std::endl;
2153 ret = false;
2154 }
2155 }
2156
2157 prev = it->first.substr(1,it->first.length());
2158 }
2159
2160 if (!ret) {
2161 if (verbose) cout << "failed consistency test - see error log"
2162 << std::endl;
2163 cerr << str();
2164 } else {
2165 if (verbose) cout << "passed consistency test" << std::endl;
2166 }
2167 return ret;
2168 }
2169
2170 string KvFlatBtreeAsync::str() {
2171 stringstream ret;
2172 ret << "Top-level map:" << std::endl;
2173 int err = 0;
2174 std::set<std::string> keys;
2175 std::map<std::string,bufferlist> index;
2176 librados::ObjectReadOperation oro;
2177 librados::AioCompletion * top_aioc = rados.aio_create_completion();
2178 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2179 io_ctx.aio_operate(index_name, top_aioc, &oro, NULL);
2180 top_aioc->wait_for_complete();
2181 err = top_aioc->get_return_value();
2182 top_aioc->release();
2183 if (err < 0 && err != -5){
2184 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2185 return ret.str();
2186 }
2187 if(index.empty()) {
2188 ret << "There are no objects!" << std::endl;
2189 return ret.str();
2190 }
2191
2192 for (map<std::string,bufferlist>::iterator it = index.begin();
2193 it != index.end(); ++it) {
2194 keys.insert(string(it->second.c_str(), it->second.length())
2195 .substr(1,it->second.length()));
2196 }
2197
2198 vector<std::string> all_names;
2199 vector<int> all_sizes(index.size());
2200 vector<int> all_versions(index.size());
2201 vector<bufferlist> all_unwrit(index.size());
2202 vector<map<std::string,bufferlist> > all_maps(keys.size());
2203 vector<map<std::string,bufferlist>::iterator> its(keys.size());
2204 unsigned done = 0;
2205 vector<bool> dones(keys.size());
2206 ret << std::endl << string(150,'-') << std::endl;
2207
2208 for (map<std::string,bufferlist>::iterator it = index.begin();
2209 it != index.end(); ++it){
2210 index_data idata;
2211 auto b = it->second.cbegin();
2212 idata.decode(b);
2213 string s = idata.str();
2214 ret << "|" << string((148 -
2215 ((*it).first.length()+s.length()+3))/2,' ');
2216 ret << (*it).first;
2217 ret << " | ";
2218 ret << string(idata.str());
2219 ret << string((148 -
2220 ((*it).first.length()+s.length()+3))/2,' ');
2221 ret << "|\t";
2222 all_names.push_back(idata.obj);
2223 ret << std::endl << string(150,'-') << std::endl;
2224 }
2225
2226 int indexer = 0;
2227
2228 //get the object names and sizes
2229 for(vector<std::string>::iterator it = all_names.begin(); it
2230 != all_names.end();
2231 ++it) {
2232 librados::ObjectReadOperation oro;
2233 librados::AioCompletion *aioc = rados.aio_create_completion();
2234 oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err);
2235 oro.getxattr("unwritable", &all_unwrit[indexer], &err);
2236 io_ctx.aio_operate(*it, aioc, &oro, NULL);
2237 aioc->wait_for_complete();
2238 if (aioc->get_return_value() < 0) {
2239 ret << "reading" << *it << "failed: " << err << std::endl;
2240 //return ret.str();
2241 }
2242 all_sizes[indexer] = all_maps[indexer].size();
2243 all_versions[indexer] = aioc->get_version64();
2244 indexer++;
2245 aioc->release();
2246 }
2247
2248 ret << "///////////////////OBJECT NAMES////////////////" << std::endl;
2249 //HEADERS
2250 ret << std::endl;
2251 for (int i = 0; i < indexer; i++) {
2252 ret << "---------------------------\t";
2253 }
2254 ret << std::endl;
2255 for (int i = 0; i < indexer; i++) {
2256 ret << "|" << string((25 -
2257 (string("Bucket: ").length() + all_names[i].length()))/2, ' ');
2258 ret << "Bucket: " << all_names[i];
2259 ret << string((25 -
2260 (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t";
2261 }
2262 ret << std::endl;
2263 for (int i = 0; i < indexer; i++) {
2264 its[i] = all_maps[i].begin();
2265 ret << "|" << string((25 - (string("size: ").length()
2266 + to_string("",all_sizes[i]).length()))/2, ' ');
2267 ret << "size: " << all_sizes[i];
2268 ret << string((25 - (string("size: ").length()
2269 + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t";
2270 }
2271 ret << std::endl;
2272 for (int i = 0; i < indexer; i++) {
2273 its[i] = all_maps[i].begin();
2274 ret << "|" << string((25 - (string("version: ").length()
2275 + to_string("",all_versions[i]).length()))/2, ' ');
2276 ret << "version: " << all_versions[i];
2277 ret << string((25 - (string("version: ").length()
2278 + to_string("",all_versions[i]).length()))/2, ' ') << "|\t";
2279 }
2280 ret << std::endl;
2281 for (int i = 0; i < indexer; i++) {
2282 its[i] = all_maps[i].begin();
2283 ret << "|" << string((25 - (string("unwritable? ").length()
2284 + 1))/2, ' ');
2285 ret << "unwritable? " << string(all_unwrit[i].c_str(),
2286 all_unwrit[i].length());
2287 ret << string((25 - (string("unwritable? ").length()
2288 + 1))/2, ' ') << "|\t";
2289 }
2290 ret << std::endl;
2291 for (int i = 0; i < indexer; i++) {
2292 ret << "---------------------------\t";
2293 }
2294 ret << std::endl;
2295 ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl;
2296
2297
2298 ret << std::endl;
2299 for (int i = 0; i < indexer; i++) {
2300 ret << "---------------------------\t";
2301 }
2302 ret << std::endl;
2303 //each time through this part is two lines
2304 while(done < keys.size()) {
2305 for(int i = 0; i < indexer; i++) {
2306 if(dones[i]){
2307 ret << " \t";
2308 } else {
2309 if (its[i] == all_maps[i].end()){
2310 done++;
2311 dones[i] = true;
2312 ret << " \t";
2313 } else {
2314 ret << "|" << string((25 -
2315 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2316 ret << (*its[i]).first;
2317 ret << " | ";
2318 ret << string(its[i]->second.c_str(), its[i]->second.length());
2319 ret << string((25 -
2320 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2321 ret << "|\t";
2322 ++(its[i]);
2323 }
2324
2325 }
2326 }
2327 ret << std::endl;
2328 for (int i = 0; i < indexer; i++) {
2329 if(dones[i]){
2330 ret << " \t";
2331 } else {
2332 ret << "---------------------------\t";
2333 }
2334 }
2335 ret << std::endl;
2336
2337 }
2338 return ret.str();
2339 }