// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "journal/ObjectRecorder.h"
#include "journal/Future.h"
#include "journal/Utils.h"
#include "include/ceph_assert.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "ObjectRecorder: " << this << " "

using namespace cls::journal;
using std::shared_ptr;

namespace journal {

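// An ObjectRecorder batches journal appends destined for a single RADOS
// object. Appends are buffered until a flush threshold is crossed: a
// queued-buffer count (flush_interval), a queued-byte count (flush_bytes),
// an age-based timer (flush_age), or the object's soft size limit
// (1 << order), at which point the batch is written out via librados AIO.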
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                               uint64_t object_number, shared_ptr<Mutex> lock,
                               ContextWQ *work_queue, SafeTimer &timer,
                               Mutex &timer_lock, Handler *handler,
                               uint8_t order, uint32_t flush_interval,
                               uint64_t flush_bytes, double flush_age,
                               uint64_t max_in_flight_appends)
  : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
    m_cct(NULL), m_op_work_queue(work_queue), m_timer(timer),
    m_timer_lock(timer_lock), m_handler(handler), m_order(order),
    m_soft_max_size(1 << m_order), m_flush_interval(flush_interval),
    m_flush_bytes(flush_bytes), m_flush_age(flush_age),
    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
    m_lock(lock), m_append_tid(0), m_pending_bytes(0),
    m_size(0), m_overflowed(false), m_object_closed(false),
    m_in_flight_flushes(false), m_aio_scheduled(false) {
  m_ioctx.dup(ioctx);
  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
  ceph_assert(m_handler != NULL);
}

ObjectRecorder::~ObjectRecorder() {
  ceph_assert(m_append_task == NULL);
  ceph_assert(m_append_buffers.empty());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
  ceph_assert(!m_aio_scheduled);
}

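// Queues the provided buffers for this object. Expects m_lock to be held
// on entry and releases it before returning (hence the _unlock suffix).
// Returns true when the object has reached its soft size limit and the
// caller should close it and direct new appends to the next object.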
bool ObjectRecorder::append_unlock(AppendBuffers &&append_buffers) {
  ceph_assert(m_lock->is_locked());

  FutureImplPtr last_flushed_future;
  bool schedule_append = false;

  if (m_overflowed) {
    m_append_buffers.insert(m_append_buffers.end(),
                            append_buffers.begin(), append_buffers.end());
    m_lock->Unlock();
    return false;
  }

  for (AppendBuffers::const_iterator iter = append_buffers.begin();
       iter != append_buffers.end(); ++iter) {
    if (append(*iter, &schedule_append)) {
      last_flushed_future = iter->first;
    }
  }

  if (last_flushed_future) {
    flush(last_flushed_future);
    m_lock->Unlock();
  } else {
    m_lock->Unlock();
    if (schedule_append) {
      schedule_append_task();
    } else {
      cancel_append_task();
    }
  }
  return (!m_object_closed && !m_overflowed &&
          m_size + m_pending_bytes >= m_soft_max_size);
}

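// Flushes all queued appends and attaches on_safe to the most recent
// append's future so it completes once that append is durable. If nothing
// is queued or in flight, on_safe completes immediately.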
void ObjectRecorder::flush(Context *on_safe) {
  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;

  cancel_append_task();
  Future future;
  {
    Mutex::Locker locker(*m_lock);

    // if currently handling flush notifications, wait so that
    // we notify in the correct order (since lock is dropped on
    // callback)
    if (m_in_flight_flushes) {
      m_in_flight_flushes_cond.Wait(*(m_lock.get()));
    }

    // attach the flush to the most recent append
    if (!m_append_buffers.empty()) {
      future = Future(m_append_buffers.rbegin()->first);

      flush_appends(true);
    } else if (!m_pending_buffers.empty()) {
      future = Future(m_pending_buffers.rbegin()->first);
    } else if (!m_in_flight_appends.empty()) {
      AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
      ceph_assert(!append_buffers.empty());
      future = Future(append_buffers.rbegin()->first);
    }
  }

  if (future.is_valid()) {
    future.flush(on_safe);
  } else {
    on_safe->complete(0);
  }
}

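// Flushes every queued append up to and including the one associated with
// the given future. If the future belongs to a different journal object
// (e.g. after a close or overflow), the flush is re-issued so it reaches
// the correct owner.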
void ObjectRecorder::flush(const FutureImplPtr &future) {
  ldout(m_cct, 20) << __func__ << ": " << m_oid << " flushing " << *future
                   << dendl;

  ceph_assert(m_lock->is_locked());

  if (future->get_flush_handler().get() != &m_flush_handler) {
    // if we don't own this future, re-issue the flush so that it hits the
    // correct journal object owner
    future->flush();
    return;
  } else if (future->is_flush_in_progress()) {
    return;
  }

  if (m_object_closed || m_overflowed) {
    return;
  }

  AppendBuffers::reverse_iterator r_it;
  for (r_it = m_append_buffers.rbegin(); r_it != m_append_buffers.rend();
       ++r_it) {
    if (r_it->first == future) {
      break;
    }
  }
  ceph_assert(r_it != m_append_buffers.rend());

  auto it = (++r_it).base();
  ceph_assert(it != m_append_buffers.end());
  ++it;

  AppendBuffers flush_buffers;
  flush_buffers.splice(flush_buffers.end(), m_append_buffers,
                       m_append_buffers.begin(), it);
  send_appends(&flush_buffers);
}

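// Hands any still-queued buffers back to the caller so they can be
// re-appended to the replacement journal object. Only valid once the
// object is closed or has overflowed and no ops remain in flight.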
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;

  ceph_assert(m_lock->is_locked());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
  ceph_assert(m_object_closed || m_overflowed);
  append_buffers->splice(append_buffers->end(), m_append_buffers,
                         m_append_buffers.begin(), m_append_buffers.end());
}

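// Stops accepting new appends and force-flushes anything still queued.
// Returns true if no I/O remains in flight, i.e. the caller can treat the
// close as complete immediately; otherwise the handler is notified once
// the outstanding ops drain.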
bool ObjectRecorder::close() {
  ceph_assert(m_lock->is_locked());

  ldout(m_cct, 20) << __func__ << ": " << m_oid << dendl;

  cancel_append_task();

  flush_appends(true);

  ceph_assert(!m_object_closed);
  m_object_closed = true;
  return (m_in_flight_tids.empty() && !m_in_flight_flushes && !m_aio_scheduled);
}

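// Timer callback: the flush-age deadline has expired, so force out
// whatever has been queued since the task was armed.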
void ObjectRecorder::handle_append_task() {
  ceph_assert(m_timer_lock.is_locked());
  m_append_task = NULL;

  Mutex::Locker locker(*m_lock);
  flush_appends(true);
}

void ObjectRecorder::cancel_append_task() {
  Mutex::Locker locker(m_timer_lock);
  if (m_append_task != NULL) {
    m_timer.cancel_event(m_append_task);
    m_append_task = NULL;
  }
}

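// Arms the flush-age timer if it isn't already armed; a flush_age of zero
// disables age-based flushing entirely.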
void ObjectRecorder::schedule_append_task() {
  Mutex::Locker locker(m_timer_lock);
  if (m_append_task == nullptr && m_flush_age > 0) {
    m_append_task = m_timer.add_event_after(
      m_flush_age, new FunctionContext([this](int) {
        handle_append_task();
      }));
  }
}

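// Queues a single buffer and attaches this object's flush handler to its
// future. Returns true if a flush had already been requested against that
// future; sets *schedule_append when the batch thresholds were not yet met
// and the age-based timer should be armed.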
bool ObjectRecorder::append(const AppendBuffer &append_buffer,
                            bool *schedule_append) {
  ceph_assert(m_lock->is_locked());

  bool flush_requested = false;
  if (!m_object_closed && !m_overflowed) {
    flush_requested = append_buffer.first->attach(&m_flush_handler);
  }

  m_append_buffers.push_back(append_buffer);
  m_pending_bytes += append_buffer.second.length();

  if (!flush_appends(false)) {
    *schedule_append = true;
  }
  return flush_requested;
}

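// Sends the queued buffers when `force` is set or a flush threshold has
// been reached: the soft size limit, the queued-buffer count
// (m_flush_interval), or the queued-byte count (m_flush_bytes). Returns
// true if buffers were sent (or the object is closed/overflowed), false
// if appends remain queued.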
bool ObjectRecorder::flush_appends(bool force) {
  ceph_assert(m_lock->is_locked());
  if (m_object_closed || m_overflowed) {
    return true;
  }

  if (m_append_buffers.empty() ||
      (!force &&
       m_size + m_pending_bytes < m_soft_max_size &&
       (m_flush_interval > 0 && m_append_buffers.size() < m_flush_interval) &&
       (m_flush_bytes > 0 && m_pending_bytes < m_flush_bytes))) {
    return false;
  }

  m_pending_bytes = 0;
  AppendBuffers append_buffers;
  append_buffers.swap(m_append_buffers);
  send_appends(&append_buffers);
  return true;
}

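// AIO completion callback for a batch of appends. On -EOVERFLOW the object
// is marked overflowed and, once every in-flight op has drained, all
// unwritten buffers are requeued and the handler is notified. On success
// the batch's futures are marked safe and the next pending batch (if any)
// is dispatched.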
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << " tid=" << tid
                   << ", r=" << r << dendl;

  AppendBuffers append_buffers;
  {
    m_lock->Lock();
    auto tid_iter = m_in_flight_tids.find(tid);
    ceph_assert(tid_iter != m_in_flight_tids.end());
    m_in_flight_tids.erase(tid_iter);

    InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
    if (r == -EOVERFLOW || m_overflowed) {
      if (iter != m_in_flight_appends.end()) {
        m_overflowed = true;
      } else {
        // must have seen an overflow on a previous append op
        ceph_assert(r == -EOVERFLOW && m_overflowed);
      }

      // notify of overflow once all in-flight ops are complete
      if (m_in_flight_tids.empty() && !m_aio_scheduled) {
        m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
        append_overflowed();
        notify_handler_unlock();
      } else {
        m_lock->Unlock();
      }
      return;
    }

    ceph_assert(iter != m_in_flight_appends.end());
    append_buffers.swap(iter->second);
    ceph_assert(!append_buffers.empty());

    m_in_flight_appends.erase(iter);
    m_in_flight_flushes = true;
    m_lock->Unlock();
  }

  // Flag the associated futures as complete.
  for (AppendBuffers::iterator buf_it = append_buffers.begin();
       buf_it != append_buffers.end(); ++buf_it) {
    ldout(m_cct, 20) << __func__ << ": " << *buf_it->first << " marked safe"
                     << dendl;
    buf_it->first->safe(r);
  }

  // wake up any flush requests that raced with a RADOS callback
  m_lock->Lock();
  m_in_flight_flushes = false;
  m_in_flight_flushes_cond.Signal();

  if (!m_aio_scheduled) {
    if (m_in_flight_appends.empty() && m_object_closed) {
      // all remaining unsent appends should be redirected to new object
      m_append_buffers.splice(m_append_buffers.begin(), m_pending_buffers);
      notify_handler_unlock();
    } else if (!m_pending_buffers.empty()) {
      m_aio_scheduled = true;
      m_lock->Unlock();
      send_appends_aio();
    } else {
      m_lock->Unlock();
    }
  } else {
    m_lock->Unlock();
  }
}

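// Requeues every in-flight and still-queued buffer, preserving the
// original append order, and detaches their futures so they can be
// re-attached to the replacement object's flush handler.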
void ObjectRecorder::append_overflowed() {
  ldout(m_cct, 10) << __func__ << ": " << m_oid << " append overflowed"
                   << dendl;

  ceph_assert(m_lock->is_locked());
  ceph_assert(!m_in_flight_appends.empty());

  cancel_append_task();

  InFlightAppends in_flight_appends;
  in_flight_appends.swap(m_in_flight_appends);

  AppendBuffers restart_append_buffers;
  for (InFlightAppends::iterator it = in_flight_appends.begin();
       it != in_flight_appends.end(); ++it) {
    restart_append_buffers.insert(restart_append_buffers.end(),
                                  it->second.begin(), it->second.end());
  }

  restart_append_buffers.splice(restart_append_buffers.end(),
                                m_append_buffers,
                                m_append_buffers.begin(),
                                m_append_buffers.end());
  restart_append_buffers.swap(m_append_buffers);

  for (AppendBuffers::const_iterator it = m_append_buffers.begin();
       it != m_append_buffers.end(); ++it) {
    ldout(m_cct, 20) << __func__ << ": overflowed " << *it->first
                     << dendl;
    it->first->detach();
  }
}

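// Marks the given buffers as flush-in-progress, accounts their size
// against the object, and moves them to the pending queue; the actual
// librados write is dispatched from the work queue via send_appends_aio().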
void ObjectRecorder::send_appends(AppendBuffers *append_buffers) {
  ceph_assert(m_lock->is_locked());
  ceph_assert(!append_buffers->empty());

  for (AppendBuffers::iterator it = append_buffers->begin();
       it != append_buffers->end(); ++it) {
    ldout(m_cct, 20) << __func__ << ": flushing " << *it->first
                     << dendl;
    it->first->set_flush_in_progress();
    m_size += it->second.length();
  }

  m_pending_buffers.splice(m_pending_buffers.end(), *append_buffers,
                           append_buffers->begin(), append_buffers->end());
  if (!m_aio_scheduled) {
    m_op_work_queue->queue(new FunctionContext([this] (int r) {
      send_appends_aio();
    }));
    m_aio_scheduled = true;
  }
}

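// Builds a single guarded write op from the pending buffers (stopping at
// the soft size limit and the max in-flight cap) and issues it
// asynchronously. guard_append() makes the OSD fail the op with -EOVERFLOW
// rather than grow the object past the soft max, which is what feeds the
// overflow handling above.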
void ObjectRecorder::send_appends_aio() {
  librados::AioCompletion *rados_completion;
  {
    Mutex::Locker locker(*m_lock);
    m_aio_scheduled = false;

    if (m_pending_buffers.empty()) {
      ldout(m_cct, 20) << __func__ << ": " << m_oid << " pending buffers empty"
                       << dendl;
      return;
    }

    if (m_max_in_flight_appends != 0 &&
        m_in_flight_tids.size() >= m_max_in_flight_appends) {
      ldout(m_cct, 20) << __func__ << ": " << m_oid
                       << " max in flight appends reached" << dendl;
      return;
    }

    uint64_t append_tid = m_append_tid++;
    m_in_flight_tids.insert(append_tid);

    ldout(m_cct, 10) << __func__ << ": " << m_oid << " flushing journal tid="
                     << append_tid << dendl;

    librados::ObjectWriteOperation op;
    client::guard_append(&op, m_soft_max_size);
    auto append_buffers = &m_in_flight_appends[append_tid];

    for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
      ldout(m_cct, 20) << __func__ << ": flushing " << *it->first << dendl;
      op.append(it->second);
      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
      m_aio_sent_size += it->second.length();
      append_buffers->push_back(*it);
      it = m_pending_buffers.erase(it);
      if (m_aio_sent_size >= m_soft_max_size) {
        break;
      }
    }
    rados_completion = librados::Rados::aio_create_completion(
      new C_AppendFlush(this, append_tid), nullptr,
      utils::rados_ctx_callback);
    int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
    ceph_assert(r == 0);
  }
  rados_completion->release();
}

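// Drops m_lock and then informs the handler that the object was closed or
// has overflowed; the lock is released first because the callback runs
// outside this object's locking scope.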
void ObjectRecorder::notify_handler_unlock() {
  ceph_assert(m_lock->is_locked());
  if (m_object_closed) {
    m_lock->Unlock();
    m_handler->closed(this);
  } else {
    // TODO need to delay completion until after aio_notify completes
    m_lock->Unlock();
    m_handler->overflow(this);
  }
}

} // namespace journal