1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::optimize()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8  // denominator N, i.e. the slop ratio is actually 1/N (see Buffer::maybe_rebuild)
56
57
58 enum {
59 l_bluestore_first = 732430,
60 l_bluestore_kv_flush_lat,
61 l_bluestore_kv_commit_lat,
62 l_bluestore_kv_lat,
63 l_bluestore_state_prepare_lat,
64 l_bluestore_state_aio_wait_lat,
65 l_bluestore_state_io_done_lat,
66 l_bluestore_state_kv_queued_lat,
67 l_bluestore_state_kv_committing_lat,
68 l_bluestore_state_kv_done_lat,
69 l_bluestore_state_deferred_queued_lat,
70 l_bluestore_state_deferred_aio_wait_lat,
71 l_bluestore_state_deferred_cleanup_lat,
72 l_bluestore_state_finishing_lat,
73 l_bluestore_state_done_lat,
74 l_bluestore_throttle_lat,
75 l_bluestore_submit_lat,
76 l_bluestore_commit_lat,
77 l_bluestore_read_lat,
78 l_bluestore_read_onode_meta_lat,
79 l_bluestore_read_wait_aio_lat,
80 l_bluestore_compress_lat,
81 l_bluestore_decompress_lat,
82 l_bluestore_csum_lat,
83 l_bluestore_compress_success_count,
84 l_bluestore_compress_rejected_count,
85 l_bluestore_write_pad_bytes,
86 l_bluestore_deferred_write_ops,
87 l_bluestore_deferred_write_bytes,
88 l_bluestore_write_penalty_read_ops,
89 l_bluestore_allocated,
90 l_bluestore_stored,
91 l_bluestore_compressed,
92 l_bluestore_compressed_allocated,
93 l_bluestore_compressed_original,
94 l_bluestore_onodes,
95 l_bluestore_onode_hits,
96 l_bluestore_onode_misses,
97 l_bluestore_onode_shard_hits,
98 l_bluestore_onode_shard_misses,
99 l_bluestore_extents,
100 l_bluestore_blobs,
101 l_bluestore_buffers,
102 l_bluestore_buffer_bytes,
103 l_bluestore_buffer_hit_bytes,
104 l_bluestore_buffer_miss_bytes,
105 l_bluestore_write_big,
106 l_bluestore_write_big_bytes,
107 l_bluestore_write_big_blobs,
108 l_bluestore_write_small,
109 l_bluestore_write_small_bytes,
110 l_bluestore_write_small_unused,
111 l_bluestore_write_small_deferred,
112 l_bluestore_write_small_pre_read,
113 l_bluestore_write_small_new,
114 l_bluestore_txc,
115 l_bluestore_onode_reshard,
116 l_bluestore_blob_split,
117 l_bluestore_extent_compress,
118 l_bluestore_gc_merged,
119 l_bluestore_read_eio,
120 l_bluestore_last
121 };
122
123 class BlueStore : public ObjectStore,
124 public md_config_obs_t {
125 // -----------------------------------------------------
126 // types
127 public:
128 // config observer
129 const char** get_tracked_conf_keys() const override;
130 void handle_conf_change(const struct md_config_t *conf,
131 const std::set<std::string> &changed) override;
132
133 void _set_csum();
134 void _set_compression();
135 void _set_throttle_params();
136 int _set_cache_sizes();
137
138 class TransContext;
139
140 typedef map<uint64_t, bufferlist> ready_regions_t;
141
142 struct BufferSpace;
143 struct Collection;
144 typedef boost::intrusive_ptr<Collection> CollectionRef;
145
146 struct AioContext {
147 virtual void aio_finish(BlueStore *store) = 0;
148 virtual ~AioContext() {}
149 };
150
151 /// cached buffer
152 struct Buffer {
153 MEMPOOL_CLASS_HELPERS();
154
155 enum {
156 STATE_EMPTY, ///< empty buffer -- used for cache history
157 STATE_CLEAN, ///< clean data that is up to date
158 STATE_WRITING, ///< data that is being written (io not yet complete)
159 };
160 static const char *get_state_name(int s) {
161 switch (s) {
162 case STATE_EMPTY: return "empty";
163 case STATE_CLEAN: return "clean";
164 case STATE_WRITING: return "writing";
165 default: return "???";
166 }
167 }
168 enum {
169 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
170 // NOTE: fix operator<< when you define a second flag
171 };
172 static const char *get_flag_name(int s) {
173 switch (s) {
174 case FLAG_NOCACHE: return "nocache";
175 default: return "???";
176 }
177 }
178
179 BufferSpace *space;
180 uint16_t state; ///< STATE_*
181 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
182 uint32_t flags; ///< FLAG_*
183 uint64_t seq;
184 uint32_t offset, length;
185 bufferlist data;
186
187 boost::intrusive::list_member_hook<> lru_item;
188 boost::intrusive::list_member_hook<> state_item;
189
190 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
191 unsigned f = 0)
192 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
193 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
194 unsigned f = 0)
195 : space(space), state(s), flags(f), seq(q), offset(o),
196 length(b.length()), data(b) {}
197
198 bool is_empty() const {
199 return state == STATE_EMPTY;
200 }
201 bool is_clean() const {
202 return state == STATE_CLEAN;
203 }
204 bool is_writing() const {
205 return state == STATE_WRITING;
206 }
207
208 uint32_t end() const {
209 return offset + length;
210 }
211
212 void truncate(uint32_t newlen) {
213 assert(newlen < length);
214 if (data.length()) {
215 bufferlist t;
216 t.substr_of(data, 0, newlen);
217 data.claim(t);
218 }
219 length = newlen;
220 }
221 void maybe_rebuild() {
222 if (data.length() &&
223 (data.get_num_buffers() > 1 ||
224 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
225 data.rebuild();
226 }
227 }
228
229 void dump(Formatter *f) const {
230 f->dump_string("state", get_state_name(state));
231 f->dump_unsigned("seq", seq);
232 f->dump_unsigned("offset", offset);
233 f->dump_unsigned("length", length);
234 f->dump_unsigned("data_length", data.length());
235 }
236 };
237
238 struct Cache;
239
240 /// map logical extent range (object) onto buffers
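///
/// Rough lifecycle (descriptive note; finish_write() is implemented out of
/// line): write() discards any overlapping buffers and installs a
/// STATE_WRITING buffer tagged with the caller's seq on the `writing`
/// list; when the io completes, finish_write(seq) either keeps it as a
/// cached STATE_CLEAN buffer or trims it if FLAG_NOCACHE was set.
/// did_read() installs already-clean data that was read back from disk.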
241 struct BufferSpace {
242 typedef boost::intrusive::list<
243 Buffer,
244 boost::intrusive::member_hook<
245 Buffer,
246 boost::intrusive::list_member_hook<>,
247 &Buffer::state_item> > state_list_t;
248
249 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
250 buffer_map;
251
252 // we use a bare intrusive list here instead of std::map because
253 // it uses less memory and we expect this to be very small (very
254 // few IOs in flight to the same Blob at the same time).
255 state_list_t writing; ///< writing buffers, sorted by seq, ascending
256
257 ~BufferSpace() {
258 assert(buffer_map.empty());
259 assert(writing.empty());
260 }
261
262 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
263 cache->_audit("_add_buffer start");
264 buffer_map[b->offset].reset(b);
265 if (b->is_writing()) {
266 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
267 if (writing.empty() || writing.rbegin()->seq <= b->seq) {
268 writing.push_back(*b);
269 } else {
270 auto it = writing.begin();
271 while (it->seq < b->seq) {
272 ++it;
273 }
274
275 assert(it->seq >= b->seq);
276 // note that this will insert b before it
277 // hence the order is maintained
278 writing.insert(it, *b);
279 }
280 } else {
281 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
282 cache->_add_buffer(b, level, near);
283 }
284 cache->_audit("_add_buffer end");
285 }
286 void _rm_buffer(Cache* cache, Buffer *b) {
287 _rm_buffer(cache, buffer_map.find(b->offset));
288 }
289 void _rm_buffer(Cache* cache,
290 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
291 assert(p != buffer_map.end());
292 cache->_audit("_rm_buffer start");
293 if (p->second->is_writing()) {
294 writing.erase(writing.iterator_to(*p->second));
295 } else {
296 cache->_rm_buffer(p->second.get());
297 }
298 buffer_map.erase(p);
299 cache->_audit("_rm_buffer end");
300 }
301
302 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
303 uint32_t offset) {
304 auto i = buffer_map.lower_bound(offset);
305 if (i != buffer_map.begin()) {
306 --i;
307 if (i->first + i->second->length <= offset)
308 ++i;
309 }
310 return i;
311 }
312
313 // must be called under protection of the Cache lock
314 void _clear(Cache* cache);
315
316 // return value is the highest cache_private of a trimmed buffer, or 0.
317 int discard(Cache* cache, uint32_t offset, uint32_t length) {
318 std::lock_guard<std::recursive_mutex> l(cache->lock);
319 return _discard(cache, offset, length);
320 }
321 int _discard(Cache* cache, uint32_t offset, uint32_t length);
322
323 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
324 unsigned flags) {
325 std::lock_guard<std::recursive_mutex> l(cache->lock);
326 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
327 flags);
328 b->cache_private = _discard(cache, offset, bl.length());
329 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
330 }
331 void finish_write(Cache* cache, uint64_t seq);
332 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
333 std::lock_guard<std::recursive_mutex> l(cache->lock);
334 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
335 b->cache_private = _discard(cache, offset, bl.length());
336 _add_buffer(cache, b, 1, nullptr);
337 }
338
339 void read(Cache* cache, uint32_t offset, uint32_t length,
340 BlueStore::ready_regions_t& res,
341 interval_set<uint32_t>& res_intervals);
342
343 void truncate(Cache* cache, uint32_t offset) {
344 discard(cache, offset, (uint32_t)-1 - offset);
345 }
346
347 void split(Cache* cache, size_t pos, BufferSpace &r);
348
349 void dump(Cache* cache, Formatter *f) const {
350 std::lock_guard<std::recursive_mutex> l(cache->lock);
351 f->open_array_section("buffers");
352 for (auto& i : buffer_map) {
353 f->open_object_section("buffer");
354 assert(i.first == i.second->offset);
355 i.second->dump(f);
356 f->close_section();
357 }
358 f->close_section();
359 }
360 };
361
362 struct SharedBlobSet;
363
364 /// in-memory shared blob state (incl cached buffers)
365 struct SharedBlob {
366 MEMPOOL_CLASS_HELPERS();
367
368 std::atomic_int nref = {0}; ///< reference count
369 bool loaded = false;
370
371 CollectionRef coll;
372 union {
373 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
374 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
375 };
376 BufferSpace bc; ///< buffer cache
377
378 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
379 if (get_cache()) {
380 get_cache()->add_blob();
381 }
382 }
383 SharedBlob(uint64_t i, Collection *_coll);
384 ~SharedBlob();
385
386 uint64_t get_sbid() const {
387 return loaded ? persistent->sbid : sbid_unloaded;
388 }
389
390 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
391 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
392
393 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
394
395 void get() {
396 ++nref;
397 }
398 void put();
399
400 /// get logical references
401 void get_ref(uint64_t offset, uint32_t length);
402
403 /// put logical references, and get back any released extents
404 void put_ref(uint64_t offset, uint32_t length,
405 PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
406
407 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
408 return l.get_sbid() == r.get_sbid();
409 }
410 inline Cache* get_cache() {
411 return coll ? coll->cache : nullptr;
412 }
413 inline SharedBlobSet* get_parent() {
414 return coll ? &(coll->shared_blob_set) : nullptr;
415 }
416 inline bool is_loaded() const {
417 return loaded;
418 }
419
420 };
421 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
422
423 /// a lookup table of SharedBlobs
424 struct SharedBlobSet {
425 std::mutex lock; ///< protect lookup, insertion, removal
426
427 // we use a bare pointer because we don't want to affect the ref
428 // count
429 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
430
431 SharedBlobRef lookup(uint64_t sbid) {
432 std::lock_guard<std::mutex> l(lock);
433 auto p = sb_map.find(sbid);
434 if (p == sb_map.end() ||
435 p->second->nref == 0) {
436 return nullptr;
437 }
438 return p->second;
439 }
440
441 void add(Collection* coll, SharedBlob *sb) {
442 std::lock_guard<std::mutex> l(lock);
443 sb_map[sb->get_sbid()] = sb;
444 sb->coll = coll;
445 }
446
447 void remove(SharedBlob *sb) {
448 std::lock_guard<std::mutex> l(lock);
449 assert(sb->get_parent() == this);
450 // only remove if it still points to us
451 auto p = sb_map.find(sb->get_sbid());
452 if (p != sb_map.end() &&
453 p->second == sb) {
454 sb_map.erase(p);
455 }
456 }
457
458 bool empty() {
459 std::lock_guard<std::mutex> l(lock);
460 return sb_map.empty();
461 }
462
463 void dump(CephContext *cct, int lvl);
464 };
465
466 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
467
468 /// in-memory blob metadata and associated cached buffers (if any)
469 struct Blob {
470 MEMPOOL_CLASS_HELPERS();
471
472 std::atomic_int nref = {0}; ///< reference count
473 int16_t id = -1; ///< id, for spanning blobs only, >= 0
474 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
475 SharedBlobRef shared_blob; ///< shared blob state (if any)
476
477 private:
478 mutable bluestore_blob_t blob; ///< decoded blob metadata
479 #ifdef CACHE_BLOB_BL
480 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
481 #endif
482 /// refs from this shard. ephemeral if id<0, persisted if spanning.
483 bluestore_blob_use_tracker_t used_in_blob;
484
485 public:
486
487 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
488 friend void intrusive_ptr_release(Blob *b) { b->put(); }
489
490 friend ostream& operator<<(ostream& out, const Blob &b);
491
492 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
493 return used_in_blob;
494 }
495 bool is_referenced() const {
496 return used_in_blob.is_not_empty();
497 }
498 uint32_t get_referenced_bytes() const {
499 return used_in_blob.get_referenced_bytes();
500 }
501
502 bool is_spanning() const {
503 return id >= 0;
504 }
505
506 bool can_split() const {
507 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
508 // splitting a BufferSpace writing list is too hard; don't try.
509 return shared_blob->bc.writing.empty() &&
510 used_in_blob.can_split() &&
511 get_blob().can_split();
512 }
513
514 bool can_split_at(uint32_t blob_offset) const {
515 return used_in_blob.can_split_at(blob_offset) &&
516 get_blob().can_split_at(blob_offset);
517 }
518
519 bool can_reuse_blob(uint32_t min_alloc_size,
520 uint32_t target_blob_size,
521 uint32_t b_offset,
522 uint32_t *length0);
523
524 void dup(Blob& o) {
525 o.shared_blob = shared_blob;
526 o.blob = blob;
527 #ifdef CACHE_BLOB_BL
528 o.blob_bl = blob_bl;
529 #endif
530 }
531
532 inline const bluestore_blob_t& get_blob() const {
533 return blob;
534 }
535 inline bluestore_blob_t& dirty_blob() {
536 #ifdef CACHE_BLOB_BL
537 blob_bl.clear();
538 #endif
539 return blob;
540 }
541
542 /// discard buffers for unallocated regions
543 void discard_unallocated(Collection *coll);
544
545 /// get logical references
546 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
547 /// put logical references, and get back any released extents
548 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
549 PExtentVector *r);
550
551 /// split the blob
552 void split(Collection *coll, uint32_t blob_offset, Blob *o);
553
554 void get() {
555 ++nref;
556 }
557 void put() {
558 if (--nref == 0)
559 delete this;
560 }
561
562
563 #ifdef CACHE_BLOB_BL
564 void _encode() const {
565 if (blob_bl.length() == 0 ) {
566 ::encode(blob, blob_bl);
567 } else {
568 assert(blob_bl.length());
569 }
570 }
571 void bound_encode(
572 size_t& p,
573 bool include_ref_map) const {
574 _encode();
575 p += blob_bl.length();
576 if (include_ref_map) {
577 used_in_blob.bound_encode(p);
578 }
579 }
580 void encode(
581 bufferlist::contiguous_appender& p,
582 bool include_ref_map) const {
583 _encode();
584 p.append(blob_bl);
585 if (include_ref_map) {
586 used_in_blob.encode(p);
587 }
588 }
589 void decode(
590 Collection */*coll*/,
591 bufferptr::iterator& p,
592 bool include_ref_map) {
593 const char *start = p.get_pos();
594 denc(blob, p);
595 const char *end = p.get_pos();
596 blob_bl.clear();
597 blob_bl.append(start, end - start);
598 if (include_ref_map) {
599 used_in_blob.decode(p);
600 }
601 }
602 #else
603 void bound_encode(
604 size_t& p,
605 uint64_t struct_v,
606 uint64_t sbid,
607 bool include_ref_map) const {
608 denc(blob, p, struct_v);
609 if (blob.is_shared()) {
610 denc(sbid, p);
611 }
612 if (include_ref_map) {
613 used_in_blob.bound_encode(p);
614 }
615 }
616 void encode(
617 bufferlist::contiguous_appender& p,
618 uint64_t struct_v,
619 uint64_t sbid,
620 bool include_ref_map) const {
621 denc(blob, p, struct_v);
622 if (blob.is_shared()) {
623 denc(sbid, p);
624 }
625 if (include_ref_map) {
626 used_in_blob.encode(p);
627 }
628 }
629 void decode(
630 Collection *coll,
631 bufferptr::iterator& p,
632 uint64_t struct_v,
633 uint64_t* sbid,
634 bool include_ref_map);
635 #endif
636 };
637 typedef boost::intrusive_ptr<Blob> BlobRef;
638 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
639
640 /// a logical extent, pointing to (some portion of) a blob
641 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
642 struct Extent : public ExtentBase {
643 MEMPOOL_CLASS_HELPERS();
644
645 uint32_t logical_offset = 0; ///< logical offset
646 uint32_t blob_offset = 0; ///< blob offset
647 uint32_t length = 0; ///< length
648 BlobRef blob; ///< the blob with our data
649
650 /// ctor for lookup only
651 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
652 /// ctor for delayed initialization (see decode_some())
653 explicit Extent() : ExtentBase() {
654 }
655 /// ctor for general usage
656 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
657 : ExtentBase(),
658 logical_offset(lo), blob_offset(o), length(l) {
659 assign_blob(b);
660 }
661 ~Extent() {
662 if (blob) {
663 blob->shared_blob->get_cache()->rm_extent();
664 }
665 }
666
667 void assign_blob(const BlobRef& b) {
668 assert(!blob);
669 blob = b;
670 blob->shared_blob->get_cache()->add_extent();
671 }
672
673 // comparators for intrusive_set
674 friend bool operator<(const Extent &a, const Extent &b) {
675 return a.logical_offset < b.logical_offset;
676 }
677 friend bool operator>(const Extent &a, const Extent &b) {
678 return a.logical_offset > b.logical_offset;
679 }
680 friend bool operator==(const Extent &a, const Extent &b) {
681 return a.logical_offset == b.logical_offset;
682 }
683
684 uint32_t blob_start() const {
685 return logical_offset - blob_offset;
686 }
687
688 uint32_t blob_end() const {
689 return blob_start() + blob->get_blob().get_logical_length();
690 }
691
692 uint32_t logical_end() const {
693 return logical_offset + length;
694 }
695
696 // return true if any piece of the blob is out of
697 // the given range [o, o + l].
698 bool blob_escapes_range(uint32_t o, uint32_t l) const {
699 return blob_start() < o || blob_end() > o + l;
700 }
701 };
702 typedef boost::intrusive::set<Extent> extent_map_t;
703
704
705 friend ostream& operator<<(ostream& out, const Extent& e);
706
707 struct OldExtent {
708 boost::intrusive::list_member_hook<> old_extent_item;
709 Extent e;
710 PExtentVector r;
711 bool blob_empty; // flag to track the last removed extent that makes blob
712 // empty - required to update compression stat properly
713 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
714 : e(lo, o, l, b), blob_empty(false) {
715 }
716 static OldExtent* create(CollectionRef c,
717 uint32_t lo,
718 uint32_t o,
719 uint32_t l,
720 BlobRef& b);
721 };
722 typedef boost::intrusive::list<
723 OldExtent,
724 boost::intrusive::member_hook<
725 OldExtent,
726 boost::intrusive::list_member_hook<>,
727 &OldExtent::old_extent_item> > old_extent_map_t;
728
729 struct Onode;
730
731 /// a sharded extent map, mapping offsets to lextents to blobs
732 struct ExtentMap {
733 Onode *onode;
734 extent_map_t extent_map; ///< map of Extents to Blobs
735 blob_map_t spanning_blob_map; ///< blobs that span shards
736
737 struct Shard {
738 bluestore_onode_t::shard_info *shard_info = nullptr;
739 unsigned extents = 0; ///< count extents in this shard
740 bool loaded = false; ///< true if shard is loaded
741 bool dirty = false; ///< true if shard is dirty and needs reencoding
742 };
743 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
744
745 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
746
747 uint32_t needs_reshard_begin = 0;
748 uint32_t needs_reshard_end = 0;
749
750 bool needs_reshard() const {
751 return needs_reshard_end > needs_reshard_begin;
752 }
753 void clear_needs_reshard() {
754 needs_reshard_begin = needs_reshard_end = 0;
755 }
756 void request_reshard(uint32_t begin, uint32_t end) {
757 if (begin < needs_reshard_begin) {
758 needs_reshard_begin = begin;
759 }
760 if (end > needs_reshard_end) {
761 needs_reshard_end = end;
762 }
763 }
764
765 struct DeleteDisposer {
766 void operator()(Extent *e) { delete e; }
767 };
768
769 ExtentMap(Onode *o);
770 ~ExtentMap() {
771 extent_map.clear_and_dispose(DeleteDisposer());
772 }
773
774 void clear() {
775 extent_map.clear_and_dispose(DeleteDisposer());
776 shards.clear();
777 inline_bl.clear();
778 clear_needs_reshard();
779 }
780
781 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
782 unsigned *pn);
783 unsigned decode_some(bufferlist& bl);
784
785 void bound_encode_spanning_blobs(size_t& p);
786 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
787 void decode_spanning_blobs(bufferptr::iterator& p);
788
789 BlobRef get_spanning_blob(int id) {
790 auto p = spanning_blob_map.find(id);
791 assert(p != spanning_blob_map.end());
792 return p->second;
793 }
794
795 void update(KeyValueDB::Transaction t, bool force);
796 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
797 void reshard(
798 KeyValueDB *db,
799 KeyValueDB::Transaction t);
800
801 /// initialize Shards from the onode
802 void init_shards(bool loaded, bool dirty);
803
804 /// return index of shard containing offset
805 /// or -1 if not found
806 int seek_shard(uint32_t offset) {
807 size_t end = shards.size();
808 size_t mid, left = 0;
809 size_t right = end; // one past the right end
810
811 while (left < right) {
812 mid = left + (right - left) / 2;
813 if (offset >= shards[mid].shard_info->offset) {
814 size_t next = mid + 1;
815 if (next >= end || offset < shards[next].shard_info->offset)
816 return mid;
817 //continue to search forwards
818 left = next;
819 } else {
820 //continue to search backwards
821 right = mid;
822 }
823 }
824
825 return -1; // not found
826 }
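// seek_shard example: with shard offsets {0x0, 0x10000, 0x20000},
// seek_shard(0x18000) returns 1 and seek_shard(0x20000) returns 2.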
827
828 /// check if a range spans a shard
829 bool spans_shard(uint32_t offset, uint32_t length) {
830 if (shards.empty()) {
831 return false;
832 }
833 int s = seek_shard(offset);
834 assert(s >= 0);
835 if (s == (int)shards.size() - 1) {
836 return false; // last shard
837 }
838 if (offset + length <= shards[s+1].shard_info->offset) {
839 return false;
840 }
841 return true;
842 }
843
844 /// ensure that a range of the map is loaded
845 void fault_range(KeyValueDB *db,
846 uint32_t offset, uint32_t length);
847
848 /// ensure a range of the map is marked dirty
849 void dirty_range(uint32_t offset, uint32_t length);
850
851 /// for seek_lextent test
852 extent_map_t::iterator find(uint64_t offset);
853
854 /// seek to the first lextent including or after offset
855 extent_map_t::iterator seek_lextent(uint64_t offset);
856 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
857
858 /// add a new Extent
859 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
860 extent_map.insert(*new Extent(lo, o, l, b));
861 }
862
863 /// remove (and delete) an Extent
864 void rm(extent_map_t::iterator p) {
865 extent_map.erase_and_dispose(p, DeleteDisposer());
866 }
867
868 bool has_any_lextents(uint64_t offset, uint64_t length);
869
870 /// consolidate adjacent lextents in extent_map
871 int compress_extent_map(uint64_t offset, uint64_t length);
872
873 /// punch a logical hole. add lextents to deref to target list.
874 void punch_hole(CollectionRef &c,
875 uint64_t offset, uint64_t length,
876 old_extent_map_t *old_extents);
877
878 /// put new lextent into lextent_map overwriting existing ones if
879 /// any and update references accordingly
880 Extent *set_lextent(CollectionRef &c,
881 uint64_t logical_offset,
882 uint64_t offset, uint64_t length,
883 BlobRef b,
884 old_extent_map_t *old_extents);
885
886 /// split a blob (and referring extents)
887 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
888 };
889
890 /// Compressed Blob Garbage collector
891 /*
892 The primary idea of the collector is to estimate the difference between the
893 allocation units (AUs) currently occupied by compressed blobs and the new AUs
894 required to store that data uncompressed.
895 Estimation is performed for protrusive extents within a logical range
896 determined by concatenating the old_extents collection and the specific
897 (current) write request.
898 The root cause for using old_extents is the need to handle blob ref counts
899 properly. Old extents still hold blob refs, hence we need to traverse
900 the collection to determine whether a blob can be released.
901 Protrusive extents are extents that fit into the blob set in action
902 (ones that fall within the logical range described above) but are not
903 removed totally by the current write.
904 E.g. for
905 extent1 <loffs = 100, boffs = 100, len = 100> ->
906 blob1<compressed, len_on_disk=4096, logical_len=8192>
907 extent2 <loffs = 200, boffs = 200, len = 100> ->
908 blob2<raw, len_on_disk=4096, llen=4096>
909 extent3 <loffs = 300, boffs = 300, len = 100> ->
910 blob1<compressed, len_on_disk=4096, llen=8192>
911 extent4 <loffs = 4096, boffs = 0, len = 100> ->
912 blob3<raw, len_on_disk=4096, llen=4096>
913 write(300~100)
914 Protrusive extents are within the following ranges: <0~300, 400~8192-400>.
915 In this case the existing AUs that might be released due to GC (i.e. blob1's)
916 use 2x4K bytes.
917 The number of new AUs expected after GC is 0, since extent1 is to be merged into blob2.
918 Hence we should do a collect.
919 */
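
/*
 * Illustrative sketch only (not part of this header): how a write path
 * might consult the estimator.  The variables offset, length, onode,
 * old_extents and min_alloc_size below are hypothetical stand-ins.
 *
 *   GarbageCollector gc(cct);
 *   int64_t saved_aus = gc.estimate(offset, length,
 *                                   onode->extent_map, old_extents,
 *                                   min_alloc_size);
 *   if (saved_aus > 0) {
 *     // collecting is expected to free allocation units: read the
 *     // protrusive ranges below and re-write them uncompressed
 *     for (auto& e : gc.get_extents_to_collect()) {
 *       // e describes one range (offset/length) to re-write
 *     }
 *   }
 */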
920 class GarbageCollector
921 {
922 public:
923 /// return the number of allocation units that might be saved due to GC
924 int64_t estimate(
925 uint64_t offset,
926 uint64_t length,
927 const ExtentMap& extent_map,
928 const old_extent_map_t& old_extents,
929 uint64_t min_alloc_size);
930
931 /// return a collection of extents to perform GC on
932 const vector<AllocExtent>& get_extents_to_collect() const {
933 return extents_to_collect;
934 }
935 GarbageCollector(CephContext* _cct) : cct(_cct) {}
936
937 private:
938 struct BlobInfo {
939 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
940 int64_t expected_allocations = 0; ///< new alloc units required
941 ///< in case of gc fulfilled
942 bool collect_candidate = false; ///< indicate if blob has any extents
943 ///< eligible for GC.
944 extent_map_t::const_iterator first_lextent; ///< points to the first
945 ///< lextent referring to
946 ///< the blob if any.
947 ///< collect_candidate flag
948 ///< determines the validity
949 extent_map_t::const_iterator last_lextent; ///< points to the last
950 ///< lextent referring to
951 ///< the blob if any.
952
953 BlobInfo(uint64_t ref_bytes) :
954 referenced_bytes(ref_bytes) {
955 }
956 };
957 CephContext* cct;
958 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
959 ///< copies that are affected by the
960 ///< specific write
961
962 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
963 ///< be collected if GC takes place
964
965 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
966 ///< unit when traversing
967 ///< protrusive extents.
968 ///< Other extents mapped to
969 ///< this AU to be ignored
970 ///< (except the case where
971 ///< uncompressed extent follows
972 ///< compressed one - see below).
973 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
974 ///< caused expected_allocations
975 ///< counter increment at this blob.
976 ///< if uncompressed extent follows
977 ///< a decrement for the
978 ///< expected_allocations counter
979 ///< is needed
980 int64_t expected_allocations = 0; ///< new alloc units required in case
981 ///< of gc fulfilled
982 int64_t expected_for_release = 0; ///< alloc units currently used by
983 ///< compressed blobs that might
984 ///< be released after GC
985 uint64_t gc_start_offset; ///< starting offset for GC
986 uint64_t gc_end_offset; ///< ending offset for GC
987
988 protected:
989 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
990 uint64_t start_offset,
991 uint64_t end_offset,
992 uint64_t start_touch_offset,
993 uint64_t end_touch_offset,
994 uint64_t min_alloc_size);
995 };
996
997 struct OnodeSpace;
998
999 /// an in-memory object
1000 struct Onode {
1001 MEMPOOL_CLASS_HELPERS();
1002
1003 std::atomic_int nref; ///< reference count
1004 Collection *c;
1005
1006 ghobject_t oid;
1007
1008 /// key under PREFIX_OBJ where we are stored
1009 mempool::bluestore_cache_other::string key;
1010
1011 boost::intrusive::list_member_hook<> lru_item;
1012
1013 bluestore_onode_t onode; ///< metadata stored as value in kv store
1014 bool exists; ///< true if object logically exists
1015
1016 ExtentMap extent_map;
1017
1018 // track txc's that have not been committed to kv store (and whose
1019 // effects cannot be read via the kvdb read methods)
1020 std::atomic<int> flushing_count = {0};
1021 std::mutex flush_lock; ///< protect flush_txns
1022 std::condition_variable flush_cond; ///< wait here for uncommitted txns
1023
1024 Onode(Collection *c, const ghobject_t& o,
1025 const mempool::bluestore_cache_other::string& k)
1026 : nref(0),
1027 c(c),
1028 oid(o),
1029 key(k),
1030 exists(false),
1031 extent_map(this) {
1032 }
1033
1034 void flush();
1035 void get() {
1036 ++nref;
1037 }
1038 void put() {
1039 if (--nref == 0)
1040 delete this;
1041 }
1042 };
1043 typedef boost::intrusive_ptr<Onode> OnodeRef;
1044
1045
1046 /// a cache (shard) of onodes and buffers
1047 struct Cache {
1048 CephContext* cct;
1049 PerfCounters *logger;
1050 std::recursive_mutex lock; ///< protect lru and other structures
1051
1052 std::atomic<uint64_t> num_extents = {0};
1053 std::atomic<uint64_t> num_blobs = {0};
1054
1055 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1056
1057 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1058 virtual ~Cache() {}
1059
1060 virtual void _add_onode(OnodeRef& o, int level) = 0;
1061 virtual void _rm_onode(OnodeRef& o) = 0;
1062 virtual void _touch_onode(OnodeRef& o) = 0;
1063
1064 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1065 virtual void _rm_buffer(Buffer *b) = 0;
1066 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1067 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1068 virtual void _touch_buffer(Buffer *b) = 0;
1069
1070 virtual uint64_t _get_num_onodes() = 0;
1071 virtual uint64_t _get_buffer_bytes() = 0;
1072
1073 void add_extent() {
1074 ++num_extents;
1075 }
1076 void rm_extent() {
1077 --num_extents;
1078 }
1079
1080 void add_blob() {
1081 ++num_blobs;
1082 }
1083 void rm_blob() {
1084 --num_blobs;
1085 }
1086
1087 void trim(uint64_t target_bytes,
1088 float target_meta_ratio,
1089 float target_data_ratio,
1090 float bytes_per_onode);
1091
1092 void trim_all();
1093
1094 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1095
1096 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1097 uint64_t *blobs,
1098 uint64_t *buffers,
1099 uint64_t *bytes) = 0;
1100
1101 bool empty() {
1102 std::lock_guard<std::recursive_mutex> l(lock);
1103 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1104 }
1105
1106 #ifdef DEBUG_CACHE
1107 virtual void _audit(const char *s) = 0;
1108 #else
1109 void _audit(const char *s) { /* no-op */ }
1110 #endif
1111 };
1112
1113 /// simple LRU cache for onodes and buffers
1114 struct LRUCache : public Cache {
1115 private:
1116 typedef boost::intrusive::list<
1117 Onode,
1118 boost::intrusive::member_hook<
1119 Onode,
1120 boost::intrusive::list_member_hook<>,
1121 &Onode::lru_item> > onode_lru_list_t;
1122 typedef boost::intrusive::list<
1123 Buffer,
1124 boost::intrusive::member_hook<
1125 Buffer,
1126 boost::intrusive::list_member_hook<>,
1127 &Buffer::lru_item> > buffer_lru_list_t;
1128
1129 onode_lru_list_t onode_lru;
1130
1131 buffer_lru_list_t buffer_lru;
1132 uint64_t buffer_size = 0;
1133
1134 public:
1135 LRUCache(CephContext* cct) : Cache(cct) {}
1136 uint64_t _get_num_onodes() override {
1137 return onode_lru.size();
1138 }
1139 void _add_onode(OnodeRef& o, int level) override {
1140 if (level > 0)
1141 onode_lru.push_front(*o);
1142 else
1143 onode_lru.push_back(*o);
1144 }
1145 void _rm_onode(OnodeRef& o) override {
1146 auto q = onode_lru.iterator_to(*o);
1147 onode_lru.erase(q);
1148 }
1149 void _touch_onode(OnodeRef& o) override;
1150
1151 uint64_t _get_buffer_bytes() override {
1152 return buffer_size;
1153 }
1154 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1155 if (near) {
1156 auto q = buffer_lru.iterator_to(*near);
1157 buffer_lru.insert(q, *b);
1158 } else if (level > 0) {
1159 buffer_lru.push_front(*b);
1160 } else {
1161 buffer_lru.push_back(*b);
1162 }
1163 buffer_size += b->length;
1164 }
1165 void _rm_buffer(Buffer *b) override {
1166 assert(buffer_size >= b->length);
1167 buffer_size -= b->length;
1168 auto q = buffer_lru.iterator_to(*b);
1169 buffer_lru.erase(q);
1170 }
1171 void _move_buffer(Cache *src, Buffer *b) override {
1172 src->_rm_buffer(b);
1173 _add_buffer(b, 0, nullptr);
1174 }
1175 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1176 assert((int64_t)buffer_size + delta >= 0);
1177 buffer_size += delta;
1178 }
1179 void _touch_buffer(Buffer *b) override {
1180 auto p = buffer_lru.iterator_to(*b);
1181 buffer_lru.erase(p);
1182 buffer_lru.push_front(*b);
1183 _audit("_touch_buffer end");
1184 }
1185
1186 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1187
1188 void add_stats(uint64_t *onodes, uint64_t *extents,
1189 uint64_t *blobs,
1190 uint64_t *buffers,
1191 uint64_t *bytes) override {
1192 std::lock_guard<std::recursive_mutex> l(lock);
1193 *onodes += onode_lru.size();
1194 *extents += num_extents;
1195 *blobs += num_blobs;
1196 *buffers += buffer_lru.size();
1197 *bytes += buffer_size;
1198 }
1199
1200 #ifdef DEBUG_CACHE
1201 void _audit(const char *s) override;
1202 #endif
1203 };
1204
1205 // 2Q cache for buffers, LRU for onodes
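// Descriptive note (approximate behaviour; details live in the .cc):
// newly cached buffers generally enter the "A1in" warm_in list; when
// evicted from warm_in the data is dropped but an empty entry is kept on
// "A1out" warm_out, and re-caching data for such an entry promotes it to
// the "Am" hot list.  This is the classic 2Q replacement policy.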
1206 struct TwoQCache : public Cache {
1207 private:
1208 // stick with LRU for onodes for now (fixme?)
1209 typedef boost::intrusive::list<
1210 Onode,
1211 boost::intrusive::member_hook<
1212 Onode,
1213 boost::intrusive::list_member_hook<>,
1214 &Onode::lru_item> > onode_lru_list_t;
1215 typedef boost::intrusive::list<
1216 Buffer,
1217 boost::intrusive::member_hook<
1218 Buffer,
1219 boost::intrusive::list_member_hook<>,
1220 &Buffer::lru_item> > buffer_list_t;
1221
1222 onode_lru_list_t onode_lru;
1223
1224 buffer_list_t buffer_hot; ///< "Am" hot buffers
1225 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1226 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1227 uint64_t buffer_bytes = 0; ///< bytes
1228
1229 enum {
1230 BUFFER_NEW = 0,
1231 BUFFER_WARM_IN, ///< in buffer_warm_in
1232 BUFFER_WARM_OUT, ///< in buffer_warm_out
1233 BUFFER_HOT, ///< in buffer_hot
1234 BUFFER_TYPE_MAX
1235 };
1236
1237 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1238
1239 public:
1240 TwoQCache(CephContext* cct) : Cache(cct) {}
1241 uint64_t _get_num_onodes() override {
1242 return onode_lru.size();
1243 }
1244 void _add_onode(OnodeRef& o, int level) override {
1245 if (level > 0)
1246 onode_lru.push_front(*o);
1247 else
1248 onode_lru.push_back(*o);
1249 }
1250 void _rm_onode(OnodeRef& o) override {
1251 auto q = onode_lru.iterator_to(*o);
1252 onode_lru.erase(q);
1253 }
1254 void _touch_onode(OnodeRef& o) override;
1255
1256 uint64_t _get_buffer_bytes() override {
1257 return buffer_bytes;
1258 }
1259 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1260 void _rm_buffer(Buffer *b) override;
1261 void _move_buffer(Cache *src, Buffer *b) override;
1262 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1263 void _touch_buffer(Buffer *b) override {
1264 switch (b->cache_private) {
1265 case BUFFER_WARM_IN:
1266 // do nothing (somewhat counter-intuitively!)
1267 break;
1268 case BUFFER_WARM_OUT:
1269 // move from warm_out to hot LRU
1270 assert(0 == "this happens via discard hint");
1271 break;
1272 case BUFFER_HOT:
1273 // move to front of hot LRU
1274 buffer_hot.erase(buffer_hot.iterator_to(*b));
1275 buffer_hot.push_front(*b);
1276 break;
1277 }
1278 _audit("_touch_buffer end");
1279 }
1280
1281 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1282
1283 void add_stats(uint64_t *onodes, uint64_t *extents,
1284 uint64_t *blobs,
1285 uint64_t *buffers,
1286 uint64_t *bytes) override {
1287 std::lock_guard<std::recursive_mutex> l(lock);
1288 *onodes += onode_lru.size();
1289 *extents += num_extents;
1290 *blobs += num_blobs;
1291 *buffers += buffer_hot.size() + buffer_warm_in.size();
1292 *bytes += buffer_bytes;
1293 }
1294
1295 #ifdef DEBUG_CACHE
1296 void _audit(const char *s) override;
1297 #endif
1298 };
1299
1300 struct OnodeSpace {
1301 private:
1302 Cache *cache;
1303
1304 /// forward lookups
1305 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1306
1307 friend class Collection; // for split_cache()
1308
1309 public:
1310 OnodeSpace(Cache *c) : cache(c) {}
1311 ~OnodeSpace() {
1312 clear();
1313 }
1314
1315 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1316 OnodeRef lookup(const ghobject_t& o);
1317 void remove(const ghobject_t& oid) {
1318 onode_map.erase(oid);
1319 }
1320 void rename(OnodeRef& o, const ghobject_t& old_oid,
1321 const ghobject_t& new_oid,
1322 const mempool::bluestore_cache_other::string& new_okey);
1323 void clear();
1324 bool empty();
1325
1326 void dump(CephContext *cct, int lvl);
1327
1328 /// return true if f true for any item
1329 bool map_any(std::function<bool(OnodeRef)> f);
1330 };
1331
1332 struct Collection : public CollectionImpl {
1333 BlueStore *store;
1334 Cache *cache; ///< our cache shard
1335 coll_t cid;
1336 bluestore_cnode_t cnode;
1337 RWLock lock;
1338
1339 bool exists;
1340
1341 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1342
1343 // cache onodes on a per-collection basis to avoid lock
1344 // contention.
1345 OnodeSpace onode_map;
1346
1347 //pool options
1348 pool_opts_t pool_opts;
1349
1350 OnodeRef get_onode(const ghobject_t& oid, bool create);
1351
1352 // the terminology is confusing here, sorry!
1353 //
1354 // blob_t shared_blob_t
1355 // !shared unused -> open
1356 // shared !loaded -> open + shared
1357 // shared loaded -> open + shared + loaded
1358 //
1359 // i.e.,
1360 // open = SharedBlob is instantiated
1361 // shared = blob_t shared flag is set; SharedBlob is hashed.
1362 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1363 void open_shared_blob(uint64_t sbid, BlobRef b);
1364 void load_shared_blob(SharedBlobRef sb);
1365 void make_blob_shared(uint64_t sbid, BlobRef b);
1366 uint64_t make_blob_unshared(SharedBlob *sb);
1367
1368 BlobRef new_blob() {
1369 BlobRef b = new Blob();
1370 b->shared_blob = new SharedBlob(this);
1371 return b;
1372 }
1373
1374 const coll_t &get_cid() override {
1375 return cid;
1376 }
1377
1378 bool contains(const ghobject_t& oid) {
1379 if (cid.is_meta())
1380 return oid.hobj.pool == -1;
1381 spg_t spgid;
1382 if (cid.is_pg(&spgid))
1383 return
1384 spgid.pgid.contains(cnode.bits, oid) &&
1385 oid.shard_id == spgid.shard;
1386 return false;
1387 }
1388
1389 void split_cache(Collection *dest);
1390
1391 Collection(BlueStore *ns, Cache *ca, coll_t c);
1392 };
1393
1394 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1395 CollectionRef c;
1396 OnodeRef o;
1397 KeyValueDB::Iterator it;
1398 string head, tail;
1399 public:
1400 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1401 int seek_to_first() override;
1402 int upper_bound(const string &after) override;
1403 int lower_bound(const string &to) override;
1404 bool valid() override;
1405 int next(bool validate=true) override;
1406 string key() override;
1407 bufferlist value() override;
1408 int status() override {
1409 return 0;
1410 }
1411 };
1412
1413 class OpSequencer;
1414 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1415
1416 struct volatile_statfs{
1417 enum {
1418 STATFS_ALLOCATED = 0,
1419 STATFS_STORED,
1420 STATFS_COMPRESSED_ORIGINAL,
1421 STATFS_COMPRESSED,
1422 STATFS_COMPRESSED_ALLOCATED,
1423 STATFS_LAST
1424 };
1425 int64_t values[STATFS_LAST];
1426 volatile_statfs() {
1427 memset(this, 0, sizeof(volatile_statfs));
1428 }
1429 void reset() {
1430 *this = volatile_statfs();
1431 }
1432 volatile_statfs& operator+=(const volatile_statfs& other) {
1433 for (size_t i = 0; i < STATFS_LAST; ++i) {
1434 values[i] += other.values[i];
1435 }
1436 return *this;
1437 }
1438 int64_t& allocated() {
1439 return values[STATFS_ALLOCATED];
1440 }
1441 int64_t& stored() {
1442 return values[STATFS_STORED];
1443 }
1444 int64_t& compressed_original() {
1445 return values[STATFS_COMPRESSED_ORIGINAL];
1446 }
1447 int64_t& compressed() {
1448 return values[STATFS_COMPRESSED];
1449 }
1450 int64_t& compressed_allocated() {
1451 return values[STATFS_COMPRESSED_ALLOCATED];
1452 }
1453 bool is_empty() {
1454 return values[STATFS_ALLOCATED] == 0 &&
1455 values[STATFS_STORED] == 0 &&
1456 values[STATFS_COMPRESSED] == 0 &&
1457 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1458 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1459 }
1460 void decode(bufferlist::iterator& it) {
1461 for (size_t i = 0; i < STATFS_LAST; i++) {
1462 ::decode(values[i], it);
1463 }
1464 }
1465
1466 void encode(bufferlist& bl) {
1467 for (size_t i = 0; i < STATFS_LAST; i++) {
1468 ::encode(values[i], bl);
1469 }
1470 }
1471 };
1472
1473 struct TransContext : public AioContext {
1474 MEMPOOL_CLASS_HELPERS();
1475
1476 typedef enum {
1477 STATE_PREPARE,
1478 STATE_AIO_WAIT,
1479 STATE_IO_DONE,
1480 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1481 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1482 STATE_KV_DONE,
1483 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1484 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1485 STATE_DEFERRED_DONE,
1486 STATE_FINISHING,
1487 STATE_DONE,
1488 } state_t;
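// Roughly, a txc advances through the states above in the order listed;
// the DEFERRED_* states are only visited by transactions that carry
// deferred io (see deferred_txn below), everything else goes straight
// from the kv states to FINISHING and DONE.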
1489
1490 state_t state = STATE_PREPARE;
1491
1492 const char *get_state_name() {
1493 switch (state) {
1494 case STATE_PREPARE: return "prepare";
1495 case STATE_AIO_WAIT: return "aio_wait";
1496 case STATE_IO_DONE: return "io_done";
1497 case STATE_KV_QUEUED: return "kv_queued";
1498 case STATE_KV_SUBMITTED: return "kv_submitted";
1499 case STATE_KV_DONE: return "kv_done";
1500 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1501 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1502 case STATE_DEFERRED_DONE: return "deferred_done";
1503 case STATE_FINISHING: return "finishing";
1504 case STATE_DONE: return "done";
1505 }
1506 return "???";
1507 }
1508
1509 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1510 const char *get_state_latency_name(int state) {
1511 switch (state) {
1512 case l_bluestore_state_prepare_lat: return "prepare";
1513 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1514 case l_bluestore_state_io_done_lat: return "io_done";
1515 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1516 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1517 case l_bluestore_state_kv_done_lat: return "kv_done";
1518 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1519 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1520 case l_bluestore_state_finishing_lat: return "finishing";
1521 case l_bluestore_state_done_lat: return "done";
1522 }
1523 return "???";
1524 }
1525 #endif
1526
1527 void log_state_latency(PerfCounters *logger, int state) {
1528 utime_t lat, now = ceph_clock_now();
1529 lat = now - last_stamp;
1530 logger->tinc(state, lat);
1531 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1532 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1533 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1534 OID_ELAPSED("", usecs, get_state_latency_name(state));
1535 }
1536 #endif
1537 last_stamp = now;
1538 }
1539
1540 OpSequencerRef osr;
1541 boost::intrusive::list_member_hook<> sequencer_item;
1542
1543 uint64_t bytes = 0, cost = 0;
1544
1545 set<OnodeRef> onodes; ///< these need to be updated/written
1546 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1547 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1548 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1549
1550 KeyValueDB::Transaction t; ///< then we will commit this
1551 Context *oncommit = nullptr; ///< signal on commit
1552 Context *onreadable = nullptr; ///< signal on readable
1553 Context *onreadable_sync = nullptr; ///< signal on readable
1554 list<Context*> oncommits; ///< more commit completions
1555 list<CollectionRef> removed_collections; ///< colls we removed
1556
1557 boost::intrusive::list_member_hook<> deferred_queue_item;
1558 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1559
1560 interval_set<uint64_t> allocated, released;
1561 volatile_statfs statfs_delta;
1562
1563 IOContext ioc;
1564 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1565
1566 uint64_t seq = 0;
1567 utime_t start;
1568 utime_t last_stamp;
1569
1570 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1571 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1572
1573 explicit TransContext(CephContext* cct, OpSequencer *o)
1574 : osr(o),
1575 ioc(cct, this),
1576 start(ceph_clock_now()) {
1577 last_stamp = start;
1578 }
1579 ~TransContext() {
1580 delete deferred_txn;
1581 }
1582
1583 void write_onode(OnodeRef &o) {
1584 onodes.insert(o);
1585 }
1586 void write_shared_blob(SharedBlobRef &sb) {
1587 shared_blobs.insert(sb);
1588 }
1589 void unshare_blob(SharedBlob *sb) {
1590 shared_blobs.erase(sb);
1591 }
1592
1593 /// note we logically modified object (when onode itself is unmodified)
1594 void note_modified_object(OnodeRef &o) {
1595 // onode itself isn't written, though
1596 modified_objects.insert(o);
1597 }
1598 void removed(OnodeRef& o) {
1599 onodes.erase(o);
1600 modified_objects.erase(o);
1601 }
1602
1603 void aio_finish(BlueStore *store) override {
1604 store->txc_aio_finish(this);
1605 }
1606 };
1607
1608 typedef boost::intrusive::list<
1609 TransContext,
1610 boost::intrusive::member_hook<
1611 TransContext,
1612 boost::intrusive::list_member_hook<>,
1613 &TransContext::deferred_queue_item> > deferred_queue_t;
1614
1615 struct DeferredBatch : public AioContext {
1616 OpSequencer *osr;
1617 struct deferred_io {
1618 bufferlist bl; ///< data
1619 uint64_t seq; ///< deferred transaction seq
1620 };
1621 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1622 deferred_queue_t txcs; ///< txcs in this batch
1623 IOContext ioc; ///< our aios
1624 /// bytes of pending io for each deferred seq (may be 0)
1625 map<uint64_t,int> seq_bytes;
1626
1627 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1628 void _audit(CephContext *cct);
1629
1630 DeferredBatch(CephContext *cct, OpSequencer *osr)
1631 : osr(osr), ioc(cct, this) {}
1632
1633 /// prepare a write
1634 void prepare_write(CephContext *cct,
1635 uint64_t seq, uint64_t offset, uint64_t length,
1636 bufferlist::const_iterator& p);
1637
1638 void aio_finish(BlueStore *store) override {
1639 store->_deferred_aio_finish(osr);
1640 }
1641 };
1642
1643 class OpSequencer : public Sequencer_impl {
1644 public:
1645 std::mutex qlock;
1646 std::condition_variable qcond;
1647 typedef boost::intrusive::list<
1648 TransContext,
1649 boost::intrusive::member_hook<
1650 TransContext,
1651 boost::intrusive::list_member_hook<>,
1652 &TransContext::sequencer_item> > q_list_t;
1653 q_list_t q; ///< transactions
1654
1655 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1656
1657 DeferredBatch *deferred_running = nullptr;
1658 DeferredBatch *deferred_pending = nullptr;
1659
1660 Sequencer *parent;
1661 BlueStore *store;
1662
1663 uint64_t last_seq = 0;
1664
1665 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1666
1667 std::atomic_int kv_committing_serially = {0};
1668
1669 std::atomic_int kv_submitted_waiters = {0};
1670
1671 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1672 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1673
1674 OpSequencer(CephContext* cct, BlueStore *store)
1675 : Sequencer_impl(cct),
1676 parent(NULL), store(store) {
1677 store->register_osr(this);
1678 }
1679 ~OpSequencer() override {
1680 assert(q.empty());
1681 _unregister();
1682 }
1683
1684 void discard() override {
1685 // Note that we may have txc's in flight when the parent Sequencer
1686 // goes away. Reflect this with zombie==registered==true and let
1687 // _osr_drain_all clean up later.
1688 assert(!zombie);
1689 zombie = true;
1690 parent = nullptr;
1691 bool empty;
1692 {
1693 std::lock_guard<std::mutex> l(qlock);
1694 empty = q.empty();
1695 }
1696 if (empty) {
1697 _unregister();
1698 }
1699 }
1700
1701 void _unregister() {
1702 if (registered) {
1703 store->unregister_osr(this);
1704 registered = false;
1705 }
1706 }
1707
1708 void queue_new(TransContext *txc) {
1709 std::lock_guard<std::mutex> l(qlock);
1710 txc->seq = ++last_seq;
1711 q.push_back(*txc);
1712 }
1713
1714 void drain() {
1715 std::unique_lock<std::mutex> l(qlock);
1716 while (!q.empty())
1717 qcond.wait(l);
1718 }
1719
1720 void drain_preceding(TransContext *txc) {
1721 std::unique_lock<std::mutex> l(qlock);
1722 while (!q.empty() && &q.front() != txc)
1723 qcond.wait(l);
1724 }
1725
1726 bool _is_all_kv_submitted() {
1727 // caller must hold qlock
1728 if (q.empty()) {
1729 return true;
1730 }
1731 TransContext *txc = &q.back();
1732 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1733 return true;
1734 }
1735 return false;
1736 }
1737
1738 void flush() override {
1739 std::unique_lock<std::mutex> l(qlock);
1740 while (true) {
1741 // set flag before the check because the condition
1742 // may become true outside qlock, and we need to make
1743 // sure those threads see waiters and signal qcond.
1744 ++kv_submitted_waiters;
1745 if (_is_all_kv_submitted()) {
1746 return;
1747 }
1748 qcond.wait(l);
1749 --kv_submitted_waiters;
1750 }
1751 }
1752
1753 bool flush_commit(Context *c) override {
1754 std::lock_guard<std::mutex> l(qlock);
1755 if (q.empty()) {
1756 return true;
1757 }
1758 TransContext *txc = &q.back();
1759 if (txc->state >= TransContext::STATE_KV_DONE) {
1760 return true;
1761 }
1762 txc->oncommits.push_back(c);
1763 return false;
1764 }
1765 };
1766
1767 typedef boost::intrusive::list<
1768 OpSequencer,
1769 boost::intrusive::member_hook<
1770 OpSequencer,
1771 boost::intrusive::list_member_hook<>,
1772 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1773
1774 struct KVSyncThread : public Thread {
1775 BlueStore *store;
1776 explicit KVSyncThread(BlueStore *s) : store(s) {}
1777 void *entry() override {
1778 store->_kv_sync_thread();
1779 return NULL;
1780 }
1781 };
1782 struct KVFinalizeThread : public Thread {
1783 BlueStore *store;
1784 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1785 void *entry() override {
1786 store->_kv_finalize_thread();
1787 return NULL;
1788 }
1789 };
1790
1791 struct DBHistogram {
1792 struct value_dist {
1793 uint64_t count;
1794 uint32_t max_len;
1795 };
1796
1797 struct key_dist {
1798 uint64_t count;
1799 uint32_t max_len;
1800 map<int, struct value_dist> val_map; ///< value slab id -> count and max value length
1801 };
1802
1803 map<string, map<int, struct key_dist> > key_hist;
1804 map<int, uint64_t> value_hist;
1805 int get_key_slab(size_t sz);
1806 string get_key_slab_to_range(int slab);
1807 int get_value_slab(size_t sz);
1808 string get_value_slab_to_range(int slab);
1809 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1810 const string &prefix, size_t key_size, size_t value_size);
1811 void dump(Formatter *f);
1812 };
1813
1814 // --------------------------------------------------------
1815 // members
1816 private:
1817 BlueFS *bluefs = nullptr;
1818 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1819 bool bluefs_single_shared_device = true;
1820 utime_t bluefs_last_balance;
1821
1822 KeyValueDB *db = nullptr;
1823 BlockDevice *bdev = nullptr;
1824 std::string freelist_type;
1825 FreelistManager *fm = nullptr;
1826 Allocator *alloc = nullptr;
1827 uuid_d fsid;
1828 int path_fd = -1; ///< open handle to $path
1829 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1830 bool mounted = false;
1831
1832 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1833 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1834
1835 vector<Cache*> cache_shards;
1836
1837 std::mutex osr_lock; ///< protect osr_set
1838 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1839
1840 std::atomic<uint64_t> nid_last = {0};
1841 std::atomic<uint64_t> nid_max = {0};
1842 std::atomic<uint64_t> blobid_last = {0};
1843 std::atomic<uint64_t> blobid_max = {0};
1844
1845 Throttle throttle_bytes; ///< submit to commit
1846 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1847
1848 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1849 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1850
1851 std::mutex deferred_lock;
1852 std::atomic<uint64_t> deferred_seq = {0};
1853 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1854 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1855 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1856 Finisher deferred_finisher;
1857
1858 int m_finisher_num = 1;
1859 vector<Finisher*> finishers;
1860
1861 KVSyncThread kv_sync_thread;
1862 std::mutex kv_lock;
1863 std::condition_variable kv_cond;
1864 bool _kv_only = false;
1865 bool kv_sync_started = false;
1866 bool kv_stop = false;
1867 bool kv_finalize_started = false;
1868 bool kv_finalize_stop = false;
1869 deque<TransContext*> kv_queue; ///< ready, already submitted
1870 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1871 deque<TransContext*> kv_committing; ///< currently syncing
1872 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1873 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1874
1875 KVFinalizeThread kv_finalize_thread;
1876 std::mutex kv_finalize_lock;
1877 std::condition_variable kv_finalize_cond;
1878 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1879 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1880
1881 PerfCounters *logger = nullptr;
1882
1883 list<CollectionRef> removed_collections;
1884
1885 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1886 set<ghobject_t> debug_data_error_objects;
1887 set<ghobject_t> debug_mdata_error_objects;
1888
1889 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1890
1891 uint64_t block_size = 0; ///< block size of block device (power of 2)
1892 uint64_t block_mask = 0; ///< mask to get just the block offset
1893 size_t block_size_order = 0; ///< bits to shift to get block size
1894
1895 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1896 /// bits for min_alloc_size
1897 uint8_t min_alloc_size_order = 0;
1898 static_assert(std::numeric_limits<uint8_t>::max() >
1899 std::numeric_limits<decltype(min_alloc_size)>::digits,
1900 "not enough bits for min_alloc_size");
1901
1902 /// maximum allocation unit (power of 2)
1903 std::atomic<uint64_t> max_alloc_size = {0};
1904
1905 /// op count threshold for forced deferred writes
1906 std::atomic<int> deferred_batch_ops = {0};
1907
1908 /// size threshold for forced deferred writes
1909 std::atomic<uint64_t> prefer_deferred_size = {0};
1910
1911 /// approx cost per io, in bytes
1912 std::atomic<uint64_t> throttle_cost_per_io = {0};
1913
1914 std::atomic<Compressor::CompressionMode> comp_mode =
1915 {Compressor::COMP_NONE}; ///< compression mode
1916 CompressorRef compressor;
1917 std::atomic<uint64_t> comp_min_blob_size = {0};
1918 std::atomic<uint64_t> comp_max_blob_size = {0};
1919
1920 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1921
1922 uint64_t kv_ios = 0;
1923 uint64_t kv_throttle_costs = 0;
1924
1925 // cache trim control
1926 uint64_t cache_size = 0; ///< total cache size
1927 float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
1928 float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1929 float cache_data_ratio = 0; ///< cache ratio dedicated to object data
1930
1931 std::mutex vstatfs_lock;
1932 volatile_statfs vstatfs;
1933
1934 struct MempoolThread : public Thread {
1935 BlueStore *store;
1936 Cond cond;
1937 Mutex lock;
1938 bool stop = false;
1939 public:
1940 explicit MempoolThread(BlueStore *s)
1941 : store(s),
1942 lock("BlueStore::MempoolThread::lock") {}
1943 void *entry() override;
1944 void init() {
1945 assert(stop == false);
1946 create("bstore_mempool");
1947 }
1948 void shutdown() {
1949 lock.Lock();
1950 stop = true;
1951 cond.Signal();
1952 lock.Unlock();
1953 join();
1954 }
1955 } mempool_thread;
1956
1957 // --------------------------------------------------------
1958 // private methods
1959
1960 void _init_logger();
1961 void _shutdown_logger();
1962 int _reload_logger();
1963
1964 int _open_path();
1965 void _close_path();
1966 int _open_fsid(bool create);
1967 int _lock_fsid();
1968 int _read_fsid(uuid_d *f);
1969 int _write_fsid();
1970 void _close_fsid();
1971 void _set_alloc_sizes();
1972 void _set_blob_size();
1973
1974 int _open_bdev(bool create);
1975 void _close_bdev();
1976 int _open_db(bool create);
1977 void _close_db();
1978 int _open_fm(bool create);
1979 void _close_fm();
1980 int _open_alloc();
1981 void _close_alloc();
1982 int _open_collections(int *errors=0);
1983 void _close_collections();
1984
1985 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1986 bool create);
1987
1988 public:
1989 static int _write_bdev_label(CephContext* cct,
1990 string path, bluestore_bdev_label_t label);
1991 static int _read_bdev_label(CephContext* cct, string path,
1992 bluestore_bdev_label_t *label);
1993 private:
1994 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1995 bool create);
1996
1997 int _open_super_meta();
1998
1999 void _open_statfs();
2000
2001 int _reconcile_bluefs_freespace();
2002 int _balance_bluefs_freespace(PExtentVector *extents);
2003 void _commit_bluefs_freespace(const PExtentVector& extents);
2004
2005 CollectionRef _get_collection(const coll_t& cid);
2006 void _queue_reap_collection(CollectionRef& c);
2007 void _reap_collections();
2008 void _update_cache_logger();
2009
2010 void _assign_nid(TransContext *txc, OnodeRef o);
2011 uint64_t _assign_blobid(TransContext *txc);
2012
2013 void _dump_onode(const OnodeRef& o, int log_level=30);
2014 void _dump_extent_map(ExtentMap& em, int log_level=30);
2015 void _dump_transaction(Transaction *t, int log_level = 30);
2016
2017 TransContext *_txc_create(OpSequencer *osr);
2018 void _txc_update_store_statfs(TransContext *txc);
2019 void _txc_add_transaction(TransContext *txc, Transaction *t);
2020 void _txc_calc_cost(TransContext *txc);
2021 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2022 void _txc_state_proc(TransContext *txc);
2023 void _txc_aio_submit(TransContext *txc);
2024 public:
2025 void txc_aio_finish(void *p) {
2026 _txc_state_proc(static_cast<TransContext*>(p));
2027 }
2028 private:
2029 void _txc_finish_io(TransContext *txc);
2030 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2031 void _txc_applied_kv(TransContext *txc);
2032 void _txc_committed_kv(TransContext *txc);
2033 void _txc_finish(TransContext *txc);
2034 void _txc_release_alloc(TransContext *txc);
2035
2036 void _osr_drain_preceding(TransContext *txc);
2037 void _osr_drain_all();
2038 void _osr_unregister_all();
2039
2040 void _kv_start();
2041 void _kv_stop();
2042 void _kv_sync_thread();
2043 void _kv_finalize_thread();
2044
2045 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2046 void _deferred_queue(TransContext *txc);
2047 public:
2048 void deferred_try_submit();
2049 private:
2050 void _deferred_submit_unlock(OpSequencer *osr);
2051 void _deferred_aio_finish(OpSequencer *osr);
2052 int _deferred_replay();
2053
2054 public:
2055 using mempool_dynamic_bitset =
2056 boost::dynamic_bitset<uint64_t,
2057 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
2058
2059 private:
2060 int _fsck_check_extents(
2061 const ghobject_t& oid,
2062 const PExtentVector& extents,
2063 bool compressed,
2064 mempool_dynamic_bitset &used_blocks,
2065 uint64_t granularity,
2066 store_statfs_t& expected_statfs);
2067
2068 void _buffer_cache_write(
2069 TransContext *txc,
2070 BlobRef b,
2071 uint64_t offset,
2072 bufferlist& bl,
2073 unsigned flags) {
2074 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2075 flags);
2076 txc->shared_blobs_written.insert(b->shared_blob);
2077 }
2078
2079 int _collection_list(
2080 Collection *c, const ghobject_t& start, const ghobject_t& end,
2081 int max, vector<ghobject_t> *ls, ghobject_t *next);
2082
2083 template <typename T, typename F>
2084 T select_option(const std::string& opt_name, T val1, F f) {
2085 // NB: opt_name reserved for future use
2086 boost::optional<T> val2 = f();
2087 if (val2) {
2088 return *val2;
2089 }
2090 return val1;
2091 }
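// Hypothetical use of select_option(): prefer a per-pool override supplied
// by the lambda when it yields a value, else fall back to the store-wide
// default.  The names store_default/has_override/pool_override are
// illustrative only.
//
//   uint64_t blob_size = select_option(
//     "compression_max_blob_size", store_default,
//     [&]() {
//       boost::optional<uint64_t> val;
//       if (has_override)
//         val = pool_override;
//       return val;
//     });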
2092
2093 void _apply_padding(uint64_t head_pad,
2094 uint64_t tail_pad,
2095 bufferlist& padded);
2096
2097 // -- ondisk version ---
2098 public:
2099 const int32_t latest_ondisk_format = 2; ///< our version
2100 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2101 const int32_t min_compat_ondisk_format = 2; ///< who can read us
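// Rough meaning of the three constants above, as the trailing comments
// suggest:
//  - latest_ondisk_format: the format this code writes when creating or
//    upgrading a store;
//  - min_readable_ondisk_format: the oldest on-disk format this code will
//    still mount;
//  - min_compat_ondisk_format: the oldest code that can safely read a store
//    written by this version.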
2102
2103 private:
2104 int32_t ondisk_format = 0; ///< value detected on mount
2105
2106 int _upgrade_super(); ///< upgrade (called during open_super)
2107 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2108
2109 // --- public interface ---
2110 public:
2111 BlueStore(CephContext *cct, const string& path);
2112 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // ctor for unit tests only
2113 ~BlueStore() override;
2114
2115 string get_type() override {
2116 return "bluestore";
2117 }
2118
2119 bool needs_journal() override { return false; }
2120 bool wants_journal() override { return false; }
2121 bool allows_journal() override { return false; }
2122
2123 bool is_rotational() override;
2124 bool is_journal_rotational() override;
2125
2126 string get_default_device_class() override {
2127 string device_class;
2128 map<string, string> metadata;
2129 collect_metadata(&metadata);
2130 auto it = metadata.find("bluestore_bdev_type");
2131 if (it != metadata.end()) {
2132 device_class = it->second;
2133 }
2134 return device_class;
2135 }
2136
2137 static int get_block_device_fsid(CephContext* cct, const string& path,
2138 uuid_d *fsid);
2139
2140 bool test_mount_in_use() override;
2141
2142 private:
2143 int _mount(bool kv_only);
2144 public:
2145 int mount() override {
2146 return _mount(false);
2147 }
2148 int umount() override;
2149
2150 int start_kv_only(KeyValueDB **pdb) {
2151 int r = _mount(true);
2152 if (r < 0)
2153 return r;
2154 *pdb = db;
2155 return 0;
2156 }
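// Hypothetical caller of start_kv_only(), e.g. an offline tool that only
// needs the underlying KeyValueDB and never touches object data ("store" is
// an already-constructed BlueStore instance):
//
//   KeyValueDB *kvdb = nullptr;
//   if (store.start_kv_only(&kvdb) == 0) {
//     // ... inspect kvdb ...
//     store.umount();
//   }
//
// (umount() is assumed to handle the kv-only case via _kv_only.)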
2157
2158 int write_meta(const std::string& key, const std::string& value) override;
2159 int read_meta(const std::string& key, std::string *value) override;
2160
2161
2162 int fsck(bool deep) override {
2163 return _fsck(deep, false);
2164 }
2165 int repair(bool deep) override {
2166 return _fsck(deep, true);
2167 }
2168 int _fsck(bool deep, bool repair);
2169
2170 void set_cache_shards(unsigned num) override;
2171
2172 int validate_hobject_key(const hobject_t &obj) const override {
2173 return 0;
2174 }
2175 unsigned get_max_attr_name_length() override {
2176 return 256; // arbitrary; there is no real limit internally
2177 }
2178
2179 int mkfs() override;
2180 int mkjournal() override {
2181 return 0;
2182 }
2183
2184 void get_db_statistics(Formatter *f) override;
2185 void generate_db_histogram(Formatter *f) override;
2186 void _flush_cache();
2187 void flush_cache() override;
2188 void dump_perf_counters(Formatter *f) override {
2189 f->open_object_section("perf_counters");
2190 logger->dump_formatted(f, false);
2191 f->close_section();
2192 }
2193
2194 void register_osr(OpSequencer *osr) {
2195 std::lock_guard<std::mutex> l(osr_lock);
2196 osr_set.insert(osr);
2197 }
2198 void unregister_osr(OpSequencer *osr) {
2199 std::lock_guard<std::mutex> l(osr_lock);
2200 osr_set.erase(osr);
2201 }
2202
2203 public:
2204 int statfs(struct store_statfs_t *buf) override;
2205
2206 void collect_metadata(map<string,string> *pm) override;
2207
2208 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2209 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2210 int set_collection_opts(
2211 const coll_t& cid,
2212 const pool_opts_t& opts) override;
2213 int stat(
2214 const coll_t& cid,
2215 const ghobject_t& oid,
2216 struct stat *st,
2217 bool allow_eio = false) override;
2218 int stat(
2219 CollectionHandle &c,
2220 const ghobject_t& oid,
2221 struct stat *st,
2222 bool allow_eio = false) override;
2223 int read(
2224 const coll_t& cid,
2225 const ghobject_t& oid,
2226 uint64_t offset,
2227 size_t len,
2228 bufferlist& bl,
2229 uint32_t op_flags = 0) override;
2230 int read(
2231 CollectionHandle &c,
2232 const ghobject_t& oid,
2233 uint64_t offset,
2234 size_t len,
2235 bufferlist& bl,
2236 uint32_t op_flags = 0) override;
2237 int _do_read(
2238 Collection *c,
2239 OnodeRef o,
2240 uint64_t offset,
2241 size_t len,
2242 bufferlist& bl,
2243 uint32_t op_flags = 0);
2244
2245 private:
2246 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2247 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2248 public:
2249 int fiemap(const coll_t& cid, const ghobject_t& oid,
2250 uint64_t offset, size_t len, bufferlist& bl) override;
2251 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2252 uint64_t offset, size_t len, bufferlist& bl) override;
2253 int fiemap(const coll_t& cid, const ghobject_t& oid,
2254 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2255 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2256 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2257
2258
2259 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2260 bufferptr& value) override;
2261 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2262 bufferptr& value) override;
2263
2264 int getattrs(const coll_t& cid, const ghobject_t& oid,
2265 map<string,bufferptr>& aset) override;
2266 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2267 map<string,bufferptr>& aset) override;
2268
2269 int list_collections(vector<coll_t>& ls) override;
2270
2271 CollectionHandle open_collection(const coll_t &c) override;
2272
2273 bool collection_exists(const coll_t& c) override;
2274 int collection_empty(const coll_t& c, bool *empty) override;
2275 int collection_bits(const coll_t& c) override;
2276
2277 int collection_list(const coll_t& cid,
2278 const ghobject_t& start,
2279 const ghobject_t& end,
2280 int max,
2281 vector<ghobject_t> *ls, ghobject_t *next) override;
2282 int collection_list(CollectionHandle &c,
2283 const ghobject_t& start,
2284 const ghobject_t& end,
2285 int max,
2286 vector<ghobject_t> *ls, ghobject_t *next) override;
2287
2288 int omap_get(
2289 const coll_t& cid, ///< [in] Collection containing oid
2290 const ghobject_t &oid, ///< [in] Object containing omap
2291 bufferlist *header, ///< [out] omap header
2292 map<string, bufferlist> *out ///< [out] Key to value map
2293 ) override;
2294 int omap_get(
2295 CollectionHandle &c, ///< [in] Collection containing oid
2296 const ghobject_t &oid, ///< [in] Object containing omap
2297 bufferlist *header, ///< [out] omap header
2298 map<string, bufferlist> *out ///< [out] Key to value map
2299 ) override;
2300
2301 /// Get omap header
2302 int omap_get_header(
2303 const coll_t& cid, ///< [in] Collection containing oid
2304 const ghobject_t &oid, ///< [in] Object containing omap
2305 bufferlist *header, ///< [out] omap header
2306 bool allow_eio = false ///< [in] don't assert on eio
2307 ) override;
2308 int omap_get_header(
2309 CollectionHandle &c, ///< [in] Collection containing oid
2310 const ghobject_t &oid, ///< [in] Object containing omap
2311 bufferlist *header, ///< [out] omap header
2312 bool allow_eio = false ///< [in] don't assert on eio
2313 ) override;
2314
2315 /// Get keys defined on oid
2316 int omap_get_keys(
2317 const coll_t& cid, ///< [in] Collection containing oid
2318 const ghobject_t &oid, ///< [in] Object containing omap
2319 set<string> *keys ///< [out] Keys defined on oid
2320 ) override;
2321 int omap_get_keys(
2322 CollectionHandle &c, ///< [in] Collection containing oid
2323 const ghobject_t &oid, ///< [in] Object containing omap
2324 set<string> *keys ///< [out] Keys defined on oid
2325 ) override;
2326
2327 /// Get key values
2328 int omap_get_values(
2329 const coll_t& cid, ///< [in] Collection containing oid
2330 const ghobject_t &oid, ///< [in] Object containing omap
2331 const set<string> &keys, ///< [in] Keys to get
2332 map<string, bufferlist> *out ///< [out] Returned keys and values
2333 ) override;
2334 int omap_get_values(
2335 CollectionHandle &c, ///< [in] Collection containing oid
2336 const ghobject_t &oid, ///< [in] Object containing omap
2337 const set<string> &keys, ///< [in] Keys to get
2338 map<string, bufferlist> *out ///< [out] Returned keys and values
2339 ) override;
2340
2341 /// Filters keys into out which are defined on oid
2342 int omap_check_keys(
2343 const coll_t& cid, ///< [in] Collection containing oid
2344 const ghobject_t &oid, ///< [in] Object containing omap
2345 const set<string> &keys, ///< [in] Keys to check
2346 set<string> *out ///< [out] Subset of keys defined on oid
2347 ) override;
2348 int omap_check_keys(
2349 CollectionHandle &c, ///< [in] Collection containing oid
2350 const ghobject_t &oid, ///< [in] Object containing omap
2351 const set<string> &keys, ///< [in] Keys to check
2352 set<string> *out ///< [out] Subset of keys defined on oid
2353 ) override;
2354
2355 ObjectMap::ObjectMapIterator get_omap_iterator(
2356 const coll_t& cid, ///< [in] collection
2357 const ghobject_t &oid ///< [in] object
2358 ) override;
2359 ObjectMap::ObjectMapIterator get_omap_iterator(
2360 CollectionHandle &c, ///< [in] collection
2361 const ghobject_t &oid ///< [in] object
2362 ) override;
2363
2364 void set_fsid(uuid_d u) override {
2365 fsid = u;
2366 }
2367 uuid_d get_fsid() override {
2368 return fsid;
2369 }
2370
2371 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2372 return num_objects * 300; // assuming per-object overhead is 300 bytes
2373 }
2374
2375 struct BSPerfTracker {
2376 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2377 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2378
2379 objectstore_perf_stat_t get_cur_stats() const {
2380 objectstore_perf_stat_t ret;
2381 ret.os_commit_latency = os_commit_latency.current_avg();
2382 ret.os_apply_latency = os_apply_latency.current_avg();
2383 return ret;
2384 }
2385
2386 void update_from_perfcounters(PerfCounters &logger);
2387 } perf_tracker;
2388
2389 objectstore_perf_stat_t get_cur_stats() override {
2390 perf_tracker.update_from_perfcounters(*logger);
2391 return perf_tracker.get_cur_stats();
2392 }
2393 const PerfCounters* get_perf_counters() const override {
2394 return logger;
2395 }
2396
2397 int queue_transactions(
2398 Sequencer *osr,
2399 vector<Transaction>& tls,
2400 TrackedOpRef op = TrackedOpRef(),
2401 ThreadPool::TPHandle *handle = NULL) override;
2402
2403 // error injection
2404 void inject_data_error(const ghobject_t& o) override {
2405 RWLock::WLocker l(debug_read_error_lock);
2406 debug_data_error_objects.insert(o);
2407 }
2408 void inject_mdata_error(const ghobject_t& o) override {
2409 RWLock::WLocker l(debug_read_error_lock);
2410 debug_mdata_error_objects.insert(o);
2411 }
2412 void compact() override {
2413 assert(db);
2414 db->compact();
2415 }
2416 bool has_builtin_csum() const override {
2417 return true;
2418 }
2419
2420 private:
2421 bool _debug_data_eio(const ghobject_t& o) {
2422 if (!cct->_conf->bluestore_debug_inject_read_err) {
2423 return false;
2424 }
2425 RWLock::RLocker l(debug_read_error_lock);
2426 return debug_data_error_objects.count(o);
2427 }
2428 bool _debug_mdata_eio(const ghobject_t& o) {
2429 if (!cct->_conf->bluestore_debug_inject_read_err) {
2430 return false;
2431 }
2432 RWLock::RLocker l(debug_read_error_lock);
2433 return debug_mdata_error_objects.count(o);
2434 }
2435 void _debug_obj_on_delete(const ghobject_t& o) {
2436 if (cct->_conf->bluestore_debug_inject_read_err) {
2437 RWLock::WLocker l(debug_read_error_lock);
2438 debug_data_error_objects.erase(o);
2439 debug_mdata_error_objects.erase(o);
2440 }
2441 }
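// Sketch of how the error-injection hooks above are intended to combine
// (assuming bluestore_debug_inject_read_err is enabled in the conf):
// inject_data_error(oid) / inject_mdata_error(oid) mark an object, the read
// paths are expected to consult _debug_data_eio() / _debug_mdata_eio() and
// return -EIO for marked objects, and _debug_obj_on_delete() clears the
// marks when the object is removed.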
2442
2443 private:
2444
2445 // --------------------------------------------------------
2446 // read processing internal methods
2447 int _verify_csum(
2448 OnodeRef& o,
2449 const bluestore_blob_t* blob,
2450 uint64_t blob_xoffset,
2451 const bufferlist& bl,
2452 uint64_t logical_offset) const;
2453 int _decompress(bufferlist& source, bufferlist* result);
2454
2455
2456 // --------------------------------------------------------
2457 // write ops
2458
2459 struct WriteContext {
2460 bool buffered = false; ///< buffered write
2461 bool compress = false; ///< compressed write
2462 uint64_t target_blob_size = 0; ///< target (max) blob size
2463 unsigned csum_order = 0; ///< target checksum chunk order
2464
2465 old_extent_map_t old_extents; ///< must deref these blobs
2466
2467 struct write_item {
2468 uint64_t logical_offset; ///< write logical offset
2469 BlobRef b;
2470 uint64_t blob_length;
2471 uint64_t b_off;
2472 bufferlist bl;
2473 uint64_t b_off0; ///< original offset in a blob prior to padding
2474 uint64_t length0; ///< original data length prior to padding
2475
2476 bool mark_unused;
2477 bool new_blob; ///< whether new blob was created
2478
2479 bool compressed = false;
2480 bufferlist compressed_bl;
2481 size_t compressed_len = 0;
2482
2483 write_item(
2484 uint64_t logical_offs,
2485 BlobRef b,
2486 uint64_t blob_len,
2487 uint64_t o,
2488 bufferlist& bl,
2489 uint64_t o0,
2490 uint64_t l0,
2491 bool _mark_unused,
2492 bool _new_blob)
2493 :
2494 logical_offset(logical_offs),
2495 b(b),
2496 blob_length(blob_len),
2497 b_off(o),
2498 bl(bl),
2499 b_off0(o0),
2500 length0(l0),
2501 mark_unused(_mark_unused),
2502 new_blob(_new_blob) {}
2503 };
2504 vector<write_item> writes; ///< blobs we're writing
2505
2506 /// partial clone of the context
2507 void fork(const WriteContext& other) {
2508 buffered = other.buffered;
2509 compress = other.compress;
2510 target_blob_size = other.target_blob_size;
2511 csum_order = other.csum_order;
2512 }
2513 void write(
2514 uint64_t loffs,
2515 BlobRef b,
2516 uint64_t blob_len,
2517 uint64_t o,
2518 bufferlist& bl,
2519 uint64_t o0,
2520 uint64_t len0,
2521 bool _mark_unused,
2522 bool _new_blob) {
2523 writes.emplace_back(loffs,
2524 b,
2525 blob_len,
2526 o,
2527 bl,
2528 o0,
2529 len0,
2530 _mark_unused,
2531 _new_blob);
2532 }
2533 /// Checks for writes to the same pextent within a blob
2534 bool has_conflict(
2535 BlobRef b,
2536 uint64_t loffs,
2537 uint64_t loffs_end,
2538 uint64_t min_alloc_size);
2539 };
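// Simplified sketch of how a WriteContext flows through the write path
// declared below (not the authoritative sequence; error handling omitted):
//
//   WriteContext wctx;
//   _choose_write_options(c, o, fadvise_flags, &wctx);
//   _do_write_data(txc, c, o, offset, length, bl, &wctx); // fills wctx.writes
//   _do_alloc_write(txc, c, o, &wctx);                     // allocate + queue io
//   _wctx_finish(txc, c, o, &wctx);                        // deref old extents
//
// has_conflict() guards blob reuse against pending writes to the same
// pextent within a blob (see the comment above its declaration).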
2540
2541 void _do_write_small(
2542 TransContext *txc,
2543 CollectionRef &c,
2544 OnodeRef o,
2545 uint64_t offset, uint64_t length,
2546 bufferlist::iterator& blp,
2547 WriteContext *wctx);
2548 void _do_write_big(
2549 TransContext *txc,
2550 CollectionRef &c,
2551 OnodeRef o,
2552 uint64_t offset, uint64_t length,
2553 bufferlist::iterator& blp,
2554 WriteContext *wctx);
2555 int _do_alloc_write(
2556 TransContext *txc,
2557 CollectionRef c,
2558 OnodeRef o,
2559 WriteContext *wctx);
2560 void _wctx_finish(
2561 TransContext *txc,
2562 CollectionRef& c,
2563 OnodeRef o,
2564 WriteContext *wctx,
2565 set<SharedBlob*> *maybe_unshared_blobs=0);
2566
2567 int _do_transaction(Transaction *t,
2568 TransContext *txc,
2569 ThreadPool::TPHandle *handle);
2570
2571 int _write(TransContext *txc,
2572 CollectionRef& c,
2573 OnodeRef& o,
2574 uint64_t offset, size_t len,
2575 bufferlist& bl,
2576 uint32_t fadvise_flags);
2577 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2578 uint64_t chunk_size);
2579
2580 void _choose_write_options(CollectionRef& c,
2581 OnodeRef o,
2582 uint32_t fadvise_flags,
2583 WriteContext *wctx);
2584
2585 int _do_gc(TransContext *txc,
2586 CollectionRef& c,
2587 OnodeRef o,
2588 const GarbageCollector& gc,
2589 const WriteContext& wctx,
2590 uint64_t *dirty_start,
2591 uint64_t *dirty_end);
2592
2593 int _do_write(TransContext *txc,
2594 CollectionRef &c,
2595 OnodeRef o,
2596 uint64_t offset, uint64_t length,
2597 bufferlist& bl,
2598 uint32_t fadvise_flags);
2599 void _do_write_data(TransContext *txc,
2600 CollectionRef& c,
2601 OnodeRef o,
2602 uint64_t offset,
2603 uint64_t length,
2604 bufferlist& bl,
2605 WriteContext *wctx);
2606
2607 int _touch(TransContext *txc,
2608 CollectionRef& c,
2609 OnodeRef& o);
2610 int _do_zero(TransContext *txc,
2611 CollectionRef& c,
2612 OnodeRef& o,
2613 uint64_t offset, size_t len);
2614 int _zero(TransContext *txc,
2615 CollectionRef& c,
2616 OnodeRef& o,
2617 uint64_t offset, size_t len);
2618 void _do_truncate(TransContext *txc,
2619 CollectionRef& c,
2620 OnodeRef o,
2621 uint64_t offset,
2622 set<SharedBlob*> *maybe_unshared_blobs=0);
2623 int _truncate(TransContext *txc,
2624 CollectionRef& c,
2625 OnodeRef& o,
2626 uint64_t offset);
2627 int _remove(TransContext *txc,
2628 CollectionRef& c,
2629 OnodeRef& o);
2630 int _do_remove(TransContext *txc,
2631 CollectionRef& c,
2632 OnodeRef o);
2633 int _setattr(TransContext *txc,
2634 CollectionRef& c,
2635 OnodeRef& o,
2636 const string& name,
2637 bufferptr& val);
2638 int _setattrs(TransContext *txc,
2639 CollectionRef& c,
2640 OnodeRef& o,
2641 const map<string,bufferptr>& aset);
2642 int _rmattr(TransContext *txc,
2643 CollectionRef& c,
2644 OnodeRef& o,
2645 const string& name);
2646 int _rmattrs(TransContext *txc,
2647 CollectionRef& c,
2648 OnodeRef& o);
2649 void _do_omap_clear(TransContext *txc, uint64_t id);
2650 int _omap_clear(TransContext *txc,
2651 CollectionRef& c,
2652 OnodeRef& o);
2653 int _omap_setkeys(TransContext *txc,
2654 CollectionRef& c,
2655 OnodeRef& o,
2656 bufferlist& bl);
2657 int _omap_setheader(TransContext *txc,
2658 CollectionRef& c,
2659 OnodeRef& o,
2660 bufferlist& header);
2661 int _omap_rmkeys(TransContext *txc,
2662 CollectionRef& c,
2663 OnodeRef& o,
2664 bufferlist& bl);
2665 int _omap_rmkey_range(TransContext *txc,
2666 CollectionRef& c,
2667 OnodeRef& o,
2668 const string& first, const string& last);
2669 int _set_alloc_hint(
2670 TransContext *txc,
2671 CollectionRef& c,
2672 OnodeRef& o,
2673 uint64_t expected_object_size,
2674 uint64_t expected_write_size,
2675 uint32_t flags);
2676 int _do_clone_range(TransContext *txc,
2677 CollectionRef& c,
2678 OnodeRef& oldo,
2679 OnodeRef& newo,
2680 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2681 int _clone(TransContext *txc,
2682 CollectionRef& c,
2683 OnodeRef& oldo,
2684 OnodeRef& newo);
2685 int _clone_range(TransContext *txc,
2686 CollectionRef& c,
2687 OnodeRef& oldo,
2688 OnodeRef& newo,
2689 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2690 int _rename(TransContext *txc,
2691 CollectionRef& c,
2692 OnodeRef& oldo,
2693 OnodeRef& newo,
2694 const ghobject_t& new_oid);
2695 int _create_collection(TransContext *txc, const coll_t &cid,
2696 unsigned bits, CollectionRef *c);
2697 int _remove_collection(TransContext *txc, const coll_t &cid,
2698 CollectionRef *c);
2699 int _split_collection(TransContext *txc,
2700 CollectionRef& c,
2701 CollectionRef& d,
2702 unsigned bits, int rem);
2703 };
2704
2705 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2706 return out << *s.parent;
2707 }
2708
2709 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2710 o->get();
2711 }
2712 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2713 o->put();
2714 }
2715
2716 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2717 o->get();
2718 }
2719 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2720 o->put();
2721 }
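// These free functions are the hooks boost::intrusive_ptr finds via ADL:
// OnodeRef and OpSequencerRef are intrusive_ptr typedefs, so get()/put()
// manipulate the embedded reference counts directly rather than a separate
// control block.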
2722
2723 #endif