1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
23 #include "osd/osd_types.h"
25 #include "include/compat.h"
26 #include "include/stringify.h"
27 #include "common/errno.h"
28 #include "common/safe_io.h"
29 #include "common/Formatter.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_kstore
39 * superblock, features
40 * refcounted extents (for efficient clone)
44 const string PREFIX_SUPER
= "S"; // field -> value
45 const string PREFIX_COLL
= "C"; // collection name -> (nothing)
46 const string PREFIX_OBJ
= "O"; // object name -> onode
47 const string PREFIX_DATA
= "D"; // nid + offset -> data
48 const string PREFIX_OMAP
= "M"; // u64 + keyname -> value
51 * object name key structure
53 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54 * encoded u64: poolid + 2^63 (so that it sorts properly)
55 * encoded u32: hash (bit reversed)
59 * escaped string: namespace
61 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
62 * we are followed just by the key. otherwise, we are followed by
63 * the key and then the object name.
65 * escaped string: object name (unless '=' above)
68 * encoded u64: generation
72 * string encoding in the key
74 * The key string needs to lexicographically sort the same way that
75 * ghobject_t does. We do this by escaping anything <= to '#' with #
76 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
79 * We use ! as a terminator for strings; this works because it is < #
80 * and will get escaped if it is present in the string.
84 static void append_escaped(const string
&in
, string
*out
)
87 for (string::const_iterator i
= in
.begin(); i
!= in
.end(); ++i
) {
89 snprintf(hexbyte
, sizeof(hexbyte
), "#%02x", (uint8_t)*i
);
91 } else if (*i
>= '~') {
92 snprintf(hexbyte
, sizeof(hexbyte
), "~%02x", (uint8_t)*i
);
101 static int decode_escaped(const char *p
, string
*out
)
103 const char *orig_p
= p
;
104 while (*p
&& *p
!= '!') {
105 if (*p
== '#' || *p
== '~') {
107 int r
= sscanf(++p
, "%2x", &hex
);
110 out
->push_back((char)hex
);
113 out
->push_back(*p
++);
119 // some things we encode in binary (as le32 or le64); print the
120 // resulting key strings nicely
121 static string
pretty_binary_string(const string
& in
)
125 out
.reserve(in
.length() * 3);
126 enum { NONE
, HEX
, STRING
} mode
= NONE
;
127 unsigned from
= 0, i
;
128 for (i
=0; i
< in
.length(); ++i
) {
129 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
130 (mode
== HEX
&& in
.length() - i
>= 4 &&
131 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
132 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
133 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
134 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
135 if (mode
== STRING
) {
136 out
.append(in
.substr(from
, i
- from
));
143 if (in
.length() - i
>= 4) {
144 // print a whole u32 at once
145 snprintf(buf
, sizeof(buf
), "%08x",
146 (uint32_t)(((unsigned char)in
[i
] << 24) |
147 ((unsigned char)in
[i
+1] << 16) |
148 ((unsigned char)in
[i
+2] << 8) |
149 ((unsigned char)in
[i
+3] << 0)));
152 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
156 if (mode
!= STRING
) {
163 if (mode
== STRING
) {
164 out
.append(in
.substr(from
, i
- from
));
170 static void _key_encode_shard(shard_id_t shard
, string
*key
)
172 // make field ordering match with ghobject_t compare operations
173 if (shard
== shard_id_t::NO_SHARD
) {
174 // otherwise ff will sort *after* 0, not before.
178 snprintf(buf
, sizeof(buf
), "%02x", (int)shard
);
182 static const char *_key_decode_shard(const char *key
, shard_id_t
*pshard
)
185 *pshard
= shard_id_t::NO_SHARD
;
188 int r
= sscanf(key
, "%x", &shard
);
191 *pshard
= shard_id_t(shard
);
196 static void get_coll_key_range(const coll_t
& cid
, int bits
,
197 string
*temp_start
, string
*temp_end
,
198 string
*start
, string
*end
)
206 if (cid
.is_pg(&pgid
)) {
207 _key_encode_shard(pgid
.shard
, start
);
209 *temp_start
= *start
;
212 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, start
);
213 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_start
);
214 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), start
);
215 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), temp_start
);
217 temp_start
->append(".");
219 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, end
);
220 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_end
);
223 hobject_t::_reverse_bits(pgid
.ps()) + (1ull << (32-bits
));
224 if (end_hash
<= 0xffffffffull
) {
225 _key_encode_u32(end_hash
, end
);
226 _key_encode_u32(end_hash
, temp_end
);
228 temp_end
->append(".");
230 _key_encode_u32(0xffffffff, end
);
231 _key_encode_u32(0xffffffff, temp_end
);
233 temp_end
->append(":");
236 _key_encode_shard(shard_id_t::NO_SHARD
, start
);
237 _key_encode_u64(-1ull + 0x8000000000000000ull
, start
);
239 _key_encode_u32(0, start
);
241 _key_encode_u32(0xffffffff, end
);
244 // no separate temp section
250 static int get_key_object(const string
& key
, ghobject_t
*oid
);
252 static void get_object_key(CephContext
* cct
, const ghobject_t
& oid
,
257 _key_encode_shard(oid
.shard_id
, key
);
258 _key_encode_u64(oid
.hobj
.pool
+ 0x8000000000000000ull
, key
);
259 _key_encode_u32(oid
.hobj
.get_bitwise_key_u32(), key
);
262 append_escaped(oid
.hobj
.nspace
, key
);
264 if (oid
.hobj
.get_key().length()) {
265 // is a key... could be < = or >.
266 // (ASCII chars < = and > sort in that order, yay)
267 if (oid
.hobj
.get_key() < oid
.hobj
.oid
.name
) {
269 append_escaped(oid
.hobj
.get_key(), key
);
270 append_escaped(oid
.hobj
.oid
.name
, key
);
271 } else if (oid
.hobj
.get_key() > oid
.hobj
.oid
.name
) {
273 append_escaped(oid
.hobj
.get_key(), key
);
274 append_escaped(oid
.hobj
.oid
.name
, key
);
278 append_escaped(oid
.hobj
.oid
.name
, key
);
283 append_escaped(oid
.hobj
.oid
.name
, key
);
286 _key_encode_u64(oid
.hobj
.snap
, key
);
287 _key_encode_u64(oid
.generation
, key
);
292 int r
= get_key_object(*key
, &t
);
294 derr
<< " r " << r
<< dendl
;
295 derr
<< "key " << pretty_binary_string(*key
) << dendl
;
296 derr
<< "oid " << oid
<< dendl
;
297 derr
<< " t " << t
<< dendl
;
303 static int get_key_object(const string
& key
, ghobject_t
*oid
)
306 const char *p
= key
.c_str();
308 p
= _key_decode_shard(p
, &oid
->shard_id
);
311 p
= _key_decode_u64(p
, &pool
);
312 oid
->hobj
.pool
= pool
- 0x8000000000000000ull
;
315 p
= _key_decode_u32(p
, &hash
);
316 oid
->hobj
.set_bitwise_key_u32(hash
);
321 r
= decode_escaped(p
, &oid
->hobj
.nspace
);
329 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
333 } else if (*p
== '<' || *p
== '>') {
337 r
= decode_escaped(p
, &okey
);
341 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
345 oid
->hobj
.set_key(okey
);
351 p
= _key_decode_u64(p
, &oid
->hobj
.snap
.val
);
352 p
= _key_decode_u64(p
, &oid
->generation
);
354 // if we get something other than a null terminator here,
355 // something goes wrong.
363 static void get_data_key(uint64_t nid
, uint64_t offset
, string
*out
)
365 _key_encode_u64(nid
, out
);
366 _key_encode_u64(offset
, out
);
370 static void get_omap_header(uint64_t id
, string
*out
)
372 _key_encode_u64(id
, out
);
376 // hmm, I don't think there's any need to escape the user key since we
377 // have a clean prefix.
378 static void get_omap_key(uint64_t id
, const string
& key
, string
*out
)
380 _key_encode_u64(id
, out
);
385 static void rewrite_omap_key(uint64_t id
, string old
, string
*out
)
387 _key_encode_u64(id
, out
);
388 out
->append(old
.substr(out
->length()));
391 static void decode_omap_key(const string
& key
, string
*user_key
)
393 *user_key
= key
.substr(sizeof(uint64_t) + 1);
396 static void get_omap_tail(uint64_t id
, string
*out
)
398 _key_encode_u64(id
, out
);
407 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
409 void KStore::Onode::flush()
411 std::unique_lock
<std::mutex
> l(flush_lock
);
412 dout(20) << __func__
<< " " << flush_txns
<< dendl
;
413 while (!flush_txns
.empty())
415 dout(20) << __func__
<< " done" << dendl
;
421 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
423 void KStore::OnodeHashLRU::_touch(OnodeRef o
)
425 lru_list_t::iterator p
= lru
.iterator_to(*o
);
430 void KStore::OnodeHashLRU::add(const ghobject_t
& oid
, OnodeRef o
)
432 std::lock_guard
<std::mutex
> l(lock
);
433 dout(30) << __func__
<< " " << oid
<< " " << o
<< dendl
;
434 assert(onode_map
.count(oid
) == 0);
439 KStore::OnodeRef
KStore::OnodeHashLRU::lookup(const ghobject_t
& oid
)
441 std::lock_guard
<std::mutex
> l(lock
);
442 dout(30) << __func__
<< dendl
;
443 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(oid
);
444 if (p
== onode_map
.end()) {
445 dout(30) << __func__
<< " " << oid
<< " miss" << dendl
;
448 dout(30) << __func__
<< " " << oid
<< " hit " << p
->second
<< dendl
;
453 void KStore::OnodeHashLRU::clear()
455 std::lock_guard
<std::mutex
> l(lock
);
456 dout(10) << __func__
<< dendl
;
461 void KStore::OnodeHashLRU::rename(const ghobject_t
& old_oid
,
462 const ghobject_t
& new_oid
)
464 std::lock_guard
<std::mutex
> l(lock
);
465 dout(30) << __func__
<< " " << old_oid
<< " -> " << new_oid
<< dendl
;
466 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator po
, pn
;
467 po
= onode_map
.find(old_oid
);
468 pn
= onode_map
.find(new_oid
);
470 assert(po
!= onode_map
.end());
471 if (pn
!= onode_map
.end()) {
472 lru_list_t::iterator p
= lru
.iterator_to(*pn
->second
);
476 OnodeRef o
= po
->second
;
478 // install a non-existent onode it its place
479 po
->second
.reset(new Onode(cct
, old_oid
, o
->key
));
480 lru
.push_back(*po
->second
);
483 onode_map
.insert(make_pair(new_oid
, o
));
486 get_object_key(cct
, new_oid
, &o
->key
);
489 bool KStore::OnodeHashLRU::get_next(
490 const ghobject_t
& after
,
491 pair
<ghobject_t
,OnodeRef
> *next
)
493 std::lock_guard
<std::mutex
> l(lock
);
494 dout(20) << __func__
<< " after " << after
<< dendl
;
496 if (after
== ghobject_t()) {
500 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.begin();
501 assert(p
!= onode_map
.end());
502 next
->first
= p
->first
;
503 next
->second
= p
->second
;
507 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(after
);
508 assert(p
!= onode_map
.end()); // for now
509 lru_list_t::iterator pi
= lru
.iterator_to(*p
->second
);
511 if (pi
== lru
.end()) {
514 next
->first
= pi
->oid
;
515 next
->second
= onode_map
[pi
->oid
];
519 int KStore::OnodeHashLRU::trim(int max
)
521 std::lock_guard
<std::mutex
> l(lock
);
522 dout(20) << __func__
<< " max " << max
523 << " size " << onode_map
.size() << dendl
;
525 int num
= onode_map
.size() - max
;
526 if (onode_map
.size() == 0 || num
<= 0)
527 return 0; // don't even try
529 lru_list_t::iterator p
= lru
.end();
534 int refs
= o
->nref
.load();
536 dout(20) << __func__
<< " " << o
->oid
<< " has " << refs
537 << " refs; stopping with " << num
<< " left to trim" << dendl
;
540 dout(30) << __func__
<< " trim " << o
->oid
<< dendl
;
541 if (p
!= lru
.begin()) {
547 o
->get(); // paranoia
548 onode_map
.erase(o
->oid
);
556 // =======================================================
561 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
563 KStore::Collection::Collection(KStore
*ns
, coll_t c
)
566 lock("KStore::Collection::lock", true, false),
567 onode_map(store
->cct
)
571 KStore::OnodeRef
KStore::Collection::get_onode(
572 const ghobject_t
& oid
,
575 assert(create
? lock
.is_wlocked() : lock
.is_locked());
578 if (cid
.is_pg(&pgid
)) {
579 if (!oid
.match(cnode
.bits
, pgid
.ps())) {
580 lderr(store
->cct
) << __func__
<< " oid " << oid
<< " not part of "
581 << pgid
<< " bits " << cnode
.bits
<< dendl
;
586 OnodeRef o
= onode_map
.lookup(oid
);
591 get_object_key(store
->cct
, oid
, &key
);
593 ldout(store
->cct
, 20) << __func__
<< " oid " << oid
<< " key "
594 << pretty_binary_string(key
) << dendl
;
597 int r
= store
->db
->get(PREFIX_OBJ
, key
, &v
);
598 ldout(store
->cct
, 20) << " r " << r
<< " v.len " << v
.length() << dendl
;
600 if (v
.length() == 0) {
601 assert(r
== -ENOENT
);
606 on
= new Onode(store
->cct
, oid
, key
);
611 on
= new Onode(store
->cct
, oid
, key
);
613 bufferlist::iterator p
= v
.begin();
614 ::decode(on
->onode
, p
);
617 onode_map
.add(oid
, o
);
623 // =======================================================
626 #define dout_prefix *_dout << "kstore(" << path << ") "
628 KStore::KStore(CephContext
*cct
, const string
& path
)
629 : ObjectStore(cct
, path
),
634 coll_lock("KStore::coll_lock"),
637 throttle_ops(cct
, "kstore_max_ops", cct
->_conf
->kstore_max_ops
),
638 throttle_bytes(cct
, "kstore_max_bytes", cct
->_conf
->kstore_max_bytes
),
640 kv_sync_thread(this),
655 void KStore::_init_logger()
658 PerfCountersBuilder
b(cct
, "KStore",
659 l_kstore_first
, l_kstore_last
);
660 b
.add_time_avg(l_kstore_state_prepare_lat
, "state_prepare_lat", "Average prepare state latency");
661 b
.add_time_avg(l_kstore_state_kv_queued_lat
, "state_kv_queued_lat", "Average kv_queued state latency");
662 b
.add_time_avg(l_kstore_state_kv_done_lat
, "state_kv_done_lat", "Average kv_done state latency");
663 b
.add_time_avg(l_kstore_state_finishing_lat
, "state_finishing_lat", "Average finishing state latency");
664 b
.add_time_avg(l_kstore_state_done_lat
, "state_done_lat", "Average done state latency");
665 logger
= b
.create_perf_counters();
666 cct
->get_perfcounters_collection()->add(logger
);
669 void KStore::_shutdown_logger()
672 cct
->get_perfcounters_collection()->remove(logger
);
676 int KStore::_open_path()
679 path_fd
= ::open(path
.c_str(), O_DIRECTORY
|O_CLOEXEC
);
682 derr
<< __func__
<< " unable to open " << path
<< ": " << cpp_strerror(r
)
689 void KStore::_close_path()
691 VOID_TEMP_FAILURE_RETRY(::close(path_fd
));
695 int KStore::_open_fsid(bool create
)
701 fsid_fd
= ::openat(path_fd
, "fsid", flags
, 0644);
704 derr
<< __func__
<< " " << cpp_strerror(err
) << dendl
;
710 int KStore::_read_fsid(uuid_d
*uuid
)
713 memset(fsid_str
, 0, sizeof(fsid_str
));
714 int ret
= safe_read(fsid_fd
, fsid_str
, sizeof(fsid_str
));
716 derr
<< __func__
<< " failed: " << cpp_strerror(ret
) << dendl
;
723 if (!uuid
->parse(fsid_str
)) {
724 derr
<< __func__
<< " unparsable uuid " << fsid_str
<< dendl
;
730 int KStore::_write_fsid()
732 int r
= ::ftruncate(fsid_fd
, 0);
735 derr
<< __func__
<< " fsid truncate failed: " << cpp_strerror(r
) << dendl
;
738 string str
= stringify(fsid
) + "\n";
739 r
= safe_write(fsid_fd
, str
.c_str(), str
.length());
741 derr
<< __func__
<< " fsid write failed: " << cpp_strerror(r
) << dendl
;
744 r
= ::fsync(fsid_fd
);
747 derr
<< __func__
<< " fsid fsync failed: " << cpp_strerror(r
) << dendl
;
753 void KStore::_close_fsid()
755 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
759 int KStore::_lock_fsid()
762 memset(&l
, 0, sizeof(l
));
764 l
.l_whence
= SEEK_SET
;
767 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
770 derr
<< __func__
<< " failed to lock " << path
<< "/fsid"
771 << " (is another ceph-osd still running?)"
772 << cpp_strerror(err
) << dendl
;
778 bool KStore::test_mount_in_use()
780 // most error conditions mean the mount is not in use (e.g., because
781 // it doesn't exist). only if we fail to lock do we conclude it is
784 int r
= _open_path();
787 r
= _open_fsid(false);
792 ret
= true; // if we can't lock, it is in use
799 int KStore::_open_db(bool create
)
804 snprintf(fn
, sizeof(fn
), "%s/db", path
.c_str());
808 kv_backend
= cct
->_conf
->kstore_backend
;
810 r
= read_meta("kv_backend", &kv_backend
);
812 derr
<< __func__
<< " uanble to read 'kv_backend' meta" << dendl
;
816 dout(10) << __func__
<< " kv_backend = " << kv_backend
<< dendl
;
819 int r
= ::mkdir(fn
, 0755);
822 if (r
< 0 && r
!= -EEXIST
) {
823 derr
<< __func__
<< " failed to create " << fn
<< ": " << cpp_strerror(r
)
829 char walfn
[PATH_MAX
];
830 snprintf(walfn
, sizeof(walfn
), "%s/db.wal", path
.c_str());
831 r
= ::mkdir(walfn
, 0755);
834 if (r
< 0 && r
!= -EEXIST
) {
835 derr
<< __func__
<< " failed to create " << walfn
836 << ": " << cpp_strerror(r
)
842 db
= KeyValueDB::create(cct
, kv_backend
, fn
);
844 derr
<< __func__
<< " error creating db" << dendl
;
848 if (kv_backend
== "rocksdb")
849 options
= cct
->_conf
->kstore_rocksdb_options
;
853 r
= db
->create_and_open(err
);
857 derr
<< __func__
<< " erroring opening db: " << err
.str() << dendl
;
862 dout(1) << __func__
<< " opened " << kv_backend
863 << " path " << fn
<< " options " << options
<< dendl
;
867 void KStore::_close_db()
874 int KStore::_open_collections(int *errors
)
876 assert(coll_map
.empty());
877 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_COLL
);
878 for (it
->upper_bound(string());
882 if (cid
.parse(it
->key())) {
883 CollectionRef
c(new Collection(this, cid
));
884 bufferlist bl
= it
->value();
885 bufferlist::iterator p
= bl
.begin();
887 ::decode(c
->cnode
, p
);
888 } catch (buffer::error
& e
) {
889 derr
<< __func__
<< " failed to decode cnode, key:"
890 << pretty_binary_string(it
->key()) << dendl
;
893 dout(20) << __func__
<< " opened " << cid
<< dendl
;
896 derr
<< __func__
<< " unrecognized collection " << it
->key() << dendl
;
906 dout(1) << __func__
<< " path " << path
<< dendl
;
914 r
= _open_fsid(true);
922 r
= _read_fsid(&old_fsid
);
923 if (r
< 0 || old_fsid
.is_zero()) {
924 if (fsid
.is_zero()) {
925 fsid
.generate_random();
926 dout(1) << __func__
<< " generated fsid " << fsid
<< dendl
;
928 dout(1) << __func__
<< " using provided fsid " << fsid
<< dendl
;
930 // we'll write it last.
932 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
933 derr
<< __func__
<< " on-disk fsid " << old_fsid
934 << " != provided " << fsid
<< dendl
;
939 dout(1) << __func__
<< " already created, fsid is " << fsid
<< dendl
;
947 r
= write_meta("kv_backend", cct
->_conf
->kstore_backend
);
951 r
= write_meta("type", "kstore");
955 // indicate mkfs completion/success by writing the fsid file
958 dout(10) << __func__
<< " success" << dendl
;
960 derr
<< __func__
<< " error writing fsid: " << cpp_strerror(r
) << dendl
;
973 dout(1) << __func__
<< " path " << path
<< dendl
;
975 if (cct
->_conf
->kstore_fsck_on_mount
) {
976 int rc
= fsck(cct
->_conf
->kstore_fsck_on_mount_deep
);
981 int r
= _open_path();
984 r
= _open_fsid(false);
988 r
= _read_fsid(&fsid
);
1000 r
= _open_super_meta();
1004 r
= _open_collections();
1009 kv_sync_thread
.create("kstore_kv_sync");
1023 int KStore::umount()
1026 dout(1) << __func__
<< dendl
;
1029 _reap_collections();
1032 dout(20) << __func__
<< " stopping kv thread" << dendl
;
1034 dout(20) << __func__
<< " draining finisher" << dendl
;
1035 finisher
.wait_for_empty();
1036 dout(20) << __func__
<< " stopping finisher" << dendl
;
1038 dout(20) << __func__
<< " closing" << dendl
;
1047 int KStore::fsck(bool deep
)
1049 dout(1) << __func__
<< dendl
;
1051 dout(1) << __func__
<< " finish with " << errors
<< " errors" << dendl
;
1055 void KStore::_sync()
1057 dout(10) << __func__
<< dendl
;
1059 std::unique_lock
<std::mutex
> l(kv_lock
);
1060 while (!kv_committing
.empty() ||
1061 !kv_queue
.empty()) {
1062 dout(20) << " waiting for kv to commit" << dendl
;
1063 kv_sync_cond
.wait(l
);
1066 dout(10) << __func__
<< " done" << dendl
;
1069 int KStore::statfs(struct store_statfs_t
* buf
)
1071 return db
->get_statfs(buf
);
1077 KStore::CollectionRef
KStore::_get_collection(coll_t cid
)
1079 RWLock::RLocker
l(coll_lock
);
1080 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1081 if (cp
== coll_map
.end())
1082 return CollectionRef();
1086 void KStore::_queue_reap_collection(CollectionRef
& c
)
1088 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1089 std::lock_guard
<std::mutex
> l(reap_lock
);
1090 removed_collections
.push_back(c
);
1093 void KStore::_reap_collections()
1095 list
<CollectionRef
> removed_colls
;
1096 std::lock_guard
<std::mutex
> l(reap_lock
);
1097 removed_colls
.swap(removed_collections
);
1099 for (list
<CollectionRef
>::iterator p
= removed_colls
.begin();
1100 p
!= removed_colls
.end();
1102 CollectionRef c
= *p
;
1103 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1105 pair
<ghobject_t
,OnodeRef
> next
;
1106 while (c
->onode_map
.get_next(next
.first
, &next
)) {
1107 assert(!next
.second
->exists
);
1108 if (!next
.second
->flush_txns
.empty()) {
1109 dout(10) << __func__
<< " " << c
->cid
<< " " << next
.second
->oid
1110 << " flush_txns " << next
.second
->flush_txns
<< dendl
;
1115 c
->onode_map
.clear();
1116 dout(10) << __func__
<< " " << c
->cid
<< " done" << dendl
;
1119 dout(10) << __func__
<< " all reaped" << dendl
;
1125 bool KStore::exists(const coll_t
& cid
, const ghobject_t
& oid
)
1127 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1128 CollectionRef c
= _get_collection(cid
);
1131 RWLock::RLocker
l(c
->lock
);
1132 OnodeRef o
= c
->get_onode(oid
, false);
1133 if (!o
|| !o
->exists
)
1140 const ghobject_t
& oid
,
1144 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1145 CollectionRef c
= _get_collection(cid
);
1148 RWLock::RLocker
l(c
->lock
);
1149 OnodeRef o
= c
->get_onode(oid
, false);
1150 if (!o
|| !o
->exists
)
1152 st
->st_size
= o
->onode
.size
;
1153 st
->st_blksize
= 4096;
1154 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
1159 int KStore::set_collection_opts(
1161 const pool_opts_t
& opts
)
1168 const ghobject_t
& oid
,
1174 dout(15) << __func__
<< " " << cid
<< " " << oid
1175 << " " << offset
<< "~" << length
1178 CollectionRef c
= _get_collection(cid
);
1181 RWLock::RLocker
l(c
->lock
);
1185 OnodeRef o
= c
->get_onode(oid
, false);
1186 if (!o
|| !o
->exists
) {
1191 if (offset
== length
&& offset
== 0)
1192 length
= o
->onode
.size
;
1194 r
= _do_read(o
, offset
, length
, bl
, op_flags
);
1197 dout(10) << __func__
<< " " << cid
<< " " << oid
1198 << " " << offset
<< "~" << length
1199 << " = " << r
<< dendl
;
1203 int KStore::_do_read(
1211 uint64_t stripe_size
= o
->onode
.stripe_size
;
1212 uint64_t stripe_off
;
1214 dout(20) << __func__
<< " " << offset
<< "~" << length
<< " size "
1215 << o
->onode
.size
<< " nid " << o
->onode
.nid
<< dendl
;
1218 if (offset
> o
->onode
.size
) {
1221 if (offset
+ length
> o
->onode
.size
) {
1222 length
= o
->onode
.size
- offset
;
1224 if (stripe_size
== 0) {
1225 bl
.append_zero(length
);
1232 stripe_off
= offset
% stripe_size
;
1233 while (length
> 0) {
1235 _do_read_stripe(o
, offset
- stripe_off
, &stripe
);
1236 dout(30) << __func__
<< " stripe " << offset
- stripe_off
<< " got "
1237 << stripe
.length() << dendl
;
1238 unsigned swant
= MIN(stripe_size
- stripe_off
, length
);
1239 if (stripe
.length()) {
1240 if (swant
== stripe
.length()) {
1241 bl
.claim_append(stripe
);
1242 dout(30) << __func__
<< " taking full stripe" << dendl
;
1245 if (stripe_off
< stripe
.length()) {
1246 l
= MIN(stripe
.length() - stripe_off
, swant
);
1248 t
.substr_of(stripe
, stripe_off
, l
);
1250 dout(30) << __func__
<< " taking " << stripe_off
<< "~" << l
<< dendl
;
1253 bl
.append_zero(swant
- l
);
1254 dout(30) << __func__
<< " adding " << swant
- l
<< " zeros" << dendl
;
1258 dout(30) << __func__
<< " generating " << swant
<< " zeros" << dendl
;
1259 bl
.append_zero(swant
);
1266 dout(30) << " result:\n";
1276 const ghobject_t
& oid
,
1281 map
<uint64_t, uint64_t> m
;
1282 int r
= fiemap(cid
, oid
, offset
, len
, m
);
1292 const ghobject_t
& oid
,
1295 map
<uint64_t, uint64_t>& destmap
)
1297 CollectionRef c
= _get_collection(cid
);
1300 RWLock::RLocker
l(c
->lock
);
1302 OnodeRef o
= c
->get_onode(oid
, false);
1303 if (!o
|| !o
->exists
) {
1307 if (offset
> o
->onode
.size
)
1310 if (offset
+ len
> o
->onode
.size
) {
1311 len
= o
->onode
.size
- offset
;
1314 dout(20) << __func__
<< " " << offset
<< "~" << len
<< " size "
1315 << o
->onode
.size
<< dendl
;
1317 // FIXME: do something smarter here
1318 destmap
[0] = o
->onode
.size
;
1321 dout(20) << __func__
<< " " << offset
<< "~" << len
1322 << " size = 0 (" << destmap
<< ")" << dendl
;
1326 int KStore::getattr(
1328 const ghobject_t
& oid
,
1332 dout(15) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1333 CollectionRef c
= _get_collection(cid
);
1336 RWLock::RLocker
l(c
->lock
);
1340 OnodeRef o
= c
->get_onode(oid
, false);
1341 if (!o
|| !o
->exists
) {
1346 if (!o
->onode
.attrs
.count(k
)) {
1350 value
= o
->onode
.attrs
[k
];
1353 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
1354 << " = " << r
<< dendl
;
1358 int KStore::getattrs(
1360 const ghobject_t
& oid
,
1361 map
<string
,bufferptr
>& aset
)
1363 dout(15) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1364 CollectionRef c
= _get_collection(cid
);
1367 RWLock::RLocker
l(c
->lock
);
1370 OnodeRef o
= c
->get_onode(oid
, false);
1371 if (!o
|| !o
->exists
) {
1375 aset
= o
->onode
.attrs
;
1378 dout(10) << __func__
<< " " << cid
<< " " << oid
1379 << " = " << r
<< dendl
;
1383 int KStore::list_collections(vector
<coll_t
>& ls
)
1385 RWLock::RLocker
l(coll_lock
);
1386 for (ceph::unordered_map
<coll_t
, CollectionRef
>::iterator p
= coll_map
.begin();
1387 p
!= coll_map
.end();
1389 ls
.push_back(p
->first
);
1393 bool KStore::collection_exists(const coll_t
& c
)
1395 RWLock::RLocker
l(coll_lock
);
1396 return coll_map
.count(c
);
1399 int KStore::collection_empty(const coll_t
& cid
, bool *empty
)
1401 dout(15) << __func__
<< " " << cid
<< dendl
;
1402 vector
<ghobject_t
> ls
;
1404 int r
= collection_list(cid
, ghobject_t(), ghobject_t::get_max(), 1,
1407 derr
<< __func__
<< " collection_list returned: " << cpp_strerror(r
)
1411 *empty
= ls
.empty();
1412 dout(10) << __func__
<< " " << cid
<< " = " << (int)(*empty
) << dendl
;
1416 int KStore::collection_bits(const coll_t
& cid
)
1418 dout(15) << __func__
<< " " << cid
<< dendl
;
1419 CollectionHandle ch
= _get_collection(cid
);
1422 Collection
*c
= static_cast<Collection
*>(ch
.get());
1423 RWLock::RLocker
l(c
->lock
);
1424 dout(10) << __func__
<< " " << cid
<< " = " << c
->cnode
.bits
<< dendl
;
1425 return c
->cnode
.bits
;
1428 int KStore::collection_list(
1429 const coll_t
& cid
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1430 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1432 CollectionHandle c
= _get_collection(cid
);
1435 return collection_list(c
, start
, end
, max
, ls
, pnext
);
1438 int KStore::collection_list(
1439 CollectionHandle
&c_
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1440 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1443 Collection
*c
= static_cast<Collection
*>(c_
.get());
1444 dout(15) << __func__
<< " " << c
->cid
1445 << " start " << start
<< " end " << end
<< " max " << max
<< dendl
;
1448 RWLock::RLocker
l(c
->lock
);
1449 r
= _collection_list(c
, start
, end
, max
, ls
, pnext
);
1452 dout(10) << __func__
<< " " << c
->cid
1453 << " start " << start
<< " end " << end
<< " max " << max
1454 << " = " << r
<< ", ls.size() = " << ls
->size()
1455 << ", next = " << (pnext
? *pnext
: ghobject_t()) << dendl
;
1459 int KStore::_collection_list(
1460 Collection
* c
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1461 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1464 KeyValueDB::Iterator it
;
1465 string temp_start_key
, temp_end_key
;
1466 string start_key
, end_key
;
1467 bool set_next
= false;
1471 ghobject_t static_next
;
1473 pnext
= &static_next
;
1475 if (start
== ghobject_t::get_max() ||
1476 start
.hobj
.is_max()) {
1479 get_coll_key_range(c
->cid
, c
->cnode
.bits
, &temp_start_key
, &temp_end_key
,
1480 &start_key
, &end_key
);
1481 dout(20) << __func__
1482 << " range " << pretty_binary_string(temp_start_key
)
1483 << " to " << pretty_binary_string(temp_end_key
)
1484 << " and " << pretty_binary_string(start_key
)
1485 << " to " << pretty_binary_string(end_key
)
1486 << " start " << start
<< dendl
;
1487 it
= db
->get_iterator(PREFIX_OBJ
);
1488 if (start
== ghobject_t() || start
== c
->cid
.get_min_hobj()) {
1489 it
->upper_bound(temp_start_key
);
1493 get_object_key(cct
, start
, &k
);
1494 if (start
.hobj
.is_temp()) {
1496 assert(k
>= temp_start_key
&& k
< temp_end_key
);
1499 assert(k
>= start_key
&& k
< end_key
);
1501 dout(20) << " start from " << pretty_binary_string(k
)
1502 << " temp=" << (int)temp
<< dendl
;
1505 if (end
.hobj
.is_max()) {
1506 pend
= temp
? temp_end_key
: end_key
;
1508 get_object_key(cct
, end
, &end_key
);
1509 if (end
.hobj
.is_temp()) {
1515 pend
= temp
? temp_end_key
: end_key
;
1518 dout(20) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1520 if (!it
->valid() || it
->key() >= pend
) {
1522 dout(20) << __func__
<< " iterator not valid (end of db?)" << dendl
;
1524 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key())
1525 << " > " << end
<< dendl
;
1527 if (end
.hobj
.is_temp()) {
1530 dout(30) << __func__
<< " switch to non-temp namespace" << dendl
;
1532 it
->upper_bound(start_key
);
1534 dout(30) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1539 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key()) << dendl
;
1541 int r
= get_key_object(it
->key(), &oid
);
1543 if (ls
->size() >= (unsigned)max
) {
1544 dout(20) << __func__
<< " reached max " << max
<< dendl
;
1554 *pnext
= ghobject_t::get_max();
1561 KStore::OmapIteratorImpl::OmapIteratorImpl(
1562 CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
)
1563 : c(c
), o(o
), it(it
)
1565 RWLock::RLocker
l(c
->lock
);
1566 if (o
->onode
.omap_head
) {
1567 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1568 get_omap_tail(o
->onode
.omap_head
, &tail
);
1569 it
->lower_bound(head
);
1573 int KStore::OmapIteratorImpl::seek_to_first()
1575 RWLock::RLocker
l(c
->lock
);
1576 if (o
->onode
.omap_head
) {
1577 it
->lower_bound(head
);
1579 it
= KeyValueDB::Iterator();
1584 int KStore::OmapIteratorImpl::upper_bound(const string
& after
)
1586 RWLock::RLocker
l(c
->lock
);
1587 if (o
->onode
.omap_head
) {
1589 get_omap_key(o
->onode
.omap_head
, after
, &key
);
1590 it
->upper_bound(key
);
1592 it
= KeyValueDB::Iterator();
1597 int KStore::OmapIteratorImpl::lower_bound(const string
& to
)
1599 RWLock::RLocker
l(c
->lock
);
1600 if (o
->onode
.omap_head
) {
1602 get_omap_key(o
->onode
.omap_head
, to
, &key
);
1603 it
->lower_bound(key
);
1605 it
= KeyValueDB::Iterator();
1610 bool KStore::OmapIteratorImpl::valid()
1612 RWLock::RLocker
l(c
->lock
);
1613 if (o
->onode
.omap_head
&& it
->valid() && it
->raw_key().second
<= tail
) {
1620 int KStore::OmapIteratorImpl::next(bool validate
)
1622 RWLock::RLocker
l(c
->lock
);
1623 if (o
->onode
.omap_head
) {
1631 string
KStore::OmapIteratorImpl::key()
1633 RWLock::RLocker
l(c
->lock
);
1634 assert(it
->valid());
1635 string db_key
= it
->raw_key().second
;
1637 decode_omap_key(db_key
, &user_key
);
1641 bufferlist
KStore::OmapIteratorImpl::value()
1643 RWLock::RLocker
l(c
->lock
);
1644 assert(it
->valid());
1648 int KStore::omap_get(
1649 const coll_t
& cid
, ///< [in] Collection containing oid
1650 const ghobject_t
&oid
, ///< [in] Object containing omap
1651 bufferlist
*header
, ///< [out] omap header
1652 map
<string
, bufferlist
> *out
/// < [out] Key to value map
1655 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1656 CollectionRef c
= _get_collection(cid
);
1659 RWLock::RLocker
l(c
->lock
);
1661 OnodeRef o
= c
->get_onode(oid
, false);
1662 if (!o
|| !o
->exists
) {
1666 if (!o
->onode
.omap_head
)
1670 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1672 get_omap_header(o
->onode
.omap_head
, &head
);
1673 get_omap_tail(o
->onode
.omap_head
, &tail
);
1674 it
->lower_bound(head
);
1675 while (it
->valid()) {
1676 if (it
->key() == head
) {
1677 dout(30) << __func__
<< " got header" << dendl
;
1678 *header
= it
->value();
1679 } else if (it
->key() >= tail
) {
1680 dout(30) << __func__
<< " reached tail" << dendl
;
1684 decode_omap_key(it
->key(), &user_key
);
1685 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1686 << " -> " << user_key
<< dendl
;
1687 assert(it
->key() < tail
);
1688 (*out
)[user_key
] = it
->value();
1694 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1698 int KStore::omap_get_header(
1699 const coll_t
& cid
, ///< [in] Collection containing oid
1700 const ghobject_t
&oid
, ///< [in] Object containing omap
1701 bufferlist
*header
, ///< [out] omap header
1702 bool allow_eio
///< [in] don't assert on eio
1705 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1706 CollectionRef c
= _get_collection(cid
);
1709 RWLock::RLocker
l(c
->lock
);
1711 OnodeRef o
= c
->get_onode(oid
, false);
1712 if (!o
|| !o
->exists
) {
1716 if (!o
->onode
.omap_head
)
1721 get_omap_header(o
->onode
.omap_head
, &head
);
1722 if (db
->get(PREFIX_OMAP
, head
, header
) >= 0) {
1723 dout(30) << __func__
<< " got header" << dendl
;
1725 dout(30) << __func__
<< " no header" << dendl
;
1729 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1733 int KStore::omap_get_keys(
1734 const coll_t
& cid
, ///< [in] Collection containing oid
1735 const ghobject_t
&oid
, ///< [in] Object containing omap
1736 set
<string
> *keys
///< [out] Keys defined on oid
1739 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1740 CollectionRef c
= _get_collection(cid
);
1743 RWLock::RLocker
l(c
->lock
);
1745 OnodeRef o
= c
->get_onode(oid
, false);
1746 if (!o
|| !o
->exists
) {
1750 if (!o
->onode
.omap_head
)
1754 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1756 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1757 get_omap_tail(o
->onode
.omap_head
, &tail
);
1758 it
->lower_bound(head
);
1759 while (it
->valid()) {
1760 if (it
->key() >= tail
) {
1761 dout(30) << __func__
<< " reached tail" << dendl
;
1765 decode_omap_key(it
->key(), &user_key
);
1766 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1767 << " -> " << user_key
<< dendl
;
1768 assert(it
->key() < tail
);
1769 keys
->insert(user_key
);
1774 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1778 int KStore::omap_get_values(
1779 const coll_t
& cid
, ///< [in] Collection containing oid
1780 const ghobject_t
&oid
, ///< [in] Object containing omap
1781 const set
<string
> &keys
, ///< [in] Keys to get
1782 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
1785 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1786 CollectionRef c
= _get_collection(cid
);
1789 RWLock::RLocker
l(c
->lock
);
1791 OnodeRef o
= c
->get_onode(oid
, false);
1792 if (!o
|| !o
->exists
) {
1796 if (!o
->onode
.omap_head
)
1799 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1801 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1803 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1804 dout(30) << __func__
<< " got " << pretty_binary_string(key
)
1805 << " -> " << *p
<< dendl
;
1806 out
->insert(make_pair(*p
, val
));
1810 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1814 int KStore::omap_check_keys(
1815 const coll_t
& cid
, ///< [in] Collection containing oid
1816 const ghobject_t
&oid
, ///< [in] Object containing omap
1817 const set
<string
> &keys
, ///< [in] Keys to check
1818 set
<string
> *out
///< [out] Subset of keys defined on oid
1821 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1822 CollectionRef c
= _get_collection(cid
);
1825 RWLock::RLocker
l(c
->lock
);
1827 OnodeRef o
= c
->get_onode(oid
, false);
1828 if (!o
|| !o
->exists
) {
1832 if (!o
->onode
.omap_head
)
1835 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1837 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1839 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1840 dout(30) << __func__
<< " have " << pretty_binary_string(key
)
1841 << " -> " << *p
<< dendl
;
1844 dout(30) << __func__
<< " miss " << pretty_binary_string(key
)
1845 << " -> " << *p
<< dendl
;
1849 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
1853 ObjectMap::ObjectMapIterator
KStore::get_omap_iterator(
1854 const coll_t
& cid
, ///< [in] collection
1855 const ghobject_t
&oid
///< [in] object
1859 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1860 CollectionRef c
= _get_collection(cid
);
1862 dout(10) << __func__
<< " " << cid
<< "doesn't exist" <<dendl
;
1863 return ObjectMap::ObjectMapIterator();
1865 RWLock::RLocker
l(c
->lock
);
1866 OnodeRef o
= c
->get_onode(oid
, false);
1867 if (!o
|| !o
->exists
) {
1868 dout(10) << __func__
<< " " << oid
<< "doesn't exist" <<dendl
;
1869 return ObjectMap::ObjectMapIterator();
1872 dout(10) << __func__
<< " header = " << o
->onode
.omap_head
<<dendl
;
1873 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1874 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
, it
));
1878 // -----------------
1881 int KStore::_open_super_meta()
1887 db
->get(PREFIX_SUPER
, "nid_max", &bl
);
1888 bufferlist::iterator p
= bl
.begin();
1890 ::decode(nid_max
, p
);
1891 } catch (buffer::error
& e
) {
1893 dout(10) << __func__
<< " old nid_max " << nid_max
<< dendl
;
1899 void KStore::_assign_nid(TransContext
*txc
, OnodeRef o
)
1903 std::lock_guard
<std::mutex
> l(nid_lock
);
1904 o
->onode
.nid
= ++nid_last
;
1905 dout(20) << __func__
<< " " << o
->oid
<< " nid " << o
->onode
.nid
<< dendl
;
1906 if (nid_last
> nid_max
) {
1907 nid_max
+= cct
->_conf
->kstore_nid_prealloc
;
1909 ::encode(nid_max
, bl
);
1910 txc
->t
->set(PREFIX_SUPER
, "nid_max", bl
);
1911 dout(10) << __func__
<< " nid_max now " << nid_max
<< dendl
;
1915 KStore::TransContext
*KStore::_txc_create(OpSequencer
*osr
)
1917 TransContext
*txc
= new TransContext(osr
);
1918 txc
->t
= db
->get_transaction();
1919 osr
->queue_new(txc
);
1920 dout(20) << __func__
<< " osr " << osr
<< " = " << txc
<< dendl
;
1924 void KStore::_txc_state_proc(TransContext
*txc
)
1927 dout(10) << __func__
<< " txc " << txc
1928 << " " << txc
->get_state_name() << dendl
;
1929 switch (txc
->state
) {
1930 case TransContext::STATE_PREPARE
:
1931 txc
->log_state_latency(logger
, l_kstore_state_prepare_lat
);
1932 txc
->state
= TransContext::STATE_KV_QUEUED
;
1933 if (!cct
->_conf
->kstore_sync_transaction
) {
1934 std::lock_guard
<std::mutex
> l(kv_lock
);
1935 if (cct
->_conf
->kstore_sync_submit_transaction
) {
1936 int r
= db
->submit_transaction(txc
->t
);
1939 kv_queue
.push_back(txc
);
1940 kv_cond
.notify_one();
1944 int r
= db
->submit_transaction_sync(txc
->t
);
1949 case TransContext::STATE_KV_QUEUED
:
1950 txc
->log_state_latency(logger
, l_kstore_state_kv_queued_lat
);
1951 txc
->state
= TransContext::STATE_KV_DONE
;
1952 _txc_finish_kv(txc
);
1955 case TransContext::STATE_KV_DONE
:
1956 txc
->log_state_latency(logger
, l_kstore_state_kv_done_lat
);
1957 txc
->state
= TransContext::STATE_FINISHING
;
1960 case TransContext::TransContext::STATE_FINISHING
:
1961 txc
->log_state_latency(logger
, l_kstore_state_finishing_lat
);
1966 derr
<< __func__
<< " unexpected txc " << txc
1967 << " state " << txc
->get_state_name() << dendl
;
1968 assert(0 == "unexpected txc state");
1974 void KStore::_txc_finalize(OpSequencer
*osr
, TransContext
*txc
)
1976 dout(20) << __func__
<< " osr " << osr
<< " txc " << txc
1977 << " onodes " << txc
->onodes
<< dendl
;
1980 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
1981 p
!= txc
->onodes
.end();
1984 ::encode((*p
)->onode
, bl
);
1985 dout(20) << " onode size is " << bl
.length() << dendl
;
1986 txc
->t
->set(PREFIX_OBJ
, (*p
)->key
, bl
);
1988 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
1989 (*p
)->flush_txns
.insert(txc
);
1993 void KStore::_txc_finish_kv(TransContext
*txc
)
1995 dout(20) << __func__
<< " txc " << txc
<< dendl
;
1997 // warning: we're calling onreadable_sync inside the sequencer lock
1998 if (txc
->onreadable_sync
) {
1999 txc
->onreadable_sync
->complete(0);
2000 txc
->onreadable_sync
= NULL
;
2002 if (txc
->onreadable
) {
2003 finisher
.queue(txc
->onreadable
);
2004 txc
->onreadable
= NULL
;
2006 if (txc
->oncommit
) {
2007 finisher
.queue(txc
->oncommit
);
2008 txc
->oncommit
= NULL
;
2010 if (!txc
->oncommits
.empty()) {
2011 finisher
.queue(txc
->oncommits
);
2014 throttle_ops
.put(txc
->ops
);
2015 throttle_bytes
.put(txc
->bytes
);
2018 void KStore::_txc_finish(TransContext
*txc
)
2020 dout(20) << __func__
<< " " << txc
<< " onodes " << txc
->onodes
<< dendl
;
2021 assert(txc
->state
== TransContext::STATE_FINISHING
);
2023 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
2024 p
!= txc
->onodes
.end();
2026 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
2027 dout(20) << __func__
<< " onode " << *p
<< " had " << (*p
)->flush_txns
2029 assert((*p
)->flush_txns
.count(txc
));
2030 (*p
)->flush_txns
.erase(txc
);
2031 if ((*p
)->flush_txns
.empty()) {
2032 (*p
)->flush_cond
.notify_all();
2033 (*p
)->clear_pending_stripes();
2038 txc
->onodes
.clear();
2040 while (!txc
->removed_collections
.empty()) {
2041 _queue_reap_collection(txc
->removed_collections
.front());
2042 txc
->removed_collections
.pop_front();
2045 OpSequencerRef osr
= txc
->osr
;
2047 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2048 txc
->state
= TransContext::STATE_DONE
;
2051 _osr_reap_done(osr
.get());
2054 void KStore::_osr_reap_done(OpSequencer
*osr
)
2056 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2057 dout(20) << __func__
<< " osr " << osr
<< dendl
;
2058 while (!osr
->q
.empty()) {
2059 TransContext
*txc
= &osr
->q
.front();
2060 dout(20) << __func__
<< " txc " << txc
<< " " << txc
->get_state_name()
2062 if (txc
->state
!= TransContext::STATE_DONE
) {
2066 if (txc
->first_collection
) {
2067 txc
->first_collection
->onode_map
.trim(cct
->_conf
->kstore_onode_map_size
);
2071 txc
->log_state_latency(logger
, l_kstore_state_done_lat
);
2073 osr
->qcond
.notify_all();
2075 dout(20) << __func__
<< " osr " << osr
<< " q now empty" << dendl
;
2079 void KStore::_kv_sync_thread()
2081 dout(10) << __func__
<< " start" << dendl
;
2082 std::unique_lock
<std::mutex
> l(kv_lock
);
2084 assert(kv_committing
.empty());
2085 if (kv_queue
.empty()) {
2088 dout(20) << __func__
<< " sleep" << dendl
;
2089 kv_sync_cond
.notify_all();
2091 dout(20) << __func__
<< " wake" << dendl
;
2093 dout(20) << __func__
<< " committing " << kv_queue
.size() << dendl
;
2094 kv_committing
.swap(kv_queue
);
2095 utime_t start
= ceph_clock_now();
2098 dout(30) << __func__
<< " committing txc " << kv_committing
<< dendl
;
2100 // one transaction to force a sync
2101 KeyValueDB::Transaction t
= db
->get_transaction();
2102 if (!cct
->_conf
->kstore_sync_submit_transaction
) {
2103 for (std::deque
<TransContext
*>::iterator it
= kv_committing
.begin();
2104 it
!= kv_committing
.end();
2106 int r
= db
->submit_transaction((*it
)->t
);
2110 int r
= db
->submit_transaction_sync(t
);
2112 utime_t finish
= ceph_clock_now();
2113 utime_t dur
= finish
- start
;
2114 dout(20) << __func__
<< " committed " << kv_committing
.size()
2115 << " in " << dur
<< dendl
;
2116 while (!kv_committing
.empty()) {
2117 TransContext
*txc
= kv_committing
.front();
2118 _txc_state_proc(txc
);
2119 kv_committing
.pop_front();
2122 // this is as good a place as any ...
2123 _reap_collections();
2128 dout(10) << __func__
<< " finish" << dendl
;
2132 // ---------------------------
2135 int KStore::queue_transactions(
2137 vector
<Transaction
>& tls
,
2139 ThreadPool::TPHandle
*handle
)
2141 Context
*onreadable
;
2143 Context
*onreadable_sync
;
2144 ObjectStore::Transaction::collect_contexts(
2145 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
2147 // set up the sequencer
2151 osr
= static_cast<OpSequencer
*>(posr
->p
.get());
2152 dout(10) << __func__
<< " existing " << osr
<< " " << *osr
<< dendl
;
2154 osr
= new OpSequencer(cct
);
2157 dout(10) << __func__
<< " new " << osr
<< " " << *osr
<< dendl
;
2161 TransContext
*txc
= _txc_create(osr
);
2162 txc
->onreadable
= onreadable
;
2163 txc
->onreadable_sync
= onreadable_sync
;
2164 txc
->oncommit
= ondisk
;
2166 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
2168 txc
->ops
+= (*p
).get_num_ops();
2169 txc
->bytes
+= (*p
).get_num_bytes();
2170 _txc_add_transaction(txc
, &(*p
));
2173 _txc_finalize(osr
, txc
);
2175 throttle_ops
.get(txc
->ops
);
2176 throttle_bytes
.get(txc
->bytes
);
2179 _txc_state_proc(txc
);
2183 void KStore::_txc_add_transaction(TransContext
*txc
, Transaction
*t
)
2185 Transaction::iterator i
= t
->begin();
2187 dout(30) << __func__
<< " transaction dump:\n";
2188 JSONFormatter
f(true);
2189 f
.open_object_section("transaction");
2195 vector
<CollectionRef
> cvec(i
.colls
.size());
2197 for (vector
<coll_t
>::iterator p
= i
.colls
.begin(); p
!= i
.colls
.end();
2199 cvec
[j
] = _get_collection(*p
);
2201 // note first collection we reference
2202 if (!j
&& !txc
->first_collection
)
2203 txc
->first_collection
= cvec
[j
];
2205 vector
<OnodeRef
> ovec(i
.objects
.size());
2207 for (int pos
= 0; i
.have_op(); ++pos
) {
2208 Transaction::Op
*op
= i
.decode_op();
2212 if (op
->op
== Transaction::OP_NOP
)
2215 // collection operations
2216 CollectionRef
&c
= cvec
[op
->cid
];
2218 case Transaction::OP_RMCOLL
:
2220 coll_t cid
= i
.get_cid(op
->cid
);
2221 r
= _remove_collection(txc
, cid
, &c
);
2227 case Transaction::OP_MKCOLL
:
2230 coll_t cid
= i
.get_cid(op
->cid
);
2231 r
= _create_collection(txc
, cid
, op
->split_bits
, &c
);
2237 case Transaction::OP_SPLIT_COLLECTION
:
2238 assert(0 == "deprecated");
2241 case Transaction::OP_SPLIT_COLLECTION2
:
2243 uint32_t bits
= op
->split_bits
;
2244 uint32_t rem
= op
->split_rem
;
2245 r
= _split_collection(txc
, c
, cvec
[op
->dest_cid
], bits
, rem
);
2251 case Transaction::OP_COLL_HINT
:
2253 uint32_t type
= op
->hint_type
;
2256 bufferlist::iterator hiter
= hint
.begin();
2257 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
2260 ::decode(pg_num
, hiter
);
2261 ::decode(num_objs
, hiter
);
2262 dout(10) << __func__
<< " collection hint objects is a no-op, "
2263 << " pg_num " << pg_num
<< " num_objects " << num_objs
2267 dout(10) << __func__
<< " unknown collection hint " << type
<< dendl
;
2273 case Transaction::OP_COLL_SETATTR
:
2277 case Transaction::OP_COLL_RMATTR
:
2281 case Transaction::OP_COLL_RENAME
:
2282 assert(0 == "not implemented");
2286 derr
<< " error " << cpp_strerror(r
)
2287 << " not handled on operation " << op
->op
2288 << " (op " << pos
<< ", counting from 0)" << dendl
;
2289 dout(0) << " transaction dump:\n";
2290 JSONFormatter
f(true);
2291 f
.open_object_section("transaction");
2296 assert(0 == "unexpected error");
2299 // object operations
2300 RWLock::WLocker
l(c
->lock
);
2301 OnodeRef
&o
= ovec
[op
->oid
];
2303 // these operations implicity create the object
2304 bool create
= false;
2305 if (op
->op
== Transaction::OP_TOUCH
||
2306 op
->op
== Transaction::OP_WRITE
||
2307 op
->op
== Transaction::OP_ZERO
) {
2310 ghobject_t oid
= i
.get_oid(op
->oid
);
2311 o
= c
->get_onode(oid
, create
);
2313 if (!o
|| !o
->exists
) {
2314 dout(10) << __func__
<< " op " << op
->op
<< " got ENOENT on "
2323 case Transaction::OP_TOUCH
:
2324 r
= _touch(txc
, c
, o
);
2327 case Transaction::OP_WRITE
:
2329 uint64_t off
= op
->off
;
2330 uint64_t len
= op
->len
;
2331 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2334 r
= _write(txc
, c
, o
, off
, len
, bl
, fadvise_flags
);
2338 case Transaction::OP_ZERO
:
2340 uint64_t off
= op
->off
;
2341 uint64_t len
= op
->len
;
2342 r
= _zero(txc
, c
, o
, off
, len
);
2346 case Transaction::OP_TRIMCACHE
:
2348 // deprecated, no-op
2352 case Transaction::OP_TRUNCATE
:
2354 uint64_t off
= op
->off
;
2355 r
= _truncate(txc
, c
, o
, off
);
2359 case Transaction::OP_REMOVE
:
2360 r
= _remove(txc
, c
, o
);
2363 case Transaction::OP_SETATTR
:
2365 string name
= i
.decode_string();
2368 map
<string
, bufferptr
> to_set
;
2369 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2370 r
= _setattrs(txc
, c
, o
, to_set
);
2374 case Transaction::OP_SETATTRS
:
2376 map
<string
, bufferptr
> aset
;
2377 i
.decode_attrset(aset
);
2378 r
= _setattrs(txc
, c
, o
, aset
);
2382 case Transaction::OP_RMATTR
:
2384 string name
= i
.decode_string();
2385 r
= _rmattr(txc
, c
, o
, name
);
2389 case Transaction::OP_RMATTRS
:
2391 r
= _rmattrs(txc
, c
, o
);
2395 case Transaction::OP_CLONE
:
2397 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2398 OnodeRef no
= c
->get_onode(noid
, true);
2399 r
= _clone(txc
, c
, o
, no
);
2403 case Transaction::OP_CLONERANGE
:
2404 assert(0 == "deprecated");
2407 case Transaction::OP_CLONERANGE2
:
2409 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2410 OnodeRef no
= c
->get_onode(noid
, true);
2411 uint64_t srcoff
= op
->off
;
2412 uint64_t len
= op
->len
;
2413 uint64_t dstoff
= op
->dest_off
;
2414 r
= _clone_range(txc
, c
, o
, no
, srcoff
, len
, dstoff
);
2418 case Transaction::OP_COLL_ADD
:
2419 assert(0 == "not implemented");
2422 case Transaction::OP_COLL_REMOVE
:
2423 assert(0 == "not implemented");
2426 case Transaction::OP_COLL_MOVE
:
2427 assert(0 == "deprecated");
2430 case Transaction::OP_COLL_MOVE_RENAME
:
2432 assert(op
->cid
== op
->dest_cid
);
2433 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2434 OnodeRef no
= c
->get_onode(noid
, true);
2435 r
= _rename(txc
, c
, o
, no
, noid
);
2440 case Transaction::OP_TRY_RENAME
:
2442 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2443 OnodeRef no
= c
->get_onode(noid
, true);
2444 r
= _rename(txc
, c
, o
, no
, noid
);
2451 case Transaction::OP_OMAP_CLEAR
:
2453 r
= _omap_clear(txc
, c
, o
);
2456 case Transaction::OP_OMAP_SETKEYS
:
2459 i
.decode_attrset_bl(&aset_bl
);
2460 r
= _omap_setkeys(txc
, c
, o
, aset_bl
);
2463 case Transaction::OP_OMAP_RMKEYS
:
2466 i
.decode_keyset_bl(&keys_bl
);
2467 r
= _omap_rmkeys(txc
, c
, o
, keys_bl
);
2470 case Transaction::OP_OMAP_RMKEYRANGE
:
2473 first
= i
.decode_string();
2474 last
= i
.decode_string();
2475 r
= _omap_rmkey_range(txc
, c
, o
, first
, last
);
2478 case Transaction::OP_OMAP_SETHEADER
:
2482 r
= _omap_setheader(txc
, c
, o
, bl
);
2486 case Transaction::OP_SETALLOCHINT
:
2488 uint64_t expected_object_size
= op
->expected_object_size
;
2489 uint64_t expected_write_size
= op
->expected_write_size
;
2490 uint32_t flags
= op
->alloc_hint_flags
;
2491 r
= _setallochint(txc
, c
, o
,
2492 expected_object_size
,
2493 expected_write_size
,
2499 derr
<< "bad op " << op
->op
<< dendl
;
2507 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
2508 op
->op
== Transaction::OP_CLONE
||
2509 op
->op
== Transaction::OP_CLONERANGE2
||
2510 op
->op
== Transaction::OP_COLL_ADD
))
2511 // -ENOENT is usually okay
2517 const char *msg
= "unexpected error code";
2519 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
2520 op
->op
== Transaction::OP_CLONE
||
2521 op
->op
== Transaction::OP_CLONERANGE2
))
2522 msg
= "ENOENT on clone suggests osd bug";
2525 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2526 // by partially applying transactions.
2527 msg
= "ENOSPC from key value store, misconfigured cluster";
2529 if (r
== -ENOTEMPTY
) {
2530 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
2533 dout(0) << " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
2534 << " (op " << pos
<< ", counting from 0)" << dendl
;
2535 dout(0) << msg
<< dendl
;
2536 dout(0) << " transaction dump:\n";
2537 JSONFormatter
f(true);
2538 f
.open_object_section("transaction");
2543 assert(0 == "unexpected error");
2551 // -----------------
2554 int KStore::_touch(TransContext
*txc
,
2558 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2561 _assign_nid(txc
, o
);
2562 txc
->write_onode(o
);
2563 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2567 void KStore::_dump_onode(OnodeRef o
)
2569 dout(30) << __func__
<< " " << o
2570 << " nid " << o
->onode
.nid
2571 << " size " << o
->onode
.size
2572 << " expected_object_size " << o
->onode
.expected_object_size
2573 << " expected_write_size " << o
->onode
.expected_write_size
2575 for (map
<string
,bufferptr
>::iterator p
= o
->onode
.attrs
.begin();
2576 p
!= o
->onode
.attrs
.end();
2578 dout(30) << __func__
<< " attr " << p
->first
2579 << " len " << p
->second
.length() << dendl
;
2583 void KStore::_do_read_stripe(OnodeRef o
, uint64_t offset
, bufferlist
*pbl
)
2585 map
<uint64_t,bufferlist
>::iterator p
= o
->pending_stripes
.find(offset
);
2586 if (p
== o
->pending_stripes
.end()) {
2588 get_data_key(o
->onode
.nid
, offset
, &key
);
2589 db
->get(PREFIX_DATA
, key
, pbl
);
2590 o
->pending_stripes
[offset
] = *pbl
;
2596 void KStore::_do_write_stripe(TransContext
*txc
, OnodeRef o
,
2597 uint64_t offset
, bufferlist
& bl
)
2599 o
->pending_stripes
[offset
] = bl
;
2601 get_data_key(o
->onode
.nid
, offset
, &key
);
2602 txc
->t
->set(PREFIX_DATA
, key
, bl
);
2605 void KStore::_do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2607 o
->pending_stripes
.erase(offset
);
2609 get_data_key(o
->onode
.nid
, offset
, &key
);
2610 txc
->t
->rmkey(PREFIX_DATA
, key
);
2613 int KStore::_do_write(TransContext
*txc
,
2615 uint64_t offset
, uint64_t length
,
2616 bufferlist
& orig_bl
,
2617 uint32_t fadvise_flags
)
2621 dout(20) << __func__
2622 << " " << o
->oid
<< " " << offset
<< "~" << length
2623 << " - have " << o
->onode
.size
2624 << " bytes, nid " << o
->onode
.nid
<< dendl
;
2632 uint64_t stripe_size
= o
->onode
.stripe_size
;
2634 o
->onode
.stripe_size
= cct
->_conf
->kstore_default_stripe_size
;
2635 stripe_size
= o
->onode
.stripe_size
;
2638 unsigned bl_off
= 0;
2639 while (length
> 0) {
2640 uint64_t offset_rem
= offset
% stripe_size
;
2641 uint64_t end_rem
= (offset
+ length
) % stripe_size
;
2642 if (offset_rem
== 0 && end_rem
== 0) {
2644 bl
.substr_of(orig_bl
, bl_off
, stripe_size
);
2645 dout(30) << __func__
<< " full stripe " << offset
<< dendl
;
2646 _do_write_stripe(txc
, o
, offset
, bl
);
2647 offset
+= stripe_size
;
2648 length
-= stripe_size
;
2649 bl_off
+= stripe_size
;
2652 uint64_t stripe_off
= offset
- offset_rem
;
2654 _do_read_stripe(o
, stripe_off
, &prev
);
2655 dout(20) << __func__
<< " read previous stripe " << stripe_off
2656 << ", got " << prev
.length() << dendl
;
2659 unsigned p
= MIN(prev
.length(), offset_rem
);
2661 dout(20) << __func__
<< " reuse leading " << p
<< " bytes" << dendl
;
2662 bl
.substr_of(prev
, 0, p
);
2664 if (p
< offset_rem
) {
2665 dout(20) << __func__
<< " add leading " << offset_rem
- p
<< " zeros" << dendl
;
2666 bl
.append_zero(offset_rem
- p
);
2669 unsigned use
= stripe_size
- offset_rem
;
2671 use
-= stripe_size
- end_rem
;
2672 dout(20) << __func__
<< " using " << use
<< " for this stripe" << dendl
;
2674 t
.substr_of(orig_bl
, bl_off
, use
);
2678 if (end_rem
< prev
.length()) {
2679 unsigned l
= prev
.length() - end_rem
;
2680 dout(20) << __func__
<< " reuse trailing " << l
<< " bytes" << dendl
;
2682 t
.substr_of(prev
, end_rem
, l
);
2686 dout(30) << " writing:\n";
2689 _do_write_stripe(txc
, o
, stripe_off
, bl
);
2694 if (offset
> o
->onode
.size
) {
2695 dout(20) << __func__
<< " extending size to " << offset
+ length
2697 o
->onode
.size
= offset
;
2703 int KStore::_write(TransContext
*txc
,
2706 uint64_t offset
, size_t length
,
2708 uint32_t fadvise_flags
)
2710 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2711 << " " << offset
<< "~" << length
2713 _assign_nid(txc
, o
);
2714 int r
= _do_write(txc
, o
, offset
, length
, bl
, fadvise_flags
);
2715 txc
->write_onode(o
);
2717 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2718 << " " << offset
<< "~" << length
2719 << " = " << r
<< dendl
;
2723 int KStore::_zero(TransContext
*txc
,
2726 uint64_t offset
, size_t length
)
2728 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2729 << " " << offset
<< "~" << length
2735 _assign_nid(txc
, o
);
2737 uint64_t stripe_size
= o
->onode
.stripe_size
;
2739 uint64_t end
= offset
+ length
;
2740 uint64_t pos
= offset
;
2741 uint64_t stripe_off
= pos
% stripe_size
;
2742 while (pos
< offset
+ length
) {
2743 if (stripe_off
|| end
- pos
< stripe_size
) {
2745 _do_read_stripe(o
, pos
- stripe_off
, &stripe
);
2746 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2747 << stripe
.length() << dendl
;
2749 bl
.substr_of(stripe
, 0, MIN(stripe
.length(), stripe_off
));
2750 if (end
>= pos
- stripe_off
+ stripe_size
||
2751 end
>= o
->onode
.size
) {
2752 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2753 << " to " << bl
.length() << dendl
;
2755 auto len
= end
- (pos
- stripe_off
+ bl
.length());
2756 bl
.append_zero(len
);
2757 dout(20) << __func__
<< " adding " << len
<< " of zeros" << dendl
;
2758 if (stripe
.length() > bl
.length()) {
2759 unsigned l
= stripe
.length() - bl
.length();
2761 t
.substr_of(stripe
, stripe
.length() - l
, l
);
2762 dout(20) << __func__
<< " keeping tail " << l
<< " of stripe" << dendl
;
2766 _do_write_stripe(txc
, o
, pos
- stripe_off
, bl
);
2767 pos
+= stripe_size
- stripe_off
;
2770 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2771 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2776 if (offset
+ length
> o
->onode
.size
) {
2777 o
->onode
.size
= offset
+ length
;
2778 dout(20) << __func__
<< " extending size to " << offset
+ length
2781 txc
->write_onode(o
);
2783 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2784 << " " << offset
<< "~" << length
2785 << " = " << r
<< dendl
;
2789 int KStore::_do_truncate(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2791 uint64_t stripe_size
= o
->onode
.stripe_size
;
2795 // trim down stripes
2797 uint64_t pos
= offset
;
2798 uint64_t stripe_off
= pos
% stripe_size
;
2799 while (pos
< o
->onode
.size
) {
2802 _do_read_stripe(o
, pos
- stripe_off
, &stripe
);
2803 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2804 << stripe
.length() << dendl
;
2806 t
.substr_of(stripe
, 0, MIN(stripe_off
, stripe
.length()));
2807 _do_write_stripe(txc
, o
, pos
- stripe_off
, t
);
2808 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2809 << " to " << t
.length() << dendl
;
2810 pos
+= stripe_size
- stripe_off
;
2813 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2814 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2819 // trim down cached tail
2820 if (o
->tail_bl
.length()) {
2821 if (offset
/ stripe_size
!= o
->onode
.size
/ stripe_size
) {
2822 dout(20) << __func__
<< " clear cached tail" << dendl
;
2828 o
->onode
.size
= offset
;
2829 dout(10) << __func__
<< " truncate size to " << offset
<< dendl
;
2831 txc
->write_onode(o
);
2835 int KStore::_truncate(TransContext
*txc
,
2840 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2843 int r
= _do_truncate(txc
, o
, offset
);
2844 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2846 << " = " << r
<< dendl
;
2850 int KStore::_do_remove(TransContext
*txc
,
2855 _do_truncate(txc
, o
, 0);
2858 if (o
->onode
.omap_head
) {
2859 _do_omap_clear(txc
, o
->onode
.omap_head
);
2862 o
->onode
= kstore_onode_t();
2863 txc
->onodes
.erase(o
);
2864 get_object_key(cct
, o
->oid
, &key
);
2865 txc
->t
->rmkey(PREFIX_OBJ
, key
);
2869 int KStore::_remove(TransContext
*txc
,
2873 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2874 int r
= _do_remove(txc
, o
);
2875 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2879 int KStore::_setattr(TransContext
*txc
,
2885 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2886 << " " << name
<< " (" << val
.length() << " bytes)"
2889 o
->onode
.attrs
[name
] = val
;
2890 txc
->write_onode(o
);
2891 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2892 << " " << name
<< " (" << val
.length() << " bytes)"
2893 << " = " << r
<< dendl
;
2897 int KStore::_setattrs(TransContext
*txc
,
2900 const map
<string
,bufferptr
>& aset
)
2902 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2903 << " " << aset
.size() << " keys"
2906 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin();
2907 p
!= aset
.end(); ++p
) {
2908 if (p
->second
.is_partial())
2909 o
->onode
.attrs
[p
->first
] = bufferptr(p
->second
.c_str(), p
->second
.length());
2911 o
->onode
.attrs
[p
->first
] = p
->second
;
2913 txc
->write_onode(o
);
2914 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2915 << " " << aset
.size() << " keys"
2916 << " = " << r
<< dendl
;
2921 int KStore::_rmattr(TransContext
*txc
,
2926 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2927 << " " << name
<< dendl
;
2929 o
->onode
.attrs
.erase(name
);
2930 txc
->write_onode(o
);
2931 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2932 << " " << name
<< " = " << r
<< dendl
;
2936 int KStore::_rmattrs(TransContext
*txc
,
2940 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2942 o
->onode
.attrs
.clear();
2943 txc
->write_onode(o
);
2944 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2948 void KStore::_do_omap_clear(TransContext
*txc
, uint64_t id
)
2950 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
2951 string prefix
, tail
;
2952 get_omap_header(id
, &prefix
);
2953 get_omap_tail(id
, &tail
);
2954 it
->lower_bound(prefix
);
2955 while (it
->valid()) {
2956 if (it
->key() >= tail
) {
2957 dout(30) << __func__
<< " stop at " << tail
<< dendl
;
2960 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
2961 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
2966 int KStore::_omap_clear(TransContext
*txc
,
2970 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2972 if (o
->onode
.omap_head
!= 0) {
2973 _do_omap_clear(txc
, o
->onode
.omap_head
);
2975 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
2979 int KStore::_omap_setkeys(TransContext
*txc
,
2984 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2986 bufferlist::iterator p
= bl
.begin();
2988 if (!o
->onode
.omap_head
) {
2989 o
->onode
.omap_head
= o
->onode
.nid
;
2990 txc
->write_onode(o
);
2999 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3000 dout(30) << __func__
<< " " << pretty_binary_string(final_key
)
3001 << " <- " << key
<< dendl
;
3002 txc
->t
->set(PREFIX_OMAP
, final_key
, value
);
3005 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3009 int KStore::_omap_setheader(TransContext
*txc
,
3014 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3017 if (!o
->onode
.omap_head
) {
3018 o
->onode
.omap_head
= o
->onode
.nid
;
3019 txc
->write_onode(o
);
3021 get_omap_header(o
->onode
.omap_head
, &key
);
3022 txc
->t
->set(PREFIX_OMAP
, key
, bl
);
3024 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3028 int KStore::_omap_rmkeys(TransContext
*txc
,
3033 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3035 bufferlist::iterator p
= bl
.begin();
3038 if (!o
->onode
.omap_head
) {
3047 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3048 dout(30) << __func__
<< " rm " << pretty_binary_string(final_key
)
3049 << " <- " << key
<< dendl
;
3050 txc
->t
->rmkey(PREFIX_OMAP
, final_key
);
3055 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3059 int KStore::_omap_rmkey_range(TransContext
*txc
,
3062 const string
& first
, const string
& last
)
3064 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3065 KeyValueDB::Iterator it
;
3066 string key_first
, key_last
;
3069 if (!o
->onode
.omap_head
) {
3072 it
= db
->get_iterator(PREFIX_OMAP
);
3073 get_omap_key(o
->onode
.omap_head
, first
, &key_first
);
3074 get_omap_key(o
->onode
.omap_head
, last
, &key_last
);
3075 it
->lower_bound(key_first
);
3076 while (it
->valid()) {
3077 if (it
->key() >= key_last
) {
3078 dout(30) << __func__
<< " stop at " << pretty_binary_string(key_last
)
3082 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
3083 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
3089 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3093 int KStore::_setallochint(TransContext
*txc
,
3096 uint64_t expected_object_size
,
3097 uint64_t expected_write_size
,
3100 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
3101 << " object_size " << expected_object_size
3102 << " write_size " << expected_write_size
3103 << " flags " << flags
3106 o
->onode
.expected_object_size
= expected_object_size
;
3107 o
->onode
.expected_write_size
= expected_write_size
;
3108 o
->onode
.alloc_hint_flags
= flags
;
3110 txc
->write_onode(o
);
3111 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
3112 << " object_size " << expected_object_size
3113 << " write_size " << expected_write_size
3114 << " = " << r
<< dendl
;
3118 int KStore::_clone(TransContext
*txc
,
3123 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3124 << newo
->oid
<< dendl
;
3126 if (oldo
->oid
.hobj
.get_hash() != newo
->oid
.hobj
.get_hash()) {
3127 derr
<< __func__
<< " mismatched hash on " << oldo
->oid
3128 << " and " << newo
->oid
<< dendl
;
3133 newo
->exists
= true;
3134 _assign_nid(txc
, newo
);
3139 r
= _do_read(oldo
, 0, oldo
->onode
.size
, bl
, 0);
3143 // truncate any old data
3144 r
= _do_truncate(txc
, newo
, 0);
3148 r
= _do_write(txc
, newo
, 0, oldo
->onode
.size
, bl
, 0);
3152 newo
->onode
.attrs
= oldo
->onode
.attrs
;
3155 if (newo
->onode
.omap_head
) {
3156 dout(20) << __func__
<< " clearing old omap data" << dendl
;
3157 _do_omap_clear(txc
, newo
->onode
.omap_head
);
3159 if (oldo
->onode
.omap_head
) {
3160 dout(20) << __func__
<< " copying omap data" << dendl
;
3161 if (!newo
->onode
.omap_head
) {
3162 newo
->onode
.omap_head
= newo
->onode
.nid
;
3164 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
3166 get_omap_header(oldo
->onode
.omap_head
, &head
);
3167 get_omap_tail(oldo
->onode
.omap_head
, &tail
);
3168 it
->lower_bound(head
);
3169 while (it
->valid()) {
3171 if (it
->key() >= tail
) {
3172 dout(30) << __func__
<< " reached tail" << dendl
;
3175 dout(30) << __func__
<< " got header/data "
3176 << pretty_binary_string(it
->key()) << dendl
;
3177 assert(it
->key() < tail
);
3178 rewrite_omap_key(newo
->onode
.omap_head
, it
->key(), &key
);
3179 txc
->t
->set(PREFIX_OMAP
, key
, it
->value());
3185 txc
->write_onode(newo
);
3189 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3190 << newo
->oid
<< " = " << r
<< dendl
;
3194 int KStore::_clone_range(TransContext
*txc
,
3198 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
)
3200 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3201 << newo
->oid
<< " from " << srcoff
<< "~" << length
3202 << " to offset " << dstoff
<< dendl
;
3206 newo
->exists
= true;
3207 _assign_nid(txc
, newo
);
3209 r
= _do_read(oldo
, srcoff
, length
, bl
, 0);
3213 r
= _do_write(txc
, newo
, dstoff
, bl
.length(), bl
, 0);
3217 txc
->write_onode(newo
);
3222 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3223 << newo
->oid
<< " from " << srcoff
<< "~" << length
3224 << " to offset " << dstoff
3225 << " = " << r
<< dendl
;
3229 int KStore::_rename(TransContext
*txc
,
3233 const ghobject_t
& new_oid
)
3235 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3236 << new_oid
<< dendl
;
3238 ghobject_t old_oid
= oldo
->oid
;
3240 string old_key
, new_key
;
3242 if (newo
&& newo
->exists
) {
3243 // destination object already exists, remove it first
3244 r
= _do_remove(txc
, newo
);
3249 txc
->t
->rmkey(PREFIX_OBJ
, oldo
->key
);
3250 txc
->write_onode(oldo
);
3251 c
->onode_map
.rename(old_oid
, new_oid
); // this adjusts oldo->{oid,key}
3255 dout(10) << __func__
<< " " << c
->cid
<< " " << old_oid
<< " -> "
3256 << new_oid
<< " = " << r
<< dendl
;
3262 int KStore::_create_collection(
3268 dout(15) << __func__
<< " " << cid
<< " bits " << bits
<< dendl
;
3273 RWLock::WLocker
l(coll_lock
);
3278 c
->reset(new Collection(this, cid
));
3279 (*c
)->cnode
.bits
= bits
;
3282 ::encode((*c
)->cnode
, bl
);
3283 txc
->t
->set(PREFIX_COLL
, stringify(cid
), bl
);
3287 dout(10) << __func__
<< " " << cid
<< " bits " << bits
<< " = " << r
<< dendl
;
3291 int KStore::_remove_collection(TransContext
*txc
, coll_t cid
,
3294 dout(15) << __func__
<< " " << cid
<< dendl
;
3298 RWLock::WLocker
l(coll_lock
);
3303 size_t nonexistent_count
= 0;
3304 pair
<ghobject_t
,OnodeRef
> next_onode
;
3305 while ((*c
)->onode_map
.get_next(next_onode
.first
, &next_onode
)) {
3306 if (next_onode
.second
->exists
) {
3310 ++nonexistent_count
;
3312 vector
<ghobject_t
> ls
;
3314 // Enumerate onodes in db, up to nonexistent_count + 1
3315 // then check if all of them are marked as non-existent.
3316 // Bypass the check if returned number is greater than nonexistent_count
3317 r
= _collection_list(c
->get(), ghobject_t(), ghobject_t::get_max(),
3318 nonexistent_count
+ 1, &ls
, &next
);
3320 bool exists
= false; //ls.size() > nonexistent_count;
3321 for (auto it
= ls
.begin(); !exists
&& it
< ls
.end(); ++it
) {
3322 dout(10) << __func__
<< " oid " << *it
<< dendl
;
3323 auto onode
= (*c
)->onode_map
.lookup(*it
);
3324 exists
= !onode
|| onode
->exists
;
3326 dout(10) << __func__
<< " " << *it
3327 << " exists in db" << dendl
;
3331 coll_map
.erase(cid
);
3332 txc
->removed_collections
.push_back(*c
);
3334 txc
->t
->rmkey(PREFIX_COLL
, stringify(cid
));
3337 dout(10) << __func__
<< " " << cid
3338 << " is non-empty" << dendl
;
3345 dout(10) << __func__
<< " " << cid
<< " = " << r
<< dendl
;
3349 int KStore::_split_collection(TransContext
*txc
,
3352 unsigned bits
, int rem
)
3354 dout(15) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3355 << " bits " << bits
<< dendl
;
3357 RWLock::WLocker
l(c
->lock
);
3358 RWLock::WLocker
l2(d
->lock
);
3359 c
->onode_map
.clear();
3360 d
->onode_map
.clear();
3361 c
->cnode
.bits
= bits
;
3362 assert(d
->cnode
.bits
== bits
);
3366 ::encode(c
->cnode
, bl
);
3367 txc
->t
->set(PREFIX_COLL
, stringify(c
->cid
), bl
);
3369 dout(10) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3370 << " bits " << bits
<< " = " << r
<< dendl
;
3374 // ===========================================