1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/unordered_map.h"
39 #include "include/mempool.h"
40 #include "include/hash.h"
41 #include "common/bloom_filter.hpp"
42 #include "common/Finisher.h"
43 #include "common/ceph_mutex.h"
44 #include "common/Throttle.h"
45 #include "common/perf_counters.h"
46 #include "common/PriorityCache.h"
47 #include "compressor/Compressor.h"
48 #include "os/ObjectStore.h"
49
50 #include "bluestore_types.h"
51 #include "BlockDevice.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 class Allocator;
56 class FreelistManager;
57 class BlueStoreRepairer;
58
59 //#define DEBUG_CACHE
60 //#define DEBUG_DEFERRED
61
62
63
64 // constants for Buffer::optimize()
65 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N; allowed slop is 1/N of the buffer length
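// Added illustrative note (not in the original header): with a denominator of
// 8, Buffer::maybe_rebuild() below rebuilds its bufferlist when the data is
// split across more than one segment or when the front segment wastes more
// than length/8 bytes; e.g. for a 4096-byte buffer the slop threshold is 512
// bytes.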
66
67
68 enum {
69 l_bluestore_first = 732430,
70 l_bluestore_kv_flush_lat,
71 l_bluestore_kv_commit_lat,
72 l_bluestore_kv_sync_lat,
73 l_bluestore_kv_final_lat,
74 l_bluestore_state_prepare_lat,
75 l_bluestore_state_aio_wait_lat,
76 l_bluestore_state_io_done_lat,
77 l_bluestore_state_kv_queued_lat,
78 l_bluestore_state_kv_committing_lat,
79 l_bluestore_state_kv_done_lat,
80 l_bluestore_state_deferred_queued_lat,
81 l_bluestore_state_deferred_aio_wait_lat,
82 l_bluestore_state_deferred_cleanup_lat,
83 l_bluestore_state_finishing_lat,
84 l_bluestore_state_done_lat,
85 l_bluestore_throttle_lat,
86 l_bluestore_submit_lat,
87 l_bluestore_commit_lat,
88 l_bluestore_read_lat,
89 l_bluestore_read_onode_meta_lat,
90 l_bluestore_read_wait_aio_lat,
91 l_bluestore_compress_lat,
92 l_bluestore_decompress_lat,
93 l_bluestore_csum_lat,
94 l_bluestore_compress_success_count,
95 l_bluestore_compress_rejected_count,
96 l_bluestore_write_pad_bytes,
97 l_bluestore_deferred_write_ops,
98 l_bluestore_deferred_write_bytes,
99 l_bluestore_write_penalty_read_ops,
100 l_bluestore_allocated,
101 l_bluestore_stored,
102 l_bluestore_compressed,
103 l_bluestore_compressed_allocated,
104 l_bluestore_compressed_original,
105 l_bluestore_onodes,
106 l_bluestore_pinned_onodes,
107 l_bluestore_onode_hits,
108 l_bluestore_onode_misses,
109 l_bluestore_onode_shard_hits,
110 l_bluestore_onode_shard_misses,
111 l_bluestore_extents,
112 l_bluestore_blobs,
113 l_bluestore_buffers,
114 l_bluestore_buffer_bytes,
115 l_bluestore_buffer_hit_bytes,
116 l_bluestore_buffer_miss_bytes,
117 l_bluestore_write_big,
118 l_bluestore_write_big_bytes,
119 l_bluestore_write_big_blobs,
120 l_bluestore_write_small,
121 l_bluestore_write_small_bytes,
122 l_bluestore_write_small_unused,
123 l_bluestore_write_small_deferred,
124 l_bluestore_write_small_pre_read,
125 l_bluestore_write_small_new,
126 l_bluestore_txc,
127 l_bluestore_onode_reshard,
128 l_bluestore_blob_split,
129 l_bluestore_extent_compress,
130 l_bluestore_gc_merged,
131 l_bluestore_read_eio,
132 l_bluestore_reads_with_retries,
133 l_bluestore_fragmentation,
134 l_bluestore_omap_seek_to_first_lat,
135 l_bluestore_omap_upper_bound_lat,
136 l_bluestore_omap_lower_bound_lat,
137 l_bluestore_omap_next_lat,
138 l_bluestore_clist_lat,
139 l_bluestore_last
140 };
141
142 #define META_POOL_ID ((uint64_t)-1ull)
143
144 class BlueStore : public ObjectStore,
145 public BlueFSDeviceExpander,
146 public md_config_obs_t {
147 // -----------------------------------------------------
148 // types
149 public:
150 // config observer
151 const char** get_tracked_conf_keys() const override;
152 void handle_conf_change(const ConfigProxy& conf,
153 const std::set<std::string> &changed) override;
154
155 //handler for discard event
156 void handle_discard(interval_set<uint64_t>& to_release);
157
158 void _set_csum();
159 void _set_compression();
160 void _set_throttle_params();
161 int _set_cache_sizes();
162 void _set_max_defer_interval() {
163 max_defer_interval =
164 cct->_conf.get_val<double>("bluestore_max_defer_interval");
165 }
166
167 class TransContext;
168
169 typedef map<uint64_t, bufferlist> ready_regions_t;
170
171
172 struct BufferSpace;
173 struct Collection;
174 typedef boost::intrusive_ptr<Collection> CollectionRef;
175
176 struct AioContext {
177 virtual void aio_finish(BlueStore *store) = 0;
178 virtual ~AioContext() {}
179 };
180
181 /// cached buffer
182 struct Buffer {
183 MEMPOOL_CLASS_HELPERS();
184
185 enum {
186 STATE_EMPTY, ///< empty buffer -- used for cache history
187 STATE_CLEAN, ///< clean data that is up to date
188 STATE_WRITING, ///< data that is being written (io not yet complete)
189 };
190 static const char *get_state_name(int s) {
191 switch (s) {
192 case STATE_EMPTY: return "empty";
193 case STATE_CLEAN: return "clean";
194 case STATE_WRITING: return "writing";
195 default: return "???";
196 }
197 }
198 enum {
199 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
200 // NOTE: fix operator<< when you define a second flag
201 };
202 static const char *get_flag_name(int s) {
203 switch (s) {
204 case FLAG_NOCACHE: return "nocache";
205 default: return "???";
206 }
207 }
208
209 BufferSpace *space;
210 uint16_t state; ///< STATE_*
211 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
212 uint32_t flags; ///< FLAG_*
213 uint64_t seq;
214 uint32_t offset, length;
215 bufferlist data;
216
217 boost::intrusive::list_member_hook<> lru_item;
218 boost::intrusive::list_member_hook<> state_item;
219
220 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
221 unsigned f = 0)
222 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
223 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
224 unsigned f = 0)
225 : space(space), state(s), flags(f), seq(q), offset(o),
226 length(b.length()), data(b) {}
227
228 bool is_empty() const {
229 return state == STATE_EMPTY;
230 }
231 bool is_clean() const {
232 return state == STATE_CLEAN;
233 }
234 bool is_writing() const {
235 return state == STATE_WRITING;
236 }
237
238 uint32_t end() const {
239 return offset + length;
240 }
241
242 void truncate(uint32_t newlen) {
243 ceph_assert(newlen < length);
244 if (data.length()) {
245 bufferlist t;
246 t.substr_of(data, 0, newlen);
247 data.claim(t);
248 }
249 length = newlen;
250 }
251 void maybe_rebuild() {
252 if (data.length() &&
253 (data.get_num_buffers() > 1 ||
254 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
255 data.rebuild();
256 }
257 }
258
259 void dump(Formatter *f) const {
260 f->dump_string("state", get_state_name(state));
261 f->dump_unsigned("seq", seq);
262 f->dump_unsigned("offset", offset);
263 f->dump_unsigned("length", length);
264 f->dump_unsigned("data_length", data.length());
265 }
266 };
267
268 struct BufferCacheShard;
269
270 /// map logical extent range (object) onto buffers
271 struct BufferSpace {
272 enum {
273 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
274 };
275
276 typedef boost::intrusive::list<
277 Buffer,
278 boost::intrusive::member_hook<
279 Buffer,
280 boost::intrusive::list_member_hook<>,
281 &Buffer::state_item> > state_list_t;
282
283 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
284 buffer_map;
285
286 // we use a bare intrusive list here instead of std::map because
287 // it uses less memory and we expect this to be very small (very
288 // few IOs in flight to the same Blob at the same time).
289 state_list_t writing; ///< writing buffers, sorted by seq, ascending
290
291 ~BufferSpace() {
292 ceph_assert(buffer_map.empty());
293 ceph_assert(writing.empty());
294 }
295
296 void _add_buffer(BufferCacheShard* cache, Buffer *b, int level, Buffer *near) {
297 cache->_audit("_add_buffer start");
298 buffer_map[b->offset].reset(b);
299 if (b->is_writing()) {
300 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
301 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
302 writing.push_back(*b);
303 } else {
304 auto it = writing.begin();
305 while (it->seq < b->seq) {
306 ++it;
307 }
308
309 ceph_assert(it->seq >= b->seq);
310 // note that this will insert b before it
311 // hence the order is maintained
312 writing.insert(it, *b);
313 }
314 } else {
315 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
316 cache->_add(b, level, near);
317 }
318 cache->_audit("_add_buffer end");
319 }
320 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
321 _rm_buffer(cache, buffer_map.find(b->offset));
322 }
323 void _rm_buffer(BufferCacheShard* cache,
324 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
325 ceph_assert(p != buffer_map.end());
326 cache->_audit("_rm_buffer start");
327 if (p->second->is_writing()) {
328 writing.erase(writing.iterator_to(*p->second));
329 } else {
330 cache->_rm(p->second.get());
331 }
332 buffer_map.erase(p);
333 cache->_audit("_rm_buffer end");
334 }
335
336 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
337 uint32_t offset) {
338 auto i = buffer_map.lower_bound(offset);
339 if (i != buffer_map.begin()) {
340 --i;
341 if (i->first + i->second->length <= offset)
342 ++i;
343 }
344 return i;
345 }
346
347 // must be called under protection of the Cache lock
348 void _clear(BufferCacheShard* cache);
349
350 // return value is the highest cache_private of a trimmed buffer, or 0.
351 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
352 std::lock_guard l(cache->lock);
353 int ret = _discard(cache, offset, length);
354 cache->_trim();
355 return ret;
356 }
357 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
358
359 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
360 unsigned flags) {
361 std::lock_guard l(cache->lock);
362 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
363 flags);
364 b->cache_private = _discard(cache, offset, bl.length());
365 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
366 cache->_trim();
367 }
368 void _finish_write(BufferCacheShard* cache, uint64_t seq);
369 void did_read(BufferCacheShard* cache, uint32_t offset, bufferlist& bl) {
370 std::lock_guard l(cache->lock);
371 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
372 b->cache_private = _discard(cache, offset, bl.length());
373 _add_buffer(cache, b, 1, nullptr);
374 cache->_trim();
375 }
376
377 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
378 BlueStore::ready_regions_t& res,
379 interval_set<uint32_t>& res_intervals,
380 int flags = 0);
381
382 void truncate(BufferCacheShard* cache, uint32_t offset) {
383 discard(cache, offset, (uint32_t)-1 - offset);
384 }
385
386 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
387
388 void dump(BufferCacheShard* cache, Formatter *f) const {
389 std::lock_guard l(cache->lock);
390 f->open_array_section("buffers");
391 for (auto& i : buffer_map) {
392 f->open_object_section("buffer");
393 ceph_assert(i.first == i.second->offset);
394 i.second->dump(f);
395 f->close_section();
396 }
397 f->close_section();
398 }
399 };
400
401 struct SharedBlobSet;
402
403 /// in-memory shared blob state (incl cached buffers)
404 struct SharedBlob {
405 MEMPOOL_CLASS_HELPERS();
406
407 std::atomic_int nref = {0}; ///< reference count
408 bool loaded = false;
409
410 CollectionRef coll;
411 union {
412 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
413 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
414 };
415 BufferSpace bc; ///< buffer cache
416
417 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
418 if (get_cache()) {
419 get_cache()->add_blob();
420 }
421 }
422 SharedBlob(uint64_t i, Collection *_coll);
423 ~SharedBlob();
424
425 uint64_t get_sbid() const {
426 return loaded ? persistent->sbid : sbid_unloaded;
427 }
428
429 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
430 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
431
432 void dump(Formatter* f) const;
433 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
434
435 void get() {
436 ++nref;
437 }
438 void put();
439
440 /// get logical references
441 void get_ref(uint64_t offset, uint32_t length);
442
443 /// put logical references, and get back any released extents
444 void put_ref(uint64_t offset, uint32_t length,
445 PExtentVector *r, bool *unshare);
446
447 void finish_write(uint64_t seq);
448
449 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
450 return l.get_sbid() == r.get_sbid();
451 }
452 inline BufferCacheShard* get_cache() {
453 return coll ? coll->cache : nullptr;
454 }
455 inline SharedBlobSet* get_parent() {
456 return coll ? &(coll->shared_blob_set) : nullptr;
457 }
458 inline bool is_loaded() const {
459 return loaded;
460 }
461
462 };
463 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
464
465 /// a lookup table of SharedBlobs
466 struct SharedBlobSet {
467 /// protect lookup, insertion, removal
468 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
469
470 // we use a bare pointer because we don't want to affect the ref
471 // count
472 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
473
474 SharedBlobRef lookup(uint64_t sbid) {
475 std::lock_guard l(lock);
476 auto p = sb_map.find(sbid);
477 if (p == sb_map.end() ||
478 p->second->nref == 0) {
479 return nullptr;
480 }
481 return p->second;
482 }
483
484 void add(Collection* coll, SharedBlob *sb) {
485 std::lock_guard l(lock);
486 sb_map[sb->get_sbid()] = sb;
487 sb->coll = coll;
488 }
489
490 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
491 std::lock_guard l(lock);
492 ceph_assert(sb->get_parent() == this);
493 if (verify_nref_is_zero && sb->nref != 0) {
494 return false;
495 }
496 // only remove if it still points to us
497 auto p = sb_map.find(sb->get_sbid());
498 if (p != sb_map.end() &&
499 p->second == sb) {
500 sb_map.erase(p);
501 }
502 return true;
503 }
504
505 bool empty() {
506 std::lock_guard l(lock);
507 return sb_map.empty();
508 }
509
510 template <int LogLevelV>
511 void dump(CephContext *cct);
512 };
513
514 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
515
516 /// in-memory blob metadata and associated cached buffers (if any)
517 struct Blob {
518 MEMPOOL_CLASS_HELPERS();
519
520 std::atomic_int nref = {0}; ///< reference count
521 int16_t id = -1; ///< id, for spanning blobs only, >= 0
522 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
523 SharedBlobRef shared_blob; ///< shared blob state (if any)
524
525 private:
526 mutable bluestore_blob_t blob; ///< decoded blob metadata
527 #ifdef CACHE_BLOB_BL
528 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
529 #endif
530 /// refs from this shard. ephemeral if id<0, persisted if spanning.
531 bluestore_blob_use_tracker_t used_in_blob;
532
533 public:
534
535 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
536 friend void intrusive_ptr_release(Blob *b) { b->put(); }
537
538 void dump(Formatter* f) const;
539 friend ostream& operator<<(ostream& out, const Blob &b);
540
541 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
542 return used_in_blob;
543 }
544 bool is_referenced() const {
545 return used_in_blob.is_not_empty();
546 }
547 uint32_t get_referenced_bytes() const {
548 return used_in_blob.get_referenced_bytes();
549 }
550
551 bool is_spanning() const {
552 return id >= 0;
553 }
554
555 bool can_split() const {
556 std::lock_guard l(shared_blob->get_cache()->lock);
557 // splitting a BufferSpace writing list is too hard; don't try.
558 return shared_blob->bc.writing.empty() &&
559 used_in_blob.can_split() &&
560 get_blob().can_split();
561 }
562
563 bool can_split_at(uint32_t blob_offset) const {
564 return used_in_blob.can_split_at(blob_offset) &&
565 get_blob().can_split_at(blob_offset);
566 }
567
568 bool can_reuse_blob(uint32_t min_alloc_size,
569 uint32_t target_blob_size,
570 uint32_t b_offset,
571 uint32_t *length0);
572
573 void dup(Blob& o) {
574 o.shared_blob = shared_blob;
575 o.blob = blob;
576 #ifdef CACHE_BLOB_BL
577 o.blob_bl = blob_bl;
578 #endif
579 }
580
581 inline const bluestore_blob_t& get_blob() const {
582 return blob;
583 }
584 inline bluestore_blob_t& dirty_blob() {
585 #ifdef CACHE_BLOB_BL
586 blob_bl.clear();
587 #endif
588 return blob;
589 }
590
591 /// discard buffers for unallocated regions
592 void discard_unallocated(Collection *coll);
593
594 /// get logical references
595 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
596 /// put logical references, and get back any released extents
597 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
598 PExtentVector *r);
599
600 /// split the blob
601 void split(Collection *coll, uint32_t blob_offset, Blob *o);
602
603 void get() {
604 ++nref;
605 }
606 void put() {
607 if (--nref == 0)
608 delete this;
609 }
610
611
612 #ifdef CACHE_BLOB_BL
613 void _encode() const {
614 if (blob_bl.length() == 0 ) {
615 encode(blob, blob_bl);
616 } else {
617 ceph_assert(blob_bl.length());
618 }
619 }
620 void bound_encode(
621 size_t& p,
622 bool include_ref_map) const {
623 _encode();
624 p += blob_bl.length();
625 if (include_ref_map) {
626 used_in_blob.bound_encode(p);
627 }
628 }
629 void encode(
630 bufferlist::contiguous_appender& p,
631 bool include_ref_map) const {
632 _encode();
633 p.append(blob_bl);
634 if (include_ref_map) {
635 used_in_blob.encode(p);
636 }
637 }
638 void decode(
639 Collection */*coll*/,
640 bufferptr::const_iterator& p,
641 bool include_ref_map) {
642 const char *start = p.get_pos();
643 denc(blob, p);
644 const char *end = p.get_pos();
645 blob_bl.clear();
646 blob_bl.append(start, end - start);
647 if (include_ref_map) {
648 used_in_blob.decode(p);
649 }
650 }
651 #else
652 void bound_encode(
653 size_t& p,
654 uint64_t struct_v,
655 uint64_t sbid,
656 bool include_ref_map) const {
657 denc(blob, p, struct_v);
658 if (blob.is_shared()) {
659 denc(sbid, p);
660 }
661 if (include_ref_map) {
662 used_in_blob.bound_encode(p);
663 }
664 }
665 void encode(
666 bufferlist::contiguous_appender& p,
667 uint64_t struct_v,
668 uint64_t sbid,
669 bool include_ref_map) const {
670 denc(blob, p, struct_v);
671 if (blob.is_shared()) {
672 denc(sbid, p);
673 }
674 if (include_ref_map) {
675 used_in_blob.encode(p);
676 }
677 }
678 void decode(
679 Collection *coll,
680 bufferptr::const_iterator& p,
681 uint64_t struct_v,
682 uint64_t* sbid,
683 bool include_ref_map);
684 #endif
685 };
686 typedef boost::intrusive_ptr<Blob> BlobRef;
687 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
688
689 /// a logical extent, pointing to (some portion of) a blob
690 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
691 struct Extent : public ExtentBase {
692 MEMPOOL_CLASS_HELPERS();
693
694 uint32_t logical_offset = 0; ///< logical offset
695 uint32_t blob_offset = 0; ///< blob offset
696 uint32_t length = 0; ///< length
697 BlobRef blob; ///< the blob with our data
698
699 /// ctor for lookup only
700 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
701 /// ctor for delayed initialization (see decode_some())
702 explicit Extent() : ExtentBase() {
703 }
704 /// ctor for general usage
705 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
706 : ExtentBase(),
707 logical_offset(lo), blob_offset(o), length(l) {
708 assign_blob(b);
709 }
710 ~Extent() {
711 if (blob) {
712 blob->shared_blob->get_cache()->rm_extent();
713 }
714 }
715
716 void dump(Formatter* f) const;
717
718 void assign_blob(const BlobRef& b) {
719 ceph_assert(!blob);
720 blob = b;
721 blob->shared_blob->get_cache()->add_extent();
722 }
723
724 // comparators for intrusive_set
725 friend bool operator<(const Extent &a, const Extent &b) {
726 return a.logical_offset < b.logical_offset;
727 }
728 friend bool operator>(const Extent &a, const Extent &b) {
729 return a.logical_offset > b.logical_offset;
730 }
731 friend bool operator==(const Extent &a, const Extent &b) {
732 return a.logical_offset == b.logical_offset;
733 }
734
735 uint32_t blob_start() const {
736 return logical_offset - blob_offset;
737 }
738
739 uint32_t blob_end() const {
740 return blob_start() + blob->get_blob().get_logical_length();
741 }
742
743 uint32_t logical_end() const {
744 return logical_offset + length;
745 }
746
747 // return true if any piece of the blob is out of
748 // the given range [o, o + l].
749 bool blob_escapes_range(uint32_t o, uint32_t l) const {
750 return blob_start() < o || blob_end() > o + l;
751 }
752 };
753 typedef boost::intrusive::set<Extent> extent_map_t;
754
755
756 friend ostream& operator<<(ostream& out, const Extent& e);
757
758 struct OldExtent {
759 boost::intrusive::list_member_hook<> old_extent_item;
760 Extent e;
761 PExtentVector r;
762 bool blob_empty; // set on the last removed extent that leaves the blob
763 // empty; required to update the compression stat properly
764 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
765 : e(lo, o, l, b), blob_empty(false) {
766 }
767 static OldExtent* create(CollectionRef c,
768 uint32_t lo,
769 uint32_t o,
770 uint32_t l,
771 BlobRef& b);
772 };
773 typedef boost::intrusive::list<
774 OldExtent,
775 boost::intrusive::member_hook<
776 OldExtent,
777 boost::intrusive::list_member_hook<>,
778 &OldExtent::old_extent_item> > old_extent_map_t;
779
780 struct Onode;
781
782 /// a sharded extent map, mapping offsets to lextents to blobs
783 struct ExtentMap {
784 Onode *onode;
785 extent_map_t extent_map; ///< map of Extents to Blobs
786 blob_map_t spanning_blob_map; ///< blobs that span shards
787 typedef boost::intrusive_ptr<Onode> OnodeRef;
788
789 struct Shard {
790 bluestore_onode_t::shard_info *shard_info = nullptr;
791 unsigned extents = 0; ///< count extents in this shard
792 bool loaded = false; ///< true if shard is loaded
793 bool dirty = false; ///< true if shard is dirty and needs reencoding
794 };
795 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
796
797 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
798
799 uint32_t needs_reshard_begin = 0;
800 uint32_t needs_reshard_end = 0;
801
802 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
803 uint64_t&, uint64_t&, uint64_t&);
804
805 bool needs_reshard() const {
806 return needs_reshard_end > needs_reshard_begin;
807 }
808 void clear_needs_reshard() {
809 needs_reshard_begin = needs_reshard_end = 0;
810 }
811 void request_reshard(uint32_t begin, uint32_t end) {
812 if (begin < needs_reshard_begin) {
813 needs_reshard_begin = begin;
814 }
815 if (end > needs_reshard_end) {
816 needs_reshard_end = end;
817 }
818 }
819
820 struct DeleteDisposer {
821 void operator()(Extent *e) { delete e; }
822 };
823
824 ExtentMap(Onode *o);
825 ~ExtentMap() {
826 extent_map.clear_and_dispose(DeleteDisposer());
827 }
828
829 void clear() {
830 extent_map.clear_and_dispose(DeleteDisposer());
831 shards.clear();
832 inline_bl.clear();
833 clear_needs_reshard();
834 }
835
836 void dump(Formatter* f) const;
837
838 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
839 unsigned *pn);
840 unsigned decode_some(bufferlist& bl);
841
842 void bound_encode_spanning_blobs(size_t& p);
843 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
844 void decode_spanning_blobs(bufferptr::const_iterator& p);
845
846 BlobRef get_spanning_blob(int id) {
847 auto p = spanning_blob_map.find(id);
848 ceph_assert(p != spanning_blob_map.end());
849 return p->second;
850 }
851
852 void update(KeyValueDB::Transaction t, bool force);
853 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
854 void reshard(
855 KeyValueDB *db,
856 KeyValueDB::Transaction t);
857
858 /// initialize Shards from the onode
859 void init_shards(bool loaded, bool dirty);
860
861 /// return index of shard containing offset
862 /// or -1 if not found
863 int seek_shard(uint32_t offset) {
864 size_t end = shards.size();
865 size_t mid, left = 0;
866 size_t right = end; // one past the right end
867
868 while (left < right) {
869 mid = left + (right - left) / 2;
870 if (offset >= shards[mid].shard_info->offset) {
871 size_t next = mid + 1;
872 if (next >= end || offset < shards[next].shard_info->offset)
873 return mid;
874 //continue to search forwards
875 left = next;
876 } else {
877 //continue to search backwards
878 right = mid;
879 }
880 }
881
882 return -1; // not found
883 }
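// Added illustrative example (not in the original header), assuming shard
// boundaries at offsets {0x0, 0x10000, 0x20000}:
//   seek_shard(0x12345) -> 1   (0x10000 <= 0x12345 < 0x20000)
//   seek_shard(0x20000) -> 2   (the last shard extends to the end)
// -1 is returned only when offset lies below the first shard's offset.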
884
885 /// check if a range spans a shard
886 bool spans_shard(uint32_t offset, uint32_t length) {
887 if (shards.empty()) {
888 return false;
889 }
890 int s = seek_shard(offset);
891 ceph_assert(s >= 0);
892 if (s == (int)shards.size() - 1) {
893 return false; // last shard
894 }
895 if (offset + length <= shards[s+1].shard_info->offset) {
896 return false;
897 }
898 return true;
899 }
900
901 /// ensure that a range of the map is loaded
902 void fault_range(KeyValueDB *db,
903 uint32_t offset, uint32_t length);
904
905 /// ensure a range of the map is marked dirty
906 void dirty_range(uint32_t offset, uint32_t length);
907
908 /// for seek_lextent test
909 extent_map_t::iterator find(uint64_t offset);
910
911 /// seek to the first lextent including or after offset
912 extent_map_t::iterator seek_lextent(uint64_t offset);
913 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
914
915 /// add a new Extent
916 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
917 extent_map.insert(*new Extent(lo, o, l, b));
918 }
919
920 /// remove (and delete) an Extent
921 void rm(extent_map_t::iterator p) {
922 extent_map.erase_and_dispose(p, DeleteDisposer());
923 }
924
925 bool has_any_lextents(uint64_t offset, uint64_t length);
926
927 /// consolidate adjacent lextents in extent_map
928 int compress_extent_map(uint64_t offset, uint64_t length);
929
930 /// punch a logical hole. add lextents to deref to target list.
931 void punch_hole(CollectionRef &c,
932 uint64_t offset, uint64_t length,
933 old_extent_map_t *old_extents);
934
935 /// put new lextent into lextent_map overwriting existing ones if
936 /// any and update references accordingly
937 Extent *set_lextent(CollectionRef &c,
938 uint64_t logical_offset,
939 uint64_t offset, uint64_t length,
940 BlobRef b,
941 old_extent_map_t *old_extents);
942
943 /// split a blob (and referring extents)
944 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
945 };
946
947 /// Compressed Blob Garbage collector
948 /*
949 The primary idea of the collector is to estimate the difference between the
950 allocation units (AUs) currently occupied by compressed blobs and the new AUs
951 that would be required to store the same data uncompressed.
952 The estimate is performed over the protrusive extents within a logical range
953 determined by the concatenation of the old_extents collection and the
954 specific (current) write request.
955 old_extents is needed to handle blob reference counts properly: old extents
956 still hold blob refs, so we must traverse the collection to determine
957 whether a blob is about to be released.
958 Protrusive extents are extents that belong to the blob set in action
959 (i.e. fall under the logical range above) but are not removed entirely
960 by the current write.
961 E.g. for
962 extent1 <loffs = 100, boffs = 100, len = 100> ->
963 blob1<compressed, len_on_disk=4096, logical_len=8192>
964 extent2 <loffs = 200, boffs = 200, len = 100> ->
965 blob2<raw, len_on_disk=4096, llen=4096>
966 extent3 <loffs = 300, boffs = 300, len = 100> ->
967 blob1<compressed, len_on_disk=4096, llen=8192>
968 extent4 <loffs = 4096, boffs = 0, len = 100> ->
969 blob3<raw, len_on_disk=4096, llen=4096>
970 write(300~100)
971 the protrusive extents fall within the ranges <0~300, 400~8192-400>.
972 In this case the existing AUs that might be freed by GC (i.e. those of blob1)
973 occupy 2x4K bytes, while the number of new AUs expected after GC is 0,
974 since extent1 is to be merged into blob2.
975 Hence a collect should be performed.
976 */
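/*
  A minimal usage sketch, added for illustration (the local variable names
  are hypothetical; this is not copied from BlueStore.cc):

    GarbageCollector gc(cct);
    int64_t benefit = gc.estimate(write_offset, write_length,
                                  onode->extent_map, old_extents,
                                  min_alloc_size);
    if (benefit > 0) {
      // ranges worth rewriting uncompressed so the old compressed AUs can
      // eventually be released
      const interval_set<uint64_t>& to_collect = gc.get_extents_to_collect();
      ...
    }
*/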
977 class GarbageCollector
978 {
979 public:
980 /// return amount of allocation units that might be saved due to GC
981 int64_t estimate(
982 uint64_t offset,
983 uint64_t length,
984 const ExtentMap& extent_map,
985 const old_extent_map_t& old_extents,
986 uint64_t min_alloc_size);
987
988 /// return a collection of extents to perform GC on
989 const interval_set<uint64_t>& get_extents_to_collect() const {
990 return extents_to_collect;
991 }
992 GarbageCollector(CephContext* _cct) : cct(_cct) {}
993
994 private:
995 struct BlobInfo {
996 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
997 int64_t expected_allocations = 0; ///< new alloc units required
998 ///< if GC is performed
999 bool collect_candidate = false; ///< indicate if blob has any extents
1000 ///< eligible for GC.
1001 extent_map_t::const_iterator first_lextent; ///< points to the first
1002 ///< lextent referring to
1003 ///< the blob if any.
1004 ///< collect_candidate flag
1005 ///< determines the validity
1006 extent_map_t::const_iterator last_lextent; ///< points to the last
1007 ///< lextent referring to
1008 ///< the blob if any.
1009
1010 BlobInfo(uint64_t ref_bytes) :
1011 referenced_bytes(ref_bytes) {
1012 }
1013 };
1014 CephContext* cct;
1015 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1016 ///< copies that are affected by the
1017 ///< specific write
1018
1019 ///< protrusive extents that should be collected if GC takes place
1020 interval_set<uint64_t> extents_to_collect;
1021
1022 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1023 ///< unit when traversing
1024 ///< protrusive extents.
1025 ///< Other extents mapped to
1026 ///< this AU to be ignored
1027 ///< (except the case where
1028 ///< uncompressed extent follows
1029 ///< compressed one - see below).
1030 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
1031 ///< caused expected_allocations
1032 ///< counter increment at this blob.
1033 ///< if uncompressed extent follows
1034 ///< a decrement for the
1035 ///< expected_allocations counter
1036 ///< is needed
1037 int64_t expected_allocations = 0; ///< new alloc units required if GC
1038 ///< is performed
1039 int64_t expected_for_release = 0; ///< alloc units currently used by
1040 ///< compressed blobs that might be
1041 ///< released after GC
1042
1043 protected:
1044 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1045 uint64_t start_offset,
1046 uint64_t end_offset,
1047 uint64_t start_touch_offset,
1048 uint64_t end_touch_offset,
1049 uint64_t min_alloc_size);
1050 };
1051
1052 struct OnodeSpace;
1053 struct OnodeCacheShard;
1054 /// an in-memory object
1055 struct Onode {
1056 MEMPOOL_CLASS_HELPERS();
1057 // Not persisted; updated on cache insertion/removal
1058 OnodeCacheShard *s;
1059 bool pinned = false; // Only to be used by the onode cache shard
1060
1061 std::atomic_int nref; ///< reference count
1062 Collection *c;
1063 ghobject_t oid;
1064
1065 /// key under PREFIX_OBJ where we are stored
1066 mempool::bluestore_cache_other::string key;
1067
1068 boost::intrusive::list_member_hook<> lru_item, pin_item;
1069
1070 bluestore_onode_t onode; ///< metadata stored as value in kv store
1071 bool exists; ///< true if object logically exists
1072
1073 ExtentMap extent_map;
1074
1075 // track txc's that have not been committed to kv store (and whose
1076 // effects cannot be read via the kvdb read methods)
1077 std::atomic<int> flushing_count = {0};
1078 std::atomic<int> waiting_count = {0};
1079 /// protect flush_txns
1080 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1081 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1082
1083 Onode(Collection *c, const ghobject_t& o,
1084 const mempool::bluestore_cache_other::string& k)
1085 : s(nullptr),
1086 nref(0),
1087 c(c),
1088 oid(o),
1089 key(k),
1090 exists(false),
1091 extent_map(this) {
1092 }
1093 Onode(Collection* c, const ghobject_t& o,
1094 const string& k)
1095 : s(nullptr),
1096 nref(0),
1097 c(c),
1098 oid(o),
1099 key(k),
1100 exists(false),
1101 extent_map(this) {
1102 }
1103 Onode(Collection* c, const ghobject_t& o,
1104 const char* k)
1105 : s(nullptr),
1106 nref(0),
1107 c(c),
1108 oid(o),
1109 key(k),
1110 exists(false),
1111 extent_map(this) {
1112 }
1113
1114 static Onode* decode(
1115 CollectionRef c,
1116 const ghobject_t& oid,
1117 const string& key,
1118 const bufferlist& v);
1119
1120 void dump(Formatter* f) const;
1121
1122 void flush();
1123 void get() {
1124 if (++nref == 2 && s != nullptr) {
1125 s->pin(*this);
1126 }
1127 }
1128 void put() {
1129 int n = --nref;
1130 if (n == 1 && s != nullptr) {
1131 s->unpin(*this);
1132 }
1133 if (n == 0) {
1134 delete this;
1135 }
1136 }
1137
1138 const string& get_omap_prefix();
1139 void get_omap_header(string *out);
1140 void get_omap_key(const string& key, string *out);
1141 void rewrite_omap_key(const string& old, string *out);
1142 void get_omap_tail(string *out);
1143 void decode_omap_key(const string& key, string *user_key);
1144 };
1145 typedef boost::intrusive_ptr<Onode> OnodeRef;
1146
1147 /// A generic Cache Shard
1148 struct CacheShard {
1149 CephContext *cct;
1150 PerfCounters *logger;
1151
1152 /// protect lru and other structures
1153 ceph::recursive_mutex lock = {
1154 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1155
1156 std::atomic<uint64_t> max = {0};
1157 std::atomic<uint64_t> num = {0};
1158
1159 CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1160 virtual ~CacheShard() {}
1161
1162 void set_max(uint64_t max_) {
1163 max = max_;
1164 }
1165
1166 uint64_t _get_num() {
1167 return num;
1168 }
1169
1170 virtual void _trim_to(uint64_t new_size) = 0;
1171 void _trim() {
1172 if (cct->_conf->objectstore_blackhole) {
1173 // do not trim if we are throwing away IOs a layer down
1174 return;
1175 }
1176 _trim_to(max);
1177 }
1178
1179 void trim() {
1180 std::lock_guard l(lock);
1181 _trim();
1182 }
1183 void flush() {
1184 std::lock_guard l(lock);
1185 // we should not be shutting down after the blackhole is enabled
1186 assert(!cct->_conf->objectstore_blackhole);
1187 _trim_to(0);
1188 }
1189
1190 #ifdef DEBUG_CACHE
1191 virtual void _audit(const char *s) = 0;
1192 #else
1193 void _audit(const char *s) { /* no-op */ }
1194 #endif
1195 };
1196
1197 /// A Generic onode Cache Shard
1198 struct OnodeCacheShard : public CacheShard {
1199 std::atomic<uint64_t> num_pinned = {0};
1200
1201 std::array<std::pair<ghobject_t, mono_clock::time_point>, 64> dumped_onodes;
1202 public:
1203 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1204 static OnodeCacheShard *create(CephContext* cct, string type,
1205 PerfCounters *logger);
1206 virtual void _add(OnodeRef& o, int level) = 0;
1207 virtual void _rm(OnodeRef& o) = 0;
1208 virtual void _touch(OnodeRef& o) = 0;
1209 virtual void _pin(Onode& o) = 0;
1210 virtual void _unpin(Onode& o) = 0;
1211
1212 void pin(Onode& o) {
1213 std::lock_guard l(lock);
1214 _pin(o);
1215 }
1216
1217 void unpin(Onode& o) {
1218 std::lock_guard l(lock);
1219 _unpin(o);
1220 }
1221
1222 virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
1223 bool empty() {
1224 return _get_num() == 0;
1225 }
1226 };
1227
1228 /// A Generic buffer Cache Shard
1229 struct BufferCacheShard : public CacheShard {
1230 std::atomic<uint64_t> num_extents = {0};
1231 std::atomic<uint64_t> num_blobs = {0};
1232 uint64_t buffer_bytes = 0;
1233
1234 public:
1235 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1236 static BufferCacheShard *create(CephContext* cct, string type,
1237 PerfCounters *logger);
1238 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1239 virtual void _rm(Buffer *b) = 0;
1240 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1241 virtual void _touch(Buffer *b) = 0;
1242 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1243
1244 uint64_t _get_bytes() {
1245 return buffer_bytes;
1246 }
1247
1248 void add_extent() {
1249 ++num_extents;
1250 }
1251 void rm_extent() {
1252 --num_extents;
1253 }
1254
1255 void add_blob() {
1256 ++num_blobs;
1257 }
1258 void rm_blob() {
1259 --num_blobs;
1260 }
1261
1262 virtual void add_stats(uint64_t *extents,
1263 uint64_t *blobs,
1264 uint64_t *buffers,
1265 uint64_t *bytes) = 0;
1266
1267 bool empty() {
1268 std::lock_guard l(lock);
1269 return _get_bytes() == 0;
1270 }
1271 };
1272
1273 struct OnodeSpace {
1274 OnodeCacheShard *cache;
1275
1276 private:
1277 /// forward lookups
1278 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1279
1280 friend class Collection; // for split_cache()
1281
1282 public:
1283 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1284 ~OnodeSpace() {
1285 clear();
1286 }
1287
1288 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1289 OnodeRef lookup(const ghobject_t& o);
1290 void remove(const ghobject_t& oid) {
1291 onode_map.erase(oid);
1292 }
1293 void rename(OnodeRef& o, const ghobject_t& old_oid,
1294 const ghobject_t& new_oid,
1295 const mempool::bluestore_cache_other::string& new_okey);
1296 void clear();
1297 bool empty();
1298
1299 template <int LogLevelV>
1300 void dump(CephContext *cct);
1301
1302 /// return true if f true for any item
1303 bool map_any(std::function<bool(OnodeRef)> f);
1304 };
1305
1306 class OpSequencer;
1307 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1308
1309 struct Collection : public CollectionImpl {
1310 BlueStore *store;
1311 OpSequencerRef osr;
1312 BufferCacheShard *cache; ///< our cache shard
1313 bluestore_cnode_t cnode;
1314 ceph::shared_mutex lock =
1315 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1316
1317 bool exists;
1318
1319 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1320
1321 // cache onodes on a per-collection basis to avoid lock
1322 // contention.
1323 OnodeSpace onode_map;
1324
1325 //pool options
1326 pool_opts_t pool_opts;
1327 ContextQueue *commit_queue;
1328
1329 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1330
1331 // the terminology is confusing here, sorry!
1332 //
1333 // blob_t shared_blob_t
1334 // !shared unused -> open
1335 // shared !loaded -> open + shared
1336 // shared loaded -> open + shared + loaded
1337 //
1338 // i.e.,
1339 // open = SharedBlob is instantiated
1340 // shared = blob_t shared flag is set; SharedBlob is hashed.
1341 // loaded = SharedBlob::shared_blob_t is loaded from kv store
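// Added illustrative lifecycle (derived from the table above, not part of
// the original comment):
//   new_blob()                  -> open
//   make_blob_shared(sbid, b)   -> open + shared  (blob_t flag set, SharedBlob
//                                  hashed in shared_blob_set)
//   load_shared_blob(sb)        -> open + shared + loaded  (persistent
//                                  shared_blob_t read from the kv store)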
1342 void open_shared_blob(uint64_t sbid, BlobRef b);
1343 void load_shared_blob(SharedBlobRef sb);
1344 void make_blob_shared(uint64_t sbid, BlobRef b);
1345 uint64_t make_blob_unshared(SharedBlob *sb);
1346
1347 BlobRef new_blob() {
1348 BlobRef b = new Blob();
1349 b->shared_blob = new SharedBlob(this);
1350 return b;
1351 }
1352
1353 bool contains(const ghobject_t& oid) {
1354 if (cid.is_meta())
1355 return oid.hobj.pool == -1;
1356 spg_t spgid;
1357 if (cid.is_pg(&spgid))
1358 return
1359 spgid.pgid.contains(cnode.bits, oid) &&
1360 oid.shard_id == spgid.shard;
1361 return false;
1362 }
1363
1364 int64_t pool() const {
1365 return cid.pool();
1366 }
1367
1368 void split_cache(Collection *dest);
1369
1370 bool flush_commit(Context *c) override;
1371 void flush() override;
1372 void flush_all_but_last();
1373
1374 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1375 };
1376
1377 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1378 CollectionRef c;
1379 OnodeRef o;
1380 KeyValueDB::Iterator it;
1381 string head, tail;
1382
1383 string _stringify() const;
1384
1385 public:
1386 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1387 int seek_to_first() override;
1388 int upper_bound(const string &after) override;
1389 int lower_bound(const string &to) override;
1390 bool valid() override;
1391 int next() override;
1392 string key() override;
1393 bufferlist value() override;
1394 std::string tail_key() {
1395 return tail;
1396 }
1397
1398 int status() override {
1399 return 0;
1400 }
1401 };
1402
1403 struct volatile_statfs{
1404 enum {
1405 STATFS_ALLOCATED = 0,
1406 STATFS_STORED,
1407 STATFS_COMPRESSED_ORIGINAL,
1408 STATFS_COMPRESSED,
1409 STATFS_COMPRESSED_ALLOCATED,
1410 STATFS_LAST
1411 };
1412 int64_t values[STATFS_LAST];
1413 volatile_statfs() {
1414 memset(this, 0, sizeof(volatile_statfs));
1415 }
1416 void reset() {
1417 *this = volatile_statfs();
1418 }
1419 void publish(store_statfs_t* buf) const {
1420 buf->allocated = allocated();
1421 buf->data_stored = stored();
1422 buf->data_compressed = compressed();
1423 buf->data_compressed_original = compressed_original();
1424 buf->data_compressed_allocated = compressed_allocated();
1425 }
1426
1427 volatile_statfs& operator+=(const volatile_statfs& other) {
1428 for (size_t i = 0; i < STATFS_LAST; ++i) {
1429 values[i] += other.values[i];
1430 }
1431 return *this;
1432 }
1433 int64_t& allocated() {
1434 return values[STATFS_ALLOCATED];
1435 }
1436 int64_t& stored() {
1437 return values[STATFS_STORED];
1438 }
1439 int64_t& compressed_original() {
1440 return values[STATFS_COMPRESSED_ORIGINAL];
1441 }
1442 int64_t& compressed() {
1443 return values[STATFS_COMPRESSED];
1444 }
1445 int64_t& compressed_allocated() {
1446 return values[STATFS_COMPRESSED_ALLOCATED];
1447 }
1448 int64_t allocated() const {
1449 return values[STATFS_ALLOCATED];
1450 }
1451 int64_t stored() const {
1452 return values[STATFS_STORED];
1453 }
1454 int64_t compressed_original() const {
1455 return values[STATFS_COMPRESSED_ORIGINAL];
1456 }
1457 int64_t compressed() const {
1458 return values[STATFS_COMPRESSED];
1459 }
1460 int64_t compressed_allocated() const {
1461 return values[STATFS_COMPRESSED_ALLOCATED];
1462 }
1463 volatile_statfs& operator=(const store_statfs_t& st) {
1464 values[STATFS_ALLOCATED] = st.allocated;
1465 values[STATFS_STORED] = st.data_stored;
1466 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1467 values[STATFS_COMPRESSED] = st.data_compressed;
1468 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1469 return *this;
1470 }
1471 bool is_empty() {
1472 return values[STATFS_ALLOCATED] == 0 &&
1473 values[STATFS_STORED] == 0 &&
1474 values[STATFS_COMPRESSED] == 0 &&
1475 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1476 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1477 }
1478 void decode(bufferlist::const_iterator& it) {
1479 using ceph::decode;
1480 for (size_t i = 0; i < STATFS_LAST; i++) {
1481 decode(values[i], it);
1482 }
1483 }
1484
1485 void encode(bufferlist& bl) {
1486 using ceph::encode;
1487 for (size_t i = 0; i < STATFS_LAST; i++) {
1488 encode(values[i], bl);
1489 }
1490 }
1491 };
1492
1493 struct TransContext final : public AioContext {
1494 MEMPOOL_CLASS_HELPERS();
1495
1496 typedef enum {
1497 STATE_PREPARE,
1498 STATE_AIO_WAIT,
1499 STATE_IO_DONE,
1500 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1501 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1502 STATE_KV_DONE,
1503 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1504 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1505 STATE_DEFERRED_DONE,
1506 STATE_FINISHING,
1507 STATE_DONE,
1508 } state_t;
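// Added illustrative note (inferred from the state list above, not part of
// the original comment): a typical non-deferred transaction advances
//   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
//   -> FINISHING -> DONE,
// while a transaction with deferred writes additionally passes through
//   DEFERRED_QUEUED -> DEFERRED_CLEANUP
// between KV_DONE and FINISHING.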
1509
1510 state_t state = STATE_PREPARE;
1511
1512 const char *get_state_name() {
1513 switch (state) {
1514 case STATE_PREPARE: return "prepare";
1515 case STATE_AIO_WAIT: return "aio_wait";
1516 case STATE_IO_DONE: return "io_done";
1517 case STATE_KV_QUEUED: return "kv_queued";
1518 case STATE_KV_SUBMITTED: return "kv_submitted";
1519 case STATE_KV_DONE: return "kv_done";
1520 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1521 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1522 case STATE_DEFERRED_DONE: return "deferred_done";
1523 case STATE_FINISHING: return "finishing";
1524 case STATE_DONE: return "done";
1525 }
1526 return "???";
1527 }
1528
1529 #if defined(WITH_LTTNG)
1530 const char *get_state_latency_name(int state) {
1531 switch (state) {
1532 case l_bluestore_state_prepare_lat: return "prepare";
1533 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1534 case l_bluestore_state_io_done_lat: return "io_done";
1535 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1536 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1537 case l_bluestore_state_kv_done_lat: return "kv_done";
1538 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1539 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1540 case l_bluestore_state_finishing_lat: return "finishing";
1541 case l_bluestore_state_done_lat: return "done";
1542 }
1543 return "???";
1544 }
1545 #endif
1546
1547 CollectionRef ch;
1548 OpSequencerRef osr; // this should be ch->osr
1549 boost::intrusive::list_member_hook<> sequencer_item;
1550
1551 uint64_t bytes = 0, ios = 0, cost = 0;
1552
1553 set<OnodeRef> onodes; ///< these need to be updated/written
1554 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1555 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1556 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1557
1558 KeyValueDB::Transaction t; ///< then we will commit this
1559 list<Context*> oncommits; ///< more commit completions
1560 list<CollectionRef> removed_collections; ///< colls we removed
1561
1562 boost::intrusive::list_member_hook<> deferred_queue_item;
1563 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1564
1565 interval_set<uint64_t> allocated, released;
1566 volatile_statfs statfs_delta; ///< overall store statistics delta
1567 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1568
1569 IOContext ioc;
1570 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1571
1572 uint64_t seq = 0;
1573 mono_clock::time_point start;
1574 mono_clock::time_point last_stamp;
1575
1576 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1577 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1578
1579 #if defined(WITH_LTTNG)
1580 bool tracing = false;
1581 #endif
1582
1583 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1584 list<Context*> *on_commits)
1585 : ch(c),
1586 osr(o),
1587 ioc(cct, this),
1588 start(mono_clock::now()) {
1589 last_stamp = start;
1590 if (on_commits) {
1591 oncommits.swap(*on_commits);
1592 }
1593 }
1594 ~TransContext() {
1595 delete deferred_txn;
1596 }
1597
1598 void write_onode(OnodeRef &o) {
1599 onodes.insert(o);
1600 }
1601 void write_shared_blob(SharedBlobRef &sb) {
1602 shared_blobs.insert(sb);
1603 }
1604 void unshare_blob(SharedBlob *sb) {
1605 shared_blobs.erase(sb);
1606 }
1607
1608 /// note that we logically modified the object (though the onode itself is unmodified)
1609 void note_modified_object(OnodeRef &o) {
1610 // onode itself isn't written, though
1611 modified_objects.insert(o);
1612 }
1613 void note_removed_object(OnodeRef& o) {
1614 onodes.erase(o);
1615 modified_objects.insert(o);
1616 }
1617
1618 void aio_finish(BlueStore *store) override {
1619 store->txc_aio_finish(this);
1620 }
1621 };
1622
1623 class BlueStoreThrottle {
1624 #if defined(WITH_LTTNG)
1625 const std::chrono::time_point<mono_clock> time_base = mono_clock::now();
1626
1627 // Time of last chosen io (microseconds)
1628 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1629 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1630 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1631
1632 std::atomic_uint pending_kv_ios = {0};
1633 std::atomic_uint pending_deferred_ios = {0};
1634
1635 // Min period between trace points (microseconds)
1636 std::atomic<uint64_t> trace_period_mcs = {0};
1637
1638 bool should_trace(
1639 uint64_t *started,
1640 uint64_t *completed) {
1641 uint64_t min_period_mcs = trace_period_mcs.load(
1642 std::memory_order_relaxed);
1643
1644 if (min_period_mcs == 0) {
1645 *started = 1;
1646 *completed = ios_completed_since_last_traced.exchange(0);
1647 return true;
1648 } else {
1649 ios_started_since_last_traced++;
1650 auto now_mcs = ceph::to_microseconds<uint64_t>(
1651 mono_clock::now() - time_base);
1652 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1653 uint64_t period_mcs = now_mcs - previous_mcs;
1654 if (period_mcs > min_period_mcs) {
1655 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1656 previous_mcs, now_mcs)) {
1657 // This would be racy at a sufficiently extreme trace rate, but isn't
1658 // worth the overhead of doing it more carefully.
1659 *started = ios_started_since_last_traced.exchange(0);
1660 *completed = ios_completed_since_last_traced.exchange(0);
1661 return true;
1662 }
1663 }
1664 return false;
1665 }
1666 }
1667 #endif
1668
1669 #if defined(WITH_LTTNG)
1670 void emit_initial_tracepoint(
1671 KeyValueDB &db,
1672 TransContext &txc,
1673 mono_clock::time_point);
1674 #else
1675 void emit_initial_tracepoint(
1676 KeyValueDB &db,
1677 TransContext &txc,
1678 mono_clock::time_point) {}
1679 #endif
1680
1681 Throttle throttle_bytes; ///< submit to commit
1682 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1683
1684 public:
1685 BlueStoreThrottle(CephContext *cct) :
1686 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1687 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1688 {
1689 reset_throttle(cct->_conf);
1690 }
1691
1692 #if defined(WITH_LTTNG)
1693 void complete_kv(TransContext &txc);
1694 void complete(TransContext &txc);
1695 #else
1696 void complete_kv(TransContext &txc) {}
1697 void complete(TransContext &txc) {}
1698 #endif
1699
1700 mono_clock::duration log_state_latency(
1701 TransContext &txc, PerfCounters *logger, int state);
1702 bool try_start_transaction(
1703 KeyValueDB &db,
1704 TransContext &txc,
1705 mono_clock::time_point);
1706 void finish_start_transaction(
1707 KeyValueDB &db,
1708 TransContext &txc,
1709 mono_clock::time_point);
1710 void release_kv_throttle(uint64_t cost) {
1711 throttle_bytes.put(cost);
1712 }
1713 void release_deferred_throttle(uint64_t cost) {
1714 throttle_deferred_bytes.put(cost);
1715 }
1716 bool should_submit_deferred() {
1717 return throttle_deferred_bytes.past_midpoint();
1718 }
1719 void reset_throttle(const ConfigProxy &conf) {
1720 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1721 throttle_deferred_bytes.reset_max(
1722 conf->bluestore_throttle_bytes +
1723 conf->bluestore_throttle_deferred_bytes);
1724 #if defined(WITH_LTTNG)
1725 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1726 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1727 #endif
1728 }
1729 } throttle;
1730
1731 typedef boost::intrusive::list<
1732 TransContext,
1733 boost::intrusive::member_hook<
1734 TransContext,
1735 boost::intrusive::list_member_hook<>,
1736 &TransContext::deferred_queue_item> > deferred_queue_t;
1737
1738 struct DeferredBatch final : public AioContext {
1739 OpSequencer *osr;
1740 struct deferred_io {
1741 bufferlist bl; ///< data
1742 uint64_t seq; ///< deferred transaction seq
1743 };
1744 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1745 deferred_queue_t txcs; ///< txcs in this batch
1746 IOContext ioc; ///< our aios
1747 /// bytes of pending io for each deferred seq (may be 0)
1748 map<uint64_t,int> seq_bytes;
1749
1750 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1751 void _audit(CephContext *cct);
1752
1753 DeferredBatch(CephContext *cct, OpSequencer *osr)
1754 : osr(osr), ioc(cct, this) {}
1755
1756 /// prepare a write
1757 void prepare_write(CephContext *cct,
1758 uint64_t seq, uint64_t offset, uint64_t length,
1759 bufferlist::const_iterator& p);
1760
1761 void aio_finish(BlueStore *store) override {
1762 store->_deferred_aio_finish(osr);
1763 }
1764 };
1765
1766 class OpSequencer : public RefCountedObject {
1767 public:
1768 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1769 ceph::condition_variable qcond;
1770 typedef boost::intrusive::list<
1771 TransContext,
1772 boost::intrusive::member_hook<
1773 TransContext,
1774 boost::intrusive::list_member_hook<>,
1775 &TransContext::sequencer_item> > q_list_t;
1776 q_list_t q; ///< transactions
1777
1778 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1779
1780 DeferredBatch *deferred_running = nullptr;
1781 DeferredBatch *deferred_pending = nullptr;
1782
1783 BlueStore *store;
1784 coll_t cid;
1785
1786 uint64_t last_seq = 0;
1787
1788 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1789
1790 std::atomic_int kv_committing_serially = {0};
1791
1792 std::atomic_int kv_submitted_waiters = {0};
1793
1794 std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
1795
1796 const uint32_t sequencer_id;
1797
1798 uint32_t get_sequencer_id() const {
1799 return sequencer_id;
1800 }
1801
1802 void queue_new(TransContext *txc) {
1803 std::lock_guard l(qlock);
1804 txc->seq = ++last_seq;
1805 q.push_back(*txc);
1806 }
1807
1808 void drain() {
1809 std::unique_lock l(qlock);
1810 while (!q.empty())
1811 qcond.wait(l);
1812 }
1813
1814 void drain_preceding(TransContext *txc) {
1815 std::unique_lock l(qlock);
1816 while (&q.front() != txc)
1817 qcond.wait(l);
1818 }
1819
1820 bool _is_all_kv_submitted() {
1821 // caller must hold qlock; q must not be empty
1822 ceph_assert(!q.empty());
1823 TransContext *txc = &q.back();
1824 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1825 return true;
1826 }
1827 return false;
1828 }
1829
1830 void flush() {
1831 std::unique_lock l(qlock);
1832 while (true) {
1833 // bump the waiter count before the check because the condition
1834 // may become true outside qlock, and we need to make
1835 // sure those threads see the waiters and signal qcond.
1836 ++kv_submitted_waiters;
1837 if (q.empty() || _is_all_kv_submitted()) {
1838 --kv_submitted_waiters;
1839 return;
1840 }
1841 qcond.wait(l);
1842 --kv_submitted_waiters;
1843 }
1844 }
1845
1846 void flush_all_but_last() {
1847 std::unique_lock l(qlock);
1848 assert (q.size() >= 1);
1849 while (true) {
1850 // bump the waiter count before the check because the condition
1851 // may become true outside qlock, and we need to make
1852 // sure those threads see the waiters and signal qcond.
1853 ++kv_submitted_waiters;
1854 if (q.size() <= 1) {
1855 --kv_submitted_waiters;
1856 return;
1857 } else {
1858 auto it = q.rbegin();
1859 it++;
1860 if (it->state >= TransContext::STATE_KV_SUBMITTED) {
1861 --kv_submitted_waiters;
1862 return;
1863 }
1864 }
1865 qcond.wait(l);
1866 --kv_submitted_waiters;
1867 }
1868 }
1869
1870 bool flush_commit(Context *c) {
1871 std::lock_guard l(qlock);
1872 if (q.empty()) {
1873 return true;
1874 }
1875 TransContext *txc = &q.back();
1876 if (txc->state >= TransContext::STATE_KV_DONE) {
1877 return true;
1878 }
1879 txc->oncommits.push_back(c);
1880 return false;
1881 }
1882 private:
1883 FRIEND_MAKE_REF(OpSequencer);
1884 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1885 : RefCountedObject(store->cct),
1886 store(store), cid(c), sequencer_id(sequencer_id) {
1887 }
1888 ~OpSequencer() {
1889 ceph_assert(q.empty());
1890 }
1891 };
1892
1893 typedef boost::intrusive::list<
1894 OpSequencer,
1895 boost::intrusive::member_hook<
1896 OpSequencer,
1897 boost::intrusive::list_member_hook<>,
1898 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1899
1900 struct KVSyncThread : public Thread {
1901 BlueStore *store;
1902 explicit KVSyncThread(BlueStore *s) : store(s) {}
1903 void *entry() override {
1904 store->_kv_sync_thread();
1905 return NULL;
1906 }
1907 };
1908 struct KVFinalizeThread : public Thread {
1909 BlueStore *store;
1910 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1911 void *entry() override {
1912 store->_kv_finalize_thread();
1913 return NULL;
1914 }
1915 };
1916
1917 struct DBHistogram {
1918 struct value_dist {
1919 uint64_t count;
1920 uint32_t max_len;
1921 };
1922
1923 struct key_dist {
1924 uint64_t count;
1925 uint32_t max_len;
1926 map<int, struct value_dist> val_map; ///< slab id to count and max value length
1927 };
1928
1929 map<string, map<int, struct key_dist> > key_hist;
1930 map<int, uint64_t> value_hist;
1931 int get_key_slab(size_t sz);
1932 string get_key_slab_to_range(int slab);
1933 int get_value_slab(size_t sz);
1934 string get_value_slab_to_range(int slab);
1935 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1936 const string &prefix, size_t key_size, size_t value_size);
1937 void dump(Formatter *f);
1938 };
1939
1940 // --------------------------------------------------------
1941 // members
1942 private:
1943 BlueFS *bluefs = nullptr;
1944 bluefs_layout_t bluefs_layout;
1945 mono_time bluefs_last_balance;
1946 utime_t next_dump_on_bluefs_alloc_failure;
1947
1948 KeyValueDB *db = nullptr;
1949 BlockDevice *bdev = nullptr;
1950 std::string freelist_type;
1951 FreelistManager *fm = nullptr;
1952 Allocator *alloc = nullptr;
1953 uuid_d fsid;
1954 int path_fd = -1; ///< open handle to $path
1955 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1956 bool mounted = false;
1957
1958 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
1959 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1960 bool collections_had_errors = false;
1961 map<coll_t,CollectionRef> new_coll_map;
1962
1963 vector<OnodeCacheShard*> onode_cache_shards;
1964 vector<BufferCacheShard*> buffer_cache_shards;
1965
1966 /// protect zombie_osr_set
1967 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
1968 uint32_t next_sequencer_id = 0;
1969 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
1970
1971 std::atomic<uint64_t> nid_last = {0};
1972 std::atomic<uint64_t> nid_max = {0};
1973 std::atomic<uint64_t> blobid_last = {0};
1974 std::atomic<uint64_t> blobid_max = {0};
1975
1976 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1977 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1978
1979 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
1980 std::atomic<uint64_t> deferred_seq = {0};
1981 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1982 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1983 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1984 Finisher finisher;
1985 utime_t deferred_last_submitted = utime_t();
1986
1987 KVSyncThread kv_sync_thread;
1988 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
1989 ceph::condition_variable kv_cond;
1990 bool _kv_only = false;
1991 bool kv_sync_started = false;
1992 bool kv_stop = false;
1993 bool kv_finalize_started = false;
1994 bool kv_finalize_stop = false;
1995 deque<TransContext*> kv_queue; ///< ready, already submitted
1996 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1997 deque<TransContext*> kv_committing; ///< currently syncing
1998 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1999 bool kv_sync_in_progress = false;
2000
2001 KVFinalizeThread kv_finalize_thread;
2002 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
2003 ceph::condition_variable kv_finalize_cond;
2004 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
2005 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
2006 bool kv_finalize_in_progress = false;
2007
2008 PerfCounters *logger = nullptr;
2009
2010 list<CollectionRef> removed_collections;
2011
2012 ceph::shared_mutex debug_read_error_lock =
2013 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
2014 set<ghobject_t> debug_data_error_objects;
2015 set<ghobject_t> debug_mdata_error_objects;
2016
2017 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
2018
2019 uint64_t block_size = 0; ///< block size of block device (power of 2)
2020 uint64_t block_mask = 0; ///< mask to get just the block offset
2021 size_t block_size_order = 0; ///< bits to shift to get block size
2022
2023 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
2024 ///< bits for min_alloc_size
2025 uint8_t min_alloc_size_order = 0;
2026 static_assert(std::numeric_limits<uint8_t>::max() >
2027 std::numeric_limits<decltype(min_alloc_size)>::digits,
2028 "not enough bits for min_alloc_size");
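  // (the static_assert above simply documents that a uint8_t is wide enough to
  // hold any bit position of a 64-bit min_alloc_size, i.e. values up to 63)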
2029
2030 bool per_pool_omap = false;
2031
2032 ///< maximum allocation unit (power of 2)
2033 std::atomic<uint64_t> max_alloc_size = {0};
2034
2035 ///< op count threshold for forced deferred writes
2036 std::atomic<int> deferred_batch_ops = {0};
2037
2038 ///< size threshold for forced deferred writes
2039 std::atomic<uint64_t> prefer_deferred_size = {0};
2040
2041 ///< approx cost per io, in bytes
2042 std::atomic<uint64_t> throttle_cost_per_io = {0};
2043
2044 std::atomic<Compressor::CompressionMode> comp_mode =
2045 {Compressor::COMP_NONE}; ///< compression mode
2046 CompressorRef compressor;
2047 std::atomic<uint64_t> comp_min_blob_size = {0};
2048 std::atomic<uint64_t> comp_max_blob_size = {0};
2049
2050 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2051
2052 uint64_t kv_ios = 0;
2053 uint64_t kv_throttle_costs = 0;
2054
2055 // cache trim control
2056 uint64_t cache_size = 0; ///< total cache size
2057 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2058 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2059 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2060 bool cache_autotune = false; ///< cache autotune setting
2061 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2062 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2063 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2064 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2065 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2066 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2067 double max_defer_interval = 0; ///< Time to wait between last deferred submit
2068 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2069
2070 typedef map<uint64_t, volatile_statfs> osd_pools_map;
2071
2072 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2073 volatile_statfs vstatfs;
2074 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2075
2076 bool per_pool_stat_collection = true;
2077
2078 struct MempoolThread : public Thread {
2079 public:
2080 BlueStore *store;
2081
2082 ceph::condition_variable cond;
2083 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2084 bool stop = false;
2085 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2086 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2087
2088 struct MempoolCache : public PriorityCache::PriCache {
2089 BlueStore *store;
2090 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2091 int64_t committed_bytes = 0;
2092 double cache_ratio = 0;
2093
2094 MempoolCache(BlueStore *s) : store(s) {};
2095
2096 virtual uint64_t _get_used_bytes() const = 0;
2097
2098 virtual int64_t request_cache_bytes(
2099 PriorityCache::Priority pri, uint64_t total_cache) const {
2100 int64_t assigned = get_cache_bytes(pri);
2101
2102 switch (pri) {
2103 // All cache items are currently shoved into the PRI1 priority
2104 case PriorityCache::Priority::PRI1:
2105 {
2106 int64_t request = _get_used_bytes();
2107 return (request > assigned) ? request - assigned : 0;
2108 }
2109 default:
2110 break;
2111 }
2112 return -EOPNOTSUPP;
2113 }
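      // A hedged worked example (hypothetical numbers): if this cache
      // currently uses 3 GiB (_get_used_bytes()) and has been assigned
      // 2 GiB at PRI1 (get_cache_bytes(PRI1)), request_cache_bytes()
      // returns the 1 GiB shortfall; any other priority yields -EOPNOTSUPP.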
2114
2115 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2116 return cache_bytes[pri];
2117 }
2118 virtual int64_t get_cache_bytes() const {
2119 int64_t total = 0;
2120
2121 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2122 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2123 total += get_cache_bytes(pri);
2124 }
2125 return total;
2126 }
2127 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2128 cache_bytes[pri] = bytes;
2129 }
2130 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2131 cache_bytes[pri] += bytes;
2132 }
2133 virtual int64_t commit_cache_size(uint64_t total_cache) {
2134 committed_bytes = PriorityCache::get_chunk(
2135 get_cache_bytes(), total_cache);
2136 return committed_bytes;
2137 }
2138 virtual int64_t get_committed_size() const {
2139 return committed_bytes;
2140 }
2141 virtual double get_cache_ratio() const {
2142 return cache_ratio;
2143 }
2144 virtual void set_cache_ratio(double ratio) {
2145 cache_ratio = ratio;
2146 }
2147 virtual string get_cache_name() const = 0;
2148 };
2149
2150 struct MetaCache : public MempoolCache {
2151 MetaCache(BlueStore *s) : MempoolCache(s) {};
2152
2153 virtual uint64_t _get_used_bytes() const {
2154 return mempool::bluestore_cache_other::allocated_bytes() +
2155 mempool::bluestore_cache_onode::allocated_bytes();
2156 }
2157
2158 virtual string get_cache_name() const {
2159 return "BlueStore Meta Cache";
2160 }
2161
2162 uint64_t _get_num_onodes() const {
2163 uint64_t onode_num =
2164 mempool::bluestore_cache_onode::allocated_items();
2165 return (2 > onode_num) ? 2 : onode_num;
2166 }
2167
2168 double get_bytes_per_onode() const {
2169 return (double)_get_used_bytes() / (double)_get_num_onodes();
2170 }
2171 };
2172 std::shared_ptr<MetaCache> meta_cache;
2173
2174 struct DataCache : public MempoolCache {
2175 DataCache(BlueStore *s) : MempoolCache(s) {};
2176
2177 virtual uint64_t _get_used_bytes() const {
2178 uint64_t bytes = 0;
2179 for (auto i : store->buffer_cache_shards) {
2180 bytes += i->_get_bytes();
2181 }
2182 return bytes;
2183 }
2184 virtual string get_cache_name() const {
2185 return "BlueStore Data Cache";
2186 }
2187 };
2188 std::shared_ptr<DataCache> data_cache;
2189
2190 public:
2191 explicit MempoolThread(BlueStore *s)
2192 : store(s),
2193 meta_cache(new MetaCache(s)),
2194 data_cache(new DataCache(s)) {}
2195
2196 void *entry() override;
2197 void init() {
2198 ceph_assert(stop == false);
2199 create("bstore_mempool");
2200 }
2201 void shutdown() {
2202 lock.lock();
2203 stop = true;
2204 cond.notify_all();
2205 lock.unlock();
2206 join();
2207 }
2208
2209 private:
2210 void _adjust_cache_settings();
2211 void _update_cache_settings();
2212 void _resize_shards(bool interval_stats);
2213 } mempool_thread;
2214
2215 // --------------------------------------------------------
2216 // private methods
2217
2218 void _init_logger();
2219 void _shutdown_logger();
2220 int _reload_logger();
2221
2222 int _open_path();
2223 void _close_path();
2224 int _open_fsid(bool create);
2225 int _lock_fsid();
2226 int _read_fsid(uuid_d *f);
2227 int _write_fsid();
2228 void _close_fsid();
2229 void _set_alloc_sizes();
2230 void _set_blob_size();
2231 void _set_finisher_num();
2232 void _set_per_pool_omap();
2233 void _update_osd_memory_options();
2234
2235 int _open_bdev(bool create);
2236 // Verifies that disk space is sufficient for the reserved area plus the
2237 // minimal BlueFS allotment, and alters the latter if needed.
2238 // Depends on min_alloc_size, hence it should be called after
2239 // its initialization (and outside of _open_bdev).
2240 void _validate_bdev();
2241 void _close_bdev();
2242
2243 int _minimal_open_bluefs(bool create);
2244 void _minimal_close_bluefs();
2245 int _open_bluefs(bool create);
2246 void _close_bluefs(bool cold_close);
2247
2248 // Limited (u)mount intended for BlueFS operations only
2249 int _mount_for_bluefs();
2250 void _umount_for_bluefs();
2251
2252
2253 int _is_bluefs(bool create, bool* ret);
2254 /*
2255 * opens the DB and its dependent components (super_meta, FreelistManager
2256 * and allocator) in the proper order
2257 */
2258 int _open_db_and_around(bool read_only);
2259 void _close_db_and_around(bool read_only);
2260
2261 // updates legacy bluefs-related records in the DB to a state valid for
2262 // downgrades from nautilus.
2263 void _sync_bluefs_and_fm();
2264
2265 /*
2266 * @warning to_repair_db means that we open this db in order to repair it
2267 * and will not hold rocksdb's file lock.
2268 */
2269 int _open_db(bool create,
2270 bool to_repair_db=false,
2271 bool read_only = false);
2272 void _close_db(bool read_only);
2273 int _open_fm(KeyValueDB::Transaction t, bool read_only);
2274 void _close_fm();
2275 int _write_out_fm_meta(uint64_t target_size,
2276 bool update_root_size = false,
2277 bluestore_bdev_label_t* res_label = nullptr);
2278 int _open_alloc();
2279 void _close_alloc();
2280 int _open_collections();
2281 void _fsck_collections(int64_t* errors);
2282 void _close_collections();
2283
2284 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
2285 bool create);
2286
2287 public:
2288 utime_t get_deferred_last_submitted() {
2289 std::lock_guard l(deferred_lock);
2290 return deferred_last_submitted;
2291 }
2292
2293 static int _write_bdev_label(CephContext* cct,
2294 string path, bluestore_bdev_label_t label);
2295 static int _read_bdev_label(CephContext* cct, string path,
2296 bluestore_bdev_label_t *label);
2297 private:
2298 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2299 bool create);
2300
2301 int _open_super_meta();
2302
2303 void _open_statfs();
2304 void _get_statfs_overall(struct store_statfs_t *buf);
2305
2306 void _dump_alloc_on_failure();
2307
2308 int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
2309 int _balance_bluefs_freespace();
2310
2311 CollectionRef _get_collection(const coll_t& cid);
2312 void _queue_reap_collection(CollectionRef& c);
2313 void _reap_collections();
2314 void _update_cache_logger();
2315
2316 void _assign_nid(TransContext *txc, OnodeRef o);
2317 uint64_t _assign_blobid(TransContext *txc);
2318
2319 template <int LogLevelV>
2320 friend void _dump_onode(CephContext *cct, const Onode& o);
2321 template <int LogLevelV>
2322 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2323 template <int LogLevelV>
2324 friend void _dump_transaction(CephContext *cct, Transaction *t);
2325
2326 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2327 list<Context*> *on_commits);
2328 void _txc_update_store_statfs(TransContext *txc);
2329 void _txc_add_transaction(TransContext *txc, Transaction *t);
2330 void _txc_calc_cost(TransContext *txc);
2331 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2332 void _txc_state_proc(TransContext *txc);
2333 void _txc_aio_submit(TransContext *txc);
2334 public:
2335 void txc_aio_finish(void *p) {
2336 _txc_state_proc(static_cast<TransContext*>(p));
2337 }
2338 private:
2339 void _txc_finish_io(TransContext *txc);
2340 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2341 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2342 void _txc_committed_kv(TransContext *txc);
2343 void _txc_finish(TransContext *txc);
2344 void _txc_release_alloc(TransContext *txc);
2345
2346 void _osr_attach(Collection *c);
2347 void _osr_register_zombie(OpSequencer *osr);
2348 void _osr_drain(OpSequencer *osr);
2349 void _osr_drain_preceding(TransContext *txc);
2350 void _osr_drain_all();
2351
2352 void _kv_start();
2353 void _kv_stop();
2354 void _kv_sync_thread();
2355 void _kv_finalize_thread();
2356
2357 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
2358 void _deferred_queue(TransContext *txc);
2359 public:
2360 void deferred_try_submit();
2361 private:
2362 void _deferred_submit_unlock(OpSequencer *osr);
2363 void _deferred_aio_finish(OpSequencer *osr);
2364 int _deferred_replay();
2365
2366 public:
2367 using mempool_dynamic_bitset =
2368 boost::dynamic_bitset<uint64_t,
2369 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2370 using per_pool_statfs =
2371 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2372
2373 enum FSCKDepth {
2374 FSCK_REGULAR,
2375 FSCK_DEEP,
2376 FSCK_SHALLOW
2377 };
2378 enum {
2379 MAX_FSCK_ERROR_LINES = 100,
2380 };
2381
2382 private:
2383 int _fsck_check_extents(
2384 const coll_t& cid,
2385 const ghobject_t& oid,
2386 const PExtentVector& extents,
2387 bool compressed,
2388 mempool_dynamic_bitset &used_blocks,
2389 uint64_t granularity,
2390 BlueStoreRepairer* repairer,
2391 store_statfs_t& expected_statfs,
2392 FSCKDepth depth);
2393
2394 void _fsck_check_pool_statfs(
2395 per_pool_statfs& expected_pool_statfs,
2396 int64_t& errors,
2397 int64_t &warnings,
2398 BlueStoreRepairer* repairer);
2399
2400 int _fsck(FSCKDepth depth, bool repair);
2401 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2402
2403 void _buffer_cache_write(
2404 TransContext *txc,
2405 BlobRef b,
2406 uint64_t offset,
2407 bufferlist& bl,
2408 unsigned flags) {
2409 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2410 flags);
2411 txc->shared_blobs_written.insert(b->shared_blob);
2412 }
2413
2414 int _collection_list(
2415 Collection *c, const ghobject_t& start, const ghobject_t& end,
2416 int max, vector<ghobject_t> *ls, ghobject_t *next);
2417
2418 template <typename T, typename F>
2419 T select_option(const std::string& opt_name, T val1, F f) {
2420 //NB: opt_name reserved for future use
2421 boost::optional<T> val2 = f();
2422 if (val2) {
2423 return *val2;
2424 }
2425 return val1;
2426 }
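  // A minimal usage sketch (hedged; the values and the override condition are
  // illustrative only, not taken from a real call site): the helper returns
  // the lambda's value when it yields one, otherwise falls back to val1.
  //
  //   int64_t csum = select_option(
  //     "csum_type",                        // reserved for future use
  //     (int64_t)Checksummer::CSUM_CRC32C,  // default when no override
  //     [&]() -> boost::optional<int64_t> {
  //       bool has_pool_override = false;   // hypothetical condition
  //       if (has_pool_override)
  //         return (int64_t)Checksummer::CSUM_XXHASH32;
  //       return boost::optional<int64_t>();
  //     });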
2427
2428 void _apply_padding(uint64_t head_pad,
2429 uint64_t tail_pad,
2430 bufferlist& padded);
2431
2432 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2433
2434 // -- ondisk version ---
2435 public:
2436 const int32_t latest_ondisk_format = 4; ///< our version
2437 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2438 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2439
2440 private:
2441 int32_t ondisk_format = 0; ///< value detected on mount
2442
2443 int _upgrade_super(); ///< upgrade (called during open_super)
2444 uint64_t _get_ondisk_reserved() const;
2445 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2446
2447 // --- public interface ---
2448 public:
2449 BlueStore(CephContext *cct, const string& path);
2450 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2451 ~BlueStore() override;
2452
2453 string get_type() override {
2454 return "bluestore";
2455 }
2456
2457 bool needs_journal() override { return false; };
2458 bool wants_journal() override { return false; };
2459 bool allows_journal() override { return false; };
2460
2461 uint64_t get_min_alloc_size() const override {
2462 return min_alloc_size;
2463 }
2464
2465 int get_devices(set<string> *ls) override;
2466
2467 bool is_rotational() override;
2468 bool is_journal_rotational() override;
2469
2470 string get_default_device_class() override {
2471 string device_class;
2472 map<string, string> metadata;
2473 collect_metadata(&metadata);
2474 auto it = metadata.find("bluestore_bdev_type");
2475 if (it != metadata.end()) {
2476 device_class = it->second;
2477 }
2478 return device_class;
2479 }
2480
2481 int get_numa_node(
2482 int *numa_node,
2483 set<int> *nodes,
2484 set<string> *failed) override;
2485
2486 static int get_block_device_fsid(CephContext* cct, const string& path,
2487 uuid_d *fsid);
2488
2489 bool test_mount_in_use() override;
2490
2491 private:
2492 int _mount(bool kv_only, bool open_db=true);
2493 public:
2494 int mount() override {
2495 return _mount(false);
2496 }
2497 int umount() override;
2498
2499 int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
2500 int r = _mount(true, open_db);
2501 if (r < 0)
2502 return r;
2503 *pdb = db;
2504 return 0;
2505 }
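  // A hedged usage sketch for offline tooling (the cleanup path via umount()
  // is an assumption, not something this header prescribes):
  //
  //   KeyValueDB *pdb = nullptr;
  //   int r = store.start_kv_only(&pdb);
  //   if (r == 0) {
  //     // inspect or mutate pdb here ...
  //     store.umount();
  //   }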
2506
2507 int write_meta(const std::string& key, const std::string& value) override;
2508 int read_meta(const std::string& key, std::string *value) override;
2509
2510 int cold_open();
2511 int cold_close();
2512
2513 int fsck(bool deep) override {
2514 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2515 }
2516 int repair(bool deep) override {
2517 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2518 }
2519 int quick_fix() override {
2520 return _fsck(FSCK_SHALLOW, true);
2521 }
2522
2523 void set_cache_shards(unsigned num) override;
2524 void dump_cache_stats(Formatter *f) override {
2525 int onode_count = 0, buffers_bytes = 0;
2526 for (auto i: onode_cache_shards) {
2527 onode_count += i->_get_num();
2528 }
2529 for (auto i: buffer_cache_shards) {
2530 buffers_bytes += i->_get_bytes();
2531 }
2532 f->dump_int("bluestore_onode", onode_count);
2533 f->dump_int("bluestore_buffers", buffers_bytes);
2534 }
2535 void dump_cache_stats(ostream& ss) override {
2536 int onode_count = 0, buffers_bytes = 0;
2537 for (auto i: onode_cache_shards) {
2538 onode_count += i->_get_num();
2539 }
2540 for (auto i: buffer_cache_shards) {
2541 buffers_bytes += i->_get_bytes();
2542 }
2543 ss << "bluestore_onode: " << onode_count;
2544 ss << "bluestore_buffers: " << buffers_bytes;
2545 }
2546
2547 int validate_hobject_key(const hobject_t &obj) const override {
2548 return 0;
2549 }
2550 unsigned get_max_attr_name_length() override {
2551 return 256; // arbitrary; there is no real limit internally
2552 }
2553
2554 int mkfs() override;
2555 int mkjournal() override {
2556 return 0;
2557 }
2558
2559 void get_db_statistics(Formatter *f) override;
2560 void generate_db_histogram(Formatter *f) override;
2561 void _flush_cache();
2562 int flush_cache(ostream *os = NULL) override;
2563 void dump_perf_counters(Formatter *f) override {
2564 f->open_object_section("perf_counters");
2565 logger->dump_formatted(f, false);
2566 f->close_section();
2567 }
2568
2569 int add_new_bluefs_device(int id, const string& path);
2570 int migrate_to_existing_bluefs_device(const set<int>& devs_source,
2571 int id);
2572 int migrate_to_new_bluefs_device(const set<int>& devs_source,
2573 int id,
2574 const string& path);
2575 int expand_devices(ostream& out);
2576 string get_device_path(unsigned id);
2577
2578 int dump_bluefs_sizes(ostream& out);
2579
2580 public:
2581 int statfs(struct store_statfs_t *buf,
2582 osd_alert_list_t* alerts = nullptr) override;
2583 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2584 bool *per_pool_omap) override;
2585
2586 void collect_metadata(map<string,string> *pm) override;
2587
2588 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2589 int set_collection_opts(
2590 CollectionHandle& c,
2591 const pool_opts_t& opts) override;
2592 int stat(
2593 CollectionHandle &c,
2594 const ghobject_t& oid,
2595 struct stat *st,
2596 bool allow_eio = false) override;
2597 int read(
2598 CollectionHandle &c,
2599 const ghobject_t& oid,
2600 uint64_t offset,
2601 size_t len,
2602 bufferlist& bl,
2603 uint32_t op_flags = 0) override;
2604
2605 private:
2606
2607 // --------------------------------------------------------
2608 // intermediate data structures used while reading
2609 struct region_t {
2610 uint64_t logical_offset;
2611 uint64_t blob_xoffset; //region offset within the blob
2612 uint64_t length;
2613
2614 // used later in read process
2615 uint64_t front = 0;
2616
2617 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2618 : logical_offset(offset),
2619 blob_xoffset(b_offs),
2620 length(len),
2621 front(front){}
2622 region_t(const region_t& from)
2623 : logical_offset(from.logical_offset),
2624 blob_xoffset(from.blob_xoffset),
2625 length(from.length),
2626 front(from.front){}
2627
2628 friend ostream& operator<<(ostream& out, const region_t& r) {
2629 return out << "0x" << std::hex << r.logical_offset << ":"
2630 << r.blob_xoffset << "~" << r.length << std::dec;
2631 }
2632 };
2633
2634 // merged blob read request
2635 struct read_req_t {
2636 uint64_t r_off = 0;
2637 uint64_t r_len = 0;
2638 bufferlist bl;
2639 std::list<region_t> regs; // original read regions
2640
2641 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2642
2643 friend ostream& operator<<(ostream& out, const read_req_t& r) {
2644 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2645 for (const auto& reg : r.regs)
2646 out << reg;
2647 return out << "]}" << std::dec;
2648 }
2649 };
2650
2651 typedef list<read_req_t> regions2read_t;
2652 typedef map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
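  // A hedged illustration (blob refs and offsets are made up): a logical read
  // spanning two blobs could be split into region_t entries, merged per blob
  // into read_req_t items, and grouped as
  //
  //   blobs2read_t b2r;
  //   b2r[blobA].emplace_back(0x0, 0x4000);    // read_req_t{r_off, r_len}
  //   b2r[blobB].emplace_back(0x1000, 0x2000);
  //
  // _read_cache() below fills structures like these, and _prepare_read_ioc()
  // then prepares the device reads for them.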
2653
2654 void _read_cache(
2655 OnodeRef o,
2656 uint64_t offset,
2657 size_t length,
2658 int read_cache_policy,
2659 ready_regions_t& ready_regions,
2660 blobs2read_t& blobs2read);
2661
2662
2663 int _prepare_read_ioc(
2664 blobs2read_t& blobs2read,
2665 vector<bufferlist>* compressed_blob_bls,
2666 IOContext* ioc);
2667
2668 int _generate_read_result_bl(
2669 OnodeRef o,
2670 uint64_t offset,
2671 size_t length,
2672 ready_regions_t& ready_regions,
2673 vector<bufferlist>& compressed_blob_bls,
2674 blobs2read_t& blobs2read,
2675 bool buffered,
2676 bool* csum_error,
2677 bufferlist& bl);
2678
2679 int _do_read(
2680 Collection *c,
2681 OnodeRef o,
2682 uint64_t offset,
2683 size_t len,
2684 bufferlist& bl,
2685 uint32_t op_flags = 0,
2686 uint64_t retry_count = 0);
2687
2688 int _do_readv(
2689 Collection *c,
2690 OnodeRef o,
2691 const interval_set<uint64_t>& m,
2692 bufferlist& bl,
2693 uint32_t op_flags = 0,
2694 uint64_t retry_count = 0);
2695
2696 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2697 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2698 public:
2699 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2700 uint64_t offset, size_t len, bufferlist& bl) override;
2701 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2702 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2703
2704 int readv(
2705 CollectionHandle &c_,
2706 const ghobject_t& oid,
2707 interval_set<uint64_t>& m,
2708 bufferlist& bl,
2709 uint32_t op_flags) override;
2710
2711 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2712 const string& section_name, Formatter *f) override;
2713
2714 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2715 bufferptr& value) override;
2716
2717 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2718 map<string,bufferptr>& aset) override;
2719
2720 int list_collections(vector<coll_t>& ls) override;
2721
2722 CollectionHandle open_collection(const coll_t &c) override;
2723 CollectionHandle create_new_collection(const coll_t& cid) override;
2724 void set_collection_commit_queue(const coll_t& cid,
2725 ContextQueue *commit_queue) override;
2726
2727 bool collection_exists(const coll_t& c) override;
2728 int collection_empty(CollectionHandle& c, bool *empty) override;
2729 int collection_bits(CollectionHandle& c) override;
2730
2731 int collection_list(CollectionHandle &c,
2732 const ghobject_t& start,
2733 const ghobject_t& end,
2734 int max,
2735 vector<ghobject_t> *ls, ghobject_t *next) override;
2736
2737 int omap_get(
2738 CollectionHandle &c, ///< [in] Collection containing oid
2739 const ghobject_t &oid, ///< [in] Object containing omap
2740 bufferlist *header, ///< [out] omap header
2741 map<string, bufferlist> *out ///< [out] Key to value map
2742 ) override;
2743 int _omap_get(
2744 Collection *c, ///< [in] Collection containing oid
2745 const ghobject_t &oid, ///< [in] Object containing omap
2746 bufferlist *header, ///< [out] omap header
2747 map<string, bufferlist> *out ///< [out] Key to value map
2748 );
2749 int _onode_omap_get(
2750 const OnodeRef &o, ///< [in] Object containing omap
2751 bufferlist *header, ///< [out] omap header
2752 map<string, bufferlist> *out ///< [out] Key to value map
2753 );
2754
2755
2756 /// Get omap header
2757 int omap_get_header(
2758 CollectionHandle &c, ///< [in] Collection containing oid
2759 const ghobject_t &oid, ///< [in] Object containing omap
2760 bufferlist *header, ///< [out] omap header
2761 bool allow_eio = false ///< [in] don't assert on eio
2762 ) override;
2763
2764 /// Get keys defined on oid
2765 int omap_get_keys(
2766 CollectionHandle &c, ///< [in] Collection containing oid
2767 const ghobject_t &oid, ///< [in] Object containing omap
2768 set<string> *keys ///< [out] Keys defined on oid
2769 ) override;
2770
2771 /// Get key values
2772 int omap_get_values(
2773 CollectionHandle &c, ///< [in] Collection containing oid
2774 const ghobject_t &oid, ///< [in] Object containing omap
2775 const set<string> &keys, ///< [in] Keys to get
2776 map<string, bufferlist> *out ///< [out] Returned keys and values
2777 ) override;
2778
2779 #ifdef WITH_SEASTAR
2780 int omap_get_values(
2781 CollectionHandle &c, ///< [in] Collection containing oid
2782 const ghobject_t &oid, ///< [in] Object containing omap
2783 const std::optional<string> &start_after, ///< [in] Keys to get
2784 map<string, bufferlist> *out ///< [out] Returned keys and values
2785 ) override;
2786 #endif
2787
2788 /// Filters keys into out which are defined on oid
2789 int omap_check_keys(
2790 CollectionHandle &c, ///< [in] Collection containing oid
2791 const ghobject_t &oid, ///< [in] Object containing omap
2792 const set<string> &keys, ///< [in] Keys to check
2793 set<string> *out ///< [out] Subset of keys defined on oid
2794 ) override;
2795
2796 ObjectMap::ObjectMapIterator get_omap_iterator(
2797 CollectionHandle &c, ///< [in] collection
2798 const ghobject_t &oid ///< [in] object
2799 ) override;
2800
2801 void set_fsid(uuid_d u) override {
2802 fsid = u;
2803 }
2804 uuid_d get_fsid() override {
2805 return fsid;
2806 }
2807
2808 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2809 return num_objects * 300; //assuming per-object overhead is 300 bytes
2810 }
2811
2812 struct BSPerfTracker {
2813 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2814 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2815
2816 objectstore_perf_stat_t get_cur_stats() const {
2817 objectstore_perf_stat_t ret;
2818 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2819 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2820 return ret;
2821 }
2822
2823 void update_from_perfcounters(PerfCounters &logger);
2824 } perf_tracker;
2825
2826 objectstore_perf_stat_t get_cur_stats() override {
2827 perf_tracker.update_from_perfcounters(*logger);
2828 return perf_tracker.get_cur_stats();
2829 }
2830 const PerfCounters* get_perf_counters() const override {
2831 return logger;
2832 }
2833 const PerfCounters* get_bluefs_perf_counters() const {
2834 return bluefs->get_perf_counters();
2835 }
2836
2837 int queue_transactions(
2838 CollectionHandle& ch,
2839 vector<Transaction>& tls,
2840 TrackedOpRef op = TrackedOpRef(),
2841 ThreadPool::TPHandle *handle = NULL) override;
2842
2843 // error injection
2844 void inject_data_error(const ghobject_t& o) override {
2845 std::unique_lock l(debug_read_error_lock);
2846 debug_data_error_objects.insert(o);
2847 }
2848 void inject_mdata_error(const ghobject_t& o) override {
2849 std::unique_lock l(debug_read_error_lock);
2850 debug_mdata_error_objects.insert(o);
2851 }
2852
2853 /// methods to inject various errors fsck can repair
2854 void inject_broken_shared_blob_key(const string& key,
2855 const bufferlist& bl);
2856 void inject_leaked(uint64_t len);
2857 void inject_false_free(coll_t cid, ghobject_t oid);
2858 void inject_statfs(const string& key, const store_statfs_t& new_statfs);
2859 void inject_global_statfs(const store_statfs_t& new_statfs);
2860 void inject_misreference(coll_t cid1, ghobject_t oid1,
2861 coll_t cid2, ghobject_t oid2,
2862 uint64_t offset);
2863 // resets global per_pool_omap in DB
2864 void inject_legacy_omap();
2865 // resets per_pool_omap | pgmeta_omap for onode
2866 void inject_legacy_omap(coll_t cid, ghobject_t oid);
2867
2868 void compact() override {
2869 ceph_assert(db);
2870 db->compact();
2871 }
2872 bool has_builtin_csum() const override {
2873 return true;
2874 }
2875
2876 /*
2877 Allocate space for BlueFS from the slow device.
2878 Either automatically applies the allocated extents to the underlying BlueFS
2879 (when extents == nullptr) or just returns them via the provided non-null extents vector.
2880 */
2881 int allocate_bluefs_freespace(
2882 uint64_t min_size,
2883 uint64_t size,
2884 PExtentVector* extents);
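  // A minimal usage sketch (hedged; the min_size/size values are hypothetical):
  //
  //   // 1) let BlueStore hand the space straight to BlueFS:
  //   int r = allocate_bluefs_freespace(1ull << 20, 1ull << 30, nullptr);
  //
  //   // 2) or collect the extents and hand them over yourself:
  //   PExtentVector extents;
  //   r = allocate_bluefs_freespace(1ull << 20, 1ull << 30, &extents);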
2885
2886 inline void log_latency(const char* name,
2887 int idx,
2888 const ceph::timespan& lat,
2889 double lat_threshold,
2890 const char* info = "") const;
2891
2892 inline void log_latency_fn(const char* name,
2893 int idx,
2894 const ceph::timespan& lat,
2895 double lat_threshold,
2896 std::function<string (const ceph::timespan& lat)> fn) const;
2897
2898 private:
2899 bool _debug_data_eio(const ghobject_t& o) {
2900 if (!cct->_conf->bluestore_debug_inject_read_err) {
2901 return false;
2902 }
2903 std::shared_lock l(debug_read_error_lock);
2904 return debug_data_error_objects.count(o);
2905 }
2906 bool _debug_mdata_eio(const ghobject_t& o) {
2907 if (!cct->_conf->bluestore_debug_inject_read_err) {
2908 return false;
2909 }
2910 std::shared_lock l(debug_read_error_lock);
2911 return debug_mdata_error_objects.count(o);
2912 }
2913 void _debug_obj_on_delete(const ghobject_t& o) {
2914 if (cct->_conf->bluestore_debug_inject_read_err) {
2915 std::unique_lock l(debug_read_error_lock);
2916 debug_data_error_objects.erase(o);
2917 debug_mdata_error_objects.erase(o);
2918 }
2919 }
2920 private:
2921 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
2922 string failed_cmode;
2923 set<string> failed_compressors;
2924 string spillover_alert;
2925 string legacy_statfs_alert;
2926 string no_per_pool_omap_alert;
2927 string disk_size_mismatch_alert;
2928
2929 void _log_alerts(osd_alert_list_t& alerts);
2930 bool _set_compression_alert(bool cmode, const char* s) {
2931 std::lock_guard l(qlock);
2932 if (cmode) {
2933 bool ret = failed_cmode.empty();
2934 failed_cmode = s;
2935 return ret;
2936 }
2937 return failed_compressors.emplace(s).second;
2938 }
2939 void _clear_compression_alert() {
2940 std::lock_guard l(qlock);
2941 failed_compressors.clear();
2942 failed_cmode.clear();
2943 }
2944
2945 void _set_spillover_alert(const string& s) {
2946 std::lock_guard l(qlock);
2947 spillover_alert = s;
2948 }
2949 void _clear_spillover_alert() {
2950 std::lock_guard l(qlock);
2951 spillover_alert.clear();
2952 }
2953
2954 void _check_legacy_statfs_alert();
2955 void _check_no_per_pool_omap_alert();
2956 void _set_disk_size_mismatch_alert(const string& s) {
2957 std::lock_guard l(qlock);
2958 disk_size_mismatch_alert = s;
2959 }
2960
2961 private:
2962
2963 // --------------------------------------------------------
2964 // read processing internal methods
2965 int _verify_csum(
2966 OnodeRef& o,
2967 const bluestore_blob_t* blob,
2968 uint64_t blob_xoffset,
2969 const bufferlist& bl,
2970 uint64_t logical_offset) const;
2971 int _decompress(bufferlist& source, bufferlist* result);
2972
2973
2974 // --------------------------------------------------------
2975 // write ops
2976
2977 struct WriteContext {
2978 bool buffered = false; ///< buffered write
2979 bool compress = false; ///< compressed write
2980 uint64_t target_blob_size = 0; ///< target (max) blob size
2981 unsigned csum_order = 0; ///< target checksum chunk order
2982
2983 old_extent_map_t old_extents; ///< must deref these blobs
2984 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
2985
2986 struct write_item {
2987 uint64_t logical_offset; ///< write logical offset
2988 BlobRef b;
2989 uint64_t blob_length;
2990 uint64_t b_off;
2991 bufferlist bl;
2992 uint64_t b_off0; ///< original offset in a blob prior to padding
2993 uint64_t length0; ///< original data length prior to padding
2994
2995 bool mark_unused;
2996 bool new_blob; ///< whether new blob was created
2997
2998 bool compressed = false;
2999 bufferlist compressed_bl;
3000 size_t compressed_len = 0;
3001
3002 write_item(
3003 uint64_t logical_offs,
3004 BlobRef b,
3005 uint64_t blob_len,
3006 uint64_t o,
3007 bufferlist& bl,
3008 uint64_t o0,
3009 uint64_t l0,
3010 bool _mark_unused,
3011 bool _new_blob)
3012 :
3013 logical_offset(logical_offs),
3014 b(b),
3015 blob_length(blob_len),
3016 b_off(o),
3017 bl(bl),
3018 b_off0(o0),
3019 length0(l0),
3020 mark_unused(_mark_unused),
3021 new_blob(_new_blob) {}
3022 };
3023 vector<write_item> writes; ///< blobs we're writing
3024
3025 /// partial clone of the context
3026 void fork(const WriteContext& other) {
3027 buffered = other.buffered;
3028 compress = other.compress;
3029 target_blob_size = other.target_blob_size;
3030 csum_order = other.csum_order;
3031 }
3032 void write(
3033 uint64_t loffs,
3034 BlobRef b,
3035 uint64_t blob_len,
3036 uint64_t o,
3037 bufferlist& bl,
3038 uint64_t o0,
3039 uint64_t len0,
3040 bool _mark_unused,
3041 bool _new_blob) {
3042 writes.emplace_back(loffs,
3043 b,
3044 blob_len,
3045 o,
3046 bl,
3047 o0,
3048 len0,
3049 _mark_unused,
3050 _new_blob);
3051 }
3052 /// Checks for writes to the same pextent within a blob
3053 bool has_conflict(
3054 BlobRef b,
3055 uint64_t loffs,
3056 uint64_t loffs_end,
3057 uint64_t min_alloc_size);
3058 };
3059
3060 void _do_write_small(
3061 TransContext *txc,
3062 CollectionRef &c,
3063 OnodeRef o,
3064 uint64_t offset, uint64_t length,
3065 bufferlist::iterator& blp,
3066 WriteContext *wctx);
3067 void _do_write_big(
3068 TransContext *txc,
3069 CollectionRef &c,
3070 OnodeRef o,
3071 uint64_t offset, uint64_t length,
3072 bufferlist::iterator& blp,
3073 WriteContext *wctx);
3074 int _do_alloc_write(
3075 TransContext *txc,
3076 CollectionRef c,
3077 OnodeRef o,
3078 WriteContext *wctx);
3079 void _wctx_finish(
3080 TransContext *txc,
3081 CollectionRef& c,
3082 OnodeRef o,
3083 WriteContext *wctx,
3084 set<SharedBlob*> *maybe_unshared_blobs=0);
3085
3086 int _write(TransContext *txc,
3087 CollectionRef& c,
3088 OnodeRef& o,
3089 uint64_t offset, size_t len,
3090 bufferlist& bl,
3091 uint32_t fadvise_flags);
3092 void _pad_zeros(bufferlist *bl, uint64_t *offset,
3093 uint64_t chunk_size);
3094
3095 void _choose_write_options(CollectionRef& c,
3096 OnodeRef o,
3097 uint32_t fadvise_flags,
3098 WriteContext *wctx);
3099
3100 int _do_gc(TransContext *txc,
3101 CollectionRef& c,
3102 OnodeRef o,
3103 const WriteContext& wctx,
3104 uint64_t *dirty_start,
3105 uint64_t *dirty_end);
3106
3107 int _do_write(TransContext *txc,
3108 CollectionRef &c,
3109 OnodeRef o,
3110 uint64_t offset, uint64_t length,
3111 bufferlist& bl,
3112 uint32_t fadvise_flags);
3113 void _do_write_data(TransContext *txc,
3114 CollectionRef& c,
3115 OnodeRef o,
3116 uint64_t offset,
3117 uint64_t length,
3118 bufferlist& bl,
3119 WriteContext *wctx);
3120
3121 int _touch(TransContext *txc,
3122 CollectionRef& c,
3123 OnodeRef& o);
3124 int _do_zero(TransContext *txc,
3125 CollectionRef& c,
3126 OnodeRef& o,
3127 uint64_t offset, size_t len);
3128 int _zero(TransContext *txc,
3129 CollectionRef& c,
3130 OnodeRef& o,
3131 uint64_t offset, size_t len);
3132 void _do_truncate(TransContext *txc,
3133 CollectionRef& c,
3134 OnodeRef o,
3135 uint64_t offset,
3136 set<SharedBlob*> *maybe_unshared_blobs=0);
3137 int _truncate(TransContext *txc,
3138 CollectionRef& c,
3139 OnodeRef& o,
3140 uint64_t offset);
3141 int _remove(TransContext *txc,
3142 CollectionRef& c,
3143 OnodeRef& o);
3144 int _do_remove(TransContext *txc,
3145 CollectionRef& c,
3146 OnodeRef o);
3147 int _setattr(TransContext *txc,
3148 CollectionRef& c,
3149 OnodeRef& o,
3150 const string& name,
3151 bufferptr& val);
3152 int _setattrs(TransContext *txc,
3153 CollectionRef& c,
3154 OnodeRef& o,
3155 const map<string,bufferptr>& aset);
3156 int _rmattr(TransContext *txc,
3157 CollectionRef& c,
3158 OnodeRef& o,
3159 const string& name);
3160 int _rmattrs(TransContext *txc,
3161 CollectionRef& c,
3162 OnodeRef& o);
3163 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3164 int _omap_clear(TransContext *txc,
3165 CollectionRef& c,
3166 OnodeRef& o);
3167 int _omap_setkeys(TransContext *txc,
3168 CollectionRef& c,
3169 OnodeRef& o,
3170 bufferlist& bl);
3171 int _omap_setheader(TransContext *txc,
3172 CollectionRef& c,
3173 OnodeRef& o,
3174 bufferlist& header);
3175 int _omap_rmkeys(TransContext *txc,
3176 CollectionRef& c,
3177 OnodeRef& o,
3178 bufferlist& bl);
3179 int _omap_rmkey_range(TransContext *txc,
3180 CollectionRef& c,
3181 OnodeRef& o,
3182 const string& first, const string& last);
3183 int _set_alloc_hint(
3184 TransContext *txc,
3185 CollectionRef& c,
3186 OnodeRef& o,
3187 uint64_t expected_object_size,
3188 uint64_t expected_write_size,
3189 uint32_t flags);
3190 int _do_clone_range(TransContext *txc,
3191 CollectionRef& c,
3192 OnodeRef& oldo,
3193 OnodeRef& newo,
3194 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3195 int _clone(TransContext *txc,
3196 CollectionRef& c,
3197 OnodeRef& oldo,
3198 OnodeRef& newo);
3199 int _clone_range(TransContext *txc,
3200 CollectionRef& c,
3201 OnodeRef& oldo,
3202 OnodeRef& newo,
3203 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3204 int _rename(TransContext *txc,
3205 CollectionRef& c,
3206 OnodeRef& oldo,
3207 OnodeRef& newo,
3208 const ghobject_t& new_oid);
3209 int _create_collection(TransContext *txc, const coll_t &cid,
3210 unsigned bits, CollectionRef *c);
3211 int _remove_collection(TransContext *txc, const coll_t &cid,
3212 CollectionRef *c);
3213 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3214 int _split_collection(TransContext *txc,
3215 CollectionRef& c,
3216 CollectionRef& d,
3217 unsigned bits, int rem);
3218 int _merge_collection(TransContext *txc,
3219 CollectionRef *c,
3220 CollectionRef& d,
3221 unsigned bits);
3222
3223 void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
3224 size_t extents);
3225 void _record_allocation_stats();
3226 private:
3227 uint64_t probe_count = 0;
3228 std::atomic<uint64_t> alloc_stats_count = {0};
3229 std::atomic<uint64_t> alloc_stats_fragments = { 0 };
3230 std::atomic<uint64_t> alloc_stats_size = { 0 };
3231 //
3232 std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
3233 { std::make_tuple(0ul, 0ul, 0ul) };
3234
3235 std::atomic<uint64_t> out_of_sync_fm = {0};
3236 // --------------------------------------------------------
3237 // BlueFSDeviceExpander implementation
3238 uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
3239 uint64_t bluefs_total) override {
3240 auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
3241 return delta > 0 ? delta : 0;
3242 }
3243 int allocate_freespace(
3244 uint64_t min_size,
3245 uint64_t size,
3246 PExtentVector& extents) override {
3247 return allocate_bluefs_freespace(min_size, size, &extents);
3248 };
3249 uint64_t available_freespace(uint64_t alloc_size) override;
3250 inline bool _use_rotational_settings();
3251
3252 public:
3253 struct sb_info_t {
3254 coll_t cid;
3255 int64_t pool_id = INT64_MIN;
3256 list<ghobject_t> oids;
3257 BlueStore::SharedBlobRef sb;
3258 bluestore_extent_ref_map_t ref_map;
3259 bool compressed = false;
3260 bool passed = false;
3261 bool updated = false;
3262 };
3263 typedef btree::btree_set<
3264 uint64_t, std::less<uint64_t>,
3265 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3266
3267 typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
3268 struct FSCK_ObjectCtx {
3269 int64_t& errors;
3270 int64_t& warnings;
3271 uint64_t& num_objects;
3272 uint64_t& num_extents;
3273 uint64_t& num_blobs;
3274 uint64_t& num_sharded_objects;
3275 uint64_t& num_spanning_blobs;
3276
3277 mempool_dynamic_bitset* used_blocks;
3278 uint64_t_btree_t* used_omap_head;
3279
3280 ceph::mutex* sb_info_lock;
3281 sb_info_map_t& sb_info;
3282
3283 store_statfs_t& expected_store_statfs;
3284 per_pool_statfs& expected_pool_statfs;
3285 BlueStoreRepairer* repairer;
3286
3287 FSCK_ObjectCtx(int64_t& e,
3288 int64_t& w,
3289 uint64_t& _num_objects,
3290 uint64_t& _num_extents,
3291 uint64_t& _num_blobs,
3292 uint64_t& _num_sharded_objects,
3293 uint64_t& _num_spanning_blobs,
3294 mempool_dynamic_bitset* _ub,
3295 uint64_t_btree_t* _used_omap_head,
3296 ceph::mutex* _sb_info_lock,
3297 sb_info_map_t& _sb_info,
3298 store_statfs_t& _store_statfs,
3299 per_pool_statfs& _pool_statfs,
3300 BlueStoreRepairer* _repairer) :
3301 errors(e),
3302 warnings(w),
3303 num_objects(_num_objects),
3304 num_extents(_num_extents),
3305 num_blobs(_num_blobs),
3306 num_sharded_objects(_num_sharded_objects),
3307 num_spanning_blobs(_num_spanning_blobs),
3308 used_blocks(_ub),
3309 used_omap_head(_used_omap_head),
3310 sb_info_lock(_sb_info_lock),
3311 sb_info(_sb_info),
3312 expected_store_statfs(_store_statfs),
3313 expected_pool_statfs(_pool_statfs),
3314 repairer(_repairer) {
3315 }
3316 };
3317
3318 OnodeRef fsck_check_objects_shallow(
3319 FSCKDepth depth,
3320 int64_t pool_id,
3321 CollectionRef c,
3322 const ghobject_t& oid,
3323 const string& key,
3324 const bufferlist& value,
3325 mempool::bluestore_fsck::list<string>* expecting_shards,
3326 map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3327 const BlueStore::FSCK_ObjectCtx& ctx);
3328
3329 private:
3330 void _fsck_check_object_omap(FSCKDepth depth,
3331 OnodeRef& o,
3332 const BlueStore::FSCK_ObjectCtx& ctx);
3333
3334 void _fsck_check_objects(FSCKDepth depth,
3335 FSCK_ObjectCtx& ctx);
3336 };
3337
3338 inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
3339 return out
3340 << " allocated:"
3341 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3342 << " stored:"
3343 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3344 << " compressed:"
3345 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3346 << " compressed_orig:"
3347 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3348 << " compressed_alloc:"
3349 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3350 }
3351
3352 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3353 o->get();
3354 }
3355 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3356 o->put();
3357 }
3358
3359 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3360 o->get();
3361 }
3362 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3363 o->put();
3364 }
3365
3366 class BlueStoreRepairer
3367 {
3368 public:
3369 // to simplify future potential migration to mempools
3370 using fsck_interval = interval_set<uint64_t>;
3371
3372 // Structure to track which pextents are used by a specific cid/oid.
3373 // As with a Bloom filter, only positive and false-positive matches are
3374 // possible.
3375 // Maintains two lists of bloom filters, one for cids and one for oids,
3376 // where each list entry is a BF covering a specific disk pextent.
3377 // The extent length covered by each filter is determined on init.
3378 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3379 // 'is_used' accesses.
3380 struct StoreSpaceTracker {
3381 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3382 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3383 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrary selected
3384 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3385
3386 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3387 bloom_vector collections_bfs;
3388 bloom_vector objects_bfs;
3389
3390 bool was_filtered_out = false;
3391 uint64_t granularity = 0; // extent length for a single filter
3392
3393 StoreSpaceTracker() {
3394 }
3395 StoreSpaceTracker(const StoreSpaceTracker& from) :
3396 collections_bfs(from.collections_bfs),
3397 objects_bfs(from.objects_bfs),
3398 granularity(from.granularity) {
3399 }
3400
3401 void init(uint64_t total,
3402 uint64_t min_alloc_size,
3403 uint64_t mem_cap = DEF_MEM_CAP) {
3404 ceph_assert(!granularity); // not initialized yet
3405 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3406 ceph_assert(mem_cap);
3407
3408 total = round_up_to(total, min_alloc_size);
3409 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3410
3411 if (!granularity) {
3412 granularity = min_alloc_size;
3413 } else {
3414 granularity = round_up_to(granularity, min_alloc_size);
3415 }
3416
3417 uint64_t entries = round_up_to(total, granularity) / granularity;
3418 collections_bfs.resize(entries,
3419 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3420 BLOOM_FILTER_TABLE_SIZE,
3421 0,
3422 BLOOM_FILTER_EXPECTED_COUNT));
3423 objects_bfs.resize(entries,
3424 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3425 BLOOM_FILTER_TABLE_SIZE,
3426 0,
3427 BLOOM_FILTER_EXPECTED_COUNT));
3428 }
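    // A hedged worked example (sizes are hypothetical): with total = 1 TiB,
    // min_alloc_size = 64 KiB and mem_cap = DEF_MEM_CAP (128 MiB),
    //   granularity = 1 TiB * 32 * 2 / 128 MiB = 512 KiB
    // (already a multiple of min_alloc_size), giving
    //   entries = 1 TiB / 512 KiB = 2M filters per list,
    // i.e. two lists of 2M filters with 32-byte tables each, which is
    // approximately the 128 MiB cap (the cap accounts for the filter tables only).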
3429 inline uint32_t get_hash(const coll_t& cid) const {
3430 return cid.hash_to_shard(1);
3431 }
3432 inline void set_used(uint64_t offset, uint64_t len,
3433 const coll_t& cid, const ghobject_t& oid) {
3434 ceph_assert(granularity); // initialized
3435
3436 // can't call this func after filter_out has been applied
3437 ceph_assert(!was_filtered_out);
3438 if (!len) {
3439 return;
3440 }
3441 auto pos = offset / granularity;
3442 auto end_pos = (offset + len - 1) / granularity;
3443 while (pos <= end_pos) {
3444 collections_bfs[pos].insert(get_hash(cid));
3445 objects_bfs[pos].insert(oid.hobj.get_hash());
3446 ++pos;
3447 }
3448 }
3449 // filter out entries unrelated to the specified (broken) extents;
3450 // only 'is_used' calls are permitted after that
3451 size_t filter_out(const fsck_interval& extents);
3452
3453 // determines if collection's present after filtering-out
3454 inline bool is_used(const coll_t& cid) const {
3455 ceph_assert(was_filtered_out);
3456 for(auto& bf : collections_bfs) {
3457 if (bf.contains(get_hash(cid))) {
3458 return true;
3459 }
3460 }
3461 return false;
3462 }
3463 // determines if object's present after filtering-out
3464 inline bool is_used(const ghobject_t& oid) const {
3465 ceph_assert(was_filtered_out);
3466 for(auto& bf : objects_bfs) {
3467 if (bf.contains(oid.hobj.get_hash())) {
3468 return true;
3469 }
3470 }
3471 return false;
3472 }
3473 // determines if collection's present before filtering-out
3474 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3475 ceph_assert(granularity); // initialized
3476 ceph_assert(!was_filtered_out);
3477 auto &bf = collections_bfs[offs / granularity];
3478 if (bf.contains(get_hash(cid))) {
3479 return true;
3480 }
3481 return false;
3482 }
3483 // determines if object's present before filtering-out
3484 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3485 ceph_assert(granularity); // initialized
3486 ceph_assert(!was_filtered_out);
3487 auto &bf = objects_bfs[offs / granularity];
3488 if (bf.contains(oid.hobj.get_hash())) {
3489 return true;
3490 }
3491 return false;
3492 }
3493 };
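  // A hedged usage sketch for StoreSpaceTracker (variable names are
  // illustrative; the real driver lives in the fsck/repair code):
  //
  //   StoreSpaceTracker tracker;
  //   tracker.init(bdev_size, min_alloc_size);            // default mem cap
  //   // during the object scan:
  //   tracker.set_used(extent_offset, extent_length, cid, oid);
  //   // once the broken extents are known:
  //   tracker.filter_out(misreferenced_extents);
  //   // afterwards only coarse membership queries are allowed:
  //   bool maybe = tracker.is_used(cid) && tracker.is_used(oid);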
3494 public:
3495 void fix_per_pool_omap(KeyValueDB *db);
3496 bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
3497 bool fix_shared_blob(KeyValueDB *db,
3498 uint64_t sbid,
3499 const bufferlist* bl);
3500 bool fix_statfs(KeyValueDB *db, const string& key,
3501 const store_statfs_t& new_statfs);
3502
3503 bool fix_leaked(KeyValueDB *db,
3504 FreelistManager* fm,
3505 uint64_t offset, uint64_t len);
3506 bool fix_false_free(KeyValueDB *db,
3507 FreelistManager* fm,
3508 uint64_t offset, uint64_t len);
3509 bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
3510
3511 void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3512
3513 bool preprocess_misreference(KeyValueDB *db);
3514
3515 unsigned apply(KeyValueDB* db);
3516
3517 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3518 misreferenced_extents.union_insert(offs, len);
3519 if (inc_error) {
3520 ++to_repair_cnt;
3521 }
3522 }
3523 // In fact, this is the repairer's only thread-safe method!
3524 void inc_repaired() {
3525 ++to_repair_cnt;
3526 }
3527
3528 StoreSpaceTracker& get_space_usage_tracker() {
3529 return space_usage_tracker;
3530 }
3531 const fsck_interval& get_misreferences() const {
3532 return misreferenced_extents;
3533 }
3534 KeyValueDB::Transaction get_fix_misreferences_txn() {
3535 return fix_misreferences_txn;
3536 }
3537
3538 private:
3539 std::atomic<unsigned> to_repair_cnt = { 0 };
3540 KeyValueDB::Transaction fix_per_pool_omap_txn;
3541 KeyValueDB::Transaction fix_fm_leaked_txn;
3542 KeyValueDB::Transaction fix_fm_false_free_txn;
3543 KeyValueDB::Transaction remove_key_txn;
3544 KeyValueDB::Transaction fix_statfs_txn;
3545 KeyValueDB::Transaction fix_shared_blob_txn;
3546
3547 KeyValueDB::Transaction fix_misreferences_txn;
3548
3549 StoreSpaceTracker space_usage_tracker;
3550
3551 // non-shared extents with multiple references
3552 fsck_interval misreferenced_extents;
3553
3554 };
3555
3556 class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
3557 {
3558 template <class T, size_t MaxX, size_t MaxY>
3559 class matrix_2d {
3560 T values[MaxX][MaxY];
3561 public:
3562 matrix_2d() {
3563 clear();
3564 }
3565 T& at(size_t x, size_t y) {
3566 ceph_assert(x < MaxX);
3567 ceph_assert(y < MaxY);
3568
3569 return values[x][y];
3570 }
3571 size_t get_max_x() const {
3572 return MaxX;
3573 }
3574 size_t get_max_y() const {
3575 return MaxY;
3576 }
3577 void clear() {
3578 memset(values, 0, sizeof(values));
3579 }
3580 };
3581
3582 enum {
3583 // use 0/nullptr as unset indication
3584 LEVEL_FIRST = 1,
3585 LEVEL_WAL = LEVEL_FIRST,
3586 LEVEL_DB,
3587 LEVEL_SLOW,
3588 LEVEL_MAX
3589 };
3590 // add +1 row for corresponding per-device totals
3591 // add +1 column for per-level actual (taken from file size) total
3592 typedef matrix_2d<uint64_t, BlueFS::MAX_BDEV + 1, LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;
3593
3594 per_level_per_dev_usage_t per_level_per_dev_usage;
3595
3596 // Note: maximum per-device totals below might be smaller than corresponding
3597 // perf counters by up to a single alloc unit (1M) due to superblock extent.
3598 // The latter is not accounted for here.
3599 per_level_per_dev_usage_t per_level_per_dev_max;
3600
3601 uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
3602 uint64_t db_avail4slow = 0;
3603 enum {
3604 OLD_POLICY,
3605 USE_SOME_EXTRA
3606 };
3607
3608 public:
3609 RocksDBBlueFSVolumeSelector(
3610 uint64_t _wal_total,
3611 uint64_t _db_total,
3612 uint64_t _slow_total,
3613 uint64_t _level0_size,
3614 uint64_t _level_base,
3615 uint64_t _level_multiplier,
3616 double reserved_factor,
3617 uint64_t reserved,
3618 bool new_pol)
3619 {
3620 l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
3621 l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
3622 l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;
3623
3624 if (!new_pol) {
3625 return;
3626 }
3627
3628 // Calculating how much extra space is available at DB volume.
3629 // Depending on the presence of explicit reserved size specification it might be either
3630 // * DB volume size - reserved
3631 // or
3632 // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
3633 if (!reserved) {
3634 uint64_t prev_levels = _level0_size;
3635 uint64_t cur_level = _level_base;
3636 uint64_t cur_threshold = 0;
3637 do {
3638 uint64_t next_level = cur_level * _level_multiplier;
3639 uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
3640 if (_db_total <= next_threshold) {
3641 db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
3642 break;
3643 } else {
3644 prev_levels += cur_level;
3645 cur_level = next_level;
3646 cur_threshold = next_threshold;
3647 }
3648 } while (true);
3649 } else {
3650 db_avail4slow = _db_total - reserved;
3651 }
3652 }
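  // A hedged worked example of the implicit-reservation branch above (all
  // sizes are hypothetical): with _level0_size = 1 GiB, _level_base = 1 GiB,
  // _level_multiplier = 8, reserved_factor = 1, reserved = 0 and
  // _db_total = 20 GiB, the thresholds are
  //   1 + 1 + 8*1 = 10 GiB, then 2 + 8 + 64*1 = 74 GiB,
  // so the DB volume fits below the second threshold and
  //   db_avail4slow = 20 GiB - 10 GiB = 10 GiB
  // of extra DB-volume space may host data that would otherwise land on the
  // slow device.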
3653
3654 void* get_hint_by_device(uint8_t dev) const override {
3655 ceph_assert(dev == BlueFS::BDEV_WAL); // others aren't used atm
3656 return reinterpret_cast<void*>(LEVEL_WAL);
3657 }
3658 void* get_hint_by_dir(const string& dirname) const override;
3659
3660 void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
3661 if (hint == nullptr)
3662 return;
3663 size_t pos = (size_t)hint - LEVEL_FIRST;
3664 for (auto& p : fnode.extents) {
3665 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3666 auto& max = per_level_per_dev_max.at(p.bdev, pos);
3667 cur += p.length;
3668 if (cur > max) {
3669 max = cur;
3670 }
3671 {
3672 //update per-device totals
3673 auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3674 auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3675 cur += p.length;
3676 if (cur > max) {
3677 max = cur;
3678 }
3679 }
3680 }
3681 {
3682 //update per-level actual totals
3683 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3684 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3685 cur += fnode.size;
3686 if (cur > max) {
3687 max = cur;
3688 }
3689 }
3690 }
3691 void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
3692 if (hint == nullptr)
3693 return;
3694 size_t pos = (size_t)hint - LEVEL_FIRST;
3695 for (auto& p : fnode.extents) {
3696 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3697 ceph_assert(cur >= p.length);
3698 cur -= p.length;
3699
3700 //update per-device totals
3701 auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3702 ceph_assert(cur2 >= p.length);
3703 cur2 -= p.length;
3704 }
3705 //update per-level actual totals
3706 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3707 ceph_assert(cur >= fnode.size);
3708 cur -= fnode.size;
3709 }
3710 void add_usage(void* hint, uint64_t fsize) override {
3711 if (hint == nullptr)
3712 return;
3713 size_t pos = (size_t)hint - LEVEL_FIRST;
3714 //update per-level actual totals
3715 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3716 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3717 cur += fsize;
3718 if (cur > max) {
3719 max = cur;
3720 }
3721 }
3722 void sub_usage(void* hint, uint64_t fsize) override {
3723 if (hint == nullptr)
3724 return;
3725 size_t pos = (size_t)hint - LEVEL_FIRST;
3726 //update per-level actual totals
3727 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3728 ceph_assert(cur >= fsize);
3729 per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos) -= fsize;
3730 }
3731
3732 uint8_t select_prefer_bdev(void* h) override;
3733 void get_paths(
3734 const std::string& base,
3735 BlueFSVolumeSelector::paths& res) const override;
3736
3737 void dump(ostream& sout) override;
3738 };
3739
3740 #endif