X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Fosdc%2FJournaler.cc;h=8084a661d7d34be5cc04986dea37cc83a27d2fb6;hb=20effc670b57271cb089376d6d0800990e5218d5;hp=5949ff3b4016cd4282279996f15719cd7281bdae;hpb=7c673caec407dd16107e56e4b51a6d00f021315c;p=ceph.git diff --git a/ceph/src/osdc/Journaler.cc b/ceph/src/osdc/Journaler.cc index 5949ff3b4..8084a661d 100644 --- a/ceph/src/osdc/Journaler.cc +++ b/ceph/src/osdc/Journaler.cc @@ -18,7 +18,7 @@ #include "msg/Messenger.h" #include "osdc/Journaler.h" #include "common/errno.h" -#include "include/assert.h" +#include "include/ceph_assert.h" #include "common/Finisher.h" #define dout_subsys ceph_subsys_journaler @@ -26,13 +26,14 @@ #define dout_prefix *_dout << objecter->messenger->get_myname() \ << ".journaler." << name << (readonly ? "(ro) ":"(rw) ") +using namespace std; using std::chrono::seconds; class Journaler::C_DelayFlush : public Context { Journaler *journaler; public: - C_DelayFlush(Journaler *j) : journaler(j) {} + explicit C_DelayFlush(Journaler *j) : journaler(j) {} void finish(int r) override { journaler->_do_delayed_flush(); } @@ -58,7 +59,7 @@ void Journaler::create(file_layout_t *l, stream_format_t const sf) { lock_guard lk(lock); - assert(!readonly); + ceph_assert(!readonly); state = STATE_ACTIVE; stream_format = sf; @@ -84,15 +85,17 @@ void Journaler::_set_layout(file_layout_t const *l) { layout = *l; - assert(layout.pool_id == pg_pool); + if (layout.pool_id != pg_pool) { + // user can reset pool id through cephfs-journal-tool + lderr(cct) << "may got older pool id from header layout" << dendl; + ceph_abort(); + } last_written.layout = layout; last_committed.layout = layout; // prefetch intelligently. // (watch out, this is big if you use big objects or weird striping) - uint64_t periods = cct->_conf->journaler_prefetch_periods; - if (periods < 2) - periods = 2; // we need at least 2 periods to make progress. + uint64_t periods = cct->_conf.get_val("journaler_prefetch_periods"); fetch_len = layout.get_period() * periods; } @@ -155,14 +158,14 @@ public: void Journaler::recover(Context *onread) { lock_guard l(lock); - if (stopping) { + if (is_stopping()) { onread->complete(-EAGAIN); return; } ldout(cct, 1) << "recover start" << dendl; - assert(state != STATE_ACTIVE); - assert(readonly); + ceph_assert(state != STATE_ACTIVE); + ceph_assert(readonly); if (onread) waitfor_recover.push_back(wrap_finisher(onread)); @@ -181,7 +184,7 @@ void Journaler::recover(Context *onread) void Journaler::_read_head(Context *on_finish, bufferlist *bl) { // lock is locked - assert(state == STATE_READHEAD || state == STATE_REREADHEAD); + ceph_assert(state == STATE_READHEAD || state == STATE_REREADHEAD); object_t oid = file_object_t(ino, 0); object_locator_t oloc(pg_pool); @@ -205,7 +208,7 @@ void Journaler::reread_head(Context *onfinish) void Journaler::_reread_head(Context *onfinish) { ldout(cct, 10) << "reread_head" << dendl; - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); state = STATE_REREADHEAD; C_RereadHead *fin = new C_RereadHead(this, onfinish); @@ -215,16 +218,20 @@ void Journaler::_reread_head(Context *onfinish) void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) { lock_guard l(lock); + if (is_stopping()) { + finish->complete(-EAGAIN); + return; + } //read on-disk header into - assert(bl.length() || r < 0 ); + ceph_assert(bl.length() || r < 0 ); // unpack header if (r == 0) { Header h; - bufferlist::iterator p = bl.begin(); + auto p = bl.cbegin(); try { - ::decode(h, p); + decode(h, p); } catch (const buffer::error &e) { finish->complete(-EINVAL); return; @@ -243,8 +250,10 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish) void Journaler::_finish_read_head(int r, bufferlist& bl) { lock_guard l(lock); + if (is_stopping()) + return; - assert(state == STATE_READHEAD); + ceph_assert(state == STATE_READHEAD); if (r!=0) { ldout(cct, 0) << "error getting journal off disk" << dendl; @@ -267,9 +276,9 @@ void Journaler::_finish_read_head(int r, bufferlist& bl) // unpack header bool corrupt = false; Header h; - bufferlist::iterator p = bl.begin(); + auto p = bl.cbegin(); try { - ::decode(h, p); + decode(h, p); if (h.magic != magic) { ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '" @@ -312,7 +321,7 @@ void Journaler::_probe(Context *finish, uint64_t *end) { // lock is locked ldout(cct, 1) << "probing for end of the log" << dendl; - assert(state == STATE_PROBING || state == STATE_REPROBING); + ceph_assert(state == STATE_PROBING || state == STATE_REPROBING); // probe the log filer.probe(ino, &layout, CEPH_NOSNAP, write_pos, end, true, 0, wrap_finisher(finish)); @@ -321,7 +330,7 @@ void Journaler::_probe(Context *finish, uint64_t *end) void Journaler::_reprobe(C_OnFinisher *finish) { ldout(cct, 10) << "reprobe" << dendl; - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); state = STATE_REPROBING; C_ReProbe *fin = new C_ReProbe(this, finish); @@ -333,8 +342,12 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, C_OnFinisher *onfinish) { lock_guard l(lock); + if (is_stopping()) { + onfinish->complete(-EAGAIN); + return; + } - assert(new_end >= write_pos || r < 0); + ceph_assert(new_end >= write_pos || r < 0); ldout(cct, 1) << "_finish_reprobe new_end = " << new_end << " (header had " << write_pos << ")." << dendl; @@ -346,8 +359,10 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, void Journaler::_finish_probe_end(int r, uint64_t end) { lock_guard l(lock); + if (is_stopping()) + return; - assert(state == STATE_PROBING); + ceph_assert(state == STATE_PROBING); if (r < 0) { // error in probing goto out; } @@ -357,7 +372,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end) << write_pos << "). log was empty. recovered." << dendl; ceph_abort(); // hrm. } else { - assert(end >= write_pos); + ceph_assert(end >= write_pos); ldout(cct, 1) << "_finish_probe_end write_pos = " << end << " (header had " << write_pos << "). recovered." << dendl; @@ -390,7 +405,7 @@ void Journaler::reread_head_and_probe(Context *onfinish) { lock_guard l(lock); - assert(state == STATE_ACTIVE); + ceph_assert(state == STATE_ACTIVE); _reread_head(new C_RereadHeadProbe(this, wrap_finisher(onfinish))); } @@ -398,8 +413,19 @@ void Journaler::_finish_reread_head_and_probe(int r, C_OnFinisher *onfinish) { // Expect to be called back from finish_reread_head, which already takes lock // lock is locked + if (is_stopping()) { + onfinish->complete(-EAGAIN); + return; + } - assert(!r); //if we get an error, we're boned + // Let the caller know that the operation has failed or was intentionally + // failed since the caller has been blocklisted. + if (r == -EBLOCKLISTED) { + onfinish->complete(r); + return; + } + + ceph_assert(!r); //if we get an error, we're boned _reprobe(onfinish); } @@ -427,8 +453,8 @@ void Journaler::write_head(Context *oncommit) void Journaler::_write_head(Context *oncommit) { - assert(!readonly); - assert(state == STATE_ACTIVE); + ceph_assert(!readonly); + ceph_assert(state == STATE_ACTIVE); last_written.trimmed_pos = trimmed_pos; last_written.expire_pos = expire_pos; last_written.unused_field = expire_pos; @@ -437,13 +463,13 @@ void Journaler::_write_head(Context *oncommit) ldout(cct, 10) << "write_head " << last_written << dendl; // Avoid persisting bad pointers in case of bugs - assert(last_written.write_pos >= last_written.expire_pos); - assert(last_written.expire_pos >= last_written.trimmed_pos); + ceph_assert(last_written.write_pos >= last_written.expire_pos); + ceph_assert(last_written.expire_pos >= last_written.trimmed_pos); last_wrote_head = ceph::real_clock::now(); bufferlist bl; - ::encode(last_written, bl); + encode(last_written, bl); SnapContext snapc; object_t oid = file_object_t(ino, 0); @@ -465,7 +491,7 @@ void Journaler::_finish_write_head(int r, Header &wrote, handle_write_error(r); return; } - assert(!readonly); + ceph_assert(!readonly); ldout(cct, 10) << "_finish_write_head " << wrote << dendl; last_committed = wrote; if (oncommit) { @@ -493,7 +519,7 @@ public: void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) { lock_guard l(lock); - assert(!readonly); + ceph_assert(!readonly); if (r < 0) { lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl; @@ -501,7 +527,7 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) return; } - assert(start < flush_pos); + ceph_assert(start < flush_pos); // calc latency? if (logger) { @@ -511,12 +537,13 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) // adjust safe_pos auto it = pending_safe.find(start); - assert(it != pending_safe.end()); + ceph_assert(it != pending_safe.end()); + uint64_t min_next_safe_pos = pending_safe.begin()->second; pending_safe.erase(it); if (pending_safe.empty()) safe_pos = next_safe_pos; else - safe_pos = pending_safe.begin()->second; + safe_pos = min_next_safe_pos; ldout(cct, 10) << "_finish_flush safe from " << start << ", pending_safe " << pending_safe @@ -526,11 +553,16 @@ void Journaler::_finish_flush(int r, uint64_t start, ceph::real_time stamp) << dendl; // kick waiters <= safe_pos - while (!waitfor_safe.empty()) { - if (waitfor_safe.begin()->first > safe_pos) - break; - finish_contexts(cct, waitfor_safe.begin()->second); - waitfor_safe.erase(waitfor_safe.begin()); + if (!waitfor_safe.empty()) { + list ls; + while (!waitfor_safe.empty()) { + auto it = waitfor_safe.begin(); + if (it->first > safe_pos) + break; + ls.splice(ls.end(), it->second); + waitfor_safe.erase(it); + } + finish_contexts(cct, ls); } } @@ -540,7 +572,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) { unique_lock l(lock); - assert(!readonly); + ceph_assert(!readonly); uint32_t s = bl.length(); // append @@ -560,7 +592,7 @@ uint64_t Journaler::append_entry(bufferlist& bl) // flush previous object? uint64_t su = get_layout_period(); - assert(su > 0); + ceph_assert(su > 0); uint64_t write_off = write_pos % su; uint64_t write_obj = write_pos / su; uint64_t flush_obj = flush_pos / su; @@ -568,8 +600,12 @@ uint64_t Journaler::append_entry(bufferlist& bl) ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl; _do_flush(write_buf.length() - write_off); - if (write_off) { - // current entry isn't being flushed, set next_safe_pos to the end of previous entry + + // if _do_flush() skips flushing some data, it does do a best effort to + // update next_safe_pos. + if (write_buf.length() > 0 && + write_buf.length() <= wrote) { // the unflushed data are within this entry + // set next_safe_pos to end of previous entry next_safe_pos = write_pos - wrote; } } @@ -580,14 +616,16 @@ uint64_t Journaler::append_entry(bufferlist& bl) void Journaler::_do_flush(unsigned amount) { + if (is_stopping()) + return; if (write_pos == flush_pos) return; - assert(write_pos > flush_pos); - assert(!readonly); + ceph_assert(write_pos > flush_pos); + ceph_assert(!readonly); // flush uint64_t len = write_pos - flush_pos; - assert(len == write_buf.length()); + ceph_assert(len == write_buf.length()); if (amount && amount < len) len = amount; @@ -602,19 +640,16 @@ void Journaler::_do_flush(unsigned amount) ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " already too close to prezero_pos " << prezero_pos << ", zeroing first" << dendl; - waiting_for_zero = true; + waiting_for_zero_pos = flush_pos + len; return; } if (static_cast(newlen) < len) { ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos << ", will do " << flush_pos << "~" << newlen << dendl; + waiting_for_zero_pos = flush_pos + len; len = newlen; - } else { - waiting_for_zero = false; } - } else { - waiting_for_zero = false; } ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl; @@ -634,6 +669,17 @@ void Journaler::_do_flush(unsigned amount) next_safe_pos = write_pos; } else { write_buf.splice(0, len, &write_bl); + // Keys of waitfor_safe map are journal entry boundaries. + // Try finding a journal entry that we are actually flushing + // and set next_safe_pos to end of it. This is best effort. + // The one we found may not be the lastest flushing entry. + auto p = waitfor_safe.lower_bound(flush_pos + len); + if (p != waitfor_safe.end()) { + if (p->first > flush_pos + len && p != waitfor_safe.begin()) + --p; + if (p->first <= flush_pos + len && p->first > next_safe_pos) + next_safe_pos = p->first; + } } filer.write(ino, &layout, snapc, @@ -642,7 +688,7 @@ void Journaler::_do_flush(unsigned amount) wrap_finisher(onsafe), write_iohint); flush_pos += len; - assert(write_buf.length() == write_pos - flush_pos); + ceph_assert(write_buf.length() == write_pos - flush_pos); write_buf_throttle.put(len); ldout(cct, 20) << "write_buf_throttle put, len " << len << dendl; @@ -658,8 +704,9 @@ void Journaler::_do_flush(unsigned amount) void Journaler::wait_for_flush(Context *onsafe) { lock_guard l(lock); - if (stopping) { - onsafe->complete(-EAGAIN); + if (is_stopping()) { + if (onsafe) + onsafe->complete(-EAGAIN); return; } _wait_for_flush(onsafe); @@ -667,11 +714,11 @@ void Journaler::wait_for_flush(Context *onsafe) void Journaler::_wait_for_flush(Context *onsafe) { - assert(!readonly); + ceph_assert(!readonly); // all flushed and safe? if (write_pos == safe_pos) { - assert(write_buf.length() == 0); + ceph_assert(write_buf.length() == 0); ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe " "pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" @@ -691,15 +738,20 @@ void Journaler::_wait_for_flush(Context *onsafe) void Journaler::flush(Context *onsafe) { lock_guard l(lock); + if (is_stopping()) { + if (onsafe) + onsafe->complete(-EAGAIN); + return; + } _flush(wrap_finisher(onsafe)); } void Journaler::_flush(C_OnFinisher *onsafe) { - assert(!readonly); + ceph_assert(!readonly); if (write_pos == flush_pos) { - assert(write_buf.length() == 0); + ceph_assert(write_buf.length() == 0); ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/" "flush/safe pointers at " << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos @@ -720,7 +772,7 @@ void Journaler::_flush(C_OnFinisher *onsafe) bool Journaler::_write_head_needed() { - return last_wrote_head + seconds(cct->_conf->journaler_write_head_interval) + return last_wrote_head + seconds(cct->_conf.get_val("journaler_write_head_interval")) < ceph::real_clock::now(); } @@ -739,12 +791,9 @@ struct C_Journaler_Prezero : public Context { void Journaler::_issue_prezero() { - assert(prezeroing_pos >= flush_pos); - - // we need to zero at least two periods, minimum, to ensure that we - // have a full empty object/period in front of us. - uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods); + ceph_assert(prezeroing_pos >= flush_pos); + uint64_t num_periods = cct->_conf.get_val("journaler_prezero_periods"); /* * issue zero requests based on write_pos, even though the invariant * is that we zero ahead of flush_pos. @@ -796,7 +845,7 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) return; } - assert(r == 0 || r == -ENOENT); + ceph_assert(r == 0 || r == -ENOENT); if (start == prezero_pos) { prezero_pos += len; @@ -807,8 +856,15 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) pending_zero.erase(b); } - if (waiting_for_zero) { - _do_flush(); + if (waiting_for_zero_pos > flush_pos) { + _do_flush(waiting_for_zero_pos - flush_pos); + } + + if (prezero_pos == prezeroing_pos && + !waitfor_prezero.empty()) { + list ls; + ls.swap(waitfor_prezero); + finish_contexts(cct, ls, 0); } } else { pending_zero.insert(start, len); @@ -819,6 +875,17 @@ void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len) << dendl; } +void Journaler::wait_for_prezero(Context *onfinish) +{ + ceph_assert(onfinish); + lock_guard l(lock); + + if (prezero_pos == prezeroing_pos) { + finisher->queue(onfinish, 0); + return; + } + waitfor_prezero.push_back(wrap_finisher(onfinish)); +} /***************** READING *******************/ @@ -911,15 +978,15 @@ void Journaler::_assimilate_prefetch() << p->second.length() << dendl; received_pos += p->second.length(); read_buf.claim_append(p->second); - assert(received_pos <= requested_pos); + ceph_assert(received_pos <= requested_pos); prefetch_buf.erase(p); got_any = true; } if (got_any) { ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" - << read_buf.length() << ", read pointers " << read_pos - << "/" << received_pos << "/" << requested_pos + << read_buf.length() << ", read pointers read_pos=" << read_pos + << " received_pos=" << received_pos << " requested_pos=" << requested_pos << dendl; // Update readability (this will also hit any decode errors resulting @@ -944,15 +1011,24 @@ void Journaler::_issue_read(uint64_t len) { // stuck at safe_pos? (this is needed if we are reading the tail of // a journal we are also writing to) - assert(requested_pos <= safe_pos); + ceph_assert(requested_pos <= safe_pos); if (requested_pos == safe_pos) { ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl; - assert(write_pos > requested_pos); + ceph_assert(write_pos > requested_pos); if (pending_safe.empty()) { _flush(NULL); } - waitfor_safe[flush_pos].push_back(new C_RetryRead(this)); + + // Make sure keys of waitfor_safe map are journal entry boundaries. + // The key we used here is either next_safe_pos or old value of + // next_safe_pos. next_safe_pos is always set to journal entry + // boundary. + auto p = pending_safe.rbegin(); + if (p != pending_safe.rend()) + waitfor_safe[p->second].push_back(new C_RetryRead(this)); + else + waitfor_safe[next_safe_pos].push_back(new C_RetryRead(this)); return; } @@ -965,8 +1041,8 @@ void Journaler::_issue_read(uint64_t len) // go. ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len - << ", read pointers " << read_pos << "/" << received_pos - << "/" << (requested_pos+len) << dendl; + << ", read pointers read_pos=" << read_pos << " received_pos=" << received_pos + << " requested_pos+len=" << (requested_pos+len) << dendl; // step by period (object). _don't_ do a single big filer.read() // here because it will wait for all object reads to complete before @@ -989,6 +1065,9 @@ void Journaler::_issue_read(uint64_t len) void Journaler::_prefetch() { + if (is_stopping()) + return; + ldout(cct, 10) << "_prefetch" << dendl; // prefetch uint64_t pf; @@ -1061,7 +1140,8 @@ bool Journaler::_is_readable() // adjust write_pos prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = next_safe_pos = read_pos; - assert(write_buf.length() == 0); + ceph_assert(write_buf.length() == 0); + ceph_assert(waitfor_safe.empty()); // reset read state requested_pos = received_pos = read_pos; @@ -1133,6 +1213,10 @@ void Journaler::erase(Context *completion) void Journaler::_finish_erase(int data_result, C_OnFinisher *completion) { lock_guard l(lock); + if (is_stopping()) { + completion->complete(-EAGAIN); + return; + } if (data_result == 0) { // Async delete the journal header @@ -1165,7 +1249,7 @@ bool Journaler::try_read_entry(bufferlist& bl) try { consumed = journal_stream.read(read_buf, &bl, &start_ptr); if (stream_format >= JOURNAL_FORMAT_RESILIENT) { - assert(start_ptr == read_pos); + ceph_assert(start_ptr == read_pos); } } catch (const buffer::error &e) { lderr(cct) << __func__ << ": decode error from journal_stream" << dendl; @@ -1189,18 +1273,25 @@ bool Journaler::try_read_entry(bufferlist& bl) // prefetch? _prefetch(); + + // If bufferlist consists of discontiguous memory, decoding types whose + // denc_traits needs contiguous memory is inefficient. The bufferlist may + // get copied to temporary memory multiple times (copy_shallow() in + // src/include/denc.h actually does deep copy) + if (bl.get_num_buffers() > 1) + bl.rebuild(); return true; } void Journaler::wait_for_readable(Context *onreadable) { lock_guard l(lock); - if (stopping) { - onreadable->complete(-EAGAIN); + if (is_stopping()) { + finisher->queue(onreadable, -EAGAIN); return; } - assert(on_readable == 0); + ceph_assert(on_readable == 0); if (!readable) { ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl; @@ -1240,7 +1331,10 @@ void Journaler::trim() void Journaler::_trim() { - assert(!readonly); + if (is_stopping()) + return; + + ceph_assert(!readonly); uint64_t period = get_layout_period(); uint64_t trim_to = last_committed.expire_pos; trim_to -= trim_to % period; @@ -1260,9 +1354,9 @@ void Journaler::_trim() } // trim - assert(trim_to <= write_pos); - assert(trim_to <= expire_pos); - assert(trim_to > trimming_pos); + ceph_assert(trim_to <= write_pos); + ceph_assert(trim_to <= expire_pos); + ceph_assert(trim_to > trimming_pos); ldout(cct, 10) << "trim trimming to " << trim_to << ", trimmed/trimming/expire are " << trimmed_pos << "/" << trimming_pos << "/" << expire_pos @@ -1282,7 +1376,7 @@ void Journaler::_finish_trim(int r, uint64_t to) { lock_guard l(lock); - assert(!readonly); + ceph_assert(!readonly); ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos << ", trimmed/trimming/expire now " << to << "/" << trimming_pos << "/" << expire_pos @@ -1293,10 +1387,10 @@ void Journaler::_finish_trim(int r, uint64_t to) return; } - assert(r >= 0 || r == -ENOENT); + ceph_assert(r >= 0 || r == -ENOENT); - assert(to <= trimming_pos); - assert(to > trimmed_pos); + ceph_assert(to <= trimming_pos); + ceph_assert(to > trimmed_pos); trimmed_pos = to; } @@ -1316,7 +1410,7 @@ void Journaler::handle_write_error(int r) lderr(cct) << __func__ << ": multiple write errors, handler already called" << dendl; } else { - assert(0 == "unhandled write error"); + ceph_abort_msg("unhandled write error"); } } @@ -1331,11 +1425,11 @@ void Journaler::handle_write_error(int r) */ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const { - assert(need != NULL); + ceph_assert(need != NULL); uint32_t entry_size = 0; uint64_t entry_sentinel = 0; - bufferlist::iterator p = read_buf.begin(); + auto p = read_buf.cbegin(); // Do we have enough data to decode an entry prefix? if (format >= JOURNAL_FORMAT_RESILIENT) { @@ -1345,13 +1439,13 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const } if (read_buf.length() >= *need) { if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(entry_sentinel, p); + decode(entry_sentinel, p); if (entry_sentinel != sentinel) { throw buffer::malformed_input("Invalid sentinel"); } } - ::decode(entry_size, p); + decode(entry_size, p); } else { return false; } @@ -1388,29 +1482,29 @@ bool JournalStream::readable(bufferlist &read_buf, uint64_t *need) const size_t JournalStream::read(bufferlist &from, bufferlist *entry, uint64_t *start_ptr) { - assert(start_ptr != NULL); - assert(entry != NULL); - assert(entry->length() == 0); + ceph_assert(start_ptr != NULL); + ceph_assert(entry != NULL); + ceph_assert(entry->length() == 0); uint32_t entry_size = 0; // Consume envelope prefix: entry_size and entry_sentinel - bufferlist::iterator from_ptr = from.begin(); + auto from_ptr = from.cbegin(); if (format >= JOURNAL_FORMAT_RESILIENT) { uint64_t entry_sentinel = 0; - ::decode(entry_sentinel, from_ptr); + decode(entry_sentinel, from_ptr); // Assertion instead of clean check because of precondition of this // fn is that readable() already passed - assert(entry_sentinel == sentinel); + ceph_assert(entry_sentinel == sentinel); } - ::decode(entry_size, from_ptr); + decode(entry_size, from_ptr); // Read out the payload from_ptr.copy(entry_size, *entry); // Consume the envelope suffix (start_ptr) if (format >= JOURNAL_FORMAT_RESILIENT) { - ::decode(*start_ptr, from_ptr); + decode(*start_ptr, from_ptr); } else { *start_ptr = 0; } @@ -1428,16 +1522,16 @@ size_t JournalStream::read(bufferlist &from, bufferlist *entry, size_t JournalStream::write(bufferlist &entry, bufferlist *to, uint64_t const &start_ptr) { - assert(to != NULL); + ceph_assert(to != NULL); uint32_t const entry_size = entry.length(); if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(sentinel, *to); + encode(sentinel, *to); } - ::encode(entry_size, *to); + encode(entry_size, *to); to->claim_append(entry); if (format >= JOURNAL_FORMAT_RESILIENT) { - ::encode(start_ptr, *to); + encode(start_ptr, *to); } if (format >= JOURNAL_FORMAT_RESILIENT) { @@ -1463,7 +1557,7 @@ size_t JournalStream::write(bufferlist &entry, bufferlist *to, */ void Journaler::set_write_error_handler(Context *c) { lock_guard l(lock); - assert(!on_write_error); + ceph_assert(!on_write_error); on_write_error = wrap_finisher(c); called_write_error = false; } @@ -1490,8 +1584,8 @@ void Journaler::shutdown() ldout(cct, 1) << __func__ << dendl; + state = STATE_STOPPING; readable = false; - stopping = true; // Kick out anyone reading from journal error = -EAGAIN; @@ -1501,7 +1595,9 @@ void Journaler::shutdown() f->complete(-EAGAIN); } - finish_contexts(cct, waitfor_recover, -ESHUTDOWN); + list ls; + ls.swap(waitfor_recover); + finish_contexts(cct, ls, -ESHUTDOWN); std::map >::iterator i; for (i = waitfor_safe.begin(); i != waitfor_safe.end(); ++i) {