]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/journal/ObjectPlayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / journal / ObjectPlayer.cc
index d4d9fb750b5e7f3dbc7840ecd0c9243d1e7f490d..56eec51f68fc15c17792507b6e5ffcc27298fd27 100644 (file)
 
 namespace journal {
 
+namespace {
+
+bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
+                              uint32_t *pad_len, bool *partial_entry) {
+  const uint32_t MAX_PAD = 8;
+  auto pad_bytes = MAX_PAD - off % MAX_PAD;
+  auto next = *iter;
+
+  ceph_assert(!next.end());
+  if (*next != '\0') {
+    return false;
+  }
+
+  for (auto i = pad_bytes - 1; i > 0; i--) {
+    if ((++next).end()) {
+      *partial_entry = true;
+      return false;
+    }
+    if (*next != '\0') {
+      return false;
+    }
+  }
+
+  *iter = next;
+  *pad_len += pad_bytes;
+  return true;
+}
+
+} // anonymous namespace
+
 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
-                           const std::string &object_oid_prefix,
+                           const std::stringobject_oid_prefix,
                            uint64_t object_num, SafeTimer &timer,
-                           Mutex &timer_lock, uint8_t order,
+                           ceph::mutex &timer_lock, uint8_t order,
                            uint64_t max_fetch_bytes)
-  : RefCountedObject(NULL, 0), m_object_num(object_num),
+  : m_object_num(object_num),
     m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
-    m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+    m_timer(timer), m_timer_lock(timer_lock), m_order(order),
     m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
-    m_watch_interval(0), m_watch_task(NULL),
-    m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
-    m_fetch_in_progress(false) {
+    m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
+{
   m_ioctx.dup(ioctx);
   m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
 }
 
 ObjectPlayer::~ObjectPlayer() {
   {
-    Mutex::Locker timer_locker(m_timer_lock);
-    Mutex::Locker locker(m_lock);
+    std::lock_guard timer_locker{m_timer_lock};
+    std::lock_guard locker{m_lock};
     ceph_assert(!m_fetch_in_progress);
     ceph_assert(m_watch_ctx == nullptr);
   }
@@ -40,7 +69,7 @@ ObjectPlayer::~ObjectPlayer() {
 void ObjectPlayer::fetch(Context *on_finish) {
   ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_fetch_in_progress);
   m_fetch_in_progress = true;
 
@@ -49,9 +78,8 @@ void ObjectPlayer::fetch(Context *on_finish) {
   op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
   op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
 
-  librados::AioCompletion *rados_completion =
-    librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
-                                           NULL);
+  auto rados_completion =
+    librados::Rados::aio_create_completion(context, utils::rados_ctx_callback);
   int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
   ceph_assert(r == 0);
   rados_completion->release();
@@ -60,7 +88,7 @@ void ObjectPlayer::fetch(Context *on_finish) {
 void ObjectPlayer::watch(Context *on_fetch, double interval) {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
 
-  Mutex::Locker timer_locker(m_timer_lock);
+  std::lock_guard timer_locker{m_timer_lock};
   m_watch_interval = interval;
 
   ceph_assert(m_watch_ctx == nullptr);
@@ -73,7 +101,7 @@ void ObjectPlayer::unwatch() {
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
   Context *watch_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(m_timer_lock);
+    std::lock_guard timer_locker{m_timer_lock};
     ceph_assert(!m_unwatched);
     m_unwatched = true;
 
@@ -90,13 +118,13 @@ void ObjectPlayer::unwatch() {
 }
 
 void ObjectPlayer::front(Entry *entry) const {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_entries.empty());
   *entry = m_entries.front();
 }
 
 void ObjectPlayer::pop_front() {
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(!m_entries.empty());
 
   auto &entry = m_entries.front();
@@ -118,7 +146,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
     return 0;
   }
 
-  Mutex::Locker locker(m_lock);
+  std::lock_guard locker{m_lock};
   ceph_assert(m_fetch_in_progress);
   m_read_off += bl.length();
   m_read_bl.append(bl);
@@ -131,6 +159,7 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
 
   clear_invalid_range(m_read_bl_off, m_read_bl.length());
   bufferlist::const_iterator iter{&m_read_bl, 0};
+  uint32_t pad_len = 0;
   while (!iter.end()) {
     uint32_t bytes_needed;
     uint32_t bl_off = iter.get_off();
@@ -149,11 +178,23 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
         break;
       }
 
-      if (!invalid) {
+      if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
+                                    &pad_len, &partial_entry)) {
         invalid_start_off = m_read_bl_off + bl_off;
         invalid = true;
-        lderr(m_cct) << ": detected corrupt journal entry at offset "
-                     << invalid_start_off << dendl;
+        if (partial_entry) {
+          if (full_fetch) {
+            lderr(m_cct) << ": partial pad at offset " << invalid_start_off
+                         << dendl;
+          } else {
+            ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
+                             << dendl;
+          }
+        } else {
+          lderr(m_cct) << ": detected corrupt journal entry at offset "
+                       << invalid_start_off << dendl;
+        }
+        break;
       }
       ++iter;
       continue;
@@ -193,7 +234,8 @@ int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
     iter = bufferlist::iterator(&m_read_bl, 0);
 
     // advance the decoded entry offset
-    m_read_bl_off += entry_len;
+    m_read_bl_off += entry_len + pad_len;
+    pad_len = 0;
   }
 
   if (invalid) {
@@ -230,7 +272,7 @@ void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
 }
 
 void ObjectPlayer::schedule_watch() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
   if (m_watch_ctx == NULL) {
     return;
   }
@@ -239,13 +281,13 @@ void ObjectPlayer::schedule_watch() {
   ceph_assert(m_watch_task == nullptr);
   m_watch_task = m_timer.add_event_after(
     m_watch_interval,
-    new FunctionContext([this](int) {
+    new LambdaContext([this](int) {
        handle_watch_task();
       }));
 }
 
 bool ObjectPlayer::cancel_watch() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
   ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
   if (m_watch_task != nullptr) {
     bool canceled = m_timer.cancel_event(m_watch_task);
@@ -258,7 +300,7 @@ bool ObjectPlayer::cancel_watch() {
 }
 
 void ObjectPlayer::handle_watch_task() {
-  ceph_assert(m_timer_lock.is_locked());
+  ceph_assert(ceph_mutex_is_locked(m_timer_lock));
 
   ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
   ceph_assert(m_watch_ctx != nullptr);
@@ -274,7 +316,7 @@ void ObjectPlayer::handle_watch_fetched(int r) {
 
   Context *watch_ctx = nullptr;
   {
-    Mutex::Locker timer_locker(m_timer_lock);
+    std::lock_guard timer_locker{m_timer_lock};
     std::swap(watch_ctx, m_watch_ctx);
 
     if (m_unwatched) {
@@ -293,7 +335,7 @@ void ObjectPlayer::C_Fetch::finish(int r) {
   r = object_player->handle_fetch_complete(r, read_bl, &refetch);
 
   {
-    Mutex::Locker locker(object_player->m_lock);
+    std::lock_guard locker{object_player->m_lock};
     object_player->m_fetch_in_progress = false;
   }