2 * Key-value store using librados
6 * eleanor.cawthon@inktank.com
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
14 #include "include/compat.h"
15 #include "key_value_store/key_value_structure.h"
16 #include "key_value_store/kv_flat_btree_async.h"
17 #include "key_value_store/kvs_arg_types.h"
18 #include "include/rados/librados.hpp"
19 #include "common/ceph_context.h"
20 #include "common/Clock.h"
21 #include "include/types.h"
33 using ceph::bufferlist
;
// Reports whether this index entry should be treated as abandoned: true only
// when the entry carries a non-empty prefix AND more than `timeout` has
// elapsed between the entry's timestamp `ts` and the supplied `now`.
// An entry with an empty prefix is never considered timed out.
35 bool index_data::is_timed_out(utime_t now
, utime_t timeout
) const {
36 return prefix
!= "" && now
- ts
> timeout
;
39 void IndexCache::clear() {
44 void IndexCache::push(const string
&key
, const index_data
&idata
) {
45 if (cache_size
== 0) {
49 map
<key_data
, pair
<index_data
, utime_t
> >::iterator old_it
=
50 k2itmap
.lower_bound(key_data(key
));
51 if (old_it
!= k2itmap
.end()) {
52 t2kmap
.erase(old_it
->second
.second
);
53 k2itmap
.erase(old_it
);
55 map
<key_data
, pair
<index_data
, utime_t
> >::iterator new_it
=
56 k2itmap
.find(idata
.kdata
);
57 if (new_it
!= k2itmap
.end()) {
58 utime_t old_time
= new_it
->second
.second
;
59 t2kmap
.erase(old_time
);
61 utime_t time
= ceph_clock_now();
62 k2itmap
[idata
.kdata
] = make_pair(idata
, time
);
63 t2kmap
[time
] = idata
.kdata
;
64 if ((int)k2itmap
.size() > cache_size
) {
70 void IndexCache::push(const index_data
&idata
) {
71 if (cache_size
== 0) {
74 if (k2itmap
.count(idata
.kdata
) > 0) {
75 utime_t old_time
= k2itmap
[idata
.kdata
].second
;
76 t2kmap
.erase(old_time
);
77 k2itmap
.erase(idata
.kdata
);
79 utime_t time
= ceph_clock_now();
80 k2itmap
[idata
.kdata
] = make_pair(idata
, time
);
81 t2kmap
[time
] = idata
.kdata
;
82 if ((int)k2itmap
.size() > cache_size
) {
87 void IndexCache::pop() {
88 if (cache_size
== 0) {
91 map
<utime_t
, key_data
>::iterator it
= t2kmap
.begin();
92 utime_t time
= it
->first
;
93 key_data kdata
= it
->second
;
98 void IndexCache::erase(key_data kdata
) {
99 if (cache_size
== 0) {
102 if (k2itmap
.count(kdata
) > 0) {
103 utime_t c
= k2itmap
[kdata
].second
;
104 k2itmap
.erase(kdata
);
109 int IndexCache::get(const string
&key
, index_data
*idata
) const {
110 if (cache_size
== 0) {
113 if ((int)k2itmap
.size() == 0) {
116 map
<key_data
, pair
<index_data
, utime_t
> >::const_iterator it
=
117 k2itmap
.lower_bound(key_data(key
));
118 if (it
== k2itmap
.end() || !(it
->second
.first
.min_kdata
< key_data(key
))) {
121 *idata
= it
->second
.first
;
126 int IndexCache::get(const string
&key
, index_data
*idata
,
127 index_data
*next_idata
) const {
128 if (cache_size
== 0) {
131 map
<key_data
, pair
<index_data
, utime_t
> >::const_iterator it
=
132 k2itmap
.lower_bound(key_data(key
));
133 if (it
== k2itmap
.end() || ++it
== k2itmap
.end()) {
137 if (!(it
->second
.first
.min_kdata
< key_data(key
))){
138 //stale, should be reread.
141 *idata
= it
->second
.first
;
143 if (it
!= k2itmap
.end()) {
144 *next_idata
= it
->second
.first
;
151 int KvFlatBtreeAsync::nothing() {
155 int KvFlatBtreeAsync::wait() {
156 if (rand() % 10 == 0) {
162 int KvFlatBtreeAsync::suicide() {
163 if (rand() % 10 == 0) {
164 if (verbose
) cout
<< client_name
<< " is suiciding" << std::endl
;
170 int KvFlatBtreeAsync::next(const index_data
&idata
, index_data
* out_data
)
172 if (verbose
) cout
<< "\t\t" << client_name
<< "-next: finding next of "
176 librados::ObjectReadOperation oro
;
177 std::map
<std::string
, bufferlist
> kvs
;
178 oro
.omap_get_vals2(idata
.kdata
.encoded(),1,&kvs
, nullptr, &err
);
179 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
181 if (verbose
) cout
<< "\t\t\t" << client_name
182 << "-next: getting index failed with error "
187 out_data
->kdata
.parse(kvs
.begin()->first
);
188 auto b
= kvs
.begin()->second
.cbegin();
190 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
191 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
193 //the client died after deleting the object. clean up.
202 int KvFlatBtreeAsync::prev(const index_data
&idata
, index_data
* out_data
)
204 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: finding prev of "
205 << idata
.str() << std::endl
;
208 idata_from_idata_args in_args
;
209 in_args
.idata
= idata
;
210 in_args
.encode(inbl
);
212 err
= io_ctx
.exec(index_name
,"kvs", "get_prev_idata", inbl
, outbl
);
214 if (verbose
) cout
<< "\t\t\t" << client_name
215 << "-prev: getting index failed with error "
217 if (idata
.is_timed_out(ceph_clock_now(), timeout
)) {
218 if (verbose
) cout
<< client_name
<< " THINKS THE OTHER CLIENT DIED."
220 //the client died after deleting the object. clean up.
221 err
= cleanup(idata
, err
);
222 if (err
== -ESUICIDE
) {
230 auto it
= outbl
.cbegin();
232 *out_data
= in_args
.next_idata
;
233 if (verbose
) cout
<< "\t\t" << client_name
<< "-prev: prev is "
239 int KvFlatBtreeAsync::read_index(const string
&key
, index_data
* idata
,
240 index_data
* next_idata
, bool force_update
) {
243 if (verbose
) cout
<< "\t" << client_name
244 << "-read_index: getting index_data for " << key
245 << " from cache" << std::endl
;
247 if (next_idata
!= NULL
) {
248 err
= icache
.get(key
, idata
, next_idata
);
250 err
= icache
.get(key
, idata
);
252 icache_lock
.unlock();
255 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
258 if (verbose
) cout
<< "NOT IN CACHE" << std::endl
;
262 if (verbose
) cout
<< "\t" << client_name
263 << "-read_index: getting index_data for " << key
264 << " from object" << std::endl
;
265 librados::ObjectReadOperation oro
;
267 std::set
<std::string
> key_set
;
268 key_set
.insert(key_data(key
).encoded());
269 std::map
<std::string
, bufferlist
> kvmap
;
270 std::map
<std::string
, bufferlist
> dupmap
;
271 oro
.omap_get_vals_by_keys(key_set
, &dupmap
, &err
);
272 oro
.omap_get_vals2(key_data(key
).encoded(),
273 (cache_size
/ cache_refresh
>= 2? cache_size
/ cache_refresh
: 2),
274 &kvmap
, nullptr, &err
);
275 err
= io_ctx
.operate(index_name
, &oro
, NULL
);
276 utime_t mytime
= ceph_clock_now();
278 cerr
<< "\t" << client_name
279 << "-read_index: getting keys failed with "
281 ceph_abort_msg(client_name
+ "-read_index: reading index failed");
284 kvmap
.insert(dupmap
.begin(), dupmap
.end());
285 for (map
<string
, bufferlist
>::iterator it
= ++kvmap
.begin();
288 bufferlist bl
= it
->second
;
289 auto blit
= bl
.cbegin();
290 index_data this_idata
;
291 this_idata
.decode(blit
);
292 if (this_idata
.is_timed_out(mytime
, timeout
)) {
293 if (verbose
) cout
<< client_name
294 << " THINKS THE OTHER CLIENT DIED. (mytime is "
295 << mytime
.sec() << "." << mytime
.usec() << ", idata.ts is "
296 << this_idata
.ts
.sec() << "." << this_idata
.ts
.usec()
297 << ", it has been " << (mytime
- this_idata
.ts
).sec()
298 << '.' << (mytime
- this_idata
.ts
).usec()
299 << ", timeout is " << timeout
<< ")" << std::endl
;
300 //the client died after deleting the object. clean up.
301 if (cleanup(this_idata
, -EPREFIX
) == -ESUICIDE
) {
304 return read_index(key
, idata
, next_idata
, force_update
);
306 std::scoped_lock l
{icache_lock
};
307 icache
.push(this_idata
);
309 auto b
= kvmap
.begin()->second
.cbegin();
311 idata
->kdata
.parse(kvmap
.begin()->first
);
312 if (verbose
) cout
<< "\t" << client_name
<< "-read_index: kvmap_size is "
314 << ", idata is " << idata
->str() << std::endl
;
316 ceph_assert(idata
->obj
!= "");
318 icache
.push(key
, *idata
);
319 icache_lock
.unlock();
321 if (next_idata
!= NULL
&& idata
->kdata
.prefix
!= "1") {
322 next_idata
->kdata
.parse((++kvmap
.begin())->first
);
323 auto nb
= (++kvmap
.begin())->second
.cbegin();
324 next_idata
->decode(nb
);
325 std::scoped_lock l
{icache_lock
};
326 icache
.push(*next_idata
);
331 int KvFlatBtreeAsync::split(const index_data
&idata
) {
335 if (idata
.prefix
!= "") {
340 args
.bound
= 2 * k
- 1;
341 args
.comparator
= CEPH_OSD_CMPXATTR_OP_GT
;
342 err
= read_object(idata
.obj
, &args
);
343 args
.odata
.max_kdata
= idata
.kdata
;
345 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: read object "
347 << " got " << err
<< std::endl
;
351 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: splitting "
353 << ", which has size " << args
.odata
.size
354 << " and actual size " << args
.odata
.omap
.size() << std::endl
;
356 ///////preparations that happen outside the critical section
358 vector
<object_data
> to_create
;
359 vector
<object_data
> to_delete
;
360 to_delete
.push_back(object_data(idata
.min_kdata
,
361 args
.odata
.max_kdata
, args
.odata
.name
, args
.odata
.version
));
363 //for lower half object
364 map
<std::string
, bufferlist
>::const_iterator it
= args
.odata
.omap
.begin();
365 client_index_lock
.lock();
366 to_create
.push_back(object_data(to_string(client_name
, client_index
++)));
367 client_index_lock
.unlock();
368 for (int i
= 0; i
< k
; i
++) {
369 to_create
[0].omap
.insert(*it
);
372 to_create
[0].min_kdata
= idata
.min_kdata
;
373 to_create
[0].max_kdata
= key_data(to_create
[0].omap
.rbegin()->first
);
375 //for upper half object
376 client_index_lock
.lock();
377 to_create
.push_back(object_data(to_create
[0].max_kdata
,
378 args
.odata
.max_kdata
,
379 to_string(client_name
, client_index
++)));
380 client_index_lock
.unlock();
381 to_create
[1].omap
.insert(
382 ++args
.odata
.omap
.find(to_create
[0].omap
.rbegin()->first
),
383 args
.odata
.omap
.end());
385 //setting up operations
386 librados::ObjectWriteOperation owos
[6];
387 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
389 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &out_data
, &err
);
390 ops
.push_back(make_pair(
391 pair
<int, string
>(ADD_PREFIX
, index_name
),
393 for (int i
= 1; i
< 6; i
++) {
394 ops
.push_back(make_pair(make_pair(0,""), &owos
[i
]));
396 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
398 /////BEGIN CRITICAL SECTION/////
399 //put prefix on index entry for idata.val
400 err
= perform_ops("\t\t" + client_name
+ "-split:", out_data
, &ops
);
404 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
406 /////END CRITICAL SECTION/////
408 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
409 it
!= out_data
.to_delete
.end(); ++it
) {
410 icache
.erase(it
->max
);
412 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
413 it
!= out_data
.to_create
.end(); ++it
) {
414 icache
.push(index_data(*it
));
416 icache_lock
.unlock();
420 int KvFlatBtreeAsync::rebalance(const index_data
&idata1
,
421 const index_data
&next_idata
){
425 if (idata1
.prefix
!= "") {
429 rebalance_args args1
;
431 args1
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
432 index_data idata2
= next_idata
;
434 rebalance_args args2
;
436 args2
.comparator
= CEPH_OSD_CMPXATTR_OP_LT
;
438 if (idata1
.kdata
.prefix
== "1") {
439 //this is the highest key in the index, so it doesn't have a next.
441 //read the index for the previous entry
442 err
= prev(idata1
, &idata2
);
443 if (err
== -ERANGE
) {
444 if (verbose
) cout
<< "\t\t" << client_name
445 << "-rebalance: this is the only node, "
446 << "so aborting" << std::endl
;
448 } else if (err
< 0) {
452 //read the first object
453 err
= read_object(idata1
.obj
, &args2
);
455 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
457 if (err
== -ENOENT
) {
462 args2
.odata
.min_kdata
= idata1
.min_kdata
;
463 args2
.odata
.max_kdata
= idata1
.kdata
;
465 //read the second object
466 args1
.bound
= 2 * k
+ 1;
467 err
= read_object(idata2
.obj
, &args1
);
469 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
473 args1
.odata
.min_kdata
= idata2
.min_kdata
;
474 args1
.odata
.max_kdata
= idata2
.kdata
;
476 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
478 << ". size: " << args1
.odata
.size
<< " version: "
479 << args1
.odata
.version
482 assert (next_idata
.obj
!= "");
483 //there is a next key, so get it.
484 err
= read_object(idata1
.obj
, &args1
);
486 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
490 args1
.odata
.min_kdata
= idata1
.min_kdata
;
491 args1
.odata
.max_kdata
= idata1
.kdata
;
493 args2
.bound
= 2 * k
+ 1;
494 err
= read_object(idata2
.obj
, &args2
);
496 if (verbose
) cout
<< "reading " << idata1
.obj
<< " failed with " << err
498 if (err
== -ENOENT
) {
503 args2
.odata
.min_kdata
= idata2
.min_kdata
;
504 args2
.odata
.max_kdata
= idata2
.kdata
;
506 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: read "
508 << ". size: " << args2
.odata
.size
<< " version: "
509 << args2
.odata
.version
513 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: o1 is "
514 << args1
.odata
.max_kdata
.encoded() << ","
515 << args1
.odata
.name
<< " with size " << args1
.odata
.size
516 << " , o2 is " << args2
.odata
.max_kdata
.encoded()
517 << "," << args2
.odata
.name
<< " with size " << args2
.odata
.size
521 if ((int)args1
.odata
.size
> k
&& (int)args1
.odata
.size
<= 2*k
522 && (int)args2
.odata
.size
> k
523 && (int)args2
.odata
.size
<= 2*k
) {
525 if (verbose
) cout
<< "\t\t" << client_name
526 << "-rebalance: both sizes in range, so"
527 << " aborting " << std::endl
;
529 } else if (idata1
.prefix
!= "" || idata2
.prefix
!= "") {
533 //this is the high object. it gets created regardless of rebalance or merge.
534 client_index_lock
.lock();
535 string o2w
= to_string(client_name
, client_index
++);
536 client_index_lock
.unlock();
538 vector
<object_data
> to_create
;
539 vector
<object_data
> to_delete
;
540 librados::ObjectWriteOperation create
[2];//possibly only 1 will be used
541 librados::ObjectWriteOperation other_ops
[6];
542 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
543 ops
.push_back(make_pair(
544 pair
<int, string
>(ADD_PREFIX
, index_name
),
547 if ((int)args1
.odata
.size
+ (int)args2
.odata
.size
<= 2*k
) {
549 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: merging "
551 << " and " << args2
.odata
.name
<< " to get " << o2w
553 map
<string
, bufferlist
> write2_map
;
554 write2_map
.insert(args1
.odata
.omap
.begin(), args1
.odata
.omap
.end());
555 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
556 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
557 args2
.odata
.max_kdata
, o2w
, write2_map
));
558 ops
.push_back(make_pair(
559 pair
<int, string
>(MAKE_OBJECT
, o2w
),
561 ceph_assert((int)write2_map
.size() <= 2*k
);
564 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: rebalancing "
566 << " and " << args2
.odata
.name
<< std::endl
;
567 map
<std::string
, bufferlist
> write1_map
;
568 map
<std::string
, bufferlist
> write2_map
;
569 map
<std::string
, bufferlist
>::iterator it
;
570 client_index_lock
.lock();
571 string o1w
= to_string(client_name
, client_index
++);
572 client_index_lock
.unlock();
573 int target_size_1
= ceil(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
575 if (args1
.odata
.max_kdata
!= idata1
.kdata
) {
576 //this should be true if idata1 is the high object
577 target_size_1
= floor(((int)args1
.odata
.size
+ (int)args2
.odata
.size
)
580 for (it
= args1
.odata
.omap
.begin();
581 it
!= args1
.odata
.omap
.end() && (int)write1_map
.size()
584 write1_map
.insert(*it
);
586 if (it
!= args1
.odata
.omap
.end()){
587 //write1_map is full, so put the rest in write2_map
588 write2_map
.insert(it
, args1
.odata
.omap
.end());
589 write2_map
.insert(args2
.odata
.omap
.begin(), args2
.odata
.omap
.end());
591 //args1.odata.omap was small, and write2_map still needs more
592 map
<std::string
, bufferlist
>::iterator it2
;
593 for(it2
= args2
.odata
.omap
.begin();
594 (it2
!= args2
.odata
.omap
.end()) && ((int)write1_map
.size()
597 write1_map
.insert(*it2
);
599 write2_map
.insert(it2
, args2
.odata
.omap
.end());
601 if (verbose
) cout
<< "\t\t" << client_name
602 << "-rebalance: write1_map has size "
603 << write1_map
.size() << ", write2_map.size() is " << write2_map
.size()
605 //at this point, write1_map and write2_map should have the correct pairs
606 to_create
.push_back(object_data(args1
.odata
.min_kdata
,
607 key_data(write1_map
.rbegin()->first
),
609 to_create
.push_back(object_data( key_data(write1_map
.rbegin()->first
),
610 args2
.odata
.max_kdata
, o2w
, write2_map
));
611 ops
.push_back(make_pair(
612 pair
<int, string
>(MAKE_OBJECT
, o1w
),
614 ops
.push_back(make_pair(
615 pair
<int, string
>(MAKE_OBJECT
, o2w
),
619 to_delete
.push_back(object_data(args1
.odata
.min_kdata
,
620 args1
.odata
.max_kdata
, args1
.odata
.name
, args1
.odata
.version
));
621 to_delete
.push_back(object_data(args2
.odata
.min_kdata
,
622 args2
.odata
.max_kdata
, args2
.odata
.name
, args2
.odata
.version
));
623 for (int i
= 1; i
< 6; i
++) {
624 ops
.push_back(make_pair(make_pair(0,""), &other_ops
[i
]));
628 set_up_prefix_index(to_create
, to_delete
, &other_ops
[0], &out_data
, &err
);
629 set_up_ops(to_create
, to_delete
, &ops
, out_data
, &err
);
631 //at this point, all operations should be completely set up.
632 /////BEGIN CRITICAL SECTION/////
633 err
= perform_ops("\t\t" + client_name
+ "-rebalance:", out_data
, &ops
);
638 for (vector
<delete_data
>::iterator it
= out_data
.to_delete
.begin();
639 it
!= out_data
.to_delete
.end(); ++it
) {
640 icache
.erase(it
->max
);
642 for (vector
<create_data
>::iterator it
= out_data
.to_create
.begin();
643 it
!= out_data
.to_create
.end(); ++it
) {
644 icache
.push(index_data(*it
));
646 icache_lock
.unlock();
647 if (verbose
) cout
<< "\t\t" << client_name
<< "-rebalance: done rebalancing."
649 /////END CRITICAL SECTION/////
653 int KvFlatBtreeAsync::read_object(const string
&obj
, object_data
* odata
) {
654 librados::ObjectReadOperation get_obj
;
655 librados::AioCompletion
* obj_aioc
= rados
.aio_create_completion();
659 get_obj
.omap_get_vals2("", LONG_MAX
, &odata
->omap
, nullptr, &err
);
660 get_obj
.getxattr("unwritable", &unw_bl
, &err
);
661 io_ctx
.aio_operate(obj
, obj_aioc
, &get_obj
, NULL
);
662 obj_aioc
->wait_for_complete();
663 err
= obj_aioc
->get_return_value();
665 //possibly -ENOENT, meaning someone else deleted it.
669 odata
->unwritable
= string(unw_bl
.c_str(), unw_bl
.length()) == "1";
670 odata
->version
= obj_aioc
->get_version64();
671 odata
->size
= odata
->omap
.size();
676 int KvFlatBtreeAsync::read_object(const string
&obj
, rebalance_args
* args
) {
681 librados::AioCompletion
* a
= rados
.aio_create_completion();
682 io_ctx
.aio_exec(obj
, a
, "kvs", "maybe_read_for_balance", inbl
, &outbl
);
683 a
->wait_for_complete();
684 err
= a
->get_return_value();
686 if (verbose
) cout
<< "\t\t" << client_name
687 << "-read_object: reading failed with "
692 auto it
= outbl
.cbegin();
694 args
->odata
.name
= obj
;
695 args
->odata
.version
= a
->get_version64();
700 void KvFlatBtreeAsync::set_up_prefix_index(
701 const vector
<object_data
> &to_create
,
702 const vector
<object_data
> &to_delete
,
703 librados::ObjectWriteOperation
* owo
,
706 std::map
<std::string
, pair
<bufferlist
, int> > assertions
;
707 map
<string
, bufferlist
> to_insert
;
709 idata
->ts
= ceph_clock_now();
710 for(vector
<object_data
>::const_iterator it
= to_create
.begin();
711 it
!= to_create
.end();
713 create_data
c(it
->min_kdata
, it
->max_kdata
, it
->name
);
714 idata
->to_create
.push_back(c
);
716 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
717 it
!= to_delete
.end();
719 delete_data
d(it
->min_kdata
, it
->max_kdata
, it
->name
, it
->version
);
720 idata
->to_delete
.push_back(d
);
722 for(vector
<object_data
>::const_iterator it
= to_delete
.begin();
723 it
!= to_delete
.end();
725 idata
->obj
= it
->name
;
726 idata
->min_kdata
= it
->min_kdata
;
727 idata
->kdata
= it
->max_kdata
;
729 idata
->encode(insert
);
730 to_insert
[it
->max_kdata
.encoded()] = insert
;
731 index_data this_entry
;
732 this_entry
.min_kdata
= idata
->min_kdata
;
733 this_entry
.kdata
= idata
->kdata
;
734 this_entry
.obj
= idata
->obj
;
735 assertions
[it
->max_kdata
.encoded()] = pair
<bufferlist
, int>
736 (to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
737 if (verbose
) cout
<< "\t\t\t" << client_name
738 << "-setup_prefix: will assert "
739 << this_entry
.str() << std::endl
;
741 ceph_assert(*err
== 0);
742 owo
->omap_cmp(assertions
, err
);
743 if (to_create
.size() <= 2) {
744 owo
->omap_set(to_insert
);
748 //some args can be null if there are no corresponding entries in p
749 void KvFlatBtreeAsync::set_up_ops(
750 const vector
<object_data
> &create_vector
,
751 const vector
<object_data
> &delete_vector
,
752 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > * ops
,
753 const index_data
&idata
,
755 vector
<pair
<pair
<int, string
>,
756 librados::ObjectWriteOperation
* > >::iterator it
;
758 //skip the prefixing part
759 for(it
= ops
->begin(); it
->first
.first
== ADD_PREFIX
; ++it
) {}
760 map
<string
, bufferlist
> to_insert
;
761 std::set
<string
> to_remove
;
762 map
<string
, pair
<bufferlist
, int> > assertions
;
763 if (create_vector
.size() > 0) {
764 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
765 it
->first
= pair
<int, string
>(UNWRITE_OBJECT
, idata
.to_delete
[i
].obj
);
766 set_up_unwrite_object(delete_vector
[i
].version
, it
->second
);
770 for (int i
= 0; i
< (int)idata
.to_create
.size(); ++i
) {
771 index_data
this_entry(idata
.to_create
[i
].max
, idata
.to_create
[i
].min
,
772 idata
.to_create
[i
].obj
);
773 to_insert
[idata
.to_create
[i
].max
.encoded()] = to_bl(this_entry
);
774 if (idata
.to_create
.size() <= 2) {
775 it
->first
= pair
<int, string
>(MAKE_OBJECT
, idata
.to_create
[i
].obj
);
777 it
->first
= pair
<int, string
>(AIO_MAKE_OBJECT
, idata
.to_create
[i
].obj
);
779 set_up_make_object(create_vector
[i
].omap
, it
->second
);
782 for (int i
= 0; i
< (int)idata
.to_delete
.size(); ++i
) {
783 index_data this_entry
= idata
;
784 this_entry
.obj
= idata
.to_delete
[i
].obj
;
785 this_entry
.min_kdata
= idata
.to_delete
[i
].min
;
786 this_entry
.kdata
= idata
.to_delete
[i
].max
;
787 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-setup_ops: will assert "
788 << this_entry
.str() << std::endl
;
789 assertions
[idata
.to_delete
[i
].max
.encoded()] = pair
<bufferlist
, int>(
790 to_bl(this_entry
), CEPH_OSD_CMPXATTR_OP_EQ
);
791 to_remove
.insert(idata
.to_delete
[i
].max
.encoded());
792 it
->first
= pair
<int, string
>(REMOVE_OBJECT
, idata
.to_delete
[i
].obj
);
793 set_up_delete_object(it
->second
);
796 if ((int)idata
.to_create
.size() <= 2) {
797 it
->second
->omap_cmp(assertions
, err
);
799 it
->second
->omap_rm_keys(to_remove
);
800 it
->second
->omap_set(to_insert
);
803 it
->first
= pair
<int, string
>(REMOVE_PREFIX
, index_name
);
806 void KvFlatBtreeAsync::set_up_make_object(
807 const map
<std::string
, bufferlist
> &to_set
,
808 librados::ObjectWriteOperation
*owo
) {
810 encode(to_set
, inbl
);
811 owo
->exec("kvs", "create_with_omap", inbl
);
814 void KvFlatBtreeAsync::set_up_unwrite_object(
815 const int &ver
, librados::ObjectWriteOperation
*owo
) {
817 owo
->assert_version(ver
);
819 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("0"));
820 owo
->setxattr("unwritable", to_bl("1"));
// Appends to `owo` the ops that clear an object's "unwritable" marker:
// first compare the "unwritable" xattr against "1", then set it to "0".
// This is the inverse of the cmpxattr("0") / setxattr("1") pair that
// set_up_unwrite_object adds. Presumably the compare makes the whole write
// fail if the object is not currently marked; confirm against librados
// cmpxattr semantics.
823 void KvFlatBtreeAsync::set_up_restore_object(
824 librados::ObjectWriteOperation
*owo
) {
825 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
826 owo
->setxattr("unwritable", to_bl("0"));
829 void KvFlatBtreeAsync::set_up_delete_object(
830 librados::ObjectWriteOperation
*owo
) {
831 owo
->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ
, to_bl("1"));
835 int KvFlatBtreeAsync::perform_ops(const string
&debug_prefix
,
836 const index_data
&idata
,
837 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > *ops
) {
839 vector
<librados::AioCompletion
*> aiocs(idata
.to_create
.size());
841 for (vector
<pair
<pair
<int, string
>,
842 librados::ObjectWriteOperation
*> >::iterator it
= ops
->begin();
843 it
!= ops
->end(); ++it
) {
844 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
847 switch (it
->first
.first
) {
848 case ADD_PREFIX
://prefixing
849 if (verbose
) cout
<< debug_prefix
<< " adding prefix" << std::endl
;
850 err
= io_ctx
.operate(index_name
, it
->second
);
852 if (verbose
) cout
<< debug_prefix
<< " prefixing the index failed with "
856 if (verbose
) cout
<< debug_prefix
<< " prefix added." << std::endl
;
858 case UNWRITE_OBJECT
://marking
859 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
861 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
863 //most likely because it changed, in which case it will be -ERANGE
864 if (verbose
) cout
<< debug_prefix
<< " marking " << it
->first
.second
865 << "failed with code" << err
<< std::endl
;
866 if (it
->first
.second
== (*idata
.to_delete
.begin()).max
.encoded()) {
867 if (cleanup(idata
, -EFIRSTOBJ
) == -ESUICIDE
) {
871 if (cleanup(idata
, -ERANGE
) == -ESUICIDE
) {
877 if (verbose
) cout
<< debug_prefix
<< " marked " << it
->first
.second
880 case MAKE_OBJECT
://creating
881 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
883 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
885 //this can happen if someone else was cleaning up after us.
886 if (verbose
) cout
<< debug_prefix
<< " creating " << it
->first
.second
888 << " with code " << err
<< std::endl
;
889 if (err
== -EEXIST
) {
890 //someone thinks we died, so die
891 if (verbose
) cout
<< client_name
<< " is suiciding!" << std::endl
;
898 if (verbose
|| idata
.to_create
.size() > 2) {
899 cout
<< debug_prefix
<< " created object " << it
->first
.second
903 case AIO_MAKE_OBJECT
:
904 cout
<< debug_prefix
<< " launching asynchronous create "
905 << it
->first
.second
<< std::endl
;
906 aiocs
[count
] = rados
.aio_create_completion();
907 io_ctx
.aio_operate(it
->first
.second
, aiocs
[count
], it
->second
);
909 if ((int)idata
.to_create
.size() == count
) {
910 cout
<< "starting aiowrite waiting loop" << std::endl
;
911 for (count
-= 1; count
>= 0; count
--) {
912 aiocs
[count
]->wait_for_complete();
913 err
= aiocs
[count
]->get_return_value();
915 //this can happen if someone else was cleaning up after us.
916 cerr
<< debug_prefix
<< " a create failed"
917 << " with code " << err
<< std::endl
;
918 if (err
== -EEXIST
) {
919 //someone thinks we died, so die
920 cerr
<< client_name
<< " is suiciding!" << std::endl
;
927 if (verbose
|| idata
.to_create
.size() > 2) {
928 cout
<< debug_prefix
<< " completed aio " << aiocs
.size() - count
929 << "/" << aiocs
.size() << std::endl
;
934 case REMOVE_OBJECT
://deleting
935 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
937 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
939 //if someone else called cleanup on this prefix first
940 if (verbose
) cout
<< debug_prefix
<< " deleting " << it
->first
.second
941 << "failed with code" << err
<< std::endl
;
943 if (verbose
) cout
<< debug_prefix
<< " deleted " << it
->first
.second
946 case REMOVE_PREFIX
://rewriting index
947 if (verbose
) cout
<< debug_prefix
<< " updating index " << std::endl
;
948 err
= io_ctx
.operate(index_name
, it
->second
);
950 if (verbose
) cout
<< debug_prefix
951 << " rewriting the index failed with code " << err
952 << ". someone else must have thought we died, so dying" << std::endl
;
955 if (verbose
) cout
<< debug_prefix
<< " updated index." << std::endl
;
958 if (verbose
) cout
<< debug_prefix
<< " restoring " << it
->first
.second
960 err
= io_ctx
.operate(it
->first
.second
, it
->second
);
962 if (verbose
) cout
<< debug_prefix
<< "restoring " << it
->first
.second
964 << " with " << err
<< std::endl
;
967 if (verbose
) cout
<< debug_prefix
<< " restored " << it
->first
.second
971 if (verbose
) cout
<< debug_prefix
<< " performing unknown op on "
974 err
= io_ctx
.operate(index_name
, it
->second
);
976 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
978 << " failed with " << err
<< std::endl
;
981 if (verbose
) cout
<< debug_prefix
<< " unknown op on "
983 << " succeeded." << std::endl
;
991 int KvFlatBtreeAsync::cleanup(const index_data
&idata
, const int &error
) {
992 if (verbose
) cout
<< "\t\t" << client_name
<< ": cleaning up after "
996 ceph_assert(idata
.prefix
!= "");
997 map
<std::string
,bufferlist
> new_index
;
998 map
<std::string
, pair
<bufferlist
, int> > assertions
;
1001 //this happens if the split or rebalance failed to mark the first object,
1002 //meaning only the index needs to be changed.
1003 //restore objects that had been marked unwritable.
1004 for(vector
<delete_data
>::const_iterator it
=
1005 idata
.to_delete
.begin();
1006 it
!= idata
.to_delete
.end(); ++it
) {
1007 index_data this_entry
;
1008 this_entry
.obj
= (*it
).obj
;
1009 this_entry
.min_kdata
= it
->min
;
1010 this_entry
.kdata
= it
->max
;
1011 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1013 this_entry
.obj
= it
->obj
;
1014 this_entry
.min_kdata
= it
->min
;
1015 this_entry
.kdata
= it
->max
;
1016 if (verbose
) cout
<< "\t\t\t" << client_name
1017 << "-cleanup: will assert index contains "
1018 << this_entry
.str() << std::endl
;
1019 assertions
[it
->max
.encoded()] =
1020 pair
<bufferlist
, int>(to_bl(this_entry
),
1021 CEPH_OSD_CMPXATTR_OP_EQ
);
1025 librados::ObjectWriteOperation update_index
;
1026 update_index
.omap_cmp(assertions
, &err
);
1027 update_index
.omap_set(new_index
);
1028 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1030 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1033 err
= io_ctx
.operate(index_name
, &update_index
);
1035 if (verbose
) cout
<< "\t\t\t" << client_name
1036 << "-cleanup: rewriting failed with "
1037 << err
<< ". returning -ECANCELED" << std::endl
;
1040 if (verbose
) cout
<< "\t\t\t" << client_name
1041 << "-cleanup: updated index. cleanup done."
1046 //this happens if a split or rebalance fails to mark an object. It is a
1047 //special case of rolling back that does not have to deal with new objects.
1049 //restore objects that had been marked unwritable.
1050 vector
<delete_data
>::const_iterator it
;
1051 for(it
= idata
.to_delete
.begin();
1052 it
!= idata
.to_delete
.end(); ++it
) {
1053 index_data this_entry
;
1054 this_entry
.obj
= (*it
).obj
;
1055 this_entry
.min_kdata
= it
->min
;
1056 this_entry
.kdata
= it
->max
;
1057 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1059 this_entry
.obj
= it
->obj
;
1060 this_entry
.min_kdata
= it
->min
;
1061 this_entry
.kdata
= it
->max
;
1062 if (verbose
) cout
<< "\t\t\t" << client_name
1063 << "-cleanup: will assert index contains "
1064 << this_entry
.str() << std::endl
;
1065 assertions
[it
->max
.encoded()] =
1066 pair
<bufferlist
, int>(to_bl(this_entry
),
1067 CEPH_OSD_CMPXATTR_OP_EQ
);
1069 it
= idata
.to_delete
.begin();
1070 librados::ObjectWriteOperation restore
;
1071 set_up_restore_object(&restore
);
1072 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1075 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1078 err
= io_ctx
.operate(it
->obj
, &restore
);
1080 //i.e., -ECANCELED because the object was already restored by someone
1082 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1084 << " failed with " << err
<< std::endl
;
1086 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1092 librados::ObjectWriteOperation update_index
;
1093 update_index
.omap_cmp(assertions
, &err
);
1094 update_index
.omap_set(new_index
);
1095 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1097 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1100 err
= io_ctx
.operate(index_name
, &update_index
);
1102 if (verbose
) cout
<< "\t\t\t" << client_name
1103 << "-cleanup: rewriting failed with "
1104 << err
<< ". returning -ECANCELED" << std::endl
;
1107 if (verbose
) cout
<< "\t\t\t" << client_name
1108 << "-cleanup: updated index. cleanup done."
1113 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling forward"
1115 //all changes were created except for updating the index and possibly
1116 //deleting the objects. roll forward.
1117 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1118 vector
<librados::ObjectWriteOperation
> owos(idata
.to_delete
.size() + 1);
1119 for (int i
= 0; i
<= (int)idata
.to_delete
.size(); ++i
) {
1120 ops
.push_back(make_pair(pair
<int, string
>(0, ""), &owos
[i
]));
1122 set_up_ops(vector
<object_data
>(),
1123 vector
<object_data
>(), &ops
, idata
, &err
);
1124 err
= perform_ops("\t\t" + client_name
+ "-cleanup:", idata
, &ops
);
1126 if (err
== -ESUICIDE
) {
1129 if (verbose
) cout
<< "\t\t\t" << client_name
1130 << "-cleanup: rewriting failed with "
1131 << err
<< ". returning -ECANCELED" << std::endl
;
1134 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updated index"
1139 //roll back all changes.
1140 if (verbose
) cout
<< "\t\t" << client_name
<< "-cleanup: rolling back"
1142 map
<std::string
,bufferlist
> new_index
;
1143 std::set
<string
> to_remove
;
1144 map
<std::string
, pair
<bufferlist
, int> > assertions
;
1146 //mark the objects to be created. if someone else already has, die.
1147 for(vector
<create_data
>::const_reverse_iterator it
=
1148 idata
.to_create
.rbegin();
1149 it
!= idata
.to_create
.rend(); ++it
) {
1150 librados::ObjectWriteOperation rm
;
1151 set_up_unwrite_object(0, &rm
);
1152 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1156 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1159 err
= io_ctx
.operate(it
->obj
, &rm
);
1161 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marking "
1163 << " failed with " << err
<< std::endl
;
1165 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: marked "
1171 //restore objects that had been marked unwritable.
1172 for(vector
<delete_data
>::const_iterator it
=
1173 idata
.to_delete
.begin();
1174 it
!= idata
.to_delete
.end(); ++it
) {
1175 index_data this_entry
;
1176 this_entry
.obj
= (*it
).obj
;
1177 this_entry
.min_kdata
= it
->min
;
1178 this_entry
.kdata
= it
->max
;
1179 new_index
[it
->max
.encoded()] = to_bl(this_entry
);
1181 this_entry
.obj
= it
->obj
;
1182 this_entry
.min_kdata
= it
->min
;
1183 this_entry
.kdata
= it
->max
;
1184 if (verbose
) cout
<< "\t\t\t" << client_name
1185 << "-cleanup: will assert index contains "
1186 << this_entry
.str() << std::endl
;
1187 assertions
[it
->max
.encoded()] =
1188 pair
<bufferlist
, int>(to_bl(this_entry
),
1189 CEPH_OSD_CMPXATTR_OP_EQ
);
1190 librados::ObjectWriteOperation restore
;
1191 set_up_restore_object(&restore
);
1192 if (verbose
) cout
<< "\t\t\t" << client_name
1193 << "-cleanup: will assert index contains "
1194 << this_entry
.str() << std::endl
;
1195 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1199 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restoring "
1202 err
= io_ctx
.operate(it
->obj
, &restore
);
1203 if (err
== -ENOENT
) {
1204 //it had gotten far enough to be rolled forward - unmark the objects
1206 if (verbose
) cout
<< "\t\t\t" << client_name
1207 << "-cleanup: roll forward instead"
1209 for(vector
<create_data
>::const_iterator cit
=
1210 idata
.to_create
.begin();
1211 cit
!= idata
.to_create
.end(); ++cit
) {
1212 librados::ObjectWriteOperation res
;
1213 set_up_restore_object(&res
);
1214 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1218 if (verbose
) cout
<< "\t\t\t" << client_name
1219 << "-cleanup: restoring " << cit
->obj
1221 err
= io_ctx
.operate(cit
->obj
, &res
);
1223 if (verbose
) cout
<< "\t\t\t" << client_name
1224 << "-cleanup: restoring "
1225 << cit
->obj
<< " failed with " << err
<< std::endl
;
1227 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1231 return cleanup(idata
, -ENOENT
);
1232 } else if (err
< 0) {
1233 //i.e., -ECANCELED because the object was already restored by someone
1235 if (verbose
) cout
<< "\t\t\t" << client_name
1236 << "-cleanup: restoring " << it
->obj
1237 << " failed with " << err
<< std::endl
;
1239 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: restored "
1245 //remove the new objects
1246 for(vector
<create_data
>::const_reverse_iterator it
=
1247 idata
.to_create
.rbegin();
1248 it
!= idata
.to_create
.rend(); ++it
) {
1249 to_remove
.insert(it
->max
.encoded());
1250 librados::ObjectWriteOperation rm
;
1252 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 )
1256 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removing "
1259 err
= io_ctx
.operate(it
->obj
, &rm
);
1261 if (verbose
) cout
<< "\t\t\t" << client_name
1262 << "-cleanup: failed to remove "
1263 << it
->obj
<< std::endl
;
1265 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: removed "
1272 librados::ObjectWriteOperation update_index
;
1273 update_index
.omap_cmp(assertions
, &err
);
1274 update_index
.omap_rm_keys(to_remove
);
1275 update_index
.omap_set(new_index
);
1276 if (verbose
) cout
<< "\t\t\t" << client_name
<< "-cleanup: updating index"
1278 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1281 err
= io_ctx
.operate(index_name
, &update_index
);
1283 if (verbose
) cout
<< "\t\t\t" << client_name
1284 << "-cleanup: rewriting failed with "
1285 << err
<< ". returning -ECANCELED" << std::endl
;
1288 if (verbose
) cout
<< "\t\t\t" << client_name
1289 << "-cleanup: updated index. cleanup done."
1297 string
KvFlatBtreeAsync::to_string(string s
, int i
) {
1303 string
KvFlatBtreeAsync::get_name() {
1307 void KvFlatBtreeAsync::set_inject(injection_t inject
, int wait_time
) {
1309 wait_ms
= wait_time
;
1312 int KvFlatBtreeAsync::setup(int argc
, const char** argv
) {
1313 int r
= rados
.init(rados_id
.c_str());
1315 cerr
<< "error during init" << r
<< std::endl
;
1318 r
= rados
.conf_parse_argv(argc
, argv
);
1320 cerr
<< "error during parsing args" << r
<< std::endl
;
1323 r
= rados
.conf_parse_env(NULL
);
1325 cerr
<< "error during parsing env" << r
<< std::endl
;
1328 r
= rados
.conf_read_file(NULL
);
1330 cerr
<< "error during read file: " << r
<< std::endl
;
1333 r
= rados
.connect();
1335 cerr
<< "error during connect: " << r
<< std::endl
;
1338 r
= rados
.ioctx_create(pool_name
.c_str(), io_ctx
);
1340 cerr
<< "error creating io ctx: " << r
<< std::endl
;
1345 librados::ObjectWriteOperation make_index
;
1346 make_index
.create(true);
1347 map
<std::string
,bufferlist
> index_map
;
1349 idata
.obj
= client_name
;
1350 idata
.min_kdata
.raw_key
= "";
1351 idata
.kdata
= key_data("");
1352 index_map
["1"] = to_bl(idata
);
1353 make_index
.omap_set(index_map
);
1354 r
= io_ctx
.operate(index_name
, &make_index
);
1356 if (verbose
) cout
<< client_name
<< ": Making the index failed with code "
1361 if (verbose
) cout
<< client_name
<< ": created index object" << std::endl
;
1363 librados::ObjectWriteOperation make_max_obj
;
1364 make_max_obj
.create(true);
1365 make_max_obj
.setxattr("unwritable", to_bl("0"));
1366 make_max_obj
.setxattr("size", to_bl("0"));
1367 r
= io_ctx
.operate(client_name
, &make_max_obj
);
1369 if (verbose
) cout
<< client_name
<< ": Setting xattr failed with code "
1377 int KvFlatBtreeAsync::set(const string
&key
, const bufferlist
&val
,
1378 bool update_on_existing
) {
1379 if (verbose
) cout
<< client_name
<< " is "
1380 << (update_on_existing
? "updating " : "setting ")
1381 << key
<< std::endl
;
1384 index_data
idata(key
);
1386 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
1387 err
= read_index(key
, &idata
, NULL
, false);
1389 if (verbose
) cout
<< "\t" << client_name
1390 << ": getting oid failed with code "
1391 << err
<< std::endl
;
1394 if (verbose
) cout
<< "\t" << client_name
<< ": index data is " << idata
.str()
1395 << ", object is " << idata
.obj
<< std::endl
;
1397 err
= set_op(key
, val
, update_on_existing
, idata
);
1399 if (verbose
) cout
<< "\t" << client_name
<< ": finished set with " << err
1404 int KvFlatBtreeAsync::set_op(const string
&key
, const bufferlist
&val
,
1405 bool update_on_existing
, index_data
&idata
) {
1411 args
.exclusive
= !update_on_existing
;
1412 args
.omap
[key
] = val
;
1415 librados::ObjectWriteOperation owo
;
1416 owo
.exec("kvs", "omap_insert", inbl
);
1417 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1418 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1421 if (verbose
) cout
<< "\t" << client_name
<< ": inserting " << key
1423 << idata
.obj
<< std::endl
;
1424 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1428 //the key already exists and this is an exclusive insert.
1429 cerr
<< "\t" << client_name
<< ": writing key failed with "
1430 << err
<< std::endl
;
1433 case -EKEYREJECTED
: {
1434 //the object needs to be split.
1436 if (verbose
) cout
<< "\t" << client_name
<< ": running split on "
1439 err
= read_index(key
, &idata
, NULL
, true);
1441 if (verbose
) cout
<< "\t" << client_name
1442 << ": getting oid failed with code "
1443 << err
<< std::endl
;
1447 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1448 if (verbose
) cerr
<< "\t" << client_name
<< ": split failed with "
1449 << err
<< std::endl
;
1450 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
, NULL
);
1453 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1456 return set_op(key
, val
, update_on_existing
, idata
);
1461 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1462 err
= read_index(key
, &idata
, NULL
, true);
1464 if (verbose
) cout
<< "\t" << client_name
1465 << ": getting oid failed with code "
1466 << err
<< std::endl
;
1469 return set_op(key
, val
, update_on_existing
, idata
);
1472 if (verbose
) cerr
<< "\t" << client_name
<< ": writing obj failed with "
1473 << err
<< std::endl
;
1474 if (err
== -ENOENT
|| err
== -EACCES
) {
1475 if (err
== -ENOENT
) {
1476 if (verbose
) cout
<< "CACHE FAILURE" << std::endl
;
1478 err
= read_index(key
, &idata
, NULL
, true);
1480 if (verbose
) cout
<< "\t" << client_name
1481 << ": getting oid failed with code "
1482 << err
<< std::endl
;
1485 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1487 << ", object is " << idata
.obj
<< std::endl
;
1488 return set_op(key
, val
, update_on_existing
, idata
);
1497 int KvFlatBtreeAsync::remove(const string
&key
) {
1498 if (verbose
) cout
<< client_name
<< ": removing " << key
<< std::endl
;
1503 index_data next_idata
;
1505 if (verbose
) cout
<< "\t" << client_name
<< ": finding oid" << std::endl
;
1506 err
= read_index(key
, &idata
, &next_idata
, false);
1508 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1512 if (verbose
) cout
<< "\t" << client_name
<< ": idata is " << idata
.str()
1513 << ", next_idata is " << next_idata
.str()
1514 << ", obj is " << obj
<< std::endl
;
1516 err
= remove_op(key
, idata
, next_idata
);
1518 if (verbose
) cout
<< "\t" << client_name
<< ": finished remove with " << err
1519 << " and exiting" << std::endl
;
1523 int KvFlatBtreeAsync::remove_op(const string
&key
, index_data
&idata
,
1524 index_data
&next_idata
) {
1529 args
.omap
.insert(key
);
1532 librados::ObjectWriteOperation owo
;
1533 owo
.exec("kvs", "omap_remove", inbl
);
1534 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1535 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1538 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
<< " from "
1541 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1543 if (verbose
) cout
<< "\t" << client_name
<< ": writing obj failed with "
1544 << err
<< std::endl
;
1547 //the key does not exist in the object
1550 case -EKEYREJECTED
: {
1551 //the object needs to be split.
1553 if (verbose
) cerr
<< "\t" << client_name
<< ": running rebalance on "
1554 << idata
.obj
<< std::endl
;
1555 err
= read_index(key
, &idata
, &next_idata
, true);
1557 if (verbose
) cout
<< "\t" << client_name
1558 << ": getting oid failed with code "
1559 << err
<< std::endl
;
1562 err
= rebalance(idata
, next_idata
);
1563 if (err
< 0 && err
!= -ENOENT
&& err
!= -EBALANCE
) {
1564 if (verbose
) cerr
<< "\t" << client_name
<< ": rebalance returned "
1565 << err
<< std::endl
;
1566 int ret
= handle_set_rm_errors(err
, idata
.obj
, key
, &idata
,
1570 if (verbose
) cout
<< client_name
<< " IS SUICIDING!" << std::endl
;
1573 return remove_op(key
, idata
, next_idata
);
1578 //this is the only node, so it's ok to go below k.
1579 librados::ObjectWriteOperation owo
;
1583 args
.omap
.insert(key
);
1585 owo
.exec("kvs", "omap_remove", inbl
);
1586 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)()
1588 if (verbose
) cout
<< client_name
<< " IS SUICIDING!"
1592 if (verbose
) cout
<< "\t" << client_name
<< ": removing " << key
1596 int err
= io_ctx
.operate(idata
.obj
, &owo
);
1602 } while (err
< 0 && err
!= -EBALANCE
&& err
!= -ENOENT
);
1603 err
= read_index(key
, &idata
, &next_idata
, true);
1605 if (verbose
) cout
<< "\t" << client_name
1606 << ": getting oid failed with code "
1607 << err
<< std::endl
;
1613 if (err
== -ENOENT
|| err
== -EACCES
) {
1614 err
= read_index(key
, &idata
, &next_idata
, true);
1616 if (verbose
) cout
<< "\t" << client_name
1617 << ": getting oid failed with code "
1618 << err
<< std::endl
;
1621 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1623 << ", object is " << idata
.obj
<< std::endl
;
1624 //idea: we read the time every time we read the index anyway - store it.
1625 return remove_op(key
, idata
, next_idata
);
1634 int KvFlatBtreeAsync::handle_set_rm_errors(int &err
, string obj
,
1636 index_data
* idata
, index_data
* next_idata
) {
1637 if (err
== -ESUICIDE
) {
1639 } else if (err
== -ECANCELED
//if an object was unwritable or index changed
1640 || err
== -EPREFIX
//if there is currently a prefix
1641 || err
== -ETIMEDOUT
// if the index changes during the op - i.e. cleanup
1642 || err
== -EACCES
) //possible if we were acting on old index data
1644 err
= read_index(key
, idata
, next_idata
, true);
1648 if (verbose
) cout
<< "\t" << client_name
<< ": prefix is " << idata
->str()
1650 if (idata
->obj
!= obj
) {
1651 //someone else has split or cleaned up or something. start over.
1652 return 1;//meaning repeat
1654 } else if (err
!= -ETIMEDOUT
&& err
!= -ERANGE
&& err
!= -EACCES
1655 && err
!= -EUCLEAN
){
1656 if (verbose
) cout
<< "\t" << client_name
1657 << ": split encountered an unexpected error: " << err
1664 int KvFlatBtreeAsync::get(const string
&key
, bufferlist
*val
) {
1666 if (verbose
) cout
<< client_name
<< ": getting " << key
<< std::endl
;
1671 if ((((KeyValueStructure
*)this)->*KvFlatBtreeAsync::interrupt
)() == 1 ) {
1674 err
= read_index(key
, &idata
, NULL
, false);
1675 mytime
= ceph_clock_now();
1677 if (verbose
) cout
<< "getting oid failed with code " << err
<< std::endl
;
1681 err
= get_op(key
, val
, idata
);
1683 if (verbose
) cout
<< client_name
<< ": got " << key
<< " with " << err
1689 int KvFlatBtreeAsync::get_op(const string
&key
, bufferlist
*val
,
1690 index_data
&idata
) {
1692 std::set
<std::string
> key_set
;
1693 key_set
.insert(key
);
1694 map
<std::string
,bufferlist
> omap
;
1695 librados::ObjectReadOperation read
;
1696 read
.omap_get_vals_by_keys(key_set
, &omap
, &err
);
1697 err
= io_ctx
.operate(idata
.obj
, &read
, NULL
);
1699 if (err
== -ENOENT
) {
1700 err
= read_index(key
, &idata
, NULL
, true);
1702 if (verbose
) cout
<< "\t" << client_name
1703 << ": getting oid failed with code "
1704 << err
<< std::endl
;
1707 if (verbose
) cout
<< "\t" << client_name
<< ": index data is "
1709 << ", object is " << idata
.obj
<< std::endl
;
1710 return get_op(key
, val
, idata
);
1712 if (verbose
) cout
<< client_name
1713 << ": get encountered an unexpected error: " << err
1723 void *KvFlatBtreeAsync::pset(void *ptr
) {
1724 struct aio_set_args
*args
= (struct aio_set_args
*)ptr
;
1726 args
->kvba
->KvFlatBtreeAsync::set((string
)args
->key
,
1727 (bufferlist
)args
->val
, (bool)args
->exc
);
1728 args
->cb(args
->err
, args
->cb_args
);
1733 void KvFlatBtreeAsync::aio_set(const string
&key
, const bufferlist
&val
,
1734 bool exclusive
, callback cb
, void * cb_args
, int * err
) {
1735 aio_set_args
*args
= new aio_set_args();
1739 args
->exc
= exclusive
;
1741 args
->cb_args
= cb_args
;
1744 int r
= pthread_create(&t
, NULL
, pset
, (void*)args
);
1752 void *KvFlatBtreeAsync::prm(void *ptr
) {
1753 struct aio_rm_args
*args
= (struct aio_rm_args
*)ptr
;
1755 args
->kvba
->KvFlatBtreeAsync::remove((string
)args
->key
);
1756 args
->cb(args
->err
, args
->cb_args
);
1761 void KvFlatBtreeAsync::aio_remove(const string
&key
,
1762 callback cb
, void * cb_args
, int * err
) {
1763 aio_rm_args
* args
= new aio_rm_args();
1767 args
->cb_args
= cb_args
;
1770 int r
= pthread_create(&t
, NULL
, prm
, (void*)args
);
1778 void *KvFlatBtreeAsync::pget(void *ptr
) {
1779 struct aio_get_args
*args
= (struct aio_get_args
*)ptr
;
1781 args
->kvba
->KvFlatBtreeAsync::get((string
)args
->key
,
1782 (bufferlist
*)args
->val
);
1783 args
->cb(args
->err
, args
->cb_args
);
1788 void KvFlatBtreeAsync::aio_get(const string
&key
, bufferlist
*val
,
1789 callback cb
, void * cb_args
, int * err
) {
1790 aio_get_args
* args
= new aio_get_args();
1795 args
->cb_args
= cb_args
;
1798 int r
= pthread_create(&t
, NULL
, pget
, (void*)args
);
1806 int KvFlatBtreeAsync::set_many(const map
<string
, bufferlist
> &in_map
) {
1810 std::set
<string
> keys
;
1812 map
<string
, bufferlist
> big_map
;
1813 for (map
<string
, bufferlist
>::const_iterator it
= in_map
.begin();
1814 it
!= in_map
.end(); ++it
) {
1815 keys
.insert(it
->first
);
1816 big_map
.insert(*it
);
1819 if (verbose
) cout
<< "created key set and big_map" << std::endl
;
1822 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
1823 io_ctx
.aio_exec(index_name
, aioc
, "kvs", "read_many", inbl
, &outbl
);
1824 aioc
->wait_for_complete();
1825 err
= aioc
->get_return_value();
1828 cerr
<< "getting index failed with " << err
<< std::endl
;
1832 map
<string
, bufferlist
> imap
;//read from the index
1833 auto blit
= outbl
.cbegin();
1836 if (verbose
) cout
<< "finished reading index for objects. there are "
1837 << imap
.size() << " entries that need to be changed. " << std::endl
;
1840 vector
<object_data
> to_delete
;
1842 vector
<object_data
> to_create
;
1844 if (verbose
) cout
<< "setting up to_delete and to_create vectors from index "
1845 << "map" << std::endl
;
1846 //set up to_delete from index map
1847 for (map
<string
, bufferlist
>::iterator it
= imap
.begin(); it
!= imap
.end();
1850 blit
= it
->second
.begin();
1852 to_delete
.push_back(object_data(idata
.min_kdata
, idata
.kdata
, idata
.obj
));
1853 err
= read_object(idata
.obj
, &to_delete
[to_delete
.size() - 1]);
1855 if (verbose
) cout
<< "reading " << idata
.obj
<< " failed with " << err
1857 return set_many(in_map
);
1860 big_map
.insert(to_delete
[to_delete
.size() - 1].omap
.begin(),
1861 to_delete
[to_delete
.size() - 1].omap
.end());
1864 to_create
.push_back(object_data(
1865 to_string(client_name
, client_index
++)));
1866 to_create
[0].min_kdata
= to_delete
[0].min_kdata
;
1868 for(map
<string
, bufferlist
>::iterator it
= big_map
.begin();
1869 it
!= big_map
.end(); ++it
) {
1870 if (to_create
[to_create
.size() - 1].omap
.size() == 1.5 * k
) {
1871 to_create
[to_create
.size() - 1].max_kdata
=
1872 key_data(to_create
[to_create
.size() - 1]
1873 .omap
.rbegin()->first
);
1875 to_create
.push_back(object_data(
1876 to_string(client_name
, client_index
++)));
1877 to_create
[to_create
.size() - 1].min_kdata
=
1878 to_create
[to_create
.size() - 2].max_kdata
;
1881 to_create
[to_create
.size() - 1].omap
.insert(*it
);
1883 to_create
[to_create
.size() - 1].max_kdata
=
1884 to_delete
[to_delete
.size() - 1].max_kdata
;
1886 vector
<librados::ObjectWriteOperation
> owos(2 + 2 * to_delete
.size()
1887 + to_create
.size());
1888 vector
<pair
<pair
<int, string
>, librados::ObjectWriteOperation
*> > ops
;
1892 set_up_prefix_index(to_create
, to_delete
, &owos
[0], &idata
, &err
);
1894 if (verbose
) cout
<< "finished making to_create and to_delete. "
1897 ops
.push_back(make_pair(
1898 pair
<int, string
>(ADD_PREFIX
, index_name
),
1900 for (int i
= 1; i
< 2 + 2 * (int)to_delete
.size() + (int)to_create
.size();
1902 ops
.push_back(make_pair(make_pair(0,""), &owos
[i
]));
1905 set_up_ops(to_create
, to_delete
, &ops
, idata
, &err
);
1907 cout
<< "finished setting up ops. Starting critical section..." << std::endl
;
1909 /////BEGIN CRITICAL SECTION/////
1910 //put prefix on index entry for idata.val
1911 err
= perform_ops("\t\t" + client_name
+ "-set_many:", idata
, &ops
);
1913 return set_many(in_map
);
1915 if (verbose
) cout
<< "\t\t" << client_name
<< "-split: done splitting."
1917 /////END CRITICAL SECTION/////
1918 std::scoped_lock l
{icache_lock
};
1919 for (vector
<delete_data
>::iterator it
= idata
.to_delete
.begin();
1920 it
!= idata
.to_delete
.end(); ++it
) {
1921 icache
.erase(it
->max
);
1923 for (vector
<create_data
>::iterator it
= idata
.to_create
.begin();
1924 it
!= idata
.to_create
.end(); ++it
) {
1925 icache
.push(index_data(*it
));
1930 int KvFlatBtreeAsync::remove_all() {
1931 if (verbose
) cout
<< client_name
<< ": removing all" << std::endl
;
1933 librados::ObjectReadOperation oro
;
1934 librados::AioCompletion
* oro_aioc
= rados
.aio_create_completion();
1935 std::map
<std::string
, bufferlist
> index_set
;
1936 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1937 err
= io_ctx
.aio_operate(index_name
, oro_aioc
, &oro
, NULL
);
1939 if (err
== -ENOENT
) {
1942 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
1945 oro_aioc
->wait_for_complete();
1946 oro_aioc
->release();
1948 librados::ObjectWriteOperation rm_index
;
1949 librados::AioCompletion
* rm_index_aioc
= rados
.aio_create_completion();
1950 map
<std::string
,bufferlist
> new_index
;
1951 new_index
["1"] = index_set
["1"];
1952 rm_index
.omap_clear();
1953 rm_index
.omap_set(new_index
);
1954 io_ctx
.aio_operate(index_name
, rm_index_aioc
, &rm_index
);
1955 err
= rm_index_aioc
->get_return_value();
1956 rm_index_aioc
->release();
1958 if (verbose
) cout
<< "rm index aioc failed with " << err
1963 if (!index_set
.empty()) {
1964 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
1965 it
!= index_set
.end(); ++it
){
1966 librados::ObjectWriteOperation sub
;
1967 if (it
->first
== "1") {
1973 auto b
= it
->second
.cbegin();
1975 io_ctx
.operate(idata
.obj
, &sub
);
1984 int KvFlatBtreeAsync::get_all_keys(std::set
<std::string
> *keys
) {
1985 if (verbose
) cout
<< client_name
<< ": getting all keys" << std::endl
;
1987 librados::ObjectReadOperation oro
;
1988 std::map
<std::string
,bufferlist
> index_set
;
1989 oro
.omap_get_vals2("",LONG_MAX
,&index_set
, nullptr, &err
);
1990 io_ctx
.operate(index_name
, &oro
, NULL
);
1992 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
1995 for (std::map
<std::string
,bufferlist
>::iterator it
= index_set
.begin();
1996 it
!= index_set
.end(); ++it
){
1997 librados::ObjectReadOperation sub
;
1998 std::set
<std::string
> ret
;
1999 sub
.omap_get_keys2("",LONG_MAX
,&ret
, nullptr, &err
);
2001 auto b
= it
->second
.cbegin();
2003 io_ctx
.operate(idata
.obj
, &sub
, NULL
);
2004 keys
->insert(ret
.begin(), ret
.end());
2009 int KvFlatBtreeAsync::get_all_keys_and_values(
2010 map
<std::string
,bufferlist
> *kv_map
) {
2011 if (verbose
) cout
<< client_name
<< ": getting all keys and values"
2014 librados::ObjectReadOperation first_read
;
2015 std::set
<std::string
> index_set
;
2016 first_read
.omap_get_keys2("",LONG_MAX
,&index_set
, nullptr, &err
);
2017 io_ctx
.operate(index_name
, &first_read
, NULL
);
2019 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2022 for (std::set
<std::string
>::iterator it
= index_set
.begin();
2023 it
!= index_set
.end(); ++it
){
2024 librados::ObjectReadOperation sub
;
2025 map
<std::string
, bufferlist
> ret
;
2026 sub
.omap_get_vals2("",LONG_MAX
,&ret
, nullptr, &err
);
2027 io_ctx
.operate(*it
, &sub
, NULL
);
2028 kv_map
->insert(ret
.begin(), ret
.end());
2033 bool KvFlatBtreeAsync::is_consistent() {
2036 if (verbose
) cout
<< client_name
<< ": checking consistency" << std::endl
;
2037 std::map
<std::string
,bufferlist
> index
;
2038 map
<std::string
, std::set
<std::string
> > sub_objs
;
2039 librados::ObjectReadOperation oro
;
2040 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2041 io_ctx
.operate(index_name
, &oro
, NULL
);
2043 //probably because the index doesn't exist - this might be ok.
2044 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2045 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2046 //if this executes, there are floating objects.
2047 cerr
<< "Not consistent! found floating object " << oit
->get_oid()
2054 std::map
<std::string
, string
> parsed_index
;
2055 std::set
<std::string
> onames
;
2056 std::set
<std::string
> special_names
;
2057 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2058 it
!= index
.end(); ++it
) {
2059 if (it
->first
!= "") {
2061 auto b
= it
->second
.cbegin();
2063 if (idata
.prefix
!= "") {
2064 for(vector
<delete_data
>::iterator dit
= idata
.to_delete
.begin();
2065 dit
!= idata
.to_delete
.end(); ++dit
) {
2066 librados::ObjectReadOperation oro
;
2067 librados::AioCompletion
* aioc
= rados
.aio_create_completion();
2069 oro
.getxattr("unwritable", &un
, &err
);
2070 io_ctx
.aio_operate(dit
->obj
, aioc
, &oro
, NULL
);
2071 aioc
->wait_for_complete();
2072 err
= aioc
->get_return_value();
2073 if (ceph_clock_now() - idata
.ts
> timeout
) {
2076 if (err
== -ENOENT
) {
2079 cerr
<< "Not consistent! reading object " << dit
->obj
2080 << "returned " << err
<< std::endl
;
2085 if (atoi(string(un
.c_str(), un
.length()).c_str()) != 1 &&
2086 aioc
->get_version64() != dit
->version
) {
2087 cerr
<< "Not consistent! object " << dit
->obj
<< " has been "
2088 << " modified since the client died was not cleaned up."
2093 special_names
.insert(dit
->obj
);
2096 for(vector
<create_data
>::iterator cit
= idata
.to_create
.begin();
2097 cit
!= idata
.to_create
.end(); ++cit
) {
2098 special_names
.insert(cit
->obj
);
2101 parsed_index
.insert(make_pair(it
->first
, idata
.obj
));
2102 onames
.insert(idata
.obj
);
2106 //make sure that an object exists iff it either is the index
2107 //or is listed in the index
2108 for (librados::NObjectIterator oit
= io_ctx
.nobjects_begin();
2109 oit
!= io_ctx
.nobjects_end(); ++oit
) {
2110 string name
= oit
->get_oid();
2111 if (name
!= index_name
&& onames
.count(name
) == 0
2112 && special_names
.count(name
) == 0) {
2113 cerr
<< "Not consistent! found floating object " << name
<< std::endl
;
2120 for (std::map
<std::string
, string
>::iterator it
= parsed_index
.begin();
2121 it
!= parsed_index
.end();
2123 librados::ObjectReadOperation read
;
2124 read
.omap_get_keys2("", LONG_MAX
, &sub_objs
[it
->second
], nullptr, &err
);
2125 err
= io_ctx
.operate(it
->second
, &read
, NULL
);
2126 int size_int
= (int)sub_objs
[it
->second
].size();
2128 //check that size is in the right range
2129 if (it
->first
!= "1" && special_names
.count(it
->second
) == 0 &&
2130 err
!= -ENOENT
&& (size_int
> 2*k
|| size_int
< k
)
2131 && parsed_index
.size() > 1) {
2132 cerr
<< "Not consistent! Object " << *it
<< " has size " << size_int
2133 << ", which is outside the acceptable range." << std::endl
;
2137 //check that all keys belong in that object
2138 for(std::set
<std::string
>::iterator subit
= sub_objs
[it
->second
].begin();
2139 subit
!= sub_objs
[it
->second
].end(); ++subit
) {
2140 if ((it
->first
!= "1"
2141 && *subit
> it
->first
.substr(1,it
->first
.length()))
2142 || *subit
<= prev
) {
2143 cerr
<< "Not consistent! key " << *subit
<< " does not belong in "
2144 << *it
<< std::endl
;
2145 cerr
<< "not last element, i.e. " << it
->first
<< " not equal to 1? "
2146 << (it
->first
!= "1") << std::endl
2147 << "greater than " << it
->first
.substr(1,it
->first
.length())
2148 <<"? " << (*subit
> it
->first
.substr(1,it
->first
.length()))
2150 << "less than or equal to " << prev
<< "? "
2151 << (*subit
<= prev
) << std::endl
;
2156 prev
= it
->first
.substr(1,it
->first
.length());
2160 if (verbose
) cout
<< "failed consistency test - see error log"
2164 if (verbose
) cout
<< "passed consistency test" << std::endl
;
2169 string
KvFlatBtreeAsync::str() {
2171 ret
<< "Top-level map:" << std::endl
;
2173 std::set
<std::string
> keys
;
2174 std::map
<std::string
,bufferlist
> index
;
2175 librados::ObjectReadOperation oro
;
2176 librados::AioCompletion
* top_aioc
= rados
.aio_create_completion();
2177 oro
.omap_get_vals2("",LONG_MAX
,&index
, nullptr, &err
);
2178 io_ctx
.aio_operate(index_name
, top_aioc
, &oro
, NULL
);
2179 top_aioc
->wait_for_complete();
2180 err
= top_aioc
->get_return_value();
2181 top_aioc
->release();
2182 if (err
< 0 && err
!= -5){
2183 if (verbose
) cout
<< "getting keys failed with error " << err
<< std::endl
;
2187 ret
<< "There are no objects!" << std::endl
;
2191 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2192 it
!= index
.end(); ++it
) {
2193 keys
.insert(string(it
->second
.c_str(), it
->second
.length())
2194 .substr(1,it
->second
.length()));
2197 vector
<std::string
> all_names
;
2198 vector
<int> all_sizes(index
.size());
2199 vector
<int> all_versions(index
.size());
2200 vector
<bufferlist
> all_unwrit(index
.size());
2201 vector
<map
<std::string
,bufferlist
> > all_maps(keys
.size());
2202 vector
<map
<std::string
,bufferlist
>::iterator
> its(keys
.size());
2204 vector
<bool> dones(keys
.size());
2205 ret
<< std::endl
<< string(150,'-') << std::endl
;
2207 for (map
<std::string
,bufferlist
>::iterator it
= index
.begin();
2208 it
!= index
.end(); ++it
){
2210 auto b
= it
->second
.cbegin();
2212 string s
= idata
.str();
2213 ret
<< "|" << string((148 -
2214 ((*it
).first
.length()+s
.length()+3))/2,' ');
2217 ret
<< string(idata
.str());
2218 ret
<< string((148 -
2219 ((*it
).first
.length()+s
.length()+3))/2,' ');
2221 all_names
.push_back(idata
.obj
);
2222 ret
<< std::endl
<< string(150,'-') << std::endl
;
2227 //get the object names and sizes
2228 for(vector
<std::string
>::iterator it
= all_names
.begin(); it
2231 librados::ObjectReadOperation oro
;
2232 librados::AioCompletion
*aioc
= rados
.aio_create_completion();
2233 oro
.omap_get_vals2("", LONG_MAX
, &all_maps
[indexer
], nullptr, &err
);
2234 oro
.getxattr("unwritable", &all_unwrit
[indexer
], &err
);
2235 io_ctx
.aio_operate(*it
, aioc
, &oro
, NULL
);
2236 aioc
->wait_for_complete();
2237 if (aioc
->get_return_value() < 0) {
2238 ret
<< "reading" << *it
<< "failed: " << err
<< std::endl
;
2241 all_sizes
[indexer
] = all_maps
[indexer
].size();
2242 all_versions
[indexer
] = aioc
->get_version64();
2247 ret
<< "///////////////////OBJECT NAMES////////////////" << std::endl
;
2250 for (int i
= 0; i
< indexer
; i
++) {
2251 ret
<< "---------------------------\t";
2254 for (int i
= 0; i
< indexer
; i
++) {
2255 ret
<< "|" << string((25 -
2256 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ');
2257 ret
<< "Bucket: " << all_names
[i
];
2259 (string("Bucket: ").length() + all_names
[i
].length()))/2, ' ') << "|\t";
2262 for (int i
= 0; i
< indexer
; i
++) {
2263 its
[i
] = all_maps
[i
].begin();
2264 ret
<< "|" << string((25 - (string("size: ").length()
2265 + to_string("",all_sizes
[i
]).length()))/2, ' ');
2266 ret
<< "size: " << all_sizes
[i
];
2267 ret
<< string((25 - (string("size: ").length()
2268 + to_string("",all_sizes
[i
]).length()))/2, ' ') << "|\t";
2271 for (int i
= 0; i
< indexer
; i
++) {
2272 its
[i
] = all_maps
[i
].begin();
2273 ret
<< "|" << string((25 - (string("version: ").length()
2274 + to_string("",all_versions
[i
]).length()))/2, ' ');
2275 ret
<< "version: " << all_versions
[i
];
2276 ret
<< string((25 - (string("version: ").length()
2277 + to_string("",all_versions
[i
]).length()))/2, ' ') << "|\t";
2280 for (int i
= 0; i
< indexer
; i
++) {
2281 its
[i
] = all_maps
[i
].begin();
2282 ret
<< "|" << string((25 - (string("unwritable? ").length()
2284 ret
<< "unwritable? " << string(all_unwrit
[i
].c_str(),
2285 all_unwrit
[i
].length());
2286 ret
<< string((25 - (string("unwritable? ").length()
2287 + 1))/2, ' ') << "|\t";
2290 for (int i
= 0; i
< indexer
; i
++) {
2291 ret
<< "---------------------------\t";
2294 ret
<< "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl
;
2298 for (int i
= 0; i
< indexer
; i
++) {
2299 ret
<< "---------------------------\t";
2302 //each time through this part is two lines
2303 while(done
< keys
.size()) {
2304 for(int i
= 0; i
< indexer
; i
++) {
2308 if (its
[i
] == all_maps
[i
].end()){
2313 ret
<< "|" << string((25 -
2314 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2315 ret
<< (*its
[i
]).first
;
2317 ret
<< string(its
[i
]->second
.c_str(), its
[i
]->second
.length());
2319 ((*its
[i
]).first
.length()+its
[i
]->second
.length()+3))/2,' ');
2327 for (int i
= 0; i
< indexer
; i
++) {
2331 ret
<< "---------------------------\t";