]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalRecorder.h" | |
5 | #include "common/errno.h" | |
6 | #include "journal/Entry.h" | |
7 | #include "journal/Utils.h" | |
8 | ||
31f18b77 FG |
9 | #include <atomic> |
10 | ||
7c673cae FG |
11 | #define dout_subsys ceph_subsys_journaler |
12 | #undef dout_prefix | |
13 | #define dout_prefix *_dout << "JournalRecorder: " << this << " " | |
14 | ||
15 | using std::shared_ptr; | |
16 | ||
17 | namespace journal { | |
18 | ||
19 | namespace { | |
20 | ||
21 | struct C_Flush : public Context { | |
22 | JournalMetadataPtr journal_metadata; | |
23 | Context *on_finish; | |
31f18b77 | 24 | std::atomic<int64_t> pending_flushes = { 0 }; |
7c673cae FG |
25 | int ret_val; |
26 | ||
27 | C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish, | |
28 | size_t _pending_flushes) | |
29 | : journal_metadata(_journal_metadata), on_finish(_on_finish), | |
30 | pending_flushes(_pending_flushes), ret_val(0) { | |
31 | } | |
32 | ||
33 | void complete(int r) override { | |
34 | if (r < 0 && ret_val == 0) { | |
35 | ret_val = r; | |
36 | } | |
31f18b77 | 37 | if (--pending_flushes == 0) { |
7c673cae FG |
38 | // ensure all prior callback have been flushed as well |
39 | journal_metadata->queue(on_finish, ret_val); | |
40 | delete this; | |
41 | } | |
42 | } | |
43 | void finish(int r) override { | |
44 | } | |
45 | }; | |
46 | ||
47 | } // anonymous namespace | |
48 | ||
49 | JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, | |
50 | const std::string &object_oid_prefix, | |
51 | const JournalMetadataPtr& journal_metadata, | |
52 | uint32_t flush_interval, uint64_t flush_bytes, | |
11fdf7f2 TL |
53 | double flush_age, |
54 | uint64_t max_in_flight_appends) | |
7c673cae FG |
55 | : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), |
56 | m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), | |
11fdf7f2 TL |
57 | m_flush_bytes(flush_bytes), m_flush_age(flush_age), |
58 | m_max_in_flight_appends(max_in_flight_appends), m_listener(this), | |
7c673cae FG |
59 | m_object_handler(this), m_lock("JournalerRecorder::m_lock"), |
60 | m_current_set(m_journal_metadata->get_active_set()) { | |
61 | ||
62 | Mutex::Locker locker(m_lock); | |
63 | m_ioctx.dup(ioctx); | |
64 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
65 | ||
66 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
67 | for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { | |
68 | m_object_locks.push_back(shared_ptr<Mutex>( | |
69 | new Mutex("ObjectRecorder::m_lock::"+ | |
70 | std::to_string(splay_offset)))); | |
71 | uint64_t object_number = splay_offset + (m_current_set * splay_width); | |
72 | m_object_ptrs[splay_offset] = create_object_recorder( | |
73 | object_number, | |
74 | m_object_locks[splay_offset]); | |
75 | } | |
76 | ||
77 | m_journal_metadata->add_listener(&m_listener); | |
78 | } | |
79 | ||
80 | JournalRecorder::~JournalRecorder() { | |
81 | m_journal_metadata->remove_listener(&m_listener); | |
82 | ||
83 | Mutex::Locker locker(m_lock); | |
11fdf7f2 TL |
84 | ceph_assert(m_in_flight_advance_sets == 0); |
85 | ceph_assert(m_in_flight_object_closes == 0); | |
7c673cae FG |
86 | } |
87 | ||
88 | Future JournalRecorder::append(uint64_t tag_tid, | |
89 | const bufferlist &payload_bl) { | |
90 | ||
91 | m_lock.Lock(); | |
92 | ||
93 | uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); | |
94 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
95 | uint8_t splay_offset = entry_tid % splay_width; | |
96 | ||
97 | ObjectRecorderPtr object_ptr = get_object(splay_offset); | |
98 | uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( | |
99 | object_ptr->get_object_number(), tag_tid, entry_tid); | |
100 | FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid)); | |
101 | future->init(m_prev_future); | |
102 | m_prev_future = future; | |
103 | ||
104 | m_object_locks[splay_offset]->Lock(); | |
105 | m_lock.Unlock(); | |
106 | ||
107 | bufferlist entry_bl; | |
11fdf7f2 TL |
108 | encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), |
109 | entry_bl); | |
110 | ceph_assert(entry_bl.length() <= m_journal_metadata->get_object_size()); | |
7c673cae FG |
111 | |
112 | bool object_full = object_ptr->append_unlock({{future, entry_bl}}); | |
113 | if (object_full) { | |
114 | ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" | |
115 | << dendl; | |
116 | Mutex::Locker l(m_lock); | |
117 | close_and_advance_object_set(object_ptr->get_object_number() / splay_width); | |
118 | } | |
119 | return Future(future); | |
120 | } | |
121 | ||
122 | void JournalRecorder::flush(Context *on_safe) { | |
123 | C_Flush *ctx; | |
124 | { | |
125 | Mutex::Locker locker(m_lock); | |
126 | ||
127 | ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); | |
128 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
129 | it != m_object_ptrs.end(); ++it) { | |
130 | it->second->flush(ctx); | |
131 | } | |
132 | ||
133 | } | |
134 | ||
135 | // avoid holding the lock in case there is nothing to flush | |
136 | ctx->complete(0); | |
137 | } | |
138 | ||
139 | ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { | |
11fdf7f2 | 140 | ceph_assert(m_lock.is_locked()); |
7c673cae FG |
141 | |
142 | ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; | |
11fdf7f2 | 143 | ceph_assert(object_recoder != NULL); |
7c673cae FG |
144 | return object_recoder; |
145 | } | |
146 | ||
147 | void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { | |
11fdf7f2 | 148 | ceph_assert(m_lock.is_locked()); |
7c673cae FG |
149 | |
150 | // entry overflow from open object | |
151 | if (m_current_set != object_set) { | |
152 | ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl; | |
153 | return; | |
154 | } | |
155 | ||
156 | // we shouldn't overflow upon append if already closed and we | |
157 | // shouldn't receive an overflowed callback if already closed | |
11fdf7f2 TL |
158 | ceph_assert(m_in_flight_advance_sets == 0); |
159 | ceph_assert(m_in_flight_object_closes == 0); | |
7c673cae FG |
160 | |
161 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
11fdf7f2 | 162 | ceph_assert(m_current_set == active_set); |
7c673cae FG |
163 | ++m_current_set; |
164 | ++m_in_flight_advance_sets; | |
165 | ||
166 | ldout(m_cct, 20) << __func__ << ": closing active object set " | |
167 | << object_set << dendl; | |
168 | if (close_object_set(m_current_set)) { | |
169 | advance_object_set(); | |
170 | } | |
171 | } | |
172 | ||
173 | void JournalRecorder::advance_object_set() { | |
11fdf7f2 | 174 | ceph_assert(m_lock.is_locked()); |
7c673cae | 175 | |
11fdf7f2 | 176 | ceph_assert(m_in_flight_object_closes == 0); |
7c673cae FG |
177 | ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set |
178 | << dendl; | |
179 | m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( | |
180 | this)); | |
181 | } | |
182 | ||
183 | void JournalRecorder::handle_advance_object_set(int r) { | |
184 | Mutex::Locker locker(m_lock); | |
185 | ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; | |
186 | ||
11fdf7f2 | 187 | ceph_assert(m_in_flight_advance_sets > 0); |
7c673cae FG |
188 | --m_in_flight_advance_sets; |
189 | ||
190 | if (r < 0 && r != -ESTALE) { | |
191 | lderr(m_cct) << __func__ << ": failed to advance object set: " | |
192 | << cpp_strerror(r) << dendl; | |
193 | } | |
194 | ||
195 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
196 | open_object_set(); | |
197 | } | |
198 | } | |
199 | ||
200 | void JournalRecorder::open_object_set() { | |
11fdf7f2 | 201 | ceph_assert(m_lock.is_locked()); |
7c673cae FG |
202 | |
203 | ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set | |
204 | << dendl; | |
205 | ||
206 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
207 | ||
208 | lock_object_recorders(); | |
209 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
210 | it != m_object_ptrs.end(); ++it) { | |
211 | ObjectRecorderPtr object_recorder = it->second; | |
212 | uint64_t object_number = object_recorder->get_object_number(); | |
213 | if (object_number / splay_width != m_current_set) { | |
11fdf7f2 | 214 | ceph_assert(object_recorder->is_closed()); |
7c673cae FG |
215 | |
216 | // ready to close object and open object in active set | |
217 | create_next_object_recorder_unlock(object_recorder); | |
218 | } else { | |
219 | uint8_t splay_offset = object_number % splay_width; | |
220 | m_object_locks[splay_offset]->Unlock(); | |
221 | } | |
222 | } | |
223 | } | |
224 | ||
225 | bool JournalRecorder::close_object_set(uint64_t active_set) { | |
11fdf7f2 | 226 | ceph_assert(m_lock.is_locked()); |
7c673cae FG |
227 | |
228 | // object recorders will invoke overflow handler as they complete | |
229 | // closing the object to ensure correct order of future appends | |
230 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
231 | lock_object_recorders(); | |
232 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
233 | it != m_object_ptrs.end(); ++it) { | |
234 | ObjectRecorderPtr object_recorder = it->second; | |
235 | if (object_recorder->get_object_number() / splay_width != active_set) { | |
236 | ldout(m_cct, 10) << __func__ << ": closing object " | |
237 | << object_recorder->get_oid() << dendl; | |
238 | // flush out all queued appends and hold future appends | |
239 | if (!object_recorder->close()) { | |
240 | ++m_in_flight_object_closes; | |
241 | } else { | |
242 | ldout(m_cct, 20) << __func__ << ": object " | |
243 | << object_recorder->get_oid() << " closed" << dendl; | |
244 | } | |
245 | } | |
246 | } | |
247 | unlock_object_recorders(); | |
248 | return (m_in_flight_object_closes == 0); | |
249 | } | |
250 | ||
251 | ObjectRecorderPtr JournalRecorder::create_object_recorder( | |
252 | uint64_t object_number, shared_ptr<Mutex> lock) { | |
253 | ObjectRecorderPtr object_recorder(new ObjectRecorder( | |
254 | m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), | |
255 | object_number, lock, m_journal_metadata->get_work_queue(), | |
256 | m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), | |
257 | &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, | |
11fdf7f2 | 258 | m_flush_bytes, m_flush_age, m_max_in_flight_appends)); |
7c673cae FG |
259 | return object_recorder; |
260 | } | |
261 | ||
262 | void JournalRecorder::create_next_object_recorder_unlock( | |
263 | ObjectRecorderPtr object_recorder) { | |
11fdf7f2 | 264 | ceph_assert(m_lock.is_locked()); |
7c673cae FG |
265 | |
266 | uint64_t object_number = object_recorder->get_object_number(); | |
267 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
268 | uint8_t splay_offset = object_number % splay_width; | |
269 | ||
11fdf7f2 | 270 | ceph_assert(m_object_locks[splay_offset]->is_locked()); |
7c673cae FG |
271 | |
272 | ObjectRecorderPtr new_object_recorder = create_object_recorder( | |
273 | (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); | |
274 | ||
275 | ldout(m_cct, 10) << __func__ << ": " | |
276 | << "old oid=" << object_recorder->get_oid() << ", " | |
277 | << "new oid=" << new_object_recorder->get_oid() << dendl; | |
278 | AppendBuffers append_buffers; | |
279 | object_recorder->claim_append_buffers(&append_buffers); | |
280 | ||
281 | // update the commit record to point to the correct object number | |
282 | for (auto &append_buffer : append_buffers) { | |
283 | m_journal_metadata->overflow_commit_tid( | |
284 | append_buffer.first->get_commit_tid(), | |
285 | new_object_recorder->get_object_number()); | |
286 | } | |
287 | ||
288 | new_object_recorder->append_unlock(std::move(append_buffers)); | |
289 | m_object_ptrs[splay_offset] = new_object_recorder; | |
290 | } | |
291 | ||
292 | void JournalRecorder::handle_update() { | |
293 | Mutex::Locker locker(m_lock); | |
294 | ||
295 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
296 | if (m_current_set < active_set) { | |
297 | // peer journal client advanced the active set | |
298 | ldout(m_cct, 20) << __func__ << ": " | |
299 | << "current_set=" << m_current_set << ", " | |
300 | << "active_set=" << active_set << dendl; | |
301 | ||
302 | uint64_t current_set = m_current_set; | |
303 | m_current_set = active_set; | |
304 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
305 | ldout(m_cct, 20) << __func__ << ": closing current object set " | |
306 | << current_set << dendl; | |
307 | if (close_object_set(active_set)) { | |
308 | open_object_set(); | |
309 | } | |
310 | } | |
311 | } | |
312 | } | |
313 | ||
314 | void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { | |
315 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
316 | ||
317 | Mutex::Locker locker(m_lock); | |
318 | ||
319 | uint64_t object_number = object_recorder->get_object_number(); | |
320 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
321 | uint8_t splay_offset = object_number % splay_width; | |
322 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
11fdf7f2 | 323 | ceph_assert(active_object_recorder->get_object_number() == object_number); |
7c673cae | 324 | |
11fdf7f2 | 325 | ceph_assert(m_in_flight_object_closes > 0); |
7c673cae FG |
326 | --m_in_flight_object_closes; |
327 | ||
328 | // object closed after advance active set committed | |
329 | ldout(m_cct, 20) << __func__ << ": object " | |
330 | << active_object_recorder->get_oid() << " closed" << dendl; | |
331 | if (m_in_flight_object_closes == 0) { | |
332 | if (m_in_flight_advance_sets == 0) { | |
333 | // peer forced closing of object set | |
334 | open_object_set(); | |
335 | } else { | |
336 | // local overflow advanced object set | |
337 | advance_object_set(); | |
338 | } | |
339 | } | |
340 | } | |
341 | ||
342 | void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { | |
343 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
344 | ||
345 | Mutex::Locker locker(m_lock); | |
346 | ||
347 | uint64_t object_number = object_recorder->get_object_number(); | |
348 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
349 | uint8_t splay_offset = object_number % splay_width; | |
350 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
11fdf7f2 | 351 | ceph_assert(active_object_recorder->get_object_number() == object_number); |
7c673cae FG |
352 | |
353 | ldout(m_cct, 20) << __func__ << ": object " | |
354 | << active_object_recorder->get_oid() << " overflowed" | |
355 | << dendl; | |
356 | close_and_advance_object_set(object_number / splay_width); | |
357 | } | |
358 | ||
359 | } // namespace journal |