]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/JournalRecorder.h" | |
5 | #include "common/errno.h" | |
6 | #include "journal/Entry.h" | |
7 | #include "journal/Utils.h" | |
8 | ||
31f18b77 FG |
9 | #include <atomic> |
10 | ||
7c673cae FG |
11 | #define dout_subsys ceph_subsys_journaler |
12 | #undef dout_prefix | |
13 | #define dout_prefix *_dout << "JournalRecorder: " << this << " " | |
14 | ||
15 | using std::shared_ptr; | |
16 | ||
17 | namespace journal { | |
18 | ||
19 | namespace { | |
20 | ||
21 | struct C_Flush : public Context { | |
22 | JournalMetadataPtr journal_metadata; | |
23 | Context *on_finish; | |
31f18b77 | 24 | std::atomic<int64_t> pending_flushes = { 0 }; |
7c673cae FG |
25 | int ret_val; |
26 | ||
27 | C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish, | |
28 | size_t _pending_flushes) | |
29 | : journal_metadata(_journal_metadata), on_finish(_on_finish), | |
30 | pending_flushes(_pending_flushes), ret_val(0) { | |
31 | } | |
32 | ||
33 | void complete(int r) override { | |
34 | if (r < 0 && ret_val == 0) { | |
35 | ret_val = r; | |
36 | } | |
31f18b77 | 37 | if (--pending_flushes == 0) { |
7c673cae FG |
38 | // ensure all prior callback have been flushed as well |
39 | journal_metadata->queue(on_finish, ret_val); | |
40 | delete this; | |
41 | } | |
42 | } | |
43 | void finish(int r) override { | |
44 | } | |
45 | }; | |
46 | ||
47 | } // anonymous namespace | |
48 | ||
49 | JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, | |
50 | const std::string &object_oid_prefix, | |
51 | const JournalMetadataPtr& journal_metadata, | |
52 | uint32_t flush_interval, uint64_t flush_bytes, | |
53 | double flush_age) | |
54 | : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), | |
55 | m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), | |
56 | m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this), | |
57 | m_object_handler(this), m_lock("JournalerRecorder::m_lock"), | |
58 | m_current_set(m_journal_metadata->get_active_set()) { | |
59 | ||
60 | Mutex::Locker locker(m_lock); | |
61 | m_ioctx.dup(ioctx); | |
62 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
63 | ||
64 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
65 | for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { | |
66 | m_object_locks.push_back(shared_ptr<Mutex>( | |
67 | new Mutex("ObjectRecorder::m_lock::"+ | |
68 | std::to_string(splay_offset)))); | |
69 | uint64_t object_number = splay_offset + (m_current_set * splay_width); | |
70 | m_object_ptrs[splay_offset] = create_object_recorder( | |
71 | object_number, | |
72 | m_object_locks[splay_offset]); | |
73 | } | |
74 | ||
75 | m_journal_metadata->add_listener(&m_listener); | |
76 | } | |
77 | ||
78 | JournalRecorder::~JournalRecorder() { | |
79 | m_journal_metadata->remove_listener(&m_listener); | |
80 | ||
81 | Mutex::Locker locker(m_lock); | |
82 | assert(m_in_flight_advance_sets == 0); | |
83 | assert(m_in_flight_object_closes == 0); | |
84 | } | |
85 | ||
86 | Future JournalRecorder::append(uint64_t tag_tid, | |
87 | const bufferlist &payload_bl) { | |
88 | ||
89 | m_lock.Lock(); | |
90 | ||
91 | uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); | |
92 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
93 | uint8_t splay_offset = entry_tid % splay_width; | |
94 | ||
95 | ObjectRecorderPtr object_ptr = get_object(splay_offset); | |
96 | uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( | |
97 | object_ptr->get_object_number(), tag_tid, entry_tid); | |
98 | FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid)); | |
99 | future->init(m_prev_future); | |
100 | m_prev_future = future; | |
101 | ||
102 | m_object_locks[splay_offset]->Lock(); | |
103 | m_lock.Unlock(); | |
104 | ||
105 | bufferlist entry_bl; | |
106 | ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), | |
107 | entry_bl); | |
108 | assert(entry_bl.length() <= m_journal_metadata->get_object_size()); | |
109 | ||
110 | bool object_full = object_ptr->append_unlock({{future, entry_bl}}); | |
111 | if (object_full) { | |
112 | ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" | |
113 | << dendl; | |
114 | Mutex::Locker l(m_lock); | |
115 | close_and_advance_object_set(object_ptr->get_object_number() / splay_width); | |
116 | } | |
117 | return Future(future); | |
118 | } | |
119 | ||
120 | void JournalRecorder::flush(Context *on_safe) { | |
121 | C_Flush *ctx; | |
122 | { | |
123 | Mutex::Locker locker(m_lock); | |
124 | ||
125 | ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); | |
126 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
127 | it != m_object_ptrs.end(); ++it) { | |
128 | it->second->flush(ctx); | |
129 | } | |
130 | ||
131 | } | |
132 | ||
133 | // avoid holding the lock in case there is nothing to flush | |
134 | ctx->complete(0); | |
135 | } | |
136 | ||
137 | ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { | |
138 | assert(m_lock.is_locked()); | |
139 | ||
140 | ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; | |
141 | assert(object_recoder != NULL); | |
142 | return object_recoder; | |
143 | } | |
144 | ||
145 | void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { | |
146 | assert(m_lock.is_locked()); | |
147 | ||
148 | // entry overflow from open object | |
149 | if (m_current_set != object_set) { | |
150 | ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl; | |
151 | return; | |
152 | } | |
153 | ||
154 | // we shouldn't overflow upon append if already closed and we | |
155 | // shouldn't receive an overflowed callback if already closed | |
156 | assert(m_in_flight_advance_sets == 0); | |
157 | assert(m_in_flight_object_closes == 0); | |
158 | ||
159 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
160 | assert(m_current_set == active_set); | |
161 | ++m_current_set; | |
162 | ++m_in_flight_advance_sets; | |
163 | ||
164 | ldout(m_cct, 20) << __func__ << ": closing active object set " | |
165 | << object_set << dendl; | |
166 | if (close_object_set(m_current_set)) { | |
167 | advance_object_set(); | |
168 | } | |
169 | } | |
170 | ||
171 | void JournalRecorder::advance_object_set() { | |
172 | assert(m_lock.is_locked()); | |
173 | ||
174 | assert(m_in_flight_object_closes == 0); | |
175 | ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set | |
176 | << dendl; | |
177 | m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( | |
178 | this)); | |
179 | } | |
180 | ||
181 | void JournalRecorder::handle_advance_object_set(int r) { | |
182 | Mutex::Locker locker(m_lock); | |
183 | ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; | |
184 | ||
185 | assert(m_in_flight_advance_sets > 0); | |
186 | --m_in_flight_advance_sets; | |
187 | ||
188 | if (r < 0 && r != -ESTALE) { | |
189 | lderr(m_cct) << __func__ << ": failed to advance object set: " | |
190 | << cpp_strerror(r) << dendl; | |
191 | } | |
192 | ||
193 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
194 | open_object_set(); | |
195 | } | |
196 | } | |
197 | ||
198 | void JournalRecorder::open_object_set() { | |
199 | assert(m_lock.is_locked()); | |
200 | ||
201 | ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set | |
202 | << dendl; | |
203 | ||
204 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
205 | ||
206 | lock_object_recorders(); | |
207 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
208 | it != m_object_ptrs.end(); ++it) { | |
209 | ObjectRecorderPtr object_recorder = it->second; | |
210 | uint64_t object_number = object_recorder->get_object_number(); | |
211 | if (object_number / splay_width != m_current_set) { | |
212 | assert(object_recorder->is_closed()); | |
213 | ||
214 | // ready to close object and open object in active set | |
215 | create_next_object_recorder_unlock(object_recorder); | |
216 | } else { | |
217 | uint8_t splay_offset = object_number % splay_width; | |
218 | m_object_locks[splay_offset]->Unlock(); | |
219 | } | |
220 | } | |
221 | } | |
222 | ||
223 | bool JournalRecorder::close_object_set(uint64_t active_set) { | |
224 | assert(m_lock.is_locked()); | |
225 | ||
226 | // object recorders will invoke overflow handler as they complete | |
227 | // closing the object to ensure correct order of future appends | |
228 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
229 | lock_object_recorders(); | |
230 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
231 | it != m_object_ptrs.end(); ++it) { | |
232 | ObjectRecorderPtr object_recorder = it->second; | |
233 | if (object_recorder->get_object_number() / splay_width != active_set) { | |
234 | ldout(m_cct, 10) << __func__ << ": closing object " | |
235 | << object_recorder->get_oid() << dendl; | |
236 | // flush out all queued appends and hold future appends | |
237 | if (!object_recorder->close()) { | |
238 | ++m_in_flight_object_closes; | |
239 | } else { | |
240 | ldout(m_cct, 20) << __func__ << ": object " | |
241 | << object_recorder->get_oid() << " closed" << dendl; | |
242 | } | |
243 | } | |
244 | } | |
245 | unlock_object_recorders(); | |
246 | return (m_in_flight_object_closes == 0); | |
247 | } | |
248 | ||
249 | ObjectRecorderPtr JournalRecorder::create_object_recorder( | |
250 | uint64_t object_number, shared_ptr<Mutex> lock) { | |
251 | ObjectRecorderPtr object_recorder(new ObjectRecorder( | |
252 | m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), | |
253 | object_number, lock, m_journal_metadata->get_work_queue(), | |
254 | m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), | |
255 | &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, | |
256 | m_flush_bytes, m_flush_age)); | |
257 | return object_recorder; | |
258 | } | |
259 | ||
260 | void JournalRecorder::create_next_object_recorder_unlock( | |
261 | ObjectRecorderPtr object_recorder) { | |
262 | assert(m_lock.is_locked()); | |
263 | ||
264 | uint64_t object_number = object_recorder->get_object_number(); | |
265 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
266 | uint8_t splay_offset = object_number % splay_width; | |
267 | ||
268 | assert(m_object_locks[splay_offset]->is_locked()); | |
269 | ||
270 | ObjectRecorderPtr new_object_recorder = create_object_recorder( | |
271 | (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); | |
272 | ||
273 | ldout(m_cct, 10) << __func__ << ": " | |
274 | << "old oid=" << object_recorder->get_oid() << ", " | |
275 | << "new oid=" << new_object_recorder->get_oid() << dendl; | |
276 | AppendBuffers append_buffers; | |
277 | object_recorder->claim_append_buffers(&append_buffers); | |
278 | ||
279 | // update the commit record to point to the correct object number | |
280 | for (auto &append_buffer : append_buffers) { | |
281 | m_journal_metadata->overflow_commit_tid( | |
282 | append_buffer.first->get_commit_tid(), | |
283 | new_object_recorder->get_object_number()); | |
284 | } | |
285 | ||
286 | new_object_recorder->append_unlock(std::move(append_buffers)); | |
287 | m_object_ptrs[splay_offset] = new_object_recorder; | |
288 | } | |
289 | ||
290 | void JournalRecorder::handle_update() { | |
291 | Mutex::Locker locker(m_lock); | |
292 | ||
293 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
294 | if (m_current_set < active_set) { | |
295 | // peer journal client advanced the active set | |
296 | ldout(m_cct, 20) << __func__ << ": " | |
297 | << "current_set=" << m_current_set << ", " | |
298 | << "active_set=" << active_set << dendl; | |
299 | ||
300 | uint64_t current_set = m_current_set; | |
301 | m_current_set = active_set; | |
302 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
303 | ldout(m_cct, 20) << __func__ << ": closing current object set " | |
304 | << current_set << dendl; | |
305 | if (close_object_set(active_set)) { | |
306 | open_object_set(); | |
307 | } | |
308 | } | |
309 | } | |
310 | } | |
311 | ||
312 | void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { | |
313 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
314 | ||
315 | Mutex::Locker locker(m_lock); | |
316 | ||
317 | uint64_t object_number = object_recorder->get_object_number(); | |
318 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
319 | uint8_t splay_offset = object_number % splay_width; | |
320 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
321 | assert(active_object_recorder->get_object_number() == object_number); | |
322 | ||
323 | assert(m_in_flight_object_closes > 0); | |
324 | --m_in_flight_object_closes; | |
325 | ||
326 | // object closed after advance active set committed | |
327 | ldout(m_cct, 20) << __func__ << ": object " | |
328 | << active_object_recorder->get_oid() << " closed" << dendl; | |
329 | if (m_in_flight_object_closes == 0) { | |
330 | if (m_in_flight_advance_sets == 0) { | |
331 | // peer forced closing of object set | |
332 | open_object_set(); | |
333 | } else { | |
334 | // local overflow advanced object set | |
335 | advance_object_set(); | |
336 | } | |
337 | } | |
338 | } | |
339 | ||
340 | void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { | |
341 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
342 | ||
343 | Mutex::Locker locker(m_lock); | |
344 | ||
345 | uint64_t object_number = object_recorder->get_object_number(); | |
346 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
347 | uint8_t splay_offset = object_number % splay_width; | |
348 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
349 | assert(active_object_recorder->get_object_number() == object_number); | |
350 | ||
351 | ldout(m_cct, 20) << __func__ << ": object " | |
352 | << active_object_recorder->get_oid() << " overflowed" | |
353 | << dendl; | |
354 | close_and_advance_object_set(object_number / splay_width); | |
355 | } | |
356 | ||
357 | } // namespace journal |