]> git.proxmox.com Git - ceph.git/blame - ceph/src/os/filestore/DBObjectMap.cc
import quincy beta 17.1.0
[ceph.git] / ceph / src / os / filestore / DBObjectMap.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3#include "include/int_types.h"
4#include "include/buffer.h"
5
6#include <iostream>
7#include <set>
8#include <map>
9#include <string>
7c673cae
FG
10#include <vector>
11
12#include "os/ObjectMap.h"
13#include "kv/KeyValueDB.h"
14#include "DBObjectMap.h"
15#include <errno.h>
16
17#include "common/debug.h"
18#include "common/config.h"
11fdf7f2 19#include "include/ceph_assert.h"
7c673cae
FG
20
21#define dout_context cct
22#define dout_subsys ceph_subsys_filestore
23#undef dout_prefix
24#define dout_prefix *_dout << "filestore "
25
f67539c2
TL
26using std::map;
27using std::ostream;
28using std::ostringstream;
29using std::set;
30using std::string;
31using std::stringstream;
32using std::vector;
33
34using ceph::bufferlist;
35
7c673cae
FG
36const string DBObjectMap::USER_PREFIX = "_USER_";
37const string DBObjectMap::XATTR_PREFIX = "_AXATTR_";
38const string DBObjectMap::SYS_PREFIX = "_SYS_";
39const string DBObjectMap::COMPLETE_PREFIX = "_COMPLETE_";
40const string DBObjectMap::HEADER_KEY = "HEADER";
41const string DBObjectMap::USER_HEADER_KEY = "USER_HEADER";
42const string DBObjectMap::GLOBAL_STATE_KEY = "HEADER";
43const string DBObjectMap::HOBJECT_TO_SEQ = "_HOBJTOSEQ_";
44
45// Legacy
46const string DBObjectMap::LEAF_PREFIX = "_LEAF_";
47const string DBObjectMap::REVERSE_LEAF_PREFIX = "_REVLEAF_";
48
/**
 * Append @p in to @p *out, escaping the three characters that are rewritten
 * when building keys:
 *
 *   '%' -> "%p"
 *   '.' -> "%e"
 *   '_' -> "%u"
 *
 * Every other character is copied through unchanged.  Existing content of
 * @p *out is preserved; the escaped text is appended after it.
 *
 * @param in   text to escape
 * @param out  destination string (must not be null)
 */
static void append_escaped(const std::string &in, std::string *out)
{
  for (const char c : in) {
    switch (c) {
    case '%':
      out->append("%p");
      break;
    case '.':
      out->append("%e");
      break;
    case '_':
      out->append("%u");
      break;
    default:
      out->push_back(c);
      break;
    }
  }
}
66
3efd9988 67int DBObjectMap::check(std::ostream &out, bool repair, bool force)
7c673cae 68{
3efd9988 69 int errors = 0, comp_errors = 0;
7c673cae
FG
70 bool repaired = false;
71 map<uint64_t, uint64_t> parent_to_num_children;
72 map<uint64_t, uint64_t> parent_to_actual_num_children;
73 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
74 for (iter->seek_to_first(); iter->valid(); iter->next()) {
75 _Header header;
76 bufferlist bl = iter->value();
77 while (true) {
11fdf7f2 78 auto bliter = bl.cbegin();
7c673cae
FG
79 header.decode(bliter);
80 if (header.seq != 0)
81 parent_to_actual_num_children[header.seq] = header.num_children;
82
3efd9988
FG
83 if (state.v == 2 || force) {
84 // Check complete table
85 bool complete_error = false;
86 boost::optional<string> prev;
87 KeyValueDB::Iterator complete_iter = db->get_iterator(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
88 for (complete_iter->seek_to_first(); complete_iter->valid();
89 complete_iter->next()) {
90 if (prev && prev >= complete_iter->key()) {
91 out << "Bad complete for " << header.oid << std::endl;
92 complete_error = true;
93 break;
94 }
95 prev = string(complete_iter->value().c_str(), complete_iter->value().length() - 1);
96 }
97 if (complete_error) {
98 out << "Complete mapping for " << header.seq << " :" << std::endl;
99 for (complete_iter->seek_to_first(); complete_iter->valid();
100 complete_iter->next()) {
101 out << complete_iter->key() << " -> " << string(complete_iter->value().c_str(), complete_iter->value().length() - 1) << std::endl;
102 }
103 if (repair) {
104 repaired = true;
105 KeyValueDB::Transaction t = db->get_transaction();
106 t->rmkeys_by_prefix(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
107 db->submit_transaction(t);
108 out << "Cleared complete mapping to repair" << std::endl;
109 } else {
110 errors++; // Only count when not repaired
111 comp_errors++; // Track errors here for version update
112 }
113 }
7c673cae
FG
114 }
115
116 if (header.parent == 0)
117 break;
118
119 if (!parent_to_num_children.count(header.parent))
120 parent_to_num_children[header.parent] = 0;
121 parent_to_num_children[header.parent]++;
122 if (parent_to_actual_num_children.count(header.parent))
123 break;
124
125 set<string> to_get;
126 map<string, bufferlist> got;
127 to_get.insert(HEADER_KEY);
128 db->get(sys_parent_prefix(header), to_get, &got);
129 if (got.empty()) {
130 out << "Missing: seq " << header.parent << std::endl;
131 errors++;
132 break;
133 } else {
134 bl = got.begin()->second;
135 }
136 }
137 }
138
139 for (map<uint64_t, uint64_t>::iterator i = parent_to_num_children.begin();
140 i != parent_to_num_children.end();
141 parent_to_num_children.erase(i++)) {
142 if (!parent_to_actual_num_children.count(i->first))
143 continue;
144 if (parent_to_actual_num_children[i->first] != i->second) {
145 out << "Invalid: seq " << i->first << " recorded children: "
146 << parent_to_actual_num_children[i->first] << " found: "
147 << i->second << std::endl;
148 errors++;
149 }
150 parent_to_actual_num_children.erase(i->first);
151 }
3efd9988
FG
152
153 // Only advance the version from 2 to 3 here
154 // Mark as legacy because there are still older structures
155 // we don't update. The value of legacy is only used
156 // for internal assertions.
157 if (comp_errors == 0 && state.v == 2 && repair) {
158 state.v = 3;
159 state.legacy = true;
160 set_state();
161 }
162
7c673cae
FG
163 if (errors == 0 && repaired)
164 return -1;
165 return errors;
166}
167
168string DBObjectMap::ghobject_key(const ghobject_t &oid)
169{
170 string out;
171 append_escaped(oid.hobj.oid.name, &out);
172 out.push_back('.');
173 append_escaped(oid.hobj.get_key(), &out);
174 out.push_back('.');
175 append_escaped(oid.hobj.nspace, &out);
176 out.push_back('.');
177
178 char snap_with_hash[1000];
179 char *t = snap_with_hash;
180 char *end = t + sizeof(snap_with_hash);
181 if (oid.hobj.snap == CEPH_NOSNAP)
182 t += snprintf(t, end - t, "head");
183 else if (oid.hobj.snap == CEPH_SNAPDIR)
184 t += snprintf(t, end - t, "snapdir");
185 else
186 t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
187
188 if (oid.hobj.pool == -1)
189 t += snprintf(t, end - t, ".none");
190 else
191 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.hobj.pool);
192 t += snprintf(t, end - t, ".%.*X", (int)(sizeof(uint32_t)*2), oid.hobj.get_hash());
193
194 if (oid.generation != ghobject_t::NO_GEN ||
195 oid.shard_id != shard_id_t::NO_SHARD) {
196 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.generation);
197 t += snprintf(t, end - t, ".%x", (int)oid.shard_id);
198 }
199 out += string(snap_with_hash);
200 return out;
201}
202
203// ok: pglog%u3%efs1...0.none.0017B237
204// bad: plana8923501-10...4c.3.ffffffffffffffff.2
205// fixed: plana8923501-10...4c.3.CB767F2D.ffffffffffffffff.2
206// returns 0 for false, 1 for true, negative for error
207int DBObjectMap::is_buggy_ghobject_key_v1(CephContext* cct,
208 const string &in)
209{
210 int dots = 5; // skip 5 .'s
211 const char *s = in.c_str();
212 do {
213 while (*s && *s != '.')
214 ++s;
215 if (!*s) {
216 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
217 return -EINVAL;
218 }
219 ++s;
220 } while (*s && --dots);
221 if (!*s) {
222 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
223 return -EINVAL;
224 }
225 // we are now either at a hash value (32 bits, 8 chars) or a generation
226 // value (64 bits) '.' and shard id. count the dots!
227 int len = 0;
228 while (*s && *s != '.') {
229 ++s;
230 ++len;
231 }
232 if (*s == '\0') {
233 if (len != 8) {
234 derr << "hash value is not 8 chars" << dendl;
235 return -EINVAL; // the hash value is always 8 chars.
236 }
237 return 0;
238 }
239 if (*s != '.') { // the shard follows.
240 derr << "missing final . and shard id at " << (int)(s-in.c_str()) << dendl;
241 return -EINVAL;
242 }
243 return 1;
244}
245
246
247string DBObjectMap::map_header_key(const ghobject_t &oid)
248{
249 return ghobject_key(oid);
250}
251
252string DBObjectMap::header_key(uint64_t seq)
253{
254 char buf[100];
255 snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
256 return string(buf);
257}
258
259string DBObjectMap::complete_prefix(Header header)
260{
261 return USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX;
262}
263
264string DBObjectMap::user_prefix(Header header)
265{
266 return USER_PREFIX + header_key(header->seq) + USER_PREFIX;
267}
268
269string DBObjectMap::sys_prefix(Header header)
270{
271 return USER_PREFIX + header_key(header->seq) + SYS_PREFIX;
272}
273
274string DBObjectMap::xattr_prefix(Header header)
275{
276 return USER_PREFIX + header_key(header->seq) + XATTR_PREFIX;
277}
278
279string DBObjectMap::sys_parent_prefix(_Header header)
280{
281 return USER_PREFIX + header_key(header.parent) + SYS_PREFIX;
282}
283
284int DBObjectMap::DBObjectMapIteratorImpl::init()
285{
286 invalid = false;
287 if (ready) {
288 return 0;
289 }
11fdf7f2 290 ceph_assert(!parent_iter);
7c673cae
FG
291 if (header->parent) {
292 Header parent = map->lookup_parent(header);
293 if (!parent) {
294 ceph_abort();
295 return -EINVAL;
296 }
297 parent_iter = std::make_shared<DBObjectMapIteratorImpl>(map, parent);
298 }
299 key_iter = map->db->get_iterator(map->user_prefix(header));
11fdf7f2 300 ceph_assert(key_iter);
7c673cae 301 complete_iter = map->db->get_iterator(map->complete_prefix(header));
11fdf7f2 302 ceph_assert(complete_iter);
7c673cae 303 cur_iter = key_iter;
11fdf7f2 304 ceph_assert(cur_iter);
7c673cae
FG
305 ready = true;
306 return 0;
307}
308
309ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
310 const ghobject_t &oid)
311{
312 MapHeaderLock hl(this, oid);
313 Header header = lookup_map_header(hl, oid);
314 if (!header)
315 return ObjectMapIterator(new EmptyIteratorImpl());
316 DBObjectMapIterator iter = _get_iterator(header);
317 iter->hlock.swap(hl);
318 return iter;
319}
320
321int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
322{
323 init();
324 r = 0;
325 if (parent_iter) {
326 r = parent_iter->seek_to_first();
327 if (r < 0)
328 return r;
329 }
330 r = key_iter->seek_to_first();
331 if (r < 0)
332 return r;
333 return adjust();
334}
335
336int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
337{
338 init();
339 r = 0;
340 if (parent_iter) {
341 r = parent_iter->seek_to_last();
342 if (r < 0)
343 return r;
344 if (parent_iter->valid())
345 r = parent_iter->next();
346 if (r < 0)
347 return r;
348 }
349 r = key_iter->seek_to_last();
350 if (r < 0)
351 return r;
352 if (key_iter->valid())
353 r = key_iter->next();
354 if (r < 0)
355 return r;
356 return adjust();
357}
358
359int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string &to)
360{
361 init();
362 r = 0;
363 if (parent_iter) {
364 r = parent_iter->lower_bound(to);
365 if (r < 0)
366 return r;
367 }
368 r = key_iter->lower_bound(to);
369 if (r < 0)
370 return r;
371 return adjust();
372}
373
374int DBObjectMap::DBObjectMapIteratorImpl::lower_bound_parent(const string &to)
375{
376 int r = lower_bound(to);
377 if (r < 0)
378 return r;
379 if (valid() && !on_parent())
380 return next_parent();
381 else
382 return r;
383}
384
385int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string &after)
386{
387 init();
388 r = 0;
389 if (parent_iter) {
390 r = parent_iter->upper_bound(after);
391 if (r < 0)
392 return r;
393 }
394 r = key_iter->upper_bound(after);
395 if (r < 0)
396 return r;
397 return adjust();
398}
399
400bool DBObjectMap::DBObjectMapIteratorImpl::valid()
401{
402 bool valid = !invalid && ready;
11fdf7f2 403 ceph_assert(!valid || cur_iter->valid());
7c673cae
FG
404 return valid;
405}
406
407bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
408{
409 if (parent_iter && parent_iter->valid() &&
410 (!key_iter->valid() || key_iter->key() > parent_iter->key()))
411 return true;
412 return false;
413}
414
11fdf7f2 415int DBObjectMap::DBObjectMapIteratorImpl::next()
7c673cae 416{
11fdf7f2
TL
417 ceph_assert(cur_iter->valid());
418 ceph_assert(valid());
7c673cae
FG
419 cur_iter->next();
420 return adjust();
421}
422
423int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
424{
425 r = next();
426 if (r < 0)
427 return r;
428 while (parent_iter && parent_iter->valid() && !on_parent()) {
11fdf7f2 429 ceph_assert(valid());
7c673cae
FG
430 r = lower_bound(parent_iter->key());
431 if (r < 0)
432 return r;
433 }
434
435 if (!parent_iter || !parent_iter->valid()) {
436 invalid = true;
437 }
438 return 0;
439}
440
441int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string &to_test,
442 string *begin,
443 string *end)
444{
445 /* This is clumsy because one cannot call prev() on end(), nor can one
446 * test for == begin().
447 */
448 complete_iter->upper_bound(to_test);
449 if (complete_iter->valid()) {
450 complete_iter->prev();
451 if (!complete_iter->valid()) {
452 complete_iter->upper_bound(to_test);
453 return false;
454 }
455 } else {
456 complete_iter->seek_to_last();
457 if (!complete_iter->valid())
458 return false;
459 }
460
11fdf7f2
TL
461 ceph_assert(complete_iter->key() <= to_test);
462 ceph_assert(complete_iter->value().length() >= 1);
7c673cae
FG
463 string _end(complete_iter->value().c_str(),
464 complete_iter->value().length() - 1);
465 if (_end.empty() || _end > to_test) {
466 if (begin)
467 *begin = complete_iter->key();
468 if (end)
469 *end = _end;
470 return true;
471 } else {
472 complete_iter->next();
11fdf7f2 473 ceph_assert(!complete_iter->valid() || complete_iter->key() > to_test);
7c673cae
FG
474 return false;
475 }
476}
477
478/**
479 * Moves parent_iter to the next position both out of the complete_region and
480 * not equal to key_iter. Then, we set cur_iter to parent_iter if valid and
481 * less than key_iter and key_iter otherwise.
482 */
483int DBObjectMap::DBObjectMapIteratorImpl::adjust()
484{
485 string begin, end;
486 while (parent_iter && parent_iter->valid()) {
487 if (in_complete_region(parent_iter->key(), &begin, &end)) {
488 if (end.size() == 0) {
489 parent_iter->seek_to_last();
490 if (parent_iter->valid())
491 parent_iter->next();
492 } else
493 parent_iter->lower_bound(end);
494 } else if (key_iter->valid() && key_iter->key() == parent_iter->key()) {
495 parent_iter->next();
496 } else {
497 break;
498 }
499 }
500 if (valid_parent()) {
501 cur_iter = parent_iter;
502 } else if (key_iter->valid()) {
503 cur_iter = key_iter;
504 } else {
505 invalid = true;
506 }
11fdf7f2 507 ceph_assert(invalid || cur_iter->valid());
7c673cae
FG
508 return 0;
509}
510
511
512string DBObjectMap::DBObjectMapIteratorImpl::key()
513{
514 return cur_iter->key();
515}
516
517bufferlist DBObjectMap::DBObjectMapIteratorImpl::value()
518{
519 return cur_iter->value();
520}
521
522int DBObjectMap::DBObjectMapIteratorImpl::status()
523{
524 return r;
525}
526
527int DBObjectMap::set_keys(const ghobject_t &oid,
528 const map<string, bufferlist> &set,
529 const SequencerPosition *spos)
530{
531 KeyValueDB::Transaction t = db->get_transaction();
532 MapHeaderLock hl(this, oid);
533 Header header = lookup_create_map_header(hl, oid, t);
534 if (!header)
535 return -EINVAL;
536 if (check_spos(oid, header, spos))
537 return 0;
538
539 t->set(user_prefix(header), set);
540
541 return db->submit_transaction(t);
542}
543
544int DBObjectMap::set_header(const ghobject_t &oid,
545 const bufferlist &bl,
546 const SequencerPosition *spos)
547{
548 KeyValueDB::Transaction t = db->get_transaction();
549 MapHeaderLock hl(this, oid);
550 Header header = lookup_create_map_header(hl, oid, t);
551 if (!header)
552 return -EINVAL;
553 if (check_spos(oid, header, spos))
554 return 0;
555 _set_header(header, bl, t);
556 return db->submit_transaction(t);
557}
558
559void DBObjectMap::_set_header(Header header, const bufferlist &bl,
560 KeyValueDB::Transaction t)
561{
562 map<string, bufferlist> to_set;
563 to_set[USER_HEADER_KEY] = bl;
564 t->set(sys_prefix(header), to_set);
565}
566
567int DBObjectMap::get_header(const ghobject_t &oid,
568 bufferlist *bl)
569{
570 MapHeaderLock hl(this, oid);
571 Header header = lookup_map_header(hl, oid);
572 if (!header) {
573 return 0;
574 }
575 return _get_header(header, bl);
576}
577
578int DBObjectMap::_get_header(Header header,
579 bufferlist *bl)
580{
581 map<string, bufferlist> out;
582 while (true) {
583 out.clear();
584 set<string> to_get;
585 to_get.insert(USER_HEADER_KEY);
586 int r = db->get(sys_prefix(header), to_get, &out);
587 if (r == 0 && !out.empty())
588 break;
589 if (r < 0)
590 return r;
591 Header current(header);
592 if (!current->parent)
593 break;
594 header = lookup_parent(current);
595 }
596
597 if (!out.empty())
598 bl->swap(out.begin()->second);
599 return 0;
600}
601
602int DBObjectMap::clear(const ghobject_t &oid,
603 const SequencerPosition *spos)
604{
605 KeyValueDB::Transaction t = db->get_transaction();
606 MapHeaderLock hl(this, oid);
607 Header header = lookup_map_header(hl, oid);
608 if (!header)
609 return -ENOENT;
610 if (check_spos(oid, header, spos))
611 return 0;
612 remove_map_header(hl, oid, header, t);
11fdf7f2 613 ceph_assert(header->num_children > 0);
7c673cae
FG
614 header->num_children--;
615 int r = _clear(header, t);
616 if (r < 0)
617 return r;
618 return db->submit_transaction(t);
619}
620
621int DBObjectMap::_clear(Header header,
622 KeyValueDB::Transaction t)
623{
624 while (1) {
625 if (header->num_children) {
626 set_header(header, t);
627 break;
628 }
629 clear_header(header, t);
630 if (!header->parent)
631 break;
632 Header parent = lookup_parent(header);
633 if (!parent) {
634 return -EINVAL;
635 }
11fdf7f2 636 ceph_assert(parent->num_children > 0);
7c673cae
FG
637 parent->num_children--;
638 header.swap(parent);
639 }
640 return 0;
641}
642
643int DBObjectMap::copy_up_header(Header header,
644 KeyValueDB::Transaction t)
645{
646 bufferlist bl;
647 int r = _get_header(header, &bl);
648 if (r < 0)
649 return r;
650
651 _set_header(header, bl, t);
652 return 0;
653}
654
655int DBObjectMap::rm_keys(const ghobject_t &oid,
656 const set<string> &to_clear,
657 const SequencerPosition *spos)
658{
659 MapHeaderLock hl(this, oid);
660 Header header = lookup_map_header(hl, oid);
661 if (!header)
662 return -ENOENT;
663 KeyValueDB::Transaction t = db->get_transaction();
664 if (check_spos(oid, header, spos))
665 return 0;
666 t->rmkeys(user_prefix(header), to_clear);
667 if (!header->parent) {
668 return db->submit_transaction(t);
669 }
670
11fdf7f2 671 ceph_assert(state.legacy);
7c673cae
FG
672
673 {
674 // We only get here for legacy (v2) stores
675 // Copy up all keys from parent excluding to_clear
676 // and remove parent
677 // This eliminates a v2 format use of complete for this oid only
678 map<string, bufferlist> to_write;
679 ObjectMapIterator iter = _get_iterator(header);
680 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
681 if (iter->status())
682 return iter->status();
683 if (!to_clear.count(iter->key()))
684 to_write[iter->key()] = iter->value();
685 }
686 t->set(user_prefix(header), to_write);
687 } // destruct iter which has parent in_use
688
689 copy_up_header(header, t);
690 Header parent = lookup_parent(header);
691 if (!parent)
692 return -EINVAL;
693 parent->num_children--;
694 _clear(parent, t);
695 header->parent = 0;
696 set_map_header(hl, oid, *header, t);
697 t->rmkeys_by_prefix(complete_prefix(header));
698 return db->submit_transaction(t);
699}
700
701int DBObjectMap::clear_keys_header(const ghobject_t &oid,
702 const SequencerPosition *spos)
703{
704 KeyValueDB::Transaction t = db->get_transaction();
705 MapHeaderLock hl(this, oid);
706 Header header = lookup_map_header(hl, oid);
707 if (!header)
708 return -ENOENT;
709 if (check_spos(oid, header, spos))
710 return 0;
711
712 // save old attrs
713 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
714 if (!iter)
715 return -EINVAL;
716 map<string, bufferlist> attrs;
717 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
718 attrs.insert(make_pair(iter->key(), iter->value()));
719 if (iter->status())
720 return iter->status();
721
722 // remove current header
723 remove_map_header(hl, oid, header, t);
11fdf7f2 724 ceph_assert(header->num_children > 0);
7c673cae
FG
725 header->num_children--;
726 int r = _clear(header, t);
727 if (r < 0)
728 return r;
729
730 // create new header
731 Header newheader = generate_new_header(oid, Header());
732 set_map_header(hl, oid, *newheader, t);
733 if (!attrs.empty())
734 t->set(xattr_prefix(newheader), attrs);
735 return db->submit_transaction(t);
736}
737
738int DBObjectMap::get(const ghobject_t &oid,
739 bufferlist *_header,
740 map<string, bufferlist> *out)
741{
742 MapHeaderLock hl(this, oid);
743 Header header = lookup_map_header(hl, oid);
744 if (!header)
745 return -ENOENT;
746 _get_header(header, _header);
747 ObjectMapIterator iter = _get_iterator(header);
748 for (iter->seek_to_first(); iter->valid(); iter->next()) {
749 if (iter->status())
750 return iter->status();
751 out->insert(make_pair(iter->key(), iter->value()));
752 }
753 return 0;
754}
755
756int DBObjectMap::get_keys(const ghobject_t &oid,
757 set<string> *keys)
758{
759 MapHeaderLock hl(this, oid);
760 Header header = lookup_map_header(hl, oid);
761 if (!header)
762 return -ENOENT;
763 ObjectMapIterator iter = _get_iterator(header);
764 for (iter->seek_to_first(); iter->valid(); iter->next()) {
765 if (iter->status())
766 return iter->status();
767 keys->insert(iter->key());
768 }
769 return 0;
770}
771
772int DBObjectMap::scan(Header header,
773 const set<string> &in_keys,
774 set<string> *out_keys,
775 map<string, bufferlist> *out_values)
776{
777 ObjectMapIterator db_iter = _get_iterator(header);
778 for (set<string>::const_iterator key_iter = in_keys.begin();
779 key_iter != in_keys.end();
780 ++key_iter) {
781 db_iter->lower_bound(*key_iter);
782 if (db_iter->status())
783 return db_iter->status();
784 if (db_iter->valid() && db_iter->key() == *key_iter) {
785 if (out_keys)
786 out_keys->insert(*key_iter);
787 if (out_values)
788 out_values->insert(make_pair(db_iter->key(), db_iter->value()));
789 }
790 }
791 return 0;
792}
793
794int DBObjectMap::get_values(const ghobject_t &oid,
795 const set<string> &keys,
796 map<string, bufferlist> *out)
797{
798 MapHeaderLock hl(this, oid);
799 Header header = lookup_map_header(hl, oid);
800 if (!header)
801 return -ENOENT;
802 return scan(header, keys, 0, out);
803}
804
805int DBObjectMap::check_keys(const ghobject_t &oid,
806 const set<string> &keys,
807 set<string> *out)
808{
809 MapHeaderLock hl(this, oid);
810 Header header = lookup_map_header(hl, oid);
811 if (!header)
812 return -ENOENT;
813 return scan(header, keys, out, 0);
814}
815
816int DBObjectMap::get_xattrs(const ghobject_t &oid,
817 const set<string> &to_get,
818 map<string, bufferlist> *out)
819{
820 MapHeaderLock hl(this, oid);
821 Header header = lookup_map_header(hl, oid);
822 if (!header)
823 return -ENOENT;
824 return db->get(xattr_prefix(header), to_get, out);
825}
826
827int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
828 set<string> *out)
829{
830 MapHeaderLock hl(this, oid);
831 Header header = lookup_map_header(hl, oid);
832 if (!header)
833 return -ENOENT;
834 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
835 if (!iter)
836 return -EINVAL;
837 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
838 out->insert(iter->key());
839 return iter->status();
840}
841
842int DBObjectMap::set_xattrs(const ghobject_t &oid,
843 const map<string, bufferlist> &to_set,
844 const SequencerPosition *spos)
845{
846 KeyValueDB::Transaction t = db->get_transaction();
847 MapHeaderLock hl(this, oid);
848 Header header = lookup_create_map_header(hl, oid, t);
849 if (!header)
850 return -EINVAL;
851 if (check_spos(oid, header, spos))
852 return 0;
853 t->set(xattr_prefix(header), to_set);
854 return db->submit_transaction(t);
855}
856
857int DBObjectMap::remove_xattrs(const ghobject_t &oid,
858 const set<string> &to_remove,
859 const SequencerPosition *spos)
860{
861 KeyValueDB::Transaction t = db->get_transaction();
862 MapHeaderLock hl(this, oid);
863 Header header = lookup_map_header(hl, oid);
864 if (!header)
865 return -ENOENT;
866 if (check_spos(oid, header, spos))
867 return 0;
868 t->rmkeys(xattr_prefix(header), to_remove);
869 return db->submit_transaction(t);
870}
871
872// ONLY USED FOR TESTING
873// Set version to 2 to avoid asserts
874int DBObjectMap::legacy_clone(const ghobject_t &oid,
875 const ghobject_t &target,
876 const SequencerPosition *spos)
877{
3efd9988 878 state.legacy = true;
7c673cae
FG
879
880 if (oid == target)
881 return 0;
882
883 MapHeaderLock _l1(this, std::min(oid, target));
884 MapHeaderLock _l2(this, std::max(oid, target));
885 MapHeaderLock *lsource, *ltarget;
886 if (oid > target) {
887 lsource = &_l2;
888 ltarget= &_l1;
889 } else {
890 lsource = &_l1;
891 ltarget= &_l2;
892 }
893
894 KeyValueDB::Transaction t = db->get_transaction();
895 {
896 Header destination = lookup_map_header(*ltarget, target);
897 if (destination) {
898 if (check_spos(target, destination, spos))
899 return 0;
900 destination->num_children--;
901 remove_map_header(*ltarget, target, destination, t);
902 _clear(destination, t);
903 }
904 }
905
906 Header parent = lookup_map_header(*lsource, oid);
907 if (!parent)
908 return db->submit_transaction(t);
909
910 Header source = generate_new_header(oid, parent);
911 Header destination = generate_new_header(target, parent);
912 if (spos)
913 destination->spos = *spos;
914
915 parent->num_children = 2;
916 set_header(parent, t);
917 set_map_header(*lsource, oid, *source, t);
918 set_map_header(*ltarget, target, *destination, t);
919
920 map<string, bufferlist> to_set;
921 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
922 for (xattr_iter->seek_to_first();
923 xattr_iter->valid();
924 xattr_iter->next())
925 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
926 t->set(xattr_prefix(source), to_set);
927 t->set(xattr_prefix(destination), to_set);
928 t->rmkeys_by_prefix(xattr_prefix(parent));
929 return db->submit_transaction(t);
930}
931
932int DBObjectMap::clone(const ghobject_t &oid,
933 const ghobject_t &target,
934 const SequencerPosition *spos)
935{
936 if (oid == target)
937 return 0;
938
939 MapHeaderLock _l1(this, std::min(oid, target));
940 MapHeaderLock _l2(this, std::max(oid, target));
941 MapHeaderLock *lsource, *ltarget;
942 if (oid > target) {
943 lsource = &_l2;
944 ltarget= &_l1;
945 } else {
946 lsource = &_l1;
947 ltarget= &_l2;
948 }
949
950 KeyValueDB::Transaction t = db->get_transaction();
951 {
952 Header destination = lookup_map_header(*ltarget, target);
953 if (destination) {
954 if (check_spos(target, destination, spos))
955 return 0;
956 destination->num_children--;
957 remove_map_header(*ltarget, target, destination, t);
958 _clear(destination, t);
959 }
960 }
961
962 Header source = lookup_map_header(*lsource, oid);
963 if (!source)
964 return db->submit_transaction(t);
965
966 Header destination = generate_new_header(target, Header());
967 if (spos)
968 destination->spos = *spos;
969
970 set_map_header(*ltarget, target, *destination, t);
971
972 bufferlist bl;
973 int r = _get_header(source, &bl);
974 if (r < 0)
975 return r;
976 _set_header(destination, bl, t);
977
978 map<string, bufferlist> to_set;
979 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(source));
980 for (xattr_iter->seek_to_first();
981 xattr_iter->valid();
982 xattr_iter->next())
983 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
984 t->set(xattr_prefix(destination), to_set);
985
986 map<string, bufferlist> to_write;
987 ObjectMapIterator iter = _get_iterator(source);
988 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
989 if (iter->status())
990 return iter->status();
991 to_write[iter->key()] = iter->value();
992 }
993 t->set(user_prefix(destination), to_write);
994
995 return db->submit_transaction(t);
996}
997
998int DBObjectMap::upgrade_to_v2()
999{
1000 dout(1) << __func__ << " start" << dendl;
1001 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1002 iter->seek_to_first();
1003 while (iter->valid()) {
1004 unsigned count = 0;
1005 KeyValueDB::Transaction t = db->get_transaction();
1006 set<string> remove;
1007 map<string, bufferlist> add;
1008 for (;
1009 iter->valid() && count < 300;
1010 iter->next()) {
1011 dout(20) << __func__ << " key is " << iter->key() << dendl;
1012 int r = is_buggy_ghobject_key_v1(cct, iter->key());
1013 if (r < 0) {
1014 derr << __func__ << " bad key '" << iter->key() << "'" << dendl;
1015 return r;
1016 }
1017 if (!r) {
1018 dout(20) << __func__ << " " << iter->key() << " ok" << dendl;
1019 continue;
1020 }
1021
1022 // decode header to get oid
1023 _Header hdr;
1024 bufferlist bl = iter->value();
11fdf7f2 1025 auto bliter = bl.cbegin();
7c673cae
FG
1026 hdr.decode(bliter);
1027
1028 string newkey(ghobject_key(hdr.oid));
1029 dout(20) << __func__ << " " << iter->key() << " -> " << newkey << dendl;
1030 add[newkey] = iter->value();
1031 remove.insert(iter->key());
1032 ++count;
1033 }
1034
1035 if (!remove.empty()) {
1036 dout(20) << __func__ << " updating " << remove.size() << " keys" << dendl;
1037 t->rmkeys(HOBJECT_TO_SEQ, remove);
1038 t->set(HOBJECT_TO_SEQ, add);
1039 int r = db->submit_transaction(t);
1040 if (r < 0)
1041 return r;
1042 }
1043 }
1044
1045 state.v = 2;
1046
3efd9988
FG
1047 set_state();
1048 return 0;
1049}
1050
1051void DBObjectMap::set_state()
1052{
9f95a23c 1053 std::lock_guard l{header_lock};
7c673cae
FG
1054 KeyValueDB::Transaction t = db->get_transaction();
1055 write_state(t);
3efd9988 1056 int ret = db->submit_transaction_sync(t);
11fdf7f2 1057 ceph_assert(ret == 0);
7c673cae 1058 dout(1) << __func__ << " done" << dendl;
3efd9988 1059 return;
7c673cae
FG
1060}
1061
3efd9988 1062int DBObjectMap::get_state()
7c673cae
FG
1063{
1064 map<string, bufferlist> result;
1065 set<string> to_get;
1066 to_get.insert(GLOBAL_STATE_KEY);
1067 int r = db->get(SYS_PREFIX, to_get, &result);
1068 if (r < 0)
1069 return r;
1070 if (!result.empty()) {
11fdf7f2 1071 auto bliter = result.begin()->second.cbegin();
7c673cae 1072 state.decode(bliter);
7c673cae
FG
1073 } else {
1074 // New store
3efd9988 1075 state.v = State::CUR_VERSION;
7c673cae 1076 state.seq = 1;
3efd9988
FG
1077 state.legacy = false;
1078 }
1079 return 0;
1080}
1081
1082int DBObjectMap::init(bool do_upgrade)
1083{
1084 int ret = get_state();
1085 if (ret < 0)
1086 return ret;
1087 if (state.v < 1) {
1088 dout(1) << "DBObjectMap is *very* old; upgrade to an older version first"
1089 << dendl;
1090 return -ENOTSUP;
1091 }
1092 if (state.v < 2) { // Needs upgrade
1093 if (!do_upgrade) {
1094 dout(1) << "DOBjbectMap requires an upgrade,"
1095 << " set filestore_update_to"
1096 << dendl;
1097 return -ENOTSUP;
1098 } else {
1099 int r = upgrade_to_v2();
1100 if (r < 0)
1101 return r;
1102 }
7c673cae
FG
1103 }
1104 ostringstream ss;
1105 int errors = check(ss, true);
1106 if (errors) {
1107 derr << ss.str() << dendl;
1108 if (errors > 0)
1109 return -EINVAL;
1110 }
1111 dout(20) << "(init)dbobjectmap: seq is " << state.seq << dendl;
1112 return 0;
1113}
1114
1115int DBObjectMap::sync(const ghobject_t *oid,
1116 const SequencerPosition *spos) {
1117 KeyValueDB::Transaction t = db->get_transaction();
1118 if (oid) {
11fdf7f2 1119 ceph_assert(spos);
7c673cae
FG
1120 MapHeaderLock hl(this, *oid);
1121 Header header = lookup_map_header(hl, *oid);
1122 if (header) {
1123 dout(10) << "oid: " << *oid << " setting spos to "
1124 << *spos << dendl;
1125 header->spos = *spos;
1126 set_map_header(hl, *oid, *header, t);
1127 }
1128 /* It may appear that this and the identical portion of the else
1129 * block can combined below, but in this block, the transaction
1130 * must be submitted under *both* the MapHeaderLock and the full
1131 * header_lock.
1132 *
1133 * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
1134 */
9f95a23c 1135 std::lock_guard l{header_lock};
7c673cae
FG
1136 write_state(t);
1137 return db->submit_transaction_sync(t);
1138 } else {
9f95a23c 1139 std::lock_guard l{header_lock};
7c673cae
FG
1140 write_state(t);
1141 return db->submit_transaction_sync(t);
1142 }
1143}
1144
1145int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
9f95a23c 1146 ceph_assert(ceph_mutex_is_locked_by_me(header_lock));
7c673cae
FG
1147 dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
1148 KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
1149 bufferlist bl;
1150 state.encode(bl);
1151 map<string, bufferlist> to_write;
1152 to_write[GLOBAL_STATE_KEY] = bl;
1153 t->set(SYS_PREFIX, to_write);
1154 return _t ? 0 : db->submit_transaction(t);
1155}
1156
1157
1158DBObjectMap::Header DBObjectMap::_lookup_map_header(
1159 const MapHeaderLock &l,
1160 const ghobject_t &oid)
1161{
11fdf7f2 1162 ceph_assert(l.get_locked() == oid);
7c673cae
FG
1163
1164 _Header *header = new _Header();
1165 {
9f95a23c 1166 std::lock_guard l{cache_lock};
7c673cae 1167 if (caches.lookup(oid, header)) {
11fdf7f2 1168 ceph_assert(!in_use.count(header->seq));
7c673cae
FG
1169 in_use.insert(header->seq);
1170 return Header(header, RemoveOnDelete(this));
1171 }
1172 }
1173
1174 bufferlist out;
1175 int r = db->get(HOBJECT_TO_SEQ, map_header_key(oid), &out);
1176 if (r < 0 || out.length()==0) {
1177 delete header;
1178 return Header();
1179 }
1180
1181 Header ret(header, RemoveOnDelete(this));
11fdf7f2 1182 auto iter = out.cbegin();
7c673cae
FG
1183 ret->decode(iter);
1184 {
9f95a23c 1185 std::lock_guard l{cache_lock};
7c673cae
FG
1186 caches.add(oid, *ret);
1187 }
1188
11fdf7f2 1189 ceph_assert(!in_use.count(header->seq));
7c673cae
FG
1190 in_use.insert(header->seq);
1191 return ret;
1192}
1193
1194DBObjectMap::Header DBObjectMap::_generate_new_header(const ghobject_t &oid,
1195 Header parent)
1196{
1197 Header header = Header(new _Header(), RemoveOnDelete(this));
1198 header->seq = state.seq++;
1199 if (parent) {
1200 header->parent = parent->seq;
1201 header->spos = parent->spos;
1202 }
1203 header->num_children = 1;
1204 header->oid = oid;
11fdf7f2 1205 ceph_assert(!in_use.count(header->seq));
7c673cae
FG
1206 in_use.insert(header->seq);
1207
1208 write_state();
1209 return header;
1210}
1211
1212DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
1213{
9f95a23c
TL
1214 std::unique_lock l{header_lock};
1215 header_cond.wait(l, [&input, this] { return !in_use.count(input->parent); });
7c673cae
FG
1216 map<string, bufferlist> out;
1217 set<string> keys;
1218 keys.insert(HEADER_KEY);
1219
1220 dout(20) << "lookup_parent: parent " << input->parent
1221 << " for seq " << input->seq << dendl;
1222 int r = db->get(sys_parent_prefix(input), keys, &out);
1223 if (r < 0) {
1224 ceph_abort();
1225 return Header();
1226 }
1227 if (out.empty()) {
1228 ceph_abort();
1229 return Header();
1230 }
1231
1232 Header header = Header(new _Header(), RemoveOnDelete(this));
11fdf7f2 1233 auto iter = out.begin()->second.cbegin();
7c673cae 1234 header->decode(iter);
11fdf7f2 1235 ceph_assert(header->seq == input->parent);
7c673cae
FG
1236 dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
1237 << header->parent << dendl;
1238 in_use.insert(header->seq);
1239 return header;
1240}
1241
1242DBObjectMap::Header DBObjectMap::lookup_create_map_header(
1243 const MapHeaderLock &hl,
1244 const ghobject_t &oid,
1245 KeyValueDB::Transaction t)
1246{
9f95a23c 1247 std::lock_guard l{header_lock};
7c673cae
FG
1248 Header header = _lookup_map_header(hl, oid);
1249 if (!header) {
1250 header = _generate_new_header(oid, Header());
1251 set_map_header(hl, oid, *header, t);
1252 }
1253 return header;
1254}
1255
1256void DBObjectMap::clear_header(Header header, KeyValueDB::Transaction t)
1257{
1258 dout(20) << "clear_header: clearing seq " << header->seq << dendl;
1259 t->rmkeys_by_prefix(user_prefix(header));
1260 t->rmkeys_by_prefix(sys_prefix(header));
3efd9988 1261 if (state.legacy)
7c673cae
FG
1262 t->rmkeys_by_prefix(complete_prefix(header)); // Needed when header.parent != 0
1263 t->rmkeys_by_prefix(xattr_prefix(header));
1264 set<string> keys;
1265 keys.insert(header_key(header->seq));
1266 t->rmkeys(USER_PREFIX, keys);
1267}
1268
1269void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
1270{
1271 dout(20) << "set_header: setting seq " << header->seq << dendl;
1272 map<string, bufferlist> to_write;
1273 header->encode(to_write[HEADER_KEY]);
1274 t->set(sys_prefix(header), to_write);
1275}
1276
1277void DBObjectMap::remove_map_header(
1278 const MapHeaderLock &l,
1279 const ghobject_t &oid,
1280 Header header,
1281 KeyValueDB::Transaction t)
1282{
11fdf7f2 1283 ceph_assert(l.get_locked() == oid);
7c673cae
FG
1284 dout(20) << "remove_map_header: removing " << header->seq
1285 << " oid " << oid << dendl;
1286 set<string> to_remove;
1287 to_remove.insert(map_header_key(oid));
1288 t->rmkeys(HOBJECT_TO_SEQ, to_remove);
1289 {
9f95a23c 1290 std::lock_guard l{cache_lock};
7c673cae
FG
1291 caches.clear(oid);
1292 }
1293}
1294
1295void DBObjectMap::set_map_header(
1296 const MapHeaderLock &l,
1297 const ghobject_t &oid, _Header header,
1298 KeyValueDB::Transaction t)
1299{
11fdf7f2 1300 ceph_assert(l.get_locked() == oid);
7c673cae
FG
1301 dout(20) << "set_map_header: setting " << header.seq
1302 << " oid " << oid << " parent seq "
1303 << header.parent << dendl;
1304 map<string, bufferlist> to_set;
1305 header.encode(to_set[map_header_key(oid)]);
1306 t->set(HOBJECT_TO_SEQ, to_set);
1307 {
9f95a23c 1308 std::lock_guard l{cache_lock};
7c673cae
FG
1309 caches.add(oid, header);
1310 }
1311}
1312
1313bool DBObjectMap::check_spos(const ghobject_t &oid,
1314 Header header,
1315 const SequencerPosition *spos)
1316{
1317 if (!spos || *spos > header->spos) {
1318 stringstream out;
1319 if (spos)
1320 dout(10) << "oid: " << oid << " not skipping op, *spos "
1321 << *spos << dendl;
1322 else
1323 dout(10) << "oid: " << oid << " not skipping op, *spos "
1324 << "empty" << dendl;
1325 dout(10) << " > header.spos " << header->spos << dendl;
1326 return false;
1327 } else {
1328 dout(10) << "oid: " << oid << " skipping op, *spos " << *spos
1329 << " <= header.spos " << header->spos << dendl;
1330 return true;
1331 }
1332}
1333
1334int DBObjectMap::list_objects(vector<ghobject_t> *out)
1335{
1336 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1337 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1338 bufferlist bl = iter->value();
11fdf7f2 1339 auto bliter = bl.cbegin();
7c673cae
FG
1340 _Header header;
1341 header.decode(bliter);
1342 out->push_back(header.oid);
1343 }
1344 return 0;
1345}
1346
1347int DBObjectMap::list_object_headers(vector<_Header> *out)
1348{
1349 int error = 0;
1350 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1351 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1352 bufferlist bl = iter->value();
11fdf7f2 1353 auto bliter = bl.cbegin();
7c673cae
FG
1354 _Header header;
1355 header.decode(bliter);
1356 out->push_back(header);
1357 while (header.parent) {
1358 set<string> to_get;
1359 map<string, bufferlist> got;
1360 to_get.insert(HEADER_KEY);
1361 db->get(sys_parent_prefix(header), to_get, &got);
1362 if (got.empty()) {
1363 dout(0) << "Missing: seq " << header.parent << dendl;
1364 error = -ENOENT;
1365 break;
1366 } else {
1367 bl = got.begin()->second;
11fdf7f2 1368 auto bliter = bl.cbegin();
7c673cae
FG
1369 header.decode(bliter);
1370 out->push_back(header);
1371 }
1372 }
1373 }
1374 return error;
1375}
1376
1377ostream& operator<<(ostream& out, const DBObjectMap::_Header& h)
1378{
1379 out << "seq=" << h.seq << " parent=" << h.parent
1380 << " num_children=" << h.num_children
1381 << " ghobject=" << h.oid;
1382 return out;
1383}
1384
1385int DBObjectMap::rename(const ghobject_t &from,
1386 const ghobject_t &to,
1387 const SequencerPosition *spos)
1388{
1389 if (from == to)
1390 return 0;
1391
1392 MapHeaderLock _l1(this, std::min(from, to));
1393 MapHeaderLock _l2(this, std::max(from, to));
1394 MapHeaderLock *lsource, *ltarget;
1395 if (from > to) {
1396 lsource = &_l2;
1397 ltarget= &_l1;
1398 } else {
1399 lsource = &_l1;
1400 ltarget= &_l2;
1401 }
1402
1403 KeyValueDB::Transaction t = db->get_transaction();
1404 {
1405 Header destination = lookup_map_header(*ltarget, to);
1406 if (destination) {
1407 if (check_spos(to, destination, spos))
1408 return 0;
1409 destination->num_children--;
1410 remove_map_header(*ltarget, to, destination, t);
1411 _clear(destination, t);
1412 }
1413 }
1414
1415 Header hdr = lookup_map_header(*lsource, from);
1416 if (!hdr)
1417 return db->submit_transaction(t);
1418
1419 remove_map_header(*lsource, from, hdr, t);
1420 hdr->oid = to;
1421 set_map_header(*ltarget, to, *hdr, t);
1422
1423 return db->submit_transaction(t);
1424}