#include <boost/functional/hash.hpp>
#include <boost/dynamic_bitset.hpp>
-#include "include/assert.h"
+#include "include/ceph_assert.h"
#include "include/unordered_map.h"
-#include "include/memory.h"
#include "include/mempool.h"
+#include "common/bloom_filter.hpp"
#include "common/Finisher.h"
+#include "common/Throttle.h"
#include "common/perf_counters.h"
#include "common/PriorityCache.h"
#include "compressor/Compressor.h"
#include "bluestore_types.h"
#include "BlockDevice.h"
+#include "BlueFS.h"
#include "common/EventTrace.h"
class Allocator;
class FreelistManager;
-class BlueFS;
+class BlueStoreRepairer;
//#define DEBUG_CACHE
//#define DEBUG_DEFERRED
l_bluestore_first = 732430,
l_bluestore_kv_flush_lat,
l_bluestore_kv_commit_lat,
- l_bluestore_kv_lat,
+ l_bluestore_kv_sync_lat,
+ l_bluestore_kv_final_lat,
l_bluestore_state_prepare_lat,
l_bluestore_state_aio_wait_lat,
l_bluestore_state_io_done_lat,
l_bluestore_read_eio,
l_bluestore_reads_with_retries,
l_bluestore_fragmentation,
+ l_bluestore_omap_seek_to_first_lat,
+ l_bluestore_omap_upper_bound_lat,
+ l_bluestore_omap_lower_bound_lat,
+ l_bluestore_omap_next_lat,
+ l_bluestore_clist_lat,
l_bluestore_last
};
+#define META_POOL_ID ((uint64_t)-1ull)
+
class BlueStore : public ObjectStore,
+ public BlueFSDeviceExpander,
public md_config_obs_t {
// -----------------------------------------------------
// types
public:
// config observer
const char** get_tracked_conf_keys() const override;
- void handle_conf_change(const struct md_config_t *conf,
- const std::set<std::string> &changed) override;
+ void handle_conf_change(const ConfigProxy& conf,
+ const std::set<std::string> &changed) override;
+
+  // handler for discard events
+ void handle_discard(interval_set<uint64_t>& to_release);
void _set_csum();
void _set_compression();
}
void truncate(uint32_t newlen) {
- assert(newlen < length);
+ ceph_assert(newlen < length);
if (data.length()) {
bufferlist t;
t.substr_of(data, 0, newlen);
state_list_t writing; ///< writing buffers, sorted by seq, ascending
~BufferSpace() {
- assert(buffer_map.empty());
- assert(writing.empty());
+ ceph_assert(buffer_map.empty());
+ ceph_assert(writing.empty());
}
void _add_buffer(Cache* cache, Buffer *b, int level, Buffer *near) {
++it;
}
- assert(it->seq >= b->seq);
+ ceph_assert(it->seq >= b->seq);
// note that this will insert b before it
// hence the order is maintained
writing.insert(it, *b);
}
void _rm_buffer(Cache* cache,
map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
- assert(p != buffer_map.end());
+ ceph_assert(p != buffer_map.end());
cache->_audit("_rm_buffer start");
if (p->second->is_writing()) {
writing.erase(writing.iterator_to(*p->second));
// return value is the highest cache_private of a trimmed buffer, or 0.
int discard(Cache* cache, uint32_t offset, uint32_t length) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
+ std::lock_guard l(cache->lock);
return _discard(cache, offset, length);
}
int _discard(Cache* cache, uint32_t offset, uint32_t length);
void write(Cache* cache, uint64_t seq, uint32_t offset, bufferlist& bl,
unsigned flags) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
+ std::lock_guard l(cache->lock);
Buffer *b = new Buffer(this, Buffer::STATE_WRITING, seq, offset, bl,
flags);
b->cache_private = _discard(cache, offset, bl.length());
}
void _finish_write(Cache* cache, uint64_t seq);
void did_read(Cache* cache, uint32_t offset, bufferlist& bl) {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
+ std::lock_guard l(cache->lock);
Buffer *b = new Buffer(this, Buffer::STATE_CLEAN, 0, offset, bl);
b->cache_private = _discard(cache, offset, bl.length());
_add_buffer(cache, b, 1, nullptr);
void split(Cache* cache, size_t pos, BufferSpace &r);
void dump(Cache* cache, Formatter *f) const {
- std::lock_guard<std::recursive_mutex> l(cache->lock);
+ std::lock_guard l(cache->lock);
f->open_array_section("buffers");
for (auto& i : buffer_map) {
f->open_object_section("buffer");
- assert(i.first == i.second->offset);
+ ceph_assert(i.first == i.second->offset);
i.second->dump(f);
f->close_section();
}
/// put logical references, and get back any released extents
void put_ref(uint64_t offset, uint32_t length,
- PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
+ PExtentVector *r, bool *unshare);
void finish_write(uint64_t seq);
/// a lookup table of SharedBlobs
struct SharedBlobSet {
- std::mutex lock; ///< protect lookup, insertion, removal
+ /// protect lookup, insertion, removal
+ ceph::mutex lock = ceph::make_mutex("BlueStore::SharedBlobSet::lock");
// we use a bare pointer because we don't want to affect the ref
// count
mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
SharedBlobRef lookup(uint64_t sbid) {
- std::lock_guard<std::mutex> l(lock);
+ std::lock_guard l(lock);
auto p = sb_map.find(sbid);
if (p == sb_map.end() ||
p->second->nref == 0) {
}
void add(Collection* coll, SharedBlob *sb) {
- std::lock_guard<std::mutex> l(lock);
+ std::lock_guard l(lock);
sb_map[sb->get_sbid()] = sb;
sb->coll = coll;
}
bool remove(SharedBlob *sb, bool verify_nref_is_zero=false) {
- std::lock_guard<std::mutex> l(lock);
- assert(sb->get_parent() == this);
+ std::lock_guard l(lock);
+ ceph_assert(sb->get_parent() == this);
if (verify_nref_is_zero && sb->nref != 0) {
return false;
}
}
bool empty() {
- std::lock_guard<std::mutex> l(lock);
+ std::lock_guard l(lock);
return sb_map.empty();
}
- void dump(CephContext *cct, int lvl);
+ template <int LogLevelV>
+ void dump(CephContext *cct);
};
//#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
}
bool can_split() const {
- std::lock_guard<std::recursive_mutex> l(shared_blob->get_cache()->lock);
+ std::lock_guard l(shared_blob->get_cache()->lock);
// splitting a BufferSpace writing list is too hard; don't try.
return shared_blob->bc.writing.empty() &&
used_in_blob.can_split() &&
#ifdef CACHE_BLOB_BL
void _encode() const {
if (blob_bl.length() == 0 ) {
- ::encode(blob, blob_bl);
+ encode(blob, blob_bl);
} else {
- assert(blob_bl.length());
+ ceph_assert(blob_bl.length());
}
}
void bound_encode(
}
void decode(
Collection */*coll*/,
- bufferptr::iterator& p,
+ bufferptr::const_iterator& p,
bool include_ref_map) {
const char *start = p.get_pos();
denc(blob, p);
}
void decode(
Collection *coll,
- bufferptr::iterator& p,
+ bufferptr::const_iterator& p,
uint64_t struct_v,
uint64_t* sbid,
bool include_ref_map);
}
void assign_blob(const BlobRef& b) {
- assert(!blob);
+ ceph_assert(!blob);
blob = b;
blob->shared_blob->get_cache()->add_extent();
}
Onode *onode;
extent_map_t extent_map; ///< map of Extents to Blobs
blob_map_t spanning_blob_map; ///< blobs that span shards
+ typedef boost::intrusive_ptr<Onode> OnodeRef;
struct Shard {
bluestore_onode_t::shard_info *shard_info = nullptr;
uint32_t needs_reshard_begin = 0;
uint32_t needs_reshard_end = 0;
+ void dup(BlueStore* b, TransContext*, CollectionRef&, OnodeRef&, OnodeRef&,
+ uint64_t&, uint64_t&, uint64_t&);
+
bool needs_reshard() const {
return needs_reshard_end > needs_reshard_begin;
}
void bound_encode_spanning_blobs(size_t& p);
void encode_spanning_blobs(bufferlist::contiguous_appender& p);
- void decode_spanning_blobs(bufferptr::iterator& p);
+ void decode_spanning_blobs(bufferptr::const_iterator& p);
BlobRef get_spanning_blob(int id) {
auto p = spanning_blob_map.find(id);
- assert(p != spanning_blob_map.end());
+ ceph_assert(p != spanning_blob_map.end());
return p->second;
}
return false;
}
int s = seek_shard(offset);
- assert(s >= 0);
+ ceph_assert(s >= 0);
if (s == (int)shards.size() - 1) {
return false; // last shard
}
int64_t expected_for_release = 0; ///< alloc units currently used by
///< compressed blobs that might
///< be gone after GC
- uint64_t gc_start_offset; ///starting offset for GC
- uint64_t gc_end_offset; ///ending offset for GC
+  uint64_t gc_start_offset = 0; ///< starting offset for GC
+  uint64_t gc_end_offset = 0;   ///< ending offset for GC
protected:
void process_protrusive_extents(const BlueStore::ExtentMap& extent_map,
// track txc's that have not been committed to kv store (and whose
// effects cannot be read via the kvdb read methods)
std::atomic<int> flushing_count = {0};
- std::mutex flush_lock; ///< protect flush_txns
- std::condition_variable flush_cond; ///< wait here for uncommitted txns
+ /// protect flush_txns
+ ceph::mutex flush_lock = ceph::make_mutex("BlueStore::Onode::flush_lock");
+ ceph::condition_variable flush_cond; ///< wait here for uncommitted txns
Onode(Collection *c, const ghobject_t& o,
const mempool::bluestore_cache_other::string& k)
struct Cache {
CephContext* cct;
PerfCounters *logger;
- std::recursive_mutex lock; ///< protect lru and other structures
+
+ /// protect lru and other structures
+ ceph::recursive_mutex lock = {
+ ceph::make_recursive_mutex("BlueStore::Cache::lock") };
std::atomic<uint64_t> num_extents = {0};
std::atomic<uint64_t> num_blobs = {0};
uint64_t *bytes) = 0;
bool empty() {
- std::lock_guard<std::recursive_mutex> l(lock);
+ std::lock_guard l(lock);
return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
}
buffer_size += b->length;
}
void _rm_buffer(Buffer *b) override {
- assert(buffer_size >= b->length);
+ ceph_assert(buffer_size >= b->length);
buffer_size -= b->length;
auto q = buffer_lru.iterator_to(*b);
buffer_lru.erase(q);
_add_buffer(b, 0, nullptr);
}
void _adjust_buffer_size(Buffer *b, int64_t delta) override {
- assert((int64_t)buffer_size + delta >= 0);
+ ceph_assert((int64_t)buffer_size + delta >= 0);
buffer_size += delta;
}
void _touch_buffer(Buffer *b) override {
uint64_t *blobs,
uint64_t *buffers,
uint64_t *bytes) override {
- std::lock_guard<std::recursive_mutex> l(lock);
+ std::lock_guard l(lock);
*onodes += onode_lru.size();
*extents += num_extents;
*blobs += num_blobs;
break;
case BUFFER_WARM_OUT:
// move from warm_out to hot LRU
- assert(0 == "this happens via discard hint");
+ ceph_abort_msg("this happens via discard hint");
break;
case BUFFER_HOT:
// move to front of hot LRU
uint64_t *blobs,
uint64_t *buffers,
uint64_t *bytes) override {
- std::lock_guard<std::recursive_mutex> l(lock);
+ std::lock_guard l(lock);
*onodes += onode_lru.size();
*extents += num_extents;
*blobs += num_blobs;
void clear();
bool empty();
- void dump(CephContext *cct, int lvl);
+ template <int LogLevelV>
+ void dump(CephContext *cct);
/// return true if f true for any item
bool map_any(std::function<bool(OnodeRef)> f);
};
+ class OpSequencer;
+ typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
+
struct Collection : public CollectionImpl {
BlueStore *store;
+ OpSequencerRef osr;
Cache *cache; ///< our cache shard
- coll_t cid;
bluestore_cnode_t cnode;
RWLock lock;
//pool options
pool_opts_t pool_opts;
+ ContextQueue *commit_queue;
OnodeRef get_onode(const ghobject_t& oid, bool create);
return b;
}
- const coll_t &get_cid() override {
- return cid;
- }
-
bool contains(const ghobject_t& oid) {
if (cid.is_meta())
return oid.hobj.pool == -1;
void split_cache(Collection *dest);
+ bool flush_commit(Context *c) override;
+ void flush() override;
+ void flush_all_but_last();
+
Collection(BlueStore *ns, Cache *ca, coll_t c);
};
OnodeRef o;
KeyValueDB::Iterator it;
string head, tail;
+
+ string _stringify() const;
+
public:
OmapIteratorImpl(CollectionRef c, OnodeRef o, KeyValueDB::Iterator it);
int seek_to_first() override;
int upper_bound(const string &after) override;
int lower_bound(const string &to) override;
bool valid() override;
- int next(bool validate=true) override;
+ int next() override;
string key() override;
bufferlist value() override;
int status() override {
}
};
- class OpSequencer;
- typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
-
struct volatile_statfs{
enum {
STATFS_ALLOCATED = 0,
void reset() {
*this = volatile_statfs();
}
+ void publish(store_statfs_t* buf) const {
+ buf->allocated = allocated();
+ buf->data_stored = stored();
+ buf->data_compressed = compressed();
+ buf->data_compressed_original = compressed_original();
+ buf->data_compressed_allocated = compressed_allocated();
+ }
+
volatile_statfs& operator+=(const volatile_statfs& other) {
for (size_t i = 0; i < STATFS_LAST; ++i) {
values[i] += other.values[i];
int64_t& compressed_allocated() {
return values[STATFS_COMPRESSED_ALLOCATED];
}
+ int64_t allocated() const {
+ return values[STATFS_ALLOCATED];
+ }
+ int64_t stored() const {
+ return values[STATFS_STORED];
+ }
+ int64_t compressed_original() const {
+ return values[STATFS_COMPRESSED_ORIGINAL];
+ }
+ int64_t compressed() const {
+ return values[STATFS_COMPRESSED];
+ }
+ int64_t compressed_allocated() const {
+ return values[STATFS_COMPRESSED_ALLOCATED];
+ }
+ volatile_statfs& operator=(const store_statfs_t& st) {
+ values[STATFS_ALLOCATED] = st.allocated;
+ values[STATFS_STORED] = st.data_stored;
+ values[STATFS_COMPRESSED_ORIGINAL] = st.data_compressed_original;
+ values[STATFS_COMPRESSED] = st.data_compressed;
+ values[STATFS_COMPRESSED_ALLOCATED] = st.data_compressed_allocated;
+ return *this;
+ }
bool is_empty() {
return values[STATFS_ALLOCATED] == 0 &&
values[STATFS_STORED] == 0 &&
values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
values[STATFS_COMPRESSED_ALLOCATED] == 0;
}
- void decode(bufferlist::iterator& it) {
+ void decode(bufferlist::const_iterator& it) {
+ using ceph::decode;
for (size_t i = 0; i < STATFS_LAST; i++) {
- ::decode(values[i], it);
+ decode(values[i], it);
}
}
void encode(bufferlist& bl) {
+ using ceph::encode;
for (size_t i = 0; i < STATFS_LAST; i++) {
- ::encode(values[i], bl);
+ encode(values[i], bl);
}
}
};
- struct TransContext : public AioContext {
+ struct TransContext final : public AioContext {
MEMPOOL_CLASS_HELPERS();
typedef enum {
}
#endif
- void log_state_latency(PerfCounters *logger, int state) {
+ utime_t log_state_latency(PerfCounters *logger, int state) {
utime_t lat, now = ceph_clock_now();
lat = now - last_stamp;
logger->tinc(state, lat);
}
#endif
last_stamp = now;
+ return lat;
}
- OpSequencerRef osr;
+ CollectionRef ch;
+ OpSequencerRef osr; // this should be ch->osr
boost::intrusive::list_member_hook<> sequencer_item;
uint64_t bytes = 0, cost = 0;
set<SharedBlobRef> shared_blobs_written; ///< update these on io completion
KeyValueDB::Transaction t; ///< then we will commit this
- Context *oncommit = nullptr; ///< signal on commit
- Context *onreadable = nullptr; ///< signal on readable
- Context *onreadable_sync = nullptr; ///< signal on readable
list<Context*> oncommits; ///< more commit completions
list<CollectionRef> removed_collections; ///< colls we removed
bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
interval_set<uint64_t> allocated, released;
- volatile_statfs statfs_delta;
-
+ volatile_statfs statfs_delta; ///< overall store statistics delta
+ uint64_t osd_pool_id = META_POOL_ID; ///< osd pool id we're operating on
+
IOContext ioc;
bool had_ios = false; ///< true if we submitted IOs before our kv txn
uint64_t last_nid = 0; ///< if non-zero, highest new nid we allocated
uint64_t last_blobid = 0; ///< if non-zero, highest new blobid we allocated
- explicit TransContext(CephContext* cct, OpSequencer *o)
- : osr(o),
+ explicit TransContext(CephContext* cct, Collection *c, OpSequencer *o,
+ list<Context*> *on_commits)
+ : ch(c),
+ osr(o),
ioc(cct, this),
start(ceph_clock_now()) {
last_stamp = start;
+ if (on_commits) {
+ oncommits.swap(*on_commits);
+ }
}
~TransContext() {
delete deferred_txn;
boost::intrusive::list_member_hook<>,
&TransContext::deferred_queue_item> > deferred_queue_t;
- struct DeferredBatch : public AioContext {
+ struct DeferredBatch final : public AioContext {
OpSequencer *osr;
struct deferred_io {
bufferlist bl; ///< data
}
};
- class OpSequencer : public Sequencer_impl {
+ class OpSequencer : public RefCountedObject {
public:
- std::mutex qlock;
- std::condition_variable qcond;
+ ceph::mutex qlock = ceph::make_mutex("BlueStore::OpSequencer::qlock");
+ ceph::condition_variable qcond;
typedef boost::intrusive::list<
TransContext,
boost::intrusive::member_hook<
DeferredBatch *deferred_running = nullptr;
DeferredBatch *deferred_pending = nullptr;
- Sequencer *parent;
BlueStore *store;
+ coll_t cid;
uint64_t last_seq = 0;
std::atomic_int kv_submitted_waiters = {0};
- std::atomic_bool registered = {true}; ///< registered in BlueStore's osr_set
- std::atomic_bool zombie = {false}; ///< owning Sequencer has gone away
+ std::atomic_bool zombie = {false}; ///< in zombie_osr set (collection going away)
- OpSequencer(CephContext* cct, BlueStore *store)
- : Sequencer_impl(cct),
- parent(NULL), store(store) {
- store->register_osr(this);
+ OpSequencer(BlueStore *store, const coll_t& c)
+ : RefCountedObject(store->cct, 0),
+ store(store), cid(c) {
}
- ~OpSequencer() override {
- assert(q.empty());
- _unregister();
- }
-
- void discard() override {
- // Note that we may have txc's in flight when the parent Sequencer
- // goes away. Reflect this with zombie==registered==true and let
- // _osr_drain_all clean up later.
- assert(!zombie);
- zombie = true;
- parent = nullptr;
- bool empty;
- {
- std::lock_guard<std::mutex> l(qlock);
- empty = q.empty();
- }
- if (empty) {
- _unregister();
- }
- }
-
- void _unregister() {
- if (registered) {
- store->unregister_osr(this);
- registered = false;
- }
+ ~OpSequencer() {
+ ceph_assert(q.empty());
}
void queue_new(TransContext *txc) {
- std::lock_guard<std::mutex> l(qlock);
+ std::lock_guard l(qlock);
txc->seq = ++last_seq;
q.push_back(*txc);
}
void drain() {
- std::unique_lock<std::mutex> l(qlock);
+ std::unique_lock l(qlock);
while (!q.empty())
qcond.wait(l);
}
void drain_preceding(TransContext *txc) {
- std::unique_lock<std::mutex> l(qlock);
+ std::unique_lock l(qlock);
while (!q.empty() && &q.front() != txc)
qcond.wait(l);
}
bool _is_all_kv_submitted() {
- // caller must hold qlock
- if (q.empty()) {
- return true;
- }
+    // caller must hold qlock; q must not be empty
+ ceph_assert(!q.empty());
TransContext *txc = &q.back();
if (txc->state >= TransContext::STATE_KV_SUBMITTED) {
return true;
return false;
}
- void flush() override {
- std::unique_lock<std::mutex> l(qlock);
+ void flush() {
+ std::unique_lock l(qlock);
while (true) {
// set flag before the check because the condition
// may become true outside qlock, and we need to make
// sure those threads see waiters and signal qcond.
++kv_submitted_waiters;
- if (_is_all_kv_submitted()) {
+ if (q.empty() || _is_all_kv_submitted()) {
+ --kv_submitted_waiters;
return;
}
qcond.wait(l);
}
}
- bool flush_commit(Context *c) override {
- std::lock_guard<std::mutex> l(qlock);
+ void flush_all_but_last() {
+ std::unique_lock l(qlock);
+    ceph_assert(q.size() >= 1);
+ while (true) {
+ // set flag before the check because the condition
+ // may become true outside qlock, and we need to make
+ // sure those threads see waiters and signal qcond.
+ ++kv_submitted_waiters;
+ if (q.size() <= 1) {
+ --kv_submitted_waiters;
+ return;
+ } else {
+ auto it = q.rbegin();
+ it++;
+        if (it->state >= TransContext::STATE_KV_SUBMITTED) {
+          --kv_submitted_waiters;
+          return;
+        }
+ }
+ qcond.wait(l);
+ --kv_submitted_waiters;
+ }
+ }
+
+ bool flush_commit(Context *c) {
+ std::lock_guard l(qlock);
if (q.empty()) {
return true;
}
BlueFS *bluefs = nullptr;
unsigned bluefs_shared_bdev = 0; ///< which bluefs bdev we are sharing
bool bluefs_single_shared_device = true;
- utime_t bluefs_last_balance;
- utime_t next_dump_on_bluefs_balance_failure;
+ mono_time bluefs_last_balance;
+ utime_t next_dump_on_bluefs_alloc_failure;
KeyValueDB *db = nullptr;
BlockDevice *bdev = nullptr;
RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
+ map<coll_t,CollectionRef> new_coll_map;
vector<Cache*> cache_shards;
- std::mutex osr_lock; ///< protect osd_set
- std::set<OpSequencerRef> osr_set; ///< set of all OpSequencers
+ /// protect zombie_osr_set
+ ceph::mutex zombie_osr_lock = ceph::make_mutex("BlueStore::zombie_osr_lock");
+ std::map<coll_t,OpSequencerRef> zombie_osr_set; ///< set of OpSequencers for deleted collections
std::atomic<uint64_t> nid_last = {0};
std::atomic<uint64_t> nid_max = {0};
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
- std::mutex deferred_lock;
+ ceph::mutex deferred_lock = ceph::make_mutex("BlueStore::deferred_lock");
std::atomic<uint64_t> deferred_seq = {0};
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
- Finisher deferred_finisher;
-
- int m_finisher_num = 1;
- vector<Finisher*> finishers;
+ Finisher deferred_finisher, finisher;
KVSyncThread kv_sync_thread;
- std::mutex kv_lock;
- std::condition_variable kv_cond;
+ ceph::mutex kv_lock = ceph::make_mutex("BlueStore::kv_lock");
+ ceph::condition_variable kv_cond;
bool _kv_only = false;
bool kv_sync_started = false;
bool kv_stop = false;
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
- deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
KVFinalizeThread kv_finalize_thread;
- std::mutex kv_finalize_lock;
- std::condition_variable kv_finalize_cond;
+ ceph::mutex kv_finalize_lock = ceph::make_mutex("BlueStore::kv_finalize_lock");
+ ceph::condition_variable kv_finalize_cond;
deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
double cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
double cache_data_ratio = 0; ///< cache ratio dedicated to object data
bool cache_autotune = false; ///< cache autotune setting
- uint64_t cache_autotune_chunk_size = 0; ///< cache autotune chunk size
double cache_autotune_interval = 0; ///< time to wait between cache rebalancing
uint64_t osd_memory_target = 0; ///< OSD memory target when autotuning cache
uint64_t osd_memory_base = 0; ///< OSD base memory when autotuning cache
double osd_memory_expected_fragmentation = 0; ///< expected memory fragmentation
- uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cahce
+ uint64_t osd_memory_cache_min = 0; ///< Min memory to assign when autotuning cache
double osd_memory_cache_resize_interval = 0; ///< Time to wait between cache resizing
- std::mutex vstatfs_lock;
+
+ typedef map<uint64_t, volatile_statfs> osd_pools_map;
+
+ ceph::mutex vstatfs_lock = ceph::make_mutex("BlueStore::vstatfs_lock");
volatile_statfs vstatfs;
+ osd_pools_map osd_pools; // protected by vstatfs_lock as well
+
+ bool per_pool_stat_collection = true;
struct MempoolThread : public Thread {
public:
BlueStore *store;
- Cond cond;
- Mutex lock;
+ ceph::condition_variable cond;
+ ceph::mutex lock = ceph::make_mutex("BlueStore::MempoolThread::lock");
bool stop = false;
uint64_t autotune_cache_size = 0;
+ std::shared_ptr<PriorityCache::PriCache> binned_kv_cache = nullptr;
struct MempoolCache : public PriorityCache::PriCache {
BlueStore *store;
- int64_t cache_bytes[PriorityCache::Priority::LAST+1];
+ int64_t cache_bytes[PriorityCache::Priority::LAST+1] = {0};
+ int64_t committed_bytes = 0;
double cache_ratio = 0;
MempoolCache(BlueStore *s) : store(s) {};
virtual uint64_t _get_used_bytes() const = 0;
virtual int64_t request_cache_bytes(
- PriorityCache::Priority pri, uint64_t chunk_bytes) const {
+ PriorityCache::Priority pri, uint64_t total_cache) const {
int64_t assigned = get_cache_bytes(pri);
switch (pri) {
// All cache items are currently shoved into the LAST priority
case PriorityCache::Priority::LAST:
{
- uint64_t usage = _get_used_bytes();
- int64_t request = PriorityCache::get_chunk(usage, chunk_bytes);
+ int64_t request = _get_used_bytes();
return(request > assigned) ? request - assigned : 0;
}
default:
virtual void add_cache_bytes(PriorityCache::Priority pri, int64_t bytes) {
cache_bytes[pri] += bytes;
}
- virtual int64_t commit_cache_size() {
- return get_cache_bytes();
+ virtual int64_t commit_cache_size(uint64_t total_cache) {
+ committed_bytes = PriorityCache::get_chunk(
+ get_cache_bytes(), total_cache);
+ return committed_bytes;
+ }
+ virtual int64_t get_committed_size() const {
+ return committed_bytes;
}
virtual double get_cache_ratio() const {
return cache_ratio;
double get_bytes_per_onode() const {
return (double)_get_used_bytes() / (double)_get_num_onodes();
}
- } meta_cache;
+ };
+ std::shared_ptr<MetaCache> meta_cache;
struct DataCache : public MempoolCache {
DataCache(BlueStore *s) : MempoolCache(s) {};
virtual string get_cache_name() const {
return "BlueStore Data Cache";
}
- } data_cache;
+ };
+ std::shared_ptr<DataCache> data_cache;
public:
explicit MempoolThread(BlueStore *s)
: store(s),
- lock("BlueStore::MempoolThread::lock"),
- meta_cache(MetaCache(s)),
- data_cache(DataCache(s)) {}
+ meta_cache(new MetaCache(s)),
+ data_cache(new DataCache(s)) {}
void *entry() override;
void init() {
- assert(stop == false);
+ ceph_assert(stop == false);
create("bstore_mempool");
}
void shutdown() {
- lock.Lock();
+ lock.lock();
stop = true;
- cond.Signal();
- lock.Unlock();
+ cond.notify_all();
+ lock.unlock();
join();
}
void _adjust_cache_settings();
void _trim_shards(bool interval_stats);
void _tune_cache_size(bool interval_stats);
- void _balance_cache(const std::list<PriorityCache::PriCache *>& caches);
- void _balance_cache_pri(int64_t *mem_avail,
- const std::list<PriorityCache::PriCache *>& caches,
- PriorityCache::Priority pri);
+ void _balance_cache(
+ const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches);
+ void _balance_cache_pri(
+ int64_t *mem_avail,
+ const std::list<std::shared_ptr<PriorityCache::PriCache>>& caches,
+ PriorityCache::Priority pri);
} mempool_thread;
// --------------------------------------------------------
void _set_finisher_num();
int _open_bdev(bool create);
+  // Verifies that disk space is sufficient for the reserved area plus the
+  // minimum BlueFS size, and adjusts the latter if needed.
+  // Depends on min_alloc_size, hence should be called after min_alloc_size is
+  // initialized (and outside of _open_bdev).
+ void _validate_bdev();
void _close_bdev();
- int _open_db(bool create);
+
+ int _minimal_open_bluefs(bool create);
+ void _minimal_close_bluefs();
+ int _open_bluefs(bool create);
+ void _close_bluefs();
+
+ // Limited (u)mount intended for BlueFS operations only
+ int _mount_for_bluefs();
+ void _umount_for_bluefs();
+
+
+ int _is_bluefs(bool create, bool* ret);
+ /*
+   * opens both the DB and its dependent super_meta, FreelistManager and
+   * allocator, in the proper order
+ */
+ int _open_db_and_around(bool read_only);
+ void _close_db_and_around();
+
+  // updates legacy bluefs-related records in the DB to a state valid for
+  // downgrades from nautilus.
+ void _sync_bluefs_and_fm();
+
+ /*
+   * @warning to_repair_db means that we open this db to repair it and thus
+   * will not hold RocksDB's file lock.
+ */
+ int _open_db(bool create,
+ bool to_repair_db=false,
+ bool read_only = false);
void _close_db();
- int _open_fm(bool create);
+ int _open_fm(KeyValueDB::Transaction t);
void _close_fm();
int _open_alloc();
void _close_alloc();
int _open_super_meta();
void _open_statfs();
+ void _get_statfs_overall(struct store_statfs_t *buf);
+
+ void _dump_alloc_on_failure();
- void _dump_alloc_on_rebalance_failure();
- int _reconcile_bluefs_freespace();
- int _balance_bluefs_freespace(PExtentVector *extents);
- void _commit_bluefs_freespace(const PExtentVector& extents);
+ int64_t _get_bluefs_size_delta(uint64_t bluefs_free, uint64_t bluefs_total);
+ int _balance_bluefs_freespace();
CollectionRef _get_collection(const coll_t& cid);
void _queue_reap_collection(CollectionRef& c);
void _assign_nid(TransContext *txc, OnodeRef o);
uint64_t _assign_blobid(TransContext *txc);
- void _dump_onode(const OnodeRef& o, int log_level=30);
- void _dump_extent_map(ExtentMap& em, int log_level=30);
- void _dump_transaction(Transaction *t, int log_level = 30);
+ template <int LogLevelV>
+ friend void _dump_onode(CephContext *cct, const Onode& o);
+ template <int LogLevelV>
+ friend void _dump_extent_map(CephContext *cct, const ExtentMap& em);
+ template <int LogLevelV>
+ friend void _dump_transaction(CephContext *cct, Transaction *t);
- TransContext *_txc_create(OpSequencer *osr);
+ TransContext *_txc_create(Collection *c, OpSequencer *osr,
+ list<Context*> *on_commits);
void _txc_update_store_statfs(TransContext *txc);
void _txc_add_transaction(TransContext *txc, Transaction *t);
void _txc_calc_cost(TransContext *txc);
void _txc_finish(TransContext *txc);
void _txc_release_alloc(TransContext *txc);
+ void _osr_attach(Collection *c);
+ void _osr_register_zombie(OpSequencer *osr);
+ void _osr_drain(OpSequencer *osr);
void _osr_drain_preceding(TransContext *txc);
void _osr_drain_all();
- void _osr_unregister_all();
void _kv_start();
void _kv_stop();
private:
int _fsck_check_extents(
+ const coll_t& cid,
const ghobject_t& oid,
const PExtentVector& extents,
bool compressed,
mempool_dynamic_bitset &used_blocks,
uint64_t granularity,
+ BlueStoreRepairer* repairer,
store_statfs_t& expected_statfs);
+ using per_pool_statfs =
+ mempool::bluestore_fsck::map<uint64_t, store_statfs_t>;
+ void _fsck_check_pool_statfs(
+ per_pool_statfs& expected_pool_statfs,
+ bool need_per_pool_stats,
+ int& errors,
+ BlueStoreRepairer* repairer);
+
void _buffer_cache_write(
TransContext *txc,
BlobRef b,
uint64_t tail_pad,
bufferlist& padded);
+ void _record_onode(OnodeRef &o, KeyValueDB::Transaction &txn);
+
// -- ondisk version ---
public:
const int32_t latest_ondisk_format = 2; ///< our version
int32_t ondisk_format = 0; ///< value detected on mount
int _upgrade_super(); ///< upgrade (called during open_super)
+ uint64_t _get_ondisk_reserved() const;
void _prepare_ondisk_format_super(KeyValueDB::Transaction& t);
// --- public interface ---
bool wants_journal() override { return false; };
bool allows_journal() override { return false; };
+ int get_devices(set<string> *ls) override;
+
bool is_rotational() override;
bool is_journal_rotational() override;
return device_class;
}
+ int get_numa_node(
+ int *numa_node,
+ set<int> *nodes,
+ set<string> *failed) override;
+
static int get_block_device_fsid(CephContext* cct, const string& path,
uuid_d *fsid);
bool test_mount_in_use() override;
private:
- int _mount(bool kv_only);
+ int _mount(bool kv_only, bool open_db=true);
public:
int mount() override {
return _mount(false);
}
int umount() override;
- int start_kv_only(KeyValueDB **pdb) {
- int r = _mount(true);
+ int start_kv_only(KeyValueDB **pdb, bool open_db=true) {
+ int r = _mount(true, open_db);
if (r < 0)
return r;
*pdb = db;
int _fsck(bool deep, bool repair);
void set_cache_shards(unsigned num) override;
+ void dump_cache_stats(Formatter *f) override {
+    uint64_t onode_count = 0, buffers_bytes = 0;
+ for (auto i: cache_shards) {
+ onode_count += i->_get_num_onodes();
+ buffers_bytes += i->_get_buffer_bytes();
+ }
+ f->dump_int("bluestore_onode", onode_count);
+ f->dump_int("bluestore_buffers", buffers_bytes);
+ }
+ void dump_cache_stats(ostream& ss) override {
+    uint64_t onode_count = 0, buffers_bytes = 0;
+    for (auto i: cache_shards) {
+      onode_count += i->_get_num_onodes();
+      buffers_bytes += i->_get_buffer_bytes();
+    }
+    ss << "bluestore_onode: " << onode_count
+       << ", bluestore_buffers: " << buffers_bytes;
+ }
int validate_hobject_key(const hobject_t &obj) const override {
return 0;
void get_db_statistics(Formatter *f) override;
void generate_db_histogram(Formatter *f) override;
void _flush_cache();
- void flush_cache() override;
+ int flush_cache(ostream *os = NULL) override;
void dump_perf_counters(Formatter *f) override {
f->open_object_section("perf_counters");
logger->dump_formatted(f, false);
f->close_section();
}
- void register_osr(OpSequencer *osr) {
- std::lock_guard<std::mutex> l(osr_lock);
- osr_set.insert(osr);
- }
- void unregister_osr(OpSequencer *osr) {
- std::lock_guard<std::mutex> l(osr_lock);
- osr_set.erase(osr);
- }
+ int add_new_bluefs_device(int id, const string& path);
+ int migrate_to_existing_bluefs_device(const set<int>& devs_source,
+ int id);
+ int migrate_to_new_bluefs_device(const set<int>& devs_source,
+ int id,
+ const string& path);
+ int expand_devices(ostream& out);
+ string get_device_path(unsigned id);
public:
- int statfs(struct store_statfs_t *buf) override;
+ int statfs(struct store_statfs_t *buf,
+ osd_alert_list_t* alerts = nullptr) override;
+ int pool_statfs(uint64_t pool_id, struct store_statfs_t *buf) override;
void collect_metadata(map<string,string> *pm) override;
- bool exists(const coll_t& cid, const ghobject_t& oid) override;
bool exists(CollectionHandle &c, const ghobject_t& oid) override;
int set_collection_opts(
- const coll_t& cid,
+ CollectionHandle& c,
const pool_opts_t& opts) override;
- int stat(
- const coll_t& cid,
- const ghobject_t& oid,
- struct stat *st,
- bool allow_eio = false) override;
int stat(
CollectionHandle &c,
const ghobject_t& oid,
struct stat *st,
bool allow_eio = false) override;
- int read(
- const coll_t& cid,
- const ghobject_t& oid,
- uint64_t offset,
- size_t len,
- bufferlist& bl,
- uint32_t op_flags = 0) override;
int read(
CollectionHandle &c,
const ghobject_t& oid,
int _fiemap(CollectionHandle &c_, const ghobject_t& oid,
uint64_t offset, size_t len, interval_set<uint64_t>& destset);
public:
- int fiemap(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, bufferlist& bl) override;
int fiemap(CollectionHandle &c, const ghobject_t& oid,
uint64_t offset, size_t len, bufferlist& bl) override;
- int fiemap(const coll_t& cid, const ghobject_t& oid,
- uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
int fiemap(CollectionHandle &c, const ghobject_t& oid,
uint64_t offset, size_t len, map<uint64_t, uint64_t>& destmap) override;
- int getattr(const coll_t& cid, const ghobject_t& oid, const char *name,
- bufferptr& value) override;
int getattr(CollectionHandle &c, const ghobject_t& oid, const char *name,
bufferptr& value) override;
- int getattrs(const coll_t& cid, const ghobject_t& oid,
- map<string,bufferptr>& aset) override;
int getattrs(CollectionHandle &c, const ghobject_t& oid,
map<string,bufferptr>& aset) override;
int list_collections(vector<coll_t>& ls) override;
CollectionHandle open_collection(const coll_t &c) override;
+ CollectionHandle create_new_collection(const coll_t& cid) override;
+ void set_collection_commit_queue(const coll_t& cid,
+ ContextQueue *commit_queue) override;
bool collection_exists(const coll_t& c) override;
- int collection_empty(const coll_t& c, bool *empty) override;
- int collection_bits(const coll_t& c) override;
+ int collection_empty(CollectionHandle& c, bool *empty) override;
+ int collection_bits(CollectionHandle& c) override;
- int collection_list(const coll_t& cid,
- const ghobject_t& start,
- const ghobject_t& end,
- int max,
- vector<ghobject_t> *ls, ghobject_t *next) override;
int collection_list(CollectionHandle &c,
const ghobject_t& start,
const ghobject_t& end,
int max,
vector<ghobject_t> *ls, ghobject_t *next) override;
- int omap_get(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- map<string, bufferlist> *out /// < [out] Key to value map
- ) override;
int omap_get(
CollectionHandle &c, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
) override;
/// Get omap header
- int omap_get_header(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- bufferlist *header, ///< [out] omap header
- bool allow_eio = false ///< [in] don't assert on eio
- ) override;
int omap_get_header(
CollectionHandle &c, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
) override;
/// Get keys defined on oid
- int omap_get_keys(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- set<string> *keys ///< [out] Keys defined on oid
- ) override;
int omap_get_keys(
CollectionHandle &c, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
) override;
/// Get key values
- int omap_get_values(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to get
- map<string, bufferlist> *out ///< [out] Returned keys and values
- ) override;
int omap_get_values(
CollectionHandle &c, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
) override;
/// Filters keys into out which are defined on oid
- int omap_check_keys(
- const coll_t& cid, ///< [in] Collection containing oid
- const ghobject_t &oid, ///< [in] Object containing omap
- const set<string> &keys, ///< [in] Keys to check
- set<string> *out ///< [out] Subset of keys defined on oid
- ) override;
int omap_check_keys(
CollectionHandle &c, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
set<string> *out ///< [out] Subset of keys defined on oid
) override;
- ObjectMap::ObjectMapIterator get_omap_iterator(
- const coll_t& cid, ///< [in] collection
- const ghobject_t &oid ///< [in] object
- ) override;
ObjectMap::ObjectMapIterator get_omap_iterator(
CollectionHandle &c, ///< [in] collection
const ghobject_t &oid ///< [in] object
}
struct BSPerfTracker {
- PerfCounters::avg_tracker<uint64_t> os_commit_latency;
- PerfCounters::avg_tracker<uint64_t> os_apply_latency;
+ PerfCounters::avg_tracker<uint64_t> os_commit_latency_ns;
+ PerfCounters::avg_tracker<uint64_t> os_apply_latency_ns;
objectstore_perf_stat_t get_cur_stats() const {
objectstore_perf_stat_t ret;
- ret.os_commit_latency = os_commit_latency.current_avg();
- ret.os_apply_latency = os_apply_latency.current_avg();
+ ret.os_commit_latency_ns = os_commit_latency_ns.current_avg();
+ ret.os_apply_latency_ns = os_apply_latency_ns.current_avg();
return ret;
}
}
int queue_transactions(
- Sequencer *osr,
+ CollectionHandle& ch,
vector<Transaction>& tls,
TrackedOpRef op = TrackedOpRef(),
ThreadPool::TPHandle *handle = NULL) override;
RWLock::WLocker l(debug_read_error_lock);
debug_mdata_error_objects.insert(o);
}
+
+ /// methods to inject various errors fsck can repair
+ void inject_broken_shared_blob_key(const string& key,
+ const bufferlist& bl);
+ void inject_leaked(uint64_t len);
+ void inject_false_free(coll_t cid, ghobject_t oid);
+ void inject_statfs(const string& key, const store_statfs_t& new_statfs);
+ void inject_misreference(coll_t cid1, ghobject_t oid1,
+ coll_t cid2, ghobject_t oid2,
+ uint64_t offset);
+
void compact() override {
- assert(db);
+ ceph_assert(db);
db->compact();
}
bool has_builtin_csum() const override {
return true;
}
+ /*
+  Allocate space for BlueFS from the slow device.
+  Either automatically applies the allocated extents to the underlying BlueFS
+  (extents == nullptr) or just returns them when a non-null extents vector is
+  provided.
+ */
+ int allocate_bluefs_freespace(
+ uint64_t min_size,
+ uint64_t size,
+ PExtentVector* extents);
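+  // A minimal caller-side sketch (hypothetical snippet, not part of this
+  // interface contract); the two modes differ only in who consumes the extents:
+  //
+  //   // let BlueStore hand the space straight to BlueFS:
+  //   int r = store->allocate_bluefs_freespace(min_size, size, nullptr);
+  //
+  //   // or receive the extents instead of applying them:
+  //   PExtentVector extents;
+  //   int r = store->allocate_bluefs_freespace(min_size, size, &extents);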
+
+ inline void log_latency(const char* name,
+ int idx,
+ const ceph::timespan& lat,
+ double lat_threshold,
+ const char* info = "") const;
+
+ inline void log_latency_fn(const char* name,
+ int idx,
+ const ceph::timespan& lat,
+ double lat_threshold,
+ std::function<string (const ceph::timespan& lat)> fn) const;
+
private:
bool _debug_data_eio(const ghobject_t& o) {
if (!cct->_conf->bluestore_debug_inject_read_err) {
debug_mdata_error_objects.erase(o);
}
}
+private:
+ ceph::mutex qlock = ceph::make_mutex("BlueStore::Alerts::qlock");
+ string failed_cmode;
+ set<string> failed_compressors;
+ string spillover_alert;
+ string legacy_statfs_alert;
+ string disk_size_mismatch_alert;
+
+ void _log_alerts(osd_alert_list_t& alerts);
+ bool _set_compression_alert(bool cmode, const char* s) {
+ std::lock_guard l(qlock);
+ if (cmode) {
+ bool ret = failed_cmode.empty();
+ failed_cmode = s;
+ return ret;
+ }
+ return failed_compressors.emplace(s).second;
+ }
+ void _clear_compression_alert() {
+ std::lock_guard l(qlock);
+ failed_compressors.clear();
+ failed_cmode.clear();
+ }
+
+ void _set_spillover_alert(const string& s) {
+ std::lock_guard l(qlock);
+ spillover_alert = s;
+ }
+ void _clear_spillover_alert() {
+ std::lock_guard l(qlock);
+ spillover_alert.clear();
+ }
+
+ void _check_legacy_statfs_alert();
+ void _set_disk_size_mismatch_alert(const string& s) {
+ std::lock_guard l(qlock);
+ disk_size_mismatch_alert = s;
+ }
private:
WriteContext *wctx,
set<SharedBlob*> *maybe_unshared_blobs=0);
- int _do_transaction(Transaction *t,
- TransContext *txc,
- ThreadPool::TPHandle *handle);
-
int _write(TransContext *txc,
CollectionRef& c,
OnodeRef& o,
int _rmattrs(TransContext *txc,
CollectionRef& c,
OnodeRef& o);
- void _do_omap_clear(TransContext *txc, uint64_t id);
+ void _do_omap_clear(TransContext *txc, const string& prefix, uint64_t id);
int _omap_clear(TransContext *txc,
CollectionRef& c,
OnodeRef& o);
unsigned bits, CollectionRef *c);
int _remove_collection(TransContext *txc, const coll_t &cid,
CollectionRef *c);
+ void _do_remove_collection(TransContext *txc, CollectionRef *c);
int _split_collection(TransContext *txc,
CollectionRef& c,
CollectionRef& d,
unsigned bits, int rem);
+ int _merge_collection(TransContext *txc,
+ CollectionRef *c,
+ CollectionRef& d,
+ unsigned bits);
+
+private:
+ std::atomic<uint64_t> out_of_sync_fm = {0};
+ // --------------------------------------------------------
+ // BlueFSDeviceExpander implementation
+ uint64_t get_recommended_expansion_delta(uint64_t bluefs_free,
+ uint64_t bluefs_total) override {
+ auto delta = _get_bluefs_size_delta(bluefs_free, bluefs_total);
+ return delta > 0 ? delta : 0;
+ }
+ int allocate_freespace(
+ uint64_t min_size,
+ uint64_t size,
+ PExtentVector& extents) override {
+ return allocate_bluefs_freespace(min_size, size, &extents);
+ };
};
-inline ostream& operator<<(ostream& out, const BlueStore::OpSequencer& s) {
- return out << *s.parent;
+inline ostream& operator<<(ostream& out, const BlueStore::volatile_statfs& s) {
+ return out
+ << " allocated:"
+ << s.values[BlueStore::volatile_statfs::STATFS_ALLOCATED]
+ << " stored:"
+ << s.values[BlueStore::volatile_statfs::STATFS_STORED]
+ << " compressed:"
+ << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED]
+ << " compressed_orig:"
+ << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ORIGINAL]
+ << " compressed_alloc:"
+ << s.values[BlueStore::volatile_statfs::STATFS_COMPRESSED_ALLOCATED];
}
static inline void intrusive_ptr_add_ref(BlueStore::Onode *o) {
o->put();
}
+class BlueStoreRepairer
+{
+public:
+ // to simplify future potential migration to mempools
+ using fsck_interval = interval_set<uint64_t>;
+
+  // Structure to track which pextents are used by a specific cid/oid.
+  // As with a Bloom filter, only positive and false-positive matches are
+  // possible.
+  // Maintains two lists of bloom filters, one for cids and one for oids,
+  // where each list entry is a BF covering a specific disk pextent range.
+  // The extent length covered per filter is computed on init.
+  // Allows filtering out 'uninteresting' pextents to speed up subsequent
+  // 'is_used' checks. (A minimal usage sketch follows the struct definition.)
+ struct StoreSpaceTracker {
+ const uint64_t BLOOM_FILTER_SALT_COUNT = 2;
+ const uint64_t BLOOM_FILTER_TABLE_SIZE = 32; // bytes per single filter
+    const uint64_t BLOOM_FILTER_EXPECTED_COUNT = 16; // arbitrarily selected
+ static const uint64_t DEF_MEM_CAP = 128 * 1024 * 1024;
+
+ typedef mempool::bluestore_fsck::vector<bloom_filter> bloom_vector;
+ bloom_vector collections_bfs;
+ bloom_vector objects_bfs;
+
+ bool was_filtered_out = false;
+ uint64_t granularity = 0; // extent length for a single filter
+
+ StoreSpaceTracker() {
+ }
+ StoreSpaceTracker(const StoreSpaceTracker& from) :
+ collections_bfs(from.collections_bfs),
+ objects_bfs(from.objects_bfs),
+ granularity(from.granularity) {
+ }
+
+ void init(uint64_t total,
+ uint64_t min_alloc_size,
+ uint64_t mem_cap = DEF_MEM_CAP) {
+ ceph_assert(!granularity); // not initialized yet
+ ceph_assert(min_alloc_size && isp2(min_alloc_size));
+ ceph_assert(mem_cap);
+
+ total = round_up_to(total, min_alloc_size);
+ granularity = total * BLOOM_FILTER_TABLE_SIZE * 2 / mem_cap;
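+      // Worked example (illustrative numbers, not a requirement): with
+      // total = 1 TiB and mem_cap = 128 MiB this yields
+      // 2^40 * 32 * 2 / 2^27 = 512 KiB per filter pair before rounding,
+      // i.e. one collection BF and one object BF per 512 KiB of disk.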
+
+ if (!granularity) {
+ granularity = min_alloc_size;
+ } else {
+ granularity = round_up_to(granularity, min_alloc_size);
+ }
+
+ uint64_t entries = round_up_to(total, granularity) / granularity;
+ collections_bfs.resize(entries,
+ bloom_filter(BLOOM_FILTER_SALT_COUNT,
+ BLOOM_FILTER_TABLE_SIZE,
+ 0,
+ BLOOM_FILTER_EXPECTED_COUNT));
+ objects_bfs.resize(entries,
+ bloom_filter(BLOOM_FILTER_SALT_COUNT,
+ BLOOM_FILTER_TABLE_SIZE,
+ 0,
+ BLOOM_FILTER_EXPECTED_COUNT));
+ }
+ inline uint32_t get_hash(const coll_t& cid) const {
+ return cid.hash_to_shard(1);
+ }
+ inline void set_used(uint64_t offset, uint64_t len,
+ const coll_t& cid, const ghobject_t& oid) {
+ ceph_assert(granularity); // initialized
+
+ // can't call this func after filter_out has been applied
+ ceph_assert(!was_filtered_out);
+ if (!len) {
+ return;
+ }
+ auto pos = offset / granularity;
+ auto end_pos = (offset + len - 1) / granularity;
+ while (pos <= end_pos) {
+ collections_bfs[pos].insert(get_hash(cid));
+ objects_bfs[pos].insert(oid.hobj.get_hash());
+ ++pos;
+ }
+ }
+    // Filter out entries unrelated to the specified (broken) extents.
+    // Only 'is_used' calls are permitted after that.
+ size_t filter_out(const fsck_interval& extents);
+
+    // determines if the collection is present after filtering out
+ inline bool is_used(const coll_t& cid) const {
+ ceph_assert(was_filtered_out);
+ for(auto& bf : collections_bfs) {
+ if (bf.contains(get_hash(cid))) {
+ return true;
+ }
+ }
+ return false;
+ }
+    // determines if the object is present after filtering out
+ inline bool is_used(const ghobject_t& oid) const {
+ ceph_assert(was_filtered_out);
+ for(auto& bf : objects_bfs) {
+ if (bf.contains(oid.hobj.get_hash())) {
+ return true;
+ }
+ }
+ return false;
+ }
+    // determines if the collection is present before filtering out
+ inline bool is_used(const coll_t& cid, uint64_t offs) const {
+ ceph_assert(granularity); // initialized
+ ceph_assert(!was_filtered_out);
+ auto &bf = collections_bfs[offs / granularity];
+ if (bf.contains(get_hash(cid))) {
+ return true;
+ }
+ return false;
+ }
+    // determines if the object is present before filtering out
+ inline bool is_used(const ghobject_t& oid, uint64_t offs) const {
+ ceph_assert(granularity); // initialized
+ ceph_assert(!was_filtered_out);
+ auto &bf = objects_bfs[offs / granularity];
+ if (bf.contains(oid.hobj.get_hash())) {
+ return true;
+ }
+ return false;
+ }
+ };
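+  // A minimal usage sketch (hypothetical fsck-time flow; the variable names
+  // below are illustrative, not taken from BlueStore::_fsck):
+  //
+  //   StoreSpaceTracker tracker;
+  //   tracker.init(bdev_size, min_alloc_size);
+  //   // while scanning onodes, record every allocated pextent:
+  //   tracker.set_used(offset, length, cid, oid);
+  //   // once the damaged extents are known:
+  //   tracker.filter_out(broken_extents);
+  //   bool suspect = tracker.is_used(cid) || tracker.is_used(oid);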
+public:
+
+ bool remove_key(KeyValueDB *db, const string& prefix, const string& key);
+ bool fix_shared_blob(KeyValueDB *db,
+ uint64_t sbid,
+ const bufferlist* bl);
+ bool fix_statfs(KeyValueDB *db, const string& key,
+ const store_statfs_t& new_statfs);
+
+ bool fix_leaked(KeyValueDB *db,
+ FreelistManager* fm,
+ uint64_t offset, uint64_t len);
+ bool fix_false_free(KeyValueDB *db,
+ FreelistManager* fm,
+ uint64_t offset, uint64_t len);
+ bool fix_bluefs_extents(std::atomic<uint64_t>& out_of_sync_flag);
+
+ void init(uint64_t total_space, uint64_t lres_tracking_unit_size);
+
+ bool preprocess_misreference(KeyValueDB *db);
+
+ unsigned apply(KeyValueDB* db);
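+  // Rough intended flow (a sketch inferred from this interface, not a
+  // prescribed call sequence; the tracking unit passed to init() is assumed
+  // to be min_alloc_size):
+  //
+  //   BlueStoreRepairer repairer;
+  //   repairer.init(bdev_size, min_alloc_size);
+  //   // during fsck: queue fixes via fix_leaked()/fix_false_free()/
+  //   // fix_statfs()/note_misreference() etc.
+  //   unsigned fixed = repairer.apply(db);  // submit the accumulated txns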
+
+ void note_misreference(uint64_t offs, uint64_t len, bool inc_error) {
+ misreferenced_extents.union_insert(offs, len);
+ if (inc_error) {
+ ++to_repair_cnt;
+ }
+ }
+
+ StoreSpaceTracker& get_space_usage_tracker() {
+ return space_usage_tracker;
+ }
+ const fsck_interval& get_misreferences() const {
+ return misreferenced_extents;
+ }
+ KeyValueDB::Transaction get_fix_misreferences_txn() {
+ return fix_misreferences_txn;
+ }
+
+private:
+ unsigned to_repair_cnt = 0;
+ KeyValueDB::Transaction fix_fm_leaked_txn;
+ KeyValueDB::Transaction fix_fm_false_free_txn;
+ KeyValueDB::Transaction remove_key_txn;
+ KeyValueDB::Transaction fix_statfs_txn;
+ KeyValueDB::Transaction fix_shared_blob_txn;
+
+ KeyValueDB::Transaction fix_misreferences_txn;
+
+ StoreSpaceTracker space_usage_tracker;
+
+ // non-shared extents with multiple references
+ fsck_interval misreferenced_extents;
+
+};
#endif