// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "journal/JournalRecorder.h"
#include "common/errno.h"
#include "journal/Entry.h"
#include "journal/Utils.h"

#include <atomic>

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "JournalRecorder: " << this << " " << __func__ \
                           << ": "

using std::shared_ptr;

namespace journal {

namespace {

struct C_Flush : public Context {
  ceph::ref_t<JournalMetadata> journal_metadata;
  Context *on_finish;
  std::atomic<int64_t> pending_flushes{0};
  int ret_val = 0;

  C_Flush(ceph::ref_t<JournalMetadata> _journal_metadata, Context *_on_finish,
          size_t _pending_flushes)
    : journal_metadata(std::move(_journal_metadata)),
      on_finish(_on_finish),
      pending_flushes(_pending_flushes) {
  }

  void complete(int r) override {
    if (r < 0 && ret_val == 0) {
      ret_val = r;
    }
    if (--pending_flushes == 0) {
      // ensure all prior callbacks have been flushed as well
      journal_metadata->queue(on_finish, ret_val);
      delete this;
    }
  }
  void finish(int r) override {
  }
};

} // anonymous namespace

JournalRecorder::JournalRecorder(librados::IoCtx &ioctx,
                                 std::string_view object_oid_prefix,
                                 ceph::ref_t<JournalMetadata> journal_metadata,
                                 uint64_t max_in_flight_appends)
  : m_object_oid_prefix(object_oid_prefix),
    m_journal_metadata(std::move(journal_metadata)),
    m_max_in_flight_appends(max_in_flight_appends),
    m_listener(this),
    m_object_handler(this),
    m_current_set(m_journal_metadata->get_active_set()),
    m_object_locks{ceph::make_lock_container<ceph::mutex>(
      m_journal_metadata->get_splay_width(), [](const size_t splay_offset) {
        return ceph::make_mutex("ObjectRecorder::m_lock::" +
                                std::to_string(splay_offset));
      })}
{
  std::lock_guard locker{m_lock};
  m_ioctx.dup(ioctx);
  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());

  uint8_t splay_width = m_journal_metadata->get_splay_width();
  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
    uint64_t object_number = splay_offset + (m_current_set * splay_width);
    std::lock_guard locker{m_object_locks[splay_offset]};
    m_object_ptrs[splay_offset] = create_object_recorder(
      object_number, &m_object_locks[splay_offset]);
  }

  m_journal_metadata->add_listener(&m_listener);
}

JournalRecorder::~JournalRecorder() {
  m_journal_metadata->remove_listener(&m_listener);

  std::lock_guard locker{m_lock};
  ceph_assert(m_in_flight_advance_sets == 0);
  ceph_assert(m_in_flight_object_closes == 0);
}

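// flush all pending appends; if an object set advance is still in flight,
// completion of on_safe is deferred until the advance finishes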
void JournalRecorder::shut_down(Context *on_safe) {
  on_safe = new LambdaContext(
    [this, on_safe](int r) {
      Context *ctx = nullptr;
      {
        std::lock_guard locker{m_lock};
        if (m_in_flight_advance_sets != 0) {
          ceph_assert(m_on_object_set_advanced == nullptr);
          m_on_object_set_advanced = new LambdaContext(
            [on_safe, r](int) {
              on_safe->complete(r);
            });
        } else {
          ctx = on_safe;
        }
      }
      if (ctx != nullptr) {
        ctx->complete(r);
      }
    });
  flush(on_safe);
}

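// record the batched-append flush thresholds and propagate them to every
// object recorder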
void JournalRecorder::set_append_batch_options(int flush_interval,
                                               uint64_t flush_bytes,
                                               double flush_age) {
  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
                  << "flush_bytes=" << flush_bytes << ", "
                  << "flush_age=" << flush_age << dendl;

  std::lock_guard locker{m_lock};
  m_flush_interval = flush_interval;
  m_flush_bytes = flush_bytes;
  m_flush_age = flush_age;

  uint8_t splay_width = m_journal_metadata->get_splay_width();
  for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) {
    std::lock_guard object_locker{m_object_locks[splay_offset]};
    auto object_recorder = get_object(splay_offset);
    object_recorder->set_append_batch_options(flush_interval, flush_bytes,
                                              flush_age);
  }
}

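// Append a payload for the given tag.  The allocated entry tid selects the
// splay offset (and therefore the backing object); the returned Future is
// chained after the previous append.  Filling the object triggers a
// close-and-advance of the active object set.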
Future JournalRecorder::append(uint64_t tag_tid,
                               const bufferlist &payload_bl) {
  ldout(m_cct, 20) << "tag_tid=" << tag_tid << dendl;

  m_lock.lock();

  uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = entry_tid % splay_width;

  auto object_ptr = get_object(splay_offset);
  uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
    object_ptr->get_object_number(), tag_tid, entry_tid);
  auto future = ceph::make_ref<FutureImpl>(tag_tid, entry_tid, commit_tid);
  future->init(m_prev_future);
  m_prev_future = future;

  m_object_locks[splay_offset].lock();
  m_lock.unlock();

  bufferlist entry_bl;
  encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
         entry_bl);
  ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size());

  bool object_full = object_ptr->append({{future, entry_bl}});
  m_object_locks[splay_offset].unlock();

  if (object_full) {
    ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full"
                     << dendl;
    std::lock_guard l{m_lock};
    close_and_advance_object_set(object_ptr->get_object_number() / splay_width);
  }
  return Future(future);
}

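// flush all object recorders; C_Flush completes on_safe only after every
// per-object flush (plus the extra guard reference) has finished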
void JournalRecorder::flush(Context *on_safe) {
  ldout(m_cct, 20) << dendl;

  C_Flush *ctx;
  {
    std::lock_guard locker{m_lock};

    ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1);
    for (const auto& p : m_object_ptrs) {
      p.second->flush(ctx);
    }
  }

  // avoid holding the lock in case there is nothing to flush
  ctx->complete(0);
}

ceph::ref_t<ObjectRecorder> JournalRecorder::get_object(uint8_t splay_offset) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  const auto& object_recorder = m_object_ptrs.at(splay_offset);
  ceph_assert(object_recorder);
  return object_recorder;
}

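// close every object recorder in the current set and, once all closes have
// completed, advance the active object set; no-op if a close/advance is
// already in progress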
void JournalRecorder::close_and_advance_object_set(uint64_t object_set) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // entry overflow from open object
  if (m_current_set != object_set) {
    ldout(m_cct, 20) << "close already in-progress" << dendl;
    return;
  }

  // we shouldn't overflow upon append if already closed and we
  // shouldn't receive an overflowed callback if already closed
  ceph_assert(m_in_flight_advance_sets == 0);
  ceph_assert(m_in_flight_object_closes == 0);

  uint64_t active_set = m_journal_metadata->get_active_set();
  ceph_assert(m_current_set == active_set);
  ++m_current_set;
  ++m_in_flight_advance_sets;

  ldout(m_cct, 10) << "closing active object set " << object_set << dendl;
  if (close_object_set(m_current_set)) {
    advance_object_set();
  }
}

void JournalRecorder::advance_object_set() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  ceph_assert(m_in_flight_object_closes == 0);
  ldout(m_cct, 10) << "advance to object set " << m_current_set << dendl;
  m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet(
    this));
}

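// completion callback for set_active_set(); -ESTALE is not treated as an
// error since the active set may already have been advanced elsewhere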
void JournalRecorder::handle_advance_object_set(int r) {
  Context *on_object_set_advanced = nullptr;
  {
    std::lock_guard locker{m_lock};
    ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;

    ceph_assert(m_in_flight_advance_sets > 0);
    --m_in_flight_advance_sets;

    if (r < 0 && r != -ESTALE) {
      lderr(m_cct) << "failed to advance object set: " << cpp_strerror(r)
                   << dendl;
    }

    if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
      open_object_set();
      std::swap(on_object_set_advanced, m_on_object_set_advanced);
    }
  }
  if (on_object_set_advanced != nullptr) {
    on_object_set_advanced->complete(0);
  }
}

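// replace recorders that belong to an older object set with recorders for
// the current set, re-queuing any appends they still held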
void JournalRecorder::open_object_set() {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  ldout(m_cct, 10) << "opening object set " << m_current_set << dendl;

  uint8_t splay_width = m_journal_metadata->get_splay_width();
  bool overflowed = false;

  auto lockers{lock_object_recorders()};
  for (const auto& p : m_object_ptrs) {
    const auto& object_recorder = p.second;
    uint64_t object_number = object_recorder->get_object_number();
    if (object_number / splay_width != m_current_set) {
      ceph_assert(object_recorder->is_closed());

      // ready to close object and open object in active set
      if (create_next_object_recorder(object_recorder)) {
        overflowed = true;
      }
    }
  }
  lockers.clear();

  if (overflowed) {
    ldout(m_cct, 10) << "object set " << m_current_set << " now full" << dendl;
    ldout(m_cct, 10) << "" << dendl;
    close_and_advance_object_set(m_current_set);
  }
}

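// request a close on every object recorder outside the given active set;
// returns true when no closes remain in flight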
bool JournalRecorder::close_object_set(uint64_t active_set) {
  ldout(m_cct, 10) << "active_set=" << active_set << dendl;
  ceph_assert(ceph_mutex_is_locked(m_lock));

  // object recorders will invoke overflow handler as they complete
  // closing the object to ensure correct order of future appends
  uint8_t splay_width = m_journal_metadata->get_splay_width();
  auto lockers{lock_object_recorders()};
  for (const auto& p : m_object_ptrs) {
    const auto& object_recorder = p.second;
    if (object_recorder->get_object_number() / splay_width != active_set) {
      ldout(m_cct, 10) << "closing object " << object_recorder->get_oid()
                       << dendl;
      // flush out all queued appends and hold future appends
      if (!object_recorder->close()) {
        ++m_in_flight_object_closes;
        ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " "
                         << "close in-progress" << dendl;
      } else {
        ldout(m_cct, 10) << "object " << object_recorder->get_oid() << " closed"
                         << dendl;
      }
    }
  }
  return (m_in_flight_object_closes == 0);
}

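// instantiate an ObjectRecorder for the given object number, bound to the
// supplied per-splay-offset lock and the current batch options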
ceph::ref_t<ObjectRecorder> JournalRecorder::create_object_recorder(
    uint64_t object_number, ceph::mutex* lock) {
  ldout(m_cct, 10) << "object_number=" << object_number << dendl;
  auto object_recorder = ceph::make_ref<ObjectRecorder>(
    m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number),
    object_number, lock, m_journal_metadata->get_work_queue(),
    &m_object_handler, m_journal_metadata->get_order(),
    m_max_in_flight_appends);
  object_recorder->set_append_batch_options(m_flush_interval, m_flush_bytes,
                                            m_flush_age);
  return object_recorder;
}

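// create the replacement recorder for this splay offset, claim the closed
// object's buffered appends, retarget their commit records, and re-append
// them to the new object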
bool JournalRecorder::create_next_object_recorder(
    ceph::ref_t<ObjectRecorder> object_recorder) {
  ceph_assert(ceph_mutex_is_locked(m_lock));

  uint64_t object_number = object_recorder->get_object_number();
  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = object_number % splay_width;
  ldout(m_cct, 10) << "object_number=" << object_number << dendl;

  ceph_assert(ceph_mutex_is_locked(m_object_locks[splay_offset]));

  auto new_object_recorder = create_object_recorder(
    (m_current_set * splay_width) + splay_offset, &m_object_locks[splay_offset]);

  ldout(m_cct, 10) << "old oid=" << object_recorder->get_oid() << ", "
                   << "new oid=" << new_object_recorder->get_oid() << dendl;
  AppendBuffers append_buffers;
  object_recorder->claim_append_buffers(&append_buffers);

  // update the commit record to point to the correct object number
  for (auto &append_buffer : append_buffers) {
    m_journal_metadata->overflow_commit_tid(
      append_buffer.first->get_commit_tid(),
      new_object_recorder->get_object_number());
  }

  bool object_full = new_object_recorder->append(std::move(append_buffers));
  if (object_full) {
    ldout(m_cct, 10) << "object " << new_object_recorder->get_oid() << " "
                     << "now full" << dendl;
  }

  m_object_ptrs[splay_offset] = std::move(new_object_recorder);
  return object_full;
}

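// JournalMetadata listener callback: if a peer journal client advanced the
// active set, close the local recorders and open the new set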
void JournalRecorder::handle_update() {
  std::lock_guard locker{m_lock};

  uint64_t active_set = m_journal_metadata->get_active_set();
  if (m_current_set < active_set) {
    // peer journal client advanced the active set
    ldout(m_cct, 10) << "current_set=" << m_current_set << ", "
                     << "active_set=" << active_set << dendl;

    uint64_t current_set = m_current_set;
    m_current_set = active_set;
    if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) {
      ldout(m_cct, 10) << "closing current object set " << current_set << dendl;
      if (close_object_set(active_set)) {
        open_object_set();
      }
    }
  }
}

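// callback from an ObjectRecorder once its close has completed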
void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) {
  ldout(m_cct, 10) << object_recorder->get_oid() << dendl;

  std::lock_guard locker{m_lock};

  uint64_t object_number = object_recorder->get_object_number();
  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = object_number % splay_width;
  auto& active_object_recorder = m_object_ptrs.at(splay_offset);
  ceph_assert(active_object_recorder->get_object_number() == object_number);

  ceph_assert(m_in_flight_object_closes > 0);
  --m_in_flight_object_closes;

  // object closed after advance active set committed
  ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
                   << " closed" << dendl;
  if (m_in_flight_object_closes == 0) {
    if (m_in_flight_advance_sets == 0) {
      // peer forced closing of object set
      open_object_set();
    } else {
      // local overflow advanced object set
      advance_object_set();
    }
  }
}

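// callback from an ObjectRecorder whose object overflowed; closes and
// advances the containing object set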
void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) {
  ldout(m_cct, 10) << object_recorder->get_oid() << dendl;

  std::lock_guard locker{m_lock};

  uint64_t object_number = object_recorder->get_object_number();
  uint8_t splay_width = m_journal_metadata->get_splay_width();
  uint8_t splay_offset = object_number % splay_width;
  auto& active_object_recorder = m_object_ptrs.at(splay_offset);
  ceph_assert(active_object_recorder->get_object_number() == object_number);

  ldout(m_cct, 10) << "object " << active_object_recorder->get_oid()
                   << " overflowed" << dendl;
  close_and_advance_object_set(object_number / splay_width);
}

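// acquire all per-splay-offset object locks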
JournalRecorder::Lockers JournalRecorder::lock_object_recorders() {
  Lockers lockers;
  lockers.reserve(m_object_ptrs.size());
  for (auto& lock : m_object_locks) {
    lockers.emplace_back(lock);
  }
  return lockers;
}

} // namespace journal