// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "journal/ObjectRecorder.h"
#include "journal/Future.h"
#include "journal/Utils.h"
#include "include/ceph_assert.h"
#include "common/Timer.h"
#include "common/errno.h"
#include "cls/journal/cls_journal_client.h"

#define dout_subsys ceph_subsys_journaler
#undef dout_prefix
#define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
                           << __func__ << " (" << m_oid << "): "

using namespace cls::journal;
using std::shared_ptr;

namespace journal {

ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, std::string_view oid,
                               uint64_t object_number, ceph::mutex* lock,
                               ContextWQ *work_queue, Handler *handler,
                               uint8_t order, int32_t max_in_flight_appends)
  : m_oid(oid), m_object_number(object_number),
    m_op_work_queue(work_queue), m_handler(handler),
    m_order(order), m_soft_max_size(1 << m_order),
    m_max_in_flight_appends(max_in_flight_appends),
    m_lock(lock)
{
  m_ioctx.dup(ioctx);
  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
  ceph_assert(m_handler != NULL);

  librados::Rados rados(m_ioctx);
  int8_t require_osd_release = 0;
  int r = rados.get_min_compatible_osd(&require_osd_release);
  if (r < 0) {
    ldout(m_cct, 0) << "failed to retrieve min OSD release: "
                    << cpp_strerror(r) << dendl;
  }
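  // pre-Octopus OSDs lack the combined journal append operation, so fall
  // back to a separate size guard plus a plain object append (see
  // send_appends below)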
  m_compat_mode = require_osd_release < CEPH_RELEASE_OCTOPUS;

  ldout(m_cct, 20) << dendl;
}

ObjectRecorder::~ObjectRecorder() {
  ldout(m_cct, 20) << dendl;
  ceph_assert(m_pending_buffers.empty());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
}

void ObjectRecorder::set_append_batch_options(int flush_interval,
                                              uint64_t flush_bytes,
                                              double flush_age) {
  ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
                  << "flush_bytes=" << flush_bytes << ", "
                  << "flush_age=" << flush_age << dendl;

  ceph_assert(ceph_mutex_is_locked(*m_lock));
  m_flush_interval = flush_interval;
  m_flush_bytes = flush_bytes;
  m_flush_age = flush_age;
}

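// queue the provided buffers for the next rados append and attach this
// object's flush handler to each future; returns true if the object has
// overflowed and the handler needs to be notified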
bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
  ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;

  ceph_assert(ceph_mutex_is_locked(*m_lock));

  ceph::ref_t<FutureImpl> last_flushed_future;
  auto flush_handler = get_flush_handler();
  for (auto& append_buffer : append_buffers) {
    ldout(m_cct, 20) << *append_buffer.first << ", "
                     << "size=" << append_buffer.second.length() << dendl;
    bool flush_requested = append_buffer.first->attach(flush_handler);
    if (flush_requested) {
      last_flushed_future = append_buffer.first;
    }

    m_pending_buffers.push_back(append_buffer);
    m_pending_bytes += append_buffer.second.length();
  }

  return send_appends(!!last_flushed_future, last_flushed_future);
}

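// fire on_safe once every append queued so far has been persisted: the
// completion is attached to the most recent pending (or in-flight) future,
// and completes immediately if nothing is outstanding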
void ObjectRecorder::flush(Context *on_safe) {
  ldout(m_cct, 20) << dendl;

  Future future;
  {
    std::unique_lock locker{*m_lock};

    // if currently handling flush notifications, wait so that
    // we notify in the correct order (since the lock is dropped on
    // callback)
    while (m_in_flight_callbacks) {
      m_in_flight_callbacks_cond.wait(locker);
    }

    // attach the flush to the most recent append
    if (!m_pending_buffers.empty()) {
      future = Future(m_pending_buffers.rbegin()->first);
    } else if (!m_in_flight_appends.empty()) {
      AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
      ceph_assert(!append_buffers.empty());
      future = Future(append_buffers.rbegin()->first);
    }
  }

  if (future.is_valid()) {
    // cannot be invoked within the same lock context
    m_op_work_queue->queue(new LambdaContext(
      [future, on_safe] (int r) mutable {
        future.flush(on_safe);
      }));
  } else {
    on_safe->complete(0);
  }
}

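// flush a single future; if the future is no longer owned by this object
// (e.g. the journal has advanced to a new object), re-issue the flush so it
// reaches the current owner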
void ObjectRecorder::flush(const ceph::ref_t<FutureImpl>& future) {
  ldout(m_cct, 20) << "flushing " << *future << dendl;

  std::unique_lock locker{*m_lock};
  auto flush_handler = future->get_flush_handler();
  auto my_handler = get_flush_handler();
  if (flush_handler != my_handler) {
    // if we don't own this future, re-issue the flush so that it hits the
    // correct journal object owner
    future->flush();
    return;
  } else if (future->is_flush_in_progress()) {
    return;
  }

  if (!m_object_closed && !m_overflowed && send_appends(true, future)) {
    m_in_flight_callbacks = true;
    notify_handler_unlock(locker, true);
  }
}

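// hand any unsent buffers back to the caller (detaching their futures) so
// they can be re-queued against the journal's next object; only valid once
// this object has been closed or has overflowed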
void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
  ldout(m_cct, 20) << dendl;

  ceph_assert(ceph_mutex_is_locked(*m_lock));
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
  ceph_assert(m_object_closed || m_overflowed);

  for (auto& append_buffer : m_pending_buffers) {
    ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
    append_buffer.first->detach();
  }
  append_buffers->splice(append_buffers->end(), m_pending_buffers,
                         m_pending_buffers.begin(), m_pending_buffers.end());
}

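// close the object after flushing anything still pending; returns true if
// the close completed inline, or false if ops or callbacks are still in
// flight, in which case the handler's closed() callback fires later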
bool ObjectRecorder::close() {
  ceph_assert(ceph_mutex_is_locked(*m_lock));

  ldout(m_cct, 20) << dendl;

  send_appends(true, {});

  ceph_assert(!m_object_closed);
  m_object_closed = true;

  if (!m_in_flight_tids.empty() || m_in_flight_callbacks) {
    m_object_closed_notify = true;
    return false;
  }

  return true;
}

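// completion callback for the in-flight rados append identified by tid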
void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
  ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;

  std::unique_lock locker{*m_lock};
  m_in_flight_callbacks = true;

  auto tid_iter = m_in_flight_tids.find(tid);
  ceph_assert(tid_iter != m_in_flight_tids.end());
  m_in_flight_tids.erase(tid_iter);

  InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
  ceph_assert(iter != m_in_flight_appends.end());

  bool notify_overflowed = false;
  AppendBuffers append_buffers;
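  // -EOVERFLOW means the size guard rejected the append: the object has
  // reached its soft maximum size and the journal must advance to a new
  // object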
  if (r == -EOVERFLOW) {
    ldout(m_cct, 10) << "append overflowed: "
                     << "idle=" << m_in_flight_tids.empty() << ", "
                     << "previous_overflow=" << m_overflowed << dendl;
    if (m_in_flight_tids.empty()) {
      append_overflowed();
    }

    if (!m_object_closed && !m_overflowed) {
      notify_overflowed = true;
    }
    m_overflowed = true;
  } else {
    append_buffers.swap(iter->second);
    ceph_assert(!append_buffers.empty());

    for (auto& append_buffer : append_buffers) {
      auto length = append_buffer.second.length();
      m_object_bytes += length;

      ceph_assert(m_in_flight_bytes >= length);
      m_in_flight_bytes -= length;
    }
    ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;

    m_in_flight_appends.erase(iter);
  }
  locker.unlock();

  // flag the associated futures as complete
  for (auto& append_buffer : append_buffers) {
    ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
    append_buffer.first->safe(r);
  }

  // attempt to kick off more appends to the object
  locker.lock();
  if (!m_object_closed && !m_overflowed && send_appends(false, {})) {
    notify_overflowed = true;
  }

  ldout(m_cct, 20) << "pending tids=" << m_in_flight_tids << dendl;

  // all remaining unsent appends should be redirected to a new object
  notify_handler_unlock(locker, notify_overflowed);
}

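// move every in-flight and pending buffer back onto the pending list,
// preserving the original append order, so the buffers can be claimed and
// replayed against the next journal object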
void ObjectRecorder::append_overflowed() {
  ldout(m_cct, 10) << dendl;

  ceph_assert(ceph_mutex_is_locked(*m_lock));
  ceph_assert(!m_in_flight_appends.empty());

  InFlightAppends in_flight_appends;
  in_flight_appends.swap(m_in_flight_appends);

  AppendBuffers restart_append_buffers;
  for (InFlightAppends::iterator it = in_flight_appends.begin();
       it != in_flight_appends.end(); ++it) {
    restart_append_buffers.insert(restart_append_buffers.end(),
                                  it->second.begin(), it->second.end());
  }

  restart_append_buffers.splice(restart_append_buffers.end(),
                                m_pending_buffers,
                                m_pending_buffers.begin(),
                                m_pending_buffers.end());
  restart_append_buffers.swap(m_pending_buffers);
}

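// batch pending buffers into a single rados append op: 'force' bypasses the
// flush_interval/flush_bytes/flush_age batching thresholds, and
// 'flush_future' (if set) marks the last buffer that must be included;
// returns true if the object has overflowed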
bool ObjectRecorder::send_appends(bool force, ceph::ref_t<FutureImpl> flush_future) {
  ldout(m_cct, 20) << dendl;

  ceph_assert(ceph_mutex_is_locked(*m_lock));
  if (m_object_closed || m_overflowed) {
    ldout(m_cct, 20) << "already closed or overflowed" << dendl;
    return false;
  }

  if (m_pending_buffers.empty()) {
    ldout(m_cct, 20) << "append buffers empty" << dendl;
    return false;
  }

  if (!force &&
      ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
       (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
       (m_flush_age > 0 && !m_last_flush_time.is_zero() &&
        m_last_flush_time + m_flush_age <= ceph_clock_now()))) {
    ldout(m_cct, 20) << "forcing batch flush" << dendl;
    force = true;
  }

  // start tracking flush time after the first append event
  if (m_last_flush_time.is_zero()) {
    m_last_flush_time = ceph_clock_now();
  }

  auto max_in_flight_appends = m_max_in_flight_appends;
  if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
    if (!force && max_in_flight_appends == 0) {
      ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
      max_in_flight_appends = 1;
    }
  } else if (max_in_flight_appends < 0) {
    max_in_flight_appends = 0;
  }

  if (!force && max_in_flight_appends != 0 &&
      static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
    ldout(m_cct, 10) << "max in flight appends reached" << dendl;
    return false;
  }

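  // in compat mode the guard makes the OSD fail the append with -EOVERFLOW
  // once the object would exceed its soft maximum size; newer OSDs enforce
  // the limit inside the combined journal append operation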
  librados::ObjectWriteOperation op;
  if (m_compat_mode) {
    client::guard_append(&op, m_soft_max_size);
  }

  size_t append_bytes = 0;
  AppendBuffers append_buffers;
  bufferlist append_bl;
  for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
    auto& future = it->first;
    auto& bl = it->second;
    auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
    if (size == m_soft_max_size) {
      ldout(m_cct, 10) << "object at capacity (" << size << ") " << *future
                       << dendl;
      m_overflowed = true;
    } else if (size > m_soft_max_size) {
      ldout(m_cct, 10) << "object beyond capacity (" << size << ") " << *future
                       << dendl;
      m_overflowed = true;
      break;
    }

    bool flush_break = (force && flush_future && flush_future == future);
    ldout(m_cct, 20) << "flushing " << *future << dendl;
    future->set_flush_in_progress();

    if (m_compat_mode) {
      op.append(bl);
      op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
    } else {
      append_bl.append(bl);
    }

    append_bytes += bl.length();
    append_buffers.push_back(*it);
    it = m_pending_buffers.erase(it);

    if (flush_break) {
      ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
      break;
    }
  }

  if (append_bytes > 0) {
    m_last_flush_time = ceph_clock_now();

    uint64_t append_tid = m_append_tid++;
    m_in_flight_tids.insert(append_tid);
    m_in_flight_appends[append_tid].swap(append_buffers);
    m_in_flight_bytes += append_bytes;

    ceph_assert(m_pending_bytes >= append_bytes);
    m_pending_bytes -= append_bytes;

    if (!m_compat_mode) {
      client::append(&op, m_soft_max_size, append_bl);
    }

    auto rados_completion = librados::Rados::aio_create_completion(
      new C_AppendFlush(this, append_tid), utils::rados_ctx_callback);
    int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
    ceph_assert(r == 0);
    rados_completion->release();
    ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
                     << "append_bytes=" << append_bytes << ", "
                     << "in_flight_bytes=" << m_in_flight_bytes << ", "
                     << "pending_bytes=" << m_pending_bytes << dendl;
  }

  return m_overflowed;
}

void ObjectRecorder::wake_up_flushes() {
  ceph_assert(ceph_mutex_is_locked(*m_lock));
  m_in_flight_callbacks = false;
  m_in_flight_callbacks_cond.notify_all();
}

void ObjectRecorder::notify_handler_unlock(
    std::unique_lock<ceph::mutex>& locker, bool notify_overflowed) {
  ceph_assert(ceph_mutex_is_locked(*m_lock));
  ceph_assert(m_in_flight_callbacks);

  if (!m_object_closed && notify_overflowed) {
    // TODO need to delay completion until after aio_notify completes
    ldout(m_cct, 10) << "overflow" << dendl;
    ceph_assert(m_overflowed);

    locker.unlock();
    m_handler->overflow(this);
    locker.lock();
  }

  // wake up blocked flush requests
  wake_up_flushes();

  // an overflow notification might have blocked a close; a close
  // notification could lead to the immediate destruction of this object,
  // so the object shouldn't be referenced anymore
  bool object_closed_notify = false;
  if (m_in_flight_tids.empty()) {
    std::swap(object_closed_notify, m_object_closed_notify);
  }
  ceph_assert(m_object_closed || !object_closed_notify);
  locker.unlock();

  if (object_closed_notify) {
    ldout(m_cct, 10) << "closed" << dendl;
    m_handler->closed(this);
  }
}

} // namespace journal