]> git.proxmox.com Git - ceph.git/blame - ceph/src/key_value_store/kv_flat_btree_async.cc
bump version to 12.2.12-pve1
[ceph.git] / ceph / src / key_value_store / kv_flat_btree_async.cc
CommitLineData
7c673cae
FG
1/*
2 * Key-value store using librados
3 *
4 * September 2, 2012
5 * Eleanor Cawthon
6 * eleanor.cawthon@inktank.com
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14#include "key_value_store/key_value_structure.h"
15#include "key_value_store/kv_flat_btree_async.h"
16#include "key_value_store/kvs_arg_types.h"
17#include "include/rados/librados.hpp"
18#include "/usr/include/asm-generic/errno.h"
19#include "/usr/include/asm-generic/errno-base.h"
20#include "common/ceph_context.h"
21#include "common/Clock.h"
22#include "include/types.h"
23
24
25#include <string>
26#include <iostream>
27#include <cassert>
28#include <climits>
29#include <cmath>
30#include <sstream>
31#include <stdlib.h>
32#include <iterator>
33
34using namespace std;
35using ceph::bufferlist;
36
37bool index_data::is_timed_out(utime_t now, utime_t timeout) const {
38 return prefix != "" && now - ts > timeout;
39}
40
41void IndexCache::clear() {
42 k2itmap.clear();
43 t2kmap.clear();
44}
45
46void IndexCache::push(const string &key, const index_data &idata) {
47 if (cache_size == 0) {
48 return;
49 }
50 index_data old_idata;
51 map<key_data, pair<index_data, utime_t> >::iterator old_it =
52 k2itmap.lower_bound(key_data(key));
53 if (old_it != k2itmap.end()) {
54 t2kmap.erase(old_it->second.second);
55 k2itmap.erase(old_it);
56 }
57 map<key_data, pair<index_data, utime_t> >::iterator new_it =
58 k2itmap.find(idata.kdata);
59 if (new_it != k2itmap.end()) {
60 utime_t old_time = new_it->second.second;
61 t2kmap.erase(old_time);
62 }
63 utime_t time = ceph_clock_now();
64 k2itmap[idata.kdata] = make_pair(idata, time);
65 t2kmap[time] = idata.kdata;
66 if ((int)k2itmap.size() > cache_size) {
67 pop();
68 }
69
70}
71
72void IndexCache::push(const index_data &idata) {
73 if (cache_size == 0) {
74 return;
75 }
76 if (k2itmap.count(idata.kdata) > 0) {
77 utime_t old_time = k2itmap[idata.kdata].second;
78 t2kmap.erase(old_time);
79 k2itmap.erase(idata.kdata);
80 }
81 utime_t time = ceph_clock_now();
82 k2itmap[idata.kdata] = make_pair(idata, time);
83 t2kmap[time] = idata.kdata;
84 if ((int)k2itmap.size() > cache_size) {
85 pop();
86 }
87}
88
89void IndexCache::pop() {
90 if (cache_size == 0) {
91 return;
92 }
93 map<utime_t, key_data>::iterator it = t2kmap.begin();
94 utime_t time = it->first;
95 key_data kdata = it->second;
96 k2itmap.erase(kdata);
97 t2kmap.erase(time);
98}
99
100void IndexCache::erase(key_data kdata) {
101 if (cache_size == 0) {
102 return;
103 }
104 if (k2itmap.count(kdata) > 0) {
105 utime_t c = k2itmap[kdata].second;
106 k2itmap.erase(kdata);
107 t2kmap.erase(c);
108 }
109}
110
111int IndexCache::get(const string &key, index_data *idata) const {
112 if (cache_size == 0) {
113 return -ENODATA;
114 }
115 if ((int)k2itmap.size() == 0) {
116 return -ENODATA;
117 }
118 map<key_data, pair<index_data, utime_t> >::const_iterator it =
119 k2itmap.lower_bound(key_data(key));
120 if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) {
121 return -ENODATA;
122 } else {
123 *idata = it->second.first;
124 }
125 return 0;
126}
127
128int IndexCache::get(const string &key, index_data *idata,
129 index_data *next_idata) const {
130 if (cache_size == 0) {
131 return -ENODATA;
132 }
133 map<key_data, pair<index_data, utime_t> >::const_iterator it =
134 k2itmap.lower_bound(key_data(key));
135 if (it == k2itmap.end() || ++it == k2itmap.end()) {
136 return -ENODATA;
137 } else {
138 --it;
139 if (!(it->second.first.min_kdata < key_data(key))){
140 //stale, should be reread.
141 return -ENODATA;
142 } else {
143 *idata = it->second.first;
144 ++it;
145 if (it != k2itmap.end()) {
146 *next_idata = it->second.first;
147 }
148 }
149 }
150 return 0;
151}
152
153int KvFlatBtreeAsync::nothing() {
154 return 0;
155}
156
157int KvFlatBtreeAsync::wait() {
158 if (rand() % 10 == 0) {
159 usleep(wait_ms);
160 }
161 return 0;
162}
163
164int KvFlatBtreeAsync::suicide() {
165 if (rand() % 10 == 0) {
166 if (verbose) cout << client_name << " is suiciding" << std::endl;
167 return 1;
168 }
169 return 0;
170}
171
172int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data)
173{
174 if (verbose) cout << "\t\t" << client_name << "-next: finding next of "
175 << idata.str()
176 << std::endl;
177 int err = 0;
178 librados::ObjectReadOperation oro;
179 std::map<std::string, bufferlist> kvs;
180 oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err);
181 err = io_ctx.operate(index_name, &oro, NULL);
182 if (err < 0){
183 if (verbose) cout << "\t\t\t" << client_name
184 << "-next: getting index failed with error "
185 << err << std::endl;
186 return err;
187 }
188 if (!kvs.empty()) {
189 out_data->kdata.parse(kvs.begin()->first);
190 bufferlist::iterator b = kvs.begin()->second.begin();
191 out_data->decode(b);
192 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
193 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
194 << std::endl;
195 //the client died after deleting the object. clean up.
196 cleanup(idata, err);
197 }
198 } else {
199 err = -EOVERFLOW;
200 }
201 return err;
202}
203
204int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data)
205{
206 if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of "
207 << idata.str() << std::endl;
208 int err = 0;
209 bufferlist inbl;
210 idata_from_idata_args in_args;
211 in_args.idata = idata;
212 in_args.encode(inbl);
213 bufferlist outbl;
214 err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl);
215 if (err < 0){
216 if (verbose) cout << "\t\t\t" << client_name
217 << "-prev: getting index failed with error "
218 << err << std::endl;
219 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
220 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
221 << std::endl;
222 //the client died after deleting the object. clean up.
223 err = cleanup(idata, err);
224 if (err == -ESUICIDE) {
225 return err;
226 } else {
227 err = 0;
228 }
229 }
230 return err;
231 }
232 bufferlist::iterator it = outbl.begin();
233 in_args.decode(it);
234 *out_data = in_args.next_idata;
235 if (verbose) cout << "\t\t" << client_name << "-prev: prev is "
236 << out_data->str()
237 << std::endl;
238 return err;
239}
240
241int KvFlatBtreeAsync::read_index(const string &key, index_data * idata,
242 index_data * next_idata, bool force_update) {
243 int err = 0;
244 if (!force_update) {
245 if (verbose) cout << "\t" << client_name
246 << "-read_index: getting index_data for " << key
247 << " from cache" << std::endl;
248 icache_lock.Lock();
249 if (next_idata != NULL) {
250 err = icache.get(key, idata, next_idata);
251 } else {
252 err = icache.get(key, idata);
253 }
254 icache_lock.Unlock();
255
256 if (err == 0) {
257 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
258 return err;
259 } else {
260 if (verbose) cout << "NOT IN CACHE" << std::endl;
261 }
262 }
263
264 if (verbose) cout << "\t" << client_name
265 << "-read_index: getting index_data for " << key
266 << " from object" << std::endl;
267 librados::ObjectReadOperation oro;
268 bufferlist raw_val;
269 std::set<std::string> key_set;
270 key_set.insert(key_data(key).encoded());
271 std::map<std::string, bufferlist> kvmap;
272 std::map<std::string, bufferlist> dupmap;
273 oro.omap_get_vals_by_keys(key_set, &dupmap, &err);
274 oro.omap_get_vals2(key_data(key).encoded(),
275 (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2),
276 &kvmap, nullptr, &err);
277 err = io_ctx.operate(index_name, &oro, NULL);
278 utime_t mytime = ceph_clock_now();
279 if (err < 0){
280 cerr << "\t" << client_name
281 << "-read_index: getting keys failed with "
282 << err << std::endl;
283 assert(0 == client_name + "-read_index: reading index failed");
284 return err;
285 }
286 kvmap.insert(dupmap.begin(), dupmap.end());
287 for (map<string, bufferlist>::iterator it = ++kvmap.begin();
288 it != kvmap.end();
289 ++it) {
290 bufferlist bl = it->second;
291 bufferlist::iterator blit = bl.begin();
292 index_data this_idata;
293 this_idata.decode(blit);
294 if (this_idata.is_timed_out(mytime, timeout)) {
295 if (verbose) cout << client_name
296 << " THINKS THE OTHER CLIENT DIED. (mytime is "
297 << mytime.sec() << "." << mytime.usec() << ", idata.ts is "
298 << this_idata.ts.sec() << "." << this_idata.ts.usec()
299 << ", it has been " << (mytime - this_idata.ts).sec()
300 << '.' << (mytime - this_idata.ts).usec()
301 << ", timeout is " << timeout << ")" << std::endl;
302 //the client died after deleting the object. clean up.
303 if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) {
304 return -ESUICIDE;
305 }
306 return read_index(key, idata, next_idata, force_update);
307 }
308 icache_lock.Lock();
309 icache.push(this_idata);
310 icache_lock.Unlock();
311 }
312 bufferlist::iterator b = kvmap.begin()->second.begin();
313 idata->decode(b);
314 idata->kdata.parse(kvmap.begin()->first);
315 if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is "
316 << kvmap.size()
317 << ", idata is " << idata->str() << std::endl;
318
319 assert(idata->obj != "");
320 icache_lock.Lock();
321 icache.push(key, *idata);
322 icache_lock.Unlock();
323
324 if (next_idata != NULL && idata->kdata.prefix != "1") {
325 next_idata->kdata.parse((++kvmap.begin())->first);
326 bufferlist::iterator nb = (++kvmap.begin())->second.begin();
327 next_idata->decode(nb);
328 icache_lock.Lock();
329 icache.push(*next_idata);
330 icache_lock.Unlock();
331 }
332 return err;
333}
334
335int KvFlatBtreeAsync::split(const index_data &idata) {
336 int err = 0;
337 opmap['l']++;
338
339 if (idata.prefix != "") {
340 return -EPREFIX;
341 }
342
343 rebalance_args args;
344 args.bound = 2 * k - 1;
345 args.comparator = CEPH_OSD_CMPXATTR_OP_GT;
346 err = read_object(idata.obj, &args);
347 args.odata.max_kdata = idata.kdata;
348 if (err < 0) {
349 if (verbose) cout << "\t\t" << client_name << "-split: read object "
350 << args.odata.name
351 << " got " << err << std::endl;
352 return err;
353 }
354
355 if (verbose) cout << "\t\t" << client_name << "-split: splitting "
356 << idata.obj
357 << ", which has size " << args.odata.size
358 << " and actual size " << args.odata.omap.size() << std::endl;
359
360 ///////preparations that happen outside the critical section
361 //for prefix index
362 vector<object_data> to_create;
363 vector<object_data> to_delete;
364 to_delete.push_back(object_data(idata.min_kdata,
365 args.odata.max_kdata, args.odata.name, args.odata.version));
366
367 //for lower half object
368 map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
369 client_index_lock.Lock();
370 to_create.push_back(object_data(to_string(client_name, client_index++)));
371 client_index_lock.Unlock();
372 for (int i = 0; i < k; i++) {
373 to_create[0].omap.insert(*it);
374 ++it;
375 }
376 to_create[0].min_kdata = idata.min_kdata;
377 to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
378
379 //for upper half object
380 client_index_lock.Lock();
381 to_create.push_back(object_data(to_create[0].max_kdata,
382 args.odata.max_kdata,
383 to_string(client_name, client_index++)));
384 client_index_lock.Unlock();
385 to_create[1].omap.insert(
386 ++args.odata.omap.find(to_create[0].omap.rbegin()->first),
387 args.odata.omap.end());
388
389 //setting up operations
390 librados::ObjectWriteOperation owos[6];
391 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
392 index_data out_data;
393 set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err);
394 ops.push_back(make_pair(
395 pair<int, string>(ADD_PREFIX, index_name),
396 &owos[0]));
397 for (int i = 1; i < 6; i++) {
398 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
399 }
400 set_up_ops(to_create, to_delete, &ops, out_data, &err);
401
402 /////BEGIN CRITICAL SECTION/////
403 //put prefix on index entry for idata.val
404 err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops);
405 if (err < 0) {
406 return err;
407 }
408 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
409 << std::endl;
410 /////END CRITICAL SECTION/////
411 icache_lock.Lock();
412 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
413 it != out_data.to_delete.end(); ++it) {
414 icache.erase(it->max);
415 }
416 for (vector<create_data>::iterator it = out_data.to_create.begin();
417 it != out_data.to_create.end(); ++it) {
418 icache.push(index_data(*it));
419 }
420 icache_lock.Unlock();
421 return err;
422}
423
424int KvFlatBtreeAsync::rebalance(const index_data &idata1,
425 const index_data &next_idata){
426 opmap['m']++;
427 int err = 0;
428
429 if (idata1.prefix != "") {
430 return -EPREFIX;
431 }
432
433 rebalance_args args1;
434 args1.bound = k + 1;
435 args1.comparator = CEPH_OSD_CMPXATTR_OP_LT;
436 index_data idata2 = next_idata;
437
438 rebalance_args args2;
439 args2.bound = k + 1;
440 args2.comparator = CEPH_OSD_CMPXATTR_OP_LT;
441
442 if (idata1.kdata.prefix == "1") {
443 //this is the highest key in the index, so it doesn't have a next.
444
445 //read the index for the previous entry
446 err = prev(idata1, &idata2);
447 if (err == -ERANGE) {
448 if (verbose) cout << "\t\t" << client_name
449 << "-rebalance: this is the only node, "
450 << "so aborting" << std::endl;
451 return -EUCLEAN;
452 } else if (err < 0) {
453 return err;
454 }
455
456 //read the first object
457 err = read_object(idata1.obj, &args2);
458 if (err < 0) {
459 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
460 << std::endl;
461 if (err == -ENOENT) {
462 return -ECANCELED;
463 }
464 return err;
465 }
466 args2.odata.min_kdata = idata1.min_kdata;
467 args2.odata.max_kdata = idata1.kdata;
468
469 //read the second object
470 args1.bound = 2 * k + 1;
471 err = read_object(idata2.obj, &args1);
472 if (err < 0) {
473 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
474 << std::endl;
475 return err;
476 }
477 args1.odata.min_kdata = idata2.min_kdata;
478 args1.odata.max_kdata = idata2.kdata;
479
480 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
481 << idata2.obj
482 << ". size: " << args1.odata.size << " version: "
483 << args1.odata.version
484 << std::endl;
485 } else {
486 assert (next_idata.obj != "");
487 //there is a next key, so get it.
488 err = read_object(idata1.obj, &args1);
489 if (err < 0) {
490 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
491 << std::endl;
492 return err;
493 }
494 args1.odata.min_kdata = idata1.min_kdata;
495 args1.odata.max_kdata = idata1.kdata;
496
497 args2.bound = 2 * k + 1;
498 err = read_object(idata2.obj, &args2);
499 if (err < 0) {
500 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
501 << std::endl;
502 if (err == -ENOENT) {
503 return -ECANCELED;
504 }
505 return err;
506 }
507 args2.odata.min_kdata = idata2.min_kdata;
508 args2.odata.max_kdata = idata2.kdata;
509
510 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
511 << idata2.obj
512 << ". size: " << args2.odata.size << " version: "
513 << args2.odata.version
514 << std::endl;
515 }
516
517 if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is "
518 << args1.odata.max_kdata.encoded() << ","
519 << args1.odata.name << " with size " << args1.odata.size
520 << " , o2 is " << args2.odata.max_kdata.encoded()
521 << "," << args2.odata.name << " with size " << args2.odata.size
522 << std::endl;
523
524 //calculations
525 if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k
526 && (int)args2.odata.size > k
527 && (int)args2.odata.size <= 2*k) {
528 //nothing to do
529 if (verbose) cout << "\t\t" << client_name
530 << "-rebalance: both sizes in range, so"
531 << " aborting " << std::endl;
532 return -EBALANCE;
533 } else if (idata1.prefix != "" || idata2.prefix != "") {
534 return -EPREFIX;
535 }
536
537 //this is the high object. it gets created regardless of rebalance or merge.
538 client_index_lock.Lock();
539 string o2w = to_string(client_name, client_index++);
540 client_index_lock.Unlock();
541 index_data idata;
542 vector<object_data> to_create;
543 vector<object_data> to_delete;
544 librados::ObjectWriteOperation create[2];//possibly only 1 will be used
545 librados::ObjectWriteOperation other_ops[6];
546 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
547 ops.push_back(make_pair(
548 pair<int, string>(ADD_PREFIX, index_name),
549 &other_ops[0]));
550
551 if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) {
552 //merge
553 if (verbose) cout << "\t\t" << client_name << "-rebalance: merging "
554 << args1.odata.name
555 << " and " << args2.odata.name << " to get " << o2w
556 << std::endl;
557 map<string, bufferlist> write2_map;
558 write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end());
559 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
560 to_create.push_back(object_data(args1.odata.min_kdata,
561 args2.odata.max_kdata, o2w, write2_map));
562 ops.push_back(make_pair(
563 pair<int, string>(MAKE_OBJECT, o2w),
564 &create[0]));
565 assert((int)write2_map.size() <= 2*k);
566 } else {
567 //rebalance
568 if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing "
569 << args1.odata.name
570 << " and " << args2.odata.name << std::endl;
571 map<std::string, bufferlist> write1_map;
572 map<std::string, bufferlist> write2_map;
573 map<std::string, bufferlist>::iterator it;
574 client_index_lock.Lock();
575 string o1w = to_string(client_name, client_index++);
576 client_index_lock.Unlock();
577 int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
578 / 2.0);
579 if (args1.odata.max_kdata != idata1.kdata) {
580 //this should be true if idata1 is the high object
581 target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size)
582 / 2.0);
583 }
584 for (it = args1.odata.omap.begin();
585 it != args1.odata.omap.end() && (int)write1_map.size()
586 < target_size_1;
587 ++it) {
588 write1_map.insert(*it);
589 }
590 if (it != args1.odata.omap.end()){
591 //write1_map is full, so put the rest in write2_map
592 write2_map.insert(it, args1.odata.omap.end());
593 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
594 } else {
595 //args1.odata.omap was small, and write2_map still needs more
596 map<std::string, bufferlist>::iterator it2;
597 for(it2 = args2.odata.omap.begin();
598 (it2 != args2.odata.omap.end()) && ((int)write1_map.size()
599 < target_size_1);
600 ++it2) {
601 write1_map.insert(*it2);
602 }
603 write2_map.insert(it2, args2.odata.omap.end());
604 }
605 if (verbose) cout << "\t\t" << client_name
606 << "-rebalance: write1_map has size "
607 << write1_map.size() << ", write2_map.size() is " << write2_map.size()
608 << std::endl;
609 //at this point, write1_map and write2_map should have the correct pairs
610 to_create.push_back(object_data(args1.odata.min_kdata,
611 key_data(write1_map.rbegin()->first),
612 o1w,write1_map));
613 to_create.push_back(object_data( key_data(write1_map.rbegin()->first),
614 args2.odata.max_kdata, o2w, write2_map));
615 ops.push_back(make_pair(
616 pair<int, string>(MAKE_OBJECT, o1w),
617 &create[0]));
618 ops.push_back(make_pair(
619 pair<int, string>(MAKE_OBJECT, o2w),
620 &create[1]));
621 }
622
623 to_delete.push_back(object_data(args1.odata.min_kdata,
624 args1.odata.max_kdata, args1.odata.name, args1.odata.version));
625 to_delete.push_back(object_data(args2.odata.min_kdata,
626 args2.odata.max_kdata, args2.odata.name, args2.odata.version));
627 for (int i = 1; i < 6; i++) {
628 ops.push_back(make_pair(make_pair(0,""), &other_ops[i]));
629 }
630
631 index_data out_data;
632 set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err);
633 set_up_ops(to_create, to_delete, &ops, out_data, &err);
634
635 //at this point, all operations should be completely set up.
636 /////BEGIN CRITICAL SECTION/////
637 err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops);
638 if (err < 0) {
639 return err;
640 }
641 icache_lock.Lock();
642 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
643 it != out_data.to_delete.end(); ++it) {
644 icache.erase(it->max);
645 }
646 for (vector<create_data>::iterator it = out_data.to_create.begin();
647 it != out_data.to_create.end(); ++it) {
648 icache.push(index_data(*it));
649 }
650 icache_lock.Unlock();
651 if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
652 << std::endl;
653 /////END CRITICAL SECTION/////
654 return err;
655}
656
657int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) {
658 librados::ObjectReadOperation get_obj;
659 librados::AioCompletion * obj_aioc = rados.aio_create_completion();
660 int err;
661 bufferlist unw_bl;
662 odata->name = obj;
663 get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err);
664 get_obj.getxattr("unwritable", &unw_bl, &err);
665 io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL);
666 obj_aioc->wait_for_safe();
667 err = obj_aioc->get_return_value();
668 if (err < 0){
669 //possibly -ENOENT, meaning someone else deleted it.
670 obj_aioc->release();
671 return err;
672 }
673 odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1";
674 odata->version = obj_aioc->get_version64();
675 odata->size = odata->omap.size();
676 obj_aioc->release();
677 return 0;
678}
679
680int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) {
681 bufferlist inbl;
682 args->encode(inbl);
683 bufferlist outbl;
684 int err;
685 librados::AioCompletion * a = rados.aio_create_completion();
686 io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl);
687 a->wait_for_safe();
688 err = a->get_return_value();
689 if (err < 0) {
690 if (verbose) cout << "\t\t" << client_name
691 << "-read_object: reading failed with "
692 << err << std::endl;
693 a->release();
694 return err;
695 }
696 bufferlist::iterator it = outbl.begin();
697 args->decode(it);
698 args->odata.name = obj;
699 args->odata.version = a->get_version64();
700 a->release();
701 return err;
702}
703
704void KvFlatBtreeAsync::set_up_prefix_index(
705 const vector<object_data> &to_create,
706 const vector<object_data> &to_delete,
707 librados::ObjectWriteOperation * owo,
708 index_data * idata,
709 int * err) {
710 std::map<std::string, pair<bufferlist, int> > assertions;
711 map<string, bufferlist> to_insert;
712 idata->prefix = "1";
713 idata->ts = ceph_clock_now();
714 for(vector<object_data>::const_iterator it = to_create.begin();
715 it != to_create.end();
716 ++it) {
717 create_data c(it->min_kdata, it->max_kdata, it->name);
718 idata->to_create.push_back(c);
719 }
720 for(vector<object_data>::const_iterator it = to_delete.begin();
721 it != to_delete.end();
722 ++it) {
723 delete_data d(it->min_kdata, it->max_kdata, it->name, it->version);
724 idata->to_delete.push_back(d);
725 }
726 for(vector<object_data>::const_iterator it = to_delete.begin();
727 it != to_delete.end();
728 ++it) {
729 idata->obj = it->name;
730 idata->min_kdata = it->min_kdata;
731 idata->kdata = it->max_kdata;
732 bufferlist insert;
733 idata->encode(insert);
734 to_insert[it->max_kdata.encoded()] = insert;
735 index_data this_entry;
736 this_entry.min_kdata = idata->min_kdata;
737 this_entry.kdata = idata->kdata;
738 this_entry.obj = idata->obj;
739 assertions[it->max_kdata.encoded()] = pair<bufferlist, int>
740 (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
741 if (verbose) cout << "\t\t\t" << client_name
742 << "-setup_prefix: will assert "
743 << this_entry.str() << std::endl;
744 }
745 assert(*err == 0);
746 owo->omap_cmp(assertions, err);
747 if (to_create.size() <= 2) {
748 owo->omap_set(to_insert);
749 }
750}
751
752//some args can be null if there are no corresponding entries in p
753void KvFlatBtreeAsync::set_up_ops(
754 const vector<object_data> &create_vector,
755 const vector<object_data> &delete_vector,
756 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops,
757 const index_data &idata,
758 int * err) {
759 vector<pair<pair<int, string>,
760 librados::ObjectWriteOperation* > >::iterator it;
761
762 //skip the prefixing part
763 for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {}
764 map<string, bufferlist> to_insert;
765 std::set<string> to_remove;
766 map<string, pair<bufferlist, int> > assertions;
767 if (create_vector.size() > 0) {
768 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
769 it->first = pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj);
770 set_up_unwrite_object(delete_vector[i].version, it->second);
771 ++it;
772 }
773 }
774 for (int i = 0; i < (int)idata.to_create.size(); ++i) {
775 index_data this_entry(idata.to_create[i].max, idata.to_create[i].min,
776 idata.to_create[i].obj);
777 to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry);
778 if (idata.to_create.size() <= 2) {
779 it->first = pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj);
780 } else {
781 it->first = pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj);
782 }
783 set_up_make_object(create_vector[i].omap, it->second);
784 ++it;
785 }
786 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
787 index_data this_entry = idata;
788 this_entry.obj = idata.to_delete[i].obj;
789 this_entry.min_kdata = idata.to_delete[i].min;
790 this_entry.kdata = idata.to_delete[i].max;
791 if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert "
792 << this_entry.str() << std::endl;
793 assertions[idata.to_delete[i].max.encoded()] = pair<bufferlist, int>(
794 to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
795 to_remove.insert(idata.to_delete[i].max.encoded());
796 it->first = pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj);
797 set_up_delete_object(it->second);
798 ++it;
799 }
800 if ((int)idata.to_create.size() <= 2) {
801 it->second->omap_cmp(assertions, err);
802 }
803 it->second->omap_rm_keys(to_remove);
804 it->second->omap_set(to_insert);
805
806
807 it->first = pair<int, string>(REMOVE_PREFIX, index_name);
808}
809
810void KvFlatBtreeAsync::set_up_make_object(
811 const map<std::string, bufferlist> &to_set,
812 librados::ObjectWriteOperation *owo) {
813 bufferlist inbl;
814 ::encode(to_set, inbl);
815 owo->exec("kvs", "create_with_omap", inbl);
816}
817
818void KvFlatBtreeAsync::set_up_unwrite_object(
819 const int &ver, librados::ObjectWriteOperation *owo) {
820 if (ver > 0) {
821 owo->assert_version(ver);
822 }
823 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0"));
824 owo->setxattr("unwritable", to_bl("1"));
825}
826
827void KvFlatBtreeAsync::set_up_restore_object(
828 librados::ObjectWriteOperation *owo) {
829 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
830 owo->setxattr("unwritable", to_bl("0"));
831}
832
833void KvFlatBtreeAsync::set_up_delete_object(
834 librados::ObjectWriteOperation *owo) {
835 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
836 owo->remove();
837}
838
839int KvFlatBtreeAsync::perform_ops(const string &debug_prefix,
840 const index_data &idata,
841 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > *ops) {
842 int err = 0;
843 vector<librados::AioCompletion*> aiocs(idata.to_create.size());
844 int count = 0;
845 for (vector<pair<pair<int, string>,
846 librados::ObjectWriteOperation*> >::iterator it = ops->begin();
847 it != ops->end(); ++it) {
848 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
849 return -ESUICIDE;
850 }
851 switch (it->first.first) {
852 case ADD_PREFIX://prefixing
853 if (verbose) cout << debug_prefix << " adding prefix" << std::endl;
854 err = io_ctx.operate(index_name, it->second);
855 if (err < 0) {
856 if (verbose) cout << debug_prefix << " prefixing the index failed with "
857 << err << std::endl;
858 return -EPREFIX;
859 }
860 if (verbose) cout << debug_prefix << " prefix added." << std::endl;
861 break;
862 case UNWRITE_OBJECT://marking
863 if (verbose) cout << debug_prefix << " marking " << it->first.second
864 << std::endl;
865 err = io_ctx.operate(it->first.second, it->second);
866 if (err < 0) {
867 //most likely because it changed, in which case it will be -ERANGE
868 if (verbose) cout << debug_prefix << " marking " << it->first.second
869 << "failed with code" << err << std::endl;
870 if (it->first.second == (*idata.to_delete.begin()).max.encoded()) {
871 if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) {
872 return -ESUICIDE;
873 }
874 } else {
875 if (cleanup(idata, -ERANGE) == -ESUICIDE) {
876 return -ESUICIDE;
877 }
878 }
879 return err;
880 }
881 if (verbose) cout << debug_prefix << " marked " << it->first.second
882 << std::endl;
883 break;
884 case MAKE_OBJECT://creating
885 if (verbose) cout << debug_prefix << " creating " << it->first.second
886 << std::endl;
887 err = io_ctx.operate(it->first.second, it->second);
888 if (err < 0) {
889 //this can happen if someone else was cleaning up after us.
890 if (verbose) cout << debug_prefix << " creating " << it->first.second
891 << " failed"
892 << " with code " << err << std::endl;
893 if (err == -EEXIST) {
894 //someone thinks we died, so die
895 if (verbose) cout << client_name << " is suiciding!" << std::endl;
896 return -ESUICIDE;
897 } else {
898 assert(false);
899 }
900 return err;
901 }
902 if (verbose || idata.to_create.size() > 2) {
903 cout << debug_prefix << " created object " << it->first.second
904 << std::endl;
905 }
906 break;
907 case AIO_MAKE_OBJECT:
908 cout << debug_prefix << " launching asynchronous create "
909 << it->first.second << std::endl;
910 aiocs[count] = rados.aio_create_completion();
911 io_ctx.aio_operate(it->first.second, aiocs[count], it->second);
912 count++;
913 if ((int)idata.to_create.size() == count) {
914 cout << "starting aiowrite waiting loop" << std::endl;
915 for (count -= 1; count >= 0; count--) {
916 aiocs[count]->wait_for_safe();
917 err = aiocs[count]->get_return_value();
918 if (err < 0) {
919 //this can happen if someone else was cleaning up after us.
920 cerr << debug_prefix << " a create failed"
921 << " with code " << err << std::endl;
922 if (err == -EEXIST) {
923 //someone thinks we died, so die
924 cerr << client_name << " is suiciding!" << std::endl;
925 return -ESUICIDE;
926 } else {
927 assert(false);
928 }
929 return err;
930 }
931 if (verbose || idata.to_create.size() > 2) {
932 cout << debug_prefix << " completed aio " << aiocs.size() - count
933 << "/" << aiocs.size() << std::endl;
934 }
935 }
936 }
937 break;
938 case REMOVE_OBJECT://deleting
939 if (verbose) cout << debug_prefix << " deleting " << it->first.second
940 << std::endl;
941 err = io_ctx.operate(it->first.second, it->second);
942 if (err < 0) {
943 //if someone else called cleanup on this prefix first
944 if (verbose) cout << debug_prefix << " deleting " << it->first.second
945 << "failed with code" << err << std::endl;
946 }
947 if (verbose) cout << debug_prefix << " deleted " << it->first.second
948 << std::endl;
949 break;
950 case REMOVE_PREFIX://rewriting index
951 if (verbose) cout << debug_prefix << " updating index " << std::endl;
952 err = io_ctx.operate(index_name, it->second);
953 if (err < 0) {
954 if (verbose) cout << debug_prefix
955 << " rewriting the index failed with code " << err
956 << ". someone else must have thought we died, so dying" << std::endl;
957 return -ETIMEDOUT;
958 }
959 if (verbose) cout << debug_prefix << " updated index." << std::endl;
960 break;
961 case RESTORE_OBJECT:
962 if (verbose) cout << debug_prefix << " restoring " << it->first.second
963 << std::endl;
964 err = io_ctx.operate(it->first.second, it->second);
965 if (err < 0) {
966 if (verbose) cout << debug_prefix << "restoring " << it->first.second
967 << " failed"
968 << " with " << err << std::endl;
969 return err;
970 }
971 if (verbose) cout << debug_prefix << " restored " << it->first.second
972 << std::endl;
973 break;
974 default:
975 if (verbose) cout << debug_prefix << " performing unknown op on "
976 << it->first.second
977 << std::endl;
978 err = io_ctx.operate(index_name, it->second);
979 if (err < 0) {
980 if (verbose) cout << debug_prefix << " unknown op on "
981 << it->first.second
982 << " failed with " << err << std::endl;
983 return err;
984 }
985 if (verbose) cout << debug_prefix << " unknown op on "
986 << it->first.second
987 << " succeeded." << std::endl;
988 break;
989 }
990 }
991
992 return err;
993}
994
/**
 * Recover from an interrupted split/rebalance/set_many whose index entry
 * still carries a prefix (asserted below: idata.prefix must be non-empty).
 *
 * The recovery performed depends on error:
 *  - -EFIRSTOBJ: only the index is rewritten. Each to_delete entry is written
 *    back as a fresh index_data (obj and key bounds only), conditional on the
 *    index still holding the prefixed version of that entry.
 *  - -ERANGE: the first to_delete object is restored with
 *    set_up_restore_object, then the index is rewritten as for -EFIRSTOBJ.
 *  - -ENOENT: roll forward by building ops with set_up_ops and running them
 *    through perform_ops.
 *  - anything else: roll back. Each to_create object is marked
 *    (set_up_unwrite_object) and each to_delete object is restored. If a
 *    restore returns -ENOENT, the to_create objects are restored instead and
 *    this function recurses with -ENOENT (roll forward). Otherwise the
 *    to_create objects are removed and the index is rewritten, dropping the
 *    to_create keys and putting back the to_delete entries.
 *
 * @param idata the prefixed index entry describing the interrupted operation
 * @param error selects the recovery strategy (see above)
 * @return -ESUICIDE when the injected interrupt fires (but see the note in
 *   the roll-forward-instead loop), -ECANCELED when the index rewrite fails,
 *   otherwise err as left by the last operation (0 after a successful index
 *   rewrite).
 */
int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) {
  if (verbose) cout << "\t\t" << client_name << ": cleaning up after "
      << idata.str()
      << std::endl;
  int err = 0;
  assert(idata.prefix != "");
  map<std::string,bufferlist> new_index;
  map<std::string, pair<bufferlist, int> > assertions;
  switch (error) {
  case -EFIRSTOBJ: {
    //this happens if the split or rebalance failed to mark the first object,
    //meaning only the index needs to be changed.
    //restore objects that had been marked unwritable.
    for(vector<delete_data >::const_iterator it =
        idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      //replacement entry: a fresh index_data with only obj and key bounds set.
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      //expected current entry: a copy of idata (with its prefix) narrowed to
      //this object, so the rewrite only applies if nobody changed it since.
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
    }

    //update the index (conditional on the omap_cmp assertions above)
    librados::ObjectWriteOperation update_index;
    update_index.omap_cmp(assertions, &err);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  case -ERANGE: {
    //this happens if a split or rebalance fails to mark an object. It is a
    //special case of rolling back that does not have to deal with new objects.

    //restore objects that had been marked unwritable.
    vector<delete_data >::const_iterator it;
    for(it = idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      //same replacement/assertion pair as in the -EFIRSTOBJ case.
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
    }
    //only the first to_delete object is restored here.
    //NOTE(review): presumably only the first object can have been marked
    //when -ERANGE is reported; this also assumes to_delete is non-empty
    //(begin() is dereferenced below) - confirm.
    it = idata.to_delete.begin();
    librados::ObjectWriteOperation restore;
    set_up_restore_object(&restore);
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
        << it->obj
        << std::endl;
    err = io_ctx.operate(it->obj, &restore);
    if (err < 0) {
      //i.e., -ECANCELED because the object was already restored by someone
      //else
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
          << it->obj
          << " failed with " << err << std::endl;
    } else {
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
          << it->obj
          << std::endl;
    }

    //update the index
    librados::ObjectWriteOperation update_index;
    update_index.omap_cmp(assertions, &err);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  case -ENOENT: {
    if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward"
        << std::endl;
    //all changes were created except for updating the index and possibly
    //deleting the objects. roll forward.
    //one op slot per to_delete object plus one; set_up_ops fills them in.
    //NOTE(review): empty create/delete vectors are passed, so presumably
    //set_up_ops derives the ops from idata alone here - confirm.
    vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
    vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1);
    for (int i = 0; i <= (int)idata.to_delete.size(); ++i) {
      ops.push_back(make_pair(pair<int, string>(0, ""), &owos[i]));
    }
    set_up_ops(vector<object_data>(),
        vector<object_data>(), &ops, idata, &err);
    err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops);
    if (err < 0) {
      if (err == -ESUICIDE) {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index"
        << std::endl;
    break;
  }
  default: {
    //roll back all changes.
    if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back"
        << std::endl;
    //NOTE(review): these shadow the function-level new_index/assertions,
    //which are unused on this path.
    map<std::string,bufferlist> new_index;
    std::set<string> to_remove;
    map<std::string, pair<bufferlist, int> > assertions;

    //mark the objects to be created. if someone else already has, die.
    //(failures are only logged here.)
    for(vector<create_data >::const_reverse_iterator it =
        idata.to_create.rbegin();
        it != idata.to_create.rend(); ++it) {
      librados::ObjectWriteOperation rm;
      set_up_unwrite_object(0, &rm);
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &rm);
      if (err < 0) {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
            << it->obj
            << " failed with " << err << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked "
            << it->obj
            << std::endl;
      }
    }

    //restore objects that had been marked unwritable.
    for(vector<delete_data >::const_iterator it =
        idata.to_delete.begin();
        it != idata.to_delete.end(); ++it) {
      //same replacement/assertion pair as in the -EFIRSTOBJ case.
      index_data this_entry;
      this_entry.obj = (*it).obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      new_index[it->max.encoded()] = to_bl(this_entry);
      this_entry = idata;
      this_entry.obj = it->obj;
      this_entry.min_kdata = it->min;
      this_entry.kdata = it->max;
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      assertions[it->max.encoded()] =
          pair<bufferlist, int>(to_bl(this_entry),
              CEPH_OSD_CMPXATTR_OP_EQ);
      librados::ObjectWriteOperation restore;
      set_up_restore_object(&restore);
      //NOTE(review): this repeats the log line printed just above.
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: will assert index contains "
          << this_entry.str() << std::endl;
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &restore);
      if (err == -ENOENT) {
        //it had gotten far enough to be rolled forward - unmark the objects
        //and roll forward.
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: roll forward instead"
            << std::endl;
        for(vector<create_data >::const_iterator cit =
            idata.to_create.begin();
            cit != idata.to_create.end(); ++cit) {
          librados::ObjectWriteOperation res;
          set_up_restore_object(&res);
          //NOTE(review): every other interrupt check in this function
          //returns -ESUICIDE; this one returns -ECANCELED - confirm intended.
          if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
              == 1 ) {
            return -ECANCELED;
          }
          if (verbose) cout << "\t\t\t" << client_name
              << "-cleanup: restoring " << cit->obj
              << std::endl;
          err = io_ctx.operate(cit->obj, &res);
          if (err < 0) {
            if (verbose) cout << "\t\t\t" << client_name
                << "-cleanup: restoring "
                << cit->obj << " failed with " << err << std::endl;
          }
          if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
              << cit->obj
              << std::endl;
        }
        return cleanup(idata, -ENOENT);
      } else if (err < 0) {
        //i.e., -ECANCELED because the object was already restored by someone
        //else
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: restoring " << it->obj
            << " failed with " << err << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
            << it->obj
            << std::endl;
      }
    }

    //remove the new objects (failures are only logged)
    for(vector<create_data >::const_reverse_iterator it =
        idata.to_create.rbegin();
        it != idata.to_create.rend(); ++it) {
      to_remove.insert(it->max.encoded());
      librados::ObjectWriteOperation rm;
      rm.remove();
      if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
      {
        return -ESUICIDE;
      }
      if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing "
          << it->obj
          << std::endl;
      err = io_ctx.operate(it->obj, &rm);
      if (err < 0) {
        if (verbose) cout << "\t\t\t" << client_name
            << "-cleanup: failed to remove "
            << it->obj << std::endl;
      } else {
        if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed "
            << it->obj
            << std::endl;
      }
    }

    //update the index: drop the to_create keys, put back to_delete entries
    librados::ObjectWriteOperation update_index;
    update_index.omap_cmp(assertions, &err);
    update_index.omap_rm_keys(to_remove);
    update_index.omap_set(new_index);
    if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
        << std::endl;
    if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
      return -ESUICIDE;
    }
    err = io_ctx.operate(index_name, &update_index);
    if (err < 0) {
      if (verbose) cout << "\t\t\t" << client_name
          << "-cleanup: rewriting failed with "
          << err << ". returning -ECANCELED" << std::endl;
      return -ECANCELED;
    }
    if (verbose) cout << "\t\t\t" << client_name
        << "-cleanup: updated index. cleanup done."
        << std::endl;
    break;
  }
  }
  return err;
}
1300
1301string KvFlatBtreeAsync::to_string(string s, int i) {
1302 stringstream ret;
1303 ret << s << i;
1304 return ret.str();
1305}
1306
1307string KvFlatBtreeAsync::get_name() {
1308 return rados_id;
1309}
1310
1311void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) {
1312 interrupt = inject;
1313 wait_ms = wait_time;
1314}
1315
1316int KvFlatBtreeAsync::setup(int argc, const char** argv) {
1317 int r = rados.init(rados_id.c_str());
1318 if (r < 0) {
1319 cerr << "error during init" << r << std::endl;
1320 return r;
1321 }
1322 r = rados.conf_parse_argv(argc, argv);
1323 if (r < 0) {
1324 cerr << "error during parsing args" << r << std::endl;
1325 return r;
1326 }
1327 r = rados.conf_parse_env(NULL);
1328 if (r < 0) {
1329 cerr << "error during parsing env" << r << std::endl;
1330 return r;
1331 }
1332 r = rados.conf_read_file(NULL);
1333 if (r < 0) {
1334 cerr << "error during read file: " << r << std::endl;
1335 return r;
1336 }
1337 r = rados.connect();
1338 if (r < 0) {
1339 cerr << "error during connect: " << r << std::endl;
1340 return r;
1341 }
1342 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
1343 if (r < 0) {
1344 cerr << "error creating io ctx: " << r << std::endl;
1345 rados.shutdown();
1346 return r;
1347 }
1348
1349 librados::ObjectWriteOperation make_index;
1350 make_index.create(true);
1351 map<std::string,bufferlist> index_map;
1352 index_data idata;
1353 idata.obj = client_name;
1354 idata.min_kdata.raw_key = "";
1355 idata.kdata = key_data("");
1356 index_map["1"] = to_bl(idata);
1357 make_index.omap_set(index_map);
1358 r = io_ctx.operate(index_name, &make_index);
1359 if (r < 0) {
1360 if (verbose) cout << client_name << ": Making the index failed with code "
1361 << r
1362 << std::endl;
1363 return 0;
1364 }
1365 if (verbose) cout << client_name << ": created index object" << std::endl;
1366
1367 librados::ObjectWriteOperation make_max_obj;
1368 make_max_obj.create(true);
1369 make_max_obj.setxattr("unwritable", to_bl("0"));
1370 make_max_obj.setxattr("size", to_bl("0"));
1371 r = io_ctx.operate(client_name, &make_max_obj);
1372 if (r < 0) {
1373 if (verbose) cout << client_name << ": Setting xattr failed with code "
1374 << r
1375 << std::endl;
1376 }
1377
1378 return 0;
1379}
1380
1381int KvFlatBtreeAsync::set(const string &key, const bufferlist &val,
1382 bool update_on_existing) {
1383 if (verbose) cout << client_name << " is "
1384 << (update_on_existing? "updating " : "setting ")
1385 << key << std::endl;
1386 int err = 0;
1387 utime_t mytime;
1388 index_data idata(key);
1389
1390 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1391 err = read_index(key, &idata, NULL, false);
1392 if (err < 0) {
1393 if (verbose) cout << "\t" << client_name
1394 << ": getting oid failed with code "
1395 << err << std::endl;
1396 return err;
1397 }
1398 if (verbose) cout << "\t" << client_name << ": index data is " << idata.str()
1399 << ", object is " << idata.obj << std::endl;
1400
1401 err = set_op(key, val, update_on_existing, idata);
1402
1403 if (verbose) cout << "\t" << client_name << ": finished set with " << err
1404 << std::endl;
1405 return err;
1406}
1407
/**
 * Insert key -> val into the object named by idata.obj, retrying on stale
 * index data and splitting the object when the insert is rejected.
 *
 * The write runs the "kvs" class method "omap_insert" with bound 2 * k and
 * exclusive = !update_on_existing. On error:
 *  - -EEXIST is returned (the key exists and the insert was exclusive).
 *  - -EKEYREJECTED: re-read the index and split idata.obj until split
 *    returns >= 0, -ENOENT or -EBALANCE (other split errors go through
 *    handle_set_rm_errors); then re-read the index and retry set_op.
 *  - -ENOENT / -EACCES: re-read the index (forced) and retry set_op.
 *  - anything else is returned.
 *
 * @param idata index entry for key; overwritten whenever the index is re-read
 * @return 0 on success, -ESUICIDE if the injected interrupt fires, otherwise
 *   a negative error code.
 */
int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val,
    bool update_on_existing, index_data &idata) {
  //write

  bufferlist inbl;
  omap_set_args args;
  //NOTE(review): presumably the class method rejects the insert with
  //-EKEYREJECTED once the object would exceed this bound - confirm in cls.
  args.bound = 2 * k;
  args.exclusive = !update_on_existing;
  args.omap[key] = val;
  args.encode(inbl);

  librados::ObjectWriteOperation owo;
  owo.exec("kvs", "omap_insert", inbl);
  if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
    if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
    return -ESUICIDE;
  }
  if (verbose) cout << "\t" << client_name << ": inserting " << key
      << " into object "
      << idata.obj << std::endl;
  int err = io_ctx.operate(idata.obj, &owo);
  if (err < 0) {
    switch (err) {
    case -EEXIST: {
      //the key already exists and this is an exclusive insert.
      cerr << "\t" << client_name << ": writing key failed with "
          << err << std::endl;
      return err;
    }
    case -EKEYREJECTED: {
      //the object needs to be split.
      do {
        if (verbose) cout << "\t" << client_name << ": running split on "
            << idata.obj
            << std::endl;
        err = read_index(key, &idata, NULL, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        err = split(idata);
        if (err < 0 && err != -ENOENT && err != -EBALANCE) {
          if (verbose) cerr << "\t" << client_name << ": split failed with "
              << err << std::endl;
          //handle_set_rm_errors takes err by reference and may overwrite it
          //(e.g. with the result of re-reading the index); when ret matches
          //none of the cases below, that updated err drives the loop test.
          int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL);
          switch (ret) {
          case -ESUICIDE:
            if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
            return ret;
            break;
          case 1:
            //someone else changed the object; start over with fresh idata.
            return set_op(key, val, update_on_existing, idata);
            break;
          case 2:
            //unexpected error.
            return err;
            break;
          }
        }
      } while (err < 0 && err != -EBALANCE && err != -ENOENT);
      err = read_index(key, &idata, NULL, true);
      if (err < 0) {
        if (verbose) cout << "\t" << client_name
            << ": getting oid failed with code "
            << err << std::endl;
        return err;
      }
      return set_op(key, val, update_on_existing, idata);
    }
    default:
      if (verbose) cerr << "\t" << client_name << ": writing obj failed with "
          << err << std::endl;
      //idata was stale (object gone or not writable): refresh and retry.
      if (err == -ENOENT || err == -EACCES) {
        if (err == -ENOENT) {
          if (verbose) cout << "CACHE FAILURE" << std::endl;
        }
        err = read_index(key, &idata, NULL, true);
        if (err < 0) {
          if (verbose) cout << "\t" << client_name
              << ": getting oid failed with code "
              << err << std::endl;
          return err;
        }
        if (verbose) cout << "\t" << client_name << ": index data is "
            << idata.str()
            << ", object is " << idata.obj << std::endl;
        return set_op(key, val, update_on_existing, idata);
      } else {
        return err;
      }
    }
  }
  return 0;
}
1503
1504int KvFlatBtreeAsync::remove(const string &key) {
1505 if (verbose) cout << client_name << ": removing " << key << std::endl;
1506 int err = 0;
1507 string obj;
1508 utime_t mytime;
1509 index_data idata;
1510 index_data next_idata;
1511
1512 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1513 err = read_index(key, &idata, &next_idata, false);
1514 if (err < 0) {
1515 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1516 return err;
1517 }
1518 obj = idata.obj;
1519 if (verbose) cout << "\t" << client_name << ": idata is " << idata.str()
1520 << ", next_idata is " << next_idata.str()
1521 << ", obj is " << obj << std::endl;
1522
1523 err = remove_op(key, idata, next_idata);
1524
1525 if (verbose) cout << "\t" << client_name << ": finished remove with " << err
1526 << " and exiting" << std::endl;
1527 return err;
1528}
1529
1530int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata,
1531 index_data &next_idata) {
1532 //write
1533 bufferlist inbl;
1534 omap_rm_args args;
1535 args.bound = k;
1536 args.omap.insert(key);
1537 args.encode(inbl);
1538
1539 librados::ObjectWriteOperation owo;
1540 owo.exec("kvs", "omap_remove", inbl);
1541 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1542 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1543 return -ESUICIDE;
1544 }
1545 if (verbose) cout << "\t" << client_name << ": removing " << key << " from "
1546 << idata.obj
1547 << std::endl;
1548 int err = io_ctx.operate(idata.obj, &owo);
1549 if (err < 0) {
1550 if (verbose) cout << "\t" << client_name << ": writing obj failed with "
1551 << err << std::endl;
1552 switch (err) {
1553 case -ENODATA: {
1554 //the key does not exist in the object
1555 return err;
1556 }
1557 case -EKEYREJECTED: {
1558 //the object needs to be split.
1559 do {
1560 if (verbose) cerr << "\t" << client_name << ": running rebalance on "
1561 << idata.obj << std::endl;
1562 err = read_index(key, &idata, &next_idata, true);
1563 if (err < 0) {
1564 if (verbose) cout << "\t" << client_name
1565 << ": getting oid failed with code "
1566 << err << std::endl;
1567 return err;
1568 }
1569 err = rebalance(idata, next_idata);
1570 if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1571 if (verbose) cerr << "\t" << client_name << ": rebalance returned "
1572 << err << std::endl;
1573 int ret = handle_set_rm_errors(err, idata.obj, key, &idata,
1574 &next_idata);
1575 switch (ret) {
1576 case -ESUICIDE:
1577 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1578 return err;
1579 break;
1580 case 1:
1581 return remove_op(key, idata, next_idata);
1582 break;
1583 case 2:
1584 return err;
1585 break;
1586 case -EUCLEAN:
1587 //this is the only node, so it's ok to go below k.
1588 librados::ObjectWriteOperation owo;
1589 bufferlist inbl;
1590 omap_rm_args args;
1591 args.bound = 0;
1592 args.omap.insert(key);
1593 args.encode(inbl);
1594 owo.exec("kvs", "omap_remove", inbl);
1595 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1596 == 1 ) {
1597 if (verbose) cout << client_name << " IS SUICIDING!"
1598 << std::endl;
1599 return -ESUICIDE;
1600 }
1601 if (verbose) cout << "\t" << client_name << ": removing " << key
1602 << " from "
1603 << idata.obj
1604 << std::endl;
1605 int err = io_ctx.operate(idata.obj, &owo);
1606 if (err == 0) {
1607 return 0;
1608 }
1609 }
1610 }
1611 } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1612 err = read_index(key, &idata, &next_idata, true);
1613 if (err < 0) {
1614 if (verbose) cout << "\t" << client_name
1615 << ": getting oid failed with code "
1616 << err << std::endl;
1617 return err;
1618 }
1619 return remove(key);
1620 }
1621 default:
1622 if (err == -ENOENT || err == -EACCES) {
1623 err = read_index(key, &idata, &next_idata, true);
1624 if (err < 0) {
1625 if (verbose) cout << "\t" << client_name
1626 << ": getting oid failed with code "
1627 << err << std::endl;
1628 return err;
1629 }
1630 if (verbose) cout << "\t" << client_name << ": index data is "
1631 << idata.str()
1632 << ", object is " << idata.obj << std::endl;
1633 //idea: we read the time every time we read the index anyway - store it.
1634 return remove_op(key, idata, next_idata);
1635 } else {
1636 return err;
1637 }
1638 }
1639 }
1640 return 0;
1641}
1642
1643int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj,
1644 string key,
1645 index_data * idata, index_data * next_idata) {
1646 if (err == -ESUICIDE) {
1647 return err;
1648 } else if (err == -ECANCELED //if an object was unwritable or index changed
1649 || err == -EPREFIX //if there is currently a prefix
1650 || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup
1651 || err == -EACCES) //possible if we were acting on old index data
1652 {
1653 err = read_index(key, idata, next_idata, true);
1654 if (err < 0) {
1655 return err;
1656 }
1657 if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str()
1658 << std::endl;
1659 if (idata->obj != obj) {
1660 //someone else has split or cleaned up or something. start over.
1661 return 1;//meaning repeat
1662 }
1663 } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES
1664 && err != -EUCLEAN){
1665 if (verbose) cout << "\t" << client_name
1666 << ": split encountered an unexpected error: " << err
1667 << std::endl;
1668 return 2;
1669 }
1670 return err;
1671}
1672
1673int KvFlatBtreeAsync::get(const string &key, bufferlist *val) {
1674 opmap['g']++;
1675 if (verbose) cout << client_name << ": getting " << key << std::endl;
1676 int err = 0;
1677 index_data idata;
1678 utime_t mytime;
1679
1680 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1681 return -ESUICIDE;
1682 }
1683 err = read_index(key, &idata, NULL, false);
1684 mytime = ceph_clock_now();
1685 if (err < 0) {
1686 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1687 return err;
1688 }
1689
1690 err = get_op(key, val, idata);
1691
1692 if (verbose) cout << client_name << ": got " << key << " with " << err
1693 << std::endl;
1694
1695 return err;
1696}
1697
1698int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val,
1699 index_data &idata) {
1700 int err = 0;
1701 std::set<std::string> key_set;
1702 key_set.insert(key);
1703 map<std::string,bufferlist> omap;
1704 librados::ObjectReadOperation read;
1705 read.omap_get_vals_by_keys(key_set, &omap, &err);
1706 err = io_ctx.operate(idata.obj, &read, NULL);
1707 if (err < 0) {
1708 if (err == -ENOENT) {
1709 err = read_index(key, &idata, NULL, true);
1710 if (err < 0) {
1711 if (verbose) cout << "\t" << client_name
1712 << ": getting oid failed with code "
1713 << err << std::endl;
1714 return err;
1715 }
1716 if (verbose) cout << "\t" << client_name << ": index data is "
1717 << idata.str()
1718 << ", object is " << idata.obj << std::endl;
1719 return get_op(key, val, idata);
1720 } else {
1721 if (verbose) cout << client_name
1722 << ": get encountered an unexpected error: " << err
1723 << std::endl;
1724 return err;
1725 }
1726 }
1727
1728 *val = omap[key];
1729 return err;
1730}
1731
1732void *KvFlatBtreeAsync::pset(void *ptr) {
1733 struct aio_set_args *args = (struct aio_set_args *)ptr;
1734 *args->err =
1735 args->kvba->KvFlatBtreeAsync::set((string)args->key,
1736 (bufferlist)args->val, (bool)args->exc);
1737 args->cb(args->err, args->cb_args);
1738 delete args;
1739 return NULL;
1740}
1741
1742void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val,
1743 bool exclusive, callback cb, void * cb_args, int * err) {
1744 aio_set_args *args = new aio_set_args();
1745 args->kvba = this;
1746 args->key = key;
1747 args->val = val;
1748 args->exc = exclusive;
1749 args->cb = cb;
1750 args->cb_args = cb_args;
1751 args->err = err;
1752 pthread_t t;
1753 int r = pthread_create(&t, NULL, pset, (void*)args);
1754 if (r < 0) {
1755 *args->err = r;
1756 return;
1757 }
1758 pthread_detach(t);
1759}
1760
1761void *KvFlatBtreeAsync::prm(void *ptr) {
1762 struct aio_rm_args *args = (struct aio_rm_args *)ptr;
1763 *args->err =
1764 args->kvba->KvFlatBtreeAsync::remove((string)args->key);
1765 args->cb(args->err, args->cb_args);
1766 delete args;
1767 return NULL;
1768}
1769
1770void KvFlatBtreeAsync::aio_remove(const string &key,
1771 callback cb, void * cb_args, int * err) {
1772 aio_rm_args * args = new aio_rm_args();
1773 args->kvba = this;
1774 args->key = key;
1775 args->cb = cb;
1776 args->cb_args = cb_args;
1777 args->err = err;
1778 pthread_t t;
1779 int r = pthread_create(&t, NULL, prm, (void*)args);
1780 if (r < 0) {
1781 *args->err = r;
1782 return;
1783 }
1784 pthread_detach(t);
1785}
1786
1787void *KvFlatBtreeAsync::pget(void *ptr) {
1788 struct aio_get_args *args = (struct aio_get_args *)ptr;
1789 *args->err =
1790 args->kvba->KvFlatBtreeAsync::get((string)args->key,
1791 (bufferlist *)args->val);
1792 args->cb(args->err, args->cb_args);
1793 delete args;
1794 return NULL;
1795}
1796
1797void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val,
1798 callback cb, void * cb_args, int * err) {
1799 aio_get_args * args = new aio_get_args();
1800 args->kvba = this;
1801 args->key = key;
1802 args->val = val;
1803 args->cb = cb;
1804 args->cb_args = cb_args;
1805 args->err = err;
1806 pthread_t t;
1807 int r = pthread_create(&t, NULL, pget, (void*)args);
1808 if (r < 0) {
1809 *args->err = r;
1810 return;
1811 }
1812 pthread_detach(t);
1813}
1814
1815int KvFlatBtreeAsync::set_many(const map<string, bufferlist> &in_map) {
1816 int err = 0;
1817 bufferlist inbl;
1818 bufferlist outbl;
1819 std::set<string> keys;
1820
1821 map<string, bufferlist> big_map;
1822 for (map<string, bufferlist>::const_iterator it = in_map.begin();
1823 it != in_map.end(); ++it) {
1824 keys.insert(it->first);
1825 big_map.insert(*it);
1826 }
1827
1828 if (verbose) cout << "created key set and big_map" << std::endl;
1829
1830 ::encode(keys, inbl);
1831 librados::AioCompletion * aioc = rados.aio_create_completion();
1832 io_ctx.aio_exec(index_name, aioc, "kvs", "read_many", inbl, &outbl);
1833 aioc->wait_for_safe();
1834 err = aioc->get_return_value();
1835 aioc->release();
1836 if (err < 0) {
1837 cerr << "getting index failed with " << err << std::endl;
1838 return err;
1839 }
1840
1841 map<string, bufferlist> imap;//read from the index
1842 bufferlist::iterator blit = outbl.begin();
1843 ::decode(imap, blit);
1844
1845 if (verbose) cout << "finished reading index for objects. there are "
1846 << imap.size() << " entries that need to be changed. " << std::endl;
1847
1848
1849 vector<object_data> to_delete;
1850
1851 vector<object_data> to_create;
1852
1853 if (verbose) cout << "setting up to_delete and to_create vectors from index "
1854 << "map" << std::endl;
1855 //set up to_delete from index map
1856 for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end();
1857 ++it){
1858 index_data idata;
1859 blit = it->second.begin();
1860 idata.decode(blit);
1861 to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj));
1862 err = read_object(idata.obj, &to_delete[to_delete.size() - 1]);
1863 if (err < 0) {
1864 if (verbose) cout << "reading " << idata.obj << " failed with " << err
1865 << std::endl;
1866 return set_many(in_map);
1867 }
1868
1869 big_map.insert(to_delete[to_delete.size() - 1].omap.begin(),
1870 to_delete[to_delete.size() - 1].omap.end());
1871 }
1872
1873 to_create.push_back(object_data(
1874 to_string(client_name, client_index++)));
1875 to_create[0].min_kdata = to_delete[0].min_kdata;
1876
1877 for(map<string, bufferlist>::iterator it = big_map.begin();
1878 it != big_map.end(); ++it) {
1879 if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) {
1880 to_create[to_create.size() - 1].max_kdata =
1881 key_data(to_create[to_create.size() - 1]
1882 .omap.rbegin()->first);
1883
1884 to_create.push_back(object_data(
1885 to_string(client_name, client_index++)));
1886 to_create[to_create.size() - 1].min_kdata =
1887 to_create[to_create.size() - 2].max_kdata;
1888 }
1889
1890 to_create[to_create.size() - 1].omap.insert(*it);
1891 }
1892 to_create[to_create.size() - 1].max_kdata =
1893 to_delete[to_delete.size() - 1].max_kdata;
1894
1895 vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size()
1896 + to_create.size());
1897 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1898
1899
1900 index_data idata;
1901 set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err);
1902
1903 if (verbose) cout << "finished making to_create and to_delete. "
1904 << std::endl;
1905
1906 ops.push_back(make_pair(
1907 pair<int, string>(ADD_PREFIX, index_name),
1908 &owos[0]));
1909 for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size();
1910 i++) {
1911 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
1912 }
1913
1914 set_up_ops(to_create, to_delete, &ops, idata, &err);
1915
1916 cout << "finished setting up ops. Starting critical section..." << std::endl;
1917
1918 /////BEGIN CRITICAL SECTION/////
1919 //put prefix on index entry for idata.val
1920 err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops);
1921 if (err < 0) {
1922 return set_many(in_map);
1923 }
1924 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
1925 << std::endl;
1926 /////END CRITICAL SECTION/////
1927 icache_lock.Lock();
1928 for (vector<delete_data>::iterator it = idata.to_delete.begin();
1929 it != idata.to_delete.end(); ++it) {
1930 icache.erase(it->max);
1931 }
1932 for (vector<create_data>::iterator it = idata.to_create.begin();
1933 it != idata.to_create.end(); ++it) {
1934 icache.push(index_data(*it));
1935 }
1936 icache_lock.Unlock();
1937 return err;
1938}
1939
1940int KvFlatBtreeAsync::remove_all() {
1941 if (verbose) cout << client_name << ": removing all" << std::endl;
1942 int err = 0;
1943 librados::ObjectReadOperation oro;
1944 librados::AioCompletion * oro_aioc = rados.aio_create_completion();
1945 std::map<std::string, bufferlist> index_set;
1946 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1947 err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL);
1948 if (err < 0){
1949 if (err == -ENOENT) {
1950 return 0;
1951 }
1952 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1953 return err;
1954 }
1955 oro_aioc->wait_for_safe();
1956 oro_aioc->release();
1957
1958 librados::ObjectWriteOperation rm_index;
1959 librados::AioCompletion * rm_index_aioc = rados.aio_create_completion();
1960 map<std::string,bufferlist> new_index;
1961 new_index["1"] = index_set["1"];
1962 rm_index.omap_clear();
1963 rm_index.omap_set(new_index);
1964 io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index);
1965 err = rm_index_aioc->get_return_value();
1966 rm_index_aioc->release();
1967 if (err < 0) {
1968 if (verbose) cout << "rm index aioc failed with " << err
1969 << std::endl;
1970 return err;
1971 }
1972
1973 if (!index_set.empty()) {
1974 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1975 it != index_set.end(); ++it){
1976 librados::ObjectWriteOperation sub;
1977 if (it->first == "1") {
1978 sub.omap_clear();
1979 } else {
1980 sub.remove();
1981 }
1982 index_data idata;
1983 bufferlist::iterator b = it->second.begin();
1984 idata.decode(b);
1985 io_ctx.operate(idata.obj, &sub);
1986 }
1987 }
1988
1989 icache.clear();
1990
1991 return 0;
1992}
1993
1994int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) {
1995 if (verbose) cout << client_name << ": getting all keys" << std::endl;
1996 int err = 0;
1997 librados::ObjectReadOperation oro;
1998 std::map<std::string,bufferlist> index_set;
1999 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
2000 io_ctx.operate(index_name, &oro, NULL);
2001 if (err < 0){
2002 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2003 return err;
2004 }
2005 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
2006 it != index_set.end(); ++it){
2007 librados::ObjectReadOperation sub;
2008 std::set<std::string> ret;
2009 sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err);
2010 index_data idata;
2011 bufferlist::iterator b = it->second.begin();
2012 idata.decode(b);
2013 io_ctx.operate(idata.obj, &sub, NULL);
2014 keys->insert(ret.begin(), ret.end());
2015 }
2016 return err;
2017}
2018
2019int KvFlatBtreeAsync::get_all_keys_and_values(
2020 map<std::string,bufferlist> *kv_map) {
2021 if (verbose) cout << client_name << ": getting all keys and values"
2022 << std::endl;
2023 int err = 0;
2024 librados::ObjectReadOperation first_read;
2025 std::set<std::string> index_set;
2026 first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err);
2027 io_ctx.operate(index_name, &first_read, NULL);
2028 if (err < 0){
2029 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2030 return err;
2031 }
2032 for (std::set<std::string>::iterator it = index_set.begin();
2033 it != index_set.end(); ++it){
2034 librados::ObjectReadOperation sub;
2035 map<std::string, bufferlist> ret;
2036 sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err);
2037 io_ctx.operate(*it, &sub, NULL);
2038 kv_map->insert(ret.begin(), ret.end());
2039 }
2040 return err;
2041}
2042
2043bool KvFlatBtreeAsync::is_consistent() {
2044 int err;
2045 bool ret = true;
2046 if (verbose) cout << client_name << ": checking consistency" << std::endl;
2047 std::map<std::string,bufferlist> index;
2048 map<std::string, std::set<std::string> > sub_objs;
2049 librados::ObjectReadOperation oro;
2050 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2051 io_ctx.operate(index_name, &oro, NULL);
2052 if (err < 0){
2053 //probably because the index doesn't exist - this might be ok.
2054 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2055 oit != io_ctx.nobjects_end(); ++oit) {
2056 //if this executes, there are floating objects.
2057 cerr << "Not consistent! found floating object " << oit->get_oid()
2058 << std::endl;
2059 ret = false;
2060 }
2061 return ret;
2062 }
2063
2064 std::map<std::string, string> parsed_index;
2065 std::set<std::string> onames;
2066 std::set<std::string> special_names;
2067 for (map<std::string,bufferlist>::iterator it = index.begin();
2068 it != index.end(); ++it) {
2069 if (it->first != "") {
2070 index_data idata;
2071 bufferlist::iterator b = it->second.begin();
2072 idata.decode(b);
2073 if (idata.prefix != "") {
2074 for(vector<delete_data>::iterator dit = idata.to_delete.begin();
2075 dit != idata.to_delete.end(); ++dit) {
2076 librados::ObjectReadOperation oro;
2077 librados::AioCompletion * aioc = rados.aio_create_completion();
2078 bufferlist un;
2079 oro.getxattr("unwritable", &un, &err);
2080 io_ctx.aio_operate(dit->obj, aioc, &oro, NULL);
2081 aioc->wait_for_safe();
2082 err = aioc->get_return_value();
2083 if (ceph_clock_now() - idata.ts > timeout) {
2084 if (err < 0) {
2085 aioc->release();
2086 if (err == -ENOENT) {
2087 continue;
2088 } else {
2089 cerr << "Not consistent! reading object " << dit->obj
2090 << "returned " << err << std::endl;
2091 ret = false;
2092 break;
2093 }
2094 }
2095 if (atoi(string(un.c_str(), un.length()).c_str()) != 1 &&
2096 aioc->get_version64() != dit->version) {
2097 cerr << "Not consistent! object " << dit->obj << " has been "
2098 << " modified since the client died was not cleaned up."
2099 << std::endl;
2100 ret = false;
2101 }
2102 }
2103 special_names.insert(dit->obj);
2104 aioc->release();
2105 }
2106 for(vector<create_data >::iterator cit = idata.to_create.begin();
2107 cit != idata.to_create.end(); ++cit) {
2108 special_names.insert(cit->obj);
2109 }
2110 }
2111 parsed_index.insert(make_pair(it->first, idata.obj));
2112 onames.insert(idata.obj);
2113 }
2114 }
2115
2116 //make sure that an object exists iff it either is the index
2117 //or is listed in the index
2118 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2119 oit != io_ctx.nobjects_end(); ++oit) {
2120 string name = oit->get_oid();
2121 if (name != index_name && onames.count(name) == 0
2122 && special_names.count(name) == 0) {
2123 cerr << "Not consistent! found floating object " << name << std::endl;
2124 ret = false;
2125 }
2126 }
2127
2128 //check objects
2129 string prev = "";
2130 for (std::map<std::string, string>::iterator it = parsed_index.begin();
2131 it != parsed_index.end();
2132 ++it) {
2133 librados::ObjectReadOperation read;
2134 read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err);
2135 err = io_ctx.operate(it->second, &read, NULL);
2136 int size_int = (int)sub_objs[it->second].size();
2137
2138 //check that size is in the right range
2139 if (it->first != "1" && special_names.count(it->second) == 0 &&
2140 err != -ENOENT && (size_int > 2*k|| size_int < k)
2141 && parsed_index.size() > 1) {
2142 cerr << "Not consistent! Object " << *it << " has size " << size_int
2143 << ", which is outside the acceptable range." << std::endl;
2144 ret = false;
2145 }
2146
2147 //check that all keys belong in that object
2148 for(std::set<std::string>::iterator subit = sub_objs[it->second].begin();
2149 subit != sub_objs[it->second].end(); ++subit) {
2150 if ((it->first != "1"
2151 && *subit > it->first.substr(1,it->first.length()))
2152 || *subit <= prev) {
2153 cerr << "Not consistent! key " << *subit << " does not belong in "
2154 << *it << std::endl;
2155 cerr << "not last element, i.e. " << it->first << " not equal to 1? "
2156 << (it->first != "1") << std::endl
2157 << "greater than " << it->first.substr(1,it->first.length())
2158 <<"? " << (*subit > it->first.substr(1,it->first.length()))
2159 << std::endl
2160 << "less than or equal to " << prev << "? "
2161 << (*subit <= prev) << std::endl;
2162 ret = false;
2163 }
2164 }
2165
2166 prev = it->first.substr(1,it->first.length());
2167 }
2168
2169 if (!ret) {
2170 if (verbose) cout << "failed consistency test - see error log"
2171 << std::endl;
2172 cerr << str();
2173 } else {
2174 if (verbose) cout << "passed consistency test" << std::endl;
2175 }
2176 return ret;
2177}
2178
2179string KvFlatBtreeAsync::str() {
2180 stringstream ret;
2181 ret << "Top-level map:" << std::endl;
2182 int err = 0;
2183 std::set<std::string> keys;
2184 std::map<std::string,bufferlist> index;
2185 librados::ObjectReadOperation oro;
2186 librados::AioCompletion * top_aioc = rados.aio_create_completion();
2187 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2188 io_ctx.aio_operate(index_name, top_aioc, &oro, NULL);
2189 top_aioc->wait_for_safe();
2190 err = top_aioc->get_return_value();
2191 top_aioc->release();
2192 if (err < 0 && err != -5){
2193 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2194 return ret.str();
2195 }
2196 if(index.empty()) {
2197 ret << "There are no objects!" << std::endl;
2198 return ret.str();
2199 }
2200
2201 for (map<std::string,bufferlist>::iterator it = index.begin();
2202 it != index.end(); ++it) {
2203 keys.insert(string(it->second.c_str(), it->second.length())
2204 .substr(1,it->second.length()));
2205 }
2206
2207 vector<std::string> all_names;
2208 vector<int> all_sizes(index.size());
2209 vector<int> all_versions(index.size());
2210 vector<bufferlist> all_unwrit(index.size());
2211 vector<map<std::string,bufferlist> > all_maps(keys.size());
2212 vector<map<std::string,bufferlist>::iterator> its(keys.size());
2213 unsigned done = 0;
2214 vector<bool> dones(keys.size());
2215 ret << std::endl << string(150,'-') << std::endl;
2216
2217 for (map<std::string,bufferlist>::iterator it = index.begin();
2218 it != index.end(); ++it){
2219 index_data idata;
2220 bufferlist::iterator b = it->second.begin();
2221 idata.decode(b);
2222 string s = idata.str();
2223 ret << "|" << string((148 -
2224 ((*it).first.length()+s.length()+3))/2,' ');
2225 ret << (*it).first;
2226 ret << " | ";
2227 ret << string(idata.str());
2228 ret << string((148 -
2229 ((*it).first.length()+s.length()+3))/2,' ');
2230 ret << "|\t";
2231 all_names.push_back(idata.obj);
2232 ret << std::endl << string(150,'-') << std::endl;
2233 }
2234
2235 int indexer = 0;
2236
2237 //get the object names and sizes
2238 for(vector<std::string>::iterator it = all_names.begin(); it
2239 != all_names.end();
2240 ++it) {
2241 librados::ObjectReadOperation oro;
2242 librados::AioCompletion *aioc = rados.aio_create_completion();
2243 oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err);
2244 oro.getxattr("unwritable", &all_unwrit[indexer], &err);
2245 io_ctx.aio_operate(*it, aioc, &oro, NULL);
2246 aioc->wait_for_safe();
2247 if (aioc->get_return_value() < 0) {
2248 ret << "reading" << *it << "failed: " << err << std::endl;
2249 //return ret.str();
2250 }
2251 all_sizes[indexer] = all_maps[indexer].size();
2252 all_versions[indexer] = aioc->get_version64();
2253 indexer++;
2254 aioc->release();
2255 }
2256
2257 ret << "///////////////////OBJECT NAMES////////////////" << std::endl;
2258 //HEADERS
2259 ret << std::endl;
2260 for (int i = 0; i < indexer; i++) {
2261 ret << "---------------------------\t";
2262 }
2263 ret << std::endl;
2264 for (int i = 0; i < indexer; i++) {
2265 ret << "|" << string((25 -
2266 (string("Bucket: ").length() + all_names[i].length()))/2, ' ');
2267 ret << "Bucket: " << all_names[i];
2268 ret << string((25 -
2269 (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t";
2270 }
2271 ret << std::endl;
2272 for (int i = 0; i < indexer; i++) {
2273 its[i] = all_maps[i].begin();
2274 ret << "|" << string((25 - (string("size: ").length()
2275 + to_string("",all_sizes[i]).length()))/2, ' ');
2276 ret << "size: " << all_sizes[i];
2277 ret << string((25 - (string("size: ").length()
2278 + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t";
2279 }
2280 ret << std::endl;
2281 for (int i = 0; i < indexer; i++) {
2282 its[i] = all_maps[i].begin();
2283 ret << "|" << string((25 - (string("version: ").length()
2284 + to_string("",all_versions[i]).length()))/2, ' ');
2285 ret << "version: " << all_versions[i];
2286 ret << string((25 - (string("version: ").length()
2287 + to_string("",all_versions[i]).length()))/2, ' ') << "|\t";
2288 }
2289 ret << std::endl;
2290 for (int i = 0; i < indexer; i++) {
2291 its[i] = all_maps[i].begin();
2292 ret << "|" << string((25 - (string("unwritable? ").length()
2293 + 1))/2, ' ');
2294 ret << "unwritable? " << string(all_unwrit[i].c_str(),
2295 all_unwrit[i].length());
2296 ret << string((25 - (string("unwritable? ").length()
2297 + 1))/2, ' ') << "|\t";
2298 }
2299 ret << std::endl;
2300 for (int i = 0; i < indexer; i++) {
2301 ret << "---------------------------\t";
2302 }
2303 ret << std::endl;
2304 ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl;
2305
2306
2307 ret << std::endl;
2308 for (int i = 0; i < indexer; i++) {
2309 ret << "---------------------------\t";
2310 }
2311 ret << std::endl;
2312 //each time through this part is two lines
2313 while(done < keys.size()) {
2314 for(int i = 0; i < indexer; i++) {
2315 if(dones[i]){
2316 ret << " \t";
2317 } else {
2318 if (its[i] == all_maps[i].end()){
2319 done++;
2320 dones[i] = true;
2321 ret << " \t";
2322 } else {
2323 ret << "|" << string((25 -
2324 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2325 ret << (*its[i]).first;
2326 ret << " | ";
2327 ret << string(its[i]->second.c_str(), its[i]->second.length());
2328 ret << string((25 -
2329 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2330 ret << "|\t";
2331 ++(its[i]);
2332 }
2333
2334 }
2335 }
2336 ret << std::endl;
2337 for (int i = 0; i < indexer; i++) {
2338 if(dones[i]){
2339 ret << " \t";
2340 } else {
2341 ret << "---------------------------\t";
2342 }
2343 }
2344 ret << std::endl;
2345
2346 }
2347 return ret.str();
2348}