} while (bytes_remaining > 0);
return append_io_events(journal::EVENT_TYPE_AIO_WRITE, bufferlists, requests,
- offset, length, flush_entry);
+ offset, length, flush_entry, 0);
}
template <typename I>
uint64_t Journal<I>::append_io_event(journal::EventEntry &&event_entry,
const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
bufferlist bl;
event_entry.timestamp = ceph_clock_now();
::encode(event_entry, bl);
return append_io_events(event_entry.get_event_type(), {bl}, requests, offset,
- length, flush_entry);
+ length, flush_entry, filter_ret_val);
}
template <typename I>
const Bufferlists &bufferlists,
const IOObjectRequests &requests,
uint64_t offset, size_t length,
- bool flush_entry) {
+ bool flush_entry, int filter_ret_val) {
assert(!bufferlists.empty());
uint64_t tid;
{
Mutex::Locker event_locker(m_event_lock);
- m_events[tid] = Event(futures, requests, offset, length);
+ m_events[tid] = Event(futures, requests, offset, length, filter_ret_val);
}
CephContext *cct = m_image_ctx.cct;
<< "r=" << r << dendl;
Event &event = it->second;
+ if (r < 0 && r == event.filter_ret_val) {
+ // ignore allowed error codes
+ r = 0;
+ }
if (r < 0) {
// event recorded to journal but failed to update disk, we cannot
// commit this IO event. this event must be replayed.
handle_flushing_replay();
}
});
+ ctx = new FunctionContext([this, ctx](int r) {
+ // ensure the commit position is flushed to disk
+ m_journaler->flush_commit_position(ctx);
+ });
ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_complete: "
<< "shut down replay" << dendl;
m_lock.Unlock();
// stop replay, shut down, and restart
- Context *ctx = new FunctionContext([this, cct](int r) {
+ Context* ctx = create_context_callback<
+ Journal<I>, &Journal<I>::handle_flushing_restart>(this);
+ ctx = new FunctionContext([this, ctx](int r) {
+ // ensure the commit position is flushed to disk
+ m_journaler->flush_commit_position(ctx);
+ });
+ ctx = new FunctionContext([this, cct, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_process_safe: "
<< "shut down replay" << dendl;
{
assert(m_state == STATE_FLUSHING_RESTART);
}
- m_journal_replay->shut_down(true, create_context_callback<
- Journal<I>, &Journal<I>::handle_flushing_restart>(this));
+ m_journal_replay->shut_down(true, ctx);
});
m_journaler->stop_replay(ctx);
return;
it != aio_object_requests.end(); ++it) {
if (r < 0) {
// don't send aio requests if the journal fails -- bubble error up
- (*it)->complete(r);
+ (*it)->fail(r);
} else {
// send any waiting aio requests now that journal entry is safe
(*it)->send();