// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "librbd/cache/pwl/Request.h"
#include "librbd/BlockGuard.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/cache/pwl/AbstractWriteLog.h"

#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::Request: " << this << " " \
                           << __func__ << ": "

namespace librbd {
namespace cache {
namespace pwl {

template <typename T>
C_BlockIORequest<T>::C_BlockIORequest(T &pwl, const utime_t arrived, io::Extents &&extents,
                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
  : pwl(pwl), image_extents(std::move(extents)),
    bl(std::move(bl)), fadvise_flags(fadvise_flags),
    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_BlockIORequest<T>::~C_BlockIORequest() {
  ldout(pwl.get_context(), 99) << this << dendl;
  ceph_assert(m_cell_released || !m_cell);
}

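/* A request must never be destroyed while it still holds its block guard
 * cell: either release_cell() has run (m_cell_released) or no cell was ever
 * assigned via set_cell(). */
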
template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_BlockIORequest<T> &req) {
  os << "image_extents=" << req.image_extents
     << ", image_extents_summary=[" << req.image_extents_summary
     << "], bl=" << req.bl
     << ", user_req=" << req.user_req
     << ", m_user_req_completed=" << req.m_user_req_completed
     << ", m_deferred=" << req.m_deferred
     << ", detained=" << req.detained;
  return os;
}

template <typename T>
void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
  ldout(pwl.get_context(), 20) << this << " cell=" << cell << dendl;
  ceph_assert(cell);
  ceph_assert(!m_cell);
  m_cell = cell;
}

template <typename T>
BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
  ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  return m_cell;
}

template <typename T>
void C_BlockIORequest<T>::release_cell() {
  ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  ceph_assert(m_cell);
  bool initial = false;
  if (m_cell_released.compare_exchange_strong(initial, true)) {
    pwl.release_guarded_request(m_cell);
  } else {
    ldout(pwl.get_context(), 5) << "cell " << m_cell
                                << " already released for " << this << dendl;
  }
}

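/* The once-only transition here (and in complete_user_request(), finish(),
 * and deferred() below) uses std::atomic<bool>::compare_exchange_strong:
 *
 *   bool initial = false;
 *   if (flag.compare_exchange_strong(initial, true)) {
 *     // exactly one caller wins the false -> true transition
 *   }
 *
 * so each of these transitions is safe and idempotent under concurrent
 * callers. */
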
template <typename T>
void C_BlockIORequest<T>::complete_user_request(int r) {
  bool initial = false;
  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
    ldout(pwl.get_context(), 15) << this << " completing user req" << dendl;
    m_user_req_completed_time = ceph_clock_now();
    pwl.complete_user_request(user_req, r);
  } else {
    ldout(pwl.get_context(), 20) << this << " user req already completed" << dendl;
  }
}

template <typename T>
void C_BlockIORequest<T>::finish(int r) {
  ldout(pwl.get_context(), 20) << this << dendl;

  complete_user_request(r);
  bool initial = false;
  if (m_finish_called.compare_exchange_strong(initial, true)) {
    ldout(pwl.get_context(), 15) << this << " finishing" << dendl;
    finish_req(0);
  } else {
    ldout(pwl.get_context(), 20) << this << " already finished" << dendl;
    ceph_assert(0);
  }
}

template <typename T>
void C_BlockIORequest<T>::deferred() {
  bool initial = false;
  if (m_deferred.compare_exchange_strong(initial, true)) {
    deferred_handler();
  }
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
                                  PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    m_perfcounter(perfcounter), m_lock(lock) {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
                                  int fadvise_flags, ceph::mutex &lock, PerfCounters *perfcounter,
                                  Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    mismatch_offset(mismatch_offset), cmp_bl(std::move(cmp_bl)),
    m_perfcounter(perfcounter), m_lock(lock) {
  is_comp_and_write = true;
  ldout(pwl.get_context(), 20) << dendl;
}

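/* This overload services compare-and-write: cmp_bl is compared against the
 * existing data before bl is written. On a compare miss the caller is
 * expected to receive the offset of the first difference via
 * *mismatch_offset, and the request completes without consuming write
 * resources (see finish_req() and update_req_stats() below). */
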
template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  if (req.op_set) {
    os << " op_set=[" << *req.op_set << "]";
  }
  return os;
}

template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(pwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
  this->set_cell(guard_ctx.cell);
}

template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
  ldout(pwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;

  /* Completed to caller by here (in finish(), which calls this) */
  utime_t now = ceph_clock_now();
  if (is_comp_and_write && !compare_succeeded) {
    update_req_stats(now);
    return;
  }
  pwl.release_write_lanes(this);
  ceph_assert(m_resources.allocated);
  m_resources.allocated = false;
  this->release_cell(); /* TODO: Consider doing this in appending state */
  update_req_stats(now);
}

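/* The early return above assumes a failed compare never reached
 * alloc_resources(), so there are no lanes or buffers to release and
 * m_resources.allocated would not hold. */
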
template <typename T>
std::shared_ptr<WriteLogOperation> C_WriteRequest<T>::create_operation(
    uint64_t offset, uint64_t len) {
  return pwl.m_builder->create_write_log_operation(
      *op_set, offset, len, pwl.get_context(),
      pwl.m_builder->create_write_log_entry(op_set->sync_point->log_entry, offset, len));
}

template <typename T>
void C_WriteRequest<T>::setup_log_operations(DeferredContexts &on_exit) {
  GenericWriteLogEntries log_entries;
  {
    std::lock_guard locker(m_lock);
    std::shared_ptr<SyncPoint> current_sync_point = pwl.get_current_sync_point();
    if ((!pwl.get_persist_on_flush() && current_sync_point->log_entry->writes_completed) ||
        (current_sync_point->log_entry->writes > MAX_WRITES_PER_SYNC_POINT) ||
        (current_sync_point->log_entry->bytes > MAX_BYTES_PER_SYNC_POINT)) {
      /* Create new sync point and persist the previous one. This sequenced
       * write will bear a sync gen number shared with no already completed
       * writes. A group of sequenced writes may be safely flushed concurrently
       * if they all arrived before any of them completed. We'll insert one on
       * an aio_flush() from the application. Here we're inserting one to cap
       * the number of bytes and writes per sync point. When the application is
       * not issuing flushes, we insert sync points to record some observed
       * write concurrency information that enables us to safely issue >1 flush
       * write (for writes observed here to have been in flight simultaneously)
       * at a time in persist-on-write mode.
       */
      pwl.flush_new_sync_point(nullptr, on_exit);
      current_sync_point = pwl.get_current_sync_point();
    }
    uint64_t current_sync_gen = pwl.get_current_sync_gen();
    op_set =
      make_unique<WriteLogOperationSet>(this->m_dispatched_time,
                                        m_perfcounter,
                                        current_sync_point,
                                        pwl.get_persist_on_flush(),
                                        pwl.get_context(), this);
    ldout(pwl.get_context(), 20) << "write_req=[" << *this
                                 << "], op_set=" << op_set.get() << dendl;
    ceph_assert(m_resources.allocated);
    /* op_set->operations initialized differently for plain write or write same */
    auto allocation = m_resources.buffers.begin();
    uint64_t buffer_offset = 0;
    for (auto &extent : this->image_extents) {
      /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
      auto operation = this->create_operation(extent.first, extent.second);
      this->op_set->operations.emplace_back(operation);

      /* A WS is also a write */
      ldout(pwl.get_context(), 20) << "write_req=[" << *this
                                   << "], op_set=" << op_set.get()
                                   << ", operation=" << operation << dendl;
      log_entries.emplace_back(operation->log_entry);
      if (!op_set->persist_on_flush) {
        pwl.inc_last_op_sequence_num();
      }
      operation->init(true, allocation, current_sync_gen,
                      pwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
      buffer_offset += operation->log_entry->write_bytes();
      ldout(pwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
      allocation++;
    }
  }
  /* All extent ops subs created */
  op_set->extent_ops_appending->activate();
  op_set->extent_ops_persist->activate();

  pwl.add_into_log_map(log_entries, this);
}

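/* To illustrate the loop above: for image_extents {{0, 4096}, {8192, 512}}
 * two operations are created. The first copies from this->bl at
 * buffer_offset 0; the second at buffer_offset 4096 (the first entry's
 * write_bytes()). Each operation consumes one buffer allocation from
 * m_resources.buffers. */
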
template <typename T>
void C_WriteRequest<T>::copy_cache() {
  pwl.copy_bl_to_buffer(&m_resources, op_set);
}

template <typename T>
bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
  std::lock_guard locker(m_lock);
  auto write_req_sp = this;
  if (sync_point->earlier_sync_point) {
    Context *schedule_append_ctx = new LambdaContext([write_req_sp](int r) {
      write_req_sp->schedule_append();
    });
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
    return true;
  }
  return false;
}

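/* append_write_request() returns true only while an earlier sync point still
 * exists. The append of this write is then chained behind that sync point:
 * the LambdaContext queued on on_sync_point_appending fires
 * schedule_append() once the earlier sync point begins appending. */
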
template <typename T>
void C_WriteRequest<T>::schedule_append() {
  ceph_assert(++m_appended == 1);
  pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush, this);
}

/**
 * Attempts to allocate log resources for a write. Returns true if successful.
 *
 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
 * data space for each extent.
 *
 * Lanes are released after the write persists via release_write_lanes()
 */
template <typename T>
bool C_WriteRequest<T>::alloc_resources() {
  this->allocated_time = ceph_clock_now();
  return pwl.alloc_resources(this);
}

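/* Following the doc comment above, a write covering two extents of 4 KiB
 * and 8 KiB would request 2 lanes, 2 log entries, and 12 KiB of payload
 * buffer space. If pwl.alloc_resources() cannot satisfy all of these, it
 * returns false and the write is dispatched later via the deferred path. */
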
/**
 * Takes custody of write_req. Resources must already be allocated.
 *
 * Locking:
 * Acquires lock
 */
template <typename T>
void C_WriteRequest<T>::dispatch()
{
  CephContext *cct = pwl.get_context();
  DeferredContexts on_exit;
  utime_t now = ceph_clock_now();
  this->m_dispatched_time = now;

  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  this->setup_log_operations(on_exit);

  bool append_deferred = false;
  if (!op_set->persist_on_flush &&
      append_write_request(op_set->sync_point)) {
    /* In persist-on-write mode, we defer the append of this write until the
     * previous sync point is appending (meaning all the writes before it are
     * persisted and that previous sync point can now appear in the
     * log). Since we insert sync points in persist-on-write mode when writes
     * have already completed to the current sync point, this limits us to
     * one inserted sync point in flight at a time, and gives the next
     * inserted sync point some time to accumulate a few writes if they
     * arrive soon. Without this we can insert an absurd number of sync
     * points, each with one or two writes. That uses a lot of log entries,
     * and limits flushing to very few writes at a time. */
    m_do_early_flush = false;
    append_deferred = true;
  } else {
    /* The prior sync point is done, so we'll schedule append here. If this is
     * persist-on-write, and probably still the caller's thread, we'll use this
     * caller's thread to perform the persist & replication of the payload
     * buffer. */
    m_do_early_flush =
      !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
  }
  if (!append_deferred) {
    this->schedule_append();
  }
}

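/* m_do_early_flush summary: the payload persist may run in the caller's
 * thread only when this request was never detained, queued, or deferred,
 * and the log is in persist-on-write mode (persist_on_flush false);
 * otherwise it is left to setup_schedule_append() to do later. */
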
template <typename T>
C_FlushRequest<T>::C_FlushRequest(T &pwl, const utime_t arrived,
                                  io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags,
                                  ceph::mutex &lock, PerfCounters *perfcounter,
                                  Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
                        fadvise_flags, user_req),
    m_lock(lock), m_perfcounter(perfcounter) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
void C_FlushRequest<T>::finish_req(int r) {
  ldout(pwl.get_context(), 20) << "flush_req=" << this
                               << " cell=" << this->get_cell() << dendl;
  /* Block guard already released */
  ceph_assert(!this->get_cell());

  /* Completed to caller by here */
  utime_t now = ceph_clock_now();
  m_perfcounter->tinc(l_librbd_pwl_aio_flush_latency, now - this->m_arrived_time);
}

template <typename T>
bool C_FlushRequest<T>::alloc_resources() {
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  return pwl.alloc_resources(this);
}

template <typename T>
void C_FlushRequest<T>::dispatch() {
  utime_t now = ceph_clock_now();
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  ceph_assert(this->m_resources.allocated);
  this->m_dispatched_time = now;

  op = std::make_shared<SyncPointLogOperation>(m_lock,
                                               to_append,
                                               now,
                                               m_perfcounter,
                                               pwl.get_context());

  m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
  pwl.schedule_append(op);
}

template <typename T>
void C_FlushRequest<T>::setup_buffer_resources(
    uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
    uint64_t *number_lanes, uint64_t *number_log_entries,
    uint64_t *number_unpublished_reserves) {
  *number_log_entries = 1;
}

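/* A flush maps to a single sync point log entry: it reserves one log entry
 * and, unlike writes and discards, needs no lanes, payload buffer space, or
 * dirty-byte accounting. */
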
template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_FlushRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  return os;
}

template <typename T>
C_DiscardRequest<T>::C_DiscardRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                      uint32_t discard_granularity_bytes, ceph::mutex &lock,
                                      PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), bufferlist(), 0, user_req),
    m_discard_granularity_bytes(discard_granularity_bytes),
    m_lock(lock),
    m_perfcounter(perfcounter) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
C_DiscardRequest<T>::~C_DiscardRequest() {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
bool C_DiscardRequest<T>::alloc_resources() {
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  return pwl.alloc_resources(this);
}

template <typename T>
void C_DiscardRequest<T>::setup_log_operations() {
  std::lock_guard locker(m_lock);
  GenericWriteLogEntries log_entries;
  for (auto &extent : this->image_extents) {
    op = pwl.m_builder->create_discard_log_operation(
        pwl.get_current_sync_point(), extent.first, extent.second,
        m_discard_granularity_bytes, this->m_dispatched_time, m_perfcounter,
        pwl.get_context());
    log_entries.emplace_back(op->log_entry);
    break;
  }
  uint64_t current_sync_gen = pwl.get_current_sync_gen();
  bool persist_on_flush = pwl.get_persist_on_flush();
  if (!persist_on_flush) {
    pwl.inc_last_op_sequence_num();
  }
  auto discard_req = this;
  Context *on_write_append = pwl.get_current_sync_point()->prior_persisted_gather_new_sub();

  Context *on_write_persist = new LambdaContext(
    [this, discard_req](int r) {
      ldout(pwl.get_context(), 20) << "discard_req=" << discard_req
                                   << " cell=" << discard_req->get_cell() << dendl;
      ceph_assert(discard_req->get_cell());
      discard_req->complete_user_request(r);
      discard_req->release_cell();
    });
  op->init_op(current_sync_gen, persist_on_flush, pwl.get_last_op_sequence_num(),
              on_write_persist, on_write_append);
  pwl.add_into_log_map(log_entries, this);
}

template <typename T>
void C_DiscardRequest<T>::dispatch() {
  utime_t now = ceph_clock_now();
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  ceph_assert(this->m_resources.allocated);
  this->m_dispatched_time = now;
  setup_log_operations();
  m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
  pwl.schedule_append(op);
}

template <typename T>
void C_DiscardRequest<T>::setup_buffer_resources(
    uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
    uint64_t *number_lanes, uint64_t *number_log_entries,
    uint64_t *number_unpublished_reserves) {
  *number_log_entries = 1;
  /* No bytes are allocated for a discard, but we count the discarded bytes
   * as dirty. This means it's possible to have more bytes dirty than
   * there are bytes cached or allocated. */
  for (auto &extent : this->image_extents) {
    *bytes_dirtied = extent.second;
    break;
  }
}

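/* Example: discarding a single 1 MiB extent reports bytes_dirtied = 1 MiB
 * while bytes_cached and bytes_allocated stay zero, so dirty bytes can
 * exceed cached or allocated bytes, as noted above. */
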
template <typename T>
void C_DiscardRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(pwl.get_context(), 20) << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->set_cell(guard_ctx.cell);
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_DiscardRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req;
  if (req.op) {
    os << " op=[" << *req.op << "]";
  } else {
    os << " op=nullptr";
  }
  return os;
}

template <typename T>
C_WriteSameRequest<T>::C_WriteSameRequest(
    T &pwl, const utime_t arrived, io::Extents &&image_extents,
    bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
    PerfCounters *perfcounter, Context *user_req)
  : C_WriteRequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
                      fadvise_flags, lock, perfcounter, user_req) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
C_WriteSameRequest<T>::~C_WriteSameRequest() {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
void C_WriteSameRequest<T>::update_req_stats(utime_t &now) {
  /* Write same stats excluded from most write stats
   * because the read phase will make them look like slow writes in
   * those histograms. */
  ldout(pwl.get_context(), 20) << this << dendl;
  utime_t comp_latency = now - this->m_arrived_time;
  this->m_perfcounter->tinc(l_librbd_pwl_ws_latency, comp_latency);
}

template <typename T>
std::shared_ptr<WriteLogOperation> C_WriteSameRequest<T>::create_operation(
    uint64_t offset, uint64_t len) {
  ceph_assert(this->image_extents.size() == 1);
  WriteLogOperationSet &set = *this->op_set.get();
  return pwl.m_builder->create_write_log_operation(
      *this->op_set.get(), offset, len, this->bl.length(), pwl.get_context(),
      pwl.m_builder->create_writesame_log_entry(set.sync_point->log_entry, offset,
                                                len, this->bl.length()));
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteSameRequest<T> &req) {
  os << (C_WriteRequest<T>&)req;
  return os;
}

template <typename T>
void C_WriteRequest<T>::update_req_stats(utime_t &now) {
  /* Compare-and-write stats. Compare-and-write excluded from most write
   * stats because the read phase will make them look like slow writes in
   * those histograms. */
  if (is_comp_and_write) {
    if (!compare_succeeded) {
      this->m_perfcounter->inc(l_librbd_pwl_cmp_fails, 1);
    }
    utime_t comp_latency = now - this->m_arrived_time;
    this->m_perfcounter->tinc(l_librbd_pwl_cmp_latency, comp_latency);
  }
}

} // namespace pwl
} // namespace cache
} // namespace librbd

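/* The request templates are defined in this translation unit, so they are
 * explicitly instantiated below for the one cache type currently in use,
 * AbstractWriteLog<librbd::ImageCtx>, keeping the definitions out of the
 * header. */
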
template class librbd::cache::pwl::C_BlockIORequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_WriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_FlushRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_DiscardRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_WriteSameRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;