1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
21 #if defined(__FreeBSD__)
22 #include <sys/param.h>
23 #include <sys/mount.h>
27 #include "osd/osd_types.h"
29 #include "include/compat.h"
30 #include "include/stringify.h"
31 #include "common/errno.h"
32 #include "common/safe_io.h"
33 #include "common/Formatter.h"
36 #define dout_context cct
37 #define dout_subsys ceph_subsys_kstore
43 * superblock, features
44 * refcounted extents (for efficient clone)
48 const string PREFIX_SUPER
= "S"; // field -> value
49 const string PREFIX_COLL
= "C"; // collection name -> (nothing)
50 const string PREFIX_OBJ
= "O"; // object name -> onode
51 const string PREFIX_DATA
= "D"; // nid + offset -> data
52 const string PREFIX_OMAP
= "M"; // u64 + keyname -> value
55 * object name key structure
57 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
58 * encoded u64: poolid + 2^63 (so that it sorts properly)
59 * encoded u32: hash (bit reversed)
63 * escaped string: namespace
65 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
66 * we are followed just by the key. otherwise, we are followed by
67 * the key and then the object name.
69 * escaped string: object name (unless '=' above)
72 * encoded u64: generation
76 * string encoding in the key
78 * The key string needs to lexicographically sort the same way that
79 * ghobject_t does. We do this by escaping anything <= '#' with #
80 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
83 * We use ! as a terminator for strings; this works because it is < #
84 * and will get escaped if it is present in the string.
88 static void append_escaped(const string
&in
, string
*out
)
91 for (string::const_iterator i
= in
.begin(); i
!= in
.end(); ++i
) {
93 snprintf(hexbyte
, sizeof(hexbyte
), "#%02x", (uint8_t)*i
);
95 } else if (*i
>= '~') {
96 snprintf(hexbyte
, sizeof(hexbyte
), "~%02x", (uint8_t)*i
);
105 static int decode_escaped(const char *p
, string
*out
)
107 const char *orig_p
= p
;
108 while (*p
&& *p
!= '!') {
109 if (*p
== '#' || *p
== '~') {
111 int r
= sscanf(++p
, "%2x", &hex
);
114 out
->push_back((char)hex
);
117 out
->push_back(*p
++);
123 // some things we encode in binary (as le32 or le64); print the
124 // resulting key strings nicely
125 static string
pretty_binary_string(const string
& in
)
129 out
.reserve(in
.length() * 3);
130 enum { NONE
, HEX
, STRING
} mode
= NONE
;
131 unsigned from
= 0, i
;
132 for (i
=0; i
< in
.length(); ++i
) {
133 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
134 (mode
== HEX
&& in
.length() - i
>= 4 &&
135 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
136 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
137 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
138 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
139 if (mode
== STRING
) {
140 out
.append(in
.substr(from
, i
- from
));
147 if (in
.length() - i
>= 4) {
148 // print a whole u32 at once
149 snprintf(buf
, sizeof(buf
), "%08x",
150 (uint32_t)(((unsigned char)in
[i
] << 24) |
151 ((unsigned char)in
[i
+1] << 16) |
152 ((unsigned char)in
[i
+2] << 8) |
153 ((unsigned char)in
[i
+3] << 0)));
156 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
160 if (mode
!= STRING
) {
167 if (mode
== STRING
) {
168 out
.append(in
.substr(from
, i
- from
));
174 static void _key_encode_shard(shard_id_t shard
, string
*key
)
176 // make field ordering match with ghobject_t compare operations
177 if (shard
== shard_id_t::NO_SHARD
) {
178 // otherwise ff will sort *after* 0, not before.
182 snprintf(buf
, sizeof(buf
), "%02x", (int)shard
);
186 static const char *_key_decode_shard(const char *key
, shard_id_t
*pshard
)
189 *pshard
= shard_id_t::NO_SHARD
;
192 int r
= sscanf(key
, "%x", &shard
);
195 *pshard
= shard_id_t(shard
);
200 static void get_coll_key_range(const coll_t
& cid
, int bits
,
201 string
*temp_start
, string
*temp_end
,
202 string
*start
, string
*end
)
210 if (cid
.is_pg(&pgid
)) {
211 _key_encode_shard(pgid
.shard
, start
);
213 *temp_start
= *start
;
216 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, start
);
217 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_start
);
218 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), start
);
219 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), temp_start
);
221 temp_start
->append(".");
223 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, end
);
224 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_end
);
227 hobject_t::_reverse_bits(pgid
.ps()) + (1ull << (32-bits
));
228 if (end_hash
<= 0xffffffffull
) {
229 _key_encode_u32(end_hash
, end
);
230 _key_encode_u32(end_hash
, temp_end
);
232 temp_end
->append(".");
234 _key_encode_u32(0xffffffff, end
);
235 _key_encode_u32(0xffffffff, temp_end
);
237 temp_end
->append(":");
240 _key_encode_shard(shard_id_t::NO_SHARD
, start
);
241 _key_encode_u64(-1ull + 0x8000000000000000ull
, start
);
243 _key_encode_u32(0, start
);
245 _key_encode_u32(0xffffffff, end
);
248 // no separate temp section
254 static int get_key_object(const string
& key
, ghobject_t
*oid
);
256 static void get_object_key(CephContext
* cct
, const ghobject_t
& oid
,
261 _key_encode_shard(oid
.shard_id
, key
);
262 _key_encode_u64(oid
.hobj
.pool
+ 0x8000000000000000ull
, key
);
263 _key_encode_u32(oid
.hobj
.get_bitwise_key_u32(), key
);
266 append_escaped(oid
.hobj
.nspace
, key
);
268 if (oid
.hobj
.get_key().length()) {
269 // is a key... could be < = or >.
270 // (ASCII chars < = and > sort in that order, yay)
271 if (oid
.hobj
.get_key() < oid
.hobj
.oid
.name
) {
273 append_escaped(oid
.hobj
.get_key(), key
);
274 append_escaped(oid
.hobj
.oid
.name
, key
);
275 } else if (oid
.hobj
.get_key() > oid
.hobj
.oid
.name
) {
277 append_escaped(oid
.hobj
.get_key(), key
);
278 append_escaped(oid
.hobj
.oid
.name
, key
);
282 append_escaped(oid
.hobj
.oid
.name
, key
);
287 append_escaped(oid
.hobj
.oid
.name
, key
);
290 _key_encode_u64(oid
.hobj
.snap
, key
);
291 _key_encode_u64(oid
.generation
, key
);
296 int r
= get_key_object(*key
, &t
);
298 derr
<< " r " << r
<< dendl
;
299 derr
<< "key " << pretty_binary_string(*key
) << dendl
;
300 derr
<< "oid " << oid
<< dendl
;
301 derr
<< " t " << t
<< dendl
;
302 ceph_assert(t
== oid
);
307 static int get_key_object(const string
& key
, ghobject_t
*oid
)
310 const char *p
= key
.c_str();
312 p
= _key_decode_shard(p
, &oid
->shard_id
);
315 p
= _key_decode_u64(p
, &pool
);
316 oid
->hobj
.pool
= pool
- 0x8000000000000000ull
;
319 p
= _key_decode_u32(p
, &hash
);
320 oid
->hobj
.set_bitwise_key_u32(hash
);
325 r
= decode_escaped(p
, &oid
->hobj
.nspace
);
333 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
337 } else if (*p
== '<' || *p
== '>') {
341 r
= decode_escaped(p
, &okey
);
345 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
349 oid
->hobj
.set_key(okey
);
355 p
= _key_decode_u64(p
, &oid
->hobj
.snap
.val
);
356 p
= _key_decode_u64(p
, &oid
->generation
);
358 // if we get something other than a null terminator here,
359 // something has gone wrong.
367 static void get_data_key(uint64_t nid
, uint64_t offset
, string
*out
)
369 _key_encode_u64(nid
, out
);
370 _key_encode_u64(offset
, out
);
374 static void get_omap_header(uint64_t id
, string
*out
)
376 _key_encode_u64(id
, out
);
380 // hmm, I don't think there's any need to escape the user key since we
381 // have a clean prefix.
382 static void get_omap_key(uint64_t id
, const string
& key
, string
*out
)
384 _key_encode_u64(id
, out
);
389 static void rewrite_omap_key(uint64_t id
, string old
, string
*out
)
391 _key_encode_u64(id
, out
);
392 out
->append(old
.substr(out
->length()));
395 static void decode_omap_key(const string
& key
, string
*user_key
)
397 *user_key
= key
.substr(sizeof(uint64_t) + 1);
400 static void get_omap_tail(uint64_t id
, string
*out
)
402 _key_encode_u64(id
, out
);
411 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
413 void KStore::Onode::flush()
415 std::unique_lock
<std::mutex
> l(flush_lock
);
416 dout(20) << __func__
<< " " << flush_txns
<< dendl
;
417 while (!flush_txns
.empty())
419 dout(20) << __func__
<< " done" << dendl
;
425 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
427 void KStore::OnodeHashLRU::_touch(OnodeRef o
)
429 lru_list_t::iterator p
= lru
.iterator_to(*o
);
434 void KStore::OnodeHashLRU::add(const ghobject_t
& oid
, OnodeRef o
)
436 std::lock_guard
<std::mutex
> l(lock
);
437 dout(30) << __func__
<< " " << oid
<< " " << o
<< dendl
;
438 ceph_assert(onode_map
.count(oid
) == 0);
443 KStore::OnodeRef
KStore::OnodeHashLRU::lookup(const ghobject_t
& oid
)
445 std::lock_guard
<std::mutex
> l(lock
);
446 dout(30) << __func__
<< dendl
;
447 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(oid
);
448 if (p
== onode_map
.end()) {
449 dout(30) << __func__
<< " " << oid
<< " miss" << dendl
;
452 dout(30) << __func__
<< " " << oid
<< " hit " << p
->second
<< dendl
;
457 void KStore::OnodeHashLRU::clear()
459 std::lock_guard
<std::mutex
> l(lock
);
460 dout(10) << __func__
<< dendl
;
465 void KStore::OnodeHashLRU::rename(const ghobject_t
& old_oid
,
466 const ghobject_t
& new_oid
)
468 std::lock_guard
<std::mutex
> l(lock
);
469 dout(30) << __func__
<< " " << old_oid
<< " -> " << new_oid
<< dendl
;
470 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator po
, pn
;
471 po
= onode_map
.find(old_oid
);
472 pn
= onode_map
.find(new_oid
);
474 ceph_assert(po
!= onode_map
.end());
475 if (pn
!= onode_map
.end()) {
476 lru_list_t::iterator p
= lru
.iterator_to(*pn
->second
);
480 OnodeRef o
= po
->second
;
482 // install a non-existent onode in its place
483 po
->second
.reset(new Onode(cct
, old_oid
, o
->key
));
484 lru
.push_back(*po
->second
);
487 onode_map
.insert(make_pair(new_oid
, o
));
490 get_object_key(cct
, new_oid
, &o
->key
);
493 bool KStore::OnodeHashLRU::get_next(
494 const ghobject_t
& after
,
495 pair
<ghobject_t
,OnodeRef
> *next
)
497 std::lock_guard
<std::mutex
> l(lock
);
498 dout(20) << __func__
<< " after " << after
<< dendl
;
500 if (after
== ghobject_t()) {
504 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.begin();
505 ceph_assert(p
!= onode_map
.end());
506 next
->first
= p
->first
;
507 next
->second
= p
->second
;
511 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(after
);
512 ceph_assert(p
!= onode_map
.end()); // for now
513 lru_list_t::iterator pi
= lru
.iterator_to(*p
->second
);
515 if (pi
== lru
.end()) {
518 next
->first
= pi
->oid
;
519 next
->second
= onode_map
[pi
->oid
];
523 int KStore::OnodeHashLRU::trim(int max
)
525 std::lock_guard
<std::mutex
> l(lock
);
526 dout(20) << __func__
<< " max " << max
527 << " size " << onode_map
.size() << dendl
;
529 int num
= onode_map
.size() - max
;
530 if (onode_map
.size() == 0 || num
<= 0)
531 return 0; // don't even try
533 lru_list_t::iterator p
= lru
.end();
538 int refs
= o
->nref
.load();
540 dout(20) << __func__
<< " " << o
->oid
<< " has " << refs
541 << " refs; stopping with " << num
<< " left to trim" << dendl
;
544 dout(30) << __func__
<< " trim " << o
->oid
<< dendl
;
545 if (p
!= lru
.begin()) {
549 ceph_assert(num
== 1);
551 o
->get(); // paranoia
552 onode_map
.erase(o
->oid
);
560 // =======================================================
565 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
567 KStore::Collection::Collection(KStore
*ns
, coll_t cid
)
568 : CollectionImpl(ns
->cct
, cid
),
570 osr(new OpSequencer()),
571 onode_map(store
->cct
)
575 void KStore::Collection::flush()
580 bool KStore::Collection::flush_commit(Context
*c
)
582 return osr
->flush_commit(c
);
586 KStore::OnodeRef
KStore::Collection::get_onode(
587 const ghobject_t
& oid
,
590 ceph_assert(create
? ceph_mutex_is_wlocked(lock
) : ceph_mutex_is_locked(lock
));
593 if (cid
.is_pg(&pgid
)) {
594 if (!oid
.match(cnode
.bits
, pgid
.ps())) {
595 lderr(store
->cct
) << __func__
<< " oid " << oid
<< " not part of "
596 << pgid
<< " bits " << cnode
.bits
<< dendl
;
601 OnodeRef o
= onode_map
.lookup(oid
);
606 get_object_key(store
->cct
, oid
, &key
);
608 ldout(store
->cct
, 20) << __func__
<< " oid " << oid
<< " key "
609 << pretty_binary_string(key
) << dendl
;
612 int r
= store
->db
->get(PREFIX_OBJ
, key
, &v
);
613 ldout(store
->cct
, 20) << " r " << r
<< " v.len " << v
.length() << dendl
;
615 if (v
.length() == 0) {
616 ceph_assert(r
== -ENOENT
);
621 on
= new Onode(store
->cct
, oid
, key
);
626 on
= new Onode(store
->cct
, oid
, key
);
629 decode(on
->onode
, p
);
632 onode_map
.add(oid
, o
);
638 // =======================================================
641 #define dout_prefix *_dout << "kstore(" << path << ") "
643 KStore::KStore(CephContext
*cct
, const string
& path
)
644 : ObjectStore(cct
, path
),
652 throttle_ops(cct
, "kstore_max_ops", cct
->_conf
->kstore_max_ops
),
653 throttle_bytes(cct
, "kstore_max_bytes", cct
->_conf
->kstore_max_bytes
),
655 kv_sync_thread(this),
665 ceph_assert(!mounted
);
666 ceph_assert(db
== NULL
);
667 ceph_assert(fsid_fd
< 0);
670 void KStore::_init_logger()
673 PerfCountersBuilder
b(cct
, "KStore",
674 l_kstore_first
, l_kstore_last
);
675 b
.add_time_avg(l_kstore_state_prepare_lat
, "state_prepare_lat", "Average prepare state latency");
676 b
.add_time_avg(l_kstore_state_kv_queued_lat
, "state_kv_queued_lat", "Average kv_queued state latency");
677 b
.add_time_avg(l_kstore_state_kv_done_lat
, "state_kv_done_lat", "Average kv_done state latency");
678 b
.add_time_avg(l_kstore_state_finishing_lat
, "state_finishing_lat", "Average finishing state latency");
679 b
.add_time_avg(l_kstore_state_done_lat
, "state_done_lat", "Average done state latency");
680 logger
= b
.create_perf_counters();
681 cct
->get_perfcounters_collection()->add(logger
);
684 void KStore::_shutdown_logger()
687 cct
->get_perfcounters_collection()->remove(logger
);
691 int KStore::_open_path()
693 ceph_assert(path_fd
< 0);
694 path_fd
= ::open(path
.c_str(), O_DIRECTORY
|O_CLOEXEC
);
697 derr
<< __func__
<< " unable to open " << path
<< ": " << cpp_strerror(r
)
704 void KStore::_close_path()
706 VOID_TEMP_FAILURE_RETRY(::close(path_fd
));
710 int KStore::_open_fsid(bool create
)
712 ceph_assert(fsid_fd
< 0);
716 fsid_fd
= ::openat(path_fd
, "fsid", flags
, 0644);
719 derr
<< __func__
<< " " << cpp_strerror(err
) << dendl
;
725 int KStore::_read_fsid(uuid_d
*uuid
)
728 memset(fsid_str
, 0, sizeof(fsid_str
));
729 int ret
= safe_read(fsid_fd
, fsid_str
, sizeof(fsid_str
));
731 derr
<< __func__
<< " failed: " << cpp_strerror(ret
) << dendl
;
738 if (!uuid
->parse(fsid_str
)) {
739 derr
<< __func__
<< " unparsable uuid " << fsid_str
<< dendl
;
745 int KStore::_write_fsid()
747 int r
= ::ftruncate(fsid_fd
, 0);
750 derr
<< __func__
<< " fsid truncate failed: " << cpp_strerror(r
) << dendl
;
753 string str
= stringify(fsid
) + "\n";
754 r
= safe_write(fsid_fd
, str
.c_str(), str
.length());
756 derr
<< __func__
<< " fsid write failed: " << cpp_strerror(r
) << dendl
;
759 r
= ::fsync(fsid_fd
);
762 derr
<< __func__
<< " fsid fsync failed: " << cpp_strerror(r
) << dendl
;
768 void KStore::_close_fsid()
770 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
774 int KStore::_lock_fsid()
777 memset(&l
, 0, sizeof(l
));
779 l
.l_whence
= SEEK_SET
;
782 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
785 derr
<< __func__
<< " failed to lock " << path
<< "/fsid"
786 << " (is another ceph-osd still running?)"
787 << cpp_strerror(err
) << dendl
;
793 bool KStore::test_mount_in_use()
795 // most error conditions mean the mount is not in use (e.g., because
796 // it doesn't exist). only if we fail to lock do we conclude it is
799 int r
= _open_path();
802 r
= _open_fsid(false);
807 ret
= true; // if we can't lock, it is in use
814 int KStore::_open_db(bool create
)
819 snprintf(fn
, sizeof(fn
), "%s/db", path
.c_str());
823 kv_backend
= cct
->_conf
->kstore_backend
;
825 r
= read_meta("kv_backend", &kv_backend
);
827 derr
<< __func__
<< " uanble to read 'kv_backend' meta" << dendl
;
831 dout(10) << __func__
<< " kv_backend = " << kv_backend
<< dendl
;
834 int r
= ::mkdir(fn
, 0755);
837 if (r
< 0 && r
!= -EEXIST
) {
838 derr
<< __func__
<< " failed to create " << fn
<< ": " << cpp_strerror(r
)
844 char walfn
[PATH_MAX
];
845 snprintf(walfn
, sizeof(walfn
), "%s/db.wal", path
.c_str());
846 r
= ::mkdir(walfn
, 0755);
849 if (r
< 0 && r
!= -EEXIST
) {
850 derr
<< __func__
<< " failed to create " << walfn
851 << ": " << cpp_strerror(r
)
857 db
= KeyValueDB::create(cct
, kv_backend
, fn
);
859 derr
<< __func__
<< " error creating db" << dendl
;
863 if (kv_backend
== "rocksdb")
864 options
= cct
->_conf
->kstore_rocksdb_options
;
868 r
= db
->create_and_open(err
);
872 derr
<< __func__
<< " erroring opening db: " << err
.str() << dendl
;
877 dout(1) << __func__
<< " opened " << kv_backend
878 << " path " << fn
<< " options " << options
<< dendl
;
882 void KStore::_close_db()
889 int KStore::_open_collections(int *errors
)
891 ceph_assert(coll_map
.empty());
892 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_COLL
);
893 for (it
->upper_bound(string());
897 if (cid
.parse(it
->key())) {
898 auto c
= ceph::make_ref
<Collection
>(this, cid
);
899 bufferlist bl
= it
->value();
900 auto p
= bl
.cbegin();
903 } catch (buffer::error
& e
) {
904 derr
<< __func__
<< " failed to decode cnode, key:"
905 << pretty_binary_string(it
->key()) << dendl
;
908 dout(20) << __func__
<< " opened " << cid
<< dendl
;
911 derr
<< __func__
<< " unrecognized collection " << it
->key() << dendl
;
921 dout(1) << __func__
<< " path " << path
<< dendl
;
929 r
= _open_fsid(true);
937 r
= _read_fsid(&old_fsid
);
938 if (r
< 0 || old_fsid
.is_zero()) {
939 if (fsid
.is_zero()) {
940 fsid
.generate_random();
941 dout(1) << __func__
<< " generated fsid " << fsid
<< dendl
;
943 dout(1) << __func__
<< " using provided fsid " << fsid
<< dendl
;
945 // we'll write it last.
947 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
948 derr
<< __func__
<< " on-disk fsid " << old_fsid
949 << " != provided " << fsid
<< dendl
;
954 dout(1) << __func__
<< " already created, fsid is " << fsid
<< dendl
;
962 r
= write_meta("kv_backend", cct
->_conf
->kstore_backend
);
966 r
= write_meta("type", "kstore");
970 // indicate mkfs completion/success by writing the fsid file
973 dout(10) << __func__
<< " success" << dendl
;
975 derr
<< __func__
<< " error writing fsid: " << cpp_strerror(r
) << dendl
;
988 dout(1) << __func__
<< " path " << path
<< dendl
;
990 if (cct
->_conf
->kstore_fsck_on_mount
) {
991 int rc
= fsck(cct
->_conf
->kstore_fsck_on_mount_deep
);
996 int r
= _open_path();
999 r
= _open_fsid(false);
1003 r
= _read_fsid(&fsid
);
1011 r
= _open_db(false);
1015 r
= _open_super_meta();
1019 r
= _open_collections();
1024 kv_sync_thread
.create("kstore_kv_sync");
1038 int KStore::umount()
1040 ceph_assert(mounted
);
1041 dout(1) << __func__
<< dendl
;
1044 _reap_collections();
1047 dout(20) << __func__
<< " stopping kv thread" << dendl
;
1049 dout(20) << __func__
<< " draining finisher" << dendl
;
1050 finisher
.wait_for_empty();
1051 dout(20) << __func__
<< " stopping finisher" << dendl
;
1053 dout(20) << __func__
<< " closing" << dendl
;
1062 int KStore::fsck(bool deep
)
1064 dout(1) << __func__
<< dendl
;
1066 dout(1) << __func__
<< " finish with " << errors
<< " errors" << dendl
;
1070 void KStore::_sync()
1072 dout(10) << __func__
<< dendl
;
1074 std::unique_lock
<std::mutex
> l(kv_lock
);
1075 while (!kv_committing
.empty() ||
1076 !kv_queue
.empty()) {
1077 dout(20) << " waiting for kv to commit" << dendl
;
1078 kv_sync_cond
.wait(l
);
1081 dout(10) << __func__
<< " done" << dendl
;
1084 int KStore::statfs(struct store_statfs_t
* buf0
, osd_alert_list_t
* alerts
)
1089 alerts
->clear(); // returns nothing for now
1091 if (::statfs(basedir
.c_str(), &buf
) < 0) {
1093 ceph_assert(r
!= -ENOENT
);
1097 buf0
->total
= buf
.f_blocks
* buf
.f_bsize
;
1098 buf0
->available
= buf
.f_bavail
* buf
.f_bsize
;
1103 ObjectStore::CollectionHandle
KStore::open_collection(const coll_t
& cid
)
1105 return _get_collection(cid
);
1108 ObjectStore::CollectionHandle
KStore::create_new_collection(const coll_t
& cid
)
1110 auto c
= ceph::make_ref
<Collection
>(this, cid
);
1111 std::unique_lock l
{coll_lock
};
1112 new_coll_map
[cid
] = c
;
1116 int KStore::pool_statfs(uint64_t pool_id
, struct store_statfs_t
*buf
,
1117 bool *per_pool_omap
)
1125 KStore::CollectionRef
KStore::_get_collection(coll_t cid
)
1127 std::shared_lock l
{coll_lock
};
1128 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1129 if (cp
== coll_map
.end())
1130 return CollectionRef();
1134 void KStore::_queue_reap_collection(CollectionRef
& c
)
1136 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1137 std::lock_guard
<std::mutex
> l(reap_lock
);
1138 removed_collections
.push_back(c
);
1141 void KStore::_reap_collections()
1143 list
<CollectionRef
> removed_colls
;
1144 std::lock_guard
<std::mutex
> l(reap_lock
);
1145 removed_colls
.swap(removed_collections
);
1147 for (list
<CollectionRef
>::iterator p
= removed_colls
.begin();
1148 p
!= removed_colls
.end();
1150 CollectionRef c
= *p
;
1151 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1153 pair
<ghobject_t
,OnodeRef
> next
;
1154 while (c
->onode_map
.get_next(next
.first
, &next
)) {
1155 ceph_assert(!next
.second
->exists
);
1156 if (!next
.second
->flush_txns
.empty()) {
1157 dout(10) << __func__
<< " " << c
->cid
<< " " << next
.second
->oid
1158 << " flush_txns " << next
.second
->flush_txns
<< dendl
;
1163 c
->onode_map
.clear();
1164 dout(10) << __func__
<< " " << c
->cid
<< " done" << dendl
;
1167 dout(10) << __func__
<< " all reaped" << dendl
;
1173 bool KStore::exists(CollectionHandle
& ch
, const ghobject_t
& oid
)
1175 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
1176 Collection
*c
= static_cast<Collection
*>(ch
.get());
1177 std::shared_lock l
{c
->lock
};
1178 OnodeRef o
= c
->get_onode(oid
, false);
1179 if (!o
|| !o
->exists
)
1185 CollectionHandle
& ch
,
1186 const ghobject_t
& oid
,
1190 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
1191 Collection
*c
= static_cast<Collection
*>(ch
.get());
1192 std::shared_lock l
{c
->lock
};
1193 OnodeRef o
= c
->get_onode(oid
, false);
1194 if (!o
|| !o
->exists
)
1196 st
->st_size
= o
->onode
.size
;
1197 st
->st_blksize
= 4096;
1198 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
1203 int KStore::set_collection_opts(
1204 CollectionHandle
& ch
,
1205 const pool_opts_t
& opts
)
1211 CollectionHandle
& ch
,
1212 const ghobject_t
& oid
,
1218 dout(15) << __func__
<< " " << ch
->cid
<< " " << oid
1219 << " " << offset
<< "~" << length
1222 Collection
*c
= static_cast<Collection
*>(ch
.get());
1223 std::shared_lock l
{c
->lock
};
1227 OnodeRef o
= c
->get_onode(oid
, false);
1228 if (!o
|| !o
->exists
) {
1233 if (offset
== length
&& offset
== 0)
1234 length
= o
->onode
.size
;
1236 r
= _do_read(o
, offset
, length
, bl
, false, op_flags
);
1239 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
1240 << " " << offset
<< "~" << length
1241 << " = " << r
<< dendl
;
1245 int KStore::_do_read(
1254 uint64_t stripe_size
= o
->onode
.stripe_size
;
1255 uint64_t stripe_off
;
1257 dout(20) << __func__
<< " " << offset
<< "~" << length
<< " size "
1258 << o
->onode
.size
<< " nid " << o
->onode
.nid
<< dendl
;
1261 if (offset
> o
->onode
.size
) {
1264 if (offset
+ length
> o
->onode
.size
) {
1265 length
= o
->onode
.size
- offset
;
1267 if (stripe_size
== 0) {
1268 bl
.append_zero(length
);
1275 stripe_off
= offset
% stripe_size
;
1276 while (length
> 0) {
1278 _do_read_stripe(o
, offset
- stripe_off
, &stripe
, do_cache
);
1279 dout(30) << __func__
<< " stripe " << offset
- stripe_off
<< " got "
1280 << stripe
.length() << dendl
;
1281 unsigned swant
= std::min
<unsigned>(stripe_size
- stripe_off
, length
);
1282 if (stripe
.length()) {
1283 if (swant
== stripe
.length()) {
1284 bl
.claim_append(stripe
);
1285 dout(30) << __func__
<< " taking full stripe" << dendl
;
1288 if (stripe_off
< stripe
.length()) {
1289 l
= std::min
<uint64_t>(stripe
.length() - stripe_off
, swant
);
1291 t
.substr_of(stripe
, stripe_off
, l
);
1293 dout(30) << __func__
<< " taking " << stripe_off
<< "~" << l
<< dendl
;
1296 bl
.append_zero(swant
- l
);
1297 dout(30) << __func__
<< " adding " << swant
- l
<< " zeros" << dendl
;
1301 dout(30) << __func__
<< " generating " << swant
<< " zeros" << dendl
;
1302 bl
.append_zero(swant
);
1309 dout(30) << " result:\n";
1318 CollectionHandle
& ch
,
1319 const ghobject_t
& oid
,
1324 map
<uint64_t, uint64_t> m
;
1325 int r
= fiemap(ch
, oid
, offset
, len
, m
);
1333 CollectionHandle
& ch
,
1334 const ghobject_t
& oid
,
1337 map
<uint64_t, uint64_t>& destmap
)
1339 CollectionRef c
= static_cast<Collection
*>(ch
.get());
1342 std::shared_lock l
{c
->lock
};
1344 OnodeRef o
= c
->get_onode(oid
, false);
1345 if (!o
|| !o
->exists
) {
1349 if (offset
> o
->onode
.size
)
1352 if (offset
+ len
> o
->onode
.size
) {
1353 len
= o
->onode
.size
- offset
;
1356 dout(20) << __func__
<< " " << offset
<< "~" << len
<< " size "
1357 << o
->onode
.size
<< dendl
;
1359 // FIXME: do something smarter here
1360 destmap
[0] = o
->onode
.size
;
1363 dout(20) << __func__
<< " " << offset
<< "~" << len
1364 << " size = 0 (" << destmap
<< ")" << dendl
;
1368 int KStore::getattr(
1369 CollectionHandle
& ch
,
1370 const ghobject_t
& oid
,
1374 dout(15) << __func__
<< " " << ch
->cid
<< " " << oid
<< " " << name
<< dendl
;
1375 Collection
*c
= static_cast<Collection
*>(ch
.get());
1376 std::shared_lock l
{c
->lock
};
1380 OnodeRef o
= c
->get_onode(oid
, false);
1381 if (!o
|| !o
->exists
) {
1386 if (!o
->onode
.attrs
.count(k
)) {
1390 value
= o
->onode
.attrs
[k
];
1393 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< " " << name
1394 << " = " << r
<< dendl
;
1398 int KStore::getattrs(
1399 CollectionHandle
& ch
,
1400 const ghobject_t
& oid
,
1401 map
<string
,bufferptr
>& aset
)
1403 dout(15) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
1404 Collection
*c
= static_cast<Collection
*>(ch
.get());
1405 std::shared_lock l
{c
->lock
};
1408 OnodeRef o
= c
->get_onode(oid
, false);
1409 if (!o
|| !o
->exists
) {
1413 aset
= o
->onode
.attrs
;
1416 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
1417 << " = " << r
<< dendl
;
1421 int KStore::list_collections(vector
<coll_t
>& ls
)
1423 std::shared_lock l
{coll_lock
};
1424 for (ceph::unordered_map
<coll_t
, CollectionRef
>::iterator p
= coll_map
.begin();
1425 p
!= coll_map
.end();
1427 ls
.push_back(p
->first
);
1431 bool KStore::collection_exists(const coll_t
& c
)
1433 std::shared_lock l
{coll_lock
};
1434 return coll_map
.count(c
);
1437 int KStore::collection_empty(CollectionHandle
& ch
, bool *empty
)
1439 dout(15) << __func__
<< " " << ch
->cid
<< dendl
;
1440 vector
<ghobject_t
> ls
;
1442 int r
= collection_list(ch
, ghobject_t(), ghobject_t::get_max(), 1,
1445 derr
<< __func__
<< " collection_list returned: " << cpp_strerror(r
)
1449 *empty
= ls
.empty();
1450 dout(10) << __func__
<< " " << ch
->cid
<< " = " << (int)(*empty
) << dendl
;
1454 int KStore::collection_bits(CollectionHandle
& ch
)
1456 dout(15) << __func__
<< " " << ch
->cid
<< dendl
;
1457 Collection
*c
= static_cast<Collection
*>(ch
.get());
1458 std::shared_lock l
{c
->lock
};
1459 dout(10) << __func__
<< " " << ch
->cid
<< " = " << c
->cnode
.bits
<< dendl
;
1460 return c
->cnode
.bits
;
1463 int KStore::collection_list(
1464 CollectionHandle
&c_
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1465 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1468 Collection
*c
= static_cast<Collection
*>(c_
.get());
1470 dout(15) << __func__
<< " " << c
->cid
1471 << " start " << start
<< " end " << end
<< " max " << max
<< dendl
;
1474 std::shared_lock l
{c
->lock
};
1475 r
= _collection_list(c
, start
, end
, max
, ls
, pnext
);
1478 dout(10) << __func__
<< " " << c
->cid
1479 << " start " << start
<< " end " << end
<< " max " << max
1480 << " = " << r
<< ", ls.size() = " << ls
->size()
1481 << ", next = " << (pnext
? *pnext
: ghobject_t()) << dendl
;
1485 int KStore::_collection_list(
1486 Collection
* c
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1487 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1490 KeyValueDB::Iterator it
;
1491 string temp_start_key
, temp_end_key
;
1492 string start_key
, end_key
;
1493 bool set_next
= false;
1497 ghobject_t static_next
;
1499 pnext
= &static_next
;
1501 if (start
== ghobject_t::get_max() ||
1502 start
.hobj
.is_max()) {
1505 get_coll_key_range(c
->cid
, c
->cnode
.bits
, &temp_start_key
, &temp_end_key
,
1506 &start_key
, &end_key
);
1507 dout(20) << __func__
1508 << " range " << pretty_binary_string(temp_start_key
)
1509 << " to " << pretty_binary_string(temp_end_key
)
1510 << " and " << pretty_binary_string(start_key
)
1511 << " to " << pretty_binary_string(end_key
)
1512 << " start " << start
<< dendl
;
1513 it
= db
->get_iterator(PREFIX_OBJ
);
1514 if (start
== ghobject_t() || start
== c
->cid
.get_min_hobj()) {
1515 it
->upper_bound(temp_start_key
);
1519 get_object_key(cct
, start
, &k
);
1520 if (start
.hobj
.is_temp()) {
1522 ceph_assert(k
>= temp_start_key
&& k
< temp_end_key
);
1525 ceph_assert(k
>= start_key
&& k
< end_key
);
1527 dout(20) << " start from " << pretty_binary_string(k
)
1528 << " temp=" << (int)temp
<< dendl
;
1531 if (end
.hobj
.is_max()) {
1532 pend
= temp
? temp_end_key
: end_key
;
1534 get_object_key(cct
, end
, &end_key
);
1535 if (end
.hobj
.is_temp()) {
1541 pend
= temp
? temp_end_key
: end_key
;
1544 dout(20) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1546 if (!it
->valid() || it
->key() >= pend
) {
1548 dout(20) << __func__
<< " iterator not valid (end of db?)" << dendl
;
1550 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key())
1551 << " > " << end
<< dendl
;
1553 if (end
.hobj
.is_temp()) {
1556 dout(30) << __func__
<< " switch to non-temp namespace" << dendl
;
1558 it
->upper_bound(start_key
);
1560 dout(30) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1565 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key()) << dendl
;
1567 int r
= get_key_object(it
->key(), &oid
);
1568 ceph_assert(r
== 0);
1569 if (ls
->size() >= (unsigned)max
) {
1570 dout(20) << __func__
<< " reached max " << max
<< dendl
;
1580 *pnext
= ghobject_t::get_max();
1587 KStore::OmapIteratorImpl::OmapIteratorImpl(
1588 CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
)
1589 : c(c
), o(o
), it(it
)
1591 std::shared_lock l
{c
->lock
};
1592 if (o
->onode
.omap_head
) {
1593 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1594 get_omap_tail(o
->onode
.omap_head
, &tail
);
1595 it
->lower_bound(head
);
1599 int KStore::OmapIteratorImpl::seek_to_first()
1601 std::shared_lock l
{c
->lock
};
1602 if (o
->onode
.omap_head
) {
1603 it
->lower_bound(head
);
1605 it
= KeyValueDB::Iterator();
1610 int KStore::OmapIteratorImpl::upper_bound(const string
& after
)
1612 std::shared_lock l
{c
->lock
};
1613 if (o
->onode
.omap_head
) {
1615 get_omap_key(o
->onode
.omap_head
, after
, &key
);
1616 it
->upper_bound(key
);
1618 it
= KeyValueDB::Iterator();
1623 int KStore::OmapIteratorImpl::lower_bound(const string
& to
)
1625 std::shared_lock l
{c
->lock
};
1626 if (o
->onode
.omap_head
) {
1628 get_omap_key(o
->onode
.omap_head
, to
, &key
);
1629 it
->lower_bound(key
);
1631 it
= KeyValueDB::Iterator();
1636 bool KStore::OmapIteratorImpl::valid()
1638 std::shared_lock l
{c
->lock
};
1639 if (o
->onode
.omap_head
&& it
->valid() && it
->raw_key().second
<= tail
) {
1646 int KStore::OmapIteratorImpl::next()
1648 std::shared_lock l
{c
->lock
};
1649 if (o
->onode
.omap_head
) {
1657 string
KStore::OmapIteratorImpl::key()
1659 std::shared_lock l
{c
->lock
};
1660 ceph_assert(it
->valid());
1661 string db_key
= it
->raw_key().second
;
1663 decode_omap_key(db_key
, &user_key
);
1667 bufferlist
KStore::OmapIteratorImpl::value()
1669 std::shared_lock l
{c
->lock
};
1670 ceph_assert(it
->valid());
1674 int KStore::omap_get(
1675 CollectionHandle
& ch
, ///< [in] Collection containing oid
1676 const ghobject_t
&oid
, ///< [in] Object containing omap
1677 bufferlist
*header
, ///< [out] omap header
1678 map
<string
, bufferlist
> *out
/// < [out] Key to value map
1681 dout(15) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< dendl
;
1682 Collection
*c
= static_cast<Collection
*>(ch
.get());
1683 std::shared_lock l
{c
->lock
};
1685 OnodeRef o
= c
->get_onode(oid
, false);
1686 if (!o
|| !o
->exists
) {
1690 if (!o
->onode
.omap_head
)
1694 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1696 get_omap_header(o
->onode
.omap_head
, &head
);
1697 get_omap_tail(o
->onode
.omap_head
, &tail
);
1698 it
->lower_bound(head
);
1699 while (it
->valid()) {
1700 if (it
->key() == head
) {
1701 dout(30) << __func__
<< " got header" << dendl
;
1702 *header
= it
->value();
1703 } else if (it
->key() >= tail
) {
1704 dout(30) << __func__
<< " reached tail" << dendl
;
1708 decode_omap_key(it
->key(), &user_key
);
1709 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1710 << " -> " << user_key
<< dendl
;
1711 ceph_assert(it
->key() < tail
);
1712 (*out
)[user_key
] = it
->value();
1718 dout(10) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1722 int KStore::omap_get_header(
1723 CollectionHandle
& ch
, ///< [in] Collection containing oid
1724 const ghobject_t
&oid
, ///< [in] Object containing omap
1725 bufferlist
*header
, ///< [out] omap header
1726 bool allow_eio
///< [in] don't assert on eio
1729 dout(15) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< dendl
;
1730 Collection
*c
= static_cast<Collection
*>(ch
.get());
1731 std::shared_lock l
{c
->lock
};
1733 OnodeRef o
= c
->get_onode(oid
, false);
1734 if (!o
|| !o
->exists
) {
1738 if (!o
->onode
.omap_head
)
1743 get_omap_header(o
->onode
.omap_head
, &head
);
1744 if (db
->get(PREFIX_OMAP
, head
, header
) >= 0) {
1745 dout(30) << __func__
<< " got header" << dendl
;
1747 dout(30) << __func__
<< " no header" << dendl
;
1751 dout(10) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1755 int KStore::omap_get_keys(
1756 CollectionHandle
& ch
, ///< [in] Collection containing oid
1757 const ghobject_t
&oid
, ///< [in] Object containing omap
1758 set
<string
> *keys
///< [out] Keys defined on oid
1761 dout(15) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< dendl
;
1762 Collection
*c
= static_cast<Collection
*>(ch
.get());
1763 std::shared_lock l
{c
->lock
};
1765 OnodeRef o
= c
->get_onode(oid
, false);
1766 if (!o
|| !o
->exists
) {
1770 if (!o
->onode
.omap_head
)
1774 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1776 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1777 get_omap_tail(o
->onode
.omap_head
, &tail
);
1778 it
->lower_bound(head
);
1779 while (it
->valid()) {
1780 if (it
->key() >= tail
) {
1781 dout(30) << __func__
<< " reached tail" << dendl
;
1785 decode_omap_key(it
->key(), &user_key
);
1786 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1787 << " -> " << user_key
<< dendl
;
1788 ceph_assert(it
->key() < tail
);
1789 keys
->insert(user_key
);
1794 dout(10) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1798 int KStore::omap_get_values(
1799 CollectionHandle
& ch
, ///< [in] Collection containing oid
1800 const ghobject_t
&oid
, ///< [in] Object containing omap
1801 const set
<string
> &keys
, ///< [in] Keys to get
1802 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
1805 dout(15) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< dendl
;
1806 Collection
*c
= static_cast<Collection
*>(ch
.get());
1807 std::shared_lock l
{c
->lock
};
1809 OnodeRef o
= c
->get_onode(oid
, false);
1810 if (!o
|| !o
->exists
) {
1814 if (!o
->onode
.omap_head
)
1817 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1819 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1821 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1822 dout(30) << __func__
<< " got " << pretty_binary_string(key
)
1823 << " -> " << *p
<< dendl
;
1824 out
->insert(make_pair(*p
, val
));
1828 dout(10) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1832 int KStore::omap_check_keys(
1833 CollectionHandle
& ch
, ///< [in] Collection containing oid
1834 const ghobject_t
&oid
, ///< [in] Object containing omap
1835 const set
<string
> &keys
, ///< [in] Keys to check
1836 set
<string
> *out
///< [out] Subset of keys defined on oid
1839 dout(15) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< dendl
;
1840 Collection
*c
= static_cast<Collection
*>(ch
.get());
1841 std::shared_lock l
{c
->lock
};
1843 OnodeRef o
= c
->get_onode(oid
, false);
1844 if (!o
|| !o
->exists
) {
1848 if (!o
->onode
.omap_head
)
1851 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1853 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1855 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1856 dout(30) << __func__
<< " have " << pretty_binary_string(key
)
1857 << " -> " << *p
<< dendl
;
1860 dout(30) << __func__
<< " miss " << pretty_binary_string(key
)
1861 << " -> " << *p
<< dendl
;
1865 dout(10) << __func__
<< " " << ch
->cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1869 ObjectMap::ObjectMapIterator
KStore::get_omap_iterator(
1870 CollectionHandle
& ch
, ///< [in] collection
1871 const ghobject_t
&oid
///< [in] object
1875 dout(10) << __func__
<< " " << ch
->cid
<< " " << oid
<< dendl
;
1876 Collection
*c
= static_cast<Collection
*>(ch
.get());
1877 std::shared_lock l
{c
->lock
};
1878 OnodeRef o
= c
->get_onode(oid
, false);
1879 if (!o
|| !o
->exists
) {
1880 dout(10) << __func__
<< " " << oid
<< "doesn't exist" <<dendl
;
1881 return ObjectMap::ObjectMapIterator();
1884 dout(10) << __func__
<< " header = " << o
->onode
.omap_head
<<dendl
;
1885 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1886 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
, it
));
1890 // -----------------
1893 int KStore::_open_super_meta()
1899 db
->get(PREFIX_SUPER
, "nid_max", &bl
);
1900 auto p
= bl
.cbegin();
1903 } catch (buffer::error
& e
) {
1905 dout(10) << __func__
<< " old nid_max " << nid_max
<< dendl
;
1911 void KStore::_assign_nid(TransContext
*txc
, OnodeRef o
)
1915 std::lock_guard
<std::mutex
> l(nid_lock
);
1916 o
->onode
.nid
= ++nid_last
;
1917 dout(20) << __func__
<< " " << o
->oid
<< " nid " << o
->onode
.nid
<< dendl
;
1918 if (nid_last
> nid_max
) {
1919 nid_max
+= cct
->_conf
->kstore_nid_prealloc
;
1921 encode(nid_max
, bl
);
1922 txc
->t
->set(PREFIX_SUPER
, "nid_max", bl
);
1923 dout(10) << __func__
<< " nid_max now " << nid_max
<< dendl
;
1927 KStore::TransContext
*KStore::_txc_create(OpSequencer
*osr
)
1929 TransContext
*txc
= new TransContext(osr
);
1930 txc
->t
= db
->get_transaction();
1931 osr
->queue_new(txc
);
1932 dout(20) << __func__
<< " osr " << osr
<< " = " << txc
<< dendl
;
1936 void KStore::_txc_state_proc(TransContext
*txc
)
1939 dout(10) << __func__
<< " txc " << txc
1940 << " " << txc
->get_state_name() << dendl
;
1941 switch (txc
->state
) {
1942 case TransContext::STATE_PREPARE
:
1943 txc
->log_state_latency(logger
, l_kstore_state_prepare_lat
);
1944 txc
->state
= TransContext::STATE_KV_QUEUED
;
1945 if (!cct
->_conf
->kstore_sync_transaction
) {
1946 std::lock_guard
<std::mutex
> l(kv_lock
);
1947 if (cct
->_conf
->kstore_sync_submit_transaction
) {
1948 int r
= db
->submit_transaction(txc
->t
);
1949 ceph_assert(r
== 0);
1951 kv_queue
.push_back(txc
);
1952 kv_cond
.notify_one();
1956 int r
= db
->submit_transaction_sync(txc
->t
);
1957 ceph_assert(r
== 0);
1961 case TransContext::STATE_KV_QUEUED
:
1962 txc
->log_state_latency(logger
, l_kstore_state_kv_queued_lat
);
1963 txc
->state
= TransContext::STATE_KV_DONE
;
1964 _txc_finish_kv(txc
);
1967 case TransContext::STATE_KV_DONE
:
1968 txc
->log_state_latency(logger
, l_kstore_state_kv_done_lat
);
1969 txc
->state
= TransContext::STATE_FINISHING
;
1972 case TransContext::TransContext::STATE_FINISHING
:
1973 txc
->log_state_latency(logger
, l_kstore_state_finishing_lat
);
1978 derr
<< __func__
<< " unexpected txc " << txc
1979 << " state " << txc
->get_state_name() << dendl
;
1980 ceph_abort_msg("unexpected txc state");
1986 void KStore::_txc_finalize(OpSequencer
*osr
, TransContext
*txc
)
1988 dout(20) << __func__
<< " osr " << osr
<< " txc " << txc
1989 << " onodes " << txc
->onodes
<< dendl
;
1992 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
1993 p
!= txc
->onodes
.end();
1996 encode((*p
)->onode
, bl
);
1997 dout(20) << " onode size is " << bl
.length() << dendl
;
1998 txc
->t
->set(PREFIX_OBJ
, (*p
)->key
, bl
);
2000 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
2001 (*p
)->flush_txns
.insert(txc
);
2005 void KStore::_txc_finish_kv(TransContext
*txc
)
2007 dout(20) << __func__
<< " txc " << txc
<< dendl
;
2009 // warning: we're calling onreadable_sync inside the sequencer lock
2010 if (txc
->onreadable_sync
) {
2011 txc
->onreadable_sync
->complete(0);
2012 txc
->onreadable_sync
= NULL
;
2014 if (txc
->onreadable
) {
2015 finisher
.queue(txc
->onreadable
);
2016 txc
->onreadable
= NULL
;
2018 if (txc
->oncommit
) {
2019 finisher
.queue(txc
->oncommit
);
2020 txc
->oncommit
= NULL
;
2022 if (!txc
->oncommits
.empty()) {
2023 finisher
.queue(txc
->oncommits
);
2026 throttle_ops
.put(txc
->ops
);
2027 throttle_bytes
.put(txc
->bytes
);
2030 void KStore::_txc_finish(TransContext
*txc
)
2032 dout(20) << __func__
<< " " << txc
<< " onodes " << txc
->onodes
<< dendl
;
2033 ceph_assert(txc
->state
== TransContext::STATE_FINISHING
);
2035 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
2036 p
!= txc
->onodes
.end();
2038 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
2039 dout(20) << __func__
<< " onode " << *p
<< " had " << (*p
)->flush_txns
2041 ceph_assert((*p
)->flush_txns
.count(txc
));
2042 (*p
)->flush_txns
.erase(txc
);
2043 if ((*p
)->flush_txns
.empty()) {
2044 (*p
)->flush_cond
.notify_all();
2045 (*p
)->clear_pending_stripes();
2050 txc
->onodes
.clear();
2052 while (!txc
->removed_collections
.empty()) {
2053 _queue_reap_collection(txc
->removed_collections
.front());
2054 txc
->removed_collections
.pop_front();
2057 OpSequencerRef osr
= txc
->osr
;
2059 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2060 txc
->state
= TransContext::STATE_DONE
;
2063 _osr_reap_done(osr
.get());
2066 void KStore::_osr_reap_done(OpSequencer
*osr
)
2068 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2069 dout(20) << __func__
<< " osr " << osr
<< dendl
;
2070 while (!osr
->q
.empty()) {
2071 TransContext
*txc
= &osr
->q
.front();
2072 dout(20) << __func__
<< " txc " << txc
<< " " << txc
->get_state_name()
2074 if (txc
->state
!= TransContext::STATE_DONE
) {
2078 if (txc
->first_collection
) {
2079 txc
->first_collection
->onode_map
.trim(cct
->_conf
->kstore_onode_map_size
);
2083 txc
->log_state_latency(logger
, l_kstore_state_done_lat
);
2085 osr
->qcond
.notify_all();
2087 dout(20) << __func__
<< " osr " << osr
<< " q now empty" << dendl
;
2091 void KStore::_kv_sync_thread()
2093 dout(10) << __func__
<< " start" << dendl
;
2094 std::unique_lock
<std::mutex
> l(kv_lock
);
2096 ceph_assert(kv_committing
.empty());
2097 if (kv_queue
.empty()) {
2100 dout(20) << __func__
<< " sleep" << dendl
;
2101 kv_sync_cond
.notify_all();
2103 dout(20) << __func__
<< " wake" << dendl
;
2105 dout(20) << __func__
<< " committing " << kv_queue
.size() << dendl
;
2106 kv_committing
.swap(kv_queue
);
2107 utime_t start
= ceph_clock_now();
2110 dout(30) << __func__
<< " committing txc " << kv_committing
<< dendl
;
2112 // one transaction to force a sync
2113 KeyValueDB::Transaction t
= db
->get_transaction();
2114 if (!cct
->_conf
->kstore_sync_submit_transaction
) {
2115 for (std::deque
<TransContext
*>::iterator it
= kv_committing
.begin();
2116 it
!= kv_committing
.end();
2118 int r
= db
->submit_transaction((*it
)->t
);
2119 ceph_assert(r
== 0);
2122 int r
= db
->submit_transaction_sync(t
);
2123 ceph_assert(r
== 0);
2124 utime_t finish
= ceph_clock_now();
2125 utime_t dur
= finish
- start
;
2126 dout(20) << __func__
<< " committed " << kv_committing
.size()
2127 << " in " << dur
<< dendl
;
2128 while (!kv_committing
.empty()) {
2129 TransContext
*txc
= kv_committing
.front();
2130 _txc_state_proc(txc
);
2131 kv_committing
.pop_front();
2134 // this is as good a place as any ...
2135 _reap_collections();
2140 dout(10) << __func__
<< " finish" << dendl
;
2144 // ---------------------------
2147 int KStore::queue_transactions(
2148 CollectionHandle
& ch
,
2149 vector
<Transaction
>& tls
,
2151 ThreadPool::TPHandle
*handle
)
2153 Context
*onreadable
;
2155 Context
*onreadable_sync
;
2156 ObjectStore::Transaction::collect_contexts(
2157 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
2159 // set up the sequencer
2160 Collection
*c
= static_cast<Collection
*>(ch
.get());
2161 OpSequencer
*osr
= c
->osr
.get();
2162 dout(10) << __func__
<< " ch " << ch
.get() << " " << c
->cid
<< dendl
;
2165 TransContext
*txc
= _txc_create(osr
);
2166 txc
->onreadable
= onreadable
;
2167 txc
->onreadable_sync
= onreadable_sync
;
2168 txc
->oncommit
= ondisk
;
2170 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
2171 txc
->ops
+= (*p
).get_num_ops();
2172 txc
->bytes
+= (*p
).get_num_bytes();
2173 _txc_add_transaction(txc
, &(*p
));
2176 _txc_finalize(osr
, txc
);
2178 throttle_ops
.get(txc
->ops
);
2179 throttle_bytes
.get(txc
->bytes
);
2182 _txc_state_proc(txc
);
// Translate every op of ObjectStore transaction `t` into `txc`.
//
// Visible flow: dump the transaction at debug level 30, resolve each
// referenced collection through _get_collection() (remembering the first one
// on the txc), then decode the ops one by one.  Collection-level ops are
// dispatched first.  Object-level ops take the collection lock, look up the
// onode (creating it for TOUCH/CREATE/WRITE/ZERO) and dispatch to the
// matching _xxx() helper.  An error that is not tolerated is logged together
// with a dump of the transaction and the process aborts.
//
// NOTE(review): this listing omits a number of original lines (braces,
// breaks, some declarations); check the complete source before changing
// any logic in here.
2186 void KStore::_txc_add_transaction(TransContext
*txc
, Transaction
*t
)
2188 Transaction::iterator i
= t
->begin();
2190 dout(30) << __func__
<< " transaction dump:\n";
2191 JSONFormatter
f(true);
2192 f
.open_object_section("transaction");
2198 vector
<CollectionRef
> cvec(i
.colls
.size());
2200 for (vector
<coll_t
>::iterator p
= i
.colls
.begin(); p
!= i
.colls
.end();
2202 cvec
[j
] = _get_collection(*p
);
2204 // note first collection we reference
2205 if (!j
&& !txc
->first_collection
)
2206 txc
->first_collection
= cvec
[j
];
2208 vector
<OnodeRef
> ovec(i
.objects
.size());
2210 for (int pos
= 0; i
.have_op(); ++pos
) {
2211 Transaction::Op
*op
= i
.decode_op();
2215 if (op
->op
== Transaction::OP_NOP
)
2218 // collection operations
2219 CollectionRef
&c
= cvec
[op
->cid
];
2221 case Transaction::OP_RMCOLL
:
2223 coll_t cid
= i
.get_cid(op
->cid
);
2224 r
= _remove_collection(txc
, cid
, &c
);
2230 case Transaction::OP_MKCOLL
:
2233 coll_t cid
= i
.get_cid(op
->cid
);
2234 r
= _create_collection(txc
, cid
, op
->split_bits
, &c
);
2240 case Transaction::OP_SPLIT_COLLECTION
:
2241 ceph_abort_msg("deprecated");
2244 case Transaction::OP_SPLIT_COLLECTION2
:
2246 uint32_t bits
= op
->split_bits
;
2247 uint32_t rem
= op
->split_rem
;
2248 r
= _split_collection(txc
, c
, cvec
[op
->dest_cid
], bits
, rem
);
2254 case Transaction::OP_MERGE_COLLECTION
:
2256 uint32_t bits
= op
->split_bits
;
2257 r
= _merge_collection(txc
, &c
, cvec
[op
->dest_cid
], bits
);
2263 case Transaction::OP_COLL_HINT
:
2265 uint32_t type
= op
->hint_type
;
2268 auto hiter
= hint
.cbegin();
2269 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
2272 decode(pg_num
, hiter
);
2273 decode(num_objs
, hiter
);
2274 dout(10) << __func__
<< " collection hint objects is a no-op, "
2275 << " pg_num " << pg_num
<< " num_objects " << num_objs
2279 dout(10) << __func__
<< " unknown collection hint " << type
<< dendl
;
2285 case Transaction::OP_COLL_SETATTR
:
2289 case Transaction::OP_COLL_RMATTR
:
2293 case Transaction::OP_COLL_RENAME
:
2294 ceph_abort_msg("not implemented");
// A failed collection-level op is fatal: log it, dump the transaction, abort.
2298 derr
<< " error " << cpp_strerror(r
)
2299 << " not handled on operation " << op
->op
2300 << " (op " << pos
<< ", counting from 0)" << dendl
;
2301 dout(0) << " transaction dump:\n";
2302 JSONFormatter
f(true);
2303 f
.open_object_section("transaction");
2308 ceph_abort_msg("unexpected error");
2311 // object operations
2312 std::unique_lock l
{c
->lock
};
2313 OnodeRef
&o
= ovec
[op
->oid
];
2315 // these operations implicity create the object
2316 bool create
= false;
2317 if (op
->op
== Transaction::OP_TOUCH
||
2318 op
->op
== Transaction::OP_CREATE
||
2319 op
->op
== Transaction::OP_WRITE
||
2320 op
->op
== Transaction::OP_ZERO
) {
2323 ghobject_t oid
= i
.get_oid(op
->oid
);
2324 o
= c
->get_onode(oid
, create
);
2326 if (!o
|| !o
->exists
) {
2327 dout(10) << __func__
<< " op " << op
->op
<< " got ENOENT on "
2336 case Transaction::OP_TOUCH
:
2337 case Transaction::OP_CREATE
:
2338 r
= _touch(txc
, c
, o
);
2341 case Transaction::OP_WRITE
:
2343 uint64_t off
= op
->off
;
2344 uint64_t len
= op
->len
;
2345 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2348 r
= _write(txc
, c
, o
, off
, len
, bl
, fadvise_flags
);
2352 case Transaction::OP_ZERO
:
2354 uint64_t off
= op
->off
;
2355 uint64_t len
= op
->len
;
2356 r
= _zero(txc
, c
, o
, off
, len
);
2360 case Transaction::OP_TRIMCACHE
:
2362 // deprecated, no-op
2366 case Transaction::OP_TRUNCATE
:
2368 uint64_t off
= op
->off
;
2369 r
= _truncate(txc
, c
, o
, off
);
2373 case Transaction::OP_REMOVE
:
2374 r
= _remove(txc
, c
, o
);
2377 case Transaction::OP_SETATTR
:
2379 string name
= i
.decode_string();
2382 map
<string
, bufferptr
> to_set
;
2383 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2384 r
= _setattrs(txc
, c
, o
, to_set
);
2388 case Transaction::OP_SETATTRS
:
2390 map
<string
, bufferptr
> aset
;
2391 i
.decode_attrset(aset
);
2392 r
= _setattrs(txc
, c
, o
, aset
);
2396 case Transaction::OP_RMATTR
:
2398 string name
= i
.decode_string();
2399 r
= _rmattr(txc
, c
, o
, name
);
2403 case Transaction::OP_RMATTRS
:
2405 r
= _rmattrs(txc
, c
, o
);
2409 case Transaction::OP_CLONE
:
2411 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2412 OnodeRef no
= c
->get_onode(noid
, true);
2413 r
= _clone(txc
, c
, o
, no
);
2417 case Transaction::OP_CLONERANGE
:
2418 ceph_abort_msg("deprecated");
2421 case Transaction::OP_CLONERANGE2
:
2423 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2424 OnodeRef no
= c
->get_onode(noid
, true);
2425 uint64_t srcoff
= op
->off
;
2426 uint64_t len
= op
->len
;
2427 uint64_t dstoff
= op
->dest_off
;
2428 r
= _clone_range(txc
, c
, o
, no
, srcoff
, len
, dstoff
);
2432 case Transaction::OP_COLL_ADD
:
2433 ceph_abort_msg("not implemented");
2436 case Transaction::OP_COLL_REMOVE
:
2437 ceph_abort_msg("not implemented");
2440 case Transaction::OP_COLL_MOVE
:
2441 ceph_abort_msg("deprecated");
2444 case Transaction::OP_COLL_MOVE_RENAME
:
2446 ceph_assert(op
->cid
== op
->dest_cid
);
2447 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2448 OnodeRef no
= c
->get_onode(noid
, true);
2449 r
= _rename(txc
, c
, o
, no
, noid
);
2454 case Transaction::OP_TRY_RENAME
:
2456 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2457 OnodeRef no
= c
->get_onode(noid
, true);
2458 r
= _rename(txc
, c
, o
, no
, noid
);
2465 case Transaction::OP_OMAP_CLEAR
:
2467 r
= _omap_clear(txc
, c
, o
);
2470 case Transaction::OP_OMAP_SETKEYS
:
2473 i
.decode_attrset_bl(&aset_bl
);
2474 r
= _omap_setkeys(txc
, c
, o
, aset_bl
);
2477 case Transaction::OP_OMAP_RMKEYS
:
2480 i
.decode_keyset_bl(&keys_bl
);
2481 r
= _omap_rmkeys(txc
, c
, o
, keys_bl
);
2484 case Transaction::OP_OMAP_RMKEYRANGE
:
2487 first
= i
.decode_string();
2488 last
= i
.decode_string();
2489 r
= _omap_rmkey_range(txc
, c
, o
, first
, last
);
2492 case Transaction::OP_OMAP_SETHEADER
:
2496 r
= _omap_setheader(txc
, c
, o
, bl
);
2500 case Transaction::OP_SETALLOCHINT
:
2502 uint64_t expected_object_size
= op
->expected_object_size
;
2503 uint64_t expected_write_size
= op
->expected_write_size
;
2504 uint32_t flags
= op
->alloc_hint_flags
;
2505 r
= _setallochint(txc
, c
, o
,
2506 expected_object_size
,
2507 expected_write_size
,
2513 derr
<< "bad op " << op
->op
<< dendl
;
// Error triage: -ENOENT is tolerated except for the clone-style ops and
// OP_COLL_ADD listed here; anything else falls through to the abort below.
2521 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
2522 op
->op
== Transaction::OP_CLONE
||
2523 op
->op
== Transaction::OP_CLONERANGE2
||
2524 op
->op
== Transaction::OP_COLL_ADD
))
2525 // -ENOENT is usually okay
2531 const char *msg
= "unexpected error code";
2533 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
2534 op
->op
== Transaction::OP_CLONE
||
2535 op
->op
== Transaction::OP_CLONERANGE2
))
2536 msg
= "ENOENT on clone suggests osd bug";
2539 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2540 // by partially applying transactions.
2541 msg
= "ENOSPC from key value store, misconfigured cluster";
2543 if (r
== -ENOTEMPTY
) {
2544 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
2547 dout(0) << " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
2548 << " (op " << pos
<< ", counting from 0)" << dendl
;
2549 dout(0) << msg
<< dendl
;
2550 dout(0) << " transaction dump:\n";
2551 JSONFormatter
f(true);
2552 f
.open_object_section("transaction");
2557 ceph_abort_msg("unexpected error");
2565 // -----------------
2568 int KStore::_touch(TransContext
*txc
,
2572 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2575 _assign_nid(txc
, o
);
2576 txc
->write_onode(o
);
2577 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2581 void KStore::_dump_onode(OnodeRef o
)
2583 dout(30) << __func__
<< " " << o
2584 << " nid " << o
->onode
.nid
2585 << " size " << o
->onode
.size
2586 << " expected_object_size " << o
->onode
.expected_object_size
2587 << " expected_write_size " << o
->onode
.expected_write_size
2589 for (map
<string
,bufferptr
>::iterator p
= o
->onode
.attrs
.begin();
2590 p
!= o
->onode
.attrs
.end();
2592 dout(30) << __func__
<< " attr " << p
->first
2593 << " len " << p
->second
.length() << dendl
;
2597 void KStore::_do_read_stripe(OnodeRef o
, uint64_t offset
, bufferlist
*pbl
, bool do_cache
)
2601 get_data_key(o
->onode
.nid
, offset
, &key
);
2602 db
->get(PREFIX_DATA
, key
, pbl
);
2606 map
<uint64_t,bufferlist
>::iterator p
= o
->pending_stripes
.find(offset
);
2607 if (p
== o
->pending_stripes
.end()) {
2609 get_data_key(o
->onode
.nid
, offset
, &key
);
2610 db
->get(PREFIX_DATA
, key
, pbl
);
2611 o
->pending_stripes
[offset
] = *pbl
;
2617 void KStore::_do_write_stripe(TransContext
*txc
, OnodeRef o
,
2618 uint64_t offset
, bufferlist
& bl
)
2620 o
->pending_stripes
[offset
] = bl
;
2622 get_data_key(o
->onode
.nid
, offset
, &key
);
2623 txc
->t
->set(PREFIX_DATA
, key
, bl
);
2626 void KStore::_do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2628 o
->pending_stripes
.erase(offset
);
2630 get_data_key(o
->onode
.nid
, offset
, &key
);
2631 txc
->t
->rmkey(PREFIX_DATA
, key
);
2634 int KStore::_do_write(TransContext
*txc
,
2636 uint64_t offset
, uint64_t length
,
2637 bufferlist
& orig_bl
,
2638 uint32_t fadvise_flags
)
2642 dout(20) << __func__
2643 << " " << o
->oid
<< " " << offset
<< "~" << length
2644 << " - have " << o
->onode
.size
2645 << " bytes, nid " << o
->onode
.nid
<< dendl
;
2653 uint64_t stripe_size
= o
->onode
.stripe_size
;
2655 o
->onode
.stripe_size
= cct
->_conf
->kstore_default_stripe_size
;
2656 stripe_size
= o
->onode
.stripe_size
;
2659 unsigned bl_off
= 0;
2660 while (length
> 0) {
2661 uint64_t offset_rem
= offset
% stripe_size
;
2662 uint64_t end_rem
= (offset
+ length
) % stripe_size
;
2663 if (offset_rem
== 0 && end_rem
== 0) {
2665 bl
.substr_of(orig_bl
, bl_off
, stripe_size
);
2666 dout(30) << __func__
<< " full stripe " << offset
<< dendl
;
2667 _do_write_stripe(txc
, o
, offset
, bl
);
2668 offset
+= stripe_size
;
2669 length
-= stripe_size
;
2670 bl_off
+= stripe_size
;
2673 uint64_t stripe_off
= offset
- offset_rem
;
2675 _do_read_stripe(o
, stripe_off
, &prev
, true);
2676 dout(20) << __func__
<< " read previous stripe " << stripe_off
2677 << ", got " << prev
.length() << dendl
;
2680 unsigned p
= std::min
<uint64_t>(prev
.length(), offset_rem
);
2682 dout(20) << __func__
<< " reuse leading " << p
<< " bytes" << dendl
;
2683 bl
.substr_of(prev
, 0, p
);
2685 if (p
< offset_rem
) {
2686 dout(20) << __func__
<< " add leading " << offset_rem
- p
<< " zeros" << dendl
;
2687 bl
.append_zero(offset_rem
- p
);
2690 unsigned use
= stripe_size
- offset_rem
;
2692 use
-= stripe_size
- end_rem
;
2693 dout(20) << __func__
<< " using " << use
<< " for this stripe" << dendl
;
2695 t
.substr_of(orig_bl
, bl_off
, use
);
2699 if (end_rem
< prev
.length()) {
2700 unsigned l
= prev
.length() - end_rem
;
2701 dout(20) << __func__
<< " reuse trailing " << l
<< " bytes" << dendl
;
2703 t
.substr_of(prev
, end_rem
, l
);
2707 dout(30) << " writing:\n";
2710 _do_write_stripe(txc
, o
, stripe_off
, bl
);
2715 if (offset
> o
->onode
.size
) {
2716 dout(20) << __func__
<< " extending size to " << offset
+ length
2718 o
->onode
.size
= offset
;
2724 int KStore::_write(TransContext
*txc
,
2727 uint64_t offset
, size_t length
,
2729 uint32_t fadvise_flags
)
2731 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2732 << " " << offset
<< "~" << length
2734 _assign_nid(txc
, o
);
2735 int r
= _do_write(txc
, o
, offset
, length
, bl
, fadvise_flags
);
2736 txc
->write_onode(o
);
2738 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2739 << " " << offset
<< "~" << length
2740 << " = " << r
<< dendl
;
2744 int KStore::_zero(TransContext
*txc
,
2747 uint64_t offset
, size_t length
)
2749 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2750 << " " << offset
<< "~" << length
2756 _assign_nid(txc
, o
);
2758 uint64_t stripe_size
= o
->onode
.stripe_size
;
2760 uint64_t end
= offset
+ length
;
2761 uint64_t pos
= offset
;
2762 uint64_t stripe_off
= pos
% stripe_size
;
2763 while (pos
< offset
+ length
) {
2764 if (stripe_off
|| end
- pos
< stripe_size
) {
2766 _do_read_stripe(o
, pos
- stripe_off
, &stripe
, true);
2767 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2768 << stripe
.length() << dendl
;
2770 bl
.substr_of(stripe
, 0, std::min
<uint64_t>(stripe
.length(), stripe_off
));
2771 if (end
>= pos
- stripe_off
+ stripe_size
||
2772 end
>= o
->onode
.size
) {
2773 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2774 << " to " << bl
.length() << dendl
;
2776 auto len
= end
- (pos
- stripe_off
+ bl
.length());
2777 bl
.append_zero(len
);
2778 dout(20) << __func__
<< " adding " << len
<< " of zeros" << dendl
;
2779 if (stripe
.length() > bl
.length()) {
2780 unsigned l
= stripe
.length() - bl
.length();
2782 t
.substr_of(stripe
, stripe
.length() - l
, l
);
2783 dout(20) << __func__
<< " keeping tail " << l
<< " of stripe" << dendl
;
2787 _do_write_stripe(txc
, o
, pos
- stripe_off
, bl
);
2788 pos
+= stripe_size
- stripe_off
;
2791 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2792 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2797 if (offset
+ length
> o
->onode
.size
) {
2798 o
->onode
.size
= offset
+ length
;
2799 dout(20) << __func__
<< " extending size to " << offset
+ length
2802 txc
->write_onode(o
);
2804 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2805 << " " << offset
<< "~" << length
2806 << " = " << r
<< dendl
;
2810 int KStore::_do_truncate(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2812 uint64_t stripe_size
= o
->onode
.stripe_size
;
2816 // trim down stripes
2818 uint64_t pos
= offset
;
2819 uint64_t stripe_off
= pos
% stripe_size
;
2820 while (pos
< o
->onode
.size
) {
2823 _do_read_stripe(o
, pos
- stripe_off
, &stripe
, true);
2824 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2825 << stripe
.length() << dendl
;
2827 t
.substr_of(stripe
, 0, std::min
<uint64_t>(stripe_off
, stripe
.length()));
2828 _do_write_stripe(txc
, o
, pos
- stripe_off
, t
);
2829 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2830 << " to " << t
.length() << dendl
;
2831 pos
+= stripe_size
- stripe_off
;
2834 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2835 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2840 // trim down cached tail
2841 if (o
->tail_bl
.length()) {
2842 if (offset
/ stripe_size
!= o
->onode
.size
/ stripe_size
) {
2843 dout(20) << __func__
<< " clear cached tail" << dendl
;
2849 o
->onode
.size
= offset
;
2850 dout(10) << __func__
<< " truncate size to " << offset
<< dendl
;
2852 txc
->write_onode(o
);
2856 int KStore::_truncate(TransContext
*txc
,
2861 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2864 int r
= _do_truncate(txc
, o
, offset
);
2865 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2867 << " = " << r
<< dendl
;
2871 int KStore::_do_remove(TransContext
*txc
,
2876 _do_truncate(txc
, o
, 0);
2879 if (o
->onode
.omap_head
) {
2880 _do_omap_clear(txc
, o
->onode
.omap_head
);
2883 o
->onode
= kstore_onode_t();
2884 txc
->onodes
.erase(o
);
2885 get_object_key(cct
, o
->oid
, &key
);
2886 txc
->t
->rmkey(PREFIX_OBJ
, key
);
2890 int KStore::_remove(TransContext
*txc
,
2894 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2895 int r
= _do_remove(txc
, o
);
2896 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2900 int KStore::_setattr(TransContext
*txc
,
2906 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2907 << " " << name
<< " (" << val
.length() << " bytes)"
2910 o
->onode
.attrs
[name
] = val
;
2911 txc
->write_onode(o
);
2912 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2913 << " " << name
<< " (" << val
.length() << " bytes)"
2914 << " = " << r
<< dendl
;
2918 int KStore::_setattrs(TransContext
*txc
,
2921 const map
<string
,bufferptr
>& aset
)
2923 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2924 << " " << aset
.size() << " keys"
2927 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin();
2928 p
!= aset
.end(); ++p
) {
2929 if (p
->second
.is_partial())
2930 o
->onode
.attrs
[p
->first
] = bufferptr(p
->second
.c_str(), p
->second
.length());
2932 o
->onode
.attrs
[p
->first
] = p
->second
;
2934 txc
->write_onode(o
);
2935 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2936 << " " << aset
.size() << " keys"
2937 << " = " << r
<< dendl
;
2942 int KStore::_rmattr(TransContext
*txc
,
2947 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2948 << " " << name
<< dendl
;
2950 o
->onode
.attrs
.erase(name
);
2951 txc
->write_onode(o
);
2952 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2953 << " " << name
<< " = " << r
<< dendl
;
2957 int KStore::_rmattrs(TransContext
*txc
,
2961 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2963 o
->onode
.attrs
.clear();
2964 txc
->write_onode(o
);
2965 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2969 void KStore::_do_omap_clear(TransContext
*txc
, uint64_t id
)
2971 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
2972 string prefix
, tail
;
2973 get_omap_header(id
, &prefix
);
2974 get_omap_tail(id
, &tail
);
2975 it
->lower_bound(prefix
);
2976 while (it
->valid()) {
2977 if (it
->key() >= tail
) {
2978 dout(30) << __func__
<< " stop at " << tail
<< dendl
;
2981 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
2982 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
2987 int KStore::_omap_clear(TransContext
*txc
,
2991 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2993 if (o
->onode
.omap_head
!= 0) {
2994 _do_omap_clear(txc
, o
->onode
.omap_head
);
2996 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3000 int KStore::_omap_setkeys(TransContext
*txc
,
3005 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3007 auto p
= bl
.cbegin();
3009 if (!o
->onode
.omap_head
) {
3010 o
->onode
.omap_head
= o
->onode
.nid
;
3011 txc
->write_onode(o
);
3020 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3021 dout(30) << __func__
<< " " << pretty_binary_string(final_key
)
3022 << " <- " << key
<< dendl
;
3023 txc
->t
->set(PREFIX_OMAP
, final_key
, value
);
3026 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Store the omap header blob for an object.
//
// The header is written to the transaction under PREFIX_OMAP at the key
// produced by get_omap_header() for the onode's omap_head; the value is
// the buffer bl. If the onode has no omap_head yet, it is set to the
// onode's nid and the onode is handed to txc->write_onode().
3030 int KStore::_omap_setheader(TransContext
*txc
,
3035 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
// First omap write for this object: adopt the nid as omap id.
3038 if (!o
->onode
.omap_head
) {
3039 o
->onode
.omap_head
= o
->onode
.nid
;
3040 txc
->write_onode(o
);
// The header lives at the same key that _do_omap_clear() uses as the
// lower bound of the object's omap range.
3042 get_omap_header(o
->onode
.omap_head
, &key
);
3043 txc
->t
->set(PREFIX_OMAP
, key
, bl
);
3045 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Remove individual omap keys from an object.
//
// For each user key, the db key is built with get_omap_key() from the
// onode's omap_head and queued as an rmkey on PREFIX_OMAP in txc->t.
// The keys to remove are read from the buffer bl.
3049 int KStore::_omap_rmkeys(TransContext
*txc
,
3052 const bufferlist
& bl
)
3054 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
// p is a const iterator over the input buffer bl.
// NOTE(review): presumably bl holds an encoded list of keys — confirm
// against the encoder on the caller side.
3056 auto p
= bl
.cbegin();
// omap_head == 0 means no omap id was ever assigned to this object
// (_omap_setkeys/_omap_setheader assign it on first write).
3059 if (!o
->onode
.omap_head
) {
3068 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3069 dout(30) << __func__
<< " rm " << pretty_binary_string(final_key
)
3070 << " <- " << key
<< dendl
;
3071 txc
->t
->rmkey(PREFIX_OMAP
, final_key
);
3076 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Remove a range of omap keys from an object.
//
// `first` and `last` are user keys; both are translated to db keys with
// get_omap_key() using the onode's omap_head. The PREFIX_OMAP iterator is
// positioned at key_first with lower_bound(), and visited keys are queued
// as rmkey operations in txc->t. A key >= key_last triggers the "stop at"
// log, so `first` is inclusive and `last` acts as the upper limit.
3080 int KStore::_omap_rmkey_range(TransContext
*txc
,
3083 const string
& first
, const string
& last
)
3085 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3086 KeyValueDB::Iterator it
;
3087 string key_first
, key_last
;
// omap_head == 0 means no omap id was ever assigned to this object.
3090 if (!o
->onode
.omap_head
) {
3093 it
= db
->get_iterator(PREFIX_OMAP
);
// Both bounds share the same omap_head prefix, so comparing db keys
// compares the user keys within this object's omap.
3094 get_omap_key(o
->onode
.omap_head
, first
, &key_first
);
3095 get_omap_key(o
->onode
.omap_head
, last
, &key_last
);
3096 it
->lower_bound(key_first
);
3097 while (it
->valid()) {
3098 if (it
->key() >= key_last
) {
3099 dout(30) << __func__
<< " stop at " << pretty_binary_string(key_last
)
3103 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
3104 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
3110 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Record allocation hints on an object's onode.
//
// Copies expected_object_size, expected_write_size and flags into the
// onode fields expected_object_size, expected_write_size and
// alloc_hint_flags, then hands the onode to txc->write_onode().
3114 int KStore::_setallochint(TransContext
*txc
,
3117 uint64_t expected_object_size
,
3118 uint64_t expected_write_size
,
3121 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
3122 << " object_size " << expected_object_size
3123 << " write_size " << expected_write_size
3124 << " flags " << flags
3127 o
->onode
.expected_object_size
= expected_object_size
;
3128 o
->onode
.expected_write_size
= expected_write_size
;
3129 o
->onode
.alloc_hint_flags
= flags
;
3131 txc
->write_onode(o
);
// NOTE(review): unlike the entry log above, this exit log does not
// include the flags value.
3132 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
3133 << " object_size " << expected_object_size
3134 << " write_size " << expected_write_size
3135 << " = " << r
<< dendl
;
// Clone object oldo into newo (data, attrs and omap).
//
// Visible steps, in order:
//  1. Reject (derr) a source/destination pair whose hobject hashes differ.
//  2. Mark newo as existing and give it a nid via _assign_nid().
//  3. Read oldo's full data range [0, oldo->onode.size) into bl with
//     _do_read(), truncate newo to 0 with _do_truncate(), then write bl
//     to newo at offset 0 with _do_write().
//  4. Copy the attr map from oldo's onode to newo's onode.
//  5. Clear any omap data newo already has, then copy oldo's omap range
//     (header key up to tail key) re-keyed for newo's omap_head.
//  6. Hand newo's onode to txc->write_onode().
3139 int KStore::_clone(TransContext
*txc
,
3144 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3145 << newo
->oid
<< dendl
;
// Source and destination must share the same hash.
3147 if (oldo
->oid
.hobj
.get_hash() != newo
->oid
.hobj
.get_hash()) {
3148 derr
<< __func__
<< " mismatched hash on " << oldo
->oid
3149 << " and " << newo
->oid
<< dendl
;
3154 newo
->exists
= true;
3155 _assign_nid(txc
, newo
);
// NOTE(review): the whole object (oldo->onode.size bytes) is read into a
// single buffer before being written back — confirm this is acceptable
// for large objects.
3160 r
= _do_read(oldo
, 0, oldo
->onode
.size
, bl
, true, 0);
3164 // truncate any old data
3165 r
= _do_truncate(txc
, newo
, 0);
3169 r
= _do_write(txc
, newo
, 0, oldo
->onode
.size
, bl
, 0);
// attrs are copied wholesale; any attrs newo had before are replaced.
3173 newo
->onode
.attrs
= oldo
->onode
.attrs
;
// Stale omap entries of the destination are removed before the copy.
3176 if (newo
->onode
.omap_head
) {
3177 dout(20) << __func__
<< " clearing old omap data" << dendl
;
3178 _do_omap_clear(txc
, newo
->onode
.omap_head
);
3180 if (oldo
->onode
.omap_head
) {
3181 dout(20) << __func__
<< " copying omap data" << dendl
;
// Destination gets its own omap id (its nid) if it has none yet.
3182 if (!newo
->onode
.omap_head
) {
3183 newo
->onode
.omap_head
= newo
->onode
.nid
;
3185 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
// head/tail delimit oldo's omap range; the header key itself is part of
// the iteration, hence the "header/data" log below.
3187 get_omap_header(oldo
->onode
.omap_head
, &head
);
3188 get_omap_tail(oldo
->onode
.omap_head
, &tail
);
3189 it
->lower_bound(head
);
3190 while (it
->valid()) {
3192 if (it
->key() >= tail
) {
3193 dout(30) << __func__
<< " reached tail" << dendl
;
3196 dout(30) << __func__
<< " got header/data "
3197 << pretty_binary_string(it
->key()) << dendl
;
3198 ceph_assert(it
->key() < tail
);
// Same entry, re-keyed under the destination's omap id.
3199 rewrite_omap_key(newo
->onode
.omap_head
, it
->key(), &key
);
3200 txc
->t
->set(PREFIX_OMAP
, key
, it
->value());
3206 txc
->write_onode(newo
);
3210 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3211 << newo
->oid
<< " = " << r
<< dendl
;
// Copy a byte range from oldo into newo.
//
// Reads `length` bytes starting at `srcoff` from oldo into bl with
// _do_read(), then writes bl to newo at `dstoff` with _do_write(). newo is
// marked as existing and given a nid via _assign_nid() first, and its
// onode is handed to txc->write_onode() afterwards. Only object data is
// handled here; attrs and omap are not touched by the visible code.
3215 int KStore::_clone_range(TransContext
*txc
,
3219 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
)
3221 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3222 << newo
->oid
<< " from " << srcoff
<< "~" << length
3223 << " to offset " << dstoff
<< dendl
;
3227 newo
->exists
= true;
3228 _assign_nid(txc
, newo
);
3230 r
= _do_read(oldo
, srcoff
, length
, bl
, true, 0);
// The write length is what the read actually produced (bl.length()),
// not the requested `length`.
3234 r
= _do_write(txc
, newo
, dstoff
, bl
.length(), bl
, 0);
3238 txc
->write_onode(newo
);
3243 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3244 << newo
->oid
<< " from " << srcoff
<< "~" << length
3245 << " to offset " << dstoff
3246 << " = " << r
<< dendl
;
// Rename object oldo to new_oid within collection c.
//
// If a destination onode (newo) is present and exists, it is removed first
// with _do_remove(). The old PREFIX_OBJ key is then queued for deletion,
// oldo is handed to txc->write_onode(), and the collection's onode_map is
// told to rename old_oid -> new_oid.
3250 int KStore::_rename(TransContext
*txc
,
3254 const ghobject_t
& new_oid
)
3256 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3257 << new_oid
<< dendl
;
// Keep a copy of the old id: onode_map.rename() below changes oldo->oid,
// and the final log line still needs the original name.
3259 ghobject_t old_oid
= oldo
->oid
;
3261 string old_key
, new_key
;
3263 if (newo
&& newo
->exists
) {
3264 // destination object already exists, remove it first
3265 r
= _do_remove(txc
, newo
);
// Delete the entry stored under the old object key.
3270 txc
->t
->rmkey(PREFIX_OBJ
, oldo
->key
);
3271 txc
->write_onode(oldo
);
3272 c
->onode_map
.rename(old_oid
, new_oid
); // this adjusts oldo->{oid,key}
3276 dout(10) << __func__
<< " " << c
->cid
<< " " << old_oid
<< " -> "
3277 << new_oid
<< " = " << r
<< dendl
;
// Create collection `cid` with the given number of split bits.
//
// Runs under coll_lock. The collection must already be present in
// new_coll_map (asserted) and *c must refer to a collection with the same
// cid (asserted). The bits value is stored in the collection's cnode, the
// new_coll_map entry is erased, and the encoded cnode is written to the
// transaction under PREFIX_COLL keyed by stringify(cid).
3283 int KStore::_create_collection(
3289 dout(15) << __func__
<< " " << cid
<< " bits " << bits
<< dendl
;
3294 std::unique_lock l
{coll_lock
};
// A collection that was never staged in new_coll_map is a caller bug.
3299 auto p
= new_coll_map
.find(cid
);
3300 ceph_assert(p
!= new_coll_map
.end());
3302 ceph_assert((*c
)->cid
== cid
);
3303 (*c
)->cnode
.bits
= bits
;
// The staging entry is no longer needed once the collection is created.
3305 new_coll_map
.erase(p
);
// Record the collection's cnode in the key/value store.
3307 encode((*c
)->cnode
, bl
);
3308 txc
->t
->set(PREFIX_COLL
, stringify(cid
), bl
);
3312 dout(10) << __func__
<< " " << cid
<< " bits " << bits
<< " = " << r
<< dendl
;
// Remove collection `cid`, provided it holds no existing objects.
//
// Runs under coll_lock. The emptiness check has two stages:
//  1. Walk the collection's in-memory onode_map; onodes whose `exists`
//     flag is clear are counted in nonexistent_count.
//  2. List up to nonexistent_count + 1 objects from the db with
//     _collection_list() and look each one up in onode_map. An object
//     counts as existing when it has no cached onode or its cached onode
//     has `exists` set.
// When the collection is removed, it is erased from coll_map, appended to
// txc->removed_collections, and its PREFIX_COLL key is queued for
// deletion. A non-empty collection is reported with the "is non-empty"
// log message.
3316 int KStore::_remove_collection(TransContext
*txc
, coll_t cid
,
3319 dout(15) << __func__
<< " " << cid
<< dendl
;
3323 std::unique_lock l
{coll_lock
};
3328 size_t nonexistent_count
= 0;
// next_onode.first serves as the cursor for get_next(); each call fills
// next_onode with the following (oid, onode) entry.
3329 pair
<ghobject_t
,OnodeRef
> next_onode
;
3330 while ((*c
)->onode_map
.get_next(next_onode
.first
, &next_onode
)) {
3331 if (next_onode
.second
->exists
) {
3335 ++nonexistent_count
;
3337 vector
<ghobject_t
> ls
;
3339 // Enumerate onodes in db, up to nonexistent_count + 1
3340 // then check if all of them are marked as non-existent.
3341 // Bypass the check if returned number is greater than nonexistent_count
3342 r
= _collection_list(c
->get(), ghobject_t(), ghobject_t::get_max(),
3343 nonexistent_count
+ 1, &ls
, &next
);
// NOTE(review): the "bypass" shortcut described above is commented out
// below, so every listed object is checked individually — confirm this
// is intentional.
3345 bool exists
= false; //ls.size() > nonexistent_count;
// The loop stops at the first object found to exist.
3346 for (auto it
= ls
.begin(); !exists
&& it
< ls
.end(); ++it
) {
3347 dout(10) << __func__
<< " oid " << *it
<< dendl
;
3348 auto onode
= (*c
)->onode_map
.lookup(*it
);
// Not cached at all, or cached and flagged as existing.
3349 exists
= !onode
|| onode
->exists
;
3351 dout(10) << __func__
<< " " << *it
3352 << " exists in db" << dendl
;
3356 coll_map
.erase(cid
);
3357 txc
->removed_collections
.push_back(*c
);
3359 txc
->t
->rmkey(PREFIX_COLL
, stringify(cid
));
3362 dout(10) << __func__
<< " " << cid
3363 << " is non-empty" << dendl
;
3370 dout(10) << __func__
<< " " << cid
<< " = " << r
<< dendl
;
// Split collection c, with d as the destination collection.
//
// With both collections locked, the cached onodes of c and d are dropped,
// c's cnode.bits is set to `bits` (d is asserted to already have the same
// value), and c's encoded cnode is written to the transaction under
// PREFIX_COLL keyed by stringify(c->cid). The `rem` parameter is not used
// by any of the visible statements.
3374 int KStore::_split_collection(TransContext
*txc
,
3377 unsigned bits
, int rem
)
3379 dout(15) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3380 << " bits " << bits
<< dendl
;
// NOTE(review): the two locks are taken one after the other here, while
// _merge_collection acquires both with a single std::scoped_lock —
// confirm the lock ordering is safe.
3382 std::unique_lock l
{c
->lock
};
3383 std::unique_lock l2
{d
->lock
};
// Drop the cached onodes of both collections.
3384 c
->onode_map
.clear();
3385 d
->onode_map
.clear();
3386 c
->cnode
.bits
= bits
;
3387 ceph_assert(d
->cnode
.bits
== bits
);
// Only c's cnode is rewritten; d's is not touched by the visible code.
3391 encode(c
->cnode
, bl
);
3392 txc
->t
->set(PREFIX_COLL
, stringify(c
->cid
), bl
);
3394 dout(10) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3395 << " bits " << bits
<< " = " << r
<< dendl
;
// Merge collection *c into collection d and remove *c.
//
// With both collection locks held, the cached onodes of both collections
// are dropped and d's cnode.bits is set to `bits`. d's encoded cnode is
// written under PREFIX_COLL keyed by stringify(d->cid). The source
// collection is erased from coll_map, appended to
// txc->removed_collections, and its PREFIX_COLL key is queued for deletion.
3399 int KStore::_merge_collection(TransContext
*txc
,
3404 dout(15) << __func__
<< " " << (*c
)->cid
<< " to " << d
->cid
<< " "
3405 << " bits " << bits
<< dendl
;
// Both collection locks are acquired together.
3407 std::scoped_lock l
{(*c
)->lock
, d
->lock
};
3408 (*c
)->onode_map
.clear();
3409 d
->onode_map
.clear();
3410 d
->cnode
.bits
= bits
;
// Local copy of the source cid; it (not (*c)->cid) keys the rmkey and the
// final log line below.
3413 coll_t cid
= (*c
)->cid
;
3416 encode(d
->cnode
, bl
);
3417 txc
->t
->set(PREFIX_COLL
, stringify(d
->cid
), bl
);
// NOTE(review): coll_map is modified here without coll_lock being taken
// in the visible code, unlike _remove_collection — confirm the caller
// provides the necessary protection.
3419 coll_map
.erase((*c
)->cid
);
3420 txc
->removed_collections
.push_back(*c
);
3422 txc
->t
->rmkey(PREFIX_COLL
, stringify(cid
));
3424 dout(10) << __func__
<< " " << cid
<< " to " << d
->cid
<< " "
3425 << " bits " << bits
<< " = " << r
<< dendl
;
3429 // ===========================================