]> git.proxmox.com Git - ceph.git/blob - ceph/src/key_value_store/kv_flat_btree_async.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / key_value_store / kv_flat_btree_async.cc
1 /*
2 * Key-value store using librados
3 *
4 * September 2, 2012
5 * Eleanor Cawthon
6 * eleanor.cawthon@inktank.com
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
14 #include "include/compat.h"
15 #include "key_value_store/key_value_structure.h"
16 #include "key_value_store/kv_flat_btree_async.h"
17 #include "key_value_store/kvs_arg_types.h"
18 #include "include/rados/librados.hpp"
19 #include "common/ceph_context.h"
20 #include "common/Clock.h"
21 #include "include/types.h"
22
23 #include <errno.h>
24 #include <string>
25 #include <iostream>
26 #include <cassert>
27 #include <climits>
28 #include <cmath>
29 #include <sstream>
30 #include <stdlib.h>
31 #include <iterator>
32
33 using ceph::bufferlist;
34
35 bool index_data::is_timed_out(utime_t now, utime_t timeout) const {
36 return prefix != "" && now - ts > timeout;
37 }
38
39 void IndexCache::clear() {
40 k2itmap.clear();
41 t2kmap.clear();
42 }
43
44 void IndexCache::push(const string &key, const index_data &idata) {
45 if (cache_size == 0) {
46 return;
47 }
48 index_data old_idata;
49 map<key_data, pair<index_data, utime_t> >::iterator old_it =
50 k2itmap.lower_bound(key_data(key));
51 if (old_it != k2itmap.end()) {
52 t2kmap.erase(old_it->second.second);
53 k2itmap.erase(old_it);
54 }
55 map<key_data, pair<index_data, utime_t> >::iterator new_it =
56 k2itmap.find(idata.kdata);
57 if (new_it != k2itmap.end()) {
58 utime_t old_time = new_it->second.second;
59 t2kmap.erase(old_time);
60 }
61 utime_t time = ceph_clock_now();
62 k2itmap[idata.kdata] = make_pair(idata, time);
63 t2kmap[time] = idata.kdata;
64 if ((int)k2itmap.size() > cache_size) {
65 pop();
66 }
67
68 }
69
70 void IndexCache::push(const index_data &idata) {
71 if (cache_size == 0) {
72 return;
73 }
74 if (k2itmap.count(idata.kdata) > 0) {
75 utime_t old_time = k2itmap[idata.kdata].second;
76 t2kmap.erase(old_time);
77 k2itmap.erase(idata.kdata);
78 }
79 utime_t time = ceph_clock_now();
80 k2itmap[idata.kdata] = make_pair(idata, time);
81 t2kmap[time] = idata.kdata;
82 if ((int)k2itmap.size() > cache_size) {
83 pop();
84 }
85 }
86
87 void IndexCache::pop() {
88 if (cache_size == 0) {
89 return;
90 }
91 map<utime_t, key_data>::iterator it = t2kmap.begin();
92 utime_t time = it->first;
93 key_data kdata = it->second;
94 k2itmap.erase(kdata);
95 t2kmap.erase(time);
96 }
97
98 void IndexCache::erase(key_data kdata) {
99 if (cache_size == 0) {
100 return;
101 }
102 if (k2itmap.count(kdata) > 0) {
103 utime_t c = k2itmap[kdata].second;
104 k2itmap.erase(kdata);
105 t2kmap.erase(c);
106 }
107 }
108
109 int IndexCache::get(const string &key, index_data *idata) const {
110 if (cache_size == 0) {
111 return -ENODATA;
112 }
113 if ((int)k2itmap.size() == 0) {
114 return -ENODATA;
115 }
116 map<key_data, pair<index_data, utime_t> >::const_iterator it =
117 k2itmap.lower_bound(key_data(key));
118 if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) {
119 return -ENODATA;
120 } else {
121 *idata = it->second.first;
122 }
123 return 0;
124 }
125
126 int IndexCache::get(const string &key, index_data *idata,
127 index_data *next_idata) const {
128 if (cache_size == 0) {
129 return -ENODATA;
130 }
131 map<key_data, pair<index_data, utime_t> >::const_iterator it =
132 k2itmap.lower_bound(key_data(key));
133 if (it == k2itmap.end() || ++it == k2itmap.end()) {
134 return -ENODATA;
135 } else {
136 --it;
137 if (!(it->second.first.min_kdata < key_data(key))){
138 //stale, should be reread.
139 return -ENODATA;
140 } else {
141 *idata = it->second.first;
142 ++it;
143 if (it != k2itmap.end()) {
144 *next_idata = it->second.first;
145 }
146 }
147 }
148 return 0;
149 }
150
151 int KvFlatBtreeAsync::nothing() {
152 return 0;
153 }
154
155 int KvFlatBtreeAsync::wait() {
156 if (rand() % 10 == 0) {
157 usleep(wait_ms);
158 }
159 return 0;
160 }
161
162 int KvFlatBtreeAsync::suicide() {
163 if (rand() % 10 == 0) {
164 if (verbose) cout << client_name << " is suiciding" << std::endl;
165 return 1;
166 }
167 return 0;
168 }
169
170 int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data)
171 {
172 if (verbose) cout << "\t\t" << client_name << "-next: finding next of "
173 << idata.str()
174 << std::endl;
175 int err = 0;
176 librados::ObjectReadOperation oro;
177 std::map<std::string, bufferlist> kvs;
178 oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err);
179 err = io_ctx.operate(index_name, &oro, NULL);
180 if (err < 0){
181 if (verbose) cout << "\t\t\t" << client_name
182 << "-next: getting index failed with error "
183 << err << std::endl;
184 return err;
185 }
186 if (!kvs.empty()) {
187 out_data->kdata.parse(kvs.begin()->first);
188 auto b = kvs.begin()->second.cbegin();
189 out_data->decode(b);
190 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
191 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
192 << std::endl;
193 //the client died after deleting the object. clean up.
194 cleanup(idata, err);
195 }
196 } else {
197 err = -EOVERFLOW;
198 }
199 return err;
200 }
201
202 int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data)
203 {
204 if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of "
205 << idata.str() << std::endl;
206 int err = 0;
207 bufferlist inbl;
208 idata_from_idata_args in_args;
209 in_args.idata = idata;
210 in_args.encode(inbl);
211 bufferlist outbl;
212 err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl);
213 if (err < 0){
214 if (verbose) cout << "\t\t\t" << client_name
215 << "-prev: getting index failed with error "
216 << err << std::endl;
217 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
218 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
219 << std::endl;
220 //the client died after deleting the object. clean up.
221 err = cleanup(idata, err);
222 if (err == -ESUICIDE) {
223 return err;
224 } else {
225 err = 0;
226 }
227 }
228 return err;
229 }
230 auto it = outbl.cbegin();
231 in_args.decode(it);
232 *out_data = in_args.next_idata;
233 if (verbose) cout << "\t\t" << client_name << "-prev: prev is "
234 << out_data->str()
235 << std::endl;
236 return err;
237 }
238
239 int KvFlatBtreeAsync::read_index(const string &key, index_data * idata,
240 index_data * next_idata, bool force_update) {
241 int err = 0;
242 if (!force_update) {
243 if (verbose) cout << "\t" << client_name
244 << "-read_index: getting index_data for " << key
245 << " from cache" << std::endl;
246 icache_lock.lock();
247 if (next_idata != NULL) {
248 err = icache.get(key, idata, next_idata);
249 } else {
250 err = icache.get(key, idata);
251 }
252 icache_lock.unlock();
253
254 if (err == 0) {
255 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
256 return err;
257 } else {
258 if (verbose) cout << "NOT IN CACHE" << std::endl;
259 }
260 }
261
262 if (verbose) cout << "\t" << client_name
263 << "-read_index: getting index_data for " << key
264 << " from object" << std::endl;
265 librados::ObjectReadOperation oro;
266 bufferlist raw_val;
267 std::set<std::string> key_set;
268 key_set.insert(key_data(key).encoded());
269 std::map<std::string, bufferlist> kvmap;
270 std::map<std::string, bufferlist> dupmap;
271 oro.omap_get_vals_by_keys(key_set, &dupmap, &err);
272 oro.omap_get_vals2(key_data(key).encoded(),
273 (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2),
274 &kvmap, nullptr, &err);
275 err = io_ctx.operate(index_name, &oro, NULL);
276 utime_t mytime = ceph_clock_now();
277 if (err < 0){
278 cerr << "\t" << client_name
279 << "-read_index: getting keys failed with "
280 << err << std::endl;
281 ceph_abort_msg(client_name + "-read_index: reading index failed");
282 return err;
283 }
284 kvmap.insert(dupmap.begin(), dupmap.end());
285 for (map<string, bufferlist>::iterator it = ++kvmap.begin();
286 it != kvmap.end();
287 ++it) {
288 bufferlist bl = it->second;
289 auto blit = bl.cbegin();
290 index_data this_idata;
291 this_idata.decode(blit);
292 if (this_idata.is_timed_out(mytime, timeout)) {
293 if (verbose) cout << client_name
294 << " THINKS THE OTHER CLIENT DIED. (mytime is "
295 << mytime.sec() << "." << mytime.usec() << ", idata.ts is "
296 << this_idata.ts.sec() << "." << this_idata.ts.usec()
297 << ", it has been " << (mytime - this_idata.ts).sec()
298 << '.' << (mytime - this_idata.ts).usec()
299 << ", timeout is " << timeout << ")" << std::endl;
300 //the client died after deleting the object. clean up.
301 if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) {
302 return -ESUICIDE;
303 }
304 return read_index(key, idata, next_idata, force_update);
305 }
306 std::scoped_lock l{icache_lock};
307 icache.push(this_idata);
308 }
309 auto b = kvmap.begin()->second.cbegin();
310 idata->decode(b);
311 idata->kdata.parse(kvmap.begin()->first);
312 if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is "
313 << kvmap.size()
314 << ", idata is " << idata->str() << std::endl;
315
316 ceph_assert(idata->obj != "");
317 icache_lock.lock();
318 icache.push(key, *idata);
319 icache_lock.unlock();
320
321 if (next_idata != NULL && idata->kdata.prefix != "1") {
322 next_idata->kdata.parse((++kvmap.begin())->first);
323 auto nb = (++kvmap.begin())->second.cbegin();
324 next_idata->decode(nb);
325 std::scoped_lock l{icache_lock};
326 icache.push(*next_idata);
327 }
328 return err;
329 }
330
331 int KvFlatBtreeAsync::split(const index_data &idata) {
332 int err = 0;
333 opmap['l']++;
334
335 if (idata.prefix != "") {
336 return -EPREFIX;
337 }
338
339 rebalance_args args;
340 args.bound = 2 * k - 1;
341 args.comparator = CEPH_OSD_CMPXATTR_OP_GT;
342 err = read_object(idata.obj, &args);
343 args.odata.max_kdata = idata.kdata;
344 if (err < 0) {
345 if (verbose) cout << "\t\t" << client_name << "-split: read object "
346 << args.odata.name
347 << " got " << err << std::endl;
348 return err;
349 }
350
351 if (verbose) cout << "\t\t" << client_name << "-split: splitting "
352 << idata.obj
353 << ", which has size " << args.odata.size
354 << " and actual size " << args.odata.omap.size() << std::endl;
355
356 ///////preparations that happen outside the critical section
357 //for prefix index
358 vector<object_data> to_create;
359 vector<object_data> to_delete;
360 to_delete.push_back(object_data(idata.min_kdata,
361 args.odata.max_kdata, args.odata.name, args.odata.version));
362
363 //for lower half object
364 map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
365 client_index_lock.lock();
366 to_create.push_back(object_data(to_string(client_name, client_index++)));
367 client_index_lock.unlock();
368 for (int i = 0; i < k; i++) {
369 to_create[0].omap.insert(*it);
370 ++it;
371 }
372 to_create[0].min_kdata = idata.min_kdata;
373 to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
374
375 //for upper half object
376 client_index_lock.lock();
377 to_create.push_back(object_data(to_create[0].max_kdata,
378 args.odata.max_kdata,
379 to_string(client_name, client_index++)));
380 client_index_lock.unlock();
381 to_create[1].omap.insert(
382 ++args.odata.omap.find(to_create[0].omap.rbegin()->first),
383 args.odata.omap.end());
384
385 //setting up operations
386 librados::ObjectWriteOperation owos[6];
387 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
388 index_data out_data;
389 set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err);
390 ops.push_back(make_pair(
391 pair<int, string>(ADD_PREFIX, index_name),
392 &owos[0]));
393 for (int i = 1; i < 6; i++) {
394 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
395 }
396 set_up_ops(to_create, to_delete, &ops, out_data, &err);
397
398 /////BEGIN CRITICAL SECTION/////
399 //put prefix on index entry for idata.val
400 err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops);
401 if (err < 0) {
402 return err;
403 }
404 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
405 << std::endl;
406 /////END CRITICAL SECTION/////
407 icache_lock.lock();
408 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
409 it != out_data.to_delete.end(); ++it) {
410 icache.erase(it->max);
411 }
412 for (vector<create_data>::iterator it = out_data.to_create.begin();
413 it != out_data.to_create.end(); ++it) {
414 icache.push(index_data(*it));
415 }
416 icache_lock.unlock();
417 return err;
418 }
419
420 int KvFlatBtreeAsync::rebalance(const index_data &idata1,
421 const index_data &next_idata){
422 opmap['m']++;
423 int err = 0;
424
425 if (idata1.prefix != "") {
426 return -EPREFIX;
427 }
428
429 rebalance_args args1;
430 args1.bound = k + 1;
431 args1.comparator = CEPH_OSD_CMPXATTR_OP_LT;
432 index_data idata2 = next_idata;
433
434 rebalance_args args2;
435 args2.bound = k + 1;
436 args2.comparator = CEPH_OSD_CMPXATTR_OP_LT;
437
438 if (idata1.kdata.prefix == "1") {
439 //this is the highest key in the index, so it doesn't have a next.
440
441 //read the index for the previous entry
442 err = prev(idata1, &idata2);
443 if (err == -ERANGE) {
444 if (verbose) cout << "\t\t" << client_name
445 << "-rebalance: this is the only node, "
446 << "so aborting" << std::endl;
447 return -EUCLEAN;
448 } else if (err < 0) {
449 return err;
450 }
451
452 //read the first object
453 err = read_object(idata1.obj, &args2);
454 if (err < 0) {
455 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
456 << std::endl;
457 if (err == -ENOENT) {
458 return -ECANCELED;
459 }
460 return err;
461 }
462 args2.odata.min_kdata = idata1.min_kdata;
463 args2.odata.max_kdata = idata1.kdata;
464
465 //read the second object
466 args1.bound = 2 * k + 1;
467 err = read_object(idata2.obj, &args1);
468 if (err < 0) {
469 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
470 << std::endl;
471 return err;
472 }
473 args1.odata.min_kdata = idata2.min_kdata;
474 args1.odata.max_kdata = idata2.kdata;
475
476 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
477 << idata2.obj
478 << ". size: " << args1.odata.size << " version: "
479 << args1.odata.version
480 << std::endl;
481 } else {
482 assert (next_idata.obj != "");
483 //there is a next key, so get it.
484 err = read_object(idata1.obj, &args1);
485 if (err < 0) {
486 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
487 << std::endl;
488 return err;
489 }
490 args1.odata.min_kdata = idata1.min_kdata;
491 args1.odata.max_kdata = idata1.kdata;
492
493 args2.bound = 2 * k + 1;
494 err = read_object(idata2.obj, &args2);
495 if (err < 0) {
496 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
497 << std::endl;
498 if (err == -ENOENT) {
499 return -ECANCELED;
500 }
501 return err;
502 }
503 args2.odata.min_kdata = idata2.min_kdata;
504 args2.odata.max_kdata = idata2.kdata;
505
506 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
507 << idata2.obj
508 << ". size: " << args2.odata.size << " version: "
509 << args2.odata.version
510 << std::endl;
511 }
512
513 if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is "
514 << args1.odata.max_kdata.encoded() << ","
515 << args1.odata.name << " with size " << args1.odata.size
516 << " , o2 is " << args2.odata.max_kdata.encoded()
517 << "," << args2.odata.name << " with size " << args2.odata.size
518 << std::endl;
519
520 //calculations
521 if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k
522 && (int)args2.odata.size > k
523 && (int)args2.odata.size <= 2*k) {
524 //nothing to do
525 if (verbose) cout << "\t\t" << client_name
526 << "-rebalance: both sizes in range, so"
527 << " aborting " << std::endl;
528 return -EBALANCE;
529 } else if (idata1.prefix != "" || idata2.prefix != "") {
530 return -EPREFIX;
531 }
532
533 //this is the high object. it gets created regardless of rebalance or merge.
534 client_index_lock.lock();
535 string o2w = to_string(client_name, client_index++);
536 client_index_lock.unlock();
537 index_data idata;
538 vector<object_data> to_create;
539 vector<object_data> to_delete;
540 librados::ObjectWriteOperation create[2];//possibly only 1 will be used
541 librados::ObjectWriteOperation other_ops[6];
542 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
543 ops.push_back(make_pair(
544 pair<int, string>(ADD_PREFIX, index_name),
545 &other_ops[0]));
546
547 if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) {
548 //merge
549 if (verbose) cout << "\t\t" << client_name << "-rebalance: merging "
550 << args1.odata.name
551 << " and " << args2.odata.name << " to get " << o2w
552 << std::endl;
553 map<string, bufferlist> write2_map;
554 write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end());
555 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
556 to_create.push_back(object_data(args1.odata.min_kdata,
557 args2.odata.max_kdata, o2w, write2_map));
558 ops.push_back(make_pair(
559 pair<int, string>(MAKE_OBJECT, o2w),
560 &create[0]));
561 ceph_assert((int)write2_map.size() <= 2*k);
562 } else {
563 //rebalance
564 if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing "
565 << args1.odata.name
566 << " and " << args2.odata.name << std::endl;
567 map<std::string, bufferlist> write1_map;
568 map<std::string, bufferlist> write2_map;
569 map<std::string, bufferlist>::iterator it;
570 client_index_lock.lock();
571 string o1w = to_string(client_name, client_index++);
572 client_index_lock.unlock();
573 int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
574 / 2.0);
575 if (args1.odata.max_kdata != idata1.kdata) {
576 //this should be true if idata1 is the high object
577 target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size)
578 / 2.0);
579 }
580 for (it = args1.odata.omap.begin();
581 it != args1.odata.omap.end() && (int)write1_map.size()
582 < target_size_1;
583 ++it) {
584 write1_map.insert(*it);
585 }
586 if (it != args1.odata.omap.end()){
587 //write1_map is full, so put the rest in write2_map
588 write2_map.insert(it, args1.odata.omap.end());
589 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
590 } else {
591 //args1.odata.omap was small, and write2_map still needs more
592 map<std::string, bufferlist>::iterator it2;
593 for(it2 = args2.odata.omap.begin();
594 (it2 != args2.odata.omap.end()) && ((int)write1_map.size()
595 < target_size_1);
596 ++it2) {
597 write1_map.insert(*it2);
598 }
599 write2_map.insert(it2, args2.odata.omap.end());
600 }
601 if (verbose) cout << "\t\t" << client_name
602 << "-rebalance: write1_map has size "
603 << write1_map.size() << ", write2_map.size() is " << write2_map.size()
604 << std::endl;
605 //at this point, write1_map and write2_map should have the correct pairs
606 to_create.push_back(object_data(args1.odata.min_kdata,
607 key_data(write1_map.rbegin()->first),
608 o1w,write1_map));
609 to_create.push_back(object_data( key_data(write1_map.rbegin()->first),
610 args2.odata.max_kdata, o2w, write2_map));
611 ops.push_back(make_pair(
612 pair<int, string>(MAKE_OBJECT, o1w),
613 &create[0]));
614 ops.push_back(make_pair(
615 pair<int, string>(MAKE_OBJECT, o2w),
616 &create[1]));
617 }
618
619 to_delete.push_back(object_data(args1.odata.min_kdata,
620 args1.odata.max_kdata, args1.odata.name, args1.odata.version));
621 to_delete.push_back(object_data(args2.odata.min_kdata,
622 args2.odata.max_kdata, args2.odata.name, args2.odata.version));
623 for (int i = 1; i < 6; i++) {
624 ops.push_back(make_pair(make_pair(0,""), &other_ops[i]));
625 }
626
627 index_data out_data;
628 set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err);
629 set_up_ops(to_create, to_delete, &ops, out_data, &err);
630
631 //at this point, all operations should be completely set up.
632 /////BEGIN CRITICAL SECTION/////
633 err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops);
634 if (err < 0) {
635 return err;
636 }
637 icache_lock.lock();
638 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
639 it != out_data.to_delete.end(); ++it) {
640 icache.erase(it->max);
641 }
642 for (vector<create_data>::iterator it = out_data.to_create.begin();
643 it != out_data.to_create.end(); ++it) {
644 icache.push(index_data(*it));
645 }
646 icache_lock.unlock();
647 if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
648 << std::endl;
649 /////END CRITICAL SECTION/////
650 return err;
651 }
652
653 int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) {
654 librados::ObjectReadOperation get_obj;
655 librados::AioCompletion * obj_aioc = rados.aio_create_completion();
656 int err;
657 bufferlist unw_bl;
658 odata->name = obj;
659 get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err);
660 get_obj.getxattr("unwritable", &unw_bl, &err);
661 io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL);
662 obj_aioc->wait_for_complete();
663 err = obj_aioc->get_return_value();
664 if (err < 0){
665 //possibly -ENOENT, meaning someone else deleted it.
666 obj_aioc->release();
667 return err;
668 }
669 odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1";
670 odata->version = obj_aioc->get_version64();
671 odata->size = odata->omap.size();
672 obj_aioc->release();
673 return 0;
674 }
675
676 int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) {
677 bufferlist inbl;
678 args->encode(inbl);
679 bufferlist outbl;
680 int err;
681 librados::AioCompletion * a = rados.aio_create_completion();
682 io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl);
683 a->wait_for_complete();
684 err = a->get_return_value();
685 if (err < 0) {
686 if (verbose) cout << "\t\t" << client_name
687 << "-read_object: reading failed with "
688 << err << std::endl;
689 a->release();
690 return err;
691 }
692 auto it = outbl.cbegin();
693 args->decode(it);
694 args->odata.name = obj;
695 args->odata.version = a->get_version64();
696 a->release();
697 return err;
698 }
699
700 void KvFlatBtreeAsync::set_up_prefix_index(
701 const vector<object_data> &to_create,
702 const vector<object_data> &to_delete,
703 librados::ObjectWriteOperation * owo,
704 index_data * idata,
705 int * err) {
706 std::map<std::string, pair<bufferlist, int> > assertions;
707 map<string, bufferlist> to_insert;
708 idata->prefix = "1";
709 idata->ts = ceph_clock_now();
710 for(vector<object_data>::const_iterator it = to_create.begin();
711 it != to_create.end();
712 ++it) {
713 create_data c(it->min_kdata, it->max_kdata, it->name);
714 idata->to_create.push_back(c);
715 }
716 for(vector<object_data>::const_iterator it = to_delete.begin();
717 it != to_delete.end();
718 ++it) {
719 delete_data d(it->min_kdata, it->max_kdata, it->name, it->version);
720 idata->to_delete.push_back(d);
721 }
722 for(vector<object_data>::const_iterator it = to_delete.begin();
723 it != to_delete.end();
724 ++it) {
725 idata->obj = it->name;
726 idata->min_kdata = it->min_kdata;
727 idata->kdata = it->max_kdata;
728 bufferlist insert;
729 idata->encode(insert);
730 to_insert[it->max_kdata.encoded()] = insert;
731 index_data this_entry;
732 this_entry.min_kdata = idata->min_kdata;
733 this_entry.kdata = idata->kdata;
734 this_entry.obj = idata->obj;
735 assertions[it->max_kdata.encoded()] = pair<bufferlist, int>
736 (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
737 if (verbose) cout << "\t\t\t" << client_name
738 << "-setup_prefix: will assert "
739 << this_entry.str() << std::endl;
740 }
741 ceph_assert(*err == 0);
742 owo->omap_cmp(assertions, err);
743 if (to_create.size() <= 2) {
744 owo->omap_set(to_insert);
745 }
746 }
747
748 //some args can be null if there are no corresponding entries in p
749 void KvFlatBtreeAsync::set_up_ops(
750 const vector<object_data> &create_vector,
751 const vector<object_data> &delete_vector,
752 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops,
753 const index_data &idata,
754 int * err) {
755 vector<pair<pair<int, string>,
756 librados::ObjectWriteOperation* > >::iterator it;
757
758 //skip the prefixing part
759 for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {}
760 map<string, bufferlist> to_insert;
761 std::set<string> to_remove;
762 map<string, pair<bufferlist, int> > assertions;
763 if (create_vector.size() > 0) {
764 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
765 it->first = pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj);
766 set_up_unwrite_object(delete_vector[i].version, it->second);
767 ++it;
768 }
769 }
770 for (int i = 0; i < (int)idata.to_create.size(); ++i) {
771 index_data this_entry(idata.to_create[i].max, idata.to_create[i].min,
772 idata.to_create[i].obj);
773 to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry);
774 if (idata.to_create.size() <= 2) {
775 it->first = pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj);
776 } else {
777 it->first = pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj);
778 }
779 set_up_make_object(create_vector[i].omap, it->second);
780 ++it;
781 }
782 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
783 index_data this_entry = idata;
784 this_entry.obj = idata.to_delete[i].obj;
785 this_entry.min_kdata = idata.to_delete[i].min;
786 this_entry.kdata = idata.to_delete[i].max;
787 if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert "
788 << this_entry.str() << std::endl;
789 assertions[idata.to_delete[i].max.encoded()] = pair<bufferlist, int>(
790 to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
791 to_remove.insert(idata.to_delete[i].max.encoded());
792 it->first = pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj);
793 set_up_delete_object(it->second);
794 ++it;
795 }
796 if ((int)idata.to_create.size() <= 2) {
797 it->second->omap_cmp(assertions, err);
798 }
799 it->second->omap_rm_keys(to_remove);
800 it->second->omap_set(to_insert);
801
802
803 it->first = pair<int, string>(REMOVE_PREFIX, index_name);
804 }
805
806 void KvFlatBtreeAsync::set_up_make_object(
807 const map<std::string, bufferlist> &to_set,
808 librados::ObjectWriteOperation *owo) {
809 bufferlist inbl;
810 encode(to_set, inbl);
811 owo->exec("kvs", "create_with_omap", inbl);
812 }
813
814 void KvFlatBtreeAsync::set_up_unwrite_object(
815 const int &ver, librados::ObjectWriteOperation *owo) {
816 if (ver > 0) {
817 owo->assert_version(ver);
818 }
819 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0"));
820 owo->setxattr("unwritable", to_bl("1"));
821 }
822
823 void KvFlatBtreeAsync::set_up_restore_object(
824 librados::ObjectWriteOperation *owo) {
825 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
826 owo->setxattr("unwritable", to_bl("0"));
827 }
828
829 void KvFlatBtreeAsync::set_up_delete_object(
830 librados::ObjectWriteOperation *owo) {
831 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
832 owo->remove();
833 }
834
835 int KvFlatBtreeAsync::perform_ops(const string &debug_prefix,
836 const index_data &idata,
837 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > *ops) {
838 int err = 0;
839 vector<librados::AioCompletion*> aiocs(idata.to_create.size());
840 int count = 0;
841 for (vector<pair<pair<int, string>,
842 librados::ObjectWriteOperation*> >::iterator it = ops->begin();
843 it != ops->end(); ++it) {
844 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
845 return -ESUICIDE;
846 }
847 switch (it->first.first) {
848 case ADD_PREFIX://prefixing
849 if (verbose) cout << debug_prefix << " adding prefix" << std::endl;
850 err = io_ctx.operate(index_name, it->second);
851 if (err < 0) {
852 if (verbose) cout << debug_prefix << " prefixing the index failed with "
853 << err << std::endl;
854 return -EPREFIX;
855 }
856 if (verbose) cout << debug_prefix << " prefix added." << std::endl;
857 break;
858 case UNWRITE_OBJECT://marking
859 if (verbose) cout << debug_prefix << " marking " << it->first.second
860 << std::endl;
861 err = io_ctx.operate(it->first.second, it->second);
862 if (err < 0) {
863 //most likely because it changed, in which case it will be -ERANGE
864 if (verbose) cout << debug_prefix << " marking " << it->first.second
865 << "failed with code" << err << std::endl;
866 if (it->first.second == (*idata.to_delete.begin()).max.encoded()) {
867 if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) {
868 return -ESUICIDE;
869 }
870 } else {
871 if (cleanup(idata, -ERANGE) == -ESUICIDE) {
872 return -ESUICIDE;
873 }
874 }
875 return err;
876 }
877 if (verbose) cout << debug_prefix << " marked " << it->first.second
878 << std::endl;
879 break;
880 case MAKE_OBJECT://creating
881 if (verbose) cout << debug_prefix << " creating " << it->first.second
882 << std::endl;
883 err = io_ctx.operate(it->first.second, it->second);
884 if (err < 0) {
885 //this can happen if someone else was cleaning up after us.
886 if (verbose) cout << debug_prefix << " creating " << it->first.second
887 << " failed"
888 << " with code " << err << std::endl;
889 if (err == -EEXIST) {
890 //someone thinks we died, so die
891 if (verbose) cout << client_name << " is suiciding!" << std::endl;
892 return -ESUICIDE;
893 } else {
894 ceph_abort();
895 }
896 return err;
897 }
898 if (verbose || idata.to_create.size() > 2) {
899 cout << debug_prefix << " created object " << it->first.second
900 << std::endl;
901 }
902 break;
903 case AIO_MAKE_OBJECT:
904 cout << debug_prefix << " launching asynchronous create "
905 << it->first.second << std::endl;
906 aiocs[count] = rados.aio_create_completion();
907 io_ctx.aio_operate(it->first.second, aiocs[count], it->second);
908 count++;
909 if ((int)idata.to_create.size() == count) {
910 cout << "starting aiowrite waiting loop" << std::endl;
911 for (count -= 1; count >= 0; count--) {
912 aiocs[count]->wait_for_complete();
913 err = aiocs[count]->get_return_value();
914 if (err < 0) {
915 //this can happen if someone else was cleaning up after us.
916 cerr << debug_prefix << " a create failed"
917 << " with code " << err << std::endl;
918 if (err == -EEXIST) {
919 //someone thinks we died, so die
920 cerr << client_name << " is suiciding!" << std::endl;
921 return -ESUICIDE;
922 } else {
923 ceph_abort();
924 }
925 return err;
926 }
927 if (verbose || idata.to_create.size() > 2) {
928 cout << debug_prefix << " completed aio " << aiocs.size() - count
929 << "/" << aiocs.size() << std::endl;
930 }
931 }
932 }
933 break;
934 case REMOVE_OBJECT://deleting
935 if (verbose) cout << debug_prefix << " deleting " << it->first.second
936 << std::endl;
937 err = io_ctx.operate(it->first.second, it->second);
938 if (err < 0) {
939 //if someone else called cleanup on this prefix first
940 if (verbose) cout << debug_prefix << " deleting " << it->first.second
941 << "failed with code" << err << std::endl;
942 }
943 if (verbose) cout << debug_prefix << " deleted " << it->first.second
944 << std::endl;
945 break;
946 case REMOVE_PREFIX://rewriting index
947 if (verbose) cout << debug_prefix << " updating index " << std::endl;
948 err = io_ctx.operate(index_name, it->second);
949 if (err < 0) {
950 if (verbose) cout << debug_prefix
951 << " rewriting the index failed with code " << err
952 << ". someone else must have thought we died, so dying" << std::endl;
953 return -ETIMEDOUT;
954 }
955 if (verbose) cout << debug_prefix << " updated index." << std::endl;
956 break;
957 case RESTORE_OBJECT:
958 if (verbose) cout << debug_prefix << " restoring " << it->first.second
959 << std::endl;
960 err = io_ctx.operate(it->first.second, it->second);
961 if (err < 0) {
962 if (verbose) cout << debug_prefix << "restoring " << it->first.second
963 << " failed"
964 << " with " << err << std::endl;
965 return err;
966 }
967 if (verbose) cout << debug_prefix << " restored " << it->first.second
968 << std::endl;
969 break;
970 default:
971 if (verbose) cout << debug_prefix << " performing unknown op on "
972 << it->first.second
973 << std::endl;
974 err = io_ctx.operate(index_name, it->second);
975 if (err < 0) {
976 if (verbose) cout << debug_prefix << " unknown op on "
977 << it->first.second
978 << " failed with " << err << std::endl;
979 return err;
980 }
981 if (verbose) cout << debug_prefix << " unknown op on "
982 << it->first.second
983 << " succeeded." << std::endl;
984 break;
985 }
986 }
987
988 return err;
989 }
990
991 int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) {
992 if (verbose) cout << "\t\t" << client_name << ": cleaning up after "
993 << idata.str()
994 << std::endl;
995 int err = 0;
996 ceph_assert(idata.prefix != "");
997 map<std::string,bufferlist> new_index;
998 map<std::string, pair<bufferlist, int> > assertions;
999 switch (error) {
1000 case -EFIRSTOBJ: {
1001 //this happens if the split or rebalance failed to mark the first object,
1002 //meaning only the index needs to be changed.
1003 //restore objects that had been marked unwritable.
1004 for(vector<delete_data >::const_iterator it =
1005 idata.to_delete.begin();
1006 it != idata.to_delete.end(); ++it) {
1007 index_data this_entry;
1008 this_entry.obj = (*it).obj;
1009 this_entry.min_kdata = it->min;
1010 this_entry.kdata = it->max;
1011 new_index[it->max.encoded()] = to_bl(this_entry);
1012 this_entry = idata;
1013 this_entry.obj = it->obj;
1014 this_entry.min_kdata = it->min;
1015 this_entry.kdata = it->max;
1016 if (verbose) cout << "\t\t\t" << client_name
1017 << "-cleanup: will assert index contains "
1018 << this_entry.str() << std::endl;
1019 assertions[it->max.encoded()] =
1020 pair<bufferlist, int>(to_bl(this_entry),
1021 CEPH_OSD_CMPXATTR_OP_EQ);
1022 }
1023
1024 //update the index
1025 librados::ObjectWriteOperation update_index;
1026 update_index.omap_cmp(assertions, &err);
1027 update_index.omap_set(new_index);
1028 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1029 << std::endl;
1030 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1031 return -ESUICIDE;
1032 }
1033 err = io_ctx.operate(index_name, &update_index);
1034 if (err < 0) {
1035 if (verbose) cout << "\t\t\t" << client_name
1036 << "-cleanup: rewriting failed with "
1037 << err << ". returning -ECANCELED" << std::endl;
1038 return -ECANCELED;
1039 }
1040 if (verbose) cout << "\t\t\t" << client_name
1041 << "-cleanup: updated index. cleanup done."
1042 << std::endl;
1043 break;
1044 }
1045 case -ERANGE: {
1046 //this happens if a split or rebalance fails to mark an object. It is a
1047 //special case of rolling back that does not have to deal with new objects.
1048
1049 //restore objects that had been marked unwritable.
1050 vector<delete_data >::const_iterator it;
1051 for(it = idata.to_delete.begin();
1052 it != idata.to_delete.end(); ++it) {
1053 index_data this_entry;
1054 this_entry.obj = (*it).obj;
1055 this_entry.min_kdata = it->min;
1056 this_entry.kdata = it->max;
1057 new_index[it->max.encoded()] = to_bl(this_entry);
1058 this_entry = idata;
1059 this_entry.obj = it->obj;
1060 this_entry.min_kdata = it->min;
1061 this_entry.kdata = it->max;
1062 if (verbose) cout << "\t\t\t" << client_name
1063 << "-cleanup: will assert index contains "
1064 << this_entry.str() << std::endl;
1065 assertions[it->max.encoded()] =
1066 pair<bufferlist, int>(to_bl(this_entry),
1067 CEPH_OSD_CMPXATTR_OP_EQ);
1068 }
1069 it = idata.to_delete.begin();
1070 librados::ObjectWriteOperation restore;
1071 set_up_restore_object(&restore);
1072 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1073 return -ESUICIDE;
1074 }
1075 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1076 << it->obj
1077 << std::endl;
1078 err = io_ctx.operate(it->obj, &restore);
1079 if (err < 0) {
1080 //i.e., -ECANCELED because the object was already restored by someone
1081 //else
1082 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1083 << it->obj
1084 << " failed with " << err << std::endl;
1085 } else {
1086 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1087 << it->obj
1088 << std::endl;
1089 }
1090
1091 //update the index
1092 librados::ObjectWriteOperation update_index;
1093 update_index.omap_cmp(assertions, &err);
1094 update_index.omap_set(new_index);
1095 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1096 << std::endl;
1097 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1098 return -ESUICIDE;
1099 }
1100 err = io_ctx.operate(index_name, &update_index);
1101 if (err < 0) {
1102 if (verbose) cout << "\t\t\t" << client_name
1103 << "-cleanup: rewriting failed with "
1104 << err << ". returning -ECANCELED" << std::endl;
1105 return -ECANCELED;
1106 }
1107 if (verbose) cout << "\t\t\t" << client_name
1108 << "-cleanup: updated index. cleanup done."
1109 << std::endl;
1110 break;
1111 }
1112 case -ENOENT: {
1113 if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward"
1114 << std::endl;
1115 //all changes were created except for updating the index and possibly
1116 //deleting the objects. roll forward.
1117 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1118 vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1);
1119 for (int i = 0; i <= (int)idata.to_delete.size(); ++i) {
1120 ops.push_back(make_pair(pair<int, string>(0, ""), &owos[i]));
1121 }
1122 set_up_ops(vector<object_data>(),
1123 vector<object_data>(), &ops, idata, &err);
1124 err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops);
1125 if (err < 0) {
1126 if (err == -ESUICIDE) {
1127 return -ESUICIDE;
1128 }
1129 if (verbose) cout << "\t\t\t" << client_name
1130 << "-cleanup: rewriting failed with "
1131 << err << ". returning -ECANCELED" << std::endl;
1132 return -ECANCELED;
1133 }
1134 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index"
1135 << std::endl;
1136 break;
1137 }
1138 default: {
1139 //roll back all changes.
1140 if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back"
1141 << std::endl;
1142 map<std::string,bufferlist> new_index;
1143 std::set<string> to_remove;
1144 map<std::string, pair<bufferlist, int> > assertions;
1145
1146 //mark the objects to be created. if someone else already has, die.
1147 for(vector<create_data >::const_reverse_iterator it =
1148 idata.to_create.rbegin();
1149 it != idata.to_create.rend(); ++it) {
1150 librados::ObjectWriteOperation rm;
1151 set_up_unwrite_object(0, &rm);
1152 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1153 {
1154 return -ESUICIDE;
1155 }
1156 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1157 << it->obj
1158 << std::endl;
1159 err = io_ctx.operate(it->obj, &rm);
1160 if (err < 0) {
1161 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1162 << it->obj
1163 << " failed with " << err << std::endl;
1164 } else {
1165 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked "
1166 << it->obj
1167 << std::endl;
1168 }
1169 }
1170
1171 //restore objects that had been marked unwritable.
1172 for(vector<delete_data >::const_iterator it =
1173 idata.to_delete.begin();
1174 it != idata.to_delete.end(); ++it) {
1175 index_data this_entry;
1176 this_entry.obj = (*it).obj;
1177 this_entry.min_kdata = it->min;
1178 this_entry.kdata = it->max;
1179 new_index[it->max.encoded()] = to_bl(this_entry);
1180 this_entry = idata;
1181 this_entry.obj = it->obj;
1182 this_entry.min_kdata = it->min;
1183 this_entry.kdata = it->max;
1184 if (verbose) cout << "\t\t\t" << client_name
1185 << "-cleanup: will assert index contains "
1186 << this_entry.str() << std::endl;
1187 assertions[it->max.encoded()] =
1188 pair<bufferlist, int>(to_bl(this_entry),
1189 CEPH_OSD_CMPXATTR_OP_EQ);
1190 librados::ObjectWriteOperation restore;
1191 set_up_restore_object(&restore);
1192 if (verbose) cout << "\t\t\t" << client_name
1193 << "-cleanup: will assert index contains "
1194 << this_entry.str() << std::endl;
1195 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1196 {
1197 return -ESUICIDE;
1198 }
1199 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1200 << it->obj
1201 << std::endl;
1202 err = io_ctx.operate(it->obj, &restore);
1203 if (err == -ENOENT) {
1204 //it had gotten far enough to be rolled forward - unmark the objects
1205 //and roll forward.
1206 if (verbose) cout << "\t\t\t" << client_name
1207 << "-cleanup: roll forward instead"
1208 << std::endl;
1209 for(vector<create_data >::const_iterator cit =
1210 idata.to_create.begin();
1211 cit != idata.to_create.end(); ++cit) {
1212 librados::ObjectWriteOperation res;
1213 set_up_restore_object(&res);
1214 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1215 == 1 ) {
1216 return -ECANCELED;
1217 }
1218 if (verbose) cout << "\t\t\t" << client_name
1219 << "-cleanup: restoring " << cit->obj
1220 << std::endl;
1221 err = io_ctx.operate(cit->obj, &res);
1222 if (err < 0) {
1223 if (verbose) cout << "\t\t\t" << client_name
1224 << "-cleanup: restoring "
1225 << cit->obj << " failed with " << err << std::endl;
1226 }
1227 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1228 << cit->obj
1229 << std::endl;
1230 }
1231 return cleanup(idata, -ENOENT);
1232 } else if (err < 0) {
1233 //i.e., -ECANCELED because the object was already restored by someone
1234 //else
1235 if (verbose) cout << "\t\t\t" << client_name
1236 << "-cleanup: restoring " << it->obj
1237 << " failed with " << err << std::endl;
1238 } else {
1239 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1240 << it->obj
1241 << std::endl;
1242 }
1243 }
1244
1245 //remove the new objects
1246 for(vector<create_data >::const_reverse_iterator it =
1247 idata.to_create.rbegin();
1248 it != idata.to_create.rend(); ++it) {
1249 to_remove.insert(it->max.encoded());
1250 librados::ObjectWriteOperation rm;
1251 rm.remove();
1252 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1253 {
1254 return -ESUICIDE;
1255 }
1256 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing "
1257 << it->obj
1258 << std::endl;
1259 err = io_ctx.operate(it->obj, &rm);
1260 if (err < 0) {
1261 if (verbose) cout << "\t\t\t" << client_name
1262 << "-cleanup: failed to remove "
1263 << it->obj << std::endl;
1264 } else {
1265 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed "
1266 << it->obj
1267 << std::endl;
1268 }
1269 }
1270
1271 //update the index
1272 librados::ObjectWriteOperation update_index;
1273 update_index.omap_cmp(assertions, &err);
1274 update_index.omap_rm_keys(to_remove);
1275 update_index.omap_set(new_index);
1276 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1277 << std::endl;
1278 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1279 return -ESUICIDE;
1280 }
1281 err = io_ctx.operate(index_name, &update_index);
1282 if (err < 0) {
1283 if (verbose) cout << "\t\t\t" << client_name
1284 << "-cleanup: rewriting failed with "
1285 << err << ". returning -ECANCELED" << std::endl;
1286 return -ECANCELED;
1287 }
1288 if (verbose) cout << "\t\t\t" << client_name
1289 << "-cleanup: updated index. cleanup done."
1290 << std::endl;
1291 break;
1292 }
1293 }
1294 return err;
1295 }
1296
1297 string KvFlatBtreeAsync::to_string(string s, int i) {
1298 stringstream ret;
1299 ret << s << i;
1300 return ret.str();
1301 }
1302
1303 string KvFlatBtreeAsync::get_name() {
1304 return rados_id;
1305 }
1306
1307 void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) {
1308 interrupt = inject;
1309 wait_ms = wait_time;
1310 }
1311
1312 int KvFlatBtreeAsync::setup(int argc, const char** argv) {
1313 int r = rados.init(rados_id.c_str());
1314 if (r < 0) {
1315 cerr << "error during init" << r << std::endl;
1316 return r;
1317 }
1318 r = rados.conf_parse_argv(argc, argv);
1319 if (r < 0) {
1320 cerr << "error during parsing args" << r << std::endl;
1321 return r;
1322 }
1323 r = rados.conf_parse_env(NULL);
1324 if (r < 0) {
1325 cerr << "error during parsing env" << r << std::endl;
1326 return r;
1327 }
1328 r = rados.conf_read_file(NULL);
1329 if (r < 0) {
1330 cerr << "error during read file: " << r << std::endl;
1331 return r;
1332 }
1333 r = rados.connect();
1334 if (r < 0) {
1335 cerr << "error during connect: " << r << std::endl;
1336 return r;
1337 }
1338 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
1339 if (r < 0) {
1340 cerr << "error creating io ctx: " << r << std::endl;
1341 rados.shutdown();
1342 return r;
1343 }
1344
1345 librados::ObjectWriteOperation make_index;
1346 make_index.create(true);
1347 map<std::string,bufferlist> index_map;
1348 index_data idata;
1349 idata.obj = client_name;
1350 idata.min_kdata.raw_key = "";
1351 idata.kdata = key_data("");
1352 index_map["1"] = to_bl(idata);
1353 make_index.omap_set(index_map);
1354 r = io_ctx.operate(index_name, &make_index);
1355 if (r < 0) {
1356 if (verbose) cout << client_name << ": Making the index failed with code "
1357 << r
1358 << std::endl;
1359 return 0;
1360 }
1361 if (verbose) cout << client_name << ": created index object" << std::endl;
1362
1363 librados::ObjectWriteOperation make_max_obj;
1364 make_max_obj.create(true);
1365 make_max_obj.setxattr("unwritable", to_bl("0"));
1366 make_max_obj.setxattr("size", to_bl("0"));
1367 r = io_ctx.operate(client_name, &make_max_obj);
1368 if (r < 0) {
1369 if (verbose) cout << client_name << ": Setting xattr failed with code "
1370 << r
1371 << std::endl;
1372 }
1373
1374 return 0;
1375 }
1376
1377 int KvFlatBtreeAsync::set(const string &key, const bufferlist &val,
1378 bool update_on_existing) {
1379 if (verbose) cout << client_name << " is "
1380 << (update_on_existing? "updating " : "setting ")
1381 << key << std::endl;
1382 int err = 0;
1383 utime_t mytime;
1384 index_data idata(key);
1385
1386 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1387 err = read_index(key, &idata, NULL, false);
1388 if (err < 0) {
1389 if (verbose) cout << "\t" << client_name
1390 << ": getting oid failed with code "
1391 << err << std::endl;
1392 return err;
1393 }
1394 if (verbose) cout << "\t" << client_name << ": index data is " << idata.str()
1395 << ", object is " << idata.obj << std::endl;
1396
1397 err = set_op(key, val, update_on_existing, idata);
1398
1399 if (verbose) cout << "\t" << client_name << ": finished set with " << err
1400 << std::endl;
1401 return err;
1402 }
1403
1404 int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val,
1405 bool update_on_existing, index_data &idata) {
1406 //write
1407
1408 bufferlist inbl;
1409 omap_set_args args;
1410 args.bound = 2 * k;
1411 args.exclusive = !update_on_existing;
1412 args.omap[key] = val;
1413 args.encode(inbl);
1414
1415 librados::ObjectWriteOperation owo;
1416 owo.exec("kvs", "omap_insert", inbl);
1417 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1418 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1419 return -ESUICIDE;
1420 }
1421 if (verbose) cout << "\t" << client_name << ": inserting " << key
1422 << " into object "
1423 << idata.obj << std::endl;
1424 int err = io_ctx.operate(idata.obj, &owo);
1425 if (err < 0) {
1426 switch (err) {
1427 case -EEXIST: {
1428 //the key already exists and this is an exclusive insert.
1429 cerr << "\t" << client_name << ": writing key failed with "
1430 << err << std::endl;
1431 return err;
1432 }
1433 case -EKEYREJECTED: {
1434 //the object needs to be split.
1435 do {
1436 if (verbose) cout << "\t" << client_name << ": running split on "
1437 << idata.obj
1438 << std::endl;
1439 err = read_index(key, &idata, NULL, true);
1440 if (err < 0) {
1441 if (verbose) cout << "\t" << client_name
1442 << ": getting oid failed with code "
1443 << err << std::endl;
1444 return err;
1445 }
1446 err = split(idata);
1447 if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1448 if (verbose) cerr << "\t" << client_name << ": split failed with "
1449 << err << std::endl;
1450 int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL);
1451 switch (ret) {
1452 case -ESUICIDE:
1453 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1454 return ret;
1455 case 1:
1456 return set_op(key, val, update_on_existing, idata);
1457 case 2:
1458 return err;
1459 }
1460 }
1461 } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1462 err = read_index(key, &idata, NULL, true);
1463 if (err < 0) {
1464 if (verbose) cout << "\t" << client_name
1465 << ": getting oid failed with code "
1466 << err << std::endl;
1467 return err;
1468 }
1469 return set_op(key, val, update_on_existing, idata);
1470 }
1471 default:
1472 if (verbose) cerr << "\t" << client_name << ": writing obj failed with "
1473 << err << std::endl;
1474 if (err == -ENOENT || err == -EACCES) {
1475 if (err == -ENOENT) {
1476 if (verbose) cout << "CACHE FAILURE" << std::endl;
1477 }
1478 err = read_index(key, &idata, NULL, true);
1479 if (err < 0) {
1480 if (verbose) cout << "\t" << client_name
1481 << ": getting oid failed with code "
1482 << err << std::endl;
1483 return err;
1484 }
1485 if (verbose) cout << "\t" << client_name << ": index data is "
1486 << idata.str()
1487 << ", object is " << idata.obj << std::endl;
1488 return set_op(key, val, update_on_existing, idata);
1489 } else {
1490 return err;
1491 }
1492 }
1493 }
1494 return 0;
1495 }
1496
1497 int KvFlatBtreeAsync::remove(const string &key) {
1498 if (verbose) cout << client_name << ": removing " << key << std::endl;
1499 int err = 0;
1500 string obj;
1501 utime_t mytime;
1502 index_data idata;
1503 index_data next_idata;
1504
1505 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1506 err = read_index(key, &idata, &next_idata, false);
1507 if (err < 0) {
1508 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1509 return err;
1510 }
1511 obj = idata.obj;
1512 if (verbose) cout << "\t" << client_name << ": idata is " << idata.str()
1513 << ", next_idata is " << next_idata.str()
1514 << ", obj is " << obj << std::endl;
1515
1516 err = remove_op(key, idata, next_idata);
1517
1518 if (verbose) cout << "\t" << client_name << ": finished remove with " << err
1519 << " and exiting" << std::endl;
1520 return err;
1521 }
1522
1523 int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata,
1524 index_data &next_idata) {
1525 //write
1526 bufferlist inbl;
1527 omap_rm_args args;
1528 args.bound = k;
1529 args.omap.insert(key);
1530 args.encode(inbl);
1531
1532 librados::ObjectWriteOperation owo;
1533 owo.exec("kvs", "omap_remove", inbl);
1534 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1535 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1536 return -ESUICIDE;
1537 }
1538 if (verbose) cout << "\t" << client_name << ": removing " << key << " from "
1539 << idata.obj
1540 << std::endl;
1541 int err = io_ctx.operate(idata.obj, &owo);
1542 if (err < 0) {
1543 if (verbose) cout << "\t" << client_name << ": writing obj failed with "
1544 << err << std::endl;
1545 switch (err) {
1546 case -ENODATA: {
1547 //the key does not exist in the object
1548 return err;
1549 }
1550 case -EKEYREJECTED: {
1551 //the object needs to be split.
1552 do {
1553 if (verbose) cerr << "\t" << client_name << ": running rebalance on "
1554 << idata.obj << std::endl;
1555 err = read_index(key, &idata, &next_idata, true);
1556 if (err < 0) {
1557 if (verbose) cout << "\t" << client_name
1558 << ": getting oid failed with code "
1559 << err << std::endl;
1560 return err;
1561 }
1562 err = rebalance(idata, next_idata);
1563 if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1564 if (verbose) cerr << "\t" << client_name << ": rebalance returned "
1565 << err << std::endl;
1566 int ret = handle_set_rm_errors(err, idata.obj, key, &idata,
1567 &next_idata);
1568 switch (ret) {
1569 case -ESUICIDE:
1570 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1571 return err;
1572 case 1:
1573 return remove_op(key, idata, next_idata);
1574 case 2:
1575 return err;
1576 break;
1577 case -EUCLEAN:
1578 //this is the only node, so it's ok to go below k.
1579 librados::ObjectWriteOperation owo;
1580 bufferlist inbl;
1581 omap_rm_args args;
1582 args.bound = 0;
1583 args.omap.insert(key);
1584 args.encode(inbl);
1585 owo.exec("kvs", "omap_remove", inbl);
1586 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1587 == 1 ) {
1588 if (verbose) cout << client_name << " IS SUICIDING!"
1589 << std::endl;
1590 return -ESUICIDE;
1591 }
1592 if (verbose) cout << "\t" << client_name << ": removing " << key
1593 << " from "
1594 << idata.obj
1595 << std::endl;
1596 int err = io_ctx.operate(idata.obj, &owo);
1597 if (err == 0) {
1598 return 0;
1599 }
1600 }
1601 }
1602 } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1603 err = read_index(key, &idata, &next_idata, true);
1604 if (err < 0) {
1605 if (verbose) cout << "\t" << client_name
1606 << ": getting oid failed with code "
1607 << err << std::endl;
1608 return err;
1609 }
1610 return remove(key);
1611 }
1612 default:
1613 if (err == -ENOENT || err == -EACCES) {
1614 err = read_index(key, &idata, &next_idata, true);
1615 if (err < 0) {
1616 if (verbose) cout << "\t" << client_name
1617 << ": getting oid failed with code "
1618 << err << std::endl;
1619 return err;
1620 }
1621 if (verbose) cout << "\t" << client_name << ": index data is "
1622 << idata.str()
1623 << ", object is " << idata.obj << std::endl;
1624 //idea: we read the time every time we read the index anyway - store it.
1625 return remove_op(key, idata, next_idata);
1626 } else {
1627 return err;
1628 }
1629 }
1630 }
1631 return 0;
1632 }
1633
1634 int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj,
1635 string key,
1636 index_data * idata, index_data * next_idata) {
1637 if (err == -ESUICIDE) {
1638 return err;
1639 } else if (err == -ECANCELED //if an object was unwritable or index changed
1640 || err == -EPREFIX //if there is currently a prefix
1641 || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup
1642 || err == -EACCES) //possible if we were acting on old index data
1643 {
1644 err = read_index(key, idata, next_idata, true);
1645 if (err < 0) {
1646 return err;
1647 }
1648 if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str()
1649 << std::endl;
1650 if (idata->obj != obj) {
1651 //someone else has split or cleaned up or something. start over.
1652 return 1;//meaning repeat
1653 }
1654 } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES
1655 && err != -EUCLEAN){
1656 if (verbose) cout << "\t" << client_name
1657 << ": split encountered an unexpected error: " << err
1658 << std::endl;
1659 return 2;
1660 }
1661 return err;
1662 }
1663
1664 int KvFlatBtreeAsync::get(const string &key, bufferlist *val) {
1665 opmap['g']++;
1666 if (verbose) cout << client_name << ": getting " << key << std::endl;
1667 int err = 0;
1668 index_data idata;
1669 utime_t mytime;
1670
1671 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1672 return -ESUICIDE;
1673 }
1674 err = read_index(key, &idata, NULL, false);
1675 mytime = ceph_clock_now();
1676 if (err < 0) {
1677 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1678 return err;
1679 }
1680
1681 err = get_op(key, val, idata);
1682
1683 if (verbose) cout << client_name << ": got " << key << " with " << err
1684 << std::endl;
1685
1686 return err;
1687 }
1688
1689 int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val,
1690 index_data &idata) {
1691 int err = 0;
1692 std::set<std::string> key_set;
1693 key_set.insert(key);
1694 map<std::string,bufferlist> omap;
1695 librados::ObjectReadOperation read;
1696 read.omap_get_vals_by_keys(key_set, &omap, &err);
1697 err = io_ctx.operate(idata.obj, &read, NULL);
1698 if (err < 0) {
1699 if (err == -ENOENT) {
1700 err = read_index(key, &idata, NULL, true);
1701 if (err < 0) {
1702 if (verbose) cout << "\t" << client_name
1703 << ": getting oid failed with code "
1704 << err << std::endl;
1705 return err;
1706 }
1707 if (verbose) cout << "\t" << client_name << ": index data is "
1708 << idata.str()
1709 << ", object is " << idata.obj << std::endl;
1710 return get_op(key, val, idata);
1711 } else {
1712 if (verbose) cout << client_name
1713 << ": get encountered an unexpected error: " << err
1714 << std::endl;
1715 return err;
1716 }
1717 }
1718
1719 *val = omap[key];
1720 return err;
1721 }
1722
1723 void *KvFlatBtreeAsync::pset(void *ptr) {
1724 struct aio_set_args *args = (struct aio_set_args *)ptr;
1725 *args->err =
1726 args->kvba->KvFlatBtreeAsync::set((string)args->key,
1727 (bufferlist)args->val, (bool)args->exc);
1728 args->cb(args->err, args->cb_args);
1729 delete args;
1730 return NULL;
1731 }
1732
1733 void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val,
1734 bool exclusive, callback cb, void * cb_args, int * err) {
1735 aio_set_args *args = new aio_set_args();
1736 args->kvba = this;
1737 args->key = key;
1738 args->val = val;
1739 args->exc = exclusive;
1740 args->cb = cb;
1741 args->cb_args = cb_args;
1742 args->err = err;
1743 pthread_t t;
1744 int r = pthread_create(&t, NULL, pset, (void*)args);
1745 if (r < 0) {
1746 *args->err = r;
1747 return;
1748 }
1749 pthread_detach(t);
1750 }
1751
1752 void *KvFlatBtreeAsync::prm(void *ptr) {
1753 struct aio_rm_args *args = (struct aio_rm_args *)ptr;
1754 *args->err =
1755 args->kvba->KvFlatBtreeAsync::remove((string)args->key);
1756 args->cb(args->err, args->cb_args);
1757 delete args;
1758 return NULL;
1759 }
1760
1761 void KvFlatBtreeAsync::aio_remove(const string &key,
1762 callback cb, void * cb_args, int * err) {
1763 aio_rm_args * args = new aio_rm_args();
1764 args->kvba = this;
1765 args->key = key;
1766 args->cb = cb;
1767 args->cb_args = cb_args;
1768 args->err = err;
1769 pthread_t t;
1770 int r = pthread_create(&t, NULL, prm, (void*)args);
1771 if (r < 0) {
1772 *args->err = r;
1773 return;
1774 }
1775 pthread_detach(t);
1776 }
1777
1778 void *KvFlatBtreeAsync::pget(void *ptr) {
1779 struct aio_get_args *args = (struct aio_get_args *)ptr;
1780 *args->err =
1781 args->kvba->KvFlatBtreeAsync::get((string)args->key,
1782 (bufferlist *)args->val);
1783 args->cb(args->err, args->cb_args);
1784 delete args;
1785 return NULL;
1786 }
1787
1788 void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val,
1789 callback cb, void * cb_args, int * err) {
1790 aio_get_args * args = new aio_get_args();
1791 args->kvba = this;
1792 args->key = key;
1793 args->val = val;
1794 args->cb = cb;
1795 args->cb_args = cb_args;
1796 args->err = err;
1797 pthread_t t;
1798 int r = pthread_create(&t, NULL, pget, (void*)args);
1799 if (r < 0) {
1800 *args->err = r;
1801 return;
1802 }
1803 pthread_detach(t);
1804 }
1805
1806 int KvFlatBtreeAsync::set_many(const map<string, bufferlist> &in_map) {
1807 int err = 0;
1808 bufferlist inbl;
1809 bufferlist outbl;
1810 std::set<string> keys;
1811
1812 map<string, bufferlist> big_map;
1813 for (map<string, bufferlist>::const_iterator it = in_map.begin();
1814 it != in_map.end(); ++it) {
1815 keys.insert(it->first);
1816 big_map.insert(*it);
1817 }
1818
1819 if (verbose) cout << "created key set and big_map" << std::endl;
1820
1821 encode(keys, inbl);
1822 librados::AioCompletion * aioc = rados.aio_create_completion();
1823 io_ctx.aio_exec(index_name, aioc, "kvs", "read_many", inbl, &outbl);
1824 aioc->wait_for_complete();
1825 err = aioc->get_return_value();
1826 aioc->release();
1827 if (err < 0) {
1828 cerr << "getting index failed with " << err << std::endl;
1829 return err;
1830 }
1831
1832 map<string, bufferlist> imap;//read from the index
1833 auto blit = outbl.cbegin();
1834 decode(imap, blit);
1835
1836 if (verbose) cout << "finished reading index for objects. there are "
1837 << imap.size() << " entries that need to be changed. " << std::endl;
1838
1839
1840 vector<object_data> to_delete;
1841
1842 vector<object_data> to_create;
1843
1844 if (verbose) cout << "setting up to_delete and to_create vectors from index "
1845 << "map" << std::endl;
1846 //set up to_delete from index map
1847 for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end();
1848 ++it){
1849 index_data idata;
1850 blit = it->second.begin();
1851 idata.decode(blit);
1852 to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj));
1853 err = read_object(idata.obj, &to_delete[to_delete.size() - 1]);
1854 if (err < 0) {
1855 if (verbose) cout << "reading " << idata.obj << " failed with " << err
1856 << std::endl;
1857 return set_many(in_map);
1858 }
1859
1860 big_map.insert(to_delete[to_delete.size() - 1].omap.begin(),
1861 to_delete[to_delete.size() - 1].omap.end());
1862 }
1863
1864 to_create.push_back(object_data(
1865 to_string(client_name, client_index++)));
1866 to_create[0].min_kdata = to_delete[0].min_kdata;
1867
1868 for(map<string, bufferlist>::iterator it = big_map.begin();
1869 it != big_map.end(); ++it) {
1870 if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) {
1871 to_create[to_create.size() - 1].max_kdata =
1872 key_data(to_create[to_create.size() - 1]
1873 .omap.rbegin()->first);
1874
1875 to_create.push_back(object_data(
1876 to_string(client_name, client_index++)));
1877 to_create[to_create.size() - 1].min_kdata =
1878 to_create[to_create.size() - 2].max_kdata;
1879 }
1880
1881 to_create[to_create.size() - 1].omap.insert(*it);
1882 }
1883 to_create[to_create.size() - 1].max_kdata =
1884 to_delete[to_delete.size() - 1].max_kdata;
1885
1886 vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size()
1887 + to_create.size());
1888 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1889
1890
1891 index_data idata;
1892 set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err);
1893
1894 if (verbose) cout << "finished making to_create and to_delete. "
1895 << std::endl;
1896
1897 ops.push_back(make_pair(
1898 pair<int, string>(ADD_PREFIX, index_name),
1899 &owos[0]));
1900 for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size();
1901 i++) {
1902 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
1903 }
1904
1905 set_up_ops(to_create, to_delete, &ops, idata, &err);
1906
1907 cout << "finished setting up ops. Starting critical section..." << std::endl;
1908
1909 /////BEGIN CRITICAL SECTION/////
1910 //put prefix on index entry for idata.val
1911 err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops);
1912 if (err < 0) {
1913 return set_many(in_map);
1914 }
1915 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
1916 << std::endl;
1917 /////END CRITICAL SECTION/////
1918 std::scoped_lock l{icache_lock};
1919 for (vector<delete_data>::iterator it = idata.to_delete.begin();
1920 it != idata.to_delete.end(); ++it) {
1921 icache.erase(it->max);
1922 }
1923 for (vector<create_data>::iterator it = idata.to_create.begin();
1924 it != idata.to_create.end(); ++it) {
1925 icache.push(index_data(*it));
1926 }
1927 return err;
1928 }
1929
1930 int KvFlatBtreeAsync::remove_all() {
1931 if (verbose) cout << client_name << ": removing all" << std::endl;
1932 int err = 0;
1933 librados::ObjectReadOperation oro;
1934 librados::AioCompletion * oro_aioc = rados.aio_create_completion();
1935 std::map<std::string, bufferlist> index_set;
1936 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1937 err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL);
1938 if (err < 0){
1939 if (err == -ENOENT) {
1940 return 0;
1941 }
1942 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1943 return err;
1944 }
1945 oro_aioc->wait_for_complete();
1946 oro_aioc->release();
1947
1948 librados::ObjectWriteOperation rm_index;
1949 librados::AioCompletion * rm_index_aioc = rados.aio_create_completion();
1950 map<std::string,bufferlist> new_index;
1951 new_index["1"] = index_set["1"];
1952 rm_index.omap_clear();
1953 rm_index.omap_set(new_index);
1954 io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index);
1955 err = rm_index_aioc->get_return_value();
1956 rm_index_aioc->release();
1957 if (err < 0) {
1958 if (verbose) cout << "rm index aioc failed with " << err
1959 << std::endl;
1960 return err;
1961 }
1962
1963 if (!index_set.empty()) {
1964 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1965 it != index_set.end(); ++it){
1966 librados::ObjectWriteOperation sub;
1967 if (it->first == "1") {
1968 sub.omap_clear();
1969 } else {
1970 sub.remove();
1971 }
1972 index_data idata;
1973 auto b = it->second.cbegin();
1974 idata.decode(b);
1975 io_ctx.operate(idata.obj, &sub);
1976 }
1977 }
1978
1979 icache.clear();
1980
1981 return 0;
1982 }
1983
1984 int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) {
1985 if (verbose) cout << client_name << ": getting all keys" << std::endl;
1986 int err = 0;
1987 librados::ObjectReadOperation oro;
1988 std::map<std::string,bufferlist> index_set;
1989 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1990 io_ctx.operate(index_name, &oro, NULL);
1991 if (err < 0){
1992 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1993 return err;
1994 }
1995 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1996 it != index_set.end(); ++it){
1997 librados::ObjectReadOperation sub;
1998 std::set<std::string> ret;
1999 sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err);
2000 index_data idata;
2001 auto b = it->second.cbegin();
2002 idata.decode(b);
2003 io_ctx.operate(idata.obj, &sub, NULL);
2004 keys->insert(ret.begin(), ret.end());
2005 }
2006 return err;
2007 }
2008
2009 int KvFlatBtreeAsync::get_all_keys_and_values(
2010 map<std::string,bufferlist> *kv_map) {
2011 if (verbose) cout << client_name << ": getting all keys and values"
2012 << std::endl;
2013 int err = 0;
2014 librados::ObjectReadOperation first_read;
2015 std::set<std::string> index_set;
2016 first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err);
2017 io_ctx.operate(index_name, &first_read, NULL);
2018 if (err < 0){
2019 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2020 return err;
2021 }
2022 for (std::set<std::string>::iterator it = index_set.begin();
2023 it != index_set.end(); ++it){
2024 librados::ObjectReadOperation sub;
2025 map<std::string, bufferlist> ret;
2026 sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err);
2027 io_ctx.operate(*it, &sub, NULL);
2028 kv_map->insert(ret.begin(), ret.end());
2029 }
2030 return err;
2031 }
2032
2033 bool KvFlatBtreeAsync::is_consistent() {
2034 int err;
2035 bool ret = true;
2036 if (verbose) cout << client_name << ": checking consistency" << std::endl;
2037 std::map<std::string,bufferlist> index;
2038 map<std::string, std::set<std::string> > sub_objs;
2039 librados::ObjectReadOperation oro;
2040 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2041 io_ctx.operate(index_name, &oro, NULL);
2042 if (err < 0){
2043 //probably because the index doesn't exist - this might be ok.
2044 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2045 oit != io_ctx.nobjects_end(); ++oit) {
2046 //if this executes, there are floating objects.
2047 cerr << "Not consistent! found floating object " << oit->get_oid()
2048 << std::endl;
2049 ret = false;
2050 }
2051 return ret;
2052 }
2053
2054 std::map<std::string, string> parsed_index;
2055 std::set<std::string> onames;
2056 std::set<std::string> special_names;
2057 for (map<std::string,bufferlist>::iterator it = index.begin();
2058 it != index.end(); ++it) {
2059 if (it->first != "") {
2060 index_data idata;
2061 auto b = it->second.cbegin();
2062 idata.decode(b);
2063 if (idata.prefix != "") {
2064 for(vector<delete_data>::iterator dit = idata.to_delete.begin();
2065 dit != idata.to_delete.end(); ++dit) {
2066 librados::ObjectReadOperation oro;
2067 librados::AioCompletion * aioc = rados.aio_create_completion();
2068 bufferlist un;
2069 oro.getxattr("unwritable", &un, &err);
2070 io_ctx.aio_operate(dit->obj, aioc, &oro, NULL);
2071 aioc->wait_for_complete();
2072 err = aioc->get_return_value();
2073 if (ceph_clock_now() - idata.ts > timeout) {
2074 if (err < 0) {
2075 aioc->release();
2076 if (err == -ENOENT) {
2077 continue;
2078 } else {
2079 cerr << "Not consistent! reading object " << dit->obj
2080 << "returned " << err << std::endl;
2081 ret = false;
2082 break;
2083 }
2084 }
2085 if (atoi(string(un.c_str(), un.length()).c_str()) != 1 &&
2086 aioc->get_version64() != dit->version) {
2087 cerr << "Not consistent! object " << dit->obj << " has been "
2088 << " modified since the client died was not cleaned up."
2089 << std::endl;
2090 ret = false;
2091 }
2092 }
2093 special_names.insert(dit->obj);
2094 aioc->release();
2095 }
2096 for(vector<create_data >::iterator cit = idata.to_create.begin();
2097 cit != idata.to_create.end(); ++cit) {
2098 special_names.insert(cit->obj);
2099 }
2100 }
2101 parsed_index.insert(make_pair(it->first, idata.obj));
2102 onames.insert(idata.obj);
2103 }
2104 }
2105
2106 //make sure that an object exists iff it either is the index
2107 //or is listed in the index
2108 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2109 oit != io_ctx.nobjects_end(); ++oit) {
2110 string name = oit->get_oid();
2111 if (name != index_name && onames.count(name) == 0
2112 && special_names.count(name) == 0) {
2113 cerr << "Not consistent! found floating object " << name << std::endl;
2114 ret = false;
2115 }
2116 }
2117
2118 //check objects
2119 string prev = "";
2120 for (std::map<std::string, string>::iterator it = parsed_index.begin();
2121 it != parsed_index.end();
2122 ++it) {
2123 librados::ObjectReadOperation read;
2124 read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err);
2125 err = io_ctx.operate(it->second, &read, NULL);
2126 int size_int = (int)sub_objs[it->second].size();
2127
2128 //check that size is in the right range
2129 if (it->first != "1" && special_names.count(it->second) == 0 &&
2130 err != -ENOENT && (size_int > 2*k|| size_int < k)
2131 && parsed_index.size() > 1) {
2132 cerr << "Not consistent! Object " << *it << " has size " << size_int
2133 << ", which is outside the acceptable range." << std::endl;
2134 ret = false;
2135 }
2136
2137 //check that all keys belong in that object
2138 for(std::set<std::string>::iterator subit = sub_objs[it->second].begin();
2139 subit != sub_objs[it->second].end(); ++subit) {
2140 if ((it->first != "1"
2141 && *subit > it->first.substr(1,it->first.length()))
2142 || *subit <= prev) {
2143 cerr << "Not consistent! key " << *subit << " does not belong in "
2144 << *it << std::endl;
2145 cerr << "not last element, i.e. " << it->first << " not equal to 1? "
2146 << (it->first != "1") << std::endl
2147 << "greater than " << it->first.substr(1,it->first.length())
2148 <<"? " << (*subit > it->first.substr(1,it->first.length()))
2149 << std::endl
2150 << "less than or equal to " << prev << "? "
2151 << (*subit <= prev) << std::endl;
2152 ret = false;
2153 }
2154 }
2155
2156 prev = it->first.substr(1,it->first.length());
2157 }
2158
2159 if (!ret) {
2160 if (verbose) cout << "failed consistency test - see error log"
2161 << std::endl;
2162 cerr << str();
2163 } else {
2164 if (verbose) cout << "passed consistency test" << std::endl;
2165 }
2166 return ret;
2167 }
2168
2169 string KvFlatBtreeAsync::str() {
2170 stringstream ret;
2171 ret << "Top-level map:" << std::endl;
2172 int err = 0;
2173 std::set<std::string> keys;
2174 std::map<std::string,bufferlist> index;
2175 librados::ObjectReadOperation oro;
2176 librados::AioCompletion * top_aioc = rados.aio_create_completion();
2177 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2178 io_ctx.aio_operate(index_name, top_aioc, &oro, NULL);
2179 top_aioc->wait_for_complete();
2180 err = top_aioc->get_return_value();
2181 top_aioc->release();
2182 if (err < 0 && err != -5){
2183 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2184 return ret.str();
2185 }
2186 if(index.empty()) {
2187 ret << "There are no objects!" << std::endl;
2188 return ret.str();
2189 }
2190
2191 for (map<std::string,bufferlist>::iterator it = index.begin();
2192 it != index.end(); ++it) {
2193 keys.insert(string(it->second.c_str(), it->second.length())
2194 .substr(1,it->second.length()));
2195 }
2196
2197 vector<std::string> all_names;
2198 vector<int> all_sizes(index.size());
2199 vector<int> all_versions(index.size());
2200 vector<bufferlist> all_unwrit(index.size());
2201 vector<map<std::string,bufferlist> > all_maps(keys.size());
2202 vector<map<std::string,bufferlist>::iterator> its(keys.size());
2203 unsigned done = 0;
2204 vector<bool> dones(keys.size());
2205 ret << std::endl << string(150,'-') << std::endl;
2206
2207 for (map<std::string,bufferlist>::iterator it = index.begin();
2208 it != index.end(); ++it){
2209 index_data idata;
2210 auto b = it->second.cbegin();
2211 idata.decode(b);
2212 string s = idata.str();
2213 ret << "|" << string((148 -
2214 ((*it).first.length()+s.length()+3))/2,' ');
2215 ret << (*it).first;
2216 ret << " | ";
2217 ret << string(idata.str());
2218 ret << string((148 -
2219 ((*it).first.length()+s.length()+3))/2,' ');
2220 ret << "|\t";
2221 all_names.push_back(idata.obj);
2222 ret << std::endl << string(150,'-') << std::endl;
2223 }
2224
2225 int indexer = 0;
2226
2227 //get the object names and sizes
2228 for(vector<std::string>::iterator it = all_names.begin(); it
2229 != all_names.end();
2230 ++it) {
2231 librados::ObjectReadOperation oro;
2232 librados::AioCompletion *aioc = rados.aio_create_completion();
2233 oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err);
2234 oro.getxattr("unwritable", &all_unwrit[indexer], &err);
2235 io_ctx.aio_operate(*it, aioc, &oro, NULL);
2236 aioc->wait_for_complete();
2237 if (aioc->get_return_value() < 0) {
2238 ret << "reading" << *it << "failed: " << err << std::endl;
2239 //return ret.str();
2240 }
2241 all_sizes[indexer] = all_maps[indexer].size();
2242 all_versions[indexer] = aioc->get_version64();
2243 indexer++;
2244 aioc->release();
2245 }
2246
2247 ret << "///////////////////OBJECT NAMES////////////////" << std::endl;
2248 //HEADERS
2249 ret << std::endl;
2250 for (int i = 0; i < indexer; i++) {
2251 ret << "---------------------------\t";
2252 }
2253 ret << std::endl;
2254 for (int i = 0; i < indexer; i++) {
2255 ret << "|" << string((25 -
2256 (string("Bucket: ").length() + all_names[i].length()))/2, ' ');
2257 ret << "Bucket: " << all_names[i];
2258 ret << string((25 -
2259 (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t";
2260 }
2261 ret << std::endl;
2262 for (int i = 0; i < indexer; i++) {
2263 its[i] = all_maps[i].begin();
2264 ret << "|" << string((25 - (string("size: ").length()
2265 + to_string("",all_sizes[i]).length()))/2, ' ');
2266 ret << "size: " << all_sizes[i];
2267 ret << string((25 - (string("size: ").length()
2268 + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t";
2269 }
2270 ret << std::endl;
2271 for (int i = 0; i < indexer; i++) {
2272 its[i] = all_maps[i].begin();
2273 ret << "|" << string((25 - (string("version: ").length()
2274 + to_string("",all_versions[i]).length()))/2, ' ');
2275 ret << "version: " << all_versions[i];
2276 ret << string((25 - (string("version: ").length()
2277 + to_string("",all_versions[i]).length()))/2, ' ') << "|\t";
2278 }
2279 ret << std::endl;
2280 for (int i = 0; i < indexer; i++) {
2281 its[i] = all_maps[i].begin();
2282 ret << "|" << string((25 - (string("unwritable? ").length()
2283 + 1))/2, ' ');
2284 ret << "unwritable? " << string(all_unwrit[i].c_str(),
2285 all_unwrit[i].length());
2286 ret << string((25 - (string("unwritable? ").length()
2287 + 1))/2, ' ') << "|\t";
2288 }
2289 ret << std::endl;
2290 for (int i = 0; i < indexer; i++) {
2291 ret << "---------------------------\t";
2292 }
2293 ret << std::endl;
2294 ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl;
2295
2296
2297 ret << std::endl;
2298 for (int i = 0; i < indexer; i++) {
2299 ret << "---------------------------\t";
2300 }
2301 ret << std::endl;
2302 //each time through this part is two lines
2303 while(done < keys.size()) {
2304 for(int i = 0; i < indexer; i++) {
2305 if(dones[i]){
2306 ret << " \t";
2307 } else {
2308 if (its[i] == all_maps[i].end()){
2309 done++;
2310 dones[i] = true;
2311 ret << " \t";
2312 } else {
2313 ret << "|" << string((25 -
2314 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2315 ret << (*its[i]).first;
2316 ret << " | ";
2317 ret << string(its[i]->second.c_str(), its[i]->second.length());
2318 ret << string((25 -
2319 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2320 ret << "|\t";
2321 ++(its[i]);
2322 }
2323
2324 }
2325 }
2326 ret << std::endl;
2327 for (int i = 0; i < indexer; i++) {
2328 if(dones[i]){
2329 ret << " \t";
2330 } else {
2331 ret << "---------------------------\t";
2332 }
2333 }
2334 ret << std::endl;
2335
2336 }
2337 return ret.str();
2338 }