1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "AbstractWriteLog.h"
5#include "include/buffer.h"
6#include "include/Context.h"
7#include "include/ceph_assert.h"
8#include "common/deleter.h"
9#include "common/dout.h"
10#include "common/environment.h"
11#include "common/errno.h"
12#include "common/hostname.h"
13#include "common/WorkQueue.h"
14#include "common/Timer.h"
15#include "common/perf_counters.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/asio/ContextWQ.h"
18#include "librbd/cache/pwl/ImageCacheState.h"
19#include "librbd/cache/pwl/LogEntry.h"
20#include "librbd/plugin/Api.h"
21#include <map>
22#include <vector>
23
24#undef dout_subsys
25#define dout_subsys ceph_subsys_rbd_pwl
26#undef dout_prefix
27#define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
28 << " " << __func__ << ": "
29
30namespace librbd {
31namespace cache {
32namespace pwl {
33
34using namespace std;
35using namespace librbd::cache::pwl;
36
37typedef AbstractWriteLog<ImageCtx>::Extent Extent;
38typedef AbstractWriteLog<ImageCtx>::Extents Extents;
39
40template <typename I>
41AbstractWriteLog<I>::AbstractWriteLog(
42 I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
43 Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
44 plugin::Api<I>& plugin_api)
45 : m_builder(builder),
46 m_write_log_guard(image_ctx.cct),
47 m_flush_guard(image_ctx.cct),
48 m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
49 "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
50 m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
51 "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
52 m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
53 "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
54 m_thread_pool(
55 image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
56 "tp_pwl", 4, ""),
57 m_cache_state(cache_state),
58 m_image_ctx(image_ctx),
59 m_log_pool_size(DEFAULT_POOL_SIZE),
60 m_image_writeback(image_writeback),
61 m_plugin_api(plugin_api),
62 m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
63 "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
64 m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
65 m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
66 "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
67 m_lock(ceph::make_mutex(pwl::unique_lock_name(
68 "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
69 m_blocks_to_log_entries(image_ctx.cct),
70 m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
71 ceph::make_timespan(
72 image_ctx.config.template get_val<uint64_t>(
73 "rbd_op_thread_timeout")),
74 &m_thread_pool)
75{
76 CephContext *cct = m_image_ctx.cct;
77 m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
78}
79
80template <typename I>
81AbstractWriteLog<I>::~AbstractWriteLog() {
82 ldout(m_image_ctx.cct, 15) << "enter" << dendl;
83 {
84 std::lock_guard timer_locker(*m_timer_lock);
85 std::lock_guard locker(m_lock);
86 m_timer->cancel_event(m_timer_ctx);
87 m_thread_pool.stop();
88 ceph_assert(m_deferred_ios.size() == 0);
89 ceph_assert(m_ops_to_flush.size() == 0);
90 ceph_assert(m_ops_to_append.size() == 0);
91 ceph_assert(m_flush_ops_in_flight == 0);
92
93 delete m_cache_state;
94 m_cache_state = nullptr;
95 }
96 ldout(m_image_ctx.cct, 15) << "exit" << dendl;
97}
98
99template <typename I>
100void AbstractWriteLog<I>::perf_start(std::string name) {
101 PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
102 l_librbd_pwl_last);
103
104 // Latency axis configuration for op histograms, values are in nanoseconds
105 PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
106 "Latency (nsec)",
107 PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
108 0, ///< Start at 0
109 5000, ///< Quantization unit is 5usec
110 16, ///< Ranges into the mS
111 };
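  /* Illustrative sketch, not part of the original code: with SCALE_LOG2, a
   * start of 0, a 5000 ns quantization unit and 16 buckets, successive bucket
   * widths roughly double, e.g.
   *
   *   bucket 0: < 5 us
   *   bucket 1: ~5 us .. ~10 us
   *   bucket 2: ~10 us .. ~20 us
   *   ...
   *   last bucket: tens of milliseconds and above
   *
   * so one histogram spans sub-5 us cache hits and multi-ms outliers without
   * thousands of linear buckets. Exact boundaries depend on
   * PerfHistogramCommon's bucketing; treat the numbers above as an assumption.
   */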
112
113 // Syncpoint logentry number x-axis configuration for op histograms
114 PerfHistogramCommon::axis_config_d sp_logentry_number_config{
115 "logentry number",
116 PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
117 0, // Start at 0
118 1, // Quantization unit is 1
119 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
120 };
121
122 // Syncpoint bytes number y-axis configuration for op histogram
123 PerfHistogramCommon::axis_config_d sp_bytes_number_config{
124 "Number of SyncPoint",
125 PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale
126 0, // Start at 0
127 512, // Quantization unit is 512
128 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
129 };
130
131 // Op size axis configuration for op histogram y axis, values are in bytes
132 PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
133 "Request size (bytes)",
134 PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
135 0, ///< Start at 0
136 512, ///< Quantization unit is 512 bytes
137 16, ///< Writes up to >32k
138 };
139
140 // Num items configuration for op histogram y axis, values are in items
141 PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
142 "Number of items",
143 PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale
144 0, ///< Start at 0
145 1, ///< Quantization unit is 1
 146 32, ///< Up to 32 items
147 };
148
149 plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
150 plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
151 plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");
152
153 plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
154 plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
155 plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");
156
157 plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL");
158
159 plb.add_u64_counter_histogram(
160 l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
161 sp_logentry_number_config, sp_bytes_number_config,
162 "Histogram of syncpoint's logentry numbers vs bytes number");
163
164 plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
165 plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
166 plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
167 plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
168 plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
169 plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
170 plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
171 plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
172
173 plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
174 plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
175
176 plb.add_time_avg(
177 l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
178 "Average arrival to allocation time (time deferred for overlap)");
179 plb.add_time_avg(
180 l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
181 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
182 plb.add_time_avg(
183 l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
184 "Average allocation to dispatch time (time deferred for log resources)");
185 plb.add_time_avg(
186 l_librbd_pwl_wr_latency, "wr_latency",
187 "Latency of writes (persistent completion)");
188 plb.add_u64_counter_histogram(
189 l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
190 op_hist_x_axis_config, op_hist_y_axis_config,
191 "Histogram of write request latency (nanoseconds) vs. bytes written");
192 plb.add_time_avg(
193 l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
194 "Latency of write completion to caller");
195 plb.add_time_avg(
196 l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
197 "Average arrival to allocation time (time deferred for overlap)");
198 plb.add_time_avg(
199 l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
200 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
201 plb.add_time_avg(
202 l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
203 "Average allocation to dispatch time (time deferred for log resources)");
204 plb.add_time_avg(
205 l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
206 "Latency of writes (persistent completion) not deferred for free space");
207 plb.add_u64_counter_histogram(
208 l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
209 op_hist_x_axis_config, op_hist_y_axis_config,
210 "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
211 plb.add_time_avg(
212 l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
213 "Latency of write completion to callerfor writes not deferred for free space");
214 plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
215 plb.add_u64_counter_histogram(
216 l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
217 op_hist_x_axis_config, op_hist_y_axis_config,
218 "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");
219 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
220 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
221 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
222 plb.add_u64_counter_histogram(
223 l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
224 op_hist_x_axis_config, op_hist_y_axis_config,
225 "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
226
227 plb.add_time_avg(
228 l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
229 "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
230 plb.add_time_avg(
231 l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
232 "Average buffer persist time (write data persist/replicate time)");
233 plb.add_u64_counter_histogram(
234 l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
235 op_hist_x_axis_config, op_hist_y_axis_config,
236 "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
237 plb.add_time_avg(
238 l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
239 "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
240 plb.add_time_avg(
241 l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
242 "Average log append to persist complete time (log entry append/replicate time)");
243 plb.add_u64_counter_histogram(
244 l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
245 op_hist_x_axis_config, op_hist_y_axis_config,
246 "Histogram of log append persist time (nanoseconds) (vs. op bytes)");
247
248 plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
249 plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
250 plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");
251
252 plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
253 plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
254 plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");
255
 256 plb.add_u64_counter(l_librbd_pwl_ws, "ws", "Write Sames");
257 plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
258 plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");
259
260 plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
261 plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
 262 plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
263 plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
264
265 plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
266 plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
267 plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
 268 plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "invalidate_discard", "Discard and invalidate RWL");
269
270 plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
271 plb.add_u64_counter_histogram(
272 l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
273 op_hist_x_axis_config, op_hist_y_axis_count_config,
274 "Histogram of log append transaction time (nanoseconds) vs. entries appended");
275 plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
276 plb.add_u64_counter_histogram(
277 l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
278 op_hist_x_axis_config, op_hist_y_axis_count_config,
279 "Histogram of log retire transaction time (nanoseconds) vs. entries retired");
280
281 m_perfcounter = plb.create_perf_counters();
282 m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
283}
284
285template <typename I>
286void AbstractWriteLog<I>::perf_stop() {
287 ceph_assert(m_perfcounter);
288 m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
289 delete m_perfcounter;
290}
291
292template <typename I>
293void AbstractWriteLog<I>::log_perf() {
294 bufferlist bl;
295 Formatter *f = Formatter::create("json-pretty");
296 bl.append("Perf dump follows\n--- Begin perf dump ---\n");
297 bl.append("{\n");
298 stringstream ss;
299 utime_t now = ceph_clock_now();
300 ss << "\"test_time\": \"" << now << "\",";
301 ss << "\"image\": \"" << m_image_ctx.name << "\",";
302 bl.append(ss);
303 bl.append("\"stats\": ");
304 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0);
305 f->flush(bl);
306 bl.append(",\n\"histograms\": ");
307 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
308 f->flush(bl);
309 delete f;
310 bl.append("}\n--- End perf dump ---\n");
311 bl.append('\0');
312 ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
313}
314
315template <typename I>
316void AbstractWriteLog<I>::periodic_stats() {
317 std::lock_guard locker(m_lock);
318 ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size()
319 << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
320 << ", m_free_log_entries=" << m_free_log_entries
321 << ", m_bytes_allocated=" << m_bytes_allocated
322 << ", m_bytes_cached=" << m_bytes_cached
323 << ", m_bytes_dirty=" << m_bytes_dirty
324 << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
325 << ", m_first_valid_entry=" << m_first_valid_entry
326 << ", m_first_free_entry=" << m_first_free_entry
327 << ", m_current_sync_gen=" << m_current_sync_gen
328 << ", m_flushed_sync_gen=" << m_flushed_sync_gen
329 << dendl;
330}
331
332template <typename I>
333void AbstractWriteLog<I>::arm_periodic_stats() {
334 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
335 if (m_periodic_stats_enabled) {
336 m_timer_ctx = new LambdaContext(
337 [this](int r) {
338 /* m_timer_lock is held */
339 periodic_stats();
340 arm_periodic_stats();
341 });
342 m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
343 }
344}
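/* A minimal sketch of the self-re-arming timer pattern used above (names in
 * the sketch are illustrative, not the actual librbd API surface):
 *
 *   void arm() {
 *     // caller holds the timer lock
 *     ctx = new LambdaContext([this](int r) {
 *       do_work();   // runs with the timer lock held
 *       arm();       // schedule the next tick before returning
 *     });
 *     timer->add_event_after(interval_seconds, ctx);
 *   }
 *
 * Cancellation (see the destructor) must also happen under the timer lock so
 * that a firing event and cancel_event() cannot race.
 */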
345
346template <typename I>
347void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
348 WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
349 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
350 uint64_t entry_index) {
351 bool writer = cache_entry->is_writer();
352 if (cache_entry->is_sync_point()) {
353 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
354 << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
355 auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
356 *log_entry = sync_point_entry;
357 sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
358 missing_sync_points.erase(cache_entry->sync_gen_number);
359 m_current_sync_gen = cache_entry->sync_gen_number;
360 } else if (cache_entry->is_write()) {
361 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
362 << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
363 auto write_entry =
364 m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
365 write_data_to_buffer(write_entry, cache_entry);
366 *log_entry = write_entry;
367 } else if (cache_entry->is_writesame()) {
368 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
369 << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
370 auto ws_entry =
371 m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
372 cache_entry->write_bytes, cache_entry->ws_datalen);
373 write_data_to_buffer(ws_entry, cache_entry);
374 *log_entry = ws_entry;
375 } else if (cache_entry->is_discard()) {
376 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
377 << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
378 auto discard_entry =
379 std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
380 m_discard_granularity_bytes);
381 *log_entry = discard_entry;
382 } else {
383 lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
384 << ", cache_entry=[" << *cache_entry << "]" << dendl;
385 }
386
387 if (writer) {
388 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
389 << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
390 if (!sync_point_entries[cache_entry->sync_gen_number]) {
391 missing_sync_points[cache_entry->sync_gen_number] = true;
392 }
393 }
394}
395
396template <typename I>
397void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
398 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
399 DeferredContexts &later) {
400 /* Create missing sync points. These must not be appended until the
401 * entry reload is complete and the write map is up to
402 * date. Currently this is handled by the deferred contexts object
403 * passed to new_sync_point(). These contexts won't be completed
404 * until this function returns. */
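  /* Sketch of the DeferredContexts idiom relied on here (assumed behaviour:
   * contexts added to a DeferredContexts instance complete when the instance
   * goes out of scope, not when they are added):
   *
   *   {
   *     DeferredContexts later;
   *     new_sync_point(later);   // queues the append on 'later'
   *     // ... finish rebuilding the write map ...
   *   }                          // 'later' destructs; appends run only now
   *
   * This is what guarantees no sync point is appended before the reload of
   * existing log entries has finished.
   */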
405 for (auto &kv : missing_sync_points) {
406 ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
407 if (0 == m_current_sync_gen) {
408 /* The unlikely case where the log contains writing entries, but no sync
409 * points (e.g. because they were all retired) */
410 m_current_sync_gen = kv.first-1;
411 }
412 ceph_assert(kv.first == m_current_sync_gen+1);
413 init_flush_new_sync_point(later);
414 ceph_assert(kv.first == m_current_sync_gen);
415 sync_point_entries[kv.first] = m_current_sync_point->log_entry;
416 }
417
418 /*
419 * Iterate over the log entries again (this time via the global
420 * entries list), connecting write entries to their sync points and
421 * updating the sync point stats.
422 *
423 * Add writes to the write log map.
424 */
425 std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
426 for (auto &log_entry : m_log_entries) {
427 if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
428 /* This entry is one of the types that write */
429 auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
430 if (gen_write_entry) {
431 auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
432 if (!sync_point_entry) {
433 lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
434 ceph_assert(false);
435 } else {
436 gen_write_entry->sync_point_entry = sync_point_entry;
437 sync_point_entry->writes++;
438 sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
439 sync_point_entry->writes_completed++;
440 m_blocks_to_log_entries.add_log_entry(gen_write_entry);
441 /* This entry is only dirty if its sync gen number is > the flushed
442 * sync gen number from the root object. */
443 if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
444 m_dirty_log_entries.push_back(log_entry);
445 m_bytes_dirty += gen_write_entry->bytes_dirty();
446 } else {
447 gen_write_entry->set_flushed(true);
448 sync_point_entry->writes_flushed++;
449 }
450
451 /* calc m_bytes_allocated & m_bytes_cached */
452 inc_allocated_cached_bytes(log_entry);
453 }
454 }
455 } else {
 456 /* This entry is a sync point entry */
457 auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
458 if (sync_point_entry) {
459 if (previous_sync_point_entry) {
460 previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
461 if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
462 sync_point_entry->prior_sync_point_flushed = false;
463 ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
464 (0 == previous_sync_point_entry->writes) ||
465 (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
466 } else {
467 sync_point_entry->prior_sync_point_flushed = true;
468 ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
469 ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
470 }
471 } else {
472 /* There are no previous sync points, so we'll consider them flushed */
473 sync_point_entry->prior_sync_point_flushed = true;
474 }
475 previous_sync_point_entry = sync_point_entry;
 476 ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
477 }
478 }
479 }
480 if (0 == m_current_sync_gen) {
481 /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
482 * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
483 * point recorded in the log. */
484 m_current_sync_gen = m_flushed_sync_gen;
485 }
486}
487
488template <typename I>
489void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
490 CephContext *cct = m_image_ctx.cct;
491 ldout(cct, 20) << dendl;
492 ceph_assert(m_cache_state);
493 std::lock_guard locker(m_lock);
494 ceph_assert(!m_initialized);
495 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
496
497 if (!m_cache_state->present) {
498 m_cache_state->host = ceph_get_short_hostname();
499 m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
500 "rbd_persistent_cache_size");
501
502 string path = m_image_ctx.config.template get_val<string>(
503 "rbd_persistent_cache_path");
504 std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
505 m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
506 }
507
508 ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
509 ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
510
511 m_log_pool_name = m_cache_state->path;
512 m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
513 m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
514 ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
515 << " (adjusted from " << m_cache_state->size << ")" << dendl;
516
517 if ((!m_cache_state->present) &&
518 (access(m_log_pool_name.c_str(), F_OK) == 0)) {
519 ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
520 << ", While there's no cache in the image metatata." << dendl;
521 if (remove(m_log_pool_name.c_str()) != 0) {
522 lderr(cct) << "failed to remove the pool file " << m_log_pool_name
523 << dendl;
524 on_finish->complete(-errno);
525 return;
526 } else {
527 ldout(cct, 5) << "Removed the existing pool file." << dendl;
528 }
529 } else if ((m_cache_state->present) &&
530 (access(m_log_pool_name.c_str(), F_OK) != 0)) {
 531 lderr(cct) << "can't find the existing pool file: " << m_log_pool_name
532 << ". error: " << cpp_strerror(-errno) << dendl;
533 on_finish->complete(-errno);
534 return;
535 }
536
537 bool succeeded = initialize_pool(on_finish, later);
538 if (!succeeded) {
 539 return;
540 }
541
542 ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
543 << " log entries, " << m_free_log_entries << " of which are free."
544 << " first_valid=" << m_first_valid_entry
545 << ", first_free=" << m_first_free_entry
546 << ", flushed_sync_gen=" << m_flushed_sync_gen
547 << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
548 if (m_first_free_entry == m_first_valid_entry) {
549 ldout(cct,1) << "write log is empty" << dendl;
550 m_cache_state->empty = true;
551 }
552
553 /* Start the sync point following the last one seen in the
554 * log. Flush the last sync point created during the loading of the
555 * existing log entries. */
556 init_flush_new_sync_point(later);
557 ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
558
559 m_initialized = true;
560 // Start the thread
561 m_thread_pool.start();
562
563 m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
564 /* Do these after we drop lock */
565 later.add(new LambdaContext([this](int r) {
566 if (m_periodic_stats_enabled) {
567 /* Log stats for the first time */
568 periodic_stats();
569 /* Arm periodic stats logging for the first time */
570 std::lock_guard timer_locker(*m_timer_lock);
571 arm_periodic_stats();
572 }
573 }));
574 m_image_ctx.op_work_queue->queue(on_finish, 0);
575}
576
577template <typename I>
578void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) {
579 m_cache_state->write_image_cache_state(on_finish);
580}
581
582template <typename I>
583void AbstractWriteLog<I>::init(Context *on_finish) {
584 CephContext *cct = m_image_ctx.cct;
585 ldout(cct, 20) << dendl;
586 auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
587 std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
588 std::string("-") + m_image_ctx.name;
589 perf_start(pname);
590
591 ceph_assert(!m_initialized);
592
593 Context *ctx = new LambdaContext(
594 [this, on_finish](int r) {
595 if (r >= 0) {
596 update_image_cache_state(on_finish);
597 } else {
598 on_finish->complete(r);
599 }
600 });
601
602 DeferredContexts later;
603 pwl_init(ctx, later);
604}
605
606template <typename I>
607void AbstractWriteLog<I>::shut_down(Context *on_finish) {
608 CephContext *cct = m_image_ctx.cct;
609 ldout(cct, 20) << dendl;
610
611 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
612
613 Context *ctx = new LambdaContext(
614 [this, on_finish](int r) {
615 ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
616 m_image_ctx.op_work_queue->queue(on_finish, r);
617 });
618 ctx = new LambdaContext(
619 [this, ctx](int r) {
620 ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
621 Context *next_ctx = override_ctx(r, ctx);
622 bool periodic_stats_enabled = m_periodic_stats_enabled;
623 m_periodic_stats_enabled = false;
624
625 if (periodic_stats_enabled) {
626 /* Log stats one last time if they were enabled */
627 periodic_stats();
628 }
629 {
630 std::lock_guard locker(m_lock);
631 check_image_cache_state_clean();
632 m_wake_up_enabled = false;
633 m_cache_state->clean = true;
634 m_log_entries.clear();
635
636 remove_pool_file();
637
638 if (m_perfcounter) {
639 perf_stop();
640 }
641 }
642 update_image_cache_state(next_ctx);
643 });
644 ctx = new LambdaContext(
645 [this, ctx](int r) {
646 Context *next_ctx = override_ctx(r, ctx);
647 ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
648 // Wait for in progress IOs to complete
649 next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
650 m_async_op_tracker.wait_for_ops(next_ctx);
651 });
652 ctx = new LambdaContext(
653 [this, ctx](int r) {
654 Context *next_ctx = override_ctx(r, ctx);
655 {
656 /* Sync with process_writeback_dirty_entries() */
657 RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
658 m_shutting_down = true;
659 /* Flush all writes to OSDs (unless disabled) and wait for all
660 in-progress flush writes to complete */
661 ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
662 if (m_periodic_stats_enabled) {
663 periodic_stats();
664 }
665 }
666 flush_dirty_entries(next_ctx);
667 });
668 ctx = new LambdaContext(
669 [this, ctx](int r) {
670 ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
671 m_work_queue.queue(ctx, r);
672 });
673 /* Complete all in-flight writes before shutting down */
674 ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
675 internal_flush(false, ctx);
676}
677
678template <typename I>
679void AbstractWriteLog<I>::read(Extents&& image_extents,
680 ceph::bufferlist* bl,
681 int fadvise_flags, Context *on_finish) {
682 CephContext *cct = m_image_ctx.cct;
683 utime_t now = ceph_clock_now();
684
685 on_finish = new LambdaContext(
686 [this, on_finish](int r) {
687 m_async_op_tracker.finish_op();
688 on_finish->complete(r);
689 });
690 C_ReadRequest *read_ctx = m_builder->create_read_request(
691 cct, now, m_perfcounter, bl, on_finish);
692 ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
693 << "image_extents=" << image_extents
694 << ", bl=" << bl
695 << ", on_finish=" << on_finish << dendl;
696
697 ceph_assert(m_initialized);
698 bl->clear();
699 m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
700
701 std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
702 std::vector<bufferlist*> bls_to_read;
703
704 m_async_op_tracker.start_op();
705 Context *ctx = new LambdaContext(
706 [this, read_ctx, fadvise_flags](int r) {
707 if (read_ctx->miss_extents.empty()) {
708 /* All of this read comes from RWL */
709 read_ctx->complete(0);
710 } else {
711 /* Pass the read misses on to the layer below RWL */
712 m_image_writeback.aio_read(
713 std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
714 fadvise_flags, read_ctx);
715 }
716 });
717
718 /*
719 * The strategy here is to look up all the WriteLogMapEntries that overlap
720 * this read, and iterate through those to separate this read into hits and
721 * misses. A new Extents object is produced here with Extents for each miss
722 * region. The miss Extents is then passed on to the read cache below RWL. We
723 * also produce an ImageExtentBufs for all the extents (hit or miss) in this
724 * read. When the read from the lower cache layer completes, we iterate
725 * through the ImageExtentBufs and insert buffers for each cache hit at the
726 * appropriate spot in the bufferlist returned from below for the miss
727 * read. The buffers we insert here refer directly to regions of various
728 * write log entry data buffers.
729 *
730 * Locking: These buffer objects hold a reference on the write log entries
731 * they refer to. Log entries can't be retired until there are no references.
732 * The GenericWriteLogEntry references are released by the buffer destructor.
733 */
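  /* Worked example (illustrative only): an 8192-byte read at offset 0, where
   * the write log holds one entry covering [4096, 4096), splits as
   *
   *   [0, 4096)    -> miss_extents (read from the image below the cache)
   *   [4096, 4096) -> hit, an ImageExtentBuf referencing the log entry buffer
   *
   * read_extents keeps both pieces in image order, so when the miss read
   * completes the hit buffer is spliced back in at the right offset and the
   * caller sees one contiguous bufferlist.
   */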
734 for (auto &extent : image_extents) {
735 uint64_t extent_offset = 0;
736 RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
737 WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
738 block_extent(extent));
739 for (auto &map_entry : map_entries) {
740 Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
741 /* If this map entry starts after the current image extent offset ... */
742 if (entry_image_extent.first > extent.first + extent_offset) {
743 /* ... add range before map_entry to miss extents */
744 uint64_t miss_extent_start = extent.first + extent_offset;
745 uint64_t miss_extent_length = entry_image_extent.first -
746 miss_extent_start;
747 Extent miss_extent(miss_extent_start, miss_extent_length);
748 read_ctx->miss_extents.push_back(miss_extent);
749 /* Add miss range to read extents */
750 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
751 read_ctx->read_extents.push_back(miss_extent_buf);
752 extent_offset += miss_extent_length;
753 }
754 ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
755 uint64_t entry_offset = 0;
756 /* If this map entry starts before the current image extent offset ... */
757 if (entry_image_extent.first < extent.first + extent_offset) {
758 /* ... compute offset into log entry for this read extent */
759 entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
760 }
761 /* This read hit ends at the end of the extent or the end of the log
762 entry, whichever is less. */
763 uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
764 extent.second - extent_offset);
765 Extent hit_extent(entry_image_extent.first, entry_hit_length);
766 if (0 == map_entry.log_entry->write_bytes() &&
767 0 < map_entry.log_entry->bytes_dirty()) {
768 /* discard log entry */
769 ldout(cct, 20) << "discard log entry" << dendl;
770 auto discard_entry = map_entry.log_entry;
771 ldout(cct, 20) << "read hit on discard entry: log_entry="
772 << *discard_entry
773 << dendl;
774 /* Discards read as zero, so we'll construct a bufferlist of zeros */
775 bufferlist zero_bl;
776 zero_bl.append_zero(entry_hit_length);
777 /* Add hit extent to read extents */
778 auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
779 hit_extent, zero_bl);
780 read_ctx->read_extents.push_back(hit_extent_buf);
781 } else {
782 ldout(cct, 20) << "write or writesame log entry" << dendl;
783 /* write and writesame log entry */
784 /* Offset of the map entry into the log entry's buffer */
785 uint64_t map_entry_buffer_offset = entry_image_extent.first -
786 map_entry.log_entry->ram_entry.image_offset_bytes;
787 /* Offset into the log entry buffer of this read hit */
788 uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
789 /* Create buffer object referring to pmem pool for this read hit */
790 collect_read_extents(
791 read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
792 entry_hit_length, hit_extent, read_ctx);
793 }
794 /* Exclude RWL hit range from buffer and extent */
795 extent_offset += entry_hit_length;
796 ldout(cct, 20) << map_entry << dendl;
797 }
798 /* If the last map entry didn't consume the entire image extent ... */
799 if (extent.second > extent_offset) {
800 /* ... add the rest of this extent to miss extents */
801 uint64_t miss_extent_start = extent.first + extent_offset;
802 uint64_t miss_extent_length = extent.second - extent_offset;
803 Extent miss_extent(miss_extent_start, miss_extent_length);
804 read_ctx->miss_extents.push_back(miss_extent);
805 /* Add miss range to read extents */
806 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
807 read_ctx->read_extents.push_back(miss_extent_buf);
808 extent_offset += miss_extent_length;
809 }
810 }
811
812 ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
813 << ", miss_bl=" << read_ctx->miss_bl << dendl;
814
815 complete_read(log_entries_to_read, bls_to_read, ctx);
816}
817
818template <typename I>
819void AbstractWriteLog<I>::write(Extents &&image_extents,
820 bufferlist&& bl,
821 int fadvise_flags,
822 Context *on_finish) {
823 CephContext *cct = m_image_ctx.cct;
824
825 ldout(cct, 20) << "aio_write" << dendl;
826
827 utime_t now = ceph_clock_now();
828 m_perfcounter->inc(l_librbd_pwl_wr_req, 1);
829
830 ceph_assert(m_initialized);
831
832 /* Split image extents larger than 1M. This isn't strictly necessary but
833 * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
834 * We plan to manage pmem space and allocation by ourselves in the future.
835 */
836 Extents split_image_extents;
837 uint64_t max_extent_size = get_max_extent();
838 if (max_extent_size != 0) {
839 for (auto extent : image_extents) {
840 if (extent.second > max_extent_size) {
841 uint64_t off = extent.first;
842 uint64_t extent_bytes = extent.second;
843 for (int i = 0; extent_bytes != 0; ++i) {
844 Extent _ext;
845 _ext.first = off + i * max_extent_size;
846 _ext.second = std::min(max_extent_size, extent_bytes);
847 extent_bytes = extent_bytes - _ext.second ;
848 split_image_extents.emplace_back(_ext);
849 }
850 } else {
851 split_image_extents.emplace_back(extent);
852 }
853 }
854 } else {
855 split_image_extents = image_extents;
856 }
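  /* Worked example (assuming get_max_extent() returns 1 MiB): a single
   * {offset=0, length=3.5 MiB} extent is split into
   *
   *   {0 MiB, 1 MiB}, {1 MiB, 1 MiB}, {2 MiB, 1 MiB}, {3 MiB, 0.5 MiB}
   *
   * i.e. _ext.first = off + i * max_extent_size and _ext.second is the
   * remaining bytes capped at max_extent_size, exactly as the loop above
   * computes. If get_max_extent() returns 0, no splitting is done.
   */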
857
858 C_WriteRequestT *write_req =
859 m_builder->create_write_request(*this, now, std::move(split_image_extents),
860 std::move(bl), fadvise_flags, m_lock,
861 m_perfcounter, on_finish);
862 m_perfcounter->inc(l_librbd_pwl_wr_bytes,
863 write_req->image_extents_summary.total_bytes);
864
865 /* The lambda below will be called when the block guard for all
866 * blocks affected by this write is obtained */
867 GuardedRequestFunctionContext *guarded_ctx =
868 new GuardedRequestFunctionContext([this,
869 write_req](GuardedRequestFunctionContext &guard_ctx) {
870 write_req->blockguard_acquired(guard_ctx);
871 alloc_and_dispatch_io_req(write_req);
872 });
873
874 detain_guarded_request(write_req, guarded_ctx, false);
875}
876
877template <typename I>
878void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
879 uint32_t discard_granularity_bytes,
880 Context *on_finish) {
881 CephContext *cct = m_image_ctx.cct;
882
883 ldout(cct, 20) << dendl;
884
885 utime_t now = ceph_clock_now();
886 m_perfcounter->inc(l_librbd_pwl_discard, 1);
887 Extents discard_extents = {{offset, length}};
888 m_discard_granularity_bytes = discard_granularity_bytes;
889
890 ceph_assert(m_initialized);
891
892 auto *discard_req =
893 new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
894 m_lock, m_perfcounter, on_finish);
895
896 /* The lambda below will be called when the block guard for all
897 * blocks affected by this write is obtained */
898 GuardedRequestFunctionContext *guarded_ctx =
899 new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
900 discard_req->blockguard_acquired(guard_ctx);
901 alloc_and_dispatch_io_req(discard_req);
902 });
903
904 detain_guarded_request(discard_req, guarded_ctx, false);
905}
906
907/**
908 * Aio_flush completes when all previously completed writes are
909 * flushed to persistent cache. We make a best-effort attempt to also
910 * defer until all in-progress writes complete, but we may not know
911 * about all of the writes the application considers in-progress yet,
912 * due to uncertainty in the IO submission workq (multiple WQ threads
913 * may allow out-of-order submission).
914 *
915 * This flush operation will not wait for writes deferred for overlap
916 * in the block guard.
917 */
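/* Routing sketch for the flush paths below (a summary of the code, not added
 * behaviour): SHUTDOWN/INTERNAL/WRITE_BLOCK flush sources become an
 * internal_flush(), i.e. writeback of dirty cache entries to the image, while
 * a user aio_flush becomes a barrier request in the block guard plus, if any
 * writes arrived since the last one, a new sync point:
 *
 *   flush(FLUSH_SOURCE_USER, ctx)     -> barrier + sync point in the RWL
 *   flush(FLUSH_SOURCE_INTERNAL, ctx) -> internal_flush(): write back to OSDs
 *
 * A user-facing flush therefore completes once prior writes are stable in the
 * persistent cache, not once they reach the OSDs.
 */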
918template <typename I>
919void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
920 CephContext *cct = m_image_ctx.cct;
921 ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
922
923 if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
924 io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
925 internal_flush(false, on_finish);
926 return;
927 }
928 m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);
929
930 /* May be called even if initialization fails */
931 if (!m_initialized) {
932 ldout(cct, 05) << "never initialized" << dendl;
933 /* Deadlock if completed here */
934 m_image_ctx.op_work_queue->queue(on_finish, 0);
935 return;
936 }
937
938 {
939 std::shared_lock image_locker(m_image_ctx.image_lock);
940 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
941 on_finish->complete(-EROFS);
942 return;
943 }
944 }
945
946 auto flush_req = make_flush_req(on_finish);
947
948 GuardedRequestFunctionContext *guarded_ctx =
949 new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
950 ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
951 ceph_assert(guard_ctx.cell);
952 flush_req->detained = guard_ctx.state.detained;
953 /* We don't call flush_req->set_cell(), because the block guard will be released here */
954 {
955 DeferredContexts post_unlock; /* Do these when the lock below is released */
956 std::lock_guard locker(m_lock);
957
958 if (!m_persist_on_flush && m_persist_on_write_until_flush) {
959 m_persist_on_flush = true;
960 ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
961 }
962
963 /*
964 * Create a new sync point if there have been writes since the last
965 * one.
966 *
967 * We do not flush the caches below the RWL here.
968 */
969 flush_new_sync_point_if_needed(flush_req, post_unlock);
970 }
971
972 release_guarded_request(guard_ctx.cell);
973 });
974
975 detain_guarded_request(flush_req, guarded_ctx, true);
976}
977
978template <typename I>
979void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
980 bufferlist&& bl, int fadvise_flags,
981 Context *on_finish) {
982 CephContext *cct = m_image_ctx.cct;
983
984 ldout(cct, 20) << "aio_writesame" << dendl;
985
986 utime_t now = ceph_clock_now();
987 Extents ws_extents = {{offset, length}};
988 m_perfcounter->inc(l_librbd_pwl_ws, 1);
989 ceph_assert(m_initialized);
990
991 /* A write same request is also a write request. The key difference is the
992 * write same data buffer is shorter than the extent of the request. The full
993 * extent will be used in the block guard, and appear in
994 * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
995 * as long as the length of the bl here, which is the pattern that's repeated
996 * in the image for the entire length of this WS. Read hits and flushing of
997 * write sames are different than normal writes. */
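  /* Example of the size relationship described above (numbers illustrative):
   * writesame(offset=0, length=4 MiB, bl of 512 bytes) detains the full 4 MiB
   * range in the block guard and in m_blocks_to_log_entries, but only the
   * 512-byte pattern is stored in the cache; reads of the range reconstruct
   * the data by repeating that pattern, and writeback replays it as a
   * write-same to the image rather than 4 MiB of literal data.
   */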
998 C_WriteSameRequestT *ws_req =
999 m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
1000 fadvise_flags, m_lock, m_perfcounter, on_finish);
1001 m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);
1002
1003 /* The lambda below will be called when the block guard for all
1004 * blocks affected by this write is obtained */
1005 GuardedRequestFunctionContext *guarded_ctx =
1006 new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
1007 ws_req->blockguard_acquired(guard_ctx);
1008 alloc_and_dispatch_io_req(ws_req);
1009 });
1010
1011 detain_guarded_request(ws_req, guarded_ctx, false);
1012}
1013
1014template <typename I>
1015void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
1016 bufferlist&& cmp_bl,
1017 bufferlist&& bl,
1018 uint64_t *mismatch_offset,
1019 int fadvise_flags,
1020 Context *on_finish) {
1021 ldout(m_image_ctx.cct, 20) << dendl;
1022
1023 utime_t now = ceph_clock_now();
1024 m_perfcounter->inc(l_librbd_pwl_cmp, 1);
1025 ceph_assert(m_initialized);
1026
1027 /* A compare and write request is also a write request. We only allocate
1028 * resources and dispatch this write request if the compare phase
1029 * succeeds. */
1030 C_WriteRequestT *cw_req =
1031 m_builder->create_comp_and_write_request(
1032 *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
1033 mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
1034 m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);
1035
1036 /* The lambda below will be called when the block guard for all
1037 * blocks affected by this write is obtained */
1038 GuardedRequestFunctionContext *guarded_ctx =
1039 new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
1040 cw_req->blockguard_acquired(guard_ctx);
1041
1042 auto read_complete_ctx = new LambdaContext(
1043 [this, cw_req](int r) {
1044 ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
1045 << "cw_req=" << cw_req << dendl;
1046
1047 /* Compare read_bl to cmp_bl to determine if this will produce a write */
1048 buffer::list aligned_read_bl;
1049 if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) {
1050 aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length());
1051 }
1052 if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) ||
1053 cw_req->cmp_bl.contents_equal(aligned_read_bl)) {
1054 /* Compare phase succeeds. Begin write */
1055 ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
1056 cw_req->compare_succeeded = true;
1057 *cw_req->mismatch_offset = 0;
1058 /* Continue with this request as a write. Blockguard release and
1059 * user request completion handled as if this were a plain
1060 * write. */
1061 alloc_and_dispatch_io_req(cw_req);
1062 } else {
1063 /* Compare phase fails. Comp-and write ends now. */
1064 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
1065 /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
1066 uint64_t bl_index = 0;
1067 for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) {
1068 if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
1069 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
1070 break;
1071 }
1072 }
1073 cw_req->compare_succeeded = false;
1074 *cw_req->mismatch_offset = bl_index;
1075 cw_req->complete_user_request(-EILSEQ);
1076 cw_req->release_cell();
1077 cw_req->complete(0);
1078 }
1079 });
1080
1081 /* Read phase of comp-and-write must read through RWL */
1082 Extents image_extents_copy = cw_req->image_extents;
1083 read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
1084 });
1085
1086 detain_guarded_request(cw_req, guarded_ctx, false);
1087}
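/* Caller-visible semantics of the compare-and-write path above, sketched for
 * a hypothetical 512-byte request (illustrative, no added behaviour):
 *
 *   cmp_bl == current data -> request continues as a normal write,
 *                             *mismatch_offset = 0, caller gets 0
 *   cmp_bl != current data -> *mismatch_offset = first differing byte,
 *                             caller gets -EILSEQ and nothing is written
 *
 * The compare is performed against a read that goes through the cache, so a
 * prior write still sitting in the log is observed correctly.
 */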
1088
1089template <typename I>
1090void AbstractWriteLog<I>::flush(Context *on_finish) {
1091 internal_flush(false, on_finish);
1092}
1093
1094template <typename I>
1095void AbstractWriteLog<I>::invalidate(Context *on_finish) {
1096 internal_flush(true, on_finish);
1097}
1098
1099template <typename I>
1100CephContext *AbstractWriteLog<I>::get_context() {
1101 return m_image_ctx.cct;
1102}
1103
1104template <typename I>
1105BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
1106{
1107 CephContext *cct = m_image_ctx.cct;
1108 BlockGuardCell *cell;
1109
1110 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1111 ldout(cct, 20) << dendl;
1112
1113 int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
1114 ceph_assert(r>=0);
1115 if (r > 0) {
1116 ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
1117 << "req=" << req << dendl;
1118 return nullptr;
1119 }
1120
1121 ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
1122 return cell;
1123}
1124
1125template <typename I>
1126BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
1127 GuardedRequest &req)
1128{
1129 BlockGuardCell *cell = nullptr;
1130
1131 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1132 ldout(m_image_ctx.cct, 20) << dendl;
1133
1134 if (m_barrier_in_progress) {
1135 req.guard_ctx->state.queued = true;
1136 m_awaiting_barrier.push_back(req);
1137 } else {
1138 bool barrier = req.guard_ctx->state.barrier;
1139 if (barrier) {
1140 m_barrier_in_progress = true;
1141 req.guard_ctx->state.current_barrier = true;
1142 }
1143 cell = detain_guarded_request_helper(req);
1144 if (barrier) {
1145 /* Only non-null if the barrier acquires the guard now */
1146 m_barrier_cell = cell;
1147 }
1148 }
1149
1150 return cell;
1151}
1152
1153template <typename I>
1154void AbstractWriteLog<I>::detain_guarded_request(
1155 C_BlockIORequestT *request,
1156 GuardedRequestFunctionContext *guarded_ctx,
1157 bool is_barrier)
1158{
1159 BlockExtent extent;
1160 if (request) {
1161 extent = request->image_extents_summary.block_extent();
1162 } else {
1163 extent = block_extent(whole_volume_extent());
1164 }
1165 auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
1166 BlockGuardCell *cell = nullptr;
1167
1168 ldout(m_image_ctx.cct, 20) << dendl;
1169 {
1170 std::lock_guard locker(m_blockguard_lock);
1171 cell = detain_guarded_request_barrier_helper(req);
1172 }
1173 if (cell) {
1174 req.guard_ctx->cell = cell;
1175 req.guard_ctx->complete(0);
1176 }
1177}
1178
1179template <typename I>
1180void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
1181{
1182 CephContext *cct = m_image_ctx.cct;
1183 WriteLogGuard::BlockOperations block_reqs;
1184 ldout(cct, 20) << "released_cell=" << released_cell << dendl;
1185
1186 {
1187 std::lock_guard locker(m_blockguard_lock);
1188 m_write_log_guard.release(released_cell, &block_reqs);
1189
1190 for (auto &req : block_reqs) {
1191 req.guard_ctx->state.detained = true;
1192 BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
1193 if (detained_cell) {
1194 if (req.guard_ctx->state.current_barrier) {
1195 /* The current barrier is acquiring the block guard, so now we know its cell */
1196 m_barrier_cell = detained_cell;
1197 /* detained_cell could be == released_cell here */
1198 ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
1199 }
1200 req.guard_ctx->cell = detained_cell;
1201 m_work_queue.queue(req.guard_ctx);
1202 }
1203 }
1204
1205 if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
1206 ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
1207 /* The released cell is the current barrier request */
1208 m_barrier_in_progress = false;
1209 m_barrier_cell = nullptr;
1210 /* Move waiting requests into the blockguard. Stop if there's another barrier */
1211 while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
1212 auto &req = m_awaiting_barrier.front();
1213 ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
1214 BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
1215 if (detained_cell) {
1216 req.guard_ctx->cell = detained_cell;
1217 m_work_queue.queue(req.guard_ctx);
1218 }
1219 m_awaiting_barrier.pop_front();
1220 }
1221 }
1222 }
1223
1224 ldout(cct, 20) << "exit" << dendl;
1225}
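/* Barrier flow through the block guard, condensed (behaviour as implemented
 * above; the timeline is illustrative):
 *
 *   W1, W2 detained normally
 *   F (aio_flush, is_barrier=true) -> m_barrier_in_progress = true
 *   W3, W4 arrive                  -> pushed to m_awaiting_barrier, not
 *                                     inserted into the guard yet
 *   F's cell released              -> m_barrier_in_progress = false and the
 *                                     queued W3, W4 are resubmitted, until
 *                                     the next barrier is encountered
 *
 * This gives aio_flush ordering with respect to later writes without
 * serialising non-overlapping writes against each other in the common case.
 */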
1226
1227template <typename I>
1228void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
1229 bool &appending, bool isRWL)
1230{
1231 const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
1232 : MAX_WRITES_PER_SYNC_POINT;
1233 {
1234 std::lock_guard locker(m_lock);
1235 if (!appending && m_appending) {
1236 /* Another thread is appending */
1237 ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
1238 return;
1239 }
1240 if (m_ops_to_append.size()) {
1241 appending = true;
1242 m_appending = true;
1243 auto last_in_batch = m_ops_to_append.begin();
1244 unsigned int ops_to_append = m_ops_to_append.size();
1245 if (ops_to_append > OPS_APPENDED) {
1246 ops_to_append = OPS_APPENDED;
1247 }
1248 std::advance(last_in_batch, ops_to_append);
1249 ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
1250 ops_remain = true; /* Always check again before leaving */
1251 ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
1252 << m_ops_to_append.size() << dendl;
1253 } else if (isRWL) {
1254 ops_remain = false;
1255 if (appending) {
1256 appending = false;
1257 m_appending = false;
1258 }
1259 }
1260 }
1261}
1262
1263template <typename I>
1264void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
1265{
1266 GenericLogOperations to_append(ops.begin(), ops.end());
1267
1268 schedule_append_ops(to_append, req);
1269}
1270
1271template <typename I>
1272void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
1273{
1274 GenericLogOperations to_append { op };
1275
1276 schedule_append_ops(to_append, req);
1277}
1278
1279/*
1280 * Complete a set of write ops with the result of append_op_entries.
1281 */
1282template <typename I>
1283void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
1284 const int result)
1285{
1286 GenericLogEntries dirty_entries;
1287 int published_reserves = 0;
1288 ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
1289 for (auto &op : ops) {
1290 utime_t now = ceph_clock_now();
1291 auto log_entry = op->get_log_entry();
1292 log_entry->completed = true;
1293 if (op->is_writing_op()) {
1294 op->mark_log_entry_completed();
1295 dirty_entries.push_back(log_entry);
1296 }
1297 if (log_entry->is_write_entry()) {
1298 release_ram(log_entry);
1299 }
1300 if (op->reserved_allocated()) {
1301 published_reserves++;
1302 }
1303 {
1304 std::lock_guard locker(m_lock);
1305 m_unpublished_reserves -= published_reserves;
1306 m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
1307 }
1308 op->complete(result);
1309 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
1310 op->log_append_start_time - op->dispatch_time);
1311 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
1312 m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
1313 utime_t(now - op->dispatch_time).to_nsec(),
1314 log_entry->ram_entry.write_bytes);
1315 utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
1316 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
1317 m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
1318 log_entry->ram_entry.write_bytes);
1319 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
1320 }
1321 // New entries may be flushable
1322 {
1323 std::lock_guard locker(m_lock);
1324 wake_up();
1325 }
1326}
1327
1328/**
1329 * Dispatch as many deferred writes as possible
1330 */
1331template <typename I>
1332void AbstractWriteLog<I>::dispatch_deferred_writes(void)
1333{
1334 C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
1335 C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
1336 bool allocated = false; /* front_req allocate succeeded */
1337 bool cleared_dispatching_flag = false;
1338
1339 /* If we can't become the dispatcher, we'll exit */
1340 {
1341 std::lock_guard locker(m_lock);
1342 if (m_dispatching_deferred_ops ||
1343 !m_deferred_ios.size()) {
1344 return;
1345 }
1346 m_dispatching_deferred_ops = true;
1347 }
1348
1349 /* There are ops to dispatch, and this should be the only thread dispatching them */
1350 {
1351 std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
1352 do {
1353 {
1354 std::lock_guard locker(m_lock);
1355 ceph_assert(m_dispatching_deferred_ops);
1356 if (allocated) {
1357 /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
1358 * have succeeded, and we'll need to pop it off the deferred ops list
1359 * here. */
1360 ceph_assert(front_req);
1361 ceph_assert(!allocated_req);
1362 m_deferred_ios.pop_front();
1363 allocated_req = front_req;
1364 front_req = nullptr;
1365 allocated = false;
1366 }
1367 ceph_assert(!allocated);
1368 if (!allocated && front_req) {
1369 /* front_req->alloc_resources() failed on the last iteration.
1370 * We'll stop dispatching. */
1371 wake_up();
1372 front_req = nullptr;
1373 ceph_assert(!cleared_dispatching_flag);
1374 m_dispatching_deferred_ops = false;
1375 cleared_dispatching_flag = true;
1376 } else {
1377 ceph_assert(!front_req);
1378 if (m_deferred_ios.size()) {
1379 /* New allocation candidate */
1380 front_req = m_deferred_ios.front();
1381 } else {
1382 ceph_assert(!cleared_dispatching_flag);
1383 m_dispatching_deferred_ops = false;
1384 cleared_dispatching_flag = true;
1385 }
1386 }
1387 }
1388 /* Try allocating for front_req before we decide what to do with allocated_req
1389 * (if any) */
1390 if (front_req) {
1391 ceph_assert(!cleared_dispatching_flag);
1392 allocated = front_req->alloc_resources();
1393 }
1394 if (allocated_req && front_req && allocated) {
1395 /* Push dispatch of the first allocated req to a wq */
1396 m_work_queue.queue(new LambdaContext(
1397 [this, allocated_req](int r) {
1398 allocated_req->dispatch();
1399 }), 0);
1400 allocated_req = nullptr;
1401 }
1402 ceph_assert(!(allocated_req && front_req && allocated));
1403
1404 /* Continue while we're still considering the front of the deferred ops list */
1405 } while (front_req);
1406 ceph_assert(!allocated);
1407 }
1408 ceph_assert(cleared_dispatching_flag);
1409
1410 /* If any deferred requests were allocated, the last one will still be in allocated_req */
1411 if (allocated_req) {
1412 allocated_req->dispatch();
1413 }
1414}
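/* Condensed view of the loop above (a sketch, not a replacement; it assumes
 * the same member names used in this file):
 *
 *   while (front_req) {
 *     { // under m_lock
 *       if (allocated) { m_deferred_ios.pop_front(); allocated_req = front_req; }
 *       pick a new front_req, or clear m_dispatching_deferred_ops and stop;
 *     }
 *     allocated = front_req && front_req->alloc_resources();
 *     if (allocated_req && front_req && allocated)
 *       queue allocated_req->dispatch() on m_work_queue;
 *   }
 *   if (allocated_req) allocated_req->dispatch();  // last allocated req, inline
 *
 * Only the final allocated request is dispatched inline; earlier ones are
 * pushed to m_work_queue so allocation can keep making progress.
 */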
1415
1416/**
 1417 * Returns the lanes used by this write to the free pool, and attempts to
 1418 * dispatch the next deferred write
1419 */
1420template <typename I>
1421void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
1422{
1423 {
1424 std::lock_guard locker(m_lock);
1425 m_free_lanes += req->image_extents.size();
1426 }
1427 dispatch_deferred_writes();
1428}
1429
1430/**
1431 * Attempts to allocate log resources for a write. Write is dispatched if
1432 * resources are available, or queued if they aren't.
1433 */
1434template <typename I>
1435void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
1436{
1437 bool dispatch_here = false;
1438
1439 {
1440 /* If there are already deferred writes, queue behind them for resources */
1441 {
1442 std::lock_guard locker(m_lock);
1443 dispatch_here = m_deferred_ios.empty();
 1444 // Only a flush req can have total_bytes == max uint64; dispatch internal flushes immediately
1445 if (req->image_extents_summary.total_bytes ==
1446 std::numeric_limits<uint64_t>::max() &&
1447 static_cast<C_FlushRequestT *>(req)->internal == true) {
1448 dispatch_here = true;
1449 }
1450 }
1451 if (dispatch_here) {
1452 dispatch_here = req->alloc_resources();
1453 }
1454 if (dispatch_here) {
1455 ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
1456 req->dispatch();
1457 } else {
1458 req->deferred();
1459 {
1460 std::lock_guard locker(m_lock);
1461 m_deferred_ios.push_back(req);
1462 }
1463 ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
1464 dispatch_deferred_writes();
1465 }
1466 }
1467}
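/* Typical flow for a block IO request through the functions in this file
 * (a sketch; error paths omitted):
 *
 *   alloc_and_dispatch_io_req(req)
 *     -> req->alloc_resources()   // expected to end up in check_allocation() below
 *          success: req->dispatch()
 *          failure: req->deferred(); m_deferred_ios.push_back(req);
 *                   dispatch_deferred_writes();  // retried as resources free up
 */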
1468
1469template <typename I>
1470bool AbstractWriteLog<I>::check_allocation(
1471 C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
1472 uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
1473 uint32_t num_unpublished_reserves) {
1474 bool alloc_succeeds = true;
1475 bool no_space = false;
1476 {
1477 std::lock_guard locker(m_lock);
1478 if (m_free_lanes < num_lanes) {
1479 ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
1480 << num_lanes
1481 << ", have " << m_free_lanes << ") "
1482 << *req << dendl;
1483 alloc_succeeds = false;
1484 /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
1485 }
1486 if (m_free_log_entries < num_log_entries) {
1487 ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
1488 << num_log_entries
1489 << ", have " << m_free_log_entries << ") "
1490 << *req << dendl;
1491 alloc_succeeds = false;
1492 no_space = true; /* Entries must be retired */
1493 }
1494 /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
a4b75251 1495 if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
1496 ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
1497 << m_bytes_allocated_cap
1498 << ", allocated=" << m_bytes_allocated
1499 << ") in write [" << *req << "]" << dendl;
1500 alloc_succeeds = false;
1501 no_space = true; /* Entries must be retired */
1502 }
1503 }
1504
1505 if (alloc_succeeds) {
1506 reserve_cache(req, alloc_succeeds, no_space);
1507 }
1508
1509 if (alloc_succeeds) {
1510 std::lock_guard locker(m_lock);
1511 /* We need one free log entry per extent (each is a separate entry), and
 1512 * one free "lane" per extent for remote replication. */
1513 if ((m_free_lanes >= num_lanes) &&
1514 (m_free_log_entries >= num_log_entries) &&
1515 (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
1516 m_free_lanes -= num_lanes;
1517 m_free_log_entries -= num_log_entries;
1518 m_unpublished_reserves += num_unpublished_reserves;
1519 m_bytes_allocated += bytes_allocated;
1520 m_bytes_cached += bytes_cached;
1521 m_bytes_dirty += bytes_dirtied;
1522 } else {
1523 alloc_succeeds = false;
1524 }
1525 }
1526
1527 if (!alloc_succeeds && no_space) {
1528 /* Expedite flushing and/or retiring */
1529 std::lock_guard locker(m_lock);
1530 m_alloc_failed_since_retire = true;
1531 m_last_alloc_fail = ceph_clock_now();
1532 }
1533
1534 return alloc_succeeds;
1535}
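/* Worked example (numbers are illustrative only): with
 * m_bytes_allocated_cap = 1 GiB and m_bytes_allocated = 1000 MiB, a request
 * needing another 32 MiB fails the cap check above and sets no_space, so
 * m_alloc_failed_since_retire is raised and the next pass of the periodic
 * work expedites flushing/retiring. Running out of lanes alone also fails
 * the allocation, but without no_space, since lanes only throttle concurrency.
 */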
1536
1537template <typename I>
1538C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
1539 utime_t flush_begins = ceph_clock_now();
1540 bufferlist bl;
1541 auto *flush_req =
1542 new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
1543 std::move(bl), 0, m_lock, m_perfcounter, on_finish);
1544
1545 return flush_req;
1546}
1547
1548template <typename I>
1549void AbstractWriteLog<I>::wake_up() {
1550 CephContext *cct = m_image_ctx.cct;
1551 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1552
1553 if (!m_wake_up_enabled) {
1554 // wake_up is disabled during shutdown after flushing completes
1555 ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
1556 return;
1557 }
1558
1559 if (m_wake_up_requested && m_wake_up_scheduled) {
1560 return;
1561 }
1562
1563 ldout(cct, 20) << dendl;
1564
1565 /* Wake-up can be requested while it's already scheduled */
1566 m_wake_up_requested = true;
1567
1568 /* Wake-up cannot be scheduled if it's already scheduled */
1569 if (m_wake_up_scheduled) {
1570 return;
1571 }
1572 m_wake_up_scheduled = true;
1573 m_async_process_work++;
1574 m_async_op_tracker.start_op();
1575 m_work_queue.queue(new LambdaContext(
1576 [this](int r) {
1577 process_work();
1578 m_async_op_tracker.finish_op();
1579 m_async_process_work--;
1580 }), 0);
1581}
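/* m_wake_up_requested and m_wake_up_scheduled form a small coalescing
 * protocol under m_lock: "requested" records that more work has arrived,
 * "scheduled" records that a process_work() context is already queued, so
 * any number of wake_up() calls collapse into at most one queued context.
 * process_work() is expected to clear the flags as it drains the work.
 */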
1582
1583template <typename I>
1584bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
1585 CephContext *cct = m_image_ctx.cct;
1586
1587 ldout(cct, 20) << "" << dendl;
1588 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1589
1590 if (m_invalidating) {
1591 return true;
1592 }
1593
1594 /* For OWB we can flush entries with the same sync gen number (write between
1595 * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
1596 * its sync gen number is <= the lowest sync gen number carried by all the
1597 * entries currently flushing.
1598 *
1599 * If the entry considered here bears a sync gen number lower than a
1600 * previously flushed entry, the application had to have submitted the write
1601 * bearing the higher gen number before the write with the lower gen number
1602 * completed. So, flushing these concurrently is OK.
1603 *
1604 * If the entry considered here bears a sync gen number higher than a
1605 * currently flushing entry, the write with the lower gen number may have
1606 * completed to the application before the write with the higher sync gen
1607 * number was submitted, and the application may rely on that completion
1608 * order for volume consistency. In this case the entry will not be
1609 * considered flushable until all the entries bearing lower sync gen numbers
1610 * finish flushing.
1611 */
1612
1613 if (m_flush_ops_in_flight &&
1614 (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
1615 return false;
1616 }
1617
1618 return (log_entry->can_writeback() &&
1619 (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
1620 (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
1621}
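/* Worked example (gen numbers are illustrative): with dirty entries carrying
 * sync gen numbers 4, 4, 5 and an entry of gen 4 already flushing
 * (m_lowest_flushing_sync_gen == 4), the remaining gen-4 entries are
 * flushable concurrently, while the gen-5 entry is held back until all
 * gen-4 flushes finish, preserving the completion ordering described above.
 */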
1622
1623template <typename I>
1624void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
1625 GuardedRequestFunctionContext *guarded_ctx) {
1626 ldout(m_image_ctx.cct, 20) << dendl;
f67539c2 1627
1628 BlockExtent extent;
1629 if (log_entry->is_sync_point()) {
1630 extent = block_extent(whole_volume_extent());
1631 } else {
1632 extent = log_entry->ram_entry.block_extent();
f67539c2 1633 }
1634
1635 auto req = GuardedRequest(extent, guarded_ctx, false);
1636 BlockGuardCell *cell = nullptr;
1637
1638 {
1639 std::lock_guard locker(m_flush_guard_lock);
1640 m_flush_guard.detain(req.block_extent, &req, &cell);
1641 }
1642 if (cell) {
1643 req.guard_ctx->cell = cell;
1644 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1645 }
1646}
1647
1648template <typename I>
1649Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
1650 bool invalidating) {
1651 ldout(m_image_ctx.cct, 20) << "" << dendl;
1652
1653 /* Flush write completion action */
a4b75251 1654 utime_t writeback_start_time = ceph_clock_now();
f67539c2 1655 Context *ctx = new LambdaContext(
1656 [this, log_entry, writeback_start_time, invalidating](int r) {
1657 utime_t writeback_comp_time = ceph_clock_now();
1658 m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
1659 writeback_comp_time - writeback_start_time);
1660 {
1661 std::lock_guard locker(m_lock);
1662 if (r < 0) {
1663 lderr(m_image_ctx.cct) << "failed to flush log entry"
1664 << cpp_strerror(r) << dendl;
1665 m_dirty_log_entries.push_front(log_entry);
1666 } else {
1667 ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
1668 log_entry->set_flushed(true);
1669 m_bytes_dirty -= log_entry->bytes_dirty();
1670 sync_point_writer_flushed(log_entry->get_sync_point_entry());
1671 ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
1672 << " invalidating=" << invalidating
1673 << dendl;
1674 }
1675 m_flush_ops_in_flight -= 1;
1676 m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
1677 wake_up();
1678 }
1679 });
1680 /* Flush through lower cache before completing */
1681 ctx = new LambdaContext(
1682 [this, ctx, log_entry](int r) {
1683 {
1684
1685 WriteLogGuard::BlockOperations block_reqs;
1686 BlockGuardCell *detained_cell = nullptr;
1687
1688 std::lock_guard locker{m_flush_guard_lock};
1689 m_flush_guard.release(log_entry->m_cell, &block_reqs);
1690
1691 for (auto &req : block_reqs) {
1692 m_flush_guard.detain(req.block_extent, &req, &detained_cell);
1693 if (detained_cell) {
1694 req.guard_ctx->cell = detained_cell;
1695 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1696 }
1697 }
1698 }
1699
1700 if (r < 0) {
1701 lderr(m_image_ctx.cct) << "failed to flush log entry"
1702 << cpp_strerror(r) << dendl;
1703 ctx->complete(r);
1704 } else {
1705 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
1706 }
1707 });
1708 return ctx;
1709}
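/* The context returned above runs in two stages once it is completed:
 * first the outer lambda releases this entry's flush-guard cell, re-detains
 * any requests that were blocked on it, and issues aio_flush() to the lower
 * layer (or forwards an error to the inner context); then the inner lambda
 * records the writeback latency and, under m_lock, either marks the entry
 * flushed or pushes it back onto m_dirty_log_entries on error, before
 * adjusting the in-flight counters and calling wake_up().
 */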
1710
1711template <typename I>
1712void AbstractWriteLog<I>::process_writeback_dirty_entries() {
1713 CephContext *cct = m_image_ctx.cct;
1714 bool all_clean = false;
1715 int flushed = 0;
a4b75251 1716 bool has_write_entry = false;
1717
1718 ldout(cct, 20) << "Look for dirty entries" << dendl;
1719 {
1720 DeferredContexts post_unlock;
1721 GenericLogEntries entries_to_flush;
1722
f67539c2 1723 std::shared_lock entry_reader_locker(m_entry_reader_lock);
a4b75251 1724 std::lock_guard locker(m_lock);
f67539c2 1725 while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
1726 if (m_shutting_down) {
 1727 ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
1728 /* Do flush complete only when all flush ops are finished */
1729 all_clean = !m_flush_ops_in_flight;
1730 break;
1731 }
1732 if (m_dirty_log_entries.empty()) {
1733 ldout(cct, 20) << "Nothing new to flush" << dendl;
1734 /* Do flush complete only when all flush ops are finished */
1735 all_clean = !m_flush_ops_in_flight;
1736 break;
1737 }
a4b75251 1738
1739 auto candidate = m_dirty_log_entries.front();
1740 bool flushable = can_flush_entry(candidate);
1741 if (flushable) {
a4b75251 1742 entries_to_flush.push_back(candidate);
f67539c2 1743 flushed++;
1744 if (!has_write_entry)
1745 has_write_entry = candidate->is_write_entry();
f67539c2 1746 m_dirty_log_entries.pop_front();
1747
 1748 // Account for this candidate's flush now, while m_lock is still held
1749 {
1750 if (!m_flush_ops_in_flight ||
1751 (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
1752 m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
1753 }
1754 m_flush_ops_in_flight += 1;
1755 /* For write same this is the bytes affected by the flush op, not the bytes transferred */
1756 m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
1757 }
1758 } else {
1759 ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
1760 break;
1761 }
1762 }
1763
1764 construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
1765 }
1766
1767 if (all_clean) {
1768 /* All flushing complete, drain outside lock */
1769 Contexts flush_contexts;
1770 {
1771 std::lock_guard locker(m_lock);
1772 flush_contexts.swap(m_flush_complete_contexts);
1773 }
1774 finish_contexts(m_image_ctx.cct, flush_contexts, 0);
1775 }
1776}
1777
1778/* Returns true if the specified SyncPointLogEntry is now considered flushed,
 1779 * in which case the log is updated to reflect this. */
1780template <typename I>
1781bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
1782{
1783 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1784 ceph_assert(log_entry);
1785
1786 if ((log_entry->writes_flushed == log_entry->writes) &&
1787 log_entry->completed && log_entry->prior_sync_point_flushed &&
1788 log_entry->next_sync_point_entry) {
1789 ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
1790 << *log_entry << dendl;
1791 log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
1792 /* Don't move the flushed sync gen num backwards. */
1793 if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
1794 m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
1795 }
1796 m_async_op_tracker.start_op();
1797 m_work_queue.queue(new LambdaContext(
a4b75251 1798 [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
1799 bool handled_by_next;
1800 {
1801 std::lock_guard locker(m_lock);
a4b75251 1802 handled_by_next = handle_flushed_sync_point(std::move(next));
1803 }
1804 if (!handled_by_next) {
1805 persist_last_flushed_sync_gen();
1806 }
1807 m_async_op_tracker.finish_op();
1808 }));
1809 return true;
1810 }
1811 return false;
1812}
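/* Flushed sync points are handled as a chain: each fully flushed sync point
 * marks next_sync_point_entry->prior_sync_point_flushed and re-runs this
 * check for it on the work queue, so a burst of writeback completions walks
 * forward through the sync point list. The walk stops at the first sync
 * point that is not yet fully flushed, at which point
 * persist_last_flushed_sync_gen() records the highest flushed gen.
 */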
1813
1814template <typename I>
1815void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
1816{
1817 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1818 ceph_assert(log_entry);
1819 log_entry->writes_flushed++;
1820
1821 /* If this entry might be completely flushed, look closer */
1822 if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
1823 ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
1824 << *log_entry << dendl;
1825 handle_flushed_sync_point(log_entry);
1826 }
1827}
1828
1829/* During initialization, make a new sync point and flush the previous one
 1830 * (if any); at this point there may or may not be a previous sync point */
1831template <typename I>
1832void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
1833 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1834 ceph_assert(!m_initialized); /* Don't use this after init */
1835
1836 if (!m_current_sync_point) {
1837 /* First sync point since start */
1838 new_sync_point(later);
1839 } else {
1840 flush_new_sync_point(nullptr, later);
1841 }
1842}
1843
1844/**
1845 * Begin a new sync point
1846 */
1847template <typename I>
1848void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
1849 CephContext *cct = m_image_ctx.cct;
1850 std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
1851 std::shared_ptr<SyncPoint> new_sync_point;
1852 ldout(cct, 20) << dendl;
1853
1854 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1855
1856 /* The first time this is called, if this is a newly created log,
1857 * this makes the first sync gen number we'll use 1. On the first
1858 * call for a re-opened log m_current_sync_gen will be the highest
1859 * gen number from all the sync point entries found in the re-opened
1860 * log, and this advances to the next sync gen number. */
1861 ++m_current_sync_gen;
1862
1863 new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
1864 m_current_sync_point = new_sync_point;
1865
1866 /* If this log has been re-opened, old_sync_point will initially be
1867 * nullptr, but m_current_sync_gen may not be zero. */
1868 if (old_sync_point) {
1869 new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
1870 m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
1871 old_sync_point->log_entry->writes,
1872 old_sync_point->log_entry->bytes);
1873 /* This sync point will acquire no more sub-ops. Activation needs
 1874 * to acquire m_lock, so defer it until later */
1875 later.add(new LambdaContext(
1876 [this, old_sync_point](int r) {
1877 old_sync_point->prior_persisted_gather_activate();
1878 }));
1879 }
1880
1881 new_sync_point->prior_persisted_gather_set_finisher();
1882
1883 if (old_sync_point) {
1884 ldout(cct,6) << "new sync point = [" << *m_current_sync_point
1885 << "], prior = [" << *old_sync_point << "]" << dendl;
1886 } else {
1887 ldout(cct,6) << "first sync point = [" << *m_current_sync_point
1888 << "]" << dendl;
1889 }
1890}
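/* Example progression (gen numbers are illustrative): a newly created log
 * starts with m_current_sync_gen == 0, so the first call here produces sync
 * point 1 with no earlier sync point. A log re-opened with entries up to gen
 * 7 starts at m_current_sync_gen == 7 and produces sync point 8; since
 * old_sync_point is nullptr on that first call, no earlier-sync-point
 * chaining or gather activation is scheduled.
 */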
1891
1892template <typename I>
1893void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
1894 DeferredContexts &later) {
1895 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1896
1897 if (!flush_req) {
1898 m_async_null_flush_finish++;
1899 m_async_op_tracker.start_op();
1900 Context *flush_ctx = new LambdaContext([this](int r) {
1901 m_async_null_flush_finish--;
1902 m_async_op_tracker.finish_op();
1903 });
1904 flush_req = make_flush_req(flush_ctx);
1905 flush_req->internal = true;
1906 }
1907
1908 /* Add a new sync point. */
1909 new_sync_point(later);
1910 std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
1911 ceph_assert(to_append);
1912
1913 /* This flush request will append/persist the (now) previous sync point */
1914 flush_req->to_append = to_append;
1915
1916 /* When the m_sync_point_persist Gather completes this sync point can be
1917 * appended. The only sub for this Gather is the finisher Context for
1918 * m_prior_log_entries_persisted, which records the result of the Gather in
1919 * the sync point, and completes. TODO: Do we still need both of these
1920 * Gathers?*/
1921 Context * ctx = new LambdaContext([this, flush_req](int r) {
1922 ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
1923 << " sync point =" << flush_req->to_append
1924 << ". Ready to persist." << dendl;
1925 alloc_and_dispatch_io_req(flush_req);
1926 });
1927 to_append->persist_gather_set_finisher(ctx);
1928
1929 /* The m_sync_point_persist Gather has all the subs it will ever have, and
1930 * now has its finisher. If the sub is already complete, activation will
1931 * complete the Gather. The finisher will acquire m_lock, so we'll activate
1932 * this when we release m_lock.*/
1933 later.add(new LambdaContext([this, to_append](int r) {
1934 to_append->persist_gather_activate();
1935 }));
1936
1937 /* The flush request completes when the sync point persists */
1938 to_append->add_in_on_persisted_ctxs(flush_req);
1939}
1940
1941template <typename I>
1942void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
1943 DeferredContexts &later) {
1944 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1945
1946 /* If there have been writes since the last sync point ... */
1947 if (m_current_sync_point->log_entry->writes) {
1948 flush_new_sync_point(flush_req, later);
1949 } else {
1950 /* There have been no writes to the current sync point. */
1951 if (m_current_sync_point->earlier_sync_point) {
1952 /* If previous sync point hasn't completed, complete this flush
1953 * with the earlier sync point. No alloc or dispatch needed. */
1954 m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
1955 } else {
1956 /* The previous sync point has already completed and been
1957 * appended. The current sync point has no writes, so this flush
1958 * has nothing to wait for. This flush completes now. */
1959 later.add(flush_req);
1960 }
1961 }
1962}
1963
1964/*
1965 * RWL internal flush - will actually flush the RWL.
1966 *
1967 * User flushes should arrive at aio_flush(), and only flush prior
1968 * writes to all log replicas.
1969 *
1970 * Librbd internal flushes will arrive at flush(invalidate=false,
1971 * discard=false), and traverse the block guard to ensure in-flight writes are
1972 * flushed.
1973 */
1974template <typename I>
1975void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
1976 CephContext *cct = m_image_ctx.cct;
1977 bool all_clean;
1978 bool flushing;
1979 bool stop_flushing;
1980
1981 {
1982 std::lock_guard locker(m_lock);
1983 flushing = (0 != m_flush_ops_in_flight);
1984 all_clean = m_dirty_log_entries.empty();
1985 stop_flushing = (m_shutting_down);
1986 }
1987
1988 if (!flushing && (all_clean || stop_flushing)) {
1989 /* Complete without holding m_lock */
1990 if (all_clean) {
1991 ldout(cct, 20) << "no dirty entries" << dendl;
1992 } else {
1993 ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
1994 }
1995 on_finish->complete(0);
1996 } else {
1997 if (all_clean) {
1998 ldout(cct, 5) << "flush ops still in progress" << dendl;
1999 } else {
2000 ldout(cct, 20) << "dirty entries remain" << dendl;
2001 }
2002 std::lock_guard locker(m_lock);
2003 /* on_finish can't be completed yet */
2004 m_flush_complete_contexts.push_back(new LambdaContext(
2005 [this, on_finish](int r) {
2006 flush_dirty_entries(on_finish);
2007 }));
2008 wake_up();
2009 }
2010}
2011
2012template <typename I>
2013void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
2014 ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
2015
2016 if (m_perfcounter) {
2017 if (invalidate) {
2018 m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
2019 } else {
a4b75251 2020 m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
2021 }
2022 }
2023
2024 /* May be called even if initialization fails */
2025 if (!m_initialized) {
2026 ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
2027 /* Deadlock if completed here */
2028 m_image_ctx.op_work_queue->queue(on_finish, 0);
2029 return;
2030 }
2031
2032 /* Flush/invalidate must pass through block guard to ensure all layers of
2033 * cache are consistently flush/invalidated. This ensures no in-flight write leaves
2034 * some layers with valid regions, which may later produce inconsistent read
2035 * results. */
2036 GuardedRequestFunctionContext *guarded_ctx =
2037 new GuardedRequestFunctionContext(
2038 [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
2039 DeferredContexts on_exit;
2040 ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
2041 ceph_assert(guard_ctx.cell);
2042
2043 Context *ctx = new LambdaContext(
2044 [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
2045 std::lock_guard locker(m_lock);
2046 m_invalidating = false;
2047 ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
2048 << invalidate << ")" << dendl;
2049 if (m_log_entries.size()) {
2050 ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
2051 << m_log_entries.size()
2052 << ", front()=" << *m_log_entries.front()
2053 << dendl;
2054 }
2055 if (invalidate) {
2056 ceph_assert(m_log_entries.size() == 0);
2057 }
2058 ceph_assert(m_dirty_log_entries.size() == 0);
2059 m_image_ctx.op_work_queue->queue(on_finish, r);
2060 release_guarded_request(cell);
2061 });
2062 ctx = new LambdaContext(
2063 [this, ctx, invalidate](int r) {
2064 Context *next_ctx = ctx;
2065 ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
2066 if (r < 0) {
2067 /* Override on_finish status with this error */
2068 next_ctx = new LambdaContext([r, ctx](int _r) {
2069 ctx->complete(r);
2070 });
2071 }
2072 if (invalidate) {
2073 {
2074 std::lock_guard locker(m_lock);
2075 ceph_assert(m_dirty_log_entries.size() == 0);
2076 ceph_assert(!m_invalidating);
2077 ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
2078 m_invalidating = true;
2079 }
2080 /* Discards all RWL entries */
2081 while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
2082 next_ctx->complete(0);
2083 } else {
2084 {
2085 std::lock_guard locker(m_lock);
2086 ceph_assert(m_dirty_log_entries.size() == 0);
2087 ceph_assert(!m_invalidating);
2088 }
2089 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
2090 }
2091 });
2092 ctx = new LambdaContext(
2093 [this, ctx](int r) {
2094 flush_dirty_entries(ctx);
2095 });
2096 std::lock_guard locker(m_lock);
 2097 /* Even if we're throwing everything away, we want the last entry to
2098 * be a sync point so we can cleanly resume.
2099 *
2100 * Also, the blockguard only guarantees the replication of this op
2101 * can't overlap with prior ops. It doesn't guarantee those are all
2102 * completed and eligible for flush & retire, which we require here.
2103 */
2104 auto flush_req = make_flush_req(ctx);
2105 flush_new_sync_point_if_needed(flush_req, on_exit);
2106 });
2107 detain_guarded_request(nullptr, guarded_ctx, true);
2108}
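/* Since the contexts above are constructed innermost-first, the run-time
 * order once the guarded request fires is, roughly:
 *   1. make_flush_req()/flush_new_sync_point_if_needed() persist a final
 *      sync point;
 *   2. flush_dirty_entries(ctx) waits for writeback of dirty entries;
 *   3. on invalidate, retire_entries() discards all log entries; otherwise
 *      the lower layer is flushed via aio_flush();
 *   4. the final context asserts the log is clean, queues on_finish, and
 *      releases the block guard cell.
 */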
2109
2110template <typename I>
2111void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
2112 C_BlockIORequestT *req) {
2113 req->copy_cache();
2114 m_blocks_to_log_entries.add_log_entries(log_entries);
2115}
2116
2117template <typename I>
2118bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
2119 CephContext *cct = m_image_ctx.cct;
2120
2121 ldout(cct, 20) << dendl;
2122 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
20effc67 2123 ceph_assert(log_entry);
2124 return log_entry->can_retire();
2125}
2126
2127template <typename I>
2128void AbstractWriteLog<I>::check_image_cache_state_clean() {
2129 ceph_assert(m_deferred_ios.empty());
20effc67 2130 ceph_assert(m_ops_to_append.empty());
2131 ceph_assert(m_async_flush_ops == 0);
2132 ceph_assert(m_async_append_ops == 0);
2133 ceph_assert(m_dirty_log_entries.empty());
2134 ceph_assert(m_ops_to_flush.empty());
2135 ceph_assert(m_flush_ops_in_flight == 0);
2136 ceph_assert(m_flush_bytes_in_flight == 0);
2137 ceph_assert(m_bytes_dirty == 0);
2138 ceph_assert(m_work_queue.empty());
2139}
2140
2141} // namespace pwl
2142} // namespace cache
2143} // namespace librbd
2144
2145template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;