1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::maybe_rebuild()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
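// e.g., with a denominator of 8, Buffer::maybe_rebuild() below re-packs the
// buffer once its data spans more than one bufferptr or once the front
// bufferptr wastes more than data.length() / 8 bytes (512 bytes for a 4 KB buffer).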
56
57
58 enum {
59 l_bluestore_first = 732430,
60 l_bluestore_kv_flush_lat,
61 l_bluestore_kv_commit_lat,
62 l_bluestore_kv_lat,
63 l_bluestore_state_prepare_lat,
64 l_bluestore_state_aio_wait_lat,
65 l_bluestore_state_io_done_lat,
66 l_bluestore_state_kv_queued_lat,
67 l_bluestore_state_kv_committing_lat,
68 l_bluestore_state_kv_done_lat,
69 l_bluestore_state_deferred_queued_lat,
70 l_bluestore_state_deferred_aio_wait_lat,
71 l_bluestore_state_deferred_cleanup_lat,
72 l_bluestore_state_finishing_lat,
73 l_bluestore_state_done_lat,
74 l_bluestore_throttle_lat,
75 l_bluestore_submit_lat,
76 l_bluestore_commit_lat,
77 l_bluestore_read_lat,
78 l_bluestore_read_onode_meta_lat,
79 l_bluestore_read_wait_aio_lat,
80 l_bluestore_compress_lat,
81 l_bluestore_decompress_lat,
82 l_bluestore_csum_lat,
83 l_bluestore_compress_success_count,
84 l_bluestore_compress_rejected_count,
85 l_bluestore_write_pad_bytes,
86 l_bluestore_deferred_write_ops,
87 l_bluestore_deferred_write_bytes,
88 l_bluestore_write_penalty_read_ops,
89 l_bluestore_allocated,
90 l_bluestore_stored,
91 l_bluestore_compressed,
92 l_bluestore_compressed_allocated,
93 l_bluestore_compressed_original,
94 l_bluestore_onodes,
95 l_bluestore_onode_hits,
96 l_bluestore_onode_misses,
97 l_bluestore_onode_shard_hits,
98 l_bluestore_onode_shard_misses,
99 l_bluestore_extents,
100 l_bluestore_blobs,
101 l_bluestore_buffers,
102 l_bluestore_buffer_bytes,
103 l_bluestore_buffer_hit_bytes,
104 l_bluestore_buffer_miss_bytes,
105 l_bluestore_write_big,
106 l_bluestore_write_big_bytes,
107 l_bluestore_write_big_blobs,
108 l_bluestore_write_small,
109 l_bluestore_write_small_bytes,
110 l_bluestore_write_small_unused,
111 l_bluestore_write_small_deferred,
112 l_bluestore_write_small_pre_read,
113 l_bluestore_write_small_new,
114 l_bluestore_txc,
115 l_bluestore_onode_reshard,
116 l_bluestore_blob_split,
117 l_bluestore_extent_compress,
118 l_bluestore_gc_merged,
119 l_bluestore_read_eio,
120 l_bluestore_last
121 };
122
123 class BlueStore : public ObjectStore,
124 public md_config_obs_t {
125 // -----------------------------------------------------
126 // types
127 public:
128 // config observer
129 const char** get_tracked_conf_keys() const override;
130 void handle_conf_change(const struct md_config_t *conf,
131 const std::set<std::string> &changed) override;
132
133 void _set_csum();
134 void _set_compression();
135 void _set_throttle_params();
136 int _set_cache_sizes();
137
138 class TransContext;
139
140 typedef map<uint64_t, bufferlist> ready_regions_t;
141
142 struct BufferSpace;
143 struct Collection;
144 typedef boost::intrusive_ptr<Collection> CollectionRef;
145
146 struct AioContext {
147 virtual void aio_finish(BlueStore *store) = 0;
148 virtual ~AioContext() {}
149 };
150
151 /// cached buffer
152 struct Buffer {
153 MEMPOOL_CLASS_HELPERS();
154
155 enum {
156 STATE_EMPTY, ///< empty buffer -- used for cache history
157 STATE_CLEAN, ///< clean data that is up to date
158 STATE_WRITING, ///< data that is being written (io not yet complete)
159 };
160 static const char *get_state_name(int s) {
161 switch (s) {
162 case STATE_EMPTY: return "empty";
163 case STATE_CLEAN: return "clean";
164 case STATE_WRITING: return "writing";
165 default: return "???";
166 }
167 }
168 enum {
169 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
170 // NOTE: fix operator<< when you define a second flag
171 };
172 static const char *get_flag_name(int s) {
173 switch (s) {
174 case FLAG_NOCACHE: return "nocache";
175 default: return "???";
176 }
177 }
178
179 BufferSpace *space;
180 uint16_t state; ///< STATE_*
181 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
182 uint32_t flags; ///< FLAG_*
183 uint64_t seq;
184 uint32_t offset, length;
185 bufferlist data;
186
187 boost::intrusive::list_member_hook<> lru_item;
188 boost::intrusive::list_member_hook<> state_item;
189
190 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
191 unsigned f = 0)
192 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
193 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
194 unsigned f = 0)
195 : space(space), state(s), flags(f), seq(q), offset(o),
196 length(b.length()), data(b) {}
197
198 bool is_empty() const {
199 return state == STATE_EMPTY;
200 }
201 bool is_clean() const {
202 return state == STATE_CLEAN;
203 }
204 bool is_writing() const {
205 return state == STATE_WRITING;
206 }
207
208 uint32_t end() const {
209 return offset + length;
210 }
211
212 void truncate(uint32_t newlen) {
213 assert(newlen < length);
214 if (data.length()) {
215 bufferlist t;
216 t.substr_of(data, 0, newlen);
217 data.claim(t);
218 }
219 length = newlen;
220 }
221 void maybe_rebuild() {
222 if (data.length() &&
223 (data.get_num_buffers() > 1 ||
224 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
225 data.rebuild();
226 }
227 }
228
229 void dump(Formatter *f) const {
230 f->dump_string("state", get_state_name(state));
231 f->dump_unsigned("seq", seq);
232 f->dump_unsigned("offset", offset);
233 f->dump_unsigned("length", length);
234 f->dump_unsigned("data_length", data.length());
235 }
236 };
237
238 struct Cache;
239
240 /// map logical extent range (object) onto buffers
241 struct BufferSpace {
242 typedef boost::intrusive::list<
243 Buffer,
244 boost::intrusive::member_hook<
245 Buffer,
246 boost::intrusive::list_member_hook<>,
247 &Buffer::state_item> > state_list_t;
248
249 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
250 buffer_map;
251
252 // we use a bare intrusive list here instead of std::map because
253 // it uses less memory and we expect this to be very small (very
254 // few IOs in flight to the same Blob at the same time).
255 state_list_t writing; ///< writing buffers, sorted by seq, ascending
256
257 ~BufferSpace() {
258 assert(buffer_map.empty());
259 assert(writing.empty());
260 }
261
262 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
263 cache->_audit("_add_buffer start");
264 buffer_map[b->offset].reset(b);
265 if (b->is_writing()) {
266 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
267 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
268 writing.push_back(*b);
269 } else {
270 auto it = writing.begin();
271 while (it->seq < b->seq) {
272 ++it;
273 }
274
275 assert(it->seq >= b->seq);
276 // note that this will insert b before it
277 // hence the order is maintained
278 writing.insert(it, *b);
279 }
280 } else {
281 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
282 cache->_add_buffer(b, level, near);
283 }
284 cache->_audit("_add_buffer end");
285 }
286 void _rm_buffer(Cache* cache, Buffer *b) {
287 _rm_buffer(cache, buffer_map.find(b->offset));
288 }
289 void _rm_buffer(Cache* cache,
290 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
291 assert(p != buffer_map.end());
292 cache->_audit("_rm_buffer start");
293 if (p->second->is_writing()) {
294 writing.erase(writing.iterator_to(*p->second));
295 } else {
296 cache->_rm_buffer(p->second.get());
297 }
298 buffer_map.erase(p);
299 cache->_audit("_rm_buffer end");
300 }
301
302 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
303 uint32_t offset) {
304 auto i = buffer_map.lower_bound(offset);
305 if (i != buffer_map.begin()) {
306 --i;
307 if (i->first + i->second->length <= offset)
308 ++i;
309 }
310 return i;
311 }
312
313 // must be called under protection of the Cache lock
314 void _clear(Cache* cache);
315
316 // return value is the highest cache_private of a trimmed buffer, or 0.
317 int discard(Cache* cache, uint32_t offset, uint32_t length) {
318 std::lock_guard<std::recursive_mutex> l(cache->lock);
319 return _discard(cache, offset, length);
320 }
321 int _discard(Cache* cache, uint32_t offset, uint32_t length);
322
323 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
324 unsigned flags) {
325 std::lock_guard<std::recursive_mutex> l(cache->lock);
326 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
327 flags);
328 b->cache_private = _discard(cache, offset, bl.length());
329 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
330 }
331 void finish_write(Cache* cache, uint64_t seq);
332 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
333 std::lock_guard<std::recursive_mutex> l(cache->lock);
334 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
335 b->cache_private = _discard(cache, offset, bl.length());
336 _add_buffer(cache, b, 1, nullptr);
337 }
338
339 void read(Cache* cache, uint32_t offset, uint32_t length,
340 BlueStore::ready_regions_t& res,
341 interval_set<uint32_t>& res_intervals);
342
343 void truncate(Cache* cache, uint32_t offset) {
344 discard(cache, offset, (uint32_t)-1 - offset);
345 }
346
347 void split(Cache* cache, size_t pos, BufferSpace &r);
348
349 void dump(Cache* cache, Formatter *f) const {
350 std::lock_guard<std::recursive_mutex> l(cache->lock);
351 f->open_array_section("buffers");
352 for (auto& i : buffer_map) {
353 f->open_object_section("buffer");
354 assert(i.first == i.second->offset);
355 i.second->dump(f);
356 f->close_section();
357 }
358 f->close_section();
359 }
360 };
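  /*
    A minimal usage sketch of the BufferSpace interface above; cache (a Cache
    shard) and seq (an io sequence number) are assumed to be supplied by the
    caller:

      BufferSpace bc;
      bufferlist bl;
      bl.append("some data");
      bc.write(cache, seq, 0, bl, 0);   // cache an in-flight write -> STATE_WRITING
      bc.finish_write(cache, seq);      // io done -> STATE_CLEAN (or trimmed if FLAG_NOCACHE)
      ready_regions_t hits;
      interval_set<uint32_t> hit_intervals;
      bc.read(cache, 0, bl.length(), hits, hit_intervals);  // cache lookup
      bc.discard(cache, 0, bl.length());                     // invalidate the range
  */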
361
362 struct SharedBlobSet;
363
364 /// in-memory shared blob state (incl cached buffers)
365 struct SharedBlob {
366 MEMPOOL_CLASS_HELPERS();
367
368 std::atomic_int nref = {0}; ///< reference count
369 bool loaded = false;
370
371 CollectionRef coll;
372 union {
373 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
374 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
375 };
376 BufferSpace bc; ///< buffer cache
377
378 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
379 if (get_cache()) {
380 get_cache()->add_blob();
381 }
382 }
383 SharedBlob(uint64_t i, Collection *_coll);
384 ~SharedBlob();
385
386 uint64_t get_sbid() const {
387 return loaded ? persistent->sbid : sbid_unloaded;
388 }
389
390 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
391 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
392
393 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
394
395 void get() {
396 ++nref;
397 }
398 void put();
399
400 /// get logical references
401 void get_ref(uint64_t offset, uint32_t length);
402
403 /// put logical references, and get back any released extents
404 void put_ref(uint64_t offset, uint32_t length,
405 PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
406
407 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
408 return l.get_sbid() == r.get_sbid();
409 }
410 inline Cache* get_cache() {
411 return coll ? coll->cache : nullptr;
412 }
413 inline SharedBlobSet* get_parent() {
414 return coll ? &(coll->shared_blob_set) : nullptr;
415 }
416 inline bool is_loaded() const {
417 return loaded;
418 }
419
420 };
421 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
422
423 /// a lookup table of SharedBlobs
424 struct SharedBlobSet {
425 std::mutex lock; ///< protect lookup, insertion, removal
426
427 // we use a bare pointer because we don't want to affect the ref
428 // count
429 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
430
431 SharedBlobRef lookup(uint64_t sbid) {
432 std::lock_guard<std::mutex> l(lock);
433 auto p = sb_map.find(sbid);
434 if (p == sb_map.end()) {
435 return nullptr;
436 }
437 return p->second;
438 }
439
440 void add(Collection* coll, SharedBlob *sb) {
441 std::lock_guard<std::mutex> l(lock);
442 sb_map[sb->get_sbid()] = sb;
443 sb->coll = coll;
444 }
445
446 bool try_remove(SharedBlob *sb) {
447 std::lock_guard<std::mutex> l(lock);
448 if (sb->nref == 0) {
449 assert(sb->get_parent() == this);
450 sb_map.erase(sb->get_sbid());
451 return true;
452 }
453 return false;
454 }
455
456 void remove(SharedBlob *sb) {
457 std::lock_guard<std::mutex> l(lock);
458 assert(sb->get_parent() == this);
459 sb_map.erase(sb->get_sbid());
460 }
461
462 bool empty() {
463 std::lock_guard<std::mutex> l(lock);
464 return sb_map.empty();
465 }
466
467 void dump(CephContext *cct, int lvl);
468 };
469
470 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
471
472 /// in-memory blob metadata and associated cached buffers (if any)
473 struct Blob {
474 MEMPOOL_CLASS_HELPERS();
475
476 std::atomic_int nref = {0}; ///< reference count
477 int16_t id = -1; ///< id, for spanning blobs only, >= 0
478 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
479 SharedBlobRef shared_blob; ///< shared blob state (if any)
480
481 private:
482 mutable bluestore_blob_t blob; ///< decoded blob metadata
483 #ifdef CACHE_BLOB_BL
484 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
485 #endif
486 /// refs from this shard. ephemeral if id<0, persisted if spanning.
487 bluestore_blob_use_tracker_t used_in_blob;
488
489 public:
490
491 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
492 friend void intrusive_ptr_release(Blob *b) { b->put(); }
493
494 friend ostream& operator<<(ostream& out, const Blob &b);
495
496 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
497 return used_in_blob;
498 }
499 bool is_referenced() const {
500 return used_in_blob.is_not_empty();
501 }
502 uint32_t get_referenced_bytes() const {
503 return used_in_blob.get_referenced_bytes();
504 }
505
506 bool is_spanning() const {
507 return id >= 0;
508 }
509
510 bool can_split() const {
511 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
512 // splitting a BufferSpace writing list is too hard; don't try.
513 return shared_blob->bc.writing.empty() &&
514 used_in_blob.can_split() &&
515 get_blob().can_split();
516 }
517
518 bool can_split_at(uint32_t blob_offset) const {
519 return used_in_blob.can_split_at(blob_offset) &&
520 get_blob().can_split_at(blob_offset);
521 }
522
523 bool can_reuse_blob(uint32_t min_alloc_size,
524 uint32_t target_blob_size,
525 uint32_t b_offset,
526 uint32_t *length0);
527
528 void dup(Blob& o) {
529 o.shared_blob = shared_blob;
530 o.blob = blob;
531 #ifdef CACHE_BLOB_BL
532 o.blob_bl = blob_bl;
533 #endif
534 }
535
536 inline const bluestore_blob_t& get_blob() const {
537 return blob;
538 }
539 inline bluestore_blob_t& dirty_blob() {
540 #ifdef CACHE_BLOB_BL
541 blob_bl.clear();
542 #endif
543 return blob;
544 }
545
546 /// discard buffers for unallocated regions
547 void discard_unallocated(Collection *coll);
548
549 /// get logical references
550 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
551 /// put logical references, and get back any released extents
552 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
553 PExtentVector *r);
554
555 /// split the blob
556 void split(Collection *coll, uint32_t blob_offset, Blob *o);
557
558 void get() {
559 ++nref;
560 }
561 void put() {
562 if (--nref == 0)
563 delete this;
564 }
565
566
567 #ifdef CACHE_BLOB_BL
568 void _encode() const {
569 if (blob_bl.length() == 0 ) {
570 ::encode(blob, blob_bl);
571 } else {
572 assert(blob_bl.length());
573 }
574 }
575 void bound_encode(
576 size_t& p,
577 bool include_ref_map) const {
578 _encode();
579 p += blob_bl.length();
580 if (include_ref_map) {
581 used_in_blob.bound_encode(p);
582 }
583 }
584 void encode(
585 bufferlist::contiguous_appender& p,
586 bool include_ref_map) const {
587 _encode();
588 p.append(blob_bl);
589 if (include_ref_map) {
590 used_in_blob.encode(p);
591 }
592 }
593 void decode(
594 Collection */*coll*/,
595 bufferptr::iterator& p,
596 bool include_ref_map) {
597 const char *start = p.get_pos();
598 denc(blob, p);
599 const char *end = p.get_pos();
600 blob_bl.clear();
601 blob_bl.append(start, end - start);
602 if (include_ref_map) {
603 used_in_blob.decode(p);
604 }
605 }
606 #else
607 void bound_encode(
608 size_t& p,
609 uint64_t struct_v,
610 uint64_t sbid,
611 bool include_ref_map) const {
612 denc(blob, p, struct_v);
613 if (blob.is_shared()) {
614 denc(sbid, p);
615 }
616 if (include_ref_map) {
617 used_in_blob.bound_encode(p);
618 }
619 }
620 void encode(
621 bufferlist::contiguous_appender& p,
622 uint64_t struct_v,
623 uint64_t sbid,
624 bool include_ref_map) const {
625 denc(blob, p, struct_v);
626 if (blob.is_shared()) {
627 denc(sbid, p);
628 }
629 if (include_ref_map) {
630 used_in_blob.encode(p);
631 }
632 }
633 void decode(
634 Collection *coll,
635 bufferptr::iterator& p,
636 uint64_t struct_v,
637 uint64_t* sbid,
638 bool include_ref_map);
639 #endif
640 };
641 typedef boost::intrusive_ptr<Blob> BlobRef;
642 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
643
644 /// a logical extent, pointing to (some portion of) a blob
645 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; // making an alias to avoid build warnings
646 struct Extent : public ExtentBase {
647 MEMPOOL_CLASS_HELPERS();
648
649 uint32_t logical_offset = 0; ///< logical offset
650 uint32_t blob_offset = 0; ///< blob offset
651 uint32_t length = 0; ///< length
652 BlobRef blob; ///< the blob with our data
653
654 /// ctor for lookup only
655 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
656 /// ctor for delayed initialization (see decode_some())
657 explicit Extent() : ExtentBase() {
658 }
659 /// ctor for general usage
660 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
661 : ExtentBase(),
662 logical_offset(lo), blob_offset(o), length(l) {
663 assign_blob(b);
664 }
665 ~Extent() {
666 if (blob) {
667 blob->shared_blob->get_cache()->rm_extent();
668 }
669 }
670
671 void assign_blob(const BlobRef& b) {
672 assert(!blob);
673 blob = b;
674 blob->shared_blob->get_cache()->add_extent();
675 }
676
677 // comparators for intrusive_set
678 friend bool operator<(const Extent &a, const Extent &b) {
679 return a.logical_offset < b.logical_offset;
680 }
681 friend bool operator>(const Extent &a, const Extent &b) {
682 return a.logical_offset > b.logical_offset;
683 }
684 friend bool operator==(const Extent &a, const Extent &b) {
685 return a.logical_offset == b.logical_offset;
686 }
687
688 uint32_t blob_start() const {
689 return logical_offset - blob_offset;
690 }
691
692 uint32_t blob_end() const {
693 return blob_start() + blob->get_blob().get_logical_length();
694 }
695
696 uint32_t logical_end() const {
697 return logical_offset + length;
698 }
699
700 // return true if any piece of the blob is out of
701 // the given range [o, o + l].
702 bool blob_escapes_range(uint32_t o, uint32_t l) const {
703 return blob_start() < o || blob_end() > o + l;
704 }
705 };
706 typedef boost::intrusive::set<Extent> extent_map_t;
707
708
709 friend ostream& operator<<(ostream& out, const Extent& e);
710
711 struct OldExtent {
712 boost::intrusive::list_member_hook<> old_extent_item;
713 Extent e;
714 PExtentVector r;
715 bool blob_empty; // flag to track the last removed extent that makes blob
716 // empty - required to update compression stat properly
717 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
718 : e(lo, o, l, b), blob_empty(false) {
719 }
720 static OldExtent* create(CollectionRef c,
721 uint32_t lo,
722 uint32_t o,
723 uint32_t l,
724 BlobRef& b);
725 };
726 typedef boost::intrusive::list<
727 OldExtent,
728 boost::intrusive::member_hook<
729 OldExtent,
730 boost::intrusive::list_member_hook<>,
731 &OldExtent::old_extent_item> > old_extent_map_t;
732
733 struct Onode;
734
735 /// a sharded extent map, mapping offsets to lextents to blobs
736 struct ExtentMap {
737 Onode *onode;
738 extent_map_t extent_map; ///< map of Extents to Blobs
739 blob_map_t spanning_blob_map; ///< blobs that span shards
740
741 struct Shard {
742 bluestore_onode_t::shard_info *shard_info = nullptr;
743 unsigned extents = 0; ///< count extents in this shard
744 bool loaded = false; ///< true if shard is loaded
745 bool dirty = false; ///< true if shard is dirty and needs reencoding
746 };
747 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
748
749 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
750
751 uint32_t needs_reshard_begin = 0;
752 uint32_t needs_reshard_end = 0;
753
754 bool needs_reshard() const {
755 return needs_reshard_end > needs_reshard_begin;
756 }
757 void clear_needs_reshard() {
758 needs_reshard_begin = needs_reshard_end = 0;
759 }
760 void request_reshard(uint32_t begin, uint32_t end) {
761 if (begin < needs_reshard_begin) {
762 needs_reshard_begin = begin;
763 }
764 if (end > needs_reshard_end) {
765 needs_reshard_end = end;
766 }
767 }
768
769 struct DeleteDisposer {
770 void operator()(Extent *e) { delete e; }
771 };
772
773 ExtentMap(Onode *o);
774 ~ExtentMap() {
775 extent_map.clear_and_dispose(DeleteDisposer());
776 }
777
778 void clear() {
779 extent_map.clear_and_dispose(DeleteDisposer());
780 shards.clear();
781 inline_bl.clear();
782 clear_needs_reshard();
783 }
784
785 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
786 unsigned *pn);
787 unsigned decode_some(bufferlist& bl);
788
789 void bound_encode_spanning_blobs(size_t& p);
790 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
791 void decode_spanning_blobs(bufferptr::iterator& p);
792
793 BlobRef get_spanning_blob(int id) {
794 auto p = spanning_blob_map.find(id);
795 assert(p != spanning_blob_map.end());
796 return p->second;
797 }
798
799 void update(KeyValueDB::Transaction t, bool force);
800 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
801 void reshard(
802 KeyValueDB *db,
803 KeyValueDB::Transaction t);
804
805 /// initialize Shards from the onode
806 void init_shards(bool loaded, bool dirty);
807
808 /// return index of shard containing offset
809 /// or -1 if not found
810 int seek_shard(uint32_t offset) {
811 size_t end = shards.size();
812 size_t mid, left = 0;
811 size_t right = end; // one past the right end
814
815 while (left < right) {
816 mid = left + (right - left) / 2;
817 if (offset >= shards[mid].shard_info->offset) {
818 size_t next = mid + 1;
819 if (next >= end || offset < shards[next].shard_info->offset)
820 return mid;
821 //continue to search forwards
822 left = next;
823 } else {
824 //continue to search backwards
825 right = mid;
826 }
827 }
828
829 return -1; // not found
830 }
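    // e.g., for shards starting at offsets {0x0, 0x8000, 0x10000},
    // seek_shard(0x9000) == 1 and seek_shard(0x10000) == 2, while an
    // offset below shards[0].shard_info->offset returns -1.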
831
832 /// check if a range spans a shard
833 bool spans_shard(uint32_t offset, uint32_t length) {
834 if (shards.empty()) {
835 return false;
836 }
837 int s = seek_shard(offset);
838 assert(s >= 0);
839 if (s == (int)shards.size() - 1) {
840 return false; // last shard
841 }
842 if (offset + length <= shards[s+1].shard_info->offset) {
843 return false;
844 }
845 return true;
846 }
847
848 /// ensure that a range of the map is loaded
849 void fault_range(KeyValueDB *db,
850 uint32_t offset, uint32_t length);
851
852 /// ensure a range of the map is marked dirty
853 void dirty_range(uint32_t offset, uint32_t length);
854
855 /// for seek_lextent test
856 extent_map_t::iterator find(uint64_t offset);
857
858 /// seek to the first lextent including or after offset
859 extent_map_t::iterator seek_lextent(uint64_t offset);
860 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
861
862 /// add a new Extent
863 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
864 extent_map.insert(*new Extent(lo, o, l, b));
865 }
866
867 /// remove (and delete) an Extent
868 void rm(extent_map_t::iterator p) {
869 extent_map.erase_and_dispose(p, DeleteDisposer());
870 }
871
872 bool has_any_lextents(uint64_t offset, uint64_t length);
873
874 /// consolidate adjacent lextents in extent_map
875 int compress_extent_map(uint64_t offset, uint64_t length);
876
877 /// punch a logical hole. add lextents to deref to target list.
878 void punch_hole(CollectionRef &c,
879 uint64_t offset, uint64_t length,
880 old_extent_map_t *old_extents);
881
882 /// put new lextent into lextent_map overwriting existing ones if
883 /// any and update references accordingly
884 Extent *set_lextent(CollectionRef &c,
885 uint64_t logical_offset,
886 uint64_t offset, uint64_t length,
887 BlobRef b,
888 old_extent_map_t *old_extents);
889
890 /// split a blob (and referring extents)
891 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
892 };
893
894 /// Compressed Blob Garbage collector
895 /*
896 The primary idea of the collector is to estimate the difference between
897 the allocation units (AUs) currently occupied by compressed blobs and the
898 new AUs required to store that data uncompressed.
899 Estimation is performed for protrusive extents within a logical range
900 determined by the concatenation of the old_extents collection and the
901 specific (current) write request.
902 The reason old_extents are needed is to handle blob ref counts properly:
903 old extents still hold blob refs, so we need to traverse the collection
904 to determine whether a blob is to be released.
905 Protrusive extents are extents that fit into the blob set in action
906 (ones that are below the logical range from above) but are not removed
907 entirely by the current write.
908 E.g. for
909 extent1 <loffs = 100, boffs = 100, len = 100> ->
910 blob1<compressed, len_on_disk=4096, logical_len=8192>
911 extent2 <loffs = 200, boffs = 200, len = 100> ->
912 blob2<raw, len_on_disk=4096, llen=4096>
913 extent3 <loffs = 300, boffs = 300, len = 100> ->
914 blob1<compressed, len_on_disk=4096, llen=8192>
915 extent4 <loffs = 4096, boffs = 0, len = 100> ->
916 blob3<raw, len_on_disk=4096, llen=4096>
917 write(300~100)
918 The protrusive extents are within the following ranges: <0~300, 400~8192-400>.
919 In this case the existing AUs that might be removed due to GC (i.e. blob1)
920 use 2x4K bytes.
921 The new AUs expected after GC = 0, since extent1 is to be merged into blob2.
922 Hence we should collect.
923 */
924 class GarbageCollector
925 {
926 public:
927 /// return amount of allocation units that might be saved due to GC
928 int64_t estimate(
929 uint64_t offset,
930 uint64_t length,
931 const ExtentMap& extent_map,
932 const old_extent_map_t& old_extents,
933 uint64_t min_alloc_size);
934
935 /// return a collection of extents to perform GC on
936 const vector<AllocExtent>& get_extents_to_collect() const {
937 return extents_to_collect;
938 }
939 GarbageCollector(CephContext* _cct) : cct(_cct) {}
940
941 private:
942 struct BlobInfo {
943 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
944 int64_t expected_allocations = 0; ///< new alloc units required
945 ///< in case of gc fulfilled
946 bool collect_candidate = false; ///< indicate if blob has any extents
947 ///< eligible for GC.
948 extent_map_t::const_iterator first_lextent; ///< points to the first
949 ///< lextent referring to
950 ///< the blob if any.
951 ///< collect_candidate flag
952 ///< determines the validity
953 extent_map_t::const_iterator last_lextent; ///< points to the last
954 ///< lextent referring to
955 ///< the blob if any.
956
957 BlobInfo(uint64_t ref_bytes) :
958 referenced_bytes(ref_bytes) {
959 }
960 };
961 CephContext* cct;
962 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
963 ///< copies that are affected by the
964 ///< specific write
965
966 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
967 ///< be collected if GC takes place
968
969 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
970 ///< unit when traversing
971 ///< protrusive extents.
972 ///< Other extents mapped to
973 ///< this AU to be ignored
974 ///< (except the case where
975 ///< uncompressed extent follows
976 ///< compressed one - see below).
977 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
978 ///< caused expected_allocations
979 ///< counter increment at this blob.
980 ///< if uncompressed extent follows
981 ///< a decrement for the
982 ///< expected_allocations counter
983 ///< is needed
984 int64_t expected_allocations = 0; ///< new alloc units required in case
985 ///< of gc fulfilled
986 int64_t expected_for_release = 0; ///< alloc units currently used by
987 ///< compressed blobs that might
988 ///< be freed after GC
989 uint64_t gc_start_offset; ///< starting offset for GC
990 uint64_t gc_end_offset; ///< ending offset for GC
991
992 protected:
993 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
994 uint64_t start_offset,
995 uint64_t end_offset,
996 uint64_t start_touch_offset,
997 uint64_t end_touch_offset,
998 uint64_t min_alloc_size);
999 };
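  /*
    A minimal sketch of how the estimator above might be driven from a write
    path; onode o, the write range offset~length, the old_extents gathered by
    punch_hole()/set_lextent(), and min_alloc_size are assumed to be in scope:

      GarbageCollector gc(cct);
      int64_t saved = gc.estimate(offset, length, o->extent_map,
                                  old_extents, min_alloc_size);
      if (saved > 0) {
        for (auto& e : gc.get_extents_to_collect()) {
          // rewrite the logical range e.offset~e.length uncompressed so the
          // compressed blobs it still pins can be released
        }
      }
  */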
1000
1001 struct OnodeSpace;
1002
1003 /// an in-memory object
1004 struct Onode {
1005 MEMPOOL_CLASS_HELPERS();
1006
1007 std::atomic_int nref; ///< reference count
1008 Collection *c;
1009
1010 ghobject_t oid;
1011
1012 /// key under PREFIX_OBJ where we are stored
1013 mempool::bluestore_cache_other::string key;
1014
1015 boost::intrusive::list_member_hook<> lru_item;
1016
1017 bluestore_onode_t onode; ///< metadata stored as value in kv store
1018 bool exists; ///< true if object logically exists
1019
1020 ExtentMap extent_map;
1021
1022 // track txc's that have not been committed to kv store (and whose
1023 // effects cannot be read via the kvdb read methods)
1024 std::atomic<int> flushing_count = {0};
1025 std::mutex flush_lock; ///< protect flush_txns
1026 std::condition_variable flush_cond; ///< wait here for uncommitted txns
1027
1028 Onode(Collection *c, const ghobject_t& o,
1029 const mempool::bluestore_cache_other::string& k)
1030 : nref(0),
1031 c(c),
1032 oid(o),
1033 key(k),
1034 exists(false),
1035 extent_map(this) {
1036 }
1037
1038 void flush();
1039 void get() {
1040 ++nref;
1041 }
1042 void put() {
1043 if (--nref == 0)
1044 delete this;
1045 }
1046 };
1047 typedef boost::intrusive_ptr<Onode> OnodeRef;
1048
1049
1050 /// a cache (shard) of onodes and buffers
1051 struct Cache {
1052 CephContext* cct;
1053 PerfCounters *logger;
1054 std::recursive_mutex lock; ///< protect lru and other structures
1055
1056 std::atomic<uint64_t> num_extents = {0};
1057 std::atomic<uint64_t> num_blobs = {0};
1058
1059 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1060
1061 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1062 virtual ~Cache() {}
1063
1064 virtual void _add_onode(OnodeRef& o, int level) = 0;
1065 virtual void _rm_onode(OnodeRef& o) = 0;
1066 virtual void _touch_onode(OnodeRef& o) = 0;
1067
1068 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1069 virtual void _rm_buffer(Buffer *b) = 0;
1070 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1071 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1072 virtual void _touch_buffer(Buffer *b) = 0;
1073
1074 virtual uint64_t _get_num_onodes() = 0;
1075 virtual uint64_t _get_buffer_bytes() = 0;
1076
1077 void add_extent() {
1078 ++num_extents;
1079 }
1080 void rm_extent() {
1081 --num_extents;
1082 }
1083
1084 void add_blob() {
1085 ++num_blobs;
1086 }
1087 void rm_blob() {
1088 --num_blobs;
1089 }
1090
1091 void trim(uint64_t target_bytes,
1092 float target_meta_ratio,
1093 float target_data_ratio,
1094 float bytes_per_onode);
1095
1096 void trim_all();
1097
1098 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1099
1100 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1101 uint64_t *blobs,
1102 uint64_t *buffers,
1103 uint64_t *bytes) = 0;
1104
1105 bool empty() {
1106 std::lock_guard<std::recursive_mutex> l(lock);
1107 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1108 }
1109
1110 #ifdef DEBUG_CACHE
1111 virtual void _audit(const char *s) = 0;
1112 #else
1113 void _audit(const char *s) { /* no-op */ }
1114 #endif
1115 };
1116
1117 /// simple LRU cache for onodes and buffers
1118 struct LRUCache : public Cache {
1119 private:
1120 typedef boost::intrusive::list<
1121 Onode,
1122 boost::intrusive::member_hook<
1123 Onode,
1124 boost::intrusive::list_member_hook<>,
1125 &Onode::lru_item> > onode_lru_list_t;
1126 typedef boost::intrusive::list<
1127 Buffer,
1128 boost::intrusive::member_hook<
1129 Buffer,
1130 boost::intrusive::list_member_hook<>,
1131 &Buffer::lru_item> > buffer_lru_list_t;
1132
1133 onode_lru_list_t onode_lru;
1134
1135 buffer_lru_list_t buffer_lru;
1136 uint64_t buffer_size = 0;
1137
1138 public:
1139 LRUCache(CephContext* cct) : Cache(cct) {}
1140 uint64_t _get_num_onodes() override {
1141 return onode_lru.size();
1142 }
1143 void _add_onode(OnodeRef& o, int level) override {
1144 if (level > 0)
1145 onode_lru.push_front(*o);
1146 else
1147 onode_lru.push_back(*o);
1148 }
1149 void _rm_onode(OnodeRef& o) override {
1150 auto q = onode_lru.iterator_to(*o);
1151 onode_lru.erase(q);
1152 }
1153 void _touch_onode(OnodeRef& o) override;
1154
1155 uint64_t _get_buffer_bytes() override {
1156 return buffer_size;
1157 }
1158 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1159 if (near) {
1160 auto q = buffer_lru.iterator_to(*near);
1161 buffer_lru.insert(q, *b);
1162 } else if (level > 0) {
1163 buffer_lru.push_front(*b);
1164 } else {
1165 buffer_lru.push_back(*b);
1166 }
1167 buffer_size += b->length;
1168 }
1169 void _rm_buffer(Buffer *b) override {
1170 assert(buffer_size >= b->length);
1171 buffer_size -= b->length;
1172 auto q = buffer_lru.iterator_to(*b);
1173 buffer_lru.erase(q);
1174 }
1175 void _move_buffer(Cache *src, Buffer *b) override {
1176 src->_rm_buffer(b);
1177 _add_buffer(b, 0, nullptr);
1178 }
1179 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1180 assert((int64_t)buffer_size + delta >= 0);
1181 buffer_size += delta;
1182 }
1183 void _touch_buffer(Buffer *b) override {
1184 auto p = buffer_lru.iterator_to(*b);
1185 buffer_lru.erase(p);
1186 buffer_lru.push_front(*b);
1187 _audit("_touch_buffer end");
1188 }
1189
1190 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1191
1192 void add_stats(uint64_t *onodes, uint64_t *extents,
1193 uint64_t *blobs,
1194 uint64_t *buffers,
1195 uint64_t *bytes) override {
1196 std::lock_guard<std::recursive_mutex> l(lock);
1197 *onodes += onode_lru.size();
1198 *extents += num_extents;
1199 *blobs += num_blobs;
1200 *buffers += buffer_lru.size();
1201 *bytes += buffer_size;
1202 }
1203
1204 #ifdef DEBUG_CACHE
1205 void _audit(const char *s) override;
1206 #endif
1207 };
1208
1209 // 2Q cache for buffers, LRU for onodes
1210 struct TwoQCache : public Cache {
1211 private:
1212 // stick with LRU for onodes for now (fixme?)
1213 typedef boost::intrusive::list<
1214 Onode,
1215 boost::intrusive::member_hook<
1216 Onode,
1217 boost::intrusive::list_member_hook<>,
1218 &Onode::lru_item> > onode_lru_list_t;
1219 typedef boost::intrusive::list<
1220 Buffer,
1221 boost::intrusive::member_hook<
1222 Buffer,
1223 boost::intrusive::list_member_hook<>,
1224 &Buffer::lru_item> > buffer_list_t;
1225
1226 onode_lru_list_t onode_lru;
1227
1228 buffer_list_t buffer_hot; ///< "Am" hot buffers
1229 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1230 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1231 uint64_t buffer_bytes = 0; ///< bytes
1232
1233 enum {
1234 BUFFER_NEW = 0,
1235 BUFFER_WARM_IN, ///< in buffer_warm_in
1236 BUFFER_WARM_OUT, ///< in buffer_warm_out
1237 BUFFER_HOT, ///< in buffer_hot
1238 BUFFER_TYPE_MAX
1239 };
1240
1241 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1242
1243 public:
1244 TwoQCache(CephContext* cct) : Cache(cct) {}
1245 uint64_t _get_num_onodes() override {
1246 return onode_lru.size();
1247 }
1248 void _add_onode(OnodeRef& o, int level) override {
1249 if (level > 0)
1250 onode_lru.push_front(*o);
1251 else
1252 onode_lru.push_back(*o);
1253 }
1254 void _rm_onode(OnodeRef& o) override {
1255 auto q = onode_lru.iterator_to(*o);
1256 onode_lru.erase(q);
1257 }
1258 void _touch_onode(OnodeRef& o) override;
1259
1260 uint64_t _get_buffer_bytes() override {
1261 return buffer_bytes;
1262 }
1263 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1264 void _rm_buffer(Buffer *b) override;
1265 void _move_buffer(Cache *src, Buffer *b) override;
1266 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1267 void _touch_buffer(Buffer *b) override {
1268 switch (b->cache_private) {
1269 case BUFFER_WARM_IN:
1270 // do nothing (somewhat counter-intuitively!)
1271 break;
1272 case BUFFER_WARM_OUT:
1273 // move from warm_out to hot LRU
1274 assert(0 == "this happens via discard hint");
1275 break;
1276 case BUFFER_HOT:
1277 // move to front of hot LRU
1278 buffer_hot.erase(buffer_hot.iterator_to(*b));
1279 buffer_hot.push_front(*b);
1280 break;
1281 }
1282 _audit("_touch_buffer end");
1283 }
1284
1285 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1286
1287 void add_stats(uint64_t *onodes, uint64_t *extents,
1288 uint64_t *blobs,
1289 uint64_t *buffers,
1290 uint64_t *bytes) override {
1291 std::lock_guard<std::recursive_mutex> l(lock);
1292 *onodes += onode_lru.size();
1293 *extents += num_extents;
1294 *blobs += num_blobs;
1295 *buffers += buffer_hot.size() + buffer_warm_in.size();
1296 *bytes += buffer_bytes;
1297 }
1298
1299 #ifdef DEBUG_CACHE
1300 void _audit(const char *s) override;
1301 #endif
1302 };
1303
1304 struct OnodeSpace {
1305 private:
1306 Cache *cache;
1307
1308 /// forward lookups
1309 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1310
1311 friend class Collection; // for split_cache()
1312
1313 public:
1314 OnodeSpace(Cache *c) : cache(c) {}
1315 ~OnodeSpace() {
1316 clear();
1317 }
1318
1319 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1320 OnodeRef lookup(const ghobject_t& o);
1321 void remove(const ghobject_t& oid) {
1322 onode_map.erase(oid);
1323 }
1324 void rename(OnodeRef& o, const ghobject_t& old_oid,
1325 const ghobject_t& new_oid,
1326 const mempool::bluestore_cache_other::string& new_okey);
1327 void clear();
1328 bool empty();
1329
1330 void dump(CephContext *cct, int lvl);
1331
1332 /// return true if f true for any item
1333 bool map_any(std::function<bool(OnodeRef)> f);
1334 };
1335
1336 struct Collection : public CollectionImpl {
1337 BlueStore *store;
1338 Cache *cache; ///< our cache shard
1339 coll_t cid;
1340 bluestore_cnode_t cnode;
1341 RWLock lock;
1342
1343 bool exists;
1344
1345 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1346
1347 // cache onodes on a per-collection basis to avoid lock
1348 // contention.
1349 OnodeSpace onode_map;
1350
1351 //pool options
1352 pool_opts_t pool_opts;
1353
1354 OnodeRef get_onode(const ghobject_t& oid, bool create);
1355
1356 // the terminology is confusing here, sorry!
1357 //
1358 // blob_t shared_blob_t
1359 // !shared unused -> open
1360 // shared !loaded -> open + shared
1361 // shared loaded -> open + shared + loaded
1362 //
1363 // i.e.,
1364 // open = SharedBlob is instantiated
1365 // shared = blob_t shared flag is set; SharedBlob is hashed.
1366 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1367 void open_shared_blob(uint64_t sbid, BlobRef b);
1368 void load_shared_blob(SharedBlobRef sb);
1369 void make_blob_shared(uint64_t sbid, BlobRef b);
1370 uint64_t make_blob_unshared(SharedBlob *sb);
1371
1372 BlobRef new_blob() {
1373 BlobRef b = new Blob();
1374 b->shared_blob = new SharedBlob(this);
1375 return b;
1376 }
1377
1378 const coll_t &get_cid() override {
1379 return cid;
1380 }
1381
1382 bool contains(const ghobject_t& oid) {
1383 if (cid.is_meta())
1384 return oid.hobj.pool == -1;
1385 spg_t spgid;
1386 if (cid.is_pg(&spgid))
1387 return
1388 spgid.pgid.contains(cnode.bits, oid) &&
1389 oid.shard_id == spgid.shard;
1390 return false;
1391 }
1392
1393 void split_cache(Collection *dest);
1394
1395 Collection(BlueStore *ns, Cache *ca, coll_t c);
1396 };
1397
1398 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1399 CollectionRef c;
1400 OnodeRef o;
1401 KeyValueDB::Iterator it;
1402 string head, tail;
1403 public:
1404 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1405 int seek_to_first() override;
1406 int upper_bound(const string &after) override;
1407 int lower_bound(const string &to) override;
1408 bool valid() override;
1409 int next(bool validate=true) override;
1410 string key() override;
1411 bufferlist value() override;
1412 int status() override {
1413 return 0;
1414 }
1415 };
1416
1417 class OpSequencer;
1418 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1419
1420 struct volatile_statfs{
1421 enum {
1422 STATFS_ALLOCATED = 0,
1423 STATFS_STORED,
1424 STATFS_COMPRESSED_ORIGINAL,
1425 STATFS_COMPRESSED,
1426 STATFS_COMPRESSED_ALLOCATED,
1427 STATFS_LAST
1428 };
1429 int64_t values[STATFS_LAST];
1430 volatile_statfs() {
1431 memset(this, 0, sizeof(volatile_statfs));
1432 }
1433 void reset() {
1434 *this = volatile_statfs();
1435 }
1436 volatile_statfs& operator+=(const volatile_statfs& other) {
1437 for (size_t i = 0; i < STATFS_LAST; ++i) {
1438 values[i] += other.values[i];
1439 }
1440 return *this;
1441 }
1442 int64_t& allocated() {
1443 return values[STATFS_ALLOCATED];
1444 }
1445 int64_t& stored() {
1446 return values[STATFS_STORED];
1447 }
1448 int64_t& compressed_original() {
1449 return values[STATFS_COMPRESSED_ORIGINAL];
1450 }
1451 int64_t& compressed() {
1452 return values[STATFS_COMPRESSED];
1453 }
1454 int64_t& compressed_allocated() {
1455 return values[STATFS_COMPRESSED_ALLOCATED];
1456 }
1457 bool is_empty() {
1458 return values[STATFS_ALLOCATED] == 0 &&
1459 values[STATFS_STORED] == 0 &&
1460 values[STATFS_COMPRESSED] == 0 &&
1461 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1462 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1463 }
1464 void decode(bufferlist::iterator& it) {
1465 for (size_t i = 0; i < STATFS_LAST; i++) {
1466 ::decode(values[i], it);
1467 }
1468 }
1469
1470 void encode(bufferlist& bl) {
1471 for (size_t i = 0; i < STATFS_LAST; i++) {
1472 ::encode(values[i], bl);
1473 }
1474 }
1475 };
1476
1477 struct TransContext : public AioContext {
1478 MEMPOOL_CLASS_HELPERS();
1479
1480 typedef enum {
1481 STATE_PREPARE,
1482 STATE_AIO_WAIT,
1483 STATE_IO_DONE,
1484 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1485 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1486 STATE_KV_DONE,
1487 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1488 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1489 STATE_DEFERRED_DONE,
1490 STATE_FINISHING,
1491 STATE_DONE,
1492 } state_t;
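    // Typical progression for a simple (non-deferred) write, roughly:
    //   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED ->
    //   KV_DONE -> FINISHING -> DONE
    // A deferred write additionally passes through DEFERRED_QUEUED and
    // DEFERRED_CLEANUP after KV_DONE before it reaches FINISHING/DONE.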
1493
1494 state_t state = STATE_PREPARE;
1495
1496 const char *get_state_name() {
1497 switch (state) {
1498 case STATE_PREPARE: return "prepare";
1499 case STATE_AIO_WAIT: return "aio_wait";
1500 case STATE_IO_DONE: return "io_done";
1501 case STATE_KV_QUEUED: return "kv_queued";
1502 case STATE_KV_SUBMITTED: return "kv_submitted";
1503 case STATE_KV_DONE: return "kv_done";
1504 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1505 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1506 case STATE_DEFERRED_DONE: return "deferred_done";
1507 case STATE_FINISHING: return "finishing";
1508 case STATE_DONE: return "done";
1509 }
1510 return "???";
1511 }
1512
1513 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1514 const char *get_state_latency_name(int state) {
1515 switch (state) {
1516 case l_bluestore_state_prepare_lat: return "prepare";
1517 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1518 case l_bluestore_state_io_done_lat: return "io_done";
1519 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1520 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1521 case l_bluestore_state_kv_done_lat: return "kv_done";
1522 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1523 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1524 case l_bluestore_state_finishing_lat: return "finishing";
1525 case l_bluestore_state_done_lat: return "done";
1526 }
1527 return "???";
1528 }
1529 #endif
1530
1531 void log_state_latency(PerfCounters *logger, int state) {
1532 utime_t lat, now = ceph_clock_now();
1533 lat = now - last_stamp;
1534 logger->tinc(state, lat);
1535 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1536 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1537 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1538 OID_ELAPSED("", usecs, get_state_latency_name(state));
1539 }
1540 #endif
1541 last_stamp = now;
1542 }
1543
1544 OpSequencerRef osr;
1545 boost::intrusive::list_member_hook<> sequencer_item;
1546
1547 uint64_t bytes = 0, cost = 0;
1548
1549 set<OnodeRef> onodes; ///< these need to be updated/written
1550 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1551 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1552 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1553
1554 KeyValueDB::Transaction t; ///< then we will commit this
1555 Context *oncommit = nullptr; ///< signal on commit
1556 Context *onreadable = nullptr; ///< signal on readable
1557 Context *onreadable_sync = nullptr; ///< signal on readable
1558 list<Context*> oncommits; ///< more commit completions
1559 list<CollectionRef> removed_collections; ///< colls we removed
1560
1561 boost::intrusive::list_member_hook<> deferred_queue_item;
1562 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1563
1564 interval_set<uint64_t> allocated, released;
1565 volatile_statfs statfs_delta;
1566
1567 IOContext ioc;
1568 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1569
1570 uint64_t seq = 0;
1571 utime_t start;
1572 utime_t last_stamp;
1573
1574 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1575 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1576
1577 explicit TransContext(CephContext* cct, OpSequencer *o)
1578 : osr(o),
1579 ioc(cct, this),
1580 start(ceph_clock_now()) {
1581 last_stamp = start;
1582 }
1583 ~TransContext() {
1584 delete deferred_txn;
1585 }
1586
1587 void write_onode(OnodeRef &o) {
1588 onodes.insert(o);
1589 }
1590 void write_shared_blob(SharedBlobRef &sb) {
1591 shared_blobs.insert(sb);
1592 }
1593 void unshare_blob(SharedBlob *sb) {
1594 shared_blobs.erase(sb);
1595 }
1596
1597 /// note we logically modified object (when onode itself is unmodified)
1598 void note_modified_object(OnodeRef &o) {
1599 // onode itself isn't written, though
1600 modified_objects.insert(o);
1601 }
1602 void removed(OnodeRef& o) {
1603 onodes.erase(o);
1604 modified_objects.erase(o);
1605 }
1606
1607 void aio_finish(BlueStore *store) override {
1608 store->txc_aio_finish(this);
1609 }
1610 };
1611
1612 typedef boost::intrusive::list<
1613 TransContext,
1614 boost::intrusive::member_hook<
1615 TransContext,
1616 boost::intrusive::list_member_hook<>,
1617 &TransContext::deferred_queue_item> > deferred_queue_t;
1618
1619 struct DeferredBatch : public AioContext {
1620 OpSequencer *osr;
1621 struct deferred_io {
1622 bufferlist bl; ///< data
1623 uint64_t seq; ///< deferred transaction seq
1624 };
1625 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1626 deferred_queue_t txcs; ///< txcs in this batch
1627 IOContext ioc; ///< our aios
1628 /// bytes of pending io for each deferred seq (may be 0)
1629 map<uint64_t,int> seq_bytes;
1630
1631 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1632 void _audit(CephContext *cct);
1633
1634 DeferredBatch(CephContext *cct, OpSequencer *osr)
1635 : osr(osr), ioc(cct, this) {}
1636
1637 /// prepare a write
1638 void prepare_write(CephContext *cct,
1639 uint64_t seq, uint64_t offset, uint64_t length,
1640 bufferlist::const_iterator& p);
1641
1642 void aio_finish(BlueStore *store) override {
1643 store->_deferred_aio_finish(osr);
1644 }
1645 };
1646
1647 class OpSequencer : public Sequencer_impl {
1648 public:
1649 std::mutex qlock;
1650 std::condition_variable qcond;
1651 typedef boost::intrusive::list<
1652 TransContext,
1653 boost::intrusive::member_hook<
1654 TransContext,
1655 boost::intrusive::list_member_hook<>,
1656 &TransContext::sequencer_item> > q_list_t;
1657 q_list_t q; ///< transactions
1658
1659 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1660
1661 DeferredBatch *deferred_running = nullptr;
1662 DeferredBatch *deferred_pending = nullptr;
1663
1664 Sequencer *parent;
1665 BlueStore *store;
1666
1667 uint64_t last_seq = 0;
1668
1669 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1670
1671 std::atomic_int kv_committing_serially = {0};
1672
1673 std::atomic_int kv_submitted_waiters = {0};
1674
1675 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1676 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1677
1678 OpSequencer(CephContext* cct, BlueStore *store)
1679 : Sequencer_impl(cct),
1680 parent(NULL), store(store) {
1681 store->register_osr(this);
1682 }
1683 ~OpSequencer() override {
1684 assert(q.empty());
1685 _unregister();
1686 }
1687
1688 void discard() override {
1689 // Note that we may have txc's in flight when the parent Sequencer
1690 // goes away. Reflect this with zombie==registered==true and let
1691 // _osr_drain_all clean up later.
1692 assert(!zombie);
1693 zombie = true;
1694 parent = nullptr;
1695 bool empty;
1696 {
1697 std::lock_guard<std::mutex> l(qlock);
1698 empty = q.empty();
1699 }
1700 if (empty) {
1701 _unregister();
1702 }
1703 }
1704
1705 void _unregister() {
1706 if (registered) {
1707 store->unregister_osr(this);
1708 registered = false;
1709 }
1710 }
1711
1712 void queue_new(TransContext *txc) {
1713 std::lock_guard<std::mutex> l(qlock);
1714 txc->seq = ++last_seq;
1715 q.push_back(*txc);
1716 }
1717
1718 void drain() {
1719 std::unique_lock<std::mutex> l(qlock);
1720 while (!q.empty())
1721 qcond.wait(l);
1722 }
1723
1724 void drain_preceding(TransContext *txc) {
1725 std::unique_lock<std::mutex> l(qlock);
1726 while (!q.empty() && &q.front() != txc)
1727 qcond.wait(l);
1728 }
1729
1730 bool _is_all_kv_submitted() {
1731 // caller must hold qlock
1732 if (q.empty()) {
1733 return true;
1734 }
1735 TransContext *txc = &q.back();
1736 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1737 return true;
1738 }
1739 return false;
1740 }
1741
1742 void flush() override {
1743 std::unique_lock<std::mutex> l(qlock);
1744 while (true) {
1745 // set flag before the check because the condition
1746 // may become true outside qlock, and we need to make
1747 // sure those threads see waiters and signal qcond.
1748 ++kv_submitted_waiters;
1749 if (_is_all_kv_submitted()) {
1750 return;
1751 }
1752 qcond.wait(l);
1753 --kv_submitted_waiters;
1754 }
1755 }
1756
1757 bool flush_commit(Context *c) override {
1758 std::lock_guard<std::mutex> l(qlock);
1759 if (q.empty()) {
1760 return true;
1761 }
1762 TransContext *txc = &q.back();
1763 if (txc->state >= TransContext::STATE_KV_DONE) {
1764 return true;
1765 }
1766 txc->oncommits.push_back(c);
1767 return false;
1768 }
1769 };
1770
1771 typedef boost::intrusive::list<
1772 OpSequencer,
1773 boost::intrusive::member_hook<
1774 OpSequencer,
1775 boost::intrusive::list_member_hook<>,
1776 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1777
1778 struct KVSyncThread : public Thread {
1779 BlueStore *store;
1780 explicit KVSyncThread(BlueStore *s) : store(s) {}
1781 void *entry() override {
1782 store->_kv_sync_thread();
1783 return NULL;
1784 }
1785 };
1786 struct KVFinalizeThread : public Thread {
1787 BlueStore *store;
1788 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1789 void *entry() {
1790 store->_kv_finalize_thread();
1791 return NULL;
1792 }
1793 };
1794
1795 struct DBHistogram {
1796 struct value_dist {
1797 uint64_t count;
1798 uint32_t max_len;
1799 };
1800
1801 struct key_dist {
1802 uint64_t count;
1803 uint32_t max_len;
1804 map<int, struct value_dist> val_map; ///< slab id to count and max length of values
1805 };
1806
1807 map<string, map<int, struct key_dist> > key_hist;
1808 map<int, uint64_t> value_hist;
1809 int get_key_slab(size_t sz);
1810 string get_key_slab_to_range(int slab);
1811 int get_value_slab(size_t sz);
1812 string get_value_slab_to_range(int slab);
1813 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1814 const string &prefix, size_t key_size, size_t value_size);
1815 void dump(Formatter *f);
1816 };
1817
1818 // --------------------------------------------------------
1819 // members
1820 private:
1821 BlueFS *bluefs = nullptr;
1822 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1823 bool bluefs_single_shared_device = true;
1824 utime_t bluefs_last_balance;
1825
1826 KeyValueDB *db = nullptr;
1827 BlockDevice *bdev = nullptr;
1828 std::string freelist_type;
1829 FreelistManager *fm = nullptr;
1830 Allocator *alloc = nullptr;
1831 uuid_d fsid;
1832 int path_fd = -1; ///< open handle to $path
1833 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1834 bool mounted = false;
1835
1836 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1837 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1838
1839 vector<Cache*> cache_shards;
1840
1841 std::mutex osr_lock; ///< protect osr_set
1842 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1843
1844 std::atomic<uint64_t> nid_last = {0};
1845 std::atomic<uint64_t> nid_max = {0};
1846 std::atomic<uint64_t> blobid_last = {0};
1847 std::atomic<uint64_t> blobid_max = {0};
1848
1849 Throttle throttle_bytes; ///< submit to commit
1850 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1851
1852 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1853 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1854
1855 std::mutex deferred_lock;
1856 std::atomic<uint64_t> deferred_seq = {0};
1857 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1858 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1859 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1860 Finisher deferred_finisher;
1861
1862 int m_finisher_num = 1;
1863 vector<Finisher*> finishers;
1864
1865 KVSyncThread kv_sync_thread;
1866 std::mutex kv_lock;
1867 std::condition_variable kv_cond;
1868 bool _kv_only = false;
1869 bool kv_sync_started = false;
1870 bool kv_stop = false;
1871 bool kv_finalize_started = false;
1872 bool kv_finalize_stop = false;
1873 deque<TransContext*> kv_queue; ///< ready, already submitted
1874 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1875 deque<TransContext*> kv_committing; ///< currently syncing
1876 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1877 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1878
1879 KVFinalizeThread kv_finalize_thread;
1880 std::mutex kv_finalize_lock;
1881 std::condition_variable kv_finalize_cond;
1882 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1883 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1884
1885 PerfCounters *logger = nullptr;
1886
1887 std::mutex reap_lock;
1888 list<CollectionRef> removed_collections;
1889
1890 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1891 set<ghobject_t> debug_data_error_objects;
1892 set<ghobject_t> debug_mdata_error_objects;
1893
1894 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1895
1896 uint64_t block_size = 0; ///< block size of block device (power of 2)
1897 uint64_t block_mask = 0; ///< mask to get just the block offset
1898 size_t block_size_order = 0; ///< bits to shift to get block size
1899
1900 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1901 /// bits for min_alloc_size
1902 uint8_t min_alloc_size_order = 0;
1903 static_assert(std::numeric_limits<uint8_t>::max() >
1904 std::numeric_limits<decltype(min_alloc_size)>::digits,
1905 "not enough bits for min_alloc_size");
1906
1907 /// maximum allocation unit (power of 2)
1908 std::atomic<uint64_t> max_alloc_size = {0};
1909 
1910 /// number threshold for forced deferred writes
1911 std::atomic<int> deferred_batch_ops = {0};
1912 
1913 /// size threshold for forced deferred writes
1914 std::atomic<uint64_t> prefer_deferred_size = {0};
1915 
1916 /// approx cost per io, in bytes
1917 std::atomic<uint64_t> throttle_cost_per_io = {0};
1918
1919 std::atomic<Compressor::CompressionMode> comp_mode =
1920 {Compressor::COMP_NONE}; ///< compression mode
1921 CompressorRef compressor;
1922 std::atomic<uint64_t> comp_min_blob_size = {0};
1923 std::atomic<uint64_t> comp_max_blob_size = {0};
1924
1925 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1926
1927 uint64_t kv_ios = 0;
1928 uint64_t kv_throttle_costs = 0;
1929
1930 // cache trim control
1931 uint64_t cache_size = 0; ///< total cache size
1932 float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
1933 float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1934 float cache_data_ratio = 0; ///< cache ratio dedicated to object data
1935
1936 std::mutex vstatfs_lock;
1937 volatile_statfs vstatfs;
1938
1939 struct MempoolThread : public Thread {
1940 BlueStore *store;
1941 Cond cond;
1942 Mutex lock;
1943 bool stop = false;
1944 public:
1945 explicit MempoolThread(BlueStore *s)
1946 : store(s),
1947 lock("BlueStore::MempoolThread::lock") {}
1948 void *entry() override;
1949 void init() {
1950 assert(stop == false);
1951 create("bstore_mempool");
1952 }
1953 void shutdown() {
1954 lock.Lock();
1955 stop = true;
1956 cond.Signal();
1957 lock.Unlock();
1958 join();
1959 }
1960 } mempool_thread;
1961
1962 // --------------------------------------------------------
1963 // private methods
1964
1965 void _init_logger();
1966 void _shutdown_logger();
1967 int _reload_logger();
1968
1969 int _open_path();
1970 void _close_path();
1971 int _open_fsid(bool create);
1972 int _lock_fsid();
1973 int _read_fsid(uuid_d *f);
1974 int _write_fsid();
1975 void _close_fsid();
1976 void _set_alloc_sizes();
1977 void _set_blob_size();
1978
1979 int _open_bdev(bool create);
1980 void _close_bdev();
1981 int _open_db(bool create);
1982 void _close_db();
1983 int _open_fm(bool create);
1984 void _close_fm();
1985 int _open_alloc();
1986 void _close_alloc();
1987 int _open_collections(int *errors=0);
1988 void _close_collections();
1989
1990 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1991 bool create);
1992
1993 public:
1994 static int _write_bdev_label(CephContext* cct,
1995 string path, bluestore_bdev_label_t label);
1996 static int _read_bdev_label(CephContext* cct, string path,
1997 bluestore_bdev_label_t *label);
1998 private:
1999 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
2000 bool create);
2001
2002 int _open_super_meta();
2003
2004 void _open_statfs();
2005
2006 int _reconcile_bluefs_freespace();
2007 int _balance_bluefs_freespace(PExtentVector *extents);
2008 void _commit_bluefs_freespace(const PExtentVector& extents);
2009
2010 CollectionRef _get_collection(const coll_t& cid);
2011 void _queue_reap_collection(CollectionRef& c);
2012 void _reap_collections();
2013 void _update_cache_logger();
2014
2015 void _assign_nid(TransContext *txc, OnodeRef o);
2016 uint64_t _assign_blobid(TransContext *txc);
2017
2018 void _dump_onode(OnodeRef o, int log_level=30);
2019 void _dump_extent_map(ExtentMap& em, int log_level=30);
2020 void _dump_transaction(Transaction *t, int log_level = 30);
2021
2022 TransContext *_txc_create(OpSequencer *osr);
2023 void _txc_update_store_statfs(TransContext *txc);
2024 void _txc_add_transaction(TransContext *txc, Transaction *t);
2025 void _txc_calc_cost(TransContext *txc);
2026 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2027 void _txc_state_proc(TransContext *txc);
2028 void _txc_aio_submit(TransContext *txc);
2029 public:
2030 void txc_aio_finish(void *p) {
2031 _txc_state_proc(static_cast<TransContext*>(p));
2032 }
2033 private:
2034 void _txc_finish_io(TransContext *txc);
2035 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2036 void _txc_applied_kv(TransContext *txc);
2037 void _txc_committed_kv(TransContext *txc);
2038 void _txc_finish(TransContext *txc);
2039 void _txc_release_alloc(TransContext *txc);
2040
2041 void _osr_drain_preceding(TransContext *txc);
2042 void _osr_drain_all();
2043 void _osr_unregister_all();
2044
2045 void _kv_start();
2046 void _kv_stop();
2047 void _kv_sync_thread();
2048 void _kv_finalize_thread();
2049
2050 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2051 void _deferred_queue(TransContext *txc);
2052 public:
2053 void deferred_try_submit();
2054 private:
2055 void _deferred_submit_unlock(OpSequencer *osr);
2056 void _deferred_aio_finish(OpSequencer *osr);
2057 int _deferred_replay();
2058
2059 public:
2060 using mempool_dynamic_bitset =
2061 boost::dynamic_bitset<uint64_t,
2062 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2063
2064 private:
2065 int _fsck_check_extents(
2066 const ghobject_t& oid,
2067 const PExtentVector& extents,
2068 bool compressed,
2069 mempool_dynamic_bitset &used_blocks,
2070 uint64_t granularity,
2071 store_statfs_t& expected_statfs);
2072
2073 void _buffer_cache_write(
2074 TransContext *txc,
2075 BlobRef b,
2076 uint64_t offset,
2077 bufferlist& bl,
2078 unsigned flags) {
2079 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2080 flags);
2081 txc->shared_blobs_written.insert(b->shared_blob);
2082 }
2083
2084 int _collection_list(
2085 Collection *c, const ghobject_t& start, const ghobject_t& end,
2086 int max, vector<ghobject_t> *ls, ghobject_t *next);
2087
2088 template <typename T, typename F>
2089 T select_option(const std::string& opt_name, T val1, F f) {
2090 // NB: opt_name reserved for future use
2091 boost::optional<T> val2 = f();
2092 if (val2) {
2093 return *val2;
2094 }
2095 return val1;
2096 }
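// Editorial usage sketch for select_option() (not in the original header):
// val1 is the store-wide default and the functor may yield a per-pool
// override as a boost::optional. The option key and the pool_opts accessor
// below are illustrative assumptions.
//
//   uint64_t blob_size = select_option(
//     "compression_max_blob_size",           // reserved, currently unused
//     comp_max_blob_size.load(),             // global default
//     [&]() -> boost::optional<uint64_t> {
//       int val;
//       if (c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val))
//         return boost::optional<uint64_t>(val);
//       return boost::optional<uint64_t>();
//     });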
2097
2098 void _apply_padding(uint64_t head_pad,
2099 uint64_t tail_pad,
2100 bufferlist& padded);
2101
2102 // -- ondisk version ---
2103 public:
2104 const int32_t latest_ondisk_format = 2; ///< our version
2105 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2106 const int32_t min_compat_ondisk_format = 2; ///< who can read us
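// Editorial note (not in the original header): conceptually, mount-time
// compatibility checking compares the format recorded in the superblock
// against these constants, roughly:
//
//   if (stored min_compat_ondisk_format > latest_ondisk_format)
//     refuse to mount (written by a store too new for us to read);
//   else if (stored ondisk_format < latest_ondisk_format)
//     _upgrade_super() rewrites the superblock to the latest format.
//
// The actual checks live in _open_super_meta()/_upgrade_super().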
2107
2108 private:
2109 int32_t ondisk_format = 0; ///< value detected on mount
2110
2111 int _upgrade_super(); ///< upgrade (called during open_super)
2112 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2113
2114 // --- public interface ---
2115 public:
2116 BlueStore(CephContext *cct, const string& path);
2117 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // constructor for unit tests only
2118 ~BlueStore() override;
2119
2120 string get_type() override {
2121 return "bluestore";
2122 }
2123
2124 bool needs_journal() override { return false; }
2125 bool wants_journal() override { return false; }
2126 bool allows_journal() override { return false; }
2127
2128 bool is_rotational() override;
2129 bool is_journal_rotational() override;
2130
2131 string get_default_device_class() override {
2132 string device_class;
2133 map<string, string> metadata;
2134 collect_metadata(&metadata);
2135 auto it = metadata.find("bluestore_bdev_type");
2136 if (it != metadata.end()) {
2137 device_class = it->second;
2138 }
2139 return device_class;
2140 }
2141
2142 static int get_block_device_fsid(CephContext* cct, const string& path,
2143 uuid_d *fsid);
2144
2145 bool test_mount_in_use() override;
2146
2147 private:
2148 int _mount(bool kv_only);
2149 public:
2150 int mount() override {
2151 return _mount(false);
2152 }
2153 int umount() override;
2154
2155 int start_kv_only(KeyValueDB **pdb) {
2156 int r = _mount(true);
2157 if (r < 0)
2158 return r;
2159 *pdb = db;
2160 return 0;
2161 }
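// Editorial usage sketch (not in the original header): start_kv_only() is
// intended for offline tooling that needs the underlying KeyValueDB without
// a full mount. The calling pattern below is illustrative only:
//
//   KeyValueDB *kvdb = nullptr;
//   int r = store.start_kv_only(&kvdb);
//   if (r < 0)
//     return r;
//   // ... inspect or repair kvdb ...
//   store.umount();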
2162
2163 int write_meta(const std::string& key, const std::string& value) override;
2164 int read_meta(const std::string& key, std::string *value) override;
2165
2166
2167 int fsck(bool deep) override {
2168 return _fsck(deep, false);
2169 }
2170 int repair(bool deep) override {
2171 return _fsck(deep, true);
2172 }
2173 int _fsck(bool deep, bool repair);
2174
2175 void set_cache_shards(unsigned num) override;
2176
2177 int validate_hobject_key(const hobject_t &obj) const override {
2178 return 0;
2179 }
2180 unsigned get_max_attr_name_length() override {
2181 return 256; // arbitrary; there is no real limit internally
2182 }
2183
2184 int mkfs() override;
2185 int mkjournal() override {
2186 return 0;
2187 }
2188
2189 void get_db_statistics(Formatter *f) override;
2190 void generate_db_histogram(Formatter *f) override;
2191 void _flush_cache();
2192 void flush_cache() override;
2193 void dump_perf_counters(Formatter *f) override {
2194 f->open_object_section("perf_counters");
2195 logger->dump_formatted(f, false);
2196 f->close_section();
2197 }
2198
2199 void register_osr(OpSequencer *osr) {
2200 std::lock_guard<std::mutex> l(osr_lock);
2201 osr_set.insert(osr);
2202 }
2203 void unregister_osr(OpSequencer *osr) {
2204 std::lock_guard<std::mutex> l(osr_lock);
2205 osr_set.erase(osr);
2206 }
2207
2208 public:
2209 int statfs(struct store_statfs_t *buf) override;
2210
2211 void collect_metadata(map<string,string> *pm) override;
2212
2213 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2214 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2215 int set_collection_opts(
2216 const coll_t& cid,
2217 const pool_opts_t& opts) override;
2218 int stat(
2219 const coll_t& cid,
2220 const ghobject_t& oid,
2221 struct stat *st,
2222 bool allow_eio = false) override;
2223 int stat(
2224 CollectionHandle &c,
2225 const ghobject_t& oid,
2226 struct stat *st,
2227 bool allow_eio = false) override;
2228 int read(
2229 const coll_t& cid,
2230 const ghobject_t& oid,
2231 uint64_t offset,
2232 size_t len,
2233 bufferlist& bl,
2234 uint32_t op_flags = 0) override;
2235 int read(
2236 CollectionHandle &c,
2237 const ghobject_t& oid,
2238 uint64_t offset,
2239 size_t len,
2240 bufferlist& bl,
2241 uint32_t op_flags = 0) override;
2242 int _do_read(
2243 Collection *c,
2244 OnodeRef o,
2245 uint64_t offset,
2246 size_t len,
2247 bufferlist& bl,
2248 uint32_t op_flags = 0);
2249
2250 private:
2251 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2252 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2253 public:
2254 int fiemap(const coll_t& cid, const ghobject_t& oid,
2255 uint64_t offset, size_t len, bufferlist& bl) override;
2256 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2257 uint64_t offset, size_t len, bufferlist& bl) override;
2258 int fiemap(const coll_t& cid, const ghobject_t& oid,
2259 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2260 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2261 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2262
2263
2264 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2265 bufferptr& value) override;
2266 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2267 bufferptr& value) override;
2268
2269 int getattrs(const coll_t& cid, const ghobject_t& oid,
2270 map<string,bufferptr>& aset) override;
2271 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2272 map<string,bufferptr>& aset) override;
2273
2274 int list_collections(vector<coll_t>& ls) override;
2275
2276 CollectionHandle open_collection(const coll_t &c) override;
2277
2278 bool collection_exists(const coll_t& c) override;
2279 int collection_empty(const coll_t& c, bool *empty) override;
2280 int collection_bits(const coll_t& c) override;
2281
2282 int collection_list(const coll_t& cid,
2283 const ghobject_t& start,
2284 const ghobject_t& end,
2285 int max,
2286 vector<ghobject_t> *ls, ghobject_t *next) override;
2287 int collection_list(CollectionHandle &c,
2288 const ghobject_t& start,
2289 const ghobject_t& end,
2290 int max,
2291 vector<ghobject_t> *ls, ghobject_t *next) override;
2292
2293 int omap_get(
2294 const coll_t& cid, ///< [in] Collection containing oid
2295 const ghobject_t &oid, ///< [in] Object containing omap
2296 bufferlist *header, ///< [out] omap header
2297 map<string, bufferlist> *out ///< [out] Key to value map
2298 ) override;
2299 int omap_get(
2300 CollectionHandle &c, ///< [in] Collection containing oid
2301 const ghobject_t &oid, ///< [in] Object containing omap
2302 bufferlist *header, ///< [out] omap header
2303 map<string, bufferlist> *out ///< [out] Key to value map
2304 ) override;
2305
2306 /// Get omap header
2307 int omap_get_header(
2308 const coll_t& cid, ///< [in] Collection containing oid
2309 const ghobject_t &oid, ///< [in] Object containing omap
2310 bufferlist *header, ///< [out] omap header
2311 bool allow_eio = false ///< [in] don't assert on eio
2312 ) override;
2313 int omap_get_header(
2314 CollectionHandle &c, ///< [in] Collection containing oid
2315 const ghobject_t &oid, ///< [in] Object containing omap
2316 bufferlist *header, ///< [out] omap header
2317 bool allow_eio = false ///< [in] don't assert on eio
2318 ) override;
2319
2320 /// Get keys defined on oid
2321 int omap_get_keys(
2322 const coll_t& cid, ///< [in] Collection containing oid
2323 const ghobject_t &oid, ///< [in] Object containing omap
2324 set<string> *keys ///< [out] Keys defined on oid
2325 ) override;
2326 int omap_get_keys(
2327 CollectionHandle &c, ///< [in] Collection containing oid
2328 const ghobject_t &oid, ///< [in] Object containing omap
2329 set<string> *keys ///< [out] Keys defined on oid
2330 ) override;
2331
2332 /// Get key values
2333 int omap_get_values(
2334 const coll_t& cid, ///< [in] Collection containing oid
2335 const ghobject_t &oid, ///< [in] Object containing omap
2336 const set<string> &keys, ///< [in] Keys to get
2337 map<string, bufferlist> *out ///< [out] Returned keys and values
2338 ) override;
2339 int omap_get_values(
2340 CollectionHandle &c, ///< [in] Collection containing oid
2341 const ghobject_t &oid, ///< [in] Object containing omap
2342 const set<string> &keys, ///< [in] Keys to get
2343 map<string, bufferlist> *out ///< [out] Returned keys and values
2344 ) override;
2345
2346 /// Filter keys into out, keeping only those defined on oid
2347 int omap_check_keys(
2348 const coll_t& cid, ///< [in] Collection containing oid
2349 const ghobject_t &oid, ///< [in] Object containing omap
2350 const set<string> &keys, ///< [in] Keys to check
2351 set<string> *out ///< [out] Subset of keys defined on oid
2352 ) override;
2353 int omap_check_keys(
2354 CollectionHandle &c, ///< [in] Collection containing oid
2355 const ghobject_t &oid, ///< [in] Object containing omap
2356 const set<string> &keys, ///< [in] Keys to check
2357 set<string> *out ///< [out] Subset of keys defined on oid
2358 ) override;
2359
2360 ObjectMap::ObjectMapIterator get_omap_iterator(
2361 const coll_t& cid, ///< [in] collection
2362 const ghobject_t &oid ///< [in] object
2363 ) override;
2364 ObjectMap::ObjectMapIterator get_omap_iterator(
2365 CollectionHandle &c, ///< [in] collection
2366 const ghobject_t &oid ///< [in] object
2367 ) override;
2368
2369 void set_fsid(uuid_d u) override {
2370 fsid = u;
2371 }
2372 uuid_d get_fsid() override {
2373 return fsid;
2374 }
2375
2376 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2377 return num_objects * 300; // assuming per-object overhead is 300 bytes
2378 }
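// Worked example (editorial): with the assumed 300-byte per-object overhead,
// 1M objects => 1,000,000 * 300 = ~300 MB of estimated metadata overhead.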
2379
2380 struct BSPerfTracker {
2381 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2382 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2383
2384 objectstore_perf_stat_t get_cur_stats() const {
2385 objectstore_perf_stat_t ret;
2386 ret.os_commit_latency = os_commit_latency.current_avg();
2387 ret.os_apply_latency = os_apply_latency.current_avg();
2388 return ret;
2389 }
2390
2391 void update_from_perfcounters(PerfCounters &logger);
2392 } perf_tracker;
2393
2394 objectstore_perf_stat_t get_cur_stats() override {
2395 perf_tracker.update_from_perfcounters(*logger);
2396 return perf_tracker.get_cur_stats();
2397 }
2398 const PerfCounters* get_perf_counters() const override {
2399 return logger;
2400 }
2401
2402 int queue_transactions(
2403 Sequencer *osr,
2404 vector<Transaction>& tls,
2405 TrackedOpRef op = TrackedOpRef(),
2406 ThreadPool::TPHandle *handle = NULL) override;
2407
2408 // error injection
2409 void inject_data_error(const ghobject_t& o) override {
2410 RWLock::WLocker l(debug_read_error_lock);
2411 debug_data_error_objects.insert(o);
2412 }
2413 void inject_mdata_error(const ghobject_t& o) override {
2414 RWLock::WLocker l(debug_read_error_lock);
2415 debug_mdata_error_objects.insert(o);
2416 }
2417 void compact() override {
2418 assert(db);
2419 db->compact();
2420 }
2421
2422 private:
2423 bool _debug_data_eio(const ghobject_t& o) {
2424 if (!cct->_conf->bluestore_debug_inject_read_err) {
2425 return false;
2426 }
2427 RWLock::RLocker l(debug_read_error_lock);
2428 return debug_data_error_objects.count(o);
2429 }
2430 bool _debug_mdata_eio(const ghobject_t& o) {
2431 if (!cct->_conf->bluestore_debug_inject_read_err) {
2432 return false;
2433 }
2434 RWLock::RLocker l(debug_read_error_lock);
2435 return debug_mdata_error_objects.count(o);
2436 }
2437 void _debug_obj_on_delete(const ghobject_t& o) {
2438 if (cct->_conf->bluestore_debug_inject_read_err) {
2439 RWLock::WLocker l(debug_read_error_lock);
2440 debug_data_error_objects.erase(o);
2441 debug_mdata_error_objects.erase(o);
2442 }
2443 }
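// Editorial note (not in the original header): the error-injection hooks
// above only take effect when bluestore_debug_inject_read_err is enabled,
// e.g. (illustrative):
//
//   // with bluestore_debug_inject_read_err = true in the conf
//   store.inject_data_error(oid);   // subsequent data reads of oid report EIO
//   store.inject_mdata_error(oid);  // metadata (onode) reads of oid report EIO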
2444
2445 private:
2446
2447 // --------------------------------------------------------
2448 // read processing internal methods
2449 int _verify_csum(
2450 OnodeRef& o,
2451 const bluestore_blob_t* blob,
2452 uint64_t blob_xoffset,
2453 const bufferlist& bl,
2454 uint64_t logical_offset) const;
2455 int _decompress(bufferlist& source, bufferlist* result);
2456
2457
2458 // --------------------------------------------------------
2459 // write ops
2460
2461 struct WriteContext {
2462 bool buffered = false; ///< buffered write
2463 bool compress = false; ///< compressed write
2464 uint64_t target_blob_size = 0; ///< target (max) blob size
2465 unsigned csum_order = 0; ///< target checksum chunk order
2466
2467 old_extent_map_t old_extents; ///< must deref these blobs
2468
2469 struct write_item {
2470 uint64_t logical_offset; ///< write logical offset
2471 BlobRef b;
2472 uint64_t blob_length;
2473 uint64_t b_off;
2474 bufferlist bl;
2475 uint64_t b_off0; ///< original offset in a blob prior to padding
2476 uint64_t length0; ///< original data length prior to padding
2477
2478 bool mark_unused;
2479 bool new_blob; ///< whether new blob was created
2480
2481 bool compressed = false;
2482 bufferlist compressed_bl;
2483 size_t compressed_len = 0;
2484
2485 write_item(
2486 uint64_t logical_offs,
2487 BlobRef b,
2488 uint64_t blob_len,
2489 uint64_t o,
2490 bufferlist& bl,
2491 uint64_t o0,
2492 uint64_t l0,
2493 bool _mark_unused,
2494 bool _new_blob)
2495 :
2496 logical_offset(logical_offs),
2497 b(b),
2498 blob_length(blob_len),
2499 b_off(o),
2500 bl(bl),
2501 b_off0(o0),
2502 length0(l0),
2503 mark_unused(_mark_unused),
2504 new_blob(_new_blob) {}
2505 };
2506 vector<write_item> writes; ///< blobs we're writing
2507
2508 /// partial clone of the context
2509 void fork(const WriteContext& other) {
2510 buffered = other.buffered;
2511 compress = other.compress;
2512 target_blob_size = other.target_blob_size;
2513 csum_order = other.csum_order;
2514 }
2515 void write(
2516 uint64_t loffs,
2517 BlobRef b,
2518 uint64_t blob_len,
2519 uint64_t o,
2520 bufferlist& bl,
2521 uint64_t o0,
2522 uint64_t len0,
2523 bool _mark_unused,
2524 bool _new_blob) {
2525 writes.emplace_back(loffs,
2526 b,
2527 blob_len,
2528 o,
2529 bl,
2530 o0,
2531 len0,
2532 _mark_unused,
2533 _new_blob);
2534 }
2535 /// Checks for writes to the same pextent within a blob
2536 bool has_conflict(
2537 BlobRef b,
2538 uint64_t loffs,
2539 uint64_t loffs_end,
2540 uint64_t min_alloc_size);
2541 };
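// Editorial sketch (not in the original header) of how the write path is
// expected to drive a WriteContext; the calls mirror the helpers declared
// below and the locals are illustrative:
//
//   WriteContext wctx;
//   _choose_write_options(c, o, fadvise_flags, &wctx);    // policy: buffered/compress/blob size
//   _do_write_data(txc, c, o, offset, length, bl, &wctx); // split into small/big writes
//   int r = _do_alloc_write(txc, c, o, &wctx);            // allocate space, checksum, queue io
//   _wctx_finish(txc, c, o, &wctx);                       // release replaced extents/blobs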
2542
2543 void _do_write_small(
2544 TransContext *txc,
2545 CollectionRef &c,
2546 OnodeRef o,
2547 uint64_t offset, uint64_t length,
2548 bufferlist::iterator& blp,
2549 WriteContext *wctx);
2550 void _do_write_big(
2551 TransContext *txc,
2552 CollectionRef &c,
2553 OnodeRef o,
2554 uint64_t offset, uint64_t length,
2555 bufferlist::iterator& blp,
2556 WriteContext *wctx);
2557 int _do_alloc_write(
2558 TransContext *txc,
2559 CollectionRef c,
2560 OnodeRef o,
2561 WriteContext *wctx);
2562 void _wctx_finish(
2563 TransContext *txc,
2564 CollectionRef& c,
2565 OnodeRef o,
2566 WriteContext *wctx,
2567 set<SharedBlob*> *maybe_unshared_blobs=0);
2568
2569 int _do_transaction(Transaction *t,
2570 TransContext *txc,
2571 ThreadPool::TPHandle *handle);
2572
2573 int _write(TransContext *txc,
2574 CollectionRef& c,
2575 OnodeRef& o,
2576 uint64_t offset, size_t len,
2577 bufferlist& bl,
2578 uint32_t fadvise_flags);
2579 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2580 uint64_t chunk_size);
2581
2582 void _choose_write_options(CollectionRef& c,
2583 OnodeRef o,
2584 uint32_t fadvise_flags,
2585 WriteContext *wctx);
2586
2587 int _do_gc(TransContext *txc,
2588 CollectionRef& c,
2589 OnodeRef o,
2590 const GarbageCollector& gc,
2591 const WriteContext& wctx,
2592 uint64_t *dirty_start,
2593 uint64_t *dirty_end);
2594
2595 int _do_write(TransContext *txc,
2596 CollectionRef &c,
2597 OnodeRef o,
2598 uint64_t offset, uint64_t length,
2599 bufferlist& bl,
2600 uint32_t fadvise_flags);
2601 void _do_write_data(TransContext *txc,
2602 CollectionRef& c,
2603 OnodeRef o,
2604 uint64_t offset,
2605 uint64_t length,
2606 bufferlist& bl,
2607 WriteContext *wctx);
2608
2609 int _touch(TransContext *txc,
2610 CollectionRef& c,
2611 OnodeRef& o);
2612 int _do_zero(TransContext *txc,
2613 CollectionRef& c,
2614 OnodeRef& o,
2615 uint64_t offset, size_t len);
2616 int _zero(TransContext *txc,
2617 CollectionRef& c,
2618 OnodeRef& o,
2619 uint64_t offset, size_t len);
2620 void _do_truncate(TransContext *txc,
2621 CollectionRef& c,
2622 OnodeRef o,
2623 uint64_t offset,
2624 set<SharedBlob*> *maybe_unshared_blobs=0);
2625 int _truncate(TransContext *txc,
2626 CollectionRef& c,
2627 OnodeRef& o,
2628 uint64_t offset);
2629 int _remove(TransContext *txc,
2630 CollectionRef& c,
2631 OnodeRef& o);
2632 int _do_remove(TransContext *txc,
2633 CollectionRef& c,
2634 OnodeRef o);
2635 int _setattr(TransContext *txc,
2636 CollectionRef& c,
2637 OnodeRef& o,
2638 const string& name,
2639 bufferptr& val);
2640 int _setattrs(TransContext *txc,
2641 CollectionRef& c,
2642 OnodeRef& o,
2643 const map<string,bufferptr>& aset);
2644 int _rmattr(TransContext *txc,
2645 CollectionRef& c,
2646 OnodeRef& o,
2647 const string& name);
2648 int _rmattrs(TransContext *txc,
2649 CollectionRef& c,
2650 OnodeRef& o);
2651 void _do_omap_clear(TransContext *txc, uint64_t id);
2652 int _omap_clear(TransContext *txc,
2653 CollectionRef& c,
2654 OnodeRef& o);
2655 int _omap_setkeys(TransContext *txc,
2656 CollectionRef& c,
2657 OnodeRef& o,
2658 bufferlist& bl);
2659 int _omap_setheader(TransContext *txc,
2660 CollectionRef& c,
2661 OnodeRef& o,
2662 bufferlist& header);
2663 int _omap_rmkeys(TransContext *txc,
2664 CollectionRef& c,
2665 OnodeRef& o,
2666 bufferlist& bl);
2667 int _omap_rmkey_range(TransContext *txc,
2668 CollectionRef& c,
2669 OnodeRef& o,
2670 const string& first, const string& last);
2671 int _set_alloc_hint(
2672 TransContext *txc,
2673 CollectionRef& c,
2674 OnodeRef& o,
2675 uint64_t expected_object_size,
2676 uint64_t expected_write_size,
2677 uint32_t flags);
2678 int _do_clone_range(TransContext *txc,
2679 CollectionRef& c,
2680 OnodeRef& oldo,
2681 OnodeRef& newo,
2682 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2683 int _clone(TransContext *txc,
2684 CollectionRef& c,
2685 OnodeRef& oldo,
2686 OnodeRef& newo);
2687 int _clone_range(TransContext *txc,
2688 CollectionRef& c,
2689 OnodeRef& oldo,
2690 OnodeRef& newo,
2691 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2692 int _rename(TransContext *txc,
2693 CollectionRef& c,
2694 OnodeRef& oldo,
2695 OnodeRef& newo,
2696 const ghobject_t& new_oid);
2697 int _create_collection(TransContext *txc, const coll_t &cid,
2698 unsigned bits, CollectionRef *c);
2699 int _remove_collection(TransContext *txc, const coll_t &cid,
2700 CollectionRef *c);
2701 int _split_collection(TransContext *txc,
2702 CollectionRef& c,
2703 CollectionRef& d,
2704 unsigned bits, int rem);
2705 };
2706
2707 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2708 return out << *s.parent;
2709 }
2710
2711 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2712 o->get();
2713 }
2714 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2715 o->put();
2716 }
2717
2718 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2719 o->get();
2720 }
2721 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2722 o->put();
2723 }
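// Editorial note (not in the original header): these free functions are what
// allow boost::intrusive_ptr to manage BlueStore::Onode and
// BlueStore::OpSequencer reference counts, e.g. (illustrative):
//
//   BlueStore::OnodeRef o = some_collection->get_onode(oid, false);
//   // o->get()/o->put() are invoked automatically as OnodeRefs are
//   // copied and destroyed; no manual refcounting is needed.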
2724
2725 #endif