]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/ObjectPlayer.cc
import 15.2.0 Octopus source
[ceph.git] / ceph / src / journal / ObjectPlayer.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/ObjectPlayer.h"
5#include "journal/Utils.h"
6#include "common/Timer.h"
7#include <limits>
8
9#define dout_subsys ceph_subsys_journaler
10#undef dout_prefix
11#define dout_prefix *_dout << "ObjectPlayer: " << this << " "
12
13namespace journal {
14
9f95a23c
TL
15namespace {
16
17bool advance_to_last_pad_byte(uint32_t off, bufferlist::const_iterator *iter,
18 uint32_t *pad_len, bool *partial_entry) {
19 const uint32_t MAX_PAD = 8;
20 auto pad_bytes = MAX_PAD - off % MAX_PAD;
21 auto next = *iter;
22
23 ceph_assert(!next.end());
24 if (*next != '\0') {
25 return false;
26 }
27
28 for (auto i = pad_bytes - 1; i > 0; i--) {
29 if ((++next).end()) {
30 *partial_entry = true;
31 return false;
32 }
33 if (*next != '\0') {
34 return false;
35 }
36 }
37
38 *iter = next;
39 *pad_len += pad_bytes;
40 return true;
41}
42
43} // anonymous namespace
44
7c673cae 45ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
9f95a23c 46 const std::string& object_oid_prefix,
7c673cae 47 uint64_t object_num, SafeTimer &timer,
9f95a23c 48 ceph::mutex &timer_lock, uint8_t order,
7c673cae 49 uint64_t max_fetch_bytes)
9f95a23c 50 : m_object_num(object_num),
7c673cae 51 m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
9f95a23c 52 m_timer(timer), m_timer_lock(timer_lock), m_order(order),
7c673cae 53 m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
9f95a23c
TL
54 m_lock(ceph::make_mutex(utils::unique_lock_name("ObjectPlayer::m_lock", this)))
55{
7c673cae
FG
56 m_ioctx.dup(ioctx);
57 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
58}
59
60ObjectPlayer::~ObjectPlayer() {
61 {
9f95a23c
TL
62 std::lock_guard timer_locker{m_timer_lock};
63 std::lock_guard locker{m_lock};
11fdf7f2
TL
64 ceph_assert(!m_fetch_in_progress);
65 ceph_assert(m_watch_ctx == nullptr);
7c673cae
FG
66 }
67}
68
69void ObjectPlayer::fetch(Context *on_finish) {
70 ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
71
9f95a23c 72 std::lock_guard locker{m_lock};
11fdf7f2 73 ceph_assert(!m_fetch_in_progress);
7c673cae
FG
74 m_fetch_in_progress = true;
75
76 C_Fetch *context = new C_Fetch(this, on_finish);
77 librados::ObjectReadOperation op;
78 op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
79 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
80
9f95a23c
TL
81 auto rados_completion =
82 librados::Rados::aio_create_completion(context, utils::rados_ctx_callback);
7c673cae 83 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
11fdf7f2 84 ceph_assert(r == 0);
7c673cae
FG
85 rados_completion->release();
86}
87
88void ObjectPlayer::watch(Context *on_fetch, double interval) {
89 ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
90
9f95a23c 91 std::lock_guard timer_locker{m_timer_lock};
7c673cae
FG
92 m_watch_interval = interval;
93
11fdf7f2 94 ceph_assert(m_watch_ctx == nullptr);
7c673cae
FG
95 m_watch_ctx = on_fetch;
96
97 schedule_watch();
98}
99
100void ObjectPlayer::unwatch() {
101 ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
102 Context *watch_ctx = nullptr;
103 {
9f95a23c 104 std::lock_guard timer_locker{m_timer_lock};
11fdf7f2 105 ceph_assert(!m_unwatched);
7c673cae
FG
106 m_unwatched = true;
107
108 if (!cancel_watch()) {
109 return;
110 }
111
112 std::swap(watch_ctx, m_watch_ctx);
113 }
114
115 if (watch_ctx != nullptr) {
116 watch_ctx->complete(-ECANCELED);
117 }
118}
119
120void ObjectPlayer::front(Entry *entry) const {
9f95a23c 121 std::lock_guard locker{m_lock};
11fdf7f2 122 ceph_assert(!m_entries.empty());
7c673cae
FG
123 *entry = m_entries.front();
124}
125
126void ObjectPlayer::pop_front() {
9f95a23c 127 std::lock_guard locker{m_lock};
11fdf7f2 128 ceph_assert(!m_entries.empty());
7c673cae
FG
129
130 auto &entry = m_entries.front();
131 m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
132 m_entries.pop_front();
133}
134
135int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
136 bool *refetch) {
137 ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
138 << bl.length() << dendl;
139
140 *refetch = false;
141 if (r == -ENOENT) {
142 return 0;
143 } else if (r < 0) {
144 return r;
145 } else if (bl.length() == 0) {
146 return 0;
147 }
148
9f95a23c 149 std::lock_guard locker{m_lock};
11fdf7f2 150 ceph_assert(m_fetch_in_progress);
7c673cae
FG
151 m_read_off += bl.length();
152 m_read_bl.append(bl);
153 m_refetch_state = REFETCH_STATE_REQUIRED;
154
155 bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
156 bool partial_entry = false;
157 bool invalid = false;
158 uint32_t invalid_start_off = 0;
159
160 clear_invalid_range(m_read_bl_off, m_read_bl.length());
11fdf7f2 161 bufferlist::const_iterator iter{&m_read_bl, 0};
9f95a23c 162 uint32_t pad_len = 0;
7c673cae
FG
163 while (!iter.end()) {
164 uint32_t bytes_needed;
165 uint32_t bl_off = iter.get_off();
166 if (!Entry::is_readable(iter, &bytes_needed)) {
167 if (bytes_needed != 0) {
168 invalid_start_off = m_read_bl_off + bl_off;
169 invalid = true;
170 partial_entry = true;
171 if (full_fetch) {
172 lderr(m_cct) << ": partial record at offset " << invalid_start_off
173 << dendl;
174 } else {
175 ldout(m_cct, 20) << ": partial record detected, will re-fetch"
176 << dendl;
177 }
178 break;
179 }
180
9f95a23c
TL
181 if (!advance_to_last_pad_byte(m_read_bl_off + iter.get_off(), &iter,
182 &pad_len, &partial_entry)) {
7c673cae
FG
183 invalid_start_off = m_read_bl_off + bl_off;
184 invalid = true;
9f95a23c
TL
185 if (partial_entry) {
186 if (full_fetch) {
187 lderr(m_cct) << ": partial pad at offset " << invalid_start_off
188 << dendl;
189 } else {
190 ldout(m_cct, 20) << ": partial pad detected, will re-fetch"
191 << dendl;
192 }
193 } else {
194 lderr(m_cct) << ": detected corrupt journal entry at offset "
195 << invalid_start_off << dendl;
196 }
197 break;
7c673cae
FG
198 }
199 ++iter;
200 continue;
201 }
202
203 Entry entry;
11fdf7f2 204 decode(entry, iter);
7c673cae
FG
205 ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
206
207 uint32_t entry_len = iter.get_off() - bl_off;
208 if (invalid) {
209 // new corrupt region detected
210 uint32_t invalid_end_off = m_read_bl_off + bl_off;
211 lderr(m_cct) << ": corruption range [" << invalid_start_off
212 << ", " << invalid_end_off << ")" << dendl;
213 m_invalid_ranges.insert(invalid_start_off,
214 invalid_end_off - invalid_start_off);
215 invalid = false;
494da23a
TL
216
217 m_read_bl_off = invalid_end_off;
7c673cae
FG
218 }
219
220 EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
221 entry.get_entry_tid()));
222 if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
223 m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
224 } else {
225 ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
226 *m_entry_keys[entry_key] = entry;
227 }
228
229 // prune decoded / corrupted journal entries from front of bl
230 bufferlist sub_bl;
231 sub_bl.substr_of(m_read_bl, iter.get_off(),
232 m_read_bl.length() - iter.get_off());
233 sub_bl.swap(m_read_bl);
234 iter = bufferlist::iterator(&m_read_bl, 0);
235
236 // advance the decoded entry offset
9f95a23c
TL
237 m_read_bl_off += entry_len + pad_len;
238 pad_len = 0;
7c673cae
FG
239 }
240
241 if (invalid) {
242 uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
243 if (!partial_entry) {
244 lderr(m_cct) << ": corruption range [" << invalid_start_off
245 << ", " << invalid_end_off << ")" << dendl;
246 }
247 m_invalid_ranges.insert(invalid_start_off,
248 invalid_end_off - invalid_start_off);
249 }
250
251 if (!m_invalid_ranges.empty() && !partial_entry) {
252 return -EBADMSG;
253 } else if (partial_entry && (full_fetch || m_entries.empty())) {
254 *refetch = true;
255 return -EAGAIN;
256 }
257
258 return 0;
259}
260
261void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
262 // possibly remove previously partial record region
263 InvalidRanges decode_range;
264 decode_range.insert(off, len);
265 InvalidRanges intersect_range;
266 intersect_range.intersection_of(m_invalid_ranges, decode_range);
267 if (!intersect_range.empty()) {
268 ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
269 << dendl;
270 m_invalid_ranges.subtract(intersect_range);
271 }
272}
273
274void ObjectPlayer::schedule_watch() {
9f95a23c 275 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
7c673cae
FG
276 if (m_watch_ctx == NULL) {
277 return;
278 }
279
280 ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
11fdf7f2 281 ceph_assert(m_watch_task == nullptr);
3efd9988
FG
282 m_watch_task = m_timer.add_event_after(
283 m_watch_interval,
9f95a23c 284 new LambdaContext([this](int) {
3efd9988
FG
285 handle_watch_task();
286 }));
7c673cae
FG
287}
288
289bool ObjectPlayer::cancel_watch() {
9f95a23c 290 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
7c673cae
FG
291 ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
292 if (m_watch_task != nullptr) {
293 bool canceled = m_timer.cancel_event(m_watch_task);
11fdf7f2 294 ceph_assert(canceled);
7c673cae
FG
295
296 m_watch_task = nullptr;
297 return true;
298 }
299 return false;
300}
301
302void ObjectPlayer::handle_watch_task() {
9f95a23c 303 ceph_assert(ceph_mutex_is_locked(m_timer_lock));
7c673cae
FG
304
305 ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
11fdf7f2
TL
306 ceph_assert(m_watch_ctx != nullptr);
307 ceph_assert(m_watch_task != nullptr);
7c673cae
FG
308
309 m_watch_task = nullptr;
310 fetch(new C_WatchFetch(this));
311}
312
313void ObjectPlayer::handle_watch_fetched(int r) {
314 ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
315 << dendl;
316
317 Context *watch_ctx = nullptr;
318 {
9f95a23c 319 std::lock_guard timer_locker{m_timer_lock};
7c673cae
FG
320 std::swap(watch_ctx, m_watch_ctx);
321
322 if (m_unwatched) {
323 m_unwatched = false;
324 r = -ECANCELED;
325 }
326 }
327
328 if (watch_ctx != nullptr) {
329 watch_ctx->complete(r);
330 }
331}
332
333void ObjectPlayer::C_Fetch::finish(int r) {
334 bool refetch = false;
335 r = object_player->handle_fetch_complete(r, read_bl, &refetch);
336
337 {
9f95a23c 338 std::lock_guard locker{object_player->m_lock};
7c673cae
FG
339 object_player->m_fetch_in_progress = false;
340 }
341
342 if (refetch) {
343 object_player->fetch(on_finish);
344 return;
345 }
346
347 object_player.reset();
348 on_finish->complete(r);
349}
350
7c673cae
FG
351void ObjectPlayer::C_WatchFetch::finish(int r) {
352 object_player->handle_watch_fetched(r);
353}
354
355} // namespace journal