]> git.proxmox.com Git - ceph.git/blame - ceph/src/key_value_store/kv_flat_btree_async.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / key_value_store / kv_flat_btree_async.cc
CommitLineData
7c673cae
FG
1/*
2 * Key-value store using librados
3 *
4 * September 2, 2012
5 * Eleanor Cawthon
6 * eleanor.cawthon@inktank.com
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 */
13
11fdf7f2 14#include "include/compat.h"
7c673cae
FG
15#include "key_value_store/key_value_structure.h"
16#include "key_value_store/kv_flat_btree_async.h"
17#include "key_value_store/kvs_arg_types.h"
18#include "include/rados/librados.hpp"
7c673cae
FG
19#include "common/ceph_context.h"
20#include "common/Clock.h"
21#include "include/types.h"
22
11fdf7f2 23#include <errno.h>
7c673cae
FG
24#include <string>
25#include <iostream>
26#include <cassert>
27#include <climits>
28#include <cmath>
29#include <sstream>
30#include <stdlib.h>
31#include <iterator>
32
7c673cae
FG
33using ceph::bufferlist;
34
35bool index_data::is_timed_out(utime_t now, utime_t timeout) const {
36 return prefix != "" && now - ts > timeout;
37}
38
39void IndexCache::clear() {
40 k2itmap.clear();
41 t2kmap.clear();
42}
43
44void IndexCache::push(const string &key, const index_data &idata) {
45 if (cache_size == 0) {
46 return;
47 }
48 index_data old_idata;
49 map<key_data, pair<index_data, utime_t> >::iterator old_it =
50 k2itmap.lower_bound(key_data(key));
51 if (old_it != k2itmap.end()) {
52 t2kmap.erase(old_it->second.second);
53 k2itmap.erase(old_it);
54 }
55 map<key_data, pair<index_data, utime_t> >::iterator new_it =
56 k2itmap.find(idata.kdata);
57 if (new_it != k2itmap.end()) {
58 utime_t old_time = new_it->second.second;
59 t2kmap.erase(old_time);
60 }
61 utime_t time = ceph_clock_now();
62 k2itmap[idata.kdata] = make_pair(idata, time);
63 t2kmap[time] = idata.kdata;
64 if ((int)k2itmap.size() > cache_size) {
65 pop();
66 }
67
68}
69
70void IndexCache::push(const index_data &idata) {
71 if (cache_size == 0) {
72 return;
73 }
74 if (k2itmap.count(idata.kdata) > 0) {
75 utime_t old_time = k2itmap[idata.kdata].second;
76 t2kmap.erase(old_time);
77 k2itmap.erase(idata.kdata);
78 }
79 utime_t time = ceph_clock_now();
80 k2itmap[idata.kdata] = make_pair(idata, time);
81 t2kmap[time] = idata.kdata;
82 if ((int)k2itmap.size() > cache_size) {
83 pop();
84 }
85}
86
87void IndexCache::pop() {
88 if (cache_size == 0) {
89 return;
90 }
91 map<utime_t, key_data>::iterator it = t2kmap.begin();
92 utime_t time = it->first;
93 key_data kdata = it->second;
94 k2itmap.erase(kdata);
95 t2kmap.erase(time);
96}
97
98void IndexCache::erase(key_data kdata) {
99 if (cache_size == 0) {
100 return;
101 }
102 if (k2itmap.count(kdata) > 0) {
103 utime_t c = k2itmap[kdata].second;
104 k2itmap.erase(kdata);
105 t2kmap.erase(c);
106 }
107}
108
109int IndexCache::get(const string &key, index_data *idata) const {
110 if (cache_size == 0) {
111 return -ENODATA;
112 }
113 if ((int)k2itmap.size() == 0) {
114 return -ENODATA;
115 }
116 map<key_data, pair<index_data, utime_t> >::const_iterator it =
117 k2itmap.lower_bound(key_data(key));
118 if (it == k2itmap.end() || !(it->second.first.min_kdata < key_data(key))) {
119 return -ENODATA;
120 } else {
121 *idata = it->second.first;
122 }
123 return 0;
124}
125
126int IndexCache::get(const string &key, index_data *idata,
127 index_data *next_idata) const {
128 if (cache_size == 0) {
129 return -ENODATA;
130 }
131 map<key_data, pair<index_data, utime_t> >::const_iterator it =
132 k2itmap.lower_bound(key_data(key));
133 if (it == k2itmap.end() || ++it == k2itmap.end()) {
134 return -ENODATA;
135 } else {
136 --it;
137 if (!(it->second.first.min_kdata < key_data(key))){
138 //stale, should be reread.
139 return -ENODATA;
140 } else {
141 *idata = it->second.first;
142 ++it;
143 if (it != k2itmap.end()) {
144 *next_idata = it->second.first;
145 }
146 }
147 }
148 return 0;
149}
150
151int KvFlatBtreeAsync::nothing() {
152 return 0;
153}
154
155int KvFlatBtreeAsync::wait() {
156 if (rand() % 10 == 0) {
157 usleep(wait_ms);
158 }
159 return 0;
160}
161
162int KvFlatBtreeAsync::suicide() {
163 if (rand() % 10 == 0) {
164 if (verbose) cout << client_name << " is suiciding" << std::endl;
165 return 1;
166 }
167 return 0;
168}
169
170int KvFlatBtreeAsync::next(const index_data &idata, index_data * out_data)
171{
172 if (verbose) cout << "\t\t" << client_name << "-next: finding next of "
173 << idata.str()
174 << std::endl;
175 int err = 0;
176 librados::ObjectReadOperation oro;
177 std::map<std::string, bufferlist> kvs;
178 oro.omap_get_vals2(idata.kdata.encoded(),1,&kvs, nullptr, &err);
179 err = io_ctx.operate(index_name, &oro, NULL);
180 if (err < 0){
181 if (verbose) cout << "\t\t\t" << client_name
182 << "-next: getting index failed with error "
183 << err << std::endl;
184 return err;
185 }
186 if (!kvs.empty()) {
187 out_data->kdata.parse(kvs.begin()->first);
11fdf7f2 188 auto b = kvs.begin()->second.cbegin();
7c673cae
FG
189 out_data->decode(b);
190 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
191 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
192 << std::endl;
193 //the client died after deleting the object. clean up.
194 cleanup(idata, err);
195 }
196 } else {
197 err = -EOVERFLOW;
198 }
199 return err;
200}
201
202int KvFlatBtreeAsync::prev(const index_data &idata, index_data * out_data)
203{
204 if (verbose) cout << "\t\t" << client_name << "-prev: finding prev of "
205 << idata.str() << std::endl;
206 int err = 0;
207 bufferlist inbl;
208 idata_from_idata_args in_args;
209 in_args.idata = idata;
210 in_args.encode(inbl);
211 bufferlist outbl;
212 err = io_ctx.exec(index_name,"kvs", "get_prev_idata", inbl, outbl);
213 if (err < 0){
214 if (verbose) cout << "\t\t\t" << client_name
215 << "-prev: getting index failed with error "
216 << err << std::endl;
217 if (idata.is_timed_out(ceph_clock_now(), timeout)) {
218 if (verbose) cout << client_name << " THINKS THE OTHER CLIENT DIED."
219 << std::endl;
220 //the client died after deleting the object. clean up.
221 err = cleanup(idata, err);
222 if (err == -ESUICIDE) {
223 return err;
224 } else {
225 err = 0;
226 }
227 }
228 return err;
229 }
11fdf7f2 230 auto it = outbl.cbegin();
7c673cae
FG
231 in_args.decode(it);
232 *out_data = in_args.next_idata;
233 if (verbose) cout << "\t\t" << client_name << "-prev: prev is "
234 << out_data->str()
235 << std::endl;
236 return err;
237}
238
239int KvFlatBtreeAsync::read_index(const string &key, index_data * idata,
240 index_data * next_idata, bool force_update) {
241 int err = 0;
242 if (!force_update) {
243 if (verbose) cout << "\t" << client_name
244 << "-read_index: getting index_data for " << key
245 << " from cache" << std::endl;
246 icache_lock.Lock();
247 if (next_idata != NULL) {
248 err = icache.get(key, idata, next_idata);
249 } else {
250 err = icache.get(key, idata);
251 }
252 icache_lock.Unlock();
253
254 if (err == 0) {
255 //if (verbose) cout << "CACHE SUCCESS" << std::endl;
256 return err;
257 } else {
258 if (verbose) cout << "NOT IN CACHE" << std::endl;
259 }
260 }
261
262 if (verbose) cout << "\t" << client_name
263 << "-read_index: getting index_data for " << key
264 << " from object" << std::endl;
265 librados::ObjectReadOperation oro;
266 bufferlist raw_val;
267 std::set<std::string> key_set;
268 key_set.insert(key_data(key).encoded());
269 std::map<std::string, bufferlist> kvmap;
270 std::map<std::string, bufferlist> dupmap;
271 oro.omap_get_vals_by_keys(key_set, &dupmap, &err);
272 oro.omap_get_vals2(key_data(key).encoded(),
273 (cache_size / cache_refresh >= 2? cache_size / cache_refresh: 2),
274 &kvmap, nullptr, &err);
275 err = io_ctx.operate(index_name, &oro, NULL);
276 utime_t mytime = ceph_clock_now();
277 if (err < 0){
278 cerr << "\t" << client_name
279 << "-read_index: getting keys failed with "
280 << err << std::endl;
11fdf7f2 281 ceph_abort_msg(client_name + "-read_index: reading index failed");
7c673cae
FG
282 return err;
283 }
284 kvmap.insert(dupmap.begin(), dupmap.end());
285 for (map<string, bufferlist>::iterator it = ++kvmap.begin();
286 it != kvmap.end();
287 ++it) {
288 bufferlist bl = it->second;
11fdf7f2 289 auto blit = bl.cbegin();
7c673cae
FG
290 index_data this_idata;
291 this_idata.decode(blit);
292 if (this_idata.is_timed_out(mytime, timeout)) {
293 if (verbose) cout << client_name
294 << " THINKS THE OTHER CLIENT DIED. (mytime is "
295 << mytime.sec() << "." << mytime.usec() << ", idata.ts is "
296 << this_idata.ts.sec() << "." << this_idata.ts.usec()
297 << ", it has been " << (mytime - this_idata.ts).sec()
298 << '.' << (mytime - this_idata.ts).usec()
299 << ", timeout is " << timeout << ")" << std::endl;
300 //the client died after deleting the object. clean up.
301 if (cleanup(this_idata, -EPREFIX) == -ESUICIDE) {
302 return -ESUICIDE;
303 }
304 return read_index(key, idata, next_idata, force_update);
305 }
306 icache_lock.Lock();
307 icache.push(this_idata);
308 icache_lock.Unlock();
309 }
11fdf7f2 310 auto b = kvmap.begin()->second.cbegin();
7c673cae
FG
311 idata->decode(b);
312 idata->kdata.parse(kvmap.begin()->first);
313 if (verbose) cout << "\t" << client_name << "-read_index: kvmap_size is "
314 << kvmap.size()
315 << ", idata is " << idata->str() << std::endl;
316
11fdf7f2 317 ceph_assert(idata->obj != "");
7c673cae
FG
318 icache_lock.Lock();
319 icache.push(key, *idata);
320 icache_lock.Unlock();
321
322 if (next_idata != NULL && idata->kdata.prefix != "1") {
323 next_idata->kdata.parse((++kvmap.begin())->first);
11fdf7f2 324 auto nb = (++kvmap.begin())->second.cbegin();
7c673cae
FG
325 next_idata->decode(nb);
326 icache_lock.Lock();
327 icache.push(*next_idata);
328 icache_lock.Unlock();
329 }
330 return err;
331}
332
333int KvFlatBtreeAsync::split(const index_data &idata) {
334 int err = 0;
335 opmap['l']++;
336
337 if (idata.prefix != "") {
338 return -EPREFIX;
339 }
340
341 rebalance_args args;
342 args.bound = 2 * k - 1;
343 args.comparator = CEPH_OSD_CMPXATTR_OP_GT;
344 err = read_object(idata.obj, &args);
345 args.odata.max_kdata = idata.kdata;
346 if (err < 0) {
347 if (verbose) cout << "\t\t" << client_name << "-split: read object "
348 << args.odata.name
349 << " got " << err << std::endl;
350 return err;
351 }
352
353 if (verbose) cout << "\t\t" << client_name << "-split: splitting "
354 << idata.obj
355 << ", which has size " << args.odata.size
356 << " and actual size " << args.odata.omap.size() << std::endl;
357
358 ///////preparations that happen outside the critical section
359 //for prefix index
360 vector<object_data> to_create;
361 vector<object_data> to_delete;
362 to_delete.push_back(object_data(idata.min_kdata,
363 args.odata.max_kdata, args.odata.name, args.odata.version));
364
365 //for lower half object
366 map<std::string, bufferlist>::const_iterator it = args.odata.omap.begin();
367 client_index_lock.Lock();
368 to_create.push_back(object_data(to_string(client_name, client_index++)));
369 client_index_lock.Unlock();
370 for (int i = 0; i < k; i++) {
371 to_create[0].omap.insert(*it);
372 ++it;
373 }
374 to_create[0].min_kdata = idata.min_kdata;
375 to_create[0].max_kdata = key_data(to_create[0].omap.rbegin()->first);
376
377 //for upper half object
378 client_index_lock.Lock();
379 to_create.push_back(object_data(to_create[0].max_kdata,
380 args.odata.max_kdata,
381 to_string(client_name, client_index++)));
382 client_index_lock.Unlock();
383 to_create[1].omap.insert(
384 ++args.odata.omap.find(to_create[0].omap.rbegin()->first),
385 args.odata.omap.end());
386
387 //setting up operations
388 librados::ObjectWriteOperation owos[6];
389 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
390 index_data out_data;
391 set_up_prefix_index(to_create, to_delete, &owos[0], &out_data, &err);
392 ops.push_back(make_pair(
393 pair<int, string>(ADD_PREFIX, index_name),
394 &owos[0]));
395 for (int i = 1; i < 6; i++) {
396 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
397 }
398 set_up_ops(to_create, to_delete, &ops, out_data, &err);
399
400 /////BEGIN CRITICAL SECTION/////
401 //put prefix on index entry for idata.val
402 err = perform_ops("\t\t" + client_name + "-split:", out_data, &ops);
403 if (err < 0) {
404 return err;
405 }
406 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
407 << std::endl;
408 /////END CRITICAL SECTION/////
409 icache_lock.Lock();
410 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
411 it != out_data.to_delete.end(); ++it) {
412 icache.erase(it->max);
413 }
414 for (vector<create_data>::iterator it = out_data.to_create.begin();
415 it != out_data.to_create.end(); ++it) {
416 icache.push(index_data(*it));
417 }
418 icache_lock.Unlock();
419 return err;
420}
421
422int KvFlatBtreeAsync::rebalance(const index_data &idata1,
423 const index_data &next_idata){
424 opmap['m']++;
425 int err = 0;
426
427 if (idata1.prefix != "") {
428 return -EPREFIX;
429 }
430
431 rebalance_args args1;
432 args1.bound = k + 1;
433 args1.comparator = CEPH_OSD_CMPXATTR_OP_LT;
434 index_data idata2 = next_idata;
435
436 rebalance_args args2;
437 args2.bound = k + 1;
438 args2.comparator = CEPH_OSD_CMPXATTR_OP_LT;
439
440 if (idata1.kdata.prefix == "1") {
441 //this is the highest key in the index, so it doesn't have a next.
442
443 //read the index for the previous entry
444 err = prev(idata1, &idata2);
445 if (err == -ERANGE) {
446 if (verbose) cout << "\t\t" << client_name
447 << "-rebalance: this is the only node, "
448 << "so aborting" << std::endl;
449 return -EUCLEAN;
450 } else if (err < 0) {
451 return err;
452 }
453
454 //read the first object
455 err = read_object(idata1.obj, &args2);
456 if (err < 0) {
457 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
458 << std::endl;
459 if (err == -ENOENT) {
460 return -ECANCELED;
461 }
462 return err;
463 }
464 args2.odata.min_kdata = idata1.min_kdata;
465 args2.odata.max_kdata = idata1.kdata;
466
467 //read the second object
468 args1.bound = 2 * k + 1;
469 err = read_object(idata2.obj, &args1);
470 if (err < 0) {
471 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
472 << std::endl;
473 return err;
474 }
475 args1.odata.min_kdata = idata2.min_kdata;
476 args1.odata.max_kdata = idata2.kdata;
477
478 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
479 << idata2.obj
480 << ". size: " << args1.odata.size << " version: "
481 << args1.odata.version
482 << std::endl;
483 } else {
484 assert (next_idata.obj != "");
485 //there is a next key, so get it.
486 err = read_object(idata1.obj, &args1);
487 if (err < 0) {
488 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
489 << std::endl;
490 return err;
491 }
492 args1.odata.min_kdata = idata1.min_kdata;
493 args1.odata.max_kdata = idata1.kdata;
494
495 args2.bound = 2 * k + 1;
496 err = read_object(idata2.obj, &args2);
497 if (err < 0) {
498 if (verbose) cout << "reading " << idata1.obj << " failed with " << err
499 << std::endl;
500 if (err == -ENOENT) {
501 return -ECANCELED;
502 }
503 return err;
504 }
505 args2.odata.min_kdata = idata2.min_kdata;
506 args2.odata.max_kdata = idata2.kdata;
507
508 if (verbose) cout << "\t\t" << client_name << "-rebalance: read "
509 << idata2.obj
510 << ". size: " << args2.odata.size << " version: "
511 << args2.odata.version
512 << std::endl;
513 }
514
515 if (verbose) cout << "\t\t" << client_name << "-rebalance: o1 is "
516 << args1.odata.max_kdata.encoded() << ","
517 << args1.odata.name << " with size " << args1.odata.size
518 << " , o2 is " << args2.odata.max_kdata.encoded()
519 << "," << args2.odata.name << " with size " << args2.odata.size
520 << std::endl;
521
522 //calculations
523 if ((int)args1.odata.size > k && (int)args1.odata.size <= 2*k
524 && (int)args2.odata.size > k
525 && (int)args2.odata.size <= 2*k) {
526 //nothing to do
527 if (verbose) cout << "\t\t" << client_name
528 << "-rebalance: both sizes in range, so"
529 << " aborting " << std::endl;
530 return -EBALANCE;
531 } else if (idata1.prefix != "" || idata2.prefix != "") {
532 return -EPREFIX;
533 }
534
535 //this is the high object. it gets created regardless of rebalance or merge.
536 client_index_lock.Lock();
537 string o2w = to_string(client_name, client_index++);
538 client_index_lock.Unlock();
539 index_data idata;
540 vector<object_data> to_create;
541 vector<object_data> to_delete;
542 librados::ObjectWriteOperation create[2];//possibly only 1 will be used
543 librados::ObjectWriteOperation other_ops[6];
544 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
545 ops.push_back(make_pair(
546 pair<int, string>(ADD_PREFIX, index_name),
547 &other_ops[0]));
548
549 if ((int)args1.odata.size + (int)args2.odata.size <= 2*k) {
550 //merge
551 if (verbose) cout << "\t\t" << client_name << "-rebalance: merging "
552 << args1.odata.name
553 << " and " << args2.odata.name << " to get " << o2w
554 << std::endl;
555 map<string, bufferlist> write2_map;
556 write2_map.insert(args1.odata.omap.begin(), args1.odata.omap.end());
557 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
558 to_create.push_back(object_data(args1.odata.min_kdata,
559 args2.odata.max_kdata, o2w, write2_map));
560 ops.push_back(make_pair(
561 pair<int, string>(MAKE_OBJECT, o2w),
562 &create[0]));
11fdf7f2 563 ceph_assert((int)write2_map.size() <= 2*k);
7c673cae
FG
564 } else {
565 //rebalance
566 if (verbose) cout << "\t\t" << client_name << "-rebalance: rebalancing "
567 << args1.odata.name
568 << " and " << args2.odata.name << std::endl;
569 map<std::string, bufferlist> write1_map;
570 map<std::string, bufferlist> write2_map;
571 map<std::string, bufferlist>::iterator it;
572 client_index_lock.Lock();
573 string o1w = to_string(client_name, client_index++);
574 client_index_lock.Unlock();
575 int target_size_1 = ceil(((int)args1.odata.size + (int)args2.odata.size)
576 / 2.0);
577 if (args1.odata.max_kdata != idata1.kdata) {
578 //this should be true if idata1 is the high object
579 target_size_1 = floor(((int)args1.odata.size + (int)args2.odata.size)
580 / 2.0);
581 }
582 for (it = args1.odata.omap.begin();
583 it != args1.odata.omap.end() && (int)write1_map.size()
584 < target_size_1;
585 ++it) {
586 write1_map.insert(*it);
587 }
588 if (it != args1.odata.omap.end()){
589 //write1_map is full, so put the rest in write2_map
590 write2_map.insert(it, args1.odata.omap.end());
591 write2_map.insert(args2.odata.omap.begin(), args2.odata.omap.end());
592 } else {
593 //args1.odata.omap was small, and write2_map still needs more
594 map<std::string, bufferlist>::iterator it2;
595 for(it2 = args2.odata.omap.begin();
596 (it2 != args2.odata.omap.end()) && ((int)write1_map.size()
597 < target_size_1);
598 ++it2) {
599 write1_map.insert(*it2);
600 }
601 write2_map.insert(it2, args2.odata.omap.end());
602 }
603 if (verbose) cout << "\t\t" << client_name
604 << "-rebalance: write1_map has size "
605 << write1_map.size() << ", write2_map.size() is " << write2_map.size()
606 << std::endl;
607 //at this point, write1_map and write2_map should have the correct pairs
608 to_create.push_back(object_data(args1.odata.min_kdata,
609 key_data(write1_map.rbegin()->first),
610 o1w,write1_map));
611 to_create.push_back(object_data( key_data(write1_map.rbegin()->first),
612 args2.odata.max_kdata, o2w, write2_map));
613 ops.push_back(make_pair(
614 pair<int, string>(MAKE_OBJECT, o1w),
615 &create[0]));
616 ops.push_back(make_pair(
617 pair<int, string>(MAKE_OBJECT, o2w),
618 &create[1]));
619 }
620
621 to_delete.push_back(object_data(args1.odata.min_kdata,
622 args1.odata.max_kdata, args1.odata.name, args1.odata.version));
623 to_delete.push_back(object_data(args2.odata.min_kdata,
624 args2.odata.max_kdata, args2.odata.name, args2.odata.version));
625 for (int i = 1; i < 6; i++) {
626 ops.push_back(make_pair(make_pair(0,""), &other_ops[i]));
627 }
628
629 index_data out_data;
630 set_up_prefix_index(to_create, to_delete, &other_ops[0], &out_data, &err);
631 set_up_ops(to_create, to_delete, &ops, out_data, &err);
632
633 //at this point, all operations should be completely set up.
634 /////BEGIN CRITICAL SECTION/////
635 err = perform_ops("\t\t" + client_name + "-rebalance:", out_data, &ops);
636 if (err < 0) {
637 return err;
638 }
639 icache_lock.Lock();
640 for (vector<delete_data>::iterator it = out_data.to_delete.begin();
641 it != out_data.to_delete.end(); ++it) {
642 icache.erase(it->max);
643 }
644 for (vector<create_data>::iterator it = out_data.to_create.begin();
645 it != out_data.to_create.end(); ++it) {
646 icache.push(index_data(*it));
647 }
648 icache_lock.Unlock();
649 if (verbose) cout << "\t\t" << client_name << "-rebalance: done rebalancing."
650 << std::endl;
651 /////END CRITICAL SECTION/////
652 return err;
653}
654
655int KvFlatBtreeAsync::read_object(const string &obj, object_data * odata) {
656 librados::ObjectReadOperation get_obj;
657 librados::AioCompletion * obj_aioc = rados.aio_create_completion();
658 int err;
659 bufferlist unw_bl;
660 odata->name = obj;
661 get_obj.omap_get_vals2("", LONG_MAX, &odata->omap, nullptr, &err);
662 get_obj.getxattr("unwritable", &unw_bl, &err);
663 io_ctx.aio_operate(obj, obj_aioc, &get_obj, NULL);
664 obj_aioc->wait_for_safe();
665 err = obj_aioc->get_return_value();
666 if (err < 0){
667 //possibly -ENOENT, meaning someone else deleted it.
668 obj_aioc->release();
669 return err;
670 }
671 odata->unwritable = string(unw_bl.c_str(), unw_bl.length()) == "1";
672 odata->version = obj_aioc->get_version64();
673 odata->size = odata->omap.size();
674 obj_aioc->release();
675 return 0;
676}
677
678int KvFlatBtreeAsync::read_object(const string &obj, rebalance_args * args) {
679 bufferlist inbl;
680 args->encode(inbl);
681 bufferlist outbl;
682 int err;
683 librados::AioCompletion * a = rados.aio_create_completion();
684 io_ctx.aio_exec(obj, a, "kvs", "maybe_read_for_balance", inbl, &outbl);
685 a->wait_for_safe();
686 err = a->get_return_value();
687 if (err < 0) {
688 if (verbose) cout << "\t\t" << client_name
689 << "-read_object: reading failed with "
690 << err << std::endl;
691 a->release();
692 return err;
693 }
11fdf7f2 694 auto it = outbl.cbegin();
7c673cae
FG
695 args->decode(it);
696 args->odata.name = obj;
697 args->odata.version = a->get_version64();
698 a->release();
699 return err;
700}
701
702void KvFlatBtreeAsync::set_up_prefix_index(
703 const vector<object_data> &to_create,
704 const vector<object_data> &to_delete,
705 librados::ObjectWriteOperation * owo,
706 index_data * idata,
707 int * err) {
708 std::map<std::string, pair<bufferlist, int> > assertions;
709 map<string, bufferlist> to_insert;
710 idata->prefix = "1";
711 idata->ts = ceph_clock_now();
712 for(vector<object_data>::const_iterator it = to_create.begin();
713 it != to_create.end();
714 ++it) {
715 create_data c(it->min_kdata, it->max_kdata, it->name);
716 idata->to_create.push_back(c);
717 }
718 for(vector<object_data>::const_iterator it = to_delete.begin();
719 it != to_delete.end();
720 ++it) {
721 delete_data d(it->min_kdata, it->max_kdata, it->name, it->version);
722 idata->to_delete.push_back(d);
723 }
724 for(vector<object_data>::const_iterator it = to_delete.begin();
725 it != to_delete.end();
726 ++it) {
727 idata->obj = it->name;
728 idata->min_kdata = it->min_kdata;
729 idata->kdata = it->max_kdata;
730 bufferlist insert;
731 idata->encode(insert);
732 to_insert[it->max_kdata.encoded()] = insert;
733 index_data this_entry;
734 this_entry.min_kdata = idata->min_kdata;
735 this_entry.kdata = idata->kdata;
736 this_entry.obj = idata->obj;
737 assertions[it->max_kdata.encoded()] = pair<bufferlist, int>
738 (to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
739 if (verbose) cout << "\t\t\t" << client_name
740 << "-setup_prefix: will assert "
741 << this_entry.str() << std::endl;
742 }
11fdf7f2 743 ceph_assert(*err == 0);
7c673cae
FG
744 owo->omap_cmp(assertions, err);
745 if (to_create.size() <= 2) {
746 owo->omap_set(to_insert);
747 }
748}
749
750//some args can be null if there are no corresponding entries in p
751void KvFlatBtreeAsync::set_up_ops(
752 const vector<object_data> &create_vector,
753 const vector<object_data> &delete_vector,
754 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > * ops,
755 const index_data &idata,
756 int * err) {
757 vector<pair<pair<int, string>,
758 librados::ObjectWriteOperation* > >::iterator it;
759
760 //skip the prefixing part
761 for(it = ops->begin(); it->first.first == ADD_PREFIX; ++it) {}
762 map<string, bufferlist> to_insert;
763 std::set<string> to_remove;
764 map<string, pair<bufferlist, int> > assertions;
765 if (create_vector.size() > 0) {
766 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
767 it->first = pair<int, string>(UNWRITE_OBJECT, idata.to_delete[i].obj);
768 set_up_unwrite_object(delete_vector[i].version, it->second);
769 ++it;
770 }
771 }
772 for (int i = 0; i < (int)idata.to_create.size(); ++i) {
773 index_data this_entry(idata.to_create[i].max, idata.to_create[i].min,
774 idata.to_create[i].obj);
775 to_insert[idata.to_create[i].max.encoded()] = to_bl(this_entry);
776 if (idata.to_create.size() <= 2) {
777 it->first = pair<int, string>(MAKE_OBJECT, idata.to_create[i].obj);
778 } else {
779 it->first = pair<int, string>(AIO_MAKE_OBJECT, idata.to_create[i].obj);
780 }
781 set_up_make_object(create_vector[i].omap, it->second);
782 ++it;
783 }
784 for (int i = 0; i < (int)idata.to_delete.size(); ++i) {
785 index_data this_entry = idata;
786 this_entry.obj = idata.to_delete[i].obj;
787 this_entry.min_kdata = idata.to_delete[i].min;
788 this_entry.kdata = idata.to_delete[i].max;
789 if (verbose) cout << "\t\t\t" << client_name << "-setup_ops: will assert "
790 << this_entry.str() << std::endl;
791 assertions[idata.to_delete[i].max.encoded()] = pair<bufferlist, int>(
792 to_bl(this_entry), CEPH_OSD_CMPXATTR_OP_EQ);
793 to_remove.insert(idata.to_delete[i].max.encoded());
794 it->first = pair<int, string>(REMOVE_OBJECT, idata.to_delete[i].obj);
795 set_up_delete_object(it->second);
796 ++it;
797 }
798 if ((int)idata.to_create.size() <= 2) {
799 it->second->omap_cmp(assertions, err);
800 }
801 it->second->omap_rm_keys(to_remove);
802 it->second->omap_set(to_insert);
803
804
805 it->first = pair<int, string>(REMOVE_PREFIX, index_name);
806}
807
808void KvFlatBtreeAsync::set_up_make_object(
809 const map<std::string, bufferlist> &to_set,
810 librados::ObjectWriteOperation *owo) {
811 bufferlist inbl;
11fdf7f2 812 encode(to_set, inbl);
7c673cae
FG
813 owo->exec("kvs", "create_with_omap", inbl);
814}
815
816void KvFlatBtreeAsync::set_up_unwrite_object(
817 const int &ver, librados::ObjectWriteOperation *owo) {
818 if (ver > 0) {
819 owo->assert_version(ver);
820 }
821 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("0"));
822 owo->setxattr("unwritable", to_bl("1"));
823}
824
825void KvFlatBtreeAsync::set_up_restore_object(
826 librados::ObjectWriteOperation *owo) {
827 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
828 owo->setxattr("unwritable", to_bl("0"));
829}
830
831void KvFlatBtreeAsync::set_up_delete_object(
832 librados::ObjectWriteOperation *owo) {
833 owo->cmpxattr("unwritable", CEPH_OSD_CMPXATTR_OP_EQ, to_bl("1"));
834 owo->remove();
835}
836
837int KvFlatBtreeAsync::perform_ops(const string &debug_prefix,
838 const index_data &idata,
839 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > *ops) {
840 int err = 0;
841 vector<librados::AioCompletion*> aiocs(idata.to_create.size());
842 int count = 0;
843 for (vector<pair<pair<int, string>,
844 librados::ObjectWriteOperation*> >::iterator it = ops->begin();
845 it != ops->end(); ++it) {
846 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
847 return -ESUICIDE;
848 }
849 switch (it->first.first) {
850 case ADD_PREFIX://prefixing
851 if (verbose) cout << debug_prefix << " adding prefix" << std::endl;
852 err = io_ctx.operate(index_name, it->second);
853 if (err < 0) {
854 if (verbose) cout << debug_prefix << " prefixing the index failed with "
855 << err << std::endl;
856 return -EPREFIX;
857 }
858 if (verbose) cout << debug_prefix << " prefix added." << std::endl;
859 break;
860 case UNWRITE_OBJECT://marking
861 if (verbose) cout << debug_prefix << " marking " << it->first.second
862 << std::endl;
863 err = io_ctx.operate(it->first.second, it->second);
864 if (err < 0) {
865 //most likely because it changed, in which case it will be -ERANGE
866 if (verbose) cout << debug_prefix << " marking " << it->first.second
867 << "failed with code" << err << std::endl;
868 if (it->first.second == (*idata.to_delete.begin()).max.encoded()) {
869 if (cleanup(idata, -EFIRSTOBJ) == -ESUICIDE) {
870 return -ESUICIDE;
871 }
872 } else {
873 if (cleanup(idata, -ERANGE) == -ESUICIDE) {
874 return -ESUICIDE;
875 }
876 }
877 return err;
878 }
879 if (verbose) cout << debug_prefix << " marked " << it->first.second
880 << std::endl;
881 break;
882 case MAKE_OBJECT://creating
883 if (verbose) cout << debug_prefix << " creating " << it->first.second
884 << std::endl;
885 err = io_ctx.operate(it->first.second, it->second);
886 if (err < 0) {
887 //this can happen if someone else was cleaning up after us.
888 if (verbose) cout << debug_prefix << " creating " << it->first.second
889 << " failed"
890 << " with code " << err << std::endl;
891 if (err == -EEXIST) {
892 //someone thinks we died, so die
893 if (verbose) cout << client_name << " is suiciding!" << std::endl;
894 return -ESUICIDE;
895 } else {
11fdf7f2 896 ceph_abort();
7c673cae
FG
897 }
898 return err;
899 }
900 if (verbose || idata.to_create.size() > 2) {
901 cout << debug_prefix << " created object " << it->first.second
902 << std::endl;
903 }
904 break;
905 case AIO_MAKE_OBJECT:
906 cout << debug_prefix << " launching asynchronous create "
907 << it->first.second << std::endl;
908 aiocs[count] = rados.aio_create_completion();
909 io_ctx.aio_operate(it->first.second, aiocs[count], it->second);
910 count++;
911 if ((int)idata.to_create.size() == count) {
912 cout << "starting aiowrite waiting loop" << std::endl;
913 for (count -= 1; count >= 0; count--) {
914 aiocs[count]->wait_for_safe();
915 err = aiocs[count]->get_return_value();
916 if (err < 0) {
917 //this can happen if someone else was cleaning up after us.
918 cerr << debug_prefix << " a create failed"
919 << " with code " << err << std::endl;
920 if (err == -EEXIST) {
921 //someone thinks we died, so die
922 cerr << client_name << " is suiciding!" << std::endl;
923 return -ESUICIDE;
924 } else {
11fdf7f2 925 ceph_abort();
7c673cae
FG
926 }
927 return err;
928 }
929 if (verbose || idata.to_create.size() > 2) {
930 cout << debug_prefix << " completed aio " << aiocs.size() - count
931 << "/" << aiocs.size() << std::endl;
932 }
933 }
934 }
935 break;
936 case REMOVE_OBJECT://deleting
937 if (verbose) cout << debug_prefix << " deleting " << it->first.second
938 << std::endl;
939 err = io_ctx.operate(it->first.second, it->second);
940 if (err < 0) {
941 //if someone else called cleanup on this prefix first
942 if (verbose) cout << debug_prefix << " deleting " << it->first.second
943 << "failed with code" << err << std::endl;
944 }
945 if (verbose) cout << debug_prefix << " deleted " << it->first.second
946 << std::endl;
947 break;
948 case REMOVE_PREFIX://rewriting index
949 if (verbose) cout << debug_prefix << " updating index " << std::endl;
950 err = io_ctx.operate(index_name, it->second);
951 if (err < 0) {
952 if (verbose) cout << debug_prefix
953 << " rewriting the index failed with code " << err
954 << ". someone else must have thought we died, so dying" << std::endl;
955 return -ETIMEDOUT;
956 }
957 if (verbose) cout << debug_prefix << " updated index." << std::endl;
958 break;
959 case RESTORE_OBJECT:
960 if (verbose) cout << debug_prefix << " restoring " << it->first.second
961 << std::endl;
962 err = io_ctx.operate(it->first.second, it->second);
963 if (err < 0) {
964 if (verbose) cout << debug_prefix << "restoring " << it->first.second
965 << " failed"
966 << " with " << err << std::endl;
967 return err;
968 }
969 if (verbose) cout << debug_prefix << " restored " << it->first.second
970 << std::endl;
971 break;
972 default:
973 if (verbose) cout << debug_prefix << " performing unknown op on "
974 << it->first.second
975 << std::endl;
976 err = io_ctx.operate(index_name, it->second);
977 if (err < 0) {
978 if (verbose) cout << debug_prefix << " unknown op on "
979 << it->first.second
980 << " failed with " << err << std::endl;
981 return err;
982 }
983 if (verbose) cout << debug_prefix << " unknown op on "
984 << it->first.second
985 << " succeeded." << std::endl;
986 break;
987 }
988 }
989
990 return err;
991}
992
993int KvFlatBtreeAsync::cleanup(const index_data &idata, const int &error) {
994 if (verbose) cout << "\t\t" << client_name << ": cleaning up after "
995 << idata.str()
996 << std::endl;
997 int err = 0;
11fdf7f2 998 ceph_assert(idata.prefix != "");
7c673cae
FG
999 map<std::string,bufferlist> new_index;
1000 map<std::string, pair<bufferlist, int> > assertions;
1001 switch (error) {
1002 case -EFIRSTOBJ: {
1003 //this happens if the split or rebalance failed to mark the first object,
1004 //meaning only the index needs to be changed.
1005 //restore objects that had been marked unwritable.
1006 for(vector<delete_data >::const_iterator it =
1007 idata.to_delete.begin();
1008 it != idata.to_delete.end(); ++it) {
1009 index_data this_entry;
1010 this_entry.obj = (*it).obj;
1011 this_entry.min_kdata = it->min;
1012 this_entry.kdata = it->max;
1013 new_index[it->max.encoded()] = to_bl(this_entry);
1014 this_entry = idata;
1015 this_entry.obj = it->obj;
1016 this_entry.min_kdata = it->min;
1017 this_entry.kdata = it->max;
1018 if (verbose) cout << "\t\t\t" << client_name
1019 << "-cleanup: will assert index contains "
1020 << this_entry.str() << std::endl;
1021 assertions[it->max.encoded()] =
1022 pair<bufferlist, int>(to_bl(this_entry),
1023 CEPH_OSD_CMPXATTR_OP_EQ);
1024 }
1025
1026 //update the index
1027 librados::ObjectWriteOperation update_index;
1028 update_index.omap_cmp(assertions, &err);
1029 update_index.omap_set(new_index);
1030 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1031 << std::endl;
1032 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1033 return -ESUICIDE;
1034 }
1035 err = io_ctx.operate(index_name, &update_index);
1036 if (err < 0) {
1037 if (verbose) cout << "\t\t\t" << client_name
1038 << "-cleanup: rewriting failed with "
1039 << err << ". returning -ECANCELED" << std::endl;
1040 return -ECANCELED;
1041 }
1042 if (verbose) cout << "\t\t\t" << client_name
1043 << "-cleanup: updated index. cleanup done."
1044 << std::endl;
1045 break;
1046 }
1047 case -ERANGE: {
1048 //this happens if a split or rebalance fails to mark an object. It is a
1049 //special case of rolling back that does not have to deal with new objects.
1050
1051 //restore objects that had been marked unwritable.
1052 vector<delete_data >::const_iterator it;
1053 for(it = idata.to_delete.begin();
1054 it != idata.to_delete.end(); ++it) {
1055 index_data this_entry;
1056 this_entry.obj = (*it).obj;
1057 this_entry.min_kdata = it->min;
1058 this_entry.kdata = it->max;
1059 new_index[it->max.encoded()] = to_bl(this_entry);
1060 this_entry = idata;
1061 this_entry.obj = it->obj;
1062 this_entry.min_kdata = it->min;
1063 this_entry.kdata = it->max;
1064 if (verbose) cout << "\t\t\t" << client_name
1065 << "-cleanup: will assert index contains "
1066 << this_entry.str() << std::endl;
1067 assertions[it->max.encoded()] =
1068 pair<bufferlist, int>(to_bl(this_entry),
1069 CEPH_OSD_CMPXATTR_OP_EQ);
1070 }
1071 it = idata.to_delete.begin();
1072 librados::ObjectWriteOperation restore;
1073 set_up_restore_object(&restore);
1074 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1075 return -ESUICIDE;
1076 }
1077 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1078 << it->obj
1079 << std::endl;
1080 err = io_ctx.operate(it->obj, &restore);
1081 if (err < 0) {
1082 //i.e., -ECANCELED because the object was already restored by someone
1083 //else
1084 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1085 << it->obj
1086 << " failed with " << err << std::endl;
1087 } else {
1088 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1089 << it->obj
1090 << std::endl;
1091 }
1092
1093 //update the index
1094 librados::ObjectWriteOperation update_index;
1095 update_index.omap_cmp(assertions, &err);
1096 update_index.omap_set(new_index);
1097 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1098 << std::endl;
1099 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1100 return -ESUICIDE;
1101 }
1102 err = io_ctx.operate(index_name, &update_index);
1103 if (err < 0) {
1104 if (verbose) cout << "\t\t\t" << client_name
1105 << "-cleanup: rewriting failed with "
1106 << err << ". returning -ECANCELED" << std::endl;
1107 return -ECANCELED;
1108 }
1109 if (verbose) cout << "\t\t\t" << client_name
1110 << "-cleanup: updated index. cleanup done."
1111 << std::endl;
1112 break;
1113 }
1114 case -ENOENT: {
1115 if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling forward"
1116 << std::endl;
1117 //all changes were created except for updating the index and possibly
1118 //deleting the objects. roll forward.
1119 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1120 vector<librados::ObjectWriteOperation> owos(idata.to_delete.size() + 1);
1121 for (int i = 0; i <= (int)idata.to_delete.size(); ++i) {
1122 ops.push_back(make_pair(pair<int, string>(0, ""), &owos[i]));
1123 }
1124 set_up_ops(vector<object_data>(),
1125 vector<object_data>(), &ops, idata, &err);
1126 err = perform_ops("\t\t" + client_name + "-cleanup:", idata, &ops);
1127 if (err < 0) {
1128 if (err == -ESUICIDE) {
1129 return -ESUICIDE;
1130 }
1131 if (verbose) cout << "\t\t\t" << client_name
1132 << "-cleanup: rewriting failed with "
1133 << err << ". returning -ECANCELED" << std::endl;
1134 return -ECANCELED;
1135 }
1136 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updated index"
1137 << std::endl;
1138 break;
1139 }
1140 default: {
1141 //roll back all changes.
1142 if (verbose) cout << "\t\t" << client_name << "-cleanup: rolling back"
1143 << std::endl;
1144 map<std::string,bufferlist> new_index;
1145 std::set<string> to_remove;
1146 map<std::string, pair<bufferlist, int> > assertions;
1147
1148 //mark the objects to be created. if someone else already has, die.
1149 for(vector<create_data >::const_reverse_iterator it =
1150 idata.to_create.rbegin();
1151 it != idata.to_create.rend(); ++it) {
1152 librados::ObjectWriteOperation rm;
1153 set_up_unwrite_object(0, &rm);
1154 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1155 {
1156 return -ESUICIDE;
1157 }
1158 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1159 << it->obj
1160 << std::endl;
1161 err = io_ctx.operate(it->obj, &rm);
1162 if (err < 0) {
1163 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marking "
1164 << it->obj
1165 << " failed with " << err << std::endl;
1166 } else {
1167 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: marked "
1168 << it->obj
1169 << std::endl;
1170 }
1171 }
1172
1173 //restore objects that had been marked unwritable.
1174 for(vector<delete_data >::const_iterator it =
1175 idata.to_delete.begin();
1176 it != idata.to_delete.end(); ++it) {
1177 index_data this_entry;
1178 this_entry.obj = (*it).obj;
1179 this_entry.min_kdata = it->min;
1180 this_entry.kdata = it->max;
1181 new_index[it->max.encoded()] = to_bl(this_entry);
1182 this_entry = idata;
1183 this_entry.obj = it->obj;
1184 this_entry.min_kdata = it->min;
1185 this_entry.kdata = it->max;
1186 if (verbose) cout << "\t\t\t" << client_name
1187 << "-cleanup: will assert index contains "
1188 << this_entry.str() << std::endl;
1189 assertions[it->max.encoded()] =
1190 pair<bufferlist, int>(to_bl(this_entry),
1191 CEPH_OSD_CMPXATTR_OP_EQ);
1192 librados::ObjectWriteOperation restore;
1193 set_up_restore_object(&restore);
1194 if (verbose) cout << "\t\t\t" << client_name
1195 << "-cleanup: will assert index contains "
1196 << this_entry.str() << std::endl;
1197 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1198 {
1199 return -ESUICIDE;
1200 }
1201 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restoring "
1202 << it->obj
1203 << std::endl;
1204 err = io_ctx.operate(it->obj, &restore);
1205 if (err == -ENOENT) {
1206 //it had gotten far enough to be rolled forward - unmark the objects
1207 //and roll forward.
1208 if (verbose) cout << "\t\t\t" << client_name
1209 << "-cleanup: roll forward instead"
1210 << std::endl;
1211 for(vector<create_data >::const_iterator cit =
1212 idata.to_create.begin();
1213 cit != idata.to_create.end(); ++cit) {
1214 librados::ObjectWriteOperation res;
1215 set_up_restore_object(&res);
1216 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1217 == 1 ) {
1218 return -ECANCELED;
1219 }
1220 if (verbose) cout << "\t\t\t" << client_name
1221 << "-cleanup: restoring " << cit->obj
1222 << std::endl;
1223 err = io_ctx.operate(cit->obj, &res);
1224 if (err < 0) {
1225 if (verbose) cout << "\t\t\t" << client_name
1226 << "-cleanup: restoring "
1227 << cit->obj << " failed with " << err << std::endl;
1228 }
1229 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1230 << cit->obj
1231 << std::endl;
1232 }
1233 return cleanup(idata, -ENOENT);
1234 } else if (err < 0) {
1235 //i.e., -ECANCELED because the object was already restored by someone
1236 //else
1237 if (verbose) cout << "\t\t\t" << client_name
1238 << "-cleanup: restoring " << it->obj
1239 << " failed with " << err << std::endl;
1240 } else {
1241 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: restored "
1242 << it->obj
1243 << std::endl;
1244 }
1245 }
1246
1247 //remove the new objects
1248 for(vector<create_data >::const_reverse_iterator it =
1249 idata.to_create.rbegin();
1250 it != idata.to_create.rend(); ++it) {
1251 to_remove.insert(it->max.encoded());
1252 librados::ObjectWriteOperation rm;
1253 rm.remove();
1254 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 )
1255 {
1256 return -ESUICIDE;
1257 }
1258 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removing "
1259 << it->obj
1260 << std::endl;
1261 err = io_ctx.operate(it->obj, &rm);
1262 if (err < 0) {
1263 if (verbose) cout << "\t\t\t" << client_name
1264 << "-cleanup: failed to remove "
1265 << it->obj << std::endl;
1266 } else {
1267 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: removed "
1268 << it->obj
1269 << std::endl;
1270 }
1271 }
1272
1273 //update the index
1274 librados::ObjectWriteOperation update_index;
1275 update_index.omap_cmp(assertions, &err);
1276 update_index.omap_rm_keys(to_remove);
1277 update_index.omap_set(new_index);
1278 if (verbose) cout << "\t\t\t" << client_name << "-cleanup: updating index"
1279 << std::endl;
1280 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1281 return -ESUICIDE;
1282 }
1283 err = io_ctx.operate(index_name, &update_index);
1284 if (err < 0) {
1285 if (verbose) cout << "\t\t\t" << client_name
1286 << "-cleanup: rewriting failed with "
1287 << err << ". returning -ECANCELED" << std::endl;
1288 return -ECANCELED;
1289 }
1290 if (verbose) cout << "\t\t\t" << client_name
1291 << "-cleanup: updated index. cleanup done."
1292 << std::endl;
1293 break;
1294 }
1295 }
1296 return err;
1297}
1298
1299string KvFlatBtreeAsync::to_string(string s, int i) {
1300 stringstream ret;
1301 ret << s << i;
1302 return ret.str();
1303}
1304
1305string KvFlatBtreeAsync::get_name() {
1306 return rados_id;
1307}
1308
1309void KvFlatBtreeAsync::set_inject(injection_t inject, int wait_time) {
1310 interrupt = inject;
1311 wait_ms = wait_time;
1312}
1313
1314int KvFlatBtreeAsync::setup(int argc, const char** argv) {
1315 int r = rados.init(rados_id.c_str());
1316 if (r < 0) {
1317 cerr << "error during init" << r << std::endl;
1318 return r;
1319 }
1320 r = rados.conf_parse_argv(argc, argv);
1321 if (r < 0) {
1322 cerr << "error during parsing args" << r << std::endl;
1323 return r;
1324 }
1325 r = rados.conf_parse_env(NULL);
1326 if (r < 0) {
1327 cerr << "error during parsing env" << r << std::endl;
1328 return r;
1329 }
1330 r = rados.conf_read_file(NULL);
1331 if (r < 0) {
1332 cerr << "error during read file: " << r << std::endl;
1333 return r;
1334 }
1335 r = rados.connect();
1336 if (r < 0) {
1337 cerr << "error during connect: " << r << std::endl;
1338 return r;
1339 }
1340 r = rados.ioctx_create(pool_name.c_str(), io_ctx);
1341 if (r < 0) {
1342 cerr << "error creating io ctx: " << r << std::endl;
1343 rados.shutdown();
1344 return r;
1345 }
1346
1347 librados::ObjectWriteOperation make_index;
1348 make_index.create(true);
1349 map<std::string,bufferlist> index_map;
1350 index_data idata;
1351 idata.obj = client_name;
1352 idata.min_kdata.raw_key = "";
1353 idata.kdata = key_data("");
1354 index_map["1"] = to_bl(idata);
1355 make_index.omap_set(index_map);
1356 r = io_ctx.operate(index_name, &make_index);
1357 if (r < 0) {
1358 if (verbose) cout << client_name << ": Making the index failed with code "
1359 << r
1360 << std::endl;
1361 return 0;
1362 }
1363 if (verbose) cout << client_name << ": created index object" << std::endl;
1364
1365 librados::ObjectWriteOperation make_max_obj;
1366 make_max_obj.create(true);
1367 make_max_obj.setxattr("unwritable", to_bl("0"));
1368 make_max_obj.setxattr("size", to_bl("0"));
1369 r = io_ctx.operate(client_name, &make_max_obj);
1370 if (r < 0) {
1371 if (verbose) cout << client_name << ": Setting xattr failed with code "
1372 << r
1373 << std::endl;
1374 }
1375
1376 return 0;
1377}
1378
1379int KvFlatBtreeAsync::set(const string &key, const bufferlist &val,
1380 bool update_on_existing) {
1381 if (verbose) cout << client_name << " is "
1382 << (update_on_existing? "updating " : "setting ")
1383 << key << std::endl;
1384 int err = 0;
1385 utime_t mytime;
1386 index_data idata(key);
1387
1388 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1389 err = read_index(key, &idata, NULL, false);
1390 if (err < 0) {
1391 if (verbose) cout << "\t" << client_name
1392 << ": getting oid failed with code "
1393 << err << std::endl;
1394 return err;
1395 }
1396 if (verbose) cout << "\t" << client_name << ": index data is " << idata.str()
1397 << ", object is " << idata.obj << std::endl;
1398
1399 err = set_op(key, val, update_on_existing, idata);
1400
1401 if (verbose) cout << "\t" << client_name << ": finished set with " << err
1402 << std::endl;
1403 return err;
1404}
1405
1406int KvFlatBtreeAsync::set_op(const string &key, const bufferlist &val,
1407 bool update_on_existing, index_data &idata) {
1408 //write
1409
1410 bufferlist inbl;
1411 omap_set_args args;
1412 args.bound = 2 * k;
1413 args.exclusive = !update_on_existing;
1414 args.omap[key] = val;
1415 args.encode(inbl);
1416
1417 librados::ObjectWriteOperation owo;
1418 owo.exec("kvs", "omap_insert", inbl);
1419 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1420 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1421 return -ESUICIDE;
1422 }
1423 if (verbose) cout << "\t" << client_name << ": inserting " << key
1424 << " into object "
1425 << idata.obj << std::endl;
1426 int err = io_ctx.operate(idata.obj, &owo);
1427 if (err < 0) {
1428 switch (err) {
1429 case -EEXIST: {
1430 //the key already exists and this is an exclusive insert.
1431 cerr << "\t" << client_name << ": writing key failed with "
1432 << err << std::endl;
1433 return err;
1434 }
1435 case -EKEYREJECTED: {
1436 //the object needs to be split.
1437 do {
1438 if (verbose) cout << "\t" << client_name << ": running split on "
1439 << idata.obj
1440 << std::endl;
1441 err = read_index(key, &idata, NULL, true);
1442 if (err < 0) {
1443 if (verbose) cout << "\t" << client_name
1444 << ": getting oid failed with code "
1445 << err << std::endl;
1446 return err;
1447 }
1448 err = split(idata);
1449 if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1450 if (verbose) cerr << "\t" << client_name << ": split failed with "
1451 << err << std::endl;
1452 int ret = handle_set_rm_errors(err, idata.obj, key, &idata, NULL);
1453 switch (ret) {
1454 case -ESUICIDE:
1455 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1456 return ret;
1457 break;
1458 case 1:
1459 return set_op(key, val, update_on_existing, idata);
1460 break;
1461 case 2:
1462 return err;
1463 break;
1464 }
1465 }
1466 } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1467 err = read_index(key, &idata, NULL, true);
1468 if (err < 0) {
1469 if (verbose) cout << "\t" << client_name
1470 << ": getting oid failed with code "
1471 << err << std::endl;
1472 return err;
1473 }
1474 return set_op(key, val, update_on_existing, idata);
1475 }
1476 default:
1477 if (verbose) cerr << "\t" << client_name << ": writing obj failed with "
1478 << err << std::endl;
1479 if (err == -ENOENT || err == -EACCES) {
1480 if (err == -ENOENT) {
1481 if (verbose) cout << "CACHE FAILURE" << std::endl;
1482 }
1483 err = read_index(key, &idata, NULL, true);
1484 if (err < 0) {
1485 if (verbose) cout << "\t" << client_name
1486 << ": getting oid failed with code "
1487 << err << std::endl;
1488 return err;
1489 }
1490 if (verbose) cout << "\t" << client_name << ": index data is "
1491 << idata.str()
1492 << ", object is " << idata.obj << std::endl;
1493 return set_op(key, val, update_on_existing, idata);
1494 } else {
1495 return err;
1496 }
1497 }
1498 }
1499 return 0;
1500}
1501
1502int KvFlatBtreeAsync::remove(const string &key) {
1503 if (verbose) cout << client_name << ": removing " << key << std::endl;
1504 int err = 0;
1505 string obj;
1506 utime_t mytime;
1507 index_data idata;
1508 index_data next_idata;
1509
1510 if (verbose) cout << "\t" << client_name << ": finding oid" << std::endl;
1511 err = read_index(key, &idata, &next_idata, false);
1512 if (err < 0) {
1513 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1514 return err;
1515 }
1516 obj = idata.obj;
1517 if (verbose) cout << "\t" << client_name << ": idata is " << idata.str()
1518 << ", next_idata is " << next_idata.str()
1519 << ", obj is " << obj << std::endl;
1520
1521 err = remove_op(key, idata, next_idata);
1522
1523 if (verbose) cout << "\t" << client_name << ": finished remove with " << err
1524 << " and exiting" << std::endl;
1525 return err;
1526}
1527
1528int KvFlatBtreeAsync::remove_op(const string &key, index_data &idata,
1529 index_data &next_idata) {
1530 //write
1531 bufferlist inbl;
1532 omap_rm_args args;
1533 args.bound = k;
1534 args.omap.insert(key);
1535 args.encode(inbl);
1536
1537 librados::ObjectWriteOperation owo;
1538 owo.exec("kvs", "omap_remove", inbl);
1539 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1540 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1541 return -ESUICIDE;
1542 }
1543 if (verbose) cout << "\t" << client_name << ": removing " << key << " from "
1544 << idata.obj
1545 << std::endl;
1546 int err = io_ctx.operate(idata.obj, &owo);
1547 if (err < 0) {
1548 if (verbose) cout << "\t" << client_name << ": writing obj failed with "
1549 << err << std::endl;
1550 switch (err) {
1551 case -ENODATA: {
1552 //the key does not exist in the object
1553 return err;
1554 }
1555 case -EKEYREJECTED: {
1556 //the object needs to be split.
1557 do {
1558 if (verbose) cerr << "\t" << client_name << ": running rebalance on "
1559 << idata.obj << std::endl;
1560 err = read_index(key, &idata, &next_idata, true);
1561 if (err < 0) {
1562 if (verbose) cout << "\t" << client_name
1563 << ": getting oid failed with code "
1564 << err << std::endl;
1565 return err;
1566 }
1567 err = rebalance(idata, next_idata);
1568 if (err < 0 && err != -ENOENT && err != -EBALANCE) {
1569 if (verbose) cerr << "\t" << client_name << ": rebalance returned "
1570 << err << std::endl;
1571 int ret = handle_set_rm_errors(err, idata.obj, key, &idata,
1572 &next_idata);
1573 switch (ret) {
1574 case -ESUICIDE:
1575 if (verbose) cout << client_name << " IS SUICIDING!" << std::endl;
1576 return err;
1577 break;
1578 case 1:
1579 return remove_op(key, idata, next_idata);
1580 break;
1581 case 2:
1582 return err;
1583 break;
1584 case -EUCLEAN:
1585 //this is the only node, so it's ok to go below k.
1586 librados::ObjectWriteOperation owo;
1587 bufferlist inbl;
1588 omap_rm_args args;
1589 args.bound = 0;
1590 args.omap.insert(key);
1591 args.encode(inbl);
1592 owo.exec("kvs", "omap_remove", inbl);
1593 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)()
1594 == 1 ) {
1595 if (verbose) cout << client_name << " IS SUICIDING!"
1596 << std::endl;
1597 return -ESUICIDE;
1598 }
1599 if (verbose) cout << "\t" << client_name << ": removing " << key
1600 << " from "
1601 << idata.obj
1602 << std::endl;
1603 int err = io_ctx.operate(idata.obj, &owo);
1604 if (err == 0) {
1605 return 0;
1606 }
1607 }
1608 }
1609 } while (err < 0 && err != -EBALANCE && err != -ENOENT);
1610 err = read_index(key, &idata, &next_idata, true);
1611 if (err < 0) {
1612 if (verbose) cout << "\t" << client_name
1613 << ": getting oid failed with code "
1614 << err << std::endl;
1615 return err;
1616 }
1617 return remove(key);
1618 }
1619 default:
1620 if (err == -ENOENT || err == -EACCES) {
1621 err = read_index(key, &idata, &next_idata, true);
1622 if (err < 0) {
1623 if (verbose) cout << "\t" << client_name
1624 << ": getting oid failed with code "
1625 << err << std::endl;
1626 return err;
1627 }
1628 if (verbose) cout << "\t" << client_name << ": index data is "
1629 << idata.str()
1630 << ", object is " << idata.obj << std::endl;
1631 //idea: we read the time every time we read the index anyway - store it.
1632 return remove_op(key, idata, next_idata);
1633 } else {
1634 return err;
1635 }
1636 }
1637 }
1638 return 0;
1639}
1640
1641int KvFlatBtreeAsync::handle_set_rm_errors(int &err, string obj,
1642 string key,
1643 index_data * idata, index_data * next_idata) {
1644 if (err == -ESUICIDE) {
1645 return err;
1646 } else if (err == -ECANCELED //if an object was unwritable or index changed
1647 || err == -EPREFIX //if there is currently a prefix
1648 || err == -ETIMEDOUT// if the index changes during the op - i.e. cleanup
1649 || err == -EACCES) //possible if we were acting on old index data
1650 {
1651 err = read_index(key, idata, next_idata, true);
1652 if (err < 0) {
1653 return err;
1654 }
1655 if (verbose) cout << "\t" << client_name << ": prefix is " << idata->str()
1656 << std::endl;
1657 if (idata->obj != obj) {
1658 //someone else has split or cleaned up or something. start over.
1659 return 1;//meaning repeat
1660 }
1661 } else if (err != -ETIMEDOUT && err != -ERANGE && err != -EACCES
1662 && err != -EUCLEAN){
1663 if (verbose) cout << "\t" << client_name
1664 << ": split encountered an unexpected error: " << err
1665 << std::endl;
1666 return 2;
1667 }
1668 return err;
1669}
1670
1671int KvFlatBtreeAsync::get(const string &key, bufferlist *val) {
1672 opmap['g']++;
1673 if (verbose) cout << client_name << ": getting " << key << std::endl;
1674 int err = 0;
1675 index_data idata;
1676 utime_t mytime;
1677
1678 if ((((KeyValueStructure *)this)->*KvFlatBtreeAsync::interrupt)() == 1 ) {
1679 return -ESUICIDE;
1680 }
1681 err = read_index(key, &idata, NULL, false);
1682 mytime = ceph_clock_now();
1683 if (err < 0) {
1684 if (verbose) cout << "getting oid failed with code " << err << std::endl;
1685 return err;
1686 }
1687
1688 err = get_op(key, val, idata);
1689
1690 if (verbose) cout << client_name << ": got " << key << " with " << err
1691 << std::endl;
1692
1693 return err;
1694}
1695
1696int KvFlatBtreeAsync::get_op(const string &key, bufferlist *val,
1697 index_data &idata) {
1698 int err = 0;
1699 std::set<std::string> key_set;
1700 key_set.insert(key);
1701 map<std::string,bufferlist> omap;
1702 librados::ObjectReadOperation read;
1703 read.omap_get_vals_by_keys(key_set, &omap, &err);
1704 err = io_ctx.operate(idata.obj, &read, NULL);
1705 if (err < 0) {
1706 if (err == -ENOENT) {
1707 err = read_index(key, &idata, NULL, true);
1708 if (err < 0) {
1709 if (verbose) cout << "\t" << client_name
1710 << ": getting oid failed with code "
1711 << err << std::endl;
1712 return err;
1713 }
1714 if (verbose) cout << "\t" << client_name << ": index data is "
1715 << idata.str()
1716 << ", object is " << idata.obj << std::endl;
1717 return get_op(key, val, idata);
1718 } else {
1719 if (verbose) cout << client_name
1720 << ": get encountered an unexpected error: " << err
1721 << std::endl;
1722 return err;
1723 }
1724 }
1725
1726 *val = omap[key];
1727 return err;
1728}
1729
1730void *KvFlatBtreeAsync::pset(void *ptr) {
1731 struct aio_set_args *args = (struct aio_set_args *)ptr;
1732 *args->err =
1733 args->kvba->KvFlatBtreeAsync::set((string)args->key,
1734 (bufferlist)args->val, (bool)args->exc);
1735 args->cb(args->err, args->cb_args);
1736 delete args;
1737 return NULL;
1738}
1739
1740void KvFlatBtreeAsync::aio_set(const string &key, const bufferlist &val,
1741 bool exclusive, callback cb, void * cb_args, int * err) {
1742 aio_set_args *args = new aio_set_args();
1743 args->kvba = this;
1744 args->key = key;
1745 args->val = val;
1746 args->exc = exclusive;
1747 args->cb = cb;
1748 args->cb_args = cb_args;
1749 args->err = err;
1750 pthread_t t;
1751 int r = pthread_create(&t, NULL, pset, (void*)args);
1752 if (r < 0) {
1753 *args->err = r;
1754 return;
1755 }
1756 pthread_detach(t);
1757}
1758
1759void *KvFlatBtreeAsync::prm(void *ptr) {
1760 struct aio_rm_args *args = (struct aio_rm_args *)ptr;
1761 *args->err =
1762 args->kvba->KvFlatBtreeAsync::remove((string)args->key);
1763 args->cb(args->err, args->cb_args);
1764 delete args;
1765 return NULL;
1766}
1767
1768void KvFlatBtreeAsync::aio_remove(const string &key,
1769 callback cb, void * cb_args, int * err) {
1770 aio_rm_args * args = new aio_rm_args();
1771 args->kvba = this;
1772 args->key = key;
1773 args->cb = cb;
1774 args->cb_args = cb_args;
1775 args->err = err;
1776 pthread_t t;
1777 int r = pthread_create(&t, NULL, prm, (void*)args);
1778 if (r < 0) {
1779 *args->err = r;
1780 return;
1781 }
1782 pthread_detach(t);
1783}
1784
1785void *KvFlatBtreeAsync::pget(void *ptr) {
1786 struct aio_get_args *args = (struct aio_get_args *)ptr;
1787 *args->err =
1788 args->kvba->KvFlatBtreeAsync::get((string)args->key,
1789 (bufferlist *)args->val);
1790 args->cb(args->err, args->cb_args);
1791 delete args;
1792 return NULL;
1793}
1794
1795void KvFlatBtreeAsync::aio_get(const string &key, bufferlist *val,
1796 callback cb, void * cb_args, int * err) {
1797 aio_get_args * args = new aio_get_args();
1798 args->kvba = this;
1799 args->key = key;
1800 args->val = val;
1801 args->cb = cb;
1802 args->cb_args = cb_args;
1803 args->err = err;
1804 pthread_t t;
1805 int r = pthread_create(&t, NULL, pget, (void*)args);
1806 if (r < 0) {
1807 *args->err = r;
1808 return;
1809 }
1810 pthread_detach(t);
1811}
1812
1813int KvFlatBtreeAsync::set_many(const map<string, bufferlist> &in_map) {
1814 int err = 0;
1815 bufferlist inbl;
1816 bufferlist outbl;
1817 std::set<string> keys;
1818
1819 map<string, bufferlist> big_map;
1820 for (map<string, bufferlist>::const_iterator it = in_map.begin();
1821 it != in_map.end(); ++it) {
1822 keys.insert(it->first);
1823 big_map.insert(*it);
1824 }
1825
1826 if (verbose) cout << "created key set and big_map" << std::endl;
1827
11fdf7f2 1828 encode(keys, inbl);
7c673cae
FG
1829 librados::AioCompletion * aioc = rados.aio_create_completion();
1830 io_ctx.aio_exec(index_name, aioc, "kvs", "read_many", inbl, &outbl);
1831 aioc->wait_for_safe();
1832 err = aioc->get_return_value();
1833 aioc->release();
1834 if (err < 0) {
1835 cerr << "getting index failed with " << err << std::endl;
1836 return err;
1837 }
1838
1839 map<string, bufferlist> imap;//read from the index
11fdf7f2
TL
1840 auto blit = outbl.cbegin();
1841 decode(imap, blit);
7c673cae
FG
1842
1843 if (verbose) cout << "finished reading index for objects. there are "
1844 << imap.size() << " entries that need to be changed. " << std::endl;
1845
1846
1847 vector<object_data> to_delete;
1848
1849 vector<object_data> to_create;
1850
1851 if (verbose) cout << "setting up to_delete and to_create vectors from index "
1852 << "map" << std::endl;
1853 //set up to_delete from index map
1854 for (map<string, bufferlist>::iterator it = imap.begin(); it != imap.end();
1855 ++it){
1856 index_data idata;
1857 blit = it->second.begin();
1858 idata.decode(blit);
1859 to_delete.push_back(object_data(idata.min_kdata, idata.kdata, idata.obj));
1860 err = read_object(idata.obj, &to_delete[to_delete.size() - 1]);
1861 if (err < 0) {
1862 if (verbose) cout << "reading " << idata.obj << " failed with " << err
1863 << std::endl;
1864 return set_many(in_map);
1865 }
1866
1867 big_map.insert(to_delete[to_delete.size() - 1].omap.begin(),
1868 to_delete[to_delete.size() - 1].omap.end());
1869 }
1870
1871 to_create.push_back(object_data(
1872 to_string(client_name, client_index++)));
1873 to_create[0].min_kdata = to_delete[0].min_kdata;
1874
1875 for(map<string, bufferlist>::iterator it = big_map.begin();
1876 it != big_map.end(); ++it) {
1877 if (to_create[to_create.size() - 1].omap.size() == 1.5 * k) {
1878 to_create[to_create.size() - 1].max_kdata =
1879 key_data(to_create[to_create.size() - 1]
1880 .omap.rbegin()->first);
1881
1882 to_create.push_back(object_data(
1883 to_string(client_name, client_index++)));
1884 to_create[to_create.size() - 1].min_kdata =
1885 to_create[to_create.size() - 2].max_kdata;
1886 }
1887
1888 to_create[to_create.size() - 1].omap.insert(*it);
1889 }
1890 to_create[to_create.size() - 1].max_kdata =
1891 to_delete[to_delete.size() - 1].max_kdata;
1892
1893 vector<librados::ObjectWriteOperation> owos(2 + 2 * to_delete.size()
1894 + to_create.size());
1895 vector<pair<pair<int, string>, librados::ObjectWriteOperation*> > ops;
1896
1897
1898 index_data idata;
1899 set_up_prefix_index(to_create, to_delete, &owos[0], &idata, &err);
1900
1901 if (verbose) cout << "finished making to_create and to_delete. "
1902 << std::endl;
1903
1904 ops.push_back(make_pair(
1905 pair<int, string>(ADD_PREFIX, index_name),
1906 &owos[0]));
1907 for (int i = 1; i < 2 + 2 * (int)to_delete.size() + (int)to_create.size();
1908 i++) {
1909 ops.push_back(make_pair(make_pair(0,""), &owos[i]));
1910 }
1911
1912 set_up_ops(to_create, to_delete, &ops, idata, &err);
1913
1914 cout << "finished setting up ops. Starting critical section..." << std::endl;
1915
1916 /////BEGIN CRITICAL SECTION/////
1917 //put prefix on index entry for idata.val
1918 err = perform_ops("\t\t" + client_name + "-set_many:", idata, &ops);
1919 if (err < 0) {
1920 return set_many(in_map);
1921 }
1922 if (verbose) cout << "\t\t" << client_name << "-split: done splitting."
1923 << std::endl;
1924 /////END CRITICAL SECTION/////
1925 icache_lock.Lock();
1926 for (vector<delete_data>::iterator it = idata.to_delete.begin();
1927 it != idata.to_delete.end(); ++it) {
1928 icache.erase(it->max);
1929 }
1930 for (vector<create_data>::iterator it = idata.to_create.begin();
1931 it != idata.to_create.end(); ++it) {
1932 icache.push(index_data(*it));
1933 }
1934 icache_lock.Unlock();
1935 return err;
1936}
1937
1938int KvFlatBtreeAsync::remove_all() {
1939 if (verbose) cout << client_name << ": removing all" << std::endl;
1940 int err = 0;
1941 librados::ObjectReadOperation oro;
1942 librados::AioCompletion * oro_aioc = rados.aio_create_completion();
1943 std::map<std::string, bufferlist> index_set;
1944 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1945 err = io_ctx.aio_operate(index_name, oro_aioc, &oro, NULL);
1946 if (err < 0){
1947 if (err == -ENOENT) {
1948 return 0;
1949 }
1950 if (verbose) cout << "getting keys failed with error " << err << std::endl;
1951 return err;
1952 }
1953 oro_aioc->wait_for_safe();
1954 oro_aioc->release();
1955
1956 librados::ObjectWriteOperation rm_index;
1957 librados::AioCompletion * rm_index_aioc = rados.aio_create_completion();
1958 map<std::string,bufferlist> new_index;
1959 new_index["1"] = index_set["1"];
1960 rm_index.omap_clear();
1961 rm_index.omap_set(new_index);
1962 io_ctx.aio_operate(index_name, rm_index_aioc, &rm_index);
1963 err = rm_index_aioc->get_return_value();
1964 rm_index_aioc->release();
1965 if (err < 0) {
1966 if (verbose) cout << "rm index aioc failed with " << err
1967 << std::endl;
1968 return err;
1969 }
1970
1971 if (!index_set.empty()) {
1972 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
1973 it != index_set.end(); ++it){
1974 librados::ObjectWriteOperation sub;
1975 if (it->first == "1") {
1976 sub.omap_clear();
1977 } else {
1978 sub.remove();
1979 }
1980 index_data idata;
11fdf7f2 1981 auto b = it->second.cbegin();
7c673cae
FG
1982 idata.decode(b);
1983 io_ctx.operate(idata.obj, &sub);
1984 }
1985 }
1986
1987 icache.clear();
1988
1989 return 0;
1990}
1991
1992int KvFlatBtreeAsync::get_all_keys(std::set<std::string> *keys) {
1993 if (verbose) cout << client_name << ": getting all keys" << std::endl;
1994 int err = 0;
1995 librados::ObjectReadOperation oro;
1996 std::map<std::string,bufferlist> index_set;
1997 oro.omap_get_vals2("",LONG_MAX,&index_set, nullptr, &err);
1998 io_ctx.operate(index_name, &oro, NULL);
1999 if (err < 0){
2000 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2001 return err;
2002 }
2003 for (std::map<std::string,bufferlist>::iterator it = index_set.begin();
2004 it != index_set.end(); ++it){
2005 librados::ObjectReadOperation sub;
2006 std::set<std::string> ret;
2007 sub.omap_get_keys2("",LONG_MAX,&ret, nullptr, &err);
2008 index_data idata;
11fdf7f2 2009 auto b = it->second.cbegin();
7c673cae
FG
2010 idata.decode(b);
2011 io_ctx.operate(idata.obj, &sub, NULL);
2012 keys->insert(ret.begin(), ret.end());
2013 }
2014 return err;
2015}
2016
2017int KvFlatBtreeAsync::get_all_keys_and_values(
2018 map<std::string,bufferlist> *kv_map) {
2019 if (verbose) cout << client_name << ": getting all keys and values"
2020 << std::endl;
2021 int err = 0;
2022 librados::ObjectReadOperation first_read;
2023 std::set<std::string> index_set;
2024 first_read.omap_get_keys2("",LONG_MAX,&index_set, nullptr, &err);
2025 io_ctx.operate(index_name, &first_read, NULL);
2026 if (err < 0){
2027 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2028 return err;
2029 }
2030 for (std::set<std::string>::iterator it = index_set.begin();
2031 it != index_set.end(); ++it){
2032 librados::ObjectReadOperation sub;
2033 map<std::string, bufferlist> ret;
2034 sub.omap_get_vals2("",LONG_MAX,&ret, nullptr, &err);
2035 io_ctx.operate(*it, &sub, NULL);
2036 kv_map->insert(ret.begin(), ret.end());
2037 }
2038 return err;
2039}
2040
2041bool KvFlatBtreeAsync::is_consistent() {
2042 int err;
2043 bool ret = true;
2044 if (verbose) cout << client_name << ": checking consistency" << std::endl;
2045 std::map<std::string,bufferlist> index;
2046 map<std::string, std::set<std::string> > sub_objs;
2047 librados::ObjectReadOperation oro;
2048 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2049 io_ctx.operate(index_name, &oro, NULL);
2050 if (err < 0){
2051 //probably because the index doesn't exist - this might be ok.
2052 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2053 oit != io_ctx.nobjects_end(); ++oit) {
2054 //if this executes, there are floating objects.
2055 cerr << "Not consistent! found floating object " << oit->get_oid()
2056 << std::endl;
2057 ret = false;
2058 }
2059 return ret;
2060 }
2061
2062 std::map<std::string, string> parsed_index;
2063 std::set<std::string> onames;
2064 std::set<std::string> special_names;
2065 for (map<std::string,bufferlist>::iterator it = index.begin();
2066 it != index.end(); ++it) {
2067 if (it->first != "") {
2068 index_data idata;
11fdf7f2 2069 auto b = it->second.cbegin();
7c673cae
FG
2070 idata.decode(b);
2071 if (idata.prefix != "") {
2072 for(vector<delete_data>::iterator dit = idata.to_delete.begin();
2073 dit != idata.to_delete.end(); ++dit) {
2074 librados::ObjectReadOperation oro;
2075 librados::AioCompletion * aioc = rados.aio_create_completion();
2076 bufferlist un;
2077 oro.getxattr("unwritable", &un, &err);
2078 io_ctx.aio_operate(dit->obj, aioc, &oro, NULL);
2079 aioc->wait_for_safe();
2080 err = aioc->get_return_value();
2081 if (ceph_clock_now() - idata.ts > timeout) {
2082 if (err < 0) {
2083 aioc->release();
2084 if (err == -ENOENT) {
2085 continue;
2086 } else {
2087 cerr << "Not consistent! reading object " << dit->obj
2088 << "returned " << err << std::endl;
2089 ret = false;
2090 break;
2091 }
2092 }
2093 if (atoi(string(un.c_str(), un.length()).c_str()) != 1 &&
2094 aioc->get_version64() != dit->version) {
2095 cerr << "Not consistent! object " << dit->obj << " has been "
2096 << " modified since the client died was not cleaned up."
2097 << std::endl;
2098 ret = false;
2099 }
2100 }
2101 special_names.insert(dit->obj);
2102 aioc->release();
2103 }
2104 for(vector<create_data >::iterator cit = idata.to_create.begin();
2105 cit != idata.to_create.end(); ++cit) {
2106 special_names.insert(cit->obj);
2107 }
2108 }
2109 parsed_index.insert(make_pair(it->first, idata.obj));
2110 onames.insert(idata.obj);
2111 }
2112 }
2113
2114 //make sure that an object exists iff it either is the index
2115 //or is listed in the index
2116 for (librados::NObjectIterator oit = io_ctx.nobjects_begin();
2117 oit != io_ctx.nobjects_end(); ++oit) {
2118 string name = oit->get_oid();
2119 if (name != index_name && onames.count(name) == 0
2120 && special_names.count(name) == 0) {
2121 cerr << "Not consistent! found floating object " << name << std::endl;
2122 ret = false;
2123 }
2124 }
2125
2126 //check objects
2127 string prev = "";
2128 for (std::map<std::string, string>::iterator it = parsed_index.begin();
2129 it != parsed_index.end();
2130 ++it) {
2131 librados::ObjectReadOperation read;
2132 read.omap_get_keys2("", LONG_MAX, &sub_objs[it->second], nullptr, &err);
2133 err = io_ctx.operate(it->second, &read, NULL);
2134 int size_int = (int)sub_objs[it->second].size();
2135
2136 //check that size is in the right range
2137 if (it->first != "1" && special_names.count(it->second) == 0 &&
2138 err != -ENOENT && (size_int > 2*k|| size_int < k)
2139 && parsed_index.size() > 1) {
2140 cerr << "Not consistent! Object " << *it << " has size " << size_int
2141 << ", which is outside the acceptable range." << std::endl;
2142 ret = false;
2143 }
2144
2145 //check that all keys belong in that object
2146 for(std::set<std::string>::iterator subit = sub_objs[it->second].begin();
2147 subit != sub_objs[it->second].end(); ++subit) {
2148 if ((it->first != "1"
2149 && *subit > it->first.substr(1,it->first.length()))
2150 || *subit <= prev) {
2151 cerr << "Not consistent! key " << *subit << " does not belong in "
2152 << *it << std::endl;
2153 cerr << "not last element, i.e. " << it->first << " not equal to 1? "
2154 << (it->first != "1") << std::endl
2155 << "greater than " << it->first.substr(1,it->first.length())
2156 <<"? " << (*subit > it->first.substr(1,it->first.length()))
2157 << std::endl
2158 << "less than or equal to " << prev << "? "
2159 << (*subit <= prev) << std::endl;
2160 ret = false;
2161 }
2162 }
2163
2164 prev = it->first.substr(1,it->first.length());
2165 }
2166
2167 if (!ret) {
2168 if (verbose) cout << "failed consistency test - see error log"
2169 << std::endl;
2170 cerr << str();
2171 } else {
2172 if (verbose) cout << "passed consistency test" << std::endl;
2173 }
2174 return ret;
2175}
2176
2177string KvFlatBtreeAsync::str() {
2178 stringstream ret;
2179 ret << "Top-level map:" << std::endl;
2180 int err = 0;
2181 std::set<std::string> keys;
2182 std::map<std::string,bufferlist> index;
2183 librados::ObjectReadOperation oro;
2184 librados::AioCompletion * top_aioc = rados.aio_create_completion();
2185 oro.omap_get_vals2("",LONG_MAX,&index, nullptr, &err);
2186 io_ctx.aio_operate(index_name, top_aioc, &oro, NULL);
2187 top_aioc->wait_for_safe();
2188 err = top_aioc->get_return_value();
2189 top_aioc->release();
2190 if (err < 0 && err != -5){
2191 if (verbose) cout << "getting keys failed with error " << err << std::endl;
2192 return ret.str();
2193 }
2194 if(index.empty()) {
2195 ret << "There are no objects!" << std::endl;
2196 return ret.str();
2197 }
2198
2199 for (map<std::string,bufferlist>::iterator it = index.begin();
2200 it != index.end(); ++it) {
2201 keys.insert(string(it->second.c_str(), it->second.length())
2202 .substr(1,it->second.length()));
2203 }
2204
2205 vector<std::string> all_names;
2206 vector<int> all_sizes(index.size());
2207 vector<int> all_versions(index.size());
2208 vector<bufferlist> all_unwrit(index.size());
2209 vector<map<std::string,bufferlist> > all_maps(keys.size());
2210 vector<map<std::string,bufferlist>::iterator> its(keys.size());
2211 unsigned done = 0;
2212 vector<bool> dones(keys.size());
2213 ret << std::endl << string(150,'-') << std::endl;
2214
2215 for (map<std::string,bufferlist>::iterator it = index.begin();
2216 it != index.end(); ++it){
2217 index_data idata;
11fdf7f2 2218 auto b = it->second.cbegin();
7c673cae
FG
2219 idata.decode(b);
2220 string s = idata.str();
2221 ret << "|" << string((148 -
2222 ((*it).first.length()+s.length()+3))/2,' ');
2223 ret << (*it).first;
2224 ret << " | ";
2225 ret << string(idata.str());
2226 ret << string((148 -
2227 ((*it).first.length()+s.length()+3))/2,' ');
2228 ret << "|\t";
2229 all_names.push_back(idata.obj);
2230 ret << std::endl << string(150,'-') << std::endl;
2231 }
2232
2233 int indexer = 0;
2234
2235 //get the object names and sizes
2236 for(vector<std::string>::iterator it = all_names.begin(); it
2237 != all_names.end();
2238 ++it) {
2239 librados::ObjectReadOperation oro;
2240 librados::AioCompletion *aioc = rados.aio_create_completion();
2241 oro.omap_get_vals2("", LONG_MAX, &all_maps[indexer], nullptr, &err);
2242 oro.getxattr("unwritable", &all_unwrit[indexer], &err);
2243 io_ctx.aio_operate(*it, aioc, &oro, NULL);
2244 aioc->wait_for_safe();
2245 if (aioc->get_return_value() < 0) {
2246 ret << "reading" << *it << "failed: " << err << std::endl;
2247 //return ret.str();
2248 }
2249 all_sizes[indexer] = all_maps[indexer].size();
2250 all_versions[indexer] = aioc->get_version64();
2251 indexer++;
2252 aioc->release();
2253 }
2254
2255 ret << "///////////////////OBJECT NAMES////////////////" << std::endl;
2256 //HEADERS
2257 ret << std::endl;
2258 for (int i = 0; i < indexer; i++) {
2259 ret << "---------------------------\t";
2260 }
2261 ret << std::endl;
2262 for (int i = 0; i < indexer; i++) {
2263 ret << "|" << string((25 -
2264 (string("Bucket: ").length() + all_names[i].length()))/2, ' ');
2265 ret << "Bucket: " << all_names[i];
2266 ret << string((25 -
2267 (string("Bucket: ").length() + all_names[i].length()))/2, ' ') << "|\t";
2268 }
2269 ret << std::endl;
2270 for (int i = 0; i < indexer; i++) {
2271 its[i] = all_maps[i].begin();
2272 ret << "|" << string((25 - (string("size: ").length()
2273 + to_string("",all_sizes[i]).length()))/2, ' ');
2274 ret << "size: " << all_sizes[i];
2275 ret << string((25 - (string("size: ").length()
2276 + to_string("",all_sizes[i]).length()))/2, ' ') << "|\t";
2277 }
2278 ret << std::endl;
2279 for (int i = 0; i < indexer; i++) {
2280 its[i] = all_maps[i].begin();
2281 ret << "|" << string((25 - (string("version: ").length()
2282 + to_string("",all_versions[i]).length()))/2, ' ');
2283 ret << "version: " << all_versions[i];
2284 ret << string((25 - (string("version: ").length()
2285 + to_string("",all_versions[i]).length()))/2, ' ') << "|\t";
2286 }
2287 ret << std::endl;
2288 for (int i = 0; i < indexer; i++) {
2289 its[i] = all_maps[i].begin();
2290 ret << "|" << string((25 - (string("unwritable? ").length()
2291 + 1))/2, ' ');
2292 ret << "unwritable? " << string(all_unwrit[i].c_str(),
2293 all_unwrit[i].length());
2294 ret << string((25 - (string("unwritable? ").length()
2295 + 1))/2, ' ') << "|\t";
2296 }
2297 ret << std::endl;
2298 for (int i = 0; i < indexer; i++) {
2299 ret << "---------------------------\t";
2300 }
2301 ret << std::endl;
2302 ret << "///////////////////THE ACTUAL BLOCKS////////////////" << std::endl;
2303
2304
2305 ret << std::endl;
2306 for (int i = 0; i < indexer; i++) {
2307 ret << "---------------------------\t";
2308 }
2309 ret << std::endl;
2310 //each time through this part is two lines
2311 while(done < keys.size()) {
2312 for(int i = 0; i < indexer; i++) {
2313 if(dones[i]){
2314 ret << " \t";
2315 } else {
2316 if (its[i] == all_maps[i].end()){
2317 done++;
2318 dones[i] = true;
2319 ret << " \t";
2320 } else {
2321 ret << "|" << string((25 -
2322 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2323 ret << (*its[i]).first;
2324 ret << " | ";
2325 ret << string(its[i]->second.c_str(), its[i]->second.length());
2326 ret << string((25 -
2327 ((*its[i]).first.length()+its[i]->second.length()+3))/2,' ');
2328 ret << "|\t";
2329 ++(its[i]);
2330 }
2331
2332 }
2333 }
2334 ret << std::endl;
2335 for (int i = 0; i < indexer; i++) {
2336 if(dones[i]){
2337 ret << " \t";
2338 } else {
2339 ret << "---------------------------\t";
2340 }
2341 }
2342 ret << std::endl;
2343
2344 }
2345 return ret.str();
2346}