1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
/*
 * Ceph - scalable distributed file system
 *
 * Copyright (C) 2014 Red Hat
 *
 * This is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Lesser General Public
 * License version 2.1, as published by the Free Software
 * Foundation. See file COPYING.
 */
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
24 #include <condition_variable>
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
46 class FreelistManager
;
50 //#define DEBUG_DEFERRED
54 // constants for Buffer::optimize()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
59 l_bluestore_first
= 732430,
60 l_bluestore_kv_flush_lat
,
61 l_bluestore_kv_commit_lat
,
63 l_bluestore_state_prepare_lat
,
64 l_bluestore_state_aio_wait_lat
,
65 l_bluestore_state_io_done_lat
,
66 l_bluestore_state_kv_queued_lat
,
67 l_bluestore_state_kv_committing_lat
,
68 l_bluestore_state_kv_done_lat
,
69 l_bluestore_state_deferred_queued_lat
,
70 l_bluestore_state_deferred_aio_wait_lat
,
71 l_bluestore_state_deferred_cleanup_lat
,
72 l_bluestore_state_finishing_lat
,
73 l_bluestore_state_done_lat
,
74 l_bluestore_throttle_lat
,
75 l_bluestore_submit_lat
,
76 l_bluestore_commit_lat
,
78 l_bluestore_read_onode_meta_lat
,
79 l_bluestore_read_wait_aio_lat
,
80 l_bluestore_compress_lat
,
81 l_bluestore_decompress_lat
,
83 l_bluestore_compress_success_count
,
84 l_bluestore_compress_rejected_count
,
85 l_bluestore_write_pad_bytes
,
86 l_bluestore_deferred_write_ops
,
87 l_bluestore_deferred_write_bytes
,
88 l_bluestore_write_penalty_read_ops
,
89 l_bluestore_allocated
,
91 l_bluestore_compressed
,
92 l_bluestore_compressed_allocated
,
93 l_bluestore_compressed_original
,
95 l_bluestore_onode_hits
,
96 l_bluestore_onode_misses
,
97 l_bluestore_onode_shard_hits
,
98 l_bluestore_onode_shard_misses
,
102 l_bluestore_buffer_bytes
,
103 l_bluestore_buffer_hit_bytes
,
104 l_bluestore_buffer_miss_bytes
,
105 l_bluestore_write_big
,
106 l_bluestore_write_big_bytes
,
107 l_bluestore_write_big_blobs
,
108 l_bluestore_write_small
,
109 l_bluestore_write_small_bytes
,
110 l_bluestore_write_small_unused
,
111 l_bluestore_write_small_deferred
,
112 l_bluestore_write_small_pre_read
,
113 l_bluestore_write_small_new
,
115 l_bluestore_onode_reshard
,
116 l_bluestore_blob_split
,
117 l_bluestore_extent_compress
,
118 l_bluestore_gc_merged
,
119 l_bluestore_read_eio
,
123 class BlueStore
: public ObjectStore
,
124 public md_config_obs_t
{
125 // -----------------------------------------------------
129 const char** get_tracked_conf_keys() const override
;
130 void handle_conf_change(const struct md_config_t
*conf
,
131 const std::set
<std::string
> &changed
) override
;
134 void _set_compression();
135 void _set_throttle_params();
136 int _set_cache_sizes();
140 typedef map
<uint64_t, bufferlist
> ready_regions_t
;
144 typedef boost::intrusive_ptr
<Collection
> CollectionRef
;
147 virtual void aio_finish(BlueStore
*store
) = 0;
148 virtual ~AioContext() {}
153 MEMPOOL_CLASS_HELPERS();
156 STATE_EMPTY
, ///< empty buffer -- used for cache history
157 STATE_CLEAN
, ///< clean data that is up to date
158 STATE_WRITING
, ///< data that is being written (io not yet complete)
160 static const char *get_state_name(int s
) {
162 case STATE_EMPTY
: return "empty";
163 case STATE_CLEAN
: return "clean";
164 case STATE_WRITING
: return "writing";
165 default: return "???";
169 FLAG_NOCACHE
= 1, ///< trim when done WRITING (do not become CLEAN)
170 // NOTE: fix operator<< when you define a second flag
172 static const char *get_flag_name(int s
) {
174 case FLAG_NOCACHE
: return "nocache";
175 default: return "???";
180 uint16_t state
; ///< STATE_*
181 uint16_t cache_private
= 0; ///< opaque (to us) value used by Cache impl
182 uint32_t flags
; ///< FLAG_*
184 uint32_t offset
, length
;
187 boost::intrusive::list_member_hook
<> lru_item
;
188 boost::intrusive::list_member_hook
<> state_item
;
190 Buffer(BufferSpace
*space
, unsigned s
, uint64_t q
, uint32_t o
, uint32_t l
,
192 : space(space
), state(s
), flags(f
), seq(q
), offset(o
), length(l
) {}
193 Buffer(BufferSpace
*space
, unsigned s
, uint64_t q
, uint32_t o
, bufferlist
& b
,
195 : space(space
), state(s
), flags(f
), seq(q
), offset(o
),
196 length(b
.length()), data(b
) {}
198 bool is_empty() const {
199 return state
== STATE_EMPTY
;
201 bool is_clean() const {
202 return state
== STATE_CLEAN
;
204 bool is_writing() const {
205 return state
== STATE_WRITING
;
208 uint32_t end() const {
209 return offset
+ length
;
212 void truncate(uint32_t newlen
) {
213 assert(newlen
< length
);
216 t
.substr_of(data
, 0, newlen
);
221 void maybe_rebuild() {
223 (data
.get_num_buffers() > 1 ||
224 data
.front().wasted() > data
.length() / MAX_BUFFER_SLOP_RATIO_DEN
)) {
229 void dump(Formatter
*f
) const {
230 f
->dump_string("state", get_state_name(state
));
231 f
->dump_unsigned("seq", seq
);
232 f
->dump_unsigned("offset", offset
);
233 f
->dump_unsigned("length", length
);
234 f
->dump_unsigned("data_length", data
.length());
240 /// map logical extent range (object) onto buffers
242 typedef boost::intrusive::list
<
244 boost::intrusive::member_hook
<
246 boost::intrusive::list_member_hook
<>,
247 &Buffer::state_item
> > state_list_t
;
249 mempool::bluestore_cache_other::map
<uint32_t, std::unique_ptr
<Buffer
>>
252 // we use a bare intrusive list here instead of std::map because
253 // it uses less memory and we expect this to be very small (very
254 // few IOs in flight to the same Blob at the same time).
255 state_list_t writing
; ///< writing buffers, sorted by seq, ascending
258 assert(buffer_map
.empty());
259 assert(writing
.empty());
262 void _add_buffer(Cache
* cache
, Buffer
*b
, int level
, Buffer
*near
) {
263 cache
->_audit("_add_buffer start");
264 buffer_map
[b
->offset
].reset(b
);
265 if (b
->is_writing()) {
266 b
->data
.reassign_to_mempool(mempool::mempool_bluestore_writing
);
267 if (writing
.empty() || writing
.rbegin()->seq
<= b
->seq
) {
268 writing
.push_back(*b
);
270 auto it
= writing
.begin();
271 while (it
->seq
< b
->seq
) {
275 assert(it
->seq
>= b
->seq
);
276 // note that this will insert b before it
277 // hence the order is maintained
278 writing
.insert(it
, *b
);
281 b
->data
.reassign_to_mempool(mempool::mempool_bluestore_cache_data
);
282 cache
->_add_buffer(b
, level
, near
);
284 cache
->_audit("_add_buffer end");
286 void _rm_buffer(Cache
* cache
, Buffer
*b
) {
287 _rm_buffer(cache
, buffer_map
.find(b
->offset
));
289 void _rm_buffer(Cache
* cache
,
290 map
<uint32_t, std::unique_ptr
<Buffer
>>::iterator p
) {
291 assert(p
!= buffer_map
.end());
292 cache
->_audit("_rm_buffer start");
293 if (p
->second
->is_writing()) {
294 writing
.erase(writing
.iterator_to(*p
->second
));
296 cache
->_rm_buffer(p
->second
.get());
299 cache
->_audit("_rm_buffer end");
302 map
<uint32_t,std::unique_ptr
<Buffer
>>::iterator
_data_lower_bound(
304 auto i
= buffer_map
.lower_bound(offset
);
305 if (i
!= buffer_map
.begin()) {
307 if (i
->first
+ i
->second
->length
<= offset
)
313 // must be called under protection of the Cache lock
314 void _clear(Cache
* cache
);
316 // return value is the highest cache_private of a trimmed buffer, or 0.
317 int discard(Cache
* cache
, uint32_t offset
, uint32_t length
) {
318 std::lock_guard
<std::recursive_mutex
> l(cache
->lock
);
319 return _discard(cache
, offset
, length
);
321 int _discard(Cache
* cache
, uint32_t offset
, uint32_t length
);
323 void write(Cache
* cache
, uint64_t seq
, uint32_t offset
, bufferlist
& bl
,
325 std::lock_guard
<std::recursive_mutex
> l(cache
->lock
);
326 Buffer
*b
= new Buffer(this, Buffer::STATE_WRITING
, seq
, offset
, bl
,
328 b
->cache_private
= _discard(cache
, offset
, bl
.length());
329 _add_buffer(cache
, b
, (flags
& Buffer::FLAG_NOCACHE
) ? 0 : 1, nullptr);
331 void finish_write(Cache
* cache
, uint64_t seq
);
332 void did_read(Cache
* cache
, uint32_t offset
, bufferlist
& bl
) {
333 std::lock_guard
<std::recursive_mutex
> l(cache
->lock
);
334 Buffer
*b
= new Buffer(this, Buffer::STATE_CLEAN
, 0, offset
, bl
);
335 b
->cache_private
= _discard(cache
, offset
, bl
.length());
336 _add_buffer(cache
, b
, 1, nullptr);
339 void read(Cache
* cache
, uint32_t offset
, uint32_t length
,
340 BlueStore::ready_regions_t
& res
,
341 interval_set
<uint32_t>& res_intervals
);
343 void truncate(Cache
* cache
, uint32_t offset
) {
344 discard(cache
, offset
, (uint32_t)-1 - offset
);
347 void split(Cache
* cache
, size_t pos
, BufferSpace
&r
);
349 void dump(Cache
* cache
, Formatter
*f
) const {
350 std::lock_guard
<std::recursive_mutex
> l(cache
->lock
);
351 f
->open_array_section("buffers");
352 for (auto& i
: buffer_map
) {
353 f
->open_object_section("buffer");
354 assert(i
.first
== i
.second
->offset
);
362 struct SharedBlobSet
;
364 /// in-memory shared blob state (incl cached buffers)
366 MEMPOOL_CLASS_HELPERS();
368 std::atomic_int nref
= {0}; ///< reference count
373 uint64_t sbid_unloaded
; ///< sbid if persistent isn't loaded
374 bluestore_shared_blob_t
*persistent
; ///< persistent part of the shared blob if any
376 BufferSpace bc
; ///< buffer cache
378 SharedBlob(Collection
*_coll
) : coll(_coll
), sbid_unloaded(0) {
380 get_cache()->add_blob();
383 SharedBlob(uint64_t i
, Collection
*_coll
);
386 uint64_t get_sbid() const {
387 return loaded
? persistent
->sbid
: sbid_unloaded
;
390 friend void intrusive_ptr_add_ref(SharedBlob
*b
) { b
->get(); }
391 friend void intrusive_ptr_release(SharedBlob
*b
) { b
->put(); }
393 friend ostream
& operator<<(ostream
& out
, const SharedBlob
& sb
);
400 /// get logical references
401 void get_ref(uint64_t offset
, uint32_t length
);
403 /// put logical references, and get back any released extents
404 void put_ref(uint64_t offset
, uint32_t length
,
405 PExtentVector
*r
, set
<SharedBlob
*> *maybe_unshared_blobs
);
407 friend bool operator==(const SharedBlob
&l
, const SharedBlob
&r
) {
408 return l
.get_sbid() == r
.get_sbid();
410 inline Cache
* get_cache() {
411 return coll
? coll
->cache
: nullptr;
413 inline SharedBlobSet
* get_parent() {
414 return coll
? &(coll
->shared_blob_set
) : nullptr;
416 inline bool is_loaded() const {
421 typedef boost::intrusive_ptr
<SharedBlob
> SharedBlobRef
;
423 /// a lookup table of SharedBlobs
424 struct SharedBlobSet
{
425 std::mutex lock
; ///< protect lookup, insertion, removal
427 // we use a bare pointer because we don't want to affect the ref
429 mempool::bluestore_cache_other::unordered_map
<uint64_t,SharedBlob
*> sb_map
;
431 SharedBlobRef
lookup(uint64_t sbid
) {
432 std::lock_guard
<std::mutex
> l(lock
);
433 auto p
= sb_map
.find(sbid
);
434 if (p
== sb_map
.end() ||
435 p
->second
->nref
== 0) {
441 void add(Collection
* coll
, SharedBlob
*sb
) {
442 std::lock_guard
<std::mutex
> l(lock
);
443 sb_map
[sb
->get_sbid()] = sb
;
447 void remove(SharedBlob
*sb
) {
448 std::lock_guard
<std::mutex
> l(lock
);
449 assert(sb
->get_parent() == this);
450 // only remove if it still points to us
451 auto p
= sb_map
.find(sb
->get_sbid());
452 if (p
!= sb_map
.end() &&
459 std::lock_guard
<std::mutex
> l(lock
);
460 return sb_map
.empty();
463 void dump(CephContext
*cct
, int lvl
);
466 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
468 /// in-memory blob metadata and associated cached buffers (if any)
470 MEMPOOL_CLASS_HELPERS();
472 std::atomic_int nref
= {0}; ///< reference count
473 int16_t id
= -1; ///< id, for spanning blobs only, >= 0
474 int16_t last_encoded_id
= -1; ///< (ephemeral) used during encoding only
475 SharedBlobRef shared_blob
; ///< shared blob state (if any)
478 mutable bluestore_blob_t blob
; ///< decoded blob metadata
480 mutable bufferlist blob_bl
; ///< cached encoded blob, blob is dirty if empty
482 /// refs from this shard. ephemeral if id<0, persisted if spanning.
483 bluestore_blob_use_tracker_t used_in_blob
;
487 friend void intrusive_ptr_add_ref(Blob
*b
) { b
->get(); }
488 friend void intrusive_ptr_release(Blob
*b
) { b
->put(); }
490 friend ostream
& operator<<(ostream
& out
, const Blob
&b
);
492 const bluestore_blob_use_tracker_t
& get_blob_use_tracker() const {
495 bool is_referenced() const {
496 return used_in_blob
.is_not_empty();
498 uint32_t get_referenced_bytes() const {
499 return used_in_blob
.get_referenced_bytes();
502 bool is_spanning() const {
506 bool can_split() const {
507 std::lock_guard
<std::recursive_mutex
> l(shared_blob
->get_cache()->lock
);
508 // splitting a BufferSpace writing list is too hard; don't try.
509 return shared_blob
->bc
.writing
.empty() &&
510 used_in_blob
.can_split() &&
511 get_blob().can_split();
514 bool can_split_at(uint32_t blob_offset
) const {
515 return used_in_blob
.can_split_at(blob_offset
) &&
516 get_blob().can_split_at(blob_offset
);
519 bool can_reuse_blob(uint32_t min_alloc_size
,
520 uint32_t target_blob_size
,
525 o
.shared_blob
= shared_blob
;
532 inline const bluestore_blob_t
& get_blob() const {
535 inline bluestore_blob_t
& dirty_blob() {
542 /// discard buffers for unallocated regions
543 void discard_unallocated(Collection
*coll
);
545 /// get logical references
546 void get_ref(Collection
*coll
, uint32_t offset
, uint32_t length
);
547 /// put logical references, and get back any released extents
548 bool put_ref(Collection
*coll
, uint32_t offset
, uint32_t length
,
552 void split(Collection
*coll
, uint32_t blob_offset
, Blob
*o
);
564 void _encode() const {
565 if (blob_bl
.length() == 0 ) {
566 ::encode(blob
, blob_bl
);
568 assert(blob_bl
.length());
573 bool include_ref_map
) const {
575 p
+= blob_bl
.length();
576 if (include_ref_map
) {
577 used_in_blob
.bound_encode(p
);
581 bufferlist::contiguous_appender
& p
,
582 bool include_ref_map
) const {
585 if (include_ref_map
) {
586 used_in_blob
.encode(p
);
590 Collection */
*coll*/
,
591 bufferptr::iterator
& p
,
592 bool include_ref_map
) {
593 const char *start
= p
.get_pos();
595 const char *end
= p
.get_pos();
597 blob_bl
.append(start
, end
- start
);
598 if (include_ref_map
) {
599 used_in_blob
.decode(p
);
607 bool include_ref_map
) const {
608 denc(blob
, p
, struct_v
);
609 if (blob
.is_shared()) {
612 if (include_ref_map
) {
613 used_in_blob
.bound_encode(p
);
617 bufferlist::contiguous_appender
& p
,
620 bool include_ref_map
) const {
621 denc(blob
, p
, struct_v
);
622 if (blob
.is_shared()) {
625 if (include_ref_map
) {
626 used_in_blob
.encode(p
);
631 bufferptr::iterator
& p
,
634 bool include_ref_map
);
637 typedef boost::intrusive_ptr
<Blob
> BlobRef
;
638 typedef mempool::bluestore_cache_other::map
<int,BlobRef
> blob_map_t
;
640 /// a logical extent, pointing to (some portion of) a blob
641 typedef boost::intrusive::set_base_hook
<boost::intrusive::optimize_size
<true> > ExtentBase
; //making an alias to avoid build warnings
642 struct Extent
: public ExtentBase
{
643 MEMPOOL_CLASS_HELPERS();
645 uint32_t logical_offset
= 0; ///< logical offset
646 uint32_t blob_offset
= 0; ///< blob offset
647 uint32_t length
= 0; ///< length
648 BlobRef blob
; ///< the blob with our data
650 /// ctor for lookup only
651 explicit Extent(uint32_t lo
) : ExtentBase(), logical_offset(lo
) { }
652 /// ctor for delayed initialization (see decode_some())
653 explicit Extent() : ExtentBase() {
655 /// ctor for general usage
656 Extent(uint32_t lo
, uint32_t o
, uint32_t l
, BlobRef
& b
)
658 logical_offset(lo
), blob_offset(o
), length(l
) {
663 blob
->shared_blob
->get_cache()->rm_extent();
667 void assign_blob(const BlobRef
& b
) {
670 blob
->shared_blob
->get_cache()->add_extent();
673 // comparators for intrusive_set
674 friend bool operator<(const Extent
&a
, const Extent
&b
) {
675 return a
.logical_offset
< b
.logical_offset
;
677 friend bool operator>(const Extent
&a
, const Extent
&b
) {
678 return a
.logical_offset
> b
.logical_offset
;
680 friend bool operator==(const Extent
&a
, const Extent
&b
) {
681 return a
.logical_offset
== b
.logical_offset
;
684 uint32_t blob_start() const {
685 return logical_offset
- blob_offset
;
688 uint32_t blob_end() const {
689 return blob_start() + blob
->get_blob().get_logical_length();
692 uint32_t logical_end() const {
693 return logical_offset
+ length
;
696 // return true if any piece of the blob is out of
697 // the given range [o, o + l].
698 bool blob_escapes_range(uint32_t o
, uint32_t l
) const {
699 return blob_start() < o
|| blob_end() > o
+ l
;
702 typedef boost::intrusive::set
<Extent
> extent_map_t
;
705 friend ostream
& operator<<(ostream
& out
, const Extent
& e
);
708 boost::intrusive::list_member_hook
<> old_extent_item
;
711 bool blob_empty
; // flag to track the last removed extent that makes blob
712 // empty - required to update compression stat properly
713 OldExtent(uint32_t lo
, uint32_t o
, uint32_t l
, BlobRef
& b
)
714 : e(lo
, o
, l
, b
), blob_empty(false) {
716 static OldExtent
* create(CollectionRef c
,
722 typedef boost::intrusive::list
<
724 boost::intrusive::member_hook
<
726 boost::intrusive::list_member_hook
<>,
727 &OldExtent::old_extent_item
> > old_extent_map_t
;
731 /// a sharded extent map, mapping offsets to lextents to blobs
734 extent_map_t extent_map
; ///< map of Extents to Blobs
735 blob_map_t spanning_blob_map
; ///< blobs that span shards
738 bluestore_onode_t::shard_info
*shard_info
= nullptr;
739 unsigned extents
= 0; ///< count extents in this shard
740 bool loaded
= false; ///< true if shard is loaded
741 bool dirty
= false; ///< true if shard is dirty and needs reencoding
743 mempool::bluestore_cache_other::vector
<Shard
> shards
; ///< shards
745 bufferlist inline_bl
; ///< cached encoded map, if unsharded; empty=>dirty
747 uint32_t needs_reshard_begin
= 0;
748 uint32_t needs_reshard_end
= 0;
750 bool needs_reshard() const {
751 return needs_reshard_end
> needs_reshard_begin
;
753 void clear_needs_reshard() {
754 needs_reshard_begin
= needs_reshard_end
= 0;
756 void request_reshard(uint32_t begin
, uint32_t end
) {
757 if (begin
< needs_reshard_begin
) {
758 needs_reshard_begin
= begin
;
760 if (end
> needs_reshard_end
) {
761 needs_reshard_end
= end
;
765 struct DeleteDisposer
{
766 void operator()(Extent
*e
) { delete e
; }
771 extent_map
.clear_and_dispose(DeleteDisposer());
775 extent_map
.clear_and_dispose(DeleteDisposer());
778 clear_needs_reshard();
781 bool encode_some(uint32_t offset
, uint32_t length
, bufferlist
& bl
,
783 unsigned decode_some(bufferlist
& bl
);
785 void bound_encode_spanning_blobs(size_t& p
);
786 void encode_spanning_blobs(bufferlist::contiguous_appender
& p
);
787 void decode_spanning_blobs(bufferptr::iterator
& p
);
789 BlobRef
get_spanning_blob(int id
) {
790 auto p
= spanning_blob_map
.find(id
);
791 assert(p
!= spanning_blob_map
.end());
795 void update(KeyValueDB::Transaction t
, bool force
);
796 decltype(BlueStore::Blob::id
) allocate_spanning_blob_id();
799 KeyValueDB::Transaction t
);
801 /// initialize Shards from the onode
802 void init_shards(bool loaded
, bool dirty
);
804 /// return index of shard containing offset
805 /// or -1 if not found
806 int seek_shard(uint32_t offset
) {
807 size_t end
= shards
.size();
808 size_t mid
, left
= 0;
809 size_t right
= end
; // one passed the right end
811 while (left
< right
) {
812 mid
= left
+ (right
- left
) / 2;
813 if (offset
>= shards
[mid
].shard_info
->offset
) {
814 size_t next
= mid
+ 1;
815 if (next
>= end
|| offset
< shards
[next
].shard_info
->offset
)
817 //continue to search forwards
820 //continue to search backwards
825 return -1; // not found
828 /// check if a range spans a shard
829 bool spans_shard(uint32_t offset
, uint32_t length
) {
830 if (shards
.empty()) {
833 int s
= seek_shard(offset
);
835 if (s
== (int)shards
.size() - 1) {
836 return false; // last shard
838 if (offset
+ length
<= shards
[s
+1].shard_info
->offset
) {
844 /// ensure that a range of the map is loaded
845 void fault_range(KeyValueDB
*db
,
846 uint32_t offset
, uint32_t length
);
848 /// ensure a range of the map is marked dirty
849 void dirty_range(uint32_t offset
, uint32_t length
);
851 /// for seek_lextent test
852 extent_map_t::iterator
find(uint64_t offset
);
854 /// seek to the first lextent including or after offset
855 extent_map_t::iterator
seek_lextent(uint64_t offset
);
856 extent_map_t::const_iterator
seek_lextent(uint64_t offset
) const;
859 void add(uint32_t lo
, uint32_t o
, uint32_t l
, BlobRef
& b
) {
860 extent_map
.insert(*new Extent(lo
, o
, l
, b
));
863 /// remove (and delete) an Extent
864 void rm(extent_map_t::iterator p
) {
865 extent_map
.erase_and_dispose(p
, DeleteDisposer());
868 bool has_any_lextents(uint64_t offset
, uint64_t length
);
870 /// consolidate adjacent lextents in extent_map
871 int compress_extent_map(uint64_t offset
, uint64_t length
);
873 /// punch a logical hole. add lextents to deref to target list.
874 void punch_hole(CollectionRef
&c
,
875 uint64_t offset
, uint64_t length
,
876 old_extent_map_t
*old_extents
);
878 /// put new lextent into lextent_map overwriting existing ones if
879 /// any and update references accordingly
880 Extent
*set_lextent(CollectionRef
&c
,
881 uint64_t logical_offset
,
882 uint64_t offset
, uint64_t length
,
884 old_extent_map_t
*old_extents
);
886 /// split a blob (and referring extents)
887 BlobRef
split_blob(BlobRef lb
, uint32_t blob_offset
, uint32_t pos
);
890 /// Compressed Blob Garbage collector
892 The primary idea of the collector is to estimate a difference between
893 allocation units(AU) currently present for compressed blobs and new AUs
894 required to store that data uncompressed.
895 Estimation is performed for protrusive extents within a logical range
896 determined by a concatenation of old_extents collection and specific(current)
898 The root cause for old_extents use is the need to handle blob ref counts
899 properly. Old extents still hold blob refs and hence we need to traverse
900 the collection to determine if blob to be released.
901 Protrusive extents are extents that fit into the blob set in action
902 (ones that are below the logical range from above) but not removed totally
903 due to the current write.
905 extent1 <loffs = 100, boffs = 100, len = 100> ->
906 blob1<compressed, len_on_disk=4096, logical_len=8192>
907 extent2 <loffs = 200, boffs = 200, len = 100> ->
908 blob2<raw, len_on_disk=4096, llen=4096>
909 extent3 <loffs = 300, boffs = 300, len = 100> ->
910 blob1<compressed, len_on_disk=4096, llen=8192>
911 extent4 <loffs = 4096, boffs = 0, len = 100> ->
912 blob3<raw, len_on_disk=4096, llen=4096>
914 protrusive extents are within the following ranges <0~300, 400~8192-400>
915 In this case existing AUs that might be removed due to GC (i.e. blob1)
917 And new AUs expected after GC = 0 since extent1 to be merged into blob2.
918 Hence we should do a collect.
920 class GarbageCollector
923 /// return amount of allocation units that might be saved due to GC
927 const ExtentMap
& extent_map
,
928 const old_extent_map_t
& old_extents
,
929 uint64_t min_alloc_size
);
931 /// return a collection of extents to perform GC on
932 const vector
<AllocExtent
>& get_extents_to_collect() const {
933 return extents_to_collect
;
935 GarbageCollector(CephContext
* _cct
) : cct(_cct
) {}
939 uint64_t referenced_bytes
= 0; ///< amount of bytes referenced in blob
940 int64_t expected_allocations
= 0; ///< new alloc units required
941 ///< in case of gc fulfilled
942 bool collect_candidate
= false; ///< indicate if blob has any extents
943 ///< eligible for GC.
944 extent_map_t::const_iterator first_lextent
; ///< points to the first
945 ///< lextent referring to
946 ///< the blob if any.
947 ///< collect_candidate flag
948 ///< determines the validity
949 extent_map_t::const_iterator last_lextent
; ///< points to the last
950 ///< lextent referring to
951 ///< the blob if any.
953 BlobInfo(uint64_t ref_bytes
) :
954 referenced_bytes(ref_bytes
) {
958 map
<Blob
*, BlobInfo
> affected_blobs
; ///< compressed blobs and their ref_map
959 ///< copies that are affected by the
962 vector
<AllocExtent
> extents_to_collect
; ///< protrusive extents that should
963 ///< be collected if GC takes place
965 boost::optional
<uint64_t > used_alloc_unit
; ///< last processed allocation
966 ///< unit when traversing
967 ///< protrusive extents.
968 ///< Other extents mapped to
969 ///< this AU to be ignored
970 ///< (except the case where
971 ///< uncompressed extent follows
972 ///< compressed one - see below).
973 BlobInfo
* blob_info_counted
= nullptr; ///< set if previous allocation unit
974 ///< caused expected_allocations
975 ///< counter increment at this blob.
976 ///< if uncompressed extent follows
977 ///< a decrement for the
978 ///< expected_allocations counter
980 int64_t expected_allocations
= 0; ///< new alloc units required in case
982 int64_t expected_for_release
= 0; ///< alloc units currently used by
983 ///< compressed blobs that might
985 uint64_t gc_start_offset
; ///starting offset for GC
986 uint64_t gc_end_offset
; ///ending offset for GC
989 void process_protrusive_extents(const BlueStore::ExtentMap
& extent_map
,
990 uint64_t start_offset
,
992 uint64_t start_touch_offset
,
993 uint64_t end_touch_offset
,
994 uint64_t min_alloc_size
);
999 /// an in-memory object
1001 MEMPOOL_CLASS_HELPERS();
1003 std::atomic_int nref
; ///< reference count
1008 /// key under PREFIX_OBJ where we are stored
1009 mempool::bluestore_cache_other::string key
;
1011 boost::intrusive::list_member_hook
<> lru_item
;
1013 bluestore_onode_t onode
; ///< metadata stored as value in kv store
1014 bool exists
; ///< true if object logically exists
1016 ExtentMap extent_map
;
1018 // track txc's that have not been committed to kv store (and whose
1019 // effects cannot be read via the kvdb read methods)
1020 std::atomic
<int> flushing_count
= {0};
1021 std::mutex flush_lock
; ///< protect flush_txns
1022 std::condition_variable flush_cond
; ///< wait here for uncommitted txns
1024 Onode(Collection
*c
, const ghobject_t
& o
,
1025 const mempool::bluestore_cache_other::string
& k
)
1043 typedef boost::intrusive_ptr
<Onode
> OnodeRef
;
1046 /// a cache (shard) of onodes and buffers
1049 PerfCounters
*logger
;
1050 std::recursive_mutex lock
; ///< protect lru and other structures
1052 std::atomic
<uint64_t> num_extents
= {0};
1053 std::atomic
<uint64_t> num_blobs
= {0};
1055 static Cache
*create(CephContext
* cct
, string type
, PerfCounters
*logger
);
1057 Cache(CephContext
* cct
) : cct(cct
), logger(nullptr) {}
1060 virtual void _add_onode(OnodeRef
& o
, int level
) = 0;
1061 virtual void _rm_onode(OnodeRef
& o
) = 0;
1062 virtual void _touch_onode(OnodeRef
& o
) = 0;
1064 virtual void _add_buffer(Buffer
*b
, int level
, Buffer
*near
) = 0;
1065 virtual void _rm_buffer(Buffer
*b
) = 0;
1066 virtual void _move_buffer(Cache
*src
, Buffer
*b
) = 0;
1067 virtual void _adjust_buffer_size(Buffer
*b
, int64_t delta
) = 0;
1068 virtual void _touch_buffer(Buffer
*b
) = 0;
1070 virtual uint64_t _get_num_onodes() = 0;
1071 virtual uint64_t _get_buffer_bytes() = 0;
1087 void trim(uint64_t target_bytes
,
1088 float target_meta_ratio
,
1089 float target_data_ratio
,
1090 float bytes_per_onode
);
1094 virtual void _trim(uint64_t onode_max
, uint64_t buffer_max
) = 0;
1096 virtual void add_stats(uint64_t *onodes
, uint64_t *extents
,
1099 uint64_t *bytes
) = 0;
1102 std::lock_guard
<std::recursive_mutex
> l(lock
);
1103 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1107 virtual void _audit(const char *s
) = 0;
1109 void _audit(const char *s
) { /* no-op */ }
1113 /// simple LRU cache for onodes and buffers
1114 struct LRUCache
: public Cache
{
1116 typedef boost::intrusive::list
<
1118 boost::intrusive::member_hook
<
1120 boost::intrusive::list_member_hook
<>,
1121 &Onode::lru_item
> > onode_lru_list_t
;
1122 typedef boost::intrusive::list
<
1124 boost::intrusive::member_hook
<
1126 boost::intrusive::list_member_hook
<>,
1127 &Buffer::lru_item
> > buffer_lru_list_t
;
1129 onode_lru_list_t onode_lru
;
1131 buffer_lru_list_t buffer_lru
;
1132 uint64_t buffer_size
= 0;
1135 LRUCache(CephContext
* cct
) : Cache(cct
) {}
1136 uint64_t _get_num_onodes() override
{
1137 return onode_lru
.size();
1139 void _add_onode(OnodeRef
& o
, int level
) override
{
1141 onode_lru
.push_front(*o
);
1143 onode_lru
.push_back(*o
);
1145 void _rm_onode(OnodeRef
& o
) override
{
1146 auto q
= onode_lru
.iterator_to(*o
);
1149 void _touch_onode(OnodeRef
& o
) override
;
1151 uint64_t _get_buffer_bytes() override
{
1154 void _add_buffer(Buffer
*b
, int level
, Buffer
*near
) override
{
1156 auto q
= buffer_lru
.iterator_to(*near
);
1157 buffer_lru
.insert(q
, *b
);
1158 } else if (level
> 0) {
1159 buffer_lru
.push_front(*b
);
1161 buffer_lru
.push_back(*b
);
1163 buffer_size
+= b
->length
;
1165 void _rm_buffer(Buffer
*b
) override
{
1166 assert(buffer_size
>= b
->length
);
1167 buffer_size
-= b
->length
;
1168 auto q
= buffer_lru
.iterator_to(*b
);
1169 buffer_lru
.erase(q
);
1171 void _move_buffer(Cache
*src
, Buffer
*b
) override
{
1173 _add_buffer(b
, 0, nullptr);
1175 void _adjust_buffer_size(Buffer
*b
, int64_t delta
) override
{
1176 assert((int64_t)buffer_size
+ delta
>= 0);
1177 buffer_size
+= delta
;
1179 void _touch_buffer(Buffer
*b
) override
{
1180 auto p
= buffer_lru
.iterator_to(*b
);
1181 buffer_lru
.erase(p
);
1182 buffer_lru
.push_front(*b
);
1183 _audit("_touch_buffer end");
1186 void _trim(uint64_t onode_max
, uint64_t buffer_max
) override
;
1188 void add_stats(uint64_t *onodes
, uint64_t *extents
,
1191 uint64_t *bytes
) override
{
1192 std::lock_guard
<std::recursive_mutex
> l(lock
);
1193 *onodes
+= onode_lru
.size();
1194 *extents
+= num_extents
;
1195 *blobs
+= num_blobs
;
1196 *buffers
+= buffer_lru
.size();
1197 *bytes
+= buffer_size
;
1201 void _audit(const char *s
) override
;
1205 // 2Q cache for buffers, LRU for onodes
1206 struct TwoQCache
: public Cache
{
1208 // stick with LRU for onodes for now (fixme?)
1209 typedef boost::intrusive::list
<
1211 boost::intrusive::member_hook
<
1213 boost::intrusive::list_member_hook
<>,
1214 &Onode::lru_item
> > onode_lru_list_t
;
1215 typedef boost::intrusive::list
<
1217 boost::intrusive::member_hook
<
1219 boost::intrusive::list_member_hook
<>,
1220 &Buffer::lru_item
> > buffer_list_t
;
1222 onode_lru_list_t onode_lru
;
1224 buffer_list_t buffer_hot
; ///< "Am" hot buffers
1225 buffer_list_t buffer_warm_in
; ///< "A1in" newly warm buffers
1226 buffer_list_t buffer_warm_out
; ///< "A1out" empty buffers we've evicted
1227 uint64_t buffer_bytes
= 0; ///< bytes
1231 BUFFER_WARM_IN
, ///< in buffer_warm_in
1232 BUFFER_WARM_OUT
, ///< in buffer_warm_out
1233 BUFFER_HOT
, ///< in buffer_hot
1237 uint64_t buffer_list_bytes
[BUFFER_TYPE_MAX
] = {0}; ///< bytes per type
1240 TwoQCache(CephContext
* cct
) : Cache(cct
) {}
1241 uint64_t _get_num_onodes() override
{
1242 return onode_lru
.size();
1244 void _add_onode(OnodeRef
& o
, int level
) override
{
1246 onode_lru
.push_front(*o
);
1248 onode_lru
.push_back(*o
);
1250 void _rm_onode(OnodeRef
& o
) override
{
1251 auto q
= onode_lru
.iterator_to(*o
);
1254 void _touch_onode(OnodeRef
& o
) override
;
1256 uint64_t _get_buffer_bytes() override
{
1257 return buffer_bytes
;
1259 void _add_buffer(Buffer
*b
, int level
, Buffer
*near
) override
;
1260 void _rm_buffer(Buffer
*b
) override
;
1261 void _move_buffer(Cache
*src
, Buffer
*b
) override
;
1262 void _adjust_buffer_size(Buffer
*b
, int64_t delta
) override
;
1263 void _touch_buffer(Buffer
*b
) override
{
1264 switch (b
->cache_private
) {
1265 case BUFFER_WARM_IN
:
1266 // do nothing (somewhat counter-intuitively!)
1268 case BUFFER_WARM_OUT
:
1269 // move from warm_out to hot LRU
1270 assert(0 == "this happens via discard hint");
1273 // move to front of hot LRU
1274 buffer_hot
.erase(buffer_hot
.iterator_to(*b
));
1275 buffer_hot
.push_front(*b
);
1278 _audit("_touch_buffer end");
1281 void _trim(uint64_t onode_max
, uint64_t buffer_max
) override
;
1283 void add_stats(uint64_t *onodes
, uint64_t *extents
,
1286 uint64_t *bytes
) override
{
1287 std::lock_guard
<std::recursive_mutex
> l(lock
);
1288 *onodes
+= onode_lru
.size();
1289 *extents
+= num_extents
;
1290 *blobs
+= num_blobs
;
1291 *buffers
+= buffer_hot
.size() + buffer_warm_in
.size();
1292 *bytes
+= buffer_bytes
;
1296 void _audit(const char *s
) override
;
1305 mempool::bluestore_cache_other::unordered_map
<ghobject_t
,OnodeRef
> onode_map
;
1307 friend class Collection
; // for split_cache()
1310 OnodeSpace(Cache
*c
) : cache(c
) {}
1315 OnodeRef
add(const ghobject_t
& oid
, OnodeRef o
);
1316 OnodeRef
lookup(const ghobject_t
& o
);
1317 void remove(const ghobject_t
& oid
) {
1318 onode_map
.erase(oid
);
1320 void rename(OnodeRef
& o
, const ghobject_t
& old_oid
,
1321 const ghobject_t
& new_oid
,
1322 const mempool::bluestore_cache_other::string
& new_okey
);
1326 void dump(CephContext
*cct
, int lvl
);
1328 /// return true if f true for any item
1329 bool map_any(std::function
<bool(OnodeRef
)> f
);
1332 struct Collection
: public CollectionImpl
{
1334 Cache
*cache
; ///< our cache shard
1336 bluestore_cnode_t cnode
;
1341 SharedBlobSet shared_blob_set
; ///< open SharedBlobs
1343 // cache onodes on a per-collection basis to avoid lock
1345 OnodeSpace onode_map
;
1348 pool_opts_t pool_opts
;
1350 OnodeRef
get_onode(const ghobject_t
& oid
, bool create
);
1352 // the terminology is confusing here, sorry!
1354 // blob_t shared_blob_t
1355 // !shared unused -> open
1356 // shared !loaded -> open + shared
1357 // shared loaded -> open + shared + loaded
1360 // open = SharedBlob is instantiated
1361 // shared = blob_t shared flag is set; SharedBlob is hashed.
1362 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1363 void open_shared_blob(uint64_t sbid
, BlobRef b
);
1364 void load_shared_blob(SharedBlobRef sb
);
1365 void make_blob_shared(uint64_t sbid
, BlobRef b
);
1366 uint64_t make_blob_unshared(SharedBlob
*sb
);
1368 BlobRef
new_blob() {
1369 BlobRef b
= new Blob();
1370 b
->shared_blob
= new SharedBlob(this);
1374 const coll_t
&get_cid() override
{
1378 bool contains(const ghobject_t
& oid
) {
1380 return oid
.hobj
.pool
== -1;
1382 if (cid
.is_pg(&spgid
))
1384 spgid
.pgid
.contains(cnode
.bits
, oid
) &&
1385 oid
.shard_id
== spgid
.shard
;
1389 void split_cache(Collection
*dest
);
1391 Collection(BlueStore
*ns
, Cache
*ca
, coll_t c
);
1394 class OmapIteratorImpl
: public ObjectMap::ObjectMapIteratorImpl
{
1397 KeyValueDB::Iterator it
;
1400 OmapIteratorImpl(CollectionRef c
, OnodeRef o
, KeyValueDB::Iterator it
);
1401 int seek_to_first() override
;
1402 int upper_bound(const string
&after
) override
;
1403 int lower_bound(const string
&to
) override
;
1404 bool valid() override
;
1405 int next(bool validate
=true) override
;
1406 string
key() override
;
1407 bufferlist
value() override
;
1408 int status() override
{
1414 typedef boost::intrusive_ptr
<OpSequencer
> OpSequencerRef
;
1416 struct volatile_statfs
{
1418 STATFS_ALLOCATED
= 0,
1420 STATFS_COMPRESSED_ORIGINAL
,
1422 STATFS_COMPRESSED_ALLOCATED
,
1425 int64_t values
[STATFS_LAST
];
1427 memset(this, 0, sizeof(volatile_statfs
));
1430 *this = volatile_statfs();
1432 volatile_statfs
& operator+=(const volatile_statfs
& other
) {
1433 for (size_t i
= 0; i
< STATFS_LAST
; ++i
) {
1434 values
[i
] += other
.values
[i
];
1438 int64_t& allocated() {
1439 return values
[STATFS_ALLOCATED
];
1442 return values
[STATFS_STORED
];
1444 int64_t& compressed_original() {
1445 return values
[STATFS_COMPRESSED_ORIGINAL
];
1447 int64_t& compressed() {
1448 return values
[STATFS_COMPRESSED
];
1450 int64_t& compressed_allocated() {
1451 return values
[STATFS_COMPRESSED_ALLOCATED
];
1454 return values
[STATFS_ALLOCATED
] == 0 &&
1455 values
[STATFS_STORED
] == 0 &&
1456 values
[STATFS_COMPRESSED
] == 0 &&
1457 values
[STATFS_COMPRESSED_ORIGINAL
] == 0 &&
1458 values
[STATFS_COMPRESSED_ALLOCATED
] == 0;
1460 void decode(bufferlist::iterator
& it
) {
1461 for (size_t i
= 0; i
< STATFS_LAST
; i
++) {
1462 ::decode(values
[i
], it
);
1466 void encode(bufferlist
& bl
) {
1467 for (size_t i
= 0; i
< STATFS_LAST
; i
++) {
1468 ::encode(values
[i
], bl
);
1473 struct TransContext
: public AioContext
{
1474 MEMPOOL_CLASS_HELPERS();
1480 STATE_KV_QUEUED
, // queued for kv_sync_thread submission
1481 STATE_KV_SUBMITTED
, // submitted to kv; not yet synced
1483 STATE_DEFERRED_QUEUED
, // in deferred_queue (pending or running)
1484 STATE_DEFERRED_CLEANUP
, // remove deferred kv record
1485 STATE_DEFERRED_DONE
,
1490 state_t state
= STATE_PREPARE
;
1492 const char *get_state_name() {
1494 case STATE_PREPARE
: return "prepare";
1495 case STATE_AIO_WAIT
: return "aio_wait";
1496 case STATE_IO_DONE
: return "io_done";
1497 case STATE_KV_QUEUED
: return "kv_queued";
1498 case STATE_KV_SUBMITTED
: return "kv_submitted";
1499 case STATE_KV_DONE
: return "kv_done";
1500 case STATE_DEFERRED_QUEUED
: return "deferred_queued";
1501 case STATE_DEFERRED_CLEANUP
: return "deferred_cleanup";
1502 case STATE_DEFERRED_DONE
: return "deferred_done";
1503 case STATE_FINISHING
: return "finishing";
1504 case STATE_DONE
: return "done";
1509 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1510 const char *get_state_latency_name(int state
) {
1512 case l_bluestore_state_prepare_lat
: return "prepare";
1513 case l_bluestore_state_aio_wait_lat
: return "aio_wait";
1514 case l_bluestore_state_io_done_lat
: return "io_done";
1515 case l_bluestore_state_kv_queued_lat
: return "kv_queued";
1516 case l_bluestore_state_kv_committing_lat
: return "kv_committing";
1517 case l_bluestore_state_kv_done_lat
: return "kv_done";
1518 case l_bluestore_state_deferred_queued_lat
: return "deferred_queued";
1519 case l_bluestore_state_deferred_cleanup_lat
: return "deferred_cleanup";
1520 case l_bluestore_state_finishing_lat
: return "finishing";
1521 case l_bluestore_state_done_lat
: return "done";
1527 void log_state_latency(PerfCounters
*logger
, int state
) {
1528 utime_t lat
, now
= ceph_clock_now();
1529 lat
= now
- last_stamp
;
1530 logger
->tinc(state
, lat
);
1531 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1532 if (state
>= l_bluestore_state_prepare_lat
&& state
<= l_bluestore_state_done_lat
) {
1533 double usecs
= (now
.to_nsec()-last_stamp
.to_nsec())/1000;
1534 OID_ELAPSED("", usecs
, get_state_latency_name(state
));
1541 boost::intrusive::list_member_hook
<> sequencer_item
;
1543 uint64_t bytes
= 0, cost
= 0;
1545 set
<OnodeRef
> onodes
; ///< these need to be updated/written
1546 set
<OnodeRef
> modified_objects
; ///< objects we modified (and need a ref)
1547 set
<SharedBlobRef
> shared_blobs
; ///< these need to be updated/written
1548 set
<SharedBlobRef
> shared_blobs_written
; ///< update these on io completion
1550 KeyValueDB::Transaction t
; ///< then we will commit this
1551 Context
*oncommit
= nullptr; ///< signal on commit
1552 Context
*onreadable
= nullptr; ///< signal on readable
1553 Context
*onreadable_sync
= nullptr; ///< signal on readable
1554 list
<Context
*> oncommits
; ///< more commit completions
1555 list
<CollectionRef
> removed_collections
; ///< colls we removed
1557 boost::intrusive::list_member_hook
<> deferred_queue_item
;
1558 bluestore_deferred_transaction_t
*deferred_txn
= nullptr; ///< if any
1560 interval_set
<uint64_t> allocated
, released
;
1561 volatile_statfs statfs_delta
;
1564 bool had_ios
= false; ///< true if we submitted IOs before our kv txn
1570 uint64_t last_nid
= 0; ///< if non-zero, highest new nid we allocated
1571 uint64_t last_blobid
= 0; ///< if non-zero, highest new blobid we allocated
1573 explicit TransContext(CephContext
* cct
, OpSequencer
*o
)
1576 start(ceph_clock_now()) {
1580 delete deferred_txn
;
1583 void write_onode(OnodeRef
&o
) {
1586 void write_shared_blob(SharedBlobRef
&sb
) {
1587 shared_blobs
.insert(sb
);
1589 void unshare_blob(SharedBlob
*sb
) {
1590 shared_blobs
.erase(sb
);
1593 /// note we logically modified object (when onode itself is unmodified)
1594 void note_modified_object(OnodeRef
&o
) {
1595 // onode itself isn't written, though
1596 modified_objects
.insert(o
);
1598 void removed(OnodeRef
& o
) {
1600 modified_objects
.erase(o
);
1603 void aio_finish(BlueStore
*store
) override
{
1604 store
->txc_aio_finish(this);
1608 typedef boost::intrusive::list
<
1610 boost::intrusive::member_hook
<
1612 boost::intrusive::list_member_hook
<>,
1613 &TransContext::deferred_queue_item
> > deferred_queue_t
;
1615 struct DeferredBatch
: public AioContext
{
1617 struct deferred_io
{
1618 bufferlist bl
; ///< data
1619 uint64_t seq
; ///< deferred transaction seq
1621 map
<uint64_t,deferred_io
> iomap
; ///< map of ios in this batch
1622 deferred_queue_t txcs
; ///< txcs in this batch
1623 IOContext ioc
; ///< our aios
1624 /// bytes of pending io for each deferred seq (may be 0)
1625 map
<uint64_t,int> seq_bytes
;
1627 void _discard(CephContext
*cct
, uint64_t offset
, uint64_t length
);
1628 void _audit(CephContext
*cct
);
1630 DeferredBatch(CephContext
*cct
, OpSequencer
*osr
)
1631 : osr(osr
), ioc(cct
, this) {}
1634 void prepare_write(CephContext
*cct
,
1635 uint64_t seq
, uint64_t offset
, uint64_t length
,
1636 bufferlist::const_iterator
& p
);
1638 void aio_finish(BlueStore
*store
) override
{
1639 store
->_deferred_aio_finish(osr
);
1643 class OpSequencer
: public Sequencer_impl
{
1646 std::condition_variable qcond
;
1647 typedef boost::intrusive::list
<
1649 boost::intrusive::member_hook
<
1651 boost::intrusive::list_member_hook
<>,
1652 &TransContext::sequencer_item
> > q_list_t
;
1653 q_list_t q
; ///< transactions
1655 boost::intrusive::list_member_hook
<> deferred_osr_queue_item
;
1657 DeferredBatch
*deferred_running
= nullptr;
1658 DeferredBatch
*deferred_pending
= nullptr;
1663 uint64_t last_seq
= 0;
1665 std::atomic_int txc_with_unstable_io
= {0}; ///< num txcs with unstable io
1667 std::atomic_int kv_committing_serially
= {0};
1669 std::atomic_int kv_submitted_waiters
= {0};
1671 std::atomic_bool registered
= {true}; ///< registered in BlueStore's osr_set
1672 std::atomic_bool zombie
= {false}; ///< owning Sequencer has gone away
1674 OpSequencer(CephContext
* cct
, BlueStore
*store
)
1675 : Sequencer_impl(cct
),
1676 parent(NULL
), store(store
) {
1677 store
->register_osr(this);
1679 ~OpSequencer() override
{
1684 void discard() override
{
1685 // Note that we may have txc's in flight when the parent Sequencer
1686 // goes away. Reflect this with zombie==registered==true and let
1687 // _osr_drain_all clean up later.
1693 std::lock_guard
<std::mutex
> l(qlock
);
1701 void _unregister() {
1703 store
->unregister_osr(this);
1708 void queue_new(TransContext
*txc
) {
1709 std::lock_guard
<std::mutex
> l(qlock
);
1710 txc
->seq
= ++last_seq
;
1715 std::unique_lock
<std::mutex
> l(qlock
);
1720 void drain_preceding(TransContext
*txc
) {
1721 std::unique_lock
<std::mutex
> l(qlock
);
1722 while (!q
.empty() && &q
.front() != txc
)
1726 bool _is_all_kv_submitted() {
1727 // caller must hold qlock
1731 TransContext
*txc
= &q
.back();
1732 if (txc
->state
>= TransContext::STATE_KV_SUBMITTED
) {
1738 void flush() override
{
1739 std::unique_lock
<std::mutex
> l(qlock
);
1741 // set flag before the check because the condition
1742 // may become true outside qlock, and we need to make
1743 // sure those threads see waiters and signal qcond.
1744 ++kv_submitted_waiters
;
1745 if (_is_all_kv_submitted()) {
1749 --kv_submitted_waiters
;
1753 bool flush_commit(Context
*c
) override
{
1754 std::lock_guard
<std::mutex
> l(qlock
);
1758 TransContext
*txc
= &q
.back();
1759 if (txc
->state
>= TransContext::STATE_KV_DONE
) {
1762 txc
->oncommits
.push_back(c
);
1767 typedef boost::intrusive::list
<
1769 boost::intrusive::member_hook
<
1771 boost::intrusive::list_member_hook
<>,
1772 &OpSequencer::deferred_osr_queue_item
> > deferred_osr_queue_t
;
1774 struct KVSyncThread
: public Thread
{
1776 explicit KVSyncThread(BlueStore
*s
) : store(s
) {}
1777 void *entry() override
{
1778 store
->_kv_sync_thread();
1782 struct KVFinalizeThread
: public Thread
{
1784 explicit KVFinalizeThread(BlueStore
*s
) : store(s
) {}
1786 store
->_kv_finalize_thread();
1791 struct DBHistogram
{
1800 map
<int, struct value_dist
> val_map
; ///< slab id to count, max length of value and key
1803 map
<string
, map
<int, struct key_dist
> > key_hist
;
1804 map
<int, uint64_t> value_hist
;
1805 int get_key_slab(size_t sz
);
1806 string
get_key_slab_to_range(int slab
);
1807 int get_value_slab(size_t sz
);
1808 string
get_value_slab_to_range(int slab
);
1809 void update_hist_entry(map
<string
, map
<int, struct key_dist
> > &key_hist
,
1810 const string
&prefix
, size_t key_size
, size_t value_size
);
1811 void dump(Formatter
*f
);
1814 // --------------------------------------------------------
1817 BlueFS
*bluefs
= nullptr;
1818 unsigned bluefs_shared_bdev
= 0; ///< which bluefs bdev we are sharing
1819 bool bluefs_single_shared_device
= true;
1820 utime_t bluefs_last_balance
;
1822 KeyValueDB
*db
= nullptr;
1823 BlockDevice
*bdev
= nullptr;
1824 std::string freelist_type
;
1825 FreelistManager
*fm
= nullptr;
1826 Allocator
*alloc
= nullptr;
1828 int path_fd
= -1; ///< open handle to $path
1829 int fsid_fd
= -1; ///< open handle (locked) to $path/fsid
1830 bool mounted
= false;
1832 RWLock coll_lock
= {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1833 mempool::bluestore_cache_other::unordered_map
<coll_t
, CollectionRef
> coll_map
;
1835 vector
<Cache
*> cache_shards
;
1837 std::mutex osr_lock
; ///< protect osd_set
1838 std::set
<OpSequencerRef
> osr_set
; ///< set of all OpSequencers
1840 std::atomic
<uint64_t> nid_last
= {0};
1841 std::atomic
<uint64_t> nid_max
= {0};
1842 std::atomic
<uint64_t> blobid_last
= {0};
1843 std::atomic
<uint64_t> blobid_max
= {0};
1845 Throttle throttle_bytes
; ///< submit to commit
1846 Throttle throttle_deferred_bytes
; ///< submit to deferred complete
1848 interval_set
<uint64_t> bluefs_extents
; ///< block extents owned by bluefs
1849 interval_set
<uint64_t> bluefs_extents_reclaiming
; ///< currently reclaiming
1851 std::mutex deferred_lock
;
1852 std::atomic
<uint64_t> deferred_seq
= {0};
1853 deferred_osr_queue_t deferred_queue
; ///< osr's with deferred io pending
1854 int deferred_queue_size
= 0; ///< num txc's queued across all osrs
1855 atomic_int deferred_aggressive
= {0}; ///< aggressive wakeup of kv thread
1856 Finisher deferred_finisher
;
1858 int m_finisher_num
= 1;
1859 vector
<Finisher
*> finishers
;
1861 KVSyncThread kv_sync_thread
;
1863 std::condition_variable kv_cond
;
1864 bool _kv_only
= false;
1865 bool kv_sync_started
= false;
1866 bool kv_stop
= false;
1867 bool kv_finalize_started
= false;
1868 bool kv_finalize_stop
= false;
1869 deque
<TransContext
*> kv_queue
; ///< ready, already submitted
1870 deque
<TransContext
*> kv_queue_unsubmitted
; ///< ready, need submit by kv thread
1871 deque
<TransContext
*> kv_committing
; ///< currently syncing
1872 deque
<DeferredBatch
*> deferred_done_queue
; ///< deferred ios done
1873 deque
<DeferredBatch
*> deferred_stable_queue
; ///< deferred ios done + stable
1875 KVFinalizeThread kv_finalize_thread
;
1876 std::mutex kv_finalize_lock
;
1877 std::condition_variable kv_finalize_cond
;
1878 deque
<TransContext
*> kv_committing_to_finalize
; ///< pending finalization
1879 deque
<DeferredBatch
*> deferred_stable_to_finalize
; ///< pending finalization
1881 PerfCounters
*logger
= nullptr;
1883 list
<CollectionRef
> removed_collections
;
1885 RWLock debug_read_error_lock
= {"BlueStore::debug_read_error_lock"};
1886 set
<ghobject_t
> debug_data_error_objects
;
1887 set
<ghobject_t
> debug_mdata_error_objects
;
1889 std::atomic
<int> csum_type
= {Checksummer::CSUM_CRC32C
};
1891 uint64_t block_size
= 0; ///< block size of block device (power of 2)
1892 uint64_t block_mask
= 0; ///< mask to get just the block offset
1893 size_t block_size_order
= 0; ///< bits to shift to get block size
1895 uint64_t min_alloc_size
= 0; ///< minimum allocation unit (power of 2)
1896 ///< bits for min_alloc_size
1897 uint8_t min_alloc_size_order
= 0;
1898 static_assert(std::numeric_limits
<uint8_t>::max() >
1899 std::numeric_limits
<decltype(min_alloc_size
)>::digits
,
1900 "not enough bits for min_alloc_size");
1902 ///< maximum allocation unit (power of 2)
1903 std::atomic
<uint64_t> max_alloc_size
= {0};
1905 ///< number threshold for forced deferred writes
1906 std::atomic
<int> deferred_batch_ops
= {0};
1908 ///< size threshold for forced deferred writes
1909 std::atomic
<uint64_t> prefer_deferred_size
= {0};
1911 ///< approx cost per io, in bytes
1912 std::atomic
<uint64_t> throttle_cost_per_io
= {0};
1914 std::atomic
<Compressor::CompressionMode
> comp_mode
=
1915 {Compressor::COMP_NONE
}; ///< compression mode
1916 CompressorRef compressor
;
1917 std::atomic
<uint64_t> comp_min_blob_size
= {0};
1918 std::atomic
<uint64_t> comp_max_blob_size
= {0};
1920 std::atomic
<uint64_t> max_blob_size
= {0}; ///< maximum blob size
1922 uint64_t kv_ios
= 0;
1923 uint64_t kv_throttle_costs
= 0;
1925 // cache trim control
1926 uint64_t cache_size
= 0; ///< total cache size
1927 float cache_meta_ratio
= 0; ///< cache ratio dedicated to metadata
1928 float cache_kv_ratio
= 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1929 float cache_data_ratio
= 0; ///< cache ratio dedicated to object data
1931 std::mutex vstatfs_lock
;
1932 volatile_statfs vstatfs
;
1934 struct MempoolThread
: public Thread
{
1940 explicit MempoolThread(BlueStore
*s
)
1942 lock("BlueStore::MempoolThread::lock") {}
1943 void *entry() override
;
1945 assert(stop
== false);
1946 create("bstore_mempool");
1957 // --------------------------------------------------------
1960 void _init_logger();
1961 void _shutdown_logger();
1962 int _reload_logger();
1966 int _open_fsid(bool create
);
1968 int _read_fsid(uuid_d
*f
);
1971 void _set_alloc_sizes();
1972 void _set_blob_size();
1973 void _set_finisher_num();
1975 int _open_bdev(bool create
);
1977 int _open_db(bool create
);
1979 int _open_fm(bool create
);
1982 void _close_alloc();
1983 int _open_collections(int *errors
=0);
1984 void _close_collections();
1986 int _setup_block_symlink_or_file(string name
, string path
, uint64_t size
,
1990 static int _write_bdev_label(CephContext
* cct
,
1991 string path
, bluestore_bdev_label_t label
);
1992 static int _read_bdev_label(CephContext
* cct
, string path
,
1993 bluestore_bdev_label_t
*label
);
1995 int _check_or_set_bdev_label(string path
, uint64_t size
, string desc
,
1998 int _open_super_meta();
2000 void _open_statfs();
2002 int _reconcile_bluefs_freespace();
2003 int _balance_bluefs_freespace(PExtentVector
*extents
);
2004 void _commit_bluefs_freespace(const PExtentVector
& extents
);
2006 CollectionRef
_get_collection(const coll_t
& cid
);
2007 void _queue_reap_collection(CollectionRef
& c
);
2008 void _reap_collections();
2009 void _update_cache_logger();
2011 void _assign_nid(TransContext
*txc
, OnodeRef o
);
2012 uint64_t _assign_blobid(TransContext
*txc
);
2014 void _dump_onode(const OnodeRef
& o
, int log_level
=30);
2015 void _dump_extent_map(ExtentMap
& em
, int log_level
=30);
2016 void _dump_transaction(Transaction
*t
, int log_level
= 30);
2018 TransContext
*_txc_create(OpSequencer
*osr
);
2019 void _txc_update_store_statfs(TransContext
*txc
);
2020 void _txc_add_transaction(TransContext
*txc
, Transaction
*t
);
2021 void _txc_calc_cost(TransContext
*txc
);
2022 void _txc_write_nodes(TransContext
*txc
, KeyValueDB::Transaction t
);
2023 void _txc_state_proc(TransContext
*txc
);
2024 void _txc_aio_submit(TransContext
*txc
);
2026 void txc_aio_finish(void *p
) {
2027 _txc_state_proc(static_cast<TransContext
*>(p
));
2030 void _txc_finish_io(TransContext
*txc
);
2031 void _txc_finalize_kv(TransContext
*txc
, KeyValueDB::Transaction t
);
2032 void _txc_applied_kv(TransContext
*txc
);
2033 void _txc_committed_kv(TransContext
*txc
);
2034 void _txc_finish(TransContext
*txc
);
2035 void _txc_release_alloc(TransContext
*txc
);
2037 void _osr_drain_preceding(TransContext
*txc
);
2038 void _osr_drain_all();
2039 void _osr_unregister_all();
2043 void _kv_sync_thread();
2044 void _kv_finalize_thread();
2046 bluestore_deferred_op_t
*_get_deferred_op(TransContext
*txc
, OnodeRef o
);
2047 void _deferred_queue(TransContext
*txc
);
2049 void deferred_try_submit();
2051 void _deferred_submit_unlock(OpSequencer
*osr
);
2052 void _deferred_aio_finish(OpSequencer
*osr
);
2053 int _deferred_replay();
2056 using mempool_dynamic_bitset
=
2057 boost::dynamic_bitset
<uint64_t,
2058 mempool::bluestore_fsck::pool_allocator
<uint64_t>>;
2061 int _fsck_check_extents(
2062 const ghobject_t
& oid
,
2063 const PExtentVector
& extents
,
2065 mempool_dynamic_bitset
&used_blocks
,
2066 uint64_t granularity
,
2067 store_statfs_t
& expected_statfs
);
2069 void _buffer_cache_write(
2075 b
->shared_blob
->bc
.write(b
->shared_blob
->get_cache(), txc
->seq
, offset
, bl
,
2077 txc
->shared_blobs_written
.insert(b
->shared_blob
);
2080 int _collection_list(
2081 Collection
*c
, const ghobject_t
& start
, const ghobject_t
& end
,
2082 int max
, vector
<ghobject_t
> *ls
, ghobject_t
*next
);
2084 template <typename T
, typename F
>
2085 T
select_option(const std::string
& opt_name
, T val1
, F f
) {
2086 //NB: opt_name reserved for future use
2087 boost::optional
<T
> val2
= f();
2094 void _apply_padding(uint64_t head_pad
,
2096 bufferlist
& padded
);
2098 // -- ondisk version ---
2100 const int32_t latest_ondisk_format
= 2; ///< our version
2101 const int32_t min_readable_ondisk_format
= 1; ///< what we can read
2102 const int32_t min_compat_ondisk_format
= 2; ///< who can read us
2105 int32_t ondisk_format
= 0; ///< value detected on mount
2107 int _upgrade_super(); ///< upgrade (called during open_super)
2108 void _prepare_ondisk_format_super(KeyValueDB::Transaction
& t
);
2110 // --- public interface ---
2112 BlueStore(CephContext
*cct
, const string
& path
);
2113 BlueStore(CephContext
*cct
, const string
& path
, uint64_t min_alloc_size
); // Ctor for UT only
2114 ~BlueStore() override
;
2116 string
get_type() override
{
2120 bool needs_journal() override
{ return false; };
2121 bool wants_journal() override
{ return false; };
2122 bool allows_journal() override
{ return false; };
2124 bool is_rotational() override
;
2125 bool is_journal_rotational() override
;
2127 string
get_default_device_class() override
{
2128 string device_class
;
2129 map
<string
, string
> metadata
;
2130 collect_metadata(&metadata
);
2131 auto it
= metadata
.find("bluestore_bdev_type");
2132 if (it
!= metadata
.end()) {
2133 device_class
= it
->second
;
2135 return device_class
;
2138 static int get_block_device_fsid(CephContext
* cct
, const string
& path
,
2141 bool test_mount_in_use() override
;
2144 int _mount(bool kv_only
);
2146 int mount() override
{
2147 return _mount(false);
2149 int umount() override
;
2151 int start_kv_only(KeyValueDB
**pdb
) {
2152 int r
= _mount(true);
2159 int write_meta(const std::string
& key
, const std::string
& value
) override
;
2160 int read_meta(const std::string
& key
, std::string
*value
) override
;
2163 int fsck(bool deep
) override
{
2164 return _fsck(deep
, false);
2166 int repair(bool deep
) override
{
2167 return _fsck(deep
, true);
2169 int _fsck(bool deep
, bool repair
);
2171 void set_cache_shards(unsigned num
) override
;
2173 int validate_hobject_key(const hobject_t
&obj
) const override
{
2176 unsigned get_max_attr_name_length() override
{
2177 return 256; // arbitrary; there is no real limit internally
2180 int mkfs() override
;
2181 int mkjournal() override
{
2185 void get_db_statistics(Formatter
*f
) override
;
2186 void generate_db_histogram(Formatter
*f
) override
;
2187 void _flush_cache();
2188 void flush_cache() override
;
2189 void dump_perf_counters(Formatter
*f
) override
{
2190 f
->open_object_section("perf_counters");
2191 logger
->dump_formatted(f
, false);
2195 void register_osr(OpSequencer
*osr
) {
2196 std::lock_guard
<std::mutex
> l(osr_lock
);
2197 osr_set
.insert(osr
);
2199 void unregister_osr(OpSequencer
*osr
) {
2200 std::lock_guard
<std::mutex
> l(osr_lock
);
2205 int statfs(struct store_statfs_t
*buf
) override
;
2207 void collect_metadata(map
<string
,string
> *pm
) override
;
2209 bool exists(const coll_t
& cid
, const ghobject_t
& oid
) override
;
2210 bool exists(CollectionHandle
&c
, const ghobject_t
& oid
) override
;
2211 int set_collection_opts(
2213 const pool_opts_t
& opts
) override
;
2216 const ghobject_t
& oid
,
2218 bool allow_eio
= false) override
;
2220 CollectionHandle
&c
,
2221 const ghobject_t
& oid
,
2223 bool allow_eio
= false) override
;
2226 const ghobject_t
& oid
,
2230 uint32_t op_flags
= 0) override
;
2232 CollectionHandle
&c
,
2233 const ghobject_t
& oid
,
2237 uint32_t op_flags
= 0) override
;
2244 uint32_t op_flags
= 0);
2247 int _fiemap(CollectionHandle
&c_
, const ghobject_t
& oid
,
2248 uint64_t offset
, size_t len
, interval_set
<uint64_t>& destset
);
2250 int fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
2251 uint64_t offset
, size_t len
, bufferlist
& bl
) override
;
2252 int fiemap(CollectionHandle
&c
, const ghobject_t
& oid
,
2253 uint64_t offset
, size_t len
, bufferlist
& bl
) override
;
2254 int fiemap(const coll_t
& cid
, const ghobject_t
& oid
,
2255 uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
) override
;
2256 int fiemap(CollectionHandle
&c
, const ghobject_t
& oid
,
2257 uint64_t offset
, size_t len
, map
<uint64_t, uint64_t>& destmap
) override
;
2260 int getattr(const coll_t
& cid
, const ghobject_t
& oid
, const char *name
,
2261 bufferptr
& value
) override
;
2262 int getattr(CollectionHandle
&c
, const ghobject_t
& oid
, const char *name
,
2263 bufferptr
& value
) override
;
2265 int getattrs(const coll_t
& cid
, const ghobject_t
& oid
,
2266 map
<string
,bufferptr
>& aset
) override
;
2267 int getattrs(CollectionHandle
&c
, const ghobject_t
& oid
,
2268 map
<string
,bufferptr
>& aset
) override
;
2270 int list_collections(vector
<coll_t
>& ls
) override
;
2272 CollectionHandle
open_collection(const coll_t
&c
) override
;
2274 bool collection_exists(const coll_t
& c
) override
;
2275 int collection_empty(const coll_t
& c
, bool *empty
) override
;
2276 int collection_bits(const coll_t
& c
) override
;
2278 int collection_list(const coll_t
& cid
,
2279 const ghobject_t
& start
,
2280 const ghobject_t
& end
,
2282 vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
2283 int collection_list(CollectionHandle
&c
,
2284 const ghobject_t
& start
,
2285 const ghobject_t
& end
,
2287 vector
<ghobject_t
> *ls
, ghobject_t
*next
) override
;
2290 const coll_t
& cid
, ///< [in] Collection containing oid
2291 const ghobject_t
&oid
, ///< [in] Object containing omap
2292 bufferlist
*header
, ///< [out] omap header
2293 map
<string
, bufferlist
> *out
/// < [out] Key to value map
2296 CollectionHandle
&c
, ///< [in] Collection containing oid
2297 const ghobject_t
&oid
, ///< [in] Object containing omap
2298 bufferlist
*header
, ///< [out] omap header
2299 map
<string
, bufferlist
> *out
/// < [out] Key to value map
2303 int omap_get_header(
2304 const coll_t
& cid
, ///< [in] Collection containing oid
2305 const ghobject_t
&oid
, ///< [in] Object containing omap
2306 bufferlist
*header
, ///< [out] omap header
2307 bool allow_eio
= false ///< [in] don't assert on eio
2309 int omap_get_header(
2310 CollectionHandle
&c
, ///< [in] Collection containing oid
2311 const ghobject_t
&oid
, ///< [in] Object containing omap
2312 bufferlist
*header
, ///< [out] omap header
2313 bool allow_eio
= false ///< [in] don't assert on eio
2316 /// Get keys defined on oid
2318 const coll_t
& cid
, ///< [in] Collection containing oid
2319 const ghobject_t
&oid
, ///< [in] Object containing omap
2320 set
<string
> *keys
///< [out] Keys defined on oid
2323 CollectionHandle
&c
, ///< [in] Collection containing oid
2324 const ghobject_t
&oid
, ///< [in] Object containing omap
2325 set
<string
> *keys
///< [out] Keys defined on oid
2329 int omap_get_values(
2330 const coll_t
& cid
, ///< [in] Collection containing oid
2331 const ghobject_t
&oid
, ///< [in] Object containing omap
2332 const set
<string
> &keys
, ///< [in] Keys to get
2333 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
2335 int omap_get_values(
2336 CollectionHandle
&c
, ///< [in] Collection containing oid
2337 const ghobject_t
&oid
, ///< [in] Object containing omap
2338 const set
<string
> &keys
, ///< [in] Keys to get
2339 map
<string
, bufferlist
> *out
///< [out] Returned keys and values
2342 /// Filters keys into out which are defined on oid
2343 int omap_check_keys(
2344 const coll_t
& cid
, ///< [in] Collection containing oid
2345 const ghobject_t
&oid
, ///< [in] Object containing omap
2346 const set
<string
> &keys
, ///< [in] Keys to check
2347 set
<string
> *out
///< [out] Subset of keys defined on oid
2349 int omap_check_keys(
2350 CollectionHandle
&c
, ///< [in] Collection containing oid
2351 const ghobject_t
&oid
, ///< [in] Object containing omap
2352 const set
<string
> &keys
, ///< [in] Keys to check
2353 set
<string
> *out
///< [out] Subset of keys defined on oid
2356 ObjectMap::ObjectMapIterator
get_omap_iterator(
2357 const coll_t
& cid
, ///< [in] collection
2358 const ghobject_t
&oid
///< [in] object
2360 ObjectMap::ObjectMapIterator
get_omap_iterator(
2361 CollectionHandle
&c
, ///< [in] collection
2362 const ghobject_t
&oid
///< [in] object
2365 void set_fsid(uuid_d u
) override
{
2368 uuid_d
get_fsid() override
{
2372 uint64_t estimate_objects_overhead(uint64_t num_objects
) override
{
2373 return num_objects
* 300; //assuming per-object overhead is 300 bytes
2376 struct BSPerfTracker
{
2377 PerfCounters::avg_tracker
<uint64_t> os_commit_latency
;
2378 PerfCounters::avg_tracker
<uint64_t> os_apply_latency
;
2380 objectstore_perf_stat_t
get_cur_stats() const {
2381 objectstore_perf_stat_t ret
;
2382 ret
.os_commit_latency
= os_commit_latency
.current_avg();
2383 ret
.os_apply_latency
= os_apply_latency
.current_avg();
2387 void update_from_perfcounters(PerfCounters
&logger
);
2390 objectstore_perf_stat_t
get_cur_stats() override
{
2391 perf_tracker
.update_from_perfcounters(*logger
);
2392 return perf_tracker
.get_cur_stats();
2394 const PerfCounters
* get_perf_counters() const override
{
2398 int queue_transactions(
2400 vector
<Transaction
>& tls
,
2401 TrackedOpRef op
= TrackedOpRef(),
2402 ThreadPool::TPHandle
*handle
= NULL
) override
;
2405 void inject_data_error(const ghobject_t
& o
) override
{
2406 RWLock::WLocker
l(debug_read_error_lock
);
2407 debug_data_error_objects
.insert(o
);
2409 void inject_mdata_error(const ghobject_t
& o
) override
{
2410 RWLock::WLocker
l(debug_read_error_lock
);
2411 debug_mdata_error_objects
.insert(o
);
2413 void compact() override
{
2417 bool has_builtin_csum() const override
{
2422 bool _debug_data_eio(const ghobject_t
& o
) {
2423 if (!cct
->_conf
->bluestore_debug_inject_read_err
) {
2426 RWLock::RLocker
l(debug_read_error_lock
);
2427 return debug_data_error_objects
.count(o
);
2429 bool _debug_mdata_eio(const ghobject_t
& o
) {
2430 if (!cct
->_conf
->bluestore_debug_inject_read_err
) {
2433 RWLock::RLocker
l(debug_read_error_lock
);
2434 return debug_mdata_error_objects
.count(o
);
2436 void _debug_obj_on_delete(const ghobject_t
& o
) {
2437 if (cct
->_conf
->bluestore_debug_inject_read_err
) {
2438 RWLock::WLocker
l(debug_read_error_lock
);
2439 debug_data_error_objects
.erase(o
);
2440 debug_mdata_error_objects
.erase(o
);
2446 // --------------------------------------------------------
2447 // read processing internal methods
2450 const bluestore_blob_t
* blob
,
2451 uint64_t blob_xoffset
,
2452 const bufferlist
& bl
,
2453 uint64_t logical_offset
) const;
2454 int _decompress(bufferlist
& source
, bufferlist
* result
);
2457 // --------------------------------------------------------
2460 struct WriteContext
{
2461 bool buffered
= false; ///< buffered write
2462 bool compress
= false; ///< compressed write
2463 uint64_t target_blob_size
= 0; ///< target (max) blob size
2464 unsigned csum_order
= 0; ///< target checksum chunk order
2466 old_extent_map_t old_extents
; ///< must deref these blobs
2469 uint64_t logical_offset
; ///< write logical offset
2471 uint64_t blob_length
;
2474 uint64_t b_off0
; ///< original offset in a blob prior to padding
2475 uint64_t length0
; ///< original data length prior to padding
2478 bool new_blob
; ///< whether new blob was created
2480 bool compressed
= false;
2481 bufferlist compressed_bl
;
2482 size_t compressed_len
= 0;
2485 uint64_t logical_offs
,
2495 logical_offset(logical_offs
),
2497 blob_length(blob_len
),
2502 mark_unused(_mark_unused
),
2503 new_blob(_new_blob
) {}
2505 vector
<write_item
> writes
; ///< blobs we're writing
2507 /// partial clone of the context
2508 void fork(const WriteContext
& other
) {
2509 buffered
= other
.buffered
;
2510 compress
= other
.compress
;
2511 target_blob_size
= other
.target_blob_size
;
2512 csum_order
= other
.csum_order
;
2524 writes
.emplace_back(loffs
,
2534 /// Checks for writes to the same pextent within a blob
2539 uint64_t min_alloc_size
);
2542 void _do_write_small(
2546 uint64_t offset
, uint64_t length
,
2547 bufferlist::iterator
& blp
,
2548 WriteContext
*wctx
);
2553 uint64_t offset
, uint64_t length
,
2554 bufferlist::iterator
& blp
,
2555 WriteContext
*wctx
);
2556 int _do_alloc_write(
2560 WriteContext
*wctx
);
2566 set
<SharedBlob
*> *maybe_unshared_blobs
=0);
2568 int _do_transaction(Transaction
*t
,
2570 ThreadPool::TPHandle
*handle
);
2572 int _write(TransContext
*txc
,
2575 uint64_t offset
, size_t len
,
2577 uint32_t fadvise_flags
);
2578 void _pad_zeros(bufferlist
*bl
, uint64_t *offset
,
2579 uint64_t chunk_size
);
2581 void _choose_write_options(CollectionRef
& c
,
2583 uint32_t fadvise_flags
,
2584 WriteContext
*wctx
);
2586 int _do_gc(TransContext
*txc
,
2589 const GarbageCollector
& gc
,
2590 const WriteContext
& wctx
,
2591 uint64_t *dirty_start
,
2592 uint64_t *dirty_end
);
2594 int _do_write(TransContext
*txc
,
2597 uint64_t offset
, uint64_t length
,
2599 uint32_t fadvise_flags
);
2600 void _do_write_data(TransContext
*txc
,
2606 WriteContext
*wctx
);
2608 int _touch(TransContext
*txc
,
2611 int _do_zero(TransContext
*txc
,
2614 uint64_t offset
, size_t len
);
2615 int _zero(TransContext
*txc
,
2618 uint64_t offset
, size_t len
);
2619 void _do_truncate(TransContext
*txc
,
2623 set
<SharedBlob
*> *maybe_unshared_blobs
=0);
2624 int _truncate(TransContext
*txc
,
2628 int _remove(TransContext
*txc
,
2631 int _do_remove(TransContext
*txc
,
2634 int _setattr(TransContext
*txc
,
2639 int _setattrs(TransContext
*txc
,
2642 const map
<string
,bufferptr
>& aset
);
2643 int _rmattr(TransContext
*txc
,
2646 const string
& name
);
2647 int _rmattrs(TransContext
*txc
,
2650 void _do_omap_clear(TransContext
*txc
, uint64_t id
);
2651 int _omap_clear(TransContext
*txc
,
2654 int _omap_setkeys(TransContext
*txc
,
2658 int _omap_setheader(TransContext
*txc
,
2661 bufferlist
& header
);
2662 int _omap_rmkeys(TransContext
*txc
,
2666 int _omap_rmkey_range(TransContext
*txc
,
2669 const string
& first
, const string
& last
);
2670 int _set_alloc_hint(
2674 uint64_t expected_object_size
,
2675 uint64_t expected_write_size
,
2677 int _do_clone_range(TransContext
*txc
,
2681 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
);
2682 int _clone(TransContext
*txc
,
2686 int _clone_range(TransContext
*txc
,
2690 uint64_t srcoff
, uint64_t length
, uint64_t dstoff
);
2691 int _rename(TransContext
*txc
,
2695 const ghobject_t
& new_oid
);
2696 int _create_collection(TransContext
*txc
, const coll_t
&cid
,
2697 unsigned bits
, CollectionRef
*c
);
2698 int _remove_collection(TransContext
*txc
, const coll_t
&cid
,
2700 int _split_collection(TransContext
*txc
,
2703 unsigned bits
, int rem
);
2706 inline ostream
& operator<<(ostream
& out
, const BlueStore::OpSequencer
& s
) {
2707 return out
<< *s
.parent
;
2710 static inline void intrusive_ptr_add_ref(BlueStore::Onode
*o
) {
2713 static inline void intrusive_ptr_release(BlueStore::Onode
*o
) {
2717 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer
*o
) {
2720 static inline void intrusive_ptr_release(BlueStore::OpSequencer
*o
) {