// ceph.git: ceph/src/librbd/cache/pwl/AbstractWriteLog.cc (import ceph 16.2.7)
1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2// vim: ts=8 sw=2 smarttab
3
4#include "AbstractWriteLog.h"
5#include "include/buffer.h"
6#include "include/Context.h"
7#include "include/ceph_assert.h"
8#include "common/deleter.h"
9#include "common/dout.h"
10#include "common/environment.h"
11#include "common/errno.h"
12#include "common/hostname.h"
13#include "common/WorkQueue.h"
14#include "common/Timer.h"
15#include "common/perf_counters.h"
16#include "librbd/ImageCtx.h"
17#include "librbd/asio/ContextWQ.h"
18#include "librbd/cache/pwl/ImageCacheState.h"
19#include "librbd/cache/pwl/LogEntry.h"
20#include "librbd/plugin/Api.h"
21#include <map>
22#include <vector>
23
24#undef dout_subsys
25#define dout_subsys ceph_subsys_rbd_pwl
26#undef dout_prefix
27#define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
28 << " " << __func__ << ": "
29
30namespace librbd {
31namespace cache {
32namespace pwl {
33
34using namespace librbd::cache::pwl;
35
36typedef AbstractWriteLog<ImageCtx>::Extent Extent;
37typedef AbstractWriteLog<ImageCtx>::Extents Extents;
38
39template <typename I>
40AbstractWriteLog<I>::AbstractWriteLog(
41 I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
42 Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
43 plugin::Api<I>& plugin_api)
44 : m_builder(builder),
45 m_write_log_guard(image_ctx.cct),
46 m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
47 "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
48 m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
49 "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
50 m_thread_pool(
51 image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
52 "tp_pwl", 4, ""),
53 m_cache_state(cache_state),
54 m_image_ctx(image_ctx),
55 m_log_pool_size(DEFAULT_POOL_SIZE),
56 m_image_writeback(image_writeback),
57 m_plugin_api(plugin_api),
58 m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
59 "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
60 m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
61 m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
62 "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
63 m_lock(ceph::make_mutex(pwl::unique_lock_name(
64 "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
65 m_blocks_to_log_entries(image_ctx.cct),
66 m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
67 ceph::make_timespan(
68 image_ctx.config.template get_val<uint64_t>(
69 "rbd_op_thread_timeout")),
70 &m_thread_pool)
71{
72 CephContext *cct = m_image_ctx.cct;
73 m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
74}
75
76template <typename I>
77AbstractWriteLog<I>::~AbstractWriteLog() {
78 ldout(m_image_ctx.cct, 15) << "enter" << dendl;
79 {
80 std::lock_guard timer_locker(*m_timer_lock);
81 std::lock_guard locker(m_lock);
82 m_timer->cancel_event(m_timer_ctx);
83 m_thread_pool.stop();
84 ceph_assert(m_deferred_ios.size() == 0);
85 ceph_assert(m_ops_to_flush.size() == 0);
86 ceph_assert(m_ops_to_append.size() == 0);
87 ceph_assert(m_flush_ops_in_flight == 0);
88
89 delete m_cache_state;
90 m_cache_state = nullptr;
91 }
92 ldout(m_image_ctx.cct, 15) << "exit" << dendl;
93}
94
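/*
 * Register the perf counters for this cache instance. The axis
 * configurations below define the histogram buckets (scale, start,
 * quantization unit and bucket count) used by the latency and size
 * histograms registered further down; the counters are published under
 * the name passed in by init().
 */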
95template <typename I>
96void AbstractWriteLog<I>::perf_start(std::string name) {
97 PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
98 l_librbd_pwl_last);
99
100 // Latency axis configuration for op histograms, values are in nanoseconds
101 PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
102 "Latency (nsec)",
103 PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
104 0, ///< Start at 0
105 5000, ///< Quantization unit is 5usec
106 16, ///< Ranges into the mS
107 };
108
109 // Syncpoint logentry number x-axis configuration for op histograms
110 PerfHistogramCommon::axis_config_d sp_logentry_number_config{
111 "logentry number",
112 PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
113 0, // Start at 0
114 1, // Quantization unit is 1
115 260, // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
116 };
117
118 // Syncpoint bytes number y-axis configuration for op histogram
119 PerfHistogramCommon::axis_config_d sp_bytes_number_config{
120 "Number of SyncPoint",
121 PerfHistogramCommon::SCALE_LOG2, // Sync point bytes in logarithmic scale
122 0, // Start at 0
123 512, // Quantization unit is 512
124 17, // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
125 };
126
127 // Op size axis configuration for op histogram y axis, values are in bytes
128 PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
129 "Request size (bytes)",
130 PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
131 0, ///< Start at 0
132 512, ///< Quantization unit is 512 bytes
133 16, ///< Writes up to >32k
134 };
135
136 // Num items configuration for op histogram y axis, values are in items
137 PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
138 "Number of items",
139 PerfHistogramCommon::SCALE_LINEAR, ///< Item count in linear scale
140 0, ///< Start at 0
141 1, ///< Quantization unit is 1
142 32, ///< Up to 32 items
143 };
144
145 plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
146 plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
147 plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");
148
149 plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
150 plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
151 plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");
152
153 plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL");
154
155 plb.add_u64_counter_histogram(
156 l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
157 sp_logentry_number_config, sp_bytes_number_config,
158 "Histogram of syncpoint's logentry numbers vs bytes number");
159
160 plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
161 plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
162 plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
163 plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
164 plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
165 plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
166 plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
167 plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");
168
169 plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
170 plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");
171
172 plb.add_time_avg(
173 l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
174 "Average arrival to allocation time (time deferred for overlap)");
175 plb.add_time_avg(
176 l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
177 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
178 plb.add_time_avg(
179 l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
180 "Average allocation to dispatch time (time deferred for log resources)");
181 plb.add_time_avg(
182 l_librbd_pwl_wr_latency, "wr_latency",
183 "Latency of writes (persistent completion)");
184 plb.add_u64_counter_histogram(
185 l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
186 op_hist_x_axis_config, op_hist_y_axis_config,
187 "Histogram of write request latency (nanoseconds) vs. bytes written");
188 plb.add_time_avg(
189 l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
190 "Latency of write completion to caller");
191 plb.add_time_avg(
192 l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
193 "Average arrival to allocation time (time deferred for overlap)");
194 plb.add_time_avg(
195 l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
196 "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
197 plb.add_time_avg(
198 l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
199 "Average allocation to dispatch time (time deferred for log resources)");
200 plb.add_time_avg(
201 l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
202 "Latency of writes (persistent completion) not deferred for free space");
203 plb.add_u64_counter_histogram(
204 l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
205 op_hist_x_axis_config, op_hist_y_axis_config,
206 "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
207 plb.add_time_avg(
208 l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
209 "Latency of write completion to callerfor writes not deferred for free space");
210 plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
211 plb.add_u64_counter_histogram(
212 l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
213 op_hist_x_axis_config, op_hist_y_axis_config,
214 "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");
215 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
216 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
217 plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
218 plb.add_u64_counter_histogram(
219 l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
220 op_hist_x_axis_config, op_hist_y_axis_config,
221 "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");
222
223 plb.add_time_avg(
224 l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
225 "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
226 plb.add_time_avg(
227 l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
228 "Average buffer persist time (write data persist/replicate time)");
229 plb.add_u64_counter_histogram(
230 l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
231 op_hist_x_axis_config, op_hist_y_axis_config,
232 "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
233 plb.add_time_avg(
234 l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
235 "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
236 plb.add_time_avg(
237 l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
238 "Average log append to persist complete time (log entry append/replicate time)");
239 plb.add_u64_counter_histogram(
240 l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
241 op_hist_x_axis_config, op_hist_y_axis_config,
242 "Histogram of log append persist time (nanoseconds) (vs. op bytes)");
243
244 plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
245 plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
246 plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");
247
248 plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
249 plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
250 plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");
251
252 plb.add_u64_counter(l_librbd_pwl_ws, "ws", "Write Sames");
253 plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
254 plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");
255
256 plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
257 plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
258 plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
259 plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");
260
261 plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
262 plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
263 plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
264 plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");
265
266 plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
267 plb.add_u64_counter_histogram(
268 l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
269 op_hist_x_axis_config, op_hist_y_axis_count_config,
270 "Histogram of log append transaction time (nanoseconds) vs. entries appended");
271 plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
272 plb.add_u64_counter_histogram(
273 l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
274 op_hist_x_axis_config, op_hist_y_axis_count_config,
275 "Histogram of log retire transaction time (nanoseconds) vs. entries retired");
276
277 m_perfcounter = plb.create_perf_counters();
278 m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
279}
280
281template <typename I>
282void AbstractWriteLog<I>::perf_stop() {
283 ceph_assert(m_perfcounter);
284 m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
285 delete m_perfcounter;
286}
287
288template <typename I>
289void AbstractWriteLog<I>::log_perf() {
290 bufferlist bl;
291 Formatter *f = Formatter::create("json-pretty");
292 bl.append("Perf dump follows\n--- Begin perf dump ---\n");
293 bl.append("{\n");
294 stringstream ss;
295 utime_t now = ceph_clock_now();
296 ss << "\"test_time\": \"" << now << "\",";
297 ss << "\"image\": \"" << m_image_ctx.name << "\",";
298 bl.append(ss);
299 bl.append("\"stats\": ");
300 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0);
301 f->flush(bl);
302 bl.append(",\n\"histograms\": ");
303 m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
304 f->flush(bl);
305 delete f;
306 bl.append("}\n--- End perf dump ---\n");
307 bl.append('\0');
308 ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
309}
310
311template <typename I>
312void AbstractWriteLog<I>::periodic_stats() {
313 std::lock_guard locker(m_lock);
314 ldout(m_image_ctx.cct, 1) << "STATS: m_log_entries=" << m_log_entries.size()
315 << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
316 << ", m_free_log_entries=" << m_free_log_entries
317 << ", m_bytes_allocated=" << m_bytes_allocated
318 << ", m_bytes_cached=" << m_bytes_cached
319 << ", m_bytes_dirty=" << m_bytes_dirty
320 << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
321 << ", m_first_valid_entry=" << m_first_valid_entry
322 << ", m_first_free_entry=" << m_first_free_entry
323 << ", m_current_sync_gen=" << m_current_sync_gen
324 << ", m_flushed_sync_gen=" << m_flushed_sync_gen
325 << dendl;
326}
327
328template <typename I>
329void AbstractWriteLog<I>::arm_periodic_stats() {
330 ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
331 if (m_periodic_stats_enabled) {
332 m_timer_ctx = new LambdaContext(
333 [this](int r) {
334 /* m_timer_lock is held */
335 periodic_stats();
336 arm_periodic_stats();
337 });
338 m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
339 }
340}
341
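/*
 * Reconstruct an in-memory log entry from a persisted cache entry while
 * reloading an existing pool. Sync point entries update the current sync
 * generation; write, writesame and discard entries become the
 * corresponding GenericLogEntry subclasses. A writing entry whose sync
 * point has not been seen yet records that generation in
 * missing_sync_points so update_sync_points() can recreate it.
 */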
342template <typename I>
343void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
344 WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
345 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
346 uint64_t entry_index) {
347 bool writer = cache_entry->is_writer();
348 if (cache_entry->is_sync_point()) {
349 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
350 << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
351 auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
352 *log_entry = sync_point_entry;
353 sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
354 missing_sync_points.erase(cache_entry->sync_gen_number);
355 m_current_sync_gen = cache_entry->sync_gen_number;
356 } else if (cache_entry->is_write()) {
357 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
358 << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
359 auto write_entry =
360 m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
361 write_data_to_buffer(write_entry, cache_entry);
362 *log_entry = write_entry;
363 } else if (cache_entry->is_writesame()) {
364 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
365 << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
366 auto ws_entry =
367 m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
368 cache_entry->write_bytes, cache_entry->ws_datalen);
369 write_data_to_buffer(ws_entry, cache_entry);
370 *log_entry = ws_entry;
371 } else if (cache_entry->is_discard()) {
372 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
373 << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
374 auto discard_entry =
375 std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
376 m_discard_granularity_bytes);
377 *log_entry = discard_entry;
378 } else {
379 lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
380 << ", cache_entry=[" << *cache_entry << "]" << dendl;
381 }
382
383 if (writer) {
384 ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
385 << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
386 if (!sync_point_entries[cache_entry->sync_gen_number]) {
387 missing_sync_points[cache_entry->sync_gen_number] = true;
388 }
389 }
390}
391
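/*
 * Second pass over the reloaded log: recreate any sync points that were
 * retired from the log, link each write entry to its sync point, rebuild
 * the block-to-log-entry map, and recompute the dirty and flushed byte
 * accounting.
 */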
392template <typename I>
393void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
394 std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
395 DeferredContexts &later) {
396 /* Create missing sync points. These must not be appended until the
397 * entry reload is complete and the write map is up to
398 * date. Currently this is handled by the deferred contexts object
399 * passed to new_sync_point(). These contexts won't be completed
400 * until this function returns. */
401 for (auto &kv : missing_sync_points) {
402 ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
403 if (0 == m_current_sync_gen) {
404 /* The unlikely case where the log contains writing entries, but no sync
405 * points (e.g. because they were all retired) */
406 m_current_sync_gen = kv.first-1;
407 }
408 ceph_assert(kv.first == m_current_sync_gen+1);
409 init_flush_new_sync_point(later);
410 ceph_assert(kv.first == m_current_sync_gen);
411 sync_point_entries[kv.first] = m_current_sync_point->log_entry;
412 }
413
414 /*
415 * Iterate over the log entries again (this time via the global
416 * entries list), connecting write entries to their sync points and
417 * updating the sync point stats.
418 *
419 * Add writes to the write log map.
420 */
421 std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
422 for (auto &log_entry : m_log_entries) {
423 if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
424 /* This entry is one of the types that write */
425 auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
426 if (gen_write_entry) {
427 auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
428 if (!sync_point_entry) {
429 lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
430 ceph_assert(false);
431 } else {
432 gen_write_entry->sync_point_entry = sync_point_entry;
433 sync_point_entry->writes++;
434 sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
435 sync_point_entry->writes_completed++;
436 m_blocks_to_log_entries.add_log_entry(gen_write_entry);
437 /* This entry is only dirty if its sync gen number is > the flushed
438 * sync gen number from the root object. */
439 if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
440 m_dirty_log_entries.push_back(log_entry);
441 m_bytes_dirty += gen_write_entry->bytes_dirty();
442 } else {
443 gen_write_entry->set_flushed(true);
444 sync_point_entry->writes_flushed++;
445 }
446
447 /* calc m_bytes_allocated & m_bytes_cached */
448 inc_allocated_cached_bytes(log_entry);
449 }
450 }
451 } else {
452 /* This entry is a sync point entry */
453 auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
454 if (sync_point_entry) {
455 if (previous_sync_point_entry) {
456 previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
457 if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
458 sync_point_entry->prior_sync_point_flushed = false;
459 ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
460 (0 == previous_sync_point_entry->writes) ||
461 (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
462 } else {
463 sync_point_entry->prior_sync_point_flushed = true;
464 ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
465 ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
466 }
467 } else {
468 /* There are no previous sync points, so we'll consider them flushed */
469 sync_point_entry->prior_sync_point_flushed = true;
470 }
471 previous_sync_point_entry = sync_point_entry;
472 ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
473 }
474 }
475 }
476 if (0 == m_current_sync_gen) {
477 /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
478 * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
479 * point recorded in the log. */
480 m_current_sync_gen = m_flushed_sync_gen;
481 }
482}
483
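/*
 * Common initialization path. Derives the pool file path and size from
 * the image configuration, checks that the presence or absence of an
 * existing pool file matches the cache state recorded in the image
 * metadata, then calls initialize_pool(), which the concrete write log
 * implementation provides.
 */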
484template <typename I>
485void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
486 CephContext *cct = m_image_ctx.cct;
487 ldout(cct, 20) << dendl;
488 ceph_assert(m_cache_state);
489 std::lock_guard locker(m_lock);
490 ceph_assert(!m_initialized);
491 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
492
493 if (!m_cache_state->present) {
494 m_cache_state->host = ceph_get_short_hostname();
495 m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
496 "rbd_persistent_cache_size");
497
498 string path = m_image_ctx.config.template get_val<string>(
499 "rbd_persistent_cache_path");
500 std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
501 m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
502 }
503
504 ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
505 ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;
506
507 m_log_pool_name = m_cache_state->path;
508 m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
509 m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
510 ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
511 << " (adjusted from " << m_cache_state->size << ")" << dendl;
512
513 if ((!m_cache_state->present) &&
514 (access(m_log_pool_name.c_str(), F_OK) == 0)) {
515 ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
516 << ", While there's no cache in the image metatata." << dendl;
517 if (remove(m_log_pool_name.c_str()) != 0) {
518 lderr(cct) << "Failed to remove the pool file " << m_log_pool_name
519 << dendl;
520 on_finish->complete(-errno);
521 return;
522 } else {
523 ldout(cct, 5) << "Removed the existing pool file." << dendl;
524 }
525 } else if ((m_cache_state->present) &&
526 (access(m_log_pool_name.c_str(), F_OK) != 0)) {
527 ldout(cct, 5) << "Can't find the existed pool file " << m_log_pool_name << dendl;
528 on_finish->complete(-errno);
529 return;
530 }
531
532 bool succeeded = initialize_pool(on_finish, later);
533 if (!succeeded) {
534 return;
535 }
536
537 ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
538 << " log entries, " << m_free_log_entries << " of which are free."
539 << " first_valid=" << m_first_valid_entry
540 << ", first_free=" << m_first_free_entry
541 << ", flushed_sync_gen=" << m_flushed_sync_gen
542 << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
543 if (m_first_free_entry == m_first_valid_entry) {
544 ldout(cct,1) << "write log is empty" << dendl;
545 m_cache_state->empty = true;
546 }
547
548 /* Start the sync point following the last one seen in the
549 * log. Flush the last sync point created during the loading of the
550 * existing log entries. */
551 init_flush_new_sync_point(later);
552 ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;
553
554 m_initialized = true;
555 // Start the thread
556 m_thread_pool.start();
557
558 m_periodic_stats_enabled = m_cache_state->log_periodic_stats;
559 /* Do these after we drop lock */
560 later.add(new LambdaContext([this](int r) {
561 if (m_periodic_stats_enabled) {
562 /* Log stats for the first time */
563 periodic_stats();
564 /* Arm periodic stats logging for the first time */
565 std::lock_guard timer_locker(*m_timer_lock);
566 arm_periodic_stats();
567 }
568 }));
569 m_image_ctx.op_work_queue->queue(on_finish, 0);
570}
571
572template <typename I>
573void AbstractWriteLog<I>::update_image_cache_state(Context *on_finish) {
574 m_cache_state->write_image_cache_state(on_finish);
575}
576
577template <typename I>
578void AbstractWriteLog<I>::init(Context *on_finish) {
579 CephContext *cct = m_image_ctx.cct;
580 ldout(cct, 20) << dendl;
581 auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
582 std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
583 std::string("-") + m_image_ctx.name;
584 perf_start(pname);
585
586 ceph_assert(!m_initialized);
587
588 Context *ctx = new LambdaContext(
589 [this, on_finish](int r) {
590 if (r >= 0) {
591 update_image_cache_state(on_finish);
592 } else {
593 on_finish->complete(r);
594 }
595 });
596
597 DeferredContexts later;
598 pwl_init(ctx, later);
599}
600
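/*
 * Shutdown chains several LambdaContexts that execute in the reverse
 * order of their construction: complete in-flight writes
 * (internal_flush), flush dirty entries to the OSDs, wait for
 * outstanding async operations, then remove the pool file and persist
 * the final cache state.
 */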
601template <typename I>
602void AbstractWriteLog<I>::shut_down(Context *on_finish) {
603 CephContext *cct = m_image_ctx.cct;
604 ldout(cct, 20) << dendl;
605
606 ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;
607
608 Context *ctx = new LambdaContext(
609 [this, on_finish](int r) {
610 ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
611 m_image_ctx.op_work_queue->queue(on_finish, r);
612 });
613 ctx = new LambdaContext(
614 [this, ctx](int r) {
615 ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
616 Context *next_ctx = override_ctx(r, ctx);
617 bool periodic_stats_enabled = m_periodic_stats_enabled;
618 m_periodic_stats_enabled = false;
619
620 if (periodic_stats_enabled) {
621 /* Log stats one last time if they were enabled */
622 periodic_stats();
623 }
624 {
625 std::lock_guard locker(m_lock);
626 check_image_cache_state_clean();
627 m_wake_up_enabled = false;
628 m_cache_state->clean = true;
629 m_log_entries.clear();
630
631 remove_pool_file();
632
633 if (m_perfcounter) {
634 perf_stop();
635 }
636 }
637 update_image_cache_state(next_ctx);
638 });
639 ctx = new LambdaContext(
640 [this, ctx](int r) {
641 Context *next_ctx = override_ctx(r, ctx);
642 ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
643 // Wait for in progress IOs to complete
644 next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
645 m_async_op_tracker.wait_for_ops(next_ctx);
646 });
647 ctx = new LambdaContext(
648 [this, ctx](int r) {
649 Context *next_ctx = override_ctx(r, ctx);
650 {
651 /* Sync with process_writeback_dirty_entries() */
652 RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
653 m_shutting_down = true;
654 /* Flush all writes to OSDs (unless disabled) and wait for all
655 in-progress flush writes to complete */
656 ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
657 if (m_periodic_stats_enabled) {
658 periodic_stats();
659 }
660 }
661 flush_dirty_entries(next_ctx);
662 });
663 ctx = new LambdaContext(
664 [this, ctx](int r) {
665 ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
666 m_work_queue.queue(ctx, r);
667 });
668 /* Complete all in-flight writes before shutting down */
669 ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
670 internal_flush(false, ctx);
671}
672
673template <typename I>
674void AbstractWriteLog<I>::read(Extents&& image_extents,
675 ceph::bufferlist* bl,
676 int fadvise_flags, Context *on_finish) {
677 CephContext *cct = m_image_ctx.cct;
678 utime_t now = ceph_clock_now();
679
680 on_finish = new LambdaContext(
681 [this, on_finish](int r) {
682 m_async_op_tracker.finish_op();
683 on_finish->complete(r);
684 });
685 C_ReadRequest *read_ctx = m_builder->create_read_request(
686 cct, now, m_perfcounter, bl, on_finish);
687 ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
688 << "image_extents=" << image_extents << ", "
689 << "bl=" << bl << ", "
690 << "on_finish=" << on_finish << dendl;
691
692 ceph_assert(m_initialized);
693 bl->clear();
694 m_perfcounter->inc(l_librbd_pwl_rd_req, 1);
695
696 std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
697 std::vector<bufferlist*> bls_to_read;
698
699 m_async_op_tracker.start_op();
700 Context *ctx = new LambdaContext(
701 [this, read_ctx, fadvise_flags](int r) {
702 if (read_ctx->miss_extents.empty()) {
703 /* All of this read comes from RWL */
704 read_ctx->complete(0);
705 } else {
706 /* Pass the read misses on to the layer below RWL */
707 m_image_writeback.aio_read(
708 std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
709 fadvise_flags, read_ctx);
710 }
711 });
712
713 /*
714 * The strategy here is to look up all the WriteLogMapEntries that overlap
715 * this read, and iterate through those to separate this read into hits and
716 * misses. A new Extents object is produced here with Extents for each miss
717 * region. The miss Extents is then passed on to the read cache below RWL. We
718 * also produce an ImageExtentBufs for all the extents (hit or miss) in this
719 * read. When the read from the lower cache layer completes, we iterate
720 * through the ImageExtentBufs and insert buffers for each cache hit at the
721 * appropriate spot in the bufferlist returned from below for the miss
722 * read. The buffers we insert here refer directly to regions of various
723 * write log entry data buffers.
724 *
725 * Locking: These buffer objects hold a reference on the write log entries
726 * they refer to. Log entries can't be retired until there are no references.
727 * The GenericWriteLogEntry references are released by the buffer destructor.
728 */
729 for (auto &extent : image_extents) {
730 uint64_t extent_offset = 0;
731 RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
732 WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
733 block_extent(extent));
734 for (auto &map_entry : map_entries) {
735 Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
736 /* If this map entry starts after the current image extent offset ... */
737 if (entry_image_extent.first > extent.first + extent_offset) {
738 /* ... add range before map_entry to miss extents */
739 uint64_t miss_extent_start = extent.first + extent_offset;
740 uint64_t miss_extent_length = entry_image_extent.first -
741 miss_extent_start;
742 Extent miss_extent(miss_extent_start, miss_extent_length);
743 read_ctx->miss_extents.push_back(miss_extent);
744 /* Add miss range to read extents */
745 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
746 read_ctx->read_extents.push_back(miss_extent_buf);
747 extent_offset += miss_extent_length;
748 }
749 ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
750 uint64_t entry_offset = 0;
751 /* If this map entry starts before the current image extent offset ... */
752 if (entry_image_extent.first < extent.first + extent_offset) {
753 /* ... compute offset into log entry for this read extent */
754 entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
755 }
756 /* This read hit ends at the end of the extent or the end of the log
757 entry, whichever is less. */
758 uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
759 extent.second - extent_offset);
760 Extent hit_extent(entry_image_extent.first, entry_hit_length);
761 if (0 == map_entry.log_entry->write_bytes() &&
762 0 < map_entry.log_entry->bytes_dirty()) {
763 /* discard log entry */
764 ldout(cct, 20) << "discard log entry" << dendl;
765 auto discard_entry = map_entry.log_entry;
766 ldout(cct, 20) << "read hit on discard entry: log_entry="
767 << *discard_entry
768 << dendl;
769 /* Discards read as zero, so we'll construct a bufferlist of zeros */
770 bufferlist zero_bl;
771 zero_bl.append_zero(entry_hit_length);
772 /* Add hit extent to read extents */
773 auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
774 hit_extent, zero_bl);
775 read_ctx->read_extents.push_back(hit_extent_buf);
776 } else {
777 ldout(cct, 20) << "write or writesame log entry" << dendl;
778 /* write and writesame log entry */
779 /* Offset of the map entry into the log entry's buffer */
780 uint64_t map_entry_buffer_offset = entry_image_extent.first -
781 map_entry.log_entry->ram_entry.image_offset_bytes;
782 /* Offset into the log entry buffer of this read hit */
783 uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
784 /* Create buffer object referring to pmem pool for this read hit */
785 collect_read_extents(
786 read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
787 entry_hit_length, hit_extent, read_ctx);
788 }
789 /* Exclude RWL hit range from buffer and extent */
790 extent_offset += entry_hit_length;
791 ldout(cct, 20) << map_entry << dendl;
792 }
793 /* If the last map entry didn't consume the entire image extent ... */
794 if (extent.second > extent_offset) {
795 /* ... add the rest of this extent to miss extents */
796 uint64_t miss_extent_start = extent.first + extent_offset;
797 uint64_t miss_extent_length = extent.second - extent_offset;
798 Extent miss_extent(miss_extent_start, miss_extent_length);
799 read_ctx->miss_extents.push_back(miss_extent);
800 /* Add miss range to read extents */
801 auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
802 read_ctx->read_extents.push_back(miss_extent_buf);
803 extent_offset += miss_extent_length;
804 }
805 }
806
807 ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents << ", "
808 << "miss_bl=" << read_ctx->miss_bl << dendl;
809
810 complete_read(log_entries_to_read, bls_to_read, ctx);
811}
812
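/*
 * Write path: the request's extents are split (if a maximum extent size
 * is configured), wrapped in a C_WriteRequest, and detained in the block
 * guard. The guarded context runs once no overlapping request holds the
 * affected range, at which point resources are allocated and the request
 * is dispatched.
 */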
813template <typename I>
814void AbstractWriteLog<I>::write(Extents &&image_extents,
815 bufferlist&& bl,
816 int fadvise_flags,
817 Context *on_finish) {
818 CephContext *cct = m_image_ctx.cct;
819
820 ldout(cct, 20) << "aio_write" << dendl;
821
822 utime_t now = ceph_clock_now();
823 m_perfcounter->inc(l_librbd_pwl_wr_req, 1);
824
825 ceph_assert(m_initialized);
826
827 /* Split image extents because PMDK's space management is not perfect and
828 * fragmentation can occur. The larger the difference between block sizes,
829 * the more easily fragmentation arises, leaving free space that cannot be
830 * allocated in large chunks. We plan to manage pmem space and allocation
831 * ourselves in the future.
832 */
833 Extents split_image_extents;
834 uint64_t max_extent_size = get_max_extent();
835 if (max_extent_size != 0) {
836 for (auto extent : image_extents) {
837 if (extent.second > max_extent_size) {
838 uint64_t off = extent.first;
839 uint64_t extent_bytes = extent.second;
840 for (int i = 0; extent_bytes != 0; ++i) {
841 Extent _ext;
842 _ext.first = off + i * max_extent_size;
843 _ext.second = std::min(max_extent_size, extent_bytes);
844 extent_bytes = extent_bytes - _ext.second ;
845 split_image_extents.emplace_back(_ext);
846 }
847 } else {
848 split_image_extents.emplace_back(extent);
849 }
850 }
851 } else {
852 split_image_extents = image_extents;
853 }
854
855 C_WriteRequestT *write_req =
856 m_builder->create_write_request(*this, now, std::move(split_image_extents),
857 std::move(bl), fadvise_flags, m_lock,
858 m_perfcounter, on_finish);
859 m_perfcounter->inc(l_librbd_pwl_wr_bytes,
860 write_req->image_extents_summary.total_bytes);
861
862 /* The lambda below will be called when the block guard for all
863 * blocks affected by this write is obtained */
864 GuardedRequestFunctionContext *guarded_ctx =
865 new GuardedRequestFunctionContext([this,
866 write_req](GuardedRequestFunctionContext &guard_ctx) {
867 write_req->blockguard_acquired(guard_ctx);
868 alloc_and_dispatch_io_req(write_req);
869 });
870
871 detain_guarded_request(write_req, guarded_ctx, false);
872}
873
874template <typename I>
875void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
876 uint32_t discard_granularity_bytes,
877 Context *on_finish) {
878 CephContext *cct = m_image_ctx.cct;
879
880 ldout(cct, 20) << dendl;
881
882 utime_t now = ceph_clock_now();
883 m_perfcounter->inc(l_librbd_pwl_discard, 1);
884 Extents discard_extents = {{offset, length}};
885 m_discard_granularity_bytes = discard_granularity_bytes;
886
887 ceph_assert(m_initialized);
888
889 auto *discard_req =
890 new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
891 m_lock, m_perfcounter, on_finish);
892
893 /* The lambda below will be called when the block guard for all
894 * blocks affected by this write is obtained */
895 GuardedRequestFunctionContext *guarded_ctx =
896 new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
897 discard_req->blockguard_acquired(guard_ctx);
898 alloc_and_dispatch_io_req(discard_req);
899 });
900
901 detain_guarded_request(discard_req, guarded_ctx, false);
902}
903
904/**
905 * Aio_flush completes when all previously completed writes are
906 * flushed to persistent cache. We make a best-effort attempt to also
907 * defer until all in-progress writes complete, but we may not know
908 * about all of the writes the application considers in-progress yet,
909 * due to uncertainty in the IO submission workq (multiple WQ threads
910 * may allow out-of-order submission).
911 *
912 * This flush operation will not wait for writes deferred for overlap
913 * in the block guard.
914 */
915template <typename I>
916void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
917 CephContext *cct = m_image_ctx.cct;
918 ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;
919
920 if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
921 io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
922 internal_flush(false, on_finish);
923 return;
924 }
925 m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);
926
927 /* May be called even if initialization fails */
928 if (!m_initialized) {
929 ldout(cct, 5) << "never initialized" << dendl;
930 /* Deadlock if completed here */
931 m_image_ctx.op_work_queue->queue(on_finish, 0);
932 return;
933 }
934
935 {
936 std::shared_lock image_locker(m_image_ctx.image_lock);
937 if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
938 on_finish->complete(-EROFS);
939 return;
940 }
941 }
942
943 auto flush_req = make_flush_req(on_finish);
944
945 GuardedRequestFunctionContext *guarded_ctx =
946 new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
947 ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
948 ceph_assert(guard_ctx.cell);
949 flush_req->detained = guard_ctx.state.detained;
950 /* We don't call flush_req->set_cell(), because the block guard will be released here */
951 {
952 DeferredContexts post_unlock; /* Do these when the lock below is released */
953 std::lock_guard locker(m_lock);
954
955 if (!m_persist_on_flush && m_persist_on_write_until_flush) {
956 m_persist_on_flush = true;
957 ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
958 }
959
960 /*
961 * Create a new sync point if there have been writes since the last
962 * one.
963 *
964 * We do not flush the caches below the RWL here.
965 */
966 flush_new_sync_point_if_needed(flush_req, post_unlock);
967 }
968
969 release_guarded_request(guard_ctx.cell);
970 });
971
972 detain_guarded_request(flush_req, guarded_ctx, true);
973}
974
975template <typename I>
976void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
977 bufferlist&& bl, int fadvise_flags,
978 Context *on_finish) {
979 CephContext *cct = m_image_ctx.cct;
980
981 ldout(cct, 20) << "aio_writesame" << dendl;
982
983 utime_t now = ceph_clock_now();
984 Extents ws_extents = {{offset, length}};
985 m_perfcounter->inc(l_librbd_pwl_ws, 1);
986 ceph_assert(m_initialized);
987
988 /* A write same request is also a write request. The key difference is the
989 * write same data buffer is shorter than the extent of the request. The full
990 * extent will be used in the block guard, and appear in
991 * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
992 * as long as the length of the bl here, which is the pattern that's repeated
993 * in the image for the entire length of this WS. Read hits and flushing of
994 * write sames are different than normal writes. */
995 C_WriteSameRequestT *ws_req =
996 m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
997 fadvise_flags, m_lock, m_perfcounter, on_finish);
998 m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);
999
1000 /* The lambda below will be called when the block guard for all
1001 * blocks affected by this write is obtained */
1002 GuardedRequestFunctionContext *guarded_ctx =
1003 new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
1004 ws_req->blockguard_acquired(guard_ctx);
1005 alloc_and_dispatch_io_req(ws_req);
1006 });
1007
1008 detain_guarded_request(ws_req, guarded_ctx, false);
1009}
1010
1011template <typename I>
1012void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
1013 bufferlist&& cmp_bl,
1014 bufferlist&& bl,
1015 uint64_t *mismatch_offset,
1016 int fadvise_flags,
1017 Context *on_finish) {
1018 ldout(m_image_ctx.cct, 20) << dendl;
1019
1020 utime_t now = ceph_clock_now();
1021 m_perfcounter->inc(l_librbd_pwl_cmp, 1);
1022 ceph_assert(m_initialized);
1023
1024 /* A compare and write request is also a write request. We only allocate
1025 * resources and dispatch this write request if the compare phase
1026 * succeeds. */
1027 C_WriteRequestT *cw_req =
1028 m_builder->create_comp_and_write_request(
1029 *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
1030 mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
1031 m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);
1032
1033 /* The lambda below will be called when the block guard for all
1034 * blocks affected by this write is obtained */
1035 GuardedRequestFunctionContext *guarded_ctx =
1036 new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
1037 cw_req->blockguard_acquired(guard_ctx);
1038
1039 auto read_complete_ctx = new LambdaContext(
1040 [this, cw_req](int r) {
1041 ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
1042 << "cw_req=" << cw_req << dendl;
1043
1044 /* Compare read_bl to cmp_bl to determine if this will produce a write */
1045 buffer::list aligned_read_bl;
1046 if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) {
1047 aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length());
1048 }
1049 if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) ||
1050 cw_req->cmp_bl.contents_equal(aligned_read_bl)) {
1051 /* Compare phase succeeds. Begin write */
1052 ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
1053 cw_req->compare_succeeded = true;
1054 *cw_req->mismatch_offset = 0;
1055 /* Continue with this request as a write. Blockguard release and
1056 * user request completion handled as if this were a plain
1057 * write. */
1058 alloc_and_dispatch_io_req(cw_req);
1059 } else {
1060 /* Compare phase fails. Compare-and-write ends now. */
1061 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
1062 /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
1063 uint64_t bl_index = 0;
1064 for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) {
1065 if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
1066 ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
1067 break;
1068 }
1069 }
1070 cw_req->compare_succeeded = false;
1071 *cw_req->mismatch_offset = bl_index;
1072 cw_req->complete_user_request(-EILSEQ);
1073 cw_req->release_cell();
1074 cw_req->complete(0);
1075 }
1076 });
1077
1078 /* Read phase of comp-and-write must read through RWL */
1079 Extents image_extents_copy = cw_req->image_extents;
1080 read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
1081 });
1082
1083 detain_guarded_request(cw_req, guarded_ctx, false);
1084}
1085
1086template <typename I>
1087void AbstractWriteLog<I>::flush(Context *on_finish) {
1088 internal_flush(false, on_finish);
1089}
1090
1091template <typename I>
1092void AbstractWriteLog<I>::invalidate(Context *on_finish) {
1093 internal_flush(true, on_finish);
1094}
1095
1096template <typename I>
1097CephContext *AbstractWriteLog<I>::get_context() {
1098 return m_image_ctx.cct;
1099}
1100
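/*
 * Block guard helpers. detain_guarded_request_helper() tries to take the
 * guard cell covering the request's extent: a non-null cell means the
 * request owns the range and may proceed, nullptr means it overlaps an
 * in-flight request and will be resumed later by
 * release_guarded_request(). The barrier variant additionally queues
 * every request that arrives while a barrier (aio_flush) is in progress.
 */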
1101template <typename I>
1102BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
1103{
1104 CephContext *cct = m_image_ctx.cct;
1105 BlockGuardCell *cell;
1106
1107 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1108 ldout(cct, 20) << dendl;
1109
1110 int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
1111 ceph_assert(r>=0);
1112 if (r > 0) {
1113 ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
1114 << "req=" << req << dendl;
1115 return nullptr;
1116 }
1117
1118 ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
1119 return cell;
1120}
1121
1122template <typename I>
1123BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
1124 GuardedRequest &req)
1125{
1126 BlockGuardCell *cell = nullptr;
1127
1128 ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
1129 ldout(m_image_ctx.cct, 20) << dendl;
1130
1131 if (m_barrier_in_progress) {
1132 req.guard_ctx->state.queued = true;
1133 m_awaiting_barrier.push_back(req);
1134 } else {
1135 bool barrier = req.guard_ctx->state.barrier;
1136 if (barrier) {
1137 m_barrier_in_progress = true;
1138 req.guard_ctx->state.current_barrier = true;
1139 }
1140 cell = detain_guarded_request_helper(req);
1141 if (barrier) {
1142 /* Only non-null if the barrier acquires the guard now */
1143 m_barrier_cell = cell;
1144 }
1145 }
1146
1147 return cell;
1148}
1149
1150template <typename I>
1151void AbstractWriteLog<I>::detain_guarded_request(
1152 C_BlockIORequestT *request,
1153 GuardedRequestFunctionContext *guarded_ctx,
1154 bool is_barrier)
1155{
1156 BlockExtent extent;
1157 if (request) {
1158 extent = request->image_extents_summary.block_extent();
1159 } else {
1160 extent = block_extent(whole_volume_extent());
1161 }
1162 auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
1163 BlockGuardCell *cell = nullptr;
1164
1165 ldout(m_image_ctx.cct, 20) << dendl;
1166 {
1167 std::lock_guard locker(m_blockguard_lock);
1168 cell = detain_guarded_request_barrier_helper(req);
1169 }
1170 if (cell) {
1171 req.guard_ctx->cell = cell;
1172 req.guard_ctx->complete(0);
1173 }
1174}
1175
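/*
 * Release a block guard cell and re-detain any requests that were
 * waiting on it. If the released cell belonged to the current barrier,
 * drain the queue of requests that arrived during the barrier, stopping
 * if one of them starts a new barrier.
 */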
1176template <typename I>
1177void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
1178{
1179 CephContext *cct = m_image_ctx.cct;
1180 WriteLogGuard::BlockOperations block_reqs;
1181 ldout(cct, 20) << "released_cell=" << released_cell << dendl;
1182
1183 {
1184 std::lock_guard locker(m_blockguard_lock);
1185 m_write_log_guard.release(released_cell, &block_reqs);
1186
1187 for (auto &req : block_reqs) {
1188 req.guard_ctx->state.detained = true;
1189 BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
1190 if (detained_cell) {
1191 if (req.guard_ctx->state.current_barrier) {
1192 /* The current barrier is acquiring the block guard, so now we know its cell */
1193 m_barrier_cell = detained_cell;
1194 /* detained_cell could be == released_cell here */
1195 ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
1196 }
1197 req.guard_ctx->cell = detained_cell;
1198 m_work_queue.queue(req.guard_ctx);
1199 }
1200 }
1201
1202 if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
1203 ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
1204 /* The released cell is the current barrier request */
1205 m_barrier_in_progress = false;
1206 m_barrier_cell = nullptr;
1207 /* Move waiting requests into the blockguard. Stop if there's another barrier */
1208 while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
1209 auto &req = m_awaiting_barrier.front();
1210 ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
1211 BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
1212 if (detained_cell) {
1213 req.guard_ctx->cell = detained_cell;
1214 m_work_queue.queue(req.guard_ctx);
1215 }
1216 m_awaiting_barrier.pop_front();
1217 }
1218 }
1219 }
1220
1221 ldout(cct, 20) << "exit" << dendl;
1222}
1223
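/*
 * Take a batch of scheduled operations off m_ops_to_append under m_lock.
 * Only one thread appends at a time (tracked by m_appending); the batch
 * size is capped at MAX_ALLOC_PER_TRANSACTION when isRWL is true and
 * MAX_WRITES_PER_SYNC_POINT otherwise.
 */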
1224template <typename I>
1225void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
1226 bool &appending, bool isRWL)
1227{
1228 const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
1229 : MAX_WRITES_PER_SYNC_POINT;
1230 {
1231 std::lock_guard locker(m_lock);
1232 if (!appending && m_appending) {
1233 /* Another thread is appending */
1234 ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
1235 return;
1236 }
1237 if (m_ops_to_append.size()) {
1238 appending = true;
1239 m_appending = true;
1240 auto last_in_batch = m_ops_to_append.begin();
1241 unsigned int ops_to_append = m_ops_to_append.size();
1242 if (ops_to_append > OPS_APPENDED) {
1243 ops_to_append = OPS_APPENDED;
1244 }
1245 std::advance(last_in_batch, ops_to_append);
1246 ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
1247 ops_remain = true; /* Always check again before leaving */
1248 ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", "
1249 << m_ops_to_append.size() << " remain" << dendl;
1250 } else if (isRWL) {
1251 ops_remain = false;
1252 if (appending) {
1253 appending = false;
1254 m_appending = false;
1255 }
1256 }
1257 }
1258}
1259
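/*
 * The schedule_append() overloads normalize the caller's container into
 * a GenericLogOperations list and pass it to schedule_append_ops(),
 * which the concrete write log implementation provides.
 */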
1260template <typename I>
1261void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
1262{
1263 GenericLogOperations to_append(ops.begin(), ops.end());
1264
1265 schedule_append_ops(to_append, req);
1266}
1267
1268template <typename I>
1269void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
1270{
1271 GenericLogOperations to_append { op };
1272
1273 schedule_append_ops(to_append, req);
1274}
1275
1276/*
1277 * Complete a set of write ops with the result of append_op_entries.
1278 */
1279template <typename I>
1280void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
1281 const int result)
1282{
1283 GenericLogEntries dirty_entries;
1284 int published_reserves = 0;
1285 ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
1286 for (auto &op : ops) {
1287 utime_t now = ceph_clock_now();
1288 auto log_entry = op->get_log_entry();
1289 log_entry->completed = true;
1290 if (op->is_writing_op()) {
1291 op->mark_log_entry_completed();
1292 dirty_entries.push_back(log_entry);
1293 }
1294 if (log_entry->is_write_entry()) {
1295 release_ram(log_entry);
1296 }
1297 if (op->reserved_allocated()) {
1298 published_reserves++;
1299 }
1300 {
1301 std::lock_guard locker(m_lock);
1302 m_unpublished_reserves -= published_reserves;
1303 m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
1304 }
1305 op->complete(result);
1306 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
1307 op->log_append_start_time - op->dispatch_time);
1308 m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
1309 m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
1310 utime_t(now - op->dispatch_time).to_nsec(),
1311 log_entry->ram_entry.write_bytes);
1312 utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
1313 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
1314 m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
1315 log_entry->ram_entry.write_bytes);
1316 m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
1317 }
1318 // New entries may be flushable
1319 {
1320 std::lock_guard locker(m_lock);
1321 wake_up();
1322 }
1323}
1324
1325/**
1326 * Dispatch as many deferred writes as possible
1327 */
1328template <typename I>
1329void AbstractWriteLog<I>::dispatch_deferred_writes(void)
1330{
1331 C_BlockIORequestT *front_req = nullptr; /* req still on front of deferred list */
1332 C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
1333 bool allocated = false; /* front_req allocate succeeded */
1334 bool cleared_dispatching_flag = false;
1335
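  /* The loop below pipelines allocation and dispatch: while resources
   * are allocated for the request at the front of the deferred list, the
   * previously allocated request (allocated_req) is handed to the work
   * queue for dispatch. Dispatching stops at the first request whose
   * allocation fails. */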
1336 /* If we can't become the dispatcher, we'll exit */
1337 {
1338 std::lock_guard locker(m_lock);
1339 if (m_dispatching_deferred_ops ||
1340 !m_deferred_ios.size()) {
1341 return;
1342 }
1343 m_dispatching_deferred_ops = true;
1344 }
1345
1346 /* There are ops to dispatch, and this should be the only thread dispatching them */
1347 {
1348 std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
1349 do {
1350 {
1351 std::lock_guard locker(m_lock);
1352 ceph_assert(m_dispatching_deferred_ops);
1353 if (allocated) {
1354 /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
1355 * have succeeded, and we'll need to pop it off the deferred ops list
1356 * here. */
1357 ceph_assert(front_req);
1358 ceph_assert(!allocated_req);
1359 m_deferred_ios.pop_front();
1360 allocated_req = front_req;
1361 front_req = nullptr;
1362 allocated = false;
1363 }
1364 ceph_assert(!allocated);
1365 if (!allocated && front_req) {
1366 /* front_req->alloc_resources() failed on the last iteration.
1367 * We'll stop dispatching. */
1368 wake_up();
1369 front_req = nullptr;
1370 ceph_assert(!cleared_dispatching_flag);
1371 m_dispatching_deferred_ops = false;
1372 cleared_dispatching_flag = true;
1373 } else {
1374 ceph_assert(!front_req);
1375 if (m_deferred_ios.size()) {
1376 /* New allocation candidate */
1377 front_req = m_deferred_ios.front();
1378 } else {
1379 ceph_assert(!cleared_dispatching_flag);
1380 m_dispatching_deferred_ops = false;
1381 cleared_dispatching_flag = true;
1382 }
1383 }
1384 }
1385 /* Try allocating for front_req before we decide what to do with allocated_req
1386 * (if any) */
1387 if (front_req) {
1388 ceph_assert(!cleared_dispatching_flag);
1389 allocated = front_req->alloc_resources();
1390 }
1391 if (allocated_req && front_req && allocated) {
1392 /* Push dispatch of the first allocated req to a wq */
1393 m_work_queue.queue(new LambdaContext(
1394 [this, allocated_req](int r) {
1395 allocated_req->dispatch();
1396 }), 0);
1397 allocated_req = nullptr;
1398 }
1399 ceph_assert(!(allocated_req && front_req && allocated));
1400
1401 /* Continue while we're still considering the front of the deferred ops list */
1402 } while (front_req);
1403 ceph_assert(!allocated);
1404 }
1405 ceph_assert(cleared_dispatching_flag);
1406
1407 /* If any deferred requests were allocated, the last one will still be in allocated_req */
1408 if (allocated_req) {
1409 allocated_req->dispatch();
1410 }
1411}
1412
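// Illustrative sketch (not part of the upstream file): the "become the single
// dispatcher or bail out" shape used by dispatch_deferred_writes() above,
// reduced to standard C++. All names here are hypothetical and <mutex>/<deque>
// are assumed to be available; the real code additionally performs resource
// allocation outside m_lock and pushes dispatches onto a work queue.
namespace pwl_sketch_dispatch {

inline bool can_allocate(int /*req*/) { return true; }  // stand-in for alloc_resources()
inline void dispatch(int /*req*/) {}                    // stand-in for req->dispatch()

struct DeferredQueue {
  std::mutex lock;
  std::deque<int> deferred;   // stands in for the deferred C_BlockIORequestT list
  bool dispatching = false;   // mirrors m_dispatching_deferred_ops

  void try_dispatch() {
    {
      std::lock_guard<std::mutex> l(lock);
      if (dispatching || deferred.empty()) {
        return;               // another thread is draining, or nothing to do
      }
      dispatching = true;     // this thread is now the sole dispatcher
    }
    for (;;) {
      int req;
      {
        std::lock_guard<std::mutex> l(lock);
        if (deferred.empty() || !can_allocate(deferred.front())) {
          dispatching = false;  // stop; an unallocatable request stays at the front
          return;
        }
        req = deferred.front();
        deferred.pop_front();
      }
      dispatch(req);          // run the request outside the lock
    }
  }
};

} // namespace pwl_sketch_dispatch
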
1413/**
1414 * Returns the lanes used by this write, and attempts to dispatch the next
1415 * deferred write
1416 */
1417template <typename I>
1418void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
1419{
1420 {
1421 std::lock_guard locker(m_lock);
1422 m_free_lanes += req->image_extents.size();
1423 }
1424 dispatch_deferred_writes();
1425}
1426
1427/**
1428 * Attempts to allocate log resources for a write. Write is dispatched if
1429 * resources are available, or queued if they aren't.
1430 */
1431template <typename I>
1432void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
1433{
1434 bool dispatch_here = false;
1435
1436 {
1437 /* If there are already deferred writes, queue behind them for resources */
1438 {
1439 std::lock_guard locker(m_lock);
1440 dispatch_here = m_deferred_ios.empty();
1441 // Only a flush req has total_bytes set to the max uint64, making it safe to cast here and check for an internal flush
1442 if (req->image_extents_summary.total_bytes ==
1443 std::numeric_limits<uint64_t>::max() &&
1444 static_cast<C_FlushRequestT *>(req)->internal == true) {
1445 dispatch_here = true;
1446 }
1447 }
1448 if (dispatch_here) {
1449 dispatch_here = req->alloc_resources();
1450 }
1451 if (dispatch_here) {
1452 ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
1453 req->dispatch();
1454 } else {
1455 req->deferred();
1456 {
1457 std::lock_guard locker(m_lock);
1458 m_deferred_ios.push_back(req);
1459 }
1460 ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
1461 dispatch_deferred_writes();
1462 }
1463 }
1464}
1465
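// Illustrative sketch (not upstream code): the dispatch-or-defer decision made
// by alloc_and_dispatch_io_req() above, as a free function over hypothetical
// stand-ins (<deque> assumed). The real code takes m_lock around the queue
// operations and pokes dispatch_deferred_writes() after deferring.
namespace pwl_sketch_defer {

struct IoReq {
  bool internal_flush = false;             // internal flushes may bypass the queue
  bool alloc_resources() { return true; }  // stand-in for resource reservation
  void dispatch() {}
  void deferred() {}
};

inline void alloc_and_dispatch(IoReq &req, std::deque<IoReq *> &deferred_ios) {
  // Run immediately only if nothing is already queued ahead of this request
  // (or it is an internal flush) and its resources can be reserved right now.
  bool dispatch_here = deferred_ios.empty() || req.internal_flush;
  if (dispatch_here) {
    dispatch_here = req.alloc_resources();
  }
  if (dispatch_here) {
    req.dispatch();
  } else {
    req.deferred();               // tell the request it has been deferred
    deferred_ios.push_back(&req); // the dispatcher will pick it up later
  }
}

} // namespace pwl_sketch_defer
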
1466template <typename I>
1467bool AbstractWriteLog<I>::check_allocation(
1468 C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
1469 uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
1470 uint32_t num_unpublished_reserves) {
1471 bool alloc_succeeds = true;
1472 bool no_space = false;
1473 {
1474 std::lock_guard locker(m_lock);
1475 if (m_free_lanes < num_lanes) {
1476 req->set_io_waited_for_lanes(true);
1477 ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
1478 << num_lanes
1479 << ", have " << m_free_lanes << ") "
1480 << *req << dendl;
1481 alloc_succeeds = false;
1482 /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
1483 }
1484 if (m_free_log_entries < num_log_entries) {
1485 req->set_io_waited_for_entries(true);
1486 ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
1487 << num_log_entries
1488 << ", have " << m_free_log_entries << ") "
1489 << *req << dendl;
1490 alloc_succeeds = false;
1491 no_space = true; /* Entries must be retired */
1492 }
1493 /* Don't attempt buffer allocation if we've exceeded the "full" threshold */
a4b75251 1494 if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
1495 if (!req->has_io_waited_for_buffers()) {
1496 req->set_io_waited_for_buffers(true);
1497 ldout(m_image_ctx.cct, 5) << "Waiting for allocation cap (cap="
1498 << m_bytes_allocated_cap
1499 << ", allocated=" << m_bytes_allocated
1500 << ") in write [" << *req << "]" << dendl;
1501 }
1502 alloc_succeeds = false;
1503 no_space = true; /* Entries must be retired */
1504 }
1505 }
1506
1507 if (alloc_succeeds) {
1508 reserve_cache(req, alloc_succeeds, no_space);
1509 }
1510
1511 if (alloc_succeeds) {
1512 std::lock_guard locker(m_lock);
1513 /* We need one free log entry per extent (each is a separate entry), and
1514 * one free "lane" for remote replication. */
1515 if ((m_free_lanes >= num_lanes) &&
1516 (m_free_log_entries >= num_log_entries) &&
1517 (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
1518 m_free_lanes -= num_lanes;
1519 m_free_log_entries -= num_log_entries;
1520 m_unpublished_reserves += num_unpublished_reserves;
1521 m_bytes_allocated += bytes_allocated;
1522 m_bytes_cached += bytes_cached;
1523 m_bytes_dirty += bytes_dirtied;
1524 if (req->has_io_waited_for_buffers()) {
1525 req->set_io_waited_for_buffers(false);
1526 }
1527 } else {
1528 alloc_succeeds = false;
1529 }
1530 }
1531
1532 if (!alloc_succeeds && no_space) {
1533 /* Expedite flushing and/or retiring */
1534 std::lock_guard locker(m_lock);
1535 m_alloc_failed_since_retire = true;
1536 m_last_alloc_fail = ceph_clock_now();
1537 }
1538
1539 return alloc_succeeds;
1540}
1541
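// Worked example of the allocation-cap check in check_allocation() above
// (illustrative numbers only; the real cap is derived elsewhere from the pool
// size): with m_bytes_allocated_cap = 960 MiB, a request needing
// bytes_allocated = 16 MiB is refused once m_bytes_allocated exceeds 944 MiB,
// e.g. 950 MiB + 16 MiB = 966 MiB > 960 MiB. The refusal sets no_space, which
// in turn sets m_alloc_failed_since_retire so that flushing and retiring are
// expedited.
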
1542template <typename I>
1543C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
1544 utime_t flush_begins = ceph_clock_now();
1545 bufferlist bl;
1546 auto *flush_req =
1547 new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
1548 std::move(bl), 0, m_lock, m_perfcounter, on_finish);
1549
1550 return flush_req;
1551}
1552
1553template <typename I>
1554void AbstractWriteLog<I>::wake_up() {
1555 CephContext *cct = m_image_ctx.cct;
1556 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1557
1558 if (!m_wake_up_enabled) {
1559 // wake_up is disabled during shutdown after flushing completes
1560 ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
1561 return;
1562 }
1563
1564 if (m_wake_up_requested && m_wake_up_scheduled) {
1565 return;
1566 }
1567
1568 ldout(cct, 20) << dendl;
1569
1570 /* Wake-up can be requested while it's already scheduled */
1571 m_wake_up_requested = true;
1572
1573 /* Wake-up cannot be scheduled if it's already scheduled */
1574 if (m_wake_up_scheduled) {
1575 return;
1576 }
1577 m_wake_up_scheduled = true;
1578 m_async_process_work++;
1579 m_async_op_tracker.start_op();
1580 m_work_queue.queue(new LambdaContext(
1581 [this](int r) {
1582 process_work();
1583 m_async_op_tracker.finish_op();
1584 m_async_process_work--;
1585 }), 0);
1586}
1587
1588template <typename I>
1589bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
1590 CephContext *cct = m_image_ctx.cct;
1591
1592 ldout(cct, 20) << "" << dendl;
1593 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1594
1595 if (m_invalidating) {
1596 return true;
1597 }
1598
1599 /* For OWB we can flush entries with the same sync gen number (write between
1600 * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
1601 * its sync gen number is <= the lowest sync gen number carried by all the
1602 * entries currently flushing.
1603 *
1604 * If the entry considered here bears a sync gen number lower than a
1605 * previously flushed entry, the application had to have submitted the write
1606 * bearing the higher gen number before the write with the lower gen number
1607 * completed. So, flushing these concurrently is OK.
1608 *
1609 * If the entry considered here bears a sync gen number higher than a
1610 * currently flushing entry, the write with the lower gen number may have
1611 * completed to the application before the write with the higher sync gen
1612 * number was submitted, and the application may rely on that completion
1613 * order for volume consistency. In this case the entry will not be
1614 * considered flushable until all the entries bearing lower sync gen numbers
1615 * finish flushing.
1616 */
1617
1618 if (m_flush_ops_in_flight &&
1619 (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
1620 return false;
1621 }
1622
1623 return (log_entry->can_writeback() &&
1624 (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
1625 (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
1626}
1627
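// Illustrative sketch (not upstream code): the flushability rule described in
// the comment inside can_flush_entry() above, as a standalone predicate over
// hypothetical fields (<cstdint> assumed). An entry may start writeback only if
// no in-flight flush carries a lower sync gen number and the in-flight op/byte
// limits are not exceeded. The limit values below are placeholders, not the
// real IN_FLIGHT_FLUSH_* constants.
namespace pwl_sketch_flushable {

struct FlushState {
  unsigned ops_in_flight = 0;
  uint64_t bytes_in_flight = 0;
  uint64_t lowest_flushing_sync_gen = 0;
  unsigned op_limit = 64;           // placeholder for IN_FLIGHT_FLUSH_WRITE_LIMIT
  uint64_t byte_limit = 1u << 20;   // placeholder for IN_FLIGHT_FLUSH_BYTES_LIMIT
};

inline bool can_flush(const FlushState &s, uint64_t entry_sync_gen,
                      bool entry_ready_for_writeback) {
  if (s.ops_in_flight && entry_sync_gen > s.lowest_flushing_sync_gen) {
    return false;  // later sync gens wait until earlier gens finish flushing
  }
  return entry_ready_for_writeback &&
         s.ops_in_flight <= s.op_limit &&
         s.bytes_in_flight <= s.byte_limit;
}

} // namespace pwl_sketch_flushable
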
1628template <typename I>
1629Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
1630 bool invalidating) {
1631 CephContext *cct = m_image_ctx.cct;
1632
1633 ldout(cct, 20) << "" << dendl;
1634 ceph_assert(m_entry_reader_lock.is_locked());
1635 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1636 if (!m_flush_ops_in_flight ||
1637 (log_entry->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
1638 m_lowest_flushing_sync_gen = log_entry->ram_entry.sync_gen_number;
1639 }
1640 m_flush_ops_in_flight += 1;
a4b75251 1641 m_flush_ops_will_send += 1;
1642 /* For write-same entries this is the number of bytes affected by the flush op, not the bytes transferred */
1643 m_flush_bytes_in_flight += log_entry->ram_entry.write_bytes;
1644
1645 /* Flush write completion action */
a4b75251 1646 utime_t writeback_start_time = ceph_clock_now();
f67539c2 1647 Context *ctx = new LambdaContext(
1648 [this, log_entry, writeback_start_time, invalidating](int r) {
1649 utime_t writeback_comp_time = ceph_clock_now();
1650 m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
1651 writeback_comp_time - writeback_start_time);
1652 {
1653 std::lock_guard locker(m_lock);
1654 if (r < 0) {
1655 lderr(m_image_ctx.cct) << "failed to flush log entry"
1656 << cpp_strerror(r) << dendl;
1657 m_dirty_log_entries.push_front(log_entry);
1658 } else {
1659 ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
1660 log_entry->set_flushed(true);
1661 m_bytes_dirty -= log_entry->bytes_dirty();
1662 sync_point_writer_flushed(log_entry->get_sync_point_entry());
1663 ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
1664 << " invalidating=" << invalidating
1665 << dendl;
1666 }
1667 m_flush_ops_in_flight -= 1;
1668 m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
1669 wake_up();
1670 }
1671 });
1672 /* Flush through lower cache before completing */
1673 ctx = new LambdaContext(
1674 [this, ctx](int r) {
1675 if (r < 0) {
1676 lderr(m_image_ctx.cct) << "failed to flush log entry"
1677 << cpp_strerror(r) << dendl;
1678 ctx->complete(r);
1679 } else {
1680 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
1681 }
1682 });
1683 return ctx;
1684}
1685
1686template <typename I>
1687void AbstractWriteLog<I>::process_writeback_dirty_entries() {
1688 CephContext *cct = m_image_ctx.cct;
1689 bool all_clean = false;
1690 int flushed = 0;
a4b75251 1691 bool has_write_entry = false;
1692
1693 ldout(cct, 20) << "Look for dirty entries" << dendl;
1694 {
1695 DeferredContexts post_unlock;
1696 GenericLogEntries entries_to_flush;
1697
f67539c2 1698 std::shared_lock entry_reader_locker(m_entry_reader_lock);
a4b75251 1699 std::lock_guard locker(m_lock);
f67539c2 1700 while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
1701 if (m_shutting_down) {
1702 ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
1703 /* Only signal flush completion once all in-flight flush ops have finished */
1704 all_clean = !m_flush_ops_in_flight;
1705 break;
1706 }
1707 if (m_dirty_log_entries.empty()) {
1708 ldout(cct, 20) << "Nothing new to flush" << dendl;
1709 /* Only signal flush completion once all in-flight flush ops have finished */
1710 all_clean = !m_flush_ops_in_flight;
1711 break;
1712 }
1713
1714 if (m_flush_ops_will_send) {
1715 ldout(cct, 20) << "Previous flush ops have not been sent yet" << dendl;
1716 break;
1717 }
1718 auto candidate = m_dirty_log_entries.front();
1719 bool flushable = can_flush_entry(candidate);
1720 if (flushable) {
a4b75251 1721 entries_to_flush.push_back(candidate);
f67539c2 1722 flushed++;
1723 if (!has_write_entry)
1724 has_write_entry = candidate->is_write_entry();
1725 m_dirty_log_entries.pop_front();
1726 } else {
1727 ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
1728 break;
1729 }
1730 }
1731
1732 construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
1733 }
1734
1735 if (all_clean) {
1736 /* All flushing complete, drain outside lock */
1737 Contexts flush_contexts;
1738 {
1739 std::lock_guard locker(m_lock);
1740 flush_contexts.swap(m_flush_complete_contexts);
1741 }
1742 finish_contexts(m_image_ctx.cct, flush_contexts, 0);
1743 }
1744}
1745
1746/* Returns true if the specified SyncPointLogEntry is considered flushed; in
1747 * that case the log will be updated to reflect this. */
1748template <typename I>
1749bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
1750{
1751 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1752 ceph_assert(log_entry);
1753
1754 if ((log_entry->writes_flushed == log_entry->writes) &&
1755 log_entry->completed && log_entry->prior_sync_point_flushed &&
1756 log_entry->next_sync_point_entry) {
1757 ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
1758 << *log_entry << dendl;
1759 log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
1760 /* Don't move the flushed sync gen num backwards. */
1761 if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
1762 m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
1763 }
1764 m_async_op_tracker.start_op();
1765 m_work_queue.queue(new LambdaContext(
a4b75251 1766 [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
1767 bool handled_by_next;
1768 {
1769 std::lock_guard locker(m_lock);
a4b75251 1770 handled_by_next = handle_flushed_sync_point(std::move(next));
1771 }
1772 if (!handled_by_next) {
1773 persist_last_flushed_sync_gen();
1774 }
1775 m_async_op_tracker.finish_op();
1776 }));
1777 return true;
1778 }
1779 return false;
1780}
1781
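// Illustrative sketch (not upstream code): the forward propagation performed by
// handle_flushed_sync_point() above, over a hypothetical minimal sync-point
// record (<cstdint> assumed). In the real code each step to the next record is
// re-queued on the work queue under m_lock rather than looped, and the last
// flushed gen is persisted when the walk stops.
namespace pwl_sketch_syncpoint {

struct SyncRec {
  uint64_t gen = 0;
  unsigned writes = 0, writes_flushed = 0;
  bool completed = false, prior_flushed = false;
  SyncRec *next = nullptr;
};

// Walk forward while each record is fully flushed, marking the successor's
// prior_flushed flag; return the highest fully flushed gen seen, never moving
// it backwards.
inline uint64_t propagate_flushed(SyncRec *sp, uint64_t flushed_gen) {
  while (sp && sp->completed && sp->prior_flushed &&
         sp->writes_flushed == sp->writes && sp->next) {
    sp->next->prior_flushed = true;
    if (flushed_gen < sp->gen) {
      flushed_gen = sp->gen;
    }
    sp = sp->next;
  }
  return flushed_gen;
}

} // namespace pwl_sketch_syncpoint
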
1782template <typename I>
1783void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
1784{
1785 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1786 ceph_assert(log_entry);
1787 log_entry->writes_flushed++;
1788
1789 /* If this entry might be completely flushed, look closer */
1790 if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
1791 ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
1792 << *log_entry << dendl;
1793 handle_flushed_sync_point(log_entry);
1794 }
1795}
1796
1797/* Make a new sync point and flush the previous one during initialization,
1798 * when there may or may not be a previous sync point */
1799template <typename I>
1800void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
1801 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1802 ceph_assert(!m_initialized); /* Don't use this after init */
1803
1804 if (!m_current_sync_point) {
1805 /* First sync point since start */
1806 new_sync_point(later);
1807 } else {
1808 flush_new_sync_point(nullptr, later);
1809 }
1810}
1811
1812/**
1813 * Begin a new sync point
1814 */
1815template <typename I>
1816void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
1817 CephContext *cct = m_image_ctx.cct;
1818 std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
1819 std::shared_ptr<SyncPoint> new_sync_point;
1820 ldout(cct, 20) << dendl;
1821
1822 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1823
1824 /* The first time this is called, if this is a newly created log,
1825 * this makes the first sync gen number we'll use 1. On the first
1826 * call for a re-opened log m_current_sync_gen will be the highest
1827 * gen number from all the sync point entries found in the re-opened
1828 * log, and this advances to the next sync gen number. */
1829 ++m_current_sync_gen;
1830
1831 new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
1832 m_current_sync_point = new_sync_point;
1833
1834 /* If this log has been re-opened, old_sync_point will initially be
1835 * nullptr, but m_current_sync_gen may not be zero. */
1836 if (old_sync_point) {
1837 new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
1838 m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
1839 old_sync_point->log_entry->writes,
1840 old_sync_point->log_entry->bytes);
1841 /* This sync point will acquire no more sub-ops. Activation needs
1842 * to acquire m_lock, so defer to later*/
1843 later.add(new LambdaContext(
1844 [this, old_sync_point](int r) {
1845 old_sync_point->prior_persisted_gather_activate();
1846 }));
1847 }
1848
1849 new_sync_point->prior_persisted_gather_set_finisher();
1850
1851 if (old_sync_point) {
1852 ldout(cct,6) << "new sync point = [" << *m_current_sync_point
1853 << "], prior = [" << *old_sync_point << "]" << dendl;
1854 } else {
1855 ldout(cct,6) << "first sync point = [" << *m_current_sync_point
1856 << "]" << dendl;
1857 }
1858}
1859
1860template <typename I>
1861void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
1862 DeferredContexts &later) {
1863 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1864
1865 if (!flush_req) {
1866 m_async_null_flush_finish++;
1867 m_async_op_tracker.start_op();
1868 Context *flush_ctx = new LambdaContext([this](int r) {
1869 m_async_null_flush_finish--;
1870 m_async_op_tracker.finish_op();
1871 });
1872 flush_req = make_flush_req(flush_ctx);
1873 flush_req->internal = true;
1874 }
1875
1876 /* Add a new sync point. */
1877 new_sync_point(later);
1878 std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
1879 ceph_assert(to_append);
1880
1881 /* This flush request will append/persist the (now) previous sync point */
1882 flush_req->to_append = to_append;
1883
1884 /* When the m_sync_point_persist Gather completes this sync point can be
1885 * appended. The only sub for this Gather is the finisher Context for
1886 * m_prior_log_entries_persisted, which records the result of the Gather in
1887 * the sync point, and completes. TODO: Do we still need both of these
1888 * Gathers?*/
1889 Context * ctx = new LambdaContext([this, flush_req](int r) {
1890 ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
1891 << " sync point =" << flush_req->to_append
1892 << ". Ready to persist." << dendl;
1893 alloc_and_dispatch_io_req(flush_req);
1894 });
1895 to_append->persist_gather_set_finisher(ctx);
1896
1897 /* The m_sync_point_persist Gather has all the subs it will ever have, and
1898 * now has its finisher. If the sub is already complete, activation will
1899 * complete the Gather. The finisher will acquire m_lock, so we'll activate
1900 * this when we release m_lock.*/
1901 later.add(new LambdaContext([this, to_append](int r) {
1902 to_append->persist_gather_activate();
1903 }));
1904
1905 /* The flush request completes when the sync point persists */
1906 to_append->add_in_on_persisted_ctxs(flush_req);
1907}
1908
1909template <typename I>
1910void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
1911 DeferredContexts &later) {
1912 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
1913
1914 /* If there have been writes since the last sync point ... */
1915 if (m_current_sync_point->log_entry->writes) {
1916 flush_new_sync_point(flush_req, later);
1917 } else {
1918 /* There have been no writes to the current sync point. */
1919 if (m_current_sync_point->earlier_sync_point) {
1920 /* If previous sync point hasn't completed, complete this flush
1921 * with the earlier sync point. No alloc or dispatch needed. */
1922 m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
1923 } else {
1924 /* The previous sync point has already completed and been
1925 * appended. The current sync point has no writes, so this flush
1926 * has nothing to wait for. This flush completes now. */
1927 later.add(flush_req);
1928 }
1929 }
1930}
1931
1932/*
1933 * RWL internal flush - will actually flush the RWL.
1934 *
1935 * User flushes should arrive at aio_flush(), and only flush prior
1936 * writes to all log replicas.
1937 *
1938 * Librbd internal flushes will arrive at flush(invalidate=false,
1939 * discard=false), and traverse the block guard to ensure in-flight writes are
1940 * flushed.
1941 */
1942template <typename I>
1943void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
1944 CephContext *cct = m_image_ctx.cct;
1945 bool all_clean;
1946 bool flushing;
1947 bool stop_flushing;
1948
1949 {
1950 std::lock_guard locker(m_lock);
1951 flushing = (0 != m_flush_ops_in_flight);
1952 all_clean = m_dirty_log_entries.empty();
1953 stop_flushing = (m_shutting_down);
1954 }
1955
1956 if (!flushing && (all_clean || stop_flushing)) {
1957 /* Complete without holding m_lock */
1958 if (all_clean) {
1959 ldout(cct, 20) << "no dirty entries" << dendl;
1960 } else {
1961 ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
1962 }
1963 on_finish->complete(0);
1964 } else {
1965 if (all_clean) {
1966 ldout(cct, 5) << "flush ops still in progress" << dendl;
1967 } else {
1968 ldout(cct, 20) << "dirty entries remain" << dendl;
1969 }
1970 std::lock_guard locker(m_lock);
1971 /* on_finish can't be completed yet */
1972 m_flush_complete_contexts.push_back(new LambdaContext(
1973 [this, on_finish](int r) {
1974 flush_dirty_entries(on_finish);
1975 }));
1976 wake_up();
1977 }
1978}
1979
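// Illustrative usage sketch (not upstream code): a caller with access to this
// method (it is driven from internal_flush() below and re-queues itself via
// m_flush_complete_contexts until the dirty list drains) could block on
// writeback completion with a C_SaferCond from common/Cond.h, e.g.:
//
//   C_SaferCond flushed;
//   flush_dirty_entries(&flushed);  // completes once no dirty entries remain
//   int r = flushed.wait();         // returns the completion code (0 here)
//
// This is the standard Context/C_SaferCond idiom, shown as an assumption about
// how the completion could be consumed, not as code that exists upstream.
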
1980template <typename I>
1981void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
1982 ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;
1983
1984 if (m_perfcounter) {
1985 if (invalidate) {
1986 m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
1987 } else {
a4b75251 1988 m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
1989 }
1990 }
1991
1992 /* May be called even if initialization fails */
1993 if (!m_initialized) {
1994 ldout(m_image_ctx.cct, 5) << "never initialized" << dendl;
1995 /* Deadlock if completed here */
1996 m_image_ctx.op_work_queue->queue(on_finish, 0);
1997 return;
1998 }
1999
2000 /* Flush/invalidate must pass through block guard to ensure all layers of
2001 * cache are consistently flushed/invalidated. This ensures no in-flight write leaves
2002 * some layers with valid regions, which may later produce inconsistent read
2003 * results. */
2004 GuardedRequestFunctionContext *guarded_ctx =
2005 new GuardedRequestFunctionContext(
2006 [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
2007 DeferredContexts on_exit;
2008 ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
2009 ceph_assert(guard_ctx.cell);
2010
2011 Context *ctx = new LambdaContext(
2012 [this, cell=guard_ctx.cell, invalidate, on_finish](int r) {
2013 std::lock_guard locker(m_lock);
2014 m_invalidating = false;
2015 ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
2016 << invalidate << ")" << dendl;
2017 if (m_log_entries.size()) {
2018 ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
2019 << m_log_entries.size() << ", "
2020 << "front()=" << *m_log_entries.front()
2021 << dendl;
2022 }
2023 if (invalidate) {
2024 ceph_assert(m_log_entries.size() == 0);
2025 }
2026 ceph_assert(m_dirty_log_entries.size() == 0);
2027 m_image_ctx.op_work_queue->queue(on_finish, r);
2028 release_guarded_request(cell);
2029 });
2030 ctx = new LambdaContext(
2031 [this, ctx, invalidate](int r) {
2032 Context *next_ctx = ctx;
2033 ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
2034 if (r < 0) {
2035 /* Override on_finish status with this error */
2036 next_ctx = new LambdaContext([r, ctx](int _r) {
2037 ctx->complete(r);
2038 });
2039 }
2040 if (invalidate) {
2041 {
2042 std::lock_guard locker(m_lock);
2043 ceph_assert(m_dirty_log_entries.size() == 0);
2044 ceph_assert(!m_invalidating);
2045 ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
2046 m_invalidating = true;
2047 }
2048 /* Discards all RWL entries */
2049 while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
2050 next_ctx->complete(0);
2051 } else {
2052 {
2053 std::lock_guard locker(m_lock);
2054 ceph_assert(m_dirty_log_entries.size() == 0);
2055 ceph_assert(!m_invalidating);
2056 }
2057 m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
2058 }
2059 });
2060 ctx = new LambdaContext(
2061 [this, ctx](int r) {
2062 flush_dirty_entries(ctx);
2063 });
2064 std::lock_guard locker(m_lock);
2065 /* Even if we're throwing everything away, we want the last entry to
2066 * be a sync point so we can cleanly resume.
2067 *
2068 * Also, the blockguard only guarantees the replication of this op
2069 * can't overlap with prior ops. It doesn't guarantee those are all
2070 * completed and eligible for flush & retire, which we require here.
2071 */
2072 auto flush_req = make_flush_req(ctx);
2073 flush_new_sync_point_if_needed(flush_req, on_exit);
2074 });
2075 detain_guarded_request(nullptr, guarded_ctx, true);
2076}
2077
2078template <typename I>
2079void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
2080 C_BlockIORequestT *req) {
2081 req->copy_cache();
2082 m_blocks_to_log_entries.add_log_entries(log_entries);
2083}
2084
2085template <typename I>
2086bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
2087 CephContext *cct = m_image_ctx.cct;
2088
2089 ldout(cct, 20) << dendl;
2090 ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
2091 return log_entry->can_retire();
2092}
2093
2094template <typename I>
2095void AbstractWriteLog<I>::check_image_cache_state_clean() {
2096 ceph_assert(m_deferred_ios.empty());
2097 ceph_assert(m_ops_to_append.empty());
2098 ceph_assert(m_async_flush_ops == 0);
2099 ceph_assert(m_async_append_ops == 0);
2100 ceph_assert(m_dirty_log_entries.empty());
2101 ceph_assert(m_ops_to_flush.empty());
2102 ceph_assert(m_flush_ops_in_flight == 0);
2103 ceph_assert(m_flush_bytes_in_flight == 0);
2104 ceph_assert(m_bytes_dirty == 0);
2105 ceph_assert(m_work_queue.empty());
2106}
2107
2108} // namespace pwl
2109} // namespace cache
2110} // namespace librbd
2111
2112template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;