#undef dout_prefix
#define dout_prefix *_dout << "journal "
+using std::list;
+using std::map;
+using std::ostream;
+using std::ostringstream;
+using std::pair;
+using std::set;
+using std::string;
+using std::stringstream;
+using std::vector;
+
+using ceph::bufferlist;
+using ceph::bufferptr;
+using ceph::Formatter;
+using ceph::JSONFormatter;
+
const static int64_t ONE_MEG(1 << 20);
const static int CEPH_DIRECTIO_ALIGNMENT(4096);
<< cpp_strerror(err) << dendl;
}
}
- fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags, 0644));
+ fd = TEMP_FAILURE_RETRY(::open(fn.c_str(), flags|O_CLOEXEC, 0644));
if (fd < 0) {
int err = errno;
dout(2) << "FileJournal::_open unable to open journal "
int FileJournal::_open_block_device()
{
int64_t bdev_sz = 0;
- int ret = get_block_device_size(fd, &bdev_sz);
+ BlkDev blkdev(fd);
+ int ret = blkdev.get_size(&bdev_sz);
if (ret) {
dout(0) << __func__ << ": failed to read block device size." << dendl;
return -EIO;
block_size = cct->_conf->journal_block_size;
if (cct->_conf->journal_discard) {
- discard = block_device_support_discard(fn.c_str());
+ discard = blkdev.support_discard();
dout(10) << fn << " support discard: " << (int)discard << dendl;
}
<< newsize << " bytes: " << cpp_strerror(err) << dendl;
return -err;
}
-#ifdef HAVE_POSIX_FALLOCATE
- ret = ::posix_fallocate(fd, 0, newsize);
+ ret = ceph_posix_fallocate(fd, 0, newsize);
if (ret) {
derr << "FileJournal::_open_file : unable to preallocation journal to "
<< newsize << " bytes: " << cpp_strerror(ret) << dendl;
return -ret;
}
max_size = newsize;
-#elif defined(__APPLE__)
- fstore_t store;
- store.fst_flags = F_ALLOCATECONTIG;
- store.fst_posmode = F_PEOFPOSMODE;
- store.fst_offset = 0;
- store.fst_length = newsize;
-
- ret = ::fcntl(fd, F_PREALLOCATE, &store);
- if (ret == -1) {
- ret = -errno;
- derr << "FileJournal::_open_file : unable to preallocation journal to "
- << newsize << " bytes: " << cpp_strerror(ret) << dendl;
- return ret;
- }
- max_size = newsize;
-#else
-# error "Journal pre-allocation not supported on platform."
-#endif
}
else {
max_size = oldsize;
for (; (i + write_size) <= (uint64_t)max_size; i += write_size) {
ret = ::pwrite(fd, static_cast<void*>(buf), write_size, i);
if (ret < 0) {
- free(buf);
+ aligned_free(buf);
return -errno;
}
}
if (i < (uint64_t)max_size) {
ret = ::pwrite(fd, static_cast<void*>(buf), max_size - i, i);
if (ret < 0) {
- free(buf);
+ aligned_free(buf);
return -errno;
}
}
- free(buf);
+ aligned_free(buf);
}
{
int ret;
- assert(fd == -1);
+ ceph_assert(fd == -1);
ret = _open(false, false);
if (ret)
return ret;
void *buf = 0;
int64_t needed_space;
int ret;
- buffer::ptr bp;
+ ceph::buffer::ptr bp;
dout(2) << "create " << fn << " fsid " << fsid << dendl;
ret = _open(true, true);
goto free_buf;
}
- needed_space = ((int64_t)cct->_conf->osd_max_write_size) << 20;
+ needed_space = cct->_conf->osd_max_write_size << 20;
needed_space += (2 * sizeof(entry_header_t)) + get_top();
if (header.max_size - header.start < needed_space) {
derr << "FileJournal::create: OSD journal is not large enough to hold "
// This can not be used on an active journal
int FileJournal::peek_fsid(uuid_d& fsid)
{
- assert(fd == -1);
+ ceph_assert(fd == -1);
int r = _open(false, false);
if (r)
return r;
dout(2) << "open " << fn << " fsid " << fsid << " fs_op_seq " << fs_op_seq << dendl;
uint64_t next_seq = fs_op_seq + 1;
+ uint64_t seq = -1;
int err = _open(false);
if (err)
// read header?
err = read_header(&header);
if (err < 0)
- return err;
+ goto out;
// static zeroed buffer for alignment padding
delete [] zero_buf;
if (header.fsid != fsid) {
derr << "FileJournal::open: ondisk fsid " << header.fsid << " doesn't match expected " << fsid
<< ", invalid (someone else's?) journal" << dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
if (header.max_size > max_size) {
dout(2) << "open journal size " << header.max_size << " > current " << max_size << dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
if (header.block_size != block_size) {
dout(2) << "open journal block size " << header.block_size << " != current " << block_size << dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
if (header.max_size % header.block_size) {
dout(2) << "open journal max size " << header.max_size
<< " not a multiple of block size " << header.block_size << dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
if (header.alignment != block_size && directio) {
dout(0) << "open journal alignment " << header.alignment << " does not match block size "
<< block_size << " (required for direct_io journal mode)" << dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
if ((header.alignment % CEPH_DIRECTIO_ALIGNMENT) && directio) {
dout(0) << "open journal alignment " << header.alignment
<< " is not multiple of minimum directio alignment "
<< CEPH_DIRECTIO_ALIGNMENT << " (required for direct_io journal mode)"
<< dendl;
- return -EINVAL;
+ err = -EINVAL;
+ goto out;
}
// looks like a valid header.
// find next entry
read_pos = header.start;
- uint64_t seq = header.start_seq;
-
- // last_committed_seq is 1 before the start of the journal or
- // 0 if the start is 0
- last_committed_seq = seq > 0 ? seq - 1 : seq;
- if (last_committed_seq < fs_op_seq) {
- dout(2) << "open advancing committed_seq " << last_committed_seq
- << " to fs op_seq " << fs_op_seq << dendl;
- last_committed_seq = fs_op_seq;
- }
+ seq = header.start_seq;
while (1) {
bufferlist bl;
}
return 0;
+out:
+ close();
+ return err;
}
void FileJournal::_close(int fd) const
stop_writer();
// close
- assert(writeq_empty());
- assert(!must_write_header);
- assert(fd >= 0);
+ ceph_assert(writeq_empty());
+ ceph_assert(!must_write_header);
+ ceph_assert(fd >= 0);
_close(fd);
fd = -1;
}
{
dout(10) << "_fdump" << dendl;
- assert(fd == -1);
+ ceph_assert(fd == -1);
int err = _open(false, false);
if (err)
return err;
f.dump_unsigned("bl.length", bl.length());
} else {
f.open_array_section("transactions");
- bufferlist::iterator p = bl.begin();
+ auto p = bl.cbegin();
int trans_num = 0;
while (!p.end()) {
ObjectStore::Transaction t(p);
if (!write_stop)
{
{
- Mutex::Locker l(write_lock);
- Mutex::Locker p(writeq_lock);
+ std::lock_guard l{write_lock};
+ std::lock_guard p{writeq_lock};
write_stop = true;
- writeq_cond.Signal();
+ writeq_cond.notify_all();
// Doesn't hurt to signal commit_cond in case thread is waiting there
// and caller didn't use committed_thru() first.
- commit_cond.Signal();
+ commit_cond.notify_all();
}
write_thread.join();
// stop aio completion thread *after* writer thread has stopped
// and has submitted all of its io
if (aio && !aio_stop) {
- aio_lock.Lock();
+ aio_lock.lock();
aio_stop = true;
- aio_cond.Signal();
- write_finish_cond.Signal();
- aio_lock.Unlock();
+ aio_cond.notify_all();
+ write_finish_cond.notify_all();
+ aio_lock.unlock();
write_finish_thread.join();
}
#endif
dout(10) << "read_header" << dendl;
bufferlist bl;
- buffer::ptr bp = buffer::create_page_aligned(block_size);
+ ceph::buffer::ptr bp = ceph::buffer::create_small_page_aligned(block_size);
char* bpdata = bp.c_str();
int r = ::pread(fd, bpdata, bp.length(), 0);
bl.push_back(std::move(bp));
try {
- bufferlist::iterator p = bl.begin();
- ::decode(*hdr, p);
+ auto p = bl.cbegin();
+ decode(*hdr, p);
}
- catch (buffer::error& e) {
+ catch (ceph::buffer::error& e) {
derr << "read_header error decoding journal header" << dendl;
return -EINVAL;
}
{
bufferlist bl;
{
- Mutex::Locker l(finisher_lock);
+ std::lock_guard l{finisher_lock};
header.committed_up_to = journaled_seq;
}
- ::encode(header, bl);
- bufferptr bp = buffer::create_page_aligned(get_top());
+ encode(header, bl);
+ bufferptr bp = ceph::buffer::create_small_page_aligned(get_top());
// don't use bp.zero() here, because it also invalidates
// crc cache (which is not yet populated anyway)
char* data = bp.c_str();
void FileJournal::write_header_sync()
{
- Mutex::Locker locker(write_lock);
+ std::lock_guard locker{write_lock};
must_write_header = true;
bufferlist bl;
do_write(bl);
if (room >= (header.max_size >> 1) &&
room - size < (header.max_size >> 1)) {
dout(10) << " passing half full mark, triggering commit" << dendl;
- do_sync_cond->SloppySignal(); // initiate a real commit so we can trim
+#ifdef CEPH_DEBUG_MUTEX
+ do_sync_cond->notify_all(true); // initiate a real commit so we can trim
+#else
+ do_sync_cond->notify_all();
+#endif
}
}
items.erase(it++);
#ifdef HAVE_LIBAIO
{
- Mutex::Locker locker(aio_lock);
- assert(aio_write_queue_ops > 0);
+ std::lock_guard locker{aio_lock};
+ ceph_assert(aio_write_queue_ops > 0);
aio_write_queue_ops--;
- assert(aio_write_queue_bytes >= bytes);
+ ceph_assert(aio_write_queue_bytes >= bytes);
aio_write_queue_bytes -= bytes;
}
#else
}
print_header(header);
}
-
+
return -ENOSPC; // hrm, full on first op
}
if (eleft) {
out:
dout(20) << "prepare_multi_write queue_pos now " << queue_pos << dendl;
- assert((write_pos + bl.length() == queue_pos) ||
+ ceph_assert((write_pos + bl.length() == queue_pos) ||
(write_pos + bl.length() - header.max_size + get_top() == queue_pos));
return 0;
}
void FileJournal::queue_completions_thru(uint64_t seq)
{
- assert(finisher_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(finisher_lock));
utime_t now = ceph_clock_now();
list<completion_item> items;
batch_pop_completions(items);
items.erase(it++);
}
batch_unpop_completions(items);
- finisher_cond.Signal();
+ finisher_cond.notify_all();
}
{
// make sure list segments are page aligned
if (directio && !bl.is_aligned_size_and_memory(block_size, CEPH_DIRECTIO_ALIGNMENT)) {
- assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
- assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
- assert(0 == "bl was not aligned");
+ ceph_assert((bl.length() & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
+ ceph_assert((pos & (CEPH_DIRECTIO_ALIGNMENT - 1)) == 0);
+ ceph_abort_msg("bl was not aligned");
}
}
if (bl.length() == 0 && !must_write_header)
return;
- buffer::ptr hbp;
+ ceph::buffer::ptr hbp;
if (cct->_conf->journal_write_header_frequency &&
(((++journaled_since_start) %
cct->_conf->journal_write_header_frequency) == 0)) {
if (write_pos >= header.max_size)
write_pos = write_pos - header.max_size + get_top();
- write_lock.Unlock();
+ write_lock.unlock();
// split?
off64_t split = 0;
split = header.max_size - pos;
first.substr_of(bl, 0, split);
second.substr_of(bl, split, bl.length() - split);
- assert(first.length() + second.length() == bl.length());
+ ceph_assert(first.length() + second.length() == bl.length());
dout(10) << "do_write wrapping, first bit at " << pos << " len " << first.length()
<< " second bit len " << second.length() << " (orig len " << bl.length() << ")" << dendl;
// header too?
if (hbp.length()) {
// be sneaky: include the header in the second fragment
- second.push_front(hbp);
+ bufferlist tmp;
+ tmp.push_back(hbp);
+ tmp.claim_append(second);
+ second.swap(tmp);
pos = 0; // we included the header
}
// Write the second portion first possible with the header, so
check_align(first_pos, first);
ceph_abort();
}
- assert(first_pos == get_top());
+ ceph_assert(first_pos == get_top());
} else {
// header too?
if (hbp.length()) {
* flush disk caches or commits any sort of metadata.
*/
int ret = 0;
-#if defined(DARWIN) || defined(__FreeBSD__)
+#if defined(__APPLE__) || defined(__FreeBSD__)
ret = ::fsync(fd);
#else
ret = ::fdatasync(fd);
utime_t lat = ceph_clock_now() - from;
dout(20) << "do_write latency " << lat << dendl;
- write_lock.Lock();
+ write_lock.lock();
- assert(write_pos == pos);
- assert(write_pos % header.alignment == 0);
+ ceph_assert(write_pos == pos);
+ ceph_assert(write_pos % header.alignment == 0);
{
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
journaled_seq = writing_seq;
// kick finisher?
{
dout(10) << "waiting for completions to empty" << dendl;
{
- Mutex::Locker l(finisher_lock);
- while (!completions_empty())
- finisher_cond.Wait(finisher_lock);
+ std::unique_lock l{finisher_lock};
+ finisher_cond.wait(l, [this] { return completions_empty(); });
}
dout(10) << "flush waiting for finisher" << dendl;
finisher->wait_for_empty();
dout(10) << "write_thread_entry start" << dendl;
while (1) {
{
- Mutex::Locker locker(writeq_lock);
+ std::unique_lock locker{writeq_lock};
if (writeq.empty() && !must_write_header) {
if (write_stop)
break;
dout(20) << "write_thread_entry going to sleep" << dendl;
- writeq_cond.Wait(writeq_lock);
+ writeq_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
#ifdef HAVE_LIBAIO
if (aio) {
- Mutex::Locker locker(aio_lock);
+ std::unique_lock locker{aio_lock};
// should we back off to limit aios in flight? try to do this
// adaptively so that we submit larger aios once we have lots of
// them in flight.
// flight if we hit this limit to ensure we keep the device
// saturated.
while (aio_num > 0) {
- int exp = MIN(aio_num * 2, 24);
+ int exp = std::min<int>(aio_num * 2, 24);
long unsigned min_new = 1ull << exp;
uint64_t cur = aio_write_queue_bytes;
dout(20) << "write_thread_entry aio throttle: aio num " << aio_num << " bytes " << aio_bytes
dout(20) << "write_thread_entry deferring until more aios complete: "
<< aio_num << " aios with " << aio_bytes << " bytes needs " << min_new
<< " bytes to start a new aio (currently " << cur << " pending)" << dendl;
- aio_cond.Wait(aio_lock);
+ aio_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
}
}
#endif
- Mutex::Locker locker(write_lock);
+ std::unique_lock locker{write_lock};
uint64_t orig_ops = 0;
uint64_t orig_bytes = 0;
r = 0;
} else {
dout(20) << "write_thread_entry full, going to sleep (waiting for commit)" << dendl;
- commit_cond.Wait(write_lock);
+ commit_cond.wait(locker);
dout(20) << "write_thread_entry woke up" << dendl;
continue;
}
}
- assert(r == 0);
+ ceph_assert(r == 0);
if (logger) {
logger->inc(l_filestore_journal_wr);
if (bl.length() == 0 && !must_write_header)
return;
- buffer::ptr hbp;
+ ceph::buffer::ptr hbp;
if (must_write_header) {
must_write_header = false;
hbp = prepare_header();
split = header.max_size - pos;
first.substr_of(bl, 0, split);
second.substr_of(bl, split, bl.length() - split);
- assert(first.length() + second.length() == bl.length());
+ ceph_assert(first.length() + second.length() == bl.length());
dout(10) << "do_aio_write wrapping, first bit at " << pos << "~" << first.length() << dendl;
if (write_aio_bl(pos, first, 0)) {
<< ") failed" << dendl;
ceph_abort();
}
- assert(pos == header.max_size);
+ ceph_assert(pos == header.max_size);
if (hbp.length()) {
// be sneaky: include the header in the second fragment
- second.push_front(hbp);
+ bufferlist tmp;
+ tmp.push_back(hbp);
+ tmp.claim_append(second);
+ second.swap(tmp);
pos = 0; // we included the header
} else
pos = get_top(); // no header, start after that
write_pos = pos;
if (write_pos == header.max_size)
write_pos = get_top();
- assert(write_pos % header.alignment == 0);
+ ceph_assert(write_pos % header.alignment == 0);
}
/**
dout(20) << "write_aio_bl " << pos << "~" << bl.length() << " seq " << seq << dendl;
while (bl.length() > 0) {
- int max = MIN(bl.get_num_buffers(), IOV_MAX-1);
+ int max = std::min<int>(bl.get_num_buffers(), IOV_MAX-1);
iovec *iov = new iovec[max];
int n = 0;
unsigned len = 0;
- for (std::list<buffer::ptr>::const_iterator p = bl.buffers().begin();
- n < max;
- ++p, ++n) {
- assert(p != bl.buffers().end());
- iov[n].iov_base = (void *)p->c_str();
+ for (auto p = std::cbegin(bl.buffers()); n < max; ++p, ++n) {
+ ceph_assert(p != std::cend(bl.buffers()));
+ iov[n].iov_base = const_cast<void*>(static_cast<const void*>(p->c_str()));
iov[n].iov_len = p->length();
len += p->length();
}
// lock only aio_queue, current aio, aio_num, aio_bytes, which may be
// modified in check_aio_completion
- aio_lock.Lock();
+ aio_lock.lock();
aio_queue.push_back(aio_info(tbl, pos, bl.length() > 0 ? 0 : seq));
aio_info& aio = aio_queue.back();
aio.iov = iov;
// aio could be erased from aio_queue once it is done
uint64_t cur_len = aio.len;
// unlock aio_lock because following io_submit might take time to return
- aio_lock.Unlock();
+ aio_lock.unlock();
iocb *piocb = &aio.iocb;
-
+
// 2^16 * 125us = ~8 seconds, so max sleep is ~16 seconds
int attempts = 16;
int delay = 125;
continue;
}
check_align(pos, tbl);
- assert(0 == "io_submit got unexpected error");
+ ceph_abort_msg("io_submit got unexpected error");
} else {
break;
}
} while (true);
pos += cur_len;
}
- aio_lock.Lock();
- write_finish_cond.Signal();
- aio_lock.Unlock();
+ aio_lock.lock();
+ write_finish_cond.notify_all();
+ aio_lock.unlock();
return 0;
}
#endif
void FileJournal::write_finish_thread_entry()
{
#ifdef HAVE_LIBAIO
- dout(10) << "write_finish_thread_entry enter" << dendl;
+ dout(10) << __func__ << " enter" << dendl;
while (true) {
{
- Mutex::Locker locker(aio_lock);
+ std::unique_lock locker{aio_lock};
if (aio_queue.empty()) {
if (aio_stop)
break;
- dout(20) << "write_finish_thread_entry sleeping" << dendl;
- write_finish_cond.Wait(aio_lock);
+ dout(20) << __func__ << " sleeping" << dendl;
+ write_finish_cond.wait(locker);
continue;
}
}
- dout(20) << "write_finish_thread_entry waiting for aio(s)" << dendl;
+ dout(20) << __func__ << " waiting for aio(s)" << dendl;
io_event event[16];
int r = io_getevents(aio_ctx, 1, 16, event, NULL);
if (r < 0) {
continue;
}
derr << "io_getevents got " << cpp_strerror(r) << dendl;
- assert(0 == "got unexpected error from io_getevents");
+ if (r == -EIO) {
+ note_io_error_event(devname.c_str(), fn.c_str(), -EIO, 0, 0, 0);
+ }
+ ceph_abort_msg("got unexpected error from io_getevents");
}
{
- Mutex::Locker locker(aio_lock);
+ std::lock_guard locker{aio_lock};
for (int i=0; i<r; i++) {
aio_info *ai = (aio_info *)event[i].obj;
if (event[i].res != ai->len) {
derr << "aio to " << ai->off << "~" << ai->len
<< " returned: " << (int)event[i].res << dendl;
- assert(0 == "unexpected aio error");
+ ceph_abort_msg("unexpected aio error");
}
- dout(10) << "write_finish_thread_entry aio " << ai->off
+ dout(10) << __func__ << " aio " << ai->off
<< "~" << ai->len << " done" << dendl;
ai->done = true;
}
check_aio_completion();
}
}
- dout(10) << "write_finish_thread_entry exit" << dendl;
+ dout(10) << __func__ << " exit" << dendl;
#endif
}
*/
void FileJournal::check_aio_completion()
{
- assert(aio_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(aio_lock));
dout(20) << "check_aio_completion" << dendl;
bool completed_something = false, signal = false;
if (completed_something) {
// kick finisher?
// only if we haven't filled up recently!
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
journaled_seq = new_journaled_seq;
if (full_state != FULL_NOTFULL) {
dout(10) << "check_aio_completion NOT queueing finisher seq " << journaled_seq
}
if (signal) {
// maybe write queue was waiting for aio count to drop?
- aio_cond.Signal();
+ aio_cond.notify_all();
}
}
#endif
data_len = (*p).get_data_length();
data_align = ((*p).get_data_alignment() - bl.length()) & ~CEPH_PAGE_MASK;
}
- ::encode(*p, bl);
+ encode(*p, bl);
}
if (tbl->length()) {
bl.claim_append(*tbl);
memset(&h, 0, sizeof(h));
if (data_align >= 0)
h.pre_pad = ((unsigned int)data_align - (unsigned int)head_size) & ~CEPH_PAGE_MASK;
- off64_t size = ROUND_UP_TO(base_size + h.pre_pad, header.alignment);
+ off64_t size = round_up_to(base_size + h.pre_pad, header.alignment);
unsigned post_pad = size - base_size - h.pre_pad;
h.len = bl.length();
h.post_pad = post_pad;
// header
ebl.append((const char*)&h, sizeof(h));
if (h.pre_pad) {
- ebl.push_back(buffer::create_static(h.pre_pad, zero_buf));
+ ebl.push_back(ceph::buffer::create_static(h.pre_pad, zero_buf));
}
// payload
- ebl.claim_append(bl, buffer::list::CLAIM_ALLOW_NONSHAREABLE); // potential zero-copy
+ ebl.claim_append(bl);
if (h.post_pad) {
- ebl.push_back(buffer::create_static(h.post_pad, zero_buf));
+ ebl.push_back(ceph::buffer::create_static(h.post_pad, zero_buf));
}
// footer
ebl.append((const char*)&h, sizeof(h));
if (directio)
ebl.rebuild_aligned(CEPH_DIRECTIO_ALIGNMENT);
- tbl->claim(ebl);
+ *tbl = std::move(ebl);
return h.len;
}
dout(5) << "submit_entry seq " << seq
<< " len " << e.length()
<< " (" << oncommit << ")" << dendl;
- assert(e.length() > 0);
- assert(e.length() < header.max_size);
+ ceph_assert(e.length() > 0);
+ ceph_assert(e.length() < header.max_size);
- if (osd_op)
- osd_op->mark_event("commit_queued_for_journal_write");
if (logger) {
logger->inc(l_filestore_journal_queue_bytes, orig_len);
logger->inc(l_filestore_journal_queue_ops, 1);
}
}
{
- Mutex::Locker l1(writeq_lock);
+ std::lock_guard l1{writeq_lock};
#ifdef HAVE_LIBAIO
- Mutex::Locker l2(aio_lock);
+ std::lock_guard l2{aio_lock};
#endif
- Mutex::Locker l3(completions_lock);
+ std::lock_guard l3{completions_lock};
#ifdef HAVE_LIBAIO
aio_write_queue_ops++;
aio_write_queue_bytes += e.length();
- aio_cond.Signal();
+ aio_cond.notify_all();
#endif
completions.push_back(
completion_item(
seq, oncommit, ceph_clock_now(), osd_op));
if (writeq.empty())
- writeq_cond.Signal();
+ writeq_cond.notify_all();
writeq.push_back(write_item(seq, e, orig_len, osd_op));
if (osd_op)
osd_op->journal_trace.keyval("queue depth", writeq.size());
// Report whether writeq currently holds no items. writeq_lock is held for
// the duration of the check, so the answer reflects the queue at that
// instant only.
bool FileJournal::writeq_empty()
{
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
return writeq.empty();
}
// Return a reference to the item at the front of writeq without removing it.
// The caller must already hold write_lock (asserted below). writeq_lock is
// taken only while front() is evaluated, so the returned reference is used
// after writeq_lock has been released.
// NOTE(review): there is no emptiness check here; presumably callers only
// peek when the queue is known to be non-empty -- confirm against callers.
FileJournal::write_item &FileJournal::peek_write()
{
- assert(write_lock.is_locked());
- Mutex::Locker locker(writeq_lock);
+ ceph_assert(ceph_mutex_is_locked(write_lock));
+ std::lock_guard locker{writeq_lock};
return writeq.front();
}
void FileJournal::pop_write()
{
- assert(write_lock.is_locked());
- Mutex::Locker locker(writeq_lock);
+ ceph_assert(ceph_mutex_is_locked(write_lock));
+ std::lock_guard locker{writeq_lock};
if (logger) {
logger->dec(l_filestore_journal_queue_bytes, writeq.front().orig_len);
logger->dec(l_filestore_journal_queue_ops, 1);
void FileJournal::batch_pop_write(list<write_item> &items)
{
- assert(write_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(write_lock));
{
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
writeq.swap(items);
}
for (auto &&i : items) {
// Put a batch of write items back at the head of writeq.
// The caller must already hold write_lock (asserted below). For each item
// the journal queue byte/op counters are incremented again when a logger is
// present; the items are then spliced in front of the existing queue
// contents under writeq_lock, which leaves `items` empty.
void FileJournal::batch_unpop_write(list<write_item> &items)
{
- assert(write_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(write_lock));
for (auto &&i : items) {
if (logger) {
logger->inc(l_filestore_journal_queue_bytes, i.orig_len);
logger->inc(l_filestore_journal_queue_ops, 1);
}
}
- Mutex::Locker locker(writeq_lock);
+ std::lock_guard locker{writeq_lock};
// splice at begin() so the returned items precede anything queued meanwhile
writeq.splice(writeq.begin(), items);
}
*/
void FileJournal::do_discard(int64_t offset, int64_t end)
{
+  // Discard the block-aligned part of [offset, end) on the journal fd.
+  // offset is rounded up to block_size and end is replaced by
+  // round_up_to(end - block_size, block_size); nothing is discarded when the
+  // resulting range is empty. A failed discard is only logged (level 1),
+  // never propagated to the caller.
- dout(10) << __func__ << "trim(" << offset << ", " << end << dendl;
+  dout(10) << __func__ << " trim(" << offset << ", " << end << ")" << dendl;

- offset = ROUND_UP_TO(offset, block_size);
+  offset = round_up_to(offset, block_size);
  if (offset >= end)
    return;
- end = ROUND_UP_TO(end - block_size, block_size);
- assert(end >= offset);
- if (offset < end)
- if (block_device_discard(fd, offset, end - offset) < 0)
- dout(1) << __func__ << "ioctl(BLKDISCARD) error:" << cpp_strerror(errno) << dendl;
+  end = round_up_to(end - block_size, block_size);
+  ceph_assert(end >= offset);
+  if (offset < end) {
+    BlkDev blkdev(fd);
+    if (blkdev.discard(offset, end - offset) < 0) {
+      // keep a separator after __func__, matching the trace message above
+      dout(1) << __func__ << " ioctl(BLKDISCARD) error: " << cpp_strerror(errno) << dendl;
+    }
+  }
}
void FileJournal::committed_thru(uint64_t seq)
{
- Mutex::Locker locker(write_lock);
+ std::lock_guard locker{write_lock};
auto released = throttle.flush(seq);
if (logger) {
if (seq < last_committed_seq) {
dout(5) << "committed_thru " << seq << " < last_committed_seq " << last_committed_seq << dendl;
- assert(seq >= last_committed_seq);
+ ceph_assert(seq >= last_committed_seq);
return;
}
if (seq == last_committed_seq) {
// completions!
{
- Mutex::Locker locker(finisher_lock);
+ std::lock_guard locker{finisher_lock};
queue_completions_thru(seq);
if (plug_journal_completions && seq >= header.start_seq) {
dout(10) << " removing completion plug, queuing completions thru journaled_seq " << journaled_seq << dendl;
pop_write();
}
- commit_cond.Signal();
+ commit_cond.notify_all();
dout(10) << "committed_thru done" << dendl;
}
len = olen; // rest
int64_t actual = ::lseek64(fd, pos, SEEK_SET);
- assert(actual == pos);
+ ceph_assert(actual == pos);
- bufferptr bp = buffer::create(len);
+ bufferptr bp = ceph::buffer::create(len);
int r = safe_read_exact(fd, bp.c_str(), len);
if (r) {
derr << "FileJournal::wrap_read_bl: safe_read_exact " << pos << "~" << len << " returned "
- << r << dendl;
+ << cpp_strerror(r) << dendl;
ceph_abort();
}
bl->push_back(std::move(bp));
journaled_seq = seq;
return true;
}
+ } else {
+ derr << "do_read_entry(" << pos << "): " << ss.str() << dendl;
}
if (seq && seq < header.committed_up_to) {
}
}
- dout(25) << ss.str() << dendl;
dout(2) << "No further valid entries found, journal is most likely valid"
<< dendl;
return false;
if (_h)
*_h = *h;
- assert(cur_pos % header.alignment == 0);
+ ceph_assert(cur_pos % header.alignment == 0);
return SUCCESS;
}
corrupt_at = corrupt_at + get_top() - header.max_size;
int64_t actual = ::lseek64(fd, corrupt_at, SEEK_SET);
- assert(actual == corrupt_at);
+ ceph_assert(actual == corrupt_at);
char buf[10];
int r = safe_read_exact(fd, buf, 1);
- assert(r == 0);
+ ceph_assert(r == 0);
actual = ::lseek64(wfd, corrupt_at, SEEK_SET);
- assert(actual == corrupt_at);
+ ceph_assert(actual == corrupt_at);
buf[0]++;
r = safe_write(wfd, buf, 1);
- assert(r == 0);
+ ceph_assert(r == 0);
}
void FileJournal::corrupt_payload(
dout(20) << __func__ << " journal size=" << size << dendl;
return size;
}
+
+// Ask BlkDev::wholedisk() for the device node behind the journal fd and pass
+// it to get_raw_devices() to populate *ls. If the lookup returns non-zero,
+// *ls is left untouched.
+void FileJournal::get_devices(set<string> *ls)
+{
+  string dev_node;
+  BlkDev blkdev(fd);
+  const int rc = blkdev.wholedisk(&dev_node);
+  if (rc == 0) {
+    get_raw_devices(dev_node, ls);
+  }
+}
+
+// Record the journal's partition path and device node in *pm.
+// Each key is set to "unknown" when the corresponding BlkDev query returns
+// non-zero. On a successful device-node lookup the name is also stored in
+// the devname member.
+void FileJournal::collect_metadata(map<string,string> *pm)
+{
+  BlkDev blkdev(fd);
+  char partition_path[PATH_MAX];
+  char dev_node[PATH_MAX];
+  map<string,string>& meta = *pm;
+
+  const bool have_partition = (blkdev.partition(partition_path, PATH_MAX) == 0);
+  if (have_partition) {
+    meta["backend_filestore_journal_partition_path"] = string(partition_path);
+  } else {
+    meta["backend_filestore_journal_partition_path"] = "unknown";
+  }
+
+  const bool have_dev_node = (blkdev.wholedisk(dev_node, PATH_MAX) == 0);
+  if (have_dev_node) {
+    meta["backend_filestore_journal_dev_node"] = string(dev_node);
+    devname = dev_node;
+  } else {
+    meta["backend_filestore_journal_dev_node"] = "unknown";
+  }
+}