1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <chrono>
24 #include <ratio>
25 #include <mutex>
26 #include <condition_variable>
27
28 #include <boost/intrusive/list.hpp>
29 #include <boost/intrusive/unordered_set.hpp>
30 #include <boost/intrusive/set.hpp>
31 #include <boost/functional/hash.hpp>
32 #include <boost/dynamic_bitset.hpp>
33 #include <boost/circular_buffer.hpp>
34
35 #include "include/cpp-btree/btree_set.h"
36
37 #include "include/ceph_assert.h"
38 #include "include/interval_set.h"
39 #include "include/unordered_map.h"
40 #include "include/mempool.h"
41 #include "include/hash.h"
42 #include "common/bloom_filter.hpp"
43 #include "common/Finisher.h"
44 #include "common/ceph_mutex.h"
45 #include "common/Throttle.h"
46 #include "common/perf_counters.h"
47 #include "common/PriorityCache.h"
48 #include "compressor/Compressor.h"
49 #include "os/ObjectStore.h"
50
51 #include "bluestore_types.h"
52 #include "BlueFS.h"
53 #include "common/EventTrace.h"
54
55 #ifdef WITH_BLKIN
56 #include "common/zipkin_trace.h"
57 #endif
58
59 class Allocator;
60 class FreelistManager;
61 class BlueStoreRepairer;
62
63 //#define DEBUG_CACHE
64 //#define DEBUG_DEFERRED
65
66
67
68 // constants for Buffer::optimize()
69 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
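// For example, with MAX_BUFFER_SLOP_RATIO_DEN == 8, Buffer::maybe_rebuild()
// below rebuilds a 4096-byte bufferlist once its front ptr wastes more than
// 4096 / 8 = 512 bytes of the underlying raw buffer.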
70
71
72 enum {
73 l_bluestore_first = 732430,
74 l_bluestore_kv_flush_lat,
75 l_bluestore_kv_commit_lat,
76 l_bluestore_kv_sync_lat,
77 l_bluestore_kv_final_lat,
78 l_bluestore_state_prepare_lat,
79 l_bluestore_state_aio_wait_lat,
80 l_bluestore_state_io_done_lat,
81 l_bluestore_state_kv_queued_lat,
82 l_bluestore_state_kv_committing_lat,
83 l_bluestore_state_kv_done_lat,
84 l_bluestore_state_deferred_queued_lat,
85 l_bluestore_state_deferred_aio_wait_lat,
86 l_bluestore_state_deferred_cleanup_lat,
87 l_bluestore_state_finishing_lat,
88 l_bluestore_state_done_lat,
89 l_bluestore_throttle_lat,
90 l_bluestore_submit_lat,
91 l_bluestore_commit_lat,
92 l_bluestore_read_lat,
93 l_bluestore_read_onode_meta_lat,
94 l_bluestore_read_wait_aio_lat,
95 l_bluestore_compress_lat,
96 l_bluestore_decompress_lat,
97 l_bluestore_csum_lat,
98 l_bluestore_compress_success_count,
99 l_bluestore_compress_rejected_count,
100 l_bluestore_write_pad_bytes,
101 l_bluestore_deferred_write_ops,
102 l_bluestore_deferred_write_bytes,
103 l_bluestore_write_penalty_read_ops,
104 l_bluestore_allocated,
105 l_bluestore_stored,
106 l_bluestore_compressed,
107 l_bluestore_compressed_allocated,
108 l_bluestore_compressed_original,
109 l_bluestore_onodes,
110 l_bluestore_pinned_onodes,
111 l_bluestore_onode_hits,
112 l_bluestore_onode_misses,
113 l_bluestore_onode_shard_hits,
114 l_bluestore_onode_shard_misses,
115 l_bluestore_extents,
116 l_bluestore_blobs,
117 l_bluestore_buffers,
118 l_bluestore_buffer_bytes,
119 l_bluestore_buffer_hit_bytes,
120 l_bluestore_buffer_miss_bytes,
121 l_bluestore_write_big,
122 l_bluestore_write_big_bytes,
123 l_bluestore_write_big_blobs,
124 l_bluestore_write_big_deferred,
125 l_bluestore_write_small,
126 l_bluestore_write_small_bytes,
127 l_bluestore_write_small_unused,
128 l_bluestore_write_deferred,
129 l_bluestore_write_small_pre_read,
130 l_bluestore_write_new,
131 l_bluestore_txc,
132 l_bluestore_onode_reshard,
133 l_bluestore_blob_split,
134 l_bluestore_extent_compress,
135 l_bluestore_gc_merged,
136 l_bluestore_read_eio,
137 l_bluestore_reads_with_retries,
138 l_bluestore_fragmentation,
139 l_bluestore_omap_seek_to_first_lat,
140 l_bluestore_omap_upper_bound_lat,
141 l_bluestore_omap_lower_bound_lat,
142 l_bluestore_omap_next_lat,
143 l_bluestore_omap_get_keys_lat,
144 l_bluestore_omap_get_values_lat,
145 l_bluestore_clist_lat,
146 l_bluestore_remove_lat,
147 l_bluestore_last
148 };
149
150 #define META_POOL_ID ((uint64_t)-1ull)
151
152 class BlueStore : public ObjectStore,
153 public md_config_obs_t {
154 // -----------------------------------------------------
155 // types
156 public:
157 // config observer
158 const char** get_tracked_conf_keys() const override;
159 void handle_conf_change(const ConfigProxy& conf,
160 const std::set<std::string> &changed) override;
161
162 //handler for discard event
163 void handle_discard(interval_set<uint64_t>& to_release);
164
165 void _set_csum();
166 void _set_compression();
167 void _set_throttle_params();
168 int _set_cache_sizes();
169 void _set_max_defer_interval() {
170 max_defer_interval =
171 cct->_conf.get_val<double>("bluestore_max_defer_interval");
172 }
173
174 struct TransContext;
175
176 typedef std::map<uint64_t, ceph::buffer::list> ready_regions_t;
177
178
179 struct BufferSpace;
180 struct Collection;
181 typedef boost::intrusive_ptr<Collection> CollectionRef;
182
183 struct AioContext {
184 virtual void aio_finish(BlueStore *store) = 0;
185 virtual ~AioContext() {}
186 };
187
188 /// cached buffer
189 struct Buffer {
190 MEMPOOL_CLASS_HELPERS();
191
192 enum {
193 STATE_EMPTY, ///< empty buffer -- used for cache history
194 STATE_CLEAN, ///< clean data that is up to date
195 STATE_WRITING, ///< data that is being written (io not yet complete)
196 };
197 static const char *get_state_name(int s) {
198 switch (s) {
199 case STATE_EMPTY: return "empty";
200 case STATE_CLEAN: return "clean";
201 case STATE_WRITING: return "writing";
202 default: return "???";
203 }
204 }
205 enum {
206 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
207 // NOTE: fix operator<< when you define a second flag
208 };
209 static const char *get_flag_name(int s) {
210 switch (s) {
211 case FLAG_NOCACHE: return "nocache";
212 default: return "???";
213 }
214 }
215
216 BufferSpace *space;
217 uint16_t state; ///< STATE_*
218 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
219 uint32_t flags; ///< FLAG_*
220 uint64_t seq;
221 uint32_t offset, length;
222 ceph::buffer::list data;
223
224 boost::intrusive::list_member_hook<> lru_item;
225 boost::intrusive::list_member_hook<> state_item;
226
227 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
228 unsigned f = 0)
229 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
230 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, ceph::buffer::list& b,
231 unsigned f = 0)
232 : space(space), state(s), flags(f), seq(q), offset(o),
233 length(b.length()), data(b) {}
234
235 bool is_empty() const {
236 return state == STATE_EMPTY;
237 }
238 bool is_clean() const {
239 return state == STATE_CLEAN;
240 }
241 bool is_writing() const {
242 return state == STATE_WRITING;
243 }
244
245 uint32_t end() const {
246 return offset + length;
247 }
248
249 void truncate(uint32_t newlen) {
250 ceph_assert(newlen < length);
251 if (data.length()) {
252 ceph::buffer::list t;
253 t.substr_of(data, 0, newlen);
254 data = std::move(t);
255 }
256 length = newlen;
257 }
258 void maybe_rebuild() {
259 if (data.length() &&
260 (data.get_num_buffers() > 1 ||
261 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
262 data.rebuild();
263 }
264 }
265
266 void dump(ceph::Formatter *f) const {
267 f->dump_string("state", get_state_name(state));
268 f->dump_unsigned("seq", seq);
269 f->dump_unsigned("offset", offset);
270 f->dump_unsigned("length", length);
271 f->dump_unsigned("data_length", data.length());
272 }
273 };
274
275 struct BufferCacheShard;
276
277 /// map logical extent range (object) onto buffers
278 struct BufferSpace {
279 enum {
280 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
281 };
282
283 typedef boost::intrusive::list<
284 Buffer,
285 boost::intrusive::member_hook<
286 Buffer,
287 boost::intrusive::list_member_hook<>,
288 &Buffer::state_item> > state_list_t;
289
290 mempool::bluestore_cache_meta::map<uint32_t, std::unique_ptr<Buffer>>
291 buffer_map;
292
293 // we use a bare intrusive list here instead of std::map because
294 // it uses less memory and we expect this to be very small (very
295 // few IOs in flight to the same Blob at the same time).
296 state_list_t writing; ///< writing buffers, sorted by seq, ascending
297
298 ~BufferSpace() {
299 ceph_assert(buffer_map.empty());
300 ceph_assert(writing.empty());
301 }
302
303 void _add_buffer(BufferCacheShard* cache, Buffer* b, int level, Buffer* near) {
304 cache->_audit("_add_buffer start");
305 buffer_map[b->offset].reset(b);
306 if (b->is_writing()) {
307 // we might get already cached data for which resetting the mempool is inappropriate;
308 // hence call try_assign_to_mempool
309 b->data.try_assign_to_mempool(mempool::mempool_bluestore_writing);
310 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
311 writing.push_back(*b);
312 } else {
313 auto it = writing.begin();
314 while (it->seq < b->seq) {
315 ++it;
316 }
317
318 ceph_assert(it->seq >= b->seq);
319 // note that this will insert b before it
320 // hence the order is maintained
321 writing.insert(it, *b);
322 }
323 } else {
324 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
325 cache->_add(b, level, near);
326 }
327 cache->_audit("_add_buffer end");
328 }
329 void _rm_buffer(BufferCacheShard* cache, Buffer *b) {
330 _rm_buffer(cache, buffer_map.find(b->offset));
331 }
332 void _rm_buffer(BufferCacheShard* cache,
333 std::map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
334 ceph_assert(p != buffer_map.end());
335 cache->_audit("_rm_buffer start");
336 if (p->second->is_writing()) {
337 writing.erase(writing.iterator_to(*p->second));
338 } else {
339 cache->_rm(p->second.get());
340 }
341 buffer_map.erase(p);
342 cache->_audit("_rm_buffer end");
343 }
344
345 std::map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
346 uint32_t offset) {
347 auto i = buffer_map.lower_bound(offset);
348 if (i != buffer_map.begin()) {
349 --i;
350 if (i->first + i->second->length <= offset)
351 ++i;
352 }
353 return i;
354 }
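// Example of the semantics (offsets are hypothetical): with cached buffers at
// {0x0000, len 0x1000} and {0x2000, len 0x1000},
//   _data_lower_bound(0x0800) -> iterator to the buffer at 0x0000 (it covers
//                                the offset), while
//   _data_lower_bound(0x1800) -> iterator to the buffer at 0x2000 (the
//                                predecessor ends at 0x1000 <= 0x1800).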
355
356 // must be called under protection of the Cache lock
357 void _clear(BufferCacheShard* cache);
358
359 // return value is the highest cache_private of a trimmed buffer, or 0.
360 int discard(BufferCacheShard* cache, uint32_t offset, uint32_t length) {
361 std::lock_guard l(cache->lock);
362 int ret = _discard(cache, offset, length);
363 cache->_trim();
364 return ret;
365 }
366 int _discard(BufferCacheShard* cache, uint32_t offset, uint32_t length);
367
368 void write(BufferCacheShard* cache, uint64_t seq, uint32_t offset, ceph::buffer::list& bl,
369 unsigned flags) {
370 std::lock_guard l(cache->lock);
371 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
372 flags);
373 b->cache_private = _discard(cache, offset, bl.length());
374 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
375 cache->_trim();
376 }
377 void _finish_write(BufferCacheShard* cache, uint64_t seq);
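/*
 * Lifecycle sketch for the write path (illustrative only; 'bc', 'cache' and
 * 'txc' are hypothetical caller-side names):
 *
 *   bc.write(cache, txc->seq, offset, bl, 0); // Buffer enters STATE_WRITING
 *   ...                                       // aio completes, kv txn commits
 *   bc._finish_write(cache, txc->seq);        // buffer becomes STATE_CLEAN,
 *                                             // or is dropped if it was
 *                                             // written with FLAG_NOCACHE
 */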
378 void did_read(BufferCacheShard* cache, uint32_t offset, ceph::buffer::list& bl) {
379 std::lock_guard l(cache->lock);
380 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
381 b->cache_private = _discard(cache, offset, bl.length());
382 _add_buffer(cache, b, 1, nullptr);
383 cache->_trim();
384 }
385
386 void read(BufferCacheShard* cache, uint32_t offset, uint32_t length,
387 BlueStore::ready_regions_t& res,
388 interval_set<uint32_t>& res_intervals,
389 int flags = 0);
390
391 void truncate(BufferCacheShard* cache, uint32_t offset) {
392 discard(cache, offset, (uint32_t)-1 - offset);
393 }
394
395 void split(BufferCacheShard* cache, size_t pos, BufferSpace &r);
396
397 void dump(BufferCacheShard* cache, ceph::Formatter *f) const {
398 std::lock_guard l(cache->lock);
399 f->open_array_section("buffers");
400 for (auto& i : buffer_map) {
401 f->open_object_section("buffer");
402 ceph_assert(i.first == i.second->offset);
403 i.second->dump(f);
404 f->close_section();
405 }
406 f->close_section();
407 }
408 };
409
410 struct SharedBlobSet;
411
412 /// in-memory shared blob state (incl cached buffers)
413 struct SharedBlob {
414 MEMPOOL_CLASS_HELPERS();
415
416 std::atomic_int nref = {0}; ///< reference count
417 bool loaded = false;
418
419 CollectionRef coll;
420 union {
421 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
422 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
423 };
424 BufferSpace bc; ///< buffer cache
425
426 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
427 if (get_cache()) {
428 get_cache()->add_blob();
429 }
430 }
431 SharedBlob(uint64_t i, Collection *_coll);
432 ~SharedBlob();
433
434 uint64_t get_sbid() const {
435 return loaded ? persistent->sbid : sbid_unloaded;
436 }
437
438 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
439 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
440
441 void dump(ceph::Formatter* f) const;
442 friend std::ostream& operator<<(std::ostream& out, const SharedBlob& sb);
443
444 void get() {
445 ++nref;
446 }
447 void put();
448
449 /// get logical references
450 void get_ref(uint64_t offset, uint32_t length);
451
452 /// put logical references, and get back any released extents
453 void put_ref(uint64_t offset, uint32_t length,
454 PExtentVector *r, bool *unshare);
455
456 void finish_write(uint64_t seq);
457
458 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
459 return l.get_sbid() == r.get_sbid();
460 }
461 inline BufferCacheShard* get_cache() {
462 return coll ? coll->cache : nullptr;
463 }
464 inline SharedBlobSet* get_parent() {
465 return coll ? &(coll->shared_blob_set) : nullptr;
466 }
467 inline bool is_loaded() const {
468 return loaded;
469 }
470
471 };
472 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
473
474 /// a lookup table of SharedBlobs
475 struct SharedBlobSet {
476 /// protect lookup, insertion, removal
477 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
478
479 // we use a bare pointer because we don't want to affect the ref
480 // count
481 mempool::bluestore_cache_meta::unordered_map<uint64_t,SharedBlob*> sb_map;
482
483 SharedBlobRef lookup(uint64_t sbid) {
484 std::lock_guard l(lock);
485 auto p = sb_map.find(sbid);
486 if (p == sb_map.end() ||
487 p->second->nref == 0) {
488 return nullptr;
489 }
490 return p->second;
491 }
492
493 void add(Collection* coll, SharedBlob *sb) {
494 std::lock_guard l(lock);
495 sb_map[sb->get_sbid()] = sb;
496 sb->coll = coll;
497 }
498
499 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
500 std::lock_guard l(lock);
501 ceph_assert(sb->get_parent() == this);
502 if (verify_nref_is_zero && sb->nref != 0) {
503 return false;
504 }
505 // only remove if it still points to us
506 auto p = sb_map.find(sb->get_sbid());
507 if (p != sb_map.end() &&
508 p->second == sb) {
509 sb_map.erase(p);
510 }
511 return true;
512 }
513
514 bool empty() {
515 std::lock_guard l(lock);
516 return sb_map.empty();
517 }
518
519 template <int LogLevelV>
520 void dump(CephContext *cct);
521 };
522
523 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
524
525 /// in-memory blob metadata and associated cached buffers (if any)
526 struct Blob {
527 MEMPOOL_CLASS_HELPERS();
528
529 std::atomic_int nref = {0}; ///< reference count
530 int16_t id = -1; ///< id, for spanning blobs only, >= 0
531 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
532 SharedBlobRef shared_blob; ///< shared blob state (if any)
533
534 private:
535 mutable bluestore_blob_t blob; ///< decoded blob metadata
536 #ifdef CACHE_BLOB_BL
537 mutable ceph::buffer::list blob_bl; ///< cached encoded blob, blob is dirty if empty
538 #endif
539 /// refs from this shard. ephemeral if id<0, persisted if spanning.
540 bluestore_blob_use_tracker_t used_in_blob;
541
542 public:
543
544 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
545 friend void intrusive_ptr_release(Blob *b) { b->put(); }
546
547 void dump(ceph::Formatter* f) const;
548 friend std::ostream& operator<<(std::ostream& out, const Blob &b);
549
550 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
551 return used_in_blob;
552 }
553 bool is_referenced() const {
554 return used_in_blob.is_not_empty();
555 }
556 uint32_t get_referenced_bytes() const {
557 return used_in_blob.get_referenced_bytes();
558 }
559
560 bool is_spanning() const {
561 return id >= 0;
562 }
563
564 bool can_split() const {
565 std::lock_guard l(shared_blob->get_cache()->lock);
566 // splitting a BufferSpace writing list is too hard; don't try.
567 return shared_blob->bc.writing.empty() &&
568 used_in_blob.can_split() &&
569 get_blob().can_split();
570 }
571
572 bool can_split_at(uint32_t blob_offset) const {
573 return used_in_blob.can_split_at(blob_offset) &&
574 get_blob().can_split_at(blob_offset);
575 }
576
577 bool can_reuse_blob(uint32_t min_alloc_size,
578 uint32_t target_blob_size,
579 uint32_t b_offset,
580 uint32_t *length0);
581
582 void dup(Blob& o) {
583 o.shared_blob = shared_blob;
584 o.blob = blob;
585 #ifdef CACHE_BLOB_BL
586 o.blob_bl = blob_bl;
587 #endif
588 }
589
590 inline const bluestore_blob_t& get_blob() const {
591 return blob;
592 }
593 inline bluestore_blob_t& dirty_blob() {
594 #ifdef CACHE_BLOB_BL
595 blob_bl.clear();
596 #endif
597 return blob;
598 }
599
600 /// discard buffers for unallocated regions
601 void discard_unallocated(Collection *coll);
602
603 /// get logical references
604 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
605 /// put logical references, and get back any released extents
606 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
607 PExtentVector *r);
608
609 /// split the blob
610 void split(Collection *coll, uint32_t blob_offset, Blob *o);
611
612 void get() {
613 ++nref;
614 }
615 void put() {
616 if (--nref == 0)
617 delete this;
618 }
619
620
621 #ifdef CACHE_BLOB_BL
622 void _encode() const {
623 if (blob_bl.length() == 0 ) {
624 encode(blob, blob_bl);
625 } else {
626 ceph_assert(blob_bl.length());
627 }
628 }
629 void bound_encode(
630 size_t& p,
631 bool include_ref_map) const {
632 _encode();
633 p += blob_bl.length();
634 if (include_ref_map) {
635 used_in_blob.bound_encode(p);
636 }
637 }
638 void encode(
639 ceph::buffer::list::contiguous_appender& p,
640 bool include_ref_map) const {
641 _encode();
642 p.append(blob_bl);
643 if (include_ref_map) {
644 used_in_blob.encode(p);
645 }
646 }
647 void decode(
648 Collection */*coll*/,
649 ceph::buffer::ptr::const_iterator& p,
650 bool include_ref_map) {
651 const char *start = p.get_pos();
652 denc(blob, p);
653 const char *end = p.get_pos();
654 blob_bl.clear();
655 blob_bl.append(start, end - start);
656 if (include_ref_map) {
657 used_in_blob.decode(p);
658 }
659 }
660 #else
661 void bound_encode(
662 size_t& p,
663 uint64_t struct_v,
664 uint64_t sbid,
665 bool include_ref_map) const {
666 denc(blob, p, struct_v);
667 if (blob.is_shared()) {
668 denc(sbid, p);
669 }
670 if (include_ref_map) {
671 used_in_blob.bound_encode(p);
672 }
673 }
674 void encode(
675 ceph::buffer::list::contiguous_appender& p,
676 uint64_t struct_v,
677 uint64_t sbid,
678 bool include_ref_map) const {
679 denc(blob, p, struct_v);
680 if (blob.is_shared()) {
681 denc(sbid, p);
682 }
683 if (include_ref_map) {
684 used_in_blob.encode(p);
685 }
686 }
687 void decode(
688 Collection *coll,
689 ceph::buffer::ptr::const_iterator& p,
690 uint64_t struct_v,
691 uint64_t* sbid,
692 bool include_ref_map);
693 #endif
694 };
695 typedef boost::intrusive_ptr<Blob> BlobRef;
696 typedef mempool::bluestore_cache_meta::map<int,BlobRef> blob_map_t;
697
698 /// a logical extent, pointing to (some portion of) a blob
699 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; // alias to avoid build warnings
700 struct Extent : public ExtentBase {
701 MEMPOOL_CLASS_HELPERS();
702
703 uint32_t logical_offset = 0; ///< logical offset
704 uint32_t blob_offset = 0; ///< blob offset
705 uint32_t length = 0; ///< length
706 BlobRef blob; ///< the blob with our data
707
708 /// ctor for lookup only
709 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
710 /// ctor for delayed initialization (see decode_some())
711 explicit Extent() : ExtentBase() {
712 }
713 /// ctor for general usage
714 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
715 : ExtentBase(),
716 logical_offset(lo), blob_offset(o), length(l) {
717 assign_blob(b);
718 }
719 ~Extent() {
720 if (blob) {
721 blob->shared_blob->get_cache()->rm_extent();
722 }
723 }
724
725 void dump(ceph::Formatter* f) const;
726
727 void assign_blob(const BlobRef& b) {
728 ceph_assert(!blob);
729 blob = b;
730 blob->shared_blob->get_cache()->add_extent();
731 }
732
733 // comparators for intrusive_set
734 friend bool operator<(const Extent &a, const Extent &b) {
735 return a.logical_offset < b.logical_offset;
736 }
737 friend bool operator>(const Extent &a, const Extent &b) {
738 return a.logical_offset > b.logical_offset;
739 }
740 friend bool operator==(const Extent &a, const Extent &b) {
741 return a.logical_offset == b.logical_offset;
742 }
743
744 uint32_t blob_start() const {
745 return logical_offset - blob_offset;
746 }
747
748 uint32_t blob_end() const {
749 return blob_start() + blob->get_blob().get_logical_length();
750 }
751
752 uint32_t logical_end() const {
753 return logical_offset + length;
754 }
755
756 // return true if any piece of the blob falls outside of
757 // the given range [o, o + l].
758 bool blob_escapes_range(uint32_t o, uint32_t l) const {
759 return blob_start() < o || blob_end() > o + l;
760 }
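// Worked example (hypothetical values): an Extent with logical_offset=0x3000,
// blob_offset=0x1000, length=0x800 over a blob of logical length 0x4000 has
// blob_start() == 0x2000, blob_end() == 0x6000 and logical_end() == 0x3800;
// blob_escapes_range(0x3000, 0x800) is true because the blob extends beyond
// [0x3000, 0x3800] on both sides.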
761 };
762 typedef boost::intrusive::set<Extent> extent_map_t;
763
764
765 friend std::ostream& operator<<(std::ostream& out, const Extent& e);
766
767 struct OldExtent {
768 boost::intrusive::list_member_hook<> old_extent_item;
769 Extent e;
770 PExtentVector r;
771 bool blob_empty; // flag to track the last removed extent that makes blob
772 // empty - required to update compression stat properly
773 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
774 : e(lo, o, l, b), blob_empty(false) {
775 }
776 static OldExtent* create(CollectionRef c,
777 uint32_t lo,
778 uint32_t o,
779 uint32_t l,
780 BlobRef& b);
781 };
782 typedef boost::intrusive::list<
783 OldExtent,
784 boost::intrusive::member_hook<
785 OldExtent,
786 boost::intrusive::list_member_hook<>,
787 &OldExtent::old_extent_item> > old_extent_map_t;
788
789 struct Onode;
790
791 /// a sharded extent map, mapping offsets to lextents to blobs
792 struct ExtentMap {
793 Onode *onode;
794 extent_map_t extent_map; ///< map of Extents to Blobs
795 blob_map_t spanning_blob_map; ///< blobs that span shards
796 typedef boost::intrusive_ptr<Onode> OnodeRef;
797
798 struct Shard {
799 bluestore_onode_t::shard_info *shard_info = nullptr;
800 unsigned extents = 0; ///< count extents in this shard
801 bool loaded = false; ///< true if shard is loaded
802 bool dirty = false; ///< true if shard is dirty and needs reencoding
803 };
804 mempool::bluestore_cache_meta::vector<Shard> shards; ///< shards
805
806 ceph::buffer::list inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
807
808 uint32_t needs_reshard_begin = 0;
809 uint32_t needs_reshard_end = 0;
810
811 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
812 uint64_t&, uint64_t&, uint64_t&);
813
814 bool needs_reshard() const {
815 return needs_reshard_end > needs_reshard_begin;
816 }
817 void clear_needs_reshard() {
818 needs_reshard_begin = needs_reshard_end = 0;
819 }
820 void request_reshard(uint32_t begin, uint32_t end) {
821 if (begin < needs_reshard_begin) {
822 needs_reshard_begin = begin;
823 }
824 if (end > needs_reshard_end) {
825 needs_reshard_end = end;
826 }
827 }
828
829 struct DeleteDisposer {
830 void operator()(Extent *e) { delete e; }
831 };
832
833 ExtentMap(Onode *o);
834 ~ExtentMap() {
835 extent_map.clear_and_dispose(DeleteDisposer());
836 }
837
838 void clear() {
839 extent_map.clear_and_dispose(DeleteDisposer());
840 shards.clear();
841 inline_bl.clear();
842 clear_needs_reshard();
843 }
844
845 void dump(ceph::Formatter* f) const;
846
847 bool encode_some(uint32_t offset, uint32_t length, ceph::buffer::list& bl,
848 unsigned *pn);
849 unsigned decode_some(ceph::buffer::list& bl);
850
851 void bound_encode_spanning_blobs(size_t& p);
852 void encode_spanning_blobs(ceph::buffer::list::contiguous_appender& p);
853 void decode_spanning_blobs(ceph::buffer::ptr::const_iterator& p);
854
855 BlobRef get_spanning_blob(int id) {
856 auto p = spanning_blob_map.find(id);
857 ceph_assert(p != spanning_blob_map.end());
858 return p->second;
859 }
860
861 void update(KeyValueDB::Transaction t, bool force);
862 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
863 void reshard(
864 KeyValueDB *db,
865 KeyValueDB::Transaction t);
866
867 /// initialize Shards from the onode
868 void init_shards(bool loaded, bool dirty);
869
870 /// return index of shard containing offset
871 /// or -1 if not found
872 int seek_shard(uint32_t offset) {
873 size_t end = shards.size();
874 size_t mid, left = 0;
875 size_t right = end; // one past the right end
876
877 while (left < right) {
878 mid = left + (right - left) / 2;
879 if (offset >= shards[mid].shard_info->offset) {
880 size_t next = mid + 1;
881 if (next >= end || offset < shards[next].shard_info->offset)
882 return mid;
883 //continue to search forwards
884 left = next;
885 } else {
886 //continue to search backwards
887 right = mid;
888 }
889 }
890
891 return -1; // not found
892 }
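// Example (hypothetical shard offsets {0x0, 0x10000, 0x20000}):
//   seek_shard(0x12000) == 1 and seek_shard(0x20000) == 2; an offset smaller
//   than the first shard's offset yields -1.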
893
894 /// check if a range spans a shard
895 bool spans_shard(uint32_t offset, uint32_t length) {
896 if (shards.empty()) {
897 return false;
898 }
899 int s = seek_shard(offset);
900 ceph_assert(s >= 0);
901 if (s == (int)shards.size() - 1) {
902 return false; // last shard
903 }
904 if (offset + length <= shards[s+1].shard_info->offset) {
905 return false;
906 }
907 return true;
908 }
909
910 /// ensure that a range of the map is loaded
911 void fault_range(KeyValueDB *db,
912 uint32_t offset, uint32_t length);
913
914 /// ensure a range of the map is marked dirty
915 void dirty_range(uint32_t offset, uint32_t length);
916
917 /// for seek_lextent test
918 extent_map_t::iterator find(uint64_t offset);
919
920 /// seek to the first lextent including or after offset
921 extent_map_t::iterator seek_lextent(uint64_t offset);
922 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
923
924 /// add a new Extent
925 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
926 extent_map.insert(*new Extent(lo, o, l, b));
927 }
928
929 /// remove (and delete) an Extent
930 void rm(extent_map_t::iterator p) {
931 extent_map.erase_and_dispose(p, DeleteDisposer());
932 }
933
934 bool has_any_lextents(uint64_t offset, uint64_t length);
935
936 /// consolidate adjacent lextents in extent_map
937 int compress_extent_map(uint64_t offset, uint64_t length);
938
939 /// punch a logical hole. add lextents to deref to target list.
940 void punch_hole(CollectionRef &c,
941 uint64_t offset, uint64_t length,
942 old_extent_map_t *old_extents);
943
944 /// put new lextent into lextent_map overwriting existing ones if
945 /// any and update references accordingly
946 Extent *set_lextent(CollectionRef &c,
947 uint64_t logical_offset,
948 uint64_t offset, uint64_t length,
949 BlobRef b,
950 old_extent_map_t *old_extents);
951
952 /// split a blob (and referring extents)
953 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
954 };
955
956 /// Compressed Blob Garbage collector
957 /*
958 The primary idea of the collector is to estimate the difference between the
959 allocation units (AUs) currently occupied by compressed blobs and the new AUs
960 required to store that data uncompressed.
961 Estimation is performed for protrusive extents within a logical range
962 determined by the concatenation of the old_extents collection and the
963 specific (current) write request.
964 The root cause for using old_extents is the need to handle blob ref counts
965 properly. Old extents still hold blob refs, hence we need to traverse
966 the collection to determine whether a blob is to be released.
967 Protrusive extents are extents that fit into the blob set in action
968 (ones that are below the logical range from above) but are not totally
969 removed by the current write.
970 E.g. for
971 extent1 <loffs = 100, boffs = 100, len = 100> ->
972 blob1<compressed, len_on_disk=4096, logical_len=8192>
973 extent2 <loffs = 200, boffs = 200, len = 100> ->
974 blob2<raw, len_on_disk=4096, llen=4096>
975 extent3 <loffs = 300, boffs = 300, len = 100> ->
976 blob1<compressed, len_on_disk=4096, llen=8192>
977 extent4 <loffs = 4096, boffs = 0, len = 100> ->
978 blob3<raw, len_on_disk=4096, llen=4096>
979 write(300~100)
980 protrusive extents are within the following ranges: <0~300, 400~8192-400>
981 In this case the existing AUs that might be removed due to GC (i.e. blob1)
982 use 2x4K bytes.
983 And new AUs expected after GC = 0, since extent1 is to be merged into blob2.
984 Hence we should collect.
985 */
986 class GarbageCollector
987 {
988 public:
989 /// return amount of allocation units that might be saved due to GC
990 int64_t estimate(
991 uint64_t offset,
992 uint64_t length,
993 const ExtentMap& extent_map,
994 const old_extent_map_t& old_extents,
995 uint64_t min_alloc_size);
996
997 /// return a collection of extents to perform GC on
998 const interval_set<uint64_t>& get_extents_to_collect() const {
999 return extents_to_collect;
1000 }
1001 GarbageCollector(CephContext* _cct) : cct(_cct) {}
1002
1003 private:
1004 struct BlobInfo {
1005 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
1006 int64_t expected_allocations = 0; ///< new alloc units required
1007 ///< in case of gc fulfilled
1008 bool collect_candidate = false; ///< indicate if blob has any extents
1009 ///< eligible for GC.
1010 extent_map_t::const_iterator first_lextent; ///< points to the first
1011 ///< lextent referring to
1012 ///< the blob if any.
1013 ///< collect_candidate flag
1014 ///< determines the validity
1015 extent_map_t::const_iterator last_lextent; ///< points to the last
1016 ///< lextent referring to
1017 ///< the blob if any.
1018
1019 BlobInfo(uint64_t ref_bytes) :
1020 referenced_bytes(ref_bytes) {
1021 }
1022 };
1023 CephContext* cct;
1024 std::map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
1025 ///< copies that are affected by the
1026 ///< specific write
1027
1028 ///< protrusive extents that should be collected if GC takes place
1029 interval_set<uint64_t> extents_to_collect;
1030
1031 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1032 ///< unit when traversing
1033 ///< protrusive extents.
1034 ///< Other extents mapped to
1035 ///< this AU to be ignored
1036 ///< (except the case where
1037 ///< uncompressed extent follows
1038 ///< compressed one - see below).
1039 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
1040 ///< caused expected_allocations
1041 ///< counter increment at this blob.
1042 ///< if uncompressed extent follows
1043 ///< a decrement for the
1044 ///< expected_allocations counter
1045 ///< is needed
1046 int64_t expected_allocations = 0; ///< new alloc units required in case
1047 ///< of gc fulfilled
1048 int64_t expected_for_release = 0; ///< alloc units currently used by
1049 ///< compressed blobs that might
1050 ///< gone after GC
1051
1052 protected:
1053 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1054 uint64_t start_offset,
1055 uint64_t end_offset,
1056 uint64_t start_touch_offset,
1057 uint64_t end_touch_offset,
1058 uint64_t min_alloc_size);
1059 };
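/*
 * Illustrative usage sketch (not part of this header; 'o', 'old_extents',
 * 'min_alloc_size' and the benefit threshold are caller-side assumptions):
 *
 *   GarbageCollector gc(cct);
 *   int64_t benefit = gc.estimate(offset, length, o->extent_map,
 *                                 old_extents, min_alloc_size);
 *   if (benefit >= some_threshold) {
 *     const interval_set<uint64_t>& ranges = gc.get_extents_to_collect();
 *     // rewrite the logical ranges in 'ranges' so the compressed blobs'
 *     // now-unreferenced allocation units can be released
 *   }
 */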
1060
1061 struct OnodeSpace;
1062 /// an in-memory object
1063 struct Onode {
1064 MEMPOOL_CLASS_HELPERS();
1065
1066 std::atomic_int nref; ///< reference count
1067 Collection *c;
1068 ghobject_t oid;
1069
1070 /// key under PREFIX_OBJ where we are stored
1071 mempool::bluestore_cache_meta::string key;
1072
1073 boost::intrusive::list_member_hook<> lru_item;
1074
1075 bluestore_onode_t onode; ///< metadata stored as value in kv store
1076 bool exists; ///< true if object logically exists
1077 bool cached; ///< Onode is logically in the cache
1078 /// (it can be pinned and hence physically out
1079 /// of it at the moment though)
1080 std::atomic_bool pinned; ///< Onode is pinned
1081 /// (or should be pinned when cached)
1082 ExtentMap extent_map;
1083
1084 // track txc's that have not been committed to kv store (and whose
1085 // effects cannot be read via the kvdb read methods)
1086 std::atomic<int> flushing_count = {0};
1087 std::atomic<int> waiting_count = {0};
1088 /// protect flush_txns
1089 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1090 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1091
1092 Onode(Collection *c, const ghobject_t& o,
1093 const mempool::bluestore_cache_meta::string& k)
1094 : nref(0),
1095 c(c),
1096 oid(o),
1097 key(k),
1098 exists(false),
1099 cached(false),
1100 pinned(false),
1101 extent_map(this) {
1102 }
1103 Onode(Collection* c, const ghobject_t& o,
1104 const std::string& k)
1105 : nref(0),
1106 c(c),
1107 oid(o),
1108 key(k),
1109 exists(false),
1110 cached(false),
1111 pinned(false),
1112 extent_map(this) {
1113 }
1114 Onode(Collection* c, const ghobject_t& o,
1115 const char* k)
1116 : nref(0),
1117 c(c),
1118 oid(o),
1119 key(k),
1120 exists(false),
1121 cached(false),
1122 pinned(false),
1123 extent_map(this) {
1124 }
1125
1126 static Onode* decode(
1127 CollectionRef c,
1128 const ghobject_t& oid,
1129 const std::string& key,
1130 const ceph::buffer::list& v);
1131
1132 void dump(ceph::Formatter* f) const;
1133
1134 void flush();
1135 void get();
1136 void put();
1137
1138 inline bool put_cache() {
1139 ceph_assert(!cached);
1140 cached = true;
1141 return !pinned;
1142 }
1143 inline bool pop_cache() {
1144 ceph_assert(cached);
1145 cached = false;
1146 return !pinned;
1147 }
1148
1149 const std::string& get_omap_prefix();
1150 void get_omap_header(std::string *out);
1151 void get_omap_key(const std::string& key, std::string *out);
1152 void rewrite_omap_key(const std::string& old, std::string *out);
1153 void get_omap_tail(std::string *out);
1154 void decode_omap_key(const std::string& key, std::string *user_key);
1155
1156 // Return the offset of an object on disk. This function is intended *only*
1157 // for use with zoned storage devices because in these devices, the objects
1158 // are laid out contiguously on disk, which is not the case in general.
1159 // Also, it should always be called after calling extent_map.fault_range(),
1160 // so that the extent map is loaded.
1161 int64_t zoned_get_ondisk_starting_offset() const {
1162 return extent_map.extent_map.begin()->blob->
1163 get_blob().calc_offset(0, nullptr);
1164 }
1165 };
1166 typedef boost::intrusive_ptr<Onode> OnodeRef;
1167
1168 /// A generic Cache Shard
1169 struct CacheShard {
1170 CephContext *cct;
1171 PerfCounters *logger;
1172
1173 /// protect lru and other structures
1174 ceph::recursive_mutex lock = {
1175 ceph::make_recursive_mutex("BlueStore::CacheShard::lock") };
1176
1177 std::atomic<uint64_t> max = {0};
1178 std::atomic<uint64_t> num = {0};
1179
1180 CacheShard(CephContext* cct) : cct(cct), logger(nullptr) {}
1181 virtual ~CacheShard() {}
1182
1183 void set_max(uint64_t max_) {
1184 max = max_;
1185 }
1186
1187 uint64_t _get_num() {
1188 return num;
1189 }
1190
1191 virtual void _trim_to(uint64_t new_size) = 0;
1192 void _trim() {
1193 if (cct->_conf->objectstore_blackhole) {
1194 // do not trim if we are throwing away IOs a layer down
1195 return;
1196 }
1197 _trim_to(max);
1198 }
1199
1200 void trim() {
1201 std::lock_guard l(lock);
1202 _trim();
1203 }
1204 void flush() {
1205 std::lock_guard l(lock);
1206 // we should not be shutting down after the blackhole is enabled
1207 assert(!cct->_conf->objectstore_blackhole);
1208 _trim_to(0);
1209 }
1210
1211 #ifdef DEBUG_CACHE
1212 virtual void _audit(const char *s) = 0;
1213 #else
1214 void _audit(const char *s) { /* no-op */ }
1215 #endif
1216 };
1217
1218 /// A Generic onode Cache Shard
1219 struct OnodeCacheShard : public CacheShard {
1220 std::atomic<uint64_t> num_pinned = {0};
1221
1222 std::array<std::pair<ghobject_t, ceph::mono_clock::time_point>, 64> dumped_onodes;
1223
1224 virtual void _pin(Onode* o) = 0;
1225 virtual void _unpin(Onode* o) = 0;
1226
1227 public:
1228 OnodeCacheShard(CephContext* cct) : CacheShard(cct) {}
1229 static OnodeCacheShard *create(CephContext* cct, std::string type,
1230 PerfCounters *logger);
1231 virtual void _add(Onode* o, int level) = 0;
1232 virtual void _rm(Onode* o) = 0;
1233 virtual void _unpin_and_rm(Onode* o) = 0;
1234
1235 virtual void move_pinned(OnodeCacheShard *to, Onode *o) = 0;
1236 virtual void add_stats(uint64_t *onodes, uint64_t *pinned_onodes) = 0;
1237 bool empty() {
1238 return _get_num() == 0;
1239 }
1240 };
1241
1242 /// A Generic buffer Cache Shard
1243 struct BufferCacheShard : public CacheShard {
1244 std::atomic<uint64_t> num_extents = {0};
1245 std::atomic<uint64_t> num_blobs = {0};
1246 uint64_t buffer_bytes = 0;
1247
1248 public:
1249 BufferCacheShard(CephContext* cct) : CacheShard(cct) {}
1250 static BufferCacheShard *create(CephContext* cct, std::string type,
1251 PerfCounters *logger);
1252 virtual void _add(Buffer *b, int level, Buffer *near) = 0;
1253 virtual void _rm(Buffer *b) = 0;
1254 virtual void _move(BufferCacheShard *src, Buffer *b) = 0;
1255 virtual void _touch(Buffer *b) = 0;
1256 virtual void _adjust_size(Buffer *b, int64_t delta) = 0;
1257
1258 uint64_t _get_bytes() {
1259 return buffer_bytes;
1260 }
1261
1262 void add_extent() {
1263 ++num_extents;
1264 }
1265 void rm_extent() {
1266 --num_extents;
1267 }
1268
1269 void add_blob() {
1270 ++num_blobs;
1271 }
1272 void rm_blob() {
1273 --num_blobs;
1274 }
1275
1276 virtual void add_stats(uint64_t *extents,
1277 uint64_t *blobs,
1278 uint64_t *buffers,
1279 uint64_t *bytes) = 0;
1280
1281 bool empty() {
1282 std::lock_guard l(lock);
1283 return _get_bytes() == 0;
1284 }
1285 };
1286
1287 struct OnodeSpace {
1288 OnodeCacheShard *cache;
1289
1290 private:
1291 /// forward lookups
1292 mempool::bluestore_cache_meta::unordered_map<ghobject_t,OnodeRef> onode_map;
1293
1294 friend struct Collection; // for split_cache()
1295 friend struct Onode; // for put()
1296 friend struct LruOnodeCacheShard;
1297 void _remove(const ghobject_t& oid);
1298 public:
1299 OnodeSpace(OnodeCacheShard *c) : cache(c) {}
1300 ~OnodeSpace() {
1301 clear();
1302 }
1303
1304 OnodeRef add(const ghobject_t& oid, OnodeRef& o);
1305 OnodeRef lookup(const ghobject_t& o);
1306 void rename(OnodeRef& o, const ghobject_t& old_oid,
1307 const ghobject_t& new_oid,
1308 const mempool::bluestore_cache_meta::string& new_okey);
1309 void clear();
1310 bool empty();
1311
1312 template <int LogLevelV>
1313 void dump(CephContext *cct);
1314
1315 /// return true if f true for any item
1316 bool map_any(std::function<bool(Onode*)> f);
1317 };
1318
1319 class OpSequencer;
1320 using OpSequencerRef = ceph::ref_t<OpSequencer>;
1321
1322 struct Collection : public CollectionImpl {
1323 BlueStore *store;
1324 OpSequencerRef osr;
1325 BufferCacheShard *cache; ///< our cache shard
1326 bluestore_cnode_t cnode;
1327 ceph::shared_mutex lock =
1328 ceph::make_shared_mutex("BlueStore::Collection::lock", true, false);
1329
1330 bool exists;
1331
1332 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1333
1334 // cache onodes on a per-collection basis to avoid lock
1335 // contention.
1336 OnodeSpace onode_map;
1337
1338 //pool options
1339 pool_opts_t pool_opts;
1340 ContextQueue *commit_queue;
1341
1342 OnodeCacheShard* get_onode_cache() const {
1343 return onode_map.cache;
1344 }
1345 OnodeRef get_onode(const ghobject_t& oid, bool create, bool is_createop=false);
1346
1347 // the terminology is confusing here, sorry!
1348 //
1349 // blob_t shared_blob_t
1350 // !shared unused -> open
1351 // shared !loaded -> open + shared
1352 // shared loaded -> open + shared + loaded
1353 //
1354 // i.e.,
1355 // open = SharedBlob is instantiated
1356 // shared = blob_t shared flag is set; SharedBlob is hashed.
1357 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1358 void open_shared_blob(uint64_t sbid, BlobRef b);
1359 void load_shared_blob(SharedBlobRef sb);
1360 void make_blob_shared(uint64_t sbid, BlobRef b);
1361 uint64_t make_blob_unshared(SharedBlob *sb);
1362
1363 BlobRef new_blob() {
1364 BlobRef b = new Blob();
1365 b->shared_blob = new SharedBlob(this);
1366 return b;
1367 }
1368
1369 bool contains(const ghobject_t& oid) {
1370 if (cid.is_meta())
1371 return oid.hobj.pool == -1;
1372 spg_t spgid;
1373 if (cid.is_pg(&spgid))
1374 return
1375 spgid.pgid.contains(cnode.bits, oid) &&
1376 oid.shard_id == spgid.shard;
1377 return false;
1378 }
1379
1380 int64_t pool() const {
1381 return cid.pool();
1382 }
1383
1384 void split_cache(Collection *dest);
1385
1386 bool flush_commit(Context *c) override;
1387 void flush() override;
1388 void flush_all_but_last();
1389
1390 Collection(BlueStore *ns, OnodeCacheShard *oc, BufferCacheShard *bc, coll_t c);
1391 };
1392
1393 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1394 CollectionRef c;
1395 OnodeRef o;
1396 KeyValueDB::Iterator it;
1397 std::string head, tail;
1398
1399 std::string _stringify() const;
1400
1401 public:
1402 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1403 int seek_to_first() override;
1404 int upper_bound(const std::string &after) override;
1405 int lower_bound(const std::string &to) override;
1406 bool valid() override;
1407 int next() override;
1408 std::string key() override;
1409 ceph::buffer::list value() override;
1410 std::string tail_key() override {
1411 return tail;
1412 }
1413
1414 int status() override {
1415 return 0;
1416 }
1417 };
1418
1419 struct volatile_statfs{
1420 enum {
1421 STATFS_ALLOCATED = 0,
1422 STATFS_STORED,
1423 STATFS_COMPRESSED_ORIGINAL,
1424 STATFS_COMPRESSED,
1425 STATFS_COMPRESSED_ALLOCATED,
1426 STATFS_LAST
1427 };
1428 int64_t values[STATFS_LAST];
1429 volatile_statfs() {
1430 memset(this, 0, sizeof(volatile_statfs));
1431 }
1432 void reset() {
1433 *this = volatile_statfs();
1434 }
1435 void publish(store_statfs_t* buf) const {
1436 buf->allocated = allocated();
1437 buf->data_stored = stored();
1438 buf->data_compressed = compressed();
1439 buf->data_compressed_original = compressed_original();
1440 buf->data_compressed_allocated = compressed_allocated();
1441 }
1442
1443 volatile_statfs& operator+=(const volatile_statfs& other) {
1444 for (size_t i = 0; i < STATFS_LAST; ++i) {
1445 values[i] += other.values[i];
1446 }
1447 return *this;
1448 }
1449 int64_t& allocated() {
1450 return values[STATFS_ALLOCATED];
1451 }
1452 int64_t& stored() {
1453 return values[STATFS_STORED];
1454 }
1455 int64_t& compressed_original() {
1456 return values[STATFS_COMPRESSED_ORIGINAL];
1457 }
1458 int64_t& compressed() {
1459 return values[STATFS_COMPRESSED];
1460 }
1461 int64_t& compressed_allocated() {
1462 return values[STATFS_COMPRESSED_ALLOCATED];
1463 }
1464 int64_t allocated() const {
1465 return values[STATFS_ALLOCATED];
1466 }
1467 int64_t stored() const {
1468 return values[STATFS_STORED];
1469 }
1470 int64_t compressed_original() const {
1471 return values[STATFS_COMPRESSED_ORIGINAL];
1472 }
1473 int64_t compressed() const {
1474 return values[STATFS_COMPRESSED];
1475 }
1476 int64_t compressed_allocated() const {
1477 return values[STATFS_COMPRESSED_ALLOCATED];
1478 }
1479 volatile_statfs& operator=(const store_statfs_t& st) {
1480 values[STATFS_ALLOCATED] = st.allocated;
1481 values[STATFS_STORED] = st.data_stored;
1482 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1483 values[STATFS_COMPRESSED] = st.data_compressed;
1484 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1485 return *this;
1486 }
1487 bool is_empty() {
1488 return values[STATFS_ALLOCATED] == 0 &&
1489 values[STATFS_STORED] == 0 &&
1490 values[STATFS_COMPRESSED] == 0 &&
1491 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1492 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1493 }
1494 void decode(ceph::buffer::list::const_iterator& it) {
1495 using ceph::decode;
1496 for (size_t i = 0; i < STATFS_LAST; i++) {
1497 decode(values[i], it);
1498 }
1499 }
1500
1501 void encode(ceph::buffer::list& bl) {
1502 using ceph::encode;
1503 for (size_t i = 0; i < STATFS_LAST; i++) {
1504 encode(values[i], bl);
1505 }
1506 }
1507 };
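/*
 * Illustrative sketch of how these counters are typically used (variable
 * names are hypothetical):
 *
 *   volatile_statfs delta;
 *   delta.allocated() += 0x10000;  // space newly allocated by a transaction
 *   delta.stored()    += 4096;     // logical bytes stored
 *
 *   volatile_statfs total;
 *   total += delta;                // aggregate deltas
 *
 *   store_statfs_t out;
 *   total.publish(&out);           // out.allocated == 0x10000, etc.
 */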
1508
1509 struct TransContext final : public AioContext {
1510 MEMPOOL_CLASS_HELPERS();
1511
1512 typedef enum {
1513 STATE_PREPARE,
1514 STATE_AIO_WAIT,
1515 STATE_IO_DONE,
1516 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1517 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1518 STATE_KV_DONE,
1519 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1520 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1521 STATE_DEFERRED_DONE,
1522 STATE_FINISHING,
1523 STATE_DONE,
1524 } state_t;
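// Typical (non-deferred) progression, roughly:
//   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
//   -> FINISHING -> DONE
// Transactions carrying deferred writes additionally pass through
// DEFERRED_QUEUED and DEFERRED_CLEANUP after KV_DONE, before FINISHING.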
1525
1526 const char *get_state_name() {
1527 switch (state) {
1528 case STATE_PREPARE: return "prepare";
1529 case STATE_AIO_WAIT: return "aio_wait";
1530 case STATE_IO_DONE: return "io_done";
1531 case STATE_KV_QUEUED: return "kv_queued";
1532 case STATE_KV_SUBMITTED: return "kv_submitted";
1533 case STATE_KV_DONE: return "kv_done";
1534 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1535 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1536 case STATE_DEFERRED_DONE: return "deferred_done";
1537 case STATE_FINISHING: return "finishing";
1538 case STATE_DONE: return "done";
1539 }
1540 return "???";
1541 }
1542
1543 #if defined(WITH_LTTNG)
1544 const char *get_state_latency_name(int state) {
1545 switch (state) {
1546 case l_bluestore_state_prepare_lat: return "prepare";
1547 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1548 case l_bluestore_state_io_done_lat: return "io_done";
1549 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1550 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1551 case l_bluestore_state_kv_done_lat: return "kv_done";
1552 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1553 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1554 case l_bluestore_state_finishing_lat: return "finishing";
1555 case l_bluestore_state_done_lat: return "done";
1556 }
1557 return "???";
1558 }
1559 #endif
1560
1561 inline void set_state(state_t s) {
1562 state = s;
1563 #ifdef WITH_BLKIN
1564 if (trace) {
1565 trace.event(get_state_name());
1566 }
1567 #endif
1568 }
1569 inline state_t get_state() {
1570 return state;
1571 }
1572
1573 CollectionRef ch;
1574 OpSequencerRef osr; // this should be ch->osr
1575 boost::intrusive::list_member_hook<> sequencer_item;
1576
1577 uint64_t bytes = 0, ios = 0, cost = 0;
1578
1579 std::set<OnodeRef> onodes; ///< these need to be updated/written
1580 std::set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1581
1582 // A map from onode to a vector of object offset. For new objects created
1583 // in the transaction we append the new offset to the vector, for
1584 // overwritten objects we append the negative of the previous ondisk offset
1585 // followed by the new offset, and for truncated objects we append the
1586 // negative of the previous ondisk offset. We need to maintain a vector of
1587 // offsets because *within the same transaction* an object may be truncated
1588 // and then written again, or an object may be overwritten multiple times to
1589 // different zones. See update_cleaning_metadata function for how this map
1590 // is used.
1591 std::map<OnodeRef, std::vector<int64_t>> zoned_onode_to_offset_map;
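// For example (hypothetical offsets): an object created at ondisk offset
// 0x8000 and then overwritten within the same transaction so that it starts
// at 0x20000 ends up with the vector {0x8000, -0x8000, 0x20000}; a subsequent
// truncate in the same transaction would append -0x20000.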
1592
1593 std::set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1594 std::set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1595
1596 KeyValueDB::Transaction t; ///< then we will commit this
1597 std::list<Context*> oncommits; ///< more commit completions
1598 std::list<CollectionRef> removed_collections; ///< colls we removed
1599
1600 boost::intrusive::list_member_hook<> deferred_queue_item;
1601 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1602
1603 interval_set<uint64_t> allocated, released;
1604 volatile_statfs statfs_delta; ///< overall store statistics delta
1605 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1606
1607 IOContext ioc;
1608 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1609
1610 uint64_t seq = 0;
1611 ceph::mono_clock::time_point start;
1612 ceph::mono_clock::time_point last_stamp;
1613
1614 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1615 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1616
1617 #if defined(WITH_LTTNG)
1618 bool tracing = false;
1619 #endif
1620
1621 #ifdef WITH_BLKIN
1622 ZTracer::Trace trace;
1623 #endif
1624
1625 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1626 std::list<Context*> *on_commits)
1627 : ch(c),
1628 osr(o),
1629 ioc(cct, this),
1630 start(ceph::mono_clock::now()) {
1631 last_stamp = start;
1632 if (on_commits) {
1633 oncommits.swap(*on_commits);
1634 }
1635 }
1636 ~TransContext() {
1637 #ifdef WITH_BLKIN
1638 if (trace) {
1639 trace.event("txc destruct");
1640 }
1641 #endif
1642 delete deferred_txn;
1643 }
1644
1645 void write_onode(OnodeRef &o) {
1646 onodes.insert(o);
1647 }
1648 void write_shared_blob(SharedBlobRef &sb) {
1649 shared_blobs.insert(sb);
1650 }
1651 void unshare_blob(SharedBlob *sb) {
1652 shared_blobs.erase(sb);
1653 }
1654
1655 /// note we logically modified object (when onode itself is unmodified)
1656 void note_modified_object(OnodeRef &o) {
1657 // onode itself isn't written, though
1658 modified_objects.insert(o);
1659 }
1660 void note_removed_object(OnodeRef& o) {
1661 modified_objects.insert(o);
1662 onodes.erase(o);
1663 }
1664
1665 void zoned_note_new_object(OnodeRef &o) {
1666 auto [_, ok] = zoned_onode_to_offset_map.emplace(
1667 std::pair<OnodeRef, std::vector<int64_t>>(o, {o->zoned_get_ondisk_starting_offset()}));
1668 ceph_assert(ok);
1669 }
1670
1671 void zoned_note_updated_object(OnodeRef &o, int64_t prev_offset) {
1672 int64_t new_offset = o->zoned_get_ondisk_starting_offset();
1673 auto [it, ok] = zoned_onode_to_offset_map.emplace(
1674 std::pair<OnodeRef, std::vector<int64_t>>(o, {-prev_offset, new_offset}));
1675 if (!ok) {
1676 it->second.push_back(-prev_offset);
1677 it->second.push_back(new_offset);
1678 }
1679 }
1680
1681 void zoned_note_truncated_object(OnodeRef &o, int64_t offset) {
1682 auto [it, ok] = zoned_onode_to_offset_map.emplace(
1683 std::pair<OnodeRef, std::vector<int64_t>>(o, {-offset}));
1684 if (!ok) {
1685 it->second.push_back(-offset);
1686 }
1687 }
1688
1689 void aio_finish(BlueStore *store) override {
1690 store->txc_aio_finish(this);
1691 }
1692 private:
1693 state_t state = STATE_PREPARE;
1694 };
1695
1696 class BlueStoreThrottle {
1697 #if defined(WITH_LTTNG)
1698 const std::chrono::time_point<ceph::mono_clock> time_base = ceph::mono_clock::now();
1699
1700 // Time of last chosen io (microseconds)
1701 std::atomic<uint64_t> previous_emitted_tp_time_mono_mcs = {0};
1702 std::atomic<uint64_t> ios_started_since_last_traced = {0};
1703 std::atomic<uint64_t> ios_completed_since_last_traced = {0};
1704
1705 std::atomic_uint pending_kv_ios = {0};
1706 std::atomic_uint pending_deferred_ios = {0};
1707
1708 // Min period between trace points (microseconds)
1709 std::atomic<uint64_t> trace_period_mcs = {0};
1710
1711 bool should_trace(
1712 uint64_t *started,
1713 uint64_t *completed) {
1714 uint64_t min_period_mcs = trace_period_mcs.load(
1715 std::memory_order_relaxed);
1716
1717 if (min_period_mcs == 0) {
1718 *started = 1;
1719 *completed = ios_completed_since_last_traced.exchange(0);
1720 return true;
1721 } else {
1722 ios_started_since_last_traced++;
1723 auto now_mcs = ceph::to_microseconds<uint64_t>(
1724 ceph::mono_clock::now() - time_base);
1725 uint64_t previous_mcs = previous_emitted_tp_time_mono_mcs;
1726 uint64_t period_mcs = now_mcs - previous_mcs;
1727 if (period_mcs > min_period_mcs) {
1728 if (previous_emitted_tp_time_mono_mcs.compare_exchange_strong(
1729 previous_mcs, now_mcs)) {
1730 // This would be racy at a sufficiently extreme trace rate, but isn't
1731 // worth the overhead of doing it more carefully.
1732 *started = ios_started_since_last_traced.exchange(0);
1733 *completed = ios_completed_since_last_traced.exchange(0);
1734 return true;
1735 }
1736 }
1737 return false;
1738 }
1739 }
1740 #endif
1741
1742 #if defined(WITH_LTTNG)
1743 void emit_initial_tracepoint(
1744 KeyValueDB &db,
1745 TransContext &txc,
1746 ceph::mono_clock::time_point);
1747 #else
1748 void emit_initial_tracepoint(
1749 KeyValueDB &db,
1750 TransContext &txc,
1751 ceph::mono_clock::time_point) {}
1752 #endif
1753
1754 Throttle throttle_bytes; ///< submit to commit
1755 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1756
1757 public:
1758 BlueStoreThrottle(CephContext *cct) :
1759 throttle_bytes(cct, "bluestore_throttle_bytes", 0),
1760 throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes", 0)
1761 {
1762 reset_throttle(cct->_conf);
1763 }
1764
1765 #if defined(WITH_LTTNG)
1766 void complete_kv(TransContext &txc);
1767 void complete(TransContext &txc);
1768 #else
1769 void complete_kv(TransContext &txc) {}
1770 void complete(TransContext &txc) {}
1771 #endif
1772
1773 ceph::mono_clock::duration log_state_latency(
1774 TransContext &txc, PerfCounters *logger, int state);
1775 bool try_start_transaction(
1776 KeyValueDB &db,
1777 TransContext &txc,
1778 ceph::mono_clock::time_point);
1779 void finish_start_transaction(
1780 KeyValueDB &db,
1781 TransContext &txc,
1782 ceph::mono_clock::time_point);
1783 void release_kv_throttle(uint64_t cost) {
1784 throttle_bytes.put(cost);
1785 }
1786 void release_deferred_throttle(uint64_t cost) {
1787 throttle_deferred_bytes.put(cost);
1788 }
1789 bool should_submit_deferred() {
1790 return throttle_deferred_bytes.past_midpoint();
1791 }
1792 void reset_throttle(const ConfigProxy &conf) {
1793 throttle_bytes.reset_max(conf->bluestore_throttle_bytes);
1794 throttle_deferred_bytes.reset_max(
1795 conf->bluestore_throttle_bytes +
1796 conf->bluestore_throttle_deferred_bytes);
1797 #if defined(WITH_LTTNG)
1798 double rate = conf.get_val<double>("bluestore_throttle_trace_rate");
1799 trace_period_mcs = rate > 0 ? floor((1/rate) * 1000000.0) : 0;
1800 #endif
1801 }
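// For example, bluestore_throttle_trace_rate == 2.0 yields
// trace_period_mcs == floor((1/2.0) * 1000000.0) == 500000, i.e. at most one
// trace point every 0.5 seconds (LTTNG builds only).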
1802 } throttle;
1803
1804 typedef boost::intrusive::list<
1805 TransContext,
1806 boost::intrusive::member_hook<
1807 TransContext,
1808 boost::intrusive::list_member_hook<>,
1809 &TransContext::deferred_queue_item> > deferred_queue_t;
1810
1811 struct DeferredBatch final : public AioContext {
1812 OpSequencer *osr;
1813 struct deferred_io {
1814 ceph::buffer::list bl; ///< data
1815 uint64_t seq; ///< deferred transaction seq
1816 };
1817 std::map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1818 deferred_queue_t txcs; ///< txcs in this batch
1819 IOContext ioc; ///< our aios
1820 /// bytes of pending io for each deferred seq (may be 0)
1821 std::map<uint64_t,int> seq_bytes;
1822
1823 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1824 void _audit(CephContext *cct);
1825
1826 DeferredBatch(CephContext *cct, OpSequencer *osr)
1827 : osr(osr), ioc(cct, this) {}
1828
1829 /// prepare a write
1830 void prepare_write(CephContext *cct,
1831 uint64_t seq, uint64_t offset, uint64_t length,
1832 ceph::buffer::list::const_iterator& p);
1833
1834 void aio_finish(BlueStore *store) override {
1835 store->_deferred_aio_finish(osr);
1836 }
1837 };
1838
1839 class OpSequencer : public RefCountedObject {
1840 public:
1841 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1842 ceph::condition_variable qcond;
1843 typedef boost::intrusive::list<
1844 TransContext,
1845 boost::intrusive::member_hook<
1846 TransContext,
1847 boost::intrusive::list_member_hook<>,
1848 &TransContext::sequencer_item> > q_list_t;
1849 q_list_t q; ///< transactions
1850
1851 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1852
1853 DeferredBatch *deferred_running = nullptr;
1854 DeferredBatch *deferred_pending = nullptr;
1855
1856 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::OpSequencer::deferred_lock");
1857
1858 BlueStore *store;
1859 coll_t cid;
1860
1861 uint64_t last_seq = 0;
1862
1863 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1864
1865 std::atomic_int kv_committing_serially = {0};
1866
1867 std::atomic_int kv_submitted_waiters = {0};
1868
1869     std::atomic_bool zombie = {false};    ///< in zombie_osr_set (collection going away)
1870
1871 const uint32_t sequencer_id;
1872
1873 uint32_t get_sequencer_id() const {
1874 return sequencer_id;
1875 }
1876
1877 void queue_new(TransContext *txc) {
1878 std::lock_guard l(qlock);
1879 txc->seq = ++last_seq;
1880 q.push_back(*txc);
1881 }
1882
1883 void drain() {
1884 std::unique_lock l(qlock);
1885 while (!q.empty())
1886 qcond.wait(l);
1887 }
1888
1889 void drain_preceding(TransContext *txc) {
1890 std::unique_lock l(qlock);
1891 while (&q.front() != txc)
1892 qcond.wait(l);
1893 }
1894
1895 bool _is_all_kv_submitted() {
1896       // caller must hold qlock; q must not be empty
1897 ceph_assert(!q.empty());
1898 TransContext *txc = &q.back();
1899 if (txc->get_state() >= TransContext::STATE_KV_SUBMITTED) {
1900 return true;
1901 }
1902 return false;
1903 }
1904
1905 void flush() {
1906 std::unique_lock l(qlock);
1907 while (true) {
1908         // set the waiter flag before the check because the condition
1909         // may become true outside qlock, and we need to make
1910         // sure those threads see waiters and signal qcond.
1911 ++kv_submitted_waiters;
1912 if (q.empty() || _is_all_kv_submitted()) {
1913 --kv_submitted_waiters;
1914 return;
1915 }
1916 qcond.wait(l);
1917 --kv_submitted_waiters;
1918 }
1919 }
1920
1921 void flush_all_but_last() {
1922 std::unique_lock l(qlock);
1923       ceph_assert(q.size() >= 1);
1924 while (true) {
1925         // set the waiter flag before the check because the condition
1926         // may become true outside qlock, and we need to make
1927         // sure those threads see waiters and signal qcond.
1928 ++kv_submitted_waiters;
1929 if (q.size() <= 1) {
1930 --kv_submitted_waiters;
1931 return;
1932 } else {
1933 auto it = q.rbegin();
1934 it++;
1935 if (it->get_state() >= TransContext::STATE_KV_SUBMITTED) {
1936 --kv_submitted_waiters;
1937 return;
1938 }
1939 }
1940 qcond.wait(l);
1941 --kv_submitted_waiters;
1942 }
1943 }
1944
1945 bool flush_commit(Context *c) {
1946 std::lock_guard l(qlock);
1947 if (q.empty()) {
1948 return true;
1949 }
1950 TransContext *txc = &q.back();
1951 if (txc->get_state() >= TransContext::STATE_KV_DONE) {
1952 return true;
1953 }
1954 txc->oncommits.push_back(c);
1955 return false;
1956 }
1957 private:
1958 FRIEND_MAKE_REF(OpSequencer);
1959 OpSequencer(BlueStore *store, uint32_t sequencer_id, const coll_t& c)
1960 : RefCountedObject(store->cct),
1961 store(store), cid(c), sequencer_id(sequencer_id) {
1962 }
1963 ~OpSequencer() {
1964 ceph_assert(q.empty());
1965 }
1966 };
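   // Usage sketch for OpSequencer::flush_commit() (illustrative; C_SaferCond is
   // just an example Context used here to wait for the callback):
   //
   //   C_SaferCond done;
   //   if (!osr->flush_commit(&done)) {
   //     done.wait();  // fires once the last queued txc reaches STATE_KV_DONE
   //   }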
1967
1968 typedef boost::intrusive::list<
1969 OpSequencer,
1970 boost::intrusive::member_hook<
1971 OpSequencer,
1972 boost::intrusive::list_member_hook<>,
1973 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1974
1975 struct KVSyncThread : public Thread {
1976 BlueStore *store;
1977 explicit KVSyncThread(BlueStore *s) : store(s) {}
1978 void *entry() override {
1979 store->_kv_sync_thread();
1980 return NULL;
1981 }
1982 };
1983 struct KVFinalizeThread : public Thread {
1984 BlueStore *store;
1985 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1986 void *entry() override {
1987 store->_kv_finalize_thread();
1988 return NULL;
1989 }
1990 };
1991 struct ZonedCleanerThread : public Thread {
1992 BlueStore *store;
1993 explicit ZonedCleanerThread(BlueStore *s) : store(s) {}
1994 void *entry() override {
1995 store->_zoned_cleaner_thread();
1996 return nullptr;
1997 }
1998 };
1999
2000 struct DBHistogram {
2001 struct value_dist {
2002 uint64_t count;
2003 uint32_t max_len;
2004 };
2005
2006 struct key_dist {
2007 uint64_t count;
2008 uint32_t max_len;
2009       std::map<int, struct value_dist> val_map; ///< slab id -> count and max value length
2010 };
2011
2012 std::map<std::string, std::map<int, struct key_dist> > key_hist;
2013 std::map<int, uint64_t> value_hist;
2014 int get_key_slab(size_t sz);
2015 std::string get_key_slab_to_range(int slab);
2016 int get_value_slab(size_t sz);
2017 std::string get_value_slab_to_range(int slab);
2018 void update_hist_entry(std::map<std::string, std::map<int, struct key_dist> > &key_hist,
2019 const std::string &prefix, size_t key_size, size_t value_size);
2020 void dump(ceph::Formatter *f);
2021 };
2022
2023 struct BigDeferredWriteContext {
2024 uint64_t off = 0; // original logical offset
2025 uint32_t b_off = 0; // blob relative offset
2026 uint32_t used = 0;
2027 uint64_t head_read = 0;
2028 uint64_t tail_read = 0;
2029 BlobRef blob_ref;
2030 uint64_t blob_start = 0;
2031 PExtentVector res_extents;
2032
2033 inline uint64_t blob_aligned_len() const {
2034 return used + head_read + tail_read;
2035 }
2036
2037 bool can_defer(BlueStore::extent_map_t::iterator ep,
2038 uint64_t prefer_deferred_size,
2039 uint64_t block_size,
2040 uint64_t offset,
2041 uint64_t l);
2042 bool apply_defer();
2043 };
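   // Worked example for blob_aligned_len() above (illustrative hex values): an
   // overwrite covering used = 0x3000 bytes that needs head_read = 0x1000 and
   // tail_read = 0x1000 to round out to block boundaries yields an aligned
   // deferred write of 0x5000 bytes.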
2044
2045 // --------------------------------------------------------
2046 // members
2047 private:
2048 BlueFS *bluefs = nullptr;
2049 bluefs_layout_t bluefs_layout;
2050 utime_t next_dump_on_bluefs_alloc_failure;
2051
2052 KeyValueDB *db = nullptr;
2053 BlockDevice *bdev = nullptr;
2054 std::string freelist_type;
2055 FreelistManager *fm = nullptr;
2056
2057 bluefs_shared_alloc_context_t shared_alloc;
2058
2059 uuid_d fsid;
2060 int path_fd = -1; ///< open handle to $path
2061 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
2062 bool mounted = false;
2063
2064 ceph::shared_mutex coll_lock = ceph::make_shared_mutex("BlueStore::coll_lock"); ///< rwlock to protect coll_map
2065 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
2066 bool collections_had_errors = false;
2067 std::map<coll_t,CollectionRef> new_coll_map;
2068
2069 std::vector<OnodeCacheShard*> onode_cache_shards;
2070 std::vector<BufferCacheShard*> buffer_cache_shards;
2071
2072 /// protect zombie_osr_set
2073 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
2074 uint32_t next_sequencer_id = 0;
2075   std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
2076
2077 std::atomic<uint64_t> nid_last = {0};
2078 std::atomic<uint64_t> nid_max = {0};
2079 std::atomic<uint64_t> blobid_last = {0};
2080 std::atomic<uint64_t> blobid_max = {0};
2081
2082 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
2083 ceph::mutex atomic_alloc_and_submit_lock =
2084 ceph::make_mutex("BlueStore::atomic_alloc_and_submit_lock");
2085 std::atomic<uint64_t> deferred_seq = {0};
2086 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
2087 std::atomic_int deferred_queue_size = {0}; ///< num txc's queued across all osrs
2088 std::atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
2089 Finisher finisher;
2090 utime_t deferred_last_submitted = utime_t();
2091
2092 KVSyncThread kv_sync_thread;
2093 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
2094 ceph::condition_variable kv_cond;
2095 bool _kv_only = false;
2096 bool kv_sync_started = false;
2097 bool kv_stop = false;
2098 bool kv_finalize_started = false;
2099 bool kv_finalize_stop = false;
2100 std::deque<TransContext*> kv_queue; ///< ready, already submitted
2101 std::deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
2102 std::deque<TransContext*> kv_committing; ///< currently syncing
2103 std::deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
2104 bool kv_sync_in_progress = false;
2105
2106 KVFinalizeThread kv_finalize_thread;
2107 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
2108 ceph::condition_variable kv_finalize_cond;
2109 std::deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
2110 std::deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
2111 bool kv_finalize_in_progress = false;
2112
2113 ZonedCleanerThread zoned_cleaner_thread;
2114 ceph::mutex zoned_cleaner_lock = ceph::make_mutex("BlueStore::zoned_cleaner_lock");
2115 ceph::condition_variable zoned_cleaner_cond;
2116 bool zoned_cleaner_started = false;
2117 bool zoned_cleaner_stop = false;
2118 std::deque<uint64_t> zoned_cleaner_queue;
2119
2120 PerfCounters *logger = nullptr;
2121
2122 std::list<CollectionRef> removed_collections;
2123
2124 ceph::shared_mutex debug_read_error_lock =
2125 ceph::make_shared_mutex("BlueStore::debug_read_error_lock");
2126 std::set<ghobject_t> debug_data_error_objects;
2127 std::set<ghobject_t> debug_mdata_error_objects;
2128
2129 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
2130
2131 uint64_t block_size = 0; ///< block size of block device (power of 2)
2132 uint64_t block_mask = 0; ///< mask to get just the block offset
2133 size_t block_size_order = 0; ///< bits to shift to get block size
2134
2135 uint64_t min_alloc_size; ///< minimum allocation unit (power of 2)
2136 ///< bits for min_alloc_size
2137 uint8_t min_alloc_size_order = 0;
2138 static_assert(std::numeric_limits<uint8_t>::max() >
2139 std::numeric_limits<decltype(min_alloc_size)>::digits,
2140 "not enough bits for min_alloc_size");
2141
2142 enum {
2143     // Please preserve the order since it is persisted in the DB
2144 OMAP_BULK = 0,
2145 OMAP_PER_POOL = 1,
2146 OMAP_PER_PG = 2,
2147 } per_pool_omap = OMAP_BULK;
2148
2149 ///< maximum allocation unit (power of 2)
2150 std::atomic<uint64_t> max_alloc_size = {0};
2151
2152   ///< op count threshold for forcing deferred write submission
2153 std::atomic<int> deferred_batch_ops = {0};
2154
2155 ///< size threshold for forced deferred writes
2156 std::atomic<uint64_t> prefer_deferred_size = {0};
2157
2158 ///< approx cost per io, in bytes
2159 std::atomic<uint64_t> throttle_cost_per_io = {0};
2160
2161 std::atomic<Compressor::CompressionMode> comp_mode =
2162 {Compressor::COMP_NONE}; ///< compression mode
2163 CompressorRef compressor;
2164 std::atomic<uint64_t> comp_min_blob_size = {0};
2165 std::atomic<uint64_t> comp_max_blob_size = {0};
2166
2167 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
2168
2169 uint64_t kv_ios = 0;
2170 uint64_t kv_throttle_costs = 0;
2171
2172 // cache trim control
2173 uint64_t cache_size = 0; ///< total cache size
2174 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2175 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2176 double cache_kv_onode_ratio = 0; ///< cache ratio dedicated to kv onodes (e.g., rocksdb onode CF)
2177 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2178 bool cache_autotune = false; ///< cache autotune setting
2179 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2180 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2181 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2182 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2183 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2184 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2185   double max_defer_interval = 0; ///< Max time to wait since the last deferred submit before forcing one
2186 std::atomic<uint32_t> config_changed = {0}; ///< Counter to determine if there is a configuration change.
2187
2188 typedef std::map<uint64_t, volatile_statfs> osd_pools_map;
2189
2190 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2191 volatile_statfs vstatfs;
2192 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2193
2194 bool per_pool_stat_collection = true;
2195
2196 struct MempoolThread : public Thread {
2197 public:
2198 BlueStore *store;
2199
2200 ceph::condition_variable cond;
2201 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2202 bool stop = false;
2203 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2204 std::shared_ptr<PriorityCache::PriCache> binned_kv_onode_cache = nullptr;
2205 std::shared_ptr<PriorityCache::Manager> pcm = nullptr;
2206
2207 struct MempoolCache : public PriorityCache::PriCache {
2208 BlueStore *store;
2209 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2210 int64_t committed_bytes = 0;
2211 double cache_ratio = 0;
2212
2213       MempoolCache(BlueStore *s) : store(s) {}
2214
2215 virtual uint64_t _get_used_bytes() const = 0;
2216
2217 virtual int64_t request_cache_bytes(
2218 PriorityCache::Priority pri, uint64_t total_cache) const {
2219 int64_t assigned = get_cache_bytes(pri);
2220
2221 switch (pri) {
2222 // All cache items are currently shoved into the PRI1 priority
2223 case PriorityCache::Priority::PRI1:
2224 {
2225 int64_t request = _get_used_bytes();
2226             return (request > assigned) ? request - assigned : 0;
2227 }
2228 default:
2229 break;
2230 }
2231 return -EOPNOTSUPP;
2232 }
2233
2234 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2235 return cache_bytes[pri];
2236 }
2237 virtual int64_t get_cache_bytes() const {
2238 int64_t total = 0;
2239
2240 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2241 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2242 total += get_cache_bytes(pri);
2243 }
2244 return total;
2245 }
2246 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2247 cache_bytes[pri] = bytes;
2248 }
2249 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2250 cache_bytes[pri] += bytes;
2251 }
2252 virtual int64_t commit_cache_size(uint64_t total_cache) {
2253 committed_bytes = PriorityCache::get_chunk(
2254 get_cache_bytes(), total_cache);
2255 return committed_bytes;
2256 }
2257 virtual int64_t get_committed_size() const {
2258 return committed_bytes;
2259 }
2260 virtual double get_cache_ratio() const {
2261 return cache_ratio;
2262 }
2263 virtual void set_cache_ratio(double ratio) {
2264 cache_ratio = ratio;
2265 }
2266 virtual std::string get_cache_name() const = 0;
2267 };
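     // Worked example for request_cache_bytes() above (numbers are
     // illustrative): with _get_used_bytes() == 800 MiB and 512 MiB already
     // assigned to PRI1, the cache requests the 288 MiB difference; every
     // other priority returns -EOPNOTSUPP since all items currently live in PRI1.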
2268
2269 struct MetaCache : public MempoolCache {
2270       MetaCache(BlueStore *s) : MempoolCache(s) {}
2271
2272 virtual uint64_t _get_used_bytes() const {
2273 return mempool::bluestore_Buffer::allocated_bytes() +
2274 mempool::bluestore_Blob::allocated_bytes() +
2275 mempool::bluestore_Extent::allocated_bytes() +
2276 mempool::bluestore_cache_meta::allocated_bytes() +
2277 mempool::bluestore_cache_other::allocated_bytes() +
2278 mempool::bluestore_cache_onode::allocated_bytes() +
2279 mempool::bluestore_SharedBlob::allocated_bytes() +
2280 mempool::bluestore_inline_bl::allocated_bytes();
2281 }
2282
2283 virtual std::string get_cache_name() const {
2284 return "BlueStore Meta Cache";
2285 }
2286
2287 uint64_t _get_num_onodes() const {
2288 uint64_t onode_num =
2289 mempool::bluestore_cache_onode::allocated_items();
2290 return (2 > onode_num) ? 2 : onode_num;
2291 }
2292
2293 double get_bytes_per_onode() const {
2294 return (double)_get_used_bytes() / (double)_get_num_onodes();
2295 }
2296 };
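     // Worked example for get_bytes_per_onode() (illustrative numbers): with
     // 6 MiB allocated across the metadata mempools and 4096 cached onodes,
     // the estimate is 6291456 / 4096 = 1536 bytes per onode; the lower bound
     // of 2 in _get_num_onodes() keeps the ratio sane when the cache is empty.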
2297 std::shared_ptr<MetaCache> meta_cache;
2298
2299 struct DataCache : public MempoolCache {
2300       DataCache(BlueStore *s) : MempoolCache(s) {}
2301
2302 virtual uint64_t _get_used_bytes() const {
2303 uint64_t bytes = 0;
2304 for (auto i : store->buffer_cache_shards) {
2305 bytes += i->_get_bytes();
2306 }
2307 return bytes;
2308 }
2309 virtual std::string get_cache_name() const {
2310 return "BlueStore Data Cache";
2311 }
2312 };
2313 std::shared_ptr<DataCache> data_cache;
2314
2315 public:
2316 explicit MempoolThread(BlueStore *s)
2317 : store(s),
2318 meta_cache(new MetaCache(s)),
2319 data_cache(new DataCache(s)) {}
2320
2321 void *entry() override;
2322 void init() {
2323 ceph_assert(stop == false);
2324 create("bstore_mempool");
2325 }
2326 void shutdown() {
2327 lock.lock();
2328 stop = true;
2329 cond.notify_all();
2330 lock.unlock();
2331 join();
2332 }
2333
2334 private:
2335 void _adjust_cache_settings();
2336 void _update_cache_settings();
2337 void _resize_shards(bool interval_stats);
2338 } mempool_thread;
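   // Lifecycle sketch for mempool_thread (assumed to mirror mount/umount;
   // the exact call sites are in the .cc, not shown here):
   //
   //   mempool_thread.init();      // asserts !stop, spawns "bstore_mempool"
   //   ...                         // thread periodically resizes cache shards
   //   mempool_thread.shutdown();  // stop = true, notify, join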
2339
2340 #ifdef WITH_BLKIN
2341 ZTracer::Endpoint trace_endpoint {"0.0.0.0", 0, "BlueStore"};
2342 #endif
2343
2344 // --------------------------------------------------------
2345 // private methods
2346
2347 void _init_logger();
2348 void _shutdown_logger();
2349 int _reload_logger();
2350
2351 int _open_path();
2352 void _close_path();
2353 int _open_fsid(bool create);
2354 int _lock_fsid();
2355 int _read_fsid(uuid_d *f);
2356 int _write_fsid();
2357 void _close_fsid();
2358 void _set_alloc_sizes();
2359 void _set_blob_size();
2360 void _set_finisher_num();
2361 void _set_per_pool_omap();
2362 void _update_osd_memory_options();
2363
2364 int _open_bdev(bool create);
2365   // Verifies that disk space is sufficient for the reserved area plus the
2366   // minimal bluefs portion, and grows the latter if needed.
2367   // Depends on min_alloc_size, hence it must be called after min_alloc_size
2368   // is initialized (and outside of _open_bdev).
2369 void _validate_bdev();
2370 void _close_bdev();
2371
2372 int _minimal_open_bluefs(bool create);
2373 void _minimal_close_bluefs();
2374 int _open_bluefs(bool create, bool read_only);
2375 void _close_bluefs(bool cold_close);
2376
2377 int _is_bluefs(bool create, bool* ret);
2378 /*
2379    * opens both DB and dependent super_meta, FreelistManager and allocator
2380 * in the proper order
2381 */
2382 int _open_db_and_around(bool read_only, bool to_repair = false);
2383 void _close_db_and_around(bool read_only);
2384
2385 int _prepare_db_environment(bool create, bool read_only,
2386 std::string* kv_dir, std::string* kv_backend);
2387
2388 /*
2389    * @warning to_repair_db means that we open this db to repair it and will
2390    * not hold RocksDB's file lock.
2391 */
2392 int _open_db(bool create,
2393 bool to_repair_db=false,
2394 bool read_only = false);
2395 void _close_db(bool read_only);
2396 int _open_fm(KeyValueDB::Transaction t, bool read_only);
2397 void _close_fm();
2398 int _write_out_fm_meta(uint64_t target_size);
2399 int _create_alloc();
2400 int _init_alloc();
2401 void _close_alloc();
2402 int _open_collections();
2403 void _fsck_collections(int64_t* errors);
2404 void _close_collections();
2405
2406 int _setup_block_symlink_or_file(std::string name, std::string path, uint64_t size,
2407 bool create);
2408
2409 // Functions related to zoned storage.
2410 uint64_t _zoned_piggyback_device_parameters_onto(uint64_t min_alloc_size);
2411 int _zoned_check_config_settings();
2412 void _zoned_update_cleaning_metadata(TransContext *txc);
2413 std::string _zoned_get_prefix(uint64_t offset);
2414
2415 public:
2416 utime_t get_deferred_last_submitted() {
2417 std::lock_guard l(deferred_lock);
2418 return deferred_last_submitted;
2419 }
2420
2421 static int _write_bdev_label(CephContext* cct,
2422 std::string path, bluestore_bdev_label_t label);
2423 static int _read_bdev_label(CephContext* cct, std::string path,
2424 bluestore_bdev_label_t *label);
2425 private:
2426 int _check_or_set_bdev_label(std::string path, uint64_t size, std::string desc,
2427 bool create);
2428   int _set_bdev_label_size(const std::string& path, uint64_t size);
2429
2430 int _open_super_meta();
2431
2432 void _open_statfs();
2433 void _get_statfs_overall(struct store_statfs_t *buf);
2434
2435 void _dump_alloc_on_failure();
2436
2437 CollectionRef _get_collection(const coll_t& cid);
2438 void _queue_reap_collection(CollectionRef& c);
2439 void _reap_collections();
2440 void _update_cache_logger();
2441
2442 void _assign_nid(TransContext *txc, OnodeRef o);
2443 uint64_t _assign_blobid(TransContext *txc);
2444
2445 template <int LogLevelV>
2446 friend void _dump_onode(CephContext *cct, const Onode& o);
2447 template <int LogLevelV>
2448 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2449 template <int LogLevelV>
2450 friend void _dump_transaction(CephContext *cct, Transaction *t);
2451
2452 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2453 std::list<Context*> *on_commits,
2454 TrackedOpRef osd_op=TrackedOpRef());
2455 void _txc_update_store_statfs(TransContext *txc);
2456 void _txc_add_transaction(TransContext *txc, Transaction *t);
2457 void _txc_calc_cost(TransContext *txc);
2458 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2459 void _txc_state_proc(TransContext *txc);
2460 void _txc_aio_submit(TransContext *txc);
2461 public:
2462 void txc_aio_finish(void *p) {
2463 _txc_state_proc(static_cast<TransContext*>(p));
2464 }
2465 private:
2466 void _txc_finish_io(TransContext *txc);
2467 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2468 void _txc_apply_kv(TransContext *txc, bool sync_submit_transaction);
2469 void _txc_committed_kv(TransContext *txc);
2470 void _txc_finish(TransContext *txc);
2471 void _txc_release_alloc(TransContext *txc);
2472
2473 void _osr_attach(Collection *c);
2474 void _osr_register_zombie(OpSequencer *osr);
2475 void _osr_drain(OpSequencer *osr);
2476 void _osr_drain_preceding(TransContext *txc);
2477 void _osr_drain_all();
2478
2479 void _kv_start();
2480 void _kv_stop();
2481 void _kv_sync_thread();
2482 void _kv_finalize_thread();
2483
2484 void _zoned_cleaner_start();
2485 void _zoned_cleaner_stop();
2486 void _zoned_cleaner_thread();
2487 void _zoned_clean_zone(uint64_t zone_num);
2488
2489 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc);
2490 void _deferred_queue(TransContext *txc);
2491 public:
2492 void deferred_try_submit();
2493 private:
2494 void _deferred_submit_unlock(OpSequencer *osr);
2495 void _deferred_aio_finish(OpSequencer *osr);
2496 int _deferred_replay();
2497
2498 public:
2499 using mempool_dynamic_bitset =
2500 boost::dynamic_bitset<uint64_t,
2501 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2502 using per_pool_statfs =
2503 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2504
2505 enum FSCKDepth {
2506 FSCK_REGULAR,
2507 FSCK_DEEP,
2508 FSCK_SHALLOW
2509 };
2510 enum {
2511 MAX_FSCK_ERROR_LINES = 100,
2512 };
2513
2514 private:
2515 int _fsck_check_extents(
2516 const coll_t& cid,
2517 const ghobject_t& oid,
2518 const PExtentVector& extents,
2519 bool compressed,
2520 mempool_dynamic_bitset &used_blocks,
2521 uint64_t granularity,
2522 BlueStoreRepairer* repairer,
2523 store_statfs_t& expected_statfs,
2524 FSCKDepth depth);
2525
2526 void _fsck_check_pool_statfs(
2527 per_pool_statfs& expected_pool_statfs,
2528 int64_t& errors,
2529 int64_t &warnings,
2530 BlueStoreRepairer* repairer);
2531
2532 int _fsck(FSCKDepth depth, bool repair);
2533 int _fsck_on_open(BlueStore::FSCKDepth depth, bool repair);
2534
2535 void _buffer_cache_write(
2536 TransContext *txc,
2537 BlobRef b,
2538 uint64_t offset,
2539 ceph::buffer::list& bl,
2540 unsigned flags) {
2541 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2542 flags);
2543 txc->shared_blobs_written.insert(b->shared_blob);
2544 }
2545
2546 int _collection_list(
2547 Collection *c, const ghobject_t& start, const ghobject_t& end,
2548 int max, bool legacy, std::vector<ghobject_t> *ls, ghobject_t *next);
2549
2550 template <typename T, typename F>
2551 T select_option(const std::string& opt_name, T val1, F f) {
2552 //NB: opt_name reserved for future use
2553 boost::optional<T> val2 = f();
2554 if (val2) {
2555 return *val2;
2556 }
2557 return val1;
2558 }
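   // Usage sketch for select_option() (hypothetical helper and option name;
   // shows the intended per-pool-override-with-global-fallback pattern):
   //
   //   uint64_t max_bsize = select_option(
   //     "compression_max_blob_size",           // reserved, currently unused
   //     comp_max_blob_size.load(),             // global default (val1)
   //     [&]() -> boost::optional<uint64_t> {
   //       uint64_t per_pool = 0;
   //       if (get_pool_override(&per_pool))    // placeholder predicate
   //         return per_pool;                   // per-pool value wins
   //       return boost::optional<uint64_t>();  // empty -> fall back to val1
   //     });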
2559
2560 void _apply_padding(uint64_t head_pad,
2561 uint64_t tail_pad,
2562 ceph::buffer::list& padded);
2563
2564 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2565
2566 // -- ondisk version ---
2567 public:
2568 const int32_t latest_ondisk_format = 4; ///< our version
2569 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2570 const int32_t min_compat_ondisk_format = 3; ///< who can read us
2571
2572 private:
2573 int32_t ondisk_format = 0; ///< value detected on mount
2574
2575 int _upgrade_super(); ///< upgrade (called during open_super)
2576 uint64_t _get_ondisk_reserved() const;
2577 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2578
2579 // --- public interface ---
2580 public:
2581 BlueStore(CephContext *cct, const std::string& path);
2582 BlueStore(CephContext *cct, const std::string& path, uint64_t min_alloc_size); // Ctor for UT only
2583 ~BlueStore() override;
2584
2585 std::string get_type() override {
2586 return "bluestore";
2587 }
2588
2589   bool needs_journal() override { return false; }
2590   bool wants_journal() override { return false; }
2591   bool allows_journal() override { return false; }
2592
2593 uint64_t get_min_alloc_size() const override {
2594 return min_alloc_size;
2595 }
2596
2597 int get_devices(std::set<std::string> *ls) override;
2598
2599 bool is_rotational() override;
2600 bool is_journal_rotational() override;
2601
2602 std::string get_default_device_class() override {
2603 std::string device_class;
2604 std::map<std::string, std::string> metadata;
2605 collect_metadata(&metadata);
2606 auto it = metadata.find("bluestore_bdev_type");
2607 if (it != metadata.end()) {
2608 device_class = it->second;
2609 }
2610 return device_class;
2611 }
2612
2613 int get_numa_node(
2614 int *numa_node,
2615 std::set<int> *nodes,
2616 std::set<std::string> *failed) override;
2617
2618 static int get_block_device_fsid(CephContext* cct, const std::string& path,
2619 uuid_d *fsid);
2620
2621 bool test_mount_in_use() override;
2622
2623 private:
2624 int _mount();
2625 public:
2626 int mount() override {
2627 return _mount();
2628 }
2629 int umount() override;
2630
2631 int open_db_environment(KeyValueDB **pdb, bool to_repair);
2632 int close_db_environment();
2633
2634 int write_meta(const std::string& key, const std::string& value) override;
2635 int read_meta(const std::string& key, std::string *value) override;
2636
2637 // open in read-only and limited mode
2638 int cold_open();
2639 int cold_close();
2640
2641 int fsck(bool deep) override {
2642 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, false);
2643 }
2644 int repair(bool deep) override {
2645 return _fsck(deep ? FSCK_DEEP : FSCK_REGULAR, true);
2646 }
2647 int quick_fix() override {
2648 return _fsck(FSCK_SHALLOW, true);
2649 }
2650
2651 void set_cache_shards(unsigned num) override;
2652 void dump_cache_stats(ceph::Formatter *f) override {
2653 int onode_count = 0, buffers_bytes = 0;
2654 for (auto i: onode_cache_shards) {
2655 onode_count += i->_get_num();
2656 }
2657 for (auto i: buffer_cache_shards) {
2658 buffers_bytes += i->_get_bytes();
2659 }
2660 f->dump_int("bluestore_onode", onode_count);
2661 f->dump_int("bluestore_buffers", buffers_bytes);
2662 }
2663 void dump_cache_stats(std::ostream& ss) override {
2664 int onode_count = 0, buffers_bytes = 0;
2665 for (auto i: onode_cache_shards) {
2666 onode_count += i->_get_num();
2667 }
2668 for (auto i: buffer_cache_shards) {
2669 buffers_bytes += i->_get_bytes();
2670 }
2671 ss << "bluestore_onode: " << onode_count;
2672 ss << "bluestore_buffers: " << buffers_bytes;
2673 }
2674
2675 int validate_hobject_key(const hobject_t &obj) const override {
2676 return 0;
2677 }
2678 unsigned get_max_attr_name_length() override {
2679 return 256; // arbitrary; there is no real limit internally
2680 }
2681
2682 int mkfs() override;
2683 int mkjournal() override {
2684 return 0;
2685 }
2686
2687 void get_db_statistics(ceph::Formatter *f) override;
2688 void generate_db_histogram(ceph::Formatter *f) override;
2689 void _shutdown_cache();
2690 int flush_cache(std::ostream *os = NULL) override;
2691 void dump_perf_counters(ceph::Formatter *f) override {
2692 f->open_object_section("perf_counters");
2693 logger->dump_formatted(f, false);
2694 f->close_section();
2695 }
2696
2697 int add_new_bluefs_device(int id, const std::string& path);
2698 int migrate_to_existing_bluefs_device(const std::set<int>& devs_source,
2699 int id);
2700 int migrate_to_new_bluefs_device(const std::set<int>& devs_source,
2701 int id,
2702 const std::string& path);
2703 int expand_devices(std::ostream& out);
2704 std::string get_device_path(unsigned id);
2705
2706   int dump_bluefs_sizes(std::ostream& out);
2707
2708 public:
2709 int statfs(struct store_statfs_t *buf,
2710 osd_alert_list_t* alerts = nullptr) override;
2711 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf,
2712 bool *per_pool_omap) override;
2713
2714 void collect_metadata(std::map<std::string,std::string> *pm) override;
2715
2716 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2717 int set_collection_opts(
2718 CollectionHandle& c,
2719 const pool_opts_t& opts) override;
2720 int stat(
2721 CollectionHandle &c,
2722 const ghobject_t& oid,
2723 struct stat *st,
2724 bool allow_eio = false) override;
2725 int read(
2726 CollectionHandle &c,
2727 const ghobject_t& oid,
2728 uint64_t offset,
2729 size_t len,
2730 ceph::buffer::list& bl,
2731 uint32_t op_flags = 0) override;
2732
2733 private:
2734
2735 // --------------------------------------------------------
2736 // intermediate data structures used while reading
2737 struct region_t {
2738 uint64_t logical_offset;
2739 uint64_t blob_xoffset; //region offset within the blob
2740 uint64_t length;
2741
2742 // used later in read process
2743 uint64_t front = 0;
2744
2745 region_t(uint64_t offset, uint64_t b_offs, uint64_t len, uint64_t front = 0)
2746 : logical_offset(offset),
2747 blob_xoffset(b_offs),
2748 length(len),
2749 front(front){}
2750 region_t(const region_t& from)
2751 : logical_offset(from.logical_offset),
2752 blob_xoffset(from.blob_xoffset),
2753 length(from.length),
2754 front(from.front){}
2755
2756 friend std::ostream& operator<<(std::ostream& out, const region_t& r) {
2757 return out << "0x" << std::hex << r.logical_offset << ":"
2758 << r.blob_xoffset << "~" << r.length << std::dec;
2759 }
2760 };
2761
2762 // merged blob read request
2763 struct read_req_t {
2764 uint64_t r_off = 0;
2765 uint64_t r_len = 0;
2766 ceph::buffer::list bl;
2767 std::list<region_t> regs; // original read regions
2768
2769 read_req_t(uint64_t off, uint64_t len) : r_off(off), r_len(len) {}
2770
2771 friend std::ostream& operator<<(std::ostream& out, const read_req_t& r) {
2772 out << "{<0x" << std::hex << r.r_off << ", 0x" << r.r_len << "> : [";
2773 for (const auto& reg : r.regs)
2774 out << reg;
2775 return out << "]}" << std::dec;
2776 }
2777 };
2778
2779 typedef std::list<read_req_t> regions2read_t;
2780 typedef std::map<BlueStore::BlobRef, regions2read_t> blobs2read_t;
2781
2782 void _read_cache(
2783 OnodeRef o,
2784 uint64_t offset,
2785 size_t length,
2786 int read_cache_policy,
2787 ready_regions_t& ready_regions,
2788 blobs2read_t& blobs2read);
2789
2790
2791 int _prepare_read_ioc(
2792 blobs2read_t& blobs2read,
2793 std::vector<ceph::buffer::list>* compressed_blob_bls,
2794 IOContext* ioc);
2795
2796 int _generate_read_result_bl(
2797 OnodeRef o,
2798 uint64_t offset,
2799 size_t length,
2800 ready_regions_t& ready_regions,
2801 std::vector<ceph::buffer::list>& compressed_blob_bls,
2802 blobs2read_t& blobs2read,
2803 bool buffered,
2804 bool* csum_error,
2805 ceph::buffer::list& bl);
2806
2807 int _do_read(
2808 Collection *c,
2809 OnodeRef o,
2810 uint64_t offset,
2811 size_t len,
2812 ceph::buffer::list& bl,
2813 uint32_t op_flags = 0,
2814 uint64_t retry_count = 0);
2815
2816 int _do_readv(
2817 Collection *c,
2818 OnodeRef o,
2819 const interval_set<uint64_t>& m,
2820 ceph::buffer::list& bl,
2821 uint32_t op_flags = 0,
2822 uint64_t retry_count = 0);
2823
2824 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2825 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2826 public:
2827 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2828 uint64_t offset, size_t len, ceph::buffer::list& bl) override;
2829 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2830 uint64_t offset, size_t len, std::map<uint64_t, uint64_t>& destmap) override;
2831
2832 int readv(
2833 CollectionHandle &c_,
2834 const ghobject_t& oid,
2835 interval_set<uint64_t>& m,
2836 ceph::buffer::list& bl,
2837 uint32_t op_flags) override;
2838
2839 int dump_onode(CollectionHandle &c, const ghobject_t& oid,
2840 const std::string& section_name, ceph::Formatter *f) override;
2841
2842 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2843 ceph::buffer::ptr& value) override;
2844
2845 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2846 std::map<std::string,ceph::buffer::ptr>& aset) override;
2847
2848 int list_collections(std::vector<coll_t>& ls) override;
2849
2850 CollectionHandle open_collection(const coll_t &c) override;
2851 CollectionHandle create_new_collection(const coll_t& cid) override;
2852 void set_collection_commit_queue(const coll_t& cid,
2853 ContextQueue *commit_queue) override;
2854
2855 bool collection_exists(const coll_t& c) override;
2856 int collection_empty(CollectionHandle& c, bool *empty) override;
2857 int collection_bits(CollectionHandle& c) override;
2858
2859 int collection_list(CollectionHandle &c,
2860 const ghobject_t& start,
2861 const ghobject_t& end,
2862 int max,
2863 std::vector<ghobject_t> *ls, ghobject_t *next) override;
2864
2865 int collection_list_legacy(CollectionHandle &c,
2866 const ghobject_t& start,
2867 const ghobject_t& end,
2868 int max,
2869 std::vector<ghobject_t> *ls,
2870 ghobject_t *next) override;
2871
2872 int omap_get(
2873 CollectionHandle &c, ///< [in] Collection containing oid
2874 const ghobject_t &oid, ///< [in] Object containing omap
2875 ceph::buffer::list *header, ///< [out] omap header
2876     std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2877 ) override;
2878 int _omap_get(
2879 Collection *c, ///< [in] Collection containing oid
2880 const ghobject_t &oid, ///< [in] Object containing omap
2881 ceph::buffer::list *header, ///< [out] omap header
2882     std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2883 );
2884 int _onode_omap_get(
2885 const OnodeRef &o, ///< [in] Object containing omap
2886 ceph::buffer::list *header, ///< [out] omap header
2887     std::map<std::string, ceph::buffer::list> *out ///< [out] Key to value map
2888 );
2889
2890
2891 /// Get omap header
2892 int omap_get_header(
2893 CollectionHandle &c, ///< [in] Collection containing oid
2894 const ghobject_t &oid, ///< [in] Object containing omap
2895 ceph::buffer::list *header, ///< [out] omap header
2896 bool allow_eio = false ///< [in] don't assert on eio
2897 ) override;
2898
2899 /// Get keys defined on oid
2900 int omap_get_keys(
2901 CollectionHandle &c, ///< [in] Collection containing oid
2902 const ghobject_t &oid, ///< [in] Object containing omap
2903 std::set<std::string> *keys ///< [out] Keys defined on oid
2904 ) override;
2905
2906 /// Get key values
2907 int omap_get_values(
2908 CollectionHandle &c, ///< [in] Collection containing oid
2909 const ghobject_t &oid, ///< [in] Object containing omap
2910 const std::set<std::string> &keys, ///< [in] Keys to get
2911 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
2912 ) override;
2913
2914 #ifdef WITH_SEASTAR
2915 int omap_get_values(
2916 CollectionHandle &c, ///< [in] Collection containing oid
2917 const ghobject_t &oid, ///< [in] Object containing omap
2918 const std::optional<std::string> &start_after, ///< [in] Keys to get
2919 std::map<std::string, ceph::buffer::list> *out ///< [out] Returned keys and values
2920 ) override;
2921 #endif
2922
2923 /// Filters keys into out which are defined on oid
2924 int omap_check_keys(
2925 CollectionHandle &c, ///< [in] Collection containing oid
2926 const ghobject_t &oid, ///< [in] Object containing omap
2927 const std::set<std::string> &keys, ///< [in] Keys to check
2928 std::set<std::string> *out ///< [out] Subset of keys defined on oid
2929 ) override;
2930
2931 ObjectMap::ObjectMapIterator get_omap_iterator(
2932 CollectionHandle &c, ///< [in] collection
2933 const ghobject_t &oid ///< [in] object
2934 ) override;
2935
2936 void set_fsid(uuid_d u) override {
2937 fsid = u;
2938 }
2939 uuid_d get_fsid() override {
2940 return fsid;
2941 }
2942
2943 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2944 return num_objects * 300; //assuming per-object overhead is 300 bytes
2945 }
2946
2947 struct BSPerfTracker {
2948 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2949 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2950
2951 objectstore_perf_stat_t get_cur_stats() const {
2952 objectstore_perf_stat_t ret;
2953 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2954 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2955 return ret;
2956 }
2957
2958 void update_from_perfcounters(PerfCounters &logger);
2959 } perf_tracker;
2960
2961 objectstore_perf_stat_t get_cur_stats() override {
2962 perf_tracker.update_from_perfcounters(*logger);
2963 return perf_tracker.get_cur_stats();
2964 }
2965 const PerfCounters* get_perf_counters() const override {
2966 return logger;
2967 }
2968 const PerfCounters* get_bluefs_perf_counters() const {
2969 return bluefs->get_perf_counters();
2970 }
2971 KeyValueDB* get_kv() {
2972 return db;
2973 }
2974
2975 int queue_transactions(
2976 CollectionHandle& ch,
2977 std::vector<Transaction>& tls,
2978 TrackedOpRef op = TrackedOpRef(),
2979 ThreadPool::TPHandle *handle = NULL) override;
2980
2981 // error injection
2982 void inject_data_error(const ghobject_t& o) override {
2983 std::unique_lock l(debug_read_error_lock);
2984 debug_data_error_objects.insert(o);
2985 }
2986 void inject_mdata_error(const ghobject_t& o) override {
2987 std::unique_lock l(debug_read_error_lock);
2988 debug_mdata_error_objects.insert(o);
2989 }
2990
2991 /// methods to inject various errors fsck can repair
2992 void inject_broken_shared_blob_key(const std::string& key,
2993 const ceph::buffer::list& bl);
2994 void inject_leaked(uint64_t len);
2995 void inject_false_free(coll_t cid, ghobject_t oid);
2996 void inject_statfs(const std::string& key, const store_statfs_t& new_statfs);
2997 void inject_global_statfs(const store_statfs_t& new_statfs);
2998 void inject_misreference(coll_t cid1, ghobject_t oid1,
2999 coll_t cid2, ghobject_t oid2,
3000 uint64_t offset);
3001 void inject_zombie_spanning_blob(coll_t cid, ghobject_t oid, int16_t blob_id);
3002 // resets global per_pool_omap in DB
3003 void inject_legacy_omap();
3004 // resets per_pool_omap | pgmeta_omap for onode
3005 void inject_legacy_omap(coll_t cid, ghobject_t oid);
3006
3007 void compact() override {
3008 ceph_assert(db);
3009 db->compact();
3010 }
3011 bool has_builtin_csum() const override {
3012 return true;
3013 }
3014
3015 inline void log_latency(const char* name,
3016 int idx,
3017 const ceph::timespan& lat,
3018 double lat_threshold,
3019 const char* info = "") const;
3020
3021 inline void log_latency_fn(const char* name,
3022 int idx,
3023 const ceph::timespan& lat,
3024 double lat_threshold,
3025 std::function<std::string (const ceph::timespan& lat)> fn) const;
3026
3027 private:
3028 bool _debug_data_eio(const ghobject_t& o) {
3029 if (!cct->_conf->bluestore_debug_inject_read_err) {
3030 return false;
3031 }
3032 std::shared_lock l(debug_read_error_lock);
3033 return debug_data_error_objects.count(o);
3034 }
3035 bool _debug_mdata_eio(const ghobject_t& o) {
3036 if (!cct->_conf->bluestore_debug_inject_read_err) {
3037 return false;
3038 }
3039 std::shared_lock l(debug_read_error_lock);
3040 return debug_mdata_error_objects.count(o);
3041 }
3042 void _debug_obj_on_delete(const ghobject_t& o) {
3043 if (cct->_conf->bluestore_debug_inject_read_err) {
3044 std::unique_lock l(debug_read_error_lock);
3045 debug_data_error_objects.erase(o);
3046 debug_mdata_error_objects.erase(o);
3047 }
3048 }
3049 private:
3050 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
3051 std::string failed_cmode;
3052 std::set<std::string> failed_compressors;
3053 std::string spillover_alert;
3054 std::string legacy_statfs_alert;
3055 std::string no_per_pool_omap_alert;
3056 std::string no_per_pg_omap_alert;
3057 std::string disk_size_mismatch_alert;
3058 std::string spurious_read_errors_alert;
3059
3060 void _log_alerts(osd_alert_list_t& alerts);
3061 bool _set_compression_alert(bool cmode, const char* s) {
3062 std::lock_guard l(qlock);
3063 if (cmode) {
3064 bool ret = failed_cmode.empty();
3065 failed_cmode = s;
3066 return ret;
3067 }
3068 return failed_compressors.emplace(s).second;
3069 }
3070 void _clear_compression_alert() {
3071 std::lock_guard l(qlock);
3072 failed_compressors.clear();
3073 failed_cmode.clear();
3074 }
3075
3076 void _set_spillover_alert(const std::string& s) {
3077 std::lock_guard l(qlock);
3078 spillover_alert = s;
3079 }
3080 void _clear_spillover_alert() {
3081 std::lock_guard l(qlock);
3082 spillover_alert.clear();
3083 }
3084
3085 void _check_legacy_statfs_alert();
3086 void _check_no_per_pg_or_pool_omap_alert();
3087 void _set_disk_size_mismatch_alert(const std::string& s) {
3088 std::lock_guard l(qlock);
3089 disk_size_mismatch_alert = s;
3090 }
3091   void _set_spurious_read_errors_alert(const std::string& s) {
3092 std::lock_guard l(qlock);
3093 spurious_read_errors_alert = s;
3094 }
3095
3096 private:
3097
3098 // --------------------------------------------------------
3099 // read processing internal methods
3100 int _verify_csum(
3101 OnodeRef& o,
3102 const bluestore_blob_t* blob,
3103 uint64_t blob_xoffset,
3104 const ceph::buffer::list& bl,
3105 uint64_t logical_offset) const;
3106 int _decompress(ceph::buffer::list& source, ceph::buffer::list* result);
3107
3108
3109 // --------------------------------------------------------
3110 // write ops
3111
3112 struct WriteContext {
3113 bool buffered = false; ///< buffered write
3114 bool compress = false; ///< compressed write
3115 uint64_t target_blob_size = 0; ///< target (max) blob size
3116 unsigned csum_order = 0; ///< target checksum chunk order
3117
3118 old_extent_map_t old_extents; ///< must deref these blobs
3119 interval_set<uint64_t> extents_to_gc; ///< extents for garbage collection
3120
3121 struct write_item {
3122 uint64_t logical_offset; ///< write logical offset
3123 BlobRef b;
3124 uint64_t blob_length;
3125 uint64_t b_off;
3126 ceph::buffer::list bl;
3127 uint64_t b_off0; ///< original offset in a blob prior to padding
3128 uint64_t length0; ///< original data length prior to padding
3129
3130 bool mark_unused;
3131 bool new_blob; ///< whether new blob was created
3132
3133 bool compressed = false;
3134 ceph::buffer::list compressed_bl;
3135 size_t compressed_len = 0;
3136
3137 write_item(
3138 uint64_t logical_offs,
3139 BlobRef b,
3140 uint64_t blob_len,
3141 uint64_t o,
3142 ceph::buffer::list& bl,
3143 uint64_t o0,
3144 uint64_t l0,
3145 bool _mark_unused,
3146 bool _new_blob)
3147 :
3148 logical_offset(logical_offs),
3149 b(b),
3150 blob_length(blob_len),
3151 b_off(o),
3152 bl(bl),
3153 b_off0(o0),
3154 length0(l0),
3155 mark_unused(_mark_unused),
3156 new_blob(_new_blob) {}
3157 };
3158 std::vector<write_item> writes; ///< blobs we're writing
3159
3160 /// partial clone of the context
3161 void fork(const WriteContext& other) {
3162 buffered = other.buffered;
3163 compress = other.compress;
3164 target_blob_size = other.target_blob_size;
3165 csum_order = other.csum_order;
3166 }
3167 void write(
3168 uint64_t loffs,
3169 BlobRef b,
3170 uint64_t blob_len,
3171 uint64_t o,
3172 ceph::buffer::list& bl,
3173 uint64_t o0,
3174 uint64_t len0,
3175 bool _mark_unused,
3176 bool _new_blob) {
3177 writes.emplace_back(loffs,
3178 b,
3179 blob_len,
3180 o,
3181 bl,
3182 o0,
3183 len0,
3184 _mark_unused,
3185 _new_blob);
3186 }
3187 /// Checks for writes to the same pextent within a blob
3188 bool has_conflict(
3189 BlobRef b,
3190 uint64_t loffs,
3191 uint64_t loffs_end,
3192 uint64_t min_alloc_size);
3193 };
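   // Sketch of how a derived WriteContext is seeded and filled during a write
   // (simplified; the offsets, lengths and flags below are placeholders):
   //
   //   WriteContext gc_wctx;
   //   gc_wctx.fork(wctx);                 // inherit buffered/compress/blob size/csum order
   //   ...
   //   wctx.write(logical_off, blob, blob_len,
   //              b_off, padded_bl,        // payload, possibly padded
   //              b_off0, len0,            // pre-padding offset/length
   //              mark_unused, new_blob);  // appended to wctx.writes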
3194
3195 void _do_write_small(
3196 TransContext *txc,
3197 CollectionRef &c,
3198 OnodeRef o,
3199 uint64_t offset, uint64_t length,
3200 ceph::buffer::list::iterator& blp,
3201 WriteContext *wctx);
3202 void _do_write_big_apply_deferred(
3203 TransContext* txc,
3204 CollectionRef& c,
3205 OnodeRef o,
3206 BigDeferredWriteContext& dctx,
3207 bufferlist::iterator& blp,
3208 WriteContext* wctx);
3209 void _do_write_big(
3210 TransContext *txc,
3211 CollectionRef &c,
3212 OnodeRef o,
3213 uint64_t offset, uint64_t length,
3214 ceph::buffer::list::iterator& blp,
3215 WriteContext *wctx);
3216 int _do_alloc_write(
3217 TransContext *txc,
3218 CollectionRef c,
3219 OnodeRef o,
3220 WriteContext *wctx);
3221 void _wctx_finish(
3222 TransContext *txc,
3223 CollectionRef& c,
3224 OnodeRef o,
3225 WriteContext *wctx,
3226 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3227
3228 int _write(TransContext *txc,
3229 CollectionRef& c,
3230 OnodeRef& o,
3231 uint64_t offset, size_t len,
3232 ceph::buffer::list& bl,
3233 uint32_t fadvise_flags);
3234 void _pad_zeros(ceph::buffer::list *bl, uint64_t *offset,
3235 uint64_t chunk_size);
3236
3237 void _choose_write_options(CollectionRef& c,
3238 OnodeRef o,
3239 uint32_t fadvise_flags,
3240 WriteContext *wctx);
3241
3242 int _do_gc(TransContext *txc,
3243 CollectionRef& c,
3244 OnodeRef o,
3245 const WriteContext& wctx,
3246 uint64_t *dirty_start,
3247 uint64_t *dirty_end);
3248
3249 int _do_write(TransContext *txc,
3250 CollectionRef &c,
3251 OnodeRef o,
3252 uint64_t offset, uint64_t length,
3253 ceph::buffer::list& bl,
3254 uint32_t fadvise_flags);
3255 void _do_write_data(TransContext *txc,
3256 CollectionRef& c,
3257 OnodeRef o,
3258 uint64_t offset,
3259 uint64_t length,
3260 ceph::buffer::list& bl,
3261 WriteContext *wctx);
3262
3263 int _touch(TransContext *txc,
3264 CollectionRef& c,
3265 OnodeRef& o);
3266 int _do_zero(TransContext *txc,
3267 CollectionRef& c,
3268 OnodeRef& o,
3269 uint64_t offset, size_t len);
3270 int _zero(TransContext *txc,
3271 CollectionRef& c,
3272 OnodeRef& o,
3273 uint64_t offset, size_t len);
3274 void _do_truncate(TransContext *txc,
3275 CollectionRef& c,
3276 OnodeRef o,
3277 uint64_t offset,
3278 std::set<SharedBlob*> *maybe_unshared_blobs=0);
3279 int _truncate(TransContext *txc,
3280 CollectionRef& c,
3281 OnodeRef& o,
3282 uint64_t offset);
3283 int _remove(TransContext *txc,
3284 CollectionRef& c,
3285 OnodeRef& o);
3286 int _do_remove(TransContext *txc,
3287 CollectionRef& c,
3288 OnodeRef o);
3289 int _setattr(TransContext *txc,
3290 CollectionRef& c,
3291 OnodeRef& o,
3292 const std::string& name,
3293 ceph::buffer::ptr& val);
3294 int _setattrs(TransContext *txc,
3295 CollectionRef& c,
3296 OnodeRef& o,
3297 const std::map<std::string,ceph::buffer::ptr>& aset);
3298 int _rmattr(TransContext *txc,
3299 CollectionRef& c,
3300 OnodeRef& o,
3301 const std::string& name);
3302 int _rmattrs(TransContext *txc,
3303 CollectionRef& c,
3304 OnodeRef& o);
3305 void _do_omap_clear(TransContext *txc, OnodeRef &o);
3306 int _omap_clear(TransContext *txc,
3307 CollectionRef& c,
3308 OnodeRef& o);
3309 int _omap_setkeys(TransContext *txc,
3310 CollectionRef& c,
3311 OnodeRef& o,
3312 ceph::buffer::list& bl);
3313 int _omap_setheader(TransContext *txc,
3314 CollectionRef& c,
3315 OnodeRef& o,
3316 ceph::buffer::list& header);
3317 int _omap_rmkeys(TransContext *txc,
3318 CollectionRef& c,
3319 OnodeRef& o,
3320 ceph::buffer::list& bl);
3321 int _omap_rmkey_range(TransContext *txc,
3322 CollectionRef& c,
3323 OnodeRef& o,
3324 const std::string& first, const std::string& last);
3325 int _set_alloc_hint(
3326 TransContext *txc,
3327 CollectionRef& c,
3328 OnodeRef& o,
3329 uint64_t expected_object_size,
3330 uint64_t expected_write_size,
3331 uint32_t flags);
3332 int _do_clone_range(TransContext *txc,
3333 CollectionRef& c,
3334 OnodeRef& oldo,
3335 OnodeRef& newo,
3336 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3337 int _clone(TransContext *txc,
3338 CollectionRef& c,
3339 OnodeRef& oldo,
3340 OnodeRef& newo);
3341 int _clone_range(TransContext *txc,
3342 CollectionRef& c,
3343 OnodeRef& oldo,
3344 OnodeRef& newo,
3345 uint64_t srcoff, uint64_t length, uint64_t dstoff);
3346 int _rename(TransContext *txc,
3347 CollectionRef& c,
3348 OnodeRef& oldo,
3349 OnodeRef& newo,
3350 const ghobject_t& new_oid);
3351 int _create_collection(TransContext *txc, const coll_t &cid,
3352 unsigned bits, CollectionRef *c);
3353 int _remove_collection(TransContext *txc, const coll_t &cid,
3354 CollectionRef *c);
3355 void _do_remove_collection(TransContext *txc, CollectionRef *c);
3356 int _split_collection(TransContext *txc,
3357 CollectionRef& c,
3358 CollectionRef& d,
3359 unsigned bits, int rem);
3360 int _merge_collection(TransContext *txc,
3361 CollectionRef *c,
3362 CollectionRef& d,
3363 unsigned bits);
3364
3365 void _collect_allocation_stats(uint64_t need, uint32_t alloc_size,
3366 size_t extents);
3367 void _record_allocation_stats();
3368 private:
3369 uint64_t probe_count = 0;
3370 std::atomic<uint64_t> alloc_stats_count = {0};
3371 std::atomic<uint64_t> alloc_stats_fragments = { 0 };
3372 std::atomic<uint64_t> alloc_stats_size = { 0 };
3373 //
3374 std::array<std::tuple<uint64_t, uint64_t, uint64_t>, 5> alloc_stats_history =
3375 { std::make_tuple(0ul, 0ul, 0ul) };
3376
3377 inline bool _use_rotational_settings();
3378
3379 public:
3380 struct sb_info_t {
3381 coll_t cid;
3382 int64_t pool_id = INT64_MIN;
3383 std::list<ghobject_t> oids;
3384 BlueStore::SharedBlobRef sb;
3385 bluestore_extent_ref_map_t ref_map;
3386 bool compressed = false;
3387 bool passed = false;
3388 bool updated = false;
3389 };
3390 typedef btree::btree_set<
3391 uint64_t, std::less<uint64_t>,
3392 mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
3393
3394 typedef mempool::bluestore_fsck::map<uint64_t, sb_info_t> sb_info_map_t;
3395 struct FSCK_ObjectCtx {
3396 int64_t& errors;
3397 int64_t& warnings;
3398 uint64_t& num_objects;
3399 uint64_t& num_extents;
3400 uint64_t& num_blobs;
3401 uint64_t& num_sharded_objects;
3402 uint64_t& num_spanning_blobs;
3403
3404 mempool_dynamic_bitset* used_blocks;
3405 uint64_t_btree_t* used_omap_head;
3406
3407 ceph::mutex* sb_info_lock;
3408 sb_info_map_t& sb_info;
3409
3410 store_statfs_t& expected_store_statfs;
3411 per_pool_statfs& expected_pool_statfs;
3412 BlueStoreRepairer* repairer;
3413
3414 FSCK_ObjectCtx(int64_t& e,
3415 int64_t& w,
3416 uint64_t& _num_objects,
3417 uint64_t& _num_extents,
3418 uint64_t& _num_blobs,
3419 uint64_t& _num_sharded_objects,
3420 uint64_t& _num_spanning_blobs,
3421 mempool_dynamic_bitset* _ub,
3422 uint64_t_btree_t* _used_omap_head,
3423 ceph::mutex* _sb_info_lock,
3424 sb_info_map_t& _sb_info,
3425 store_statfs_t& _store_statfs,
3426 per_pool_statfs& _pool_statfs,
3427 BlueStoreRepairer* _repairer) :
3428 errors(e),
3429 warnings(w),
3430 num_objects(_num_objects),
3431 num_extents(_num_extents),
3432 num_blobs(_num_blobs),
3433 num_sharded_objects(_num_sharded_objects),
3434 num_spanning_blobs(_num_spanning_blobs),
3435 used_blocks(_ub),
3436 used_omap_head(_used_omap_head),
3437 sb_info_lock(_sb_info_lock),
3438 sb_info(_sb_info),
3439 expected_store_statfs(_store_statfs),
3440 expected_pool_statfs(_pool_statfs),
3441 repairer(_repairer) {
3442 }
3443 };
3444
3445 OnodeRef fsck_check_objects_shallow(
3446 FSCKDepth depth,
3447 int64_t pool_id,
3448 CollectionRef c,
3449 const ghobject_t& oid,
3450 const std::string& key,
3451 const ceph::buffer::list& value,
3452 mempool::bluestore_fsck::list<std::string>* expecting_shards,
3453 std::map<BlobRef, bluestore_blob_t::unused_t>* referenced,
3454 const BlueStore::FSCK_ObjectCtx& ctx);
3455
3456 private:
3457 void _fsck_check_object_omap(FSCKDepth depth,
3458 OnodeRef& o,
3459 const BlueStore::FSCK_ObjectCtx& ctx);
3460
3461 void _fsck_check_objects(FSCKDepth depth,
3462 FSCK_ObjectCtx& ctx);
3463 };
3464
3465 inline std::ostream& operator<<(std::ostream& out, const BlueStore::volatile_statfs& s) {
3466 return out
3467 << " allocated:"
3468 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3469 << " stored:"
3470 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3471 << " compressed:"
3472 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3473 << " compressed_orig:"
3474 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3475 << " compressed_alloc:"
3476 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3477 }
3478
3479 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3480 o->get();
3481 }
3482 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3483 o->put();
3484 }
3485
3486 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3487 o->get();
3488 }
3489 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3490 o->put();
3491 }
3492
3493 class BlueStoreRepairer
3494 {
3495 public:
3496 // to simplify future potential migration to mempools
3497 using fsck_interval = interval_set<uint64_t>;
3498
3499   // Structure to track which pextents are used by a specific cid/oid.
3500   // As with a Bloom filter, only positive and false-positive matches are
3501   // possible.
3502   // Maintains two lists of bloom filters, one for cids and one for oids,
3503   // where each list entry is a BF covering a specific disk pextent range.
3504   // The extent length covered per filter is computed on init.
3505   // Allows filtering out 'uninteresting' pextents to speed up subsequent
3506   // 'is_used' access.
3507 struct StoreSpaceTracker {
3508 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3509 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3510     const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
3511 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3512
3513 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3514 bloom_vector collections_bfs;
3515 bloom_vector objects_bfs;
3516
3517 bool was_filtered_out = false;
3518 uint64_t granularity = 0; // extent length for a single filter
3519
3520 StoreSpaceTracker() {
3521 }
3522 StoreSpaceTracker(const StoreSpaceTracker& from) :
3523 collections_bfs(from.collections_bfs),
3524 objects_bfs(from.objects_bfs),
3525 granularity(from.granularity) {
3526 }
3527
3528 void init(uint64_t total,
3529 uint64_t min_alloc_size,
3530 uint64_t mem_cap = DEF_MEM_CAP) {
3531 ceph_assert(!granularity); // not initialized yet
3532 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3533 ceph_assert(mem_cap);
3534
3535 total = round_up_to(total, min_alloc_size);
3536 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3537
3538 if (!granularity) {
3539 granularity = min_alloc_size;
3540 } else {
3541 granularity = round_up_to(granularity, min_alloc_size);
3542 }
3543
3544 uint64_t entries = round_up_to(total, granularity) / granularity;
3545 collections_bfs.resize(entries,
3546 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3547 BLOOM_FILTER_TABLE_SIZE,
3548 0,
3549 BLOOM_FILTER_EXPECTED_COUNT));
3550 objects_bfs.resize(entries,
3551 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3552 BLOOM_FILTER_TABLE_SIZE,
3553 0,
3554 BLOOM_FILTER_EXPECTED_COUNT));
3555 }
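     // Worked example for the sizing above (illustrative numbers): with
     // total = 1 TiB, mem_cap = DEF_MEM_CAP (128 MiB) and 4 KiB min_alloc_size,
     //   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB,
     //   entries     = 2^40 / 2^19        = 2,097,152 filters per list,
     // so the two bloom_vector lists consume roughly
     // 2,097,152 * 2 * 32 B = 128 MiB of filter tables, i.e. about mem_cap.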
3556 inline uint32_t get_hash(const coll_t& cid) const {
3557 return cid.hash_to_shard(1);
3558 }
3559 inline void set_used(uint64_t offset, uint64_t len,
3560 const coll_t& cid, const ghobject_t& oid) {
3561 ceph_assert(granularity); // initialized
3562
3563 // can't call this func after filter_out has been applied
3564 ceph_assert(!was_filtered_out);
3565 if (!len) {
3566 return;
3567 }
3568 auto pos = offset / granularity;
3569 auto end_pos = (offset + len - 1) / granularity;
3570 while (pos <= end_pos) {
3571 collections_bfs[pos].insert(get_hash(cid));
3572 objects_bfs[pos].insert(oid.hobj.get_hash());
3573 ++pos;
3574 }
3575 }
3576     // filter out entries unrelated to the specified (broken) extents;
3577     // only 'is_used' calls are permitted after that
3578 size_t filter_out(const fsck_interval& extents);
3579
3580     // determines whether the collection is present after filtering out
3581 inline bool is_used(const coll_t& cid) const {
3582 ceph_assert(was_filtered_out);
3583 for(auto& bf : collections_bfs) {
3584 if (bf.contains(get_hash(cid))) {
3585 return true;
3586 }
3587 }
3588 return false;
3589 }
3590     // determines whether the object is present after filtering out
3591 inline bool is_used(const ghobject_t& oid) const {
3592 ceph_assert(was_filtered_out);
3593 for(auto& bf : objects_bfs) {
3594 if (bf.contains(oid.hobj.get_hash())) {
3595 return true;
3596 }
3597 }
3598 return false;
3599 }
3600 // determines if the collection is present before filtering out
3601 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3602 ceph_assert(granularity); // initialized
3603 ceph_assert(!was_filtered_out);
3604 auto &bf = collections_bfs[offs / granularity];
3605 if (bf.contains(get_hash(cid))) {
3606 return true;
3607 }
3608 return false;
3609 }
3610 // determines if the object is present before filtering out
3611 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3612 ceph_assert(granularity); // initialized
3613 ceph_assert(!was_filtered_out);
3614 auto &bf = objects_bfs[offs / granularity];
3615 if (bf.contains(oid.hobj.get_hash())) {
3616 return true;
3617 }
3618 return false;
3619 }
3620 };
3621 public:
3622 void fix_per_pool_omap(KeyValueDB *db, int);
3623 bool remove_key(KeyValueDB *db, const std::string& prefix, const std::string& key);
3624 bool fix_shared_blob(KeyValueDB *db,
3625 uint64_t sbid,
3626 const ceph::buffer::list* bl);
3627 bool fix_statfs(KeyValueDB *db, const std::string& key,
3628 const store_statfs_t& new_statfs);
3629
3630 bool fix_leaked(KeyValueDB *db,
3631 FreelistManager* fm,
3632 uint64_t offset, uint64_t len);
3633 bool fix_false_free(KeyValueDB *db,
3634 FreelistManager* fm,
3635 uint64_t offset, uint64_t len);
3636 KeyValueDB::Transaction fix_spanning_blobs(KeyValueDB* db);
3637
3638 void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3639
3640 bool preprocess_misreference(KeyValueDB *db);
3641
3642 unsigned apply(KeyValueDB* db);
3643
3644 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3645 misreferenced_extents.union_insert(offs, len);
3646 if (inc_error) {
3647 ++to_repair_cnt;
3648 }
3649 }
3650 // Note: this is the only repairer method that is thread-safe!
3651 void inc_repaired() {
3652 ++to_repair_cnt;
3653 }
3654
3655 StoreSpaceTracker& get_space_usage_tracker() {
3656 return space_usage_tracker;
3657 }
3658 const fsck_interval& get_misreferences() const {
3659 return misreferenced_extents;
3660 }
3661 KeyValueDB::Transaction get_fix_misreferences_txn() {
3662 return fix_misreferences_txn;
3663 }
3664
3665 private:
3666 std::atomic<unsigned> to_repair_cnt = { 0 };
3667 KeyValueDB::Transaction fix_per_pool_omap_txn;
3668 KeyValueDB::Transaction fix_fm_leaked_txn;
3669 KeyValueDB::Transaction fix_fm_false_free_txn;
3670 KeyValueDB::Transaction remove_key_txn;
3671 KeyValueDB::Transaction fix_statfs_txn;
3672 KeyValueDB::Transaction fix_shared_blob_txn;
3673
3674 KeyValueDB::Transaction fix_misreferences_txn;
3675 KeyValueDB::Transaction fix_onode_txn;
3676
3677 StoreSpaceTracker space_usage_tracker;
3678
3679 // non-shared extents with multiple references
3680 fsck_interval misreferenced_extents;
3681
3682 };
3683
3684 class RocksDBBlueFSVolumeSelector : public BlueFSVolumeSelector
3685 {
3686 template <class T, size_t MaxX, size_t MaxY>
3687 class matrix_2d {
3688 T values[MaxX][MaxY];
3689 public:
3690 matrix_2d() {
3691 clear();
3692 }
3693 T& at(size_t x, size_t y) {
3694 ceph_assert(x < MaxX);
3695 ceph_assert(y < MaxY);
3696
3697 return values[x][y];
3698 }
3699 size_t get_max_x() const {
3700 return MaxX;
3701 }
3702 size_t get_max_y() const {
3703 return MaxY;
3704 }
3705 void clear() {
3706 memset(values, 0, sizeof(values));
3707 }
3708 };
3709
3710 enum {
3711 // use 0/nullptr as unset indication
3712 LEVEL_FIRST = 1,
3713 LEVEL_LOG = LEVEL_FIRST, // BlueFS log
3714 LEVEL_WAL,
3715 LEVEL_DB,
3716 LEVEL_SLOW,
3717 LEVEL_MAX
3718 };
3719 // add +1 row for corresponding per-device totals
3720 // add +1 column for per-level actual (taken from file size) total
3721 typedef matrix_2d<uint64_t, BlueFS::MAX_BDEV + 1, LEVEL_MAX - LEVEL_FIRST + 1> per_level_per_dev_usage_t;
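// Index cheat-sheet for the extra slots, as maintained by add_usage() /
// sub_usage() below (level_idx = LEVEL_x - LEVEL_FIRST):
//   at(bdev, level_idx)               - bytes of 'level' extents on 'bdev'
//   at(bdev, LEVEL_MAX - LEVEL_FIRST) - per-device total across levels
//   at(BlueFS::MAX_BDEV, level_idx)   - per-level actual total (from file sizes)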
3722
3723 per_level_per_dev_usage_t per_level_per_dev_usage;
3724 // file count per level; one extra slot keeps the total file count
3725 uint64_t per_level_files[LEVEL_MAX - LEVEL_FIRST + 1] = { 0 };
3726
3727 // Note: the maximum per-device totals below might be smaller than the
3728 // corresponding perf counters by up to a single alloc unit (1M) due to the
3729 // superblock extent, which is not accounted for here.
3730 per_level_per_dev_usage_t per_level_per_dev_max;
3731
3732 uint64_t l_totals[LEVEL_MAX - LEVEL_FIRST];
3733 uint64_t db_avail4slow = 0;
3734 enum {
3735 OLD_POLICY,
3736 USE_SOME_EXTRA
3737 };
3738
3739 public:
3740 RocksDBBlueFSVolumeSelector(
3741 uint64_t _wal_total,
3742 uint64_t _db_total,
3743 uint64_t _slow_total,
3744 uint64_t _level0_size,
3745 uint64_t _level_base,
3746 uint64_t _level_multiplier,
3747 double reserved_factor,
3748 uint64_t reserved,
3749 bool new_pol)
3750 {
3751 l_totals[LEVEL_LOG - LEVEL_FIRST] = 0; // not used at the moment
3752 l_totals[LEVEL_WAL - LEVEL_FIRST] = _wal_total;
3753 l_totals[LEVEL_DB - LEVEL_FIRST] = _db_total;
3754 l_totals[LEVEL_SLOW - LEVEL_FIRST] = _slow_total;
3755
3756 if (!new_pol) {
3757 return;
3758 }
3759
3760 // Calculate how much extra space is available on the DB volume.
3761 // Depending on whether an explicit reserved size is specified, it is either
3762 // * DB volume size - reserved
3763 // or
3764 // * DB volume size - sum_max_level_size(0, L-1) - max_level_size(L) * reserved_factor
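// Worked example with hypothetical sizes (none of these are defaults):
// _level0_size = 1 GiB, _level_base = 1 GiB, _level_multiplier = 8,
// reserved_factor = 2, _db_total = 60 GiB:
//   step 1: next_threshold = 1 + 1 + 8*2  = 18 GiB,  60 > 18   -> continue
//   step 2: next_threshold = 2 + 8 + 64*2 = 138 GiB, 60 <= 138 -> stop
//   db_avail4slow = 60 - 18 = 42 GiB, i.e. the DB volume minus the 18 GiB
//   kept for L0+L1 plus reserved_factor * L2.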
3765 if (!reserved) {
3766 uint64_t prev_levels = _level0_size;
3767 uint64_t cur_level = _level_base;
3768 uint64_t cur_threshold = 0;
3769 do {
3770 uint64_t next_level = cur_level * _level_multiplier;
3771 uint64_t next_threshold = prev_levels + cur_level + next_level * reserved_factor;
3772 if (_db_total <= next_threshold) {
3773 db_avail4slow = cur_threshold ? _db_total - cur_threshold : 0;
3774 break;
3775 } else {
3776 prev_levels += cur_level;
3777 cur_level = next_level;
3778 cur_threshold = next_threshold;
3779 }
3780 } while (true);
3781 } else {
3782 db_avail4slow = _db_total - reserved;
3783 }
3784 }
3785
3786 void* get_hint_for_log() const override {
3787 return reinterpret_cast<void*>(LEVEL_LOG);
3788 }
3789 void* get_hint_by_dir(const std::string& dirname) const override;
3790
3791 void add_usage(void* hint, const bluefs_fnode_t& fnode) override {
3792 if (hint == nullptr)
3793 return;
3794 size_t pos = (size_t)hint - LEVEL_FIRST;
3795 for (auto& p : fnode.extents) {
3796 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3797 auto& max = per_level_per_dev_max.at(p.bdev, pos);
3798 cur += p.length;
3799 if (cur > max) {
3800 max = cur;
3801 }
3802 {
3803 //update per-device totals
3804 auto& cur = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3805 auto& max = per_level_per_dev_max.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3806 cur += p.length;
3807 if (cur > max) {
3808 max = cur;
3809 }
3810 }
3811 }
3812 {
3813 //update per-level actual totals
3814 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3815 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3816 cur += fnode.size;
3817 if (cur > max) {
3818 max = cur;
3819 }
3820 }
3821 ++per_level_files[pos];
3822 ++per_level_files[LEVEL_MAX - LEVEL_FIRST];
3823 }
3824 void sub_usage(void* hint, const bluefs_fnode_t& fnode) override {
3825 if (hint == nullptr)
3826 return;
3827 size_t pos = (size_t)hint - LEVEL_FIRST;
3828 for (auto& p : fnode.extents) {
3829 auto& cur = per_level_per_dev_usage.at(p.bdev, pos);
3830 ceph_assert(cur >= p.length);
3831 cur -= p.length;
3832
3833 //update per-device totals
3834 auto& cur2 = per_level_per_dev_usage.at(p.bdev, LEVEL_MAX - LEVEL_FIRST);
3835 ceph_assert(cur2 >= p.length);
3836 cur2 -= p.length;
3837 }
3838 //update per-level actual totals
3839 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3840 ceph_assert(cur >= fnode.size);
3841 cur -= fnode.size;
3842 ceph_assert(per_level_files[pos] > 0);
3843 --per_level_files[pos];
3844 ceph_assert(per_level_files[LEVEL_MAX - LEVEL_FIRST] > 0);
3845 --per_level_files[LEVEL_MAX - LEVEL_FIRST];
3846 }
3847 void add_usage(void* hint, uint64_t fsize) override {
3848 if (hint == nullptr)
3849 return;
3850 size_t pos = (size_t)hint - LEVEL_FIRST;
3851 //update per-level actual totals
3852 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3853 auto& max = per_level_per_dev_max.at(BlueFS::MAX_BDEV, pos);
3854 cur += fsize;
3855 if (cur > max) {
3856 max = cur;
3857 }
3858 }
3859 void sub_usage(void* hint, uint64_t fsize) override {
3860 if (hint == nullptr)
3861 return;
3862 size_t pos = (size_t)hint - LEVEL_FIRST;
3863 //update per-level actual totals
3864 auto& cur = per_level_per_dev_usage.at(BlueFS::MAX_BDEV, pos);
3865 ceph_assert(cur >= fsize);
3866 cur -= fsize;
3867 }
3868
3869 uint8_t select_prefer_bdev(void* h) override;
3870 void get_paths(
3871 const std::string& base,
3872 BlueFSVolumeSelector::paths& res) const override;
3873
3874 void dump(std::ostream& sout) override;
3875 };
3876
3877 #endif