1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3 /*
4 * Ceph - scalable distributed file system
5 *
6 * Copyright (C) 2014 Red Hat
7 *
8 * This is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License version 2.1, as published by the Free Software
11 * Foundation. See file COPYING.
12 *
13 */
14
15 #ifndef CEPH_OSD_BLUESTORE_H
16 #define CEPH_OSD_BLUESTORE_H
17
18 #include "acconfig.h"
19
20 #include <unistd.h>
21
22 #include <atomic>
23 #include <mutex>
24 #include <condition_variable>
25
26 #include <boost/intrusive/list.hpp>
27 #include <boost/intrusive/unordered_set.hpp>
28 #include <boost/intrusive/set.hpp>
29 #include <boost/functional/hash.hpp>
30 #include <boost/dynamic_bitset.hpp>
31
32 #include "include/assert.h"
33 #include "include/unordered_map.h"
34 #include "include/memory.h"
35 #include "include/mempool.h"
36 #include "common/Finisher.h"
37 #include "common/perf_counters.h"
38 #include "compressor/Compressor.h"
39 #include "os/ObjectStore.h"
40
41 #include "bluestore_types.h"
42 #include "BlockDevice.h"
43 #include "common/EventTrace.h"
44
45 class Allocator;
46 class FreelistManager;
47 class BlueFS;
48
49 //#define DEBUG_CACHE
50 //#define DEBUG_DEFERRED
51
52 enum {
53 l_bluestore_first = 732430,
54 l_bluestore_kv_flush_lat,
55 l_bluestore_kv_commit_lat,
56 l_bluestore_kv_lat,
57 l_bluestore_state_prepare_lat,
58 l_bluestore_state_aio_wait_lat,
59 l_bluestore_state_io_done_lat,
60 l_bluestore_state_kv_queued_lat,
61 l_bluestore_state_kv_committing_lat,
62 l_bluestore_state_kv_done_lat,
63 l_bluestore_state_deferred_queued_lat,
64 l_bluestore_state_deferred_aio_wait_lat,
65 l_bluestore_state_deferred_cleanup_lat,
66 l_bluestore_state_finishing_lat,
67 l_bluestore_state_done_lat,
68 l_bluestore_throttle_lat,
69 l_bluestore_submit_lat,
70 l_bluestore_commit_lat,
71 l_bluestore_read_lat,
72 l_bluestore_read_onode_meta_lat,
73 l_bluestore_read_wait_aio_lat,
74 l_bluestore_compress_lat,
75 l_bluestore_decompress_lat,
76 l_bluestore_csum_lat,
77 l_bluestore_compress_success_count,
78 l_bluestore_compress_rejected_count,
79 l_bluestore_write_pad_bytes,
80 l_bluestore_deferred_write_ops,
81 l_bluestore_deferred_write_bytes,
82 l_bluestore_write_penalty_read_ops,
83 l_bluestore_allocated,
84 l_bluestore_stored,
85 l_bluestore_compressed,
86 l_bluestore_compressed_allocated,
87 l_bluestore_compressed_original,
88 l_bluestore_onodes,
89 l_bluestore_onode_hits,
90 l_bluestore_onode_misses,
91 l_bluestore_onode_shard_hits,
92 l_bluestore_onode_shard_misses,
93 l_bluestore_extents,
94 l_bluestore_blobs,
95 l_bluestore_buffers,
96 l_bluestore_buffer_bytes,
97 l_bluestore_buffer_hit_bytes,
98 l_bluestore_buffer_miss_bytes,
99 l_bluestore_write_big,
100 l_bluestore_write_big_bytes,
101 l_bluestore_write_big_blobs,
102 l_bluestore_write_small,
103 l_bluestore_write_small_bytes,
104 l_bluestore_write_small_unused,
105 l_bluestore_write_small_deferred,
106 l_bluestore_write_small_pre_read,
107 l_bluestore_write_small_new,
108 l_bluestore_txc,
109 l_bluestore_onode_reshard,
110 l_bluestore_blob_split,
111 l_bluestore_extent_compress,
112 l_bluestore_gc_merged,
113 l_bluestore_last
114 };
115
116 class BlueStore : public ObjectStore,
117 public md_config_obs_t {
118 // -----------------------------------------------------
119 // types
120 public:
121 // config observer
122 const char** get_tracked_conf_keys() const override;
123 void handle_conf_change(const struct md_config_t *conf,
124 const std::set<std::string> &changed) override;
125
126 void _set_csum();
127 void _set_compression();
128 void _set_throttle_params();
129
130 class TransContext;
131
132 typedef map<uint64_t, bufferlist> ready_regions_t;
133
134 struct BufferSpace;
135 struct Collection;
136 typedef boost::intrusive_ptr<Collection> CollectionRef;
137
138 struct AioContext {
139 virtual void aio_finish(BlueStore *store) = 0;
140 virtual ~AioContext() {}
141 };
142
143 /// cached buffer
144 struct Buffer {
145 MEMPOOL_CLASS_HELPERS();
146
147 enum {
148 STATE_EMPTY, ///< empty buffer -- used for cache history
149 STATE_CLEAN, ///< clean data that is up to date
150 STATE_WRITING, ///< data that is being written (io not yet complete)
151 };
152 static const char *get_state_name(int s) {
153 switch (s) {
154 case STATE_EMPTY: return "empty";
155 case STATE_CLEAN: return "clean";
156 case STATE_WRITING: return "writing";
157 default: return "???";
158 }
159 }
160 enum {
161 FLAG_NOCACHE = 1, ///< trim when done WRITING (do not become CLEAN)
162 // NOTE: fix operator<< when you define a second flag
163 };
164 static const char *get_flag_name(int s) {
165 switch (s) {
166 case FLAG_NOCACHE: return "nocache";
167 default: return "???";
168 }
169 }
170
171 BufferSpace *space;
172 uint16_t state; ///< STATE_*
173 uint16_t cache_private = 0; ///< opaque (to us) value used by Cache impl
174 uint32_t flags; ///< FLAG_*
175 uint64_t seq;
176 uint32_t offset, length;
177 bufferlist data;
178
179 boost::intrusive::list_member_hook<> lru_item;
180 boost::intrusive::list_member_hook<> state_item;
181
182 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, uint32_t l,
183 unsigned f = 0)
184 : space(space), state(s), flags(f), seq(q), offset(o), length(l) {}
185 Buffer(BufferSpace *space, unsigned s, uint64_t q, uint32_t o, bufferlist& b,
186 unsigned f = 0)
187 : space(space), state(s), flags(f), seq(q), offset(o),
188 length(b.length()), data(b) {}
189
190 bool is_empty() const {
191 return state == STATE_EMPTY;
192 }
193 bool is_clean() const {
194 return state == STATE_CLEAN;
195 }
196 bool is_writing() const {
197 return state == STATE_WRITING;
198 }
199
200 uint32_t end() const {
201 return offset + length;
202 }
203
204 void truncate(uint32_t newlen) {
205 assert(newlen < length);
206 if (data.length()) {
207 bufferlist t;
208 t.substr_of(data, 0, newlen);
209 data.claim(t);
210 }
211 length = newlen;
212 }
213
214 void dump(Formatter *f) const {
215 f->dump_string("state", get_state_name(state));
216 f->dump_unsigned("seq", seq);
217 f->dump_unsigned("offset", offset);
218 f->dump_unsigned("length", length);
219 f->dump_unsigned("data_length", data.length());
220 }
221 };
222
223 struct Cache;
224
225 /// map logical extent range (object) onto buffers
226 struct BufferSpace {
227 typedef boost::intrusive::list<
228 Buffer,
229 boost::intrusive::member_hook<
230 Buffer,
231 boost::intrusive::list_member_hook<>,
232 &Buffer::state_item> > state_list_t;
233
234 mempool::bluestore_meta_other::map<uint32_t, std::unique_ptr<Buffer>>
235 buffer_map;
236
237 // we use a bare intrusive list here instead of std::map because
238 // it uses less memory and we expect this to be very small (very
239 // few IOs in flight to the same Blob at the same time).
240 state_list_t writing; ///< writing buffers, sorted by seq, ascending
241
242 ~BufferSpace() {
243 assert(buffer_map.empty());
244 assert(writing.empty());
245 }
246
247 void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
248 cache->_audit("_add_buffer start");
249 buffer_map[b->offset].reset(b);
250 if (b->is_writing()) {
251 writing.push_back(*b);
252 } else {
253 cache->_add_buffer(b, level, near);
254 }
255 cache->_audit("_add_buffer end");
256 }
257 void _rm_buffer(Cache* cache, Buffer *b) {
258 _rm_buffer(cache, buffer_map.find(b->offset));
259 }
260 void _rm_buffer(Cache* cache, map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
261 assert(p != buffer_map.end());
262 cache->_audit("_rm_buffer start");
263 if (p->second->is_writing()) {
264 writing.erase(writing.iterator_to(*p->second));
265 } else {
266 cache->_rm_buffer(p->second.get());
267 }
268 buffer_map.erase(p);
269 cache->_audit("_rm_buffer end");
270 }
271
272 map<uint32_t,std::unique_ptr<Buffer>>::iterator _data_lower_bound(
273 uint32_t offset) {
274 auto i = buffer_map.lower_bound(offset);
275 if (i != buffer_map.begin()) {
276 --i;
277 if (i->first + i->second->length <= offset)
278 ++i;
279 }
280 return i;
281 }
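    // A worked example of _data_lower_bound(), derived from the logic above
    // (illustrative only): suppose buffer_map holds buffers at offset 0
    // (length 4096) and offset 8192 (length 4096).
    //   _data_lower_bound(100)   -> the buffer at 0 (it still spans offset 100)
    //   _data_lower_bound(4096)  -> the buffer at 8192 (the one at 0 ends at
    //                               4096, so it cannot overlap [4096, ...))
    //   _data_lower_bound(12288) -> buffer_map.end() (nothing at or after the
    //                               offset can overlap it)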
282
283 // must be called under protection of the Cache lock
284 void _clear(Cache* cache);
285
286 // return value is the highest cache_private of a trimmed buffer, or 0.
287 int discard(Cache* cache, uint32_t offset, uint32_t length) {
288 std::lock_guard<std::recursive_mutex> l(cache->lock);
289 return _discard(cache, offset, length);
290 }
291 int _discard(Cache* cache, uint32_t offset, uint32_t length);
292
293 void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
294 unsigned flags) {
295 std::lock_guard<std::recursive_mutex> l(cache->lock);
296 Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
297 flags);
298 b->cache_private = _discard(cache, offset, bl.length());
299 _add_buffer(cache, b, (flags & Buffer::FLAG_NOCACHE) ? 0 : 1, nullptr);
300 }
301 void finish_write(Cache* cache, uint64_t seq);
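    // Rough sketch of how these are intended to pair up (the authoritative
    // logic lives in the .cc): write() runs while a transaction is being
    // prepared and queues a STATE_WRITING buffer tagged with that txc's seq;
    // finish_write(seq) is invoked once the corresponding io has committed and
    // moves the finished buffers off the 'writing' list, dropping those written
    // with FLAG_NOCACHE instead of keeping them around as STATE_CLEAN.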
302 void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
303 std::lock_guard<std::recursive_mutex> l(cache->lock);
304 Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
305 b->cache_private = _discard(cache, offset, bl.length());
306 _add_buffer(cache, b, 1, nullptr);
307 }
308
309 void read(Cache* cache, uint32_t offset, uint32_t length,
310 BlueStore::ready_regions_t& res,
311 interval_set<uint32_t>& res_intervals);
312
313 void truncate(Cache* cache, uint32_t offset) {
314 discard(cache, offset, (uint32_t)-1 - offset);
315 }
316
317 void split(Cache* cache, size_t pos, BufferSpace &r);
318
319 void dump(Cache* cache, Formatter *f) const {
320 std::lock_guard<std::recursive_mutex> l(cache->lock);
321 f->open_array_section("buffers");
322 for (auto& i : buffer_map) {
323 f->open_object_section("buffer");
324 assert(i.first == i.second->offset);
325 i.second->dump(f);
326 f->close_section();
327 }
328 f->close_section();
329 }
330 };
331
332 struct SharedBlobSet;
333
334 /// in-memory shared blob state (incl cached buffers)
335 struct SharedBlob {
336 MEMPOOL_CLASS_HELPERS();
337
338 std::atomic_int nref = {0}; ///< reference count
339 bool loaded = false;
340
341 CollectionRef coll;
342 union {
343 uint64_t sbid_unloaded; ///< sbid if persistent isn't loaded
344 bluestore_shared_blob_t *persistent; ///< persistent part of the shared blob if any
345 };
346 BufferSpace bc; ///< buffer cache
347
348 SharedBlob(Collection *_coll) : coll(_coll), sbid_unloaded(0) {
349 if (get_cache()) {
350 get_cache()->add_blob();
351 }
352 }
353 SharedBlob(uint64_t i, Collection *_coll);
354 ~SharedBlob();
355
356 uint64_t get_sbid() const {
357 return loaded ? persistent->sbid : sbid_unloaded;
358 }
359
360 friend void intrusive_ptr_add_ref(SharedBlob *b) { b->get(); }
361 friend void intrusive_ptr_release(SharedBlob *b) { b->put(); }
362
363 friend ostream& operator<<(ostream& out, const SharedBlob& sb);
364
365 void get() {
366 ++nref;
367 }
368 void put();
369
370 /// get logical references
371 void get_ref(uint64_t offset, uint32_t length);
372
373 /// put logical references, and get back any released extents
374 void put_ref(uint64_t offset, uint32_t length,
375 PExtentVector *r);
376
377 friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
378 return l.get_sbid() == r.get_sbid();
379 }
380 inline Cache* get_cache() {
381 return coll ? coll->cache : nullptr;
382 }
383 inline SharedBlobSet* get_parent() {
384 return coll ? &(coll->shared_blob_set) : nullptr;
385 }
386 inline bool is_loaded() const {
387 return loaded;
388 }
389
390 };
391 typedef boost::intrusive_ptr<SharedBlob> SharedBlobRef;
392
393 /// a lookup table of SharedBlobs
394 struct SharedBlobSet {
395 std::mutex lock; ///< protect lookup, insertion, removal
396
397 // we use a bare pointer because we don't want to affect the ref
398 // count
399 mempool::bluestore_meta_other::unordered_map<uint64_t,SharedBlob*> sb_map;
400
401 SharedBlobRef lookup(uint64_t sbid) {
402 std::lock_guard<std::mutex> l(lock);
403 auto p = sb_map.find(sbid);
404 if (p == sb_map.end()) {
405 return nullptr;
406 }
407 return p->second;
408 }
409
410 void add(Collection* coll, SharedBlob *sb) {
411 std::lock_guard<std::mutex> l(lock);
412 sb_map[sb->get_sbid()] = sb;
413 sb->coll = coll;
414 }
415
416 bool remove(SharedBlob *sb) {
417 std::lock_guard<std::mutex> l(lock);
418 if (sb->nref == 0) {
419 assert(sb->get_parent() == this);
420 sb_map.erase(sb->get_sbid());
421 return true;
422 }
423 return false;
424 }
425
426 bool empty() {
427 std::lock_guard<std::mutex> l(lock);
428 return sb_map.empty();
429 }
430 };
431
432 //#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
433
434 /// in-memory blob metadata and associated cached buffers (if any)
435 struct Blob {
436 MEMPOOL_CLASS_HELPERS();
437
438 std::atomic_int nref = {0}; ///< reference count
439 int16_t id = -1; ///< id, for spanning blobs only, >= 0
440 int16_t last_encoded_id = -1; ///< (ephemeral) used during encoding only
441 SharedBlobRef shared_blob; ///< shared blob state (if any)
442
443 private:
444 mutable bluestore_blob_t blob; ///< decoded blob metadata
445 #ifdef CACHE_BLOB_BL
446 mutable bufferlist blob_bl; ///< cached encoded blob, blob is dirty if empty
447 #endif
448 /// refs from this shard. ephemeral if id<0, persisted if spanning.
449 bluestore_blob_use_tracker_t used_in_blob;
450
451 public:
452
453 friend void intrusive_ptr_add_ref(Blob *b) { b->get(); }
454 friend void intrusive_ptr_release(Blob *b) { b->put(); }
455
456 friend ostream& operator<<(ostream& out, const Blob &b);
457
458 const bluestore_blob_use_tracker_t& get_blob_use_tracker() const {
459 return used_in_blob;
460 }
461 bool is_referenced() const {
462 return used_in_blob.is_not_empty();
463 }
464 uint32_t get_referenced_bytes() const {
465 return used_in_blob.get_referenced_bytes();
466 }
467
468 bool is_spanning() const {
469 return id >= 0;
470 }
471
472 bool can_split() const {
473 std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
474 // splitting a BufferSpace writing list is too hard; don't try.
475 return shared_blob->bc.writing.empty() &&
476 used_in_blob.can_split() &&
477 get_blob().can_split();
478 }
479
480 bool can_split_at(uint32_t blob_offset) const {
481 return used_in_blob.can_split_at(blob_offset) &&
482 get_blob().can_split_at(blob_offset);
483 }
484
485 bool try_reuse_blob(uint32_t min_alloc_size,
486 uint32_t target_blob_size,
487 uint32_t b_offset,
488 uint32_t *length0);
489
490 void dup(Blob& o) {
491 o.shared_blob = shared_blob;
492 o.blob = blob;
493 #ifdef CACHE_BLOB_BL
494 o.blob_bl = blob_bl;
495 #endif
496 }
497
498 const bluestore_blob_t& get_blob() const {
499 return blob;
500 }
501 bluestore_blob_t& dirty_blob() {
502 #ifdef CACHE_BLOB_BL
503 blob_bl.clear();
504 #endif
505 return blob;
506 }
507
508 /// discard buffers for unallocated regions
509 void discard_unallocated(Collection *coll);
510
511 /// get logical references
512 void get_ref(Collection *coll, uint32_t offset, uint32_t length);
513 /// put logical references, and get back any released extents
514 bool put_ref(Collection *coll, uint32_t offset, uint32_t length,
515 PExtentVector *r);
516
517 /// split the blob
518 void split(Collection *coll, uint32_t blob_offset, Blob *o);
519
520 void get() {
521 ++nref;
522 }
523 void put() {
524 if (--nref == 0)
525 delete this;
526 }
527
528
529 #ifdef CACHE_BLOB_BL
530 void _encode() const {
531 if (blob_bl.length() == 0 ) {
532 ::encode(blob, blob_bl);
533 } else {
534 assert(blob_bl.length());
535 }
536 }
537 void bound_encode(
538 size_t& p,
539 bool include_ref_map) const {
540 _encode();
541 p += blob_bl.length();
542 if (include_ref_map) {
543 used_in_blob.bound_encode(p);
544 }
545 }
546 void encode(
547 bufferlist::contiguous_appender& p,
548 bool include_ref_map) const {
549 _encode();
550 p.append(blob_bl);
551 if (include_ref_map) {
552 used_in_blob.encode(p);
553 }
554 }
555 void decode(
556 Collection */*coll*/,
557 bufferptr::iterator& p,
558 bool include_ref_map) {
559 const char *start = p.get_pos();
560 denc(blob, p);
561 const char *end = p.get_pos();
562 blob_bl.clear();
563 blob_bl.append(start, end - start);
564 if (include_ref_map) {
565 used_in_blob.decode(p);
566 }
567 }
568 #else
569 void bound_encode(
570 size_t& p,
571 uint64_t struct_v,
572 uint64_t sbid,
573 bool include_ref_map) const {
574 denc(blob, p, struct_v);
575 if (blob.is_shared()) {
576 denc(sbid, p);
577 }
578 if (include_ref_map) {
579 used_in_blob.bound_encode(p);
580 }
581 }
582 void encode(
583 bufferlist::contiguous_appender& p,
584 uint64_t struct_v,
585 uint64_t sbid,
586 bool include_ref_map) const {
587 denc(blob, p, struct_v);
588 if (blob.is_shared()) {
589 denc(sbid, p);
590 }
591 if (include_ref_map) {
592 used_in_blob.encode(p);
593 }
594 }
595 void decode(
596 Collection *coll,
597 bufferptr::iterator& p,
598 uint64_t struct_v,
599 uint64_t* sbid,
600 bool include_ref_map);
601 #endif
602 };
603 typedef boost::intrusive_ptr<Blob> BlobRef;
604 typedef mempool::bluestore_meta_other::map<int,BlobRef> blob_map_t;
605
606 /// a logical extent, pointing to (some portion of) a blob
607 typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; //making an alias to avoid build warnings
608 struct Extent : public ExtentBase {
609 MEMPOOL_CLASS_HELPERS();
610
611 uint32_t logical_offset = 0; ///< logical offset
612 uint32_t blob_offset = 0; ///< blob offset
613 uint32_t length = 0; ///< length
614 BlobRef blob; ///< the blob with our data
615
616 /// ctor for lookup only
617 explicit Extent(uint32_t lo) : ExtentBase(), logical_offset(lo) { }
618 /// ctor for delayed initialization (see decode_some())
619 explicit Extent() : ExtentBase() {
620 }
621 /// ctor for general usage
622 Extent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
623 : ExtentBase(),
624 logical_offset(lo), blob_offset(o), length(l) {
625 assign_blob(b);
626 }
627 ~Extent() {
628 if (blob) {
629 blob->shared_blob->get_cache()->rm_extent();
630 }
631 }
632
633 void assign_blob(const BlobRef& b) {
634 assert(!blob);
635 blob = b;
636 blob->shared_blob->get_cache()->add_extent();
637 }
638
639 // comparators for intrusive_set
640 friend bool operator<(const Extent &a, const Extent &b) {
641 return a.logical_offset < b.logical_offset;
642 }
643 friend bool operator>(const Extent &a, const Extent &b) {
644 return a.logical_offset > b.logical_offset;
645 }
646 friend bool operator==(const Extent &a, const Extent &b) {
647 return a.logical_offset == b.logical_offset;
648 }
649
650 uint32_t blob_start() const {
651 return logical_offset - blob_offset;
652 }
653
654 uint32_t blob_end() const {
655 return blob_start() + blob->get_blob().get_logical_length();
656 }
657
658 uint32_t logical_end() const {
659 return logical_offset + length;
660 }
661
662 // return true if any piece of the blob is out of
663 // the given range [o, o + l].
664 bool blob_escapes_range(uint32_t o, uint32_t l) const {
665 return blob_start() < o || blob_end() > o + l;
666 }
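    // Worked example (values follow directly from the accessors above): for an
    // Extent e with logical_offset=4096, blob_offset=1024, length=2048 and a
    // blob whose get_logical_length() == 8192:
    //   e.blob_start()  == 3072      e.blob_end() == 11264
    //   e.logical_end() == 6144
    //   e.blob_escapes_range(4096, 2048) == true, since blob_start() < 4096
    //   (and blob_end() also exceeds 4096 + 2048).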
667 };
668 typedef boost::intrusive::set<Extent> extent_map_t;
669
670
671 friend ostream& operator<<(ostream& out, const Extent& e);
672
673 struct OldExtent {
674 boost::intrusive::list_member_hook<> old_extent_item;
675 Extent e;
676 PExtentVector r;
677 bool blob_empty; // flag to track the last removed extent that makes the blob
678 // empty - required to update compression stats properly
679 OldExtent(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b)
680 : e(lo, o, l, b), blob_empty(false) {
681 }
682 static OldExtent* create(CollectionRef c,
683 uint32_t lo,
684 uint32_t o,
685 uint32_t l,
686 BlobRef& b);
687 };
688 typedef boost::intrusive::list<
689 OldExtent,
690 boost::intrusive::member_hook<
691 OldExtent,
692 boost::intrusive::list_member_hook<>,
693 &OldExtent::old_extent_item> > old_extent_map_t;
694
695 struct Onode;
696
697 /// a sharded extent map, mapping offsets to lextents to blobs
698 struct ExtentMap {
699 Onode *onode;
700 extent_map_t extent_map; ///< map of Extents to Blobs
701 blob_map_t spanning_blob_map; ///< blobs that span shards
702
703 struct Shard {
704 bluestore_onode_t::shard_info *shard_info = nullptr;
705 unsigned extents = 0; ///< count extents in this shard
706 bool loaded = false; ///< true if shard is loaded
707 bool dirty = false; ///< true if shard is dirty and needs reencoding
708 };
709 mempool::bluestore_meta_other::vector<Shard> shards; ///< shards
710
711 bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
712
713 uint32_t needs_reshard_begin = 0;
714 uint32_t needs_reshard_end = 0;
715
716 bool needs_reshard() const {
717 return needs_reshard_end > needs_reshard_begin;
718 }
719 void clear_needs_reshard() {
720 needs_reshard_begin = needs_reshard_end = 0;
721 }
722 void request_reshard(uint32_t begin, uint32_t end) {
723 if (begin < needs_reshard_begin) {
724 needs_reshard_begin = begin;
725 }
726 if (end > needs_reshard_end) {
727 needs_reshard_end = end;
728 }
729 }
730
731 struct DeleteDisposer {
732 void operator()(Extent *e) { delete e; }
733 };
734
735 ExtentMap(Onode *o);
736 ~ExtentMap() {
737 extent_map.clear_and_dispose(DeleteDisposer());
738 }
739
740 void clear() {
741 extent_map.clear_and_dispose(DeleteDisposer());
742 shards.clear();
743 inline_bl.clear();
744 clear_needs_reshard();
745 }
746
747 bool encode_some(uint32_t offset, uint32_t length, bufferlist& bl,
748 unsigned *pn);
749 unsigned decode_some(bufferlist& bl);
750
751 void bound_encode_spanning_blobs(size_t& p);
752 void encode_spanning_blobs(bufferlist::contiguous_appender& p);
753 void decode_spanning_blobs(bufferptr::iterator& p);
754
755 BlobRef get_spanning_blob(int id) {
756 auto p = spanning_blob_map.find(id);
757 assert(p != spanning_blob_map.end());
758 return p->second;
759 }
760
761 void update(KeyValueDB::Transaction t, bool force);
762 void reshard(
763 KeyValueDB *db,
764 KeyValueDB::Transaction t);
765
766 /// initialize Shards from the onode
767 void init_shards(bool loaded, bool dirty);
768
769 /// return index of shard containing offset
770 /// or -1 if not found
771 int seek_shard(uint32_t offset) {
772 size_t end = shards.size();
773 size_t mid, left = 0;
774 size_t right = end; // one past the right end
775
776 while (left < right) {
777 mid = left + (right - left) / 2;
778 if (offset >= shards[mid].shard_info->offset) {
779 size_t next = mid + 1;
780 if (next >= end || offset < shards[next].shard_info->offset)
781 return mid;
782 //continue to search forwards
783 left = next;
784 } else {
785 //continue to search backwards
786 right = mid;
787 }
788 }
789
790 return -1; // not found
791 }
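    // Worked example of seek_shard(), purely illustrative: with shard_info
    // offsets {0x1000, 0x8000, 0x10000},
    //   seek_shard(0x9000)  -> 1   (0x8000 <= 0x9000 < 0x10000)
    //   seek_shard(0x20000) -> 2   (falls in the last shard)
    //   seek_shard(0x500)   -> -1  (before the first shard's offset)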
792
793 /// check if a range spans a shard
794 bool spans_shard(uint32_t offset, uint32_t length) {
795 if (shards.empty()) {
796 return false;
797 }
798 int s = seek_shard(offset);
799 assert(s >= 0);
800 if (s == (int)shards.size() - 1) {
801 return false; // last shard
802 }
803 if (offset + length <= shards[s+1].shard_info->offset) {
804 return false;
805 }
806 return true;
807 }
808
809 /// ensure that a range of the map is loaded
810 void fault_range(KeyValueDB *db,
811 uint32_t offset, uint32_t length);
812
813 /// ensure a range of the map is marked dirty
814 void dirty_range(KeyValueDB::Transaction t,
815 uint32_t offset, uint32_t length);
816
817 extent_map_t::iterator find(uint64_t offset);
818
819 /// find a lextent that includes offset
820 extent_map_t::iterator find_lextent(uint64_t offset);
821
822 /// seek to the first lextent including or after offset
823 extent_map_t::iterator seek_lextent(uint64_t offset);
824 extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
825
826 /// add a new Extent
827 void add(uint32_t lo, uint32_t o, uint32_t l, BlobRef& b) {
828 extent_map.insert(*new Extent(lo, o, l, b));
829 }
830
831 /// remove (and delete) an Extent
832 void rm(extent_map_t::iterator p) {
833 extent_map.erase_and_dispose(p, DeleteDisposer());
834 }
835
836 bool has_any_lextents(uint64_t offset, uint64_t length);
837
838 /// consolidate adjacent lextents in extent_map
839 int compress_extent_map(uint64_t offset, uint64_t length);
840
841 /// punch a logical hole; add the lextents to deref to the target list.
842 void punch_hole(CollectionRef &c,
843 uint64_t offset, uint64_t length,
844 old_extent_map_t *old_extents);
845
846 /// put new lextent into lextent_map overwriting existing ones if
847 /// any and update references accordingly
848 Extent *set_lextent(CollectionRef &c,
849 uint64_t logical_offset,
850 uint64_t offset, uint64_t length,
851 BlobRef b,
852 old_extent_map_t *old_extents);
853
854 /// split a blob (and referring extents)
855 BlobRef split_blob(BlobRef lb, uint32_t blob_offset, uint32_t pos);
856 };
857
858 /// Compressed Blob Garbage collector
859 /*
860 The primary idea of the collector is to estimate the difference between the
861 allocation units (AUs) currently occupied by compressed blobs and the new AUs
862 required to store that data uncompressed.
863 Estimation is performed for protrusive extents within a logical range
864 determined by the concatenation of the old_extents collection and the
865 specific (current) write request.
866 The root cause for using old_extents is the need to handle blob ref counts
867 properly. Old extents still hold blob refs, hence we need to traverse
868 the collection to determine whether the blob is to be released.
869 Protrusive extents are extents that fall into the blob set in action
870 (ones that are within the logical range from above) but are not totally
871 removed by the current write.
872 E.g. for
873 extent1 <loffs = 100, boffs = 100, len = 100> ->
874 blob1<compressed, len_on_disk=4096, logical_len=8192>
875 extent2 <loffs = 200, boffs = 200, len = 100> ->
876 blob2<raw, len_on_disk=4096, llen=4096>
877 extent3 <loffs = 300, boffs = 300, len = 100> ->
878 blob1<compressed, len_on_disk=4096, llen=8192>
879 extent4 <loffs = 4096, boffs = 0, len = 100> ->
880 blob3<raw, len_on_disk=4096, llen=4096>
881 write(300~100)
882 Protrusive extents are within the following ranges: <0~300, 400~8192-400>.
883 In this case the existing AUs that might be released due to GC (i.e. blob1's)
884 use 2x4K bytes.
885 The new AUs expected after GC = 0, since extent1 is to be merged into blob2.
886 Hence we should do the collection.
887 */
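  // In rough terms (a sketch of the accounting described above, not the exact
  // implementation -- see estimate() in the .cc):
  //   expected_for_release  ~= AUs currently held by the affected compressed
  //                            blobs that would no longer be referenced after GC
  //   expected_allocations  ~= new AUs needed to rewrite the protrusive extents
  //                            uncompressed
  // and collection pays off when expected_for_release exceeds
  // expected_allocations (2 vs. 0 in the example above).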
888 class GarbageCollector
889 {
890 public:
891 /// return the number of allocation units that might be saved due to GC
892 int64_t estimate(
893 uint64_t offset,
894 uint64_t length,
895 const ExtentMap& extent_map,
896 const old_extent_map_t& old_extents,
897 uint64_t min_alloc_size);
898
899 /// return a collection of extents to perform GC on
900 const vector<AllocExtent>& get_extents_to_collect() const {
901 return extents_to_collect;
902 }
903 GarbageCollector(CephContext* _cct) : cct(_cct) {}
904
905 private:
906 struct BlobInfo {
907 uint64_t referenced_bytes = 0; ///< number of bytes referenced in the blob
908 int64_t expected_allocations = 0; ///< new alloc units required
909 ///< in case of gc fulfilled
910 bool collect_candidate = false; ///< indicate if blob has any extents
911 ///< eligible for GC.
912 extent_map_t::const_iterator first_lextent; ///< points to the first
913 ///< lextent referring to
914 ///< the blob if any.
915 ///< collect_candidate flag
916 ///< determines the validity
917 extent_map_t::const_iterator last_lextent; ///< points to the last
918 ///< lextent referring to
919 ///< the blob if any.
920
921 BlobInfo(uint64_t ref_bytes) :
922 referenced_bytes(ref_bytes) {
923 }
924 };
925 CephContext* cct;
926 map<Blob*, BlobInfo> affected_blobs; ///< compressed blobs and their ref_map
927 ///< copies that are affected by the
928 ///< specific write
929
930 vector<AllocExtent> extents_to_collect; ///< protrusive extents that should
931 ///< be collected if GC takes place
932
933 boost::optional<uint64_t > used_alloc_unit; ///< last processed allocation
934 ///< unit when traversing
935 ///< protrusive extents.
936 ///< Other extents mapped to
937 ///< this AU to be ignored
938 ///< (except the case where
939 ///< uncompressed extent follows
940 ///< compressed one - see below).
941 BlobInfo* blob_info_counted = nullptr; ///< set if previous allocation unit
942 ///< caused expected_allocations
943 ///< counter increment at this blob.
944 ///< if an uncompressed extent follows,
945 ///< a decrement for the
946 ///< expected_allocations counter
947 ///< is needed
948 int64_t expected_allocations = 0; ///< new alloc units required in case
949 ///< of gc fulfilled
950 int64_t expected_for_release = 0; ///< alloc units currently used by
951 ///< compressed blobs that might
952 ///< be gone after GC
953 uint64_t gc_start_offset; ///< starting offset for GC
954 uint64_t gc_end_offset; ///< ending offset for GC
955
956 protected:
957 void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
958 uint64_t start_offset,
959 uint64_t end_offset,
960 uint64_t start_touch_offset,
961 uint64_t end_touch_offset,
962 uint64_t min_alloc_size);
963 };
964
965 struct OnodeSpace;
966
967 /// an in-memory object
968 struct Onode {
969 MEMPOOL_CLASS_HELPERS();
970
971 std::atomic_int nref; ///< reference count
972 Collection *c;
973
974 ghobject_t oid;
975
976 /// key under PREFIX_OBJ where we are stored
977 mempool::bluestore_meta_other::string key;
978
979 boost::intrusive::list_member_hook<> lru_item;
980
981 bluestore_onode_t onode; ///< metadata stored as value in kv store
982 bool exists; ///< true if object logically exists
983
984 ExtentMap extent_map;
985
986 // track txc's that have not been committed to kv store (and whose
987 // effects cannot be read via the kvdb read methods)
988 std::atomic<int> flushing_count = {0};
989 std::mutex flush_lock; ///< protect flush_txns
990 std::condition_variable flush_cond; ///< wait here for uncommitted txns
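    // Sketch of the intended flush protocol (implemented in the .cc):
    // flushing_count is presumably bumped for every txc touching this onode
    // that has not yet committed, and flush() blocks on flush_cond under
    // flush_lock until the count drains back to zero.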
991
992 Onode(Collection *c, const ghobject_t& o,
993 const mempool::bluestore_meta_other::string& k)
994 : nref(0),
995 c(c),
996 oid(o),
997 key(k),
998 exists(false),
999 extent_map(this) {
1000 }
1001
1002 void flush();
1003 void get() {
1004 ++nref;
1005 }
1006 void put() {
1007 if (--nref == 0)
1008 delete this;
1009 }
1010 };
1011 typedef boost::intrusive_ptr<Onode> OnodeRef;
1012
1013
1014 /// a cache (shard) of onodes and buffers
1015 struct Cache {
1016 CephContext* cct;
1017 PerfCounters *logger;
1018 std::recursive_mutex lock; ///< protect lru and other structures
1019
1020 std::atomic<uint64_t> num_extents = {0};
1021 std::atomic<uint64_t> num_blobs = {0};
1022
1023 size_t last_trim_seq = 0;
1024
1025 static Cache *create(CephContext* cct, string type, PerfCounters *logger);
1026
1027 Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
1028 virtual ~Cache() {}
1029
1030 virtual void _add_onode(OnodeRef& o, int level) = 0;
1031 virtual void _rm_onode(OnodeRef& o) = 0;
1032 virtual void _touch_onode(OnodeRef& o) = 0;
1033
1034 virtual void _add_buffer(Buffer *b, int level, Buffer *near) = 0;
1035 virtual void _rm_buffer(Buffer *b) = 0;
1036 virtual void _move_buffer(Cache *src, Buffer *b) = 0;
1037 virtual void _adjust_buffer_size(Buffer *b, int64_t delta) = 0;
1038 virtual void _touch_buffer(Buffer *b) = 0;
1039
1040 virtual uint64_t _get_num_onodes() = 0;
1041 virtual uint64_t _get_buffer_bytes() = 0;
1042
1043 void add_extent() {
1044 ++num_extents;
1045 }
1046 void rm_extent() {
1047 --num_extents;
1048 }
1049
1050 void add_blob() {
1051 ++num_blobs;
1052 }
1053 void rm_blob() {
1054 --num_blobs;
1055 }
1056
1057 void trim(uint64_t target_bytes, float target_meta_ratio,
1058 float bytes_per_onode);
1059
1060 void trim_all();
1061
1062 virtual void _trim(uint64_t onode_max, uint64_t buffer_max) = 0;
1063
1064 virtual void add_stats(uint64_t *onodes, uint64_t *extents,
1065 uint64_t *blobs,
1066 uint64_t *buffers,
1067 uint64_t *bytes) = 0;
1068
1069 #ifdef DEBUG_CACHE
1070 virtual void _audit(const char *s) = 0;
1071 #else
1072 void _audit(const char *s) { /* no-op */ }
1073 #endif
1074 };
1075
1076 /// simple LRU cache for onodes and buffers
1077 struct LRUCache : public Cache {
1078 private:
1079 typedef boost::intrusive::list<
1080 Onode,
1081 boost::intrusive::member_hook<
1082 Onode,
1083 boost::intrusive::list_member_hook<>,
1084 &Onode::lru_item> > onode_lru_list_t;
1085 typedef boost::intrusive::list<
1086 Buffer,
1087 boost::intrusive::member_hook<
1088 Buffer,
1089 boost::intrusive::list_member_hook<>,
1090 &Buffer::lru_item> > buffer_lru_list_t;
1091
1092 onode_lru_list_t onode_lru;
1093
1094 buffer_lru_list_t buffer_lru;
1095 uint64_t buffer_size = 0;
1096
1097 public:
1098 LRUCache(CephContext* cct) : Cache(cct) {}
1099 uint64_t _get_num_onodes() override {
1100 return onode_lru.size();
1101 }
1102 void _add_onode(OnodeRef& o, int level) override {
1103 if (level > 0)
1104 onode_lru.push_front(*o);
1105 else
1106 onode_lru.push_back(*o);
1107 }
1108 void _rm_onode(OnodeRef& o) override {
1109 auto q = onode_lru.iterator_to(*o);
1110 onode_lru.erase(q);
1111 }
1112 void _touch_onode(OnodeRef& o) override;
1113
1114 uint64_t _get_buffer_bytes() override {
1115 return buffer_size;
1116 }
1117 void _add_buffer(Buffer *b, int level, Buffer *near) override {
1118 if (near) {
1119 auto q = buffer_lru.iterator_to(*near);
1120 buffer_lru.insert(q, *b);
1121 } else if (level > 0) {
1122 buffer_lru.push_front(*b);
1123 } else {
1124 buffer_lru.push_back(*b);
1125 }
1126 buffer_size += b->length;
1127 }
1128 void _rm_buffer(Buffer *b) override {
1129 assert(buffer_size >= b->length);
1130 buffer_size -= b->length;
1131 auto q = buffer_lru.iterator_to(*b);
1132 buffer_lru.erase(q);
1133 }
1134 void _move_buffer(Cache *src, Buffer *b) override {
1135 src->_rm_buffer(b);
1136 _add_buffer(b, 0, nullptr);
1137 }
1138 void _adjust_buffer_size(Buffer *b, int64_t delta) override {
1139 assert((int64_t)buffer_size + delta >= 0);
1140 buffer_size += delta;
1141 }
1142 void _touch_buffer(Buffer *b) override {
1143 auto p = buffer_lru.iterator_to(*b);
1144 buffer_lru.erase(p);
1145 buffer_lru.push_front(*b);
1146 _audit("_touch_buffer end");
1147 }
1148
1149 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1150
1151 void add_stats(uint64_t *onodes, uint64_t *extents,
1152 uint64_t *blobs,
1153 uint64_t *buffers,
1154 uint64_t *bytes) override {
1155 std::lock_guard<std::recursive_mutex> l(lock);
1156 *onodes += onode_lru.size();
1157 *extents += num_extents;
1158 *blobs += num_blobs;
1159 *buffers += buffer_lru.size();
1160 *bytes += buffer_size;
1161 }
1162
1163 #ifdef DEBUG_CACHE
1164 void _audit(const char *s) override;
1165 #endif
1166 };
1167
1168 // 2Q cache for buffers, LRU for onodes
1169 struct TwoQCache : public Cache {
1170 private:
1171 // stick with LRU for onodes for now (fixme?)
1172 typedef boost::intrusive::list<
1173 Onode,
1174 boost::intrusive::member_hook<
1175 Onode,
1176 boost::intrusive::list_member_hook<>,
1177 &Onode::lru_item> > onode_lru_list_t;
1178 typedef boost::intrusive::list<
1179 Buffer,
1180 boost::intrusive::member_hook<
1181 Buffer,
1182 boost::intrusive::list_member_hook<>,
1183 &Buffer::lru_item> > buffer_list_t;
1184
1185 onode_lru_list_t onode_lru;
1186
1187 buffer_list_t buffer_hot; ///< "Am" hot buffers
1188 buffer_list_t buffer_warm_in; ///< "A1in" newly warm buffers
1189 buffer_list_t buffer_warm_out; ///< "A1out" empty buffers we've evicted
1190 uint64_t buffer_bytes = 0; ///< bytes
1191
1192 enum {
1193 BUFFER_NEW = 0,
1194 BUFFER_WARM_IN, ///< in buffer_warm_in
1195 BUFFER_WARM_OUT, ///< in buffer_warm_out
1196 BUFFER_HOT, ///< in buffer_hot
1197 BUFFER_TYPE_MAX
1198 };
1199
1200 uint64_t buffer_list_bytes[BUFFER_TYPE_MAX] = {0}; ///< bytes per type
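    // Rough sketch of the intended 2Q behaviour (the authoritative logic is in
    // _add_buffer/_trim in the .cc): new buffers land in buffer_warm_in
    // ("A1in"); when trimmed from there the data is dropped but an empty
    // STATE_EMPTY Buffer may be kept in buffer_warm_out ("A1out") as history;
    // a range that is re-populated while its ghost entry sits in warm_out is
    // promoted to buffer_hot ("Am"). buffer_bytes and buffer_list_bytes[]
    // track the per-list totals so _trim can balance the queues.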
1201
1202 public:
1203 TwoQCache(CephContext* cct) : Cache(cct) {}
1204 uint64_t _get_num_onodes() override {
1205 return onode_lru.size();
1206 }
1207 void _add_onode(OnodeRef& o, int level) override {
1208 if (level > 0)
1209 onode_lru.push_front(*o);
1210 else
1211 onode_lru.push_back(*o);
1212 }
1213 void _rm_onode(OnodeRef& o) override {
1214 auto q = onode_lru.iterator_to(*o);
1215 onode_lru.erase(q);
1216 }
1217 void _touch_onode(OnodeRef& o) override;
1218
1219 uint64_t _get_buffer_bytes() override {
1220 return buffer_bytes;
1221 }
1222 void _add_buffer(Buffer *b, int level, Buffer *near) override;
1223 void _rm_buffer(Buffer *b) override;
1224 void _move_buffer(Cache *src, Buffer *b) override;
1225 void _adjust_buffer_size(Buffer *b, int64_t delta) override;
1226 void _touch_buffer(Buffer *b) override {
1227 switch (b->cache_private) {
1228 case BUFFER_WARM_IN:
1229 // do nothing (somewhat counter-intuitively!)
1230 break;
1231 case BUFFER_WARM_OUT:
1232 // move from warm_out to hot LRU
1233 assert(0 == "this happens via discard hint");
1234 break;
1235 case BUFFER_HOT:
1236 // move to front of hot LRU
1237 buffer_hot.erase(buffer_hot.iterator_to(*b));
1238 buffer_hot.push_front(*b);
1239 break;
1240 }
1241 _audit("_touch_buffer end");
1242 }
1243
1244 void _trim(uint64_t onode_max, uint64_t buffer_max) override;
1245
1246 void add_stats(uint64_t *onodes, uint64_t *extents,
1247 uint64_t *blobs,
1248 uint64_t *buffers,
1249 uint64_t *bytes) override {
1250 std::lock_guard<std::recursive_mutex> l(lock);
1251 *onodes += onode_lru.size();
1252 *extents += num_extents;
1253 *blobs += num_blobs;
1254 *buffers += buffer_hot.size() + buffer_warm_in.size();
1255 *bytes += buffer_bytes;
1256 }
1257
1258 #ifdef DEBUG_CACHE
1259 void _audit(const char *s) override;
1260 #endif
1261 };
1262
1263 struct OnodeSpace {
1264 private:
1265 Cache *cache;
1266
1267 /// forward lookups
1268 mempool::bluestore_meta_other::unordered_map<ghobject_t,OnodeRef> onode_map;
1269
1270 friend class Collection; // for split_cache()
1271
1272 public:
1273 OnodeSpace(Cache *c) : cache(c) {}
1274 ~OnodeSpace() {
1275 clear();
1276 }
1277
1278 OnodeRef add(const ghobject_t& oid, OnodeRef o);
1279 OnodeRef lookup(const ghobject_t& o);
1280 void remove(const ghobject_t& oid) {
1281 onode_map.erase(oid);
1282 }
1283 void rename(OnodeRef& o, const ghobject_t& old_oid,
1284 const ghobject_t& new_oid,
1285 const mempool::bluestore_meta_other::string& new_okey);
1286 void clear();
1287 bool empty();
1288
1289 /// return true if f true for any item
1290 bool map_any(std::function<bool(OnodeRef)> f);
1291 };
1292
1293 struct Collection : public CollectionImpl {
1294 BlueStore *store;
1295 Cache *cache; ///< our cache shard
1296 coll_t cid;
1297 bluestore_cnode_t cnode;
1298 RWLock lock;
1299
1300 bool exists;
1301
1302 SharedBlobSet shared_blob_set; ///< open SharedBlobs
1303
1304 // cache onodes on a per-collection basis to avoid lock
1305 // contention.
1306 OnodeSpace onode_map;
1307
1308 //pool options
1309 pool_opts_t pool_opts;
1310
1311 OnodeRef get_onode(const ghobject_t& oid, bool create);
1312
1313 // the terminology is confusing here, sorry!
1314 //
1315 // blob_t shared_blob_t
1316 // !shared unused -> open
1317 // shared !loaded -> open + shared
1318 // shared loaded -> open + shared + loaded
1319 //
1320 // i.e.,
1321 // open = SharedBlob is instantiated
1322 // shared = blob_t shared flag is set; SharedBlob is hashed.
1323 // loaded = SharedBlob::shared_blob_t is loaded from kv store
1324 void open_shared_blob(uint64_t sbid, BlobRef b);
1325 void load_shared_blob(SharedBlobRef sb);
1326 void make_blob_shared(uint64_t sbid, BlobRef b);
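    // e.g., a sketch of the progression described above (sbid obtained
    // elsewhere, e.g. via _assign_blobid; see the .cc for the real call sites):
    //   BlobRef b = c->new_blob();              // open: SharedBlob instantiated
    //   c->make_blob_shared(sbid, b);           // shared: flag set, SharedBlob hashed
    //   c->load_shared_blob(b->shared_blob);    // loaded: persistent part read from kv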
1327
1328 BlobRef new_blob() {
1329 BlobRef b = new Blob();
1330 b->shared_blob = new SharedBlob(this);
1331 return b;
1332 }
1333
1334 const coll_t &get_cid() override {
1335 return cid;
1336 }
1337
1338 bool contains(const ghobject_t& oid) {
1339 if (cid.is_meta())
1340 return oid.hobj.pool == -1;
1341 spg_t spgid;
1342 if (cid.is_pg(&spgid))
1343 return
1344 spgid.pgid.contains(cnode.bits, oid) &&
1345 oid.shard_id == spgid.shard;
1346 return false;
1347 }
1348
1349 void split_cache(Collection *dest);
1350 void trim_cache();
1351
1352 Collection(BlueStore *ns, Cache *ca, coll_t c);
1353 };
1354
1355 class OmapIteratorImpl : public ObjectMap::ObjectMapIteratorImpl {
1356 CollectionRef c;
1357 OnodeRef o;
1358 KeyValueDB::Iterator it;
1359 string head, tail;
1360 public:
1361 OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
1362 int seek_to_first() override;
1363 int upper_bound(const string &after) override;
1364 int lower_bound(const string &to) override;
1365 bool valid() override;
1366 int next(bool validate=true) override;
1367 string key() override;
1368 bufferlist value() override;
1369 int status() override {
1370 return 0;
1371 }
1372 };
1373
1374 class OpSequencer;
1375 typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
1376
1377 struct TransContext : public AioContext {
1378 typedef enum {
1379 STATE_PREPARE,
1380 STATE_AIO_WAIT,
1381 STATE_IO_DONE,
1382 STATE_KV_QUEUED, // queued for kv_sync_thread submission
1383 STATE_KV_SUBMITTED, // submitted to kv; not yet synced
1384 STATE_KV_DONE,
1385 STATE_DEFERRED_QUEUED, // in deferred_queue (pending or running)
1386 STATE_DEFERRED_CLEANUP, // remove deferred kv record
1387 STATE_DEFERRED_DONE,
1388 STATE_FINISHING,
1389 STATE_DONE,
1390 } state_t;
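      // Typical lifecycle, as suggested by the states above and the
      // l_bluestore_state_* latency counters (authoritative transitions are in
      // _txc_state_proc):
      //   PREPARE -> AIO_WAIT -> IO_DONE -> KV_QUEUED -> KV_SUBMITTED -> KV_DONE
      //     -> (DEFERRED_QUEUED -> DEFERRED_CLEANUP -> DEFERRED_DONE, deferred io only)
      //     -> FINISHING -> DONE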
1391
1392 state_t state = STATE_PREPARE;
1393
1394 const char *get_state_name() {
1395 switch (state) {
1396 case STATE_PREPARE: return "prepare";
1397 case STATE_AIO_WAIT: return "aio_wait";
1398 case STATE_IO_DONE: return "io_done";
1399 case STATE_KV_QUEUED: return "kv_queued";
1400 case STATE_KV_SUBMITTED: return "kv_submitted";
1401 case STATE_KV_DONE: return "kv_done";
1402 case STATE_DEFERRED_QUEUED: return "deferred_queued";
1403 case STATE_DEFERRED_CLEANUP: return "deferred_cleanup";
1404 case STATE_DEFERRED_DONE: return "deferred_done";
1405 case STATE_FINISHING: return "finishing";
1406 case STATE_DONE: return "done";
1407 }
1408 return "???";
1409 }
1410
1411 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1412 const char *get_state_latency_name(int state) {
1413 switch (state) {
1414 case l_bluestore_state_prepare_lat: return "prepare";
1415 case l_bluestore_state_aio_wait_lat: return "aio_wait";
1416 case l_bluestore_state_io_done_lat: return "io_done";
1417 case l_bluestore_state_kv_queued_lat: return "kv_queued";
1418 case l_bluestore_state_kv_committing_lat: return "kv_committing";
1419 case l_bluestore_state_kv_done_lat: return "kv_done";
1420 case l_bluestore_state_deferred_queued_lat: return "deferred_queued";
1421 case l_bluestore_state_deferred_cleanup_lat: return "deferred_cleanup";
1422 case l_bluestore_state_finishing_lat: return "finishing";
1423 case l_bluestore_state_done_lat: return "done";
1424 }
1425 return "???";
1426 }
1427 #endif
1428
1429 void log_state_latency(PerfCounters *logger, int state) {
1430 utime_t lat, now = ceph_clock_now();
1431 lat = now - last_stamp;
1432 logger->tinc(state, lat);
1433 #if defined(WITH_LTTNG) && defined(WITH_EVENTTRACE)
1434 if (state >= l_bluestore_state_prepare_lat && state <= l_bluestore_state_done_lat) {
1435 double usecs = (now.to_nsec()-last_stamp.to_nsec())/1000;
1436 OID_ELAPSED("", usecs, get_state_latency_name(state));
1437 }
1438 #endif
1439 last_stamp = now;
1440 }
1441
1442 OpSequencerRef osr;
1443 boost::intrusive::list_member_hook<> sequencer_item;
1444
1445 uint64_t bytes = 0, cost = 0;
1446
1447 set<OnodeRef> onodes; ///< these need to be updated/written
1448 set<OnodeRef> modified_objects; ///< objects we modified (and need a ref)
1449 set<SharedBlobRef> shared_blobs; ///< these need to be updated/written
1450 set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
1451
1452 KeyValueDB::Transaction t; ///< then we will commit this
1453 Context *oncommit = nullptr; ///< signal on commit
1454 Context *onreadable = nullptr; ///< signal on readable
1455 Context *onreadable_sync = nullptr; ///< signal on readable
1456 list<Context*> oncommits; ///< more commit completions
1457 list<CollectionRef> removed_collections; ///< colls we removed
1458
1459 boost::intrusive::list_member_hook<> deferred_queue_item;
1460 bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
1461
1462 interval_set<uint64_t> allocated, released;
1463 struct volatile_statfs{
1464 enum {
1465 STATFS_ALLOCATED = 0,
1466 STATFS_STORED,
1467 STATFS_COMPRESSED_ORIGINAL,
1468 STATFS_COMPRESSED,
1469 STATFS_COMPRESSED_ALLOCATED,
1470 STATFS_LAST
1471 };
1472 int64_t values[STATFS_LAST];
1473 volatile_statfs() {
1474 memset(this, 0, sizeof(volatile_statfs));
1475 }
1476 void reset() {
1477 *this = volatile_statfs();
1478 }
1479 int64_t& allocated() {
1480 return values[STATFS_ALLOCATED];
1481 }
1482 int64_t& stored() {
1483 return values[STATFS_STORED];
1484 }
1485 int64_t& compressed_original() {
1486 return values[STATFS_COMPRESSED_ORIGINAL];
1487 }
1488 int64_t& compressed() {
1489 return values[STATFS_COMPRESSED];
1490 }
1491 int64_t& compressed_allocated() {
1492 return values[STATFS_COMPRESSED_ALLOCATED];
1493 }
1494 bool is_empty() {
1495 return values[STATFS_ALLOCATED] == 0 &&
1496 values[STATFS_STORED] == 0 &&
1497 values[STATFS_COMPRESSED] == 0 &&
1498 values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
1499 values[STATFS_COMPRESSED_ALLOCATED] == 0;
1500 }
1501 void decode(bufferlist::iterator& it) {
1502 for (size_t i = 0; i < STATFS_LAST; i++) {
1503 ::decode(values[i], it);
1504 }
1505 }
1506
1507 void encode(bufferlist& bl) {
1508 for (size_t i = 0; i < STATFS_LAST; i++) {
1509 ::encode(values[i], bl);
1510 }
1511 }
1512 } statfs_delta;
1513
1514
1515 IOContext ioc;
1516 bool had_ios = false; ///< true if we submitted IOs before our kv txn
1517
1518 CollectionRef first_collection; ///< first referenced collection
1519
1520 uint64_t seq = 0;
1521 utime_t start;
1522 utime_t last_stamp;
1523
1524 uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
1525 uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
1526
1527 explicit TransContext(CephContext* cct, OpSequencer *o)
1528 : osr(o),
1529 ioc(cct, this),
1530 start(ceph_clock_now()) {
1531 last_stamp = start;
1532 }
1533 ~TransContext() {
1534 delete deferred_txn;
1535 }
1536
1537 void write_onode(OnodeRef &o) {
1538 onodes.insert(o);
1539 }
1540 void write_shared_blob(SharedBlobRef &sb) {
1541 shared_blobs.insert(sb);
1542 }
1543 /// note that we logically modified the object (even when the onode itself is unmodified)
1544 void note_modified_object(OnodeRef &o) {
1545 // onode itself isn't written, though
1546 modified_objects.insert(o);
1547 }
1548 void removed(OnodeRef& o) {
1549 onodes.erase(o);
1550 modified_objects.erase(o);
1551 }
1552
1553 void aio_finish(BlueStore *store) override {
1554 store->txc_aio_finish(this);
1555 }
1556 };
1557
1558 typedef boost::intrusive::list<
1559 TransContext,
1560 boost::intrusive::member_hook<
1561 TransContext,
1562 boost::intrusive::list_member_hook<>,
1563 &TransContext::deferred_queue_item> > deferred_queue_t;
1564
1565 struct DeferredBatch : public AioContext {
1566 OpSequencer *osr;
1567 struct deferred_io {
1568 bufferlist bl; ///< data
1569 uint64_t seq; ///< deferred transaction seq
1570 };
1571 map<uint64_t,deferred_io> iomap; ///< map of ios in this batch
1572 deferred_queue_t txcs; ///< txcs in this batch
1573 IOContext ioc; ///< our aios
1574 /// bytes of pending io for each deferred seq (may be 0)
1575 map<uint64_t,int> seq_bytes;
1576
1577 void _discard(CephContext *cct, uint64_t offset, uint64_t length);
1578 void _audit(CephContext *cct);
1579
1580 DeferredBatch(CephContext *cct, OpSequencer *osr)
1581 : osr(osr), ioc(cct, this) {}
1582
1583 /// prepare a write
1584 void prepare_write(CephContext *cct,
1585 uint64_t seq, uint64_t offset, uint64_t length,
1586 bufferlist::const_iterator& p);
1587
1588 void aio_finish(BlueStore *store) override {
1589 store->_deferred_aio_finish(osr);
1590 }
1591 };
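    // Rough sketch of how deferred io is intended to flow (details live in
    // _deferred_queue/_deferred_try_submit/_deferred_aio_finish in the .cc):
    // small overwrites are recorded as a bluestore_deferred_transaction_t in
    // the txc's kv transaction, the txc is parked on its OpSequencer's
    // deferred_pending batch, batches are submitted as aio (becoming
    // deferred_running), and once those writes are stable the deferred kv
    // records are removed (STATE_DEFERRED_CLEANUP).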
1592
1593 class OpSequencer : public Sequencer_impl {
1594 public:
1595 std::mutex qlock;
1596 std::condition_variable qcond;
1597 typedef boost::intrusive::list<
1598 TransContext,
1599 boost::intrusive::member_hook<
1600 TransContext,
1601 boost::intrusive::list_member_hook<>,
1602 &TransContext::sequencer_item> > q_list_t;
1603 q_list_t q; ///< transactions
1604
1605 boost::intrusive::list_member_hook<> deferred_osr_queue_item;
1606
1607 DeferredBatch *deferred_running = nullptr;
1608 DeferredBatch *deferred_pending = nullptr;
1609
1610 Sequencer *parent;
1611 BlueStore *store;
1612
1613 uint64_t last_seq = 0;
1614
1615 std::atomic_int txc_with_unstable_io = {0}; ///< num txcs with unstable io
1616
1617 std::atomic_int kv_committing_serially = {0};
1618
1619 std::atomic_int kv_submitted_waiters = {0};
1620
1621 std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
1622 std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
1623
1624 OpSequencer(CephContext* cct, BlueStore *store)
1625 : Sequencer_impl(cct),
1626 parent(NULL), store(store) {
1627 store->register_osr(this);
1628 }
1629 ~OpSequencer() override {
1630 assert(q.empty());
1631 _unregister();
1632 }
1633
1634 void discard() override {
1635 // Note that we may have txc's in flight when the parent Sequencer
1636 // goes away. Reflect this with zombie==registered==true and let
1637 // _osr_drain_all clean up later.
1638 assert(!zombie);
1639 zombie = true;
1640 parent = nullptr;
1641 bool empty;
1642 {
1643 std::lock_guard<std::mutex> l(qlock);
1644 empty = q.empty();
1645 }
1646 if (empty) {
1647 _unregister();
1648 }
1649 }
1650
1651 void _unregister() {
1652 if (registered) {
1653 store->unregister_osr(this);
1654 registered = false;
1655 }
1656 }
1657
1658 void queue_new(TransContext *txc) {
1659 std::lock_guard<std::mutex> l(qlock);
1660 txc->seq = ++last_seq;
1661 q.push_back(*txc);
1662 }
1663
1664 void drain() {
1665 std::unique_lock<std::mutex> l(qlock);
1666 while (!q.empty())
1667 qcond.wait(l);
1668 }
1669
1670 void drain_preceding(TransContext *txc) {
1671 std::unique_lock<std::mutex> l(qlock);
1672 while (!q.empty() && &q.front() != txc)
1673 qcond.wait(l);
1674 }
1675
1676 bool _is_all_kv_submitted() {
1677 // caller must hold qlock
1678 if (q.empty()) {
1679 return true;
1680 }
1681 TransContext *txc = &q.back();
1682 if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
1683 return true;
1684 }
1685 return false;
1686 }
1687
1688 void flush() override {
1689 std::unique_lock<std::mutex> l(qlock);
1690 while (true) {
1691 // set flag before the check because the condition
1692 // may become true outside qlock, and we need to make
1693 // sure those threads see waiters and signal qcond.
1694 ++kv_submitted_waiters;
1695 if (_is_all_kv_submitted()) {
--kv_submitted_waiters;  // balance the increment above; don't leak the waiter count
1696 return;
1697 }
1698 qcond.wait(l);
1699 --kv_submitted_waiters;
1700 }
1701 }
1702
1703 bool flush_commit(Context *c) override {
1704 std::lock_guard<std::mutex> l(qlock);
1705 if (q.empty()) {
1706 return true;
1707 }
1708 TransContext *txc = &q.back();
1709 if (txc->state >= TransContext::STATE_KV_DONE) {
1710 return true;
1711 }
1712 txc->oncommits.push_back(c);
1713 return false;
1714 }
1715 };
1716
1717 typedef boost::intrusive::list<
1718 OpSequencer,
1719 boost::intrusive::member_hook<
1720 OpSequencer,
1721 boost::intrusive::list_member_hook<>,
1722 &OpSequencer::deferred_osr_queue_item> > deferred_osr_queue_t;
1723
1724 struct KVSyncThread : public Thread {
1725 BlueStore *store;
1726 explicit KVSyncThread(BlueStore *s) : store(s) {}
1727 void *entry() override {
1728 store->_kv_sync_thread();
1729 return NULL;
1730 }
1731 };
1732
1733 struct DBHistogram {
1734 struct value_dist {
1735 uint64_t count;
1736 uint32_t max_len;
1737 };
1738
1739 struct key_dist {
1740 uint64_t count;
1741 uint32_t max_len;
1742 map<int, struct value_dist> val_map; ///< value slab id => count and max length of the values
1743 };
1744
1745 map<string, map<int, struct key_dist> > key_hist;
1746 map<int, uint64_t> value_hist;
1747 int get_key_slab(size_t sz);
1748 string get_key_slab_to_range(int slab);
1749 int get_value_slab(size_t sz);
1750 string get_value_slab_to_range(int slab);
1751 void update_hist_entry(map<string, map<int, struct key_dist> > &key_hist,
1752 const string &prefix, size_t key_size, size_t value_size);
1753 void dump(Formatter *f);
1754 };
1755
1756 // --------------------------------------------------------
1757 // members
1758 private:
1759 BlueFS *bluefs = nullptr;
1760 unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
1761 bool bluefs_single_shared_device = true;
1762 utime_t bluefs_last_balance;
1763
1764 KeyValueDB *db = nullptr;
1765 BlockDevice *bdev = nullptr;
1766 std::string freelist_type;
1767 FreelistManager *fm = nullptr;
1768 Allocator *alloc = nullptr;
1769 uuid_d fsid;
1770 int path_fd = -1; ///< open handle to $path
1771 int fsid_fd = -1; ///< open handle (locked) to $path/fsid
1772 bool mounted = false;
1773
1774 RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
1775 mempool::bluestore_meta_other::unordered_map<coll_t, CollectionRef> coll_map;
1776
1777 vector<Cache*> cache_shards;
1778
1779 std::mutex osr_lock; ///< protect osr_set
1780 std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
1781
1782 std::atomic<uint64_t> nid_last = {0};
1783 std::atomic<uint64_t> nid_max = {0};
1784 std::atomic<uint64_t> blobid_last = {0};
1785 std::atomic<uint64_t> blobid_max = {0};
1786
1787 Throttle throttle_bytes; ///< submit to commit
1788 Throttle throttle_deferred_bytes; ///< submit to deferred complete
1789
1790 interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
1791 interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
1792
1793 std::mutex deferred_lock;
1794 std::atomic<uint64_t> deferred_seq = {0};
1795 deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
1796 int deferred_queue_size = 0; ///< num txc's queued across all osrs
1797 atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
1798
1799 int m_finisher_num = 1;
1800 vector<Finisher*> finishers;
1801
1802 KVSyncThread kv_sync_thread;
1803 std::mutex kv_lock;
1804 std::condition_variable kv_cond;
1805 bool kv_stop = false;
1806 deque<TransContext*> kv_queue; ///< ready, already submitted
1807 deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
1808 deque<TransContext*> kv_committing; ///< currently syncing
1809 deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
1810 deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
1811
1812 PerfCounters *logger = nullptr;
1813
1814 std::mutex reap_lock;
1815 list<CollectionRef> removed_collections;
1816
1817 RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
1818 set<ghobject_t> debug_data_error_objects;
1819 set<ghobject_t> debug_mdata_error_objects;
1820
1821 std::atomic<int> csum_type = {Checksummer::CSUM_CRC32C};
1822
1823 uint64_t block_size = 0; ///< block size of block device (power of 2)
1824 uint64_t block_mask = 0; ///< mask to get just the block offset
1825 size_t block_size_order = 0; ///< bits to shift to get block size
1826
1827 uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
1828 int deferred_batch_ops = 0; ///< deferred batch size
1829
1830 ///< bits for min_alloc_size
1831 std::atomic<uint8_t> min_alloc_size_order = {0};
1832 static_assert(std::numeric_limits<uint8_t>::max() >
1833 std::numeric_limits<decltype(min_alloc_size)>::digits,
1834 "not enough bits for min_alloc_size");
1835
1836 ///< size threshold for forced deferred writes
1837 std::atomic<uint64_t> prefer_deferred_size = {0};
1838
1839 ///< maximum allocation unit (power of 2)
1840 std::atomic<uint64_t> max_alloc_size = {0};
1841
1842 ///< approx cost per io, in bytes
1843 std::atomic<uint64_t> throttle_cost_per_io = {0};
1844
1845 std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
1846 CompressorRef compressor;
1847 std::atomic<uint64_t> comp_min_blob_size = {0};
1848 std::atomic<uint64_t> comp_max_blob_size = {0};
1849
1850 std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
1851
1852 // cache trim control
1853
1854 // note that these update in a racy way, but we don't *really* care if
1855 // they're perfectly accurate. they are all word sized so they will
1856 // individually update atomically, but may not be coherent with each other.
1857 size_t mempool_seq = 0;
1858 size_t mempool_bytes = 0;
1859 size_t mempool_onodes = 0;
1860
1861 void get_mempool_stats(size_t *seq, uint64_t *bytes, uint64_t *onodes) {
1862 *seq = mempool_seq;
1863 *bytes = mempool_bytes;
1864 *onodes = mempool_onodes;
1865 }
1866
1867 struct MempoolThread : public Thread {
1868 BlueStore *store;
1869 Cond cond;
1870 Mutex lock;
1871 bool stop = false;
1872 public:
1873 explicit MempoolThread(BlueStore *s)
1874 : store(s),
1875 lock("BlueStore::MempoolThread::lock") {}
1876 void *entry() override;
1877 void init() {
1878 assert(stop == false);
1879 create("bstore_mempool");
1880 }
1881 void shutdown() {
1882 lock.Lock();
1883 stop = true;
1884 cond.Signal();
1885 lock.Unlock();
1886 join();
1887 }
1888 } mempool_thread;
1889
1890 // --------------------------------------------------------
1891 // private methods
1892
1893 void _init_logger();
1894 void _shutdown_logger();
1895 int _reload_logger();
1896
1897 int _open_path();
1898 void _close_path();
1899 int _open_fsid(bool create);
1900 int _lock_fsid();
1901 int _read_fsid(uuid_d *f);
1902 int _write_fsid();
1903 void _close_fsid();
1904 void _set_alloc_sizes();
1905 void _set_blob_size();
1906
1907 int _open_bdev(bool create);
1908 void _close_bdev();
1909 int _open_db(bool create);
1910 void _close_db();
1911 int _open_fm(bool create);
1912 void _close_fm();
1913 int _open_alloc();
1914 void _close_alloc();
1915 int _open_collections(int *errors=0);
1916 void _close_collections();
1917
1918 int _setup_block_symlink_or_file(string name, string path, uint64_t size,
1919 bool create);
1920
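  // Device label handling, roughly: a bluestore_bdev_label_t is written at
  // the start of each device at mkfs time; _check_or_set_bdev_label re-reads
  // it on open and compares the recorded fsid/size/description so that
  // swapped or mismatched devices can be detected before they are used.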
1921 int _write_bdev_label(string path, bluestore_bdev_label_t label);
1922 public:
1923 static int _read_bdev_label(CephContext* cct, string path,
1924 bluestore_bdev_label_t *label);
1925 private:
1926 int _check_or_set_bdev_label(string path, uint64_t size, string desc,
1927 bool create);
1928
1929 int _open_super_meta();
1930
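  // BlueFS (which hosts RocksDB) can borrow space from the main block device.
  // Roughly: _reconcile_bluefs_freespace cross-checks bluefs_extents against
  // what BlueFS itself reports at mount, _balance_bluefs_freespace decides
  // whether to gift more extents to BlueFS or reclaim some back based on
  // relative free space (see the bluestore_bluefs_* options), and
  // _commit_bluefs_freespace records a gift once it has been handed over.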
1931 int _reconcile_bluefs_freespace();
1932 int _balance_bluefs_freespace(PExtentVector *extents);
1933 void _commit_bluefs_freespace(const PExtentVector& extents);
1934
1935 CollectionRef _get_collection(const coll_t& cid);
1936 void _queue_reap_collection(CollectionRef& c);
1937 void _reap_collections();
1938 void _update_cache_logger();
1939
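  // Onode ids (nids) and blob ids are handed out from an in-memory range:
  // *_last is bumped per allocation and a new *_max is persisted in the KV
  // store only when the range is exhausted, so a fresh id does not normally
  // cost a metadata update (see blobid_last/blobid_max above).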
1940 void _assign_nid(TransContext *txc, OnodeRef o);
1941 uint64_t _assign_blobid(TransContext *txc);
1942
1943 void _dump_onode(OnodeRef o, int log_level=30);
1944 void _dump_extent_map(ExtentMap& em, int log_level=30);
1945 void _dump_transaction(Transaction *t, int log_level = 30);
1946
1947 TransContext *_txc_create(OpSequencer *osr);
1948 void _txc_update_store_statfs(TransContext *txc);
1949 void _txc_add_transaction(TransContext *txc, Transaction *t);
1950 void _txc_calc_cost(TransContext *txc);
1951 void _txc_write_nodes(TransContext *txc, KeyValueDB::Transaction t);
1952 void _txc_state_proc(TransContext *txc);
1953 void _txc_aio_submit(TransContext *txc);
1954 public:
1955 void txc_aio_finish(void *p) {
1956 _txc_state_proc(static_cast<TransContext*>(p));
1957 }
1958 private:
1959 void _txc_finish_io(TransContext *txc);
1960 void _txc_finalize_kv(TransContext *txc, KeyValueDB::Transaction t);
1961 void _txc_applied_kv(TransContext *txc);
1962 void _txc_committed_kv(TransContext *txc);
1963 void _txc_finish(TransContext *txc);
1964 void _txc_release_alloc(TransContext *txc);
1965
1966 void _osr_drain_preceding(TransContext *txc);
1967 void _osr_drain_all();
1968 void _osr_unregister_all();
1969
1970 void _kv_sync_thread();
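  // Stop the kv sync thread: raise kv_stop under kv_lock, wake the thread,
  // join it, then clear the flag again so a later mount can restart it.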
1971 void _kv_stop() {
1972 {
1973 std::lock_guard<std::mutex> l(kv_lock);
1974 kv_stop = true;
1975 kv_cond.notify_all();
1976 }
1977 kv_sync_thread.join();
1978 {
1979 std::lock_guard<std::mutex> l(kv_lock);
1980 kv_stop = false;
1981 }
1982 }
1983
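  // Deferred write path, roughly: small overwrites (see prefer_deferred_size)
  // are journaled as bluestore_deferred_op_t records inside the KV
  // transaction, acknowledged once the KV commit is durable, and only then
  // applied to the block device via aio; the record is cleaned up once that
  // aio is stable. _deferred_replay() re-applies any records still present in
  // the KV store at mount (e.g. after a crash).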
1984 bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
1985 void _deferred_queue(TransContext *txc);
1986 void deferred_try_submit() {
1987 std::lock_guard<std::mutex> l(deferred_lock);
1988 _deferred_try_submit();
1989 }
1990 void _deferred_try_submit();
1991 void _deferred_submit(OpSequencer *osr);
1992 void _deferred_aio_finish(OpSequencer *osr);
1993 int _deferred_replay();
1994
1995 public:
1996 using mempool_dynamic_bitset =
1997 boost::dynamic_bitset<uint64_t,
1998 mempool::bluestore_fsck::pool_allocator<uint64_t>>;
1999
2000 private:
2001 int _fsck_check_extents(
2002 const ghobject_t& oid,
2003 const PExtentVector& extents,
2004 bool compressed,
2005 mempool_dynamic_bitset &used_blocks,
2006 store_statfs_t& expected_statfs);
2007
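  // Stage freshly written data in the blob's buffer cache and remember the
  // shared blob so its buffers can be finalized when the transaction commits.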
2008 void _buffer_cache_write(
2009 TransContext *txc,
2010 BlobRef b,
2011 uint64_t offset,
2012 bufferlist& bl,
2013 unsigned flags) {
2014 b->shared_blob->bc.write(b->shared_blob->get_cache(), txc->seq, offset, bl,
2015 flags);
2016 txc->shared_blobs_written.insert(b->shared_blob);
2017 }
2018
2019 int _collection_list(
2020 Collection *c, const ghobject_t& start, const ghobject_t& end,
2021 int max, vector<ghobject_t> *ls, ghobject_t *next);
2022
2023 template <typename T, typename F>
2024 T select_option(const std::string& opt_name, T val1, F f) {
2025 // NB: opt_name is reserved for future use
2026 boost::optional<T> val2 = f();
2027 if (val2) {
2028 return *val2;
2029 }
2030 return val1;
2031 }
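  // A minimal sketch (hypothetical override source): prefer a caller-supplied
  // value when one exists, otherwise fall back to the store-wide default:
  //
  //   uint64_t min_blob = select_option(
  //     "compression_min_blob_size", comp_min_blob_size.load(),
  //     [&]() -> boost::optional<uint64_t> {
  //       if (per_pool_min_blob)        // hypothetical per-pool override
  //         return *per_pool_min_blob;
  //       return boost::none;
  //     });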
2032
2033 void _apply_padding(uint64_t head_pad,
2034 uint64_t tail_pad,
2035 bufferlist& bl,
2036 bufferlist& padded);
2037
2038 // -- ondisk version ---
2039 public:
2040 const int32_t latest_ondisk_format = 2; ///< our version
2041 const int32_t min_readable_ondisk_format = 1; ///< what we can read
2042 const int32_t min_compat_ondisk_format = 2; ///< who can read us
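  // Roughly: on mount the format recorded in the superblock must be at least
  // min_readable_ondisk_format for this code to open the store, and the
  // store's recorded compat value must not exceed latest_ondisk_format;
  // older but still readable stores are brought up to date by _upgrade_super().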
2043
2044 private:
2045 int32_t ondisk_format = 0; ///< value detected on mount
2046
2047 int _upgrade_super(); ///< upgrade (called during open_super)
2048 void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
2049
2050 // --- public interface ---
2051 public:
2052 BlueStore(CephContext *cct, const string& path);
2053 BlueStore(CephContext *cct, const string& path, uint64_t min_alloc_size); // ctor for unit tests only
2054 ~BlueStore() override;
2055
2056 string get_type() override {
2057 return "bluestore";
2058 }
2059
2060 bool needs_journal() override { return false; }
2061 bool wants_journal() override { return false; }
2062 bool allows_journal() override { return false; }
2063
2064 static int get_block_device_fsid(CephContext* cct, const string& path,
2065 uuid_d *fsid);
2066
2067 bool test_mount_in_use() override;
2068
2069 private:
2070 int _mount(bool kv_only);
2071 public:
2072 int mount() override {
2073 return _mount(false);
2074 }
2075 int umount() override;
2076
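  // Mount only as far as the KV database and hand back its handle; intended
  // for offline inspection tooling (e.g. ceph-kvstore-tool's bluestore-kv
  // backend) rather than normal OSD operation.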
2077 int start_kv_only(KeyValueDB **pdb) {
2078 int r = _mount(true);
2079 if (r < 0)
2080 return r;
2081 *pdb = db;
2082 return 0;
2083 }
2084
2085 int fsck(bool deep) override;
2086
2087 void set_cache_shards(unsigned num) override;
2088
2089 int validate_hobject_key(const hobject_t &obj) const override {
2090 return 0;
2091 }
2092 unsigned get_max_attr_name_length() override {
2093 return 256; // arbitrary; there is no real limit internally
2094 }
2095
2096 int mkfs() override;
2097 int mkjournal() override {
2098 return 0;
2099 }
2100
2101 void get_db_statistics(Formatter *f) override;
2102 void generate_db_histogram(Formatter *f) override;
2103 void flush_cache() override;
2104 void dump_perf_counters(Formatter *f) override {
2105 f->open_object_section("perf_counters");
2106 logger->dump_formatted(f, false);
2107 f->close_section();
2108 }
2109
2110 void register_osr(OpSequencer *osr) {
2111 std::lock_guard<std::mutex> l(osr_lock);
2112 osr_set.insert(osr);
2113 }
2114 void unregister_osr(OpSequencer *osr) {
2115 std::lock_guard<std::mutex> l(osr_lock);
2116 osr_set.erase(osr);
2117 }
2118
2119 public:
2120 int statfs(struct store_statfs_t *buf) override;
2121
2122 void collect_metadata(map<string,string> *pm) override;
2123
2124 bool exists(const coll_t& cid, const ghobject_t& oid) override;
2125 bool exists(CollectionHandle &c, const ghobject_t& oid) override;
2126 int set_collection_opts(
2127 const coll_t& cid,
2128 const pool_opts_t& opts) override;
2129 int stat(
2130 const coll_t& cid,
2131 const ghobject_t& oid,
2132 struct stat *st,
2133 bool allow_eio = false) override;
2134 int stat(
2135 CollectionHandle &c,
2136 const ghobject_t& oid,
2137 struct stat *st,
2138 bool allow_eio = false) override;
2139 int read(
2140 const coll_t& cid,
2141 const ghobject_t& oid,
2142 uint64_t offset,
2143 size_t len,
2144 bufferlist& bl,
2145 uint32_t op_flags = 0,
2146 bool allow_eio = false) override;
2147 int read(
2148 CollectionHandle &c,
2149 const ghobject_t& oid,
2150 uint64_t offset,
2151 size_t len,
2152 bufferlist& bl,
2153 uint32_t op_flags = 0,
2154 bool allow_eio = false) override;
2155 int _do_read(
2156 Collection *c,
2157 OnodeRef o,
2158 uint64_t offset,
2159 size_t len,
2160 bufferlist& bl,
2161 uint32_t op_flags = 0);
2162
2163 private:
2164 int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
2165 uint64_t offset, size_t len, interval_set<uint64_t>& destset);
2166 public:
2167 int fiemap(const coll_t& cid, const ghobject_t& oid,
2168 uint64_t offset, size_t len, bufferlist& bl) override;
2169 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2170 uint64_t offset, size_t len, bufferlist& bl) override;
2171 int fiemap(const coll_t& cid, const ghobject_t& oid,
2172 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2173 int fiemap(CollectionHandle &c, const ghobject_t& oid,
2174 uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
2175
2176
2177 int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
2178 bufferptr& value) override;
2179 int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
2180 bufferptr& value) override;
2181
2182 int getattrs(const coll_t& cid, const ghobject_t& oid,
2183 map<string,bufferptr>& aset) override;
2184 int getattrs(CollectionHandle &c, const ghobject_t& oid,
2185 map<string,bufferptr>& aset) override;
2186
2187 int list_collections(vector<coll_t>& ls) override;
2188
2189 CollectionHandle open_collection(const coll_t &c) override;
2190
2191 bool collection_exists(const coll_t& c) override;
2192 int collection_empty(const coll_t& c, bool *empty) override;
2193 int collection_bits(const coll_t& c) override;
2194
2195 int collection_list(const coll_t& cid,
2196 const ghobject_t& start,
2197 const ghobject_t& end,
2198 int max,
2199 vector<ghobject_t> *ls, ghobject_t *next) override;
2200 int collection_list(CollectionHandle &c,
2201 const ghobject_t& start,
2202 const ghobject_t& end,
2203 int max,
2204 vector<ghobject_t> *ls, ghobject_t *next) override;
2205
2206 int omap_get(
2207 const coll_t& cid, ///< [in] Collection containing oid
2208 const ghobject_t &oid, ///< [in] Object containing omap
2209 bufferlist *header, ///< [out] omap header
2210 map<string, bufferlist> *out ///< [out] Key to value map
2211 ) override;
2212 int omap_get(
2213 CollectionHandle &c, ///< [in] Collection containing oid
2214 const ghobject_t &oid, ///< [in] Object containing omap
2215 bufferlist *header, ///< [out] omap header
2216 map<string, bufferlist> *out ///< [out] Key to value map
2217 ) override;
2218
2219 /// Get omap header
2220 int omap_get_header(
2221 const coll_t& cid, ///< [in] Collection containing oid
2222 const ghobject_t &oid, ///< [in] Object containing omap
2223 bufferlist *header, ///< [out] omap header
2224 bool allow_eio = false ///< [in] don't assert on eio
2225 ) override;
2226 int omap_get_header(
2227 CollectionHandle &c, ///< [in] Collection containing oid
2228 const ghobject_t &oid, ///< [in] Object containing omap
2229 bufferlist *header, ///< [out] omap header
2230 bool allow_eio = false ///< [in] don't assert on eio
2231 ) override;
2232
2233 /// Get keys defined on oid
2234 int omap_get_keys(
2235 const coll_t& cid, ///< [in] Collection containing oid
2236 const ghobject_t &oid, ///< [in] Object containing omap
2237 set<string> *keys ///< [out] Keys defined on oid
2238 ) override;
2239 int omap_get_keys(
2240 CollectionHandle &c, ///< [in] Collection containing oid
2241 const ghobject_t &oid, ///< [in] Object containing omap
2242 set<string> *keys ///< [out] Keys defined on oid
2243 ) override;
2244
2245 /// Get key values
2246 int omap_get_values(
2247 const coll_t& cid, ///< [in] Collection containing oid
2248 const ghobject_t &oid, ///< [in] Object containing omap
2249 const set<string> &keys, ///< [in] Keys to get
2250 map<string, bufferlist> *out ///< [out] Returned keys and values
2251 ) override;
2252 int omap_get_values(
2253 CollectionHandle &c, ///< [in] Collection containing oid
2254 const ghobject_t &oid, ///< [in] Object containing omap
2255 const set<string> &keys, ///< [in] Keys to get
2256 map<string, bufferlist> *out ///< [out] Returned keys and values
2257 ) override;
2258
2259 /// Filters keys into out which are defined on oid
2260 int omap_check_keys(
2261 const coll_t& cid, ///< [in] Collection containing oid
2262 const ghobject_t &oid, ///< [in] Object containing omap
2263 const set<string> &keys, ///< [in] Keys to check
2264 set<string> *out ///< [out] Subset of keys defined on oid
2265 ) override;
2266 int omap_check_keys(
2267 CollectionHandle &c, ///< [in] Collection containing oid
2268 const ghobject_t &oid, ///< [in] Object containing omap
2269 const set<string> &keys, ///< [in] Keys to check
2270 set<string> *out ///< [out] Subset of keys defined on oid
2271 ) override;
2272
2273 ObjectMap::ObjectMapIterator get_omap_iterator(
2274 const coll_t& cid, ///< [in] collection
2275 const ghobject_t &oid ///< [in] object
2276 ) override;
2277 ObjectMap::ObjectMapIterator get_omap_iterator(
2278 CollectionHandle &c, ///< [in] collection
2279 const ghobject_t &oid ///< [in] object
2280 ) override;
2281
2282 void set_fsid(uuid_d u) override {
2283 fsid = u;
2284 }
2285 uuid_d get_fsid() override {
2286 return fsid;
2287 }
2288
2289 uint64_t estimate_objects_overhead(uint64_t num_objects) override {
2290 return num_objects * 300; // assuming per-object overhead is ~300 bytes
2291 }
2292
2293 struct BSPerfTracker {
2294 PerfCounters::avg_tracker<uint64_t> os_commit_latency;
2295 PerfCounters::avg_tracker<uint64_t> os_apply_latency;
2296
2297 objectstore_perf_stat_t get_cur_stats() const {
2298 objectstore_perf_stat_t ret;
2299 ret.os_commit_latency = os_commit_latency.avg();
2300 ret.os_apply_latency = os_apply_latency.avg();
2301 return ret;
2302 }
2303
2304 void update_from_perfcounters(PerfCounters &logger);
2305 } perf_tracker;
2306
2307 objectstore_perf_stat_t get_cur_stats() override {
2308 perf_tracker.update_from_perfcounters(*logger);
2309 return perf_tracker.get_cur_stats();
2310 }
2311 const PerfCounters* get_perf_counters() const override {
2312 return logger;
2313 }
2314
2315 int queue_transactions(
2316 Sequencer *osr,
2317 vector<Transaction>& tls,
2318 TrackedOpRef op = TrackedOpRef(),
2319 ThreadPool::TPHandle *handle = NULL) override;
2320
2321 // error injection
2322 void inject_data_error(const ghobject_t& o) override {
2323 RWLock::WLocker l(debug_read_error_lock);
2324 debug_data_error_objects.insert(o);
2325 }
2326 void inject_mdata_error(const ghobject_t& o) override {
2327 RWLock::WLocker l(debug_read_error_lock);
2328 debug_mdata_error_objects.insert(o);
2329 }
2330 private:
2331 bool _debug_data_eio(const ghobject_t& o) {
2332 if (!cct->_conf->bluestore_debug_inject_read_err) {
2333 return false;
2334 }
2335 RWLock::RLocker l(debug_read_error_lock);
2336 return debug_data_error_objects.count(o);
2337 }
2338 bool _debug_mdata_eio(const ghobject_t& o) {
2339 if (!cct->_conf->bluestore_debug_inject_read_err) {
2340 return false;
2341 }
2342 RWLock::RLocker l(debug_read_error_lock);
2343 return debug_mdata_error_objects.count(o);
2344 }
2345 void _debug_obj_on_delete(const ghobject_t& o) {
2346 if (cct->_conf->bluestore_debug_inject_read_err) {
2347 RWLock::WLocker l(debug_read_error_lock);
2348 debug_data_error_objects.erase(o);
2349 debug_mdata_error_objects.erase(o);
2350 }
2351 }
2352
2353 private:
2354
2355 // --------------------------------------------------------
2356 // read processing internal methods
2357 int _verify_csum(
2358 OnodeRef& o,
2359 const bluestore_blob_t* blob,
2360 uint64_t blob_xoffset,
2361 const bufferlist& bl,
2362 uint64_t logical_offset) const;
2363 int _decompress(bufferlist& source, bufferlist* result);
2364
2365
2366 // --------------------------------------------------------
2367 // write ops
2368
2369 struct WriteContext {
2370 bool buffered = false; ///< buffered write
2371 bool compress = false; ///< compressed write
2372 uint64_t target_blob_size = 0; ///< target (max) blob size
2373 unsigned csum_order = 0; ///< target checksum chunk order
2374
2375 old_extent_map_t old_extents; ///< must deref these blobs
2376
2377 struct write_item {
2378 uint64_t logical_offset; ///< write logical offset
2379 BlobRef b;
2380 uint64_t blob_length;
2381 uint64_t b_off;
2382 bufferlist bl;
2383 uint64_t b_off0; ///< original offset in a blob prior to padding
2384 uint64_t length0; ///< original data length prior to padding
2385
2386 bool mark_unused;
2387 bool new_blob; ///< whether new blob was created
2388
2389 write_item(
2390 uint64_t logical_offs,
2391 BlobRef b,
2392 uint64_t blob_len,
2393 uint64_t o,
2394 bufferlist& bl,
2395 uint64_t o0,
2396 uint64_t l0,
2397 bool _mark_unused,
2398 bool _new_blob)
2399 :
2400 logical_offset(logical_offs),
2401 b(b),
2402 blob_length(blob_len),
2403 b_off(o),
2404 bl(bl),
2405 b_off0(o0),
2406 length0(l0),
2407 mark_unused(_mark_unused),
2408 new_blob(_new_blob) {}
2409 };
2410 vector<write_item> writes; ///< blobs we're writing
2411
2412 /// partial clone of the context: copies settings (buffered, compress, target_blob_size, csum_order) but not writes or old_extents
2413 void fork(const WriteContext& other) {
2414 buffered = other.buffered;
2415 compress = other.compress;
2416 target_blob_size = other.target_blob_size;
2417 csum_order = other.csum_order;
2418 }
2419 void write(
2420 uint64_t loffs,
2421 BlobRef b,
2422 uint64_t blob_len,
2423 uint64_t o,
2424 bufferlist& bl,
2425 uint64_t o0,
2426 uint64_t len0,
2427 bool _mark_unused,
2428 bool _new_blob) {
2429 writes.emplace_back(loffs,
2430 b,
2431 blob_len,
2432 o,
2433 bl,
2434 o0,
2435 len0,
2436 _mark_unused,
2437 _new_blob);
2438 }
2439 /// Checks for writes to the same pextent within a blob
2440 bool has_conflict(
2441 BlobRef b,
2442 uint64_t loffs,
2443 uint64_t loffs_end,
2444 uint64_t min_alloc_size);
2445 };
2446
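  // Write pipeline, roughly: _do_write carves a write into min_alloc_size
  // aligned ("big") pieces and unaligned head/tail ("small") pieces.
  // _do_write_small tries to reuse space in existing blobs (possibly as a
  // deferred read-modify-write), _do_write_big lays out new blobs, then
  // _do_alloc_write allocates space and applies compression/checksums to the
  // staged blobs, and _wctx_finish releases whatever extents were overwritten.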
2447 void _do_write_small(
2448 TransContext *txc,
2449 CollectionRef &c,
2450 OnodeRef o,
2451 uint64_t offset, uint64_t length,
2452 bufferlist::iterator& blp,
2453 WriteContext *wctx);
2454 void _do_write_big(
2455 TransContext *txc,
2456 CollectionRef &c,
2457 OnodeRef o,
2458 uint64_t offset, uint64_t length,
2459 bufferlist::iterator& blp,
2460 WriteContext *wctx);
2461 int _do_alloc_write(
2462 TransContext *txc,
2463 CollectionRef c,
2464 OnodeRef o,
2465 WriteContext *wctx);
2466 void _wctx_finish(
2467 TransContext *txc,
2468 CollectionRef& c,
2469 OnodeRef o,
2470 WriteContext *wctx);
2471
2472 int _do_transaction(Transaction *t,
2473 TransContext *txc,
2474 ThreadPool::TPHandle *handle);
2475
2476 int _write(TransContext *txc,
2477 CollectionRef& c,
2478 OnodeRef& o,
2479 uint64_t offset, size_t len,
2480 bufferlist& bl,
2481 uint32_t fadvise_flags);
2482 void _pad_zeros(bufferlist *bl, uint64_t *offset,
2483 uint64_t chunk_size);
2484
2485 int _do_write(TransContext *txc,
2486 CollectionRef &c,
2487 OnodeRef o,
2488 uint64_t offset, uint64_t length,
2489 bufferlist& bl,
2490 uint32_t fadvise_flags);
2491 void _do_write_data(TransContext *txc,
2492 CollectionRef& c,
2493 OnodeRef o,
2494 uint64_t offset,
2495 uint64_t length,
2496 bufferlist& bl,
2497 WriteContext *wctx);
2498
2499 int _touch(TransContext *txc,
2500 CollectionRef& c,
2501 OnodeRef& o);
2502 int _do_zero(TransContext *txc,
2503 CollectionRef& c,
2504 OnodeRef& o,
2505 uint64_t offset, size_t len);
2506 int _zero(TransContext *txc,
2507 CollectionRef& c,
2508 OnodeRef& o,
2509 uint64_t offset, size_t len);
2510 void _do_truncate(TransContext *txc,
2511 CollectionRef& c,
2512 OnodeRef o,
2513 uint64_t offset);
2514 void _truncate(TransContext *txc,
2515 CollectionRef& c,
2516 OnodeRef& o,
2517 uint64_t offset);
2518 int _remove(TransContext *txc,
2519 CollectionRef& c,
2520 OnodeRef& o);
2521 int _do_remove(TransContext *txc,
2522 CollectionRef& c,
2523 OnodeRef o);
2524 int _setattr(TransContext *txc,
2525 CollectionRef& c,
2526 OnodeRef& o,
2527 const string& name,
2528 bufferptr& val);
2529 int _setattrs(TransContext *txc,
2530 CollectionRef& c,
2531 OnodeRef& o,
2532 const map<string,bufferptr>& aset);
2533 int _rmattr(TransContext *txc,
2534 CollectionRef& c,
2535 OnodeRef& o,
2536 const string& name);
2537 int _rmattrs(TransContext *txc,
2538 CollectionRef& c,
2539 OnodeRef& o);
2540 void _do_omap_clear(TransContext *txc, uint64_t id);
2541 int _omap_clear(TransContext *txc,
2542 CollectionRef& c,
2543 OnodeRef& o);
2544 int _omap_setkeys(TransContext *txc,
2545 CollectionRef& c,
2546 OnodeRef& o,
2547 bufferlist& bl);
2548 int _omap_setheader(TransContext *txc,
2549 CollectionRef& c,
2550 OnodeRef& o,
2551 bufferlist& header);
2552 int _omap_rmkeys(TransContext *txc,
2553 CollectionRef& c,
2554 OnodeRef& o,
2555 bufferlist& bl);
2556 int _omap_rmkey_range(TransContext *txc,
2557 CollectionRef& c,
2558 OnodeRef& o,
2559 const string& first, const string& last);
2560 int _set_alloc_hint(
2561 TransContext *txc,
2562 CollectionRef& c,
2563 OnodeRef& o,
2564 uint64_t expected_object_size,
2565 uint64_t expected_write_size,
2566 uint32_t flags);
2567 int _do_clone_range(TransContext *txc,
2568 CollectionRef& c,
2569 OnodeRef& oldo,
2570 OnodeRef& newo,
2571 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2572 int _clone(TransContext *txc,
2573 CollectionRef& c,
2574 OnodeRef& oldo,
2575 OnodeRef& newo);
2576 int _clone_range(TransContext *txc,
2577 CollectionRef& c,
2578 OnodeRef& oldo,
2579 OnodeRef& newo,
2580 uint64_t srcoff, uint64_t length, uint64_t dstoff);
2581 int _rename(TransContext *txc,
2582 CollectionRef& c,
2583 OnodeRef& oldo,
2584 OnodeRef& newo,
2585 const ghobject_t& new_oid);
2586 int _create_collection(TransContext *txc, const coll_t &cid,
2587 unsigned bits, CollectionRef *c);
2588 int _remove_collection(TransContext *txc, const coll_t &cid,
2589 CollectionRef *c);
2590 int _split_collection(TransContext *txc,
2591 CollectionRef& c,
2592 CollectionRef& d,
2593 unsigned bits, int rem);
2594 };
2595
2596 inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
2597 return out << *s.parent;
2598 }
2599
2600 static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
2601 o->get();
2602 }
2603 static inline void intrusive_ptr_release(BlueStore::Onode *o) {
2604 o->put();
2605 }
2606
2607 static inline void intrusive_ptr_add_ref(BlueStore::OpSequencer *o) {
2608 o->get();
2609 }
2610 static inline void intrusive_ptr_release(BlueStore::OpSequencer *o) {
2611 o->put();
2612 }
2613
2614 #endif