1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 * Ceph - scalable distributed file system
6 * Copyright (C) 2014 Red Hat
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
17 #include <sys/types.h>
23 #include "osd/osd_types.h"
25 #include "include/compat.h"
26 #include "include/stringify.h"
27 #include "common/errno.h"
28 #include "common/safe_io.h"
29 #include "common/Formatter.h"
32 #define dout_context cct
33 #define dout_subsys ceph_subsys_kstore
39 * superblock, features
40 * refcounted extents (for efficient clone)
// Single-character prefixes handed to the key/value database to
// separate the different kinds of records (for example
// db->get(PREFIX_OBJ, ...) and db->get_iterator(PREFIX_COLL)).
const std::string PREFIX_SUPER("S");  // field -> value
const std::string PREFIX_COLL("C");   // collection name -> (nothing)
const std::string PREFIX_OBJ("O");    // object name -> onode
const std::string PREFIX_DATA("D");   // nid + offset -> data
const std::string PREFIX_OMAP("M");   // u64 + keyname -> value
51 * object name key structure
53 * 2 chars: shard (-- for none, or hex digit, so that we sort properly)
54 * encoded u64: poolid + 2^63 (so that it sorts properly)
55 * encoded u32: hash (bit reversed)
59 * escaped string: namespace
61 * 1 char: '<', '=', or '>'. if =, then object key == object name, and
62 * we are followed just by the key. otherwise, we are followed by
63 * the key and then the object name.
65 * escaped string: object name (unless '=' above)
68 * encoded u64: generation
72 * string encoding in the key
74 * The key string needs to lexicographically sort the same way that
75 * ghobject_t does. We do this by escaping anything <= to '#' with #
76 * plus a 2 digit hex string, and anything >= '~' with ~ plus the two
79 * We use ! as a terminator for strings; this works because it is < #
80 * and will get escaped if it is present in the string.
84 static void append_escaped(const string
&in
, string
*out
)
87 for (string::const_iterator i
= in
.begin(); i
!= in
.end(); ++i
) {
89 snprintf(hexbyte
, sizeof(hexbyte
), "#%02x", (uint8_t)*i
);
91 } else if (*i
>= '~') {
92 snprintf(hexbyte
, sizeof(hexbyte
), "~%02x", (uint8_t)*i
);
101 static int decode_escaped(const char *p
, string
*out
)
103 const char *orig_p
= p
;
104 while (*p
&& *p
!= '!') {
105 if (*p
== '#' || *p
== '~') {
107 int r
= sscanf(++p
, "%2x", &hex
);
110 out
->push_back((char)hex
);
113 out
->push_back(*p
++);
119 // some things we encode in binary (as le32 or le64); print the
120 // resulting key strings nicely
121 static string
pretty_binary_string(const string
& in
)
125 out
.reserve(in
.length() * 3);
126 enum { NONE
, HEX
, STRING
} mode
= NONE
;
127 unsigned from
= 0, i
;
128 for (i
=0; i
< in
.length(); ++i
) {
129 if ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
130 (mode
== HEX
&& in
.length() - i
>= 4 &&
131 ((in
[i
] < 32 || (unsigned char)in
[i
] > 126) ||
132 (in
[i
+1] < 32 || (unsigned char)in
[i
+1] > 126) ||
133 (in
[i
+2] < 32 || (unsigned char)in
[i
+2] > 126) ||
134 (in
[i
+3] < 32 || (unsigned char)in
[i
+3] > 126)))) {
135 if (mode
== STRING
) {
136 out
.append(in
.substr(from
, i
- from
));
143 if (in
.length() - i
>= 4) {
144 // print a whole u32 at once
145 snprintf(buf
, sizeof(buf
), "%08x",
146 (uint32_t)(((unsigned char)in
[i
] << 24) |
147 ((unsigned char)in
[i
+1] << 16) |
148 ((unsigned char)in
[i
+2] << 8) |
149 ((unsigned char)in
[i
+3] << 0)));
152 snprintf(buf
, sizeof(buf
), "%02x", (int)(unsigned char)in
[i
]);
156 if (mode
!= STRING
) {
163 if (mode
== STRING
) {
164 out
.append(in
.substr(from
, i
- from
));
170 static void _key_encode_shard(shard_id_t shard
, string
*key
)
172 // make field ordering match with ghobject_t compare operations
173 if (shard
== shard_id_t::NO_SHARD
) {
174 // otherwise ff will sort *after* 0, not before.
178 snprintf(buf
, sizeof(buf
), "%02x", (int)shard
);
182 static const char *_key_decode_shard(const char *key
, shard_id_t
*pshard
)
185 *pshard
= shard_id_t::NO_SHARD
;
188 int r
= sscanf(key
, "%x", &shard
);
191 *pshard
= shard_id_t(shard
);
196 static void get_coll_key_range(const coll_t
& cid
, int bits
,
197 string
*temp_start
, string
*temp_end
,
198 string
*start
, string
*end
)
206 if (cid
.is_pg(&pgid
)) {
207 _key_encode_shard(pgid
.shard
, start
);
209 *temp_start
= *start
;
212 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, start
);
213 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_start
);
214 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), start
);
215 _key_encode_u32(hobject_t::_reverse_bits(pgid
.ps()), temp_start
);
217 temp_start
->append(".");
219 _key_encode_u64(pgid
.pool() + 0x8000000000000000ull
, end
);
220 _key_encode_u64((-2ll - pgid
.pool()) + 0x8000000000000000ull
, temp_end
);
223 hobject_t::_reverse_bits(pgid
.ps()) + (1ull << (32-bits
));
224 if (end_hash
<= 0xffffffffull
) {
225 _key_encode_u32(end_hash
, end
);
226 _key_encode_u32(end_hash
, temp_end
);
228 temp_end
->append(".");
230 _key_encode_u32(0xffffffff, end
);
231 _key_encode_u32(0xffffffff, temp_end
);
233 temp_end
->append(":");
236 _key_encode_shard(shard_id_t::NO_SHARD
, start
);
237 _key_encode_u64(-1ull + 0x8000000000000000ull
, start
);
239 _key_encode_u32(0, start
);
241 _key_encode_u32(0xffffffff, end
);
244 // no separate temp section
250 static int get_key_object(const string
& key
, ghobject_t
*oid
);
252 static void get_object_key(CephContext
* cct
, const ghobject_t
& oid
,
257 _key_encode_shard(oid
.shard_id
, key
);
258 _key_encode_u64(oid
.hobj
.pool
+ 0x8000000000000000ull
, key
);
259 _key_encode_u32(oid
.hobj
.get_bitwise_key_u32(), key
);
262 append_escaped(oid
.hobj
.nspace
, key
);
264 if (oid
.hobj
.get_key().length()) {
265 // is a key... could be < = or >.
266 // (ASCII chars < = and > sort in that order, yay)
267 if (oid
.hobj
.get_key() < oid
.hobj
.oid
.name
) {
269 append_escaped(oid
.hobj
.get_key(), key
);
270 append_escaped(oid
.hobj
.oid
.name
, key
);
271 } else if (oid
.hobj
.get_key() > oid
.hobj
.oid
.name
) {
273 append_escaped(oid
.hobj
.get_key(), key
);
274 append_escaped(oid
.hobj
.oid
.name
, key
);
278 append_escaped(oid
.hobj
.oid
.name
, key
);
283 append_escaped(oid
.hobj
.oid
.name
, key
);
286 _key_encode_u64(oid
.hobj
.snap
, key
);
287 _key_encode_u64(oid
.generation
, key
);
292 int r
= get_key_object(*key
, &t
);
294 derr
<< " r " << r
<< dendl
;
295 derr
<< "key " << pretty_binary_string(*key
) << dendl
;
296 derr
<< "oid " << oid
<< dendl
;
297 derr
<< " t " << t
<< dendl
;
303 static int get_key_object(const string
& key
, ghobject_t
*oid
)
306 const char *p
= key
.c_str();
308 p
= _key_decode_shard(p
, &oid
->shard_id
);
311 p
= _key_decode_u64(p
, &pool
);
312 oid
->hobj
.pool
= pool
- 0x8000000000000000ull
;
315 p
= _key_decode_u32(p
, &hash
);
316 oid
->hobj
.set_bitwise_key_u32(hash
);
321 r
= decode_escaped(p
, &oid
->hobj
.nspace
);
329 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
333 } else if (*p
== '<' || *p
== '>') {
337 r
= decode_escaped(p
, &okey
);
341 r
= decode_escaped(p
, &oid
->hobj
.oid
.name
);
345 oid
->hobj
.set_key(okey
);
351 p
= _key_decode_u64(p
, &oid
->hobj
.snap
.val
);
352 p
= _key_decode_u64(p
, &oid
->generation
);
354 // if we get something other than a null terminator here,
355 // something goes wrong.
363 static void get_data_key(uint64_t nid
, uint64_t offset
, string
*out
)
365 _key_encode_u64(nid
, out
);
366 _key_encode_u64(offset
, out
);
370 static void get_omap_header(uint64_t id
, string
*out
)
372 _key_encode_u64(id
, out
);
376 // hmm, I don't think there's any need to escape the user key since we
377 // have a clean prefix.
378 static void get_omap_key(uint64_t id
, const string
& key
, string
*out
)
380 _key_encode_u64(id
, out
);
385 static void rewrite_omap_key(uint64_t id
, string old
, string
*out
)
387 _key_encode_u64(id
, out
);
388 out
->append(old
.substr(out
->length()));
// Recover the user-visible part of an omap db key by dropping the
// first sizeof(uint64_t) + 1 bytes (presumably the encoded id and a
// one-byte separator -- confirm against get_omap_key).
//
// Throws std::out_of_range if `key` is shorter than that prefix.
static void decode_omap_key(const std::string& key, std::string *user_key)
{
  const std::string::size_type prefix_len = sizeof(uint64_t) + 1;
  std::string suffix = key.substr(prefix_len);
  user_key->swap(suffix);
}
396 static void get_omap_tail(uint64_t id
, string
*out
)
398 _key_encode_u64(id
, out
);
407 #define dout_prefix *_dout << "kstore.onode(" << this << ") "
409 void KStore::Onode::flush()
411 std::unique_lock
<std::mutex
> l(flush_lock
);
412 dout(20) << __func__
<< " " << flush_txns
<< dendl
;
413 while (!flush_txns
.empty())
415 dout(20) << __func__
<< " done" << dendl
;
421 #define dout_prefix *_dout << "kstore.lru(" << this << ") "
423 void KStore::OnodeHashLRU::_touch(OnodeRef o
)
425 lru_list_t::iterator p
= lru
.iterator_to(*o
);
430 void KStore::OnodeHashLRU::add(const ghobject_t
& oid
, OnodeRef o
)
432 std::lock_guard
<std::mutex
> l(lock
);
433 dout(30) << __func__
<< " " << oid
<< " " << o
<< dendl
;
434 assert(onode_map
.count(oid
) == 0);
439 KStore::OnodeRef
KStore::OnodeHashLRU::lookup(const ghobject_t
& oid
)
441 std::lock_guard
<std::mutex
> l(lock
);
442 dout(30) << __func__
<< dendl
;
443 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(oid
);
444 if (p
== onode_map
.end()) {
445 dout(30) << __func__
<< " " << oid
<< " miss" << dendl
;
448 dout(30) << __func__
<< " " << oid
<< " hit " << p
->second
<< dendl
;
453 void KStore::OnodeHashLRU::clear()
455 std::lock_guard
<std::mutex
> l(lock
);
456 dout(10) << __func__
<< dendl
;
461 void KStore::OnodeHashLRU::rename(const ghobject_t
& old_oid
,
462 const ghobject_t
& new_oid
)
464 std::lock_guard
<std::mutex
> l(lock
);
465 dout(30) << __func__
<< " " << old_oid
<< " -> " << new_oid
<< dendl
;
466 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator po
, pn
;
467 po
= onode_map
.find(old_oid
);
468 pn
= onode_map
.find(new_oid
);
470 assert(po
!= onode_map
.end());
471 if (pn
!= onode_map
.end()) {
472 lru_list_t::iterator p
= lru
.iterator_to(*pn
->second
);
476 OnodeRef o
= po
->second
;
478 // install a non-existent onode in its place
479 po
->second
.reset(new Onode(cct
, old_oid
, o
->key
));
480 lru
.push_back(*po
->second
);
483 onode_map
.insert(make_pair(new_oid
, o
));
486 get_object_key(cct
, new_oid
, &o
->key
);
489 bool KStore::OnodeHashLRU::get_next(
490 const ghobject_t
& after
,
491 pair
<ghobject_t
,OnodeRef
> *next
)
493 std::lock_guard
<std::mutex
> l(lock
);
494 dout(20) << __func__
<< " after " << after
<< dendl
;
496 if (after
== ghobject_t()) {
500 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.begin();
501 assert(p
!= onode_map
.end());
502 next
->first
= p
->first
;
503 next
->second
= p
->second
;
507 ceph::unordered_map
<ghobject_t
,OnodeRef
>::iterator p
= onode_map
.find(after
);
508 assert(p
!= onode_map
.end()); // for now
509 lru_list_t::iterator pi
= lru
.iterator_to(*p
->second
);
511 if (pi
== lru
.end()) {
514 next
->first
= pi
->oid
;
515 next
->second
= onode_map
[pi
->oid
];
519 int KStore::OnodeHashLRU::trim(int max
)
521 std::lock_guard
<std::mutex
> l(lock
);
522 dout(20) << __func__
<< " max " << max
523 << " size " << onode_map
.size() << dendl
;
525 int num
= onode_map
.size() - max
;
526 if (onode_map
.size() == 0 || num
<= 0)
527 return 0; // don't even try
529 lru_list_t::iterator p
= lru
.end();
534 int refs
= o
->nref
.load();
536 dout(20) << __func__
<< " " << o
->oid
<< " has " << refs
537 << " refs; stopping with " << num
<< " left to trim" << dendl
;
540 dout(30) << __func__
<< " trim " << o
->oid
<< dendl
;
541 if (p
!= lru
.begin()) {
547 o
->get(); // paranoia
548 onode_map
.erase(o
->oid
);
556 // =======================================================
561 #define dout_prefix *_dout << "kstore(" << store->path << ").collection(" << cid << ") "
563 KStore::Collection::Collection(KStore
*ns
, coll_t c
)
566 lock("KStore::Collection::lock", true, false),
567 onode_map(store
->cct
)
571 KStore::OnodeRef
KStore::Collection::get_onode(
572 const ghobject_t
& oid
,
575 assert(create
? lock
.is_wlocked() : lock
.is_locked());
578 if (cid
.is_pg(&pgid
)) {
579 if (!oid
.match(cnode
.bits
, pgid
.ps())) {
580 lderr(store
->cct
) << __func__
<< " oid " << oid
<< " not part of "
581 << pgid
<< " bits " << cnode
.bits
<< dendl
;
586 OnodeRef o
= onode_map
.lookup(oid
);
591 get_object_key(store
->cct
, oid
, &key
);
593 ldout(store
->cct
, 20) << __func__
<< " oid " << oid
<< " key "
594 << pretty_binary_string(key
) << dendl
;
597 int r
= store
->db
->get(PREFIX_OBJ
, key
, &v
);
598 ldout(store
->cct
, 20) << " r " << r
<< " v.len " << v
.length() << dendl
;
600 if (v
.length() == 0) {
601 assert(r
== -ENOENT
);
606 on
= new Onode(store
->cct
, oid
, key
);
611 on
= new Onode(store
->cct
, oid
, key
);
613 bufferlist::iterator p
= v
.begin();
614 ::decode(on
->onode
, p
);
617 onode_map
.add(oid
, o
);
623 // =======================================================
626 #define dout_prefix *_dout << "kstore(" << path << ") "
628 KStore::KStore(CephContext
*cct
, const string
& path
)
629 : ObjectStore(cct
, path
),
634 coll_lock("KStore::coll_lock"),
637 throttle_ops(cct
, "kstore_max_ops", cct
->_conf
->kstore_max_ops
),
638 throttle_bytes(cct
, "kstore_max_bytes", cct
->_conf
->kstore_max_bytes
),
640 kv_sync_thread(this),
655 void KStore::_init_logger()
658 PerfCountersBuilder
b(cct
, "KStore",
659 l_kstore_first
, l_kstore_last
);
660 b
.add_time_avg(l_kstore_state_prepare_lat
, "state_prepare_lat", "Average prepare state latency");
661 b
.add_time_avg(l_kstore_state_kv_queued_lat
, "state_kv_queued_lat", "Average kv_queued state latency");
662 b
.add_time_avg(l_kstore_state_kv_done_lat
, "state_kv_done_lat", "Average kv_done state latency");
663 b
.add_time_avg(l_kstore_state_finishing_lat
, "state_finishing_lat", "Average finishing state latency");
664 b
.add_time_avg(l_kstore_state_done_lat
, "state_done_lat", "Average done state latency");
665 logger
= b
.create_perf_counters();
666 cct
->get_perfcounters_collection()->add(logger
);
669 void KStore::_shutdown_logger()
672 cct
->get_perfcounters_collection()->remove(logger
);
676 int KStore::_open_path()
679 path_fd
= ::open(path
.c_str(), O_DIRECTORY
);
682 derr
<< __func__
<< " unable to open " << path
<< ": " << cpp_strerror(r
)
689 void KStore::_close_path()
691 VOID_TEMP_FAILURE_RETRY(::close(path_fd
));
695 int KStore::_open_fsid(bool create
)
701 fsid_fd
= ::openat(path_fd
, "fsid", flags
, 0644);
704 derr
<< __func__
<< " " << cpp_strerror(err
) << dendl
;
710 int KStore::_read_fsid(uuid_d
*uuid
)
713 memset(fsid_str
, 0, sizeof(fsid_str
));
714 int ret
= safe_read(fsid_fd
, fsid_str
, sizeof(fsid_str
));
716 derr
<< __func__
<< " failed: " << cpp_strerror(ret
) << dendl
;
723 if (!uuid
->parse(fsid_str
)) {
724 derr
<< __func__
<< " unparsable uuid " << fsid_str
<< dendl
;
730 int KStore::_write_fsid()
732 int r
= ::ftruncate(fsid_fd
, 0);
735 derr
<< __func__
<< " fsid truncate failed: " << cpp_strerror(r
) << dendl
;
738 string str
= stringify(fsid
) + "\n";
739 r
= safe_write(fsid_fd
, str
.c_str(), str
.length());
741 derr
<< __func__
<< " fsid write failed: " << cpp_strerror(r
) << dendl
;
744 r
= ::fsync(fsid_fd
);
747 derr
<< __func__
<< " fsid fsync failed: " << cpp_strerror(r
) << dendl
;
753 void KStore::_close_fsid()
755 VOID_TEMP_FAILURE_RETRY(::close(fsid_fd
));
759 int KStore::_lock_fsid()
762 memset(&l
, 0, sizeof(l
));
764 l
.l_whence
= SEEK_SET
;
767 int r
= ::fcntl(fsid_fd
, F_SETLK
, &l
);
770 derr
<< __func__
<< " failed to lock " << path
<< "/fsid"
771 << " (is another ceph-osd still running?)"
772 << cpp_strerror(err
) << dendl
;
778 bool KStore::test_mount_in_use()
780 // most error conditions mean the mount is not in use (e.g., because
781 // it doesn't exist). only if we fail to lock do we conclude it is
784 int r
= _open_path();
787 r
= _open_fsid(false);
792 ret
= true; // if we can't lock, it is in use
799 int KStore::_open_db(bool create
)
804 snprintf(fn
, sizeof(fn
), "%s/db", path
.c_str());
808 kv_backend
= cct
->_conf
->kstore_backend
;
810 r
= read_meta("kv_backend", &kv_backend
);
812 derr
<< __func__
<< " uanble to read 'kv_backend' meta" << dendl
;
816 dout(10) << __func__
<< " kv_backend = " << kv_backend
<< dendl
;
819 int r
= ::mkdir(fn
, 0755);
822 if (r
< 0 && r
!= -EEXIST
) {
823 derr
<< __func__
<< " failed to create " << fn
<< ": " << cpp_strerror(r
)
829 char walfn
[PATH_MAX
];
830 snprintf(walfn
, sizeof(walfn
), "%s/db.wal", path
.c_str());
831 r
= ::mkdir(walfn
, 0755);
834 if (r
< 0 && r
!= -EEXIST
) {
835 derr
<< __func__
<< " failed to create " << walfn
836 << ": " << cpp_strerror(r
)
842 db
= KeyValueDB::create(cct
, kv_backend
, fn
);
844 derr
<< __func__
<< " error creating db" << dendl
;
848 if (kv_backend
== "rocksdb")
849 options
= cct
->_conf
->kstore_rocksdb_options
;
853 r
= db
->create_and_open(err
);
857 derr
<< __func__
<< " erroring opening db: " << err
.str() << dendl
;
862 dout(1) << __func__
<< " opened " << kv_backend
863 << " path " << fn
<< " options " << options
<< dendl
;
867 void KStore::_close_db()
874 int KStore::_open_collections(int *errors
)
876 assert(coll_map
.empty());
877 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_COLL
);
878 for (it
->upper_bound(string());
882 if (cid
.parse(it
->key())) {
883 CollectionRef
c(new Collection(this, cid
));
884 bufferlist bl
= it
->value();
885 bufferlist::iterator p
= bl
.begin();
887 ::decode(c
->cnode
, p
);
888 } catch (buffer::error
& e
) {
889 derr
<< __func__
<< " failed to decode cnode, key:"
890 << pretty_binary_string(it
->key()) << dendl
;
893 dout(20) << __func__
<< " opened " << cid
<< dendl
;
896 derr
<< __func__
<< " unrecognized collection " << it
->key() << dendl
;
906 dout(1) << __func__
<< " path " << path
<< dendl
;
914 r
= _open_fsid(true);
922 r
= _read_fsid(&old_fsid
);
923 if (r
< 0 || old_fsid
.is_zero()) {
924 if (fsid
.is_zero()) {
925 fsid
.generate_random();
926 dout(1) << __func__
<< " generated fsid " << fsid
<< dendl
;
928 dout(1) << __func__
<< " using provided fsid " << fsid
<< dendl
;
930 // we'll write it last.
932 if (!fsid
.is_zero() && fsid
!= old_fsid
) {
933 derr
<< __func__
<< " on-disk fsid " << old_fsid
934 << " != provided " << fsid
<< dendl
;
939 dout(1) << __func__
<< " already created, fsid is " << fsid
<< dendl
;
947 r
= write_meta("kv_backend", cct
->_conf
->kstore_backend
);
951 r
= write_meta("type", "kstore");
955 // indicate mkfs completion/success by writing the fsid file
958 dout(10) << __func__
<< " success" << dendl
;
960 derr
<< __func__
<< " error writing fsid: " << cpp_strerror(r
) << dendl
;
973 dout(1) << __func__
<< " path " << path
<< dendl
;
975 if (cct
->_conf
->kstore_fsck_on_mount
) {
976 int rc
= fsck(cct
->_conf
->kstore_fsck_on_mount_deep
);
981 int r
= _open_path();
984 r
= _open_fsid(false);
988 r
= _read_fsid(&fsid
);
1000 r
= _open_super_meta();
1004 r
= _open_collections();
1009 kv_sync_thread
.create("kstore_kv_sync");
1023 int KStore::umount()
1026 dout(1) << __func__
<< dendl
;
1029 _reap_collections();
1032 dout(20) << __func__
<< " stopping kv thread" << dendl
;
1034 dout(20) << __func__
<< " draining finisher" << dendl
;
1035 finisher
.wait_for_empty();
1036 dout(20) << __func__
<< " stopping finisher" << dendl
;
1038 dout(20) << __func__
<< " closing" << dendl
;
1047 int KStore::fsck(bool deep
)
1049 dout(1) << __func__
<< dendl
;
1051 dout(1) << __func__
<< " finish with " << errors
<< " errors" << dendl
;
1055 void KStore::_sync()
1057 dout(10) << __func__
<< dendl
;
1059 std::unique_lock
<std::mutex
> l(kv_lock
);
1060 while (!kv_committing
.empty() ||
1061 !kv_queue
.empty()) {
1062 dout(20) << " waiting for kv to commit" << dendl
;
1063 kv_sync_cond
.wait(l
);
1066 dout(10) << __func__
<< " done" << dendl
;
1069 int KStore::statfs(struct store_statfs_t
* buf
)
1071 return db
->get_statfs(buf
);
1077 KStore::CollectionRef
KStore::_get_collection(coll_t cid
)
1079 RWLock::RLocker
l(coll_lock
);
1080 ceph::unordered_map
<coll_t
,CollectionRef
>::iterator cp
= coll_map
.find(cid
);
1081 if (cp
== coll_map
.end())
1082 return CollectionRef();
1086 void KStore::_queue_reap_collection(CollectionRef
& c
)
1088 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1089 std::lock_guard
<std::mutex
> l(reap_lock
);
1090 removed_collections
.push_back(c
);
1093 void KStore::_reap_collections()
1095 list
<CollectionRef
> removed_colls
;
1096 std::lock_guard
<std::mutex
> l(reap_lock
);
1097 removed_colls
.swap(removed_collections
);
1099 for (list
<CollectionRef
>::iterator p
= removed_colls
.begin();
1100 p
!= removed_colls
.end();
1102 CollectionRef c
= *p
;
1103 dout(10) << __func__
<< " " << c
->cid
<< dendl
;
1105 pair
<ghobject_t
,OnodeRef
> next
;
1106 while (c
->onode_map
.get_next(next
.first
, &next
)) {
1107 assert(!next
.second
->exists
);
1108 if (!next
.second
->flush_txns
.empty()) {
1109 dout(10) << __func__
<< " " << c
->cid
<< " " << next
.second
->oid
1110 << " flush_txns " << next
.second
->flush_txns
<< dendl
;
1115 c
->onode_map
.clear();
1116 dout(10) << __func__
<< " " << c
->cid
<< " done" << dendl
;
1119 dout(10) << __func__
<< " all reaped" << dendl
;
1125 bool KStore::exists(const coll_t
& cid
, const ghobject_t
& oid
)
1127 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1128 CollectionRef c
= _get_collection(cid
);
1131 RWLock::RLocker
l(c
->lock
);
1132 OnodeRef o
= c
->get_onode(oid
, false);
1133 if (!o
|| !o
->exists
)
1140 const ghobject_t
& oid
,
1144 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1145 CollectionRef c
= _get_collection(cid
);
1148 RWLock::RLocker
l(c
->lock
);
1149 OnodeRef o
= c
->get_onode(oid
, false);
1150 if (!o
|| !o
->exists
)
1152 st
->st_size
= o
->onode
.size
;
1153 st
->st_blksize
= 4096;
1154 st
->st_blocks
= (st
->st_size
+ st
->st_blksize
- 1) / st
->st_blksize
;
1159 int KStore::set_collection_opts(
1161 const pool_opts_t
& opts
)
1168 const ghobject_t
& oid
,
1175 dout(15) << __func__
<< " " << cid
<< " " << oid
1176 << " " << offset
<< "~" << length
1179 CollectionRef c
= _get_collection(cid
);
1182 RWLock::RLocker
l(c
->lock
);
1186 OnodeRef o
= c
->get_onode(oid
, false);
1187 if (!o
|| !o
->exists
) {
1192 if (offset
== length
&& offset
== 0)
1193 length
= o
->onode
.size
;
1195 r
= _do_read(o
, offset
, length
, bl
, op_flags
);
1198 dout(10) << __func__
<< " " << cid
<< " " << oid
1199 << " " << offset
<< "~" << length
1200 << " = " << r
<< dendl
;
1204 int KStore::_do_read(
1212 uint64_t stripe_size
= o
->onode
.stripe_size
;
1213 uint64_t stripe_off
;
1215 dout(20) << __func__
<< " " << offset
<< "~" << length
<< " size "
1216 << o
->onode
.size
<< " nid " << o
->onode
.nid
<< dendl
;
1219 if (offset
> o
->onode
.size
) {
1222 if (offset
+ length
> o
->onode
.size
) {
1223 length
= o
->onode
.size
- offset
;
1225 if (stripe_size
== 0) {
1226 bl
.append_zero(length
);
1233 stripe_off
= offset
% stripe_size
;
1234 while (length
> 0) {
1236 _do_read_stripe(o
, offset
- stripe_off
, &stripe
);
1237 dout(30) << __func__
<< " stripe " << offset
- stripe_off
<< " got "
1238 << stripe
.length() << dendl
;
1239 unsigned swant
= MIN(stripe_size
- stripe_off
, length
);
1240 if (stripe
.length()) {
1241 if (swant
== stripe
.length()) {
1242 bl
.claim_append(stripe
);
1243 dout(30) << __func__
<< " taking full stripe" << dendl
;
1246 if (stripe_off
< stripe
.length()) {
1247 l
= MIN(stripe
.length() - stripe_off
, swant
);
1249 t
.substr_of(stripe
, stripe_off
, l
);
1251 dout(30) << __func__
<< " taking " << stripe_off
<< "~" << l
<< dendl
;
1254 bl
.append_zero(swant
- l
);
1255 dout(30) << __func__
<< " adding " << swant
- l
<< " zeros" << dendl
;
1259 dout(30) << __func__
<< " generating " << swant
<< " zeros" << dendl
;
1260 bl
.append_zero(swant
);
1267 dout(30) << " result:\n";
1277 const ghobject_t
& oid
,
1282 map
<uint64_t, uint64_t> m
;
1283 int r
= fiemap(cid
, oid
, offset
, len
, m
);
1293 const ghobject_t
& oid
,
1296 map
<uint64_t, uint64_t>& destmap
)
1298 CollectionRef c
= _get_collection(cid
);
1301 RWLock::RLocker
l(c
->lock
);
1303 OnodeRef o
= c
->get_onode(oid
, false);
1304 if (!o
|| !o
->exists
) {
1308 if (offset
> o
->onode
.size
)
1311 if (offset
+ len
> o
->onode
.size
) {
1312 len
= o
->onode
.size
- offset
;
1315 dout(20) << __func__
<< " " << offset
<< "~" << len
<< " size "
1316 << o
->onode
.size
<< dendl
;
1318 // FIXME: do something smarter here
1319 destmap
[0] = o
->onode
.size
;
1322 dout(20) << __func__
<< " " << offset
<< "~" << len
1323 << " size = 0 (" << destmap
<< ")" << dendl
;
1327 int KStore::getattr(
1329 const ghobject_t
& oid
,
1333 dout(15) << __func__
<< " " << cid
<< " " << oid
<< " " << name
<< dendl
;
1334 CollectionRef c
= _get_collection(cid
);
1337 RWLock::RLocker
l(c
->lock
);
1341 OnodeRef o
= c
->get_onode(oid
, false);
1342 if (!o
|| !o
->exists
) {
1347 if (!o
->onode
.attrs
.count(k
)) {
1351 value
= o
->onode
.attrs
[k
];
1354 dout(10) << __func__
<< " " << cid
<< " " << oid
<< " " << name
1355 << " = " << r
<< dendl
;
1359 int KStore::getattrs(
1361 const ghobject_t
& oid
,
1362 map
<string
,bufferptr
>& aset
)
1364 dout(15) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1365 CollectionRef c
= _get_collection(cid
);
1368 RWLock::RLocker
l(c
->lock
);
1371 OnodeRef o
= c
->get_onode(oid
, false);
1372 if (!o
|| !o
->exists
) {
1376 aset
= o
->onode
.attrs
;
1379 dout(10) << __func__
<< " " << cid
<< " " << oid
1380 << " = " << r
<< dendl
;
1384 int KStore::list_collections(vector
<coll_t
>& ls
)
1386 RWLock::RLocker
l(coll_lock
);
1387 for (ceph::unordered_map
<coll_t
, CollectionRef
>::iterator p
= coll_map
.begin();
1388 p
!= coll_map
.end();
1390 ls
.push_back(p
->first
);
1394 bool KStore::collection_exists(const coll_t
& c
)
1396 RWLock::RLocker
l(coll_lock
);
1397 return coll_map
.count(c
);
1400 int KStore::collection_empty(const coll_t
& cid
, bool *empty
)
1402 dout(15) << __func__
<< " " << cid
<< dendl
;
1403 vector
<ghobject_t
> ls
;
1405 int r
= collection_list(cid
, ghobject_t(), ghobject_t::get_max(), 1,
1408 derr
<< __func__
<< " collection_list returned: " << cpp_strerror(r
)
1412 *empty
= ls
.empty();
1413 dout(10) << __func__
<< " " << cid
<< " = " << (int)(*empty
) << dendl
;
1417 int KStore::collection_bits(const coll_t
& cid
)
1419 dout(15) << __func__
<< " " << cid
<< dendl
;
1420 CollectionHandle ch
= _get_collection(cid
);
1423 Collection
*c
= static_cast<Collection
*>(ch
.get());
1424 RWLock::RLocker
l(c
->lock
);
1425 dout(10) << __func__
<< " " << cid
<< " = " << c
->cnode
.bits
<< dendl
;
1426 return c
->cnode
.bits
;
1429 int KStore::collection_list(
1430 const coll_t
& cid
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1431 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1433 CollectionHandle c
= _get_collection(cid
);
1436 return collection_list(c
, start
, end
, max
, ls
, pnext
);
1439 int KStore::collection_list(
1440 CollectionHandle
&c_
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1441 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1444 Collection
*c
= static_cast<Collection
*>(c_
.get());
1445 dout(15) << __func__
<< " " << c
->cid
1446 << " start " << start
<< " end " << end
<< " max " << max
<< dendl
;
1449 RWLock::RLocker
l(c
->lock
);
1450 r
= _collection_list(c
, start
, end
, max
, ls
, pnext
);
1453 dout(10) << __func__
<< " " << c
->cid
1454 << " start " << start
<< " end " << end
<< " max " << max
1455 << " = " << r
<< ", ls.size() = " << ls
->size()
1456 << ", next = " << (pnext
? *pnext
: ghobject_t()) << dendl
;
1460 int KStore::_collection_list(
1461 Collection
* c
, const ghobject_t
& start
, const ghobject_t
& end
, int max
,
1462 vector
<ghobject_t
> *ls
, ghobject_t
*pnext
)
1465 KeyValueDB::Iterator it
;
1466 string temp_start_key
, temp_end_key
;
1467 string start_key
, end_key
;
1468 bool set_next
= false;
1472 ghobject_t static_next
;
1474 pnext
= &static_next
;
1476 if (start
== ghobject_t::get_max() ||
1477 start
.hobj
.is_max()) {
1480 get_coll_key_range(c
->cid
, c
->cnode
.bits
, &temp_start_key
, &temp_end_key
,
1481 &start_key
, &end_key
);
1482 dout(20) << __func__
1483 << " range " << pretty_binary_string(temp_start_key
)
1484 << " to " << pretty_binary_string(temp_end_key
)
1485 << " and " << pretty_binary_string(start_key
)
1486 << " to " << pretty_binary_string(end_key
)
1487 << " start " << start
<< dendl
;
1488 it
= db
->get_iterator(PREFIX_OBJ
);
1489 if (start
== ghobject_t() || start
== c
->cid
.get_min_hobj()) {
1490 it
->upper_bound(temp_start_key
);
1494 get_object_key(cct
, start
, &k
);
1495 if (start
.hobj
.is_temp()) {
1497 assert(k
>= temp_start_key
&& k
< temp_end_key
);
1500 assert(k
>= start_key
&& k
< end_key
);
1502 dout(20) << " start from " << pretty_binary_string(k
)
1503 << " temp=" << (int)temp
<< dendl
;
1506 if (end
.hobj
.is_max()) {
1507 pend
= temp
? temp_end_key
: end_key
;
1509 get_object_key(cct
, end
, &end_key
);
1510 if (end
.hobj
.is_temp()) {
1516 pend
= temp
? temp_end_key
: end_key
;
1519 dout(20) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1521 if (!it
->valid() || it
->key() >= pend
) {
1523 dout(20) << __func__
<< " iterator not valid (end of db?)" << dendl
;
1525 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key())
1526 << " > " << end
<< dendl
;
1528 if (end
.hobj
.is_temp()) {
1531 dout(30) << __func__
<< " switch to non-temp namespace" << dendl
;
1533 it
->upper_bound(start_key
);
1535 dout(30) << __func__
<< " pend " << pretty_binary_string(pend
) << dendl
;
1540 dout(20) << __func__
<< " key " << pretty_binary_string(it
->key()) << dendl
;
1542 int r
= get_key_object(it
->key(), &oid
);
1544 if (ls
->size() >= (unsigned)max
) {
1545 dout(20) << __func__
<< " reached max " << max
<< dendl
;
1555 *pnext
= ghobject_t::get_max();
1562 KStore::OmapIteratorImpl::OmapIteratorImpl(
1563 CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
)
1564 : c(c
), o(o
), it(it
)
1566 RWLock::RLocker
l(c
->lock
);
1567 if (o
->onode
.omap_head
) {
1568 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1569 get_omap_tail(o
->onode
.omap_head
, &tail
);
1570 it
->lower_bound(head
);
1574 int KStore::OmapIteratorImpl::seek_to_first()
1576 RWLock::RLocker
l(c
->lock
);
1577 if (o
->onode
.omap_head
) {
1578 it
->lower_bound(head
);
1580 it
= KeyValueDB::Iterator();
1585 int KStore::OmapIteratorImpl::upper_bound(const string
& after
)
1587 RWLock::RLocker
l(c
->lock
);
1588 if (o
->onode
.omap_head
) {
1590 get_omap_key(o
->onode
.omap_head
, after
, &key
);
1591 it
->upper_bound(key
);
1593 it
= KeyValueDB::Iterator();
1598 int KStore::OmapIteratorImpl::lower_bound(const string
& to
)
1600 RWLock::RLocker
l(c
->lock
);
1601 if (o
->onode
.omap_head
) {
1603 get_omap_key(o
->onode
.omap_head
, to
, &key
);
1604 it
->lower_bound(key
);
1606 it
= KeyValueDB::Iterator();
1611 bool KStore::OmapIteratorImpl::valid()
1613 RWLock::RLocker
l(c
->lock
);
1614 if (o
->onode
.omap_head
&& it
->valid() && it
->raw_key().second
<= tail
) {
1621 int KStore::OmapIteratorImpl::next(bool validate
)
1623 RWLock::RLocker
l(c
->lock
);
1624 if (o
->onode
.omap_head
) {
1632 string
KStore::OmapIteratorImpl::key()
1634 RWLock::RLocker
l(c
->lock
);
1635 assert(it
->valid());
1636 string db_key
= it
->raw_key().second
;
1638 decode_omap_key(db_key
, &user_key
);
1642 bufferlist
KStore::OmapIteratorImpl::value()
1644 RWLock::RLocker
l(c
->lock
);
1645 assert(it
->valid());
// Read an object's omap header and all of its omap key/value pairs.
//
// Visible flow: look up the collection, take its read lock, look up the onode
// without creating it, and check that it exists and has an omap_head. Then
// scan PREFIX_OMAP from the header key: an entry whose key equals `head` is
// stored into *header; an entry with key >= `tail` ends the scan ("reached
// tail"); any other entry is decoded with decode_omap_key() and stored in
// (*out)[user_key].
// NOTE(review): error returns, the loop's advance/break statements and the
// declarations of `head`, `tail`, `user_key` and `r` are not in this listing.
1649 int KStore::omap_get(
1650 const coll_t
& cid
, ///< [in] Collection containing oid
1651 const ghobject_t
&oid
, ///< [in] Object containing omap
1652 bufferlist
*header
, ///< [out] omap header
1653 map
<string
, bufferlist
> *out
/// < [out] Key to value map
1656 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1657 CollectionRef c
= _get_collection(cid
);
1660 RWLock::RLocker
l(c
->lock
);
1662 OnodeRef o
= c
->get_onode(oid
, false);
1663 if (!o
|| !o
->exists
) {
1667 if (!o
->onode
.omap_head
)
1671 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1673 get_omap_header(o
->onode
.omap_head
, &head
);
1674 get_omap_tail(o
->onode
.omap_head
, &tail
);
1675 it
->lower_bound(head
);
1676 while (it
->valid()) {
1677 if (it
->key() == head
) {
1678 dout(30) << __func__
<< " got header" << dendl
;
1679 *header
= it
->value();
1680 } else if (it
->key() >= tail
) {
1681 dout(30) << __func__
<< " reached tail" << dendl
;
1685 decode_omap_key(it
->key(), &user_key
);
1686 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1687 << " -> " << user_key
<< dendl
;
1688 assert(it
->key() < tail
);
1689 (*out
)[user_key
] = it
->value();
1695 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
// Read only the omap header of an object.
//
// Visible flow: look up the collection, take its read lock, look up the onode
// without creating it, check that it exists and has an omap_head, build the
// header key with get_omap_header() and fetch it via db->get(PREFIX_OMAP,
// ...). A non-negative result logs "got header", otherwise "no header".
// `allow_eio` is not referenced in the visible lines.
// NOTE(review): the return paths and the declarations of `head` and `r` are
// not in this listing.
1699 int KStore::omap_get_header(
1700 const coll_t
& cid
, ///< [in] Collection containing oid
1701 const ghobject_t
&oid
, ///< [in] Object containing omap
1702 bufferlist
*header
, ///< [out] omap header
1703 bool allow_eio
///< [in] don't assert on eio
1706 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1707 CollectionRef c
= _get_collection(cid
);
1710 RWLock::RLocker
l(c
->lock
);
1712 OnodeRef o
= c
->get_onode(oid
, false);
1713 if (!o
|| !o
->exists
) {
1717 if (!o
->onode
.omap_head
)
1722 get_omap_header(o
->onode
.omap_head
, &head
);
1723 if (db
->get(PREFIX_OMAP
, head
, header
) >= 0) {
1724 dout(30) << __func__
<< " got header" << dendl
;
1726 dout(30) << __func__
<< " no header" << dendl
;
1730 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
// Collect every omap key defined on an object into *keys.
//
// Visible flow: look up the collection, take its read lock, look up the onode
// without creating it, check that it exists and has an omap_head. The scan
// starts at the omap key for the empty user key (so the header entry is not
// the starting point) and runs over PREFIX_OMAP; an entry with key >= `tail`
// ends the scan, every other entry is decoded and inserted into *keys.
// NOTE(review): the loop's advance/break statements, the return paths and
// the declarations of `head`, `tail`, `user_key` and `r` are not in this
// listing.
1734 int KStore::omap_get_keys(
1735 const coll_t
& cid
, ///< [in] Collection containing oid
1736 const ghobject_t
&oid
, ///< [in] Object containing omap
1737 set
<string
> *keys
///< [out] Keys defined on oid
1740 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1741 CollectionRef c
= _get_collection(cid
);
1744 RWLock::RLocker
l(c
->lock
);
1746 OnodeRef o
= c
->get_onode(oid
, false);
1747 if (!o
|| !o
->exists
) {
1751 if (!o
->onode
.omap_head
)
1755 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1757 get_omap_key(o
->onode
.omap_head
, string(), &head
);
1758 get_omap_tail(o
->onode
.omap_head
, &tail
);
1759 it
->lower_bound(head
);
1760 while (it
->valid()) {
1761 if (it
->key() >= tail
) {
1762 dout(30) << __func__
<< " reached tail" << dendl
;
1766 decode_omap_key(it
->key(), &user_key
);
1767 dout(30) << __func__
<< " got " << pretty_binary_string(it
->key())
1768 << " -> " << user_key
<< dendl
;
1769 assert(it
->key() < tail
);
1770 keys
->insert(user_key
);
1775 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
// Fetch the values of a caller-supplied set of omap keys.
//
// Visible flow: look up the collection, take its read lock, look up the onode
// without creating it, check that it exists and has an omap_head. For each
// requested key, build the db key with get_omap_key() and do a point lookup
// with db->get(PREFIX_OMAP, ...); on a non-negative result the (key, value)
// pair is inserted into *out. Keys that are not found add nothing to *out in
// the visible code.
// NOTE(review): the return paths and the declarations of `key`, `val` and
// `r` are not in this listing.
1779 int KStore::omap_get_values(
1780 const coll_t
& cid
, ///< [in] Collection containing oid
1781 const ghobject_t
&oid
, ///< [in] Object containing omap
1782 const set
<string
> &keys
, ///< [in] Keys to get
1783 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
1786 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1787 CollectionRef c
= _get_collection(cid
);
1790 RWLock::RLocker
l(c
->lock
);
1792 OnodeRef o
= c
->get_onode(oid
, false);
1793 if (!o
|| !o
->exists
) {
1797 if (!o
->onode
.omap_head
)
1800 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1802 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1804 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1805 dout(30) << __func__
<< " got " << pretty_binary_string(key
)
1806 << " -> " << *p
<< dendl
;
1807 out
->insert(make_pair(*p
, val
));
1811 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
// Determine which of the given omap keys exist on an object.
//
// Visible flow: look up the collection, take its read lock, look up the onode
// without creating it, check that it exists and has an omap_head. For each
// candidate key, build the db key and probe it with db->get(PREFIX_OMAP,
// ...); a non-negative result is logged as "have", otherwise "miss".
// NOTE(review): the statement that records a found key in *out, the return
// paths and the declarations of `key`, `val` and `r` are not in this
// listing.
1815 int KStore::omap_check_keys(
1816 const coll_t
& cid
, ///< [in] Collection containing oid
1817 const ghobject_t
&oid
, ///< [in] Object containing omap
1818 const set
<string
> &keys
, ///< [in] Keys to check
1819 set
<string
> *out
///< [out] Subset of keys defined on oid
1822 dout(15) << __func__
<< " " << cid
<< " oid " << oid
<< dendl
;
1823 CollectionRef c
= _get_collection(cid
);
1826 RWLock::RLocker
l(c
->lock
);
1828 OnodeRef o
= c
->get_onode(oid
, false);
1829 if (!o
|| !o
->exists
) {
1833 if (!o
->onode
.omap_head
)
1836 for (set
<string
>::const_iterator p
= keys
.begin(); p
!= keys
.end(); ++p
) {
1838 get_omap_key(o
->onode
.omap_head
, *p
, &key
);
1840 if (db
->get(PREFIX_OMAP
, key
, &val
) >= 0) {
1841 dout(30) << __func__
<< " have " << pretty_binary_string(key
)
1842 << " -> " << *p
<< dendl
;
1845 dout(30) << __func__
<< " miss " << pretty_binary_string(key
)
1846 << " -> " << *p
<< dendl
;
1850 dout(10) << __func__
<< " " << cid
<< " oid " << oid
<< " = " << r
<< dendl
;
// Create an omap iterator for object `oid` in collection `cid`.
//
// Returns an empty ObjectMapIterator when the collection lookup fails
// (the guarding condition is not in this listing) or when the onode is
// missing / does not exist. Otherwise a PREFIX_OMAP db iterator is wrapped in
// a new OmapIteratorImpl and returned; the collection read lock is held while
// the onode is looked up and the iterator is constructed.
// NOTE(review): the two "doesn't exist" log messages have no leading space,
// so they print fused to the preceding id.
1854 ObjectMap::ObjectMapIterator
KStore::get_omap_iterator(
1855 const coll_t
& cid
, ///< [in] collection
1856 const ghobject_t
&oid
///< [in] object
1860 dout(10) << __func__
<< " " << cid
<< " " << oid
<< dendl
;
1861 CollectionRef c
= _get_collection(cid
);
1863 dout(10) << __func__
<< " " << cid
<< "doesn't exist" <<dendl
;
1864 return ObjectMap::ObjectMapIterator();
1866 RWLock::RLocker
l(c
->lock
);
1867 OnodeRef o
= c
->get_onode(oid
, false);
1868 if (!o
|| !o
->exists
) {
1869 dout(10) << __func__
<< " " << oid
<< "doesn't exist" <<dendl
;
1870 return ObjectMap::ObjectMapIterator();
1873 dout(10) << __func__
<< " header = " << o
->onode
.omap_head
<<dendl
;
1874 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
1875 return ObjectMap::ObjectMapIterator(new OmapIteratorImpl(c
, o
, it
));
1879 // -----------------
// Load persisted superblock metadata.
// Visible here: reads the "nid_max" value stored under PREFIX_SUPER, decodes
// it into nid_max (a buffer::error from decoding is caught) and logs the
// value as "old nid_max". The result of db->get() is not checked in the
// visible lines. Other statements (declarations, the catch body, the return)
// are not in this listing.
1882 int KStore::_open_super_meta()
1888 db
->get(PREFIX_SUPER
, "nid_max", &bl
);
1889 bufferlist::iterator p
= bl
.begin();
1891 ::decode(nid_max
, p
);
1892 } catch (buffer::error
& e
) {
1894 dout(10) << __func__
<< " old nid_max " << nid_max
<< dendl
;
// Give onode `o` a fresh numeric id (nid).
// Under nid_lock: the next value of nid_last becomes o->onode.nid. When
// nid_last passes nid_max, nid_max is advanced by the configured
// kstore_nid_prealloc and the new nid_max is written to txc's transaction
// under PREFIX_SUPER / "nid_max".
// NOTE(review): lines between the signature and the lock are not in this
// listing — any early-out (e.g. when a nid is already set) must be confirmed
// against the full file.
1900 void KStore::_assign_nid(TransContext
*txc
, OnodeRef o
)
1904 std::lock_guard
<std::mutex
> l(nid_lock
);
1905 o
->onode
.nid
= ++nid_last
;
1906 dout(20) << __func__
<< " " << o
->oid
<< " nid " << o
->onode
.nid
<< dendl
;
1907 if (nid_last
> nid_max
) {
1908 nid_max
+= cct
->_conf
->kstore_nid_prealloc
;
1910 ::encode(nid_max
, bl
);
1911 txc
->t
->set(PREFIX_SUPER
, "nid_max", bl
);
1912 dout(10) << __func__
<< " nid_max now " << nid_max
<< dendl
;
// Allocate a new TransContext for sequencer `osr`.
// The context is heap-allocated with `new`, given a fresh db transaction,
// queued on the sequencer via queue_new() and logged. The return statement
// is not in this listing.
1916 KStore::TransContext
*KStore::_txc_create(OpSequencer
*osr
)
1918 TransContext
*txc
= new TransContext(osr
);
1919 txc
->t
= db
->get_transaction();
1920 osr
->queue_new(txc
);
1921 dout(20) << __func__
<< " osr " << osr
<< " = " << txc
<< dendl
;
// Drive a transaction context through its state machine.
//
// Visible per-state actions (each state first records its latency counter):
//  - STATE_PREPARE: move to STATE_KV_QUEUED. Unless kstore_sync_transaction
//    is set, take kv_lock, optionally submit the transaction right away
//    (kstore_sync_submit_transaction), append txc to kv_queue and signal
//    kv_cond. The other visible call, submit_transaction_sync(), is
//    presumably the kstore_sync_transaction path — confirm.
//  - STATE_KV_QUEUED: move to STATE_KV_DONE and run _txc_finish_kv().
//  - STATE_KV_DONE: move to STATE_FINISHING.
//  - STATE_FINISHING: only the latency logging is visible.
//  - otherwise: log the unexpected state and assert.
// NOTE(review): the enclosing loop, break/return/fall-through statements and
// the checks on `r` are not in this listing, so how control moves between
// cases cannot be read from here.
1925 void KStore::_txc_state_proc(TransContext
*txc
)
1928 dout(10) << __func__
<< " txc " << txc
1929 << " " << txc
->get_state_name() << dendl
;
1930 switch (txc
->state
) {
1931 case TransContext::STATE_PREPARE
:
1932 txc
->log_state_latency(logger
, l_kstore_state_prepare_lat
);
1933 txc
->state
= TransContext::STATE_KV_QUEUED
;
1934 if (!cct
->_conf
->kstore_sync_transaction
) {
1935 std::lock_guard
<std::mutex
> l(kv_lock
);
1936 if (cct
->_conf
->kstore_sync_submit_transaction
) {
1937 int r
= db
->submit_transaction(txc
->t
);
1940 kv_queue
.push_back(txc
);
1941 kv_cond
.notify_one();
1945 int r
= db
->submit_transaction_sync(txc
->t
);
1950 case TransContext::STATE_KV_QUEUED
:
1951 txc
->log_state_latency(logger
, l_kstore_state_kv_queued_lat
);
1952 txc
->state
= TransContext::STATE_KV_DONE
;
1953 _txc_finish_kv(txc
);
1956 case TransContext::STATE_KV_DONE
:
1957 txc
->log_state_latency(logger
, l_kstore_state_kv_done_lat
);
1958 txc
->state
= TransContext::STATE_FINISHING
;
1961 case TransContext::TransContext::STATE_FINISHING
:
1962 txc
->log_state_latency(logger
, l_kstore_state_finishing_lat
);
1967 derr
<< __func__
<< " unexpected txc " << txc
1968 << " state " << txc
->get_state_name() << dendl
;
1969 assert(0 == "unexpected txc state");
// Flush the onodes touched by `txc` into its key/value transaction.
// For every onode in txc->onodes: encode the onode, write it under
// PREFIX_OBJ at the onode's key, and — under that onode's flush_lock —
// register txc in the onode's flush_txns set.
// NOTE(review): the loop increment and the declaration of `bl` are not in
// this listing.
1975 void KStore::_txc_finalize(OpSequencer
*osr
, TransContext
*txc
)
1977 dout(20) << __func__
<< " osr " << osr
<< " txc " << txc
1978 << " onodes " << txc
->onodes
<< dendl
;
1981 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
1982 p
!= txc
->onodes
.end();
1985 ::encode((*p
)->onode
, bl
);
1986 dout(20) << " onode size is " << bl
.length() << dendl
;
1987 txc
->t
->set(PREFIX_OBJ
, (*p
)->key
, bl
);
1989 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
1990 (*p
)->flush_txns
.insert(txc
);
// Deliver completion callbacks for `txc` and release its throttle budget.
//  - onreadable_sync is completed inline with result 0;
//  - onreadable and oncommit are handed to the finisher;
//  - a non-empty oncommits list is queued on the finisher as a whole.
// Each single callback pointer is set to NULL after being handed off so it
// is not delivered twice. Finally the op and byte counts taken for this txc
// are returned to throttle_ops / throttle_bytes.
1994 void KStore::_txc_finish_kv(TransContext
*txc
)
1996 dout(20) << __func__
<< " txc " << txc
<< dendl
;
1998 // warning: we're calling onreadable_sync inside the sequencer lock
1999 if (txc
->onreadable_sync
) {
2000 txc
->onreadable_sync
->complete(0);
2001 txc
->onreadable_sync
= NULL
;
2003 if (txc
->onreadable
) {
2004 finisher
.queue(txc
->onreadable
);
2005 txc
->onreadable
= NULL
;
2007 if (txc
->oncommit
) {
2008 finisher
.queue(txc
->oncommit
);
2009 txc
->oncommit
= NULL
;
2011 if (!txc
->oncommits
.empty()) {
2012 finisher
.queue(txc
->oncommits
);
2015 throttle_ops
.put(txc
->ops
);
2016 throttle_bytes
.put(txc
->bytes
);
// Final bookkeeping for a transaction context in STATE_FINISHING.
//  1. For each onode the txc touched (under the onode's flush_lock): remove
//     the txc from flush_txns; when that set becomes empty, wake waiters on
//     flush_cond and drop the onode's pending stripes.
//  2. Clear txc->onodes.
//  3. Hand every collection removed by this txc to _queue_reap_collection().
//  4. Under the sequencer's qlock, mark the txc STATE_DONE, then call
//     _osr_reap_done() on the sequencer.
// NOTE(review): the loop increment and scoping braces are not in this
// listing; in particular the qlock scope must end before _osr_reap_done(),
// which takes the same lock itself.
2019 void KStore::_txc_finish(TransContext
*txc
)
2021 dout(20) << __func__
<< " " << txc
<< " onodes " << txc
->onodes
<< dendl
;
2022 assert(txc
->state
== TransContext::STATE_FINISHING
);
2024 for (set
<OnodeRef
>::iterator p
= txc
->onodes
.begin();
2025 p
!= txc
->onodes
.end();
2027 std::lock_guard
<std::mutex
> l((*p
)->flush_lock
);
2028 dout(20) << __func__
<< " onode " << *p
<< " had " << (*p
)->flush_txns
2030 assert((*p
)->flush_txns
.count(txc
));
2031 (*p
)->flush_txns
.erase(txc
);
2032 if ((*p
)->flush_txns
.empty()) {
2033 (*p
)->flush_cond
.notify_all();
2034 (*p
)->clear_pending_stripes();
2039 txc
->onodes
.clear();
2041 while (!txc
->removed_collections
.empty()) {
2042 _queue_reap_collection(txc
->removed_collections
.front());
2043 txc
->removed_collections
.pop_front();
2046 OpSequencerRef osr
= txc
->osr
;
2048 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2049 txc
->state
= TransContext::STATE_DONE
;
2052 _osr_reap_done(osr
.get());
// Process the transaction contexts at the front of a sequencer's queue.
// Under osr->qlock, while the queue is non-empty: look at the front txc and
// test whether its state is STATE_DONE; for a txc that is processed, trim
// the onode map of its first referenced collection to kstore_onode_map_size,
// record the done-state latency and notify qcond. A final log line reports
// "q now empty".
// NOTE(review): the statements that stop at a non-DONE txc and that pop /
// delete the txc are not in this listing.
2055 void KStore::_osr_reap_done(OpSequencer
*osr
)
2057 std::lock_guard
<std::mutex
> l(osr
->qlock
);
2058 dout(20) << __func__
<< " osr " << osr
<< dendl
;
2059 while (!osr
->q
.empty()) {
2060 TransContext
*txc
= &osr
->q
.front();
2061 dout(20) << __func__
<< " txc " << txc
<< " " << txc
->get_state_name()
2063 if (txc
->state
!= TransContext::STATE_DONE
) {
2067 if (txc
->first_collection
) {
2068 txc
->first_collection
->onode_map
.trim(cct
->_conf
->kstore_onode_map_size
);
2072 txc
->log_state_latency(logger
, l_kstore_state_done_lat
);
2074 osr
->qcond
.notify_all();
2076 dout(20) << __func__
<< " osr " << osr
<< " q now empty" << dendl
;
// Body of the background thread that commits queued transactions.
//
// Holding kv_lock (a unique_lock): when kv_queue is empty the thread
// notifies kv_sync_cond and logs "sleep"/"wake" around a wait that is not in
// this listing. Otherwise it swaps kv_queue into kv_committing, submits each
// queued transaction (only when kstore_sync_submit_transaction is off —
// with it on they were already submitted when queued), then commits one
// extra empty transaction with submit_transaction_sync() to force a sync.
// After logging the elapsed time it feeds each committed txc back into
// _txc_state_proc() and finally calls _reap_collections().
// NOTE(review): the outer loop and its stop condition, the lock/unlock calls
// around the commit and the checks on `r` are not in this listing.
2080 void KStore::_kv_sync_thread()
2082 dout(10) << __func__
<< " start" << dendl
;
2083 std::unique_lock
<std::mutex
> l(kv_lock
);
2085 assert(kv_committing
.empty());
2086 if (kv_queue
.empty()) {
2089 dout(20) << __func__
<< " sleep" << dendl
;
2090 kv_sync_cond
.notify_all();
2092 dout(20) << __func__
<< " wake" << dendl
;
2094 dout(20) << __func__
<< " committing " << kv_queue
.size() << dendl
;
2095 kv_committing
.swap(kv_queue
);
2096 utime_t start
= ceph_clock_now();
2099 dout(30) << __func__
<< " committing txc " << kv_committing
<< dendl
;
2101 // one transaction to force a sync
2102 KeyValueDB::Transaction t
= db
->get_transaction();
2103 if (!cct
->_conf
->kstore_sync_submit_transaction
) {
2104 for (std::deque
<TransContext
*>::iterator it
= kv_committing
.begin();
2105 it
!= kv_committing
.end();
2107 int r
= db
->submit_transaction((*it
)->t
);
2111 int r
= db
->submit_transaction_sync(t
);
2113 utime_t finish
= ceph_clock_now();
2114 utime_t dur
= finish
- start
;
2115 dout(20) << __func__
<< " committed " << kv_committing
.size()
2116 << " in " << dur
<< dendl
;
2117 while (!kv_committing
.empty()) {
2118 TransContext
*txc
= kv_committing
.front();
2119 _txc_state_proc(txc
);
2120 kv_committing
.pop_front();
2123 // this is as good a place as any ...
2124 _reap_collections();
2129 dout(10) << __func__
<< " finish" << dendl
;
2133 // ---------------------------
// Entry point for submitting a batch of ObjectStore transactions.
//
// Visible flow:
//  1. Gather the onreadable / ondisk / onreadable_sync callbacks from all
//     transactions with Transaction::collect_contexts().
//  2. Pick the OpSequencer: reuse the one attached to the caller's sequencer
//     (posr->p) or create a new one.
//  3. Create a TransContext, attach the callbacks, and for each transaction
//     add its op/byte counts and apply it via _txc_add_transaction().
//  4. _txc_finalize() writes the dirty onodes, then the op and byte
//     throttles are acquired, and the state machine is started with
//     _txc_state_proc().
// NOTE(review): some parameters (the sequencer `posr`, the tracked-op
// argument), the declarations of `ondisk`/`osr`, the branch that selects
// between the existing and new sequencer and the return are not in this
// listing. `handle` is not referenced in the visible lines.
2136 int KStore::queue_transactions(
2138 vector
<Transaction
>& tls
,
2140 ThreadPool::TPHandle
*handle
)
2142 Context
*onreadable
;
2144 Context
*onreadable_sync
;
2145 ObjectStore::Transaction::collect_contexts(
2146 tls
, &onreadable
, &ondisk
, &onreadable_sync
);
2148 // set up the sequencer
2152 osr
= static_cast<OpSequencer
*>(posr
->p
.get());
2153 dout(10) << __func__
<< " existing " << osr
<< " " << *osr
<< dendl
;
2155 osr
= new OpSequencer(cct
);
2158 dout(10) << __func__
<< " new " << osr
<< " " << *osr
<< dendl
;
2162 TransContext
*txc
= _txc_create(osr
);
2163 txc
->onreadable
= onreadable
;
2164 txc
->onreadable_sync
= onreadable_sync
;
2165 txc
->oncommit
= ondisk
;
2167 for (vector
<Transaction
>::iterator p
= tls
.begin(); p
!= tls
.end(); ++p
) {
2169 txc
->ops
+= (*p
).get_num_ops();
2170 txc
->bytes
+= (*p
).get_num_bytes();
2171 _txc_add_transaction(txc
, &(*p
));
2174 _txc_finalize(osr
, txc
);
2176 throttle_ops
.get(txc
->ops
);
2177 throttle_bytes
.get(txc
->bytes
);
2180 _txc_state_proc(txc
);
// Decode one ObjectStore::Transaction and apply each of its ops to `txc`.
//
// Visible structure:
//  - Optionally dump the transaction as JSON at debug level 30.
//  - Resolve every collection id referenced by the transaction into `cvec`
//    (remembering the first one in txc->first_collection) and size `ovec`
//    for the referenced objects.
//  - Loop over the ops. OP_NOP is tested first. Collection-level ops
//    (RMCOLL, MKCOLL, SPLIT_COLLECTION2, COLL_HINT, ...) are dispatched
//    first; a failure there is logged with a transaction dump and asserts.
//  - Object-level ops then take the collection's write lock, resolve the
//    onode (creating it for TOUCH / WRITE / ZERO) and dispatch to the
//    matching _xxx() helper.
//  - Error handling after the object op: -ENOENT is tolerated except for the
//    clone family and COLL_ADD; other errors pick a diagnostic message
//    (ENOENT on clone, ENOSPC, ENOTEMPTY), dump the transaction and assert.
// NOTE(review): many original lines (switch headers, break/continue
// statements, local declarations, decode calls for payload buffers) are not
// in this listing; the exact control flow must be read from the full file.
2184 void KStore::_txc_add_transaction(TransContext
*txc
, Transaction
*t
)
2186 Transaction::iterator i
= t
->begin();
2188 dout(30) << __func__
<< " transaction dump:\n";
2189 JSONFormatter
f(true);
2190 f
.open_object_section("transaction");
2196 vector
<CollectionRef
> cvec(i
.colls
.size());
2198 for (vector
<coll_t
>::iterator p
= i
.colls
.begin(); p
!= i
.colls
.end();
2200 cvec
[j
] = _get_collection(*p
);
2202 // note first collection we reference
2203 if (!j
&& !txc
->first_collection
)
2204 txc
->first_collection
= cvec
[j
];
2206 vector
<OnodeRef
> ovec(i
.objects
.size());
2208 for (int pos
= 0; i
.have_op(); ++pos
) {
2209 Transaction::Op
*op
= i
.decode_op();
2213 if (op
->op
== Transaction::OP_NOP
)
2216 // collection operations
2217 CollectionRef
&c
= cvec
[op
->cid
];
2219 case Transaction::OP_RMCOLL
:
2221 coll_t cid
= i
.get_cid(op
->cid
);
2222 r
= _remove_collection(txc
, cid
, &c
);
2228 case Transaction::OP_MKCOLL
:
2231 coll_t cid
= i
.get_cid(op
->cid
);
2232 r
= _create_collection(txc
, cid
, op
->split_bits
, &c
);
2238 case Transaction::OP_SPLIT_COLLECTION
:
2239 assert(0 == "deprecated");
2242 case Transaction::OP_SPLIT_COLLECTION2
:
2244 uint32_t bits
= op
->split_bits
;
2245 uint32_t rem
= op
->split_rem
;
2246 r
= _split_collection(txc
, c
, cvec
[op
->dest_cid
], bits
, rem
);
2252 case Transaction::OP_COLL_HINT
:
2254 uint32_t type
= op
->hint_type
;
2257 bufferlist::iterator hiter
= hint
.begin();
2258 if (type
== Transaction::COLL_HINT_EXPECTED_NUM_OBJECTS
) {
2261 ::decode(pg_num
, hiter
);
2262 ::decode(num_objs
, hiter
);
2263 dout(10) << __func__
<< " collection hint objects is a no-op, "
2264 << " pg_num " << pg_num
<< " num_objects " << num_objs
2268 dout(10) << __func__
<< " unknown collection hint " << type
<< dendl
;
2274 case Transaction::OP_COLL_SETATTR
:
2278 case Transaction::OP_COLL_RMATTR
:
2282 case Transaction::OP_COLL_RENAME
:
2283 assert(0 == "not implemented");
2287 derr
<< " error " << cpp_strerror(r
)
2288 << " not handled on operation " << op
->op
2289 << " (op " << pos
<< ", counting from 0)" << dendl
;
2290 dout(0) << " transaction dump:\n";
2291 JSONFormatter
f(true);
2292 f
.open_object_section("transaction");
2297 assert(0 == "unexpected error");
2300 // object operations
2301 RWLock::WLocker
l(c
->lock
);
2302 OnodeRef
&o
= ovec
[op
->oid
];
2304 // these operations implicitly create the object
2305 bool create
= false;
2306 if (op
->op
== Transaction::OP_TOUCH
||
2307 op
->op
== Transaction::OP_WRITE
||
2308 op
->op
== Transaction::OP_ZERO
) {
2311 ghobject_t oid
= i
.get_oid(op
->oid
);
2312 o
= c
->get_onode(oid
, create
);
2314 if (!o
|| !o
->exists
) {
2315 dout(10) << __func__
<< " op " << op
->op
<< " got ENOENT on "
2324 case Transaction::OP_TOUCH
:
2325 r
= _touch(txc
, c
, o
);
2328 case Transaction::OP_WRITE
:
2330 uint64_t off
= op
->off
;
2331 uint64_t len
= op
->len
;
2332 uint32_t fadvise_flags
= i
.get_fadvise_flags();
2335 r
= _write(txc
, c
, o
, off
, len
, bl
, fadvise_flags
);
2339 case Transaction::OP_ZERO
:
2341 uint64_t off
= op
->off
;
2342 uint64_t len
= op
->len
;
2343 r
= _zero(txc
, c
, o
, off
, len
);
2347 case Transaction::OP_TRIMCACHE
:
2349 // deprecated, no-op
2353 case Transaction::OP_TRUNCATE
:
2355 uint64_t off
= op
->off
;
2356 r
= _truncate(txc
, c
, o
, off
);
2360 case Transaction::OP_REMOVE
:
2361 r
= _remove(txc
, c
, o
);
2364 case Transaction::OP_SETATTR
:
2366 string name
= i
.decode_string();
2369 map
<string
, bufferptr
> to_set
;
2370 to_set
[name
] = bufferptr(bl
.c_str(), bl
.length());
2371 r
= _setattrs(txc
, c
, o
, to_set
);
2375 case Transaction::OP_SETATTRS
:
2377 map
<string
, bufferptr
> aset
;
2378 i
.decode_attrset(aset
);
2379 r
= _setattrs(txc
, c
, o
, aset
);
2383 case Transaction::OP_RMATTR
:
2385 string name
= i
.decode_string();
2386 r
= _rmattr(txc
, c
, o
, name
);
2390 case Transaction::OP_RMATTRS
:
2392 r
= _rmattrs(txc
, c
, o
);
2396 case Transaction::OP_CLONE
:
2398 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2399 OnodeRef no
= c
->get_onode(noid
, true);
2400 r
= _clone(txc
, c
, o
, no
);
2404 case Transaction::OP_CLONERANGE
:
2405 assert(0 == "deprecated");
2408 case Transaction::OP_CLONERANGE2
:
2410 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2411 OnodeRef no
= c
->get_onode(noid
, true);
2412 uint64_t srcoff
= op
->off
;
2413 uint64_t len
= op
->len
;
2414 uint64_t dstoff
= op
->dest_off
;
2415 r
= _clone_range(txc
, c
, o
, no
, srcoff
, len
, dstoff
);
2419 case Transaction::OP_COLL_ADD
:
2420 assert(0 == "not implemented");
2423 case Transaction::OP_COLL_REMOVE
:
2424 assert(0 == "not implemented");
2427 case Transaction::OP_COLL_MOVE
:
2428 assert(0 == "deprecated");
2431 case Transaction::OP_COLL_MOVE_RENAME
:
2433 assert(op
->cid
== op
->dest_cid
);
2434 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2435 OnodeRef no
= c
->get_onode(noid
, true);
2436 r
= _rename(txc
, c
, o
, no
, noid
);
2441 case Transaction::OP_TRY_RENAME
:
2443 const ghobject_t
& noid
= i
.get_oid(op
->dest_oid
);
2444 OnodeRef no
= c
->get_onode(noid
, true);
2445 r
= _rename(txc
, c
, o
, no
, noid
);
2452 case Transaction::OP_OMAP_CLEAR
:
2454 r
= _omap_clear(txc
, c
, o
);
2457 case Transaction::OP_OMAP_SETKEYS
:
2460 i
.decode_attrset_bl(&aset_bl
);
2461 r
= _omap_setkeys(txc
, c
, o
, aset_bl
);
2464 case Transaction::OP_OMAP_RMKEYS
:
2467 i
.decode_keyset_bl(&keys_bl
);
2468 r
= _omap_rmkeys(txc
, c
, o
, keys_bl
);
2471 case Transaction::OP_OMAP_RMKEYRANGE
:
2474 first
= i
.decode_string();
2475 last
= i
.decode_string();
2476 r
= _omap_rmkey_range(txc
, c
, o
, first
, last
);
2479 case Transaction::OP_OMAP_SETHEADER
:
2483 r
= _omap_setheader(txc
, c
, o
, bl
);
2487 case Transaction::OP_SETALLOCHINT
:
2489 uint64_t expected_object_size
= op
->expected_object_size
;
2490 uint64_t expected_write_size
= op
->expected_write_size
;
2491 uint32_t flags
= op
->alloc_hint_flags
;
2492 r
= _setallochint(txc
, c
, o
,
2493 expected_object_size
,
2494 expected_write_size
,
2500 derr
<< "bad op " << op
->op
<< dendl
;
2508 if (r
== -ENOENT
&& !(op
->op
== Transaction::OP_CLONERANGE
||
2509 op
->op
== Transaction::OP_CLONE
||
2510 op
->op
== Transaction::OP_CLONERANGE2
||
2511 op
->op
== Transaction::OP_COLL_ADD
))
2512 // -ENOENT is usually okay
2518 const char *msg
= "unexpected error code";
2520 if (r
== -ENOENT
&& (op
->op
== Transaction::OP_CLONERANGE
||
2521 op
->op
== Transaction::OP_CLONE
||
2522 op
->op
== Transaction::OP_CLONERANGE2
))
2523 msg
= "ENOENT on clone suggests osd bug";
2526 // For now, if we hit _any_ ENOSPC, crash, before we do any damage
2527 // by partially applying transactions.
2528 msg
= "ENOSPC from key value store, misconfigured cluster";
2530 if (r
== -ENOTEMPTY
) {
2531 msg
= "ENOTEMPTY suggests garbage data in osd data dir";
2534 dout(0) << " error " << cpp_strerror(r
) << " not handled on operation " << op
->op
2535 << " (op " << pos
<< ", counting from 0)" << dendl
;
2536 dout(0) << msg
<< dendl
;
2537 dout(0) << " transaction dump:\n";
2538 JSONFormatter
f(true);
2539 f
.open_object_section("transaction");
2544 assert(0 == "unexpected error");
2552 // -----------------
// Handle OP_TOUCH for an object.
// Visible here: assigns the onode a nid via _assign_nid(), marks the onode
// for write-back with txc->write_onode(), and logs the result `r`.
// NOTE(review): the remaining parameters (`c`, `o`), the declaration of `r`,
// any other state change and the return are not in this listing.
2555 int KStore::_touch(TransContext
*txc
,
2559 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2562 _assign_nid(txc
, o
);
2563 txc
->write_onode(o
);
2564 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Debug helper: log an onode's metadata at debug level 30.
// Prints the nid, size and the expected object/write size hints, then one
// line per extended attribute with its name and value length.
// NOTE(review): the end of the first log statement and the attribute loop's
// increment are not in this listing.
2568 void KStore::_dump_onode(OnodeRef o
)
2570 dout(30) << __func__
<< " " << o
2571 << " nid " << o
->onode
.nid
2572 << " size " << o
->onode
.size
2573 << " expected_object_size " << o
->onode
.expected_object_size
2574 << " expected_write_size " << o
->onode
.expected_write_size
2576 for (map
<string
,bufferptr
>::iterator p
= o
->onode
.attrs
.begin();
2577 p
!= o
->onode
.attrs
.end();
2579 dout(30) << __func__
<< " attr " << p
->first
2580 << " len " << p
->second
.length() << dendl
;
// Read the stripe of object `o` that starts at byte `offset` into *pbl.
// The onode's pending_stripes map is consulted first. On a miss, the stripe
// is fetched from the db under PREFIX_DATA (key built from nid + offset) and
// the result is stored in pending_stripes[offset]. The return value of
// db->get() is not checked in the visible lines.
// NOTE(review): the cache-hit branch and the declaration of `key` are not in
// this listing.
2584 void KStore::_do_read_stripe(OnodeRef o
, uint64_t offset
, bufferlist
*pbl
)
2586 map
<uint64_t,bufferlist
>::iterator p
= o
->pending_stripes
.find(offset
);
2587 if (p
== o
->pending_stripes
.end()) {
2589 get_data_key(o
->onode
.nid
, offset
, &key
);
2590 db
->get(PREFIX_DATA
, key
, pbl
);
2591 o
->pending_stripes
[offset
] = *pbl
;
// Write `bl` as the stripe of object `o` that starts at byte `offset`.
// The new contents are recorded in the onode's pending_stripes map, and a
// set() for the PREFIX_DATA key (nid + offset) is added to txc's
// transaction.
// NOTE(review): the declaration of `key` is not in this listing.
2597 void KStore::_do_write_stripe(TransContext
*txc
, OnodeRef o
,
2598 uint64_t offset
, bufferlist
& bl
)
2600 o
->pending_stripes
[offset
] = bl
;
2602 get_data_key(o
->onode
.nid
, offset
, &key
);
2603 txc
->t
->set(PREFIX_DATA
, key
, bl
);
// Delete the stripe of object `o` that starts at byte `offset`.
// The entry is dropped from the onode's pending_stripes map, and an rmkey()
// for the PREFIX_DATA key (nid + offset) is added to txc's transaction.
// NOTE(review): the declaration of `key` is not in this listing.
2606 void KStore::_do_remove_stripe(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2608 o
->pending_stripes
.erase(offset
);
2610 get_data_key(o
->onode
.nid
, offset
, &key
);
2611 txc
->t
->rmkey(PREFIX_DATA
, key
);
// Write `length` bytes from `orig_bl` to object `o` at byte `offset`,
// stripe by stripe.
//
// Visible flow:
//  - The stripe size comes from the onode; the assignment from
//    kstore_default_stripe_size is presumably applied when the onode has
//    none yet (the guarding condition is not in this listing).
//  - Loop while bytes remain:
//     * If both the start and the end of the remaining range are
//       stripe-aligned, a whole stripe is sliced out of orig_bl and written.
//     * Otherwise the existing stripe is read and a new stripe is assembled
//       from: the preserved leading bytes (zero-padded up to the write
//       offset), the new data for this stripe, and any preserved trailing
//       bytes beyond the write's end.
//  - Afterwards, if the running `offset` exceeds the recorded object size,
//    the size is extended to it.
// `fadvise_flags` is not referenced in the visible lines.
// NOTE(review): several statements (declarations of `bl`/`prev`/`t`, the
// conditions around the `use` adjustment, the offset/length advance in the
// partial-stripe path, the return) are not in this listing.
2614 int KStore::_do_write(TransContext
*txc
,
2616 uint64_t offset
, uint64_t length
,
2617 bufferlist
& orig_bl
,
2618 uint32_t fadvise_flags
)
2622 dout(20) << __func__
2623 << " " << o
->oid
<< " " << offset
<< "~" << length
2624 << " - have " << o
->onode
.size
2625 << " bytes, nid " << o
->onode
.nid
<< dendl
;
2633 uint64_t stripe_size
= o
->onode
.stripe_size
;
2635 o
->onode
.stripe_size
= cct
->_conf
->kstore_default_stripe_size
;
2636 stripe_size
= o
->onode
.stripe_size
;
2639 unsigned bl_off
= 0;
2640 while (length
> 0) {
2641 uint64_t offset_rem
= offset
% stripe_size
;
2642 uint64_t end_rem
= (offset
+ length
) % stripe_size
;
2643 if (offset_rem
== 0 && end_rem
== 0) {
2645 bl
.substr_of(orig_bl
, bl_off
, stripe_size
);
2646 dout(30) << __func__
<< " full stripe " << offset
<< dendl
;
2647 _do_write_stripe(txc
, o
, offset
, bl
);
2648 offset
+= stripe_size
;
2649 length
-= stripe_size
;
2650 bl_off
+= stripe_size
;
2653 uint64_t stripe_off
= offset
- offset_rem
;
2655 _do_read_stripe(o
, stripe_off
, &prev
);
2656 dout(20) << __func__
<< " read previous stripe " << stripe_off
2657 << ", got " << prev
.length() << dendl
;
2660 unsigned p
= MIN(prev
.length(), offset_rem
);
2662 dout(20) << __func__
<< " reuse leading " << p
<< " bytes" << dendl
;
2663 bl
.substr_of(prev
, 0, p
);
2665 if (p
< offset_rem
) {
2666 dout(20) << __func__
<< " add leading " << offset_rem
- p
<< " zeros" << dendl
;
2667 bl
.append_zero(offset_rem
- p
);
2670 unsigned use
= stripe_size
- offset_rem
;
2672 use
-= stripe_size
- end_rem
;
2673 dout(20) << __func__
<< " using " << use
<< " for this stripe" << dendl
;
2675 t
.substr_of(orig_bl
, bl_off
, use
);
2679 if (end_rem
< prev
.length()) {
2680 unsigned l
= prev
.length() - end_rem
;
2681 dout(20) << __func__
<< " reuse trailing " << l
<< " bytes" << dendl
;
2683 t
.substr_of(prev
, end_rem
, l
);
2687 dout(30) << " writing:\n";
2690 _do_write_stripe(txc
, o
, stripe_off
, bl
);
2695 if (offset
> o
->onode
.size
) {
2696 dout(20) << __func__
<< " extending size to " << offset
+ length
2698 o
->onode
.size
= offset
;
// Handle OP_WRITE: write `length` bytes at `offset` into object `o`.
// Visible flow: assign a nid via _assign_nid(), delegate the data write to
// _do_write(), mark the onode for write-back, and log the result.
// NOTE(review): the remaining parameters (`c`, `o`, `bl`), any other state
// change before the write and the return are not in this listing.
2704 int KStore::_write(TransContext
*txc
,
2707 uint64_t offset
, size_t length
,
2709 uint32_t fadvise_flags
)
2711 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2712 << " " << offset
<< "~" << length
2714 _assign_nid(txc
, o
);
2715 int r
= _do_write(txc
, o
, offset
, length
, bl
, fadvise_flags
);
2716 txc
->write_onode(o
);
2718 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2719 << " " << offset
<< "~" << length
2720 << " = " << r
<< dendl
;
// Handle OP_ZERO: zero the byte range [offset, offset + length) of `o`.
//
// Visible flow, walking the range stripe by stripe:
//  - A stripe only partly covered by the range is read, its leading bytes
//    before the range are kept, and then either it is left truncated there
//    (when the range reaches the end of the stripe or the end of the
//    object) or zeros are appended up to the range end followed by the
//    preserved tail of the old stripe. The result is written back.
//  - A stripe fully covered by the range is removed.
//  - Finally the object size is extended if the range ends past it, and the
//    onode is marked for write-back.
// NOTE(review): `pos % stripe_size` requires a non-zero stripe size; a guard
// is presumably on one of the lines missing from this listing — confirm.
// The else branches, the stripe_off reset and pos advance in the removal
// path, and the return are also not in this listing.
2724 int KStore::_zero(TransContext
*txc
,
2727 uint64_t offset
, size_t length
)
2729 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2730 << " " << offset
<< "~" << length
2736 _assign_nid(txc
, o
);
2738 uint64_t stripe_size
= o
->onode
.stripe_size
;
2740 uint64_t end
= offset
+ length
;
2741 uint64_t pos
= offset
;
2742 uint64_t stripe_off
= pos
% stripe_size
;
2743 while (pos
< offset
+ length
) {
2744 if (stripe_off
|| end
- pos
< stripe_size
) {
2746 _do_read_stripe(o
, pos
- stripe_off
, &stripe
);
2747 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2748 << stripe
.length() << dendl
;
2750 bl
.substr_of(stripe
, 0, MIN(stripe
.length(), stripe_off
));
2751 if (end
>= pos
- stripe_off
+ stripe_size
||
2752 end
>= o
->onode
.size
) {
2753 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2754 << " to " << bl
.length() << dendl
;
2756 auto len
= end
- (pos
- stripe_off
+ bl
.length());
2757 bl
.append_zero(len
);
2758 dout(20) << __func__
<< " adding " << len
<< " of zeros" << dendl
;
2759 if (stripe
.length() > bl
.length()) {
2760 unsigned l
= stripe
.length() - bl
.length();
2762 t
.substr_of(stripe
, stripe
.length() - l
, l
);
2763 dout(20) << __func__
<< " keeping tail " << l
<< " of stripe" << dendl
;
2767 _do_write_stripe(txc
, o
, pos
- stripe_off
, bl
);
2768 pos
+= stripe_size
- stripe_off
;
2771 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2772 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2777 if (offset
+ length
> o
->onode
.size
) {
2778 o
->onode
.size
= offset
+ length
;
2779 dout(20) << __func__
<< " extending size to " << offset
+ length
2782 txc
->write_onode(o
);
2784 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2785 << " " << offset
<< "~" << length
2786 << " = " << r
<< dendl
;
// Truncate object `o` to `offset` bytes.
//
// Visible flow:
//  - Walk stripes from `offset` up to the current size. A stripe that is
//    only partly beyond the new end is read, cut down to its first
//    `stripe_off` bytes (or fewer if the stored stripe is shorter) and
//    written back; other stripes are removed.
//  - If a cached tail buffer exists and the new end falls in a different
//    stripe than the old end, the cached tail is cleared (log message; the
//    clearing statement is not in this listing).
//  - Record the new size and mark the onode for write-back.
// NOTE(review): `pos % stripe_size` requires a non-zero stripe size; a guard
// is presumably on one of the lines missing from this listing — confirm.
// The branch condition inside the loop, the else branches and the return are
// also not in this listing.
2790 int KStore::_do_truncate(TransContext
*txc
, OnodeRef o
, uint64_t offset
)
2792 uint64_t stripe_size
= o
->onode
.stripe_size
;
2796 // trim down stripes
2798 uint64_t pos
= offset
;
2799 uint64_t stripe_off
= pos
% stripe_size
;
2800 while (pos
< o
->onode
.size
) {
2803 _do_read_stripe(o
, pos
- stripe_off
, &stripe
);
2804 dout(30) << __func__
<< " stripe " << pos
- stripe_off
<< " got "
2805 << stripe
.length() << dendl
;
2807 t
.substr_of(stripe
, 0, MIN(stripe_off
, stripe
.length()));
2808 _do_write_stripe(txc
, o
, pos
- stripe_off
, t
);
2809 dout(20) << __func__
<< " truncated stripe " << pos
- stripe_off
2810 << " to " << t
.length() << dendl
;
2811 pos
+= stripe_size
- stripe_off
;
2814 dout(20) << __func__
<< " rm stripe " << pos
<< dendl
;
2815 _do_remove_stripe(txc
, o
, pos
- stripe_off
);
2820 // trim down cached tail
2821 if (o
->tail_bl
.length()) {
2822 if (offset
/ stripe_size
!= o
->onode
.size
/ stripe_size
) {
2823 dout(20) << __func__
<< " clear cached tail" << dendl
;
2829 o
->onode
.size
= offset
;
2830 dout(10) << __func__
<< " truncate size to " << offset
<< dendl
;
2832 txc
->write_onode(o
);
// Handle OP_TRUNCATE: logging wrapper that delegates to _do_truncate() and
// logs its result.
// NOTE(review): the remaining parameters (`c`, `o`, `offset`) and the return
// are not in this listing.
2836 int KStore::_truncate(TransContext
*txc
,
2841 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2844 int r
= _do_truncate(txc
, o
, offset
);
2845 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2847 << " = " << r
<< dendl
;
// Remove object `o` and everything stored for it.
// Visible flow: truncate the data to zero length, clear the omap range if
// the onode has an omap_head, reset the in-memory onode to a
// default-constructed kstore_onode_t, drop it from txc's dirty-onode set
// (so it is not written back), and add an rmkey() for its PREFIX_OBJ key.
// NOTE(review): the remaining parameter (`o`), the declaration of `key`, any
// change to o->exists and the return are not in this listing.
2851 int KStore::_do_remove(TransContext
*txc
,
2856 _do_truncate(txc
, o
, 0);
2859 if (o
->onode
.omap_head
) {
2860 _do_omap_clear(txc
, o
->onode
.omap_head
);
2863 o
->onode
= kstore_onode_t();
2864 txc
->onodes
.erase(o
);
2865 get_object_key(cct
, o
->oid
, &key
);
2866 txc
->t
->rmkey(PREFIX_OBJ
, key
);
// Handle OP_REMOVE: logging wrapper that delegates to _do_remove() and logs
// its result.
// NOTE(review): the remaining parameters (`c`, `o`) and the return are not
// in this listing.
2870 int KStore::_remove(TransContext
*txc
,
2874 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2875 int r
= _do_remove(txc
, o
);
2876 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Set a single extended attribute `name` to `val` on object `o`.
// The attribute is stored in the in-memory onode's attrs map and the onode
// is marked for write-back.
// NOTE(review): the remaining parameters (`c`, `o`, `name`, `val`), the
// declaration of `r` and the return are not in this listing.
2880 int KStore::_setattr(TransContext
*txc
,
2886 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2887 << " " << name
<< " (" << val
.length() << " bytes)"
2890 o
->onode
.attrs
[name
] = val
;
2891 txc
->write_onode(o
);
2892 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2893 << " " << name
<< " (" << val
.length() << " bytes)"
2894 << " = " << r
<< dendl
;
// Set several extended attributes on object `o` from `aset`.
// For each entry: when the source bufferptr is_partial(), a fresh bufferptr
// is built from its bytes (c_str(), length()); the other visible assignment
// stores the bufferptr as-is (presumably the else branch — the `else` line
// is not in this listing). The onode is then marked for write-back.
// NOTE(review): the remaining parameters (`c`, `o`), the declaration of `r`
// and the return are not in this listing.
2898 int KStore::_setattrs(TransContext
*txc
,
2901 const map
<string
,bufferptr
>& aset
)
2903 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2904 << " " << aset
.size() << " keys"
2907 for (map
<string
,bufferptr
>::const_iterator p
= aset
.begin();
2908 p
!= aset
.end(); ++p
) {
2909 if (p
->second
.is_partial())
2910 o
->onode
.attrs
[p
->first
] = bufferptr(p
->second
.c_str(), p
->second
.length());
2912 o
->onode
.attrs
[p
->first
] = p
->second
;
2914 txc
->write_onode(o
);
2915 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2916 << " " << aset
.size() << " keys"
2917 << " = " << r
<< dendl
;
// Remove the extended attribute `name` from object `o`.
// The entry is erased from the in-memory onode's attrs map and the onode is
// marked for write-back.
// NOTE(review): the remaining parameters (`c`, `o`, `name`), the declaration
// of `r` and the return are not in this listing.
2922 int KStore::_rmattr(TransContext
*txc
,
2927 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
2928 << " " << name
<< dendl
;
2930 o
->onode
.attrs
.erase(name
);
2931 txc
->write_onode(o
);
2932 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
2933 << " " << name
<< " = " << r
<< dendl
;
// Remove every extended attribute from object `o`.
// The in-memory onode's attrs map is cleared and the onode is marked for
// write-back.
// NOTE(review): the remaining parameters (`c`, `o`), the declaration of `r`
// and the return are not in this listing.
2937 int KStore::_rmattrs(TransContext
*txc
,
2941 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2943 o
->onode
.attrs
.clear();
2944 txc
->write_onode(o
);
2945 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Queue deletion of every omap entry belonging to omap id `id`.
// Scans PREFIX_OMAP starting at the header key for `id`; each entry below
// the tail key gets an rmkey() added to txc's transaction, and the scan
// stops at the first key >= tail.
// NOTE(review): the loop's break and iterator-advance statements are not in
// this listing.
2949 void KStore::_do_omap_clear(TransContext
*txc
, uint64_t id
)
2951 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
2952 string prefix
, tail
;
2953 get_omap_header(id
, &prefix
);
2954 get_omap_tail(id
, &tail
);
2955 it
->lower_bound(prefix
);
2956 while (it
->valid()) {
2957 if (it
->key() >= tail
) {
2958 dout(30) << __func__
<< " stop at " << tail
<< dendl
;
2961 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
2962 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
// Handle OP_OMAP_CLEAR: when the onode has a non-zero omap_head, delegate to
// _do_omap_clear() for that id; then log the result.
// NOTE(review): the remaining parameters (`c`, `o`), the declaration of `r`
// and the return are not in this listing.
2967 int KStore::_omap_clear(TransContext
*txc
,
2971 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2973 if (o
->onode
.omap_head
!= 0) {
2974 _do_omap_clear(txc
, o
->onode
.omap_head
);
2976 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Handle OP_OMAP_SETKEYS: store the key/value pairs encoded in `bl` as omap
// entries of object `o`.
// If the onode has no omap_head yet, its nid is adopted as the omap id and
// the onode is marked for write-back. Each decoded user key is translated
// with get_omap_key() and written under PREFIX_OMAP in txc's transaction.
// NOTE(review): the decoding of the entry count and of each key/value from
// the buffer iterator `p`, the remaining parameters and the return are not
// in this listing.
2980 int KStore::_omap_setkeys(TransContext
*txc
,
2985 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
2987 bufferlist::iterator p
= bl
.begin();
2989 if (!o
->onode
.omap_head
) {
2990 o
->onode
.omap_head
= o
->onode
.nid
;
2991 txc
->write_onode(o
);
3000 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3001 dout(30) << __func__
<< " " << pretty_binary_string(final_key
)
3002 << " <- " << key
<< dendl
;
3003 txc
->t
->set(PREFIX_OMAP
, final_key
, value
);
3006 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Handle OP_OMAP_SETHEADER: store `bl` as the omap header of object `o`.
// If the onode has no omap_head yet, its nid is adopted as the omap id and
// the onode is marked for write-back. The header key is built with
// get_omap_header() and written under PREFIX_OMAP in txc's transaction.
// NOTE(review): the remaining parameters (`c`, `o`, `bl`), the declarations
// of `key` and `r`, and the return are not in this listing.
3010 int KStore::_omap_setheader(TransContext
*txc
,
3015 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3018 if (!o
->onode
.omap_head
) {
3019 o
->onode
.omap_head
= o
->onode
.nid
;
3020 txc
->write_onode(o
);
3022 get_omap_header(o
->onode
.omap_head
, &key
);
3023 txc
->t
->set(PREFIX_OMAP
, key
, bl
);
3025 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
// Handle OP_OMAP_RMKEYS: delete the omap keys encoded in `bl` from object
// `o`.
// The onode's omap_head is checked first (what happens when it is unset is
// not in this listing). Each decoded user key is translated with
// get_omap_key() and an rmkey() under PREFIX_OMAP is added to txc's
// transaction.
// NOTE(review): the decoding of the key count and of each key from the
// buffer iterator `p`, the remaining parameters and the return are not in
// this listing.
3029 int KStore::_omap_rmkeys(TransContext
*txc
,
3034 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3036 bufferlist::iterator p
= bl
.begin();
3039 if (!o
->onode
.omap_head
) {
3048 get_omap_key(o
->onode
.omap_head
, key
, &final_key
);
3049 dout(30) << __func__
<< " rm " << pretty_binary_string(final_key
)
3050 << " <- " << key
<< dendl
;
3051 txc
->t
->rmkey(PREFIX_OMAP
, final_key
);
3056 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3060 int KStore::_omap_rmkey_range(TransContext
*txc
,
3063 const string
& first
, const string
& last
)
3065 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< dendl
;
3066 KeyValueDB::Iterator it
;
3067 string key_first
, key_last
;
3070 if (!o
->onode
.omap_head
) {
3073 it
= db
->get_iterator(PREFIX_OMAP
);
3074 get_omap_key(o
->onode
.omap_head
, first
, &key_first
);
3075 get_omap_key(o
->onode
.omap_head
, last
, &key_last
);
3076 it
->lower_bound(key_first
);
3077 while (it
->valid()) {
3078 if (it
->key() >= key_last
) {
3079 dout(30) << __func__
<< " stop at " << pretty_binary_string(key_last
)
3083 txc
->t
->rmkey(PREFIX_OMAP
, it
->key());
3084 dout(30) << __func__
<< " rm " << pretty_binary_string(it
->key()) << dendl
;
3090 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
<< " = " << r
<< dendl
;
3094 int KStore::_setallochint(TransContext
*txc
,
3097 uint64_t expected_object_size
,
3098 uint64_t expected_write_size
,
3101 dout(15) << __func__
<< " " << c
->cid
<< " " << o
->oid
3102 << " object_size " << expected_object_size
3103 << " write_size " << expected_write_size
3104 << " flags " << flags
3107 o
->onode
.expected_object_size
= expected_object_size
;
3108 o
->onode
.expected_write_size
= expected_write_size
;
3109 o
->onode
.alloc_hint_flags
= flags
;
3111 txc
->write_onode(o
);
3112 dout(10) << __func__
<< " " << c
->cid
<< " " << o
->oid
3113 << " object_size " << expected_object_size
3114 << " write_size " << expected_write_size
3115 << " = " << r
<< dendl
;
3119 int KStore::_clone(TransContext
*txc
,
3124 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3125 << newo
->oid
<< dendl
;
3127 if (oldo
->oid
.hobj
.get_hash() != newo
->oid
.hobj
.get_hash()) {
3128 derr
<< __func__
<< " mismatched hash on " << oldo
->oid
3129 << " and " << newo
->oid
<< dendl
;
3134 newo
->exists
= true;
3135 _assign_nid(txc
, newo
);
3140 r
= _do_read(oldo
, 0, oldo
->onode
.size
, bl
, 0);
3144 // truncate any old data
3145 r
= _do_truncate(txc
, newo
, 0);
3149 r
= _do_write(txc
, newo
, 0, oldo
->onode
.size
, bl
, 0);
3153 newo
->onode
.attrs
= oldo
->onode
.attrs
;
3156 if (newo
->onode
.omap_head
) {
3157 dout(20) << __func__
<< " clearing old omap data" << dendl
;
3158 _do_omap_clear(txc
, newo
->onode
.omap_head
);
3160 if (oldo
->onode
.omap_head
) {
3161 dout(20) << __func__
<< " copying omap data" << dendl
;
3162 if (!newo
->onode
.omap_head
) {
3163 newo
->onode
.omap_head
= newo
->onode
.nid
;
3165 KeyValueDB::Iterator it
= db
->get_iterator(PREFIX_OMAP
);
3167 get_omap_header(oldo
->onode
.omap_head
, &head
);
3168 get_omap_tail(oldo
->onode
.omap_head
, &tail
);
3169 it
->lower_bound(head
);
3170 while (it
->valid()) {
3172 if (it
->key() >= tail
) {
3173 dout(30) << __func__
<< " reached tail" << dendl
;
3176 dout(30) << __func__
<< " got header/data "
3177 << pretty_binary_string(it
->key()) << dendl
;
3178 assert(it
->key() < tail
);
3179 rewrite_omap_key(newo
->onode
.omap_head
, it
->key(), &key
);
3180 txc
->t
->set(PREFIX_OMAP
, key
, it
->value());
3186 txc
->write_onode(newo
);
3190 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3191 << newo
->oid
<< " = " << r
<< dendl
;
3195 int KStore::_clone_range(TransContext
*txc
,
3199 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
)
3201 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3202 << newo
->oid
<< " from " << srcoff
<< "~" << length
3203 << " to offset " << dstoff
<< dendl
;
3207 newo
->exists
= true;
3208 _assign_nid(txc
, newo
);
3210 r
= _do_read(oldo
, srcoff
, length
, bl
, 0);
3214 r
= _do_write(txc
, newo
, dstoff
, bl
.length(), bl
, 0);
3218 txc
->write_onode(newo
);
3223 dout(10) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3224 << newo
->oid
<< " from " << srcoff
<< "~" << length
3225 << " to offset " << dstoff
3226 << " = " << r
<< dendl
;
3230 int KStore::_rename(TransContext
*txc
,
3234 const ghobject_t
& new_oid
)
3236 dout(15) << __func__
<< " " << c
->cid
<< " " << oldo
->oid
<< " -> "
3237 << new_oid
<< dendl
;
3239 ghobject_t old_oid
= oldo
->oid
;
3241 string old_key
, new_key
;
3243 if (newo
&& newo
->exists
) {
3244 // destination object already exists, remove it first
3245 r
= _do_remove(txc
, newo
);
3250 txc
->t
->rmkey(PREFIX_OBJ
, oldo
->key
);
3251 txc
->write_onode(oldo
);
3252 c
->onode_map
.rename(old_oid
, new_oid
); // this adjusts oldo->{oid,key}
3256 dout(10) << __func__
<< " " << c
->cid
<< " " << old_oid
<< " -> "
3257 << new_oid
<< " = " << r
<< dendl
;
3263 int KStore::_create_collection(
3269 dout(15) << __func__
<< " " << cid
<< " bits " << bits
<< dendl
;
3274 RWLock::WLocker
l(coll_lock
);
3279 c
->reset(new Collection(this, cid
));
3280 (*c
)->cnode
.bits
= bits
;
3283 ::encode((*c
)->cnode
, bl
);
3284 txc
->t
->set(PREFIX_COLL
, stringify(cid
), bl
);
3288 dout(10) << __func__
<< " " << cid
<< " bits " << bits
<< " = " << r
<< dendl
;
3292 int KStore::_remove_collection(TransContext
*txc
, coll_t cid
,
3295 dout(15) << __func__
<< " " << cid
<< dendl
;
3299 RWLock::WLocker
l(coll_lock
);
3304 size_t nonexistent_count
= 0;
3305 pair
<ghobject_t
,OnodeRef
> next_onode
;
3306 while ((*c
)->onode_map
.get_next(next_onode
.first
, &next_onode
)) {
3307 if (next_onode
.second
->exists
) {
3311 ++nonexistent_count
;
3313 vector
<ghobject_t
> ls
;
3315 // Enumerate onodes in db, up to nonexistent_count + 1
3316 // then check if all of them are marked as non-existent.
3317 // Bypass the check if returned number is greater than nonexistent_count
3318 r
= _collection_list(c
->get(), ghobject_t(), ghobject_t::get_max(),
3319 nonexistent_count
+ 1, &ls
, &next
);
3321 bool exists
= false; //ls.size() > nonexistent_count;
3322 for (auto it
= ls
.begin(); !exists
&& it
< ls
.end(); ++it
) {
3323 dout(10) << __func__
<< " oid " << *it
<< dendl
;
3324 auto onode
= (*c
)->onode_map
.lookup(*it
);
3325 exists
= !onode
|| onode
->exists
;
3327 dout(10) << __func__
<< " " << *it
3328 << " exists in db" << dendl
;
3332 coll_map
.erase(cid
);
3333 txc
->removed_collections
.push_back(*c
);
3335 txc
->t
->rmkey(PREFIX_COLL
, stringify(cid
));
3338 dout(10) << __func__
<< " " << cid
3339 << " is non-empty" << dendl
;
3346 dout(10) << __func__
<< " " << cid
<< " = " << r
<< dendl
;
3350 int KStore::_split_collection(TransContext
*txc
,
3353 unsigned bits
, int rem
)
3355 dout(15) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3356 << " bits " << bits
<< dendl
;
3358 RWLock::WLocker
l(c
->lock
);
3359 RWLock::WLocker
l2(d
->lock
);
3360 c
->onode_map
.clear();
3361 d
->onode_map
.clear();
3362 c
->cnode
.bits
= bits
;
3363 assert(d
->cnode
.bits
== bits
);
3367 ::encode(c
->cnode
, bl
);
3368 txc
->t
->set(PREFIX_COLL
, stringify(c
->cid
), bl
);
3370 dout(10) << __func__
<< " " << c
->cid
<< " to " << d
->cid
<< " "
3371 << " bits " << bits
<< " = " << r
<< dendl
;
3375 // ===========================================