]>
Commit | Line | Data |
---|---|---|
7c673cae FG |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
4 | #include "journal/ObjectRecorder.h" | |
5 | #include "journal/Future.h" | |
6 | #include "journal/Utils.h" | |
11fdf7f2 | 7 | #include "include/ceph_assert.h" |
7c673cae FG |
8 | #include "common/Timer.h" |
9 | #include "cls/journal/cls_journal_client.h" | |
10 | ||
11 | #define dout_subsys ceph_subsys_journaler | |
12 | #undef dout_prefix | |
494da23a TL |
13 | #define dout_prefix *_dout << "ObjectRecorder: " << this << " " \ |
14 | << __func__ << " (" << m_oid << "): " | |
7c673cae FG |
15 | |
16 | using namespace cls::journal; | |
17 | using std::shared_ptr; | |
18 | ||
19 | namespace journal { | |
20 | ||
21 | ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid, | |
22 | uint64_t object_number, shared_ptr<Mutex> lock, | |
494da23a TL |
23 | ContextWQ *work_queue, Handler *handler, |
24 | uint8_t order, int32_t max_in_flight_appends) | |
7c673cae | 25 | : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number), |
494da23a TL |
26 | m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler), |
27 | m_order(order), m_soft_max_size(1 << m_order), | |
11fdf7f2 | 28 | m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this), |
494da23a TL |
29 | m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0), |
30 | m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) { | |
7c673cae FG |
31 | m_ioctx.dup(ioctx); |
32 | m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct()); | |
11fdf7f2 | 33 | ceph_assert(m_handler != NULL); |
494da23a | 34 | ldout(m_cct, 20) << dendl; |
7c673cae FG |
35 | } |
36 | ||
/**
 * Destroy the recorder.
 *
 * By this point the pending-buffer list and both in-flight bookkeeping
 * containers must already be empty; this is enforced by the asserts below.
 */
ObjectRecorder::~ObjectRecorder() {
  ldout(m_cct, 20) << dendl;
  ceph_assert(m_pending_buffers.empty());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
}
43 | ||
494da23a TL |
44 | void ObjectRecorder::set_append_batch_options(int flush_interval, |
45 | uint64_t flush_bytes, | |
46 | double flush_age) { | |
47 | ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", " | |
48 | << "flush_bytes=" << flush_bytes << ", " | |
49 | << "flush_age=" << flush_age << dendl; | |
50 | ||
11fdf7f2 | 51 | ceph_assert(m_lock->is_locked()); |
494da23a TL |
52 | m_flush_interval = flush_interval; |
53 | m_flush_bytes = flush_bytes; | |
54 | m_flush_age = flush_age; | |
55 | } | |
7c673cae | 56 | |
494da23a TL |
57 | bool ObjectRecorder::append(AppendBuffers &&append_buffers) { |
58 | ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl; | |
7c673cae | 59 | |
494da23a | 60 | ceph_assert(m_lock->is_locked()); |
7c673cae | 61 | |
494da23a TL |
62 | FutureImplPtr last_flushed_future; |
63 | for (auto& append_buffer : append_buffers) { | |
64 | ldout(m_cct, 20) << *append_buffer.first << ", " | |
65 | << "size=" << append_buffer.second.length() << dendl; | |
66 | bool flush_requested = append_buffer.first->attach(&m_flush_handler); | |
67 | if (flush_requested) { | |
68 | last_flushed_future = append_buffer.first; | |
7c673cae | 69 | } |
7c673cae | 70 | |
494da23a TL |
71 | m_pending_buffers.push_back(append_buffer); |
72 | m_pending_bytes += append_buffer.second.length(); | |
7c673cae | 73 | } |
494da23a TL |
74 | |
75 | return send_appends(!!last_flushed_future, last_flushed_future); | |
7c673cae FG |
76 | } |
77 | ||
78 | void ObjectRecorder::flush(Context *on_safe) { | |
494da23a | 79 | ldout(m_cct, 20) << dendl; |
7c673cae | 80 | |
7c673cae FG |
81 | Future future; |
82 | { | |
83 | Mutex::Locker locker(*m_lock); | |
84 | ||
85 | // if currently handling flush notifications, wait so that | |
86 | // we notify in the correct order (since lock is dropped on | |
87 | // callback) | |
88 | if (m_in_flight_flushes) { | |
89 | m_in_flight_flushes_cond.Wait(*(m_lock.get())); | |
90 | } | |
91 | ||
92 | // attach the flush to the most recent append | |
494da23a | 93 | if (!m_pending_buffers.empty()) { |
11fdf7f2 | 94 | future = Future(m_pending_buffers.rbegin()->first); |
7c673cae FG |
95 | } else if (!m_in_flight_appends.empty()) { |
96 | AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second; | |
11fdf7f2 | 97 | ceph_assert(!append_buffers.empty()); |
7c673cae FG |
98 | future = Future(append_buffers.rbegin()->first); |
99 | } | |
100 | } | |
101 | ||
102 | if (future.is_valid()) { | |
494da23a TL |
103 | // cannot be invoked while the same lock context |
104 | m_op_work_queue->queue(new FunctionContext( | |
105 | [future, on_safe] (int r) mutable { | |
106 | future.flush(on_safe); | |
107 | })); | |
7c673cae FG |
108 | } else { |
109 | on_safe->complete(0); | |
110 | } | |
111 | } | |
112 | ||
113 | void ObjectRecorder::flush(const FutureImplPtr &future) { | |
494da23a | 114 | ldout(m_cct, 20) << "flushing " << *future << dendl; |
7c673cae | 115 | |
494da23a | 116 | m_lock->Lock(); |
7c673cae FG |
117 | if (future->get_flush_handler().get() != &m_flush_handler) { |
118 | // if we don't own this future, re-issue the flush so that it hits the | |
119 | // correct journal object owner | |
120 | future->flush(); | |
494da23a | 121 | m_lock->Unlock(); |
7c673cae FG |
122 | return; |
123 | } else if (future->is_flush_in_progress()) { | |
494da23a | 124 | m_lock->Unlock(); |
7c673cae FG |
125 | return; |
126 | } | |
127 | ||
494da23a TL |
128 | bool overflowed = send_appends(true, future); |
129 | if (overflowed) { | |
130 | notify_handler_unlock(); | |
131 | } else { | |
132 | m_lock->Unlock(); | |
7c673cae | 133 | } |
7c673cae FG |
134 | } |
135 | ||
136 | void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) { | |
494da23a | 137 | ldout(m_cct, 20) << dendl; |
7c673cae | 138 | |
11fdf7f2 TL |
139 | ceph_assert(m_lock->is_locked()); |
140 | ceph_assert(m_in_flight_tids.empty()); | |
141 | ceph_assert(m_in_flight_appends.empty()); | |
142 | ceph_assert(m_object_closed || m_overflowed); | |
494da23a TL |
143 | |
144 | for (auto& append_buffer : m_pending_buffers) { | |
145 | ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl; | |
146 | append_buffer.first->detach(); | |
147 | } | |
148 | append_buffers->splice(append_buffers->end(), m_pending_buffers, | |
149 | m_pending_buffers.begin(), m_pending_buffers.end()); | |
7c673cae FG |
150 | } |
151 | ||
152 | bool ObjectRecorder::close() { | |
11fdf7f2 | 153 | ceph_assert(m_lock->is_locked()); |
7c673cae | 154 | |
494da23a TL |
155 | ldout(m_cct, 20) << dendl; |
156 | send_appends(true, {}); | |
7c673cae | 157 | |
11fdf7f2 | 158 | ceph_assert(!m_object_closed); |
7c673cae | 159 | m_object_closed = true; |
494da23a | 160 | return (m_in_flight_tids.empty() && !m_in_flight_flushes); |
7c673cae FG |
161 | } |
162 | ||
163 | void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) { | |
494da23a | 164 | ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl; |
7c673cae FG |
165 | |
166 | AppendBuffers append_buffers; | |
167 | { | |
168 | m_lock->Lock(); | |
169 | auto tid_iter = m_in_flight_tids.find(tid); | |
11fdf7f2 | 170 | ceph_assert(tid_iter != m_in_flight_tids.end()); |
7c673cae FG |
171 | m_in_flight_tids.erase(tid_iter); |
172 | ||
173 | InFlightAppends::iterator iter = m_in_flight_appends.find(tid); | |
494da23a TL |
174 | ceph_assert(iter != m_in_flight_appends.end()); |
175 | ||
176 | if (r == -EOVERFLOW) { | |
177 | ldout(m_cct, 10) << "append overflowed" << dendl; | |
178 | m_overflowed = true; | |
7c673cae FG |
179 | |
180 | // notify of overflow once all in-flight ops are complete | |
494da23a | 181 | if (m_in_flight_tids.empty()) { |
7c673cae FG |
182 | append_overflowed(); |
183 | notify_handler_unlock(); | |
184 | } else { | |
185 | m_lock->Unlock(); | |
186 | } | |
187 | return; | |
188 | } | |
189 | ||
7c673cae | 190 | append_buffers.swap(iter->second); |
11fdf7f2 | 191 | ceph_assert(!append_buffers.empty()); |
7c673cae | 192 | |
494da23a TL |
193 | for (auto& append_buffer : append_buffers) { |
194 | m_object_bytes += append_buffer.second.length(); | |
195 | } | |
196 | ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl; | |
197 | ||
7c673cae FG |
198 | m_in_flight_appends.erase(iter); |
199 | m_in_flight_flushes = true; | |
200 | m_lock->Unlock(); | |
201 | } | |
202 | ||
203 | // Flag the associated futures as complete. | |
494da23a TL |
204 | for (auto& append_buffer : append_buffers) { |
205 | ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl; | |
206 | append_buffer.first->safe(r); | |
7c673cae FG |
207 | } |
208 | ||
209 | // wake up any flush requests that raced with a RADOS callback | |
210 | m_lock->Lock(); | |
211 | m_in_flight_flushes = false; | |
212 | m_in_flight_flushes_cond.Signal(); | |
213 | ||
494da23a TL |
214 | if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) { |
215 | // all remaining unsent appends should be redirected to new object | |
216 | notify_handler_unlock(); | |
217 | } else { | |
218 | bool overflowed = send_appends(false, {}); | |
219 | if (overflowed) { | |
11fdf7f2 | 220 | notify_handler_unlock(); |
11fdf7f2 TL |
221 | } else { |
222 | m_lock->Unlock(); | |
223 | } | |
7c673cae FG |
224 | } |
225 | } | |
226 | ||
227 | void ObjectRecorder::append_overflowed() { | |
494da23a | 228 | ldout(m_cct, 10) << dendl; |
7c673cae | 229 | |
11fdf7f2 TL |
230 | ceph_assert(m_lock->is_locked()); |
231 | ceph_assert(!m_in_flight_appends.empty()); | |
7c673cae | 232 | |
7c673cae FG |
233 | InFlightAppends in_flight_appends; |
234 | in_flight_appends.swap(m_in_flight_appends); | |
235 | ||
236 | AppendBuffers restart_append_buffers; | |
237 | for (InFlightAppends::iterator it = in_flight_appends.begin(); | |
238 | it != in_flight_appends.end(); ++it) { | |
239 | restart_append_buffers.insert(restart_append_buffers.end(), | |
240 | it->second.begin(), it->second.end()); | |
241 | } | |
242 | ||
243 | restart_append_buffers.splice(restart_append_buffers.end(), | |
494da23a TL |
244 | m_pending_buffers, |
245 | m_pending_buffers.begin(), | |
246 | m_pending_buffers.end()); | |
247 | restart_append_buffers.swap(m_pending_buffers); | |
7c673cae FG |
248 | } |
249 | ||
494da23a TL |
250 | bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) { |
251 | ldout(m_cct, 20) << dendl; | |
252 | ||
11fdf7f2 | 253 | ceph_assert(m_lock->is_locked()); |
494da23a TL |
254 | if (m_object_closed || m_overflowed) { |
255 | ldout(m_cct, 20) << "already closed or overflowed" << dendl; | |
256 | return false; | |
7c673cae FG |
257 | } |
258 | ||
494da23a TL |
259 | if (m_pending_buffers.empty()) { |
260 | ldout(m_cct, 20) << "append buffers empty" << dendl; | |
261 | return false; | |
7c673cae | 262 | } |
7c673cae | 263 | |
494da23a TL |
264 | if (!force && |
265 | ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) || | |
266 | (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) || | |
267 | (m_flush_age > 0 && | |
268 | m_last_flush_time + m_flush_age >= ceph_clock_now()))) { | |
269 | ldout(m_cct, 20) << "forcing batch flush" << dendl; | |
270 | force = true; | |
271 | } | |
11fdf7f2 | 272 | |
494da23a TL |
273 | auto max_in_flight_appends = m_max_in_flight_appends; |
274 | if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) { | |
275 | if (!force && max_in_flight_appends == 0) { | |
276 | ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl; | |
277 | max_in_flight_appends = 1; | |
11fdf7f2 | 278 | } |
494da23a TL |
279 | } else if (max_in_flight_appends < 0) { |
280 | max_in_flight_appends = 0; | |
281 | } | |
11fdf7f2 | 282 | |
494da23a TL |
283 | if (!force && max_in_flight_appends != 0 && |
284 | static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) { | |
285 | ldout(m_cct, 10) << "max in flight appends reached" << dendl; | |
286 | return false; | |
287 | } | |
288 | ||
289 | librados::ObjectWriteOperation op; | |
290 | client::guard_append(&op, m_soft_max_size); | |
291 | ||
292 | size_t append_bytes = 0; | |
293 | AppendBuffers append_buffers; | |
294 | for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) { | |
295 | auto& future = it->first; | |
296 | auto& bl = it->second; | |
297 | auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length(); | |
298 | if (size == m_soft_max_size) { | |
299 | ldout(m_cct, 10) << "object at capacity " << *future << dendl; | |
300 | m_overflowed = true; | |
301 | } else if (size > m_soft_max_size) { | |
302 | ldout(m_cct, 10) << "object beyond capacity " << *future << dendl; | |
303 | m_overflowed = true; | |
304 | break; | |
11fdf7f2 TL |
305 | } |
306 | ||
494da23a TL |
307 | bool flush_break = (force && flush_future && flush_future == future); |
308 | ldout(m_cct, 20) << "flushing " << *future << dendl; | |
309 | future->set_flush_in_progress(); | |
310 | ||
311 | op.append(bl); | |
312 | op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED); | |
313 | ||
314 | append_bytes += bl.length(); | |
315 | append_buffers.push_back(*it); | |
316 | it = m_pending_buffers.erase(it); | |
317 | ||
318 | if (flush_break) { | |
319 | ldout(m_cct, 20) << "stopping at requested flush future" << dendl; | |
320 | break; | |
321 | } | |
322 | } | |
323 | ||
324 | if (append_bytes > 0) { | |
325 | m_last_flush_time = ceph_clock_now(); | |
326 | ||
94b18763 | 327 | uint64_t append_tid = m_append_tid++; |
7c673cae | 328 | m_in_flight_tids.insert(append_tid); |
494da23a TL |
329 | m_in_flight_appends[append_tid].swap(append_buffers); |
330 | m_in_flight_bytes += append_bytes; | |
7c673cae | 331 | |
494da23a TL |
332 | ceph_assert(m_pending_bytes >= append_bytes); |
333 | m_pending_bytes -= append_bytes; | |
334 | ||
335 | auto rados_completion = librados::Rados::aio_create_completion( | |
336 | new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback); | |
11fdf7f2 TL |
337 | int r = m_ioctx.aio_operate(m_oid, rados_completion, &op); |
338 | ceph_assert(r == 0); | |
494da23a TL |
339 | rados_completion->release(); |
340 | ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", " | |
341 | << "append_bytes=" << append_bytes << ", " | |
342 | << "in_flight_bytes=" << m_in_flight_bytes << ", " | |
343 | << "pending_bytes=" << m_pending_bytes << dendl; | |
7c673cae | 344 | } |
494da23a TL |
345 | |
346 | return m_overflowed; | |
7c673cae FG |
347 | } |
348 | ||
349 | void ObjectRecorder::notify_handler_unlock() { | |
11fdf7f2 | 350 | ceph_assert(m_lock->is_locked()); |
7c673cae FG |
351 | if (m_object_closed) { |
352 | m_lock->Unlock(); | |
353 | m_handler->closed(this); | |
354 | } else { | |
355 | // TODO need to delay completion until after aio_notify completes | |
356 | m_lock->Unlock(); | |
357 | m_handler->overflow(this); | |
358 | } | |
359 | } | |
360 | ||
361 | } // namespace journal |