1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "AbstractWriteLog.h"
5 #include "include/buffer.h"
6 #include "include/Context.h"
7 #include "include/ceph_assert.h"
8 #include "common/deleter.h"
9 #include "common/dout.h"
10 #include "common/environment.h"
11 #include "common/errno.h"
12 #include "common/hostname.h"
13 #include "common/WorkQueue.h"
14 #include "common/Timer.h"
15 #include "common/perf_counters.h"
16 #include "librbd/ImageCtx.h"
17 #include "librbd/asio/ContextWQ.h"
18 #include "librbd/cache/pwl/ImageCacheState.h"
19 #include "librbd/cache/pwl/LogEntry.h"
20 #include "librbd/plugin/Api.h"
21 #include <map>
22 #include <vector>
23
24 #undef dout_subsys
25 #define dout_subsys ceph_subsys_rbd_pwl
26 #undef dout_prefix
27 #define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
28 << " " << __func__ << ": "
29
30 namespace librbd {
31 namespace cache {
32 namespace pwl {
33
34 using namespace std;
35 using namespace librbd::cache::pwl;
36
37 typedef AbstractWriteLog<ImageCtx>::Extent Extent;
38 typedef AbstractWriteLog<ImageCtx>::Extents Extents;
39
40 template <typename I>
41 AbstractWriteLog<I>::AbstractWriteLog(
42 I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
43 Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
44 plugin::Api<I>& plugin_api)
45 : m_builder(builder),
46 m_write_log_guard(image_ctx.cct),
47 m_flush_guard(image_ctx.cct),
48 m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
49 "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
50 m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
51 "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
52 m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
53 "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
54 m_thread_pool(
55 image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
56 "tp_pwl", 4, ""),
57 m_cache_state(cache_state),
58 m_image_ctx(image_ctx),
59 m_log_pool_size(DEFAULT_POOL_SIZE),
60 m_image_writeback(image_writeback),
61 m_plugin_api(plugin_api),
62 m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
63 "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
64 m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
65 m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
66 "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
67 m_lock(ceph::make_mutex(pwl::unique_lock_name(
68 "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
69 m_blocks_to_log_entries(image_ctx.cct),
70 m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
71 ceph::make_timespan(
72 image_ctx.config.template get_val<uint64_t>(
73 "rbd_op_thread_timeout")),
74 &m_thread_pool)
75 {
76 CephContext *cct = m_image_ctx.cct;
77 m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
78 }
79
80 template <typename I>
81 AbstractWriteLog<I>::~AbstractWriteLog() {
82 ldout(m_image_ctx.cct, 15) << "enter" << dendl;
83 {
84 std::lock_guard timer_locker(*m_timer_lock);
85 std::lock_guard locker(m_lock);
86 m_timer->cancel_event(m_timer_ctx);
87 m_thread_pool.stop();
88 ceph_assert(m_deferred_ios.size() == 0);
89 ceph_assert(m_ops_to_flush.size() == 0);
90 ceph_assert(m_ops_to_append.size() == 0);
91 ceph_assert(m_flush_ops_in_flight == 0);
92
93 delete m_cache_state;
94 m_cache_state = nullptr;
95 }
96 ldout(m_image_ctx.cct, 15) << "exit" << dendl;
97 }
98
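/**
 * Registers this image's persistent write log perf counters under the given
 * name: read/write/discard/flush counters, latency averages, and 2-D
 * latency-vs-size histograms built from the axis configs defined below.
 */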
99 template <typename I>
100 void AbstractWriteLog<I>::perf_start(std::string name) {
101 PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
102 l_librbd_pwl_last);
103
104 // Latency axis configuration for op histograms, values are in nanoseconds
105 PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
106 "Latency (nsec)",
107 PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
108 0, ///< Start at 0
109 5000, ///< Quantization unit is 5usec
110 16, ///< Ranges into the mS
111 };
112
113 // Syncpoint logentry number x-axis configuration for op histograms
114 PerfHistogramCommon::axis_config_d sp_logentry_number_config{
115 "logentry number",
116 PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
117 0, // Start at 0
118 1, // Quantization unit is 1
119 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
120 };
121
122 // Syncpoint bytes number y-axis configuration for op histogram
123 PerfHistogramCommon::axis_config_d sp_bytes_number_config{
124 "Number of SyncPoint",
125 PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale
126 0, // Start at 0
127 512, // Quantization unit is 512
128 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
129 };
130
131 // Op size axis configuration for op histogram y axis, values are in bytes
132 PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
133 "Request size (bytes)",
134 PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
135 0, ///< Start at 0
136 512, ///< Quantization unit is 512 bytes
137 16, ///< Writes up to >32k
138 };
139
140 // Num items configuration for op histogram y axis, values are in items
141 PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
142 "Number of items",
143 PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale
144 0, ///< Start at 0
145 1, ///< Quantization unit is 1
146 32, ///< Writes up to >32k
147 };
148
149 plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
150 plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
151 plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");
152
153 plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
154 plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
155 plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");
156
157 plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL");
158
159 plb.add_u64_counter_histogram(
160 l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
161 sp_logentry_number_config, sp_bytes_number_config,
162 "Histogram of syncpoint's logentry numbers vs bytes number");
163
164 plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
165 plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
166 plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
167 plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
168 plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
169 plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
170 plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
171 plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
172
173 plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
174 plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
175
176 plb.add_time_avg(
177 l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
178 "Average arrival to allocation time (time deferred for overlap)");
179 plb.add_time_avg(
180 l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
181 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
182 plb.add_time_avg(
183 l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
184 "Average allocation to dispatch time (time deferred for log resources)");
185 plb.add_time_avg(
186 l_librbd_pwl_wr_latency, "wr_latency",
187 "Latency of writes (persistent completion)");
188 plb.add_u64_counter_histogram(
189 l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
190 op_hist_x_axis_config, op_hist_y_axis_config,
191 "Histogram of write request latency (nanoseconds) vs. bytes written");
192 plb.add_time_avg(
193 l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
194 "Latency of write completion to caller");
195 plb.add_time_avg(
196 l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
197 "Average arrival to allocation time (time deferred for overlap)");
198 plb.add_time_avg(
199 l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
200 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
201 plb.add_time_avg(
202 l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
203 "Average allocation to dispatch time (time deferred for log resources)");
204 plb.add_time_avg(
205 l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
206 "Latency of writes (persistent completion) not deferred for free space");
207 plb.add_u64_counter_histogram(
208 l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
209 op_hist_x_axis_config, op_hist_y_axis_config,
210 "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
211 plb.add_time_avg(
212 l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
213 "Latency of write completion to callerfor writes not deferred for free space");
214 plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
215 plb.add_u64_counter_histogram(
216 l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
217 op_hist_x_axis_config, op_hist_y_axis_config,
218 "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");
219 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
220 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
221 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
222 plb.add_u64_counter_histogram(
223 l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
224 op_hist_x_axis_config, op_hist_y_axis_config,
225 "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
226
227 plb.add_time_avg(
228 l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
229 "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
230 plb.add_time_avg(
231 l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
232 "Average buffer persist time (write data persist/replicate time)");
233 plb.add_u64_counter_histogram(
234 l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
235 op_hist_x_axis_config, op_hist_y_axis_config,
236 "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
237 plb.add_time_avg(
238 l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
239 "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
240 plb.add_time_avg(
241 l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
242 "Average log append to persist complete time (log entry append/replicate time)");
243 plb.add_u64_counter_histogram(
244 l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
245 op_hist_x_axis_config, op_hist_y_axis_config,
246 "Histogram of log append persist time (nanoseconds) (vs. op bytes)");
247
248 plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
249 plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
250 plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");
251
252 plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
253 plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
254 plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");
255
256 plb.add_u64_counter(l_librbd_pwl_ws, "ws", "Write Sames");
257 plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
258 plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");
259
260 plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
261 plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
262 plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
263 plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
264
265 plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
266 plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
267 plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
268 plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");
269
270 plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
271 plb.add_u64_counter_histogram(
272 l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
273 op_hist_x_axis_config, op_hist_y_axis_count_config,
274 "Histogram of log append transaction time (nanoseconds) vs. entries appended");
275 plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
276 plb.add_u64_counter_histogram(
277 l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
278 op_hist_x_axis_config, op_hist_y_axis_count_config,
279 "Histogram of log retire transaction time (nanoseconds) vs. entries retired");
280
281 m_perfcounter = plb.create_perf_counters();
282 m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
283 }
284
285 template <typename I>
286 void AbstractWriteLog<I>::perf_stop() {
287 ceph_assert(m_perfcounter);
288 m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
289 delete m_perfcounter;
290 }
291
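/**
 * Dumps the perf counters and histograms as pretty-printed JSON to the debug
 * log, wrapped in "Begin/End perf dump" markers.
 */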
292 template <typename I>
293 void AbstractWriteLog<I>::log_perf() {
294 bufferlist bl;
295 Formatter *f = Formatter::create("json-pretty");
296 bl.append("Perf dump follows\n--- Begin perf dump ---\n");
297 bl.append("{\n");
298 stringstream ss;
299 utime_t now = ceph_clock_now();
300 ss << "\"test_time\": \"" << now << "\",";
301 ss << "\"image\": \"" << m_image_ctx.name << "\",";
302 bl.append(ss);
303 bl.append("\"stats\": ");
304 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0);
305 f->flush(bl);
306 bl.append(",\n\"histograms\": ");
307 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
308 f->flush(bl);
309 delete f;
310 bl.append("}\n--- End perf dump ---\n");
311 bl.append('\0');
312 ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
313 }
314
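/**
 * Logs a one-line summary of cache occupancy (entries, dirty bytes, free
 * space, sync generations) and persists the updated image cache state.
 */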
315 template <typename I>
316 void AbstractWriteLog<I>::periodic_stats() {
317 std::unique_lock locker(m_lock);
318 ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size()
319 << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
320 << ", m_free_log_entries=" << m_free_log_entries
321 << ", m_bytes_allocated=" << m_bytes_allocated
322 << ", m_bytes_cached=" << m_bytes_cached
323 << ", m_bytes_dirty=" << m_bytes_dirty
324 << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
325 << ", m_first_valid_entry=" << m_first_valid_entry
326 << ", m_first_free_entry=" << m_first_free_entry
327 << ", m_current_sync_gen=" << m_current_sync_gen
328 << ", m_flushed_sync_gen=" << m_flushed_sync_gen
329 << dendl;
330
331 update_image_cache_state();
332 write_image_cache_state(locker);
333 }
334
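/* Schedules the next periodic_stats() run on the image timer; the timer
 * callback re-arms itself, so stats keep being logged every
 * LOG_STATS_INTERVAL_SECONDS until the event is cancelled in the destructor. */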
335 template <typename I>
336 void AbstractWriteLog<I>::arm_periodic_stats() {
337 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
338 m_timer_ctx = new LambdaContext([this](int r) {
339 /* m_timer_lock is held */
340 periodic_stats();
341 arm_periodic_stats();
342 });
343 m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
344 }
345
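/**
 * Rebuilds one in-memory log entry from a cache entry found during pool load.
 * Sync point, write, writesame and discard entries are materialized; writes
 * whose sync point has not been seen yet are recorded in missing_sync_points
 * so update_sync_points() can recreate them.
 */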
346 template <typename I>
347 void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
348 WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
349 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
350 uint64_t entry_index) {
351 bool writer = cache_entry->is_writer();
352 if (cache_entry->is_sync_point()) {
353 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
354 << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
355 auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
356 *log_entry = sync_point_entry;
357 sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
358 missing_sync_points.erase(cache_entry->sync_gen_number);
359 m_current_sync_gen = cache_entry->sync_gen_number;
360 } else if (cache_entry->is_write()) {
361 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
362 << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
363 auto write_entry =
364 m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
365 write_data_to_buffer(write_entry, cache_entry);
366 *log_entry = write_entry;
367 } else if (cache_entry->is_writesame()) {
368 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
369 << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
370 auto ws_entry =
371 m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
372 cache_entry->write_bytes, cache_entry->ws_datalen);
373 write_data_to_buffer(ws_entry, cache_entry);
374 *log_entry = ws_entry;
375 } else if (cache_entry->is_discard()) {
376 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
377 << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
378 auto discard_entry =
379 std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
380 m_discard_granularity_bytes);
381 *log_entry = discard_entry;
382 } else {
383 lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
384 << ", cache_entry=[" << *cache_entry << "]" << dendl;
385 }
386
387 if (writer) {
388 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
389 << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
390 if (!sync_point_entries[cache_entry->sync_gen_number]) {
391 missing_sync_points[cache_entry->sync_gen_number] = true;
392 }
393 }
394 }
395
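/**
 * Second pass of log recovery: recreates any sync points that were retired
 * from the log, links each recovered write entry to its sync point, rebuilds
 * the block-to-log-entry map, and classifies entries as dirty or already
 * flushed based on the flushed sync generation.
 */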
396 template <typename I>
397 void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
398 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
399 DeferredContexts &later) {
400 /* Create missing sync points. These must not be appended until the
401 * entry reload is complete and the write map is up to
402 * date. Currently this is handled by the deferred contexts object
403 * passed to new_sync_point(). These contexts won't be completed
404 * until this function returns. */
405 for (auto &kv : missing_sync_points) {
406 ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
407 if (0 == m_current_sync_gen) {
408 /* The unlikely case where the log contains writing entries, but no sync
409 * points (e.g. because they were all retired) */
410 m_current_sync_gen = kv.first-1;
411 }
412 ceph_assert(kv.first == m_current_sync_gen+1);
413 init_flush_new_sync_point(later);
414 ceph_assert(kv.first == m_current_sync_gen);
415 sync_point_entries[kv.first] = m_current_sync_point->log_entry;
416 }
417
418 /*
419 * Iterate over the log entries again (this time via the global
420 * entries list), connecting write entries to their sync points and
421 * updating the sync point stats.
422 *
423 * Add writes to the write log map.
424 */
425 std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
426 for (auto &log_entry : m_log_entries) {
427 if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
428 /* This entry is one of the types that write */
429 auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
430 if (gen_write_entry) {
431 auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
432 if (!sync_point_entry) {
433 lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
434 ceph_assert(false);
435 } else {
436 gen_write_entry->sync_point_entry = sync_point_entry;
437 sync_point_entry->writes++;
438 sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
439 sync_point_entry->writes_completed++;
440 m_blocks_to_log_entries.add_log_entry(gen_write_entry);
441 /* This entry is only dirty if its sync gen number is > the flushed
442 * sync gen number from the root object. */
443 if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
444 m_dirty_log_entries.push_back(log_entry);
445 m_bytes_dirty += gen_write_entry->bytes_dirty();
446 } else {
447 gen_write_entry->set_flushed(true);
448 sync_point_entry->writes_flushed++;
449 }
450
451 /* calc m_bytes_allocated & m_bytes_cached */
452 inc_allocated_cached_bytes(log_entry);
453 }
454 }
455 } else {
456 /* This entry is a sync point entry */
457 auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
458 if (sync_point_entry) {
459 if (previous_sync_point_entry) {
460 previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
461 if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
462 sync_point_entry->prior_sync_point_flushed = false;
463 ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
464 (0 == previous_sync_point_entry->writes) ||
465 (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
466 } else {
467 sync_point_entry->prior_sync_point_flushed = true;
468 ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
469 ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
470 }
471 } else {
472 /* There are no previous sync points, so we'll consider them flushed */
473 sync_point_entry->prior_sync_point_flushed = true;
474 }
475 previous_sync_point_entry = sync_point_entry;
476 ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << dendl;
477 }
478 }
479 }
480 if (0 == m_current_sync_gen) {
481 /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
482 * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
483 * point recorded in the log. */
484 m_current_sync_gen = m_flushed_sync_gen;
485 }
486 }
487
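/**
 * Common initialization path: derives the pool file path and size from the
 * image config (for a brand new cache), validates the pool file against the
 * recorded cache state, loads or creates the pool via initialize_pool(), then
 * starts the first sync point, the worker thread pool and the stats timer.
 */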
488 template <typename I>
489 void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
490 CephContext *cct = m_image_ctx.cct;
491 ldout(cct, 20) << dendl;
492 ceph_assert(m_cache_state);
493 std::lock_guard locker(m_lock);
494 ceph_assert(!m_initialized);
495 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
496
497 if (!m_cache_state->present) {
498 m_cache_state->host = ceph_get_short_hostname();
499 m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
500 "rbd_persistent_cache_size");
501
502 string path = m_image_ctx.config.template get_val<string>(
503 "rbd_persistent_cache_path");
504 std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
505 m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
506 }
507
508 ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
509 ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
510
511 m_log_pool_name = m_cache_state->path;
512 m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
513 m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
514 ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
515 << " (adjusted from " << m_cache_state->size << ")" << dendl;
516
517 if ((!m_cache_state->present) &&
518 (access(m_log_pool_name.c_str(), F_OK) == 0)) {
519 ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
520 << ", While there's no cache in the image metatata." << dendl;
521 if (remove(m_log_pool_name.c_str()) != 0) {
522 lderr(cct) << "failed to remove the pool file " << m_log_pool_name
523 << dendl;
524 on_finish->complete(-errno);
525 return;
526 } else {
527 ldout(cct, 5) << "Removed the existing pool file." << dendl;
528 }
529 } else if ((m_cache_state->present) &&
530 (access(m_log_pool_name.c_str(), F_OK) != 0)) {
531 lderr(cct) << "can't find the existed pool file: " << m_log_pool_name
532 << ". error: " << cpp_strerror(-errno) << dendl;
533 on_finish->complete(-errno);
534 return;
535 }
536
537 bool succeeded = initialize_pool(on_finish, later);
538 if (!succeeded) {
539 return;
540 }
541
542 ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
543 << " log entries, " << m_free_log_entries << " of which are free."
544 << " first_valid=" << m_first_valid_entry
545 << ", first_free=" << m_first_free_entry
546 << ", flushed_sync_gen=" << m_flushed_sync_gen
547 << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
548 if (m_first_free_entry == m_first_valid_entry) {
549 ldout(cct,1) << "write log is empty" << dendl;
550 m_cache_state->empty = true;
551 }
552
553 /* Start the sync point following the last one seen in the
554 * log. Flush the last sync point created during the loading of the
555 * existing log entries. */
556 init_flush_new_sync_point(later);
557 ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
558
559 m_initialized = true;
560 // Start the thread
561 m_thread_pool.start();
562
563 /* Do these after we drop lock */
564 later.add(new LambdaContext([this](int r) {
565 /* Log stats for the first time */
566 periodic_stats();
567 /* Arm periodic stats logging for the first time */
568 std::lock_guard timer_locker(*m_timer_lock);
569 arm_periodic_stats();
570 }));
571 m_image_ctx.op_work_queue->queue(on_finish, 0);
572 }
573
574 template <typename I>
575 void AbstractWriteLog<I>::write_image_cache_state(std::unique_lock<ceph::mutex>& locker) {
576 using klass = AbstractWriteLog<I>;
577 Context *ctx = util::create_context_callback<
578 klass, &klass::handle_write_image_cache_state>(this);
579 m_cache_state->write_image_cache_state(locker, ctx);
580 }
581
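/* Copies the current byte counters and read hit/miss statistics into
 * m_cache_state; the caller must hold m_lock and follow up with
 * write_image_cache_state() to persist the values. */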
582 template <typename I>
583 void AbstractWriteLog<I>::update_image_cache_state() {
584 ldout(m_image_ctx.cct, 10) << dendl;
585
586 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
587 m_cache_state->allocated_bytes = m_bytes_allocated;
588 m_cache_state->cached_bytes = m_bytes_cached;
589 m_cache_state->dirty_bytes = m_bytes_dirty;
590 m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated;
591 m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req);
592 m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req);
593 m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) -
594 m_cache_state->hits_full - m_cache_state->hits_partial;
595 m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes);
596 m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) -
597 m_cache_state->hit_bytes;
598 }
599
600 template <typename I>
601 void AbstractWriteLog<I>::handle_write_image_cache_state(int r) {
602 CephContext *cct = m_image_ctx.cct;
603 ldout(cct, 10) << "r=" << r << dendl;
604
605 if (r < 0) {
606 lderr(cct) << "failed to update image cache state: " << cpp_strerror(r)
607 << dendl;
608 return;
609 }
610 }
611
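/**
 * Public entry point: starts the perf counters, runs pwl_init(), and on
 * success persists the initial image cache state before completing on_finish.
 */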
612 template <typename I>
613 void AbstractWriteLog<I>::init(Context *on_finish) {
614 CephContext *cct = m_image_ctx.cct;
615 ldout(cct, 20) << dendl;
616 auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
617 std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
618 std::string("-") + m_image_ctx.name;
619 perf_start(pname);
620
621 ceph_assert(!m_initialized);
622
623 Context *ctx = new LambdaContext(
624 [this, on_finish](int r) {
625 if (r >= 0) {
626 std::unique_lock locker(m_lock);
627 update_image_cache_state();
628 m_cache_state->write_image_cache_state(locker, on_finish);
629 } else {
630 on_finish->complete(r);
631 }
632 });
633
634 DeferredContexts later;
635 pwl_init(ctx, later);
636 }
637
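/**
 * Orderly shutdown, built as a chain of contexts that runs in reverse order
 * of construction: flush all in-flight writes, write back dirty entries, wait
 * for outstanding async ops, then clear the log, remove the pool file,
 * persist the clean cache state and stop the perf counters.
 */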
638 template <typename I>
639 void AbstractWriteLog<I>::shut_down(Context *on_finish) {
640 CephContext *cct = m_image_ctx.cct;
641 ldout(cct, 20) << dendl;
642
643 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
644
645 Context *ctx = new LambdaContext(
646 [this, on_finish](int r) {
647 if (m_perfcounter) {
648 perf_stop();
649 }
650 ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
651 m_image_ctx.op_work_queue->queue(on_finish, r);
652 });
653 ctx = new LambdaContext(
654 [this, ctx](int r) {
655 ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
656 Context *next_ctx = override_ctx(r, ctx);
657 periodic_stats();
658
659 std::unique_lock locker(m_lock);
660 check_image_cache_state_clean();
661 m_wake_up_enabled = false;
662 m_log_entries.clear();
663 m_cache_state->clean = true;
664 m_cache_state->empty = true;
665 remove_pool_file();
666 update_image_cache_state();
667 m_cache_state->write_image_cache_state(locker, next_ctx);
668 });
669 ctx = new LambdaContext(
670 [this, ctx](int r) {
671 Context *next_ctx = override_ctx(r, ctx);
672 ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
673 // Wait for in progress IOs to complete
674 next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
675 m_async_op_tracker.wait_for_ops(next_ctx);
676 });
677 ctx = new LambdaContext(
678 [this, ctx](int r) {
679 Context *next_ctx = override_ctx(r, ctx);
680 {
681 /* Sync with process_writeback_dirty_entries() */
682 RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
683 m_shutting_down = true;
684 /* Flush all writes to OSDs (unless disabled) and wait for all
685 in-progress flush writes to complete */
686 ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
687 periodic_stats();
688 }
689 flush_dirty_entries(next_ctx);
690 });
691 ctx = new LambdaContext(
692 [this, ctx](int r) {
693 ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
694 m_work_queue.queue(ctx, r);
695 });
696 /* Complete all in-flight writes before shutting down */
697 ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
698 internal_flush(false, ctx);
699 }
700
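/**
 * Splits each image extent into cache hits and misses using the write log
 * map. Hits are satisfied from log entry buffers (discards read as zeros);
 * the remaining miss extents are read from the layer below and spliced back
 * together in order before on_finish is completed.
 */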
701 template <typename I>
702 void AbstractWriteLog<I>::read(Extents&& image_extents,
703 ceph::bufferlist* bl,
704 int fadvise_flags, Context *on_finish) {
705 CephContext *cct = m_image_ctx.cct;
706 utime_t now = ceph_clock_now();
707
708 on_finish = new LambdaContext(
709 [this, on_finish](int r) {
710 m_async_op_tracker.finish_op();
711 on_finish->complete(r);
712 });
713 C_ReadRequest *read_ctx = m_builder->create_read_request(
714 cct, now, m_perfcounter, bl, on_finish);
715 ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
716 << "image_extents=" << image_extents
717 << ", bl=" << bl
718 << ", on_finish=" << on_finish << dendl;
719
720 ceph_assert(m_initialized);
721 bl->clear();
722 m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
723
724 std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
725 std::vector<bufferlist*> bls_to_read;
726
727 m_async_op_tracker.start_op();
728 Context *ctx = new LambdaContext(
729 [this, read_ctx, fadvise_flags](int r) {
730 if (read_ctx->miss_extents.empty()) {
731 /* All of this read comes from RWL */
732 read_ctx->complete(0);
733 } else {
734 /* Pass the read misses on to the layer below RWL */
735 m_image_writeback.aio_read(
736 std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
737 fadvise_flags, read_ctx);
738 }
739 });
740
741 /*
742 * The strategy here is to look up all the WriteLogMapEntries that overlap
743 * this read, and iterate through those to separate this read into hits and
744 * misses. A new Extents object is produced here with Extents for each miss
745 * region. The miss Extents is then passed on to the read cache below RWL. We
746 * also produce an ImageExtentBufs for all the extents (hit or miss) in this
747 * read. When the read from the lower cache layer completes, we iterate
748 * through the ImageExtentBufs and insert buffers for each cache hit at the
749 * appropriate spot in the bufferlist returned from below for the miss
750 * read. The buffers we insert here refer directly to regions of various
751 * write log entry data buffers.
752 *
753 * Locking: These buffer objects hold a reference on the write log entries
754 * they refer to. Log entries can't be retired until there are no references.
755 * The GenericWriteLogEntry references are released by the buffer destructor.
756 */
757 for (auto &extent : image_extents) {
758 uint64_t extent_offset = 0;
759 RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
760 WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
761 block_extent(extent));
762 for (auto &map_entry : map_entries) {
763 Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
764 /* If this map entry starts after the current image extent offset ... */
765 if (entry_image_extent.first > extent.first + extent_offset) {
766 /* ... add range before map_entry to miss extents */
767 uint64_t miss_extent_start = extent.first + extent_offset;
768 uint64_t miss_extent_length = entry_image_extent.first -
769 miss_extent_start;
770 Extent miss_extent(miss_extent_start, miss_extent_length);
771 read_ctx->miss_extents.push_back(miss_extent);
772 /* Add miss range to read extents */
773 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
774 read_ctx->read_extents.push_back(miss_extent_buf);
775 extent_offset += miss_extent_length;
776 }
777 ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
778 uint64_t entry_offset = 0;
779 /* If this map entry starts before the current image extent offset ... */
780 if (entry_image_extent.first < extent.first + extent_offset) {
781 /* ... compute offset into log entry for this read extent */
782 entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
783 }
784 /* This read hit ends at the end of the extent or the end of the log
785 entry, whichever is less. */
786 uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
787 extent.second - extent_offset);
788 Extent hit_extent(entry_image_extent.first, entry_hit_length);
789 if (0 == map_entry.log_entry->write_bytes() &&
790 0 < map_entry.log_entry->bytes_dirty()) {
791 /* discard log entry */
792 ldout(cct, 20) << "discard log entry" << dendl;
793 auto discard_entry = map_entry.log_entry;
794 ldout(cct, 20) << "read hit on discard entry: log_entry="
795 << *discard_entry
796 << dendl;
797 /* Discards read as zero, so we'll construct a bufferlist of zeros */
798 bufferlist zero_bl;
799 zero_bl.append_zero(entry_hit_length);
800 /* Add hit extent to read extents */
801 auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
802 hit_extent, zero_bl);
803 read_ctx->read_extents.push_back(hit_extent_buf);
804 } else {
805 ldout(cct, 20) << "write or writesame log entry" << dendl;
806 /* write and writesame log entry */
807 /* Offset of the map entry into the log entry's buffer */
808 uint64_t map_entry_buffer_offset = entry_image_extent.first -
809 map_entry.log_entry->ram_entry.image_offset_bytes;
810 /* Offset into the log entry buffer of this read hit */
811 uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
812 /* Create buffer object referring to pmem pool for this read hit */
813 collect_read_extents(
814 read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
815 entry_hit_length, hit_extent, read_ctx);
816 }
817 /* Exclude RWL hit range from buffer and extent */
818 extent_offset += entry_hit_length;
819 ldout(cct, 20) << map_entry << dendl;
820 }
821 /* If the last map entry didn't consume the entire image extent ... */
822 if (extent.second > extent_offset) {
823 /* ... add the rest of this extent to miss extents */
824 uint64_t miss_extent_start = extent.first + extent_offset;
825 uint64_t miss_extent_length = extent.second - extent_offset;
826 Extent miss_extent(miss_extent_start, miss_extent_length);
827 read_ctx->miss_extents.push_back(miss_extent);
828 /* Add miss range to read extents */
829 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
830 read_ctx->read_extents.push_back(miss_extent_buf);
831 extent_offset += miss_extent_length;
832 }
833 }
834
835 ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
836 << ", miss_bl=" << read_ctx->miss_bl << dendl;
837
838 complete_read(log_entries_to_read, bls_to_read, ctx);
839 }
840
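/**
 * External write entry point: splits extents larger than get_max_extent()
 * (when a limit is set), builds a C_WriteRequest, and dispatches it once the
 * block guard for the affected range has been acquired.
 */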
841 template <typename I>
842 void AbstractWriteLog<I>::write(Extents &&image_extents,
843 bufferlist&& bl,
844 int fadvise_flags,
845 Context *on_finish) {
846 CephContext *cct = m_image_ctx.cct;
847
848 ldout(cct, 20) << "aio_write" << dendl;
849
850 utime_t now = ceph_clock_now();
851 m_perfcounter->inc(l_librbd_pwl_wr_req, 1);
852
853 ceph_assert(m_initialized);
854
855 /* Split image extents larger than 1M. This isn't strictly necessary but
856 * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
857 * We plan to manage pmem space and allocation by ourselves in the future.
858 */
859 Extents split_image_extents;
860 uint64_t max_extent_size = get_max_extent();
861 if (max_extent_size != 0) {
862 for (auto extent : image_extents) {
863 if (extent.second > max_extent_size) {
864 uint64_t off = extent.first;
865 uint64_t extent_bytes = extent.second;
866 for (int i = 0; extent_bytes != 0; ++i) {
867 Extent _ext;
868 _ext.first = off + i * max_extent_size;
869 _ext.second = std::min(max_extent_size, extent_bytes);
870 extent_bytes = extent_bytes - _ext.second ;
871 split_image_extents.emplace_back(_ext);
872 }
873 } else {
874 split_image_extents.emplace_back(extent);
875 }
876 }
877 } else {
878 split_image_extents = image_extents;
879 }
880
881 C_WriteRequestT *write_req =
882 m_builder->create_write_request(*this, now, std::move(split_image_extents),
883 std::move(bl), fadvise_flags, m_lock,
884 m_perfcounter, on_finish);
885 m_perfcounter->inc(l_librbd_pwl_wr_bytes,
886 write_req->image_extents_summary.total_bytes);
887
888 /* The lambda below will be called when the block guard for all
889 * blocks affected by this write is obtained */
890 GuardedRequestFunctionContext *guarded_ctx =
891 new GuardedRequestFunctionContext([this,
892 write_req](GuardedRequestFunctionContext &guard_ctx) {
893 write_req->blockguard_acquired(guard_ctx);
894 alloc_and_dispatch_io_req(write_req);
895 });
896
897 detain_guarded_request(write_req, guarded_ctx, false);
898 }
899
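/* Builds a C_DiscardRequest covering [offset, offset + length) with the given
 * discard granularity and dispatches it under the block guard, mirroring the
 * write path. */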
900 template <typename I>
901 void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
902 uint32_t discard_granularity_bytes,
903 Context *on_finish) {
904 CephContext *cct = m_image_ctx.cct;
905
906 ldout(cct, 20) << dendl;
907
908 utime_t now = ceph_clock_now();
909 m_perfcounter->inc(l_librbd_pwl_discard, 1);
910 Extents discard_extents = {{offset, length}};
911 m_discard_granularity_bytes = discard_granularity_bytes;
912
913 ceph_assert(m_initialized);
914
915 auto *discard_req =
916 new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
917 m_lock, m_perfcounter, on_finish);
918
919 /* The lambda below will be called when the block guard for all
920 * blocks affected by this write is obtained */
921 GuardedRequestFunctionContext *guarded_ctx =
922 new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
923 discard_req->blockguard_acquired(guard_ctx);
924 alloc_and_dispatch_io_req(discard_req);
925 });
926
927 detain_guarded_request(discard_req, guarded_ctx, false);
928 }
929
930 /**
931 * Aio_flush completes when all previously completed writes are
932 * flushed to persistent cache. We make a best-effort attempt to also
933 * defer until all in-progress writes complete, but we may not know
934 * about all of the writes the application considers in-progress yet,
935 * due to uncertainty in the IO submission workq (multiple WQ threads
936 * may allow out-of-order submission).
937 *
938 * This flush operation will not wait for writes deferred for overlap
939 * in the block guard.
940 */
941 template <typename I>
942 void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
943 CephContext *cct = m_image_ctx.cct;
944 ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
945
946 if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
947 io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
948 internal_flush(false, on_finish);
949 return;
950 }
951 m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);
952
953 /* May be called even if initialization fails */
954 if (!m_initialized) {
955 ldout(cct, 05) << "never initialized" << dendl;
956 /* Deadlock if completed here */
957 m_image_ctx.op_work_queue->queue(on_finish, 0);
958 return;
959 }
960
961 {
962 std::shared_lock image_locker(m_image_ctx.image_lock);
963 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
964 on_finish->complete(-EROFS);
965 return;
966 }
967 }
968
969 auto flush_req = make_flush_req(on_finish);
970
971 GuardedRequestFunctionContext *guarded_ctx =
972 new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
973 ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
974 ceph_assert(guard_ctx.cell);
975 flush_req->detained = guard_ctx.state.detained;
976 /* We don't call flush_req->set_cell(), because the block guard will be released here */
977 {
978 DeferredContexts post_unlock; /* Do these when the lock below is released */
979 std::lock_guard locker(m_lock);
980
981 if (!m_persist_on_flush && m_persist_on_write_until_flush) {
982 m_persist_on_flush = true;
983 ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
984 }
985
986 /*
987 * Create a new sync point if there have been writes since the last
988 * one.
989 *
990 * We do not flush the caches below the RWL here.
991 */
992 flush_new_sync_point_if_needed(flush_req, post_unlock);
993 }
994
995 release_guarded_request(guard_ctx.cell);
996 });
997
998 detain_guarded_request(flush_req, guarded_ctx, true);
999 }
1000
1001 template <typename I>
1002 void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
1003 bufferlist&& bl, int fadvise_flags,
1004 Context *on_finish) {
1005 CephContext *cct = m_image_ctx.cct;
1006
1007 ldout(cct, 20) << "aio_writesame" << dendl;
1008
1009 utime_t now = ceph_clock_now();
1010 Extents ws_extents = {{offset, length}};
1011 m_perfcounter->inc(l_librbd_pwl_ws, 1);
1012 ceph_assert(m_initialized);
1013
1014 /* A write same request is also a write request. The key difference is the
1015 * write same data buffer is shorter than the extent of the request. The full
1016 * extent will be used in the block guard, and appear in
1017 * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
1018 * as long as the length of the bl here, which is the pattern that's repeated
1019 * in the image for the entire length of this WS. Read hits and flushing of
1020 * write sames are different than normal writes. */
1021 C_WriteSameRequestT *ws_req =
1022 m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
1023 fadvise_flags, m_lock, m_perfcounter, on_finish);
1024 m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);
1025
1026 /* The lambda below will be called when the block guard for all
1027 * blocks affected by this write is obtained */
1028 GuardedRequestFunctionContext *guarded_ctx =
1029 new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
1030 ws_req->blockguard_acquired(guard_ctx);
1031 alloc_and_dispatch_io_req(ws_req);
1032 });
1033
1034 detain_guarded_request(ws_req, guarded_ctx, false);
1035 }
1036
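/**
 * Compare-and-write: the compare phase reads the extent back through the
 * cache, then either continues as a normal write (contents match) or
 * completes with -EILSEQ and the offset of the first mismatching byte.
 */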
1037 template <typename I>
1038 void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
1039 bufferlist&& cmp_bl,
1040 bufferlist&& bl,
1041 uint64_t *mismatch_offset,
1042 int fadvise_flags,
1043 Context *on_finish) {
1044 ldout(m_image_ctx.cct, 20) << dendl;
1045
1046 utime_t now = ceph_clock_now();
1047 m_perfcounter->inc(l_librbd_pwl_cmp, 1);
1048 ceph_assert(m_initialized);
1049
1050 /* A compare and write request is also a write request. We only allocate
1051 * resources and dispatch this write request if the compare phase
1052 * succeeds. */
1053 C_WriteRequestT *cw_req =
1054 m_builder->create_comp_and_write_request(
1055 *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
1056 mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
1057 m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);
1058
1059 /* The lambda below will be called when the block guard for all
1060 * blocks affected by this write is obtained */
1061 GuardedRequestFunctionContext *guarded_ctx =
1062 new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
1063 cw_req->blockguard_acquired(guard_ctx);
1064
1065 auto read_complete_ctx = new LambdaContext(
1066 [this, cw_req](int r) {
1067 ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
1068 << "cw_req=" << cw_req << dendl;
1069
1070 /* Compare read_bl to cmp_bl to determine if this will produce a write */
1071 buffer::list aligned_read_bl;
1072 if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) {
1073 aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length());
1074 }
1075 if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) ||
1076 cw_req->cmp_bl.contents_equal(aligned_read_bl)) {
1077 /* Compare phase succeeds. Begin write */
1078 ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
1079 cw_req->compare_succeeded = true;
1080 *cw_req->mismatch_offset = 0;
1081 /* Continue with this request as a write. Blockguard release and
1082 * user request completion handled as if this were a plain
1083 * write. */
1084 alloc_and_dispatch_io_req(cw_req);
1085 } else {
1086 /* Compare phase fails. Compare-and-write ends now. */
1087 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
1088 /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
1089 uint64_t bl_index = 0;
1090 for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) {
1091 if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
1092 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
1093 break;
1094 }
1095 }
1096 cw_req->compare_succeeded = false;
1097 *cw_req->mismatch_offset = bl_index;
1098 cw_req->complete_user_request(-EILSEQ);
1099 cw_req->release_cell();
1100 cw_req->complete(0);
1101 }
1102 });
1103
1104 /* Read phase of comp-and-write must read through RWL */
1105 Extents image_extents_copy = cw_req->image_extents;
1106 read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
1107 });
1108
1109 detain_guarded_request(cw_req, guarded_ctx, false);
1110 }
1111
1112 template <typename I>
1113 void AbstractWriteLog<I>::flush(Context *on_finish) {
1114 internal_flush(false, on_finish);
1115 }
1116
1117 template <typename I>
1118 void AbstractWriteLog<I>::invalidate(Context *on_finish) {
1119 internal_flush(true, on_finish);
1120 }
1121
1122 template <typename I>
1123 CephContext *AbstractWriteLog<I>::get_context() {
1124 return m_image_ctx.cct;
1125 }
1126
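/* Attempts to take the block guard for req's extent. Returns the acquired
 * cell, or nullptr if the request was detained behind overlapping in-flight
 * requests; m_blockguard_lock must already be held. */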
1127 template <typename I>
1128 BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
1129 {
1130 CephContext *cct = m_image_ctx.cct;
1131 BlockGuardCell *cell;
1132
1133 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1134 ldout(cct, 20) << dendl;
1135
1136 int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
1137 ceph_assert(r>=0);
1138 if (r > 0) {
1139 ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
1140 << "req=" << req << dendl;
1141 return nullptr;
1142 }
1143
1144 ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
1145 return cell;
1146 }
1147
1148 template <typename I>
1149 BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
1150 GuardedRequest &req)
1151 {
1152 BlockGuardCell *cell = nullptr;
1153
1154 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1155 ldout(m_image_ctx.cct, 20) << dendl;
1156
1157 if (m_barrier_in_progress) {
1158 req.guard_ctx->state.queued = true;
1159 m_awaiting_barrier.push_back(req);
1160 } else {
1161 bool barrier = req.guard_ctx->state.barrier;
1162 if (barrier) {
1163 m_barrier_in_progress = true;
1164 req.guard_ctx->state.current_barrier = true;
1165 }
1166 cell = detain_guarded_request_helper(req);
1167 if (barrier) {
1168 /* Only non-null if the barrier acquires the guard now */
1169 m_barrier_cell = cell;
1170 }
1171 }
1172
1173 return cell;
1174 }
1175
1176 template <typename I>
1177 void AbstractWriteLog<I>::detain_guarded_request(
1178 C_BlockIORequestT *request,
1179 GuardedRequestFunctionContext *guarded_ctx,
1180 bool is_barrier)
1181 {
1182 BlockExtent extent;
1183 if (request) {
1184 extent = request->image_extents_summary.block_extent();
1185 } else {
1186 extent = block_extent(whole_volume_extent());
1187 }
1188 auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
1189 BlockGuardCell *cell = nullptr;
1190
1191 ldout(m_image_ctx.cct, 20) << dendl;
1192 {
1193 std::lock_guard locker(m_blockguard_lock);
1194 cell = detain_guarded_request_barrier_helper(req);
1195 }
1196 if (cell) {
1197 req.guard_ctx->cell = cell;
1198 req.guard_ctx->complete(0);
1199 }
1200 }
1201
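/**
 * Releases a block guard cell and re-dispatches any requests that were
 * detained behind it. If the released cell was the current barrier, the
 * barrier is cleared and queued requests are admitted until the next barrier
 * is encountered.
 */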
1202 template <typename I>
1203 void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
1204 {
1205 CephContext *cct = m_image_ctx.cct;
1206 WriteLogGuard::BlockOperations block_reqs;
1207 ldout(cct, 20) << "released_cell=" << released_cell << dendl;
1208
1209 {
1210 std::lock_guard locker(m_blockguard_lock);
1211 m_write_log_guard.release(released_cell, &block_reqs);
1212
1213 for (auto &req : block_reqs) {
1214 req.guard_ctx->state.detained = true;
1215 BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
1216 if (detained_cell) {
1217 if (req.guard_ctx->state.current_barrier) {
1218 /* The current barrier is acquiring the block guard, so now we know its cell */
1219 m_barrier_cell = detained_cell;
1220 /* detained_cell could be == released_cell here */
1221 ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
1222 }
1223 req.guard_ctx->cell = detained_cell;
1224 m_work_queue.queue(req.guard_ctx);
1225 }
1226 }
1227
1228 if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
1229 ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
1230 /* The released cell is the current barrier request */
1231 m_barrier_in_progress = false;
1232 m_barrier_cell = nullptr;
1233 /* Move waiting requests into the blockguard. Stop if there's another barrier */
1234 while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
1235 auto &req = m_awaiting_barrier.front();
1236 ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
1237 BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
1238 if (detained_cell) {
1239 req.guard_ctx->cell = detained_cell;
1240 m_work_queue.queue(req.guard_ctx);
1241 }
1242 m_awaiting_barrier.pop_front();
1243 }
1244 }
1245 }
1246
1247 ldout(cct, 20) << "exit" << dendl;
1248 }
1249
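/* Pulls up to MAX_ALLOC_PER_TRANSACTION ops (RWL) or MAX_WRITES_PER_SYNC_POINT
 * ops (otherwise) off m_ops_to_append for this thread to append, ensuring only
 * one thread appends at a time; ops_remain and appending tell the caller
 * whether to keep iterating. */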
1250 template <typename I>
1251 void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
1252 bool &appending, bool isRWL)
1253 {
1254 const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
1255 : MAX_WRITES_PER_SYNC_POINT;
1256 {
1257 std::lock_guard locker(m_lock);
1258 if (!appending && m_appending) {
1259 /* Another thread is appending */
1260 ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
1261 return;
1262 }
1263 if (m_ops_to_append.size()) {
1264 appending = true;
1265 m_appending = true;
1266 auto last_in_batch = m_ops_to_append.begin();
1267 unsigned int ops_to_append = m_ops_to_append.size();
1268 if (ops_to_append > OPS_APPENDED) {
1269 ops_to_append = OPS_APPENDED;
1270 }
1271 std::advance(last_in_batch, ops_to_append);
1272 ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
1273 ops_remain = true; /* Always check again before leaving */
1274 ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
1275 << m_ops_to_append.size() << dendl;
1276 } else if (isRWL) {
1277 ops_remain = false;
1278 if (appending) {
1279 appending = false;
1280 m_appending = false;
1281 }
1282 }
1283 }
1284 }
1285
1286 template <typename I>
1287 void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
1288 {
1289 GenericLogOperations to_append(ops.begin(), ops.end());
1290
1291 schedule_append_ops(to_append, req);
1292 }
1293
1294 template <typename I>
1295 void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
1296 {
1297 GenericLogOperations to_append { op };
1298
1299 schedule_append_ops(to_append, req);
1300 }
1301
1302 /*
1303 * Complete a set of write ops with the result of append_op_entries.
1304 */
1305 template <typename I>
1306 void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
1307 const int result)
1308 {
1309 GenericLogEntries dirty_entries;
1310 int published_reserves = 0;
1311 bool need_update_state = false;
1312 ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
1313 for (auto &op : ops) {
1314 utime_t now = ceph_clock_now();
1315 auto log_entry = op->get_log_entry();
1316 log_entry->completed = true;
1317 if (op->is_writing_op()) {
1318 op->mark_log_entry_completed();
1319 dirty_entries.push_back(log_entry);
1320 }
1321 if (log_entry->is_write_entry()) {
1322 release_ram(log_entry);
1323 }
1324 if (op->reserved_allocated()) {
1325 published_reserves++;
1326 }
1327 {
1328 std::lock_guard locker(m_lock);
1329 m_unpublished_reserves -= published_reserves;
1330 m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
1331 if (m_cache_state->clean && !this->m_dirty_log_entries.empty()) {
1332 m_cache_state->clean = false;
1333 update_image_cache_state();
1334 need_update_state = true;
1335 }
1336 }
1337 op->complete(result);
1338 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
1339 op->log_append_start_time - op->dispatch_time);
1340 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
1341 m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
1342 utime_t(now - op->dispatch_time).to_nsec(),
1343 log_entry->ram_entry.write_bytes);
1344 utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
1345 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
1346 m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
1347 log_entry->ram_entry.write_bytes);
1348 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
1349 }
1350 if (need_update_state) {
1351 std::unique_lock locker(m_lock);
1352 write_image_cache_state(locker);
1353 }
1354 // New entries may be flushable
1355 {
1356 std::lock_guard locker(m_lock);
1357 wake_up();
1358 }
1359 }
1360
1361 /**
1362 * Dispatch as many deferred writes as possible
1363 */
1364 template <typename I>
1365 void AbstractWriteLog<I>::dispatch_deferred_writes(void)
1366 {
1367 C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
1368 C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
1369 bool allocated = false; /* front_req allocate succeeded */
1370 bool cleared_dispatching_flag = false;
1371
1372 /* If we can't become the dispatcher, we'll exit */
1373 {
1374 std::lock_guard locker(m_lock);
1375 if (m_dispatching_deferred_ops ||
1376 !m_deferred_ios.size()) {
1377 return;
1378 }
1379 m_dispatching_deferred_ops = true;
1380 }
1381
1382 /* There are ops to dispatch, and this should be the only thread dispatching them */
1383 {
1384 std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
1385 do {
1386 {
1387 std::lock_guard locker(m_lock);
1388 ceph_assert(m_dispatching_deferred_ops);
1389 if (allocated) {
1390 /* On the second and subsequent times we take the lock,
1391 * front_req->alloc_resources() succeeded on the previous iteration, so we
1392 * pop it off the deferred ops list here. */
1393 ceph_assert(front_req);
1394 ceph_assert(!allocated_req);
1395 m_deferred_ios.pop_front();
1396 allocated_req = front_req;
1397 front_req = nullptr;
1398 allocated = false;
1399 }
1400 ceph_assert(!allocated);
1401 if (!allocated && front_req) {
1402 /* front_req->alloc_resources() failed on the last iteration.
1403 * We'll stop dispatching. */
1404 wake_up();
1405 front_req = nullptr;
1406 ceph_assert(!cleared_dispatching_flag);
1407 m_dispatching_deferred_ops = false;
1408 cleared_dispatching_flag = true;
1409 } else {
1410 ceph_assert(!front_req);
1411 if (m_deferred_ios.size()) {
1412 /* New allocation candidate */
1413 front_req = m_deferred_ios.front();
1414 } else {
1415 ceph_assert(!cleared_dispatching_flag);
1416 m_dispatching_deferred_ops = false;
1417 cleared_dispatching_flag = true;
1418 }
1419 }
1420 }
1421 /* Try allocating for front_req before we decide what to do with allocated_req
1422 * (if any) */
1423 if (front_req) {
1424 ceph_assert(!cleared_dispatching_flag);
1425 allocated = front_req->alloc_resources();
1426 }
1427 if (allocated_req && front_req && allocated) {
1428 /* Push dispatch of the first allocated req to a wq */
1429 m_work_queue.queue(new LambdaContext(
1430 [allocated_req](int r) {
1431 allocated_req->dispatch();
1432 }), 0);
1433 allocated_req = nullptr;
1434 }
1435 ceph_assert(!(allocated_req && front_req && allocated));
1436
1437 /* Continue while we're still considering the front of the deferred ops list */
1438 } while (front_req);
1439 ceph_assert(!allocated);
1440 }
1441 ceph_assert(cleared_dispatching_flag);
1442
1443 /* If any deferred requests were allocated, the last one will still be in allocated_req */
1444 if (allocated_req) {
1445 allocated_req->dispatch();
1446 }
1447 }
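/* The loop above is a single-dispatcher hand-off: the first caller to set
 * m_dispatching_deferred_ops under m_lock drains the deferred queue while
 * other callers return immediately.  A simplified sketch of that
 * claim/drain/release pattern (hypothetical names; the real loop additionally
 * calls alloc_resources() outside m_lock and pushes dispatch onto the work
 * queue):
 *
 *   void maybe_dispatch() {
 *     {
 *       std::lock_guard l(lock);
 *       if (dispatching || queue.empty()) {
 *         return;               // another thread already owns the drain
 *       }
 *       dispatching = true;     // claim the dispatcher role
 *     }
 *     while (Request *req = try_pop_front_if_resources_available()) {
 *       req->dispatch();
 *     }
 *     // try_pop_front_if_resources_available() clears 'dispatching' under
 *     // the lock when the front request can't be allocated or the queue is
 *     // empty, so a later caller can claim the role again.
 *   }
 */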
1448
1449 /**
1450 * Releases the lanes used by this write, then attempts to dispatch the next
1451 * deferred write.
1452 */
1453 template <typename I>
1454 void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
1455 {
1456 {
1457 std::lock_guard locker(m_lock);
1458 m_free_lanes += req->image_extents.size();
1459 }
1460 dispatch_deferred_writes();
1461 }
1462
1463 /**
1464 * Attempts to allocate log resources for a write. The write is dispatched if
1465 * resources are available; otherwise it is queued behind earlier deferred writes.
1466 */
1467 template <typename I>
1468 void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
1469 {
1470 bool dispatch_here = false;
1471
1472 {
1473 /* If there are already deferred writes, queue behind them for resources */
1474 {
1475 std::lock_guard locker(m_lock);
1476 dispatch_here = m_deferred_ios.empty();
1477 // Only a flush req has total_bytes == UINT64_MAX; dispatch internal flushes immediately
1478 if (req->image_extents_summary.total_bytes ==
1479 std::numeric_limits<uint64_t>::max() &&
1480 static_cast<C_FlushRequestT *>(req)->internal == true) {
1481 dispatch_here = true;
1482 }
1483 }
1484 if (dispatch_here) {
1485 dispatch_here = req->alloc_resources();
1486 }
1487 if (dispatch_here) {
1488 ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
1489 req->dispatch();
1490 } else {
1491 req->deferred();
1492 {
1493 std::lock_guard locker(m_lock);
1494 m_deferred_ios.push_back(req);
1495 }
1496 ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
1497 dispatch_deferred_writes();
1498 }
1499 }
1500 }
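/* Decision flow above: a request is dispatched immediately only when nothing
 * is already deferred (or it is an internal flush, recognized by the
 * UINT64_MAX total_bytes marker) and alloc_resources() succeeds; otherwise it
 * is appended to m_deferred_ios and dispatch_deferred_writes() is poked.  A
 * write arriving while earlier writes are still queued for resources is
 * therefore always deferred, preserving dispatch order. */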
1501
1502 template <typename I>
1503 bool AbstractWriteLog<I>::check_allocation(
1504 C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
1505 uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
1506 uint32_t num_unpublished_reserves) {
1507 bool alloc_succeeds = true;
1508 bool no_space = false;
1509 {
1510 std::lock_guard locker(m_lock);
1511 if (m_free_lanes < num_lanes) {
1512 ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
1513 << num_lanes
1514 << ", have " << m_free_lanes << ") "
1515 << *req << dendl;
1516 alloc_succeeds = false;
1517 /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
1518 }
1519 if (m_free_log_entries < num_log_entries) {
1520 ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
1521 << num_log_entries
1522 << ", have " << m_free_log_entries << ") "
1523 << *req << dendl;
1524 alloc_succeeds = false;
1525 no_space = true; /* Entries must be retired */
1526 }
1527 /* Don't attempt buffer allocation if we've exceeded the "full" threshold */
1528 if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
1529 ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
1530 << m_bytes_allocated_cap
1531 << ", allocated=" << m_bytes_allocated
1532 << ") in write [" << *req << "]" << dendl;
1533 alloc_succeeds = false;
1534 no_space = true; /* Entries must be retired */
1535 }
1536 }
1537
1538 if (alloc_succeeds) {
1539 reserve_cache(req, alloc_succeeds, no_space);
1540 }
1541
1542 if (alloc_succeeds) {
1543 std::lock_guard locker(m_lock);
1544 /* We need one free log entry per extent (each is a separate entry), and
1545 * one free "lane" for remote replication. */
1546 if ((m_free_lanes >= num_lanes) &&
1547 (m_free_log_entries >= num_log_entries) &&
1548 (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
1549 m_free_lanes -= num_lanes;
1550 m_free_log_entries -= num_log_entries;
1551 m_unpublished_reserves += num_unpublished_reserves;
1552 m_bytes_allocated += bytes_allocated;
1553 m_bytes_cached += bytes_cached;
1554 m_bytes_dirty += bytes_dirtied;
1555 } else {
1556 alloc_succeeds = false;
1557 }
1558 }
1559
1560 if (!alloc_succeeds && no_space) {
1561 /* Expedite flushing and/or retiring */
1562 std::lock_guard locker(m_lock);
1563 m_alloc_failed_since_retire = true;
1564 m_last_alloc_fail = ceph_clock_now();
1565 }
1566
1567 return alloc_succeeds;
1568 }
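/* check_allocation() above follows an optimistic check/commit pattern: the
 * limits (lanes, free log entries, allocated-bytes cap) are first checked
 * under m_lock, the cache buffers are then reserved via reserve_cache(), and
 * the counters are re-checked and committed atomically under m_lock.  Only
 * the log-entry and byte-cap shortfalls count as "no space" and expedite
 * flushing/retiring; running out of lanes is pure throttling. */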
1569
1570 template <typename I>
1571 C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
1572 utime_t flush_begins = ceph_clock_now();
1573 bufferlist bl;
1574 auto *flush_req =
1575 new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
1576 std::move(bl), 0, m_lock, m_perfcounter, on_finish);
1577
1578 return flush_req;
1579 }
1580
1581 template <typename I>
1582 void AbstractWriteLog<I>::wake_up() {
1583 CephContext *cct = m_image_ctx.cct;
1584 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1585
1586 if (!m_wake_up_enabled) {
1587 // wake_up is disabled during shutdown after flushing completes
1588 ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
1589 return;
1590 }
1591
1592 if (m_wake_up_requested && m_wake_up_scheduled) {
1593 return;
1594 }
1595
1596 ldout(cct, 20) << dendl;
1597
1598 /* Wake-up can be requested while it's already scheduled */
1599 m_wake_up_requested = true;
1600
1601 /* Wake-up cannot be scheduled if it's already scheduled */
1602 if (m_wake_up_scheduled) {
1603 return;
1604 }
1605 m_wake_up_scheduled = true;
1606 m_async_process_work++;
1607 m_async_op_tracker.start_op();
1608 m_work_queue.queue(new LambdaContext(
1609 [this](int r) {
1610 process_work();
1611 m_async_op_tracker.finish_op();
1612 m_async_process_work--;
1613 }), 0);
1614 }
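/* wake_up() above coalesces wake-ups with two flags: m_wake_up_requested
 * notes that more work arrived, while m_wake_up_scheduled guarantees at most
 * one process_work() call is queued at a time; a request that lands while a
 * wake-up is already scheduled is simply absorbed by the pending one. */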
1615
1616 template <typename I>
1617 bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
1618 CephContext *cct = m_image_ctx.cct;
1619
1620 ldout(cct, 20) << "" << dendl;
1621 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1622
1623 if (m_invalidating) {
1624 return true;
1625 }
1626
1627 /* For OWB we can flush entries with the same sync gen number (writes between
1628 * aio_flush() calls) concurrently. Here we consider an entry flushable if
1629 * its sync gen number is <= the lowest sync gen number among the entries
1630 * currently flushing.
1631 *
1632 * If the entry considered here bears a sync gen number lower than a
1633 * previously flushed entry, the application had to have submitted the write
1634 * bearing the higher gen number before the write with the lower gen number
1635 * completed. So, flushing these concurrently is OK.
1636 *
1637 * If the entry considered here bears a sync gen number higher than a
1638 * currently flushing entry, the write with the lower gen number may have
1639 * completed to the application before the write with the higher sync gen
1640 * number was submitted, and the application may rely on that completion
1641 * order for volume consistency. In this case the entry will not be
1642 * considered flushable until all the entries bearing lower sync gen numbers
1643 * finish flushing.
1644 */
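/* Example: if entries with sync gen 4 are currently flushing
 * (m_lowest_flushing_sync_gen == 4), another gen 4 or lower entry may flush
 * concurrently, but a gen 5 entry waits until those flushes complete. */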
1645
1646 if (m_flush_ops_in_flight &&
1647 (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
1648 return false;
1649 }
1650
1651 return (log_entry->can_writeback() &&
1652 (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
1653 (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
1654 }
1655
1656 template <typename I>
1657 void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
1658 GuardedRequestFunctionContext *guarded_ctx) {
1659 ldout(m_image_ctx.cct, 20) << dendl;
1660
1661 BlockExtent extent;
1662 if (log_entry->is_sync_point()) {
1663 extent = block_extent(whole_volume_extent());
1664 } else {
1665 extent = log_entry->ram_entry.block_extent();
1666 }
1667
1668 auto req = GuardedRequest(extent, guarded_ctx, false);
1669 BlockGuardCell *cell = nullptr;
1670
1671 {
1672 std::lock_guard locker(m_flush_guard_lock);
1673 m_flush_guard.detain(req.block_extent, &req, &cell);
1674 }
1675 if (cell) {
1676 req.guard_ctx->cell = cell;
1677 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1678 }
1679 }
1680
1681 template <typename I>
1682 Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
1683 bool invalidating) {
1684 ldout(m_image_ctx.cct, 20) << "" << dendl;
1685
1686 /* Flush write completion action */
1687 utime_t writeback_start_time = ceph_clock_now();
1688 Context *ctx = new LambdaContext(
1689 [this, log_entry, writeback_start_time, invalidating](int r) {
1690 utime_t writeback_comp_time = ceph_clock_now();
1691 m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
1692 writeback_comp_time - writeback_start_time);
1693 {
1694 std::lock_guard locker(m_lock);
1695 if (r < 0) {
1696 lderr(m_image_ctx.cct) << "failed to flush log entry: "
1697 << cpp_strerror(r) << dendl;
1698 m_dirty_log_entries.push_front(log_entry);
1699 } else {
1700 ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
1701 log_entry->set_flushed(true);
1702 m_bytes_dirty -= log_entry->bytes_dirty();
1703 sync_point_writer_flushed(log_entry->get_sync_point_entry());
1704 ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
1705 << " invalidating=" << invalidating
1706 << dendl;
1707 }
1708 m_flush_ops_in_flight -= 1;
1709 m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
1710 wake_up();
1711 }
1712 });
1713 /* Flush through lower cache before completing */
1714 ctx = new LambdaContext(
1715 [this, ctx, log_entry](int r) {
1716 {
1717
1718 WriteLogGuard::BlockOperations block_reqs;
1719 BlockGuardCell *detained_cell = nullptr;
1720
1721 std::lock_guard locker{m_flush_guard_lock};
1722 m_flush_guard.release(log_entry->m_cell, &block_reqs);
1723
1724 for (auto &req : block_reqs) {
1725 m_flush_guard.detain(req.block_extent, &req, &detained_cell);
1726 if (detained_cell) {
1727 req.guard_ctx->cell = detained_cell;
1728 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1729 }
1730 }
1731 }
1732
1733 if (r < 0) {
1734 lderr(m_image_ctx.cct) << "failed to flush log entry: "
1735 << cpp_strerror(r) << dendl;
1736 ctx->complete(r);
1737 } else {
1738 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
1739 }
1740 });
1741 return ctx;
1742 }
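/* construct_flush_entry() above builds its callback chain back to front: the
 * Context handed back to the caller first releases this entry's flush-guard
 * cell (re-queuing any detained overlapping flushes), then issues an
 * aio_flush() through the lower cache layer, and only when that completes
 * does the inner lambda update the dirty-byte and sync point accounting, or
 * push the entry back onto the front of m_dirty_log_entries on error. */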
1743
1744 template <typename I>
1745 void AbstractWriteLog<I>::process_writeback_dirty_entries() {
1746 CephContext *cct = m_image_ctx.cct;
1747 bool all_clean = false;
1748 int flushed = 0;
1749 bool has_write_entry = false;
1750 bool need_update_state = false;
1751
1752 ldout(cct, 20) << "Look for dirty entries" << dendl;
1753 {
1754 DeferredContexts post_unlock;
1755 GenericLogEntries entries_to_flush;
1756
1757 std::shared_lock entry_reader_locker(m_entry_reader_lock);
1758 std::lock_guard locker(m_lock);
1759 while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
1760 if (m_shutting_down) {
1761 ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
1762 /* Do flush complete only when all flush ops are finished */
1763 all_clean = !m_flush_ops_in_flight;
1764 break;
1765 }
1766 if (m_dirty_log_entries.empty()) {
1767 ldout(cct, 20) << "Nothing new to flush" << dendl;
1768 /* Do flush complete only when all flush ops are finished */
1769 all_clean = !m_flush_ops_in_flight;
1770 if (!m_cache_state->clean && all_clean) {
1771 m_cache_state->clean = true;
1772 update_image_cache_state();
1773 need_update_state = true;
1774 }
1775 break;
1776 }
1777
1778 auto candidate = m_dirty_log_entries.front();
1779 bool flushable = can_flush_entry(candidate);
1780 if (flushable) {
1781 entries_to_flush.push_back(candidate);
1782 flushed++;
1783 if (!has_write_entry)
1784 has_write_entry = candidate->is_write_entry();
1785 m_dirty_log_entries.pop_front();
1786
1787 // Account for this candidate in the in-flight flush counters here, under m_lock
1788 {
1789 if (!m_flush_ops_in_flight ||
1790 (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
1791 m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
1792 }
1793 m_flush_ops_in_flight += 1;
1794 /* For write same this is the bytes affected by the flush op, not the bytes transferred */
1795 m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
1796 }
1797 } else {
1798 ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
1799 break;
1800 }
1801 }
1802
1803 construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
1804 }
1805 if (need_update_state) {
1806 std::unique_lock locker(m_lock);
1807 write_image_cache_state(locker);
1808 }
1809
1810 if (all_clean) {
1811 /* All flushing complete, drain outside lock */
1812 Contexts flush_contexts;
1813 {
1814 std::lock_guard locker(m_lock);
1815 flush_contexts.swap(m_flush_complete_contexts);
1816 }
1817 finish_contexts(m_image_ctx.cct, flush_contexts, 0);
1818 }
1819 }
1820
1821 /* Returns true if the specified SyncPointLogEntry is considered flushed, in
1822 * which case the log is updated to reflect this. */
1823 template <typename I>
1824 bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
1825 {
1826 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1827 ceph_assert(log_entry);
1828
1829 if ((log_entry->writes_flushed == log_entry->writes) &&
1830 log_entry->completed && log_entry->prior_sync_point_flushed &&
1831 log_entry->next_sync_point_entry) {
1832 ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
1833 << *log_entry << dendl;
1834 log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
1835 /* Don't move the flushed sync gen num backwards. */
1836 if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
1837 m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
1838 }
1839 m_async_op_tracker.start_op();
1840 m_work_queue.queue(new LambdaContext(
1841 [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
1842 bool handled_by_next;
1843 {
1844 std::lock_guard locker(m_lock);
1845 handled_by_next = handle_flushed_sync_point(std::move(next));
1846 }
1847 if (!handled_by_next) {
1848 persist_last_flushed_sync_gen();
1849 }
1850 m_async_op_tracker.finish_op();
1851 }));
1852 return true;
1853 }
1854 return false;
1855 }
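/* handle_flushed_sync_point() above walks the sync point chain through the
 * work queue rather than recursing inline: marking this sync point flushed
 * may make its successor flushed as well, so the check is re-run for
 * next_sync_point_entry on a work-queue context, and the flushed sync gen is
 * persisted only by the last link of the chain. */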
1856
1857 template <typename I>
1858 void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
1859 {
1860 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1861 ceph_assert(log_entry);
1862 log_entry->writes_flushed++;
1863
1864 /* If this entry might be completely flushed, look closer */
1865 if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
1866 ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
1867 << *log_entry << dendl;
1868 handle_flushed_sync_point(log_entry);
1869 }
1870 }
1871
1872 /* Make a new sync point and flush the previous one during initialization,
1873 * when there may or may not be a previous sync point. */
1874 template <typename I>
1875 void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
1876 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1877 ceph_assert(!m_initialized); /* Don't use this after init */
1878
1879 if (!m_current_sync_point) {
1880 /* First sync point since start */
1881 new_sync_point(later);
1882 } else {
1883 flush_new_sync_point(nullptr, later);
1884 }
1885 }
1886
1887 /**
1888 * Begin a new sync point
1889 */
1890 template <typename I>
1891 void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
1892 CephContext *cct = m_image_ctx.cct;
1893 std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
1894 std::shared_ptr<SyncPoint> new_sync_point;
1895 ldout(cct, 20) << dendl;
1896
1897 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1898
1899 /* The first time this is called, if this is a newly created log,
1900 * this makes the first sync gen number we'll use 1. On the first
1901 * call for a re-opened log m_current_sync_gen will be the highest
1902 * gen number from all the sync point entries found in the re-opened
1903 * log, and this advances to the next sync gen number. */
1904 ++m_current_sync_gen;
1905
1906 new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
1907 m_current_sync_point = new_sync_point;
1908
1909 /* If this log has been re-opened, old_sync_point will initially be
1910 * nullptr, but m_current_sync_gen may not be zero. */
1911 if (old_sync_point) {
1912 new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
1913 m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
1914 old_sync_point->log_entry->writes,
1915 old_sync_point->log_entry->bytes);
1916 /* This sync point will acquire no more sub-ops. Activation needs
1917 * to acquire m_lock, so defer it until later. */
1918 later.add(new LambdaContext(
1919 [old_sync_point](int r) {
1920 old_sync_point->prior_persisted_gather_activate();
1921 }));
1922 }
1923
1924 new_sync_point->prior_persisted_gather_set_finisher();
1925
1926 if (old_sync_point) {
1927 ldout(cct,6) << "new sync point = [" << *m_current_sync_point
1928 << "], prior = [" << *old_sync_point << "]" << dendl;
1929 } else {
1930 ldout(cct,6) << "first sync point = [" << *m_current_sync_point
1931 << "]" << dendl;
1932 }
1933 }
1934
1935 template <typename I>
1936 void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
1937 DeferredContexts &later) {
1938 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1939
1940 if (!flush_req) {
1941 m_async_null_flush_finish++;
1942 m_async_op_tracker.start_op();
1943 Context *flush_ctx = new LambdaContext([this](int r) {
1944 m_async_null_flush_finish--;
1945 m_async_op_tracker.finish_op();
1946 });
1947 flush_req = make_flush_req(flush_ctx);
1948 flush_req->internal = true;
1949 }
1950
1951 /* Add a new sync point. */
1952 new_sync_point(later);
1953 std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
1954 ceph_assert(to_append);
1955
1956 /* This flush request will append/persist the (now) previous sync point */
1957 flush_req->to_append = to_append;
1958
1959 /* When the m_sync_point_persist Gather completes this sync point can be
1960 * appended. The only sub for this Gather is the finisher Context for
1961 * m_prior_log_entries_persisted, which records the result of the Gather in
1962 * the sync point, and completes. TODO: Do we still need both of these
1963 * Gathers? */
1964 Context * ctx = new LambdaContext([this, flush_req](int r) {
1965 ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
1966 << " sync point =" << flush_req->to_append
1967 << ". Ready to persist." << dendl;
1968 alloc_and_dispatch_io_req(flush_req);
1969 });
1970 to_append->persist_gather_set_finisher(ctx);
1971
1972 /* The m_sync_point_persist Gather has all the subs it will ever have, and
1973 * now has its finisher. If the sub is already complete, activation will
1974 * complete the Gather. The finisher will acquire m_lock, so we'll activate
1975 * this when we release m_lock. */
1976 later.add(new LambdaContext([to_append](int r) {
1977 to_append->persist_gather_activate();
1978 }));
1979
1980 /* The flush request completes when the sync point persists */
1981 to_append->add_in_on_persisted_ctxs(flush_req);
1982 }
1983
1984 template <typename I>
1985 void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
1986 DeferredContexts &later) {
1987 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1988
1989 /* If there have been writes since the last sync point ... */
1990 if (m_current_sync_point->log_entry->writes) {
1991 flush_new_sync_point(flush_req, later);
1992 } else {
1993 /* There have been no writes to the current sync point. */
1994 if (m_current_sync_point->earlier_sync_point) {
1995 /* If previous sync point hasn't completed, complete this flush
1996 * with the earlier sync point. No alloc or dispatch needed. */
1997 m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
1998 } else {
1999 /* The previous sync point has already completed and been
2000 * appended. The current sync point has no writes, so this flush
2001 * has nothing to wait for. This flush completes now. */
2002 later.add(flush_req);
2003 }
2004 }
2005 }
2006
2007 /*
2008 * RWL internal flush - will actually flush the RWL.
2009 *
2010 * User flushes should arrive at aio_flush(), and only flush prior
2011 * writes to all log replicas.
2012 *
2013 * Librbd internal flushes will arrive at flush(invalidate=false,
2014 * discard=false), and traverse the block guard to ensure in-flight writes are
2015 * flushed.
2016 */
2017 template <typename I>
2018 void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
2019 CephContext *cct = m_image_ctx.cct;
2020 bool all_clean;
2021 bool flushing;
2022 bool stop_flushing;
2023
2024 {
2025 std::unique_lock locker(m_lock);
2026 flushing = (0 != m_flush_ops_in_flight);
2027 all_clean = m_dirty_log_entries.empty();
2028 stop_flushing = (m_shutting_down);
2029 if (!m_cache_state->clean && all_clean && !flushing) {
2030 m_cache_state->clean = true;
2031 update_image_cache_state();
2032 write_image_cache_state(locker);
2033 }
2034 }
2035
2036 if (!flushing && (all_clean || stop_flushing)) {
2037 /* Complete without holding m_lock */
2038 if (all_clean) {
2039 ldout(cct, 20) << "no dirty entries" << dendl;
2040 } else {
2041 ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
2042 }
2043 on_finish->complete(0);
2044 } else {
2045 if (all_clean) {
2046 ldout(cct, 5) << "flush ops still in progress" << dendl;
2047 } else {
2048 ldout(cct, 20) << "dirty entries remain" << dendl;
2049 }
2050 std::lock_guard locker(m_lock);
2051 /* on_finish can't be completed yet */
2052 m_flush_complete_contexts.push_back(new LambdaContext(
2053 [this, on_finish](int r) {
2054 flush_dirty_entries(on_finish);
2055 }));
2056 wake_up();
2057 }
2058 }
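/* flush_dirty_entries() above completes on_finish immediately only when the
 * log is clean (or flushing is being suppressed for shutdown) and nothing is
 * in flight; otherwise it parks a retry of itself on
 * m_flush_complete_contexts, which process_writeback_dirty_entries() fires
 * once every in-flight flush has drained. */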
2059
2060 template <typename I>
2061 void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
2062 ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
2063
2064 if (m_perfcounter) {
2065 if (invalidate) {
2066 m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
2067 } else {
2068 m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
2069 }
2070 }
2071
2072 /* May be called even if initialization fails */
2073 if (!m_initialized) {
2074 ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
2075 /* Deadlock if completed here */
2076 m_image_ctx.op_work_queue->queue(on_finish, 0);
2077 return;
2078 }
2079
2080 /* Flush/invalidate must pass through the block guard to ensure all layers of
2081 * cache are consistently flushed/invalidated. This ensures no in-flight write leaves
2082 * some layers with valid regions, which may later produce inconsistent read
2083 * results. */
2084 GuardedRequestFunctionContext *guarded_ctx =
2085 new GuardedRequestFunctionContext(
2086 [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
2087 DeferredContexts on_exit;
2088 ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
2089 ceph_assert(guard_ctx.cell);
2090
2091 Context *ctx = new LambdaContext(
2092 [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
2093 std::lock_guard locker(m_lock);
2094 m_invalidating = false;
2095 ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
2096 << invalidate << ")" << dendl;
2097 if (m_log_entries.size()) {
2098 ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
2099 << m_log_entries.size()
2100 << ", front()=" << *m_log_entries.front()
2101 << dendl;
2102 }
2103 if (invalidate) {
2104 ceph_assert(m_log_entries.size() == 0);
2105 }
2106 ceph_assert(m_dirty_log_entries.size() == 0);
2107 m_image_ctx.op_work_queue->queue(on_finish, r);
2108 release_guarded_request(cell);
2109 });
2110 ctx = new LambdaContext(
2111 [this, ctx, invalidate](int r) {
2112 Context *next_ctx = ctx;
2113 ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
2114 if (r < 0) {
2115 /* Override on_finish status with this error */
2116 next_ctx = new LambdaContext([r, ctx](int _r) {
2117 ctx->complete(r);
2118 });
2119 }
2120 if (invalidate) {
2121 {
2122 std::lock_guard locker(m_lock);
2123 ceph_assert(m_dirty_log_entries.size() == 0);
2124 ceph_assert(!m_invalidating);
2125 ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
2126 m_invalidating = true;
2127 }
2128 /* Discards all RWL entries */
2129 while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
2130 next_ctx->complete(0);
2131 } else {
2132 {
2133 std::lock_guard locker(m_lock);
2134 ceph_assert(m_dirty_log_entries.size() == 0);
2135 ceph_assert(!m_invalidating);
2136 }
2137 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
2138 }
2139 });
2140 ctx = new LambdaContext(
2141 [this, ctx](int r) {
2142 flush_dirty_entries(ctx);
2143 });
2144 std::lock_guard locker(m_lock);
2145 /* Even if we're throwing everything away, we want the last entry to
2146 * be a sync point so we can cleanly resume.
2147 *
2148 * Also, the blockguard only guarantees the replication of this op
2149 * can't overlap with prior ops. It doesn't guarantee those are all
2150 * completed and eligible for flush & retire, which we require here.
2151 */
2152 auto flush_req = make_flush_req(ctx);
2153 flush_new_sync_point_if_needed(flush_req, on_exit);
2154 });
2155 detain_guarded_request(nullptr, guarded_ctx, true);
2156 }
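/* internal_flush() above also assembles its pipeline back to front under the
 * block guard: a final sync point is appended via make_flush_req() /
 * flush_new_sync_point_if_needed(), flush_dirty_entries() then drains the
 * dirty list, after which either every log entry is retired (invalidate) or a
 * flush is pushed down to the image writeback layer, and the guard cell is
 * released once the user completion has been queued. */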
2157
2158 template <typename I>
2159 void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
2160 C_BlockIORequestT *req) {
2161 req->copy_cache();
2162 m_blocks_to_log_entries.add_log_entries(log_entries);
2163 }
2164
2165 template <typename I>
2166 bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
2167 CephContext *cct = m_image_ctx.cct;
2168
2169 ldout(cct, 20) << dendl;
2170 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
2171 ceph_assert(log_entry);
2172 return log_entry->can_retire();
2173 }
2174
2175 template <typename I>
2176 void AbstractWriteLog<I>::check_image_cache_state_clean() {
2177 ceph_assert(m_deferred_ios.empty());
2178 ceph_assert(m_ops_to_append.empty());
2179 ceph_assert(m_async_flush_ops == 0);
2180 ceph_assert(m_async_append_ops == 0);
2181 ceph_assert(m_dirty_log_entries.empty());
2182 ceph_assert(m_ops_to_flush.empty());
2183 ceph_assert(m_flush_ops_in_flight == 0);
2184 ceph_assert(m_flush_bytes_in_flight == 0);
2185 ceph_assert(m_bytes_dirty == 0);
2186 ceph_assert(m_work_queue.empty());
2187 }
2188
2189 } // namespace pwl
2190 } // namespace cache
2191 } // namespace librbd
2192
2193 template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;