1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "librbd/BlockGuard.h"
6 #include "librbd/cache/pwl/LogEntry.h"
7 #include "librbd/cache/pwl/AbstractWriteLog.h"
9 #define dout_subsys ceph_subsys_rbd_pwl
11 #define dout_prefix *_dout << "librbd::cache::pwl::Request: " << this << " " \
/**
 * Constructs a block IO request. The extents and the buffer list are moved
 * into the request, the remaining arguments are copied into their members,
 * and a level-99 trace line is logged.
 *
 * NOTE(review): image_extents_summary is initialized from the image_extents
 * member (the parameter is named extents), so it relies on image_extents
 * being declared before image_extents_summary - confirm in the header.
 */
19 C_BlockIORequest
<T
>::C_BlockIORequest(T
&pwl
, const utime_t arrived
, io::Extents
&&extents
,
20 bufferlist
&& bl
, const int fadvise_flags
, Context
*user_req
)
21 : pwl(pwl
), image_extents(std::move(extents
)),
22 bl(std::move(bl
)), fadvise_flags(fadvise_flags
),
23 user_req(user_req
), image_extents_summary(image_extents
), m_arrived_time(arrived
) {
24 ldout(pwl
.get_context(), 99) << this << dendl
;
/**
 * Destructor. Logs a level-99 trace line and asserts that the request is not
 * destroyed while it still holds an unreleased cell: either m_cell_released
 * is set or m_cell is null.
 */
28 C_BlockIORequest
<T
>::~C_BlockIORequest() {
29 ldout(pwl
.get_context(), 99) << this << dendl
;
30 ceph_assert(m_cell_released
|| !m_cell
);
/**
 * Streams a textual description of a block IO request to os as a
 * comma-separated list of name=value pairs: the image extents and their
 * summary, the buffer list, the user request pointer, the completion and
 * deferral flags, and the detained / waited_* indicators.
 */
34 std::ostream
&operator<<(std::ostream
&os
,
35 const C_BlockIORequest
<T
> &req
) {
36 os
<< "image_extents=[" << req
.image_extents
<< "], "
37 << "image_extents_summary=[" << req
.image_extents_summary
<< "], "
38 << "bl=" << req
.bl
<< ", "
39 << "user_req=" << req
.user_req
<< ", "
40 << "m_user_req_completed=" << req
.m_user_req_completed
<< ", "
41 << "m_deferred=" << req
.m_deferred
<< ", "
42 << "detained=" << req
.detained
<< ", "
43 << "waited_lanes=" << req
.waited_lanes
<< ", "
44 << "waited_entries=" << req
.waited_entries
<< ", "
45 << "waited_buffers=" << req
.waited_buffers
<< "";
/**
 * Setter for the request's block guard cell. Logs the request pointer and
 * the supplied cell at level 20.
 */
50 void C_BlockIORequest
<T
>::set_cell(BlockGuardCell
*cell
) {
51 ldout(pwl
.get_context(), 20) << this << " cell=" << cell
<< dendl
;
/**
 * Accessor for the request's block guard cell. Note that every call also
 * emits a level-20 log line containing the request pointer and m_cell, so
 * it is not a silent getter.
 */
58 BlockGuardCell
*C_BlockIORequest
<T
>::get_cell(void) {
59 ldout(pwl
.get_context(), 20) << this << " cell=" << m_cell
<< dendl
;
/**
 * Releases the block guard cell held by this request. m_cell_released is
 * flipped with compare_exchange_strong, and pwl.release_guarded_request() is
 * called for m_cell only by the caller whose exchange succeeds. The level-5
 * "already released" message presumably covers the case where the flag was
 * already set.
 */
64 void C_BlockIORequest
<T
>::release_cell() {
65 ldout(pwl
.get_context(), 20) << this << " cell=" << m_cell
<< dendl
;
68 if (m_cell_released
.compare_exchange_strong(initial
, true)) {
69 pwl
.release_guarded_request(m_cell
);
71 ldout(pwl
.get_context(), 5) << "cell " << m_cell
<< " already released for " << this << dendl
;
/**
 * Completes the user's request with result r. m_user_req_completed is
 * flipped with compare_exchange_strong; the caller whose exchange succeeds
 * records m_user_req_completed_time and forwards user_req and r to
 * pwl.complete_user_request(). The level-20 "already completed" message
 * presumably covers repeated calls.
 */
76 void C_BlockIORequest
<T
>::complete_user_request(int r
) {
78 if (m_user_req_completed
.compare_exchange_strong(initial
, true)) {
79 ldout(pwl
.get_context(), 15) << this << " completing user req" << dendl
;
80 m_user_req_completed_time
= ceph_clock_now();
81 pwl
.complete_user_request(user_req
, r
);
83 ldout(pwl
.get_context(), 20) << this << " user req already completed" << dendl
;
/**
 * Finishes the request with result r. The user request is completed first
 * via complete_user_request(r); m_finish_called is then flipped with
 * compare_exchange_strong so that the "finishing" path is taken by a single
 * caller. The "already finished" message presumably covers a second call.
 */
88 void C_BlockIORequest
<T
>::finish(int r
) {
89 ldout(pwl
.get_context(), 20) << this << dendl
;
91 complete_user_request(r
);
93 if (m_finish_called
.compare_exchange_strong(initial
, true)) {
94 ldout(pwl
.get_context(), 15) << this << " finishing" << dendl
;
97 ldout(pwl
.get_context(), 20) << this << " already finished" << dendl
;
/**
 * Marks the request as deferred. m_deferred is flipped from false to true
 * with compare_exchange_strong, so the guarded branch is entered only by
 * the call that performs the transition.
 */
102 template <typename T
>
103 void C_BlockIORequest
<T
>::deferred() {
104 bool initial
= false;
105 if (m_deferred
.compare_exchange_strong(initial
, true)) {
110 template <typename T
>
111 C_WriteRequest
<T
>::C_WriteRequest(T
&pwl
, const utime_t arrived
, io::Extents
&&image_extents
,
112 bufferlist
&& bl
, const int fadvise_flags
, ceph::mutex
&lock
,
113 PerfCounters
*perfcounter
, Context
*user_req
)
114 : C_BlockIORequest
<T
>(pwl
, arrived
, std::move(image_extents
), std::move(bl
), fadvise_flags
, user_req
),
115 m_perfcounter(perfcounter
), m_lock(lock
) {
116 ldout(pwl
.get_context(), 99) << this << dendl
;
/**
 * Compare-and-write constructor. The write payload (bl) is forwarded to the
 * C_BlockIORequest base, while the comparison buffer (cmp_bl, moved) and the
 * mismatch_offset pointer are stored in this class together with the perf
 * counters and the lock reference. is_comp_and_write is set to true in the
 * body.
 */
119 template <typename T
>
120 C_WriteRequest
<T
>::C_WriteRequest(T
&pwl
, const utime_t arrived
, io::Extents
&&image_extents
,
121 bufferlist
&& cmp_bl
, bufferlist
&& bl
, uint64_t *mismatch_offset
,
122 int fadvise_flags
, ceph::mutex
&lock
, PerfCounters
*perfcounter
,
124 : C_BlockIORequest
<T
>(pwl
, arrived
, std::move(image_extents
), std::move(bl
), fadvise_flags
, user_req
),
125 mismatch_offset(mismatch_offset
), cmp_bl(std::move(cmp_bl
)),
126 m_perfcounter(perfcounter
), m_lock(lock
) {
127 is_comp_and_write
= true;
128 ldout(pwl
.get_context(), 20) << dendl
;
131 template <typename T
>
132 C_WriteRequest
<T
>::~C_WriteRequest() {
133 ldout(pwl
.get_context(), 99) << this << dendl
;
/**
 * Streams a write request to os: the C_BlockIORequest base fields, the
 * m_resources.allocated flag and the dereferenced op_set.
 *
 * NOTE(review): the C-style cast to C_BlockIORequest<T>& discards the const
 * qualifier of req; static_cast<const C_BlockIORequest<T>&>(req) would
 * express the same up-cast without doing so.
 */
136 template <typename T
>
137 std::ostream
&operator<<(std::ostream
&os
,
138 const C_WriteRequest
<T
> &req
) {
139 os
<< (C_BlockIORequest
<T
>&)req
140 << " m_resources.allocated=" << req
.m_resources
.allocated
;
142 os
<< "op_set=" << *req
.op_set
;
147 template <typename T
>
148 void C_WriteRequest
<T
>::blockguard_acquired(GuardedRequestFunctionContext
&guard_ctx
) {
149 ldout(pwl
.get_context(), 20) << __func__
<< " write_req=" << this << " cell=" << guard_ctx
.cell
<< dendl
;
151 ceph_assert(guard_ctx
.cell
);
152 this->detained
= guard_ctx
.state
.detained
; /* overlapped */
153 this->m_queued
= guard_ctx
.state
.queued
; /* queued behind at least one barrier */
154 this->set_cell(guard_ctx
.cell
);
/**
 * Post-completion bookkeeping for a write request. For a compare-and-write
 * whose comparison did not succeed, update_req_stats() is called inside the
 * guarded branch. The remaining statements release the write lanes, assert
 * and then clear m_resources.allocated, release the cell and update the
 * request statistics with the timestamp taken on entry.
 */
157 template <typename T
>
158 void C_WriteRequest
<T
>::finish_req(int r
) {
159 ldout(pwl
.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl
;
161 /* Completed to caller by here (in finish(), which calls this) */
162 utime_t now
= ceph_clock_now();
163 if(is_comp_and_write
&& !compare_succeeded
) {
164 update_req_stats(now
);
167 pwl
.release_write_lanes(this);
168 ceph_assert(m_resources
.allocated
);
169 m_resources
.allocated
= false;
170 this->release_cell(); /* TODO: Consider doing this in appending state */
171 update_req_stats(now
);
174 template <typename T
>
175 std::shared_ptr
<WriteLogOperation
> C_WriteRequest
<T
>::create_operation(
176 uint64_t offset
, uint64_t len
) {
177 return pwl
.m_builder
->create_write_log_operation(
178 *op_set
, offset
, len
, pwl
.get_context(),
179 pwl
.m_builder
->create_write_log_entry(op_set
->sync_point
->log_entry
, offset
, len
));
/**
 * Creates the log operations for this write request.
 *
 * Takes m_lock and fetches the current sync point. A new sync point is
 * flushed through pwl.flush_new_sync_point() when either the cache is not in
 * persist-on-flush mode and the current sync point already has completed
 * writes, or the current sync point exceeds MAX_WRITES_PER_SYNC_POINT or
 * MAX_BYTES_PER_SYNC_POINT. A WriteLogOperationSet is then built and, for
 * every image extent, an operation is created through create_operation(),
 * appended to op_set->operations and log_entries, and initialized with the
 * running buffer_offset into this->bl. When op_set is not persist-on-flush
 * the last op sequence number is advanced before each init(). Finally the
 * op_set's extent_ops_appending and extent_ops_persist are activated and
 * the collected log entries are added to the log map.
 *
 * @param on_exit deferred contexts handed to flush_new_sync_point()
 */
182 template <typename T
>
183 void C_WriteRequest
<T
>::setup_log_operations(DeferredContexts
&on_exit
) {
184 GenericWriteLogEntries log_entries
;
186 std::lock_guard
locker(m_lock
);
187 std::shared_ptr
<SyncPoint
> current_sync_point
= pwl
.get_current_sync_point();
188 if ((!pwl
.get_persist_on_flush() && current_sync_point
->log_entry
->writes_completed
) ||
189 (current_sync_point
->log_entry
->writes
> MAX_WRITES_PER_SYNC_POINT
) ||
190 (current_sync_point
->log_entry
->bytes
> MAX_BYTES_PER_SYNC_POINT
)) {
191 /* Create new sync point and persist the previous one. This sequenced
192 * write will bear a sync gen number shared with no already completed
193 * writes. A group of sequenced writes may be safely flushed concurrently
194 * if they all arrived before any of them completed. We'll insert one on
195 * an aio_flush() from the application. Here we're inserting one to cap
196 * the number of bytes and writes per sync point. When the application is
197 * not issuing flushes, we insert sync points to record some observed
198 * write concurrency information that enables us to safely issue >1 flush
199 * write (for writes observed here to have been in flight simultaneously)
200 * at a time in persist-on-write mode.
202 pwl
.flush_new_sync_point(nullptr, on_exit
);
203 current_sync_point
= pwl
.get_current_sync_point();
205 uint64_t current_sync_gen
= pwl
.get_current_sync_gen();
207 make_unique
<WriteLogOperationSet
>(this->m_dispatched_time
,
210 pwl
.get_persist_on_flush(),
211 pwl
.get_context(), this);
212 ldout(pwl
.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set
.get() << dendl
;
213 ceph_assert(m_resources
.allocated
);
214 /* op_set->operations initialized differently for plain write or write same */
215 auto allocation
= m_resources
.buffers
.begin();
216 uint64_t buffer_offset
= 0;
217 for (auto &extent
: this->image_extents
) {
218 /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
219 auto operation
= this->create_operation(extent
.first
, extent
.second
);
220 this->op_set
->operations
.emplace_back(operation
);
222 /* A WS is also a write */
223 ldout(pwl
.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set
.get()
224 << " operation=" << operation
<< dendl
;
225 log_entries
.emplace_back(operation
->log_entry
);
226 if (!op_set
->persist_on_flush
) {
227 pwl
.inc_last_op_sequence_num();
229 operation
->init(true, allocation
, current_sync_gen
,
230 pwl
.get_last_op_sequence_num(), this->bl
, buffer_offset
, op_set
->persist_on_flush
);
231 buffer_offset
+= operation
->log_entry
->write_bytes();
232 ldout(pwl
.get_context(), 20) << "operation=[" << *operation
<< "]" << dendl
;
236 /* All extent ops subs created */
237 op_set
->extent_ops_appending
->activate();
238 op_set
->extent_ops_persist
->activate();
240 pwl
.add_into_log_map(log_entries
, this);
243 template <typename T
>
244 void C_WriteRequest
<T
>::copy_cache() {
245 pwl
.copy_bl_to_buffer(&m_resources
, op_set
);
/**
 * Arranges for this write to be appended once an earlier sync point is
 * appending. Takes m_lock; when sync_point has an earlier_sync_point, a
 * LambdaContext that calls schedule_append() on this request is allocated
 * and pushed onto that earlier sync point's on_sync_point_appending list.
 *
 * NOTE(review): the lambda captures both `this` and write_req_sp, and
 * write_req_sp is itself `this`; one of the two captures looks redundant.
 */
248 template <typename T
>
249 bool C_WriteRequest
<T
>::append_write_request(std::shared_ptr
<SyncPoint
> sync_point
) {
250 std::lock_guard
locker(m_lock
);
251 auto write_req_sp
= this;
252 if (sync_point
->earlier_sync_point
) {
253 Context
*schedule_append_ctx
= new LambdaContext([this, write_req_sp
](int r
) {
254 write_req_sp
->schedule_append();
256 sync_point
->earlier_sync_point
->on_sync_point_appending
.push_back(schedule_append_ctx
);
262 template <typename T
>
263 void C_WriteRequest
<T
>::schedule_append() {
264 ceph_assert(++m_appended
== 1);
265 pwl
.setup_schedule_append(this->op_set
->operations
, m_do_early_flush
, this);
269 * Attempts to allocate log resources for a write. Returns true if successful.
271 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
272 * data space for each extent.
274 * Lanes are released after the write persists via release_write_lanes()
276 template <typename T
>
277 bool C_WriteRequest
<T
>::alloc_resources() {
278 this->allocated_time
= ceph_clock_now();
279 return pwl
.alloc_resources(this);
283 * Takes custody of write_req. Resources must already be allocated.
/**
 * Dispatches the write. Stamps m_dispatched_time with the current time,
 * builds the log operations via setup_log_operations(), and then decides
 * how the append is scheduled: when op_set is not persist-on-flush and
 * append_write_request() returns true, the append is deferred and
 * m_do_early_flush is cleared. schedule_append() is called directly only
 * when the append was not deferred.
 */
288 template <typename T
>
289 void C_WriteRequest
<T
>::dispatch()
291 CephContext
*cct
= pwl
.get_context();
292 DeferredContexts on_exit
;
293 utime_t now
= ceph_clock_now();
294 this->m_dispatched_time
= now
;
296 ldout(cct
, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl
;
297 this->setup_log_operations(on_exit
);
299 bool append_deferred
= false;
300 if (!op_set
->persist_on_flush
&&
301 append_write_request(op_set
->sync_point
)) {
302 /* In persist-on-write mode, we defer the append of this write until the
303 * previous sync point is appending (meaning all the writes before it are
304 * persisted and that previous sync point can now appear in the
305 * log). Since we insert sync points in persist-on-write mode when writes
306 * have already completed to the current sync point, this limits us to
307 * one inserted sync point in flight at a time, and gives the next
308 * inserted sync point some time to accumulate a few writes if they
309 * arrive soon. Without this we can insert an absurd number of sync
310 * points, each with one or two writes. That uses a lot of log entries,
311 * and limits flushing to very few writes at a time. */
312 m_do_early_flush
= false;
313 append_deferred
= true;
315 /* The prior sync point is done, so we'll schedule append here. If this is
316 * persist-on-write, and probably still the caller's thread, we'll use this
317 * caller's thread to perform the persist & replication of the payload
320 !(this->detained
|| this->m_queued
|| this->m_deferred
|| op_set
->persist_on_flush
);
322 if (!append_deferred
) {
323 this->schedule_append();
/**
 * Constructs a flush request. The extents, buffer list, fadvise flags and
 * user request are forwarded to the C_BlockIORequest base; the lock
 * reference and perf counters are stored in this class. Logs the request
 * pointer at level 20.
 */
327 template <typename T
>
328 C_FlushRequest
<T
>::C_FlushRequest(T
&pwl
, const utime_t arrived
,
329 io::Extents
&&image_extents
,
330 bufferlist
&& bl
, const int fadvise_flags
,
331 ceph::mutex
&lock
, PerfCounters
*perfcounter
,
333 : C_BlockIORequest
<T
>(pwl
, arrived
, std::move(image_extents
), std::move(bl
),
334 fadvise_flags
, user_req
),
335 m_lock(lock
), m_perfcounter(perfcounter
) {
336 ldout(pwl
.get_context(), 20) << this << dendl
;
339 template <typename T
>
340 void C_FlushRequest
<T
>::finish_req(int r
) {
341 ldout(pwl
.get_context(), 20) << "flush_req=" << this
342 << " cell=" << this->get_cell() << dendl
;
343 /* Block guard already released */
344 ceph_assert(!this->get_cell());
346 /* Completed to caller by here */
347 utime_t now
= ceph_clock_now();
348 m_perfcounter
->tinc(l_librbd_pwl_aio_flush_latency
, now
- this->m_arrived_time
);
351 template <typename T
>
352 bool C_FlushRequest
<T
>::alloc_resources() {
353 ldout(pwl
.get_context(), 20) << "req type=" << get_name() << " "
354 << "req=[" << *this << "]" << dendl
;
355 return pwl
.alloc_resources(this);
/**
 * Dispatches the flush request. Asserts that resources were allocated,
 * stamps m_dispatched_time, creates a SyncPointLogOperation and stores it
 * in op, increments the l_librbd_pwl_log_ops counter by one and hands op to
 * pwl.schedule_append().
 */
358 template <typename T
>
359 void C_FlushRequest
<T
>::dispatch() {
360 utime_t now
= ceph_clock_now();
361 ldout(pwl
.get_context(), 20) << "req type=" << get_name() << " "
362 << "req=[" << *this << "]" << dendl
;
363 ceph_assert(this->m_resources
.allocated
);
364 this->m_dispatched_time
= now
;
366 op
= std::make_shared
<SyncPointLogOperation
>(m_lock
,
372 m_perfcounter
->inc(l_librbd_pwl_log_ops
, 1);
373 pwl
.schedule_append(op
);
376 template <typename T
>
377 void C_FlushRequest
<T
>::setup_buffer_resources(
378 uint64_t *bytes_cached
, uint64_t *bytes_dirtied
, uint64_t *bytes_allocated
,
379 uint64_t *number_lanes
, uint64_t *number_log_entries
,
380 uint64_t *number_unpublished_reserves
) {
381 *number_log_entries
= 1;
384 template <typename T
>
385 std::ostream
&operator<<(std::ostream
&os
,
386 const C_FlushRequest
<T
> &req
) {
387 os
<< (C_BlockIORequest
<T
>&)req
388 << " m_resources.allocated=" << req
.m_resources
.allocated
;
/**
 * Constructs a discard request. The base C_BlockIORequest receives the
 * extents together with an empty bufferlist and fadvise flags of 0; the
 * discard granularity and the perf counters are stored in this class. Logs
 * the request pointer at level 20.
 */
392 template <typename T
>
393 C_DiscardRequest
<T
>::C_DiscardRequest(T
&pwl
, const utime_t arrived
, io::Extents
&&image_extents
,
394 uint32_t discard_granularity_bytes
, ceph::mutex
&lock
,
395 PerfCounters
*perfcounter
, Context
*user_req
)
396 : C_BlockIORequest
<T
>(pwl
, arrived
, std::move(image_extents
), bufferlist(), 0, user_req
),
397 m_discard_granularity_bytes(discard_granularity_bytes
),
399 m_perfcounter(perfcounter
) {
400 ldout(pwl
.get_context(), 20) << this << dendl
;
403 template <typename T
>
404 C_DiscardRequest
<T
>::~C_DiscardRequest() {
405 ldout(pwl
.get_context(), 20) << this << dendl
;
408 template <typename T
>
409 bool C_DiscardRequest
<T
>::alloc_resources() {
410 ldout(pwl
.get_context(), 20) << "req type=" << get_name() << " "
411 << "req=[" << *this << "]" << dendl
;
412 return pwl
.alloc_resources(this);
/**
 * Creates the discard log operation for this request. Takes m_lock, builds
 * the operation through pwl.m_builder->create_discard_log_operation() from
 * the current sync point and an image extent, and records its log entry.
 * When the cache is not in persist-on-flush mode the last op sequence
 * number is advanced. Two completion contexts are prepared: on_write_append
 * comes from the current sync point's prior_persisted_gather_new_sub(), and
 * on_write_persist is a LambdaContext that asserts the request still holds
 * its cell, completes the user request with r and releases the cell. Both
 * are passed to op->init_op(), and the log entries are added to the log map.
 *
 * NOTE(review): the lambda captures both `this` and discard_req, and
 * discard_req is itself `this`.
 */
415 template <typename T
>
416 void C_DiscardRequest
<T
>::setup_log_operations() {
417 std::lock_guard
locker(m_lock
);
418 GenericWriteLogEntries log_entries
;
419 for (auto &extent
: this->image_extents
) {
420 op
= pwl
.m_builder
->create_discard_log_operation(
421 pwl
.get_current_sync_point(), extent
.first
, extent
.second
,
422 m_discard_granularity_bytes
, this->m_dispatched_time
, m_perfcounter
,
424 log_entries
.emplace_back(op
->log_entry
);
427 uint64_t current_sync_gen
= pwl
.get_current_sync_gen();
428 bool persist_on_flush
= pwl
.get_persist_on_flush();
429 if (!persist_on_flush
) {
430 pwl
.inc_last_op_sequence_num();
432 auto discard_req
= this;
433 Context
*on_write_append
= pwl
.get_current_sync_point()->prior_persisted_gather_new_sub();
435 Context
*on_write_persist
= new LambdaContext(
436 [this, discard_req
](int r
) {
437 ldout(pwl
.get_context(), 20) << "discard_req=" << discard_req
438 << " cell=" << discard_req
->get_cell() << dendl
;
439 ceph_assert(discard_req
->get_cell());
440 discard_req
->complete_user_request(r
);
441 discard_req
->release_cell();
443 op
->init_op(current_sync_gen
, persist_on_flush
, pwl
.get_last_op_sequence_num(),
444 on_write_persist
, on_write_append
);
445 pwl
.add_into_log_map(log_entries
, this);
448 template <typename T
>
449 void C_DiscardRequest
<T
>::dispatch() {
450 utime_t now
= ceph_clock_now();
451 ldout(pwl
.get_context(), 20) << "req type=" << get_name() << " "
452 << "req=[" << *this << "]" << dendl
;
453 ceph_assert(this->m_resources
.allocated
);
454 this->m_dispatched_time
= now
;
455 setup_log_operations();
456 m_perfcounter
->inc(l_librbd_pwl_log_ops
, 1);
457 pwl
.schedule_append(op
);
/**
 * Reports the resources a discard request needs: one log entry, plus the
 * length of an image extent written to *bytes_dirtied.
 *
 * NOTE(review): *bytes_dirtied is assigned with `=` rather than accumulated
 * with `+=`, so it ends up holding a single extent's length - confirm this
 * is intended for requests carrying more than one extent.
 */
460 template <typename T
>
461 void C_DiscardRequest
<T
>::setup_buffer_resources(
462 uint64_t *bytes_cached
, uint64_t *bytes_dirtied
, uint64_t *bytes_allocated
,
463 uint64_t *number_lanes
, uint64_t *number_log_entries
,
464 uint64_t *number_unpublished_reserves
) {
465 *number_log_entries
= 1;
466 /* No bytes are allocated for a discard, but we count the discarded bytes
467 * as dirty. This means it's possible to have more bytes dirty than
468 * there are bytes cached or allocated. */
469 for (auto &extent
: this->image_extents
) {
470 *bytes_dirtied
= extent
.second
;
475 template <typename T
>
476 void C_DiscardRequest
<T
>::blockguard_acquired(GuardedRequestFunctionContext
&guard_ctx
) {
477 ldout(pwl
.get_context(), 20) << " cell=" << guard_ctx
.cell
<< dendl
;
479 ceph_assert(guard_ctx
.cell
);
480 this->detained
= guard_ctx
.state
.detained
; /* overlapped */
481 this->set_cell(guard_ctx
.cell
);
/**
 * Streams a discard request to os: the C_BlockIORequest base fields and the
 * dereferenced op inside " op=[...]".
 *
 * NOTE(review): the C-style cast to C_BlockIORequest<T>& discards the const
 * qualifier of req; static_cast<const C_BlockIORequest<T>&>(req) would
 * express the same up-cast without doing so.
 */
484 template <typename T
>
485 std::ostream
&operator<<(std::ostream
&os
,
486 const C_DiscardRequest
<T
> &req
) {
487 os
<< (C_BlockIORequest
<T
>&)req
;
489 os
<< " op=[" << *req
.op
<< "]";
496 template <typename T
>
497 C_WriteSameRequest
<T
>::C_WriteSameRequest(
498 T
&pwl
, const utime_t arrived
, io::Extents
&&image_extents
,
499 bufferlist
&& bl
, const int fadvise_flags
, ceph::mutex
&lock
,
500 PerfCounters
*perfcounter
, Context
*user_req
)
501 : C_WriteRequest
<T
>(pwl
, arrived
, std::move(image_extents
), std::move(bl
),
502 fadvise_flags
, lock
, perfcounter
, user_req
) {
503 ldout(pwl
.get_context(), 20) << this << dendl
;
506 template <typename T
>
507 C_WriteSameRequest
<T
>::~C_WriteSameRequest() {
508 ldout(pwl
.get_context(), 20) << this << dendl
;
511 template <typename T
>
512 void C_WriteSameRequest
<T
>::update_req_stats(utime_t
&now
) {
513 /* Write same stats excluded from most write stats
514 * because the read phase will make them look like slow writes in
515 * those histograms. */
516 ldout(pwl
.get_context(), 20) << this << dendl
;
517 utime_t comp_latency
= now
- this->m_arrived_time
;
518 this->m_perfcounter
->tinc(l_librbd_pwl_ws_latency
, comp_latency
);
521 template <typename T
>
522 std::shared_ptr
<WriteLogOperation
> C_WriteSameRequest
<T
>::create_operation(
523 uint64_t offset
, uint64_t len
) {
524 ceph_assert(this->image_extents
.size() == 1);
525 WriteLogOperationSet
&set
= *this->op_set
.get();
526 return pwl
.m_builder
->create_write_log_operation(
527 *this->op_set
.get(), offset
, len
, this->bl
.length(), pwl
.get_context(),
528 pwl
.m_builder
->create_writesame_log_entry(set
.sync_point
->log_entry
, offset
,
529 len
, this->bl
.length()));
532 template <typename T
>
533 std::ostream
&operator<<(std::ostream
&os
,
534 const C_WriteSameRequest
<T
> &req
) {
535 os
<< (C_WriteRequest
<T
>&)req
;
539 template <typename T
>
540 void C_WriteRequest
<T
>::update_req_stats(utime_t
&now
) {
541 /* Compare-and-write stats. Compare-and-write excluded from most write
542 * stats because the read phase will make them look like slow writes in
543 * those histograms. */
544 if(is_comp_and_write
) {
545 if (!compare_succeeded
) {
546 this->m_perfcounter
->inc(l_librbd_pwl_cmp_fails
, 1);
548 utime_t comp_latency
= now
- this->m_arrived_time
;
549 this->m_perfcounter
->tinc(l_librbd_pwl_cmp_latency
, comp_latency
);
555 } // namespace librbd
557 template class librbd::cache::pwl::C_BlockIORequest
<librbd::cache::pwl::AbstractWriteLog
<librbd::ImageCtx
> >;
558 template class librbd::cache::pwl::C_WriteRequest
<librbd::cache::pwl::AbstractWriteLog
<librbd::ImageCtx
> >;
559 template class librbd::cache::pwl::C_FlushRequest
<librbd::cache::pwl::AbstractWriteLog
<librbd::ImageCtx
> >;
560 template class librbd::cache::pwl::C_DiscardRequest
<librbd::cache::pwl::AbstractWriteLog
<librbd::ImageCtx
> >;
561 template class librbd::cache::pwl::C_WriteSameRequest
<librbd::cache::pwl::AbstractWriteLog
<librbd::ImageCtx
> >;