1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
5 #include "librbd/BlockGuard.h"
6 #include "librbd/cache/rwl/LogEntry.h"
8 #define dout_subsys ceph_subsys_rbd_rwl
10 #define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
17 typedef std::list
<std::shared_ptr
<GenericWriteLogEntry
>> GenericWriteLogEntries
;
// Base block I/O request: takes ownership of the image extents and the data
// bufferlist, and records the arrival time for latency accounting.
// NOTE(review): the `template <typename T>` header and the closing brace of
// this constructor are on lines not visible in this chunk.
C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
  : rwl(rwl), image_extents(std::move(extents)),
    bl(std::move(bl)), fadvise_flags(fadvise_flags),
    user_req(user_req),
    // summary is built from the already-moved-in image_extents member
    image_extents_summary(image_extents), m_arrived_time(arrived) {
  ldout(rwl.get_context(), 99) << this << dendl;
// Destructor: by this point the block guard cell must have been released,
// or never set at all.
C_BlockIORequest<T>::~C_BlockIORequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
  ceph_assert(m_cell_released || !m_cell);
// Debug dump of a block I/O request's state (extents, payload, user context,
// completion/deferral flags and resource-wait flags).
// NOTE(review): the `return os;` and closing brace are on lines not visible
// in this chunk.
std::ostream &operator<<(std::ostream &os,
                         const C_BlockIORequest<T> &req) {
  os << "image_extents=[" << req.image_extents << "], "
     << "image_extents_summary=[" << req.image_extents_summary << "], "
     << "bl=" << req.bl << ", "
     << "user_req=" << req.user_req << ", "
     << "m_user_req_completed=" << req.m_user_req_completed << ", "
     << "m_deferred=" << req.m_deferred << ", "
     << "detained=" << req.detained << ", "
     << "waited_lanes=" << req.waited_lanes << ", "
     << "waited_entries=" << req.waited_entries << ", "
     << "waited_buffers=" << req.waited_buffers << "";
// Record the BlockGuardCell protecting this request's extent range.
// NOTE(review): the remainder of this method's body is on lines not visible
// in this chunk.
void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
  ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl;
// Accessor for the request's block guard cell.
// NOTE(review): the `return m_cell;` and closing brace are on lines not
// visible in this chunk.
BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
// Release the block guard cell exactly once, even under concurrent callers:
// the CAS on m_cell_released ensures only the first caller performs the
// release.
// NOTE(review): the declaration of `initial` and the if/else braces are on
// lines not visible in this chunk.
void C_BlockIORequest<T>::release_cell() {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  if (m_cell_released.compare_exchange_strong(initial, true)) {
    rwl.release_guarded_request(m_cell);
    // (else branch) the cell was already released by another caller:
    ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
// Complete the user's Context at most once (CAS on m_user_req_completed),
// recording the completion time for stats.
// NOTE(review): the declaration of `initial` and the if/else braces are on
// lines not visible in this chunk.
void C_BlockIORequest<T>::complete_user_request(int r) {
  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " completing user req" << dendl;
    m_user_req_completed_time = ceph_clock_now();
    // complete() deletes the context object.
    user_req->complete(r);
    // Set user_req as null as it is deleted
    // (else branch) another caller already completed the user request:
    ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl;
// Context::finish override: completes the caller's request first, then runs
// the finish path at most once (CAS on m_finish_called).
// NOTE(review): the declaration of `initial` and the statements between the
// two log lines are on lines not visible in this chunk.
void C_BlockIORequest<T>::finish(int r) {
  ldout(rwl.get_context(), 20) << this << dendl;
  complete_user_request(r);
  if (m_finish_called.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " finishing" << dendl;
    // (else branch) finish already ran:
    ldout(rwl.get_context(), 20) << this << " already finished" << dendl;
// Mark the request deferred; the deferred handler runs only on the first
// transition (CAS from false to true).
// NOTE(review): the body of the if and the closing braces are on lines not
// visible in this chunk.
template <typename T>
void C_BlockIORequest<T>::deferred() {
  bool initial = false;
  if (m_deferred.compare_exchange_strong(initial, true)) {
// Write request: forwards extents/payload/flags to the base class and keeps
// references to the shared lock and perf counters used during dispatch.
// NOTE(review): this constructor's closing brace is on a line not visible in
// this chunk.
template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
                                  PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    m_lock(lock), m_perfcounter(perfcounter) {
  ldout(rwl.get_context(), 99) << this << dendl;
// Destructor: trace-level log only; cleanup happens in finish_req()/base.
// NOTE(review): the closing brace is on a line not visible in this chunk.
template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
// Debug dump of a write request: base-class state plus resource-allocation
// flag and (when present) the operation set.
// NOTE(review): the line between the two `os <<` statements — presumably a
// null check guarding the `*req.op_set` dereference — plus the return and
// closing brace are not visible in this chunk; confirm against the full file.
template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  os << "op_set=" << *req.op_set;
// Called when the block guard grants this write its cell: copy the guard
// state (overlap/queue flags) into the request and record the cell.
// NOTE(review): the closing brace is on a line not visible in this chunk.
template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this
                               << " cell=" << guard_ctx.cell << dendl;
  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
  this->set_cell(guard_ctx.cell);
// Final teardown for a write request: return the write lanes, drop the
// resource-allocated flag, release the guard cell, then update stats with
// the completion timestamp.
// NOTE(review): the closing brace is on a line not visible in this chunk.
template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
  ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  /* Completed to caller by here (in finish(), which calls this) */
  utime_t now = ceph_clock_now();
  rwl.release_write_lanes(this);
  ceph_assert(m_resources.allocated);
  m_resources.allocated = false;
  this->release_cell(); /* TODO: Consider doing this in appending state */
  update_req_stats(now);
// Compute the resources this write needs: one lane, one log entry and one
// unpublished reserve per extent, plus a payload buffer sized to each
// extent's length (with a floor of MIN_WRITE_ALLOC_SIZE). Totals are
// returned via the out-parameters; every cached byte is also dirty.
// NOTE(review): the closing braces of the if, the for loop and the method
// are on lines not visible in this chunk.
template <typename T>
void C_WriteRequest<T>::setup_buffer_resources(
    uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
    uint64_t &number_lanes, uint64_t &number_log_entries,
    uint64_t &number_unpublished_reserves) {
  // Must not already hold resources.
  ceph_assert(!m_resources.allocated);
  auto image_extents_size = this->image_extents.size();
  m_resources.buffers.reserve(image_extents_size);
  // One of each per extent:
  number_lanes = image_extents_size;
  number_log_entries = image_extents_size;
  number_unpublished_reserves = image_extents_size;
  for (auto &extent : this->image_extents) {
    m_resources.buffers.emplace_back();
    struct WriteBufferAllocation &buffer = m_resources.buffers.back();
    // Start from the minimum allocation unit; grow to the extent length below.
    buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
    buffer.allocated = false;
    bytes_cached += extent.second;
    if (extent.second > buffer.allocation_size) {
      buffer.allocation_size = extent.second;
    bytes_allocated += buffer.allocation_size;
  // All cached bytes are dirty for a plain write.
  bytes_dirtied = bytes_cached;
// Build one WriteLogOperation per image extent under the shared lock,
// binding each to the current sync point/generation and a pre-allocated
// buffer, then activate the per-extent gather subs and copy each
// operation's payload into its pmem buffer.
template <typename T>
void C_WriteRequest<T>::setup_log_operations() {
  std::lock_guard locker(m_lock);
  // TODO: Add sync point if necessary
  std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
  uint64_t current_sync_gen = rwl.get_current_sync_gen();
  // NOTE(review): the assignment target of this make_unique (presumably
  // op_set, used below) is on a line not visible in this chunk.
  make_unique<WriteLogOperationSet>(this->m_dispatched_time,
                                    rwl.get_persist_on_flush(),
                                    rwl.get_context(), this);
  ldout(rwl.get_context(), 20) << "write_req=" << *this
                               << " op_set=" << op_set.get() << dendl;
  // Resources must have been allocated before operations are set up.
  ceph_assert(m_resources.allocated);
  /* op_set->operations initialized differently for plain write or write same */
  auto allocation = m_resources.buffers.begin();
  uint64_t buffer_offset = 0;
  for (auto &extent : this->image_extents) {
    /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
    // NOTE(review): the declaration of `operation` receiving this
    // make_shared result is on a line not visible in this chunk.
    std::make_shared<WriteLogOperation>(*op_set, extent.first, extent.second,
                                        rwl.get_context());
    op_set->operations.emplace_back(operation);
    /* A WS is also a write */
    ldout(rwl.get_context(), 20) << "write_req=" << *this
                                 << " op_set=" << op_set.get()
                                 << " operation=" << operation << dendl;
    rwl.inc_last_op_sequence_num();
    // Bind the op to its buffer slice, sync gen and sequence number.
    operation->init(true, allocation, current_sync_gen,
                    rwl.get_last_op_sequence_num(), this->bl, buffer_offset,
                    op_set->persist_on_flush);
    buffer_offset += operation->log_entry->write_bytes();
    ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
  /* All extent ops subs created */
  op_set->extent_ops_appending->activate();
  op_set->extent_ops_persist->activate();
  // Copy each operation's payload bytes into its pmem buffer.
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_pmem_buffer();
// If an earlier sync point still exists, defer this write's append by
// queueing a callback on that sync point's on_sync_point_appending list;
// the callback re-invokes schedule_append() once the earlier sync point is
// appending.
// NOTE(review): the return statement(s) and closing braces are on lines not
// visible in this chunk; dispatch() treats a true return as "append
// deferred".
template <typename T>
bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
  std::lock_guard locker(m_lock);
  auto write_req_sp = this;
  if (sync_point->earlier_sync_point) {
    Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
      write_req_sp->schedule_append();
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
// Schedule the append of this write's operations; must run exactly once
// (asserted via m_appended).
// NOTE(review): the if/else braces are on lines not visible in this chunk.
template <typename T>
void C_WriteRequest<T>::schedule_append() {
  ceph_assert(++m_appended == 1);
  if (m_do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * flush the pmem buffer and schedule the append immediately. */
    rwl.flush_pmem_buffer(this->op_set->operations);
    rwl.schedule_append(this->op_set->operations);
    /* (else branch) This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    rwl.schedule_flush_and_append(this->op_set->operations);
/*
 * Attempts to allocate log resources for a write. Returns true if successful.
 *
 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
 * data space for each extent.
 *
 * Lanes are released after the write persists via release_write_lanes()
 */
// NOTE(review): the return-path lines after rwl.alloc_resources and the
// closing brace are not visible in this chunk.
template <typename T>
bool C_WriteRequest<T>::alloc_resources() {
  // Timestamp the allocation for latency stats, then delegate to the cache.
  this->allocated_time = ceph_clock_now();
  return rwl.alloc_resources(this);
/*
 * Takes custody of write_req. Resources must already be allocated.
 */
// NOTE(review): the opening brace of this method and several statement lines
// are not visible in this chunk.
template <typename T>
void C_WriteRequest<T>::dispatch()
  CephContext *cct = rwl.get_context();
  utime_t now = ceph_clock_now();
  this->m_dispatched_time = now;
  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  // Create the log operations for each extent before deciding how to append.
  setup_log_operations();
  bool append_deferred = false;
  if (!op_set->persist_on_flush &&
      append_write_request(op_set->sync_point)) {
    /* In persist-on-write mode, we defer the append of this write until the
     * previous sync point is appending (meaning all the writes before it are
     * persisted and that previous sync point can now appear in the
     * log). Since we insert sync points in persist-on-write mode when writes
     * have already completed to the current sync point, this limits us to
     * one inserted sync point in flight at a time, and gives the next
     * inserted sync point some time to accumulate a few writes if they
     * arrive soon. Without this we can insert an absurd number of sync
     * points, each with one or two writes. That uses a lot of log entries,
     * and limits flushing to very few writes at a time. */
    m_do_early_flush = false;
    append_deferred = true;
    /* (else branch) The prior sync point is done, so we'll schedule append here. If this is
     * persist-on-write, and probably still the caller's thread, we'll use this
     * caller's thread to perform the persist & replication of the payload. */
    // NOTE(review): the assignment target of this boolean expression
    // (presumably m_do_early_flush) is on a line not visible in this chunk.
    !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
  if (!append_deferred) {
    this->schedule_append();
// Debug dump of block guard request state flags.
// NOTE(review): the `return os;` and closing brace are on lines not visible
// in this chunk.
std::ostream &operator<<(std::ostream &os,
                         const BlockGuardReqState &r) {
  os << "barrier=" << r.barrier << ", "
     << "current_barrier=" << r.current_barrier << ", "
     << "detained=" << r.detained << ", "
     << "queued=" << r.queued;
333 GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function
<void(GuardedRequestFunctionContext
&)> &&callback
)
334 : m_callback(std::move(callback
)){ }
336 GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }
// Context::finish override.
// NOTE(review): the body is on lines not visible in this chunk — presumably
// it invokes m_callback(*this); verify against the full file.
void GuardedRequestFunctionContext::finish(int r) {
343 std::ostream
&operator<<(std::ostream
&os
,
344 const GuardedRequest
&r
) {
345 os
<< "guard_ctx->state=[" << r
.guard_ctx
->state
<< "], "
346 << "block_extent.block_start=" << r
.block_extent
.block_start
<< ", "
347 << "block_extent.block_start=" << r
.block_extent
.block_end
;
353 } // namespace librbd