// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
 * Ceph - scalable distributed file system
 * Copyright (C) 2014 Red Hat
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation.  See file COPYING.

#ifndef CEPH_OSD_BLUESTORE_H
#define CEPH_OSD_BLUESTORE_H

#include <condition_variable>

#include <boost/intrusive/list.hpp>
#include <boost/intrusive/unordered_set.hpp>
#include <boost/intrusive/set.hpp>
#include <boost/functional/hash.hpp>
#include <boost/dynamic_bitset.hpp>
#include <boost/circular_buffer.hpp>

#include "include/cpp-btree/btree_set.h"

#include "include/ceph_assert.h"
#include "include/unordered_map.h"
#include "include/mempool.h"
#include "include/hash.h"
#include "common/bloom_filter.hpp"
#include "common/Finisher.h"
#include "common/ceph_mutex.h"
#include "common/Throttle.h"
#include "common/perf_counters.h"
#include "common/PriorityCache.h"
#include "compressor/Compressor.h"
#include "os/ObjectStore.h"

#include "bluestore_types.h"
#include "BlockDevice.h"

#include "common/EventTrace.h"

class FreelistManager;
class BlueStoreRepairer;

//#define DEBUG_DEFERRED

// constants for Buffer::optimize()
#define MAX_BUFFER_SLOP_RATIO_DEN 8  // so actually 1/N
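// Illustrative note (not part of the original header): with
// MAX_BUFFER_SLOP_RATIO_DEN == 8, Buffer::maybe_rebuild() below rebuilds a
// buffer holding, say, 4096 bytes once its bufferlist spans more than one
// bufferptr or the front bufferptr wastes more than 4096 / 8 = 512 bytes.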
  l_bluestore_first = 732430,
  l_bluestore_kv_flush_lat,
  l_bluestore_kv_commit_lat,
  l_bluestore_kv_sync_lat,
  l_bluestore_kv_final_lat,
  l_bluestore_state_prepare_lat,
  l_bluestore_state_aio_wait_lat,
  l_bluestore_state_io_done_lat,
  l_bluestore_state_kv_queued_lat,
  l_bluestore_state_kv_committing_lat,
  l_bluestore_state_kv_done_lat,
  l_bluestore_state_deferred_queued_lat,
  l_bluestore_state_deferred_aio_wait_lat,
  l_bluestore_state_deferred_cleanup_lat,
  l_bluestore_state_finishing_lat,
  l_bluestore_state_done_lat,
  l_bluestore_throttle_lat,
  l_bluestore_submit_lat,
  l_bluestore_commit_lat,
  l_bluestore_read_onode_meta_lat,
  l_bluestore_read_wait_aio_lat,
  l_bluestore_compress_lat,
  l_bluestore_decompress_lat,
  l_bluestore_compress_success_count,
  l_bluestore_compress_rejected_count,
  l_bluestore_write_pad_bytes,
  l_bluestore_deferred_write_ops,
  l_bluestore_deferred_write_bytes,
  l_bluestore_write_penalty_read_ops,
  l_bluestore_allocated,
  l_bluestore_compressed,
  l_bluestore_compressed_allocated,
  l_bluestore_compressed_original,
  l_bluestore_pinned_onodes,
  l_bluestore_onode_hits,
  l_bluestore_onode_misses,
  l_bluestore_onode_shard_hits,
  l_bluestore_onode_shard_misses,
  l_bluestore_buffer_bytes,
  l_bluestore_buffer_hit_bytes,
  l_bluestore_buffer_miss_bytes,
  l_bluestore_write_big,
  l_bluestore_write_big_bytes,
  l_bluestore_write_big_blobs,
  l_bluestore_write_small,
  l_bluestore_write_small_bytes,
  l_bluestore_write_small_unused,
  l_bluestore_write_small_deferred,
  l_bluestore_write_small_pre_read,
  l_bluestore_write_small_new,
  l_bluestore_onode_reshard,
  l_bluestore_blob_split,
  l_bluestore_extent_compress,
  l_bluestore_gc_merged,
  l_bluestore_read_eio,
  l_bluestore_reads_with_retries,
  l_bluestore_fragmentation,
  l_bluestore_omap_seek_to_first_lat,
  l_bluestore_omap_upper_bound_lat,
  l_bluestore_omap_lower_bound_lat,
  l_bluestore_omap_next_lat,
  l_bluestore_clist_lat,
#define META_POOL_ID ((uint64_t)-1ull)

class BlueStore : public ObjectStore,
                  public BlueFSDeviceExpander,
                  public md_config_obs_t {
  // -----------------------------------------------------
  const char **get_tracked_conf_keys() const override;
  void handle_conf_change(const ConfigProxy& conf,
                          const std::set<std::string> &changed) override;

  // handler for discard event
  void handle_discard(interval_set<uint64_t>& to_release);

  void _set_compression();
  void _set_throttle_params();
  int _set_cache_sizes();
  void _set_max_defer_interval() {
    cct->_conf.get_val<double>("bluestore_max_defer_interval");

  typedef map<uint64_t, bufferlist> ready_regions_t;

  typedef boost::intrusive_ptr<Collection> CollectionRef;

  virtual void aio_finish(BlueStore *store) = 0;
  virtual ~AioContext() {}

    MEMPOOL_CLASS_HELPERS();

      STATE_EMPTY,     ///< empty buffer -- used for cache history
      STATE_CLEAN,     ///< clean data that is up to date
      STATE_WRITING,   ///< data that is being written (io not yet complete)

    static const char *get_state_name(int s) {
      case STATE_EMPTY: return "empty";
      case STATE_CLEAN: return "clean";
      case STATE_WRITING: return "writing";
      default: return "???";

      FLAG_NOCACHE = 1,  ///< trim when done WRITING (do not become CLEAN)
      // NOTE: fix operator<< when you define a second flag
    static const char *get_flag_name(int s) {
      case FLAG_NOCACHE: return "nocache";
      default: return "???";

    uint16_t state;             ///< STATE_*
    uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
    uint32_t flags;             ///< FLAG_*
    uint32_t offset, length;

    boost::intrusive::list_member_hook<> lru_item;
    boost::intrusive::list_member_hook<> state_item;

    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
      : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
    Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
      : space(space), state(s), flags(f), seq(q), offset(o),
        length(b.length()), data(b) {}

    bool is_empty() const {
      return state == STATE_EMPTY;
    bool is_clean() const {
      return state == STATE_CLEAN;
    bool is_writing() const {
      return state == STATE_WRITING;

    uint32_t end() const {
      return offset + length;

    void truncate(uint32_t newlen) {
      ceph_assert(newlen < length);
      t.substr_of(data, 0, newlen);

    void maybe_rebuild() {
          (data.get_num_buffers() > 1 ||
           data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {

    void dump(Formatter *f) const {
      f->dump_string("state", get_state_name(state));
      f->dump_unsigned("seq", seq);
      f->dump_unsigned("offset", offset);
      f->dump_unsigned("length", length);
      f->dump_unsigned("data_length", data.length());
  struct BufferCacheShard;

  /// map logical extent range (object) onto buffers
    BYPASS_CLEAN_CACHE = 0x1,  // bypass clean cache

    typedef boost::intrusive::list<
      boost::intrusive::member_hook<
        boost::intrusive::list_member_hook<>,
        &Buffer::state_item> > state_list_t;

    mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
    // we use a bare intrusive list here instead of std::map because
    // it uses less memory and we expect this to be very small (very
    // few IOs in flight to the same Blob at the same time).
    state_list_t writing;   ///< writing buffers, sorted by seq, ascending

      ceph_assert(buffer_map.empty());
      ceph_assert(writing.empty());

    void _add_buffer(BufferCacheShard* cache, Buffer *b, int level, Buffer *near) {
      cache->_audit("_add_buffer start");
      buffer_map[b->offset].reset(b);
      if (b->is_writing()) {
        b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
        if (writing.empty() || writing.rbegin()->seq <= b->seq) {
          writing.push_back(*b);
          auto it = writing.begin();
          while (it->seq < b->seq) {
          ceph_assert(it->seq >= b->seq);
          // note that this will insert b before it
          // hence the order is maintained
          writing.insert(it, *b);
        b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
        cache->_add(b, level, near);
      cache->_audit("_add_buffer end");

    void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
      _rm_buffer(cache, buffer_map.find(b->offset));
    void _rm_buffer(BufferCacheShard* cache,
                    map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
      ceph_assert(p != buffer_map.end());
      cache->_audit("_rm_buffer start");
      if (p->second->is_writing()) {
        writing.erase(writing.iterator_to(*p->second));
      cache->_rm(p->second.get());
      cache->_audit("_rm_buffer end");

    map<uint32_t, std::unique_ptr<Buffer>>::iterator _data_lower_bound(
      auto i = buffer_map.lower_bound(offset);
      if (i != buffer_map.begin()) {
        if (i->first + i->second->length <= offset)

    // must be called under protection of the Cache lock
    void _clear(BufferCacheShard* cache);

    // return value is the highest cache_private of a trimmed buffer, or 0.
    int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
      std::lock_guard l(cache->lock);
      int ret = _discard(cache, offset, length);
    int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);

    void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
      std::lock_guard l(cache->lock);
      Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
      b->cache_private = _discard(cache, offset, bl.length());
      _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
    void _finish_write(BufferCacheShard* cache, uint64_t seq);
    void did_read(BufferCacheShard* cache, uint32_t offset, bufferlist& bl) {
      std::lock_guard l(cache->lock);
      Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
      b->cache_private = _discard(cache, offset, bl.length());
      _add_buffer(cache, b, 1, nullptr);

    void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
              BlueStore::ready_regions_t& res,
              interval_set<uint32_t>& res_intervals,

    void truncate(BufferCacheShard* cache, uint32_t offset) {
      discard(cache, offset, (uint32_t)-1 - offset);

    void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);

    void dump(BufferCacheShard* cache, Formatter *f) const {
      std::lock_guard l(cache->lock);
      f->open_array_section("buffers");
      for (auto& i : buffer_map) {
        f->open_object_section("buffer");
        ceph_assert(i.first == i.second->offset);
  struct SharedBlobSet;

  /// in-memory shared blob state (incl cached buffers)
    MEMPOOL_CLASS_HELPERS();

    std::atomic_int nref = {0};          ///< reference count
    uint64_t sbid_unloaded;              ///< sbid if persistent isn't loaded
    bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
    BufferSpace bc;                      ///< buffer cache

    SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
      get_cache()->add_blob();
    SharedBlob(uint64_t i, Collection *_coll);

    uint64_t get_sbid() const {
      return loaded ? persistent->sbid : sbid_unloaded;

    friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
    friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }

    void dump(Formatter* f) const;
    friend ostream& operator<<(ostream& out, const SharedBlob& sb);

    /// get logical references
    void get_ref(uint64_t offset, uint32_t length);

    /// put logical references, and get back any released extents
    void put_ref(uint64_t offset, uint32_t length,
                 PExtentVector *r, bool *unshare);

    void finish_write(uint64_t seq);

    friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
      return l.get_sbid() == r.get_sbid();
    inline BufferCacheShard* get_cache() {
      return coll ? coll->cache : nullptr;
    inline SharedBlobSet* get_parent() {
      return coll ? &(coll->shared_blob_set) : nullptr;
    inline bool is_loaded() const {
  typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;

  /// a lookup table of SharedBlobs
  struct SharedBlobSet {
    /// protect lookup, insertion, removal
    ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");

    // we use a bare pointer because we don't want to affect the ref
    mempool::bluestore_cache_other::unordered_map<uint64_t, SharedBlob*> sb_map;

    SharedBlobRef lookup(uint64_t sbid) {
      std::lock_guard l(lock);
      auto p = sb_map.find(sbid);
      if (p == sb_map.end() ||
          p->second->nref == 0) {

    void add(Collection* coll, SharedBlob *sb) {
      std::lock_guard l(lock);
      sb_map[sb->get_sbid()] = sb;

    bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
      std::lock_guard l(lock);
      ceph_assert(sb->get_parent() == this);
      if (verify_nref_is_zero && sb->nref != 0) {
      // only remove if it still points to us
      auto p = sb_map.find(sb->get_sbid());
      if (p != sb_map.end() &&

      std::lock_guard l(lock);
      return sb_map.empty();

    template <int LogLevelV>
    void dump(CephContext *cct);

//#define CACHE_BLOB_BL  // not sure if this is a win yet or not... :/

  /// in-memory blob metadata and associated cached buffers (if any)
    MEMPOOL_CLASS_HELPERS();

    std::atomic_int nref = {0};   ///< reference count
    int16_t id = -1;              ///< id, for spanning blobs only, >= 0
    int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
    SharedBlobRef shared_blob;    ///< shared blob state (if any)

    mutable bluestore_blob_t blob; ///< decoded blob metadata
    mutable bufferlist blob_bl;    ///< cached encoded blob, blob is dirty if empty

    /// refs from this shard.  ephemeral if id<0, persisted if spanning.
    bluestore_blob_use_tracker_t used_in_blob;

    friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
    friend void intrusive_ptr_release(Blob *b) { b->put(); }

    void dump(Formatter* f) const;
    friend ostream& operator<<(ostream& out, const Blob &b);

    const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
    bool is_referenced() const {
      return used_in_blob.is_not_empty();
    uint32_t get_referenced_bytes() const {
      return used_in_blob.get_referenced_bytes();

    bool is_spanning() const {

    bool can_split() const {
      std::lock_guard l(shared_blob->get_cache()->lock);
      // splitting a BufferSpace writing list is too hard; don't try.
      return shared_blob->bc.writing.empty() &&
             used_in_blob.can_split() &&
             get_blob().can_split();

    bool can_split_at(uint32_t blob_offset) const {
      return used_in_blob.can_split_at(blob_offset) &&
             get_blob().can_split_at(blob_offset);

    bool can_reuse_blob(uint32_t min_alloc_size,
                        uint32_t target_blob_size,
      o.shared_blob = shared_blob;

    inline const bluestore_blob_t& get_blob() const {
    inline bluestore_blob_t& dirty_blob() {

    /// discard buffers for unallocated regions
    void discard_unallocated(Collection *coll);

    /// get logical references
    void get_ref(Collection *coll, uint32_t offset, uint32_t length);
    /// put logical references, and get back any released extents
    bool put_ref(Collection *coll, uint32_t offset, uint32_t length,

    void split(Collection *coll, uint32_t blob_offset, Blob *o);

    void _encode() const {
      if (blob_bl.length() == 0) {
        encode(blob, blob_bl);
        ceph_assert(blob_bl.length());

        bool include_ref_map) const {
      p += blob_bl.length();
      if (include_ref_map) {
        used_in_blob.bound_encode(p);
        bufferlist::contiguous_appender& p,
        bool include_ref_map) const {
      if (include_ref_map) {
        used_in_blob.encode(p);
        Collection */*coll*/,
        bufferptr::const_iterator& p,
        bool include_ref_map) {
      const char *start = p.get_pos();
      const char *end = p.get_pos();
      blob_bl.append(start, end - start);
      if (include_ref_map) {
        used_in_blob.decode(p);
        bool include_ref_map) const {
      denc(blob, p, struct_v);
      if (blob.is_shared()) {
      if (include_ref_map) {
        used_in_blob.bound_encode(p);
        bufferlist::contiguous_appender& p,
        bool include_ref_map) const {
      denc(blob, p, struct_v);
      if (blob.is_shared()) {
      if (include_ref_map) {
        used_in_blob.encode(p);
        bufferptr::const_iterator& p,
        bool include_ref_map);

  typedef boost::intrusive_ptr<Blob> BlobRef;
  typedef mempool::bluestore_cache_other::map<int, BlobRef> blob_map_t;
  /// a logical extent, pointing to (some portion of) a blob
  typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> >
    ExtentBase; // making an alias to avoid build warnings
  struct Extent : public ExtentBase {
    MEMPOOL_CLASS_HELPERS();

    uint32_t logical_offset = 0; ///< logical offset
    uint32_t blob_offset = 0;    ///< blob offset
    uint32_t length = 0;         ///< length
    BlobRef  blob;               ///< the blob with our data

    /// ctor for lookup only
    explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
    /// ctor for delayed initialization (see decode_some())
    explicit Extent() : ExtentBase() {
    /// ctor for general usage
    Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
      logical_offset(lo), blob_offset(o), length(l) {
      blob->shared_blob->get_cache()->rm_extent();

    void dump(Formatter* f) const;

    void assign_blob(const BlobRef& b) {
      blob->shared_blob->get_cache()->add_extent();

    // comparators for intrusive_set
    friend bool operator<(const Extent &a, const Extent &b) {
      return a.logical_offset < b.logical_offset;
    friend bool operator>(const Extent &a, const Extent &b) {
      return a.logical_offset > b.logical_offset;
    friend bool operator==(const Extent &a, const Extent &b) {
      return a.logical_offset == b.logical_offset;

    uint32_t blob_start() const {
      return logical_offset - blob_offset;
    uint32_t blob_end() const {
      return blob_start() + blob->get_blob().get_logical_length();
    uint32_t logical_end() const {
      return logical_offset + length;

    // return true if any piece of the blob is out of
    // the given range [o, o + l].
    bool blob_escapes_range(uint32_t o, uint32_t l) const {
      return blob_start() < o || blob_end() > o + l;
  typedef boost::intrusive::set<Extent> extent_map_t;

  friend ostream& operator<<(ostream& out, const Extent& e);

    boost::intrusive::list_member_hook<> old_extent_item;
    bool blob_empty; // flag to track the last removed extent that makes the blob
                     // empty - required to update compression stats properly
    OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
      : e(lo, o, l, b), blob_empty(false) {
    static OldExtent* create(CollectionRef c,

  typedef boost::intrusive::list<
    boost::intrusive::member_hook<
      boost::intrusive::list_member_hook<>,
      &OldExtent::old_extent_item> > old_extent_map_t;

  /// a sharded extent map, mapping offsets to lextents to blobs
    extent_map_t extent_map;      ///< map of Extents to Blobs
    blob_map_t spanning_blob_map; ///< blobs that span shards
    typedef boost::intrusive_ptr<Onode> OnodeRef;

      bluestore_onode_t::shard_info *shard_info = nullptr;
      unsigned extents = 0; ///< count extents in this shard
      bool loaded = false;  ///< true if shard is loaded
      bool dirty = false;   ///< true if shard is dirty and needs reencoding
    mempool::bluestore_cache_other::vector<Shard> shards; ///< shards

    bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty

    uint32_t needs_reshard_begin = 0;
    uint32_t needs_reshard_end = 0;

    void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
             uint64_t&, uint64_t&, uint64_t&);

    bool needs_reshard() const {
      return needs_reshard_end > needs_reshard_begin;
    void clear_needs_reshard() {
      needs_reshard_begin = needs_reshard_end = 0;
    void request_reshard(uint32_t begin, uint32_t end) {
      if (begin < needs_reshard_begin) {
        needs_reshard_begin = begin;
      if (end > needs_reshard_end) {
        needs_reshard_end = end;

    struct DeleteDisposer {
      void operator()(Extent *e) { delete e; }

      extent_map.clear_and_dispose(DeleteDisposer());
      extent_map.clear_and_dispose(DeleteDisposer());
      clear_needs_reshard();

    void dump(Formatter* f) const;

    bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
    unsigned decode_some(bufferlist& bl);

    void bound_encode_spanning_blobs(size_t& p);
    void encode_spanning_blobs(bufferlist::contiguous_appender& p);
    void decode_spanning_blobs(bufferptr::const_iterator& p);

    BlobRef get_spanning_blob(int id) {
      auto p = spanning_blob_map.find(id);
      ceph_assert(p != spanning_blob_map.end());

    void update(KeyValueDB::Transaction t, bool force);
    decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
      KeyValueDB::Transaction t);

    /// initialize Shards from the onode
    void init_shards(bool loaded, bool dirty);

    /// return index of shard containing offset
    /// or -1 if not found
    int seek_shard(uint32_t offset) {
      size_t end = shards.size();
      size_t mid, left = 0;
      size_t right = end; // one past the right end

      while (left < right) {
        mid = left + (right - left) / 2;
        if (offset >= shards[mid].shard_info->offset) {
          size_t next = mid + 1;
          if (next >= end || offset < shards[next].shard_info->offset)
          // continue to search forwards
          // continue to search backwards

      return -1; // not found
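      // Illustrative example (hypothetical shard layout, not from the
      // original header): with shards starting at offsets {0, 0x10000,
      // 0x20000}, seek_shard(0x18000) returns 1 and seek_shard(0x20000)
      // returns 2, i.e. the index of the last shard whose start offset is
      // <= the given offset.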
    /// check if a range spans a shard
    bool spans_shard(uint32_t offset, uint32_t length) {
      if (shards.empty()) {
      int s = seek_shard(offset);
      if (s == (int)shards.size() - 1) {
        return false; // last shard
      if (offset + length <= shards[s+1].shard_info->offset) {

    /// ensure that a range of the map is loaded
    void fault_range(KeyValueDB *db,
                     uint32_t offset, uint32_t length);

    /// ensure a range of the map is marked dirty
    void dirty_range(uint32_t offset, uint32_t length);

    /// for seek_lextent test
    extent_map_t::iterator find(uint64_t offset);

    /// seek to the first lextent including or after offset
    extent_map_t::iterator seek_lextent(uint64_t offset);
    extent_map_t::const_iterator seek_lextent(uint64_t offset) const;

    void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
      extent_map.insert(*new Extent(lo, o, l, b));

    /// remove (and delete) an Extent
    void rm(extent_map_t::iterator p) {
      extent_map.erase_and_dispose(p, DeleteDisposer());

    bool has_any_lextents(uint64_t offset, uint64_t length);

    /// consolidate adjacent lextents in extent_map
    int compress_extent_map(uint64_t offset, uint64_t length);

    /// punch a logical hole.  add lextents to deref to target list.
    void punch_hole(CollectionRef &c,
                    uint64_t offset, uint64_t length,
                    old_extent_map_t *old_extents);

    /// put new lextent into lextent_map overwriting existing ones if
    /// any and update references accordingly
    Extent *set_lextent(CollectionRef &c,
                        uint64_t logical_offset,
                        uint64_t offset, uint64_t length,
                        old_extent_map_t *old_extents);

    /// split a blob (and referring extents)
    BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);

  /// Compressed Blob Garbage collector
  /*
    The primary idea of the collector is to estimate the difference between
    the allocation units (AUs) currently occupied by compressed blobs and the
    new AUs required to store that data uncompressed.
    Estimation is performed for protrusive extents within a logical range
    determined by a concatenation of the old_extents collection and the
    specific (current)
    The root cause for using old_extents is the need to handle blob ref counts
    properly. Old extents still hold blob refs and hence we need to traverse
    the collection to determine whether a blob is to be released.
    Protrusive extents are extents that fit into the blob set in action
    (ones that are below the logical range from above) but are not removed
    totally due to the current write.
    extent1 <loffs = 100, boffs = 100, len = 100> ->
      blob1<compressed, len_on_disk=4096, logical_len=8192>
    extent2 <loffs = 200, boffs = 200, len = 100> ->
      blob2<raw, len_on_disk=4096, llen=4096>
    extent3 <loffs = 300, boffs = 300, len = 100> ->
      blob1<compressed, len_on_disk=4096, llen=8192>
    extent4 <loffs = 4096, boffs = 0, len = 100> ->
      blob3<raw, len_on_disk=4096, llen=4096>
    protrusive extents are within the following ranges <0~300, 400~8192-400>
    In this case the existing AUs that might be removed due to GC (i.e. blob1)
    And the new AUs expected after GC = 0 since extent1 is to be merged into
    blob2.
    Hence we should do a collect.
  */
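  // Illustrative restatement of the example above (not part of the original
  // header): blob1 is compressed (len_on_disk = 4096, logical_len = 8192) and
  // is referenced only by extent1 and extent3, so if GC rewrites that data
  // uncompressed alongside the current write, blob1's on-disk AUs become
  // releasable (expected_for_release grows) while no additional AUs are
  // needed for the rewritten data (expected_allocations stays 0); estimate()
  // therefore reports a net saving and the extents are worth collecting.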
  class GarbageCollector
    /// return the number of allocation units that might be saved due to GC
      const ExtentMap& extent_map,
      const old_extent_map_t& old_extents,
      uint64_t min_alloc_size);

    /// return a collection of extents to perform GC on
    const interval_set<uint64_t>& get_extents_to_collect() const {
      return extents_to_collect;
    GarbageCollector(CephContext* _cct) : cct(_cct) {}

      uint64_t referenced_bytes = 0;    ///< amount of bytes referenced in blob
      int64_t expected_allocations = 0; ///< new alloc units required
                                        ///< in case of gc fulfilled
      bool collect_candidate = false;   ///< indicate if blob has any extents
                                        ///< eligible for GC.
      extent_map_t::const_iterator first_lextent; ///< points to the first
                                                  ///< lextent referring to
                                                  ///< the blob if any.
                                                  ///< collect_candidate flag
                                                  ///< determines the validity
      extent_map_t::const_iterator last_lextent;  ///< points to the last
                                                  ///< lextent referring to
                                                  ///< the blob if any.

      BlobInfo(uint64_t ref_bytes) :
        referenced_bytes(ref_bytes) {

    map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
                                         ///< copies that are affected by the

    ///< protrusive extents that should be collected if GC takes place
    interval_set<uint64_t> extents_to_collect;

    boost::optional<uint64_t> used_alloc_unit; ///< last processed allocation
                                               ///< unit when traversing
                                               ///< protrusive extents.
                                               ///< Other extents mapped to
                                               ///< this AU to be ignored
                                               ///< (except the case where
                                               ///< uncompressed extent follows
                                               ///< compressed one - see below).
    BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
                                           ///< caused expected_allocations
                                           ///< counter increment at this blob.
                                           ///< if uncompressed extent follows
                                           ///< a decrement for the
                                           ///< expected_allocations counter

    int64_t expected_allocations = 0; ///< new alloc units required in case
                                      ///< of gc fulfilled
    int64_t expected_for_release = 0; ///< alloc units currently used by
                                      ///< compressed blobs that might

    void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
                                    uint64_t start_offset,
                                    uint64_t end_offset,
                                    uint64_t start_touch_offset,
                                    uint64_t end_touch_offset,
                                    uint64_t min_alloc_size);
  struct OnodeCacheShard;

  /// an in-memory object
    MEMPOOL_CLASS_HELPERS();

    // Not persisted; updated on cache insertion/removal
    bool pinned = false; // Only to be used by the onode cache shard

    std::atomic_int nref; ///< reference count

    /// key under PREFIX_OBJ where we are stored
    mempool::bluestore_cache_other::string key;

    boost::intrusive::list_member_hook<> lru_item, pin_item;

    bluestore_onode_t onode; ///< metadata stored as value in kv store
    bool exists;             ///< true if object logically exists

    ExtentMap extent_map;

    // track txc's that have not been committed to kv store (and whose
    // effects cannot be read via the kvdb read methods)
    std::atomic<int> flushing_count = {0};
    std::atomic<int> waiting_count = {0};
    /// protect flush_txns
    ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
    ceph::condition_variable flush_cond; ///< wait here for uncommitted txns

    Onode(Collection *c, const ghobject_t& o,
          const mempool::bluestore_cache_other::string& k)
    Onode(Collection* c, const ghobject_t& o,
    Onode(Collection* c, const ghobject_t& o,

    static Onode* decode(
      const ghobject_t& oid,
      const bufferlist& v);

    void dump(Formatter* f) const;

      if (++nref == 2 && s != nullptr) {
      if (n == 1 && s != nullptr) {

    const string& get_omap_prefix();
    void get_omap_header(string *out);
    void get_omap_key(const string& key, string *out);
    void rewrite_omap_key(const string& old, string *out);
    void get_omap_tail(string *out);
    void decode_omap_key(const string& key, string *user_key);
  typedef boost::intrusive_ptr<Onode> OnodeRef;
  /// A generic Cache Shard
    PerfCounters *logger;

    /// protect lru and other structures
    ceph::recursive_mutex lock = {
      ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };

    std::atomic<uint64_t> max = {0};
    std::atomic<uint64_t> num = {0};

    CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
    virtual ~CacheShard() {}

    void set_max(uint64_t max_) {
    uint64_t _get_num() {

    virtual void _trim_to(uint64_t new_size) = 0;
      if (cct->_conf->objectstore_blackhole) {
        // do not trim if we are throwing away IOs a layer down
      std::lock_guard l(lock);
      std::lock_guard l(lock);
      // we should not be shutting down after the blackhole is enabled
      assert(!cct->_conf->objectstore_blackhole);

    virtual void _audit(const char *s) = 0;
    void _audit(const char *s) { /* no-op */ }

  /// A generic onode Cache Shard
  struct OnodeCacheShard : public CacheShard {
    std::atomic<uint64_t> num_pinned = {0};
    std::array<std::pair<ghobject_t, mono_clock::time_point>, 64> dumped_onodes;

    OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
    static OnodeCacheShard *create(CephContext* cct, string type,
                                   PerfCounters *logger);
    virtual void _add(OnodeRef& o, int level) = 0;
    virtual void _rm(OnodeRef& o) = 0;
    virtual void _touch(OnodeRef& o) = 0;
    virtual void _pin(Onode& o) = 0;
    virtual void _unpin(Onode& o) = 0;

    void pin(Onode& o) {
      std::lock_guard l(lock);
    void unpin(Onode& o) {
      std::lock_guard l(lock);

    virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
      return _get_num() == 0;

  /// A generic buffer Cache Shard
  struct BufferCacheShard : public CacheShard {
    std::atomic<uint64_t> num_extents = {0};
    std::atomic<uint64_t> num_blobs = {0};
    uint64_t buffer_bytes = 0;

    BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
    static BufferCacheShard *create(CephContext* cct, string type,
                                    PerfCounters *logger);
    virtual void _add(Buffer *b, int level, Buffer *near) = 0;
    virtual void _rm(Buffer *b) = 0;
    virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
    virtual void _touch(Buffer *b) = 0;
    virtual void _adjust_size(Buffer *b, int64_t delta) = 0;

    uint64_t _get_bytes() {
      return buffer_bytes;

    virtual void add_stats(uint64_t *extents,
                           uint64_t *bytes) = 0;

      std::lock_guard l(lock);
      return _get_bytes() == 0;
    OnodeCacheShard *cache;

    mempool::bluestore_cache_other::unordered_map<ghobject_t, OnodeRef> onode_map;

    friend class Collection; // for split_cache()

    OnodeSpace(OnodeCacheShard *c) : cache(c) {}

    OnodeRef add(const ghobject_t& oid, OnodeRef o);
    OnodeRef lookup(const ghobject_t& o);
    void remove(const ghobject_t& oid) {
      onode_map.erase(oid);
    void rename(OnodeRef& o, const ghobject_t& old_oid,
                const ghobject_t& new_oid,
                const mempool::bluestore_cache_other::string& new_okey);

    template <int LogLevelV>
    void dump(CephContext *cct);

    /// return true if f true for any item
    bool map_any(std::function<bool(OnodeRef)> f);

  using OpSequencerRef = ceph::ref_t<OpSequencer>;

  struct Collection : public CollectionImpl {
    BufferCacheShard *cache; ///< our cache shard
    bluestore_cnode_t cnode;
    ceph::shared_mutex lock =
      ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);

    SharedBlobSet shared_blob_set; ///< open SharedBlobs

    // cache onodes on a per-collection basis to avoid lock
    OnodeSpace onode_map;

    pool_opts_t pool_opts;
    ContextQueue *commit_queue;

    OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);

    // the terminology is confusing here, sorry!
    //  blob_t     shared_blob_t
    //  !shared    unused                -> open
    //  shared     !loaded               -> open + shared
    //  shared     loaded                -> open + shared + loaded
    //  open   = SharedBlob is instantiated
    //  shared = blob_t shared flag is set; SharedBlob is hashed.
    //  loaded = SharedBlob::shared_blob_t is loaded from kv store
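    // Illustrative walk-through of the states above (derived from the method
    // names below; not an exhaustive spec): new_blob() yields an "open"
    // SharedBlob; make_blob_shared() sets the blob_t shared flag and hashes
    // the SharedBlob ("open + shared"); load_shared_blob() then reads the
    // persistent bluestore_shared_blob_t from the kv store
    // ("open + shared + loaded").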
    void open_shared_blob(uint64_t sbid, BlobRef b);
    void load_shared_blob(SharedBlobRef sb);
    void make_blob_shared(uint64_t sbid, BlobRef b);
    uint64_t make_blob_unshared(SharedBlob *sb);

    BlobRef new_blob() {
      BlobRef b = new Blob();
      b->shared_blob = new SharedBlob(this);

    bool contains(const ghobject_t& oid) {
        return oid.hobj.pool == -1;
      if (cid.is_pg(&spgid))
          spgid.pgid.contains(cnode.bits, oid) &&
          oid.shard_id == spgid.shard;

    int64_t pool() const {

    void split_cache(Collection *dest);

    bool flush_commit(Context *c) override;
    void flush() override;
    void flush_all_but_last();

    Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);

  class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
    KeyValueDB::Iterator it;
    string _stringify() const;

    OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
    int seek_to_first() override;
    int upper_bound(const string &after) override;
    int lower_bound(const string &to) override;
    bool valid() override;
    int next() override;
    string key() override;
    bufferlist value() override;
    std::string tail_key() {
    int status() override {
  struct volatile_statfs {
      STATFS_ALLOCATED = 0,
      STATFS_COMPRESSED_ORIGINAL,
      STATFS_COMPRESSED_ALLOCATED,
    int64_t values[STATFS_LAST];
      memset(this, 0, sizeof(volatile_statfs));
      *this = volatile_statfs();
    void publish(store_statfs_t* buf) const {
      buf->allocated = allocated();
      buf->data_stored = stored();
      buf->data_compressed = compressed();
      buf->data_compressed_original = compressed_original();
      buf->data_compressed_allocated = compressed_allocated();

    volatile_statfs& operator+=(const volatile_statfs& other) {
      for (size_t i = 0; i < STATFS_LAST; ++i) {
        values[i] += other.values[i];

    int64_t& allocated() {
      return values[STATFS_ALLOCATED];
      return values[STATFS_STORED];
    int64_t& compressed_original() {
      return values[STATFS_COMPRESSED_ORIGINAL];
    int64_t& compressed() {
      return values[STATFS_COMPRESSED];
    int64_t& compressed_allocated() {
      return values[STATFS_COMPRESSED_ALLOCATED];
    int64_t allocated() const {
      return values[STATFS_ALLOCATED];
    int64_t stored() const {
      return values[STATFS_STORED];
    int64_t compressed_original() const {
      return values[STATFS_COMPRESSED_ORIGINAL];
    int64_t compressed() const {
      return values[STATFS_COMPRESSED];
    int64_t compressed_allocated() const {
      return values[STATFS_COMPRESSED_ALLOCATED];

    volatile_statfs& operator=(const store_statfs_t& st) {
      values[STATFS_ALLOCATED] = st.allocated;
      values[STATFS_STORED] = st.data_stored;
      values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
      values[STATFS_COMPRESSED] = st.data_compressed;
      values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;

      return values[STATFS_ALLOCATED] == 0 &&
             values[STATFS_STORED] == 0 &&
             values[STATFS_COMPRESSED] == 0 &&
             values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
             values[STATFS_COMPRESSED_ALLOCATED] == 0;

    void decode(bufferlist::const_iterator& it) {
      for (size_t i = 0; i < STATFS_LAST; i++) {
        decode(values[i], it);
    void encode(bufferlist& bl) {
      for (size_t i = 0; i < STATFS_LAST; i++) {
        encode(values[i], bl);
  struct TransContext final : public AioContext {
    MEMPOOL_CLASS_HELPERS();

      STATE_KV_QUEUED,        // queued for kv_sync_thread submission
      STATE_KV_SUBMITTED,     // submitted to kv; not yet synced
      STATE_DEFERRED_QUEUED,  // in deferred_queue (pending or running)
      STATE_DEFERRED_CLEANUP, // remove deferred kv record
      STATE_DEFERRED_DONE,

    state_t state = STATE_PREPARE;

    const char *get_state_name() {
      case STATE_PREPARE: return "prepare";
      case STATE_AIO_WAIT: return "aio_wait";
      case STATE_IO_DONE: return "io_done";
      case STATE_KV_QUEUED: return "kv_queued";
      case STATE_KV_SUBMITTED: return "kv_submitted";
      case STATE_KV_DONE: return "kv_done";
      case STATE_DEFERRED_QUEUED: return "deferred_queued";
      case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
      case STATE_DEFERRED_DONE: return "deferred_done";
      case STATE_FINISHING: return "finishing";
      case STATE_DONE: return "done";

#if defined(WITH_LTTNG)
    const char *get_state_latency_name(int state) {
      case l_bluestore_state_prepare_lat: return "prepare";
      case l_bluestore_state_aio_wait_lat: return "aio_wait";
      case l_bluestore_state_io_done_lat: return "io_done";
      case l_bluestore_state_kv_queued_lat: return "kv_queued";
      case l_bluestore_state_kv_committing_lat: return "kv_committing";
      case l_bluestore_state_kv_done_lat: return "kv_done";
      case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
      case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
      case l_bluestore_state_finishing_lat: return "finishing";
      case l_bluestore_state_done_lat: return "done";

    OpSequencerRef osr; // this should be ch->osr
    boost::intrusive::list_member_hook<> sequencer_item;

    uint64_t bytes = 0, ios = 0, cost = 0;

    set<OnodeRef> onodes;            ///< these need to be updated/written
    set<OnodeRef> modified_objects;  ///< objects we modified (and need a ref)
    set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
    set<SharedBlobRef> shared_blobs_written; ///< update these on io completion

    KeyValueDB::Transaction t; ///< then we will commit this
    list<Context*> oncommits;  ///< more commit completions
    list<CollectionRef> removed_collections; ///< colls we removed

    boost::intrusive::list_member_hook<> deferred_queue_item;
    bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any

    interval_set<uint64_t> allocated, released;
    volatile_statfs statfs_delta;        ///< overall store statistics delta
    uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on

    bool had_ios = false; ///< true if we submitted IOs before our kv txn

    mono_clock::time_point start;
    mono_clock::time_point last_stamp;

    uint64_t last_nid = 0;    ///< if non-zero, highest new nid we allocated
    uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated

#if defined(WITH_LTTNG)
    bool tracing = false;

    explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
                          list<Context*> *on_commits)
        start(mono_clock::now()) {
        oncommits.swap(*on_commits);
      delete deferred_txn;

    void write_onode(OnodeRef &o) {
    void write_shared_blob(SharedBlobRef &sb) {
      shared_blobs.insert(sb);
    void unshare_blob(SharedBlob *sb) {
      shared_blobs.erase(sb);

    /// note we logically modified object (when onode itself is unmodified)
    void note_modified_object(OnodeRef &o) {
      // onode itself isn't written, though
      modified_objects.insert(o);
    void note_removed_object(OnodeRef& o) {
      modified_objects.insert(o);

    void aio_finish(BlueStore *store) override {
      store->txc_aio_finish(this);
  class BlueStoreThrottle {
#if defined(WITH_LTTNG)
    const std::chrono::time_point<mono_clock> time_base = mono_clock::now();

    // Time of last chosen io (microseconds)
    std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
    std::atomic<uint64_t> ios_started_since_last_traced = {0};
    std::atomic<uint64_t> ios_completed_since_last_traced = {0};

    std::atomic_uint pending_kv_ios = {0};
    std::atomic_uint pending_deferred_ios = {0};

    // Min period between trace points (microseconds)
    std::atomic<uint64_t> trace_period_mcs = {0};

      uint64_t *completed) {
      uint64_t min_period_mcs = trace_period_mcs.load(
        std::memory_order_relaxed);

      if (min_period_mcs == 0) {
        *completed = ios_completed_since_last_traced.exchange(0);
      ios_started_since_last_traced++;
      auto now_mcs = ceph::to_microseconds<uint64_t>(
        mono_clock::now() - time_base);
      uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
      uint64_t period_mcs = now_mcs - previous_mcs;
      if (period_mcs > min_period_mcs) {
        if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
              previous_mcs, now_mcs)) {
          // This would be racy at a sufficiently extreme trace rate, but isn't
          // worth the overhead of doing it more carefully.
          *started = ios_started_since_last_traced.exchange(0);
          *completed = ios_completed_since_last_traced.exchange(0);

#if defined(WITH_LTTNG)
    void emit_initial_tracepoint(
      mono_clock::time_point);
    void emit_initial_tracepoint(
      mono_clock::time_point) {}

    Throttle throttle_bytes;          ///< submit to commit
    Throttle throttle_deferred_bytes; ///< submit to deferred complete

    BlueStoreThrottle(CephContext *cct) :
      throttle_bytes(cct, "bluestore_throttle_bytes", 0),
      throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
      reset_throttle(cct->_conf);

#if defined(WITH_LTTNG)
    void complete_kv(TransContext &txc);
    void complete(TransContext &txc);
    void complete_kv(TransContext &txc) {}
    void complete(TransContext &txc) {}

    mono_clock::duration log_state_latency(
      TransContext &txc, PerfCounters *logger, int state);
    bool try_start_transaction(
      mono_clock::time_point);
    void finish_start_transaction(
      mono_clock::time_point);
    void release_kv_throttle(uint64_t cost) {
      throttle_bytes.put(cost);
    void release_deferred_throttle(uint64_t cost) {
      throttle_deferred_bytes.put(cost);
    bool should_submit_deferred() {
      return throttle_deferred_bytes.past_midpoint();
    void reset_throttle(const ConfigProxy &conf) {
      throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
      throttle_deferred_bytes.reset_max(
        conf->bluestore_throttle_bytes +
        conf->bluestore_throttle_deferred_bytes);
#if defined(WITH_LTTNG)
      double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
      trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
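      // Worked example (arithmetic only, not from the original header):
      // bluestore_throttle_trace_rate = 2.0 gives
      // floor((1/2.0) * 1000000.0) = 500000 microseconds, i.e. at most one
      // emitted trace point every 0.5 seconds; a rate of 0 leaves
      // trace_period_mcs at 0, which the check above treats as
      // "no minimum period".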
  typedef boost::intrusive::list<
    boost::intrusive::member_hook<
      boost::intrusive::list_member_hook<>,
      &TransContext::deferred_queue_item> > deferred_queue_t;

  struct DeferredBatch final : public AioContext {
    struct deferred_io {
      bufferlist bl; ///< data
      uint64_t seq;  ///< deferred transaction seq
    map<uint64_t, deferred_io> iomap; ///< map of ios in this batch
    deferred_queue_t txcs;            ///< txcs in this batch
    IOContext ioc;                    ///< our aios
    /// bytes of pending io for each deferred seq (may be 0)
    map<uint64_t, int> seq_bytes;

    void _discard(CephContext *cct, uint64_t offset, uint64_t length);
    void _audit(CephContext *cct);

    DeferredBatch(CephContext *cct, OpSequencer *osr)
      : osr(osr), ioc(cct, this) {}

    void prepare_write(CephContext *cct,
                       uint64_t seq, uint64_t offset, uint64_t length,
                       bufferlist::const_iterator& p);

    void aio_finish(BlueStore *store) override {
      store->_deferred_aio_finish(osr);
  class OpSequencer : public RefCountedObject {
    ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
    ceph::condition_variable qcond;
    typedef boost::intrusive::list<
      boost::intrusive::member_hook<
        boost::intrusive::list_member_hook<>,
        &TransContext::sequencer_item> > q_list_t;
    q_list_t q; ///< transactions

    boost::intrusive::list_member_hook<> deferred_osr_queue_item;

    DeferredBatch *deferred_running = nullptr;
    DeferredBatch *deferred_pending = nullptr;

    uint64_t last_seq = 0;

    std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
    std::atomic_int kv_committing_serially = {0};
    std::atomic_int kv_submitted_waiters = {0};

    std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)

    const uint32_t sequencer_id;

    uint32_t get_sequencer_id() const {
      return sequencer_id;

    void queue_new(TransContext *txc) {
      std::lock_guard l(qlock);
      txc->seq = ++last_seq;

      std::unique_lock l(qlock);

    void drain_preceding(TransContext *txc) {
      std::unique_lock l(qlock);
      while (&q.front() != txc)

    bool _is_all_kv_submitted() {
      // caller must hold qlock; q must not be empty
      ceph_assert(!q.empty());
      TransContext *txc = &q.back();
      if (txc->state >= TransContext::STATE_KV_SUBMITTED) {

      std::unique_lock l(qlock);
      // set flag before the check because the condition
      // may become true outside qlock, and we need to make
      // sure those threads see waiters and signal qcond.
      ++kv_submitted_waiters;
      if (q.empty() || _is_all_kv_submitted()) {
        --kv_submitted_waiters;
      --kv_submitted_waiters;

    void flush_all_but_last() {
      std::unique_lock l(qlock);
      assert(q.size() >= 1);
      // set flag before the check because the condition
      // may become true outside qlock, and we need to make
      // sure those threads see waiters and signal qcond.
      ++kv_submitted_waiters;
      if (q.size() <= 1) {
        --kv_submitted_waiters;
      auto it = q.rbegin();
      if (it->state >= TransContext::STATE_KV_SUBMITTED) {
        --kv_submitted_waiters;
      --kv_submitted_waiters;

    bool flush_commit(Context *c) {
      std::lock_guard l(qlock);
      TransContext *txc = &q.back();
      if (txc->state >= TransContext::STATE_KV_DONE) {
      txc->oncommits.push_back(c);

    FRIEND_MAKE_REF(OpSequencer);
    OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
      : RefCountedObject(store->cct),
        store(store), cid(c), sequencer_id(sequencer_id) {
      ceph_assert(q.empty());

  typedef boost::intrusive::list<
    boost::intrusive::member_hook<
      boost::intrusive::list_member_hook<>,
      &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
  struct KVSyncThread : public Thread {
    explicit KVSyncThread(BlueStore *s) : store(s) {}
    void *entry() override {
      store->_kv_sync_thread();

  struct KVFinalizeThread : public Thread {
    explicit KVFinalizeThread(BlueStore *s) : store(s) {}
      store->_kv_finalize_thread();

  struct DBHistogram {
    map<int, struct value_dist> val_map; ///< slab id to count, max length of value and key

    map<string, map<int, struct key_dist> > key_hist;
    map<int, uint64_t> value_hist;
    int get_key_slab(size_t sz);
    string get_key_slab_to_range(int slab);
    int get_value_slab(size_t sz);
    string get_value_slab_to_range(int slab);
    void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
                           const string &prefix, size_t key_size, size_t value_size);
    void dump(Formatter *f);
  // --------------------------------------------------------
  BlueFS *bluefs = nullptr;
  bluefs_layout_t bluefs_layout;
  mono_time bluefs_last_balance;
  utime_t next_dump_on_bluefs_alloc_failure;

  KeyValueDB *db = nullptr;
  BlockDevice *bdev = nullptr;
  std::string freelist_type;
  FreelistManager *fm = nullptr;
  Allocator *alloc = nullptr;

  int path_fd = -1;  ///< open handle to $path
  int fsid_fd = -1;  ///< open handle (locked) to $path/fsid
  bool mounted = false;

  ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock");  ///< rwlock to protect coll_map
  mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
  bool collections_had_errors = false;
  map<coll_t, CollectionRef> new_coll_map;

  vector<OnodeCacheShard*> onode_cache_shards;
  vector<BufferCacheShard*> buffer_cache_shards;

  /// protect zombie_osr_set
  ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
  uint32_t next_sequencer_id = 0;
  std::map<coll_t, OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections

  std::atomic<uint64_t> nid_last = {0};
  std::atomic<uint64_t> nid_max = {0};
  std::atomic<uint64_t> blobid_last = {0};
  std::atomic<uint64_t> blobid_max = {0};

  interval_set<uint64_t> bluefs_extents;            ///< block extents owned by bluefs
  interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming

  ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
  std::atomic<uint64_t> deferred_seq = {0};
  deferred_osr_queue_t deferred_queue;  ///< osr's with deferred io pending
  int deferred_queue_size = 0;          ///< num txc's queued across all osrs
  atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
  utime_t deferred_last_submitted = utime_t();

  KVSyncThread kv_sync_thread;
  ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
  ceph::condition_variable kv_cond;
  bool _kv_only = false;
  bool kv_sync_started = false;
  bool kv_stop = false;
  bool kv_finalize_started = false;
  bool kv_finalize_stop = false;
  deque<TransContext*> kv_queue;             ///< ready, already submitted
  deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
  deque<TransContext*> kv_committing;        ///< currently syncing
  deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
  bool kv_sync_in_progress = false;

  KVFinalizeThread kv_finalize_thread;
  ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
  ceph::condition_variable kv_finalize_cond;
  deque<TransContext*> kv_committing_to_finalize;    ///< pending finalization
  deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
  bool kv_finalize_in_progress = false;

  PerfCounters *logger = nullptr;

  list<CollectionRef> removed_collections;

  ceph::shared_mutex debug_read_error_lock =
    ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
  set<ghobject_t> debug_data_error_objects;
  set<ghobject_t> debug_mdata_error_objects;

  std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};

  uint64_t block_size = 0;     ///< block size of block device (power of 2)
  uint64_t block_mask = 0;     ///< mask to get just the block offset
  size_t block_size_order = 0; ///< bits to shift to get block size

  uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
  ///< bits for min_alloc_size
  uint8_t min_alloc_size_order = 0;
  static_assert(std::numeric_limits<uint8_t>::max() >
                std::numeric_limits<decltype(min_alloc_size)>::digits,
                "not enough bits for min_alloc_size");

  bool per_pool_omap = false;

  ///< maximum allocation unit (power of 2)
  std::atomic<uint64_t> max_alloc_size = {0};

  ///< number threshold for forced deferred writes
  std::atomic<int> deferred_batch_ops = {0};

  ///< size threshold for forced deferred writes
  std::atomic<uint64_t> prefer_deferred_size = {0};

  ///< approx cost per io, in bytes
  std::atomic<uint64_t> throttle_cost_per_io = {0};

  std::atomic<Compressor::CompressionMode> comp_mode =
    {Compressor::COMP_NONE}; ///< compression mode
  CompressorRef compressor;
  std::atomic<uint64_t> comp_min_blob_size = {0};
  std::atomic<uint64_t> comp_max_blob_size = {0};

  std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size

  uint64_t kv_ios = 0;
  uint64_t kv_throttle_costs = 0;

  // cache trim control
  uint64_t cache_size = 0;     ///< total cache size
  double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
  double cache_kv_ratio = 0;   ///< cache ratio dedicated to kv (e.g., rocksdb)
  double cache_data_ratio = 0; ///< cache ratio dedicated to object data
  bool cache_autotune = false; ///< cache autotune setting
  double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
  uint64_t osd_memory_target = 0;     ///< OSD memory target when autotuning cache
  uint64_t osd_memory_base = 0;       ///< OSD base memory when autotuning cache
  double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
  uint64_t osd_memory_cache_min = 0;  ///< Min memory to assign when autotuning cache
  double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
  double max_defer_interval = 0; ///< Time to wait between last deferred submit
  std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.

  typedef map<uint64_t, volatile_statfs> osd_pools_map;

  ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
  volatile_statfs vstatfs;
  osd_pools_map osd_pools; // protected by vstatfs_lock as well

  bool per_pool_stat_collection = true;
  struct MempoolThread : public Thread {
  public:
    BlueStore *store;

    ceph::condition_variable cond;
    ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
    bool stop = false;
    std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
    std::shared_ptr<PriorityCache::Manager> pcm = nullptr;

    struct MempoolCache : public PriorityCache::PriCache {
      BlueStore *store;
      int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
      int64_t committed_bytes = 0;
      double cache_ratio = 0;

      MempoolCache(BlueStore *s) : store(s) {};

      virtual uint64_t _get_used_bytes() const = 0;

      virtual int64_t request_cache_bytes(
          PriorityCache::Priority pri, uint64_t total_cache) const {
        int64_t assigned = get_cache_bytes(pri);

        switch (pri) {
        // All cache items are currently shoved into the PRI1 priority
        case PriorityCache::Priority::PRI1:
          {
            int64_t request = _get_used_bytes();
            return (request > assigned) ? request - assigned : 0;
          }
        default:
          break;
        }
        return -EOPNOTSUPP;
      }

      virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
        return cache_bytes[pri];
      }
      virtual int64_t get_cache_bytes() const {
        int64_t total = 0;
        for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
          PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
          total += get_cache_bytes(pri);
        }
        return total;
      }
      virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
        cache_bytes[pri] = bytes;
      }
      virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
        cache_bytes[pri] += bytes;
      }
      virtual int64_t commit_cache_size(uint64_t total_cache) {
        committed_bytes = PriorityCache::get_chunk(
          get_cache_bytes(), total_cache);
        return committed_bytes;
      }
      virtual int64_t get_committed_size() const {
        return committed_bytes;
      }
      virtual double get_cache_ratio() const {
        return cache_ratio;
      }
      virtual void set_cache_ratio(double ratio) {
        cache_ratio = ratio;
      }
      virtual string get_cache_name() const = 0;
    };

    struct MetaCache : public MempoolCache {
      MetaCache(BlueStore *s) : MempoolCache(s) {};

      virtual uint64_t _get_used_bytes() const {
        return mempool::bluestore_cache_other::allocated_bytes() +
          mempool::bluestore_cache_onode::allocated_bytes();
      }

      virtual string get_cache_name() const {
        return "BlueStore Meta Cache";
      }

      uint64_t _get_num_onodes() const {
        uint64_t onode_num =
          mempool::bluestore_cache_onode::allocated_items();
        return (2 > onode_num) ? 2 : onode_num;
      }

      double get_bytes_per_onode() const {
        return (double)_get_used_bytes() / (double)_get_num_onodes();
      }
    };
    std::shared_ptr<MetaCache> meta_cache;

    struct DataCache : public MempoolCache {
      DataCache(BlueStore *s) : MempoolCache(s) {};

      virtual uint64_t _get_used_bytes() const {
        uint64_t bytes = 0;
        for (auto i : store->buffer_cache_shards) {
          bytes += i->_get_bytes();
        }
        return bytes;
      }
      virtual string get_cache_name() const {
        return "BlueStore Data Cache";
      }
    };
    std::shared_ptr<DataCache> data_cache;

  public:
    explicit MempoolThread(BlueStore *s)
      : store(s),
        meta_cache(new MetaCache(s)),
        data_cache(new DataCache(s)) {}

    void *entry() override;
    void init() {
      ceph_assert(stop == false);
      create("bstore_mempool");
    }

  private:
    void _adjust_cache_settings();
    void _update_cache_settings();
    void _resize_shards(bool interval_stats);
  } mempool_thread;
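
  // Hedged sketch of the PRI1 accounting above (illustrative only, hypothetical
  // numbers): a cache asks for the difference between what it currently uses
  // and what it has already been assigned, so a MetaCache using 700 MiB that
  // was assigned 512 MiB requests another 188 MiB on the next balancing pass:
  //
  //   int64_t assigned = meta_cache->get_cache_bytes(PriorityCache::Priority::PRI1);
  //   int64_t used     = meta_cache->_get_used_bytes();
  //   int64_t request  = used > assigned ? used - assigned : 0;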
  // --------------------------------------------------------

  void _init_logger();
  void _shutdown_logger();
  int _reload_logger();

  int _open_fsid(bool create);
  int _read_fsid(uuid_d *f);

  void _set_alloc_sizes();
  void _set_blob_size();
  void _set_finisher_num();
  void _set_per_pool_omap();
  void _update_osd_memory_options();

  int _open_bdev(bool create);
  // Verifies if disk space is enough for reserved + min bluefs
  // and alters the latter if needed.
  // Depends on min_alloc_size hence should be called after
  // its initialization (and outside of _open_bdev)
  void _validate_bdev();

  int _minimal_open_bluefs(bool create);
  void _minimal_close_bluefs();
  int _open_bluefs(bool create);
  void _close_bluefs(bool cold_close);

  // Limited (u)mount intended for BlueFS operations only
  int _mount_for_bluefs();
  void _umount_for_bluefs();

  int _is_bluefs(bool create, bool* ret);

  /*
   * opens both DB and dependant super_meta, FreelistManager and allocator
   * in the proper order
   */
  int _open_db_and_around(bool read_only);
  void _close_db_and_around(bool read_only);

  // updates legacy bluefs related recs in DB to a state valid for
  // downgrades from nautilus.
  void _sync_bluefs_and_fm();

  /*
   * @warning to_repair_db means that we open this db to repair it, will not
   * hold the rocksdb's file lock.
   */
  int _open_db(bool create,
               bool to_repair_db = false,
               bool read_only = false);
  void _close_db(bool read_only);
  int _open_fm(KeyValueDB::Transaction t, bool read_only);
  int _write_out_fm_meta(uint64_t target_size,
                         bool update_root_size = false,
                         bluestore_bdev_label_t* res_label = nullptr);
  void _close_alloc();
  int _open_collections();
  void _fsck_collections(int64_t* errors);
  void _close_collections();

  int _setup_block_symlink_or_file(string name, string path, uint64_t size,
                                   bool create);

  utime_t get_deferred_last_submitted() {
    std::lock_guard l(deferred_lock);
    return deferred_last_submitted;
  }

  static int _write_bdev_label(CephContext* cct,
                               string path, bluestore_bdev_label_t label);
  static int _read_bdev_label(CephContext* cct, string path,
                              bluestore_bdev_label_t *label);
  int _check_or_set_bdev_label(string path, uint64_t size, string desc,
                               bool create);

  int _open_super_meta();

  void _open_statfs();
  void _get_statfs_overall(struct store_statfs_t *buf);

  void _dump_alloc_on_failure();

  int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
  int _balance_bluefs_freespace();

  CollectionRef _get_collection(const coll_t& cid);
  void _queue_reap_collection(CollectionRef& c);
  void _reap_collections();
  void _update_cache_logger();

  void _assign_nid(TransContext *txc, OnodeRef o);
  uint64_t _assign_blobid(TransContext *txc);
  template <int LogLevelV>
  friend void _dump_onode(CephContext *cct, const Onode& o);
  template <int LogLevelV>
  friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
  template <int LogLevelV>
  friend void _dump_transaction(CephContext *cct, Transaction *t);

  TransContext *_txc_create(Collection *c, OpSequencer *osr,
                            list<Context*> *on_commits);
  void _txc_update_store_statfs(TransContext *txc);
  void _txc_add_transaction(TransContext *txc, Transaction *t);
  void _txc_calc_cost(TransContext *txc);
  void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
  void _txc_state_proc(TransContext *txc);
  void _txc_aio_submit(TransContext *txc);
  void txc_aio_finish(void *p) {
    _txc_state_proc(static_cast<TransContext*>(p));
  }
  void _txc_finish_io(TransContext *txc);
  void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
  void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
  void _txc_committed_kv(TransContext *txc);
  void _txc_finish(TransContext *txc);
  void _txc_release_alloc(TransContext *txc);

  void _osr_attach(Collection *c);
  void _osr_register_zombie(OpSequencer *osr);
  void _osr_drain(OpSequencer *osr);
  void _osr_drain_preceding(TransContext *txc);
  void _osr_drain_all();
  void _kv_sync_thread();
  void _kv_finalize_thread();

  bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
  void _deferred_queue(TransContext *txc);
  void deferred_try_submit();
  void _deferred_submit_unlock(OpSequencer *osr);
  void _deferred_aio_finish(OpSequencer *osr);
  int _deferred_replay();

  using mempool_dynamic_bitset =
    boost::dynamic_bitset<uint64_t,
                          mempool::bluestore_fsck::pool_allocator<uint64_t>>;
  using per_pool_statfs =
    mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;

  enum {
    MAX_FSCK_ERROR_LINES = 100,
  };

  int _fsck_check_extents(
    const ghobject_t& oid,
    const PExtentVector& extents,
    mempool_dynamic_bitset &used_blocks,
    uint64_t granularity,
    BlueStoreRepairer* repairer,
    store_statfs_t& expected_statfs);

  void _fsck_check_pool_statfs(
    per_pool_statfs& expected_pool_statfs,
    BlueStoreRepairer* repairer);

  int _fsck(FSCKDepth depth, bool repair);
  int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
  void _buffer_cache_write(
    TransContext *txc,
    BlobRef b,
    uint64_t offset,
    bufferlist& bl,
    unsigned flags) {
    b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
                             flags);
    txc->shared_blobs_written.insert(b->shared_blob);
  }
  int _collection_list(
    Collection *c, const ghobject_t& start, const ghobject_t& end,
    int max, vector<ghobject_t> *ls, ghobject_t *next);
  template <typename T, typename F>
  T select_option(const std::string& opt_name, T val1, F f) {
    //NB: opt_name reserved for future use
    boost::optional<T> val2 = f();
    if (val2) {
      return *val2;
    }
    return val1;
  }
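
  // Illustrative usage sketch (assumed caller, not part of this header): the
  // functor returns a per-pool override wrapped in boost::optional<T>;
  // select_option() prefers it and falls back to the global value otherwise:
  //
  //   uint64_t blob_size = select_option(
  //     "compression_max_blob_size", global_default,
  //     [&]() -> boost::optional<uint64_t> {
  //       int64_t val;
  //       if (pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val))
  //         return boost::optional<uint64_t>((uint64_t)val);
  //       return boost::optional<uint64_t>();
  //     });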
  void _apply_padding(uint64_t head_pad,
                      uint64_t tail_pad,
                      bufferlist& padded);

  void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);

  // -- ondisk version ---
  const int32_t latest_ondisk_format = 4;        ///< our version
  const int32_t min_readable_ondisk_format = 1;  ///< what we can read
  const int32_t min_compat_ondisk_format = 3;    ///< who can read us

  int32_t ondisk_format = 0;  ///< value detected on mount

  int _upgrade_super();  ///< upgrade (called during open_super)
  uint64_t _get_ondisk_reserved() const;
  void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
  // --- public interface ---
public:
  BlueStore(CephContext *cct, const string& path);
  BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
  ~BlueStore() override;

  string get_type() override {
    return "bluestore";
  }

  bool needs_journal() override { return false; };
  bool wants_journal() override { return false; };
  bool allows_journal() override { return false; };

  uint64_t get_min_alloc_size() const override {
    return min_alloc_size;
  }
  int get_devices(set<string> *ls) override;

  bool is_rotational() override;
  bool is_journal_rotational() override;

  string get_default_device_class() override {
    string device_class;
    map<string, string> metadata;
    collect_metadata(&metadata);
    auto it = metadata.find("bluestore_bdev_type");
    if (it != metadata.end()) {
      device_class = it->second;
    }
    return device_class;
  }

  int get_numa_node(
    int *numa_node,
    set<int> *nodes,
    set<string> *failed) override;
  static int get_block_device_fsid(CephContext* cct, const string& path,
                                   uuid_d *fsid);

  bool test_mount_in_use() override;

  int _mount(bool kv_only, bool open_db = true);

  int mount() override {
    return _mount(false);
  }
  int umount() override;

  int start_kv_only(KeyValueDB **pdb, bool open_db = true) {
    int r = _mount(true, open_db);
    if (r < 0)
      return r;
    *pdb = db;
    return 0;
  }

  int write_meta(const std::string& key, const std::string& value) override;
  int read_meta(const std::string& key, std::string *value) override;

  int fsck(bool deep) override {
    return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
  }
  int repair(bool deep) override {
    return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
  }
  int quick_fix() override {
    return _fsck(FSCK_SHALLOW, true);
  }
  void set_cache_shards(unsigned num) override;
  void dump_cache_stats(Formatter *f) override {
    int onode_count = 0, buffers_bytes = 0;
    for (auto i: onode_cache_shards) {
      onode_count += i->_get_num();
    }
    for (auto i: buffer_cache_shards) {
      buffers_bytes += i->_get_bytes();
    }
    f->dump_int("bluestore_onode", onode_count);
    f->dump_int("bluestore_buffers", buffers_bytes);
  }
  void dump_cache_stats(ostream& ss) override {
    int onode_count = 0, buffers_bytes = 0;
    for (auto i: onode_cache_shards) {
      onode_count += i->_get_num();
    }
    for (auto i: buffer_cache_shards) {
      buffers_bytes += i->_get_bytes();
    }
    ss << "bluestore_onode: " << onode_count;
    ss << "bluestore_buffers: " << buffers_bytes;
  }

  int validate_hobject_key(const hobject_t &obj) const override {
    return 0;
  }
  unsigned get_max_attr_name_length() override {
    return 256; // arbitrary; there is no real limit internally
  }
  int mkfs() override;
  int mkjournal() override {
    return 0;
  }

  void get_db_statistics(Formatter *f) override;
  void generate_db_histogram(Formatter *f) override;
  void _flush_cache();
  int flush_cache(ostream *os = NULL) override;
  void dump_perf_counters(Formatter *f) override {
    f->open_object_section("perf_counters");
    logger->dump_formatted(f, false);
    f->close_section();
  }
  int add_new_bluefs_device(int id, const string& path);
  int migrate_to_existing_bluefs_device(const set<int>& devs_source,
                                        int id);
  int migrate_to_new_bluefs_device(const set<int>& devs_source,
                                   int id,
                                   const string& path);
  int expand_devices(ostream& out);
  string get_device_path(unsigned id);

  int dump_bluefs_sizes(ostream& out);

  int statfs(struct store_statfs_t *buf,
             osd_alert_list_t* alerts = nullptr) override;
  int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
                  bool *per_pool_omap) override;

  void collect_metadata(map<string,string> *pm) override;
  bool exists(CollectionHandle &c, const ghobject_t& oid) override;
  int set_collection_opts(
    CollectionHandle& c,
    const pool_opts_t& opts) override;
  int stat(
    CollectionHandle &c,
    const ghobject_t& oid,
    struct stat *st,
    bool allow_eio = false) override;
  int read(
    CollectionHandle &c,
    const ghobject_t& oid,
    uint64_t offset,
    size_t len,
    bufferlist& bl,
    uint32_t op_flags = 0) override;
  // --------------------------------------------------------
  // intermediate data structures used while reading
  struct region_t {
    uint64_t logical_offset;
    uint64_t blob_xoffset;   //region offset within the blob
    uint64_t length;

    // used later in read process
    uint64_t front = 0;

    region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
      : logical_offset(offset),
        blob_xoffset(b_offs),
        length(len),
        front(front) {}
    region_t(const region_t& from)
      : logical_offset(from.logical_offset),
        blob_xoffset(from.blob_xoffset),
        length(from.length),
        front(from.front) {}

    friend ostream& operator<<(ostream& out, const region_t& r) {
      return out << "0x" << std::hex << r.logical_offset << ":"
                 << r.blob_xoffset << "~" << r.length << std::dec;
    }
  };

  // merged blob read request
  struct read_req_t {
    uint64_t r_off = 0;
    uint64_t r_len = 0;
    bufferlist bl;
    std::list<region_t> regs; // original read regions

    read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}

    friend ostream& operator<<(ostream& out, const read_req_t& r) {
      out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
      for (const auto& reg : r.regs)
        out << reg;
      return out << "]}" << std::dec;
    }
  };

  typedef list<read_req_t> regions2read_t;
  typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
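
  // Hedged example of how these types fit together (hypothetical values, for
  // illustration only): each blob touched by a logical read gets its own
  // read_req_t describing the blob-relative extent to fetch plus the logical
  // regions it satisfies, keyed by blob in blobs2read_t:
  //
  //   blobs2read_t b2r;
  //   read_req_t req(0x0, 0x2000);              // blob-relative extent to read
  //   req.regs.emplace_back(0x0, 0x0, 0x2000);  // logical_offset, blob_xoffset, length
  //   b2r[blob].push_back(req);                 // 'blob' is a hypothetical BlobRef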
  void _read_cache(
    OnodeRef o,
    uint64_t offset,
    size_t length,
    int read_cache_policy,
    ready_regions_t& ready_regions,
    blobs2read_t& blobs2read);

  int _prepare_read_ioc(
    blobs2read_t& blobs2read,
    vector<bufferlist>* compressed_blob_bls,
    IOContext* ioc);

  int _generate_read_result_bl(
    OnodeRef o,
    uint64_t offset,
    size_t length,
    ready_regions_t& ready_regions,
    vector<bufferlist>& compressed_blob_bls,
    blobs2read_t& blobs2read,
    bool buffered,
    bool* csum_error,
    bufferlist& bl);

  int _do_read(
    Collection *c,
    OnodeRef o,
    uint64_t offset,
    size_t len,
    bufferlist& bl,
    uint32_t op_flags = 0,
    uint64_t retry_count = 0);
  int _do_readv(
    Collection *c,
    OnodeRef o,
    const interval_set<uint64_t>& m,
    bufferlist& bl,
    uint32_t op_flags = 0,
    uint64_t retry_count = 0);

  int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
              uint64_t offset, size_t len, interval_set<uint64_t>& destset);

  int fiemap(CollectionHandle &c, const ghobject_t& oid,
             uint64_t offset, size_t len, bufferlist& bl) override;
  int fiemap(CollectionHandle &c, const ghobject_t& oid,
             uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
  int readv(
    CollectionHandle &c_,
    const ghobject_t& oid,
    interval_set<uint64_t>& m,
    bufferlist& bl,
    uint32_t op_flags) override;

  int dump_onode(CollectionHandle &c, const ghobject_t& oid,
                 const string& section_name, Formatter *f) override;

  int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
              bufferptr& value) override;

  int getattrs(CollectionHandle &c, const ghobject_t& oid,
               map<string,bufferptr>& aset) override;

  int list_collections(vector<coll_t>& ls) override;

  CollectionHandle open_collection(const coll_t &c) override;
  CollectionHandle create_new_collection(const coll_t& cid) override;
  void set_collection_commit_queue(const coll_t& cid,
                                   ContextQueue *commit_queue) override;

  bool collection_exists(const coll_t& c) override;
  int collection_empty(CollectionHandle& c, bool *empty) override;
  int collection_bits(CollectionHandle& c) override;

  int collection_list(CollectionHandle &c,
                      const ghobject_t& start,
                      const ghobject_t& end,
                      int max,
                      vector<ghobject_t> *ls, ghobject_t *next) override;
  int omap_get(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    bufferlist *header,          ///< [out] omap header
    map<string, bufferlist> *out ///< [out] Key to value map
    ) override;
  int _omap_get(
    Collection *c,               ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    bufferlist *header,          ///< [out] omap header
    map<string, bufferlist> *out ///< [out] Key to value map
    );
  int _onode_omap_get(
    const OnodeRef &o,           ///< [in] Object containing omap
    bufferlist *header,          ///< [out] omap header
    map<string, bufferlist> *out ///< [out] Key to value map
    );

  int omap_get_header(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    bufferlist *header,          ///< [out] omap header
    bool allow_eio = false       ///< [in] don't assert on eio
    ) override;

  /// Get keys defined on oid
  int omap_get_keys(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    set<string> *keys            ///< [out] Keys defined on oid
    ) override;

  int omap_get_values(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    const set<string> &keys,     ///< [in] Keys to get
    map<string, bufferlist> *out ///< [out] Returned keys and values
    ) override;

  int omap_get_values(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    const std::optional<string> &start_after, ///< [in] only return keys after this one, if set
    map<string, bufferlist> *out ///< [out] Returned keys and values
    ) override;

  /// Filters keys into out which are defined on oid
  int omap_check_keys(
    CollectionHandle &c,         ///< [in] Collection containing oid
    const ghobject_t &oid,       ///< [in] Object containing omap
    const set<string> &keys,     ///< [in] Keys to check
    set<string> *out             ///< [out] Subset of keys defined on oid
    ) override;

  ObjectMap::ObjectMapIterator get_omap_iterator(
    CollectionHandle &c,   ///< [in] collection
    const ghobject_t &oid  ///< [in] object
    ) override;
  void set_fsid(uuid_d u) override {
    fsid = u;
  }
  uuid_d get_fsid() override {
    return fsid;
  }

  uint64_t estimate_objects_overhead(uint64_t num_objects) override {
    return num_objects * 300; //assuming per-object overhead is 300 bytes
  }
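  // Worked example of the estimate above (illustrative): for one million
  // objects this reports 1,000,000 * 300 B, i.e. roughly 300 MB of overhead.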
  struct BSPerfTracker {
    PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
    PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;

    objectstore_perf_stat_t get_cur_stats() const {
      objectstore_perf_stat_t ret;
      ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
      ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
      return ret;
    }

    void update_from_perfcounters(PerfCounters &logger);
  } perf_tracker;

  objectstore_perf_stat_t get_cur_stats() override {
    perf_tracker.update_from_perfcounters(*logger);
    return perf_tracker.get_cur_stats();
  }

  const PerfCounters* get_perf_counters() const override {
    return logger;
  }
  const PerfCounters* get_bluefs_perf_counters() const {
    return bluefs->get_perf_counters();
  }
  int queue_transactions(
    CollectionHandle& ch,
    vector<Transaction>& tls,
    TrackedOpRef op = TrackedOpRef(),
    ThreadPool::TPHandle *handle = NULL) override;

  void inject_data_error(const ghobject_t& o) override {
    std::unique_lock l(debug_read_error_lock);
    debug_data_error_objects.insert(o);
  }
  void inject_mdata_error(const ghobject_t& o) override {
    std::unique_lock l(debug_read_error_lock);
    debug_mdata_error_objects.insert(o);
  }

  /// methods to inject various errors fsck can repair
  void inject_broken_shared_blob_key(const string& key,
                                     const bufferlist& bl);
  void inject_leaked(uint64_t len);
  void inject_false_free(coll_t cid, ghobject_t oid);
  void inject_statfs(const string& key, const store_statfs_t& new_statfs);
  void inject_global_statfs(const store_statfs_t& new_statfs);
  void inject_misreference(coll_t cid1, ghobject_t oid1,
                           coll_t cid2, ghobject_t oid2,
                           uint64_t offset);
  // resets global per_pool_omap in DB
  void inject_legacy_omap();
  // resets per_pool_omap | pgmeta_omap for onode
  void inject_legacy_omap(coll_t cid, ghobject_t oid);

  void compact() override {
    ceph_assert(db);
    db->compact();
  }
  bool has_builtin_csum() const override {
    return true;
  }
  /*
  Allocate space for BlueFS from slow device.
  Either automatically applies allocated extents to underlying
  BlueFS (extents == nullptr) or just return them (non-null extents) provided
  as a parameter.
  */
  int allocate_bluefs_freespace(
    uint64_t min_size,
    uint64_t size,
    PExtentVector* extents);

  inline void log_latency(const char* name,
                          int idx,
                          const ceph::timespan& lat,
                          double lat_threshold,
                          const char* info = "") const;

  inline void log_latency_fn(const char* name,
                             int idx,
                             const ceph::timespan& lat,
                             double lat_threshold,
                             std::function<string (const ceph::timespan& lat)> fn) const;

  bool _debug_data_eio(const ghobject_t& o) {
    if (!cct->_conf->bluestore_debug_inject_read_err) {
      return false;
    }
    std::shared_lock l(debug_read_error_lock);
    return debug_data_error_objects.count(o);
  }
  bool _debug_mdata_eio(const ghobject_t& o) {
    if (!cct->_conf->bluestore_debug_inject_read_err) {
      return false;
    }
    std::shared_lock l(debug_read_error_lock);
    return debug_mdata_error_objects.count(o);
  }
  void _debug_obj_on_delete(const ghobject_t& o) {
    if (cct->_conf->bluestore_debug_inject_read_err) {
      std::unique_lock l(debug_read_error_lock);
      debug_data_error_objects.erase(o);
      debug_mdata_error_objects.erase(o);
    }
  }
  ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
  string failed_cmode;
  set<string> failed_compressors;
  string spillover_alert;
  string legacy_statfs_alert;
  string no_per_pool_omap_alert;
  string disk_size_mismatch_alert;

  void _log_alerts(osd_alert_list_t& alerts);
  bool _set_compression_alert(bool cmode, const char* s) {
    std::lock_guard l(qlock);
    if (cmode) {
      bool ret = failed_cmode.empty();
      failed_cmode = s;
      return ret;
    }
    return failed_compressors.emplace(s).second;
  }
  void _clear_compression_alert() {
    std::lock_guard l(qlock);
    failed_compressors.clear();
    failed_cmode.clear();
  }

  void _set_spillover_alert(const string& s) {
    std::lock_guard l(qlock);
    spillover_alert = s;
  }
  void _clear_spillover_alert() {
    std::lock_guard l(qlock);
    spillover_alert.clear();
  }

  void _check_legacy_statfs_alert();
  void _check_no_per_pool_omap_alert();
  void _set_disk_size_mismatch_alert(const string& s) {
    std::lock_guard l(qlock);
    disk_size_mismatch_alert = s;
  }
  // --------------------------------------------------------
  // read processing internal methods
  int _verify_csum(
    OnodeRef& o,
    const bluestore_blob_t* blob,
    uint64_t blob_xoffset,
    const bufferlist& bl,
    uint64_t logical_offset) const;
  int _decompress(bufferlist& source, bufferlist* result);
  // --------------------------------------------------------
  // write ops

  struct WriteContext {
    bool buffered = false;          ///< buffered write
    bool compress = false;          ///< compressed write
    uint64_t target_blob_size = 0;  ///< target (max) blob size
    unsigned csum_order = 0;        ///< target checksum chunk order

    old_extent_map_t old_extents;         ///< must deref these blobs
    interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection

    struct write_item {
      uint64_t logical_offset;      ///< write logical offset
      BlobRef b;
      uint64_t blob_length;
      uint64_t b_off;
      bufferlist bl;
      uint64_t b_off0;              ///< original offset in a blob prior to padding
      uint64_t length0;             ///< original data length prior to padding

      bool mark_unused;
      bool new_blob;                ///< whether new blob was created

      bool compressed = false;
      bufferlist compressed_bl;
      size_t compressed_len = 0;

      write_item(
        uint64_t logical_offs,
        BlobRef b,
        uint64_t blob_len,
        uint64_t o,
        bufferlist& bl,
        uint64_t o0,
        uint64_t l0,
        bool _mark_unused,
        bool _new_blob)
       : logical_offset(logical_offs),
         b(b),
         blob_length(blob_len),
         b_off(o),
         bl(bl),
         b_off0(o0),
         length0(l0),
         mark_unused(_mark_unused),
         new_blob(_new_blob) {}
    };
    vector<write_item> writes;      ///< blobs we're writing

    /// partial clone of the context
    void fork(const WriteContext& other) {
      buffered = other.buffered;
      compress = other.compress;
      target_blob_size = other.target_blob_size;
      csum_order = other.csum_order;
    }
    void write(
      uint64_t loffs,
      BlobRef b,
      uint64_t blob_len,
      uint64_t o,
      bufferlist& bl,
      uint64_t o0,
      uint64_t len0,
      bool _mark_unused,
      bool _new_blob) {
      writes.emplace_back(loffs, b, blob_len, o, bl, o0, len0,
                          _mark_unused, _new_blob);
    }
    /// Checks for writes to the same pextent within a blob
    bool has_conflict(
      BlobRef b,
      uint64_t loffs,
      uint64_t loffs_end,
      uint64_t min_alloc_size);
  };
  void _do_write_small(
    TransContext *txc,
    CollectionRef &c,
    OnodeRef o,
    uint64_t offset, uint64_t length,
    bufferlist::iterator& blp,
    WriteContext *wctx);
  void _do_write_big(
    TransContext *txc,
    CollectionRef &c,
    OnodeRef o,
    uint64_t offset, uint64_t length,
    bufferlist::iterator& blp,
    WriteContext *wctx);
  int _do_alloc_write(
    TransContext *txc,
    CollectionRef c,
    OnodeRef o,
    WriteContext *wctx);
  void _wctx_finish(
    TransContext *txc,
    CollectionRef& c,
    OnodeRef o,
    WriteContext *wctx,
    set<SharedBlob*> *maybe_unshared_blobs=0);

  int _write(TransContext *txc,
             CollectionRef& c,
             OnodeRef& o,
             uint64_t offset, size_t len,
             bufferlist& bl,
             uint32_t fadvise_flags);
  void _pad_zeros(bufferlist *bl, uint64_t *offset,
                  uint64_t chunk_size);

  void _choose_write_options(CollectionRef& c,
                             OnodeRef o,
                             uint32_t fadvise_flags,
                             WriteContext *wctx);

  int _do_gc(TransContext *txc,
             CollectionRef& c,
             OnodeRef o,
             const WriteContext& wctx,
             uint64_t *dirty_start,
             uint64_t *dirty_end);

  int _do_write(TransContext *txc,
                CollectionRef &c,
                OnodeRef o,
                uint64_t offset, uint64_t length,
                bufferlist& bl,
                uint32_t fadvise_flags);
  void _do_write_data(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef o,
                      uint64_t offset, uint64_t length,
                      bufferlist& bl,
                      WriteContext *wctx);

  int _touch(TransContext *txc,
             CollectionRef& c,
             OnodeRef& o);
  int _do_zero(TransContext *txc,
               CollectionRef& c,
               OnodeRef& o,
               uint64_t offset, size_t len);
  int _zero(TransContext *txc,
            CollectionRef& c,
            OnodeRef& o,
            uint64_t offset, size_t len);
  void _do_truncate(TransContext *txc,
                    CollectionRef& c,
                    OnodeRef o,
                    uint64_t offset,
                    set<SharedBlob*> *maybe_unshared_blobs=0);
  int _truncate(TransContext *txc,
                CollectionRef& c,
                OnodeRef& o,
                uint64_t offset);
  int _remove(TransContext *txc,
              CollectionRef& c,
              OnodeRef& o);
  int _do_remove(TransContext *txc,
                 CollectionRef& c,
                 OnodeRef o);
  int _setattr(TransContext *txc,
               CollectionRef& c,
               OnodeRef& o,
               const string& name,
               bufferptr& val);
  int _setattrs(TransContext *txc,
                CollectionRef& c,
                OnodeRef& o,
                const map<string,bufferptr>& aset);
  int _rmattr(TransContext *txc,
              CollectionRef& c,
              OnodeRef& o,
              const string& name);
  int _rmattrs(TransContext *txc,
               CollectionRef& c,
               OnodeRef& o);
  void _do_omap_clear(TransContext *txc, OnodeRef &o);
  int _omap_clear(TransContext *txc,
                  CollectionRef& c,
                  OnodeRef& o);
  int _omap_setkeys(TransContext *txc,
                    CollectionRef& c,
                    OnodeRef& o,
                    bufferlist& bl);
  int _omap_setheader(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef& o,
                      bufferlist& header);
  int _omap_rmkeys(TransContext *txc,
                   CollectionRef& c,
                   OnodeRef& o,
                   bufferlist& bl);
  int _omap_rmkey_range(TransContext *txc,
                        CollectionRef& c,
                        OnodeRef& o,
                        const string& first, const string& last);
  int _set_alloc_hint(
    TransContext *txc,
    CollectionRef& c,
    OnodeRef& o,
    uint64_t expected_object_size,
    uint64_t expected_write_size,
    uint32_t flags);
  int _do_clone_range(TransContext *txc,
                      CollectionRef& c,
                      OnodeRef& oldo,
                      OnodeRef& newo,
                      uint64_t srcoff, uint64_t length, uint64_t dstoff);
  int _clone(TransContext *txc,
             CollectionRef& c,
             OnodeRef& oldo,
             OnodeRef& newo);
  int _clone_range(TransContext *txc,
                   CollectionRef& c,
                   OnodeRef& oldo,
                   OnodeRef& newo,
                   uint64_t srcoff, uint64_t length, uint64_t dstoff);
  int _rename(TransContext *txc,
              CollectionRef& c,
              OnodeRef& oldo,
              OnodeRef& newo,
              const ghobject_t& new_oid);
  int _create_collection(TransContext *txc, const coll_t &cid,
                         unsigned bits, CollectionRef *c);
  int _remove_collection(TransContext *txc, const coll_t &cid,
                         CollectionRef *c);
  void _do_remove_collection(TransContext *txc, CollectionRef *c);
  int _split_collection(TransContext *txc,
                        CollectionRef& c,
                        CollectionRef& d,
                        unsigned bits, int rem);
  int _merge_collection(TransContext *txc,
                        CollectionRef *c,
                        CollectionRef& d,
                        unsigned bits);

  void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
                                 size_t extents);
  void _record_allocation_stats();
  uint64_t probe_count = 0;
  std::atomic<uint64_t> alloc_stats_count = {0};
  std::atomic<uint64_t> alloc_stats_fragments = {0};
  std::atomic<uint64_t> alloc_stats_size = {0};

  std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
    { std::make_tuple(0ul, 0ul, 0ul) };

  std::atomic<uint64_t> out_of_sync_fm = {0};
  // --------------------------------------------------------
  // BlueFSDeviceExpander implementation
  uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
                                           uint64_t bluefs_total) override {
    auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
    return delta > 0 ? delta : 0;
  }
  int allocate_freespace(
    uint64_t min_size,
    uint64_t size,
    PExtentVector& extents) override {
    return allocate_bluefs_freespace(min_size, size, &extents);
  }
  uint64_t available_freespace(uint64_t alloc_size) override;
  inline bool _use_rotational_settings();
  struct sb_info_t {
    int64_t pool_id = INT64_MIN;
    list<ghobject_t> oids;
    BlueStore::SharedBlobRef sb;
    bluestore_extent_ref_map_t ref_map;
    bool compressed = false;
    bool passed = false;
    bool updated = false;
  };

  typedef btree::btree_set<
    uint64_t, std::less<uint64_t>,
    mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;

  typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
  struct FSCK_ObjectCtx {
    int64_t& errors;
    int64_t& warnings;
    uint64_t& num_objects;
    uint64_t& num_extents;
    uint64_t& num_blobs;
    uint64_t& num_sharded_objects;
    uint64_t& num_spanning_blobs;

    mempool_dynamic_bitset* used_blocks;
    uint64_t_btree_t* used_omap_head;

    ceph::mutex* sb_info_lock;
    sb_info_map_t& sb_info;

    store_statfs_t& expected_store_statfs;
    per_pool_statfs& expected_pool_statfs;
    BlueStoreRepairer* repairer;

    FSCK_ObjectCtx(int64_t& e,
                   int64_t& w,
                   uint64_t& _num_objects,
                   uint64_t& _num_extents,
                   uint64_t& _num_blobs,
                   uint64_t& _num_sharded_objects,
                   uint64_t& _num_spanning_blobs,
                   mempool_dynamic_bitset* _ub,
                   uint64_t_btree_t* _used_omap_head,
                   ceph::mutex* _sb_info_lock,
                   sb_info_map_t& _sb_info,
                   store_statfs_t& _store_statfs,
                   per_pool_statfs& _pool_statfs,
                   BlueStoreRepairer* _repairer) :
      errors(e),
      warnings(w),
      num_objects(_num_objects),
      num_extents(_num_extents),
      num_blobs(_num_blobs),
      num_sharded_objects(_num_sharded_objects),
      num_spanning_blobs(_num_spanning_blobs),
      used_blocks(_ub),
      used_omap_head(_used_omap_head),
      sb_info_lock(_sb_info_lock),
      sb_info(_sb_info),
      expected_store_statfs(_store_statfs),
      expected_pool_statfs(_pool_statfs),
      repairer(_repairer) {
    }
  };
  OnodeRef fsck_check_objects_shallow(
    FSCKDepth depth,
    int64_t pool_id,
    CollectionRef c,
    const ghobject_t& oid,
    const string& key,
    const bufferlist& value,
    mempool::bluestore_fsck::list<string>* expecting_shards,
    map<BlobRef, bluestore_blob_t::unused_t>* referenced,
    const BlueStore::FSCK_ObjectCtx& ctx);

  void _fsck_check_object_omap(FSCKDepth depth,
                               OnodeRef& o,
                               const BlueStore::FSCK_ObjectCtx& ctx);

  void _fsck_check_objects(FSCKDepth depth,
                           FSCK_ObjectCtx& ctx);
};
inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
  return out
    << " allocated:"
      << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
    << " stored:"
      << s.values[BlueStore::volatile_statfs::STATFS_STORED]
    << " compressed:"
      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
    << " compressed_orig:"
      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
    << " compressed_alloc:"
      << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
}
static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
  o->get();
}
static inline void intrusive_ptr_release(BlueStore::Onode *o) {
  o->put();
}
static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
  o->get();
}
static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
  o->put();
}
class BlueStoreRepairer
{
public:
  // to simplify future potential migration to mempools
  using fsck_interval = interval_set<uint64_t>;

  // Structure to track what pextents are used for specific cid/oid.
  // Similar to a Bloom filter, both positive and false-positive matches are
  // possible.
  // Maintains two lists of bloom filters for both cids and oids
  // where each list entry is a BF for a specific disk pextent.
  // The length of the extent per filter is measured on init.
  // Allows to filter out 'uninteresting' pextents to speed up subsequent
  // 'is_used' access.
  struct StoreSpaceTracker {
    const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
    const uint64_t BLOOM_FILTER_TABLE_SIZE = 32;     // bytes per single filter
    const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
    static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;

    typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
    bloom_vector collections_bfs;
    bloom_vector objects_bfs;

    bool was_filtered_out = false;
    uint64_t granularity = 0; // extent length for a single filter
    StoreSpaceTracker() {
    }
    StoreSpaceTracker(const StoreSpaceTracker& from) :
      collections_bfs(from.collections_bfs),
      objects_bfs(from.objects_bfs),
      granularity(from.granularity) {
    }

    void init(uint64_t total,
              uint64_t min_alloc_size,
              uint64_t mem_cap = DEF_MEM_CAP) {
      ceph_assert(!granularity); // not initialized yet
      ceph_assert(min_alloc_size && isp2(min_alloc_size));
      ceph_assert(mem_cap);

      total = round_up_to(total, min_alloc_size);
      granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
      if (!granularity) {
        granularity = min_alloc_size;
      } else {
        granularity = round_up_to(granularity, min_alloc_size);
      }

      uint64_t entries = round_up_to(total, granularity) / granularity;
      collections_bfs.resize(entries,
        bloom_filter(BLOOM_FILTER_SALT_COUNT,
                     BLOOM_FILTER_TABLE_SIZE,
                     0,
                     BLOOM_FILTER_EXPECTED_COUNT));
      objects_bfs.resize(entries,
        bloom_filter(BLOOM_FILTER_SALT_COUNT,
                     BLOOM_FILTER_TABLE_SIZE,
                     0,
                     BLOOM_FILTER_EXPECTED_COUNT));
    }
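
    // Worked example of the sizing above (illustrative figures): tracking
    // 1 TiB with the default 128 MiB mem_cap gives
    //   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB
    // (then rounded up to a min_alloc_size multiple), i.e. about 2^21 entries
    // per bloom_vector and 2 * 2^21 * 32 B = 128 MiB of filter tables overall.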
    inline uint32_t get_hash(const coll_t& cid) const {
      return cid.hash_to_shard(1);
    }

    inline void set_used(uint64_t offset, uint64_t len,
                         const coll_t& cid, const ghobject_t& oid) {
      ceph_assert(granularity); // initialized

      // can't call this func after filter_out has been applied
      ceph_assert(!was_filtered_out);

      auto pos = offset / granularity;
      auto end_pos = (offset + len - 1) / granularity;
      while (pos <= end_pos) {
        collections_bfs[pos].insert(get_hash(cid));
        objects_bfs[pos].insert(oid.hobj.get_hash());
        ++pos;
      }
    }
    // filter-out entries unrelated to the specified(broken) extents.
    // 'is_used' calls are permitted after that only
    size_t filter_out(const fsck_interval& extents);

    // determines if collection's present after filtering-out
    inline bool is_used(const coll_t& cid) const {
      ceph_assert(was_filtered_out);
      for (auto& bf : collections_bfs) {
        if (bf.contains(get_hash(cid))) {
          return true;
        }
      }
      return false;
    }
    // determines if object's present after filtering-out
    inline bool is_used(const ghobject_t& oid) const {
      ceph_assert(was_filtered_out);
      for (auto& bf : objects_bfs) {
        if (bf.contains(oid.hobj.get_hash())) {
          return true;
        }
      }
      return false;
    }
    // determines if collection's present before filtering-out
    inline bool is_used(const coll_t& cid, uint64_t offs) const {
      ceph_assert(granularity); // initialized
      ceph_assert(!was_filtered_out);
      auto &bf = collections_bfs[offs / granularity];
      if (bf.contains(get_hash(cid))) {
        return true;
      }
      return false;
    }
    // determines if object's present before filtering-out
    inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
      ceph_assert(granularity); // initialized
      ceph_assert(!was_filtered_out);
      auto &bf = objects_bfs[offs / granularity];
      if (bf.contains(oid.hobj.get_hash())) {
        return true;
      }
      return false;
    }
  };
  void fix_per_pool_omap(KeyValueDB *db);
  bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
  bool fix_shared_blob(KeyValueDB *db,
                       uint64_t sbid,
                       const bufferlist* bl);
  bool fix_statfs(KeyValueDB *db, const string& key,
                  const store_statfs_t& new_statfs);

  bool fix_leaked(KeyValueDB *db,
                  FreelistManager* fm,
                  uint64_t offset, uint64_t len);
  bool fix_false_free(KeyValueDB *db,
                      FreelistManager* fm,
                      uint64_t offset, uint64_t len);
  bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);

  void init(uint64_t total_space, uint64_t lres_tracking_unit_size);

  bool preprocess_misreference(KeyValueDB *db);

  unsigned apply(KeyValueDB* db);

  void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
    misreferenced_extents.union_insert(offs, len);
    if (inc_error) {
      ++to_repair_cnt;
    }
  }
  // In fact this is the only repairer's method which is thread-safe!!
  void inc_repaired() {
    ++to_repair_cnt;
  }

  StoreSpaceTracker& get_space_usage_tracker() {
    return space_usage_tracker;
  }
  const fsck_interval& get_misreferences() const {
    return misreferenced_extents;
  }
  KeyValueDB::Transaction get_fix_misreferences_txn() {
    return fix_misreferences_txn;
  }

private:
  std::atomic<unsigned> to_repair_cnt = { 0 };
  KeyValueDB::Transaction fix_per_pool_omap_txn;
  KeyValueDB::Transaction fix_fm_leaked_txn;
  KeyValueDB::Transaction fix_fm_false_free_txn;
  KeyValueDB::Transaction remove_key_txn;
  KeyValueDB::Transaction fix_statfs_txn;
  KeyValueDB::Transaction fix_shared_blob_txn;

  KeyValueDB::Transaction fix_misreferences_txn;

  StoreSpaceTracker space_usage_tracker;

  // non-shared extents with multiple references
  fsck_interval misreferenced_extents;
};
class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
{
  template <class T, size_t MaxX, size_t MaxY>
  class matrix_2d {
    T values[MaxX][MaxY];
  public:
    matrix_2d() {
      clear();
    }
    T& at(size_t x, size_t y) {
      ceph_assert(x < MaxX);
      ceph_assert(y < MaxY);
      return values[x][y];
    }
    size_t get_max_x() const {
      return MaxX;
    }
    size_t get_max_y() const {
      return MaxY;
    }
    void clear() {
      memset(values, 0, sizeof(values));
    }
  };

  enum {
    // use 0/nullptr as unset indication
    LEVEL_FIRST = 1,
    LEVEL_WAL = LEVEL_FIRST,
    LEVEL_DB,
    LEVEL_SLOW,
    LEVEL_MAX
  };

  // add +1 row for corresponding per-device totals
  // add +1 column for per-level actual (taken from file size) total
  typedef matrix_2d<uint64_t, BlueFS::MAX_BDEV + 1,
                    LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;

  per_level_per_dev_usage_t per_level_per_dev_usage;
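
  // Illustrative indexing of the matrix above (an assumption for clarity):
  // rows are BlueFS devices plus one extra row of per-device totals, columns
  // are levels plus one extra column of per-level totals, e.g.
  //
  //   uint64_t db_level_bytes =
  //     per_level_per_dev_usage.at(BlueFS::BDEV_DB, LEVEL_DB - LEVEL_FIRST);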
  // Note: maximum per-device totals below might be smaller than corresponding
  // perf counters by up to a single alloc unit (1M) due to superblock extent.
  // The latter is not accounted here.
  per_level_per_dev_usage_t per_level_per_dev_max;

  uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
  uint64_t db_avail4slow = 0;

public:
  RocksDBBlueFSVolumeSelector(
    uint64_t _wal_total,
    uint64_t _db_total,
    uint64_t _slow_total,
    uint64_t _level0_size,
    uint64_t _level_base,
    uint64_t _level_multiplier,
    double reserved_factor,
    uint64_t reserved)
  {
    l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
    l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
    l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;

    if (!reserved) {
      // Calculating how much extra space is available at DB volume.
      // Depending on the presence of explicit reserved size specification
      // it might be either
      // * DB volume size - reserved
      // or
      // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
      uint64_t prev_levels = _level0_size;
      uint64_t cur_level = _level_base;
      uint64_t cur_threshold = 0;
      do {
        uint64_t next_level = cur_level * _level_multiplier;
        uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
        if (_db_total <= next_threshold) {
          db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
          break;
        } else {
          prev_levels += cur_level;
          cur_level = next_level;
          cur_threshold = next_threshold;
        }
      } while (true);
    } else {
      db_avail4slow = _db_total - reserved;
    }
  }
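
  // Worked example of the estimation above (hypothetical figures): with
  // _level0_size = 1 GiB, _level_base = 1 GiB, _level_multiplier = 10,
  // reserved_factor = 2 and a 64 GiB DB volume the thresholds grow as
  //   1 + 1 + 10*2   = 22 GiB   (still below 64 GiB, keep iterating)
  //   2 + 10 + 100*2 = 212 GiB  (exceeds 64 GiB, stop)
  // so db_avail4slow = 64 GiB - 22 GiB = 42 GiB may be offered to slow data.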
  void* get_hint_by_device(uint8_t dev) const override {
    ceph_assert(dev == BlueFS::BDEV_WAL); // others aren't used atm
    return reinterpret_cast<void*>(LEVEL_WAL);
  }
  void* get_hint_by_dir(const string& dirname) const override;

  void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
    if (hint == nullptr)
      return;
    size_t pos = (size_t)hint - LEVEL_FIRST;
    for (auto& p : fnode.extents) {
      auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
      auto& max = per_level_per_dev_max.at(p.bdev, pos);
      cur += p.length;
      if (cur > max) {
        max = cur;
      }
      {
        //update per-device totals
        auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
        auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
        cur += p.length;
        if (cur > max) {
          max = cur;
        }
      }
    }
    {
      //update per-level actual totals
      auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
      auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
      cur += fnode.size;
      if (cur > max) {
        max = cur;
      }
    }
  }
  void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
    if (hint == nullptr)
      return;
    size_t pos = (size_t)hint - LEVEL_FIRST;
    for (auto& p : fnode.extents) {
      auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
      ceph_assert(cur >= p.length);
      cur -= p.length;

      //update per-device totals
      auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
      ceph_assert(cur2 >= p.length);
      cur2 -= p.length;
    }
    //update per-level actual totals
    auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
    ceph_assert(cur >= fnode.size);
    cur -= fnode.size;
  }
  void add_usage(void* hint, uint64_t fsize) override {
    if (hint == nullptr)
      return;
    size_t pos = (size_t)hint - LEVEL_FIRST;
    //update per-level actual totals
    auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
    auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
    cur += fsize;
    if (cur > max) {
      max = cur;
    }
  }
  void sub_usage(void* hint, uint64_t fsize) override {
    if (hint == nullptr)
      return;
    size_t pos = (size_t)hint - LEVEL_FIRST;
    //update per-level actual totals
    auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
    ceph_assert(cur >= fsize);
    per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos) -= fsize;
  }

  uint8_t select_prefer_bdev(void* h) override;
  void get_paths(
    const std::string& base,
    BlueFSVolumeSelector::paths& res) const override;
  void dump(ostream& sout) override;