1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::maybe_rebuild()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
56
57
58 enum {
59 l_bluestore_first = 732430,
60 l_bluestore_kv_flush_lat,
61 l_bluestore_kv_commit_lat,
62 l_bluestore_kv_lat,
63 l_bluestore_state_prepare_lat,
64 l_bluestore_state_aio_wait_lat,
65 l_bluestore_state_io_done_lat,
66 l_bluestore_state_kv_queued_lat,
67 l_bluestore_state_kv_committing_lat,
68 l_bluestore_state_kv_done_lat,
69 l_bluestore_state_deferred_queued_lat,
70 l_bluestore_state_deferred_aio_wait_lat,
71 l_bluestore_state_deferred_cleanup_lat,
72 l_bluestore_state_finishing_lat,
73 l_bluestore_state_done_lat,
74 l_bluestore_throttle_lat,
75 l_bluestore_submit_lat,
76 l_bluestore_commit_lat,
77 l_bluestore_read_lat,
78 l_bluestore_read_onode_meta_lat,
79 l_bluestore_read_wait_aio_lat,
80 l_bluestore_compress_lat,
81 l_bluestore_decompress_lat,
82 l_bluestore_csum_lat,
83 l_bluestore_compress_success_count,
84 l_bluestore_compress_rejected_count,
85 l_bluestore_write_pad_bytes,
86 l_bluestore_deferred_write_ops,
87 l_bluestore_deferred_write_bytes,
88 l_bluestore_write_penalty_read_ops,
89 l_bluestore_allocated,
90 l_bluestore_stored,
91 l_bluestore_compressed,
92 l_bluestore_compressed_allocated,
93 l_bluestore_compressed_original,
94 l_bluestore_onodes,
95 l_bluestore_onode_hits,
96 l_bluestore_onode_misses,
97 l_bluestore_onode_shard_hits,
98 l_bluestore_onode_shard_misses,
99 l_bluestore_extents,
100 l_bluestore_blobs,
101 l_bluestore_buffers,
102 l_bluestore_buffer_bytes,
103 l_bluestore_buffer_hit_bytes,
104 l_bluestore_buffer_miss_bytes,
105 l_bluestore_write_big,
106 l_bluestore_write_big_bytes,
107 l_bluestore_write_big_blobs,
108 l_bluestore_write_small,
109 l_bluestore_write_small_bytes,
110 l_bluestore_write_small_unused,
111 l_bluestore_write_small_deferred,
112 l_bluestore_write_small_pre_read,
113 l_bluestore_write_small_new,
114 l_bluestore_txc,
115 l_bluestore_onode_reshard,
116 l_bluestore_blob_split,
117 l_bluestore_extent_compress,
118 l_bluestore_gc_merged,
119 l_bluestore_last
120 };
121
122 class BlueStore : public ObjectStore,
123 public md_config_obs_t {
124 // -----------------------------------------------------
125 // types
126 public:
127 // config observer
128 const char** get_tracked_conf_keys() const override;
129 void handle_conf_change(const struct md_config_t *conf,
130 const std::set<std::string> &changed) override;
131
132 void _set_csum();
133 void _set_compression();
134 void _set_throttle_params();
135 int _set_cache_sizes();
136
137 class TransContext;
138
139 typedef map<uint64_t, bufferlist> ready_regions_t;
140
141 struct BufferSpace;
142 struct Collection;
143 typedef boost::intrusive_ptr<Collection> CollectionRef;
144
145 struct AioContext {
146 virtual void aio_finish(BlueStore *store) = 0;
147 virtual ~AioContext() {}
148 };
149
150 /// cached buffer
151 struct Buffer {
152 MEMPOOL_CLASS_HELPERS();
153
154 enum {
155 STATE_EMPTY, ///< empty buffer -- used for cache history
156 STATE_CLEAN, ///< clean data that is up to date
157 STATE_WRITING, ///< data that is being written (io not yet complete)
158 };
159 static const char *get_state_name(int s) {
160 switch (s) {
161 case STATE_EMPTY: return "empty";
162 case STATE_CLEAN: return "clean";
163 case STATE_WRITING: return "writing";
164 default: return "???";
165 }
166 }
167 enum {
168 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
169 // NOTE: fix operator<< when you define a second flag
170 };
171 static const char *get_flag_name(int s) {
172 switch (s) {
173 case FLAG_NOCACHE: return "nocache";
174 default: return "???";
175 }
176 }
177
178 BufferSpace *space;
179 uint16_t state; ///< STATE_*
180 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
181 uint32_t flags; ///< FLAG_*
182 uint64_t seq;
183 uint32_t offset, length;
184 bufferlist data;
185
186 boost::intrusive::list_member_hook<> lru_item;
187 boost::intrusive::list_member_hook<> state_item;
188
189 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
190 unsigned f = 0)
191 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
192 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
193 unsigned f = 0)
194 : space(space), state(s), flags(f), seq(q), offset(o),
195 length(b.length()), data(b) {}
196
197 bool is_empty() const {
198 return state == STATE_EMPTY;
199 }
200 bool is_clean() const {
201 return state == STATE_CLEAN;
202 }
203 bool is_writing() const {
204 return state == STATE_WRITING;
205 }
206
207 uint32_t end() const {
208 return offset + length;
209 }
210
211 void truncate(uint32_t newlen) {
212 assert(newlen < length);
213 if (data.length()) {
214 bufferlist t;
215 t.substr_of(data, 0, newlen);
216 data.claim(t);
217 }
218 length = newlen;
219 }
220 void maybe_rebuild() {
221 if (data.length() &&
222 (data.get_num_buffers() > 1 ||
223 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
224 data.rebuild();
225 }
226 }
227
228 void dump(Formatter *f) const {
229 f->dump_string("state", get_state_name(state));
230 f->dump_unsigned("seq", seq);
231 f->dump_unsigned("offset", offset);
232 f->dump_unsigned("length", length);
233 f->dump_unsigned("data_length", data.length());
234 }
235 };
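  // Illustrative sketch (not part of the interface): the intended Buffer
  // lifecycle using only the states and helpers declared above.  'space',
  // 'seq' and 'bl' are assumed to come from the caller.
  //
  //   Buffer b(space, Buffer::STATE_WRITING, seq, 0 /*offset*/, bl);
  //   assert(b.is_writing());           // io in flight
  //   // BufferSpace::finish_write() later flips it to STATE_CLEAN
  //   assert(b.end() == b.offset + b.length);
  //   b.truncate(b.length / 2);         // keep only the first half
  //   b.maybe_rebuild();                // re-pack if >1/8 of the bufferlist is slop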
236
237 struct Cache;
238
239 /// map logical extent range (object) onto buffers
240 struct BufferSpace {
241 typedef boost::intrusive::list<
242 Buffer,
243 boost::intrusive::member_hook<
244 Buffer,
245 boost::intrusive::list_member_hook<>,
246 &Buffer::state_item> > state_list_t;
247
248 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
249 buffer_map;
250
251 // we use a bare intrusive list here instead of std::map because
252 // it uses less memory and we expect this to be very small (very
253 // few IOs in flight to the same Blob at the same time).
254 state_list_t writing; ///< writing buffers, sorted by seq, ascending
255
256 ~BufferSpace() {
257 assert(buffer_map.empty());
258 assert(writing.empty());
259 }
260
261 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
262 cache->_audit("_add_buffer start");
263 buffer_map[b->offset].reset(b);
264 if (b->is_writing()) {
265 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
266 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
267 writing.push_back(*b);
268 } else {
269 auto it = writing.begin();
270 while (it->seq < b->seq) {
271 ++it;
272 }
273
274 assert(it->seq >= b->seq);
275 // note that this will insert b before it
276 // hence the order is maintained
277 writing.insert(it, *b);
278 }
279 } else {
280 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
281 cache->_add_buffer(b, level, near);
282 }
283 cache->_audit("_add_buffer end");
284 }
285 void _rm_buffer(Cache* cache, Buffer *b) {
286 _rm_buffer(cache, buffer_map.find(b->offset));
287 }
288 void _rm_buffer(Cache* cache,
289 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
290 assert(p != buffer_map.end());
291 cache->_audit("_rm_buffer start");
292 if (p->second->is_writing()) {
293 writing.erase(writing.iterator_to(*p->second));
294 } else {
295 cache->_rm_buffer(p->second.get());
296 }
297 buffer_map.erase(p);
298 cache->_audit("_rm_buffer end");
299 }
300
301 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
302 uint32_t offset) {
303 auto i = buffer_map.lower_bound(offset);
304 if (i != buffer_map.begin()) {
305 --i;
306 if (i->first + i->second->length <= offset)
307 ++i;
308 }
309 return i;
310 }
311
312 // must be called under protection of the Cache lock
313 void _clear(Cache* cache);
314
315 // return value is the highest cache_private of a trimmed buffer, or 0.
316 int discard(Cache* cache, uint32_t offset, uint32_t length) {
317 std::lock_guard<std::recursive_mutex> l(cache->lock);
318 return _discard(cache, offset, length);
319 }
320 int _discard(Cache* cache, uint32_t offset, uint32_t length);
321
322 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
323 unsigned flags) {
324 std::lock_guard<std::recursive_mutex> l(cache->lock);
325 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
326 flags);
327 b->cache_private = _discard(cache, offset, bl.length());
328 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
329 }
330 void finish_write(Cache* cache, uint64_t seq);
331 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
332 std::lock_guard<std::recursive_mutex> l(cache->lock);
333 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
334 b->cache_private = _discard(cache, offset, bl.length());
335 _add_buffer(cache, b, 1, nullptr);
336 }
337
338 void read(Cache* cache, uint32_t offset, uint32_t length,
339 BlueStore::ready_regions_t& res,
340 interval_set<uint32_t>& res_intervals);
341
342 void truncate(Cache* cache, uint32_t offset) {
343 discard(cache, offset, (uint32_t)-1 - offset);
344 }
345
346 void split(Cache* cache, size_t pos, BufferSpace &r);
347
348 void dump(Cache* cache, Formatter *f) const {
349 std::lock_guard<std::recursive_mutex> l(cache->lock);
350 f->open_array_section("buffers");
351 for (auto& i : buffer_map) {
352 f->open_object_section("buffer");
353 assert(i.first == i.second->offset);
354 i.second->dump(f);
355 f->close_section();
356 }
357 f->close_section();
358 }
359 };
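  // Illustrative sketch of the write/read flow above ('cache' is this
  // collection's Cache shard and 'txc_seq' the owning transaction's seq;
  // both are assumed).  Writes sit on the 'writing' list until
  // finish_write() admits them to the cache; reads go in via did_read().
  //
  //   BufferSpace bc;
  //   bufferlist bl;
  //   bl.append(std::string(4096, 'x'));
  //   bc.write(cache, txc_seq, 0, bl, 0);     // new STATE_WRITING buffer
  //   bc.finish_write(cache, txc_seq);        // -> STATE_CLEAN, cached
  //
  //   ready_regions_t res;
  //   interval_set<uint32_t> hit;
  //   bc.read(cache, 0, 4096, res, hit);      // res[0] now holds the data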
360
361 struct SharedBlobSet;
362
363 /// in-memory shared blob state (incl cached buffers)
364 struct SharedBlob {
365 MEMPOOL_CLASS_HELPERS();
366
367 std::atomic_int nref = {0}; ///< reference count
368 bool loaded = false;
369
370 CollectionRef coll;
371 union {
372 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
373 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
374 };
375 BufferSpace bc; ///< buffer cache
376
377 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
378 if (get_cache()) {
379 get_cache()->add_blob();
380 }
381 }
382 SharedBlob(uint64_t i, Collection *_coll);
383 ~SharedBlob();
384
385 uint64_t get_sbid() const {
386 return loaded ? persistent->sbid : sbid_unloaded;
387 }
388
389 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
390 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
391
392 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
393
394 void get() {
395 ++nref;
396 }
397 void put();
398
399 /// get logical references
400 void get_ref(uint64_t offset, uint32_t length);
401
402 /// put logical references, and get back any released extents
403 void put_ref(uint64_t offset, uint32_t length,
404 PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
405
406 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
407 return l.get_sbid() == r.get_sbid();
408 }
409 inline Cache* get_cache() {
410 return coll ? coll->cache : nullptr;
411 }
412 inline SharedBlobSet* get_parent() {
413 return coll ? &(coll->shared_blob_set) : nullptr;
414 }
415 inline bool is_loaded() const {
416 return loaded;
417 }
418
419 };
420 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
421
422 /// a lookup table of SharedBlobs
423 struct SharedBlobSet {
424 std::mutex lock; ///< protect lookup, insertion, removal
425
426 // we use a bare pointer because we don't want to affect the ref
427 // count
428 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
429
430 SharedBlobRef lookup(uint64_t sbid) {
431 std::lock_guard<std::mutex> l(lock);
432 auto p = sb_map.find(sbid);
433 if (p == sb_map.end()) {
434 return nullptr;
435 }
436 return p->second;
437 }
438
439 void add(Collection* coll, SharedBlob *sb) {
440 std::lock_guard<std::mutex> l(lock);
441 sb_map[sb->get_sbid()] = sb;
442 sb->coll = coll;
443 }
444
445 bool try_remove(SharedBlob *sb) {
446 std::lock_guard<std::mutex> l(lock);
447 if (sb->nref == 0) {
448 assert(sb->get_parent() == this);
449 sb_map.erase(sb->get_sbid());
450 return true;
451 }
452 return false;
453 }
454
455 void remove(SharedBlob *sb) {
456 std::lock_guard<std::mutex> l(lock);
457 assert(sb->get_parent() == this);
458 sb_map.erase(sb->get_sbid());
459 }
460
461 bool empty() {
462 std::lock_guard<std::mutex> l(lock);
463 return sb_map.empty();
464 }
465
466 void dump(CephContext *cct, int lvl);
467 };
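  // Illustrative sketch of how a Collection is expected to drive the set
  // ('coll' and 'sbid' are assumed; the real removal path goes through
  // SharedBlob::put()):
  //
  //   SharedBlobSet sbs;
  //   SharedBlobRef sb = sbs.lookup(sbid);    // nullptr if not open yet
  //   if (!sb) {
  //     sb = new SharedBlob(sbid, coll);
  //     sbs.add(coll, sb.get());              // bare pointer; no ref taken
  //   }
  //   if (sbs.try_remove(sb.get())) {
  //     // nref was 0 and the entry is gone from sb_map
  //   }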
468
469 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
470
471 /// in-memory blob metadata and associated cached buffers (if any)
472 struct Blob {
473 MEMPOOL_CLASS_HELPERS();
474
475 std::atomic_int nref = {0}; ///< reference count
476 int16_t id = -1; ///< id, for spanning blobs only, >= 0
477 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
478 SharedBlobRef shared_blob; ///< shared blob state (if any)
479
480 private:
481 mutable bluestore_blob_t blob; ///< decoded blob metadata
482 #ifdef CACHE_BLOB_BL
483 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
484 #endif
485 /// refs from this shard. ephemeral if id<0, persisted if spanning.
486 bluestore_blob_use_tracker_t used_in_blob;
487
488 public:
489
490 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
491 friend void intrusive_ptr_release(Blob *b) { b->put(); }
492
493 friend ostream& operator<<(ostream& out, const Blob &b);
494
495 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
496 return used_in_blob;
497 }
498 bool is_referenced() const {
499 return used_in_blob.is_not_empty();
500 }
501 uint32_t get_referenced_bytes() const {
502 return used_in_blob.get_referenced_bytes();
503 }
504
505 bool is_spanning() const {
506 return id >= 0;
507 }
508
509 bool can_split() const {
510 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
511 // splitting a BufferSpace writing list is too hard; don't try.
512 return shared_blob->bc.writing.empty() &&
513 used_in_blob.can_split() &&
514 get_blob().can_split();
515 }
516
517 bool can_split_at(uint32_t blob_offset) const {
518 return used_in_blob.can_split_at(blob_offset) &&
519 get_blob().can_split_at(blob_offset);
520 }
521
522 bool can_reuse_blob(uint32_t min_alloc_size,
523 uint32_t target_blob_size,
524 uint32_t b_offset,
525 uint32_t *length0);
526
527 void dup(Blob& o) {
528 o.shared_blob = shared_blob;
529 o.blob = blob;
530 #ifdef CACHE_BLOB_BL
531 o.blob_bl = blob_bl;
532 #endif
533 }
534
535 inline const bluestore_blob_t& get_blob() const {
536 return blob;
537 }
538 inline bluestore_blob_t& dirty_blob() {
539 #ifdef CACHE_BLOB_BL
540 blob_bl.clear();
541 #endif
542 return blob;
543 }
544
545 /// discard buffers for unallocated regions
546 void discard_unallocated(Collection *coll);
547
548 /// get logical references
549 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
550 /// put logical references, and get back any released extents
551 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
552 PExtentVector *r);
553
554 /// split the blob
555 void split(Collection *coll, uint32_t blob_offset, Blob *o);
556
557 void get() {
558 ++nref;
559 }
560 void put() {
561 if (--nref == 0)
562 delete this;
563 }
564
565
566 #ifdef CACHE_BLOB_BL
567 void _encode() const {
568 if (blob_bl.length() == 0 ) {
569 ::encode(blob, blob_bl);
570 } else {
571 assert(blob_bl.length());
572 }
573 }
574 void bound_encode(
575 size_t& p,
576 bool include_ref_map) const {
577 _encode();
578 p += blob_bl.length();
579 if (include_ref_map) {
580 used_in_blob.bound_encode(p);
581 }
582 }
583 void encode(
584 bufferlist::contiguous_appender& p,
585 bool include_ref_map) const {
586 _encode();
587 p.append(blob_bl);
588 if (include_ref_map) {
589 used_in_blob.encode(p);
590 }
591 }
592 void decode(
593 Collection */*coll*/,
594 bufferptr::iterator& p,
595 bool include_ref_map) {
596 const char *start = p.get_pos();
597 denc(blob, p);
598 const char *end = p.get_pos();
599 blob_bl.clear();
600 blob_bl.append(start, end - start);
601 if (include_ref_map) {
602 used_in_blob.decode(p);
603 }
604 }
605 #else
606 void bound_encode(
607 size_t& p,
608 uint64_t struct_v,
609 uint64_t sbid,
610 bool include_ref_map) const {
611 denc(blob, p, struct_v);
612 if (blob.is_shared()) {
613 denc(sbid, p);
614 }
615 if (include_ref_map) {
616 used_in_blob.bound_encode(p);
617 }
618 }
619 void encode(
620 bufferlist::contiguous_appender& p,
621 uint64_t struct_v,
622 uint64_t sbid,
623 bool include_ref_map) const {
624 denc(blob, p, struct_v);
625 if (blob.is_shared()) {
626 denc(sbid, p);
627 }
628 if (include_ref_map) {
629 used_in_blob.encode(p);
630 }
631 }
632 void decode(
633 Collection *coll,
634 bufferptr::iterator& p,
635 uint64_t struct_v,
636 uint64_t* sbid,
637 bool include_ref_map);
638 #endif
639 };
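  // Illustrative note on the const/dirty split above: read-only callers use
  // get_blob(); any mutation must go through dirty_blob(), which (with
  // CACHE_BLOB_BL) also drops the cached encoding.  'b' is an assumed BlobRef.
  //
  //   uint64_t llen = b->get_blob().get_logical_length(); // no side effects
  //   bluestore_blob_t& bb = b->dirty_blob();             // cached encoding now stale
  //   // ... mutate bb; the next encode() re-encodes from scratch ...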
640 typedef boost::intrusive_ptr<Blob> BlobRef;
641 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
642
643 /// a logical extent, pointing to (some portion of) a blob
644 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
645 struct Extent : public ExtentBase {
646 MEMPOOL_CLASS_HELPERS();
647
648 uint32_t logical_offset = 0; ///< logical offset
649 uint32_t blob_offset = 0; ///< blob offset
650 uint32_t length = 0; ///< length
651 BlobRef blob; ///< the blob with our data
652
653 /// ctor for lookup only
654 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
655 /// ctor for delayed initialization (see decode_some())
656 explicit Extent() : ExtentBase() {
657 }
658 /// ctor for general usage
659 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
660 : ExtentBase(),
661 logical_offset(lo), blob_offset(o), length(l) {
662 assign_blob(b);
663 }
664 ~Extent() {
665 if (blob) {
666 blob->shared_blob->get_cache()->rm_extent();
667 }
668 }
669
670 void assign_blob(const BlobRef& b) {
671 assert(!blob);
672 blob = b;
673 blob->shared_blob->get_cache()->add_extent();
674 }
675
676 // comparators for intrusive_set
677 friend bool operator<(const Extent &a, const Extent &b) {
678 return a.logical_offset < b.logical_offset;
679 }
680 friend bool operator>(const Extent &a, const Extent &b) {
681 return a.logical_offset > b.logical_offset;
682 }
683 friend bool operator==(const Extent &a, const Extent &b) {
684 return a.logical_offset == b.logical_offset;
685 }
686
687 uint32_t blob_start() const {
688 return logical_offset - blob_offset;
689 }
690
691 uint32_t blob_end() const {
692 return blob_start() + blob->get_blob().get_logical_length();
693 }
694
695 uint32_t logical_end() const {
696 return logical_offset + length;
697 }
698
699 // return true if any piece of the blob is out of
700 // the given range [o, o + l].
701 bool blob_escapes_range(uint32_t o, uint32_t l) const {
702 return blob_start() < o || blob_end() > o + l;
703 }
704 };
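  // Worked example of the offset arithmetic above (values are illustrative):
  // an Extent with logical_offset=0x2000, blob_offset=0x1000, length=0x800
  // over a blob whose logical length is 0x4000 gives
  //
  //   blob_start()  == 0x2000 - 0x1000 == 0x1000
  //   blob_end()    == 0x1000 + 0x4000 == 0x5000
  //   logical_end() == 0x2000 + 0x800  == 0x2800
  //   blob_escapes_range(0x1000, 0x3000) == true  // blob_end() > 0x1000 + 0x3000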
705 typedef boost::intrusive::set<Extent> extent_map_t;
706
707
708 friend ostream& operator<<(ostream& out, const Extent& e);
709
710 struct OldExtent {
711 boost::intrusive::list_member_hook<> old_extent_item;
712 Extent e;
713 PExtentVector r;
714 bool blob_empty; // true if removing this extent left the blob empty;
715 // needed to update the compression stats properly
716 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
717 : e(lo, o, l, b), blob_empty(false) {
718 }
719 static OldExtent* create(CollectionRef c,
720 uint32_t lo,
721 uint32_t o,
722 uint32_t l,
723 BlobRef& b);
724 };
725 typedef boost::intrusive::list<
726 OldExtent,
727 boost::intrusive::member_hook<
728 OldExtent,
729 boost::intrusive::list_member_hook<>,
730 &OldExtent::old_extent_item> > old_extent_map_t;
731
732 struct Onode;
733
734 /// a sharded extent map, mapping offsets to lextents to blobs
735 struct ExtentMap {
736 Onode *onode;
737 extent_map_t extent_map; ///< map of Extents to Blobs
738 blob_map_t spanning_blob_map; ///< blobs that span shards
739
740 struct Shard {
741 bluestore_onode_t::shard_info *shard_info = nullptr;
742 unsigned extents = 0; ///< count extents in this shard
743 bool loaded = false; ///< true if shard is loaded
744 bool dirty = false; ///< true if shard is dirty and needs reencoding
745 };
746 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
747
748 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
749
750 uint32_t needs_reshard_begin = 0;
751 uint32_t needs_reshard_end = 0;
752
753 bool needs_reshard() const {
754 return needs_reshard_end > needs_reshard_begin;
755 }
756 void clear_needs_reshard() {
757 needs_reshard_begin = needs_reshard_end = 0;
758 }
759 void request_reshard(uint32_t begin, uint32_t end) {
760 if (begin < needs_reshard_begin) {
761 needs_reshard_begin = begin;
762 }
763 if (end > needs_reshard_end) {
764 needs_reshard_end = end;
765 }
766 }
767
768 struct DeleteDisposer {
769 void operator()(Extent *e) { delete e; }
770 };
771
772 ExtentMap(Onode *o);
773 ~ExtentMap() {
774 extent_map.clear_and_dispose(DeleteDisposer());
775 }
776
777 void clear() {
778 extent_map.clear_and_dispose(DeleteDisposer());
779 shards.clear();
780 inline_bl.clear();
781 clear_needs_reshard();
782 }
783
784 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
785 unsigned *pn);
786 unsigned decode_some(bufferlist& bl);
787
788 void bound_encode_spanning_blobs(size_t& p);
789 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
790 void decode_spanning_blobs(bufferptr::iterator& p);
791
792 BlobRef get_spanning_blob(int id) {
793 auto p = spanning_blob_map.find(id);
794 assert(p != spanning_blob_map.end());
795 return p->second;
796 }
797
798 void update(KeyValueDB::Transaction t, bool force);
799 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
800 void reshard(
801 KeyValueDB *db,
802 KeyValueDB::Transaction t);
803
804 /// initialize Shards from the onode
805 void init_shards(bool loaded, bool dirty);
806
807 /// return index of shard containing offset
808 /// or -1 if not found
809 int seek_shard(uint32_t offset) {
810 size_t end = shards.size();
811 size_t mid, left = 0;
812 size_t right = end; // one past the right end
813
814 while (left < right) {
815 mid = left + (right - left) / 2;
816 if (offset >= shards[mid].shard_info->offset) {
817 size_t next = mid + 1;
818 if (next >= end || offset < shards[next].shard_info->offset)
819 return mid;
820 //continue to search forwards
821 left = next;
822 } else {
823 //continue to search backwards
824 right = mid;
825 }
826 }
827
828 return -1; // not found
829 }
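  // Worked example (illustrative): with shard_info offsets {0x0, 0x8000,
  // 0x10000}:
  //
  //   seek_shard(0x0)     == 0
  //   seek_shard(0x9000)  == 1   // 0x8000 <= 0x9000 < 0x10000
  //   seek_shard(0x12000) == 2   // falls into the last shard
  //
  // Only an offset below shards[0].shard_info->offset returns -1.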
830
831 /// check if a range spans a shard
832 bool spans_shard(uint32_t offset, uint32_t length) {
833 if (shards.empty()) {
834 return false;
835 }
836 int s = seek_shard(offset);
837 assert(s >= 0);
838 if (s == (int)shards.size() - 1) {
839 return false; // last shard
840 }
841 if (offset + length <= shards[s+1].shard_info->offset) {
842 return false;
843 }
844 return true;
845 }
846
847 /// ensure that a range of the map is loaded
848 void fault_range(KeyValueDB *db,
849 uint32_t offset, uint32_t length);
850
851 /// ensure a range of the map is marked dirty
852 void dirty_range(uint32_t offset, uint32_t length);
853
854 /// for seek_lextent test
855 extent_map_t::iterator find(uint64_t offset);
856
857 /// seek to the first lextent including or after offset
858 extent_map_t::iterator seek_lextent(uint64_t offset);
859 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
860
861 /// add a new Extent
862 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
863 extent_map.insert(*new Extent(lo, o, l, b));
864 }
865
866 /// remove (and delete) an Extent
867 void rm(extent_map_t::iterator p) {
868 extent_map.erase_and_dispose(p, DeleteDisposer());
869 }
870
871 bool has_any_lextents(uint64_t offset, uint64_t length);
872
873 /// consolidate adjacent lextents in extent_map
874 int compress_extent_map(uint64_t offset, uint64_t length);
875
876 /// punch a logical hole. add lextents to deref to target list.
877 void punch_hole(CollectionRef &c,
878 uint64_t offset, uint64_t length,
879 old_extent_map_t *old_extents);
880
881 /// put new lextent into lextent_map overwriting existing ones if
882 /// any and update references accordingly
883 Extent *set_lextent(CollectionRef &c,
884 uint64_t logical_offset,
885 uint64_t offset, uint64_t length,
886 BlobRef b,
887 old_extent_map_t *old_extents);
888
889 /// split a blob (and referring extents)
890 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
891 };
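  // Illustrative sketch of the intended call sequence on a write (heavily
  // simplified; 'c', 'db', 'txn', 'b', 'offset' and 'length' are assumed):
  //
  //   em.fault_range(db, offset, length);            // load affected shards
  //   old_extent_map_t old;
  //   em.set_lextent(c, offset, 0, length, b, &old); // overwrite + deref into 'old'
  //   em.dirty_range(offset, length);                // mark shards for re-encode
  //   em.update(txn, false);                         // re-encode / reshard if needed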
892
893 /// Compressed Blob Garbage collector
894 /*
895 The primary idea of the collector is to estimate the difference between the
896 allocation units (AUs) currently occupied by compressed blobs and the new AUs
897 required to store the same data uncompressed.
898 Estimation is performed for protrusive extents within a logical range
899 determined by the concatenation of the old_extents collection and the
900 specific (current) write request.
901 The root cause for using old_extents is the need to handle blob ref counts
902 properly: old extents still hold blob refs, and hence we need to traverse
903 the collection to determine whether a blob is to be released.
904 Protrusive extents are extents that fall into the blob set in action
905 (ones that are below the logical range from above) but are not removed
906 entirely by the current write.
907 E.g. for
908 extent1 <loffs = 100, boffs = 100, len = 100> ->
909 blob1<compressed, len_on_disk=4096, logical_len=8192>
910 extent2 <loffs = 200, boffs = 200, len = 100> ->
911 blob2<raw, len_on_disk=4096, llen=4096>
912 extent3 <loffs = 300, boffs = 300, len = 100> ->
913 blob1<compressed, len_on_disk=4096, llen=8192>
914 extent4 <loffs = 4096, boffs = 0, len = 100> ->
915 blob3<raw, len_on_disk=4096, llen=4096>
916 write(300~100)
917 the protrusive extents lie within the following ranges: <0~300, 400~8192-400>.
918 In this case the existing AUs that might be removed due to GC (i.e. blob1)
919 use 2x4K bytes,
920 and the number of new AUs expected after GC is 0, since extent1 is expected
921 to be merged into blob2. Hence we should collect.
922 */
923 class GarbageCollector
924 {
925 public:
926 /// return amount of allocation units that might be saved due to GC
927 int64_t estimate(
928 uint64_t offset,
929 uint64_t length,
930 const ExtentMap& extent_map,
931 const old_extent_map_t& old_extents,
932 uint64_t min_alloc_size);
933
934 /// return a collection of extents to perform GC on
935 const vector<AllocExtent>& get_extents_to_collect() const {
936 return extents_to_collect;
937 }
938 GarbageCollector(CephContext* _cct) : cct(_cct) {}
939
940 private:
941 struct BlobInfo {
942 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
943 int64_t expected_allocations = 0; ///< new alloc units required
944 ///< in case of gc fulfilled
945 bool collect_candidate = false; ///< indicate if blob has any extents
946 ///< eligible for GC.
947 extent_map_t::const_iterator first_lextent; ///< points to the first
948 ///< lextent referring to
949 ///< the blob if any.
950 ///< collect_candidate flag
951 ///< determines the validity
952 extent_map_t::const_iterator last_lextent; ///< points to the last
953 ///< lextent referring to
954 ///< the blob if any.
955
956 BlobInfo(uint64_t ref_bytes) :
957 referenced_bytes(ref_bytes) {
958 }
959 };
960 CephContext* cct;
961 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
962 ///< copies that are affected by the
963 ///< specific write
964
965 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
966 ///< be collected if GC takes place
967
968 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
969 ///< unit when traversing
970 ///< protrusive extents.
971 ///< Other extents mapped to
972 ///< this AU to be ignored
973 ///< (except the case where
974 ///< uncompressed extent follows
975 ///< compressed one - see below).
976 BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation unit
977 ///< caused an expected_allocations
978 ///< counter increment at this blob;
979 ///< if an uncompressed extent follows,
980 ///< a decrement of the
981 ///< expected_allocations counter
982 ///< is needed
983 int64_t expected_allocations = 0; ///< new alloc units required in case
984 ///< of gc fulfilled
985 int64_t expected_for_release = 0; ///< alloc units currently used by
986 ///< compressed blobs that might be
987 ///< released after GC
988 uint64_t gc_start_offset; ///< starting offset for GC
989 uint64_t gc_end_offset; ///< ending offset for GC
990
991 protected:
992 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
993 uint64_t start_offset,
994 uint64_t end_offset,
995 uint64_t start_touch_offset,
996 uint64_t end_touch_offset,
997 uint64_t min_alloc_size);
998 };
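  // Illustrative sketch of how the estimator above is meant to be consulted
  // on the write path ('em', 'old', 'offset', 'length' and 'min_alloc_size'
  // are assumed):
  //
  //   GarbageCollector gc(cct);
  //   int64_t saved = gc.estimate(offset, length, em, old, min_alloc_size);
  //   if (saved > 0) {
  //     for (auto& e : gc.get_extents_to_collect()) {
  //       // rewrite [e.offset, e.offset + e.length) uncompressed
  //     }
  //   }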
999
1000 struct OnodeSpace;
1001
1002 /// an in-memory object
1003 struct Onode {
1004 MEMPOOL_CLASS_HELPERS();
1005
1006 std::atomic_int nref; ///< reference count
1007 Collection *c;
1008
1009 ghobject_t oid;
1010
1011 /// key under PREFIX_OBJ where we are stored
1012 mempool::bluestore_cache_other::string key;
1013
1014 boost::intrusive::list_member_hook<> lru_item;
1015
1016 bluestore_onode_t onode; ///< metadata stored as value in kv store
1017 bool exists; ///< true if object logically exists
1018
1019 ExtentMap extent_map;
1020
1021 // track txc's that have not been committed to kv store (and whose
1022 // effects cannot be read via the kvdb read methods)
1023 std::atomic<int> flushing_count = {0};
1024 std::mutex flush_lock; ///< protect flush_txns
1025 std::condition_variable flush_cond; ///< wait here for uncommitted txns
1026
1027 Onode(Collection *c, const ghobject_t& o,
1028 const mempool::bluestore_cache_other::string& k)
1029 : nref(0),
1030 c(c),
1031 oid(o),
1032 key(k),
1033 exists(false),
1034 extent_map(this) {
1035 }
1036
1037 void flush();
1038 void get() {
1039 ++nref;
1040 }
1041 void put() {
1042 if (--nref == 0)
1043 delete this;
1044 }
1045 };
1046 typedef boost::intrusive_ptr<Onode> OnodeRef;
1047
1048
1049 /// a cache (shard) of onodes and buffers
1050 struct Cache {
1051 CephContext* cct;
1052 PerfCounters *logger;
1053 std::recursive_mutex lock; ///< protect lru and other structures
1054
1055 std::atomic<uint64_t> num_extents = {0};
1056 std::atomic<uint64_t> num_blobs = {0};
1057
1058 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1059
1060 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1061 virtual ~Cache() {}
1062
1063 virtual void _add_onode(OnodeRef& o, int level) = 0;
1064 virtual void _rm_onode(OnodeRef& o) = 0;
1065 virtual void _touch_onode(OnodeRef& o) = 0;
1066
1067 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1068 virtual void _rm_buffer(Buffer *b) = 0;
1069 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1070 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1071 virtual void _touch_buffer(Buffer *b) = 0;
1072
1073 virtual uint64_t _get_num_onodes() = 0;
1074 virtual uint64_t _get_buffer_bytes() = 0;
1075
1076 void add_extent() {
1077 ++num_extents;
1078 }
1079 void rm_extent() {
1080 --num_extents;
1081 }
1082
1083 void add_blob() {
1084 ++num_blobs;
1085 }
1086 void rm_blob() {
1087 --num_blobs;
1088 }
1089
1090 void trim(uint64_t target_bytes,
1091 float target_meta_ratio,
1092 float target_data_ratio,
1093 float bytes_per_onode);
1094
1095 void trim_all();
1096
1097 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1098
1099 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1100 uint64_t *blobs,
1101 uint64_t *buffers,
1102 uint64_t *bytes) = 0;
1103
1104 bool empty() {
1105 std::lock_guard<std::recursive_mutex> l(lock);
1106 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1107 }
1108
1109 #ifdef DEBUG_CACHE
1110 virtual void _audit(const char *s) = 0;
1111 #else
1112 void _audit(const char *s) { /* no-op */ }
1113 #endif
1114 };
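  // Illustrative sketch: the store keeps one Cache per shard (see
  // cache_shards below) and aggregates statistics by letting every shard add
  // into the same counters ('shards' is assumed):
  //
  //   uint64_t onodes = 0, extents = 0, blobs = 0, buffers = 0, bytes = 0;
  //   for (Cache *c : shards)
  //     c->add_stats(&onodes, &extents, &blobs, &buffers, &bytes);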
1115
1116 /// simple LRU cache for onodes and buffers
1117 struct LRUCache : public Cache {
1118 private:
1119 typedef boost::intrusive::list<
1120 Onode,
1121 boost::intrusive::member_hook<
1122 Onode,
1123 boost::intrusive::list_member_hook<>,
1124 &Onode::lru_item> > onode_lru_list_t;
1125 typedef boost::intrusive::list<
1126 Buffer,
1127 boost::intrusive::member_hook<
1128 Buffer,
1129 boost::intrusive::list_member_hook<>,
1130 &Buffer::lru_item> > buffer_lru_list_t;
1131
1132 onode_lru_list_t onode_lru;
1133
1134 buffer_lru_list_t buffer_lru;
1135 uint64_t buffer_size = 0;
1136
1137 public:
1138 LRUCache(CephContext* cct) : Cache(cct) {}
1139 uint64_t _get_num_onodes() override {
1140 return onode_lru.size();
1141 }
1142 void _add_onode(OnodeRef& o, int level) override {
1143 if (level > 0)
1144 onode_lru.push_front(*o);
1145 else
1146 onode_lru.push_back(*o);
1147 }
1148 void _rm_onode(OnodeRef& o) override {
1149 auto q = onode_lru.iterator_to(*o);
1150 onode_lru.erase(q);
1151 }
1152 void _touch_onode(OnodeRef& o) override;
1153
1154 uint64_t _get_buffer_bytes() override {
1155 return buffer_size;
1156 }
1157 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1158 if (near) {
1159 auto q = buffer_lru.iterator_to(*near);
1160 buffer_lru.insert(q, *b);
1161 } else if (level > 0) {
1162 buffer_lru.push_front(*b);
1163 } else {
1164 buffer_lru.push_back(*b);
1165 }
1166 buffer_size += b->length;
1167 }
1168 void _rm_buffer(Buffer *b) override {
1169 assert(buffer_size >= b->length);
1170 buffer_size -= b->length;
1171 auto q = buffer_lru.iterator_to(*b);
1172 buffer_lru.erase(q);
1173 }
1174 void _move_buffer(Cache *src, Buffer *b) override {
1175 src->_rm_buffer(b);
1176 _add_buffer(b, 0, nullptr);
1177 }
1178 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1179 assert((int64_t)buffer_size + delta >= 0);
1180 buffer_size += delta;
1181 }
1182 void _touch_buffer(Buffer *b) override {
1183 auto p = buffer_lru.iterator_to(*b);
1184 buffer_lru.erase(p);
1185 buffer_lru.push_front(*b);
1186 _audit("_touch_buffer end");
1187 }
1188
1189 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1190
1191 void add_stats(uint64_t *onodes, uint64_t *extents,
1192 uint64_t *blobs,
1193 uint64_t *buffers,
1194 uint64_t *bytes) override {
1195 std::lock_guard<std::recursive_mutex> l(lock);
1196 *onodes += onode_lru.size();
1197 *extents += num_extents;
1198 *blobs += num_blobs;
1199 *buffers += buffer_lru.size();
1200 *bytes += buffer_size;
1201 }
1202
1203 #ifdef DEBUG_CACHE
1204 void _audit(const char *s) override;
1205 #endif
1206 };
1207
1208 // 2Q cache for buffers, LRU for onodes
1209 struct TwoQCache : public Cache {
1210 private:
1211 // stick with LRU for onodes for now (fixme?)
1212 typedef boost::intrusive::list<
1213 Onode,
1214 boost::intrusive::member_hook<
1215 Onode,
1216 boost::intrusive::list_member_hook<>,
1217 &Onode::lru_item> > onode_lru_list_t;
1218 typedef boost::intrusive::list<
1219 Buffer,
1220 boost::intrusive::member_hook<
1221 Buffer,
1222 boost::intrusive::list_member_hook<>,
1223 &Buffer::lru_item> > buffer_list_t;
1224
1225 onode_lru_list_t onode_lru;
1226
1227 buffer_list_t buffer_hot; ///< "Am" hot buffers
1228 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1229 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1230 uint64_t buffer_bytes = 0; ///< bytes
1231
1232 enum {
1233 BUFFER_NEW = 0,
1234 BUFFER_WARM_IN, ///< in buffer_warm_in
1235 BUFFER_WARM_OUT, ///< in buffer_warm_out
1236 BUFFER_HOT, ///< in buffer_hot
1237 BUFFER_TYPE_MAX
1238 };
1239
1240 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1241
1242 public:
1243 TwoQCache(CephContext* cct) : Cache(cct) {}
1244 uint64_t _get_num_onodes() override {
1245 return onode_lru.size();
1246 }
1247 void _add_onode(OnodeRef& o, int level) override {
1248 if (level > 0)
1249 onode_lru.push_front(*o);
1250 else
1251 onode_lru.push_back(*o);
1252 }
1253 void _rm_onode(OnodeRef& o) override {
1254 auto q = onode_lru.iterator_to(*o);
1255 onode_lru.erase(q);
1256 }
1257 void _touch_onode(OnodeRef& o) override;
1258
1259 uint64_t _get_buffer_bytes() override {
1260 return buffer_bytes;
1261 }
1262 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1263 void _rm_buffer(Buffer *b) override;
1264 void _move_buffer(Cache *src, Buffer *b) override;
1265 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1266 void _touch_buffer(Buffer *b) override {
1267 switch (b->cache_private) {
1268 case BUFFER_WARM_IN:
1269 // do nothing (somewhat counter-intuitively!)
1270 break;
1271 case BUFFER_WARM_OUT:
1272 // move from warm_out to hot LRU
1273 assert(0 == "this happens via discard hint");
1274 break;
1275 case BUFFER_HOT:
1276 // move to front of hot LRU
1277 buffer_hot.erase(buffer_hot.iterator_to(*b));
1278 buffer_hot.push_front(*b);
1279 break;
1280 }
1281 _audit("_touch_buffer end");
1282 }
1283
1284 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1285
1286 void add_stats(uint64_t *onodes, uint64_t *extents,
1287 uint64_t *blobs,
1288 uint64_t *buffers,
1289 uint64_t *bytes) override {
1290 std::lock_guard<std::recursive_mutex> l(lock);
1291 *onodes += onode_lru.size();
1292 *extents += num_extents;
1293 *blobs += num_blobs;
1294 *buffers += buffer_hot.size() + buffer_warm_in.size();
1295 *bytes += buffer_bytes;
1296 }
1297
1298 #ifdef DEBUG_CACHE
1299 void _audit(const char *s) override;
1300 #endif
1301 };
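  // Reading the lists above as a classic 2Q (a sketch; the promotion details
  // live in the .cc): a buffer is admitted to buffer_warm_in on first use,
  // aged into buffer_warm_out with its data dropped, and only promoted to
  // buffer_hot if touched again while its ghost entry survives.
  //
  //   _touch_buffer(b in BUFFER_HOT)      -> moved to front of buffer_hot
  //   _touch_buffer(b in BUFFER_WARM_IN)  -> left in place (by design)
  //   _touch_buffer(b in BUFFER_WARM_OUT) -> not reached; re-admission is
  //                                          handled on the write/discard path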
1302
1303 struct OnodeSpace {
1304 private:
1305 Cache *cache;
1306
1307 /// forward lookups
1308 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1309
1310 friend class Collection; // for split_cache()
1311
1312 public:
1313 OnodeSpace(Cache *c) : cache(c) {}
1314 ~OnodeSpace() {
1315 clear();
1316 }
1317
1318 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1319 OnodeRef lookup(const ghobject_t& o);
1320 void remove(const ghobject_t& oid) {
1321 onode_map.erase(oid);
1322 }
1323 void rename(OnodeRef& o, const ghobject_t& old_oid,
1324 const ghobject_t& new_oid,
1325 const mempool::bluestore_cache_other::string& new_okey);
1326 void clear();
1327 bool empty();
1328
1329 void dump(CephContext *cct, int lvl);
1330
1331 /// return true if f true for any item
1332 bool map_any(std::function<bool(OnodeRef)> f);
1333 };
1334
1335 struct Collection : public CollectionImpl {
1336 BlueStore *store;
1337 Cache *cache; ///< our cache shard
1338 coll_t cid;
1339 bluestore_cnode_t cnode;
1340 RWLock lock;
1341
1342 bool exists;
1343
1344 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1345
1346 // cache onodes on a per-collection basis to avoid lock
1347 // contention.
1348 OnodeSpace onode_map;
1349
1350 //pool options
1351 pool_opts_t pool_opts;
1352
1353 OnodeRef get_onode(const ghobject_t& oid, bool create);
1354
1355 // the terminology is confusing here, sorry!
1356 //
1357 // blob_t shared_blob_t
1358 // !shared unused -> open
1359 // shared !loaded -> open + shared
1360 // shared loaded -> open + shared + loaded
1361 //
1362 // i.e.,
1363 // open = SharedBlob is instantiated
1364 // shared = blob_t shared flag is set; SharedBlob is hashed.
1365 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1366 void open_shared_blob(uint64_t sbid, BlobRef b);
1367 void load_shared_blob(SharedBlobRef sb);
1368 void make_blob_shared(uint64_t sbid, BlobRef b);
1369 uint64_t make_blob_unshared(SharedBlob *sb);
1370
1371 BlobRef new_blob() {
1372 BlobRef b = new Blob();
1373 b->shared_blob = new SharedBlob(this);
1374 return b;
1375 }
1376
1377 const coll_t &get_cid() override {
1378 return cid;
1379 }
1380
1381 bool contains(const ghobject_t& oid) {
1382 if (cid.is_meta())
1383 return oid.hobj.pool == -1;
1384 spg_t spgid;
1385 if (cid.is_pg(&spgid))
1386 return
1387 spgid.pgid.contains(cnode.bits, oid) &&
1388 oid.shard_id == spgid.shard;
1389 return false;
1390 }
1391
1392 void split_cache(Collection *dest);
1393
1394 Collection(BlueStore *ns, Cache *ca, coll_t c);
1395 };
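  // Illustrative sketch mapping the state table above onto calls ('c' is an
  // assumed Collection*, 'sbid' an assumed shared blob id; error handling
  // omitted):
  //
  //   BlobRef b = c->new_blob();            // open: SharedBlob instantiated
  //   c->make_blob_shared(sbid, b);         // open + shared: flag set, hashed
  //   c->load_shared_blob(b->shared_blob);  // open + shared + loaded from kv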
1396
1397 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1398 CollectionRef c;
1399 OnodeRef o;
1400 KeyValueDB::Iterator it;
1401 string head, tail;
1402 public:
1403 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1404 int seek_to_first() override;
1405 int upper_bound(const string &after) override;
1406 int lower_bound(const string &to) override;
1407 bool valid() override;
1408 int next(bool validate=true) override;
1409 string key() override;
1410 bufferlist value() override;
1411 int status() override {
1412 return 0;
1413 }
1414 };
1415
1416 class OpSequencer;
1417 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1418
1419 struct volatile_statfs{
1420 enum {
1421 STATFS_ALLOCATED = 0,
1422 STATFS_STORED,
1423 STATFS_COMPRESSED_ORIGINAL,
1424 STATFS_COMPRESSED,
1425 STATFS_COMPRESSED_ALLOCATED,
1426 STATFS_LAST
1427 };
1428 int64_t values[STATFS_LAST];
1429 volatile_statfs() {
1430 memset(this, 0, sizeof(volatile_statfs));
1431 }
1432 void reset() {
1433 *this = volatile_statfs();
1434 }
1435 volatile_statfs& operator+=(const volatile_statfs& other) {
1436 for (size_t i = 0; i < STATFS_LAST; ++i) {
1437 values[i] += other.values[i];
1438 }
1439 return *this;
1440 }
1441 int64_t& allocated() {
1442 return values[STATFS_ALLOCATED];
1443 }
1444 int64_t& stored() {
1445 return values[STATFS_STORED];
1446 }
1447 int64_t& compressed_original() {
1448 return values[STATFS_COMPRESSED_ORIGINAL];
1449 }
1450 int64_t& compressed() {
1451 return values[STATFS_COMPRESSED];
1452 }
1453 int64_t& compressed_allocated() {
1454 return values[STATFS_COMPRESSED_ALLOCATED];
1455 }
1456 bool is_empty() {
1457 return values[STATFS_ALLOCATED] == 0 &&
1458 values[STATFS_STORED] == 0 &&
1459 values[STATFS_COMPRESSED] == 0 &&
1460 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1461 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1462 }
1463 void decode(bufferlist::iterator& it) {
1464 for (size_t i = 0; i < STATFS_LAST; i++) {
1465 ::decode(values[i], it);
1466 }
1467 }
1468
1469 void encode(bufferlist& bl) {
1470 for (size_t i = 0; i < STATFS_LAST; i++) {
1471 ::encode(values[i], bl);
1472 }
1473 }
1474 };
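  // Illustrative sketch: per-transaction deltas accumulate via operator+= and
  // round-trip through encode/decode as five raw int64 counters ('txc_delta'
  // is assumed to be some TransContext's statfs_delta):
  //
  //   volatile_statfs total;
  //   total += txc_delta;
  //   bufferlist bl;
  //   total.encode(bl);
  //   volatile_statfs copy;
  //   auto it = bl.begin();
  //   copy.decode(it);
  //   assert(copy.allocated() == total.allocated());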
1475
1476 struct TransContext : public AioContext {
1477 MEMPOOL_CLASS_HELPERS();
1478
1479 typedef enum {
1480 STATE_PREPARE,
1481 STATE_AIO_WAIT,
1482 STATE_IO_DONE,
1483 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1484 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1485 STATE_KV_DONE,
1486 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1487 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1488 STATE_DEFERRED_DONE,
1489 STATE_FINISHING,
1490 STATE_DONE,
1491 } state_t;
1492
1493 state_t state = STATE_PREPARE;
1494
1495 const char *get_state_name() {
1496 switch (state) {
1497 case STATE_PREPARE: return "prepare";
1498 case STATE_AIO_WAIT: return "aio_wait";
1499 case STATE_IO_DONE: return "io_done";
1500 case STATE_KV_QUEUED: return "kv_queued";
1501 case STATE_KV_SUBMITTED: return "kv_submitted";
1502 case STATE_KV_DONE: return "kv_done";
1503 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1504 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1505 case STATE_DEFERRED_DONE: return "deferred_done";
1506 case STATE_FINISHING: return "finishing";
1507 case STATE_DONE: return "done";
1508 }
1509 return "???";
1510 }
1511
1512 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1513 const char *get_state_latency_name(int state) {
1514 switch (state) {
1515 case l_bluestore_state_prepare_lat: return "prepare";
1516 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1517 case l_bluestore_state_io_done_lat: return "io_done";
1518 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1519 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1520 case l_bluestore_state_kv_done_lat: return "kv_done";
1521 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1522 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1523 case l_bluestore_state_finishing_lat: return "finishing";
1524 case l_bluestore_state_done_lat: return "done";
1525 }
1526 return "???";
1527 }
1528 #endif
1529
1530 void log_state_latency(PerfCounters *logger, int state) {
1531 utime_t lat, now = ceph_clock_now();
1532 lat = now - last_stamp;
1533 logger->tinc(state, lat);
1534 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1535 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1536 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1537 OID_ELAPSED("", usecs, get_state_latency_name(state));
1538 }
1539 #endif
1540 last_stamp = now;
1541 }
1542
1543 OpSequencerRef osr;
1544 boost::intrusive::list_member_hook<> sequencer_item;
1545
1546 uint64_t bytes = 0, cost = 0;
1547
1548 set<OnodeRef> onodes; ///< these need to be updated/written
1549 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1550 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1551 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1552
1553 KeyValueDB::Transaction t; ///< then we will commit this
1554 Context *oncommit = nullptr; ///< signal on commit
1555 Context *onreadable = nullptr; ///< signal on readable
1556 Context *onreadable_sync = nullptr; ///< signal on readable
1557 list<Context*> oncommits; ///< more commit completions
1558 list<CollectionRef> removed_collections; ///< colls we removed
1559
1560 boost::intrusive::list_member_hook<> deferred_queue_item;
1561 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1562
1563 interval_set<uint64_t> allocated, released;
1564 volatile_statfs statfs_delta;
1565
1566 IOContext ioc;
1567 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1568
1569 uint64_t seq = 0;
1570 utime_t start;
1571 utime_t last_stamp;
1572
1573 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1574 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1575
1576 explicit TransContext(CephContext* cct, OpSequencer *o)
1577 : osr(o),
1578 ioc(cct, this),
1579 start(ceph_clock_now()) {
1580 last_stamp = start;
1581 }
1582 ~TransContext() {
1583 delete deferred_txn;
1584 }
1585
1586 void write_onode(OnodeRef &o) {
1587 onodes.insert(o);
1588 }
1589 void write_shared_blob(SharedBlobRef &sb) {
1590 shared_blobs.insert(sb);
1591 }
1592 void unshare_blob(SharedBlob *sb) {
1593 shared_blobs.erase(sb);
1594 }
1595
1596 /// note we logically modified object (when onode itself is unmodified)
1597 void note_modified_object(OnodeRef &o) {
1598 // onode itself isn't written, though
1599 modified_objects.insert(o);
1600 }
1601 void removed(OnodeRef& o) {
1602 onodes.erase(o);
1603 modified_objects.erase(o);
1604 }
1605
1606 void aio_finish(BlueStore *store) override {
1607 store->txc_aio_finish(this);
1608 }
1609 };
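  // Happy-path state progression, judging by the enum ordering above
  // (deferred writes take the DEFERRED_* detour; log_state_latency() is
  // charged at each hop):
  //
  //   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
  //     [-> DEFERRED_QUEUED -> DEFERRED_CLEANUP -> DEFERRED_DONE]
  //     -> FINISHING -> DONE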
1610
1611 typedef boost::intrusive::list<
1612 TransContext,
1613 boost::intrusive::member_hook<
1614 TransContext,
1615 boost::intrusive::list_member_hook<>,
1616 &TransContext::deferred_queue_item> > deferred_queue_t;
1617
1618 struct DeferredBatch : public AioContext {
1619 OpSequencer *osr;
1620 struct deferred_io {
1621 bufferlist bl; ///< data
1622 uint64_t seq; ///< deferred transaction seq
1623 };
1624 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1625 deferred_queue_t txcs; ///< txcs in this batch
1626 IOContext ioc; ///< our aios
1627 /// bytes of pending io for each deferred seq (may be 0)
1628 map<uint64_t,int> seq_bytes;
1629
1630 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1631 void _audit(CephContext *cct);
1632
1633 DeferredBatch(CephContext *cct, OpSequencer *osr)
1634 : osr(osr), ioc(cct, this) {}
1635
1636 /// prepare a write
1637 void prepare_write(CephContext *cct,
1638 uint64_t seq, uint64_t offset, uint64_t length,
1639 bufferlist::const_iterator& p);
1640
1641 void aio_finish(BlueStore *store) override {
1642 store->_deferred_aio_finish(osr);
1643 }
1644 };
1645
1646 class OpSequencer : public Sequencer_impl {
1647 public:
1648 std::mutex qlock;
1649 std::condition_variable qcond;
1650 typedef boost::intrusive::list<
1651 TransContext,
1652 boost::intrusive::member_hook<
1653 TransContext,
1654 boost::intrusive::list_member_hook<>,
1655 &TransContext::sequencer_item> > q_list_t;
1656 q_list_t q; ///< transactions
1657
1658 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1659
1660 DeferredBatch *deferred_running = nullptr;
1661 DeferredBatch *deferred_pending = nullptr;
1662
1663 Sequencer *parent;
1664 BlueStore *store;
1665
1666 uint64_t last_seq = 0;
1667
1668 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1669
1670 std::atomic_int kv_committing_serially = {0};
1671
1672 std::atomic_int kv_submitted_waiters = {0};
1673
1674 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1675 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1676
1677 OpSequencer(CephContext* cct, BlueStore *store)
1678 : Sequencer_impl(cct),
1679 parent(NULL), store(store) {
1680 store->register_osr(this);
1681 }
1682 ~OpSequencer() override {
1683 assert(q.empty());
1684 _unregister();
1685 }
1686
1687 void discard() override {
1688 // Note that we may have txc's in flight when the parent Sequencer
1689 // goes away. Reflect this with zombie==registered==true and let
1690 // _osr_drain_all clean up later.
1691 assert(!zombie);
1692 zombie = true;
1693 parent = nullptr;
1694 bool empty;
1695 {
1696 std::lock_guard<std::mutex> l(qlock);
1697 empty = q.empty();
1698 }
1699 if (empty) {
1700 _unregister();
1701 }
1702 }
1703
1704 void _unregister() {
1705 if (registered) {
1706 store->unregister_osr(this);
1707 registered = false;
1708 }
1709 }
1710
1711 void queue_new(TransContext *txc) {
1712 std::lock_guard<std::mutex> l(qlock);
1713 txc->seq = ++last_seq;
1714 q.push_back(*txc);
1715 }
1716
1717 void drain() {
1718 std::unique_lock<std::mutex> l(qlock);
1719 while (!q.empty())
1720 qcond.wait(l);
1721 }
1722
1723 void drain_preceding(TransContext *txc) {
1724 std::unique_lock<std::mutex> l(qlock);
1725 while (!q.empty() && &q.front() != txc)
1726 qcond.wait(l);
1727 }
1728
1729 bool _is_all_kv_submitted() {
1730 // caller must hold qlock
1731 if (q.empty()) {
1732 return true;
1733 }
1734 TransContext *txc = &q.back();
1735 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1736 return true;
1737 }
1738 return false;
1739 }
1740
1741 void flush() override {
1742 std::unique_lock<std::mutex> l(qlock);
1743 while (true) {
1744 // register as a waiter before the check, because the condition
1745 // may become true outside qlock, and we need to make
1746 // sure those threads see kv_submitted_waiters and signal qcond.
1747 ++kv_submitted_waiters;
1748 if (_is_all_kv_submitted()) {
1749 --kv_submitted_waiters; return;
1750 }
1751 qcond.wait(l);
1752 --kv_submitted_waiters;
1753 }
1754 }
1755
1756 bool flush_commit(Context *c) override {
1757 std::lock_guard<std::mutex> l(qlock);
1758 if (q.empty()) {
1759 return true;
1760 }
1761 TransContext *txc = &q.back();
1762 if (txc->state >= TransContext::STATE_KV_DONE) {
1763 return true;
1764 }
1765 txc->oncommits.push_back(c);
1766 return false;
1767 }
1768 };
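  // Illustrative sketch of the two wait primitives above ('osr' and 'ctx'
  // are assumed): flush() blocks until the newest txc has reached at least
  // STATE_KV_SUBMITTED; flush_commit() either reports completion immediately
  // or arranges a callback.
  //
  //   osr->flush();                  // all queued txcs submitted to kv
  //   if (!osr->flush_commit(ctx)) {
  //     // ctx will be completed once the tail txc reaches STATE_KV_DONE
  //   }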
1769
1770 typedef boost::intrusive::list<
1771 OpSequencer,
1772 boost::intrusive::member_hook<
1773 OpSequencer,
1774 boost::intrusive::list_member_hook<>,
1775 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1776
1777 struct KVSyncThread : public Thread {
1778 BlueStore *store;
1779 explicit KVSyncThread(BlueStore *s) : store(s) {}
1780 void *entry() override {
1781 store->_kv_sync_thread();
1782 return NULL;
1783 }
1784 };
1785 struct KVFinalizeThread : public Thread {
1786 BlueStore *store;
1787 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1788 void *entry() {
1789 store->_kv_finalize_thread();
1790 return NULL;
1791 }
1792 };
1793
1794 struct DBHistogram {
1795 struct value_dist {
1796 uint64_t count;
1797 uint32_t max_len;
1798 };
1799
1800 struct key_dist {
1801 uint64_t count;
1802 uint32_t max_len;
1803 map<int, struct value_dist> val_map; ///< slab id to count, max length of value and key
1804 };
1805
1806 map<string, map<int, struct key_dist> > key_hist;
1807 map<int, uint64_t> value_hist;
1808 int get_key_slab(size_t sz);
1809 string get_key_slab_to_range(int slab);
1810 int get_value_slab(size_t sz);
1811 string get_value_slab_to_range(int slab);
1812 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1813 const string &prefix, size_t key_size, size_t value_size);
1814 void dump(Formatter *f);
1815 };
1816
1817 // --------------------------------------------------------
1818 // members
1819 private:
1820 BlueFS *bluefs = nullptr;
1821 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1822 bool bluefs_single_shared_device = true;
1823 utime_t bluefs_last_balance;
1824
1825 KeyValueDB *db = nullptr;
1826 BlockDevice *bdev = nullptr;
1827 std::string freelist_type;
1828 FreelistManager *fm = nullptr;
1829 Allocator *alloc = nullptr;
1830 uuid_d fsid;
1831 int path_fd = -1; ///< open handle to $path
1832 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1833 bool mounted = false;
1834
1835 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1836 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1837
1838 vector<Cache*> cache_shards;
1839
1840 std::mutex osr_lock; ///< protect osr_set
1841 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1842
1843 std::atomic<uint64_t> nid_last = {0};
1844 std::atomic<uint64_t> nid_max = {0};
1845 std::atomic<uint64_t> blobid_last = {0};
1846 std::atomic<uint64_t> blobid_max = {0};
1847
1848 Throttle throttle_bytes; ///< submit to commit
1849 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1850
1851 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1852 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1853
1854 std::mutex deferred_lock;
1855 std::atomic<uint64_t> deferred_seq = {0};
1856 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1857 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1858 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1859 Finisher deferred_finisher;
1860
1861 int m_finisher_num = 1;
1862 vector<Finisher*> finishers;
1863
1864 KVSyncThread kv_sync_thread;
1865 std::mutex kv_lock;
1866 std::condition_variable kv_cond;
1867 bool _kv_only = false;
1868 bool kv_sync_started = false;
1869 bool kv_stop = false;
1870 bool kv_finalize_started = false;
1871 bool kv_finalize_stop = false;
1872 deque<TransContext*> kv_queue; ///< ready, already submitted
1873 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1874 deque<TransContext*> kv_committing; ///< currently syncing
1875 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1876 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1877
1878 KVFinalizeThread kv_finalize_thread;
1879 std::mutex kv_finalize_lock;
1880 std::condition_variable kv_finalize_cond;
1881 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1882 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1883
1884 PerfCounters *logger = nullptr;
1885
1886 std::mutex reap_lock;
1887 list<CollectionRef> removed_collections;
1888
1889 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1890 set<ghobject_t> debug_data_error_objects;
1891 set<ghobject_t> debug_mdata_error_objects;
1892
1893 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1894
1895 uint64_t block_size = 0; ///< block size of block device (power of 2)
1896 uint64_t block_mask = 0; ///< mask to get just the block offset
1897 size_t block_size_order = 0; ///< bits to shift to get block size
1898
1899 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1900 ///< bits for min_alloc_size
1901 uint8_t min_alloc_size_order = 0;
1902 static_assert(std::numeric_limits<uint8_t>::max() >
1903 std::numeric_limits<decltype(min_alloc_size)>::digits,
1904 "not enough bits for min_alloc_size");
1905
1906 ///< maximum allocation unit (power of 2)
1907 std::atomic<uint64_t> max_alloc_size = {0};
1908
1909 ///< op count threshold that forces submission of deferred writes
1910 std::atomic<int> deferred_batch_ops = {0};
1911
1912 ///< size threshold for forced deferred writes
1913 std::atomic<uint64_t> prefer_deferred_size = {0};
1914
1915 ///< approx cost per io, in bytes
1916 std::atomic<uint64_t> throttle_cost_per_io = {0};
1917
1918 std::atomic<Compressor::CompressionMode> comp_mode =
1919 {Compressor::COMP_NONE}; ///< compression mode
1920 CompressorRef compressor;
1921 std::atomic<uint64_t> comp_min_blob_size = {0};
1922 std::atomic<uint64_t> comp_max_blob_size = {0};
1923
1924 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1925
1926 uint64_t kv_ios = 0;
1927 uint64_t kv_throttle_costs = 0;
1928
1929 // cache trim control
1930 uint64_t cache_size = 0; ///< total cache size
1931 float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
1932 float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1933 float cache_data_ratio = 0; ///< cache ratio dedicated to object data
1934
1935 std::mutex vstatfs_lock;
1936 volatile_statfs vstatfs;
1937
1938 struct MempoolThread : public Thread {
1939 BlueStore *store;
1940 Cond cond;
1941 Mutex lock;
1942 bool stop = false;
1943 public:
1944 explicit MempoolThread(BlueStore *s)
1945 : store(s),
1946 lock("BlueStore::MempoolThread::lock") {}
1947 void *entry() override;
1948 void init() {
1949 assert(stop == false);
1950 create("bstore_mempool");
1951 }
1952 void shutdown() {
1953 lock.Lock();
1954 stop = true;
1955 cond.Signal();
1956 lock.Unlock();
1957 join();
1958 }
1959 } mempool_thread;
1960
1961 // --------------------------------------------------------
1962 // private methods
1963
1964 void _init_logger();
1965 void _shutdown_logger();
1966 int _reload_logger();
1967
1968 int _open_path();
1969 void _close_path();
1970 int _open_fsid(bool create);
1971 int _lock_fsid();
1972 int _read_fsid(uuid_d *f);
1973 int _write_fsid();
1974 void _close_fsid();
1975 void _set_alloc_sizes();
1976 void _set_blob_size();
1977
1978 int _open_bdev(bool create);
1979 void _close_bdev();
1980 int _open_db(bool create);
1981 void _close_db();
1982 int _open_fm(bool create);
1983 void _close_fm();
1984 int _open_alloc();
1985 void _close_alloc();
1986 int _open_collections(int *errors=0);
1987 void _close_collections();
1988
1989 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1990 bool create);
1991
1992 public:
1993 static int _write_bdev_label(CephContext* cct,
1994 string path, bluestore_bdev_label_t label);
1995 static int _read_bdev_label(CephContext* cct, string path,
1996 bluestore_bdev_label_t *label);
1997 private:
1998 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1999 bool create);
2000
2001 int _open_super_meta();
2002
2003 void _open_statfs();
2004
2005 int _reconcile_bluefs_freespace();
2006 int _balance_bluefs_freespace(PExtentVector *extents);
2007 void _commit_bluefs_freespace(const PExtentVector& extents);
2008
2009 CollectionRef _get_collection(const coll_t& cid);
2010 void _queue_reap_collection(CollectionRef& c);
2011 void _reap_collections();
2012 void _update_cache_logger();
2013
2014 void _assign_nid(TransContext *txc, OnodeRef o);
2015 uint64_t _assign_blobid(TransContext *txc);
2016
2017 void _dump_onode(OnodeRef o, int log_level=30);
2018 void _dump_extent_map(ExtentMap& em, int log_level=30);
2019 void _dump_transaction(Transaction *t, int log_level = 30);
2020
2021 TransContext *_txc_create(OpSequencer *osr);
2022 void _txc_update_store_statfs(TransContext *txc);
2023 void _txc_add_transaction(TransContext *txc, Transaction *t);
2024 void _txc_calc_cost(TransContext *txc);
2025 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2026 void _txc_state_proc(TransContext *txc);
2027 void _txc_aio_submit(TransContext *txc);
2028 public:
2029 void txc_aio_finish(void *p) {
2030 _txc_state_proc(static_cast<TransContext*>(p));
2031 }
2032 private:
2033 void _txc_finish_io(TransContext *txc);
2034 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2035 void _txc_applied_kv(TransContext *txc);
2036 void _txc_committed_kv(TransContext *txc);
2037 void _txc_finish(TransContext *txc);
2038 void _txc_release_alloc(TransContext *txc);
2039
2040 void _osr_drain_preceding(TransContext *txc);
2041 void _osr_drain_all();
2042 void _osr_unregister_all();
2043
2044 void _kv_start();
2045 void _kv_stop();
2046 void _kv_sync_thread();
2047 void _kv_finalize_thread();
2048
2049 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2050 void _deferred_queue(TransContext *txc);
2051 public:
2052 void deferred_try_submit();
2053 private:
2054 void _deferred_submit_unlock(OpSequencer *osr);
2055 void _deferred_aio_finish(OpSequencer *osr);
2056 int _deferred_replay();
2057
2058 public:
2059 using mempool_dynamic_bitset =
2060 boost::dynamic_bitset<uint64_t,
2061 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2062
2063 private:
2064 int _fsck_check_extents(
2065 const ghobject_t& oid,
2066 const PExtentVector& extents,
2067 bool compressed,
2068 mempool_dynamic_bitset &used_blocks,
2069 store_statfs_t& expected_statfs);
2070
2071 void _buffer_cache_write(
2072 TransContext *txc,
2073 BlobRef b,
2074 uint64_t offset,
2075 bufferlist& bl,
2076 unsigned flags) {
2077 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2078 flags);
2079 txc->shared_blobs_written.insert(b->shared_blob);
2080 }
2081
2082 int _collection_list(
2083 Collection *c, const ghobject_t& start, const ghobject_t& end,
2084 int max, vector<ghobject_t> *ls, ghobject_t *next);
2085
2086 template <typename T, typename F>
2087 T select_option(const std::string& opt_name, T val1, F f) {
2088 // NB: opt_name reserved for future use
2089 boost::optional<T> val2 = f();
2090 if (val2) {
2091 return *val2;
2092 }
2093 return val1;
2094 }
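// Illustrative usage sketch for select_option() above (the calling code is an
// assumption for clarity, not part of this header): a per-pool override, when
// present, wins over the store-wide default.
//
//   uint64_t max_bsize = select_option(
//     "compression_max_blob_size",
//     comp_max_blob_size.load(),
//     [&]() -> boost::optional<uint64_t> {
//       // return a per-pool value here, or boost::none to fall back to val1
//       return boost::none;
//     });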
2095
2096 void _apply_padding(uint64_t head_pad,
2097 uint64_t tail_pad,
2098 bufferlist& padded);
2099
2100 // --- ondisk version ---
2101 public:
2102 const int32_t latest_ondisk_format = 2; ///< our version
2103 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2104 const int32_t min_compat_ondisk_format = 2; ///< who can read us
2105
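// Rough sketch (an assumption, not a copy of the real logic) of the
// compatibility check the constants above imply; the authoritative code lives
// in _open_super_meta()/_upgrade_super():
//
//   // compat = min_compat_ondisk_format recorded in the superblock
//   // ondisk = ondisk_format recorded in the superblock
//   // if (compat > latest_ondisk_format)        -> too new for us, refuse to mount
//   // if (ondisk < min_readable_ondisk_format)  -> too old for us, refuse to mount
//   // if (ondisk < latest_ondisk_format)        -> _upgrade_super() rewrites the
//   //                                              superblock to the latest format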
2106 private:
2107 int32_t ondisk_format = 0; ///< value detected on mount
2108
2109 int _upgrade_super(); ///< upgrade (called during open_super)
2110 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2111
2112 // --- public interface ---
2113 public:
2114 BlueStore(CephContext *cct, const string& path);
2115 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for unit tests only
2116 ~BlueStore() override;
2117
2118 string get_type() override {
2119 return "bluestore";
2120 }
2121
2122 bool needs_journal() override { return false; }
2123 bool wants_journal() override { return false; }
2124 bool allows_journal() override { return false; }
2125
2126 bool is_rotational() override;
2127 bool is_journal_rotational() override;
2128
2129 string get_default_device_class() override {
2130 string device_class;
2131 map<string, string> metadata;
2132 collect_metadata(&metadata);
2133 auto it = metadata.find("bluestore_bdev_type");
2134 if (it != metadata.end()) {
2135 device_class = it->second;
2136 }
2137 return device_class;
2138 }
2139
2140 static int get_block_device_fsid(CephContext* cct, const string& path,
2141 uuid_d *fsid);
2142
2143 bool test_mount_in_use() override;
2144
2145 private:
2146 int _mount(bool kv_only);
2147 public:
2148 int mount() override {
2149 return _mount(false);
2150 }
2151 int umount() override;
2152
2153 int start_kv_only(KeyValueDB **pdb) {
2154 int r = _mount(true);
2155 if (r < 0)
2156 return r;
2157 *pdb = db;
2158 return 0;
2159 }
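// Illustrative usage sketch for start_kv_only() (hypothetical offline
// inspection tool; not part of the original header):
//
//   BlueStore store(cct, path);
//   KeyValueDB *kv = nullptr;
//   if (store.start_kv_only(&kv) == 0) {
//     // walk the raw key/value store without bringing up the full
//     // ObjectStore machinery, then tear it back down
//     store.umount();
//   }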
2160
2161 int write_meta(const std::string& key, const std::string& value) override;
2162 int read_meta(const std::string& key, std::string *value) override;
2163
2164
2165 int fsck(bool deep) override {
2166 return _fsck(deep, false);
2167 }
2168 int repair(bool deep) override {
2169 return _fsck(deep, true);
2170 }
2171 int _fsck(bool deep, bool repair);
2172
2173 void set_cache_shards(unsigned num) override;
2174
2175 int validate_hobject_key(const hobject_t &obj) const override {
2176 return 0;
2177 }
2178 unsigned get_max_attr_name_length() override {
2179 return 256; // arbitrary; there is no real limit internally
2180 }
2181
2182 int mkfs() override;
2183 int mkjournal() override {
2184 return 0;
2185 }
2186
2187 void get_db_statistics(Formatter *f) override;
2188 void generate_db_histogram(Formatter *f) override;
2189 void _flush_cache();
2190 void flush_cache() override;
2191 void dump_perf_counters(Formatter *f) override {
2192 f->open_object_section("perf_counters");
2193 logger->dump_formatted(f, false);
2194 f->close_section();
2195 }
2196
2197 void register_osr(OpSequencer *osr) {
2198 std::lock_guard<std::mutex> l(osr_lock);
2199 osr_set.insert(osr);
2200 }
2201 void unregister_osr(OpSequencer *osr) {
2202 std::lock_guard<std::mutex> l(osr_lock);
2203 osr_set.erase(osr);
2204 }
2205
2206 public:
2207 int statfs(struct store_statfs_t *buf) override;
2208
2209 void collect_metadata(map<string,string> *pm) override;
2210
2211 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2212 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2213 int set_collection_opts(
2214 const coll_t& cid,
2215 const pool_opts_t& opts) override;
2216 int stat(
2217 const coll_t& cid,
2218 const ghobject_t& oid,
2219 struct stat *st,
2220 bool allow_eio = false) override;
2221 int stat(
2222 CollectionHandle &c,
2223 const ghobject_t& oid,
2224 struct stat *st,
2225 bool allow_eio = false) override;
2226 int read(
2227 const coll_t& cid,
2228 const ghobject_t& oid,
2229 uint64_t offset,
2230 size_t len,
2231 bufferlist& bl,
2232 uint32_t op_flags = 0) override;
2233 int read(
2234 CollectionHandle &c,
2235 const ghobject_t& oid,
2236 uint64_t offset,
2237 size_t len,
2238 bufferlist& bl,
2239 uint32_t op_flags = 0) override;
2240 int _do_read(
2241 Collection *c,
2242 OnodeRef o,
2243 uint64_t offset,
2244 size_t len,
2245 bufferlist& bl,
2246 uint32_t op_flags = 0);
2247
2248 private:
2249 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2250 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2251 public:
2252 int fiemap(const coll_t& cid, const ghobject_t& oid,
2253 uint64_t offset, size_t len, bufferlist& bl) override;
2254 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2255 uint64_t offset, size_t len, bufferlist& bl) override;
2256 int fiemap(const coll_t& cid, const ghobject_t& oid,
2257 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2258 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2259 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2260
2261
2262 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2263 bufferptr& value) override;
2264 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2265 bufferptr& value) override;
2266
2267 int getattrs(const coll_t& cid, const ghobject_t& oid,
2268 map<string,bufferptr>& aset) override;
2269 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2270 map<string,bufferptr>& aset) override;
2271
2272 int list_collections(vector<coll_t>& ls) override;
2273
2274 CollectionHandle open_collection(const coll_t &c) override;
2275
2276 bool collection_exists(const coll_t& c) override;
2277 int collection_empty(const coll_t& c, bool *empty) override;
2278 int collection_bits(const coll_t& c) override;
2279
2280 int collection_list(const coll_t& cid,
2281 const ghobject_t& start,
2282 const ghobject_t& end,
2283 int max,
2284 vector<ghobject_t> *ls, ghobject_t *next) override;
2285 int collection_list(CollectionHandle &c,
2286 const ghobject_t& start,
2287 const ghobject_t& end,
2288 int max,
2289 vector<ghobject_t> *ls, ghobject_t *next) override;
2290
2291 int omap_get(
2292 const coll_t& cid, ///< [in] Collection containing oid
2293 const ghobject_t &oid, ///< [in] Object containing omap
2294 bufferlist *header, ///< [out] omap header
2295 map<string, bufferlist> *out ///< [out] Key to value map
2296 ) override;
2297 int omap_get(
2298 CollectionHandle &c, ///< [in] Collection containing oid
2299 const ghobject_t &oid, ///< [in] Object containing omap
2300 bufferlist *header, ///< [out] omap header
2301 map<string, bufferlist> *out ///< [out] Key to value map
2302 ) override;
2303
2304 /// Get omap header
2305 int omap_get_header(
2306 const coll_t& cid, ///< [in] Collection containing oid
2307 const ghobject_t &oid, ///< [in] Object containing omap
2308 bufferlist *header, ///< [out] omap header
2309 bool allow_eio = false ///< [in] don't assert on eio
2310 ) override;
2311 int omap_get_header(
2312 CollectionHandle &c, ///< [in] Collection containing oid
2313 const ghobject_t &oid, ///< [in] Object containing omap
2314 bufferlist *header, ///< [out] omap header
2315 bool allow_eio = false ///< [in] don't assert on eio
2316 ) override;
2317
2318 /// Get keys defined on oid
2319 int omap_get_keys(
2320 const coll_t& cid, ///< [in] Collection containing oid
2321 const ghobject_t &oid, ///< [in] Object containing omap
2322 set<string> *keys ///< [out] Keys defined on oid
2323 ) override;
2324 int omap_get_keys(
2325 CollectionHandle &c, ///< [in] Collection containing oid
2326 const ghobject_t &oid, ///< [in] Object containing omap
2327 set<string> *keys ///< [out] Keys defined on oid
2328 ) override;
2329
2330 /// Get key values
2331 int omap_get_values(
2332 const coll_t& cid, ///< [in] Collection containing oid
2333 const ghobject_t &oid, ///< [in] Object containing omap
2334 const set<string> &keys, ///< [in] Keys to get
2335 map<string, bufferlist> *out ///< [out] Returned keys and values
2336 ) override;
2337 int omap_get_values(
2338 CollectionHandle &c, ///< [in] Collection containing oid
2339 const ghobject_t &oid, ///< [in] Object containing omap
2340 const set<string> &keys, ///< [in] Keys to get
2341 map<string, bufferlist> *out ///< [out] Returned keys and values
2342 ) override;
2343
2344 /// Filters keys into out which are defined on oid
2345 int omap_check_keys(
2346 const coll_t& cid, ///< [in] Collection containing oid
2347 const ghobject_t &oid, ///< [in] Object containing omap
2348 const set<string> &keys, ///< [in] Keys to check
2349 set<string> *out ///< [out] Subset of keys defined on oid
2350 ) override;
2351 int omap_check_keys(
2352 CollectionHandle &c, ///< [in] Collection containing oid
2353 const ghobject_t &oid, ///< [in] Object containing omap
2354 const set<string> &keys, ///< [in] Keys to check
2355 set<string> *out ///< [out] Subset of keys defined on oid
2356 ) override;
2357
2358 ObjectMap::ObjectMapIterator get_omap_iterator(
2359 const coll_t& cid, ///< [in] collection
2360 const ghobject_t &oid ///< [in] object
2361 ) override;
2362 ObjectMap::ObjectMapIterator get_omap_iterator(
2363 CollectionHandle &c, ///< [in] collection
2364 const ghobject_t &oid ///< [in] object
2365 ) override;
2366
2367 void set_fsid(uuid_d u) override {
2368 fsid = u;
2369 }
2370 uuid_d get_fsid() override {
2371 return fsid;
2372 }
2373
2374 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2375 return num_objects * 300; // assuming per-object overhead is ~300 bytes
2376 }
2377
2378 struct BSPerfTracker {
2379 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2380 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2381
2382 objectstore_perf_stat_t get_cur_stats() const {
2383 objectstore_perf_stat_t ret;
2384 ret.os_commit_latency = os_commit_latency.current_avg();
2385 ret.os_apply_latency = os_apply_latency.current_avg();
2386 return ret;
2387 }
2388
2389 void update_from_perfcounters(PerfCounters &logger);
2390 } perf_tracker;
2391
2392 objectstore_perf_stat_t get_cur_stats() override {
2393 perf_tracker.update_from_perfcounters(*logger);
2394 return perf_tracker.get_cur_stats();
2395 }
2396 const PerfCounters* get_perf_counters() const override {
2397 return logger;
2398 }
2399
2400 int queue_transactions(
2401 Sequencer *osr,
2402 vector<Transaction>& tls,
2403 TrackedOpRef op = TrackedOpRef(),
2404 ThreadPool::TPHandle *handle = NULL) override;
2405
2406 // error injection
2407 void inject_data_error(const ghobject_t& o) override {
2408 RWLock::WLocker l(debug_read_error_lock);
2409 debug_data_error_objects.insert(o);
2410 }
2411 void inject_mdata_error(const ghobject_t& o) override {
2412 RWLock::WLocker l(debug_read_error_lock);
2413 debug_mdata_error_objects.insert(o);
2414 }
2415 void compact() override {
2416 assert(db);
2417 db->compact();
2418 }
2419
2420 private:
2421 bool _debug_data_eio(const ghobject_t& o) {
2422 if (!cct->_conf->bluestore_debug_inject_read_err) {
2423 return false;
2424 }
2425 RWLock::RLocker l(debug_read_error_lock);
2426 return debug_data_error_objects.count(o);
2427 }
2428 bool _debug_mdata_eio(const ghobject_t& o) {
2429 if (!cct->_conf->bluestore_debug_inject_read_err) {
2430 return false;
2431 }
2432 RWLock::RLocker l(debug_read_error_lock);
2433 return debug_mdata_error_objects.count(o);
2434 }
2435 void _debug_obj_on_delete(const ghobject_t& o) {
2436 if (cct->_conf->bluestore_debug_inject_read_err) {
2437 RWLock::WLocker l(debug_read_error_lock);
2438 debug_data_error_objects.erase(o);
2439 debug_mdata_error_objects.erase(o);
2440 }
2441 }
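// Illustrative sketch of how the error-injection hooks above are meant to be
// used by tests (assumes bluestore_debug_inject_read_err is enabled; the
// calling code is an assumption, not part of this header):
//
//   store.inject_data_error(oid);            // future data reads of oid
//   bufferlist bl;                           // are failed via
//   int r = store.read(ch, oid, 0, len, bl); // _debug_data_eio()
//   assert(r == -EIO);
//   // deleting the object clears any injected errors, see
//   // _debug_obj_on_delete()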
2442
2443 private:
2444
2445 // --------------------------------------------------------
2446 // read processing internal methods
2447 int _verify_csum(
2448 OnodeRef& o,
2449 const bluestore_blob_t* blob,
2450 uint64_t blob_xoffset,
2451 const bufferlist& bl,
2452 uint64_t logical_offset) const;
2453 int _decompress(bufferlist& source, bufferlist* result);
2454
2455
2456 // --------------------------------------------------------
2457 // write ops
2458
2459 struct WriteContext {
2460 bool buffered = false; ///< buffered write
2461 bool compress = false; ///< compressed write
2462 uint64_t target_blob_size = 0; ///< target (max) blob size
2463 unsigned csum_order = 0; ///< target checksum chunk order
2464
2465 old_extent_map_t old_extents; ///< must deref these blobs
2466
2467 struct write_item {
2468 uint64_t logical_offset; ///< write logical offset
2469 BlobRef b;
2470 uint64_t blob_length;
2471 uint64_t b_off;
2472 bufferlist bl;
2473 uint64_t b_off0; ///< original offset in a blob prior to padding
2474 uint64_t length0; ///< original data length prior to padding
2475
2476 bool mark_unused;
2477 bool new_blob; ///< whether new blob was created
2478
2479 bool compressed = false;
2480 bufferlist compressed_bl;
2481 size_t compressed_len = 0;
2482
2483 write_item(
2484 uint64_t logical_offs,
2485 BlobRef b,
2486 uint64_t blob_len,
2487 uint64_t o,
2488 bufferlist& bl,
2489 uint64_t o0,
2490 uint64_t l0,
2491 bool _mark_unused,
2492 bool _new_blob)
2493 :
2494 logical_offset(logical_offs),
2495 b(b),
2496 blob_length(blob_len),
2497 b_off(o),
2498 bl(bl),
2499 b_off0(o0),
2500 length0(l0),
2501 mark_unused(_mark_unused),
2502 new_blob(_new_blob) {}
2503 };
2504 vector<write_item> writes; ///< blobs we're writing
2505
2506 /// partial clone of the context
2507 void fork(const WriteContext& other) {
2508 buffered = other.buffered;
2509 compress = other.compress;
2510 target_blob_size = other.target_blob_size;
2511 csum_order = other.csum_order;
2512 }
2513 void write(
2514 uint64_t loffs,
2515 BlobRef b,
2516 uint64_t blob_len,
2517 uint64_t o,
2518 bufferlist& bl,
2519 uint64_t o0,
2520 uint64_t len0,
2521 bool _mark_unused,
2522 bool _new_blob) {
2523 writes.emplace_back(loffs,
2524 b,
2525 blob_len,
2526 o,
2527 bl,
2528 o0,
2529 len0,
2530 _mark_unused,
2531 _new_blob);
2532 }
2533 /// Checks for writes to the same pextent within a blob
2534 bool has_conflict(
2535 BlobRef b,
2536 uint64_t loffs,
2537 uint64_t loffs_end,
2538 uint64_t min_alloc_size);
2539 };
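// Illustrative sketch (an assumption, not part of the original header) of how
// the write path declared below is expected to drive a WriteContext:
//
//   WriteContext wctx;
//   _choose_write_options(c, o, fadvise_flags, &wctx); // buffered/compress/...
//   // _do_write_small()/_do_write_big() record each blob write:
//   //   wctx.write(offset, blob, blob_len, b_off, bl,
//   //              b_off0, len0, mark_unused, new_blob);
//   // a helper needing the same options but its own write list forks it:
//   //   WriteContext wctx_gc;
//   //   wctx_gc.fork(wctx);
//   _do_alloc_write(txc, c, o, &wctx);   // allocate space, queue the io
//   _wctx_finish(txc, c, o, &wctx);      // release old extents, update stats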
2540
2541 void _do_write_small(
2542 TransContext *txc,
2543 CollectionRef &c,
2544 OnodeRef o,
2545 uint64_t offset, uint64_t length,
2546 bufferlist::iterator& blp,
2547 WriteContext *wctx);
2548 void _do_write_big(
2549 TransContext *txc,
2550 CollectionRef &c,
2551 OnodeRef o,
2552 uint64_t offset, uint64_t length,
2553 bufferlist::iterator& blp,
2554 WriteContext *wctx);
2555 int _do_alloc_write(
2556 TransContext *txc,
2557 CollectionRef c,
2558 OnodeRef o,
2559 WriteContext *wctx);
2560 void _wctx_finish(
2561 TransContext *txc,
2562 CollectionRef& c,
2563 OnodeRef o,
2564 WriteContext *wctx,
2565 set<SharedBlob*> *maybe_unshared_blobs=0);
2566
2567 int _do_transaction(Transaction *t,
2568 TransContext *txc,
2569 ThreadPool::TPHandle *handle);
2570
2571 int _write(TransContext *txc,
2572 CollectionRef& c,
2573 OnodeRef& o,
2574 uint64_t offset, size_t len,
2575 bufferlist& bl,
2576 uint32_t fadvise_flags);
2577 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2578 uint64_t chunk_size);
2579
2580 void _choose_write_options(CollectionRef& c,
2581 OnodeRef o,
2582 uint32_t fadvise_flags,
2583 WriteContext *wctx);
2584
2585 int _do_gc(TransContext *txc,
2586 CollectionRef& c,
2587 OnodeRef o,
2588 const GarbageCollector& gc,
2589 const WriteContext& wctx,
2590 uint64_t *dirty_start,
2591 uint64_t *dirty_end);
2592
2593 int _do_write(TransContext *txc,
2594 CollectionRef &c,
2595 OnodeRef o,
2596 uint64_t offset, uint64_t length,
2597 bufferlist& bl,
2598 uint32_t fadvise_flags);
2599 void _do_write_data(TransContext *txc,
2600 CollectionRef& c,
2601 OnodeRef o,
2602 uint64_t offset,
2603 uint64_t length,
2604 bufferlist& bl,
2605 WriteContext *wctx);
2606
2607 int _touch(TransContext *txc,
2608 CollectionRef& c,
2609 OnodeRef& o);
2610 int _do_zero(TransContext *txc,
2611 CollectionRef& c,
2612 OnodeRef& o,
2613 uint64_t offset, size_t len);
2614 int _zero(TransContext *txc,
2615 CollectionRef& c,
2616 OnodeRef& o,
2617 uint64_t offset, size_t len);
2618 void _do_truncate(TransContext *txc,
2619 CollectionRef& c,
2620 OnodeRef o,
2621 uint64_t offset,
2622 set<SharedBlob*> *maybe_unshared_blobs=0);
2623 int _truncate(TransContext *txc,
2624 CollectionRef& c,
2625 OnodeRef& o,
2626 uint64_t offset);
2627 int _remove(TransContext *txc,
2628 CollectionRef& c,
2629 OnodeRef& o);
2630 int _do_remove(TransContext *txc,
2631 CollectionRef& c,
2632 OnodeRef o);
2633 int _setattr(TransContext *txc,
2634 CollectionRef& c,
2635 OnodeRef& o,
2636 const string& name,
2637 bufferptr& val);
2638 int _setattrs(TransContext *txc,
2639 CollectionRef& c,
2640 OnodeRef& o,
2641 const map<string,bufferptr>& aset);
2642 int _rmattr(TransContext *txc,
2643 CollectionRef& c,
2644 OnodeRef& o,
2645 const string& name);
2646 int _rmattrs(TransContext *txc,
2647 CollectionRef& c,
2648 OnodeRef& o);
2649 void _do_omap_clear(TransContext *txc, uint64_t id);
2650 int _omap_clear(TransContext *txc,
2651 CollectionRef& c,
2652 OnodeRef& o);
2653 int _omap_setkeys(TransContext *txc,
2654 CollectionRef& c,
2655 OnodeRef& o,
2656 bufferlist& bl);
2657 int _omap_setheader(TransContext *txc,
2658 CollectionRef& c,
2659 OnodeRef& o,
2660 bufferlist& header);
2661 int _omap_rmkeys(TransContext *txc,
2662 CollectionRef& c,
2663 OnodeRef& o,
2664 bufferlist& bl);
2665 int _omap_rmkey_range(TransContext *txc,
2666 CollectionRef& c,
2667 OnodeRef& o,
2668 const string& first, const string& last);
2669 int _set_alloc_hint(
2670 TransContext *txc,
2671 CollectionRef& c,
2672 OnodeRef& o,
2673 uint64_t expected_object_size,
2674 uint64_t expected_write_size,
2675 uint32_t flags);
2676 int _do_clone_range(TransContext *txc,
2677 CollectionRef& c,
2678 OnodeRef& oldo,
2679 OnodeRef& newo,
2680 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2681 int _clone(TransContext *txc,
2682 CollectionRef& c,
2683 OnodeRef& oldo,
2684 OnodeRef& newo);
2685 int _clone_range(TransContext *txc,
2686 CollectionRef& c,
2687 OnodeRef& oldo,
2688 OnodeRef& newo,
2689 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2690 int _rename(TransContext *txc,
2691 CollectionRef& c,
2692 OnodeRef& oldo,
2693 OnodeRef& newo,
2694 const ghobject_t& new_oid);
2695 int _create_collection(TransContext *txc, const coll_t &cid,
2696 unsigned bits, CollectionRef *c);
2697 int _remove_collection(TransContext *txc, const coll_t &cid,
2698 CollectionRef *c);
2699 int _split_collection(TransContext *txc,
2700 CollectionRef& c,
2701 CollectionRef& d,
2702 unsigned bits, int rem);
2703 };
2704
2705 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2706 return out << *s.parent;
2707 }
2708
2709 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2710 o->get();
2711 }
2712 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2713 o->put();
2714 }
2715
2716 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2717 o->get();
2718 }
2719 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2720 o->put();
2721 }
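// Illustrative note (the Ref typedefs are assumed to be boost::intrusive_ptr
// aliases; not part of the original header): boost::intrusive_ptr finds the
// free functions above via argument-dependent lookup, so copying a handle
// bumps the refcount automatically, e.g.
//
//   BlueStore::OnodeRef a = some_onode;  // assumed intrusive_ptr<Onode>
//   BlueStore::OnodeRef b = a;           // calls intrusive_ptr_add_ref()
//   b.reset();                           // calls intrusive_ptr_release()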
2722
2723 #endif