]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/ObjectPlayer.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / journal / ObjectPlayer.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectPlayer.h"
5 #include "journal/Utils.h"
6 #include "common/Timer.h"
7 #include <limits>
8
9 #define dout_subsys ceph_subsys_journaler
10 #undef dout_prefix
11 #define dout_prefix *_dout << "ObjectPlayer: " << this << " "
12
13 namespace journal {
14
15 ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
16 const std::string &object_oid_prefix,
17 uint64_t object_num, SafeTimer &timer,
18 Mutex &timer_lock, uint8_t order,
19 uint64_t max_fetch_bytes)
20 : RefCountedObject(NULL, 0), m_object_num(object_num),
21 m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
22 m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
23 m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
24 m_watch_interval(0), m_watch_task(NULL),
25 m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
26 m_fetch_in_progress(false) {
27 m_ioctx.dup(ioctx);
28 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
29 }
30
31 ObjectPlayer::~ObjectPlayer() {
32 {
33 Mutex::Locker timer_locker(m_timer_lock);
34 Mutex::Locker locker(m_lock);
35 assert(!m_fetch_in_progress);
36 assert(m_watch_ctx == nullptr);
37 }
38 }
39
40 void ObjectPlayer::fetch(Context *on_finish) {
41 ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
42
43 Mutex::Locker locker(m_lock);
44 assert(!m_fetch_in_progress);
45 m_fetch_in_progress = true;
46
47 C_Fetch *context = new C_Fetch(this, on_finish);
48 librados::ObjectReadOperation op;
49 op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
50 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
51
52 librados::AioCompletion *rados_completion =
53 librados::Rados::aio_create_completion(context, utils::rados_ctx_callback,
54 NULL);
55 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op, 0, NULL);
56 assert(r == 0);
57 rados_completion->release();
58 }
59
60 void ObjectPlayer::watch(Context *on_fetch, double interval) {
61 ldout(m_cct, 20) << __func__ << ": " << m_oid << " watch" << dendl;
62
63 Mutex::Locker timer_locker(m_timer_lock);
64 m_watch_interval = interval;
65
66 assert(m_watch_ctx == nullptr);
67 m_watch_ctx = on_fetch;
68
69 schedule_watch();
70 }
71
72 void ObjectPlayer::unwatch() {
73 ldout(m_cct, 20) << __func__ << ": " << m_oid << " unwatch" << dendl;
74 Context *watch_ctx = nullptr;
75 {
76 Mutex::Locker timer_locker(m_timer_lock);
77 assert(!m_unwatched);
78 m_unwatched = true;
79
80 if (!cancel_watch()) {
81 return;
82 }
83
84 std::swap(watch_ctx, m_watch_ctx);
85 }
86
87 if (watch_ctx != nullptr) {
88 watch_ctx->complete(-ECANCELED);
89 }
90 }
91
92 void ObjectPlayer::front(Entry *entry) const {
93 Mutex::Locker locker(m_lock);
94 assert(!m_entries.empty());
95 *entry = m_entries.front();
96 }
97
98 void ObjectPlayer::pop_front() {
99 Mutex::Locker locker(m_lock);
100 assert(!m_entries.empty());
101
102 auto &entry = m_entries.front();
103 m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
104 m_entries.pop_front();
105 }
106
107 int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
108 bool *refetch) {
109 ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
110 << bl.length() << dendl;
111
112 *refetch = false;
113 if (r == -ENOENT) {
114 return 0;
115 } else if (r < 0) {
116 return r;
117 } else if (bl.length() == 0) {
118 return 0;
119 }
120
121 Mutex::Locker locker(m_lock);
122 assert(m_fetch_in_progress);
123 m_read_off += bl.length();
124 m_read_bl.append(bl);
125 m_refetch_state = REFETCH_STATE_REQUIRED;
126
127 bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
128 bool partial_entry = false;
129 bool invalid = false;
130 uint32_t invalid_start_off = 0;
131
132 clear_invalid_range(m_read_bl_off, m_read_bl.length());
133 bufferlist::iterator iter(&m_read_bl, 0);
134 while (!iter.end()) {
135 uint32_t bytes_needed;
136 uint32_t bl_off = iter.get_off();
137 if (!Entry::is_readable(iter, &bytes_needed)) {
138 if (bytes_needed != 0) {
139 invalid_start_off = m_read_bl_off + bl_off;
140 invalid = true;
141 partial_entry = true;
142 if (full_fetch) {
143 lderr(m_cct) << ": partial record at offset " << invalid_start_off
144 << dendl;
145 } else {
146 ldout(m_cct, 20) << ": partial record detected, will re-fetch"
147 << dendl;
148 }
149 break;
150 }
151
152 if (!invalid) {
153 invalid_start_off = m_read_bl_off + bl_off;
154 invalid = true;
155 lderr(m_cct) << ": detected corrupt journal entry at offset "
156 << invalid_start_off << dendl;
157 }
158 ++iter;
159 continue;
160 }
161
162 Entry entry;
163 ::decode(entry, iter);
164 ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
165
166 uint32_t entry_len = iter.get_off() - bl_off;
167 if (invalid) {
168 // new corrupt region detected
169 uint32_t invalid_end_off = m_read_bl_off + bl_off;
170 lderr(m_cct) << ": corruption range [" << invalid_start_off
171 << ", " << invalid_end_off << ")" << dendl;
172 m_invalid_ranges.insert(invalid_start_off,
173 invalid_end_off - invalid_start_off);
174 invalid = false;
175 }
176
177 EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
178 entry.get_entry_tid()));
179 if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
180 m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
181 } else {
182 ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
183 *m_entry_keys[entry_key] = entry;
184 }
185
186 // prune decoded / corrupted journal entries from front of bl
187 bufferlist sub_bl;
188 sub_bl.substr_of(m_read_bl, iter.get_off(),
189 m_read_bl.length() - iter.get_off());
190 sub_bl.swap(m_read_bl);
191 iter = bufferlist::iterator(&m_read_bl, 0);
192
193 // advance the decoded entry offset
194 m_read_bl_off += entry_len;
195 }
196
197 if (invalid) {
198 uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
199 if (!partial_entry) {
200 lderr(m_cct) << ": corruption range [" << invalid_start_off
201 << ", " << invalid_end_off << ")" << dendl;
202 }
203 m_invalid_ranges.insert(invalid_start_off,
204 invalid_end_off - invalid_start_off);
205 }
206
207 if (!m_invalid_ranges.empty() && !partial_entry) {
208 return -EBADMSG;
209 } else if (partial_entry && (full_fetch || m_entries.empty())) {
210 *refetch = true;
211 return -EAGAIN;
212 }
213
214 return 0;
215 }
216
217 void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
218 // possibly remove previously partial record region
219 InvalidRanges decode_range;
220 decode_range.insert(off, len);
221 InvalidRanges intersect_range;
222 intersect_range.intersection_of(m_invalid_ranges, decode_range);
223 if (!intersect_range.empty()) {
224 ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
225 << dendl;
226 m_invalid_ranges.subtract(intersect_range);
227 }
228 }
229
230 void ObjectPlayer::schedule_watch() {
231 assert(m_timer_lock.is_locked());
232 if (m_watch_ctx == NULL) {
233 return;
234 }
235
236 ldout(m_cct, 20) << __func__ << ": " << m_oid << " scheduling watch" << dendl;
237 assert(m_watch_task == NULL);
238 m_watch_task = new C_WatchTask(this);
239 m_timer.add_event_after(m_watch_interval, m_watch_task);
240 }
241
242 bool ObjectPlayer::cancel_watch() {
243 assert(m_timer_lock.is_locked());
244 ldout(m_cct, 20) << __func__ << ": " << m_oid << " cancelling watch" << dendl;
245 if (m_watch_task != nullptr) {
246 bool canceled = m_timer.cancel_event(m_watch_task);
247 assert(canceled);
248
249 m_watch_task = nullptr;
250 return true;
251 }
252 return false;
253 }
254
255 void ObjectPlayer::handle_watch_task() {
256 assert(m_timer_lock.is_locked());
257
258 ldout(m_cct, 10) << __func__ << ": " << m_oid << " polling" << dendl;
259 assert(m_watch_ctx != nullptr);
260 assert(m_watch_task != nullptr);
261
262 m_watch_task = nullptr;
263 fetch(new C_WatchFetch(this));
264 }
265
266 void ObjectPlayer::handle_watch_fetched(int r) {
267 ldout(m_cct, 10) << __func__ << ": " << m_oid << " poll complete, r=" << r
268 << dendl;
269
270 Context *watch_ctx = nullptr;
271 {
272 Mutex::Locker timer_locker(m_timer_lock);
273 std::swap(watch_ctx, m_watch_ctx);
274
275 if (m_unwatched) {
276 m_unwatched = false;
277 r = -ECANCELED;
278 }
279 }
280
281 if (watch_ctx != nullptr) {
282 watch_ctx->complete(r);
283 }
284 }
285
286 void ObjectPlayer::C_Fetch::finish(int r) {
287 bool refetch = false;
288 r = object_player->handle_fetch_complete(r, read_bl, &refetch);
289
290 {
291 Mutex::Locker locker(object_player->m_lock);
292 object_player->m_fetch_in_progress = false;
293 }
294
295 if (refetch) {
296 object_player->fetch(on_finish);
297 return;
298 }
299
300 object_player.reset();
301 on_finish->complete(r);
302 }
303
304 void ObjectPlayer::C_WatchTask::finish(int r) {
305 object_player->handle_watch_task();
306 }
307
308 void ObjectPlayer::C_WatchFetch::finish(int r) {
309 object_player->handle_watch_fetched(r);
310 }
311
312 } // namespace journal