]> git.proxmox.com Git - ceph.git/blame - ceph/src/librbd/cache/pwl/Request.cc
update source to Ceph Pacific 16.2.2
[ceph.git] / ceph / src / librbd / cache / pwl / Request.cc
CommitLineData
f67539c2
TL
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "Request.h"
5#include "librbd/BlockGuard.h"
6#include "librbd/cache/pwl/LogEntry.h"
7#include "librbd/cache/pwl/AbstractWriteLog.h"
8
9#define dout_subsys ceph_subsys_rbd_pwl
10#undef dout_prefix
11#define dout_prefix *_dout << "librbd::cache::pwl::Request: " << this << " " \
12 << __func__ << ": "
13
14namespace librbd {
15namespace cache {
16namespace pwl {
17
18template <typename T>
19C_BlockIORequest<T>::C_BlockIORequest(T &pwl, const utime_t arrived, io::Extents &&extents,
20 bufferlist&& bl, const int fadvise_flags, Context *user_req)
21 : pwl(pwl), image_extents(std::move(extents)),
22 bl(std::move(bl)), fadvise_flags(fadvise_flags),
23 user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
24 ldout(pwl.get_context(), 99) << this << dendl;
25}
26
27template <typename T>
28C_BlockIORequest<T>::~C_BlockIORequest() {
29 ldout(pwl.get_context(), 99) << this << dendl;
30 ceph_assert(m_cell_released || !m_cell);
31}
32
33template <typename T>
34std::ostream &operator<<(std::ostream &os,
35 const C_BlockIORequest<T> &req) {
36 os << "image_extents=[" << req.image_extents << "], "
37 << "image_extents_summary=[" << req.image_extents_summary << "], "
38 << "bl=" << req.bl << ", "
39 << "user_req=" << req.user_req << ", "
40 << "m_user_req_completed=" << req.m_user_req_completed << ", "
41 << "m_deferred=" << req.m_deferred << ", "
42 << "detained=" << req.detained << ", "
43 << "waited_lanes=" << req.waited_lanes << ", "
44 << "waited_entries=" << req.waited_entries << ", "
45 << "waited_buffers=" << req.waited_buffers << "";
46 return os;
47}
48
49template <typename T>
50void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
51 ldout(pwl.get_context(), 20) << this << " cell=" << cell << dendl;
52 ceph_assert(cell);
53 ceph_assert(!m_cell);
54 m_cell = cell;
55}
56
57template <typename T>
58BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
59 ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
60 return m_cell;
61}
62
63template <typename T>
64void C_BlockIORequest<T>::release_cell() {
65 ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
66 ceph_assert(m_cell);
67 bool initial = false;
68 if (m_cell_released.compare_exchange_strong(initial, true)) {
69 pwl.release_guarded_request(m_cell);
70 } else {
71 ldout(pwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
72 }
73}
74
75template <typename T>
76void C_BlockIORequest<T>::complete_user_request(int r) {
77 bool initial = false;
78 if (m_user_req_completed.compare_exchange_strong(initial, true)) {
79 ldout(pwl.get_context(), 15) << this << " completing user req" << dendl;
80 m_user_req_completed_time = ceph_clock_now();
81 pwl.complete_user_request(user_req, r);
82 } else {
83 ldout(pwl.get_context(), 20) << this << " user req already completed" << dendl;
84 }
85}
86
87template <typename T>
88void C_BlockIORequest<T>::finish(int r) {
89 ldout(pwl.get_context(), 20) << this << dendl;
90
91 complete_user_request(r);
92 bool initial = false;
93 if (m_finish_called.compare_exchange_strong(initial, true)) {
94 ldout(pwl.get_context(), 15) << this << " finishing" << dendl;
95 finish_req(0);
96 } else {
97 ldout(pwl.get_context(), 20) << this << " already finished" << dendl;
98 ceph_assert(0);
99 }
100}
101
102template <typename T>
103void C_BlockIORequest<T>::deferred() {
104 bool initial = false;
105 if (m_deferred.compare_exchange_strong(initial, true)) {
106 deferred_handler();
107 }
108}
109
110template <typename T>
111C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
112 bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
113 PerfCounters *perfcounter, Context *user_req)
114 : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
115 m_perfcounter(perfcounter), m_lock(lock) {
116 ldout(pwl.get_context(), 99) << this << dendl;
117}
118
119template <typename T>
120C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
121 bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
122 int fadvise_flags, ceph::mutex &lock, PerfCounters *perfcounter,
123 Context *user_req)
124 : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
125 mismatch_offset(mismatch_offset), cmp_bl(std::move(cmp_bl)),
126 m_perfcounter(perfcounter), m_lock(lock) {
127 is_comp_and_write = true;
128 ldout(pwl.get_context(), 20) << dendl;
129}
130
131template <typename T>
132C_WriteRequest<T>::~C_WriteRequest() {
133 ldout(pwl.get_context(), 99) << this << dendl;
134}
135
136template <typename T>
137std::ostream &operator<<(std::ostream &os,
138 const C_WriteRequest<T> &req) {
139 os << (C_BlockIORequest<T>&)req
140 << " m_resources.allocated=" << req.m_resources.allocated;
141 if (req.op_set) {
142 os << "op_set=" << *req.op_set;
143 }
144 return os;
145}
146
147template <typename T>
148void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
149 ldout(pwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;
150
151 ceph_assert(guard_ctx.cell);
152 this->detained = guard_ctx.state.detained; /* overlapped */
153 this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
154 this->set_cell(guard_ctx.cell);
155}
156
157template <typename T>
158void C_WriteRequest<T>::finish_req(int r) {
159 ldout(pwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
160
161 /* Completed to caller by here (in finish(), which calls this) */
162 utime_t now = ceph_clock_now();
163 if(is_comp_and_write && !compare_succeeded) {
164 update_req_stats(now);
165 return;
166 }
167 pwl.release_write_lanes(this);
168 ceph_assert(m_resources.allocated);
169 m_resources.allocated = false;
170 this->release_cell(); /* TODO: Consider doing this in appending state */
171 update_req_stats(now);
172}
173
174template <typename T>
175std::shared_ptr<WriteLogOperation> C_WriteRequest<T>::create_operation(
176 uint64_t offset, uint64_t len) {
177 return pwl.m_builder->create_write_log_operation(
178 *op_set, offset, len, pwl.get_context(),
179 pwl.m_builder->create_write_log_entry(op_set->sync_point->log_entry, offset, len));
180}
181
182template <typename T>
183void C_WriteRequest<T>::setup_log_operations(DeferredContexts &on_exit) {
184 GenericWriteLogEntries log_entries;
185 {
186 std::lock_guard locker(m_lock);
187 std::shared_ptr<SyncPoint> current_sync_point = pwl.get_current_sync_point();
188 if ((!pwl.get_persist_on_flush() && current_sync_point->log_entry->writes_completed) ||
189 (current_sync_point->log_entry->writes > MAX_WRITES_PER_SYNC_POINT) ||
190 (current_sync_point->log_entry->bytes > MAX_BYTES_PER_SYNC_POINT)) {
191 /* Create new sync point and persist the previous one. This sequenced
192 * write will bear a sync gen number shared with no already completed
193 * writes. A group of sequenced writes may be safely flushed concurrently
194 * if they all arrived before any of them completed. We'll insert one on
195 * an aio_flush() from the application. Here we're inserting one to cap
196 * the number of bytes and writes per sync point. When the application is
197 * not issuing flushes, we insert sync points to record some observed
198 * write concurrency information that enables us to safely issue >1 flush
199 * write (for writes observed here to have been in flight simultaneously)
200 * at a time in persist-on-write mode.
201 */
202 pwl.flush_new_sync_point(nullptr, on_exit);
203 current_sync_point = pwl.get_current_sync_point();
204 }
205 uint64_t current_sync_gen = pwl.get_current_sync_gen();
206 op_set =
207 make_unique<WriteLogOperationSet>(this->m_dispatched_time,
208 m_perfcounter,
209 current_sync_point,
210 pwl.get_persist_on_flush(),
211 pwl.get_context(), this);
212 ldout(pwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl;
213 ceph_assert(m_resources.allocated);
214 /* op_set->operations initialized differently for plain write or write same */
215 auto allocation = m_resources.buffers.begin();
216 uint64_t buffer_offset = 0;
217 for (auto &extent : this->image_extents) {
218 /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
219 auto operation = this->create_operation(extent.first, extent.second);
220 this->op_set->operations.emplace_back(operation);
221
222 /* A WS is also a write */
223 ldout(pwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get()
224 << " operation=" << operation << dendl;
225 log_entries.emplace_back(operation->log_entry);
226 if (!op_set->persist_on_flush) {
227 pwl.inc_last_op_sequence_num();
228 }
229 operation->init(true, allocation, current_sync_gen,
230 pwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
231 buffer_offset += operation->log_entry->write_bytes();
232 ldout(pwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
233 allocation++;
234 }
235 }
236 /* All extent ops subs created */
237 op_set->extent_ops_appending->activate();
238 op_set->extent_ops_persist->activate();
239
240 pwl.add_into_log_map(log_entries, this);
241}
242
243template <typename T>
244void C_WriteRequest<T>::copy_cache() {
245 pwl.copy_bl_to_buffer(&m_resources, op_set);
246}
247
248template <typename T>
249bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
250 std::lock_guard locker(m_lock);
251 auto write_req_sp = this;
252 if (sync_point->earlier_sync_point) {
253 Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
254 write_req_sp->schedule_append();
255 });
256 sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
257 return true;
258 }
259 return false;
260}
261
262template <typename T>
263void C_WriteRequest<T>::schedule_append() {
264 ceph_assert(++m_appended == 1);
265 pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush, this);
266}
267
268/**
269 * Attempts to allocate log resources for a write. Returns true if successful.
270 *
271 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
272 * data space for each extent.
273 *
274 * Lanes are released after the write persists via release_write_lanes()
275 */
276template <typename T>
277bool C_WriteRequest<T>::alloc_resources() {
278 this->allocated_time = ceph_clock_now();
279 return pwl.alloc_resources(this);
280}
281
282/**
283 * Takes custody of write_req. Resources must already be allocated.
284 *
285 * Locking:
286 * Acquires lock
287 */
288template <typename T>
289void C_WriteRequest<T>::dispatch()
290{
291 CephContext *cct = pwl.get_context();
292 DeferredContexts on_exit;
293 utime_t now = ceph_clock_now();
294 this->m_dispatched_time = now;
295
296 ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
297 this->setup_log_operations(on_exit);
298
299 bool append_deferred = false;
300 if (!op_set->persist_on_flush &&
301 append_write_request(op_set->sync_point)) {
302 /* In persist-on-write mode, we defer the append of this write until the
303 * previous sync point is appending (meaning all the writes before it are
304 * persisted and that previous sync point can now appear in the
305 * log). Since we insert sync points in persist-on-write mode when writes
306 * have already completed to the current sync point, this limits us to
307 * one inserted sync point in flight at a time, and gives the next
308 * inserted sync point some time to accumulate a few writes if they
309 * arrive soon. Without this we can insert an absurd number of sync
310 * points, each with one or two writes. That uses a lot of log entries,
311 * and limits flushing to very few writes at a time. */
312 m_do_early_flush = false;
313 append_deferred = true;
314 } else {
315 /* The prior sync point is done, so we'll schedule append here. If this is
316 * persist-on-write, and probably still the caller's thread, we'll use this
317 * caller's thread to perform the persist & replication of the payload
318 * buffer. */
319 m_do_early_flush =
320 !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
321 }
322 if (!append_deferred) {
323 this->schedule_append();
324 }
325}
326
327template <typename T>
328C_FlushRequest<T>::C_FlushRequest(T &pwl, const utime_t arrived,
329 io::Extents &&image_extents,
330 bufferlist&& bl, const int fadvise_flags,
331 ceph::mutex &lock, PerfCounters *perfcounter,
332 Context *user_req)
333 : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
334 fadvise_flags, user_req),
335 m_lock(lock), m_perfcounter(perfcounter) {
336 ldout(pwl.get_context(), 20) << this << dendl;
337}
338
339template <typename T>
340void C_FlushRequest<T>::finish_req(int r) {
341 ldout(pwl.get_context(), 20) << "flush_req=" << this
342 << " cell=" << this->get_cell() << dendl;
343 /* Block guard already released */
344 ceph_assert(!this->get_cell());
345
346 /* Completed to caller by here */
347 utime_t now = ceph_clock_now();
348 m_perfcounter->tinc(l_librbd_pwl_aio_flush_latency, now - this->m_arrived_time);
349}
350
351template <typename T>
352bool C_FlushRequest<T>::alloc_resources() {
353 ldout(pwl.get_context(), 20) << "req type=" << get_name() << " "
354 << "req=[" << *this << "]" << dendl;
355 return pwl.alloc_resources(this);
356}
357
358template <typename T>
359void C_FlushRequest<T>::dispatch() {
360 utime_t now = ceph_clock_now();
361 ldout(pwl.get_context(), 20) << "req type=" << get_name() << " "
362 << "req=[" << *this << "]" << dendl;
363 ceph_assert(this->m_resources.allocated);
364 this->m_dispatched_time = now;
365
366 op = std::make_shared<SyncPointLogOperation>(m_lock,
367 to_append,
368 now,
369 m_perfcounter,
370 pwl.get_context());
371
372 m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
373 pwl.schedule_append(op);
374}
375
376template <typename T>
377void C_FlushRequest<T>::setup_buffer_resources(
378 uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
379 uint64_t *number_lanes, uint64_t *number_log_entries,
380 uint64_t *number_unpublished_reserves) {
381 *number_log_entries = 1;
382}
383
384template <typename T>
385std::ostream &operator<<(std::ostream &os,
386 const C_FlushRequest<T> &req) {
387 os << (C_BlockIORequest<T>&)req
388 << " m_resources.allocated=" << req.m_resources.allocated;
389 return os;
390}
391
392template <typename T>
393C_DiscardRequest<T>::C_DiscardRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
394 uint32_t discard_granularity_bytes, ceph::mutex &lock,
395 PerfCounters *perfcounter, Context *user_req)
396 : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), bufferlist(), 0, user_req),
397 m_discard_granularity_bytes(discard_granularity_bytes),
398 m_lock(lock),
399 m_perfcounter(perfcounter) {
400 ldout(pwl.get_context(), 20) << this << dendl;
401}
402
403template <typename T>
404C_DiscardRequest<T>::~C_DiscardRequest() {
405 ldout(pwl.get_context(), 20) << this << dendl;
406}
407
408template <typename T>
409bool C_DiscardRequest<T>::alloc_resources() {
410 ldout(pwl.get_context(), 20) << "req type=" << get_name() << " "
411 << "req=[" << *this << "]" << dendl;
412 return pwl.alloc_resources(this);
413}
414
415template <typename T>
416void C_DiscardRequest<T>::setup_log_operations() {
417 std::lock_guard locker(m_lock);
418 GenericWriteLogEntries log_entries;
419 for (auto &extent : this->image_extents) {
420 op = pwl.m_builder->create_discard_log_operation(
421 pwl.get_current_sync_point(), extent.first, extent.second,
422 m_discard_granularity_bytes, this->m_dispatched_time, m_perfcounter,
423 pwl.get_context());
424 log_entries.emplace_back(op->log_entry);
425 break;
426 }
427 uint64_t current_sync_gen = pwl.get_current_sync_gen();
428 bool persist_on_flush = pwl.get_persist_on_flush();
429 if (!persist_on_flush) {
430 pwl.inc_last_op_sequence_num();
431 }
432 auto discard_req = this;
433 Context *on_write_append = pwl.get_current_sync_point()->prior_persisted_gather_new_sub();
434
435 Context *on_write_persist = new LambdaContext(
436 [this, discard_req](int r) {
437 ldout(pwl.get_context(), 20) << "discard_req=" << discard_req
438 << " cell=" << discard_req->get_cell() << dendl;
439 ceph_assert(discard_req->get_cell());
440 discard_req->complete_user_request(r);
441 discard_req->release_cell();
442 });
443 op->init_op(current_sync_gen, persist_on_flush, pwl.get_last_op_sequence_num(),
444 on_write_persist, on_write_append);
445 pwl.add_into_log_map(log_entries, this);
446}
447
448template <typename T>
449void C_DiscardRequest<T>::dispatch() {
450 utime_t now = ceph_clock_now();
451 ldout(pwl.get_context(), 20) << "req type=" << get_name() << " "
452 << "req=[" << *this << "]" << dendl;
453 ceph_assert(this->m_resources.allocated);
454 this->m_dispatched_time = now;
455 setup_log_operations();
456 m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
457 pwl.schedule_append(op);
458}
459
460template <typename T>
461void C_DiscardRequest<T>::setup_buffer_resources(
462 uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
463 uint64_t *number_lanes, uint64_t *number_log_entries,
464 uint64_t *number_unpublished_reserves) {
465 *number_log_entries = 1;
466 /* No bytes are allocated for a discard, but we count the discarded bytes
467 * as dirty. This means it's possible to have more bytes dirty than
468 * there are bytes cached or allocated. */
469 for (auto &extent : this->image_extents) {
470 *bytes_dirtied = extent.second;
471 break;
472 }
473}
474
475template <typename T>
476void C_DiscardRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
477 ldout(pwl.get_context(), 20) << " cell=" << guard_ctx.cell << dendl;
478
479 ceph_assert(guard_ctx.cell);
480 this->detained = guard_ctx.state.detained; /* overlapped */
481 this->set_cell(guard_ctx.cell);
482}
483
484template <typename T>
485std::ostream &operator<<(std::ostream &os,
486 const C_DiscardRequest<T> &req) {
487 os << (C_BlockIORequest<T>&)req;
488 if (req.op) {
489 os << " op=[" << *req.op << "]";
490 } else {
491 os << " op=nullptr";
492 }
493 return os;
494}
495
496template <typename T>
497C_WriteSameRequest<T>::C_WriteSameRequest(
498 T &pwl, const utime_t arrived, io::Extents &&image_extents,
499 bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
500 PerfCounters *perfcounter, Context *user_req)
501 : C_WriteRequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
502 fadvise_flags, lock, perfcounter, user_req) {
503 ldout(pwl.get_context(), 20) << this << dendl;
504}
505
506template <typename T>
507C_WriteSameRequest<T>::~C_WriteSameRequest() {
508 ldout(pwl.get_context(), 20) << this << dendl;
509}
510
511template <typename T>
512void C_WriteSameRequest<T>::update_req_stats(utime_t &now) {
513 /* Write same stats excluded from most write stats
514 * because the read phase will make them look like slow writes in
515 * those histograms. */
516 ldout(pwl.get_context(), 20) << this << dendl;
517 utime_t comp_latency = now - this->m_arrived_time;
518 this->m_perfcounter->tinc(l_librbd_pwl_ws_latency, comp_latency);
519}
520
521template <typename T>
522std::shared_ptr<WriteLogOperation> C_WriteSameRequest<T>::create_operation(
523 uint64_t offset, uint64_t len) {
524 ceph_assert(this->image_extents.size() == 1);
525 WriteLogOperationSet &set = *this->op_set.get();
526 return pwl.m_builder->create_write_log_operation(
527 *this->op_set.get(), offset, len, this->bl.length(), pwl.get_context(),
528 pwl.m_builder->create_writesame_log_entry(set.sync_point->log_entry, offset,
529 len, this->bl.length()));
530}
531
532template <typename T>
533std::ostream &operator<<(std::ostream &os,
534 const C_WriteSameRequest<T> &req) {
535 os << (C_WriteRequest<T>&)req;
536 return os;
537}
538
539template <typename T>
540void C_WriteRequest<T>::update_req_stats(utime_t &now) {
541 /* Compare-and-write stats. Compare-and-write excluded from most write
542 * stats because the read phase will make them look like slow writes in
543 * those histograms. */
544 if(is_comp_and_write) {
545 if (!compare_succeeded) {
546 this->m_perfcounter->inc(l_librbd_pwl_cmp_fails, 1);
547 }
548 utime_t comp_latency = now - this->m_arrived_time;
549 this->m_perfcounter->tinc(l_librbd_pwl_cmp_latency, comp_latency);
550 }
551}
552
553} // namespace pwl
554} // namespace cache
555} // namespace librbd
556
557template class librbd::cache::pwl::C_BlockIORequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
558template class librbd::cache::pwl::C_WriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
559template class librbd::cache::pwl::C_FlushRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
560template class librbd::cache::pwl::C_DiscardRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
561template class librbd::cache::pwl::C_WriteSameRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;