]> git.proxmox.com Git - ceph.git/blob - ceph/src/os/DBObjectMap.cc
update ceph source to reef 18.1.2
[ceph.git] / ceph / src / os / DBObjectMap.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2
3 #include "include/int_types.h"
4 #include "include/buffer.h"
5
6 #include <iostream>
7 #include <set>
8 #include <map>
9 #include <string>
10 #include <vector>
11
12 #include "os/ObjectMap.h"
13 #include "kv/KeyValueDB.h"
14 #include "DBObjectMap.h"
15 #include <errno.h>
16
17 #include "common/debug.h"
18 #include "common/config.h"
19 #include "include/ceph_assert.h"
20
21 #define dout_context cct
22 #define dout_subsys ceph_subsys_filestore
23 #undef dout_prefix
24 #define dout_prefix *_dout << "filestore "
25
26 using std::map;
27 using std::ostream;
28 using std::ostringstream;
29 using std::set;
30 using std::string;
31 using std::stringstream;
32 using std::vector;
33
34 using ceph::bufferlist;
35
36 const string DBObjectMap::USER_PREFIX = "_USER_";
37 const string DBObjectMap::XATTR_PREFIX = "_AXATTR_";
38 const string DBObjectMap::SYS_PREFIX = "_SYS_";
39 const string DBObjectMap::COMPLETE_PREFIX = "_COMPLETE_";
40 const string DBObjectMap::HEADER_KEY = "HEADER";
41 const string DBObjectMap::USER_HEADER_KEY = "USER_HEADER";
42 const string DBObjectMap::GLOBAL_STATE_KEY = "HEADER";
43 const string DBObjectMap::HOBJECT_TO_SEQ = "_HOBJTOSEQ_";
44
45 // Legacy
46 const string DBObjectMap::LEAF_PREFIX = "_LEAF_";
47 const string DBObjectMap::REVERSE_LEAF_PREFIX = "_REVLEAF_";
48
/**
 * Appends `in` to `*out`, rewriting the three characters that have a
 * structural meaning in generated keys:
 *   '%' -> "%p",  '.' -> "%e",  '_' -> "%u"
 * Every other character is copied unchanged.
 */
static void append_escaped(const string &in, string *out)
{
  for (const char c : in) {
    switch (c) {
    case '%':
      out->append("%p");
      break;
    case '.':
      out->append("%e");
      break;
    case '_':
      out->append("%u");
      break;
    default:
      out->push_back(c);
      break;
    }
  }
}
66
67 int DBObjectMap::check(std::ostream &out, bool repair, bool force)
68 {
69 int errors = 0, comp_errors = 0;
70 bool repaired = false;
71 map<uint64_t, uint64_t> parent_to_num_children;
72 map<uint64_t, uint64_t> parent_to_actual_num_children;
73 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
74 for (iter->seek_to_first(); iter->valid(); iter->next()) {
75 _Header header;
76 bufferlist bl = iter->value();
77 while (true) {
78 auto bliter = bl.cbegin();
79 header.decode(bliter);
80 if (header.seq != 0)
81 parent_to_actual_num_children[header.seq] = header.num_children;
82
83 if (state.v == 2 || force) {
84 // Check complete table
85 bool complete_error = false;
86 boost::optional<string> prev;
87 KeyValueDB::Iterator complete_iter = db->get_iterator(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
88 for (complete_iter->seek_to_first(); complete_iter->valid();
89 complete_iter->next()) {
90 if (prev && prev >= complete_iter->key()) {
91 out << "Bad complete for " << header.oid << std::endl;
92 complete_error = true;
93 break;
94 }
95 prev = string(complete_iter->value().c_str(), complete_iter->value().length() - 1);
96 }
97 if (complete_error) {
98 out << "Complete mapping for " << header.seq << " :" << std::endl;
99 for (complete_iter->seek_to_first(); complete_iter->valid();
100 complete_iter->next()) {
101 out << complete_iter->key() << " -> " << string(complete_iter->value().c_str(), complete_iter->value().length() - 1) << std::endl;
102 }
103 if (repair) {
104 repaired = true;
105 KeyValueDB::Transaction t = db->get_transaction();
106 t->rmkeys_by_prefix(USER_PREFIX + header_key(header.seq) + COMPLETE_PREFIX);
107 db->submit_transaction(t);
108 out << "Cleared complete mapping to repair" << std::endl;
109 } else {
110 errors++; // Only count when not repaired
111 comp_errors++; // Track errors here for version update
112 }
113 }
114 }
115
116 if (header.parent == 0)
117 break;
118
119 if (!parent_to_num_children.count(header.parent))
120 parent_to_num_children[header.parent] = 0;
121 parent_to_num_children[header.parent]++;
122 if (parent_to_actual_num_children.count(header.parent))
123 break;
124
125 set<string> to_get;
126 map<string, bufferlist> got;
127 to_get.insert(HEADER_KEY);
128 db->get(sys_parent_prefix(header), to_get, &got);
129 if (got.empty()) {
130 out << "Missing: seq " << header.parent << std::endl;
131 errors++;
132 break;
133 } else {
134 bl = got.begin()->second;
135 }
136 }
137 }
138
139 for (map<uint64_t, uint64_t>::iterator i = parent_to_num_children.begin();
140 i != parent_to_num_children.end();
141 parent_to_num_children.erase(i++)) {
142 if (!parent_to_actual_num_children.count(i->first))
143 continue;
144 if (parent_to_actual_num_children[i->first] != i->second) {
145 out << "Invalid: seq " << i->first << " recorded children: "
146 << parent_to_actual_num_children[i->first] << " found: "
147 << i->second << std::endl;
148 errors++;
149 }
150 parent_to_actual_num_children.erase(i->first);
151 }
152
153 // Only advance the version from 2 to 3 here
154 // Mark as legacy because there are still older structures
155 // we don't update. The value of legacy is only used
156 // for internal assertions.
157 if (comp_errors == 0 && state.v == 2 && repair) {
158 state.v = 3;
159 state.legacy = true;
160 set_state();
161 }
162
163 if (errors == 0 && repaired)
164 return -1;
165 return errors;
166 }
167
168 string DBObjectMap::ghobject_key(const ghobject_t &oid)
169 {
170 string out;
171 append_escaped(oid.hobj.oid.name, &out);
172 out.push_back('.');
173 append_escaped(oid.hobj.get_key(), &out);
174 out.push_back('.');
175 append_escaped(oid.hobj.nspace, &out);
176 out.push_back('.');
177
178 char snap_with_hash[1000];
179 char *t = snap_with_hash;
180 char *end = t + sizeof(snap_with_hash);
181 if (oid.hobj.snap == CEPH_NOSNAP)
182 t += snprintf(t, end - t, "head");
183 else if (oid.hobj.snap == CEPH_SNAPDIR)
184 t += snprintf(t, end - t, "snapdir");
185 else
186 t += snprintf(t, end - t, "%llx", (long long unsigned)oid.hobj.snap);
187
188 if (oid.hobj.pool == -1)
189 t += snprintf(t, end - t, ".none");
190 else
191 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.hobj.pool);
192 t += snprintf(t, end - t, ".%.*X", (int)(sizeof(uint32_t)*2), oid.hobj.get_hash());
193
194 if (oid.generation != ghobject_t::NO_GEN ||
195 oid.shard_id != shard_id_t::NO_SHARD) {
196 t += snprintf(t, end - t, ".%llx", (long long unsigned)oid.generation);
197 t += snprintf(t, end - t, ".%x", (int)oid.shard_id);
198 }
199 out += string(snap_with_hash);
200 return out;
201 }
202
203 // ok: pglog%u3%efs1...0.none.0017B237
204 // bad: plana8923501-10...4c.3.ffffffffffffffff.2
205 // fixed: plana8923501-10...4c.3.CB767F2D.ffffffffffffffff.2
206 // returns 0 for false, 1 for true, negative for error
207 int DBObjectMap::is_buggy_ghobject_key_v1(CephContext* cct,
208 const string &in)
209 {
210 int dots = 5; // skip 5 .'s
211 const char *s = in.c_str();
212 do {
213 while (*s && *s != '.')
214 ++s;
215 if (!*s) {
216 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
217 return -EINVAL;
218 }
219 ++s;
220 } while (*s && --dots);
221 if (!*s) {
222 derr << "unexpected null at " << (int)(s-in.c_str()) << dendl;
223 return -EINVAL;
224 }
225 // we are now either at a hash value (32 bits, 8 chars) or a generation
226 // value (64 bits) '.' and shard id. count the dots!
227 int len = 0;
228 while (*s && *s != '.') {
229 ++s;
230 ++len;
231 }
232 if (*s == '\0') {
233 if (len != 8) {
234 derr << "hash value is not 8 chars" << dendl;
235 return -EINVAL; // the hash value is always 8 chars.
236 }
237 return 0;
238 }
239 if (*s != '.') { // the shard follows.
240 derr << "missing final . and shard id at " << (int)(s-in.c_str()) << dendl;
241 return -EINVAL;
242 }
243 return 1;
244 }
245
246
247 string DBObjectMap::map_header_key(const ghobject_t &oid)
248 {
249 return ghobject_key(oid);
250 }
251
252 string DBObjectMap::header_key(uint64_t seq)
253 {
254 char buf[100];
255 snprintf(buf, sizeof(buf), "%.*" PRId64, (int)(2*sizeof(seq)), seq);
256 return string(buf);
257 }
258
259 string DBObjectMap::complete_prefix(Header header)
260 {
261 return USER_PREFIX + header_key(header->seq) + COMPLETE_PREFIX;
262 }
263
264 string DBObjectMap::user_prefix(Header header)
265 {
266 return USER_PREFIX + header_key(header->seq) + USER_PREFIX;
267 }
268
269 string DBObjectMap::sys_prefix(Header header)
270 {
271 return USER_PREFIX + header_key(header->seq) + SYS_PREFIX;
272 }
273
274 string DBObjectMap::xattr_prefix(Header header)
275 {
276 return USER_PREFIX + header_key(header->seq) + XATTR_PREFIX;
277 }
278
279 string DBObjectMap::sys_parent_prefix(_Header header)
280 {
281 return USER_PREFIX + header_key(header.parent) + SYS_PREFIX;
282 }
283
284 int DBObjectMap::DBObjectMapIteratorImpl::init()
285 {
286 invalid = false;
287 if (ready) {
288 return 0;
289 }
290 ceph_assert(!parent_iter);
291 if (header->parent) {
292 Header parent = map->lookup_parent(header);
293 if (!parent) {
294 ceph_abort();
295 return -EINVAL;
296 }
297 parent_iter = std::make_shared<DBObjectMapIteratorImpl>(map, parent);
298 }
299 key_iter = map->db->get_iterator(map->user_prefix(header));
300 ceph_assert(key_iter);
301 complete_iter = map->db->get_iterator(map->complete_prefix(header));
302 ceph_assert(complete_iter);
303 cur_iter = key_iter;
304 ceph_assert(cur_iter);
305 ready = true;
306 return 0;
307 }
308
309 ObjectMap::ObjectMapIterator DBObjectMap::get_iterator(
310 const ghobject_t &oid)
311 {
312 MapHeaderLock hl(this, oid);
313 Header header = lookup_map_header(hl, oid);
314 if (!header)
315 return ObjectMapIterator(new EmptyIteratorImpl());
316 DBObjectMapIterator iter = _get_iterator(header);
317 iter->hlock.swap(hl);
318 return iter;
319 }
320
321 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
322 {
323 init();
324 r = 0;
325 if (parent_iter) {
326 r = parent_iter->seek_to_first();
327 if (r < 0)
328 return r;
329 }
330 r = key_iter->seek_to_first();
331 if (r < 0)
332 return r;
333 return adjust();
334 }
335
336 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
337 {
338 init();
339 r = 0;
340 if (parent_iter) {
341 r = parent_iter->seek_to_last();
342 if (r < 0)
343 return r;
344 if (parent_iter->valid())
345 r = parent_iter->next();
346 if (r < 0)
347 return r;
348 }
349 r = key_iter->seek_to_last();
350 if (r < 0)
351 return r;
352 if (key_iter->valid())
353 r = key_iter->next();
354 if (r < 0)
355 return r;
356 return adjust();
357 }
358
359 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string &to)
360 {
361 init();
362 r = 0;
363 if (parent_iter) {
364 r = parent_iter->lower_bound(to);
365 if (r < 0)
366 return r;
367 }
368 r = key_iter->lower_bound(to);
369 if (r < 0)
370 return r;
371 return adjust();
372 }
373
374 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound_parent(const string &to)
375 {
376 int r = lower_bound(to);
377 if (r < 0)
378 return r;
379 if (valid() && !on_parent())
380 return next_parent();
381 else
382 return r;
383 }
384
385 int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string &after)
386 {
387 init();
388 r = 0;
389 if (parent_iter) {
390 r = parent_iter->upper_bound(after);
391 if (r < 0)
392 return r;
393 }
394 r = key_iter->upper_bound(after);
395 if (r < 0)
396 return r;
397 return adjust();
398 }
399
400 bool DBObjectMap::DBObjectMapIteratorImpl::valid()
401 {
402 bool valid = !invalid && ready;
403 ceph_assert(!valid || cur_iter->valid());
404 return valid;
405 }
406
407 bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
408 {
409 if (parent_iter && parent_iter->valid() &&
410 (!key_iter->valid() || key_iter->key() > parent_iter->key()))
411 return true;
412 return false;
413 }
414
415 int DBObjectMap::DBObjectMapIteratorImpl::next()
416 {
417 ceph_assert(cur_iter->valid());
418 ceph_assert(valid());
419 cur_iter->next();
420 return adjust();
421 }
422
423 int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
424 {
425 r = next();
426 if (r < 0)
427 return r;
428 while (parent_iter && parent_iter->valid() && !on_parent()) {
429 ceph_assert(valid());
430 r = lower_bound(parent_iter->key());
431 if (r < 0)
432 return r;
433 }
434
435 if (!parent_iter || !parent_iter->valid()) {
436 invalid = true;
437 }
438 return 0;
439 }
440
441 int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string &to_test,
442 string *begin,
443 string *end)
444 {
445 /* This is clumsy because one cannot call prev() on end(), nor can one
446 * test for == begin().
447 */
448 complete_iter->upper_bound(to_test);
449 if (complete_iter->valid()) {
450 complete_iter->prev();
451 if (!complete_iter->valid()) {
452 complete_iter->upper_bound(to_test);
453 return false;
454 }
455 } else {
456 complete_iter->seek_to_last();
457 if (!complete_iter->valid())
458 return false;
459 }
460
461 ceph_assert(complete_iter->key() <= to_test);
462 ceph_assert(complete_iter->value().length() >= 1);
463 string _end(complete_iter->value().c_str(),
464 complete_iter->value().length() - 1);
465 if (_end.empty() || _end > to_test) {
466 if (begin)
467 *begin = complete_iter->key();
468 if (end)
469 *end = _end;
470 return true;
471 } else {
472 complete_iter->next();
473 ceph_assert(!complete_iter->valid() || complete_iter->key() > to_test);
474 return false;
475 }
476 }
477
478 /**
479 * Moves parent_iter to the next position both out of the complete_region and
480 * not equal to key_iter. Then, we set cur_iter to parent_iter if valid and
481 * less than key_iter and key_iter otherwise.
482 */
483 int DBObjectMap::DBObjectMapIteratorImpl::adjust()
484 {
485 string begin, end;
486 while (parent_iter && parent_iter->valid()) {
487 if (in_complete_region(parent_iter->key(), &begin, &end)) {
488 if (end.size() == 0) {
489 parent_iter->seek_to_last();
490 if (parent_iter->valid())
491 parent_iter->next();
492 } else
493 parent_iter->lower_bound(end);
494 } else if (key_iter->valid() && key_iter->key() == parent_iter->key()) {
495 parent_iter->next();
496 } else {
497 break;
498 }
499 }
500 if (valid_parent()) {
501 cur_iter = parent_iter;
502 } else if (key_iter->valid()) {
503 cur_iter = key_iter;
504 } else {
505 invalid = true;
506 }
507 ceph_assert(invalid || cur_iter->valid());
508 return 0;
509 }
510
511
512 string DBObjectMap::DBObjectMapIteratorImpl::key()
513 {
514 return cur_iter->key();
515 }
516
517 bufferlist DBObjectMap::DBObjectMapIteratorImpl::value()
518 {
519 return cur_iter->value();
520 }
521
522 int DBObjectMap::DBObjectMapIteratorImpl::status()
523 {
524 return r;
525 }
526
527 int DBObjectMap::set_keys(const ghobject_t &oid,
528 const map<string, bufferlist> &set,
529 const SequencerPosition *spos)
530 {
531 KeyValueDB::Transaction t = db->get_transaction();
532 MapHeaderLock hl(this, oid);
533 Header header = lookup_create_map_header(hl, oid, t);
534 if (!header)
535 return -EINVAL;
536 if (check_spos(oid, header, spos))
537 return 0;
538
539 t->set(user_prefix(header), set);
540
541 return db->submit_transaction(t);
542 }
543
544 int DBObjectMap::set_header(const ghobject_t &oid,
545 const bufferlist &bl,
546 const SequencerPosition *spos)
547 {
548 KeyValueDB::Transaction t = db->get_transaction();
549 MapHeaderLock hl(this, oid);
550 Header header = lookup_create_map_header(hl, oid, t);
551 if (!header)
552 return -EINVAL;
553 if (check_spos(oid, header, spos))
554 return 0;
555 _set_header(header, bl, t);
556 return db->submit_transaction(t);
557 }
558
559 void DBObjectMap::_set_header(Header header, const bufferlist &bl,
560 KeyValueDB::Transaction t)
561 {
562 map<string, bufferlist> to_set;
563 to_set[USER_HEADER_KEY] = bl;
564 t->set(sys_prefix(header), to_set);
565 }
566
567 int DBObjectMap::get_header(const ghobject_t &oid,
568 bufferlist *bl)
569 {
570 MapHeaderLock hl(this, oid);
571 Header header = lookup_map_header(hl, oid);
572 if (!header) {
573 return 0;
574 }
575 return _get_header(header, bl);
576 }
577
578 int DBObjectMap::_get_header(Header header,
579 bufferlist *bl)
580 {
581 map<string, bufferlist> out;
582 while (true) {
583 out.clear();
584 set<string> to_get;
585 to_get.insert(USER_HEADER_KEY);
586 int r = db->get(sys_prefix(header), to_get, &out);
587 if (r == 0 && !out.empty())
588 break;
589 if (r < 0)
590 return r;
591 Header current(header);
592 if (!current->parent)
593 break;
594 header = lookup_parent(current);
595 }
596
597 if (!out.empty())
598 bl->swap(out.begin()->second);
599 return 0;
600 }
601
602 int DBObjectMap::clear(const ghobject_t &oid,
603 const SequencerPosition *spos)
604 {
605 KeyValueDB::Transaction t = db->get_transaction();
606 MapHeaderLock hl(this, oid);
607 Header header = lookup_map_header(hl, oid);
608 if (!header)
609 return -ENOENT;
610 if (check_spos(oid, header, spos))
611 return 0;
612 remove_map_header(hl, oid, header, t);
613 ceph_assert(header->num_children > 0);
614 header->num_children--;
615 int r = _clear(header, t);
616 if (r < 0)
617 return r;
618 return db->submit_transaction(t);
619 }
620
621 int DBObjectMap::_clear(Header header,
622 KeyValueDB::Transaction t)
623 {
624 while (1) {
625 if (header->num_children) {
626 set_header(header, t);
627 break;
628 }
629 clear_header(header, t);
630 if (!header->parent)
631 break;
632 Header parent = lookup_parent(header);
633 if (!parent) {
634 return -EINVAL;
635 }
636 ceph_assert(parent->num_children > 0);
637 parent->num_children--;
638 header.swap(parent);
639 }
640 return 0;
641 }
642
643 int DBObjectMap::copy_up_header(Header header,
644 KeyValueDB::Transaction t)
645 {
646 bufferlist bl;
647 int r = _get_header(header, &bl);
648 if (r < 0)
649 return r;
650
651 _set_header(header, bl, t);
652 return 0;
653 }
654
655 int DBObjectMap::rm_keys(const ghobject_t &oid,
656 const set<string> &to_clear,
657 const SequencerPosition *spos)
658 {
659 MapHeaderLock hl(this, oid);
660 Header header = lookup_map_header(hl, oid);
661 if (!header)
662 return -ENOENT;
663 KeyValueDB::Transaction t = db->get_transaction();
664 if (check_spos(oid, header, spos))
665 return 0;
666 t->rmkeys(user_prefix(header), to_clear);
667 if (!header->parent) {
668 return db->submit_transaction(t);
669 }
670
671 ceph_assert(state.legacy);
672
673 {
674 // We only get here for legacy (v2) stores
675 // Copy up all keys from parent excluding to_clear
676 // and remove parent
677 // This eliminates a v2 format use of complete for this oid only
678 map<string, bufferlist> to_write;
679 ObjectMapIterator iter = _get_iterator(header);
680 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
681 if (iter->status())
682 return iter->status();
683 if (!to_clear.count(iter->key()))
684 to_write[iter->key()] = iter->value();
685 }
686 t->set(user_prefix(header), to_write);
687 } // destruct iter which has parent in_use
688
689 copy_up_header(header, t);
690 Header parent = lookup_parent(header);
691 if (!parent)
692 return -EINVAL;
693 parent->num_children--;
694 _clear(parent, t);
695 header->parent = 0;
696 set_map_header(hl, oid, *header, t);
697 t->rmkeys_by_prefix(complete_prefix(header));
698 return db->submit_transaction(t);
699 }
700
701 int DBObjectMap::clear_keys_header(const ghobject_t &oid,
702 const SequencerPosition *spos)
703 {
704 KeyValueDB::Transaction t = db->get_transaction();
705 MapHeaderLock hl(this, oid);
706 Header header = lookup_map_header(hl, oid);
707 if (!header)
708 return -ENOENT;
709 if (check_spos(oid, header, spos))
710 return 0;
711
712 // save old attrs
713 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
714 if (!iter)
715 return -EINVAL;
716 map<string, bufferlist> attrs;
717 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
718 attrs.insert(make_pair(iter->key(), iter->value()));
719 if (iter->status())
720 return iter->status();
721
722 // remove current header
723 remove_map_header(hl, oid, header, t);
724 ceph_assert(header->num_children > 0);
725 header->num_children--;
726 int r = _clear(header, t);
727 if (r < 0)
728 return r;
729
730 // create new header
731 Header newheader = generate_new_header(oid, Header());
732 set_map_header(hl, oid, *newheader, t);
733 if (!attrs.empty())
734 t->set(xattr_prefix(newheader), attrs);
735 return db->submit_transaction(t);
736 }
737
738 int DBObjectMap::get(const ghobject_t &oid,
739 bufferlist *_header,
740 map<string, bufferlist> *out)
741 {
742 MapHeaderLock hl(this, oid);
743 Header header = lookup_map_header(hl, oid);
744 if (!header)
745 return -ENOENT;
746 _get_header(header, _header);
747 ObjectMapIterator iter = _get_iterator(header);
748 for (iter->seek_to_first(); iter->valid(); iter->next()) {
749 if (iter->status())
750 return iter->status();
751 out->insert(make_pair(iter->key(), iter->value()));
752 }
753 return 0;
754 }
755
756 int DBObjectMap::get_keys(const ghobject_t &oid,
757 set<string> *keys)
758 {
759 MapHeaderLock hl(this, oid);
760 Header header = lookup_map_header(hl, oid);
761 if (!header)
762 return -ENOENT;
763 ObjectMapIterator iter = _get_iterator(header);
764 for (iter->seek_to_first(); iter->valid(); iter->next()) {
765 if (iter->status())
766 return iter->status();
767 keys->insert(iter->key());
768 }
769 return 0;
770 }
771
772 int DBObjectMap::scan(Header header,
773 const set<string> &in_keys,
774 set<string> *out_keys,
775 map<string, bufferlist> *out_values)
776 {
777 ObjectMapIterator db_iter = _get_iterator(header);
778 for (set<string>::const_iterator key_iter = in_keys.begin();
779 key_iter != in_keys.end();
780 ++key_iter) {
781 db_iter->lower_bound(*key_iter);
782 if (db_iter->status())
783 return db_iter->status();
784 if (db_iter->valid() && db_iter->key() == *key_iter) {
785 if (out_keys)
786 out_keys->insert(*key_iter);
787 if (out_values)
788 out_values->insert(make_pair(db_iter->key(), db_iter->value()));
789 }
790 }
791 return 0;
792 }
793
794 int DBObjectMap::get_values(const ghobject_t &oid,
795 const set<string> &keys,
796 map<string, bufferlist> *out)
797 {
798 MapHeaderLock hl(this, oid);
799 Header header = lookup_map_header(hl, oid);
800 if (!header)
801 return -ENOENT;
802 return scan(header, keys, 0, out);
803 }
804
805 int DBObjectMap::check_keys(const ghobject_t &oid,
806 const set<string> &keys,
807 set<string> *out)
808 {
809 MapHeaderLock hl(this, oid);
810 Header header = lookup_map_header(hl, oid);
811 if (!header)
812 return -ENOENT;
813 return scan(header, keys, out, 0);
814 }
815
816 int DBObjectMap::get_xattrs(const ghobject_t &oid,
817 const set<string> &to_get,
818 map<string, bufferlist> *out)
819 {
820 MapHeaderLock hl(this, oid);
821 Header header = lookup_map_header(hl, oid);
822 if (!header)
823 return -ENOENT;
824 return db->get(xattr_prefix(header), to_get, out);
825 }
826
827 int DBObjectMap::get_all_xattrs(const ghobject_t &oid,
828 set<string> *out)
829 {
830 MapHeaderLock hl(this, oid);
831 Header header = lookup_map_header(hl, oid);
832 if (!header)
833 return -ENOENT;
834 KeyValueDB::Iterator iter = db->get_iterator(xattr_prefix(header));
835 if (!iter)
836 return -EINVAL;
837 for (iter->seek_to_first(); !iter->status() && iter->valid(); iter->next())
838 out->insert(iter->key());
839 return iter->status();
840 }
841
842 int DBObjectMap::set_xattrs(const ghobject_t &oid,
843 const map<string, bufferlist> &to_set,
844 const SequencerPosition *spos)
845 {
846 KeyValueDB::Transaction t = db->get_transaction();
847 MapHeaderLock hl(this, oid);
848 Header header = lookup_create_map_header(hl, oid, t);
849 if (!header)
850 return -EINVAL;
851 if (check_spos(oid, header, spos))
852 return 0;
853 t->set(xattr_prefix(header), to_set);
854 return db->submit_transaction(t);
855 }
856
857 int DBObjectMap::remove_xattrs(const ghobject_t &oid,
858 const set<string> &to_remove,
859 const SequencerPosition *spos)
860 {
861 KeyValueDB::Transaction t = db->get_transaction();
862 MapHeaderLock hl(this, oid);
863 Header header = lookup_map_header(hl, oid);
864 if (!header)
865 return -ENOENT;
866 if (check_spos(oid, header, spos))
867 return 0;
868 t->rmkeys(xattr_prefix(header), to_remove);
869 return db->submit_transaction(t);
870 }
871
872 // ONLY USED FOR TESTING
873 // Set version to 2 to avoid asserts
874 int DBObjectMap::legacy_clone(const ghobject_t &oid,
875 const ghobject_t &target,
876 const SequencerPosition *spos)
877 {
878 state.legacy = true;
879
880 if (oid == target)
881 return 0;
882
883 MapHeaderLock _l1(this, std::min(oid, target));
884 MapHeaderLock _l2(this, std::max(oid, target));
885 MapHeaderLock *lsource, *ltarget;
886 if (oid > target) {
887 lsource = &_l2;
888 ltarget= &_l1;
889 } else {
890 lsource = &_l1;
891 ltarget= &_l2;
892 }
893
894 KeyValueDB::Transaction t = db->get_transaction();
895 {
896 Header destination = lookup_map_header(*ltarget, target);
897 if (destination) {
898 if (check_spos(target, destination, spos))
899 return 0;
900 destination->num_children--;
901 remove_map_header(*ltarget, target, destination, t);
902 _clear(destination, t);
903 }
904 }
905
906 Header parent = lookup_map_header(*lsource, oid);
907 if (!parent)
908 return db->submit_transaction(t);
909
910 Header source = generate_new_header(oid, parent);
911 Header destination = generate_new_header(target, parent);
912 if (spos)
913 destination->spos = *spos;
914
915 parent->num_children = 2;
916 set_header(parent, t);
917 set_map_header(*lsource, oid, *source, t);
918 set_map_header(*ltarget, target, *destination, t);
919
920 map<string, bufferlist> to_set;
921 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(parent));
922 for (xattr_iter->seek_to_first();
923 xattr_iter->valid();
924 xattr_iter->next())
925 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
926 t->set(xattr_prefix(source), to_set);
927 t->set(xattr_prefix(destination), to_set);
928 t->rmkeys_by_prefix(xattr_prefix(parent));
929 return db->submit_transaction(t);
930 }
931
932 int DBObjectMap::clone(const ghobject_t &oid,
933 const ghobject_t &target,
934 const SequencerPosition *spos)
935 {
936 if (oid == target)
937 return 0;
938
939 MapHeaderLock _l1(this, std::min(oid, target));
940 MapHeaderLock _l2(this, std::max(oid, target));
941 MapHeaderLock *lsource, *ltarget;
942 if (oid > target) {
943 lsource = &_l2;
944 ltarget= &_l1;
945 } else {
946 lsource = &_l1;
947 ltarget= &_l2;
948 }
949
950 KeyValueDB::Transaction t = db->get_transaction();
951 {
952 Header destination = lookup_map_header(*ltarget, target);
953 if (destination) {
954 if (check_spos(target, destination, spos))
955 return 0;
956 destination->num_children--;
957 remove_map_header(*ltarget, target, destination, t);
958 _clear(destination, t);
959 }
960 }
961
962 Header source = lookup_map_header(*lsource, oid);
963 if (!source)
964 return db->submit_transaction(t);
965
966 Header destination = generate_new_header(target, Header());
967 if (spos)
968 destination->spos = *spos;
969
970 set_map_header(*ltarget, target, *destination, t);
971
972 bufferlist bl;
973 int r = _get_header(source, &bl);
974 if (r < 0)
975 return r;
976 _set_header(destination, bl, t);
977
978 map<string, bufferlist> to_set;
979 KeyValueDB::Iterator xattr_iter = db->get_iterator(xattr_prefix(source));
980 for (xattr_iter->seek_to_first();
981 xattr_iter->valid();
982 xattr_iter->next())
983 to_set.insert(make_pair(xattr_iter->key(), xattr_iter->value()));
984 t->set(xattr_prefix(destination), to_set);
985
986 map<string, bufferlist> to_write;
987 ObjectMapIterator iter = _get_iterator(source);
988 for (iter->seek_to_first() ; iter->valid() ; iter->next()) {
989 if (iter->status())
990 return iter->status();
991 to_write[iter->key()] = iter->value();
992 }
993 t->set(user_prefix(destination), to_write);
994
995 return db->submit_transaction(t);
996 }
997
998 int DBObjectMap::upgrade_to_v2()
999 {
1000 dout(1) << __func__ << " start" << dendl;
1001 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1002 iter->seek_to_first();
1003 while (iter->valid()) {
1004 unsigned count = 0;
1005 KeyValueDB::Transaction t = db->get_transaction();
1006 set<string> remove;
1007 map<string, bufferlist> add;
1008 for (;
1009 iter->valid() && count < 300;
1010 iter->next()) {
1011 dout(20) << __func__ << " key is " << iter->key() << dendl;
1012 int r = is_buggy_ghobject_key_v1(cct, iter->key());
1013 if (r < 0) {
1014 derr << __func__ << " bad key '" << iter->key() << "'" << dendl;
1015 return r;
1016 }
1017 if (!r) {
1018 dout(20) << __func__ << " " << iter->key() << " ok" << dendl;
1019 continue;
1020 }
1021
1022 // decode header to get oid
1023 _Header hdr;
1024 bufferlist bl = iter->value();
1025 auto bliter = bl.cbegin();
1026 hdr.decode(bliter);
1027
1028 string newkey(ghobject_key(hdr.oid));
1029 dout(20) << __func__ << " " << iter->key() << " -> " << newkey << dendl;
1030 add[newkey] = iter->value();
1031 remove.insert(iter->key());
1032 ++count;
1033 }
1034
1035 if (!remove.empty()) {
1036 dout(20) << __func__ << " updating " << remove.size() << " keys" << dendl;
1037 t->rmkeys(HOBJECT_TO_SEQ, remove);
1038 t->set(HOBJECT_TO_SEQ, add);
1039 int r = db->submit_transaction(t);
1040 if (r < 0)
1041 return r;
1042 }
1043 }
1044
1045 state.v = 2;
1046
1047 set_state();
1048 return 0;
1049 }
1050
1051 void DBObjectMap::set_state()
1052 {
1053 std::lock_guard l{header_lock};
1054 KeyValueDB::Transaction t = db->get_transaction();
1055 write_state(t);
1056 int ret = db->submit_transaction_sync(t);
1057 ceph_assert(ret == 0);
1058 dout(1) << __func__ << " done" << dendl;
1059 return;
1060 }
1061
1062 int DBObjectMap::get_state()
1063 {
1064 map<string, bufferlist> result;
1065 set<string> to_get;
1066 to_get.insert(GLOBAL_STATE_KEY);
1067 int r = db->get(SYS_PREFIX, to_get, &result);
1068 if (r < 0)
1069 return r;
1070 if (!result.empty()) {
1071 auto bliter = result.begin()->second.cbegin();
1072 state.decode(bliter);
1073 } else {
1074 // New store
1075 state.v = State::CUR_VERSION;
1076 state.seq = 1;
1077 state.legacy = false;
1078 }
1079 return 0;
1080 }
1081
1082 int DBObjectMap::init(bool do_upgrade)
1083 {
1084 int ret = get_state();
1085 if (ret < 0)
1086 return ret;
1087 if (state.v < 1) {
1088 dout(1) << "DBObjectMap is *very* old; upgrade to an older version first"
1089 << dendl;
1090 return -ENOTSUP;
1091 }
1092 if (state.v < 2) { // Needs upgrade
1093 if (!do_upgrade) {
1094 dout(1) << "DOBjbectMap requires an upgrade,"
1095 << " set filestore_update_to"
1096 << dendl;
1097 return -ENOTSUP;
1098 } else {
1099 int r = upgrade_to_v2();
1100 if (r < 0)
1101 return r;
1102 }
1103 }
1104 ostringstream ss;
1105 int errors = check(ss, true);
1106 if (errors) {
1107 derr << ss.str() << dendl;
1108 if (errors > 0)
1109 return -EINVAL;
1110 }
1111 dout(20) << "(init)dbobjectmap: seq is " << state.seq << dendl;
1112 return 0;
1113 }
1114
1115 int DBObjectMap::sync(const ghobject_t *oid,
1116 const SequencerPosition *spos) {
1117 KeyValueDB::Transaction t = db->get_transaction();
1118 if (oid) {
1119 ceph_assert(spos);
1120 MapHeaderLock hl(this, *oid);
1121 Header header = lookup_map_header(hl, *oid);
1122 if (header) {
1123 dout(10) << "oid: " << *oid << " setting spos to "
1124 << *spos << dendl;
1125 header->spos = *spos;
1126 set_map_header(hl, *oid, *header, t);
1127 }
1128 /* It may appear that this and the identical portion of the else
1129 * block can combined below, but in this block, the transaction
1130 * must be submitted under *both* the MapHeaderLock and the full
1131 * header_lock.
1132 *
1133 * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
1134 */
1135 std::lock_guard l{header_lock};
1136 write_state(t);
1137 return db->submit_transaction_sync(t);
1138 } else {
1139 std::lock_guard l{header_lock};
1140 write_state(t);
1141 return db->submit_transaction_sync(t);
1142 }
1143 }
1144
1145 int DBObjectMap::write_state(KeyValueDB::Transaction _t) {
1146 ceph_assert(ceph_mutex_is_locked_by_me(header_lock));
1147 dout(20) << "dbobjectmap: seq is " << state.seq << dendl;
1148 KeyValueDB::Transaction t = _t ? _t : db->get_transaction();
1149 bufferlist bl;
1150 state.encode(bl);
1151 map<string, bufferlist> to_write;
1152 to_write[GLOBAL_STATE_KEY] = bl;
1153 t->set(SYS_PREFIX, to_write);
1154 return _t ? 0 : db->submit_transaction(t);
1155 }
1156
1157
1158 DBObjectMap::Header DBObjectMap::_lookup_map_header(
1159 const MapHeaderLock &l,
1160 const ghobject_t &oid)
1161 {
1162 ceph_assert(l.get_locked() == oid);
1163
1164 _Header *header = new _Header();
1165 {
1166 std::lock_guard l{cache_lock};
1167 if (caches.lookup(oid, header)) {
1168 ceph_assert(!in_use.count(header->seq));
1169 in_use.insert(header->seq);
1170 return Header(header, RemoveOnDelete(this));
1171 }
1172 }
1173
1174 bufferlist out;
1175 int r = db->get(HOBJECT_TO_SEQ, map_header_key(oid), &out);
1176 if (r < 0 || out.length()==0) {
1177 delete header;
1178 return Header();
1179 }
1180
1181 Header ret(header, RemoveOnDelete(this));
1182 auto iter = out.cbegin();
1183 ret->decode(iter);
1184 {
1185 std::lock_guard l{cache_lock};
1186 caches.add(oid, *ret);
1187 }
1188
1189 ceph_assert(!in_use.count(header->seq));
1190 in_use.insert(header->seq);
1191 return ret;
1192 }
1193
1194 DBObjectMap::Header DBObjectMap::_generate_new_header(const ghobject_t &oid,
1195 Header parent)
1196 {
1197 Header header = Header(new _Header(), RemoveOnDelete(this));
1198 header->seq = state.seq++;
1199 if (parent) {
1200 header->parent = parent->seq;
1201 header->spos = parent->spos;
1202 }
1203 header->num_children = 1;
1204 header->oid = oid;
1205 ceph_assert(!in_use.count(header->seq));
1206 in_use.insert(header->seq);
1207
1208 write_state();
1209 return header;
1210 }
1211
1212 DBObjectMap::Header DBObjectMap::lookup_parent(Header input)
1213 {
1214 std::unique_lock l{header_lock};
1215 header_cond.wait(l, [&input, this] { return !in_use.count(input->parent); });
1216 map<string, bufferlist> out;
1217 set<string> keys;
1218 keys.insert(HEADER_KEY);
1219
1220 dout(20) << "lookup_parent: parent " << input->parent
1221 << " for seq " << input->seq << dendl;
1222 int r = db->get(sys_parent_prefix(input), keys, &out);
1223 if (r < 0) {
1224 ceph_abort();
1225 return Header();
1226 }
1227 if (out.empty()) {
1228 ceph_abort();
1229 return Header();
1230 }
1231
1232 Header header = Header(new _Header(), RemoveOnDelete(this));
1233 auto iter = out.begin()->second.cbegin();
1234 header->decode(iter);
1235 ceph_assert(header->seq == input->parent);
1236 dout(20) << "lookup_parent: parent seq is " << header->seq << " with parent "
1237 << header->parent << dendl;
1238 in_use.insert(header->seq);
1239 return header;
1240 }
1241
1242 DBObjectMap::Header DBObjectMap::lookup_create_map_header(
1243 const MapHeaderLock &hl,
1244 const ghobject_t &oid,
1245 KeyValueDB::Transaction t)
1246 {
1247 std::lock_guard l{header_lock};
1248 Header header = _lookup_map_header(hl, oid);
1249 if (!header) {
1250 header = _generate_new_header(oid, Header());
1251 set_map_header(hl, oid, *header, t);
1252 }
1253 return header;
1254 }
1255
1256 void DBObjectMap::clear_header(Header header, KeyValueDB::Transaction t)
1257 {
1258 dout(20) << "clear_header: clearing seq " << header->seq << dendl;
1259 t->rmkeys_by_prefix(user_prefix(header));
1260 t->rmkeys_by_prefix(sys_prefix(header));
1261 if (state.legacy)
1262 t->rmkeys_by_prefix(complete_prefix(header)); // Needed when header.parent != 0
1263 t->rmkeys_by_prefix(xattr_prefix(header));
1264 set<string> keys;
1265 keys.insert(header_key(header->seq));
1266 t->rmkeys(USER_PREFIX, keys);
1267 }
1268
1269 void DBObjectMap::set_header(Header header, KeyValueDB::Transaction t)
1270 {
1271 dout(20) << "set_header: setting seq " << header->seq << dendl;
1272 map<string, bufferlist> to_write;
1273 header->encode(to_write[HEADER_KEY]);
1274 t->set(sys_prefix(header), to_write);
1275 }
1276
1277 void DBObjectMap::remove_map_header(
1278 const MapHeaderLock &l,
1279 const ghobject_t &oid,
1280 Header header,
1281 KeyValueDB::Transaction t)
1282 {
1283 ceph_assert(l.get_locked() == oid);
1284 dout(20) << "remove_map_header: removing " << header->seq
1285 << " oid " << oid << dendl;
1286 set<string> to_remove;
1287 to_remove.insert(map_header_key(oid));
1288 t->rmkeys(HOBJECT_TO_SEQ, to_remove);
1289 {
1290 std::lock_guard l{cache_lock};
1291 caches.clear(oid);
1292 }
1293 }
1294
1295 void DBObjectMap::set_map_header(
1296 const MapHeaderLock &l,
1297 const ghobject_t &oid, _Header header,
1298 KeyValueDB::Transaction t)
1299 {
1300 ceph_assert(l.get_locked() == oid);
1301 dout(20) << "set_map_header: setting " << header.seq
1302 << " oid " << oid << " parent seq "
1303 << header.parent << dendl;
1304 map<string, bufferlist> to_set;
1305 header.encode(to_set[map_header_key(oid)]);
1306 t->set(HOBJECT_TO_SEQ, to_set);
1307 {
1308 std::lock_guard l{cache_lock};
1309 caches.add(oid, header);
1310 }
1311 }
1312
1313 bool DBObjectMap::check_spos(const ghobject_t &oid,
1314 Header header,
1315 const SequencerPosition *spos)
1316 {
1317 if (!spos || *spos > header->spos) {
1318 stringstream out;
1319 if (spos)
1320 dout(10) << "oid: " << oid << " not skipping op, *spos "
1321 << *spos << dendl;
1322 else
1323 dout(10) << "oid: " << oid << " not skipping op, *spos "
1324 << "empty" << dendl;
1325 dout(10) << " > header.spos " << header->spos << dendl;
1326 return false;
1327 } else {
1328 dout(10) << "oid: " << oid << " skipping op, *spos " << *spos
1329 << " <= header.spos " << header->spos << dendl;
1330 return true;
1331 }
1332 }
1333
1334 int DBObjectMap::list_objects(vector<ghobject_t> *out)
1335 {
1336 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1337 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1338 bufferlist bl = iter->value();
1339 auto bliter = bl.cbegin();
1340 _Header header;
1341 header.decode(bliter);
1342 out->push_back(header.oid);
1343 }
1344 return 0;
1345 }
1346
1347 int DBObjectMap::list_object_headers(vector<_Header> *out)
1348 {
1349 int error = 0;
1350 KeyValueDB::Iterator iter = db->get_iterator(HOBJECT_TO_SEQ);
1351 for (iter->seek_to_first(); iter->valid(); iter->next()) {
1352 bufferlist bl = iter->value();
1353 auto bliter = bl.cbegin();
1354 _Header header;
1355 header.decode(bliter);
1356 out->push_back(header);
1357 while (header.parent) {
1358 set<string> to_get;
1359 map<string, bufferlist> got;
1360 to_get.insert(HEADER_KEY);
1361 db->get(sys_parent_prefix(header), to_get, &got);
1362 if (got.empty()) {
1363 dout(0) << "Missing: seq " << header.parent << dendl;
1364 error = -ENOENT;
1365 break;
1366 } else {
1367 bl = got.begin()->second;
1368 auto bliter = bl.cbegin();
1369 header.decode(bliter);
1370 out->push_back(header);
1371 }
1372 }
1373 }
1374 return error;
1375 }
1376
1377 ostream& operator<<(ostream& out, const DBObjectMap::_Header& h)
1378 {
1379 out << "seq=" << h.seq << " parent=" << h.parent
1380 << " num_children=" << h.num_children
1381 << " ghobject=" << h.oid;
1382 return out;
1383 }
1384
1385 int DBObjectMap::rename(const ghobject_t &from,
1386 const ghobject_t &to,
1387 const SequencerPosition *spos)
1388 {
1389 if (from == to)
1390 return 0;
1391
1392 MapHeaderLock _l1(this, std::min(from, to));
1393 MapHeaderLock _l2(this, std::max(from, to));
1394 MapHeaderLock *lsource, *ltarget;
1395 if (from > to) {
1396 lsource = &_l2;
1397 ltarget= &_l1;
1398 } else {
1399 lsource = &_l1;
1400 ltarget= &_l2;
1401 }
1402
1403 KeyValueDB::Transaction t = db->get_transaction();
1404 {
1405 Header destination = lookup_map_header(*ltarget, to);
1406 if (destination) {
1407 if (check_spos(to, destination, spos))
1408 return 0;
1409 destination->num_children--;
1410 remove_map_header(*ltarget, to, destination, t);
1411 _clear(destination, t);
1412 }
1413 }
1414
1415 Header hdr = lookup_map_header(*lsource, from);
1416 if (!hdr)
1417 return db->submit_transaction(t);
1418
1419 remove_map_header(*lsource, from, hdr, t);
1420 hdr->oid = to;
1421 set_map_header(*ltarget, to, *hdr, t);
1422
1423 return db->submit_transaction(t);
1424 }