1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
3 #include "include/int_types.h"
4 #include "include/buffer.h"
12 #include "os/ObjectMap.h"
13 #include "kv/KeyValueDB.h"
14 #include "DBObjectMap.h"
17 #include "common/debug.h"
18 #include "common/config.h"
19 #include "include/ceph_assert.h"
21 #define dout_context cct
22 #define dout_subsys ceph_subsys_filestore
24 #define dout_prefix *_dout << "filestore "
28 using std::ostringstream
;
31 using std::stringstream
;
34 using ceph::bufferlist
;
36 const string
DBObjectMap::USER_PREFIX
= "_USER_";
37 const string
DBObjectMap::XATTR_PREFIX
= "_AXATTR_";
38 const string
DBObjectMap::SYS_PREFIX
= "_SYS_";
39 const string
DBObjectMap::COMPLETE_PREFIX
= "_COMPLETE_";
40 const string
DBObjectMap::HEADER_KEY
= "HEADER";
41 const string
DBObjectMap::USER_HEADER_KEY
= "USER_HEADER";
42 const string
DBObjectMap::GLOBAL_STATE_KEY
= "HEADER";
43 const string
DBObjectMap::HOBJECT_TO_SEQ
= "_HOBJTOSEQ_";
46 const string
DBObjectMap::LEAF_PREFIX
= "_LEAF_";
47 const string
DBObjectMap::REVERSE_LEAF_PREFIX
= "_REVLEAF_";
49 static void append_escaped(const string
&in
, string
*out
)
51 for (string::const_iterator i
= in
.begin(); i
!= in
.end(); ++i
) {
55 } else if (*i
== '.') {
58 } else if (*i
== '_') {
67 int DBObjectMap::check(std::ostream
&out
, bool repair
, bool force
)
69 int errors
= 0, comp_errors
= 0;
70 bool repaired
= false;
71 map
<uint64_t, uint64_t> parent_to_num_children
;
72 map
<uint64_t, uint64_t> parent_to_actual_num_children
;
73 KeyValueDB::Iterator iter
= db
->get_iterator(HOBJECT_TO_SEQ
);
74 for (iter
->seek_to_first(); iter
->valid(); iter
->next()) {
76 bufferlist bl
= iter
->value();
78 auto bliter
= bl
.cbegin();
79 header
.decode(bliter
);
81 parent_to_actual_num_children
[header
.seq
] = header
.num_children
;
83 if (state
.v
== 2 || force
) {
84 // Check complete table
85 bool complete_error
= false;
86 boost::optional
<string
> prev
;
87 KeyValueDB::Iterator complete_iter
= db
->get_iterator(USER_PREFIX
+ header_key(header
.seq
) + COMPLETE_PREFIX
);
88 for (complete_iter
->seek_to_first(); complete_iter
->valid();
89 complete_iter
->next()) {
90 if (prev
&& prev
>= complete_iter
->key()) {
91 out
<< "Bad complete for " << header
.oid
<< std::endl
;
92 complete_error
= true;
95 prev
= string(complete_iter
->value().c_str(), complete_iter
->value().length() - 1);
98 out
<< "Complete mapping for " << header
.seq
<< " :" << std::endl
;
99 for (complete_iter
->seek_to_first(); complete_iter
->valid();
100 complete_iter
->next()) {
101 out
<< complete_iter
->key() << " -> " << string(complete_iter
->value().c_str(), complete_iter
->value().length() - 1) << std::endl
;
105 KeyValueDB::Transaction t
= db
->get_transaction();
106 t
->rmkeys_by_prefix(USER_PREFIX
+ header_key(header
.seq
) + COMPLETE_PREFIX
);
107 db
->submit_transaction(t
);
108 out
<< "Cleared complete mapping to repair" << std::endl
;
110 errors
++; // Only count when not repaired
111 comp_errors
++; // Track errors here for version update
116 if (header
.parent
== 0)
119 if (!parent_to_num_children
.count(header
.parent
))
120 parent_to_num_children
[header
.parent
] = 0;
121 parent_to_num_children
[header
.parent
]++;
122 if (parent_to_actual_num_children
.count(header
.parent
))
126 map
<string
, bufferlist
> got
;
127 to_get
.insert(HEADER_KEY
);
128 db
->get(sys_parent_prefix(header
), to_get
, &got
);
130 out
<< "Missing: seq " << header
.parent
<< std::endl
;
134 bl
= got
.begin()->second
;
139 for (map
<uint64_t, uint64_t>::iterator i
= parent_to_num_children
.begin();
140 i
!= parent_to_num_children
.end();
141 parent_to_num_children
.erase(i
++)) {
142 if (!parent_to_actual_num_children
.count(i
->first
))
144 if (parent_to_actual_num_children
[i
->first
] != i
->second
) {
145 out
<< "Invalid: seq " << i
->first
<< " recorded children: "
146 << parent_to_actual_num_children
[i
->first
] << " found: "
147 << i
->second
<< std::endl
;
150 parent_to_actual_num_children
.erase(i
->first
);
153 // Only advance the version from 2 to 3 here
154 // Mark as legacy because there are still older structures
155 // we don't update. The value of legacy is only used
156 // for internal assertions.
157 if (comp_errors
== 0 && state
.v
== 2 && repair
) {
163 if (errors
== 0 && repaired
)
168 string
DBObjectMap::ghobject_key(const ghobject_t
&oid
)
171 append_escaped(oid
.hobj
.oid
.name
, &out
);
173 append_escaped(oid
.hobj
.get_key(), &out
);
175 append_escaped(oid
.hobj
.nspace
, &out
);
178 char snap_with_hash
[1000];
179 char *t
= snap_with_hash
;
180 char *end
= t
+ sizeof(snap_with_hash
);
181 if (oid
.hobj
.snap
== CEPH_NOSNAP
)
182 t
+= snprintf(t
, end
- t
, "head");
183 else if (oid
.hobj
.snap
== CEPH_SNAPDIR
)
184 t
+= snprintf(t
, end
- t
, "snapdir");
186 t
+= snprintf(t
, end
- t
, "%llx", (long long unsigned)oid
.hobj
.snap
);
188 if (oid
.hobj
.pool
== -1)
189 t
+= snprintf(t
, end
- t
, ".none");
191 t
+= snprintf(t
, end
- t
, ".%llx", (long long unsigned)oid
.hobj
.pool
);
192 t
+= snprintf(t
, end
- t
, ".%.*X", (int)(sizeof(uint32_t)*2), oid
.hobj
.get_hash());
194 if (oid
.generation
!= ghobject_t::NO_GEN
||
195 oid
.shard_id
!= shard_id_t::NO_SHARD
) {
196 t
+= snprintf(t
, end
- t
, ".%llx", (long long unsigned)oid
.generation
);
197 t
+= snprintf(t
, end
- t
, ".%x", (int)oid
.shard_id
);
199 out
+= string(snap_with_hash
);
203 // ok: pglog%u3%efs1...0.none.0017B237
204 // bad: plana8923501-10...4c.3.ffffffffffffffff.2
205 // fixed: plana8923501-10...4c.3.CB767F2D.ffffffffffffffff.2
206 // returns 0 for false, 1 for true, negative for error
207 int DBObjectMap::is_buggy_ghobject_key_v1(CephContext
* cct
,
210 int dots
= 5; // skip 5 .'s
211 const char *s
= in
.c_str();
213 while (*s
&& *s
!= '.')
216 derr
<< "unexpected null at " << (int)(s
-in
.c_str()) << dendl
;
220 } while (*s
&& --dots
);
222 derr
<< "unexpected null at " << (int)(s
-in
.c_str()) << dendl
;
225 // we are now either at a hash value (32 bits, 8 chars) or a generation
226 // value (64 bits) '.' and shard id. count the dots!
228 while (*s
&& *s
!= '.') {
234 derr
<< "hash value is not 8 chars" << dendl
;
235 return -EINVAL
; // the hash value is always 8 chars.
239 if (*s
!= '.') { // the shard follows.
240 derr
<< "missing final . and shard id at " << (int)(s
-in
.c_str()) << dendl
;
247 string
DBObjectMap::map_header_key(const ghobject_t
&oid
)
249 return ghobject_key(oid
);
252 string
DBObjectMap::header_key(uint64_t seq
)
255 snprintf(buf
, sizeof(buf
), "%.*" PRId64
, (int)(2*sizeof(seq
)), seq
);
259 string
DBObjectMap::complete_prefix(Header header
)
261 return USER_PREFIX
+ header_key(header
->seq
) + COMPLETE_PREFIX
;
264 string
DBObjectMap::user_prefix(Header header
)
266 return USER_PREFIX
+ header_key(header
->seq
) + USER_PREFIX
;
269 string
DBObjectMap::sys_prefix(Header header
)
271 return USER_PREFIX
+ header_key(header
->seq
) + SYS_PREFIX
;
274 string
DBObjectMap::xattr_prefix(Header header
)
276 return USER_PREFIX
+ header_key(header
->seq
) + XATTR_PREFIX
;
279 string
DBObjectMap::sys_parent_prefix(_Header header
)
281 return USER_PREFIX
+ header_key(header
.parent
) + SYS_PREFIX
;
284 int DBObjectMap::DBObjectMapIteratorImpl::init()
290 ceph_assert(!parent_iter
);
291 if (header
->parent
) {
292 Header parent
= map
->lookup_parent(header
);
297 parent_iter
= std::make_shared
<DBObjectMapIteratorImpl
>(map
, parent
);
299 key_iter
= map
->db
->get_iterator(map
->user_prefix(header
));
300 ceph_assert(key_iter
);
301 complete_iter
= map
->db
->get_iterator(map
->complete_prefix(header
));
302 ceph_assert(complete_iter
);
304 ceph_assert(cur_iter
);
309 ObjectMap::ObjectMapIterator
DBObjectMap::get_iterator(
310 const ghobject_t
&oid
)
312 MapHeaderLock
hl(this, oid
);
313 Header header
= lookup_map_header(hl
, oid
);
315 return ObjectMapIterator(new EmptyIteratorImpl());
316 DBObjectMapIterator iter
= _get_iterator(header
);
317 iter
->hlock
.swap(hl
);
321 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_first()
326 r
= parent_iter
->seek_to_first();
330 r
= key_iter
->seek_to_first();
336 int DBObjectMap::DBObjectMapIteratorImpl::seek_to_last()
341 r
= parent_iter
->seek_to_last();
344 if (parent_iter
->valid())
345 r
= parent_iter
->next();
349 r
= key_iter
->seek_to_last();
352 if (key_iter
->valid())
353 r
= key_iter
->next();
359 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound(const string
&to
)
364 r
= parent_iter
->lower_bound(to
);
368 r
= key_iter
->lower_bound(to
);
374 int DBObjectMap::DBObjectMapIteratorImpl::lower_bound_parent(const string
&to
)
376 int r
= lower_bound(to
);
379 if (valid() && !on_parent())
380 return next_parent();
385 int DBObjectMap::DBObjectMapIteratorImpl::upper_bound(const string
&after
)
390 r
= parent_iter
->upper_bound(after
);
394 r
= key_iter
->upper_bound(after
);
400 bool DBObjectMap::DBObjectMapIteratorImpl::valid()
402 bool valid
= !invalid
&& ready
;
403 ceph_assert(!valid
|| cur_iter
->valid());
407 bool DBObjectMap::DBObjectMapIteratorImpl::valid_parent()
409 if (parent_iter
&& parent_iter
->valid() &&
410 (!key_iter
->valid() || key_iter
->key() > parent_iter
->key()))
415 int DBObjectMap::DBObjectMapIteratorImpl::next()
417 ceph_assert(cur_iter
->valid());
418 ceph_assert(valid());
423 int DBObjectMap::DBObjectMapIteratorImpl::next_parent()
428 while (parent_iter
&& parent_iter
->valid() && !on_parent()) {
429 ceph_assert(valid());
430 r
= lower_bound(parent_iter
->key());
435 if (!parent_iter
|| !parent_iter
->valid()) {
441 int DBObjectMap::DBObjectMapIteratorImpl::in_complete_region(const string
&to_test
,
445 /* This is clumsy because one cannot call prev() on end(), nor can one
446 * test for == begin().
448 complete_iter
->upper_bound(to_test
);
449 if (complete_iter
->valid()) {
450 complete_iter
->prev();
451 if (!complete_iter
->valid()) {
452 complete_iter
->upper_bound(to_test
);
456 complete_iter
->seek_to_last();
457 if (!complete_iter
->valid())
461 ceph_assert(complete_iter
->key() <= to_test
);
462 ceph_assert(complete_iter
->value().length() >= 1);
463 string
_end(complete_iter
->value().c_str(),
464 complete_iter
->value().length() - 1);
465 if (_end
.empty() || _end
> to_test
) {
467 *begin
= complete_iter
->key();
472 complete_iter
->next();
473 ceph_assert(!complete_iter
->valid() || complete_iter
->key() > to_test
);
479 * Moves parent_iter to the next position both out of the complete_region and
480 * not equal to key_iter. Then, we set cur_iter to parent_iter if valid and
481 * less than key_iter and key_iter otherwise.
483 int DBObjectMap::DBObjectMapIteratorImpl::adjust()
486 while (parent_iter
&& parent_iter
->valid()) {
487 if (in_complete_region(parent_iter
->key(), &begin
, &end
)) {
488 if (end
.size() == 0) {
489 parent_iter
->seek_to_last();
490 if (parent_iter
->valid())
493 parent_iter
->lower_bound(end
);
494 } else if (key_iter
->valid() && key_iter
->key() == parent_iter
->key()) {
500 if (valid_parent()) {
501 cur_iter
= parent_iter
;
502 } else if (key_iter
->valid()) {
507 ceph_assert(invalid
|| cur_iter
->valid());
512 string
DBObjectMap::DBObjectMapIteratorImpl::key()
514 return cur_iter
->key();
517 bufferlist
DBObjectMap::DBObjectMapIteratorImpl::value()
519 return cur_iter
->value();
522 int DBObjectMap::DBObjectMapIteratorImpl::status()
527 int DBObjectMap::set_keys(const ghobject_t
&oid
,
528 const map
<string
, bufferlist
> &set
,
529 const SequencerPosition
*spos
)
531 KeyValueDB::Transaction t
= db
->get_transaction();
532 MapHeaderLock
hl(this, oid
);
533 Header header
= lookup_create_map_header(hl
, oid
, t
);
536 if (check_spos(oid
, header
, spos
))
539 t
->set(user_prefix(header
), set
);
541 return db
->submit_transaction(t
);
544 int DBObjectMap::set_header(const ghobject_t
&oid
,
545 const bufferlist
&bl
,
546 const SequencerPosition
*spos
)
548 KeyValueDB::Transaction t
= db
->get_transaction();
549 MapHeaderLock
hl(this, oid
);
550 Header header
= lookup_create_map_header(hl
, oid
, t
);
553 if (check_spos(oid
, header
, spos
))
555 _set_header(header
, bl
, t
);
556 return db
->submit_transaction(t
);
559 void DBObjectMap::_set_header(Header header
, const bufferlist
&bl
,
560 KeyValueDB::Transaction t
)
562 map
<string
, bufferlist
> to_set
;
563 to_set
[USER_HEADER_KEY
] = bl
;
564 t
->set(sys_prefix(header
), to_set
);
567 int DBObjectMap::get_header(const ghobject_t
&oid
,
570 MapHeaderLock
hl(this, oid
);
571 Header header
= lookup_map_header(hl
, oid
);
575 return _get_header(header
, bl
);
578 int DBObjectMap::_get_header(Header header
,
581 map
<string
, bufferlist
> out
;
585 to_get
.insert(USER_HEADER_KEY
);
586 int r
= db
->get(sys_prefix(header
), to_get
, &out
);
587 if (r
== 0 && !out
.empty())
591 Header
current(header
);
592 if (!current
->parent
)
594 header
= lookup_parent(current
);
598 bl
->swap(out
.begin()->second
);
602 int DBObjectMap::clear(const ghobject_t
&oid
,
603 const SequencerPosition
*spos
)
605 KeyValueDB::Transaction t
= db
->get_transaction();
606 MapHeaderLock
hl(this, oid
);
607 Header header
= lookup_map_header(hl
, oid
);
610 if (check_spos(oid
, header
, spos
))
612 remove_map_header(hl
, oid
, header
, t
);
613 ceph_assert(header
->num_children
> 0);
614 header
->num_children
--;
615 int r
= _clear(header
, t
);
618 return db
->submit_transaction(t
);
621 int DBObjectMap::_clear(Header header
,
622 KeyValueDB::Transaction t
)
625 if (header
->num_children
) {
626 set_header(header
, t
);
629 clear_header(header
, t
);
632 Header parent
= lookup_parent(header
);
636 ceph_assert(parent
->num_children
> 0);
637 parent
->num_children
--;
643 int DBObjectMap::copy_up_header(Header header
,
644 KeyValueDB::Transaction t
)
647 int r
= _get_header(header
, &bl
);
651 _set_header(header
, bl
, t
);
655 int DBObjectMap::rm_keys(const ghobject_t
&oid
,
656 const set
<string
> &to_clear
,
657 const SequencerPosition
*spos
)
659 MapHeaderLock
hl(this, oid
);
660 Header header
= lookup_map_header(hl
, oid
);
663 KeyValueDB::Transaction t
= db
->get_transaction();
664 if (check_spos(oid
, header
, spos
))
666 t
->rmkeys(user_prefix(header
), to_clear
);
667 if (!header
->parent
) {
668 return db
->submit_transaction(t
);
671 ceph_assert(state
.legacy
);
674 // We only get here for legacy (v2) stores
675 // Copy up all keys from parent excluding to_clear
677 // This eliminates a v2 format use of complete for this oid only
678 map
<string
, bufferlist
> to_write
;
679 ObjectMapIterator iter
= _get_iterator(header
);
680 for (iter
->seek_to_first() ; iter
->valid() ; iter
->next()) {
682 return iter
->status();
683 if (!to_clear
.count(iter
->key()))
684 to_write
[iter
->key()] = iter
->value();
686 t
->set(user_prefix(header
), to_write
);
687 } // destruct iter which has parent in_use
689 copy_up_header(header
, t
);
690 Header parent
= lookup_parent(header
);
693 parent
->num_children
--;
696 set_map_header(hl
, oid
, *header
, t
);
697 t
->rmkeys_by_prefix(complete_prefix(header
));
698 return db
->submit_transaction(t
);
701 int DBObjectMap::clear_keys_header(const ghobject_t
&oid
,
702 const SequencerPosition
*spos
)
704 KeyValueDB::Transaction t
= db
->get_transaction();
705 MapHeaderLock
hl(this, oid
);
706 Header header
= lookup_map_header(hl
, oid
);
709 if (check_spos(oid
, header
, spos
))
713 KeyValueDB::Iterator iter
= db
->get_iterator(xattr_prefix(header
));
716 map
<string
, bufferlist
> attrs
;
717 for (iter
->seek_to_first(); !iter
->status() && iter
->valid(); iter
->next())
718 attrs
.insert(make_pair(iter
->key(), iter
->value()));
720 return iter
->status();
722 // remove current header
723 remove_map_header(hl
, oid
, header
, t
);
724 ceph_assert(header
->num_children
> 0);
725 header
->num_children
--;
726 int r
= _clear(header
, t
);
731 Header newheader
= generate_new_header(oid
, Header());
732 set_map_header(hl
, oid
, *newheader
, t
);
734 t
->set(xattr_prefix(newheader
), attrs
);
735 return db
->submit_transaction(t
);
738 int DBObjectMap::get(const ghobject_t
&oid
,
740 map
<string
, bufferlist
> *out
)
742 MapHeaderLock
hl(this, oid
);
743 Header header
= lookup_map_header(hl
, oid
);
746 _get_header(header
, _header
);
747 ObjectMapIterator iter
= _get_iterator(header
);
748 for (iter
->seek_to_first(); iter
->valid(); iter
->next()) {
750 return iter
->status();
751 out
->insert(make_pair(iter
->key(), iter
->value()));
756 int DBObjectMap::get_keys(const ghobject_t
&oid
,
759 MapHeaderLock
hl(this, oid
);
760 Header header
= lookup_map_header(hl
, oid
);
763 ObjectMapIterator iter
= _get_iterator(header
);
764 for (iter
->seek_to_first(); iter
->valid(); iter
->next()) {
766 return iter
->status();
767 keys
->insert(iter
->key());
772 int DBObjectMap::scan(Header header
,
773 const set
<string
> &in_keys
,
774 set
<string
> *out_keys
,
775 map
<string
, bufferlist
> *out_values
)
777 ObjectMapIterator db_iter
= _get_iterator(header
);
778 for (set
<string
>::const_iterator key_iter
= in_keys
.begin();
779 key_iter
!= in_keys
.end();
781 db_iter
->lower_bound(*key_iter
);
782 if (db_iter
->status())
783 return db_iter
->status();
784 if (db_iter
->valid() && db_iter
->key() == *key_iter
) {
786 out_keys
->insert(*key_iter
);
788 out_values
->insert(make_pair(db_iter
->key(), db_iter
->value()));
794 int DBObjectMap::get_values(const ghobject_t
&oid
,
795 const set
<string
> &keys
,
796 map
<string
, bufferlist
> *out
)
798 MapHeaderLock
hl(this, oid
);
799 Header header
= lookup_map_header(hl
, oid
);
802 return scan(header
, keys
, 0, out
);
805 int DBObjectMap::check_keys(const ghobject_t
&oid
,
806 const set
<string
> &keys
,
809 MapHeaderLock
hl(this, oid
);
810 Header header
= lookup_map_header(hl
, oid
);
813 return scan(header
, keys
, out
, 0);
816 int DBObjectMap::get_xattrs(const ghobject_t
&oid
,
817 const set
<string
> &to_get
,
818 map
<string
, bufferlist
> *out
)
820 MapHeaderLock
hl(this, oid
);
821 Header header
= lookup_map_header(hl
, oid
);
824 return db
->get(xattr_prefix(header
), to_get
, out
);
827 int DBObjectMap::get_all_xattrs(const ghobject_t
&oid
,
830 MapHeaderLock
hl(this, oid
);
831 Header header
= lookup_map_header(hl
, oid
);
834 KeyValueDB::Iterator iter
= db
->get_iterator(xattr_prefix(header
));
837 for (iter
->seek_to_first(); !iter
->status() && iter
->valid(); iter
->next())
838 out
->insert(iter
->key());
839 return iter
->status();
842 int DBObjectMap::set_xattrs(const ghobject_t
&oid
,
843 const map
<string
, bufferlist
> &to_set
,
844 const SequencerPosition
*spos
)
846 KeyValueDB::Transaction t
= db
->get_transaction();
847 MapHeaderLock
hl(this, oid
);
848 Header header
= lookup_create_map_header(hl
, oid
, t
);
851 if (check_spos(oid
, header
, spos
))
853 t
->set(xattr_prefix(header
), to_set
);
854 return db
->submit_transaction(t
);
857 int DBObjectMap::remove_xattrs(const ghobject_t
&oid
,
858 const set
<string
> &to_remove
,
859 const SequencerPosition
*spos
)
861 KeyValueDB::Transaction t
= db
->get_transaction();
862 MapHeaderLock
hl(this, oid
);
863 Header header
= lookup_map_header(hl
, oid
);
866 if (check_spos(oid
, header
, spos
))
868 t
->rmkeys(xattr_prefix(header
), to_remove
);
869 return db
->submit_transaction(t
);
872 // ONLY USED FOR TESTING
873 // Set version to 2 to avoid asserts
874 int DBObjectMap::legacy_clone(const ghobject_t
&oid
,
875 const ghobject_t
&target
,
876 const SequencerPosition
*spos
)
883 MapHeaderLock
_l1(this, std::min(oid
, target
));
884 MapHeaderLock
_l2(this, std::max(oid
, target
));
885 MapHeaderLock
*lsource
, *ltarget
;
894 KeyValueDB::Transaction t
= db
->get_transaction();
896 Header destination
= lookup_map_header(*ltarget
, target
);
898 if (check_spos(target
, destination
, spos
))
900 destination
->num_children
--;
901 remove_map_header(*ltarget
, target
, destination
, t
);
902 _clear(destination
, t
);
906 Header parent
= lookup_map_header(*lsource
, oid
);
908 return db
->submit_transaction(t
);
910 Header source
= generate_new_header(oid
, parent
);
911 Header destination
= generate_new_header(target
, parent
);
913 destination
->spos
= *spos
;
915 parent
->num_children
= 2;
916 set_header(parent
, t
);
917 set_map_header(*lsource
, oid
, *source
, t
);
918 set_map_header(*ltarget
, target
, *destination
, t
);
920 map
<string
, bufferlist
> to_set
;
921 KeyValueDB::Iterator xattr_iter
= db
->get_iterator(xattr_prefix(parent
));
922 for (xattr_iter
->seek_to_first();
925 to_set
.insert(make_pair(xattr_iter
->key(), xattr_iter
->value()));
926 t
->set(xattr_prefix(source
), to_set
);
927 t
->set(xattr_prefix(destination
), to_set
);
928 t
->rmkeys_by_prefix(xattr_prefix(parent
));
929 return db
->submit_transaction(t
);
932 int DBObjectMap::clone(const ghobject_t
&oid
,
933 const ghobject_t
&target
,
934 const SequencerPosition
*spos
)
939 MapHeaderLock
_l1(this, std::min(oid
, target
));
940 MapHeaderLock
_l2(this, std::max(oid
, target
));
941 MapHeaderLock
*lsource
, *ltarget
;
950 KeyValueDB::Transaction t
= db
->get_transaction();
952 Header destination
= lookup_map_header(*ltarget
, target
);
954 if (check_spos(target
, destination
, spos
))
956 destination
->num_children
--;
957 remove_map_header(*ltarget
, target
, destination
, t
);
958 _clear(destination
, t
);
962 Header source
= lookup_map_header(*lsource
, oid
);
964 return db
->submit_transaction(t
);
966 Header destination
= generate_new_header(target
, Header());
968 destination
->spos
= *spos
;
970 set_map_header(*ltarget
, target
, *destination
, t
);
973 int r
= _get_header(source
, &bl
);
976 _set_header(destination
, bl
, t
);
978 map
<string
, bufferlist
> to_set
;
979 KeyValueDB::Iterator xattr_iter
= db
->get_iterator(xattr_prefix(source
));
980 for (xattr_iter
->seek_to_first();
983 to_set
.insert(make_pair(xattr_iter
->key(), xattr_iter
->value()));
984 t
->set(xattr_prefix(destination
), to_set
);
986 map
<string
, bufferlist
> to_write
;
987 ObjectMapIterator iter
= _get_iterator(source
);
988 for (iter
->seek_to_first() ; iter
->valid() ; iter
->next()) {
990 return iter
->status();
991 to_write
[iter
->key()] = iter
->value();
993 t
->set(user_prefix(destination
), to_write
);
995 return db
->submit_transaction(t
);
998 int DBObjectMap::upgrade_to_v2()
1000 dout(1) << __func__
<< " start" << dendl
;
1001 KeyValueDB::Iterator iter
= db
->get_iterator(HOBJECT_TO_SEQ
);
1002 iter
->seek_to_first();
1003 while (iter
->valid()) {
1005 KeyValueDB::Transaction t
= db
->get_transaction();
1007 map
<string
, bufferlist
> add
;
1009 iter
->valid() && count
< 300;
1011 dout(20) << __func__
<< " key is " << iter
->key() << dendl
;
1012 int r
= is_buggy_ghobject_key_v1(cct
, iter
->key());
1014 derr
<< __func__
<< " bad key '" << iter
->key() << "'" << dendl
;
1018 dout(20) << __func__
<< " " << iter
->key() << " ok" << dendl
;
1022 // decode header to get oid
1024 bufferlist bl
= iter
->value();
1025 auto bliter
= bl
.cbegin();
1028 string
newkey(ghobject_key(hdr
.oid
));
1029 dout(20) << __func__
<< " " << iter
->key() << " -> " << newkey
<< dendl
;
1030 add
[newkey
] = iter
->value();
1031 remove
.insert(iter
->key());
1035 if (!remove
.empty()) {
1036 dout(20) << __func__
<< " updating " << remove
.size() << " keys" << dendl
;
1037 t
->rmkeys(HOBJECT_TO_SEQ
, remove
);
1038 t
->set(HOBJECT_TO_SEQ
, add
);
1039 int r
= db
->submit_transaction(t
);
1051 void DBObjectMap::set_state()
1053 std::lock_guard l
{header_lock
};
1054 KeyValueDB::Transaction t
= db
->get_transaction();
1056 int ret
= db
->submit_transaction_sync(t
);
1057 ceph_assert(ret
== 0);
1058 dout(1) << __func__
<< " done" << dendl
;
1062 int DBObjectMap::get_state()
1064 map
<string
, bufferlist
> result
;
1066 to_get
.insert(GLOBAL_STATE_KEY
);
1067 int r
= db
->get(SYS_PREFIX
, to_get
, &result
);
1070 if (!result
.empty()) {
1071 auto bliter
= result
.begin()->second
.cbegin();
1072 state
.decode(bliter
);
1075 state
.v
= State::CUR_VERSION
;
1077 state
.legacy
= false;
1082 int DBObjectMap::init(bool do_upgrade
)
1084 int ret
= get_state();
1088 dout(1) << "DBObjectMap is *very* old; upgrade to an older version first"
1092 if (state
.v
< 2) { // Needs upgrade
1094 dout(1) << "DOBjbectMap requires an upgrade,"
1095 << " set filestore_update_to"
1099 int r
= upgrade_to_v2();
1105 int errors
= check(ss
, true);
1107 derr
<< ss
.str() << dendl
;
1111 dout(20) << "(init)dbobjectmap: seq is " << state
.seq
<< dendl
;
1115 int DBObjectMap::sync(const ghobject_t
*oid
,
1116 const SequencerPosition
*spos
) {
1117 KeyValueDB::Transaction t
= db
->get_transaction();
1120 MapHeaderLock
hl(this, *oid
);
1121 Header header
= lookup_map_header(hl
, *oid
);
1123 dout(10) << "oid: " << *oid
<< " setting spos to "
1125 header
->spos
= *spos
;
1126 set_map_header(hl
, *oid
, *header
, t
);
1128 /* It may appear that this and the identical portion of the else
1129 * block can be combined below, but in this block, the transaction
1130 * must be submitted under *both* the MapHeaderLock and the full
1133 * See 2b63dd25fc1c73fa42e52e9ea4ab5a45dd9422a0 and bug 9891.
1135 std::lock_guard l
{header_lock
};
1137 return db
->submit_transaction_sync(t
);
1139 std::lock_guard l
{header_lock
};
1141 return db
->submit_transaction_sync(t
);
1145 int DBObjectMap::write_state(KeyValueDB::Transaction _t
) {
1146 ceph_assert(ceph_mutex_is_locked_by_me(header_lock
));
1147 dout(20) << "dbobjectmap: seq is " << state
.seq
<< dendl
;
1148 KeyValueDB::Transaction t
= _t
? _t
: db
->get_transaction();
1151 map
<string
, bufferlist
> to_write
;
1152 to_write
[GLOBAL_STATE_KEY
] = bl
;
1153 t
->set(SYS_PREFIX
, to_write
);
1154 return _t
? 0 : db
->submit_transaction(t
);
1158 DBObjectMap::Header
DBObjectMap::_lookup_map_header(
1159 const MapHeaderLock
&l
,
1160 const ghobject_t
&oid
)
1162 ceph_assert(l
.get_locked() == oid
);
1164 _Header
*header
= new _Header();
1166 std::lock_guard l
{cache_lock
};
1167 if (caches
.lookup(oid
, header
)) {
1168 ceph_assert(!in_use
.count(header
->seq
));
1169 in_use
.insert(header
->seq
);
1170 return Header(header
, RemoveOnDelete(this));
1175 int r
= db
->get(HOBJECT_TO_SEQ
, map_header_key(oid
), &out
);
1176 if (r
< 0 || out
.length()==0) {
1181 Header
ret(header
, RemoveOnDelete(this));
1182 auto iter
= out
.cbegin();
1185 std::lock_guard l
{cache_lock
};
1186 caches
.add(oid
, *ret
);
1189 ceph_assert(!in_use
.count(header
->seq
));
1190 in_use
.insert(header
->seq
);
1194 DBObjectMap::Header
DBObjectMap::_generate_new_header(const ghobject_t
&oid
,
1197 Header header
= Header(new _Header(), RemoveOnDelete(this));
1198 header
->seq
= state
.seq
++;
1200 header
->parent
= parent
->seq
;
1201 header
->spos
= parent
->spos
;
1203 header
->num_children
= 1;
1205 ceph_assert(!in_use
.count(header
->seq
));
1206 in_use
.insert(header
->seq
);
1212 DBObjectMap::Header
DBObjectMap::lookup_parent(Header input
)
1214 std::unique_lock l
{header_lock
};
1215 header_cond
.wait(l
, [&input
, this] { return !in_use
.count(input
->parent
); });
1216 map
<string
, bufferlist
> out
;
1218 keys
.insert(HEADER_KEY
);
1220 dout(20) << "lookup_parent: parent " << input
->parent
1221 << " for seq " << input
->seq
<< dendl
;
1222 int r
= db
->get(sys_parent_prefix(input
), keys
, &out
);
1232 Header header
= Header(new _Header(), RemoveOnDelete(this));
1233 auto iter
= out
.begin()->second
.cbegin();
1234 header
->decode(iter
);
1235 ceph_assert(header
->seq
== input
->parent
);
1236 dout(20) << "lookup_parent: parent seq is " << header
->seq
<< " with parent "
1237 << header
->parent
<< dendl
;
1238 in_use
.insert(header
->seq
);
1242 DBObjectMap::Header
DBObjectMap::lookup_create_map_header(
1243 const MapHeaderLock
&hl
,
1244 const ghobject_t
&oid
,
1245 KeyValueDB::Transaction t
)
1247 std::lock_guard l
{header_lock
};
1248 Header header
= _lookup_map_header(hl
, oid
);
1250 header
= _generate_new_header(oid
, Header());
1251 set_map_header(hl
, oid
, *header
, t
);
1256 void DBObjectMap::clear_header(Header header
, KeyValueDB::Transaction t
)
1258 dout(20) << "clear_header: clearing seq " << header
->seq
<< dendl
;
1259 t
->rmkeys_by_prefix(user_prefix(header
));
1260 t
->rmkeys_by_prefix(sys_prefix(header
));
1262 t
->rmkeys_by_prefix(complete_prefix(header
)); // Needed when header.parent != 0
1263 t
->rmkeys_by_prefix(xattr_prefix(header
));
1265 keys
.insert(header_key(header
->seq
));
1266 t
->rmkeys(USER_PREFIX
, keys
);
1269 void DBObjectMap::set_header(Header header
, KeyValueDB::Transaction t
)
1271 dout(20) << "set_header: setting seq " << header
->seq
<< dendl
;
1272 map
<string
, bufferlist
> to_write
;
1273 header
->encode(to_write
[HEADER_KEY
]);
1274 t
->set(sys_prefix(header
), to_write
);
1277 void DBObjectMap::remove_map_header(
1278 const MapHeaderLock
&l
,
1279 const ghobject_t
&oid
,
1281 KeyValueDB::Transaction t
)
1283 ceph_assert(l
.get_locked() == oid
);
1284 dout(20) << "remove_map_header: removing " << header
->seq
1285 << " oid " << oid
<< dendl
;
1286 set
<string
> to_remove
;
1287 to_remove
.insert(map_header_key(oid
));
1288 t
->rmkeys(HOBJECT_TO_SEQ
, to_remove
);
1290 std::lock_guard l
{cache_lock
};
1295 void DBObjectMap::set_map_header(
1296 const MapHeaderLock
&l
,
1297 const ghobject_t
&oid
, _Header header
,
1298 KeyValueDB::Transaction t
)
1300 ceph_assert(l
.get_locked() == oid
);
1301 dout(20) << "set_map_header: setting " << header
.seq
1302 << " oid " << oid
<< " parent seq "
1303 << header
.parent
<< dendl
;
1304 map
<string
, bufferlist
> to_set
;
1305 header
.encode(to_set
[map_header_key(oid
)]);
1306 t
->set(HOBJECT_TO_SEQ
, to_set
);
1308 std::lock_guard l
{cache_lock
};
1309 caches
.add(oid
, header
);
1313 bool DBObjectMap::check_spos(const ghobject_t
&oid
,
1315 const SequencerPosition
*spos
)
1317 if (!spos
|| *spos
> header
->spos
) {
1320 dout(10) << "oid: " << oid
<< " not skipping op, *spos "
1323 dout(10) << "oid: " << oid
<< " not skipping op, *spos "
1324 << "empty" << dendl
;
1325 dout(10) << " > header.spos " << header
->spos
<< dendl
;
1328 dout(10) << "oid: " << oid
<< " skipping op, *spos " << *spos
1329 << " <= header.spos " << header
->spos
<< dendl
;
1334 int DBObjectMap::list_objects(vector
<ghobject_t
> *out
)
1336 KeyValueDB::Iterator iter
= db
->get_iterator(HOBJECT_TO_SEQ
);
1337 for (iter
->seek_to_first(); iter
->valid(); iter
->next()) {
1338 bufferlist bl
= iter
->value();
1339 auto bliter
= bl
.cbegin();
1341 header
.decode(bliter
);
1342 out
->push_back(header
.oid
);
1347 int DBObjectMap::list_object_headers(vector
<_Header
> *out
)
1350 KeyValueDB::Iterator iter
= db
->get_iterator(HOBJECT_TO_SEQ
);
1351 for (iter
->seek_to_first(); iter
->valid(); iter
->next()) {
1352 bufferlist bl
= iter
->value();
1353 auto bliter
= bl
.cbegin();
1355 header
.decode(bliter
);
1356 out
->push_back(header
);
1357 while (header
.parent
) {
1359 map
<string
, bufferlist
> got
;
1360 to_get
.insert(HEADER_KEY
);
1361 db
->get(sys_parent_prefix(header
), to_get
, &got
);
1363 dout(0) << "Missing: seq " << header
.parent
<< dendl
;
1367 bl
= got
.begin()->second
;
1368 auto bliter
= bl
.cbegin();
1369 header
.decode(bliter
);
1370 out
->push_back(header
);
1377 ostream
& operator<<(ostream
& out
, const DBObjectMap::_Header
& h
)
1379 out
<< "seq=" << h
.seq
<< " parent=" << h
.parent
1380 << " num_children=" << h
.num_children
1381 << " ghobject=" << h
.oid
;
1385 int DBObjectMap::rename(const ghobject_t
&from
,
1386 const ghobject_t
&to
,
1387 const SequencerPosition
*spos
)
1392 MapHeaderLock
_l1(this, std::min(from
, to
));
1393 MapHeaderLock
_l2(this, std::max(from
, to
));
1394 MapHeaderLock
*lsource
, *ltarget
;
1403 KeyValueDB::Transaction t
= db
->get_transaction();
1405 Header destination
= lookup_map_header(*ltarget
, to
);
1407 if (check_spos(to
, destination
, spos
))
1409 destination
->num_children
--;
1410 remove_map_header(*ltarget
, to
, destination
, t
);
1411 _clear(destination
, t
);
1415 Header hdr
= lookup_map_header(*lsource
, from
);
1417 return db
->submit_transaction(t
);
1419 remove_map_header(*lsource
, from
, hdr
, t
);
1421 set_map_header(*ltarget
, to
, *hdr
, t
);
1423 return db
->submit_transaction(t
);