[ceph.git] / ceph / src / os / bluestore / BlueStore.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/interval_set.h"
39 #include "include/unordered_map.h"
40 #include "include/mempool.h"
41 #include "include/hash.h"
42 #include "common/bloom_filter.hpp"
43 #include "common/Finisher.h"
44 #include "common/ceph_mutex.h"
45 #include "common/Throttle.h"
46 #include "common/perf_counters.h"
47 #include "common/PriorityCache.h"
48 #include "compressor/Compressor.h"
49 #include "os/ObjectStore.h"
50
51 #include "bluestore_types.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 #ifdef WITH_BLKIN
56 #include "common/zipkin_trace.h"
57 #endif
58
59 class Allocator;
60 class FreelistManager;
61 class BlueStoreRepairer;
62 class SimpleBitmap;
63 //#define DEBUG_CACHE
64 //#define DEBUG_DEFERRED
65
66
67
68 // constants for Buffer::optimize()
69 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N: allowed slop is 1/N of the buffer length
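// Worked example (an illustrative assumption, not taken from the source):
// with a 64 KiB cached buffer, Buffer::maybe_rebuild() below compacts the
// data once the front bufferptr wastes more than 64 KiB / 8 = 8 KiB, or
// once the bufferlist holds more than one fragment.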
70 #define CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
71
72 enum {
73 l_bluestore_first = 732430,
74 // space utilization stats
75 //****************************************
76 l_bluestore_allocated,
77 l_bluestore_stored,
78 l_bluestore_fragmentation,
79 l_bluestore_alloc_unit,
80 //****************************************
81
82 // Update op processing state latencies
83 //****************************************
84 l_bluestore_state_prepare_lat,
85 l_bluestore_state_aio_wait_lat,
86 l_bluestore_state_io_done_lat,
87 l_bluestore_state_kv_queued_lat,
88 l_bluestore_state_kv_committing_lat,
89 l_bluestore_state_kv_done_lat,
90 l_bluestore_state_finishing_lat,
91 l_bluestore_state_done_lat,
92
93 l_bluestore_state_deferred_queued_lat,
94 l_bluestore_state_deferred_aio_wait_lat,
95 l_bluestore_state_deferred_cleanup_lat,
96
97 l_bluestore_commit_lat,
98 //****************************************
99
100 // Update Transaction stats
101 //****************************************
102 l_bluestore_throttle_lat,
103 l_bluestore_submit_lat,
104 l_bluestore_txc,
105 //****************************************
106
107 // Read op stats
108 //****************************************
109 l_bluestore_read_onode_meta_lat,
110 l_bluestore_read_wait_aio_lat,
111 l_bluestore_csum_lat,
112 l_bluestore_read_eio,
113 l_bluestore_reads_with_retries,
114 l_bluestore_read_lat,
115 //****************************************
116
117 // kv_thread latencies
118 //****************************************
119 l_bluestore_kv_flush_lat,
120 l_bluestore_kv_commit_lat,
121 l_bluestore_kv_sync_lat,
122 l_bluestore_kv_final_lat,
123 //****************************************
124
125 // write op stats
126 //****************************************
127 l_bluestore_write_big,
128 l_bluestore_write_big_bytes,
129 l_bluestore_write_big_blobs,
130 l_bluestore_write_big_deferred,
131
132 l_bluestore_write_small,
133 l_bluestore_write_small_bytes,
134 l_bluestore_write_small_unused,
135 l_bluestore_write_small_pre_read,
136
137 l_bluestore_write_pad_bytes,
138 l_bluestore_write_penalty_read_ops,
139 l_bluestore_write_new,
140
141 l_bluestore_issued_deferred_writes,
142 l_bluestore_issued_deferred_write_bytes,
143 l_bluestore_submitted_deferred_writes,
144 l_bluestore_submitted_deferred_write_bytes,
145
146 l_bluestore_write_big_skipped_blobs,
147 l_bluestore_write_big_skipped_bytes,
148 l_bluestore_write_small_skipped,
149 l_bluestore_write_small_skipped_bytes,
150 //****************************************
151
152 // compression stats
153 //****************************************
154 l_bluestore_compressed,
155 l_bluestore_compressed_allocated,
156 l_bluestore_compressed_original,
157 l_bluestore_compress_lat,
158 l_bluestore_decompress_lat,
159 l_bluestore_compress_success_count,
160 l_bluestore_compress_rejected_count,
161 //****************************************
162
163 // onode cache stats
164 //****************************************
165 l_bluestore_onodes,
166 l_bluestore_pinned_onodes,
167 l_bluestore_onode_hits,
168 l_bluestore_onode_misses,
169 l_bluestore_onode_shard_hits,
170 l_bluestore_onode_shard_misses,
171 l_bluestore_extents,
172 l_bluestore_blobs,
173 //****************************************
174
175 // buffer cache stats
176 //****************************************
177 l_bluestore_buffers,
178 l_bluestore_buffer_bytes,
179 l_bluestore_buffer_hit_bytes,
180 l_bluestore_buffer_miss_bytes,
181 //****************************************
182
183 // internal stats
184 //****************************************
185 l_bluestore_onode_reshard,
186 l_bluestore_blob_split,
187 l_bluestore_extent_compress,
188 l_bluestore_gc_merged,
189 //****************************************
190
191 // other client ops latencies
192 //****************************************
193 l_bluestore_omap_seek_to_first_lat,
194 l_bluestore_omap_upper_bound_lat,
195 l_bluestore_omap_lower_bound_lat,
196 l_bluestore_omap_next_lat,
197 l_bluestore_omap_get_keys_lat,
198 l_bluestore_omap_get_values_lat,
199 l_bluestore_omap_clear_lat,
200 l_bluestore_clist_lat,
201 l_bluestore_remove_lat,
202 l_bluestore_truncate_lat,
203 //****************************************
204
205 // allocation stats
206 //****************************************
207 l_bluestore_allocate_hist,
208 //****************************************
209 l_bluestore_last
210 };
211
212 #define META_POOL_ID ((uint64_t)-1ull)
213
214 class BlueStore : public ObjectStore,
215 public md_config_obs_t {
216 // -----------------------------------------------------
217 // types
218 public:
219 // config observer
220 const char** get_tracked_conf_keys() const override;
221 void handle_conf_change(const ConfigProxy& conf,
222 const std::set<std::string> &changed) override;
223
224 //handler for discard event
225 void handle_discard(interval_set<uint64_t>& to_release);
226
227 void _set_csum();
228 void _set_compression();
229 void _set_throttle_params();
230 int _set_cache_sizes();
231 void _set_max_defer_interval() {
232 max_defer_interval =
233 cct->_conf.get_val<double>("bluestore_max_defer_interval");
234 }
235
236 struct TransContext;
237
238 typedef std::map<uint64_t, ceph::buffer::list> ready_regions_t;
239
240
241 struct BufferSpace;
242 struct Collection;
243 typedef boost::intrusive_ptr<Collection> CollectionRef;
244
245 struct AioContext {
246 virtual void aio_finish(BlueStore *store) = 0;
247 virtual ~AioContext() {}
248 };
249
250 /// cached buffer
251 struct Buffer {
252 MEMPOOL_CLASS_HELPERS();
253
254 enum {
255 STATE_EMPTY, ///< empty buffer -- used for cache history
256 STATE_CLEAN, ///< clean data that is up to date
257 STATE_WRITING, ///< data that is being written (io not yet complete)
258 };
259 static const char *get_state_name(int s) {
260 switch (s) {
261 case STATE_EMPTY: return "empty";
262 case STATE_CLEAN: return "clean";
263 case STATE_WRITING: return "writing";
264 default: return "???";
265 }
266 }
267 enum {
268 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
269 // NOTE: fix operator<< when you define a second flag
270 };
271 static const char *get_flag_name(int s) {
272 switch (s) {
273 case FLAG_NOCACHE: return "nocache";
274 default: return "???";
275 }
276 }
277
278 BufferSpace *space;
279 uint16_t state; ///< STATE_*
280 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
281 uint32_t flags; ///< FLAG_*
282 uint64_t seq;
283 uint32_t offset, length;
284 ceph::buffer::list data;
285 std::shared_ptr<int64_t> cache_age_bin; ///< cache age bin
286
287 boost::intrusive::list_member_hook<> lru_item;
288 boost::intrusive::list_member_hook<> state_item;
289
290 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
291 unsigned f = 0)
292 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
293 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, ceph::buffer::list& b,
294 unsigned f = 0)
295 : space(space), state(s), flags(f), seq(q), offset(o),
296 length(b.length()), data(b) {}
297
298 bool is_empty() const {
299 return state == STATE_EMPTY;
300 }
301 bool is_clean() const {
302 return state == STATE_CLEAN;
303 }
304 bool is_writing() const {
305 return state == STATE_WRITING;
306 }
307
308 uint32_t end() const {
309 return offset + length;
310 }
311
312 void truncate(uint32_t newlen) {
313 ceph_assert(newlen < length);
314 if (data.length()) {
315 ceph::buffer::list t;
316 t.substr_of(data, 0, newlen);
317 data = std::move(t);
318 }
319 length = newlen;
320 }
321 void maybe_rebuild() {
322 if (data.length() &&
323 (data.get_num_buffers() > 1 ||
324 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
325 data.rebuild();
326 }
327 }
328
329 void dump(ceph::Formatter *f) const {
330 f->dump_string("state", get_state_name(state));
331 f->dump_unsigned("seq", seq);
332 f->dump_unsigned("offset", offset);
333 f->dump_unsigned("length", length);
334 f->dump_unsigned("data_length", data.length());
335 }
336 };
337
338 struct BufferCacheShard;
339
340 /// map logical extent range (object) onto buffers
341 struct BufferSpace {
342 enum {
343 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
344 };
345
346 typedef boost::intrusive::list<
347 Buffer,
348 boost::intrusive::member_hook<
349 Buffer,
350 boost::intrusive::list_member_hook<>,
351 &Buffer::state_item> > state_list_t;
352
353 mempool::bluestore_cache_meta::map<uint32_t, std::unique_ptr<Buffer>>
354 buffer_map;
355
356 // we use a bare intrusive list here instead of std::map because
357 // it uses less memory and we expect this to be very small (very
358 // few IOs in flight to the same Blob at the same time).
359 state_list_t writing; ///< writing buffers, sorted by seq, ascending
360
361 ~BufferSpace() {
362 ceph_assert(buffer_map.empty());
363 ceph_assert(writing.empty());
364 }
365
366 void _add_buffer(BufferCacheShard* cache, Buffer* b, int level, Buffer* near) {
367 cache->_audit("_add_buffer start");
368 buffer_map[b->offset].reset(b);
369 if (b->is_writing()) {
370 // we might get already-cached data for which resetting the mempool is inappropriate,
371 // hence call try_assign_to_mempool
372 b->data.try_assign_to_mempool(mempool::mempool_bluestore_writing);
373 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
374 writing.push_back(*b);
375 } else {
376 auto it = writing.begin();
377 while (it->seq < b->seq) {
378 ++it;
379 }
380
381 ceph_assert(it->seq >= b->seq);
382 // note that this will insert b before it
383 // hence the order is maintained
384 writing.insert(it, *b);
385 }
386 } else {
387 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
388 cache->_add(b, level, near);
389 }
390 cache->_audit("_add_buffer end");
391 }
392 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
393 _rm_buffer(cache, buffer_map.find(b->offset));
394 }
395 void _rm_buffer(BufferCacheShard* cache,
396 std::map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
397 ceph_assert(p != buffer_map.end());
398 cache->_audit("_rm_buffer start");
399 if (p->second->is_writing()) {
400 writing.erase(writing.iterator_to(*p->second));
401 } else {
402 cache->_rm(p->second.get());
403 }
404 buffer_map.erase(p);
405 cache->_audit("_rm_buffer end");
406 }
407
408 std::map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
409 uint32_t offset) {
410 auto i = buffer_map.lower_bound(offset);
411 if (i != buffer_map.begin()) {
412 --i;
413 if (i->first + i->second->length <= offset)
414 ++i;
415 }
416 return i;
417 }
418
419 // must be called under protection of the Cache lock
420 void _clear(BufferCacheShard* cache);
421
422 // return value is the highest cache_private of a trimmed buffer, or 0.
423 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
424 std::lock_guard l(cache->lock);
425 int ret = _discard(cache, offset, length);
426 cache->_trim();
427 return ret;
428 }
429 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
430
431 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, ceph::buffer::list& bl,
432 unsigned flags) {
433 std::lock_guard l(cache->lock);
434 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
435 flags);
436 b->cache_private = _discard(cache, offset, bl.length());
437 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
438 cache->_trim();
439 }
440 void _finish_write(BufferCacheShard* cache, uint64_t seq);
441 void did_read(BufferCacheShard* cache, uint32_t offset, ceph::buffer::list& bl) {
442 std::lock_guard l(cache->lock);
443 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
444 b->cache_private = _discard(cache, offset, bl.length());
445 _add_buffer(cache, b, 1, nullptr);
446 cache->_trim();
447 }
448
449 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
450 BlueStore::ready_regions_t& res,
451 interval_set<uint32_t>& res_intervals,
452 int flags = 0);
453
454 void truncate(BufferCacheShard* cache, uint32_t offset) {
455 discard(cache, offset, (uint32_t)-1 - offset);
456 }
457
458 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
459
460 void dump(BufferCacheShard* cache, ceph::Formatter *f) const {
461 std::lock_guard l(cache->lock);
462 f->open_array_section("buffers");
463 for (auto& i : buffer_map) {
464 f->open_object_section("buffer");
465 ceph_assert(i.first == i.second->offset);
466 i.second->dump(f);
467 f->close_section();
468 }
469 f->close_section();
470 }
471 };
472
473 struct SharedBlobSet;
474
475 /// in-memory shared blob state (incl cached buffers)
476 struct SharedBlob {
477 MEMPOOL_CLASS_HELPERS();
478
479 std::atomic_int nref = {0}; ///< reference count
480 bool loaded = false;
481
482 CollectionRef coll;
483 union {
484 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
485 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
486 };
487 BufferSpace bc; ///< buffer cache
488
489 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
490 if (get_cache()) {
491 get_cache()->add_blob();
492 }
493 }
494 SharedBlob(uint64_t i, Collection *_coll);
495 ~SharedBlob();
496
497 uint64_t get_sbid() const {
498 return loaded ? persistent->sbid : sbid_unloaded;
499 }
500
501 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
502 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
503
504 void dump(ceph::Formatter* f) const;
505 friend std::ostream& operator<<(std::ostream& out, const SharedBlob& sb);
506
507 void get() {
508 ++nref;
509 }
510 void put();
511
512 /// get logical references
513 void get_ref(uint64_t offset, uint32_t length);
514
515 /// put logical references, and get back any released extents
516 void put_ref(uint64_t offset, uint32_t length,
517 PExtentVector *r, bool *unshare);
518
519 void finish_write(uint64_t seq);
520
521 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
522 return l.get_sbid() == r.get_sbid();
523 }
524 inline BufferCacheShard* get_cache() {
525 return coll ? coll->cache : nullptr;
526 }
527 inline SharedBlobSet* get_parent() {
528 return coll ? &(coll->shared_blob_set) : nullptr;
529 }
530 inline bool is_loaded() const {
531 return loaded;
532 }
533
534 };
535 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
536
537 /// a lookup table of SharedBlobs
538 struct SharedBlobSet {
539 /// protect lookup, insertion, removal
540 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
541
542 // we use a bare pointer because we don't want to affect the ref
543 // count
544 mempool::bluestore_cache_meta::unordered_map<uint64_t,SharedBlob*> sb_map;
545
546 SharedBlobRef lookup(uint64_t sbid) {
547 std::lock_guard l(lock);
548 auto p = sb_map.find(sbid);
549 if (p == sb_map.end() ||
550 p->second->nref == 0) {
551 return nullptr;
552 }
553 return p->second;
554 }
555
556 void add(Collection* coll, SharedBlob *sb) {
557 std::lock_guard l(lock);
558 sb_map[sb->get_sbid()] = sb;
559 sb->coll = coll;
560 }
561
562 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
563 std::lock_guard l(lock);
564 ceph_assert(sb->get_parent() == this);
565 if (verify_nref_is_zero && sb->nref != 0) {
566 return false;
567 }
568 // only remove if it still points to us
569 auto p = sb_map.find(sb->get_sbid());
570 if (p != sb_map.end() &&
571 p->second == sb) {
572 sb_map.erase(p);
573 }
574 return true;
575 }
576
577 bool empty() {
578 std::lock_guard l(lock);
579 return sb_map.empty();
580 }
581
582 template <int LogLevelV>
583 void dump(CephContext *cct);
584 };
585
586 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
587
588 /// in-memory blob metadata and associated cached buffers (if any)
589 struct Blob {
590 MEMPOOL_CLASS_HELPERS();
591
592 std::atomic_int nref = {0}; ///< reference count
593 int16_t id = -1; ///< id, for spanning blobs only, >= 0
594 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
595 SharedBlobRef shared_blob; ///< shared blob state (if any)
596
597 private:
598 mutable bluestore_blob_t blob; ///< decoded blob metadata
599 #ifdef CACHE_BLOB_BL
600 mutable ceph::buffer::list blob_bl; ///< cached encoded blob, blob is dirty if empty
601 #endif
602 /// refs from this shard. ephemeral if id<0, persisted if spanning.
603 bluestore_blob_use_tracker_t used_in_blob;
604
605 public:
606
607 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
608 friend void intrusive_ptr_release(Blob *b) { b->put(); }
609
610 void dump(ceph::Formatter* f) const;
611 friend std::ostream& operator<<(std::ostream& out, const Blob &b);
612
613 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
614 return used_in_blob;
615 }
616 bool is_referenced() const {
617 return used_in_blob.is_not_empty();
618 }
619 uint32_t get_referenced_bytes() const {
620 return used_in_blob.get_referenced_bytes();
621 }
622
623 bool is_spanning() const {
624 return id >= 0;
625 }
626
627 bool can_split() const {
628 std::lock_guard l(shared_blob->get_cache()->lock);
629 // splitting a BufferSpace writing list is too hard; don't try.
630 return shared_blob->bc.writing.empty() &&
631 used_in_blob.can_split() &&
632 get_blob().can_split();
633 }
634
635 bool can_split_at(uint32_t blob_offset) const {
636 return used_in_blob.can_split_at(blob_offset) &&
637 get_blob().can_split_at(blob_offset);
638 }
639
640 bool can_reuse_blob(uint32_t min_alloc_size,
641 uint32_t target_blob_size,
642 uint32_t b_offset,
643 uint32_t *length0);
644
645 void dup(Blob& o) {
646 o.shared_blob = shared_blob;
647 o.blob = blob;
648 #ifdef CACHE_BLOB_BL
649 o.blob_bl = blob_bl;
650 #endif
651 }
652
653 inline const bluestore_blob_t& get_blob() const {
654 return blob;
655 }
656 inline bluestore_blob_t& dirty_blob() {
657 #ifdef CACHE_BLOB_BL
658 blob_bl.clear();
659 #endif
660 return blob;
661 }
662
663 /// discard buffers for unallocated regions
664 void discard_unallocated(Collection *coll);
665
666 /// get logical references
667 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
668 /// put logical references, and get back any released extents
669 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
670 PExtentVector *r);
671
672 /// split the blob
673 void split(Collection *coll, uint32_t blob_offset, Blob *o);
674
675 void get() {
676 ++nref;
677 }
678 void put() {
679 if (--nref == 0)
680 delete this;
681 }
682
683
684 #ifdef CACHE_BLOB_BL
685 void _encode() const {
686 if (blob_bl.length() == 0 ) {
687 encode(blob, blob_bl);
688 } else {
689 ceph_assert(blob_bl.length());
690 }
691 }
692 void bound_encode(
693 size_t& p,
694 bool include_ref_map) const {
695 _encode();
696 p += blob_bl.length();
697 if (include_ref_map) {
698 used_in_blob.bound_encode(p);
699 }
700 }
701 void encode(
702 ceph::buffer::list::contiguous_appender& p,
703 bool include_ref_map) const {
704 _encode();
705 p.append(blob_bl);
706 if (include_ref_map) {
707 used_in_blob.encode(p);
708 }
709 }
710 void decode(
711 Collection */*coll*/,
712 ceph::buffer::ptr::const_iterator& p,
713 bool include_ref_map) {
714 const char *start = p.get_pos();
715 denc(blob, p);
716 const char *end = p.get_pos();
717 blob_bl.clear();
718 blob_bl.append(start, end - start);
719 if (include_ref_map) {
720 used_in_blob.decode(p);
721 }
722 }
723 #else
724 void bound_encode(
725 size_t& p,
726 uint64_t struct_v,
727 uint64_t sbid,
728 bool include_ref_map) const {
729 denc(blob, p, struct_v);
730 if (blob.is_shared()) {
731 denc(sbid, p);
732 }
733 if (include_ref_map) {
734 used_in_blob.bound_encode(p);
735 }
736 }
737 void encode(
738 ceph::buffer::list::contiguous_appender& p,
739 uint64_t struct_v,
740 uint64_t sbid,
741 bool include_ref_map) const {
742 denc(blob, p, struct_v);
743 if (blob.is_shared()) {
744 denc(sbid, p);
745 }
746 if (include_ref_map) {
747 used_in_blob.encode(p);
748 }
749 }
750 void decode(
751 Collection *coll,
752 ceph::buffer::ptr::const_iterator& p,
753 uint64_t struct_v,
754 uint64_t* sbid,
755 bool include_ref_map);
756 #endif
757 };
758 typedef boost::intrusive_ptr<Blob> BlobRef;
759 typedef mempool::bluestore_cache_meta::map<int,BlobRef> blob_map_t;
760
761 /// a logical extent, pointing to (some portion of) a blob
762 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
763 struct Extent : public ExtentBase {
764 MEMPOOL_CLASS_HELPERS();
765
766 uint32_t logical_offset = 0; ///< logical offset
767 uint32_t blob_offset = 0; ///< blob offset
768 uint32_t length = 0; ///< length
769 BlobRef blob; ///< the blob with our data
770
771 /// ctor for lookup only
772 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
773 /// ctor for delayed initialization (see decode_some())
774 explicit Extent() : ExtentBase() {
775 }
776 /// ctor for general usage
777 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
778 : ExtentBase(),
779 logical_offset(lo), blob_offset(o), length(l) {
780 assign_blob(b);
781 }
782 ~Extent() {
783 if (blob) {
784 blob->shared_blob->get_cache()->rm_extent();
785 }
786 }
787
788 void dump(ceph::Formatter* f) const;
789
790 void assign_blob(const BlobRef& b) {
791 ceph_assert(!blob);
792 blob = b;
793 blob->shared_blob->get_cache()->add_extent();
794 }
795
796 // comparators for intrusive_set
797 friend bool operator<(const Extent &a, const Extent &b) {
798 return a.logical_offset < b.logical_offset;
799 }
800 friend bool operator>(const Extent &a, const Extent &b) {
801 return a.logical_offset > b.logical_offset;
802 }
803 friend bool operator==(const Extent &a, const Extent &b) {
804 return a.logical_offset == b.logical_offset;
805 }
806
807 uint32_t blob_start() const {
808 return logical_offset - blob_offset;
809 }
810
811 uint32_t blob_end() const {
812 return blob_start() + blob->get_blob().get_logical_length();
813 }
814
815 uint32_t logical_end() const {
816 return logical_offset + length;
817 }
818
819 // return true if any piece of the blob is out of
820 // the given range [o, o + l].
821 bool blob_escapes_range(uint32_t o, uint32_t l) const {
822 return blob_start() < o || blob_end() > o + l;
823 }
824 };
825 typedef boost::intrusive::set<Extent> extent_map_t;
826
827
828 friend std::ostream& operator<<(std::ostream& out, const Extent& e);
829
830 struct OldExtent {
831 boost::intrusive::list_member_hook<> old_extent_item;
832 Extent e;
833 PExtentVector r;
834 bool blob_empty; // flag tracking the last removed extent that leaves the blob
835 // empty - required to update compression stats properly
836 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
837 : e(lo, o, l, b), blob_empty(false) {
838 }
839 static OldExtent* create(CollectionRef c,
840 uint32_t lo,
841 uint32_t o,
842 uint32_t l,
843 BlobRef& b);
844 };
845 typedef boost::intrusive::list<
846 OldExtent,
847 boost::intrusive::member_hook<
848 OldExtent,
849 boost::intrusive::list_member_hook<>,
850 &OldExtent::old_extent_item> > old_extent_map_t;
851
852 struct Onode;
853
854 /// a sharded extent map, mapping offsets to lextents to blobs
855 struct ExtentMap {
856 Onode *onode;
857 extent_map_t extent_map; ///< map of Extents to Blobs
858 blob_map_t spanning_blob_map; ///< blobs that span shards
859 typedef boost::intrusive_ptr<Onode> OnodeRef;
860
861 struct Shard {
862 bluestore_onode_t::shard_info *shard_info = nullptr;
863 unsigned extents = 0; ///< count extents in this shard
864 bool loaded = false; ///< true if shard is loaded
865 bool dirty = false; ///< true if shard is dirty and needs reencoding
866 };
867 mempool::bluestore_cache_meta::vector<Shard> shards; ///< shards
868
869 ceph::buffer::list inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
870
871 uint32_t needs_reshard_begin = 0;
872 uint32_t needs_reshard_end = 0;
873
874 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
875 uint64_t&, uint64_t&, uint64_t&);
876
877 bool needs_reshard() const {
878 return needs_reshard_end > needs_reshard_begin;
879 }
880 void clear_needs_reshard() {
881 needs_reshard_begin = needs_reshard_end = 0;
882 }
883 void request_reshard(uint32_t begin, uint32_t end) {
884 if (begin < needs_reshard_begin) {
885 needs_reshard_begin = begin;
886 }
887 if (end > needs_reshard_end) {
888 needs_reshard_end = end;
889 }
890 }
891
892 struct DeleteDisposer {
893 void operator()(Extent *e) { delete e; }
894 };
895
896 ExtentMap(Onode *o);
897 ~ExtentMap() {
898 extent_map.clear_and_dispose(DeleteDisposer());
899 }
900
901 void clear() {
902 extent_map.clear_and_dispose(DeleteDisposer());
903 shards.clear();
904 inline_bl.clear();
905 clear_needs_reshard();
906 }
907
908 void dump(ceph::Formatter* f) const;
909
910 bool encode_some(uint32_t offset, uint32_t length, ceph::buffer::list& bl,
911 unsigned *pn);
912 unsigned decode_some(ceph::buffer::list& bl);
913
914 void bound_encode_spanning_blobs(size_t& p);
915 void encode_spanning_blobs(ceph::buffer::list::contiguous_appender& p);
916 void decode_spanning_blobs(ceph::buffer::ptr::const_iterator& p);
917
918 BlobRef get_spanning_blob(int id) {
919 auto p = spanning_blob_map.find(id);
920 ceph_assert(p != spanning_blob_map.end());
921 return p->second;
922 }
923
924 void update(KeyValueDB::Transaction t, bool force);
925 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
926 void reshard(
927 KeyValueDB *db,
928 KeyValueDB::Transaction t);
929
930 /// initialize Shards from the onode
931 void init_shards(bool loaded, bool dirty);
932
933 /// return index of shard containing offset
934 /// or -1 if not found
935 int seek_shard(uint32_t offset) {
936 size_t end = shards.size();
937 size_t mid, left = 0;
938 size_t right = end; // one past the right end
939
940 while (left < right) {
941 mid = left + (right - left) / 2;
942 if (offset >= shards[mid].shard_info->offset) {
943 size_t next = mid + 1;
944 if (next >= end || offset < shards[next].shard_info->offset)
945 return mid;
946 //continue to search forwards
947 left = next;
948 } else {
949 //continue to search backwards
950 right = mid;
951 }
952 }
953
954 return -1; // not found
955 }
956
957 /// check if a range spans a shard
958 bool spans_shard(uint32_t offset, uint32_t length) {
959 if (shards.empty()) {
960 return false;
961 }
962 int s = seek_shard(offset);
963 ceph_assert(s >= 0);
964 if (s == (int)shards.size() - 1) {
965 return false; // last shard
966 }
967 if (offset + length <= shards[s+1].shard_info->offset) {
968 return false;
969 }
970 return true;
971 }
972
973 /// ensure that a range of the map is loaded
974 void fault_range(KeyValueDB *db,
975 uint32_t offset, uint32_t length);
976
977 /// ensure a range of the map is marked dirty
978 void dirty_range(uint32_t offset, uint32_t length);
979
980 /// for seek_lextent test
981 extent_map_t::iterator find(uint64_t offset);
982
983 /// seek to the first lextent including or after offset
984 extent_map_t::iterator seek_lextent(uint64_t offset);
985 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
986
987 /// add a new Extent
988 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
989 extent_map.insert(*new Extent(lo, o, l, b));
990 }
991
992 /// remove (and delete) an Extent
993 void rm(extent_map_t::iterator p) {
994 extent_map.erase_and_dispose(p, DeleteDisposer());
995 }
996
997 bool has_any_lextents(uint64_t offset, uint64_t length);
998
999 /// consolidate adjacent lextents in extent_map
1000 int compress_extent_map(uint64_t offset, uint64_t length);
1001
1002 /// punch a logical hole; add the lextents to be dereferenced to the target list
1003 void punch_hole(CollectionRef &c,
1004 uint64_t offset, uint64_t length,
1005 old_extent_map_t *old_extents);
1006
1007 /// put a new lextent into the lextent_map, overwriting any existing ones
1008 /// and updating references accordingly
1009 Extent *set_lextent(CollectionRef &c,
1010 uint64_t logical_offset,
1011 uint64_t offset, uint64_t length,
1012 BlobRef b,
1013 old_extent_map_t *old_extents);
1014
1015 /// split a blob (and referring extents)
1016 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
1017
1018 void provide_shard_info_to_onode(bufferlist v, uint32_t shard_id);
1019 };
1020
1021 /// Compressed Blob Garbage collector
1022 /*
1023 The primary idea of the collector is to estimate the difference between the
1024 allocation units (AU) currently occupied by compressed blobs and the new AUs
1025 required to store that data uncompressed.
1026 Estimation is performed for protrusive extents within a logical range
1027 determined by the concatenation of the old_extents collection and the
1028 specific (current) write request.
1029 The root cause for using old_extents is the need to handle blob ref counts
1030 properly: old extents still hold blob refs, hence we need to traverse
1031 the collection to determine whether a blob is to be released.
1032 Protrusive extents are extents that fit into the blob set in action
1033 (ones that are below the logical range from above) but are not removed
1034 entirely by the current write.
1035 E.g. for
1036 extent1 <loffs = 100, boffs = 100, len = 100> ->
1037 blob1<compressed, len_on_disk=4096, logical_len=8192>
1038 extent2 <loffs = 200, boffs = 200, len = 100> ->
1039 blob2<raw, len_on_disk=4096, llen=4096>
1040 extent3 <loffs = 300, boffs = 300, len = 100> ->
1041 blob1<compressed, len_on_disk=4096, llen=8192>
1042 extent4 <loffs = 4096, boffs = 0, len = 100> ->
1043 blob3<raw, len_on_disk=4096, llen=4096>
1044 write(300~100)
1045 protrusive extents are within the following ranges: <0~300, 400~8192-400>.
1046 In this case the existing AUs that might be removed due to GC (i.e. blob1)
1047 use 2x4K bytes.
1048 The number of new AUs expected after GC is 0, since extent1 is to be merged into blob2.
1049 Hence we should collect.
1050 */
1051 class GarbageCollector
1052 {
1053 public:
1054 /// return the number of allocation units that might be saved due to GC
1055 int64_t estimate(
1056 uint64_t offset,
1057 uint64_t length,
1058 const ExtentMap& extent_map,
1059 const old_extent_map_t& old_extents,
1060 uint64_t min_alloc_size);
1061
1062 /// return a collection of extents to perform GC on
1063 const interval_set<uint64_t>& get_extents_to_collect() const {
1064 return extents_to_collect;
1065 }
1066 GarbageCollector(CephContext* _cct) : cct(_cct) {}
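// Illustrative usage sketch (an assumption about the caller in BlueStore.cc,
// not a verbatim call site; `offset`, `length`, `o`, `old_extents` and
// `min_alloc_size` are placeholders):
//   GarbageCollector gc(cct);
//   if (gc.estimate(offset, length, o->extent_map, old_extents,
//                   min_alloc_size) > 0) {
//     // a positive estimate means AUs can be saved; rewriting the returned
//     // ranges uncompressed lets the old compressed AUs be released
//     const interval_set<uint64_t>& ranges = gc.get_extents_to_collect();
//   }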
1067
1068 private:
1069 struct BlobInfo {
1070 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
1071 int64_t expected_allocations = 0; ///< new alloc units required
1072 ///< in case of gc fulfilled
1073 bool collect_candidate = false; ///< indicate if blob has any extents
1074 ///< eligible for GC.
1075 extent_map_t::const_iterator first_lextent; ///< points to the first
1076 ///< lextent referring to
1077 ///< the blob if any.
1078 ///< collect_candidate flag
1079 ///< determines the validity
1080 extent_map_t::const_iterator last_lextent; ///< points to the last
1081 ///< lextent referring to
1082 ///< the blob if any.
1083
1084 BlobInfo(uint64_t ref_bytes) :
1085 referenced_bytes(ref_bytes) {
1086 }
1087 };
1088 CephContext* cct;
1089 std::map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1090 ///< copies that are affected by the
1091 ///< specific write
1092
1093 ///< protrusive extents that should be collected if GC takes place
1094 interval_set<uint64_t> extents_to_collect;
1095
1096 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1097 ///< unit when traversing
1098 ///< protrusive extents.
1099 ///< Other extents mapped to
1100 ///< this AU are to be ignored
1101 ///< (except the case where an
1102 ///< uncompressed extent follows
1103 ///< a compressed one - see below).
1104 BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation unit
1105 ///< caused an expected_allocations
1106 ///< counter increment at this blob.
1107 ///< if an uncompressed extent follows,
1108 ///< a decrement of the
1109 ///< expected_allocations counter
1110 ///< is needed
1111 int64_t expected_allocations = 0; ///< new alloc units required in case
1112 ///< of gc fulfilled
1113 int64_t expected_for_release = 0; ///< alloc units currently used by
1114 ///< compressed blobs that might
1115 ///< be gone after GC
1116
1117 protected:
1118 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1119 uint64_t start_offset,
1120 uint64_t end_offset,
1121 uint64_t start_touch_offset,
1122 uint64_t end_touch_offset,
1123 uint64_t min_alloc_size);
1124 };
1125
1126 struct OnodeSpace;
1127 /// an in-memory object
1128 struct Onode {
1129 MEMPOOL_CLASS_HELPERS();
1130
1131 std::atomic_int nref; ///< reference count
1132 std::atomic_int put_nref = {0};
1133 Collection *c;
1134 ghobject_t oid;
1135
1136 /// key under PREFIX_OBJ where we are stored
1137 mempool::bluestore_cache_meta::string key;
1138
1139 boost::intrusive::list_member_hook<> lru_item;
1140
1141 bluestore_onode_t onode; ///< metadata stored as value in kv store
1142 bool exists; ///< true if object logically exists
1143 bool cached; ///< Onode is logically in the cache
1144 /// (it can be pinned and hence physically out
1145 /// of it at the moment though)
1146 std::atomic_bool pinned; ///< Onode is pinned
1147 /// (or should be pinned when cached)
1148 ExtentMap extent_map;
1149
1150 // track txc's that have not been committed to kv store (and whose
1151 // effects cannot be read via the kvdb read methods)
1152 std::atomic<int> flushing_count = {0};
1153 std::atomic<int> waiting_count = {0};
1154 /// protect flush_txns
1155 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1156 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
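// A minimal sketch of the flush/wait idiom these members support (an
// assumption mirroring the usual mutex/condvar pattern; the actual logic
// lives in Onode::flush() in BlueStore.cc):
//   std::unique_lock l(flush_lock);
//   while (flushing_count.load()) {
//     ++waiting_count;
//     flush_cond.wait(l);
//     --waiting_count;
//   }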
1157 std::shared_ptr<int64_t> cache_age_bin; ///< cache age bin
1158
1159 Onode(Collection *c, const ghobject_t& o,
1160 const mempool::bluestore_cache_meta::string& k)
1161 : nref(0),
1162 c(c),
1163 oid(o),
1164 key(k),
1165 exists(false),
1166 cached(false),
1167 pinned(false),
1168 extent_map(this) {
1169 }
1170 Onode(Collection* c, const ghobject_t& o,
1171 const std::string& k)
1172 : nref(0),
1173 c(c),
1174 oid(o),
1175 key(k),
1176 exists(false),
1177 cached(false),
1178 pinned(false),
1179 extent_map(this) {
1180 }
1181 Onode(Collection* c, const ghobject_t& o,
1182 const char* k)
1183 : nref(0),
1184 c(c),
1185 oid(o),
1186 key(k),
1187 exists(false),
1188 cached(false),
1189 pinned(false),
1190 extent_map(this) {
1191 }
1192
1193 static Onode* decode(
1194 CollectionRef c,
1195 const ghobject_t& oid,
1196 const std::string& key,
1197 const ceph::buffer::list& v);
1198
1199 void dump(ceph::Formatter* f) const;
1200
1201 void flush();
1202 void get();
1203 void put();
1204
1205 inline bool put_cache() {
1206 ceph_assert(!cached);
1207 cached = true;
1208 return !pinned;
1209 }
1210 inline bool pop_cache() {
1211 ceph_assert(cached);
1212 cached = false;
1213 return !pinned;
1214 }
1215
1216 static const std::string& calc_omap_prefix(uint8_t flags);
1217 static void calc_omap_header(uint8_t flags, const Onode* o,
1218 std::string* out);
1219 static void calc_omap_key(uint8_t flags, const Onode* o,
1220 const std::string& key, std::string* out);
1221 static void calc_omap_tail(uint8_t flags, const Onode* o,
1222 std::string* out);
1223
1224 const std::string& get_omap_prefix() {
1225 return calc_omap_prefix(onode.flags);
1226 }
1227 void get_omap_header(std::string* out) {
1228 calc_omap_header(onode.flags, this, out);
1229 }
1230 void get_omap_key(const std::string& key, std::string* out) {
1231 calc_omap_key(onode.flags, this, key, out);
1232 }
1233 void get_omap_tail(std::string* out) {
1234 calc_omap_tail(onode.flags, this, out);
1235 }
1236
1237 void rewrite_omap_key(const std::string& old, std::string *out);
1238 void decode_omap_key(const std::string& key, std::string *user_key);
1239
1240 #ifdef HAVE_LIBZBD
1241 // Return the offset of an object on disk. This function is intended *only*
1242 // for use with zoned storage devices because in these devices, the objects
1243 // are laid out contiguously on disk, which is not the case in general.
1244 // Also, it should always be called after calling extent_map.fault_range(),
1245 // so that the extent map is loaded.
1246 int64_t zoned_get_ondisk_starting_offset() const {
1247 return extent_map.extent_map.begin()->blob->
1248 get_blob().calc_offset(0, nullptr);
1249 }
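// Illustrative usage sketch (an assumption, not a verbatim call site):
//   o->extent_map.fault_range(db, 0, 1);  // ensure the first extent is loaded
//   int64_t start = o->zoned_get_ondisk_starting_offset();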
1250 #endif
1251
1252 };
1253 typedef boost::intrusive_ptr<Onode> OnodeRef;
1254
1255 /// A generic Cache Shard
1256 struct CacheShard {
1257 CephContext *cct;
1258 PerfCounters *logger;
1259
1260 /// protect lru and other structures
1261 ceph::recursive_mutex lock = {
1262 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1263
1264 std::atomic<uint64_t> max = {0};
1265 std::atomic<uint64_t> num = {0};
1266 boost::circular_buffer<std::shared_ptr<int64_t>> age_bins;
1267
1268 CacheShard(CephContext* cct) : cct(cct), logger(nullptr), age_bins(1) {
1269 shift_bins();
1270 }
1271 virtual ~CacheShard() {}
1272
1273 void set_max(uint64_t max_) {
1274 max = max_;
1275 }
1276
1277 uint64_t _get_num() {
1278 return num;
1279 }
1280
1281 virtual void _trim_to(uint64_t new_size) = 0;
1282 void _trim() {
1283 if (cct->_conf->objectstore_blackhole) {
1284 // do not trim if we are throwing away IOs a layer down
1285 return;
1286 }
1287 _trim_to(max);
1288 }
1289
1290 void trim() {
1291 std::lock_guard l(lock);
1292 _trim();
1293 }
1294 void flush() {
1295 std::lock_guard l(lock);
1296 // we should not be shutting down after the blackhole is enabled
1297 ceph_assert(!cct->_conf->objectstore_blackhole);
1298 _trim_to(0);
1299 }
1300
1301 virtual void shift_bins() {
1302 std::lock_guard l(lock);
1303 age_bins.push_front(std::make_shared<int64_t>(0));
1304 }
1305 virtual uint32_t get_bin_count() {
1306 std::lock_guard l(lock);
1307 return age_bins.capacity();
1308 }
1309 virtual void set_bin_count(uint32_t count) {
1310 std::lock_guard l(lock);
1311 age_bins.set_capacity(count);
1312 }
1313 virtual uint64_t sum_bins(uint32_t start, uint32_t end) {
1314 std::lock_guard l(lock);
1315 auto size = age_bins.size();
1316 if (size < start) {
1317 return 0;
1318 }
1319 uint64_t count = 0;
1320 end = (size < end) ? size : end;
1321 for (auto i = start; i < end; i++) {
1322 count += *(age_bins[i]);
1323 }
1324 return count;
1325 }
1326
1327 #ifdef DEBUG_CACHE
1328 virtual void _audit(const char *s) = 0;
1329 #else
1330 void _audit(const char *s) { /* no-op */ }
1331 #endif
1332 };
1333
1334 /// A Generic onode Cache Shard
1335 struct OnodeCacheShard : public CacheShard {
1336 std::atomic<uint64_t> num_pinned = {0};
1337 std::array<std::pair<ghobject_t, ceph::mono_clock::time_point>, 64> dumped_onodes;
1338
1339 virtual void _pin(Onode* o) = 0;
1340 virtual void _unpin(Onode* o) = 0;
1341
1342 public:
1343 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1344 static OnodeCacheShard *create(CephContext* cct, std::string type,
1345 PerfCounters *logger);
1346 virtual void _add(Onode* o, int level) = 0;
1347 virtual void _rm(Onode* o) = 0;
1348 virtual void _unpin_and_rm(Onode* o) = 0;
1349
1350 virtual void move_pinned(OnodeCacheShard *to, Onode *o) = 0;
1351 virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
1352 bool empty() {
1353 return _get_num() == 0;
1354 }
1355 };
1356
1357 /// A Generic buffer Cache Shard
1358 struct BufferCacheShard : public CacheShard {
1359 std::atomic<uint64_t> num_extents = {0};
1360 std::atomic<uint64_t> num_blobs = {0};
1361 uint64_t buffer_bytes = 0;
1362
1363 public:
1364 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1365 static BufferCacheShard *create(CephContext* cct, std::string type,
1366 PerfCounters *logger);
1367 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1368 virtual void _rm(Buffer *b) = 0;
1369 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1370 virtual void _touch(Buffer *b) = 0;
1371 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1372
1373 uint64_t _get_bytes() {
1374 return buffer_bytes;
1375 }
1376
1377 void add_extent() {
1378 ++num_extents;
1379 }
1380 void rm_extent() {
1381 --num_extents;
1382 }
1383
1384 void add_blob() {
1385 ++num_blobs;
1386 }
1387 void rm_blob() {
1388 --num_blobs;
1389 }
1390
1391 virtual void add_stats(uint64_t *extents,
1392 uint64_t *blobs,
1393 uint64_t *buffers,
1394 uint64_t *bytes) = 0;
1395
1396 bool empty() {
1397 std::lock_guard l(lock);
1398 return _get_bytes() == 0;
1399 }
1400 };
1401
1402 struct OnodeSpace {
1403 OnodeCacheShard *cache;
1404
1405 private:
1406 /// forward lookups
1407 mempool::bluestore_cache_meta::unordered_map<ghobject_t,OnodeRef> onode_map;
1408
1409 friend struct Collection; // for split_cache()
1410 friend struct Onode; // for put()
1411 friend struct LruOnodeCacheShard;
1412 void _remove(const ghobject_t& oid);
1413 public:
1414 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1415 ~OnodeSpace() {
1416 clear();
1417 }
1418
1419 OnodeRef add(const ghobject_t& oid, OnodeRef& o);
1420 OnodeRef lookup(const ghobject_t& o);
1421 void rename(OnodeRef& o, const ghobject_t& old_oid,
1422 const ghobject_t& new_oid,
1423 const mempool::bluestore_cache_meta::string& new_okey);
1424 void clear();
1425 bool empty();
1426
1427 template <int LogLevelV>
1428 void dump(CephContext *cct);
1429
1430 /// return true if f returns true for any item
1431 bool map_any(std::function<bool(Onode*)> f);
1432 };
1433
1434 class OpSequencer;
1435 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1436
1437 struct Collection : public CollectionImpl {
1438 BlueStore *store;
1439 OpSequencerRef osr;
1440 BufferCacheShard *cache; ///< our cache shard
1441 bluestore_cnode_t cnode;
1442 ceph::shared_mutex lock =
1443 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1444
1445 bool exists;
1446
1447 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1448
1449 // cache onodes on a per-collection basis to avoid lock
1450 // contention.
1451 OnodeSpace onode_map;
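// Illustrative sketch (an assumption): lookups such as get_onode() below
// consult this per-collection map first and only fall back to the kv store
// on a miss, e.g. (hypothetical call, not a verbatim call site):
//   OnodeRef o = get_onode(oid, false /* create */);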
1452
1453 //pool options
1454 pool_opts_t pool_opts;
1455 ContextQueue *commit_queue;
1456
1457 OnodeCacheShard* get_onode_cache() const {
1458 return onode_map.cache;
1459 }
1460 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1461
1462 // the terminology is confusing here, sorry!
1463 //
1464 // blob_t shared_blob_t
1465 // !shared unused -> open
1466 // shared !loaded -> open + shared
1467 // shared loaded -> open + shared + loaded
1468 //
1469 // i.e.,
1470 // open = SharedBlob is instantiated
1471 // shared = blob_t shared flag is set; SharedBlob is hashed.
1472 // loaded = SharedBlob::shared_blob_t is loaded from kv store
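// Illustrative sketch (an assumption about the typical sequence, not a
// verbatim call site) of how the helpers below move a blob through those
// states:
//   make_blob_shared(sbid, b);                 // open + shared
//   load_shared_blob(b->shared_blob);          // open + shared + loaded
//   make_blob_unshared(b->shared_blob.get());  // back to a plain open blob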
1473 void open_shared_blob(uint64_t sbid, BlobRef b);
1474 void load_shared_blob(SharedBlobRef sb);
1475 void make_blob_shared(uint64_t sbid, BlobRef b);
1476 uint64_t make_blob_unshared(SharedBlob *sb);
1477
1478 BlobRef new_blob() {
1479 BlobRef b = new Blob();
1480 b->shared_blob = new SharedBlob(this);
1481 return b;
1482 }
1483
1484 bool contains(const ghobject_t& oid) {
1485 if (cid.is_meta())
1486 return oid.hobj.pool == -1;
1487 spg_t spgid;
1488 if (cid.is_pg(&spgid))
1489 return
1490 spgid.pgid.contains(cnode.bits, oid) &&
1491 oid.shard_id == spgid.shard;
1492 return false;
1493 }
1494
1495 int64_t pool() const {
1496 return cid.pool();
1497 }
1498
1499 void split_cache(Collection *dest);
1500
1501 bool flush_commit(Context *c) override;
1502 void flush() override;
1503 void flush_all_but_last();
1504
1505 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1506 };
1507
1508 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1509 CollectionRef c;
1510 OnodeRef o;
1511 KeyValueDB::Iterator it;
1512 std::string head, tail;
1513
1514 std::string _stringify() const;
1515
1516 public:
1517 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1518 int seek_to_first() override;
1519 int upper_bound(const std::string &after) override;
1520 int lower_bound(const std::string &to) override;
1521 bool valid() override;
1522 int next() override;
1523 std::string key() override;
1524 ceph::buffer::list value() override;
1525 std::string tail_key() override {
1526 return tail;
1527 }
1528
1529 int status() override {
1530 return 0;
1531 }
1532 };
1533
1534 struct volatile_statfs{
1535 enum {
1536 STATFS_ALLOCATED = 0,
1537 STATFS_STORED,
1538 STATFS_COMPRESSED_ORIGINAL,
1539 STATFS_COMPRESSED,
1540 STATFS_COMPRESSED_ALLOCATED,
1541 STATFS_LAST
1542 };
1543 int64_t values[STATFS_LAST];
1544 volatile_statfs() {
1545 memset(this, 0, sizeof(volatile_statfs));
1546 }
1547 void reset() {
1548 *this = volatile_statfs();
1549 }
1550 void publish(store_statfs_t* buf) const {
1551 buf->allocated = allocated();
1552 buf->data_stored = stored();
1553 buf->data_compressed = compressed();
1554 buf->data_compressed_original = compressed_original();
1555 buf->data_compressed_allocated = compressed_allocated();
1556 }
1557
1558 volatile_statfs& operator+=(const volatile_statfs& other) {
1559 for (size_t i = 0; i < STATFS_LAST; ++i) {
1560 values[i] += other.values[i];
1561 }
1562 return *this;
1563 }
1564 int64_t& allocated() {
1565 return values[STATFS_ALLOCATED];
1566 }
1567 int64_t& stored() {
1568 return values[STATFS_STORED];
1569 }
1570 int64_t& compressed_original() {
1571 return values[STATFS_COMPRESSED_ORIGINAL];
1572 }
1573 int64_t& compressed() {
1574 return values[STATFS_COMPRESSED];
1575 }
1576 int64_t& compressed_allocated() {
1577 return values[STATFS_COMPRESSED_ALLOCATED];
1578 }
1579 int64_t allocated() const {
1580 return values[STATFS_ALLOCATED];
1581 }
1582 int64_t stored() const {
1583 return values[STATFS_STORED];
1584 }
1585 int64_t compressed_original() const {
1586 return values[STATFS_COMPRESSED_ORIGINAL];
1587 }
1588 int64_t compressed() const {
1589 return values[STATFS_COMPRESSED];
1590 }
1591 int64_t compressed_allocated() const {
1592 return values[STATFS_COMPRESSED_ALLOCATED];
1593 }
1594 volatile_statfs& operator=(const store_statfs_t& st) {
1595 values[STATFS_ALLOCATED] = st.allocated;
1596 values[STATFS_STORED] = st.data_stored;
1597 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1598 values[STATFS_COMPRESSED] = st.data_compressed;
1599 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1600 return *this;
1601 }
1602 bool is_empty() {
1603 return values[STATFS_ALLOCATED] == 0 &&
1604 values[STATFS_STORED] == 0 &&
1605 values[STATFS_COMPRESSED] == 0 &&
1606 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1607 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1608 }
1609 void decode(ceph::buffer::list::const_iterator& it) {
1610 using ceph::decode;
1611 for (size_t i = 0; i < STATFS_LAST; i++) {
1612 decode(values[i], it);
1613 }
1614 }
1615
1616 void encode(ceph::buffer::list& bl) {
1617 using ceph::encode;
1618 for (size_t i = 0; i < STATFS_LAST; i++) {
1619 encode(values[i], bl);
1620 }
1621 }
1622 };
1623
1624 struct TransContext final : public AioContext {
1625 MEMPOOL_CLASS_HELPERS();
1626
1627 typedef enum {
1628 STATE_PREPARE,
1629 STATE_AIO_WAIT,
1630 STATE_IO_DONE,
1631 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1632 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1633 STATE_KV_DONE,
1634 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1635 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1636 STATE_DEFERRED_DONE,
1637 STATE_FINISHING,
1638 STATE_DONE,
1639 } state_t;
1640
1641 const char *get_state_name() {
1642 switch (state) {
1643 case STATE_PREPARE: return "prepare";
1644 case STATE_AIO_WAIT: return "aio_wait";
1645 case STATE_IO_DONE: return "io_done";
1646 case STATE_KV_QUEUED: return "kv_queued";
1647 case STATE_KV_SUBMITTED: return "kv_submitted";
1648 case STATE_KV_DONE: return "kv_done";
1649 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1650 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1651 case STATE_DEFERRED_DONE: return "deferred_done";
1652 case STATE_FINISHING: return "finishing";
1653 case STATE_DONE: return "done";
1654 }
1655 return "???";
1656 }
1657
1658 #if defined(WITH_LTTNG)
1659 const char *get_state_latency_name(int state) {
1660 switch (state) {
1661 case l_bluestore_state_prepare_lat: return "prepare";
1662 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1663 case l_bluestore_state_io_done_lat: return "io_done";
1664 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1665 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1666 case l_bluestore_state_kv_done_lat: return "kv_done";
1667 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1668 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1669 case l_bluestore_state_finishing_lat: return "finishing";
1670 case l_bluestore_state_done_lat: return "done";
1671 }
1672 return "???";
1673 }
1674 #endif
1675
1676 inline void set_state(state_t s) {
1677 state = s;
1678 #ifdef WITH_BLKIN
1679 if (trace) {
1680 trace.event(get_state_name());
1681 }
1682 #endif
1683 }
1684 inline state_t get_state() {
1685 return state;
1686 }
1687
1688 CollectionRef ch;
1689 OpSequencerRef osr; // this should be ch->osr
1690 boost::intrusive::list_member_hook<> sequencer_item;
1691
1692 uint64_t bytes = 0, ios = 0, cost = 0;
1693
1694 std::set<OnodeRef> onodes; ///< these need to be updated/written
1695 std::set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1696
1697 #ifdef HAVE_LIBZBD
1698 // zone refs to add/remove. each zone ref is a (zone, offset) tuple. The offset
1699 // is the first offset in the zone that the onode touched; subsequent writes
1700 // to that zone do not generate additional refs. This is a bit imprecise but
1701 // is sufficient to generate reasonably sequential reads when doing zone
1702 // cleaning with less metadata than a ref for every extent.
1703 std::map<std::pair<OnodeRef, uint32_t>, uint64_t> new_zone_offset_refs;
1704 std::map<std::pair<OnodeRef, uint32_t>, uint64_t> old_zone_offset_refs;
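// Illustrative sketch (an assumption, not a verbatim call site): the first
// write an onode makes into a zone records one ref; later writes to the same
// zone do not add more:
//   txc->note_write_zone_offset(o, zone, offset);    // fills new_zone_offset_refs
//   txc->note_release_zone_offset(o, zone, offset);  // fills old_zone_offset_refs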
1705 #endif
1706
1707 std::set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1708 std::set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1709
1710 KeyValueDB::Transaction t; ///< then we will commit this
1711 std::list<Context*> oncommits; ///< more commit completions
1712 std::list<CollectionRef> removed_collections; ///< colls we removed
1713
1714 boost::intrusive::list_member_hook<> deferred_queue_item;
1715 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1716
1717 interval_set<uint64_t> allocated, released;
1718 volatile_statfs statfs_delta; ///< overall store statistics delta
1719 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1720
1721 IOContext ioc;
1722 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1723
1724 uint64_t seq = 0;
1725 ceph::mono_clock::time_point start;
1726 ceph::mono_clock::time_point last_stamp;
1727
1728 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1729 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1730
1731 #if defined(WITH_LTTNG)
1732 bool tracing = false;
1733 #endif
1734
1735 #ifdef WITH_BLKIN
1736 ZTracer::Trace trace;
1737 #endif
1738
1739 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1740 std::list<Context*> *on_commits)
1741 : ch(c),
1742 osr(o),
1743 ioc(cct, this),
1744 start(ceph::mono_clock::now()) {
1745 last_stamp = start;
1746 if (on_commits) {
1747 oncommits.swap(*on_commits);
1748 }
1749 }
1750 ~TransContext() {
1751 #ifdef WITH_BLKIN
1752 if (trace) {
1753 trace.event("txc destruct");
1754 }
1755 #endif
1756 delete deferred_txn;
1757 }
1758
1759 void write_onode(OnodeRef &o) {
1760 onodes.insert(o);
1761 }
1762 void write_shared_blob(SharedBlobRef &sb) {
1763 shared_blobs.insert(sb);
1764 }
1765 void unshare_blob(SharedBlob *sb) {
1766 shared_blobs.erase(sb);
1767 }
1768
1769 /// note that we logically modified the object (while the onode itself is unmodified)
1770 void note_modified_object(OnodeRef &o) {
1771 // onode itself isn't written, though
1772 modified_objects.insert(o);
1773 }
1774 void note_removed_object(OnodeRef& o) {
1775 modified_objects.insert(o);
1776 onodes.erase(o);
1777 }
1778
1779 #ifdef HAVE_LIBZBD
1780 void note_write_zone_offset(OnodeRef& o, uint32_t zone, uint64_t offset) {
1781 o->onode.zone_offset_refs[zone] = offset;
1782 new_zone_offset_refs[std::make_pair(o, zone)] = offset;
1783 }
1784 void note_release_zone_offset(OnodeRef& o, uint32_t zone, uint64_t offset) {
1785 old_zone_offset_refs[std::make_pair(o, zone)] = offset;
1786 o->onode.zone_offset_refs.erase(zone);
1787 }
1788 #endif
1789
1790 void aio_finish(BlueStore *store) override {
1791 store->txc_aio_finish(this);
1792 }
1793 private:
1794 state_t state = STATE_PREPARE;
1795 };
1796
1797 class BlueStoreThrottle {
1798 #if defined(WITH_LTTNG)
1799 const std::chrono::time_point<ceph::mono_clock> time_base = ceph::mono_clock::now();
1800
1801 // Time of last chosen io (microseconds)
1802 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1803 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1804 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1805
1806 std::atomic_uint pending_kv_ios = {0};
1807 std::atomic_uint pending_deferred_ios = {0};
1808
1809 // Min period between trace points (microseconds)
1810 std::atomic<uint64_t> trace_period_mcs = {0};
1811
1812 bool should_trace(
1813 uint64_t *started,
1814 uint64_t *completed) {
1815 uint64_t min_period_mcs = trace_period_mcs.load(
1816 std::memory_order_relaxed);
1817
1818 if (min_period_mcs == 0) {
1819 *started = 1;
1820 *completed = ios_completed_since_last_traced.exchange(0);
1821 return true;
1822 } else {
1823 ios_started_since_last_traced++;
1824 auto now_mcs = ceph::to_microseconds<uint64_t>(
1825 ceph::mono_clock::now() - time_base);
1826 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1827 uint64_t period_mcs = now_mcs - previous_mcs;
1828 if (period_mcs > min_period_mcs) {
1829 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1830 previous_mcs, now_mcs)) {
1831 // This would be racy at a sufficiently extreme trace rate, but isn't
1832 // worth the overhead of doing it more carefully.
1833 *started = ios_started_since_last_traced.exchange(0);
1834 *completed = ios_completed_since_last_traced.exchange(0);
1835 return true;
1836 }
1837 }
1838 return false;
1839 }
1840 }
1841 #endif
1842
1843 #if defined(WITH_LTTNG)
1844 void emit_initial_tracepoint(
1845 KeyValueDB &db,
1846 TransContext &txc,
1847 ceph::mono_clock::time_point);
1848 #else
1849 void emit_initial_tracepoint(
1850 KeyValueDB &db,
1851 TransContext &txc,
1852 ceph::mono_clock::time_point) {}
1853 #endif
1854
1855 Throttle throttle_bytes; ///< submit to commit
1856 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1857
1858 public:
1859 BlueStoreThrottle(CephContext *cct) :
1860 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1861 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1862 {
1863 reset_throttle(cct->_conf);
1864 }
1865
1866 #if defined(WITH_LTTNG)
1867 void complete_kv(TransContext &txc);
1868 void complete(TransContext &txc);
1869 #else
1870 void complete_kv(TransContext &txc) {}
1871 void complete(TransContext &txc) {}
1872 #endif
1873
1874 ceph::mono_clock::duration log_state_latency(
1875 TransContext &txc, PerfCounters *logger, int state);
1876 bool try_start_transaction(
1877 KeyValueDB &db,
1878 TransContext &txc,
1879 ceph::mono_clock::time_point);
1880 void finish_start_transaction(
1881 KeyValueDB &db,
1882 TransContext &txc,
1883 ceph::mono_clock::time_point);
1884 void release_kv_throttle(uint64_t cost) {
1885 throttle_bytes.put(cost);
1886 }
1887 void release_deferred_throttle(uint64_t cost) {
1888 throttle_deferred_bytes.put(cost);
1889 }
1890 bool should_submit_deferred() {
1891 return throttle_deferred_bytes.past_midpoint();
1892 }
1893 void reset_throttle(const ConfigProxy &conf) {
1894 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1895 throttle_deferred_bytes.reset_max(
1896 conf->bluestore_throttle_bytes +
1897 conf->bluestore_throttle_deferred_bytes);
1898 #if defined(WITH_LTTNG)
1899 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1900 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1901 #endif
1902 }
1903 } throttle;
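// Illustrative sketch of how the throttle brackets a transaction (simplified;
// `db` and `txc` are assumed to be in scope, and `cost` stands for the value
// computed by _txc_calc_cost):
//
//   auto start = ceph::mono_clock::now();
//   if (!throttle.try_start_transaction(*db, *txc, start)) {
//     // over budget: give deferred io a chance to drain, then block
//     throttle.finish_start_transaction(*db, *txc, start);
//   }
//   ...
//   throttle.release_kv_throttle(cost);        // once the kv commit lands
//   throttle.release_deferred_throttle(cost);  // once deferred io is cleaned up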
1904
1905 typedef boost::intrusive::list<
1906 TransContext,
1907 boost::intrusive::member_hook<
1908 TransContext,
1909 boost::intrusive::list_member_hook<>,
1910 &TransContext::deferred_queue_item> > deferred_queue_t;
1911
1912 struct DeferredBatch final : public AioContext {
1913 OpSequencer *osr;
1914 struct deferred_io {
1915 ceph::buffer::list bl; ///< data
1916 uint64_t seq; ///< deferred transaction seq
1917 };
1918 std::map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1919 deferred_queue_t txcs; ///< txcs in this batch
1920 IOContext ioc; ///< our aios
1921 /// bytes of pending io for each deferred seq (may be 0)
1922 std::map<uint64_t,int> seq_bytes;
1923
1924 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1925 void _audit(CephContext *cct);
1926
1927 DeferredBatch(CephContext *cct, OpSequencer *osr)
1928 : osr(osr), ioc(cct, this) {}
1929
1930 /// prepare a write
1931 void prepare_write(CephContext *cct,
1932 uint64_t seq, uint64_t offset, uint64_t length,
1933 ceph::buffer::list::const_iterator& p);
1934
1935 void aio_finish(BlueStore *store) override {
1936 store->_deferred_aio_finish(osr);
1937 }
1938 };
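// Illustrative sketch: folding one deferred write into a batch (simplified;
// `cct`, `osr`, `seq`, `offset` and `bl` are assumptions for the example):
//
//   DeferredBatch *b = new DeferredBatch(cct, osr);
//   auto p = bl.cbegin();
//   b->prepare_write(cct, seq, offset, bl.length(), p);
//   // b->iomap now holds an entry keyed by `offset`, and b->seq_bytes[seq]
//   // accounts the pending bytes for that deferred transaction seq.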
1939
1940 class OpSequencer : public RefCountedObject {
1941 public:
1942 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1943 ceph::condition_variable qcond;
1944 typedef boost::intrusive::list<
1945 TransContext,
1946 boost::intrusive::member_hook<
1947 TransContext,
1948 boost::intrusive::list_member_hook<>,
1949 &TransContext::sequencer_item> > q_list_t;
1950 q_list_t q; ///< transactions
1951
1952 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1953
1954 DeferredBatch *deferred_running = nullptr;
1955 DeferredBatch *deferred_pending = nullptr;
1956
1957 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::OpSequencer::deferred_lock");
1958
1959 BlueStore *store;
1960 coll_t cid;
1961
1962 uint64_t last_seq = 0;
1963
1964 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1965
1966 std::atomic_int kv_committing_serially = {0};
1967
1968 std::atomic_int kv_submitted_waiters = {0};
1969
1970 std::atomic_bool zombie = {false}; ///< in zombie_osr_set (collection going away)
1971
1972 const uint32_t sequencer_id;
1973
1974 uint32_t get_sequencer_id() const {
1975 return sequencer_id;
1976 }
1977
1978 void queue_new(TransContext *txc) {
1979 std::lock_guard l(qlock);
1980 txc->seq = ++last_seq;
1981 q.push_back(*txc);
1982 }
1983
1984 void drain() {
1985 std::unique_lock l(qlock);
1986 while (!q.empty())
1987 qcond.wait(l);
1988 }
1989
1990 void drain_preceding(TransContext *txc) {
1991 std::unique_lock l(qlock);
1992 while (&q.front() != txc)
1993 qcond.wait(l);
1994 }
1995
1996 bool _is_all_kv_submitted() {
1997 // caller must hold qlock; q must not be empty
1998 ceph_assert(!q.empty());
1999 TransContext *txc = &q.back();
2000 if (txc->get_state() >= TransContext::STATE_KV_SUBMITTED) {
2001 return true;
2002 }
2003 return false;
2004 }
2005
2006 void flush() {
2007 std::unique_lock l(qlock);
2008 while (true) {
2009 // set the flag before the check because the condition
2010 // may become true outside qlock, and we need to make
2011 // sure those threads see waiters and signal qcond.
2012 ++kv_submitted_waiters;
2013 if (q.empty() || _is_all_kv_submitted()) {
2014 --kv_submitted_waiters;
2015 return;
2016 }
2017 qcond.wait(l);
2018 --kv_submitted_waiters;
2019 }
2020 }
2021
2022 void flush_all_but_last() {
2023 std::unique_lock l(qlock);
2024 ceph_assert(q.size() >= 1);
2025 while (true) {
2026 // set the flag before the check because the condition
2027 // may become true outside qlock, and we need to make
2028 // sure those threads see waiters and signal qcond.
2029 ++kv_submitted_waiters;
2030 if (q.size() <= 1) {
2031 --kv_submitted_waiters;
2032 return;
2033 } else {
2034 auto it = q.rbegin();
2035 it++;
2036 if (it->get_state() >= TransContext::STATE_KV_SUBMITTED) {
2037 --kv_submitted_waiters;
2038 return;
2039 }
2040 }
2041 qcond.wait(l);
2042 --kv_submitted_waiters;
2043 }
2044 }
2045
2046 bool flush_commit(Context *c) {
2047 std::lock_guard l(qlock);
2048 if (q.empty()) {
2049 return true;
2050 }
2051 TransContext *txc = &q.back();
2052 if (txc->get_state() >= TransContext::STATE_KV_DONE) {
2053 return true;
2054 }
2055 txc->oncommits.push_back(c);
2056 return false;
2057 }
2058 private:
2059 FRIEND_MAKE_REF(OpSequencer);
2060 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
2061 : RefCountedObject(store->cct),
2062 store(store), cid(c), sequencer_id(sequencer_id) {
2063 }
2064 ~OpSequencer() {
2065 ceph_assert(q.empty());
2066 }
2067 };
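// Illustrative summary of the OpSequencer waiting primitives (simplified):
//
//   osr->queue_new(txc);        // assigns txc->seq and appends to q under qlock
//   osr->flush();               // wait until all queued txcs reach
//                               //   STATE_KV_SUBMITTED (or q is empty)
//   osr->drain_preceding(txc);  // wait until everything queued before txc is gone
//   osr->drain();               // wait until q is completely empty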
2068
2069 typedef boost::intrusive::list<
2070 OpSequencer,
2071 boost::intrusive::member_hook<
2072 OpSequencer,
2073 boost::intrusive::list_member_hook<>,
2074 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
2075
2076 struct KVSyncThread : public Thread {
2077 BlueStore *store;
2078 explicit KVSyncThread(BlueStore *s) : store(s) {}
2079 void *entry() override {
2080 store->_kv_sync_thread();
2081 return NULL;
2082 }
2083 };
2084 struct KVFinalizeThread : public Thread {
2085 BlueStore *store;
2086 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
2087 void *entry() override {
2088 store->_kv_finalize_thread();
2089 return NULL;
2090 }
2091 };
2092
2093 #ifdef HAVE_LIBZBD
2094 struct ZonedCleanerThread : public Thread {
2095 BlueStore *store;
2096 explicit ZonedCleanerThread(BlueStore *s) : store(s) {}
2097 void *entry() override {
2098 store->_zoned_cleaner_thread();
2099 return nullptr;
2100 }
2101 };
2102 #endif
2103
2104 struct BigDeferredWriteContext {
2105 uint64_t off = 0; // original logical offset
2106 uint32_t b_off = 0; // blob relative offset
2107 uint32_t used = 0;
2108 uint64_t head_read = 0;
2109 uint64_t tail_read = 0;
2110 BlobRef blob_ref;
2111 uint64_t blob_start = 0;
2112 PExtentVector res_extents;
2113
2114 inline uint64_t blob_aligned_len() const {
2115 return used + head_read + tail_read;
2116 }
2117
2118 bool can_defer(BlueStore::extent_map_t::iterator ep,
2119 uint64_t prefer_deferred_size,
2120 uint64_t block_size,
2121 uint64_t offset,
2122 uint64_t l);
2123 bool apply_defer();
2124 };
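// Illustrative arithmetic for blob_aligned_len(): the deferred overwrite has
// to cover the touched bytes plus the head/tail read back for block
// alignment.  With the example values used=0x800, head_read=0x800 and
// tail_read=0x1000 (assumptions, not defaults), blob_aligned_len() = 0x2000.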
2125
2126 bool has_null_fm();
2127 // --------------------------------------------------------
2128 // members
2129 private:
2130 BlueFS *bluefs = nullptr;
2131 bluefs_layout_t bluefs_layout;
2132 utime_t next_dump_on_bluefs_alloc_failure;
2133
2134 KeyValueDB *db = nullptr;
2135 BlockDevice *bdev = nullptr;
2136 std::string freelist_type;
2137 FreelistManager *fm = nullptr;
2138
2139 Allocator *alloc = nullptr; ///< allocator consumed by BlueStore
2140 bluefs_shared_alloc_context_t shared_alloc; ///< consumed by BlueFS (may be == alloc)
2141
2142 uuid_d fsid;
2143 int path_fd = -1; ///< open handle to $path
2144 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
2145 bool mounted = false;
2146
2147 // store open_db options:
2148 bool db_was_opened_read_only = true;
2149 bool need_to_destage_allocation_file = false;
2150
2151 ///< rwlock to protect coll_map/new_coll_map
2152 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock");
2153 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
2154 bool collections_had_errors = false;
2155 std::map<coll_t,CollectionRef> new_coll_map;
2156
2157 std::vector<OnodeCacheShard*> onode_cache_shards;
2158 std::vector<BufferCacheShard*> buffer_cache_shards;
2159
2160 /// protect zombie_osr_set
2161 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
2162 uint32_t next_sequencer_id = 0;
2163 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
2164
2165 std::atomic<uint64_t> nid_last = {0};
2166 std::atomic<uint64_t> nid_max = {0};
2167 std::atomic<uint64_t> blobid_last = {0};
2168 std::atomic<uint64_t> blobid_max = {0};
2169
2170 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
2171 ceph::mutex atomic_alloc_and_submit_lock =
2172 ceph::make_mutex("BlueStore::atomic_alloc_and_submit_lock");
2173 std::atomic<uint64_t> deferred_seq = {0};
2174 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
2175 std::atomic_int deferred_queue_size = {0}; ///< num txc's queued across all osrs
2176 std::atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
2177 Finisher finisher;
2178 utime_t deferred_last_submitted = utime_t();
2179
2180 KVSyncThread kv_sync_thread;
2181 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
2182 ceph::condition_variable kv_cond;
2183 bool _kv_only = false;
2184 bool kv_sync_started = false;
2185 bool kv_stop = false;
2186 bool kv_finalize_started = false;
2187 bool kv_finalize_stop = false;
2188 std::deque<TransContext*> kv_queue; ///< ready, already submitted
2189 std::deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
2190 std::deque<TransContext*> kv_committing; ///< currently syncing
2191 std::deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
2192 bool kv_sync_in_progress = false;
2193
2194 KVFinalizeThread kv_finalize_thread;
2195 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
2196 ceph::condition_variable kv_finalize_cond;
2197 std::deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
2198 std::deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
2199 bool kv_finalize_in_progress = false;
2200
2201 #ifdef HAVE_LIBZBD
2202 ZonedCleanerThread zoned_cleaner_thread;
2203 ceph::mutex zoned_cleaner_lock = ceph::make_mutex("BlueStore::zoned_cleaner_lock");
2204 ceph::condition_variable zoned_cleaner_cond;
2205 bool zoned_cleaner_started = false;
2206 bool zoned_cleaner_stop = false;
2207 std::deque<uint64_t> zoned_cleaner_queue;
2208 #endif
2209
2210 PerfCounters *logger = nullptr;
2211
2212 std::list<CollectionRef> removed_collections;
2213
2214 ceph::shared_mutex debug_read_error_lock =
2215 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
2216 std::set<ghobject_t> debug_data_error_objects;
2217 std::set<ghobject_t> debug_mdata_error_objects;
2218
2219 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
2220
2221 uint64_t block_size = 0; ///< block size of block device (power of 2)
2222 uint64_t block_mask = 0; ///< mask to get just the block offset
2223 size_t block_size_order = 0; ///< bits to shift to get block size
2224 uint64_t optimal_io_size = 0;///< best performance io size for block device
2225
2226 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
2227 uint8_t min_alloc_size_order = 0;///< bits to shift to get min_alloc_size
2228 uint64_t min_alloc_size_mask;///< mask for fast checking of allocation alignment
2229 static_assert(std::numeric_limits<uint8_t>::max() >
2230 std::numeric_limits<decltype(min_alloc_size)>::digits,
2231 "not enough bits for min_alloc_size");
2232
2233 // smr-only
2234 uint64_t zone_size = 0; ///< size of an SMR zone
2235 uint64_t first_sequential_zone = 0; ///< first SMR zone that is sequential-only
2236
2237 enum {
2238 // Please preserve the order since these values are persisted in the DB
2239 OMAP_BULK = 0,
2240 OMAP_PER_POOL = 1,
2241 OMAP_PER_PG = 2,
2242 } per_pool_omap = OMAP_BULK;
2243
2244 ///< maximum allocation unit (power of 2)
2245 std::atomic<uint64_t> max_alloc_size = {0};
2246
2247 ///< number threshold for forced deferred writes
2248 std::atomic<int> deferred_batch_ops = {0};
2249
2250 ///< size threshold for forced deferred writes
2251 std::atomic<uint64_t> prefer_deferred_size = {0};
2252
2253 ///< approx cost per io, in bytes
2254 std::atomic<uint64_t> throttle_cost_per_io = {0};
2255
2256 std::atomic<Compressor::CompressionMode> comp_mode =
2257 {Compressor::COMP_NONE}; ///< compression mode
2258 CompressorRef compressor;
2259 std::atomic<uint64_t> comp_min_blob_size = {0};
2260 std::atomic<uint64_t> comp_max_blob_size = {0};
2261
2262 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2263
2264 uint64_t kv_ios = 0;
2265 uint64_t kv_throttle_costs = 0;
2266
2267 // cache trim control
2268 uint64_t cache_size = 0; ///< total cache size
2269 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2270 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2271 double cache_kv_onode_ratio = 0; ///< cache ratio dedicated to kv onodes (e.g., rocksdb onode CF)
2272 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2273 bool cache_autotune = false; ///< cache autotune setting
2274 double cache_age_bin_interval = 0; ///< time to wait between cache age bin rotations
2275 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2276 std::vector<uint64_t> kv_bins; ///< kv autotune bins
2277 std::vector<uint64_t> kv_onode_bins; ///< kv onode autotune bins
2278 std::vector<uint64_t> meta_bins; ///< meta autotune bins
2279 std::vector<uint64_t> data_bins; ///< data autotune bins
2280 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2281 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2282 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2283 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2284 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2285 double max_defer_interval = 0; ///< Max time since the last deferred submit before forcing one
2286 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2287
2288 typedef std::map<uint64_t, volatile_statfs> osd_pools_map;
2289
2290 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2291 volatile_statfs vstatfs;
2292 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2293
2294 bool per_pool_stat_collection = true;
2295
2296 struct MempoolThread : public Thread {
2297 public:
2298 BlueStore *store;
2299
2300 ceph::condition_variable cond;
2301 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2302 bool stop = false;
2303 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2304 std::shared_ptr<PriorityCache::PriCache> binned_kv_onode_cache = nullptr;
2305 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2306
2307 struct MempoolCache : public PriorityCache::PriCache {
2308 BlueStore *store;
2309 uint64_t bins[PriorityCache::Priority::LAST+1] = {0};
2310 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2311 int64_t committed_bytes = 0;
2312 double cache_ratio = 0;
2313
2314 MempoolCache(BlueStore *s) : store(s) {};
2315
2316 virtual uint64_t _get_used_bytes() const = 0;
2317 virtual uint64_t _sum_bins(uint32_t start, uint32_t end) const = 0;
2318
2319 virtual int64_t request_cache_bytes(
2320 PriorityCache::Priority pri, uint64_t total_cache) const {
2321 int64_t assigned = get_cache_bytes(pri);
2322
2323 switch (pri) {
2324 case PriorityCache::Priority::PRI0:
2325 {
2326 // BlueStore caches currently don't put anything in PRI0
2327 break;
2328 }
2329 case PriorityCache::Priority::LAST:
2330 {
2331 uint32_t max = get_bin_count();
2332 int64_t request = _get_used_bytes() - _sum_bins(0, max);
2333 return (request > assigned) ? request - assigned : 0;
2334 }
2335 default:
2336 {
2337 ceph_assert(pri > 0 && pri < PriorityCache::Priority::LAST);
2338 auto prev_pri = static_cast<PriorityCache::Priority>(pri - 1);
2339 uint64_t start = get_bins(prev_pri);
2340 uint64_t end = get_bins(pri);
2341 int64_t request = _sum_bins(start, end);
2342 return (request > assigned) ? request - assigned : 0;
2343 }
2344 }
2345 return -EOPNOTSUPP;
2346 }
2347
2348 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2349 return cache_bytes[pri];
2350 }
2351 virtual int64_t get_cache_bytes() const {
2352 int64_t total = 0;
2353
2354 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2355 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2356 total += get_cache_bytes(pri);
2357 }
2358 return total;
2359 }
2360 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2361 cache_bytes[pri] = bytes;
2362 }
2363 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2364 cache_bytes[pri] += bytes;
2365 }
2366 virtual int64_t commit_cache_size(uint64_t total_cache) {
2367 committed_bytes = PriorityCache::get_chunk(
2368 get_cache_bytes(), total_cache);
2369 return committed_bytes;
2370 }
2371 virtual int64_t get_committed_size() const {
2372 return committed_bytes;
2373 }
2374 virtual uint64_t get_bins(PriorityCache::Priority pri) const {
2375 if (pri > PriorityCache::Priority::PRI0 &&
2376 pri < PriorityCache::Priority::LAST) {
2377 return bins[pri];
2378 }
2379 return 0;
2380 }
2381 virtual void set_bins(PriorityCache::Priority pri, uint64_t end_bin) {
2382 if (pri <= PriorityCache::Priority::PRI0 ||
2383 pri >= PriorityCache::Priority::LAST) {
2384 return;
2385 }
2386 bins[pri] = end_bin;
2387 uint64_t max = 0;
2388 for (int pri = 1; pri < PriorityCache::Priority::LAST; pri++) {
2389 if (bins[pri] > max) {
2390 max = bins[pri];
2391 }
2392 }
2393 set_bin_count(max);
2394 }
2395 virtual void import_bins(const std::vector<uint64_t> &bins_v) {
2396 uint64_t max = 0;
2397 for (int pri = 1; pri < PriorityCache::Priority::LAST; pri++) {
2398 unsigned i = (unsigned) pri - 1;
2399 if (i < bins_v.size()) {
2400 bins[pri] = bins_v[i];
2401 if (bins[pri] > max) {
2402 max = bins[pri];
2403 }
2404 } else {
2405 bins[pri] = 0;
2406 }
2407 }
2408 set_bin_count(max);
2409 }
2410 virtual double get_cache_ratio() const {
2411 return cache_ratio;
2412 }
2413 virtual void set_cache_ratio(double ratio) {
2414 cache_ratio = ratio;
2415 }
2416 virtual std::string get_cache_name() const = 0;
2417 virtual uint32_t get_bin_count() const = 0;
2418 virtual void set_bin_count(uint32_t count) = 0;
2419 };
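// Illustrative example of the bin math in request_cache_bytes() (numbers are
// invented): with get_bins(PRI1) == 2 and get_bins(PRI2) == 5, the PRI2
// request is _sum_bins(2, 5), i.e. whatever aged into bins 2..4, minus the
// bytes already assigned at PRI2.  Priority::LAST asks for the remainder:
//
//   int64_t leftover = _get_used_bytes() - _sum_bins(0, get_bin_count());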
2420
2421 struct MetaCache : public MempoolCache {
2422 MetaCache(BlueStore *s) : MempoolCache(s) {};
2423
2424 virtual uint32_t get_bin_count() const {
2425 return store->onode_cache_shards[0]->get_bin_count();
2426 }
2427 virtual void set_bin_count(uint32_t count) {
2428 for (auto i : store->onode_cache_shards) {
2429 i->set_bin_count(count);
2430 }
2431 }
2432 virtual uint64_t _get_used_bytes() const {
2433 return mempool::bluestore_Buffer::allocated_bytes() +
2434 mempool::bluestore_Blob::allocated_bytes() +
2435 mempool::bluestore_Extent::allocated_bytes() +
2436 mempool::bluestore_cache_meta::allocated_bytes() +
2437 mempool::bluestore_cache_other::allocated_bytes() +
2438 mempool::bluestore_cache_onode::allocated_bytes() +
2439 mempool::bluestore_SharedBlob::allocated_bytes() +
2440 mempool::bluestore_inline_bl::allocated_bytes();
2441 }
2442 virtual void shift_bins() {
2443 for (auto i : store->onode_cache_shards) {
2444 i->shift_bins();
2445 }
2446 }
2447 virtual uint64_t _sum_bins(uint32_t start, uint32_t end) const {
2448 uint64_t onodes = 0;
2449 for (auto i : store->onode_cache_shards) {
2450 onodes += i->sum_bins(start, end);
2451 }
2452 return onodes*get_bytes_per_onode();
2453 }
2454 virtual std::string get_cache_name() const {
2455 return "BlueStore Meta Cache";
2456 }
2457 uint64_t _get_num_onodes() const {
2458 uint64_t onode_num =
2459 mempool::bluestore_cache_onode::allocated_items();
2460 return (2 > onode_num) ? 2 : onode_num;
2461 }
2462 double get_bytes_per_onode() const {
2463 return (double)_get_used_bytes() / (double)_get_num_onodes();
2464 }
2465 };
2466 std::shared_ptr<MetaCache> meta_cache;
2467
2468 struct DataCache : public MempoolCache {
2469 DataCache(BlueStore *s) : MempoolCache(s) {};
2470
2471 virtual uint32_t get_bin_count() const {
2472 return store->buffer_cache_shards[0]->get_bin_count();
2473 }
2474 virtual void set_bin_count(uint32_t count) {
2475 for (auto i : store->buffer_cache_shards) {
2476 i->set_bin_count(count);
2477 }
2478 }
2479 virtual uint64_t _get_used_bytes() const {
2480 uint64_t bytes = 0;
2481 for (auto i : store->buffer_cache_shards) {
2482 bytes += i->_get_bytes();
2483 }
2484 return bytes;
2485 }
2486 virtual void shift_bins() {
2487 for (auto i : store->buffer_cache_shards) {
2488 i->shift_bins();
2489 }
2490 }
2491 virtual uint64_t _sum_bins(uint32_t start, uint32_t end) const {
2492 uint64_t bytes = 0;
2493 for (auto i : store->buffer_cache_shards) {
2494 bytes += i->sum_bins(start, end);
2495 }
2496 return bytes;
2497 }
2498 virtual std::string get_cache_name() const {
2499 return "BlueStore Data Cache";
2500 }
2501 };
2502 std::shared_ptr<DataCache> data_cache;
2503
2504 public:
2505 explicit MempoolThread(BlueStore *s)
2506 : store(s),
2507 meta_cache(new MetaCache(s)),
2508 data_cache(new DataCache(s)) {}
2509
2510 void *entry() override;
2511 void init() {
2512 ceph_assert(stop == false);
2513 create("bstore_mempool");
2514 }
2515 void shutdown() {
2516 lock.lock();
2517 stop = true;
2518 cond.notify_all();
2519 lock.unlock();
2520 join();
2521 }
2522
2523 private:
2524 void _update_cache_settings();
2525 void _resize_shards(bool interval_stats);
2526 } mempool_thread;
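// Illustrative lifecycle of the mempool thread (simplified; typically driven
// around mount/umount):
//
//   mempool_thread.init();      // spawns the "bstore_mempool" thread
//   ...                         // thread periodically resizes cache shards
//   mempool_thread.shutdown();  // sets stop, wakes the thread, joins it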
2527
2528 #ifdef WITH_BLKIN
2529 ZTracer::Endpoint trace_endpoint {"0.0.0.0", 0, "BlueStore"};
2530 #endif
2531
2532 // --------------------------------------------------------
2533 // private methods
2534
2535 void _init_logger();
2536 void _shutdown_logger();
2537 int _reload_logger();
2538
2539 int _open_path();
2540 void _close_path();
2541 int _open_fsid(bool create);
2542 int _lock_fsid();
2543 int _read_fsid(uuid_d *f);
2544 int _write_fsid();
2545 void _close_fsid();
2546 void _set_alloc_sizes();
2547 void _set_blob_size();
2548 void _set_finisher_num();
2549 void _set_per_pool_omap();
2550 void _update_osd_memory_options();
2551
2552 int _open_bdev(bool create);
2553 // Verifies that disk space is sufficient for the reserved area plus the
2554 // minimal BlueFS allotment, and adjusts the latter if needed.
2555 // Depends on min_alloc_size, hence must be called after its
2556 // initialization (and outside of _open_bdev).
2557 void _validate_bdev();
2558 void _close_bdev();
2559
2560 int _minimal_open_bluefs(bool create);
2561 void _minimal_close_bluefs();
2562 int _open_bluefs(bool create, bool read_only);
2563 void _close_bluefs();
2564
2565 int _is_bluefs(bool create, bool* ret);
2566 /*
2567 * opens both DB and the dependent super_meta, FreelistManager and allocator
2568 * in the proper order
2569 */
2570 int _open_db_and_around(bool read_only, bool to_repair = false);
2571 void _close_db_and_around();
2572
2573 int _prepare_db_environment(bool create, bool read_only,
2574 std::string* kv_dir, std::string* kv_backend);
2575
2576 /*
2577 * @warning to_repair_db means we open this db to repair it and will not
2578 * hold rocksdb's file lock.
2579 */
2580 int _open_db(bool create,
2581 bool to_repair_db=false,
2582 bool read_only = false);
2583 void _close_db();
2584 void _close_db_leave_bluefs();
2585 int _open_fm(KeyValueDB::Transaction t, bool read_only, bool fm_restore = false);
2586 void _close_fm();
2587 int _write_out_fm_meta(uint64_t target_size);
2588 int _create_alloc();
2589 int _init_alloc(std::map<uint64_t, uint64_t> *zone_adjustments);
2590 void _post_init_alloc(const std::map<uint64_t, uint64_t>& zone_adjustments);
2591 void _close_alloc();
2592 int _open_collections();
2593 void _fsck_collections(int64_t* errors);
2594 void _close_collections();
2595
2596 int _setup_block_symlink_or_file(std::string name, std::string path, uint64_t size,
2597 bool create);
2598
2599 public:
2600 utime_t get_deferred_last_submitted() {
2601 std::lock_guard l(deferred_lock);
2602 return deferred_last_submitted;
2603 }
2604
2605 static int _write_bdev_label(CephContext* cct,
2606 const std::string &path, bluestore_bdev_label_t label);
2607 static int _read_bdev_label(CephContext* cct, const std::string &path,
2608 bluestore_bdev_label_t *label);
2609 private:
2610 int _check_or_set_bdev_label(std::string path, uint64_t size, std::string desc,
2611 bool create);
2612 int _set_bdev_label_size(const std::string& path, uint64_t size);
2613
2614 int _open_super_meta();
2615
2616 void _open_statfs();
2617 void _get_statfs_overall(struct store_statfs_t *buf);
2618
2619 void _dump_alloc_on_failure();
2620
2621 CollectionRef _get_collection(const coll_t& cid);
2622 CollectionRef _get_collection_by_oid(const ghobject_t& oid);
2623 void _queue_reap_collection(CollectionRef& c);
2624 void _reap_collections();
2625 void _update_cache_logger();
2626
2627 void _assign_nid(TransContext *txc, OnodeRef o);
2628 uint64_t _assign_blobid(TransContext *txc);
2629
2630 template <int LogLevelV>
2631 friend void _dump_onode(CephContext *cct, const Onode& o);
2632 template <int LogLevelV>
2633 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2634 template <int LogLevelV>
2635 friend void _dump_transaction(CephContext *cct, Transaction *t);
2636
2637 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2638 std::list<Context*> *on_commits,
2639 TrackedOpRef osd_op=TrackedOpRef());
2640 void _txc_update_store_statfs(TransContext *txc);
2641 void _txc_add_transaction(TransContext *txc, Transaction *t);
2642 void _txc_calc_cost(TransContext *txc);
2643 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2644 void _txc_state_proc(TransContext *txc);
2645 void _txc_aio_submit(TransContext *txc);
2646 public:
2647 void txc_aio_finish(void *p) {
2648 _txc_state_proc(static_cast<TransContext*>(p));
2649 }
2650 private:
2651 void _txc_finish_io(TransContext *txc);
2652 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2653 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2654 void _txc_committed_kv(TransContext *txc);
2655 void _txc_finish(TransContext *txc);
2656 void _txc_release_alloc(TransContext *txc);
2657
2658 void _osr_attach(Collection *c);
2659 void _osr_register_zombie(OpSequencer *osr);
2660 void _osr_drain(OpSequencer *osr);
2661 void _osr_drain_preceding(TransContext *txc);
2662 void _osr_drain_all();
2663
2664 void _kv_start();
2665 void _kv_stop();
2666 void _kv_sync_thread();
2667 void _kv_finalize_thread();
2668
2669 #ifdef HAVE_LIBZBD
2670 void _zoned_cleaner_start();
2671 void _zoned_cleaner_stop();
2672 void _zoned_cleaner_thread();
2673 void _zoned_clean_zone(uint64_t zone_num,
2674 class ZonedAllocator *a,
2675 class ZonedFreelistManager *f);
2676 void _clean_some(ghobject_t oid, uint32_t zone_num);
2677 #endif
2678
2679 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, uint64_t len);
2680 void _deferred_queue(TransContext *txc);
2681 public:
2682 void deferred_try_submit();
2683 private:
2684 void _deferred_submit_unlock(OpSequencer *osr);
2685 void _deferred_aio_finish(OpSequencer *osr);
2686 int _deferred_replay();
2687 bool _eliminate_outdated_deferred(bluestore_deferred_transaction_t* deferred_txn,
2688 interval_set<uint64_t>& bluefs_extents);
2689
2690 public:
2691 using mempool_dynamic_bitset =
2692 boost::dynamic_bitset<uint64_t,
2693 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2694 using per_pool_statfs =
2695 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2696
2697 enum FSCKDepth {
2698 FSCK_REGULAR,
2699 FSCK_DEEP,
2700 FSCK_SHALLOW
2701 };
2702 enum {
2703 MAX_FSCK_ERROR_LINES = 100,
2704 };
2705
2706 private:
2707 int _fsck_check_extents(
2708 std::string_view ctx_descr,
2709 const PExtentVector& extents,
2710 bool compressed,
2711 mempool_dynamic_bitset &used_blocks,
2712 uint64_t granularity,
2713 BlueStoreRepairer* repairer,
2714 store_statfs_t& expected_statfs,
2715 FSCKDepth depth);
2716
2717 void _fsck_check_pool_statfs(
2718 per_pool_statfs& expected_pool_statfs,
2719 int64_t& errors,
2720 int64_t &warnings,
2721 BlueStoreRepairer* repairer);
2722 void _fsck_repair_shared_blobs(
2723 BlueStoreRepairer& repairer,
2724 shared_blob_2hash_tracker_t& sb_ref_counts,
2725 sb_info_space_efficient_map_t& sb_info);
2726
2727 int _fsck(FSCKDepth depth, bool repair);
2728 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2729
2730 void _buffer_cache_write(
2731 TransContext *txc,
2732 BlobRef b,
2733 uint64_t offset,
2734 ceph::buffer::list& bl,
2735 unsigned flags) {
2736 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2737 flags);
2738 txc->shared_blobs_written.insert(b->shared_blob);
2739 }
2740
2741 int _collection_list(
2742 Collection *c, const ghobject_t& start, const ghobject_t& end,
2743 int max, bool legacy, std::vector<ghobject_t> *ls, ghobject_t *next);
2744
2745 template <typename T, typename F>
2746 T select_option(const std::string& opt_name, T val1, F f) {
2747 //NB: opt_name reserved for future use
2748 boost::optional<T> val2 = f();
2749 if (val2) {
2750 return *val2;
2751 }
2752 return val1;
2753 }
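// Illustrative use of select_option(): fall back to the global value unless
// the functor yields an override (the option name below is only an example):
//
//   uint64_t blob_size = select_option(
//     "compression_max_blob_size", comp_max_blob_size.load(),
//     [&]() -> boost::optional<uint64_t> {
//       // return a value to override, or boost::none to keep the default
//       return boost::none;
//     });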
2754
2755 void _apply_padding(uint64_t head_pad,
2756 uint64_t tail_pad,
2757 ceph::buffer::list& padded);
2758
2759 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2760
2761 // -- ondisk version ---
2762 public:
2763 const int32_t latest_ondisk_format = 4; ///< our version
2764 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2765 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2766
2767 private:
2768 int32_t ondisk_format = 0; ///< value detected on mount
2769 bool m_fast_shutdown = false;
2770 int _upgrade_super(); ///< upgrade (called during open_super)
2771 uint64_t _get_ondisk_reserved() const;
2772 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2773
2774 // --- public interface ---
2775 public:
2776 BlueStore(CephContext *cct, const std::string& path);
2777 BlueStore(CephContext *cct, const std::string& path, uint64_t min_alloc_size); // Ctor for UT only
2778 ~BlueStore() override;
2779
2780 std::string get_type() override {
2781 return "bluestore";
2782 }
2783
2784 bool needs_journal() override { return false; };
2785 bool wants_journal() override { return false; };
2786 bool allows_journal() override { return false; };
2787
2788 void prepare_for_fast_shutdown() override;
2789 virtual bool has_null_manager();
2790
2791 uint64_t get_min_alloc_size() const override {
2792 return min_alloc_size;
2793 }
2794
2795 int get_devices(std::set<std::string> *ls) override;
2796
2797 bool is_rotational() override;
2798 bool is_journal_rotational() override;
2799 bool is_db_rotational();
2800
2801 std::string get_default_device_class() override {
2802 std::string device_class;
2803 std::map<std::string, std::string> metadata;
2804 collect_metadata(&metadata);
2805 auto it = metadata.find("bluestore_bdev_type");
2806 if (it != metadata.end()) {
2807 device_class = it->second;
2808 }
2809 return device_class;
2810 }
2811
2812 int get_numa_node(
2813 int *numa_node,
2814 std::set<int> *nodes,
2815 std::set<std::string> *failed) override;
2816
2817 static int get_block_device_fsid(CephContext* cct, const std::string& path,
2818 uuid_d *fsid);
2819
2820 bool test_mount_in_use() override;
2821
2822 private:
2823 int _mount();
2824 public:
2825 int mount() override {
2826 return _mount();
2827 }
2828 int umount() override;
2829
2830 int open_db_environment(KeyValueDB **pdb, bool to_repair);
2831 int close_db_environment();
2832 BlueFS* get_bluefs();
2833
2834 int write_meta(const std::string& key, const std::string& value) override;
2835 int read_meta(const std::string& key, std::string *value) override;
2836
2837 // open in read-only and limited mode
2838 int cold_open();
2839 int cold_close();
2840
2841 int fsck(bool deep) override {
2842 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2843 }
2844 int repair(bool deep) override {
2845 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2846 }
2847 int quick_fix() override {
2848 return _fsck(FSCK_SHALLOW, true);
2849 }
2850
2851 void set_cache_shards(unsigned num) override;
2852 void dump_cache_stats(ceph::Formatter *f) override {
2853 int onode_count = 0, buffers_bytes = 0;
2854 for (auto i: onode_cache_shards) {
2855 onode_count += i->_get_num();
2856 }
2857 for (auto i: buffer_cache_shards) {
2858 buffers_bytes += i->_get_bytes();
2859 }
2860 f->dump_int("bluestore_onode", onode_count);
2861 f->dump_int("bluestore_buffers", buffers_bytes);
2862 }
2863 void dump_cache_stats(std::ostream& ss) override {
2864 int onode_count = 0, buffers_bytes = 0;
2865 for (auto i: onode_cache_shards) {
2866 onode_count += i->_get_num();
2867 }
2868 for (auto i: buffer_cache_shards) {
2869 buffers_bytes += i->_get_bytes();
2870 }
2871 ss << "bluestore_onode: " << onode_count;
2872 ss << "bluestore_buffers: " << buffers_bytes;
2873 }
2874
2875 int validate_hobject_key(const hobject_t &obj) const override {
2876 return 0;
2877 }
2878 unsigned get_max_attr_name_length() override {
2879 return 256; // arbitrary; there is no real limit internally
2880 }
2881
2882 int mkfs() override;
2883 int mkjournal() override {
2884 return 0;
2885 }
2886
2887 void get_db_statistics(ceph::Formatter *f) override;
2888 void generate_db_histogram(ceph::Formatter *f) override;
2889 void _shutdown_cache();
2890 int flush_cache(std::ostream *os = NULL) override;
2891 void dump_perf_counters(ceph::Formatter *f) override {
2892 f->open_object_section("perf_counters");
2893 logger->dump_formatted(f, false);
2894 f->close_section();
2895 }
2896
2897 int add_new_bluefs_device(int id, const std::string& path);
2898 int migrate_to_existing_bluefs_device(const std::set<int>& devs_source,
2899 int id);
2900 int migrate_to_new_bluefs_device(const std::set<int>& devs_source,
2901 int id,
2902 const std::string& path);
2903 int expand_devices(std::ostream& out);
2904 std::string get_device_path(unsigned id);
2905
2906 int dump_bluefs_sizes(std::ostream& out);
2907
2908 public:
2909 int statfs(struct store_statfs_t *buf,
2910 osd_alert_list_t* alerts = nullptr) override;
2911 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2912 bool *per_pool_omap) override;
2913
2914 void collect_metadata(std::map<std::string,std::string> *pm) override;
2915
2916 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2917 int set_collection_opts(
2918 CollectionHandle& c,
2919 const pool_opts_t& opts) override;
2920 int stat(
2921 CollectionHandle &c,
2922 const ghobject_t& oid,
2923 struct stat *st,
2924 bool allow_eio = false) override;
2925 int read(
2926 CollectionHandle &c,
2927 const ghobject_t& oid,
2928 uint64_t offset,
2929 size_t len,
2930 ceph::buffer::list& bl,
2931 uint32_t op_flags = 0) override;
2932
2933 private:
2934
2935 // --------------------------------------------------------
2936 // intermediate data structures used while reading
2937 struct region_t {
2938 uint64_t logical_offset;
2939 uint64_t blob_xoffset; //region offset within the blob
2940 uint64_t length;
2941
2942 // used later in read process
2943 uint64_t front = 0;
2944
2945 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2946 : logical_offset(offset),
2947 blob_xoffset(b_offs),
2948 length(len),
2949 front(front){}
2950 region_t(const region_t& from)
2951 : logical_offset(from.logical_offset),
2952 blob_xoffset(from.blob_xoffset),
2953 length(from.length),
2954 front(from.front){}
2955
2956 friend std::ostream& operator<<(std::ostream& out, const region_t& r) {
2957 return out << "0x" << std::hex << r.logical_offset << ":"
2958 << r.blob_xoffset << "~" << r.length << std::dec;
2959 }
2960 };
2961
2962 // merged blob read request
2963 struct read_req_t {
2964 uint64_t r_off = 0;
2965 uint64_t r_len = 0;
2966 ceph::buffer::list bl;
2967 std::list<region_t> regs; // original read regions
2968
2969 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2970
2971 friend std::ostream& operator<<(std::ostream& out, const read_req_t& r) {
2972 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2973 for (const auto& reg : r.regs)
2974 out << reg;
2975 return out << "]}" << std::dec;
2976 }
2977 };
2978
2979 typedef std::list<read_req_t> regions2read_t;
2980 typedef std::map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
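// Illustrative picture of the structures above: a logical read is folded into
// per-blob, device-aligned requests, each remembering the logical region(s)
// it will satisfy (`blob` is an assumed BlobRef; offsets are invented):
//
//   blobs2read_t b2r;
//   b2r[blob].emplace_back(0x2000, 0x1000);              // blob-relative range
//   b2r[blob].back().regs.emplace_back(0x12000, 0x0, 0x1000);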
2981
2982 void _read_cache(
2983 OnodeRef o,
2984 uint64_t offset,
2985 size_t length,
2986 int read_cache_policy,
2987 ready_regions_t& ready_regions,
2988 blobs2read_t& blobs2read);
2989
2990
2991 int _prepare_read_ioc(
2992 blobs2read_t& blobs2read,
2993 std::vector<ceph::buffer::list>* compressed_blob_bls,
2994 IOContext* ioc);
2995
2996 int _generate_read_result_bl(
2997 OnodeRef o,
2998 uint64_t offset,
2999 size_t length,
3000 ready_regions_t& ready_regions,
3001 std::vector<ceph::buffer::list>& compressed_blob_bls,
3002 blobs2read_t& blobs2read,
3003 bool buffered,
3004 bool* csum_error,
3005 ceph::buffer::list& bl);
3006
3007 int _do_read(
3008 Collection *c,
3009 OnodeRef o,
3010 uint64_t offset,
3011 size_t len,
3012 ceph::buffer::list& bl,
3013 uint32_t op_flags = 0,
3014 uint64_t retry_count = 0);
3015
3016 int _do_readv(
3017 Collection *c,
3018 OnodeRef o,
3019 const interval_set<uint64_t>& m,
3020 ceph::buffer::list& bl,
3021 uint32_t op_flags = 0,
3022 uint64_t retry_count = 0);
3023
3024 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
3025 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
3026 public:
3027 int fiemap(CollectionHandle &c, const ghobject_t& oid,
3028 uint64_t offset, size_t len, ceph::buffer::list& bl) override;
3029 int fiemap(CollectionHandle &c, const ghobject_t& oid,
3030 uint64_t offset, size_t len, std::map<uint64_t, uint64_t>& destmap) override;
3031
3032 int readv(
3033 CollectionHandle &c_,
3034 const ghobject_t& oid,
3035 interval_set<uint64_t>& m,
3036 ceph::buffer::list& bl,
3037 uint32_t op_flags) override;
3038
3039 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
3040 const std::string& section_name, ceph::Formatter *f) override;
3041
3042 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
3043 ceph::buffer::ptr& value) override;
3044
3045 int getattrs(CollectionHandle &c, const ghobject_t& oid,
3046 std::map<std::string,ceph::buffer::ptr, std::less<>>& aset) override;
3047
3048 int list_collections(std::vector<coll_t>& ls) override;
3049
3050 CollectionHandle open_collection(const coll_t &c) override;
3051 CollectionHandle create_new_collection(const coll_t& cid) override;
3052 void set_collection_commit_queue(const coll_t& cid,
3053 ContextQueue *commit_queue) override;
3054
3055 bool collection_exists(const coll_t& c) override;
3056 int collection_empty(CollectionHandle& c, bool *empty) override;
3057 int collection_bits(CollectionHandle& c) override;
3058
3059 int collection_list(CollectionHandle &c,
3060 const ghobject_t& start,
3061 const ghobject_t& end,
3062 int max,
3063 std::vector<ghobject_t> *ls, ghobject_t *next) override;
3064
3065 int collection_list_legacy(CollectionHandle &c,
3066 const ghobject_t& start,
3067 const ghobject_t& end,
3068 int max,
3069 std::vector<ghobject_t> *ls,
3070 ghobject_t *next) override;
3071
3072 int omap_get(
3073 CollectionHandle &c, ///< [in] Collection containing oid
3074 const ghobject_t &oid, ///< [in] Object containing omap
3075 ceph::buffer::list *header, ///< [out] omap header
3076 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
3077 ) override;
3078 int _omap_get(
3079 Collection *c, ///< [in] Collection containing oid
3080 const ghobject_t &oid, ///< [in] Object containing omap
3081 ceph::buffer::list *header, ///< [out] omap header
3082 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
3083 );
3084 int _onode_omap_get(
3085 const OnodeRef &o, ///< [in] Object containing omap
3086 ceph::buffer::list *header, ///< [out] omap header
3087 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
3088 );
3089
3090
3091 /// Get omap header
3092 int omap_get_header(
3093 CollectionHandle &c, ///< [in] Collection containing oid
3094 const ghobject_t &oid, ///< [in] Object containing omap
3095 ceph::buffer::list *header, ///< [out] omap header
3096 bool allow_eio = false ///< [in] don't assert on eio
3097 ) override;
3098
3099 /// Get keys defined on oid
3100 int omap_get_keys(
3101 CollectionHandle &c, ///< [in] Collection containing oid
3102 const ghobject_t &oid, ///< [in] Object containing omap
3103 std::set<std::string> *keys ///< [out] Keys defined on oid
3104 ) override;
3105
3106 /// Get key values
3107 int omap_get_values(
3108 CollectionHandle &c, ///< [in] Collection containing oid
3109 const ghobject_t &oid, ///< [in] Object containing omap
3110 const std::set<std::string> &keys, ///< [in] Keys to get
3111 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
3112 ) override;
3113
3114 #ifdef WITH_SEASTAR
3115 int omap_get_values(
3116 CollectionHandle &c, ///< [in] Collection containing oid
3117 const ghobject_t &oid, ///< [in] Object containing omap
3118 const std::optional<std::string> &start_after, ///< [in] Keys to get
3119 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
3120 ) override;
3121 #endif
3122
3123 /// Filters keys into out which are defined on oid
3124 int omap_check_keys(
3125 CollectionHandle &c, ///< [in] Collection containing oid
3126 const ghobject_t &oid, ///< [in] Object containing omap
3127 const std::set<std::string> &keys, ///< [in] Keys to check
3128 std::set<std::string> *out ///< [out] Subset of keys defined on oid
3129 ) override;
3130
3131 ObjectMap::ObjectMapIterator get_omap_iterator(
3132 CollectionHandle &c, ///< [in] collection
3133 const ghobject_t &oid ///< [in] object
3134 ) override;
3135
3136 void set_fsid(uuid_d u) override {
3137 fsid = u;
3138 }
3139 uuid_d get_fsid() override {
3140 return fsid;
3141 }
3142
3143 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
3144 return num_objects * 300; //assuming per-object overhead is 300 bytes
3145 }
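// Worked example for the estimate above: 1,000,000 objects are assumed to
// cost roughly 1,000,000 * 300 = 300,000,000 bytes (~286 MiB) of metadata
// overhead; the 300-byte figure is a rough heuristic, not a measured value.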
3146
3147 struct BSPerfTracker {
3148 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
3149 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
3150
3151 objectstore_perf_stat_t get_cur_stats() const {
3152 objectstore_perf_stat_t ret;
3153 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
3154 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
3155 return ret;
3156 }
3157
3158 void update_from_perfcounters(PerfCounters &logger);
3159 } perf_tracker;
3160
3161 objectstore_perf_stat_t get_cur_stats() override {
3162 perf_tracker.update_from_perfcounters(*logger);
3163 return perf_tracker.get_cur_stats();
3164 }
3165 const PerfCounters* get_perf_counters() const override {
3166 return logger;
3167 }
3168 const PerfCounters* get_bluefs_perf_counters() const {
3169 return bluefs->get_perf_counters();
3170 }
3171 KeyValueDB* get_kv() {
3172 return db;
3173 }
3174
3175 int queue_transactions(
3176 CollectionHandle& ch,
3177 std::vector<Transaction>& tls,
3178 TrackedOpRef op = TrackedOpRef(),
3179 ThreadPool::TPHandle *handle = NULL) override;
3180
3181 // error injection
3182 void inject_data_error(const ghobject_t& o) override {
3183 std::unique_lock l(debug_read_error_lock);
3184 debug_data_error_objects.insert(o);
3185 }
3186 void inject_mdata_error(const ghobject_t& o) override {
3187 std::unique_lock l(debug_read_error_lock);
3188 debug_mdata_error_objects.insert(o);
3189 }
3190
3191 /// methods to inject various errors fsck can repair
3192 void inject_broken_shared_blob_key(const std::string& key,
3193 const ceph::buffer::list& bl);
3194 void inject_no_shared_blob_key();
3195 void inject_stray_shared_blob_key(uint64_t sbid);
3196
3197 void inject_leaked(uint64_t len);
3198 void inject_false_free(coll_t cid, ghobject_t oid);
3199 void inject_statfs(const std::string& key, const store_statfs_t& new_statfs);
3200 void inject_global_statfs(const store_statfs_t& new_statfs);
3201 void inject_misreference(coll_t cid1, ghobject_t oid1,
3202 coll_t cid2, ghobject_t oid2,
3203 uint64_t offset);
3204 void inject_zombie_spanning_blob(coll_t cid, ghobject_t oid, int16_t blob_id);
3205 // resets global per_pool_omap in DB
3206 void inject_legacy_omap();
3207 // resets per_pool_omap | pgmeta_omap for onode
3208 void inject_legacy_omap(coll_t cid, ghobject_t oid);
3209 void inject_stray_omap(uint64_t head, const std::string& name);
3210
3211 void inject_bluefs_file(std::string_view dir,
3212 std::string_view name,
3213 size_t new_size);
3214
3215 void compact() override {
3216 ceph_assert(db);
3217 db->compact();
3218 }
3219 bool has_builtin_csum() const override {
3220 return true;
3221 }
3222
3223 inline void log_latency(const char* name,
3224 int idx,
3225 const ceph::timespan& lat,
3226 double lat_threshold,
3227 const char* info = "") const;
3228
3229 inline void log_latency_fn(const char* name,
3230 int idx,
3231 const ceph::timespan& lat,
3232 double lat_threshold,
3233 std::function<std::string (const ceph::timespan& lat)> fn) const;
3234
3235 private:
3236 bool _debug_data_eio(const ghobject_t& o) {
3237 if (!cct->_conf->bluestore_debug_inject_read_err) {
3238 return false;
3239 }
3240 std::shared_lock l(debug_read_error_lock);
3241 return debug_data_error_objects.count(o);
3242 }
3243 bool _debug_mdata_eio(const ghobject_t& o) {
3244 if (!cct->_conf->bluestore_debug_inject_read_err) {
3245 return false;
3246 }
3247 std::shared_lock l(debug_read_error_lock);
3248 return debug_mdata_error_objects.count(o);
3249 }
3250 void _debug_obj_on_delete(const ghobject_t& o) {
3251 if (cct->_conf->bluestore_debug_inject_read_err) {
3252 std::unique_lock l(debug_read_error_lock);
3253 debug_data_error_objects.erase(o);
3254 debug_mdata_error_objects.erase(o);
3255 }
3256 }
3257 private:
3258 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
3259 std::string failed_cmode;
3260 std::set<std::string> failed_compressors;
3261 std::string spillover_alert;
3262 std::string legacy_statfs_alert;
3263 std::string no_per_pool_omap_alert;
3264 std::string no_per_pg_omap_alert;
3265 std::string disk_size_mismatch_alert;
3266 std::string spurious_read_errors_alert;
3267
3268 void _log_alerts(osd_alert_list_t& alerts);
3269 bool _set_compression_alert(bool cmode, const char* s) {
3270 std::lock_guard l(qlock);
3271 if (cmode) {
3272 bool ret = failed_cmode.empty();
3273 failed_cmode = s;
3274 return ret;
3275 }
3276 return failed_compressors.emplace(s).second;
3277 }
3278 void _clear_compression_alert() {
3279 std::lock_guard l(qlock);
3280 failed_compressors.clear();
3281 failed_cmode.clear();
3282 }
3283
3284 void _set_spillover_alert(const std::string& s) {
3285 std::lock_guard l(qlock);
3286 spillover_alert = s;
3287 }
3288 void _clear_spillover_alert() {
3289 std::lock_guard l(qlock);
3290 spillover_alert.clear();
3291 }
3292
3293 void _check_legacy_statfs_alert();
3294 void _check_no_per_pg_or_pool_omap_alert();
3295 void _set_disk_size_mismatch_alert(const std::string& s) {
3296 std::lock_guard l(qlock);
3297 disk_size_mismatch_alert = s;
3298 }
3299 void _set_spurious_read_errors_alert(const std::string& s) {
3300 std::lock_guard l(qlock);
3301 spurious_read_errors_alert = s;
3302 }
3303
3304 private:
3305
3306 // --------------------------------------------------------
3307 // read processing internal methods
3308 int _verify_csum(
3309 OnodeRef& o,
3310 const bluestore_blob_t* blob,
3311 uint64_t blob_xoffset,
3312 const ceph::buffer::list& bl,
3313 uint64_t logical_offset) const;
3314 int _decompress(ceph::buffer::list& source, ceph::buffer::list* result);
3315
3316
3317 // --------------------------------------------------------
3318 // write ops
3319
3320 struct WriteContext {
3321 bool buffered = false; ///< buffered write
3322 bool compress = false; ///< compressed write
3323 uint64_t target_blob_size = 0; ///< target (max) blob size
3324 unsigned csum_order = 0; ///< target checksum chunk order
3325
3326 old_extent_map_t old_extents; ///< must deref these blobs
3327 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
3328
3329 struct write_item {
3330 uint64_t logical_offset; ///< write logical offset
3331 BlobRef b;
3332 uint64_t blob_length;
3333 uint64_t b_off;
3334 ceph::buffer::list bl;
3335 uint64_t b_off0; ///< original offset in a blob prior to padding
3336 uint64_t length0; ///< original data length prior to padding
3337
3338 bool mark_unused;
3339 bool new_blob; ///< whether new blob was created
3340
3341 bool compressed = false;
3342 ceph::buffer::list compressed_bl;
3343 size_t compressed_len = 0;
3344
3345 write_item(
3346 uint64_t logical_offs,
3347 BlobRef b,
3348 uint64_t blob_len,
3349 uint64_t o,
3350 ceph::buffer::list& bl,
3351 uint64_t o0,
3352 uint64_t l0,
3353 bool _mark_unused,
3354 bool _new_blob)
3355 :
3356 logical_offset(logical_offs),
3357 b(b),
3358 blob_length(blob_len),
3359 b_off(o),
3360 bl(bl),
3361 b_off0(o0),
3362 length0(l0),
3363 mark_unused(_mark_unused),
3364 new_blob(_new_blob) {}
3365 };
3366 std::vector<write_item> writes; ///< blobs we're writing
3367
3368 /// partial clone of the context
3369 void fork(const WriteContext& other) {
3370 buffered = other.buffered;
3371 compress = other.compress;
3372 target_blob_size = other.target_blob_size;
3373 csum_order = other.csum_order;
3374 }
3375 void write(
3376 uint64_t loffs,
3377 BlobRef b,
3378 uint64_t blob_len,
3379 uint64_t o,
3380 ceph::buffer::list& bl,
3381 uint64_t o0,
3382 uint64_t len0,
3383 bool _mark_unused,
3384 bool _new_blob) {
3385 writes.emplace_back(loffs,
3386 b,
3387 blob_len,
3388 o,
3389 bl,
3390 o0,
3391 len0,
3392 _mark_unused,
3393 _new_blob);
3394 }
3395 /// Checks for writes to the same pextent within a blob
3396 bool has_conflict(
3397 BlobRef b,
3398 uint64_t loffs,
3399 uint64_t loffs_end,
3400 uint64_t min_alloc_size);
3401 };
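// Illustrative sketch of WriteContext use in the write path (simplified;
// local names such as `wctx_gc`, `b`, `blob_len` and `b_off` are assumptions):
//
//   WriteContext wctx;
//   _choose_write_options(c, o, fadvise_flags, &wctx);
//   WriteContext wctx_gc;
//   wctx_gc.fork(wctx);        // inherit only the option fields
//   wctx.write(offset, b, blob_len, b_off, bl, b_off0, len0,
//              false /* mark_unused */, true /* new_blob */);
//   ...
//   _do_alloc_write(txc, c, o, &wctx);
//   _wctx_finish(txc, c, o, &wctx);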
3402 void _do_write_small(
3403 TransContext *txc,
3404 CollectionRef &c,
3405 OnodeRef o,
3406 uint64_t offset, uint64_t length,
3407 ceph::buffer::list::iterator& blp,
3408 WriteContext *wctx);
3409 void _do_write_big_apply_deferred(
3410 TransContext* txc,
3411 CollectionRef& c,
3412 OnodeRef o,
3413 BigDeferredWriteContext& dctx,
3414 bufferlist::iterator& blp,
3415 WriteContext* wctx);
3416 void _do_write_big(
3417 TransContext *txc,
3418 CollectionRef &c,
3419 OnodeRef o,
3420 uint64_t offset, uint64_t length,
3421 ceph::buffer::list::iterator& blp,
3422 WriteContext *wctx);
3423 int _do_alloc_write(
3424 TransContext *txc,
3425 CollectionRef c,
3426 OnodeRef o,
3427 WriteContext *wctx);
3428 void _wctx_finish(
3429 TransContext *txc,
3430 CollectionRef& c,
3431 OnodeRef o,
3432 WriteContext *wctx,
3433 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3434
3435 int _write(TransContext *txc,
3436 CollectionRef& c,
3437 OnodeRef& o,
3438 uint64_t offset, size_t len,
3439 ceph::buffer::list& bl,
3440 uint32_t fadvise_flags);
3441 void _pad_zeros(ceph::buffer::list *bl, uint64_t *offset,
3442 uint64_t chunk_size);
3443
3444 void _choose_write_options(CollectionRef& c,
3445 OnodeRef o,
3446 uint32_t fadvise_flags,
3447 WriteContext *wctx);
3448
3449 int _do_gc(TransContext *txc,
3450 CollectionRef& c,
3451 OnodeRef o,
3452 const WriteContext& wctx,
3453 uint64_t *dirty_start,
3454 uint64_t *dirty_end);
3455
3456 int _do_write(TransContext *txc,
3457 CollectionRef &c,
3458 OnodeRef o,
3459 uint64_t offset, uint64_t length,
3460 ceph::buffer::list& bl,
3461 uint32_t fadvise_flags);
3462 void _do_write_data(TransContext *txc,
3463 CollectionRef& c,
3464 OnodeRef o,
3465 uint64_t offset,
3466 uint64_t length,
3467 ceph::buffer::list& bl,
3468 WriteContext *wctx);
3469
3470 int _touch(TransContext *txc,
3471 CollectionRef& c,
3472 OnodeRef& o);
3473 int _do_zero(TransContext *txc,
3474 CollectionRef& c,
3475 OnodeRef& o,
3476 uint64_t offset, size_t len);
3477 int _zero(TransContext *txc,
3478 CollectionRef& c,
3479 OnodeRef& o,
3480 uint64_t offset, size_t len);
3481 void _do_truncate(TransContext *txc,
3482 CollectionRef& c,
3483 OnodeRef o,
3484 uint64_t offset,
3485 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3486 int _truncate(TransContext *txc,
3487 CollectionRef& c,
3488 OnodeRef& o,
3489 uint64_t offset);
3490 int _remove(TransContext *txc,
3491 CollectionRef& c,
3492 OnodeRef& o);
3493 int _do_remove(TransContext *txc,
3494 CollectionRef& c,
3495 OnodeRef o);
3496 int _setattr(TransContext *txc,
3497 CollectionRef& c,
3498 OnodeRef& o,
3499 const std::string& name,
3500 ceph::buffer::ptr& val);
3501 int _setattrs(TransContext *txc,
3502 CollectionRef& c,
3503 OnodeRef& o,
3504 const std::map<std::string,ceph::buffer::ptr>& aset);
3505 int _rmattr(TransContext *txc,
3506 CollectionRef& c,
3507 OnodeRef& o,
3508 const std::string& name);
3509 int _rmattrs(TransContext *txc,
3510 CollectionRef& c,
3511 OnodeRef& o);
3512 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3513 int _omap_clear(TransContext *txc,
3514 CollectionRef& c,
3515 OnodeRef& o);
3516 int _omap_setkeys(TransContext *txc,
3517 CollectionRef& c,
3518 OnodeRef& o,
3519 ceph::buffer::list& bl);
3520 int _omap_setheader(TransContext *txc,
3521 CollectionRef& c,
3522 OnodeRef& o,
3523 ceph::buffer::list& header);
3524 int _omap_rmkeys(TransContext *txc,
3525 CollectionRef& c,
3526 OnodeRef& o,
3527 ceph::buffer::list& bl);
3528 int _omap_rmkey_range(TransContext *txc,
3529 CollectionRef& c,
3530 OnodeRef& o,
3531 const std::string& first, const std::string& last);
3532 int _set_alloc_hint(
3533 TransContext *txc,
3534 CollectionRef& c,
3535 OnodeRef& o,
3536 uint64_t expected_object_size,
3537 uint64_t expected_write_size,
3538 uint32_t flags);
3539 int _do_clone_range(TransContext *txc,
3540 CollectionRef& c,
3541 OnodeRef& oldo,
3542 OnodeRef& newo,
3543 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3544 int _clone(TransContext *txc,
3545 CollectionRef& c,
3546 OnodeRef& oldo,
3547 OnodeRef& newo);
3548 int _clone_range(TransContext *txc,
3549 CollectionRef& c,
3550 OnodeRef& oldo,
3551 OnodeRef& newo,
3552 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3553 int _rename(TransContext *txc,
3554 CollectionRef& c,
3555 OnodeRef& oldo,
3556 OnodeRef& newo,
3557 const ghobject_t& new_oid);
3558 int _create_collection(TransContext *txc, const coll_t &cid,
3559 unsigned bits, CollectionRef *c);
3560 int _remove_collection(TransContext *txc, const coll_t &cid,
3561 CollectionRef *c);
3562 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3563 int _split_collection(TransContext *txc,
3564 CollectionRef& c,
3565 CollectionRef& d,
3566 unsigned bits, int rem);
3567 int _merge_collection(TransContext *txc,
3568 CollectionRef *c,
3569 CollectionRef& d,
3570 unsigned bits);
3571
3572 void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
3573 const PExtentVector&);
3574 void _record_allocation_stats();
3575 private:
3576 uint64_t probe_count = 0;
3577 std::atomic<uint64_t> alloc_stats_count = {0};
3578 std::atomic<uint64_t> alloc_stats_fragments = { 0 };
3579 std::atomic<uint64_t> alloc_stats_size = { 0 };
3580 //
3581 std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
3582 { std::make_tuple(0ul, 0ul, 0ul) };
3583
3584 inline bool _use_rotational_settings();
3585
3586 public:
3587 typedef btree::btree_set<
3588 uint64_t, std::less<uint64_t>,
3589 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3590
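// Bundles, by reference, the error/warning counters, shared lookup
// structures and expected statfs accumulators that the per-object fsck
// pass fills in; sb_info_lock (when non-null) guards the shared-blob
// bookkeeping, presumably so parallel fsck workers can update it safely.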
3591 struct FSCK_ObjectCtx {
3592 int64_t& errors;
3593 int64_t& warnings;
3594 uint64_t& num_objects;
3595 uint64_t& num_extents;
3596 uint64_t& num_blobs;
3597 uint64_t& num_sharded_objects;
3598 uint64_t& num_spanning_blobs;
3599
3600 mempool_dynamic_bitset* used_blocks;
3601 uint64_t_btree_t* used_omap_head;
3602 std::vector<std::unordered_map<ghobject_t, uint64_t>> *zone_refs;
3603
3604 ceph::mutex* sb_info_lock;
3605 sb_info_space_efficient_map_t& sb_info;
3606 // approximate number of references per <shared blob, chunk> pair
3607 shared_blob_2hash_tracker_t& sb_ref_counts;
3608
3609 store_statfs_t& expected_store_statfs;
3610 per_pool_statfs& expected_pool_statfs;
3611 BlueStoreRepairer* repairer;
3612
3613 FSCK_ObjectCtx(int64_t& e,
3614 int64_t& w,
3615 uint64_t& _num_objects,
3616 uint64_t& _num_extents,
3617 uint64_t& _num_blobs,
3618 uint64_t& _num_sharded_objects,
3619 uint64_t& _num_spanning_blobs,
3620 mempool_dynamic_bitset* _ub,
3621 uint64_t_btree_t* _used_omap_head,
3622 std::vector<std::unordered_map<ghobject_t, uint64_t>> *_zone_refs,
3623
3624 ceph::mutex* _sb_info_lock,
3625 sb_info_space_efficient_map_t& _sb_info,
3626 shared_blob_2hash_tracker_t& _sb_ref_counts,
3627 store_statfs_t& _store_statfs,
3628 per_pool_statfs& _pool_statfs,
3629 BlueStoreRepairer* _repairer) :
3630 errors(e),
3631 warnings(w),
3632 num_objects(_num_objects),
3633 num_extents(_num_extents),
3634 num_blobs(_num_blobs),
3635 num_sharded_objects(_num_sharded_objects),
3636 num_spanning_blobs(_num_spanning_blobs),
3637 used_blocks(_ub),
3638 used_omap_head(_used_omap_head),
3639 zone_refs(_zone_refs),
3640 sb_info_lock(_sb_info_lock),
3641 sb_info(_sb_info),
3642 sb_ref_counts(_sb_ref_counts),
3643 expected_store_statfs(_store_statfs),
3644 expected_pool_statfs(_pool_statfs),
3645 repairer(_repairer) {
3646 }
3647 };
3648
3649 OnodeRef fsck_check_objects_shallow(
3650 FSCKDepth depth,
3651 int64_t pool_id,
3652 CollectionRef c,
3653 const ghobject_t& oid,
3654 const std::string& key,
3655 const ceph::buffer::list& value,
3656 mempool::bluestore_fsck::list<std::string>* expecting_shards,
3657 std::map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3658 const BlueStore::FSCK_ObjectCtx& ctx);
3659 #ifdef CEPH_BLUESTORE_TOOL_RESTORE_ALLOCATION
3660 int push_allocation_to_rocksdb();
3661 int read_allocation_from_drive_for_bluestore_tool();
3662 #endif
3663 private:
3664 #define MAX_BLOBS_IN_ONODE 128
3665 struct read_alloc_stats_t {
3666 //read_alloc_stats_t() { memset(&this, 0, sizeof(read_alloc_stats_t)); }
3667 uint32_t onode_count = 0;
3668 uint32_t shard_count = 0;
3669
3670 uint32_t skipped_repeated_extent = 0;
3671 uint32_t skipped_illegal_extent = 0;
3672
3673 uint32_t collection_search = 0;
3674 uint32_t pad_limit_count = 0;
3675
3676 uint64_t shared_blobs_count = 0;
3677 uint64_t compressed_blob_count = 0;
3678 uint64_t spanning_blob_count = 0;
3679 uint64_t insert_count = 0;
3680 uint64_t extent_count = 0;
3681
3682 uint64_t saved_inplace_count = 0;
3683 uint32_t merge_insert_count = 0;
3684 uint32_t merge_inplace_count = 0;
3685
3686 std::array<uint32_t, MAX_BLOBS_IN_ONODE + 1> blobs_in_onode = {};
3687 //uint32_t blobs_in_onode[MAX_BLOBS_IN_ONODE+1];
3688 };
3689
3690 friend std::ostream& operator<<(std::ostream& out, const read_alloc_stats_t& stats) {
3691 out << "==========================================================" << std::endl;
3692 out << "NCB::onode_count = " ;out.width(10);out << stats.onode_count << std::endl
3693 << "NCB::shard_count = " ;out.width(10);out << stats.shard_count << std::endl
3694 << "NCB::shared_blobs_count = " ;out.width(10);out << stats.shared_blobs_count << std::endl
3695 << "NCB::compressed_blob_count = " ;out.width(10);out << stats.compressed_blob_count << std::endl
3696 << "NCB::spanning_blob_count = " ;out.width(10);out << stats.spanning_blob_count << std::endl
3697 << "NCB::collection search = " ;out.width(10);out << stats.collection_search << std::endl
3698 << "NCB::skipped_repeated_extent = " ;out.width(10);out << stats.skipped_repeated_extent << std::endl
3699 << "NCB::skipped_illegal_extent = " ;out.width(10);out << stats.skipped_illegal_extent << std::endl
3700 << "NCB::extent_count = " ;out.width(10);out << stats.extent_count << std::endl
3701 << "NCB::insert_count = " ;out.width(10);out << stats.insert_count << std::endl;
3702
3703 if (stats.merge_insert_count) {
3704 out << "NCB::merge_insert_count = " ;out.width(10);out << stats.merge_insert_count << std::endl;
3705 }
3706 if (stats.merge_inplace_count ) {
3707 out << "NCB::merge_inplace_count = " ;out.width(10);out << stats.merge_inplace_count << std::endl;
3708 out << "NCB::saved_inplace_count = " ;out.width(10);out << stats.saved_inplace_count << std::endl;
3709 out << "NCB::saved inplace per call = " ;out.width(10);out << stats.saved_inplace_count/stats.merge_inplace_count << std::endl;
3710 }
3711 out << "==========================================================" << std::endl;
3712
3713 for (unsigned i = 0; i < MAX_BLOBS_IN_ONODE; i++ ) {
3714 if (stats.blobs_in_onode[i]) {
3715 out << "NCB::We had " ;out.width(9); out << stats.blobs_in_onode[i]
3716 << " ONodes with "; out.width(3); out << i << " blobs" << std::endl;
3717 }
3718 }
3719
3720 if (stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]) {
3721 out << "NCB::We had " ;out.width(9);out << stats.blobs_in_onode[MAX_BLOBS_IN_ONODE]
3722 << " ONodes with more than " << MAX_BLOBS_IN_ONODE << " blobs" << std::endl;
3723 }
3724 return out;
3725 }
3726
3727 int compare_allocators(Allocator* alloc1, Allocator* alloc2, uint64_t req_extent_count, uint64_t memory_target);
3728 Allocator* create_bitmap_allocator(uint64_t bdev_size);
3729 int add_existing_bluefs_allocation(Allocator* allocator, read_alloc_stats_t& stats);
3730 int allocator_add_restored_entries(Allocator *allocator, const void *buff, unsigned extent_count, uint64_t *p_read_alloc_size,
3731 uint64_t *p_extent_count, const void *v_header, BlueFS::FileReader *p_handle, uint64_t offset);
3732
3733 int copy_allocator(Allocator* src_alloc, Allocator *dest_alloc, uint64_t* p_num_entries);
3734 int store_allocator(Allocator* allocator);
3735 int invalidate_allocation_file_on_bluefs();
3736 int __restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes);
3737 int restore_allocator(Allocator* allocator, uint64_t *num, uint64_t *bytes);
3738 int read_allocation_from_drive_on_startup();
3739 int reconstruct_allocations(SimpleBitmap *smbmp, read_alloc_stats_t &stats);
3740 int read_allocation_from_onodes(SimpleBitmap *smbmp, read_alloc_stats_t& stats);
3741 void read_allocation_from_single_onode(SimpleBitmap *smbmp, BlueStore::OnodeRef& onode_ref, read_alloc_stats_t& stats);
3742 void set_allocation_in_simple_bmap(SimpleBitmap* sbmap, uint64_t offset, uint64_t length);
3743 int commit_to_null_manager();
3744 int commit_to_real_manager();
3745 int db_cleanup(int ret);
3746 int reset_fm_for_restore();
3747 int verify_rocksdb_allocations(Allocator *allocator);
3748 Allocator* clone_allocator_without_bluefs(Allocator *src_allocator);
3749 Allocator* initialize_allocator_from_freelist(FreelistManager *real_fm);
3750 void copy_allocator_content_to_fm(Allocator *allocator, FreelistManager *real_fm);
3751
3752
3753 void _fsck_check_object_omap(FSCKDepth depth,
3754 OnodeRef& o,
3755 const BlueStore::FSCK_ObjectCtx& ctx);
3756
3757 void _fsck_check_objects(FSCKDepth depth,
3758 FSCK_ObjectCtx& ctx);
3759 };
3760
3761 inline std::ostream& operator<<(std::ostream& out, const BlueStore::volatile_statfs& s) {
3762 return out
3763 << " allocated:"
3764 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3765 << " stored:"
3766 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3767 << " compressed:"
3768 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3769 << " compressed_orig:"
3770 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3771 << " compressed_alloc:"
3772 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3773 }
3774
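// boost::intrusive_ptr support: these free functions are found via ADL
// whenever an intrusive_ptr to an Onode or OpSequencer is copied or
// destroyed, and simply forward to the classes' own get()/put()
// reference counting.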
3775 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3776 o->get();
3777 }
3778 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3779 o->put();
3780 }
3781
3782 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3783 o->get();
3784 }
3785 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3786 o->put();
3787 }
3788
3789 class BlueStoreRepairer
3790 {
3791 ceph::mutex lock = ceph::make_mutex("BlueStore::BlueStoreRepairer::lock");
3792
3793 public:
3794 // to simplify future potential migration to mempools
3795 using fsck_interval = interval_set<uint64_t>;
3796
3797 // Structure to track which pextents are used by a specific cid/oid.
3798 // As with a Bloom filter, only positive and false-positive matches
3799 // are possible.
3800 // Maintains two lists of bloom filters, one for cids and one for oids,
3801 // where each list entry is a BF covering a specific disk pextent range.
3802 // The extent length covered by each filter is determined on init.
3803 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3804 // 'is_used' access.
3805 struct StoreSpaceTracker {
3806 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3807 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3808 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
3809 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3810
3811 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3812 bloom_vector collections_bfs;
3813 bloom_vector objects_bfs;
3814
3815 bool was_filtered_out = false;
3816 uint64_t granularity = 0; // extent length for a single filter
3817
3818 StoreSpaceTracker() {
3819 }
3820 StoreSpaceTracker(const StoreSpaceTracker& from) :
3821 collections_bfs(from.collections_bfs),
3822 objects_bfs(from.objects_bfs),
3823 granularity(from.granularity) {
3824 }
3825
3826 void init(uint64_t total,
3827 uint64_t min_alloc_size,
3828 uint64_t mem_cap = DEF_MEM_CAP) {
3829 ceph_assert(!granularity); // not initialized yet
3830 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3831 ceph_assert(mem_cap);
3832
3833 total = round_up_to(total, min_alloc_size);
3834 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3835
3836 if (!granularity) {
3837 granularity = min_alloc_size;
3838 } else {
3839 granularity = round_up_to(granularity, min_alloc_size);
3840 }
3841
3842 uint64_t entries = round_up_to(total, granularity) / granularity;
3843 collections_bfs.resize(entries,
3844 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3845 BLOOM_FILTER_TABLE_SIZE,
3846 0,
3847 BLOOM_FILTER_EXPECTED_COUNT));
3848 objects_bfs.resize(entries,
3849 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3850 BLOOM_FILTER_TABLE_SIZE,
3851 0,
3852 BLOOM_FILTER_EXPECTED_COUNT));
3853 }
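// Worked example of the sizing arithmetic above (hypothetical inputs):
//   total = 1 TiB, min_alloc_size = 4 KiB, mem_cap = DEF_MEM_CAP (128 MiB)
//   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB  (already 4 KiB-aligned)
//   entries     = 2^40 / 2^19 = 2,097,152 filters per vector
// Two vectors of ~32-byte filter tables then consume roughly
// 2 * 2,097,152 * 32 B = 128 MiB, i.e. about mem_cap (ignoring the
// per-bloom_filter object overhead).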
3854 inline uint32_t get_hash(const coll_t& cid) const {
3855 return cid.hash_to_shard(1);
3856 }
3857 inline void set_used(uint64_t offset, uint64_t len,
3858 const coll_t& cid, const ghobject_t& oid) {
3859 ceph_assert(granularity); // initialized
3860
3861 // can't call this func after filter_out has been applied
3862 ceph_assert(!was_filtered_out);
3863 if (!len) {
3864 return;
3865 }
3866 auto pos = offset / granularity;
3867 auto end_pos = (offset + len - 1) / granularity;
3868 while (pos <= end_pos) {
3869 collections_bfs[pos].insert(get_hash(cid));
3870 objects_bfs[pos].insert(oid.hobj.get_hash());
3871 ++pos;
3872 }
3873 }
3874 // Filter out entries unrelated to the specified (broken) extents.
3875 // Only 'is_used' calls are permitted after that.
3876 size_t filter_out(const fsck_interval& extents);
3877
3878 // determines whether the collection is present after filtering out
3879 inline bool is_used(const coll_t& cid) const {
3880 ceph_assert(was_filtered_out);
3881 for(auto& bf : collections_bfs) {
3882 if (bf.contains(get_hash(cid))) {
3883 return true;
3884 }
3885 }
3886 return false;
3887 }
3888 // determines whether the object is present after filtering out
3889 inline bool is_used(const ghobject_t& oid) const {
3890 ceph_assert(was_filtered_out);
3891 for(auto& bf : objects_bfs) {
3892 if (bf.contains(oid.hobj.get_hash())) {
3893 return true;
3894 }
3895 }
3896 return false;
3897 }
3898 // determines whether the collection is present before filtering out
3899 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3900 ceph_assert(granularity); // initialized
3901 ceph_assert(!was_filtered_out);
3902 auto &bf = collections_bfs[offs / granularity];
3903 if (bf.contains(get_hash(cid))) {
3904 return true;
3905 }
3906 return false;
3907 }
3908 // determines whether the object is present before filtering out
3909 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3910 ceph_assert(granularity); // initialized
3911 ceph_assert(!was_filtered_out);
3912 auto &bf = objects_bfs[offs / granularity];
3913 if (bf.contains(oid.hobj.get_hash())) {
3914 return true;
3915 }
3916 return false;
3917 }
3918 };
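// Minimal usage sketch (illustrative only -- the real sequence is driven by
// the fsck/repair code; 'bdev_size', 'min_alloc_size', the offsets and the
// cid/oid below are hypothetical placeholders):
//
//   StoreSpaceTracker tracker;
//   tracker.init(bdev_size, min_alloc_size);      // derive filter granularity
//   tracker.set_used(offset, length, cid, oid);   // once per scanned pextent
//   fsck_interval broken;
//   broken.insert(bad_offset, bad_length);        // suspicious extents
//   tracker.filter_out(broken);                   // keep only overlapping filters
//   bool touched = tracker.is_used(cid) || tracker.is_used(oid);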
3919
3920 public:
3921 void fix_per_pool_omap(KeyValueDB *db, int);
3922 bool remove_key(KeyValueDB *db, const std::string& prefix, const std::string& key);
3923 bool fix_shared_blob(KeyValueDB::Transaction txn,
3924 uint64_t sbid,
3925 bluestore_extent_ref_map_t* ref_map,
3926 size_t repaired = 1);
3927 bool fix_statfs(KeyValueDB *db, const std::string& key,
3928 const store_statfs_t& new_statfs);
3929
3930 bool fix_leaked(KeyValueDB *db,
3931 FreelistManager* fm,
3932 uint64_t offset, uint64_t len);
3933 bool fix_false_free(KeyValueDB *db,
3934 FreelistManager* fm,
3935 uint64_t offset, uint64_t len);
3936 bool fix_spanning_blobs(
3937 KeyValueDB* db,
3938 std::function<void(KeyValueDB::Transaction)> f);
3939
3940 bool preprocess_misreference(KeyValueDB *db);
3941
3942 unsigned apply(KeyValueDB* db);
3943
3944 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3945 std::lock_guard l(lock);
3946 misreferenced_extents.union_insert(offs, len);
3947 if (inc_error) {
3948 ++to_repair_cnt;
3949 }
3950 }
3951 //////////////////////
3952 // Note: the two methods below are the only thread-safe ones in this class.
3953 void inc_repaired(size_t n = 1) {
3954 to_repair_cnt += n;
3955 }
3956 void request_compaction() {
3957 need_compact = true;
3958 }
3959 //////////////////////
3960
3961 void init_space_usage_tracker(
3962 uint64_t total_space, uint64_t lres_tracking_unit_size)
3963 {
3964 //NB: not for use in multithreading mode!!!
3965 space_usage_tracker.init(total_space, lres_tracking_unit_size);
3966 }
3967 void set_space_used(uint64_t offset, uint64_t len,
3968 const coll_t& cid, const ghobject_t& oid) {
3969 std::lock_guard l(lock);
3970 space_usage_tracker.set_used(offset, len, cid, oid);
3971 }
3972 inline bool is_used(const coll_t& cid) const {
3973 //NB: not for use in multithreading mode!!!
3974 return space_usage_tracker.is_used(cid);
3975 }
3976 inline bool is_used(const ghobject_t& oid) const {
3977 //NB: not for use in multithreading mode!!!
3978 return space_usage_tracker.is_used(oid);
3979 }
3980
3981 const fsck_interval& get_misreferences() const {
3982 //NB: not for use in multithreading mode!!!
3983 return misreferenced_extents;
3984 }
3985 KeyValueDB::Transaction get_fix_misreferences_txn() {
3986 //NB: not for use in multithreading mode!!!
3987 return fix_misreferences_txn;
3988 }
3989
3990 private:
3991 std::atomic<unsigned> to_repair_cnt = { 0 };
3992 std::atomic<bool> need_compact = { false };
3993 KeyValueDB::Transaction fix_per_pool_omap_txn;
3994 KeyValueDB::Transaction fix_fm_leaked_txn;
3995 KeyValueDB::Transaction fix_fm_false_free_txn;
3996 KeyValueDB::Transaction remove_key_txn;
3997 KeyValueDB::Transaction fix_statfs_txn;
3998 KeyValueDB::Transaction fix_shared_blob_txn;
3999
4000 KeyValueDB::Transaction fix_misreferences_txn;
4001 KeyValueDB::Transaction fix_onode_txn;
4002
4003 StoreSpaceTracker space_usage_tracker;
4004
4005 // non-shared extents with multiple references
4006 fsck_interval misreferenced_extents;
4007
4008 };
4009
4010 class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
4011 {
4012 template <class T, size_t MaxX, size_t MaxY>
4013 class matrix_2d {
4014 T values[MaxX][MaxY];
4015 public:
4016 matrix_2d() {
4017 clear();
4018 }
4019 T& at(size_t x, size_t y) {
4020 ceph_assert(x < MaxX);
4021 ceph_assert(y < MaxY);
4022
4023 return values[x][y];
4024 }
4025 size_t get_max_x() const {
4026 return MaxX;
4027 }
4028 size_t get_max_y() const {
4029 return MaxY;
4030 }
4031 void clear() {
4032 memset(values, 0, sizeof(values));
4033 }
4034 };
4035
4036 enum {
4037 // use 0/nullptr as unset indication
4038 LEVEL_FIRST = 1,
4039 LEVEL_LOG = LEVEL_FIRST, // BlueFS log
4040 LEVEL_WAL,
4041 LEVEL_DB,
4042 LEVEL_SLOW,
4043 LEVEL_MAX
4044 };
4045 // add +1 row for corresponding per-device totals
4046 // add +1 column for per-level actual (taken from file size) total
4047 typedef matrix_2d<std::atomic<uint64_t>, BlueFS::MAX_BDEV + 1, LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;
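// Indexing convention (see add_usage()/sub_usage() below):
//   at(bdev, level - LEVEL_FIRST)             : bytes of that level stored on device 'bdev'
//   at(bdev, LEVEL_MAX - LEVEL_FIRST)         : running total for device 'bdev'
//   at(BlueFS::MAX_BDEV, level - LEVEL_FIRST) : per-level actual total taken from file sizes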
4048
4049 per_level_per_dev_usage_t per_level_per_dev_usage;
4050 // file count per level, add +1 to keep total file count
4051 std::atomic<uint64_t> per_level_files[LEVEL_MAX - LEVEL_FIRST + 1] = { 0 };
4052
4053 // Note: maximum per-device totals below might be smaller than corresponding
4054 // perf counters by up to a single alloc unit (1M) due to superblock extent.
4055 // The latter is not accounted for here.
4056 per_level_per_dev_usage_t per_level_per_dev_max;
4057
4058 uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
4059 uint64_t db_avail4slow = 0;
4060 enum {
4061 OLD_POLICY,
4062 USE_SOME_EXTRA
4063 };
4064
4065 public:
4066 RocksDBBlueFSVolumeSelector(
4067 uint64_t _wal_total,
4068 uint64_t _db_total,
4069 uint64_t _slow_total,
4070 uint64_t _level0_size,
4071 uint64_t _level_base,
4072 uint64_t _level_multiplier,
4073 double reserved_factor,
4074 uint64_t reserved,
4075 bool new_pol)
4076 {
4077 l_totals[LEVEL_LOG - LEVEL_FIRST] = 0; // not used at the moment
4078 l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
4079 l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
4080 l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;
4081
4082 if (!new_pol) {
4083 return;
4084 }
4085
4086 // Calculate how much extra space is available on the DB volume.
4087 // Depending on whether an explicit reserved size was specified, this is either
4088 // * DB volume size - reserved
4089 // or
4090 // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
4091 if (!reserved) {
4092 uint64_t prev_levels = _level0_size;
4093 uint64_t cur_level = _level_base;
4094 uint64_t cur_threshold = 0;
4095 do {
4096 uint64_t next_level = cur_level * _level_multiplier;
4097 uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
4098 if (_db_total <= next_threshold) {
4099 db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
4100 break;
4101 } else {
4102 prev_levels += cur_level;
4103 cur_level = next_level;
4104 cur_threshold = next_threshold;
4105 }
4106 } while (true);
4107 } else {
4108 db_avail4slow = _db_total - reserved;
4109 }
4110 }
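// Worked example of the reserved-size loop above (hypothetical sizing,
// new_pol == true, reserved == 0):
//   _level0_size = 1 GB, _level_base = 1 GB, _level_multiplier = 10,
//   reserved_factor = 2, _db_total = 60 GB
//   iter 1: next_level = 10 GB,  next_threshold = 1 + 1 + 10*2  = 22 GB  -> 60 > 22, continue
//   iter 2: next_level = 100 GB, next_threshold = 2 + 10 + 100*2 = 212 GB -> 60 <= 212, stop
//   db_avail4slow = 60 - 22 = 38 GB, i.e. the space beyond what L0..L1 plus
//   a reserved_factor cushion for the next level are expected to need.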
4111
4112 void* get_hint_for_log() const override {
4113 return reinterpret_cast<void*>(LEVEL_LOG);
4114 }
4115 void* get_hint_by_dir(std::string_view dirname) const override;
4116
4117 void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
4118 if (hint == nullptr)
4119 return;
4120 size_t pos = (size_t)hint - LEVEL_FIRST;
4121 for (auto& p : fnode.extents) {
4122 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
4123 auto& max = per_level_per_dev_max.at(p.bdev, pos);
4124 uint64_t v = cur.fetch_add(p.length) + p.length;
4125 while (v > max) {
4126 max.exchange(v);
4127 }
4128 {
4129 //update per-device totals
4130 auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
4131 auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
4132 uint64_t v = cur.fetch_add(p.length) + p.length;
4133 while (v > max) {
4134 max.exchange(v);
4135 }
4136 }
4137 }
4138 {
4139 //update per-level actual totals
4140 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
4141 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
4142 uint64_t v = cur.fetch_add(fnode.size) + fnode.size;
4143 while (v > max) {
4144 max.exchange(v);
4145 }
4146 }
4147 ++per_level_files[pos];
4148 ++per_level_files[LEVEL_MAX - LEVEL_FIRST];
4149 }
4150 void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
4151 if (hint == nullptr)
4152 return;
4153 size_t pos = (size_t)hint - LEVEL_FIRST;
4154 for (auto& p : fnode.extents) {
4155 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
4156 ceph_assert(cur >= p.length);
4157 cur -= p.length;
4158
4159 //update per-device totals
4160 auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
4161 ceph_assert(cur2 >= p.length);
4162 cur2 -= p.length;
4163 }
4164 //update per-level actual totals
4165 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
4166 ceph_assert(cur >= fnode.size);
4167 cur -= fnode.size;
4168 ceph_assert(per_level_files[pos] > 0);
4169 --per_level_files[pos];
4170 ceph_assert(per_level_files[LEVEL_MAX - LEVEL_FIRST] > 0);
4171 --per_level_files[LEVEL_MAX - LEVEL_FIRST];
4172 }
4173 void add_usage(void* hint, uint64_t size_more) override {
4174 if (hint == nullptr)
4175 return;
4176 size_t pos = (size_t)hint - LEVEL_FIRST;
4177 //update per-level actual totals
4178 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
4179 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
4180 uint64_t v = cur.fetch_add(size_more) + size_more;
4181 while (v > max) {
4182 max.exchange(v);
4183 }
4184 }
4185 void sub_usage(void* hint, uint64_t size_less) override {
4186 if (hint == nullptr)
4187 return;
4188 size_t pos = (size_t)hint - LEVEL_FIRST;
4189 //update per-level actual totals
4190 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
4191 ceph_assert(cur >= size_less);
4192 cur -= size_less;
4193 }
4194
4195 uint8_t select_prefer_bdev(void* h) override;
4196 void get_paths(
4197 const std::string& base,
4198 BlueFSVolumeSelector::paths& res) const override;
4199
4200 void dump(std::ostream& sout) override;
4201 BlueFSVolumeSelector* clone_empty() const override;
4202 bool compare(BlueFSVolumeSelector* other) override;
4203 };
4204
4205 #endif