// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include "AbstractWriteLog.h"
#include "include/buffer.h"
#include "include/Context.h"
#include "include/ceph_assert.h"
#include "common/deleter.h"
#include "common/dout.h"
#include "common/environment.h"
#include "common/errno.h"
#include "common/hostname.h"
#include "common/WorkQueue.h"
#include "common/Timer.h"
#include "common/perf_counters.h"
#include "librbd/ImageCtx.h"
#include "librbd/asio/ContextWQ.h"
#include "librbd/cache/pwl/ImageCacheState.h"
#include "librbd/cache/pwl/LogEntry.h"
#include "librbd/plugin/Api.h"

#define dout_subsys ceph_subsys_rbd_pwl
#undef dout_prefix
#define dout_prefix *_dout << "librbd::cache::pwl::AbstractWriteLog: " << this \
                           << " " << __func__ << ": "

namespace librbd {
namespace cache {
namespace pwl {

using namespace std;
using namespace librbd::cache::pwl;

typedef AbstractWriteLog<ImageCtx>::Extent Extent;
typedef AbstractWriteLog<ImageCtx>::Extents Extents;

template <typename I>
AbstractWriteLog<I>::AbstractWriteLog(
    I &image_ctx, librbd::cache::pwl::ImageCacheState<I>* cache_state,
    Builder<This> *builder, cache::ImageWritebackInterface& image_writeback,
    plugin::Api<I>& plugin_api)
  : m_builder(builder),
    m_write_log_guard(image_ctx.cct),
    m_flush_guard(image_ctx.cct),
    m_flush_guard_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_flush_guard_lock", this))),
    m_deferred_dispatch_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_deferred_dispatch_lock", this))),
    m_blockguard_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_blockguard_lock", this))),
    m_thread_pool(
        image_ctx.cct, "librbd::cache::pwl::AbstractWriteLog::thread_pool",
        "tp_pwl", 4, ""),
    m_cache_state(cache_state),
    m_image_ctx(image_ctx),
    m_log_pool_size(DEFAULT_POOL_SIZE),
    m_image_writeback(image_writeback),
    m_plugin_api(plugin_api),
    m_log_retire_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_log_retire_lock", this))),
    m_entry_reader_lock("librbd::cache::pwl::AbstractWriteLog::m_entry_reader_lock"),
    m_log_append_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_log_append_lock", this))),
    m_lock(ceph::make_mutex(pwl::unique_lock_name(
      "librbd::cache::pwl::AbstractWriteLog::m_lock", this))),
    m_blocks_to_log_entries(image_ctx.cct),
    m_work_queue("librbd::cache::pwl::ReplicatedWriteLog::work_queue",
                 ceph::make_timespan(
                   image_ctx.config.template get_val<uint64_t>(
                     "rbd_op_thread_timeout")),
                 &m_thread_pool)
{
  CephContext *cct = m_image_ctx.cct;
  m_plugin_api.get_image_timer_instance(cct, &m_timer, &m_timer_lock);
}

template <typename I>
AbstractWriteLog<I>::~AbstractWriteLog() {
  ldout(m_image_ctx.cct, 15) << "enter" << dendl;
  {
    std::lock_guard timer_locker(*m_timer_lock);
    std::lock_guard locker(m_lock);
    m_timer->cancel_event(m_timer_ctx);
    ceph_assert(m_deferred_ios.size() == 0);
    ceph_assert(m_ops_to_flush.size() == 0);
    ceph_assert(m_ops_to_append.size() == 0);
    ceph_assert(m_flush_ops_in_flight == 0);
  }
  m_cache_state = nullptr;
  ldout(m_image_ctx.cct, 15) << "exit" << dendl;
}

template <typename I>
void AbstractWriteLog<I>::perf_start(std::string name) {
  PerfCountersBuilder plb(m_image_ctx.cct, name, l_librbd_pwl_first,
                          l_librbd_pwl_last);

  // Latency axis configuration for op histograms, values are in nanoseconds
  PerfHistogramCommon::axis_config_d op_hist_x_axis_config{
    "Latency (nsec)",
    PerfHistogramCommon::SCALE_LOG2, ///< Latency in logarithmic scale
    0,                               ///< Start at 0
    5000,                            ///< Quantization unit is 5usec
    16,                              ///< Ranges into the mS
  };

  // Syncpoint logentry number x-axis configuration for op histograms
  PerfHistogramCommon::axis_config_d sp_logentry_number_config{
    "logentry number",
    PerfHistogramCommon::SCALE_LINEAR, // log entry number in linear scale
    0,                                 // Start at 0
    1,                                 // Quantization unit is 1
    260,                               // Up to 260 > (MAX_WRITES_PER_SYNC_POINT)
  };

  // Syncpoint bytes number y-axis configuration for op histogram
  PerfHistogramCommon::axis_config_d sp_bytes_number_config{
    "Number of SyncPoint",
    PerfHistogramCommon::SCALE_LOG2, // Request size in logarithmic scale
    0,                               // Start at 0
    512,                             // Quantization unit is 512
    17,                              // Writes up to 8M >= MAX_BYTES_PER_SYNC_POINT
  };

  // Op size axis configuration for op histogram y axis, values are in bytes
  PerfHistogramCommon::axis_config_d op_hist_y_axis_config{
    "Request size (bytes)",
    PerfHistogramCommon::SCALE_LOG2, ///< Request size in logarithmic scale
    0,                               ///< Start at 0
    512,                             ///< Quantization unit is 512 bytes
    16,                              ///< Writes up to >32k
  };

  // Num items configuration for op histogram y axis, values are in items
  PerfHistogramCommon::axis_config_d op_hist_y_axis_count_config{
    "Number of items",
    PerfHistogramCommon::SCALE_LINEAR, ///< Request size in linear scale
    0,                                 ///< Start at 0
    1,                                 ///< Quantization unit is 1
    32,                                ///< Writes up to >32k
  };
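
  /* Each axis_config_d above is {name, scale type, start, quantization unit,
   * bucket count}. As a rough illustration (bucket boundaries are
   * approximate): with SCALE_LOG2, a 5000 ns quantization unit and 16 buckets
   * the latency axis climbs from a few microseconds up into the
   * hundred-millisecond range, doubling per bucket, while the SCALE_LINEAR
   * sync point axis simply counts log entries one per bucket up to 260. */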

  plb.add_u64_counter(l_librbd_pwl_rd_req, "rd", "Reads");
  plb.add_u64_counter(l_librbd_pwl_rd_bytes, "rd_bytes", "Data size in reads");
  plb.add_time_avg(l_librbd_pwl_rd_latency, "rd_latency", "Latency of reads");

  plb.add_u64_counter(l_librbd_pwl_rd_hit_req, "hit_rd", "Reads completely hitting RWL");
  plb.add_u64_counter(l_librbd_pwl_rd_hit_bytes, "rd_hit_bytes", "Bytes read from RWL");
  plb.add_time_avg(l_librbd_pwl_rd_hit_latency, "hit_rd_latency", "Latency of read hits");

  plb.add_u64_counter(l_librbd_pwl_rd_part_hit_req, "part_hit_rd", "reads partially hitting RWL");

  plb.add_u64_counter_histogram(
    l_librbd_pwl_syncpoint_hist, "syncpoint_logentry_bytes_histogram",
    sp_logentry_number_config, sp_bytes_number_config,
    "Histogram of syncpoint's logentry numbers vs bytes number");

  plb.add_u64_counter(l_librbd_pwl_wr_req, "wr", "Writes");
  plb.add_u64_counter(l_librbd_pwl_wr_bytes, "wr_bytes", "Data size in writes");
  plb.add_u64_counter(l_librbd_pwl_wr_req_def, "wr_def", "Writes deferred for resources");
  plb.add_u64_counter(l_librbd_pwl_wr_req_def_lanes, "wr_def_lanes", "Writes deferred for lanes");
  plb.add_u64_counter(l_librbd_pwl_wr_req_def_log, "wr_def_log", "Writes deferred for log entries");
  plb.add_u64_counter(l_librbd_pwl_wr_req_def_buf, "wr_def_buf", "Writes deferred for buffers");
  plb.add_u64_counter(l_librbd_pwl_wr_req_overlap, "wr_overlap", "Writes overlapping with prior in-progress writes");
  plb.add_u64_counter(l_librbd_pwl_wr_req_queued, "wr_q_barrier", "Writes queued for prior barriers (aio_flush)");

  plb.add_u64_counter(l_librbd_pwl_log_ops, "log_ops", "Log appends");
  plb.add_u64_avg(l_librbd_pwl_log_op_bytes, "log_op_bytes", "Average log append bytes");

  plb.add_time_avg(
    l_librbd_pwl_req_arr_to_all_t, "req_arr_to_all_t",
    "Average arrival to allocation time (time deferred for overlap)");
  plb.add_time_avg(
    l_librbd_pwl_req_arr_to_dis_t, "req_arr_to_dis_t",
    "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
  plb.add_time_avg(
    l_librbd_pwl_req_all_to_dis_t, "req_all_to_dis_t",
    "Average allocation to dispatch time (time deferred for log resources)");
  plb.add_time_avg(
    l_librbd_pwl_wr_latency, "wr_latency",
    "Latency of writes (persistent completion)");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_wr_latency_hist, "wr_latency_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of write request latency (nanoseconds) vs. bytes written");
  plb.add_time_avg(
    l_librbd_pwl_wr_caller_latency, "caller_wr_latency",
    "Latency of write completion to caller");
  plb.add_time_avg(
    l_librbd_pwl_nowait_req_arr_to_all_t, "req_arr_to_all_nw_t",
    "Average arrival to allocation time (time deferred for overlap)");
  plb.add_time_avg(
    l_librbd_pwl_nowait_req_arr_to_dis_t, "req_arr_to_dis_nw_t",
    "Average arrival to dispatch time (includes time deferred for overlaps and allocation)");
  plb.add_time_avg(
    l_librbd_pwl_nowait_req_all_to_dis_t, "req_all_to_dis_nw_t",
    "Average allocation to dispatch time (time deferred for log resources)");
  plb.add_time_avg(
    l_librbd_pwl_nowait_wr_latency, "wr_latency_nw",
    "Latency of writes (persistent completion) not deferred for free space");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_nowait_wr_latency_hist, "wr_latency_nw_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of write request latency (nanoseconds) vs. bytes written for writes not deferred for free space");
  plb.add_time_avg(
    l_librbd_pwl_nowait_wr_caller_latency, "caller_wr_latency_nw",
    "Latency of write completion to caller for writes not deferred for free space");

  plb.add_time_avg(l_librbd_pwl_log_op_alloc_t, "op_alloc_t", "Average buffer pmemobj_reserve() time");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_log_op_alloc_t_hist, "op_alloc_t_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of buffer pmemobj_reserve() time (nanoseconds) vs. bytes written");

  plb.add_time_avg(l_librbd_pwl_log_op_dis_to_buf_t, "op_dis_to_buf_t", "Average dispatch to buffer persist time");
  plb.add_time_avg(l_librbd_pwl_log_op_dis_to_app_t, "op_dis_to_app_t", "Average dispatch to log append time");
  plb.add_time_avg(l_librbd_pwl_log_op_dis_to_cmp_t, "op_dis_to_cmp_t", "Average dispatch to persist completion time");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_log_op_dis_to_cmp_t_hist, "op_dis_to_cmp_t_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of op dispatch to persist complete time (nanoseconds) vs. bytes written");

  plb.add_time_avg(
    l_librbd_pwl_log_op_buf_to_app_t, "op_buf_to_app_t",
    "Average buffer persist to log append time (write data persist/replicate + wait for append time)");
  plb.add_time_avg(
    l_librbd_pwl_log_op_buf_to_bufc_t, "op_buf_to_bufc_t",
    "Average buffer persist time (write data persist/replicate time)");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_log_op_buf_to_bufc_t_hist, "op_buf_to_bufc_t_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of write buffer persist time (nanoseconds) vs. bytes written");
  plb.add_time_avg(
    l_librbd_pwl_log_op_app_to_cmp_t, "op_app_to_cmp_t",
    "Average log append to persist complete time (log entry append/replicate + wait for complete time)");
  plb.add_time_avg(
    l_librbd_pwl_log_op_app_to_appc_t, "op_app_to_appc_t",
    "Average log append to persist complete time (log entry append/replicate time)");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_log_op_app_to_appc_t_hist, "op_app_to_appc_t_bytes_histogram",
    op_hist_x_axis_config, op_hist_y_axis_config,
    "Histogram of log append persist time (nanoseconds) (vs. op bytes)");

  plb.add_u64_counter(l_librbd_pwl_discard, "discard", "Discards");
  plb.add_u64_counter(l_librbd_pwl_discard_bytes, "discard_bytes", "Bytes discarded");
  plb.add_time_avg(l_librbd_pwl_discard_latency, "discard_lat", "Discard latency");

  plb.add_u64_counter(l_librbd_pwl_aio_flush, "aio_flush", "AIO flush (flush to RWL)");
  plb.add_u64_counter(l_librbd_pwl_aio_flush_def, "aio_flush_def", "AIO flushes deferred for resources");
  plb.add_time_avg(l_librbd_pwl_aio_flush_latency, "aio_flush_lat", "AIO flush latency");

  plb.add_u64_counter(l_librbd_pwl_ws, "ws", "Write Sames");
  plb.add_u64_counter(l_librbd_pwl_ws_bytes, "ws_bytes", "Write Same bytes to image");
  plb.add_time_avg(l_librbd_pwl_ws_latency, "ws_lat", "Write Same latency");

  plb.add_u64_counter(l_librbd_pwl_cmp, "cmp", "Compare and Write requests");
  plb.add_u64_counter(l_librbd_pwl_cmp_bytes, "cmp_bytes", "Compare and Write bytes compared/written");
  plb.add_time_avg(l_librbd_pwl_cmp_latency, "cmp_lat", "Compare and Write latency");
  plb.add_u64_counter(l_librbd_pwl_cmp_fails, "cmp_fails", "Compare and Write compare fails");

  plb.add_u64_counter(l_librbd_pwl_internal_flush, "internal_flush", "Flush RWL (write back to OSD)");
  plb.add_time_avg(l_librbd_pwl_writeback_latency, "writeback_lat", "write back to OSD latency");
  plb.add_u64_counter(l_librbd_pwl_invalidate_cache, "invalidate", "Invalidate RWL");
  plb.add_u64_counter(l_librbd_pwl_invalidate_discard_cache, "discard", "Discard and invalidate RWL");

  plb.add_time_avg(l_librbd_pwl_append_tx_t, "append_tx_lat", "Log append transaction latency");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_append_tx_t_hist, "append_tx_lat_histogram",
    op_hist_x_axis_config, op_hist_y_axis_count_config,
    "Histogram of log append transaction time (nanoseconds) vs. entries appended");
  plb.add_time_avg(l_librbd_pwl_retire_tx_t, "retire_tx_lat", "Log retire transaction latency");
  plb.add_u64_counter_histogram(
    l_librbd_pwl_retire_tx_t_hist, "retire_tx_lat_histogram",
    op_hist_x_axis_config, op_hist_y_axis_count_config,
    "Histogram of log retire transaction time (nanoseconds) vs. entries retired");

  m_perfcounter = plb.create_perf_counters();
  m_image_ctx.cct->get_perfcounters_collection()->add(m_perfcounter);
}
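
/* The counters built above are registered with the CephContext's perf counter
 * collection, so they are reported alongside the other librbd counters (for
 * example in a "perf dump" taken from the client admin socket, where one is
 * configured); perf_stop() below removes and deletes them again. */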

template <typename I>
void AbstractWriteLog<I>::perf_stop() {
  ceph_assert(m_perfcounter);
  m_image_ctx.cct->get_perfcounters_collection()->remove(m_perfcounter);
  delete m_perfcounter;
}

template <typename I>
void AbstractWriteLog<I>::log_perf() {
  bufferlist bl;
  Formatter *f = Formatter::create("json-pretty");
  bl.append("Perf dump follows\n--- Begin perf dump ---\n");
  std::stringstream ss;
  utime_t now = ceph_clock_now();
  ss << "\"test_time\": \"" << now << "\",";
  ss << "\"image\": \"" << m_image_ctx.name << "\",";
  bl.append(ss.str());
  bl.append("\"stats\": ");
  m_image_ctx.cct->get_perfcounters_collection()->dump_formatted(f, 0);
  f->flush(bl);
  bl.append(",\n\"histograms\": ");
  m_image_ctx.cct->get_perfcounters_collection()->dump_formatted_histograms(f, 0);
  f->flush(bl);
  delete f;
  bl.append("}\n--- End perf dump ---\n");
  ldout(m_image_ctx.cct, 1) << bl.c_str() << dendl;
}

template <typename I>
void AbstractWriteLog<I>::periodic_stats() {
  std::unique_lock locker(m_lock);
  ldout(m_image_ctx.cct, 5) << "STATS: m_log_entries=" << m_log_entries.size()
                            << ", m_dirty_log_entries=" << m_dirty_log_entries.size()
                            << ", m_free_log_entries=" << m_free_log_entries
                            << ", m_bytes_allocated=" << m_bytes_allocated
                            << ", m_bytes_cached=" << m_bytes_cached
                            << ", m_bytes_dirty=" << m_bytes_dirty
                            << ", bytes available=" << m_bytes_allocated_cap - m_bytes_allocated
                            << ", m_first_valid_entry=" << m_first_valid_entry
                            << ", m_first_free_entry=" << m_first_free_entry
                            << ", m_current_sync_gen=" << m_current_sync_gen
                            << ", m_flushed_sync_gen=" << m_flushed_sync_gen
                            << dendl;

  update_image_cache_state();
  write_image_cache_state(locker);
}

template <typename I>
void AbstractWriteLog<I>::arm_periodic_stats() {
  ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
  m_timer_ctx = new LambdaContext([this](int r) {
      /* m_timer_lock is held */
      periodic_stats();
      arm_periodic_stats();
    });
  m_timer->add_event_after(LOG_STATS_INTERVAL_SECONDS, m_timer_ctx);
}
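
/* Note: the stats timer is self re-arming. Each LambdaContext tick runs with
 * *m_timer_lock held, logs the current statistics and then schedules the next
 * event LOG_STATS_INTERVAL_SECONDS later; the pending event is cancelled in
 * the destructor, which takes the same timer lock first. */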

template <typename I>
void AbstractWriteLog<I>::update_entries(std::shared_ptr<GenericLogEntry> *log_entry,
    WriteLogCacheEntry *cache_entry, std::map<uint64_t, bool> &missing_sync_points,
    std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
    uint64_t entry_index) {
  bool writer = cache_entry->is_writer();
  if (cache_entry->is_sync_point()) {
    ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                               << " is a sync point. cache_entry=[" << *cache_entry << "]" << dendl;
    auto sync_point_entry = std::make_shared<SyncPointLogEntry>(cache_entry->sync_gen_number);
    *log_entry = sync_point_entry;
    sync_point_entries[cache_entry->sync_gen_number] = sync_point_entry;
    missing_sync_points.erase(cache_entry->sync_gen_number);
    m_current_sync_gen = cache_entry->sync_gen_number;
  } else if (cache_entry->is_write()) {
    ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                               << " is a write. cache_entry=[" << *cache_entry << "]" << dendl;
    auto write_entry =
      m_builder->create_write_log_entry(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes);
    write_data_to_buffer(write_entry, cache_entry);
    *log_entry = write_entry;
  } else if (cache_entry->is_writesame()) {
    ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                               << " is a write same. cache_entry=[" << *cache_entry << "]" << dendl;
    auto ws_entry =
      m_builder->create_writesame_log_entry(nullptr, cache_entry->image_offset_bytes,
                                            cache_entry->write_bytes, cache_entry->ws_datalen);
    write_data_to_buffer(ws_entry, cache_entry);
    *log_entry = ws_entry;
  } else if (cache_entry->is_discard()) {
    ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                               << " is a discard. cache_entry=[" << *cache_entry << "]" << dendl;
    auto discard_entry =
      std::make_shared<DiscardLogEntry>(nullptr, cache_entry->image_offset_bytes, cache_entry->write_bytes,
                                        m_discard_granularity_bytes);
    *log_entry = discard_entry;
  } else {
    lderr(m_image_ctx.cct) << "Unexpected entry type in entry " << entry_index
                           << ", cache_entry=[" << *cache_entry << "]" << dendl;
  }

  if (writer) {
    ldout(m_image_ctx.cct, 20) << "Entry " << entry_index
                               << " writes. cache_entry=[" << *cache_entry << "]" << dendl;
    if (!sync_point_entries[cache_entry->sync_gen_number]) {
      missing_sync_points[cache_entry->sync_gen_number] = true;
    }
  }
}

template <typename I>
void AbstractWriteLog<I>::update_sync_points(std::map<uint64_t, bool> &missing_sync_points,
    std::map<uint64_t, std::shared_ptr<SyncPointLogEntry>> &sync_point_entries,
    DeferredContexts &later) {
  /* Create missing sync points. These must not be appended until the
   * entry reload is complete and the write map is up to
   * date. Currently this is handled by the deferred contexts object
   * passed to new_sync_point(). These contexts won't be completed
   * until this function returns. */
  for (auto &kv : missing_sync_points) {
    ldout(m_image_ctx.cct, 5) << "Adding sync point " << kv.first << dendl;
    if (0 == m_current_sync_gen) {
      /* The unlikely case where the log contains writing entries, but no sync
       * points (e.g. because they were all retired) */
      m_current_sync_gen = kv.first - 1;
    }
    ceph_assert(kv.first == m_current_sync_gen + 1);
    init_flush_new_sync_point(later);
    ceph_assert(kv.first == m_current_sync_gen);
    sync_point_entries[kv.first] = m_current_sync_point->log_entry;
  }

  /*
   * Iterate over the log entries again (this time via the global
   * entries list), connecting write entries to their sync points and
   * updating the sync point stats.
   *
   * Add writes to the write log map.
   */
  std::shared_ptr<SyncPointLogEntry> previous_sync_point_entry = nullptr;
  for (auto &log_entry : m_log_entries) {
    if ((log_entry->write_bytes() > 0) || (log_entry->bytes_dirty() > 0)) {
      /* This entry is one of the types that write */
      auto gen_write_entry = static_pointer_cast<GenericWriteLogEntry>(log_entry);
      if (gen_write_entry) {
        auto sync_point_entry = sync_point_entries[gen_write_entry->ram_entry.sync_gen_number];
        if (!sync_point_entry) {
          lderr(m_image_ctx.cct) << "Sync point missing for entry=[" << *gen_write_entry << "]" << dendl;
        } else {
          gen_write_entry->sync_point_entry = sync_point_entry;
          sync_point_entry->writes++;
          sync_point_entry->bytes += gen_write_entry->ram_entry.write_bytes;
          sync_point_entry->writes_completed++;
          m_blocks_to_log_entries.add_log_entry(gen_write_entry);
          /* This entry is only dirty if its sync gen number is > the flushed
           * sync gen number from the root object. */
          if (gen_write_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
            m_dirty_log_entries.push_back(log_entry);
            m_bytes_dirty += gen_write_entry->bytes_dirty();
          } else {
            gen_write_entry->set_flushed(true);
            sync_point_entry->writes_flushed++;
          }

          /* calc m_bytes_allocated & m_bytes_cached */
          inc_allocated_cached_bytes(log_entry);
        }
      }
    } else {
      /* This entry is sync point entry */
      auto sync_point_entry = static_pointer_cast<SyncPointLogEntry>(log_entry);
      if (sync_point_entry) {
        if (previous_sync_point_entry) {
          previous_sync_point_entry->next_sync_point_entry = sync_point_entry;
          if (previous_sync_point_entry->ram_entry.sync_gen_number > m_flushed_sync_gen) {
            sync_point_entry->prior_sync_point_flushed = false;
            ceph_assert(!previous_sync_point_entry->prior_sync_point_flushed ||
                        (0 == previous_sync_point_entry->writes) ||
                        (previous_sync_point_entry->writes >= previous_sync_point_entry->writes_flushed));
          } else {
            sync_point_entry->prior_sync_point_flushed = true;
            ceph_assert(previous_sync_point_entry->prior_sync_point_flushed);
            ceph_assert(previous_sync_point_entry->writes == previous_sync_point_entry->writes_flushed);
          }
        } else {
          /* There are no previous sync points, so we'll consider them flushed */
          sync_point_entry->prior_sync_point_flushed = true;
        }
        previous_sync_point_entry = sync_point_entry;
        ldout(m_image_ctx.cct, 10) << "Loaded to sync point=[" << *sync_point_entry << "]" << dendl;
      }
    }
  }
  if (0 == m_current_sync_gen) {
    /* If a re-opened log was completely flushed, we'll have found no sync point entries here,
     * and not advanced m_current_sync_gen. Here we ensure it starts past the last flushed sync
     * point recorded in the log. */
    m_current_sync_gen = m_flushed_sync_gen;
  }
}
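
/* Illustrative example of the missing-sync-point handling above (numbers made
 * up): if replay found write entries tagged with sync gen 7 but no sync point
 * entry for 7 (for instance because it had already been retired), a sync
 * point for gen 7 is created via init_flush_new_sync_point() before the
 * second pass links the write entries to it, so every replayed write ends up
 * attached to a SyncPointLogEntry. */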

template <typename I>
void AbstractWriteLog<I>::pwl_init(Context *on_finish, DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;
  ceph_assert(m_cache_state);
  std::lock_guard locker(m_lock);
  ceph_assert(!m_initialized);
  ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;

  if (!m_cache_state->present) {
    m_cache_state->host = ceph_get_short_hostname();
    m_cache_state->size = m_image_ctx.config.template get_val<uint64_t>(
        "rbd_persistent_cache_size");

    string path = m_image_ctx.config.template get_val<string>(
        "rbd_persistent_cache_path");
    std::string pool_name = m_image_ctx.md_ctx.get_pool_name();
    m_cache_state->path = path + "/rbd-pwl." + pool_name + "." + m_image_ctx.id + ".pool";
  }

  ldout(cct,5) << "pwl_size: " << m_cache_state->size << dendl;
  ldout(cct,5) << "pwl_path: " << m_cache_state->path << dendl;

  m_log_pool_name = m_cache_state->path;
  m_log_pool_size = max(m_cache_state->size, MIN_POOL_SIZE);
  m_log_pool_size = p2align(m_log_pool_size, POOL_SIZE_ALIGN);
  ldout(cct, 5) << "pool " << m_log_pool_name << " size " << m_log_pool_size
                << " (adjusted from " << m_cache_state->size << ")" << dendl;

  if ((!m_cache_state->present) &&
      (access(m_log_pool_name.c_str(), F_OK) == 0)) {
    ldout(cct, 5) << "There's an existing pool file " << m_log_pool_name
                  << ", while there's no cache in the image metadata." << dendl;
    if (remove(m_log_pool_name.c_str()) != 0) {
      lderr(cct) << "failed to remove the pool file " << m_log_pool_name
                 << dendl;
      on_finish->complete(-errno);
      return;
    } else {
      ldout(cct, 5) << "Removed the existing pool file." << dendl;
    }
  } else if ((m_cache_state->present) &&
             (access(m_log_pool_name.c_str(), F_OK) != 0)) {
    lderr(cct) << "can't find the existing pool file: " << m_log_pool_name
               << ". error: " << cpp_strerror(-errno) << dendl;
    on_finish->complete(-errno);
    return;
  }

  bool succeeded = initialize_pool(on_finish, later);
  if (!succeeded) {
    return;
  }

  ldout(cct,1) << "pool " << m_log_pool_name << " has " << m_total_log_entries
               << " log entries, " << m_free_log_entries << " of which are free."
               << " first_valid=" << m_first_valid_entry
               << ", first_free=" << m_first_free_entry
               << ", flushed_sync_gen=" << m_flushed_sync_gen
               << ", m_current_sync_gen=" << m_current_sync_gen << dendl;
  if (m_first_free_entry == m_first_valid_entry) {
    ldout(cct,1) << "write log is empty" << dendl;
    m_cache_state->empty = true;
  }

  /* Start the sync point following the last one seen in the
   * log. Flush the last sync point created during the loading of the
   * existing log entries. */
  init_flush_new_sync_point(later);
  ldout(cct,20) << "new sync point = [" << m_current_sync_point << "]" << dendl;

  m_initialized = true;

  m_thread_pool.start();

  /* Do these after we drop lock */
  later.add(new LambdaContext([this](int r) {
      /* Log stats for the first time */
      periodic_stats();
      /* Arm periodic stats logging for the first time */
      std::lock_guard timer_locker(*m_timer_lock);
      arm_periodic_stats();
    }));

  m_image_ctx.op_work_queue->queue(on_finish, 0);
}
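
/* Illustrative example of the path and size computed above: with
 * rbd_persistent_cache_path=/mnt/pmem (an example value), pool "rbd" and an
 * image id of "abc123", the cache file becomes
 * /mnt/pmem/rbd-pwl.rbd.abc123.pool, and the configured cache size is clamped
 * to at least MIN_POOL_SIZE and then aligned down to POOL_SIZE_ALIGN. */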

template <typename I>
void AbstractWriteLog<I>::write_image_cache_state(std::unique_lock<ceph::mutex>& locker) {
  using klass = AbstractWriteLog<I>;
  Context *ctx = util::create_context_callback<
    klass, &klass::handle_write_image_cache_state>(this);
  m_cache_state->write_image_cache_state(locker, ctx);
}

template <typename I>
void AbstractWriteLog<I>::update_image_cache_state() {
  ldout(m_image_ctx.cct, 10) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  m_cache_state->allocated_bytes = m_bytes_allocated;
  m_cache_state->cached_bytes = m_bytes_cached;
  m_cache_state->dirty_bytes = m_bytes_dirty;
  m_cache_state->free_bytes = m_bytes_allocated_cap - m_bytes_allocated;
  m_cache_state->hits_full = m_perfcounter->get(l_librbd_pwl_rd_hit_req);
  m_cache_state->hits_partial = m_perfcounter->get(l_librbd_pwl_rd_part_hit_req);
  m_cache_state->misses = m_perfcounter->get(l_librbd_pwl_rd_req) -
      m_cache_state->hits_full - m_cache_state->hits_partial;
  m_cache_state->hit_bytes = m_perfcounter->get(l_librbd_pwl_rd_hit_bytes);
  m_cache_state->miss_bytes = m_perfcounter->get(l_librbd_pwl_rd_bytes) -
      m_cache_state->hit_bytes;
}

template <typename I>
void AbstractWriteLog<I>::handle_write_image_cache_state(int r) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 10) << "r=" << r << dendl;

  if (r < 0) {
    lderr(cct) << "failed to update image cache state: " << cpp_strerror(r)
               << dendl;
    return;
  }
}

template <typename I>
void AbstractWriteLog<I>::init(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;
  auto pname = std::string("librbd-pwl-") + m_image_ctx.id +
               std::string("-") + m_image_ctx.md_ctx.get_pool_name() +
               std::string("-") + m_image_ctx.name;
  perf_start(pname);

  ceph_assert(!m_initialized);

  Context *ctx = new LambdaContext(
    [this, on_finish](int r) {
      if (r >= 0) {
        std::unique_lock locker(m_lock);
        update_image_cache_state();
        m_cache_state->write_image_cache_state(locker, on_finish);
      } else {
        on_finish->complete(r);
      }
    });

  DeferredContexts later;
  pwl_init(ctx, later);
}

template <typename I>
void AbstractWriteLog<I>::shut_down(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << dendl;

  ldout(cct,5) << "image name: " << m_image_ctx.name << " id: " << m_image_ctx.id << dendl;

  Context *ctx = new LambdaContext(
    [this, on_finish](int r) {
      ldout(m_image_ctx.cct, 6) << "shutdown complete" << dendl;
      m_image_ctx.op_work_queue->queue(on_finish, r);
    });
  ctx = new LambdaContext(
    [this, ctx](int r) {
      ldout(m_image_ctx.cct, 6) << "image cache cleaned" << dendl;
      Context *next_ctx = override_ctx(r, ctx);

      std::unique_lock locker(m_lock);
      check_image_cache_state_clean();
      m_wake_up_enabled = false;
      m_log_entries.clear();
      m_cache_state->clean = true;
      m_cache_state->empty = true;
      update_image_cache_state();
      m_cache_state->write_image_cache_state(locker, next_ctx);
    });
  ctx = new LambdaContext(
    [this, ctx](int r) {
      Context *next_ctx = override_ctx(r, ctx);
      ldout(m_image_ctx.cct, 6) << "waiting for in flight operations" << dendl;
      // Wait for in progress IOs to complete
      next_ctx = util::create_async_context_callback(&m_work_queue, next_ctx);
      m_async_op_tracker.wait_for_ops(next_ctx);
    });
  ctx = new LambdaContext(
    [this, ctx](int r) {
      Context *next_ctx = override_ctx(r, ctx);
      {
        /* Sync with process_writeback_dirty_entries() */
        RWLock::WLocker entry_reader_wlocker(m_entry_reader_lock);
        m_shutting_down = true;
        /* Flush all writes to OSDs (unless disabled) and wait for all
           in-progress flush writes to complete */
        ldout(m_image_ctx.cct, 6) << "flushing" << dendl;
      }
      flush_dirty_entries(next_ctx);
    });
  ctx = new LambdaContext(
    [this, ctx](int r) {
      ldout(m_image_ctx.cct, 6) << "Done internal_flush in shutdown" << dendl;
      m_work_queue.queue(ctx, r);
    });
  /* Complete all in-flight writes before shutting down */
  ldout(m_image_ctx.cct, 6) << "internal_flush in shutdown" << dendl;
  internal_flush(false, ctx);
}

template <typename I>
void AbstractWriteLog<I>::read(Extents&& image_extents,
                               ceph::bufferlist* bl,
                               int fadvise_flags, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  utime_t now = ceph_clock_now();

  on_finish = new LambdaContext(
    [this, on_finish](int r) {
      m_async_op_tracker.finish_op();
      on_finish->complete(r);
    });
  C_ReadRequest *read_ctx = m_builder->create_read_request(
      cct, now, m_perfcounter, bl, on_finish);
  ldout(cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
                 << "image_extents=" << image_extents
                 << ", on_finish=" << on_finish << dendl;

  ceph_assert(m_initialized);
  m_perfcounter->inc(l_librbd_pwl_rd_req, 1);

  std::vector<std::shared_ptr<GenericWriteLogEntry>> log_entries_to_read;
  std::vector<bufferlist*> bls_to_read;

  m_async_op_tracker.start_op();
  Context *ctx = new LambdaContext(
    [this, read_ctx, fadvise_flags](int r) {
      if (read_ctx->miss_extents.empty()) {
        /* All of this read comes from RWL */
        read_ctx->complete(0);
      } else {
        /* Pass the read misses on to the layer below RWL */
        m_image_writeback.aio_read(
          std::move(read_ctx->miss_extents), &read_ctx->miss_bl,
          fadvise_flags, read_ctx);
      }
    });

  /*
   * The strategy here is to look up all the WriteLogMapEntries that overlap
   * this read, and iterate through those to separate this read into hits and
   * misses. A new Extents object is produced here with Extents for each miss
   * region. The miss Extents is then passed on to the read cache below RWL. We
   * also produce an ImageExtentBufs for all the extents (hit or miss) in this
   * read. When the read from the lower cache layer completes, we iterate
   * through the ImageExtentBufs and insert buffers for each cache hit at the
   * appropriate spot in the bufferlist returned from below for the miss
   * read. The buffers we insert here refer directly to regions of various
   * write log entry data buffers.
   *
   * Locking: These buffer objects hold a reference on the write log entries
   * they refer to. Log entries can't be retired until there are no references.
   * The GenericWriteLogEntry references are released by the buffer destructor.
   */
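  /* Illustrative example (sizes made up): an 8192-byte read at offset 0 that
   * overlaps one log entry holding offset 4096, length 4096 produces a miss
   * extent (0, 4096) that is passed to the image below and a hit extent
   * (4096, 4096) whose buffer refers directly to the log entry's data buffer;
   * both are appended to read_ctx->read_extents in image order so the reply
   * can be stitched together when the lower read completes. */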
  for (auto &extent : image_extents) {
    uint64_t extent_offset = 0;
    RWLock::RLocker entry_reader_locker(m_entry_reader_lock);
    WriteLogMapEntries map_entries = m_blocks_to_log_entries.find_map_entries(
        block_extent(extent));
    for (auto &map_entry : map_entries) {
      Extent entry_image_extent(pwl::image_extent(map_entry.block_extent));
      /* If this map entry starts after the current image extent offset ... */
      if (entry_image_extent.first > extent.first + extent_offset) {
        /* ... add range before map_entry to miss extents */
        uint64_t miss_extent_start = extent.first + extent_offset;
        uint64_t miss_extent_length = entry_image_extent.first -
                                      miss_extent_start;
        Extent miss_extent(miss_extent_start, miss_extent_length);
        read_ctx->miss_extents.push_back(miss_extent);
        /* Add miss range to read extents */
        auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
        read_ctx->read_extents.push_back(miss_extent_buf);
        extent_offset += miss_extent_length;
      }
      ceph_assert(entry_image_extent.first <= extent.first + extent_offset);
      uint64_t entry_offset = 0;
      /* If this map entry starts before the current image extent offset ... */
      if (entry_image_extent.first < extent.first + extent_offset) {
        /* ... compute offset into log entry for this read extent */
        entry_offset = (extent.first + extent_offset) - entry_image_extent.first;
      }
      /* This read hit ends at the end of the extent or the end of the log
         entry, whichever is less. */
      uint64_t entry_hit_length = min(entry_image_extent.second - entry_offset,
                                      extent.second - extent_offset);
      Extent hit_extent(entry_image_extent.first, entry_hit_length);
      if (0 == map_entry.log_entry->write_bytes() &&
          0 < map_entry.log_entry->bytes_dirty()) {
        /* discard log entry */
        ldout(cct, 20) << "discard log entry" << dendl;
        auto discard_entry = map_entry.log_entry;
        ldout(cct, 20) << "read hit on discard entry: log_entry="
                       << *discard_entry << dendl;
        /* Discards read as zero, so we'll construct a bufferlist of zeros */
        bufferlist zero_bl;
        zero_bl.append_zero(entry_hit_length);
        /* Add hit extent to read extents */
        auto hit_extent_buf = std::make_shared<ImageExtentBuf>(
            hit_extent, zero_bl);
        read_ctx->read_extents.push_back(hit_extent_buf);
      } else {
        ldout(cct, 20) << "write or writesame log entry" << dendl;
        /* write and writesame log entry */
        /* Offset of the map entry into the log entry's buffer */
        uint64_t map_entry_buffer_offset = entry_image_extent.first -
            map_entry.log_entry->ram_entry.image_offset_bytes;
        /* Offset into the log entry buffer of this read hit */
        uint64_t read_buffer_offset = map_entry_buffer_offset + entry_offset;
        /* Create buffer object referring to pmem pool for this read hit */
        collect_read_extents(
            read_buffer_offset, map_entry, log_entries_to_read, bls_to_read,
            entry_hit_length, hit_extent, read_ctx);
      }
      /* Exclude RWL hit range from buffer and extent */
      extent_offset += entry_hit_length;
      ldout(cct, 20) << map_entry << dendl;
    }
    /* If the last map entry didn't consume the entire image extent ... */
    if (extent.second > extent_offset) {
      /* ... add the rest of this extent to miss extents */
      uint64_t miss_extent_start = extent.first + extent_offset;
      uint64_t miss_extent_length = extent.second - extent_offset;
      Extent miss_extent(miss_extent_start, miss_extent_length);
      read_ctx->miss_extents.push_back(miss_extent);
      /* Add miss range to read extents */
      auto miss_extent_buf = std::make_shared<ImageExtentBuf>(miss_extent);
      read_ctx->read_extents.push_back(miss_extent_buf);
      extent_offset += miss_extent_length;
    }
  }

  ldout(cct, 20) << "miss_extents=" << read_ctx->miss_extents
                 << ", miss_bl=" << read_ctx->miss_bl << dendl;

  complete_read(log_entries_to_read, bls_to_read, ctx);
}

template <typename I>
void AbstractWriteLog<I>::write(Extents &&image_extents,
                                bufferlist&& bl,
                                int fadvise_flags,
                                Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;

  ldout(cct, 20) << "aio_write" << dendl;

  utime_t now = ceph_clock_now();
  m_perfcounter->inc(l_librbd_pwl_wr_req, 1);

  ceph_assert(m_initialized);

  /* Split image extents larger than 1M. This isn't strictly necessary but
   * makes libpmemobj allocator's job easier and reduces pmemobj_defrag() cost.
   * We plan to manage pmem space and allocation by ourselves in the future.
   */
  Extents split_image_extents;
  uint64_t max_extent_size = get_max_extent();
  if (max_extent_size != 0) {
    for (auto extent : image_extents) {
      if (extent.second > max_extent_size) {
        uint64_t off = extent.first;
        uint64_t extent_bytes = extent.second;
        for (int i = 0; extent_bytes != 0; ++i) {
          Extent _ext;
          _ext.first = off + i * max_extent_size;
          _ext.second = std::min(max_extent_size, extent_bytes);
          extent_bytes = extent_bytes - _ext.second;
          split_image_extents.emplace_back(_ext);
        }
      } else {
        split_image_extents.emplace_back(extent);
      }
    }
  } else {
    split_image_extents = image_extents;
  }
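
  /* Illustrative example (sizes made up): if get_max_extent() returns 1 MiB,
   * a single 2.5 MiB extent at offset 0 is split into three extents of
   * 1 MiB, 1 MiB and 0.5 MiB at offsets 0, 1 MiB and 2 MiB before the write
   * request is built below. */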

  C_WriteRequestT *write_req =
    m_builder->create_write_request(*this, now, std::move(split_image_extents),
                                    std::move(bl), fadvise_flags, m_lock,
                                    m_perfcounter, on_finish);
  m_perfcounter->inc(l_librbd_pwl_wr_bytes,
                     write_req->image_extents_summary.total_bytes);

  /* The lambda below will be called when the block guard for all
   * blocks affected by this write is obtained */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext([this,
      write_req](GuardedRequestFunctionContext &guard_ctx) {
        write_req->blockguard_acquired(guard_ctx);
        alloc_and_dispatch_io_req(write_req);
      });

  detain_guarded_request(write_req, guarded_ctx, false);
}

template <typename I>
void AbstractWriteLog<I>::discard(uint64_t offset, uint64_t length,
                                  uint32_t discard_granularity_bytes,
                                  Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;

  ldout(cct, 20) << dendl;

  utime_t now = ceph_clock_now();
  m_perfcounter->inc(l_librbd_pwl_discard, 1);
  Extents discard_extents = {{offset, length}};
  m_discard_granularity_bytes = discard_granularity_bytes;

  ceph_assert(m_initialized);

  auto *discard_req =
    new C_DiscardRequestT(*this, now, std::move(discard_extents), discard_granularity_bytes,
                          m_lock, m_perfcounter, on_finish);

  /* The lambda below will be called when the block guard for all
   * blocks affected by this write is obtained */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext([this, discard_req](GuardedRequestFunctionContext &guard_ctx) {
      discard_req->blockguard_acquired(guard_ctx);
      alloc_and_dispatch_io_req(discard_req);
    });

  detain_guarded_request(discard_req, guarded_ctx, false);
}

/*
 * Aio_flush completes when all previously completed writes are
 * flushed to persistent cache. We make a best-effort attempt to also
 * defer until all in-progress writes complete, but we may not know
 * about all of the writes the application considers in-progress yet,
 * due to uncertainty in the IO submission workq (multiple WQ threads
 * may allow out-of-order submission).
 *
 * This flush operation will not wait for writes deferred for overlap
 * in the block guard.
 */
template <typename I>
void AbstractWriteLog<I>::flush(io::FlushSource flush_source, Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  ldout(cct, 20) << "on_finish=" << on_finish << " flush_source=" << flush_source << dendl;

  if (io::FLUSH_SOURCE_SHUTDOWN == flush_source || io::FLUSH_SOURCE_INTERNAL == flush_source ||
      io::FLUSH_SOURCE_WRITE_BLOCK == flush_source) {
    internal_flush(false, on_finish);
    return;
  }
  m_perfcounter->inc(l_librbd_pwl_aio_flush, 1);

  /* May be called even if initialization fails */
  if (!m_initialized) {
    ldout(cct, 05) << "never initialized" << dendl;
    /* Deadlock if completed here */
    m_image_ctx.op_work_queue->queue(on_finish, 0);
    return;
  }

  {
    std::shared_lock image_locker(m_image_ctx.image_lock);
    if (m_image_ctx.snap_id != CEPH_NOSNAP || m_image_ctx.read_only) {
      on_finish->complete(-EROFS);
      return;
    }
  }

  auto flush_req = make_flush_req(on_finish);

  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext([this, flush_req](GuardedRequestFunctionContext &guard_ctx) {
      ldout(m_image_ctx.cct, 20) << "flush_req=" << flush_req << " cell=" << guard_ctx.cell << dendl;
      ceph_assert(guard_ctx.cell);
      flush_req->detained = guard_ctx.state.detained;
      /* We don't call flush_req->set_cell(), because the block guard will be released here */
      {
        DeferredContexts post_unlock; /* Do these when the lock below is released */
        std::lock_guard locker(m_lock);

        if (!m_persist_on_flush && m_persist_on_write_until_flush) {
          m_persist_on_flush = true;
          ldout(m_image_ctx.cct, 5) << "now persisting on flush" << dendl;
        }

        /*
         * Create a new sync point if there have been writes since the last
         * one.
         *
         * We do not flush the caches below the RWL here.
         */
        flush_new_sync_point_if_needed(flush_req, post_unlock);
      }

      release_guarded_request(guard_ctx.cell);
    });

  detain_guarded_request(flush_req, guarded_ctx, true);
}

template <typename I>
void AbstractWriteLog<I>::writesame(uint64_t offset, uint64_t length,
                                    bufferlist&& bl, int fadvise_flags,
                                    Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;

  ldout(cct, 20) << "aio_writesame" << dendl;

  utime_t now = ceph_clock_now();
  Extents ws_extents = {{offset, length}};
  m_perfcounter->inc(l_librbd_pwl_ws, 1);
  ceph_assert(m_initialized);

  /* A write same request is also a write request. The key difference is the
   * write same data buffer is shorter than the extent of the request. The full
   * extent will be used in the block guard, and appear in
   * m_blocks_to_log_entries_map. The data buffer allocated for the WS is only
   * as long as the length of the bl here, which is the pattern that's repeated
   * in the image for the entire length of this WS. Read hits and flushing of
   * write sames are different than normal writes. */
  C_WriteSameRequestT *ws_req =
    m_builder->create_writesame_request(*this, now, std::move(ws_extents), std::move(bl),
                                        fadvise_flags, m_lock, m_perfcounter, on_finish);
  m_perfcounter->inc(l_librbd_pwl_ws_bytes, ws_req->image_extents_summary.total_bytes);

  /* The lambda below will be called when the block guard for all
   * blocks affected by this write is obtained */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext([this, ws_req](GuardedRequestFunctionContext &guard_ctx) {
      ws_req->blockguard_acquired(guard_ctx);
      alloc_and_dispatch_io_req(ws_req);
    });

  detain_guarded_request(ws_req, guarded_ctx, false);
}
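
/*
 * compare_and_write below reuses the write path. The compare phase reads the
 * target range back through this cache (so it sees data that is still only in
 * the log), and the write is allocated and dispatched only if cmp_bl matches
 * what was read. On a mismatch the request completes with -EILSEQ and
 * *mismatch_offset is set to the first differing byte offset.
 */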
template <typename I>
void AbstractWriteLog<I>::compare_and_write(Extents &&image_extents,
                                            bufferlist&& cmp_bl,
                                            bufferlist&& bl,
                                            uint64_t *mismatch_offset,
                                            int fadvise_flags,
                                            Context *on_finish) {
  ldout(m_image_ctx.cct, 20) << dendl;

  utime_t now = ceph_clock_now();
  m_perfcounter->inc(l_librbd_pwl_cmp, 1);
  ceph_assert(m_initialized);

  /* A compare and write request is also a write request. We only allocate
   * resources and dispatch this write request if the compare phase
   * succeeds. */
  C_WriteRequestT *cw_req =
    m_builder->create_comp_and_write_request(
      *this, now, std::move(image_extents), std::move(cmp_bl), std::move(bl),
      mismatch_offset, fadvise_flags, m_lock, m_perfcounter, on_finish);
  m_perfcounter->inc(l_librbd_pwl_cmp_bytes, cw_req->image_extents_summary.total_bytes);

  /* The lambda below will be called when the block guard for all
   * blocks affected by this write is obtained */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext([this, cw_req](GuardedRequestFunctionContext &guard_ctx) {
      cw_req->blockguard_acquired(guard_ctx);

      auto read_complete_ctx = new LambdaContext(
        [this, cw_req](int r) {
          ldout(m_image_ctx.cct, 20) << "name: " << m_image_ctx.name << " id: " << m_image_ctx.id
                                     << "cw_req=" << cw_req << dendl;

          /* Compare read_bl to cmp_bl to determine if this will produce a write */
          buffer::list aligned_read_bl;
          if (cw_req->cmp_bl.length() < cw_req->read_bl.length()) {
            aligned_read_bl.substr_of(cw_req->read_bl, 0, cw_req->cmp_bl.length());
          }
          if (cw_req->cmp_bl.contents_equal(cw_req->read_bl) ||
              cw_req->cmp_bl.contents_equal(aligned_read_bl)) {
            /* Compare phase succeeds. Begin write */
            ldout(m_image_ctx.cct, 5) << " cw_req=" << cw_req << " compare matched" << dendl;
            cw_req->compare_succeeded = true;
            *cw_req->mismatch_offset = 0;
            /* Continue with this request as a write. Blockguard release and
             * user request completion handled as if this were a plain
             * write. */
            alloc_and_dispatch_io_req(cw_req);
          } else {
            /* Compare phase fails. Comp-and-write ends now. */
            ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " compare failed" << dendl;
            /* Bufferlist doesn't tell us where they differed, so we'll have to determine that here */
            uint64_t bl_index = 0;
            for (bl_index = 0; bl_index < cw_req->cmp_bl.length(); bl_index++) {
              if (cw_req->cmp_bl[bl_index] != cw_req->read_bl[bl_index]) {
                ldout(m_image_ctx.cct, 15) << " cw_req=" << cw_req << " mismatch at " << bl_index << dendl;
                break;
              }
            }
            cw_req->compare_succeeded = false;
            *cw_req->mismatch_offset = bl_index;
            cw_req->complete_user_request(-EILSEQ);
            cw_req->release_cell();
            cw_req->complete(0);
          }
        });

      /* Read phase of comp-and-write must read through RWL */
      Extents image_extents_copy = cw_req->image_extents;
      read(std::move(image_extents_copy), &cw_req->read_bl, cw_req->fadvise_flags, read_complete_ctx);
    });

  detain_guarded_request(cw_req, guarded_ctx, false);
}

template <typename I>
void AbstractWriteLog<I>::flush(Context *on_finish) {
  internal_flush(false, on_finish);
}

template <typename I>
void AbstractWriteLog<I>::invalidate(Context *on_finish) {
  internal_flush(true, on_finish);
}

template <typename I>
CephContext *AbstractWriteLog<I>::get_context() {
  return m_image_ctx.cct;
}

template <typename I>
BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_helper(GuardedRequest &req)
{
  CephContext *cct = m_image_ctx.cct;
  BlockGuardCell *cell;

  ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
  ldout(cct, 20) << dendl;

  int r = m_write_log_guard.detain(req.block_extent, &req, &cell);
  ceph_assert(r >= 0);
  if (r > 0) {
    ldout(cct, 20) << "detaining guarded request due to in-flight requests: "
                   << "req=" << req << dendl;
    return nullptr;
  }

  ldout(cct, 20) << "in-flight request cell: " << cell << dendl;
  return cell;
}

template <typename I>
BlockGuardCell* AbstractWriteLog<I>::detain_guarded_request_barrier_helper(
    GuardedRequest &req)
{
  BlockGuardCell *cell = nullptr;

  ceph_assert(ceph_mutex_is_locked_by_me(m_blockguard_lock));
  ldout(m_image_ctx.cct, 20) << dendl;

  if (m_barrier_in_progress) {
    req.guard_ctx->state.queued = true;
    m_awaiting_barrier.push_back(req);
  } else {
    bool barrier = req.guard_ctx->state.barrier;
    if (barrier) {
      m_barrier_in_progress = true;
      req.guard_ctx->state.current_barrier = true;
    }
    cell = detain_guarded_request_helper(req);
    if (barrier) {
      /* Only non-null if the barrier acquires the guard now */
      m_barrier_cell = cell;
    }
  }

  return cell;
}

template <typename I>
void AbstractWriteLog<I>::detain_guarded_request(
    C_BlockIORequestT *request,
    GuardedRequestFunctionContext *guarded_ctx,
    bool is_barrier)
{
  BlockExtent extent;
  if (request) {
    extent = request->image_extents_summary.block_extent();
  } else {
    extent = block_extent(whole_volume_extent());
  }
  auto req = GuardedRequest(extent, guarded_ctx, is_barrier);
  BlockGuardCell *cell = nullptr;

  ldout(m_image_ctx.cct, 20) << dendl;
  {
    std::lock_guard locker(m_blockguard_lock);
    cell = detain_guarded_request_barrier_helper(req);
  }
  if (cell) {
    req.guard_ctx->cell = cell;
    req.guard_ctx->complete(0);
  }
}

template <typename I>
void AbstractWriteLog<I>::release_guarded_request(BlockGuardCell *released_cell)
{
  CephContext *cct = m_image_ctx.cct;
  WriteLogGuard::BlockOperations block_reqs;
  ldout(cct, 20) << "released_cell=" << released_cell << dendl;

  {
    std::lock_guard locker(m_blockguard_lock);
    m_write_log_guard.release(released_cell, &block_reqs);

    for (auto &req : block_reqs) {
      req.guard_ctx->state.detained = true;
      BlockGuardCell *detained_cell = detain_guarded_request_helper(req);
      if (detained_cell) {
        if (req.guard_ctx->state.current_barrier) {
          /* The current barrier is acquiring the block guard, so now we know its cell */
          m_barrier_cell = detained_cell;
          /* detained_cell could be == released_cell here */
          ldout(cct, 20) << "current barrier cell=" << detained_cell << " req=" << req << dendl;
        }
        req.guard_ctx->cell = detained_cell;
        m_work_queue.queue(req.guard_ctx);
      }
    }

    if (m_barrier_in_progress && (released_cell == m_barrier_cell)) {
      ldout(cct, 20) << "current barrier released cell=" << released_cell << dendl;
      /* The released cell is the current barrier request */
      m_barrier_in_progress = false;
      m_barrier_cell = nullptr;
      /* Move waiting requests into the blockguard. Stop if there's another barrier */
      while (!m_barrier_in_progress && !m_awaiting_barrier.empty()) {
        auto &req = m_awaiting_barrier.front();
        ldout(cct, 20) << "submitting queued request to blockguard: " << req << dendl;
        BlockGuardCell *detained_cell = detain_guarded_request_barrier_helper(req);
        if (detained_cell) {
          req.guard_ctx->cell = detained_cell;
          m_work_queue.queue(req.guard_ctx);
        }
        m_awaiting_barrier.pop_front();
      }
    }
  }

  ldout(cct, 20) << "exit" << dendl;
}

template <typename I>
void AbstractWriteLog<I>::append_scheduled(GenericLogOperations &ops, bool &ops_remain,
                                           bool &appending, bool isRWL)
{
  const unsigned long int OPS_APPENDED = isRWL ? MAX_ALLOC_PER_TRANSACTION
                                               : MAX_WRITES_PER_SYNC_POINT;
  {
    std::lock_guard locker(m_lock);
    if (!appending && m_appending) {
      /* Another thread is appending */
      ldout(m_image_ctx.cct, 15) << "Another thread is appending" << dendl;
      return;
    }
    if (m_ops_to_append.size()) {
      appending = true;
      m_appending = true;
      auto last_in_batch = m_ops_to_append.begin();
      unsigned int ops_to_append = m_ops_to_append.size();
      if (ops_to_append > OPS_APPENDED) {
        ops_to_append = OPS_APPENDED;
      }
      std::advance(last_in_batch, ops_to_append);
      ops.splice(ops.end(), m_ops_to_append, m_ops_to_append.begin(), last_in_batch);
      ops_remain = true; /* Always check again before leaving */
      ldout(m_image_ctx.cct, 20) << "appending " << ops.size() << ", remain "
                                 << m_ops_to_append.size() << dendl;
    } else {
      ops_remain = false;
      m_appending = false;
    }
  }
}

template <typename I>
void AbstractWriteLog<I>::schedule_append(GenericLogOperationsVector &ops, C_BlockIORequestT *req)
{
  GenericLogOperations to_append(ops.begin(), ops.end());

  schedule_append_ops(to_append, req);
}

template <typename I>
void AbstractWriteLog<I>::schedule_append(GenericLogOperationSharedPtr op, C_BlockIORequestT *req)
{
  GenericLogOperations to_append { op };

  schedule_append_ops(to_append, req);
}

/*
 * Complete a set of write ops with the result of append_op_entries.
 */
template <typename I>
void AbstractWriteLog<I>::complete_op_log_entries(GenericLogOperations &&ops,
                                                  const int result)
{
  GenericLogEntries dirty_entries;
  int published_reserves = 0;
  bool need_update_state = false;
  ldout(m_image_ctx.cct, 20) << __func__ << ": completing" << dendl;
  for (auto &op : ops) {
    utime_t now = ceph_clock_now();
    auto log_entry = op->get_log_entry();
    log_entry->completed = true;
    if (op->is_writing_op()) {
      op->mark_log_entry_completed();
      dirty_entries.push_back(log_entry);
    }
    if (log_entry->is_write_entry()) {
      release_ram(log_entry);
    }
    if (op->reserved_allocated()) {
      published_reserves++;
    }
    {
      std::lock_guard locker(m_lock);
      m_unpublished_reserves -= published_reserves;
      m_dirty_log_entries.splice(m_dirty_log_entries.end(), dirty_entries);
      if (m_cache_state->clean && !this->m_dirty_log_entries.empty()) {
        m_cache_state->clean = false;
        update_image_cache_state();
        need_update_state = true;
      }
    }
    op->complete(result);
    m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_app_t,
                        op->log_append_start_time - op->dispatch_time);
    m_perfcounter->tinc(l_librbd_pwl_log_op_dis_to_cmp_t, now - op->dispatch_time);
    m_perfcounter->hinc(l_librbd_pwl_log_op_dis_to_cmp_t_hist,
                        utime_t(now - op->dispatch_time).to_nsec(),
                        log_entry->ram_entry.write_bytes);
    utime_t app_lat = op->log_append_comp_time - op->log_append_start_time;
    m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_appc_t, app_lat);
    m_perfcounter->hinc(l_librbd_pwl_log_op_app_to_appc_t_hist, app_lat.to_nsec(),
                        log_entry->ram_entry.write_bytes);
    m_perfcounter->tinc(l_librbd_pwl_log_op_app_to_cmp_t, now - op->log_append_start_time);
  }
  if (need_update_state) {
    std::unique_lock locker(m_lock);
    write_image_cache_state(locker);
  }
  // New entries may be flushable
  {
    std::lock_guard locker(m_lock);
    wake_up();
  }
}

/*
 * Dispatch as many deferred writes as possible
 */
template <typename I>
void AbstractWriteLog<I>::dispatch_deferred_writes(void)
{
  C_BlockIORequestT *front_req = nullptr;     /* req still on front of deferred list */
  C_BlockIORequestT *allocated_req = nullptr; /* req that was allocated, and is now off the list */
  bool allocated = false;                     /* front_req allocate succeeded */
  bool cleared_dispatching_flag = false;
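
  /* Sketch of the loop below: on each pass, under m_lock, a request whose
   * resources were allocated on the previous pass is popped off
   * m_deferred_ios, the new front of the list becomes the next allocation
   * candidate, and alloc_resources() is attempted for it outside the lock.
   * A previously allocated request is pushed to the work queue only while
   * another candidate is still being considered; the last allocated request
   * is dispatched directly after the loop exits. */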
  /* If we can't become the dispatcher, we'll exit */
  {
    std::lock_guard locker(m_lock);
    if (m_dispatching_deferred_ops ||
        !m_deferred_ios.size()) {
      return;
    }
    m_dispatching_deferred_ops = true;
  }

  /* There are ops to dispatch, and this should be the only thread dispatching them */
  {
    std::lock_guard deferred_dispatch(m_deferred_dispatch_lock);
    do {
      {
        std::lock_guard locker(m_lock);
        ceph_assert(m_dispatching_deferred_ops);
        if (allocated) {
          /* On the 2..n-1 th time we get lock, front_req->alloc_resources() will
           * have succeeded, and we'll need to pop it off the deferred ops list
           * here. */
          ceph_assert(front_req);
          ceph_assert(!allocated_req);
          m_deferred_ios.pop_front();
          allocated_req = front_req;
          front_req = nullptr;
          allocated = false;
        }
        ceph_assert(!allocated);
        if (!allocated && front_req) {
          /* front_req->alloc_resources() failed on the last iteration.
           * We'll stop dispatching. */
          front_req = nullptr;
          ceph_assert(!cleared_dispatching_flag);
          m_dispatching_deferred_ops = false;
          cleared_dispatching_flag = true;
        }
        ceph_assert(!front_req);
        if (m_deferred_ios.size()) {
          /* New allocation candidate */
          front_req = m_deferred_ios.front();
        } else {
          ceph_assert(!cleared_dispatching_flag);
          m_dispatching_deferred_ops = false;
          cleared_dispatching_flag = true;
        }
      }
      /* Try allocating for front_req before we decide what to do with allocated_req
       * (if any) */
      if (front_req) {
        ceph_assert(!cleared_dispatching_flag);
        allocated = front_req->alloc_resources();
      }
      if (allocated_req && front_req && allocated) {
        /* Push dispatch of the first allocated req to a wq */
        m_work_queue.queue(new LambdaContext(
          [allocated_req](int r) {
            allocated_req->dispatch();
          }), 0);
        allocated_req = nullptr;
      }
      ceph_assert(!(allocated_req && front_req && allocated));

      /* Continue while we're still considering the front of the deferred ops list */
    } while (front_req);
    ceph_assert(!allocated);
  }
  ceph_assert(cleared_dispatching_flag);

  /* If any deferred requests were allocated, the last one will still be in allocated_req */
  if (allocated_req) {
    allocated_req->dispatch();
  }
}

/*
 * Returns the lanes used by this write, and attempts to dispatch the next
 * deferred write
 */
template <typename I>
void AbstractWriteLog<I>::release_write_lanes(C_BlockIORequestT *req)
{
  {
    std::lock_guard locker(m_lock);
    m_free_lanes += req->image_extents.size();
  }
  dispatch_deferred_writes();
}
/**
 * Attempts to allocate log resources for a write. Write is dispatched if
 * resources are available, or queued if they aren't.
 */
template <typename I>
void AbstractWriteLog<I>::alloc_and_dispatch_io_req(C_BlockIORequestT *req)
{
  bool dispatch_here = false;

  {
    /* If there are already deferred writes, queue behind them for resources */
    {
      std::lock_guard locker(m_lock);
      dispatch_here = m_deferred_ios.empty();
      // Only a flush req (the internal one) carries total_bytes == max uint64
      if (req->image_extents_summary.total_bytes ==
            std::numeric_limits<uint64_t>::max() &&
          static_cast<C_FlushRequestT *>(req)->internal == true) {
        dispatch_here = true;
      }
    }
    if (dispatch_here) {
      dispatch_here = req->alloc_resources();
    }
    if (dispatch_here) {
      ldout(m_image_ctx.cct, 20) << "dispatching" << dendl;
      req->dispatch();
    } else {
      req->deferred();
      {
        std::lock_guard locker(m_lock);
        m_deferred_ios.push_back(req);
      }
      ldout(m_image_ctx.cct, 20) << "deferred IOs: " << m_deferred_ios.size() << dendl;
      dispatch_deferred_writes();
    }
  }
}
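/*
 * Dispatch-or-defer in practice (illustrative): a write arriving while
 * m_deferred_ios is empty calls req->alloc_resources() inline and, if that
 * succeeds, dispatches immediately. A write arriving while earlier requests
 * are still queued, or whose inline allocation fails, is appended to
 * m_deferred_ios and will be picked up by dispatch_deferred_writes() in FIFO
 * order. The internal flush request (total_bytes == max uint64) skips the
 * "queue behind deferred IOs" check so flush/shutdown can make progress even
 * with a backlog, though it still has to pass resource allocation.
 */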
template <typename I>
bool AbstractWriteLog<I>::check_allocation(
    C_BlockIORequestT *req, uint64_t bytes_cached, uint64_t bytes_dirtied,
    uint64_t bytes_allocated, uint32_t num_lanes, uint32_t num_log_entries,
    uint32_t num_unpublished_reserves) {
  bool alloc_succeeds = true;
  bool no_space = false;
  {
    std::lock_guard locker(m_lock);
    if (m_free_lanes < num_lanes) {
      ldout(m_image_ctx.cct, 20) << "not enough free lanes (need "
                                 << num_lanes
                                 << ", have " << m_free_lanes << ") "
                                 << *req << dendl;
      alloc_succeeds = false;
      /* This isn't considered a "no space" alloc fail. Lanes are a throttling mechanism. */
    }
    if (m_free_log_entries < num_log_entries) {
      ldout(m_image_ctx.cct, 20) << "not enough free entries (need "
                                 << num_log_entries
                                 << ", have " << m_free_log_entries << ") "
                                 << *req << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries must be retired */
    }
    /* Don't attempt buffer allocate if we've exceeded the "full" threshold */
    if (m_bytes_allocated + bytes_allocated > m_bytes_allocated_cap) {
      ldout(m_image_ctx.cct, 20) << "Waiting for allocation cap (cap="
                                 << m_bytes_allocated_cap
                                 << ", allocated=" << m_bytes_allocated
                                 << ") in write [" << *req << "]" << dendl;
      alloc_succeeds = false;
      no_space = true; /* Entries must be retired */
    }
  }

  if (alloc_succeeds) {
    reserve_cache(req, alloc_succeeds, no_space);
  }

  if (alloc_succeeds) {
    std::lock_guard locker(m_lock);
    /* We need one free log entry per extent (each is a separate entry), and
     * one free "lane" for remote replication. */
    if ((m_free_lanes >= num_lanes) &&
        (m_free_log_entries >= num_log_entries) &&
        (m_bytes_allocated_cap >= m_bytes_allocated + bytes_allocated)) {
      m_free_lanes -= num_lanes;
      m_free_log_entries -= num_log_entries;
      m_unpublished_reserves += num_unpublished_reserves;
      m_bytes_allocated += bytes_allocated;
      m_bytes_cached += bytes_cached;
      m_bytes_dirty += bytes_dirtied;
    } else {
      alloc_succeeds = false;
    }
  }

  if (!alloc_succeeds && no_space) {
    /* Expedite flushing and/or retiring */
    std::lock_guard locker(m_lock);
    m_alloc_failed_since_retire = true;
    m_last_alloc_fail = ceph_clock_now();
  }

  return alloc_succeeds;
}
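/*
 * Worked example for the cap check above (numbers are illustrative only):
 * with m_bytes_allocated_cap = 1 GiB and m_bytes_allocated = 1000 MiB, a
 * request needing bytes_allocated = 32 MiB fails the check
 * (1000 MiB + 32 MiB > 1024 MiB), so alloc_succeeds becomes false and
 * no_space is set, which marks m_alloc_failed_since_retire to expedite
 * flushing/retiring. Lane exhaustion, by contrast, only defers the request
 * without setting no_space, because lanes free up as in-flight writes finish
 * rather than by retiring log entries.
 */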
template <typename I>
C_FlushRequest<AbstractWriteLog<I>>* AbstractWriteLog<I>::make_flush_req(Context *on_finish) {
  utime_t flush_begins = ceph_clock_now();
  bufferlist bl;
  auto *flush_req =
    new C_FlushRequestT(*this, flush_begins, Extents({whole_volume_extent()}),
                        std::move(bl), 0, m_lock, m_perfcounter, on_finish);
  return flush_req;
}
template <typename I>
void AbstractWriteLog<I>::wake_up() {
  CephContext *cct = m_image_ctx.cct;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!m_wake_up_enabled) {
    // wake_up is disabled during shutdown after flushing completes
    ldout(m_image_ctx.cct, 6) << "deferred processing disabled" << dendl;
    return;
  }

  if (m_wake_up_requested && m_wake_up_scheduled) {
    return;
  }

  ldout(cct, 20) << dendl;

  /* Wake-up can be requested while it's already scheduled */
  m_wake_up_requested = true;

  /* Wake-up cannot be scheduled if it's already scheduled */
  if (m_wake_up_scheduled) {
    return;
  }
  m_wake_up_scheduled = true;
  m_async_process_work++;
  m_async_op_tracker.start_op();
  m_work_queue.queue(new LambdaContext(
    [this](int r) {
      process_work();
      m_async_op_tracker.finish_op();
      m_async_process_work--;
    }), 0);
}
template <typename I>
bool AbstractWriteLog<I>::can_flush_entry(std::shared_ptr<GenericLogEntry> log_entry) {
  CephContext *cct = m_image_ctx.cct;

  ldout(cct, 20) << "" << dendl;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (m_invalidating) {
    return true;
  }

  /* For OWB we can flush entries with the same sync gen number (write between
   * aio_flush() calls) concurrently. Here we'll consider an entry flushable if
   * its sync gen number is <= the lowest sync gen number carried by all the
   * entries currently flushing.
   *
   * If the entry considered here bears a sync gen number lower than a
   * previously flushed entry, the application had to have submitted the write
   * bearing the higher gen number before the write with the lower gen number
   * completed. So, flushing these concurrently is OK.
   *
   * If the entry considered here bears a sync gen number higher than a
   * currently flushing entry, the write with the lower gen number may have
   * completed to the application before the write with the higher sync gen
   * number was submitted, and the application may rely on that completion
   * order for volume consistency. In this case the entry will not be
   * considered flushable until all the entries bearing lower sync gen numbers
   * have finished flushing. */
  if (m_flush_ops_in_flight &&
      (log_entry->ram_entry.sync_gen_number > m_lowest_flushing_sync_gen)) {
    return false;
  }

  return (log_entry->can_writeback() &&
          (m_flush_ops_in_flight <= IN_FLIGHT_FLUSH_WRITE_LIMIT) &&
          (m_flush_bytes_in_flight <= IN_FLIGHT_FLUSH_BYTES_LIMIT));
}
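/*
 * Example of the ordering rule above (illustrative): suppose entries with
 * sync gen numbers 7, 7, 8 are dirty and an entry with gen 7 is already
 * flushing (m_lowest_flushing_sync_gen == 7). The other gen-7 entry may be
 * flushed concurrently, but the gen-8 entry is held back until every gen-7
 * flush completes, since its write may have been issued only after a gen-7
 * write had already completed to the application.
 */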
template <typename I>
void AbstractWriteLog<I>::detain_flush_guard_request(std::shared_ptr<GenericLogEntry> log_entry,
                                                     GuardedRequestFunctionContext *guarded_ctx) {
  ldout(m_image_ctx.cct, 20) << dendl;

  BlockExtent extent;
  if (log_entry->is_sync_point()) {
    extent = block_extent(whole_volume_extent());
  } else {
    extent = log_entry->ram_entry.block_extent();
  }

  auto req = GuardedRequest(extent, guarded_ctx, false);
  BlockGuardCell *cell = nullptr;

  {
    std::lock_guard locker(m_flush_guard_lock);
    m_flush_guard.detain(req.block_extent, &req, &cell);
  }
  if (cell) {
    req.guard_ctx->cell = cell;
    m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
  }
}
template <typename I>
Context* AbstractWriteLog<I>::construct_flush_entry(std::shared_ptr<GenericLogEntry> log_entry,
                                                    bool invalidating) {
  ldout(m_image_ctx.cct, 20) << "" << dendl;

  /* Flush write completion action */
  utime_t writeback_start_time = ceph_clock_now();
  Context *ctx = new LambdaContext(
    [this, log_entry, writeback_start_time, invalidating](int r) {
      utime_t writeback_comp_time = ceph_clock_now();
      m_perfcounter->tinc(l_librbd_pwl_writeback_latency,
                          writeback_comp_time - writeback_start_time);
      {
        std::lock_guard locker(m_lock);
        if (r < 0) {
          lderr(m_image_ctx.cct) << "failed to flush log entry"
                                 << cpp_strerror(r) << dendl;
          m_dirty_log_entries.push_front(log_entry);
        } else {
          ceph_assert(m_bytes_dirty >= log_entry->bytes_dirty());
          log_entry->set_flushed(true);
          m_bytes_dirty -= log_entry->bytes_dirty();
          sync_point_writer_flushed(log_entry->get_sync_point_entry());
          ldout(m_image_ctx.cct, 20) << "flushed: " << log_entry
                                     << " invalidating=" << invalidating
                                     << dendl;
        }
        m_flush_ops_in_flight -= 1;
        m_flush_bytes_in_flight -= log_entry->ram_entry.write_bytes;
      }
    });
  /* Flush through lower cache before completing */
  ctx = new LambdaContext(
    [this, ctx, log_entry](int r) {
      {
        WriteLogGuard::BlockOperations block_reqs;
        BlockGuardCell *detained_cell = nullptr;
        {
          std::lock_guard locker{m_flush_guard_lock};
          m_flush_guard.release(log_entry->m_cell, &block_reqs);
        }

        for (auto &req : block_reqs) {
          m_flush_guard.detain(req.block_extent, &req, &detained_cell);
          if (detained_cell) {
            req.guard_ctx->cell = detained_cell;
            m_image_ctx.op_work_queue->queue(req.guard_ctx, 0);
          }
        }
      }

      if (r < 0) {
        lderr(m_image_ctx.cct) << "failed to flush log entry"
                               << cpp_strerror(r) << dendl;
        ctx->complete(r);
      } else {
        m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, ctx);
      }
    });
  return ctx;
}
template <typename I>
void AbstractWriteLog<I>::process_writeback_dirty_entries() {
  CephContext *cct = m_image_ctx.cct;
  bool all_clean = false;
  int flushed = 0;
  bool has_write_entry = false;
  bool need_update_state = false;

  ldout(cct, 20) << "Look for dirty entries" << dendl;
  {
    DeferredContexts post_unlock;
    GenericLogEntries entries_to_flush;

    std::shared_lock entry_reader_locker(m_entry_reader_lock);
    std::lock_guard locker(m_lock);
    while (flushed < IN_FLIGHT_FLUSH_WRITE_LIMIT) {
      if (m_shutting_down) {
        ldout(cct, 5) << "Flush during shutdown suppressed" << dendl;
        /* Do flush complete only when all flush ops are finished */
        all_clean = !m_flush_ops_in_flight;
        break;
      }
      if (m_dirty_log_entries.empty()) {
        ldout(cct, 20) << "Nothing new to flush" << dendl;
        /* Do flush complete only when all flush ops are finished */
        all_clean = !m_flush_ops_in_flight;
        if (!m_cache_state->clean && all_clean) {
          m_cache_state->clean = true;
          update_image_cache_state();
          need_update_state = true;
        }
        break;
      }

      auto candidate = m_dirty_log_entries.front();
      bool flushable = can_flush_entry(candidate);
      if (flushable) {
        entries_to_flush.push_back(candidate);
        flushed++;
        if (!has_write_entry)
          has_write_entry = candidate->is_write_entry();
        m_dirty_log_entries.pop_front();

        // Account for this candidate in m_flush_ops_in_flight here
        if (!m_flush_ops_in_flight ||
            (candidate->ram_entry.sync_gen_number < m_lowest_flushing_sync_gen)) {
          m_lowest_flushing_sync_gen = candidate->ram_entry.sync_gen_number;
        }
        m_flush_ops_in_flight += 1;
        /* For write same this is the bytes affected by the flush op, not the bytes transferred */
        m_flush_bytes_in_flight += candidate->ram_entry.write_bytes;
      } else {
        ldout(cct, 20) << "Next dirty entry isn't flushable yet" << dendl;
        break;
      }
    }

    construct_flush_entries(entries_to_flush, post_unlock, has_write_entry);
  }
  if (need_update_state) {
    std::unique_lock locker(m_lock);
    write_image_cache_state(locker);
  }

  if (all_clean) {
    /* All flushing complete, drain outside lock */
    Contexts flush_contexts;
    {
      std::lock_guard locker(m_lock);
      flush_contexts.swap(m_flush_complete_contexts);
    }
    finish_contexts(m_image_ctx.cct, flush_contexts, 0);
  }
}
/* Returns true if the specified SyncPointLogEntry is considered flushed, and
 * the log will be updated to reflect this. */
template <typename I>
bool AbstractWriteLog<I>::handle_flushed_sync_point(std::shared_ptr<SyncPointLogEntry> log_entry)
{
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(log_entry);

  if ((log_entry->writes_flushed == log_entry->writes) &&
      log_entry->completed && log_entry->prior_sync_point_flushed &&
      log_entry->next_sync_point_entry) {
    ldout(m_image_ctx.cct, 20) << "All writes flushed up to sync point="
                               << *log_entry << dendl;
    log_entry->next_sync_point_entry->prior_sync_point_flushed = true;
    /* Don't move the flushed sync gen num backwards. */
    if (m_flushed_sync_gen < log_entry->ram_entry.sync_gen_number) {
      m_flushed_sync_gen = log_entry->ram_entry.sync_gen_number;
    }
    m_async_op_tracker.start_op();
    m_work_queue.queue(new LambdaContext(
      [this, next = std::move(log_entry->next_sync_point_entry)](int r) {
        bool handled_by_next;
        {
          std::lock_guard locker(m_lock);
          handled_by_next = handle_flushed_sync_point(std::move(next));
        }
        if (!handled_by_next) {
          persist_last_flushed_sync_gen();
        }
        m_async_op_tracker.finish_op();
      }), 0);
    return true;
  }
  return false;
}
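/*
 * Illustrative walk of the sync point chain: when sync point N satisfies all
 * four conditions above, its successor N+1 is marked prior_sync_point_flushed
 * and re-examined on the work queue. If N+1 is also fully flushed the walk
 * continues; otherwise the walk stops there and persist_last_flushed_sync_gen()
 * records the highest flushed gen number reached so far.
 */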
template <typename I>
void AbstractWriteLog<I>::sync_point_writer_flushed(std::shared_ptr<SyncPointLogEntry> log_entry)
{
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(log_entry);
  log_entry->writes_flushed++;

  /* If this entry might be completely flushed, look closer */
  if ((log_entry->writes_flushed == log_entry->writes) && log_entry->completed) {
    ldout(m_image_ctx.cct, 15) << "All writes flushed for sync point="
                               << *log_entry << dendl;
    handle_flushed_sync_point(log_entry);
  }
}
/* Make a new sync point and flush the previous during initialization, when there may or may
 * not be a previous sync point */
template <typename I>
void AbstractWriteLog<I>::init_flush_new_sync_point(DeferredContexts &later) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(!m_initialized); /* Don't use this after init */

  if (!m_current_sync_point) {
    /* First sync point since start */
    new_sync_point(later);
  } else {
    flush_new_sync_point(nullptr, later);
  }
}
/**
 * Begin a new sync point
 */
template <typename I>
void AbstractWriteLog<I>::new_sync_point(DeferredContexts &later) {
  CephContext *cct = m_image_ctx.cct;
  std::shared_ptr<SyncPoint> old_sync_point = m_current_sync_point;
  std::shared_ptr<SyncPoint> new_sync_point;
  ldout(cct, 20) << dendl;

  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  /* The first time this is called, if this is a newly created log,
   * this makes the first sync gen number we'll use 1. On the first
   * call for a re-opened log m_current_sync_gen will be the highest
   * gen number from all the sync point entries found in the re-opened
   * log, and this advances to the next sync gen number. */
  ++m_current_sync_gen;

  new_sync_point = std::make_shared<SyncPoint>(m_current_sync_gen, cct);
  m_current_sync_point = new_sync_point;

  /* If this log has been re-opened, old_sync_point will initially be
   * nullptr, but m_current_sync_gen may not be zero. */
  if (old_sync_point) {
    new_sync_point->setup_earlier_sync_point(old_sync_point, m_last_op_sequence_num);
    m_perfcounter->hinc(l_librbd_pwl_syncpoint_hist,
                        old_sync_point->log_entry->writes,
                        old_sync_point->log_entry->bytes);
    /* This sync point will acquire no more sub-ops. Activation needs
     * to acquire m_lock, so defer to later */
    later.add(new LambdaContext(
      [old_sync_point](int r) {
        old_sync_point->prior_persisted_gather_activate();
      }));
  }

  new_sync_point->prior_persisted_gather_set_finisher();

  if (old_sync_point) {
    ldout(cct, 6) << "new sync point = [" << *m_current_sync_point
                  << "], prior = [" << *old_sync_point << "]" << dendl;
  } else {
    ldout(cct, 6) << "first sync point = [" << *m_current_sync_point
                  << "]" << dendl;
  }
}
template <typename I>
void AbstractWriteLog<I>::flush_new_sync_point(C_FlushRequestT *flush_req,
                                               DeferredContexts &later) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  if (!flush_req) {
    m_async_null_flush_finish++;
    m_async_op_tracker.start_op();
    Context *flush_ctx = new LambdaContext([this](int r) {
      m_async_null_flush_finish--;
      m_async_op_tracker.finish_op();
    });
    flush_req = make_flush_req(flush_ctx);
    flush_req->internal = true;
  }

  /* Add a new sync point. */
  new_sync_point(later);
  std::shared_ptr<SyncPoint> to_append = m_current_sync_point->earlier_sync_point;
  ceph_assert(to_append);

  /* This flush request will append/persist the (now) previous sync point */
  flush_req->to_append = to_append;

  /* When the m_sync_point_persist Gather completes this sync point can be
   * appended. The only sub for this Gather is the finisher Context for
   * m_prior_log_entries_persisted, which records the result of the Gather in
   * the sync point, and completes. TODO: Do we still need both of these
   * Gathers? */
  Context *ctx = new LambdaContext([this, flush_req](int r) {
    ldout(m_image_ctx.cct, 20) << "Flush req=" << flush_req
                               << " sync point =" << flush_req->to_append
                               << ". Ready to persist." << dendl;
    alloc_and_dispatch_io_req(flush_req);
  });
  to_append->persist_gather_set_finisher(ctx);

  /* The m_sync_point_persist Gather has all the subs it will ever have, and
   * now has its finisher. If the sub is already complete, activation will
   * complete the Gather. The finisher will acquire m_lock, so we'll activate
   * this when we release m_lock. */
  later.add(new LambdaContext([to_append](int r) {
    to_append->persist_gather_activate();
  }));

  /* The flush request completes when the sync point persists */
  to_append->add_in_on_persisted_ctxs(flush_req);
}
template <typename I>
void AbstractWriteLog<I>::flush_new_sync_point_if_needed(C_FlushRequestT *flush_req,
                                                         DeferredContexts &later) {
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));

  /* If there have been writes since the last sync point ... */
  if (m_current_sync_point->log_entry->writes) {
    flush_new_sync_point(flush_req, later);
  } else {
    /* There have been no writes to the current sync point. */
    if (m_current_sync_point->earlier_sync_point) {
      /* If previous sync point hasn't completed, complete this flush
       * with the earlier sync point. No alloc or dispatch needed. */
      m_current_sync_point->earlier_sync_point->on_sync_point_persisted.push_back(flush_req);
    } else {
      /* The previous sync point has already completed and been
       * appended. The current sync point has no writes, so this flush
       * has nothing to wait for. This flush completes now. */
      later.add(flush_req);
    }
  }
}
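/*
 * The three cases above, spelled out (illustrative): (1) writes landed on the
 * current sync point, so a new sync point is created and flush_req persists
 * the now-previous one; (2) no new writes but an earlier sync point is still
 * persisting, so flush_req simply completes when that earlier point does;
 * (3) no new writes and no earlier point outstanding, so flush_req has
 * nothing to wait for and is completed via the DeferredContexts once m_lock
 * is released.
 */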
/**
 * RWL internal flush - will actually flush the RWL.
 *
 * User flushes should arrive at aio_flush(), and only flush prior
 * writes to all log replicas.
 *
 * Librbd internal flushes will arrive at flush(invalidate=false,
 * discard=false), and traverse the block guard to ensure in-flight writes are
 * flushed.
 */
template <typename I>
void AbstractWriteLog<I>::flush_dirty_entries(Context *on_finish) {
  CephContext *cct = m_image_ctx.cct;
  bool all_clean;
  bool flushing;
  bool stop_flushing;

  {
    std::unique_lock locker(m_lock);
    flushing = (0 != m_flush_ops_in_flight);
    all_clean = m_dirty_log_entries.empty();
    stop_flushing = (m_shutting_down);
    if (!m_cache_state->clean && all_clean && !flushing) {
      m_cache_state->clean = true;
      update_image_cache_state();
      write_image_cache_state(locker);
    }
  }

  if (!flushing && (all_clean || stop_flushing)) {
    /* Complete without holding m_lock */
    if (all_clean) {
      ldout(cct, 20) << "no dirty entries" << dendl;
    } else {
      ldout(cct, 5) << "flush during shutdown suppressed" << dendl;
    }
    on_finish->complete(0);
  } else {
    if (all_clean) {
      ldout(cct, 5) << "flush ops still in progress" << dendl;
    } else {
      ldout(cct, 20) << "dirty entries remain" << dendl;
    }
    std::lock_guard locker(m_lock);
    /* on_finish can't be completed yet */
    m_flush_complete_contexts.push_back(new LambdaContext(
      [this, on_finish](int r) {
        flush_dirty_entries(on_finish);
      }));
  }
}
template <typename I>
void AbstractWriteLog<I>::internal_flush(bool invalidate, Context *on_finish) {
  ldout(m_image_ctx.cct, 20) << "invalidate=" << invalidate << dendl;

  if (m_perfcounter) {
    if (invalidate) {
      m_perfcounter->inc(l_librbd_pwl_invalidate_cache, 1);
    } else {
      m_perfcounter->inc(l_librbd_pwl_internal_flush, 1);
    }
  }

  /* May be called even if initialization fails */
  if (!m_initialized) {
    ldout(m_image_ctx.cct, 05) << "never initialized" << dendl;
    /* Deadlock if completed here */
    m_image_ctx.op_work_queue->queue(on_finish, 0);
    return;
  }

  /* Flush/invalidate must pass through the block guard to ensure all layers of
   * the cache are consistently flushed/invalidated. This ensures no in-flight
   * write leaves some layers with valid regions, which may later produce
   * inconsistent read results. */
  GuardedRequestFunctionContext *guarded_ctx =
    new GuardedRequestFunctionContext(
      [this, on_finish, invalidate](GuardedRequestFunctionContext &guard_ctx) {
        DeferredContexts on_exit;
        ldout(m_image_ctx.cct, 20) << "cell=" << guard_ctx.cell << dendl;
        ceph_assert(guard_ctx.cell);

        Context *ctx = new LambdaContext(
          [this, cell = guard_ctx.cell, invalidate, on_finish](int r) {
            std::lock_guard locker(m_lock);
            m_invalidating = false;
            ldout(m_image_ctx.cct, 6) << "Done flush/invalidating (invalidate="
                                      << invalidate << ")" << dendl;
            if (m_log_entries.size()) {
              ldout(m_image_ctx.cct, 1) << "m_log_entries.size()="
                                        << m_log_entries.size()
                                        << ", front()=" << *m_log_entries.front()
                                        << dendl;
            }
            if (invalidate) {
              ceph_assert(m_log_entries.size() == 0);
            }
            ceph_assert(m_dirty_log_entries.size() == 0);
            m_image_ctx.op_work_queue->queue(on_finish, r);
            release_guarded_request(cell);
          });
        ctx = new LambdaContext(
          [this, ctx, invalidate](int r) {
            Context *next_ctx = ctx;
            ldout(m_image_ctx.cct, 6) << "flush_dirty_entries finished" << dendl;
            if (r < 0) {
              /* Override on_finish status with this error */
              next_ctx = new LambdaContext([r, ctx](int _r) {
                ctx->complete(r);
              });
            }
            if (invalidate) {
              {
                std::lock_guard locker(m_lock);
                ceph_assert(m_dirty_log_entries.size() == 0);
                ceph_assert(!m_invalidating);
                ldout(m_image_ctx.cct, 6) << "Invalidating" << dendl;
                m_invalidating = true;
              }
              /* Discards all RWL entries */
              while (retire_entries(MAX_ALLOC_PER_TRANSACTION)) { }
              next_ctx->complete(0);
            } else {
              {
                std::lock_guard locker(m_lock);
                ceph_assert(m_dirty_log_entries.size() == 0);
                ceph_assert(!m_invalidating);
              }
              m_image_writeback.aio_flush(io::FLUSH_SOURCE_WRITEBACK, next_ctx);
            }
          });
        ctx = new LambdaContext(
          [this, ctx](int r) {
            flush_dirty_entries(ctx);
          });
        std::lock_guard locker(m_lock);
        /* Even if we're throwing everything away, we want the last entry to
         * be a sync point so we can cleanly resume.
         *
         * Also, the blockguard only guarantees the replication of this op
         * can't overlap with prior ops. It doesn't guarantee those are all
         * completed and eligible for flush & retire, which we require here. */
        auto flush_req = make_flush_req(ctx);
        flush_new_sync_point_if_needed(flush_req, on_exit);
      });
  detain_guarded_request(nullptr, guarded_ctx, true);
}
template <typename I>
void AbstractWriteLog<I>::add_into_log_map(GenericWriteLogEntries &log_entries,
                                           C_BlockIORequestT *req) {
  m_blocks_to_log_entries.add_log_entries(log_entries);
}
template <typename I>
bool AbstractWriteLog<I>::can_retire_entry(std::shared_ptr<GenericLogEntry> log_entry) {
  CephContext *cct = m_image_ctx.cct;

  ldout(cct, 20) << dendl;
  ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
  ceph_assert(log_entry);
  return log_entry->can_retire();
}
template <typename I>
void AbstractWriteLog<I>::check_image_cache_state_clean() {
  ceph_assert(m_deferred_ios.empty());
  ceph_assert(m_ops_to_append.empty());
  ceph_assert(m_async_flush_ops == 0);
  ceph_assert(m_async_append_ops == 0);
  ceph_assert(m_dirty_log_entries.empty());
  ceph_assert(m_ops_to_flush.empty());
  ceph_assert(m_flush_ops_in_flight == 0);
  ceph_assert(m_flush_bytes_in_flight == 0);
  ceph_assert(m_bytes_dirty == 0);
  ceph_assert(m_work_queue.empty());
}
} // namespace pwl
} // namespace cache
} // namespace librbd

template class librbd::cache::pwl::AbstractWriteLog<librbd::ImageCtx>;