]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
#include "journal/JournalRecorder.h"

#include <atomic>
#include <memory>

#include "common/errno.h"
#include "journal/Entry.h"
#include "journal/Utils.h"
8 | ||
9 | #define dout_subsys ceph_subsys_journaler | |
10 | #undef dout_prefix | |
11 | #define dout_prefix *_dout << "JournalRecorder: " << this << " " | |
12 | ||
13 | using std::shared_ptr; | |
14 | ||
15 | namespace journal { | |
16 | ||
17 | namespace { | |
18 | ||
19 | struct C_Flush : public Context { | |
20 | JournalMetadataPtr journal_metadata; | |
21 | Context *on_finish; | |
22 | atomic_t pending_flushes; | |
23 | int ret_val; | |
24 | ||
25 | C_Flush(JournalMetadataPtr _journal_metadata, Context *_on_finish, | |
26 | size_t _pending_flushes) | |
27 | : journal_metadata(_journal_metadata), on_finish(_on_finish), | |
28 | pending_flushes(_pending_flushes), ret_val(0) { | |
29 | } | |
30 | ||
31 | void complete(int r) override { | |
32 | if (r < 0 && ret_val == 0) { | |
33 | ret_val = r; | |
34 | } | |
35 | if (pending_flushes.dec() == 0) { | |
36 | // ensure all prior callback have been flushed as well | |
37 | journal_metadata->queue(on_finish, ret_val); | |
38 | delete this; | |
39 | } | |
40 | } | |
41 | void finish(int r) override { | |
42 | } | |
43 | }; | |
44 | ||
45 | } // anonymous namespace | |
46 | ||
47 | JournalRecorder::JournalRecorder(librados::IoCtx &ioctx, | |
48 | const std::string &object_oid_prefix, | |
49 | const JournalMetadataPtr& journal_metadata, | |
50 | uint32_t flush_interval, uint64_t flush_bytes, | |
51 | double flush_age) | |
52 | : m_cct(NULL), m_object_oid_prefix(object_oid_prefix), | |
53 | m_journal_metadata(journal_metadata), m_flush_interval(flush_interval), | |
54 | m_flush_bytes(flush_bytes), m_flush_age(flush_age), m_listener(this), | |
55 | m_object_handler(this), m_lock("JournalerRecorder::m_lock"), | |
56 | m_current_set(m_journal_metadata->get_active_set()) { | |
57 | ||
58 | Mutex::Locker locker(m_lock); | |
59 | m_ioctx.dup(ioctx); | |
60 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
61 | ||
62 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
63 | for (uint8_t splay_offset = 0; splay_offset < splay_width; ++splay_offset) { | |
64 | m_object_locks.push_back(shared_ptr<Mutex>( | |
65 | new Mutex("ObjectRecorder::m_lock::"+ | |
66 | std::to_string(splay_offset)))); | |
67 | uint64_t object_number = splay_offset + (m_current_set * splay_width); | |
68 | m_object_ptrs[splay_offset] = create_object_recorder( | |
69 | object_number, | |
70 | m_object_locks[splay_offset]); | |
71 | } | |
72 | ||
73 | m_journal_metadata->add_listener(&m_listener); | |
74 | } | |
75 | ||
76 | JournalRecorder::~JournalRecorder() { | |
77 | m_journal_metadata->remove_listener(&m_listener); | |
78 | ||
79 | Mutex::Locker locker(m_lock); | |
80 | assert(m_in_flight_advance_sets == 0); | |
81 | assert(m_in_flight_object_closes == 0); | |
82 | } | |
83 | ||
84 | Future JournalRecorder::append(uint64_t tag_tid, | |
85 | const bufferlist &payload_bl) { | |
86 | ||
87 | m_lock.Lock(); | |
88 | ||
89 | uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid); | |
90 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
91 | uint8_t splay_offset = entry_tid % splay_width; | |
92 | ||
93 | ObjectRecorderPtr object_ptr = get_object(splay_offset); | |
94 | uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( | |
95 | object_ptr->get_object_number(), tag_tid, entry_tid); | |
96 | FutureImplPtr future(new FutureImpl(tag_tid, entry_tid, commit_tid)); | |
97 | future->init(m_prev_future); | |
98 | m_prev_future = future; | |
99 | ||
100 | m_object_locks[splay_offset]->Lock(); | |
101 | m_lock.Unlock(); | |
102 | ||
103 | bufferlist entry_bl; | |
104 | ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl), | |
105 | entry_bl); | |
106 | assert(entry_bl.length() <= m_journal_metadata->get_object_size()); | |
107 | ||
108 | bool object_full = object_ptr->append_unlock({{future, entry_bl}}); | |
109 | if (object_full) { | |
110 | ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" | |
111 | << dendl; | |
112 | Mutex::Locker l(m_lock); | |
113 | close_and_advance_object_set(object_ptr->get_object_number() / splay_width); | |
114 | } | |
115 | return Future(future); | |
116 | } | |
117 | ||
118 | void JournalRecorder::flush(Context *on_safe) { | |
119 | C_Flush *ctx; | |
120 | { | |
121 | Mutex::Locker locker(m_lock); | |
122 | ||
123 | ctx = new C_Flush(m_journal_metadata, on_safe, m_object_ptrs.size() + 1); | |
124 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
125 | it != m_object_ptrs.end(); ++it) { | |
126 | it->second->flush(ctx); | |
127 | } | |
128 | ||
129 | } | |
130 | ||
131 | // avoid holding the lock in case there is nothing to flush | |
132 | ctx->complete(0); | |
133 | } | |
134 | ||
135 | ObjectRecorderPtr JournalRecorder::get_object(uint8_t splay_offset) { | |
136 | assert(m_lock.is_locked()); | |
137 | ||
138 | ObjectRecorderPtr object_recoder = m_object_ptrs[splay_offset]; | |
139 | assert(object_recoder != NULL); | |
140 | return object_recoder; | |
141 | } | |
142 | ||
143 | void JournalRecorder::close_and_advance_object_set(uint64_t object_set) { | |
144 | assert(m_lock.is_locked()); | |
145 | ||
146 | // entry overflow from open object | |
147 | if (m_current_set != object_set) { | |
148 | ldout(m_cct, 20) << __func__ << ": close already in-progress" << dendl; | |
149 | return; | |
150 | } | |
151 | ||
152 | // we shouldn't overflow upon append if already closed and we | |
153 | // shouldn't receive an overflowed callback if already closed | |
154 | assert(m_in_flight_advance_sets == 0); | |
155 | assert(m_in_flight_object_closes == 0); | |
156 | ||
157 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
158 | assert(m_current_set == active_set); | |
159 | ++m_current_set; | |
160 | ++m_in_flight_advance_sets; | |
161 | ||
162 | ldout(m_cct, 20) << __func__ << ": closing active object set " | |
163 | << object_set << dendl; | |
164 | if (close_object_set(m_current_set)) { | |
165 | advance_object_set(); | |
166 | } | |
167 | } | |
168 | ||
169 | void JournalRecorder::advance_object_set() { | |
170 | assert(m_lock.is_locked()); | |
171 | ||
172 | assert(m_in_flight_object_closes == 0); | |
173 | ldout(m_cct, 20) << __func__ << ": advance to object set " << m_current_set | |
174 | << dendl; | |
175 | m_journal_metadata->set_active_set(m_current_set, new C_AdvanceObjectSet( | |
176 | this)); | |
177 | } | |
178 | ||
179 | void JournalRecorder::handle_advance_object_set(int r) { | |
180 | Mutex::Locker locker(m_lock); | |
181 | ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; | |
182 | ||
183 | assert(m_in_flight_advance_sets > 0); | |
184 | --m_in_flight_advance_sets; | |
185 | ||
186 | if (r < 0 && r != -ESTALE) { | |
187 | lderr(m_cct) << __func__ << ": failed to advance object set: " | |
188 | << cpp_strerror(r) << dendl; | |
189 | } | |
190 | ||
191 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
192 | open_object_set(); | |
193 | } | |
194 | } | |
195 | ||
196 | void JournalRecorder::open_object_set() { | |
197 | assert(m_lock.is_locked()); | |
198 | ||
199 | ldout(m_cct, 10) << __func__ << ": opening object set " << m_current_set | |
200 | << dendl; | |
201 | ||
202 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
203 | ||
204 | lock_object_recorders(); | |
205 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
206 | it != m_object_ptrs.end(); ++it) { | |
207 | ObjectRecorderPtr object_recorder = it->second; | |
208 | uint64_t object_number = object_recorder->get_object_number(); | |
209 | if (object_number / splay_width != m_current_set) { | |
210 | assert(object_recorder->is_closed()); | |
211 | ||
212 | // ready to close object and open object in active set | |
213 | create_next_object_recorder_unlock(object_recorder); | |
214 | } else { | |
215 | uint8_t splay_offset = object_number % splay_width; | |
216 | m_object_locks[splay_offset]->Unlock(); | |
217 | } | |
218 | } | |
219 | } | |
220 | ||
221 | bool JournalRecorder::close_object_set(uint64_t active_set) { | |
222 | assert(m_lock.is_locked()); | |
223 | ||
224 | // object recorders will invoke overflow handler as they complete | |
225 | // closing the object to ensure correct order of future appends | |
226 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
227 | lock_object_recorders(); | |
228 | for (ObjectRecorderPtrs::iterator it = m_object_ptrs.begin(); | |
229 | it != m_object_ptrs.end(); ++it) { | |
230 | ObjectRecorderPtr object_recorder = it->second; | |
231 | if (object_recorder->get_object_number() / splay_width != active_set) { | |
232 | ldout(m_cct, 10) << __func__ << ": closing object " | |
233 | << object_recorder->get_oid() << dendl; | |
234 | // flush out all queued appends and hold future appends | |
235 | if (!object_recorder->close()) { | |
236 | ++m_in_flight_object_closes; | |
237 | } else { | |
238 | ldout(m_cct, 20) << __func__ << ": object " | |
239 | << object_recorder->get_oid() << " closed" << dendl; | |
240 | } | |
241 | } | |
242 | } | |
243 | unlock_object_recorders(); | |
244 | return (m_in_flight_object_closes == 0); | |
245 | } | |
246 | ||
247 | ObjectRecorderPtr JournalRecorder::create_object_recorder( | |
248 | uint64_t object_number, shared_ptr<Mutex> lock) { | |
249 | ObjectRecorderPtr object_recorder(new ObjectRecorder( | |
250 | m_ioctx, utils::get_object_name(m_object_oid_prefix, object_number), | |
251 | object_number, lock, m_journal_metadata->get_work_queue(), | |
252 | m_journal_metadata->get_timer(), m_journal_metadata->get_timer_lock(), | |
253 | &m_object_handler, m_journal_metadata->get_order(), m_flush_interval, | |
254 | m_flush_bytes, m_flush_age)); | |
255 | return object_recorder; | |
256 | } | |
257 | ||
258 | void JournalRecorder::create_next_object_recorder_unlock( | |
259 | ObjectRecorderPtr object_recorder) { | |
260 | assert(m_lock.is_locked()); | |
261 | ||
262 | uint64_t object_number = object_recorder->get_object_number(); | |
263 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
264 | uint8_t splay_offset = object_number % splay_width; | |
265 | ||
266 | assert(m_object_locks[splay_offset]->is_locked()); | |
267 | ||
268 | ObjectRecorderPtr new_object_recorder = create_object_recorder( | |
269 | (m_current_set * splay_width) + splay_offset, m_object_locks[splay_offset]); | |
270 | ||
271 | ldout(m_cct, 10) << __func__ << ": " | |
272 | << "old oid=" << object_recorder->get_oid() << ", " | |
273 | << "new oid=" << new_object_recorder->get_oid() << dendl; | |
274 | AppendBuffers append_buffers; | |
275 | object_recorder->claim_append_buffers(&append_buffers); | |
276 | ||
277 | // update the commit record to point to the correct object number | |
278 | for (auto &append_buffer : append_buffers) { | |
279 | m_journal_metadata->overflow_commit_tid( | |
280 | append_buffer.first->get_commit_tid(), | |
281 | new_object_recorder->get_object_number()); | |
282 | } | |
283 | ||
284 | new_object_recorder->append_unlock(std::move(append_buffers)); | |
285 | m_object_ptrs[splay_offset] = new_object_recorder; | |
286 | } | |
287 | ||
288 | void JournalRecorder::handle_update() { | |
289 | Mutex::Locker locker(m_lock); | |
290 | ||
291 | uint64_t active_set = m_journal_metadata->get_active_set(); | |
292 | if (m_current_set < active_set) { | |
293 | // peer journal client advanced the active set | |
294 | ldout(m_cct, 20) << __func__ << ": " | |
295 | << "current_set=" << m_current_set << ", " | |
296 | << "active_set=" << active_set << dendl; | |
297 | ||
298 | uint64_t current_set = m_current_set; | |
299 | m_current_set = active_set; | |
300 | if (m_in_flight_advance_sets == 0 && m_in_flight_object_closes == 0) { | |
301 | ldout(m_cct, 20) << __func__ << ": closing current object set " | |
302 | << current_set << dendl; | |
303 | if (close_object_set(active_set)) { | |
304 | open_object_set(); | |
305 | } | |
306 | } | |
307 | } | |
308 | } | |
309 | ||
310 | void JournalRecorder::handle_closed(ObjectRecorder *object_recorder) { | |
311 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
312 | ||
313 | Mutex::Locker locker(m_lock); | |
314 | ||
315 | uint64_t object_number = object_recorder->get_object_number(); | |
316 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
317 | uint8_t splay_offset = object_number % splay_width; | |
318 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
319 | assert(active_object_recorder->get_object_number() == object_number); | |
320 | ||
321 | assert(m_in_flight_object_closes > 0); | |
322 | --m_in_flight_object_closes; | |
323 | ||
324 | // object closed after advance active set committed | |
325 | ldout(m_cct, 20) << __func__ << ": object " | |
326 | << active_object_recorder->get_oid() << " closed" << dendl; | |
327 | if (m_in_flight_object_closes == 0) { | |
328 | if (m_in_flight_advance_sets == 0) { | |
329 | // peer forced closing of object set | |
330 | open_object_set(); | |
331 | } else { | |
332 | // local overflow advanced object set | |
333 | advance_object_set(); | |
334 | } | |
335 | } | |
336 | } | |
337 | ||
338 | void JournalRecorder::handle_overflow(ObjectRecorder *object_recorder) { | |
339 | ldout(m_cct, 10) << __func__ << ": " << object_recorder->get_oid() << dendl; | |
340 | ||
341 | Mutex::Locker locker(m_lock); | |
342 | ||
343 | uint64_t object_number = object_recorder->get_object_number(); | |
344 | uint8_t splay_width = m_journal_metadata->get_splay_width(); | |
345 | uint8_t splay_offset = object_number % splay_width; | |
346 | ObjectRecorderPtr active_object_recorder = m_object_ptrs[splay_offset]; | |
347 | assert(active_object_recorder->get_object_number() == object_number); | |
348 | ||
349 | ldout(m_cct, 20) << __func__ << ": object " | |
350 | << active_object_recorder->get_oid() << " overflowed" | |
351 | << dendl; | |
352 | close_and_advance_object_set(object_number / splay_width); | |
353 | } | |
354 | ||
355 | } // namespace journal |