1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::optimize()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // denominator N, so the allowed slop ratio is 1/N
56
57
58 enum {
59 l_bluestore_first = 732430,
60 l_bluestore_kv_flush_lat,
61 l_bluestore_kv_commit_lat,
62 l_bluestore_kv_lat,
63 l_bluestore_state_prepare_lat,
64 l_bluestore_state_aio_wait_lat,
65 l_bluestore_state_io_done_lat,
66 l_bluestore_state_kv_queued_lat,
67 l_bluestore_state_kv_committing_lat,
68 l_bluestore_state_kv_done_lat,
69 l_bluestore_state_deferred_queued_lat,
70 l_bluestore_state_deferred_aio_wait_lat,
71 l_bluestore_state_deferred_cleanup_lat,
72 l_bluestore_state_finishing_lat,
73 l_bluestore_state_done_lat,
74 l_bluestore_throttle_lat,
75 l_bluestore_submit_lat,
76 l_bluestore_commit_lat,
77 l_bluestore_read_lat,
78 l_bluestore_read_onode_meta_lat,
79 l_bluestore_read_wait_aio_lat,
80 l_bluestore_compress_lat,
81 l_bluestore_decompress_lat,
82 l_bluestore_csum_lat,
83 l_bluestore_compress_success_count,
84 l_bluestore_compress_rejected_count,
85 l_bluestore_write_pad_bytes,
86 l_bluestore_deferred_write_ops,
87 l_bluestore_deferred_write_bytes,
88 l_bluestore_write_penalty_read_ops,
89 l_bluestore_allocated,
90 l_bluestore_stored,
91 l_bluestore_compressed,
92 l_bluestore_compressed_allocated,
93 l_bluestore_compressed_original,
94 l_bluestore_onodes,
95 l_bluestore_onode_hits,
96 l_bluestore_onode_misses,
97 l_bluestore_onode_shard_hits,
98 l_bluestore_onode_shard_misses,
99 l_bluestore_extents,
100 l_bluestore_blobs,
101 l_bluestore_buffers,
102 l_bluestore_buffer_bytes,
103 l_bluestore_buffer_hit_bytes,
104 l_bluestore_buffer_miss_bytes,
105 l_bluestore_write_big,
106 l_bluestore_write_big_bytes,
107 l_bluestore_write_big_blobs,
108 l_bluestore_write_small,
109 l_bluestore_write_small_bytes,
110 l_bluestore_write_small_unused,
111 l_bluestore_write_small_deferred,
112 l_bluestore_write_small_pre_read,
113 l_bluestore_write_small_new,
114 l_bluestore_txc,
115 l_bluestore_onode_reshard,
116 l_bluestore_blob_split,
117 l_bluestore_extent_compress,
118 l_bluestore_gc_merged,
119 l_bluestore_read_eio,
120 l_bluestore_last
121 };
122
123 class BlueStore : public ObjectStore,
124 public md_config_obs_t {
125 // -----------------------------------------------------
126 // types
127 public:
128 // config observer
129 const char** get_tracked_conf_keys() const override;
130 void handle_conf_change(const struct md_config_t *conf,
131 const std::set<std::string> &changed) override;
132
133 void _set_csum();
134 void _set_compression();
135 void _set_throttle_params();
136 int _set_cache_sizes();
137
138 class TransContext;
139
140 typedef map<uint64_t, bufferlist> ready_regions_t;
141
142 struct BufferSpace;
143 struct Collection;
144 typedef boost::intrusive_ptr<Collection> CollectionRef;
145
146 struct AioContext {
147 virtual void aio_finish(BlueStore *store) = 0;
148 virtual ~AioContext() {}
149 };
150
151 /// cached buffer
152 struct Buffer {
153 MEMPOOL_CLASS_HELPERS();
154
155 enum {
156 STATE_EMPTY, ///< empty buffer -- used for cache history
157 STATE_CLEAN, ///< clean data that is up to date
158 STATE_WRITING, ///< data that is being written (io not yet complete)
159 };
160 static const char *get_state_name(int s) {
161 switch (s) {
162 case STATE_EMPTY: return "empty";
163 case STATE_CLEAN: return "clean";
164 case STATE_WRITING: return "writing";
165 default: return "???";
166 }
167 }
168 enum {
169 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
170 // NOTE: fix operator<< when you define a second flag
171 };
172 static const char *get_flag_name(int s) {
173 switch (s) {
174 case FLAG_NOCACHE: return "nocache";
175 default: return "???";
176 }
177 }
178
179 BufferSpace *space;
180 uint16_t state; ///< STATE_*
181 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
182 uint32_t flags; ///< FLAG_*
183 uint64_t seq;
184 uint32_t offset, length;
185 bufferlist data;
186
187 boost::intrusive::list_member_hook<> lru_item;
188 boost::intrusive::list_member_hook<> state_item;
189
190 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
191 unsigned f = 0)
192 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
193 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
194 unsigned f = 0)
195 : space(space), state(s), flags(f), seq(q), offset(o),
196 length(b.length()), data(b) {}
197
198 bool is_empty() const {
199 return state == STATE_EMPTY;
200 }
201 bool is_clean() const {
202 return state == STATE_CLEAN;
203 }
204 bool is_writing() const {
205 return state == STATE_WRITING;
206 }
207
208 uint32_t end() const {
209 return offset + length;
210 }
211
212 void truncate(uint32_t newlen) {
213 assert(newlen < length);
214 if (data.length()) {
215 bufferlist t;
216 t.substr_of(data, 0, newlen);
217 data.claim(t);
218 }
219 length = newlen;
220 }
221 void maybe_rebuild() {
222 if (data.length() &&
223 (data.get_num_buffers() > 1 ||
224 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
225 data.rebuild();
226 }
227 }
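// Illustrative sketch (hypothetical sizes): maybe_rebuild() copies the
// bufferlist into one contiguous buffer when it is fragmented or carries too
// much slack, e.g.
//
//   bufferlist bl;
//   bl.append(part_a);                  // hypothetical 3000-byte chunk
//   bl.append(part_b);                  // hypothetical 1000-byte chunk
//   Buffer b(space, STATE_CLEAN, 0, 0, bl);
//   b.maybe_rebuild();                  // get_num_buffers() > 1 -> rebuild()
//
// A single 4096-byte allocation holding only 256 bytes would be rebuilt too,
// since wasted() exceeds length / MAX_BUFFER_SLOP_RATIO_DEN.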
228
229 void dump(Formatter *f) const {
230 f->dump_string("state", get_state_name(state));
231 f->dump_unsigned("seq", seq);
232 f->dump_unsigned("offset", offset);
233 f->dump_unsigned("length", length);
234 f->dump_unsigned("data_length", data.length());
235 }
236 };
237
238 struct Cache;
239
240 /// map logical extent range (object) onto buffers
241 struct BufferSpace {
242 typedef boost::intrusive::list<
243 Buffer,
244 boost::intrusive::member_hook<
245 Buffer,
246 boost::intrusive::list_member_hook<>,
247 &Buffer::state_item> > state_list_t;
248
249 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
250 buffer_map;
251
252 // we use a bare intrusive list here instead of std::map because
253 // it uses less memory and we expect this to be very small (very
254 // few IOs in flight to the same Blob at the same time).
255 state_list_t writing; ///< writing buffers, sorted by seq, ascending
256
257 ~BufferSpace() {
258 assert(buffer_map.empty());
259 assert(writing.empty());
260 }
261
262 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
263 cache->_audit("_add_buffer start");
264 buffer_map[b->offset].reset(b);
265 if (b->is_writing()) {
266 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
267 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
268 writing.push_back(*b);
269 } else {
270 auto it = writing.begin();
271 while (it->seq < b->seq) {
272 ++it;
273 }
274
275 assert(it->seq >= b->seq);
276 // note that this will insert b before it
277 // hence the order is maintained
278 writing.insert(it, *b);
279 }
280 } else {
281 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
282 cache->_add_buffer(b, level, near);
283 }
284 cache->_audit("_add_buffer end");
285 }
286 void _rm_buffer(Cache* cache, Buffer *b) {
287 _rm_buffer(cache, buffer_map.find(b->offset));
288 }
289 void _rm_buffer(Cache* cache,
290 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
291 assert(p != buffer_map.end());
292 cache->_audit("_rm_buffer start");
293 if (p->second->is_writing()) {
294 writing.erase(writing.iterator_to(*p->second));
295 } else {
296 cache->_rm_buffer(p->second.get());
297 }
298 buffer_map.erase(p);
299 cache->_audit("_rm_buffer end");
300 }
301
302 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
303 uint32_t offset) {
304 auto i = buffer_map.lower_bound(offset);
305 if (i != buffer_map.begin()) {
306 --i;
307 if (i->first + i->second->length <= offset)
308 ++i;
309 }
310 return i;
311 }
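// Worked example (hypothetical cache contents): with buffers at offsets
// 0x0000 (length 0x1000) and 0x2000 (length 0x1000) in buffer_map,
//
//   _data_lower_bound(0x0800);  // lower_bound() lands on 0x2000; stepping
//                               // back finds 0x0000, which still covers
//                               // 0x0800 -> returns the 0x0000 buffer
//   _data_lower_bound(0x1800);  // the stepped-back 0x0000 buffer ends at
//                               // 0x1000 <= 0x1800, so we advance again
//                               // -> returns the 0x2000 buffer
//
// i.e. the result is the first buffer that could overlap the given offset.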
312
313 // must be called under protection of the Cache lock
314 void _clear(Cache* cache);
315
316 // return value is the highest cache_private of a trimmed buffer, or 0.
317 int discard(Cache* cache, uint32_t offset, uint32_t length) {
318 std::lock_guard<std::recursive_mutex> l(cache->lock);
319 return _discard(cache, offset, length);
320 }
321 int _discard(Cache* cache, uint32_t offset, uint32_t length);
322
323 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
324 unsigned flags) {
325 std::lock_guard<std::recursive_mutex> l(cache->lock);
326 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
327 flags);
328 b->cache_private = _discard(cache, offset, bl.length());
329 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
330 }
331 void finish_write(Cache* cache, uint64_t seq);
332 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
333 std::lock_guard<std::recursive_mutex> l(cache->lock);
334 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
335 b->cache_private = _discard(cache, offset, bl.length());
336 _add_buffer(cache, b, 1, nullptr);
337 }
338
339 void read(Cache* cache, uint32_t offset, uint32_t length,
340 BlueStore::ready_regions_t& res,
341 interval_set<uint32_t>& res_intervals);
342
343 void truncate(Cache* cache, uint32_t offset) {
344 discard(cache, offset, (uint32_t)-1 - offset);
345 }
346
347 void split(Cache* cache, size_t pos, BufferSpace &r);
348
349 void dump(Cache* cache, Formatter *f) const {
350 std::lock_guard<std::recursive_mutex> l(cache->lock);
351 f->open_array_section("buffers");
352 for (auto& i : buffer_map) {
353 f->open_object_section("buffer");
354 assert(i.first == i.second->offset);
355 i.second->dump(f);
356 f->close_section();
357 }
358 f->close_section();
359 }
360 };
361
362 struct SharedBlobSet;
363
364 /// in-memory shared blob state (incl cached buffers)
365 struct SharedBlob {
366 MEMPOOL_CLASS_HELPERS();
367
368 std::atomic_int nref = {0}; ///< reference count
369 bool loaded = false;
370
371 CollectionRef coll;
372 union {
373 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
374 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
375 };
376 BufferSpace bc; ///< buffer cache
377
378 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
379 if (get_cache()) {
380 get_cache()->add_blob();
381 }
382 }
383 SharedBlob(uint64_t i, Collection *_coll);
384 ~SharedBlob();
385
386 uint64_t get_sbid() const {
387 return loaded ? persistent->sbid : sbid_unloaded;
388 }
389
390 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
391 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
392
393 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
394
395 void get() {
396 ++nref;
397 }
398 void put();
399
400 /// get logical references
401 void get_ref(uint64_t offset, uint32_t length);
402
403 /// put logical references, and get back any released extents
404 void put_ref(uint64_t offset, uint32_t length,
405 PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
406
407 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
408 return l.get_sbid() == r.get_sbid();
409 }
410 inline Cache* get_cache() {
411 return coll ? coll->cache : nullptr;
412 }
413 inline SharedBlobSet* get_parent() {
414 return coll ? &(coll->shared_blob_set) : nullptr;
415 }
416 inline bool is_loaded() const {
417 return loaded;
418 }
419
420 };
421 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
422
423 /// a lookup table of SharedBlobs
424 struct SharedBlobSet {
425 std::mutex lock; ///< protect lookup, insertion, removal
426
427 // we use a bare pointer because we don't want to affect the ref
428 // count
429 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
430
431 SharedBlobRef lookup(uint64_t sbid) {
432 std::lock_guard<std::mutex> l(lock);
433 auto p = sb_map.find(sbid);
434 if (p == sb_map.end() ||
435 p->second->nref == 0) {
436 return nullptr;
437 }
438 return p->second;
439 }
440
441 void add(Collection* coll, SharedBlob *sb) {
442 std::lock_guard<std::mutex> l(lock);
443 sb_map[sb->get_sbid()] = sb;
444 sb->coll = coll;
445 }
446
447 void remove(SharedBlob *sb) {
448 std::lock_guard<std::mutex> l(lock);
449 assert(sb->get_parent() == this);
450 // only remove if it still points to us
451 auto p = sb_map.find(sb->get_sbid());
452 if (p != sb_map.end() &&
453 p->second == sb) {
454 sb_map.erase(p);
455 }
456 }
457
458 bool empty() {
459 std::lock_guard<std::mutex> l(lock);
460 return sb_map.empty();
461 }
462
463 void dump(CephContext *cct, int lvl);
464 };
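// Usage sketch (illustrative; the lookup-or-create pattern is an assumption,
// not a quote of the .cc code): a Collection keeps one SharedBlobSet so that
// all Blobs referring to the same shared blob id resolve to a single
// in-memory SharedBlob:
//
//   SharedBlobRef sb = coll->shared_blob_set.lookup(sbid);
//   if (!sb) {
//     sb = new SharedBlob(sbid, coll);        // ctor declared above
//     coll->shared_blob_set.add(coll, sb);
//   }
//
// lookup() deliberately skips entries whose nref has already dropped to zero;
// those SharedBlobs are in the middle of being destructed.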
465
466 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
467
468 /// in-memory blob metadata and associated cached buffers (if any)
469 struct Blob {
470 MEMPOOL_CLASS_HELPERS();
471
472 std::atomic_int nref = {0}; ///< reference count
473 int16_t id = -1; ///< id, for spanning blobs only, >= 0
474 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
475 SharedBlobRef shared_blob; ///< shared blob state (if any)
476
477 private:
478 mutable bluestore_blob_t blob; ///< decoded blob metadata
479 #ifdef CACHE_BLOB_BL
480 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
481 #endif
482 /// refs from this shard. ephemeral if id<0, persisted if spanning.
483 bluestore_blob_use_tracker_t used_in_blob;
484
485 public:
486
487 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
488 friend void intrusive_ptr_release(Blob *b) { b->put(); }
489
490 friend ostream& operator<<(ostream& out, const Blob &b);
491
492 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
493 return used_in_blob;
494 }
495 bool is_referenced() const {
496 return used_in_blob.is_not_empty();
497 }
498 uint32_t get_referenced_bytes() const {
499 return used_in_blob.get_referenced_bytes();
500 }
501
502 bool is_spanning() const {
503 return id >= 0;
504 }
505
506 bool can_split() const {
507 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
508 // splitting a BufferSpace writing list is too hard; don't try.
509 return shared_blob->bc.writing.empty() &&
510 used_in_blob.can_split() &&
511 get_blob().can_split();
512 }
513
514 bool can_split_at(uint32_t blob_offset) const {
515 return used_in_blob.can_split_at(blob_offset) &&
516 get_blob().can_split_at(blob_offset);
517 }
518
519 bool can_reuse_blob(uint32_t min_alloc_size,
520 uint32_t target_blob_size,
521 uint32_t b_offset,
522 uint32_t *length0);
523
524 void dup(Blob& o) {
525 o.shared_blob = shared_blob;
526 o.blob = blob;
527 #ifdef CACHE_BLOB_BL
528 o.blob_bl = blob_bl;
529 #endif
530 }
531
532 inline const bluestore_blob_t& get_blob() const {
533 return blob;
534 }
535 inline bluestore_blob_t& dirty_blob() {
536 #ifdef CACHE_BLOB_BL
537 blob_bl.clear();
538 #endif
539 return blob;
540 }
541
542 /// discard buffers for unallocated regions
543 void discard_unallocated(Collection *coll);
544
545 /// get logical references
546 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
547 /// put logical references, and get back any released extents
548 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
549 PExtentVector *r);
550
551 /// split the blob
552 void split(Collection *coll, uint32_t blob_offset, Blob *o);
553
554 void get() {
555 ++nref;
556 }
557 void put() {
558 if (--nref == 0)
559 delete this;
560 }
561
562
563 #ifdef CACHE_BLOB_BL
564 void _encode() const {
565 if (blob_bl.length() == 0 ) {
566 ::encode(blob, blob_bl);
567 } else {
568 assert(blob_bl.length());
569 }
570 }
571 void bound_encode(
572 size_t& p,
573 bool include_ref_map) const {
574 _encode();
575 p += blob_bl.length();
576 if (include_ref_map) {
577 used_in_blob.bound_encode(p);
578 }
579 }
580 void encode(
581 bufferlist::contiguous_appender& p,
582 bool include_ref_map) const {
583 _encode();
584 p.append(blob_bl);
585 if (include_ref_map) {
586 used_in_blob.encode(p);
587 }
588 }
589 void decode(
590 Collection */*coll*/,
591 bufferptr::iterator& p,
592 bool include_ref_map) {
593 const char *start = p.get_pos();
594 denc(blob, p);
595 const char *end = p.get_pos();
596 blob_bl.clear();
597 blob_bl.append(start, end - start);
598 if (include_ref_map) {
599 used_in_blob.decode(p);
600 }
601 }
602 #else
603 void bound_encode(
604 size_t& p,
605 uint64_t struct_v,
606 uint64_t sbid,
607 bool include_ref_map) const {
608 denc(blob, p, struct_v);
609 if (blob.is_shared()) {
610 denc(sbid, p);
611 }
612 if (include_ref_map) {
613 used_in_blob.bound_encode(p);
614 }
615 }
616 void encode(
617 bufferlist::contiguous_appender& p,
618 uint64_t struct_v,
619 uint64_t sbid,
620 bool include_ref_map) const {
621 denc(blob, p, struct_v);
622 if (blob.is_shared()) {
623 denc(sbid, p);
624 }
625 if (include_ref_map) {
626 used_in_blob.encode(p);
627 }
628 }
629 void decode(
630 Collection *coll,
631 bufferptr::iterator& p,
632 uint64_t struct_v,
633 uint64_t* sbid,
634 bool include_ref_map);
635 #endif
636 };
637 typedef boost::intrusive_ptr<Blob> BlobRef;
638 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
639
640 /// a logical extent, pointing to (some portion of) a blob
641 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
642 struct Extent : public ExtentBase {
643 MEMPOOL_CLASS_HELPERS();
644
645 uint32_t logical_offset = 0; ///< logical offset
646 uint32_t blob_offset = 0; ///< blob offset
647 uint32_t length = 0; ///< length
648 BlobRef blob; ///< the blob with our data
649
650 /// ctor for lookup only
651 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
652 /// ctor for delayed initialization (see decode_some())
653 explicit Extent() : ExtentBase() {
654 }
655 /// ctor for general usage
656 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
657 : ExtentBase(),
658 logical_offset(lo), blob_offset(o), length(l) {
659 assign_blob(b);
660 }
661 ~Extent() {
662 if (blob) {
663 blob->shared_blob->get_cache()->rm_extent();
664 }
665 }
666
667 void assign_blob(const BlobRef& b) {
668 assert(!blob);
669 blob = b;
670 blob->shared_blob->get_cache()->add_extent();
671 }
672
673 // comparators for intrusive_set
674 friend bool operator<(const Extent &a, const Extent &b) {
675 return a.logical_offset < b.logical_offset;
676 }
677 friend bool operator>(const Extent &a, const Extent &b) {
678 return a.logical_offset > b.logical_offset;
679 }
680 friend bool operator==(const Extent &a, const Extent &b) {
681 return a.logical_offset == b.logical_offset;
682 }
683
684 uint32_t blob_start() const {
685 return logical_offset - blob_offset;
686 }
687
688 uint32_t blob_end() const {
689 return blob_start() + blob->get_blob().get_logical_length();
690 }
691
692 uint32_t logical_end() const {
693 return logical_offset + length;
694 }
695
696 // return true if any piece of the blob is out of
697 // the given range [o, o + l].
698 bool blob_escapes_range(uint32_t o, uint32_t l) const {
699 return blob_start() < o || blob_end() > o + l;
700 }
701 };
702 typedef boost::intrusive::set<Extent> extent_map_t;
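// Worked example (hypothetical numbers): an Extent with logical_offset=0x2000,
// blob_offset=0x1000, length=0x800 over a blob whose logical length is 0x4000
// yields
//
//   blob_start()  == 0x1000   // logical_offset - blob_offset
//   blob_end()    == 0x5000   // blob_start() + blob logical length
//   logical_end() == 0x2800   // logical_offset + length
//
// and blob_escapes_range(0x1000, 0x4000) == false, since the blob lies
// entirely inside [0x1000, 0x5000).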
703
704
705 friend ostream& operator<<(ostream& out, const Extent& e);
706
707 struct OldExtent {
708 boost::intrusive::list_member_hook<> old_extent_item;
709 Extent e;
710 PExtentVector r;
711 bool blob_empty; // flag to track the last removed extent that makes the blob
712 // empty; required to update compression stats properly
713 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
714 : e(lo, o, l, b), blob_empty(false) {
715 }
716 static OldExtent* create(CollectionRef c,
717 uint32_t lo,
718 uint32_t o,
719 uint32_t l,
720 BlobRef& b);
721 };
722 typedef boost::intrusive::list<
723 OldExtent,
724 boost::intrusive::member_hook<
725 OldExtent,
726 boost::intrusive::list_member_hook<>,
727 &OldExtent::old_extent_item> > old_extent_map_t;
728
729 struct Onode;
730
731 /// a sharded extent map, mapping offsets to lextents to blobs
732 struct ExtentMap {
733 Onode *onode;
734 extent_map_t extent_map; ///< map of Extents to Blobs
735 blob_map_t spanning_blob_map; ///< blobs that span shards
736
737 struct Shard {
738 bluestore_onode_t::shard_info *shard_info = nullptr;
739 unsigned extents = 0; ///< count extents in this shard
740 bool loaded = false; ///< true if shard is loaded
741 bool dirty = false; ///< true if shard is dirty and needs reencoding
742 };
743 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
744
745 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
746
747 uint32_t needs_reshard_begin = 0;
748 uint32_t needs_reshard_end = 0;
749
750 bool needs_reshard() const {
751 return needs_reshard_end > needs_reshard_begin;
752 }
753 void clear_needs_reshard() {
754 needs_reshard_begin = needs_reshard_end = 0;
755 }
756 void request_reshard(uint32_t begin, uint32_t end) {
757 if (begin < needs_reshard_begin) {
758 needs_reshard_begin = begin;
759 }
760 if (end > needs_reshard_end) {
761 needs_reshard_end = end;
762 }
763 }
764
765 struct DeleteDisposer {
766 void operator()(Extent *e) { delete e; }
767 };
768
769 ExtentMap(Onode *o);
770 ~ExtentMap() {
771 extent_map.clear_and_dispose(DeleteDisposer());
772 }
773
774 void clear() {
775 extent_map.clear_and_dispose(DeleteDisposer());
776 shards.clear();
777 inline_bl.clear();
778 clear_needs_reshard();
779 }
780
781 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
782 unsigned *pn);
783 unsigned decode_some(bufferlist& bl);
784
785 void bound_encode_spanning_blobs(size_t& p);
786 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
787 void decode_spanning_blobs(bufferptr::iterator& p);
788
789 BlobRef get_spanning_blob(int id) {
790 auto p = spanning_blob_map.find(id);
791 assert(p != spanning_blob_map.end());
792 return p->second;
793 }
794
795 void update(KeyValueDB::Transaction t, bool force);
796 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
797 void reshard(
798 KeyValueDB *db,
799 KeyValueDB::Transaction t);
800
801 /// initialize Shards from the onode
802 void init_shards(bool loaded, bool dirty);
803
804 /// return index of shard containing offset
805 /// or -1 if not found
806 int seek_shard(uint32_t offset) {
807 size_t end = shards.size();
808 size_t mid, left = 0;
809 size_t right = end; // one past the right end
810
811 while (left < right) {
812 mid = left + (right - left) / 2;
813 if (offset >= shards[mid].shard_info->offset) {
814 size_t next = mid + 1;
815 if (next >= end || offset < shards[next].shard_info->offset)
816 return mid;
817 //continue to search forwards
818 left = next;
819 } else {
820 //continue to search backwards
821 right = mid;
822 }
823 }
824
825 return -1; // not found
826 }
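// Worked example (hypothetical shard offsets 0x0, 0x10000, 0x20000):
//
//   seek_shard(0x18000);  // 0x10000 <= offset < 0x20000 -> returns 1
//   seek_shard(0x05000);  // falls inside the first shard -> returns 0
//
// An offset below shards[0].shard_info->offset returns -1 (not found).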
827
828 /// check if a range spans a shard
829 bool spans_shard(uint32_t offset, uint32_t length) {
830 if (shards.empty()) {
831 return false;
832 }
833 int s = seek_shard(offset);
834 assert(s >= 0);
835 if (s == (int)shards.size() - 1) {
836 return false; // last shard
837 }
838 if (offset + length <= shards[s+1].shard_info->offset) {
839 return false;
840 }
841 return true;
842 }
843
844 /// ensure that a range of the map is loaded
845 void fault_range(KeyValueDB *db,
846 uint32_t offset, uint32_t length);
847
848 /// ensure a range of the map is marked dirty
849 void dirty_range(uint32_t offset, uint32_t length);
850
851 /// for seek_lextent test
852 extent_map_t::iterator find(uint64_t offset);
853
854 /// seek to the first lextent including or after offset
855 extent_map_t::iterator seek_lextent(uint64_t offset);
856 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
857
858 /// add a new Extent
859 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
860 extent_map.insert(*new Extent(lo, o, l, b));
861 }
862
863 /// remove (and delete) an Extent
864 void rm(extent_map_t::iterator p) {
865 extent_map.erase_and_dispose(p, DeleteDisposer());
866 }
867
868 bool has_any_lextents(uint64_t offset, uint64_t length);
869
870 /// consolidate adjacent lextents in extent_map
871 int compress_extent_map(uint64_t offset, uint64_t length);
872
873 /// punch a logical hole. add lextents to deref to target list.
874 void punch_hole(CollectionRef &c,
875 uint64_t offset, uint64_t length,
876 old_extent_map_t *old_extents);
877
878 /// put new lextent into lextent_map overwriting existing ones if
879 /// any and update references accordingly
880 Extent *set_lextent(CollectionRef &c,
881 uint64_t logical_offset,
882 uint64_t offset, uint64_t length,
883 BlobRef b,
884 old_extent_map_t *old_extents);
885
886 /// split a blob (and referring extents)
887 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
888 };
889
890 /// Compressed Blob Garbage collector
891 /*
892 The primary idea of the collector is to estimate the difference between the
893 allocation units (AUs) currently occupied by compressed blobs and the new AUs
894 required to store the same data uncompressed.
895 Estimation is performed for protrusive extents within a logical range
896 determined by the concatenation of the old_extents collection and the
897 specific (current) write request.
898 The root cause for using old_extents is the need to handle blob ref counts
899 properly. Old extents still hold blob refs, hence we need to traverse the
900 collection to determine whether a blob is to be released.
901 Protrusive extents are extents that fall within the blob set in action
902 (ones that are below the logical range from above) but are not removed
903 entirely by the current write.
904 E.g. for
905 extent1 <loffs = 100, boffs = 100, len = 100> ->
906 blob1<compressed, len_on_disk=4096, logical_len=8192>
907 extent2 <loffs = 200, boffs = 200, len = 100> ->
908 blob2<raw, len_on_disk=4096, llen=4096>
909 extent3 <loffs = 300, boffs = 300, len = 100> ->
910 blob1<compressed, len_on_disk=4096, llen=8192>
911 extent4 <loffs = 4096, boffs = 0, len = 100> ->
912 blob3<raw, len_on_disk=4096, llen=4096>
913 write(300~100)
914 protrusive extents are within the following ranges <0~300, 400~8192-400>
915 In this case existing AUs that might be removed due to GC (i.e. blob1)
916 use 2x4K bytes.
917 And new AUs expected after GC = 0, since extent1 is to be merged into blob2.
918 Hence we should collect: the AUs expected for release outweigh the new AUs required.
919 */
920 class GarbageCollector
921 {
922 public:
923 /// return amount of allocation units that might be saved due to GC
924 int64_t estimate(
925 uint64_t offset,
926 uint64_t length,
927 const ExtentMap& extent_map,
928 const old_extent_map_t& old_extents,
929 uint64_t min_alloc_size);
930
931 /// return a collection of extents to perform GC on
932 const vector<AllocExtent>& get_extents_to_collect() const {
933 return extents_to_collect;
934 }
935 GarbageCollector(CephContext* _cct) : cct(_cct) {}
936
937 private:
938 struct BlobInfo {
939 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
940 int64_t expected_allocations = 0; ///< new alloc units required
941 ///< in case of gc fulfilled
942 bool collect_candidate = false; ///< indicate if blob has any extents
943 ///< eligible for GC.
944 extent_map_t::const_iterator first_lextent; ///< points to the first
945 ///< lextent referring to
946 ///< the blob if any.
947 ///< collect_candidate flag
948 ///< determines the validity
949 extent_map_t::const_iterator last_lextent; ///< points to the last
950 ///< lextent referring to
951 ///< the blob if any.
952
953 BlobInfo(uint64_t ref_bytes) :
954 referenced_bytes(ref_bytes) {
955 }
956 };
957 CephContext* cct;
958 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
959 ///< copies that are affected by the
960 ///< specific write
961
962 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
963 ///< be collected if GC takes place
964
965 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
966 ///< unit when traversing
967 ///< protrusive extents.
968 ///< Other extents mapped to
969 ///< this AU to be ignored
970 ///< (except the case where
971 ///< uncompressed extent follows
972 ///< compressed one - see below).
973 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
974 ///< caused expected_allocations
975 ///< counter increment at this blob.
976 ///< if uncompressed extent follows
977 ///< a decrement for the
978 ///< expected_allocations counter
979 ///< is needed
980 int64_t expected_allocations = 0; ///< new alloc units required in case
981 ///< of gc fulfilled
982 int64_t expected_for_release = 0; ///< alloc units currently used by
983 ///< compressed blobs that might
984 ///< gone after GC
985 uint64_t gc_start_offset; ///< starting offset for GC
986 uint64_t gc_end_offset; ///< ending offset for GC
987
988 protected:
989 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
990 uint64_t start_offset,
991 uint64_t end_offset,
992 uint64_t start_touch_offset,
993 uint64_t end_touch_offset,
994 uint64_t min_alloc_size);
995 };
996
997 struct OnodeSpace;
998
999 /// an in-memory object
1000 struct Onode {
1001 MEMPOOL_CLASS_HELPERS();
1002
1003 std::atomic_int nref; ///< reference count
1004 Collection *c;
1005
1006 ghobject_t oid;
1007
1008 /// key under PREFIX_OBJ where we are stored
1009 mempool::bluestore_cache_other::string key;
1010
1011 boost::intrusive::list_member_hook<> lru_item;
1012
1013 bluestore_onode_t onode; ///< metadata stored as value in kv store
1014 bool exists; ///< true if object logically exists
1015
1016 ExtentMap extent_map;
1017
1018 // track txc's that have not been committed to kv store (and whose
1019 // effects cannot be read via the kvdb read methods)
1020 std::atomic<int> flushing_count = {0};
1021 std::mutex flush_lock; ///< protect flush_txns
1022 std::condition_variable flush_cond; ///< wait here for uncommitted txns
1023
1024 Onode(Collection *c, const ghobject_t& o,
1025 const mempool::bluestore_cache_other::string& k)
1026 : nref(0),
1027 c(c),
1028 oid(o),
1029 key(k),
1030 exists(false),
1031 extent_map(this) {
1032 }
1033
1034 void flush();
1035 void get() {
1036 ++nref;
1037 }
1038 void put() {
1039 if (--nref == 0)
1040 delete this;
1041 }
1042 };
1043 typedef boost::intrusive_ptr<Onode> OnodeRef;
1044
1045
1046 /// a cache (shard) of onodes and buffers
1047 struct Cache {
1048 CephContext* cct;
1049 PerfCounters *logger;
1050 std::recursive_mutex lock; ///< protect lru and other structures
1051
1052 std::atomic<uint64_t> num_extents = {0};
1053 std::atomic<uint64_t> num_blobs = {0};
1054
1055 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1056
1057 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1058 virtual ~Cache() {}
1059
1060 virtual void _add_onode(OnodeRef& o, int level) = 0;
1061 virtual void _rm_onode(OnodeRef& o) = 0;
1062 virtual void _touch_onode(OnodeRef& o) = 0;
1063
1064 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1065 virtual void _rm_buffer(Buffer *b) = 0;
1066 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1067 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1068 virtual void _touch_buffer(Buffer *b) = 0;
1069
1070 virtual uint64_t _get_num_onodes() = 0;
1071 virtual uint64_t _get_buffer_bytes() = 0;
1072
1073 void add_extent() {
1074 ++num_extents;
1075 }
1076 void rm_extent() {
1077 --num_extents;
1078 }
1079
1080 void add_blob() {
1081 ++num_blobs;
1082 }
1083 void rm_blob() {
1084 --num_blobs;
1085 }
1086
1087 void trim(uint64_t target_bytes,
1088 float target_meta_ratio,
1089 float target_data_ratio,
1090 float bytes_per_onode);
1091
1092 void trim_all();
1093
1094 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1095
1096 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1097 uint64_t *blobs,
1098 uint64_t *buffers,
1099 uint64_t *bytes) = 0;
1100
1101 bool empty() {
1102 std::lock_guard<std::recursive_mutex> l(lock);
1103 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1104 }
1105
1106 #ifdef DEBUG_CACHE
1107 virtual void _audit(const char *s) = 0;
1108 #else
1109 void _audit(const char *s) { /* no-op */ }
1110 #endif
1111 };
1112
1113 /// simple LRU cache for onodes and buffers
1114 struct LRUCache : public Cache {
1115 private:
1116 typedef boost::intrusive::list<
1117 Onode,
1118 boost::intrusive::member_hook<
1119 Onode,
1120 boost::intrusive::list_member_hook<>,
1121 &Onode::lru_item> > onode_lru_list_t;
1122 typedef boost::intrusive::list<
1123 Buffer,
1124 boost::intrusive::member_hook<
1125 Buffer,
1126 boost::intrusive::list_member_hook<>,
1127 &Buffer::lru_item> > buffer_lru_list_t;
1128
1129 onode_lru_list_t onode_lru;
1130
1131 buffer_lru_list_t buffer_lru;
1132 uint64_t buffer_size = 0;
1133
1134 public:
1135 LRUCache(CephContext* cct) : Cache(cct) {}
1136 uint64_t _get_num_onodes() override {
1137 return onode_lru.size();
1138 }
1139 void _add_onode(OnodeRef& o, int level) override {
1140 if (level > 0)
1141 onode_lru.push_front(*o);
1142 else
1143 onode_lru.push_back(*o);
1144 }
1145 void _rm_onode(OnodeRef& o) override {
1146 auto q = onode_lru.iterator_to(*o);
1147 onode_lru.erase(q);
1148 }
1149 void _touch_onode(OnodeRef& o) override;
1150
1151 uint64_t _get_buffer_bytes() override {
1152 return buffer_size;
1153 }
1154 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1155 if (near) {
1156 auto q = buffer_lru.iterator_to(*near);
1157 buffer_lru.insert(q, *b);
1158 } else if (level > 0) {
1159 buffer_lru.push_front(*b);
1160 } else {
1161 buffer_lru.push_back(*b);
1162 }
1163 buffer_size += b->length;
1164 }
1165 void _rm_buffer(Buffer *b) override {
1166 assert(buffer_size >= b->length);
1167 buffer_size -= b->length;
1168 auto q = buffer_lru.iterator_to(*b);
1169 buffer_lru.erase(q);
1170 }
1171 void _move_buffer(Cache *src, Buffer *b) override {
1172 src->_rm_buffer(b);
1173 _add_buffer(b, 0, nullptr);
1174 }
1175 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1176 assert((int64_t)buffer_size + delta >= 0);
1177 buffer_size += delta;
1178 }
1179 void _touch_buffer(Buffer *b) override {
1180 auto p = buffer_lru.iterator_to(*b);
1181 buffer_lru.erase(p);
1182 buffer_lru.push_front(*b);
1183 _audit("_touch_buffer end");
1184 }
1185
1186 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1187
1188 void add_stats(uint64_t *onodes, uint64_t *extents,
1189 uint64_t *blobs,
1190 uint64_t *buffers,
1191 uint64_t *bytes) override {
1192 std::lock_guard<std::recursive_mutex> l(lock);
1193 *onodes += onode_lru.size();
1194 *extents += num_extents;
1195 *blobs += num_blobs;
1196 *buffers += buffer_lru.size();
1197 *bytes += buffer_size;
1198 }
1199
1200 #ifdef DEBUG_CACHE
1201 void _audit(const char *s) override;
1202 #endif
1203 };
1204
1205 // 2Q cache for buffers, LRU for onodes
1206 struct TwoQCache : public Cache {
1207 private:
1208 // stick with LRU for onodes for now (fixme?)
1209 typedef boost::intrusive::list<
1210 Onode,
1211 boost::intrusive::member_hook<
1212 Onode,
1213 boost::intrusive::list_member_hook<>,
1214 &Onode::lru_item> > onode_lru_list_t;
1215 typedef boost::intrusive::list<
1216 Buffer,
1217 boost::intrusive::member_hook<
1218 Buffer,
1219 boost::intrusive::list_member_hook<>,
1220 &Buffer::lru_item> > buffer_list_t;
1221
1222 onode_lru_list_t onode_lru;
1223
1224 buffer_list_t buffer_hot; ///< "Am" hot buffers
1225 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1226 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1227 uint64_t buffer_bytes = 0; ///< bytes
1228
1229 enum {
1230 BUFFER_NEW = 0,
1231 BUFFER_WARM_IN, ///< in buffer_warm_in
1232 BUFFER_WARM_OUT, ///< in buffer_warm_out
1233 BUFFER_HOT, ///< in buffer_hot
1234 BUFFER_TYPE_MAX
1235 };
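// Rough 2Q lifecycle (descriptive; the promotion decisions themselves live in
// the .cc implementation): buffers first enter the "A1in" warm_in list; when
// trimmed out of warm_in the data is dropped but an empty placeholder may
// linger on the "A1out" warm_out list; a later hit on a warm_out placeholder
// is what earns promotion to the "Am" hot list, which is then managed LRU
// (see _touch_buffer below).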
1236
1237 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1238
1239 public:
1240 TwoQCache(CephContext* cct) : Cache(cct) {}
1241 uint64_t _get_num_onodes() override {
1242 return onode_lru.size();
1243 }
1244 void _add_onode(OnodeRef& o, int level) override {
1245 if (level > 0)
1246 onode_lru.push_front(*o);
1247 else
1248 onode_lru.push_back(*o);
1249 }
1250 void _rm_onode(OnodeRef& o) override {
1251 auto q = onode_lru.iterator_to(*o);
1252 onode_lru.erase(q);
1253 }
1254 void _touch_onode(OnodeRef& o) override;
1255
1256 uint64_t _get_buffer_bytes() override {
1257 return buffer_bytes;
1258 }
1259 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1260 void _rm_buffer(Buffer *b) override;
1261 void _move_buffer(Cache *src, Buffer *b) override;
1262 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1263 void _touch_buffer(Buffer *b) override {
1264 switch (b->cache_private) {
1265 case BUFFER_WARM_IN:
1266 // do nothing (somewhat counter-intuitively!)
1267 break;
1268 case BUFFER_WARM_OUT:
1269 // move from warm_out to hot LRU
1270 assert(0 == "this happens via discard hint");
1271 break;
1272 case BUFFER_HOT:
1273 // move to front of hot LRU
1274 buffer_hot.erase(buffer_hot.iterator_to(*b));
1275 buffer_hot.push_front(*b);
1276 break;
1277 }
1278 _audit("_touch_buffer end");
1279 }
1280
1281 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1282
1283 void add_stats(uint64_t *onodes, uint64_t *extents,
1284 uint64_t *blobs,
1285 uint64_t *buffers,
1286 uint64_t *bytes) override {
1287 std::lock_guard<std::recursive_mutex> l(lock);
1288 *onodes += onode_lru.size();
1289 *extents += num_extents;
1290 *blobs += num_blobs;
1291 *buffers += buffer_hot.size() + buffer_warm_in.size();
1292 *bytes += buffer_bytes;
1293 }
1294
1295 #ifdef DEBUG_CACHE
1296 void _audit(const char *s) override;
1297 #endif
1298 };
1299
1300 struct OnodeSpace {
1301 private:
1302 Cache *cache;
1303
1304 /// forward lookups
1305 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1306
1307 friend class Collection; // for split_cache()
1308
1309 public:
1310 OnodeSpace(Cache *c) : cache(c) {}
1311 ~OnodeSpace() {
1312 clear();
1313 }
1314
1315 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1316 OnodeRef lookup(const ghobject_t& o);
1317 void remove(const ghobject_t& oid) {
1318 onode_map.erase(oid);
1319 }
1320 void rename(OnodeRef& o, const ghobject_t& old_oid,
1321 const ghobject_t& new_oid,
1322 const mempool::bluestore_cache_other::string& new_okey);
1323 void clear();
1324 bool empty();
1325
1326 void dump(CephContext *cct, int lvl);
1327
1328 /// return true if f true for any item
1329 bool map_any(std::function<bool(OnodeRef)> f);
1330 };
1331
1332 struct Collection : public CollectionImpl {
1333 BlueStore *store;
1334 Cache *cache; ///< our cache shard
1335 coll_t cid;
1336 bluestore_cnode_t cnode;
1337 RWLock lock;
1338
1339 bool exists;
1340
1341 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1342
1343 // cache onodes on a per-collection basis to avoid lock
1344 // contention.
1345 OnodeSpace onode_map;
1346
1347 //pool options
1348 pool_opts_t pool_opts;
1349
1350 OnodeRef get_onode(const ghobject_t& oid, bool create);
1351
1352 // the terminology is confusing here, sorry!
1353 //
1354 // blob_t shared_blob_t
1355 // !shared unused -> open
1356 // shared !loaded -> open + shared
1357 // shared loaded -> open + shared + loaded
1358 //
1359 // i.e.,
1360 // open = SharedBlob is instantiated
1361 // shared = blob_t shared flag is set; SharedBlob is hashed.
1362 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1363 void open_shared_blob(uint64_t sbid, BlobRef b);
1364 void load_shared_blob(SharedBlobRef sb);
1365 void make_blob_shared(uint64_t sbid, BlobRef b);
1366 uint64_t make_blob_unshared(SharedBlob *sb);
1367
1368 BlobRef new_blob() {
1369 BlobRef b = new Blob();
1370 b->shared_blob = new SharedBlob(this);
1371 return b;
1372 }
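// Illustrative progression (cf. the terminology notes above): new_blob()
// hands out an "open" SharedBlob (sbid 0, not shared); make_blob_shared()
// assigns an sbid and sets the blob_t shared flag, typically when an object
// is cloned ("open + shared"); load_shared_blob() later pulls the persistent
// bluestore_shared_blob_t out of the kv store when its ref map is needed
// ("open + shared + loaded").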
1373
1374 const coll_t &get_cid() override {
1375 return cid;
1376 }
1377
1378 bool contains(const ghobject_t& oid) {
1379 if (cid.is_meta())
1380 return oid.hobj.pool == -1;
1381 spg_t spgid;
1382 if (cid.is_pg(&spgid))
1383 return
1384 spgid.pgid.contains(cnode.bits, oid) &&
1385 oid.shard_id == spgid.shard;
1386 return false;
1387 }
1388
1389 void split_cache(Collection *dest);
1390
1391 Collection(BlueStore *ns, Cache *ca, coll_t c);
1392 };
1393
1394 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1395 CollectionRef c;
1396 OnodeRef o;
1397 KeyValueDB::Iterator it;
1398 string head, tail;
1399 public:
1400 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1401 int seek_to_first() override;
1402 int upper_bound(const string &after) override;
1403 int lower_bound(const string &to) override;
1404 bool valid() override;
1405 int next(bool validate=true) override;
1406 string key() override;
1407 bufferlist value() override;
1408 int status() override {
1409 return 0;
1410 }
1411 };
1412
1413 class OpSequencer;
1414 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1415
1416 struct volatile_statfs{
1417 enum {
1418 STATFS_ALLOCATED = 0,
1419 STATFS_STORED,
1420 STATFS_COMPRESSED_ORIGINAL,
1421 STATFS_COMPRESSED,
1422 STATFS_COMPRESSED_ALLOCATED,
1423 STATFS_LAST
1424 };
1425 int64_t values[STATFS_LAST];
1426 volatile_statfs() {
1427 memset(this, 0, sizeof(volatile_statfs));
1428 }
1429 void reset() {
1430 *this = volatile_statfs();
1431 }
1432 volatile_statfs& operator+=(const volatile_statfs& other) {
1433 for (size_t i = 0; i < STATFS_LAST; ++i) {
1434 values[i] += other.values[i];
1435 }
1436 return *this;
1437 }
1438 int64_t& allocated() {
1439 return values[STATFS_ALLOCATED];
1440 }
1441 int64_t& stored() {
1442 return values[STATFS_STORED];
1443 }
1444 int64_t& compressed_original() {
1445 return values[STATFS_COMPRESSED_ORIGINAL];
1446 }
1447 int64_t& compressed() {
1448 return values[STATFS_COMPRESSED];
1449 }
1450 int64_t& compressed_allocated() {
1451 return values[STATFS_COMPRESSED_ALLOCATED];
1452 }
1453 bool is_empty() {
1454 return values[STATFS_ALLOCATED] == 0 &&
1455 values[STATFS_STORED] == 0 &&
1456 values[STATFS_COMPRESSED] == 0 &&
1457 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1458 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1459 }
1460 void decode(bufferlist::iterator& it) {
1461 for (size_t i = 0; i < STATFS_LAST; i++) {
1462 ::decode(values[i], it);
1463 }
1464 }
1465
1466 void encode(bufferlist& bl) {
1467 for (size_t i = 0; i < STATFS_LAST; i++) {
1468 ::encode(values[i], bl);
1469 }
1470 }
1471 };
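// Usage sketch (illustrative; final_length/new_bytes are hypothetical names):
// each TransContext accumulates its effect on store-wide statistics in a
// volatile_statfs ("statfs_delta" below), and deltas are later folded
// together with operator+=, e.g.
//
//   txc->statfs_delta.allocated() += final_length;
//   txc->statfs_delta.stored()    += new_bytes;
//   ...
//   vstatfs += txc->statfs_delta;   // under vstatfs_lock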
1472
1473 struct TransContext : public AioContext {
1474 MEMPOOL_CLASS_HELPERS();
1475
1476 typedef enum {
1477 STATE_PREPARE,
1478 STATE_AIO_WAIT,
1479 STATE_IO_DONE,
1480 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1481 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1482 STATE_KV_DONE,
1483 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1484 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1485 STATE_DEFERRED_DONE,
1486 STATE_FINISHING,
1487 STATE_DONE,
1488 } state_t;
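// States advance monotonically, roughly in the order declared above; the
// DEFERRED_* states are only visited by transactions that carry deferred io,
// and a simple write goes PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED ->
// KV_SUBMITTED -> KV_DONE -> FINISHING -> DONE.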
1489
1490 state_t state = STATE_PREPARE;
1491
1492 const char *get_state_name() {
1493 switch (state) {
1494 case STATE_PREPARE: return "prepare";
1495 case STATE_AIO_WAIT: return "aio_wait";
1496 case STATE_IO_DONE: return "io_done";
1497 case STATE_KV_QUEUED: return "kv_queued";
1498 case STATE_KV_SUBMITTED: return "kv_submitted";
1499 case STATE_KV_DONE: return "kv_done";
1500 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1501 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1502 case STATE_DEFERRED_DONE: return "deferred_done";
1503 case STATE_FINISHING: return "finishing";
1504 case STATE_DONE: return "done";
1505 }
1506 return "???";
1507 }
1508
1509 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1510 const char *get_state_latency_name(int state) {
1511 switch (state) {
1512 case l_bluestore_state_prepare_lat: return "prepare";
1513 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1514 case l_bluestore_state_io_done_lat: return "io_done";
1515 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1516 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1517 case l_bluestore_state_kv_done_lat: return "kv_done";
1518 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1519 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1520 case l_bluestore_state_finishing_lat: return "finishing";
1521 case l_bluestore_state_done_lat: return "done";
1522 }
1523 return "???";
1524 }
1525 #endif
1526
1527 void log_state_latency(PerfCounters *logger, int state) {
1528 utime_t lat, now = ceph_clock_now();
1529 lat = now - last_stamp;
1530 logger->tinc(state, lat);
1531 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1532 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1533 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1534 OID_ELAPSED("", usecs, get_state_latency_name(state));
1535 }
1536 #endif
1537 last_stamp = now;
1538 }
1539
1540 OpSequencerRef osr;
1541 boost::intrusive::list_member_hook<> sequencer_item;
1542
1543 uint64_t bytes = 0, cost = 0;
1544
1545 set<OnodeRef> onodes; ///< these need to be updated/written
1546 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1547 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1548 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1549
1550 KeyValueDB::Transaction t; ///< then we will commit this
1551 Context *oncommit = nullptr; ///< signal on commit
1552 Context *onreadable = nullptr; ///< signal on readable
1553 Context *onreadable_sync = nullptr; ///< signal on readable
1554 list<Context*> oncommits; ///< more commit completions
1555 list<CollectionRef> removed_collections; ///< colls we removed
1556
1557 boost::intrusive::list_member_hook<> deferred_queue_item;
1558 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1559
1560 interval_set<uint64_t> allocated, released;
1561 volatile_statfs statfs_delta;
1562
1563 IOContext ioc;
1564 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1565
1566 uint64_t seq = 0;
1567 utime_t start;
1568 utime_t last_stamp;
1569
1570 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1571 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1572
1573 explicit TransContext(CephContext* cct, OpSequencer *o)
1574 : osr(o),
1575 ioc(cct, this),
1576 start(ceph_clock_now()) {
1577 last_stamp = start;
1578 }
1579 ~TransContext() {
1580 delete deferred_txn;
1581 }
1582
1583 void write_onode(OnodeRef &o) {
1584 onodes.insert(o);
1585 }
1586 void write_shared_blob(SharedBlobRef &sb) {
1587 shared_blobs.insert(sb);
1588 }
1589 void unshare_blob(SharedBlob *sb) {
1590 shared_blobs.erase(sb);
1591 }
1592
1593 /// note we logically modified object (when onode itself is unmodified)
1594 void note_modified_object(OnodeRef &o) {
1595 // onode itself isn't written, though
1596 modified_objects.insert(o);
1597 }
1598 void removed(OnodeRef& o) {
1599 onodes.erase(o);
1600 modified_objects.erase(o);
1601 }
1602
1603 void aio_finish(BlueStore *store) override {
1604 store->txc_aio_finish(this);
1605 }
1606 };
1607
1608 typedef boost::intrusive::list<
1609 TransContext,
1610 boost::intrusive::member_hook<
1611 TransContext,
1612 boost::intrusive::list_member_hook<>,
1613 &TransContext::deferred_queue_item> > deferred_queue_t;
1614
1615 struct DeferredBatch : public AioContext {
1616 OpSequencer *osr;
1617 struct deferred_io {
1618 bufferlist bl; ///< data
1619 uint64_t seq; ///< deferred transaction seq
1620 };
1621 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1622 deferred_queue_t txcs; ///< txcs in this batch
1623 IOContext ioc; ///< our aios
1624 /// bytes of pending io for each deferred seq (may be 0)
1625 map<uint64_t,int> seq_bytes;
1626
1627 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1628 void _audit(CephContext *cct);
1629
1630 DeferredBatch(CephContext *cct, OpSequencer *osr)
1631 : osr(osr), ioc(cct, this) {}
1632
1633 /// prepare a write
1634 void prepare_write(CephContext *cct,
1635 uint64_t seq, uint64_t offset, uint64_t length,
1636 bufferlist::const_iterator& p);
1637
1638 void aio_finish(BlueStore *store) override {
1639 store->_deferred_aio_finish(osr);
1640 }
1641 };
1642
1643 class OpSequencer : public Sequencer_impl {
1644 public:
1645 std::mutex qlock;
1646 std::condition_variable qcond;
1647 typedef boost::intrusive::list<
1648 TransContext,
1649 boost::intrusive::member_hook<
1650 TransContext,
1651 boost::intrusive::list_member_hook<>,
1652 &TransContext::sequencer_item> > q_list_t;
1653 q_list_t q; ///< transactions
1654
1655 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1656
1657 DeferredBatch *deferred_running = nullptr;
1658 DeferredBatch *deferred_pending = nullptr;
1659
1660 Sequencer *parent;
1661 BlueStore *store;
1662
1663 uint64_t last_seq = 0;
1664
1665 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1666
1667 std::atomic_int kv_committing_serially = {0};
1668
1669 std::atomic_int kv_submitted_waiters = {0};
1670
1671 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1672 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1673
1674 OpSequencer(CephContext* cct, BlueStore *store)
1675 : Sequencer_impl(cct),
1676 parent(NULL), store(store) {
1677 store->register_osr(this);
1678 }
1679 ~OpSequencer() override {
1680 assert(q.empty());
1681 _unregister();
1682 }
1683
1684 void discard() override {
1685 // Note that we may have txc's in flight when the parent Sequencer
1686 // goes away. Reflect this with zombie==registered==true and let
1687 // _osr_drain_all clean up later.
1688 assert(!zombie);
1689 zombie = true;
1690 parent = nullptr;
1691 bool empty;
1692 {
1693 std::lock_guard<std::mutex> l(qlock);
1694 empty = q.empty();
1695 }
1696 if (empty) {
1697 _unregister();
1698 }
1699 }
1700
1701 void _unregister() {
1702 if (registered) {
1703 store->unregister_osr(this);
1704 registered = false;
1705 }
1706 }
1707
1708 void queue_new(TransContext *txc) {
1709 std::lock_guard<std::mutex> l(qlock);
1710 txc->seq = ++last_seq;
1711 q.push_back(*txc);
1712 }
1713
1714 void drain() {
1715 std::unique_lock<std::mutex> l(qlock);
1716 while (!q.empty())
1717 qcond.wait(l);
1718 }
1719
1720 void drain_preceding(TransContext *txc) {
1721 std::unique_lock<std::mutex> l(qlock);
1722 while (!q.empty() && &q.front() != txc)
1723 qcond.wait(l);
1724 }
1725
1726 bool _is_all_kv_submitted() {
1727 // caller must hold qlock
1728 if (q.empty()) {
1729 return true;
1730 }
1731 TransContext *txc = &q.back();
1732 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1733 return true;
1734 }
1735 return false;
1736 }
1737
1738 void flush() override {
1739 std::unique_lock<std::mutex> l(qlock);
1740 while (true) {
1741 // set flag before the check because the condition
1742 // may become true outside qlock, and we need to make
1743 // sure those threads see waiters and signal qcond.
1744 ++kv_submitted_waiters;
1745 if (_is_all_kv_submitted()) {
1746 return;
1747 }
1748 qcond.wait(l);
1749 --kv_submitted_waiters;
1750 }
1751 }
1752
1753 bool flush_commit(Context *c) override {
1754 std::lock_guard<std::mutex> l(qlock);
1755 if (q.empty()) {
1756 return true;
1757 }
1758 TransContext *txc = &q.back();
1759 if (txc->state >= TransContext::STATE_KV_DONE) {
1760 return true;
1761 }
1762 txc->oncommits.push_back(c);
1763 return false;
1764 }
1765 };
1766
1767 typedef boost::intrusive::list<
1768 OpSequencer,
1769 boost::intrusive::member_hook<
1770 OpSequencer,
1771 boost::intrusive::list_member_hook<>,
1772 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1773
1774 struct KVSyncThread : public Thread {
1775 BlueStore *store;
1776 explicit KVSyncThread(BlueStore *s) : store(s) {}
1777 void *entry() override {
1778 store->_kv_sync_thread();
1779 return NULL;
1780 }
1781 };
1782 struct KVFinalizeThread : public Thread {
1783 BlueStore *store;
1784 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1785 void *entry() {
1786 store->_kv_finalize_thread();
1787 return NULL;
1788 }
1789 };
1790
1791 struct DBHistogram {
1792 struct value_dist {
1793 uint64_t count;
1794 uint32_t max_len;
1795 };
1796
1797 struct key_dist {
1798 uint64_t count;
1799 uint32_t max_len;
1800 map<int, struct value_dist> val_map; ///< slab id to count, max length of value and key
1801 };
1802
1803 map<string, map<int, struct key_dist> > key_hist;
1804 map<int, uint64_t> value_hist;
1805 int get_key_slab(size_t sz);
1806 string get_key_slab_to_range(int slab);
1807 int get_value_slab(size_t sz);
1808 string get_value_slab_to_range(int slab);
1809 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1810 const string &prefix, size_t key_size, size_t value_size);
1811 void dump(Formatter *f);
1812 };
1813
1814 // --------------------------------------------------------
1815 // members
1816 private:
1817 BlueFS *bluefs = nullptr;
1818 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1819 bool bluefs_single_shared_device = true;
1820 utime_t bluefs_last_balance;
1821
1822 KeyValueDB *db = nullptr;
1823 BlockDevice *bdev = nullptr;
1824 std::string freelist_type;
1825 FreelistManager *fm = nullptr;
1826 Allocator *alloc = nullptr;
1827 uuid_d fsid;
1828 int path_fd = -1; ///< open handle to $path
1829 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1830 bool mounted = false;
1831
1832 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1833 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1834
1835 vector<Cache*> cache_shards;
1836
1837 std::mutex osr_lock; ///< protect osr_set
1838 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1839
1840 std::atomic<uint64_t> nid_last = {0};
1841 std::atomic<uint64_t> nid_max = {0};
1842 std::atomic<uint64_t> blobid_last = {0};
1843 std::atomic<uint64_t> blobid_max = {0};
1844
1845 Throttle throttle_bytes; ///< submit to commit
1846 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1847
1848 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1849 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1850
1851 std::mutex deferred_lock;
1852 std::atomic<uint64_t> deferred_seq = {0};
1853 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1854 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1855 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1856 Finisher deferred_finisher;
1857
1858 int m_finisher_num = 1;
1859 vector<Finisher*> finishers;
1860
1861 KVSyncThread kv_sync_thread;
1862 std::mutex kv_lock;
1863 std::condition_variable kv_cond;
1864 bool _kv_only = false;
1865 bool kv_sync_started = false;
1866 bool kv_stop = false;
1867 bool kv_finalize_started = false;
1868 bool kv_finalize_stop = false;
1869 deque<TransContext*> kv_queue; ///< ready, already submitted
1870 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1871 deque<TransContext*> kv_committing; ///< currently syncing
1872 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1873 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1874
1875 KVFinalizeThread kv_finalize_thread;
1876 std::mutex kv_finalize_lock;
1877 std::condition_variable kv_finalize_cond;
1878 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1879 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1880
1881 PerfCounters *logger = nullptr;
1882
1883 list<CollectionRef> removed_collections;
1884
1885 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1886 set<ghobject_t> debug_data_error_objects;
1887 set<ghobject_t> debug_mdata_error_objects;
1888
1889 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1890
1891 uint64_t block_size = 0; ///< block size of block device (power of 2)
1892 uint64_t block_mask = 0; ///< mask to get just the block offset
1893 size_t block_size_order = 0; ///< bits to shift to get block size
1894
1895 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1896 ///< bits for min_alloc_size
1897 uint8_t min_alloc_size_order = 0;
1898 static_assert(std::numeric_limits<uint8_t>::max() >
1899 std::numeric_limits<decltype(min_alloc_size)>::digits,
1900 "not enough bits for min_alloc_size");
1901
1902 ///< maximum allocation unit (power of 2)
1903 std::atomic<uint64_t> max_alloc_size = {0};
1904
1905 ///< op count threshold for forced deferred writes
1906 std::atomic<int> deferred_batch_ops = {0};
1907
1908 ///< size threshold for forced deferred writes
1909 std::atomic<uint64_t> prefer_deferred_size = {0};
1910
1911 ///< approx cost per io, in bytes
1912 std::atomic<uint64_t> throttle_cost_per_io = {0};
1913
1914 std::atomic<Compressor::CompressionMode> comp_mode =
1915 {Compressor::COMP_NONE}; ///< compression mode
1916 CompressorRef compressor;
1917 std::atomic<uint64_t> comp_min_blob_size = {0};
1918 std::atomic<uint64_t> comp_max_blob_size = {0};
1919
1920 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1921
1922 uint64_t kv_ios = 0;
1923 uint64_t kv_throttle_costs = 0;
1924
1925 // cache trim control
1926 uint64_t cache_size = 0; ///< total cache size
1927 float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
1928 float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1929 float cache_data_ratio = 0; ///< cache ratio dedicated to object data
1930
1931 std::mutex vstatfs_lock;
1932 volatile_statfs vstatfs;
1933
1934 struct MempoolThread : public Thread {
1935 BlueStore *store;
1936 Cond cond;
1937 Mutex lock;
1938 bool stop = false;
1939 public:
1940 explicit MempoolThread(BlueStore *s)
1941 : store(s),
1942 lock("BlueStore::MempoolThread::lock") {}
1943 void *entry() override;
1944 void init() {
1945 assert(stop == false);
1946 create("bstore_mempool");
1947 }
1948 void shutdown() {
1949 lock.Lock();
1950 stop = true;
1951 cond.Signal();
1952 lock.Unlock();
1953 join();
1954 }
1955 } mempool_thread;
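// Editorial note: the mempool thread ("bstore_mempool") is a background
// helper that periodically rebalances/trims the in-memory caches against the
// cache_size and ratio settings above; init() and shutdown() are expected to
// bracket mount/umount (a summary; the entry() loop in the .cc is
// authoritative).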
1956
1957 // --------------------------------------------------------
1958 // private methods
1959
1960 void _init_logger();
1961 void _shutdown_logger();
1962 int _reload_logger();
1963
1964 int _open_path();
1965 void _close_path();
1966 int _open_fsid(bool create);
1967 int _lock_fsid();
1968 int _read_fsid(uuid_d *f);
1969 int _write_fsid();
1970 void _close_fsid();
1971 void _set_alloc_sizes();
1972 void _set_blob_size();
1973 void _set_finisher_num();
1974
1975 int _open_bdev(bool create);
1976 void _close_bdev();
1977 int _open_db(bool create);
1978 void _close_db();
1979 int _open_fm(bool create);
1980 void _close_fm();
1981 int _open_alloc();
1982 void _close_alloc();
1983 int _open_collections(int *errors=0);
1984 void _close_collections();
1985
1986 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1987 bool create);
1988
1989 public:
1990 static int _write_bdev_label(CephContext* cct,
1991 string path, bluestore_bdev_label_t label);
1992 static int _read_bdev_label(CephContext* cct, string path,
1993 bluestore_bdev_label_t *label);
1994 private:
1995 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1996 bool create);
1997
1998 int _open_super_meta();
1999
2000 void _open_statfs();
2001
2002 int _reconcile_bluefs_freespace();
2003 int _balance_bluefs_freespace(PExtentVector *extents);
2004 void _commit_bluefs_freespace(const PExtentVector& extents);
2005
2006 CollectionRef _get_collection(const coll_t& cid);
2007 void _queue_reap_collection(CollectionRef& c);
2008 void _reap_collections();
2009 void _update_cache_logger();
2010
2011 void _assign_nid(TransContext *txc, OnodeRef o);
2012 uint64_t _assign_blobid(TransContext *txc);
2013
2014 void _dump_onode(const OnodeRef& o, int log_level=30);
2015 void _dump_extent_map(ExtentMap& em, int log_level=30);
2016 void _dump_transaction(Transaction *t, int log_level = 30);
2017
2018 TransContext *_txc_create(OpSequencer *osr);
2019 void _txc_update_store_statfs(TransContext *txc);
2020 void _txc_add_transaction(TransContext *txc, Transaction *t);
2021 void _txc_calc_cost(TransContext *txc);
2022 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2023 void _txc_state_proc(TransContext *txc);
2024 void _txc_aio_submit(TransContext *txc);
2025 public:
2026 void txc_aio_finish(void *p) {
2027 _txc_state_proc(static_cast<TransContext*>(p));
2028 }
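// Editorial note: txc_aio_finish() is used as the completion hook for a
// transaction's block-device aio (see _txc_aio_submit); it simply re-enters
// the state machine via _txc_state_proc() so the txc can advance.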
2029 private:
2030 void _txc_finish_io(TransContext *txc);
2031 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2032 void _txc_applied_kv(TransContext *txc);
2033 void _txc_committed_kv(TransContext *txc);
2034 void _txc_finish(TransContext *txc);
2035 void _txc_release_alloc(TransContext *txc);
2036
2037 void _osr_drain_preceding(TransContext *txc);
2038 void _osr_drain_all();
2039 void _osr_unregister_all();
2040
2041 void _kv_start();
2042 void _kv_stop();
2043 void _kv_sync_thread();
2044 void _kv_finalize_thread();
2045
2046 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2047 void _deferred_queue(TransContext *txc);
2048 public:
2049 void deferred_try_submit();
2050 private:
2051 void _deferred_submit_unlock(OpSequencer *osr);
2052 void _deferred_aio_finish(OpSequencer *osr);
2053 int _deferred_replay();
2054
2055 public:
2056 using mempool_dynamic_bitset =
2057 boost::dynamic_bitset<uint64_t,
2058 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2059
2060 private:
2061 int _fsck_check_extents(
2062 const ghobject_t& oid,
2063 const PExtentVector& extents,
2064 bool compressed,
2065 mempool_dynamic_bitset &used_blocks,
2066 uint64_t granularity,
2067 store_statfs_t& expected_statfs);
2068
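// Editorial note: _buffer_cache_write() below is a write-through helper for
// the blob buffer cache: it stages the new bytes in the blob's shared cache
// and records the shared blob on the txc (shared_blobs_written) so the
// cached buffers can be finalized once the transaction reaches the
// appropriate state (a summary; the .cc has the exact sequencing).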
2069 void _buffer_cache_write(
2070 TransContext *txc,
2071 BlobRef b,
2072 uint64_t offset,
2073 bufferlist& bl,
2074 unsigned flags) {
2075 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2076 flags);
2077 txc->shared_blobs_written.insert(b->shared_blob);
2078 }
2079
2080 int _collection_list(
2081 Collection *c, const ghobject_t& start, const ghobject_t& end,
2082 int max, vector<ghobject_t> *ls, ghobject_t *next);
2083
2084 template <typename T, typename F>
2085 T select_option(const std::string& opt_name, T val1, F f) {
2086 // NB: opt_name reserved for future use
2087 boost::optional<T> val2 = f();
2088 if (val2) {
2089 return *val2;
2090 }
2091 return val1;
2092 }
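// Illustrative usage only (hypothetical names/values): select_option()
// returns the lambda's value when it yields an engaged boost::optional and
// otherwise falls back to the supplied default, e.g. letting a per-pool
// setting override a store-wide one:
//
//   uint64_t blob_size = select_option(
//     "compression_max_blob_size", store_default_blob_size,
//     [&]() -> boost::optional<uint64_t> {
//       if (per_pool_override > 0)
//         return boost::optional<uint64_t>(per_pool_override);
//       return boost::optional<uint64_t>();
//     });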
2093
2094 void _apply_padding(uint64_t head_pad,
2095 uint64_t tail_pad,
2096 bufferlist& padded);
2097
2098 // -- ondisk version ---
2099 public:
2100 const int32_t latest_ondisk_format = 2; ///< our version
2101 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2102 const int32_t min_compat_ondisk_format = 2; ///< who can read us
2103
2104 private:
2105 int32_t ondisk_format = 0; ///< value detected on mount
2106
2107 int _upgrade_super(); ///< upgrade (called during open_super)
2108 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2109
2110 // --- public interface ---
2111 public:
2112 BlueStore(CephContext *cct, const string& path);
2113 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // Ctor for unit tests only
2114 ~BlueStore() override;
2115
2116 string get_type() override {
2117 return "bluestore";
2118 }
2119
2120 bool needs_journal() override { return false; }
2121 bool wants_journal() override { return false; }
2122 bool allows_journal() override { return false; }
2123
2124 bool is_rotational() override;
2125 bool is_journal_rotational() override;
2126
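// Editorial note: get_default_device_class() below derives the class from
// the "bluestore_bdev_type" entry produced by collect_metadata(); in
// practice that is typically something like "hdd" or "ssd" depending on
// whether the main device is rotational (the exact strings come from the
// .cc).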
2127 string get_default_device_class() override {
2128 string device_class;
2129 map<string, string> metadata;
2130 collect_metadata(&metadata);
2131 auto it = metadata.find("bluestore_bdev_type");
2132 if (it != metadata.end()) {
2133 device_class = it->second;
2134 }
2135 return device_class;
2136 }
2137
2138 static int get_block_device_fsid(CephContext* cct, const string& path,
2139 uuid_d *fsid);
2140
2141 bool test_mount_in_use() override;
2142
2143 private:
2144 int _mount(bool kv_only);
2145 public:
2146 int mount() override {
2147 return _mount(false);
2148 }
2149 int umount() override;
2150
2151 int start_kv_only(KeyValueDB **pdb) {
2152 int r = _mount(true);
2153 if (r < 0)
2154 return r;
2155 *pdb = db;
2156 return 0;
2157 }
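// Illustrative usage only (sketch, hypothetical caller): start_kv_only()
// brings up just enough of the store to expose the KV database, e.g. for an
// offline inspection tool; how the kv-only state is torn down again is
// handled in the .cc and not shown here.
//
//   KeyValueDB *kvdb = nullptr;
//   if (store.start_kv_only(&kvdb) == 0) {
//     // ... read-only inspection of kvdb ...
//   }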
2158
2159 int write_meta(const std::string& key, const std::string& value) override;
2160 int read_meta(const std::string& key, std::string *value) override;
2161
2162
2163 int fsck(bool deep) override {
2164 return _fsck(deep, false);
2165 }
2166 int repair(bool deep) override {
2167 return _fsck(deep, true);
2168 }
2169 int _fsck(bool deep, bool repair);
2170
2171 void set_cache_shards(unsigned num) override;
2172
2173 int validate_hobject_key(const hobject_t &obj) const override {
2174 return 0;
2175 }
2176 unsigned get_max_attr_name_length() override {
2177 return 256; // arbitrary; there is no real limit internally
2178 }
2179
2180 int mkfs() override;
2181 int mkjournal() override {
2182 return 0;
2183 }
2184
2185 void get_db_statistics(Formatter *f) override;
2186 void generate_db_histogram(Formatter *f) override;
2187 void _flush_cache();
2188 void flush_cache() override;
2189 void dump_perf_counters(Formatter *f) override {
2190 f->open_object_section("perf_counters");
2191 logger->dump_formatted(f, false);
2192 f->close_section();
2193 }
2194
2195 void register_osr(OpSequencer *osr) {
2196 std::lock_guard<std::mutex> l(osr_lock);
2197 osr_set.insert(osr);
2198 }
2199 void unregister_osr(OpSequencer *osr) {
2200 std::lock_guard<std::mutex> l(osr_lock);
2201 osr_set.erase(osr);
2202 }
2203
2204 public:
2205 int statfs(struct store_statfs_t *buf) override;
2206
2207 void collect_metadata(map<string,string> *pm) override;
2208
2209 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2210 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2211 int set_collection_opts(
2212 const coll_t& cid,
2213 const pool_opts_t& opts) override;
2214 int stat(
2215 const coll_t& cid,
2216 const ghobject_t& oid,
2217 struct stat *st,
2218 bool allow_eio = false) override;
2219 int stat(
2220 CollectionHandle &c,
2221 const ghobject_t& oid,
2222 struct stat *st,
2223 bool allow_eio = false) override;
2224 int read(
2225 const coll_t& cid,
2226 const ghobject_t& oid,
2227 uint64_t offset,
2228 size_t len,
2229 bufferlist& bl,
2230 uint32_t op_flags = 0) override;
2231 int read(
2232 CollectionHandle &c,
2233 const ghobject_t& oid,
2234 uint64_t offset,
2235 size_t len,
2236 bufferlist& bl,
2237 uint32_t op_flags = 0) override;
2238 int _do_read(
2239 Collection *c,
2240 OnodeRef o,
2241 uint64_t offset,
2242 size_t len,
2243 bufferlist& bl,
2244 uint32_t op_flags = 0);
2245
2246 private:
2247 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2248 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2249 public:
2250 int fiemap(const coll_t& cid, const ghobject_t& oid,
2251 uint64_t offset, size_t len, bufferlist& bl) override;
2252 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2253 uint64_t offset, size_t len, bufferlist& bl) override;
2254 int fiemap(const coll_t& cid, const ghobject_t& oid,
2255 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2256 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2257 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2258
2259
2260 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2261 bufferptr& value) override;
2262 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2263 bufferptr& value) override;
2264
2265 int getattrs(const coll_t& cid, const ghobject_t& oid,
2266 map<string,bufferptr>& aset) override;
2267 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2268 map<string,bufferptr>& aset) override;
2269
2270 int list_collections(vector<coll_t>& ls) override;
2271
2272 CollectionHandle open_collection(const coll_t &c) override;
2273
2274 bool collection_exists(const coll_t& c) override;
2275 int collection_empty(const coll_t& c, bool *empty) override;
2276 int collection_bits(const coll_t& c) override;
2277
2278 int collection_list(const coll_t& cid,
2279 const ghobject_t& start,
2280 const ghobject_t& end,
2281 int max,
2282 vector<ghobject_t> *ls, ghobject_t *next) override;
2283 int collection_list(CollectionHandle &c,
2284 const ghobject_t& start,
2285 const ghobject_t& end,
2286 int max,
2287 vector<ghobject_t> *ls, ghobject_t *next) override;
2288
2289 int omap_get(
2290 const coll_t& cid, ///< [in] Collection containing oid
2291 const ghobject_t &oid, ///< [in] Object containing omap
2292 bufferlist *header, ///< [out] omap header
2293 map<string, bufferlist> *out ///< [out] Key to value map
2294 ) override;
2295 int omap_get(
2296 CollectionHandle &c, ///< [in] Collection containing oid
2297 const ghobject_t &oid, ///< [in] Object containing omap
2298 bufferlist *header, ///< [out] omap header
2299 map<string, bufferlist> *out ///< [out] Key to value map
2300 ) override;
2301
2302 /// Get omap header
2303 int omap_get_header(
2304 const coll_t& cid, ///< [in] Collection containing oid
2305 const ghobject_t &oid, ///< [in] Object containing omap
2306 bufferlist *header, ///< [out] omap header
2307 bool allow_eio = false ///< [in] don't assert on eio
2308 ) override;
2309 int omap_get_header(
2310 CollectionHandle &c, ///< [in] Collection containing oid
2311 const ghobject_t &oid, ///< [in] Object containing omap
2312 bufferlist *header, ///< [out] omap header
2313 bool allow_eio = false ///< [in] don't assert on eio
2314 ) override;
2315
2316 /// Get keys defined on oid
2317 int omap_get_keys(
2318 const coll_t& cid, ///< [in] Collection containing oid
2319 const ghobject_t &oid, ///< [in] Object containing omap
2320 set<string> *keys ///< [out] Keys defined on oid
2321 ) override;
2322 int omap_get_keys(
2323 CollectionHandle &c, ///< [in] Collection containing oid
2324 const ghobject_t &oid, ///< [in] Object containing omap
2325 set<string> *keys ///< [out] Keys defined on oid
2326 ) override;
2327
2328 /// Get key values
2329 int omap_get_values(
2330 const coll_t& cid, ///< [in] Collection containing oid
2331 const ghobject_t &oid, ///< [in] Object containing omap
2332 const set<string> &keys, ///< [in] Keys to get
2333 map<string, bufferlist> *out ///< [out] Returned keys and values
2334 ) override;
2335 int omap_get_values(
2336 CollectionHandle &c, ///< [in] Collection containing oid
2337 const ghobject_t &oid, ///< [in] Object containing omap
2338 const set<string> &keys, ///< [in] Keys to get
2339 map<string, bufferlist> *out ///< [out] Returned keys and values
2340 ) override;
2341
2342 /// Filters keys into out which are defined on oid
2343 int omap_check_keys(
2344 const coll_t& cid, ///< [in] Collection containing oid
2345 const ghobject_t &oid, ///< [in] Object containing omap
2346 const set<string> &keys, ///< [in] Keys to check
2347 set<string> *out ///< [out] Subset of keys defined on oid
2348 ) override;
2349 int omap_check_keys(
2350 CollectionHandle &c, ///< [in] Collection containing oid
2351 const ghobject_t &oid, ///< [in] Object containing omap
2352 const set<string> &keys, ///< [in] Keys to check
2353 set<string> *out ///< [out] Subset of keys defined on oid
2354 ) override;
2355
2356 ObjectMap::ObjectMapIterator get_omap_iterator(
2357 const coll_t& cid, ///< [in] collection
2358 const ghobject_t &oid ///< [in] object
2359 ) override;
2360 ObjectMap::ObjectMapIterator get_omap_iterator(
2361 CollectionHandle &c, ///< [in] collection
2362 const ghobject_t &oid ///< [in] object
2363 ) override;
2364
2365 void set_fsid(uuid_d u) override {
2366 fsid = u;
2367 }
2368 uuid_d get_fsid() override {
2369 return fsid;
2370 }
2371
2372 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2373 return num_objects * 300; // assuming per-object overhead is 300 bytes
2374 }
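// Worked example (editorial): with the fixed 300-byte estimate above,
// 1,000,000 objects are estimated to cost roughly 300,000,000 bytes
// (~286 MiB) of per-object overhead.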
2375
2376 struct BSPerfTracker {
2377 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2378 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2379
2380 objectstore_perf_stat_t get_cur_stats() const {
2381 objectstore_perf_stat_t ret;
2382 ret.os_commit_latency = os_commit_latency.current_avg();
2383 ret.os_apply_latency = os_apply_latency.current_avg();
2384 return ret;
2385 }
2386
2387 void update_from_perfcounters(PerfCounters &logger);
2388 } perf_tracker;
2389
2390 objectstore_perf_stat_t get_cur_stats() override {
2391 perf_tracker.update_from_perfcounters(*logger);
2392 return perf_tracker.get_cur_stats();
2393 }
2394 const PerfCounters* get_perf_counters() const override {
2395 return logger;
2396 }
2397
2398 int queue_transactions(
2399 Sequencer *osr,
2400 vector<Transaction>& tls,
2401 TrackedOpRef op = TrackedOpRef(),
2402 ThreadPool::TPHandle *handle = NULL) override;
2403
2404 // error injection
2405 void inject_data_error(const ghobject_t& o) override {
2406 RWLock::WLocker l(debug_read_error_lock);
2407 debug_data_error_objects.insert(o);
2408 }
2409 void inject_mdata_error(const ghobject_t& o) override {
2410 RWLock::WLocker l(debug_read_error_lock);
2411 debug_mdata_error_objects.insert(o);
2412 }
2413 void compact() override {
2414 assert(db);
2415 db->compact();
2416 }
2417 bool has_builtin_csum() const override {
2418 return true;
2419 }
2420
2421 private:
2422 bool _debug_data_eio(const ghobject_t& o) {
2423 if (!cct->_conf->bluestore_debug_inject_read_err) {
2424 return false;
2425 }
2426 RWLock::RLocker l(debug_read_error_lock);
2427 return debug_data_error_objects.count(o);
2428 }
2429 bool _debug_mdata_eio(const ghobject_t& o) {
2430 if (!cct->_conf->bluestore_debug_inject_read_err) {
2431 return false;
2432 }
2433 RWLock::RLocker l(debug_read_error_lock);
2434 return debug_mdata_error_objects.count(o);
2435 }
2436 void _debug_obj_on_delete(const ghobject_t& o) {
2437 if (cct->_conf->bluestore_debug_inject_read_err) {
2438 RWLock::WLocker l(debug_read_error_lock);
2439 debug_data_error_objects.erase(o);
2440 debug_mdata_error_objects.erase(o);
2441 }
2442 }
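// Illustrative usage only (test sketch, hypothetical object ids): when
// bluestore_debug_inject_read_err is enabled, marking an object makes later
// reads of its data or metadata report an error instead of succeeding:
//
//   store.inject_data_error(oid);         // subsequent data reads fail
//   store.inject_mdata_error(other_oid);  // subsequent metadata reads fail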
2443
2444 private:
2445
2446 // --------------------------------------------------------
2447 // read processing internal methods
2448 int _verify_csum(
2449 OnodeRef& o,
2450 const bluestore_blob_t* blob,
2451 uint64_t blob_xoffset,
2452 const bufferlist& bl,
2453 uint64_t logical_offset) const;
2454 int _decompress(bufferlist& source, bufferlist* result);
2455
2456
2457 // --------------------------------------------------------
2458 // write ops
2459
2460 struct WriteContext {
2461 bool buffered = false; ///< buffered write
2462 bool compress = false; ///< compressed write
2463 uint64_t target_blob_size = 0; ///< target (max) blob size
2464 unsigned csum_order = 0; ///< target checksum chunk order
2465
2466 old_extent_map_t old_extents; ///< must deref these blobs
2467
2468 struct write_item {
2469 uint64_t logical_offset; ///< write logical offset
2470 BlobRef b;
2471 uint64_t blob_length;
2472 uint64_t b_off;
2473 bufferlist bl;
2474 uint64_t b_off0; ///< original offset in a blob prior to padding
2475 uint64_t length0; ///< original data length prior to padding
2476
2477 bool mark_unused;
2478 bool new_blob; ///< whether a new blob was created
2479
2480 bool compressed = false;
2481 bufferlist compressed_bl;
2482 size_t compressed_len = 0;
2483
2484 write_item(
2485 uint64_t logical_offs,
2486 BlobRef b,
2487 uint64_t blob_len,
2488 uint64_t o,
2489 bufferlist& bl,
2490 uint64_t o0,
2491 uint64_t l0,
2492 bool _mark_unused,
2493 bool _new_blob)
2494 :
2495 logical_offset(logical_offs),
2496 b(b),
2497 blob_length(blob_len),
2498 b_off(o),
2499 bl(bl),
2500 b_off0(o0),
2501 length0(l0),
2502 mark_unused(_mark_unused),
2503 new_blob(_new_blob) {}
2504 };
2505 vector<write_item> writes; ///< blobs we're writing
2506
2507 /// partial clone of the context
2508 void fork(const WriteContext& other) {
2509 buffered = other.buffered;
2510 compress = other.compress;
2511 target_blob_size = other.target_blob_size;
2512 csum_order = other.csum_order;
2513 }
2514 void write(
2515 uint64_t loffs,
2516 BlobRef b,
2517 uint64_t blob_len,
2518 uint64_t o,
2519 bufferlist& bl,
2520 uint64_t o0,
2521 uint64_t len0,
2522 bool _mark_unused,
2523 bool _new_blob) {
2524 writes.emplace_back(loffs,
2525 b,
2526 blob_len,
2527 o,
2528 bl,
2529 o0,
2530 len0,
2531 _mark_unused,
2532 _new_blob);
2533 }
2534 /// Checks for writes to the same pextent within a blob
2535 bool has_conflict(
2536 BlobRef b,
2537 uint64_t loffs,
2538 uint64_t loffs_end,
2539 uint64_t min_alloc_size);
2540 };
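// Illustrative sketch only (hypothetical flow): the write path fills a
// WriteContext via _choose_write_options(), records each blob write with
// write(), and uses fork() when a secondary context (e.g. for garbage
// collection) should start from the same buffered/compress/blob-size/csum
// settings:
//
//   WriteContext wctx;
//   // _choose_write_options(c, o, fadvise_flags, &wctx);
//   WriteContext gc_wctx;
//   gc_wctx.fork(wctx);   // copy option fields only, not the queued writes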
2541
2542 void _do_write_small(
2543 TransContext *txc,
2544 CollectionRef &c,
2545 OnodeRef o,
2546 uint64_t offset, uint64_t length,
2547 bufferlist::iterator& blp,
2548 WriteContext *wctx);
2549 void _do_write_big(
2550 TransContext *txc,
2551 CollectionRef &c,
2552 OnodeRef o,
2553 uint64_t offset, uint64_t length,
2554 bufferlist::iterator& blp,
2555 WriteContext *wctx);
2556 int _do_alloc_write(
2557 TransContext *txc,
2558 CollectionRef c,
2559 OnodeRef o,
2560 WriteContext *wctx);
2561 void _wctx_finish(
2562 TransContext *txc,
2563 CollectionRef& c,
2564 OnodeRef o,
2565 WriteContext *wctx,
2566 set<SharedBlob*> *maybe_unshared_blobs=0);
2567
2568 int _do_transaction(Transaction *t,
2569 TransContext *txc,
2570 ThreadPool::TPHandle *handle);
2571
2572 int _write(TransContext *txc,
2573 CollectionRef& c,
2574 OnodeRef& o,
2575 uint64_t offset, size_t len,
2576 bufferlist& bl,
2577 uint32_t fadvise_flags);
2578 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2579 uint64_t chunk_size);
2580
2581 void _choose_write_options(CollectionRef& c,
2582 OnodeRef o,
2583 uint32_t fadvise_flags,
2584 WriteContext *wctx);
2585
2586 int _do_gc(TransContext *txc,
2587 CollectionRef& c,
2588 OnodeRef o,
2589 const GarbageCollector& gc,
2590 const WriteContext& wctx,
2591 uint64_t *dirty_start,
2592 uint64_t *dirty_end);
2593
2594 int _do_write(TransContext *txc,
2595 CollectionRef &c,
2596 OnodeRef o,
2597 uint64_t offset, uint64_t length,
2598 bufferlist& bl,
2599 uint32_t fadvise_flags);
2600 void _do_write_data(TransContext *txc,
2601 CollectionRef& c,
2602 OnodeRef o,
2603 uint64_t offset,
2604 uint64_t length,
2605 bufferlist& bl,
2606 WriteContext *wctx);
2607
2608 int _touch(TransContext *txc,
2609 CollectionRef& c,
2610 OnodeRef& o);
2611 int _do_zero(TransContext *txc,
2612 CollectionRef& c,
2613 OnodeRef& o,
2614 uint64_t offset, size_t len);
2615 int _zero(TransContext *txc,
2616 CollectionRef& c,
2617 OnodeRef& o,
2618 uint64_t offset, size_t len);
2619 void _do_truncate(TransContext *txc,
2620 CollectionRef& c,
2621 OnodeRef o,
2622 uint64_t offset,
2623 set<SharedBlob*> *maybe_unshared_blobs=0);
2624 int _truncate(TransContext *txc,
2625 CollectionRef& c,
2626 OnodeRef& o,
2627 uint64_t offset);
2628 int _remove(TransContext *txc,
2629 CollectionRef& c,
2630 OnodeRef& o);
2631 int _do_remove(TransContext *txc,
2632 CollectionRef& c,
2633 OnodeRef o);
2634 int _setattr(TransContext *txc,
2635 CollectionRef& c,
2636 OnodeRef& o,
2637 const string& name,
2638 bufferptr& val);
2639 int _setattrs(TransContext *txc,
2640 CollectionRef& c,
2641 OnodeRef& o,
2642 const map<string,bufferptr>& aset);
2643 int _rmattr(TransContext *txc,
2644 CollectionRef& c,
2645 OnodeRef& o,
2646 const string& name);
2647 int _rmattrs(TransContext *txc,
2648 CollectionRef& c,
2649 OnodeRef& o);
2650 void _do_omap_clear(TransContext *txc, uint64_t id);
2651 int _omap_clear(TransContext *txc,
2652 CollectionRef& c,
2653 OnodeRef& o);
2654 int _omap_setkeys(TransContext *txc,
2655 CollectionRef& c,
2656 OnodeRef& o,
2657 bufferlist& bl);
2658 int _omap_setheader(TransContext *txc,
2659 CollectionRef& c,
2660 OnodeRef& o,
2661 bufferlist& header);
2662 int _omap_rmkeys(TransContext *txc,
2663 CollectionRef& c,
2664 OnodeRef& o,
2665 bufferlist& bl);
2666 int _omap_rmkey_range(TransContext *txc,
2667 CollectionRef& c,
2668 OnodeRef& o,
2669 const string& first, const string& last);
2670 int _set_alloc_hint(
2671 TransContext *txc,
2672 CollectionRef& c,
2673 OnodeRef& o,
2674 uint64_t expected_object_size,
2675 uint64_t expected_write_size,
2676 uint32_t flags);
2677 int _do_clone_range(TransContext *txc,
2678 CollectionRef& c,
2679 OnodeRef& oldo,
2680 OnodeRef& newo,
2681 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2682 int _clone(TransContext *txc,
2683 CollectionRef& c,
2684 OnodeRef& oldo,
2685 OnodeRef& newo);
2686 int _clone_range(TransContext *txc,
2687 CollectionRef& c,
2688 OnodeRef& oldo,
2689 OnodeRef& newo,
2690 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2691 int _rename(TransContext *txc,
2692 CollectionRef& c,
2693 OnodeRef& oldo,
2694 OnodeRef& newo,
2695 const ghobject_t& new_oid);
2696 int _create_collection(TransContext *txc, const coll_t &cid,
2697 unsigned bits, CollectionRef *c);
2698 int _remove_collection(TransContext *txc, const coll_t &cid,
2699 CollectionRef *c);
2700 int _split_collection(TransContext *txc,
2701 CollectionRef& c,
2702 CollectionRef& d,
2703 unsigned bits, int rem);
2704 };
2705
2706 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2707 return out << *s.parent;
2708 }
2709
2710 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2711 o->get();
2712 }
2713 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2714 o->put();
2715 }
2716
2717 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2718 o->get();
2719 }
2720 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2721 o->put();
2722 }
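// Editorial note: these free functions are what let boost::intrusive_ptr
// instantiations over Onode and OpSequencer (the OnodeRef / OpSequencerRef
// handles used throughout this header) manage the objects' embedded
// reference counts via get()/put().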
2723
2724 #endif