1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/unordered_map.h"
39 #include "include/mempool.h"
40 #include "include/hash.h"
41 #include "common/bloom_filter.hpp"
42 #include "common/Finisher.h"
43 #include "common/ceph_mutex.h"
44 #include "common/Throttle.h"
45 #include "common/perf_counters.h"
46 #include "common/PriorityCache.h"
47 #include "compressor/Compressor.h"
48 #include "os/ObjectStore.h"
49
50 #include "bluestore_types.h"
51 #include "BlockDevice.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 class Allocator;
56 class FreelistManager;
57 class BlueStoreRepairer;
58
59 //#define DEBUG_CACHE
60 //#define DEBUG_DEFERRED
61
62
63
64 // constants for Buffer::optimize()
65 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N; allow at most 1/N of a buffer to be wasted slop
66
67
68 enum {
69 l_bluestore_first = 732430,
70 l_bluestore_kv_flush_lat,
71 l_bluestore_kv_commit_lat,
72 l_bluestore_kv_sync_lat,
73 l_bluestore_kv_final_lat,
74 l_bluestore_state_prepare_lat,
75 l_bluestore_state_aio_wait_lat,
76 l_bluestore_state_io_done_lat,
77 l_bluestore_state_kv_queued_lat,
78 l_bluestore_state_kv_committing_lat,
79 l_bluestore_state_kv_done_lat,
80 l_bluestore_state_deferred_queued_lat,
81 l_bluestore_state_deferred_aio_wait_lat,
82 l_bluestore_state_deferred_cleanup_lat,
83 l_bluestore_state_finishing_lat,
84 l_bluestore_state_done_lat,
85 l_bluestore_throttle_lat,
86 l_bluestore_submit_lat,
87 l_bluestore_commit_lat,
88 l_bluestore_read_lat,
89 l_bluestore_read_onode_meta_lat,
90 l_bluestore_read_wait_aio_lat,
91 l_bluestore_compress_lat,
92 l_bluestore_decompress_lat,
93 l_bluestore_csum_lat,
94 l_bluestore_compress_success_count,
95 l_bluestore_compress_rejected_count,
96 l_bluestore_write_pad_bytes,
97 l_bluestore_deferred_write_ops,
98 l_bluestore_deferred_write_bytes,
99 l_bluestore_write_penalty_read_ops,
100 l_bluestore_allocated,
101 l_bluestore_stored,
102 l_bluestore_compressed,
103 l_bluestore_compressed_allocated,
104 l_bluestore_compressed_original,
105 l_bluestore_onodes,
106 l_bluestore_pinned_onodes,
107 l_bluestore_onode_hits,
108 l_bluestore_onode_misses,
109 l_bluestore_onode_shard_hits,
110 l_bluestore_onode_shard_misses,
111 l_bluestore_extents,
112 l_bluestore_blobs,
113 l_bluestore_buffers,
114 l_bluestore_buffer_bytes,
115 l_bluestore_buffer_hit_bytes,
116 l_bluestore_buffer_miss_bytes,
117 l_bluestore_write_big,
118 l_bluestore_write_big_bytes,
119 l_bluestore_write_big_blobs,
120 l_bluestore_write_small,
121 l_bluestore_write_small_bytes,
122 l_bluestore_write_small_unused,
123 l_bluestore_write_small_deferred,
124 l_bluestore_write_small_pre_read,
125 l_bluestore_write_small_new,
126 l_bluestore_txc,
127 l_bluestore_onode_reshard,
128 l_bluestore_blob_split,
129 l_bluestore_extent_compress,
130 l_bluestore_gc_merged,
131 l_bluestore_read_eio,
132 l_bluestore_reads_with_retries,
133 l_bluestore_fragmentation,
134 l_bluestore_omap_seek_to_first_lat,
135 l_bluestore_omap_upper_bound_lat,
136 l_bluestore_omap_lower_bound_lat,
137 l_bluestore_omap_next_lat,
138 l_bluestore_omap_get_keys_lat,
139 l_bluestore_omap_get_values_lat,
140 l_bluestore_clist_lat,
141 l_bluestore_remove_lat,
142 l_bluestore_last
143 };
144
145 #define META_POOL_ID ((uint64_t)-1ull)
146
147 class BlueStore : public ObjectStore,
148 public BlueFSDeviceExpander,
149 public md_config_obs_t {
150 // -----------------------------------------------------
151 // types
152 public:
153 // config observer
154 const char** get_tracked_conf_keys() const override;
155 void handle_conf_change(const ConfigProxy& conf,
156 const std::set<std::string> &changed) override;
157
158 // handler for discard events
159 void handle_discard(interval_set<uint64_t>& to_release);
160
161 void _set_csum();
162 void _set_compression();
163 void _set_throttle_params();
164 int _set_cache_sizes();
165 void _set_max_defer_interval() {
166 max_defer_interval =
167 cct->_conf.get_val<double>("bluestore_max_defer_interval");
168 }
169
170 class TransContext;
171
172 typedef map<uint64_t, bufferlist> ready_regions_t;
173
174
175 struct BufferSpace;
176 struct Collection;
177 typedef boost::intrusive_ptr<Collection> CollectionRef;
178
179 struct AioContext {
180 virtual void aio_finish(BlueStore *store) = 0;
181 virtual ~AioContext() {}
182 };
183
184 /// cached buffer
185 struct Buffer {
186 MEMPOOL_CLASS_HELPERS();
187
188 enum {
189 STATE_EMPTY, ///< empty buffer -- used for cache history
190 STATE_CLEAN, ///< clean data that is up to date
191 STATE_WRITING, ///< data that is being written (io not yet complete)
192 };
193 static const char *get_state_name(int s) {
194 switch (s) {
195 case STATE_EMPTY: return "empty";
196 case STATE_CLEAN: return "clean";
197 case STATE_WRITING: return "writing";
198 default: return "???";
199 }
200 }
201 enum {
202 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
203 // NOTE: fix operator<< when you define a second flag
204 };
205 static const char *get_flag_name(int s) {
206 switch (s) {
207 case FLAG_NOCACHE: return "nocache";
208 default: return "???";
209 }
210 }
211
212 BufferSpace *space;
213 uint16_t state; ///< STATE_*
214 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
215 uint32_t flags; ///< FLAG_*
216 uint64_t seq;
217 uint32_t offset, length;
218 bufferlist data;
219
220 boost::intrusive::list_member_hook<> lru_item;
221 boost::intrusive::list_member_hook<> state_item;
222
223 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
224 unsigned f = 0)
225 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
226 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
227 unsigned f = 0)
228 : space(space), state(s), flags(f), seq(q), offset(o),
229 length(b.length()), data(b) {}
230
231 bool is_empty() const {
232 return state == STATE_EMPTY;
233 }
234 bool is_clean() const {
235 return state == STATE_CLEAN;
236 }
237 bool is_writing() const {
238 return state == STATE_WRITING;
239 }
240
241 uint32_t end() const {
242 return offset + length;
243 }
244
245 void truncate(uint32_t newlen) {
246 ceph_assert(newlen < length);
247 if (data.length()) {
248 bufferlist t;
249 t.substr_of(data, 0, newlen);
250 data.claim(t);
251 }
252 length = newlen;
253 }
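// Compact the cached data if it is fragmented across multiple buffers or if
// the front buffer wastes more than 1/MAX_BUFFER_SLOP_RATIO_DEN of the
// cached length (see the condition below).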
254 void maybe_rebuild() {
255 if (data.length() &&
256 (data.get_num_buffers() > 1 ||
257 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
258 data.rebuild();
259 }
260 }
261
262 void dump(Formatter *f) const {
263 f->dump_string("state", get_state_name(state));
264 f->dump_unsigned("seq", seq);
265 f->dump_unsigned("offset", offset);
266 f->dump_unsigned("length", length);
267 f->dump_unsigned("data_length", data.length());
268 }
269 };
270
271 struct BufferCacheShard;
272
273 /// map logical extent range (object) onto buffers
274 struct BufferSpace {
275 enum {
276 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
277 };
278
279 typedef boost::intrusive::list<
280 Buffer,
281 boost::intrusive::member_hook<
282 Buffer,
283 boost::intrusive::list_member_hook<>,
284 &Buffer::state_item> > state_list_t;
285
286 mempool::bluestore_cache_meta::map<uint32_t, std::unique_ptr<Buffer>>
287 buffer_map;
288
289 // we use a bare intrusive list here instead of std::map because
290 // it uses less memory and we expect this to be very small (very
291 // few IOs in flight to the same Blob at the same time).
292 state_list_t writing; ///< writing buffers, sorted by seq, ascending
293
294 ~BufferSpace() {
295 ceph_assert(buffer_map.empty());
296 ceph_assert(writing.empty());
297 }
298
299 void _add_buffer(BufferCacheShard* cache, Buffer *b, int level, Buffer *near) {
300 cache->_audit("_add_buffer start");
301 buffer_map[b->offset].reset(b);
302 if (b->is_writing()) {
303 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
304 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
305 writing.push_back(*b);
306 } else {
307 auto it = writing.begin();
308 while (it->seq < b->seq) {
309 ++it;
310 }
311
312 ceph_assert(it->seq >= b->seq);
313 // note that this will insert b before it
314 // hence the order is maintained
315 writing.insert(it, *b);
316 }
317 } else {
318 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
319 cache->_add(b, level, near);
320 }
321 cache->_audit("_add_buffer end");
322 }
323 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
324 _rm_buffer(cache, buffer_map.find(b->offset));
325 }
326 void _rm_buffer(BufferCacheShard* cache,
327 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
328 ceph_assert(p != buffer_map.end());
329 cache->_audit("_rm_buffer start");
330 if (p->second->is_writing()) {
331 writing.erase(writing.iterator_to(*p->second));
332 } else {
333 cache->_rm(p->second.get());
334 }
335 buffer_map.erase(p);
336 cache->_audit("_rm_buffer end");
337 }
338
339 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
340 uint32_t offset) {
341 auto i = buffer_map.lower_bound(offset);
342 if (i != buffer_map.begin()) {
343 --i;
344 if (i->first + i->second->length <= offset)
345 ++i;
346 }
347 return i;
348 }
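// Illustrative example: with buffers at {0: len 100, 200: len 100},
// _data_lower_bound(50) returns the buffer at 0 (it still covers offset 50),
// while _data_lower_bound(150) returns the buffer at 200 (the buffer at 0
// ends at 100, before the requested offset).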
349
350 // must be called under protection of the Cache lock
351 void _clear(BufferCacheShard* cache);
352
353 // return value is the highest cache_private of a trimmed buffer, or 0.
354 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
355 std::lock_guard l(cache->lock);
356 int ret = _discard(cache, offset, length);
357 cache->_trim();
358 return ret;
359 }
360 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
361
362 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
363 unsigned flags) {
364 std::lock_guard l(cache->lock);
365 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
366 flags);
367 b->cache_private = _discard(cache, offset, bl.length());
368 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
369 cache->_trim();
370 }
371 void _finish_write(BufferCacheShard* cache, uint64_t seq);
372 void did_read(BufferCacheShard* cache, uint32_t offset, bufferlist& bl) {
373 std::lock_guard l(cache->lock);
374 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
375 b->cache_private = _discard(cache, offset, bl.length());
376 _add_buffer(cache, b, 1, nullptr);
377 cache->_trim();
378 }
379
380 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
381 BlueStore::ready_regions_t& res,
382 interval_set<uint32_t>& res_intervals,
383 int flags = 0);
384
385 void truncate(BufferCacheShard* cache, uint32_t offset) {
386 discard(cache, offset, (uint32_t)-1 - offset);
387 }
388
389 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
390
391 void dump(BufferCacheShard* cache, Formatter *f) const {
392 std::lock_guard l(cache->lock);
393 f->open_array_section("buffers");
394 for (auto& i : buffer_map) {
395 f->open_object_section("buffer");
396 ceph_assert(i.first == i.second->offset);
397 i.second->dump(f);
398 f->close_section();
399 }
400 f->close_section();
401 }
402 };
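// Sketch of the expected buffer lifecycle (illustrative; the transition logic
// itself lives in BlueStore.cc):
//   write(cache, seq, off, bl, flags) adds a STATE_WRITING buffer tagged with
//   the txc seq; on kv commit, _finish_write(cache, seq) retires buffers with
//   that seq - they either become STATE_CLEAN cache entries or are trimmed
//   right away if FLAG_NOCACHE was set; discard()/truncate() drop any
//   overlapping buffers at any point.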
403
404 struct SharedBlobSet;
405
406 /// in-memory shared blob state (incl cached buffers)
407 struct SharedBlob {
408 MEMPOOL_CLASS_HELPERS();
409
410 std::atomic_int nref = {0}; ///< reference count
411 bool loaded = false;
412
413 CollectionRef coll;
414 union {
415 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
416 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
417 };
418 BufferSpace bc; ///< buffer cache
419
420 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
421 if (get_cache()) {
422 get_cache()->add_blob();
423 }
424 }
425 SharedBlob(uint64_t i, Collection *_coll);
426 ~SharedBlob();
427
428 uint64_t get_sbid() const {
429 return loaded ? persistent->sbid : sbid_unloaded;
430 }
431
432 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
433 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
434
435 void dump(Formatter* f) const;
436 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
437
438 void get() {
439 ++nref;
440 }
441 void put();
442
443 /// get logical references
444 void get_ref(uint64_t offset, uint32_t length);
445
446 /// put logical references, and get back any released extents
447 void put_ref(uint64_t offset, uint32_t length,
448 PExtentVector *r, bool *unshare);
449
450 void finish_write(uint64_t seq);
451
452 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
453 return l.get_sbid() == r.get_sbid();
454 }
455 inline BufferCacheShard* get_cache() {
456 return coll ? coll->cache : nullptr;
457 }
458 inline SharedBlobSet* get_parent() {
459 return coll ? &(coll->shared_blob_set) : nullptr;
460 }
461 inline bool is_loaded() const {
462 return loaded;
463 }
464
465 };
466 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
467
468 /// a lookup table of SharedBlobs
469 struct SharedBlobSet {
470 /// protect lookup, insertion, removal
471 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
472
473 // we use a bare pointer because we don't want to affect the ref
474 // count
475 mempool::bluestore_cache_meta::unordered_map<uint64_t,SharedBlob*> sb_map;
476
477 SharedBlobRef lookup(uint64_t sbid) {
478 std::lock_guard l(lock);
479 auto p = sb_map.find(sbid);
480 if (p == sb_map.end() ||
481 p->second->nref == 0) {
482 return nullptr;
483 }
484 return p->second;
485 }
486
487 void add(Collection* coll, SharedBlob *sb) {
488 std::lock_guard l(lock);
489 sb_map[sb->get_sbid()] = sb;
490 sb->coll = coll;
491 }
492
493 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
494 std::lock_guard l(lock);
495 ceph_assert(sb->get_parent() == this);
496 if (verify_nref_is_zero && sb->nref != 0) {
497 return false;
498 }
499 // only remove if it still points to us
500 auto p = sb_map.find(sb->get_sbid());
501 if (p != sb_map.end() &&
502 p->second == sb) {
503 sb_map.erase(p);
504 }
505 return true;
506 }
507
508 bool empty() {
509 std::lock_guard l(lock);
510 return sb_map.empty();
511 }
512
513 template <int LogLevelV>
514 void dump(CephContext *cct);
515 };
516
517 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
518
519 /// in-memory blob metadata and associated cached buffers (if any)
520 struct Blob {
521 MEMPOOL_CLASS_HELPERS();
522
523 std::atomic_int nref = {0}; ///< reference count
524 int16_t id = -1; ///< id, for spanning blobs only, >= 0
525 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
526 SharedBlobRef shared_blob; ///< shared blob state (if any)
527
528 private:
529 mutable bluestore_blob_t blob; ///< decoded blob metadata
530 #ifdef CACHE_BLOB_BL
531 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
532 #endif
533 /// refs from this shard. ephemeral if id<0, persisted if spanning.
534 bluestore_blob_use_tracker_t used_in_blob;
535
536 public:
537
538 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
539 friend void intrusive_ptr_release(Blob *b) { b->put(); }
540
541 void dump(Formatter* f) const;
542 friend ostream& operator<<(ostream& out, const Blob &b);
543
544 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
545 return used_in_blob;
546 }
547 bool is_referenced() const {
548 return used_in_blob.is_not_empty();
549 }
550 uint32_t get_referenced_bytes() const {
551 return used_in_blob.get_referenced_bytes();
552 }
553
554 bool is_spanning() const {
555 return id >= 0;
556 }
557
558 bool can_split() const {
559 std::lock_guard l(shared_blob->get_cache()->lock);
560 // splitting a BufferSpace writing list is too hard; don't try.
561 return shared_blob->bc.writing.empty() &&
562 used_in_blob.can_split() &&
563 get_blob().can_split();
564 }
565
566 bool can_split_at(uint32_t blob_offset) const {
567 return used_in_blob.can_split_at(blob_offset) &&
568 get_blob().can_split_at(blob_offset);
569 }
570
571 bool can_reuse_blob(uint32_t min_alloc_size,
572 uint32_t target_blob_size,
573 uint32_t b_offset,
574 uint32_t *length0);
575
576 void dup(Blob& o) {
577 o.shared_blob = shared_blob;
578 o.blob = blob;
579 #ifdef CACHE_BLOB_BL
580 o.blob_bl = blob_bl;
581 #endif
582 }
583
584 inline const bluestore_blob_t& get_blob() const {
585 return blob;
586 }
587 inline bluestore_blob_t& dirty_blob() {
588 #ifdef CACHE_BLOB_BL
589 blob_bl.clear();
590 #endif
591 return blob;
592 }
593
594 /// discard buffers for unallocated regions
595 void discard_unallocated(Collection *coll);
596
597 /// get logical references
598 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
599 /// put logical references, and get back any released extents
600 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
601 PExtentVector *r);
602
603 /// split the blob
604 void split(Collection *coll, uint32_t blob_offset, Blob *o);
605
606 void get() {
607 ++nref;
608 }
609 void put() {
610 if (--nref == 0)
611 delete this;
612 }
613
614
615 #ifdef CACHE_BLOB_BL
616 void _encode() const {
617 if (blob_bl.length() == 0 ) {
618 encode(blob, blob_bl);
619 } else {
620 ceph_assert(blob_bl.length());
621 }
622 }
623 void bound_encode(
624 size_t& p,
625 bool include_ref_map) const {
626 _encode();
627 p += blob_bl.length();
628 if (include_ref_map) {
629 used_in_blob.bound_encode(p);
630 }
631 }
632 void encode(
633 bufferlist::contiguous_appender& p,
634 bool include_ref_map) const {
635 _encode();
636 p.append(blob_bl);
637 if (include_ref_map) {
638 used_in_blob.encode(p);
639 }
640 }
641 void decode(
642 Collection */*coll*/,
643 bufferptr::const_iterator& p,
644 bool include_ref_map) {
645 const char *start = p.get_pos();
646 denc(blob, p);
647 const char *end = p.get_pos();
648 blob_bl.clear();
649 blob_bl.append(start, end - start);
650 if (include_ref_map) {
651 used_in_blob.decode(p);
652 }
653 }
654 #else
655 void bound_encode(
656 size_t& p,
657 uint64_t struct_v,
658 uint64_t sbid,
659 bool include_ref_map) const {
660 denc(blob, p, struct_v);
661 if (blob.is_shared()) {
662 denc(sbid, p);
663 }
664 if (include_ref_map) {
665 used_in_blob.bound_encode(p);
666 }
667 }
668 void encode(
669 bufferlist::contiguous_appender& p,
670 uint64_t struct_v,
671 uint64_t sbid,
672 bool include_ref_map) const {
673 denc(blob, p, struct_v);
674 if (blob.is_shared()) {
675 denc(sbid, p);
676 }
677 if (include_ref_map) {
678 used_in_blob.encode(p);
679 }
680 }
681 void decode(
682 Collection *coll,
683 bufferptr::const_iterator& p,
684 uint64_t struct_v,
685 uint64_t* sbid,
686 bool include_ref_map);
687 #endif
688 };
689 typedef boost::intrusive_ptr<Blob> BlobRef;
690 typedef mempool::bluestore_cache_meta::map<int,BlobRef> blob_map_t;
691
692 /// a logical extent, pointing to (some portion of) a blob
693 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
694 struct Extent : public ExtentBase {
695 MEMPOOL_CLASS_HELPERS();
696
697 uint32_t logical_offset = 0; ///< logical offset
698 uint32_t blob_offset = 0; ///< blob offset
699 uint32_t length = 0; ///< length
700 BlobRef blob; ///< the blob with our data
701
702 /// ctor for lookup only
703 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
704 /// ctor for delayed initialization (see decode_some())
705 explicit Extent() : ExtentBase() {
706 }
707 /// ctor for general usage
708 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
709 : ExtentBase(),
710 logical_offset(lo), blob_offset(o), length(l) {
711 assign_blob(b);
712 }
713 ~Extent() {
714 if (blob) {
715 blob->shared_blob->get_cache()->rm_extent();
716 }
717 }
718
719 void dump(Formatter* f) const;
720
721 void assign_blob(const BlobRef& b) {
722 ceph_assert(!blob);
723 blob = b;
724 blob->shared_blob->get_cache()->add_extent();
725 }
726
727 // comparators for intrusive_set
728 friend bool operator<(const Extent &a, const Extent &b) {
729 return a.logical_offset < b.logical_offset;
730 }
731 friend bool operator>(const Extent &a, const Extent &b) {
732 return a.logical_offset > b.logical_offset;
733 }
734 friend bool operator==(const Extent &a, const Extent &b) {
735 return a.logical_offset == b.logical_offset;
736 }
737
738 uint32_t blob_start() const {
739 return logical_offset - blob_offset;
740 }
741
742 uint32_t blob_end() const {
743 return blob_start() + blob->get_blob().get_logical_length();
744 }
745
746 uint32_t logical_end() const {
747 return logical_offset + length;
748 }
749
750 // return true if any piece of the blob is out of
751 // the given range [o, o + l].
752 bool blob_escapes_range(uint32_t o, uint32_t l) const {
753 return blob_start() < o || blob_end() > o + l;
754 }
755 };
756 typedef boost::intrusive::set<Extent> extent_map_t;
757
758
759 friend ostream& operator<<(ostream& out, const Extent& e);
760
761 struct OldExtent {
762 boost::intrusive::list_member_hook<> old_extent_item;
763 Extent e;
764 PExtentVector r;
765 bool blob_empty; // true if this was the last removed extent, leaving the
766 // blob empty - required to update compression stats properly
767 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
768 : e(lo, o, l, b), blob_empty(false) {
769 }
770 static OldExtent* create(CollectionRef c,
771 uint32_t lo,
772 uint32_t o,
773 uint32_t l,
774 BlobRef& b);
775 };
776 typedef boost::intrusive::list<
777 OldExtent,
778 boost::intrusive::member_hook<
779 OldExtent,
780 boost::intrusive::list_member_hook<>,
781 &OldExtent::old_extent_item> > old_extent_map_t;
782
783 struct Onode;
784
785 /// a sharded extent map, mapping offsets to lextents to blobs
786 struct ExtentMap {
787 Onode *onode;
788 extent_map_t extent_map; ///< map of Extents to Blobs
789 blob_map_t spanning_blob_map; ///< blobs that span shards
790 typedef boost::intrusive_ptr<Onode> OnodeRef;
791
792 struct Shard {
793 bluestore_onode_t::shard_info *shard_info = nullptr;
794 unsigned extents = 0; ///< count extents in this shard
795 bool loaded = false; ///< true if shard is loaded
796 bool dirty = false; ///< true if shard is dirty and needs reencoding
797 };
798 mempool::bluestore_cache_meta::vector<Shard> shards; ///< shards
799
800 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
801
802 uint32_t needs_reshard_begin = 0;
803 uint32_t needs_reshard_end = 0;
804
805 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
806 uint64_t&, uint64_t&, uint64_t&);
807
808 bool needs_reshard() const {
809 return needs_reshard_end > needs_reshard_begin;
810 }
811 void clear_needs_reshard() {
812 needs_reshard_begin = needs_reshard_end = 0;
813 }
814 void request_reshard(uint32_t begin, uint32_t end) {
815 if (begin < needs_reshard_begin) {
816 needs_reshard_begin = begin;
817 }
818 if (end > needs_reshard_end) {
819 needs_reshard_end = end;
820 }
821 }
822
823 struct DeleteDisposer {
824 void operator()(Extent *e) { delete e; }
825 };
826
827 ExtentMap(Onode *o);
828 ~ExtentMap() {
829 extent_map.clear_and_dispose(DeleteDisposer());
830 }
831
832 void clear() {
833 extent_map.clear_and_dispose(DeleteDisposer());
834 shards.clear();
835 inline_bl.clear();
836 clear_needs_reshard();
837 }
838
839 void dump(Formatter* f) const;
840
841 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
842 unsigned *pn);
843 unsigned decode_some(bufferlist& bl);
844
845 void bound_encode_spanning_blobs(size_t& p);
846 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
847 void decode_spanning_blobs(bufferptr::const_iterator& p);
848
849 BlobRef get_spanning_blob(int id) {
850 auto p = spanning_blob_map.find(id);
851 ceph_assert(p != spanning_blob_map.end());
852 return p->second;
853 }
854
855 void update(KeyValueDB::Transaction t, bool force);
856 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
857 void reshard(
858 KeyValueDB *db,
859 KeyValueDB::Transaction t);
860
861 /// initialize Shards from the onode
862 void init_shards(bool loaded, bool dirty);
863
864 /// return index of shard containing offset
865 /// or -1 if not found
866 int seek_shard(uint32_t offset) {
867 size_t end = shards.size();
868 size_t mid, left = 0;
869 size_t right = end; // one past the right end
870
871 while (left < right) {
872 mid = left + (right - left) / 2;
873 if (offset >= shards[mid].shard_info->offset) {
874 size_t next = mid + 1;
875 if (next >= end || offset < shards[next].shard_info->offset)
876 return mid;
877 //continue to search forwards
878 left = next;
879 } else {
880 //continue to search backwards
881 right = mid;
882 }
883 }
884
885 return -1; // not found
886 }
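// Worked example (illustrative): with shards starting at offsets
// {0x0, 0x10000, 0x30000}, seek_shard(0x12345) returns 1 and
// seek_shard(0x30000) returns 2; if the first shard instead started at
// 0x1000, seek_shard(0x500) would return -1.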
887
888 /// check if a range spans a shard
889 bool spans_shard(uint32_t offset, uint32_t length) {
890 if (shards.empty()) {
891 return false;
892 }
893 int s = seek_shard(offset);
894 ceph_assert(s >= 0);
895 if (s == (int)shards.size() - 1) {
896 return false; // last shard
897 }
898 if (offset + length <= shards[s+1].shard_info->offset) {
899 return false;
900 }
901 return true;
902 }
903
904 /// ensure that a range of the map is loaded
905 void fault_range(KeyValueDB *db,
906 uint32_t offset, uint32_t length);
907
908 /// ensure a range of the map is marked dirty
909 void dirty_range(uint32_t offset, uint32_t length);
910
911 /// for seek_lextent test
912 extent_map_t::iterator find(uint64_t offset);
913
914 /// seek to the first lextent including or after offset
915 extent_map_t::iterator seek_lextent(uint64_t offset);
916 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
917
918 /// add a new Extent
919 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
920 extent_map.insert(*new Extent(lo, o, l, b));
921 }
922
923 /// remove (and delete) an Extent
924 void rm(extent_map_t::iterator p) {
925 extent_map.erase_and_dispose(p, DeleteDisposer());
926 }
927
928 bool has_any_lextents(uint64_t offset, uint64_t length);
929
930 /// consolidate adjacent lextents in extent_map
931 int compress_extent_map(uint64_t offset, uint64_t length);
932
933 /// punch a logical hole; add the lextents to be dereferenced to the target list
934 void punch_hole(CollectionRef &c,
935 uint64_t offset, uint64_t length,
936 old_extent_map_t *old_extents);
937
938 /// put new lextent into lextent_map overwriting existing ones if
939 /// any and update references accordingly
940 Extent *set_lextent(CollectionRef &c,
941 uint64_t logical_offset,
942 uint64_t offset, uint64_t length,
943 BlobRef b,
944 old_extent_map_t *old_extents);
945
946 /// split a blob (and referring extents)
947 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
948 };
949
950 /// Compressed Blob Garbage collector
951 /*
952 The primary idea of the collector is to estimate the difference between the
953 allocation units (AUs) currently occupied by compressed blobs and the new AUs
954 that would be required to store the same data uncompressed.
955 Estimation is performed for protrusive extents within a logical range
956 determined by the concatenation of the old_extents collection and the
957 specific (current) write request.
958 old_extents is needed to handle blob ref counts properly: old extents still
959 hold blob refs, so we have to traverse the collection to determine whether
960 a blob can be released.
961 Protrusive extents are extents that fall within the blob set in action
962 (i.e. within the logical range above) but are not removed completely
963 by the current write.
964 E.g. for
965 extent1 <loffs = 100, boffs = 100, len = 100> ->
966 blob1<compressed, len_on_disk=4096, logical_len=8192>
967 extent2 <loffs = 200, boffs = 200, len = 100> ->
968 blob2<raw, len_on_disk=4096, llen=4096>
969 extent3 <loffs = 300, boffs = 300, len = 100> ->
970 blob1<compressed, len_on_disk=4096, llen=8192>
971 extent4 <loffs = 4096, boffs = 0, len = 100> ->
972 blob3<raw, len_on_disk=4096, llen=4096>
973 write(300~100)
974 the protrusive extents lie within the ranges <0~300, 400~8192-400>
975 In this case the existing AUs that might be released by GC (i.e. blob1)
976 occupy 2x4K bytes,
977 while the new AUs expected after GC = 0, since extent1 is to be merged into blob2.
978 Hence garbage collection should be performed here.
979 */
980 class GarbageCollector
981 {
982 public:
983 /// return amount of allocation units that might be saved due to GC
984 int64_t estimate(
985 uint64_t offset,
986 uint64_t length,
987 const ExtentMap& extent_map,
988 const old_extent_map_t& old_extents,
989 uint64_t min_alloc_size);
990
991 /// return a collection of extents to perform GC on
992 const interval_set<uint64_t>& get_extents_to_collect() const {
993 return extents_to_collect;
994 }
995 GarbageCollector(CephContext* _cct) : cct(_cct) {}
996
997 private:
998 struct BlobInfo {
999 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
1000 int64_t expected_allocations = 0; ///< new alloc units required
1001 ///< if GC is performed
1002 bool collect_candidate = false; ///< indicate if blob has any extents
1003 ///< eligible for GC.
1004 extent_map_t::const_iterator first_lextent; ///< points to the first
1005 ///< lextent referring to
1006 ///< the blob if any.
1007 ///< collect_candidate flag
1008 ///< determines the validity
1009 extent_map_t::const_iterator last_lextent; ///< points to the last
1010 ///< lextent referring to
1011 ///< the blob if any.
1012
1013 BlobInfo(uint64_t ref_bytes) :
1014 referenced_bytes(ref_bytes) {
1015 }
1016 };
1017 CephContext* cct;
1018 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1019 ///< copies that are affected by the
1020 ///< specific write
1021
1022 ///< protrusive extents that should be collected if GC takes place
1023 interval_set<uint64_t> extents_to_collect;
1024
1025 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1026 ///< unit when traversing
1027 ///< protrusive extents.
1028 ///< Other extents mapped to
1029 ///< this AU to be ignored
1030 ///< (except the case where
1031 ///< uncompressed extent follows
1032 ///< compressed one - see below).
1033 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
1034 ///< caused expected_allocations
1035 ///< counter increment at this blob.
1036 ///< if uncompressed extent follows
1037 ///< a decrement for the
1038 ///< expected_allocations counter
1039 ///< is needed
1040 int64_t expected_allocations = 0; ///< new alloc units required if GC
1041 ///< is performed
1042 int64_t expected_for_release = 0; ///< alloc units currently used by
1043 ///< compressed blobs that might be
1044 ///< freed after GC
1045
1046 protected:
1047 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1048 uint64_t start_offset,
1049 uint64_t end_offset,
1050 uint64_t start_touch_offset,
1051 uint64_t end_touch_offset,
1052 uint64_t min_alloc_size);
1053 };
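// Minimal usage sketch (illustrative only; the variable names are hypothetical
// and the real policy lives in the BlueStore.cc write path):
//
//   GarbageCollector gc(cct);
//   int64_t saved_aus = gc.estimate(offset, length, o->extent_map,
//                                   old_extents, min_alloc_size);
//   if (saved_aus > 0) {
//     // rewrite the ranges in gc.get_extents_to_collect() uncompressed
//   }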
1054
1055 struct OnodeSpace;
1056 /// an in-memory object
1057 struct Onode {
1058 MEMPOOL_CLASS_HELPERS();
1059
1060 std::atomic_int nref; ///< reference count
1061 Collection *c;
1062 ghobject_t oid;
1063
1064 /// key under PREFIX_OBJ where we are stored
1065 mempool::bluestore_cache_meta::string key;
1066
1067 boost::intrusive::list_member_hook<> lru_item;
1068
1069 bluestore_onode_t onode; ///< metadata stored as value in kv store
1070 bool exists; ///< true if object logically exists
1071 bool cached; ///< Onode is logically in the cache
1072 /// (it can be pinned and hence physically out
1073 /// of it at the moment though)
1074 std::atomic_bool pinned; ///< Onode is pinned
1075 /// (or should be pinned when cached)
1076 ExtentMap extent_map;
1077
1078 // track txc's that have not been committed to kv store (and whose
1079 // effects cannot be read via the kvdb read methods)
1080 std::atomic<int> flushing_count = {0};
1081 std::atomic<int> waiting_count = {0};
1082 /// protect flush_txns
1083 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1084 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1085
1086 Onode(Collection *c, const ghobject_t& o,
1087 const mempool::bluestore_cache_meta::string& k)
1088 : nref(0),
1089 c(c),
1090 oid(o),
1091 key(k),
1092 exists(false),
1093 cached(false),
1094 pinned(false),
1095 extent_map(this) {
1096 }
1097 Onode(Collection* c, const ghobject_t& o,
1098 const std::string& k)
1099 : nref(0),
1100 c(c),
1101 oid(o),
1102 key(k),
1103 exists(false),
1104 cached(false),
1105 pinned(false),
1106 extent_map(this) {
1107 }
1108 Onode(Collection* c, const ghobject_t& o,
1109 const char* k)
1110 : nref(0),
1111 c(c),
1112 oid(o),
1113 key(k),
1114 exists(false),
1115 cached(false),
1116 pinned(false),
1117 extent_map(this) {
1118 }
1119
1120 static Onode* decode(
1121 CollectionRef c,
1122 const ghobject_t& oid,
1123 const string& key,
1124 const bufferlist& v);
1125
1126 void dump(Formatter* f) const;
1127
1128 void flush();
1129 void get();
1130 void put();
1131
1132 inline bool put_cache() {
1133 ceph_assert(!cached);
1134 cached = true;
1135 return !pinned;
1136 }
1137 inline bool pop_cache() {
1138 ceph_assert(cached);
1139 cached = false;
1140 return !pinned;
1141 }
1142
1143 const string& get_omap_prefix();
1144 void get_omap_header(string *out);
1145 void get_omap_key(const string& key, string *out);
1146 void rewrite_omap_key(const string& old, string *out);
1147 void get_omap_tail(string *out);
1148 void decode_omap_key(const string& key, string *user_key);
1149 };
1150 typedef boost::intrusive_ptr<Onode> OnodeRef;
1151
1152 /// A generic Cache Shard
1153 struct CacheShard {
1154 CephContext *cct;
1155 PerfCounters *logger;
1156
1157 /// protect lru and other structures
1158 ceph::recursive_mutex lock = {
1159 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1160
1161 std::atomic<uint64_t> max = {0};
1162 std::atomic<uint64_t> num = {0};
1163
1164 CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1165 virtual ~CacheShard() {}
1166
1167 void set_max(uint64_t max_) {
1168 max = max_;
1169 }
1170
1171 uint64_t _get_num() {
1172 return num;
1173 }
1174
1175 virtual void _trim_to(uint64_t new_size) = 0;
1176 void _trim() {
1177 if (cct->_conf->objectstore_blackhole) {
1178 // do not trim if we are throwing away IOs a layer down
1179 return;
1180 }
1181 _trim_to(max);
1182 }
1183
1184 void trim() {
1185 std::lock_guard l(lock);
1186 _trim();
1187 }
1188 void flush() {
1189 std::lock_guard l(lock);
1190 // we should not be shutting down after the blackhole is enabled
1191 assert(!cct->_conf->objectstore_blackhole);
1192 _trim_to(0);
1193 }
1194
1195 #ifdef DEBUG_CACHE
1196 virtual void _audit(const char *s) = 0;
1197 #else
1198 void _audit(const char *s) { /* no-op */ }
1199 #endif
1200 };
1201
1202 /// A Generic onode Cache Shard
1203 struct OnodeCacheShard : public CacheShard {
1204 std::atomic<uint64_t> num_pinned = {0};
1205
1206 std::array<std::pair<ghobject_t, mono_clock::time_point>, 64> dumped_onodes;
1207
1208 virtual void _pin(Onode* o) = 0;
1209 virtual void _unpin(Onode* o) = 0;
1210
1211 public:
1212 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1213 static OnodeCacheShard *create(CephContext* cct, string type,
1214 PerfCounters *logger);
1215 virtual void _add(Onode* o, int level) = 0;
1216 virtual void _rm(Onode* o) = 0;
1217 virtual void _unpin_and_rm(Onode* o) = 0;
1218
1219 virtual void move_pinned(OnodeCacheShard *to, Onode *o) = 0;
1220 virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
1221 bool empty() {
1222 return _get_num() == 0;
1223 }
1224 };
1225
1226 /// A Generic buffer Cache Shard
1227 struct BufferCacheShard : public CacheShard {
1228 std::atomic<uint64_t> num_extents = {0};
1229 std::atomic<uint64_t> num_blobs = {0};
1230 uint64_t buffer_bytes = 0;
1231
1232 public:
1233 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1234 static BufferCacheShard *create(CephContext* cct, string type,
1235 PerfCounters *logger);
1236 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1237 virtual void _rm(Buffer *b) = 0;
1238 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1239 virtual void _touch(Buffer *b) = 0;
1240 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1241
1242 uint64_t _get_bytes() {
1243 return buffer_bytes;
1244 }
1245
1246 void add_extent() {
1247 ++num_extents;
1248 }
1249 void rm_extent() {
1250 --num_extents;
1251 }
1252
1253 void add_blob() {
1254 ++num_blobs;
1255 }
1256 void rm_blob() {
1257 --num_blobs;
1258 }
1259
1260 virtual void add_stats(uint64_t *extents,
1261 uint64_t *blobs,
1262 uint64_t *buffers,
1263 uint64_t *bytes) = 0;
1264
1265 bool empty() {
1266 std::lock_guard l(lock);
1267 return _get_bytes() == 0;
1268 }
1269 };
1270
1271 struct OnodeSpace {
1272 OnodeCacheShard *cache;
1273
1274 private:
1275 /// forward lookups
1276 mempool::bluestore_cache_meta::unordered_map<ghobject_t,OnodeRef> onode_map;
1277
1278 friend struct Collection; // for split_cache()
1279 friend struct Onode; // for put()
1280 friend struct LruOnodeCacheShard;
1281 void _remove(const ghobject_t& oid);
1282 public:
1283 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1284 ~OnodeSpace() {
1285 clear();
1286 }
1287
1288 OnodeRef add(const ghobject_t& oid, OnodeRef& o);
1289 OnodeRef lookup(const ghobject_t& o);
1290 void rename(OnodeRef& o, const ghobject_t& old_oid,
1291 const ghobject_t& new_oid,
1292 const mempool::bluestore_cache_meta::string& new_okey);
1293 void clear();
1294 bool empty();
1295
1296 template <int LogLevelV>
1297 void dump(CephContext *cct);
1298
1299 /// return true if f true for any item
1300 bool map_any(std::function<bool(Onode*)> f);
1301 };
1302
1303 class OpSequencer;
1304 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1305
1306 struct Collection : public CollectionImpl {
1307 BlueStore *store;
1308 OpSequencerRef osr;
1309 BufferCacheShard *cache; ///< our cache shard
1310 bluestore_cnode_t cnode;
1311 ceph::shared_mutex lock =
1312 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1313
1314 bool exists;
1315
1316 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1317
1318 // cache onodes on a per-collection basis to avoid lock
1319 // contention.
1320 OnodeSpace onode_map;
1321
1322 //pool options
1323 pool_opts_t pool_opts;
1324 ContextQueue *commit_queue;
1325
1326 OnodeCacheShard* get_onode_cache() const {
1327 return onode_map.cache;
1328 }
1329 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1330
1331 // the terminology is confusing here, sorry!
1332 //
1333 // blob_t shared_blob_t
1334 // !shared unused -> open
1335 // shared !loaded -> open + shared
1336 // shared loaded -> open + shared + loaded
1337 //
1338 // i.e.,
1339 // open = SharedBlob is instantiated
1340 // shared = blob_t shared flag is set; SharedBlob is hashed.
1341 // loaded = SharedBlob::shared_blob_t is loaded from kv store
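// Illustrative lifecycle (a sketch, not the full set of call sites): cloning
// a range calls make_blob_shared() on the source blob (sets the shared flag
// and hashes the SharedBlob into shared_blob_set); a later reference lookup
// calls load_shared_blob() to read the persistent shared_blob_t from the kv
// store; if put_ref() later finds only a single remaining referent, the blob
// can be reverted via make_blob_unshared().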
1342 void open_shared_blob(uint64_t sbid, BlobRef b);
1343 void load_shared_blob(SharedBlobRef sb);
1344 void make_blob_shared(uint64_t sbid, BlobRef b);
1345 uint64_t make_blob_unshared(SharedBlob *sb);
1346
1347 BlobRef new_blob() {
1348 BlobRef b = new Blob();
1349 b->shared_blob = new SharedBlob(this);
1350 return b;
1351 }
1352
1353 bool contains(const ghobject_t& oid) {
1354 if (cid.is_meta())
1355 return oid.hobj.pool == -1;
1356 spg_t spgid;
1357 if (cid.is_pg(&spgid))
1358 return
1359 spgid.pgid.contains(cnode.bits, oid) &&
1360 oid.shard_id == spgid.shard;
1361 return false;
1362 }
1363
1364 int64_t pool() const {
1365 return cid.pool();
1366 }
1367
1368 void split_cache(Collection *dest);
1369
1370 bool flush_commit(Context *c) override;
1371 void flush() override;
1372 void flush_all_but_last();
1373
1374 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1375 };
1376
1377 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1378 CollectionRef c;
1379 OnodeRef o;
1380 KeyValueDB::Iterator it;
1381 string head, tail;
1382
1383 string _stringify() const;
1384
1385 public:
1386 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1387 int seek_to_first() override;
1388 int upper_bound(const string &after) override;
1389 int lower_bound(const string &to) override;
1390 bool valid() override;
1391 int next() override;
1392 string key() override;
1393 bufferlist value() override;
1394 std::string tail_key() {
1395 return tail;
1396 }
1397
1398 int status() override {
1399 return 0;
1400 }
1401 };
1402
1403 struct volatile_statfs{
1404 enum {
1405 STATFS_ALLOCATED = 0,
1406 STATFS_STORED,
1407 STATFS_COMPRESSED_ORIGINAL,
1408 STATFS_COMPRESSED,
1409 STATFS_COMPRESSED_ALLOCATED,
1410 STATFS_LAST
1411 };
1412 int64_t values[STATFS_LAST];
1413 volatile_statfs() {
1414 memset(this, 0, sizeof(volatile_statfs));
1415 }
1416 void reset() {
1417 *this = volatile_statfs();
1418 }
1419 void publish(store_statfs_t* buf) const {
1420 buf->allocated = allocated();
1421 buf->data_stored = stored();
1422 buf->data_compressed = compressed();
1423 buf->data_compressed_original = compressed_original();
1424 buf->data_compressed_allocated = compressed_allocated();
1425 }
1426
1427 volatile_statfs& operator+=(const volatile_statfs& other) {
1428 for (size_t i = 0; i < STATFS_LAST; ++i) {
1429 values[i] += other.values[i];
1430 }
1431 return *this;
1432 }
1433 int64_t& allocated() {
1434 return values[STATFS_ALLOCATED];
1435 }
1436 int64_t& stored() {
1437 return values[STATFS_STORED];
1438 }
1439 int64_t& compressed_original() {
1440 return values[STATFS_COMPRESSED_ORIGINAL];
1441 }
1442 int64_t& compressed() {
1443 return values[STATFS_COMPRESSED];
1444 }
1445 int64_t& compressed_allocated() {
1446 return values[STATFS_COMPRESSED_ALLOCATED];
1447 }
1448 int64_t allocated() const {
1449 return values[STATFS_ALLOCATED];
1450 }
1451 int64_t stored() const {
1452 return values[STATFS_STORED];
1453 }
1454 int64_t compressed_original() const {
1455 return values[STATFS_COMPRESSED_ORIGINAL];
1456 }
1457 int64_t compressed() const {
1458 return values[STATFS_COMPRESSED];
1459 }
1460 int64_t compressed_allocated() const {
1461 return values[STATFS_COMPRESSED_ALLOCATED];
1462 }
1463 volatile_statfs& operator=(const store_statfs_t& st) {
1464 values[STATFS_ALLOCATED] = st.allocated;
1465 values[STATFS_STORED] = st.data_stored;
1466 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1467 values[STATFS_COMPRESSED] = st.data_compressed;
1468 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1469 return *this;
1470 }
1471 bool is_empty() {
1472 return values[STATFS_ALLOCATED] == 0 &&
1473 values[STATFS_STORED] == 0 &&
1474 values[STATFS_COMPRESSED] == 0 &&
1475 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1476 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1477 }
1478 void decode(bufferlist::const_iterator& it) {
1479 using ceph::decode;
1480 for (size_t i = 0; i < STATFS_LAST; i++) {
1481 decode(values[i], it);
1482 }
1483 }
1484
1485 void encode(bufferlist& bl) {
1486 using ceph::encode;
1487 for (size_t i = 0; i < STATFS_LAST; i++) {
1488 encode(values[i], bl);
1489 }
1490 }
1491 };
1492
1493 struct TransContext final : public AioContext {
1494 MEMPOOL_CLASS_HELPERS();
1495
1496 typedef enum {
1497 STATE_PREPARE,
1498 STATE_AIO_WAIT,
1499 STATE_IO_DONE,
1500 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1501 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1502 STATE_KV_DONE,
1503 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1504 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1505 STATE_DEFERRED_DONE,
1506 STATE_FINISHING,
1507 STATE_DONE,
1508 } state_t;
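// Typical state progression (a sketch inferred from the enum order above and
// the l_bluestore_state_* latency counters; DEFERRED_* states apply only to
// transactions carrying deferred writes):
//   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
//   [ -> DEFERRED_QUEUED -> DEFERRED_CLEANUP ] -> FINISHING -> DONE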
1509
1510 state_t state = STATE_PREPARE;
1511
1512 const char *get_state_name() {
1513 switch (state) {
1514 case STATE_PREPARE: return "prepare";
1515 case STATE_AIO_WAIT: return "aio_wait";
1516 case STATE_IO_DONE: return "io_done";
1517 case STATE_KV_QUEUED: return "kv_queued";
1518 case STATE_KV_SUBMITTED: return "kv_submitted";
1519 case STATE_KV_DONE: return "kv_done";
1520 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1521 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1522 case STATE_DEFERRED_DONE: return "deferred_done";
1523 case STATE_FINISHING: return "finishing";
1524 case STATE_DONE: return "done";
1525 }
1526 return "???";
1527 }
1528
1529 #if defined(WITH_LTTNG)
1530 const char *get_state_latency_name(int state) {
1531 switch (state) {
1532 case l_bluestore_state_prepare_lat: return "prepare";
1533 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1534 case l_bluestore_state_io_done_lat: return "io_done";
1535 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1536 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1537 case l_bluestore_state_kv_done_lat: return "kv_done";
1538 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1539 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1540 case l_bluestore_state_finishing_lat: return "finishing";
1541 case l_bluestore_state_done_lat: return "done";
1542 }
1543 return "???";
1544 }
1545 #endif
1546
1547 CollectionRef ch;
1548 OpSequencerRef osr; // this should be ch->osr
1549 boost::intrusive::list_member_hook<> sequencer_item;
1550
1551 uint64_t bytes = 0, ios = 0, cost = 0;
1552
1553 set<OnodeRef> onodes; ///< these need to be updated/written
1554 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1555 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1556 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1557
1558 KeyValueDB::Transaction t; ///< then we will commit this
1559 list<Context*> oncommits; ///< more commit completions
1560 list<CollectionRef> removed_collections; ///< colls we removed
1561
1562 boost::intrusive::list_member_hook<> deferred_queue_item;
1563 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1564
1565 interval_set<uint64_t> allocated, released;
1566 volatile_statfs statfs_delta; ///< overall store statistics delta
1567 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1568
1569 IOContext ioc;
1570 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1571
1572 uint64_t seq = 0;
1573 mono_clock::time_point start;
1574 mono_clock::time_point last_stamp;
1575
1576 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1577 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1578
1579 #if defined(WITH_LTTNG)
1580 bool tracing = false;
1581 #endif
1582
1583 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1584 list<Context*> *on_commits)
1585 : ch(c),
1586 osr(o),
1587 ioc(cct, this),
1588 start(mono_clock::now()) {
1589 last_stamp = start;
1590 if (on_commits) {
1591 oncommits.swap(*on_commits);
1592 }
1593 }
1594 ~TransContext() {
1595 delete deferred_txn;
1596 }
1597
1598 void write_onode(OnodeRef &o) {
1599 onodes.insert(o);
1600 }
1601 void write_shared_blob(SharedBlobRef &sb) {
1602 shared_blobs.insert(sb);
1603 }
1604 void unshare_blob(SharedBlob *sb) {
1605 shared_blobs.erase(sb);
1606 }
1607
1608 /// note that we logically modified the object (even when the onode itself is unmodified)
1609 void note_modified_object(OnodeRef &o) {
1610 // onode itself isn't written, though
1611 modified_objects.insert(o);
1612 }
1613 void note_removed_object(OnodeRef& o) {
1614 onodes.erase(o);
1615 modified_objects.insert(o);
1616 }
1617
1618 void aio_finish(BlueStore *store) override {
1619 store->txc_aio_finish(this);
1620 }
1621 };
1622
1623 class BlueStoreThrottle {
1624 #if defined(WITH_LTTNG)
1625 const std::chrono::time_point<mono_clock> time_base = mono_clock::now();
1626
1627 // Time of last chosen io (microseconds)
1628 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1629 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1630 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1631
1632 std::atomic_uint pending_kv_ios = {0};
1633 std::atomic_uint pending_deferred_ios = {0};
1634
1635 // Min period between trace points (microseconds)
1636 std::atomic<uint64_t> trace_period_mcs = {0};
1637
1638 bool should_trace(
1639 uint64_t *started,
1640 uint64_t *completed) {
1641 uint64_t min_period_mcs = trace_period_mcs.load(
1642 std::memory_order_relaxed);
1643
1644 if (min_period_mcs == 0) {
1645 *started = 1;
1646 *completed = ios_completed_since_last_traced.exchange(0);
1647 return true;
1648 } else {
1649 ios_started_since_last_traced++;
1650 auto now_mcs = ceph::to_microseconds<uint64_t>(
1651 mono_clock::now() - time_base);
1652 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1653 uint64_t period_mcs = now_mcs - previous_mcs;
1654 if (period_mcs > min_period_mcs) {
1655 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1656 previous_mcs, now_mcs)) {
1657 // This would be racy at a sufficiently extreme trace rate, but isn't
1658 // worth the overhead of doing it more carefully.
1659 *started = ios_started_since_last_traced.exchange(0);
1660 *completed = ios_completed_since_last_traced.exchange(0);
1661 return true;
1662 }
1663 }
1664 return false;
1665 }
1666 }
1667 #endif
1668
1669 #if defined(WITH_LTTNG)
1670 void emit_initial_tracepoint(
1671 KeyValueDB &db,
1672 TransContext &txc,
1673 mono_clock::time_point);
1674 #else
1675 void emit_initial_tracepoint(
1676 KeyValueDB &db,
1677 TransContext &txc,
1678 mono_clock::time_point) {}
1679 #endif
1680
1681 Throttle throttle_bytes; ///< submit to commit
1682 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1683
1684 public:
1685 BlueStoreThrottle(CephContext *cct) :
1686 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1687 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1688 {
1689 reset_throttle(cct->_conf);
1690 }
1691
1692 #if defined(WITH_LTTNG)
1693 void complete_kv(TransContext &txc);
1694 void complete(TransContext &txc);
1695 #else
1696 void complete_kv(TransContext &txc) {}
1697 void complete(TransContext &txc) {}
1698 #endif
1699
1700 mono_clock::duration log_state_latency(
1701 TransContext &txc, PerfCounters *logger, int state);
1702 bool try_start_transaction(
1703 KeyValueDB &db,
1704 TransContext &txc,
1705 mono_clock::time_point);
1706 void finish_start_transaction(
1707 KeyValueDB &db,
1708 TransContext &txc,
1709 mono_clock::time_point);
1710 void release_kv_throttle(uint64_t cost) {
1711 throttle_bytes.put(cost);
1712 }
1713 void release_deferred_throttle(uint64_t cost) {
1714 throttle_deferred_bytes.put(cost);
1715 }
1716 bool should_submit_deferred() {
1717 return throttle_deferred_bytes.past_midpoint();
1718 }
1719 void reset_throttle(const ConfigProxy &conf) {
1720 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1721 throttle_deferred_bytes.reset_max(
1722 conf->bluestore_throttle_bytes +
1723 conf->bluestore_throttle_deferred_bytes);
1724 #if defined(WITH_LTTNG)
1725 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1726 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1727 #endif
1728 }
1729 } throttle;
1730
1731 typedef boost::intrusive::list<
1732 TransContext,
1733 boost::intrusive::member_hook<
1734 TransContext,
1735 boost::intrusive::list_member_hook<>,
1736 &TransContext::deferred_queue_item> > deferred_queue_t;
1737
1738 struct DeferredBatch final : public AioContext {
1739 OpSequencer *osr;
1740 struct deferred_io {
1741 bufferlist bl; ///< data
1742 uint64_t seq; ///< deferred transaction seq
1743 };
1744 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1745 deferred_queue_t txcs; ///< txcs in this batch
1746 IOContext ioc; ///< our aios
1747 /// bytes of pending io for each deferred seq (may be 0)
1748 map<uint64_t,int> seq_bytes;
1749
1750 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1751 void _audit(CephContext *cct);
1752
1753 DeferredBatch(CephContext *cct, OpSequencer *osr)
1754 : osr(osr), ioc(cct, this) {}
1755
1756 /// prepare a write
1757 void prepare_write(CephContext *cct,
1758 uint64_t seq, uint64_t offset, uint64_t length,
1759 bufferlist::const_iterator& p);
1760
1761 void aio_finish(BlueStore *store) override {
1762 store->_deferred_aio_finish(osr);
1763 }
1764 };
1765
1766 class OpSequencer : public RefCountedObject {
1767 public:
1768 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1769 ceph::condition_variable qcond;
1770 typedef boost::intrusive::list<
1771 TransContext,
1772 boost::intrusive::member_hook<
1773 TransContext,
1774 boost::intrusive::list_member_hook<>,
1775 &TransContext::sequencer_item> > q_list_t;
1776 q_list_t q; ///< transactions
1777
1778 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1779
1780 DeferredBatch *deferred_running = nullptr;
1781 DeferredBatch *deferred_pending = nullptr;
1782
1783 BlueStore *store;
1784 coll_t cid;
1785
1786 uint64_t last_seq = 0;
1787
1788 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1789
1790 std::atomic_int kv_committing_serially = {0};
1791
1792 std::atomic_int kv_submitted_waiters = {0};
1793
1794 std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
1795
1796 const uint32_t sequencer_id;
1797
1798 uint32_t get_sequencer_id() const {
1799 return sequencer_id;
1800 }
1801
1802 void queue_new(TransContext *txc) {
1803 std::lock_guard l(qlock);
1804 txc->seq = ++last_seq;
1805 q.push_back(*txc);
1806 }
1807
1808 void drain() {
1809 std::unique_lock l(qlock);
1810 while (!q.empty())
1811 qcond.wait(l);
1812 }
1813
1814 void drain_preceding(TransContext *txc) {
1815 std::unique_lock l(qlock);
1816 while (&q.front() != txc)
1817 qcond.wait(l);
1818 }
1819
1820 bool _is_all_kv_submitted() {
1821 // caller must hold qlock, and q must not be empty
1822 ceph_assert(!q.empty());
1823 TransContext *txc = &q.back();
1824 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1825 return true;
1826 }
1827 return false;
1828 }
1829
1830 void flush() {
1831 std::unique_lock l(qlock);
1832 while (true) {
1833 // set flag before the check because the condition
1834 // may become true outside qlock, and we need to make
1835 // sure those threads see waiters and signal qcond.
1836 ++kv_submitted_waiters;
1837 if (q.empty() || _is_all_kv_submitted()) {
1838 --kv_submitted_waiters;
1839 return;
1840 }
1841 qcond.wait(l);
1842 --kv_submitted_waiters;
1843 }
1844 }
1845
1846 void flush_all_but_last() {
1847 std::unique_lock l(qlock);
1848 assert (q.size() >= 1);
1849 while (true) {
1850 // set flag before the check because the condition
1851 // may become true outside qlock, and we need to make
1852 // sure those threads see waiters and signal qcond.
1853 ++kv_submitted_waiters;
1854 if (q.size() <= 1) {
1855 --kv_submitted_waiters;
1856 return;
1857 } else {
1858 auto it = q.rbegin();
1859 it++;
1860 if (it->state >= TransContext::STATE_KV_SUBMITTED) {
1861 --kv_submitted_waiters;
1862 return;
1863 }
1864 }
1865 qcond.wait(l);
1866 --kv_submitted_waiters;
1867 }
1868 }
1869
1870 bool flush_commit(Context *c) {
1871 std::lock_guard l(qlock);
1872 if (q.empty()) {
1873 return true;
1874 }
1875 TransContext *txc = &q.back();
1876 if (txc->state >= TransContext::STATE_KV_DONE) {
1877 return true;
1878 }
1879 txc->oncommits.push_back(c);
1880 return false;
1881 }
1882 private:
1883 FRIEND_MAKE_REF(OpSequencer);
1884 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1885 : RefCountedObject(store->cct),
1886 store(store), cid(c), sequencer_id(sequencer_id) {
1887 }
1888 ~OpSequencer() {
1889 ceph_assert(q.empty());
1890 }
1891 };
1892
1893 typedef boost::intrusive::list<
1894 OpSequencer,
1895 boost::intrusive::member_hook<
1896 OpSequencer,
1897 boost::intrusive::list_member_hook<>,
1898 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1899
1900 struct KVSyncThread : public Thread {
1901 BlueStore *store;
1902 explicit KVSyncThread(BlueStore *s) : store(s) {}
1903 void *entry() override {
1904 store->_kv_sync_thread();
1905 return NULL;
1906 }
1907 };
1908 struct KVFinalizeThread : public Thread {
1909 BlueStore *store;
1910 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1911 void *entry() override {
1912 store->_kv_finalize_thread();
1913 return NULL;
1914 }
1915 };
1916
1917 struct DBHistogram {
1918 struct value_dist {
1919 uint64_t count;
1920 uint32_t max_len;
1921 };
1922
1923 struct key_dist {
1924 uint64_t count;
1925 uint32_t max_len;
1926 map<int, struct value_dist> val_map; ///< value-size slab id -> count and max value length
1927 };
1928
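// Layout sketch (derived from the accessors below): key_hist maps a key
// prefix to per-key-size-slab key_dist entries, each of which also keeps a
// per-value-size-slab breakdown in val_map; value_hist is a flat histogram
// of value-size slabs across all keys.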
1929 map<string, map<int, struct key_dist> > key_hist;
1930 map<int, uint64_t> value_hist;
1931 int get_key_slab(size_t sz);
1932 string get_key_slab_to_range(int slab);
1933 int get_value_slab(size_t sz);
1934 string get_value_slab_to_range(int slab);
1935 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1936 const string &prefix, size_t key_size, size_t value_size);
1937 void dump(Formatter *f);
1938 };
1939
1940 // --------------------------------------------------------
1941 // members
1942 private:
1943 BlueFS *bluefs = nullptr;
1944 bluefs_layout_t bluefs_layout;
1945 mono_time bluefs_last_balance;
1946 utime_t next_dump_on_bluefs_alloc_failure;
1947
1948 KeyValueDB *db = nullptr;
1949 BlockDevice *bdev = nullptr;
1950 std::string freelist_type;
1951 FreelistManager *fm = nullptr;
1952 Allocator *alloc = nullptr;
1953 uuid_d fsid;
1954 int path_fd = -1; ///< open handle to $path
1955 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1956 bool mounted = false;
1957
1958 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
1959 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1960 bool collections_had_errors = false;
1961 map<coll_t,CollectionRef> new_coll_map;
1962
1963 vector<OnodeCacheShard*> onode_cache_shards;
1964 vector<BufferCacheShard*> buffer_cache_shards;
1965
1966 /// protect zombie_osr_set
1967 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
1968 uint32_t next_sequencer_id = 0;
1969 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
1970
1971 std::atomic<uint64_t> nid_last = {0};
1972 std::atomic<uint64_t> nid_max = {0};
1973 std::atomic<uint64_t> blobid_last = {0};
1974 std::atomic<uint64_t> blobid_max = {0};
1975
1976 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1977 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1978
1979 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
1980 std::atomic<uint64_t> deferred_seq = {0};
1981 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1982 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1983 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1984 Finisher finisher;
1985 utime_t deferred_last_submitted = utime_t();
1986
1987 KVSyncThread kv_sync_thread;
1988 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
1989 ceph::condition_variable kv_cond;
1990 bool _kv_only = false;
1991 bool kv_sync_started = false;
1992 bool kv_stop = false;
1993 bool kv_finalize_started = false;
1994 bool kv_finalize_stop = false;
1995 deque<TransContext*> kv_queue; ///< ready, already submitted
1996 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1997 deque<TransContext*> kv_committing; ///< currently syncing
1998 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1999 bool kv_sync_in_progress = false;
2000
2001 KVFinalizeThread kv_finalize_thread;
2002 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
2003 ceph::condition_variable kv_finalize_cond;
2004 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
2005 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
2006 bool kv_finalize_in_progress = false;
2007
2008 PerfCounters *logger = nullptr;
2009
2010 list<CollectionRef> removed_collections;
2011
2012 ceph::shared_mutex debug_read_error_lock =
2013 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
2014 set<ghobject_t> debug_data_error_objects;
2015 set<ghobject_t> debug_mdata_error_objects;
2016
2017 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
2018
2019 uint64_t block_size = 0; ///< block size of block device (power of 2)
2020 uint64_t block_mask = 0; ///< mask to get just the block offset
2021 size_t block_size_order = 0; ///< bits to shift to get block size
2022
2023 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
2024 ///< bits for min_alloc_size
2025 uint8_t min_alloc_size_order = 0;
2026 static_assert(std::numeric_limits<uint8_t>::max() >
2027 std::numeric_limits<decltype(min_alloc_size)>::digits,
2028 "not enough bits for min_alloc_size");
2029
2030 bool per_pool_omap = false;
2031
2032 ///< maximum allocation unit (power of 2)
2033 std::atomic<uint64_t> max_alloc_size = {0};
2034
2035 ///< number threshold for forced deferred writes
2036 std::atomic<int> deferred_batch_ops = {0};
2037
2038 ///< size threshold for forced deferred writes
2039 std::atomic<uint64_t> prefer_deferred_size = {0};
2040
2041 ///< approx cost per io, in bytes
2042 std::atomic<uint64_t> throttle_cost_per_io = {0};
2043
2044 std::atomic<Compressor::CompressionMode> comp_mode =
2045 {Compressor::COMP_NONE}; ///< compression mode
2046 CompressorRef compressor;
2047 std::atomic<uint64_t> comp_min_blob_size = {0};
2048 std::atomic<uint64_t> comp_max_blob_size = {0};
2049
2050 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2051
2052 uint64_t kv_ios = 0;
2053 uint64_t kv_throttle_costs = 0;
2054
2055 // cache trim control
2056 uint64_t cache_size = 0; ///< total cache size
2057 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2058 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2059 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2060 bool cache_autotune = false; ///< cache autotune setting
2061 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2062 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2063 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2064 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2065 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2066 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2067 double max_defer_interval = 0; ///< Time to wait between last deferred submit
2068 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2069
2070 typedef map<uint64_t, volatile_statfs> osd_pools_map;
2071
2072 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2073 volatile_statfs vstatfs;
2074 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2075
2076 bool per_pool_stat_collection = true;
2077
2078 struct MempoolThread : public Thread {
2079 public:
2080 BlueStore *store;
2081
2082 ceph::condition_variable cond;
2083 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2084 bool stop = false;
2085 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2086 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2087
2088 struct MempoolCache : public PriorityCache::PriCache {
2089 BlueStore *store;
2090 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2091 int64_t committed_bytes = 0;
2092 double cache_ratio = 0;
2093
2094 MempoolCache(BlueStore *s) : store(s) {};
2095
2096 virtual uint64_t _get_used_bytes() const = 0;
2097
2098 virtual int64_t request_cache_bytes(
2099 PriorityCache::Priority pri, uint64_t total_cache) const {
2100 int64_t assigned = get_cache_bytes(pri);
2101
2102 switch (pri) {
2103 // All cache items are currently shoved into the PRI1 priority
2104 case PriorityCache::Priority::PRI1:
2105 {
2106 int64_t request = _get_used_bytes();
2107 return (request > assigned) ? request - assigned : 0;
2108 }
2109 default:
2110 break;
2111 }
2112 return -EOPNOTSUPP;
2113 }
2114
2115 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2116 return cache_bytes[pri];
2117 }
2118 virtual int64_t get_cache_bytes() const {
2119 int64_t total = 0;
2120
2121 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2122 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2123 total += get_cache_bytes(pri);
2124 }
2125 return total;
2126 }
2127 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2128 cache_bytes[pri] = bytes;
2129 }
2130 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2131 cache_bytes[pri] += bytes;
2132 }
2133 virtual int64_t commit_cache_size(uint64_t total_cache) {
2134 committed_bytes = PriorityCache::get_chunk(
2135 get_cache_bytes(), total_cache);
2136 return committed_bytes;
2137 }
2138 virtual int64_t get_committed_size() const {
2139 return committed_bytes;
2140 }
2141 virtual double get_cache_ratio() const {
2142 return cache_ratio;
2143 }
2144 virtual void set_cache_ratio(double ratio) {
2145 cache_ratio = ratio;
2146 }
2147 virtual string get_cache_name() const = 0;
2148 };
2149
2150 struct MetaCache : public MempoolCache {
2151 MetaCache(BlueStore *s) : MempoolCache(s) {};
2152
2153 virtual uint64_t _get_used_bytes() const {
2154 return mempool::bluestore_Buffer::allocated_bytes() +
2155 mempool::bluestore_Blob::allocated_bytes() +
2156 mempool::bluestore_Extent::allocated_bytes() +
2157 mempool::bluestore_cache_meta::allocated_bytes() +
2158 mempool::bluestore_cache_other::allocated_bytes() +
2159 mempool::bluestore_cache_onode::allocated_bytes() +
2160 mempool::bluestore_SharedBlob::allocated_bytes() +
2161 mempool::bluestore_inline_bl::allocated_bytes();
2162 }
2163
2164 virtual string get_cache_name() const {
2165 return "BlueStore Meta Cache";
2166 }
2167
2168 uint64_t _get_num_onodes() const {
2169 uint64_t onode_num =
2170 mempool::bluestore_cache_onode::allocated_items();
2171 return (2 > onode_num) ? 2 : onode_num;
2172 }
2173
2174 double get_bytes_per_onode() const {
2175 return (double)_get_used_bytes() / (double)_get_num_onodes();
2176 }
2177 };
2178 std::shared_ptr<MetaCache> meta_cache;
2179
2180 struct DataCache : public MempoolCache {
2181 DataCache(BlueStore *s) : MempoolCache(s) {};
2182
2183 virtual uint64_t _get_used_bytes() const {
2184 uint64_t bytes = 0;
2185 for (auto i : store->buffer_cache_shards) {
2186 bytes += i->_get_bytes();
2187 }
2188 return bytes;
2189 }
2190 virtual string get_cache_name() const {
2191 return "BlueStore Data Cache";
2192 }
2193 };
2194 std::shared_ptr<DataCache> data_cache;
2195
2196 public:
2197 explicit MempoolThread(BlueStore *s)
2198 : store(s),
2199 meta_cache(new MetaCache(s)),
2200 data_cache(new DataCache(s)) {}
2201
2202 void *entry() override;
2203 void init() {
2204 ceph_assert(stop == false);
2205 create("bstore_mempool");
2206 }
2207 void shutdown() {
2208 lock.lock();
2209 stop = true;
2210 cond.notify_all();
2211 lock.unlock();
2212 join();
2213 }
2214
2215 private:
2216 void _adjust_cache_settings();
2217 void _update_cache_settings();
2218 void _resize_shards(bool interval_stats);
2219 } mempool_thread;
2220
2221 // --------------------------------------------------------
2222 // private methods
2223
2224 void _init_logger();
2225 void _shutdown_logger();
2226 int _reload_logger();
2227
2228 int _open_path();
2229 void _close_path();
2230 int _open_fsid(bool create);
2231 int _lock_fsid();
2232 int _read_fsid(uuid_d *f);
2233 int _write_fsid();
2234 void _close_fsid();
2235 void _set_alloc_sizes();
2236 void _set_blob_size();
2237 void _set_finisher_num();
2238 void _set_per_pool_omap();
2239 void _update_osd_memory_options();
2240
2241 int _open_bdev(bool create);
2242 // Verifies that disk space is sufficient for reserved + min bluefs
2243 // and alters the latter if needed.
2244 // Depends on min_alloc_size, hence should be called after
2245 // its initialization (and outside of _open_bdev).
2246 void _validate_bdev();
2247 void _close_bdev();
2248
2249 int _minimal_open_bluefs(bool create);
2250 void _minimal_close_bluefs();
2251 int _open_bluefs(bool create);
2252 void _close_bluefs(bool cold_close);
2253
2254 // Limited (u)mount intended for BlueFS operations only
2255 int _mount_for_bluefs();
2256 void _umount_for_bluefs();
2257
2258
2259 int _is_bluefs(bool create, bool* ret);
2260 /*
2261 * opens the DB and its dependent super_meta, FreelistManager and allocator
2262 * in the proper order
2263 */
2264 int _open_db_and_around(bool read_only);
2265 void _close_db_and_around(bool read_only);
2266
2267 // updates legacy bluefs-related records in the DB to a state valid for
2268 // downgrades from nautilus.
2269 void _sync_bluefs_and_fm();
2270
2271 /*
2272 * @warning to_repair_db means that we open this db to repair it and will
2273 * not hold RocksDB's file lock.
2274 */
2275 int _open_db(bool create,
2276 bool to_repair_db=false,
2277 bool read_only = false);
2278 void _close_db(bool read_only);
2279 int _open_fm(KeyValueDB::Transaction t, bool read_only);
2280 void _close_fm();
2281 int _write_out_fm_meta(uint64_t target_size,
2282 bool update_root_size = false,
2283 bluestore_bdev_label_t* res_label = nullptr);
2284 int _open_alloc();
2285 void _close_alloc();
2286 int _open_collections();
2287 void _fsck_collections(int64_t* errors);
2288 void _close_collections();
2289
2290 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
2291 bool create);
2292
2293 public:
2294 utime_t get_deferred_last_submitted() {
2295 std::lock_guard l(deferred_lock);
2296 return deferred_last_submitted;
2297 }
2298
2299 static int _write_bdev_label(CephContext* cct,
2300 string path, bluestore_bdev_label_t label);
2301 static int _read_bdev_label(CephContext* cct, string path,
2302 bluestore_bdev_label_t *label);
2303 private:
2304 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2305 bool create);
2306
2307 int _open_super_meta();
2308
2309 void _open_statfs();
2310 void _get_statfs_overall(struct store_statfs_t *buf);
2311
2312 void _dump_alloc_on_failure();
2313
2314 int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
2315 int _balance_bluefs_freespace();
2316
2317 CollectionRef _get_collection(const coll_t& cid);
2318 void _queue_reap_collection(CollectionRef& c);
2319 void _reap_collections();
2320 void _update_cache_logger();
2321
2322 void _assign_nid(TransContext *txc, OnodeRef o);
2323 uint64_t _assign_blobid(TransContext *txc);
2324
2325 template <int LogLevelV>
2326 friend void _dump_onode(CephContext *cct, const Onode& o);
2327 template <int LogLevelV>
2328 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2329 template <int LogLevelV>
2330 friend void _dump_transaction(CephContext *cct, Transaction *t);
2331
2332 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2333 list<Context*> *on_commits);
2334 void _txc_update_store_statfs(TransContext *txc);
2335 void _txc_add_transaction(TransContext *txc, Transaction *t);
2336 void _txc_calc_cost(TransContext *txc);
2337 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2338 void _txc_state_proc(TransContext *txc);
2339 void _txc_aio_submit(TransContext *txc);
2340 public:
2341 void txc_aio_finish(void *p) {
2342 _txc_state_proc(static_cast<TransContext*>(p));
2343 }
2344 private:
2345 void _txc_finish_io(TransContext *txc);
2346 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2347 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2348 void _txc_committed_kv(TransContext *txc);
2349 void _txc_finish(TransContext *txc);
2350 void _txc_release_alloc(TransContext *txc);
2351
2352 void _osr_attach(Collection *c);
2353 void _osr_register_zombie(OpSequencer *osr);
2354 void _osr_drain(OpSequencer *osr);
2355 void _osr_drain_preceding(TransContext *txc);
2356 void _osr_drain_all();
2357
2358 void _kv_start();
2359 void _kv_stop();
2360 void _kv_sync_thread();
2361 void _kv_finalize_thread();
2362
2363 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
2364 void _deferred_queue(TransContext *txc);
2365 public:
2366 void deferred_try_submit();
2367 private:
2368 void _deferred_submit_unlock(OpSequencer *osr);
2369 void _deferred_aio_finish(OpSequencer *osr);
2370 int _deferred_replay();
2371
2372 public:
2373 using mempool_dynamic_bitset =
2374 boost::dynamic_bitset<uint64_t,
2375 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2376 using per_pool_statfs =
2377 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2378
2379 enum FSCKDepth {
2380 FSCK_REGULAR,
2381 FSCK_DEEP,
2382 FSCK_SHALLOW
2383 };
2384 enum {
2385 MAX_FSCK_ERROR_LINES = 100,
2386 };
2387
2388 private:
2389 int _fsck_check_extents(
2390 const coll_t& cid,
2391 const ghobject_t& oid,
2392 const PExtentVector& extents,
2393 bool compressed,
2394 mempool_dynamic_bitset &used_blocks,
2395 uint64_t granularity,
2396 BlueStoreRepairer* repairer,
2397 store_statfs_t& expected_statfs,
2398 FSCKDepth depth);
2399
2400 void _fsck_check_pool_statfs(
2401 per_pool_statfs& expected_pool_statfs,
2402 int64_t& errors,
2403 int64_t &warnings,
2404 BlueStoreRepairer* repairer);
2405
2406 int _fsck(FSCKDepth depth, bool repair);
2407 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2408
2409 void _buffer_cache_write(
2410 TransContext *txc,
2411 BlobRef b,
2412 uint64_t offset,
2413 bufferlist& bl,
2414 unsigned flags) {
2415 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2416 flags);
2417 txc->shared_blobs_written.insert(b->shared_blob);
2418 }
2419
2420 int _collection_list(
2421 Collection *c, const ghobject_t& start, const ghobject_t& end,
2422 int max, bool legacy, vector<ghobject_t> *ls, ghobject_t *next);
2423
2424 template <typename T, typename F>
2425 T select_option(const std::string& opt_name, T val1, F f) {
2426 //NB: opt_name reserved for future use
2427 boost::optional<T> val2 = f();
2428 if (val2) {
2429 return *val2;
2430 }
2431 return val1;
2432 }
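// Usage sketch for select_option (hypothetical names and values, for
// illustration only): prefer an override returned by the functor, otherwise
// fall back to the store-wide default passed as val1.
//
//   uint64_t blob_size = select_option(
//     "max_blob_size", max_blob_size.load(),
//     [&]() -> boost::optional<uint64_t> {
//       if (pool_has_override)           // hypothetical predicate
//         return pool_override_value;    // hypothetical value
//       return boost::optional<uint64_t>();
//     });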
2433
2434 void _apply_padding(uint64_t head_pad,
2435 uint64_t tail_pad,
2436 bufferlist& padded);
2437
2438 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2439
2440 // -- ondisk version ---
2441 public:
2442 const int32_t latest_ondisk_format = 4; ///< our version
2443 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2444 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2445
2446 private:
2447 int32_t ondisk_format = 0; ///< value detected on mount
2448
2449 int _upgrade_super(); ///< upgrade (called during open_super)
2450 uint64_t _get_ondisk_reserved() const;
2451 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2452
2453 // --- public interface ---
2454 public:
2455 BlueStore(CephContext *cct, const string& path);
2456 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2457 ~BlueStore() override;
2458
2459 string get_type() override {
2460 return "bluestore";
2461 }
2462
2463 bool needs_journal() override { return false; };
2464 bool wants_journal() override { return false; };
2465 bool allows_journal() override { return false; };
2466
2467 uint64_t get_min_alloc_size() const override {
2468 return min_alloc_size;
2469 }
2470
2471 int get_devices(set<string> *ls) override;
2472
2473 bool is_rotational() override;
2474 bool is_journal_rotational() override;
2475
2476 string get_default_device_class() override {
2477 string device_class;
2478 map<string, string> metadata;
2479 collect_metadata(&metadata);
2480 auto it = metadata.find("bluestore_bdev_type");
2481 if (it != metadata.end()) {
2482 device_class = it->second;
2483 }
2484 return device_class;
2485 }
2486
2487 int get_numa_node(
2488 int *numa_node,
2489 set<int> *nodes,
2490 set<string> *failed) override;
2491
2492 static int get_block_device_fsid(CephContext* cct, const string& path,
2493 uuid_d *fsid);
2494
2495 bool test_mount_in_use() override;
2496
2497 private:
2498 int _mount(bool kv_only, bool open_db=true);
2499 public:
2500 int mount() override {
2501 return _mount(false);
2502 }
2503 int umount() override;
2504
2505 int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
2506 int r = _mount(true, open_db);
2507 if (r < 0)
2508 return r;
2509 *pdb = db;
2510 return 0;
2511 }
2512
2513 int write_meta(const std::string& key, const std::string& value) override;
2514 int read_meta(const std::string& key, std::string *value) override;
2515
2516 int cold_open();
2517 int cold_close();
2518
2519 int fsck(bool deep) override {
2520 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2521 }
2522 int repair(bool deep) override {
2523 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2524 }
2525 int quick_fix() override {
2526 return _fsck(FSCK_SHALLOW, true);
2527 }
2528
2529 void set_cache_shards(unsigned num) override;
2530 void dump_cache_stats(Formatter *f) override {
2531 int onode_count = 0, buffers_bytes = 0;
2532 for (auto i: onode_cache_shards) {
2533 onode_count += i->_get_num();
2534 }
2535 for (auto i: buffer_cache_shards) {
2536 buffers_bytes += i->_get_bytes();
2537 }
2538 f->dump_int("bluestore_onode", onode_count);
2539 f->dump_int("bluestore_buffers", buffers_bytes);
2540 }
2541 void dump_cache_stats(ostream& ss) override {
2542 int onode_count = 0, buffers_bytes = 0;
2543 for (auto i: onode_cache_shards) {
2544 onode_count += i->_get_num();
2545 }
2546 for (auto i: buffer_cache_shards) {
2547 buffers_bytes += i->_get_bytes();
2548 }
2549 ss << "bluestore_onode: " << onode_count;
2550 ss << "bluestore_buffers: " << buffers_bytes;
2551 }
2552
2553 int validate_hobject_key(const hobject_t &obj) const override {
2554 return 0;
2555 }
2556 unsigned get_max_attr_name_length() override {
2557 return 256; // arbitrary; there is no real limit internally
2558 }
2559
2560 int mkfs() override;
2561 int mkjournal() override {
2562 return 0;
2563 }
2564
2565 void get_db_statistics(Formatter *f) override;
2566 void generate_db_histogram(Formatter *f) override;
2567 void _shutdown_cache();
2568 int flush_cache(ostream *os = NULL) override;
2569 void dump_perf_counters(Formatter *f) override {
2570 f->open_object_section("perf_counters");
2571 logger->dump_formatted(f, false);
2572 f->close_section();
2573 }
2574
2575 int add_new_bluefs_device(int id, const string& path);
2576 int migrate_to_existing_bluefs_device(const set<int>& devs_source,
2577 int id);
2578 int migrate_to_new_bluefs_device(const set<int>& devs_source,
2579 int id,
2580 const string& path);
2581 int expand_devices(ostream& out);
2582 string get_device_path(unsigned id);
2583
2584 int dump_bluefs_sizes(ostream& out);
2585
2586 public:
2587 int statfs(struct store_statfs_t *buf,
2588 osd_alert_list_t* alerts = nullptr) override;
2589 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2590 bool *per_pool_omap) override;
2591
2592 void collect_metadata(map<string,string> *pm) override;
2593
2594 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2595 int set_collection_opts(
2596 CollectionHandle& c,
2597 const pool_opts_t& opts) override;
2598 int stat(
2599 CollectionHandle &c,
2600 const ghobject_t& oid,
2601 struct stat *st,
2602 bool allow_eio = false) override;
2603 int read(
2604 CollectionHandle &c,
2605 const ghobject_t& oid,
2606 uint64_t offset,
2607 size_t len,
2608 bufferlist& bl,
2609 uint32_t op_flags = 0) override;
2610
2611 private:
2612
2613 // --------------------------------------------------------
2614 // intermediate data structures used while reading
2615 struct region_t {
2616 uint64_t logical_offset;
2617 uint64_t blob_xoffset; // region offset within the blob
2618 uint64_t length;
2619
2620 // used later in read process
2621 uint64_t front = 0;
2622
2623 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2624 : logical_offset(offset),
2625 blob_xoffset(b_offs),
2626 length(len),
2627 front(front){}
2628 region_t(const region_t& from)
2629 : logical_offset(from.logical_offset),
2630 blob_xoffset(from.blob_xoffset),
2631 length(from.length),
2632 front(from.front){}
2633
2634 friend ostream& operator<<(ostream& out, const region_t& r) {
2635 return out << "0x" << std::hex << r.logical_offset << ":"
2636 << r.blob_xoffset << "~" << r.length << std::dec;
2637 }
2638 };
2639
2640 // merged blob read request
2641 struct read_req_t {
2642 uint64_t r_off = 0;
2643 uint64_t r_len = 0;
2644 bufferlist bl;
2645 std::list<region_t> regs; // original read regions
2646
2647 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2648
2649 friend ostream& operator<<(ostream& out, const read_req_t& r) {
2650 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2651 for (const auto& reg : r.regs)
2652 out << reg;
2653 return out << "]}" << std::dec;
2654 }
2655 };
2656
2657 typedef list<read_req_t> regions2read_t;
2658 typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
2659
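// Read path sketch (descriptive summary of the helpers declared below):
// _read_cache() satisfies what it can from the buffer cache, filling
// ready_regions and collecting cache misses into blobs2read_t;
// _prepare_read_ioc() turns those misses into device reads on an IOContext;
// once the aio completes, _generate_read_result_bl() checksums/decompresses
// as needed and stitches cached and freshly read regions into the final
// result bufferlist.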
2660 void _read_cache(
2661 OnodeRef o,
2662 uint64_t offset,
2663 size_t length,
2664 int read_cache_policy,
2665 ready_regions_t& ready_regions,
2666 blobs2read_t& blobs2read);
2667
2668
2669 int _prepare_read_ioc(
2670 blobs2read_t& blobs2read,
2671 vector<bufferlist>* compressed_blob_bls,
2672 IOContext* ioc);
2673
2674 int _generate_read_result_bl(
2675 OnodeRef o,
2676 uint64_t offset,
2677 size_t length,
2678 ready_regions_t& ready_regions,
2679 vector<bufferlist>& compressed_blob_bls,
2680 blobs2read_t& blobs2read,
2681 bool buffered,
2682 bool* csum_error,
2683 bufferlist& bl);
2684
2685 int _do_read(
2686 Collection *c,
2687 OnodeRef o,
2688 uint64_t offset,
2689 size_t len,
2690 bufferlist& bl,
2691 uint32_t op_flags = 0,
2692 uint64_t retry_count = 0);
2693
2694 int _do_readv(
2695 Collection *c,
2696 OnodeRef o,
2697 const interval_set<uint64_t>& m,
2698 bufferlist& bl,
2699 uint32_t op_flags = 0,
2700 uint64_t retry_count = 0);
2701
2702 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2703 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2704 public:
2705 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2706 uint64_t offset, size_t len, bufferlist& bl) override;
2707 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2708 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2709
2710 int readv(
2711 CollectionHandle &c_,
2712 const ghobject_t& oid,
2713 interval_set<uint64_t>& m,
2714 bufferlist& bl,
2715 uint32_t op_flags) override;
2716
2717 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2718 const string& section_name, Formatter *f) override;
2719
2720 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2721 bufferptr& value) override;
2722
2723 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2724 map<string,bufferptr>& aset) override;
2725
2726 int list_collections(vector<coll_t>& ls) override;
2727
2728 CollectionHandle open_collection(const coll_t &c) override;
2729 CollectionHandle create_new_collection(const coll_t& cid) override;
2730 void set_collection_commit_queue(const coll_t& cid,
2731 ContextQueue *commit_queue) override;
2732
2733 bool collection_exists(const coll_t& c) override;
2734 int collection_empty(CollectionHandle& c, bool *empty) override;
2735 int collection_bits(CollectionHandle& c) override;
2736
2737 int collection_list(CollectionHandle &c,
2738 const ghobject_t& start,
2739 const ghobject_t& end,
2740 int max,
2741 vector<ghobject_t> *ls, ghobject_t *next) override;
2742
2743 int collection_list_legacy(CollectionHandle &c,
2744 const ghobject_t& start,
2745 const ghobject_t& end,
2746 int max,
2747 vector<ghobject_t> *ls,
2748 ghobject_t *next) override;
2749
2750 int omap_get(
2751 CollectionHandle &c, ///< [in] Collection containing oid
2752 const ghobject_t &oid, ///< [in] Object containing omap
2753 bufferlist *header, ///< [out] omap header
2754 map<string, bufferlist> *out ///< [out] Key to value map
2755 ) override;
2756 int _omap_get(
2757 Collection *c, ///< [in] Collection containing oid
2758 const ghobject_t &oid, ///< [in] Object containing omap
2759 bufferlist *header, ///< [out] omap header
2760 map<string, bufferlist> *out ///< [out] Key to value map
2761 );
2762 int _onode_omap_get(
2763 const OnodeRef &o, ///< [in] Object containing omap
2764 bufferlist *header, ///< [out] omap header
2765 map<string, bufferlist> *out ///< [out] Key to value map
2766 );
2767
2768
2769 /// Get omap header
2770 int omap_get_header(
2771 CollectionHandle &c, ///< [in] Collection containing oid
2772 const ghobject_t &oid, ///< [in] Object containing omap
2773 bufferlist *header, ///< [out] omap header
2774 bool allow_eio = false ///< [in] don't assert on eio
2775 ) override;
2776
2777 /// Get keys defined on oid
2778 int omap_get_keys(
2779 CollectionHandle &c, ///< [in] Collection containing oid
2780 const ghobject_t &oid, ///< [in] Object containing omap
2781 set<string> *keys ///< [out] Keys defined on oid
2782 ) override;
2783
2784 /// Get key values
2785 int omap_get_values(
2786 CollectionHandle &c, ///< [in] Collection containing oid
2787 const ghobject_t &oid, ///< [in] Object containing omap
2788 const set<string> &keys, ///< [in] Keys to get
2789 map<string, bufferlist> *out ///< [out] Returned keys and values
2790 ) override;
2791
2792 #ifdef WITH_SEASTAR
2793 int omap_get_values(
2794 CollectionHandle &c, ///< [in] Collection containing oid
2795 const ghobject_t &oid, ///< [in] Object containing omap
2796 const std::optional<string> &start_after, ///< [in] Keys to get
2797 map<string, bufferlist> *out ///< [out] Returned keys and values
2798 ) override;
2799 #endif
2800
2801 /// Filters keys into out which are defined on oid
2802 int omap_check_keys(
2803 CollectionHandle &c, ///< [in] Collection containing oid
2804 const ghobject_t &oid, ///< [in] Object containing omap
2805 const set<string> &keys, ///< [in] Keys to check
2806 set<string> *out ///< [out] Subset of keys defined on oid
2807 ) override;
2808
2809 ObjectMap::ObjectMapIterator get_omap_iterator(
2810 CollectionHandle &c, ///< [in] collection
2811 const ghobject_t &oid ///< [in] object
2812 ) override;
2813
2814 void set_fsid(uuid_d u) override {
2815 fsid = u;
2816 }
2817 uuid_d get_fsid() override {
2818 return fsid;
2819 }
2820
2821 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2822 return num_objects * 300; //assuming per-object overhead is 300 bytes
2823 }
2824
2825 struct BSPerfTracker {
2826 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2827 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2828
2829 objectstore_perf_stat_t get_cur_stats() const {
2830 objectstore_perf_stat_t ret;
2831 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2832 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2833 return ret;
2834 }
2835
2836 void update_from_perfcounters(PerfCounters &logger);
2837 } perf_tracker;
2838
2839 objectstore_perf_stat_t get_cur_stats() override {
2840 perf_tracker.update_from_perfcounters(*logger);
2841 return perf_tracker.get_cur_stats();
2842 }
2843 const PerfCounters* get_perf_counters() const override {
2844 return logger;
2845 }
2846 const PerfCounters* get_bluefs_perf_counters() const {
2847 return bluefs->get_perf_counters();
2848 }
2849
2850 int queue_transactions(
2851 CollectionHandle& ch,
2852 vector<Transaction>& tls,
2853 TrackedOpRef op = TrackedOpRef(),
2854 ThreadPool::TPHandle *handle = NULL) override;
2855
2856 // error injection
2857 void inject_data_error(const ghobject_t& o) override {
2858 std::unique_lock l(debug_read_error_lock);
2859 debug_data_error_objects.insert(o);
2860 }
2861 void inject_mdata_error(const ghobject_t& o) override {
2862 std::unique_lock l(debug_read_error_lock);
2863 debug_mdata_error_objects.insert(o);
2864 }
2865
2866 /// methods to inject various errors fsck can repair
2867 void inject_broken_shared_blob_key(const string& key,
2868 const bufferlist& bl);
2869 void inject_leaked(uint64_t len);
2870 void inject_false_free(coll_t cid, ghobject_t oid);
2871 void inject_statfs(const string& key, const store_statfs_t& new_statfs);
2872 void inject_global_statfs(const store_statfs_t& new_statfs);
2873 void inject_misreference(coll_t cid1, ghobject_t oid1,
2874 coll_t cid2, ghobject_t oid2,
2875 uint64_t offset);
2876 void inject_zombie_spanning_blob(coll_t cid, ghobject_t oid, int16_t blob_id);
2877 // resets global per_pool_omap in DB
2878 void inject_legacy_omap();
2879 // resets per_pool_omap | pgmeta_omap for onode
2880 void inject_legacy_omap(coll_t cid, ghobject_t oid);
2881
2882 void compact() override {
2883 ceph_assert(db);
2884 db->compact();
2885 }
2886 bool has_builtin_csum() const override {
2887 return true;
2888 }
2889
2890 /*
2891 Allocate space for BlueFS from slow device.
2892 Either automatically applies the allocated extents to the underlying
2893 BlueFS (extents == nullptr) or just returns them in the provided non-null extents vector.
2894 */
2895 int allocate_bluefs_freespace(
2896 uint64_t min_size,
2897 uint64_t size,
2898 PExtentVector* extents);
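// Usage sketch (hypothetical sizes, for illustration only): ask for up to
// 1 GiB but accept as little as 64 MiB, applying the extents to BlueFS
// automatically by passing a null extents vector:
//
//   int r = allocate_bluefs_freespace(64 << 20, 1 << 30, nullptr);
//   if (r < 0) {
//     // could not carve enough free space out of the slow device
//   }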
2899
2900 inline void log_latency(const char* name,
2901 int idx,
2902 const ceph::timespan& lat,
2903 double lat_threshold,
2904 const char* info = "") const;
2905
2906 inline void log_latency_fn(const char* name,
2907 int idx,
2908 const ceph::timespan& lat,
2909 double lat_threshold,
2910 std::function<string (const ceph::timespan& lat)> fn) const;
2911
2912 private:
2913 bool _debug_data_eio(const ghobject_t& o) {
2914 if (!cct->_conf->bluestore_debug_inject_read_err) {
2915 return false;
2916 }
2917 std::shared_lock l(debug_read_error_lock);
2918 return debug_data_error_objects.count(o);
2919 }
2920 bool _debug_mdata_eio(const ghobject_t& o) {
2921 if (!cct->_conf->bluestore_debug_inject_read_err) {
2922 return false;
2923 }
2924 std::shared_lock l(debug_read_error_lock);
2925 return debug_mdata_error_objects.count(o);
2926 }
2927 void _debug_obj_on_delete(const ghobject_t& o) {
2928 if (cct->_conf->bluestore_debug_inject_read_err) {
2929 std::unique_lock l(debug_read_error_lock);
2930 debug_data_error_objects.erase(o);
2931 debug_mdata_error_objects.erase(o);
2932 }
2933 }
2934 private:
2935 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
2936 string failed_cmode;
2937 set<string> failed_compressors;
2938 string spillover_alert;
2939 string legacy_statfs_alert;
2940 string no_per_pool_omap_alert;
2941 string disk_size_mismatch_alert;
2942
2943 void _log_alerts(osd_alert_list_t& alerts);
2944 bool _set_compression_alert(bool cmode, const char* s) {
2945 std::lock_guard l(qlock);
2946 if (cmode) {
2947 bool ret = failed_cmode.empty();
2948 failed_cmode = s;
2949 return ret;
2950 }
2951 return failed_compressors.emplace(s).second;
2952 }
2953 void _clear_compression_alert() {
2954 std::lock_guard l(qlock);
2955 failed_compressors.clear();
2956 failed_cmode.clear();
2957 }
2958
2959 void _set_spillover_alert(const string& s) {
2960 std::lock_guard l(qlock);
2961 spillover_alert = s;
2962 }
2963 void _clear_spillover_alert() {
2964 std::lock_guard l(qlock);
2965 spillover_alert.clear();
2966 }
2967
2968 void _check_legacy_statfs_alert();
2969 void _check_no_per_pool_omap_alert();
2970 void _set_disk_size_mismatch_alert(const string& s) {
2971 std::lock_guard l(qlock);
2972 disk_size_mismatch_alert = s;
2973 }
2974
2975 private:
2976
2977 // --------------------------------------------------------
2978 // read processing internal methods
2979 int _verify_csum(
2980 OnodeRef& o,
2981 const bluestore_blob_t* blob,
2982 uint64_t blob_xoffset,
2983 const bufferlist& bl,
2984 uint64_t logical_offset) const;
2985 int _decompress(bufferlist& source, bufferlist* result);
2986
2987
2988 // --------------------------------------------------------
2989 // write ops
2990
2991 struct WriteContext {
2992 bool buffered = false; ///< buffered write
2993 bool compress = false; ///< compressed write
2994 uint64_t target_blob_size = 0; ///< target (max) blob size
2995 unsigned csum_order = 0; ///< target checksum chunk order
2996
2997 old_extent_map_t old_extents; ///< must deref these blobs
2998 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
2999
3000 struct write_item {
3001 uint64_t logical_offset; ///< write logical offset
3002 BlobRef b;
3003 uint64_t blob_length;
3004 uint64_t b_off;
3005 bufferlist bl;
3006 uint64_t b_off0; ///< original offset in a blob prior to padding
3007 uint64_t length0; ///< original data length prior to padding
3008
3009 bool mark_unused;
3010 bool new_blob; ///< whether new blob was created
3011
3012 bool compressed = false;
3013 bufferlist compressed_bl;
3014 size_t compressed_len = 0;
3015
3016 write_item(
3017 uint64_t logical_offs,
3018 BlobRef b,
3019 uint64_t blob_len,
3020 uint64_t o,
3021 bufferlist& bl,
3022 uint64_t o0,
3023 uint64_t l0,
3024 bool _mark_unused,
3025 bool _new_blob)
3026 :
3027 logical_offset(logical_offs),
3028 b(b),
3029 blob_length(blob_len),
3030 b_off(o),
3031 bl(bl),
3032 b_off0(o0),
3033 length0(l0),
3034 mark_unused(_mark_unused),
3035 new_blob(_new_blob) {}
3036 };
3037 vector<write_item> writes; ///< blobs we're writing
3038
3039 /// partial clone of the context
3040 void fork(const WriteContext& other) {
3041 buffered = other.buffered;
3042 compress = other.compress;
3043 target_blob_size = other.target_blob_size;
3044 csum_order = other.csum_order;
3045 }
3046 void write(
3047 uint64_t loffs,
3048 BlobRef b,
3049 uint64_t blob_len,
3050 uint64_t o,
3051 bufferlist& bl,
3052 uint64_t o0,
3053 uint64_t len0,
3054 bool _mark_unused,
3055 bool _new_blob) {
3056 writes.emplace_back(loffs,
3057 b,
3058 blob_len,
3059 o,
3060 bl,
3061 o0,
3062 len0,
3063 _mark_unused,
3064 _new_blob);
3065 }
3066 /// Checks for writes to the same pextent within a blob
3067 bool has_conflict(
3068 BlobRef b,
3069 uint64_t loffs,
3070 uint64_t loffs_end,
3071 uint64_t min_alloc_size);
3072 };
3073
3074 void _do_write_small(
3075 TransContext *txc,
3076 CollectionRef &c,
3077 OnodeRef o,
3078 uint64_t offset, uint64_t length,
3079 bufferlist::iterator& blp,
3080 WriteContext *wctx);
3081 void _do_write_big(
3082 TransContext *txc,
3083 CollectionRef &c,
3084 OnodeRef o,
3085 uint64_t offset, uint64_t length,
3086 bufferlist::iterator& blp,
3087 WriteContext *wctx);
3088 int _do_alloc_write(
3089 TransContext *txc,
3090 CollectionRef c,
3091 OnodeRef o,
3092 WriteContext *wctx);
3093 void _wctx_finish(
3094 TransContext *txc,
3095 CollectionRef& c,
3096 OnodeRef o,
3097 WriteContext *wctx,
3098 set<SharedBlob*> *maybe_unshared_blobs=0);
3099
3100 int _write(TransContext *txc,
3101 CollectionRef& c,
3102 OnodeRef& o,
3103 uint64_t offset, size_t len,
3104 bufferlist& bl,
3105 uint32_t fadvise_flags);
3106 void _pad_zeros(bufferlist *bl, uint64_t *offset,
3107 uint64_t chunk_size);
3108
3109 void _choose_write_options(CollectionRef& c,
3110 OnodeRef o,
3111 uint32_t fadvise_flags,
3112 WriteContext *wctx);
3113
3114 int _do_gc(TransContext *txc,
3115 CollectionRef& c,
3116 OnodeRef o,
3117 const WriteContext& wctx,
3118 uint64_t *dirty_start,
3119 uint64_t *dirty_end);
3120
3121 int _do_write(TransContext *txc,
3122 CollectionRef &c,
3123 OnodeRef o,
3124 uint64_t offset, uint64_t length,
3125 bufferlist& bl,
3126 uint32_t fadvise_flags);
3127 void _do_write_data(TransContext *txc,
3128 CollectionRef& c,
3129 OnodeRef o,
3130 uint64_t offset,
3131 uint64_t length,
3132 bufferlist& bl,
3133 WriteContext *wctx);
3134
3135 int _touch(TransContext *txc,
3136 CollectionRef& c,
3137 OnodeRef& o);
3138 int _do_zero(TransContext *txc,
3139 CollectionRef& c,
3140 OnodeRef& o,
3141 uint64_t offset, size_t len);
3142 int _zero(TransContext *txc,
3143 CollectionRef& c,
3144 OnodeRef& o,
3145 uint64_t offset, size_t len);
3146 void _do_truncate(TransContext *txc,
3147 CollectionRef& c,
3148 OnodeRef o,
3149 uint64_t offset,
3150 set<SharedBlob*> *maybe_unshared_blobs=0);
3151 int _truncate(TransContext *txc,
3152 CollectionRef& c,
3153 OnodeRef& o,
3154 uint64_t offset);
3155 int _remove(TransContext *txc,
3156 CollectionRef& c,
3157 OnodeRef& o);
3158 int _do_remove(TransContext *txc,
3159 CollectionRef& c,
3160 OnodeRef o);
3161 int _setattr(TransContext *txc,
3162 CollectionRef& c,
3163 OnodeRef& o,
3164 const string& name,
3165 bufferptr& val);
3166 int _setattrs(TransContext *txc,
3167 CollectionRef& c,
3168 OnodeRef& o,
3169 const map<string,bufferptr>& aset);
3170 int _rmattr(TransContext *txc,
3171 CollectionRef& c,
3172 OnodeRef& o,
3173 const string& name);
3174 int _rmattrs(TransContext *txc,
3175 CollectionRef& c,
3176 OnodeRef& o);
3177 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3178 int _omap_clear(TransContext *txc,
3179 CollectionRef& c,
3180 OnodeRef& o);
3181 int _omap_setkeys(TransContext *txc,
3182 CollectionRef& c,
3183 OnodeRef& o,
3184 bufferlist& bl);
3185 int _omap_setheader(TransContext *txc,
3186 CollectionRef& c,
3187 OnodeRef& o,
3188 bufferlist& header);
3189 int _omap_rmkeys(TransContext *txc,
3190 CollectionRef& c,
3191 OnodeRef& o,
3192 bufferlist& bl);
3193 int _omap_rmkey_range(TransContext *txc,
3194 CollectionRef& c,
3195 OnodeRef& o,
3196 const string& first, const string& last);
3197 int _set_alloc_hint(
3198 TransContext *txc,
3199 CollectionRef& c,
3200 OnodeRef& o,
3201 uint64_t expected_object_size,
3202 uint64_t expected_write_size,
3203 uint32_t flags);
3204 int _do_clone_range(TransContext *txc,
3205 CollectionRef& c,
3206 OnodeRef& oldo,
3207 OnodeRef& newo,
3208 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3209 int _clone(TransContext *txc,
3210 CollectionRef& c,
3211 OnodeRef& oldo,
3212 OnodeRef& newo);
3213 int _clone_range(TransContext *txc,
3214 CollectionRef& c,
3215 OnodeRef& oldo,
3216 OnodeRef& newo,
3217 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3218 int _rename(TransContext *txc,
3219 CollectionRef& c,
3220 OnodeRef& oldo,
3221 OnodeRef& newo,
3222 const ghobject_t& new_oid);
3223 int _create_collection(TransContext *txc, const coll_t &cid,
3224 unsigned bits, CollectionRef *c);
3225 int _remove_collection(TransContext *txc, const coll_t &cid,
3226 CollectionRef *c);
3227 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3228 int _split_collection(TransContext *txc,
3229 CollectionRef& c,
3230 CollectionRef& d,
3231 unsigned bits, int rem);
3232 int _merge_collection(TransContext *txc,
3233 CollectionRef *c,
3234 CollectionRef& d,
3235 unsigned bits);
3236
3237 void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
3238 size_t extents);
3239 void _record_allocation_stats();
3240 private:
3241 uint64_t probe_count = 0;
3242 std::atomic<uint64_t> alloc_stats_count = {0};
3243 std::atomic<uint64_t> alloc_stats_fragments = { 0 };
3244 std::atomic<uint64_t> alloc_stats_size = { 0 };
3245 //
3246 std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
3247 { std::make_tuple(0ul, 0ul, 0ul) };
3248
3249 std::atomic<uint64_t> out_of_sync_fm = {0};
3250 // --------------------------------------------------------
3251 // BlueFSDeviceExpander implementation
3252 uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
3253 uint64_t bluefs_total) override {
3254 auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
3255 return delta > 0 ? delta : 0;
3256 }
3257 int allocate_freespace(
3258 uint64_t min_size,
3259 uint64_t size,
3260 PExtentVector& extents) override {
3261 return allocate_bluefs_freespace(min_size, size, &extents);
3262 };
3263 uint64_t available_freespace(uint64_t alloc_size) override;
3264 inline bool _use_rotational_settings();
3265
3266 public:
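// Per-shared-blob bookkeeping record used by fsck: the owning collection
// and pool, the objects found referencing the shared blob, and the extent
// ref map accumulated while scanning them.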
3267 struct sb_info_t {
3268 coll_t cid;
3269 int64_t pool_id = INT64_MIN;
3270 list<ghobject_t> oids;
3271 BlueStore::SharedBlobRef sb;
3272 bluestore_extent_ref_map_t ref_map;
3273 bool compressed = false;
3274 bool passed = false;
3275 bool updated = false;
3276 };
3277 typedef btree::btree_set<
3278 uint64_t, std::less<uint64_t>,
3279 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3280
3281 typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
3282 struct FSCK_ObjectCtx {
3283 int64_t& errors;
3284 int64_t& warnings;
3285 uint64_t& num_objects;
3286 uint64_t& num_extents;
3287 uint64_t& num_blobs;
3288 uint64_t& num_sharded_objects;
3289 uint64_t& num_spanning_blobs;
3290
3291 mempool_dynamic_bitset* used_blocks;
3292 uint64_t_btree_t* used_omap_head;
3293
3294 ceph::mutex* sb_info_lock;
3295 sb_info_map_t& sb_info;
3296
3297 store_statfs_t& expected_store_statfs;
3298 per_pool_statfs& expected_pool_statfs;
3299 BlueStoreRepairer* repairer;
3300
3301 FSCK_ObjectCtx(int64_t& e,
3302 int64_t& w,
3303 uint64_t& _num_objects,
3304 uint64_t& _num_extents,
3305 uint64_t& _num_blobs,
3306 uint64_t& _num_sharded_objects,
3307 uint64_t& _num_spanning_blobs,
3308 mempool_dynamic_bitset* _ub,
3309 uint64_t_btree_t* _used_omap_head,
3310 ceph::mutex* _sb_info_lock,
3311 sb_info_map_t& _sb_info,
3312 store_statfs_t& _store_statfs,
3313 per_pool_statfs& _pool_statfs,
3314 BlueStoreRepairer* _repairer) :
3315 errors(e),
3316 warnings(w),
3317 num_objects(_num_objects),
3318 num_extents(_num_extents),
3319 num_blobs(_num_blobs),
3320 num_sharded_objects(_num_sharded_objects),
3321 num_spanning_blobs(_num_spanning_blobs),
3322 used_blocks(_ub),
3323 used_omap_head(_used_omap_head),
3324 sb_info_lock(_sb_info_lock),
3325 sb_info(_sb_info),
3326 expected_store_statfs(_store_statfs),
3327 expected_pool_statfs(_pool_statfs),
3328 repairer(_repairer) {
3329 }
3330 };
3331
3332 OnodeRef fsck_check_objects_shallow(
3333 FSCKDepth depth,
3334 int64_t pool_id,
3335 CollectionRef c,
3336 const ghobject_t& oid,
3337 const string& key,
3338 const bufferlist& value,
3339 mempool::bluestore_fsck::list<string>* expecting_shards,
3340 map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3341 const BlueStore::FSCK_ObjectCtx& ctx);
3342
3343 private:
3344 void _fsck_check_object_omap(FSCKDepth depth,
3345 OnodeRef& o,
3346 const BlueStore::FSCK_ObjectCtx& ctx);
3347
3348 void _fsck_check_objects(FSCKDepth depth,
3349 FSCK_ObjectCtx& ctx);
3350 };
3351
3352 inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
3353 return out
3354 << " allocated:"
3355 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3356 << " stored:"
3357 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3358 << " compressed:"
3359 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3360 << " compressed_orig:"
3361 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3362 << " compressed_alloc:"
3363 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3364 }
3365
3366 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3367 o->get();
3368 }
3369 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3370 o->put();
3371 }
3372
3373 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3374 o->get();
3375 }
3376 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3377 o->put();
3378 }
3379
3380 class BlueStoreRepairer
3381 {
3382 public:
3383 // to simplify future potential migration to mempools
3384 using fsck_interval = interval_set<uint64_t>;
3385
3386 // Structure to track which pextents are used for a specific cid/oid.
3387 // As with a Bloom filter, only positive and false-positive matches are
3388 // possible.
3389 // Maintains two lists of bloom filters, one for cids and one for oids,
3390 // where each list entry is a BF covering a specific disk pextent range.
3391 // The extent length covered per filter is determined on init.
3392 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3393 // 'is_used' access.
3394 struct StoreSpaceTracker {
3395 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3396 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3397 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrary selected
3398 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3399
3400 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3401 bloom_vector collections_bfs;
3402 bloom_vector objects_bfs;
3403
3404 bool was_filtered_out = false;
3405 uint64_t granularity = 0; // extent length for a single filter
3406
3407 StoreSpaceTracker() {
3408 }
3409 StoreSpaceTracker(const StoreSpaceTracker& from) :
3410 collections_bfs(from.collections_bfs),
3411 objects_bfs(from.objects_bfs),
3412 granularity(from.granularity) {
3413 }
3414
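// Sizing sketch for init() below (illustrative numbers only): with
// total = 1 TiB, min_alloc_size = 64 KiB and the default mem_cap of
// 128 MiB, granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB (already a
// multiple of min_alloc_size), yielding 2^40 / 2^19 = 2,097,152 filters
// in each of collections_bfs and objects_bfs.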
3415 void init(uint64_t total,
3416 uint64_t min_alloc_size,
3417 uint64_t mem_cap = DEF_MEM_CAP) {
3418 ceph_assert(!granularity); // not initialized yet
3419 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3420 ceph_assert(mem_cap);
3421
3422 total = round_up_to(total, min_alloc_size);
3423 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3424
3425 if (!granularity) {
3426 granularity = min_alloc_size;
3427 } else {
3428 granularity = round_up_to(granularity, min_alloc_size);
3429 }
3430
3431 uint64_t entries = round_up_to(total, granularity) / granularity;
3432 collections_bfs.resize(entries,
3433 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3434 BLOOM_FILTER_TABLE_SIZE,
3435 0,
3436 BLOOM_FILTER_EXPECTED_COUNT));
3437 objects_bfs.resize(entries,
3438 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3439 BLOOM_FILTER_TABLE_SIZE,
3440 0,
3441 BLOOM_FILTER_EXPECTED_COUNT));
3442 }
3443 inline uint32_t get_hash(const coll_t& cid) const {
3444 return cid.hash_to_shard(1);
3445 }
3446 inline void set_used(uint64_t offset, uint64_t len,
3447 const coll_t& cid, const ghobject_t& oid) {
3448 ceph_assert(granularity); // initialized
3449
3450 // can't call this func after filter_out has been applied
3451 ceph_assert(!was_filtered_out);
3452 if (!len) {
3453 return;
3454 }
3455 auto pos = offset / granularity;
3456 auto end_pos = (offset + len - 1) / granularity;
3457 while (pos <= end_pos) {
3458 collections_bfs[pos].insert(get_hash(cid));
3459 objects_bfs[pos].insert(oid.hobj.get_hash());
3460 ++pos;
3461 }
3462 }
3463 // filters out entries unrelated to the specified (broken) extents;
3464 // only 'is_used' calls are permitted after that
3465 size_t filter_out(const fsck_interval& extents);
3466
3467 // determines if the collection is present after filtering-out
3468 inline bool is_used(const coll_t& cid) const {
3469 ceph_assert(was_filtered_out);
3470 for(auto& bf : collections_bfs) {
3471 if (bf.contains(get_hash(cid))) {
3472 return true;
3473 }
3474 }
3475 return false;
3476 }
3477 // determines if the object is present after filtering-out
3478 inline bool is_used(const ghobject_t& oid) const {
3479 ceph_assert(was_filtered_out);
3480 for(auto& bf : objects_bfs) {
3481 if (bf.contains(oid.hobj.get_hash())) {
3482 return true;
3483 }
3484 }
3485 return false;
3486 }
3487 // determines if the collection is present before filtering-out
3488 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3489 ceph_assert(granularity); // initialized
3490 ceph_assert(!was_filtered_out);
3491 auto &bf = collections_bfs[offs / granularity];
3492 if (bf.contains(get_hash(cid))) {
3493 return true;
3494 }
3495 return false;
3496 }
3497 // determines if the object is present before filtering-out
3498 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3499 ceph_assert(granularity); // initialized
3500 ceph_assert(!was_filtered_out);
3501 auto &bf = objects_bfs[offs / granularity];
3502 if (bf.contains(oid.hobj.get_hash())) {
3503 return true;
3504 }
3505 return false;
3506 }
3507 };
3508 public:
3509 void fix_per_pool_omap(KeyValueDB *db);
3510 bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
3511 bool fix_shared_blob(KeyValueDB *db,
3512 uint64_t sbid,
3513 const bufferlist* bl);
3514 bool fix_statfs(KeyValueDB *db, const string& key,
3515 const store_statfs_t& new_statfs);
3516
3517 bool fix_leaked(KeyValueDB *db,
3518 FreelistManager* fm,
3519 uint64_t offset, uint64_t len);
3520 bool fix_false_free(KeyValueDB *db,
3521 FreelistManager* fm,
3522 uint64_t offset, uint64_t len);
3523 bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
3524 KeyValueDB::Transaction fix_spanning_blobs(KeyValueDB* db);
3525
3526 void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3527
3528 bool preprocess_misreference(KeyValueDB *db);
3529
3530 unsigned apply(KeyValueDB* db);
3531
3532 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3533 misreferenced_extents.union_insert(offs, len);
3534 if (inc_error) {
3535 ++to_repair_cnt;
3536 }
3537 }
3538 // In fact, this is the repairer's only thread-safe method!!
3539 void inc_repaired() {
3540 ++to_repair_cnt;
3541 }
3542
3543 StoreSpaceTracker& get_space_usage_tracker() {
3544 return space_usage_tracker;
3545 }
3546 const fsck_interval& get_misreferences() const {
3547 return misreferenced_extents;
3548 }
3549 KeyValueDB::Transaction get_fix_misreferences_txn() {
3550 return fix_misreferences_txn;
3551 }
3552
3553 private:
3554 std::atomic<unsigned> to_repair_cnt = { 0 };
3555 KeyValueDB::Transaction fix_per_pool_omap_txn;
3556 KeyValueDB::Transaction fix_fm_leaked_txn;
3557 KeyValueDB::Transaction fix_fm_false_free_txn;
3558 KeyValueDB::Transaction remove_key_txn;
3559 KeyValueDB::Transaction fix_statfs_txn;
3560 KeyValueDB::Transaction fix_shared_blob_txn;
3561
3562 KeyValueDB::Transaction fix_misreferences_txn;
3563 KeyValueDB::Transaction fix_onode_txn;
3564
3565 StoreSpaceTracker space_usage_tracker;
3566
3567 // non-shared extents with multiple references
3568 fsck_interval misreferenced_extents;
3569
3570 };
3571
3572 class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
3573 {
3574 template <class T, size_t MaxX, size_t MaxY>
3575 class matrix_2d {
3576 T values[MaxX][MaxY];
3577 public:
3578 matrix_2d() {
3579 clear();
3580 }
3581 T& at(size_t x, size_t y) {
3582 ceph_assert(x < MaxX);
3583 ceph_assert(y < MaxY);
3584
3585 return values[x][y];
3586 }
3587 size_t get_max_x() const {
3588 return MaxX;
3589 }
3590 size_t get_max_y() const {
3591 return MaxY;
3592 }
3593 void clear() {
3594 memset(values, 0, sizeof(values));
3595 }
3596 };
3597
3598 enum {
3599 // use 0/nullptr as unset indication
3600 LEVEL_FIRST = 1,
3601 LEVEL_LOG = LEVEL_FIRST, // BlueFS log
3602 LEVEL_WAL,
3603 LEVEL_DB,
3604 LEVEL_SLOW,
3605 LEVEL_MAX
3606 };
3607 // one extra device slot (BlueFS::MAX_BDEV) keeps per-level actual totals (taken from file sizes)
3608 // one extra level slot (LEVEL_MAX - LEVEL_FIRST) keeps per-device totals
3609 typedef matrix_2d<uint64_t, BlueFS::MAX_BDEV + 1, LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;
3610
3611 per_level_per_dev_usage_t per_level_per_dev_usage;
3612 // file count per level, add +1 to keep total file count
3613 uint64_t per_level_files[LEVEL_MAX - LEVEL_FIRST + 1] = { 0 };
3614
3615 // Note: maximum per-device totals below might be smaller than corresponding
3616 // perf counters by up to a single alloc unit (1M) due to superblock extent.
3617 // The latter is not accounted for here.
3618 per_level_per_dev_usage_t per_level_per_dev_max;
3619
3620 uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
3621 uint64_t db_avail4slow = 0;
3622 enum {
3623 OLD_POLICY,
3624 USE_SOME_EXTRA
3625 };
3626
3627 public:
3628 RocksDBBlueFSVolumeSelector(
3629 uint64_t _wal_total,
3630 uint64_t _db_total,
3631 uint64_t _slow_total,
3632 uint64_t _level0_size,
3633 uint64_t _level_base,
3634 uint64_t _level_multiplier,
3635 double reserved_factor,
3636 uint64_t reserved,
3637 bool new_pol)
3638 {
3639 l_totals[LEVEL_LOG - LEVEL_FIRST] = 0; // not used at the moment
3640 l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
3641 l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
3642 l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;
3643
3644 if (!new_pol) {
3645 return;
3646 }
3647
3648 // Calculate how much extra space is available on the DB volume.
3649 // Depending on whether an explicit reserved size is specified, it is either
3650 // * DB volume size - reserved
3651 // or
3652 // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
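// Worked example (hypothetical sizes, for illustration only): with
// _level0_size = 1 GiB, _level_base = 1 GiB, _level_multiplier = 10,
// reserved_factor = 2 and _db_total = 60 GiB, the loop below stops once
// the threshold 2 + 10 + 100 * 2 = 212 GiB exceeds _db_total, leaving
// db_avail4slow = 60 - (1 + 1 + 10 * 2) = 38 GiB.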
3653 if (!reserved) {
3654 uint64_t prev_levels = _level0_size;
3655 uint64_t cur_level = _level_base;
3656 uint64_t cur_threshold = 0;
3657 do {
3658 uint64_t next_level = cur_level * _level_multiplier;
3659 uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
3660 if (_db_total <= next_threshold) {
3661 db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
3662 break;
3663 } else {
3664 prev_levels += cur_level;
3665 cur_level = next_level;
3666 cur_threshold = next_threshold;
3667 }
3668 } while (true);
3669 } else {
3670 db_avail4slow = _db_total - reserved;
3671 }
3672 }
3673
3674 void* get_hint_for_log() const override {
3675 return reinterpret_cast<void*>(LEVEL_LOG);
3676 }
3677 void* get_hint_by_dir(const string& dirname) const override;
3678
3679 void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
3680 if (hint == nullptr)
3681 return;
3682 size_t pos = (size_t)hint - LEVEL_FIRST;
3683 for (auto& p : fnode.extents) {
3684 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3685 auto& max = per_level_per_dev_max.at(p.bdev, pos);
3686 cur += p.length;
3687 if (cur > max) {
3688 max = cur;
3689 }
3690 {
3691 //update per-device totals
3692 auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3693 auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3694 cur += p.length;
3695 if (cur > max) {
3696 max = cur;
3697 }
3698 }
3699 }
3700 {
3701 //update per-level actual totals
3702 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3703 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3704 cur += fnode.size;
3705 if (cur > max) {
3706 max = cur;
3707 }
3708 }
3709 ++per_level_files[pos];
3710 ++per_level_files[LEVEL_MAX - LEVEL_FIRST];
3711 }
3712 void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
3713 if (hint == nullptr)
3714 return;
3715 size_t pos = (size_t)hint - LEVEL_FIRST;
3716 for (auto& p : fnode.extents) {
3717 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3718 ceph_assert(cur >= p.length);
3719 cur -= p.length;
3720
3721 //update per-device totals
3722 auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3723 ceph_assert(cur2 >= p.length);
3724 cur2 -= p.length;
3725 }
3726 //update per-level actual totals
3727 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3728 ceph_assert(cur >= fnode.size);
3729 cur -= fnode.size;
3730 ceph_assert(per_level_files[pos] > 0);
3731 --per_level_files[pos];
3732 ceph_assert(per_level_files[LEVEL_MAX - LEVEL_FIRST] > 0);
3733 --per_level_files[LEVEL_MAX - LEVEL_FIRST];
3734 }
3735 void add_usage(void* hint, uint64_t fsize) override {
3736 if (hint == nullptr)
3737 return;
3738 size_t pos = (size_t)hint - LEVEL_FIRST;
3739 //update per-level actual totals
3740 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3741 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3742 cur += fsize;
3743 if (cur > max) {
3744 max = cur;
3745 }
3746 }
3747 void sub_usage(void* hint, uint64_t fsize) override {
3748 if (hint == nullptr)
3749 return;
3750 size_t pos = (size_t)hint - LEVEL_FIRST;
3751 //update per-level actual totals
3752 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3753 ceph_assert(cur >= fsize);
3754 per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos) -= fsize;
3755 }
3756
3757 uint8_t select_prefer_bdev(void* h) override;
3758 void get_paths(
3759 const std::string& base,
3760 BlueFSVolumeSelector::paths& res) const override;
3761
3762 void dump(ostream& sout) override;
3763 };
3764
3765 #endif