1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/ceph_assert.h"
8 #include "common/Timer.h"
9 #include "cls/journal/cls_journal_client.h"
// Logging configuration for this translation unit: log lines are issued
// under the journaler subsystem.
11 #define dout_subsys ceph_subsys_journaler
// Every log line is prefixed with "ObjectRecorder: ", the address of the
// recorder, the name of the calling function and the object name (m_oid).
13 #define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
14 << __func__ << " (" << m_oid << "): "
// Pull the cls::journal names (e.g. client::guard_append used below) and
// std::shared_ptr into scope.
16 using namespace cls::journal
;
17 using std::shared_ptr
;
// Constructs a recorder for the journal object named `oid`.
//
// The initializer list stores the object name and number, the work queue,
// the handler, the order, the in-flight append limit and the shared lock.
// It derives m_soft_max_size as (1 << m_order), stamps m_last_flush_time
// with the current clock, starts the append tid counter at 0 and clears the
// overflowed / closed / in-flight-flushes flags.
//
// NOTE(review): m_soft_max_size is computed from the member m_order inside
// the initializer list; this is only correct if m_order is declared before
// m_soft_max_size in the class -- confirm against the header.
// NOTE(review): the `ioctx` parameter is not used in the lines visible here
// (the embedded listing numbers skip from 30 to 32, so a statement may be
// missing from this view) -- confirm how m_ioctx is initialised.
21 ObjectRecorder::ObjectRecorder(librados::IoCtx
&ioctx
, const std::string
&oid
,
22 uint64_t object_number
, shared_ptr
<Mutex
> lock
,
23 ContextWQ
*work_queue
, Handler
*handler
,
24 uint8_t order
, int32_t max_in_flight_appends
)
25 : RefCountedObject(NULL
, 0), m_oid(oid
), m_object_number(object_number
),
26 m_cct(NULL
), m_op_work_queue(work_queue
), m_handler(handler
),
27 m_order(order
), m_soft_max_size(1 << m_order
),
28 m_max_in_flight_appends(max_in_flight_appends
), m_flush_handler(this),
29 m_lock(lock
), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
30 m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
// The CephContext used for logging is taken from the recorder's own IoCtx.
32 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
// A handler is mandatory: it is invoked later on close / overflow.
33 ceph_assert(m_handler
!= NULL
);
34 ldout(m_cct
, 20) << dendl
;
// Destructor. Logs the destruction and asserts that the recorder has been
// fully drained: no pending buffers, no in-flight tids and no in-flight
// appends may remain.
37 ObjectRecorder::~ObjectRecorder() {
38 ldout(m_cct
, 20) << dendl
;
39 ceph_assert(m_pending_buffers
.empty());
40 ceph_assert(m_in_flight_tids
.empty());
41 ceph_assert(m_in_flight_appends
.empty());
// Updates the batching thresholds consulted by send_appends().
//
// Logs the three values at level 5 and stores them in m_flush_interval,
// m_flush_bytes and m_flush_age. The caller must already hold m_lock
// (asserted below).
//
// NOTE(review): only the `flush_interval` parameter declaration is visible
// in this view (the embedded listing numbers skip from 44 to 47); the
// declarations of `flush_bytes` and `flush_age` are presumably on the
// missing lines -- confirm their types against the header.
44 void ObjectRecorder::set_append_batch_options(int flush_interval
,
47 ldout(m_cct
, 5) << "flush_interval=" << flush_interval
<< ", "
48 << "flush_bytes=" << flush_bytes
<< ", "
49 << "flush_age=" << flush_age
<< dendl
;
// The options are read by send_appends(), which also asserts the lock.
51 ceph_assert(m_lock
->is_locked());
52 m_flush_interval
= flush_interval
;
53 m_flush_bytes
= flush_bytes
;
54 m_flush_age
= flush_age
;
// Queues a batch of (future, payload) pairs for appending to this object.
//
// For every entry the recorder's flush handler is attached to the future,
// the entry is added to m_pending_buffers and its payload length is added
// to m_pending_bytes. The caller must hold m_lock (asserted below).
//
// Returns the result of send_appends(); a flush is forced when at least one
// attach() call reported that a flush had been requested, and the last such
// future is passed along as the flush target.
//
// NOTE(review): `append_buffers` is taken by rvalue reference but each
// element is copied via push_back(append_buffer) -- confirm whether a move
// was intended.
57 bool ObjectRecorder::append(AppendBuffers
&&append_buffers
) {
58 ldout(m_cct
, 20) << "count=" << append_buffers
.size() << dendl
;
60 ceph_assert(m_lock
->is_locked());
// Remains null unless some future in the batch reports a requested flush.
62 FutureImplPtr last_flushed_future
;
63 for (auto& append_buffer
: append_buffers
) {
64 ldout(m_cct
, 20) << *append_buffer
.first
<< ", "
65 << "size=" << append_buffer
.second
.length() << dendl
;
// attach() registers this recorder's flush handler on the future; its
// boolean result is treated as "a flush was requested for this future".
66 bool flush_requested
= append_buffer
.first
->attach(&m_flush_handler
);
67 if (flush_requested
) {
68 last_flushed_future
= append_buffer
.first
;
71 m_pending_buffers
.push_back(append_buffer
);
72 m_pending_bytes
+= append_buffer
.second
.length();
// `!!last_flushed_future` converts the pointer to the `force` flag.
75 return send_appends(!!last_flushed_future
, last_flushed_future
);
// Flushes everything appended so far and completes `on_safe` afterwards.
//
// Under m_lock, picks the future of the most recent append: the last entry
// of m_pending_buffers if any, otherwise the last buffer of the last
// in-flight append. If such a future exists, a FunctionContext that calls
// future.flush(on_safe) is queued on m_op_work_queue; the
// on_safe->complete(0) call at the end covers the remaining case.
//
// NOTE(review): the declaration of `future`, the scope that bounds `locker`
// and the branch around on_safe->complete(0) are not visible in this view
// (the embedded listing numbers skip) -- presumably complete(0) is the
// else-branch taken when there is nothing to flush; confirm.
78 void ObjectRecorder::flush(Context
*on_safe
) {
79 ldout(m_cct
, 20) << dendl
;
83 Mutex::Locker
locker(*m_lock
);
85 // if currently handling flush notifications, wait so that
86 // we notify in the correct order (since lock is dropped on
// NOTE(review): the wait is guarded by `if`, so the flag is not re-checked
// after wake-up -- confirm whether a loop is needed for this Cond type.
88 if (m_in_flight_flushes
) {
89 m_in_flight_flushes_cond
.Wait(*(m_lock
.get()));
92 // attach the flush to the most recent append
93 if (!m_pending_buffers
.empty()) {
94 future
= Future(m_pending_buffers
.rbegin()->first
);
95 } else if (!m_in_flight_appends
.empty()) {
96 AppendBuffers
&append_buffers
= m_in_flight_appends
.rbegin()->second
;
97 ceph_assert(!append_buffers
.empty());
98 future
= Future(append_buffers
.rbegin()->first
);
102 if (future
.is_valid()) {
103 // cannot be invoked while the same lock context
// The lambda captures the future and the completion by value so that the
// flush runs later from the work queue.
104 m_op_work_queue
->queue(new FunctionContext(
105 [future
, on_safe
] (int r
) mutable {
106 future
.flush(on_safe
);
109 on_safe
->complete(0);
// Flushes pending appends up to and including `future`.
//
// Two early cases are distinguished before any append is sent: the future
// is attached to a different recorder's flush handler, or a flush of the
// future is already in progress. Otherwise send_appends(true, future) is
// issued and its result is kept in `overflowed`.
//
// NOTE(review): the bodies of both early branches, the lock acquisition and
// release, and the condition around notify_handler_unlock() are not visible
// in this view (the embedded listing numbers skip) -- presumably the handler
// is only notified when `overflowed` is true; confirm.
113 void ObjectRecorder::flush(const FutureImplPtr
&future
) {
114 ldout(m_cct
, 20) << "flushing " << *future
<< dendl
;
// Compare the future's registered flush handler with this recorder's own.
117 if (future
->get_flush_handler().get() != &m_flush_handler
) {
118 // if we don't own this future, re-issue the flush so that it hits the
119 // correct journal object owner
123 } else if (future
->is_flush_in_progress()) {
// Force a send that stops at `future` (see send_appends()).
128 bool overflowed
= send_appends(true, future
);
130 notify_handler_unlock();
// Hands all still-pending buffers over to the caller.
//
// Preconditions (all asserted): m_lock is held, nothing is in flight, and
// the object is either closed or overflowed. Each pending future is
// detached, then the whole of m_pending_buffers is spliced onto the end of
// `*append_buffers`.
136 void ObjectRecorder::claim_append_buffers(AppendBuffers
*append_buffers
) {
137 ldout(m_cct
, 20) << dendl
;
139 ceph_assert(m_lock
->is_locked());
140 ceph_assert(m_in_flight_tids
.empty());
141 ceph_assert(m_in_flight_appends
.empty());
142 ceph_assert(m_object_closed
|| m_overflowed
);
// Detach every pending future before its buffer leaves this recorder.
144 for (auto& append_buffer
: m_pending_buffers
) {
145 ldout(m_cct
, 20) << "detached " << *append_buffer
.first
<< dendl
;
146 append_buffer
.first
->detach();
// Transfer the entire pending range to the end of the caller's list.
148 append_buffers
->splice(append_buffers
->end(), m_pending_buffers
,
149 m_pending_buffers
.begin(), m_pending_buffers
.end());
// Marks the object as closed after forcing out any pending appends.
//
// The caller must hold m_lock (asserted) and the object must not already be
// closed (asserted). Returns true when there are no in-flight tids and no
// flush notifications in progress; otherwise returns false.
152 bool ObjectRecorder::close() {
153 ceph_assert(m_lock
->is_locked());
155 ldout(m_cct
, 20) << dendl
;
// Forced send with no specific target future; must run before the closed
// flag is set because send_appends() checks m_object_closed.
156 send_appends(true, {});
158 ceph_assert(!m_object_closed
);
159 m_object_closed
= true;
160 return (m_in_flight_tids
.empty() && !m_in_flight_flushes
);
// Completion handler for the append operation identified by `tid`, with
// result code `r`.
//
// The tid is removed from m_in_flight_tids and its buffers are looked up in
// m_in_flight_appends (both must exist, asserted). On -EOVERFLOW the
// overflow path is taken. Otherwise the buffers are taken out of the
// in-flight map, their sizes are added to m_object_bytes, the futures are
// marked safe with `r`, waiters on m_in_flight_flushes_cond are signalled,
// and either the handler is notified or further pending appends are sent.
//
// NOTE(review): lock acquisition/release, the remaining statements of the
// overflow branch and the conditions around the trailing
// notify_handler_unlock() calls are not visible in this view (the embedded
// listing numbers skip) -- confirm against the full source.
163 void ObjectRecorder::handle_append_flushed(uint64_t tid
, int r
) {
164 ldout(m_cct
, 20) << "tid=" << tid
<< ", r=" << r
<< dendl
;
166 AppendBuffers append_buffers
;
// The tid must be known; drop it from the in-flight set.
169 auto tid_iter
= m_in_flight_tids
.find(tid
);
170 ceph_assert(tid_iter
!= m_in_flight_tids
.end());
171 m_in_flight_tids
.erase(tid_iter
);
173 InFlightAppends::iterator iter
= m_in_flight_appends
.find(tid
);
174 ceph_assert(iter
!= m_in_flight_appends
.end());
176 if (r
== -EOVERFLOW
) {
177 ldout(m_cct
, 10) << "append overflowed" << dendl
;
180 // notify of overflow once all in-flight ops are complete
181 if (m_in_flight_tids
.empty()) {
183 notify_handler_unlock();
// Success path: take ownership of the buffers that were sent under this tid.
190 append_buffers
.swap(iter
->second
);
191 ceph_assert(!append_buffers
.empty());
// Account the appended payload against the object's size.
193 for (auto& append_buffer
: append_buffers
) {
194 m_object_bytes
+= append_buffer
.second
.length();
196 ldout(m_cct
, 20) << "object_bytes=" << m_object_bytes
<< dendl
;
198 m_in_flight_appends
.erase(iter
);
// While this flag is set, flush(Context*) waits on m_in_flight_flushes_cond.
199 m_in_flight_flushes
= true;
203 // Flag the associated futures as complete.
204 for (auto& append_buffer
: append_buffers
) {
205 ldout(m_cct
, 20) << *append_buffer
.first
<< " marked safe" << dendl
;
206 append_buffer
.first
->safe(r
);
209 // wake up any flush requests that raced with a RADOS callback
211 m_in_flight_flushes
= false;
212 m_in_flight_flushes_cond
.Signal();
214 if (m_in_flight_appends
.empty() && (m_object_closed
|| m_overflowed
)) {
215 // all remaining unsent appends should be redirected to new object
216 notify_handler_unlock();
// Otherwise try to send more pending appends (not forced, no target future).
218 bool overflowed
= send_appends(false, {});
220 notify_handler_unlock();
// Re-queues all in-flight appends after an overflow.
//
// Preconditions (asserted): m_lock is held and at least one append is in
// flight. The in-flight map is emptied, its buffers are concatenated in the
// map's iteration order, the previously pending buffers are appended after
// them, and the combined list becomes the new m_pending_buffers.
//
// NOTE(review): the embedded listing numbers skip from 243 to 245, so one
// argument line of the splice() call is not visible here -- presumably the
// source list m_pending_buffers; confirm.
227 void ObjectRecorder::append_overflowed() {
228 ldout(m_cct
, 10) << dendl
;
230 ceph_assert(m_lock
->is_locked());
231 ceph_assert(!m_in_flight_appends
.empty());
// Take the in-flight appends out of the member, leaving it empty.
233 InFlightAppends in_flight_appends
;
234 in_flight_appends
.swap(m_in_flight_appends
);
// Collect the buffers of every in-flight append.
236 AppendBuffers restart_append_buffers
;
237 for (InFlightAppends::iterator it
= in_flight_appends
.begin();
238 it
!= in_flight_appends
.end(); ++it
) {
239 restart_append_buffers
.insert(restart_append_buffers
.end(),
240 it
->second
.begin(), it
->second
.end());
// Previously pending buffers go after the restarted ones.
243 restart_append_buffers
.splice(restart_append_buffers
.end(),
245 m_pending_buffers
.begin(),
246 m_pending_buffers
.end());
// Install the combined list as the new pending list.
247 restart_append_buffers
.swap(m_pending_buffers
);
// Moves pending buffers into a single RADOS write operation and submits it.
//
// Parameters:
//   force        - when true, the in-flight limit check below is bypassed.
//   flush_future - optional future at which a forced send should stop.
//
// The caller must hold m_lock (asserted). Nothing is sent when the object
// is already closed or overflowed, or when there are no pending buffers.
// Buffers are taken from the front of m_pending_buffers while the running
// total (object bytes + in-flight bytes + bytes gathered so far + this
// buffer) stays within m_soft_max_size. If any bytes were gathered, a new
// append tid is allocated, the bookkeeping counters are updated and the
// operation is submitted with aio_operate().
//
// NOTE(review): the return statements, the statements inside the capacity
// branches and several closing braces are not visible in this view (the
// embedded listing numbers skip) -- the return value presumably reports
// whether the object overflowed (callers store it in `overflowed`); confirm.
250 bool ObjectRecorder::send_appends(bool force
, FutureImplPtr flush_future
) {
251 ldout(m_cct
, 20) << dendl
;
253 ceph_assert(m_lock
->is_locked());
254 if (m_object_closed
|| m_overflowed
) {
255 ldout(m_cct
, 20) << "already closed or overflowed" << dendl
;
259 if (m_pending_buffers
.empty()) {
260 ldout(m_cct
, 20) << "append buffers empty" << dendl
;
// Batch thresholds: pending buffer count, pending byte count, flush age.
// NOTE(review): the age test holds while m_last_flush_time + m_flush_age is
// still >= now, i.e. before the age has elapsed -- this looks inverted;
// confirm the intended direction of the comparison.
265 ((m_flush_interval
> 0 && m_pending_buffers
.size() >= m_flush_interval
) ||
266 (m_flush_bytes
> 0 && m_pending_bytes
>= m_flush_bytes
) ||
268 m_last_flush_time
+ m_flush_age
>= ceph_clock_now()))) {
269 ldout(m_cct
, 20) << "forcing batch flush" << dendl
;
// Work on a local copy of the in-flight limit so that it can be adjusted
// for this call only. With any batch option enabled, an unforced send with
// a limit of 0 uses a limit of 1; a negative limit is replaced by 0.
// NOTE(review): a closing brace between these branches is not visible, so
// which `if` the `else if` pairs with cannot be confirmed from this view.
273 auto max_in_flight_appends
= m_max_in_flight_appends
;
274 if (m_flush_interval
> 0 || m_flush_bytes
> 0 || m_flush_age
> 0) {
275 if (!force
&& max_in_flight_appends
== 0) {
276 ldout(m_cct
, 20) << "attempting to batch AIO appends" << dendl
;
277 max_in_flight_appends
= 1;
279 } else if (max_in_flight_appends
< 0) {
280 max_in_flight_appends
= 0;
// A limit of 0 disables this check; a forced send also skips it.
283 if (!force
&& max_in_flight_appends
!= 0 &&
284 static_cast<int32_t>(m_in_flight_tids
.size()) >= max_in_flight_appends
) {
285 ldout(m_cct
, 10) << "max in flight appends reached" << dendl
;
// Build the write operation, guarded by the object's soft maximum size.
289 librados::ObjectWriteOperation op
;
290 client::guard_append(&op
, m_soft_max_size
);
292 size_t append_bytes
= 0;
293 AppendBuffers append_buffers
;
// The iterator is advanced only through erase() in the loop body.
294 for (auto it
= m_pending_buffers
.begin(); it
!= m_pending_buffers
.end(); ) {
295 auto& future
= it
->first
;
296 auto& bl
= it
->second
;
// Projected object size if this buffer were included in the operation.
297 auto size
= m_object_bytes
+ m_in_flight_bytes
+ append_bytes
+ bl
.length();
298 if (size
== m_soft_max_size
) {
299 ldout(m_cct
, 10) << "object at capacity " << *future
<< dendl
;
301 } else if (size
> m_soft_max_size
) {
302 ldout(m_cct
, 10) << "object beyond capacity " << *future
<< dendl
;
// True only for a forced send that names this entry's future as its target.
307 bool flush_break
= (force
&& flush_future
&& flush_future
== future
);
308 ldout(m_cct
, 20) << "flushing " << *future
<< dendl
;
309 future
->set_flush_in_progress();
312 op
.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
// Move the entry from the pending list to this operation's buffer list.
314 append_bytes
+= bl
.length();
315 append_buffers
.push_back(*it
);
316 it
= m_pending_buffers
.erase(it
);
319 ldout(m_cct
, 20) << "stopping at requested flush future" << dendl
;
// Submit only if at least one byte was gathered.
324 if (append_bytes
> 0) {
325 m_last_flush_time
= ceph_clock_now();
// Allocate the tid and record the operation as in flight.
327 uint64_t append_tid
= m_append_tid
++;
328 m_in_flight_tids
.insert(append_tid
);
329 m_in_flight_appends
[append_tid
].swap(append_buffers
);
330 m_in_flight_bytes
+= append_bytes
;
// The gathered bytes are no longer pending.
332 ceph_assert(m_pending_bytes
>= append_bytes
);
333 m_pending_bytes
-= append_bytes
;
// C_AppendFlush carries this recorder and the tid to the completion
// callback.
335 auto rados_completion
= librados::Rados::aio_create_completion(
336 new C_AppendFlush(this, append_tid
), nullptr, utils::rados_ctx_callback
);
// NOTE(review): no check of `r` is visible in this view (the embedded
// listing numbers skip from 337 to 339) -- confirm that it is asserted.
337 int r
= m_ioctx
.aio_operate(m_oid
, rados_completion
, &op
);
339 rados_completion
->release();
340 ldout(m_cct
, 20) << "flushing journal tid=" << append_tid
<< ", "
341 << "append_bytes=" << append_bytes
<< ", "
342 << "in_flight_bytes=" << m_in_flight_bytes
<< ", "
343 << "pending_bytes=" << m_pending_bytes
<< dendl
;
// Notifies the handler that this object is finished.
//
// The caller must hold m_lock on entry (asserted). m_handler->closed(this)
// is invoked when the object has been closed; m_handler->overflow(this)
// covers the other case.
//
// NOTE(review): no lock release is visible in this view (the embedded
// listing numbers skip); the function name suggests m_lock is released
// before the handler is called -- confirm against the full source.
349 void ObjectRecorder::notify_handler_unlock() {
350 ceph_assert(m_lock
->is_locked());
351 if (m_object_closed
) {
353 m_handler
->closed(this);
355 // TODO need to delay completion until after aio_notify completes
357 m_handler
->overflow(this);
361 } // namespace journal