1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectPlayer.h"
5 #include "journal/Utils.h"
6 #include "common/Timer.h"
9 #define dout_subsys ceph_subsys_journaler
11 #define dout_prefix *_dout << "ObjectPlayer: " << this << " "
15 ObjectPlayer::ObjectPlayer(librados::IoCtx
&ioctx
,
16 const std::string
&object_oid_prefix
,
17 uint64_t object_num
, SafeTimer
&timer
,
18 Mutex
&timer_lock
, uint8_t order
,
19 uint64_t max_fetch_bytes
)
20 : RefCountedObject(NULL
, 0), m_object_num(object_num
),
21 m_oid(utils::get_object_name(object_oid_prefix
, m_object_num
)),
22 m_cct(NULL
), m_timer(timer
), m_timer_lock(timer_lock
), m_order(order
),
23 m_max_fetch_bytes(max_fetch_bytes
> 0 ? max_fetch_bytes
: 2 << order
),
24 m_watch_interval(0), m_watch_task(NULL
),
25 m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
26 m_fetch_in_progress(false) {
28 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
31 ObjectPlayer::~ObjectPlayer() {
33 Mutex::Locker
timer_locker(m_timer_lock
);
34 Mutex::Locker
locker(m_lock
);
35 assert(!m_fetch_in_progress
);
36 assert(m_watch_ctx
== nullptr);
// Starts an asynchronous read of this journal object.
// It reads up to m_max_fetch_bytes bytes starting at m_read_off into the
// read_bl of a new C_Fetch context. C_Fetch::finish() later passes that data
// to handle_fetch_complete(). It then either calls fetch() again or completes
// on_finish; the condition is not visible in this copy.
// NOTE(review): in this copy the argument list of aio_create_completion()
// below is not closed, and the aio_operate() result `r` is never checked.
// Confirm those lines were not lost.
40 void ObjectPlayer::fetch(Context
*on_finish
) {
41 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< dendl
;
// Only one fetch may be outstanding per player. C_Fetch::finish() clears the
// flag again.
43 Mutex::Locker
locker(m_lock
);
44 assert(!m_fetch_in_progress
);
45 m_fetch_in_progress
= true;
// Build the read op. It also sets the CEPH_OSD_OP_FLAG_FADVISE_DONTNEED op
// flag (a caching hint to the OSD).
47 C_Fetch
*context
= new C_Fetch(this, on_finish
);
48 librados::ObjectReadOperation op
;
49 op
.read(m_read_off
, m_max_fetch_bytes
, &context
->read_bl
, NULL
);
50 op
.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
// `context` is the callback argument for utils::rados_ctx_callback.
// Presumably that callback completes `context` when the read finishes.
52 librados::AioCompletion
*rados_completion
=
53 librados::Rados::aio_create_completion(context
, utils::rados_ctx_callback
,
55 int r
= m_ioctx
.aio_operate(m_oid
, rados_completion
, &op
, 0, NULL
);
// This function no longer needs its handle on the completion.
57 rados_completion
->release();
60 void ObjectPlayer::watch(Context
*on_fetch
, double interval
) {
61 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< " watch" << dendl
;
63 Mutex::Locker
timer_locker(m_timer_lock
);
64 m_watch_interval
= interval
;
66 assert(m_watch_ctx
== nullptr);
67 m_watch_ctx
= on_fetch
;
// Stops watching this object.
// Under m_timer_lock it tries to cancel the pending poll via cancel_watch().
// It then swaps the registered watch callback out of m_watch_ctx. If a
// callback was registered, it is completed with -ECANCELED.
// NOTE(review): this copy is missing the body of the `if (!cancel_watch())`
// branch and the scope braces. Presumably the function returns early when
// the poll cannot be cancelled, and the callback is completed after the
// timer lock is released. Confirm against upstream.
72 void ObjectPlayer::unwatch() {
73 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< " unwatch" << dendl
;
74 Context
*watch_ctx
= nullptr;
76 Mutex::Locker
timer_locker(m_timer_lock
);
80 if (!cancel_watch()) {
84 std::swap(watch_ctx
, m_watch_ctx
);
// Tell the former watcher that watching was cancelled.
87 if (watch_ctx
!= nullptr) {
88 watch_ctx
->complete(-ECANCELED
);
92 void ObjectPlayer::front(Entry
*entry
) const {
93 Mutex::Locker
locker(m_lock
);
94 assert(!m_entries
.empty());
95 *entry
= m_entries
.front();
98 void ObjectPlayer::pop_front() {
99 Mutex::Locker
locker(m_lock
);
100 assert(!m_entries
.empty());
102 auto &entry
= m_entries
.front();
103 m_entry_keys
.erase({entry
.get_tag_tid(), entry
.get_entry_tid()});
104 m_entries
.pop_front();
// Processes the data returned by one fetch() read. C_Fetch::finish() calls it
// with the read status and buffer.
// The new bytes are appended to m_read_bl, and as many journal entries as
// possible are decoded from its front. Decoded entries are appended to
// m_entries and indexed by (tag tid, entry tid) in m_entry_keys; a re-decoded
// duplicate replaces the stored copy in place. Unreadable byte spans are
// recorded in m_invalid_ranges.
// NOTE(review): this copy is incomplete. Missing pieces include:
//   - the rest of the parameter list (C_Fetch::finish() passes a third
//     `&refetch` argument);
//   - the handling of r < 0;
//   - several branch headers and bodies;
//   - the return statements.
// Confirm against upstream before relying on the details below.
107 int ObjectPlayer::handle_fetch_complete(int r
, const bufferlist
&bl
,
109 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< ", r=" << r
<< ", len="
110 << bl
.length() << dendl
;
117 } else if (bl
.length() == 0) {
// Append the newly read bytes and move the object read offset past them.
121 Mutex::Locker
locker(m_lock
);
122 assert(m_fetch_in_progress
);
123 m_read_off
+= bl
.length();
124 m_read_bl
.append(bl
);
125 m_refetch_state
= REFETCH_STATE_REQUIRED
;
// full_fetch is true when the per-read limit equals 2U << m_order. The
// constructor uses that as the default when max_fetch_bytes is 0.
127 bool full_fetch
= (m_max_fetch_bytes
== 2U << m_order
);
128 bool partial_entry
= false;
129 bool invalid
= false;
130 uint32_t invalid_start_off
= 0;
// m_read_bl_off is the object offset of m_read_bl's first byte. That whole
// span is about to be decoded again, so drop any invalid ranges inside it.
132 clear_invalid_range(m_read_bl_off
, m_read_bl
.length());
133 bufferlist::iterator
iter(&m_read_bl
, 0);
// Decode entries one at a time from the front of m_read_bl.
134 while (!iter
.end()) {
135 uint32_t bytes_needed
;
136 uint32_t bl_off
= iter
.get_off();
137 if (!Entry::is_readable(iter
, &bytes_needed
)) {
// A non-zero bytes_needed is handled as a partial (truncated) record.
138 if (bytes_needed
!= 0) {
139 invalid_start_off
= m_read_bl_off
+ bl_off
;
141 partial_entry
= true;
143 lderr(m_cct
) << ": partial record at offset " << invalid_start_off
146 ldout(m_cct
, 20) << ": partial record detected, will re-fetch"
// Otherwise the bytes at this offset are reported as a corrupt entry. The
// enclosing branch header is missing in this copy.
153 invalid_start_off
= m_read_bl_off
+ bl_off
;
155 lderr(m_cct
) << ": detected corrupt journal entry at offset "
156 << invalid_start_off
<< dendl
;
// Readable entry: decode it and measure how many bytes it consumed.
163 ::decode(entry
, iter
);
164 ldout(m_cct
, 20) << ": " << entry
<< " decoded" << dendl
;
166 uint32_t entry_len
= iter
.get_off() - bl_off
;
168 // new corrupt region detected
169 uint32_t invalid_end_off
= m_read_bl_off
+ bl_off
;
170 lderr(m_cct
) << ": corruption range [" << invalid_start_off
171 << ", " << invalid_end_off
<< ")" << dendl
;
172 m_invalid_ranges
.insert(invalid_start_off
,
173 invalid_end_off
- invalid_start_off
);
// Keep one copy per (tag tid, entry tid). A new key is appended to m_entries
// and its position remembered; a repeated key overwrites the stored entry in
// place.
177 EntryKey
entry_key(std::make_pair(entry
.get_tag_tid(),
178 entry
.get_entry_tid()));
179 if (m_entry_keys
.find(entry_key
) == m_entry_keys
.end()) {
180 m_entry_keys
[entry_key
] = m_entries
.insert(m_entries
.end(), entry
);
182 ldout(m_cct
, 10) << ": " << entry
<< " is duplicate, replacing" << dendl
;
183 *m_entry_keys
[entry_key
] = entry
;
186 // prune decoded / corrupted journal entries from front of bl
188 sub_bl
.substr_of(m_read_bl
, iter
.get_off(),
189 m_read_bl
.length() - iter
.get_off());
190 sub_bl
.swap(m_read_bl
);
191 iter
= bufferlist::iterator(&m_read_bl
, 0);
193 // advance the decoded entry offset
194 m_read_bl_off
+= entry_len
;
// Once decoding stops, the bytes still left in m_read_bl end at
// invalid_end_off. Unless the stop was caused by a partial entry,
// [invalid_start_off, invalid_end_off) is recorded as corrupt.
198 uint32_t invalid_end_off
= m_read_bl_off
+ m_read_bl
.length();
199 if (!partial_entry
) {
200 lderr(m_cct
) << ": corruption range [" << invalid_start_off
201 << ", " << invalid_end_off
<< ")" << dendl
;
203 m_invalid_ranges
.insert(invalid_start_off
,
204 invalid_end_off
- invalid_start_off
);
// NOTE(review): the bodies of both branches below are missing in this copy.
207 if (!m_invalid_ranges
.empty() && !partial_entry
) {
209 } else if (partial_entry
&& (full_fetch
|| m_entries
.empty())) {
// Removes from m_invalid_ranges whatever overlaps the span that starts at
// `off` and is `len` bytes long. The cleared range is logged at level 20.
// NOTE(review): the ldout statement below is not terminated in this copy;
// its `<< dendl;` tail appears to be missing. Confirm.
217 void ObjectPlayer::clear_invalid_range(uint32_t off
, uint32_t len
) {
218 // possibly remove previously partial record region
219 InvalidRanges decode_range
;
220 decode_range
.insert(off
, len
);
// intersect_range = the part of m_invalid_ranges inside decode_range
221 InvalidRanges intersect_range
;
222 intersect_range
.intersection_of(m_invalid_ranges
, decode_range
);
223 if (!intersect_range
.empty()) {
224 ldout(m_cct
, 20) << ": clearing invalid range: " << intersect_range
226 m_invalid_ranges
.subtract(intersect_range
);
// Arms m_timer to run a C_WatchTask, which calls handle_watch_task(), after
// m_watch_interval.
// The caller must hold m_timer_lock. At most one watch task may be pending
// (asserted).
// NOTE(review): the body of the `m_watch_ctx == NULL` branch is missing from
// this copy. Presumably it returns without scheduling when no watcher is
// registered. Confirm.
230 void ObjectPlayer::schedule_watch() {
231 assert(m_timer_lock
.is_locked());
232 if (m_watch_ctx
== NULL
) {
236 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< " scheduling watch" << dendl
;
237 assert(m_watch_task
== NULL
);
// Remember the task so cancel_watch() can cancel it later.
238 m_watch_task
= new C_WatchTask(this);
239 m_timer
.add_event_after(m_watch_interval
, m_watch_task
);
// Cancels the pending watch poll, if one is scheduled, and clears
// m_watch_task. The caller must hold m_timer_lock.
// NOTE(review): this copy does not show how `canceled` is used or what the
// function returns; unwatch() branches on the result. Confirm against
// upstream.
242 bool ObjectPlayer::cancel_watch() {
243 assert(m_timer_lock
.is_locked());
244 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< " cancelling watch" << dendl
;
245 if (m_watch_task
!= nullptr) {
246 bool canceled
= m_timer
.cancel_event(m_watch_task
);
249 m_watch_task
= nullptr;
255 void ObjectPlayer::handle_watch_task() {
256 assert(m_timer_lock
.is_locked());
258 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< " polling" << dendl
;
259 assert(m_watch_ctx
!= nullptr);
260 assert(m_watch_task
!= nullptr);
262 m_watch_task
= nullptr;
263 fetch(new C_WatchFetch(this));
// Handles completion of a poll fetch started by handle_watch_task();
// C_WatchFetch::finish() routes it here.
// Under m_timer_lock the registered watch callback is swapped out of
// m_watch_ctx. If there was one, it is completed with the fetch result r.
// NOTE(review): this copy is missing the end of the log statement, the scope
// braces, and the lines between the swap and the completion (for example,
// whether the next poll is rescheduled there). Confirm against upstream.
266 void ObjectPlayer::handle_watch_fetched(int r
) {
267 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< " poll complete, r=" << r
270 Context
*watch_ctx
= nullptr;
272 Mutex::Locker
timer_locker(m_timer_lock
);
273 std::swap(watch_ctx
, m_watch_ctx
);
// Hand the poll result to the watcher that was registered.
281 if (watch_ctx
!= nullptr) {
282 watch_ctx
->complete(r
);
// Handles completion of the read issued by fetch().
// It passes the read status and data to handle_fetch_complete(), whose result
// replaces r. `refetch` is presumably set when the object should be read
// again. It then clears m_fetch_in_progress under the player's m_lock.
// The visible tail either calls fetch() again with the same on_finish, or
// drops the player reference and completes on_finish with r.
// NOTE(review): the branch structure (presumably on `refetch`) and the scope
// braces are missing in this copy. fetch() acquires m_lock itself, so the
// Locker below must be out of scope before the re-fetch. Confirm.
286 void ObjectPlayer::C_Fetch::finish(int r
) {
287 bool refetch
= false;
288 r
= object_player
->handle_fetch_complete(r
, read_bl
, &refetch
);
291 Mutex::Locker
locker(object_player
->m_lock
);
292 object_player
->m_fetch_in_progress
= false;
296 object_player
->fetch(on_finish
);
300 object_player
.reset();
301 on_finish
->complete(r
);
304 void ObjectPlayer::C_WatchTask::finish(int r
) {
305 object_player
->handle_watch_task();
308 void ObjectPlayer::C_WatchFetch::finish(int r
) {
309 object_player
->handle_watch_fetched(r
);
312 } // namespace journal