1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/interval_set.h"
39 #include "include/unordered_map.h"
40 #include "include/mempool.h"
41 #include "include/hash.h"
42 #include "common/bloom_filter.hpp"
43 #include "common/Finisher.h"
44 #include "common/ceph_mutex.h"
45 #include "common/Throttle.h"
46 #include "common/perf_counters.h"
47 #include "common/PriorityCache.h"
48 #include "compressor/Compressor.h"
49 #include "os/ObjectStore.h"
50
51 #include "bluestore_types.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 #ifdef WITH_BLKIN
56 #include "common/zipkin_trace.h"
57 #endif
58
59 class Allocator;
60 class FreelistManager;
61 class BlueStoreRepairer;
62
63 //#define DEBUG_CACHE
64 //#define DEBUG_DEFERRED
65
66
67
68 // constants for Buffer::optimize()
69 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
70
71
72 enum {
73 l_bluestore_first = 732430,
74 l_bluestore_kv_flush_lat,
75 l_bluestore_kv_commit_lat,
76 l_bluestore_kv_sync_lat,
77 l_bluestore_kv_final_lat,
78 l_bluestore_state_prepare_lat,
79 l_bluestore_state_aio_wait_lat,
80 l_bluestore_state_io_done_lat,
81 l_bluestore_state_kv_queued_lat,
82 l_bluestore_state_kv_committing_lat,
83 l_bluestore_state_kv_done_lat,
84 l_bluestore_state_deferred_queued_lat,
85 l_bluestore_state_deferred_aio_wait_lat,
86 l_bluestore_state_deferred_cleanup_lat,
87 l_bluestore_state_finishing_lat,
88 l_bluestore_state_done_lat,
89 l_bluestore_throttle_lat,
90 l_bluestore_submit_lat,
91 l_bluestore_commit_lat,
92 l_bluestore_read_lat,
93 l_bluestore_read_onode_meta_lat,
94 l_bluestore_read_wait_aio_lat,
95 l_bluestore_compress_lat,
96 l_bluestore_decompress_lat,
97 l_bluestore_csum_lat,
98 l_bluestore_compress_success_count,
99 l_bluestore_compress_rejected_count,
100 l_bluestore_write_pad_bytes,
101 l_bluestore_deferred_write_ops,
102 l_bluestore_deferred_write_bytes,
103 l_bluestore_write_penalty_read_ops,
104 l_bluestore_allocated,
105 l_bluestore_stored,
106 l_bluestore_compressed,
107 l_bluestore_compressed_allocated,
108 l_bluestore_compressed_original,
109 l_bluestore_onodes,
110 l_bluestore_pinned_onodes,
111 l_bluestore_onode_hits,
112 l_bluestore_onode_misses,
113 l_bluestore_onode_shard_hits,
114 l_bluestore_onode_shard_misses,
115 l_bluestore_extents,
116 l_bluestore_blobs,
117 l_bluestore_buffers,
118 l_bluestore_buffer_bytes,
119 l_bluestore_buffer_hit_bytes,
120 l_bluestore_buffer_miss_bytes,
121 l_bluestore_write_big,
122 l_bluestore_write_big_bytes,
123 l_bluestore_write_big_blobs,
124 l_bluestore_write_big_deferred,
125 l_bluestore_write_small,
126 l_bluestore_write_small_bytes,
127 l_bluestore_write_small_unused,
128 l_bluestore_write_deferred,
129 l_bluestore_write_deferred_bytes,
130 l_bluestore_write_small_pre_read,
131 l_bluestore_write_new,
132 l_bluestore_txc,
133 l_bluestore_onode_reshard,
134 l_bluestore_blob_split,
135 l_bluestore_extent_compress,
136 l_bluestore_gc_merged,
137 l_bluestore_read_eio,
138 l_bluestore_reads_with_retries,
139 l_bluestore_fragmentation,
140 l_bluestore_omap_seek_to_first_lat,
141 l_bluestore_omap_upper_bound_lat,
142 l_bluestore_omap_lower_bound_lat,
143 l_bluestore_omap_next_lat,
144 l_bluestore_omap_get_keys_lat,
145 l_bluestore_omap_get_values_lat,
146 l_bluestore_clist_lat,
147 l_bluestore_remove_lat,
148 l_bluestore_last
149 };
150
151 #define META_POOL_ID ((uint64_t)-1ull)
152
153 class BlueStore : public ObjectStore,
154 public md_config_obs_t {
155 // -----------------------------------------------------
156 // types
157 public:
158 // config observer
159 const char** get_tracked_conf_keys() const override;
160 void handle_conf_change(const ConfigProxy& conf,
161 const std::set<std::string> &changed) override;
162
163 //handler for discard event
164 void handle_discard(interval_set<uint64_t>& to_release);
165
166 void _set_csum();
167 void _set_compression();
168 void _set_throttle_params();
169 int _set_cache_sizes();
170 void _set_max_defer_interval() {
171 max_defer_interval =
172 cct->_conf.get_val<double>("bluestore_max_defer_interval");
173 }
174
175 struct TransContext;
176
177 typedef std::map<uint64_t, ceph::buffer::list> ready_regions_t;
178
179
180 struct BufferSpace;
181 struct Collection;
182 typedef boost::intrusive_ptr<Collection> CollectionRef;
183
184 struct AioContext {
185 virtual void aio_finish(BlueStore *store) = 0;
186 virtual ~AioContext() {}
187 };
188
189 /// cached buffer
190 struct Buffer {
191 MEMPOOL_CLASS_HELPERS();
192
193 enum {
194 STATE_EMPTY, ///< empty buffer -- used for cache history
195 STATE_CLEAN, ///< clean data that is up to date
196 STATE_WRITING, ///< data that is being written (io not yet complete)
197 };
198 static const char *get_state_name(int s) {
199 switch (s) {
200 case STATE_EMPTY: return "empty";
201 case STATE_CLEAN: return "clean";
202 case STATE_WRITING: return "writing";
203 default: return "???";
204 }
205 }
206 enum {
207 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
208 // NOTE: fix operator<< when you define a second flag
209 };
210 static const char *get_flag_name(int s) {
211 switch (s) {
212 case FLAG_NOCACHE: return "nocache";
213 default: return "???";
214 }
215 }
216
217 BufferSpace *space;
218 uint16_t state; ///< STATE_*
219 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
220 uint32_t flags; ///< FLAG_*
221 uint64_t seq;
222 uint32_t offset, length;
223 ceph::buffer::list data;
224
225 boost::intrusive::list_member_hook<> lru_item;
226 boost::intrusive::list_member_hook<> state_item;
227
228 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
229 unsigned f = 0)
230 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
231 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, ceph::buffer::list& b,
232 unsigned f = 0)
233 : space(space), state(s), flags(f), seq(q), offset(o),
234 length(b.length()), data(b) {}
235
236 bool is_empty() const {
237 return state == STATE_EMPTY;
238 }
239 bool is_clean() const {
240 return state == STATE_CLEAN;
241 }
242 bool is_writing() const {
243 return state == STATE_WRITING;
244 }
245
246 uint32_t end() const {
247 return offset + length;
248 }
249
250 void truncate(uint32_t newlen) {
251 ceph_assert(newlen < length);
252 if (data.length()) {
253 ceph::buffer::list t;
254 t.substr_of(data, 0, newlen);
255 data = std::move(t);
256 }
257 length = newlen;
258 }
259 void maybe_rebuild() {
260 if (data.length() &&
261 (data.get_num_buffers() > 1 ||
262 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
263 data.rebuild();
264 }
265 }
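// Illustrative numbers (not from the original source): with
// MAX_BUFFER_SLOP_RATIO_DEN == 8, a 4096-byte buffer is rebuilt into one
// contiguous allocation when its bufferlist holds more than one raw buffer
// or when its front buffer wastes more than 4096 / 8 = 512 bytes.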
266
267 void dump(ceph::Formatter *f) const {
268 f->dump_string("state", get_state_name(state));
269 f->dump_unsigned("seq", seq);
270 f->dump_unsigned("offset", offset);
271 f->dump_unsigned("length", length);
272 f->dump_unsigned("data_length", data.length());
273 }
274 };
275
276 struct BufferCacheShard;
277
278 /// map logical extent range (object) onto buffers
279 struct BufferSpace {
280 enum {
281 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
282 };
283
284 typedef boost::intrusive::list<
285 Buffer,
286 boost::intrusive::member_hook<
287 Buffer,
288 boost::intrusive::list_member_hook<>,
289 &Buffer::state_item> > state_list_t;
290
291 mempool::bluestore_cache_meta::map<uint32_t, std::unique_ptr<Buffer>>
292 buffer_map;
293
294 // we use a bare intrusive list here instead of std::map because
295 // it uses less memory and we expect this to be very small (very
296 // few IOs in flight to the same Blob at the same time).
297 state_list_t writing; ///< writing buffers, sorted by seq, ascending
298
299 ~BufferSpace() {
300 ceph_assert(buffer_map.empty());
301 ceph_assert(writing.empty());
302 }
303
304 void _add_buffer(BufferCacheShard* cache, Buffer* b, int level, Buffer* near) {
305 cache->_audit("_add_buffer start");
306 buffer_map[b->offset].reset(b);
307 if (b->is_writing()) {
308 // we might get already-cached data for which resetting the mempool is inappropriate,
309 // hence we call try_assign_to_mempool
310 b->data.try_assign_to_mempool(mempool::mempool_bluestore_writing);
311 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
312 writing.push_back(*b);
313 } else {
314 auto it = writing.begin();
315 while (it->seq < b->seq) {
316 ++it;
317 }
318
319 ceph_assert(it->seq >= b->seq);
320 // note that this will insert b before it
321 // hence the order is maintained
322 writing.insert(it, *b);
323 }
324 } else {
325 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
326 cache->_add(b, level, near);
327 }
328 cache->_audit("_add_buffer end");
329 }
330 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
331 _rm_buffer(cache, buffer_map.find(b->offset));
332 }
333 void _rm_buffer(BufferCacheShard* cache,
334 std::map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
335 ceph_assert(p != buffer_map.end());
336 cache->_audit("_rm_buffer start");
337 if (p->second->is_writing()) {
338 writing.erase(writing.iterator_to(*p->second));
339 } else {
340 cache->_rm(p->second.get());
341 }
342 buffer_map.erase(p);
343 cache->_audit("_rm_buffer end");
344 }
345
346 std::map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
347 uint32_t offset) {
348 auto i = buffer_map.lower_bound(offset);
349 if (i != buffer_map.begin()) {
350 --i;
351 if (i->first + i->second->length <= offset)
352 ++i;
353 }
354 return i;
355 }
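// Worked example (illustrative offsets): with buffers at {0 (len 4096),
// 8192 (len 4096)}, _data_lower_bound(100) returns the buffer at 0 because
// it overlaps offset 100, while _data_lower_bound(4096) steps past it and
// returns the buffer at 8192.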
356
357 // must be called under protection of the Cache lock
358 void _clear(BufferCacheShard* cache);
359
360 // return value is the highest cache_private of a trimmed buffer, or 0.
361 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
362 std::lock_guard l(cache->lock);
363 int ret = _discard(cache, offset, length);
364 cache->_trim();
365 return ret;
366 }
367 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
368
369 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, ceph::buffer::list& bl,
370 unsigned flags) {
371 std::lock_guard l(cache->lock);
372 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
373 flags);
374 b->cache_private = _discard(cache, offset, bl.length());
375 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
376 cache->_trim();
377 }
378 void _finish_write(BufferCacheShard* cache, uint64_t seq);
379 void did_read(BufferCacheShard* cache, uint32_t offset, ceph::buffer::list& bl) {
380 std::lock_guard l(cache->lock);
381 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
382 b->cache_private = _discard(cache, offset, bl.length());
383 _add_buffer(cache, b, 1, nullptr);
384 cache->_trim();
385 }
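// Typical lifecycle, as a minimal sketch (txc_seq is an illustrative
// transaction sequence number supplied by the caller; locking is elided):
//
//   bspace.write(cache, txc_seq, offset, bl, 0);   // buffer enters STATE_WRITING
//   // ... the corresponding I/O completes and the kv transaction commits ...
//   bspace._finish_write(cache, txc_seq);          // buffer becomes clean (or is
//                                                  // trimmed if FLAG_NOCACHE)
//   bspace.read(cache, offset, len, res, res_intervals);  // later reads hit the cache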
386
387 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
388 BlueStore::ready_regions_t& res,
389 interval_set<uint32_t>& res_intervals,
390 int flags = 0);
391
392 void truncate(BufferCacheShard* cache, uint32_t offset) {
393 discard(cache, offset, (uint32_t)-1 - offset);
394 }
395
396 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
397
398 void dump(BufferCacheShard* cache, ceph::Formatter *f) const {
399 std::lock_guard l(cache->lock);
400 f->open_array_section("buffers");
401 for (auto& i : buffer_map) {
402 f->open_object_section("buffer");
403 ceph_assert(i.first == i.second->offset);
404 i.second->dump(f);
405 f->close_section();
406 }
407 f->close_section();
408 }
409 };
410
411 struct SharedBlobSet;
412
413 /// in-memory shared blob state (incl cached buffers)
414 struct SharedBlob {
415 MEMPOOL_CLASS_HELPERS();
416
417 std::atomic_int nref = {0}; ///< reference count
418 bool loaded = false;
419
420 CollectionRef coll;
421 union {
422 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
423 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
424 };
425 BufferSpace bc; ///< buffer cache
426
427 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
428 if (get_cache()) {
429 get_cache()->add_blob();
430 }
431 }
432 SharedBlob(uint64_t i, Collection *_coll);
433 ~SharedBlob();
434
435 uint64_t get_sbid() const {
436 return loaded ? persistent->sbid : sbid_unloaded;
437 }
438
439 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
440 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
441
442 void dump(ceph::Formatter* f) const;
443 friend std::ostream& operator<<(std::ostream& out, const SharedBlob& sb);
444
445 void get() {
446 ++nref;
447 }
448 void put();
449
450 /// get logical references
451 void get_ref(uint64_t offset, uint32_t length);
452
453 /// put logical references, and get back any released extents
454 void put_ref(uint64_t offset, uint32_t length,
455 PExtentVector *r, bool *unshare);
456
457 void finish_write(uint64_t seq);
458
459 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
460 return l.get_sbid() == r.get_sbid();
461 }
462 inline BufferCacheShard* get_cache() {
463 return coll ? coll->cache : nullptr;
464 }
465 inline SharedBlobSet* get_parent() {
466 return coll ? &(coll->shared_blob_set) : nullptr;
467 }
468 inline bool is_loaded() const {
469 return loaded;
470 }
471
472 };
473 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
474
475 /// a lookup table of SharedBlobs
476 struct SharedBlobSet {
477 /// protect lookup, insertion, removal
478 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
479
480 // we use a bare pointer because we don't want to affect the ref
481 // count
482 mempool::bluestore_cache_meta::unordered_map<uint64_t,SharedBlob*> sb_map;
483
484 SharedBlobRef lookup(uint64_t sbid) {
485 std::lock_guard l(lock);
486 auto p = sb_map.find(sbid);
487 if (p == sb_map.end() ||
488 p->second->nref == 0) {
489 return nullptr;
490 }
491 return p->second;
492 }
493
494 void add(Collection* coll, SharedBlob *sb) {
495 std::lock_guard l(lock);
496 sb_map[sb->get_sbid()] = sb;
497 sb->coll = coll;
498 }
499
500 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
501 std::lock_guard l(lock);
502 ceph_assert(sb->get_parent() == this);
503 if (verify_nref_is_zero && sb->nref != 0) {
504 return false;
505 }
506 // only remove if it still points to us
507 auto p = sb_map.find(sb->get_sbid());
508 if (p != sb_map.end() &&
509 p->second == sb) {
510 sb_map.erase(p);
511 }
512 return true;
513 }
514
515 bool empty() {
516 std::lock_guard l(lock);
517 return sb_map.empty();
518 }
519
520 template <int LogLevelV>
521 void dump(CephContext *cct);
522 };
523
524 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
525
526 /// in-memory blob metadata and associated cached buffers (if any)
527 struct Blob {
528 MEMPOOL_CLASS_HELPERS();
529
530 std::atomic_int nref = {0}; ///< reference count
531 int16_t id = -1; ///< id, for spanning blobs only, >= 0
532 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
533 SharedBlobRef shared_blob; ///< shared blob state (if any)
534
535 private:
536 mutable bluestore_blob_t blob; ///< decoded blob metadata
537 #ifdef CACHE_BLOB_BL
538 mutable ceph::buffer::list blob_bl; ///< cached encoded blob, blob is dirty if empty
539 #endif
540 /// refs from this shard. ephemeral if id<0, persisted if spanning.
541 bluestore_blob_use_tracker_t used_in_blob;
542
543 public:
544
545 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
546 friend void intrusive_ptr_release(Blob *b) { b->put(); }
547
548 void dump(ceph::Formatter* f) const;
549 friend std::ostream& operator<<(std::ostream& out, const Blob &b);
550
551 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
552 return used_in_blob;
553 }
554 bool is_referenced() const {
555 return used_in_blob.is_not_empty();
556 }
557 uint32_t get_referenced_bytes() const {
558 return used_in_blob.get_referenced_bytes();
559 }
560
561 bool is_spanning() const {
562 return id >= 0;
563 }
564
565 bool can_split() const {
566 std::lock_guard l(shared_blob->get_cache()->lock);
567 // splitting a BufferSpace writing list is too hard; don't try.
568 return shared_blob->bc.writing.empty() &&
569 used_in_blob.can_split() &&
570 get_blob().can_split();
571 }
572
573 bool can_split_at(uint32_t blob_offset) const {
574 return used_in_blob.can_split_at(blob_offset) &&
575 get_blob().can_split_at(blob_offset);
576 }
577
578 bool can_reuse_blob(uint32_t min_alloc_size,
579 uint32_t target_blob_size,
580 uint32_t b_offset,
581 uint32_t *length0);
582
583 void dup(Blob& o) {
584 o.shared_blob = shared_blob;
585 o.blob = blob;
586 #ifdef CACHE_BLOB_BL
587 o.blob_bl = blob_bl;
588 #endif
589 }
590
591 inline const bluestore_blob_t& get_blob() const {
592 return blob;
593 }
594 inline bluestore_blob_t& dirty_blob() {
595 #ifdef CACHE_BLOB_BL
596 blob_bl.clear();
597 #endif
598 return blob;
599 }
600
601 /// discard buffers for unallocated regions
602 void discard_unallocated(Collection *coll);
603
604 /// get logical references
605 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
606 /// put logical references, and get back any released extents
607 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
608 PExtentVector *r);
609
610 /// split the blob
611 void split(Collection *coll, uint32_t blob_offset, Blob *o);
612
613 void get() {
614 ++nref;
615 }
616 void put() {
617 if (--nref == 0)
618 delete this;
619 }
620
621
622 #ifdef CACHE_BLOB_BL
623 void _encode() const {
624 if (blob_bl.length() == 0 ) {
625 encode(blob, blob_bl);
626 } else {
627 ceph_assert(blob_bl.length());
628 }
629 }
630 void bound_encode(
631 size_t& p,
632 bool include_ref_map) const {
633 _encode();
634 p += blob_bl.length();
635 if (include_ref_map) {
636 used_in_blob.bound_encode(p);
637 }
638 }
639 void encode(
640 ceph::buffer::list::contiguous_appender& p,
641 bool include_ref_map) const {
642 _encode();
643 p.append(blob_bl);
644 if (include_ref_map) {
645 used_in_blob.encode(p);
646 }
647 }
648 void decode(
649 Collection */*coll*/,
650 ceph::buffer::ptr::const_iterator& p,
651 bool include_ref_map) {
652 const char *start = p.get_pos();
653 denc(blob, p);
654 const char *end = p.get_pos();
655 blob_bl.clear();
656 blob_bl.append(start, end - start);
657 if (include_ref_map) {
658 used_in_blob.decode(p);
659 }
660 }
661 #else
662 void bound_encode(
663 size_t& p,
664 uint64_t struct_v,
665 uint64_t sbid,
666 bool include_ref_map) const {
667 denc(blob, p, struct_v);
668 if (blob.is_shared()) {
669 denc(sbid, p);
670 }
671 if (include_ref_map) {
672 used_in_blob.bound_encode(p);
673 }
674 }
675 void encode(
676 ceph::buffer::list::contiguous_appender& p,
677 uint64_t struct_v,
678 uint64_t sbid,
679 bool include_ref_map) const {
680 denc(blob, p, struct_v);
681 if (blob.is_shared()) {
682 denc(sbid, p);
683 }
684 if (include_ref_map) {
685 used_in_blob.encode(p);
686 }
687 }
688 void decode(
689 Collection *coll,
690 ceph::buffer::ptr::const_iterator& p,
691 uint64_t struct_v,
692 uint64_t* sbid,
693 bool include_ref_map);
694 #endif
695 };
696 typedef boost::intrusive_ptr<Blob> BlobRef;
697 typedef mempool::bluestore_cache_meta::map<int,BlobRef> blob_map_t;
698
699 /// a logical extent, pointing to (some portion of) a blob
700 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
701 struct Extent : public ExtentBase {
702 MEMPOOL_CLASS_HELPERS();
703
704 uint32_t logical_offset = 0; ///< logical offset
705 uint32_t blob_offset = 0; ///< blob offset
706 uint32_t length = 0; ///< length
707 BlobRef blob; ///< the blob with our data
708
709 /// ctor for lookup only
710 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
711 /// ctor for delayed initialization (see decode_some())
712 explicit Extent() : ExtentBase() {
713 }
714 /// ctor for general usage
715 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
716 : ExtentBase(),
717 logical_offset(lo), blob_offset(o), length(l) {
718 assign_blob(b);
719 }
720 ~Extent() {
721 if (blob) {
722 blob->shared_blob->get_cache()->rm_extent();
723 }
724 }
725
726 void dump(ceph::Formatter* f) const;
727
728 void assign_blob(const BlobRef& b) {
729 ceph_assert(!blob);
730 blob = b;
731 blob->shared_blob->get_cache()->add_extent();
732 }
733
734 // comparators for intrusive_set
735 friend bool operator<(const Extent &a, const Extent &b) {
736 return a.logical_offset < b.logical_offset;
737 }
738 friend bool operator>(const Extent &a, const Extent &b) {
739 return a.logical_offset > b.logical_offset;
740 }
741 friend bool operator==(const Extent &a, const Extent &b) {
742 return a.logical_offset == b.logical_offset;
743 }
744
745 uint32_t blob_start() const {
746 return logical_offset - blob_offset;
747 }
748
749 uint32_t blob_end() const {
750 return blob_start() + blob->get_blob().get_logical_length();
751 }
752
753 uint32_t logical_end() const {
754 return logical_offset + length;
755 }
756
757 // return true if any piece of the blob is out of
758 // the given range [o, o + l].
759 bool blob_escapes_range(uint32_t o, uint32_t l) const {
760 return blob_start() < o || blob_end() > o + l;
761 }
762 };
763 typedef boost::intrusive::set<Extent> extent_map_t;
764
765
766 friend std::ostream& operator<<(std::ostream& out, const Extent& e);
767
768 struct OldExtent {
769 boost::intrusive::list_member_hook<> old_extent_item;
770 Extent e;
771 PExtentVector r;
772 bool blob_empty; // flag to track the last removed extent that makes the blob
773 // empty; required to update compression stats properly
774 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
775 : e(lo, o, l, b), blob_empty(false) {
776 }
777 static OldExtent* create(CollectionRef c,
778 uint32_t lo,
779 uint32_t o,
780 uint32_t l,
781 BlobRef& b);
782 };
783 typedef boost::intrusive::list<
784 OldExtent,
785 boost::intrusive::member_hook<
786 OldExtent,
787 boost::intrusive::list_member_hook<>,
788 &OldExtent::old_extent_item> > old_extent_map_t;
789
790 struct Onode;
791
792 /// a sharded extent map, mapping offsets to lextents to blobs
793 struct ExtentMap {
794 Onode *onode;
795 extent_map_t extent_map; ///< map of Extents to Blobs
796 blob_map_t spanning_blob_map; ///< blobs that span shards
797 typedef boost::intrusive_ptr<Onode> OnodeRef;
798
799 struct Shard {
800 bluestore_onode_t::shard_info *shard_info = nullptr;
801 unsigned extents = 0; ///< count extents in this shard
802 bool loaded = false; ///< true if shard is loaded
803 bool dirty = false; ///< true if shard is dirty and needs reencoding
804 };
805 mempool::bluestore_cache_meta::vector<Shard> shards; ///< shards
806
807 ceph::buffer::list inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
808
809 uint32_t needs_reshard_begin = 0;
810 uint32_t needs_reshard_end = 0;
811
812 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
813 uint64_t&, uint64_t&, uint64_t&);
814
815 bool needs_reshard() const {
816 return needs_reshard_end > needs_reshard_begin;
817 }
818 void clear_needs_reshard() {
819 needs_reshard_begin = needs_reshard_end = 0;
820 }
821 void request_reshard(uint32_t begin, uint32_t end) {
822 if (begin < needs_reshard_begin) {
823 needs_reshard_begin = begin;
824 }
825 if (end > needs_reshard_end) {
826 needs_reshard_end = end;
827 }
828 }
829
830 struct DeleteDisposer {
831 void operator()(Extent *e) { delete e; }
832 };
833
834 ExtentMap(Onode *o);
835 ~ExtentMap() {
836 extent_map.clear_and_dispose(DeleteDisposer());
837 }
838
839 void clear() {
840 extent_map.clear_and_dispose(DeleteDisposer());
841 shards.clear();
842 inline_bl.clear();
843 clear_needs_reshard();
844 }
845
846 void dump(ceph::Formatter* f) const;
847
848 bool encode_some(uint32_t offset, uint32_t length, ceph::buffer::list& bl,
849 unsigned *pn);
850 unsigned decode_some(ceph::buffer::list& bl);
851
852 void bound_encode_spanning_blobs(size_t& p);
853 void encode_spanning_blobs(ceph::buffer::list::contiguous_appender& p);
854 void decode_spanning_blobs(ceph::buffer::ptr::const_iterator& p);
855
856 BlobRef get_spanning_blob(int id) {
857 auto p = spanning_blob_map.find(id);
858 ceph_assert(p != spanning_blob_map.end());
859 return p->second;
860 }
861
862 void update(KeyValueDB::Transaction t, bool force);
863 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
864 void reshard(
865 KeyValueDB *db,
866 KeyValueDB::Transaction t);
867
868 /// initialize Shards from the onode
869 void init_shards(bool loaded, bool dirty);
870
871 /// return index of shard containing offset
872 /// or -1 if not found
873 int seek_shard(uint32_t offset) {
874 size_t end = shards.size();
875 size_t mid, left = 0;
876 size_t right = end; // one past the right end
877
878 while (left < right) {
879 mid = left + (right - left) / 2;
880 if (offset >= shards[mid].shard_info->offset) {
881 size_t next = mid + 1;
882 if (next >= end || offset < shards[next].shard_info->offset)
883 return mid;
884 //continue to search forwards
885 left = next;
886 } else {
887 //continue to search backwards
888 right = mid;
889 }
890 }
891
892 return -1; // not found
893 }
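// Worked example (illustrative offsets): with shards starting at
// {0x0, 0x10000, 0x20000}, seek_shard(0x18000) returns 1 and
// seek_shard(0x20000) returns 2; an offset below the first shard's
// offset returns -1.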
894
895 /// check if a range spans a shard
896 bool spans_shard(uint32_t offset, uint32_t length) {
897 if (shards.empty()) {
898 return false;
899 }
900 int s = seek_shard(offset);
901 ceph_assert(s >= 0);
902 if (s == (int)shards.size() - 1) {
903 return false; // last shard
904 }
905 if (offset + length <= shards[s+1].shard_info->offset) {
906 return false;
907 }
908 return true;
909 }
910
911 /// ensure that a range of the map is loaded
912 void fault_range(KeyValueDB *db,
913 uint32_t offset, uint32_t length);
914
915 /// ensure a range of the map is marked dirty
916 void dirty_range(uint32_t offset, uint32_t length);
917
918 /// for seek_lextent test
919 extent_map_t::iterator find(uint64_t offset);
920
921 /// seek to the first lextent including or after offset
922 extent_map_t::iterator seek_lextent(uint64_t offset);
923 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
924
925 /// add a new Extent
926 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
927 extent_map.insert(*new Extent(lo, o, l, b));
928 }
929
930 /// remove (and delete) an Extent
931 void rm(extent_map_t::iterator p) {
932 extent_map.erase_and_dispose(p, DeleteDisposer());
933 }
934
935 bool has_any_lextents(uint64_t offset, uint64_t length);
936
937 /// consolidate adjacent lextents in extent_map
938 int compress_extent_map(uint64_t offset, uint64_t length);
939
940 /// punch a logical hole; add the lextents to be dereferenced to the target list.
941 void punch_hole(CollectionRef &c,
942 uint64_t offset, uint64_t length,
943 old_extent_map_t *old_extents);
944
945 /// put a new lextent into the lextent map, overwriting existing ones if
946 /// any, and update references accordingly
947 Extent *set_lextent(CollectionRef &c,
948 uint64_t logical_offset,
949 uint64_t offset, uint64_t length,
950 BlobRef b,
951 old_extent_map_t *old_extents);
952
953 /// split a blob (and referring extents)
954 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
955 };
956
957 /// Compressed Blob Garbage collector
958 /*
959 The primary idea of the collector is to estimate the difference between the
960 allocation units (AUs) currently occupied by compressed blobs and the new AUs
961 required to store that data uncompressed.
962 The estimation is performed for protrusive extents within a logical range
963 determined by the concatenation of the old_extents collection and the
964 specific (current) write request.
965 The root cause for using old_extents is the need to handle blob ref counts
966 properly: old extents still hold blob refs, and hence we need to traverse
967 the collection to determine whether a blob is to be released.
968 Protrusive extents are extents that fall into the blob set in action
969 (ones that are below the logical range from above) but are not totally
970 removed by the current write.
971 E.g. given
972 extent1 <loffs = 100, boffs = 100, len = 100> ->
973 blob1<compressed, len_on_disk=4096, logical_len=8192>
974 extent2 <loffs = 200, boffs = 200, len = 100> ->
975 blob2<raw, len_on_disk=4096, llen=4096>
976 extent3 <loffs = 300, boffs = 300, len = 100> ->
977 blob1<compressed, len_on_disk=4096, llen=8192>
978 extent4 <loffs = 4096, boffs = 0, len = 100> ->
979 blob3<raw, len_on_disk=4096, llen=4096>
980 and write(300~100),
981 the protrusive extents lie within the ranges <0~300, 400~8192-400>.
982 In this case the existing AUs that might be released due to GC (i.e. blob1)
983 use 2x4K bytes,
984 while the new AUs expected after GC = 0, since extent1 is to be merged into blob2.
985 Hence we should collect.
986 */
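/*
A minimal usage sketch (illustrative; the write-path wiring and the decision
threshold are assumptions, not the actual BlueStore code):

  GarbageCollector gc(cct);
  int64_t saved_aus = gc.estimate(offset, length,
                                  onode->extent_map, old_extents,
                                  min_alloc_size);
  if (saved_aus > 0) {
    // rewrite the protrusive ranges so the compressed AUs can be released
    const interval_set<uint64_t>& to_collect = gc.get_extents_to_collect();
    // ... re-read and rewrite the data covered by to_collect ...
  }
*/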
987 class GarbageCollector
988 {
989 public:
990 /// return the number of allocation units that might be saved due to GC
991 int64_t estimate(
992 uint64_t offset,
993 uint64_t length,
994 const ExtentMap& extent_map,
995 const old_extent_map_t& old_extents,
996 uint64_t min_alloc_size);
997
998 /// return a collection of extents to perform GC on
999 const interval_set<uint64_t>& get_extents_to_collect() const {
1000 return extents_to_collect;
1001 }
1002 GarbageCollector(CephContext* _cct) : cct(_cct) {}
1003
1004 private:
1005 struct BlobInfo {
1006 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
1007 int64_t expected_allocations = 0; ///< new alloc units required
1008 ///< in case of gc fulfilled
1009 bool collect_candidate = false; ///< indicate if blob has any extents
1010 ///< eligible for GC.
1011 extent_map_t::const_iterator first_lextent; ///< points to the first
1012 ///< lextent referring to
1013 ///< the blob if any.
1014 ///< collect_candidate flag
1015 ///< determines the validity
1016 extent_map_t::const_iterator last_lextent; ///< points to the last
1017 ///< lextent referring to
1018 ///< the blob if any.
1019
1020 BlobInfo(uint64_t ref_bytes) :
1021 referenced_bytes(ref_bytes) {
1022 }
1023 };
1024 CephContext* cct;
1025 std::map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1026 ///< copies that are affected by the
1027 ///< specific write
1028
1029 ///< protrusive extents that should be collected if GC takes place
1030 interval_set<uint64_t> extents_to_collect;
1031
1032 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1033 ///< unit when traversing
1034 ///< protrusive extents.
1035 ///< Other extents mapped to
1036 ///< this AU to be ignored
1037 ///< (except the case where
1038 ///< uncompressed extent follows
1039 ///< compressed one - see below).
1040 BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation unit
1041 ///< caused an expected_allocations
1042 ///< counter increment at this blob.
1043 ///< if an uncompressed extent follows,
1044 ///< a decrement of the
1045 ///< expected_allocations counter
1046 ///< is needed
1047 int64_t expected_allocations = 0; ///< new alloc units required in case
1048 ///< of gc fulfilled
1049 int64_t expected_for_release = 0; ///< alloc units currently used by
1050 ///< compressed blobs that might
1051 ///< be gone after GC
1052
1053 protected:
1054 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1055 uint64_t start_offset,
1056 uint64_t end_offset,
1057 uint64_t start_touch_offset,
1058 uint64_t end_touch_offset,
1059 uint64_t min_alloc_size);
1060 };
1061
1062 struct OnodeSpace;
1063 /// an in-memory object
1064 struct Onode {
1065 MEMPOOL_CLASS_HELPERS();
1066
1067 std::atomic_int nref; ///< reference count
1068 Collection *c;
1069 ghobject_t oid;
1070
1071 /// key under PREFIX_OBJ where we are stored
1072 mempool::bluestore_cache_meta::string key;
1073
1074 boost::intrusive::list_member_hook<> lru_item;
1075
1076 bluestore_onode_t onode; ///< metadata stored as value in kv store
1077 bool exists; ///< true if object logically exists
1078 bool cached; ///< Onode is logically in the cache
1079 /// (it can be pinned and hence physically out
1080 /// of it at the moment though)
1081 std::atomic_bool pinned; ///< Onode is pinned
1082 /// (or should be pinned when cached)
1083 ExtentMap extent_map;
1084
1085 // track txc's that have not been committed to kv store (and whose
1086 // effects cannot be read via the kvdb read methods)
1087 std::atomic<int> flushing_count = {0};
1088 std::atomic<int> waiting_count = {0};
1089 /// protect flush_txns
1090 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1091 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1092
1093 Onode(Collection *c, const ghobject_t& o,
1094 const mempool::bluestore_cache_meta::string& k)
1095 : nref(0),
1096 c(c),
1097 oid(o),
1098 key(k),
1099 exists(false),
1100 cached(false),
1101 pinned(false),
1102 extent_map(this) {
1103 }
1104 Onode(Collection* c, const ghobject_t& o,
1105 const std::string& k)
1106 : nref(0),
1107 c(c),
1108 oid(o),
1109 key(k),
1110 exists(false),
1111 cached(false),
1112 pinned(false),
1113 extent_map(this) {
1114 }
1115 Onode(Collection* c, const ghobject_t& o,
1116 const char* k)
1117 : nref(0),
1118 c(c),
1119 oid(o),
1120 key(k),
1121 exists(false),
1122 cached(false),
1123 pinned(false),
1124 extent_map(this) {
1125 }
1126
1127 static Onode* decode(
1128 CollectionRef c,
1129 const ghobject_t& oid,
1130 const std::string& key,
1131 const ceph::buffer::list& v);
1132
1133 void dump(ceph::Formatter* f) const;
1134
1135 void flush();
1136 void get();
1137 void put();
1138
1139 inline bool put_cache() {
1140 ceph_assert(!cached);
1141 cached = true;
1142 return !pinned;
1143 }
1144 inline bool pop_cache() {
1145 ceph_assert(cached);
1146 cached = false;
1147 return !pinned;
1148 }
1149
1150 static const std::string& calc_omap_prefix(uint8_t flags);
1151 static void calc_omap_header(uint8_t flags, const Onode* o,
1152 std::string* out);
1153 static void calc_omap_key(uint8_t flags, const Onode* o,
1154 const std::string& key, std::string* out);
1155 static void calc_omap_tail(uint8_t flags, const Onode* o,
1156 std::string* out);
1157
1158 const std::string& get_omap_prefix() {
1159 return calc_omap_prefix(onode.flags);
1160 }
1161 void get_omap_header(std::string* out) {
1162 calc_omap_header(onode.flags, this, out);
1163 }
1164 void get_omap_key(const std::string& key, std::string* out) {
1165 calc_omap_key(onode.flags, this, key, out);
1166 }
1167 void get_omap_tail(std::string* out) {
1168 calc_omap_tail(onode.flags, this, out);
1169 }
1170
1171 void rewrite_omap_key(const std::string& old, std::string *out);
1172 void decode_omap_key(const std::string& key, std::string *user_key);
1173
1174 // Return the offset of an object on disk. This function is intended *only*
1175 // for use with zoned storage devices because in these devices, the objects
1176 // are laid out contiguously on disk, which is not the case in general.
1177 // Also, it should always be called after calling extent_map.fault_range(),
1178 // so that the extent map is loaded.
1179 int64_t zoned_get_ondisk_starting_offset() const {
1180 return extent_map.extent_map.begin()->blob->
1181 get_blob().calc_offset(0, nullptr);
1182 }
1183 };
1184 typedef boost::intrusive_ptr<Onode> OnodeRef;
1185
1186 /// A generic Cache Shard
1187 struct CacheShard {
1188 CephContext *cct;
1189 PerfCounters *logger;
1190
1191 /// protect lru and other structures
1192 ceph::recursive_mutex lock = {
1193 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1194
1195 std::atomic<uint64_t> max = {0};
1196 std::atomic<uint64_t> num = {0};
1197
1198 CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1199 virtual ~CacheShard() {}
1200
1201 void set_max(uint64_t max_) {
1202 max = max_;
1203 }
1204
1205 uint64_t _get_num() {
1206 return num;
1207 }
1208
1209 virtual void _trim_to(uint64_t new_size) = 0;
1210 void _trim() {
1211 if (cct->_conf->objectstore_blackhole) {
1212 // do not trim if we are throwing away IOs a layer down
1213 return;
1214 }
1215 _trim_to(max);
1216 }
1217
1218 void trim() {
1219 std::lock_guard l(lock);
1220 _trim();
1221 }
1222 void flush() {
1223 std::lock_guard l(lock);
1224 // we should not be shutting down after the blackhole is enabled
1225 assert(!cct->_conf->objectstore_blackhole);
1226 _trim_to(0);
1227 }
1228
1229 #ifdef DEBUG_CACHE
1230 virtual void _audit(const char *s) = 0;
1231 #else
1232 void _audit(const char *s) { /* no-op */ }
1233 #endif
1234 };
1235
1236 /// A Generic onode Cache Shard
1237 struct OnodeCacheShard : public CacheShard {
1238 std::atomic<uint64_t> num_pinned = {0};
1239
1240 std::array<std::pair<ghobject_t, ceph::mono_clock::time_point>, 64> dumped_onodes;
1241
1242 virtual void _pin(Onode* o) = 0;
1243 virtual void _unpin(Onode* o) = 0;
1244
1245 public:
1246 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1247 static OnodeCacheShard *create(CephContext* cct, std::string type,
1248 PerfCounters *logger);
1249 virtual void _add(Onode* o, int level) = 0;
1250 virtual void _rm(Onode* o) = 0;
1251 virtual void _unpin_and_rm(Onode* o) = 0;
1252
1253 virtual void move_pinned(OnodeCacheShard *to, Onode *o) = 0;
1254 virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
1255 bool empty() {
1256 return _get_num() == 0;
1257 }
1258 };
1259
1260 /// A Generic buffer Cache Shard
1261 struct BufferCacheShard : public CacheShard {
1262 std::atomic<uint64_t> num_extents = {0};
1263 std::atomic<uint64_t> num_blobs = {0};
1264 uint64_t buffer_bytes = 0;
1265
1266 public:
1267 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1268 static BufferCacheShard *create(CephContext* cct, std::string type,
1269 PerfCounters *logger);
1270 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1271 virtual void _rm(Buffer *b) = 0;
1272 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1273 virtual void _touch(Buffer *b) = 0;
1274 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1275
1276 uint64_t _get_bytes() {
1277 return buffer_bytes;
1278 }
1279
1280 void add_extent() {
1281 ++num_extents;
1282 }
1283 void rm_extent() {
1284 --num_extents;
1285 }
1286
1287 void add_blob() {
1288 ++num_blobs;
1289 }
1290 void rm_blob() {
1291 --num_blobs;
1292 }
1293
1294 virtual void add_stats(uint64_t *extents,
1295 uint64_t *blobs,
1296 uint64_t *buffers,
1297 uint64_t *bytes) = 0;
1298
1299 bool empty() {
1300 std::lock_guard l(lock);
1301 return _get_bytes() == 0;
1302 }
1303 };
1304
1305 struct OnodeSpace {
1306 OnodeCacheShard *cache;
1307
1308 private:
1309 /// forward lookups
1310 mempool::bluestore_cache_meta::unordered_map<ghobject_t,OnodeRef> onode_map;
1311
1312 friend struct Collection; // for split_cache()
1313 friend struct Onode; // for put()
1314 friend struct LruOnodeCacheShard;
1315 void _remove(const ghobject_t& oid);
1316 public:
1317 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1318 ~OnodeSpace() {
1319 clear();
1320 }
1321
1322 OnodeRef add(const ghobject_t& oid, OnodeRef& o);
1323 OnodeRef lookup(const ghobject_t& o);
1324 void rename(OnodeRef& o, const ghobject_t& old_oid,
1325 const ghobject_t& new_oid,
1326 const mempool::bluestore_cache_meta::string& new_okey);
1327 void clear();
1328 bool empty();
1329
1330 template <int LogLevelV>
1331 void dump(CephContext *cct);
1332
1333 /// return true if f returns true for any item
1334 bool map_any(std::function<bool(Onode*)> f);
1335 };
1336
1337 class OpSequencer;
1338 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1339
1340 struct Collection : public CollectionImpl {
1341 BlueStore *store;
1342 OpSequencerRef osr;
1343 BufferCacheShard *cache; ///< our cache shard
1344 bluestore_cnode_t cnode;
1345 ceph::shared_mutex lock =
1346 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1347
1348 bool exists;
1349
1350 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1351
1352 // cache onodes on a per-collection basis to avoid lock
1353 // contention.
1354 OnodeSpace onode_map;
1355
1356 //pool options
1357 pool_opts_t pool_opts;
1358 ContextQueue *commit_queue;
1359
1360 OnodeCacheShard* get_onode_cache() const {
1361 return onode_map.cache;
1362 }
1363 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1364
1365 // the terminology is confusing here, sorry!
1366 //
1367 // blob_t shared_blob_t
1368 // !shared unused -> open
1369 // shared !loaded -> open + shared
1370 // shared loaded -> open + shared + loaded
1371 //
1372 // i.e.,
1373 // open = SharedBlob is instantiated
1374 // shared = blob_t shared flag is set; SharedBlob is hashed.
1375 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1376 void open_shared_blob(uint64_t sbid, BlobRef b);
1377 void load_shared_blob(SharedBlobRef sb);
1378 void make_blob_shared(uint64_t sbid, BlobRef b);
1379 uint64_t make_blob_unshared(SharedBlob *sb);
1380
1381 BlobRef new_blob() {
1382 BlobRef b = new Blob();
1383 b->shared_blob = new SharedBlob(this);
1384 return b;
1385 }
1386
1387 bool contains(const ghobject_t& oid) {
1388 if (cid.is_meta())
1389 return oid.hobj.pool == -1;
1390 spg_t spgid;
1391 if (cid.is_pg(&spgid))
1392 return
1393 spgid.pgid.contains(cnode.bits, oid) &&
1394 oid.shard_id == spgid.shard;
1395 return false;
1396 }
1397
1398 int64_t pool() const {
1399 return cid.pool();
1400 }
1401
1402 void split_cache(Collection *dest);
1403
1404 bool flush_commit(Context *c) override;
1405 void flush() override;
1406 void flush_all_but_last();
1407
1408 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1409 };
1410
1411 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1412 CollectionRef c;
1413 OnodeRef o;
1414 KeyValueDB::Iterator it;
1415 std::string head, tail;
1416
1417 std::string _stringify() const;
1418
1419 public:
1420 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1421 int seek_to_first() override;
1422 int upper_bound(const std::string &after) override;
1423 int lower_bound(const std::string &to) override;
1424 bool valid() override;
1425 int next() override;
1426 std::string key() override;
1427 ceph::buffer::list value() override;
1428 std::string tail_key() override {
1429 return tail;
1430 }
1431
1432 int status() override {
1433 return 0;
1434 }
1435 };
1436
1437 struct volatile_statfs{
1438 enum {
1439 STATFS_ALLOCATED = 0,
1440 STATFS_STORED,
1441 STATFS_COMPRESSED_ORIGINAL,
1442 STATFS_COMPRESSED,
1443 STATFS_COMPRESSED_ALLOCATED,
1444 STATFS_LAST
1445 };
1446 int64_t values[STATFS_LAST];
1447 volatile_statfs() {
1448 memset(this, 0, sizeof(volatile_statfs));
1449 }
1450 void reset() {
1451 *this = volatile_statfs();
1452 }
1453 void publish(store_statfs_t* buf) const {
1454 buf->allocated = allocated();
1455 buf->data_stored = stored();
1456 buf->data_compressed = compressed();
1457 buf->data_compressed_original = compressed_original();
1458 buf->data_compressed_allocated = compressed_allocated();
1459 }
1460
1461 volatile_statfs& operator+=(const volatile_statfs& other) {
1462 for (size_t i = 0; i < STATFS_LAST; ++i) {
1463 values[i] += other.values[i];
1464 }
1465 return *this;
1466 }
1467 int64_t& allocated() {
1468 return values[STATFS_ALLOCATED];
1469 }
1470 int64_t& stored() {
1471 return values[STATFS_STORED];
1472 }
1473 int64_t& compressed_original() {
1474 return values[STATFS_COMPRESSED_ORIGINAL];
1475 }
1476 int64_t& compressed() {
1477 return values[STATFS_COMPRESSED];
1478 }
1479 int64_t& compressed_allocated() {
1480 return values[STATFS_COMPRESSED_ALLOCATED];
1481 }
1482 int64_t allocated() const {
1483 return values[STATFS_ALLOCATED];
1484 }
1485 int64_t stored() const {
1486 return values[STATFS_STORED];
1487 }
1488 int64_t compressed_original() const {
1489 return values[STATFS_COMPRESSED_ORIGINAL];
1490 }
1491 int64_t compressed() const {
1492 return values[STATFS_COMPRESSED];
1493 }
1494 int64_t compressed_allocated() const {
1495 return values[STATFS_COMPRESSED_ALLOCATED];
1496 }
1497 volatile_statfs& operator=(const store_statfs_t& st) {
1498 values[STATFS_ALLOCATED] = st.allocated;
1499 values[STATFS_STORED] = st.data_stored;
1500 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1501 values[STATFS_COMPRESSED] = st.data_compressed;
1502 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1503 return *this;
1504 }
1505 bool is_empty() {
1506 return values[STATFS_ALLOCATED] == 0 &&
1507 values[STATFS_STORED] == 0 &&
1508 values[STATFS_COMPRESSED] == 0 &&
1509 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1510 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1511 }
1512 void decode(ceph::buffer::list::const_iterator& it) {
1513 using ceph::decode;
1514 for (size_t i = 0; i < STATFS_LAST; i++) {
1515 decode(values[i], it);
1516 }
1517 }
1518
1519 void encode(ceph::buffer::list& bl) {
1520 using ceph::encode;
1521 for (size_t i = 0; i < STATFS_LAST; i++) {
1522 encode(values[i], bl);
1523 }
1524 }
1525 };
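// Illustrative use of volatile_statfs (hypothetical numbers): accounting a
// fully allocated 64 KiB write and publishing the result:
//
//   volatile_statfs delta;
//   delta.allocated() += 0x10000;
//   delta.stored()    += 0x10000;
//   store_statfs_t out;
//   delta.publish(&out);  // out.allocated == out.data_stored == 0x10000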
1526
1527 struct TransContext final : public AioContext {
1528 MEMPOOL_CLASS_HELPERS();
1529
1530 typedef enum {
1531 STATE_PREPARE,
1532 STATE_AIO_WAIT,
1533 STATE_IO_DONE,
1534 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1535 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1536 STATE_KV_DONE,
1537 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1538 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1539 STATE_DEFERRED_DONE,
1540 STATE_FINISHING,
1541 STATE_DONE,
1542 } state_t;
1543
1544 const char *get_state_name() {
1545 switch (state) {
1546 case STATE_PREPARE: return "prepare";
1547 case STATE_AIO_WAIT: return "aio_wait";
1548 case STATE_IO_DONE: return "io_done";
1549 case STATE_KV_QUEUED: return "kv_queued";
1550 case STATE_KV_SUBMITTED: return "kv_submitted";
1551 case STATE_KV_DONE: return "kv_done";
1552 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1553 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1554 case STATE_DEFERRED_DONE: return "deferred_done";
1555 case STATE_FINISHING: return "finishing";
1556 case STATE_DONE: return "done";
1557 }
1558 return "???";
1559 }
1560
1561 #if defined(WITH_LTTNG)
1562 const char *get_state_latency_name(int state) {
1563 switch (state) {
1564 case l_bluestore_state_prepare_lat: return "prepare";
1565 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1566 case l_bluestore_state_io_done_lat: return "io_done";
1567 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1568 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1569 case l_bluestore_state_kv_done_lat: return "kv_done";
1570 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1571 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1572 case l_bluestore_state_finishing_lat: return "finishing";
1573 case l_bluestore_state_done_lat: return "done";
1574 }
1575 return "???";
1576 }
1577 #endif
1578
1579 inline void set_state(state_t s) {
1580 state = s;
1581 #ifdef WITH_BLKIN
1582 if (trace) {
1583 trace.event(get_state_name());
1584 }
1585 #endif
1586 }
1587 inline state_t get_state() {
1588 return state;
1589 }
1590
1591 CollectionRef ch;
1592 OpSequencerRef osr; // this should be ch->osr
1593 boost::intrusive::list_member_hook<> sequencer_item;
1594
1595 uint64_t bytes = 0, ios = 0, cost = 0;
1596
1597 std::set<OnodeRef> onodes; ///< these need to be updated/written
1598 std::set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1599
1600 // A map from onode to a vector of object offsets. For new objects created
1601 // in the transaction we append the new offset to the vector, for
1602 // overwritten objects we append the negative of the previous ondisk offset
1603 // followed by the new offset, and for truncated objects we append the
1604 // negative of the previous ondisk offset. We need to maintain a vector of
1605 // offsets because *within the same transaction* an object may be truncated
1606 // and then written again, or an object may be overwritten multiple times to
1607 // different zones. See update_cleaning_metadata function for how this map
1608 // is used.
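// Example of the resulting vector (hypothetical offsets): an object newly
// written at ondisk offset 0x8000 and then overwritten at 0xC000 within the
// same transaction yields {0x8000, -0x8000, 0xC000}; a subsequent truncate
// would append -0xC000.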
1609 std::map<OnodeRef, std::vector<int64_t>> zoned_onode_to_offset_map;
1610
1611 std::set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1612 std::set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1613
1614 KeyValueDB::Transaction t; ///< then we will commit this
1615 std::list<Context*> oncommits; ///< more commit completions
1616 std::list<CollectionRef> removed_collections; ///< colls we removed
1617
1618 boost::intrusive::list_member_hook<> deferred_queue_item;
1619 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1620
1621 interval_set<uint64_t> allocated, released;
1622 volatile_statfs statfs_delta; ///< overall store statistics delta
1623 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1624
1625 IOContext ioc;
1626 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1627
1628 uint64_t seq = 0;
1629 ceph::mono_clock::time_point start;
1630 ceph::mono_clock::time_point last_stamp;
1631
1632 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1633 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1634
1635 #if defined(WITH_LTTNG)
1636 bool tracing = false;
1637 #endif
1638
1639 #ifdef WITH_BLKIN
1640 ZTracer::Trace trace;
1641 #endif
1642
1643 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1644 std::list<Context*> *on_commits)
1645 : ch(c),
1646 osr(o),
1647 ioc(cct, this),
1648 start(ceph::mono_clock::now()) {
1649 last_stamp = start;
1650 if (on_commits) {
1651 oncommits.swap(*on_commits);
1652 }
1653 }
1654 ~TransContext() {
1655 #ifdef WITH_BLKIN
1656 if (trace) {
1657 trace.event("txc destruct");
1658 }
1659 #endif
1660 delete deferred_txn;
1661 }
1662
1663 void write_onode(OnodeRef &o) {
1664 onodes.insert(o);
1665 }
1666 void write_shared_blob(SharedBlobRef &sb) {
1667 shared_blobs.insert(sb);
1668 }
1669 void unshare_blob(SharedBlob *sb) {
1670 shared_blobs.erase(sb);
1671 }
1672
1673 /// note that we logically modified the object (while the onode itself is unmodified)
1674 void note_modified_object(OnodeRef &o) {
1675 // onode itself isn't written, though
1676 modified_objects.insert(o);
1677 }
1678 void note_removed_object(OnodeRef& o) {
1679 modified_objects.insert(o);
1680 onodes.erase(o);
1681 }
1682
1683 void zoned_note_new_object(OnodeRef &o) {
1684 auto [_, ok] = zoned_onode_to_offset_map.emplace(
1685 std::pair<OnodeRef, std::vector<int64_t>>(o, {o->zoned_get_ondisk_starting_offset()}));
1686 ceph_assert(ok);
1687 }
1688
1689 void zoned_note_updated_object(OnodeRef &o, int64_t prev_offset) {
1690 int64_t new_offset = o->zoned_get_ondisk_starting_offset();
1691 auto [it, ok] = zoned_onode_to_offset_map.emplace(
1692 std::pair<OnodeRef, std::vector<int64_t>>(o, {-prev_offset, new_offset}));
1693 if (!ok) {
1694 it->second.push_back(-prev_offset);
1695 it->second.push_back(new_offset);
1696 }
1697 }
1698
1699 void zoned_note_truncated_object(OnodeRef &o, int64_t offset) {
1700 auto [it, ok] = zoned_onode_to_offset_map.emplace(
1701 std::pair<OnodeRef, std::vector<int64_t>>(o, {-offset}));
1702 if (!ok) {
1703 it->second.push_back(-offset);
1704 }
1705 }
1706
1707 void aio_finish(BlueStore *store) override {
1708 store->txc_aio_finish(this);
1709 }
1710 private:
1711 state_t state = STATE_PREPARE;
1712 };
1713
1714 class BlueStoreThrottle {
1715 #if defined(WITH_LTTNG)
1716 const std::chrono::time_point<ceph::mono_clock> time_base = ceph::mono_clock::now();
1717
1718 // Time of last chosen io (microseconds)
1719 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1720 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1721 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1722
1723 std::atomic_uint pending_kv_ios = {0};
1724 std::atomic_uint pending_deferred_ios = {0};
1725
1726 // Min period between trace points (microseconds)
1727 std::atomic<uint64_t> trace_period_mcs = {0};
1728
1729 bool should_trace(
1730 uint64_t *started,
1731 uint64_t *completed) {
1732 uint64_t min_period_mcs = trace_period_mcs.load(
1733 std::memory_order_relaxed);
1734
1735 if (min_period_mcs == 0) {
1736 *started = 1;
1737 *completed = ios_completed_since_last_traced.exchange(0);
1738 return true;
1739 } else {
1740 ios_started_since_last_traced++;
1741 auto now_mcs = ceph::to_microseconds<uint64_t>(
1742 ceph::mono_clock::now() - time_base);
1743 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1744 uint64_t period_mcs = now_mcs - previous_mcs;
1745 if (period_mcs > min_period_mcs) {
1746 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1747 previous_mcs, now_mcs)) {
1748 // This would be racy at a sufficiently extreme trace rate, but isn't
1749 // worth the overhead of doing it more carefully.
1750 *started = ios_started_since_last_traced.exchange(0);
1751 *completed = ios_completed_since_last_traced.exchange(0);
1752 return true;
1753 }
1754 }
1755 return false;
1756 }
1757 }
1758 #endif
1759
1760 #if defined(WITH_LTTNG)
1761 void emit_initial_tracepoint(
1762 KeyValueDB &db,
1763 TransContext &txc,
1764 ceph::mono_clock::time_point);
1765 #else
1766 void emit_initial_tracepoint(
1767 KeyValueDB &db,
1768 TransContext &txc,
1769 ceph::mono_clock::time_point) {}
1770 #endif
1771
1772 Throttle throttle_bytes; ///< submit to commit
1773 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1774
1775 public:
1776 BlueStoreThrottle(CephContext *cct) :
1777 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1778 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1779 {
1780 reset_throttle(cct->_conf);
1781 }
1782
1783 #if defined(WITH_LTTNG)
1784 void complete_kv(TransContext &txc);
1785 void complete(TransContext &txc);
1786 #else
1787 void complete_kv(TransContext &txc) {}
1788 void complete(TransContext &txc) {}
1789 #endif
1790
1791 ceph::mono_clock::duration log_state_latency(
1792 TransContext &txc, PerfCounters *logger, int state);
1793 bool try_start_transaction(
1794 KeyValueDB &db,
1795 TransContext &txc,
1796 ceph::mono_clock::time_point);
1797 void finish_start_transaction(
1798 KeyValueDB &db,
1799 TransContext &txc,
1800 ceph::mono_clock::time_point);
1801 void release_kv_throttle(uint64_t cost) {
1802 throttle_bytes.put(cost);
1803 }
1804 void release_deferred_throttle(uint64_t cost) {
1805 throttle_deferred_bytes.put(cost);
1806 }
1807 bool should_submit_deferred() {
1808 return throttle_deferred_bytes.past_midpoint();
1809 }
1810 void reset_throttle(const ConfigProxy &conf) {
1811 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1812 throttle_deferred_bytes.reset_max(
1813 conf->bluestore_throttle_bytes +
1814 conf->bluestore_throttle_deferred_bytes);
1815 #if defined(WITH_LTTNG)
1816 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1817 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1818 #endif
1819 }
1820 } throttle;
1821
1822 typedef boost::intrusive::list<
1823 TransContext,
1824 boost::intrusive::member_hook<
1825 TransContext,
1826 boost::intrusive::list_member_hook<>,
1827 &TransContext::deferred_queue_item> > deferred_queue_t;
1828
1829 struct DeferredBatch final : public AioContext {
1830 OpSequencer *osr;
1831 struct deferred_io {
1832 ceph::buffer::list bl; ///< data
1833 uint64_t seq; ///< deferred transaction seq
1834 };
1835 std::map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1836 deferred_queue_t txcs; ///< txcs in this batch
1837 IOContext ioc; ///< our aios
1838 /// bytes of pending io for each deferred seq (may be 0)
1839 std::map<uint64_t,int> seq_bytes;
1840
1841 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1842 void _audit(CephContext *cct);
1843
1844 DeferredBatch(CephContext *cct, OpSequencer *osr)
1845 : osr(osr), ioc(cct, this) {}
1846
1847 /// prepare a write
1848 void prepare_write(CephContext *cct,
1849 uint64_t seq, uint64_t offset, uint64_t length,
1850 ceph::buffer::list::const_iterator& p);
1851
1852 void aio_finish(BlueStore *store) override {
1853 store->_deferred_aio_finish(osr);
1854 }
1855 };
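  // Illustrative sketch (comment only): how a DeferredBatch is filled and
  // retired.  The exact call sites (_deferred_queue / _deferred_submit_unlock)
  // are not shown here, and seq/off/len/blp are placeholder names.
  //
  //   DeferredBatch *b = new DeferredBatch(cct, osr);
  //   b->txcs.push_back(*txc);                    // track the txc in the batch
  //   b->prepare_write(cct, seq, off, len, blp);  // fills iomap and seq_bytes
  //   ...                                         // aios are issued via b->ioc
  //   // on completion, aio_finish() calls store->_deferred_aio_finish(osr)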
1856
1857 class OpSequencer : public RefCountedObject {
1858 public:
1859 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1860 ceph::condition_variable qcond;
1861 typedef boost::intrusive::list<
1862 TransContext,
1863 boost::intrusive::member_hook<
1864 TransContext,
1865 boost::intrusive::list_member_hook<>,
1866 &TransContext::sequencer_item> > q_list_t;
1867 q_list_t q; ///< transactions
1868
1869 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1870
1871 DeferredBatch *deferred_running = nullptr;
1872 DeferredBatch *deferred_pending = nullptr;
1873
1874 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::OpSequencer::deferred_lock");
1875
1876 BlueStore *store;
1877 coll_t cid;
1878
1879 uint64_t last_seq = 0;
1880
1881 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1882
1883 std::atomic_int kv_committing_serially = {0};
1884
1885 std::atomic_int kv_submitted_waiters = {0};
1886
1887 std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
1888
1889 const uint32_t sequencer_id;
1890
1891 uint32_t get_sequencer_id() const {
1892 return sequencer_id;
1893 }
1894
1895 void queue_new(TransContext *txc) {
1896 std::lock_guard l(qlock);
1897 txc->seq = ++last_seq;
1898 q.push_back(*txc);
1899 }
1900
1901 void drain() {
1902 std::unique_lock l(qlock);
1903 while (!q.empty())
1904 qcond.wait(l);
1905 }
1906
1907 void drain_preceding(TransContext *txc) {
1908 std::unique_lock l(qlock);
1909 while (&q.front() != txc)
1910 qcond.wait(l);
1911 }
1912
1913 bool _is_all_kv_submitted() {
1914 // caller must hold qlock; q must not be empty
1915 ceph_assert(!q.empty());
1916 TransContext *txc = &q.back();
1917 if (txc->get_state() >= TransContext::STATE_KV_SUBMITTED) {
1918 return true;
1919 }
1920 return false;
1921 }
1922
1923 void flush() {
1924 std::unique_lock l(qlock);
1925 while (true) {
1926 // set the flag before the check because the condition
1927 // may become true outside qlock, and we need to make
1928 // sure those threads see waiters and signal qcond.
1929 ++kv_submitted_waiters;
1930 if (q.empty() || _is_all_kv_submitted()) {
1931 --kv_submitted_waiters;
1932 return;
1933 }
1934 qcond.wait(l);
1935 --kv_submitted_waiters;
1936 }
1937 }
1938
1939 void flush_all_but_last() {
1940 std::unique_lock l(qlock);
1941 ceph_assert(q.size() >= 1);
1942 while (true) {
1943 // set the flag before the check because the condition
1944 // may become true outside qlock, and we need to make
1945 // sure those threads see waiters and signal qcond.
1946 ++kv_submitted_waiters;
1947 if (q.size() <= 1) {
1948 --kv_submitted_waiters;
1949 return;
1950 } else {
1951 auto it = q.rbegin();
1952 it++;
1953 if (it->get_state() >= TransContext::STATE_KV_SUBMITTED) {
1954 --kv_submitted_waiters;
1955 return;
1956 }
1957 }
1958 qcond.wait(l);
1959 --kv_submitted_waiters;
1960 }
1961 }
1962
1963 bool flush_commit(Context *c) {
1964 std::lock_guard l(qlock);
1965 if (q.empty()) {
1966 return true;
1967 }
1968 TransContext *txc = &q.back();
1969 if (txc->get_state() >= TransContext::STATE_KV_DONE) {
1970 return true;
1971 }
1972 txc->oncommits.push_back(c);
1973 return false;
1974 }
1975 private:
1976 FRIEND_MAKE_REF(OpSequencer);
1977 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1978 : RefCountedObject(store->cct),
1979 store(store), cid(c), sequencer_id(sequencer_id) {
1980 }
1981 ~OpSequencer() {
1982 ceph_assert(q.empty());
1983 }
1984 };
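  // Illustrative sketch (comment only) of the sequencing API; construction is
  // via ceph::make_ref thanks to FRIEND_MAKE_REF, the other names are as
  // declared above.
  //
  //   auto osr = ceph::make_ref<OpSequencer>(store, next_sequencer_id++, cid);
  //   osr->queue_new(txc);   // assigns txc->seq and appends it to q
  //   osr->flush();          // wait until everything queued reaches KV_SUBMITTED
  //   osr->drain();          // wait until q is completely empty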
1985
1986 typedef boost::intrusive::list<
1987 OpSequencer,
1988 boost::intrusive::member_hook<
1989 OpSequencer,
1990 boost::intrusive::list_member_hook<>,
1991 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1992
1993 struct KVSyncThread : public Thread {
1994 BlueStore *store;
1995 explicit KVSyncThread(BlueStore *s) : store(s) {}
1996 void *entry() override {
1997 store->_kv_sync_thread();
1998 return NULL;
1999 }
2000 };
2001 struct KVFinalizeThread : public Thread {
2002 BlueStore *store;
2003 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
2004 void *entry() override {
2005 store->_kv_finalize_thread();
2006 return NULL;
2007 }
2008 };
2009 struct ZonedCleanerThread : public Thread {
2010 BlueStore *store;
2011 explicit ZonedCleanerThread(BlueStore *s) : store(s) {}
2012 void *entry() override {
2013 store->_zoned_cleaner_thread();
2014 return nullptr;
2015 }
2016 };
2017
2018 struct DBHistogram {
2019 struct value_dist {
2020 uint64_t count;
2021 uint32_t max_len;
2022 };
2023
2024 struct key_dist {
2025 uint64_t count;
2026 uint32_t max_len;
2027 std::map<int, struct value_dist> val_map; ///< value-size slab id -> count and max value length
2028 };
2029
2030 std::map<std::string, std::map<int, struct key_dist> > key_hist;
2031 std::map<int, uint64_t> value_hist;
2032 int get_key_slab(size_t sz);
2033 std::string get_key_slab_to_range(int slab);
2034 int get_value_slab(size_t sz);
2035 std::string get_value_slab_to_range(int slab);
2036 void update_hist_entry(std::map<std::string, std::map<int, struct key_dist> > &key_hist,
2037 const std::string &prefix, size_t key_size, size_t value_size);
2038 void dump(ceph::Formatter *f);
2039 };
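  // Illustrative sketch (comment only), assuming the slab helpers bucket sizes
  // into coarse ranges: each visited key/value pair bumps the per-prefix slab
  // counters, which dump() then formats.
  //
  //   DBHistogram hist;
  //   hist.update_hist_entry(hist.key_hist, prefix, key.size(), value.size());
  //   hist.value_hist[hist.get_value_slab(value.size())]++;
  //   hist.dump(f);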
2040
2041 struct BigDeferredWriteContext {
2042 uint64_t off = 0; // original logical offset
2043 uint32_t b_off = 0; // blob relative offset
2044 uint32_t used = 0;
2045 uint64_t head_read = 0;
2046 uint64_t tail_read = 0;
2047 BlobRef blob_ref;
2048 uint64_t blob_start = 0;
2049 PExtentVector res_extents;
2050
2051 inline uint64_t blob_aligned_len() const {
2052 return used + head_read + tail_read;
2053 }
2054
2055 bool can_defer(BlueStore::extent_map_t::iterator ep,
2056 uint64_t prefer_deferred_size,
2057 uint64_t block_size,
2058 uint64_t offset,
2059 uint64_t l);
2060 bool apply_defer();
2061 };
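  // Worked example (illustrative numbers): with a 4 KiB block size, an
  // overwrite of used = 5120 bytes starting 1024 bytes past a block boundary
  // needs head_read = 1024 and tail_read = 2048 to round the region out to
  // whole blocks, so blob_aligned_len() = 5120 + 1024 + 2048 = 8192 (two
  // blocks); can_defer() checks whether such a read-modify-write is eligible
  // for the deferred write path.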
2062
2063 // --------------------------------------------------------
2064 // members
2065 private:
2066 BlueFS *bluefs = nullptr;
2067 bluefs_layout_t bluefs_layout;
2068 utime_t next_dump_on_bluefs_alloc_failure;
2069
2070 KeyValueDB *db = nullptr;
2071 BlockDevice *bdev = nullptr;
2072 std::string freelist_type;
2073 FreelistManager *fm = nullptr;
2074
2075 bluefs_shared_alloc_context_t shared_alloc;
2076
2077 uuid_d fsid;
2078 int path_fd = -1; ///< open handle to $path
2079 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
2080 bool mounted = false;
2081
2082 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
2083 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
2084 bool collections_had_errors = false;
2085 std::map<coll_t,CollectionRef> new_coll_map;
2086
2087 std::vector<OnodeCacheShard*> onode_cache_shards;
2088 std::vector<BufferCacheShard*> buffer_cache_shards;
2089
2090 /// protect zombie_osr_set
2091 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
2092 uint32_t next_sequencer_id = 0;
2093 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
2094
2095 std::atomic<uint64_t> nid_last = {0};
2096 std::atomic<uint64_t> nid_max = {0};
2097 std::atomic<uint64_t> blobid_last = {0};
2098 std::atomic<uint64_t> blobid_max = {0};
2099
2100 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
2101 ceph::mutex atomic_alloc_and_submit_lock =
2102 ceph::make_mutex("BlueStore::atomic_alloc_and_submit_lock");
2103 std::atomic<uint64_t> deferred_seq = {0};
2104 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
2105 std::atomic_int deferred_queue_size = {0}; ///< num txc's queued across all osrs
2106 std::atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
2107 Finisher finisher;
2108 utime_t deferred_last_submitted = utime_t();
2109
2110 KVSyncThread kv_sync_thread;
2111 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
2112 ceph::condition_variable kv_cond;
2113 bool _kv_only = false;
2114 bool kv_sync_started = false;
2115 bool kv_stop = false;
2116 bool kv_finalize_started = false;
2117 bool kv_finalize_stop = false;
2118 std::deque<TransContext*> kv_queue; ///< ready, already submitted
2119 std::deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
2120 std::deque<TransContext*> kv_committing; ///< currently syncing
2121 std::deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
2122 bool kv_sync_in_progress = false;
2123
2124 KVFinalizeThread kv_finalize_thread;
2125 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
2126 ceph::condition_variable kv_finalize_cond;
2127 std::deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
2128 std::deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
2129 bool kv_finalize_in_progress = false;
2130
2131 ZonedCleanerThread zoned_cleaner_thread;
2132 ceph::mutex zoned_cleaner_lock = ceph::make_mutex("BlueStore::zoned_cleaner_lock");
2133 ceph::condition_variable zoned_cleaner_cond;
2134 bool zoned_cleaner_started = false;
2135 bool zoned_cleaner_stop = false;
2136 std::deque<uint64_t> zoned_cleaner_queue;
2137
2138 PerfCounters *logger = nullptr;
2139
2140 std::list<CollectionRef> removed_collections;
2141
2142 ceph::shared_mutex debug_read_error_lock =
2143 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
2144 std::set<ghobject_t> debug_data_error_objects;
2145 std::set<ghobject_t> debug_mdata_error_objects;
2146
2147 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
2148
2149 uint64_t block_size = 0; ///< block size of block device (power of 2)
2150 uint64_t block_mask = 0; ///< mask to get just the block offset
2151 size_t block_size_order = 0; ///< bits to shift to get block size
2152
2153 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
2154 ///< bits for min_alloc_size
2155 uint8_t min_alloc_size_order = 0;
2156 static_assert(std::numeric_limits<uint8_t>::max() >
2157 std::numeric_limits<decltype(min_alloc_size)>::digits,
2158 "not enough bits for min_alloc_size");
2159
2160 enum {
2161 // Please preserve the order, since these values are persisted in the DB
2162 OMAP_BULK = 0,
2163 OMAP_PER_POOL = 1,
2164 OMAP_PER_PG = 2,
2165 } per_pool_omap = OMAP_BULK;
2166
2167 ///< maximum allocation unit (power of 2)
2168 std::atomic<uint64_t> max_alloc_size = {0};
2169
2170 ///< number threshold for forced deferred writes
2171 std::atomic<int> deferred_batch_ops = {0};
2172
2173 ///< size threshold for forced deferred writes
2174 std::atomic<uint64_t> prefer_deferred_size = {0};
2175
2176 ///< approx cost per io, in bytes
2177 std::atomic<uint64_t> throttle_cost_per_io = {0};
2178
2179 std::atomic<Compressor::CompressionMode> comp_mode =
2180 {Compressor::COMP_NONE}; ///< compression mode
2181 CompressorRef compressor;
2182 std::atomic<uint64_t> comp_min_blob_size = {0};
2183 std::atomic<uint64_t> comp_max_blob_size = {0};
2184
2185 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2186
2187 uint64_t kv_ios = 0;
2188 uint64_t kv_throttle_costs = 0;
2189
2190 // cache trim control
2191 uint64_t cache_size = 0; ///< total cache size
2192 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2193 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2194 double cache_kv_onode_ratio = 0; ///< cache ratio dedicated to kv onodes (e.g., rocksdb onode CF)
2195 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2196 bool cache_autotune = false; ///< cache autotune setting
2197 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2198 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2199 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2200 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2201 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2202 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2203 double max_defer_interval = 0; ///< Time to wait between last deferred submit
2204 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2205
2206 typedef std::map<uint64_t, volatile_statfs> osd_pools_map;
2207
2208 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2209 volatile_statfs vstatfs;
2210 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2211
2212 bool per_pool_stat_collection = true;
2213
2214 struct MempoolThread : public Thread {
2215 public:
2216 BlueStore *store;
2217
2218 ceph::condition_variable cond;
2219 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2220 bool stop = false;
2221 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2222 std::shared_ptr<PriorityCache::PriCache> binned_kv_onode_cache = nullptr;
2223 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2224
2225 struct MempoolCache : public PriorityCache::PriCache {
2226 BlueStore *store;
2227 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2228 int64_t committed_bytes = 0;
2229 double cache_ratio = 0;
2230
2231 MempoolCache(BlueStore *s) : store(s) {}
2232
2233 virtual uint64_t _get_used_bytes() const = 0;
2234
2235 virtual int64_t request_cache_bytes(
2236 PriorityCache::Priority pri, uint64_t total_cache) const {
2237 int64_t assigned = get_cache_bytes(pri);
2238
2239 switch (pri) {
2240 // All cache items are currently shoved into the PRI1 priority
2241 case PriorityCache::Priority::PRI1:
2242 {
2243 int64_t request = _get_used_bytes();
2244 return (request > assigned) ? request - assigned : 0;
2245 }
2246 default:
2247 break;
2248 }
2249 return -EOPNOTSUPP;
2250 }
2251
2252 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2253 return cache_bytes[pri];
2254 }
2255 virtual int64_t get_cache_bytes() const {
2256 int64_t total = 0;
2257
2258 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2259 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2260 total += get_cache_bytes(pri);
2261 }
2262 return total;
2263 }
2264 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2265 cache_bytes[pri] = bytes;
2266 }
2267 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2268 cache_bytes[pri] += bytes;
2269 }
2270 virtual int64_t commit_cache_size(uint64_t total_cache) {
2271 committed_bytes = PriorityCache::get_chunk(
2272 get_cache_bytes(), total_cache);
2273 return committed_bytes;
2274 }
2275 virtual int64_t get_committed_size() const {
2276 return committed_bytes;
2277 }
2278 virtual double get_cache_ratio() const {
2279 return cache_ratio;
2280 }
2281 virtual void set_cache_ratio(double ratio) {
2282 cache_ratio = ratio;
2283 }
2284 virtual std::string get_cache_name() const = 0;
2285 };
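    // Worked example (illustrative numbers) for request_cache_bytes() above:
    // at PRI1, with _get_used_bytes() = 100 MiB and get_cache_bytes(PRI1) =
    // 60 MiB, the cache requests 100 - 60 = 40 MiB more; if usage is already
    // at or below the assigned amount it requests 0.  Any other priority
    // returns -EOPNOTSUPP.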
2286
2287 struct MetaCache : public MempoolCache {
2288 MetaCache(BlueStore *s) : MempoolCache(s) {}
2289
2290 virtual uint64_t _get_used_bytes() const {
2291 return mempool::bluestore_Buffer::allocated_bytes() +
2292 mempool::bluestore_Blob::allocated_bytes() +
2293 mempool::bluestore_Extent::allocated_bytes() +
2294 mempool::bluestore_cache_meta::allocated_bytes() +
2295 mempool::bluestore_cache_other::allocated_bytes() +
2296 mempool::bluestore_cache_onode::allocated_bytes() +
2297 mempool::bluestore_SharedBlob::allocated_bytes() +
2298 mempool::bluestore_inline_bl::allocated_bytes();
2299 }
2300
2301 virtual std::string get_cache_name() const {
2302 return "BlueStore Meta Cache";
2303 }
2304
2305 uint64_t _get_num_onodes() const {
2306 uint64_t onode_num =
2307 mempool::bluestore_cache_onode::allocated_items();
2308 return (2 > onode_num) ? 2 : onode_num;
2309 }
2310
2311 double get_bytes_per_onode() const {
2312 return (double)_get_used_bytes() / (double)_get_num_onodes();
2313 }
2314 };
2315 std::shared_ptr<MetaCache> meta_cache;
2316
2317 struct DataCache : public MempoolCache {
2318 DataCache(BlueStore *s) : MempoolCache(s) {}
2319
2320 virtual uint64_t _get_used_bytes() const {
2321 uint64_t bytes = 0;
2322 for (auto i : store->buffer_cache_shards) {
2323 bytes += i->_get_bytes();
2324 }
2325 return bytes;
2326 }
2327 virtual std::string get_cache_name() const {
2328 return "BlueStore Data Cache";
2329 }
2330 };
2331 std::shared_ptr<DataCache> data_cache;
2332
2333 public:
2334 explicit MempoolThread(BlueStore *s)
2335 : store(s),
2336 meta_cache(new MetaCache(s)),
2337 data_cache(new DataCache(s)) {}
2338
2339 void *entry() override;
2340 void init() {
2341 ceph_assert(stop == false);
2342 create("bstore_mempool");
2343 }
2344 void shutdown() {
2345 lock.lock();
2346 stop = true;
2347 cond.notify_all();
2348 lock.unlock();
2349 join();
2350 }
2351
2352 private:
2353 void _adjust_cache_settings();
2354 void _update_cache_settings();
2355 void _resize_shards(bool interval_stats);
2356 } mempool_thread;
2357
2358 #ifdef WITH_BLKIN
2359 ZTracer::Endpoint trace_endpoint {"0.0.0.0", 0, "BlueStore"};
2360 #endif
2361
2362 // --------------------------------------------------------
2363 // private methods
2364
2365 void _init_logger();
2366 void _shutdown_logger();
2367 int _reload_logger();
2368
2369 int _open_path();
2370 void _close_path();
2371 int _open_fsid(bool create);
2372 int _lock_fsid();
2373 int _read_fsid(uuid_d *f);
2374 int _write_fsid();
2375 void _close_fsid();
2376 void _set_alloc_sizes();
2377 void _set_blob_size();
2378 void _set_finisher_num();
2379 void _set_per_pool_omap();
2380 void _update_osd_memory_options();
2381
2382 int _open_bdev(bool create);
2383 // Verifies that disk space is sufficient for the reserved area plus the
2384 // minimum bluefs portion, and alters the latter if needed.
2385 // Depends on min_alloc_size, hence must be called after its
2386 // initialization (and outside of _open_bdev).
2387 void _validate_bdev();
2388 void _close_bdev();
2389
2390 int _minimal_open_bluefs(bool create);
2391 void _minimal_close_bluefs();
2392 int _open_bluefs(bool create, bool read_only);
2393 void _close_bluefs(bool cold_close);
2394
2395 int _is_bluefs(bool create, bool* ret);
2396 /*
2397 * opens the DB and its dependent super_meta, FreelistManager and allocator
2398 * in the proper order
2399 */
2400 int _open_db_and_around(bool read_only, bool to_repair = false);
2401 void _close_db_and_around(bool read_only);
2402
2403 int _prepare_db_environment(bool create, bool read_only,
2404 std::string* kv_dir, std::string* kv_backend);
2405
2406 /*
2407 * @warning to_repair_db means we open this db in order to repair it and
2408 * will not hold rocksdb's file lock.
2409 */
2410 int _open_db(bool create,
2411 bool to_repair_db=false,
2412 bool read_only = false);
2413 void _close_db(bool read_only);
2414 int _open_fm(KeyValueDB::Transaction t, bool read_only);
2415 void _close_fm();
2416 int _write_out_fm_meta(uint64_t target_size);
2417 int _create_alloc();
2418 int _init_alloc();
2419 void _close_alloc();
2420 int _open_collections();
2421 void _fsck_collections(int64_t* errors);
2422 void _close_collections();
2423
2424 int _setup_block_symlink_or_file(std::string name, std::string path, uint64_t size,
2425 bool create);
2426
2427 // Functions related to zoned storage.
2428 uint64_t _zoned_piggyback_device_parameters_onto(uint64_t min_alloc_size);
2429 int _zoned_check_config_settings();
2430 void _zoned_update_cleaning_metadata(TransContext *txc);
2431 std::string _zoned_get_prefix(uint64_t offset);
2432
2433 public:
2434 utime_t get_deferred_last_submitted() {
2435 std::lock_guard l(deferred_lock);
2436 return deferred_last_submitted;
2437 }
2438
2439 static int _write_bdev_label(CephContext* cct,
2440 std::string path, bluestore_bdev_label_t label);
2441 static int _read_bdev_label(CephContext* cct, std::string path,
2442 bluestore_bdev_label_t *label);
2443 private:
2444 int _check_or_set_bdev_label(std::string path, uint64_t size, std::string desc,
2445 bool create);
2446 int _set_bdev_label_size(const std::string& path, uint64_t size);
2447
2448 int _open_super_meta();
2449
2450 void _open_statfs();
2451 void _get_statfs_overall(struct store_statfs_t *buf);
2452
2453 void _dump_alloc_on_failure();
2454
2455 CollectionRef _get_collection(const coll_t& cid);
2456 void _queue_reap_collection(CollectionRef& c);
2457 void _reap_collections();
2458 void _update_cache_logger();
2459
2460 void _assign_nid(TransContext *txc, OnodeRef o);
2461 uint64_t _assign_blobid(TransContext *txc);
2462
2463 template <int LogLevelV>
2464 friend void _dump_onode(CephContext *cct, const Onode& o);
2465 template <int LogLevelV>
2466 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2467 template <int LogLevelV>
2468 friend void _dump_transaction(CephContext *cct, Transaction *t);
2469
2470 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2471 std::list<Context*> *on_commits,
2472 TrackedOpRef osd_op=TrackedOpRef());
2473 void _txc_update_store_statfs(TransContext *txc);
2474 void _txc_add_transaction(TransContext *txc, Transaction *t);
2475 void _txc_calc_cost(TransContext *txc);
2476 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2477 void _txc_state_proc(TransContext *txc);
2478 void _txc_aio_submit(TransContext *txc);
2479 public:
2480 void txc_aio_finish(void *p) {
2481 _txc_state_proc(static_cast<TransContext*>(p));
2482 }
2483 private:
2484 void _txc_finish_io(TransContext *txc);
2485 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2486 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2487 void _txc_committed_kv(TransContext *txc);
2488 void _txc_finish(TransContext *txc);
2489 void _txc_release_alloc(TransContext *txc);
2490
2491 void _osr_attach(Collection *c);
2492 void _osr_register_zombie(OpSequencer *osr);
2493 void _osr_drain(OpSequencer *osr);
2494 void _osr_drain_preceding(TransContext *txc);
2495 void _osr_drain_all();
2496
2497 void _kv_start();
2498 void _kv_stop();
2499 void _kv_sync_thread();
2500 void _kv_finalize_thread();
2501
2502 void _zoned_cleaner_start();
2503 void _zoned_cleaner_stop();
2504 void _zoned_cleaner_thread();
2505 void _zoned_clean_zone(uint64_t zone_num);
2506
2507 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, uint64_t len);
2508 void _deferred_queue(TransContext *txc);
2509 public:
2510 void deferred_try_submit();
2511 private:
2512 void _deferred_submit_unlock(OpSequencer *osr);
2513 void _deferred_aio_finish(OpSequencer *osr);
2514 int _deferred_replay();
2515
2516 public:
2517 using mempool_dynamic_bitset =
2518 boost::dynamic_bitset<uint64_t,
2519 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2520 using per_pool_statfs =
2521 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2522
2523 enum FSCKDepth {
2524 FSCK_REGULAR,
2525 FSCK_DEEP,
2526 FSCK_SHALLOW
2527 };
2528 enum {
2529 MAX_FSCK_ERROR_LINES = 100,
2530 };
2531
2532 private:
2533 int _fsck_check_extents(
2534 const coll_t& cid,
2535 const ghobject_t& oid,
2536 const PExtentVector& extents,
2537 bool compressed,
2538 mempool_dynamic_bitset &used_blocks,
2539 uint64_t granularity,
2540 BlueStoreRepairer* repairer,
2541 store_statfs_t& expected_statfs,
2542 FSCKDepth depth);
2543
2544 void _fsck_check_pool_statfs(
2545 per_pool_statfs& expected_pool_statfs,
2546 int64_t& errors,
2547 int64_t &warnings,
2548 BlueStoreRepairer* repairer);
2549
2550 int _fsck(FSCKDepth depth, bool repair);
2551 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2552
2553 void _buffer_cache_write(
2554 TransContext *txc,
2555 BlobRef b,
2556 uint64_t offset,
2557 ceph::buffer::list& bl,
2558 unsigned flags) {
2559 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2560 flags);
2561 txc->shared_blobs_written.insert(b->shared_blob);
2562 }
2563
2564 int _collection_list(
2565 Collection *c, const ghobject_t& start, const ghobject_t& end,
2566 int max, bool legacy, std::vector<ghobject_t> *ls, ghobject_t *next);
2567
2568 template <typename T, typename F>
2569 T select_option(const std::string& opt_name, T val1, F f) {
2570 //NB: opt_name reserved for future use
2571 boost::optional<T> val2 = f();
2572 if (val2) {
2573 return *val2;
2574 }
2575 return val1;
2576 }
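  // Illustrative sketch (comment only) of select_option(): return the value
  // produced by the functor if it yields one (e.g. a per-pool override),
  // otherwise fall back to the supplied default.  The pool_opts lookup below
  // is a hypothetical example, not the real call site.
  //
  //   uint64_t blob_size = select_option(
  //     "compression_max_blob_size",
  //     comp_max_blob_size.load(),
  //     [&]() -> boost::optional<uint64_t> {
  //       int64_t val;
  //       if (pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
  //         return boost::optional<uint64_t>((uint64_t)val);
  //       }
  //       return boost::optional<uint64_t>();
  //     });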
2577
2578 void _apply_padding(uint64_t head_pad,
2579 uint64_t tail_pad,
2580 ceph::buffer::list& padded);
2581
2582 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2583
2584 // -- ondisk version ---
2585 public:
2586 const int32_t latest_ondisk_format = 4; ///< our version
2587 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2588 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2589
2590 private:
2591 int32_t ondisk_format = 0; ///< value detected on mount
2592
2593 int _upgrade_super(); ///< upgrade (called during open_super)
2594 uint64_t _get_ondisk_reserved() const;
2595 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2596
2597 // --- public interface ---
2598 public:
2599 BlueStore(CephContext *cct, const std::string& path);
2600 BlueStore(CephContext *cct, const std::string& path, uint64_t min_alloc_size); // Ctor for UT only
2601 ~BlueStore() override;
2602
2603 std::string get_type() override {
2604 return "bluestore";
2605 }
2606
2607 bool needs_journal() override { return false; };
2608 bool wants_journal() override { return false; };
2609 bool allows_journal() override { return false; };
2610
2611 uint64_t get_min_alloc_size() const override {
2612 return min_alloc_size;
2613 }
2614
2615 int get_devices(std::set<std::string> *ls) override;
2616
2617 bool is_rotational() override;
2618 bool is_journal_rotational() override;
2619
2620 std::string get_default_device_class() override {
2621 std::string device_class;
2622 std::map<std::string, std::string> metadata;
2623 collect_metadata(&metadata);
2624 auto it = metadata.find("bluestore_bdev_type");
2625 if (it != metadata.end()) {
2626 device_class = it->second;
2627 }
2628 return device_class;
2629 }
2630
2631 int get_numa_node(
2632 int *numa_node,
2633 std::set<int> *nodes,
2634 std::set<std::string> *failed) override;
2635
2636 static int get_block_device_fsid(CephContext* cct, const std::string& path,
2637 uuid_d *fsid);
2638
2639 bool test_mount_in_use() override;
2640
2641 private:
2642 int _mount();
2643 public:
2644 int mount() override {
2645 return _mount();
2646 }
2647 int umount() override;
2648
2649 int open_db_environment(KeyValueDB **pdb, bool to_repair);
2650 int close_db_environment();
2651
2652 int write_meta(const std::string& key, const std::string& value) override;
2653 int read_meta(const std::string& key, std::string *value) override;
2654
2655 // open in read-only and limited mode
2656 int cold_open();
2657 int cold_close();
2658
2659 int fsck(bool deep) override {
2660 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2661 }
2662 int repair(bool deep) override {
2663 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2664 }
2665 int quick_fix() override {
2666 return _fsck(FSCK_SHALLOW, true);
2667 }
2668
2669 void set_cache_shards(unsigned num) override;
2670 void dump_cache_stats(ceph::Formatter *f) override {
2671 int onode_count = 0, buffers_bytes = 0;
2672 for (auto i: onode_cache_shards) {
2673 onode_count += i->_get_num();
2674 }
2675 for (auto i: buffer_cache_shards) {
2676 buffers_bytes += i->_get_bytes();
2677 }
2678 f->dump_int("bluestore_onode", onode_count);
2679 f->dump_int("bluestore_buffers", buffers_bytes);
2680 }
2681 void dump_cache_stats(std::ostream& ss) override {
2682 int onode_count = 0, buffers_bytes = 0;
2683 for (auto i: onode_cache_shards) {
2684 onode_count += i->_get_num();
2685 }
2686 for (auto i: buffer_cache_shards) {
2687 buffers_bytes += i->_get_bytes();
2688 }
2689 ss << "bluestore_onode: " << onode_count;
2690 ss << "bluestore_buffers: " << buffers_bytes;
2691 }
2692
2693 int validate_hobject_key(const hobject_t &obj) const override {
2694 return 0;
2695 }
2696 unsigned get_max_attr_name_length() override {
2697 return 256; // arbitrary; there is no real limit internally
2698 }
2699
2700 int mkfs() override;
2701 int mkjournal() override {
2702 return 0;
2703 }
2704
2705 void get_db_statistics(ceph::Formatter *f) override;
2706 void generate_db_histogram(ceph::Formatter *f) override;
2707 void _shutdown_cache();
2708 int flush_cache(std::ostream *os = NULL) override;
2709 void dump_perf_counters(ceph::Formatter *f) override {
2710 f->open_object_section("perf_counters");
2711 logger->dump_formatted(f, false);
2712 f->close_section();
2713 }
2714
2715 int add_new_bluefs_device(int id, const std::string& path);
2716 int migrate_to_existing_bluefs_device(const std::set<int>& devs_source,
2717 int id);
2718 int migrate_to_new_bluefs_device(const std::set<int>& devs_source,
2719 int id,
2720 const std::string& path);
2721 int expand_devices(std::ostream& out);
2722 std::string get_device_path(unsigned id);
2723
2724 int dump_bluefs_sizes(std::ostream& out);
2725
2726 public:
2727 int statfs(struct store_statfs_t *buf,
2728 osd_alert_list_t* alerts = nullptr) override;
2729 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2730 bool *per_pool_omap) override;
2731
2732 void collect_metadata(std::map<std::string,std::string> *pm) override;
2733
2734 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2735 int set_collection_opts(
2736 CollectionHandle& c,
2737 const pool_opts_t& opts) override;
2738 int stat(
2739 CollectionHandle &c,
2740 const ghobject_t& oid,
2741 struct stat *st,
2742 bool allow_eio = false) override;
2743 int read(
2744 CollectionHandle &c,
2745 const ghobject_t& oid,
2746 uint64_t offset,
2747 size_t len,
2748 ceph::buffer::list& bl,
2749 uint32_t op_flags = 0) override;
2750
2751 private:
2752
2753 // --------------------------------------------------------
2754 // intermediate data structures used while reading
2755 struct region_t {
2756 uint64_t logical_offset;
2757 uint64_t blob_xoffset; // region offset within the blob
2758 uint64_t length;
2759
2760 // used later in read process
2761 uint64_t front = 0;
2762
2763 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2764 : logical_offset(offset),
2765 blob_xoffset(b_offs),
2766 length(len),
2767 front(front){}
2768 region_t(const region_t& from)
2769 : logical_offset(from.logical_offset),
2770 blob_xoffset(from.blob_xoffset),
2771 length(from.length),
2772 front(from.front){}
2773
2774 friend std::ostream& operator<<(std::ostream& out, const region_t& r) {
2775 return out << "0x" << std::hex << r.logical_offset << ":"
2776 << r.blob_xoffset << "~" << r.length << std::dec;
2777 }
2778 };
2779
2780 // merged blob read request
2781 struct read_req_t {
2782 uint64_t r_off = 0;
2783 uint64_t r_len = 0;
2784 ceph::buffer::list bl;
2785 std::list<region_t> regs; // original read regions
2786
2787 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2788
2789 friend std::ostream& operator<<(std::ostream& out, const read_req_t& r) {
2790 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2791 for (const auto& reg : r.regs)
2792 out << reg;
2793 return out << "]}" << std::dec;
2794 }
2795 };
2796
2797 typedef std::list<read_req_t> regions2read_t;
2798 typedef std::map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
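  // How these fit together (summary comment): _read_cache() splits the
  // requested [offset, offset+length) range into cache hits (ready_regions)
  // and misses grouped per blob (blobs2read); adjacent misses within a blob
  // are merged into a single read_req_t whose regs list keeps the original
  // logical regions so the data can be scattered back into the result buffer
  // once the aio completes.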
2799
2800 void _read_cache(
2801 OnodeRef o,
2802 uint64_t offset,
2803 size_t length,
2804 int read_cache_policy,
2805 ready_regions_t& ready_regions,
2806 blobs2read_t& blobs2read);
2807
2808
2809 int _prepare_read_ioc(
2810 blobs2read_t& blobs2read,
2811 std::vector<ceph::buffer::list>* compressed_blob_bls,
2812 IOContext* ioc);
2813
2814 int _generate_read_result_bl(
2815 OnodeRef o,
2816 uint64_t offset,
2817 size_t length,
2818 ready_regions_t& ready_regions,
2819 std::vector<ceph::buffer::list>& compressed_blob_bls,
2820 blobs2read_t& blobs2read,
2821 bool buffered,
2822 bool* csum_error,
2823 ceph::buffer::list& bl);
2824
2825 int _do_read(
2826 Collection *c,
2827 OnodeRef o,
2828 uint64_t offset,
2829 size_t len,
2830 ceph::buffer::list& bl,
2831 uint32_t op_flags = 0,
2832 uint64_t retry_count = 0);
2833
2834 int _do_readv(
2835 Collection *c,
2836 OnodeRef o,
2837 const interval_set<uint64_t>& m,
2838 ceph::buffer::list& bl,
2839 uint32_t op_flags = 0,
2840 uint64_t retry_count = 0);
2841
2842 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2843 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2844 public:
2845 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2846 uint64_t offset, size_t len, ceph::buffer::list& bl) override;
2847 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2848 uint64_t offset, size_t len, std::map<uint64_t, uint64_t>& destmap) override;
2849
2850 int readv(
2851 CollectionHandle &c_,
2852 const ghobject_t& oid,
2853 interval_set<uint64_t>& m,
2854 ceph::buffer::list& bl,
2855 uint32_t op_flags) override;
2856
2857 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2858 const std::string& section_name, ceph::Formatter *f) override;
2859
2860 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2861 ceph::buffer::ptr& value) override;
2862
2863 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2864 std::map<std::string,ceph::buffer::ptr>& aset) override;
2865
2866 int list_collections(std::vector<coll_t>& ls) override;
2867
2868 CollectionHandle open_collection(const coll_t &c) override;
2869 CollectionHandle create_new_collection(const coll_t& cid) override;
2870 void set_collection_commit_queue(const coll_t& cid,
2871 ContextQueue *commit_queue) override;
2872
2873 bool collection_exists(const coll_t& c) override;
2874 int collection_empty(CollectionHandle& c, bool *empty) override;
2875 int collection_bits(CollectionHandle& c) override;
2876
2877 int collection_list(CollectionHandle &c,
2878 const ghobject_t& start,
2879 const ghobject_t& end,
2880 int max,
2881 std::vector<ghobject_t> *ls, ghobject_t *next) override;
2882
2883 int collection_list_legacy(CollectionHandle &c,
2884 const ghobject_t& start,
2885 const ghobject_t& end,
2886 int max,
2887 std::vector<ghobject_t> *ls,
2888 ghobject_t *next) override;
2889
2890 int omap_get(
2891 CollectionHandle &c, ///< [in] Collection containing oid
2892 const ghobject_t &oid, ///< [in] Object containing omap
2893 ceph::buffer::list *header, ///< [out] omap header
2894 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2895 ) override;
2896 int _omap_get(
2897 Collection *c, ///< [in] Collection containing oid
2898 const ghobject_t &oid, ///< [in] Object containing omap
2899 ceph::buffer::list *header, ///< [out] omap header
2900 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2901 );
2902 int _onode_omap_get(
2903 const OnodeRef &o, ///< [in] Object containing omap
2904 ceph::buffer::list *header, ///< [out] omap header
2905 std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2906 );
2907
2908
2909 /// Get omap header
2910 int omap_get_header(
2911 CollectionHandle &c, ///< [in] Collection containing oid
2912 const ghobject_t &oid, ///< [in] Object containing omap
2913 ceph::buffer::list *header, ///< [out] omap header
2914 bool allow_eio = false ///< [in] don't assert on eio
2915 ) override;
2916
2917 /// Get keys defined on oid
2918 int omap_get_keys(
2919 CollectionHandle &c, ///< [in] Collection containing oid
2920 const ghobject_t &oid, ///< [in] Object containing omap
2921 std::set<std::string> *keys ///< [out] Keys defined on oid
2922 ) override;
2923
2924 /// Get key values
2925 int omap_get_values(
2926 CollectionHandle &c, ///< [in] Collection containing oid
2927 const ghobject_t &oid, ///< [in] Object containing omap
2928 const std::set<std::string> &keys, ///< [in] Keys to get
2929 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
2930 ) override;
2931
2932 #ifdef WITH_SEASTAR
2933 int omap_get_values(
2934 CollectionHandle &c, ///< [in] Collection containing oid
2935 const ghobject_t &oid, ///< [in] Object containing omap
2936 const std::optional<std::string> &start_after, ///< [in] list keys after this one
2937 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
2938 ) override;
2939 #endif
2940
2941 /// Filters keys into out which are defined on oid
2942 int omap_check_keys(
2943 CollectionHandle &c, ///< [in] Collection containing oid
2944 const ghobject_t &oid, ///< [in] Object containing omap
2945 const std::set<std::string> &keys, ///< [in] Keys to check
2946 std::set<std::string> *out ///< [out] Subset of keys defined on oid
2947 ) override;
2948
2949 ObjectMap::ObjectMapIterator get_omap_iterator(
2950 CollectionHandle &c, ///< [in] collection
2951 const ghobject_t &oid ///< [in] object
2952 ) override;
2953
2954 void set_fsid(uuid_d u) override {
2955 fsid = u;
2956 }
2957 uuid_d get_fsid() override {
2958 return fsid;
2959 }
2960
2961 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2962 return num_objects * 300; // assuming per-object overhead is 300 bytes
2963 }
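  // e.g. estimate_objects_overhead(1000000) -> 300,000,000 bytes, i.e.
  // roughly 300 MB of estimated metadata overhead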
2964
2965 struct BSPerfTracker {
2966 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2967 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2968
2969 objectstore_perf_stat_t get_cur_stats() const {
2970 objectstore_perf_stat_t ret;
2971 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2972 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2973 return ret;
2974 }
2975
2976 void update_from_perfcounters(PerfCounters &logger);
2977 } perf_tracker;
2978
2979 objectstore_perf_stat_t get_cur_stats() override {
2980 perf_tracker.update_from_perfcounters(*logger);
2981 return perf_tracker.get_cur_stats();
2982 }
2983 const PerfCounters* get_perf_counters() const override {
2984 return logger;
2985 }
2986 const PerfCounters* get_bluefs_perf_counters() const {
2987 return bluefs->get_perf_counters();
2988 }
2989 KeyValueDB* get_kv() {
2990 return db;
2991 }
2992
2993 int queue_transactions(
2994 CollectionHandle& ch,
2995 std::vector<Transaction>& tls,
2996 TrackedOpRef op = TrackedOpRef(),
2997 ThreadPool::TPHandle *handle = NULL) override;
2998
2999 // error injection
3000 void inject_data_error(const ghobject_t& o) override {
3001 std::unique_lock l(debug_read_error_lock);
3002 debug_data_error_objects.insert(o);
3003 }
3004 void inject_mdata_error(const ghobject_t& o) override {
3005 std::unique_lock l(debug_read_error_lock);
3006 debug_mdata_error_objects.insert(o);
3007 }
3008
3009 /// methods to inject various errors fsck can repair
3010 void inject_broken_shared_blob_key(const std::string& key,
3011 const ceph::buffer::list& bl);
3012 void inject_no_shared_blob_key();
3013
3014 void inject_leaked(uint64_t len);
3015 void inject_false_free(coll_t cid, ghobject_t oid);
3016 void inject_statfs(const std::string& key, const store_statfs_t& new_statfs);
3017 void inject_global_statfs(const store_statfs_t& new_statfs);
3018 void inject_misreference(coll_t cid1, ghobject_t oid1,
3019 coll_t cid2, ghobject_t oid2,
3020 uint64_t offset);
3021 void inject_zombie_spanning_blob(coll_t cid, ghobject_t oid, int16_t blob_id);
3022 // resets global per_pool_omap in DB
3023 void inject_legacy_omap();
3024 // resets per_pool_omap | pgmeta_omap for onode
3025 void inject_legacy_omap(coll_t cid, ghobject_t oid);
3026
3027 void inject_bluefs_file(std::string_view dir,
3028 std::string_view name,
3029 size_t new_size);
3030
3031 void compact() override {
3032 ceph_assert(db);
3033 db->compact();
3034 }
3035 bool has_builtin_csum() const override {
3036 return true;
3037 }
3038
3039 inline void log_latency(const char* name,
3040 int idx,
3041 const ceph::timespan& lat,
3042 double lat_threshold,
3043 const char* info = "") const;
3044
3045 inline void log_latency_fn(const char* name,
3046 int idx,
3047 const ceph::timespan& lat,
3048 double lat_threshold,
3049 std::function<std::string (const ceph::timespan& lat)> fn) const;
3050
3051 private:
3052 bool _debug_data_eio(const ghobject_t& o) {
3053 if (!cct->_conf->bluestore_debug_inject_read_err) {
3054 return false;
3055 }
3056 std::shared_lock l(debug_read_error_lock);
3057 return debug_data_error_objects.count(o);
3058 }
3059 bool _debug_mdata_eio(const ghobject_t& o) {
3060 if (!cct->_conf->bluestore_debug_inject_read_err) {
3061 return false;
3062 }
3063 std::shared_lock l(debug_read_error_lock);
3064 return debug_mdata_error_objects.count(o);
3065 }
3066 void _debug_obj_on_delete(const ghobject_t& o) {
3067 if (cct->_conf->bluestore_debug_inject_read_err) {
3068 std::unique_lock l(debug_read_error_lock);
3069 debug_data_error_objects.erase(o);
3070 debug_mdata_error_objects.erase(o);
3071 }
3072 }
3073 private:
3074 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
3075 std::string failed_cmode;
3076 std::set<std::string> failed_compressors;
3077 std::string spillover_alert;
3078 std::string legacy_statfs_alert;
3079 std::string no_per_pool_omap_alert;
3080 std::string no_per_pg_omap_alert;
3081 std::string disk_size_mismatch_alert;
3082 std::string spurious_read_errors_alert;
3083
3084 void _log_alerts(osd_alert_list_t& alerts);
3085 bool _set_compression_alert(bool cmode, const char* s) {
3086 std::lock_guard l(qlock);
3087 if (cmode) {
3088 bool ret = failed_cmode.empty();
3089 failed_cmode = s;
3090 return ret;
3091 }
3092 return failed_compressors.emplace(s).second;
3093 }
3094 void _clear_compression_alert() {
3095 std::lock_guard l(qlock);
3096 failed_compressors.clear();
3097 failed_cmode.clear();
3098 }
3099
3100 void _set_spillover_alert(const std::string& s) {
3101 std::lock_guard l(qlock);
3102 spillover_alert = s;
3103 }
3104 void _clear_spillover_alert() {
3105 std::lock_guard l(qlock);
3106 spillover_alert.clear();
3107 }
3108
3109 void _check_legacy_statfs_alert();
3110 void _check_no_per_pg_or_pool_omap_alert();
3111 void _set_disk_size_mismatch_alert(const std::string& s) {
3112 std::lock_guard l(qlock);
3113 disk_size_mismatch_alert = s;
3114 }
3115 void _set_spurious_read_errors_alert(const std::string& s) {
3116 std::lock_guard l(qlock);
3117 spurious_read_errors_alert = s;
3118 }
3119
3120 private:
3121
3122 // --------------------------------------------------------
3123 // read processing internal methods
3124 int _verify_csum(
3125 OnodeRef& o,
3126 const bluestore_blob_t* blob,
3127 uint64_t blob_xoffset,
3128 const ceph::buffer::list& bl,
3129 uint64_t logical_offset) const;
3130 int _decompress(ceph::buffer::list& source, ceph::buffer::list* result);
3131
3132
3133 // --------------------------------------------------------
3134 // write ops
3135
3136 struct WriteContext {
3137 bool buffered = false; ///< buffered write
3138 bool compress = false; ///< compressed write
3139 uint64_t target_blob_size = 0; ///< target (max) blob size
3140 unsigned csum_order = 0; ///< target checksum chunk order
3141
3142 old_extent_map_t old_extents; ///< must deref these blobs
3143 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
3144
3145 struct write_item {
3146 uint64_t logical_offset; ///< write logical offset
3147 BlobRef b;
3148 uint64_t blob_length;
3149 uint64_t b_off;
3150 ceph::buffer::list bl;
3151 uint64_t b_off0; ///< original offset in a blob prior to padding
3152 uint64_t length0; ///< original data length prior to padding
3153
3154 bool mark_unused;
3155 bool new_blob; ///< whether new blob was created
3156
3157 bool compressed = false;
3158 ceph::buffer::list compressed_bl;
3159 size_t compressed_len = 0;
3160
3161 write_item(
3162 uint64_t logical_offs,
3163 BlobRef b,
3164 uint64_t blob_len,
3165 uint64_t o,
3166 ceph::buffer::list& bl,
3167 uint64_t o0,
3168 uint64_t l0,
3169 bool _mark_unused,
3170 bool _new_blob)
3171 :
3172 logical_offset(logical_offs),
3173 b(b),
3174 blob_length(blob_len),
3175 b_off(o),
3176 bl(bl),
3177 b_off0(o0),
3178 length0(l0),
3179 mark_unused(_mark_unused),
3180 new_blob(_new_blob) {}
3181 };
3182 std::vector<write_item> writes; ///< blobs we're writing
3183
3184 /// partial clone of the context
3185 void fork(const WriteContext& other) {
3186 buffered = other.buffered;
3187 compress = other.compress;
3188 target_blob_size = other.target_blob_size;
3189 csum_order = other.csum_order;
3190 }
3191 void write(
3192 uint64_t loffs,
3193 BlobRef b,
3194 uint64_t blob_len,
3195 uint64_t o,
3196 ceph::buffer::list& bl,
3197 uint64_t o0,
3198 uint64_t len0,
3199 bool _mark_unused,
3200 bool _new_blob) {
3201 writes.emplace_back(loffs,
3202 b,
3203 blob_len,
3204 o,
3205 bl,
3206 o0,
3207 len0,
3208 _mark_unused,
3209 _new_blob);
3210 }
3211 /// Checks for writes to the same pextent within a blob
3212 bool has_conflict(
3213 BlobRef b,
3214 uint64_t loffs,
3215 uint64_t loffs_end,
3216 uint64_t min_alloc_size);
3217 };
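  // Illustrative sketch (comment only): queueing a blob write while avoiding
  // conflicting writes to the same pextent; variable names other than the
  // members/methods declared above are placeholders.
  //
  //   WriteContext wctx_gc;
  //   wctx_gc.fork(wctx);  // inherit buffered/compress/target_blob_size/csum_order
  //   if (!wctx.has_conflict(b, loffs, loffs_end, min_alloc_size)) {
  //     wctx.write(loffs, b, blob_len, b_off, bl, b_off0, len0,
  //                false /* mark_unused */, true /* new_blob */);
  //   }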
3218
3219 void _do_write_small(
3220 TransContext *txc,
3221 CollectionRef &c,
3222 OnodeRef o,
3223 uint64_t offset, uint64_t length,
3224 ceph::buffer::list::iterator& blp,
3225 WriteContext *wctx);
3226 void _do_write_big_apply_deferred(
3227 TransContext* txc,
3228 CollectionRef& c,
3229 OnodeRef o,
3230 BigDeferredWriteContext& dctx,
3231 bufferlist::iterator& blp,
3232 WriteContext* wctx);
3233 void _do_write_big(
3234 TransContext *txc,
3235 CollectionRef &c,
3236 OnodeRef o,
3237 uint64_t offset, uint64_t length,
3238 ceph::buffer::list::iterator& blp,
3239 WriteContext *wctx);
3240 int _do_alloc_write(
3241 TransContext *txc,
3242 CollectionRef c,
3243 OnodeRef o,
3244 WriteContext *wctx);
3245 void _wctx_finish(
3246 TransContext *txc,
3247 CollectionRef& c,
3248 OnodeRef o,
3249 WriteContext *wctx,
3250 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3251
3252 int _write(TransContext *txc,
3253 CollectionRef& c,
3254 OnodeRef& o,
3255 uint64_t offset, size_t len,
3256 ceph::buffer::list& bl,
3257 uint32_t fadvise_flags);
3258 void _pad_zeros(ceph::buffer::list *bl, uint64_t *offset,
3259 uint64_t chunk_size);
3260
3261 void _choose_write_options(CollectionRef& c,
3262 OnodeRef o,
3263 uint32_t fadvise_flags,
3264 WriteContext *wctx);
3265
3266 int _do_gc(TransContext *txc,
3267 CollectionRef& c,
3268 OnodeRef o,
3269 const WriteContext& wctx,
3270 uint64_t *dirty_start,
3271 uint64_t *dirty_end);
3272
3273 int _do_write(TransContext *txc,
3274 CollectionRef &c,
3275 OnodeRef o,
3276 uint64_t offset, uint64_t length,
3277 ceph::buffer::list& bl,
3278 uint32_t fadvise_flags);
3279 void _do_write_data(TransContext *txc,
3280 CollectionRef& c,
3281 OnodeRef o,
3282 uint64_t offset,
3283 uint64_t length,
3284 ceph::buffer::list& bl,
3285 WriteContext *wctx);
3286
3287 int _touch(TransContext *txc,
3288 CollectionRef& c,
3289 OnodeRef& o);
3290 int _do_zero(TransContext *txc,
3291 CollectionRef& c,
3292 OnodeRef& o,
3293 uint64_t offset, size_t len);
3294 int _zero(TransContext *txc,
3295 CollectionRef& c,
3296 OnodeRef& o,
3297 uint64_t offset, size_t len);
3298 void _do_truncate(TransContext *txc,
3299 CollectionRef& c,
3300 OnodeRef o,
3301 uint64_t offset,
3302 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3303 int _truncate(TransContext *txc,
3304 CollectionRef& c,
3305 OnodeRef& o,
3306 uint64_t offset);
3307 int _remove(TransContext *txc,
3308 CollectionRef& c,
3309 OnodeRef& o);
3310 int _do_remove(TransContext *txc,
3311 CollectionRef& c,
3312 OnodeRef o);
3313 int _setattr(TransContext *txc,
3314 CollectionRef& c,
3315 OnodeRef& o,
3316 const std::string& name,
3317 ceph::buffer::ptr& val);
3318 int _setattrs(TransContext *txc,
3319 CollectionRef& c,
3320 OnodeRef& o,
3321 const std::map<std::string,ceph::buffer::ptr>& aset);
3322 int _rmattr(TransContext *txc,
3323 CollectionRef& c,
3324 OnodeRef& o,
3325 const std::string& name);
3326 int _rmattrs(TransContext *txc,
3327 CollectionRef& c,
3328 OnodeRef& o);
3329 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3330 int _omap_clear(TransContext *txc,
3331 CollectionRef& c,
3332 OnodeRef& o);
3333 int _omap_setkeys(TransContext *txc,
3334 CollectionRef& c,
3335 OnodeRef& o,
3336 ceph::buffer::list& bl);
3337 int _omap_setheader(TransContext *txc,
3338 CollectionRef& c,
3339 OnodeRef& o,
3340 ceph::buffer::list& header);
3341 int _omap_rmkeys(TransContext *txc,
3342 CollectionRef& c,
3343 OnodeRef& o,
3344 ceph::buffer::list& bl);
3345 int _omap_rmkey_range(TransContext *txc,
3346 CollectionRef& c,
3347 OnodeRef& o,
3348 const std::string& first, const std::string& last);
3349 int _set_alloc_hint(
3350 TransContext *txc,
3351 CollectionRef& c,
3352 OnodeRef& o,
3353 uint64_t expected_object_size,
3354 uint64_t expected_write_size,
3355 uint32_t flags);
3356 int _do_clone_range(TransContext *txc,
3357 CollectionRef& c,
3358 OnodeRef& oldo,
3359 OnodeRef& newo,
3360 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3361 int _clone(TransContext *txc,
3362 CollectionRef& c,
3363 OnodeRef& oldo,
3364 OnodeRef& newo);
3365 int _clone_range(TransContext *txc,
3366 CollectionRef& c,
3367 OnodeRef& oldo,
3368 OnodeRef& newo,
3369 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3370 int _rename(TransContext *txc,
3371 CollectionRef& c,
3372 OnodeRef& oldo,
3373 OnodeRef& newo,
3374 const ghobject_t& new_oid);
3375 int _create_collection(TransContext *txc, const coll_t &cid,
3376 unsigned bits, CollectionRef *c);
3377 int _remove_collection(TransContext *txc, const coll_t &cid,
3378 CollectionRef *c);
3379 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3380 int _split_collection(TransContext *txc,
3381 CollectionRef& c,
3382 CollectionRef& d,
3383 unsigned bits, int rem);
3384 int _merge_collection(TransContext *txc,
3385 CollectionRef *c,
3386 CollectionRef& d,
3387 unsigned bits);
3388
3389 void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
3390 size_t extents);
3391 void _record_allocation_stats();
3392 private:
3393 uint64_t probe_count = 0;
3394 std::atomic<uint64_t> alloc_stats_count = {0};
3395 std::atomic<uint64_t> alloc_stats_fragments = { 0 };
3396 std::atomic<uint64_t> alloc_stats_size = { 0 };
3397 // rolling history of the last few allocation-stats probes: (count, fragments, size)
3398 std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
3399 { std::make_tuple(0ul, 0ul, 0ul) };
3400
3401 inline bool _use_rotational_settings();
3402
3403 public:
3404 struct sb_info_t {
3405 coll_t cid;
3406 int64_t pool_id = INT64_MIN;
3407 std::list<ghobject_t> oids;
3408 BlueStore::SharedBlobRef sb;
3409 bluestore_extent_ref_map_t ref_map;
3410 bool compressed = false;
3411 bool passed = false;
3412 bool updated = false;
3413 };
3414 typedef btree::btree_set<
3415 uint64_t, std::less<uint64_t>,
3416 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3417
3418 typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
3419 struct FSCK_ObjectCtx {
3420 int64_t& errors;
3421 int64_t& warnings;
3422 uint64_t& num_objects;
3423 uint64_t& num_extents;
3424 uint64_t& num_blobs;
3425 uint64_t& num_sharded_objects;
3426 uint64_t& num_spanning_blobs;
3427
3428 mempool_dynamic_bitset* used_blocks;
3429 uint64_t_btree_t* used_omap_head;
3430
3431 ceph::mutex* sb_info_lock;
3432 sb_info_map_t& sb_info;
3433
3434 store_statfs_t& expected_store_statfs;
3435 per_pool_statfs& expected_pool_statfs;
3436 BlueStoreRepairer* repairer;
3437
3438 FSCK_ObjectCtx(int64_t& e,
3439 int64_t& w,
3440 uint64_t& _num_objects,
3441 uint64_t& _num_extents,
3442 uint64_t& _num_blobs,
3443 uint64_t& _num_sharded_objects,
3444 uint64_t& _num_spanning_blobs,
3445 mempool_dynamic_bitset* _ub,
3446 uint64_t_btree_t* _used_omap_head,
3447 ceph::mutex* _sb_info_lock,
3448 sb_info_map_t& _sb_info,
3449 store_statfs_t& _store_statfs,
3450 per_pool_statfs& _pool_statfs,
3451 BlueStoreRepairer* _repairer) :
3452 errors(e),
3453 warnings(w),
3454 num_objects(_num_objects),
3455 num_extents(_num_extents),
3456 num_blobs(_num_blobs),
3457 num_sharded_objects(_num_sharded_objects),
3458 num_spanning_blobs(_num_spanning_blobs),
3459 used_blocks(_ub),
3460 used_omap_head(_used_omap_head),
3461 sb_info_lock(_sb_info_lock),
3462 sb_info(_sb_info),
3463 expected_store_statfs(_store_statfs),
3464 expected_pool_statfs(_pool_statfs),
3465 repairer(_repairer) {
3466 }
3467 };
3468
3469 OnodeRef fsck_check_objects_shallow(
3470 FSCKDepth depth,
3471 int64_t pool_id,
3472 CollectionRef c,
3473 const ghobject_t& oid,
3474 const std::string& key,
3475 const ceph::buffer::list& value,
3476 mempool::bluestore_fsck::list<std::string>* expecting_shards,
3477 std::map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3478 const BlueStore::FSCK_ObjectCtx& ctx);
3479
3480 private:
3481 void _fsck_check_object_omap(FSCKDepth depth,
3482 OnodeRef& o,
3483 const BlueStore::FSCK_ObjectCtx& ctx);
3484
3485 void _fsck_check_objects(FSCKDepth depth,
3486 FSCK_ObjectCtx& ctx);
3487 };
3488
3489 inline std::ostream& operator<<(std::ostream& out, const BlueStore::volatile_statfs& s) {
3490 return out
3491 << " allocated:"
3492 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3493 << " stored:"
3494 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3495 << " compressed:"
3496 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3497 << " compressed_orig:"
3498 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3499 << " compressed_alloc:"
3500 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3501 }
3502
3503 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3504 o->get();
3505 }
3506 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3507 o->put();
3508 }
3509
3510 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3511 o->get();
3512 }
3513 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3514 o->put();
3515 }
3516
3517 class BlueStoreRepairer
3518 {
3519 ceph::mutex lock = ceph::make_mutex("BlueStore::BlueStoreRepairer::lock");
3520
3521 public:
3522 // to simplify future potential migration to mempools
3523 using fsck_interval = interval_set<uint64_t>;
3524
3525 // Structure to track what pextents are used for specific cid/oid.
3526 // As with a Bloom filter, only positive and false-positive matches are
3527 // possible.
3528 // Maintains two lists of bloom filters, one for cids and one for oids,
3529 // where each list entry is a BF covering a specific disk pextent range.
3530 // The extent length covered by each filter is computed on init.
3531 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3532 // 'is_used' queries.
3533 struct StoreSpaceTracker {
3534 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3535 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3536 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrary selected
3537 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3538
3539 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3540 bloom_vector collections_bfs;
3541 bloom_vector objects_bfs;
3542
3543 bool was_filtered_out = false;
3544 uint64_t granularity = 0; // extent length for a single filter
3545
3546 StoreSpaceTracker() {
3547 }
3548 StoreSpaceTracker(const StoreSpaceTracker& from) :
3549 collections_bfs(from.collections_bfs),
3550 objects_bfs(from.objects_bfs),
3551 granularity(from.granularity) {
3552 }
3553
3554 void init(uint64_t total,
3555 uint64_t min_alloc_size,
3556 uint64_t mem_cap = DEF_MEM_CAP) {
3557 ceph_assert(!granularity); // not initialized yet
3558 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3559 ceph_assert(mem_cap);
3560
3561 total = round_up_to(total, min_alloc_size);
3562 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3563
3564 if (!granularity) {
3565 granularity = min_alloc_size;
3566 } else {
3567 granularity = round_up_to(granularity, min_alloc_size);
3568 }
3569
3570 uint64_t entries = round_up_to(total, granularity) / granularity;
3571 collections_bfs.resize(entries,
3572 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3573 BLOOM_FILTER_TABLE_SIZE,
3574 0,
3575 BLOOM_FILTER_EXPECTED_COUNT));
3576 objects_bfs.resize(entries,
3577 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3578 BLOOM_FILTER_TABLE_SIZE,
3579 0,
3580 BLOOM_FILTER_EXPECTED_COUNT));
3581 }
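    // Worked example (illustrative numbers): total = 1 TiB,
    // min_alloc_size = 4 KiB, mem_cap = 128 MiB:
    //   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB (already 4 KiB aligned)
    //   entries     = 2^40 / 512 KiB       = ~2M filters per list
    // i.e. two lists of ~2M 32-byte filter tables, on the order of the
    // requested memory cap (ignoring per-filter object overhead).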
3582 inline uint32_t get_hash(const coll_t& cid) const {
3583 return cid.hash_to_shard(1);
3584 }
3585 inline void set_used(uint64_t offset, uint64_t len,
3586 const coll_t& cid, const ghobject_t& oid) {
3587 ceph_assert(granularity); // initialized
3588
3589 // can't call this func after filter_out has been applied
3590 ceph_assert(!was_filtered_out);
3591 if (!len) {
3592 return;
3593 }
3594 auto pos = offset / granularity;
3595 auto end_pos = (offset + len - 1) / granularity;
3596 while (pos <= end_pos) {
3597 collections_bfs[pos].insert(get_hash(cid));
3598 objects_bfs[pos].insert(oid.hobj.get_hash());
3599 ++pos;
3600 }
3601 }
3602 // Filter out entries unrelated to the specified (broken) extents.
3603 // Only 'is_used' calls are permitted after that.
3604 size_t filter_out(const fsck_interval& extents);
3605
3606 // determines whether the collection is present after filtering out
3607 inline bool is_used(const coll_t& cid) const {
3608 ceph_assert(was_filtered_out);
3609 for(auto& bf : collections_bfs) {
3610 if (bf.contains(get_hash(cid))) {
3611 return true;
3612 }
3613 }
3614 return false;
3615 }
3616 // determines whether the object is present after filtering out
3617 inline bool is_used(const ghobject_t& oid) const {
3618 ceph_assert(was_filtered_out);
3619 for(auto& bf : objects_bfs) {
3620 if (bf.contains(oid.hobj.get_hash())) {
3621 return true;
3622 }
3623 }
3624 return false;
3625 }
3626 // determines whether the collection is present before filtering out
3627 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3628 ceph_assert(granularity); // initialized
3629 ceph_assert(!was_filtered_out);
3630 auto &bf = collections_bfs[offs / granularity];
3631 if (bf.contains(get_hash(cid))) {
3632 return true;
3633 }
3634 return false;
3635 }
3636 // determines whether the object is present before filtering out
3637 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3638 ceph_assert(granularity); // initialized
3639 ceph_assert(!was_filtered_out);
3640 auto &bf = objects_bfs[offs / granularity];
3641 if (bf.contains(oid.hobj.get_hash())) {
3642 return true;
3643 }
3644 return false;
3645 }
3646 };
3647 public:
3648 void fix_per_pool_omap(KeyValueDB *db, int);
3649 bool remove_key(KeyValueDB *db, const std::string& prefix, const std::string& key);
3650 bool fix_shared_blob(KeyValueDB *db,
3651 uint64_t sbid,
3652 const ceph::buffer::list* bl);
3653 bool fix_statfs(KeyValueDB *db, const std::string& key,
3654 const store_statfs_t& new_statfs);
3655
3656 bool fix_leaked(KeyValueDB *db,
3657 FreelistManager* fm,
3658 uint64_t offset, uint64_t len);
3659 bool fix_false_free(KeyValueDB *db,
3660 FreelistManager* fm,
3661 uint64_t offset, uint64_t len);
3662 bool fix_spanning_blobs(
3663 KeyValueDB* db,
3664 std::function<void(KeyValueDB::Transaction)> f);
3665
3666 bool preprocess_misreference(KeyValueDB *db);
3667
3668 unsigned apply(KeyValueDB* db);
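// Illustrative usage sketch (hypothetical caller; the pairing of fix_* methods
// with the queued transactions is inferred from the member names below):
//
//   BlueStoreRepairer repairer;
//   repairer.init_space_usage_tracker(total_bytes, min_alloc_size);
//   // ... while scanning, record detected errors:
//   repairer.fix_statfs(db, statfs_key, corrected_statfs);  // queued in fix_statfs_txn
//   repairer.fix_leaked(db, fm, offset, length);            // queued in fix_fm_leaked_txn
//   // ... once scanning completes:
//   unsigned repaired = repairer.apply(db);                 // submits all queued transactions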
3669
3670 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3671 std::lock_guard l(lock);
3672 misreferenced_extents.union_insert(offs, len);
3673 if (inc_error) {
3674 ++to_repair_cnt;
3675 }
3676 }
3677 //////////////////////
3678 // In fact, the two methods below are the only ones in this class that are thread-safe!
3679 void inc_repaired() {
3680 ++to_repair_cnt;
3681 }
3682 void request_compaction() {
3683 need_compact = true;
3684 }
3685 //////////////////////
3686
3687 void init_space_usage_tracker(
3688 uint64_t total_space, uint64_t lres_tracking_unit_size)
3689 {
3690 // NB: not safe for multithreaded use!
3691 space_usage_tracker.init(total_space, lres_tracking_unit_size);
3692 }
3693 void set_space_used(uint64_t offset, uint64_t len,
3694 const coll_t& cid, const ghobject_t& oid) {
3695 std::lock_guard l(lock);
3696 space_usage_tracker.set_used(offset, len, cid, oid);
3697 }
3698 inline bool is_used(const coll_t& cid) const {
3699 // NB: not safe for multithreaded use!
3700 return space_usage_tracker.is_used(cid);
3701 }
3702 inline bool is_used(const ghobject_t& oid) const {
3703 // NB: not safe for multithreaded use!
3704 return space_usage_tracker.is_used(oid);
3705 }
3706
3707 const fsck_interval& get_misreferences() const {
3708 // NB: not safe for multithreaded use!
3709 return misreferenced_extents;
3710 }
3711 KeyValueDB::Transaction get_fix_misreferences_txn() {
3712 // NB: not safe for multithreaded use!
3713 return fix_misreferences_txn;
3714 }
3715
3716 private:
3717 std::atomic<unsigned> to_repair_cnt = { 0 };
3718 std::atomic<bool> need_compact = { false };
3719 KeyValueDB::Transaction fix_per_pool_omap_txn;
3720 KeyValueDB::Transaction fix_fm_leaked_txn;
3721 KeyValueDB::Transaction fix_fm_false_free_txn;
3722 KeyValueDB::Transaction remove_key_txn;
3723 KeyValueDB::Transaction fix_statfs_txn;
3724 KeyValueDB::Transaction fix_shared_blob_txn;
3725
3726 KeyValueDB::Transaction fix_misreferences_txn;
3727 KeyValueDB::Transaction fix_onode_txn;
3728
3729 StoreSpaceTracker space_usage_tracker;
3730
3731 // non-shared extents with multiple references
3732 fsck_interval misreferenced_extents;
3733
3734 };
3735
3736 class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
3737 {
3738 template <class T, size_t MaxX, size_t MaxY>
3739 class matrix_2d {
3740 T values[MaxX][MaxY];
3741 public:
3742 matrix_2d() {
3743 clear();
3744 }
3745 T& at(size_t x, size_t y) {
3746 ceph_assert(x < MaxX);
3747 ceph_assert(y < MaxY);
3748
3749 return values[x][y];
3750 }
3751 size_t get_max_x() const {
3752 return MaxX;
3753 }
3754 size_t get_max_y() const {
3755 return MaxY;
3756 }
3757 void clear() {
3758 memset(values, 0, sizeof(values));
3759 }
3760 };
3761
3762 enum {
3763 // 0/nullptr is used to indicate 'unset'
3764 LEVEL_FIRST = 1,
3765 LEVEL_LOG = LEVEL_FIRST, // BlueFS log
3766 LEVEL_WAL,
3767 LEVEL_DB,
3768 LEVEL_SLOW,
3769 LEVEL_MAX
3770 };
3771 // add +1 row for the corresponding per-device totals
3772 // add +1 column for the per-level actual total (taken from file sizes)
3773 typedef matrix_2d<uint64_t, BlueFS::MAX_BDEV + 1, LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;
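// Indexing (as used by add_usage()/sub_usage() below): .at(bdev, level - LEVEL_FIRST)
// is the usage of 'level' on device 'bdev'; .at(bdev, LEVEL_MAX - LEVEL_FIRST) holds
// the per-device total, and .at(BlueFS::MAX_BDEV, level - LEVEL_FIRST) holds the
// per-level total taken from file sizes.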
3774
3775 per_level_per_dev_usage_t per_level_per_dev_usage;
3776 // file count per level; the extra +1 entry holds the total file count
3777 uint64_t per_level_files[LEVEL_MAX - LEVEL_FIRST + 1] = { 0 };
3778
3779 // Note: the maximum per-device totals below might be smaller than the corresponding
3780 // perf counters by up to a single alloc unit (1M) due to the superblock extent.
3781 // The latter is not accounted for here.
3782 per_level_per_dev_usage_t per_level_per_dev_max;
3783
3784 uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
3785 uint64_t db_avail4slow = 0;
3786 enum {
3787 OLD_POLICY,
3788 USE_SOME_EXTRA
3789 };
3790
3791 public:
3792 RocksDBBlueFSVolumeSelector(
3793 uint64_t _wal_total,
3794 uint64_t _db_total,
3795 uint64_t _slow_total,
3796 uint64_t _level0_size,
3797 uint64_t _level_base,
3798 uint64_t _level_multiplier,
3799 double reserved_factor,
3800 uint64_t reserved,
3801 bool new_pol)
3802 {
3803 l_totals[LEVEL_LOG - LEVEL_FIRST] = 0; // not used at the moment
3804 l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
3805 l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
3806 l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;
3807
3808 if (!new_pol) {
3809 return;
3810 }
3811
3812 // Calculate how much extra space is available on the DB volume.
3813 // Depending on whether an explicit reserved size is specified, it is either
3814 // * DB volume size - reserved
3815 // or
3816 // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
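// Worked example (illustrative numbers only): _level0_size = 1 GiB, _level_base = 1 GiB,
// _level_multiplier = 10, reserved_factor = 2, _db_total = 60 GiB.  The first threshold
// is 1 + 1 + 10*2 = 22 GiB (the DB volume exceeds it), the second is 2 + 10 + 100*2 =
// 212 GiB (it does not), so db_avail4slow = 60 - 22 = 38 GiB of DB space remains beyond
// the last level threshold that still fits.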
3817 if (!reserved) {
3818 uint64_t prev_levels = _level0_size;
3819 uint64_t cur_level = _level_base;
3820 uint64_t cur_threshold = 0;
3821 do {
3822 uint64_t next_level = cur_level * _level_multiplier;
3823 uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
3824 if (_db_total <= next_threshold) {
3825 db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
3826 break;
3827 } else {
3828 prev_levels += cur_level;
3829 cur_level = next_level;
3830 cur_threshold = next_threshold;
3831 }
3832 } while (true);
3833 } else {
3834 db_avail4slow = _db_total - reserved;
3835 }
3836 }
3837
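// Volume-selector hints are simply LEVEL_* ids smuggled through a void*;
// add_usage()/sub_usage() cast them back to index the per-level tables.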
3838 void* get_hint_for_log() const override {
3839 return reinterpret_cast<void*>(LEVEL_LOG);
3840 }
3841 void* get_hint_by_dir(std::string_view dirname) const override;
3842
3843 void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
3844 if (hint == nullptr)
3845 return;
3846 size_t pos = (size_t)hint - LEVEL_FIRST;
3847 for (auto& p : fnode.extents) {
3848 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3849 auto& max = per_level_per_dev_max.at(p.bdev, pos);
3850 cur += p.length;
3851 if (cur > max) {
3852 max = cur;
3853 }
3854 {
3855 //update per-device totals
3856 auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3857 auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3858 cur += p.length;
3859 if (cur > max) {
3860 max = cur;
3861 }
3862 }
3863 }
3864 {
3865 //update per-level actual totals
3866 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3867 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3868 cur += fnode.size;
3869 if (cur > max) {
3870 max = cur;
3871 }
3872 }
3873 ++per_level_files[pos];
3874 ++per_level_files[LEVEL_MAX - LEVEL_FIRST];
3875 }
3876 void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
3877 if (hint == nullptr)
3878 return;
3879 size_t pos = (size_t)hint - LEVEL_FIRST;
3880 for (auto& p : fnode.extents) {
3881 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3882 ceph_assert(cur >= p.length);
3883 cur -= p.length;
3884
3885 //update per-device totals
3886 auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3887 ceph_assert(cur2 >= p.length);
3888 cur2 -= p.length;
3889 }
3890 //update per-level actual totals
3891 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3892 ceph_assert(cur >= fnode.size);
3893 cur -= fnode.size;
3894 ceph_assert(per_level_files[pos] > 0);
3895 --per_level_files[pos];
3896 ceph_assert(per_level_files[LEVEL_MAX - LEVEL_FIRST] > 0);
3897 --per_level_files[LEVEL_MAX - LEVEL_FIRST];
3898 }
3899 void add_usage(void* hint, uint64_t fsize) override {
3900 if (hint == nullptr)
3901 return;
3902 size_t pos = (size_t)hint - LEVEL_FIRST;
3903 //update per-level actual totals
3904 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3905 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3906 cur += fsize;
3907 if (cur > max) {
3908 max = cur;
3909 }
3910 }
3911 void sub_usage(void* hint, uint64_t fsize) override {
3912 if (hint == nullptr)
3913 return;
3914 size_t pos = (size_t)hint - LEVEL_FIRST;
3915 //update per-level actual totals
3916 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3917 ceph_assert(cur >= fsize);
3918 cur -= fsize;
3919 }
3920
3921 uint8_t select_prefer_bdev(void* h) override;
3922 void get_paths(
3923 const std::string& base,
3924 BlueFSVolumeSelector::paths& res) const override;
3925
3926 void dump(std::ostream& sout) override;
3927 };
3928
3929 #endif