namespace journal {
+namespace {
+
+bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
+ uint32_t *pad_len, bool *partial_entry) {
+ const uint32_t MAX_PAD = 8;
+ auto pad_bytes = MAX_PAD - off % MAX_PAD;
+ auto next = *iter;
+
+ ceph_assert(!next.end());
+ if (*next != '\0') {
+ return false;
+ }
+
+ for (auto i = pad_bytes - 1; i > 0; i--) {
+ if ((++next).end()) {
+ *partial_entry = true;
+ return false;
+ }
+ if (*next != '\0') {
+ return false;
+ }
+ }
+
+ *iter = next;
+ *pad_len += pad_bytes;
+ return true;
+}
+
+} // anonymous namespace
+
ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
- const std::string &object_oid_prefix,
+ const std::string& object_oid_prefix,
uint64_t object_num, SafeTimer &timer,
- Mutex &timer_lock, uint8_t order,
+ ceph::mutex &timer_lock, uint8_t order,
uint64_t max_fetch_bytes)
- : RefCountedObject(NULL, 0), m_object_num(object_num),
+ : m_object_num(object_num),
m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
- m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_timer(timer), m_timer_lock(timer_lock), m_order(order),
m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
- m_watch_interval(0), m_watch_task(NULL),
- m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
- m_fetch_in_progress(false) {
+ m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
+{
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
ObjectPlayer::~ObjectPlayer() {
{
- Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
+ std::lock_guard timer_locker{m_timer_lock};
+ std::lock_guard locker{m_lock};
ceph_assert(!m_fetch_in_progress);
ceph_assert(m_watch_ctx == nullptr);
}
void ObjectPlayer::fetch(Context *on_finish) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_fetch_in_progress);
m_fetch_in_progress = true;
op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
- librados::AioCompletion *rados_completion =
- librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
- NULL);
+ auto rados_completion =
+ librados::Rados::aio_create_completion(context, utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
ceph_assert(r == 0);
rados_completion->release();
void ObjectPlayer::watch(Context *on_fetch, double interval) {
ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
m_watch_interval = interval;
ceph_assert(m_watch_ctx == nullptr);
ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
Context *watch_ctx = nullptr;
{
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
ceph_assert(!m_unwatched);
m_unwatched = true;
}
void ObjectPlayer::front(Entry *entry) const {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_entries.empty());
*entry = m_entries.front();
}
void ObjectPlayer::pop_front() {
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(!m_entries.empty());
auto &entry = m_entries.front();
return 0;
}
- Mutex::Locker locker(m_lock);
+ std::lock_guard locker{m_lock};
ceph_assert(m_fetch_in_progress);
m_read_off += bl.length();
m_read_bl.append(bl);
clear_invalid_range(m_read_bl_off, m_read_bl.length());
bufferlist::const_iterator iter{&m_read_bl, 0};
+ uint32_t pad_len = 0;
while (!iter.end()) {
uint32_t bytes_needed;
uint32_t bl_off = iter.get_off();
break;
}
- if (!invalid) {
+ if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
+ &pad_len, &partial_entry)) {
invalid_start_off = m_read_bl_off + bl_off;
invalid = true;
- lderr(m_cct) << ": detected corrupt journal entry at offset "
- << invalid_start_off << dendl;
+ if (partial_entry) {
+ if (full_fetch) {
+ lderr(m_cct) << ": partial pad at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
+ << dendl;
+ }
+ } else {
+ lderr(m_cct) << ": detected corrupt journal entry at offset "
+ << invalid_start_off << dendl;
+ }
+ break;
}
++iter;
continue;
iter = bufferlist::iterator(&m_read_bl, 0);
// advance the decoded entry offset
- m_read_bl_off += entry_len;
+ m_read_bl_off += entry_len + pad_len;
+ pad_len = 0;
}
if (invalid) {
}
void ObjectPlayer::schedule_watch() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
if (m_watch_ctx == NULL) {
return;
}
ceph_assert(m_watch_task == nullptr);
m_watch_task = m_timer.add_event_after(
m_watch_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
handle_watch_task();
}));
}
bool ObjectPlayer::cancel_watch() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
if (m_watch_task != nullptr) {
bool canceled = m_timer.cancel_event(m_watch_task);
}
void ObjectPlayer::handle_watch_task() {
- ceph_assert(m_timer_lock.is_locked());
+ ceph_assert(ceph_mutex_is_locked(m_timer_lock));
ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
ceph_assert(m_watch_ctx != nullptr);
Context *watch_ctx = nullptr;
{
- Mutex::Locker timer_locker(m_timer_lock);
+ std::lock_guard timer_locker{m_timer_lock};
std::swap(watch_ctx, m_watch_ctx);
if (m_unwatched) {
r = object_player->handle_fetch_complete(r, read_bl, &refetch);
{
- Mutex::Locker locker(object_player->m_lock);
+ std::lock_guard locker{object_player->m_lock};
object_player->m_fetch_in_progress = false;
}