1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
4 #include "journal/JournalRecorder.h"
5 #include "common/errno.h"
6 #include "journal/Entry.h"
7 #include "journal/Utils.h"
11 #define dout_subsys ceph_subsys_journaler
13 #define dout_prefix *_dout << "JournalRecorder: " << this << " "
15 using std::shared_ptr
;
// Shared completion context for a batch of flush requests.  Every call to
// complete() decrements pending_flushes; the call that brings the counter to
// zero queues on_finish on the journal metadata together with ret_val.
21 struct C_Flush
: public Context
{
22 JournalMetadataPtr journal_metadata
;
// number of complete() calls still outstanding before on_finish is queued
24 std::atomic
<int64_t> pending_flushes
= { 0 };
// _pending_flushes seeds the countdown; ret_val is initialised to 0
27 C_Flush(JournalMetadataPtr _journal_metadata
, Context
*_on_finish
,
28 size_t _pending_flushes
)
29 : journal_metadata(_journal_metadata
), on_finish(_on_finish
),
30 pending_flushes(_pending_flushes
), ret_val(0) {
// Invoked once per flushed participant with that participant's result r.
33 void complete(int r
) override
{
// a negative r is only of interest while ret_val is still 0
// NOTE(review): presumably this latches the first error into ret_val -- confirm
34 if (r
< 0 && ret_val
== 0) {
// the final outstanding completion hands the result to on_finish
37 if (--pending_flushes
== 0) {
38 // ensure all prior callbacks have been flushed as well
39 journal_metadata
->queue(on_finish
, ret_val
);
// Required Context override; completion handling is done in complete().
43 void finish(int r
) override
{
47 } // anonymous namespace
// Constructs the recorder: stores the flush/append tuning parameters, takes
// the current object set from the journal metadata's active set and, while
// holding m_lock, creates one mutex plus one object recorder per splay
// offset.  Finally registers m_listener with the journal metadata.
49 JournalRecorder::JournalRecorder(librados::IoCtx
&ioctx
,
50 const std::string
&object_oid_prefix
,
51 const JournalMetadataPtr
& journal_metadata
,
52 uint32_t flush_interval
, uint64_t flush_bytes
,
54 uint64_t max_in_flight_appends
)
55 : m_cct(NULL
), m_object_oid_prefix(object_oid_prefix
),
56 m_journal_metadata(journal_metadata
), m_flush_interval(flush_interval
),
57 m_flush_bytes(flush_bytes
), m_flush_age(flush_age
),
58 m_max_in_flight_appends(max_in_flight_appends
), m_listener(this),
59 m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
60 m_current_set(m_journal_metadata
->get_active_set()) {
62 Mutex::Locker
locker(m_lock
);
// m_cct starts out as NULL in the initialiser list and is resolved here
64 m_cct
= reinterpret_cast<CephContext
*>(m_ioctx
.cct());
66 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
// one lock and one recorder per splay offset of the current object set
67 for (uint8_t splay_offset
= 0; splay_offset
< splay_width
; ++splay_offset
) {
68 m_object_locks
.push_back(shared_ptr
<Mutex
>(
69 new Mutex("ObjectRecorder::m_lock::"+
70 std::to_string(splay_offset
))));
// object number = position within the set + first object number of the set
71 uint64_t object_number
= splay_offset
+ (m_current_set
* splay_width
);
72 m_object_ptrs
[splay_offset
] = create_object_recorder(
74 m_object_locks
[splay_offset
]);
// start receiving metadata notifications through m_listener
77 m_journal_metadata
->add_listener(&m_listener
);
// Unregisters m_listener from the journal metadata and then, under m_lock,
// asserts that no advance-set or object-close operation is still in flight.
80 JournalRecorder::~JournalRecorder() {
81 m_journal_metadata
->remove_listener(&m_listener
);
83 Mutex::Locker
locker(m_lock
);
84 ceph_assert(m_in_flight_advance_sets
== 0);
85 ceph_assert(m_in_flight_object_closes
== 0);
// Appends one journal entry carrying payload_bl under tag tag_tid.
// Allocates the entry and commit tids from the journal metadata, chains a new
// FutureImpl after m_prev_future, encodes the entry and hands it to the object
// recorder selected by (entry_tid % splay_width).
// Returns a Future wrapping the newly created FutureImpl.
88 Future
JournalRecorder::append(uint64_t tag_tid
,
89 const bufferlist
&payload_bl
) {
93 uint64_t entry_tid
= m_journal_metadata
->allocate_entry_tid(tag_tid
);
94 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
// entries are distributed over the recorders by entry tid
95 uint8_t splay_offset
= entry_tid
% splay_width
;
97 ObjectRecorderPtr object_ptr
= get_object(splay_offset
);
98 uint64_t commit_tid
= m_journal_metadata
->allocate_commit_tid(
99 object_ptr
->get_object_number(), tag_tid
, entry_tid
);
100 FutureImplPtr
future(new FutureImpl(tag_tid
, entry_tid
, commit_tid
));
// link the new future to the previously appended one, then remember it
101 future
->init(m_prev_future
);
102 m_prev_future
= future
;
// take the per-offset lock before appending to the recorder
104 m_object_locks
[splay_offset
]->Lock();
108 encode(Entry(future
->get_tag_tid(), future
->get_entry_tid(), payload_bl
),
// an encoded entry must never be larger than a journal object
110 ceph_assert(entry_bl
.length() <= m_journal_metadata
->get_object_size());
112 bool object_full
= object_ptr
->append_unlock({{future
, entry_bl
}});
// NOTE(review): the statements below presumably run only when object_full
// is true -- confirm
114 ldout(m_cct
, 10) << "object " << object_ptr
->get_oid() << " now full"
116 Mutex::Locker
l(m_lock
);
// object number / splay width identifies the object set to close
117 close_and_advance_object_set(object_ptr
->get_object_number() / splay_width
);
119 return Future(future
);
// Requests a flush of every object recorder.  A single C_Flush context is
// shared by all recorders and is created with a pending count of
// m_object_ptrs.size() + 1; on_safe is passed to it as the completion to run.
122 void JournalRecorder::flush(Context
*on_safe
) {
125 Mutex::Locker
locker(m_lock
);
// NOTE(review): the extra "+ 1" presumably accounts for a completion issued
// by this function itself after the loop -- confirm
127 ctx
= new C_Flush(m_journal_metadata
, on_safe
, m_object_ptrs
.size() + 1);
128 for (ObjectRecorderPtrs::iterator it
= m_object_ptrs
.begin();
129 it
!= m_object_ptrs
.end(); ++it
) {
130 it
->second
->flush(ctx
);
135 // avoid holding the lock in case there is nothing to flush
// Returns the object recorder stored in m_object_ptrs for splay_offset.
// Asserts that m_lock is held and that the stored recorder is not NULL.
139 ObjectRecorderPtr
JournalRecorder::get_object(uint8_t splay_offset
) {
140 ceph_assert(m_lock
.is_locked());
142 ObjectRecorderPtr object_recoder
= m_object_ptrs
[splay_offset
];
143 ceph_assert(object_recoder
!= NULL
);
144 return object_recoder
;
// Begins closing object set object_set and advancing to the next one.
// Asserts that m_lock is held.  A request for a set other than m_current_set
// is logged as "close already in-progress".  Otherwise the in-flight advance
// counter is incremented, the current set is closed and, if close_object_set()
// reports that nothing is pending, advance_object_set() is called directly.
147 void JournalRecorder::close_and_advance_object_set(uint64_t object_set
) {
148 ceph_assert(m_lock
.is_locked());
150 // entry overflow from open object
151 if (m_current_set
!= object_set
) {
152 ldout(m_cct
, 20) << __func__
<< ": close already in-progress" << dendl
;
156 // we shouldn't overflow upon append if already closed and we
157 // shouldn't receive an overflowed callback if already closed
158 ceph_assert(m_in_flight_advance_sets
== 0);
159 ceph_assert(m_in_flight_object_closes
== 0);
// the locally tracked set must agree with the metadata's active set
161 uint64_t active_set
= m_journal_metadata
->get_active_set();
162 ceph_assert(m_current_set
== active_set
);
164 ++m_in_flight_advance_sets
;
166 ldout(m_cct
, 20) << __func__
<< ": closing active object set "
167 << object_set
<< dendl
;
// true means every recorder closed synchronously, so advance right away
168 if (close_object_set(m_current_set
)) {
169 advance_object_set();
// Asks the journal metadata to record m_current_set as the active set, with a
// C_AdvanceObjectSet context as the completion.  Asserts that m_lock is held
// and that no object closes are in flight.
173 void JournalRecorder::advance_object_set() {
174 ceph_assert(m_lock
.is_locked());
176 ceph_assert(m_in_flight_object_closes
== 0);
177 ldout(m_cct
, 20) << __func__
<< ": advance to object set " << m_current_set
179 m_journal_metadata
->set_active_set(m_current_set
, new C_AdvanceObjectSet(
// Handles the result r of an advance-object-set request.  Takes m_lock,
// decrements the in-flight advance counter and logs an error for any negative
// r other than -ESTALE.
183 void JournalRecorder::handle_advance_object_set(int r
) {
184 Mutex::Locker
locker(m_lock
);
185 ldout(m_cct
, 20) << __func__
<< ": r=" << r
<< dendl
;
187 ceph_assert(m_in_flight_advance_sets
> 0);
188 --m_in_flight_advance_sets
;
// -ESTALE is deliberately excluded from the error report
190 if (r
< 0 && r
!= -ESTALE
) {
191 lderr(m_cct
) << __func__
<< ": failed to advance object set: "
192 << cpp_strerror(r
) << dendl
;
// continue only once neither advances nor object closes are outstanding
// NOTE(review): presumably this is where the next object set is opened -- confirm
195 if (m_in_flight_advance_sets
== 0 && m_in_flight_object_closes
== 0) {
// Switches the object recorders over to object set m_current_set.
// Asserts that m_lock is held, locks all object recorders and walks
// m_object_ptrs; a recorder whose object number lies outside m_current_set
// must already be closed and is replaced through
// create_next_object_recorder_unlock().
200 void JournalRecorder::open_object_set() {
201 ceph_assert(m_lock
.is_locked());
203 ldout(m_cct
, 10) << __func__
<< ": opening object set " << m_current_set
206 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
208 lock_object_recorders();
209 for (ObjectRecorderPtrs::iterator it
= m_object_ptrs
.begin();
210 it
!= m_object_ptrs
.end(); ++it
) {
211 ObjectRecorderPtr object_recorder
= it
->second
;
212 uint64_t object_number
= object_recorder
->get_object_number();
// object number / splay width gives the set the recorder belongs to
213 if (object_number
/ splay_width
!= m_current_set
) {
214 ceph_assert(object_recorder
->is_closed());
216 // ready to close object and open object in active set
217 create_next_object_recorder_unlock(object_recorder
);
// NOTE(review): the unlock below presumably applies only to recorders that
// were not replaced above -- confirm
219 uint8_t splay_offset
= object_number
% splay_width
;
220 m_object_locks
[splay_offset
]->Unlock();
// Closes every object recorder whose object does not belong to active_set.
// Asserts that m_lock is held; all object recorders are locked for the scan
// and unlocked again before returning.
// Returns true when m_in_flight_object_closes is 0 afterwards.
225 bool JournalRecorder::close_object_set(uint64_t active_set
) {
226 ceph_assert(m_lock
.is_locked());
228 // object recorders will invoke overflow handler as they complete
229 // closing the object to ensure correct order of future appends
230 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
231 lock_object_recorders();
232 for (ObjectRecorderPtrs::iterator it
= m_object_ptrs
.begin();
233 it
!= m_object_ptrs
.end(); ++it
) {
234 ObjectRecorderPtr object_recorder
= it
->second
;
235 if (object_recorder
->get_object_number() / splay_width
!= active_set
) {
236 ldout(m_cct
, 10) << __func__
<< ": closing object "
237 << object_recorder
->get_oid() << dendl
;
238 // flush out all queued appends and hold future appends
// a false return from close() is counted as an in-flight close
239 if (!object_recorder
->close()) {
240 ++m_in_flight_object_closes
;
// NOTE(review): the "closed" message presumably belongs to the branch where
// close() returned true -- confirm
242 ldout(m_cct
, 20) << __func__
<< ": object "
243 << object_recorder
->get_oid() << " closed" << dendl
;
247 unlock_object_recorders();
248 return (m_in_flight_object_closes
== 0);
// Builds and returns an ObjectRecorder for object_number.  The object name is
// derived from m_object_oid_prefix and object_number; the recorder receives
// the supplied lock, the journal metadata's work queue, timer, timer lock and
// order, m_object_handler, and this recorder's flush/in-flight settings.
251 ObjectRecorderPtr
JournalRecorder::create_object_recorder(
252 uint64_t object_number
, shared_ptr
<Mutex
> lock
) {
253 ObjectRecorderPtr
object_recorder(new ObjectRecorder(
254 m_ioctx
, utils::get_object_name(m_object_oid_prefix
, object_number
),
255 object_number
, lock
, m_journal_metadata
->get_work_queue(),
256 m_journal_metadata
->get_timer(), m_journal_metadata
->get_timer_lock(),
257 &m_object_handler
, m_journal_metadata
->get_order(), m_flush_interval
,
258 m_flush_bytes
, m_flush_age
, m_max_in_flight_appends
));
259 return object_recorder
;
// Replaces object_recorder with a new recorder for the same splay offset in
// object set m_current_set.  Append buffers claimed from the old recorder are
// re-pointed at the new object number in the commit records and appended to
// the new recorder, which is then stored in m_object_ptrs.
// Asserts that m_lock and the per-offset object lock are both held.
262 void JournalRecorder::create_next_object_recorder_unlock(
263 ObjectRecorderPtr object_recorder
) {
264 ceph_assert(m_lock
.is_locked());
266 uint64_t object_number
= object_recorder
->get_object_number();
267 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
268 uint8_t splay_offset
= object_number
% splay_width
;
270 ceph_assert(m_object_locks
[splay_offset
]->is_locked());
// the new recorder reuses the old recorder's per-offset lock
272 ObjectRecorderPtr new_object_recorder
= create_object_recorder(
273 (m_current_set
* splay_width
) + splay_offset
, m_object_locks
[splay_offset
]);
275 ldout(m_cct
, 10) << __func__
<< ": "
276 << "old oid=" << object_recorder
->get_oid() << ", "
277 << "new oid=" << new_object_recorder
->get_oid() << dendl
;
// take over the append buffers still held by the old recorder
278 AppendBuffers append_buffers
;
279 object_recorder
->claim_append_buffers(&append_buffers
);
281 // update the commit record to point to the correct object number
282 for (auto &append_buffer
: append_buffers
) {
283 m_journal_metadata
->overflow_commit_tid(
284 append_buffer
.first
->get_commit_tid(),
285 new_object_recorder
->get_object_number());
288 new_object_recorder
->append_unlock(std::move(append_buffers
));
289 m_object_ptrs
[splay_offset
] = new_object_recorder
;
// Reacts to a journal metadata update.  Under m_lock, if the metadata's
// active set is ahead of m_current_set, m_current_set is moved forward to it
// and -- provided no advance or object close is in flight -- the recorders
// outside the new active set are closed through close_object_set().
292 void JournalRecorder::handle_update() {
293 Mutex::Locker
locker(m_lock
);
295 uint64_t active_set
= m_journal_metadata
->get_active_set();
296 if (m_current_set
< active_set
) {
297 // peer journal client advanced the active set
298 ldout(m_cct
, 20) << __func__
<< ": "
299 << "current_set=" << m_current_set
<< ", "
300 << "active_set=" << active_set
<< dendl
;
// keep the previous set number for the log message below
302 uint64_t current_set
= m_current_set
;
303 m_current_set
= active_set
;
304 if (m_in_flight_advance_sets
== 0 && m_in_flight_object_closes
== 0) {
305 ldout(m_cct
, 20) << __func__
<< ": closing current object set "
306 << current_set
<< dendl
;
// NOTE(review): a true result presumably leads to opening the new object
// set immediately -- confirm
307 if (close_object_set(active_set
)) {
// Called when object_recorder has finished closing.  Under m_lock, asserts
// that it is the recorder currently registered for its splay offset and
// decrements m_in_flight_object_closes.  When that counter reaches 0 the next
// step depends on whether an advance is in flight (see the branch comments).
314 void JournalRecorder::handle_closed(ObjectRecorder
*object_recorder
) {
315 ldout(m_cct
, 10) << __func__
<< ": " << object_recorder
->get_oid() << dendl
;
317 Mutex::Locker
locker(m_lock
);
319 uint64_t object_number
= object_recorder
->get_object_number();
320 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
321 uint8_t splay_offset
= object_number
% splay_width
;
322 ObjectRecorderPtr active_object_recorder
= m_object_ptrs
[splay_offset
];
// the notifying recorder must be the one registered for this offset
323 ceph_assert(active_object_recorder
->get_object_number() == object_number
);
325 ceph_assert(m_in_flight_object_closes
> 0);
326 --m_in_flight_object_closes
;
328 // object closed after advance active set committed
329 ldout(m_cct
, 20) << __func__
<< ": object "
330 << active_object_recorder
->get_oid() << " closed" << dendl
;
// act only once the last outstanding close has completed
331 if (m_in_flight_object_closes
== 0) {
332 if (m_in_flight_advance_sets
== 0) {
333 // peer forced closing of object set
336 // local overflow advanced object set
337 advance_object_set();
// Called when object_recorder reports an overflow.  Under m_lock, asserts
// that it is the recorder currently registered for its splay offset and then
// closes/advances the object set that the overflowed object belongs to.
342 void JournalRecorder::handle_overflow(ObjectRecorder
*object_recorder
) {
343 ldout(m_cct
, 10) << __func__
<< ": " << object_recorder
->get_oid() << dendl
;
345 Mutex::Locker
locker(m_lock
);
347 uint64_t object_number
= object_recorder
->get_object_number();
348 uint8_t splay_width
= m_journal_metadata
->get_splay_width();
349 uint8_t splay_offset
= object_number
% splay_width
;
350 ObjectRecorderPtr active_object_recorder
= m_object_ptrs
[splay_offset
];
// the notifying recorder must be the one registered for this offset
351 ceph_assert(active_object_recorder
->get_object_number() == object_number
);
353 ldout(m_cct
, 20) << __func__
<< ": object "
354 << active_object_recorder
->get_oid() << " overflowed"
// object number / splay width identifies the overflowed object's set
356 close_and_advance_object_set(object_number
/ splay_width
);
359 } // namespace journal