// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "Request.h"
#include "librbd/BlockGuard.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/cache/pwl/AbstractWriteLog.h"

#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::Request: " << this << " " \
                           << __func__ << ": "

using namespace std;

namespace librbd {
namespace cache {
namespace pwl {

template <typename T>
C_BlockIORequest<T>::C_BlockIORequest(T &pwl, const utime_t arrived, io::Extents &&extents,
                                      bufferlist&& bl, const int fadvise_flags, Context *user_req)
  : pwl(pwl), image_extents(std::move(extents)),
    bl(std::move(bl)), fadvise_flags(fadvise_flags),
    user_req(user_req), image_extents_summary(image_extents), m_arrived_time(arrived) {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_BlockIORequest<T>::~C_BlockIORequest() {
  ldout(pwl.get_context(), 99) << this << dendl;
  ceph_assert(m_cell_released || !m_cell);
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_BlockIORequest<T> &req) {
  os << "image_extents=" << req.image_extents
     << ", image_extents_summary=[" << req.image_extents_summary
     << "], bl=" << req.bl
     << ", user_req=" << req.user_req
     << ", m_user_req_completed=" << req.m_user_req_completed
     << ", m_deferred=" << req.m_deferred
     << ", detained=" << req.detained;
  return os;
}

template <typename T>
void C_BlockIORequest<T>::set_cell(BlockGuardCell *cell) {
  ldout(pwl.get_context(), 20) << this << " cell=" << cell << dendl;
  ceph_assert(cell);
  ceph_assert(!m_cell);
  m_cell = cell;
}

template <typename T>
BlockGuardCell *C_BlockIORequest<T>::get_cell(void) {
  ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  return m_cell;
}

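/* Releases this request's block guard cell back to the write log. The CAS on
 * m_cell_released guarantees the cell is released at most once; repeated
 * calls only log. */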
template <typename T>
void C_BlockIORequest<T>::release_cell() {
  ldout(pwl.get_context(), 20) << this << " cell=" << m_cell << dendl;
  ceph_assert(m_cell);
  bool initial = false;
  if (m_cell_released.compare_exchange_strong(initial, true)) {
    pwl.release_guarded_request(m_cell);
  } else {
    ldout(pwl.get_context(), 5) << "cell " << m_cell << " already released for " << this << dendl;
  }
}

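/* Completes the caller's Context with r. The CAS on m_user_req_completed
 * guarantees user_req is completed exactly once; duplicate calls are logged
 * and ignored. */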
template <typename T>
void C_BlockIORequest<T>::complete_user_request(int r) {
  bool initial = false;
  if (m_user_req_completed.compare_exchange_strong(initial, true)) {
    ldout(pwl.get_context(), 15) << this << " completing user req" << dendl;
    m_user_req_completed_time = ceph_clock_now();
    pwl.complete_user_request(user_req, r);
  } else {
    ldout(pwl.get_context(), 20) << this << " user req already completed" << dendl;
  }
}

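/* Completes the user request with r, then performs internal cleanup via
 * finish_req(). Note finish_req() receives 0 here: the caller has already
 * been completed with r above. Calling finish() twice is a fatal error. */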
template <typename T>
void C_BlockIORequest<T>::finish(int r) {
  ldout(pwl.get_context(), 20) << this << dendl;

  complete_user_request(r);
  bool initial = false;
  if (m_finish_called.compare_exchange_strong(initial, true)) {
    ldout(pwl.get_context(), 15) << this << " finishing" << dendl;
    finish_req(0);
  } else {
    ldout(pwl.get_context(), 20) << this << " already finished" << dendl;
    ceph_assert(0);
  }
}

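/* Marks this request deferred. deferred_handler() runs only on the first
 * transition. */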
template <typename T>
void C_BlockIORequest<T>::deferred() {
  bool initial = false;
  if (m_deferred.compare_exchange_strong(initial, true)) {
    deferred_handler();
  }
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
                                  PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    m_perfcounter(perfcounter), m_lock(lock) {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
C_WriteRequest<T>::C_WriteRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                  bufferlist&& cmp_bl, bufferlist&& bl, uint64_t *mismatch_offset,
                                  int fadvise_flags, ceph::mutex &lock, PerfCounters *perfcounter,
                                  Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl), fadvise_flags, user_req),
    mismatch_offset(mismatch_offset), cmp_bl(std::move(cmp_bl)),
    m_perfcounter(perfcounter), m_lock(lock) {
  is_comp_and_write = true;
  ldout(pwl.get_context(), 20) << dendl;
}

template <typename T>
C_WriteRequest<T>::~C_WriteRequest() {
  ldout(pwl.get_context(), 99) << this << dendl;
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  if (req.op_set) {
    os << " op_set=[" << *req.op_set << "]";
  }
  return os;
}

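/* Invoked by the block guard when this write acquires its guard cell.
 * Records whether the request overlapped an in-flight request (detained) or
 * was queued behind a barrier, then takes ownership of the cell. */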
template <typename T>
void C_WriteRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(pwl.get_context(), 20) << __func__ << " write_req=" << this << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->m_queued = guard_ctx.state.queued; /* queued behind at least one barrier */
  this->set_cell(guard_ctx.cell);
}

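/* Internal cleanup after the caller has been completed. A compare-and-write
 * whose compare failed skips the lane and cell release below; only its stats
 * are recorded. */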
template <typename T>
void C_WriteRequest<T>::finish_req(int r) {
  ldout(pwl.get_context(), 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;

  /* Completed to caller by here (in finish(), which calls this) */
  utime_t now = ceph_clock_now();
  if (is_comp_and_write && !compare_succeeded) {
    update_req_stats(now);
    return;
  }
  pwl.release_write_lanes(this);
  ceph_assert(m_resources.allocated);
  m_resources.allocated = false;
  this->release_cell(); /* TODO: Consider doing this in appending state */
  update_req_stats(now);
}

template <typename T>
std::shared_ptr<WriteLogOperation> C_WriteRequest<T>::create_operation(
    uint64_t offset, uint64_t len) {
  return pwl.m_builder->create_write_log_operation(
    *op_set, offset, len, pwl.get_context(),
    pwl.m_builder->create_write_log_entry(op_set->sync_point->log_entry, offset, len));
}

template <typename T>
void C_WriteRequest<T>::setup_log_operations(DeferredContexts &on_exit) {
  GenericWriteLogEntries log_entries;
  {
    std::lock_guard locker(m_lock);
    std::shared_ptr<SyncPoint> current_sync_point = pwl.get_current_sync_point();
    if ((!pwl.get_persist_on_flush() && current_sync_point->log_entry->writes_completed) ||
        (current_sync_point->log_entry->writes > MAX_WRITES_PER_SYNC_POINT) ||
        (current_sync_point->log_entry->bytes > MAX_BYTES_PER_SYNC_POINT)) {
      /* Create new sync point and persist the previous one. This sequenced
       * write will bear a sync gen number shared with no already completed
       * writes. A group of sequenced writes may be safely flushed concurrently
       * if they all arrived before any of them completed. We'll insert one on
       * an aio_flush() from the application. Here we're inserting one to cap
       * the number of bytes and writes per sync point. When the application is
       * not issuing flushes, we insert sync points to record some observed
       * write concurrency information that enables us to safely issue >1 flush
       * write (for writes observed here to have been in flight simultaneously)
       * at a time in persist-on-write mode.
       */
      pwl.flush_new_sync_point(nullptr, on_exit);
      current_sync_point = pwl.get_current_sync_point();
    }
    uint64_t current_sync_gen = pwl.get_current_sync_gen();
    op_set =
      make_unique<WriteLogOperationSet>(this->m_dispatched_time,
                                        m_perfcounter,
                                        current_sync_point,
                                        pwl.get_persist_on_flush(),
                                        pwl.get_context(), this);
    ldout(pwl.get_context(), 20) << "write_req=[" << *this
                                 << "], op_set=" << op_set.get() << dendl;
    ceph_assert(m_resources.allocated);
    /* op_set->operations initialized differently for plain write or write same */
    auto allocation = m_resources.buffers.begin();
    uint64_t buffer_offset = 0;
    for (auto &extent : this->image_extents) {
      /* operation->on_write_persist connected to m_prior_log_entries_persisted Gather */
      auto operation = this->create_operation(extent.first, extent.second);
      this->op_set->operations.emplace_back(operation);

      /* A WS is also a write */
      ldout(pwl.get_context(), 20) << "write_req=[" << *this
                                   << "], op_set=" << op_set.get()
                                   << ", operation=" << operation << dendl;
      log_entries.emplace_back(operation->log_entry);
      if (!op_set->persist_on_flush) {
        pwl.inc_last_op_sequence_num();
      }
      operation->init(true, allocation, current_sync_gen,
                      pwl.get_last_op_sequence_num(), this->bl, buffer_offset, op_set->persist_on_flush);
      buffer_offset += operation->log_entry->write_bytes();
      ldout(pwl.get_context(), 20) << "operation=[" << *operation << "]" << dendl;
      allocation++;
    }
  }
  /* All extent ops subs created */
  op_set->extent_ops_appending->activate();
  op_set->extent_ops_persist->activate();

  pwl.add_into_log_map(log_entries, this);
}

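/* Copies the write payload into the cache buffer space reserved in
 * m_resources. */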
template <typename T>
void C_WriteRequest<T>::copy_cache() {
  pwl.copy_bl_to_buffer(&m_resources, op_set);
}

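/* If an earlier sync point is still appending, defers this write's append
 * behind it by registering a callback on that sync point. Returns true if
 * the append was deferred, false if it can be scheduled immediately. */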
template <typename T>
bool C_WriteRequest<T>::append_write_request(std::shared_ptr<SyncPoint> sync_point) {
  std::lock_guard locker(m_lock);
  auto write_req_sp = this;
  if (sync_point->earlier_sync_point) {
    Context *schedule_append_ctx = new LambdaContext([write_req_sp](int r) {
      write_req_sp->schedule_append();
    });
    sync_point->earlier_sync_point->on_sync_point_appending.push_back(schedule_append_ctx);
    return true;
  }
  return false;
}

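/* Hands this request's operations to the write log for appending. The assert
 * guarantees this runs exactly once per request. */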
template <typename T>
void C_WriteRequest<T>::schedule_append() {
  ceph_assert(++m_appended == 1);
  pwl.setup_schedule_append(this->op_set->operations, m_do_early_flush, this);
}

/**
 * Attempts to allocate log resources for a write. Returns true if successful.
 *
 * Resources include 1 lane per extent, 1 log entry per extent, and the payload
 * data space for each extent.
 *
 * Lanes are released after the write persists via release_write_lanes()
 */
template <typename T>
bool C_WriteRequest<T>::alloc_resources() {
  this->allocated_time = ceph_clock_now();
  return pwl.alloc_resources(this);
}

/**
 * Takes custody of write_req. Resources must already be allocated.
 *
 * Locking:
 * Acquires lock
 */
template <typename T>
void C_WriteRequest<T>::dispatch()
{
  CephContext *cct = pwl.get_context();
  DeferredContexts on_exit;
  utime_t now = ceph_clock_now();
  this->m_dispatched_time = now;

  ldout(cct, 15) << "write_req=" << this << " cell=" << this->get_cell() << dendl;
  this->setup_log_operations(on_exit);

  bool append_deferred = false;
  if (!op_set->persist_on_flush &&
      append_write_request(op_set->sync_point)) {
    /* In persist-on-write mode, we defer the append of this write until the
     * previous sync point is appending (meaning all the writes before it are
     * persisted and that previous sync point can now appear in the
     * log). Since we insert sync points in persist-on-write mode when writes
     * have already completed to the current sync point, this limits us to
     * one inserted sync point in flight at a time, and gives the next
     * inserted sync point some time to accumulate a few writes if they
     * arrive soon. Without this we can insert an absurd number of sync
     * points, each with one or two writes. That uses a lot of log entries,
     * and limits flushing to very few writes at a time. */
    m_do_early_flush = false;
    append_deferred = true;
  } else {
    /* The prior sync point is done, so we'll schedule append here. If this is
     * persist-on-write, and probably still the caller's thread, we'll use this
     * caller's thread to perform the persist & replication of the payload
     * buffer. */
    m_do_early_flush =
      !(this->detained || this->m_queued || this->m_deferred || op_set->persist_on_flush);
  }
  if (!append_deferred) {
    this->schedule_append();
  }
}

template <typename T>
C_FlushRequest<T>::C_FlushRequest(T &pwl, const utime_t arrived,
                                  io::Extents &&image_extents,
                                  bufferlist&& bl, const int fadvise_flags,
                                  ceph::mutex &lock, PerfCounters *perfcounter,
                                  Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
                        fadvise_flags, user_req),
    m_lock(lock), m_perfcounter(perfcounter) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
void C_FlushRequest<T>::finish_req(int r) {
  ldout(pwl.get_context(), 20) << "flush_req=" << this
                               << " cell=" << this->get_cell() << dendl;
  /* Block guard already released */
  ceph_assert(!this->get_cell());

  /* Completed to caller by here */
  utime_t now = ceph_clock_now();
  m_perfcounter->tinc(l_librbd_pwl_aio_flush_latency, now - this->m_arrived_time);
}

template <typename T>
bool C_FlushRequest<T>::alloc_resources() {
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  return pwl.alloc_resources(this);
}

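/* A flush dispatches as a SyncPointLogOperation that appends the sync point
 * captured in to_append to the log. */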
template <typename T>
void C_FlushRequest<T>::dispatch() {
  utime_t now = ceph_clock_now();
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  ceph_assert(this->m_resources.allocated);
  this->m_dispatched_time = now;

  op = std::make_shared<SyncPointLogOperation>(m_lock,
                                               to_append,
                                               now,
                                               m_perfcounter,
                                               pwl.get_context());

  m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
  pwl.schedule_append(op);
}

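/* A flush consumes a single log entry and no payload buffer, write lanes, or
 * unpublished reserves. */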
template <typename T>
void C_FlushRequest<T>::setup_buffer_resources(
    uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
    uint64_t *number_lanes, uint64_t *number_log_entries,
    uint64_t *number_unpublished_reserves) {
  *number_log_entries = 1;
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_FlushRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req
     << " m_resources.allocated=" << req.m_resources.allocated;
  return os;
}

template <typename T>
C_DiscardRequest<T>::C_DiscardRequest(T &pwl, const utime_t arrived, io::Extents &&image_extents,
                                      uint32_t discard_granularity_bytes, ceph::mutex &lock,
                                      PerfCounters *perfcounter, Context *user_req)
  : C_BlockIORequest<T>(pwl, arrived, std::move(image_extents), bufferlist(), 0, user_req),
    m_discard_granularity_bytes(discard_granularity_bytes),
    m_lock(lock),
    m_perfcounter(perfcounter) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
C_DiscardRequest<T>::~C_DiscardRequest() {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
bool C_DiscardRequest<T>::alloc_resources() {
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  return pwl.alloc_resources(this);
}

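/* Builds the single discard log operation. Note the loop breaks after the
 * first extent; a discard request carries one image extent. On persist, the
 * on_write_persist context completes the user request and releases the
 * guard cell. */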
template <typename T>
void C_DiscardRequest<T>::setup_log_operations() {
  std::lock_guard locker(m_lock);
  GenericWriteLogEntries log_entries;
  for (auto &extent : this->image_extents) {
    op = pwl.m_builder->create_discard_log_operation(
      pwl.get_current_sync_point(), extent.first, extent.second,
      m_discard_granularity_bytes, this->m_dispatched_time, m_perfcounter,
      pwl.get_context());
    log_entries.emplace_back(op->log_entry);
    break;
  }
  uint64_t current_sync_gen = pwl.get_current_sync_gen();
  bool persist_on_flush = pwl.get_persist_on_flush();
  if (!persist_on_flush) {
    pwl.inc_last_op_sequence_num();
  }
  auto discard_req = this;
  Context *on_write_append = pwl.get_current_sync_point()->prior_persisted_gather_new_sub();

  Context *on_write_persist = new LambdaContext(
    [this, discard_req](int r) {
      ldout(pwl.get_context(), 20) << "discard_req=" << discard_req
                                   << " cell=" << discard_req->get_cell() << dendl;
      ceph_assert(discard_req->get_cell());
      discard_req->complete_user_request(r);
      discard_req->release_cell();
    });
  op->init_op(current_sync_gen, persist_on_flush, pwl.get_last_op_sequence_num(),
              on_write_persist, on_write_append);
  pwl.add_into_log_map(log_entries, this);
}

template <typename T>
void C_DiscardRequest<T>::dispatch() {
  utime_t now = ceph_clock_now();
  ldout(pwl.get_context(), 20) << "req type=" << get_name()
                               << " req=[" << *this << "]" << dendl;
  ceph_assert(this->m_resources.allocated);
  this->m_dispatched_time = now;
  setup_log_operations();
  m_perfcounter->inc(l_librbd_pwl_log_ops, 1);
  pwl.schedule_append(op);
}

template <typename T>
void C_DiscardRequest<T>::setup_buffer_resources(
    uint64_t *bytes_cached, uint64_t *bytes_dirtied, uint64_t *bytes_allocated,
    uint64_t *number_lanes, uint64_t *number_log_entries,
    uint64_t *number_unpublished_reserves) {
  *number_log_entries = 1;
  /* No bytes are allocated for a discard, but we count the discarded bytes
   * as dirty. This means it's possible to have more bytes dirty than
   * there are bytes cached or allocated. */
  for (auto &extent : this->image_extents) {
    *bytes_dirtied = extent.second;
    break;
  }
}

template <typename T>
void C_DiscardRequest<T>::blockguard_acquired(GuardedRequestFunctionContext &guard_ctx) {
  ldout(pwl.get_context(), 20) << " cell=" << guard_ctx.cell << dendl;

  ceph_assert(guard_ctx.cell);
  this->detained = guard_ctx.state.detained; /* overlapped */
  this->set_cell(guard_ctx.cell);
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_DiscardRequest<T> &req) {
  os << (C_BlockIORequest<T>&)req;
  if (req.op) {
    os << " op=[" << *req.op << "]";
  } else {
    os << " op=nullptr";
  }
  return os;
}

template <typename T>
C_WriteSameRequest<T>::C_WriteSameRequest(
  T &pwl, const utime_t arrived, io::Extents &&image_extents,
  bufferlist&& bl, const int fadvise_flags, ceph::mutex &lock,
  PerfCounters *perfcounter, Context *user_req)
  : C_WriteRequest<T>(pwl, arrived, std::move(image_extents), std::move(bl),
                      fadvise_flags, lock, perfcounter, user_req) {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
C_WriteSameRequest<T>::~C_WriteSameRequest() {
  ldout(pwl.get_context(), 20) << this << dendl;
}

template <typename T>
void C_WriteSameRequest<T>::update_req_stats(utime_t &now) {
  /* Write same stats excluded from most write stats
   * because the read phase will make them look like slow writes in
   * those histograms. */
  ldout(pwl.get_context(), 20) << this << dendl;
  utime_t comp_latency = now - this->m_arrived_time;
  this->m_perfcounter->tinc(l_librbd_pwl_ws_latency, comp_latency);
}

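/* Unlike a plain write, a write-same log entry spans the full extent (len)
 * but stores only bl.length() bytes of pattern data. */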
template <typename T>
std::shared_ptr<WriteLogOperation> C_WriteSameRequest<T>::create_operation(
    uint64_t offset, uint64_t len) {
  ceph_assert(this->image_extents.size() == 1);
  WriteLogOperationSet &set = *this->op_set.get();
  return pwl.m_builder->create_write_log_operation(
    *this->op_set.get(), offset, len, this->bl.length(), pwl.get_context(),
    pwl.m_builder->create_writesame_log_entry(set.sync_point->log_entry, offset,
                                              len, this->bl.length()));
}

template <typename T>
std::ostream &operator<<(std::ostream &os,
                         const C_WriteSameRequest<T> &req) {
  os << (C_WriteRequest<T>&)req;
  return os;
}

template <typename T>
void C_WriteRequest<T>::update_req_stats(utime_t &now) {
  /* Compare-and-write stats. Compare-and-write excluded from most write
   * stats because the read phase will make them look like slow writes in
   * those histograms. */
  if (is_comp_and_write) {
    if (!compare_succeeded) {
      this->m_perfcounter->inc(l_librbd_pwl_cmp_fails, 1);
    }
    utime_t comp_latency = now - this->m_arrived_time;
    this->m_perfcounter->tinc(l_librbd_pwl_cmp_latency, comp_latency);
  }
}

} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::C_BlockIORequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_WriteRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_FlushRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_DiscardRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;
template class librbd::cache::pwl::C_WriteSameRequest<librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx> >;