]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/ObjectRecorder.h" | |
5 | #include "journal/Future.h" | |
6 | #include "journal/Utils.h" | |
11fdf7f2 | 7 | #include "include/ceph_assert.h" |
7c673cae FG |
8 | #include "common/Timer.h" |
9 | #include "cls/journal/cls_journal_client.h" | |
10 | ||
11 | #define dout_subsys ceph_subsys_journaler | |
12 | #undef dout_prefix | |
13 | #define dout_prefix *_dout << "ObjectRecorder: " << this << " " | |
14 | ||
15 | using namespace cls::journal; | |
16 | using std::shared_ptr; | |
17 | ||
18 | namespace journal { | |
19 | ||
20 | ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, | |
21 | uint64_t object_number, shared_ptr<Mutex> lock, | |
22 | ContextWQ *work_queue, SafeTimer &timer, | |
23 | Mutex &timer_lock, Handler *handler, | |
24 | uint8_t order, uint32_t flush_interval, | |
11fdf7f2 TL |
25 | uint64_t flush_bytes, double flush_age, |
26 | uint64_t max_in_flight_appends) | |
7c673cae FG |
27 | : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), |
28 | m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer), | |
29 | m_timer_lock(timer_lock), m_handler(handler), m_order(order), | |
30 | m_soft_max_size(1 << m_order), m_flush_interval(flush_interval), | |
11fdf7f2 TL |
31 | m_flush_bytes(flush_bytes), m_flush_age(flush_age), |
32 | m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), | |
3efd9988 | 33 | m_lock(lock), m_append_tid(0), m_pending_bytes(0), |
7c673cae FG |
34 | m_size(0), m_overflowed(false), m_object_closed(false), |
35 | m_in_flight_flushes(false), m_aio_scheduled(false) { | |
36 | m_ioctx.dup(ioctx); | |
37 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
11fdf7f2 | 38 | ceph_assert(m_handler != NULL); |
7c673cae FG |
39 | } |
40 | ||
ObjectRecorder::~ObjectRecorder() {
  // All queued and in-flight state must have been drained (via close() and
  // claim_append_buffers()) before the recorder is destroyed.
  ceph_assert(m_append_task == NULL);
  ceph_assert(m_append_buffers.empty());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
  ceph_assert(!m_aio_scheduled);
}
48 | ||
// Queues the provided buffers for append, releasing *m_lock (which the
// caller must hold) before returning.  Returns true when the object has
// reached its soft size limit and the journal should roll over to the next
// object.
bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
  ceph_assert(m_lock->is_locked());

  FutureImplPtr last_flushed_future;
  bool schedule_append = false;

  if (m_overflowed) {
    // already overflowed: just stash the buffers so they can be claimed
    // and replayed against the replacement object
    m_append_buffers.insert(m_append_buffers.end(),
                            append_buffers.begin(), append_buffers.end());
    m_lock->Unlock();
    return false;
  }

  for (AppendBuffers::const_iterator iter = append_buffers.begin();
       iter != append_buffers.end(); ++iter) {
    if (append(*iter, &schedule_append)) {
      // remember the newest future whose user explicitly requested a flush
      last_flushed_future = iter->first;
    }
  }

  if (last_flushed_future) {
    // flush everything up to (and including) the requested future
    flush(last_flushed_future);
    m_lock->Unlock();
  } else {
    m_lock->Unlock();
    if (schedule_append) {
      schedule_append_task();
    } else {
      cancel_append_task();
    }
  }
  // NOTE(review): these members are read after dropping the lock -- appears
  // to rely on the caller serializing appends; confirm before refactoring
  return (!m_object_closed && !m_overflowed &&
          m_size + m_pending_bytes >= m_soft_max_size);
}
83 | ||
// Flushes all queued appends and completes on_safe once every append issued
// so far is safe on disk (or immediately, with 0, if nothing is outstanding).
void ObjectRecorder::flush(Context *on_safe) {
  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;

  cancel_append_task();
  Future future;
  {
    Mutex::Locker locker(*m_lock);

    // if currently handling flush notifications, wait so that
    // we notify in the correct order (since lock is dropped on
    // callback)
    if (m_in_flight_flushes) {
      m_in_flight_flushes_cond.Wait(*(m_lock.get()));
    }

    // attach the flush to the most recent append
    if (!m_append_buffers.empty()) {
      future = Future(m_append_buffers.rbegin()->first);

      flush_appends(true);
    } else if (!m_pending_buffers.empty()) {
      // buffers already handed to the AIO path but not yet written
      future = Future(m_pending_buffers.rbegin()->first);
    } else if (!m_in_flight_appends.empty()) {
      // buffers already sent to the OSD -- wait on the newest in-flight op
      AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
      ceph_assert(!append_buffers.empty());
      future = Future(append_buffers.rbegin()->first);
    }
  }

  if (future.is_valid()) {
    future.flush(on_safe);
  } else {
    // nothing queued, pending, or in flight -- already safe
    on_safe->complete(0);
  }
}
119 | ||
// Flushes all queued appends up to and including the given future
// (m_lock held).
void ObjectRecorder::flush(const FutureImplPtr &future) {
  ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
                   << dendl;

  ceph_assert(m_lock->is_locked());

  if (future->get_flush_handler().get() != &m_flush_handler) {
    // if we don't own this future, re-issue the flush so that it hits the
    // correct journal object owner
    future->flush();
    return;
  } else if (future->is_flush_in_progress()) {
    return;
  }

  if (m_object_closed || m_overflowed) {
    // queued buffers will be claimed and re-flushed against the new object
    return;
  }

  // locate the future within the queued appends (it must be present: we own
  // its flush handler and its flush is not yet in progress)
  AppendBuffers::reverse_iterator r_it;
  for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
       ++r_it) {
    if (r_it->first == future) {
      break;
    }
  }
  ceph_assert(r_it != m_append_buffers.rend());

  // (++r_it).base() is the matched element; advance once more so the splice
  // range below covers [begin, matched element] inclusively
  auto it = (++r_it).base();
  ceph_assert(it != m_append_buffers.end());
  ++it;

  AppendBuffers flush_buffers;
  flush_buffers.splice(flush_buffers.end(), m_append_buffers,
                       m_append_buffers.begin(), it);
  send_appends(&flush_buffers);
}
157 | ||
158 | void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { | |
159 | ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl; | |
160 | ||
11fdf7f2 TL |
161 | ceph_assert(m_lock->is_locked()); |
162 | ceph_assert(m_in_flight_tids.empty()); | |
163 | ceph_assert(m_in_flight_appends.empty()); | |
164 | ceph_assert(m_object_closed || m_overflowed); | |
7c673cae FG |
165 | append_buffers->splice(append_buffers->end(), m_append_buffers, |
166 | m_append_buffers.begin(), m_append_buffers.end()); | |
167 | } | |
168 | ||
// Marks the object closed, force-flushing any queued appends (m_lock held).
// Returns true if the close completed inline (nothing left in flight);
// otherwise the handler will be notified once the remaining ops drain.
bool ObjectRecorder::close() {
  ceph_assert(m_lock->is_locked());

  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;

  cancel_append_task();

  flush_appends(true);

  ceph_assert(!m_object_closed);
  m_object_closed = true;
  return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
}
182 | ||
// Timer callback (invoked with m_timer_lock held): force-flush appends that
// have aged past m_flush_age.
void ObjectRecorder::handle_append_task() {
  ceph_assert(m_timer_lock.is_locked());
  // the timer already consumed the event; just forget it
  m_append_task = NULL;

  Mutex::Locker locker(*m_lock);
  flush_appends(true);
}
190 | ||
191 | void ObjectRecorder::cancel_append_task() { | |
192 | Mutex::Locker locker(m_timer_lock); | |
193 | if (m_append_task != NULL) { | |
194 | m_timer.cancel_event(m_append_task); | |
195 | m_append_task = NULL; | |
196 | } | |
197 | } | |
198 | ||
199 | void ObjectRecorder::schedule_append_task() { | |
200 | Mutex::Locker locker(m_timer_lock); | |
3efd9988 FG |
201 | if (m_append_task == nullptr && m_flush_age > 0) { |
202 | m_append_task = m_timer.add_event_after( | |
203 | m_flush_age, new FunctionContext([this](int) { | |
204 | handle_append_task(); | |
205 | })); | |
7c673cae FG |
206 | } |
207 | } | |
208 | ||
// Queues a single append buffer (m_lock held).  Sets *schedule_append when
// the flush thresholds were not reached (caller should arm the age timer).
// Returns true if a flush was explicitly requested on the buffer's future.
bool ObjectRecorder::append(const AppendBuffer &append_buffer,
                            bool *schedule_append) {
  ceph_assert(m_lock->is_locked());

  bool flush_requested = false;
  if (!m_object_closed && !m_overflowed) {
    // register ourselves as the future's flush handler
    flush_requested = append_buffer.first->attach(&m_flush_handler);
  }

  m_append_buffers.push_back(append_buffer);
  m_pending_bytes += append_buffer.second.length();

  if (!flush_appends(false)) {
    // thresholds not hit -- defer to the age-based flush timer
    *schedule_append = true;
  }
  return flush_requested;
}
226 | ||
// Sends all queued appends if forced, or if a flush threshold (soft max
// size, append count, pending bytes) has been reached.  Returns false when
// the appends were deferred; true when they were sent (or when the object
// is closed/overflowed and the buffers are handled elsewhere).
bool ObjectRecorder::flush_appends(bool force) {
  ceph_assert(m_lock->is_locked());
  if (m_object_closed || m_overflowed) {
    // buffers will be claimed and redirected to the replacement object
    return true;
  }

  if (m_append_buffers.empty() ||
      (!force &&
       m_size + m_pending_bytes < m_soft_max_size &&
       (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
       (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
    // NOTE(review): a zero flush_interval or flush_bytes makes its clause
    // false, so every append flushes immediately -- confirm intended
    return false;
  }

  m_pending_bytes = 0;
  AppendBuffers append_buffers;
  append_buffers.swap(m_append_buffers);
  send_appends(&append_buffers);
  return true;
}
247 | ||
// AIO completion callback for the append op with the given tid: marks the
// written futures safe, handles -EOVERFLOW by re-queueing all buffers for
// the replacement object, and schedules the next batched write when more
// buffers are pending.
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
                   << ", r=" << r << dendl;

  AppendBuffers append_buffers;
  {
    m_lock->Lock();
    auto tid_iter = m_in_flight_tids.find(tid);
    ceph_assert(tid_iter != m_in_flight_tids.end());
    m_in_flight_tids.erase(tid_iter);

    InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
    if (r == -EOVERFLOW || m_overflowed) {
      if (iter != m_in_flight_appends.end()) {
        m_overflowed = true;
      } else {
        // must have seen an overflow on a previous append op
        ceph_assert(r == -EOVERFLOW && m_overflowed);
      }

      // notify of overflow once all in-flight ops are complete
      if (m_in_flight_tids.empty() && !m_aio_scheduled) {
        // pull unsent pending buffers back so they can be claimed too
        m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
        append_overflowed();
        notify_handler_unlock();
      } else {
        m_lock->Unlock();
      }
      return;
    }

    ceph_assert(iter != m_in_flight_appends.end());
    append_buffers.swap(iter->second);
    ceph_assert(!append_buffers.empty());

    m_in_flight_appends.erase(iter);
    // block racing flush() calls until the safe() callbacks below finish,
    // preserving notification order (lock is dropped for the callbacks)
    m_in_flight_flushes = true;
    m_lock->Unlock();
  }

  // Flag the associated futures as complete.
  for (AppendBuffers::iterator buf_it = append_buffers.begin();
       buf_it != append_buffers.end(); ++buf_it) {
    ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
                     << dendl;
    buf_it->first->safe(r);
  }

  // wake up any flush requests that raced with a RADOS callback
  m_lock->Lock();
  m_in_flight_flushes = false;
  m_in_flight_flushes_cond.Signal();

  if (!m_aio_scheduled) {
    if (m_in_flight_appends.empty() && m_object_closed) {
      // all remaining unsent appends should be redirected to new object
      m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
      notify_handler_unlock();
    } else if (!m_pending_buffers.empty()) {
      // more work queued up -- issue the next batched write
      m_aio_scheduled = true;
      m_lock->Unlock();
      send_appends_aio();
    } else {
      m_lock->Unlock();
    }
  } else {
    m_lock->Unlock();
  }
}
317 | ||
// Rebuilds m_append_buffers after an overflow: previously in-flight appends
// (ordered by tid) are placed ahead of the still-queued ones, and every
// future is detached so it can be claimed for the replacement object
// (m_lock held).
void ObjectRecorder::append_overflowed() {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
                   << dendl;

  ceph_assert(m_lock->is_locked());
  ceph_assert(!m_in_flight_appends.empty());

  cancel_append_task();

  InFlightAppends in_flight_appends;
  in_flight_appends.swap(m_in_flight_appends);

  // collect in-flight buffers first, in tid (submission) order
  AppendBuffers restart_append_buffers;
  for (InFlightAppends::iterator it = in_flight_appends.begin();
       it != in_flight_appends.end(); ++it) {
    restart_append_buffers.insert(restart_append_buffers.end(),
                                  it->second.begin(), it->second.end());
  }

  // then the not-yet-sent buffers, and swap the combined list back in
  restart_append_buffers.splice(restart_append_buffers.end(),
                                m_append_buffers,
                                m_append_buffers.begin(),
                                m_append_buffers.end());
  restart_append_buffers.swap(m_append_buffers);

  for (AppendBuffers::const_iterator it = m_append_buffers.begin();
       it != m_append_buffers.end(); ++it) {
    ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
                     << dendl;
    // detach so the future can be re-attached to the new object's recorder
    it->first->detach();
  }
}
350 | ||
// Marks the given buffers as flush-in-progress, moves them onto the pending
// queue, and (if not already scheduled) queues an asynchronous
// send_appends_aio() pass on the work queue (m_lock held).
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
  ceph_assert(m_lock->is_locked());
  ceph_assert(!append_buffers->empty());

  for (AppendBuffers::iterator it = append_buffers->begin();
       it != append_buffers->end(); ++it) {
    ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
                     << dendl;
    it->first->set_flush_in_progress();
    m_size += it->second.length();
  }

  m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
                           append_buffers->begin(), append_buffers->end());
  if (!m_aio_scheduled) {
    // issue the RADOS write from the work queue, not inline under m_lock
    m_op_work_queue->queue(new FunctionContext([this] (int r) {
      send_appends_aio();
    }));
    m_aio_scheduled = true;
  }
}
372 | ||
// Work-queue callback: batches pending buffers (up to the soft max size)
// into a single guarded RADOS append op and submits it asynchronously; the
// result is delivered to handle_append_flushed() via C_AppendFlush.
void ObjectRecorder::send_appends_aio() {
  librados::AioCompletion *rados_completion;
  {
    Mutex::Locker locker(*m_lock);
    m_aio_scheduled = false;

    if (m_pending_buffers.empty()) {
      ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
                       << dendl;
      return;
    }

    if (m_max_in_flight_appends != 0 &&
        m_in_flight_tids.size() >= m_max_in_flight_appends) {
      // throttled: handle_append_flushed() will reschedule this pass
      ldout(m_cct, 20) << __func__ << ": " << m_oid
                       << " max in flight appends reached" << dendl;
      return;
    }

    uint64_t append_tid = m_append_tid++;
    m_in_flight_tids.insert(append_tid);

    ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
                     << append_tid << dendl;

    librados::ObjectWriteOperation op;
    // guard the append so the OSD rejects a write that would exceed the
    // soft max size (surfaces as -EOVERFLOW in handle_append_flushed)
    client::guard_append(&op, m_soft_max_size);
    auto append_buffers = &m_in_flight_appends[append_tid];

    for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
      ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl;
      op.append(it->second);
      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
      m_aio_sent_size += it->second.length();
      append_buffers->push_back(*it);
      it = m_pending_buffers.erase(it);
      if (m_aio_sent_size >= m_soft_max_size) {
        // NOTE(review): m_aio_sent_size is never reset in this function --
        // presumably cleared when the object rolls over; confirm
        break;
      }
    }
    rados_completion = librados::Rados::aio_create_completion(
      new C_AppendFlush(this, append_tid), nullptr,
      utils::rados_ctx_callback);
    int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
    ceph_assert(r == 0);
  }
  // drop our ref; the librados callback path keeps the op alive
  rados_completion->release();
}
421 | ||
422 | void ObjectRecorder::notify_handler_unlock() { | |
11fdf7f2 | 423 | ceph_assert(m_lock->is_locked()); |
7c673cae FG |
424 | if (m_object_closed) { |
425 | m_lock->Unlock(); | |
426 | m_handler->closed(this); | |
427 | } else { | |
428 | // TODO need to delay completion until after aio_notify completes | |
429 | m_lock->Unlock(); | |
430 | m_handler->overflow(this); | |
431 | } | |
432 | } | |
433 | ||
434 | } // namespace journal |