]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalRecorder.h" | |
5 | #include "common/errno.h" | |
6 | #include "journal/Entry.h" | |
7 | #include "journal/Utils.h" | |
8 | ||
31f18b77 FG |
9 | #include <atomic> |
10 | ||
7c673cae FG |
11 | #define dout_subsys ceph_subsys_journaler |
12 | #undef dout_prefix | |
494da23a TL |
13 | #define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \ |
14 | << ": " | |
7c673cae FG |
15 | |
16 | using std::shared_ptr; | |
17 | ||
18 | namespace journal { | |
19 | ||
20 | namespace { | |
21 | ||
22 | struct C_Flush : public Context { | |
9f95a23c | 23 | ceph::ref_t<JournalMetadata> journal_metadata; |
7c673cae | 24 | Context *on_finish; |
9f95a23c TL |
25 | std::atomic<int64_t> pending_flushes{0}; |
26 | int ret_val = 0; | |
7c673cae | 27 | |
9f95a23c | 28 | C_Flush(ceph::ref_t<JournalMetadata> _journal_metadata, Context *_on_finish, |
7c673cae | 29 | size_t _pending_flushes) |
9f95a23c TL |
30 | : journal_metadata(std::move(_journal_metadata)), |
31 | on_finish(_on_finish), | |
32 | pending_flushes(_pending_flushes) { | |
7c673cae FG |
33 | } |
34 | ||
35 | void complete(int r) override { | |
36 | if (r < 0 && ret_val == 0) { | |
37 | ret_val = r; | |
38 | } | |
31f18b77 | 39 | if (--pending_flushes == 0) { |
7c673cae FG |
40 | // ensure all prior callback have been flushed as well |
41 | journal_metadata->queue(on_finish, ret_val); | |
42 | delete this; | |
43 | } | |
44 | } | |
45 | void finish(int r) override { | |
46 | } | |
47 | }; | |
48 | ||
49 | } // anonymous namespace | |
50 | ||
51 | JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, | |
9f95a23c TL |
52 | std::string_view object_oid_prefix, |
53 | ceph::ref_t<JournalMetadata> journal_metadata, | |
11fdf7f2 | 54 | uint64_t max_in_flight_appends) |
9f95a23c TL |
55 | : m_object_oid_prefix(object_oid_prefix), |
56 | m_journal_metadata(std::move(journal_metadata)), | |
57 | m_max_in_flight_appends(max_in_flight_appends), | |
58 | m_listener(this), | |
59 | m_object_handler(this), | |
60 | m_current_set(m_journal_metadata->get_active_set()), | |
61 | m_object_locks{ceph::make_lock_container<ceph::mutex>( | |
62 | m_journal_metadata->get_splay_width(), [](const size_t splay_offset) { | |
63 | return ceph::make_mutex("ObjectRecorder::m_lock::" + | |
64 | std::to_string(splay_offset)); | |
65 | })} | |
66 | { | |
67 | std::lock_guard locker{m_lock}; | |
7c673cae FG |
68 | m_ioctx.dup(ioctx); |
69 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
70 | ||
71 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
72 | for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { | |
7c673cae | 73 | uint64_t object_number = splay_offset + (m_current_set * splay_width); |
9f95a23c | 74 | std::lock_guard locker{m_object_locks[splay_offset]}; |
7c673cae | 75 | m_object_ptrs[splay_offset] = create_object_recorder( |
9f95a23c | 76 | object_number, &m_object_locks[splay_offset]); |
7c673cae FG |
77 | } |
78 | ||
79 | m_journal_metadata->add_listener(&m_listener); | |
80 | } | |
81 | ||
82 | JournalRecorder::~JournalRecorder() { | |
83 | m_journal_metadata->remove_listener(&m_listener); | |
84 | ||
9f95a23c | 85 | std::lock_guard locker{m_lock}; |
11fdf7f2 TL |
86 | ceph_assert(m_in_flight_advance_sets == 0); |
87 | ceph_assert(m_in_flight_object_closes == 0); | |
7c673cae FG |
88 | } |
89 | ||
494da23a | 90 | void JournalRecorder::shut_down(Context *on_safe) { |
9f95a23c | 91 | on_safe = new LambdaContext( |
494da23a TL |
92 | [this, on_safe](int r) { |
93 | Context *ctx = nullptr; | |
94 | { | |
9f95a23c | 95 | std::lock_guard locker{m_lock}; |
494da23a TL |
96 | if (m_in_flight_advance_sets != 0) { |
97 | ceph_assert(m_on_object_set_advanced == nullptr); | |
9f95a23c | 98 | m_on_object_set_advanced = new LambdaContext( |
494da23a TL |
99 | [on_safe, r](int) { |
100 | on_safe->complete(r); | |
101 | }); | |
102 | } else { | |
103 | ctx = on_safe; | |
104 | } | |
105 | } | |
106 | if (ctx != nullptr) { | |
107 | ctx->complete(r); | |
108 | } | |
109 | }); | |
110 | flush(on_safe); | |
111 | } | |
112 | ||
113 | void JournalRecorder::set_append_batch_options(int flush_interval, | |
114 | uint64_t flush_bytes, | |
115 | double flush_age) { | |
116 | ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " | |
117 | << "flush_bytes=" << flush_bytes << ", " | |
118 | << "flush_age=" << flush_age << dendl; | |
119 | ||
9f95a23c | 120 | std::lock_guard locker{m_lock}; |
494da23a TL |
121 | m_flush_interval = flush_interval; |
122 | m_flush_bytes = flush_bytes; | |
123 | m_flush_age = flush_age; | |
124 | ||
125 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
126 | for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { | |
9f95a23c | 127 | std::lock_guard object_locker{m_object_locks[splay_offset]}; |
494da23a TL |
128 | auto object_recorder = get_object(splay_offset); |
129 | object_recorder->set_append_batch_options(flush_interval, flush_bytes, | |
130 | flush_age); | |
131 | } | |
132 | } | |
133 | ||
7c673cae FG |
134 | Future JournalRecorder::append(uint64_t tag_tid, |
135 | const bufferlist &payload_bl) { | |
494da23a | 136 | ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl; |
7c673cae | 137 | |
9f95a23c | 138 | m_lock.lock(); |
7c673cae FG |
139 | |
140 | uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); | |
141 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
142 | uint8_t splay_offset = entry_tid % splay_width; | |
143 | ||
9f95a23c | 144 | auto object_ptr = get_object(splay_offset); |
7c673cae FG |
145 | uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( |
146 | object_ptr->get_object_number(), tag_tid, entry_tid); | |
9f95a23c | 147 | auto future = ceph::make_ref<FutureImpl>(tag_tid, entry_tid, commit_tid); |
7c673cae FG |
148 | future->init(m_prev_future); |
149 | m_prev_future = future; | |
150 | ||
9f95a23c TL |
151 | m_object_locks[splay_offset].lock(); |
152 | m_lock.unlock(); | |
7c673cae FG |
153 | |
154 | bufferlist entry_bl; | |
11fdf7f2 TL |
155 | encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), |
156 | entry_bl); | |
157 | ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size()); | |
7c673cae | 158 | |
494da23a | 159 | bool object_full = object_ptr->append({{future, entry_bl}}); |
9f95a23c | 160 | m_object_locks[splay_offset].unlock(); |
494da23a | 161 | |
7c673cae FG |
162 | if (object_full) { |
163 | ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" | |
164 | << dendl; | |
9f95a23c | 165 | std::lock_guard l{m_lock}; |
7c673cae FG |
166 | close_and_advance_object_set(object_ptr->get_object_number() / splay_width); |
167 | } | |
168 | return Future(future); | |
169 | } | |
170 | ||
171 | void JournalRecorder::flush(Context *on_safe) { | |
494da23a TL |
172 | ldout(m_cct, 20) << dendl; |
173 | ||
7c673cae FG |
174 | C_Flush *ctx; |
175 | { | |
9f95a23c | 176 | std::lock_guard locker{m_lock}; |
7c673cae FG |
177 | |
178 | ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); | |
9f95a23c TL |
179 | for (const auto& p : m_object_ptrs) { |
180 | p.second->flush(ctx); | |
7c673cae FG |
181 | } |
182 | ||
183 | } | |
184 | ||
185 | // avoid holding the lock in case there is nothing to flush | |
186 | ctx->complete(0); | |
187 | } | |
188 | ||
9f95a23c TL |
189 | ceph::ref_t<ObjectRecorder> JournalRecorder::get_object(uint8_t splay_offset) { |
190 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
7c673cae | 191 | |
9f95a23c TL |
192 | const auto& object_recoder = m_object_ptrs.at(splay_offset); |
193 | ceph_assert(object_recoder); | |
7c673cae FG |
194 | return object_recoder; |
195 | } | |
196 | ||
197 | void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { | |
9f95a23c | 198 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
199 | |
200 | // entry overflow from open object | |
201 | if (m_current_set != object_set) { | |
494da23a | 202 | ldout(m_cct, 20) << "close already in-progress" << dendl; |
7c673cae FG |
203 | return; |
204 | } | |
205 | ||
206 | // we shouldn't overflow upon append if already closed and we | |
207 | // shouldn't receive an overflowed callback if already closed | |
11fdf7f2 TL |
208 | ceph_assert(m_in_flight_advance_sets == 0); |
209 | ceph_assert(m_in_flight_object_closes == 0); | |
7c673cae FG |
210 | |
211 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
11fdf7f2 | 212 | ceph_assert(m_current_set == active_set); |
7c673cae FG |
213 | ++m_current_set; |
214 | ++m_in_flight_advance_sets; | |
215 | ||
494da23a | 216 | ldout(m_cct, 10) << "closing active object set " << object_set << dendl; |
7c673cae FG |
217 | if (close_object_set(m_current_set)) { |
218 | advance_object_set(); | |
219 | } | |
220 | } | |
221 | ||
222 | void JournalRecorder::advance_object_set() { | |
9f95a23c | 223 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 224 | |
11fdf7f2 | 225 | ceph_assert(m_in_flight_object_closes == 0); |
494da23a | 226 | ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl; |
7c673cae FG |
227 | m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( |
228 | this)); | |
229 | } | |
230 | ||
231 | void JournalRecorder::handle_advance_object_set(int r) { | |
494da23a TL |
232 | Context *on_object_set_advanced = nullptr; |
233 | { | |
9f95a23c | 234 | std::lock_guard locker{m_lock}; |
494da23a | 235 | ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; |
7c673cae | 236 | |
494da23a TL |
237 | ceph_assert(m_in_flight_advance_sets > 0); |
238 | --m_in_flight_advance_sets; | |
7c673cae | 239 | |
494da23a TL |
240 | if (r < 0 && r != -ESTALE) { |
241 | lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r) | |
242 | << dendl; | |
243 | } | |
7c673cae | 244 | |
494da23a TL |
245 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { |
246 | open_object_set(); | |
247 | std::swap(on_object_set_advanced, m_on_object_set_advanced); | |
248 | } | |
249 | } | |
250 | if (on_object_set_advanced != nullptr) { | |
251 | on_object_set_advanced->complete(0); | |
7c673cae FG |
252 | } |
253 | } | |
254 | ||
255 | void JournalRecorder::open_object_set() { | |
9f95a23c | 256 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae | 257 | |
494da23a | 258 | ldout(m_cct, 10) << "opening object set " << m_current_set << dendl; |
7c673cae FG |
259 | |
260 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
9f95a23c | 261 | bool overflowed = false; |
7c673cae | 262 | |
9f95a23c TL |
263 | auto lockers{lock_object_recorders()}; |
264 | for (const auto& p : m_object_ptrs) { | |
265 | const auto& object_recorder = p.second; | |
7c673cae FG |
266 | uint64_t object_number = object_recorder->get_object_number(); |
267 | if (object_number / splay_width != m_current_set) { | |
11fdf7f2 | 268 | ceph_assert(object_recorder->is_closed()); |
7c673cae FG |
269 | |
270 | // ready to close object and open object in active set | |
9f95a23c TL |
271 | if (create_next_object_recorder(object_recorder)) { |
272 | overflowed = true; | |
273 | } | |
7c673cae FG |
274 | } |
275 | } | |
9f95a23c TL |
276 | lockers.clear(); |
277 | ||
278 | if (overflowed) { | |
279 | ldout(m_cct, 10) << "object set " << m_current_set << " now full" << dendl; | |
280 | ldout(m_cct, 10) << "" << dendl; | |
281 | close_and_advance_object_set(m_current_set); | |
282 | } | |
7c673cae FG |
283 | } |
284 | ||
285 | bool JournalRecorder::close_object_set(uint64_t active_set) { | |
494da23a | 286 | ldout(m_cct, 10) << "active_set=" << active_set << dendl; |
9f95a23c | 287 | ceph_assert(ceph_mutex_is_locked(m_lock)); |
7c673cae FG |
288 | |
289 | // object recorders will invoke overflow handler as they complete | |
290 | // closing the object to ensure correct order of future appends | |
291 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
9f95a23c TL |
292 | auto lockers{lock_object_recorders()}; |
293 | for (const auto& p : m_object_ptrs) { | |
294 | const auto& object_recorder = p.second; | |
7c673cae | 295 | if (object_recorder->get_object_number() / splay_width != active_set) { |
494da23a TL |
296 | ldout(m_cct, 10) << "closing object " << object_recorder->get_oid() |
297 | << dendl; | |
7c673cae FG |
298 | // flush out all queued appends and hold future appends |
299 | if (!object_recorder->close()) { | |
300 | ++m_in_flight_object_closes; | |
9f95a23c TL |
301 | ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " " |
302 | << "close in-progress" << dendl; | |
7c673cae | 303 | } else { |
494da23a TL |
304 | ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed" |
305 | << dendl; | |
7c673cae FG |
306 | } |
307 | } | |
308 | } | |
7c673cae FG |
309 | return (m_in_flight_object_closes == 0); |
310 | } | |
311 | ||
9f95a23c TL |
312 | ceph::ref_t<ObjectRecorder> JournalRecorder::create_object_recorder( |
313 | uint64_t object_number, ceph::mutex* lock) { | |
494da23a | 314 | ldout(m_cct, 10) << "object_number=" << object_number << dendl; |
9f95a23c | 315 | auto object_recorder = ceph::make_ref<ObjectRecorder>( |
7c673cae FG |
316 | m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), |
317 | object_number, lock, m_journal_metadata->get_work_queue(), | |
494da23a | 318 | &m_object_handler, m_journal_metadata->get_order(), |
9f95a23c | 319 | m_max_in_flight_appends); |
494da23a TL |
320 | object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes, |
321 | m_flush_age); | |
7c673cae FG |
322 | return object_recorder; |
323 | } | |
324 | ||
9f95a23c TL |
325 | bool JournalRecorder::create_next_object_recorder( |
326 | ceph::ref_t<ObjectRecorder> object_recorder) { | |
327 | ceph_assert(ceph_mutex_is_locked(m_lock)); | |
7c673cae FG |
328 | |
329 | uint64_t object_number = object_recorder->get_object_number(); | |
330 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
331 | uint8_t splay_offset = object_number % splay_width; | |
494da23a | 332 | ldout(m_cct, 10) << "object_number=" << object_number << dendl; |
7c673cae | 333 | |
9f95a23c | 334 | ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset])); |
7c673cae | 335 | |
9f95a23c TL |
336 | auto new_object_recorder = create_object_recorder( |
337 | (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]); | |
7c673cae | 338 | |
494da23a | 339 | ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", " |
7c673cae FG |
340 | << "new oid=" << new_object_recorder->get_oid() << dendl; |
341 | AppendBuffers append_buffers; | |
342 | object_recorder->claim_append_buffers(&append_buffers); | |
343 | ||
344 | // update the commit record to point to the correct object number | |
345 | for (auto &append_buffer : append_buffers) { | |
346 | m_journal_metadata->overflow_commit_tid( | |
347 | append_buffer.first->get_commit_tid(), | |
348 | new_object_recorder->get_object_number()); | |
349 | } | |
350 | ||
9f95a23c TL |
351 | bool object_full = new_object_recorder->append(std::move(append_buffers)); |
352 | if (object_full) { | |
353 | ldout(m_cct, 10) << "object " << new_object_recorder->get_oid() << " " | |
354 | << "now full" << dendl; | |
355 | } | |
356 | ||
357 | m_object_ptrs[splay_offset] = std::move(new_object_recorder); | |
358 | return object_full; | |
7c673cae FG |
359 | } |
360 | ||
361 | void JournalRecorder::handle_update() { | |
9f95a23c | 362 | std::lock_guard locker{m_lock}; |
7c673cae FG |
363 | |
364 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
365 | if (m_current_set < active_set) { | |
366 | // peer journal client advanced the active set | |
494da23a | 367 | ldout(m_cct, 10) << "current_set=" << m_current_set << ", " |
7c673cae FG |
368 | << "active_set=" << active_set << dendl; |
369 | ||
370 | uint64_t current_set = m_current_set; | |
371 | m_current_set = active_set; | |
372 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
494da23a | 373 | ldout(m_cct, 10) << "closing current object set " << current_set << dendl; |
7c673cae FG |
374 | if (close_object_set(active_set)) { |
375 | open_object_set(); | |
376 | } | |
377 | } | |
378 | } | |
379 | } | |
380 | ||
381 | void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { | |
494da23a | 382 | ldout(m_cct, 10) << object_recorder->get_oid() << dendl; |
7c673cae | 383 | |
9f95a23c | 384 | std::lock_guard locker{m_lock}; |
7c673cae FG |
385 | |
386 | uint64_t object_number = object_recorder->get_object_number(); | |
387 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
388 | uint8_t splay_offset = object_number % splay_width; | |
9f95a23c | 389 | auto& active_object_recorder = m_object_ptrs.at(splay_offset); |
11fdf7f2 | 390 | ceph_assert(active_object_recorder->get_object_number() == object_number); |
7c673cae | 391 | |
11fdf7f2 | 392 | ceph_assert(m_in_flight_object_closes > 0); |
7c673cae FG |
393 | --m_in_flight_object_closes; |
394 | ||
395 | // object closed after advance active set committed | |
494da23a TL |
396 | ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() |
397 | << " closed" << dendl; | |
7c673cae FG |
398 | if (m_in_flight_object_closes == 0) { |
399 | if (m_in_flight_advance_sets == 0) { | |
400 | // peer forced closing of object set | |
401 | open_object_set(); | |
402 | } else { | |
403 | // local overflow advanced object set | |
404 | advance_object_set(); | |
405 | } | |
406 | } | |
407 | } | |
408 | ||
409 | void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { | |
494da23a | 410 | ldout(m_cct, 10) << object_recorder->get_oid() << dendl; |
7c673cae | 411 | |
9f95a23c | 412 | std::lock_guard locker{m_lock}; |
7c673cae FG |
413 | |
414 | uint64_t object_number = object_recorder->get_object_number(); | |
415 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
416 | uint8_t splay_offset = object_number % splay_width; | |
9f95a23c | 417 | auto& active_object_recorder = m_object_ptrs.at(splay_offset); |
11fdf7f2 | 418 | ceph_assert(active_object_recorder->get_object_number() == object_number); |
7c673cae | 419 | |
494da23a TL |
420 | ldout(m_cct, 10) << "object " << active_object_recorder->get_oid() |
421 | << " overflowed" << dendl; | |
7c673cae FG |
422 | close_and_advance_object_set(object_number / splay_width); |
423 | } | |
424 | ||
9f95a23c TL |
425 | JournalRecorder::Lockers JournalRecorder::lock_object_recorders() { |
426 | Lockers lockers; | |
427 | lockers.reserve(m_object_ptrs.size()); | |
428 | for (auto& lock : m_object_locks) { | |
429 | lockers.emplace_back(lock); | |
430 | } | |
431 | return lockers; | |
432 | } | |
433 | ||
7c673cae | 434 | } // namespace journal |