1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/ceph_assert.h"
8 #include "common/Timer.h"
9 #include "common/errno.h"
10 #include "cls/journal/cls_journal_client.h"
12 #define dout_subsys ceph_subsys_journaler
14 #define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
15 << __func__ << " (" << m_oid << "): "
17 using namespace cls::journal
;
18 using std::shared_ptr
;
// Construct a recorder for a single journal data object.  Buffered appends
// are batched and written to the RADOS object named `oid`; `handler` is
// notified of overflow/close events and `lock` guards all mutable state.
// NOTE(review): extraction appears to have dropped several lines in this
// block (the tail of the initializer list and the constructor's opening
// brace among them) -- confirm against the upstream file.
22 ObjectRecorder::ObjectRecorder(librados::IoCtx
&ioctx
, std::string_view oid
,
23 uint64_t object_number
, ceph::mutex
* lock
,
24 ContextWQ
*work_queue
, Handler
*handler
,
25 uint8_t order
, int32_t max_in_flight_appends
)
26 : m_oid(oid
), m_object_number(object_number
),
27 m_op_work_queue(work_queue
), m_handler(handler
),
// the object's soft size cap is 2^order bytes
28 m_order(order
), m_soft_max_size(1 << m_order
),
29 m_max_in_flight_appends(max_in_flight_appends
),
// cache the CephContext of m_ioctx for logging; m_ioctx is presumably
// dup()'d from `ioctx` in a line dropped by extraction -- verify upstream
33 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
34 ceph_assert(m_handler
!= NULL
);
// query the cluster's minimum-compatible OSD release to decide whether
// the newer append path may be used
36 librados::Rados
rados(m_ioctx
);
37 int8_t require_osd_release
= 0;
38 int r
= rados
.get_min_compatible_osd(&require_osd_release
);
// NOTE(review): this error log presumably sits inside an `if (r < 0)`
// guard dropped by extraction -- confirm upstream
40 ldout(m_cct
, 0) << "failed to retrieve min OSD release: "
41 << cpp_strerror(r
) << dendl
;
// pre-Octopus clusters require the legacy (compat) append behavior
43 m_compat_mode
= require_osd_release
< CEPH_RELEASE_OCTOPUS
;
45 ldout(m_cct
, 20) << dendl
;
48 ObjectRecorder::~ObjectRecorder() {
49 ldout(m_cct
, 20) << dendl
;
50 ceph_assert(m_pending_buffers
.empty());
51 ceph_assert(m_in_flight_tids
.empty());
52 ceph_assert(m_in_flight_appends
.empty());
55 void ObjectRecorder::set_append_batch_options(int flush_interval
,
58 ldout(m_cct
, 5) << "flush_interval=" << flush_interval
<< ", "
59 << "flush_bytes=" << flush_bytes
<< ", "
60 << "flush_age=" << flush_age
<< dendl
;
62 ceph_assert(ceph_mutex_is_locked(*m_lock
));
63 m_flush_interval
= flush_interval
;
64 m_flush_bytes
= flush_bytes
;
65 m_flush_age
= flush_age
;
68 bool ObjectRecorder::append(AppendBuffers
&&append_buffers
) {
69 ldout(m_cct
, 20) << "count=" << append_buffers
.size() << dendl
;
71 ceph_assert(ceph_mutex_is_locked(*m_lock
));
73 ceph::ref_t
<FutureImpl
> last_flushed_future
;
74 auto flush_handler
= get_flush_handler();
75 for (auto& append_buffer
: append_buffers
) {
76 ldout(m_cct
, 20) << *append_buffer
.first
<< ", "
77 << "size=" << append_buffer
.second
.length() << dendl
;
78 bool flush_requested
= append_buffer
.first
->attach(flush_handler
);
79 if (flush_requested
) {
80 last_flushed_future
= append_buffer
.first
;
83 m_pending_buffers
.push_back(append_buffer
);
84 m_pending_bytes
+= append_buffer
.second
.length();
87 return send_appends(!!last_flushed_future
, last_flushed_future
);
// Attach a flush request to the most recent append (pending or already in
// flight) and complete `on_safe` once it is persisted; completes
// immediately when nothing is buffered.
// NOTE(review): extraction appears to have dropped lines in this block
// (the `Future future;` declaration, a scope block bounding the lock, and
// the `} else {` before the immediate completion) -- confirm upstream.
90 void ObjectRecorder::flush(Context
*on_safe
) {
91 ldout(m_cct
, 20) << dendl
;
95 std::unique_lock locker
{*m_lock
};
97 // if currently handling flush notifications, wait so that
98 // we notify in the correct order (since lock is dropped on each notification)
100 while (m_in_flight_callbacks
) {
101 m_in_flight_callbacks_cond
.wait(locker
);
104 // attach the flush to the most recent append
105 if (!m_pending_buffers
.empty()) {
// newest pending (not-yet-sent) buffer wins
106 future
= Future(m_pending_buffers
.rbegin()->first
);
107 } else if (!m_in_flight_appends
.empty()) {
// otherwise use the newest buffer already sent to RADOS
108 AppendBuffers
&append_buffers
= m_in_flight_appends
.rbegin()->second
;
109 ceph_assert(!append_buffers
.empty());
110 future
= Future(append_buffers
.rbegin()->first
);
114 if (future
.is_valid()) {
115 // cannot be invoked while the same lock context
116 m_op_work_queue
->queue(new LambdaContext(
117 [future
, on_safe
] (int r
) mutable {
118 future
.flush(on_safe
);
// nothing buffered -- the object is already safe
121 on_safe
->complete(0);
// Force the flush of the given append future if this recorder owns it;
// futures owned by another recorder are re-issued to their owner.
// NOTE(review): extraction appears to have dropped the bodies of the
// first two branches (the re-issued flush call and an early return) --
// confirm against upstream.
125 void ObjectRecorder::flush(const ceph::ref_t
<FutureImpl
>& future
) {
126 ldout(m_cct
, 20) << "flushing " << *future
<< dendl
;
128 std::unique_lock locker
{*m_lock
};
129 auto flush_handler
= future
->get_flush_handler();
130 auto my_handler
= get_flush_handler();
131 if (flush_handler
!= my_handler
) {
132 // if we don't own this future, re-issue the flush so that it hits the
133 // correct journal object owner
136 } else if (future
->is_flush_in_progress()) {
// force out the pending batch up to (and including) this future; if an
// append was issued, deliver notifications with the lock dropped
140 if (!m_object_closed
&& !m_overflowed
&& send_appends(true, future
)) {
141 m_in_flight_callbacks
= true;
142 notify_handler_unlock(locker
, true);
146 void ObjectRecorder::claim_append_buffers(AppendBuffers
*append_buffers
) {
147 ldout(m_cct
, 20) << dendl
;
149 ceph_assert(ceph_mutex_is_locked(*m_lock
));
150 ceph_assert(m_in_flight_tids
.empty());
151 ceph_assert(m_in_flight_appends
.empty());
152 ceph_assert(m_object_closed
|| m_overflowed
);
154 for (auto& append_buffer
: m_pending_buffers
) {
155 ldout(m_cct
, 20) << "detached " << *append_buffer
.first
<< dendl
;
156 append_buffer
.first
->detach();
158 append_buffers
->splice(append_buffers
->end(), m_pending_buffers
,
159 m_pending_buffers
.begin(), m_pending_buffers
.end());
162 bool ObjectRecorder::close() {
163 ceph_assert(ceph_mutex_is_locked(*m_lock
));
165 ldout(m_cct
, 20) << dendl
;
167 send_appends(true, {});
169 ceph_assert(!m_object_closed
);
170 m_object_closed
= true;
172 if (!m_in_flight_tids
.empty() || m_in_flight_callbacks
) {
173 m_object_closed_notify
= true;
// Completion callback (via C_AppendFlush) for the AIO append `tid` with
// result `r`.  Retires the tid, moves the flushed bytes from in-flight to
// object accounting, marks the futures safe, and attempts further appends.
// NOTE(review): extraction appears to have dropped lines in this block
// (the interior of the EOVERFLOW branch -- presumably setting m_overflowed
// and calling append_overflowed() -- plus several closing braces);
// confirm against upstream.
180 void ObjectRecorder::handle_append_flushed(uint64_t tid
, int r
) {
181 ldout(m_cct
, 20) << "tid=" << tid
<< ", r=" << r
<< dendl
;
// hold the callbacks flag so concurrent flush() calls wait for ordering
183 std::unique_lock locker
{*m_lock
};
184 m_in_flight_callbacks
= true;
// retire the tid from the in-flight set
186 auto tid_iter
= m_in_flight_tids
.find(tid
);
187 ceph_assert(tid_iter
!= m_in_flight_tids
.end());
188 m_in_flight_tids
.erase(tid_iter
);
190 InFlightAppends::iterator iter
= m_in_flight_appends
.find(tid
);
191 ceph_assert(iter
!= m_in_flight_appends
.end());
193 bool notify_overflowed
= false;
194 AppendBuffers append_buffers
;
// the guarded append failed because the object reached its size cap
195 if (r
== -EOVERFLOW
) {
196 ldout(m_cct
, 10) << "append overflowed: "
197 << "idle=" << m_in_flight_tids
.empty() << ", "
198 << "previous_overflow=" << m_overflowed
<< dendl
;
199 if (m_in_flight_tids
.empty()) {
203 if (!m_object_closed
&& !m_overflowed
) {
204 notify_overflowed
= true;
// take ownership of the buffers belonging to this completed append
208 append_buffers
.swap(iter
->second
);
209 ceph_assert(!append_buffers
.empty());
// account the flushed bytes: they move from in-flight to object bytes
211 for (auto& append_buffer
: append_buffers
) {
212 auto length
= append_buffer
.second
.length();
213 m_object_bytes
+= length
;
215 ceph_assert(m_in_flight_bytes
>= length
);
216 m_in_flight_bytes
-= length
;
218 ldout(m_cct
, 20) << "object_bytes=" << m_object_bytes
<< dendl
;
220 m_in_flight_appends
.erase(iter
);
224 // Flag the associated futures as complete.
225 for (auto& append_buffer
: append_buffers
) {
226 ldout(m_cct
, 20) << *append_buffer
.first
<< " marked safe" << dendl
;
227 append_buffer
.first
->safe(r
);
230 // attempt to kick off more appends to the object
232 if (!m_object_closed
&& !m_overflowed
&& send_appends(false, {})) {
233 notify_overflowed
= true;
236 ldout(m_cct
, 20) << "pending tids=" << m_in_flight_tids
<< dendl
;
238 // all remaining unsent appends should be redirected to new object
239 notify_handler_unlock(locker
, notify_overflowed
);
242 void ObjectRecorder::append_overflowed() {
243 ldout(m_cct
, 10) << dendl
;
245 ceph_assert(ceph_mutex_is_locked(*m_lock
));
246 ceph_assert(!m_in_flight_appends
.empty());
248 InFlightAppends in_flight_appends
;
249 in_flight_appends
.swap(m_in_flight_appends
);
251 AppendBuffers restart_append_buffers
;
252 for (InFlightAppends::iterator it
= in_flight_appends
.begin();
253 it
!= in_flight_appends
.end(); ++it
) {
254 restart_append_buffers
.insert(restart_append_buffers
.end(),
255 it
->second
.begin(), it
->second
.end());
258 restart_append_buffers
.splice(restart_append_buffers
.end(),
260 m_pending_buffers
.begin(),
261 m_pending_buffers
.end());
262 restart_append_buffers
.swap(m_pending_buffers
);
// Batch pending buffers into a single guarded RADOS append.  `force`
// bypasses the batching heuristics; `flush_future`, when set, marks the
// last future that must be included in this batch.  Returns whether the
// caller must deliver an overflow notification.
// NOTE(review): extraction appears to have dropped many lines here (the
// early returns, the head of the batching `if`, overflow handling and
// `break`s inside the loop, the compat-mode append path, and the final
// return) -- confirm against upstream before relying on this text.
265 bool ObjectRecorder::send_appends(bool force
, ceph::ref_t
<FutureImpl
> flush_future
) {
266 ldout(m_cct
, 20) << dendl
;
268 ceph_assert(ceph_mutex_is_locked(*m_lock
));
269 if (m_object_closed
|| m_overflowed
) {
270 ldout(m_cct
, 20) << "already closed or overflowed" << dendl
;
274 if (m_pending_buffers
.empty()) {
275 ldout(m_cct
, 20) << "append buffers empty" << dendl
;
// batching heuristics: flush when any configured count, byte, or age
// threshold has been reached
280 ((m_flush_interval
> 0 && m_pending_buffers
.size() >= m_flush_interval
) ||
281 (m_flush_bytes
> 0 && m_pending_bytes
>= m_flush_bytes
) ||
282 (m_flush_age
> 0 && !m_last_flush_time
.is_zero() &&
283 m_last_flush_time
+ m_flush_age
<= ceph_clock_now()))) {
284 ldout(m_cct
, 20) << "forcing batch flush" << dendl
;
288 // start tracking flush time after the first append event
289 if (m_last_flush_time
.is_zero()) {
290 m_last_flush_time
= ceph_clock_now();
// normalize the in-flight limit: 0 with batching enabled means one at a
// time; negative means unlimited
293 auto max_in_flight_appends
= m_max_in_flight_appends
;
294 if (m_flush_interval
> 0 || m_flush_bytes
> 0 || m_flush_age
> 0) {
295 if (!force
&& max_in_flight_appends
== 0) {
296 ldout(m_cct
, 20) << "attempting to batch AIO appends" << dendl
;
297 max_in_flight_appends
= 1;
299 } else if (max_in_flight_appends
< 0) {
300 max_in_flight_appends
= 0;
// too many appends already in flight -- wait for completions first
303 if (!force
&& max_in_flight_appends
!= 0 &&
304 static_cast<int32_t>(m_in_flight_tids
.size()) >= max_in_flight_appends
) {
305 ldout(m_cct
, 10) << "max in flight appends reached" << dendl
;
// guard_append makes the OSD fail the op with -EOVERFLOW once the object
// has grown past the soft cap (handled in handle_append_flushed)
309 librados::ObjectWriteOperation op
;
311 client::guard_append(&op
, m_soft_max_size
);
// accumulate pending buffers into one bufferlist for this batch
314 size_t append_bytes
= 0;
315 AppendBuffers append_buffers
;
316 bufferlist append_bl
;
317 for (auto it
= m_pending_buffers
.begin(); it
!= m_pending_buffers
.end(); ) {
318 auto& future
= it
->first
;
319 auto& bl
= it
->second
;
// projected object size if this buffer is included in the batch
320 auto size
= m_object_bytes
+ m_in_flight_bytes
+ append_bytes
+ bl
.length();
321 if (size
== m_soft_max_size
) {
322 ldout(m_cct
, 10) << "object at capacity (" << size
<< ") " << *future
<< dendl
;
324 } else if (size
> m_soft_max_size
) {
325 ldout(m_cct
, 10) << "object beyond capacity (" << size
<< ") " << *future
<< dendl
;
// stop batching after the explicitly requested future is queued
330 bool flush_break
= (force
&& flush_future
&& flush_future
== future
);
331 ldout(m_cct
, 20) << "flushing " << *future
<< dendl
;
332 future
->set_flush_in_progress();
336 op
.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED
);
338 append_bl
.append(bl
);
// move the buffer from the pending list into this batch
341 append_bytes
+= bl
.length();
342 append_buffers
.push_back(*it
);
343 it
= m_pending_buffers
.erase(it
);
346 ldout(m_cct
, 20) << "stopping at requested flush future" << dendl
;
// issue the AIO append if anything was batched
351 if (append_bytes
> 0) {
352 m_last_flush_time
= ceph_clock_now();
354 uint64_t append_tid
= m_append_tid
++;
355 m_in_flight_tids
.insert(append_tid
);
356 m_in_flight_appends
[append_tid
].swap(append_buffers
);
357 m_in_flight_bytes
+= append_bytes
;
359 ceph_assert(m_pending_bytes
>= append_bytes
);
360 m_pending_bytes
-= append_bytes
;
362 if (!m_compat_mode
) {
363 client::append(&op
, m_soft_max_size
, append_bl
);
// completion fires handle_append_flushed() via rados_ctx_callback
366 auto rados_completion
= librados::Rados::aio_create_completion(
367 new C_AppendFlush(this, append_tid
), utils::rados_ctx_callback
);
368 int r
= m_ioctx
.aio_operate(m_oid
, rados_completion
, &op
);
370 rados_completion
->release();
371 ldout(m_cct
, 20) << "flushing journal tid=" << append_tid
<< ", "
372 << "append_bytes=" << append_bytes
<< ", "
373 << "in_flight_bytes=" << m_in_flight_bytes
<< ", "
374 << "pending_bytes=" << m_pending_bytes
<< dendl
;
380 void ObjectRecorder::wake_up_flushes() {
381 ceph_assert(ceph_mutex_is_locked(*m_lock
));
382 m_in_flight_callbacks
= false;
383 m_in_flight_callbacks_cond
.notify_all();
// Deliver overflow and/or close notifications to the handler, with the
// recorder lock released by the time the handler callbacks run.
// NOTE(review): extraction appears to have dropped lines in this block
// (locker.unlock()/lock() around the overflow callback, the
// wake_up_flushes() call, and the final unlock before the close
// notification) -- confirm against upstream.
386 void ObjectRecorder::notify_handler_unlock(
387 std::unique_lock
<ceph::mutex
>& locker
, bool notify_overflowed
) {
388 ceph_assert(ceph_mutex_is_locked(*m_lock
));
389 ceph_assert(m_in_flight_callbacks
);
391 if (!m_object_closed
&& notify_overflowed
) {
392 // TODO need to delay completion until after aio_notify completes
393 ldout(m_cct
, 10) << "overflow" << dendl
;
394 ceph_assert(m_overflowed
);
397 m_handler
->overflow(this);
401 // wake up blocked flush requests
404 // An overflow notification might have blocked a close. A close
405 // notification could lead to the immediate destruction of this object
406 // so the object shouldn't be referenced anymore
407 bool object_closed_notify
= false;
// only report the close once nothing is left in flight
408 if (m_in_flight_tids
.empty()) {
409 std::swap(object_closed_notify
, m_object_closed_notify
);
411 ceph_assert(m_object_closed
|| !object_closed_notify
);
414 if (object_closed_notify
) {
415 ldout(m_cct
, 10) << "closed" << dendl
;
416 m_handler
->closed(this);
420 } // namespace journal