]> git.proxmox.com Git - ceph.git/blob - ceph/src/journal/ObjectRecorder.cc
import 14.2.4 nautilus point release
[ceph.git] / ceph / src / journal / ObjectRecorder.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "journal/ObjectRecorder.h"
5 #include "journal/Future.h"
6 #include "journal/Utils.h"
7 #include "include/ceph_assert.h"
8 #include "common/Timer.h"
9 #include "cls/journal/cls_journal_client.h"
10
11 #define dout_subsys ceph_subsys_journaler
12 #undef dout_prefix
13 #define dout_prefix *_dout << "ObjectRecorder: " << this << " " \
14 << __func__ << " (" << m_oid << "): "
15
16 using namespace cls::journal;
17 using std::shared_ptr;
18
19 namespace journal {
20
// Construct a recorder for a single journal data object.
//
// @param ioctx RADOS I/O context; duplicated so this object owns its own ref
// @param oid name of the backing RADOS object
// @param object_number journal object number (used by the owner for logging/mapping)
// @param lock mutex shared with the caller that guards all mutable state
// @param work_queue used to defer callbacks out of the current lock context
// @param handler receives closed()/overflow() notifications; must be non-null
// @param order soft object size limit is 2^order bytes
//              NOTE(review): `1 << m_order` is evaluated as int -- assumes
//              order < 31; confirm against the journal's allowed order range
// @param max_in_flight_appends cap on concurrent RADOS appends (see send_appends)
ObjectRecorder::ObjectRecorder(librados::IoCtx &ioctx, const std::string &oid,
                               uint64_t object_number, shared_ptr<Mutex> lock,
                               ContextWQ *work_queue, Handler *handler,
                               uint8_t order, int32_t max_in_flight_appends)
  : RefCountedObject(NULL, 0), m_oid(oid), m_object_number(object_number),
    m_cct(NULL), m_op_work_queue(work_queue), m_handler(handler),
    m_order(order), m_soft_max_size(1 << m_order),
    m_max_in_flight_appends(max_in_flight_appends), m_flush_handler(this),
    m_lock(lock), m_last_flush_time(ceph_clock_now()), m_append_tid(0),
    m_overflowed(false), m_object_closed(false), m_in_flight_flushes(false) {
  // own a private reference to the caller's I/O context
  m_ioctx.dup(ioctx);
  m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
  ceph_assert(m_handler != NULL);
  ldout(m_cct, 20) << dendl;
}
36
// Destructor: the recorder must be fully drained before destruction --
// no buffers still queued and no RADOS appends still in flight.
ObjectRecorder::~ObjectRecorder() {
  ldout(m_cct, 20) << dendl;
  ceph_assert(m_pending_buffers.empty());
  ceph_assert(m_in_flight_tids.empty());
  ceph_assert(m_in_flight_appends.empty());
}
43
44 void ObjectRecorder::set_append_batch_options(int flush_interval,
45 uint64_t flush_bytes,
46 double flush_age) {
47 ldout(m_cct, 5) << "flush_interval=" << flush_interval << ", "
48 << "flush_bytes=" << flush_bytes << ", "
49 << "flush_age=" << flush_age << dendl;
50
51 ceph_assert(m_lock->is_locked());
52 m_flush_interval = flush_interval;
53 m_flush_bytes = flush_bytes;
54 m_flush_age = flush_age;
55 }
56
57 bool ObjectRecorder::append(AppendBuffers &&append_buffers) {
58 ldout(m_cct, 20) << "count=" << append_buffers.size() << dendl;
59
60 ceph_assert(m_lock->is_locked());
61
62 FutureImplPtr last_flushed_future;
63 for (auto& append_buffer : append_buffers) {
64 ldout(m_cct, 20) << *append_buffer.first << ", "
65 << "size=" << append_buffer.second.length() << dendl;
66 bool flush_requested = append_buffer.first->attach(&m_flush_handler);
67 if (flush_requested) {
68 last_flushed_future = append_buffer.first;
69 }
70
71 m_pending_buffers.push_back(append_buffer);
72 m_pending_bytes += append_buffer.second.length();
73 }
74
75 return send_appends(!!last_flushed_future, last_flushed_future);
76 }
77
78 void ObjectRecorder::flush(Context *on_safe) {
79 ldout(m_cct, 20) << dendl;
80
81 Future future;
82 {
83 Mutex::Locker locker(*m_lock);
84
85 // if currently handling flush notifications, wait so that
86 // we notify in the correct order (since lock is dropped on
87 // callback)
88 if (m_in_flight_flushes) {
89 m_in_flight_flushes_cond.Wait(*(m_lock.get()));
90 }
91
92 // attach the flush to the most recent append
93 if (!m_pending_buffers.empty()) {
94 future = Future(m_pending_buffers.rbegin()->first);
95 } else if (!m_in_flight_appends.empty()) {
96 AppendBuffers &append_buffers = m_in_flight_appends.rbegin()->second;
97 ceph_assert(!append_buffers.empty());
98 future = Future(append_buffers.rbegin()->first);
99 }
100 }
101
102 if (future.is_valid()) {
103 // cannot be invoked while the same lock context
104 m_op_work_queue->queue(new FunctionContext(
105 [future, on_safe] (int r) mutable {
106 future.flush(on_safe);
107 }));
108 } else {
109 on_safe->complete(0);
110 }
111 }
112
// Flush up to (and including) a specific future.  Takes m_lock itself;
// every return path releases it.
void ObjectRecorder::flush(const FutureImplPtr &future) {
  ldout(m_cct, 20) << "flushing " << *future << dendl;

  m_lock->Lock();
  if (future->get_flush_handler().get() != &m_flush_handler) {
    // if we don't own this future, re-issue the flush so that it hits the
    // correct journal object owner
    future->flush();
    m_lock->Unlock();
    return;
  } else if (future->is_flush_in_progress()) {
    // already submitted to RADOS -- nothing more to do
    m_lock->Unlock();
    return;
  }

  // force a send that stops at this future
  bool overflowed = send_appends(true, future);
  if (overflowed) {
    // handler callback runs with the lock released
    notify_handler_unlock();
  } else {
    m_lock->Unlock();
  }
}
135
136 void ObjectRecorder::claim_append_buffers(AppendBuffers *append_buffers) {
137 ldout(m_cct, 20) << dendl;
138
139 ceph_assert(m_lock->is_locked());
140 ceph_assert(m_in_flight_tids.empty());
141 ceph_assert(m_in_flight_appends.empty());
142 ceph_assert(m_object_closed || m_overflowed);
143
144 for (auto& append_buffer : m_pending_buffers) {
145 ldout(m_cct, 20) << "detached " << *append_buffer.first << dendl;
146 append_buffer.first->detach();
147 }
148 append_buffers->splice(append_buffers->end(), m_pending_buffers,
149 m_pending_buffers.begin(), m_pending_buffers.end());
150 }
151
152 bool ObjectRecorder::close() {
153 ceph_assert(m_lock->is_locked());
154
155 ldout(m_cct, 20) << dendl;
156 send_appends(true, {});
157
158 ceph_assert(!m_object_closed);
159 m_object_closed = true;
160 return (m_in_flight_tids.empty() && !m_in_flight_flushes);
161 }
162
163 void ObjectRecorder::handle_append_flushed(uint64_t tid, int r) {
164 ldout(m_cct, 20) << "tid=" << tid << ", r=" << r << dendl;
165
166 AppendBuffers append_buffers;
167 {
168 m_lock->Lock();
169 auto tid_iter = m_in_flight_tids.find(tid);
170 ceph_assert(tid_iter != m_in_flight_tids.end());
171 m_in_flight_tids.erase(tid_iter);
172
173 InFlightAppends::iterator iter = m_in_flight_appends.find(tid);
174 ceph_assert(iter != m_in_flight_appends.end());
175
176 if (r == -EOVERFLOW) {
177 ldout(m_cct, 10) << "append overflowed" << dendl;
178 m_overflowed = true;
179
180 // notify of overflow once all in-flight ops are complete
181 if (m_in_flight_tids.empty()) {
182 append_overflowed();
183 notify_handler_unlock();
184 } else {
185 m_lock->Unlock();
186 }
187 return;
188 }
189
190 append_buffers.swap(iter->second);
191 ceph_assert(!append_buffers.empty());
192
193 for (auto& append_buffer : append_buffers) {
194 m_object_bytes += append_buffer.second.length();
195 }
196 ldout(m_cct, 20) << "object_bytes=" << m_object_bytes << dendl;
197
198 m_in_flight_appends.erase(iter);
199 m_in_flight_flushes = true;
200 m_lock->Unlock();
201 }
202
203 // Flag the associated futures as complete.
204 for (auto& append_buffer : append_buffers) {
205 ldout(m_cct, 20) << *append_buffer.first << " marked safe" << dendl;
206 append_buffer.first->safe(r);
207 }
208
209 // wake up any flush requests that raced with a RADOS callback
210 m_lock->Lock();
211 m_in_flight_flushes = false;
212 m_in_flight_flushes_cond.Signal();
213
214 if (m_in_flight_appends.empty() && (m_object_closed || m_overflowed)) {
215 // all remaining unsent appends should be redirected to new object
216 notify_handler_unlock();
217 } else {
218 bool overflowed = send_appends(false, {});
219 if (overflowed) {
220 notify_handler_unlock();
221 } else {
222 m_lock->Unlock();
223 }
224 }
225 }
226
// After an -EOVERFLOW response, requeue every in-flight buffer ahead of
// the never-sent pending buffers so the caller can claim them all (via
// claim_append_buffers) and redirect them to a new object.  Caller must
// hold m_lock.
void ObjectRecorder::append_overflowed() {
  ldout(m_cct, 10) << dendl;

  ceph_assert(m_lock->is_locked());
  ceph_assert(!m_in_flight_appends.empty());

  // take ownership of the in-flight map; keyed by tid, so iteration
  // yields the original submission order
  InFlightAppends in_flight_appends;
  in_flight_appends.swap(m_in_flight_appends);

  AppendBuffers restart_append_buffers;
  for (InFlightAppends::iterator it = in_flight_appends.begin();
       it != in_flight_appends.end(); ++it) {
    restart_append_buffers.insert(restart_append_buffers.end(),
                                  it->second.begin(), it->second.end());
  }

  // in-flight buffers first, then the still-pending ones ...
  restart_append_buffers.splice(restart_append_buffers.end(),
                                m_pending_buffers,
                                m_pending_buffers.begin(),
                                m_pending_buffers.end());
  // ... and the combined list becomes the new pending queue
  restart_append_buffers.swap(m_pending_buffers);
}
249
250 bool ObjectRecorder::send_appends(bool force, FutureImplPtr flush_future) {
251 ldout(m_cct, 20) << dendl;
252
253 ceph_assert(m_lock->is_locked());
254 if (m_object_closed || m_overflowed) {
255 ldout(m_cct, 20) << "already closed or overflowed" << dendl;
256 return false;
257 }
258
259 if (m_pending_buffers.empty()) {
260 ldout(m_cct, 20) << "append buffers empty" << dendl;
261 return false;
262 }
263
264 if (!force &&
265 ((m_flush_interval > 0 && m_pending_buffers.size() >= m_flush_interval) ||
266 (m_flush_bytes > 0 && m_pending_bytes >= m_flush_bytes) ||
267 (m_flush_age > 0 &&
268 m_last_flush_time + m_flush_age >= ceph_clock_now()))) {
269 ldout(m_cct, 20) << "forcing batch flush" << dendl;
270 force = true;
271 }
272
273 auto max_in_flight_appends = m_max_in_flight_appends;
274 if (m_flush_interval > 0 || m_flush_bytes > 0 || m_flush_age > 0) {
275 if (!force && max_in_flight_appends == 0) {
276 ldout(m_cct, 20) << "attempting to batch AIO appends" << dendl;
277 max_in_flight_appends = 1;
278 }
279 } else if (max_in_flight_appends < 0) {
280 max_in_flight_appends = 0;
281 }
282
283 if (!force && max_in_flight_appends != 0 &&
284 static_cast<int32_t>(m_in_flight_tids.size()) >= max_in_flight_appends) {
285 ldout(m_cct, 10) << "max in flight appends reached" << dendl;
286 return false;
287 }
288
289 librados::ObjectWriteOperation op;
290 client::guard_append(&op, m_soft_max_size);
291
292 size_t append_bytes = 0;
293 AppendBuffers append_buffers;
294 for (auto it = m_pending_buffers.begin(); it != m_pending_buffers.end(); ) {
295 auto& future = it->first;
296 auto& bl = it->second;
297 auto size = m_object_bytes + m_in_flight_bytes + append_bytes + bl.length();
298 if (size == m_soft_max_size) {
299 ldout(m_cct, 10) << "object at capacity " << *future << dendl;
300 m_overflowed = true;
301 } else if (size > m_soft_max_size) {
302 ldout(m_cct, 10) << "object beyond capacity " << *future << dendl;
303 m_overflowed = true;
304 break;
305 }
306
307 bool flush_break = (force && flush_future && flush_future == future);
308 ldout(m_cct, 20) << "flushing " << *future << dendl;
309 future->set_flush_in_progress();
310
311 op.append(bl);
312 op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
313
314 append_bytes += bl.length();
315 append_buffers.push_back(*it);
316 it = m_pending_buffers.erase(it);
317
318 if (flush_break) {
319 ldout(m_cct, 20) << "stopping at requested flush future" << dendl;
320 break;
321 }
322 }
323
324 if (append_bytes > 0) {
325 m_last_flush_time = ceph_clock_now();
326
327 uint64_t append_tid = m_append_tid++;
328 m_in_flight_tids.insert(append_tid);
329 m_in_flight_appends[append_tid].swap(append_buffers);
330 m_in_flight_bytes += append_bytes;
331
332 ceph_assert(m_pending_bytes >= append_bytes);
333 m_pending_bytes -= append_bytes;
334
335 auto rados_completion = librados::Rados::aio_create_completion(
336 new C_AppendFlush(this, append_tid), nullptr, utils::rados_ctx_callback);
337 int r = m_ioctx.aio_operate(m_oid, rados_completion, &op);
338 ceph_assert(r == 0);
339 rados_completion->release();
340 ldout(m_cct, 20) << "flushing journal tid=" << append_tid << ", "
341 << "append_bytes=" << append_bytes << ", "
342 << "in_flight_bytes=" << m_in_flight_bytes << ", "
343 << "pending_bytes=" << m_pending_bytes << dendl;
344 }
345
346 return m_overflowed;
347 }
348
// Notify the handler that the object closed or overflowed.  m_lock must
// be held on entry and is always released before the callback runs.
void ObjectRecorder::notify_handler_unlock() {
  ceph_assert(m_lock->is_locked());
  if (m_object_closed) {
    m_lock->Unlock();
    m_handler->closed(this);
  } else {
    // TODO need to delay completion until after aio_notify completes
    m_lock->Unlock();
    m_handler->overflow(this);
  }
}
360
361 } // namespace journal