1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/assert.h"
8 #include "common/Timer.h"
9 #include "cls/journal/cls_journal_client.h"
11 #define dout_subsys ceph_subsys_journaler
13 #define dout_prefix *_dout << "ObjectRecorder: " << this << " "
15 using namespace cls::journal
;
16 using std::shared_ptr
;
// Constructs a recorder for a single journal data object.
// Captures the shared lock, work queue, timer and handler supplied by the
// owning JournalRecorder, and derives the soft size cap (1 << order) used to
// decide when appends must overflow to the next object.
// NOTE(review): this extract is missing interior lines (e.g. the expected
// m_ioctx duplication before m_cct is derived, and the closing brace) —
// confirm against the full source before editing.
20 ObjectRecorder::ObjectRecorder(librados::IoCtx
&ioctx
, const std::string
&oid
,
21 uint64_t object_number
, shared_ptr
<Mutex
> lock
,
22 ContextWQ
*work_queue
, SafeTimer
&timer
,
23 Mutex
&timer_lock
, Handler
*handler
,
24 uint8_t order
, uint32_t flush_interval
,
25 uint64_t flush_bytes
, double flush_age
)
26 : RefCountedObject(NULL
, 0), m_oid(oid
), m_object_number(object_number
),
27 m_cct(NULL
), m_op_work_queue(work_queue
), m_timer(timer
),
28 m_timer_lock(timer_lock
), m_handler(handler
), m_order(order
),
// soft cap on the object's size: once pending + in-flight bytes reach this,
// append_unlock() reports that the object should be closed
29 m_soft_max_size(1 << m_order
), m_flush_interval(flush_interval
),
30 m_flush_bytes(flush_bytes
), m_flush_age(flush_age
), m_flush_handler(this),
31 m_lock(lock
), m_append_tid(0), m_pending_bytes(0),
32 m_size(0), m_overflowed(false), m_object_closed(false),
33 m_in_flight_flushes(false), m_aio_scheduled(false) {
// recover the CephContext from the IoCtx for logging (ldout) purposes
35 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
// a handler is mandatory: overflow/close notifications have no other sink
36 assert(m_handler
!= NULL
);
// Destructor: the recorder must be fully quiesced before destruction —
// no scheduled flush timer task, no buffered appends, and no in-flight
// RADOS ops or scheduled AIO work. These asserts document that contract.
39 ObjectRecorder::~ObjectRecorder() {
40 assert(m_append_task
== NULL
);
41 assert(m_append_buffers
.empty());
42 assert(m_in_flight_tids
.empty());
43 assert(m_in_flight_appends
.empty());
44 assert(!m_aio_scheduled
);
// Queues a batch of append buffers on this object (caller holds *m_lock).
// Each buffer is attached to the flush handler via append(); the most
// recent buffer whose future requested an immediate flush is flushed, and
// the delayed-flush timer task is scheduled if any append asked for it.
// Returns true when the object has reached its soft size limit and the
// caller should close it / overflow to the next object.
// NOTE(review): extract is missing interior lines (closing braces, and —
// presumably — the lock release implied by the "_unlock" suffix); verify
// against the full source.
47 bool ObjectRecorder::append_unlock(AppendBuffers
&&append_buffers
) {
48 assert(m_lock
->is_locked());
50 FutureImplPtr last_flushed_future
;
51 bool schedule_append
= false;
// take ownership of the incoming buffers by appending them to our queue
54 m_append_buffers
.insert(m_append_buffers
.end(),
55 append_buffers
.begin(), append_buffers
.end());
// attach each future; remember the latest one that requested a flush
60 for (AppendBuffers::const_iterator iter
= append_buffers
.begin();
61 iter
!= append_buffers
.end(); ++iter
) {
62 if (append(*iter
, &schedule_append
)) {
63 last_flushed_future
= iter
->first
;
// flushing the most recent flush-requesting future implicitly flushes
// everything queued before it
67 if (last_flushed_future
) {
68 flush(last_flushed_future
);
// arm the age-based flush timer when no size/count threshold fired
72 if (schedule_append
) {
73 schedule_append_task();
// true => object is (soft-)full and should be closed by the caller
78 return (!m_object_closed
&& !m_overflowed
&&
79 m_size
+ m_pending_bytes
>= m_soft_max_size
);
// Flushes all outstanding appends and invokes on_safe once they are safe.
// Attaches a Future to the most recent append (buffered first, then
// in-flight) and flushes it; if nothing is outstanding, completes on_safe
// immediately with success.
// NOTE(review): the declaration of `future` (a journal::Future) is missing
// from this extract, along with several closing braces — confirm against
// the full source.
82 void ObjectRecorder::flush(Context
*on_safe
) {
83 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< dendl
;
88 Mutex::Locker
locker(*m_lock
);
90 // if currently handling flush notifications, wait so that
91 // we notify in the correct order (since the lock is dropped while
// the in-flight flush callbacks run)
93 if (m_in_flight_flushes
) {
94 m_in_flight_flushes_cond
.Wait(*(m_lock
.get()));
97 // attach the flush to the most recent append
98 if (!m_append_buffers
.empty()) {
99 future
= Future(m_append_buffers
.rbegin()->first
);
// nothing buffered -- fall back to the newest in-flight append batch
102 } else if (!m_in_flight_appends
.empty()) {
103 AppendBuffers
&append_buffers
= m_in_flight_appends
.rbegin()->second
;
104 assert(!append_buffers
.empty());
105 future
= Future(append_buffers
.rbegin()->first
);
109 if (future
.is_valid()) {
110 future
.flush(on_safe
);
// no outstanding appends at all: report safe immediately
112 on_safe
->complete(0);
// Flushes every buffered append up to and including the given future.
// If the future belongs to a different object (ownership moved on
// overflow/close) the flush is re-issued through that future instead;
// no-ops when the flush is already in progress or the object is
// closed/overflowed. NOTE(review): extract is missing interior lines
// (early returns / loop increment); verify against the full source.
116 void ObjectRecorder::flush(const FutureImplPtr
&future
) {
117 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< " flushing " << *future
120 assert(m_lock
->is_locked());
122 if (future
->get_flush_handler().get() != &m_flush_handler
) {
123 // if we don't own this future, re-issue the flush so that it hits the
124 // correct journal object owner
127 } else if (future
->is_flush_in_progress()) {
131 if (m_object_closed
|| m_overflowed
) {
// locate the requested future in the pending queue, scanning from the
// newest entry backwards
135 AppendBuffers::reverse_iterator r_it
;
136 for (r_it
= m_append_buffers
.rbegin(); r_it
!= m_append_buffers
.rend();
138 if (r_it
->first
== future
) {
142 assert(r_it
!= m_append_buffers
.rend());
// convert the reverse iterator to the forward iterator one past the match
144 auto it
= (++r_it
).base();
145 assert(it
!= m_append_buffers
.end());
// move [begin, it) — everything up to and including the future — out of
// the pending queue and submit it
148 AppendBuffers flush_buffers
;
149 flush_buffers
.splice(flush_buffers
.end(), m_append_buffers
,
150 m_append_buffers
.begin(), it
);
151 send_appends(&flush_buffers
);
// Transfers all still-unsent append buffers to the caller so they can be
// redirected to a new journal object. Only legal once this object is
// closed or overflowed and has no in-flight RADOS ops (see asserts).
154 void ObjectRecorder::claim_append_buffers(AppendBuffers
*append_buffers
) {
155 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< dendl
;
157 assert(m_lock
->is_locked());
158 assert(m_in_flight_tids
.empty());
159 assert(m_in_flight_appends
.empty());
160 assert(m_object_closed
|| m_overflowed
);
// splice (O(1), no copies) the whole pending queue onto the caller's list
161 append_buffers
->splice(append_buffers
->end(), m_append_buffers
,
162 m_append_buffers
.begin(), m_append_buffers
.end());
// Marks the object closed and cancels any pending timed flush. Returns
// true when the close is already complete (nothing in flight); if false,
// the handler will be notified later once in-flight work drains.
// NOTE(review): extract is missing interior lines (presumably a final
// flush of buffered appends before the closed flag is set) — verify.
165 bool ObjectRecorder::close() {
166 assert (m_lock
->is_locked());
168 ldout(m_cct
, 20) << __func__
<< ": " << m_oid
<< dendl
;
170 cancel_append_task();
// close() must not be called twice
174 assert(!m_object_closed
);
175 m_object_closed
= true;
// closed immediately only if no RADOS ops, flush callbacks, or scheduled
// AIO remain outstanding
176 return (m_in_flight_tids
.empty() && !m_in_flight_flushes
&& !m_aio_scheduled
);
// Timer callback fired after m_flush_age: clears the task pointer and
// (under m_lock) flushes whatever appends have accumulated.
// NOTE(review): the body is truncated in this extract — the forced flush
// call is presumably on the missing lines; confirm against full source.
179 void ObjectRecorder::handle_append_task() {
// invoked by SafeTimer with the timer lock already held
180 assert(m_timer_lock
.is_locked());
181 m_append_task
= NULL
;
183 Mutex::Locker
locker(*m_lock
);
// Cancels the pending age-based flush task, if one is scheduled.
// Safe to call when no task is pending (no-op).
187 void ObjectRecorder::cancel_append_task() {
188 Mutex::Locker
locker(m_timer_lock
);
189 if (m_append_task
!= NULL
) {
190 m_timer
.cancel_event(m_append_task
);
191 m_append_task
= NULL
;
// Arms the age-based flush timer so buffered appends are flushed after
// m_flush_age seconds even if the size/count thresholds never trigger.
// No-op when a task is already scheduled or the age threshold is disabled
// (m_flush_age <= 0). The timer owns and frees the FunctionContext.
195 void ObjectRecorder::schedule_append_task() {
196 Mutex::Locker
locker(m_timer_lock
);
197 if (m_append_task
== nullptr && m_flush_age
> 0) {
198 m_append_task
= m_timer
.add_event_after(
199 m_flush_age
, new FunctionContext([this](int) {
200 handle_append_task();
// Queues a single append buffer: attaches its future to this recorder's
// flush handler, accounts its bytes as pending, and attempts a
// threshold-based flush. Sets *schedule_append when the thresholds did
// not fire, so the caller arms the age-based timer instead.
// Returns true when the future requested an immediate flush on attach.
205 bool ObjectRecorder::append(const AppendBuffer
&append_buffer
,
206 bool *schedule_append
) {
207 assert(m_lock
->is_locked());
209 bool flush_requested
= false;
// once closed/overflowed this recorder no longer owns the flush — the
// buffer will be claimed and redirected to a new object
210 if (!m_object_closed
&& !m_overflowed
) {
211 flush_requested
= append_buffer
.first
->attach(&m_flush_handler
);
214 m_append_buffers
.push_back(append_buffer
);
215 m_pending_bytes
+= append_buffer
.second
.length();
// flush_appends(false) returns false when no threshold fired
217 if (!flush_appends(false)) {
218 *schedule_append
= true;
220 return flush_requested
;
// Sends the buffered appends to RADOS when `force` is set or one of the
// flush thresholds (soft size cap, append count, pending bytes) is met.
// NOTE(review): this extract is missing the line carrying the `(!force &&`
// term of the condition (which is why the parentheses below look
// unbalanced) as well as the early returns — verify against full source.
223 bool ObjectRecorder::flush_appends(bool force
) {
224 assert(m_lock
->is_locked());
// closed/overflowed objects never flush; buffers get claimed instead
225 if (m_object_closed
|| m_overflowed
) {
// skip the flush when there is nothing buffered, or when (not forced and)
// none of the size/count/bytes thresholds have been reached
229 if (m_append_buffers
.empty() ||
231 m_size
+ m_pending_bytes
< m_soft_max_size
&&
232 (m_flush_interval
> 0 && m_append_buffers
.size() < m_flush_interval
) &&
233 (m_flush_bytes
> 0 && m_pending_bytes
< m_flush_bytes
))) {
// hand the whole queue off for submission
238 AppendBuffers append_buffers
;
239 append_buffers
.swap(m_append_buffers
);
240 send_appends(&append_buffers
);
// RADOS completion callback for the append op with the given tid.
// On -EOVERFLOW (object exceeded the guarded size) the batch is kept for
// redirection and the handler is notified once all in-flight ops drain;
// otherwise each buffered future is marked safe with result r.
// NOTE(review): extract is missing interior lines (early returns, the
// overflow bookkeeping between the two branches, closing braces) — the
// exact control flow must be confirmed against the full source.
244 void ObjectRecorder::handle_append_flushed(uint64_t tid
, int r
) {
245 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< " tid=" << tid
246 << ", r=" << r
<< dendl
;
248 AppendBuffers append_buffers
;
// retire the tid from the in-flight set; it must be present
251 auto tid_iter
= m_in_flight_tids
.find(tid
);
252 assert(tid_iter
!= m_in_flight_tids
.end());
253 m_in_flight_tids
.erase(tid_iter
);
255 InFlightAppends::iterator iter
= m_in_flight_appends
.find(tid
);
256 if (r
== -EOVERFLOW
|| m_overflowed
) {
257 if (iter
!= m_in_flight_appends
.end()) {
260 // must have seen an overflow on a previous append op
261 assert(r
== -EOVERFLOW
&& m_overflowed
);
264 // notify of overflow once all in-flight ops are complete
265 if (m_in_flight_tids
.empty() && !m_aio_scheduled
) {
267 notify_handler_unlock();
// success path: take ownership of this tid's batch before dropping the
// lock to run the per-future callbacks
274 assert(iter
!= m_in_flight_appends
.end());
275 append_buffers
.swap(iter
->second
);
276 assert(!append_buffers
.empty());
278 m_in_flight_appends
.erase(iter
);
// guard: flush(Context*) waits on this flag so safe() callbacks are
// delivered in order while the lock is dropped
279 m_in_flight_flushes
= true;
283 // Flag the associated futures as complete.
284 for (AppendBuffers::iterator buf_it
= append_buffers
.begin();
285 buf_it
!= append_buffers
.end(); ++buf_it
) {
286 ldout(m_cct
, 20) << __func__
<< ": " << *buf_it
->first
<< " marked safe"
288 buf_it
->first
->safe(r
);
291 // wake up any flush requests that raced with a RADOS callback
293 m_in_flight_flushes
= false;
294 m_in_flight_flushes_cond
.Signal();
296 if (m_in_flight_appends
.empty() && !m_aio_scheduled
&& m_object_closed
) {
297 // all remaining unsent appends should be redirected to new object
298 notify_handler_unlock();
// Handles an -EOVERFLOW from the guarded append: rebuilds the pending
// queue so that all in-flight batches (in tid order) come first, followed
// by the not-yet-sent buffers, ready to be claimed and replayed against
// the next journal object. NOTE(review): the splice source list argument
// (presumably m_append_buffers) and the loop's detach/log tail are on
// lines missing from this extract — verify against the full source.
304 void ObjectRecorder::append_overflowed() {
305 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< " append overflowed"
308 assert(m_lock
->is_locked());
309 assert(!m_in_flight_appends
.empty());
// an age-based flush would be pointless now; drop it
311 cancel_append_task();
313 InFlightAppends in_flight_appends
;
314 in_flight_appends
.swap(m_in_flight_appends
);
// collect every in-flight batch, oldest tid first, preserving order
316 AppendBuffers restart_append_buffers
;
317 for (InFlightAppends::iterator it
= in_flight_appends
.begin();
318 it
!= in_flight_appends
.end(); ++it
) {
319 restart_append_buffers
.insert(restart_append_buffers
.end(),
320 it
->second
.begin(), it
->second
.end());
// then append the buffers that were never sent, and make the combined
// list the new pending queue
323 restart_append_buffers
.splice(restart_append_buffers
.end(),
325 m_append_buffers
.begin(),
326 m_append_buffers
.end());
327 restart_append_buffers
.swap(m_append_buffers
);
329 for (AppendBuffers::const_iterator it
= m_append_buffers
.begin();
330 it
!= m_append_buffers
.end(); ++it
) {
331 ldout(m_cct
, 20) << __func__
<< ": overflowed " << *it
->first
// Stages a batch of appends for asynchronous submission: marks each
// future flush-in-progress, accounts the bytes into m_size, moves the
// batch onto m_pending_buffers, and schedules a single send_appends_aio
// pass on the work queue if one is not already scheduled.
// NOTE(review): the pending-bytes decrement and the lambda body invoking
// send_appends_aio() are on lines missing from this extract — verify.
337 void ObjectRecorder::send_appends(AppendBuffers
*append_buffers
) {
338 assert(m_lock
->is_locked());
339 assert(!append_buffers
->empty());
341 for (AppendBuffers::iterator it
= append_buffers
->begin();
342 it
!= append_buffers
->end(); ++it
) {
343 ldout(m_cct
, 20) << __func__
<< ": flushing " << *it
->first
345 it
->first
->set_flush_in_progress();
// these bytes now count toward the object's (soft-capped) size
346 m_size
+= it
->second
.length();
// move the batch (O(1) splice) onto the queue drained by the AIO pass
349 m_pending_buffers
.splice(m_pending_buffers
.end(), *append_buffers
,
350 append_buffers
->begin(), append_buffers
->end());
// only one AIO pass is queued at a time; m_aio_scheduled gates re-entry
351 if (!m_aio_scheduled
) {
352 m_op_work_queue
->queue(new FunctionContext([this] (int r
) {
355 m_aio_scheduled
= true;
// Work-queue callback: drains m_pending_buffers into a single guarded
// RADOS write op (one append per buffer) and submits it asynchronously.
// A C_Gather funnels the RADOS completion into C_AppendFlush, which calls
// handle_append_flushed(tid, r). If more buffers arrived while the lock
// was dropped, another pass is rescheduled on the work queue.
// NOTE(review): extract is missing interior lines (the append_tid
// declaration, scope braces around the locked section, and the reschedule
// lambda body) — verify against the full source.
359 void ObjectRecorder::send_appends_aio() {
360 AppendBuffers
*append_buffers
;
363 Mutex::Locker
locker(*m_lock
);
// allocate a fresh tid and record it as in-flight
364 append_tid
= m_append_tid
++;
365 m_in_flight_tids
.insert(append_tid
);
367 // safe to hold pointer outside lock until op is submitted
368 append_buffers
= &m_in_flight_appends
[append_tid
];
369 append_buffers
->swap(m_pending_buffers
);
372 ldout(m_cct
, 10) << __func__
<< ": " << m_oid
<< " flushing journal tid="
373 << append_tid
<< dendl
;
// gather_ctx completes append_flush once every sub-completion finishes
374 C_AppendFlush
*append_flush
= new C_AppendFlush(this, append_tid
);
375 C_Gather
*gather_ctx
= new C_Gather(m_cct
, append_flush
);
377 librados::ObjectWriteOperation op
;
// server-side guard: fail the op with -EOVERFLOW if the object would
// exceed the soft max size, triggering the overflow path
378 client::guard_append(&op
, m_soft_max_size
);
379 for (AppendBuffers::iterator it
= append_buffers
->begin();
380 it
!= append_buffers
->end(); ++it
) {
381 ldout(m_cct
, 20) << __func__
<< ": flushing " << *it
->first
383 op
.append(it
->second
);
// journal data is write-once streaming data; advise against caching
384 op
.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
387 librados::AioCompletion
*rados_completion
=
388 librados::Rados::aio_create_completion(gather_ctx
->new_sub(), nullptr,
389 utils::rados_ctx_callback
);
390 int r
= m_ioctx
.aio_operate(m_oid
, rados_completion
, &op
);
// aio_operate took its own ref; drop ours immediately
392 rados_completion
->release();
396 if (m_pending_buffers
.empty()) {
397 m_aio_scheduled
= false;
398 if (m_in_flight_appends
.empty() && m_object_closed
) {
399 // all remaining unsent appends should be redirected to new object
400 notify_handler_unlock();
405 // additional pending items -- reschedule
406 m_op_work_queue
->queue(new FunctionContext([this] (int r
) {
413 // allow append op to complete
414 gather_ctx
->activate();
// Notifies the owning handler that this object has closed or overflowed.
// Called with m_lock held; the "_unlock" suffix indicates the lock is
// released (on lines missing from this extract) before invoking the
// handler callback — verify against the full source.
417 void ObjectRecorder::notify_handler_unlock() {
418 assert(m_lock
->is_locked());
419 if (m_object_closed
) {
421 m_handler
->closed(this);
423 // TODO need to delay completion until after aio_notify completes
425 m_handler
->overflow(this);
429 } // namespace journal