#include <sys/stat.h>
#include <fcntl.h>
+#include "include/cpp-btree/btree_set.h"
+
#include "BlueStore.h"
#include "os/kv.h"
#include "include/compat.h"
#define dout_context cct
#define dout_subsys ceph_subsys_bluestore
-// bluestore_meta_onode
+using bid_t = decltype(BlueStore::Blob::id);
+
+// bluestore_cache_onode
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Onode, bluestore_onode,
- bluestore_meta_onode);
+ bluestore_cache_onode);
-// bluestore_meta_other
+// bluestore_cache_other
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Buffer, bluestore_buffer,
- bluestore_meta_other);
+ bluestore_cache_other);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Extent, bluestore_extent,
- bluestore_meta_other);
+ bluestore_cache_other);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::Blob, bluestore_blob,
- bluestore_meta_other);
+ bluestore_cache_other);
MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::SharedBlob, bluestore_shared_blob,
- bluestore_meta_other);
+ bluestore_cache_other);
+
+// bluestore_txc
+MEMPOOL_DEFINE_OBJECT_FACTORY(BlueStore::TransContext, bluestore_transcontext,
+ bluestore_txc);
+
// kv store prefixes
const string PREFIX_SUPER = "S"; // field -> value
template<typename S>
+// Escape 'in' for use as an order-preserving KV key fragment: bytes <= '#'
+// and >= '~' are emitted as '#'/'~' followed by two lowercase hex digits,
+// everything else is copied verbatim, and a '!' terminator is appended.
+// Inverse of decode_escaped() below.
static void append_escaped(const string &in, S *out)
{
- char hexbyte[8];
+ // Worst case is 3 output bytes per input byte plus the trailing '!'.
+ // NOTE(review): this is a VLA, a compiler extension in C++ -- a very long
+ // key would burn stack; TODO confirm callers bound key length.
+ char hexbyte[in.length() * 3 + 1];
+ char* ptr = &hexbyte[0];
  for (string::const_iterator i = in.begin(); i != in.end(); ++i) {
    if (*i <= '#') {
- snprintf(hexbyte, sizeof(hexbyte), "#%02x", (uint8_t)*i);
- out->append(hexbyte);
+ *ptr++ = '#';
+ *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+ *ptr++ = "0123456789abcdef"[*i & 0x0f];
    } else if (*i >= '~') {
- snprintf(hexbyte, sizeof(hexbyte), "~%02x", (uint8_t)*i);
- out->append(hexbyte);
+ *ptr++ = '~';
+ *ptr++ = "0123456789abcdef"[(*i >> 4) & 0x0f];
+ *ptr++ = "0123456789abcdef"[*i & 0x0f];
    } else {
- out->push_back(*i);
+ *ptr++ = *i;
    }
  }
- out->push_back('!');
+ *ptr++ = '!';
+ // one bulk append replaces the per-byte appends of the old code
+ out->append(hexbyte, ptr - &hexbyte[0]);
+}
+
+// Convert one ASCII hex digit to its numeric value.  Returns 256 (larger
+// than any valid byte) for a non-hex character, so a caller can OR/shift
+// two digits together and detect malformed input with a single range check.
+inline unsigned h2i(char c)
+{
+ if ((c >= '0') && (c <= '9')) {
+ return c - 0x30;
+ } else if ((c >= 'a') && (c <= 'f')) {
+ return c - 'a' + 10;
+ } else if ((c >= 'A') && (c <= 'F')) {
+ return c - 'A' + 10;
+ } else {
+ return 256; // make it always larger than 255
+ }
}
+// Decode a key fragment produced by append_escaped(), stopping at the '!'
+// terminator (or NUL).  Appends the decoded bytes to *out and returns the
+// number of input bytes consumed, or -EINVAL on a malformed hex escape.
static int decode_escaped(const char *p, string *out)
{
+ // Decode into a fixed stack buffer and append to *out in batches instead
+ // of per byte; each loop iteration writes at most one byte, so flushing
+ // once ptr passes &buff[252] can never overrun buff[256].
+ char buff[256];
+ char* ptr = &buff[0];
+ char* max = &buff[252];
  const char *orig_p = p;
  while (*p && *p != '!') {
    if (*p == '#' || *p == '~') {
- unsigned hex;
- int r = sscanf(++p, "%2x", &hex);
- if (r < 1)
- return -EINVAL;
- out->push_back((char)hex);
- p += 2;
+ unsigned hex = 0;
+ p++;
+ // h2i() yields 256 for a non-hex digit, so a plain > 255 test after
+ // each shift/OR catches malformed escapes without sscanf.
+ hex = h2i(*p++) << 4;
+ if (hex > 255) {
+ return -EINVAL;
+ }
+ hex |= h2i(*p++);
+ if (hex > 255) {
+ return -EINVAL;
+ }
+ *ptr++ = hex;
    } else {
- out->push_back(*p++);
+ *ptr++ = *p++;
+ }
+ if (ptr > max) {
+ out->append(buff, ptr-buff);
+ ptr = &buff[0];
    }
  }
+ // flush whatever remains in the local buffer
+ if (ptr != buff) {
+ out->append(buff, ptr-buff);
+ }
  return p - orig_p;
}
const char *p = key.c_str();
if (key.length() < sizeof(uint64_t))
return -1;
- p = _key_decode_u64(p, sbid);
+ _key_decode_u64(p, sbid);
return 0;
}
int okey_len = key.size() - sizeof(uint32_t) - 1;
*onode_key = key.substr(0, okey_len);
const char *p = key.data() + okey_len;
- p = _key_decode_u32(p, offset);
+ _key_decode_u32(p, offset);
return 0;
}
}
bExit = it == bi.last_lextent;
++it;
- } while(!bExit);
+ } while (!bExit);
}
expected_for_release += blob_expected_for_release;
expected_allocations += bi.expected_allocations;
{
std::lock_guard<std::recursive_mutex> l(lock);
_trim(0, 0);
- assert(_get_num_onodes() == 0);
- assert(_get_buffer_bytes() == 0);
}
void BlueStore::Cache::trim(
uint64_t target_bytes,
float target_meta_ratio,
+ float target_data_ratio,
float bytes_per_onode)
{
std::lock_guard<std::recursive_mutex> l(lock);
uint64_t current_buffer = _get_buffer_bytes();
uint64_t current = current_meta + current_buffer;
- uint64_t target_meta = target_bytes * (double)target_meta_ratio; //need to cast to double
- //since float(1) might produce inaccurate value
- // for target_meta (a bit greater than target_bytes)
- // that causes overflow in target_buffer below.
- //Consider the following code:
- //uint64_t i =(uint64_t)227*1024*1024*1024 + 1;
- //float f = 1;
- //uint64_t i2 = i*f;
- //assert(i == i2);
+ uint64_t target_meta = target_bytes * target_meta_ratio;
+ uint64_t target_buffer = target_bytes * target_data_ratio;
- target_meta = min(target_bytes, target_meta); //and just in case that ratio is > 1
- uint64_t target_buffer = target_bytes - target_meta;
+ // correct for overflow or float imprecision
+ target_meta = min(target_bytes, target_meta);
+ target_buffer = min(target_bytes - target_meta, target_buffer);
if (current <= target_bytes) {
dout(10) << __func__
<< " shard target " << pretty_si_t(target_bytes)
- << " ratio " << target_meta_ratio << " ("
+ << " meta/data ratios " << target_meta_ratio
+ << " + " << target_data_ratio << " ("
<< pretty_si_t(target_meta) << " + "
<< pretty_si_t(target_buffer) << "), "
<< " current " << pretty_si_t(current) << " ("
if (b->data.length()) {
bufferlist bl;
bl.substr_of(b->data, b->length - tail, tail);
- _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+ Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+ nb->maybe_rebuild();
+ _add_buffer(cache, nb, 0, b);
} else {
- _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail), 0, b);
+ _add_buffer(cache, new Buffer(this, b->state, b->seq, end, tail),
+ 0, b);
}
if (!b->is_writing()) {
cache->_adjust_buffer_size(b, front - (int64_t)b->length);
}
b->truncate(front);
+ b->maybe_rebuild();
cache->_audit("discard end 1");
break;
} else {
cache->_adjust_buffer_size(b, front - (int64_t)b->length);
}
b->truncate(front);
+ b->maybe_rebuild();
++i;
continue;
}
if (b->data.length()) {
bufferlist bl;
bl.substr_of(b->data, b->length - keep, keep);
- _add_buffer(cache, new Buffer(this, b->state, b->seq, end, bl), 0, b);
+ Buffer *nb = new Buffer(this, b->state, b->seq, end, bl);
+ nb->maybe_rebuild();
+ _add_buffer(cache, nb, 0, b);
} else {
_add_buffer(cache, new Buffer(this, b->state, b->seq, end, keep), 0, b);
}
void BlueStore::BufferSpace::read(
  Cache* cache,
- uint32_t offset, uint32_t length,
+ uint32_t offset,
+ uint32_t length,
  BlueStore::ready_regions_t& res,
  interval_set<uint32_t>& res_intervals)
{
+ // Gather cached (clean or still-writing) buffers overlapping
+ // [offset, offset+length) into res / res_intervals.  The rework wraps the
+ // buffer_map scan in its own scope so the cache lock covers only the scan,
+ // not the clear() calls above it; the matching logic is otherwise the same
+ // as the old flat loop, just re-indented one level deeper.
- std::lock_guard<std::recursive_mutex> l(cache->lock);
  res.clear();
  res_intervals.clear();
  uint32_t want_bytes = length;
  uint32_t end = offset + length;
- for (auto i = _data_lower_bound(offset);
- i != buffer_map.end() && offset < end && i->first < end;
- ++i) {
- Buffer *b = i->second.get();
- assert(b->end() > offset);
- if (b->is_writing() || b->is_clean()) {
- if (b->offset < offset) {
- uint32_t skip = offset - b->offset;
- uint32_t l = MIN(length, b->length - skip);
- res[offset].substr_of(b->data, skip, l);
- res_intervals.insert(offset, l);
- offset += l;
- length -= l;
- if (!b->is_writing()) {
+
+ {
+ std::lock_guard<std::recursive_mutex> l(cache->lock);
+ for (auto i = _data_lower_bound(offset);
+ i != buffer_map.end() && offset < end && i->first < end;
+ ++i) {
+ Buffer *b = i->second.get();
+ assert(b->end() > offset);
+ if (b->is_writing() || b->is_clean()) {
+ if (b->offset < offset) {
+ uint32_t skip = offset - b->offset;
+ uint32_t l = MIN(length, b->length - skip);
+ res[offset].substr_of(b->data, skip, l);
+ res_intervals.insert(offset, l);
+ offset += l;
+ length -= l;
+ if (!b->is_writing()) {
+ cache->_touch_buffer(b);
+ }
+ continue;
+ }
+ if (b->offset > offset) {
+ uint32_t gap = b->offset - offset;
+ if (length <= gap) {
+ break;
+ }
+ offset += gap;
+ length -= gap;
+ }
+ if (!b->is_writing()) {
        cache->_touch_buffer(b);
- }
- continue;
- }
- if (b->offset > offset) {
- uint32_t gap = b->offset - offset;
- if (length <= gap) {
- break;
- }
- offset += gap;
- length -= gap;
- }
- if (!b->is_writing()) {
- cache->_touch_buffer(b);
- }
- if (b->length > length) {
- res[offset].substr_of(b->data, 0, length);
- res_intervals.insert(offset, length);
- break;
- } else {
- res[offset].append(b->data);
- res_intervals.insert(offset, b->length);
- if (b->length == length)
+ }
+ if (b->length > length) {
+ res[offset].substr_of(b->data, 0, length);
+ res_intervals.insert(offset, length);
          break;
- offset += b->length;
- length -= b->length;
+ } else {
+ res[offset].append(b->data);
+ res_intervals.insert(offset, b->length);
+ if (b->length == length)
+ break;
+ offset += b->length;
+ length -= b->length;
+ }
      }
    }
  }
} else {
b->state = Buffer::STATE_CLEAN;
writing.erase(i++);
+ b->maybe_rebuild();
+ b->data.reassign_to_mempool(mempool::mempool_bluestore_cache_data);
cache->_add_buffer(b, 1, nullptr);
ldout(cache->cct, 20) << __func__ << " added " << *b << dendl;
}
+// Look up a cached Onode by oid.  Reworked so the recursive cache lock
+// covers only the map lookup/touch; the perf-counter updates and the
+// return happen after the lock is dropped, shortening the critical section.
BlueStore::OnodeRef BlueStore::OnodeSpace::lookup(const ghobject_t& oid)
{
- std::lock_guard<std::recursive_mutex> l(cache->lock);
  ldout(cache->cct, 30) << __func__ << dendl;
- ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
- if (p == onode_map.end()) {
- ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+ OnodeRef o;
+ bool hit = false;
+
+ {
+ std::lock_guard<std::recursive_mutex> l(cache->lock);
+ ceph::unordered_map<ghobject_t,OnodeRef>::iterator p = onode_map.find(oid);
+ if (p == onode_map.end()) {
+ ldout(cache->cct, 30) << __func__ << " " << oid << " miss" << dendl;
+ } else {
+ ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
+ << dendl;
+ cache->_touch_onode(p->second);
+ hit = true;
+ o = p->second;
+ }
+ }
+
+ // counters are bumped outside the lock; o is empty on a miss
+ if (hit) {
+ cache->logger->inc(l_bluestore_onode_hits);
+ } else {
    cache->logger->inc(l_bluestore_onode_misses);
- return OnodeRef();
  }
- ldout(cache->cct, 30) << __func__ << " " << oid << " hit " << p->second
- << dendl;
- cache->_touch_onode(p->second);
- cache->logger->inc(l_bluestore_onode_hits);
- return p->second;
+ return o;
}
void BlueStore::OnodeSpace::clear()
OnodeRef& oldo,
const ghobject_t& old_oid,
const ghobject_t& new_oid,
- const mempool::bluestore_meta_other::string& new_okey)
+ const mempool::bluestore_cache_other::string& new_okey)
{
std::lock_guard<std::recursive_mutex> l(cache->lock);
ldout(cache->cct, 30) << __func__ << " " << old_oid << " -> " << new_oid
return false;
}
+// Debug helper: log every cached oid -> Onode entry at debug level 'lvl'.
+// NOTE(review): iterates onode_map without taking the cache lock --
+// presumably the caller holds it; verify at the call sites.
+void BlueStore::OnodeSpace::dump(CephContext *cct, int lvl)
+{
+ for (auto& i : onode_map) {
+ ldout(cct, lvl) << i.first << " : " << i.second << dendl;
+ }
+}
// SharedBlob
<< " removing self from set " << get_parent()
<< dendl;
if (get_parent()) {
- if (get_parent()->remove(this)) {
- delete this;
- } else {
- ldout(coll->store->cct, 20)
- << __func__ << " " << this << " lost race to remove myself from set"
- << dendl;
- }
- } else {
- delete this;
+ get_parent()->remove(this);
}
+ delete this;
}
}
}
void BlueStore::SharedBlob::put_ref(uint64_t offset, uint32_t length,
- PExtentVector *r)
+ PExtentVector *r,
+ set<SharedBlob*> *maybe_unshared)
{
assert(persistent);
- persistent->ref_map.put(offset, length, r);
+ bool maybe = false;
+ persistent->ref_map.put(offset, length, r, maybe_unshared ? &maybe : nullptr);
+ if (maybe_unshared && maybe) {
+ maybe_unshared->insert(this);
+ }
+}
+
+// SharedBlobSet
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.sharedblobset(" << this << ") "
+
+// Debug helper: log every registered sbid -> SharedBlob entry at debug
+// level 'lvl', holding the set's own mutex for the duration of the scan.
+void BlueStore::SharedBlobSet::dump(CephContext *cct, int lvl)
+{
+ std::lock_guard<std::mutex> l(lock);
+ for (auto& i : sb_map) {
+ ldout(cct, lvl) << i.first << " : " << *i.second << dendl;
+ }
+}
// Blob
if (b.is_spanning()) {
out << " spanning " << b.id;
}
- out << " " << b.get_blob() << " " << b.get_blob_use_tracker()
- << " " << *b.shared_blob
- << ")";
+ out << " " << b.get_blob() << " " << b.get_blob_use_tracker();
+ if (b.shared_blob) {
+ out << " " << *b.shared_blob;
+ } else {
+ out << " (shared_blob=NULL)";
+ }
+ out << ")";
return out;
}
void BlueStore::Blob::discard_unallocated(Collection *coll)
{
- if (blob.is_shared()) {
+ if (get_blob().is_shared()) {
return;
}
- if (blob.is_compressed()) {
+ if (get_blob().is_compressed()) {
bool discard = false;
bool all_invalid = true;
- for (auto e : blob.get_extents()) {
+ for (auto e : get_blob().get_extents()) {
if (!e.is_valid()) {
discard = true;
} else {
assert(discard == all_invalid); // in case of compressed blob all
// or none pextents are invalid.
if (discard) {
- shared_blob->bc.discard(shared_blob->get_cache(), 0, blob.get_logical_length());
+ shared_blob->bc.discard(shared_blob->get_cache(), 0,
+ get_blob().get_logical_length());
}
} else {
size_t pos = 0;
- for (auto e : blob.get_extents()) {
+ for (auto e : get_blob().get_extents()) {
if (!e.is_valid()) {
ldout(coll->store->cct, 20) << __func__ << " 0x" << std::hex << pos
<< "~" << e.length
}
pos += e.length;
}
- if (blob.can_prune_tail()) {
- dirty_blob();
- blob.prune_tail();
- used_in_blob.prune_tail(blob.get_ondisk_length());
+ if (get_blob().can_prune_tail()) {
+ dirty_blob().prune_tail();
+ used_in_blob.prune_tail(get_blob().get_ondisk_length());
auto cct = coll->store->cct; //used by dout
- dout(20) << __func__ << " pruned tail, now " << blob << dendl;
+ dout(20) << __func__ << " pruned tail, now " << get_blob() << dendl;
}
}
}
if (used_in_blob.is_empty()) {
uint32_t min_release_size =
- blob.get_release_size(coll->store->min_alloc_size);
- uint64_t l = blob.get_logical_length();
- dout(20) << __func__ << " init 0x" << std::hex << l << ", " << min_release_size
- << std::dec << dendl;
+ get_blob().get_release_size(coll->store->min_alloc_size);
+ uint64_t l = get_blob().get_logical_length();
+ dout(20) << __func__ << " init 0x" << std::hex << l << ", "
+ << min_release_size << std::dec << dendl;
used_in_blob.init(l, min_release_size);
}
used_in_blob.get(
return b.release_extents(empty, logical, r);
}
-bool BlueStore::Blob::try_reuse_blob(uint32_t min_alloc_size,
+bool BlueStore::Blob::can_reuse_blob(uint32_t min_alloc_size,
uint32_t target_blob_size,
uint32_t b_offset,
uint32_t *length0) {
target_blob_size = MAX(blen, target_blob_size);
if (b_offset >= blen) {
- //new data totally stands out of the existing blob
- new_blen = b_offset + length;
+ // new data totally stands out of the existing blob
+ new_blen = end;
} else {
- //new data overlaps with the existing blob
- new_blen = MAX(blen, length + b_offset);
- if (!get_blob().is_unallocated(
- b_offset,
- new_blen > blen ? blen - b_offset : length)) {
- return false;
+ // new data overlaps with the existing blob
+ new_blen = MAX(blen, end);
+
+ uint32_t overlap = 0;
+ if (new_blen > blen) {
+ overlap = blen - b_offset;
+ } else {
+ overlap = length;
+ }
+
+ if (!get_blob().is_unallocated(b_offset, overlap)) {
+ // abort if any piece of the overlap has already been allocated
+ return false;
}
}
+
if (new_blen > blen) {
int64_t overflow = int64_t(new_blen) - target_blob_size;
// Unable to decrease the provided length to fit into max_blob_size
length -= overflow;
*length0 = length;
}
+
if (new_blen > blen) {
dirty_blob().add_tail(new_blen);
used_in_blob.add_tail(new_blen,
- blob.get_release_size(min_alloc_size));
+ get_blob().get_release_size(min_alloc_size));
}
}
return true;
unsigned n;
// we need to encode inline_bl to measure encoded length
bool never_happen = encode_some(0, OBJECT_MAX_SIZE, inline_bl, &n);
+ inline_bl.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
assert(!never_happen);
size_t len = inline_bl.length();
dout(20) << __func__ << " inline shard " << len << " bytes from " << n
auto p = shards.begin();
auto prev_p = p;
while (p != shards.end()) {
+ assert(p->shard_info->offset >= prev_p->shard_info->offset);
auto n = p;
++n;
if (p->dirty) {
// avoid resharding the trailing shard, even if it is small
else if (n != shards.end() &&
len < g_conf->bluestore_extent_map_shard_min_size) {
- // we are small; combine with a neighbor
- if (p == shards.begin() && endoff == OBJECT_MAX_SIZE) {
- // we are an only shard
- request_reshard(0, OBJECT_MAX_SIZE);
- return;
- } else if (p == shards.begin()) {
- // combine with next shard
+ assert(endoff != OBJECT_MAX_SIZE);
+ if (p == shards.begin()) {
+ // we are the first shard, combine with next shard
request_reshard(p->shard_info->offset, endoff + 1);
- } else if (endoff == OBJECT_MAX_SIZE) {
- // combine with previous shard
- request_reshard(prev_p->shard_info->offset, endoff);
- return;
} else {
- // combine with the smaller of the two
+ // combine either with the previous shard or the next,
+ // whichever is smaller
if (prev_p->shard_info->bytes > n->shard_info->bytes) {
request_reshard(p->shard_info->offset, endoff + 1);
} else {
}
}
+// Pick an unused id for a new spanning blob.  Fast path: one past the
+// current largest id.  If that overflows to a negative value, fall back to
+// probing from a random starting point until a free id is found.
+bid_t BlueStore::ExtentMap::allocate_spanning_blob_id()
+{
+ if (spanning_blob_map.empty())
+ return 0;
+ bid_t bid = spanning_blob_map.rbegin()->first + 1;
+ // fast path: max+1 did not wrap, so it is valid and available
+ if (bid >= 0)
+ return bid;
+ // slow path: scan for the next unused bid, wrapping at the top of the
+ // range.  NOTE(review): numeric_limits<bid_t>::max() + 1 relies on integer
+ // promotion of bid_t to int to avoid overflow -- TODO confirm bid_t is
+ // narrower than int.
+ bid = rand() % (numeric_limits<bid_t>::max() + 1);
+ const auto begin_bid = bid;
+ do {
+ if (!spanning_blob_map.count(bid))
+ return bid;
+ else {
+ bid++;
+ if (bid < 0) bid = 0;
+ }
+ } while (bid != begin_bid);
+ // every id is in use; NOTE(review): if assert compiles out (NDEBUG) the
+ // function falls off the end without a return value
+ assert(0 == "no available blob id");
+}
+
void BlueStore::ExtentMap::reshard(
KeyValueDB *db,
KeyValueDB::Transaction t)
<< needs_reshard_end << ")" << std::dec << dendl;
}
- fault_range(db, needs_reshard_begin, needs_reshard_end);
+ fault_range(db, needs_reshard_begin, (needs_reshard_end - needs_reshard_begin));
// we may need to fault in a larger interval later must have all
// referring extents for spanning blobs loaded in order to have
// reshard
unsigned estimate = 0;
- unsigned offset = 0;
+ unsigned offset = needs_reshard_begin;
vector<bluestore_onode_t::shard_info> new_shard_info;
unsigned max_blob_end = 0;
Extent dummy(needs_reshard_begin);
if (estimate &&
estimate + extent_avg > target + (would_span ? slop : 0)) {
// new shard
- if (offset == 0) {
+ if (offset == needs_reshard_begin) {
new_shard_info.emplace_back(bluestore_onode_t::shard_info());
new_shard_info.back().offset = offset;
dout(20) << __func__ << " new shard 0x" << std::hex << offset
- << std::dec << dendl;
+ << std::dec << dendl;
}
offset = e->logical_offset;
new_shard_info.emplace_back(bluestore_onode_t::shard_info());
estimate = 0;
}
estimate += extent_avg;
- unsigned bb = e->blob_start();
- if (bb < spanning_scan_begin) {
- spanning_scan_begin = bb;
+ unsigned bs = e->blob_start();
+ if (bs < spanning_scan_begin) {
+ spanning_scan_begin = bs;
}
uint32_t be = e->blob_end();
if (be > max_blob_end) {
new_shard_info.begin(),
new_shard_info.end());
shards.insert(shards.begin() + si_begin, new_shard_info.size(), Shard());
- unsigned n = sv.size();
si_end = si_begin + new_shard_info.size();
- for (unsigned i = si_begin; i < si_end; ++i) {
+
+ assert(sv.size() == shards.size());
+
+ // note that we need to update every shard_info of shards here,
+ // as sv might have been totally re-allocated above
+ for (unsigned i = 0; i < shards.size(); i++) {
shards[i].shard_info = &sv[i];
+ }
+
+ // mark newly added shards as dirty
+ for (unsigned i = si_begin; i < si_end; ++i) {
shards[i].loaded = true;
shards[i].dirty = true;
}
- for (unsigned i = si_end; i < n; ++i) {
- shards[i].shard_info = &sv[i];
- }
}
dout(20) << __func__ << " fin " << sv << dendl;
inline_bl.clear();
}
if (spanning_scan_end > needs_reshard_end) {
fault_range(db, needs_reshard_end,
- spanning_scan_end - needs_reshard_begin);
+ spanning_scan_end - needs_reshard_end);
}
auto sp = sv.begin() + si_begin;
auto esp = sv.end();
} else {
shard_end = sp->offset;
}
- int bid;
- if (spanning_blob_map.empty()) {
- bid = 0;
- } else {
- bid = spanning_blob_map.rbegin()->first + 1;
- }
Extent dummy(needs_reshard_begin);
for (auto e = extent_map.lower_bound(dummy); e != extent_map.end(); ++e) {
if (e->logical_offset >= needs_reshard_end) {
must_span = true;
}
if (must_span) {
- b->id = bid++;
+ auto bid = allocate_spanning_blob_id();
+ b->id = bid;
spanning_blob_map[b->id] = b;
dout(20) << __func__ << " adding spanning " << *b << dendl;
}
unsigned n = 0;
size_t bound = 0;
- denc(struct_v, bound);
- denc_varint(0, bound);
bool must_reshard = false;
for (auto p = start;
p != extent_map.end() && p->logical_offset < end;
request_reshard(p->blob_start(), p->blob_end());
must_reshard = true;
}
- denc_varint(0, bound); // blobid
- denc_varint(0, bound); // logical_offset
- denc_varint(0, bound); // len
- denc_varint(0, bound); // blob_offset
+ if (!must_reshard) {
+ denc_varint(0, bound); // blobid
+ denc_varint(0, bound); // logical_offset
+ denc_varint(0, bound); // len
+ denc_varint(0, bound); // blob_offset
- p->blob->bound_encode(
- bound,
- struct_v,
- p->blob->shared_blob->get_sbid(),
- false);
+ p->blob->bound_encode(
+ bound,
+ struct_v,
+ p->blob->shared_blob->get_sbid(),
+ false);
+ }
}
if (must_reshard) {
return true;
}
+ denc(struct_v, bound);
+ denc_varint(0, bound); // number of extents
+
{
auto app = bl.get_contiguous_appender(bound);
denc(struct_v, app);
}
void BlueStore::ExtentMap::dirty_range(
- KeyValueDB::Transaction t,
uint32_t offset,
uint32_t length)
{
return extent_map.find(dummy);
}
-BlueStore::extent_map_t::iterator BlueStore::ExtentMap::find_lextent(
- uint64_t offset)
-{
- auto fp = seek_lextent(offset);
- if (fp != extent_map.end() && fp->logical_offset > offset)
- return extent_map.end(); // extent is past offset
- return fp;
-}
-
BlueStore::extent_map_t::iterator BlueStore::ExtentMap::seek_lextent(
uint64_t offset)
{
for (auto w : writes) {
if (b == w.b) {
auto loffs2 = P2ALIGN(w.logical_offset, min_alloc_size);
- auto loffs2_end = ROUND_UP_TO( w.logical_offset + w.length0, min_alloc_size);
+ auto loffs2_end = P2ROUNDUP(w.logical_offset + w.length0, min_alloc_size);
if ((loffs <= loffs2 && loffs_end > loffs2) ||
- (loffs >= loffs2 && loffs < loffs2_end)) {
+ (loffs >= loffs2 && loffs < loffs2_end)) {
return true;
}
}
assert(i.second); // this should be a new insertion
i.first->second.seq = seq;
blp.copy(length, i.first->second.bl);
+ i.first->second.bl.reassign_to_mempool(
+ mempool::mempool_bluestore_writing_deferred);
dout(20) << __func__ << " seq " << seq
<< " 0x" << std::hex << offset << "~" << length
<< " crc " << i.first->second.bl.crc32c(-1)
<< " 0x" << std::hex << p->first << "~" << p->second.bl.length()
<< " -> 0x" << head.length() << std::dec << dendl;
auto i = seq_bytes.find(p->second.seq);
+ assert(i != seq_bytes.end());
if (end > offset + length) {
bufferlist tail;
tail.substr_of(p->second.bl, offset + length - p->first,
} else {
i->second -= end - offset;
}
+ assert(i->second >= 0);
p->second.bl.swap(head);
}
++p;
break;
}
auto i = seq_bytes.find(p->second.seq);
+ assert(i != seq_bytes.end());
auto end = p->first + p->second.bl.length();
if (end > offset + length) {
unsigned drop_front = offset + length - p->first;
<< std::dec << dendl;
i->second -= p->second.bl.length();
}
+ assert(i->second >= 0);
p = iomap.erase(p);
}
}
void BlueStore::Collection::make_blob_shared(uint64_t sbid, BlobRef b)
{
- assert(!b->shared_blob->is_loaded());
-
ldout(store->cct, 10) << __func__ << " " << *b << dendl;
- bluestore_blob_t& blob = b->dirty_blob();
+ assert(!b->shared_blob->is_loaded());
// update blob
+ bluestore_blob_t& blob = b->dirty_blob();
blob.set_flag(bluestore_blob_t::FLAG_SHARED);
- blob.clear_flag(bluestore_blob_t::FLAG_MUTABLE);
// update shared blob
b->shared_blob->loaded = true;
ldout(store->cct, 20) << __func__ << " now " << *b << dendl;
}
+// Demote a loaded SharedBlob back to the unshared state: deregister it from
+// the collection's shared_blob_set, drop its persistent ref-map, and clear
+// its id.  Returns the old sbid -- presumably so the caller can remove the
+// corresponding shared-blob record from the KV store; verify at call sites.
+uint64_t BlueStore::Collection::make_blob_unshared(SharedBlob *sb)
+{
+ ldout(store->cct, 10) << __func__ << " " << *sb << dendl;
+ assert(sb->is_loaded());
+
+ uint64_t sbid = sb->get_sbid();
+ shared_blob_set.remove(sb);
+ sb->loaded = false;
+ delete sb->persistent;
+ sb->sbid_unloaded = 0;
+ ldout(store->cct, 20) << __func__ << " now " << *sb << dendl;
+ return sbid;
+}
+
BlueStore::OnodeRef BlueStore::Collection::get_onode(
const ghobject_t& oid,
bool create)
if (o)
return o;
- mempool::bluestore_meta_other::string key;
+ mempool::bluestore_cache_other::string key;
get_object_key(store->cct, oid, &key);
ldout(store->cct, 20) << __func__ << " oid " << oid << " key "
assert(r >= 0);
on = new Onode(this, oid, key);
on->exists = true;
- bufferptr::iterator p = v.front().begin();
+ bufferptr::iterator p = v.front().begin_deep();
on->onode.decode(p);
+ for (auto& i : on->onode.attrs) {
+ i.second.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+ }
// initialize extent_map
on->extent_map.decode_spanning_blobs(p);
if (on->onode.extent_map_shards.empty()) {
denc(on->extent_map.inline_bl, p);
on->extent_map.decode_some(on->extent_map.inline_bl);
+ on->extent_map.inline_bl.reassign_to_mempool(
+ mempool::mempool_bluestore_cache_other);
} else {
on->extent_map.init_shards(false, false);
}
continue;
}
ldout(store->cct, 20) << __func__ << " moving " << *sb << dendl;
+ if (sb->get_sbid()) {
+ ldout(store->cct, 20) << __func__
+ << " moving registration " << *sb << dendl;
+ shared_blob_set.remove(sb);
+ dest->shared_blob_set.add(dest, sb);
+ }
sb->coll = dest;
if (dest->cache != cache) {
- if (sb->get_sbid()) {
- ldout(store->cct, 20) << __func__ << " moving registration " << *sb << dendl;
- shared_blob_set.remove(sb);
- dest->shared_blob_set.add(dest, sb);
- }
for (auto& i : sb->bc.buffer_map) {
if (!i.second->is_writing()) {
ldout(store->cct, 20) << __func__ << " moving " << *i.second
}
}
}
-
-
}
}
}
-void BlueStore::Collection::trim_cache()
-{
- // see if mempool stats have updated
- uint64_t total_bytes;
- uint64_t total_onodes;
- size_t seq;
- store->get_mempool_stats(&seq, &total_bytes, &total_onodes);
- if (seq == cache->last_trim_seq) {
- ldout(store->cct, 30) << __func__ << " no new mempool stats; nothing to do"
- << dendl;
- return;
- }
- cache->last_trim_seq = seq;
-
- // trim
- if (total_onodes < 2) {
- total_onodes = 2;
- }
- float bytes_per_onode = (float)total_bytes / (float)total_onodes;
- size_t num_shards = store->cache_shards.size();
- uint64_t shard_target = store->cct->_conf->bluestore_cache_size / num_shards;
- ldout(store->cct, 30) << __func__
- << " total meta bytes " << total_bytes
- << ", total onodes " << total_onodes
- << ", bytes_per_onode " << bytes_per_onode
- << dendl;
- cache->trim(shard_target, store->cct->_conf->bluestore_cache_meta_ratio,
- bytes_per_onode);
-
- store->_update_cache_logger();
-}
-
// =======================================================
void *BlueStore::MempoolThread::entry()
{
Mutex::Locker l(lock);
while (!stop) {
- store->mempool_bytes = mempool::bluestore_meta_other::allocated_bytes() +
- mempool::bluestore_meta_onode::allocated_bytes();
- store->mempool_onodes = mempool::bluestore_meta_onode::allocated_items();
- ++store->mempool_seq;
+ uint64_t meta_bytes =
+ mempool::bluestore_cache_other::allocated_bytes() +
+ mempool::bluestore_cache_onode::allocated_bytes();
+ uint64_t onode_num =
+ mempool::bluestore_cache_onode::allocated_items();
+
+ if (onode_num < 2) {
+ onode_num = 2;
+ }
+
+ float bytes_per_onode = (float)meta_bytes / (float)onode_num;
+ size_t num_shards = store->cache_shards.size();
+ float target_ratio = store->cache_meta_ratio + store->cache_data_ratio;
+ // A little sloppy but should be close enough
+ uint64_t shard_target = target_ratio * (store->cache_size / num_shards);
+
+ for (auto i : store->cache_shards) {
+ i->trim(shard_target,
+ store->cache_meta_ratio,
+ store->cache_data_ratio,
+ bytes_per_onode);
+ }
+
+ store->_update_cache_logger();
+
utime_t wait;
wait += store->cct->_conf->bluestore_cache_trim_interval;
cond.WaitInterval(lock, wait);
// =======================================================
+// OmapIteratorImpl
+
+#undef dout_prefix
+#define dout_prefix *_dout << "bluestore.OmapIteratorImpl(" << this << ") "
+
+// Construct an omap iterator over onode 'o' in collection 'c'.  If the
+// onode has omap data, compute the head/tail DB keys bounding its omap key
+// range and position the KV iterator at the first entry.
+BlueStore::OmapIteratorImpl::OmapIteratorImpl(
+ CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
+ : c(c), o(o), it(it)
+{
+ RWLock::RLocker l(c->lock);
+ if (o->onode.has_omap()) {
+ get_omap_key(o->onode.nid, string(), &head);
+ get_omap_tail(o->onode.nid, &tail);
+ it->lower_bound(head);
+ }
+}
+
+// Reposition at the first omap entry (the precomputed head key); if the
+// onode has no omap, drop the KV iterator so valid() returns false.
+int BlueStore::OmapIteratorImpl::seek_to_first()
+{
+ RWLock::RLocker l(c->lock);
+ if (o->onode.has_omap()) {
+ it->lower_bound(head);
+ } else {
+ it = KeyValueDB::Iterator();
+ }
+ return 0;
+}
+
+// Position at the first omap entry strictly after user key 'after', by
+// translating it into the onode's DB key space first.
+int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
+{
+ RWLock::RLocker l(c->lock);
+ if (o->onode.has_omap()) {
+ string key;
+ get_omap_key(o->onode.nid, after, &key);
+ ldout(c->store->cct,20) << __func__ << " after " << after << " key "
+ << pretty_binary_string(key) << dendl;
+ it->upper_bound(key);
+ } else {
+ it = KeyValueDB::Iterator();
+ }
+ return 0;
+}
+
+// Position at the first omap entry at or after user key 'to', by
+// translating it into the onode's DB key space first.
+int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
+{
+ RWLock::RLocker l(c->lock);
+ if (o->onode.has_omap()) {
+ string key;
+ get_omap_key(o->onode.nid, to, &key);
+ ldout(c->store->cct,20) << __func__ << " to " << to << " key "
+ << pretty_binary_string(key) << dendl;
+ it->lower_bound(key);
+ } else {
+ it = KeyValueDB::Iterator();
+ }
+ return 0;
+}
+
+// True while the KV iterator exists, is valid, and has not walked past the
+// tail key bounding this onode's omap range.
+bool BlueStore::OmapIteratorImpl::valid()
+{
+ RWLock::RLocker l(c->lock);
+ bool r = o->onode.has_omap() && it && it->valid() &&
+ it->raw_key().second <= tail;
+ if (it && it->valid()) {
+ ldout(c->store->cct,20) << __func__ << " is at "
+ << pretty_binary_string(it->raw_key().second)
+ << dendl;
+ }
+ return r;
+}
+
+// Advance one entry; returns -1 when the onode has no omap at all.
+// NOTE(review): the 'validate' parameter is accepted for interface
+// compatibility but is ignored by this implementation.
+int BlueStore::OmapIteratorImpl::next(bool validate)
+{
+ RWLock::RLocker l(c->lock);
+ if (o->onode.has_omap()) {
+ it->next();
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
+// Return the current entry's user-visible key, decoded from the raw DB key.
+// Precondition (asserted): the underlying iterator is valid.
+string BlueStore::OmapIteratorImpl::key()
+{
+ RWLock::RLocker l(c->lock);
+ assert(it->valid());
+ string db_key = it->raw_key().second;
+ string user_key;
+ decode_omap_key(db_key, &user_key);
+ return user_key;
+}
+
+// Return the current entry's value.  Precondition (asserted): the
+// underlying iterator is valid.
+bufferlist BlueStore::OmapIteratorImpl::value()
+{
+ RWLock::RLocker l(c->lock);
+ assert(it->valid());
+ return it->value();
+}
+
+
+// =====================================
+
#undef dout_prefix
#define dout_prefix *_dout << "bluestore(" << path << ") "
throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
+ deferred_finisher(cct, "defered_finisher", "dfin"),
kv_sync_thread(this),
+ kv_finalize_thread(this),
mempool_thread(this)
{
_init_logger();
cct->_conf->add_observer(this);
set_cache_shards(1);
-
- if (cct->_conf->bluestore_shard_finishers) {
- m_finisher_num = cct->_conf->osd_op_num_shards;
- }
-
- for (int i = 0; i < m_finisher_num; ++i) {
- ostringstream oss;
- oss << "finisher-" << i;
- Finisher *f = new Finisher(cct, oss.str(), "finisher");
- finishers.push_back(f);
- }
}
BlueStore::BlueStore(CephContext *cct,
throttle_deferred_bytes(cct, "bluestore_throttle_deferred_bytes",
cct->_conf->bluestore_throttle_bytes +
cct->_conf->bluestore_throttle_deferred_bytes),
+ deferred_finisher(cct, "defered_finisher", "dfin"),
kv_sync_thread(this),
+ kv_finalize_thread(this),
min_alloc_size(_min_alloc_size),
min_alloc_size_order(ctz(_min_alloc_size)),
mempool_thread(this)
_init_logger();
cct->_conf->add_observer(this);
set_cache_shards(1);
-
- if (cct->_conf->bluestore_shard_finishers) {
- m_finisher_num = cct->_conf->osd_op_num_shards;
- }
-
- for (int i = 0; i < m_finisher_num; ++i) {
- ostringstream oss;
- oss << "finisher-" << i;
- Finisher *f = new Finisher(cct, oss.str(), "finisher");
- finishers.push_back(f);
- }
}
BlueStore::~BlueStore()
"bluestore_compression_max_blob_size",
"bluestore_compression_max_blob_size_ssd",
"bluestore_compression_max_blob_size_hdd",
+ "bluestore_compression_required_ratio",
"bluestore_max_alloc_size",
"bluestore_prefer_deferred_size",
- "bleustore_deferred_batch_ops",
- "bleustore_deferred_batch_ops_hdd",
- "bleustore_deferred_batch_ops_ssd",
+ "bluestore_prefer_deferred_size_hdd",
+ "bluestore_prefer_deferred_size_ssd",
+ "bluestore_deferred_batch_ops",
+ "bluestore_deferred_batch_ops_hdd",
+ "bluestore_deferred_batch_ops_ssd",
"bluestore_throttle_bytes",
"bluestore_throttle_deferred_bytes",
"bluestore_throttle_cost_per_io_hdd",
}
}
if (changed.count("bluestore_prefer_deferred_size") ||
+ changed.count("bluestore_prefer_deferred_size_hdd") ||
+ changed.count("bluestore_prefer_deferred_size_ssd") ||
changed.count("bluestore_max_alloc_size") ||
changed.count("bluestore_deferred_batch_ops") ||
changed.count("bluestore_deferred_batch_ops_hdd") ||
void BlueStore::_set_compression()
{
- if (cct->_conf->bluestore_compression_max_blob_size) {
- comp_min_blob_size = cct->_conf->bluestore_compression_max_blob_size;
+ auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
+ if (m) {
+ comp_mode = *m;
} else {
- assert(bdev);
- if (bdev->is_rotational()) {
- comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_hdd;
- } else {
- comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_ssd;
- }
- }
+ derr << __func__ << " unrecognized value '"
+ << cct->_conf->bluestore_compression_mode
+ << "' for bluestore_compression_mode, reverting to 'none'"
+ << dendl;
+ comp_mode = Compressor::COMP_NONE;
+ }
+
+ compressor = nullptr;
+
+ if (comp_mode == Compressor::COMP_NONE) {
+ dout(10) << __func__ << " compression mode set to 'none', "
+ << "ignore other compression setttings" << dendl;
+ return;
+ }
+
+ if (cct->_conf->bluestore_compression_min_blob_size) {
+ comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size;
+ } else {
+ assert(bdev);
+ if (bdev->is_rotational()) {
+ comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_hdd;
+ } else {
+ comp_min_blob_size = cct->_conf->bluestore_compression_min_blob_size_ssd;
+ }
+ }
if (cct->_conf->bluestore_compression_max_blob_size) {
comp_max_blob_size = cct->_conf->bluestore_compression_max_blob_size;
}
}
- auto m = Compressor::get_comp_mode_type(cct->_conf->bluestore_compression_mode);
- if (m) {
- comp_mode = *m;
- } else {
- derr << __func__ << " unrecognized value '"
- << cct->_conf->bluestore_compression_mode
- << "' for bluestore_compression_mode, reverting to 'none'"
- << dendl;
- comp_mode = Compressor::COMP_NONE;
- }
-
- compressor = nullptr;
-
auto& alg_name = cct->_conf->bluestore_compression_algorithm;
if (!alg_name.empty()) {
compressor = Compressor::create(cct, alg_name);
<< std::dec << dendl;
}
+// Compute the global BlueStore cache size and the meta/kv/data split
+// from config, choosing the size by backing-device type when no explicit
+// bluestore_cache_size is set. Returns 0 on success, -EINVAL when the
+// configured ratios fall outside [0, 1.0] or sum to more than 1.0.
+int BlueStore::_set_cache_sizes()
+{
+ assert(bdev);
+ if (cct->_conf->bluestore_cache_size) {
+ cache_size = cct->_conf->bluestore_cache_size;
+ } else {
+ // choose global cache size based on backend type
+ if (bdev->is_rotational()) {
+ cache_size = cct->_conf->bluestore_cache_size_hdd;
+ } else {
+ cache_size = cct->_conf->bluestore_cache_size_ssd;
+ }
+ }
+ cache_meta_ratio = cct->_conf->bluestore_cache_meta_ratio;
+ cache_kv_ratio = cct->_conf->bluestore_cache_kv_ratio;
+
+ double cache_kv_max = cct->_conf->bluestore_cache_kv_max;
+ double cache_kv_max_ratio = 0;
+
+ // if cache_kv_max is negative, disable it
+ if (cache_size > 0 && cache_kv_max >= 0) {
+ cache_kv_max_ratio = (double) cache_kv_max / (double) cache_size;
+ if (cache_kv_max_ratio < 1.0 && cache_kv_max_ratio < cache_kv_ratio) {
+ dout(1) << __func__ << " max " << cache_kv_max_ratio
+ << " < ratio " << cache_kv_ratio
+ << dendl;
+ // cap the kv share at cache_kv_max; hand the excess back to meta
+ cache_meta_ratio = cache_meta_ratio + cache_kv_ratio - cache_kv_max_ratio;
+ cache_kv_ratio = cache_kv_max_ratio;
+ }
+ }
+
+ // whatever remains after meta and kv is used for object data
+ cache_data_ratio =
+ (double)1.0 - (double)cache_meta_ratio - (double)cache_kv_ratio;
+
+ if (cache_meta_ratio < 0 || cache_meta_ratio > 1.0) {
+ derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
+ << ") must be in range [0,1.0]" << dendl;
+ return -EINVAL;
+ }
+ if (cache_kv_ratio < 0 || cache_kv_ratio > 1.0) {
+ derr << __func__ << " bluestore_cache_kv_ratio (" << cache_kv_ratio
+ << ") must be in range [0,1.0]" << dendl;
+ return -EINVAL;
+ }
+ if (cache_meta_ratio + cache_kv_ratio > 1.0) {
+ derr << __func__ << " bluestore_cache_meta_ratio (" << cache_meta_ratio
+ << ") + bluestore_cache_kv_ratio (" << cache_kv_ratio
+ << ") = " << cache_meta_ratio + cache_kv_ratio << "; must be <= 1.0"
+ << dendl;
+ return -EINVAL;
+ }
+ if (cache_data_ratio < 0) {
+ // deal with floating point imprecision
+ cache_data_ratio = 0;
+ }
+ dout(1) << __func__ << " cache_size " << cache_size
+ << " meta " << cache_meta_ratio
+ << " kv " << cache_kv_ratio
+ << " data " << cache_data_ratio
+ << dendl;
+ return 0;
+}
+
+// Store a meta key/value pair.  When the block-device label at
+// path/block is readable, mirror the pair into label.meta as well so it
+// survives alongside the device; a failed label write is fatal (assert).
+// Always also writes through to the generic ObjectStore meta store.
+int BlueStore::write_meta(const std::string& key, const std::string& value)
+{
+ bluestore_bdev_label_t label;
+ string p = path + "/block";
+ int r = _read_bdev_label(cct, p, &label);
+ if (r < 0) {
+ // no readable label: fall back to the plain ObjectStore meta file
+ return ObjectStore::write_meta(key, value);
+ }
+ label.meta[key] = value;
+ r = _write_bdev_label(cct, p, label);
+ assert(r == 0);
+ return ObjectStore::write_meta(key, value);
+}
+
+// Look up a meta key, preferring the copy embedded in the bdev label at
+// path/block; falls back to the generic ObjectStore meta store when the
+// label is unreadable or does not contain the key.  Returns 0 on success.
+int BlueStore::read_meta(const std::string& key, std::string *value)
+{
+ bluestore_bdev_label_t label;
+ string p = path + "/block";
+ int r = _read_bdev_label(cct, p, &label);
+ if (r < 0) {
+ return ObjectStore::read_meta(key, value);
+ }
+ auto i = label.meta.find(key);
+ if (i == label.meta.end()) {
+ return ObjectStore::read_meta(key, value);
+ }
+ *value = i->second;
+ return 0;
+}
+
void BlueStore::_init_logger()
{
PerfCountersBuilder b(cct, "bluestore",
b.add_u64_counter(l_bluestore_gc_merged, "bluestore_gc_merged",
"Sum for extents that have been merged due to garbage "
"collection");
+ b.add_u64_counter(l_bluestore_read_eio, "bluestore_read_eio",
+ "Read EIO errors propagated to high level callers");
logger = b.create_perf_counters();
cct->get_perfcounters_collection()->add(logger);
}
int BlueStore::_open_path()
{
+ // sanity check(s)
+ if (cct->_conf->get_val<uint64_t>("osd_max_object_size") >=
+ 4*1024*1024*1024ull) {
+ derr << __func__ << " osd_max_object_size >= 4GB; BlueStore has hard limit of 4GB." << dendl;
+ return -EINVAL;
+ }
assert(path_fd < 0);
- path_fd = ::open(path.c_str(), O_DIRECTORY);
+ path_fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_DIRECTORY));
if (path_fd < 0) {
int r = -errno;
derr << __func__ << " unable to open " << path << ": " << cpp_strerror(r)
path_fd = -1;
}
-int BlueStore::_write_bdev_label(string path, bluestore_bdev_label_t label)
+int BlueStore::_write_bdev_label(CephContext *cct,
+ string path, bluestore_bdev_label_t label)
{
dout(10) << __func__ << " path " << path << " label " << label << dendl;
bufferlist bl;
z.zero();
bl.append(std::move(z));
- int fd = ::open(path.c_str(), O_WRONLY);
+ int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_WRONLY));
if (fd < 0) {
fd = -errno;
derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
derr << __func__ << " failed to write to " << path
<< ": " << cpp_strerror(r) << dendl;
}
+ r = ::fsync(fd);
+ if (r < 0) {
+ derr << __func__ << " failed to fsync " << path
+ << ": " << cpp_strerror(r) << dendl;
+ }
VOID_TEMP_FAILURE_RETRY(::close(fd));
return r;
}
bluestore_bdev_label_t *label)
{
dout(10) << __func__ << dendl;
- int fd = ::open(path.c_str(), O_RDONLY);
+ int fd = TEMP_FAILURE_RETRY(::open(path.c_str(), O_RDONLY));
if (fd < 0) {
fd = -errno;
derr << __func__ << " failed to open " << path << ": " << cpp_strerror(fd)
::decode(expected_crc, p);
}
catch (buffer::error& e) {
- derr << __func__ << " unable to decode label at offset " << p.get_off()
+ dout(2) << __func__ << " unable to decode label at offset " << p.get_off()
<< ": " << e.what()
<< dendl;
- return -EINVAL;
+ return -ENOENT;
}
if (crc != expected_crc) {
derr << __func__ << " bad crc on label, expected " << expected_crc
label.size = size;
label.btime = ceph_clock_now();
label.description = desc;
- int r = _write_bdev_label(path, label);
+ int r = _write_bdev_label(cct, path, label);
if (r < 0)
return r;
} else {
int r = _read_bdev_label(cct, path, &label);
if (r < 0)
return r;
- if (label.osd_uuid != fsid) {
+ if (cct->_conf->bluestore_debug_permit_any_bdev_label) {
+ dout(20) << __func__ << " bdev " << path << " fsid " << label.osd_uuid
+ << " and fsid " << fsid << " check bypassed" << dendl;
+ }
+ else if (label.osd_uuid != fsid) {
derr << __func__ << " bdev " << path << " fsid " << label.osd_uuid
<< " does not match our fsid " << fsid << dendl;
return -EIO;
void BlueStore::_set_alloc_sizes(void)
{
- min_alloc_size_order = ctz(min_alloc_size);
- assert(min_alloc_size == 1u << min_alloc_size_order);
-
max_alloc_size = cct->_conf->bluestore_max_alloc_size;
if (cct->_conf->bluestore_prefer_deferred_size) {
block_mask = ~(block_size - 1);
block_size_order = ctz(block_size);
assert(block_size == 1u << block_size_order);
+ // and set cache_size based on device type
+ r = _set_cache_sizes();
+ if (r < 0) {
+ goto fail_close;
+ }
return 0;
fail_close:
bl.append(freelist_type);
t->set(PREFIX_SUPER, "freelist_type", bl);
}
- fm->create(bdev->get_size(), t);
+ // being able to allocate in units less than bdev block size
+ // seems to be a bad idea.
+ assert( cct->_conf->bdev_block_size <= (int64_t)min_alloc_size);
+ fm->create(bdev->get_size(), (int64_t)min_alloc_size, t);
// allocate superblock reserved space. note that we do not mark
// bluefs space as allocated in the freelist; we instead rely on
// bluefs_extents.
- fm->allocate(0, SUPER_RESERVED, t);
+ uint64_t reserved = ROUND_UP_TO(MAX(SUPER_RESERVED, min_alloc_size),
+ min_alloc_size);
+ fm->allocate(0, reserved, t);
- uint64_t reserved = 0;
if (cct->_conf->bluestore_bluefs) {
assert(bluefs_extents.num_intervals() == 1);
interval_set<uint64_t>::iterator p = bluefs_extents.begin();
- reserved = p.get_start() + p.get_len();
+ reserved = ROUND_UP_TO(p.get_start() + p.get_len(), min_alloc_size);
dout(20) << __func__ << " reserved 0x" << std::hex << reserved << std::dec
<< " for bluefs" << dendl;
bufferlist bl;
t->set(PREFIX_SUPER, "bluefs_extents", bl);
dout(20) << __func__ << " bluefs_extents 0x" << std::hex << bluefs_extents
<< std::dec << dendl;
- } else {
- reserved = SUPER_RESERVED;
}
if (cct->_conf->bluestore_debug_prefill > 0) {
db->submit_transaction_sync(t);
}
- int r = fm->init();
+ int r = fm->init(bdev->get_size());
if (r < 0) {
derr << __func__ << " freelist init failed: " << cpp_strerror(r) << dendl;
delete fm;
++num;
bytes += length;
}
+ fm->enumerate_reset();
dout(1) << __func__ << " loaded " << pretty_si_t(bytes)
<< " in " << num << " extents"
<< dendl;
return 0;
}
+// Report whether the main block device is rotational (HDD).  If the
+// device is already open, ask it directly; otherwise temporarily open
+// path -> fsid -> bdev just to probe, then tear everything back down.
+// Defaults to true (rotational) when any probing step fails.
+bool BlueStore::is_rotational()
+{
+ if (bdev) {
+ return bdev->is_rotational();
+ }
+
+ bool rotational = true;
+ int r = _open_path();
+ if (r < 0)
+ goto out;
+ r = _open_fsid(false);
+ if (r < 0)
+ goto out_path;
+ r = _read_fsid(&fsid);
+ if (r < 0)
+ goto out_fsid;
+ r = _lock_fsid();
+ if (r < 0)
+ goto out_fsid;
+ r = _open_bdev(false);
+ if (r < 0)
+ goto out_fsid;
+ rotational = bdev->is_rotational();
+ _close_bdev();
+ // unwind in reverse order of the opens above
+ out_fsid:
+ _close_fsid();
+ out_path:
+ _close_path();
+ out:
+ return rotational;
+}
+
+// Report whether the journal (BlueFS WAL) device is rotational.  When
+// BlueFS is disabled there is no separate journal device, so fall back
+// to the main store's media type.
+bool BlueStore::is_journal_rotational()
+{
+ if (!bluefs) {
+ dout(5) << __func__ << " bluefs disabled, default to store media type"
+ << dendl;
+ return is_rotational();
+ }
+ dout(10) << __func__ << " " << (int)bluefs->wal_is_rotational() << dendl;
+ return bluefs->wal_is_rotational();
+}
+
bool BlueStore::test_mount_in_use()
{
// most error conditions mean the mount is not in use (e.g., because
bluefs_shared_bdev = BlueFS::BDEV_SLOW;
bluefs_single_shared_device = false;
} else {
- bluefs_shared_bdev = BlueFS::BDEV_DB;
+ r = -errno;
+ if (::lstat(bfn.c_str(), &st) == -1) {
+ r = 0;
+ bluefs_shared_bdev = BlueFS::BDEV_DB;
+ } else {
+ derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+ << cpp_strerror(r) << dendl;
+ goto free_bluefs;
+ }
}
// shared device
bdev->get_size() * (cct->_conf->bluestore_bluefs_min_ratio +
cct->_conf->bluestore_bluefs_gift_ratio);
initial = MAX(initial, cct->_conf->bluestore_bluefs_min);
+ if (cct->_conf->bluefs_alloc_size % min_alloc_size) {
+ derr << __func__ << " bluefs_alloc_size 0x" << std::hex
+ << cct->_conf->bluefs_alloc_size << " is not a multiple of "
+ << "min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+ r = -EINVAL;
+ goto free_bluefs;
+ }
// align to bluefs's alloc_size
initial = P2ROUNDUP(initial, cct->_conf->bluefs_alloc_size);
- initial += cct->_conf->bluefs_alloc_size - SUPER_RESERVED;
- bluefs->add_block_extent(bluefs_shared_bdev, SUPER_RESERVED, initial);
- bluefs_extents.insert(SUPER_RESERVED, initial);
+ // put bluefs in the middle of the device in case it is an HDD
+ uint64_t start = P2ALIGN((bdev->get_size() - initial) / 2,
+ cct->_conf->bluefs_alloc_size);
+ bluefs->add_block_extent(bluefs_shared_bdev, start, initial);
+ bluefs_extents.insert(start, initial);
}
bfn = path + "/block.wal";
cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
bluefs_single_shared_device = false;
} else {
- cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+ r = -errno;
+ if (::lstat(bfn.c_str(), &st) == -1) {
+ r = 0;
+ cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
+ } else {
+ derr << __func__ << " " << bfn << " symlink exists but target unusable: "
+ << cpp_strerror(r) << dendl;
+ goto free_bluefs;
+ }
}
if (create) {
FreelistManager::setup_merge_operators(db);
db->set_merge_operator(PREFIX_STAT, merge_op);
+ db->set_cache_size(cache_size * cache_kv_ratio);
+
if (kv_backend == "rocksdb")
options = cct->_conf->bluestore_rocksdb_options;
db->init(options);
<< " > max_ratio " << cct->_conf->bluestore_bluefs_max_ratio
<< ", should reclaim " << pretty_si_t(reclaim) << dendl;
}
+
+ // don't take over too much of the freespace
+ uint64_t free_cap = cct->_conf->bluestore_bluefs_max_ratio * total_free;
if (bluefs_total < cct->_conf->bluestore_bluefs_min &&
- cct->_conf->bluestore_bluefs_min <
- (uint64_t)(cct->_conf->bluestore_bluefs_max_ratio * total_free)) {
+ cct->_conf->bluestore_bluefs_min < free_cap) {
uint64_t g = cct->_conf->bluestore_bluefs_min - bluefs_total;
dout(10) << __func__ << " bluefs_total " << bluefs_total
<< " < min " << cct->_conf->bluestore_bluefs_min
gift = g;
reclaim = 0;
}
+ uint64_t min_free = cct->_conf->get_val<uint64_t>("bluestore_bluefs_min_free");
+ if (bluefs_free < min_free &&
+ min_free < free_cap) {
+ uint64_t g = min_free - bluefs_free;
+ dout(10) << __func__ << " bluefs_free " << bluefs_total
+ << " < min " << min_free
+ << ", should gift " << pretty_si_t(g) << dendl;
+ if (g > gift)
+ gift = g;
+ reclaim = 0;
+ }
if (gift) {
// round up to alloc size
int64_t alloc_len = alloc->allocate(gift, cct->_conf->bluefs_alloc_size,
0, 0, &exts);
- if (alloc_len < (int64_t)gift) {
- derr << __func__ << " allocate failed on 0x" << std::hex << gift
- << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+ if (alloc_len <= 0) {
+ dout(1) << __func__ << " no allocate on 0x" << std::hex << gift
+ << " min_alloc_size 0x" << min_alloc_size << std::dec << dendl;
+ alloc->unreserve(gift);
+ alloc->dump();
+ return 0;
+ } else if (alloc_len < (int64_t)gift) {
+ dout(1) << __func__ << " insufficient allocate on 0x" << std::hex << gift
+ << " min_alloc_size 0x" << min_alloc_size
+ << " allocated 0x" << alloc_len
+ << std::dec << dendl;
+ alloc->unreserve(gift - alloc_len);
alloc->dump();
- assert(0 == "allocate failed, wtf");
- return -ENOSPC;
}
for (auto& p : exts) {
bluestore_pextent_t e = bluestore_pextent_t(p);
int BlueStore::_open_collections(int *errors)
{
+ dout(10) << __func__ << dendl;
assert(coll_map.empty());
KeyValueDB::Iterator it = db->get_iterator(PREFIX_COLL);
for (it->upper_bound(string());
<< pretty_binary_string(it->key()) << dendl;
return -EIO;
}
- dout(20) << __func__ << " opened " << cid << " " << c << dendl;
+ dout(20) << __func__ << " opened " << cid << " " << c
+ << " " << c->cnode << dendl;
coll_map[cid] = c;
} else {
derr << __func__ << " unrecognized collection " << it->key() << dendl;
return 0;
}
+// Load the persisted aggregate statfs counters from the kv store
+// (PREFIX_STAT / "bluestore_statfs") into vstatfs.  A missing or
+// short (corrupt) record is tolerated: vstatfs is left empty.
+void BlueStore::_open_statfs()
+{
+ bufferlist bl;
+ int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
+ if (r >= 0) {
+ // sanity-check the record is at least as large as the value block
+ if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
+ auto it = bl.begin();
+ vstatfs.decode(it);
+ } else {
+ dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
+ }
+ }
+ else {
+ dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+ }
+}
+
int BlueStore::_setup_block_symlink_or_file(
string name,
string epath,
}
if (cct->_conf->bluestore_block_preallocate_file) {
-#ifdef HAVE_POSIX_FALLOCATE
- r = ::posix_fallocate(fd, 0, size);
- if (r) {
+ r = ::ceph_posix_fallocate(fd, 0, size);
+ if (r > 0) {
derr << __func__ << " failed to prefallocate " << name << " file to "
<< size << ": " << cpp_strerror(r) << dendl;
VOID_TEMP_FAILURE_RETRY(::close(fd));
return -r;
}
-#else
- char data[1024*128];
- for (uint64_t off = 0; off < size; off += sizeof(data)) {
- if (off + sizeof(data) > size)
- r = ::write(fd, data, size - off);
- else
- r = ::write(fd, data, sizeof(data));
- if (r < 0) {
- r = -errno;
- derr << __func__ << " failed to prefallocate w/ write " << name << " file to "
- << size << ": " << cpp_strerror(r) << dendl;
- VOID_TEMP_FAILURE_RETRY(::close(fd));
- return r;
- }
- }
-#endif
}
dout(1) << __func__ << " resized " << name << " file to "
<< pretty_si_t(size) << "B" << dendl;
if (r < 0)
goto out_close_fsid;
+ // choose min_alloc_size
+ if (cct->_conf->bluestore_min_alloc_size) {
+ min_alloc_size = cct->_conf->bluestore_min_alloc_size;
+ } else {
+ assert(bdev);
+ if (bdev->is_rotational()) {
+ min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
+ } else {
+ min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
+ }
+ }
+
+ // make sure min_alloc_size is power of 2 aligned.
+ if (!ISP2(min_alloc_size)) {
+ derr << __func__ << " min_alloc_size 0x"
+ << std::hex << min_alloc_size << std::dec
+ << " is not power of 2 aligned!"
+ << dendl;
+ r = -EINVAL;
+ goto out_close_bdev;
+ }
+
r = _open_db(true);
if (r < 0)
goto out_close_bdev;
t->set(PREFIX_SUPER, "blobid_max", bl);
}
- // choose min_alloc_size
- if (cct->_conf->bluestore_min_alloc_size) {
- min_alloc_size = cct->_conf->bluestore_min_alloc_size;
- } else {
- assert(bdev);
- if (bdev->is_rotational()) {
- min_alloc_size = cct->_conf->bluestore_min_alloc_size_hdd;
- } else {
- min_alloc_size = cct->_conf->bluestore_min_alloc_size_ssd;
- }
- }
- _set_alloc_sizes();
{
bufferlist bl;
::encode((uint64_t)min_alloc_size, bl);
db->submit_transaction_sync(t);
}
- r = _open_alloc();
- if (r < 0)
- goto out_close_fm;
r = write_meta("kv_backend", cct->_conf->bluestore_kvbackend);
if (r < 0)
- goto out_close_alloc;
- r = write_meta("bluefs", stringify((int)cct->_conf->bluestore_bluefs));
+ goto out_close_fm;
+
+ r = write_meta("bluefs", stringify(bluefs ? 1 : 0));
if (r < 0)
- goto out_close_alloc;
+ goto out_close_fm;
if (fsid != old_fsid) {
r = _write_fsid();
if (r < 0) {
derr << __func__ << " error writing fsid: " << cpp_strerror(r) << dendl;
- goto out_close_alloc;
+ goto out_close_fm;
}
}
- // indicate success by writing the 'mkfs_done' file
- r = write_meta("mkfs_done", "yes");
- if (r < 0)
- goto out_close_alloc;
- dout(10) << __func__ << " success" << dendl;
-
- out_close_alloc:
- _close_alloc();
out_close_fm:
_close_fm();
out_close_db:
r = -EIO;
}
}
+
+ if (r == 0) {
+ // indicate success by writing the 'mkfs_done' file
+ r = write_meta("mkfs_done", "yes");
+ }
+
if (r < 0) {
derr << __func__ << " failed, " << cpp_strerror(r) << dendl;
+ } else {
+ dout(0) << __func__ << " success" << dendl;
}
return r;
}
{
dout(1) << __func__ << " path " << path << dendl;
+ _kv_only = kv_only;
+
{
string type;
int r = read_meta("type", &type);
goto out_coll;
}
- for (auto f : finishers) {
- f->start();
- }
- kv_sync_thread.create("bstore_kv_sync");
+ _kv_start();
r = _deferred_replay();
if (r < 0)
mempool_thread.init();
-
mounted = true;
return 0;
out_stop:
_kv_stop();
- for (auto f : finishers) {
- f->wait_for_empty();
- f->stop();
- }
out_coll:
- flush_cache();
+ _flush_cache();
out_alloc:
_close_alloc();
out_fm:
int BlueStore::umount()
{
- assert(mounted);
+ assert(_kv_only || mounted);
dout(1) << __func__ << dendl;
_osr_drain_all();
_osr_unregister_all();
- mempool_thread.shutdown();
+ mounted = false;
+ if (!_kv_only) {
+ mempool_thread.shutdown();
+ dout(20) << __func__ << " stopping kv thread" << dendl;
+ _kv_stop();
+ _flush_cache();
+ dout(20) << __func__ << " closing" << dendl;
- dout(20) << __func__ << " stopping kv thread" << dendl;
- _kv_stop();
- for (auto f : finishers) {
- dout(20) << __func__ << " draining finisher" << dendl;
- f->wait_for_empty();
- dout(20) << __func__ << " stopping finisher" << dendl;
- f->stop();
+ _close_alloc();
+ _close_fm();
}
- _reap_collections();
- flush_cache();
- dout(20) << __func__ << " closing" << dendl;
-
- mounted = false;
- _close_alloc();
- _close_fm();
_close_db();
_close_bdev();
_close_fsid();
uint64_t len,
uint64_t granularity,
BlueStore::mempool_dynamic_bitset &bitset,
- const char *what,
std::function<void(uint64_t,
BlueStore::mempool_dynamic_bitset &)> f) {
auto end = ROUND_UP_TO(off + len, granularity);
const PExtentVector& extents,
bool compressed,
mempool_dynamic_bitset &used_blocks,
+ uint64_t granularity,
store_statfs_t& expected_statfs)
{
dout(30) << __func__ << " oid " << oid << " extents " << extents << dendl;
}
bool already = false;
apply(
- e.offset, e.length, block_size, used_blocks, __func__,
+ e.offset, e.length, granularity, used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
if (bs.test(pos))
already = true;
else
return errors;
}
-int BlueStore::fsck(bool deep)
+int BlueStore::_fsck(bool deep, bool repair)
{
- dout(1) << __func__ << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
+ dout(1) << __func__
+ << (repair ? " fsck" : " repair")
+ << (deep ? " (deep)" : " (shallow)") << " start" << dendl;
int errors = 0;
- mempool::bluestore_fsck::set<uint64_t> used_nids;
- mempool::bluestore_fsck::set<uint64_t> used_omap_head;
+ int repaired = 0;
+
+ typedef btree::btree_set<
+ uint64_t,std::less<uint64_t>,
+ mempool::bluestore_fsck::pool_allocator<uint64_t>> uint64_t_btree_t;
+ uint64_t_btree_t used_nids;
+ uint64_t_btree_t used_omap_head;
+ uint64_t_btree_t used_sbids;
+
mempool_dynamic_bitset used_blocks;
- mempool::bluestore_fsck::set<uint64_t> used_sbids;
KeyValueDB::Iterator it;
store_statfs_t expected_statfs, actual_statfs;
struct sb_info_t {
mempool_thread.init();
+ // we need finishers and kv_{sync,finalize}_thread *just* for replay
+ _kv_start();
r = _deferred_replay();
+ _kv_stop();
if (r < 0)
goto out_scan;
- used_blocks.resize(bdev->get_size() / block_size);
+ used_blocks.resize(fm->get_alloc_units());
apply(
- 0, SUPER_RESERVED, block_size, used_blocks, "0~SUPER_RESERVED",
+ 0, MAX(min_alloc_size, SUPER_RESERVED), fm->get_alloc_size(), used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
bs.set(pos);
}
);
if (bluefs) {
for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
apply(
- e.get_start(), e.get_len(), block_size, used_blocks, "bluefs",
+ e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
bs.set(pos);
}
);
spg_t pgid;
mempool::bluestore_fsck::list<string> expecting_shards;
for (it->lower_bound(string()); it->valid(); it->next()) {
+ if (g_conf->bluestore_debug_fsck_abort) {
+ goto out_scan;
+ }
dout(30) << " key " << pretty_binary_string(it->key()) << dendl;
if (is_extent_shard_key(it->key())) {
while (!expecting_shards.empty() &&
expecting_shards.front() < it->key()) {
- derr << __func__ << " error: missing shard key "
+ derr << "fsck error: missing shard key "
<< pretty_binary_string(expecting_shards.front())
<< dendl;
++errors;
uint32_t offset;
string okey;
get_key_extent_shard(it->key(), &okey, &offset);
- derr << __func__ << " error: stray shard 0x" << std::hex << offset
+ derr << "fsck error: stray shard 0x" << std::hex << offset
<< std::dec << dendl;
if (expecting_shards.empty()) {
- derr << __func__ << " error: " << pretty_binary_string(it->key())
+ derr << "fsck error: " << pretty_binary_string(it->key())
<< " is unexpected" << dendl;
++errors;
continue;
}
while (expecting_shards.front() > it->key()) {
- derr << __func__ << " error: saw " << pretty_binary_string(it->key())
+ derr << "fsck error: saw " << pretty_binary_string(it->key())
<< dendl;
- derr << __func__ << " error: exp "
+ derr << "fsck error: exp "
<< pretty_binary_string(expecting_shards.front()) << dendl;
++errors;
expecting_shards.pop_front();
ghobject_t oid;
int r = get_key_object(it->key(), &oid);
if (r < 0) {
- derr << __func__ << " error: bad object key "
+ derr << "fsck error: bad object key "
<< pretty_binary_string(it->key()) << dendl;
++errors;
continue;
}
}
if (!c) {
- derr << __func__ << " error: stray object " << oid
+ derr << "fsck error: stray object " << oid
<< " not owned by any collection" << dendl;
++errors;
continue;
}
c->cid.is_pg(&pgid);
- dout(20) << __func__ << " collection " << c->cid << dendl;
+ dout(20) << __func__ << " collection " << c->cid << " " << c->cnode
+ << dendl;
}
if (!expecting_shards.empty()) {
for (auto &k : expecting_shards) {
- derr << __func__ << " error: missing shard key "
+ derr << "fsck error: missing shard key "
<< pretty_binary_string(k) << dendl;
}
++errors;
OnodeRef o = c->get_onode(oid, false);
if (o->onode.nid) {
if (o->onode.nid > nid_max) {
- derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+ derr << "fsck error: " << oid << " nid " << o->onode.nid
<< " > nid_max " << nid_max << dendl;
++errors;
}
if (used_nids.count(o->onode.nid)) {
- derr << __func__ << " error: " << oid << " nid " << o->onode.nid
+ derr << "fsck error: " << oid << " nid " << o->onode.nid
<< " already in use" << dendl;
++errors;
continue; // go for next object
get_extent_shard_key(o->key, s.shard_info->offset,
&expecting_shards.back());
if (s.shard_info->offset >= o->onode.size) {
- derr << __func__ << " error: " << oid << " shard 0x" << std::hex
+ derr << "fsck error: " << oid << " shard 0x" << std::hex
<< s.shard_info->offset << " past EOF at 0x" << o->onode.size
<< std::dec << dendl;
++errors;
for (auto& l : o->extent_map.extent_map) {
dout(20) << __func__ << " " << l << dendl;
if (l.logical_offset < pos) {
- derr << __func__ << " error: " << oid << " lextent at 0x"
+ derr << "fsck error: " << oid << " lextent at 0x"
<< std::hex << l.logical_offset
<< " overlaps with the previous, which ends at 0x" << pos
<< std::dec << dendl;
++errors;
}
if (o->extent_map.spans_shard(l.logical_offset, l.length)) {
- derr << __func__ << " error: " << oid << " lextent at 0x"
+ derr << "fsck error: " << oid << " lextent at 0x"
<< std::hex << l.logical_offset << "~" << l.length
<< " spans a shard boundary"
<< std::dec << dendl;
<< std::dec << " for " << *i.first << dendl;
const bluestore_blob_t& blob = i.first->get_blob();
if (i.second & blob.unused) {
- derr << __func__ << " error: " << oid << " blob claims unused 0x"
+ derr << "fsck error: " << oid << " blob claims unused 0x"
<< std::hex << blob.unused
<< " but extents reference 0x" << i.second
<< " on blob " << *i.first << dendl;
if ((blob.unused & mask) == mask) {
// this csum chunk region is marked unused
if (blob.get_csum_item(p) != 0) {
- derr << __func__ << " error: " << oid
+ derr << "fsck error: " << oid
<< " blob claims csum chunk 0x" << std::hex << pos
<< "~" << csum_chunk_size
<< " is unused (mask 0x" << mask << " of unused 0x"
const bluestore_blob_t& blob = i.first->get_blob();
bool equal = i.first->get_blob_use_tracker().equal(i.second);
if (!equal) {
- derr << __func__ << " error: " << oid << " blob " << *i.first
+ derr << "fsck error: " << oid << " blob " << *i.first
<< " doesn't match expected ref_map " << i.second << dendl;
++errors;
}
}
if (blob.is_shared()) {
if (i.first->shared_blob->get_sbid() > blobid_max) {
- derr << __func__ << " error: " << oid << " blob " << blob
+ derr << "fsck error: " << oid << " blob " << blob
<< " sbid " << i.first->shared_blob->get_sbid() << " > blobid_max "
<< blobid_max << dendl;
++errors;
} else if (i.first->shared_blob->get_sbid() == 0) {
- derr << __func__ << " error: " << oid << " blob " << blob
+ derr << "fsck error: " << oid << " blob " << blob
<< " marked as shared but has uninitialized sbid"
<< dendl;
++errors;
errors += _fsck_check_extents(oid, blob.get_extents(),
blob.is_compressed(),
used_blocks,
+ fm->get_alloc_size(),
expected_statfs);
}
}
int r = _do_read(c.get(), o, 0, o->onode.size, bl, 0);
if (r < 0) {
++errors;
- derr << __func__ << " error: " << oid << " error during read: "
+ derr << "fsck error: " << oid << " error during read: "
<< cpp_strerror(r) << dendl;
}
}
// omap
if (o->onode.has_omap()) {
if (used_omap_head.count(o->onode.nid)) {
- derr << __func__ << " error: " << oid << " omap_head " << o->onode.nid
+ derr << "fsck error: " << oid << " omap_head " << o->onode.nid
<< " already in use" << dendl;
++errors;
} else {
used_omap_head.insert(o->onode.nid);
}
}
- c->trim_cache();
}
}
dout(1) << __func__ << " checking shared_blobs" << dendl;
string key = it->key();
uint64_t sbid;
if (get_key_shared_blob(key, &sbid)) {
- derr << __func__ << " error: bad key '" << key
+ derr << "fsck error: bad key '" << key
<< "' in shared blob namespace" << dendl;
++errors;
continue;
}
auto p = sb_info.find(sbid);
if (p == sb_info.end()) {
- derr << __func__ << " error: found stray shared blob data for sbid 0x"
+ derr << "fsck error: found stray shared blob data for sbid 0x"
<< std::hex << sbid << std::dec << dendl;
++errors;
} else {
::decode(shared_blob, blp);
dout(20) << __func__ << " " << *sbi.sb << " " << shared_blob << dendl;
if (shared_blob.ref_map != sbi.ref_map) {
- derr << __func__ << " error: shared blob 0x" << std::hex << sbid
+ derr << "fsck error: shared blob 0x" << std::hex << sbid
<< std::dec << " ref_map " << shared_blob.ref_map
<< " != expected " << sbi.ref_map << dendl;
++errors;
errors += _fsck_check_extents(p->second.oids.front(),
extents,
p->second.compressed,
- used_blocks, expected_statfs);
+ used_blocks,
+ fm->get_alloc_size(),
+ expected_statfs);
sb_info.erase(p);
}
}
}
for (auto &p : sb_info) {
- derr << __func__ << " error: shared_blob 0x" << p.first
+ derr << "fsck error: shared_blob 0x" << p.first
<< " key is missing (" << *p.second.sb << ")" << dendl;
++errors;
}
if (!(actual_statfs == expected_statfs)) {
- derr << __func__ << " error: actual " << actual_statfs
+ derr << "fsck error: actual " << actual_statfs
<< " != expected " << expected_statfs << dendl;
++errors;
}
uint64_t omap_head;
_key_decode_u64(it->key().c_str(), &omap_head);
if (used_omap_head.count(omap_head) == 0) {
- derr << __func__ << " error: found stray omap data on omap_head "
+ derr << "fsck error: found stray omap data on omap_head "
<< omap_head << dendl;
++errors;
}
try {
::decode(wt, p);
} catch (buffer::error& e) {
- derr << __func__ << " error: failed to decode deferred txn "
+ derr << "fsck error: failed to decode deferred txn "
<< pretty_binary_string(it->key()) << dendl;
r = -EIO;
goto out_scan;
<< " released 0x" << std::hex << wt.released << std::dec << dendl;
for (auto e = wt.released.begin(); e != wt.released.end(); ++e) {
apply(
- e.get_start(), e.get_len(), block_size, used_blocks, "deferred",
+ e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
bs.set(pos);
}
);
// know they are allocated.
for (auto e = bluefs_extents.begin(); e != bluefs_extents.end(); ++e) {
apply(
- e.get_start(), e.get_len(), block_size, used_blocks, "bluefs_extents",
+ e.get_start(), e.get_len(), fm->get_alloc_size(), used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
bs.reset(pos);
}
);
while (fm->enumerate_next(&offset, &length)) {
bool intersects = false;
apply(
- offset, length, block_size, used_blocks, "free",
+ offset, length, fm->get_alloc_size(), used_blocks,
[&](uint64_t pos, mempool_dynamic_bitset &bs) {
+ assert(pos < bs.size());
if (bs.test(pos)) {
intersects = true;
} else {
}
);
if (intersects) {
- derr << __func__ << " error: free extent 0x" << std::hex << offset
- << "~" << length << std::dec
- << " intersects allocated blocks" << dendl;
- ++errors;
+ if (offset == SUPER_RESERVED &&
+ length == min_alloc_size - SUPER_RESERVED) {
+ // This is due to the change just after luminous to min_alloc_size
+ // granularity allocations, and our baked-in assumption at the top
+ // of _fsck that 0~ROUND_UP_TO(SUPER_RESERVED,min_alloc_size) is used
+ // (vs luminous's ROUND_UP_TO(SUPER_RESERVED,block_size)). Harmless,
+ // since we will never allocate this region below min_alloc_size.
+ dout(10) << __func__ << " ignoring free extent between SUPER_RESERVED"
+ << " and min_alloc_size, 0x" << std::hex << offset << "~"
+ << length << dendl;
+ } else {
+ derr << "fsck error: free extent 0x" << std::hex << offset
+ << "~" << length << std::dec
+ << " intersects allocated blocks" << dendl;
+ ++errors;
+ }
}
}
+ fm->enumerate_reset();
size_t count = used_blocks.count();
if (used_blocks.size() != count) {
assert(used_blocks.size() > count);
- derr << __func__ << " error: leaked some space;"
- << (used_blocks.size() - count) * min_alloc_size
- << " bytes leaked" << dendl;
++errors;
+ used_blocks.flip();
+ size_t start = used_blocks.find_first();
+ while (start != decltype(used_blocks)::npos) {
+ size_t cur = start;
+ while (true) {
+ size_t next = used_blocks.find_next(cur);
+ if (next != cur + 1) {
+ derr << "fsck error: leaked extent 0x" << std::hex
+ << ((uint64_t)start * fm->get_alloc_size()) << "~"
+ << ((cur + 1 - start) * fm->get_alloc_size()) << std::dec
+ << dendl;
+ start = next;
+ break;
+ }
+ cur = next;
+ }
+ }
+ used_blocks.flip();
}
}
out_scan:
mempool_thread.shutdown();
- flush_cache();
+ _flush_cache();
out_alloc:
_close_alloc();
out_fm:
<< dendl;
utime_t duration = ceph_clock_now() - start;
- dout(1) << __func__ << " finish with " << errors << " errors in "
+ dout(1) << __func__ << " finish with " << errors << " errors, " << repaired
+ << " repaired, " << (errors - repaired) << " remaining in "
<< duration << " seconds" << dendl;
- return errors;
+ return errors - repaired;
}
void BlueStore::collect_metadata(map<string,string> *pm)
buf->available = alloc->get_free();
if (bluefs) {
- // part of our shared device is "free" according to BlueFS
- // Don't include bluestore_bluefs_min because that space can't
- // be used for any other purpose.
- buf->available += bluefs->get_free(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min;
-
- // include dedicated db, too, if that isn't the shared device.
- if (bluefs_shared_bdev != BlueFS::BDEV_DB) {
- buf->total += bluefs->get_total(BlueFS::BDEV_DB);
+ // part of our shared device is "free" according to BlueFS, but we
+ // can't touch bluestore_bluefs_min of it.
+ int64_t shared_available = std::min(
+ bluefs->get_free(bluefs_shared_bdev),
+ bluefs->get_total(bluefs_shared_bdev) - cct->_conf->bluestore_bluefs_min);
+ if (shared_available > 0) {
+ buf->available += shared_available;
}
}
- bufferlist bl;
- int r = db->get(PREFIX_STAT, "bluestore_statfs", &bl);
- if (r >= 0) {
- TransContext::volatile_statfs vstatfs;
- if (size_t(bl.length()) >= sizeof(vstatfs.values)) {
- auto it = bl.begin();
- vstatfs.decode(it);
-
- buf->allocated = vstatfs.allocated();
- buf->stored = vstatfs.stored();
- buf->compressed = vstatfs.compressed();
- buf->compressed_original = vstatfs.compressed_original();
- buf->compressed_allocated = vstatfs.compressed_allocated();
- } else {
- dout(10) << __func__ << " store_statfs is corrupt, using empty" << dendl;
- }
- } else {
- dout(10) << __func__ << " store_statfs missed, using empty" << dendl;
+ {
+ std::lock_guard<std::mutex> l(vstatfs_lock);
+
+ buf->allocated = vstatfs.allocated();
+ buf->stored = vstatfs.stored();
+ buf->compressed = vstatfs.compressed();
+ buf->compressed_original = vstatfs.compressed_original();
+ buf->compressed_allocated = vstatfs.compressed_allocated();
}
-
dout(20) << __func__ << *buf << dendl;
return 0;
}
void BlueStore::_queue_reap_collection(CollectionRef& c)
{
dout(10) << __func__ << " " << c << " " << c->cid << dendl;
- std::lock_guard<std::mutex> l(reap_lock);
+ // _reap_collections runs in the same thread as this,
+ // so no lock is needed.
removed_collections.push_back(c);
}
void BlueStore::_reap_collections()
{
+
list<CollectionRef> removed_colls;
{
- std::lock_guard<std::mutex> l(reap_lock);
- removed_colls.swap(removed_collections);
+ // _queue_reap_collection runs in the same thread as this,
+ // so no lock is needed.
+ if (!removed_collections.empty())
+ removed_colls.swap(removed_collections);
+ else
+ return;
}
- bool all_reaped = true;
-
- for (list<CollectionRef>::iterator p = removed_colls.begin();
- p != removed_colls.end();
- ++p) {
+ list<CollectionRef>::iterator p = removed_colls.begin();
+ while (p != removed_colls.end()) {
CollectionRef c = *p;
dout(10) << __func__ << " " << c << " " << c->cid << dendl;
if (c->onode_map.map_any([&](OnodeRef o) {
if (o->flushing_count.load()) {
dout(10) << __func__ << " " << c << " " << c->cid << " " << o->oid
<< " flush_txns " << o->flushing_count << dendl;
- return false;
+ return true;
}
- return true;
+ return false;
})) {
- all_reaped = false;
+ ++p;
continue;
}
c->onode_map.clear();
+ p = removed_colls.erase(p);
dout(10) << __func__ << " " << c << " " << c->cid << " done" << dendl;
}
-
- if (all_reaped) {
+ if (removed_colls.empty()) {
dout(10) << __func__ << " all reaped" << dendl;
+ } else {
+ removed_collections.splice(removed_collections.begin(), removed_colls);
}
}
r = false;
}
- c->trim_cache();
return r;
}
st->st_nlink = 1;
}
- c->trim_cache();
int r = 0;
if (_debug_mdata_eio(oid)) {
r = -EIO;
uint64_t offset,
size_t length,
bufferlist& bl,
- uint32_t op_flags,
- bool allow_eio)
+ uint32_t op_flags)
{
CollectionHandle c = _get_collection(cid);
if (!c)
return -ENOENT;
- return read(c, oid, offset, length, bl, op_flags, allow_eio);
+ return read(c, oid, offset, length, bl, op_flags);
}
int BlueStore::read(
uint64_t offset,
size_t length,
bufferlist& bl,
- uint32_t op_flags,
- bool allow_eio)
+ uint32_t op_flags)
{
utime_t start = ceph_clock_now();
Collection *c = static_cast<Collection *>(c_.get());
length = o->onode.size;
r = _do_read(c, o, offset, length, bl, op_flags);
+ if (r == -EIO) {
+ logger->inc(l_bluestore_read_eio);
+ }
}
out:
- assert(allow_eio || r != -EIO);
- c->trim_cache();
- if (r == 0 && _debug_data_eio(oid)) {
+ if (r >= 0 && _debug_data_eio(oid)) {
r = -EIO;
derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
+ } else if (cct->_conf->bluestore_debug_random_read_err &&
+ (rand() % (int)(cct->_conf->bluestore_debug_random_read_err * 100.0)) == 0) {
+ dout(0) << __func__ << ": inject random EIO" << dendl;
+ r = -EIO;
}
dout(10) << __func__ << " " << cid << " " << oid
<< " 0x" << std::hex << offset << "~" << length << std::dec
uint32_t op_flags)
{
FUNCTRACE();
- boost::intrusive::set<Extent>::iterator ep, eend;
int r = 0;
dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
pos += hole;
left -= hole;
}
- BlobRef bptr = lp->blob;
+ BlobRef& bptr = lp->blob;
unsigned l_off = pos - lp->logical_offset;
unsigned b_off = l_off + lp->blob_offset;
unsigned b_len = std::min(left, lp->length - l_off);
// measure the whole block below.
// The error isn't that much...
vector<bufferlist> compressed_blob_bls;
- IOContext ioc(cct, NULL);
+ IOContext ioc(cct, NULL, true); // allow EIO
for (auto& p : blobs2read) {
- BlobRef bptr = p.first;
+ const BlobRef& bptr = p.first;
dout(20) << __func__ << " blob " << *bptr << std::hex
<< " need " << p.second << std::dec << dendl;
if (bptr->get_blob().is_compressed()) {
return r;
return 0;
});
+ if (r < 0) {
+ derr << __func__ << " bdev-read failed: " << cpp_strerror(r) << dendl;
+ if (r == -EIO) {
+ // propagate EIO to caller
+ return r;
+ }
assert(r == 0);
+ }
} else {
// read the pieces
for (auto& reg : p.second) {
return r;
return 0;
});
- assert(r == 0);
+ if (r < 0) {
+ derr << __func__ << " bdev-read failed: " << cpp_strerror(r)
+ << dendl;
+ if (r == -EIO) {
+ // propagate EIO to caller
+ return r;
+ }
+ assert(r == 0);
+ }
assert(reg.bl.length() == r_len);
}
}
bdev->aio_submit(&ioc);
dout(20) << __func__ << " waiting for aio" << dendl;
ioc.aio_wait();
+ r = ioc.get_return_value();
+ if (r < 0) {
+ assert(r == -EIO); // no other errors allowed
+ return -EIO;
+ }
}
logger->tinc(l_bluestore_read_wait_aio_lat, ceph_clock_now() - start);
auto p = compressed_blob_bls.begin();
blobs2read_t::iterator b2r_it = blobs2read.begin();
while (b2r_it != blobs2read.end()) {
- BlobRef bptr = b2r_it->first;
+ const BlobRef& bptr = b2r_it->first;
dout(20) << __func__ << " blob " << *bptr << std::hex
<< " need 0x" << b2r_it->second << std::dec << dendl;
if (bptr->get_blob().is_compressed()) {
}
out:
- c->trim_cache();
dout(20) << __func__ << " 0x" << std::hex << offset << "~" << length
<< " size = 0x(" << destset << ")" << std::dec << dendl;
return 0;
int r;
{
RWLock::RLocker l(c->lock);
- mempool::bluestore_meta_other::string k(name);
+ mempool::bluestore_cache_other::string k(name);
OnodeRef o = c->get_onode(oid, false);
if (!o || !o->exists) {
r = 0;
}
out:
- c->trim_cache();
if (r == 0 && _debug_mdata_eio(oid)) {
r = -EIO;
derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
}
out:
- c->trim_cache();
if (r == 0 && _debug_mdata_eio(oid)) {
r = -EIO;
derr << __func__ << " " << c->cid << " " << oid << " INJECT EIO" << dendl;
r = _collection_list(c, start, end, max, ls, pnext);
}
- c->trim_cache();
dout(10) << __func__ << " " << c->cid
<< " start " << start << " end " << end << " max " << max
<< " = " << r << ", ls.size() = " << ls->size()
return r;
}
-// omap reads
-
-BlueStore::OmapIteratorImpl::OmapIteratorImpl(
- CollectionRef c, OnodeRef o, KeyValueDB::Iterator it)
- : c(c), o(o), it(it)
-{
- RWLock::RLocker l(c->lock);
- if (o->onode.has_omap()) {
- get_omap_key(o->onode.nid, string(), &head);
- get_omap_tail(o->onode.nid, &tail);
- it->lower_bound(head);
- }
-}
-
-int BlueStore::OmapIteratorImpl::seek_to_first()
-{
- RWLock::RLocker l(c->lock);
- if (o->onode.has_omap()) {
- it->lower_bound(head);
- } else {
- it = KeyValueDB::Iterator();
- }
- return 0;
-}
-
-int BlueStore::OmapIteratorImpl::upper_bound(const string& after)
-{
- RWLock::RLocker l(c->lock);
- if (o->onode.has_omap()) {
- string key;
- get_omap_key(o->onode.nid, after, &key);
- it->upper_bound(key);
- } else {
- it = KeyValueDB::Iterator();
- }
- return 0;
-}
-
-int BlueStore::OmapIteratorImpl::lower_bound(const string& to)
-{
- RWLock::RLocker l(c->lock);
- if (o->onode.has_omap()) {
- string key;
- get_omap_key(o->onode.nid, to, &key);
- it->lower_bound(key);
- } else {
- it = KeyValueDB::Iterator();
- }
- return 0;
-}
-
-bool BlueStore::OmapIteratorImpl::valid()
-{
- RWLock::RLocker l(c->lock);
- return o->onode.has_omap() && it && it->valid() && it->raw_key().second <= tail;
-}
-
-int BlueStore::OmapIteratorImpl::next(bool validate)
-{
- RWLock::RLocker l(c->lock);
- if (o->onode.has_omap()) {
- it->next();
- return 0;
- } else {
- return -1;
- }
-}
-
-string BlueStore::OmapIteratorImpl::key()
-{
- RWLock::RLocker l(c->lock);
- assert(it->valid());
- string db_key = it->raw_key().second;
- string user_key;
- decode_omap_key(db_key, &user_key);
- return user_key;
-}
-
-bufferlist BlueStore::OmapIteratorImpl::value()
-{
- RWLock::RLocker l(c->lock);
- assert(it->valid());
- return it->value();
-}
-
int BlueStore::omap_get(
const coll_t& cid, ///< [in] Collection containing oid
const ghobject_t &oid, ///< [in] Object containing omap
uint64_t val;
::decode(val, p);
min_alloc_size = val;
+ min_alloc_size_order = ctz(val);
+ assert(min_alloc_size == 1u << min_alloc_size_order);
} catch (buffer::error& e) {
derr << __func__ << " unable to read min_alloc_size" << dendl;
return -EIO;
dout(10) << __func__ << " min_alloc_size 0x" << std::hex << min_alloc_size
<< std::dec << dendl;
}
+ _open_statfs();
_set_alloc_sizes();
_set_throttle_params();
void BlueStore::_assign_nid(TransContext *txc, OnodeRef o)
{
- if (o->onode.nid)
+ if (o->onode.nid) {
+ assert(o->exists);
return;
+ }
uint64_t nid = ++nid_last;
dout(20) << __func__ << " " << nid << dendl;
o->onode.nid = nid;
txc->last_nid = nid;
+ o->exists = true;
}
uint64_t BlueStore::_assign_blobid(TransContext *txc)
logger->inc(l_bluestore_compressed_allocated, txc->statfs_delta.compressed_allocated());
logger->inc(l_bluestore_compressed_original, txc->statfs_delta.compressed_original());
+ {
+ std::lock_guard<std::mutex> l(vstatfs_lock);
+ vstatfs += txc->statfs_delta;
+ }
+
bufferlist bl;
txc->statfs_delta.encode(bl);
<< dendl;
} else {
txc->state = TransContext::STATE_KV_SUBMITTED;
- int r = db->submit_transaction(txc->t);
+ int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
assert(r == 0);
_txc_applied_kv(txc);
}
kv_queue_unsubmitted.push_back(txc);
++txc->osr->kv_committing_serially;
}
+ if (txc->had_ios)
+ kv_ios++;
+ kv_throttle_costs += txc->cost;
}
return;
case TransContext::STATE_KV_SUBMITTED:
std::lock_guard<std::mutex> l(osr->qlock);
txc->state = TransContext::STATE_IO_DONE;
+ // release aio contexts (including pinned buffers).
+ txc->ioc.running_aios.clear();
+
OpSequencer::q_list_t::iterator p = osr->q.iterator_to(*txc);
while (p != osr->q.begin()) {
--p;
bufferlist bl;
::encode(*(sb->persistent), bl);
dout(20) << " shared_blob 0x" << std::hex << sbid << std::dec
- << " is " << bl.length() << dendl;
+ << " is " << bl.length() << " " << *sb << dendl;
t->set(PREFIX_SHARED_BLOB, key, bl);
}
}
}
OpSequencerRef osr = txc->osr;
- CollectionRef c;
bool empty = false;
+ bool submit_deferred = false;
OpSequencer::q_list_t releasing_txc;
{
std::lock_guard<std::mutex> l(osr->qlock);
// for _osr_drain_preceding()
notify = true;
}
+ if (txc->state == TransContext::STATE_DEFERRED_QUEUED &&
+ osr->q.size() > g_conf->bluestore_max_deferred_txc) {
+ submit_deferred = true;
+ }
break;
}
- if (!c && txc->first_collection) {
- c = txc->first_collection;
- }
osr->q.pop_front();
releasing_txc.push_back(*txc);
notify = true;
delete txc;
}
- if (c) {
- c->trim_cache();
+ if (submit_deferred) {
+ // We're pinning memory; flush! We could be more fine-grained here,
+ // but it's probably not worth the bother.
+ deferred_try_submit();
}
-
if (empty && osr->zombie) {
dout(10) << __func__ << " reaping empty zombie osr " << osr << dendl;
osr->_unregister();
{
// update allocator with full released set
if (!cct->_conf->bluestore_debug_no_reuse_blocks) {
- dout(10) << __func__ << " " << txc << " " << txc->released << dendl;
+ dout(10) << __func__ << " " << txc << " " << std::hex
+ << txc->released << std::dec << dendl;
for (interval_set<uint64_t>::iterator p = txc->released.begin();
p != txc->released.end();
++p) {
++deferred_aggressive; // FIXME: maybe osr-local aggressive flag?
{
// submit anything pending
- std::lock_guard<std::mutex> l(deferred_lock);
+ deferred_lock.lock();
if (osr->deferred_pending) {
- _deferred_submit(osr);
+ _deferred_submit_unlock(osr);
+ } else {
+ deferred_lock.unlock();
}
}
{
++deferred_aggressive;
{
// submit anything pending
- std::lock_guard<std::mutex> l(deferred_lock);
- _deferred_try_submit();
+ deferred_try_submit();
}
{
// wake up any previously finished deferred events
std::lock_guard<std::mutex> l(kv_lock);
kv_cond.notify_one();
}
+ {
+ std::lock_guard<std::mutex> l(kv_finalize_lock);
+ kv_finalize_cond.notify_one();
+ }
for (auto osr : s) {
dout(20) << __func__ << " drain " << osr << dendl;
osr->drain();
}
}
+void BlueStore::_kv_start()
+{
+ dout(10) << __func__ << dendl;
+
+ if (cct->_conf->bluestore_shard_finishers) {
+ if (cct->_conf->osd_op_num_shards) {
+ m_finisher_num = cct->_conf->osd_op_num_shards;
+ } else {
+ assert(bdev);
+ if (bdev->is_rotational()) {
+ m_finisher_num = cct->_conf->osd_op_num_shards_hdd;
+ } else {
+ m_finisher_num = cct->_conf->osd_op_num_shards_ssd;
+ }
+ }
+ }
+
+ assert(m_finisher_num != 0);
+
+ for (int i = 0; i < m_finisher_num; ++i) {
+ ostringstream oss;
+ oss << "finisher-" << i;
+ Finisher *f = new Finisher(cct, oss.str(), "finisher");
+ finishers.push_back(f);
+ }
+
+ deferred_finisher.start();
+ for (auto f : finishers) {
+ f->start();
+ }
+ kv_sync_thread.create("bstore_kv_sync");
+ kv_finalize_thread.create("bstore_kv_final");
+}
+
+void BlueStore::_kv_stop()
+{
+ dout(10) << __func__ << dendl;
+ {
+ std::unique_lock<std::mutex> l(kv_lock);
+ while (!kv_sync_started) {
+ kv_cond.wait(l);
+ }
+ kv_stop = true;
+ kv_cond.notify_all();
+ }
+ {
+ std::unique_lock<std::mutex> l(kv_finalize_lock);
+ while (!kv_finalize_started) {
+ kv_finalize_cond.wait(l);
+ }
+ kv_finalize_stop = true;
+ kv_finalize_cond.notify_all();
+ }
+ kv_sync_thread.join();
+ kv_finalize_thread.join();
+ assert(removed_collections.empty());
+ {
+ std::lock_guard<std::mutex> l(kv_lock);
+ kv_stop = false;
+ }
+ {
+ std::lock_guard<std::mutex> l(kv_finalize_lock);
+ kv_finalize_stop = false;
+ }
+ dout(10) << __func__ << " stopping finishers" << dendl;
+ deferred_finisher.wait_for_empty();
+ deferred_finisher.stop();
+ for (auto f : finishers) {
+ f->wait_for_empty();
+ f->stop();
+ }
+ dout(10) << __func__ << " stopped" << dendl;
+}
+
void BlueStore::_kv_sync_thread()
{
dout(10) << __func__ << " start" << dendl;
std::unique_lock<std::mutex> l(kv_lock);
+ assert(!kv_sync_started);
+ kv_sync_started = true;
+ kv_cond.notify_all();
while (true) {
assert(kv_committing.empty());
if (kv_queue.empty() &&
} else {
deque<TransContext*> kv_submitting;
deque<DeferredBatch*> deferred_done, deferred_stable;
+ uint64_t aios = 0, costs = 0;
+
dout(20) << __func__ << " committing " << kv_queue.size()
<< " submitting " << kv_queue_unsubmitted.size()
<< " deferred done " << deferred_done_queue.size()
kv_submitting.swap(kv_queue_unsubmitted);
deferred_done.swap(deferred_done_queue);
deferred_stable.swap(deferred_stable_queue);
+ aios = kv_ios;
+ costs = kv_throttle_costs;
+ kv_ios = 0;
+ kv_throttle_costs = 0;
utime_t start = ceph_clock_now();
l.unlock();
dout(30) << __func__ << " deferred_done " << deferred_done << dendl;
dout(30) << __func__ << " deferred_stable " << deferred_stable << dendl;
- int num_aios = 0;
- for (auto txc : kv_committing) {
- if (txc->had_ios) {
- ++num_aios;
- }
- }
-
bool force_flush = false;
// if bluefs is sharing the same device as data (only), then we
// can rely on the bluefs commit to flush the device and make
// deferred aios stable. that means that if we do have done deferred
// txcs AND we are not on a single device, we need to force a flush.
if (bluefs_single_shared_device && bluefs) {
- if (num_aios) {
+ if (aios) {
force_flush = true;
} else if (kv_committing.empty() && kv_submitting.empty() &&
deferred_stable.empty()) {
force_flush = true;
if (force_flush) {
- dout(20) << __func__ << " num_aios=" << num_aios
+ dout(20) << __func__ << " num_aios=" << aios
<< " force_flush=" << (int)force_flush
<< ", flushing, deferred done->stable" << dendl;
// flush/barrier on block device
t->set(PREFIX_SUPER, "blobid_max", bl);
dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
}
- for (auto txc : kv_submitting) {
- assert(txc->state == TransContext::STATE_KV_QUEUED);
- txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
- int r = db->submit_transaction(txc->t);
- assert(r == 0);
- _txc_applied_kv(txc);
- --txc->osr->kv_committing_serially;
- txc->state = TransContext::STATE_KV_SUBMITTED;
- if (txc->osr->kv_submitted_waiters) {
- std::lock_guard<std::mutex> l(txc->osr->qlock);
- if (txc->osr->_is_all_kv_submitted()) {
- txc->osr->qcond.notify_all();
+
+ for (auto txc : kv_committing) {
+ if (txc->state == TransContext::STATE_KV_QUEUED) {
+ txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
+ int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction(txc->t);
+ assert(r == 0);
+ _txc_applied_kv(txc);
+ --txc->osr->kv_committing_serially;
+ txc->state = TransContext::STATE_KV_SUBMITTED;
+ if (txc->osr->kv_submitted_waiters) {
+ std::lock_guard<std::mutex> l(txc->osr->qlock);
+ if (txc->osr->_is_all_kv_submitted()) {
+ txc->osr->qcond.notify_all();
+ }
}
+
+ } else {
+ assert(txc->state == TransContext::STATE_KV_SUBMITTED);
+ txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
}
- }
- for (auto txc : kv_committing) {
if (txc->had_ios) {
--txc->osr->txc_with_unstable_io;
}
- txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
- // release throttle *before* we commit. this allows new ops
- // to be prepared and enter pipeline while we are waiting on
- // the kv commit sync/flush. then hopefully on the next
- // iteration there will already be ops awake. otherwise, we
- // end up going to sleep, and then wake up when the very first
- // transaction is ready for commit.
- throttle_bytes.put(txc->cost);
}
+ // release throttle *before* we commit. this allows new ops
+ // to be prepared and enter pipeline while we are waiting on
+ // the kv commit sync/flush. then hopefully on the next
+ // iteration there will already be ops awake. otherwise, we
+ // end up going to sleep, and then wake up when the very first
+ // transaction is ready for commit.
+ throttle_bytes.put(costs);
+
PExtentVector bluefs_gift_extents;
if (bluefs &&
after_flush - bluefs_last_balance >
}
// submit synct synchronously (block and wait for it to commit)
- int r = db->submit_transaction_sync(synct);
+ int r = cct->_conf->bluestore_debug_omit_kv_commit ? 0 : db->submit_transaction_sync(synct);
assert(r == 0);
if (new_nid_max) {
dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
}
- utime_t finish = ceph_clock_now();
- utime_t dur_flush = after_flush - start;
- utime_t dur_kv = finish - after_flush;
- utime_t dur = finish - start;
- dout(20) << __func__ << " committed " << kv_committing.size()
- << " cleaned " << deferred_stable.size()
- << " in " << dur
- << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
- << dendl;
- if (logger) {
+ {
+ utime_t finish = ceph_clock_now();
+ utime_t dur_flush = after_flush - start;
+ utime_t dur_kv = finish - after_flush;
+ utime_t dur = finish - start;
+ dout(20) << __func__ << " committed " << kv_committing.size()
+ << " cleaned " << deferred_stable.size()
+ << " in " << dur
+ << " (" << dur_flush << " flush + " << dur_kv << " kv commit)"
+ << dendl;
logger->tinc(l_bluestore_kv_flush_lat, dur_flush);
logger->tinc(l_bluestore_kv_commit_lat, dur_kv);
logger->tinc(l_bluestore_kv_lat, dur);
}
- while (!kv_committing.empty()) {
- TransContext *txc = kv_committing.front();
+
+ if (bluefs) {
+ if (!bluefs_gift_extents.empty()) {
+ _commit_bluefs_freespace(bluefs_gift_extents);
+ }
+ for (auto p = bluefs_extents_reclaiming.begin();
+ p != bluefs_extents_reclaiming.end();
+ ++p) {
+ dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
+ << p.get_start() << "~" << p.get_len() << std::dec
+ << dendl;
+ alloc->release(p.get_start(), p.get_len());
+ }
+ bluefs_extents_reclaiming.clear();
+ }
+
+ {
+ std::unique_lock<std::mutex> m(kv_finalize_lock);
+ if (kv_committing_to_finalize.empty()) {
+ kv_committing_to_finalize.swap(kv_committing);
+ } else {
+ kv_committing_to_finalize.insert(
+ kv_committing_to_finalize.end(),
+ kv_committing.begin(),
+ kv_committing.end());
+ kv_committing.clear();
+ }
+ if (deferred_stable_to_finalize.empty()) {
+ deferred_stable_to_finalize.swap(deferred_stable);
+ } else {
+ deferred_stable_to_finalize.insert(
+ deferred_stable_to_finalize.end(),
+ deferred_stable.begin(),
+ deferred_stable.end());
+ deferred_stable.clear();
+ }
+ kv_finalize_cond.notify_one();
+ }
+
+ l.lock();
+ // previously deferred "done" are now "stable" by virtue of this
+ // commit cycle.
+ deferred_stable_queue.swap(deferred_done);
+ }
+ }
+ dout(10) << __func__ << " finish" << dendl;
+ kv_sync_started = false;
+}
+
+void BlueStore::_kv_finalize_thread()
+{
+ deque<TransContext*> kv_committed;
+ deque<DeferredBatch*> deferred_stable;
+ dout(10) << __func__ << " start" << dendl;
+ std::unique_lock<std::mutex> l(kv_finalize_lock);
+ assert(!kv_finalize_started);
+ kv_finalize_started = true;
+ kv_finalize_cond.notify_all();
+ while (true) {
+ assert(kv_committed.empty());
+ assert(deferred_stable.empty());
+ if (kv_committing_to_finalize.empty() &&
+ deferred_stable_to_finalize.empty()) {
+ if (kv_finalize_stop)
+ break;
+ dout(20) << __func__ << " sleep" << dendl;
+ kv_finalize_cond.wait(l);
+ dout(20) << __func__ << " wake" << dendl;
+ } else {
+ kv_committed.swap(kv_committing_to_finalize);
+ deferred_stable.swap(deferred_stable_to_finalize);
+ l.unlock();
+ dout(20) << __func__ << " kv_committed " << kv_committed << dendl;
+ dout(20) << __func__ << " deferred_stable " << deferred_stable << dendl;
+
+ while (!kv_committed.empty()) {
+ TransContext *txc = kv_committed.front();
assert(txc->state == TransContext::STATE_KV_SUBMITTED);
_txc_state_proc(txc);
- kv_committing.pop_front();
+ kv_committed.pop_front();
}
+
for (auto b : deferred_stable) {
auto p = b->txcs.begin();
while (p != b->txcs.end()) {
}
delete b;
}
+ deferred_stable.clear();
if (!deferred_aggressive) {
- std::lock_guard<std::mutex> l(deferred_lock);
- if (deferred_queue_size >= deferred_batch_ops ||
+ if (deferred_queue_size >= deferred_batch_ops.load() ||
throttle_deferred_bytes.past_midpoint()) {
- _deferred_try_submit();
+ deferred_try_submit();
}
}
// this is as good a place as any ...
_reap_collections();
- if (bluefs) {
- if (!bluefs_gift_extents.empty()) {
- _commit_bluefs_freespace(bluefs_gift_extents);
- }
- for (auto p = bluefs_extents_reclaiming.begin();
- p != bluefs_extents_reclaiming.end();
- ++p) {
- dout(20) << __func__ << " releasing old bluefs 0x" << std::hex
- << p.get_start() << "~" << p.get_len() << std::dec
- << dendl;
- alloc->release(p.get_start(), p.get_len());
- }
- bluefs_extents_reclaiming.clear();
- }
-
l.lock();
- // previously deferred "done" are now "stable" by virtue of this
- // commit cycle.
- deferred_stable_queue.swap(deferred_done);
}
}
dout(10) << __func__ << " finish" << dendl;
+ kv_finalize_started = false;
}
bluestore_deferred_op_t *BlueStore::_get_deferred_op(
void BlueStore::_deferred_queue(TransContext *txc)
{
dout(20) << __func__ << " txc " << txc << " osr " << txc->osr << dendl;
- std::lock_guard<std::mutex> l(deferred_lock);
+ deferred_lock.lock();
if (!txc->osr->deferred_pending &&
!txc->osr->deferred_running) {
deferred_queue.push_back(*txc->osr);
}
if (deferred_aggressive &&
!txc->osr->deferred_running) {
- _deferred_submit(txc->osr.get());
+ _deferred_submit_unlock(txc->osr.get());
+ } else {
+ deferred_lock.unlock();
}
}
-void BlueStore::_deferred_try_submit()
+void BlueStore::deferred_try_submit()
{
dout(20) << __func__ << " " << deferred_queue.size() << " osrs, "
<< deferred_queue_size << " txcs" << dendl;
+ std::lock_guard<std::mutex> l(deferred_lock);
+ vector<OpSequencerRef> osrs;
+ osrs.reserve(deferred_queue.size());
for (auto& osr : deferred_queue) {
- if (!osr.deferred_running) {
- _deferred_submit(&osr);
+ osrs.push_back(&osr);
+ }
+ for (auto& osr : osrs) {
+ if (osr->deferred_pending) {
+ if (!osr->deferred_running) {
+ _deferred_submit_unlock(osr.get());
+ deferred_lock.lock();
+ } else {
+ dout(20) << __func__ << " osr " << osr << " already has running"
+ << dendl;
+ }
+ } else {
+ dout(20) << __func__ << " osr " << osr << " has no pending" << dendl;
}
}
}
-void BlueStore::_deferred_submit(OpSequencer *osr)
+void BlueStore::_deferred_submit_unlock(OpSequencer *osr)
{
dout(10) << __func__ << " osr " << osr
<< " " << osr->deferred_pending->iomap.size() << " ios pending "
bl.claim_append(i->second.bl);
++i;
}
+
+ deferred_lock.unlock();
bdev->aio_submit(&b->ioc);
}
+struct C_DeferredTrySubmit : public Context {
+ BlueStore *store;
+ C_DeferredTrySubmit(BlueStore *s) : store(s) {}
+ void finish(int r) {
+ store->deferred_try_submit();
+ }
+};
+
void BlueStore::_deferred_aio_finish(OpSequencer *osr)
{
dout(10) << __func__ << " osr " << osr << dendl;
assert(osr->deferred_running == b);
osr->deferred_running = nullptr;
if (!osr->deferred_pending) {
+ dout(20) << __func__ << " dequeueing" << dendl;
auto q = deferred_queue.iterator_to(*osr);
deferred_queue.erase(q);
} else if (deferred_aggressive) {
- _deferred_submit(osr);
+ dout(20) << __func__ << " queuing async deferred_try_submit" << dendl;
+ deferred_finisher.queue(new C_DeferredTrySubmit(this));
+ } else {
+ dout(20) << __func__ << " leaving queued, more pending" << dendl;
}
}
{
+ uint64_t costs = 0;
std::lock_guard<std::mutex> l2(osr->qlock);
for (auto& i : b->txcs) {
TransContext *txc = &i;
txc->state = TransContext::STATE_DEFERRED_CLEANUP;
- txc->osr->qcond.notify_all();
- throttle_deferred_bytes.put(txc->cost);
+ costs += txc->cost;
}
+ osr->qcond.notify_all();
+ throttle_deferred_bytes.put(costs);
std::lock_guard<std::mutex> l(kv_lock);
deferred_done_queue.emplace_back(b);
}
if (txc->deferred_txn) {
// ensure we do not block here because of deferred writes
if (!throttle_deferred_bytes.get_or_fail(txc->cost)) {
+ dout(10) << __func__ << " failed get throttle_deferred_bytes, aggressive"
+ << dendl;
+ ++deferred_aggressive;
deferred_try_submit();
+ {
+ // wake up any previously finished deferred events
+ std::lock_guard<std::mutex> l(kv_lock);
+ kv_cond.notify_one();
+ }
throttle_deferred_bytes.get(txc->cost);
- }
+ --deferred_aggressive;
+ }
}
utime_t tend = ceph_clock_now();
for (vector<coll_t>::iterator p = i.colls.begin(); p != i.colls.end();
++p, ++j) {
cvec[j] = _get_collection(*p);
-
- // note first collection we reference
- if (!txc->first_collection)
- txc->first_collection = cvec[j];
}
vector<OnodeRef> ovec(i.objects.size());
case Transaction::OP_TRUNCATE:
{
uint64_t off = op->off;
- _truncate(txc, c, o, off);
+ r = _truncate(txc, c, o, off);
}
break;
{
dout(15) << __func__ << " " << c->cid << " " << o->oid << dendl;
int r = 0;
- o->exists = true;
_assign_nid(txc, o);
txc->write_onode(o);
dout(10) << __func__ << " " << c->cid << " " << o->oid << " = " << r << dendl;
return r;
}
-void BlueStore::_dump_onode(OnodeRef o, int log_level)
+void BlueStore::_dump_onode(const OnodeRef& o, int log_level)
{
if (!cct->_conf->subsys.should_gather(ceph_subsys_bluestore, log_level))
return;
if (front_pad) {
size_t front_copy = MIN(chunk_size - front_pad, length);
bufferptr z = buffer::create_page_aligned(chunk_size);
- memset(z.c_str(), 0, front_pad);
+ z.zero(0, front_pad, false);
pad_count += front_pad;
- memcpy(z.c_str() + front_pad, bl->get_contiguous(0, front_copy), front_copy);
+ bl->copy(0, front_copy, z.c_str() + front_pad);
if (front_copy + front_pad < chunk_size) {
back_pad = chunk_size - (length + front_pad);
- memset(z.c_str() + front_pad + length, 0, back_pad);
+ z.zero(front_pad + length, back_pad, false);
pad_count += back_pad;
}
bufferlist old, t;
bl->append(z);
bl->claim_append(t);
*offset -= front_pad;
- length += front_pad + back_pad;
+ length += pad_count;
}
// back
back_pad = chunk_size - back_copy;
assert(back_copy <= length);
bufferptr tail(chunk_size);
- memcpy(tail.c_str(), bl->get_contiguous(length - back_copy, back_copy),
- back_copy);
- memset(tail.c_str() + back_copy, 0, back_pad);
+ bl->copy(length - back_copy, back_copy, tail.c_str());
+ tail.zero(back_copy, back_pad, false);
bufferlist old;
old.swap(*bl);
bl->substr_of(old, 0, length - back_copy);
// search suitable extent in both forward and reverse direction in
// [offset - target_max_blob_size, offset + target_max_blob_size] range
- // then check if blob can be reused via try_reuse_blob func or apply
+ // then check if blob can be reused via can_reuse_blob func or apply
// direct/deferred write (the latter for extents including or higher
// than 'offset' only).
do {
b->get_blob().get_ondisk_length() >= b_off + b_len &&
b->get_blob().is_unused(b_off, b_len) &&
b->get_blob().is_allocated(b_off, b_len)) {
- bufferlist padded;
- _apply_padding(head_pad, tail_pad, bl, padded);
+ _apply_padding(head_pad, tail_pad, bl);
dout(20) << __func__ << " write to unused 0x" << std::hex
<< b_off << "~" << b_len
<< " pad 0x" << head_pad << " + 0x" << tail_pad
<< std::dec << " of mutable " << *b << dendl;
- _buffer_cache_write(txc, b, b_off, padded,
+ _buffer_cache_write(txc, b, b_off, bl,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
if (!g_conf->bluestore_debug_omit_block_device_write) {
op->extents.emplace_back(bluestore_pextent_t(offset, length));
return 0;
});
- op->data = padded;
+ op->data = bl;
} else {
b->get_blob().map_bl(
- b_off, padded,
+ b_off, bl,
[&](uint64_t offset, bufferlist& t) {
bdev->aio_write(offset, t,
&txc->ioc, wctx->buffered);
});
}
}
- b->dirty_blob().calc_csum(b_off, padded);
+ b->dirty_blob().calc_csum(b_off, bl);
dout(20) << __func__ << " lex old " << *ep << dendl;
Extent *le = o->extent_map.set_lextent(c, offset, b_off + head_pad, length,
b,
b_len % chunk_size == 0 &&
b->get_blob().is_allocated(b_off, b_len)) {
- bufferlist padded;
- _apply_padding(head_pad, tail_pad, bl, padded);
+ _apply_padding(head_pad, tail_pad, bl);
dout(20) << __func__ << " reading head 0x" << std::hex << head_read
<< " and tail 0x" << tail_read << std::dec << dendl;
head_bl.append_zero(zlen);
logger->inc(l_bluestore_write_pad_bytes, zlen);
}
- head_bl.claim_append(padded);
- padded.swap(head_bl);
+ bl.claim_prepend(head_bl);
logger->inc(l_bluestore_write_penalty_read_ops);
}
if (tail_read) {
tail_bl.append_zero(zlen);
logger->inc(l_bluestore_write_pad_bytes, zlen);
}
- padded.claim_append(tail_bl);
+ bl.claim_append(tail_bl);
logger->inc(l_bluestore_write_penalty_read_ops);
}
logger->inc(l_bluestore_write_small_pre_read);
bluestore_deferred_op_t *op = _get_deferred_op(txc, o);
op->op = bluestore_deferred_op_t::OP_WRITE;
- _buffer_cache_write(txc, b, b_off, padded,
+ _buffer_cache_write(txc, b, b_off, bl,
wctx->buffered ? 0 : Buffer::FLAG_NOCACHE);
int r = b->get_blob().map(
});
assert(r == 0);
if (b->get_blob().csum_type) {
- b->dirty_blob().calc_csum(b_off, padded);
+ b->dirty_blob().calc_csum(b_off, bl);
}
- op->data.claim(padded);
+ op->data.claim(bl);
dout(20) << __func__ << " deferred write 0x" << std::hex << b_off << "~"
<< b_len << std::dec << " of mutable " << *b
<< " at " << op->extents << dendl;
logger->inc(l_bluestore_write_small_deferred);
return;
}
- //try to reuse blob
- if (b->try_reuse_blob(min_alloc_size,
+ // try to reuse blob if we can
+ if (b->can_reuse_blob(min_alloc_size,
max_bsize,
offset0 - bstart,
&alloc_len)) {
_pad_zeros(&bl, &b_off0, chunk_size);
dout(20) << __func__ << " reuse blob " << *b << std::hex
- << " (" << b_off0 << "~" << bl.length() << ")"
- << " (" << b_off << "~" << length << ")"
+ << " (0x" << b_off0 << "~" << bl.length() << ")"
+ << " (0x" << b_off << "~" << length << ")"
<< std::dec << dendl;
o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
auto bstart = prev_ep->blob_start();
dout(20) << __func__ << " considering " << *b
<< " bstart 0x" << std::hex << bstart << std::dec << dendl;
- if (b->try_reuse_blob(min_alloc_size,
+ if (b->can_reuse_blob(min_alloc_size,
max_bsize,
offset0 - bstart,
&alloc_len)) {
_pad_zeros(&bl, &b_off0, chunk_size);
dout(20) << __func__ << " reuse blob " << *b << std::hex
- << " (" << b_off0 << "~" << bl.length() << ")"
- << " (" << b_off << "~" << length << ")"
+ << " (0x" << b_off0 << "~" << bl.length() << ")"
+ << " (0x" << b_off << "~" << length << ")"
<< std::dec << dendl;
o->extent_map.punch_hole(c, offset, length, &wctx->old_extents);
auto min_off = offset >= max_bsize ? offset - max_bsize : 0;
// search suitable extent in both forward and reverse direction in
// [offset - target_max_blob_size, offset + target_max_blob_size] range
- // then check if blob can be reused via try_reuse_blob func.
+ // then check if blob can be reused via can_reuse_blob func.
bool any_change;
do {
any_change = false;
if (ep != end && ep->logical_offset < offset + max_bsize) {
if (offset >= ep->blob_start() &&
- ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+ ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
offset - ep->blob_start(),
&l)) {
b = ep->blob;
b_off = offset - ep->blob_start();
prev_ep = end; // to avoid check below
dout(20) << __func__ << " reuse blob " << *b << std::hex
- << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+ << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
} else {
++ep;
any_change = true;
}
if (prev_ep != end && prev_ep->logical_offset >= min_off) {
- if (prev_ep->blob->try_reuse_blob(min_alloc_size, max_bsize,
+ if (prev_ep->blob->can_reuse_blob(min_alloc_size, max_bsize,
offset - prev_ep->blob_start(),
&l)) {
b = prev_ep->blob;
b_off = offset - prev_ep->blob_start();
dout(20) << __func__ << " reuse blob " << *b << std::hex
- << " (" << b_off << "~" << l << ")" << std::dec << dendl;
+ << " (0x" << b_off << "~" << l << ")" << std::dec << dendl;
} else if (prev_ep != begin) {
--prev_ep;
any_change = true;
dout(20) << __func__ << " txc " << txc
<< " " << wctx->writes.size() << " blobs"
<< dendl;
-
- uint64_t need = 0;
- auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
- for (auto &wi : wctx->writes) {
- need += wi.blob_length;
- }
- int r = alloc->reserve(need);
- if (r < 0) {
- derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
- << dendl;
- return r;
+ if (wctx->writes.empty()) {
+ return 0;
}
- uint64_t hint = 0;
CompressorRef c;
double crr = 0;
if (wctx->compress) {
cct->_conf->bluestore_compression_required_ratio,
[&]() {
double val;
- if(coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
+ if (coll->pool_opts.get(pool_opts_t::COMPRESSION_REQUIRED_RATIO, &val)) {
return boost::optional<double>(val);
}
return boost::optional<double>();
csum,
[&]() {
int val;
- if(coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
+ if (coll->pool_opts.get(pool_opts_t::CSUM_TYPE, &val)) {
return boost::optional<int>(val);
}
return boost::optional<int>();
}
);
+ // compress (as needed) and calc needed space
+ uint64_t need = 0;
+ auto max_bsize = MAX(wctx->target_blob_size, min_alloc_size);
for (auto& wi : wctx->writes) {
- BlobRef b = wi.b;
- bluestore_blob_t& dblob = b->dirty_blob();
- uint64_t b_off = wi.b_off;
- bufferlist *l = &wi.bl;
- uint64_t final_length = wi.blob_length;
- uint64_t csum_length = wi.blob_length;
- unsigned csum_order = block_size_order;
- bufferlist compressed_bl;
- bool compressed = false;
- if(c && wi.blob_length > min_alloc_size) {
-
+ if (c && wi.blob_length > min_alloc_size) {
utime_t start = ceph_clock_now();
// compress
- assert(b_off == 0);
- assert(wi.blob_length == l->length());
- bluestore_compression_header_t chdr;
- chdr.type = c->get_type();
+ assert(wi.b_off == 0);
+ assert(wi.blob_length == wi.bl.length());
+
// FIXME: memory alignment here is bad
bufferlist t;
-
- r = c->compress(*l, t);
+ int r = c->compress(wi.bl, t);
assert(r == 0);
+ bluestore_compression_header_t chdr;
+ chdr.type = c->get_type();
chdr.length = t.length();
- ::encode(chdr, compressed_bl);
- compressed_bl.claim_append(t);
- uint64_t rawlen = compressed_bl.length();
- uint64_t newlen = P2ROUNDUP(rawlen, min_alloc_size);
- uint64_t want_len_raw = final_length * crr;
+ ::encode(chdr, wi.compressed_bl);
+ wi.compressed_bl.claim_append(t);
+
+ wi.compressed_len = wi.compressed_bl.length();
+ uint64_t newlen = P2ROUNDUP(wi.compressed_len, min_alloc_size);
+ uint64_t want_len_raw = wi.blob_length * crr;
uint64_t want_len = P2ROUNDUP(want_len_raw, min_alloc_size);
- if (newlen <= want_len && newlen < final_length) {
- // Cool. We compressed at least as much as we were hoping to.
- // pad out to min_alloc_size
- compressed_bl.append_zero(newlen - rawlen);
- logger->inc(l_bluestore_write_pad_bytes, newlen - rawlen);
+ if (newlen <= want_len && newlen < wi.blob_length) {
+ // Cool. We compressed at least as much as we were hoping to.
+ // pad out to min_alloc_size
+ wi.compressed_bl.append_zero(newlen - wi.compressed_len);
+ logger->inc(l_bluestore_write_pad_bytes, newlen - wi.compressed_len);
dout(20) << __func__ << std::hex << " compressed 0x" << wi.blob_length
- << " -> 0x" << rawlen << " => 0x" << newlen
+ << " -> 0x" << wi.compressed_len << " => 0x" << newlen
<< " with " << c->get_type()
<< std::dec << dendl;
- txc->statfs_delta.compressed() += rawlen;
- txc->statfs_delta.compressed_original() += l->length();
+ txc->statfs_delta.compressed() += wi.compressed_len;
+ txc->statfs_delta.compressed_original() += wi.blob_length;
txc->statfs_delta.compressed_allocated() += newlen;
- l = &compressed_bl;
- final_length = newlen;
- csum_length = newlen;
- csum_order = ctz(newlen);
- dblob.set_compressed(wi.blob_length, rawlen);
- compressed = true;
- logger->inc(l_bluestore_compress_success_count);
+ logger->inc(l_bluestore_compress_success_count);
+ wi.compressed = true;
+ need += newlen;
} else {
- dout(20) << __func__ << std::hex << " 0x" << l->length()
- << " compressed to 0x" << rawlen << " -> 0x" << newlen
- << " with " << c->get_type()
- << ", which is more than required 0x" << want_len_raw
+ dout(20) << __func__ << std::hex << " 0x" << wi.blob_length
+ << " compressed to 0x" << wi.compressed_len << " -> 0x" << newlen
+ << " with " << c->get_type()
+ << ", which is more than required 0x" << want_len_raw
<< " -> 0x" << want_len
- << ", leaving uncompressed"
- << std::dec << dendl;
- logger->inc(l_bluestore_compress_rejected_count);
+ << ", leaving uncompressed"
+ << std::dec << dendl;
+ logger->inc(l_bluestore_compress_rejected_count);
+ need += wi.blob_length;
}
logger->tinc(l_bluestore_compress_lat,
ceph_clock_now() - start);
+ } else {
+ need += wi.blob_length;
}
- if (!compressed && wi.new_blob) {
- // initialize newly created blob only
- assert(!dblob.has_flag(bluestore_blob_t::FLAG_MUTABLE));
- dblob.set_flag(bluestore_blob_t::FLAG_MUTABLE);
+ }
+ int r = alloc->reserve(need);
+ if (r < 0) {
+ derr << __func__ << " failed to reserve 0x" << std::hex << need << std::dec
+ << dendl;
+ return r;
+ }
+ AllocExtentVector prealloc;
+ prealloc.reserve(2 * wctx->writes.size());
+ int prealloc_left = 0;
+ prealloc_left = alloc->allocate(
+ need, min_alloc_size, need,
+ 0, &prealloc);
+ assert(prealloc_left == (int64_t)need);
+ dout(20) << __func__ << " prealloc " << prealloc << dendl;
+ auto prealloc_pos = prealloc.begin();
+ for (auto& wi : wctx->writes) {
+ BlobRef b = wi.b;
+ bluestore_blob_t& dblob = b->dirty_blob();
+ uint64_t b_off = wi.b_off;
+ bufferlist *l = &wi.bl;
+ uint64_t final_length = wi.blob_length;
+ uint64_t csum_length = wi.blob_length;
+ unsigned csum_order = block_size_order;
+ if (wi.compressed) {
+ final_length = wi.compressed_bl.length();
+ csum_length = final_length;
+ csum_order = ctz(csum_length);
+ l = &wi.compressed_bl;
+ dblob.set_compressed(wi.blob_length, wi.compressed_len);
+ } else if (wi.new_blob) {
+ // initialize newly created blob only
+ assert(dblob.is_mutable());
if (l->length() != wi.blob_length) {
// hrm, maybe we could do better here, but let's not bother.
dout(20) << __func__ << " forcing csum_order to block_size_order "
<< block_size_order << dendl;
- csum_order = block_size_order;
+ csum_order = block_size_order;
} else {
csum_order = std::min(wctx->csum_order, ctz(l->length()));
}
if ((suggested_boff % (1 << csum_order)) == 0 &&
suggested_boff + final_length <= max_bsize &&
suggested_boff > b_off) {
- dout(20) << __func__ << " forcing blob_offset to "
+ dout(20) << __func__ << " forcing blob_offset to 0x"
<< std::hex << suggested_boff << std::dec << dendl;
assert(suggested_boff >= b_off);
csum_length += suggested_boff - b_off;
b_off = suggested_boff;
}
+ if (csum != Checksummer::CSUM_NONE) {
+ dout(20) << __func__ << " initialize csum setting for new blob " << *b
+ << " csum_type " << Checksummer::get_csum_type_string(csum)
+ << " csum_order " << csum_order
+ << " csum_length 0x" << std::hex << csum_length << std::dec
+ << dendl;
+ dblob.init_csum(csum, csum_order, csum_length);
+ }
}
AllocExtentVector extents;
- extents.reserve(4); // 4 should be (more than) enough for most allocations
- int64_t got = alloc->allocate(final_length, min_alloc_size,
- max_alloc_size.load(),
- hint, &extents);
- assert(got == (int64_t)final_length);
- need -= got;
- txc->statfs_delta.allocated() += got;
+ int64_t left = final_length;
+ while (left > 0) {
+ assert(prealloc_left > 0);
+ if (prealloc_pos->length <= left) {
+ prealloc_left -= prealloc_pos->length;
+ left -= prealloc_pos->length;
+ txc->statfs_delta.allocated() += prealloc_pos->length;
+ extents.push_back(*prealloc_pos);
+ ++prealloc_pos;
+ } else {
+ extents.emplace_back(prealloc_pos->offset, left);
+ prealloc_pos->offset += left;
+ prealloc_pos->length -= left;
+ prealloc_left -= left;
+ txc->statfs_delta.allocated() += left;
+ left = 0;
+ break;
+ }
+ }
for (auto& p : extents) {
- bluestore_pextent_t e = bluestore_pextent_t(p);
- txc->allocated.insert(e.offset, e.length);
- hint = p.end();
+ txc->allocated.insert(p.offset, p.length);
}
dblob.allocated(P2ALIGN(b_off, min_alloc_size), final_length, extents);
- dout(20) << __func__ << " blob " << *b
- << " csum_type " << Checksummer::get_csum_type_string(csum)
- << " csum_order " << csum_order
- << " csum_length 0x" << std::hex << csum_length << std::dec
- << dendl;
-
- if (csum != Checksummer::CSUM_NONE) {
- if (!dblob.has_csum()) {
- dblob.init_csum(csum, csum_order, csum_length);
- }
+ dout(20) << __func__ << " blob " << *b << dendl;
+ if (dblob.has_csum()) {
dblob.calc_csum(b_off, *l);
}
+
if (wi.mark_unused) {
auto b_end = b_off + wi.bl.length();
if (b_off) {
}
}
}
- if (need > 0) {
- alloc->unreserve(need);
- }
+ assert(prealloc_pos == prealloc.end());
+ assert(prealloc_left == 0);
return 0;
}
TransContext *txc,
CollectionRef& c,
OnodeRef o,
- WriteContext *wctx)
+ WriteContext *wctx,
+ set<SharedBlob*> *maybe_unshared_blobs)
{
auto oep = wctx->old_extents.begin();
while (oep != wctx->old_extents.end()) {
PExtentVector final;
c->load_shared_blob(b->shared_blob);
for (auto e : r) {
- b->shared_blob->put_ref(e.offset, e.length, &final);
+ b->shared_blob->put_ref(
+ e.offset, e.length, &final,
+ b->is_referenced() ? nullptr : maybe_unshared_blobs);
}
dout(20) << __func__ << " shared_blob release " << final
<< " from " << *b->shared_blob << dendl;
}
}
-int BlueStore::_do_write(
- TransContext *txc,
- CollectionRef& c,
- OnodeRef o,
- uint64_t offset,
- uint64_t length,
- bufferlist& bl,
- uint32_t fadvise_flags)
+void BlueStore::_choose_write_options(
+ CollectionRef& c,
+ OnodeRef o,
+ uint32_t fadvise_flags,
+ WriteContext *wctx)
{
- int r = 0;
-
- dout(20) << __func__
- << " " << o->oid
- << " 0x" << std::hex << offset << "~" << length
- << " - have 0x" << o->onode.size
- << " (" << std::dec << o->onode.size << ")"
- << " bytes"
- << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
- << dendl;
- _dump_onode(o);
-
- if (length == 0) {
- return 0;
- }
-
- uint64_t end = offset + length;
- bool was_gc = false;
- GarbageCollector gc(c->store->cct);
- int64_t benefit;
- auto dirty_start = offset;
- auto dirty_end = offset + length;
-
- WriteContext wctx, wctx_gc;
if (fadvise_flags & CEPH_OSD_OP_FLAG_FADVISE_WILLNEED) {
dout(20) << __func__ << " will do buffered write" << dendl;
- wctx.buffered = true;
+ wctx->buffered = true;
} else if (cct->_conf->bluestore_default_buffered_write &&
(fadvise_flags & (CEPH_OSD_OP_FLAG_FADVISE_DONTNEED |
CEPH_OSD_OP_FLAG_FADVISE_NOCACHE)) == 0) {
dout(20) << __func__ << " defaulting to buffered write" << dendl;
- wctx.buffered = true;
+ wctx->buffered = true;
}
- // FIXME: Using the MAX of the block_size_order and preferred_csum_order
- // results in poor small random read performance when data was initially
- // written out in large chunks. Reverting to previous behavior for now.
- wctx.csum_order = block_size_order;
+ // apply basic csum block size
+ wctx->csum_order = block_size_order;
// compression parameters
unsigned alloc_hints = o->onode.alloc_hint_flags;
auto cm = select_option(
"compression_mode",
- comp_mode.load(),
+ comp_mode.load(),
[&]() {
string val;
if(c->pool_opts.get(pool_opts_t::COMPRESSION_MODE, &val)) {
- return boost::optional<Compressor::CompressionMode>(Compressor::get_comp_mode_type(val));
+ return boost::optional<Compressor::CompressionMode>(
+ Compressor::get_comp_mode_type(val));
}
return boost::optional<Compressor::CompressionMode>();
}
);
- wctx.compress = (cm != Compressor::COMP_NONE) &&
+
+ wctx->compress = (cm != Compressor::COMP_NONE) &&
((cm == Compressor::COMP_FORCE) ||
(cm == Compressor::COMP_AGGRESSIVE &&
(alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_INCOMPRESSIBLE) == 0) ||
if ((alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_READ) &&
(alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_READ) == 0 &&
- (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE|
- CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
+ (alloc_hints & (CEPH_OSD_ALLOC_HINT_FLAG_IMMUTABLE |
+ CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY)) &&
(alloc_hints & CEPH_OSD_ALLOC_HINT_FLAG_RANDOM_WRITE) == 0) {
+
dout(20) << __func__ << " will prefer large blob and csum sizes" << dendl;
- auto order = min_alloc_size_order.load();
+
if (o->onode.expected_write_size) {
- wctx.csum_order = std::max(order,
- (uint8_t)ctz(o->onode.expected_write_size));
+ wctx->csum_order = std::max(min_alloc_size_order,
+ (uint8_t)ctz(o->onode.expected_write_size));
} else {
- wctx.csum_order = order;
+ wctx->csum_order = min_alloc_size_order;
}
- if (wctx.compress) {
- wctx.target_blob_size = select_option(
+ if (wctx->compress) {
+ wctx->target_blob_size = select_option(
"compression_max_blob_size",
- comp_max_blob_size.load(),
+ comp_max_blob_size.load(),
[&]() {
int val;
if(c->pool_opts.get(pool_opts_t::COMPRESSION_MAX_BLOB_SIZE, &val)) {
);
}
} else {
- if (wctx.compress) {
- wctx.target_blob_size = select_option(
+ if (wctx->compress) {
+ wctx->target_blob_size = select_option(
"compression_min_blob_size",
- comp_min_blob_size.load(),
+ comp_min_blob_size.load(),
[&]() {
int val;
if(c->pool_opts.get(pool_opts_t::COMPRESSION_MIN_BLOB_SIZE, &val)) {
);
}
}
+
uint64_t max_bsize = max_blob_size.load();
- if (wctx.target_blob_size == 0 || wctx.target_blob_size > max_bsize) {
- wctx.target_blob_size = max_bsize;
+ if (wctx->target_blob_size == 0 || wctx->target_blob_size > max_bsize) {
+ wctx->target_blob_size = max_bsize;
}
+
// set the min blob size floor at 2x the min_alloc_size, or else we
// won't be able to allocate a smaller extent for the compressed
// data.
- if (wctx.compress &&
- wctx.target_blob_size < min_alloc_size * 2) {
- wctx.target_blob_size = min_alloc_size * 2;
+ if (wctx->compress &&
+ wctx->target_blob_size < min_alloc_size * 2) {
+ wctx->target_blob_size = min_alloc_size * 2;
}
+
+ dout(20) << __func__ << " prefer csum_order " << wctx->csum_order
+ << " target_blob_size 0x" << std::hex << wctx->target_blob_size
+ << std::dec << dendl;
+}
+
+int BlueStore::_do_gc(
+ TransContext *txc,
+ CollectionRef& c,
+ OnodeRef o,
+ const GarbageCollector& gc,
+ const WriteContext& wctx,
+ uint64_t *dirty_start,
+ uint64_t *dirty_end)
+{
+ auto& extents_to_collect = gc.get_extents_to_collect();
+
+ WriteContext wctx_gc;
wctx_gc.fork(wctx); // make a clone for garbage collection
- dout(20) << __func__ << " prefer csum_order " << wctx.csum_order
- << " target_blob_size 0x" << std::hex << wctx.target_blob_size
- << std::dec << dendl;
+ for (auto it = extents_to_collect.begin();
+ it != extents_to_collect.end();
+ ++it) {
+ bufferlist bl;
+ int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
+ assert(r == (int)it->length);
+
+ o->extent_map.fault_range(db, it->offset, it->length);
+ _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
+ logger->inc(l_bluestore_gc_merged, it->length);
+
+ if (*dirty_start > it->offset) {
+ *dirty_start = it->offset;
+ }
+
+ if (*dirty_end < it->offset + it->length) {
+ *dirty_end = it->offset + it->length;
+ }
+ }
+
+ dout(30) << __func__ << " alloc write" << dendl;
+ int r = _do_alloc_write(txc, c, o, &wctx_gc);
+ if (r < 0) {
+ derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+
+ _wctx_finish(txc, c, o, &wctx_gc);
+ return 0;
+}
+
+int BlueStore::_do_write(
+ TransContext *txc,
+ CollectionRef& c,
+ OnodeRef o,
+ uint64_t offset,
+ uint64_t length,
+ bufferlist& bl,
+ uint32_t fadvise_flags)
+{
+ int r = 0;
+
+ dout(20) << __func__
+ << " " << o->oid
+ << " 0x" << std::hex << offset << "~" << length
+ << " - have 0x" << o->onode.size
+ << " (" << std::dec << o->onode.size << ")"
+ << " bytes"
+ << " fadvise_flags 0x" << std::hex << fadvise_flags << std::dec
+ << dendl;
+ _dump_onode(o);
+
+ if (length == 0) {
+ return 0;
+ }
+
+ uint64_t end = offset + length;
+
+ GarbageCollector gc(c->store->cct);
+ int64_t benefit;
+ auto dirty_start = offset;
+ auto dirty_end = end;
+
+ WriteContext wctx;
+ _choose_write_options(c, o, fadvise_flags, &wctx);
o->extent_map.fault_range(db, offset, length);
_do_write_data(txc, c, o, offset, length, bl, &wctx);
-
r = _do_alloc_write(txc, c, o, &wctx);
if (r < 0) {
derr << __func__ << " _do_alloc_write failed with " << cpp_strerror(r)
goto out;
}
+ // NB: _wctx_finish() will empty old_extents
+ // so we must do gc estimation before that
benefit = gc.estimate(offset,
- length,
- o->extent_map,
- wctx.old_extents,
- min_alloc_size);
+ length,
+ o->extent_map,
+ wctx.old_extents,
+ min_alloc_size);
_wctx_finish(txc, c, o, &wctx);
if (end > o->onode.size) {
dout(20) << __func__ << " extending size to 0x" << std::hex << end
- << std::dec << dendl;
+ << std::dec << dendl;
o->onode.size = end;
}
if (benefit >= g_conf->bluestore_gc_enable_total_threshold) {
- dout(20) << __func__ << " perform garbage collection, expected benefit = "
- << benefit << " AUs" << dendl;
- auto& extents_to_collect = gc.get_extents_to_collect();
- for (auto it = extents_to_collect.begin();
- it != extents_to_collect.end();
- ++it) {
- bufferlist bl;
- int r = _do_read(c.get(), o, it->offset, it->length, bl, 0);
- assert(r == (int)it->length);
- o->extent_map.fault_range(db, it->offset, it->length);
- _do_write_data(txc, c, o, it->offset, it->length, bl, &wctx_gc);
- logger->inc(l_bluestore_gc_merged, it->length);
- was_gc = true;
- if (dirty_start > it->offset) {
- dirty_start = it->offset;
- }
- if (dirty_end < it->offset + it->length) {
- dirty_end = it->offset + it->length;
+ if (!gc.get_extents_to_collect().empty()) {
+ dout(20) << __func__ << " perform garbage collection, "
+ << "expected benefit = " << benefit << " AUs" << dendl;
+ r = _do_gc(txc, c, o, gc, wctx, &dirty_start, &dirty_end);
+ if (r < 0) {
+ derr << __func__ << " _do_gc failed with " << cpp_strerror(r)
+ << dendl;
+ goto out;
}
}
}
- if (was_gc) {
- dout(30) << __func__ << " alloc write for GC" << dendl;
- r = _do_alloc_write(txc, c, o, &wctx_gc);
- if (r < 0) {
- derr << __func__ << " _do_alloc_write(gc) failed with " << cpp_strerror(r)
- << dendl;
- goto out;
- }
- _wctx_finish(txc, c, o, &wctx_gc);
- }
o->extent_map.compress_extent_map(dirty_start, dirty_end - dirty_start);
- o->extent_map.dirty_range(txc->t, dirty_start, dirty_end - dirty_start);
+ o->extent_map.dirty_range(dirty_start, dirty_end - dirty_start);
+
r = 0;
out:
int BlueStore::_write(TransContext *txc,
CollectionRef& c,
OnodeRef& o,
- uint64_t offset, size_t length,
- bufferlist& bl,
- uint32_t fadvise_flags)
+ uint64_t offset, size_t length,
+ bufferlist& bl,
+ uint32_t fadvise_flags)
{
dout(15) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< dendl;
- o->exists = true;
- _assign_nid(txc, o);
- int r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
- txc->write_onode(o);
-
+ int r = 0;
+ if (offset + length >= OBJECT_MAX_SIZE) {
+ r = -E2BIG;
+ } else {
+ _assign_nid(txc, o);
+ r = _do_write(txc, c, o, offset, length, bl, fadvise_flags);
+ txc->write_onode(o);
+ }
dout(10) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< " = " << r << dendl;
dout(15) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< dendl;
- o->exists = true;
- _assign_nid(txc, o);
- int r = _do_zero(txc, c, o, offset, length);
+ int r = 0;
+ if (offset + length >= OBJECT_MAX_SIZE) {
+ r = -E2BIG;
+ } else {
+ _assign_nid(txc, o);
+ r = _do_zero(txc, c, o, offset, length);
+ }
dout(10) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< " = " << r << dendl;
WriteContext wctx;
o->extent_map.fault_range(db, offset, length);
o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
- o->extent_map.dirty_range(txc->t, offset, length);
+ o->extent_map.dirty_range(offset, length);
_wctx_finish(txc, c, o, &wctx);
- if (offset + length > o->onode.size) {
+ if (length > 0 && offset + length > o->onode.size) {
o->onode.size = offset + length;
dout(20) << __func__ << " extending size to " << offset + length
<< dendl;
}
void BlueStore::_do_truncate(
- TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset)
+ TransContext *txc, CollectionRef& c, OnodeRef o, uint64_t offset,
+ set<SharedBlob*> *maybe_unshared_blobs)
{
dout(15) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << std::dec << dendl;
_dump_onode(o, 30);
if (offset == o->onode.size)
- return ;
+ return;
if (offset < o->onode.size) {
WriteContext wctx;
uint64_t length = o->onode.size - offset;
o->extent_map.fault_range(db, offset, length);
o->extent_map.punch_hole(c, offset, length, &wctx.old_extents);
- o->extent_map.dirty_range(txc->t, offset, length);
- _wctx_finish(txc, c, o, &wctx);
+ o->extent_map.dirty_range(offset, length);
+ _wctx_finish(txc, c, o, &wctx, maybe_unshared_blobs);
// if we have shards past EOF, ask for a reshard
if (!o->onode.extent_map_shards.empty() &&
txc->write_onode(o);
}
-void BlueStore::_truncate(TransContext *txc,
+int BlueStore::_truncate(TransContext *txc,
CollectionRef& c,
OnodeRef& o,
uint64_t offset)
dout(15) << __func__ << " " << c->cid << " " << o->oid
<< " 0x" << std::hex << offset << std::dec
<< dendl;
- _do_truncate(txc, c, o, offset);
+ int r = 0;
+ if (offset >= OBJECT_MAX_SIZE) {
+ r = -E2BIG;
+ } else {
+ _do_truncate(txc, c, o, offset);
+ }
+ dout(10) << __func__ << " " << c->cid << " " << o->oid
+ << " 0x" << std::hex << offset << std::dec
+ << " = " << r << dendl;
+ return r;
}
int BlueStore::_do_remove(
CollectionRef& c,
OnodeRef o)
{
- _do_truncate(txc, c, o, 0);
+ set<SharedBlob*> maybe_unshared_blobs;
+ bool is_gen = !o->oid.is_no_gen();
+ _do_truncate(txc, c, o, 0, is_gen ? &maybe_unshared_blobs : nullptr);
if (o->onode.has_omap()) {
o->flush();
_do_omap_clear(txc, o->onode.nid);
o->extent_map.clear();
o->onode = bluestore_onode_t();
_debug_obj_on_delete(o->oid);
+
+ if (!is_gen || maybe_unshared_blobs.empty()) {
+ return 0;
+ }
+
+ // see if we can unshare blobs still referenced by the head
+ dout(10) << __func__ << " gen and maybe_unshared_blobs "
+ << maybe_unshared_blobs << dendl;
+ ghobject_t nogen = o->oid;
+ nogen.generation = ghobject_t::NO_GEN;
+ OnodeRef h = c->onode_map.lookup(nogen);
+
+ if (!h || !h->exists) {
+ return 0;
+ }
+
+ dout(20) << __func__ << " checking for unshareable blobs on " << h
+ << " " << h->oid << dendl;
+ map<SharedBlob*,bluestore_extent_ref_map_t> expect;
+ for (auto& e : h->extent_map.extent_map) {
+ const bluestore_blob_t& b = e.blob->get_blob();
+ SharedBlob *sb = e.blob->shared_blob.get();
+ if (b.is_shared() &&
+ sb->loaded &&
+ maybe_unshared_blobs.count(sb)) {
+ if (b.is_compressed()) {
+ expect[sb].get(0, b.get_ondisk_length());
+ } else {
+ b.map(e.blob_offset, e.length, [&](uint64_t off, uint64_t len) {
+ expect[sb].get(off, len);
+ return 0;
+ });
+ }
+ }
+ }
+
+ vector<SharedBlob*> unshared_blobs;
+ unshared_blobs.reserve(maybe_unshared_blobs.size());
+ for (auto& p : expect) {
+ dout(20) << " ? " << *p.first << " vs " << p.second << dendl;
+ if (p.first->persistent->ref_map == p.second) {
+ SharedBlob *sb = p.first;
+ dout(20) << __func__ << " unsharing " << *sb << dendl;
+ unshared_blobs.push_back(sb);
+ txc->unshare_blob(sb);
+ uint64_t sbid = c->make_blob_unshared(sb);
+ string key;
+ get_shared_blob_key(sbid, &key);
+ txc->t->rmkey(PREFIX_SHARED_BLOB, key);
+ }
+ }
+
+ if (unshared_blobs.empty()) {
+ return 0;
+ }
+
+ for (auto& e : h->extent_map.extent_map) {
+ const bluestore_blob_t& b = e.blob->get_blob();
+ SharedBlob *sb = e.blob->shared_blob.get();
+ if (b.is_shared() &&
+ std::find(unshared_blobs.begin(), unshared_blobs.end(),
+ sb) != unshared_blobs.end()) {
+ dout(20) << __func__ << " unsharing " << e << dendl;
+ bluestore_blob_t& blob = e.blob->dirty_blob();
+ blob.clear_flag(bluestore_blob_t::FLAG_SHARED);
+ h->extent_map.dirty_range(e.logical_offset, 1);
+ }
+ }
+ txc->write_onode(h);
+
return 0;
}
<< " " << name << " (" << val.length() << " bytes)"
<< dendl;
int r = 0;
- if (val.is_partial())
- o->onode.attrs[name.c_str()] = bufferptr(val.c_str(), val.length());
- else
- o->onode.attrs[name.c_str()] = val;
+ if (val.is_partial()) {
+ auto& b = o->onode.attrs[name.c_str()] = bufferptr(val.c_str(),
+ val.length());
+ b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+ } else {
+ auto& b = o->onode.attrs[name.c_str()] = val;
+ b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+ }
txc->write_onode(o);
dout(10) << __func__ << " " << c->cid << " " << o->oid
<< " " << name << " (" << val.length() << " bytes)"
int r = 0;
for (map<string,bufferptr>::const_iterator p = aset.begin();
p != aset.end(); ++p) {
- if (p->second.is_partial())
- o->onode.attrs[p->first.c_str()] =
+ if (p->second.is_partial()) {
+ auto& b = o->onode.attrs[p->first.c_str()] =
bufferptr(p->second.c_str(), p->second.length());
- else
- o->onode.attrs[p->first.c_str()] = p->second;
+ b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+ } else {
+ auto& b = o->onode.attrs[p->first.c_str()] = p->second;
+ b.reassign_to_mempool(mempool::mempool_bluestore_cache_other);
+ }
}
txc->write_onode(o);
dout(10) << __func__ << " " << c->cid << " " << o->oid
return -EINVAL;
}
- newo->exists = true;
_assign_nid(txc, newo);
// clone data
CollectionRef& c,
OnodeRef& oldo,
OnodeRef& newo,
- uint64_t srcoff, uint64_t length, uint64_t dstoff)
+ uint64_t srcoff,
+ uint64_t length,
+ uint64_t dstoff)
{
dout(15) << __func__ << " " << c->cid << " " << oldo->oid << " -> "
<< newo->oid
e.blob->last_encoded_id = -1;
}
int n = 0;
- bool dirtied_oldo = false;
uint64_t end = srcoff + length;
+ uint32_t dirty_range_begin = 0;
+ uint32_t dirty_range_end = 0;
+ bool src_dirty = false;
for (auto ep = oldo->extent_map.seek_lextent(srcoff);
ep != oldo->extent_map.extent_map.end();
++ep) {
// make sure it is shared
if (!blob.is_shared()) {
c->make_blob_shared(_assign_blobid(txc), e.blob);
- dirtied_oldo = true; // fixme: overkill
+ if (!src_dirty) {
+ src_dirty = true;
+ dirty_range_begin = e.logical_offset;
+ }
+ assert(e.logical_end() > 0);
+ // -1 to exclude next potential shard
+ dirty_range_end = e.logical_end() - 1;
} else {
c->load_shared_blob(e.blob->shared_blob);
}
dout(20) << __func__ << " dst " << *ne << dendl;
++n;
}
- if (dirtied_oldo) {
- oldo->extent_map.dirty_range(txc->t, srcoff, length); // overkill
+ if (src_dirty) {
+ oldo->extent_map.dirty_range(dirty_range_begin,
+ dirty_range_end - dirty_range_begin);
txc->write_onode(oldo);
}
txc->write_onode(newo);
if (dstoff + length > newo->onode.size) {
newo->onode.size = dstoff + length;
}
- newo->extent_map.dirty_range(txc->t, dstoff, length);
+ newo->extent_map.dirty_range(dstoff, length);
_dump_onode(oldo);
_dump_onode(newo);
return 0;
<< " to offset 0x" << dstoff << std::dec << dendl;
int r = 0;
+ if (srcoff + length >= OBJECT_MAX_SIZE ||
+ dstoff + length >= OBJECT_MAX_SIZE) {
+ r = -E2BIG;
+ goto out;
+ }
if (srcoff + length > oldo->onode.size) {
r = -EINVAL;
goto out;
}
- newo->exists = true;
_assign_nid(txc, newo);
if (length > 0) {
<< new_oid << dendl;
int r;
ghobject_t old_oid = oldo->oid;
- mempool::bluestore_meta_other::string new_okey;
+ mempool::bluestore_cache_other::string new_okey;
if (newo) {
if (newo->exists) {
}
-void BlueStore::flush_cache()
+void BlueStore::_flush_cache()
{
dout(10) << __func__ << dendl;
for (auto i : cache_shards) {
i->trim_all();
+ assert(i->empty());
}
for (auto& p : coll_map) {
+ if (!p.second->onode_map.empty()) {
+ derr << __func__ << " stray onodes on " << p.first << dendl;
+ p.second->onode_map.dump(cct, 0);
+ }
+ if (!p.second->shared_blob_set.empty()) {
+ derr << __func__ << " stray shared blobs on " << p.first << dendl;
+ p.second->shared_blob_set.dump(cct, 0);
+ }
assert(p.second->onode_map.empty());
assert(p.second->shared_blob_set.empty());
}
coll_map.clear();
}
+// For external caller.
+// We use a best-effort policy instead, e.g.,
+// we don't care if there are still some pinned onodes/data in the cache
+// after this command is completed.
+void BlueStore::flush_cache()
+{
+ dout(10) << __func__ << dendl;
+ for (auto i : cache_shards) {
+ i->trim_all();
+ }
+}
+
void BlueStore::_apply_padding(uint64_t head_pad,
uint64_t tail_pad,
- bufferlist& bl,
bufferlist& padded)
{
- padded = bl;
if (head_pad) {
- bufferlist z;
- z.append_zero(head_pad);
- z.claim_append(padded);
- padded.claim(z);
+ padded.prepend_zero(head_pad);
}
if (tail_pad) {
padded.append_zero(tail_pad);