// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Request.h"
#include "librbd/BlockGuard.h"
#include "librbd/cache/rwl/LogEntry.h"

#define dout_subsys ceph_subsys_rbd_rwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::rwl::Request: " << this << " " \
                           << __func__ << ": "

namespace librbd {
namespace cache {
namespace rwl {

typedef std::list<std::shared_ptr<GenericWriteLogEntry>> GenericWriteLogEntries;

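/*
 * C_BlockIORequest is the common base for guarded block IO requests.
 * It takes ownership of the image extents and payload bufferlist, and
 * records the arrival time for latency accounting.
 */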
template <typename T>
C_BlockIORequest<T>::C_BlockIORequest(T &rwl, const utime_t arrived, io::Extents &&extents,
                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
  : rwl(rwl), image_extents(std::move(extents)),
    bl(std::move(bl)), fadvise_flags(fadvise_flags),
    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_BlockIORequest<T>::~C_BlockIORequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
  ceph_assert(m_cell_released || !m_cell);
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_BlockIORequest<T> &req) {
  os << "image_extents=[" << req.image_extents << "], "
     << "image_extents_summary=[" << req.image_extents_summary << "], "
     << "bl=" << req.bl << ", "
     << "user_req=" << req.user_req << ", "
     << "m_user_req_completed=" << req.m_user_req_completed << ", "
     << "m_deferred=" << req.m_deferred << ", "
     << "detained=" << req.detained << ", "
     << "waited_lanes=" << req.waited_lanes << ", "
     << "waited_entries=" << req.waited_entries << ", "
     << "waited_buffers=" << req.waited_buffers;
  return os;
}

template <typename T>
void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
  ldout(rwl.get_context(), 20) << this << " cell=" << cell << dendl;
  ceph_assert(cell);
  ceph_assert(!m_cell);
  m_cell = cell;
}

template <typename T>
BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  return m_cell;
}

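/*
 * Releases this request's block guard cell exactly once. m_cell_released
 * is an atomic latch: compare_exchange_strong flips it false->true for
 * the first caller only, so racing completion paths cannot double-release
 * the cell. The same latch idiom guards complete_user_request(), finish()
 * and deferred() below.
 */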
template <typename T>
void C_BlockIORequest<T>::release_cell() {
  ldout(rwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  ceph_assert(m_cell);
  bool initial = false;
  if (m_cell_released.compare_exchange_strong(initial, true)) {
    rwl.release_guarded_request(m_cell);
  } else {
    ldout(rwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
  }
}

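/*
 * Completes the caller's Context exactly once. Context::complete()
 * deletes the user request, so the pointer is cleared afterwards to
 * avoid any accidental reuse.
 */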
template <typename T>
void C_BlockIORequest<T>::complete_user_request(int r) {
  bool initial = false;
  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " completing user req" << dendl;
    m_user_req_completed_time = ceph_clock_now();
    user_req->complete(r);
    // complete() deletes user_req; clear the dangling pointer
    user_req = nullptr;
  } else {
    ldout(rwl.get_context(), 20) << this << " user req already completed" << dendl;
  }
}

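/*
 * Final completion hook, invoked via Context::complete(). The user
 * request is completed with r, then finish_req(0) performs the internal
 * cleanup. finish() must run exactly once; a second call is a fatal
 * logic error (ceph_assert(0) below).
 */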
template <typename T>
void C_BlockIORequest<T>::finish(int r) {
  ldout(rwl.get_context(), 20) << this << dendl;

  complete_user_request(r);
  bool initial = false;
  if (m_finish_called.compare_exchange_strong(initial, true)) {
    ldout(rwl.get_context(), 15) << this << " finishing" << dendl;
    finish_req(0);
  } else {
    ldout(rwl.get_context(), 20) << this << " already finished" << dendl;
    ceph_assert(0);
  }
}

template <typename T>
void C_BlockIORequest<T>::deferred() {
  bool initial = false;
  if (m_deferred.compare_exchange_strong(initial, true)) {
    deferred_handler();
  }
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &rwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
                                  PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(rwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    m_lock(lock), m_perfcounter(perfcounter) {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
  ldout(rwl.get_context(), 99) << this << dendl;
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  if (req.op_set) {
    os << " op_set=" << *req.op_set;
  }
  return os;
}

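/*
 * Called once the block guard grants this write a cell. "detained"
 * records that the write overlapped an in-flight request; "queued"
 * records that it waited behind at least one barrier. Both feed the
 * early-flush decision in dispatch().
 */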
template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(rwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
  this->set_cell(guard_ctx.cell);
}

template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
  ldout(rwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;

  /* Completed to caller by here (in finish(), which calls this) */
  utime_t now = ceph_clock_now();
  rwl.release_write_lanes(this);
  ceph_assert(m_resources.allocated);
  m_resources.allocated = false;
  this->release_cell(); /* TODO: Consider doing this in appending state */
  update_req_stats(now);
}

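/*
 * Computes the resources this write consumes: one lane, one log entry,
 * and one payload buffer per image extent. Each buffer is at least
 * MIN_WRITE_ALLOC_SIZE bytes, so bytes_allocated rounds every extent up
 * to that floor while bytes_cached counts the raw payload. For example
 * (assuming MIN_WRITE_ALLOC_SIZE is 4 KiB), extents of 4096 and 512
 * bytes yield bytes_cached=4608 but bytes_allocated=8192.
 */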
template <typename T>
void C_WriteRequest<T>::setup_buffer_resources(
    uint64_t &bytes_cached, uint64_t &bytes_dirtied, uint64_t &bytes_allocated,
    uint64_t &number_lanes, uint64_t &number_log_entries,
    uint64_t &number_unpublished_reserves) {

  ceph_assert(!m_resources.allocated);

  auto image_extents_size = this->image_extents.size();
  m_resources.buffers.reserve(image_extents_size);

  bytes_cached = 0;
  bytes_allocated = 0;
  number_lanes = image_extents_size;
  number_log_entries = image_extents_size;
  number_unpublished_reserves = image_extents_size;

  for (auto &extent : this->image_extents) {
    m_resources.buffers.emplace_back();
    struct WriteBufferAllocation &buffer = m_resources.buffers.back();
    buffer.allocation_size = MIN_WRITE_ALLOC_SIZE;
    buffer.allocated = false;
    bytes_cached += extent.second;
    if (extent.second > buffer.allocation_size) {
      buffer.allocation_size = extent.second;
    }
    bytes_allocated += buffer.allocation_size;
  }
  bytes_dirtied = bytes_cached;
}

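/*
 * Builds one WriteLogOperation per image extent, all attached to the
 * current sync point, then copies the payload into the pmem buffers.
 * buffer_offset walks through this->bl so each operation copies its own
 * slice of the request's data.
 */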
template <typename T>
void C_WriteRequest<T>::setup_log_operations() {
  {
    std::lock_guard locker(m_lock);
    // TODO: Add sync point if necessary
    std::shared_ptr<SyncPoint> current_sync_point = rwl.get_current_sync_point();
    uint64_t current_sync_gen = rwl.get_current_sync_gen();
    op_set =
      std::make_unique<WriteLogOperationSet>(this->m_dispatched_time,
                                             m_perfcounter,
                                             current_sync_point,
                                             rwl.get_persist_on_flush(),
                                             rwl.get_context(), this);
    ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get() << dendl;
    ceph_assert(m_resources.allocated);
    /* op_set->operations initialized differently for plain write or write same */
    auto allocation = m_resources.buffers.begin();
    uint64_t buffer_offset = 0;
    for (auto &extent : this->image_extents) {
      /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
      auto operation =
        std::make_shared<WriteLogOperation>(*op_set, extent.first, extent.second, rwl.get_context());
      op_set->operations.emplace_back(operation);

      /* A WS is also a write */
      ldout(rwl.get_context(), 20) << "write_req=" << *this << " op_set=" << op_set.get()
                                   << " operation=" << operation << dendl;
      rwl.inc_last_op_sequence_num();
      operation->init(true, allocation, current_sync_gen,
                      rwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
      buffer_offset += operation->log_entry->write_bytes();
      ldout(rwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
      allocation++;
    }
  }
  /* All extent ops subs created */
  op_set->extent_ops_appending->activate();
  op_set->extent_ops_persist->activate();

  /* Write data */
  for (auto &operation : op_set->operations) {
    operation->copy_bl_to_pmem_buffer();
  }
}

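/*
 * If an earlier sync point is still appending, chain this write's
 * schedule_append() behind it and return true (append deferred);
 * otherwise return false and the caller appends immediately.
 */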
template <typename T>
bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
  std::lock_guard locker(m_lock);
  if (sync_point->earlier_sync_point) {
    Context *schedule_append_ctx = new LambdaContext([this](int r) {
      this->schedule_append();
    });
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
    return true;
  }
  return false;
}

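/*
 * Hands the operations to the log for flush and append. Must run exactly
 * once per request (asserted below; ceph_assert always evaluates its
 * argument, so the increment is not lost). When m_do_early_flush is set
 * the caller is waiting for persist, so its own thread flushes the pmem
 * buffers before scheduling the append.
 */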
template <typename T>
void C_WriteRequest<T>::schedule_append() {
  ceph_assert(++m_appended == 1);
  if (m_do_early_flush) {
    /* This caller is waiting for persist, so we'll use their thread to
     * expedite it */
    rwl.flush_pmem_buffer(this->op_set->operations);
    rwl.schedule_append(this->op_set->operations);
  } else {
    /* This is probably not still the caller's thread, so do the payload
     * flushing/replicating later. */
    rwl.schedule_flush_and_append(this->op_set->operations);
  }
}

/**
 * Attempts to allocate log resources for a write. Returns true if successful.
 *
 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
 * data space for each extent.
 *
 * Lanes are released after the write persists via release_write_lanes()
 */
template <typename T>
bool C_WriteRequest<T>::alloc_resources() {
  this->allocated_time = ceph_clock_now();
  return rwl.alloc_resources(this);
}

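/*
 * Typical request lifecycle (a sketch inferred from this file; the exact
 * call sites live in the owning log implementation, e.g. ReplicatedWriteLog):
 *
 *   req->alloc_resources();   // reserve lanes, log entries, data buffers
 *   req->dispatch();          // build log operations, maybe defer append
 *   // ... operations are appended and persisted ...
 *   req->complete(0);         // Context::complete() -> finish() -> finish_req()
 */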
/**
 * Takes custody of write_req. Resources must already be allocated.
 *
 * Locking:
 * Acquires lock
 */
template <typename T>
void C_WriteRequest<T>::dispatch()
{
  CephContext *cct = rwl.get_context();
  utime_t now = ceph_clock_now();
  this->m_dispatched_time = now;

  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  setup_log_operations();

  bool append_deferred = false;
  if (!op_set->persist_on_flush &&
      append_write_request(op_set->sync_point)) {
    /* In persist-on-write mode, we defer the append of this write until the
     * previous sync point is appending (meaning all the writes before it are
     * persisted and that previous sync point can now appear in the
     * log). Since we insert sync points in persist-on-write mode when writes
     * have already completed to the current sync point, this limits us to
     * one inserted sync point in flight at a time, and gives the next
     * inserted sync point some time to accumulate a few writes if they
     * arrive soon. Without this we can insert an absurd number of sync
     * points, each with one or two writes. That uses a lot of log entries,
     * and limits flushing to very few writes at a time. */
    m_do_early_flush = false;
    append_deferred = true;
  } else {
    /* The prior sync point is done, so we'll schedule append here. If this is
     * persist-on-write, and probably still the caller's thread, we'll use this
     * caller's thread to perform the persist & replication of the payload
     * buffer. */
    m_do_early_flush =
      !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
  }
  if (!append_deferred) {
    this->schedule_append();
  }
}

std::ostream &operator<<(std::ostream &os,
                         const BlockGuardReqState &r) {
  os << "barrier=" << r.barrier << ", "
     << "current_barrier=" << r.current_barrier << ", "
     << "detained=" << r.detained << ", "
     << "queued=" << r.queued;
  return os;
}

GuardedRequestFunctionContext::GuardedRequestFunctionContext(boost::function<void(GuardedRequestFunctionContext&)> &&callback)
  : m_callback(std::move(callback)) { }

GuardedRequestFunctionContext::~GuardedRequestFunctionContext(void) { }

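/*
 * Run by the block guard once a cell has been granted; the stored
 * callback resumes the guarded request (e.g. via
 * C_WriteRequest::blockguard_acquired() above).
 */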
void GuardedRequestFunctionContext::finish(int r) {
  ceph_assert(cell);
  m_callback(*this);
}

std::ostream &operator<<(std::ostream &os,
                         const GuardedRequest &r) {
  os << "guard_ctx->state=[" << r.guard_ctx->state << "], "
     << "block_extent.block_start=" << r.block_extent.block_start << ", "
     << "block_extent.block_end=" << r.block_extent.block_end;
  return os;
}

} // namespace rwl
} // namespace cache
} // namespace librbd