]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/JournalRecorder.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / journal / JournalRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/JournalRecorder.h"
5 #include "common/errno.h"
6 #include "journal/Entry.h"
7 #include "journal/Utils.h"
8
9 #include <atomic>
10
11 #define dout_subsys ceph_subsys_journaler
12 #undef dout_prefix
13 #define dout_prefix *_dout << "JournalRecorder: " << this << " "
14
15 using std::shared_ptr;
16
17 namespace journal {
18
19 namespace {
20
21 struct C_Flush : public Context {
22 JournalMetadataPtr journal_metadata;
23 Context *on_finish;
24 std::atomic<int64_t> pending_flushes = { 0 };
25 int ret_val;
26
27 C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish,
28 size_t _pending_flushes)
29 : journal_metadata(_journal_metadata), on_finish(_on_finish),
30 pending_flushes(_pending_flushes), ret_val(0) {
31 }
32
33 void complete(int r) override {
34 if (r < 0 && ret_val == 0) {
35 ret_val = r;
36 }
37 if (--pending_flushes == 0) {
38 // ensure all prior callback have been flushed as well
39 journal_metadata->queue(on_finish, ret_val);
40 delete this;
41 }
42 }
43 void finish(int r) override {
44 }
45 };
46
47 } // anonymous namespace
48
49 JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
50 const std::string &object_oid_prefix,
51 const JournalMetadataPtr& journal_metadata,
52 uint32_t flush_interval, uint64_t flush_bytes,
53 double flush_age,
54 uint64_t max_in_flight_appends)
55 : m_cct(NULL), m_object_oid_prefix(object_oid_prefix),
56 m_journal_metadata(journal_metadata), m_flush_interval(flush_interval),
57 m_flush_bytes(flush_bytes), m_flush_age(flush_age),
58 m_max_in_flight_appends(max_in_flight_appends), m_listener(this),
59 m_object_handler(this), m_lock("JournalerRecorder::m_lock"),
60 m_current_set(m_journal_metadata->get_active_set()) {
61
62 Mutex::Locker locker(m_lock);
63 m_ioctx.dup(ioctx);
64 m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
65
66 uint8_t splay_width = m_journal_metadata->get_splay_width();
67 for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
68 m_object_locks.push_back(shared_ptr<Mutex>(
69 new Mutex("ObjectRecorder::m_lock::"+
70 std::to_string(splay_offset))));
71 uint64_t object_number = splay_offset + (m_current_set * splay_width);
72 m_object_ptrs[splay_offset] = create_object_recorder(
73 object_number,
74 m_object_locks[splay_offset]);
75 }
76
77 m_journal_metadata->add_listener(&m_listener);
78 }
79
80 JournalRecorder::~JournalRecorder() {
81 m_journal_metadata->remove_listener(&m_listener);
82
83 Mutex::Locker locker(m_lock);
84 ceph_assert(m_in_flight_advance_sets == 0);
85 ceph_assert(m_in_flight_object_closes == 0);
86 }
87
88 Future JournalRecorder::append(uint64_t tag_tid,
89 const bufferlist &payload_bl) {
90
91 m_lock.Lock();
92
93 uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
94 uint8_t splay_width = m_journal_metadata->get_splay_width();
95 uint8_t splay_offset = entry_tid % splay_width;
96
97 ObjectRecorderPtr object_ptr = get_object(splay_offset);
98 uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
99 object_ptr->get_object_number(), tag_tid, entry_tid);
100 FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid));
101 future->init(m_prev_future);
102 m_prev_future = future;
103
104 m_object_locks[splay_offset]->Lock();
105 m_lock.Unlock();
106
107 bufferlist entry_bl;
108 encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
109 entry_bl);
110 ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());
111
112 bool object_full = object_ptr->append_unlock({{future, entry_bl}});
113 if (object_full) {
114 ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
115 << dendl;
116 Mutex::Locker l(m_lock);
117 close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
118 }
119 return Future(future);
120 }
121
122 void JournalRecorder::flush(Context *on_safe) {
123 C_Flush *ctx;
124 {
125 Mutex::Locker locker(m_lock);
126
127 ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
128 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
129 it != m_object_ptrs.end(); ++it) {
130 it->second->flush(ctx);
131 }
132
133 }
134
135 // avoid holding the lock in case there is nothing to flush
136 ctx->complete(0);
137 }
138
139 ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) {
140 ceph_assert(m_lock.is_locked());
141
142 ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset];
143 ceph_assert(object_recoder != NULL);
144 return object_recoder;
145 }
146
147 void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
148 ceph_assert(m_lock.is_locked());
149
150 // entry overflow from open object
151 if (m_current_set != object_set) {
152 ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl;
153 return;
154 }
155
156 // we shouldn't overflow upon append if already closed and we
157 // shouldn't receive an overflowed callback if already closed
158 ceph_assert(m_in_flight_advance_sets == 0);
159 ceph_assert(m_in_flight_object_closes == 0);
160
161 uint64_t active_set = m_journal_metadata->get_active_set();
162 ceph_assert(m_current_set == active_set);
163 ++m_current_set;
164 ++m_in_flight_advance_sets;
165
166 ldout(m_cct, 20) << __func__ << ": closing active object set "
167 << object_set << dendl;
168 if (close_object_set(m_current_set)) {
169 advance_object_set();
170 }
171 }
172
173 void JournalRecorder::advance_object_set() {
174 ceph_assert(m_lock.is_locked());
175
176 ceph_assert(m_in_flight_object_closes == 0);
177 ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set
178 << dendl;
179 m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
180 this));
181 }
182
183 void JournalRecorder::handle_advance_object_set(int r) {
184 Mutex::Locker locker(m_lock);
185 ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
186
187 ceph_assert(m_in_flight_advance_sets > 0);
188 --m_in_flight_advance_sets;
189
190 if (r < 0 && r != -ESTALE) {
191 lderr(m_cct) << __func__ << ": failed to advance object set: "
192 << cpp_strerror(r) << dendl;
193 }
194
195 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
196 open_object_set();
197 }
198 }
199
200 void JournalRecorder::open_object_set() {
201 ceph_assert(m_lock.is_locked());
202
203 ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set
204 << dendl;
205
206 uint8_t splay_width = m_journal_metadata->get_splay_width();
207
208 lock_object_recorders();
209 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
210 it != m_object_ptrs.end(); ++it) {
211 ObjectRecorderPtr object_recorder = it->second;
212 uint64_t object_number = object_recorder->get_object_number();
213 if (object_number / splay_width != m_current_set) {
214 ceph_assert(object_recorder->is_closed());
215
216 // ready to close object and open object in active set
217 create_next_object_recorder_unlock(object_recorder);
218 } else {
219 uint8_t splay_offset = object_number % splay_width;
220 m_object_locks[splay_offset]->Unlock();
221 }
222 }
223 }
224
225 bool JournalRecorder::close_object_set(uint64_t active_set) {
226 ceph_assert(m_lock.is_locked());
227
228 // object recorders will invoke overflow handler as they complete
229 // closing the object to ensure correct order of future appends
230 uint8_t splay_width = m_journal_metadata->get_splay_width();
231 lock_object_recorders();
232 for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin();
233 it != m_object_ptrs.end(); ++it) {
234 ObjectRecorderPtr object_recorder = it->second;
235 if (object_recorder->get_object_number() / splay_width != active_set) {
236 ldout(m_cct, 10) << __func__ << ": closing object "
237 << object_recorder->get_oid() << dendl;
238 // flush out all queued appends and hold future appends
239 if (!object_recorder->close()) {
240 ++m_in_flight_object_closes;
241 } else {
242 ldout(m_cct, 20) << __func__ << ": object "
243 << object_recorder->get_oid() << " closed" << dendl;
244 }
245 }
246 }
247 unlock_object_recorders();
248 return (m_in_flight_object_closes == 0);
249 }
250
251 ObjectRecorderPtr JournalRecorder::create_object_recorder(
252 uint64_t object_number, shared_ptr<Mutex> lock) {
253 ObjectRecorderPtr object_recorder(new ObjectRecorder(
254 m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
255 object_number, lock, m_journal_metadata->get_work_queue(),
256 m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(),
257 &m_object_handler, m_journal_metadata->get_order(), m_flush_interval,
258 m_flush_bytes, m_flush_age, m_max_in_flight_appends));
259 return object_recorder;
260 }
261
262 void JournalRecorder::create_next_object_recorder_unlock(
263 ObjectRecorderPtr object_recorder) {
264 ceph_assert(m_lock.is_locked());
265
266 uint64_t object_number = object_recorder->get_object_number();
267 uint8_t splay_width = m_journal_metadata->get_splay_width();
268 uint8_t splay_offset = object_number % splay_width;
269
270 ceph_assert(m_object_locks[splay_offset]->is_locked());
271
272 ObjectRecorderPtr new_object_recorder = create_object_recorder(
273 (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]);
274
275 ldout(m_cct, 10) << __func__ << ": "
276 << "old oid=" << object_recorder->get_oid() << ", "
277 << "new oid=" << new_object_recorder->get_oid() << dendl;
278 AppendBuffers append_buffers;
279 object_recorder->claim_append_buffers(&append_buffers);
280
281 // update the commit record to point to the correct object number
282 for (auto &append_buffer : append_buffers) {
283 m_journal_metadata->overflow_commit_tid(
284 append_buffer.first->get_commit_tid(),
285 new_object_recorder->get_object_number());
286 }
287
288 new_object_recorder->append_unlock(std::move(append_buffers));
289 m_object_ptrs[splay_offset] = new_object_recorder;
290 }
291
292 void JournalRecorder::handle_update() {
293 Mutex::Locker locker(m_lock);
294
295 uint64_t active_set = m_journal_metadata->get_active_set();
296 if (m_current_set < active_set) {
297 // peer journal client advanced the active set
298 ldout(m_cct, 20) << __func__ << ": "
299 << "current_set=" << m_current_set << ", "
300 << "active_set=" << active_set << dendl;
301
302 uint64_t current_set = m_current_set;
303 m_current_set = active_set;
304 if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
305 ldout(m_cct, 20) << __func__ << ": closing current object set "
306 << current_set << dendl;
307 if (close_object_set(active_set)) {
308 open_object_set();
309 }
310 }
311 }
312 }
313
314 void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
315 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
316
317 Mutex::Locker locker(m_lock);
318
319 uint64_t object_number = object_recorder->get_object_number();
320 uint8_t splay_width = m_journal_metadata->get_splay_width();
321 uint8_t splay_offset = object_number % splay_width;
322 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
323 ceph_assert(active_object_recorder->get_object_number() == object_number);
324
325 ceph_assert(m_in_flight_object_closes > 0);
326 --m_in_flight_object_closes;
327
328 // object closed after advance active set committed
329 ldout(m_cct, 20) << __func__ << ": object "
330 << active_object_recorder->get_oid() << " closed" << dendl;
331 if (m_in_flight_object_closes == 0) {
332 if (m_in_flight_advance_sets == 0) {
333 // peer forced closing of object set
334 open_object_set();
335 } else {
336 // local overflow advanced object set
337 advance_object_set();
338 }
339 }
340 }
341
342 void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
343 ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl;
344
345 Mutex::Locker locker(m_lock);
346
347 uint64_t object_number = object_recorder->get_object_number();
348 uint8_t splay_width = m_journal_metadata->get_splay_width();
349 uint8_t splay_offset = object_number % splay_width;
350 ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset];
351 ceph_assert(active_object_recorder->get_object_number() == object_number);
352
353 ldout(m_cct, 20) << __func__ << ": object "
354 << active_object_recorder->get_oid() << " overflowed"
355 << dendl;
356 close_and_advance_object_set(object_number / splay_width);
357 }
358
359 } // namespace journal