// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Request.h"
#include "librbd/BlockGuard.h"
#include "librbd/cache/rwl/LogEntry.h"

#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
                           << __func__ << ": "

namespace librbd {
namespace cache {
namespace rwl {

typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;

template <typename T>
C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
  : rwl(rwl), image_extents(std::move(extents)),
    bl(std::move(bl)), fadvise_flags(fadvise_flags),
    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_BlockIORequest<T>::~C_BlockIORequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
  ceph_assert(m_cell_released || !m_cell);
}
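/* Note on the assertion above: a request that was ever assigned a
 * block-guard cell must have released it (release_cell()) before it is
 * destroyed; otherwise later overlapping requests would stay blocked. */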

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_BlockIORequest<T> &req) {
  os << "image_extents=[" << req.image_extents << "], "
     << "image_extents_summary=[" << req.image_extents_summary << "], "
     << "bl=" << req.bl << ", "
     << "user_req=" << req.user_req << ", "
     << "m_user_req_completed=" << req.m_user_req_completed << ", "
     << "m_deferred=" << req.m_deferred << ", "
     << "detained=" << req.detained << ", "
     << "waited_lanes=" << req.waited_lanes << ", "
     << "waited_entries=" << req.waited_entries << ", "
     << "waited_buffers=" << req.waited_buffers << "";
  return os;
}

template <typename T>
void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
  ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl;
  ceph_assert(cell);
  ceph_assert(!m_cell);
  m_cell = cell;
}

template <typename T>
BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  return m_cell;
}

template <typename T>
void C_BlockIORequest<T>::release_cell() {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  ceph_assert(m_cell);
  bool initial = false;
  if (m_cell_released.compare_exchange_strong(initial, true)) {
    rwl.release_guarded_request(m_cell);
  } else {
    ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
  }
}

template <typename T>
void C_BlockIORequest<T>::complete_user_request(int r) {
  bool initial = false;
  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " completing user req" << dendl;
    m_user_req_completed_time = ceph_clock_now();
    user_req->complete(r);
    // Clear user_req: complete() deletes the context, so it must not be reused
    user_req = nullptr;
  } else {
    ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl;
  }
}
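/* The compare_exchange_strong(initial, true) test above is the once-only
 * latch idiom used throughout this file (release_cell(), finish(),
 * deferred()): exactly one caller atomically flips the flag from false to
 * true and performs the side effect; concurrent or repeated callers take the
 * else branch. A minimal standalone sketch of the idiom (hypothetical names,
 * not part of this file):
 *
 *   std::atomic<bool> latch{false};
 *   bool expected = false;
 *   if (latch.compare_exchange_strong(expected, true)) {
 *     // first caller only: perform the one-shot action
 *   }
 */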

template <typename T>
void C_BlockIORequest<T>::finish(int r) {
  ldout(rwl.get_context(), 20) << this << dendl;

  complete_user_request(r);
  bool initial = false;
  if (m_finish_called.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " finishing" << dendl;
    finish_req(0);
  } else {
    ldout(rwl.get_context(), 20) << this << " already finished" << dendl;
    ceph_assert(0);
  }
}

template <typename T>
void C_BlockIORequest<T>::deferred() {
  bool initial = false;
  if (m_deferred.compare_exchange_strong(initial, true)) {
    deferred_handler();
  }
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
                                  PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    m_lock(lock), m_perfcounter(perfcounter) {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  if (req.op_set) {
    os << "op_set=" << *req.op_set;
  }
  return os;
}

template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
  this->set_cell(guard_ctx.cell);
}

template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
  ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;

  /* Completed to caller by here (in finish(), which calls this) */
  utime_t now = ceph_clock_now();
  rwl.release_write_lanes(this);
  ceph_assert(m_resources.allocated);
  m_resources.allocated = false;
  this->release_cell(); /* TODO: Consider doing this in appending state */
  update_req_stats(now);
}

template <typename T>
void C_WriteRequest<T>::setup_buffer_resources(
    uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
    uint64_t &number_lanes, uint64_t &number_log_entries,
    uint64_t &number_unpublished_reserves) {

  ceph_assert(!m_resources.allocated);

  auto image_extents_size = this->image_extents.size();
  m_resources.buffers.reserve(image_extents_size);

  bytes_cached = 0;
  bytes_allocated = 0;
  number_lanes = image_extents_size;
  number_log_entries = image_extents_size;
  number_unpublished_reserves = image_extents_size;

  for (auto &extent : this->image_extents) {
    m_resources.buffers.emplace_back();
    struct WriteBufferAllocation &buffer = m_resources.buffers.back();
    buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
    buffer.allocated = false;
    bytes_cached += extent.second;
    if (extent.second > buffer.allocation_size) {
      buffer.allocation_size = extent.second;
    }
    bytes_allocated += buffer.allocation_size;
  }
  bytes_dirtied = bytes_cached;
}
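/* Illustration of the accounting above (values assumed, not taken from this
 * file): if MIN_WRITE_ALLOC_SIZE were 512 and the request covered two extents
 * of 100 and 1024 bytes, then bytes_cached = bytes_dirtied = 1124, the buffer
 * allocation_sizes would be 512 and 1024, bytes_allocated = 1536, and
 * number_lanes = number_log_entries = number_unpublished_reserves = 2.
 * Writes smaller than the minimum allocation round up to it; larger writes
 * allocate their exact length. */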

template <typename T>
void C_WriteRequest<T>::setup_log_operations() {
  {
    std::lock_guard locker(m_lock);
    // TODO: Add sync point if necessary
    std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
    uint64_t current_sync_gen = rwl.get_current_sync_gen();
    op_set =
      make_unique<WriteLogOperationSet>(this->m_dispatched_time,
                                        m_perfcounter,
                                        current_sync_point,
                                        rwl.get_persist_on_flush(),
                                        rwl.get_context(), this);
    ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl;
    ceph_assert(m_resources.allocated);
    /* op_set->operations initialized differently for plain write or write same */
    auto allocation = m_resources.buffers.begin();
    uint64_t buffer_offset = 0;
    for (auto &extent : this->image_extents) {
      /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
      auto operation =
        std::make_shared<WriteLogOperation>(*op_set, extent.first, extent.second, rwl.get_context());
      op_set->operations.emplace_back(operation);

      /* A WS is also a write */
      ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get()
                                   << " operation=" << operation << dendl;
      rwl.inc_last_op_sequence_num();
      operation->init(true, allocation, current_sync_gen,
                      rwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
      buffer_offset += operation->log_entry->write_bytes();
      ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
      allocation++;
    }
  }
  /* All extent ops subs created */
  op_set->extent_ops_appending->activate();
  op_set->extent_ops_persist->activate();

  /* Write data */
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_pmem_buffer();
  }
}
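/* Data layout note for the loop above: the request's single bufferlist
 * this->bl carries the payload for all extents back to back, so each
 * WriteLogOperation is handed its starting buffer_offset and the offset then
 * advances by that entry's write_bytes(). For example (illustrative values),
 * extents of 512 and 4096 bytes yield operations at offsets 0 and 512
 * within this->bl. */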

template <typename T>
bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
  std::lock_guard locker(m_lock);
  auto write_req_sp = this;
  if (sync_point->earlier_sync_point) {
    Context *schedule_append_ctx = new LambdaContext([this, write_req_sp](int r) {
      write_req_sp->schedule_append();
    });
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
    return true;
  }
  return false;
}

template <typename T>
void C_WriteRequest<T>::schedule_append() {
  ceph_assert(++m_appended == 1);
  if (m_do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * expedite it */
    rwl.flush_pmem_buffer(this->op_set->operations);
    rwl.schedule_append(this->op_set->operations);
  } else {
    /* This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    rwl.schedule_flush_and_append(this->op_set->operations);
  }
}

/**
 * Attempts to allocate log resources for a write. Returns true if successful.
 *
 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
 * data space for each extent.
 *
 * Lanes are released after the write persists via release_write_lanes().
 */
template <typename T>
bool C_WriteRequest<T>::alloc_resources() {
  this->allocated_time = ceph_clock_now();
  return rwl.alloc_resources(this);
}
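/* Expected call pattern (a sketch inferred from the interfaces in this file,
 * not a verbatim excerpt of the owning cache):
 *
 *   if (req->alloc_resources()) {
 *     req->dispatch();   // resources in hand; build and append log operations
 *   } else {
 *     req->deferred();   // mark deferred; the allocation is retried later
 *   }
 */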

/**
 * Takes custody of write_req. Resources must already be allocated.
 *
 * Locking:
 * Acquires lock
 */
template <typename T>
void C_WriteRequest<T>::dispatch()
{
  CephContext *cct = rwl.get_context();
  utime_t now = ceph_clock_now();
  this->m_dispatched_time = now;

  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  setup_log_operations();

  bool append_deferred = false;
  if (!op_set->persist_on_flush &&
      append_write_request(op_set->sync_point)) {
    /* In persist-on-write mode, we defer the append of this write until the
     * previous sync point is appending (meaning all the writes before it are
     * persisted and that previous sync point can now appear in the
     * log). Since we insert sync points in persist-on-write mode when writes
     * have already completed to the current sync point, this limits us to
     * one inserted sync point in flight at a time, and gives the next
     * inserted sync point some time to accumulate a few writes if they
     * arrive soon. Without this we can insert an absurd number of sync
     * points, each with one or two writes. That uses a lot of log entries,
     * and limits flushing to very few writes at a time. */
    m_do_early_flush = false;
    append_deferred = true;
  } else {
    /* The prior sync point is done, so we'll schedule append here. If this is
     * persist-on-write, and probably still the caller's thread, we'll use this
     * caller's thread to perform the persist & replication of the payload
     * buffer. */
    m_do_early_flush =
      !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
  }
  if (!append_deferred) {
    this->schedule_append();
  }
}
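/* Summary of the scheduling decision above:
 *  - persist-on-write mode with an earlier sync point still appending:
 *    defer schedule_append() until that sync point appends (append_deferred).
 *  - otherwise, m_do_early_flush is true only for a persist-on-write request
 *    that was never detained, queued, or deferred; schedule_append() then
 *    flushes the pmem buffer on the caller's thread before appending.
 *  - in all remaining cases, schedule_append() hands the operations to
 *    schedule_flush_and_append() for background flushing. */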

std::ostream &operator<<(std::ostream &os,
                         const BlockGuardReqState &r) {
  os << "barrier=" << r.barrier << ", "
     << "current_barrier=" << r.current_barrier << ", "
     << "detained=" << r.detained << ", "
     << "queued=" << r.queued;
  return os;
}

GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback)
  : m_callback(std::move(callback)) { }

GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }

void GuardedRequestFunctionContext::finish(int r) {
  ceph_assert(cell);
  m_callback(*this);
}

std::ostream &operator<<(std::ostream &os,
                         const GuardedRequest &r) {
  os << "guard_ctx->state=[" << r.guard_ctx->state << "], "
     << "block_extent.block_start=" << r.block_extent.block_start << ", "
     << "block_extent.block_end=" << r.block_extent.block_end;
  return os;
}

} // namespace rwl
} // namespace cache
} // namespace librbd