l_bluestore_blob_split,
l_bluestore_extent_compress,
l_bluestore_gc_merged,
+ // new perf counter; per its name it counts reads that returned EIO
+ // NOTE(review): confirm against the counter's registration site
+ l_bluestore_read_eio,
l_bluestore_last
};
// NOTE(review): fragment of a BufferSpace buffer-insertion helper; the
// enclosing function's signature lies outside this view.
buffer_map[b->offset].reset(b);
if (b->is_writing()) {
b->data.reassign_to_mempool(mempool::mempool_bluestore_writing);
- writing.push_back(*b);
+ // keep 'writing' ordered by seq: fast path appends when b->seq is
+ // >= the current tail seq (the common in-order case)
+ if (writing.empty() || writing.rbegin()->seq <= b->seq) {
+ writing.push_back(*b);
+ } else {
+ // out-of-order seq: linear scan to the first entry with seq >= b->seq
+ auto it = writing.begin();
+ while (it->seq < b->seq) {
+ ++it;
+ }
+
+ assert(it->seq >= b->seq);
+ // note that this will insert b before it
+ // hence the order is maintained
+ writing.insert(it, *b);
+ }
} else {
// buffers not in flight live in the cache-data mempool and go to the cache
b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
cache->_add_buffer(b, level, near);
// Look up a SharedBlob by its shared-blob id; returns nullptr when absent.
SharedBlobRef lookup(uint64_t sbid) {
std::lock_guard<std::mutex> l(lock);
auto p = sb_map.find(sbid);
- if (p == sb_map.end()) {
+ // an entry whose refcount has already reached zero is racing with
+ // removal and must not be resurrected — treat it as not found
+ if (p == sb_map.end() ||
+ p->second->nref == 0) {
return nullptr;
}
return p->second;
sb->coll = coll;
}
+ // Remove sb from the set. Now returns void: the map entry is erased only
+ // when it still maps to this exact SharedBlob, so a concurrent
+ // re-registration under the same sbid is left untouched.
- bool remove(SharedBlob *sb) {
+ void remove(SharedBlob *sb) {
std::lock_guard<std::mutex> l(lock);
- if (sb->nref == 0) {
- assert(sb->get_parent() == this);
- sb_map.erase(sb->get_sbid());
- return true;
+ assert(sb->get_parent() == this);
+ // only remove if it still points to us
+ auto p = sb_map.find(sb->get_sbid());
+ if (p != sb_map.end() &&
+ p->second == sb) {
+ sb_map.erase(p);
}
- return false;
}
// True when no SharedBlobs are registered (takes the set's lock).
bool empty() {
std::lock_guard<std::mutex> l(lock);
return sb_map.empty();
}
+
+ // debug aid: dump the set's contents to the log at level lvl
+ void dump(CephContext *cct, int lvl);
};
//#define CACHE_BLOB_BL // not sure if this is a win yet or not... :/
get_blob().can_split_at(blob_offset);
}
+ // renamed try_reuse_blob -> can_reuse_blob
+ // NOTE(review): the new name suggests a side-effect-free predicate —
+ // confirm against the definition before relying on that
- bool try_reuse_blob(uint32_t min_alloc_size,
+ bool can_reuse_blob(uint32_t min_alloc_size,
uint32_t target_blob_size,
uint32_t b_offset,
uint32_t *length0);
#endif
}
+ // read-only accessor; marked inline explicitly for hot-path callers
- const bluestore_blob_t& get_blob() const {
+ inline const bluestore_blob_t& get_blob() const {
return blob;
}
+ // mutable accessor; callers intend to modify the blob
- bluestore_blob_t& dirty_blob() {
+ inline bluestore_blob_t& dirty_blob() {
#ifdef CACHE_BLOB_BL
// mutation invalidates the cached encoded form, so drop it
blob_bl.clear();
#endif
void clear();
bool empty();
+ // debug aid: dump contents to the log at level lvl
+ void dump(CephContext *cct, int lvl);
+
/// return true if f true for any item
bool map_any(std::function<bool(OnodeRef)> f);
};
IOContext ioc;
bool had_ios = false; ///< true if we submitted IOs before our kv txn
+ // first_collection dropped — no longer tracked here
- CollectionRef first_collection; ///< first referenced collection
-
uint64_t seq = 0;
utime_t start;
utime_t last_stamp;
deferred_osr_queue_t deferred_queue; ///< osr's with deferred io pending
int deferred_queue_size = 0; ///< num txc's queued across all osrs
atomic_int deferred_aggressive = {0}; ///< aggressive wakeup of kv thread
+ Finisher deferred_finisher; ///< NOTE(review): presumably runs deferred-io
+ // completion callbacks — confirm usage
int m_finisher_num = 1;
vector<Finisher*> finishers;
KVSyncThread kv_sync_thread;
std::mutex kv_lock;
std::condition_variable kv_cond;
+ bool _kv_only = false; ///< NOTE(review): presumably "run only the kv sync
+ // machinery" mode — confirm at the setter
bool kv_sync_started = false;
bool kv_stop = false;
bool kv_finalize_started = false;
PerfCounters *logger = nullptr;
+ // reap_lock removed; removed_collections must now be protected elsewhere
- std::mutex reap_lock;
list<CollectionRef> removed_collections;
RWLock debug_read_error_lock = {"BlueStore::debug_read_error_lock"};
size_t block_size_order = 0; ///< bits to shift to get block size
uint64_t min_alloc_size = 0; ///< minimum allocation unit (power of 2)
+ // deferred_batch_ops moved below, next to prefer_deferred_size
- std::atomic<int> deferred_batch_ops = {0}; ///< deferred batch size
-
///< bits for min_alloc_size
+ // NOTE(review): demoted from std::atomic — presumably written once during
+ // mount and read-only afterwards; confirm before relying on that
- std::atomic<uint8_t> min_alloc_size_order = {0};
+ uint8_t min_alloc_size_order = 0;
static_assert(std::numeric_limits<uint8_t>::max() >
std::numeric_limits<decltype(min_alloc_size)>::digits,
"not enough bits for min_alloc_size");
- ///< size threshold for forced deferred writes
- std::atomic<uint64_t> prefer_deferred_size = {0};
-
///< maximum allocation unit (power of 2)
std::atomic<uint64_t> max_alloc_size = {0};
+ ///< number threshold for forced deferred writes
+ std::atomic<int> deferred_batch_ops = {0};
+
+ ///< size threshold for forced deferred writes
+ std::atomic<uint64_t> prefer_deferred_size = {0};
+
///< approx cost per io, in bytes
std::atomic<uint64_t> throttle_cost_per_io = {0};
+ // re-wrapped for line length; value unchanged
- std::atomic<Compressor::CompressionMode> comp_mode = {Compressor::COMP_NONE}; ///< compression mode
+ std::atomic<Compressor::CompressionMode> comp_mode =
+ {Compressor::COMP_NONE}; ///< compression mode
CompressorRef compressor;
std::atomic<uint64_t> comp_min_blob_size = {0};
std::atomic<uint64_t> comp_max_blob_size = {0};
uint64_t kv_throttle_costs = 0;
// cache trim control
+ uint64_t cache_size = 0; ///< total cache size
float cache_meta_ratio = 0; ///< cache ratio dedicated to metadata
float cache_kv_ratio = 0; ///< cache ratio dedicated to kv (e.g., rocksdb)
float cache_data_ratio = 0; ///< cache ratio dedicated to object data
int _setup_block_symlink_or_file(string name, string path, uint64_t size,
bool create);
- int _write_bdev_label(string path, bluestore_bdev_label_t label);
public:
+ // now static and public, parallel to _read_bdev_label below, so external
+ // tooling can write a bdev label without a BlueStore instance
+ static int _write_bdev_label(CephContext* cct,
+ string path, bluestore_bdev_label_t label);
static int _read_bdev_label(CephContext* cct, string path,
bluestore_bdev_label_t *label);
private:
int _open_super_meta();
+ // renamed with '_' prefix to match the private-helper naming convention
- void open_statfs();
+ void _open_statfs();
int _reconcile_bluefs_freespace();
int _balance_bluefs_freespace(PExtentVector *extents);
void _assign_nid(TransContext *txc, OnodeRef o);
uint64_t _assign_blobid(TransContext *txc);
+ // const ref avoids copying (and refcount-bumping) the OnodeRef on every
+ // call, which may be a no-op at default log levels
- void _dump_onode(OnodeRef o, int log_level=30);
+ void _dump_onode(const OnodeRef& o, int log_level=30);
void _dump_extent_map(ExtentMap& em, int log_level=30);
void _dump_transaction(Transaction *t, int log_level = 30);
bluestore_deferred_op_t *_get_deferred_op(TransContext *txc, OnodeRef o);
void _deferred_queue(TransContext *txc);
+ // deferred_try_submit is now public and out-of-line; the locking that the
+ // old inline wrapper did here moves into the implementation
- void deferred_try_submit() {
- std::lock_guard<std::mutex> l(deferred_lock);
- _deferred_try_submit();
- }
- void _deferred_try_submit();
- void _deferred_submit(OpSequencer *osr);
+public:
+ void deferred_try_submit();
+private:
+ // NOTE(review): the "_unlock" suffix suggests the callee releases the
+ // deferred lock itself — confirm against the definition
+ void _deferred_submit_unlock(OpSequencer *osr);
void _deferred_aio_finish(OpSequencer *osr);
int _deferred_replay();
const PExtentVector& extents,
bool compressed,
mempool_dynamic_bitset &used_blocks,
+ // NOTE(review): new parameter; presumably the allocation granularity used
+ // when accounting blocks in used_blocks — confirm at the definition
+ uint64_t granularity,
store_statfs_t& expected_statfs);
void _buffer_cache_write(
+ // 'bl' input parameter removed; only the padded output buffer remains
void _apply_padding(uint64_t head_pad,
uint64_t tail_pad,
- bufferlist& bl,
bufferlist& padded);
// -- ondisk version ---
bool allows_journal() override { return false; };
bool is_rotational() override;
+ bool is_journal_rotational() override;
+
+ // Returns the "bluestore_bdev_type" entry from collect_metadata(), or an
+ // empty string when that key is not present.
+ string get_default_device_class() override {
+ string device_class;
+ map<string, string> metadata;
+ collect_metadata(&metadata);
+ auto it = metadata.find("bluestore_bdev_type");
+ if (it != metadata.end()) {
+ device_class = it->second;
+ }
+ return device_class;
+ }
static int get_block_device_fsid(CephContext* cct, const string& path,
uuid_d *fsid);
return 0;
}
- int fsck(bool deep) override;
+ int write_meta(const std::string& key, const std::string& value) override;
+ int read_meta(const std::string& key, std::string *value) override;
+
+
+ // fsck and repair now share one implementation; the second argument of
+ // _fsck selects whether found inconsistencies may be fixed
+ int fsck(bool deep) override {
+ return _fsck(deep, false);
+ }
+ int repair(bool deep) override {
+ return _fsck(deep, true);
+ }
+ int _fsck(bool deep, bool repair);
void set_cache_shards(unsigned num) override;
uint64_t offset,
size_t len,
bufferlist& bl,
+ // allow_eio removed from the public read interface
- uint32_t op_flags = 0,
- bool allow_eio = false) override;
+ uint32_t op_flags = 0) override;
int read(
CollectionHandle &c,
const ghobject_t& oid,
uint64_t offset,
size_t len,
bufferlist& bl,
+ // same interface change for the CollectionHandle overload
- uint32_t op_flags = 0,
- bool allow_eio = false) override;
+ uint32_t op_flags = 0) override;
int _do_read(
Collection *c,
OnodeRef o,
// Snapshot the current commit/apply latency averages.
objectstore_perf_stat_t get_cur_stats() const {
objectstore_perf_stat_t ret;
+ // avg() -> current_avg(): NOTE(review): presumably the average over the
+ // current sampling window rather than all-time — confirm in the counter type
- ret.os_commit_latency = os_commit_latency.avg();
- ret.os_apply_latency = os_apply_latency.avg();
+ ret.os_commit_latency = os_commit_latency.current_avg();
+ ret.os_apply_latency = os_apply_latency.current_avg();
return ret;
}
RWLock::WLocker l(debug_read_error_lock);
debug_mdata_error_objects.insert(o);
}
+ // Manually trigger compaction of the backing kv database.
+ void compact() override {
+ assert(db);
+ db->compact();
+ }
+ // Advertise built-in checksumming to callers of the ObjectStore interface.
+ bool has_builtin_csum() const override {
+ return true;
+ }
+
private:
bool _debug_data_eio(const ghobject_t& o) {
if (!cct->_conf->bluestore_debug_inject_read_err) {
bool mark_unused;
bool new_blob; ///< whether new blob was created
+ // NOTE(review): 'compressed' presumably flags that compressed_bl and
+ // compressed_len hold a valid compressed payload — confirm at the writers
+ bool compressed = false;
+ bufferlist compressed_bl;
+ size_t compressed_len = 0;
+
write_item(
uint64_t logical_offs,
BlobRef b,
OnodeRef o,
uint64_t offset,
set<SharedBlob*> *maybe_unshared_blobs=0);
+ // now returns int so truncate errors can be propagated to the caller
- void _truncate(TransContext *txc,
+ int _truncate(TransContext *txc,
CollectionRef& c,
OnodeRef& o,
uint64_t offset);