1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52
53
54 // constants for Buffer::optimize()
55 #define MAX_BUFFER_SLOP_RATIO_DEN 8 // so actually 1/N
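// For illustration: with the default denominator of 8, Buffer::maybe_rebuild()
// (below) rebuilds its bufferlist when the front buffer wastes more than
// length/8 bytes, e.g. more than 512 wasted bytes for a 4096-byte buffer.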
56
57
58 enum {
59 l_bluestore_first = 732430,
60 l_bluestore_kv_flush_lat,
61 l_bluestore_kv_commit_lat,
62 l_bluestore_kv_lat,
63 l_bluestore_state_prepare_lat,
64 l_bluestore_state_aio_wait_lat,
65 l_bluestore_state_io_done_lat,
66 l_bluestore_state_kv_queued_lat,
67 l_bluestore_state_kv_committing_lat,
68 l_bluestore_state_kv_done_lat,
69 l_bluestore_state_deferred_queued_lat,
70 l_bluestore_state_deferred_aio_wait_lat,
71 l_bluestore_state_deferred_cleanup_lat,
72 l_bluestore_state_finishing_lat,
73 l_bluestore_state_done_lat,
74 l_bluestore_throttle_lat,
75 l_bluestore_submit_lat,
76 l_bluestore_commit_lat,
77 l_bluestore_read_lat,
78 l_bluestore_read_onode_meta_lat,
79 l_bluestore_read_wait_aio_lat,
80 l_bluestore_compress_lat,
81 l_bluestore_decompress_lat,
82 l_bluestore_csum_lat,
83 l_bluestore_compress_success_count,
84 l_bluestore_compress_rejected_count,
85 l_bluestore_write_pad_bytes,
86 l_bluestore_deferred_write_ops,
87 l_bluestore_deferred_write_bytes,
88 l_bluestore_write_penalty_read_ops,
89 l_bluestore_allocated,
90 l_bluestore_stored,
91 l_bluestore_compressed,
92 l_bluestore_compressed_allocated,
93 l_bluestore_compressed_original,
94 l_bluestore_onodes,
95 l_bluestore_onode_hits,
96 l_bluestore_onode_misses,
97 l_bluestore_onode_shard_hits,
98 l_bluestore_onode_shard_misses,
99 l_bluestore_extents,
100 l_bluestore_blobs,
101 l_bluestore_buffers,
102 l_bluestore_buffer_bytes,
103 l_bluestore_buffer_hit_bytes,
104 l_bluestore_buffer_miss_bytes,
105 l_bluestore_write_big,
106 l_bluestore_write_big_bytes,
107 l_bluestore_write_big_blobs,
108 l_bluestore_write_small,
109 l_bluestore_write_small_bytes,
110 l_bluestore_write_small_unused,
111 l_bluestore_write_small_deferred,
112 l_bluestore_write_small_pre_read,
113 l_bluestore_write_small_new,
114 l_bluestore_txc,
115 l_bluestore_onode_reshard,
116 l_bluestore_blob_split,
117 l_bluestore_extent_compress,
118 l_bluestore_gc_merged,
119 l_bluestore_last
120 };
121
122 class BlueStore : public ObjectStore,
123 public md_config_obs_t {
124 // -----------------------------------------------------
125 // types
126 public:
127 // config observer
128 const char** get_tracked_conf_keys() const override;
129 void handle_conf_change(const struct md_config_t *conf,
130 const std::set<std::string> &changed) override;
131
132 void _set_csum();
133 void _set_compression();
134 void _set_throttle_params();
135 int _set_cache_sizes();
136
137 class TransContext;
138
139 typedef map<uint64_t, bufferlist> ready_regions_t;
140
141 struct BufferSpace;
142 struct Collection;
143 typedef boost::intrusive_ptr<Collection> CollectionRef;
144
145 struct AioContext {
146 virtual void aio_finish(BlueStore *store) = 0;
147 virtual ~AioContext() {}
148 };
149
150 /// cached buffer
151 struct Buffer {
152 MEMPOOL_CLASS_HELPERS();
153
154 enum {
155 STATE_EMPTY, ///< empty buffer -- used for cache history
156 STATE_CLEAN, ///< clean data that is up to date
157 STATE_WRITING, ///< data that is being written (io not yet complete)
158 };
159 static const char *get_state_name(int s) {
160 switch (s) {
161 case STATE_EMPTY: return "empty";
162 case STATE_CLEAN: return "clean";
163 case STATE_WRITING: return "writing";
164 default: return "???";
165 }
166 }
167 enum {
168 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
169 // NOTE: fix operator<< when you define a second flag
170 };
171 static const char *get_flag_name(int s) {
172 switch (s) {
173 case FLAG_NOCACHE: return "nocache";
174 default: return "???";
175 }
176 }
177
178 BufferSpace *space;
179 uint16_t state; ///< STATE_*
180 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
181 uint32_t flags; ///< FLAG_*
182 uint64_t seq;
183 uint32_t offset, length;
184 bufferlist data;
185
186 boost::intrusive::list_member_hook<> lru_item;
187 boost::intrusive::list_member_hook<> state_item;
188
189 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
190 unsigned f = 0)
191 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
192 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
193 unsigned f = 0)
194 : space(space), state(s), flags(f), seq(q), offset(o),
195 length(b.length()), data(b) {}
196
197 bool is_empty() const {
198 return state == STATE_EMPTY;
199 }
200 bool is_clean() const {
201 return state == STATE_CLEAN;
202 }
203 bool is_writing() const {
204 return state == STATE_WRITING;
205 }
206
207 uint32_t end() const {
208 return offset + length;
209 }
210
211 void truncate(uint32_t newlen) {
212 assert(newlen < length);
213 if (data.length()) {
214 bufferlist t;
215 t.substr_of(data, 0, newlen);
216 data.claim(t);
217 }
218 length = newlen;
219 }
220 void maybe_rebuild() {
221 if (data.length() &&
222 (data.get_num_buffers() > 1 ||
223 data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
224 data.rebuild();
225 }
226 }
227
228 void dump(Formatter *f) const {
229 f->dump_string("state", get_state_name(state));
230 f->dump_unsigned("seq", seq);
231 f->dump_unsigned("offset", offset);
232 f->dump_unsigned("length", length);
233 f->dump_unsigned("data_length", data.length());
234 }
235 };
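// For illustration of the states above: a buffer created by BufferSpace::write()
// starts in STATE_WRITING; one created by BufferSpace::did_read() is STATE_CLEAN;
// STATE_EMPTY entries carry no data and exist only as cache history (see, e.g.,
// the 2Q warm_out list further below).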
236
237 struct Cache;
238
239 /// map logical extent range (object) onto buffers
240 struct BufferSpace {
241 typedef boost::intrusive::list<
242 Buffer,
243 boost::intrusive::member_hook<
244 Buffer,
245 boost::intrusive::list_member_hook<>,
246 &Buffer::state_item> > state_list_t;
247
248 mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
249 buffer_map;
250
251 // we use a bare intrusive list here instead of std::map because
252 // it uses less memory and we expect this to be very small (very
253 // few IOs in flight to the same Blob at the same time).
254 state_list_t writing; ///< writing buffers, sorted by seq, ascending
255
256 ~BufferSpace() {
257 assert(buffer_map.empty());
258 assert(writing.empty());
259 }
260
261 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
262 cache->_audit("_add_buffer start");
263 buffer_map[b->offset].reset(b);
264 if (b->is_writing()) {
265 b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
266 writing.push_back(*b);
267 } else {
268 b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
269 cache->_add_buffer(b, level, near);
270 }
271 cache->_audit("_add_buffer end");
272 }
273 void _rm_buffer(Cache* cache, Buffer *b) {
274 _rm_buffer(cache, buffer_map.find(b->offset));
275 }
276 void _rm_buffer(Cache* cache,
277 map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
278 assert(p != buffer_map.end());
279 cache->_audit("_rm_buffer start");
280 if (p->second->is_writing()) {
281 writing.erase(writing.iterator_to(*p->second));
282 } else {
283 cache->_rm_buffer(p->second.get());
284 }
285 buffer_map.erase(p);
286 cache->_audit("_rm_buffer end");
287 }
288
289 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
290 uint32_t offset) {
291 auto i = buffer_map.lower_bound(offset);
292 if (i != buffer_map.begin()) {
293 --i;
294 if (i->first + i->second->length <= offset)
295 ++i;
296 }
297 return i;
298 }
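// For illustration: with cached buffers at offsets 0x0000 (len 0x1000) and
// 0x2000 (len 0x1000), _data_lower_bound(0x0800) returns the buffer at 0x0000
// (it still covers 0x0800), while _data_lower_bound(0x1800) steps back, sees
// that 0x0000 + 0x1000 <= 0x1800, and returns the buffer at 0x2000 instead.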
299
300 // must be called under protection of the Cache lock
301 void _clear(Cache* cache);
302
303 // return value is the highest cache_private of a trimmed buffer, or 0.
304 int discard(Cache* cache, uint32_t offset, uint32_t length) {
305 std::lock_guard<std::recursive_mutex> l(cache->lock);
306 return _discard(cache, offset, length);
307 }
308 int _discard(Cache* cache, uint32_t offset, uint32_t length);
309
310 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
311 unsigned flags) {
312 std::lock_guard<std::recursive_mutex> l(cache->lock);
313 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
314 flags);
315 b->cache_private = _discard(cache, offset, bl.length());
316 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
317 }
318 void finish_write(Cache* cache, uint64_t seq);
319 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
320 std::lock_guard<std::recursive_mutex> l(cache->lock);
321 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
322 b->cache_private = _discard(cache, offset, bl.length());
323 _add_buffer(cache, b, 1, nullptr);
324 }
325
326 void read(Cache* cache, uint32_t offset, uint32_t length,
327 BlueStore::ready_regions_t& res,
328 interval_set<uint32_t>& res_intervals);
329
330 void truncate(Cache* cache, uint32_t offset) {
331 discard(cache, offset, (uint32_t)-1 - offset);
332 }
333
334 void split(Cache* cache, size_t pos, BufferSpace &r);
335
336 void dump(Cache* cache, Formatter *f) const {
337 std::lock_guard<std::recursive_mutex> l(cache->lock);
338 f->open_array_section("buffers");
339 for (auto& i : buffer_map) {
340 f->open_object_section("buffer");
341 assert(i.first == i.second->offset);
342 i.second->dump(f);
343 f->close_section();
344 }
345 f->close_section();
346 }
347 };
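// Note on the write()/finish_write() pairing above (a simplified reading):
// buffers created by write() stay on the writing list, keyed by their seq,
// until finish_write() is called for that sequence; at that point they are
// expected to become STATE_CLEAN and enter the cache proper, unless
// FLAG_NOCACHE was set, in which case they are simply dropped.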
348
349 struct SharedBlobSet;
350
351 /// in-memory shared blob state (incl cached buffers)
352 struct SharedBlob {
353 MEMPOOL_CLASS_HELPERS();
354
355 std::atomic_int nref = {0}; ///< reference count
356 bool loaded = false;
357
358 CollectionRef coll;
359 union {
360 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
361 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
362 };
363 BufferSpace bc; ///< buffer cache
364
365 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
366 if (get_cache()) {
367 get_cache()->add_blob();
368 }
369 }
370 SharedBlob(uint64_t i, Collection *_coll);
371 ~SharedBlob();
372
373 uint64_t get_sbid() const {
374 return loaded ? persistent->sbid : sbid_unloaded;
375 }
376
377 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
378 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
379
380 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
381
382 void get() {
383 ++nref;
384 }
385 void put();
386
387 /// get logical references
388 void get_ref(uint64_t offset, uint32_t length);
389
390 /// put logical references, and get back any released extents
391 void put_ref(uint64_t offset, uint32_t length,
392 PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
393
394 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
395 return l.get_sbid() == r.get_sbid();
396 }
397 inline Cache* get_cache() {
398 return coll ? coll->cache : nullptr;
399 }
400 inline SharedBlobSet* get_parent() {
401 return coll ? &(coll->shared_blob_set) : nullptr;
402 }
403 inline bool is_loaded() const {
404 return loaded;
405 }
406
407 };
408 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
409
410 /// a lookup table of SharedBlobs
411 struct SharedBlobSet {
412 std::mutex lock; ///< protect lookup, insertion, removal
413
414 // we use a bare pointer because we don't want to affect the ref
415 // count
416 mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
417
418 SharedBlobRef lookup(uint64_t sbid) {
419 std::lock_guard<std::mutex> l(lock);
420 auto p = sb_map.find(sbid);
421 if (p == sb_map.end()) {
422 return nullptr;
423 }
424 return p->second;
425 }
426
427 void add(Collection* coll, SharedBlob *sb) {
428 std::lock_guard<std::mutex> l(lock);
429 sb_map[sb->get_sbid()] = sb;
430 sb->coll = coll;
431 }
432
433 bool remove(SharedBlob *sb) {
434 std::lock_guard<std::mutex> l(lock);
435 if (sb->nref == 0) {
436 assert(sb->get_parent() == this);
437 sb_map.erase(sb->get_sbid());
438 return true;
439 }
440 return false;
441 }
442
443 bool empty() {
444 std::lock_guard<std::mutex> l(lock);
445 return sb_map.empty();
446 }
447 };
448
449 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
450
451 /// in-memory blob metadata and associated cached buffers (if any)
452 struct Blob {
453 MEMPOOL_CLASS_HELPERS();
454
455 std::atomic_int nref = {0}; ///< reference count
456 int16_t id = -1; ///< id, for spanning blobs only, >= 0
457 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
458 SharedBlobRef shared_blob; ///< shared blob state (if any)
459
460 private:
461 mutable bluestore_blob_t blob; ///< decoded blob metadata
462 #ifdef CACHE_BLOB_BL
463 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
464 #endif
465 /// refs from this shard. ephemeral if id<0, persisted if spanning.
466 bluestore_blob_use_tracker_t used_in_blob;
467
468 public:
469
470 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
471 friend void intrusive_ptr_release(Blob *b) { b->put(); }
472
473 friend ostream& operator<<(ostream& out, const Blob &b);
474
475 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
476 return used_in_blob;
477 }
478 bool is_referenced() const {
479 return used_in_blob.is_not_empty();
480 }
481 uint32_t get_referenced_bytes() const {
482 return used_in_blob.get_referenced_bytes();
483 }
484
485 bool is_spanning() const {
486 return id >= 0;
487 }
488
489 bool can_split() const {
490 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
491 // splitting a BufferSpace writing list is too hard; don't try.
492 return shared_blob->bc.writing.empty() &&
493 used_in_blob.can_split() &&
494 get_blob().can_split();
495 }
496
497 bool can_split_at(uint32_t blob_offset) const {
498 return used_in_blob.can_split_at(blob_offset) &&
499 get_blob().can_split_at(blob_offset);
500 }
501
502 bool try_reuse_blob(uint32_t min_alloc_size,
503 uint32_t target_blob_size,
504 uint32_t b_offset,
505 uint32_t *length0);
506
507 void dup(Blob& o) {
508 o.shared_blob = shared_blob;
509 o.blob = blob;
510 #ifdef CACHE_BLOB_BL
511 o.blob_bl = blob_bl;
512 #endif
513 }
514
515 const bluestore_blob_t& get_blob() const {
516 return blob;
517 }
518 bluestore_blob_t& dirty_blob() {
519 #ifdef CACHE_BLOB_BL
520 blob_bl.clear();
521 #endif
522 return blob;
523 }
524
525 /// discard buffers for unallocated regions
526 void discard_unallocated(Collection *coll);
527
528 /// get logical references
529 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
530 /// put logical references, and get back any released extents
531 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
532 PExtentVector *r);
533
534 /// split the blob
535 void split(Collection *coll, uint32_t blob_offset, Blob *o);
536
537 void get() {
538 ++nref;
539 }
540 void put() {
541 if (--nref == 0)
542 delete this;
543 }
544
545
546 #ifdef CACHE_BLOB_BL
547 void _encode() const {
548 if (blob_bl.length() == 0 ) {
549 ::encode(blob, blob_bl);
550 } else {
551 assert(blob_bl.length());
552 }
553 }
554 void bound_encode(
555 size_t& p,
556 bool include_ref_map) const {
557 _encode();
558 p += blob_bl.length();
559 if (include_ref_map) {
560 used_in_blob.bound_encode(p);
561 }
562 }
563 void encode(
564 bufferlist::contiguous_appender& p,
565 bool include_ref_map) const {
566 _encode();
567 p.append(blob_bl);
568 if (include_ref_map) {
569 used_in_blob.encode(p);
570 }
571 }
572 void decode(
573 Collection */*coll*/,
574 bufferptr::iterator& p,
575 bool include_ref_map) {
576 const char *start = p.get_pos();
577 denc(blob, p);
578 const char *end = p.get_pos();
579 blob_bl.clear();
580 blob_bl.append(start, end - start);
581 if (include_ref_map) {
582 used_in_blob.decode(p);
583 }
584 }
585 #else
586 void bound_encode(
587 size_t& p,
588 uint64_t struct_v,
589 uint64_t sbid,
590 bool include_ref_map) const {
591 denc(blob, p, struct_v);
592 if (blob.is_shared()) {
593 denc(sbid, p);
594 }
595 if (include_ref_map) {
596 used_in_blob.bound_encode(p);
597 }
598 }
599 void encode(
600 bufferlist::contiguous_appender& p,
601 uint64_t struct_v,
602 uint64_t sbid,
603 bool include_ref_map) const {
604 denc(blob, p, struct_v);
605 if (blob.is_shared()) {
606 denc(sbid, p);
607 }
608 if (include_ref_map) {
609 used_in_blob.encode(p);
610 }
611 }
612 void decode(
613 Collection *coll,
614 bufferptr::iterator& p,
615 uint64_t struct_v,
616 uint64_t* sbid,
617 bool include_ref_map);
618 #endif
619 };
620 typedef boost::intrusive_ptr<Blob> BlobRef;
621 typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
622
623 /// a logical extent, pointing to (some portion of) a blob
624 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
625 struct Extent : public ExtentBase {
626 MEMPOOL_CLASS_HELPERS();
627
628 uint32_t logical_offset = 0; ///< logical offset
629 uint32_t blob_offset = 0; ///< blob offset
630 uint32_t length = 0; ///< length
631 BlobRef blob; ///< the blob with our data
632
633 /// ctor for lookup only
634 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
635 /// ctor for delayed initialization (see decode_some())
636 explicit Extent() : ExtentBase() {
637 }
638 /// ctor for general usage
639 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
640 : ExtentBase(),
641 logical_offset(lo), blob_offset(o), length(l) {
642 assign_blob(b);
643 }
644 ~Extent() {
645 if (blob) {
646 blob->shared_blob->get_cache()->rm_extent();
647 }
648 }
649
650 void assign_blob(const BlobRef& b) {
651 assert(!blob);
652 blob = b;
653 blob->shared_blob->get_cache()->add_extent();
654 }
655
656 // comparators for intrusive_set
657 friend bool operator<(const Extent &a, const Extent &b) {
658 return a.logical_offset < b.logical_offset;
659 }
660 friend bool operator>(const Extent &a, const Extent &b) {
661 return a.logical_offset > b.logical_offset;
662 }
663 friend bool operator==(const Extent &a, const Extent &b) {
664 return a.logical_offset == b.logical_offset;
665 }
666
667 uint32_t blob_start() const {
668 return logical_offset - blob_offset;
669 }
670
671 uint32_t blob_end() const {
672 return blob_start() + blob->get_blob().get_logical_length();
673 }
674
675 uint32_t logical_end() const {
676 return logical_offset + length;
677 }
678
679 // return true if any piece of the blob is out of
680 // the given range [o, o + l].
681 bool blob_escapes_range(uint32_t o, uint32_t l) const {
682 return blob_start() < o || blob_end() > o + l;
683 }
684 };
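// For illustration: an Extent with logical_offset=0x2000, blob_offset=0x1000,
// length=0x1000 over a blob whose logical length is 0x4000 has
// blob_start() == 0x1000 and blob_end() == 0x5000, so
// blob_escapes_range(0x2000, 0x1000) is true: the blob begins before and ends
// after the queried range.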
685 typedef boost::intrusive::set<Extent> extent_map_t;
686
687
688 friend ostream& operator<<(ostream& out, const Extent& e);
689
690 struct OldExtent {
691 boost::intrusive::list_member_hook<> old_extent_item;
692 Extent e;
693 PExtentVector r;
694 bool blob_empty; // flag to track the last removed extent that makes the
695 // blob empty - required to update compression stats properly
696 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
697 : e(lo, o, l, b), blob_empty(false) {
698 }
699 static OldExtent* create(CollectionRef c,
700 uint32_t lo,
701 uint32_t o,
702 uint32_t l,
703 BlobRef& b);
704 };
705 typedef boost::intrusive::list<
706 OldExtent,
707 boost::intrusive::member_hook<
708 OldExtent,
709 boost::intrusive::list_member_hook<>,
710 &OldExtent::old_extent_item> > old_extent_map_t;
711
712 struct Onode;
713
714 /// a sharded extent map, mapping offsets to lextents to blobs
715 struct ExtentMap {
716 Onode *onode;
717 extent_map_t extent_map; ///< map of Extents to Blobs
718 blob_map_t spanning_blob_map; ///< blobs that span shards
719
720 struct Shard {
721 bluestore_onode_t::shard_info *shard_info = nullptr;
722 unsigned extents = 0; ///< count extents in this shard
723 bool loaded = false; ///< true if shard is loaded
724 bool dirty = false; ///< true if shard is dirty and needs reencoding
725 };
726 mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
727
728 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
729
730 uint32_t needs_reshard_begin = 0;
731 uint32_t needs_reshard_end = 0;
732
733 bool needs_reshard() const {
734 return needs_reshard_end > needs_reshard_begin;
735 }
736 void clear_needs_reshard() {
737 needs_reshard_begin = needs_reshard_end = 0;
738 }
739 void request_reshard(uint32_t begin, uint32_t end) {
740 if (begin < needs_reshard_begin) {
741 needs_reshard_begin = begin;
742 }
743 if (end > needs_reshard_end) {
744 needs_reshard_end = end;
745 }
746 }
747
748 struct DeleteDisposer {
749 void operator()(Extent *e) { delete e; }
750 };
751
752 ExtentMap(Onode *o);
753 ~ExtentMap() {
754 extent_map.clear_and_dispose(DeleteDisposer());
755 }
756
757 void clear() {
758 extent_map.clear_and_dispose(DeleteDisposer());
759 shards.clear();
760 inline_bl.clear();
761 clear_needs_reshard();
762 }
763
764 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
765 unsigned *pn);
766 unsigned decode_some(bufferlist& bl);
767
768 void bound_encode_spanning_blobs(size_t& p);
769 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
770 void decode_spanning_blobs(bufferptr::iterator& p);
771
772 BlobRef get_spanning_blob(int id) {
773 auto p = spanning_blob_map.find(id);
774 assert(p != spanning_blob_map.end());
775 return p->second;
776 }
777
778 void update(KeyValueDB::Transaction t, bool force);
779 decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
780 void reshard(
781 KeyValueDB *db,
782 KeyValueDB::Transaction t);
783
784 /// initialize Shards from the onode
785 void init_shards(bool loaded, bool dirty);
786
787 /// return index of shard containing offset
788 /// or -1 if not found
789 int seek_shard(uint32_t offset) {
790 size_t end = shards.size();
791 size_t mid, left = 0;
792 size_t right = end; // one past the right end
793
794 while (left < right) {
795 mid = left + (right - left) / 2;
796 if (offset >= shards[mid].shard_info->offset) {
797 size_t next = mid + 1;
798 if (next >= end || offset < shards[next].shard_info->offset)
799 return mid;
800 //continue to search forwards
801 left = next;
802 } else {
803 //continue to search backwards
804 right = mid;
805 }
806 }
807
808 return -1; // not found
809 }
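// For illustration: with shard offsets {0x0, 0x8000, 0x10000}, seek_shard(0x7fff)
// returns 0, seek_shard(0x8000) returns 1, and seek_shard(0x20000) returns 2
// (the last shard extends to the end of the object); an offset below
// shards[0].shard_info->offset returns -1.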
810
811 /// check if a range spans a shard
812 bool spans_shard(uint32_t offset, uint32_t length) {
813 if (shards.empty()) {
814 return false;
815 }
816 int s = seek_shard(offset);
817 assert(s >= 0);
818 if (s == (int)shards.size() - 1) {
819 return false; // last shard
820 }
821 if (offset + length <= shards[s+1].shard_info->offset) {
822 return false;
823 }
824 return true;
825 }
826
827 /// ensure that a range of the map is loaded
828 void fault_range(KeyValueDB *db,
829 uint32_t offset, uint32_t length);
830
831 /// ensure a range of the map is marked dirty
832 void dirty_range(uint32_t offset, uint32_t length);
833
834 /// for seek_lextent test
835 extent_map_t::iterator find(uint64_t offset);
836
837 /// seek to the first lextent including or after offset
838 extent_map_t::iterator seek_lextent(uint64_t offset);
839 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
840
841 /// add a new Extent
842 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
843 extent_map.insert(*new Extent(lo, o, l, b));
844 }
845
846 /// remove (and delete) an Extent
847 void rm(extent_map_t::iterator p) {
848 extent_map.erase_and_dispose(p, DeleteDisposer());
849 }
850
851 bool has_any_lextents(uint64_t offset, uint64_t length);
852
853 /// consolidate adjacent lextents in extent_map
854 int compress_extent_map(uint64_t offset, uint64_t length);
855
856 /// punch a logical hole. add lextents to deref to target list.
857 void punch_hole(CollectionRef &c,
858 uint64_t offset, uint64_t length,
859 old_extent_map_t *old_extents);
860
861 /// put new lextent into lextent_map overwriting existing ones if
862 /// any and update references accordingly
863 Extent *set_lextent(CollectionRef &c,
864 uint64_t logical_offset,
865 uint64_t offset, uint64_t length,
866 BlobRef b,
867 old_extent_map_t *old_extents);
868
869 /// split a blob (and referring extents)
870 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
871 };
872
873 /// Compressed Blob Garbage collector
874 /*
875 The primary idea of the collector is to estimate the difference between
876 the allocation units (AUs) currently occupied by compressed blobs and the
877 new AUs required to store that data uncompressed.
878 Estimation is performed for protrusive extents within a logical range
879 determined by the concatenation of the old_extents collection and the
880 specific (current) write request.
881 The root cause for using old_extents is the need to handle blob ref counts
882 properly. Old extents still hold blob refs, hence we need to traverse
883 the collection to determine whether a blob is to be released.
884 Protrusive extents are extents that fit into the blob set in action
885 (ones that are below the logical range from above) but are not removed
886 totally due to the current write.
887 E.g. for
888 extent1 <loffs = 100, boffs = 100, len = 100> ->
889 blob1<compressed, len_on_disk=4096, logical_len=8192>
890 extent2 <loffs = 200, boffs = 200, len = 100> ->
891 blob2<raw, len_on_disk=4096, llen=4096>
892 extent3 <loffs = 300, boffs = 300, len = 100> ->
893 blob1<compressed, len_on_disk=4096, llen=8192>
894 extent4 <loffs = 4096, boffs = 0, len = 100> ->
895 blob3<raw, len_on_disk=4096, llen=4096>
896 write(300~100)
897 protrusive extents are within the following ranges <0~300, 400~8192-400>
898 In this case the existing AUs that might be removed due to GC (i.e. blob1)
899 use 2x4K bytes.
900 The new AUs expected after GC = 0, since extent1 will be merged into blob2.
901 Hence we should collect.
902 */
903 class GarbageCollector
904 {
905 public:
906 /// return amount of allocation units that might be saved due to GC
907 int64_t estimate(
908 uint64_t offset,
909 uint64_t length,
910 const ExtentMap& extent_map,
911 const old_extent_map_t& old_extents,
912 uint64_t min_alloc_size);
913
914 /// return a collection of extents to perform GC on
915 const vector<AllocExtent>& get_extents_to_collect() const {
916 return extents_to_collect;
917 }
918 GarbageCollector(CephContext* _cct) : cct(_cct) {}
919
920 private:
921 struct BlobInfo {
922 uint64_t referenced_bytes = 0; ///< amount of bytes referenced in blob
923 int64_t expected_allocations = 0; ///< new alloc units required
924 ///< if GC is performed
925 bool collect_candidate = false; ///< indicate if blob has any extents
926 ///< eligible for GC.
927 extent_map_t::const_iterator first_lextent; ///< points to the first
928 ///< lextent referring to
929 ///< the blob if any.
930 ///< collect_candidate flag
931 ///< determines the validity
932 extent_map_t::const_iterator last_lextent; ///< points to the last
933 ///< lextent referring to
934 ///< the blob if any.
935
936 BlobInfo(uint64_t ref_bytes) :
937 referenced_bytes(ref_bytes) {
938 }
939 };
940 CephContext* cct;
941 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
942 ///< copies that are affected by the
943 ///< specific write
944
945 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
946 ///< be collected if GC takes place
947
948 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
949 ///< unit when traversing
950 ///< protrusive extents.
951 ///< Other extents mapped to
952 ///< this AU are ignored
953 ///< (except when an
954 ///< uncompressed extent follows
955 ///< a compressed one - see below).
956 BlobInfo* blob_info_counted = nullptr; ///< set if the previous allocation
957 ///< unit caused an expected_allocations
958 ///< counter increment at this blob.
959 ///< If an uncompressed extent
960 ///< follows, a decrement of the
961 ///< expected_allocations counter
962 ///< is needed
963 int64_t expected_allocations = 0; ///< new alloc units required if
964 ///< GC is performed
965 int64_t expected_for_release = 0; ///< alloc units currently used by
966 ///< compressed blobs that might
967 ///< be freed after GC
968 uint64_t gc_start_offset; ///< starting offset for GC
969 uint64_t gc_end_offset; ///< ending offset for GC
970
971 protected:
972 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
973 uint64_t start_offset,
974 uint64_t end_offset,
975 uint64_t start_touch_offset,
976 uint64_t end_touch_offset,
977 uint64_t min_alloc_size);
978 };
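// A minimal usage sketch (hypothetical caller, for illustration only; the
// onode `o`, write context `wctx`, and the surrounding variables are assumed
// names, not part of this header): run the estimate over the range being
// written and, if it indicates a net gain, rewrite the returned extents so
// the compressed AUs they pin can be released:
//
// GarbageCollector gc(cct);
// int64_t benefit = gc.estimate(offset, length, o->extent_map,
// wctx.old_extents, min_alloc_size);
// if (benefit > 0 && !gc.get_extents_to_collect().empty()) {
// // rewrite (uncompress/merge) the listed extents via the regular
// // write path so the old compressed blobs can be freed
// }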
979
980 struct OnodeSpace;
981
982 /// an in-memory object
983 struct Onode {
984 MEMPOOL_CLASS_HELPERS();
985
986 std::atomic_int nref; ///< reference count
987 Collection *c;
988
989 ghobject_t oid;
990
991 /// key under PREFIX_OBJ where we are stored
992 mempool::bluestore_cache_other::string key;
993
994 boost::intrusive::list_member_hook<> lru_item;
995
996 bluestore_onode_t onode; ///< metadata stored as value in kv store
997 bool exists; ///< true if object logically exists
998
999 ExtentMap extent_map;
1000
1001 // track txc's that have not been committed to kv store (and whose
1002 // effects cannot be read via the kvdb read methods)
1003 std::atomic<int> flushing_count = {0};
1004 std::mutex flush_lock; ///< protect flush_txns
1005 std::condition_variable flush_cond; ///< wait here for uncommitted txns
1006
1007 Onode(Collection *c, const ghobject_t& o,
1008 const mempool::bluestore_cache_other::string& k)
1009 : nref(0),
1010 c(c),
1011 oid(o),
1012 key(k),
1013 exists(false),
1014 extent_map(this) {
1015 }
1016
1017 void flush();
1018 void get() {
1019 ++nref;
1020 }
1021 void put() {
1022 if (--nref == 0)
1023 delete this;
1024 }
1025 };
1026 typedef boost::intrusive_ptr<Onode> OnodeRef;
1027
1028
1029 /// a cache (shard) of onodes and buffers
1030 struct Cache {
1031 CephContext* cct;
1032 PerfCounters *logger;
1033 std::recursive_mutex lock; ///< protect lru and other structures
1034
1035 std::atomic<uint64_t> num_extents = {0};
1036 std::atomic<uint64_t> num_blobs = {0};
1037
1038 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1039
1040 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1041 virtual ~Cache() {}
1042
1043 virtual void _add_onode(OnodeRef& o, int level) = 0;
1044 virtual void _rm_onode(OnodeRef& o) = 0;
1045 virtual void _touch_onode(OnodeRef& o) = 0;
1046
1047 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1048 virtual void _rm_buffer(Buffer *b) = 0;
1049 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1050 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1051 virtual void _touch_buffer(Buffer *b) = 0;
1052
1053 virtual uint64_t _get_num_onodes() = 0;
1054 virtual uint64_t _get_buffer_bytes() = 0;
1055
1056 void add_extent() {
1057 ++num_extents;
1058 }
1059 void rm_extent() {
1060 --num_extents;
1061 }
1062
1063 void add_blob() {
1064 ++num_blobs;
1065 }
1066 void rm_blob() {
1067 --num_blobs;
1068 }
1069
1070 void trim(uint64_t target_bytes,
1071 float target_meta_ratio,
1072 float target_data_ratio,
1073 float bytes_per_onode);
1074
1075 void trim_all();
1076
1077 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1078
1079 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1080 uint64_t *blobs,
1081 uint64_t *buffers,
1082 uint64_t *bytes) = 0;
1083
1084 bool empty() {
1085 std::lock_guard<std::recursive_mutex> l(lock);
1086 return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
1087 }
1088
1089 #ifdef DEBUG_CACHE
1090 virtual void _audit(const char *s) = 0;
1091 #else
1092 void _audit(const char *s) { /* no-op */ }
1093 #endif
1094 };
1095
1096 /// simple LRU cache for onodes and buffers
1097 struct LRUCache : public Cache {
1098 private:
1099 typedef boost::intrusive::list<
1100 Onode,
1101 boost::intrusive::member_hook<
1102 Onode,
1103 boost::intrusive::list_member_hook<>,
1104 &Onode::lru_item> > onode_lru_list_t;
1105 typedef boost::intrusive::list<
1106 Buffer,
1107 boost::intrusive::member_hook<
1108 Buffer,
1109 boost::intrusive::list_member_hook<>,
1110 &Buffer::lru_item> > buffer_lru_list_t;
1111
1112 onode_lru_list_t onode_lru;
1113
1114 buffer_lru_list_t buffer_lru;
1115 uint64_t buffer_size = 0;
1116
1117 public:
1118 LRUCache(CephContext* cct) : Cache(cct) {}
1119 uint64_t _get_num_onodes() override {
1120 return onode_lru.size();
1121 }
1122 void _add_onode(OnodeRef& o, int level) override {
1123 if (level > 0)
1124 onode_lru.push_front(*o);
1125 else
1126 onode_lru.push_back(*o);
1127 }
1128 void _rm_onode(OnodeRef& o) override {
1129 auto q = onode_lru.iterator_to(*o);
1130 onode_lru.erase(q);
1131 }
1132 void _touch_onode(OnodeRef& o) override;
1133
1134 uint64_t _get_buffer_bytes() override {
1135 return buffer_size;
1136 }
1137 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1138 if (near) {
1139 auto q = buffer_lru.iterator_to(*near);
1140 buffer_lru.insert(q, *b);
1141 } else if (level > 0) {
1142 buffer_lru.push_front(*b);
1143 } else {
1144 buffer_lru.push_back(*b);
1145 }
1146 buffer_size += b->length;
1147 }
1148 void _rm_buffer(Buffer *b) override {
1149 assert(buffer_size >= b->length);
1150 buffer_size -= b->length;
1151 auto q = buffer_lru.iterator_to(*b);
1152 buffer_lru.erase(q);
1153 }
1154 void _move_buffer(Cache *src, Buffer *b) override {
1155 src->_rm_buffer(b);
1156 _add_buffer(b, 0, nullptr);
1157 }
1158 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1159 assert((int64_t)buffer_size + delta >= 0);
1160 buffer_size += delta;
1161 }
1162 void _touch_buffer(Buffer *b) override {
1163 auto p = buffer_lru.iterator_to(*b);
1164 buffer_lru.erase(p);
1165 buffer_lru.push_front(*b);
1166 _audit("_touch_buffer end");
1167 }
1168
1169 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1170
1171 void add_stats(uint64_t *onodes, uint64_t *extents,
1172 uint64_t *blobs,
1173 uint64_t *buffers,
1174 uint64_t *bytes) override {
1175 std::lock_guard<std::recursive_mutex> l(lock);
1176 *onodes += onode_lru.size();
1177 *extents += num_extents;
1178 *blobs += num_blobs;
1179 *buffers += buffer_lru.size();
1180 *bytes += buffer_size;
1181 }
1182
1183 #ifdef DEBUG_CACHE
1184 void _audit(const char *s) override;
1185 #endif
1186 };
1187
1188 // 2Q cache for buffers, LRU for onodes
1189 struct TwoQCache : public Cache {
1190 private:
1191 // stick with LRU for onodes for now (fixme?)
1192 typedef boost::intrusive::list<
1193 Onode,
1194 boost::intrusive::member_hook<
1195 Onode,
1196 boost::intrusive::list_member_hook<>,
1197 &Onode::lru_item> > onode_lru_list_t;
1198 typedef boost::intrusive::list<
1199 Buffer,
1200 boost::intrusive::member_hook<
1201 Buffer,
1202 boost::intrusive::list_member_hook<>,
1203 &Buffer::lru_item> > buffer_list_t;
1204
1205 onode_lru_list_t onode_lru;
1206
1207 buffer_list_t buffer_hot; ///< "Am" hot buffers
1208 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1209 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1210 uint64_t buffer_bytes = 0; ///< bytes
1211
1212 enum {
1213 BUFFER_NEW = 0,
1214 BUFFER_WARM_IN, ///< in buffer_warm_in
1215 BUFFER_WARM_OUT, ///< in buffer_warm_out
1216 BUFFER_HOT, ///< in buffer_hot
1217 BUFFER_TYPE_MAX
1218 };
1219
1220 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
1221
1222 public:
1223 TwoQCache(CephContext* cct) : Cache(cct) {}
1224 uint64_t _get_num_onodes() override {
1225 return onode_lru.size();
1226 }
1227 void _add_onode(OnodeRef& o, int level) override {
1228 if (level > 0)
1229 onode_lru.push_front(*o);
1230 else
1231 onode_lru.push_back(*o);
1232 }
1233 void _rm_onode(OnodeRef& o) override {
1234 auto q = onode_lru.iterator_to(*o);
1235 onode_lru.erase(q);
1236 }
1237 void _touch_onode(OnodeRef& o) override;
1238
1239 uint64_t _get_buffer_bytes() override {
1240 return buffer_bytes;
1241 }
1242 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1243 void _rm_buffer(Buffer *b) override;
1244 void _move_buffer(Cache *src, Buffer *b) override;
1245 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1246 void _touch_buffer(Buffer *b) override {
1247 switch (b->cache_private) {
1248 case BUFFER_WARM_IN:
1249 // do nothing (somewhat counter-intuitively!)
1250 break;
1251 case BUFFER_WARM_OUT:
1252 // move from warm_out to hot LRU
1253 assert(0 == "this happens via discard hint");
1254 break;
1255 case BUFFER_HOT:
1256 // move to front of hot LRU
1257 buffer_hot.erase(buffer_hot.iterator_to(*b));
1258 buffer_hot.push_front(*b);
1259 break;
1260 }
1261 _audit("_touch_buffer end");
1262 }
1263
1264 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1265
1266 void add_stats(uint64_t *onodes, uint64_t *extents,
1267 uint64_t *blobs,
1268 uint64_t *buffers,
1269 uint64_t *bytes) override {
1270 std::lock_guard<std::recursive_mutex> l(lock);
1271 *onodes += onode_lru.size();
1272 *extents += num_extents;
1273 *blobs += num_blobs;
1274 *buffers += buffer_hot.size() + buffer_warm_in.size();
1275 *bytes += buffer_bytes;
1276 }
1277
1278 #ifdef DEBUG_CACHE
1279 void _audit(const char *s) override;
1280 #endif
1281 };
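// Note on the 2Q scheme above (a simplified reading of the lists): newly
// cached buffers enter buffer_warm_in ("A1in"); when they age out of that
// list their data is dropped but an empty Buffer is kept in buffer_warm_out
// ("A1out") as history; a buffer referenced again while its history entry is
// still in warm_out is promoted to buffer_hot ("Am"). That is why
// _touch_buffer() asserts on BUFFER_WARM_OUT: promotion out of warm_out is
// expected to happen via the discard/re-add path rather than a plain touch.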
1282
1283 struct OnodeSpace {
1284 private:
1285 Cache *cache;
1286
1287 /// forward lookups
1288 mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1289
1290 friend class Collection; // for split_cache()
1291
1292 public:
1293 OnodeSpace(Cache *c) : cache(c) {}
1294 ~OnodeSpace() {
1295 clear();
1296 }
1297
1298 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1299 OnodeRef lookup(const ghobject_t& o);
1300 void remove(const ghobject_t& oid) {
1301 onode_map.erase(oid);
1302 }
1303 void rename(OnodeRef& o, const ghobject_t& old_oid,
1304 const ghobject_t& new_oid,
1305 const mempool::bluestore_cache_other::string& new_okey);
1306 void clear();
1307 bool empty();
1308
1309 /// return true if f true for any item
1310 bool map_any(std::function<bool(OnodeRef)> f);
1311 };
1312
1313 struct Collection : public CollectionImpl {
1314 BlueStore *store;
1315 Cache *cache; ///< our cache shard
1316 coll_t cid;
1317 bluestore_cnode_t cnode;
1318 RWLock lock;
1319
1320 bool exists;
1321
1322 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1323
1324 // cache onodes on a per-collection basis to avoid lock
1325 // contention.
1326 OnodeSpace onode_map;
1327
1328 //pool options
1329 pool_opts_t pool_opts;
1330
1331 OnodeRef get_onode(const ghobject_t& oid, bool create);
1332
1333 // the terminology is confusing here, sorry!
1334 //
1335 // blob_t shared_blob_t
1336 // !shared unused -> open
1337 // shared !loaded -> open + shared
1338 // shared loaded -> open + shared + loaded
1339 //
1340 // i.e.,
1341 // open = SharedBlob is instantiated
1342 // shared = blob_t shared flag is set; SharedBlob is hashed.
1343 // loaded = SharedBlob::shared_blob_t is loaded from kv store
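// For illustration, the lifecycle implied above: new_blob() (below) creates an
// open SharedBlob with the shared flag unset; make_blob_shared() sets the
// blob_t shared flag and hashes the SharedBlob into shared_blob_set;
// load_shared_blob() additionally pulls the persistent bluestore_shared_blob_t
// (ref counts) out of the kv store; make_blob_unshared() reverses the sharing
// once it is no longer needed.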
1344 void open_shared_blob(uint64_t sbid, BlobRef b);
1345 void load_shared_blob(SharedBlobRef sb);
1346 void make_blob_shared(uint64_t sbid, BlobRef b);
1347 uint64_t make_blob_unshared(SharedBlob *sb);
1348
1349 BlobRef new_blob() {
1350 BlobRef b = new Blob();
1351 b->shared_blob = new SharedBlob(this);
1352 return b;
1353 }
1354
1355 const coll_t &get_cid() override {
1356 return cid;
1357 }
1358
1359 bool contains(const ghobject_t& oid) {
1360 if (cid.is_meta())
1361 return oid.hobj.pool == -1;
1362 spg_t spgid;
1363 if (cid.is_pg(&spgid))
1364 return
1365 spgid.pgid.contains(cnode.bits, oid) &&
1366 oid.shard_id == spgid.shard;
1367 return false;
1368 }
1369
1370 void split_cache(Collection *dest);
1371
1372 Collection(BlueStore *ns, Cache *ca, coll_t c);
1373 };
1374
1375 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1376 CollectionRef c;
1377 OnodeRef o;
1378 KeyValueDB::Iterator it;
1379 string head, tail;
1380 public:
1381 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1382 int seek_to_first() override;
1383 int upper_bound(const string &after) override;
1384 int lower_bound(const string &to) override;
1385 bool valid() override;
1386 int next(bool validate=true) override;
1387 string key() override;
1388 bufferlist value() override;
1389 int status() override {
1390 return 0;
1391 }
1392 };
1393
1394 class OpSequencer;
1395 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1396
1397 struct volatile_statfs{
1398 enum {
1399 STATFS_ALLOCATED = 0,
1400 STATFS_STORED,
1401 STATFS_COMPRESSED_ORIGINAL,
1402 STATFS_COMPRESSED,
1403 STATFS_COMPRESSED_ALLOCATED,
1404 STATFS_LAST
1405 };
1406 int64_t values[STATFS_LAST];
1407 volatile_statfs() {
1408 memset(this, 0, sizeof(volatile_statfs));
1409 }
1410 void reset() {
1411 *this = volatile_statfs();
1412 }
1413 volatile_statfs& operator+=(const volatile_statfs& other) {
1414 for (size_t i = 0; i < STATFS_LAST; ++i) {
1415 values[i] += other.values[i];
1416 }
1417 return *this;
1418 }
1419 int64_t& allocated() {
1420 return values[STATFS_ALLOCATED];
1421 }
1422 int64_t& stored() {
1423 return values[STATFS_STORED];
1424 }
1425 int64_t& compressed_original() {
1426 return values[STATFS_COMPRESSED_ORIGINAL];
1427 }
1428 int64_t& compressed() {
1429 return values[STATFS_COMPRESSED];
1430 }
1431 int64_t& compressed_allocated() {
1432 return values[STATFS_COMPRESSED_ALLOCATED];
1433 }
1434 bool is_empty() {
1435 return values[STATFS_ALLOCATED] == 0 &&
1436 values[STATFS_STORED] == 0 &&
1437 values[STATFS_COMPRESSED] == 0 &&
1438 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1439 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1440 }
1441 void decode(bufferlist::iterator& it) {
1442 for (size_t i = 0; i < STATFS_LAST; i++) {
1443 ::decode(values[i], it);
1444 }
1445 }
1446
1447 void encode(bufferlist& bl) {
1448 for (size_t i = 0; i < STATFS_LAST; i++) {
1449 ::encode(values[i], bl);
1450 }
1451 }
1452 };
1453
1454 struct TransContext : public AioContext {
1455 MEMPOOL_CLASS_HELPERS();
1456
1457 typedef enum {
1458 STATE_PREPARE,
1459 STATE_AIO_WAIT,
1460 STATE_IO_DONE,
1461 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1462 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1463 STATE_KV_DONE,
1464 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1465 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1466 STATE_DEFERRED_DONE,
1467 STATE_FINISHING,
1468 STATE_DONE,
1469 } state_t;
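// Typical (simplified) progression of a TransContext through these states,
// matching the l_bluestore_state_*_lat counters above:
//
// PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
// -> (DEFERRED_QUEUED -> DEFERRED_CLEANUP -> DEFERRED_DONE, only if the
// txc carries deferred writes)
// -> FINISHING -> DONE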
1470
1471 state_t state = STATE_PREPARE;
1472
1473 const char *get_state_name() {
1474 switch (state) {
1475 case STATE_PREPARE: return "prepare";
1476 case STATE_AIO_WAIT: return "aio_wait";
1477 case STATE_IO_DONE: return "io_done";
1478 case STATE_KV_QUEUED: return "kv_queued";
1479 case STATE_KV_SUBMITTED: return "kv_submitted";
1480 case STATE_KV_DONE: return "kv_done";
1481 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1482 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1483 case STATE_DEFERRED_DONE: return "deferred_done";
1484 case STATE_FINISHING: return "finishing";
1485 case STATE_DONE: return "done";
1486 }
1487 return "???";
1488 }
1489
1490 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1491 const char *get_state_latency_name(int state) {
1492 switch (state) {
1493 case l_bluestore_state_prepare_lat: return "prepare";
1494 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1495 case l_bluestore_state_io_done_lat: return "io_done";
1496 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1497 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1498 case l_bluestore_state_kv_done_lat: return "kv_done";
1499 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1500 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1501 case l_bluestore_state_finishing_lat: return "finishing";
1502 case l_bluestore_state_done_lat: return "done";
1503 }
1504 return "???";
1505 }
1506 #endif
1507
1508 void log_state_latency(PerfCounters *logger, int state) {
1509 utime_t lat, now = ceph_clock_now();
1510 lat = now - last_stamp;
1511 logger->tinc(state, lat);
1512 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1513 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1514 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1515 OID_ELAPSED("", usecs, get_state_latency_name(state));
1516 }
1517 #endif
1518 last_stamp = now;
1519 }
1520
1521 OpSequencerRef osr;
1522 boost::intrusive::list_member_hook<> sequencer_item;
1523
1524 uint64_t bytes = 0, cost = 0;
1525
1526 set<OnodeRef> onodes; ///< these need to be updated/written
1527 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1528 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1529 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1530
1531 KeyValueDB::Transaction t; ///< then we will commit this
1532 Context *oncommit = nullptr; ///< signal on commit
1533 Context *onreadable = nullptr; ///< signal on readable
1534 Context *onreadable_sync = nullptr; ///< signal on readable
1535 list<Context*> oncommits; ///< more commit completions
1536 list<CollectionRef> removed_collections; ///< colls we removed
1537
1538 boost::intrusive::list_member_hook<> deferred_queue_item;
1539 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1540
1541 interval_set<uint64_t> allocated, released;
1542 volatile_statfs statfs_delta;
1543
1544 IOContext ioc;
1545 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1546
1547 CollectionRef first_collection; ///< first referenced collection
1548
1549 uint64_t seq = 0;
1550 utime_t start;
1551 utime_t last_stamp;
1552
1553 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1554 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1555
1556 explicit TransContext(CephContext* cct, OpSequencer *o)
1557 : osr(o),
1558 ioc(cct, this),
1559 start(ceph_clock_now()) {
1560 last_stamp = start;
1561 }
1562 ~TransContext() {
1563 delete deferred_txn;
1564 }
1565
1566 void write_onode(OnodeRef &o) {
1567 onodes.insert(o);
1568 }
1569 void write_shared_blob(SharedBlobRef &sb) {
1570 shared_blobs.insert(sb);
1571 }
1572 void unshare_blob(SharedBlob *sb) {
1573 shared_blobs.erase(sb);
1574 }
1575
1576 /// note we logically modified object (when onode itself is unmodified)
1577 void note_modified_object(OnodeRef &o) {
1578 // onode itself isn't written, though
1579 modified_objects.insert(o);
1580 }
1581 void removed(OnodeRef& o) {
1582 onodes.erase(o);
1583 modified_objects.erase(o);
1584 }
1585
1586 void aio_finish(BlueStore *store) override {
1587 store->txc_aio_finish(this);
1588 }
1589 };
1590
1591 typedef boost::intrusive::list<
1592 TransContext,
1593 boost::intrusive::member_hook<
1594 TransContext,
1595 boost::intrusive::list_member_hook<>,
1596 &TransContext::deferred_queue_item> > deferred_queue_t;
1597
1598 struct DeferredBatch : public AioContext {
1599 OpSequencer *osr;
1600 struct deferred_io {
1601 bufferlist bl; ///< data
1602 uint64_t seq; ///< deferred transaction seq
1603 };
1604 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1605 deferred_queue_t txcs; ///< txcs in this batch
1606 IOContext ioc; ///< our aios
1607 /// bytes of pending io for each deferred seq (may be 0)
1608 map<uint64_t,int> seq_bytes;
1609
1610 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1611 void _audit(CephContext *cct);
1612
1613 DeferredBatch(CephContext *cct, OpSequencer *osr)
1614 : osr(osr), ioc(cct, this) {}
1615
1616 /// prepare a write
1617 void prepare_write(CephContext *cct,
1618 uint64_t seq, uint64_t offset, uint64_t length,
1619 bufferlist::const_iterator& p);
1620
1621 void aio_finish(BlueStore *store) override {
1622 store->_deferred_aio_finish(osr);
1623 }
1624 };
1625
1626 class OpSequencer : public Sequencer_impl {
1627 public:
1628 std::mutex qlock;
1629 std::condition_variable qcond;
1630 typedef boost::intrusive::list<
1631 TransContext,
1632 boost::intrusive::member_hook<
1633 TransContext,
1634 boost::intrusive::list_member_hook<>,
1635 &TransContext::sequencer_item> > q_list_t;
1636 q_list_t q; ///< transactions
1637
1638 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1639
1640 DeferredBatch *deferred_running = nullptr;
1641 DeferredBatch *deferred_pending = nullptr;
1642
1643 Sequencer *parent;
1644 BlueStore *store;
1645
1646 uint64_t last_seq = 0;
1647
1648 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1649
1650 std::atomic_int kv_committing_serially = {0};
1651
1652 std::atomic_int kv_submitted_waiters = {0};
1653
1654 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1655 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1656
1657 OpSequencer(CephContext* cct, BlueStore *store)
1658 : Sequencer_impl(cct),
1659 parent(NULL), store(store) {
1660 store->register_osr(this);
1661 }
1662 ~OpSequencer() override {
1663 assert(q.empty());
1664 _unregister();
1665 }
1666
1667 void discard() override {
1668 // Note that we may have txc's in flight when the parent Sequencer
1669 // goes away. Reflect this with zombie==registered==true and let
1670 // _osr_drain_all clean up later.
1671 assert(!zombie);
1672 zombie = true;
1673 parent = nullptr;
1674 bool empty;
1675 {
1676 std::lock_guard<std::mutex> l(qlock);
1677 empty = q.empty();
1678 }
1679 if (empty) {
1680 _unregister();
1681 }
1682 }
1683
1684 void _unregister() {
1685 if (registered) {
1686 store->unregister_osr(this);
1687 registered = false;
1688 }
1689 }
1690
1691 void queue_new(TransContext *txc) {
1692 std::lock_guard<std::mutex> l(qlock);
1693 txc->seq = ++last_seq;
1694 q.push_back(*txc);
1695 }
1696
1697 void drain() {
1698 std::unique_lock<std::mutex> l(qlock);
1699 while (!q.empty())
1700 qcond.wait(l);
1701 }
1702
1703 void drain_preceding(TransContext *txc) {
1704 std::unique_lock<std::mutex> l(qlock);
1705 while (!q.empty() && &q.front() != txc)
1706 qcond.wait(l);
1707 }
1708
1709 bool _is_all_kv_submitted() {
1710 // caller must hold qlock
1711 if (q.empty()) {
1712 return true;
1713 }
1714 TransContext *txc = &q.back();
1715 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1716 return true;
1717 }
1718 return false;
1719 }
1720
1721 void flush() override {
1722 std::unique_lock<std::mutex> l(qlock);
1723 while (true) {
1724 // set flag before the check because the condition
1725 // may become true outside qlock, and we need to make
1726 // sure those threads see waiters and signal qcond.
1727 ++kv_submitted_waiters;
1728 if (_is_all_kv_submitted()) {
1729 return;
1730 }
1731 qcond.wait(l);
1732 --kv_submitted_waiters;
1733 }
1734 }
1735
1736 bool flush_commit(Context *c) override {
1737 std::lock_guard<std::mutex> l(qlock);
1738 if (q.empty()) {
1739 return true;
1740 }
1741 TransContext *txc = &q.back();
1742 if (txc->state >= TransContext::STATE_KV_DONE) {
1743 return true;
1744 }
1745 txc->oncommits.push_back(c);
1746 return false;
1747 }
1748 };
1749
1750 typedef boost::intrusive::list<
1751 OpSequencer,
1752 boost::intrusive::member_hook<
1753 OpSequencer,
1754 boost::intrusive::list_member_hook<>,
1755 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1756
1757 struct KVSyncThread : public Thread {
1758 BlueStore *store;
1759 explicit KVSyncThread(BlueStore *s) : store(s) {}
1760 void *entry() override {
1761 store->_kv_sync_thread();
1762 return NULL;
1763 }
1764 };
1765 struct KVFinalizeThread : public Thread {
1766 BlueStore *store;
1767 explicit KVFinalizeThread(BlueStore *s) : store(s) {}
1768 void *entry() override {
1769 store->_kv_finalize_thread();
1770 return NULL;
1771 }
1772 };
1773
1774 struct DBHistogram {
1775 struct value_dist {
1776 uint64_t count;
1777 uint32_t max_len;
1778 };
1779
1780 struct key_dist {
1781 uint64_t count;
1782 uint32_t max_len;
1783 map<int, struct value_dist> val_map; ///< slab id to count and max length of values
1784 };
1785
1786 map<string, map<int, struct key_dist> > key_hist;
1787 map<int, uint64_t> value_hist;
1788 int get_key_slab(size_t sz);
1789 string get_key_slab_to_range(int slab);
1790 int get_value_slab(size_t sz);
1791 string get_value_slab_to_range(int slab);
1792 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1793 const string &prefix, size_t key_size, size_t value_size);
1794 void dump(Formatter *f);
1795 };
1796
1797 // --------------------------------------------------------
1798 // members
1799 private:
1800 BlueFS *bluefs = nullptr;
1801 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1802 bool bluefs_single_shared_device = true;
1803 utime_t bluefs_last_balance;
1804
1805 KeyValueDB *db = nullptr;
1806 BlockDevice *bdev = nullptr;
1807 std::string freelist_type;
1808 FreelistManager *fm = nullptr;
1809 Allocator *alloc = nullptr;
1810 uuid_d fsid;
1811 int path_fd = -1; ///< open handle to $path
1812 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1813 bool mounted = false;
1814
1815 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1816 mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
1817
1818 vector<Cache*> cache_shards;
1819
1820 std::mutex osr_lock; ///< protect osr_set
1821 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1822
1823 std::atomic<uint64_t> nid_last = {0};
1824 std::atomic<uint64_t> nid_max = {0};
1825 std::atomic<uint64_t> blobid_last = {0};
1826 std::atomic<uint64_t> blobid_max = {0};
1827
1828 Throttle throttle_bytes; ///< submit to commit
1829 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1830
1831 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1832 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1833
1834 std::mutex deferred_lock;
1835 std::atomic<uint64_t> deferred_seq = {0};
1836 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1837 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1838 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1839
1840 int m_finisher_num = 1;
1841 vector<Finisher*> finishers;
1842
1843 KVSyncThread kv_sync_thread;
1844 std::mutex kv_lock;
1845 std::condition_variable kv_cond;
1846 bool kv_sync_started = false;
1847 bool kv_stop = false;
1848 bool kv_finalize_started = false;
1849 bool kv_finalize_stop = false;
1850 deque<TransContext*> kv_queue; ///< ready, already submitted
1851 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1852 deque<TransContext*> kv_committing; ///< currently syncing
1853 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1854 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1855
1856 KVFinalizeThread kv_finalize_thread;
1857 std::mutex kv_finalize_lock;
1858 std::condition_variable kv_finalize_cond;
1859 deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
1860 deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
1861
1862 PerfCounters *logger = nullptr;
1863
1864 std::mutex reap_lock;
1865 list<CollectionRef> removed_collections;
1866
1867 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1868 set<ghobject_t> debug_data_error_objects;
1869 set<ghobject_t> debug_mdata_error_objects;
1870
1871 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1872
1873 uint64_t block_size = 0; ///< block size of block device (power of 2)
1874 uint64_t block_mask = 0; ///< mask to get just the block offset
1875 size_t block_size_order = 0; ///< bits to shift to get block size
1876
1877 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1878 std::atomic<int> deferred_batch_ops = {0}; ///< deferred batch size
1879
1880 ///< bits for min_alloc_size
1881 std::atomic<uint8_t> min_alloc_size_order = {0};
1882 static_assert(std::numeric_limits<uint8_t>::max() >
1883 std::numeric_limits<decltype(min_alloc_size)>::digits,
1884 "not enough bits for min_alloc_size");
1885
1886 ///< size threshold for forced deferred writes
1887 std::atomic<uint64_t> prefer_deferred_size = {0};
1888
1889 ///< maximum allocation unit (power of 2)
1890 std::atomic<uint64_t> max_alloc_size = {0};
1891
1892 ///< approx cost per io, in bytes
1893 std::atomic<uint64_t> throttle_cost_per_io = {0};
1894
1895 std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
1896 CompressorRef compressor;
1897 std::atomic<uint64_t> comp_min_blob_size = {0};
1898 std::atomic<uint64_t> comp_max_blob_size = {0};
1899
1900 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1901
1902 uint64_t kv_ios = 0;
1903 uint64_t kv_throttle_costs = 0;
1904
1905 // cache trim control
1906 float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
1907 float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
1908 float cache_data_ratio = 0; ///< cache ratio dedicated to object data
1909
1910 std::mutex vstatfs_lock;
1911 volatile_statfs vstatfs;
1912
1913 struct MempoolThread : public Thread {
1914 BlueStore *store;
1915 Cond cond;
1916 Mutex lock;
1917 bool stop = false;
1918 public:
1919 explicit MempoolThread(BlueStore *s)
1920 : store(s),
1921 lock("BlueStore::MempoolThread::lock") {}
1922 void *entry() override;
1923 void init() {
1924 assert(stop == false);
1925 create("bstore_mempool");
1926 }
1927 void shutdown() {
1928 lock.Lock();
1929 stop = true;
1930 cond.Signal();
1931 lock.Unlock();
1932 join();
1933 }
1934 } mempool_thread;
1935
1936 // --------------------------------------------------------
1937 // private methods
1938
1939 void _init_logger();
1940 void _shutdown_logger();
1941 int _reload_logger();
1942
1943 int _open_path();
1944 void _close_path();
1945 int _open_fsid(bool create);
1946 int _lock_fsid();
1947 int _read_fsid(uuid_d *f);
1948 int _write_fsid();
1949 void _close_fsid();
1950 void _set_alloc_sizes();
1951 void _set_blob_size();
1952
1953 int _open_bdev(bool create);
1954 void _close_bdev();
1955 int _open_db(bool create);
1956 void _close_db();
1957 int _open_fm(bool create);
1958 void _close_fm();
1959 int _open_alloc();
1960 void _close_alloc();
1961 int _open_collections(int *errors=0);
1962 void _close_collections();
1963
1964 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1965 bool create);
1966
1967 int _write_bdev_label(string path, bluestore_bdev_label_t label);
1968 public:
1969 static int _read_bdev_label(CephContext* cct, string path,
1970 bluestore_bdev_label_t *label);
1971 private:
1972 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1973 bool create);
1974
1975 int _open_super_meta();
1976
1977 void open_statfs();
1978
1979 int _reconcile_bluefs_freespace();
1980 int _balance_bluefs_freespace(PExtentVector *extents);
1981 void _commit_bluefs_freespace(const PExtentVector& extents);
1982
1983 CollectionRef _get_collection(const coll_t& cid);
1984 void _queue_reap_collection(CollectionRef& c);
1985 void _reap_collections();
1986 void _update_cache_logger();
1987
1988 void _assign_nid(TransContext *txc, OnodeRef o);
1989 uint64_t _assign_blobid(TransContext *txc);
1990
1991 void _dump_onode(OnodeRef o, int log_level=30);
1992 void _dump_extent_map(ExtentMap& em, int log_level=30);
1993 void _dump_transaction(Transaction *t, int log_level = 30);
1994
1995 TransContext *_txc_create(OpSequencer *osr);
1996 void _txc_update_store_statfs(TransContext *txc);
1997 void _txc_add_transaction(TransContext *txc, Transaction *t);
1998 void _txc_calc_cost(TransContext *txc);
1999 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
2000 void _txc_state_proc(TransContext *txc);
2001 void _txc_aio_submit(TransContext *txc);
2002 public:
2003 void txc_aio_finish(void *p) {
2004 _txc_state_proc(static_cast<TransContext*>(p));
2005 }
2006 private:
2007 void _txc_finish_io(TransContext *txc);
2008 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
2009 void _txc_applied_kv(TransContext *txc);
2010 void _txc_committed_kv(TransContext *txc);
2011 void _txc_finish(TransContext *txc);
2012 void _txc_release_alloc(TransContext *txc);
2013
2014 void _osr_drain_preceding(TransContext *txc);
2015 void _osr_drain_all();
2016 void _osr_unregister_all();
2017
2018 void _kv_start();
2019 void _kv_stop();
2020 void _kv_sync_thread();
2021 void _kv_finalize_thread();
2022
2023 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
2024 void _deferred_queue(TransContext *txc);
2025 void deferred_try_submit() {
2026 std::lock_guard<std::mutex> l(deferred_lock);
2027 _deferred_try_submit();
2028 }
2029 void _deferred_try_submit();
2030 void _deferred_submit(OpSequencer *osr);
2031 void _deferred_aio_finish(OpSequencer *osr);
2032 int _deferred_replay();
2033
2034 public:
2035 using mempool_dynamic_bitset =
2036 boost::dynamic_bitset<uint64_t,
2037 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
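// mempool_dynamic_bitset gives fsck a one-bit-per-allocation-unit map whose
// backing storage is charged to the bluestore_fsck mempool.  A rough usage
// sketch along the lines of what _fsck_check_extents() needs (the loop below
// is illustrative, not the exact fsck code):
//
//   mempool_dynamic_bitset used_blocks;
//   used_blocks.resize(bdev->get_size() / min_alloc_size);  // one bit per AU
//   for (auto& e : extents) {                               // a PExtentVector
//     uint64_t first = e.offset / min_alloc_size;
//     uint64_t last  = (e.offset + e.length - 1) / min_alloc_size;
//     for (uint64_t b = first; b <= last; ++b) {
//       if (used_blocks.test(b)) { /* two owners for one AU: report it */ }
//       used_blocks.set(b);
//     }
//   }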
2038
2039 private:
2040 int _fsck_check_extents(
2041 const ghobject_t& oid,
2042 const PExtentVector& extents,
2043 bool compressed,
2044 mempool_dynamic_bitset &used_blocks,
2045 store_statfs_t& expected_statfs);
2046
2047 void _buffer_cache_write(
2048 TransContext *txc,
2049 BlobRef b,
2050 uint64_t offset,
2051 bufferlist& bl,
2052 unsigned flags) {
2053 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2054 flags);
2055 txc->shared_blobs_written.insert(b->shared_blob);
2056 }
2057
2058 int _collection_list(
2059 Collection *c, const ghobject_t& start, const ghobject_t& end,
2060 int max, vector<ghobject_t> *ls, ghobject_t *next);
2061
2062 template <typename T, typename F>
2063 T select_option(const std::string& opt_name, T val1, F f) {
2064 //NB: opt_name reserved for future use
2065 boost::optional<T> val2 = f();
2066 if (val2) {
2067 return *val2;
2068 }
2069 return val1;
2070 }
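// select_option() returns the functor's value when it yields one and falls
// back to val1 otherwise, which lets per-pool settings override the global
// atomics above.  A hypothetical call site (the pool-override condition is
// illustrative only):
//
//   uint64_t csize = select_option(
//     "compress_max_blob_size",
//     comp_max_blob_size.load(),              // global default
//     [&]() {
//       boost::optional<uint64_t> val;        // left unset -> use the default
//       if (pool_has_override)
//         val = pool_override_value;
//       return val;
//     });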
2071
2072 void _apply_padding(uint64_t head_pad,
2073 uint64_t tail_pad,
2074 bufferlist& bl,
2075 bufferlist& padded);
2076
2077 // -- ondisk version ---
2078 public:
2079 const int32_t latest_ondisk_format = 2; ///< format version we write
2080 const int32_t min_readable_ondisk_format = 1; ///< oldest on-disk format we can still read
2081 const int32_t min_compat_ondisk_format = 2; ///< minimum format a reader must understand to read us
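// Taken together these constants give the usual compatibility gate: a store
// is readable if its on-disk format is not older than min_readable_ondisk_format
// and not newer than this binary understands, unless the writer's recorded
// compat floor still allows it.  A rough sketch of the mount-time decision
// (compat_on_disk names the value a newer writer persisted from its own
// min_compat_ondisk_format; the exact check lives in the superblock code):
//
//   if (ondisk_format < latest_ondisk_format)
//     r = _upgrade_super();                    // rewrite to the newer format
//   else if (ondisk_format > latest_ondisk_format &&
//            compat_on_disk > latest_ondisk_format)
//     return -EPERM;                           // written by something too new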
2082
2083 private:
2084 int32_t ondisk_format = 0; ///< value detected on mount
2085
2086 int _upgrade_super(); ///< upgrade (called during open_super)
2087 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2088
2089 // --- public interface ---
2090 public:
2091 BlueStore(CephContext *cct, const string& path);
2092 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // ctor for unit tests only
2093 ~BlueStore() override;
2094
2095 string get_type() override {
2096 return "bluestore";
2097 }
2098
2099 bool needs_journal() override { return false; }
2100 bool wants_journal() override { return false; }
2101 bool allows_journal() override { return false; }
2102
2103 bool is_rotational() override;
2104
2105 static int get_block_device_fsid(CephContext* cct, const string& path,
2106 uuid_d *fsid);
2107
2108 bool test_mount_in_use() override;
2109
2110 private:
2111 int _mount(bool kv_only);
2112 public:
2113 int mount() override {
2114 return _mount(false);
2115 }
2116 int umount() override;
2117
2118 int start_kv_only(KeyValueDB **pdb) {
2119 int r = _mount(true);
2120 if (r < 0)
2121 return r;
2122 *pdb = db;
2123 return 0;
2124 }
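// start_kv_only() exists for offline tooling that needs the key/value store
// without bringing up the full object store.  A hypothetical caller, with
// teardown and error handling elided ("S" as a prefix is only an example):
//
//   BlueStore bs(cct, path);
//   KeyValueDB *kvdb = nullptr;
//   if (bs.start_kv_only(&kvdb) == 0) {
//     KeyValueDB::Iterator it = kvdb->get_iterator("S");
//     for (it->seek_to_first(); it->valid(); it->next()) {
//       // inspect it->key() / it->value()
//     }
//   }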
2125
2126 int fsck(bool deep) override;
2127
2128 void set_cache_shards(unsigned num) override;
2129
2130 int validate_hobject_key(const hobject_t &obj) const override {
2131 return 0;
2132 }
2133 unsigned get_max_attr_name_length() override {
2134 return 256; // arbitrary; there is no real limit internally
2135 }
2136
2137 int mkfs() override;
2138 int mkjournal() override {
2139 return 0;
2140 }
2141
2142 void get_db_statistics(Formatter *f) override;
2143 void generate_db_histogram(Formatter *f) override;
2144 void _flush_cache();
2145 void flush_cache() override;
2146 void dump_perf_counters(Formatter *f) override {
2147 f->open_object_section("perf_counters");
2148 logger->dump_formatted(f, false);
2149 f->close_section();
2150 }
2151
2152 void register_osr(OpSequencer *osr) {
2153 std::lock_guard<std::mutex> l(osr_lock);
2154 osr_set.insert(osr);
2155 }
2156 void unregister_osr(OpSequencer *osr) {
2157 std::lock_guard<std::mutex> l(osr_lock);
2158 osr_set.erase(osr);
2159 }
2160
2161 public:
2162 int statfs(struct store_statfs_t *buf) override;
2163
2164 void collect_metadata(map<string,string> *pm) override;
2165
2166 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2167 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2168 int set_collection_opts(
2169 const coll_t& cid,
2170 const pool_opts_t& opts) override;
2171 int stat(
2172 const coll_t& cid,
2173 const ghobject_t& oid,
2174 struct stat *st,
2175 bool allow_eio = false) override;
2176 int stat(
2177 CollectionHandle &c,
2178 const ghobject_t& oid,
2179 struct stat *st,
2180 bool allow_eio = false) override;
2181 int read(
2182 const coll_t& cid,
2183 const ghobject_t& oid,
2184 uint64_t offset,
2185 size_t len,
2186 bufferlist& bl,
2187 uint32_t op_flags = 0,
2188 bool allow_eio = false) override;
2189 int read(
2190 CollectionHandle &c,
2191 const ghobject_t& oid,
2192 uint64_t offset,
2193 size_t len,
2194 bufferlist& bl,
2195 uint32_t op_flags = 0,
2196 bool allow_eio = false) override;
2197 int _do_read(
2198 Collection *c,
2199 OnodeRef o,
2200 uint64_t offset,
2201 size_t len,
2202 bufferlist& bl,
2203 uint32_t op_flags = 0);
2204
2205 private:
2206 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2207 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2208 public:
2209 int fiemap(const coll_t& cid, const ghobject_t& oid,
2210 uint64_t offset, size_t len, bufferlist& bl) override;
2211 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2212 uint64_t offset, size_t len, bufferlist& bl) override;
2213 int fiemap(const coll_t& cid, const ghobject_t& oid,
2214 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2215 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2216 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2217
2218
2219 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2220 bufferptr& value) override;
2221 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2222 bufferptr& value) override;
2223
2224 int getattrs(const coll_t& cid, const ghobject_t& oid,
2225 map<string,bufferptr>& aset) override;
2226 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2227 map<string,bufferptr>& aset) override;
2228
2229 int list_collections(vector<coll_t>& ls) override;
2230
2231 CollectionHandle open_collection(const coll_t &c) override;
2232
2233 bool collection_exists(const coll_t& c) override;
2234 int collection_empty(const coll_t& c, bool *empty) override;
2235 int collection_bits(const coll_t& c) override;
2236
2237 int collection_list(const coll_t& cid,
2238 const ghobject_t& start,
2239 const ghobject_t& end,
2240 int max,
2241 vector<ghobject_t> *ls, ghobject_t *next) override;
2242 int collection_list(CollectionHandle &c,
2243 const ghobject_t& start,
2244 const ghobject_t& end,
2245 int max,
2246 vector<ghobject_t> *ls, ghobject_t *next) override;
2247
2248 int omap_get(
2249 const coll_t& cid, ///< [in] Collection containing oid
2250 const ghobject_t &oid, ///< [in] Object containing omap
2251 bufferlist *header, ///< [out] omap header
2252 map<string, bufferlist> *out ///< [out] Key to value map
2253 ) override;
2254 int omap_get(
2255 CollectionHandle &c, ///< [in] Collection containing oid
2256 const ghobject_t &oid, ///< [in] Object containing omap
2257 bufferlist *header, ///< [out] omap header
2258 map<string, bufferlist> *out ///< [out] Key to value map
2259 ) override;
2260
2261 /// Get omap header
2262 int omap_get_header(
2263 const coll_t& cid, ///< [in] Collection containing oid
2264 const ghobject_t &oid, ///< [in] Object containing omap
2265 bufferlist *header, ///< [out] omap header
2266 bool allow_eio = false ///< [in] don't assert on eio
2267 ) override;
2268 int omap_get_header(
2269 CollectionHandle &c, ///< [in] Collection containing oid
2270 const ghobject_t &oid, ///< [in] Object containing omap
2271 bufferlist *header, ///< [out] omap header
2272 bool allow_eio = false ///< [in] don't assert on eio
2273 ) override;
2274
2275 /// Get keys defined on oid
2276 int omap_get_keys(
2277 const coll_t& cid, ///< [in] Collection containing oid
2278 const ghobject_t &oid, ///< [in] Object containing omap
2279 set<string> *keys ///< [out] Keys defined on oid
2280 ) override;
2281 int omap_get_keys(
2282 CollectionHandle &c, ///< [in] Collection containing oid
2283 const ghobject_t &oid, ///< [in] Object containing omap
2284 set<string> *keys ///< [out] Keys defined on oid
2285 ) override;
2286
2287 /// Get key values
2288 int omap_get_values(
2289 const coll_t& cid, ///< [in] Collection containing oid
2290 const ghobject_t &oid, ///< [in] Object containing omap
2291 const set<string> &keys, ///< [in] Keys to get
2292 map<string, bufferlist> *out ///< [out] Returned keys and values
2293 ) override;
2294 int omap_get_values(
2295 CollectionHandle &c, ///< [in] Collection containing oid
2296 const ghobject_t &oid, ///< [in] Object containing omap
2297 const set<string> &keys, ///< [in] Keys to get
2298 map<string, bufferlist> *out ///< [out] Returned keys and values
2299 ) override;
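// A typical point lookup of a few omap keys through the CollectionHandle
// overload (key names and the consumer are illustrative only):
//
//   CollectionHandle ch = store->open_collection(cid);
//   set<string> keys = {"_", "snapset"};
//   map<string, bufferlist> vals;
//   int r = store->omap_get_values(ch, oid, keys, &vals);
//   if (r >= 0 && vals.count("_"))
//     handle_object_info(vals["_"]);           // hypothetical consumer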
2300
2301 /// Filters keys into out which are defined on oid
2302 int omap_check_keys(
2303 const coll_t& cid, ///< [in] Collection containing oid
2304 const ghobject_t &oid, ///< [in] Object containing omap
2305 const set<string> &keys, ///< [in] Keys to check
2306 set<string> *out ///< [out] Subset of keys defined on oid
2307 ) override;
2308 int omap_check_keys(
2309 CollectionHandle &c, ///< [in] Collection containing oid
2310 const ghobject_t &oid, ///< [in] Object containing omap
2311 const set<string> &keys, ///< [in] Keys to check
2312 set<string> *out ///< [out] Subset of keys defined on oid
2313 ) override;
2314
2315 ObjectMap::ObjectMapIterator get_omap_iterator(
2316 const coll_t& cid, ///< [in] collection
2317 const ghobject_t &oid ///< [in] object
2318 ) override;
2319 ObjectMap::ObjectMapIterator get_omap_iterator(
2320 CollectionHandle &c, ///< [in] collection
2321 const ghobject_t &oid ///< [in] object
2322 ) override;
2323
2324 void set_fsid(uuid_d u) override {
2325 fsid = u;
2326 }
2327 uuid_d get_fsid() override {
2328 return fsid;
2329 }
2330
2331 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2332 return num_objects * 300; // assuming per-object overhead is 300 bytes
2333 }
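// With the 300-byte rule of thumb above, one million objects are estimated at
// roughly 300 MB (1,000,000 * 300 B ~= 286 MiB) of onode/metadata overhead;
// this is a planning heuristic, not a measured figure.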
2334
2335 struct BSPerfTracker {
2336 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2337 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2338
2339 objectstore_perf_stat_t get_cur_stats() const {
2340 objectstore_perf_stat_t ret;
2341 ret.os_commit_latency = os_commit_latency.avg();
2342 ret.os_apply_latency = os_apply_latency.avg();
2343 return ret;
2344 }
2345
2346 void update_from_perfcounters(PerfCounters &logger);
2347 } perf_tracker;
2348
2349 objectstore_perf_stat_t get_cur_stats() override {
2350 perf_tracker.update_from_perfcounters(*logger);
2351 return perf_tracker.get_cur_stats();
2352 }
2353 const PerfCounters* get_perf_counters() const override {
2354 return logger;
2355 }
2356
2357 int queue_transactions(
2358 Sequencer *osr,
2359 vector<Transaction>& tls,
2360 TrackedOpRef op = TrackedOpRef(),
2361 ThreadPool::TPHandle *handle = NULL) override;
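// The usual submission pattern against this interface, using the generic
// types from os/ObjectStore.h (cid, oid and attr_bl are assumed to exist;
// error handling elided):
//
//   ObjectStore::Sequencer osr("example");
//   ObjectStore::Transaction t;
//   t.touch(cid, oid);
//   t.setattr(cid, oid, "_", attr_bl);
//   vector<ObjectStore::Transaction> tls;
//   tls.push_back(std::move(t));
//   store->queue_transactions(&osr, tls);
//   // completion is signalled through any on_applied/on_commit contexts
//   // registered on the transaction, if the caller set them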
2362
2363 // error injection
2364 void inject_data_error(const ghobject_t& o) override {
2365 RWLock::WLocker l(debug_read_error_lock);
2366 debug_data_error_objects.insert(o);
2367 }
2368 void inject_mdata_error(const ghobject_t& o) override {
2369 RWLock::WLocker l(debug_read_error_lock);
2370 debug_mdata_error_objects.insert(o);
2371 }
2372 private:
2373 bool _debug_data_eio(const ghobject_t& o) {
2374 if (!cct->_conf->bluestore_debug_inject_read_err) {
2375 return false;
2376 }
2377 RWLock::RLocker l(debug_read_error_lock);
2378 return debug_data_error_objects.count(o);
2379 }
2380 bool _debug_mdata_eio(const ghobject_t& o) {
2381 if (!cct->_conf->bluestore_debug_inject_read_err) {
2382 return false;
2383 }
2384 RWLock::RLocker l(debug_read_error_lock);
2385 return debug_mdata_error_objects.count(o);
2386 }
2387 void _debug_obj_on_delete(const ghobject_t& o) {
2388 if (cct->_conf->bluestore_debug_inject_read_err) {
2389 RWLock::WLocker l(debug_read_error_lock);
2390 debug_data_error_objects.erase(o);
2391 debug_mdata_error_objects.erase(o);
2392 }
2393 }
2394
2395 private:
2396
2397 // --------------------------------------------------------
2398 // read processing internal methods
2399 int _verify_csum(
2400 OnodeRef& o,
2401 const bluestore_blob_t* blob,
2402 uint64_t blob_xoffset,
2403 const bufferlist& bl,
2404 uint64_t logical_offset) const;
2405 int _decompress(bufferlist& source, bufferlist* result);
2406
2407
2408 // --------------------------------------------------------
2409 // write ops
2410
2411 struct WriteContext {
2412 bool buffered = false; ///< buffered write
2413 bool compress = false; ///< compressed write
2414 uint64_t target_blob_size = 0; ///< target (max) blob size
2415 unsigned csum_order = 0; ///< target checksum chunk order
2416
2417 old_extent_map_t old_extents; ///< must deref these blobs
2418
2419 struct write_item {
2420 uint64_t logical_offset; ///< write logical offset
2421 BlobRef b;
2422 uint64_t blob_length;
2423 uint64_t b_off;
2424 bufferlist bl;
2425 uint64_t b_off0; ///< original offset in a blob prior to padding
2426 uint64_t length0; ///< original data length prior to padding
2427
2428 bool mark_unused;
2429 bool new_blob; ///< whether new blob was created
2430
2431 write_item(
2432 uint64_t logical_offs,
2433 BlobRef b,
2434 uint64_t blob_len,
2435 uint64_t o,
2436 bufferlist& bl,
2437 uint64_t o0,
2438 uint64_t l0,
2439 bool _mark_unused,
2440 bool _new_blob)
2441 :
2442 logical_offset(logical_offs),
2443 b(b),
2444 blob_length(blob_len),
2445 b_off(o),
2446 bl(bl),
2447 b_off0(o0),
2448 length0(l0),
2449 mark_unused(_mark_unused),
2450 new_blob(_new_blob) {}
2451 };
2452 vector<write_item> writes; ///< blobs we're writing
2453
2454 /// partial clone of the context
2455 void fork(const WriteContext& other) {
2456 buffered = other.buffered;
2457 compress = other.compress;
2458 target_blob_size = other.target_blob_size;
2459 csum_order = other.csum_order;
2460 }
2461 void write(
2462 uint64_t loffs,
2463 BlobRef b,
2464 uint64_t blob_len,
2465 uint64_t o,
2466 bufferlist& bl,
2467 uint64_t o0,
2468 uint64_t len0,
2469 bool _mark_unused,
2470 bool _new_blob) {
2471 writes.emplace_back(loffs,
2472 b,
2473 blob_len,
2474 o,
2475 bl,
2476 o0,
2477 len0,
2478 _mark_unused,
2479 _new_blob);
2480 }
2481 /// Checks for writes to the same pextent within a blob
2482 bool has_conflict(
2483 BlobRef b,
2484 uint64_t loffs,
2485 uint64_t loffs_end,
2486 uint64_t min_alloc_size);
2487 };
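// A sketch of how the write path is expected to drive WriteContext: options
// are chosen once, fork() copies them (but not the queued writes) for a
// sub-range such as the GC pass, write() queues one blob-level piece for the
// later allocation/checksum step, and has_conflict() guards against touching
// the same pextent twice (all values below are illustrative):
//
//   WriteContext wctx;
//   _choose_write_options(c, o, fadvise_flags, &wctx);
//
//   WriteContext wctx_gc;
//   wctx_gc.fork(wctx);                       // options only, no writes
//
//   wctx.write(logical_off, blob, blob_len, b_off, bl,
//              b_off0, len0, false /*mark_unused*/, true /*new_blob*/);
//   if (wctx.has_conflict(blob, logical_off, logical_off + len0,
//                         min_alloc_size)) {
//     // overlapping write to the same pextent: take the safer
//     // (e.g. deferred / read-modify-write) path instead
//   }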
2488
2489 void _do_write_small(
2490 TransContext *txc,
2491 CollectionRef &c,
2492 OnodeRef o,
2493 uint64_t offset, uint64_t length,
2494 bufferlist::iterator& blp,
2495 WriteContext *wctx);
2496 void _do_write_big(
2497 TransContext *txc,
2498 CollectionRef &c,
2499 OnodeRef o,
2500 uint64_t offset, uint64_t length,
2501 bufferlist::iterator& blp,
2502 WriteContext *wctx);
2503 int _do_alloc_write(
2504 TransContext *txc,
2505 CollectionRef c,
2506 OnodeRef o,
2507 WriteContext *wctx);
2508 void _wctx_finish(
2509 TransContext *txc,
2510 CollectionRef& c,
2511 OnodeRef o,
2512 WriteContext *wctx,
2513 set<SharedBlob*> *maybe_unshared_blobs=0);
2514
2515 int _do_transaction(Transaction *t,
2516 TransContext *txc,
2517 ThreadPool::TPHandle *handle);
2518
2519 int _write(TransContext *txc,
2520 CollectionRef& c,
2521 OnodeRef& o,
2522 uint64_t offset, size_t len,
2523 bufferlist& bl,
2524 uint32_t fadvise_flags);
2525 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2526 uint64_t chunk_size);
2527
2528 void _choose_write_options(CollectionRef& c,
2529 OnodeRef o,
2530 uint32_t fadvise_flags,
2531 WriteContext *wctx);
2532
2533 int _do_gc(TransContext *txc,
2534 CollectionRef& c,
2535 OnodeRef o,
2536 const GarbageCollector& gc,
2537 const WriteContext& wctx,
2538 uint64_t *dirty_start,
2539 uint64_t *dirty_end);
2540
2541 int _do_write(TransContext *txc,
2542 CollectionRef &c,
2543 OnodeRef o,
2544 uint64_t offset, uint64_t length,
2545 bufferlist& bl,
2546 uint32_t fadvise_flags);
2547 void _do_write_data(TransContext *txc,
2548 CollectionRef& c,
2549 OnodeRef o,
2550 uint64_t offset,
2551 uint64_t length,
2552 bufferlist& bl,
2553 WriteContext *wctx);
2554
2555 int _touch(TransContext *txc,
2556 CollectionRef& c,
2557 OnodeRef& o);
2558 int _do_zero(TransContext *txc,
2559 CollectionRef& c,
2560 OnodeRef& o,
2561 uint64_t offset, size_t len);
2562 int _zero(TransContext *txc,
2563 CollectionRef& c,
2564 OnodeRef& o,
2565 uint64_t offset, size_t len);
2566 void _do_truncate(TransContext *txc,
2567 CollectionRef& c,
2568 OnodeRef o,
2569 uint64_t offset,
2570 set<SharedBlob*> *maybe_unshared_blobs=0);
2571 void _truncate(TransContext *txc,
2572 CollectionRef& c,
2573 OnodeRef& o,
2574 uint64_t offset);
2575 int _remove(TransContext *txc,
2576 CollectionRef& c,
2577 OnodeRef& o);
2578 int _do_remove(TransContext *txc,
2579 CollectionRef& c,
2580 OnodeRef o);
2581 int _setattr(TransContext *txc,
2582 CollectionRef& c,
2583 OnodeRef& o,
2584 const string& name,
2585 bufferptr& val);
2586 int _setattrs(TransContext *txc,
2587 CollectionRef& c,
2588 OnodeRef& o,
2589 const map<string,bufferptr>& aset);
2590 int _rmattr(TransContext *txc,
2591 CollectionRef& c,
2592 OnodeRef& o,
2593 const string& name);
2594 int _rmattrs(TransContext *txc,
2595 CollectionRef& c,
2596 OnodeRef& o);
2597 void _do_omap_clear(TransContext *txc, uint64_t id);
2598 int _omap_clear(TransContext *txc,
2599 CollectionRef& c,
2600 OnodeRef& o);
2601 int _omap_setkeys(TransContext *txc,
2602 CollectionRef& c,
2603 OnodeRef& o,
2604 bufferlist& bl);
2605 int _omap_setheader(TransContext *txc,
2606 CollectionRef& c,
2607 OnodeRef& o,
2608 bufferlist& header);
2609 int _omap_rmkeys(TransContext *txc,
2610 CollectionRef& c,
2611 OnodeRef& o,
2612 bufferlist& bl);
2613 int _omap_rmkey_range(TransContext *txc,
2614 CollectionRef& c,
2615 OnodeRef& o,
2616 const string& first, const string& last);
2617 int _set_alloc_hint(
2618 TransContext *txc,
2619 CollectionRef& c,
2620 OnodeRef& o,
2621 uint64_t expected_object_size,
2622 uint64_t expected_write_size,
2623 uint32_t flags);
2624 int _do_clone_range(TransContext *txc,
2625 CollectionRef& c,
2626 OnodeRef& oldo,
2627 OnodeRef& newo,
2628 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2629 int _clone(TransContext *txc,
2630 CollectionRef& c,
2631 OnodeRef& oldo,
2632 OnodeRef& newo);
2633 int _clone_range(TransContext *txc,
2634 CollectionRef& c,
2635 OnodeRef& oldo,
2636 OnodeRef& newo,
2637 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2638 int _rename(TransContext *txc,
2639 CollectionRef& c,
2640 OnodeRef& oldo,
2641 OnodeRef& newo,
2642 const ghobject_t& new_oid);
2643 int _create_collection(TransContext *txc, const coll_t &cid,
2644 unsigned bits, CollectionRef *c);
2645 int _remove_collection(TransContext *txc, const coll_t &cid,
2646 CollectionRef *c);
2647 int _split_collection(TransContext *txc,
2648 CollectionRef& c,
2649 CollectionRef& d,
2650 unsigned bits, int rem);
2651 };
2652
2653 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2654 return out << *s.parent;
2655 }
2656
2657 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2658 o->get();
2659 }
2660 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2661 o->put();
2662 }
2663
2664 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2665 o->get();
2666 }
2667 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2668 o->put();
2669 }
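// These free functions are what boost::intrusive_ptr finds via ADL, so the
// ref-counted handles (e.g. OnodeRef, OpSequencerRef) manage the embedded
// refcount without a separate control block.  Illustrative lifetime
// (obtain_onode() is hypothetical):
//
//   BlueStore::Onode *raw = obtain_onode();   // hypothetical source
//   BlueStore::OnodeRef o(raw);               // calls intrusive_ptr_add_ref()
//   BlueStore::OnodeRef o2 = o;               // add_ref again on copy
//   o2.reset();                               // calls intrusive_ptr_release()
//   // the Onode deletes itself once its embedded refcount reaches zero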
2670
2671 #endif