delete hook;
hook = nullptr;
} else {
- r = admin_socket->register_command("bluestore bluefs stats",
+ r = admin_socket->register_command("bluefs stats",
hook,
"Dump internal statistics for bluefs."
"");
// init log
FileRef log_file = ceph::make_ref<File>();
log_file->fnode.ino = 1;
- log_file->vselector_hint = vselector->get_hint_by_device(BDEV_WAL);
+ log_file->vselector_hint = vselector->get_hint_for_log();
int r = _allocate(
vselector->select_prefer_bdev(log_file->vselector_hint),
cct->_conf->bluefs_max_log_runway,
if (!noop) {
log_file->fnode = super.log_fnode;
log_file->vselector_hint =
- vselector->get_hint_by_device(BDEV_WAL);
+ vselector->get_hint_for_log();
} else {
// do not use fnode from superblock in 'noop' mode - log_file's one should
// be fine and up-to-date
{
int r = _read(log_reader, &log_reader->buf, read_pos, super.block_size,
&bl, NULL);
- ceph_assert(r == (int)super.block_size);
+ // Short read of the first log block: when replay recovery is enabled, try to
+ // locate the missing bytes on disk before giving up.
+ if (r != (int)super.block_size && cct->_conf->bluefs_replay_recovery) {
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, super.block_size - r, &bl);
+ }
+ // Use ceph_assert like the rest of this file; a plain assert() may be
+ // compiled out, which would let a short read go unnoticed.
+ ceph_assert(r == (int)super.block_size);
read_pos += r;
}
uint64_t more = 0;
bufferlist t;
int r = _read(log_reader, &log_reader->buf, read_pos, more, &t, NULL);
if (r < (int)more) {
- derr << __func__ << " 0x" << std::hex << pos
- << ": stop: len is 0x" << bl.length() + more << std::dec
- << ", which is past eof" << dendl;
- break;
+ dout(10) << __func__ << " 0x" << std::hex << pos
+ << ": stop: len is 0x" << bl.length() + more << std::dec
+ << ", which is past eof" << dendl;
+ if (cct->_conf->bluefs_replay_recovery) {
+ //try to search for more data
+ r += do_replay_recovery_read(log_reader, pos, read_pos + r, more - r, &t);
+ if (r < (int)more) {
+ //in normal mode we must read r==more, for recovery it is too strict
+ break;
+ }
+ }
}
ceph_assert(r == (int)more);
bl.claim_append(t);
s_lock.unlock();
uint64_t x_off = 0;
auto p = h->file->fnode.seek(off, &x_off);
+ ceph_assert(p != h->file->fnode.extents.end());
uint64_t l = std::min(p->length - x_off, len);
dout(20) << __func__ << " read random 0x"
<< std::hex << x_off << "~" << l << std::dec
buf->bl_off = off & super.block_mask();
uint64_t x_off = 0;
auto p = h->file->fnode.seek(buf->bl_off, &x_off);
+ if (p == h->file->fnode.extents.end()) {
+ dout(5) << __func__ << " reading less then required "
+ << ret << "<" << ret + len << dendl;
+ break;
+ }
+
uint64_t want = round_up_to(len + (off & ~super.block_mask()),
super.block_size);
want = std::max(want, buf->max_prefetch);
void BlueFS::compact_log()
{
- std::unique_lock l(lock);
- if (cct->_conf->bluefs_compact_log_sync) {
- _compact_log_sync();
- } else {
- _compact_log_async(l);
+  // Explicitly requested log compaction. Runs under the BlueFS lock and is a
+  // no-op when bluefs_replay_recovery_disable_compact is set.
+  std::unique_lock<ceph::mutex> l(lock);
+  if (cct->_conf->bluefs_replay_recovery_disable_compact) {
+    return;
+  }
+  // Pick the sync or async flavour according to configuration; the async
+  // variant receives the held lock.
+  if (cct->_conf->bluefs_compact_log_sync) {
+    _compact_log_sync();
+  } else {
+    _compact_log_async(l);
}
}
// allocate some more space (before we run out)?
int64_t runway = log_writer->file->fnode.get_allocated() -
log_writer->get_effective_write_pos();
+ bool just_expanded_log = false;
if (runway < (int64_t)cct->_conf->bluefs_min_log_runway) {
dout(10) << __func__ << " allocating more log runway (0x"
<< std::hex << runway << std::dec << " remaining)" << dendl;
ceph_assert(r == 0);
vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
log_t.op_file_update(log_writer->file->fnode);
+ just_expanded_log = true;
}
bufferlist bl;
logger->inc(l_bluefs_logged_bytes, bl.length());
+ if (just_expanded_log) {
+ ceph_assert(bl.length() <= runway); // if we write this, we will have an unrecoverable data loss
+ }
+
log_writer->append(bl);
log_t.clear();
}
#endif
-int BlueFS::_flush(FileWriter *h, bool force)
+int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
+{
+  // Flush the writer and, only if data was really written out without error,
+  // give the log a chance to be compacted. The lock is passed through to
+  // _maybe_compact_log().
+  bool did_flush = false;
+  const int rc = _flush(h, force, &did_flush);
+  if (rc != 0 || !did_flush) {
+    // error, or nothing was flushed: no reason to consider compaction
+    return rc;
+  }
+  _maybe_compact_log(l);
+  return rc;
+}
+
+int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
{
h->buffer_appender.flush();
uint64_t length = h->buffer.length();
uint64_t offset = h->pos;
+ if (flushed) {
+ *flushed = false;
+ }
if (!force &&
length < cct->_conf->bluefs_min_flush_size) {
dout(10) << __func__ << " " << h << " ignoring, length " << length
<< std::hex << offset << "~" << length << std::dec
<< " to " << h->file->fnode << dendl;
ceph_assert(h->pos <= h->file->fnode.size);
- return _flush_range(h, offset, length);
+ int r = _flush_range(h, offset, length);
+ if (flushed) {
+ *flushed = true;
+ }
+ return r;
}
int BlueFS::_truncate(FileWriter *h, uint64_t offset)
void BlueFS::sync_metadata(bool avoid_compact)
{
- std::unique_lock l(lock);
+ std::unique_lock<ceph::mutex> l(lock);
if (log_t.empty() && dirty_files.empty()) {
dout(10) << __func__ << " - no pending log events" << dendl;
} else {
dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
}
- if (!avoid_compact && _should_compact_log()) {
+ if (!avoid_compact) {
+ _maybe_compact_log(l);
+ }
+}
+
+void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
+{
+ if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
+ _should_compact_log()) {
if (cct->_conf->bluefs_compact_log_sync) {
_compact_log_sync();
} else {
FileRef file;
bool create = false;
+ bool truncate = false;
map<string,FileRef>::iterator q = dir->file_map.find(filename);
if (q == dir->file_map.end()) {
if (overwrite) {
for (auto& p : file->fnode.extents) {
pending_release[p.bdev].insert(p.offset, p.length);
}
+ truncate = true;
file->fnode.clear_extents();
}
file->fnode.mtime = ceph_clock_now();
file->vselector_hint = vselector->get_hint_by_dir(dirname);
+ if (create || truncate) {
+ vselector->add_usage(file->vselector_hint, file->fnode); // update file count
+ }
dout(20) << __func__ << " mapping " << dirname << "/" << filename
<< " vsel_hint " << file->vselector_hint
return bdev[BDEV_SLOW]->is_rotational();
}
+/*
+  Algorithm.
+  do_replay_recovery_read is used when the bluefs log ends abruptly, but it seems that more data should be there.
+  The idea is to search the disk for the definition of extents that would be attached to the bluefs log in the future,
+  and to check whether using them produces a healthy bluefs transaction.
+  We encode the already known bluefs log extents and search the disk for these bytes.
+  When we find them, we decode the following bytes as an extent.
+  We read that whole extent and then check whether, merged with the existing log part, it gives a proper bluefs transaction.
+ */
+/**
+ * Try to recover bytes missing from the end of the bluefs log by scanning
+ * the block devices for a not-yet-recorded log extent.
+ *
+ * @param log_reader  reader of the log file; on success its fnode gets the
+ *                    recovered extent appended and buf.pos is advanced
+ * @param replay_pos  log offset from which the already known log data is
+ *                    re-read (up to the end of the known extents)
+ * @param read_offset log offset where data is missing; must equal replay_pos
+ *                    plus the amount of known data read (asserted)
+ * @param read_len    number of missing bytes wanted
+ * @param bl          recovered bytes (at most read_len) are appended here
+ * @return number of bytes appended to bl, 0 if no suitable extent was found
+ */
+int BlueFS::do_replay_recovery_read(FileReader *log_reader,
+                                    size_t replay_pos,
+                                    size_t read_offset,
+                                    size_t read_len,
+                                    bufferlist* bl) {
+  dout(1) << __func__ << " replay_pos=0x" << std::hex << replay_pos <<
+    " needs 0x" << read_offset << "~" << read_len << std::dec << dendl;
+
+  bluefs_fnode_t& log_fnode = log_reader->file->fnode;
+  bufferlist bin_extents;
+  ceph::encode(log_fnode.extents, bin_extents);
+  dout(2) << __func__ << " log file encoded extents length = " << bin_extents.length() << dendl;
+
+  // cannot process if too small to effectively search
+  ceph_assert(bin_extents.length() >= 32);
+  // the last 32 bytes of the encoded extent list are the search pattern
+  bufferlist last_32;
+  last_32.substr_of(bin_extents, bin_extents.length() - 32, 32);
+
+  //read fixed part from replay_pos to end of bluefs_log extents
+  bufferlist fixed;
+  uint64_t e_off = 0;
+  auto e = log_fnode.seek(replay_pos, &e_off);
+  ceph_assert(e != log_fnode.extents.end());
+  int r = bdev[e->bdev]->read(e->offset + e_off, e->length - e_off, &fixed, ioc[e->bdev],
+                              cct->_conf->bluefs_buffered_io);
+  ceph_assert(r == 0);
+  //capture dev and offset of last good extent
+  uint8_t last_e_dev = e->bdev;
+  uint64_t last_e_off = e->offset;
+  ++e;
+  while (e != log_fnode.extents.end()) {
+    r = bdev[e->bdev]->read(e->offset, e->length, &fixed, ioc[e->bdev],
+                            cct->_conf->bluefs_buffered_io);
+    ceph_assert(r == 0);
+    // keep device and offset in sync: both must describe the same extent,
+    // since the offset is used below as scan start on that very device
+    last_e_dev = e->bdev;
+    last_e_off = e->offset;
+    ++e;
+  }
+  ceph_assert(replay_pos + fixed.length() == read_offset);
+
+  dout(2) << __func__ << " valid data in log = " << fixed.length() << dendl;
+
+  // Lexicographic order on (bdev, offset, length). std::set requires a strict
+  // weak ordering; comparing the fields independently does not provide one.
+  struct compare {
+    bool operator()(const bluefs_extent_t& a, const bluefs_extent_t& b) const {
+      if (a.bdev != b.bdev) return a.bdev < b.bdev;
+      if (a.offset != b.offset) return a.offset < b.offset;
+      return a.length < b.length;
+    }
+  };
+  // candidates already examined; avoids re-reading the same extent
+  std::set<bluefs_extent_t, compare> extents_rejected;
+  // NOTE(review): only three consecutive device ids starting at last_e_dev are
+  // probed; if MAX_BDEV is larger than 3 some ids are never visited - confirm
+  // this is intended.
+  for (int dcnt = 0; dcnt < 3; dcnt++) {
+    uint8_t dev = (last_e_dev + dcnt) % MAX_BDEV;
+    if (bdev[dev] == nullptr) continue;
+    dout(2) << __func__ << " processing " << get_device_name(dev) << dendl;
+    // regions to scan: whole device minus everything owned by known files
+    interval_set<uint64_t> disk_regions;
+    disk_regions.insert(0, bdev[dev]->get_size());
+    for (const auto& f : file_map) {
+      auto& file_extents = f.second->fnode.extents;
+      for (auto& p : file_extents) {
+        if (p.bdev == dev) {
+          disk_regions.erase(p.offset, p.length);
+        }
+      }
+    }
+    size_t disk_regions_count = disk_regions.num_intervals();
+    dout(5) << __func__ << " " << disk_regions_count << " regions to scan on " << get_device_name(dev) << dendl;
+
+    auto reg = disk_regions.lower_bound(last_e_off);
+    //for all except first, start from beginning
+    last_e_off = 0;
+    if (reg == disk_regions.end()) {
+      reg = disk_regions.begin();
+    }
+    const uint64_t chunk_size = 4 * 1024 * 1024;
+    const uint64_t page_size = 4096;
+    const uint64_t max_extent_size = 16;
+    // bytes carried over from the end of one chunk to the front of the next,
+    // so a pattern + extent straddling a chunk boundary is still found
+    uint64_t overlay_size = last_32.length() + max_extent_size;
+    // visit every region once, wrapping around from the starting region
+    for (size_t i = 0; i < disk_regions_count; reg++, i++) {
+      if (reg == disk_regions.end()) {
+        reg = disk_regions.begin();
+      }
+      uint64_t pos = reg.get_start();
+      uint64_t len = reg.get_len();
+
+      // layout: [page_size bytes of head room][chunk]; the overlay lives at
+      // the end of the head room, directly in front of the chunk
+      std::unique_ptr<char[]> raw_data_p{new char[page_size + chunk_size]};
+      char* raw_data = raw_data_p.get();
+      memset(raw_data, 0, page_size);
+
+      while (len > last_32.length()) {
+        uint64_t chunk_len = len > chunk_size ? chunk_size : len;
+        dout(5) << __func__ << " read "
+                << get_device_name(dev) << ":0x" << std::hex << pos << "+" << chunk_len << std::dec << dendl;
+        r = bdev[dev]->read_random(pos, chunk_len, raw_data + page_size, cct->_conf->bluefs_buffered_io);
+        ceph_assert(r == 0);
+
+        //search for fixed_last_32
+        char* chunk_b = raw_data + page_size;
+        char* chunk_e = chunk_b + chunk_len;
+
+        char* search_b = chunk_b - overlay_size;
+        char* search_e = chunk_e;
+
+        for (char* sp = search_b; ; sp += last_32.length()) {
+          sp = (char*)memmem(sp, search_e - sp, last_32.c_str(), last_32.length());
+          if (sp == nullptr) {
+            break;
+          }
+
+          // bytes right after the pattern are expected to encode the new extent
+          char* n = sp + last_32.length();
+          dout(5) << __func__ << " checking location 0x" << std::hex << pos + (n - chunk_b) << std::dec << dendl;
+          bufferlist test;
+          test.append(n, std::min<size_t>(max_extent_size, chunk_e - n));
+          bluefs_extent_t ne;
+          try {
+            bufferlist::const_iterator p = test.begin();
+            ceph::decode(ne, p);
+          } catch (const buffer::error&) {
+            continue;
+          }
+          if (extents_rejected.count(ne) != 0) {
+            dout(5) << __func__ << " extent " << ne << " already refected" <<dendl;
+            continue;
+          }
+          //insert as rejected already. if we succeed, it wouldn't make difference.
+          extents_rejected.insert(ne);
+
+          // ne comes from raw disk bytes, so validate it thoroughly; the range
+          // check is written so that offset + length cannot wrap around
+          if (ne.bdev >= MAX_BDEV ||
+              bdev[ne.bdev] == nullptr ||
+              ne.length > 16 * 1024 * 1024 ||
+              (ne.length & 4095) != 0 ||
+              ne.offset > bdev[ne.bdev]->get_size() ||
+              ne.length > bdev[ne.bdev]->get_size() - ne.offset ||
+              (ne.offset & 4095) != 0) {
+            dout(5) << __func__ << " refusing extent " << ne << dendl;
+            continue;
+          }
+          dout(5) << __func__ << " checking extent " << ne << dendl;
+
+          //read candidate extent - whole
+          bufferlist candidate;
+          candidate.append(fixed);
+          r = bdev[ne.bdev]->read(ne.offset, ne.length, &candidate, ioc[ne.bdev],
+                                  cct->_conf->bluefs_buffered_io);
+          ceph_assert(r == 0);
+
+          //check if transaction & crc is ok
+          bluefs_transaction_t t;
+          try {
+            bufferlist::const_iterator p = candidate.cbegin();
+            decode(t, p);
+          }
+          catch (const buffer::error&) {
+            dout(5) << __func__ << " failed match" << dendl;
+            continue;
+          }
+
+          //success, it seems a probable candidate
+          uint64_t l = std::min<uint64_t>(ne.length, read_len);
+          //trim to required size
+          bufferlist requested_read;
+          requested_read.substr_of(candidate, fixed.length(), l);
+          bl->append(requested_read);
+          dout(5) << __func__ << " successful extension of log " << l << "/" << read_len << dendl;
+          log_fnode.append_extent(ne);
+          log_fnode.recalc_allocated();
+          log_reader->buf.pos += l;
+          // l <= ne.length <= 16 MiB (validated above), so it fits in int
+          return static_cast<int>(l);
+        }
+        //save overlay for next search
+        // memmove, not memcpy: for a final chunk shorter than overlay_size the
+        // source and destination ranges overlap
+        memmove(search_b, chunk_e - overlay_size, overlay_size);
+        pos += chunk_len;
+        len -= chunk_len;
+      }
+    }
+  }
+  return 0;
+}
+
void BlueFS::debug_inject_duplicate_gift(unsigned id,
uint64_t offset,
uint64_t len)
// ===============================================
// OriginalVolumeSelector
-void* OriginalVolumeSelector::get_hint_by_device(uint8_t dev) const {
- return reinterpret_cast<void*>(dev);
+void* OriginalVolumeSelector::get_hint_for_log() const {
+  // The hint for the log file is the BDEV_WAL device id, carried in an
+  // opaque pointer value.
+  const uint8_t log_dev = BlueFS::BDEV_WAL;
+  return reinterpret_cast<void*>(log_dev);
}
void* OriginalVolumeSelector::get_hint_by_dir(const string& dirname) const {
uint8_t res = BlueFS::BDEV_DB;