//#define DEBUG_CACHE
//#define DEBUG_DEFERRED
+
+
+// constant for Buffer::maybe_rebuild()
+#define MAX_BUFFER_SLOP_RATIO_DEN 8 // i.e. allowed slop is 1/8 of the data length
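+// e.g. with 4 KiB of buffered data, a rebuild is triggered once the front
+// buffer wastes more than 512 bytes (see maybe_rebuild() below)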
+
+
enum {
l_bluestore_first = 732430,
l_bluestore_kv_flush_lat,
void _set_csum();
void _set_compression();
void _set_throttle_params();
+ int _set_cache_sizes();
class TransContext;
}
length = newlen;
}
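+ /// rebuild the data bufferlist if it is fragmented (more than one buffer)
+ /// or its front buffer wastes more than 1/MAX_BUFFER_SLOP_RATIO_DEN of the length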
+ void maybe_rebuild() {
+ if (data.length() &&
+ (data.get_num_buffers() > 1 ||
+ data.front().wasted() > data.length() / MAX_BUFFER_SLOP_RATIO_DEN)) {
+ data.rebuild();
+ }
+ }
void dump(Formatter *f) const {
f->dump_string("state", get_state_name(state));
boost::intrusive::list_member_hook<>,
&Buffer::state_item> > state_list_t;
- mempool::bluestore_meta_other::map<uint32_t, std::unique_ptr<Buffer>>
+ mempool::bluestore_cache_other::map<uint32_t, std::unique_ptr<Buffer>>
buffer_map;
// we use a bare intrusive list here instead of std::map because
cache->_audit("_add_buffer start");
buffer_map[b->offset].reset(b);
if (b->is_writing()) {
- writing.push_back(*b);
+ b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
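+ // keep the writing list sorted by seq: append when b is the newest,
+ // otherwise insert in order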
+ if (writing.empty() || writing.rbegin()->seq <= b->seq) {
+ writing.push_back(*b);
+ } else {
+ auto it = writing.begin();
+ while (it->seq < b->seq) {
+ ++it;
+ }
+
+ assert(it->seq >= b->seq);
+ // note that this will insert b before it
+ // hence the order is maintained
+ writing.insert(it, *b);
+ }
} else {
+ b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
cache->_add_buffer(b, level, near);
}
cache->_audit("_add_buffer end");
void _rm_buffer(Cache* cache, Buffer *b) {
_rm_buffer(cache, buffer_map.find(b->offset));
}
- void _rm_buffer(Cache* cache, map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
+ void _rm_buffer(Cache* cache,
+ map<uint32_t, std::unique_ptr<Buffer>>::iterator p) {
assert(p != buffer_map.end());
cache->_audit("_rm_buffer start");
if (p->second->is_writing()) {
/// put logical references, and get back any released extents
void put_ref(uint64_t offset, uint32_t length,
- PExtentVector *r);
+ PExtentVector *r, set<SharedBlob*> *maybe_unshared_blobs);
friend bool operator==(const SharedBlob &l, const SharedBlob &r) {
return l.get_sbid() == r.get_sbid();
// we use a bare pointer because we don't want to affect the ref
// count
- mempool::bluestore_meta_other::unordered_map<uint64_t,SharedBlob*> sb_map;
+ mempool::bluestore_cache_other::unordered_map<uint64_t,SharedBlob*> sb_map;
SharedBlobRef lookup(uint64_t sbid) {
std::lock_guard<std::mutex> l(lock);
get_blob().can_split_at(blob_offset);
}
- bool try_reuse_blob(uint32_t min_alloc_size,
+ bool can_reuse_blob(uint32_t min_alloc_size,
uint32_t target_blob_size,
uint32_t b_offset,
uint32_t *length0);
#endif
}
- const bluestore_blob_t& get_blob() const {
+ inline const bluestore_blob_t& get_blob() const {
return blob;
}
- bluestore_blob_t& dirty_blob() {
+ inline bluestore_blob_t& dirty_blob() {
#ifdef CACHE_BLOB_BL
blob_bl.clear();
#endif
#endif
};
typedef boost::intrusive_ptr<Blob> BlobRef;
- typedef mempool::bluestore_meta_other::map<int,BlobRef> blob_map_t;
+ typedef mempool::bluestore_cache_other::map<int,BlobRef> blob_map_t;
/// a logical extent, pointing to (some portion of) a blob
typedef boost::intrusive::set_base_hook<boost::intrusive::optimize_size<true> > ExtentBase; // making an alias to avoid build warnings
bool loaded = false; ///< true if shard is loaded
bool dirty = false; ///< true if shard is dirty and needs reencoding
};
- mempool::bluestore_meta_other::vector<Shard> shards; ///< shards
+ mempool::bluestore_cache_other::vector<Shard> shards; ///< shards
bufferlist inline_bl; ///< cached encoded map, if unsharded; empty=>dirty
}
void update(KeyValueDB::Transaction t, bool force);
+ decltype(BlueStore::Blob::id) allocate_spanning_blob_id();
void reshard(
KeyValueDB *db,
KeyValueDB::Transaction t);
uint32_t offset, uint32_t length);
/// ensure a range of the map is marked dirty
- void dirty_range(KeyValueDB::Transaction t,
- uint32_t offset, uint32_t length);
+ void dirty_range(uint32_t offset, uint32_t length);
+ /// for seek_lextent test
extent_map_t::iterator find(uint64_t offset);
- /// find a lextent that includes offset
- extent_map_t::iterator find_lextent(uint64_t offset);
-
/// seek to the first lextent including or after offset
extent_map_t::iterator seek_lextent(uint64_t offset);
extent_map_t::const_iterator seek_lextent(uint64_t offset) const;
ghobject_t oid;
/// key under PREFIX_OBJ where we are stored
- mempool::bluestore_meta_other::string key;
+ mempool::bluestore_cache_other::string key;
boost::intrusive::list_member_hook<> lru_item;
std::condition_variable flush_cond; ///< wait here for uncommitted txns
Onode(Collection *c, const ghobject_t& o,
- const mempool::bluestore_meta_other::string& k)
+ const mempool::bluestore_cache_other::string& k)
: nref(0),
c(c),
oid(o),
std::atomic<uint64_t> num_extents = {0};
std::atomic<uint64_t> num_blobs = {0};
- size_t last_trim_seq = 0;
-
static Cache *create(CephContext* cct, string type, PerfCounters *logger);
Cache(CephContext* cct) : cct(cct), logger(nullptr) {}
--num_blobs;
}
- void trim(uint64_t target_bytes, float target_meta_ratio,
+ void trim(uint64_t target_bytes,
+ float target_meta_ratio,
+ float target_data_ratio,
float bytes_per_onode);
void trim_all();
uint64_t *buffers,
uint64_t *bytes) = 0;
+ bool empty() {
+ std::lock_guard<std::recursive_mutex> l(lock);
+ return _get_num_onodes() == 0 && _get_buffer_bytes() == 0;
+ }
+
#ifdef DEBUG_CACHE
virtual void _audit(const char *s) = 0;
#else
Cache *cache;
/// forward lookups
- mempool::bluestore_meta_other::unordered_map<ghobject_t,OnodeRef> onode_map;
+ mempool::bluestore_cache_other::unordered_map<ghobject_t,OnodeRef> onode_map;
friend class Collection; // for split_cache()
}
void rename(OnodeRef& o, const ghobject_t& old_oid,
const ghobject_t& new_oid,
- const mempool::bluestore_meta_other::string& new_okey);
+ const mempool::bluestore_cache_other::string& new_okey);
void clear();
bool empty();
void open_shared_blob(uint64_t sbid, BlobRef b);
void load_shared_blob(SharedBlobRef sb);
void make_blob_shared(uint64_t sbid, BlobRef b);
+ uint64_t make_blob_unshared(SharedBlob *sb);
BlobRef new_blob() {
BlobRef b = new Blob();
}
void split_cache(Collection *dest);
- void trim_cache();
Collection(BlueStore *ns, Cache *ca, coll_t c);
};
class OpSequencer;
typedef boost::intrusive_ptr<OpSequencer> OpSequencerRef;
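+ /// in-memory statfs counters; used both for per-transaction deltas
+ /// (TransContext::statfs_delta) and the store-wide totals (vstatfs)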
+ struct volatile_statfs{
+ enum {
+ STATFS_ALLOCATED = 0,
+ STATFS_STORED,
+ STATFS_COMPRESSED_ORIGINAL,
+ STATFS_COMPRESSED,
+ STATFS_COMPRESSED_ALLOCATED,
+ STATFS_LAST
+ };
+ int64_t values[STATFS_LAST];
+ volatile_statfs() {
+ memset(this, 0, sizeof(volatile_statfs));
+ }
+ void reset() {
+ *this = volatile_statfs();
+ }
+ volatile_statfs& operator+=(const volatile_statfs& other) {
+ for (size_t i = 0; i < STATFS_LAST; ++i) {
+ values[i] += other.values[i];
+ }
+ return *this;
+ }
+ int64_t& allocated() {
+ return values[STATFS_ALLOCATED];
+ }
+ int64_t& stored() {
+ return values[STATFS_STORED];
+ }
+ int64_t& compressed_original() {
+ return values[STATFS_COMPRESSED_ORIGINAL];
+ }
+ int64_t& compressed() {
+ return values[STATFS_COMPRESSED];
+ }
+ int64_t& compressed_allocated() {
+ return values[STATFS_COMPRESSED_ALLOCATED];
+ }
+ bool is_empty() {
+ return values[STATFS_ALLOCATED] == 0 &&
+ values[STATFS_STORED] == 0 &&
+ values[STATFS_COMPRESSED] == 0 &&
+ values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
+ values[STATFS_COMPRESSED_ALLOCATED] == 0;
+ }
+ void decode(bufferlist::iterator& it) {
+ for (size_t i = 0; i < STATFS_LAST; i++) {
+ ::decode(values[i], it);
+ }
+ }
+
+ void encode(bufferlist& bl) {
+ for (size_t i = 0; i < STATFS_LAST; i++) {
+ ::encode(values[i], bl);
+ }
+ }
+ };
+
struct TransContext : public AioContext {
+ MEMPOOL_CLASS_HELPERS();
+
typedef enum {
STATE_PREPARE,
STATE_AIO_WAIT,
bluestore_deferred_transaction_t *deferred_txn = nullptr; ///< if any
interval_set<uint64_t> allocated, released;
- struct volatile_statfs{
- enum {
- STATFS_ALLOCATED = 0,
- STATFS_STORED,
- STATFS_COMPRESSED_ORIGINAL,
- STATFS_COMPRESSED,
- STATFS_COMPRESSED_ALLOCATED,
- STATFS_LAST
- };
- int64_t values[STATFS_LAST];
- volatile_statfs() {
- memset(this, 0, sizeof(volatile_statfs));
- }
- void reset() {
- *this = volatile_statfs();
- }
- int64_t& allocated() {
- return values[STATFS_ALLOCATED];
- }
- int64_t& stored() {
- return values[STATFS_STORED];
- }
- int64_t& compressed_original() {
- return values[STATFS_COMPRESSED_ORIGINAL];
- }
- int64_t& compressed() {
- return values[STATFS_COMPRESSED];
- }
- int64_t& compressed_allocated() {
- return values[STATFS_COMPRESSED_ALLOCATED];
- }
- bool is_empty() {
- return values[STATFS_ALLOCATED] == 0 &&
- values[STATFS_STORED] == 0 &&
- values[STATFS_COMPRESSED] == 0 &&
- values[STATFS_COMPRESSED_ORIGINAL] == 0 &&
- values[STATFS_COMPRESSED_ALLOCATED] == 0;
- }
- void decode(bufferlist::iterator& it) {
- for (size_t i = 0; i < STATFS_LAST; i++) {
- ::decode(values[i], it);
- }
- }
-
- void encode(bufferlist& bl) {
- for (size_t i = 0; i < STATFS_LAST; i++) {
- ::encode(values[i], bl);
- }
- }
- } statfs_delta;
-
+ volatile_statfs statfs_delta;
IOContext ioc;
bool had_ios = false; ///< true if we submitted IOs before our kv txn
- CollectionRef first_collection; ///< first referenced collection
-
uint64_t seq = 0;
utime_t start;
utime_t last_stamp;
void write_shared_blob(SharedBlobRef &sb) {
shared_blobs.insert(sb);
}
+ void unshare_blob(SharedBlob *sb) {
+ shared_blobs.erase(sb);
+ }
+
/// note we logically modified object (when onode itself is unmodified)
void note_modified_object(OnodeRef &o) {
// onode itself isn't written, though
return NULL;
}
};
+ struct KVFinalizeThread : public Thread {
+ BlueStore *store;
+ explicit KVFinalizeThread(BlueStore *s) : store(s) {}
+ void *entry() {
+ store->_kv_finalize_thread();
+ return NULL;
+ }
+ };
struct DBHistogram {
struct value_dist {
bool mounted = false;
RWLock coll_lock = {"BlueStore::coll_lock"}; ///< rwlock to protect coll_map
- mempool::bluestore_meta_other::unordered_map<coll_t, CollectionRef> coll_map;
+ mempool::bluestore_cache_other::unordered_map<coll_t, CollectionRef> coll_map;
vector<Cache*> cache_shards;
interval_set<uint64_t> bluefs_extents; ///< block extents owned by bluefs
interval_set<uint64_t> bluefs_extents_reclaiming; ///< currently reclaiming
- std::mutex deferred_lock;
+ std::mutex deferred_lock, deferred_submit_lock;
std::atomic<uint64_t> deferred_seq = {0};
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
KVSyncThread kv_sync_thread;
std::mutex kv_lock;
std::condition_variable kv_cond;
+ bool kv_sync_started = false;
bool kv_stop = false;
+ bool kv_finalize_started = false;
+ bool kv_finalize_stop = false;
deque<TransContext*> kv_queue; ///< ready, already submitted
deque<TransContext*> kv_queue_unsubmitted; ///< ready, need submit by kv thread
deque<TransContext*> kv_committing; ///< currently syncing
deque<DeferredBatch*> deferred_done_queue; ///< deferred ios done
deque<DeferredBatch*> deferred_stable_queue; ///< deferred ios done + stable
+ KVFinalizeThread kv_finalize_thread;
+ std::mutex kv_finalize_lock;
+ std::condition_variable kv_finalize_cond;
+ deque<TransContext*> kv_committing_to_finalize; ///< pending finalization
+ deque<DeferredBatch*> deferred_stable_to_finalize; ///< pending finalization
+
PerfCounters *logger = nullptr;
std::mutex reap_lock;
size_t block_size_order = 0; ///< bits to shift to get block size
uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
- int deferred_batch_ops = 0; ///< deferred batch size
-
///< bits for min_alloc_size
- std::atomic<uint8_t> min_alloc_size_order = {0};
+ uint8_t min_alloc_size_order = 0;
static_assert(std::numeric_limits<uint8_t>::max() >
std::numeric_limits<decltype(min_alloc_size)>::digits,
"not enough bits for min_alloc_size");
- ///< size threshold for forced deferred writes
- std::atomic<uint64_t> prefer_deferred_size = {0};
-
///< maximum allocation unit (power of 2)
std::atomic<uint64_t> max_alloc_size = {0};
+ ///< number of deferred IOs that triggers a forced batch submit
+ std::atomic<int> deferred_batch_ops = {0};
+
+ ///< size threshold for forced deferred writes
+ std::atomic<uint64_t> prefer_deferred_size = {0};
+
///< approx cost per io, in bytes
std::atomic<uint64_t> throttle_cost_per_io = {0};
- std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
+ std::atomic<Compressor::CompressionMode> comp_mode =
+ {Compressor::COMP_NONE}; ///< compression mode
CompressorRef compressor;
std::atomic<uint64_t> comp_min_blob_size = {0};
std::atomic<uint64_t> comp_max_blob_size = {0};
std::atomic<uint64_t> max_blob_size = {0}; ///< maximum blob size
+ uint64_t kv_ios = 0;
+ uint64_t kv_throttle_costs = 0;
+
// cache trim control
+ uint64_t cache_size = 0; ///< total cache size
+ float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
+ float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
+ float cache_data_ratio = 0; ///< cache ratio dedicated to object data
- // note that these update in a racy way, but we don't *really* care if
- // they're perfectly accurate. they are all word sized so they will
- // individually update atomically, but may not be coherent with each other.
- size_t mempool_seq = 0;
- size_t mempool_bytes = 0;
- size_t mempool_onodes = 0;
-
- void get_mempool_stats(size_t *seq, uint64_t *bytes, uint64_t *onodes) {
- *seq = mempool_seq;
- *bytes = mempool_bytes;
- *onodes = mempool_onodes;
- }
+ std::mutex vstatfs_lock;
+ volatile_statfs vstatfs;
struct MempoolThread : public Thread {
BlueStore *store;
int _open_super_meta();
+ void _open_statfs();
+
int _reconcile_bluefs_freespace();
int _balance_bluefs_freespace(PExtentVector *extents);
void _commit_bluefs_freespace(const PExtentVector& extents);
void _osr_drain_all();
void _osr_unregister_all();
+ void _kv_start();
+ void _kv_stop();
void _kv_sync_thread();
- void _kv_stop() {
- {
- std::lock_guard<std::mutex> l(kv_lock);
- kv_stop = true;
- kv_cond.notify_all();
- }
- kv_sync_thread.join();
- {
- std::lock_guard<std::mutex> l(kv_lock);
- kv_stop = false;
- }
- }
+ void _kv_finalize_thread();
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
void _deferred_queue(TransContext *txc);
- void deferred_try_submit() {
- std::lock_guard<std::mutex> l(deferred_lock);
- _deferred_try_submit();
- }
- void _deferred_try_submit();
- void _deferred_submit(OpSequencer *osr);
+ void deferred_try_submit();
+ void _deferred_submit_unlock(OpSequencer *osr);
void _deferred_aio_finish(OpSequencer *osr);
int _deferred_replay();
void _apply_padding(uint64_t head_pad,
uint64_t tail_pad,
- bufferlist& bl,
bufferlist& padded);
// -- ondisk version ---
bool wants_journal() override { return false; };
bool allows_journal() override { return false; };
+ bool is_rotational() override;
+
+ string get_default_device_class() override {
+ string device_class;
+ map<string, string> metadata;
+ collect_metadata(&metadata);
+ auto it = metadata.find("bluestore_bdev_type");
+ if (it != metadata.end()) {
+ device_class = it->second;
+ }
+ return device_class;
+ }
+
static int get_block_device_fsid(CephContext* cct, const string& path,
uuid_d *fsid);
void get_db_statistics(Formatter *f) override;
void generate_db_histogram(Formatter *f) override;
+ void _flush_cache();
void flush_cache() override;
void dump_perf_counters(Formatter *f) override {
f->open_object_section("perf_counters");
uint64_t offset,
size_t len,
bufferlist& bl,
- uint32_t op_flags = 0,
- bool allow_eio = false) override;
+ uint32_t op_flags = 0) override;
int read(
CollectionHandle &c,
const ghobject_t& oid,
uint64_t offset,
size_t len,
bufferlist& bl,
- uint32_t op_flags = 0,
- bool allow_eio = false) override;
+ uint32_t op_flags = 0) override;
int _do_read(
Collection *c,
OnodeRef o,
objectstore_perf_stat_t get_cur_stats() const {
objectstore_perf_stat_t ret;
- ret.os_commit_latency = os_commit_latency.avg();
- ret.os_apply_latency = os_apply_latency.avg();
+ ret.os_commit_latency = os_commit_latency.current_avg();
+ ret.os_apply_latency = os_apply_latency.current_avg();
return ret;
}
RWLock::WLocker l(debug_read_error_lock);
debug_mdata_error_objects.insert(o);
}
+ void compact() override {
+ assert(db);
+ db->compact();
+ }
+
private:
bool _debug_data_eio(const ghobject_t& o) {
if (!cct->_conf->bluestore_debug_inject_read_err) {
TransContext *txc,
CollectionRef& c,
OnodeRef o,
- WriteContext *wctx);
+ WriteContext *wctx,
+ set<SharedBlob*> *maybe_unshared_blobs=0);
int _do_transaction(Transaction *t,
TransContext *txc,
void _pad_zeros(bufferlist *bl, uint64_t *offset,
uint64_t chunk_size);
+ void _choose_write_options(CollectionRef& c,
+ OnodeRef o,
+ uint32_t fadvise_flags,
+ WriteContext *wctx);
+
+ int _do_gc(TransContext *txc,
+ CollectionRef& c,
+ OnodeRef o,
+ const GarbageCollector& gc,
+ const WriteContext& wctx,
+ uint64_t *dirty_start,
+ uint64_t *dirty_end);
+
int _do_write(TransContext *txc,
CollectionRef &c,
OnodeRef o,
void _do_truncate(TransContext *txc,
CollectionRef& c,
OnodeRef o,
- uint64_t offset);
+ uint64_t offset,
+ set<SharedBlob*> *maybe_unshared_blobs=0);
void _truncate(TransContext *txc,
CollectionRef& c,
OnodeRef& o,