]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/JournalRecorder.cc
bump version to 12.0.3-pve3
[ceph.git] / ceph / src / journal / JournalRecorder.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/JournalRecorder.h"
5#include "common/errno.h"
6#include "journal/Entry.h"
7#include "journal/Utils.h"
8
9#define dout_subsys ceph_subsys_journaler
10#undef dout_prefix
11#define dout_prefix *_dout << "JournalRecorder: " << this << " "
12
13using std::shared_ptr;
14
15namespace journal {
16
17namespace {
18
19struct C_Flush : public Context {
20 JournalMetadataPtr journal_metadata;
21 Context *on_finish;
22 atomic_t pending_flushes;
23 int ret_val;
24
25 C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
26 size_t _pending_flushes)
27 : journal_metadata(_journal_metadata), on_finish(_on_finish),
28 pending_flushes(_pending_flushes), ret_val(0) {
29 }
30
31 void complete(int r) override {
32 if (r < 0 && ret_val == 0) {
33 ret_val = r;
34 }
35 if (pending_flushes.dec() == 0) {
36 // ensure all prior callback have been flushed as well
37 journal_metadata->queue(on_finish, ret_val);
38 delete this;
39 }
40 }
41 void finish(int r) override {
42 }
43};
44
45} // anonymous namespace
46
47JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
48 const std::string &object_oid_prefix,
49 const JournalMetadataPtr& journal_metadata,
50 uint32_t flush_interval, uint64_t flush_bytes,
51 double flush_age)
52 : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
53 m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
54 m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
55 m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
56 m_current_set(m_journal_metadata->get_active_set()) {
57
58 Mutex::Locker locker(m_lock);
59 m_ioctx.dup(ioctx);
60 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
61
62 uint8_t splay_width = m_journal_metadata->get_splay_width();
63 for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
64 m_object_locks.push_back(shared_ptr<Mutex>(
65 new Mutex("ObjectRecorder::m_lock::"+
66 std::to_string(splay_offset))));
67 uint64_t object_number = splay_offset + (m_current_set * splay_width);
68 m_object_ptrs[splay_offset] = create_object_recorder(
69 object_number,
70 m_object_locks[splay_offset]);
71 }
72
73 m_journal_metadata->add_listener(&m_listener);
74}
75
76JournalRecorder::~JournalRecorder() {
77 m_journal_metadata->remove_listener(&m_listener);
78
79 Mutex::Locker locker(m_lock);
80 assert(m_in_flight_advance_sets == 0);
81 assert(m_in_flight_object_closes == 0);
82}
83
84Future JournalRecorder::append(uint64_t tag_tid,
85 const bufferlist &payload_bl) {
86
87 m_lock.Lock();
88
89 uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
90 uint8_t splay_width = m_journal_metadata->get_splay_width();
91 uint8_t splay_offset = entry_tid % splay_width;
92
93 ObjectRecorderPtr object_ptr = get_object(splay_offset);
94 uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
95 object_ptr->get_object_number(), tag_tid, entry_tid);
96 FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
97 future->init(m_prev_future);
98 m_prev_future = future;
99
100 m_object_locks[splay_offset]->Lock();
101 m_lock.Unlock();
102
103 bufferlist entry_bl;
104 ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
105 entry_bl);
106 assert(entry_bl.length() <= m_journal_metadata->get_object_size());
107
108 bool object_full = object_ptr->append_unlock({{future, entry_bl}});
109 if (object_full) {
110 ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
111 << dendl;
112 Mutex::Locker l(m_lock);
113 close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
114 }
115 return Future(future);
116}
117
118void JournalRecorder::flush(Context *on_safe) {
119 C_Flush *ctx;
120 {
121 Mutex::Locker locker(m_lock);
122
123 ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
124 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
125 it != m_object_ptrs.end(); ++it) {
126 it->second->flush(ctx);
127 }
128
129 }
130
131 // avoid holding the lock in case there is nothing to flush
132 ctx->complete(0);
133}
134
135ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
136 assert(m_lock.is_locked());
137
138 ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
139 assert(object_recoder != NULL);
140 return object_recoder;
141}
142
143void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
144 assert(m_lock.is_locked());
145
146 // entry overflow from open object
147 if (m_current_set != object_set) {
148 ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
149 return;
150 }
151
152 // we shouldn't overflow upon append if already closed and we
153 // shouldn't receive an overflowed callback if already closed
154 assert(m_in_flight_advance_sets == 0);
155 assert(m_in_flight_object_closes == 0);
156
157 uint64_t active_set = m_journal_metadata->get_active_set();
158 assert(m_current_set == active_set);
159 ++m_current_set;
160 ++m_in_flight_advance_sets;
161
162 ldout(m_cct, 20) << __func__ << ": closing active object set "
163 << object_set << dendl;
164 if (close_object_set(m_current_set)) {
165 advance_object_set();
166 }
167}
168
169void JournalRecorder::advance_object_set() {
170 assert(m_lock.is_locked());
171
172 assert(m_in_flight_object_closes == 0);
173 ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
174 << dendl;
175 m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
176 this));
177}
178
179void JournalRecorder::handle_advance_object_set(int r) {
180 Mutex::Locker locker(m_lock);
181 ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
182
183 assert(m_in_flight_advance_sets > 0);
184 --m_in_flight_advance_sets;
185
186 if (r < 0 && r != -ESTALE) {
187 lderr(m_cct) << __func__ << ": failed to advance object set: "
188 << cpp_strerror(r) << dendl;
189 }
190
191 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
192 open_object_set();
193 }
194}
195
196void JournalRecorder::open_object_set() {
197 assert(m_lock.is_locked());
198
199 ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
200 << dendl;
201
202 uint8_t splay_width = m_journal_metadata->get_splay_width();
203
204 lock_object_recorders();
205 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
206 it != m_object_ptrs.end(); ++it) {
207 ObjectRecorderPtr object_recorder = it->second;
208 uint64_t object_number = object_recorder->get_object_number();
209 if (object_number / splay_width != m_current_set) {
210 assert(object_recorder->is_closed());
211
212 // ready to close object and open object in active set
213 create_next_object_recorder_unlock(object_recorder);
214 } else {
215 uint8_t splay_offset = object_number % splay_width;
216 m_object_locks[splay_offset]->Unlock();
217 }
218 }
219}
220
221bool JournalRecorder::close_object_set(uint64_t active_set) {
222 assert(m_lock.is_locked());
223
224 // object recorders will invoke overflow handler as they complete
225 // closing the object to ensure correct order of future appends
226 uint8_t splay_width = m_journal_metadata->get_splay_width();
227 lock_object_recorders();
228 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
229 it != m_object_ptrs.end(); ++it) {
230 ObjectRecorderPtr object_recorder = it->second;
231 if (object_recorder->get_object_number() / splay_width != active_set) {
232 ldout(m_cct, 10) << __func__ << ": closing object "
233 << object_recorder->get_oid() << dendl;
234 // flush out all queued appends and hold future appends
235 if (!object_recorder->close()) {
236 ++m_in_flight_object_closes;
237 } else {
238 ldout(m_cct, 20) << __func__ << ": object "
239 << object_recorder->get_oid() << " closed" << dendl;
240 }
241 }
242 }
243 unlock_object_recorders();
244 return (m_in_flight_object_closes == 0);
245}
246
247ObjectRecorderPtr JournalRecorder::create_object_recorder(
248 uint64_t object_number, shared_ptr<Mutex> lock) {
249 ObjectRecorderPtr object_recorder(new ObjectRecorder(
250 m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
251 object_number, lock, m_journal_metadata->get_work_queue(),
252 m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
253 &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
254 m_flush_bytes, m_flush_age));
255 return object_recorder;
256}
257
258void JournalRecorder::create_next_object_recorder_unlock(
259 ObjectRecorderPtr object_recorder) {
260 assert(m_lock.is_locked());
261
262 uint64_t object_number = object_recorder->get_object_number();
263 uint8_t splay_width = m_journal_metadata->get_splay_width();
264 uint8_t splay_offset = object_number % splay_width;
265
266 assert(m_object_locks[splay_offset]->is_locked());
267
268 ObjectRecorderPtr new_object_recorder = create_object_recorder(
269 (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
270
271 ldout(m_cct, 10) << __func__ << ": "
272 << "old oid=" << object_recorder->get_oid() << ", "
273 << "new oid=" << new_object_recorder->get_oid() << dendl;
274 AppendBuffers append_buffers;
275 object_recorder->claim_append_buffers(&append_buffers);
276
277 // update the commit record to point to the correct object number
278 for (auto &append_buffer : append_buffers) {
279 m_journal_metadata->overflow_commit_tid(
280 append_buffer.first->get_commit_tid(),
281 new_object_recorder->get_object_number());
282 }
283
284 new_object_recorder->append_unlock(std::move(append_buffers));
285 m_object_ptrs[splay_offset] = new_object_recorder;
286}
287
288void JournalRecorder::handle_update() {
289 Mutex::Locker locker(m_lock);
290
291 uint64_t active_set = m_journal_metadata->get_active_set();
292 if (m_current_set < active_set) {
293 // peer journal client advanced the active set
294 ldout(m_cct, 20) << __func__ << ": "
295 << "current_set=" << m_current_set << ", "
296 << "active_set=" << active_set << dendl;
297
298 uint64_t current_set = m_current_set;
299 m_current_set = active_set;
300 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
301 ldout(m_cct, 20) << __func__ << ": closing current object set "
302 << current_set << dendl;
303 if (close_object_set(active_set)) {
304 open_object_set();
305 }
306 }
307 }
308}
309
310void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
311 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
312
313 Mutex::Locker locker(m_lock);
314
315 uint64_t object_number = object_recorder->get_object_number();
316 uint8_t splay_width = m_journal_metadata->get_splay_width();
317 uint8_t splay_offset = object_number % splay_width;
318 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
319 assert(active_object_recorder->get_object_number() == object_number);
320
321 assert(m_in_flight_object_closes > 0);
322 --m_in_flight_object_closes;
323
324 // object closed after advance active set committed
325 ldout(m_cct, 20) << __func__ << ": object "
326 << active_object_recorder->get_oid() << " closed" << dendl;
327 if (m_in_flight_object_closes == 0) {
328 if (m_in_flight_advance_sets == 0) {
329 // peer forced closing of object set
330 open_object_set();
331 } else {
332 // local overflow advanced object set
333 advance_object_set();
334 }
335 }
336}
337
338void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
339 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
340
341 Mutex::Locker locker(m_lock);
342
343 uint64_t object_number = object_recorder->get_object_number();
344 uint8_t splay_width = m_journal_metadata->get_splay_width();
345 uint8_t splay_offset = object_number % splay_width;
346 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
347 assert(active_object_recorder->get_object_number() == object_number);
348
349 ldout(m_cct, 20) << __func__ << ": object "
350 << active_object_recorder->get_oid() << " overflowed"
351 << dendl;
352 close_and_advance_object_set(object_number / splay_width);
353}
354
355} // namespace journal