1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/ceph_assert.h"
33 #include "include/unordered_map.h"
34 #include "include/mempool.h"
35 #include "common/bloom_filter.hpp"
36 #include "common/Finisher.h"
37 #include "common/Throttle.h"
38 #include "common/perf_counters.h"
39 #include "common/PriorityCache.h"
40 #include "compressor/Compressor.h"
41 #include "os/ObjectStore.h"
42
43 #include "bluestore_types.h"
44 #include "BlockDevice.h"
45 #include "BlueFS.h"
46 #include "common/EventTrace.h"
47
48 class Allocator;
49 class FreelistManager;
50 class BlueStoreRepairer;
51
52 //#define DEBUG_CACHE
53 //#define DEBUG_DEFERRED
54
55
56
57 // constants for Buffer::optimize()
58 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
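// Illustrative note (not in the original source): Buffer::maybe_rebuild()
// below uses this constant, rebuilding a buffer's bufferlist when it is split
// across multiple bufferptrs or when the front bufferptr wastes more than
// 1/8 of the data length, e.g. a 4096-byte buffer is rebuilt once more than
// 4096 / 8 = 512 bytes are wasted.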
59
60
61 enum {
62 l_bluestore_first = 732430,
63 l_bluestore_kv_flush_lat,
64 l_bluestore_kv_commit_lat,
65 l_bluestore_kv_sync_lat,
66 l_bluestore_kv_final_lat,
67 l_bluestore_state_prepare_lat,
68 l_bluestore_state_aio_wait_lat,
69 l_bluestore_state_io_done_lat,
70 l_bluestore_state_kv_queued_lat,
71 l_bluestore_state_kv_committing_lat,
72 l_bluestore_state_kv_done_lat,
73 l_bluestore_state_deferred_queued_lat,
74 l_bluestore_state_deferred_aio_wait_lat,
75 l_bluestore_state_deferred_cleanup_lat,
76 l_bluestore_state_finishing_lat,
77 l_bluestore_state_done_lat,
78 l_bluestore_throttle_lat,
79 l_bluestore_submit_lat,
80 l_bluestore_commit_lat,
81 l_bluestore_read_lat,
82 l_bluestore_read_onode_meta_lat,
83 l_bluestore_read_wait_aio_lat,
84 l_bluestore_compress_lat,
85 l_bluestore_decompress_lat,
86 l_bluestore_csum_lat,
87 l_bluestore_compress_success_count,
88 l_bluestore_compress_rejected_count,
89 l_bluestore_write_pad_bytes,
90 l_bluestore_deferred_write_ops,
91 l_bluestore_deferred_write_bytes,
92 l_bluestore_write_penalty_read_ops,
93 l_bluestore_allocated,
94 l_bluestore_stored,
95 l_bluestore_compressed,
96 l_bluestore_compressed_allocated,
97 l_bluestore_compressed_original,
98 l_bluestore_onodes,
99 l_bluestore_onode_hits,
100 l_bluestore_onode_misses,
101 l_bluestore_onode_shard_hits,
102 l_bluestore_onode_shard_misses,
103 l_bluestore_extents,
104 l_bluestore_blobs,
105 l_bluestore_buffers,
106 l_bluestore_buffer_bytes,
107 l_bluestore_buffer_hit_bytes,
108 l_bluestore_buffer_miss_bytes,
109 l_bluestore_write_big,
110 l_bluestore_write_big_bytes,
111 l_bluestore_write_big_blobs,
112 l_bluestore_write_small,
113 l_bluestore_write_small_bytes,
114 l_bluestore_write_small_unused,
115 l_bluestore_write_small_deferred,
116 l_bluestore_write_small_pre_read,
117 l_bluestore_write_small_new,
118 l_bluestore_txc,
119 l_bluestore_onode_reshard,
120 l_bluestore_blob_split,
121 l_bluestore_extent_compress,
122 l_bluestore_gc_merged,
123 l_bluestore_read_eio,
124 l_bluestore_reads_with_retries,
125 l_bluestore_fragmentation,
126 l_bluestore_omap_seek_to_first_lat,
127 l_bluestore_omap_upper_bound_lat,
128 l_bluestore_omap_lower_bound_lat,
129 l_bluestore_omap_next_lat,
130 l_bluestore_clist_lat,
131 l_bluestore_last
132 };
133
134 #define META_POOL_ID ((uint64_t)-1ull)
135
136 class BlueStore : public ObjectStore,
137 public BlueFSDeviceExpander,
138 public md_config_obs_t {
139 // -----------------------------------------------------
140 // types
141 public:
142 // config observer
143 const char** get_tracked_conf_keys() const override;
144 void handle_conf_change(const ConfigProxy& conf,
145 const std::set<std::string> &changed) override;
146
147 //handler for discard event
148 void handle_discard(interval_set<uint64_t>& to_release);
149
150 void _set_csum();
151 void _set_compression();
152 void _set_throttle_params();
153 int _set_cache_sizes();
154
155 class TransContext;
156
157 typedef map<uint64_t, bufferlist> ready_regions_t;
158
159 struct BufferSpace;
160 struct Collection;
161 typedef boost::intrusive_ptr<Collection> CollectionRef;
162
163 struct AioContext {
164 virtual void aio_finish(BlueStore *store) = 0;
165 virtual ~AioContext() {}
166 };
167
168 /// cached buffer
169 struct Buffer {
170 MEMPOOL_CLASS_HELPERS();
171
172 enum {
173 STATE_EMPTY, ///< empty buffer -- used for cache history
174 STATE_CLEAN, ///< clean data that is up to date
175 STATE_WRITING, ///< data that is being written (io not yet complete)
176 };
177 static const char *get_state_name(int s) {
178 switch (s) {
179 case STATE_EMPTY: return "empty";
180 case STATE_CLEAN: return "clean";
181 case STATE_WRITING: return "writing";
182 default: return "???";
183 }
184 }
185 enum {
186 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
187 // NOTE: fix operator<< when you define a second flag
188 };
189 static const char *get_flag_name(int s) {
190 switch (s) {
191 case FLAG_NOCACHE: return "nocache";
192 default: return "???";
193 }
194 }
195
196 BufferSpace *space;
197 uint16_t state; ///< STATE_*
198 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
199 uint32_t flags; ///< FLAG_*
200 uint64_t seq;
201 uint32_t offset, length;
202 bufferlist data;
203
204 boost::intrusive::list_member_hook<> lru_item;
205 boost::intrusive::list_member_hook<> state_item;
206
207 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
208 unsigned f = 0)
209 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
210 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
211 unsigned f = 0)
212 : space(space), state(s), flags(f), seq(q), offset(o),
213 length(b.length()), data(b) {}
214
215 bool is_empty() const {
216 return state == STATE_EMPTY;
217 }
218 bool is_clean() const {
219 return state == STATE_CLEAN;
220 }
221 bool is_writing() const {
222 return state == STATE_WRITING;
223 }
224
225 uint32_t end() const {
226 return offset + length;
227 }
228
229 void truncate(uint32_t newlen) {
230 ceph_assert(newlen < length);
231 if (data.length()) {
232 bufferlist t;
233 t.substr_of(data, 0, newlen);
234 data.claim(t);
235 }
236 length = newlen;
237 }
238 void maybe_rebuild() {
239 if (data.length() &&
240 (data.get_num_buffers() > 1 ||
241 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
242 data.rebuild();
243 }
244 }
245
246 void dump(Formatter *f) const {
247 f->dump_string("state", get_state_name(state));
248 f->dump_unsigned("seq", seq);
249 f->dump_unsigned("offset", offset);
250 f->dump_unsigned("length", length);
251 f->dump_unsigned("data_length", data.length());
252 }
253 };
254
255 struct Cache;
256
257 /// map logical extent range (object) onto buffers
258 struct BufferSpace {
259 enum {
260 BYPASS_CLEAN_CACHE = 0x1, // bypass clean cache
261 };
262
263 typedef boost::intrusive::list<
264 Buffer,
265 boost::intrusive::member_hook<
266 Buffer,
267 boost::intrusive::list_member_hook<>,
268 &Buffer::state_item> > state_list_t;
269
270 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
271 buffer_map;
272
273 // we use a bare intrusive list here instead of std::map because
274 // it uses less memory and we expect this to be very small (very
275 // few IOs in flight to the same Blob at the same time).
276 state_list_t writing; ///< writing buffers, sorted by seq, ascending
277
278 ~BufferSpace() {
279 ceph_assert(buffer_map.empty());
280 ceph_assert(writing.empty());
281 }
282
283 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
284 cache->_audit("_add_buffer start");
285 buffer_map[b->offset].reset(b);
286 if (b->is_writing()) {
287 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
288 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
289 writing.push_back(*b);
290 } else {
291 auto it = writing.begin();
292 while (it->seq < b->seq) {
293 ++it;
294 }
295
296 ceph_assert(it->seq >= b->seq);
297 // note that this will insert b before it
298 // hence the order is maintained
299 writing.insert(it, *b);
300 }
301 } else {
302 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
303 cache->_add_buffer(b, level, near);
304 }
305 cache->_audit("_add_buffer end");
306 }
307 void _rm_buffer(Cache* cache, Buffer *b) {
308 _rm_buffer(cache, buffer_map.find(b->offset));
309 }
310 void _rm_buffer(Cache* cache,
311 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
312 ceph_assert(p != buffer_map.end());
313 cache->_audit("_rm_buffer start");
314 if (p->second->is_writing()) {
315 writing.erase(writing.iterator_to(*p->second));
316 } else {
317 cache->_rm_buffer(p->second.get());
318 }
319 buffer_map.erase(p);
320 cache->_audit("_rm_buffer end");
321 }
322
323 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
324 uint32_t offset) {
325 auto i = buffer_map.lower_bound(offset);
326 if (i != buffer_map.begin()) {
327 --i;
328 if (i->first + i->second->length <= offset)
329 ++i;
330 }
331 return i;
332 }
333
334 // must be called under protection of the Cache lock
335 void _clear(Cache* cache);
336
337 // return value is the highest cache_private of a trimmed buffer, or 0.
338 int discard(Cache* cache, uint32_t offset, uint32_t length) {
339 std::lock_guard l(cache->lock);
340 return _discard(cache, offset, length);
341 }
342 int _discard(Cache* cache, uint32_t offset, uint32_t length);
343
344 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
345 unsigned flags) {
346 std::lock_guard l(cache->lock);
347 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
348 flags);
349 b->cache_private = _discard(cache, offset, bl.length());
350 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
351 }
352 void _finish_write(Cache* cache, uint64_t seq);
353 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
354 std::lock_guard l(cache->lock);
355 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
356 b->cache_private = _discard(cache, offset, bl.length());
357 _add_buffer(cache, b, 1, nullptr);
358 }
359
360 void read(Cache* cache, uint32_t offset, uint32_t length,
361 BlueStore::ready_regions_t& res,
362 interval_set<uint32_t>& res_intervals,
363 int flags = 0);
364
365 void truncate(Cache* cache, uint32_t offset) {
366 discard(cache, offset, (uint32_t)-1 - offset);
367 }
368
369 void split(Cache* cache, size_t pos, BufferSpace &r);
370
371 void dump(Cache* cache, Formatter *f) const {
372 std::lock_guard l(cache->lock);
373 f->open_array_section("buffers");
374 for (auto& i : buffer_map) {
375 f->open_object_section("buffer");
376 ceph_assert(i.first == i.second->offset);
377 i.second->dump(f);
378 f->close_section();
379 }
380 f->close_section();
381 }
382 };
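// Illustrative usage sketch (an assumption, not part of the original header):
// how a caller might drive BufferSpace through the write/read cache paths
// using only the methods declared above; `cache`, `txc_seq`, `offset` and
// `bl` are hypothetical placeholders.
//
//   BufferSpace bc;
//   // queue the data as a STATE_WRITING buffer while the aio is in flight
//   bc.write(cache, txc_seq, offset, bl, 0);
//   // ... aio completes ...
//   // writing buffers for this transaction leave the writing list; per
//   // FLAG_NOCACHE they either become clean cached data or are trimmed
//   bc._finish_write(cache, txc_seq);
//   // cache data that was just read from disk as STATE_CLEAN
//   bc.did_read(cache, offset, bl);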
383
384 struct SharedBlobSet;
385
386 /// in-memory shared blob state (incl cached buffers)
387 struct SharedBlob {
388 MEMPOOL_CLASS_HELPERS();
389
390 std::atomic_int nref = {0}; ///< reference count
391 bool loaded = false;
392
393 CollectionRef coll;
394 union {
395 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
396 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
397 };
398 BufferSpace bc; ///< buffer cache
399
400 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
401 if (get_cache()) {
402 get_cache()->add_blob();
403 }
404 }
405 SharedBlob(uint64_t i, Collection *_coll);
406 ~SharedBlob();
407
408 uint64_t get_sbid() const {
409 return loaded ? persistent->sbid : sbid_unloaded;
410 }
411
412 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
413 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
414
415 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
416
417 void get() {
418 ++nref;
419 }
420 void put();
421
422 /// get logical references
423 void get_ref(uint64_t offset, uint32_t length);
424
425 /// put logical references, and get back any released extents
426 void put_ref(uint64_t offset, uint32_t length,
427 PExtentVector *r, bool *unshare);
428
429 void finish_write(uint64_t seq);
430
431 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
432 return l.get_sbid() == r.get_sbid();
433 }
434 inline Cache* get_cache() {
435 return coll ? coll->cache : nullptr;
436 }
437 inline SharedBlobSet* get_parent() {
438 return coll ? &(coll->shared_blob_set) : nullptr;
439 }
440 inline bool is_loaded() const {
441 return loaded;
442 }
443
444 };
445 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
446
447 /// a lookup table of SharedBlobs
448 struct SharedBlobSet {
449 /// protect lookup, insertion, removal
450 ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
451
452 // we use a bare pointer because we don't want to affect the ref
453 // count
454 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
455
456 SharedBlobRef lookup(uint64_t sbid) {
457 std::lock_guard l(lock);
458 auto p = sb_map.find(sbid);
459 if (p == sb_map.end() ||
460 p->second->nref == 0) {
461 return nullptr;
462 }
463 return p->second;
464 }
465
466 void add(Collection* coll, SharedBlob *sb) {
467 std::lock_guard l(lock);
468 sb_map[sb->get_sbid()] = sb;
469 sb->coll = coll;
470 }
471
472 bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
473 std::lock_guard l(lock);
474 ceph_assert(sb->get_parent() == this);
475 if (verify_nref_is_zero && sb->nref != 0) {
476 return false;
477 }
478 // only remove if it still points to us
479 auto p = sb_map.find(sb->get_sbid());
480 if (p != sb_map.end() &&
481 p->second == sb) {
482 sb_map.erase(p);
483 }
484 return true;
485 }
486
487 bool empty() {
488 std::lock_guard l(lock);
489 return sb_map.empty();
490 }
491
492 template <int LogLevelV>
493 void dump(CephContext *cct);
494 };
495
496 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
497
498 /// in-memory blob metadata and associated cached buffers (if any)
499 struct Blob {
500 MEMPOOL_CLASS_HELPERS();
501
502 std::atomic_int nref = {0}; ///< reference count
503 int16_t id = -1; ///< id, for spanning blobs only, >= 0
504 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
505 SharedBlobRef shared_blob; ///< shared blob state (if any)
506
507 private:
508 mutable bluestore_blob_t blob; ///< decoded blob metadata
509 #ifdef CACHE_BLOB_BL
510 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
511 #endif
512 /// refs from this shard. ephemeral if id<0, persisted if spanning.
513 bluestore_blob_use_tracker_t used_in_blob;
514
515 public:
516
517 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
518 friend void intrusive_ptr_release(Blob *b) { b->put(); }
519
520 friend ostream& operator<<(ostream& out, const Blob &b);
521
522 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
523 return used_in_blob;
524 }
525 bool is_referenced() const {
526 return used_in_blob.is_not_empty();
527 }
528 uint32_t get_referenced_bytes() const {
529 return used_in_blob.get_referenced_bytes();
530 }
531
532 bool is_spanning() const {
533 return id >= 0;
534 }
535
536 bool can_split() const {
537 std::lock_guard l(shared_blob->get_cache()->lock);
538 // splitting a BufferSpace writing list is too hard; don't try.
539 return shared_blob->bc.writing.empty() &&
540 used_in_blob.can_split() &&
541 get_blob().can_split();
542 }
543
544 bool can_split_at(uint32_t blob_offset) const {
545 return used_in_blob.can_split_at(blob_offset) &&
546 get_blob().can_split_at(blob_offset);
547 }
548
549 bool can_reuse_blob(uint32_t min_alloc_size,
550 uint32_t target_blob_size,
551 uint32_t b_offset,
552 uint32_t *length0);
553
554 void dup(Blob& o) {
555 o.shared_blob = shared_blob;
556 o.blob = blob;
557 #ifdef CACHE_BLOB_BL
558 o.blob_bl = blob_bl;
559 #endif
560 }
561
562 inline const bluestore_blob_t& get_blob() const {
563 return blob;
564 }
565 inline bluestore_blob_t& dirty_blob() {
566 #ifdef CACHE_BLOB_BL
567 blob_bl.clear();
568 #endif
569 return blob;
570 }
571
572 /// discard buffers for unallocated regions
573 void discard_unallocated(Collection *coll);
574
575 /// get logical references
576 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
577 /// put logical references, and get back any released extents
578 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
579 PExtentVector *r);
580
581 /// split the blob
582 void split(Collection *coll, uint32_t blob_offset, Blob *o);
583
584 void get() {
585 ++nref;
586 }
587 void put() {
588 if (--nref == 0)
589 delete this;
590 }
591
592
593 #ifdef CACHE_BLOB_BL
594 void _encode() const {
595 if (blob_bl.length() == 0 ) {
596 encode(blob, blob_bl);
597 } else {
598 ceph_assert(blob_bl.length());
599 }
600 }
601 void bound_encode(
602 size_t& p,
603 bool include_ref_map) const {
604 _encode();
605 p += blob_bl.length();
606 if (include_ref_map) {
607 used_in_blob.bound_encode(p);
608 }
609 }
610 void encode(
611 bufferlist::contiguous_appender& p,
612 bool include_ref_map) const {
613 _encode();
614 p.append(blob_bl);
615 if (include_ref_map) {
616 used_in_blob.encode(p);
617 }
618 }
619 void decode(
620 Collection */*coll*/,
621 bufferptr::const_iterator& p,
622 bool include_ref_map) {
623 const char *start = p.get_pos();
624 denc(blob, p);
625 const char *end = p.get_pos();
626 blob_bl.clear();
627 blob_bl.append(start, end - start);
628 if (include_ref_map) {
629 used_in_blob.decode(p);
630 }
631 }
632 #else
633 void bound_encode(
634 size_t& p,
635 uint64_t struct_v,
636 uint64_t sbid,
637 bool include_ref_map) const {
638 denc(blob, p, struct_v);
639 if (blob.is_shared()) {
640 denc(sbid, p);
641 }
642 if (include_ref_map) {
643 used_in_blob.bound_encode(p);
644 }
645 }
646 void encode(
647 bufferlist::contiguous_appender& p,
648 uint64_t struct_v,
649 uint64_t sbid,
650 bool include_ref_map) const {
651 denc(blob, p, struct_v);
652 if (blob.is_shared()) {
653 denc(sbid, p);
654 }
655 if (include_ref_map) {
656 used_in_blob.encode(p);
657 }
658 }
659 void decode(
660 Collection *coll,
661 bufferptr::const_iterator& p,
662 uint64_t struct_v,
663 uint64_t* sbid,
664 bool include_ref_map);
665 #endif
666 };
667 typedef boost::intrusive_ptr<Blob> BlobRef;
668 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
669
670 /// a logical extent, pointing to (some portion of) a blob
671 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
672 struct Extent : public ExtentBase {
673 MEMPOOL_CLASS_HELPERS();
674
675 uint32_t logical_offset = 0; ///< logical offset
676 uint32_t blob_offset = 0; ///< blob offset
677 uint32_t length = 0; ///< length
678 BlobRef blob; ///< the blob with our data
679
680 /// ctor for lookup only
681 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
682 /// ctor for delayed initialization (see decode_some())
683 explicit Extent() : ExtentBase() {
684 }
685 /// ctor for general usage
686 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
687 : ExtentBase(),
688 logical_offset(lo), blob_offset(o), length(l) {
689 assign_blob(b);
690 }
691 ~Extent() {
692 if (blob) {
693 blob->shared_blob->get_cache()->rm_extent();
694 }
695 }
696
697 void assign_blob(const BlobRef& b) {
698 ceph_assert(!blob);
699 blob = b;
700 blob->shared_blob->get_cache()->add_extent();
701 }
702
703 // comparators for intrusive_set
704 friend bool operator<(const Extent &a, const Extent &b) {
705 return a.logical_offset < b.logical_offset;
706 }
707 friend bool operator>(const Extent &a, const Extent &b) {
708 return a.logical_offset > b.logical_offset;
709 }
710 friend bool operator==(const Extent &a, const Extent &b) {
711 return a.logical_offset == b.logical_offset;
712 }
713
714 uint32_t blob_start() const {
715 return logical_offset - blob_offset;
716 }
717
718 uint32_t blob_end() const {
719 return blob_start() + blob->get_blob().get_logical_length();
720 }
721
722 uint32_t logical_end() const {
723 return logical_offset + length;
724 }
725
726 // return true if any piece of the blob is out of
727 // the given range [o, o + l].
728 bool blob_escapes_range(uint32_t o, uint32_t l) const {
729 return blob_start() < o || blob_end() > o + l;
730 }
731 };
732 typedef boost::intrusive::set<Extent> extent_map_t;
733
734
735 friend ostream& operator<<(ostream& out, const Extent& e);
736
737 struct OldExtent {
738 boost::intrusive::list_member_hook<> old_extent_item;
739 Extent e;
740 PExtentVector r;
741 bool blob_empty; // flag to track the last removed extent that makes blob
742 // empty - required to update compression stat properly
743 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
744 : e(lo, o, l, b), blob_empty(false) {
745 }
746 static OldExtent* create(CollectionRef c,
747 uint32_t lo,
748 uint32_t o,
749 uint32_t l,
750 BlobRef& b);
751 };
752 typedef boost::intrusive::list<
753 OldExtent,
754 boost::intrusive::member_hook<
755 OldExtent,
756 boost::intrusive::list_member_hook<>,
757 &OldExtent::old_extent_item> > old_extent_map_t;
758
759 struct Onode;
760
761 /// a sharded extent map, mapping offsets to lextents to blobs
762 struct ExtentMap {
763 Onode *onode;
764 extent_map_t extent_map; ///< map of Extents to Blobs
765 blob_map_t spanning_blob_map; ///< blobs that span shards
766 typedef boost::intrusive_ptr<Onode> OnodeRef;
767
768 struct Shard {
769 bluestore_onode_t::shard_info *shard_info = nullptr;
770 unsigned extents = 0; ///< count extents in this shard
771 bool loaded = false; ///< true if shard is loaded
772 bool dirty = false; ///< true if shard is dirty and needs reencoding
773 };
774 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
775
776 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
777
778 uint32_t needs_reshard_begin = 0;
779 uint32_t needs_reshard_end = 0;
780
781 void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
782 uint64_t&, uint64_t&, uint64_t&);
783
784 bool needs_reshard() const {
785 return needs_reshard_end > needs_reshard_begin;
786 }
787 void clear_needs_reshard() {
788 needs_reshard_begin = needs_reshard_end = 0;
789 }
790 void request_reshard(uint32_t begin, uint32_t end) {
791 if (begin < needs_reshard_begin) {
792 needs_reshard_begin = begin;
793 }
794 if (end > needs_reshard_end) {
795 needs_reshard_end = end;
796 }
797 }
798
799 struct DeleteDisposer {
800 void operator()(Extent *e) { delete e; }
801 };
802
803 ExtentMap(Onode *o);
804 ~ExtentMap() {
805 extent_map.clear_and_dispose(DeleteDisposer());
806 }
807
808 void clear() {
809 extent_map.clear_and_dispose(DeleteDisposer());
810 shards.clear();
811 inline_bl.clear();
812 clear_needs_reshard();
813 }
814
815 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
816 unsigned *pn);
817 unsigned decode_some(bufferlist& bl);
818
819 void bound_encode_spanning_blobs(size_t& p);
820 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
821 void decode_spanning_blobs(bufferptr::const_iterator& p);
822
823 BlobRef get_spanning_blob(int id) {
824 auto p = spanning_blob_map.find(id);
825 ceph_assert(p != spanning_blob_map.end());
826 return p->second;
827 }
828
829 void update(KeyValueDB::Transaction t, bool force);
830 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
831 void reshard(
832 KeyValueDB *db,
833 KeyValueDB::Transaction t);
834
835 /// initialize Shards from the onode
836 void init_shards(bool loaded, bool dirty);
837
838 /// return index of shard containing offset
839 /// or -1 if not found
840 int seek_shard(uint32_t offset) {
841 size_t end = shards.size();
842 size_t mid, left = 0;
843 size_t right = end; // one past the right end
844
845 while (left < right) {
846 mid = left + (right - left) / 2;
847 if (offset >= shards[mid].shard_info->offset) {
848 size_t next = mid + 1;
849 if (next >= end || offset < shards[next].shard_info->offset)
850 return mid;
851 //continue to search forwards
852 left = next;
853 } else {
854 //continue to search backwards
855 right = mid;
856 }
857 }
858
859 return -1; // not found
860 }
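// Worked example (illustrative): with shard offsets {0x0, 0x10000, 0x20000},
// seek_shard(0x12000) returns 1 (0x10000 <= 0x12000 < 0x20000),
// seek_shard(0x30000) returns 2 (the last shard), and any offset below the
// first shard's offset returns -1.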
861
862 /// check if a range spans a shard
863 bool spans_shard(uint32_t offset, uint32_t length) {
864 if (shards.empty()) {
865 return false;
866 }
867 int s = seek_shard(offset);
868 ceph_assert(s >= 0);
869 if (s == (int)shards.size() - 1) {
870 return false; // last shard
871 }
872 if (offset + length <= shards[s+1].shard_info->offset) {
873 return false;
874 }
875 return true;
876 }
877
878 /// ensure that a range of the map is loaded
879 void fault_range(KeyValueDB *db,
880 uint32_t offset, uint32_t length);
881
882 /// ensure a range of the map is marked dirty
883 void dirty_range(uint32_t offset, uint32_t length);
884
885 /// for seek_lextent test
886 extent_map_t::iterator find(uint64_t offset);
887
888 /// seek to the first lextent including or after offset
889 extent_map_t::iterator seek_lextent(uint64_t offset);
890 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
891
892 /// add a new Extent
893 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
894 extent_map.insert(*new Extent(lo, o, l, b));
895 }
896
897 /// remove (and delete) an Extent
898 void rm(extent_map_t::iterator p) {
899 extent_map.erase_and_dispose(p, DeleteDisposer());
900 }
901
902 bool has_any_lextents(uint64_t offset, uint64_t length);
903
904 /// consolidate adjacent lextents in extent_map
905 int compress_extent_map(uint64_t offset, uint64_t length);
906
907 /// punch a logical hole. add lextents to deref to target list.
908 void punch_hole(CollectionRef &c,
909 uint64_t offset, uint64_t length,
910 old_extent_map_t *old_extents);
911
912 /// put new lextent into lextent_map overwriting existing ones if
913 /// any and update references accordingly
914 Extent *set_lextent(CollectionRef &c,
915 uint64_t logical_offset,
916 uint64_t offset, uint64_t length,
917 BlobRef b,
918 old_extent_map_t *old_extents);
919
920 /// split a blob (and referring extents)
921 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
922 };
923
924 /// Compressed Blob Garbage collector
925 /*
926 The primary idea of the collector is to estimate the difference between the
927 allocation units (AUs) currently occupied by compressed blobs and the new AUs
928 required to store that data uncompressed.
929 Estimation is performed for protrusive extents within a logical range
930 determined by the concatenation of the old_extents collection and the specific
931 (current) write request.
932 The root cause for using old_extents is the need to handle blob ref counts
933 properly. Old extents still hold blob refs, hence we need to traverse
934 the collection to determine whether a blob is to be released.
935 Protrusive extents are extents that fit into the blob set in action
936 (ones that are below the logical range from above) but are not totally removed
937 by the current write.
938 E.g. for
939 extent1 <loffs = 100, boffs = 100, len = 100> ->
940 blob1<compressed, len_on_disk=4096, logical_len=8192>
941 extent2 <loffs = 200, boffs = 200, len = 100> ->
942 blob2<raw, len_on_disk=4096, llen=4096>
943 extent3 <loffs = 300, boffs = 300, len = 100> ->
944 blob1<compressed, len_on_disk=4096, llen=8192>
945 extent4 <loffs = 4096, boffs = 0, len = 100> ->
946 blob3<raw, len_on_disk=4096, llen=4096>
947 write(300~100)
948 the protrusive extents fall within the ranges <0~300, 400~(8192-400)>.
949 In this case the existing AUs that might be released by GC (i.e. blob1's)
950 occupy 2x4K bytes,
951 while the new AUs expected after GC = 0, since extent1 is to be merged into blob2.
952 Hence we should collect; an illustrative usage sketch follows the class declaration below.
953 */
954 class GarbageCollector
955 {
956 public:
957 /// return amount of allocation units that might be saved due to GC
958 int64_t estimate(
959 uint64_t offset,
960 uint64_t length,
961 const ExtentMap& extent_map,
962 const old_extent_map_t& old_extents,
963 uint64_t min_alloc_size);
964
965 /// return a collection of extents to perform GC on
966 const vector<bluestore_pextent_t>& get_extents_to_collect() const {
967 return extents_to_collect;
968 }
969 GarbageCollector(CephContext* _cct) : cct(_cct) {}
970
971 private:
972 struct BlobInfo {
973 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
974 int64_t expected_allocations = 0; ///< new alloc units required
975 ///< if gc is carried out
976 bool collect_candidate = false; ///< indicate if blob has any extents
977 ///< eligible for GC.
978 extent_map_t::const_iterator first_lextent; ///< points to the first
979 ///< lextent referring to
980 ///< the blob if any.
981 ///< collect_candidate flag
982 ///< determines the validity
983 extent_map_t::const_iterator last_lextent; ///< points to the last
984 ///< lextent referring to
985 ///< the blob if any.
986
987 BlobInfo(uint64_t ref_bytes) :
988 referenced_bytes(ref_bytes) {
989 }
990 };
991 CephContext* cct;
992 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
993 ///< copies that are affected by the
994 ///< specific write
995
996 ///< protrusive extents that should be collected if GC takes place
997 vector<bluestore_pextent_t> extents_to_collect;
998
999 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
1000 ///< unit when traversing
1001 ///< protrusive extents.
1002 ///< Other extents mapped to
1003 ///< this AU are to be ignored
1004 ///< (except the case where an
1005 ///< uncompressed extent follows
1006 ///< a compressed one - see below).
1007 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
1008 ///< caused expected_allocations
1009 ///< counter increment at this blob.
1010 ///< if an uncompressed extent follows,
1011 ///< a decrement for the
1012 ///< expected_allocations counter
1013 ///< is needed
1014 int64_t expected_allocations = 0; ///< new alloc units required if
1015 ///< gc is carried out
1016 int64_t expected_for_release = 0; ///< alloc units currently used by
1017 ///< compressed blobs that might
1018 ///< be gone after GC
1019 uint64_t gc_start_offset = 0; ///< starting offset for GC
1020 uint64_t gc_end_offset = 0; ///< ending offset for GC
1021
1022 protected:
1023 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
1024 uint64_t start_offset,
1025 uint64_t end_offset,
1026 uint64_t start_touch_offset,
1027 uint64_t end_touch_offset,
1028 uint64_t min_alloc_size);
1029 };
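// Illustrative usage sketch (an assumption, not part of the original header):
// how a write path might consult the collector using only the interface
// declared above; variable names (onode, old_extents, ...) are placeholders.
//
//   GarbageCollector gc(cct);
//   int64_t saved = gc.estimate(offset, length, onode->extent_map,
//                               old_extents, min_alloc_size);
//   if (saved > 0) {
//     // ranges worth rewriting uncompressed so the old AUs can be released
//     for (const bluestore_pextent_t& e : gc.get_extents_to_collect()) {
//       // e.offset / e.length describe one such range
//     }
//   }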
1030
1031 struct OnodeSpace;
1032
1033 /// an in-memory object
1034 struct Onode {
1035 MEMPOOL_CLASS_HELPERS();
1036
1037 std::atomic_int nref; ///< reference count
1038 Collection *c;
1039
1040 ghobject_t oid;
1041
1042 /// key under PREFIX_OBJ where we are stored
1043 mempool::bluestore_cache_other::string key;
1044
1045 boost::intrusive::list_member_hook<> lru_item;
1046
1047 bluestore_onode_t onode; ///< metadata stored as value in kv store
1048 bool exists; ///< true if object logically exists
1049
1050 ExtentMap extent_map;
1051
1052 // track txc's that have not been committed to kv store (and whose
1053 // effects cannot be read via the kvdb read methods)
1054 std::atomic<int> flushing_count = {0};
1055 /// protect flush_txns
1056 ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
1057 ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
1058
1059 Onode(Collection *c, const ghobject_t& o,
1060 const mempool::bluestore_cache_other::string& k)
1061 : nref(0),
1062 c(c),
1063 oid(o),
1064 key(k),
1065 exists(false),
1066 extent_map(this) {
1067 }
1068
1069 void flush();
1070 void get() {
1071 ++nref;
1072 }
1073 void put() {
1074 if (--nref == 0)
1075 delete this;
1076 }
1077 };
1078 typedef boost::intrusive_ptr<Onode> OnodeRef;
1079
1080
1081 /// a cache (shard) of onodes and buffers
1082 struct Cache {
1083 CephContext* cct;
1084 PerfCounters *logger;
1085
1086 /// protect lru and other structures
1087 ceph::recursive_mutex lock = {
1088 ceph::make_recursive_mutex("BlueStore::Cache::lock") };
1089
1090 std::atomic<uint64_t> num_extents = {0};
1091 std::atomic<uint64_t> num_blobs = {0};
1092
1093 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1094
1095 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1096 virtual ~Cache() {}
1097
1098 virtual void _add_onode(OnodeRef& o, int level) = 0;
1099 virtual void _rm_onode(OnodeRef& o) = 0;
1100 virtual void _touch_onode(OnodeRef& o) = 0;
1101
1102 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1103 virtual void _rm_buffer(Buffer *b) = 0;
1104 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1105 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1106 virtual void _touch_buffer(Buffer *b) = 0;
1107
1108 virtual uint64_t _get_num_onodes() = 0;
1109 virtual uint64_t _get_buffer_bytes() = 0;
1110
1111 void add_extent() {
1112 ++num_extents;
1113 }
1114 void rm_extent() {
1115 --num_extents;
1116 }
1117
1118 void add_blob() {
1119 ++num_blobs;
1120 }
1121 void rm_blob() {
1122 --num_blobs;
1123 }
1124
1125 void trim(uint64_t onode_max, uint64_t buffer_max);
1126
1127 void trim_all();
1128
1129 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1130
1131 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1132 uint64_t *blobs,
1133 uint64_t *buffers,
1134 uint64_t *bytes) = 0;
1135
1136 bool empty() {
1137 std::lock_guard l(lock);
1138 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1139 }
1140
1141 #ifdef DEBUG_CACHE
1142 virtual void _audit(const char *s) = 0;
1143 #else
1144 void _audit(const char *s) { /* no-op */ }
1145 #endif
1146 };
1147
1148 /// simple LRU cache for onodes and buffers
1149 struct LRUCache : public Cache {
1150 private:
1151 typedef boost::intrusive::list<
1152 Onode,
1153 boost::intrusive::member_hook<
1154 Onode,
1155 boost::intrusive::list_member_hook<>,
1156 &Onode::lru_item> > onode_lru_list_t;
1157 typedef boost::intrusive::list<
1158 Buffer,
1159 boost::intrusive::member_hook<
1160 Buffer,
1161 boost::intrusive::list_member_hook<>,
1162 &Buffer::lru_item> > buffer_lru_list_t;
1163
1164 onode_lru_list_t onode_lru;
1165
1166 buffer_lru_list_t buffer_lru;
1167 uint64_t buffer_size = 0;
1168
1169 public:
1170 LRUCache(CephContext* cct) : Cache(cct) {}
1171 uint64_t _get_num_onodes() override {
1172 return onode_lru.size();
1173 }
1174 void _add_onode(OnodeRef& o, int level) override {
1175 if (level > 0)
1176 onode_lru.push_front(*o);
1177 else
1178 onode_lru.push_back(*o);
1179 }
1180 void _rm_onode(OnodeRef& o) override {
1181 auto q = onode_lru.iterator_to(*o);
1182 onode_lru.erase(q);
1183 }
1184 void _touch_onode(OnodeRef& o) override;
1185
1186 uint64_t _get_buffer_bytes() override {
1187 return buffer_size;
1188 }
1189 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1190 if (near) {
1191 auto q = buffer_lru.iterator_to(*near);
1192 buffer_lru.insert(q, *b);
1193 } else if (level > 0) {
1194 buffer_lru.push_front(*b);
1195 } else {
1196 buffer_lru.push_back(*b);
1197 }
1198 buffer_size += b->length;
1199 }
1200 void _rm_buffer(Buffer *b) override {
1201 ceph_assert(buffer_size >= b->length);
1202 buffer_size -= b->length;
1203 auto q = buffer_lru.iterator_to(*b);
1204 buffer_lru.erase(q);
1205 }
1206 void _move_buffer(Cache *src, Buffer *b) override {
1207 src->_rm_buffer(b);
1208 _add_buffer(b, 0, nullptr);
1209 }
1210 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1211 ceph_assert((int64_t)buffer_size + delta >= 0);
1212 buffer_size += delta;
1213 }
1214 void _touch_buffer(Buffer *b) override {
1215 auto p = buffer_lru.iterator_to(*b);
1216 buffer_lru.erase(p);
1217 buffer_lru.push_front(*b);
1218 _audit("_touch_buffer end");
1219 }
1220
1221 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1222
1223 void add_stats(uint64_t *onodes, uint64_t *extents,
1224 uint64_t *blobs,
1225 uint64_t *buffers,
1226 uint64_t *bytes) override {
1227 std::lock_guard l(lock);
1228 *onodes += onode_lru.size();
1229 *extents += num_extents;
1230 *blobs += num_blobs;
1231 *buffers += buffer_lru.size();
1232 *bytes += buffer_size;
1233 }
1234
1235 #ifdef DEBUG_CACHE
1236 void _audit(const char *s) override;
1237 #endif
1238 };
1239
1240 // 2Q cache for buffers, LRU for onodes
1241 struct TwoQCache : public Cache {
1242 private:
1243 // stick with LRU for onodes for now (fixme?)
1244 typedef boost::intrusive::list<
1245 Onode,
1246 boost::intrusive::member_hook<
1247 Onode,
1248 boost::intrusive::list_member_hook<>,
1249 &Onode::lru_item> > onode_lru_list_t;
1250 typedef boost::intrusive::list<
1251 Buffer,
1252 boost::intrusive::member_hook<
1253 Buffer,
1254 boost::intrusive::list_member_hook<>,
1255 &Buffer::lru_item> > buffer_list_t;
1256
1257 onode_lru_list_t onode_lru;
1258
1259 buffer_list_t buffer_hot; ///< "Am" hot buffers
1260 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1261 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1262 uint64_t buffer_bytes = 0; ///< bytes
1263
1264 enum {
1265 BUFFER_NEW = 0,
1266 BUFFER_WARM_IN, ///< in buffer_warm_in
1267 BUFFER_WARM_OUT, ///< in buffer_warm_out
1268 BUFFER_HOT, ///< in buffer_hot
1269 BUFFER_TYPE_MAX
1270 };
1271
1272 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1273
1274 public:
1275 TwoQCache(CephContext* cct) : Cache(cct) {}
1276 uint64_t _get_num_onodes() override {
1277 return onode_lru.size();
1278 }
1279 void _add_onode(OnodeRef& o, int level) override {
1280 if (level > 0)
1281 onode_lru.push_front(*o);
1282 else
1283 onode_lru.push_back(*o);
1284 }
1285 void _rm_onode(OnodeRef& o) override {
1286 auto q = onode_lru.iterator_to(*o);
1287 onode_lru.erase(q);
1288 }
1289 void _touch_onode(OnodeRef& o) override;
1290
1291 uint64_t _get_buffer_bytes() override {
1292 return buffer_bytes;
1293 }
1294 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1295 void _rm_buffer(Buffer *b) override;
1296 void _move_buffer(Cache *src, Buffer *b) override;
1297 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1298 void _touch_buffer(Buffer *b) override {
1299 switch (b->cache_private) {
1300 case BUFFER_WARM_IN:
1301 // do nothing (somewhat counter-intuitively!)
1302 break;
1303 case BUFFER_WARM_OUT:
1304 // move from warm_out to hot LRU
1305 ceph_abort_msg("this happens via discard hint");
1306 break;
1307 case BUFFER_HOT:
1308 // move to front of hot LRU
1309 buffer_hot.erase(buffer_hot.iterator_to(*b));
1310 buffer_hot.push_front(*b);
1311 break;
1312 }
1313 _audit("_touch_buffer end");
1314 }
1315
1316 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1317
1318 void add_stats(uint64_t *onodes, uint64_t *extents,
1319 uint64_t *blobs,
1320 uint64_t *buffers,
1321 uint64_t *bytes) override {
1322 std::lock_guard l(lock);
1323 *onodes += onode_lru.size();
1324 *extents += num_extents;
1325 *blobs += num_blobs;
1326 *buffers += buffer_hot.size() + buffer_warm_in.size();
1327 *bytes += buffer_bytes;
1328 }
1329
1330 #ifdef DEBUG_CACHE
1331 void _audit(const char *s) override;
1332 #endif
1333 };
1334
1335 struct OnodeSpace {
1336 private:
1337 Cache *cache;
1338
1339 /// forward lookups
1340 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1341
1342 friend class Collection; // for split_cache()
1343
1344 public:
1345 OnodeSpace(Cache *c) : cache(c) {}
1346 ~OnodeSpace() {
1347 clear();
1348 }
1349
1350 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1351 OnodeRef lookup(const ghobject_t& o);
1352 void remove(const ghobject_t& oid) {
1353 onode_map.erase(oid);
1354 }
1355 void rename(OnodeRef& o, const ghobject_t& old_oid,
1356 const ghobject_t& new_oid,
1357 const mempool::bluestore_cache_other::string& new_okey);
1358 void clear();
1359 bool empty();
1360
1361 template <int LogLevelV>
1362 void dump(CephContext *cct);
1363
1364 /// return true if f true for any item
1365 bool map_any(std::function<bool(OnodeRef)> f);
1366 };
1367
1368 class OpSequencer;
1369 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1370
1371 struct Collection : public CollectionImpl {
1372 BlueStore *store;
1373 OpSequencerRef osr;
1374 Cache *cache; ///< our cache shard
1375 bluestore_cnode_t cnode;
1376 RWLock lock;
1377
1378 bool exists;
1379
1380 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1381
1382 // cache onodes on a per-collection basis to avoid lock
1383 // contention.
1384 OnodeSpace onode_map;
1385
1386 //pool options
1387 pool_opts_t pool_opts;
1388 ContextQueue *commit_queue;
1389
1390 OnodeRef get_onode(const ghobject_t& oid, bool create);
1391
1392 // the terminology is confusing here, sorry!
1393 //
1394 // blob_t shared_blob_t
1395 // !shared unused -> open
1396 // shared !loaded -> open + shared
1397 // shared loaded -> open + shared + loaded
1398 //
1399 // i.e.,
1400 // open = SharedBlob is instantiated
1401 // shared = blob_t shared flag is set; SharedBlob is hashed.
1402 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1403 void open_shared_blob(uint64_t sbid, BlobRef b);
1404 void load_shared_blob(SharedBlobRef sb);
1405 void make_blob_shared(uint64_t sbid, BlobRef b);
1406 uint64_t make_blob_unshared(SharedBlob *sb);
1407
1408 BlobRef new_blob() {
1409 BlobRef b = new Blob();
1410 b->shared_blob = new SharedBlob(this);
1411 return b;
1412 }
1413
1414 bool contains(const ghobject_t& oid) {
1415 if (cid.is_meta())
1416 return oid.hobj.pool == -1;
1417 spg_t spgid;
1418 if (cid.is_pg(&spgid))
1419 return
1420 spgid.pgid.contains(cnode.bits, oid) &&
1421 oid.shard_id == spgid.shard;
1422 return false;
1423 }
1424
1425 void split_cache(Collection *dest);
1426
1427 bool flush_commit(Context *c) override;
1428 void flush() override;
1429 void flush_all_but_last();
1430
1431 Collection(BlueStore *ns, Cache *ca, coll_t c);
1432 };
1433
1434 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1435 CollectionRef c;
1436 OnodeRef o;
1437 KeyValueDB::Iterator it;
1438 string head, tail;
1439
1440 string _stringify() const;
1441
1442 public:
1443 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1444 int seek_to_first() override;
1445 int upper_bound(const string &after) override;
1446 int lower_bound(const string &to) override;
1447 bool valid() override;
1448 int next() override;
1449 string key() override;
1450 bufferlist value() override;
1451 int status() override {
1452 return 0;
1453 }
1454 };
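// Illustrative sketch (assumption): typical iteration with this interface,
// following the generic ObjectMap::ObjectMapIteratorImpl contract; `ch` and
// `oid` are placeholders.
//
//   ObjectMap::ObjectMapIterator it = store->get_omap_iterator(ch, oid);
//   for (it->seek_to_first(); it->valid(); it->next()) {
//     string k = it->key();
//     bufferlist v = it->value();
//     // ... consume k / v ...
//   }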
1455
1456 struct volatile_statfs{
1457 enum {
1458 STATFS_ALLOCATED = 0,
1459 STATFS_STORED,
1460 STATFS_COMPRESSED_ORIGINAL,
1461 STATFS_COMPRESSED,
1462 STATFS_COMPRESSED_ALLOCATED,
1463 STATFS_LAST
1464 };
1465 int64_t values[STATFS_LAST];
1466 volatile_statfs() {
1467 memset(this, 0, sizeof(volatile_statfs));
1468 }
1469 void reset() {
1470 *this = volatile_statfs();
1471 }
1472 void publish(store_statfs_t* buf) const {
1473 buf->allocated = allocated();
1474 buf->data_stored = stored();
1475 buf->data_compressed = compressed();
1476 buf->data_compressed_original = compressed_original();
1477 buf->data_compressed_allocated = compressed_allocated();
1478 }
1479
1480 volatile_statfs& operator+=(const volatile_statfs& other) {
1481 for (size_t i = 0; i < STATFS_LAST; ++i) {
1482 values[i] += other.values[i];
1483 }
1484 return *this;
1485 }
1486 int64_t& allocated() {
1487 return values[STATFS_ALLOCATED];
1488 }
1489 int64_t& stored() {
1490 return values[STATFS_STORED];
1491 }
1492 int64_t& compressed_original() {
1493 return values[STATFS_COMPRESSED_ORIGINAL];
1494 }
1495 int64_t& compressed() {
1496 return values[STATFS_COMPRESSED];
1497 }
1498 int64_t& compressed_allocated() {
1499 return values[STATFS_COMPRESSED_ALLOCATED];
1500 }
1501 int64_t allocated() const {
1502 return values[STATFS_ALLOCATED];
1503 }
1504 int64_t stored() const {
1505 return values[STATFS_STORED];
1506 }
1507 int64_t compressed_original() const {
1508 return values[STATFS_COMPRESSED_ORIGINAL];
1509 }
1510 int64_t compressed() const {
1511 return values[STATFS_COMPRESSED];
1512 }
1513 int64_t compressed_allocated() const {
1514 return values[STATFS_COMPRESSED_ALLOCATED];
1515 }
1516 volatile_statfs& operator=(const store_statfs_t& st) {
1517 values[STATFS_ALLOCATED] = st.allocated;
1518 values[STATFS_STORED] = st.data_stored;
1519 values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
1520 values[STATFS_COMPRESSED] = st.data_compressed;
1521 values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
1522 return *this;
1523 }
1524 bool is_empty() {
1525 return values[STATFS_ALLOCATED] == 0 &&
1526 values[STATFS_STORED] == 0 &&
1527 values[STATFS_COMPRESSED] == 0 &&
1528 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1529 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1530 }
1531 void decode(bufferlist::const_iterator& it) {
1532 using ceph::decode;
1533 for (size_t i = 0; i < STATFS_LAST; i++) {
1534 decode(values[i], it);
1535 }
1536 }
1537
1538 void encode(bufferlist& bl) {
1539 using ceph::encode;
1540 for (size_t i = 0; i < STATFS_LAST; i++) {
1541 encode(values[i], bl);
1542 }
1543 }
1544 };
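// Illustrative sketch (assumption): how per-transaction deltas accumulate and
// get published, using only the members declared above.
//
//   volatile_statfs delta;
//   delta.allocated() += 0x10000;   // allocated 64 KiB on disk
//   delta.stored()    += 4096;      // stored 4 KiB of logical data
//
//   volatile_statfs total;
//   total += delta;                 // aggregate, e.g. across TransContexts
//
//   store_statfs_t out;
//   total.publish(&out);            // out.allocated == 0x10000, etc.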
1545
1546 struct TransContext final : public AioContext {
1547 MEMPOOL_CLASS_HELPERS();
1548
1549 typedef enum {
1550 STATE_PREPARE,
1551 STATE_AIO_WAIT,
1552 STATE_IO_DONE,
1553 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1554 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1555 STATE_KV_DONE,
1556 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1557 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1558 STATE_DEFERRED_DONE,
1559 STATE_FINISHING,
1560 STATE_DONE,
1561 } state_t;
1562
1563 state_t state = STATE_PREPARE;
1564
1565 const char *get_state_name() {
1566 switch (state) {
1567 case STATE_PREPARE: return "prepare";
1568 case STATE_AIO_WAIT: return "aio_wait";
1569 case STATE_IO_DONE: return "io_done";
1570 case STATE_KV_QUEUED: return "kv_queued";
1571 case STATE_KV_SUBMITTED: return "kv_submitted";
1572 case STATE_KV_DONE: return "kv_done";
1573 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1574 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1575 case STATE_DEFERRED_DONE: return "deferred_done";
1576 case STATE_FINISHING: return "finishing";
1577 case STATE_DONE: return "done";
1578 }
1579 return "???";
1580 }
1581
1582 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1583 const char *get_state_latency_name(int state) {
1584 switch (state) {
1585 case l_bluestore_state_prepare_lat: return "prepare";
1586 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1587 case l_bluestore_state_io_done_lat: return "io_done";
1588 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1589 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1590 case l_bluestore_state_kv_done_lat: return "kv_done";
1591 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1592 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1593 case l_bluestore_state_finishing_lat: return "finishing";
1594 case l_bluestore_state_done_lat: return "done";
1595 }
1596 return "???";
1597 }
1598 #endif
1599
1600 utime_t log_state_latency(PerfCounters *logger, int state) {
1601 utime_t lat, now = ceph_clock_now();
1602 lat = now - last_stamp;
1603 logger->tinc(state, lat);
1604 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1605 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1606 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1607 OID_ELAPSED("", usecs, get_state_latency_name(state));
1608 }
1609 #endif
1610 last_stamp = now;
1611 return lat;
1612 }
1613
1614 CollectionRef ch;
1615 OpSequencerRef osr; // this should be ch->osr
1616 boost::intrusive::list_member_hook<> sequencer_item;
1617
1618 uint64_t bytes = 0, cost = 0;
1619
1620 set<OnodeRef> onodes; ///< these need to be updated/written
1621 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1622 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1623 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1624
1625 KeyValueDB::Transaction t; ///< then we will commit this
1626 list<Context*> oncommits; ///< more commit completions
1627 list<CollectionRef> removed_collections; ///< colls we removed
1628
1629 boost::intrusive::list_member_hook<> deferred_queue_item;
1630 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1631
1632 interval_set<uint64_t> allocated, released;
1633 volatile_statfs statfs_delta; ///< overall store statistics delta
1634 uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
1635
1636 IOContext ioc;
1637 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1638
1639 uint64_t seq = 0;
1640 utime_t start;
1641 utime_t last_stamp;
1642
1643 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1644 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1645
1646 explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
1647 list<Context*> *on_commits)
1648 : ch(c),
1649 osr(o),
1650 ioc(cct, this),
1651 start(ceph_clock_now()) {
1652 last_stamp = start;
1653 if (on_commits) {
1654 oncommits.swap(*on_commits);
1655 }
1656 }
1657 ~TransContext() {
1658 delete deferred_txn;
1659 }
1660
1661 void write_onode(OnodeRef &o) {
1662 onodes.insert(o);
1663 }
1664 void write_shared_blob(SharedBlobRef &sb) {
1665 shared_blobs.insert(sb);
1666 }
1667 void unshare_blob(SharedBlob *sb) {
1668 shared_blobs.erase(sb);
1669 }
1670
1671 /// note that we logically modified the object (even when the onode itself is unmodified)
1672 void note_modified_object(OnodeRef &o) {
1673 // onode itself isn't written, though
1674 modified_objects.insert(o);
1675 }
1676 void note_removed_object(OnodeRef& o) {
1677 onodes.erase(o);
1678 modified_objects.insert(o);
1679 }
1680
1681 void aio_finish(BlueStore *store) override {
1682 store->txc_aio_finish(this);
1683 }
1684 };
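// Illustrative note (assumption, following the enum order above): a simple
// write typically progresses
//   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
//   -> FINISHING -> DONE
// while a deferred write additionally passes through DEFERRED_QUEUED /
// DEFERRED_CLEANUP / DEFERRED_DONE after KV_DONE before finishing.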
1685
1686 typedef boost::intrusive::list<
1687 TransContext,
1688 boost::intrusive::member_hook<
1689 TransContext,
1690 boost::intrusive::list_member_hook<>,
1691 &TransContext::deferred_queue_item> > deferred_queue_t;
1692
1693 struct DeferredBatch final : public AioContext {
1694 OpSequencer *osr;
1695 struct deferred_io {
1696 bufferlist bl; ///< data
1697 uint64_t seq; ///< deferred transaction seq
1698 };
1699 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1700 deferred_queue_t txcs; ///< txcs in this batch
1701 IOContext ioc; ///< our aios
1702 /// bytes of pending io for each deferred seq (may be 0)
1703 map<uint64_t,int> seq_bytes;
1704
1705 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1706 void _audit(CephContext *cct);
1707
1708 DeferredBatch(CephContext *cct, OpSequencer *osr)
1709 : osr(osr), ioc(cct, this) {}
1710
1711 /// prepare a write
1712 void prepare_write(CephContext *cct,
1713 uint64_t seq, uint64_t offset, uint64_t length,
1714 bufferlist::const_iterator& p);
1715
1716 void aio_finish(BlueStore *store) override {
1717 store->_deferred_aio_finish(osr);
1718 }
1719 };
1720
1721 class OpSequencer : public RefCountedObject {
1722 public:
1723 ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
1724 ceph::condition_variable qcond;
1725 typedef boost::intrusive::list<
1726 TransContext,
1727 boost::intrusive::member_hook<
1728 TransContext,
1729 boost::intrusive::list_member_hook<>,
1730 &TransContext::sequencer_item> > q_list_t;
1731 q_list_t q; ///< transactions
1732
1733 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1734
1735 DeferredBatch *deferred_running = nullptr;
1736 DeferredBatch *deferred_pending = nullptr;
1737
1738 BlueStore *store;
1739 coll_t cid;
1740
1741 uint64_t last_seq = 0;
1742
1743 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1744
1745 std::atomic_int kv_committing_serially = {0};
1746
1747 std::atomic_int kv_submitted_waiters = {0};
1748
1749 std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
1750
1751 OpSequencer(BlueStore *store, const coll_t& c)
1752 : RefCountedObject(store->cct, 0),
1753 store(store), cid(c) {
1754 }
1755 ~OpSequencer() {
1756 ceph_assert(q.empty());
1757 }
1758
1759 void queue_new(TransContext *txc) {
1760 std::lock_guard l(qlock);
1761 txc->seq = ++last_seq;
1762 q.push_back(*txc);
1763 }
1764
1765 void drain() {
1766 std::unique_lock l(qlock);
1767 while (!q.empty())
1768 qcond.wait(l);
1769 }
1770
1771 void drain_preceding(TransContext *txc) {
1772 std::unique_lock l(qlock);
1773 while (!q.empty() && &q.front() != txc)
1774 qcond.wait(l);
1775 }
1776
1777 bool _is_all_kv_submitted() {
1778 // caller must hold qlock; q must not be empty
1779 ceph_assert(!q.empty());
1780 TransContext *txc = &q.back();
1781 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1782 return true;
1783 }
1784 return false;
1785 }
1786
1787 void flush() {
1788 std::unique_lock l(qlock);
1789 while (true) {
1790 // set flag before the check because the condition
1791 // may become true outside qlock, and we need to make
1792 // sure those threads see waiters and signal qcond.
1793 ++kv_submitted_waiters;
1794 if (q.empty() || _is_all_kv_submitted()) {
1795 --kv_submitted_waiters;
1796 return;
1797 }
1798 qcond.wait(l);
1799 --kv_submitted_waiters;
1800 }
1801 }
1802
1803 void flush_all_but_last() {
1804 std::unique_lock l(qlock);
1805 assert (q.size() >= 1);
1806 while (true) {
1807 // set flag before the check because the condition
1808 // may become true outside qlock, and we need to make
1809 // sure those threads see waiters and signal qcond.
1810 ++kv_submitted_waiters;
1811 if (q.size() <= 1) {
1812 --kv_submitted_waiters;
1813 return;
1814 } else {
1815 auto it = q.rbegin();
1816 it++;
1817 if (it->state >= TransContext::STATE_KV_SUBMITTED) {
1818 return;
1819 }
1820 }
1821 qcond.wait(l);
1822 --kv_submitted_waiters;
1823 }
1824 }
1825
1826 bool flush_commit(Context *c) {
1827 std::lock_guard l(qlock);
1828 if (q.empty()) {
1829 return true;
1830 }
1831 TransContext *txc = &q.back();
1832 if (txc->state >= TransContext::STATE_KV_DONE) {
1833 return true;
1834 }
1835 txc->oncommits.push_back(c);
1836 return false;
1837 }
1838 };
1839
1840 typedef boost::intrusive::list<
1841 OpSequencer,
1842 boost::intrusive::member_hook<
1843 OpSequencer,
1844 boost::intrusive::list_member_hook<>,
1845 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1846
1847 struct KVSyncThread : public Thread {
1848 BlueStore *store;
1849 explicit KVSyncThread(BlueStore *s) : store(s) {}
1850 void *entry() override {
1851 store->_kv_sync_thread();
1852 return NULL;
1853 }
1854 };
1855 struct KVFinalizeThread : public Thread {
1856 BlueStore *store;
1857 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1858 void *entry() {
1859 store->_kv_finalize_thread();
1860 return NULL;
1861 }
1862 };
1863
1864 struct DBHistogram {
1865 struct value_dist {
1866 uint64_t count;
1867 uint32_t max_len;
1868 };
1869
1870 struct key_dist {
1871 uint64_t count;
1872 uint32_t max_len;
1873 map<int, struct value_dist> val_map; ///< value slab id -> count and max value length
1874 };
1875
1876 map<string, map<int, struct key_dist> > key_hist;
1877 map<int, uint64_t> value_hist;
1878 int get_key_slab(size_t sz);
1879 string get_key_slab_to_range(int slab);
1880 int get_value_slab(size_t sz);
1881 string get_value_slab_to_range(int slab);
1882 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1883 const string &prefix, size_t key_size, size_t value_size);
1884 void dump(Formatter *f);
1885 };
1886
1887 // --------------------------------------------------------
1888 // members
1889 private:
1890 BlueFS *bluefs = nullptr;
1891 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1892 bool bluefs_single_shared_device = true;
1893 mono_time bluefs_last_balance;
1894 utime_t next_dump_on_bluefs_alloc_failure;
1895
1896 KeyValueDB *db = nullptr;
1897 BlockDevice *bdev = nullptr;
1898 std::string freelist_type;
1899 FreelistManager *fm = nullptr;
1900 Allocator *alloc = nullptr;
1901 uuid_d fsid;
1902 int path_fd = -1; ///< open handle to $path
1903 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1904 bool mounted = false;
1905
1906 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1907 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1908 map<coll_t,CollectionRef> new_coll_map;
1909
1910 vector<Cache*> cache_shards;
1911
1912 /// protect zombie_osr_set
1913 ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
1914 std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
1915
1916 std::atomic<uint64_t> nid_last = {0};
1917 std::atomic<uint64_t> nid_max = {0};
1918 std::atomic<uint64_t> blobid_last = {0};
1919 std::atomic<uint64_t> blobid_max = {0};
1920
1921 Throttle throttle_bytes; ///< submit to commit
1922 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1923
1924 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1925 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1926
1927 ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
1928 std::atomic<uint64_t> deferred_seq = {0};
1929 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1930 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1931 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1932 Finisher deferred_finisher, finisher;
1933
1934 KVSyncThread kv_sync_thread;
1935 ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
1936 ceph::condition_variable kv_cond;
1937 bool _kv_only = false;
1938 bool kv_sync_started = false;
1939 bool kv_stop = false;
1940 bool kv_finalize_started = false;
1941 bool kv_finalize_stop = false;
1942 deque<TransContext*> kv_queue; ///< ready, already submitted
1943 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1944 deque<TransContext*> kv_committing; ///< currently syncing
1945 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1946
1947 KVFinalizeThread kv_finalize_thread;
1948 ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
1949 ceph::condition_variable kv_finalize_cond;
1950 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1951 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1952
1953 PerfCounters *logger = nullptr;
1954
1955 list<CollectionRef> removed_collections;
1956
1957 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1958 set<ghobject_t> debug_data_error_objects;
1959 set<ghobject_t> debug_mdata_error_objects;
1960
1961 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1962
1963 uint64_t block_size = 0; ///< block size of block device (power of 2)
1964 uint64_t block_mask = 0; ///< mask to get just the block offset
1965 size_t block_size_order = 0; ///< bits to shift to get block size
1966
1967 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1968 ///< bits for min_alloc_size
1969 uint8_t min_alloc_size_order = 0;
1970 static_assert(std::numeric_limits<uint8_t>::max() >
1971 std::numeric_limits<decltype(min_alloc_size)>::digits,
1972 "not enough bits for min_alloc_size");
1973
1974 ///< maximum allocation unit (power of 2)
1975 std::atomic<uint64_t> max_alloc_size = {0};
1976
1977 ///< number threshold for forced deferred writes
1978 std::atomic<int> deferred_batch_ops = {0};
1979
1980 ///< size threshold for forced deferred writes
1981 std::atomic<uint64_t> prefer_deferred_size = {0};
1982
1983 ///< approx cost per io, in bytes
1984 std::atomic<uint64_t> throttle_cost_per_io = {0};
1985
1986 std::atomic<Compressor::CompressionMode> comp_mode =
1987 {Compressor::COMP_NONE}; ///< compression mode
1988 CompressorRef compressor;
1989 std::atomic<uint64_t> comp_min_blob_size = {0};
1990 std::atomic<uint64_t> comp_max_blob_size = {0};
1991
1992 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1993
1994 uint64_t kv_ios = 0;
1995 uint64_t kv_throttle_costs = 0;
1996
1997 // cache trim control
1998 uint64_t cache_size = 0; ///< total cache size
1999 double cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
2000 double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
2001 double cache_data_ratio = 0; ///< cache ratio dedicated to object data
2002 bool cache_autotune = false; ///< cache autotune setting
2003 double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
2004 uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
2005 uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
2006 double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
2007 uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
2008 double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
2009
2010 typedef map<uint64_t, volatile_statfs> osd_pools_map;
2011
2012 ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
2013 volatile_statfs vstatfs;
2014 osd_pools_map osd_pools; // protected by vstatfs_lock as well
2015
2016 bool per_pool_stat_collection = true;
2017
2018 struct MempoolThread : public Thread {
2019 public:
2020 BlueStore *store;
2021
2022 ceph::condition_variable cond;
2023 ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
2024 bool stop = false;
2025 uint64_t autotune_cache_size = 0;
2026 std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
2027
2028 struct MempoolCache : public PriorityCache::PriCache {
2029 BlueStore *store;
2030 int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
2031 int64_t committed_bytes = 0;
2032 double cache_ratio = 0;
2033
2034 MempoolCache(BlueStore *s) : store(s) {};
2035
2036 virtual uint64_t _get_used_bytes() const = 0;
2037
2038 virtual int64_t request_cache_bytes(
2039 PriorityCache::Priority pri, uint64_t total_cache) const {
2040 int64_t assigned = get_cache_bytes(pri);
2041
2042 switch (pri) {
2043 // All cache items are currently shoved into the LAST priority
2044 case PriorityCache::Priority::LAST:
2045 {
2046 int64_t request = _get_used_bytes();
2047 return (request > assigned) ? request - assigned : 0;
2048 }
2049 default:
2050 break;
2051 }
2052 return -EOPNOTSUPP;
2053 }
2054
2055 virtual int64_t get_cache_bytes(PriorityCache::Priority pri) const {
2056 return cache_bytes[pri];
2057 }
2058 virtual int64_t get_cache_bytes() const {
2059 int64_t total = 0;
2060
2061 for (int i = 0; i < PriorityCache::Priority::LAST + 1; i++) {
2062 PriorityCache::Priority pri = static_cast<PriorityCache::Priority>(i);
2063 total += get_cache_bytes(pri);
2064 }
2065 return total;
2066 }
2067 virtual void set_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2068 cache_bytes[pri] = bytes;
2069 }
2070 virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
2071 cache_bytes[pri] += bytes;
2072 }
2073 virtual int64_t commit_cache_size(uint64_t total_cache) {
2074 committed_bytes = PriorityCache::get_chunk(
2075 get_cache_bytes(), total_cache);
2076 return committed_bytes;
2077 }
2078 virtual int64_t get_committed_size() const {
2079 return committed_bytes;
2080 }
2081 virtual double get_cache_ratio() const {
2082 return cache_ratio;
2083 }
2084 virtual void set_cache_ratio(double ratio) {
2085 cache_ratio = ratio;
2086 }
2087 virtual string get_cache_name() const = 0;
2088 };
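// Editorial note: the PriCache interface above is presumably driven by the
// balancing pass declared further below (_balance_cache/_balance_cache_pri):
// each cache reports request_cache_bytes() per priority, the balancer hands
// out the budget via set_cache_bytes()/add_cache_bytes(), and
// commit_cache_size() then turns the assignment into the committed chunk
// used for the next interval.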
2089
2090 struct MetaCache : public MempoolCache {
2091 MetaCache(BlueStore *s) : MempoolCache(s) {};
2092
2093 virtual uint64_t _get_used_bytes() const {
2094 return mempool::bluestore_cache_other::allocated_bytes() +
2095 mempool::bluestore_cache_onode::allocated_bytes();
2096 }
2097
2098 virtual string get_cache_name() const {
2099 return "BlueStore Meta Cache";
2100 }
2101
2102 uint64_t _get_num_onodes() const {
2103 uint64_t onode_num =
2104 mempool::bluestore_cache_onode::allocated_items();
2105 return (2 > onode_num) ? 2 : onode_num;
2106 }
2107
2108 double get_bytes_per_onode() const {
2109 return (double)_get_used_bytes() / (double)_get_num_onodes();
2110 }
2111 };
2112 std::shared_ptr<MetaCache> meta_cache;
2113
2114 struct DataCache : public MempoolCache {
2115 DataCache(BlueStore *s) : MempoolCache(s) {};
2116
2117 virtual uint64_t _get_used_bytes() const {
2118 uint64_t bytes = 0;
2119 for (auto i : store->cache_shards) {
2120 bytes += i->_get_buffer_bytes();
2121 }
2122 return bytes;
2123 }
2124 virtual string get_cache_name() const {
2125 return "BlueStore Data Cache";
2126 }
2127 };
2128 std::shared_ptr<DataCache> data_cache;
2129
2130 public:
2131 explicit MempoolThread(BlueStore *s)
2132 : store(s),
2133 meta_cache(new MetaCache(s)),
2134 data_cache(new DataCache(s)) {}
2135
2136 void *entry() override;
2137 void init() {
2138 ceph_assert(stop == false);
2139 create("bstore_mempool");
2140 }
2141 void shutdown() {
2142 lock.lock();
2143 stop = true;
2144 cond.notify_all();
2145 lock.unlock();
2146 join();
2147 }
2148
2149 private:
2150 void _adjust_cache_settings();
2151 void _trim_shards(bool interval_stats);
2152 void _tune_cache_size(bool interval_stats);
2153 void _balance_cache(
2154 const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches);
2155 void _balance_cache_pri(
2156 int64_t *mem_avail,
2157 const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches,
2158 PriorityCache::Priority pri);
2159 } mempool_thread;
2160
2161 // --------------------------------------------------------
2162 // private methods
2163
2164 void _init_logger();
2165 void _shutdown_logger();
2166 int _reload_logger();
2167
2168 int _open_path();
2169 void _close_path();
2170 int _open_fsid(bool create);
2171 int _lock_fsid();
2172 int _read_fsid(uuid_d *f);
2173 int _write_fsid();
2174 void _close_fsid();
2175 void _set_alloc_sizes();
2176 void _set_blob_size();
2177 void _set_finisher_num();
2178
2179 int _open_bdev(bool create);
2180 // Verifies that disk space is sufficient for the reserved area plus the
2181 // minimal bluefs allocation, and alters the latter if needed.
2182 // Depends on min_alloc_size, hence it should be called after
2183 // min_alloc_size is initialized (and outside of _open_bdev).
2184 void _validate_bdev();
2185 void _close_bdev();
2186
2187 int _minimal_open_bluefs(bool create);
2188 void _minimal_close_bluefs();
2189 int _open_bluefs(bool create);
2190 void _close_bluefs();
2191
2192 // Limited (u)mount intended for BlueFS operations only
2193 int _mount_for_bluefs();
2194 void _umount_for_bluefs();
2195
2196
2197 int _is_bluefs(bool create, bool* ret);
2198 /*
2199 * opens the DB and its dependent super_meta, FreelistManager and allocator
2200 * in the proper order
2201 */
2202 int _open_db_and_around(bool read_only);
2203 void _close_db_and_around();
2204
2205 // Updates legacy bluefs-related records in the DB to a state valid for
2206 // downgrades from Nautilus.
2207 void _sync_bluefs_and_fm();
2208
2209 /*
2210 * @warning to_repair_db means that we open this db to repair it and will
2211 * not hold rocksdb's file lock.
2212 */
2213 int _open_db(bool create,
2214 bool to_repair_db=false,
2215 bool read_only = false);
2216 void _close_db();
2217 int _open_fm(KeyValueDB::Transaction t);
2218 void _close_fm();
2219 int _open_alloc();
2220 void _close_alloc();
2221 int _open_collections(int *errors=0);
2222 void _close_collections();
2223
2224 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
2225 bool create);
2226
2227 public:
2228 static int _write_bdev_label(CephContext* cct,
2229 string path, bluestore_bdev_label_t label);
2230 static int _read_bdev_label(CephContext* cct, string path,
2231 bluestore_bdev_label_t *label);
2232 private:
2233 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2234 bool create);
2235
2236 int _open_super_meta();
2237
2238 void _open_statfs();
2239 void _get_statfs_overall(struct store_statfs_t *buf);
2240
2241 void _dump_alloc_on_failure();
2242
2243 int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
2244 int _balance_bluefs_freespace();
2245
2246 CollectionRef _get_collection(const coll_t& cid);
2247 void _queue_reap_collection(CollectionRef& c);
2248 void _reap_collections();
2249 void _update_cache_logger();
2250
2251 void _assign_nid(TransContext *txc, OnodeRef o);
2252 uint64_t _assign_blobid(TransContext *txc);
2253
2254 template <int LogLevelV>
2255 friend void _dump_onode(CephContext *cct, const Onode& o);
2256 template <int LogLevelV>
2257 friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
2258 template <int LogLevelV>
2259 friend void _dump_transaction(CephContext *cct, Transaction *t);
2260
2261 TransContext *_txc_create(Collection *c, OpSequencer *osr,
2262 list<Context*> *on_commits);
2263 void _txc_update_store_statfs(TransContext *txc);
2264 void _txc_add_transaction(TransContext *txc, Transaction *t);
2265 void _txc_calc_cost(TransContext *txc);
2266 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2267 void _txc_state_proc(TransContext *txc);
2268 void _txc_aio_submit(TransContext *txc);
2269 public:
2270 void txc_aio_finish(void *p) {
2271 _txc_state_proc(static_cast<TransContext*>(p));
2272 }
2273 private:
2274 void _txc_finish_io(TransContext *txc);
2275 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2276 void _txc_applied_kv(TransContext *txc);
2277 void _txc_committed_kv(TransContext *txc);
2278 void _txc_finish(TransContext *txc);
2279 void _txc_release_alloc(TransContext *txc);
2280
2281 void _osr_attach(Collection *c);
2282 void _osr_register_zombie(OpSequencer *osr);
2283 void _osr_drain(OpSequencer *osr);
2284 void _osr_drain_preceding(TransContext *txc);
2285 void _osr_drain_all();
2286
2287 void _kv_start();
2288 void _kv_stop();
2289 void _kv_sync_thread();
2290 void _kv_finalize_thread();
2291
2292 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2293 void _deferred_queue(TransContext *txc);
2294 public:
2295 void deferred_try_submit();
2296 private:
2297 void _deferred_submit_unlock(OpSequencer *osr);
2298 void _deferred_aio_finish(OpSequencer *osr);
2299 int _deferred_replay();
2300
2301 public:
2302 using mempool_dynamic_bitset =
2303 boost::dynamic_bitset<uint64_t,
2304 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2305
2306 private:
2307 int _fsck_check_extents(
2308 const coll_t& cid,
2309 const ghobject_t& oid,
2310 const PExtentVector& extents,
2311 bool compressed,
2312 mempool_dynamic_bitset &used_blocks,
2313 uint64_t granularity,
2314 BlueStoreRepairer* repairer,
2315 store_statfs_t& expected_statfs);
2316
2317 using per_pool_statfs =
2318 mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
2319 void _fsck_check_pool_statfs(
2320 per_pool_statfs& expected_pool_statfs,
2321 bool need_per_pool_stats,
2322 int& errors,
2323 BlueStoreRepairer* repairer);
2324
2325 void _buffer_cache_write(
2326 TransContext *txc,
2327 BlobRef b,
2328 uint64_t offset,
2329 bufferlist& bl,
2330 unsigned flags) {
2331 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2332 flags);
2333 txc->shared_blobs_written.insert(b->shared_blob);
2334 }
2335
2336 int _collection_list(
2337 Collection *c, const ghobject_t& start, const ghobject_t& end,
2338 int max, vector<ghobject_t> *ls, ghobject_t *next);
2339
2340 template <typename T, typename F>
2341 T select_option(const std::string& opt_name, T val1, F f) {
2342 //NB: opt_name reserved for future use
2343 boost::optional<T> val2 = f();
2344 if (val2) {
2345 return *val2;
2346 }
2347 return val1;
2348 }
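// Editorial usage sketch for select_option(): the functor returns
// boost::optional<T>; an engaged value overrides val1. The option name,
// default_max_blob and per_pool_override below are illustrative only:
//
//   uint64_t blob_size = select_option(
//     "compression_max_blob_size", default_max_blob,
//     [&]() -> boost::optional<uint64_t> {
//       if (per_pool_override > 0)
//         return per_pool_override;              // engaged: use the override
//       return boost::optional<uint64_t>();      // empty: fall back to val1
//     });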
2349
2350 void _apply_padding(uint64_t head_pad,
2351 uint64_t tail_pad,
2352 bufferlist& padded);
2353
2354 void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
2355
2356 // -- ondisk version ---
2357 public:
2358 const int32_t latest_ondisk_format = 2; ///< our version
2359 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2360 const int32_t min_compat_ondisk_format = 2; ///< who can read us
2361
2362 private:
2363 int32_t ondisk_format = 0; ///< value detected on mount
2364
2365 int _upgrade_super(); ///< upgrade (called during open_super)
2366 uint64_t _get_ondisk_reserved() const;
2367 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2368
2369 // --- public interface ---
2370 public:
2371 BlueStore(CephContext *cct, const string& path);
2372 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for UT only
2373 ~BlueStore() override;
2374
2375 string get_type() override {
2376 return "bluestore";
2377 }
2378
2379 bool needs_journal() override { return false; };
2380 bool wants_journal() override { return false; };
2381 bool allows_journal() override { return false; };
2382
2383 int get_devices(set<string> *ls) override;
2384
2385 bool is_rotational() override;
2386 bool is_journal_rotational() override;
2387
2388 string get_default_device_class() override {
2389 string device_class;
2390 map<string, string> metadata;
2391 collect_metadata(&metadata);
2392 auto it = metadata.find("bluestore_bdev_type");
2393 if (it != metadata.end()) {
2394 device_class = it->second;
2395 }
2396 return device_class;
2397 }
2398
2399 int get_numa_node(
2400 int *numa_node,
2401 set<int> *nodes,
2402 set<string> *failed) override;
2403
2404 static int get_block_device_fsid(CephContext* cct, const string& path,
2405 uuid_d *fsid);
2406
2407 bool test_mount_in_use() override;
2408
2409 private:
2410 int _mount(bool kv_only, bool open_db=true);
2411 public:
2412 int mount() override {
2413 return _mount(false);
2414 }
2415 int umount() override;
2416
2417 int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
2418 int r = _mount(true, open_db);
2419 if (r < 0)
2420 return r;
2421 *pdb = db;
2422 return 0;
2423 }
2424
2425 int write_meta(const std::string& key, const std::string& value) override;
2426 int read_meta(const std::string& key, std::string *value) override;
2427
2428
2429 int fsck(bool deep) override {
2430 return _fsck(deep, false);
2431 }
2432 int repair(bool deep) override {
2433 return _fsck(deep, true);
2434 }
2435 int _fsck(bool deep, bool repair);
2436
2437 void set_cache_shards(unsigned num) override;
2438 void dump_cache_stats(Formatter *f) override {
2439 uint64_t onode_count = 0, buffers_bytes = 0;
2440 for (auto i: cache_shards) {
2441 onode_count += i->_get_num_onodes();
2442 buffers_bytes += i->_get_buffer_bytes();
2443 }
2444 f->dump_unsigned("bluestore_onode", onode_count);
2445 f->dump_unsigned("bluestore_buffers", buffers_bytes);
2446 }
2447 void dump_cache_stats(ostream& ss) override {
2448 uint64_t onode_count = 0, buffers_bytes = 0;
2449 for (auto i: cache_shards) {
2450 onode_count += i->_get_num_onodes();
2451 buffers_bytes += i->_get_buffer_bytes();
2452 }
2453 ss << "bluestore_onode: " << onode_count;
2454 ss << "bluestore_buffers: " << buffers_bytes;
2455 }
2456
2457 int validate_hobject_key(const hobject_t &obj) const override {
2458 return 0;
2459 }
2460 unsigned get_max_attr_name_length() override {
2461 return 256; // arbitrary; there is no real limit internally
2462 }
2463
2464 int mkfs() override;
2465 int mkjournal() override {
2466 return 0;
2467 }
2468
2469 void get_db_statistics(Formatter *f) override;
2470 void generate_db_histogram(Formatter *f) override;
2471 void _flush_cache();
2472 int flush_cache(ostream *os = NULL) override;
2473 void dump_perf_counters(Formatter *f) override {
2474 f->open_object_section("perf_counters");
2475 logger->dump_formatted(f, false);
2476 f->close_section();
2477 }
2478
2479 int add_new_bluefs_device(int id, const string& path);
2480 int migrate_to_existing_bluefs_device(const set<int>& devs_source,
2481 int id);
2482 int migrate_to_new_bluefs_device(const set<int>& devs_source,
2483 int id,
2484 const string& path);
2485 int expand_devices(ostream& out);
2486 string get_device_path(unsigned id);
2487
2488 public:
2489 int statfs(struct store_statfs_t *buf,
2490 osd_alert_list_t* alerts = nullptr) override;
2491 int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) override;
2492
2493 void collect_metadata(map<string,string> *pm) override;
2494
2495 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2496 int set_collection_opts(
2497 CollectionHandle& c,
2498 const pool_opts_t& opts) override;
2499 int stat(
2500 CollectionHandle &c,
2501 const ghobject_t& oid,
2502 struct stat *st,
2503 bool allow_eio = false) override;
2504 int read(
2505 CollectionHandle &c,
2506 const ghobject_t& oid,
2507 uint64_t offset,
2508 size_t len,
2509 bufferlist& bl,
2510 uint32_t op_flags = 0) override;
2511 int _do_read(
2512 Collection *c,
2513 OnodeRef o,
2514 uint64_t offset,
2515 size_t len,
2516 bufferlist& bl,
2517 uint32_t op_flags = 0,
2518 uint64_t retry_count = 0);
2519
2520 private:
2521 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2522 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2523 public:
2524 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2525 uint64_t offset, size_t len, bufferlist& bl) override;
2526 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2527 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2528
2529
2530 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2531 bufferptr& value) override;
2532
2533 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2534 map<string,bufferptr>& aset) override;
2535
2536 int list_collections(vector<coll_t>& ls) override;
2537
2538 CollectionHandle open_collection(const coll_t &c) override;
2539 CollectionHandle create_new_collection(const coll_t& cid) override;
2540 void set_collection_commit_queue(const coll_t& cid,
2541 ContextQueue *commit_queue) override;
2542
2543 bool collection_exists(const coll_t& c) override;
2544 int collection_empty(CollectionHandle& c, bool *empty) override;
2545 int collection_bits(CollectionHandle& c) override;
2546
2547 int collection_list(CollectionHandle &c,
2548 const ghobject_t& start,
2549 const ghobject_t& end,
2550 int max,
2551 vector<ghobject_t> *ls, ghobject_t *next) override;
2552
2553 int omap_get(
2554 CollectionHandle &c, ///< [in] Collection containing oid
2555 const ghobject_t &oid, ///< [in] Object containing omap
2556 bufferlist *header, ///< [out] omap header
2557 map<string, bufferlist> *out ///< [out] Key to value map
2558 ) override;
2559
2560 /// Get omap header
2561 int omap_get_header(
2562 CollectionHandle &c, ///< [in] Collection containing oid
2563 const ghobject_t &oid, ///< [in] Object containing omap
2564 bufferlist *header, ///< [out] omap header
2565 bool allow_eio = false ///< [in] don't assert on eio
2566 ) override;
2567
2568 /// Get keys defined on oid
2569 int omap_get_keys(
2570 CollectionHandle &c, ///< [in] Collection containing oid
2571 const ghobject_t &oid, ///< [in] Object containing omap
2572 set<string> *keys ///< [out] Keys defined on oid
2573 ) override;
2574
2575 /// Get key values
2576 int omap_get_values(
2577 CollectionHandle &c, ///< [in] Collection containing oid
2578 const ghobject_t &oid, ///< [in] Object containing omap
2579 const set<string> &keys, ///< [in] Keys to get
2580 map<string, bufferlist> *out ///< [out] Returned keys and values
2581 ) override;
2582
2583 /// Filters keys into out which are defined on oid
2584 int omap_check_keys(
2585 CollectionHandle &c, ///< [in] Collection containing oid
2586 const ghobject_t &oid, ///< [in] Object containing omap
2587 const set<string> &keys, ///< [in] Keys to check
2588 set<string> *out ///< [out] Subset of keys defined on oid
2589 ) override;
2590
2591 ObjectMap::ObjectMapIterator get_omap_iterator(
2592 CollectionHandle &c, ///< [in] collection
2593 const ghobject_t &oid ///< [in] object
2594 ) override;
2595
2596 void set_fsid(uuid_d u) override {
2597 fsid = u;
2598 }
2599 uuid_d get_fsid() override {
2600 return fsid;
2601 }
2602
2603 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2604 return num_objects * 300; //assuming per-object overhead is 300 bytes
2605 }
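// Illustrative arithmetic only: with the 300-byte per-object constant above,
// 1,000,000 objects give an estimated overhead of roughly 300 MB.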
2606
2607 struct BSPerfTracker {
2608 PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
2609 PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
2610
2611 objectstore_perf_stat_t get_cur_stats() const {
2612 objectstore_perf_stat_t ret;
2613 ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
2614 ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
2615 return ret;
2616 }
2617
2618 void update_from_perfcounters(PerfCounters &logger);
2619 } perf_tracker;
2620
2621 objectstore_perf_stat_t get_cur_stats() override {
2622 perf_tracker.update_from_perfcounters(*logger);
2623 return perf_tracker.get_cur_stats();
2624 }
2625 const PerfCounters* get_perf_counters() const override {
2626 return logger;
2627 }
2628
2629 int queue_transactions(
2630 CollectionHandle& ch,
2631 vector<Transaction>& tls,
2632 TrackedOpRef op = TrackedOpRef(),
2633 ThreadPool::TPHandle *handle = NULL) override;
2634
2635 // error injection
2636 void inject_data_error(const ghobject_t& o) override {
2637 RWLock::WLocker l(debug_read_error_lock);
2638 debug_data_error_objects.insert(o);
2639 }
2640 void inject_mdata_error(const ghobject_t& o) override {
2641 RWLock::WLocker l(debug_read_error_lock);
2642 debug_mdata_error_objects.insert(o);
2643 }
2644
2645 /// methods to inject various errors fsck can repair
2646 void inject_broken_shared_blob_key(const string& key,
2647 const bufferlist& bl);
2648 void inject_leaked(uint64_t len);
2649 void inject_false_free(coll_t cid, ghobject_t oid);
2650 void inject_statfs(const string& key, const store_statfs_t& new_statfs);
2651 void inject_misreference(coll_t cid1, ghobject_t oid1,
2652 coll_t cid2, ghobject_t oid2,
2653 uint64_t offset);
2654
2655 void compact() override {
2656 ceph_assert(db);
2657 db->compact();
2658 }
2659 bool has_builtin_csum() const override {
2660 return true;
2661 }
2662
2663 /*
2664 Allocate space for BlueFS from the slow device.
2665 Either automatically applies the allocated extents to the underlying
2666 BlueFS (extents == nullptr) or just returns them via the provided non-null extents.
2667 */
2668 int allocate_bluefs_freespace(
2669 uint64_t min_size,
2670 uint64_t size,
2671 PExtentVector* extents);
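// Editorial usage sketch of the two calling modes described above
// (min_sz/want_sz are hypothetical variables):
//
//   // 1) let BlueStore apply the space straight to BlueFS:
//   int r = allocate_bluefs_freespace(min_sz, want_sz, nullptr);
//
//   // 2) only obtain the extents and handle them at the call site:
//   PExtentVector extents;
//   int r2 = allocate_bluefs_freespace(min_sz, want_sz, &extents);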
2672
2673 inline void log_latency(const char* name,
2674 int idx,
2675 const ceph::timespan& lat,
2676 double lat_threshold,
2677 const char* info = "") const;
2678
2679 inline void log_latency_fn(const char* name,
2680 int idx,
2681 const ceph::timespan& lat,
2682 double lat_threshold,
2683 std::function<string (const ceph::timespan& lat)> fn) const;
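// Editorial usage sketch for log_latency_fn(); the callback is presumably
// consulted only when lat exceeds lat_threshold (the implementation lives in
// BlueStore.cc). measured_lat, slow_threshold_sec and txc_bytes are
// illustrative names; l_bluestore_commit_lat is a real counter index.
//
//   log_latency_fn(
//     "commit", l_bluestore_commit_lat, measured_lat, slow_threshold_sec,
//     [&](const ceph::timespan& lat) {
//       return ", txc bytes = " + std::to_string(txc_bytes);
//     });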
2684
2685 private:
2686 bool _debug_data_eio(const ghobject_t& o) {
2687 if (!cct->_conf->bluestore_debug_inject_read_err) {
2688 return false;
2689 }
2690 RWLock::RLocker l(debug_read_error_lock);
2691 return debug_data_error_objects.count(o);
2692 }
2693 bool _debug_mdata_eio(const ghobject_t& o) {
2694 if (!cct->_conf->bluestore_debug_inject_read_err) {
2695 return false;
2696 }
2697 RWLock::RLocker l(debug_read_error_lock);
2698 return debug_mdata_error_objects.count(o);
2699 }
2700 void _debug_obj_on_delete(const ghobject_t& o) {
2701 if (cct->_conf->bluestore_debug_inject_read_err) {
2702 RWLock::WLocker l(debug_read_error_lock);
2703 debug_data_error_objects.erase(o);
2704 debug_mdata_error_objects.erase(o);
2705 }
2706 }
2707 private:
2708 ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
2709 string failed_cmode;
2710 set<string> failed_compressors;
2711 string spillover_alert;
2712 string legacy_statfs_alert;
2713 string disk_size_mismatch_alert;
2714
2715 void _log_alerts(osd_alert_list_t& alerts);
2716 bool _set_compression_alert(bool cmode, const char* s) {
2717 std::lock_guard l(qlock);
2718 if (cmode) {
2719 bool ret = failed_cmode.empty();
2720 failed_cmode = s;
2721 return ret;
2722 }
2723 return failed_compressors.emplace(s).second;
2724 }
2725 void _clear_compression_alert() {
2726 std::lock_guard l(qlock);
2727 failed_compressors.clear();
2728 failed_cmode.clear();
2729 }
2730
2731 void _set_spillover_alert(const string& s) {
2732 std::lock_guard l(qlock);
2733 spillover_alert = s;
2734 }
2735 void _clear_spillover_alert() {
2736 std::lock_guard l(qlock);
2737 spillover_alert.clear();
2738 }
2739
2740 void _check_legacy_statfs_alert();
2741 void _set_disk_size_mismatch_alert(const string& s) {
2742 std::lock_guard l(qlock);
2743 disk_size_mismatch_alert = s;
2744 }
2745
2746 private:
2747
2748 // --------------------------------------------------------
2749 // read processing internal methods
2750 int _verify_csum(
2751 OnodeRef& o,
2752 const bluestore_blob_t* blob,
2753 uint64_t blob_xoffset,
2754 const bufferlist& bl,
2755 uint64_t logical_offset) const;
2756 int _decompress(bufferlist& source, bufferlist* result);
2757
2758
2759 // --------------------------------------------------------
2760 // write ops
2761
2762 struct WriteContext {
2763 bool buffered = false; ///< buffered write
2764 bool compress = false; ///< compressed write
2765 uint64_t target_blob_size = 0; ///< target (max) blob size
2766 unsigned csum_order = 0; ///< target checksum chunk order
2767
2768 old_extent_map_t old_extents; ///< must deref these blobs
2769
2770 struct write_item {
2771 uint64_t logical_offset; ///< write logical offset
2772 BlobRef b;
2773 uint64_t blob_length;
2774 uint64_t b_off;
2775 bufferlist bl;
2776 uint64_t b_off0; ///< original offset in a blob prior to padding
2777 uint64_t length0; ///< original data length prior to padding
2778
2779 bool mark_unused;
2780 bool new_blob; ///< whether new blob was created
2781
2782 bool compressed = false;
2783 bufferlist compressed_bl;
2784 size_t compressed_len = 0;
2785
2786 write_item(
2787 uint64_t logical_offs,
2788 BlobRef b,
2789 uint64_t blob_len,
2790 uint64_t o,
2791 bufferlist& bl,
2792 uint64_t o0,
2793 uint64_t l0,
2794 bool _mark_unused,
2795 bool _new_blob)
2796 :
2797 logical_offset(logical_offs),
2798 b(b),
2799 blob_length(blob_len),
2800 b_off(o),
2801 bl(bl),
2802 b_off0(o0),
2803 length0(l0),
2804 mark_unused(_mark_unused),
2805 new_blob(_new_blob) {}
2806 };
2807 vector<write_item> writes; ///< blobs we're writing
2808
2809 /// partial clone of the context
2810 void fork(const WriteContext& other) {
2811 buffered = other.buffered;
2812 compress = other.compress;
2813 target_blob_size = other.target_blob_size;
2814 csum_order = other.csum_order;
2815 }
2816 void write(
2817 uint64_t loffs,
2818 BlobRef b,
2819 uint64_t blob_len,
2820 uint64_t o,
2821 bufferlist& bl,
2822 uint64_t o0,
2823 uint64_t len0,
2824 bool _mark_unused,
2825 bool _new_blob) {
2826 writes.emplace_back(loffs,
2827 b,
2828 blob_len,
2829 o,
2830 bl,
2831 o0,
2832 len0,
2833 _mark_unused,
2834 _new_blob);
2835 }
2836 /// Checks for writes to the same pextent within a blob
2837 bool has_conflict(
2838 BlobRef b,
2839 uint64_t loffs,
2840 uint64_t loffs_end,
2841 uint64_t min_alloc_size);
2842 };
2843
2844 void _do_write_small(
2845 TransContext *txc,
2846 CollectionRef &c,
2847 OnodeRef o,
2848 uint64_t offset, uint64_t length,
2849 bufferlist::iterator& blp,
2850 WriteContext *wctx);
2851 void _do_write_big(
2852 TransContext *txc,
2853 CollectionRef &c,
2854 OnodeRef o,
2855 uint64_t offset, uint64_t length,
2856 bufferlist::iterator& blp,
2857 WriteContext *wctx);
2858 int _do_alloc_write(
2859 TransContext *txc,
2860 CollectionRef c,
2861 OnodeRef o,
2862 WriteContext *wctx);
2863 void _wctx_finish(
2864 TransContext *txc,
2865 CollectionRef& c,
2866 OnodeRef o,
2867 WriteContext *wctx,
2868 set<SharedBlob*> *maybe_unshared_blobs=0);
2869
2870 int _write(TransContext *txc,
2871 CollectionRef& c,
2872 OnodeRef& o,
2873 uint64_t offset, size_t len,
2874 bufferlist& bl,
2875 uint32_t fadvise_flags);
2876 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2877 uint64_t chunk_size);
2878
2879 void _choose_write_options(CollectionRef& c,
2880 OnodeRef o,
2881 uint32_t fadvise_flags,
2882 WriteContext *wctx);
2883
2884 int _do_gc(TransContext *txc,
2885 CollectionRef& c,
2886 OnodeRef o,
2887 const GarbageCollector& gc,
2888 const WriteContext& wctx,
2889 uint64_t *dirty_start,
2890 uint64_t *dirty_end);
2891
2892 int _do_write(TransContext *txc,
2893 CollectionRef &c,
2894 OnodeRef o,
2895 uint64_t offset, uint64_t length,
2896 bufferlist& bl,
2897 uint32_t fadvise_flags);
2898 void _do_write_data(TransContext *txc,
2899 CollectionRef& c,
2900 OnodeRef o,
2901 uint64_t offset,
2902 uint64_t length,
2903 bufferlist& bl,
2904 WriteContext *wctx);
2905
2906 int _touch(TransContext *txc,
2907 CollectionRef& c,
2908 OnodeRef& o);
2909 int _do_zero(TransContext *txc,
2910 CollectionRef& c,
2911 OnodeRef& o,
2912 uint64_t offset, size_t len);
2913 int _zero(TransContext *txc,
2914 CollectionRef& c,
2915 OnodeRef& o,
2916 uint64_t offset, size_t len);
2917 void _do_truncate(TransContext *txc,
2918 CollectionRef& c,
2919 OnodeRef o,
2920 uint64_t offset,
2921 set<SharedBlob*> *maybe_unshared_blobs=0);
2922 int _truncate(TransContext *txc,
2923 CollectionRef& c,
2924 OnodeRef& o,
2925 uint64_t offset);
2926 int _remove(TransContext *txc,
2927 CollectionRef& c,
2928 OnodeRef& o);
2929 int _do_remove(TransContext *txc,
2930 CollectionRef& c,
2931 OnodeRef o);
2932 int _setattr(TransContext *txc,
2933 CollectionRef& c,
2934 OnodeRef& o,
2935 const string& name,
2936 bufferptr& val);
2937 int _setattrs(TransContext *txc,
2938 CollectionRef& c,
2939 OnodeRef& o,
2940 const map<string,bufferptr>& aset);
2941 int _rmattr(TransContext *txc,
2942 CollectionRef& c,
2943 OnodeRef& o,
2944 const string& name);
2945 int _rmattrs(TransContext *txc,
2946 CollectionRef& c,
2947 OnodeRef& o);
2948 void _do_omap_clear(TransContext *txc, const string& prefix, uint64_t id);
2949 int _omap_clear(TransContext *txc,
2950 CollectionRef& c,
2951 OnodeRef& o);
2952 int _omap_setkeys(TransContext *txc,
2953 CollectionRef& c,
2954 OnodeRef& o,
2955 bufferlist& bl);
2956 int _omap_setheader(TransContext *txc,
2957 CollectionRef& c,
2958 OnodeRef& o,
2959 bufferlist& header);
2960 int _omap_rmkeys(TransContext *txc,
2961 CollectionRef& c,
2962 OnodeRef& o,
2963 bufferlist& bl);
2964 int _omap_rmkey_range(TransContext *txc,
2965 CollectionRef& c,
2966 OnodeRef& o,
2967 const string& first, const string& last);
2968 int _set_alloc_hint(
2969 TransContext *txc,
2970 CollectionRef& c,
2971 OnodeRef& o,
2972 uint64_t expected_object_size,
2973 uint64_t expected_write_size,
2974 uint32_t flags);
2975 int _do_clone_range(TransContext *txc,
2976 CollectionRef& c,
2977 OnodeRef& oldo,
2978 OnodeRef& newo,
2979 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2980 int _clone(TransContext *txc,
2981 CollectionRef& c,
2982 OnodeRef& oldo,
2983 OnodeRef& newo);
2984 int _clone_range(TransContext *txc,
2985 CollectionRef& c,
2986 OnodeRef& oldo,
2987 OnodeRef& newo,
2988 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2989 int _rename(TransContext *txc,
2990 CollectionRef& c,
2991 OnodeRef& oldo,
2992 OnodeRef& newo,
2993 const ghobject_t& new_oid);
2994 int _create_collection(TransContext *txc, const coll_t &cid,
2995 unsigned bits, CollectionRef *c);
2996 int _remove_collection(TransContext *txc, const coll_t &cid,
2997 CollectionRef *c);
2998 void _do_remove_collection(TransContext *txc, CollectionRef *c);
2999 int _split_collection(TransContext *txc,
3000 CollectionRef& c,
3001 CollectionRef& d,
3002 unsigned bits, int rem);
3003 int _merge_collection(TransContext *txc,
3004 CollectionRef *c,
3005 CollectionRef& d,
3006 unsigned bits);
3007
3008 private:
3009 std::atomic<uint64_t> out_of_sync_fm = {0};
3010 // --------------------------------------------------------
3011 // BlueFSDeviceExpander implementation
3012 uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
3013 uint64_t bluefs_total) override {
3014 auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
3015 return delta > 0 ? delta : 0;
3016 }
3017 int allocate_freespace(
3018 uint64_t min_size,
3019 uint64_t size,
3020 PExtentVector& extents) override {
3021 return allocate_bluefs_freespace(min_size, size, &extents);
3022 };
3023 };
3024
3025 inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
3026 return out
3027 << " allocated:"
3028 << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
3029 << " stored:"
3030 << s.values[BlueStore::volatile_statfs::STATFS_STORED]
3031 << " compressed:"
3032 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
3033 << " compressed_orig:"
3034 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
3035 << " compressed_alloc:"
3036 << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
3037 }
3038
3039 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
3040 o->get();
3041 }
3042 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
3043 o->put();
3044 }
3045
3046 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
3047 o->get();
3048 }
3049 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
3050 o->put();
3051 }
3052
3053 class BlueStoreRepairer
3054 {
3055 public:
3056 // to simplify future potential migration to mempools
3057 using fsck_interval = interval_set<uint64_t>;
3058
3059 // Structure to track which pextents are used for a specific cid/oid.
3060 // As with a Bloom filter, only positive and false-positive matches
3061 // are possible.
3062 // Maintains two lists of bloom filters, one for cids and one for oids,
3063 // where each list entry is a BF covering a specific disk pextent.
3064 // The extent length covered per filter is computed on init.
3065 // Allows filtering out 'uninteresting' pextents to speed up subsequent
3066 // 'is_used' access.
3067 struct StoreSpaceTracker {
3068 const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
3069 const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
3070 const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrary selected
3071 static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
3072
3073 typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
3074 bloom_vector collections_bfs;
3075 bloom_vector objects_bfs;
3076
3077 bool was_filtered_out = false;
3078 uint64_t granularity = 0; // extent length for a single filter
3079
3080 StoreSpaceTracker() {
3081 }
3082 StoreSpaceTracker(const StoreSpaceTracker& from) :
3083 collections_bfs(from.collections_bfs),
3084 objects_bfs(from.objects_bfs),
3085 granularity(from.granularity) {
3086 }
3087
3088 void init(uint64_t total,
3089 uint64_t min_alloc_size,
3090 uint64_t mem_cap = DEF_MEM_CAP) {
3091 ceph_assert(!granularity); // not initialized yet
3092 ceph_assert(min_alloc_size && isp2(min_alloc_size));
3093 ceph_assert(mem_cap);
3094
3095 total = round_up_to(total, min_alloc_size);
3096 granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
3097
3098 if (!granularity) {
3099 granularity = min_alloc_size;
3100 } else {
3101 granularity = round_up_to(granularity, min_alloc_size);
3102 }
3103
3104 uint64_t entries = round_up_to(total, granularity) / granularity;
3105 collections_bfs.resize(entries,
3106 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3107 BLOOM_FILTER_TABLE_SIZE,
3108 0,
3109 BLOOM_FILTER_EXPECTED_COUNT));
3110 objects_bfs.resize(entries,
3111 bloom_filter(BLOOM_FILTER_SALT_COUNT,
3112 BLOOM_FILTER_TABLE_SIZE,
3113 0,
3114 BLOOM_FILTER_EXPECTED_COUNT));
3115 }
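// Editorial worked example of the sizing math above (illustrative numbers):
// total = 1 TiB, min_alloc_size = 64 KiB, mem_cap = 128 MiB (DEF_MEM_CAP).
//   granularity = 2^40 * 32 * 2 / 2^27 = 512 KiB   (already 64 KiB aligned)
//   entries     = 2^40 / 2^19           = 2,097,152 filters per list
//   memory      ~ entries * 2 lists * 32 bytes ~= 128 MiB = mem_cap
// i.e. the two bloom-filter tables together consume roughly the requested cap.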
3116 inline uint32_t get_hash(const coll_t& cid) const {
3117 return cid.hash_to_shard(1);
3118 }
3119 inline void set_used(uint64_t offset, uint64_t len,
3120 const coll_t& cid, const ghobject_t& oid) {
3121 ceph_assert(granularity); // initialized
3122
3123 // can't call this func after filter_out has been applied
3124 ceph_assert(!was_filtered_out);
3125 if (!len) {
3126 return;
3127 }
3128 auto pos = offset / granularity;
3129 auto end_pos = (offset + len - 1) / granularity;
3130 while (pos <= end_pos) {
3131 collections_bfs[pos].insert(get_hash(cid));
3132 objects_bfs[pos].insert(oid.hobj.get_hash());
3133 ++pos;
3134 }
3135 }
3136 // Filters out entries unrelated to the specified (broken) extents.
3137 // Only 'is_used' calls are permitted after that.
3138 size_t filter_out(const fsck_interval& extents);
3139
3140 // determines whether the collection is present after filtering out
3141 inline bool is_used(const coll_t& cid) const {
3142 ceph_assert(was_filtered_out);
3143 for(auto& bf : collections_bfs) {
3144 if (bf.contains(get_hash(cid))) {
3145 return true;
3146 }
3147 }
3148 return false;
3149 }
3150 // determines whether the object is present after filtering out
3151 inline bool is_used(const ghobject_t& oid) const {
3152 ceph_assert(was_filtered_out);
3153 for(auto& bf : objects_bfs) {
3154 if (bf.contains(oid.hobj.get_hash())) {
3155 return true;
3156 }
3157 }
3158 return false;
3159 }
3160 // determines whether the collection is present before filtering out
3161 inline bool is_used(const coll_t& cid, uint64_t offs) const {
3162 ceph_assert(granularity); // initialized
3163 ceph_assert(!was_filtered_out);
3164 auto &bf = collections_bfs[offs / granularity];
3165 if (bf.contains(get_hash(cid))) {
3166 return true;
3167 }
3168 return false;
3169 }
3170 // determines whether the object is present before filtering out
3171 inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
3172 ceph_assert(granularity); // initialized
3173 ceph_assert(!was_filtered_out);
3174 auto &bf = objects_bfs[offs / granularity];
3175 if (bf.contains(oid.hobj.get_hash())) {
3176 return true;
3177 }
3178 return false;
3179 }
3180 };
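// Editorial usage sketch of the tracker's two-phase life cycle (the fsck/
// repairer call sites live elsewhere; variable names are hypothetical):
//
//   StoreSpaceTracker t;
//   t.init(dev_total_bytes, min_alloc_size);    // phase 1: size the filters
//   t.set_used(offset, length, cid, oid);       // record every known user
//   ...
//   t.filter_out(broken_extents);               // switch to phase 2
//   if (t.is_used(cid) || t.is_used(oid)) {
//     // cid/oid *may* touch a broken extent (false positives are possible)
//   }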
3181 public:
3182
3183 bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
3184 bool fix_shared_blob(KeyValueDB *db,
3185 uint64_t sbid,
3186 const bufferlist* bl);
3187 bool fix_statfs(KeyValueDB *db, const string& key,
3188 const store_statfs_t& new_statfs);
3189
3190 bool fix_leaked(KeyValueDB *db,
3191 FreelistManager* fm,
3192 uint64_t offset, uint64_t len);
3193 bool fix_false_free(KeyValueDB *db,
3194 FreelistManager* fm,
3195 uint64_t offset, uint64_t len);
3196 bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
3197
3198 void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
3199
3200 bool preprocess_misreference(KeyValueDB *db);
3201
3202 unsigned apply(KeyValueDB* db);
3203
3204 void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
3205 misreferenced_extents.union_insert(offs, len);
3206 if (inc_error) {
3207 ++to_repair_cnt;
3208 }
3209 }
3210
3211 StoreSpaceTracker& get_space_usage_tracker() {
3212 return space_usage_tracker;
3213 }
3214 const fsck_interval& get_misreferences() const {
3215 return misreferenced_extents;
3216 }
3217 KeyValueDB::Transaction get_fix_misreferences_txn() {
3218 return fix_misreferences_txn;
3219 }
3220
3221 private:
3222 unsigned to_repair_cnt = 0;
3223 KeyValueDB::Transaction fix_fm_leaked_txn;
3224 KeyValueDB::Transaction fix_fm_false_free_txn;
3225 KeyValueDB::Transaction remove_key_txn;
3226 KeyValueDB::Transaction fix_statfs_txn;
3227 KeyValueDB::Transaction fix_shared_blob_txn;
3228
3229 KeyValueDB::Transaction fix_misreferences_txn;
3230
3231 StoreSpaceTracker space_usage_tracker;
3232
3233 // non-shared extents with multiple references
3234 fsck_interval misreferenced_extents;
3235
3236 };
3237 #endif