]> git.proxmox.com Git - ceph.git/blame - ceph/src/journal/JournalRecorder.cc
update sources to v12.1.0
[ceph.git] / ceph / src / journal / JournalRecorder.cc
CommitLineData
7c673cae
FG
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "journal/JournalRecorder.h"
5#include "common/errno.h"
6#include "journal/Entry.h"
7#include "journal/Utils.h"
8
31f18b77
FG
9#include <atomic>
10
7c673cae
FG
11#define dout_subsys ceph_subsys_journaler
12#undef dout_prefix
13#define dout_prefix *_dout << "JournalRecorder: " << this << " "
14
15using std::shared_ptr;
16
17namespace journal {
18
19namespace {
20
21struct C_Flush : public Context {
22 JournalMetadataPtr journal_metadata;
23 Context *on_finish;
31f18b77 24 std::atomic<int64_t> pending_flushes = { 0 };
7c673cae
FG
25 int ret_val;
26
27 C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
28 size_t _pending_flushes)
29 : journal_metadata(_journal_metadata), on_finish(_on_finish),
30 pending_flushes(_pending_flushes), ret_val(0) {
31 }
32
33 void complete(int r) override {
34 if (r < 0 && ret_val == 0) {
35 ret_val = r;
36 }
31f18b77 37 if (--pending_flushes == 0) {
7c673cae
FG
38 // ensure all prior callback have been flushed as well
39 journal_metadata->queue(on_finish, ret_val);
40 delete this;
41 }
42 }
43 void finish(int r) override {
44 }
45};
46
47} // anonymous namespace
48
49JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
50 const std::string &object_oid_prefix,
51 const JournalMetadataPtr& journal_metadata,
52 uint32_t flush_interval, uint64_t flush_bytes,
53 double flush_age)
54 : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
55 m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
56 m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this),
57 m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
58 m_current_set(m_journal_metadata->get_active_set()) {
59
60 Mutex::Locker locker(m_lock);
61 m_ioctx.dup(ioctx);
62 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
63
64 uint8_t splay_width = m_journal_metadata->get_splay_width();
65 for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
66 m_object_locks.push_back(shared_ptr<Mutex>(
67 new Mutex("ObjectRecorder::m_lock::"+
68 std::to_string(splay_offset))));
69 uint64_t object_number = splay_offset + (m_current_set * splay_width);
70 m_object_ptrs[splay_offset] = create_object_recorder(
71 object_number,
72 m_object_locks[splay_offset]);
73 }
74
75 m_journal_metadata->add_listener(&m_listener);
76}
77
78JournalRecorder::~JournalRecorder() {
79 m_journal_metadata->remove_listener(&m_listener);
80
81 Mutex::Locker locker(m_lock);
82 assert(m_in_flight_advance_sets == 0);
83 assert(m_in_flight_object_closes == 0);
84}
85
86Future JournalRecorder::append(uint64_t tag_tid,
87 const bufferlist &payload_bl) {
88
89 m_lock.Lock();
90
91 uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
92 uint8_t splay_width = m_journal_metadata->get_splay_width();
93 uint8_t splay_offset = entry_tid % splay_width;
94
95 ObjectRecorderPtr object_ptr = get_object(splay_offset);
96 uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
97 object_ptr->get_object_number(), tag_tid, entry_tid);
98 FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
99 future->init(m_prev_future);
100 m_prev_future = future;
101
102 m_object_locks[splay_offset]->Lock();
103 m_lock.Unlock();
104
105 bufferlist entry_bl;
106 ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
107 entry_bl);
108 assert(entry_bl.length() <= m_journal_metadata->get_object_size());
109
110 bool object_full = object_ptr->append_unlock({{future, entry_bl}});
111 if (object_full) {
112 ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
113 << dendl;
114 Mutex::Locker l(m_lock);
115 close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
116 }
117 return Future(future);
118}
119
120void JournalRecorder::flush(Context *on_safe) {
121 C_Flush *ctx;
122 {
123 Mutex::Locker locker(m_lock);
124
125 ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
126 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
127 it != m_object_ptrs.end(); ++it) {
128 it->second->flush(ctx);
129 }
130
131 }
132
133 // avoid holding the lock in case there is nothing to flush
134 ctx->complete(0);
135}
136
137ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
138 assert(m_lock.is_locked());
139
140 ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
141 assert(object_recoder != NULL);
142 return object_recoder;
143}
144
145void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
146 assert(m_lock.is_locked());
147
148 // entry overflow from open object
149 if (m_current_set != object_set) {
150 ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
151 return;
152 }
153
154 // we shouldn't overflow upon append if already closed and we
155 // shouldn't receive an overflowed callback if already closed
156 assert(m_in_flight_advance_sets == 0);
157 assert(m_in_flight_object_closes == 0);
158
159 uint64_t active_set = m_journal_metadata->get_active_set();
160 assert(m_current_set == active_set);
161 ++m_current_set;
162 ++m_in_flight_advance_sets;
163
164 ldout(m_cct, 20) << __func__ << ": closing active object set "
165 << object_set << dendl;
166 if (close_object_set(m_current_set)) {
167 advance_object_set();
168 }
169}
170
171void JournalRecorder::advance_object_set() {
172 assert(m_lock.is_locked());
173
174 assert(m_in_flight_object_closes == 0);
175 ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
176 << dendl;
177 m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
178 this));
179}
180
181void JournalRecorder::handle_advance_object_set(int r) {
182 Mutex::Locker locker(m_lock);
183 ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
184
185 assert(m_in_flight_advance_sets > 0);
186 --m_in_flight_advance_sets;
187
188 if (r < 0 && r != -ESTALE) {
189 lderr(m_cct) << __func__ << ": failed to advance object set: "
190 << cpp_strerror(r) << dendl;
191 }
192
193 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
194 open_object_set();
195 }
196}
197
198void JournalRecorder::open_object_set() {
199 assert(m_lock.is_locked());
200
201 ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
202 << dendl;
203
204 uint8_t splay_width = m_journal_metadata->get_splay_width();
205
206 lock_object_recorders();
207 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
208 it != m_object_ptrs.end(); ++it) {
209 ObjectRecorderPtr object_recorder = it->second;
210 uint64_t object_number = object_recorder->get_object_number();
211 if (object_number / splay_width != m_current_set) {
212 assert(object_recorder->is_closed());
213
214 // ready to close object and open object in active set
215 create_next_object_recorder_unlock(object_recorder);
216 } else {
217 uint8_t splay_offset = object_number % splay_width;
218 m_object_locks[splay_offset]->Unlock();
219 }
220 }
221}
222
223bool JournalRecorder::close_object_set(uint64_t active_set) {
224 assert(m_lock.is_locked());
225
226 // object recorders will invoke overflow handler as they complete
227 // closing the object to ensure correct order of future appends
228 uint8_t splay_width = m_journal_metadata->get_splay_width();
229 lock_object_recorders();
230 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
231 it != m_object_ptrs.end(); ++it) {
232 ObjectRecorderPtr object_recorder = it->second;
233 if (object_recorder->get_object_number() / splay_width != active_set) {
234 ldout(m_cct, 10) << __func__ << ": closing object "
235 << object_recorder->get_oid() << dendl;
236 // flush out all queued appends and hold future appends
237 if (!object_recorder->close()) {
238 ++m_in_flight_object_closes;
239 } else {
240 ldout(m_cct, 20) << __func__ << ": object "
241 << object_recorder->get_oid() << " closed" << dendl;
242 }
243 }
244 }
245 unlock_object_recorders();
246 return (m_in_flight_object_closes == 0);
247}
248
249ObjectRecorderPtr JournalRecorder::create_object_recorder(
250 uint64_t object_number, shared_ptr<Mutex> lock) {
251 ObjectRecorderPtr object_recorder(new ObjectRecorder(
252 m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
253 object_number, lock, m_journal_metadata->get_work_queue(),
254 m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
255 &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
256 m_flush_bytes, m_flush_age));
257 return object_recorder;
258}
259
260void JournalRecorder::create_next_object_recorder_unlock(
261 ObjectRecorderPtr object_recorder) {
262 assert(m_lock.is_locked());
263
264 uint64_t object_number = object_recorder->get_object_number();
265 uint8_t splay_width = m_journal_metadata->get_splay_width();
266 uint8_t splay_offset = object_number % splay_width;
267
268 assert(m_object_locks[splay_offset]->is_locked());
269
270 ObjectRecorderPtr new_object_recorder = create_object_recorder(
271 (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
272
273 ldout(m_cct, 10) << __func__ << ": "
274 << "old oid=" << object_recorder->get_oid() << ", "
275 << "new oid=" << new_object_recorder->get_oid() << dendl;
276 AppendBuffers append_buffers;
277 object_recorder->claim_append_buffers(&append_buffers);
278
279 // update the commit record to point to the correct object number
280 for (auto &append_buffer : append_buffers) {
281 m_journal_metadata->overflow_commit_tid(
282 append_buffer.first->get_commit_tid(),
283 new_object_recorder->get_object_number());
284 }
285
286 new_object_recorder->append_unlock(std::move(append_buffers));
287 m_object_ptrs[splay_offset] = new_object_recorder;
288}
289
290void JournalRecorder::handle_update() {
291 Mutex::Locker locker(m_lock);
292
293 uint64_t active_set = m_journal_metadata->get_active_set();
294 if (m_current_set < active_set) {
295 // peer journal client advanced the active set
296 ldout(m_cct, 20) << __func__ << ": "
297 << "current_set=" << m_current_set << ", "
298 << "active_set=" << active_set << dendl;
299
300 uint64_t current_set = m_current_set;
301 m_current_set = active_set;
302 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
303 ldout(m_cct, 20) << __func__ << ": closing current object set "
304 << current_set << dendl;
305 if (close_object_set(active_set)) {
306 open_object_set();
307 }
308 }
309 }
310}
311
312void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
313 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
314
315 Mutex::Locker locker(m_lock);
316
317 uint64_t object_number = object_recorder->get_object_number();
318 uint8_t splay_width = m_journal_metadata->get_splay_width();
319 uint8_t splay_offset = object_number % splay_width;
320 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
321 assert(active_object_recorder->get_object_number() == object_number);
322
323 assert(m_in_flight_object_closes > 0);
324 --m_in_flight_object_closes;
325
326 // object closed after advance active set committed
327 ldout(m_cct, 20) << __func__ << ": object "
328 << active_object_recorder->get_oid() << " closed" << dendl;
329 if (m_in_flight_object_closes == 0) {
330 if (m_in_flight_advance_sets == 0) {
331 // peer forced closing of object set
332 open_object_set();
333 } else {
334 // local overflow advanced object set
335 advance_object_set();
336 }
337 }
338}
339
340void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
341 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
342
343 Mutex::Locker locker(m_lock);
344
345 uint64_t object_number = object_recorder->get_object_number();
346 uint8_t splay_width = m_journal_metadata->get_splay_width();
347 uint8_t splay_offset = object_number % splay_width;
348 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
349 assert(active_object_recorder->get_object_number() == object_number);
350
351 ldout(m_cct, 20) << __func__ << ": object "
352 << active_object_recorder->get_oid() << " overflowed"
353 << dendl;
354 close_and_advance_object_set(object_number / splay_width);
355}
356
357} // namespace journal