// ceph/src/librbd/cache/pwl/AbstractWriteLog.cc (ceph "quincy" 17.2.1)
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "AbstractWriteLog.h"
5#include "include/buffer.h"
6#include "include/Context.h"
7#include "include/ceph_assert.h"
8#include "common/deleter.h"
9#include "common/dout.h"
10#include "common/environment.h"
11#include "common/errno.h"
12#include "common/hostname.h"
13#include "common/WorkQueue.h"
14#include "common/Timer.h"
15#include "common/perf_counters.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/asio/ContextWQ.h"
18#include "librbd/cache/pwl/ImageCacheState.h"
19#include "librbd/cache/pwl/LogEntry.h"
20#include "librbd/plugin/Api.h"
21#include <map>
22#include <vector>
23
24#undef dout_subsys
25#define dout_subsys ceph_subsys_rbd_pwl
26#undef dout_prefix
27#define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
28 << " " << __func__ << ": "
29
30namespace librbd {
31namespace cache {
32namespace pwl {
33
using namespace std;
using namespace librbd::cache::pwl;
36
37typedef AbstractWriteLog<ImageCtx>::Extent Extent;
38typedef AbstractWriteLog<ImageCtx>::Extents Extents;
39
40template <typename I>
41AbstractWriteLog<I>::AbstractWriteLog(
42 I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
43 Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
44 plugin::Api<I>& plugin_api)
45 : m_builder(builder),
46 m_write_log_guard(image_ctx.cct),
    m_flush_guard(image_ctx.cct),
48 m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
49 "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
50 m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
51 "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
52 m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
53 "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
54 m_thread_pool(
55 image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
56 "tp_pwl", 4, ""),
57 m_cache_state(cache_state),
58 m_image_ctx(image_ctx),
    m_log_pool_size(DEFAULT_POOL_SIZE),
60 m_image_writeback(image_writeback),
61 m_plugin_api(plugin_api),
62 m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
63 "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
64 m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
65 m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
66 "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
67 m_lock(ceph::make_mutex(pwl::unique_lock_name(
68 "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
69 m_blocks_to_log_entries(image_ctx.cct),
70 m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
71 ceph::make_timespan(
72 image_ctx.config.template get_val<uint64_t>(
73 "rbd_op_thread_timeout")),
74 &m_thread_pool)
75{
76 CephContext *cct = m_image_ctx.cct;
77 m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
78}
79
80template <typename I>
81AbstractWriteLog<I>::~AbstractWriteLog() {
82 ldout(m_image_ctx.cct, 15) << "enter" << dendl;
83 {
84 std::lock_guard timer_locker(*m_timer_lock);
85 std::lock_guard locker(m_lock);
86 m_timer->cancel_event(m_timer_ctx);
87 m_thread_pool.stop();
88 ceph_assert(m_deferred_ios.size() == 0);
89 ceph_assert(m_ops_to_flush.size() == 0);
90 ceph_assert(m_ops_to_append.size() == 0);
91 ceph_assert(m_flush_ops_in_flight == 0);
92
93 delete m_cache_state;
94 m_cache_state = nullptr;
95 }
96 ldout(m_image_ctx.cct, 15) << "exit" << dendl;
97}
98
99template <typename I>
100void AbstractWriteLog<I>::perf_start(std::string name) {
101 PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
102 l_librbd_pwl_last);
103
104 // Latency axis configuration for op histograms, values are in nanoseconds
105 PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
106 "Latency (nsec)",
107 PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
108 0, ///< Start at 0
109 5000, ///< Quantization unit is 5usec
110 16, ///< Ranges into the mS
111 };
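  // Roughly: with SCALE_LOG2, a 5000 ns quantization unit and 16 buckets,
  // the buckets cover about 0-5 us, 5-10 us, 10-20 us and so on, doubling
  // per bucket up into the millisecond range.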
112
113 // Syncpoint logentry number x-axis configuration for op histograms
114 PerfHistogramCommon::axis_config_d sp_logentry_number_config{
115 "logentry number",
116 PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
117 0, // Start at 0
118 1, // Quantization unit is 1
119 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
120 };
121
122 // Syncpoint bytes number y-axis configuration for op histogram
123 PerfHistogramCommon::axis_config_d sp_bytes_number_config{
124 "Number of SyncPoint",
125 PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale
126 0, // Start at 0
127 512, // Quantization unit is 512
128 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
129 };
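  // The two axes above form the 2D syncpoint histogram registered below
  // (l_librbd_pwl_syncpoint_hist): x counts log entries per sync point
  // (linear, 1-entry buckets up to ~260, i.e. MAX_WRITES_PER_SYNC_POINT)
  // and y buckets bytes per sync point (log2, 512-byte quantization up to
  // ~8 MiB, i.e. MAX_BYTES_PER_SYNC_POINT).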
130
131 // Op size axis configuration for op histogram y axis, values are in bytes
132 PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
133 "Request size (bytes)",
134 PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
135 0, ///< Start at 0
136 512, ///< Quantization unit is 512 bytes
137 16, ///< Writes up to >32k
138 };
139
140 // Num items configuration for op histogram y axis, values are in items
141 PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
142 "Number of items",
    PerfHistogramCommon::SCALE_LINEAR, ///< Item count in linear scale
    0, ///< Start at 0
    1, ///< Quantization unit is 1
    32, ///< Up to 32 items
147 };
148
149 plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
150 plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
151 plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");
152
153 plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
154 plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
155 plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");
156
157 plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL");
158
159 plb.add_u64_counter_histogram(
160 l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
161 sp_logentry_number_config, sp_bytes_number_config,
162 "Histogram of syncpoint's logentry numbers vs bytes number");
163
164 plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
  plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
166 plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
167 plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
168 plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
169 plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
170 plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
171 plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
172
173 plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
174 plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
175
176 plb.add_time_avg(
177 l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
178 "Average arrival to allocation time (time deferred for overlap)");
179 plb.add_time_avg(
180 l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
181 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
182 plb.add_time_avg(
183 l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
184 "Average allocation to dispatch time (time deferred for log resources)");
185 plb.add_time_avg(
186 l_librbd_pwl_wr_latency, "wr_latency",
187 "Latency of writes (persistent completion)");
188 plb.add_u64_counter_histogram(
189 l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
190 op_hist_x_axis_config, op_hist_y_axis_config,
191 "Histogram of write request latency (nanoseconds) vs. bytes written");
192 plb.add_time_avg(
193 l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
194 "Latency of write completion to caller");
195 plb.add_time_avg(
196 l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
197 "Average arrival to allocation time (time deferred for overlap)");
198 plb.add_time_avg(
199 l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
200 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
201 plb.add_time_avg(
202 l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
203 "Average allocation to dispatch time (time deferred for log resources)");
204 plb.add_time_avg(
205 l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
206 "Latency of writes (persistent completion) not deferred for free space");
207 plb.add_u64_counter_histogram(
208 l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
209 op_hist_x_axis_config, op_hist_y_axis_config,
210 "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
211 plb.add_time_avg(
212 l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
213 "Latency of write completion to callerfor writes not deferred for free space");
214 plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
215 plb.add_u64_counter_histogram(
216 l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
217 op_hist_x_axis_config, op_hist_y_axis_config,
218 "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");
219 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
220 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
221 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
222 plb.add_u64_counter_histogram(
223 l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
224 op_hist_x_axis_config, op_hist_y_axis_config,
225 "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
226
227 plb.add_time_avg(
228 l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
229 "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
230 plb.add_time_avg(
231 l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
232 "Average buffer persist time (write data persist/replicate time)");
233 plb.add_u64_counter_histogram(
234 l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
235 op_hist_x_axis_config, op_hist_y_axis_config,
236 "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
237 plb.add_time_avg(
238 l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
239 "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
240 plb.add_time_avg(
241 l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
242 "Average log append to persist complete time (log entry append/replicate time)");
243 plb.add_u64_counter_histogram(
244 l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
245 op_hist_x_axis_config, op_hist_y_axis_config,
246 "Histogram of log append persist time (nanoseconds) (vs. op bytes)");
247
248 plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
249 plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
250 plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");
251
252 plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
253 plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
254 plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");
255
256 plb.add_u64_counter(l_librbd_pwl_ws,"ws", "Write Sames");
257 plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
258 plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");
259
260 plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
261 plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
  plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
263 plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
264
265 plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
266 plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
267 plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
268 plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");
269
270 plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
271 plb.add_u64_counter_histogram(
272 l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
273 op_hist_x_axis_config, op_hist_y_axis_count_config,
274 "Histogram of log append transaction time (nanoseconds) vs. entries appended");
275 plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
276 plb.add_u64_counter_histogram(
277 l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
278 op_hist_x_axis_config, op_hist_y_axis_count_config,
279 "Histogram of log retire transaction time (nanoseconds) vs. entries retired");
280
281 m_perfcounter = plb.create_perf_counters();
282 m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
283}
284
285template <typename I>
286void AbstractWriteLog<I>::perf_stop() {
287 ceph_assert(m_perfcounter);
288 m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
289 delete m_perfcounter;
290}
291
292template <typename I>
293void AbstractWriteLog<I>::log_perf() {
294 bufferlist bl;
295 Formatter *f = Formatter::create("json-pretty");
296 bl.append("Perf dump follows\n--- Begin perf dump ---\n");
297 bl.append("{\n");
298 stringstream ss;
299 utime_t now = ceph_clock_now();
300 ss << "\"test_time\": \"" << now << "\",";
301 ss << "\"image\": \"" << m_image_ctx.name << "\",";
302 bl.append(ss);
303 bl.append("\"stats\": ");
304 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0);
305 f->flush(bl);
306 bl.append(",\n\"histograms\": ");
307 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
308 f->flush(bl);
309 delete f;
310 bl.append("}\n--- End perf dump ---\n");
311 bl.append('\0');
312 ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
313}
314
315template <typename I>
316void AbstractWriteLog<I>::periodic_stats() {
317 std::lock_guard locker(m_lock);
318 update_image_cache_state();
319 ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size()
320 << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
321 << ", m_free_log_entries=" << m_free_log_entries
322 << ", m_bytes_allocated=" << m_bytes_allocated
323 << ", m_bytes_cached=" << m_bytes_cached
324 << ", m_bytes_dirty=" << m_bytes_dirty
325 << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
326 << ", m_first_valid_entry=" << m_first_valid_entry
327 << ", m_first_free_entry=" << m_first_free_entry
328 << ", m_current_sync_gen=" << m_current_sync_gen
329 << ", m_flushed_sync_gen=" << m_flushed_sync_gen
330 << dendl;
331}
332
333template <typename I>
334void AbstractWriteLog<I>::arm_periodic_stats() {
335 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
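  // Self re-arming pattern: the LambdaContext below logs stats and then calls
  // arm_periodic_stats() again while m_timer_lock is held; the pending event
  // is cancelled in the destructor via m_timer->cancel_event(m_timer_ctx).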
336 m_timer_ctx = new LambdaContext([this](int r) {
337 /* m_timer_lock is held */
338 periodic_stats();
339 arm_periodic_stats();
340 });
341 m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
342}
343
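/*
 * Used when reloading an existing cache pool: converts one on-media
 * WriteLogCacheEntry into the matching in-memory GenericLogEntry subclass
 * (sync point, write, writesame or discard) and records sync point
 * generations that are referenced by writing entries but not (yet) present
 * in the log, so update_sync_points() can recreate them.
 */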
344template <typename I>
345void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
346 WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
347 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
    uint64_t entry_index) {
349 bool writer = cache_entry->is_writer();
350 if (cache_entry->is_sync_point()) {
351 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
352 << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
353 auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
354 *log_entry = sync_point_entry;
355 sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
356 missing_sync_points.erase(cache_entry->sync_gen_number);
357 m_current_sync_gen = cache_entry->sync_gen_number;
358 } else if (cache_entry->is_write()) {
359 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
360 << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
361 auto write_entry =
362 m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
363 write_data_to_buffer(write_entry, cache_entry);
364 *log_entry = write_entry;
365 } else if (cache_entry->is_writesame()) {
366 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
367 << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
368 auto ws_entry =
369 m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
370 cache_entry->write_bytes, cache_entry->ws_datalen);
371 write_data_to_buffer(ws_entry, cache_entry);
372 *log_entry = ws_entry;
373 } else if (cache_entry->is_discard()) {
374 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
375 << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
376 auto discard_entry =
377 std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
378 m_discard_granularity_bytes);
379 *log_entry = discard_entry;
380 } else {
381 lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
382 << ", cache_entry=[" << *cache_entry << "]" << dendl;
383 }
384
385 if (writer) {
386 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
387 << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
388 if (!sync_point_entries[cache_entry->sync_gen_number]) {
389 missing_sync_points[cache_entry->sync_gen_number] = true;
390 }
391 }
392}
393
394template <typename I>
395void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
396 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
    DeferredContexts &later) {
398 /* Create missing sync points. These must not be appended until the
399 * entry reload is complete and the write map is up to
400 * date. Currently this is handled by the deferred contexts object
401 * passed to new_sync_point(). These contexts won't be completed
402 * until this function returns. */
403 for (auto &kv : missing_sync_points) {
404 ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
405 if (0 == m_current_sync_gen) {
406 /* The unlikely case where the log contains writing entries, but no sync
407 * points (e.g. because they were all retired) */
408 m_current_sync_gen = kv.first-1;
409 }
410 ceph_assert(kv.first == m_current_sync_gen+1);
411 init_flush_new_sync_point(later);
412 ceph_assert(kv.first == m_current_sync_gen);
    sync_point_entries[kv.first] = m_current_sync_point->log_entry;
414 }
415
416 /*
417 * Iterate over the log entries again (this time via the global
418 * entries list), connecting write entries to their sync points and
419 * updating the sync point stats.
420 *
421 * Add writes to the write log map.
422 */
423 std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
424 for (auto &log_entry : m_log_entries) {
425 if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
426 /* This entry is one of the types that write */
427 auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
428 if (gen_write_entry) {
429 auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
430 if (!sync_point_entry) {
431 lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
432 ceph_assert(false);
433 } else {
434 gen_write_entry->sync_point_entry = sync_point_entry;
435 sync_point_entry->writes++;
436 sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
437 sync_point_entry->writes_completed++;
438 m_blocks_to_log_entries.add_log_entry(gen_write_entry);
439 /* This entry is only dirty if its sync gen number is > the flushed
440 * sync gen number from the root object. */
441 if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
442 m_dirty_log_entries.push_back(log_entry);
443 m_bytes_dirty += gen_write_entry->bytes_dirty();
444 } else {
445 gen_write_entry->set_flushed(true);
446 sync_point_entry->writes_flushed++;
447 }
448
449 /* calc m_bytes_allocated & m_bytes_cached */
450 inc_allocated_cached_bytes(log_entry);
451 }
452 }
453 } else {
      /* This entry is a sync point entry */
455 auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
456 if (sync_point_entry) {
457 if (previous_sync_point_entry) {
458 previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
459 if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
460 sync_point_entry->prior_sync_point_flushed = false;
461 ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
462 (0 == previous_sync_point_entry->writes) ||
463 (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
464 } else {
465 sync_point_entry->prior_sync_point_flushed = true;
466 ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
467 ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
468 }
469 } else {
470 /* There are no previous sync points, so we'll consider them flushed */
471 sync_point_entry->prior_sync_point_flushed = true;
472 }
473 previous_sync_point_entry = sync_point_entry;
        ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
475 }
476 }
477 }
478 if (0 == m_current_sync_gen) {
479 /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
480 * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
481 * point recorded in the log. */
482 m_current_sync_gen = m_flushed_sync_gen;
483 }
484}
485
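/*
 * One-time cache bring-up: derives the pool file path and size from the
 * image configuration when no cache is recorded yet, cross-checks the pool
 * file against the image metadata, initializes the pool, and then starts the
 * first sync point, the thread pool and the periodic stats timer.
 */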
486template <typename I>
487void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
488 CephContext *cct = m_image_ctx.cct;
489 ldout(cct, 20) << dendl;
490 ceph_assert(m_cache_state);
491 std::lock_guard locker(m_lock);
492 ceph_assert(!m_initialized);
493 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
494
495 if (!m_cache_state->present) {
496 m_cache_state->host = ceph_get_short_hostname();
497 m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
498 "rbd_persistent_cache_size");
499
500 string path = m_image_ctx.config.template get_val<string>(
501 "rbd_persistent_cache_path");
502 std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
503 m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
504 }
505
506 ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
507 ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
508
509 m_log_pool_name = m_cache_state->path;
510 m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
511 m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
512 ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
513 << " (adjusted from " << m_cache_state->size << ")" << dendl;
514
515 if ((!m_cache_state->present) &&
516 (access(m_log_pool_name.c_str(), F_OK) == 0)) {
    ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
                  << ", while there's no cache in the image metadata." << dendl;
    if (remove(m_log_pool_name.c_str()) != 0) {
      lderr(cct) << "failed to remove the pool file " << m_log_pool_name
                 << dendl;
522 on_finish->complete(-errno);
523 return;
524 } else {
525 ldout(cct, 5) << "Removed the existing pool file." << dendl;
526 }
527 } else if ((m_cache_state->present) &&
528 (access(m_log_pool_name.c_str(), F_OK) != 0)) {
    lderr(cct) << "can't find the existing pool file: " << m_log_pool_name
               << ". error: " << cpp_strerror(-errno) << dendl;
    on_finish->complete(-errno);
    return;
  }
534
  bool succeeded = initialize_pool(on_finish, later);
  if (!succeeded) {
    return;
  }
539
540 ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
541 << " log entries, " << m_free_log_entries << " of which are free."
542 << " first_valid=" << m_first_valid_entry
543 << ", first_free=" << m_first_free_entry
544 << ", flushed_sync_gen=" << m_flushed_sync_gen
545 << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
546 if (m_first_free_entry == m_first_valid_entry) {
547 ldout(cct,1) << "write log is empty" << dendl;
548 m_cache_state->empty = true;
549 }
550
551 /* Start the sync point following the last one seen in the
552 * log. Flush the last sync point created during the loading of the
553 * existing log entries. */
554 init_flush_new_sync_point(later);
555 ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
556
557 m_initialized = true;
558 // Start the thread
559 m_thread_pool.start();
560
561 /* Do these after we drop lock */
562 later.add(new LambdaContext([this](int r) {
563 /* Log stats for the first time */
564 periodic_stats();
565 /* Arm periodic stats logging for the first time */
566 std::lock_guard timer_locker(*m_timer_lock);
567 arm_periodic_stats();
  }));
569 m_image_ctx.op_work_queue->queue(on_finish, 0);
570}
571
572template <typename I>
573void AbstractWriteLog<I>::update_image_cache_state() {
574 using klass = AbstractWriteLog<I>;
575 Context *ctx = util::create_context_callback<
576 klass, &klass::handle_update_image_cache_state>(this);
577 update_image_cache_state(ctx);
578}
579
580template <typename I>
581void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) {
582 ldout(m_image_ctx.cct, 10) << dendl;
583
584 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
585 m_cache_state->allocated_bytes = m_bytes_allocated;
586 m_cache_state->cached_bytes = m_bytes_cached;
587 m_cache_state->dirty_bytes = m_bytes_dirty;
588 m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated;
589 m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req);
590 m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req);
591 m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) -
592 m_cache_state->hits_full - m_cache_state->hits_partial;
593 m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes);
594 m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) -
595 m_cache_state->hit_bytes;
596 m_cache_state->write_image_cache_state(on_finish);
597}
598
599template <typename I>
600void AbstractWriteLog<I>::handle_update_image_cache_state(int r) {
601 CephContext *cct = m_image_ctx.cct;
602 ldout(cct, 10) << "r=" << r << dendl;
603
604 if (r < 0) {
605 lderr(cct) << "failed to update image cache state: " << cpp_strerror(r)
606 << dendl;
607 return;
608 }
609}
610
611template <typename I>
612void AbstractWriteLog<I>::init(Context *on_finish) {
613 CephContext *cct = m_image_ctx.cct;
614 ldout(cct, 20) << dendl;
615 auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
616 std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
617 std::string("-") + m_image_ctx.name;
618 perf_start(pname);
619
620 ceph_assert(!m_initialized);
621
622 Context *ctx = new LambdaContext(
623 [this, on_finish](int r) {
624 if (r >= 0) {
        std::lock_guard locker(m_lock);
626 update_image_cache_state(on_finish);
627 } else {
628 on_finish->complete(r);
629 }
630 });
631
632 DeferredContexts later;
633 pwl_init(ctx, later);
634}
635
636template <typename I>
637void AbstractWriteLog<I>::shut_down(Context *on_finish) {
638 CephContext *cct = m_image_ctx.cct;
639 ldout(cct, 20) << dendl;
640
641 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
642
643 Context *ctx = new LambdaContext(
644 [this, on_finish](int r) {
645 if (m_perfcounter) {
646 perf_stop();
647 }
648 ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
649 m_image_ctx.op_work_queue->queue(on_finish, r);
650 });
651 ctx = new LambdaContext(
652 [this, ctx](int r) {
653 ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
654 Context *next_ctx = override_ctx(r, ctx);
      periodic_stats();

      std::lock_guard locker(m_lock);
658 check_image_cache_state_clean();
659 m_wake_up_enabled = false;
660 m_log_entries.clear();
661 m_cache_state->clean = true;
662 m_cache_state->empty = true;
663 remove_pool_file();
664 update_image_cache_state(next_ctx);
665 });
666 ctx = new LambdaContext(
667 [this, ctx](int r) {
668 Context *next_ctx = override_ctx(r, ctx);
669 ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
670 // Wait for in progress IOs to complete
671 next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
672 m_async_op_tracker.wait_for_ops(next_ctx);
673 });
674 ctx = new LambdaContext(
675 [this, ctx](int r) {
676 Context *next_ctx = override_ctx(r, ctx);
677 {
678 /* Sync with process_writeback_dirty_entries() */
679 RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
680 m_shutting_down = true;
681 /* Flush all writes to OSDs (unless disabled) and wait for all
682 in-progress flush writes to complete */
683 ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
        periodic_stats();
685 }
686 flush_dirty_entries(next_ctx);
687 });
688 ctx = new LambdaContext(
689 [this, ctx](int r) {
690 ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
691 m_work_queue.queue(ctx, r);
692 });
693 /* Complete all in-flight writes before shutting down */
694 ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
695 internal_flush(false, ctx);
696}
697
698template <typename I>
699void AbstractWriteLog<I>::read(Extents&& image_extents,
700 ceph::bufferlist* bl,
701 int fadvise_flags, Context *on_finish) {
702 CephContext *cct = m_image_ctx.cct;
703 utime_t now = ceph_clock_now();
704
705 on_finish = new LambdaContext(
706 [this, on_finish](int r) {
707 m_async_op_tracker.finish_op();
708 on_finish->complete(r);
709 });
710 C_ReadRequest *read_ctx = m_builder->create_read_request(
711 cct, now, m_perfcounter, bl, on_finish);
712 ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
                 << " image_extents=" << image_extents
714 << ", bl=" << bl
715 << ", on_finish=" << on_finish << dendl;
716
717 ceph_assert(m_initialized);
718 bl->clear();
719 m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
720
  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
722 std::vector<bufferlist*> bls_to_read;
723
724 m_async_op_tracker.start_op();
725 Context *ctx = new LambdaContext(
726 [this, read_ctx, fadvise_flags](int r) {
727 if (read_ctx->miss_extents.empty()) {
728 /* All of this read comes from RWL */
729 read_ctx->complete(0);
730 } else {
731 /* Pass the read misses on to the layer below RWL */
732 m_image_writeback.aio_read(
733 std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
734 fadvise_flags, read_ctx);
735 }
736 });
737
738 /*
739 * The strategy here is to look up all the WriteLogMapEntries that overlap
740 * this read, and iterate through those to separate this read into hits and
741 * misses. A new Extents object is produced here with Extents for each miss
742 * region. The miss Extents is then passed on to the read cache below RWL. We
743 * also produce an ImageExtentBufs for all the extents (hit or miss) in this
744 * read. When the read from the lower cache layer completes, we iterate
745 * through the ImageExtentBufs and insert buffers for each cache hit at the
746 * appropriate spot in the bufferlist returned from below for the miss
747 * read. The buffers we insert here refer directly to regions of various
748 * write log entry data buffers.
749 *
750 * Locking: These buffer objects hold a reference on the write log entries
751 * they refer to. Log entries can't be retired until there are no references.
752 * The GenericWriteLogEntry references are released by the buffer destructor.
753 */
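  /* Hypothetical example: reading extent {4096, 8192} while the log holds a
   * write covering {8192, 4096} produces one miss extent {4096, 4096} for the
   * layer below and one hit extent {8192, 4096} served from the log entry's
   * buffer; both land, in order, in read_ctx->read_extents. */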
754 for (auto &extent : image_extents) {
755 uint64_t extent_offset = 0;
756 RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
757 WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
758 block_extent(extent));
759 for (auto &map_entry : map_entries) {
760 Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
761 /* If this map entry starts after the current image extent offset ... */
762 if (entry_image_extent.first > extent.first + extent_offset) {
763 /* ... add range before map_entry to miss extents */
764 uint64_t miss_extent_start = extent.first + extent_offset;
765 uint64_t miss_extent_length = entry_image_extent.first -
766 miss_extent_start;
767 Extent miss_extent(miss_extent_start, miss_extent_length);
768 read_ctx->miss_extents.push_back(miss_extent);
769 /* Add miss range to read extents */
770 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
771 read_ctx->read_extents.push_back(miss_extent_buf);
772 extent_offset += miss_extent_length;
773 }
774 ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
775 uint64_t entry_offset = 0;
776 /* If this map entry starts before the current image extent offset ... */
777 if (entry_image_extent.first < extent.first + extent_offset) {
778 /* ... compute offset into log entry for this read extent */
779 entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
780 }
781 /* This read hit ends at the end of the extent or the end of the log
782 entry, whichever is less. */
783 uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
784 extent.second - extent_offset);
785 Extent hit_extent(entry_image_extent.first, entry_hit_length);
786 if (0 == map_entry.log_entry->write_bytes() &&
787 0 < map_entry.log_entry->bytes_dirty()) {
788 /* discard log entry */
789 ldout(cct, 20) << "discard log entry" << dendl;
790 auto discard_entry = map_entry.log_entry;
791 ldout(cct, 20) << "read hit on discard entry: log_entry="
792 << *discard_entry
793 << dendl;
794 /* Discards read as zero, so we'll construct a bufferlist of zeros */
795 bufferlist zero_bl;
796 zero_bl.append_zero(entry_hit_length);
797 /* Add hit extent to read extents */
798 auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
799 hit_extent, zero_bl);
800 read_ctx->read_extents.push_back(hit_extent_buf);
801 } else {
802 ldout(cct, 20) << "write or writesame log entry" << dendl;
803 /* write and writesame log entry */
804 /* Offset of the map entry into the log entry's buffer */
805 uint64_t map_entry_buffer_offset = entry_image_extent.first -
806 map_entry.log_entry->ram_entry.image_offset_bytes;
807 /* Offset into the log entry buffer of this read hit */
808 uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
809 /* Create buffer object referring to pmem pool for this read hit */
810 collect_read_extents(
811 read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
812 entry_hit_length, hit_extent, read_ctx);
813 }
814 /* Exclude RWL hit range from buffer and extent */
815 extent_offset += entry_hit_length;
816 ldout(cct, 20) << map_entry << dendl;
817 }
818 /* If the last map entry didn't consume the entire image extent ... */
819 if (extent.second > extent_offset) {
820 /* ... add the rest of this extent to miss extents */
821 uint64_t miss_extent_start = extent.first + extent_offset;
822 uint64_t miss_extent_length = extent.second - extent_offset;
823 Extent miss_extent(miss_extent_start, miss_extent_length);
824 read_ctx->miss_extents.push_back(miss_extent);
825 /* Add miss range to read extents */
826 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
827 read_ctx->read_extents.push_back(miss_extent_buf);
828 extent_offset += miss_extent_length;
829 }
830 }
831
832 ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
833 << ", miss_bl=" << read_ctx->miss_bl << dendl;
834
835 complete_read(log_entries_to_read, bls_to_read, ctx);
836}
837
838template <typename I>
839void AbstractWriteLog<I>::write(Extents &&image_extents,
840 bufferlist&& bl,
841 int fadvise_flags,
842 Context *on_finish) {
843 CephContext *cct = m_image_ctx.cct;
844
845 ldout(cct, 20) << "aio_write" << dendl;
846
847 utime_t now = ceph_clock_now();
848 m_perfcounter->inc(l_librbd_pwl_wr_req, 1);
849
850 ceph_assert(m_initialized);
851
852 /* Split image extents larger than 1M. This isn't strictly necessary but
853 * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
854 * We plan to manage pmem space and allocation by ourselves in the future.
855 */
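  /* Worked example, assuming get_max_extent() returns 1 MiB: a single
   * 2.5 MiB extent at offset 0 is split into {0, 1 MiB}, {1 MiB, 1 MiB} and
   * {2 MiB, 0.5 MiB} before the write request is built below. */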
856 Extents split_image_extents;
857 uint64_t max_extent_size = get_max_extent();
858 if (max_extent_size != 0) {
859 for (auto extent : image_extents) {
860 if (extent.second > max_extent_size) {
861 uint64_t off = extent.first;
862 uint64_t extent_bytes = extent.second;
863 for (int i = 0; extent_bytes != 0; ++i) {
864 Extent _ext;
865 _ext.first = off + i * max_extent_size;
866 _ext.second = std::min(max_extent_size, extent_bytes);
867 extent_bytes = extent_bytes - _ext.second ;
868 split_image_extents.emplace_back(_ext);
869 }
870 } else {
871 split_image_extents.emplace_back(extent);
872 }
873 }
874 } else {
875 split_image_extents = image_extents;
876 }
877
878 C_WriteRequestT *write_req =
879 m_builder->create_write_request(*this, now, std::move(split_image_extents),
880 std::move(bl), fadvise_flags, m_lock,
881 m_perfcounter, on_finish);
882 m_perfcounter->inc(l_librbd_pwl_wr_bytes,
883 write_req->image_extents_summary.total_bytes);
884
885 /* The lambda below will be called when the block guard for all
886 * blocks affected by this write is obtained */
887 GuardedRequestFunctionContext *guarded_ctx =
888 new GuardedRequestFunctionContext([this,
889 write_req](GuardedRequestFunctionContext &guard_ctx) {
890 write_req->blockguard_acquired(guard_ctx);
891 alloc_and_dispatch_io_req(write_req);
892 });
893
894 detain_guarded_request(write_req, guarded_ctx, false);
895}
896
897template <typename I>
898void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
899 uint32_t discard_granularity_bytes,
900 Context *on_finish) {
901 CephContext *cct = m_image_ctx.cct;
902
903 ldout(cct, 20) << dendl;
904
905 utime_t now = ceph_clock_now();
906 m_perfcounter->inc(l_librbd_pwl_discard, 1);
907 Extents discard_extents = {{offset, length}};
908 m_discard_granularity_bytes = discard_granularity_bytes;
909
910 ceph_assert(m_initialized);
911
912 auto *discard_req =
913 new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
914 m_lock, m_perfcounter, on_finish);
915
916 /* The lambda below will be called when the block guard for all
917 * blocks affected by this write is obtained */
918 GuardedRequestFunctionContext *guarded_ctx =
919 new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
920 discard_req->blockguard_acquired(guard_ctx);
921 alloc_and_dispatch_io_req(discard_req);
922 });
923
924 detain_guarded_request(discard_req, guarded_ctx, false);
925}
926
927/**
928 * Aio_flush completes when all previously completed writes are
929 * flushed to persistent cache. We make a best-effort attempt to also
930 * defer until all in-progress writes complete, but we may not know
931 * about all of the writes the application considers in-progress yet,
932 * due to uncertainty in the IO submission workq (multiple WQ threads
933 * may allow out-of-order submission).
934 *
935 * This flush operation will not wait for writes deferred for overlap
936 * in the block guard.
937 */
938template <typename I>
939void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
940 CephContext *cct = m_image_ctx.cct;
941 ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
942
943 if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
944 io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
945 internal_flush(false, on_finish);
946 return;
947 }
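  /* From here on this is a user-initiated aio_flush: it only persists prior
   * writes within the log (by creating a new sync point below) and does not
   * write back to the OSDs, unlike the internal_flush() path taken above. */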
948 m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);
949
950 /* May be called even if initialization fails */
951 if (!m_initialized) {
952 ldout(cct, 05) << "never initialized" << dendl;
953 /* Deadlock if completed here */
954 m_image_ctx.op_work_queue->queue(on_finish, 0);
955 return;
956 }
957
958 {
959 std::shared_lock image_locker(m_image_ctx.image_lock);
960 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
961 on_finish->complete(-EROFS);
962 return;
963 }
964 }
965
966 auto flush_req = make_flush_req(on_finish);
967
968 GuardedRequestFunctionContext *guarded_ctx =
969 new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
970 ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
971 ceph_assert(guard_ctx.cell);
972 flush_req->detained = guard_ctx.state.detained;
973 /* We don't call flush_req->set_cell(), because the block guard will be released here */
974 {
975 DeferredContexts post_unlock; /* Do these when the lock below is released */
976 std::lock_guard locker(m_lock);
977
978 if (!m_persist_on_flush && m_persist_on_write_until_flush) {
979 m_persist_on_flush = true;
980 ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
981 }
982
983 /*
984 * Create a new sync point if there have been writes since the last
985 * one.
986 *
987 * We do not flush the caches below the RWL here.
988 */
989 flush_new_sync_point_if_needed(flush_req, post_unlock);
990 }
991
992 release_guarded_request(guard_ctx.cell);
993 });
994
995 detain_guarded_request(flush_req, guarded_ctx, true);
996}
997
998template <typename I>
999void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
1000 bufferlist&& bl, int fadvise_flags,
1001 Context *on_finish) {
1002 CephContext *cct = m_image_ctx.cct;
1003
1004 ldout(cct, 20) << "aio_writesame" << dendl;
1005
1006 utime_t now = ceph_clock_now();
1007 Extents ws_extents = {{offset, length}};
1008 m_perfcounter->inc(l_librbd_pwl_ws, 1);
1009 ceph_assert(m_initialized);
1010
1011 /* A write same request is also a write request. The key difference is the
1012 * write same data buffer is shorter than the extent of the request. The full
1013 * extent will be used in the block guard, and appear in
1014 * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
1015 * as long as the length of the bl here, which is the pattern that's repeated
1016 * in the image for the entire length of this WS. Read hits and flushing of
1017 * write sames are different than normal writes. */
1018 C_WriteSameRequestT *ws_req =
1019 m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
1020 fadvise_flags, m_lock, m_perfcounter, on_finish);
1021 m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);
1022
1023 /* The lambda below will be called when the block guard for all
1024 * blocks affected by this write is obtained */
1025 GuardedRequestFunctionContext *guarded_ctx =
1026 new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
1027 ws_req->blockguard_acquired(guard_ctx);
1028 alloc_and_dispatch_io_req(ws_req);
1029 });
1030
1031 detain_guarded_request(ws_req, guarded_ctx, false);
1032}
1033
1034template <typename I>
1035void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
1036 bufferlist&& cmp_bl,
1037 bufferlist&& bl,
1038 uint64_t *mismatch_offset,
1039 int fadvise_flags,
1040 Context *on_finish) {
1041 ldout(m_image_ctx.cct, 20) << dendl;
1042
1043 utime_t now = ceph_clock_now();
1044 m_perfcounter->inc(l_librbd_pwl_cmp, 1);
1045 ceph_assert(m_initialized);
1046
1047 /* A compare and write request is also a write request. We only allocate
1048 * resources and dispatch this write request if the compare phase
1049 * succeeds. */
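  /* The read phase below goes through this cache's read(), so the comparison
   * sees the most recent data whether it lives in the log or in the image; on
   * a mismatch the request completes with -EILSEQ and *mismatch_offset holds
   * the first differing byte. */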
1050 C_WriteRequestT *cw_req =
1051 m_builder->create_comp_and_write_request(
1052 *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
1053 mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
1054 m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);
1055
1056 /* The lambda below will be called when the block guard for all
1057 * blocks affected by this write is obtained */
1058 GuardedRequestFunctionContext *guarded_ctx =
1059 new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
1060 cw_req->blockguard_acquired(guard_ctx);
1061
1062 auto read_complete_ctx = new LambdaContext(
1063 [this, cw_req](int r) {
1064 ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
1065 << "cw_req=" << cw_req << dendl;
1066
1067 /* Compare read_bl to cmp_bl to determine if this will produce a write */
1068 buffer::list aligned_read_bl;
1069 if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) {
1070 aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length());
1071 }
1072 if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) ||
1073 cw_req->cmp_bl.contents_equal(aligned_read_bl)) {
1074 /* Compare phase succeeds. Begin write */
1075 ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
1076 cw_req->compare_succeeded = true;
1077 *cw_req->mismatch_offset = 0;
1078 /* Continue with this request as a write. Blockguard release and
1079 * user request completion handled as if this were a plain
1080 * write. */
1081 alloc_and_dispatch_io_req(cw_req);
1082 } else {
1083 /* Compare phase fails. Comp-and write ends now. */
1084 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
1085 /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
1086 uint64_t bl_index = 0;
1087 for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) {
1088 if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
1089 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
1090 break;
1091 }
1092 }
1093 cw_req->compare_succeeded = false;
1094 *cw_req->mismatch_offset = bl_index;
1095 cw_req->complete_user_request(-EILSEQ);
1096 cw_req->release_cell();
1097 cw_req->complete(0);
1098 }
1099 });
1100
1101 /* Read phase of comp-and-write must read through RWL */
1102 Extents image_extents_copy = cw_req->image_extents;
1103 read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
1104 });
1105
1106 detain_guarded_request(cw_req, guarded_ctx, false);
1107}
1108
1109template <typename I>
1110void AbstractWriteLog<I>::flush(Context *on_finish) {
1111 internal_flush(false, on_finish);
1112}
1113
1114template <typename I>
1115void AbstractWriteLog<I>::invalidate(Context *on_finish) {
1116 internal_flush(true, on_finish);
1117}
1118
1119template <typename I>
1120CephContext *AbstractWriteLog<I>::get_context() {
1121 return m_image_ctx.cct;
1122}
1123
1124template <typename I>
1125BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
1126{
1127 CephContext *cct = m_image_ctx.cct;
1128 BlockGuardCell *cell;
1129
1130 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1131 ldout(cct, 20) << dendl;
1132
1133 int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
1134 ceph_assert(r>=0);
1135 if (r > 0) {
1136 ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
1137 << "req=" << req << dendl;
1138 return nullptr;
1139 }
1140
1141 ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
1142 return cell;
1143}
1144
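/*
 * Barrier handling: while a barrier request (e.g. an aio_flush) owns the
 * guard, later requests are parked on m_awaiting_barrier instead of being
 * detained in the block guard; release_guarded_request() drains that queue
 * once the barrier cell is released.
 */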
1145template <typename I>
1146BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
1147 GuardedRequest &req)
1148{
1149 BlockGuardCell *cell = nullptr;
1150
1151 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1152 ldout(m_image_ctx.cct, 20) << dendl;
1153
1154 if (m_barrier_in_progress) {
1155 req.guard_ctx->state.queued = true;
1156 m_awaiting_barrier.push_back(req);
1157 } else {
1158 bool barrier = req.guard_ctx->state.barrier;
1159 if (barrier) {
1160 m_barrier_in_progress = true;
1161 req.guard_ctx->state.current_barrier = true;
1162 }
1163 cell = detain_guarded_request_helper(req);
1164 if (barrier) {
1165 /* Only non-null if the barrier acquires the guard now */
1166 m_barrier_cell = cell;
1167 }
1168 }
1169
1170 return cell;
1171}
1172
1173template <typename I>
1174void AbstractWriteLog<I>::detain_guarded_request(
1175 C_BlockIORequestT *request,
1176 GuardedRequestFunctionContext *guarded_ctx,
1177 bool is_barrier)
1178{
1179 BlockExtent extent;
1180 if (request) {
1181 extent = request->image_extents_summary.block_extent();
1182 } else {
1183 extent = block_extent(whole_volume_extent());
1184 }
1185 auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
1186 BlockGuardCell *cell = nullptr;
1187
1188 ldout(m_image_ctx.cct, 20) << dendl;
1189 {
1190 std::lock_guard locker(m_blockguard_lock);
1191 cell = detain_guarded_request_barrier_helper(req);
1192 }
1193 if (cell) {
1194 req.guard_ctx->cell = cell;
1195 req.guard_ctx->complete(0);
1196 }
1197}
1198
1199template <typename I>
1200void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
1201{
1202 CephContext *cct = m_image_ctx.cct;
1203 WriteLogGuard::BlockOperations block_reqs;
1204 ldout(cct, 20) << "released_cell=" << released_cell << dendl;
1205
1206 {
1207 std::lock_guard locker(m_blockguard_lock);
1208 m_write_log_guard.release(released_cell, &block_reqs);
1209
1210 for (auto &req : block_reqs) {
1211 req.guard_ctx->state.detained = true;
1212 BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
1213 if (detained_cell) {
1214 if (req.guard_ctx->state.current_barrier) {
1215 /* The current barrier is acquiring the block guard, so now we know its cell */
1216 m_barrier_cell = detained_cell;
1217 /* detained_cell could be == released_cell here */
1218 ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
1219 }
1220 req.guard_ctx->cell = detained_cell;
1221 m_work_queue.queue(req.guard_ctx);
1222 }
1223 }
1224
1225 if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
1226 ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
1227 /* The released cell is the current barrier request */
1228 m_barrier_in_progress = false;
1229 m_barrier_cell = nullptr;
1230 /* Move waiting requests into the blockguard. Stop if there's another barrier */
1231 while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
1232 auto &req = m_awaiting_barrier.front();
1233 ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
1234 BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
1235 if (detained_cell) {
1236 req.guard_ctx->cell = detained_cell;
1237 m_work_queue.queue(req.guard_ctx);
1238 }
1239 m_awaiting_barrier.pop_front();
1240 }
1241 }
1242 }
1243
1244 ldout(cct, 20) << "exit" << dendl;
1245}
1246
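/*
 * Claims the single "appending" role (tracked by m_appending) and moves up to
 * one batch of scheduled operations from m_ops_to_append onto `ops`.
 * `appending` reflects whether this caller currently owns that role and
 * `ops_remain` tells the caller to check for another batch.
 */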
1247template <typename I>
1248void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
1249 bool &appending, bool isRWL)
1250{
1251 const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
1252 : MAX_WRITES_PER_SYNC_POINT;
1253 {
1254 std::lock_guard locker(m_lock);
1255 if (!appending && m_appending) {
1256 /* Another thread is appending */
1257 ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
1258 return;
1259 }
1260 if (m_ops_to_append.size()) {
1261 appending = true;
1262 m_appending = true;
1263 auto last_in_batch = m_ops_to_append.begin();
1264 unsigned int ops_to_append = m_ops_to_append.size();
1265 if (ops_to_append > OPS_APPENDED) {
1266 ops_to_append = OPS_APPENDED;
1267 }
1268 std::advance(last_in_batch, ops_to_append);
1269 ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
1270 ops_remain = true; /* Always check again before leaving */
1271 ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
1272 << m_ops_to_append.size() << dendl;
1273 } else if (isRWL) {
1274 ops_remain = false;
1275 if (appending) {
1276 appending = false;
1277 m_appending = false;
1278 }
1279 }
1280 }
1281}
1282
1283template <typename I>
void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
1285{
1286 GenericLogOperations to_append(ops.begin(), ops.end());
1287
  schedule_append_ops(to_append, req);
1289}
1290
1291template <typename I>
void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
1293{
1294 GenericLogOperations to_append { op };
1295
  schedule_append_ops(to_append, req);
1297}
1298
1299/*
1300 * Complete a set of write ops with the result of append_op_entries.
1301 */
1302template <typename I>
1303void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
1304 const int result)
1305{
1306 GenericLogEntries dirty_entries;
1307 int published_reserves = 0;
1308 ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
1309 for (auto &op : ops) {
1310 utime_t now = ceph_clock_now();
1311 auto log_entry = op->get_log_entry();
1312 log_entry->completed = true;
1313 if (op->is_writing_op()) {
1314 op->mark_log_entry_completed();
1315 dirty_entries.push_back(log_entry);
1316 }
1317 if (log_entry->is_write_entry()) {
1318 release_ram(log_entry);
1319 }
1320 if (op->reserved_allocated()) {
1321 published_reserves++;
1322 }
1323 {
1324 std::lock_guard locker(m_lock);
1325 m_unpublished_reserves -= published_reserves;
1326 m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
1327 if (m_cache_state->clean && !this->m_dirty_log_entries.empty()) {
1328 m_cache_state->clean = false;
1329 update_image_cache_state();
1330 }
1331 }
1332 op->complete(result);
1333 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
                        op->log_append_start_time - op->dispatch_time);
1335 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
1336 m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
1337 utime_t(now - op->dispatch_time).to_nsec(),
1338 log_entry->ram_entry.write_bytes);
    utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
1340 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
1341 m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
1342 log_entry->ram_entry.write_bytes);
    m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
1344 }
1345 // New entries may be flushable
1346 {
1347 std::lock_guard locker(m_lock);
1348 wake_up();
1349 }
1350}
1351
1352/**
1353 * Dispatch as many deferred writes as possible
1354 */
1355template <typename I>
1356void AbstractWriteLog<I>::dispatch_deferred_writes(void)
1357{
1358 C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
1359 C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
1360 bool allocated = false; /* front_req allocate succeeded */
1361 bool cleared_dispatching_flag = false;
1362
1363 /* If we can't become the dispatcher, we'll exit */
1364 {
1365 std::lock_guard locker(m_lock);
1366 if (m_dispatching_deferred_ops ||
1367 !m_deferred_ios.size()) {
1368 return;
1369 }
1370 m_dispatching_deferred_ops = true;
1371 }
1372
1373 /* There are ops to dispatch, and this should be the only thread dispatching them */
1374 {
1375 std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
1376 do {
1377 {
1378 std::lock_guard locker(m_lock);
1379 ceph_assert(m_dispatching_deferred_ops);
1380 if (allocated) {
1381 /* On the 2nd..(n-1)th time we get the lock, front_req->alloc_resources() will
1382 * have succeeded, and we'll need to pop it off the deferred ops list
1383 * here. */
1384 ceph_assert(front_req);
1385 ceph_assert(!allocated_req);
1386 m_deferred_ios.pop_front();
1387 allocated_req = front_req;
1388 front_req = nullptr;
1389 allocated = false;
1390 }
1391 ceph_assert(!allocated);
1392 if (!allocated && front_req) {
1393 /* front_req->alloc_resources() failed on the last iteration.
1394 * We'll stop dispatching. */
1395 wake_up();
1396 front_req = nullptr;
1397 ceph_assert(!cleared_dispatching_flag);
1398 m_dispatching_deferred_ops = false;
1399 cleared_dispatching_flag = true;
1400 } else {
1401 ceph_assert(!front_req);
1402 if (m_deferred_ios.size()) {
1403 /* New allocation candidate */
1404 front_req = m_deferred_ios.front();
1405 } else {
1406 ceph_assert(!cleared_dispatching_flag);
1407 m_dispatching_deferred_ops = false;
1408 cleared_dispatching_flag = true;
1409 }
1410 }
1411 }
1412 /* Try allocating for front_req before we decide what to do with allocated_req
1413 * (if any) */
1414 if (front_req) {
1415 ceph_assert(!cleared_dispatching_flag);
1416 allocated = front_req->alloc_resources();
1417 }
1418 if (allocated_req && front_req && allocated) {
1419 /* Push dispatch of the first allocated req to a wq */
1420 m_work_queue.queue(new LambdaContext(
1421 [allocated_req](int r) {
1422 allocated_req->dispatch();
1423 }), 0);
1424 allocated_req = nullptr;
1425 }
1426 ceph_assert(!(allocated_req && front_req && allocated));
1427
1428 /* Continue while we're still considering the front of the deferred ops list */
1429 } while (front_req);
1430 ceph_assert(!allocated);
1431 }
1432 ceph_assert(cleared_dispatching_flag);
1433
1434 /* If any deferred requests were allocated, the last one will still be in allocated_req */
1435 if (allocated_req) {
1436 allocated_req->dispatch();
1437 }
1438}
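/*
 * A condensed sketch of the loop above, with locking and asserts omitted
 * (not a drop-in replacement):
 *
 *   become the sole dispatcher (m_dispatching_deferred_ops) or return;
 *   do {
 *     // under m_lock:
 *     if (the previous alloc_resources() succeeded)
 *       pop that req off m_deferred_ios into allocated_req;
 *     if (the previous alloc_resources() failed)        // front_req still set
 *       stop dispatching (clear the flag, wake_up());
 *     else
 *       front_req = m_deferred_ios.front(), or stop if the list is empty;
 *     // outside m_lock:
 *     if (front_req) allocated = front_req->alloc_resources();
 *     if (allocated_req && front_req && allocated)
 *       m_work_queue.queue(... allocated_req->dispatch() ...);
 *   } while (front_req);
 *   if (allocated_req) allocated_req->dispatch();       // last one runs inline
 */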
1439
1440/**
1441 * Returns the lanes used by this write, and attempts to dispatch the next
1442 * deferred write
1443 */
1444template <typename I>
1445void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
1446{
1447 {
1448 std::lock_guard locker(m_lock);
1449 m_free_lanes += req->image_extents.size();
1450 }
1451 dispatch_deferred_writes();
1452}
1453
1454/**
1455 * Attempts to allocate log resources for a write. Write is dispatched if
1456 * resources are available, or queued if they aren't.
1457 */
1458template <typename I>
1459void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
1460{
1461 bool dispatch_here = false;
1462
1463 {
1464 /* If there are already deferred writes, queue behind them for resources */
1465 {
1466 std::lock_guard locker(m_lock);
1467 dispatch_here = m_deferred_ios.empty();
1468 // Only a flush req can have total_bytes == the max uint64, so this identifies an internal flush req
1469 if (req->image_extents_summary.total_bytes ==
1470 std::numeric_limits<uint64_t>::max() &&
1471 static_cast<C_FlushRequestT *>(req)->internal == true) {
1472 dispatch_here = true;
1473 }
1474 }
1475 if (dispatch_here) {
1476 dispatch_here = req->alloc_resources();
1477 }
1478 if (dispatch_here) {
1479 ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
1480 req->dispatch();
1481 } else {
1482 req->deferred();
1483 {
1484 std::lock_guard locker(m_lock);
1485 m_deferred_ios.push_back(req);
1486 }
1487 ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
1488 dispatch_deferred_writes();
1489 }
1490 }
1491}
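/*
 * In brief: a request dispatches immediately only when nothing is already
 * deferred (or it is an internal flush request, recognized above by
 * total_bytes == std::numeric_limits<uint64_t>::max()) and alloc_resources()
 * succeeds; otherwise it is marked deferred, appended to m_deferred_ios and
 * dispatch_deferred_writes() is kicked to drain the queue in order.
 */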
1492
1493template <typename I>
1494bool AbstractWriteLog<I>::check_allocation(
1495 C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
1496 uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
1497 uint32_t num_unpublished_reserves) {
1498 bool alloc_succeeds = true;
1499 bool no_space = false;
1500 {
1501 std::lock_guard locker(m_lock);
1502 if (m_free_lanes < num_lanes) {
1503 ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
1504 << num_lanes
1505 << ", have " << m_free_lanes << ") "
1506 << *req << dendl;
1507 alloc_succeeds = false;
1508 /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
1509 }
1510 if (m_free_log_entries < num_log_entries) {
1511 ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
1512 << num_log_entries
1513 << ", have " << m_free_log_entries << ") "
1514 << *req << dendl;
1515 alloc_succeeds = false;
1516 no_space = true; /* Entries must be retired */
1517 }
1518 /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
1519 if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
1520 ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
1521 << m_bytes_allocated_cap
1522 << ", allocated=" << m_bytes_allocated
1523 << ") in write [" << *req << "]" << dendl;
1524 alloc_succeeds = false;
1525 no_space = true; /* Entries must be retired */
1526 }
1527 }
1528
1529 if (alloc_succeeds) {
1530 reserve_cache(req, alloc_succeeds, no_space);
1531 }
1532
1533 if (alloc_succeeds) {
1534 std::lock_guard locker(m_lock);
1535 /* We need one free log entry per extent (each is a separate entry), and
1536 * one free "lane" for remote replication. */
1537 if ((m_free_lanes >= num_lanes) &&
1538 (m_free_log_entries >= num_log_entries) &&
1539 (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
1540 m_free_lanes -= num_lanes;
1541 m_free_log_entries -= num_log_entries;
1542 m_unpublished_reserves += num_unpublished_reserves;
1543 m_bytes_allocated += bytes_allocated;
1544 m_bytes_cached += bytes_cached;
1545 m_bytes_dirty += bytes_dirtied;
1546 } else {
1547 alloc_succeeds = false;
1548 }
1549 }
1550
1551 if (!alloc_succeeds && no_space) {
1552 /* Expedite flushing and/or retiring */
1553 std::lock_guard locker(m_lock);
1554 m_alloc_failed_since_retire = true;
1555 m_last_alloc_fail = ceph_clock_now();
1556 }
1557
1558 return alloc_succeeds;
1559}
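/*
 * Illustrative example (numbers are made up): a write needing num_lanes=2,
 * num_log_entries=2 and bytes_allocated=8MiB succeeds only if
 * m_free_lanes >= 2, m_free_log_entries >= 2 and
 * m_bytes_allocated + 8MiB <= m_bytes_allocated_cap, all re-checked under
 * m_lock before the counters are charged. A lane shortfall merely defers the
 * request (throttling); a shortfall of log entries or of the byte cap also
 * sets m_alloc_failed_since_retire so flushing/retiring is expedited.
 */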
1560
1561template <typename I>
1562C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
1563 utime_t flush_begins = ceph_clock_now();
1564 bufferlist bl;
1565 auto *flush_req =
1566 new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
1567 std::move(bl), 0, m_lock, m_perfcounter, on_finish);
1568
1569 return flush_req;
1570}
1571
1572template <typename I>
1573void AbstractWriteLog<I>::wake_up() {
1574 CephContext *cct = m_image_ctx.cct;
1575 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1576
1577 if (!m_wake_up_enabled) {
1578 // wake_up is disabled during shutdown after flushing completes
1579 ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
1580 return;
1581 }
1582
1583 if (m_wake_up_requested && m_wake_up_scheduled) {
1584 return;
1585 }
1586
1587 ldout(cct, 20) << dendl;
1588
1589 /* Wake-up can be requested while it's already scheduled */
1590 m_wake_up_requested = true;
1591
1592 /* Wake-up cannot be scheduled if it's already scheduled */
1593 if (m_wake_up_scheduled) {
1594 return;
1595 }
1596 m_wake_up_scheduled = true;
1597 m_async_process_work++;
1598 m_async_op_tracker.start_op();
1599 m_work_queue.queue(new LambdaContext(
1600 [this](int r) {
1601 process_work();
1602 m_async_op_tracker.finish_op();
1603 m_async_process_work--;
1604 }), 0);
1605}
1606
1607template <typename I>
1608bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
1609 CephContext *cct = m_image_ctx.cct;
1610
1611 ldout(cct, 20) << "" << dendl;
1612 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1613
1614 if (m_invalidating) {
1615 return true;
1616 }
1617
1618 /* For OWB we can flush entries with the same sync gen number (writes between
1619 * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
1620 * its sync gen number is <= the lowest sync gen number carried by all the
1621 * entries currently flushing.
1622 *
1623 * If the entry considered here bears a sync gen number lower than a
1624 * previously flushed entry, the application had to have submitted the write
1625 * bearing the higher gen number before the write with the lower gen number
1626 * completed. So, flushing these concurrently is OK.
1627 *
1628 * If the entry considered here bears a sync gen number higher than a
1629 * currently flushing entry, the write with the lower gen number may have
1630 * completed to the application before the write with the higher sync gen
1631 * number was submitted, and the application may rely on that completion
1632 * order for volume consistency. In this case the entry will not be
1633 * considered flushable until all the entries bearing lower sync gen numbers
1634 * finish flushing.
1635 */
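 /* Illustrative example (gen numbers are made up): with writebacks for sync
  * gen 7 in flight (m_lowest_flushing_sync_gen == 7), entries with sync gen
  * 7 or lower are still eligible (subject to the in-flight limits below),
  * but an entry with sync gen 8 is held back until those earlier writebacks
  * drain. */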
1636
1637 if (m_flush_ops_in_flight &&
1638 (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
1639 return false;
1640 }
1641
1642 return (log_entry->can_writeback() &&
1643 (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
1644 (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
1645}
1646
1647template <typename I>
1648void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
1649 GuardedRequestFunctionContext *guarded_ctx) {
1650 ldout(m_image_ctx.cct, 20) << dendl;
1651
1652 BlockExtent extent;
1653 if (log_entry->is_sync_point()) {
1654 extent = block_extent(whole_volume_extent());
1655 } else {
1656 extent = log_entry->ram_entry.block_extent();
1657 }
1658
1659 auto req = GuardedRequest(extent, guarded_ctx, false);
1660 BlockGuardCell *cell = nullptr;
1661
1662 {
1663 std::lock_guard locker(m_flush_guard_lock);
1664 m_flush_guard.detain(req.block_extent, &req, &cell);
1665 }
1666 if (cell) {
1667 req.guard_ctx->cell = cell;
1668 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1669 }
1670}
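/*
 * Block-guard behaviour relied on here: detain() hands back a cell
 * immediately when the extent does not overlap an already-detained request,
 * in which case the guarded context is queued right away; otherwise the
 * request is parked in m_flush_guard and its context is queued later, when
 * the overlapping cell is released in construct_flush_entry() below.
 */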
1671
1672template <typename I>
1673Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
1674 bool invalidating) {
1675 ldout(m_image_ctx.cct, 20) << "" << dendl;
1676
1677 /* Flush write completion action */
1678 utime_t writeback_start_time = ceph_clock_now();
1679 Context *ctx = new LambdaContext(
1680 [this, log_entry, writeback_start_time, invalidating](int r) {
1681 utime_t writeback_comp_time = ceph_clock_now();
1682 m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
1683 writeback_comp_time - writeback_start_time);
1684 {
1685 std::lock_guard locker(m_lock);
1686 if (r < 0) {
1687 lderr(m_image_ctx.cct) << "failed to flush log entry: "
1688 << cpp_strerror(r) << dendl;
1689 m_dirty_log_entries.push_front(log_entry);
1690 } else {
1691 ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
1692 log_entry->set_flushed(true);
1693 m_bytes_dirty -= log_entry->bytes_dirty();
1694 sync_point_writer_flushed(log_entry->get_sync_point_entry());
1695 ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
1696 << " invalidating=" << invalidating
1697 << dendl;
1698 }
1699 m_flush_ops_in_flight -= 1;
1700 m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
1701 wake_up();
1702 }
1703 });
1704 /* Flush through lower cache before completing */
1705 ctx = new LambdaContext(
1706 [this, ctx, log_entry](int r) {
1707 {
1708
1709 WriteLogGuard::BlockOperations block_reqs;
1710 BlockGuardCell *detained_cell = nullptr;
1711
1712 std::lock_guard locker{m_flush_guard_lock};
1713 m_flush_guard.release(log_entry->m_cell, &block_reqs);
1714
1715 for (auto &req : block_reqs) {
1716 m_flush_guard.detain(req.block_extent, &req, &detained_cell);
1717 if (detained_cell) {
1718 req.guard_ctx->cell = detained_cell;
1719 m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
1720 }
1721 }
1722 }
1723
1724 if (r < 0) {
1725 lderr(m_image_ctx.cct) << "failed to flush log entry: "
1726 << cpp_strerror(r) << dendl;
1727 ctx->complete(r);
1728 } else {
1729 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
1730 }
1731 });
1732 return ctx;
1733}
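/*
 * Note on the chaining above: the contexts are built innermost-first, so at
 * run time the returned context first releases this entry's flush-guard cell
 * (re-detaining and re-queueing any blocked requests), then issues
 * m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ...) unless an
 * error short-circuits it, and only then runs the completion action that
 * updates m_bytes_dirty, the sync point accounting and the in-flight flush
 * counters.
 */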
1734
1735template <typename I>
1736void AbstractWriteLog<I>::process_writeback_dirty_entries() {
1737 CephContext *cct = m_image_ctx.cct;
1738 bool all_clean = false;
1739 int flushed = 0;
1740 bool has_write_entry = false;
1741
1742 ldout(cct, 20) << "Look for dirty entries" << dendl;
1743 {
1744 DeferredContexts post_unlock;
1745 GenericLogEntries entries_to_flush;
1746
1747 std::shared_lock entry_reader_locker(m_entry_reader_lock);
1748 std::lock_guard locker(m_lock);
1749 while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
1750 if (m_shutting_down) {
1751 ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
1752 /* Do flush complete only when all flush ops are finished */
1753 all_clean = !m_flush_ops_in_flight;
1754 break;
1755 }
1756 if (m_dirty_log_entries.empty()) {
1757 ldout(cct, 20) << "Nothing new to flush" << dendl;
1758 /* Do flush complete only when all flush ops are finished */
1759 all_clean = !m_flush_ops_in_flight;
1760 if (!m_cache_state->clean && all_clean) {
1761 m_cache_state->clean = true;
1762 update_image_cache_state();
1763 }
1764 break;
1765 }
1766
1767 auto candidate = m_dirty_log_entries.front();
1768 bool flushable = can_flush_entry(candidate);
1769 if (flushable) {
1770 entries_to_flush.push_back(candidate);
1771 flushed++;
1772 if (!has_write_entry)
1773 has_write_entry = candidate->is_write_entry();
1774 m_dirty_log_entries.pop_front();
1775
1776 // Account for this candidate now: bump m_flush_ops_in_flight (and the byte/sync-gen tracking) here
1777 {
1778 if (!m_flush_ops_in_flight ||
1779 (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
1780 m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
1781 }
1782 m_flush_ops_in_flight += 1;
1783 /* For write same this is the bytes affected by the flush op, not the bytes transferred */
1784 m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
1785 }
1786 } else {
1787 ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
1788 break;
1789 }
1790 }
1791
1792 construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
1793 }
1794
1795 if (all_clean) {
1796 /* All flushing complete, drain outside lock */
1797 Contexts flush_contexts;
1798 {
1799 std::lock_guard locker(m_lock);
1800 flush_contexts.swap(m_flush_complete_contexts);
1801 }
1802 finish_contexts(m_image_ctx.cct, flush_contexts, 0);
1803 }
1804}
1805
1806/* Returns true if the specified SyncPointLogEntry is considered flushed, and
1807 * the log will be updated to reflect this. */
1808template <typename I>
1809bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
1810{
1811 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1812 ceph_assert(log_entry);
1813
1814 if ((log_entry->writes_flushed == log_entry->writes) &&
1815 log_entry->completed && log_entry->prior_sync_point_flushed &&
1816 log_entry->next_sync_point_entry) {
1817 ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
1818 << *log_entry << dendl;
1819 log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
1820 /* Don't move the flushed sync gen num backwards. */
1821 if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
1822 m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
1823 }
1824 m_async_op_tracker.start_op();
1825 m_work_queue.queue(new LambdaContext(
1826 [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
1827 bool handled_by_next;
1828 {
1829 std::lock_guard locker(m_lock);
1830 handled_by_next = handle_flushed_sync_point(std::move(next));
1831 }
1832 if (!handled_by_next) {
1833 persist_last_flushed_sync_gen();
1834 }
1835 m_async_op_tracker.finish_op();
1836 }));
1837 return true;
1838 }
1839 return false;
1840}
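/*
 * In effect this walks the sync point chain forward: once sync point N is
 * fully flushed, N+1 is marked prior_sync_point_flushed and re-evaluated on
 * the work queue; the walk stops at the first sync point that is not yet
 * fully flushed, and persist_last_flushed_sync_gen() then records the
 * highest m_flushed_sync_gen reached.
 */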
1841
1842template <typename I>
1843void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
1844{
1845 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1846 ceph_assert(log_entry);
1847 log_entry->writes_flushed++;
1848
1849 /* If this entry might be completely flushed, look closer */
1850 if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
1851 ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
1852 << *log_entry << dendl;
1853 handle_flushed_sync_point(log_entry);
1854 }
1855}
1856
1857/* Make a new sync point and flush the previous during initialization, when there may or may
1858 * not be a previous sync point */
1859template <typename I>
1860void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
1861 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1862 ceph_assert(!m_initialized); /* Don't use this after init */
1863
1864 if (!m_current_sync_point) {
1865 /* First sync point since start */
1866 new_sync_point(later);
1867 } else {
1868 flush_new_sync_point(nullptr, later);
1869 }
1870}
1871
1872/**
1873 * Begin a new sync point
1874 */
1875template <typename I>
1876void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
1877 CephContext *cct = m_image_ctx.cct;
1878 std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
1879 std::shared_ptr<SyncPoint> new_sync_point;
1880 ldout(cct, 20) << dendl;
1881
1882 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1883
1884 /* The first time this is called, if this is a newly created log,
1885 * this makes the first sync gen number we'll use 1. On the first
1886 * call for a re-opened log m_current_sync_gen will be the highest
1887 * gen number from all the sync point entries found in the re-opened
1888 * log, and this advances to the next sync gen number. */
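  /* For example (values illustrative): a brand new log starts with
   * m_current_sync_gen == 0, so the first sync point gets gen 1; a log
   * re-opened with a highest on-media sync gen of 41 resumes at gen 42. */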
1889 ++m_current_sync_gen;
1890
1891 new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
1892 m_current_sync_point = new_sync_point;
1893
1894 /* If this log has been re-opened, old_sync_point will initially be
1895 * nullptr, but m_current_sync_gen may not be zero. */
1896 if (old_sync_point) {
1897 new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
1898 m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
1899 old_sync_point->log_entry->writes,
1900 old_sync_point->log_entry->bytes);
1901 /* This sync point will acquire no more sub-ops. Activation needs
1902 * to acquire m_lock, so defer to later*/
1903 later.add(new LambdaContext(
1904 [old_sync_point](int r) {
1905 old_sync_point->prior_persisted_gather_activate();
1906 }));
1907 }
1908
1909 new_sync_point->prior_persisted_gather_set_finisher();
1910
1911 if (old_sync_point) {
1912 ldout(cct,6) << "new sync point = [" << *m_current_sync_point
1913 << "], prior = [" << *old_sync_point << "]" << dendl;
1914 } else {
1915 ldout(cct,6) << "first sync point = [" << *m_current_sync_point
1916 << "]" << dendl;
1917 }
1918}
1919
1920template <typename I>
1921void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
1922 DeferredContexts &later) {
1923 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1924
1925 if (!flush_req) {
1926 m_async_null_flush_finish++;
1927 m_async_op_tracker.start_op();
1928 Context *flush_ctx = new LambdaContext([this](int r) {
1929 m_async_null_flush_finish--;
1930 m_async_op_tracker.finish_op();
1931 });
1932 flush_req = make_flush_req(flush_ctx);
1933 flush_req->internal = true;
1934 }
1935
1936 /* Add a new sync point. */
1937 new_sync_point(later);
1938 std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
1939 ceph_assert(to_append);
1940
1941 /* This flush request will append/persist the (now) previous sync point */
1942 flush_req->to_append = to_append;
1943
1944 /* When the m_sync_point_persist Gather completes this sync point can be
1945 * appended. The only sub for this Gather is the finisher Context for
1946 * m_prior_log_entries_persisted, which records the result of the Gather in
1947 * the sync point, and completes. TODO: Do we still need both of these
1948 * Gathers? */
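  /* Sketch of the gather hand-off used here (method names as used on
   * SyncPoint in this file; the Gather fires its finisher only after it has
   * been activated and all of its subs have completed):
   *
   *   to_append->persist_gather_set_finisher(ctx);  // ctx: "ready to persist"
   *   ...
   *   later.add(new LambdaContext([to_append](int r) {
   *     to_append->persist_gather_activate();       // after m_lock is dropped
   *   }));
   */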
1949 Context * ctx = new LambdaContext([this, flush_req](int r) {
1950 ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
1951 << " sync point =" << flush_req->to_append
1952 << ". Ready to persist." << dendl;
1953 alloc_and_dispatch_io_req(flush_req);
1954 });
1955 to_append->persist_gather_set_finisher(ctx);
1956
1957 /* The m_sync_point_persist Gather has all the subs it will ever have, and
1958 * now has its finisher. If the sub is already complete, activation will
1959 * complete the Gather. The finisher will acquire m_lock, so we'll activate
1960 * this when we release m_lock.*/
1961 later.add(new LambdaContext([to_append](int r) {
1962 to_append->persist_gather_activate();
1963 }));
1964
1965 /* The flush request completes when the sync point persists */
1966 to_append->add_in_on_persisted_ctxs(flush_req);
1967}
1968
1969template <typename I>
1970void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
1971 DeferredContexts &later) {
1972 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1973
1974 /* If there have been writes since the last sync point ... */
1975 if (m_current_sync_point->log_entry->writes) {
1976 flush_new_sync_point(flush_req, later);
1977 } else {
1978 /* There have been no writes to the current sync point. */
1979 if (m_current_sync_point->earlier_sync_point) {
1980 /* If previous sync point hasn't completed, complete this flush
1981 * with the earlier sync point. No alloc or dispatch needed. */
1982 m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
1983 } else {
1984 /* The previous sync point has already completed and been
1985 * appended. The current sync point has no writes, so this flush
1986 * has nothing to wait for. This flush completes now. */
1987 later.add(flush_req);
1988 }
1989 }
1990}
1991
1992/*
1993 * RWL internal flush - will actually flush the RWL.
1994 *
1995 * User flushes should arrive at aio_flush(), and only flush prior
1996 * writes to all log replicas.
1997 *
1998 * Librbd internal flushes will arrive at flush(invalidate=false,
1999 * discard=false), and traverse the block guard to ensure in-flight writes are
2000 * flushed.
2001 */
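/*
 * Put differently: aio_flush() only orders/persists prior writes within the
 * log itself, while internal_flush() below writes dirty log entries back to
 * the image (and, with invalidate=true, then retires every remaining entry),
 * so it is the path used when the cache contents themselves must be written
 * back or discarded.
 */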
2002template <typename I>
2003void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
2004 CephContext *cct = m_image_ctx.cct;
2005 bool all_clean;
2006 bool flushing;
2007 bool stop_flushing;
2008
2009 {
2010 std::lock_guard locker(m_lock);
2011 flushing = (0 != m_flush_ops_in_flight);
2012 all_clean = m_dirty_log_entries.empty();
2013 if (!m_cache_state->clean && all_clean && !flushing) {
2014 m_cache_state->clean = true;
2015 update_image_cache_state();
2016 }
2017 stop_flushing = (m_shutting_down);
2018 }
2019
2020 if (!flushing && (all_clean || stop_flushing)) {
2021 /* Complete without holding m_lock */
2022 if (all_clean) {
2023 ldout(cct, 20) << "no dirty entries" << dendl;
2024 } else {
2025 ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
2026 }
2027 on_finish->complete(0);
2028 } else {
2029 if (all_clean) {
2030 ldout(cct, 5) << "flush ops still in progress" << dendl;
2031 } else {
2032 ldout(cct, 20) << "dirty entries remain" << dendl;
2033 }
2034 std::lock_guard locker(m_lock);
2035 /* on_finish can't be completed yet */
2036 m_flush_complete_contexts.push_back(new LambdaContext(
2037 [this, on_finish](int r) {
2038 flush_dirty_entries(on_finish);
2039 }));
2040 wake_up();
2041 }
2042}
2043
2044template <typename I>
2045void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
2046 ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
2047
2048 if (m_perfcounter) {
2049 if (invalidate) {
2050 m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
2051 } else {
2052 m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
2053 }
2054 }
2055
2056 /* May be called even if initialization fails */
2057 if (!m_initialized) {
2058 ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
2059 /* Deadlock if completed here */
2060 m_image_ctx.op_work_queue->queue(on_finish, 0);
2061 return;
2062 }
2063
2064 /* Flush/invalidate must pass through block guard to ensure all layers of
2065 * cache are consistently flushed/invalidated. This ensures no in-flight write leaves
2066 * some layers with valid regions, which may later produce inconsistent read
2067 * results. */
2068 GuardedRequestFunctionContext *guarded_ctx =
2069 new GuardedRequestFunctionContext(
2070 [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
2071 DeferredContexts on_exit;
2072 ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
2073 ceph_assert(guard_ctx.cell);
2074
2075 Context *ctx = new LambdaContext(
2076 [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
2077 std::lock_guard locker(m_lock);
2078 m_invalidating = false;
2079 ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
2080 << invalidate << ")" << dendl;
2081 if (m_log_entries.size()) {
2082 ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
2083 << m_log_entries.size()
2084 << ", front()=" << *m_log_entries.front()
2085 << dendl;
2086 }
2087 if (invalidate) {
2088 ceph_assert(m_log_entries.size() == 0);
2089 }
2090 ceph_assert(m_dirty_log_entries.size() == 0);
2091 m_image_ctx.op_work_queue->queue(on_finish, r);
2092 release_guarded_request(cell);
2093 });
2094 ctx = new LambdaContext(
2095 [this, ctx, invalidate](int r) {
2096 Context *next_ctx = ctx;
2097 ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
2098 if (r < 0) {
2099 /* Override on_finish status with this error */
2100 next_ctx = new LambdaContext([r, ctx](int _r) {
2101 ctx->complete(r);
2102 });
2103 }
2104 if (invalidate) {
2105 {
2106 std::lock_guard locker(m_lock);
2107 ceph_assert(m_dirty_log_entries.size() == 0);
2108 ceph_assert(!m_invalidating);
2109 ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
2110 m_invalidating = true;
2111 }
2112 /* Discards all RWL entries */
2113 while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
2114 next_ctx->complete(0);
2115 } else {
2116 {
2117 std::lock_guard locker(m_lock);
2118 ceph_assert(m_dirty_log_entries.size() == 0);
2119 ceph_assert(!m_invalidating);
2120 }
2121 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
2122 }
2123 });
2124 ctx = new LambdaContext(
2125 [this, ctx](int r) {
2126 flush_dirty_entries(ctx);
2127 });
2128 std::lock_guard locker(m_lock);
2129 /* Even if we're throwing everything away, we want the last entry to
2130 * be a sync point so we can cleanly resume.
2131 *
2132 * Also, the blockguard only guarantees the replication of this op
2133 * can't overlap with prior ops. It doesn't guarantee those are all
2134 * completed and eligible for flush & retire, which we require here.
2135 */
2136 auto flush_req = make_flush_req(ctx);
2137 flush_new_sync_point_if_needed(flush_req, on_exit);
2138 });
2139 detain_guarded_request(nullptr, guarded_ctx, true);
2140}
2141
2142template <typename I>
2143void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
2144 C_BlockIORequestT *req) {
2145 req->copy_cache();
2146 m_blocks_to_log_entries.add_log_entries(log_entries);
2147}
2148
2149template <typename I>
2150bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
2151 CephContext *cct = m_image_ctx.cct;
2152
2153 ldout(cct, 20) << dendl;
2154 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
2155 ceph_assert(log_entry);
2156 return log_entry->can_retire();
2157}
2158
2159template <typename I>
2160void AbstractWriteLog<I>::check_image_cache_state_clean() {
2161 ceph_assert(m_deferred_ios.empty());
2162 ceph_assert(m_ops_to_append.empty());
2163 ceph_assert(m_async_flush_ops == 0);
2164 ceph_assert(m_async_append_ops == 0);
2165 ceph_assert(m_dirty_log_entries.empty());
2166 ceph_assert(m_ops_to_flush.empty());
2167 ceph_assert(m_flush_ops_in_flight == 0);
2168 ceph_assert(m_flush_bytes_in_flight == 0);
2169 ceph_assert(m_bytes_dirty == 0);
2170 ceph_assert(m_work_queue.empty());
2171}
2172
2173} // namespace pwl
2174} // namespace cache
2175} // namespace librbd
2176
2177template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;